
ParallelRunner

kedro.runner.ParallelRunner

ParallelRunner(max_workers=None, is_async=False)

Bases: AbstractRunner

ParallelRunner is an AbstractRunner implementation. It can be used to run the Pipeline in parallel groups formed by toposort. Please note that this runner implementation validates datasets using the _validate_catalog method, which checks whether any of the datasets are single-process only via the _SINGLE_PROCESS dataset attribute.

Parameters:

  • max_workers (int | None, default: None ) –

    Number of worker processes to spawn. If not set, calculated automatically based on the pipeline configuration and CPU core count. On Windows machines, the max_workers value cannot be larger than 61 and will be set to min(61, max_workers).

  • is_async (bool, default: False ) –

    If True, the node inputs and outputs are loaded and saved asynchronously with threads. Defaults to False.

Raises:

  • ValueError

    If bad parameters are passed.
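
A minimal usage sketch, assuming a pipeline and catalog obtained from your own Kedro project (the catalog must satisfy SharedMemoryCatalogProtocol):

from kedro.runner import ParallelRunner

# `pipeline` and `catalog` are hypothetical objects from your project:
# a kedro.pipeline.Pipeline and a catalog implementing SharedMemoryCatalogProtocol.
runner = ParallelRunner(max_workers=4, is_async=True)
runner.run(pipeline, catalog)  # independent nodes run in separate worker processes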

Source code in kedro/runner/parallel_runner.py
def __init__(
    self,
    max_workers: int | None = None,
    is_async: bool = False,
):
    """
    Instantiates the runner by creating a Manager.

    Args:
        max_workers: Number of worker processes to spawn. If not set,
            calculated automatically based on the pipeline configuration
            and CPU core count. On windows machines, the max_workers value
            cannot be larger than 61 and will be set to min(61, max_workers).
        is_async: If True, the node inputs and outputs are loaded and saved
            asynchronously with threads. Defaults to False.
    Raises:
        ValueError: bad parameters passed
    """
    super().__init__(is_async=is_async)
    self._manager = ParallelRunnerManager()
    self._manager.start()

    self._max_workers = self._validate_max_workers(max_workers)

_manager instance-attribute

_manager = ParallelRunnerManager()

_max_workers instance-attribute

_max_workers = _validate_max_workers(max_workers)

__del__

__del__()
Source code in kedro/runner/parallel_runner.py
def __del__(self) -> None:
    self._manager.shutdown()

_get_executor

_get_executor(max_workers)
Source code in kedro/runner/parallel_runner.py
def _get_executor(self, max_workers: int) -> Executor:
    context = os.environ.get("KEDRO_MP_CONTEXT")
    if context and context not in {"fork", "spawn"}:
        context = None
    ctx = get_context(context)
    return ProcessPoolExecutor(max_workers=max_workers, mp_context=ctx)
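
The executor honours the KEDRO_MP_CONTEXT environment variable: only "fork" and "spawn" are accepted, and any other value falls back to the platform's default start method. A minimal sketch of selecting the spawn context before a run:

import os

# Use the "spawn" start method for worker processes; values other than
# "fork" or "spawn" are ignored and the platform default is used instead.
os.environ["KEDRO_MP_CONTEXT"] = "spawn"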

_get_required_workers_count

_get_required_workers_count(pipeline)

Calculate the maximum number of processes required for the pipeline, limited to the number of CPU cores.

Source code in kedro/runner/parallel_runner.py
def _get_required_workers_count(self, pipeline: Pipeline) -> int:
    """
    Calculate the max number of processes required for the pipeline,
    limit to the number of CPU cores.
    """
    # Number of nodes is a safe upper-bound estimate.
    # It's also safe to reduce it by the number of layers minus one,
    # because each layer means some nodes depend on other nodes
    # and they can not run in parallel.
    # It might be not a perfect solution, but good enough and simple.
    required_processes = len(pipeline.nodes) - len(pipeline.grouped_nodes) + 1

    return min(required_processes, self._max_workers)
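
A worked example of the estimate (illustrative figures only): a pipeline with 10 nodes toposorted into 4 sequential groups needs at most 10 - 4 + 1 = 7 concurrent processes, which is then capped by the runner's max_workers.

# Illustrative arithmetic only, with hypothetical pipeline figures.
nodes, groups, max_workers = 10, 4, 8
required = min(nodes - groups + 1, max_workers)  # -> 7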

_run

_run(pipeline, catalog, hook_manager=None, run_id=None)

The method implementing parallel pipeline running.

Parameters:

  • pipeline (Pipeline) –

    The Pipeline to run.

  • catalog (SharedMemoryCatalogProtocol) –

    An implemented instance of SharedMemoryCatalogProtocol from which to fetch data.

  • hook_manager (PluginManager | None, default: None ) –

    The PluginManager to activate hooks.

  • run_id (str | None, default: None ) –

    The id of the run.

Raises:

  • AttributeError

    When the provided pipeline is not suitable for parallel execution.

  • RuntimeError

    If the runner is unable to schedule the execution of all pipeline nodes.

  • Exception

    In case of any downstream node failure.

Source code in kedro/runner/parallel_runner.py
def _run(
    self,
    pipeline: Pipeline,
    catalog: SharedMemoryCatalogProtocol,  # type: ignore[override]
    hook_manager: PluginManager | None = None,
    run_id: str | None = None,
) -> None:
    """The method implementing parallel pipeline running.

    Args:
        pipeline: The ``Pipeline`` to run.
        catalog: An implemented instance of ``SharedMemoryCatalogProtocol`` from which to fetch data.
        hook_manager: The ``PluginManager`` to activate hooks.
        run_id: The id of the run.

    Raises:
        AttributeError: When the provided pipeline is not suitable for
            parallel execution.
        RuntimeError: If the runner is unable to schedule the execution of
            all pipeline nodes.
        Exception: In case of any downstream node failure.

    """
    if not self._is_async:
        self._logger.info(
            "Using synchronous mode for loading and saving data. Use the --async flag "
            "for potential performance gains. https://docs.kedro.org/en/stable/nodes_and_pipelines/run_a_pipeline.html#load-and-save-asynchronously"
        )

    super()._run(
        pipeline=pipeline,
        catalog=catalog,
        run_id=run_id,
    )
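
This method is normally reached through the project entry points rather than called directly, for example with kedro run --runner=ParallelRunner --async on the command line, or programmatically via a session. A hedged sketch of the programmatic route (the project path and pipeline name are placeholders):

from kedro.framework.session import KedroSession
from kedro.framework.startup import bootstrap_project
from kedro.runner import ParallelRunner

bootstrap_project("/path/to/project")  # hypothetical project path
with KedroSession.create(project_path="/path/to/project") as session:
    session.run(
        pipeline_name="__default__",
        runner=ParallelRunner(is_async=True),  # asynchronous load/save of datasets
    )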

_set_manager_datasets

_set_manager_datasets(catalog)
Source code in kedro/runner/parallel_runner.py
def _set_manager_datasets(self, catalog: SharedMemoryCatalogProtocol) -> None:  # type: ignore[override]
    catalog.set_manager_datasets(self._manager)

_validate_catalog classmethod

_validate_catalog(catalog)

Ensure that all datasets are serialisable and that no non-proxied memory datasets are used as outputs, as their content will not be synchronized across threads.

Source code in kedro/runner/parallel_runner.py
@classmethod
def _validate_catalog(cls, catalog: SharedMemoryCatalogProtocol) -> None:  # type: ignore[override]
    """Ensure that all datasets are serialisable and that we do not have
    any non proxied memory datasets being used as outputs as their content
    will not be synchronized across threads.
    """
    catalog.validate_catalog()
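
In practice this means node outputs should be backed by datasets that worker processes can share, typically persistent, file-backed datasets, rather than plain in-memory datasets that exist only in the process that wrote them. A hedged sketch of a parallel-friendly catalog definition (the dataset classes come from the separate kedro-datasets package and the entries are illustrative):

from kedro.io import DataCatalog, MemoryDataset
from kedro_datasets.pandas import ParquetDataset  # requires kedro-datasets

catalog = DataCatalog(
    {
        # File-backed datasets are safe to use as node outputs across processes.
        "model_input": ParquetDataset(filepath="data/05_model_input/table.parquet"),
        "predictions": ParquetDataset(filepath="data/07_model_output/preds.parquet"),
        # A plain MemoryDataset output must be proxied by the runner's manager
        # to be visible outside the worker process that produced it.
        "intermediate": MemoryDataset(),
    }
)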

_validate_nodes classmethod

_validate_nodes(nodes)

Ensure all tasks are serialisable.

Source code in kedro/runner/parallel_runner.py
@classmethod
def _validate_nodes(cls, nodes: Iterable[Node]) -> None:
    """Ensure all tasks are serialisable."""
    unserialisable = []
    for node in nodes:
        try:
            ForkingPickler.dumps(node)
        except (AttributeError, PicklingError):
            unserialisable.append(node)

    if unserialisable:
        raise AttributeError(
            f"The following nodes cannot be serialised: {sorted(unserialisable)}\n"
            f"In order to utilize multiprocessing you need to make sure all nodes "
            f"are serialisable, i.e. nodes should not include lambda "
            f"functions, nested functions, closures, etc.\nIf you "
            f"are using custom decorators ensure they are correctly decorated using "
            f"functools.wraps()."
        )
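
A brief illustration of the requirement, as a hedged sketch with placeholder dataset names: a node wrapping a module-level function can be pickled for multiprocessing, whereas a node wrapping a lambda cannot and would be reported by this check.

from kedro.pipeline import node

def double(x):  # module-level function: picklable, fine with ParallelRunner
    return x * 2

ok = node(double, inputs="a", outputs="b")
bad = node(lambda x: x * 2, inputs="a", outputs="c")  # flagged by _validate_nodes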