DAG
BaseDAG
dataclass
Bases: Generic[P, RVDAG]
Data structure containing ExecNodes with interdependencies.
Please do not instantiate this class directly. Use the decorator @dag instead.
The ExecNodes can be executed in parallel, subject to the following restrictions:
* A limited number of threads (max_concurrency).
* The parallelization constraint of each ExecNode (is_sequential attribute).
* The priority of each ExecNode (priority attribute).
* A specific resource per ExecNode (resource attribute).
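To make these restrictions concrete, here is a toy selection policy in plain Python. It is a sketch under stated assumptions, not tawazi's scheduler: ToyNode and pick_next are hypothetical names, higher priority values are assumed to run first, is_sequential is taken to mean that the node must run alone, and the resource attribute is not modeled.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class ToyNode:
    # Hypothetical stand-in for an ExecNode: only the scheduling fields.
    id: str
    priority: int = 0
    is_sequential: bool = False


def pick_next(ready: List[ToyNode], running: List[ToyNode], max_concurrency: int) -> List[ToyNode]:
    """Return the ready nodes to launch now (toy policy, not tawazi's)."""
    # A running sequential node must stay alone, so nothing else starts.
    if any(n.is_sequential for n in running):
        return []
    launched: List[ToyNode] = []
    free_slots = max_concurrency - len(running)
    # Assumption: higher priority first; sorted() is stable for ties.
    for node in sorted(ready, key=lambda n: -n.priority):
        if free_slots <= 0:
            break
        if node.is_sequential:
            # Start a sequential node only if nothing else runs alongside it,
            # then stop picking so lower-priority nodes cannot overtake it.
            if not running and not launched:
                launched.append(node)
            break
        launched.append(node)
        free_slots -= 1
    return launched


ready = [ToyNode("a", 0), ToyNode("b", 5), ToyNode("c", 1)]
print([n.id for n in pick_next(ready, running=[], max_concurrency=2)])  # ['b', 'c']
```

Stopping at a sequential node that cannot start yet is one design choice among several; it avoids starving a high-priority sequential node at the cost of idle threads.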
This class has two flavors:
* DAG: for synchronous execution
* AsyncDAG: for asynchronous execution
Parameters:
Name | Type | Description | Default |
---|---|---|---|
exec_nodes | StrictDict[Identifier, ExecNode] | all the ExecNodes | required |
input_uxns | List[UsageExecNode] | all the input UsageExecNodes | required |
return_uxns | ReturnUXNsType | the return UsageExecNodes of various types: None, a single value, tuple, list, dict. | required |
max_concurrency | int | the maximal number of threads running in parallel | 1 |
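The return_uxns parameter accepts several return shapes. The sketch below shows one plausible way such a specification could be turned into the DAG's return value once every node has a result. It is an illustration only: build_return is a hypothetical helper, and node ids are plain strings here instead of UsageExecNodes.

```python
from typing import Any, Dict


def build_return(return_spec: Any, results: Dict[str, Any]) -> Any:
    """Rebuild a return value from per-node results (hypothetical helper).

    return_spec is None, a single node id, or a tuple/list/dict of node ids,
    mirroring the shapes listed for return_uxns.
    """
    if return_spec is None:
        return None
    if isinstance(return_spec, str):
        return results[return_spec]
    if isinstance(return_spec, tuple):
        return tuple(results[i] for i in return_spec)
    if isinstance(return_spec, list):
        return [results[i] for i in return_spec]
    if isinstance(return_spec, dict):
        return {key: results[i] for key, i in return_spec.items()}
    raise TypeError(f"unsupported return specification: {return_spec!r}")


results = {"a": 1, "b": "x"}
print(build_return(("a", "b"), results))  # (1, 'x')
print(build_return({"first": "a"}, results))  # {'first': 1}
```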
config_from_dict(config)
Allows reconfiguring the parameters of the nodes from a dictionary.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config | Dict[str, Any] | the dictionary containing the config, e.g. {"nodes": {"a": {"priority": 3, "is_sequential": True}}, "max_concurrency": 3} | required |
Raises:
Type | Description |
---|---|
ValueError | if two nodes are configured by the provided config (which is ambiguous) |
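The example dictionary suggests per-node overrides under "nodes" plus DAG-level keys such as "max_concurrency". Below is a minimal sketch of applying such a dictionary to a toy DAG, assuming nodes are addressed by id only. ToyNode and ToyDAG are hypothetical, and tawazi's own validation (including the ambiguity check that raises ValueError) is not reproduced.

```python
from dataclasses import dataclass, field
from typing import Any, Dict


@dataclass
class ToyNode:
    # Hypothetical stand-in for an ExecNode with two configurable attributes.
    priority: int = 0
    is_sequential: bool = False


@dataclass
class ToyDAG:
    nodes: Dict[str, ToyNode] = field(default_factory=dict)
    max_concurrency: int = 1

    def config_from_dict(self, config: Dict[str, Any]) -> None:
        # Per-node overrides, keyed by node id (assumption: no tag lookup).
        for node_id, overrides in config.get("nodes", {}).items():
            node = self.nodes[node_id]  # unknown ids raise KeyError in this toy
            for attr, value in overrides.items():
                if not hasattr(node, attr):
                    raise ValueError(f"unknown node attribute {attr!r}")
                setattr(node, attr, value)
        # DAG-level settings live at the top level of the dictionary.
        if "max_concurrency" in config:
            self.max_concurrency = config["max_concurrency"]


dag = ToyDAG(nodes={"a": ToyNode(), "b": ToyNode()})
dag.config_from_dict({"nodes": {"a": {"priority": 3, "is_sequential": True}}, "max_concurrency": 3})
print(dag.nodes["a"], dag.max_concurrency)  # ToyNode(priority=3, is_sequential=True) 3
```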
config_from_json(config_path)
Allows reconfiguring the parameters of the nodes from a JSON file.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config_path | str | the path to the JSON file | required |
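A JSON config file presumably holds the same structure as the dictionary accepted by config_from_dict; that equivalence is an assumption here. This sketch only shows the parsing step with the standard json module: load_config is a hypothetical helper, and the round trip through a temporary file stands in for a real config on disk.

```python
import json
import tempfile
from pathlib import Path
from typing import Any, Dict


def load_config(config_path: str) -> Dict[str, Any]:
    # Parse the JSON file into the dictionary shape used by config_from_dict.
    with open(config_path, encoding="utf-8") as f:
        return json.load(f)


config = {"nodes": {"a": {"priority": 3, "is_sequential": True}}, "max_concurrency": 3}
with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "dag_config.json"
    path.write_text(json.dumps(config), encoding="utf-8")
    loaded = load_config(str(path))

print(loaded == config)  # True
```

For YAML, the same pattern would apply with a YAML parser such as PyYAML's yaml.safe_load, which is a third-party dependency and therefore not shown.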
config_from_yaml(config_path)
Allows reconfiguring the parameters of the nodes from a YAML file.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config_path | str | the path to the YAML file | required |
get_node_by_id(node_id)
Get the ExecNode with the given id.
The returned ExecNode is not modified by any execution!
This means that you cannot get the result of its execution via DAG.get_node_by_id(<id>).result.
To do that, create a DAGExecution and call DAGExecution.get_node_by_id(<id>).result, which will contain the result.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
node_id | Identifier | id of the ExecNode | required |
Returns:
Name | Type | Description |
---|---|---|
ExecNode | ExecNode | corresponding ExecNode |
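The distinction between the DAG's ExecNodes (templates that no execution modifies) and an execution's ExecNodes (which hold results) can be illustrated with a toy model in which every execution works on a deep copy of the nodes. Copying is an assumption made for this sketch; ToyNode and ToyExecution are hypothetical names, not tawazi classes.

```python
import copy
from dataclasses import dataclass
from typing import Any, Callable, Dict, Optional


@dataclass
class ToyNode:
    # Hypothetical stand-in for an ExecNode.
    func: Callable[[], Any]
    result: Optional[Any] = None


class ToyExecution:
    def __init__(self, nodes: Dict[str, ToyNode]) -> None:
        # Each execution gets private copies, so the DAG's nodes stay untouched.
        self.nodes = copy.deepcopy(nodes)

    def run(self) -> None:
        for node in self.nodes.values():
            node.result = node.func()

    def get_node_by_id(self, node_id: str) -> ToyNode:
        return self.nodes[node_id]


dag_nodes = {"a": ToyNode(func=lambda: 42)}
execution = ToyExecution(dag_nodes)
execution.run()
print(execution.get_node_by_id("a").result)  # 42
print(dag_nodes["a"].result)  # None
```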
get_nodes_by_tag(tag)
Get the ExecNodes with the given tag.
The returned ExecNodes are not modified by any execution!
This means that you cannot get the results of their execution from the ExecNodes returned by DAG.get_nodes_by_tag(<tag>).
To do that, create a DAGExecution, call DAGExecution.get_nodes_by_tag(<tag>), and read the result attribute of each returned ExecNode.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
tag | Any | tag of the ExecNodes | required |
Returns:
Type | Description |
---|---|
List[ExecNode] | corresponding ExecNodes |
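Because a tag can be shared by several ExecNodes, the lookup returns a list, and results have to be read from each element. A minimal sketch with a hypothetical ToyNode that carries a tuple of tags (the tag representation is an assumption, not tawazi's):

```python
from dataclasses import dataclass
from typing import Any, Dict, List, Tuple


@dataclass
class ToyNode:
    # Hypothetical stand-in for an ExecNode that has already been executed.
    id: str
    tags: Tuple[Any, ...] = ()
    result: Any = None


def get_nodes_by_tag(nodes: Dict[str, ToyNode], tag: Any) -> List[ToyNode]:
    # Every node carrying the tag is returned, in insertion order.
    return [node for node in nodes.values() if tag in node.tags]


nodes = {
    "load": ToyNode("load", tags=("io",), result="raw"),
    "save": ToyNode("save", tags=("io", "final"), result="ok"),
    "train": ToyNode("train", tags=("ml",), result=0.9),
}
io_results = [node.result for node in get_nodes_by_tag(nodes, "io")]
print(io_results)  # ['raw', 'ok']
```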
compose(qualname, inputs, outputs, is_async=None, **kwargs)
Compose a new DAG using inputs and outputs ExecNodes (Experimental).
All provided Aliases must point to unique ExecNodes. Otherwise a ValueError is raised.
The user is responsible for correctly specifying the inputs and outputs signature of the DAG.
* The inputs can be specified as a single Alias or a Sequence of Aliases.
* The outputs can be specified as a single Alias (a single value is returned) or a Sequence of Aliases, in which case a Tuple of the values is returned.
If outputs are specified as [], () is returned.
The syntax is the following:
>>> from tawazi import dag, xn, DAG
>>> from typing import Tuple, Any
>>> @xn
... def unwanted_xn() -> int: return 42
>>> @xn
... def x(v: Any) -> int: return int(v)
>>> @xn
... def y(v: Any) -> str: return str(v)
>>> @xn
... def z(x: int, y: str) -> float: return float(x) + float(y)
>>> @dag
... def pipe() -> Tuple[int, float, int]:
...     a = unwanted_xn()
...     res = z(x(1), y(1))
...     b = unwanted_xn()
...     return a, res, b
>>> composed_dag = pipe.compose("twinkle", [x, y], z)
>>> assert composed_dag(1, 1) == 2.0
>>> # composed_dag: DAG[[int, str], float] = pipe.compose("twinkle", [x, y], z) # optional typing of the returned DAG!
>>> # assert composed_dag(1, 1) == 2.0 # type checked!
Parameters:
Name | Type | Description | Default |
---|---|---|---|
qualname | str | the name of the composed DAG | required |
inputs | Union[ellipsis, Alias, List[Alias]] | the input nodes whose results are provided. Provide ... to specify that you will provide every argument of the original DAG. | required |
outputs | Union[Alias, List[Alias]] | the output nodes that must execute last; these generate the results | required |
is_async | Optional[bool] | if True, the composed DAG will be an AsyncDAG; if False, it will be a DAG. Defaults to the flavor of the original DAG. | None |
**kwargs | Dict[str, Any] | additional arguments to be passed to the DAG's constructor | {} |
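One way to read the compose contract: the composed DAG keeps only the nodes needed to compute the outputs, and stops walking upstream at the inputs, whose results the caller supplies. The sketch below computes that node set on a plain dependency mapping that mirrors pipe from the example (a and b stand for the two unwanted_xn calls). It is an illustration under that assumption, not tawazi's algorithm; composed_nodes is a hypothetical helper.

```python
from typing import Dict, List, Set


def composed_nodes(deps: Dict[str, List[str]], inputs: List[str], outputs: List[str]) -> Set[str]:
    """Nodes that must run to compute `outputs` when `inputs` are supplied.

    deps maps each node id to the ids of the nodes it depends on.
    """
    needed: Set[str] = set()
    stack = list(outputs)
    while stack:
        node = stack.pop()
        # Inputs are fed by the caller, so neither they nor their ancestors run.
        if node in needed or node in inputs:
            continue
        needed.add(node)
        stack.extend(deps.get(node, []))
    return needed


# Mirrors `pipe`: z depends on x and y; a and b are the unwanted_xn calls.
deps = {"a": [], "x": [], "y": [], "z": ["x", "y"], "b": []}
print(composed_nodes(deps, inputs=["x", "y"], outputs=["z"]))  # {'z'}
```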
DAG
dataclass
Bases: BaseDAG[P, RVDAG]
Synchronous implementation of BaseDAG.
__call__(*args, **kwargs)
Execute the DAG scheduler via a similar interface to the function that describes the dependencies.
Note: Currently kwargs are not supported.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
*args | args | arguments to be passed to the call of the DAG | () |
**kwargs | kwargs | keyword arguments to be passed to the call of the DAG | {} |
Returns:
Name | Type | Description |
---|---|---|
RVDAG | RVDAG | return value of the DAG's execution |
Raises:
Type | Description |
---|---|
TawaziUsageError | if kwargs are passed |
executor(target_nodes=None, exclude_nodes=None, root_nodes=None, cache_deps_of=None, cache_in='', from_cache='')
Generates a DAGExecution for the DAG.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
target_nodes | Optional[Sequence[Alias]] | the nodes to execute; nodes that are not needed to compute them are excluded | None |
exclude_nodes | Optional[Sequence[Alias]] | the nodes to exclude from the execution | None |
root_nodes | Optional[Sequence[Alias]] | these nodes and their children will be included in the execution | None |
cache_deps_of | Optional[Sequence[Alias]] | the nodes whose dependencies should be cached | None |
cache_in | str | the path to the file in which to cache results | '' |
from_cache | str | the path to a file of previously cached results to load | '' |
Returns:
Type | Description |
---|---|
DAGExecution[P, RVDAG] | the DAGExecution object associated with the DAG |
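The three selection parameters can be pictured as set operations on the dependency graph. The sketch below is one plausible interpretation, not tawazi's implementation: target_nodes keeps the targets and their ancestors, root_nodes keeps the roots and their descendants, and exclude_nodes is assumed to also drop every node that depends on an excluded one. select_nodes and the plain-string ids are hypothetical, and the caching parameters are not modeled.

```python
from typing import Dict, Iterable, List, Optional, Set


def _reachable(start: Iterable[str], edges: Dict[str, List[str]]) -> Set[str]:
    # Nodes reachable from `start` (inclusive) by following `edges`.
    seen: Set[str] = set()
    stack = list(start)
    while stack:
        node = stack.pop()
        if node not in seen:
            seen.add(node)
            stack.extend(edges.get(node, []))
    return seen


def select_nodes(
    deps: Dict[str, List[str]],
    target_nodes: Optional[List[str]] = None,
    exclude_nodes: Optional[List[str]] = None,
    root_nodes: Optional[List[str]] = None,
) -> Set[str]:
    # Invert `deps` (node -> parents) into node -> children.
    children: Dict[str, List[str]] = {node: [] for node in deps}
    for node, parents in deps.items():
        for parent in parents:
            children.setdefault(parent, []).append(node)

    selected = set(deps)
    if target_nodes is not None:
        selected &= _reachable(target_nodes, deps)  # targets and their ancestors
    if root_nodes is not None:
        selected &= _reachable(root_nodes, children)  # roots and their descendants
    if exclude_nodes is not None:
        selected -= _reachable(exclude_nodes, children)  # excluded nodes and dependents
    return selected


deps = {"a": [], "b": ["a"], "c": ["b"], "d": ["a"]}
print(sorted(select_nodes(deps, target_nodes=["c"])))  # ['a', 'b', 'c']
print(sorted(select_nodes(deps, exclude_nodes=["b"])))  # ['a', 'd']
```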
setup(target_nodes=None, exclude_nodes=None, root_nodes=None)
Run the setup ExecNodes for the DAG.
If target_nodes are provided, only the setup ExecNodes they require are run; otherwise all setup ExecNodes are run.
NOTE: DAG arguments should not be passed to setup ExecNodes.
Only pass in constants or the results of setup ExecNodes.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
target_nodes | Optional[List[XNId]] | The ExecNodes that the user aims to use in the DAG. This might include setup or non-setup ExecNodes. If None is provided, all setup ExecNodes are run. | None |
exclude_nodes | Optional[List[XNId]] | The ExecNodes that the user aims to exclude from the DAG. The user is responsible for ensuring that any overlap between target_nodes and exclude_nodes is consistent. | None |
root_nodes | Optional[List[XNId]] | The ExecNodes that the user aims to select as ancestor nodes. The user is responsible for ensuring that any overlap between target_nodes, exclude_nodes and root_nodes is consistent. | None |
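The setup behavior (run only the setup ExecNodes that the targets need) can be sketched as follows, assuming, as the name suggests, that setup results are computed once and reused by later calls. ToySetupRunner is hypothetical: setup nodes are modeled as zero-argument callables, matching the rule that they receive only constants, and chaining of setup results is omitted.

```python
from typing import Any, Callable, Dict, List, Optional, Set


class ToySetupRunner:
    """Hypothetical model: setup nodes run at most once and keep their results."""

    def __init__(self, setup_funcs: Dict[str, Callable[[], Any]], deps: Dict[str, List[str]]) -> None:
        self.setup_funcs = setup_funcs  # setup node id -> zero-argument callable
        self.deps = deps  # every node id -> ids it depends on
        self.results: Dict[str, Any] = {}
        self.ran: List[str] = []  # order in which setup nodes actually executed

    def _ancestors(self, nodes: List[str]) -> Set[str]:
        seen: Set[str] = set()
        stack = list(nodes)
        while stack:
            node = stack.pop()
            if node not in seen:
                seen.add(node)
                stack.extend(self.deps.get(node, []))
        return seen

    def setup(self, target_nodes: Optional[List[str]] = None) -> None:
        if target_nodes is None:
            wanted = set(self.setup_funcs)  # no targets: every setup node
        else:
            # Only the setup nodes that the targets (transitively) depend on.
            wanted = self._ancestors(target_nodes) & set(self.setup_funcs)
        for node_id in sorted(wanted):
            if node_id not in self.results:  # already set up: reuse the result
                self.results[node_id] = self.setup_funcs[node_id]()
                self.ran.append(node_id)


runner = ToySetupRunner(
    setup_funcs={"model": lambda: "weights", "vocab": lambda: ["hello"]},
    deps={"model": [], "vocab": [], "predict": ["model"]},
)
runner.setup(target_nodes=["predict"])  # runs only "model"
runner.setup()  # "model" is reused; only "vocab" runs now
print(runner.ran)  # ['model', 'vocab']
```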
AsyncDAG
dataclass
Bases: BaseDAG[P, RVDAG]
__call__(*args, **kwargs)
async
Execute the DAG scheduler via a similar interface to the function that describes the dependencies.
Note: Currently kwargs are not supported.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
*args | args | arguments to be passed to the call of the DAG | () |
**kwargs | kwargs | keyword arguments to be passed to the call of the DAG | {} |
Returns:
Name | Type | Description |
---|---|---|
RVDAG | RVDAG | return value of the DAG's execution |
Raises:
Type | Description |
---|---|
TawaziUsageError | if kwargs are passed |
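Since AsyncDAG.__call__ is a coroutine, it is awaited rather than called directly. The toy runner below illustrates the general idea with asyncio: each node waits for its parents, and an asyncio.Semaphore caps how many nodes run at once, playing the role of max_concurrency. run_toy_dag is a hypothetical helper, not tawazi's scheduler.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List


async def run_toy_dag(
    funcs: Dict[str, Callable[..., Awaitable[Any]]],
    deps: Dict[str, List[str]],
    max_concurrency: int = 1,
) -> Dict[str, Any]:
    slots = asyncio.Semaphore(max_concurrency)
    tasks: Dict[str, asyncio.Task] = {}

    async def run(node_id: str) -> Any:
        # Wait for the parents first, then hold a slot only while this node runs.
        parent_results = [await tasks[parent] for parent in deps[node_id]]
        async with slots:
            return await funcs[node_id](*parent_results)

    # No task starts before this loop finishes, so every tasks[...] lookup succeeds.
    for node_id in deps:
        tasks[node_id] = asyncio.create_task(run(node_id))
    return {node_id: await task for node_id, task in tasks.items()}


async def one() -> int:
    return 1


async def double(x: int) -> int:
    await asyncio.sleep(0.01)  # simulate I/O
    return 2 * x


async def add(a: int, b: int) -> int:
    return a + b


funcs = {"one": one, "d1": double, "d2": double, "sum": add}
deps = {"one": [], "d1": ["one"], "d2": ["one"], "sum": ["d1", "d2"]}
results = asyncio.run(run_toy_dag(funcs, deps, max_concurrency=2))
print(results["sum"])  # 4
```

Acquiring the semaphore only after the parents have finished keeps a waiting node from occupying a slot, which avoids a deadlock when max_concurrency is small.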
executor(target_nodes=None, exclude_nodes=None, root_nodes=None, cache_deps_of=None, cache_in='', from_cache='')
Generates an AsyncDAGExecution for the current AsyncDAG.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
target_nodes | Optional[Sequence[Alias]] | the nodes to execute; nodes that are not needed to compute them are excluded | None |
exclude_nodes | Optional[Sequence[Alias]] | the nodes to exclude from the execution | None |
root_nodes | Optional[Sequence[Alias]] | these nodes and their children will be included in the execution | None |
cache_deps_of | Optional[Sequence[Alias]] | the nodes whose dependencies should be cached | None |
cache_in | str | the path to the file in which to cache results | '' |
from_cache | str | the path to a file of previously cached results to load | '' |
Returns:
Type | Description |
---|---|
AsyncDAGExecution[P, RVDAG] | the AsyncDAGExecution object associated with the AsyncDAG |
setup(target_nodes=None, exclude_nodes=None, root_nodes=None)
async
Run the setup ExecNodes for the DAG.
If target_nodes are provided, only the setup ExecNodes they require are run; otherwise all setup ExecNodes are run.
NOTE: DAG arguments should not be passed to setup ExecNodes.
Only pass in constants or the results of setup ExecNodes.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
target_nodes | Optional[List[XNId]] | The ExecNodes that the user aims to use in the DAG. This might include setup or non-setup ExecNodes. If None is provided, all setup ExecNodes are run. | None |
exclude_nodes | Optional[List[XNId]] | The ExecNodes that the user aims to exclude from the DAG. The user is responsible for ensuring that any overlap between target_nodes and exclude_nodes is consistent. | None |
root_nodes | Optional[List[XNId]] | The ExecNodes that the user aims to select as ancestor nodes. The user is responsible for ensuring that any overlap between target_nodes, exclude_nodes and root_nodes is consistent. | None |