DAGExecution

Bases: Generic[P, RVDAG]

A disposable callable instance of a DAG.

It holds information about the last execution and is therefore not thread-safe. It can be reused, but reusing a DAGExecution instance is not recommended.
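
A minimal sketch of the intended usage, assuming tawazi exports the @xn and @dag decorators and DAGExecution at the package root; the ExecNode functions (load, double) are hypothetical:

from tawazi import DAGExecution, dag, xn

@xn
def load() -> int:
    # hypothetical ExecNode: produce an input value
    return 21

@xn
def double(x: int) -> int:
    # hypothetical ExecNode: transform the value
    return x * 2

@dag
def pipeline() -> int:
    return double(load())

# a disposable, single-use execution of the DAG
execution = DAGExecution(pipeline)
assert execution() == 42
# prefer a fresh DAGExecution per run rather than reusing this instance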

Source code in tawazi/_dag/dag.py
class DAGExecution(Generic[P, RVDAG]):
    """A disposable callable instance of a DAG.

    It holds information about the last execution. Hence it is not thread-safe.
    It can be reused, but reusing a DAGExecution instance is not recommended.
    """

    def __init__(
        self,
        dag: DAG[P, RVDAG],
        *,
        target_nodes: Optional[Sequence[Alias]] = None,
        exclude_nodes: Optional[Sequence[Alias]] = None,
        cache_deps_of: Optional[Sequence[Alias]] = None,
        cache_in: str = "",
        from_cache: str = "",
        call_id: Optional[str] = None,
    ):
        """Constructor.

        Args:
            dag (DAG): The attached DAG.
            target_nodes (Optional[List[Alias]]): The leaf ExecNodes to execute.
                If None, all ExecNodes will be executed.
                Defaults to None.
            exclude_nodes (Optional[List[Alias]]): The leaf ExecNodes to exclude.
                If None, no ExecNodes will be excluded.
                Defaults to None.
            cache_deps_of (Optional[List[Alias]]): Cache all the dependencies of these nodes.
                This option cannot be used together with target_nodes or exclude_nodes.
            cache_in (str):
                the path to the file where the execution should be cached.
                The path should end in `.pkl`.
                Will skip caching if `cache_in` is Falsy.
                Will raise PickleError if any of the values passed around in the DAG is not pickleable.
                Defaults to "".
            from_cache (str):
                the path to the file where the execution should be loaded from.
                The path should end in `.pkl`.
                Will skip loading from cache if `from_cache` is Falsy.
                Defaults to "".
            call_id (Optional[str]): identifier of the current execution.
                This will be inserted into the thread_name_prefix of the thread pool during execution.
                It will be used in the future to identify the execution inside processes etc.
        """
        # todo: Maybe we can support .dill to extend the possibilities of the exchanged values, but this won't solve the whole problem

        self.dag = dag
        self.target_nodes = target_nodes
        self.exclude_nodes = exclude_nodes
        self.cache_deps_of = cache_deps_of
        self.cache_in = cache_in
        self.from_cache = from_cache
        # NOTE: from_cache is orthogonal to cache_in: if both are set at the same time,
        #  the DAG will be loaded from the from_cache file and the results will be saved again to the cache_in file.
        self.call_id = call_id

        self.xn_dict: Dict[Identifier, ExecNode] = {}
        self.results: Dict[Identifier, Any] = {}

        self._construct_dynamic_attributes()

        self.executed = False

    def _construct_dynamic_attributes(self) -> None:
        self.graph = self._make_graph()
        self.scheduled_nodes = self.graph.nodes

    def _make_graph(self) -> nx.DiGraph:
        """Make the graph of the execution.

        It is called at construction time and again whenever the instance is reused.
        """
        # logic parts
        if self.cache_deps_of is not None:
            return self.dag._make_subgraph(self.cache_deps_of)

        return self.dag._make_subgraph(self.target_nodes, self.exclude_nodes)

    @property
    def cache_in(self) -> str:
        """The path to the file where the execution should be cached.

        Returns:
            str: The path to the file where the execution should be cached.
        """
        return self._cache_in

    @cache_in.setter
    def cache_in(self, cache_in: str) -> None:
        if cache_in and not cache_in.endswith(".pkl"):
            raise ValueError("cache_in should end with .pkl")
        self._cache_in = cache_in

    @property
    def from_cache(self) -> str:
        """Get the file path from which the cached execution should be loaded.

        Returns:
            str: the file path of the cached execution
        """
        return self._from_cache

    @from_cache.setter
    def from_cache(self, from_cache: str) -> None:
        if from_cache and not from_cache.endswith(".pkl"):
            raise ValueError("from_cache should end with .pkl")
        self._from_cache = from_cache

    @property
    def cache_deps_of(self) -> Optional[Sequence[Alias]]:
        """Cache all the dependencies of these nodes.

        Returns:
            Optional[List[Alias]]: List of Aliases passed to cache_deps_of while instantiating DAGExecution
        """
        return self._cache_deps_of

    @cache_deps_of.setter
    def cache_deps_of(self, cache_deps_of: Optional[Sequence[Alias]]) -> None:
        if (
            self.target_nodes is not None or self.exclude_nodes is not None
        ) and cache_deps_of is not None:
            raise ValueError(
                "cache_deps_of cannot be used together with target_nodes or exclude_nodes"
            )
        self._cache_deps_of = cache_deps_of

    # we need to reimplement the public methods of DAG here in order to have a constant public interface
    # getters
    def get_nodes_by_tag(self, tag: Tag) -> List[ExecNode]:
        """Get all the nodes with the given tag.

        Args:
            tag (Tag): tag of ExecNodes in question

        Returns:
            List[ExecNode]: corresponding ExecNodes
        """
        if self.executed:
            return [ex_n for ex_n in self.xn_dict.values() if ex_n.tag == tag]
        return self.dag.get_nodes_by_tag(tag)

    def get_node_by_id(self, id_: Identifier) -> ExecNode:
        """Get node with the given id.

        Args:
            id_ (Identifier): id of the ExecNode

        Returns:
            ExecNode: Corresponding ExecNode
        """
        # TODO: ? catch the keyError and
        #   help the user know the id of the ExecNode by pointing to documentation!?
        if self.executed:
            return self.xn_dict[id_]
        return self.dag.get_node_by_id(id_)

    def setup(self) -> None:
        """Does the same thing as DAG.setup. However, the `target_nodes` and `exclude_nodes` are taken from the DAGExecution's initialization."""
        # TODO: handle the case where cache_deps_of is provided instead of target_nodes and exclude_nodes
        #  in which case the deps_of might have a setup node themselves which should not run.
        #  This is an edge case though that is not important to handle at the current moment.
        self.dag.setup(target_nodes=self.target_nodes, exclude_nodes=self.exclude_nodes)

    def __call__(self, *args: P.args, **kwargs: P.kwargs) -> RVDAG:
        """Call the DAG.

        Args:
            *args: positional arguments to pass in to the DAG
            **kwargs: keyword arguments to pass in to the DAG

        Warns:
            UserWarning: when the DAGExecution has already been executed; reuse is discouraged.

        Returns:
            RVDAG: the return value of the DAG's Execution
        """
        if self.executed:
            warnings.warn("DAGExecution object's reuse is not recommended.", stacklevel=2)
            self._construct_dynamic_attributes()

        # NOTE: *args will be ignored if self.from_cache is set!
        dag = self.dag

        # maybe call_id will be changed to Union[int, str].
        # Keep call_id as Optional[str] for now
        call_id = self.call_id if self.call_id is not None else ""

        # 1. copy the ExecNodes
        call_xn_dict = dag._make_call_xn_dict(*args)
        if self.from_cache:
            with open(self.from_cache, "rb") as f:
                cached_results = pickle.load(f)  # noqa: S301
            # set the result for the ExecNodes that were previously executed
            # this will make them skip execution inside the scheduler
            for id_, result in cached_results.items():
                call_xn_dict[id_].result = result

        # 2. Execute the scheduler
        self.xn_dict = dag._execute(self.graph, call_xn_dict, call_id)
        self.results = {xn.id: xn.result for xn in self.xn_dict.values()}

        # 3. cache in the graph results
        if self.cache_in:
            Path(self.cache_in).parent.mkdir(parents=True, exist_ok=True)
            with open(self.cache_in, "wb") as f:
                # NOTE: we are currently only storing the results of the execution,
                #  which means that the configuration of the ExecNodes is lost!
                #  But this is ok since it should not change between executions!
                #  For example, a setup ExecNode should stay a setup ExecNode between caching the results and reading them back.
                #  The same goes for the DAG itself: its behavior on error & its concurrency are controlled via the constructor.

                if self.cache_deps_of is not None:
                    non_cacheable_ids: Set[Identifier] = set()
                    for aliases in self.cache_deps_of:
                        ids = self.dag._alias_to_ids(aliases)
                        non_cacheable_ids = non_cacheable_ids.union(ids)

                    to_cache_results = {
                        id_: res
                        for id_, res in self.results.items()
                        if id_ not in non_cacheable_ids
                    }
                else:
                    to_cache_results = self.results
                pickle.dump(
                    to_cache_results, f, protocol=pickle.HIGHEST_PROTOCOL, fix_imports=False
                )

        # TODO: make DAGExecution reusable but do not guarantee ThreadSafety!
        self.executed = True
        # 4. extract the returned value/values
        return dag._get_return_values(self.xn_dict)  # type: ignore[return-value]

__init__(dag, *, target_nodes=None, exclude_nodes=None, cache_deps_of=None, cache_in='', from_cache='', call_id=None)

Constructor.

Parameters:

    dag (DAG, required):
        The attached DAG.

    target_nodes (Optional[List[Alias]], default None):
        The leaf ExecNodes to execute. If None, all ExecNodes will be executed.

    exclude_nodes (Optional[List[Alias]], default None):
        The leaf ExecNodes to exclude. If None, no ExecNodes will be excluded.

    cache_deps_of (Optional[List[Alias]], default None):
        Cache all the dependencies of these nodes. This option cannot be used together with target_nodes or exclude_nodes.

    cache_in (str, default ""):
        The path to the file where the execution should be cached. The path should end in .pkl. Caching is skipped if cache_in is falsy. A PickleError is raised if any of the values passed around in the DAG is not pickleable.

    from_cache (str, default ""):
        The path to the file from which the execution should be loaded. The path should end in .pkl. Loading from cache is skipped if from_cache is falsy.

    call_id (Optional[str], default None):
        Identifier of the current execution. It is inserted into the thread_name_prefix of the thread pool and will later be used to identify executions inside processes etc.
Source code in tawazi/_dag/dag.py
def __init__(
    self,
    dag: DAG[P, RVDAG],
    *,
    target_nodes: Optional[Sequence[Alias]] = None,
    exclude_nodes: Optional[Sequence[Alias]] = None,
    cache_deps_of: Optional[Sequence[Alias]] = None,
    cache_in: str = "",
    from_cache: str = "",
    call_id: Optional[str] = None,
):
    """Constructor.

    Args:
        dag (DAG): The attached DAG.
        target_nodes (Optional[List[Alias]]): The leaf ExecNodes to execute.
            If None, all ExecNodes will be executed.
            Defaults to None.
        exclude_nodes (Optional[List[Alias]]): The leaf ExecNodes to exclude.
            If None, no ExecNodes will be excluded.
            Defaults to None.
        cache_deps_of (Optional[List[Alias]]): Cache all the dependencies of these nodes.
            This option cannot be used together with target_nodes or exclude_nodes.
        cache_in (str):
            the path to the file where the execution should be cached.
            The path should end in `.pkl`.
            Will skip caching if `cache_in` is Falsy.
            Will raise PickleError if any of the values passed around in the DAG is not pickleable.
            Defaults to "".
        from_cache (str):
            the path to the file where the execution should be loaded from.
            The path should end in `.pkl`.
            Will skip loading from cache if `from_cache` is Falsy.
            Defaults to "".
        call_id (Optional[str]): identifier of the current execution.
            This will be inserted into the thread_name_prefix of the thread pool during execution.
            It will be used in the future to identify the execution inside processes etc.
    """
    # todo: Maybe we can support .dill to extend the possibilities of the exchanged values, but this won't solve the whole problem

    self.dag = dag
    self.target_nodes = target_nodes
    self.exclude_nodes = exclude_nodes
    self.cache_deps_of = cache_deps_of
    self.cache_in = cache_in
    self.from_cache = from_cache
    # NOTE: from_cache is orthogonal to cache_in: if both are set at the same time,
    #  the DAG will be loaded from the from_cache file and the results will be saved again to the cache_in file.
    self.call_id = call_id

    self.xn_dict: Dict[Identifier, ExecNode] = {}
    self.results: Dict[Identifier, Any] = {}

    self._construct_dynamic_attributes()

    self.executed = False
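
A sketch of subgraph selection with the constructor arguments above, reusing the hypothetical pipeline from the first example; passing the decorated functions as aliases is an assumption (string ids or tags should work as well):

# execute only `load` and its dependencies
partial = DAGExecution(pipeline, target_nodes=[load])
partial()

# execute everything except `double` and whatever depends on it
trimmed = DAGExecution(pipeline, exclude_nodes=[double])
trimmed()

# cache_deps_of cannot be combined with target_nodes or exclude_nodes:
# DAGExecution(pipeline, target_nodes=[load], cache_deps_of=[double])  # raises ValueError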

__call__(*args, **kwargs)

Call the DAG.

Parameters:

    *args (P.args):
        Positional arguments to pass in to the DAG.

    **kwargs (P.kwargs):
        Keyword arguments to pass in to the DAG.

Warns:

    UserWarning:
        when the DAGExecution has already been executed; reuse is discouraged.

Returns:

    RVDAG: the return value of the DAG's execution.

Source code in tawazi/_dag/dag.py
def __call__(self, *args: P.args, **kwargs: P.kwargs) -> RVDAG:
    """Call the DAG.

    Args:
        *args: positional arguments to pass in to the DAG
        **kwargs: keyword arguments to pass in to the DAG

    Warns:
        UserWarning: when the DAGExecution has already been executed; reuse is discouraged.

    Returns:
        RVDAG: the return value of the DAG's Execution
    """
    if self.executed:
        warnings.warn("DAGExecution object's reuse is not recommended.", stacklevel=2)
        self._construct_dynamic_attributes()

    # NOTE: *args will be ignored if self.from_cache is set!
    dag = self.dag

    # maybe call_id will be changed to Union[int, str].
    # Keep call_id as Optional[str] for now
    call_id = self.call_id if self.call_id is not None else ""

    # 1. copy the ExecNodes
    call_xn_dict = dag._make_call_xn_dict(*args)
    if self.from_cache:
        with open(self.from_cache, "rb") as f:
            cached_results = pickle.load(f)  # noqa: S301
        # set the result for the ExecNodes that were previously executed
        # this will make them skip execution inside the scheduler
        for id_, result in cached_results.items():
            call_xn_dict[id_].result = result

    # 2. Execute the scheduler
    self.xn_dict = dag._execute(self.graph, call_xn_dict, call_id)
    self.results = {xn.id: xn.result for xn in self.xn_dict.values()}

    # 3. cache in the graph results
    if self.cache_in:
        Path(self.cache_in).parent.mkdir(parents=True, exist_ok=True)
        with open(self.cache_in, "wb") as f:
            # NOTE: we are currently only storing the results of the execution,
            #  which means that the configuration of the ExecNodes is lost!
            #  But this is ok since it should not change between executions!
            #  For example, a setup ExecNode should stay a setup ExecNode between caching the results and reading them back.
            #  The same goes for the DAG itself: its behavior on error & its concurrency are controlled via the constructor.

            if self.cache_deps_of is not None:
                non_cacheable_ids: Set[Identifier] = set()
                for aliases in self.cache_deps_of:
                    ids = self.dag._alias_to_ids(aliases)
                    non_cacheable_ids = non_cacheable_ids.union(ids)

                to_cache_results = {
                    id_: res
                    for id_, res in self.results.items()
                    if id_ not in non_cacheable_ids
                }
            else:
                to_cache_results = self.results
            pickle.dump(
                to_cache_results, f, protocol=pickle.HIGHEST_PROTOCOL, fix_imports=False
            )

    # TODO: make DAGExecution reusable but do not guarantee ThreadSafety!
    self.executed = True
    # 4. extract the returned value/values
    return dag._get_return_values(self.xn_dict)  # type: ignore[return-value]
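
A sketch of the caching round trip implemented above, again with the hypothetical pipeline: the first run persists its results via cache_in, a second DAGExecution replays them via from_cache, and since the two options are orthogonal both may be set at once:

first = DAGExecution(pipeline, cache_in="cache/run.pkl")
first()  # results are pickled to cache/run.pkl

replay = DAGExecution(pipeline, from_cache="cache/run.pkl")
assert replay() == 42  # cached ExecNodes skip execution inside the scheduler

# reusing the same instance works but emits a UserWarning
import warnings
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    replay()
assert any("reuse" in str(w.message) for w in caught)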

setup()

Does the same thing as DAG.setup. However, the target_nodes and exclude_nodes are taken from the DAGExecution's initialization.

Source code in tawazi/_dag/dag.py
def setup(self) -> None:
    """Does the same thing as DAG.setup. However, the `target_nodes` and `exclude_nodes` are taken from the DAGExecution's initialization."""
    # TODO: handle the case where cache_deps_of is provided instead of target_nodes and exclude_nodes
    #  in which case the deps_of might have a setup node themselves which should not run.
    #  This is an edge case though that is not important to handle at the current moment.
    self.dag.setup(target_nodes=self.target_nodes, exclude_nodes=self.exclude_nodes)
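
A sketch of setup() in use, assuming @xn accepts a setup=True flag for expensive run-once nodes (both the flag usage and the node names are illustrative):

@xn(setup=True)
def load_model() -> str:
    # hypothetical setup ExecNode: expensive, meant to run once
    return "model-weights"

@xn
def predict(model: str, x: int) -> str:
    return f"{model}:{x}"

@dag
def infer(x: int) -> str:
    return predict(load_model(), x)

execution = DAGExecution(infer)
execution.setup()    # runs only the setup nodes of the scheduled subgraph
print(execution(1))  # "model-weights:1"; load_model should not run again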

get_node_by_id(id_)

Get node with the given id.

Parameters:

    id_ (Identifier, required):
        The id of the ExecNode.

Returns:

    ExecNode: the corresponding ExecNode.

Source code in tawazi/_dag/dag.py
def get_node_by_id(self, id_: Identifier) -> ExecNode:
    """Get node with the given id.

    Args:
        id_ (Identifier): id of the ExecNode

    Returns:
        ExecNode: Corresponding ExecNode
    """
    # TODO: ? catch the keyError and
    #   help the user know the id of the ExecNode by pointing to documentation!?
    if self.executed:
        return self.xn_dict[id_]
    return self.dag.get_node_by_id(id_)

get_nodes_by_tag(tag)

Get all the nodes with the given tag.

Parameters:

    tag (Tag, required):
        The tag of the ExecNodes in question.

Returns:

    List[ExecNode]: the corresponding ExecNodes.

Source code in tawazi/_dag/dag.py
def get_nodes_by_tag(self, tag: Tag) -> List[ExecNode]:
    """Get all the nodes with the given tag.

    Args:
        tag (Tag): tag of ExecNodes in question

    Returns:
        List[ExecNode]: corresponding ExecNodes
    """
    if self.executed:
        return [ex_n for ex_n in self.xn_dict.values() if ex_n.tag == tag]
    return self.dag.get_nodes_by_tag(tag)
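
A sketch of the getters, assuming @xn accepts a tag argument (as the Tag-based lookup above implies); before execution the lookup is delegated to the DAG, afterwards it returns the executed ExecNodes:

@xn(tag="math")
def square(x: int) -> int:
    return x * x

@dag
def squares(x: int) -> int:
    return square(x)

execution = DAGExecution(squares)
tagged = execution.get_nodes_by_tag("math")    # delegated to the DAG: nothing executed yet
execution(3)
executed = execution.get_nodes_by_tag("math")  # drawn from the executed nodes
print([node.result for node in executed])      # ExecNode.result is set by the scheduler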