ExecNodes

Warning: This page describes internal functionality of Tawazi that is still subject to change in minor releases.

This class is the base executable node of the Directed Acyclic Execution Graph.

An ExecNode is an object that can be executed inside a DAG scheduler.

It consists of a function (exec_function) that takes args and kwargs and returns a value.

When the ExecNode is executed in the DAG, the resulting value will be stored in the ExecNode.result instance attribute.

This class is not meant to be instantiated directly.

Use the @xn decorator instead.
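The description above can be illustrated with a minimal, standard-library-only sketch. `MiniNode` and `NO_VAL` are hypothetical stand-ins written for this page, not Tawazi's classes; dependencies are referenced by plain string ids here, whereas the real class uses UsageExecNode objects.

```python
from typing import Any, Callable, Dict, List, Optional

# Sentinel meaning "not executed yet"; a hypothetical stand-in for Tawazi's NoVal.
NO_VAL = object()


class MiniNode:
    """Hypothetical, simplified stand-in for ExecNode (not Tawazi's class)."""

    def __init__(
        self,
        id_: str,
        exec_function: Callable[..., Any],
        args: Optional[List[str]] = None,
        kwargs: Optional[Dict[str, str]] = None,
    ) -> None:
        self.id = id_
        self.exec_function = exec_function
        # Assumption: dependencies are the ids of other nodes.
        self.args = args or []
        self.kwargs = kwargs or {}
        self.result: Any = NO_VAL

    @property
    def executed(self) -> bool:
        return self.result is not NO_VAL

    def execute(self, node_dict: Dict[str, "MiniNode"]) -> Any:
        # A node that already holds a result is not run again.
        if self.executed:
            return self.result
        # Resolve each dependency to the result stored on that node.
        args = [node_dict[dep].result for dep in self.args]
        kwargs = {name: node_dict[dep].result for name, dep in self.kwargs.items()}
        self.result = self.exec_function(*args, **kwargs)
        return self.result


nodes = {
    "two": MiniNode("two", lambda: 2),
    "three": MiniNode("three", lambda: 3),
    "product": MiniNode("product", lambda x, y: x * y, args=["two"], kwargs={"y": "three"}),
}
# Run the nodes in an order that respects their dependencies.
for node_id in ("two", "three", "product"):
    nodes[node_id].execute(nodes)
```

After the loop, each node's value is available on its `result` attribute, which is the behaviour the text describes for ExecNode.result.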

Source code in tawazi/node/node.py
class ExecNode:
    """This class is the base executable node of the Directed Acyclic Execution Graph.

    An ExecNode is an object that can be executed inside a DAG scheduler.

    It consists of a function (exec_function) that takes args and kwargs and returns a value.

    When the ExecNode is executed in the DAG, the resulting value will be stored in the ExecNode.result instance attribute.

    Note: This class is not meant to be instantiated directly.
        Use the `@xn` decorator instead.
    """

    def __init__(
        self,
        id_: Identifier,
        exec_function: Callable[..., Any] = lambda *args, **kwargs: None,
        args: Optional[List["UsageExecNode"]] = None,
        kwargs: Optional[Dict[str, "UsageExecNode"]] = None,
        priority: int = 0,
        is_sequential: bool = cfg.TAWAZI_IS_SEQUENTIAL,
        debug: bool = False,
        tag: Optional[TagOrTags] = None,
        setup: bool = False,
        unpack_to: Optional[int] = None,
        resource: Resource = cfg.TAWAZI_DEFAULT_RESOURCE,
    ):
        """Constructor of ExecNode.

        Args:
            id_ (Identifier): identifier of ExecNode.
            exec_function (Callable): the callable that will be executed in the graph.
                The default no-op callable is useful for making joining ExecNodes (nodes that only enforce dependencies in the graph).
            args (Optional[List[UsageExecNode]], optional): *args to pass to exec_function.
            kwargs (Optional[Dict[str, UsageExecNode]], optional): **kwargs to pass to exec_function.
            priority (int): priority compared to other ExecNodes; the higher the number the higher the priority.
            is_sequential (bool): whether to execute this ExecNode in sequential order with respect to others.
                Before this ExecNode starts, all other running nodes are waited on until they finish.
                Defaults to cfg.TAWAZI_IS_SEQUENTIAL.
            debug (bool): Make this ExecNode a debug Node. Defaults to False.
            tag (TagOrTags): Attach a Tag or Tags to this ExecNode. Defaults to None.
            setup (bool): Make this ExecNode a setup Node. Defaults to False.
            unpack_to (Optional[int]): if not None, the number of elements that the result of this ExecNode is unpacked into. Defaults to None.
            resource (Resource): the resource used to execute this ExecNode. Defaults to cfg.TAWAZI_DEFAULT_RESOURCE.

        Raises:
            ValueError: if setup and debug are both True.
        """
        # 1. assign attributes
        self._id = id_
        self.exec_function = exec_function
        self.priority = priority
        self.is_sequential = is_sequential
        self.debug = debug
        self.tag = tag
        self.setup = setup
        self.unpack_to = unpack_to
        self.active = True
        self.resource = resource

        self.args: List[UsageExecNode] = args or []
        self.kwargs: Dict[Identifier, UsageExecNode] = kwargs or {}

        # 2. compound_priority equals priority at the start but will be modified during the build process
        self.compound_priority = priority

        # 3. Assign the name
        # This can be used in the future but is not particularly useful at the moment
        self.__name__ = self.exec_function.__name__ if not isinstance(id_, str) else id_

        # 4. Default the result to NoVal; it is overwritten when this ExecNode is executed.
        # It would be nice to remove self.result and make ExecNode immutable.
        self.result: Union[NoValType, Any] = NoVal
        """Internal attribute to store the result of the execution of this ExecNode (Might change!)."""
        # even though setting result to NoVal is not necessary... it clarifies debugging

        self.profile = Profile(cfg.TAWAZI_PROFILE_ALL_NODES)

    @property
    def executed(self) -> bool:
        """Whether this ExecNode has been executed."""
        return self.result is not NoVal

    def __repr__(self) -> str:
        """Human representation of the ExecNode.

        Returns:
            str: human representation of the ExecNode.
        """
        return f"{self.__class__.__name__} {self.id} ~ | <{hex(id(self))}>"

    # TODO: make cached_property ?
    @property
    def dependencies(self) -> List["UsageExecNode"]:
        """The list of dependencies of this ExecNode.

        Returns:
            List[UsageExecNode]: the dependencies taken from args, followed by those from kwargs.
        """
        # Making the dependencies
        # 1. from args
        deps = self.args.copy()
        # 2. and from kwargs
        deps.extend(self.kwargs.values())

        return deps

    @property
    def id(self) -> Identifier:
        """The identifier of this ExecNode."""
        return self._id

    @property
    def tag(self) -> Optional[TagOrTags]:
        """The Tag or Tags of this ExecNode."""
        return self._tag

    @tag.setter
    def tag(self, value: Optional[TagOrTags]) -> None:
        is_none = value is None
        is_tag = isinstance(value, Tag)
        is_tuple_tag = isinstance(value, tuple) and all(isinstance(v, Tag) for v in value)
        if not (is_none or is_tag or is_tuple_tag):
            raise TypeError(
                f"tag should be of type {TagOrTags} but {value} of type {type(value)} is provided"
            )
        self._tag = value

    @property
    def priority(self) -> int:
        """The priority of this ExecNode."""
        return self._priority

    @priority.setter
    def priority(self, value: int) -> None:
        if not isinstance(value, int):
            raise ValueError(f"priority must be an int, provided {type(value)}")
        self._priority = value

    @property
    def resource(self) -> Resource:
        """The resource used to run this ExecNode."""
        return self._resource

    @resource.setter
    def resource(self, value: Resource) -> None:
        if not isinstance(value, Resource):
            raise ValueError(f"resource must be of type {Resource}, provided {type(value)}")

        self._resource = value

    @property
    def is_sequential(self) -> bool:
        """Whether `ExecNode` runs in sequential order with respect to other `ExecNode`s."""
        return self._is_sequential

    @is_sequential.setter
    def is_sequential(self, value: bool) -> None:
        if not isinstance(value, bool):
            raise TypeError(f"is_sequential should be of type bool, but {value} provided")
        self._is_sequential = value

    @property
    def debug(self) -> bool:
        """Whether this ExecNode is a debug Node. ExecNode can't be setup and debug simultaneously."""
        return self._debug

    @debug.setter
    def debug(self, value: bool) -> None:
        if not isinstance(value, bool):
            raise TypeError(f"debug must be of type bool, but {value} provided")
        self._debug = value
        self._validate()

    @property
    def setup(self) -> bool:
        """Whether this ExecNode is a setup Node. ExecNode can't be setup and debug simultaneously."""
        return self._setup

    @setup.setter
    def setup(self, value: bool) -> None:
        if not isinstance(value, bool):
            raise TypeError(f"setup must be of type bool, but {value} provided")
        self._setup = value
        self._validate()

    @property
    def active(self) -> Union["UsageExecNode", bool]:
        """Whether this ExecNode is active or not."""
        # the value is set during the DAG description
        if isinstance(self._active, UsageExecNode):
            return self._active
        # the value set is a constant
        return bool(self._active)

    @active.setter
    def active(self, value: Any) -> None:
        self._active = value

    def _execute(self, node_dict: Dict[Identifier, "ExecNode"]) -> Optional[Any]:
        """Execute the ExecNode inside of a DAG.

        Args:
            node_dict (Dict[Identifier, ExecNode]): A shared dictionary containing the other ExecNodes in the DAG;
                the key is the id of the ExecNode. This node_dict refers to the current execution

        Returns:
            the result of the execution of the current ExecNode
        """
        logger.debug(f"Start executing {self.id} with task {self.exec_function}")
        self.profile = Profile(cfg.TAWAZI_PROFILE_ALL_NODES)

        if self.executed:
            logger.debug(f"Skipping execution of a pre-computed node {self.id}")
            return self.result

        # 1. prepare args and kwargs for usage:
        args = [xnw.result(node_dict) for xnw in self.args]
        kwargs = {
            key: xnw.result(node_dict)
            for key, xnw in self.kwargs.items()
            if key not in RESERVED_KWARGS
        }
        # args = [arg.result for arg in self.args]
        # kwargs = {key: arg.result for key, arg in self.kwargs.items()}

        # 2. run exec_function inside the profiling context and store its result
        with self.profile:
            self.result = self.exec_function(*args, **kwargs)

        # 3. return the result as well, although it is already stored in self.result
        logger.debug(f"Finished executing {self.id} with task {self.exec_function}")
        return self.result

    def _validate(self) -> None:
        if getattr(self, "debug", None) and getattr(self, "setup", None):
            raise ValueError(
                f"The node {self.id} can't be a setup and a debug node at the same time."
            )

    @property
    def unpack_to(self) -> Optional[int]:
        """The number of elements in the unpacked results of this ExecNode.

        Returns:
            Optional[int]: the number of elements in the unpacked results of this ExecNode.
        """
        return self._unpack_to

    @unpack_to.setter
    def unpack_to(self, value: Optional[int]) -> None:
        if value is not None:
            if not isinstance(value, int):
                raise ValueError(
                    f"unpack_to must be a non-negative int or None, provided {type(value)}"
                )
            # 0 is allowed: empty tuples exist in Python
            if value < 0:
                raise ValueError(f"unpack_to must be a non-negative int or None, provided {value}")

            _validate_tuple(self.exec_function, value)

        self._unpack_to = value
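
The checks in the unpack_to setter can be tried in isolation with the sketch below. `check_unpack_to` is a hypothetical helper written for this page; it reproduces only the type and range checks and leaves out the `_validate_tuple` call, whose body is not shown in this listing. Because the range check is `value < 0`, the value 0 is accepted.

```python
from typing import Optional


def check_unpack_to(value: Optional[int]) -> Optional[int]:
    """Hypothetical helper mirroring the type and range checks of the setter."""
    if value is None:
        return None
    if not isinstance(value, int):
        raise ValueError(f"unpack_to must be a non-negative int or None, provided {type(value)}")
    # 0 passes this check: unpacking into an empty tuple is allowed.
    if value < 0:
        raise ValueError(f"unpack_to must be a non-negative int or None, provided {value}")
    return value


accepted = [check_unpack_to(v) for v in (None, 0, 3)]

rejected = []
for bad in (-1, "2", 1.5):
    try:
        check_unpack_to(bad)
    except ValueError:
        rejected.append(bad)
```

Here `accepted` keeps all three inputs unchanged, while each value in the second loop raises ValueError and ends up in `rejected`.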

id: Identifier property

The identifier of this ExecNode.

tag: Optional[TagOrTags] property writable

The Tag or Tags of this ExecNode.

debug: bool property writable

Whether this ExecNode is a debug Node. An ExecNode can't be both a setup Node and a debug Node.

setup: bool property writable

Whether this ExecNode is a setup Node. An ExecNode can't be both a setup Node and a debug Node.
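
The mutual exclusion of debug and setup can be sketched with two property setters that share one validation method, following the pattern in the source listing. `Flags` is a hypothetical class that isolates this pattern; it is not part of Tawazi. The `getattr` calls with a default matter because the first flag is assigned in the constructor before the second one exists.

```python
class Flags:
    """Hypothetical class reproducing only the debug/setup validation pattern."""

    def __init__(self, debug: bool = False, setup: bool = False) -> None:
        # debug is assigned first, so _validate runs before _setup exists.
        self.debug = debug
        self.setup = setup

    @property
    def debug(self) -> bool:
        return self._debug

    @debug.setter
    def debug(self, value: bool) -> None:
        if not isinstance(value, bool):
            raise TypeError(f"debug must be of type bool, but {value} provided")
        self._debug = value
        self._validate()

    @property
    def setup(self) -> bool:
        return self._setup

    @setup.setter
    def setup(self, value: bool) -> None:
        if not isinstance(value, bool):
            raise TypeError(f"setup must be of type bool, but {value} provided")
        self._setup = value
        self._validate()

    def _validate(self) -> None:
        # getattr with a default swallows the AttributeError raised by a
        # property whose backing attribute has not been assigned yet.
        if getattr(self, "debug", None) and getattr(self, "setup", None):
            raise ValueError("a node can't be a setup and a debug node at the same time")


ok = Flags(debug=True)

errors = []
try:
    Flags(debug=True, setup=True)  # rejected in the constructor
except ValueError as exc:
    errors.append(str(exc))
try:
    ok.setup = True  # rejected on later assignment as well
except ValueError as exc:
    errors.append(str(exc))
```

Both the constructor call and the later assignment raise ValueError, so the conflict is caught whichever flag is set last.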

result: Union[NoValType, Any] = NoVal instance-attribute

Internal attribute to store the result of the execution of this ExecNode (Might change!).

priority: int property writable

The priority of this ExecNode.
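
The source states only that a higher number means a higher priority and that compound_priority is adjusted during the build process; how the scheduler picks among nodes is not described on this page. As a rough sketch, assuming a scheduler that always prefers the ready node with the largest priority, the ordering would look like this. `ReadyNode` and `pick_next` are hypothetical names, not Tawazi APIs.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class ReadyNode:
    """Hypothetical record for a node whose dependencies are already satisfied."""

    id: str
    priority: int = 0


def pick_next(ready: List[ReadyNode]) -> ReadyNode:
    # Assumption: the largest number wins; max() keeps the first node among ties.
    return max(ready, key=lambda node: node.priority)


ready = [
    ReadyNode("log", priority=0),
    ReadyNode("load", priority=5),
    ReadyNode("plot", priority=2),
]

order = []
while ready:
    node = pick_next(ready)
    ready.remove(node)
    order.append(node.id)
```

With these values the nodes are picked in the order load, plot, log.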

is_sequential: bool property writable

Whether this ExecNode runs in sequential order with respect to other ExecNodes.

executed: bool property

Whether this ExecNode has been executed.