class ExecNode:
    """This class is the base executable node of the Directed Acyclic Execution Graph.

    An ExecNode is an Object that can be executed inside a DAG scheduler.
    It basically consists of a function (exec_function) that takes args and kwargs and returns a value.
    When the ExecNode is executed in the DAG, the resulting value will be stored in the
    ExecNode.result instance attribute.

    Note: This class is not meant to be instantiated directly.
    Please use `@xn` decorator.
    """

    def __init__(
        self,
        id_: Identifier,
        exec_function: Callable[..., Any] = lambda *args, **kwargs: None,
        args: Optional[List["UsageExecNode"]] = None,
        kwargs: Optional[Dict[str, "UsageExecNode"]] = None,
        priority: int = 0,
        is_sequential: bool = cfg.TAWAZI_IS_SEQUENTIAL,
        debug: bool = False,
        tag: Optional[TagOrTags] = None,
        setup: bool = False,
        unpack_to: Optional[int] = None,
        resource: Resource = cfg.TAWAZI_DEFAULT_RESOURCE,
    ):
        """Constructor of ExecNode.

        Args:
            id_ (Identifier): identifier of ExecNode.
            exec_function (Callable): the callable executed when this ExecNode runs in the graph.
                Defaults to a callable that accepts any arguments and returns None, which is useful
                to make Joining ExecNodes (Nodes that only enforce dependencies on the graph).
            args (Optional[List[UsageExecNode]], optional): *args to pass to exec_function.
            kwargs (Optional[Dict[str, UsageExecNode]], optional): **kwargs to pass to exec_function.
            priority (int): priority compared to other ExecNodes; the higher the number the higher the priority.
            is_sequential (bool): whether to execute this ExecNode in sequential order with respect to others.
                When this ExecNode must be executed, all other nodes are waited to finish before starting execution.
                Defaults to cfg.TAWAZI_IS_SEQUENTIAL (evaluated once, when this class is defined).
            debug (bool): Make this ExecNode a debug Node. Defaults to False.
            tag (Optional[TagOrTags]): Attach a Tag or a tuple of Tags to this ExecNode. Defaults to None.
            setup (bool): Make this ExecNode a setup Node. Defaults to False.
            unpack_to (Optional[int]): if not None, this ExecNode's execution must return unpacked results
                corresponding to the given value. Must be None or an int >= 0.
            resource (Resource): the resource used to execute this ExecNode.
                Defaults to cfg.TAWAZI_DEFAULT_RESOURCE (evaluated once, when this class is defined).

        Raises:
            TypeError: if tag, is_sequential, debug or setup is of the wrong type.
            ValueError: if setup and debug are both True, if priority is not an int,
                if resource is not a Resource, or if unpack_to is neither None nor an int >= 0.
                `_validate_tuple` (called for a non-None unpack_to) may raise as well.
        """
        # 1. assign attributes
        # NOTE: order matters. Most assignments below go through validating property setters:
        # the debug/setup setters call `_validate`, which reads `self.id` for its error message,
        # and the unpack_to setter reads `self.exec_function`; so `_id` and `exec_function` go first.
        self._id = id_
        self.exec_function = exec_function
        self.priority = priority
        self.is_sequential = is_sequential
        self.debug = debug
        self.tag = tag
        self.setup = setup
        self.unpack_to = unpack_to
        self.active = True
        self.resource = resource
        self.args: List[UsageExecNode] = args or []
        self.kwargs: Dict[Identifier, UsageExecNode] = kwargs or {}

        # 2. compound_priority equals priority at the start but will be modified during the build process
        self.compound_priority = priority

        # 3. Assign the name
        # This can be used in the future but is not particularly useful at the moment.
        # Not every callable has `__name__` (e.g. functools.partial objects or callable instances),
        # so fall back to the callable's type name instead of raising AttributeError.
        self.__name__ = (
            id_
            if isinstance(id_, str)
            else getattr(exec_function, "__name__", type(exec_function).__name__)
        )

        # 4. Assign a default NoVal to the result of the execution of this ExecNode;
        # when this ExecNode is executed, self.result will be overridden.
        # It would be amazing if we can remove self.result and make ExecNode immutable.
        # Internal attribute storing the result of the execution of this ExecNode (Might change!).
        # Setting it to NoVal is not strictly necessary, but it clarifies debugging.
        self.result: Union[NoValType, Any] = NoVal
        self.profile = Profile(cfg.TAWAZI_PROFILE_ALL_NODES)

    @property
    def executed(self) -> bool:
        """Whether this ExecNode has been executed (i.e. its result is no longer NoVal)."""
        return self.result is not NoVal

    def __repr__(self) -> str:
        """Human representation of the ExecNode.

        Returns:
            str: human representation of the ExecNode.
        """
        return f"{self.__class__.__name__} {self.id} ~ | <{hex(id(self))}>"

    # TODO: make cached_property ?
    @property
    def dependencies(self) -> List["UsageExecNode"]:
        """The List of ExecNode dependencies of This ExecNode.

        Returns:
            List[UsageExecNode]: a new list holding the args dependencies followed by the kwargs
                dependencies (including reserved kwargs).
        """
        return [*self.args, *self.kwargs.values()]

    @property
    def id(self) -> Identifier:
        """The identifier of this ExecNode."""
        return self._id

    @property
    def tag(self) -> Optional[TagOrTags]:
        """The Tag or Tags of this ExecNode."""
        return self._tag

    @tag.setter
    def tag(self, value: Optional[TagOrTags]) -> None:
        # Accept None, a single Tag, or a tuple made only of Tags.
        is_none = value is None
        is_tag = isinstance(value, Tag)
        is_tuple_tag = isinstance(value, tuple) and all(isinstance(v, Tag) for v in value)
        if not (is_none or is_tag or is_tuple_tag):
            raise TypeError(
                f"tag should be of type {TagOrTags} but {value} of type {type(value)} is provided"
            )
        self._tag = value

    @property
    def priority(self) -> int:
        """The priority of this ExecNode."""
        return self._priority

    @priority.setter
    def priority(self, value: int) -> None:
        if not isinstance(value, int):
            raise ValueError(f"priority must be an int, provided {type(value)}")
        self._priority = value

    @property
    def resource(self) -> Resource:
        """The resource used to run this ExecNode."""
        return self._resource

    @resource.setter
    def resource(self, value: Resource) -> None:
        if not isinstance(value, Resource):
            raise ValueError(f"resource must be of type {Resource}, provided {type(value)}")
        self._resource = value

    @property
    def is_sequential(self) -> bool:
        """Whether `ExecNode` runs in sequential order with respect to other `ExecNode`s."""
        return self._is_sequential

    @is_sequential.setter
    def is_sequential(self, value: bool) -> None:
        if not isinstance(value, bool):
            raise TypeError(f"is_sequential should be of type bool, but {value} provided")
        self._is_sequential = value

    @property
    def debug(self) -> bool:
        """Whether this ExecNode is a debug Node. ExecNode can't be setup and debug simultaneously."""
        return self._debug

    @debug.setter
    def debug(self, value: bool) -> None:
        if not isinstance(value, bool):
            raise TypeError(f"debug must be of type bool, but {value} provided")
        self._debug = value
        self._validate()

    @property
    def setup(self) -> bool:
        """Whether this ExecNode is a setup Node. ExecNode can't be setup and debug simultaneously."""
        return self._setup

    @setup.setter
    def setup(self, value: bool) -> None:
        if not isinstance(value, bool):
            raise TypeError(f"setup must be of type bool, but {value} provided")
        self._setup = value
        self._validate()

    @property
    def active(self) -> Union["UsageExecNode", bool]:
        """Whether this ExecNode is active or not.

        Returns the stored UsageExecNode as-is when one was assigned, otherwise the truthiness
        of the stored constant value.
        """
        # the value is set during the DAG description
        if isinstance(self._active, UsageExecNode):
            return self._active
        # the value set is a constant value
        return bool(self._active)

    @active.setter
    def active(self, value: Any) -> None:
        self._active = value

    def _execute(self, node_dict: Dict[Identifier, "ExecNode"]) -> Optional[Any]:
        """Execute the ExecNode inside of a DAG.

        A fresh Profile is created on every call. If this ExecNode already holds a result,
        exec_function is not called again and the stored result is returned.

        Args:
            node_dict (Dict[Identifier, ExecNode]): A shared dictionary containing the other ExecNodes in the DAG;
                the key is the id of the ExecNode. This node_dict refers to the current execution.

        Returns:
            the result of the execution of the current ExecNode.
        """
        logger.debug(f"Start executing {self.id} with task {self.exec_function}")
        self.profile = Profile(cfg.TAWAZI_PROFILE_ALL_NODES)

        if self.executed:
            logger.debug(f"Skipping execution of a pre-computed node {self.id}")
            return self.result

        # 1. resolve the concrete argument values through UsageExecNode.result(node_dict);
        #    reserved kwargs are never forwarded to exec_function
        args = [xnw.result(node_dict) for xnw in self.args]
        kwargs = {
            key: xnw.result(node_dict)
            for key, xnw in self.kwargs.items()
            if key not in RESERVED_KWARGS
        }

        # 2. run exec_function while profiling it, and store its result on this node
        with self.profile:
            self.result = self.exec_function(*args, **kwargs)

        logger.debug(f"Finished executing {self.id} with task {self.exec_function}")
        return self.result

    def _validate(self) -> None:
        # getattr with a default because this runs from the debug/setup setters during __init__,
        # before the other of the two attributes exists (the property then raises AttributeError).
        if getattr(self, "debug", None) and getattr(self, "setup", None):
            raise ValueError(
                f"The node {self.id} can't be a setup and a debug node at the same time."
            )

    @property
    def unpack_to(self) -> Optional[int]:
        """The number of elements in the unpacked results of this ExecNode.

        Returns:
            Optional[int]: the number of elements in the unpacked results of this ExecNode.
        """
        return self._unpack_to

    @unpack_to.setter
    def unpack_to(self, value: Optional[int]) -> None:
        if value is not None:
            if not isinstance(value, int):
                raise ValueError(
                    f"unpack_to must be a positive int or None, provided {type(value)}"
                )
            # 0 is accepted on purpose: yes... empty tuples exist in Python
            if value < 0:
                raise ValueError(f"unpack_to must be a positive int or None, provided {value}")
            _validate_tuple(self.exec_function, value)
        self._unpack_to = value