Skip to content


Bases: Generic[P, RVDAG]

A disposable callable instance of a DAG.

It holds information about the last execution. Hence it is not threadsafe. It might be reusable; however, reusing a DAGExecution instance is not recommended.

Source code in tawazi/_dag/
class DAGExecution(Generic[P, RVDAG]):
    """A disposable callable instance of a DAG.

    It holds information about the last execution. Hence it is not threadsafe.
    It might be reusable; however, reusing a DAGExecution instance is not recommended.
    """

    def __init__(
        self,
        dag: DAG[P, RVDAG],
        *,
        target_nodes: Optional[Sequence[Alias]] = None,
        exclude_nodes: Optional[Sequence[Alias]] = None,
        cache_deps_of: Optional[Sequence[Alias]] = None,
        cache_in: str = "",
        from_cache: str = "",
        call_id: Optional[str] = None,
    ) -> None:
        """Constructor.

        Args:
            dag (DAG): The attached DAG.
            target_nodes (Optional[List[Alias]]): The leaf ExecNodes to execute.
                If None, all ExecNodes are executed.
                Defaults to None.
            exclude_nodes (Optional[List[Alias]]): The leaf ExecNodes to exclude.
                If None, no ExecNodes are excluded.
                Defaults to None.
            cache_deps_of (Optional[List[Alias]]): cache all the dependencies of these nodes
                (the results of these nodes themselves are not cached).
                This option can not be used together with target_nodes nor exclude_nodes.
                Defaults to None.
            cache_in (str):
                the path to the file where the execution should be cached.
                The path should end in `.pkl`.
                Will skip caching if `cache_in` is Falsy.
                Will raise PickleError if any of the values passed around in the DAG is not pickleable.
                Defaults to "".
            from_cache (str):
                the path to the file where the execution should be loaded from.
                The path should end in `.pkl`.
                Will skip loading from cache if `from_cache` is Falsy.
                Defaults to "".
            call_id (Optional[str]): identification of the current execution.
                This will be inserted into thread_name_prefix while executing the threadPool.
                It will be used in the future for identifying the execution inside Processes etc.

        Raises:
            ValueError: if `cache_in` or `from_cache` is non-empty and does not end in `.pkl`,
                or if `cache_deps_of` is combined with `target_nodes` or `exclude_nodes`.
        """
        # todo: Maybe we can support .dill to extend the possibilities of the exchanged values, but this won't solve the whole problem

        self.dag = dag
        # target_nodes / exclude_nodes must be set before cache_deps_of:
        # the cache_deps_of setter validates against them.
        self.target_nodes = target_nodes
        self.exclude_nodes = exclude_nodes
        self.cache_deps_of = cache_deps_of
        # NOTE: from_cache and cache_in are independent. When both are set, the results are
        #  loaded from `from_cache` and the new results are written again to `cache_in`.
        self.cache_in = cache_in
        self.from_cache = from_cache
        self.call_id = call_id

        # state of the last execution, populated by __call__
        self.xn_dict: Dict[Identifier, ExecNode] = {}
        self.results: Dict[Identifier, Any] = {}

        # build self.graph (and self.scheduled_nodes); __call__ executes self.graph
        self._construct_dynamic_attributes()

        self.executed = False

    def _construct_dynamic_attributes(self) -> None:
        """Build the graph to execute and the list of nodes scheduled in it."""
        self.graph = self._make_graph()
        self.scheduled_nodes = self.graph.nodes

    def _make_graph(self) -> nx.DiGraph:
        """Make the graph of the execution.

        This method is called only once per instance.

        Returns:
            nx.DiGraph: the subgraph built from `cache_deps_of` when it is set,
                otherwise the subgraph built from `target_nodes` and `exclude_nodes`.
        """
        if self.cache_deps_of is not None:
            return self.dag._make_subgraph(self.cache_deps_of)

        return self.dag._make_subgraph(self.target_nodes, self.exclude_nodes)

    @property
    def cache_in(self) -> str:
        """The path to the file where the execution should be cached.

        Returns:
            str: The path to the file where the execution should be cached.
        """
        return self._cache_in

    @cache_in.setter
    def cache_in(self, cache_in: str) -> None:
        # an empty string disables caching; any other value must be a .pkl path
        if cache_in and not cache_in.endswith(".pkl"):
            raise ValueError("cache_in should end with.pkl")
        self._cache_in = cache_in

    @property
    def from_cache(self) -> str:
        """Get the file path from which the cached execution should be loaded.

        Returns:
            str: the file path of the cached execution
        """
        return self._from_cache

    @from_cache.setter
    def from_cache(self, from_cache: str) -> None:
        # an empty string disables loading from cache; any other value must be a .pkl path
        if from_cache and not from_cache.endswith(".pkl"):
            raise ValueError("from_cache should end with.pkl")
        self._from_cache = from_cache

    @property
    def cache_deps_of(self) -> Optional[Sequence[Alias]]:
        """Cache all the dependencies of these nodes.

        Returns:
            Optional[List[Alias]]: List of Aliases passed to cache_deps_of while instantiating DAGExecution
        """
        return self._cache_deps_of

    @cache_deps_of.setter
    def cache_deps_of(self, cache_deps_of: Optional[Sequence[Alias]]) -> None:
        if (
            self.target_nodes is not None or self.exclude_nodes is not None
        ) and cache_deps_of is not None:
            raise ValueError(
                "cache_deps_of can not be used together with target_nodes or exclude_nodes"
            )
        self._cache_deps_of = cache_deps_of

    # we need to reimplement the public methods of DAG here in order to have a constant public interface
    # getters
    def get_nodes_by_tag(self, tag: Tag) -> List[ExecNode]:
        """Get all the nodes with the given tag.

        After execution, the nodes are searched among the ExecNodes of the last
        execution; before that, the lookup is delegated to the attached DAG.

        Args:
            tag (Tag): tag of ExecNodes in question

        Returns:
            List[ExecNode]: corresponding ExecNodes
        """
        if self.executed:
            return [ex_n for ex_n in self.xn_dict.values() if ex_n.tag == tag]
        return self.dag.get_nodes_by_tag(tag)

    def get_node_by_id(self, id_: Identifier) -> ExecNode:
        """Get node with the given id.

        After execution, the node is looked up among the ExecNodes of the last
        execution; before that, the lookup is delegated to the attached DAG.

        Args:
            id_ (Identifier): id of the ExecNode

        Returns:
            ExecNode: Corresponding ExecNode

        Raises:
            KeyError: if already executed and `id_` is not among the executed ExecNodes.
        """
        # TODO: ? catch the keyError and
        #   help the user know the id of the ExecNode by pointing to documentation!?
        if self.executed:
            return self.xn_dict[id_]
        return self.dag.get_node_by_id(id_)

    def setup(self) -> None:
        """Does the same thing as DAG.setup. However, the `target_nodes` and `exclude_nodes` are taken from the DAGExecution's initialization."""
        # TODO: handle the case where cache_deps_of is provided instead of target_nodes and exclude_nodes
        #  in which case the deps_of might have a setup node themselves which should not run.
        #  This is an edge case though that is not important to handle at the current moment.
        self.dag.setup(target_nodes=self.target_nodes, exclude_nodes=self.exclude_nodes)

    def __call__(self, *args: P.args, **kwargs: P.kwargs) -> RVDAG:
        """Call the DAG.

        Args:
            *args: positional arguments to pass in to the DAG
            **kwargs: keyword arguments to pass in to the DAG.
                NOTE: they are currently not forwarded to the DAG.

        Returns:
            RVDAG: the return value of the DAG's Execution

        Warns:
            UserWarning: if the DAGExecution has already been executed (reuse is not recommended).
        """
        if self.executed:
            warnings.warn("DAGExecution object's reuse is not recommended.", stacklevel=2)

        # NOTE: *args will be ignored if self.from_cache is set!
        dag = self.dag

        # maybe call_id will be changed to Union[int, str].
        # Keep call_id as Optional[str] for now
        call_id = self.call_id if self.call_id is not None else ""

        # 1. copy the ExecNodes
        call_xn_dict = dag._make_call_xn_dict(*args)
        if self.from_cache:
            # SECURITY: unpickling can execute arbitrary code; only load cache files you trust.
            with open(self.from_cache, "rb") as f:
                cached_results = pickle.load(f)  # noqa: S301
            # set the result for the ExecNode that were previously executed
            # this will make them skip execution inside the scheduler
            for id_, result in cached_results.items():
                call_xn_dict[id_].result = result

        # 2. Execute the scheduler
        self.xn_dict = dag._execute(self.graph, call_xn_dict, call_id)
        # must be an id -> result mapping: it is filtered with .items() and pickled below
        self.results = {id_: xn.result for id_, xn in self.xn_dict.items()}

        # 3. cache in the graph results
        if self.cache_in:
            # NOTE: we are currently only storing the results of the execution,
            #  this means that the configuration of the ExecNodes are lost!
            #  But this is ok since it should not change between executions!
            #  for example, a setup ExecNode should stay a setup ExecNode between caching in the results and reading back the cached results
            #  the same goes for the DAG itself, the behavior when an error is encountered & its concurrency will be controlled via the constructor
            if self.cache_deps_of is not None:
                # the nodes listed in cache_deps_of are left out: only their dependencies are cached
                non_cacheable_ids: Set[Identifier] = set()
                for aliases in self.cache_deps_of:
                    non_cacheable_ids.update(self.dag._alias_to_ids(aliases))

                to_cache_results = {
                    id_: res
                    for id_, res in self.results.items()
                    if id_ not in non_cacheable_ids
                }
            else:
                to_cache_results = self.results

            # the file is opened only once the payload is ready so that a failure while
            # computing it does not leave an empty/truncated cache file behind
            Path(self.cache_in).parent.mkdir(parents=True, exist_ok=True)
            with open(self.cache_in, "wb") as f:
                pickle.dump(
                    to_cache_results, f, protocol=pickle.HIGHEST_PROTOCOL, fix_imports=False
                )

        # TODO: make DAGExecution reusable but do not guarantee ThreadSafety!
        self.executed = True
        # 4. extract the returned value/values
        return dag._get_return_values(self.xn_dict)  # type: ignore[return-value]

__init__(dag, *, target_nodes=None, exclude_nodes=None, cache_deps_of=None, cache_in='', from_cache='', call_id=None)



Name Type Description Default
dag DAG

The attached DAG.

target_nodes Optional[List[Alias]]

The leaf ExecNodes to execute. If None, all ExecNodes are executed. Defaults to None.

exclude_nodes Optional[List[Alias]]

The leaf ExecNodes to exclude. If None, no ExecNodes are excluded. Defaults to None.

cache_deps_of Optional[List[Alias]]

cache all the dependencies of these nodes. This option can not be used together with target_nodes nor exclude_nodes.

cache_in str

the path to the file where the execution should be cached. The path should end in .pkl. Will skip caching if cache_in is Falsy. Will raise PickleError if any of the values passed around in the DAG is not pickleable. Defaults to "".

from_cache str

the path to the file where the execution should be loaded from. The path should end in .pkl. Will skip loading from cache if from_cache is Falsy. Defaults to "".

call_id Optional[str]

identification of the current execution. This will be inserted into thread_name_prefix while executing the threadPool. It will be used in the future for identifying the execution inside Processes etc.

Source code in tawazi/_dag/
def __init__(
    self,
    dag: DAG[P, RVDAG],
    *,
    target_nodes: Optional[Sequence[Alias]] = None,
    exclude_nodes: Optional[Sequence[Alias]] = None,
    cache_deps_of: Optional[Sequence[Alias]] = None,
    cache_in: str = "",
    from_cache: str = "",
    call_id: Optional[str] = None,
) -> None:
    """Constructor.

    Args:
        dag (DAG): The attached DAG.
        target_nodes (Optional[List[Alias]]): The leaf ExecNodes to execute.
            If None, all ExecNodes are executed.
            Defaults to None.
        exclude_nodes (Optional[List[Alias]]): The leaf ExecNodes to exclude.
            If None, no ExecNodes are excluded.
            Defaults to None.
        cache_deps_of (Optional[List[Alias]]): cache all the dependencies of these nodes
            (the results of these nodes themselves are not cached).
            This option can not be used together with target_nodes nor exclude_nodes.
            Defaults to None.
        cache_in (str):
            the path to the file where the execution should be cached.
            The path should end in `.pkl`.
            Will skip caching if `cache_in` is Falsy.
            Will raise PickleError if any of the values passed around in the DAG is not pickleable.
            Defaults to "".
        from_cache (str):
            the path to the file where the execution should be loaded from.
            The path should end in `.pkl`.
            Will skip loading from cache if `from_cache` is Falsy.
            Defaults to "".
        call_id (Optional[str]): identification of the current execution.
            This will be inserted into thread_name_prefix while executing the threadPool.
            It will be used in the future for identifying the execution inside Processes etc.

    Raises:
        ValueError: if `cache_in` or `from_cache` is non-empty and does not end in `.pkl`,
            or if `cache_deps_of` is combined with `target_nodes` or `exclude_nodes`.
    """
    # todo: Maybe we can support .dill to extend the possibilities of the exchanged values, but this won't solve the whole problem

    self.dag = dag
    # target_nodes / exclude_nodes must be set before cache_deps_of:
    # the cache_deps_of setter validates against them.
    self.target_nodes = target_nodes
    self.exclude_nodes = exclude_nodes
    self.cache_deps_of = cache_deps_of
    # NOTE: from_cache and cache_in are independent. When both are set, the results are
    #  loaded from `from_cache` and the new results are written again to `cache_in`.
    self.cache_in = cache_in
    self.from_cache = from_cache
    self.call_id = call_id

    # state of the last execution, populated by __call__
    self.xn_dict: Dict[Identifier, ExecNode] = {}
    self.results: Dict[Identifier, Any] = {}

    # build self.graph (and self.scheduled_nodes); __call__ executes self.graph
    self._construct_dynamic_attributes()

    self.executed = False

__call__(*args, **kwargs)

Call the DAG.


Name Type Description Default
*args P.args

positional arguments to pass in to the DAG

**kwargs P.kwargs

keyword arguments to pass in to the DAG



Type Description

Not raised in practice: if the DAGExecution has already been executed, only a warning is emitted (reuse is not recommended).


Name Type Description

the return value of the DAG's Execution

Source code in tawazi/_dag/
def __call__(self, *args: P.args, **kwargs: P.kwargs) -> RVDAG:
    """Call the DAG.

    Args:
        *args: positional arguments to pass in to the DAG
        **kwargs: keyword arguments to pass in to the DAG.
            NOTE: they are currently not forwarded to the DAG.

    Returns:
        RVDAG: the return value of the DAG's Execution

    Warns:
        UserWarning: if the DAGExecution has already been executed (reuse is not recommended).
    """
    if self.executed:
        warnings.warn("DAGExecution object's reuse is not recommended.", stacklevel=2)

    # NOTE: *args will be ignored if self.from_cache is set!
    dag = self.dag

    # maybe call_id will be changed to Union[int, str].
    # Keep call_id as Optional[str] for now
    call_id = self.call_id if self.call_id is not None else ""

    # 1. copy the ExecNodes
    call_xn_dict = dag._make_call_xn_dict(*args)
    if self.from_cache:
        # SECURITY: unpickling can execute arbitrary code; only load cache files you trust.
        with open(self.from_cache, "rb") as f:
            cached_results = pickle.load(f)  # noqa: S301
        # set the result for the ExecNode that were previously executed
        # this will make them skip execution inside the scheduler
        for id_, result in cached_results.items():
            call_xn_dict[id_].result = result

    # 2. Execute the scheduler
    self.xn_dict = dag._execute(self.graph, call_xn_dict, call_id)
    # must be an id -> result mapping: it is filtered with .items() and pickled below
    self.results = {id_: xn.result for id_, xn in self.xn_dict.items()}

    # 3. cache in the graph results
    if self.cache_in:
        # NOTE: we are currently only storing the results of the execution,
        #  this means that the configuration of the ExecNodes are lost!
        #  But this is ok since it should not change between executions!
        #  for example, a setup ExecNode should stay a setup ExecNode between caching in the results and reading back the cached results
        #  the same goes for the DAG itself, the behavior when an error is encountered & its concurrency will be controlled via the constructor
        if self.cache_deps_of is not None:
            # the nodes listed in cache_deps_of are left out: only their dependencies are cached
            non_cacheable_ids: Set[Identifier] = set()
            for aliases in self.cache_deps_of:
                non_cacheable_ids.update(self.dag._alias_to_ids(aliases))

            to_cache_results = {
                id_: res
                for id_, res in self.results.items()
                if id_ not in non_cacheable_ids
            }
        else:
            to_cache_results = self.results

        # the file is opened only once the payload is ready so that a failure while
        # computing it does not leave an empty/truncated cache file behind
        Path(self.cache_in).parent.mkdir(parents=True, exist_ok=True)
        with open(self.cache_in, "wb") as f:
            pickle.dump(
                to_cache_results, f, protocol=pickle.HIGHEST_PROTOCOL, fix_imports=False
            )

    # TODO: make DAGExecution reusable but do not guarantee ThreadSafety!
    self.executed = True
    # 4. extract the returned value/values
    return dag._get_return_values(self.xn_dict)  # type: ignore[return-value]


Does the same thing as DAG.setup. However, the target_nodes and exclude_nodes are taken from the DAGExecution's initialization.

Source code in tawazi/_dag/
def setup(self) -> None:
    """Does the same thing as DAG.setup. However the `target_nodes` and `exclude_nodes` are taken from the DAGExecution's initization."""
    # TODO: handle the case where cache_deps_of is provided instead of target_nodes and exclude_nodes
    #  in which case the deps_of might have a setup node themselves which should not run.
    #  This is an edge case though that is not important to handle at the current moment.
    self.dag.setup(target_nodes=self.target_nodes, exclude_nodes=self.exclude_nodes)


Get node with the given id.


Name Type Description Default
id_ Identifier

id of the ExecNode



Name Type Description
ExecNode ExecNode

Corresponding ExecNode

Source code in tawazi/_dag/
def get_node_by_id(self, id_: Identifier) -> ExecNode:
    """Get node with the given id.

    After execution, the node is looked up among the ExecNodes of the last
    execution; before that, the lookup is delegated to the attached DAG.

    Args:
        id_ (Identifier): id of the ExecNode

    Returns:
        ExecNode: Corresponding ExecNode

    Raises:
        KeyError: if already executed and `id_` is not among the executed ExecNodes.
    """
    # TODO: ? catch the keyError and
    #   help the user know the id of the ExecNode by pointing to documentation!?
    if self.executed:
        return self.xn_dict[id_]
    return self.dag.get_node_by_id(id_)


Get all the nodes with the given tag.


Name Type Description Default
tag Tag

tag of ExecNodes in question



Type Description

List[ExecNode]: corresponding ExecNodes

Source code in tawazi/_dag/
def get_nodes_by_tag(self, tag: Tag) -> List[ExecNode]:
    """Get all the nodes with the given tag.

    After execution, the nodes are searched among the ExecNodes of the last
    execution; before that, the lookup is delegated to the attached DAG.

    Args:
        tag (Tag): tag of ExecNodes in question

    Returns:
        List[ExecNode]: corresponding ExecNodes
    """
    if self.executed:
        return [ex_n for ex_n in self.xn_dict.values() if ex_n.tag == tag]
    return self.dag.get_nodes_by_tag(tag)