
decorators

_decorators

Decorators of Tawazi.

Use the decorators @dag and @xn to create the Tawazi objects DAG and ExecNode.

xn(func=None, *, priority=0, is_sequential=cfg.TAWAZI_IS_SEQUENTIAL, debug=False, tag=None, setup=False, unpack_to=None, resource=cfg.TAWAZI_DEFAULT_RESOURCE)

Decorate a normal function to make it an ExecNode.

When the decorated function is called inside a DAG, you are actually calling an ExecNode. This way we can record the dependencies in order to build the actual DAG. Please check the example in the README for a guide to the usage.
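The recording idea described above can be illustrated with a toy model. This is a minimal sketch, not Tawazi's implementation: `toy_xn`, `Placeholder`, and `declare` are hypothetical names invented for the illustration. Calling a decorated function during declaration does not run it; the call only records which earlier nodes its arguments came from.

```python
from typing import Any, Callable, Dict, List, Tuple

# Toy registry: (node name, names of the nodes it depends on), in call order.
_recorded: List[Tuple[str, List[str]]] = []


class Placeholder:
    """Stands in for a node's future result while the graph is being declared."""

    def __init__(self, name: str) -> None:
        self.name = name


def toy_xn(func: Callable[..., Any]) -> Callable[..., Placeholder]:
    """Hypothetical stand-in for @xn: the wrapper records the call, it does not execute."""

    def wrapper(*args: Any) -> Placeholder:
        # Any Placeholder argument is the output of a previously declared node.
        deps = [a.name for a in args if isinstance(a, Placeholder)]
        _recorded.append((func.__name__, deps))
        return Placeholder(func.__name__)

    return wrapper


@toy_xn
def load() -> int:
    return 1


@toy_xn
def double(x: int) -> int:
    return 2 * x


def declare() -> Dict[str, List[str]]:
    """Run the declaration once and return the recorded dependency graph."""
    _recorded.clear()
    double(load())
    return dict(_recorded)


graph = declare()
print(graph)  # {'load': [], 'double': ['load']}
```

A real scheduler would then execute the recorded nodes in dependency order; the sketch stops at the recording step.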

Parameters:

Name Type Description Default
func Callable[P, RVXN]

a Callable that will be executed in the DAG

None
priority int

priority of the execution with respect to other ExecNodes

0
is_sequential bool

whether this ExecNode must run alone rather than alongside others. If True, all other ExecNodes currently running must finish before this one starts executing.

TAWAZI_IS_SEQUENTIAL
debug bool

if True, will execute only when debug mode is active. A debug ExecNode will run if its inputs exist, regardless of subgraph choice.

False
tag Optional[TagOrTags]

a str or Tuple[str] used to tag this ExecNode. If a Tuple[str] is given, every value of the tuple is used as a tag. Note that multiple ExecNodes can have the same tag.

None
setup bool

if True, this ExecNode is executed only once during the lifetime of a DAG instance. Setup ExecNodes are meant for loading heavy data (for example, large ML models or datasets) once inside the execution pipeline; afterwards their results are reused as if they were cached. You can run all or a subset of the setup nodes by invoking the DAG.setup method. NOTE: setup nodes are currently not thread-safe because they are shared between all threads. If you execute the same pipeline in multiple threads during the setup phase, the behavior is undefined. It is best to invoke the DAG.setup method before using the DAG in a multithreaded environment. This problem will be resolved in the future.

False
unpack_to Optional[int]

if not None, the number of values this ExecNode's result is unpacked into; the function must return a result that unpacks into exactly that many values.

None
resource str

the resource to use to execute this ExecNode. Defaults to "thread".

TAWAZI_DEFAULT_RESOURCE
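The run-once behaviour described for `setup`, and the advice to warm up before going multithreaded, can be sketched with the standard library alone. This is an illustration under assumptions, not Tawazi's code: `RunOnce` and `load_model` are hypothetical names, and the lock shown here is a guarantee that, per the note above, Tawazi's setup nodes do not currently provide.

```python
import threading
from typing import Callable, Generic, Optional, TypeVar

T = TypeVar("T")


class RunOnce(Generic[T]):
    """Hypothetical helper: compute a value on first use, then reuse it."""

    def __init__(self, loader: Callable[[], T]) -> None:
        self._loader = loader
        self._lock = threading.Lock()
        self._done = False
        self._value: Optional[T] = None
        self.calls = 0  # how many times the loader actually ran

    def get(self) -> T:
        # The lock makes concurrent first calls safe in this toy model.
        with self._lock:
            if not self._done:
                self._value = self._loader()
                self.calls += 1
                self._done = True
        return self._value  # type: ignore[return-value]


def load_model() -> dict:
    # Placeholder for an expensive load such as reading model weights.
    return {"weights": [0.1, 0.2, 0.3]}


model = RunOnce(load_model)
model.get()  # warm up once, before any worker thread starts

threads = [threading.Thread(target=model.get) for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(model.calls)  # 1
```

Warming up before the threads start means every worker only reads the cached value, which is the same reason the parameter description recommends calling DAG.setup first.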

Returns:

Name Type Description
LazyExecNode Union[Callable[[Callable[P, RVXN]], LazyExecNode[P, RVXN]], LazyExecNode[P, RVXN]]

The decorated function wrapped in an ExecNode.

Raises:

Type Description
TypeError

If the decorated function passed is not a Callable.
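The signature of xn (an optional positional `func` followed by keyword-only options) and its Union return type follow a common Python pattern: the same decorator works bare and with arguments. The sketch below shows that pattern in isolation; `toy_decorator` is a hypothetical name, and the body is an assumption about the general technique, not a copy of Tawazi's code.

```python
import functools
from typing import Any, Callable, Optional


def toy_decorator(func: Optional[Callable[..., Any]] = None, *, priority: int = 0) -> Any:
    """Hypothetical decorator usable as @toy_decorator or @toy_decorator(priority=...)."""

    def wrap(f: Any) -> Callable[..., Any]:
        if not callable(f):
            raise TypeError(f"{f!r} is not a Callable")

        @functools.wraps(f)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            return f(*args, **kwargs)

        wrapper.priority = priority  # type: ignore[attr-defined]
        return wrapper

    if func is None:
        # Called with keyword options only: return the real decorator.
        return wrap
    # Called bare: func is the decorated function itself.
    return wrap(func)


@toy_decorator
def a() -> str:
    return "A"


@toy_decorator(priority=5)
def b() -> str:
    return "B"


print(a(), a.priority, b(), b.priority)  # A 0 B 5

try:
    toy_decorator(42)  # not a callable
except TypeError as err:
    print("TypeError:", err)
```

The bare form returns the wrapped function directly, while the parenthesised form returns a function that still expects the callable, which is why the documented return type is a Union of the two.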

dag(declare_dag_function=None, *, max_concurrency=1, is_async=False)

Transform the declared ExecNodes into a DAG that can be executed by Tawazi's scheduler.

The same DAG can be executed multiple times. Note: dag is thread-safe because it uses an internal lock. If you need DAGs in multiple threads, it is best to construct your DAG once and then reuse it as often as you like rather than constructing many of them. Please check the example in the README for a guide to the usage.

Parameters:

Name Type Description Default
declare_dag_function Optional[Callable[P, RVDAG]]

a function that describes the execution of the DAG. This function should contain only calls to ExecNodes and data exchange between them (i.e., you cannot use a normal Python function inside it unless it is decorated with @xn). However, you can use some simple Python code to generate constants. These constants are computed only once, during the DAG declaration.

None
max_concurrency int

the maximum number of threads that may execute ExecNodes concurrently.

1
is_async bool

if True, the returned object will be an AsyncDAG instead of a DAG.

False
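To make the effect of `max_concurrency` concrete, here is a toy scheduler built only on the standard library (`graphlib` needs Python 3.9 or later). It is a sketch under assumptions and does not reflect how Tawazi's scheduler is written: `run_graph`, `funcs`, and `deps` are hypothetical names. Each node runs once its dependencies have finished, and the thread pool size caps how many nodes run at the same time.

```python
from concurrent.futures import FIRST_COMPLETED, Future, ThreadPoolExecutor, wait
from graphlib import TopologicalSorter
from typing import Any, Callable, Dict, List


def run_graph(
    funcs: Dict[str, Callable[..., Any]],
    deps: Dict[str, List[str]],
    max_concurrency: int = 1,
) -> Dict[str, Any]:
    """Run each node after its dependencies, with at most max_concurrency at once."""
    sorter = TopologicalSorter(deps)
    sorter.prepare()  # raises graphlib.CycleError if the graph has a cycle
    results: Dict[str, Any] = {}
    running: Dict[Future, str] = {}

    with ThreadPoolExecutor(max_workers=max_concurrency) as pool:
        while sorter.is_active():
            # Submit every node whose dependencies have all finished.
            for name in sorter.get_ready():
                args = [results[d] for d in deps.get(name, [])]
                running[pool.submit(funcs[name], *args)] = name
            # Block until at least one running node completes.
            finished, _ = wait(list(running), return_when=FIRST_COMPLETED)
            for fut in finished:
                name = running.pop(fut)
                results[name] = fut.result()
                sorter.done(name)
    return results


funcs = {
    "a": lambda: "A",
    "b": lambda: "B",
    "c": lambda a, b: f"{a} + {b} = C",
}
deps = {"a": [], "b": [], "c": ["a", "b"]}

results = run_graph(funcs, deps, max_concurrency=2)
print(results["c"])  # A + B = C
```

With `max_concurrency=1` the nodes run one at a time and produce the same results; a larger value only changes how many independent nodes (here `a` and `b`) may overlap.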

Returns:

Type Description
Union[DAG[P, RVDAG], AsyncDAG[P, RVDAG], Callable[[Callable[P, RVDAG]], Union[DAG[P, RVDAG], AsyncDAG[P, RVDAG]]]]

a DAG instance that can be used just like a normal Python function. It will be executed by Tawazi's scheduler.

Raises:

Type Description
TypeError

If the decorated object is not a Callable.