node

kedro.pipeline.node

This module provides user-friendly functions for creating nodes as parts of Kedro pipelines.

GroupedNodes dataclass

GroupedNodes(name, type, nodes=list(), dependencies=list())

Represents a logical group of nodes, typically grouped by namespace or a custom grouping. A group can also consist of a single node. This is used to support deployment, for example by executing the entire group in a single container run.
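
A minimal sketch of constructing a group directly. The field values below are purely illustrative, and it is assumed here that nodes and dependencies hold node and group names as plain strings:

from kedro.pipeline.node import GroupedNodes

group = GroupedNodes(
    name="data_processing",                         # illustrative group name
    type="namespace",                                # illustrative grouping type
    nodes=["clean_cars_node", "clean_boats_node"],   # nodes that belong to this group
    dependencies=["data_ingestion"],                 # groups this group depends on (assumed semantics)
)

print(group.name, group.nodes)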

Node

Node(func, inputs, outputs, *, name=None, tags=None, confirms=None, namespace=None)

Node is an auxiliary class facilitating the operations required to run user-provided functions as part of Kedro pipelines.

Parameters:

  • func (Callable) –

    A function that corresponds to the node logic. The function should have at least one input or output.

  • inputs (str | list[str] | dict[str, str] | None) –

    The name or the list of the names of variables used as inputs to the function. The number of names should match the number of arguments in the definition of the provided function. When dict[str, str] is provided, variable names will be mapped to function argument names.

  • outputs (str | list[str] | dict[str, str] | None) –

    The name or the list of the names of variables used as outputs of the function. The number of names should match the number of outputs returned by the provided function. When dict[str, str] is provided, variable names will be mapped to the named outputs the function returns.

  • name (str | None, default: None ) –

    Optional node name to be used when displaying the node in logs or any other visualisations. A valid node name must contain only letters, digits, hyphens, underscores and/or fullstops.

  • tags (str | Iterable[str] | None, default: None ) –

    Optional set of tags to be applied to the node. A valid node tag must contain only letters, digits, hyphens, underscores and/or fullstops.

  • confirms (str | list[str] | None, default: None ) –

    Optional name or list of names of the datasets that should be confirmed. This will result in calling the confirm() method of the corresponding dataset instance. Specified dataset names do not necessarily need to be present in the node inputs or outputs.

  • namespace (str | None, default: None ) –

    Optional node namespace.

Raises:

  • ValueError

    Raised in the following cases: a) When the provided arguments do not conform to the format suggested by the type hint of the argument. b) When the node produces multiple outputs with the same name. c) When an input has the same name as an output. d) When the given node name violates the requirements: it must contain only letters, digits, hyphens, underscores and/or fullstops.
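
A minimal sketch of constructing a Node directly (pipelines usually go through the node() helper documented below). The dataset names, node name and tag are illustrative only:

import pandas as pd

from kedro.pipeline.node import Node


def clean_data(cars: pd.DataFrame, boats: pd.DataFrame) -> dict[str, pd.DataFrame]:
    return {"cars_df": cars.dropna(), "boats_df": boats.dropna()}


clean_node = Node(
    clean_data,
    inputs=["cars2017", "boats2017"],      # bound positionally to cars and boats
    outputs={
        "cars_df": "clean_cars2017",       # the function's named outputs mapped to dataset names
        "boats_df": "clean_boats2017",
    },
    name="clean_data_node",
    tags="preprocessing",
)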

Source code in kedro/pipeline/node.py
def __init__(  # noqa: PLR0913
    self,
    func: Callable,
    inputs: str | list[str] | dict[str, str] | None,
    outputs: str | list[str] | dict[str, str] | None,
    *,
    name: str | None = None,
    tags: str | Iterable[str] | None = None,
    confirms: str | list[str] | None = None,
    namespace: str | None = None,
):
    """Create a node in the pipeline by providing a function to be called
    along with variable names for inputs and/or outputs.

    Args:
        func: A function that corresponds to the node logic.
            The function should have at least one input or output.
        inputs: The name or the list of the names of variables used as
            inputs to the function. The number of names should match
            the number of arguments in the definition of the provided
            function. When dict[str, str] is provided, variable names
            will be mapped to function argument names.
        outputs: The name or the list of the names of variables used
            as outputs of the function. The number of names should match
            the number of outputs returned by the provided function.
            When dict[str, str] is provided, variable names will be mapped
            to the named outputs the function returns.
        name: Optional node name to be used when displaying the node in
            logs or any other visualisations. Valid node name must contain
            only letters, digits, hyphens, underscores and/or fullstops.
        tags: Optional set of tags to be applied to the node. Valid node tag must
            contain only letters, digits, hyphens, underscores and/or fullstops.
        confirms: Optional name or the list of the names of the datasets
            that should be confirmed. This will result in calling
            ``confirm()`` method of the corresponding dataset instance.
            Specified dataset names do not necessarily need to be present
            in the node ``inputs`` or ``outputs``.
        namespace: Optional node namespace.

    Raises:
        ValueError: Raised in the following cases:
            a) When the provided arguments do not conform to
            the format suggested by the type hint of the argument.
            b) When the node produces multiple outputs with the same name.
            c) When an input has the same name as an output.
            d) When the given node name violates the requirements:
            it must contain only letters, digits, hyphens, underscores
            and/or fullstops.

    """
    if not callable(func):
        raise ValueError(
            _node_error_message(
                f"first argument must be a function, not '{type(func).__name__}'."
            )
        )

    if inputs and not isinstance(inputs, (list, dict, str)):
        raise ValueError(
            _node_error_message(
                f"'inputs' type must be one of [String, List, Dict, None], "
                f"not '{type(inputs).__name__}'."
            )
        )

    for _input in _to_list(inputs):
        if not isinstance(_input, str):
            raise ValueError(
                _node_error_message(
                    f"names of variables used as inputs to the function "
                    f"must be of 'String' type, but {_input} from {inputs} "
                    f"is '{type(_input)}'."
                )
            )
        _node_dataset_name_validation(_input, namespace)

    if outputs and not isinstance(outputs, (list, dict, str)):
        raise ValueError(
            _node_error_message(
                f"'outputs' type must be one of [String, List, Dict, None], "
                f"not '{type(outputs).__name__}'."
            )
        )

    for _output in _to_list(outputs):
        if not isinstance(_output, str):
            raise ValueError(
                _node_error_message(
                    f"names of variables used as outputs of the function "
                    f"must be of 'String' type, but {_output} from {outputs} "
                    f"is '{type(_output)}'."
                )
            )
        _node_dataset_name_validation(_output, namespace)

    if not inputs and not outputs:
        raise ValueError(
            _node_error_message("it must have some 'inputs' or 'outputs'.")
        )

    self._validate_inputs(func, inputs)

    self._func = func
    self._inputs = inputs
    # The type of _outputs is picked up as possibly being None, however the checks above prevent that
    # ever being the case. Mypy doesn't get that though, so it complains about the assignment of outputs to
    # _outputs with different types.
    self._outputs: str | list[str] | dict[str, str] = outputs  # type: ignore[assignment]
    if name and not re.match(r"[\w\.-]+$", name):
        raise ValueError(
            f"'{name}' is not a valid node name. It must contain only "
            f"letters, digits, hyphens, underscores and/or fullstops."
        )
    self._name = name
    self._namespace = namespace
    self._tags = set(_to_list(tags))
    for tag in self._tags:
        if not re.match(r"[\w\.-]+$", tag):
            raise ValueError(
                f"'{tag}' is not a valid node tag. It must contain only "
                f"letters, digits, hyphens, underscores and/or fullstops."
            )

    self._validate_unique_outputs()
    self._validate_inputs_dif_than_outputs()
    self._confirms = confirms

confirms property

confirms

Return dataset names to confirm as a list.

Returns:

  • list[str]

    Dataset names to confirm as a list.
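
A minimal sketch of setting and reading confirms, reusing clean_data from the sketch above. The dataset name is illustrative, and it is assumed that the referenced dataset implements a confirm() method:

checkpointed = Node(
    clean_data,
    inputs=["cars2017", "boats2017"],
    outputs={"cars_df": "clean_cars2017", "boats_df": "clean_boats2017"},
    confirms="cars2017",          # confirm() will be called on this dataset, per the parameter description above
)

print(checkpointed.confirms)      # ["cars2017"]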

func property writable

func

Exposes the underlying function of the node.

Returns:

  • Callable

    Return the underlying function of the node.

inputs cached property

inputs

Return node inputs as a list, in the order required to bind them properly to the node's function.

Returns:

  • list[str]

    Node input names as a list.

name property

name

Node's name.

Returns:

  • str

    Node's name if provided or the name of its function.

namespace property

namespace

Node's namespace.

Returns:

  • str | None

    String representing node's namespace, typically from outer to inner scopes.

outputs property

outputs

Return node outputs as a list preserving the original order if possible.

Returns:

  • list[str]

    Node output names as a list.

short_name property

short_name

Node's name.

Returns:

  • str

    A short, user-friendly name that is not guaranteed to be unique. The namespace is stripped out of the node name.

tags property

tags

Return the tags assigned to the node.

Returns:

  • set[str]

    Return the set of all assigned tags to the node.
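
The properties above can be read straight off a constructed node. A short sketch, continuing the illustrative clean_node from earlier (the comments show the values implied by the property descriptions):

print(clean_node.name)        # "clean_data_node"
print(clean_node.inputs)      # ["cars2017", "boats2017"], in function-argument order
print(clean_node.outputs)     # ["clean_cars2017", "clean_boats2017"]
print(clean_node.tags)        # {"preprocessing"}
print(clean_node.short_name)  # the name with any namespace prefix stripped
print(clean_node.namespace)   # None, since no namespace was given
print(clean_node.func)        # the underlying clean_data function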

run

run(inputs=None)

Run this node using the provided inputs and return its results in a dictionary.

Parameters:

  • inputs (dict[str, Any] | None, default: None ) –

    Dictionary of inputs as specified at the creation of the node.

Raises:

  • ValueError

    In the following cases: a) The node function inputs are incompatible with the node input definition. Example 1: node definition input is a list of 2 DataFrames, whereas only 1 was provided or 2 different ones were provided. b) The node function outputs are incompatible with the node output definition. Example 1: node function definition is a dictionary, whereas function returns a list. Example 2: node definition output is a list of 5 strings, whereas the function returns a list of 4 objects.

  • Exception

    Any exception thrown during execution of the node.

Returns:

  • dict[str, Any]

    All produced node outputs are returned in a dictionary, where the keys are defined by the node outputs.
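
In a Kedro project a runner normally calls run() on your behalf; a minimal sketch of calling it directly, reusing the illustrative clean_node from above. The keys of the inputs dictionary must match the node's input dataset names:

import pandas as pd

cars = pd.DataFrame({"mpg": [21.0, None]})
boats = pd.DataFrame({"length": [None, 7.5]})

results = clean_node.run({"cars2017": cars, "boats2017": boats})
print(sorted(results))   # ["clean_boats2017", "clean_cars2017"] - keyed by the node outputs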

Source code in kedro/pipeline/node.py
def run(self, inputs: dict[str, Any] | None = None) -> dict[str, Any]:
    """Run this node using the provided inputs and return its results
    in a dictionary.

    Args:
        inputs: Dictionary of inputs as specified at the creation of
            the node.

    Raises:
        ValueError: In the following cases:
            a) The node function inputs are incompatible with the node
            input definition.
            Example 1: node definition input is a list of 2
            DataFrames, whereas only 1 was provided or 2 different ones
            were provided.
            b) The node function outputs are incompatible with the node
            output definition.
            Example 1: node function definition is a dictionary,
            whereas function returns a list.
            Example 2: node definition output is a list of 5
            strings, whereas the function returns a list of 4 objects.
        Exception: Any exception thrown during execution of the node.

    Returns:
        All produced node outputs are returned in a dictionary, where the
        keys are defined by the node outputs.

    """
    self._logger.info("Running node: %s", str(self))

    outputs = None

    if not (inputs is None or isinstance(inputs, dict)):
        raise ValueError(
            f"Node.run() expects a dictionary or None, "
            f"but got {type(inputs)} instead"
        )

    try:
        inputs = {} if inputs is None else inputs
        if not self._inputs:
            outputs = self._run_with_no_inputs(inputs)
        elif isinstance(self._inputs, str):
            outputs = self._run_with_one_input(inputs, self._inputs)
        elif isinstance(self._inputs, list):
            outputs = self._run_with_list(inputs, self._inputs)
        elif isinstance(self._inputs, dict):
            outputs = self._run_with_dict(inputs, self._inputs)

        return self._outputs_to_dictionary(outputs)

    # purposely catch all exceptions
    except Exception as exc:
        self._logger.error(
            "Node %s failed with error: \n%s",
            str(self),
            str(exc),
            extra={"markup": True},
        )
        raise exc

tag

tag(tags)

Create a new Node which is an exact copy of the current one, but with more tags added to it.

Parameters:

  • tags (str | Iterable[str]) –

    The tags to be added to the new node.

Returns:

  • Node

    A copy of the current Node object with the tags added.
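
A minimal sketch, reusing the illustrative clean_node from above; the tag names are arbitrary:

daily_node = clean_node.tag(["daily", "critical"])

print(daily_node.tags)   # {"preprocessing", "daily", "critical"}
print(clean_node.tags)   # unchanged: {"preprocessing"} - tag() returns a copy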

Source code in kedro/pipeline/node.py
def tag(self, tags: str | Iterable[str]) -> Node:
    """Create a new ``Node`` which is an exact copy of the current one,
        but with more tags added to it.

    Args:
        tags: The tags to be added to the new node.

    Returns:
        A copy of the current ``Node`` object with the tags added.

    """
    return self._copy(tags=self.tags | set(_to_list(tags)))

node

node(func, inputs, outputs, *, name=None, tags=None, confirms=None, namespace=None)

Create a node in the pipeline by providing a function to be called along with variable names for inputs and/or outputs.

Parameters:

  • func (Callable) –

    A function that corresponds to the node logic. The function should have at least one input or output.

  • inputs (str | list[str] | dict[str, str] | None) –

    The name or the list of the names of variables used as inputs to the function. The number of names should match the number of arguments in the definition of the provided function. When dict[str, str] is provided, variable names will be mapped to function argument names.

  • outputs (str | list[str] | dict[str, str] | None) –

    The name or the list of the names of variables used as outputs of the function. The number of names should match the number of outputs returned by the provided function. When dict[str, str] is provided, variable names will be mapped to the named outputs the function returns.

  • name (str | None, default: None ) –

    Optional node name to be used when displaying the node in logs or any other visualisations.

  • tags (str | Iterable[str] | None, default: None ) –

    Optional set of tags to be applied to the node.

  • confirms (str | list[str] | None, default: None ) –

    Optional name or list of names of the datasets that should be confirmed. This will result in calling the confirm() method of the corresponding dataset instance. Specified dataset names do not necessarily need to be present in the node inputs or outputs.

  • namespace (str | None, default: None ) –

    Optional node namespace.

Returns:

  • Node

    A Node object with mapped inputs, outputs and function.

Example:

>>> import pandas as pd
>>> import numpy as np
>>>
>>> def clean_data(cars: pd.DataFrame,
>>>                boats: pd.DataFrame) -> dict[str, pd.DataFrame]:
>>>     return dict(cars_df=cars.dropna(), boats_df=boats.dropna())
>>>
>>> def halve_dataframe(data: pd.DataFrame) -> List[pd.DataFrame]:
>>>     return np.array_split(data, 2)
>>>
>>> nodes = [
>>>     node(clean_data,
>>>          inputs=['cars2017', 'boats2017'],
>>>          outputs=dict(cars_df='clean_cars2017',
>>>                       boats_df='clean_boats2017')),
>>>     node(halve_dataframe,
>>>          'clean_cars2017',
>>>          ['train_cars2017', 'test_cars2017']),
>>>     node(halve_dataframe,
>>>          dict(data='clean_boats2017'),
>>>          ['train_boats2017', 'test_boats2017'])
>>> ]
Source code in kedro/pipeline/node.py
def node(  # noqa: PLR0913
    func: Callable,
    inputs: str | list[str] | dict[str, str] | None,
    outputs: str | list[str] | dict[str, str] | None,
    *,
    name: str | None = None,
    tags: str | Iterable[str] | None = None,
    confirms: str | list[str] | None = None,
    namespace: str | None = None,
) -> Node:
    """Create a node in the pipeline by providing a function to be called
    along with variable names for inputs and/or outputs.

    Args:
        func: A function that corresponds to the node logic. The function
            should have at least one input or output.
        inputs: The name or the list of the names of variables used as inputs
            to the function. The number of names should match the number of
            arguments in the definition of the provided function. When
            dict[str, str] is provided, variable names will be mapped to
            function argument names.
        outputs: The name or the list of the names of variables used as outputs
            to the function. The number of names should match the number of
            outputs returned by the provided function. When dict[str, str]
            is provided, variable names will be mapped to the named outputs the
            function returns.
        name: Optional node name to be used when displaying the node in logs or
            any other visualisations.
        tags: Optional set of tags to be applied to the node.
        confirms: Optional name or the list of the names of the datasets
            that should be confirmed. This will result in calling ``confirm()``
            method of the corresponding dataset instance. Specified dataset
            names do not necessarily need to be present in the node ``inputs``
            or ``outputs``.
        namespace: Optional node namespace.

    Returns:
        A Node object with mapped inputs, outputs and function.

    Example:
    ::

        >>> import pandas as pd
        >>> import numpy as np
        >>>
        >>> def clean_data(cars: pd.DataFrame,
        >>>                boats: pd.DataFrame) -> dict[str, pd.DataFrame]:
        >>>     return dict(cars_df=cars.dropna(), boats_df=boats.dropna())
        >>>
        >>> def halve_dataframe(data: pd.DataFrame) -> List[pd.DataFrame]:
        >>>     return np.array_split(data, 2)
        >>>
        >>> nodes = [
        >>>     node(clean_data,
        >>>          inputs=['cars2017', 'boats2017'],
        >>>          outputs=dict(cars_df='clean_cars2017',
        >>>                       boats_df='clean_boats2017')),
        >>>     node(halve_dataframe,
        >>>          'clean_cars2017',
        >>>          ['train_cars2017', 'test_cars2017']),
        >>>     node(halve_dataframe,
        >>>          dict(data='clean_boats2017'),
        >>>          ['train_boats2017', 'test_boats2017'])
        >>> ]
    """
    return Node(
        func,
        inputs,
        outputs,
        name=name,
        tags=tags,
        confirms=confirms,
        namespace=namespace,
    )