Skip to content

DAG

Bases: Generic[P, RVDAG]

Data Structure containing ExecNodes with interdependencies.

Please do not instantiate this class directly. Use the decorator @dag instead.

The ExecNodes can be executed in parallel with the following restrictions:
  • Limited number of threads.
  • Parallelization constraint of each ExecNode (is_sequential attribute)
Source code in tawazi/_dag/dag.py
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
class DAG(Generic[P, RVDAG]):
    """Data Structure containing ExecNodes with interdependencies.

    Please do not instantiate this class directly. Use the decorator `@dag` instead.
    The ExecNodes can be executed in parallel with the following restrictions:
        * Limited number of threads (`max_concurrency`).
        * Parallelization constraint of each ExecNode (`is_sequential` attribute)
    """

    def __init__(
        self,
        exec_nodes: Dict[Identifier, ExecNode],
        input_uxns: List[UsageExecNode],
        return_uxns: ReturnUXNsType,
        max_concurrency: int = 1,
        behavior: ErrorStrategy = ErrorStrategy.strict,
    ):
        """Constructor of the DAG. Should not be called directly. Instead use the `dag` decorator.

        Args:
            exec_nodes: all the ExecNodes
            input_uxns: all the input UsageExecNodes
            return_uxns: the return UsageExecNodes. These can be of various types: None, a single value, tuple, list, dict.
            max_concurrency: the maximal number of threads running in parallel
            behavior: specify the behavior if an ExecNode raises an Error. Three options are currently supported:
                1. DAG.STRICT: stop the execution of all the DAG
                2. DAG.ALL_CHILDREN: do not execute all children ExecNodes, and continue execution of the DAG
                3. DAG.PERMISSIVE: continue execution of the DAG and ignore the error
        """
        # goes through the property setter which validates the value
        self.max_concurrency = max_concurrency
        self.behavior = behavior
        self.return_uxns = return_uxns
        self.input_uxns = input_uxns

        # ExecNodes can be shared between Graphs, their call signatures might also be different
        # NOTE: maybe this should be transformed into a property because there is a deepcopy for node_dict...
        #  this means that there are different ExecNodes that are hanging around in the same instance of the DAG
        self.node_dict = exec_nodes
        # Compute all the tags in the DAG to reduce overhead during computation
        self.tagged_nodes = defaultdict(list)
        for xn in self.node_dict.values():
            if xn.tag:
                if isinstance(xn.tag, Tag):
                    self.tagged_nodes[xn.tag].append(xn)
                # otherwise xn.tag is an iterable of Tags: index the node under each one
                else:
                    for t in xn.tag:
                        self.tagged_nodes[t].append(xn)

        # Might be useful in the future
        self.node_dict_by_name: Dict[str, ExecNode] = {
            exec_node.__name__: exec_node for exec_node in self.node_dict.values()
        }

        # id-based dependency graph; raises if the dependencies contain a cycle
        self.graph_ids = self._build_graph()

        # precomputed direct predecessors / successors per node id, to avoid
        # repeated graph traversals during scheduling
        self.bckrd_deps = {
            id_: list(self.graph_ids.predecessors(xn.id)) for id_, xn in self.node_dict.items()
        }
        self.frwrd_deps = {
            id_: list(self.graph_ids.successors(xn.id)) for id_, xn in self.node_dict.items()
        }

        # calculate the sum of priorities of all recursive children
        self._assign_compound_priority()

        # make a valid execution sequence to run sequentially if needed
        topological_order = self.graph_ids.topologically_sorted
        self.exec_node_sequence = [self.node_dict[xn_id] for xn_id in topological_order]

        self._validate()

    @property
    def max_concurrency(self) -> int:
        """Maximal number of threads running in parallel. (will change!)."""
        return self._max_concurrency

    @max_concurrency.setter
    def max_concurrency(self, value: int) -> None:
        """Validate and store the maximal number of threads running in parallel.

        Args:
            value (int): maximum number of threads running in parallel

        Raises:
            ValueError: if value is not a positive integer
        """
        # the type is checked before the range so that a non-int (e.g. a float)
        # is reported as a type problem rather than a range problem
        if isinstance(value, int):
            if value >= 1:
                self._max_concurrency = value
                return
            raise ValueError("Invalid maximum number of threads! Must be a positive integer")
        raise ValueError("max_concurrency must be an int")

    # getters
    # getters
    def get_nodes_by_tag(self, tag: Tag) -> List[ExecNode]:
        """Return every ExecNode carrying `tag`.

        Note: the returned ExecNodes are not modified by any execution!
            This means that you can not get the result of their execution via `DAG.get_nodes_by_tag(<tag>).result`.
            In order to do that, you need to make a DAGExecution and then call `DAGExecution.get_nodes_by_tag(<tag>).result`, which will contain the results.

        Args:
            tag (Any): tag of the ExecNodes

        Returns:
            List[ExecNode]: corresponding ExecNodes

        Raises:
            TypeError: if `tag` is not a Tag
        """
        if not isinstance(tag, Tag):
            raise TypeError(f"tag {tag} must be of Tag type. Got {type(tag)}")
        # tagged_nodes is a defaultdict: an unknown tag yields an empty list
        return self.tagged_nodes[tag]

    def get_node_by_id(self, id_: Identifier) -> ExecNode:
        """Fetch the ExecNode whose identifier is `id_`.

        Note: the returned ExecNode is not modified by any execution!
            To retrieve the result of an execution, create a DAGExecution and use
            `DAGExecution.get_node_by_id(<id>).result` instead.

        Args:
            id_ (Identifier): id of the ExecNode

        Returns:
            ExecNode: corresponding ExecNode

        Raises:
            KeyError: if no ExecNode with this id exists in the DAG
        """
        # TODO: ? catch the KeyError and
        #   help the user know the id of the ExecNode by pointing to documentation!?
        return self.node_dict[id_]

    def _get_single_xn_by_alias(self, alias: Alias) -> ExecNode:
        """Resolve `alias` to exactly one ExecNode.

        Args:
            alias (Alias): the Alias to be resolved

        Raises:
            ValueError: if the Alias points to more than one ExecNode

        Returns:
            ExecNode: the ExecNode corresponding to the given Alias
        """
        # _alias_to_ids raises if the alias matches nothing, so xns is non-empty here
        xns = self._alias_to_ids(alias)
        if len(xns) > 1:
            raise ValueError(
                f"Alias {alias} is not unique. It points to {len(xns)} ExecNodes: {xns}"
            )
        return self.node_dict[xns[0]]

    # TODO: get node by usage (the order of call of an ExecNode)

    # TODO: implement None for outputs to indicate a None output ? (this is not a prioritized feature)
    # TODO: implement ellipsis for composing for the input & outputs
    # TODO: should we support kwargs when DAG.__call__ support kwargs?
    # TODO: Maybe insert an ID into DAG that is related to the dependency describing function !? just like ExecNode
    #  This will be necessary when we want to make a DAG containing DAGs besides ExecNodes
    # NOTE: by doing this, we create a new ExecNode for each input.
    #  Hence we loose all information related to the original ExecNode (tags, etc.)
    #  Maybe a better way to do this is to transform the original ExecNode into an ArgExecNode

    def compose(self, inputs: Union[Alias, Sequence[Alias]], outputs: Union[Alias, Sequence[Alias]], **kwargs: Dict[str, Any]) -> "DAG":  # type: ignore[type-arg]
        """Compose a new DAG using inputs and outputs ExecNodes (Experimental).

        All provided `Alias`es must point to unique `ExecNode`s. Otherwise ValueError is raised
        The user is responsible to correctly specify inputs and outputs signature of the `DAG`.
        * The inputs can be specified as a single `Alias` or a `Sequence` of `Alias`es.
        * The outputs can be specified as a single `Alias` (a single value is returned)
        or a `Sequence` of `Alias`es in which case a Tuple of the values are returned.
        If outputs are specified as [], () is returned.
        The syntax is the following:
        >>> from tawazi import dag, xn, DAG
        >>> from typing import Tuple, Any
        >>> @xn
        ... def unwanted_xn() -> int: return 42
        >>> @xn
        ... def x(v: Any) -> int: return int(v)
        >>> @xn
        ... def y(v: Any) -> str: return str(v)
        >>> @xn
        ... def z(x: int, y: str) -> float: return float(x) + float(y)
        >>> @dag
        ... def pipe() -> Tuple[int, float, int]:
        ...     a = unwanted_xn()
        ...     res = z(x(1), y(1))
        ...     b = unwanted_xn()
        ...     return a, res, b
        >>> composed_dag = pipe.compose([x, y], z)
        >>> assert composed_dag(1, 1) == 2.0
        >>> # composed_dag: DAG[[int, str], float] = pipe.compose([x, y], [z])  # optional typing of the returned DAG!
        >>> # assert composed_dag(1, 1) == 2.0  # type checked!


        Args:
            inputs (Alias | List[Alias]): the Inputs nodes whose results are provided.
            outputs (Alias | List[Alias]): the Output nodes that must execute last, The ones that will generate results
            **kwargs (Dict[str, Any]): additional arguments to be passed to the DAG's constructor
        """
        # NOTE(review): edge cases considered for this composition:
        # 1. inputs are less than sufficient to produce outputs (-> error)
        # 2. inputs are more than sufficient to produce outputs (-> warning)
        # 3. inputs are successors of outputs (-> error)
        # 4. inputs are successors of inputs but predecessors of outputs
        # 5. inputs and outputs are overlapping (-> error ambiguous ? maybe not)
        # 6. inputs & outputs are [] (-> ())
        # 7. constant subgraph: inputs is [], outputs is not [] but contains constants (-> works as expected)
        # 8. inputs is not [], outputs is [] -> same as 2.
        # 9. a subcase of the above (some inputs suffice to produce some of the outputs, but some inputs don't)
        # 10. advanced usage: if inputs contain ... (i.e. Ellipsis) in this case we must expand it to reach all the remaining XN in a smart manner
        #  we should keep the order of the inputs and outputs (future)
        # 11. what if some arguments have default values? should they be provided by the user?
        # 12. how to specify that arguments of the original DAG should be provided by the user? the user should provide the input's ID which is not a stable Alias yet

        def _alias_or_aliases_to_ids(
            alias_or_aliases: Union[Alias, Sequence[Alias]]
        ) -> List[Identifier]:
            """Normalize a single Alias or a Sequence of Aliases into a list of unique ExecNode ids."""
            if isinstance(alias_or_aliases, str) or isinstance(alias_or_aliases, ExecNode):
                return [self._get_single_xn_by_alias(alias_or_aliases).id]
            return [self._get_single_xn_by_alias(a_id).id for a_id in alias_or_aliases]

        def _raise_input_successor_of_input(pred: Identifier, succ: Set[Identifier]) -> NoReturn:
            """Raise for ambiguous inputs (one input depends on another input)."""
            raise ValueError(
                f"Input ExecNodes {succ} depend on Input ExecNode {pred}."
                f"this is ambiguous. Remove either one of them."
            )

        def _raise_missing_input(input_: Identifier) -> NoReturn:
            """Raise when an original-DAG input required by the outputs was not declared as an input."""
            raise ValueError(
                f"ExecNode {input_} are not declared as inputs. "
                f"Either declare them as inputs or modify the requests outputs."
            )

        def _alias_or_aliases_to_uxns(
            alias_or_aliases: Union[Alias, Sequence[Alias]]
        ) -> ReturnUXNsType:
            """Build the return UXNs: a single UsageExecNode for a single Alias, a tuple otherwise."""
            if isinstance(alias_or_aliases, str) or isinstance(alias_or_aliases, ExecNode):
                return UsageExecNode(self._get_single_xn_by_alias(alias_or_aliases).id)
            return tuple(
                UsageExecNode(self._get_single_xn_by_alias(a_id).id) for a_id in alias_or_aliases
            )

        # 1. get input ids and output ids.
        #  Alias should correspond to a single ExecNode,
        #  otherwise an ambiguous situation exists, raise error
        in_ids = _alias_or_aliases_to_ids(inputs)
        out_ids = _alias_or_aliases_to_ids(outputs)

        # 2.1 contains all the ids of the nodes that will be in the new DAG
        set_xn_ids = set(in_ids + out_ids)

        # 2.2 all ancestors of the inputs
        in_ids_ancestors: Set[Identifier] = self.graph_ids.ancestors_of_iter(in_ids)

        # 3. check edge cases
        # inputs should not be successors of inputs, otherwise (error)
        # and inputs should produce at least one of the outputs, otherwise (warning)
        for i in in_ids:
            # if input i is itself an ancestor of another input, raise error (ambiguous)
            if i in in_ids_ancestors:
                _raise_input_successor_of_input(i, set_xn_ids)

            # if i doesn't produce any of the wanted outputs, raise a warning!
            descendants: Set[Identifier] = nx.descendants(self.graph_ids, i)
            if descendants.isdisjoint(out_ids):
                warnings.warn(
                    f"Input ExecNode {i} is not used to produce any of the requested outputs."
                    f"Consider removing it from the inputs.",
                    stacklevel=2,
                )

        # 4. collect necessary ExecNodes' IDS

        # 4.1 original DAG's inputs that don't contain default values.
        # used to detect missing inputs
        dag_inputs_ids = [
            uxn.id for uxn in self.input_uxns if self.node_dict[uxn.id].result is NoVal
        ]

        # 4.2 define helper function
        def _add_missing_deps(candidate_id: Identifier, set_xn_ids: Set[Identifier]) -> None:
            """Adds missing dependency to the set of ExecNodes that will be in the new DAG.

            Note: uses nonlocal variable dag_inputs_ids

            Args:
                candidate_id (Identifier): candidate id of an `ExecNode` that will be in the new DAG
                set_xn_ids (Set[Identifier]): Set of `ExecNode`s that will be in the new DAG
            """
            preds = self.graph_ids.predecessors(candidate_id)
            for pred in preds:
                if pred not in set_xn_ids:
                    # this candidate is necessary to produce the output,
                    # it is an input to the original DAG
                    # it is not provided as an input to the composed DAG
                    # hence the user forgot to supply it! (raise error)
                    if pred in dag_inputs_ids:
                        _raise_missing_input(pred)

                    # necessary intermediate dependency.
                    # collect it in the set
                    set_xn_ids.add(pred)
                    _add_missing_deps(pred, set_xn_ids)

        # 4.3 add all required dependencies for each output
        for o_id in out_ids:
            _add_missing_deps(o_id, set_xn_ids)

        # 5.1 copy the ExecNodes that will be in the composed DAG because
        #  maybe the composed DAG will modify them (e.g. change their tags)
        #  and we don't want to modify the original DAG
        xn_dict = {xn_id: copy(self.node_dict[xn_id]) for xn_id in set_xn_ids}

        # 5.2 change the inputs of the ExecNodes into ArgExecNodes
        for xn_id, xn in xn_dict.items():
            if xn_id in in_ids:
                logger.debug("changing Composed-DAG's input {} into ArgExecNode", xn_id)
                xn.__class__ = ArgExecNode
                xn.exec_function = _make_raise_arg_error("composed", xn.id)
                # eliminate all dependencies
                xn.args = []
                xn.kwargs = {}

        # 5.3 make the inputs and outputs UXNs for the composed DAG
        in_uxns = [UsageExecNode(xn_id) for xn_id in in_ids]
        # if a single value is returned make the output a single value
        out_uxns = _alias_or_aliases_to_uxns(outputs)

        # 6. return the composed DAG
        # ignore[arg-type] because the type of the kwargs is not known
        return DAG(xn_dict, in_uxns, out_uxns, **kwargs)  # type: ignore[arg-type]

    def _build_graph(self) -> DiGraphEx:
        """Construct the id-based dependency graph for the computation.

        Returns:
            DiGraphEx: a directed graph whose nodes are ExecNode ids and whose
                edges point from a dependency to its dependent ExecNode.

        Raises:
            NetworkXUnfeasible: if the graph has cycles
        """
        graph_ids = DiGraphEx()

        # nodes first: an ExecNode with no dependencies must still appear in the graph
        for node_id in self.node_dict:
            graph_ids.add_node(node_id)

        # one edge per (dependency -> dependent) pair
        for xn in self.node_dict.values():
            graph_ids.add_edges_from((dep.id, xn.id) for dep in xn.dependencies)

        # circular dependencies can not be scheduled: reject them up front
        cycle = graph_ids._find_cycle()
        if cycle:
            raise NetworkXUnfeasible(
                f"the product contains at least a circular dependency: {cycle}"
            )
        return graph_ids

    def _validate(self) -> None:
        """Check DAG usage constraints (currently: setup ExecNodes must not consume DAG inputs)."""
        # set for O(1) membership tests while scanning dependencies
        input_ids = {uxn.id for uxn in self.input_uxns}
        # validate setup ExecNodes
        for xn in self.node_dict.values():
            if not xn.setup:
                continue
            if any(dep.id in input_ids for dep in xn.dependencies):
                raise TawaziUsageError(
                    f"The ExecNode {xn} takes as parameters one of the DAG's input parameter"
                )
        # future validations...

    def _assign_compound_priority(self) -> None:
        """Assigns a compound priority to all nodes in the graph.

        The compound priority is the sum of the priorities of all children recursively.
        """
        # work on a copy: the graph is pruned leaf-by-leaf during the computation
        pruned_graph = deepcopy(self.graph_ids)

        # propagate priorities bottom-up, one layer ("epoch") of leaves at a time:
        # every leaf adds its compound priority to each of its direct parents,
        # then the leaves are removed so the parents become the next leaves.
        while len(pruned_graph) > 0:
            current_leaves = pruned_graph.leaf_nodes
            for leaf_id in current_leaves:
                # a leaf is never the parent of another leaf in the same epoch,
                # so its compound priority is stable and can be read once here
                leaf_priority = self.node_dict[leaf_id].compound_priority

                for parent_id in self.bckrd_deps[leaf_id]:
                    self.node_dict[parent_id].compound_priority += leaf_priority

                # trim the graph from its leaf nodes
                pruned_graph.remove_node(leaf_id)

    def draw(self, k: float = 0.8, display: bool = True, t: int = 3) -> None:
        """Draws the Networkx directed graph.

        Args:
            k (float): parameter for the layout of the graph, the higher, the further the nodes apart. Defaults to 0.8.
            display (bool): display the layout created. Defaults to True.
            t (int): time to display in seconds. Defaults to 3.
        """
        import matplotlib.pyplot as plt

        # TODO: use graphviz instead! it is much more elegant

        # fixed seed so the same DAG is always laid out the same way
        layout = nx.spring_layout(self.graph_ids, seed=42069, k=k, iterations=20)
        nx.draw(self.graph_ids, layout, with_labels=True)
        if not display:
            return
        plt.ion()
        plt.show()
        time.sleep(t)
        plt.close()

    def _execute(
        self,
        graph: DiGraphEx,
        modified_node_dict: Optional[Dict[str, ExecNode]] = None,
        call_id: str = "",
    ) -> Dict[Identifier, Any]:
        """Run the scheduler over `graph`, forwarding this DAG's configuration.

        Thin wrapper around the module-level `execute` function: passes along the
        DAG's node_dict, max_concurrency and error-handling behavior.

        Args:
            graph (DiGraphEx): the (sub)graph of ExecNode ids to execute
            modified_node_dict (Optional[Dict[str, ExecNode]]): call-specific copies of the
                ExecNodes (e.g. with call arguments filled in); presumably `execute` falls
                back to `self.node_dict` when None — confirm against `execute`
            call_id (str): identifier of this call — NOTE(review): semantics defined by
                `execute`; verify there

        Returns:
            Dict[Identifier, Any]: results of the execution keyed by ExecNode id
        """
        return execute(
            node_dict=self.node_dict,
            max_concurrency=self.max_concurrency,
            behavior=self.behavior,
            graph=graph,
            modified_node_dict=modified_node_dict,
            call_id=call_id,
        )

    def _alias_to_ids(self, alias: Alias) -> List[Identifier]:
        """Extract ExecNode IDs from an Alias (Tag, ExecNode ID or ExecNode).

        Args:
            alias (Alias): an Alias (Tag, ExecNode ID or ExecNode)

        Returns:
            The corresponding ExecNode IDs

        Raises:
            ValueError: if a requested ExecNode is not found in the DAG
            TawaziTypeError: if the Type of the identifier is not Tag, Identifier or ExecNode
        """
        # case 1: an ExecNode object identifies itself by its id
        if isinstance(alias, ExecNode):
            if alias.id not in self.node_dict:
                raise ValueError(f"ExecNode {alias} not found in DAG")
            return [alias.id]

        # anything that is neither ExecNode nor str/tuple is not a valid Alias
        if not isinstance(alias, (Identifier, tuple)):
            raise TawaziTypeError(
                "target_nodes must be of type ExecNode, "
                f"str or tuple identifying the node but provided {alias}"
            )

        # todo: do further validation for the case of the tag!!
        # case 2: a Tag (highest priority in case an id with the same value exists)
        tagged = self.tagged_nodes.get(alias)
        if tagged:
            return [xn.id for xn in tagged]

        # case 3: a node id
        if isinstance(alias, Identifier) and alias in self.node_dict:
            return [self.get_node_by_id(alias).id]

        raise ValueError(
            f"node or tag {alias} not found in DAG.\n"
            f" Available nodes are {self.node_dict}.\n"
            f" Available tags are {list(self.tagged_nodes.keys())}"
        )

    # NOTE: this function is named wrongly!
    # NOTE: this function is named wrongly!
    def _get_target_ids(self, target_nodes: Sequence[Alias]) -> List[Identifier]:
        """Resolve every Alias in `target_nodes` into a flat list of ExecNode ids.

        Args:
            target_nodes (Optional[List[Alias]]): list of a ExecNode Aliases that the user might provide to run a subgraph

        Returns:
            List[Identifier]: Leaf ExecNodes' Identities
        """
        # a single Alias may resolve to several ids (e.g. a Tag shared by many nodes)
        ids: List[Identifier] = []
        for alias in target_nodes:
            ids.extend(self._alias_to_ids(alias))
        return ids

    def _extend_leaves_ids_debug_xns(self, leaves_ids: List[Identifier]) -> List[Identifier]:
        """Extend the leaves_ids to contain all runnable debug node ids.

        For example:
        A
        |
        B
        | \
        D E

        if D is not a debug ExecNode and E is a debug ExecNode.
        If the subgraph whose leaf ExecNode D is executed,
        E should also be included in the execution because it can be executed (debug node whose inputs are provided)
        Hence we should extend the subgraph containing only D to also contain E

        Args:
            leaves_ids (List[Identifier]): the leaves ids of the subgraph

        Returns:
            List[Identifier]: the leaves ids of the new extended subgraph that contains more debug ExecNodes
        """
        # fixed-point iteration: including one debug XN may make further debug XNs runnable
        found_new_debug_xn = True
        while found_new_debug_xn:
            found_new_debug_xn = False
            for id_ in leaves_ids:
                for successor_id in self.frwrd_deps[id_]:
                    # skip nodes already included and non-debug successors
                    if successor_id in leaves_ids or not self.node_dict[successor_id].debug:
                        continue
                    # candidate debug XN: include it only if all of its predecessors
                    # already run within the current subgraph
                    if set(self.bckrd_deps[successor_id]).issubset(leaves_ids):
                        found_new_debug_xn = True
                        leaves_ids.append(successor_id)
        return leaves_ids

    def setup(
        self,
        target_nodes: Optional[Sequence[Alias]] = None,
        exclude_nodes: Optional[Sequence[Alias]] = None,
    ) -> None:
        """Run the setup ExecNodes for the DAG.

        If target_nodes are provided, run only the necessary setup ExecNodes, otherwise will run all setup ExecNodes.
        NOTE: `DAG` arguments should not be passed to setup ExecNodes.
            Only pass in constants or setup `ExecNode`s results.


        Args:
            target_nodes (Optional[List[XNId]], optional): The ExecNodes that the user aims to use in the DAG.
                This might include setup or non setup ExecNodes. If None is provided, will run all setup ExecNodes. Defaults to None.
            exclude_nodes (Optional[List[XNId]], optional): The ExecNodes that the user aims to exclude from the DAG.
                The user is responsible for ensuring that the overlapping between the target_nodes and exclude_nodes is logical.
        """
        # 1. gather the setup ExecNodes; deliberately NOT copied because their
        #  results must persist on this DAG instance across executions
        all_setup_nodes = {id_: xn for id_, xn in self.node_dict.items() if xn.setup}

        if target_nodes is None:
            # 2.a no targets provided: execute every setup ExecNode
            graph = self._make_subgraph(list(all_setup_nodes.keys()), exclude_nodes)
        else:
            # 2.b resolve the requested targets; these may include non setup nodes,
            #  so first build the full subgraph needed to run them...
            target_ids = self._get_target_ids(target_nodes)
            graph = self._make_subgraph(target_ids, exclude_nodes)

            # ...then strip everything that is not a setup ExecNode
            non_setup_ids = [id_ for id_ in graph if id_ not in all_setup_nodes]
            for id_ in non_setup_ids:
                graph.remove_node(id_)
        # TODO: handle debug XNs!

        self._execute(graph)

    def executor(self, **kwargs: Any) -> "DAGExecution[P, RVDAG]":
        """Build a `DAGExecution` bound to this DAG.

        Args:
            **kwargs (Any): keyword arguments forwarded to DAGExecution's constructor

        Returns:
            DAGExecution: an executor for the DAG
        """
        return DAGExecution(self, **kwargs)

    def _make_subgraph(
        self,
        target_nodes: Optional[Sequence[Alias]] = None,
        exclude_nodes: Optional[Sequence[Alias]] = None,
    ) -> nx.DiGraph:
        """Build the graph of ExecNode ids that an execution should run.

        Starts from a copy of the full graph, restricts it to `target_nodes` (if given,
        via `subgraph_leaves` — presumably keeping only those leaves and their ancestors;
        confirm against DiGraphEx), recursively removes each of `exclude_nodes` together
        with its successors (if given), then either extends the graph with runnable debug
        ExecNodes (when cfg.RUN_DEBUG_NODES) or strips all debug ExecNodes.

        Args:
            target_nodes (Optional[Sequence[Alias]]): aliases of the nodes to target; None keeps the whole graph
            exclude_nodes (Optional[Sequence[Alias]]): aliases to remove together with all their successors

        Returns:
            nx.DiGraph: the pruned (and possibly debug-extended) graph of ExecNode ids

        Raises:
            TawaziUsageError: if a targeted node was removed because of exclude_nodes
        """
        # copy: the graph is destructively pruned below
        graph = deepcopy(self.graph_ids)

        if target_nodes is not None:
            target_ids = self._get_target_ids(target_nodes)
            graph.subgraph_leaves(target_ids)

        if exclude_nodes is not None:
            exclude_ids = list(chain(*(self._alias_to_ids(alias) for alias in exclude_nodes)))

            for id_ in exclude_ids:
                # maybe previously removed by :
                # 1. not being inside the subgraph
                # 2. being a successor of an excluded node
                if id_ in graph:
                    graph.remove_recursively(id_)

        # both truthy (non-empty) => targets may have been excluded; that is a user error
        if target_nodes and exclude_nodes:
            for id_ in target_ids:
                if id_ not in graph:
                    raise TawaziUsageError(
                        f"target_nodes include {id_} which is removed by exclude_nodes: {exclude_ids}, "
                        f"please verify that they don't overlap in a non logical way!"
                    )

        # handle debug nodes
        if cfg.RUN_DEBUG_NODES:
            leaves_ids = graph.leaf_nodes
            # after extending leaves_ids, we should do a recheck because this might recreate another debug-able XN...
            target_ids = self._extend_leaves_ids_debug_xns(leaves_ids)

            # extend the graph with the debug XNs
            # This is not efficient but it is ok since we are debugging the code anyways
            debug_graph = deepcopy(self.graph_ids)
            debug_graph.subgraph_leaves(list(graph.nodes) + target_ids)
            graph = debug_graph

        # 3. clean all debug XNs if they shouldn't run!
        else:
            to_remove = [id_ for id_ in graph if self.node_dict[id_].debug]
            for id_ in to_remove:
                graph.remove_node(id_)

        return graph

    def __call__(self, *args: P.args, **kwargs: P.kwargs) -> RVDAG:
        """Execute the DAG scheduler via a similar interface to the function that describes the dependencies.

        Note: Currently kwargs are not supported.
            They will supported soon!

        Args:
            *args (P.args): arguments to be passed to the call of the DAG
            **kwargs (P.kwargs): keyword arguments to be passed to the call of the DAG

        Returns:
            RVDAG: return value of the DAG's execution

        Raises:
            TawaziUsageError: kwargs are passed
        """
        if kwargs:
            raise TawaziUsageError(f"currently DAG does not support keyword arguments: {kwargs}")

        # build the subgraph of everything a full execution must run
        graph = self._make_subgraph()

        # fresh copies of the ExecNodes, loaded with this call's arguments
        call_xn_dict = self._make_call_xn_dict(*args)

        # run the scheduler over the copied nodes
        all_nodes_dict = self._execute(graph, call_xn_dict)

        # map the executed nodes onto the declared return value(s)
        return self._get_return_values(all_nodes_dict)  # type: ignore[return-value]

    def _make_call_xn_dict(self, *args: Any) -> Dict[Identifier, ExecNode]:
        """Build the dict of ExecNodes that a single DAG call will execute and mutate.

        The returned dict:
         1. deep copies the ExecNodes (so a call never mutates the DAG's originals),
         2. fills in the provided positional input arguments,
         3. skips the copy for setup ExecNodes (they are shared per DAG instance).

        Args:
            *args (Any): positional arguments of the DAG call

        Returns:
            Dict[Identifier, ExecNode]: the ExecNode dict that the scheduler will run.

        Raises:
            TypeError: if more arguments are provided than the DAG accepts
        """
        # copy everything except setup nodes, which must stay shared
        xn_copies = copy_non_setup_xns(self.node_dict)

        # too many arguments is always an error; too few is allowed here
        # (defaults apply, and truly missing inputs fail later inside the
        # DAG's execution through the raise_err lambda)
        if len(args) > len(self.input_uxns):
            raise TypeError(
                f"The DAG takes a maximum of {len(self.input_uxns)} arguments. {len(args)} arguments provided"
            )

        # pair each provided argument with its input ExecNode, in declaration order
        for input_uxn, value in zip(self.input_uxns, args):
            xn_copies[input_uxn.id].result = value

        return xn_copies

    def _get_return_values(self, xn_dict: Dict[Identifier, ExecNode]) -> RVTypes:
        """Extract the return value/values from the output of the DAG's scheduler!

        Args:
            xn_dict (Dict[Identifier, ExecNode]): Modified ExecNodes returned by the DAG's scheduler

        Raises:
            TawaziTypeError: if the type of the return value is not compatible with RVTypes

        Returns:
            RVTypes: the actual values extracted from xn_dict
        """
        return_uxns = self.return_uxns
        # the DAG declared no return value
        if return_uxns is None:
            return None
        # a single returned value
        if isinstance(return_uxns, UsageExecNode):
            return return_uxns.result(xn_dict)
        # multiple returned values: preserve the declared container type
        if isinstance(return_uxns, tuple):
            return tuple(uxn.result(xn_dict) for uxn in return_uxns)
        if isinstance(return_uxns, list):
            return [uxn.result(xn_dict) for uxn in return_uxns]
        if isinstance(return_uxns, dict):
            return {key: uxn.result(xn_dict) for key, uxn in return_uxns.items()}

        # bug fix: dict returns are supported (branch above) but the old message omitted Dict
        raise TawaziTypeError("Return type for the DAG can only be a single value, Tuple, List or Dict")

    # NOTE: this function should be used in case there was a bizarre behavior noticed during
    #   the execution of the DAG via DAG.execute(...)
    def _safe_execute(
        self,
        *args: Any,
        target_nodes: Optional[List[Alias]] = None,
        exclude_nodes: Optional[List[Alias]] = None,
    ) -> Any:
        """Run the ExecNodes sequentially in topological order, ignoring priorities (Experimental, for debugging).

        Args:
            *args (Any): Positional arguments passed to the DAG
            target_nodes (Optional[List[Alias]]): the ExecNodes that should be considered to construct the subgraph
            exclude_nodes (Optional[List[Alias]]): the ExecNodes that shouldn't run

        Returns:
            Any: the result of the execution of the DAG.
             If an ExecNode returns a value in the DAG but is not executed, it will return None.
        """
        # build the subgraph of ids that should actually run
        subgraph = self._make_subgraph(target_nodes, exclude_nodes)

        # deep-copied ExecNodes that will receive the results
        xns = self._make_call_xn_dict(*args)

        # plain for-loop execution: topological order guarantees dependencies run first
        for node_id in subgraph.topologically_sorted:
            xns[node_id]._execute(xns)

        # assemble the declared return value(s)
        return self._get_return_values(xns)

    def config_from_dict(self, config: Dict[str, Any]) -> None:
        """Allows reconfiguring the parameters of the nodes from a dictionary.

        Args:
            config (Dict[str, Any]): the dictionary containing the config
                example: {"nodes": {"a": {"priority": 3, "is_sequential": True}}, "max_concurrency": 3}

        Raises:
            ValueError: if two nodes are configured by the provided config (which is ambiguous)
        """

        def _apply_node_config(node: ExecNode, node_conf: Dict[str, Any]) -> bool:
            # returns True iff the node's priority changed (caller must then
            # recompute the compound priorities)
            if "is_sequential" in node_conf:
                node.is_sequential = node_conf["is_sequential"]

            if "priority" in node_conf:
                node.priority = node_conf["priority"]
                return True

            return False

        priority_changed = False
        seen: Dict[str, Any] = {}
        for alias, conf_node in config.get("nodes", {}).items():
            for node_id in self._alias_to_ids(alias):
                # the same node reached through two aliases is ambiguous
                if node_id in seen:
                    raise ValueError(
                        f"trying to set two configs for node {node_id}.\n 1) {seen[node_id]}\n 2) {conf_node}"
                    )
                changed = _apply_node_config(self.get_node_by_id(node_id), conf_node)
                priority_changed = priority_changed or changed
                seen[node_id] = conf_node

        if "max_concurrency" in config:
            self.max_concurrency = config["max_concurrency"]

        if priority_changed:
            # priorities changed -> compound priorities must be recomputed
            self._assign_compound_priority()

    def config_from_yaml(self, config_path: str) -> None:
        """Allows reconfiguring the parameters of the nodes from a YAML file.

        Args:
            config_path: the path to the YAML file
        """
        with open(config_path) as stream:
            # _UniqueKeyLoader rejects duplicate keys instead of silently overwriting
            parsed_config = yaml.load(stream, Loader=_UniqueKeyLoader)  # noqa: S506
        self.config_from_dict(parsed_config)

    def config_from_json(self, config_path: str) -> None:
        """Allows reconfiguring the parameters of the nodes from a JSON file.

        Args:
            config_path: the path to the JSON file
        """
        with open(config_path) as stream:
            parsed_config = json.load(stream)
        self.config_from_dict(parsed_config)

__call__(*args, **kwargs)

Execute the DAG scheduler via a similar interface to the function that describes the dependencies.

Currently kwargs are not supported.

They will be supported soon!

Parameters:

Name Type Description Default
*args P.args

arguments to be passed to the call of the DAG

()
**kwargs P.kwargs

keyword arguments to be passed to the call of the DAG

{}

Returns:

Name Type Description
RVDAG RVDAG

return value of the DAG's execution

Raises:

Type Description
TawaziUsageError

kwargs are passed

Source code in tawazi/_dag/dag.py
def __call__(self, *args: P.args, **kwargs: P.kwargs) -> RVDAG:
    """Execute the DAG scheduler via a similar interface to the function that describes the dependencies.

    Note: Currently kwargs are not supported.
        They will be supported soon!

    Args:
        *args (P.args): arguments to be passed to the call of the DAG
        **kwargs (P.kwargs): keyword arguments to be passed to the call of the DAG

    Returns:
        RVDAG: return value of the DAG's execution

    Raises:
        TawaziUsageError: kwargs are passed
    """
    # fail fast: no kwarg -> input-ExecNode mapping exists yet
    if kwargs:
        raise TawaziUsageError(f"currently DAG does not support keyword arguments: {kwargs}")
    # 1. generate the subgraph to be executed
    graph = self._make_subgraph()

    # 2. copy the ExecNodes
    call_xn_dict = self._make_call_xn_dict(*args)

    # 3. Execute the scheduler
    all_nodes_dict = self._execute(graph, call_xn_dict)

    # 4. extract the returned value/values
    return self._get_return_values(all_nodes_dict)  # type: ignore[return-value]

config_from_dict(config)

Allows reconfiguring the parameters of the nodes from a dictionary.

Parameters:

Name Type Description Default
config Dict[str, Any]

the dictionary containing the config example: {"nodes": {"a": {"priority": 3, "is_sequential": True}}, "max_concurrency": 3}

required

Raises:

Type Description
ValueError

if two nodes are configured by the provided config (which is ambiguous)

Source code in tawazi/_dag/dag.py
def config_from_dict(self, config: Dict[str, Any]) -> None:
    """Allows reconfiguring the parameters of the nodes from a dictionary.

    Args:
        config (Dict[str, Any]): the dictionary containing the config
            example: {"nodes": {"a": {"priority": 3, "is_sequential": True}}, "max_concurrency": 3}

    Raises:
        ValueError: if two nodes are configured by the provided config (which is ambiguous)
    """

    # NOTE(review): the `cfg` parameter shadows the module-level tawazi config object
    #   used elsewhere in this file — consider renaming it.
    def _override_node_config(n: ExecNode, cfg: Dict[str, Any]) -> bool:
        # returns True iff the node's priority changed (caller must then
        # recompute the compound priorities)
        if "is_sequential" in cfg:
            n.is_sequential = cfg["is_sequential"]

        if "priority" in cfg:
            n.priority = cfg["priority"]
            return True

        return False

    prio_flag = False
    visited: Dict[str, Any] = {}
    if "nodes" in config:
        for alias, conf_node in config["nodes"].items():
            ids_ = self._alias_to_ids(alias)
            for node_id in ids_:
                if node_id not in visited:
                    node = self.get_node_by_id(node_id)
                    node_prio_flag = _override_node_config(node, conf_node)
                    prio_flag = node_prio_flag or prio_flag  # keep track of flag

                else:
                    # the same node was already configured via another alias: ambiguous
                    raise ValueError(
                        f"trying to set two configs for node {node_id}.\n 1) {visited[node_id]}\n 2) {conf_node}"
                    )

                visited[node_id] = conf_node

    if "max_concurrency" in config:
        self.max_concurrency = config["max_concurrency"]

    if prio_flag:
        # if we changed the priority of some nodes we need to recompute the compound prio
        self._assign_compound_priority()

config_from_json(config_path)

Allows reconfiguring the parameters of the nodes from a JSON file.

Parameters:

Name Type Description Default
config_path str

the path to the JSON file

required
Source code in tawazi/_dag/dag.py
def config_from_json(self, config_path: str) -> None:
    """Allows reconfiguring the parameters of the nodes from a JSON file.

    Args:
        config_path: the path to the JSON file
    """
    with open(config_path) as stream:
        parsed_config = json.load(stream)
    self.config_from_dict(parsed_config)

config_from_yaml(config_path)

Allows reconfiguring the parameters of the nodes from a YAML file.

Parameters:

Name Type Description Default
config_path str

the path to the YAML file

required
Source code in tawazi/_dag/dag.py
def config_from_yaml(self, config_path: str) -> None:
    """Allows reconfiguring the parameters of the nodes from a YAML file.

    Args:
        config_path: the path to the YAML file
    """
    with open(config_path) as stream:
        # _UniqueKeyLoader rejects duplicate keys instead of silently overwriting
        parsed_config = yaml.load(stream, Loader=_UniqueKeyLoader)  # noqa: S506
    self.config_from_dict(parsed_config)

draw(k=0.8, display=True, t=3)

Draws the Networkx directed graph.

Parameters:

Name Type Description Default
k float

parameter for the layout of the graph, the higher, the further the nodes apart. Defaults to 0.8.

0.8
display bool

display the layout created. Defaults to True.

True
t int

time to display in seconds. Defaults to 3.

3
Source code in tawazi/_dag/dag.py
def draw(self, k: float = 0.8, display: bool = True, t: int = 3) -> None:
    """Render the DAG's underlying networkx directed graph.

    Args:
        k (float): spring-layout spacing factor; the higher, the further the nodes apart. Defaults to 0.8.
        display (bool): whether to show the created figure. Defaults to True.
        t (int): how long to keep the figure displayed, in seconds. Defaults to 3.
    """
    import matplotlib.pyplot as plt

    # TODO: use graphviz instead! it is much more elegant

    layout = nx.spring_layout(self.graph_ids, seed=42069, k=k, iterations=20)
    nx.draw(self.graph_ids, layout, with_labels=True)
    if not display:
        return
    plt.ion()
    plt.show()
    time.sleep(t)
    plt.close()

executor(**kwargs)

Generates a DAGExecution for the DAG.

Parameters:

Name Type Description Default
**kwargs Any

keyword arguments to be passed to DAGExecution's constructor

{}

Returns:

Name Type Description
DAGExecution DAGExecution[P, RVDAG]

an executor for the DAG

Source code in tawazi/_dag/dag.py
def executor(self, **kwargs: Any) -> "DAGExecution[P, RVDAG]":
    """Build a DAGExecution wrapping this DAG.

    Args:
        **kwargs (Any): keyword arguments forwarded verbatim to DAGExecution's constructor

    Returns:
        DAGExecution: an executor for the DAG
    """
    return DAGExecution(self, **kwargs)

get_node_by_id(id_)

Get the ExecNode with the given id.

the returned ExecNode is not modified by any execution!

This means that you can not get the result of its execution via DAG.get_node_by_id(<id>).result. In order to do that, you need to make a DAGExecution and then call DAGExecution.get_node_by_id(<id>).result, which will contain the results.

Parameters:

Name Type Description Default
id_ Identifier

id of the ExecNode

required

Returns:

Name Type Description
ExecNode ExecNode

corresponding ExecNode

Source code in tawazi/_dag/dag.py
def get_node_by_id(self, id_: Identifier) -> ExecNode:
    """Get the ExecNode with the given id.

    Note: the returned ExecNode is not modified by any execution!
        This means that you can not get the result of its execution via `DAG.get_node_by_id(<id>).result`.
        In order to do that, you need to make a DAGExecution and then call `DAGExecution.get_node_by_id(<id>).result`, which will contain the results.

    Args:
        id_ (Identifier): id of the ExecNode

    Returns:
        ExecNode: corresponding ExecNode

    Raises:
        KeyError: if no ExecNode with the given id exists in the DAG
    """
    try:
        return self.node_dict[id_]
    except KeyError:
        # implements the old TODO: keep the exception type callers may catch,
        # but point the user to the valid ids instead of a bare KeyError
        raise KeyError(
            f"No ExecNode with id {id_!r} in the DAG. Valid ids: {sorted(self.node_dict)}"
        ) from None

get_nodes_by_tag(tag)

Get the ExecNodes with the given tag.

the returned ExecNode is not modified by any execution!

This means that you can not get the result of its execution via DAG.get_nodes_by_tag(<tag>).result. In order to do that, you need to make a DAGExecution and then call DAGExecution.get_nodes_by_tag(<tag>).result, which will contain the results.

Parameters:

Name Type Description Default
tag Any

tag of the ExecNodes

required

Returns:

Type Description
List[ExecNode]

List[ExecNode]: corresponding ExecNodes

Source code in tawazi/_dag/dag.py
def get_nodes_by_tag(self, tag: Tag) -> List[ExecNode]:
    """Get the ExecNodes with the given tag.

    Note: the returned ExecNode is not modified by any execution!
        This means that you can not get the result of its execution via `DAG.get_nodes_by_tag(<tag>).result`.
        In order to do that, you need to make a DAGExecution and then call `DAGExecution.get_nodes_by_tag(<tag>).result`, which will contain the results.

    Args:
        tag (Any): tag of the ExecNodes

    Returns:
        List[ExecNode]: corresponding ExecNodes

    Raises:
        TypeError: if `tag` is not of Tag type
    """
    # guard clause: reject anything that is not a Tag up front
    if not isinstance(tag, Tag):
        raise TypeError(f"tag {tag} must be of Tag type. Got {type(tag)}")
    return self.tagged_nodes[tag]

setup(target_nodes=None, exclude_nodes=None)

Run the setup ExecNodes for the DAG.

If target_nodes are provided, run only the necessary setup ExecNodes, otherwise will run all setup ExecNodes.

DAG arguments should not be passed to setup ExecNodes.

Only pass in constants or setup ExecNodes results.

Parameters:

Name Type Description Default
target_nodes Optional[List[XNId]]

The ExecNodes that the user aims to use in the DAG. This might include setup or non setup ExecNodes. If None is provided, will run all setup ExecNodes. Defaults to None.

None
exclude_nodes Optional[List[XNId]]

The ExecNodes that the user aims to exclude from the DAG. The user is responsible for ensuring that the overlapping between the target_nodes and exclude_nodes is logical.

None
Source code in tawazi/_dag/dag.py
def setup(
    self,
    target_nodes: Optional[Sequence[Alias]] = None,
    exclude_nodes: Optional[Sequence[Alias]] = None,
) -> None:
    """Run the setup ExecNodes for the DAG.

    If target_nodes are provided, run only the necessary setup ExecNodes, otherwise will run all setup ExecNodes.
    NOTE: `DAG` arguments should not be passed to setup ExecNodes.
        Only pass in constants or setup `ExecNode`s results.


    Args:
        target_nodes (Optional[List[XNId]], optional): The ExecNodes that the user aims to use in the DAG.
            This might include setup or non setup ExecNodes. If None is provided, will run all setup ExecNodes. Defaults to None.
        exclude_nodes (Optional[List[XNId]], optional): The ExecNodes that the user aims to exclude from the DAG.
            The user is responsible for ensuring that the overlapping between the target_nodes and exclude_nodes is logical.
    """
    # setup nodes are intentionally NOT copied:
    #  they must be modified in place, per DAG instance
    setup_xns = {id_: xn for id_, xn in self.node_dict.items() if xn.setup}

    if target_nodes is None:
        # no targets provided: run every setup ExecNode
        subgraph = self._make_subgraph(list(setup_xns), exclude_nodes)
    else:
        # resolve the requested aliases; they may include non setup nodes,
        # so build the full subgraph needed to run the targets first ...
        requested_ids = self._get_target_ids(target_nodes)
        subgraph = self._make_subgraph(requested_ids, exclude_nodes)

        # ... then drop every node that is not a setup node
        non_setup_ids = [id_ for id_ in subgraph if id_ not in setup_xns]
        for id_ in non_setup_ids:
            subgraph.remove_node(id_)
    # TODO: handle debug XNs!

    self._execute(subgraph)

compose(inputs, outputs, **kwargs)

Compose a new DAG using inputs and outputs ExecNodes (Experimental).

All provided Aliases must point to unique ExecNodes. Otherwise ValueError is raised The user is responsible to correctly specify inputs and outputs signature of the DAG. * The inputs can be specified as a single Alias or a Sequence of Aliases. * The outputs can be specified as a single Alias (a single value is returned) or a Sequence of Aliases in which case a Tuple of the values are returned. If outputs are specified as [], () is returned. The syntax is the following:

```python
>>> from tawazi import dag, xn, DAG
>>> from typing import Tuple, Any
>>> @xn
... def unwanted_xn() -> int: return 42
>>> @xn
... def x(v: Any) -> int: return int(v)
>>> @xn
... def y(v: Any) -> str: return str(v)
>>> @xn
... def z(x: int, y: str) -> float: return float(x) + float(y)
>>> @dag
... def pipe() -> Tuple[int, float, int]:
...     a = unwanted_xn()
...     res = z(x(1), y(1))
...     b = unwanted_xn()
...     return a, res, b
>>> composed_dag = pipe.compose([x, y], z)
>>> assert composed_dag(1, 1) == 2.0
>>> # composed_dag: DAG[[int, str], float] = pipe.compose([x, y], [z])  # optional typing of the returned DAG!
>>> # assert composed_dag(1, 1) == 2.0  # type checked!
```

Parameters:

Name Type Description Default
inputs Alias | List[Alias]

the Inputs nodes whose results are provided.

required
outputs Alias | List[Alias]

the Output nodes that must execute last, The ones that will generate results

required
**kwargs Dict[str, Any]

additional arguments to be passed to the DAG's constructor

{}
Source code in tawazi/_dag/dag.py
def compose(self, inputs: Union[Alias, Sequence[Alias]], outputs: Union[Alias, Sequence[Alias]], **kwargs: Dict[str, Any]) -> "DAG":  # type: ignore[type-arg]
    """Compose a new DAG using inputs and outputs ExecNodes (Experimental).

    All provided `Alias`es must point to unique `ExecNode`s. Otherwise ValueError is raised
    The user is responsible to correctly specify inputs and outputs signature of the `DAG`.
    * The inputs can be specified as a single `Alias` or a `Sequence` of `Alias`es.
    * The outputs can be specified as a single `Alias` (a single value is returned)
    or a `Sequence` of `Alias`es in which case a Tuple of the values are returned.
    If outputs are specified as [], () is returned.
    The syntax is the following:
    >>> from tawazi import dag, xn, DAG
    >>> from typing import Tuple, Any
    >>> @xn
    ... def unwanted_xn() -> int: return 42
    >>> @xn
    ... def x(v: Any) -> int: return int(v)
    >>> @xn
    ... def y(v: Any) -> str: return str(v)
    >>> @xn
    ... def z(x: int, y: str) -> float: return float(x) + float(y)
    >>> @dag
    ... def pipe() -> Tuple[int, float, int]:
    ...     a = unwanted_xn()
    ...     res = z(x(1), y(1))
    ...     b = unwanted_xn()
    ...     return a, res, b
    >>> composed_dag = pipe.compose([x, y], z)
    >>> assert composed_dag(1, 1) == 2.0
    >>> # composed_dag: DAG[[int, str], float] = pipe.compose([x, y], [z])  # optional typing of the returned DAG!
    >>> # assert composed_dag(1, 1) == 2.0  # type checked!


    Args:
        inputs (Alias | List[Alias]): the Inputs nodes whose results are provided.
        outputs (Alias | List[Alias]): the Output nodes that must execute last, The ones that will generate results
        **kwargs (Dict[str, Any]): additional arguments to be passed to the DAG's constructor

    Returns:
        DAG: the newly composed DAG

    Raises:
        ValueError: if an Alias maps to several ExecNodes, if an input depends on another input,
            or if a required input of the original DAG is not declared in `inputs`.
    """
    # what happens for edge cases ??
    # 1. if inputs are less than sufficient to produce outputs (-> error)
    # 2. if inputs are more than sufficient to produce outputs (-> warning)
    # 3. if inputs are successors of outputs (-> error)
    # 5. if inputs are successors of inputs but predecessors of outputs
    # 1. inputs and outputs are overlapping (-> error ambiguous ? maybe not)
    # 1. if inputs & outputs are [] (-> ())
    # 4. cst cubgraph inputs is [], outputs is not [] but contains constants (-> works as expected)
    # 5. inputs is not [], outputs is [] same as 2.
    # 6. a subcase of the above (some inputs suffice to produce some of the outputs, but some inputs don't)
    # 7. advanced usage: if inputs contain ... (i.e. Ellipsis) in this case we must expand it to reach all the remaining XN in a smart manner
    #  we should keep the order of the inputs and outputs (future)
    # 8. what if some arguments have default values? should they be provided by the user?
    # 9. how to specify that arguments of the original DAG should be provided by the user? the user should provide the input's ID which is not a stable Alias yet

    def _alias_or_aliases_to_ids(
        alias_or_aliases: Union[Alias, Sequence[Alias]]
    ) -> List[Identifier]:
        # a single Alias (str or ExecNode) is wrapped; a Sequence is mapped element-wise
        if isinstance(alias_or_aliases, (str, ExecNode)):
            return [self._get_single_xn_by_alias(alias_or_aliases).id]
        return [self._get_single_xn_by_alias(a_id).id for a_id in alias_or_aliases]

    def _raise_input_successor_of_input(pred: Identifier, succ: Set[Identifier]) -> NoReturn:
        # bug fix: the two f-string segments used to concatenate without a space
        raise ValueError(
            f"Input ExecNodes {succ} depend on Input ExecNode {pred}. "
            f"This is ambiguous. Remove either one of them."
        )

    def _raise_missing_input(input_: Identifier) -> NoReturn:
        # bug fix: grammar of the message ("are not declared" / "requests outputs")
        raise ValueError(
            f"ExecNode {input_} is not declared as an input. "
            f"Either declare it as an input or modify the requested outputs."
        )

    def _alias_or_aliases_to_uxns(
        alias_or_aliases: Union[Alias, Sequence[Alias]]
    ) -> ReturnUXNsType:
        if isinstance(alias_or_aliases, (str, ExecNode)):
            return UsageExecNode(self._get_single_xn_by_alias(alias_or_aliases).id)
        return tuple(
            UsageExecNode(self._get_single_xn_by_alias(a_id).id) for a_id in alias_or_aliases
        )

    # 1. get input ids and output ids.
    #  Alias should correspond to a single ExecNode,
    #  otherwise an ambiguous situation exists, raise error
    in_ids = _alias_or_aliases_to_ids(inputs)
    out_ids = _alias_or_aliases_to_ids(outputs)

    # 2.1 contains all the ids of the nodes that will be in the new DAG
    set_xn_ids = set(in_ids + out_ids)

    # 2.2 all ancestors of the inputs
    in_ids_ancestors: Set[Identifier] = self.graph_ids.ancestors_of_iter(in_ids)

    # 3. check edge cases
    # inputs should not be successors of inputs, otherwise (error)
    # and inputs should produce at least one of the outputs, otherwise (warning)
    for i in in_ids:
        # if pred is ancestor of an input, raise error
        if i in in_ids_ancestors:
            _raise_input_successor_of_input(i, set_xn_ids)

        # if i doesn't produce any of the wanted outputs, raise a warning!
        descendants: Set[Identifier] = nx.descendants(self.graph_ids, i)
        if descendants.isdisjoint(out_ids):
            # bug fix: missing space between the two message segments
            warnings.warn(
                f"Input ExecNode {i} is not used to produce any of the requested outputs. "
                f"Consider removing it from the inputs.",
                stacklevel=2,
            )

    # 4. collect necessary ExecNodes' IDS

    # 4.1 original DAG's inputs that don't contain default values.
    # used to detect missing inputs
    dag_inputs_ids = [
        uxn.id for uxn in self.input_uxns if self.node_dict[uxn.id].result is NoVal
    ]

    # 4.2 define helper function
    def _add_missing_deps(candidate_id: Identifier, set_xn_ids: Set[Identifier]) -> None:
        """Adds missing dependency to the set of ExecNodes that will be in the new DAG.

        Note: uses nonlocal variable dag_inputs_ids

        Args:
            candidate_id (Identifier): candidate id of an `ExecNode` that will be in the new DAG
            set_xn_ids (Set[Identifier]): Set of `ExecNode`s that will be in the new DAG
        """
        preds = self.graph_ids.predecessors(candidate_id)
        for pred in preds:
            if pred not in set_xn_ids:
                # this candidate is necessary to produce the output,
                # it is an input to the original DAG
                # it is not provided as an input to the composed DAG
                # hence the user forgot to supply it! (raise error)
                if pred in dag_inputs_ids:
                    _raise_missing_input(pred)

                # necessary intermediate dependency.
                # collect it in the set
                set_xn_ids.add(pred)
                _add_missing_deps(pred, set_xn_ids)

    # 4.3 add all required dependencies for each output
    for o_id in out_ids:
        _add_missing_deps(o_id, set_xn_ids)

    # 5.1 copy the ExecNodes that will be in the composed DAG because
    #  maybe the composed DAG will modify them (e.g. change their tags)
    #  and we don't want to modify the original DAG
    xn_dict = {xn_id: copy(self.node_dict[xn_id]) for xn_id in set_xn_ids}

    # 5.2 change the inputs of the ExecNodes into ArgExecNodes
    for xn_id, xn in xn_dict.items():
        if xn_id in in_ids:
            logger.debug("changing Composed-DAG's input {} into ArgExecNode", xn_id)
            xn.__class__ = ArgExecNode
            xn.exec_function = _make_raise_arg_error("composed", xn.id)
            # eliminate all dependencies
            xn.args = []
            xn.kwargs = {}

    # 5.3 make the inputs and outputs UXNs for the composed DAG
    in_uxns = [UsageExecNode(xn_id) for xn_id in in_ids]
    # if a single value is returned make the output a single value
    out_uxns = _alias_or_aliases_to_uxns(outputs)

    # 6. return the composed DAG
    # ignore[arg-type] because the type of the kwargs is not known
    return DAG(xn_dict, in_uxns, out_uxns, **kwargs)  # type: ignore[arg-type]