DAG

BaseDAG dataclass

Bases: Generic[P, RVDAG]

Data structure containing ExecNodes with interdependencies.

Please do not instantiate this class directly; use the @dag decorator instead.

The ExecNodes can be executed in parallel, subject to the following restrictions:

* a limited number of threads (max_concurrency);
* the parallelization constraint of each ExecNode (is_sequential attribute);
* the priority of each ExecNode (priority attribute);
* the specific resource of each ExecNode (resource attribute).

This class has two flavors:

* DAG: for synchronous execution;
* AsyncDAG: for asynchronous execution.

Parameters:

* exec_nodes (StrictDict[Identifier, ExecNode], required): all the ExecNodes.
* input_uxns (List[UsageExecNode], required): all the input UsageExecNodes.
* return_uxns (ReturnUXNsType, required): the return UsageExecNodes, which can be None, a single value, a tuple, a list or a dict.
* max_concurrency (int, default 1): the maximal number of threads running in parallel.
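The restrictions above can be pictured with a small model built only on the standard library. This is a hypothetical sketch, not tawazi's scheduler: Node, run and their fields are illustrative names, a higher priority value is assumed to mean "start earlier", and resources are not modelled. At most max_concurrency nodes run at once, ready nodes start in priority order, and an is_sequential node only runs when nothing else is running.

```python
from concurrent.futures import FIRST_COMPLETED, Future, ThreadPoolExecutor, wait
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List


@dataclass
class Node:
    """Hypothetical stand-in for an ExecNode (not tawazi's class)."""

    id: str
    func: Callable[..., Any]
    deps: List[str] = field(default_factory=list)  # ids whose results are passed as args
    priority: int = 0
    is_sequential: bool = False


def run(nodes: Dict[str, Node], max_concurrency: int = 1) -> Dict[str, Any]:
    results: Dict[str, Any] = {}
    running: Dict[Future, Node] = {}
    pending = dict(nodes)
    with ThreadPoolExecutor(max_workers=max_concurrency) as pool:
        while pending or running:
            # nodes whose dependencies have all finished, highest priority first
            ready = sorted(
                (n for n in pending.values() if all(d in results for d in n.deps)),
                key=lambda n: -n.priority,
            )
            for node in ready:
                if len(running) >= max_concurrency:
                    break  # thread limit reached
                if any(r.is_sequential for r in running.values()):
                    break  # nothing may start while a sequential node runs
                if node.is_sequential and running:
                    break  # a sequential node waits until it can run alone
                del pending[node.id]
                args = [results[d] for d in node.deps]
                running[pool.submit(node.func, *args)] = node
            if not running:
                raise ValueError("unsatisfiable dependencies (cycle or unknown id)")
            done, _ = wait(running, return_when=FIRST_COMPLETED)
            for fut in done:
                results[running.pop(fut).id] = fut.result()
    return results
```

Starting ready nodes strictly in priority order means a waiting sequential node also holds back lower-priority nodes behind it; a real scheduler may choose differently.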

config_from_dict(config)

Allows reconfiguring the parameters of the nodes from a dictionary.

Parameters:

* config (Dict[str, Any], required): the dictionary containing the config, for example {"nodes": {"a": {"priority": 3, "is_sequential": True}}, "max_concurrency": 3}.

Raises:

* ValueError: if two nodes are configured by the provided config (which is ambiguous).
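The config dictionary mirrors the ExecNode attributes listed at the top of this page. As an illustration only, the hypothetical apply_config below shows how such a dictionary maps onto per-node settings; it accepts just the two keys from the example, and its validation (unknown node ids or keys raise ValueError) is an assumption, not tawazi's documented behaviour.

```python
from typing import Any, Dict

# Only the node attributes used in the example config are accepted in this sketch.
ALLOWED_NODE_KEYS = {"priority", "is_sequential"}


def apply_config(
    settings: Dict[str, Dict[str, Any]], config: Dict[str, Any], max_concurrency: int = 1
) -> int:
    """Update per-node settings in place and return the resulting max_concurrency."""
    for node_id, overrides in config.get("nodes", {}).items():
        if node_id not in settings:
            raise ValueError(f"unknown node {node_id!r}")
        unknown = set(overrides) - ALLOWED_NODE_KEYS
        if unknown:
            raise ValueError(f"unsupported keys for {node_id!r}: {sorted(unknown)}")
        settings[node_id].update(overrides)
    # keep the current value when the config does not mention max_concurrency
    return config.get("max_concurrency", max_concurrency)


settings = {
    "a": {"priority": 0, "is_sequential": False},
    "b": {"priority": 0, "is_sequential": False},
}
new_max = apply_config(
    settings, {"nodes": {"a": {"priority": 3, "is_sequential": True}}, "max_concurrency": 3}
)
```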

config_from_json(config_path)

Allows reconfiguring the parameters of the nodes from a JSON file.

Parameters:

* config_path (str, required): the path to the JSON file.
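Assuming the JSON file holds the same structure that config_from_dict accepts, such a file can be produced and checked with the standard library alone. The file location comes from tempfile; pipe in the final comment stands for any DAG built with @dag.

```python
import json
import tempfile

config = {"nodes": {"a": {"priority": 3, "is_sequential": True}}, "max_concurrency": 3}

# Write the dictionary form of the config to a JSON file.
with tempfile.NamedTemporaryFile("w", suffix=".json", delete=False) as f:
    json.dump(config, f, indent=2)
    config_path = f.name

# JSON `true` is read back as Python True, so the file round-trips to the same dict.
with open(config_path) as f:
    loaded = json.load(f)

# With a real DAG you would then call:
# pipe.config_from_json(config_path)
```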

config_from_yaml(config_path)

Allows reconfiguring the parameters of the nodes from a YAML file.

Parameters:

* config_path (str, required): the path to the YAML file.

get_node_by_id(node_id)

Get the ExecNode with the given id.

The returned ExecNode is not modified by any execution!

This means that you cannot get the result of its execution via DAG.get_node_by_id(<id>).result. To get it, create a DAGExecution (for example via DAG.executor()), run it, and then read DAGExecution.get_node_by_id(<id>).result, which contains the result.

Parameters:

* node_id (Identifier, required): id of the ExecNode.

Returns:

* ExecNode: the corresponding ExecNode.
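The difference between the DAG and a DAGExecution can be pictured with a simplified, hypothetical model (Template, Execution and Node are illustrative names, not tawazi's internals). It assumes the DAG acts as a template and each execution works on its own copy of the nodes, so results land on the copy and never on the template.

```python
import copy
from dataclasses import dataclass
from typing import Any, Callable, Dict, Optional


@dataclass
class Node:  # hypothetical stand-in for an ExecNode
    id: str
    func: Callable[[], Any]
    result: Optional[Any] = None


class Template:  # plays the role of the DAG
    def __init__(self, nodes: Dict[str, Node]) -> None:
        self.nodes = nodes

    def get_node_by_id(self, node_id: str) -> Node:
        return self.nodes[node_id]

    def executor(self) -> "Execution":
        # each execution gets its own deep copy of the nodes
        return Execution(copy.deepcopy(self.nodes))


class Execution:  # plays the role of the DAGExecution
    def __init__(self, nodes: Dict[str, Node]) -> None:
        self.nodes = nodes

    def run(self) -> None:
        for node in self.nodes.values():
            node.result = node.func()

    def get_node_by_id(self, node_id: str) -> Node:
        return self.nodes[node_id]


template = Template({"a": Node("a", lambda: 42)})
execution = template.executor()
execution.run()
```

After running, execution.get_node_by_id("a").result holds 42 while template.get_node_by_id("a").result is still None.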

get_nodes_by_tag(tag)

Get the ExecNodes with the given tag.

The returned ExecNodes are not modified by any execution!

This means that you cannot get the results of their execution from the ExecNodes returned by DAG.get_nodes_by_tag(<tag>). To get them, create a DAGExecution (for example via DAG.executor()), run it, and then read .result on each ExecNode returned by DAGExecution.get_nodes_by_tag(<tag>).

Parameters:

* tag (Any, required): tag of the ExecNodes.

Returns:

* List[ExecNode]: the corresponding ExecNodes.
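Since get_nodes_by_tag returns a list, results have to be read node by node. The hypothetical model below (string tags, a set of tags per node) only illustrates that shape; it is not tawazi's implementation, where the tag type is Any.

```python
from dataclasses import dataclass, field
from typing import Any, List, Optional, Set


@dataclass
class Node:  # hypothetical stand-in for an ExecNode
    id: str
    tags: Set[str] = field(default_factory=set)
    result: Optional[Any] = None


def get_nodes_by_tag(nodes: List[Node], tag: str) -> List[Node]:
    # several nodes may share a tag, hence a list is returned
    return [n for n in nodes if tag in n.tags]


nodes = [Node("load", {"io"}), Node("save", {"io"}), Node("train", {"compute"})]
io_nodes = get_nodes_by_tag(nodes, "io")
# `.result` belongs to each node, not to the returned list
io_results = [n.result for n in io_nodes]
```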

compose(qualname, inputs, outputs, is_async=None, **kwargs)

Compose a new DAG using input and output ExecNodes (experimental).

All provided Aliases must point to unique ExecNodes; otherwise a ValueError is raised. The user is responsible for correctly specifying the input and output signature of the DAG:

* The inputs can be specified as a single Alias or a Sequence of Aliases.
* The outputs can be specified as a single Alias (a single value is returned) or a Sequence of Aliases, in which case a tuple of the values is returned. If outputs is specified as [], () is returned.

The syntax is the following:

>>> from tawazi import dag, xn, DAG
>>> from typing import Tuple, Any
>>> @xn
... def unwanted_xn() -> int: return 42
>>> @xn
... def x(v: Any) -> int: return int(v)
>>> @xn
... def y(v: Any) -> str: return str(v)
>>> @xn
... def z(x: int, y: str) -> float: return float(x) + float(y)
>>> @dag
... def pipe() -> Tuple[int, float, int]:
...     a = unwanted_xn()
...     res = z(x(1), y(1))
...     b = unwanted_xn()
...     return a, res, b
>>> composed_dag = pipe.compose("twinkle", [x, y], z)
>>> assert composed_dag(1, 1) == 2.0
>>> # composed_dag: DAG[[int, str], float] = pipe.compose("twinkle", [x, y], z)  # optional typing of the returned DAG!
>>> # assert composed_dag(1, 1) == 2.0  # type checked!

Parameters:

* qualname (str, required): the name of the composed DAG.
* inputs (ellipsis | Alias | List[Alias], required): the input nodes whose results are provided. Pass ... to specify that you will provide every argument of the original DAG.
* outputs (Alias | List[Alias], required): the output nodes, which must execute last and generate the results.
* is_async (bool | None, default None): if True, the composed DAG is an AsyncDAG; if False, it is a DAG. Defaults to the same flavor as the original DAG.
* **kwargs (Dict[str, Any], default {}): additional arguments passed to the DAG's constructor.
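Conceptually, compose keeps only the nodes needed to compute the outputs once the inputs' results are supplied. A minimal sketch of that selection on a plain dependency dictionary (an illustration of the idea, not tawazi's code; the node ids are made up) applied to pipe from the example:

```python
from typing import Dict, Iterable, List, Set


def nodes_between(
    deps: Dict[str, List[str]], inputs: Iterable[str], outputs: Iterable[str]
) -> Set[str]:
    """Nodes needed to compute `outputs` when the results of `inputs` are supplied."""
    provided = set(inputs)
    needed: Set[str] = set()
    stack = list(outputs)
    while stack:
        node = stack.pop()
        if node in provided or node in needed:
            continue  # supplied by the caller, or already visited
        needed.add(node)
        stack.extend(deps[node])  # walk up to the node's dependencies
    return needed


# Dependencies of `pipe` from the example above (node ids are illustrative).
pipe_deps = {"unwanted_xn_1": [], "x": [], "y": [], "z": ["x", "y"], "unwanted_xn_2": []}
composed = nodes_between(pipe_deps, inputs=["x", "y"], outputs=["z"])
```

Neither call of unwanted_xn is an ancestor of z, so both drop out, which is consistent with composed_dag(1, 1) returning only z's value.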

DAG dataclass

Bases: BaseDAG[P, RVDAG]

Synchronous implementation of the BaseDAG.

__call__(*args, **kwargs)

Execute the DAG scheduler through an interface similar to that of the function describing the dependencies.

Note: Currently kwargs are not supported.

Parameters:

* *args (args, default ()): arguments to be passed to the call of the DAG.
* **kwargs (kwargs, default {}): keyword arguments to be passed to the call of the DAG.

Returns:

* RVDAG: the return value of the DAG's execution.

Raises:

* TawaziUsageError: if kwargs are passed.

executor(target_nodes=None, exclude_nodes=None, root_nodes=None, cache_deps_of=None, cache_in='', from_cache='')

Generates a DAGExecution for the DAG.

Parameters:

* target_nodes (Optional[Sequence[Alias]], default None): the nodes to execute; every node that is not needed to compute them is excluded.
* exclude_nodes (Optional[Sequence[Alias]], default None): the nodes to exclude from the execution.
* root_nodes (Optional[Sequence[Alias]], default None): these nodes and their children are included in the execution.
* cache_deps_of (Optional[Sequence[Alias]], default None): the nodes whose dependencies should be cached.
* cache_in (str, default ''): the path to the file in which to store the cache.
* from_cache (str, default ''): the path to the cache file from which to load results.

Returns:

* DAGExecution[P, RVDAG]: the DAGExecution object associated with the DAG.
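One plausible reading of how target_nodes, root_nodes and exclude_nodes combine is sketched below on a plain dependency dictionary. It is hypothetical: select_nodes is not a tawazi function, and the rule that excluding a node also drops every node depending on it is an assumption to check against tawazi's behaviour.

```python
from typing import Dict, Iterable, List, Optional, Set


def _closure(start: Iterable[str], edges: Dict[str, List[str]]) -> Set[str]:
    # all nodes reachable from `start` (inclusive) by following `edges`
    seen: Set[str] = set()
    stack = list(start)
    while stack:
        node = stack.pop()
        if node not in seen:
            seen.add(node)
            stack.extend(edges.get(node, []))
    return seen


def select_nodes(
    deps: Dict[str, List[str]],
    target_nodes: Optional[List[str]] = None,
    exclude_nodes: Optional[List[str]] = None,
    root_nodes: Optional[List[str]] = None,
) -> Set[str]:
    children: Dict[str, List[str]] = {n: [] for n in deps}
    for node, parents in deps.items():
        for parent in parents:
            children[parent].append(node)

    selected = set(deps)
    if target_nodes is not None:
        selected &= _closure(target_nodes, deps)  # targets and what they depend on
    if root_nodes is not None:
        selected &= _closure(root_nodes, children)  # roots and their descendants
    if exclude_nodes is not None:
        selected -= _closure(exclude_nodes, children)  # assumption: dependants go too
    return selected


deps = {"a": [], "b": ["a"], "c": ["b"], "d": ["a"]}
```

Under these assumptions, targeting c while rooting at b keeps only b and c.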

setup(target_nodes=None, exclude_nodes=None, root_nodes=None)

Run the setup ExecNodes for the DAG.

If target_nodes are provided, only the setup ExecNodes they need are run; otherwise, all setup ExecNodes are run. NOTE: DAG arguments should not be passed to setup ExecNodes; only pass in constants or the results of setup ExecNodes.

Parameters:

* target_nodes (Optional[List[XNId]], default None): the ExecNodes that the user aims to use in the DAG, which may include setup and non-setup ExecNodes. If None, all setup ExecNodes are run.
* exclude_nodes (Optional[List[XNId]], default None): the ExecNodes that the user aims to exclude from the DAG. The user is responsible for ensuring that the overlap between target_nodes and exclude_nodes is consistent.
* root_nodes (Optional[List[XNId]], default None): the ExecNodes that the user aims to select as ancestor nodes. The user is responsible for ensuring that the overlap between target_nodes, exclude_nodes and root_nodes is consistent.
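Setup ExecNodes appear intended for expensive, argument-independent work (loading a model, opening a connection) whose result is computed once and then reused, which is also why DAG arguments must not feed them: a value computed once cannot depend on per-call arguments. A minimal model of that behaviour, assuming a setup result is cached after its first run and computed lazily if setup() was not called ahead of time (SetupCache is hypothetical, not part of tawazi):

```python
from typing import Any, Callable, Dict, List, Optional


class SetupCache:
    """Hypothetical model: each setup function runs at most once and its result is reused."""

    def __init__(self, setups: Dict[str, Callable[[], Any]]) -> None:
        self._setups = setups
        self._results: Dict[str, Any] = {}
        self.calls: Dict[str, int] = {name: 0 for name in setups}

    def setup(self, target_nodes: Optional[List[str]] = None) -> None:
        # run the requested setup functions (all of them when None), skipping finished ones
        names = list(self._setups) if target_nodes is None else target_nodes
        for name in names:
            if name not in self._results:
                self.calls[name] += 1
                self._results[name] = self._setups[name]()

    def get(self, name: str) -> Any:
        self.setup([name])  # runs the setup lazily if it was not run ahead of time
        return self._results[name]


cache = SetupCache({"model": lambda: "weights", "db": lambda: "connection"})
cache.setup(["model"])  # warm up only what is needed, e.g. before serving requests
```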

AsyncDAG dataclass

Bases: BaseDAG[P, RVDAG]

Asynchronous implementation of the BaseDAG.

__call__(*args, **kwargs) async

Execute the DAG scheduler through an interface similar to that of the function describing the dependencies.

Note: Currently kwargs are not supported.

Parameters:

* *args (args, default ()): arguments to be passed to the call of the DAG.
* **kwargs (kwargs, default {}): keyword arguments to be passed to the call of the DAG.

Returns:

* RVDAG: the return value of the DAG's execution.

Raises:

* TawaziUsageError: if kwargs are passed.
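Because AsyncDAG.__call__ is a coroutine function, its result must be awaited: await my_async_dag(1, 2) inside a coroutine, or asyncio.run(my_async_dag(1, 2)) from synchronous code, where my_async_dag is a placeholder for an AsyncDAG instance. The self-contained model below shows the same idea with plain asyncio tasks. It is a hypothetical sketch, not tawazi's scheduler: each node awaits its dependencies, then runs under a semaphore that caps concurrency at max_concurrency.

```python
import asyncio
from typing import Awaitable, Callable, Dict, List


async def run_async(
    funcs: Dict[str, Callable[..., Awaitable[int]]],
    deps: Dict[str, List[str]],
    max_concurrency: int = 1,
) -> Dict[str, int]:
    """Hypothetical model: nodes await their dependencies, a semaphore caps concurrency."""
    sem = asyncio.Semaphore(max_concurrency)
    tasks: Dict[str, "asyncio.Task[int]"] = {}

    async def run_node(name: str) -> int:
        args = [await tasks[dep] for dep in deps[name]]  # wait for dependencies first
        async with sem:  # at most max_concurrency node bodies run at once
            return await funcs[name](*args)

    # all tasks exist before any of them starts running
    for name in funcs:
        tasks[name] = asyncio.create_task(run_node(name))
    return {name: await task for name, task in tasks.items()}


async def add_one(*args: int) -> int:
    await asyncio.sleep(0)  # yield to the event loop, as real I/O would
    return 1 + sum(args)


results = asyncio.run(
    run_async(
        {"a": add_one, "b": add_one, "c": add_one},
        {"a": [], "b": ["a"], "c": ["a", "b"]},
        max_concurrency=2,
    )
)
```

The semaphore is acquired only after a node's dependencies are resolved, so a waiting node never holds a slot that a dependency needs.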

executor(target_nodes=None, exclude_nodes=None, root_nodes=None, cache_deps_of=None, cache_in='', from_cache='')

Generates an AsyncDAGExecution for the current AsyncDAG.

Parameters:

* target_nodes (Optional[Sequence[Alias]], default None): the nodes to execute; every node that is not needed to compute them is excluded.
* exclude_nodes (Optional[Sequence[Alias]], default None): the nodes to exclude from the execution.
* root_nodes (Optional[Sequence[Alias]], default None): these nodes and their children are included in the execution.
* cache_deps_of (Optional[Sequence[Alias]], default None): the nodes whose dependencies should be cached.
* cache_in (str, default ''): the path to the file in which to store the cache.
* from_cache (str, default ''): the path to the cache file from which to load results.

Returns:

* AsyncDAGExecution[P, RVDAG]: the AsyncDAGExecution object associated with the AsyncDAG.

setup(target_nodes=None, exclude_nodes=None, root_nodes=None) async

Run the setup ExecNodes for the DAG.

If target_nodes are provided, only the setup ExecNodes they need are run; otherwise, all setup ExecNodes are run. NOTE: DAG arguments should not be passed to setup ExecNodes; only pass in constants or the results of setup ExecNodes.

Parameters:

* target_nodes (Optional[List[XNId]], default None): the ExecNodes that the user aims to use in the DAG, which may include setup and non-setup ExecNodes. If None, all setup ExecNodes are run.
* exclude_nodes (Optional[List[XNId]], default None): the ExecNodes that the user aims to exclude from the DAG. The user is responsible for ensuring that the overlap between target_nodes and exclude_nodes is consistent.
* root_nodes (Optional[List[XNId]], default None): the ExecNodes that the user aims to select as ancestor nodes. The user is responsible for ensuring that the overlap between target_nodes, exclude_nodes and root_nodes is consistent.