
Session

kedro.framework.session

kedro.framework.session provides access to KedroSession, which is responsible for the project lifecycle.

Modules:

  • kedro.framework.session.session –

    Implements the Kedro session responsible for the project lifecycle.

  • kedro.framework.session.store –

    Implements a dict-like store object used to persist Kedro sessions.

kedro.framework.session.session

This module implements the Kedro session responsible for the project lifecycle.

kedro_version module-attribute

kedro_version = '1.0.0rc1'

pipelines module-attribute

pipelines = _ProjectPipelines()

settings module-attribute

settings = _ProjectSettings()

AbstractConfigLoader

AbstractConfigLoader(conf_source, env=None, runtime_params=None, **kwargs)

Bases: UserDict

AbstractConfigLoader is the abstract base class for all ConfigLoader implementations. All user-defined ConfigLoader implementations should inherit from AbstractConfigLoader and implement all relevant abstract methods.

Source code in kedro/config/abstract_config.py
def __init__(
    self,
    conf_source: str | Path,
    env: str | None = None,
    runtime_params: dict[str, Any] | None = None,
    **kwargs: Any,
):
    super().__init__()
    self.conf_source = conf_source
    self.env = env
    self.runtime_params = runtime_params or {}
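
Below is a minimal sketch of a custom loader built on this base class, assuming configuration is stored as one YAML file per top-level key (for example <conf_source>/catalog.yml); the class name and file layout are illustrative, not part of Kedro.

from pathlib import Path
from typing import Any

import yaml  # assumption: PyYAML is installed

from kedro.config import AbstractConfigLoader


class SimpleYamlConfigLoader(AbstractConfigLoader):
    """Hypothetical loader: resolves each config key to a single YAML file."""

    def __getitem__(self, key: str) -> dict[str, Any]:
        # Map a config key such as "catalog" to <conf_source>/catalog.yml
        file_path = Path(self.conf_source) / f"{key}.yml"
        if not file_path.exists():
            raise KeyError(key)
        with file_path.open() as f:
            return yaml.safe_load(f) or {}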

get

get(key, default=None)

D.get(k[,d]) -> D[k] if k in D, else d. d defaults to None.

Source code in kedro/config/abstract_config.py
def get(self, key: str, default: Any = None) -> Any:
    "D.get(k[,d]) -> D[k] if k in D, else d.  d defaults to None."
    try:
        return self[key]
    except KeyError:
        return default
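
Usage sketch for get, assuming the hypothetical SimpleYamlConfigLoader defined above and a local conf/base directory:

loader = SimpleYamlConfigLoader(conf_source="conf/base")
catalog_config = loader.get("catalog", {})  # returns {} if catalog.yml is missing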

AbstractRunner

AbstractRunner(is_async=False)

Bases: ABC

AbstractRunner is the base class for all Pipeline runner implementations.

Parameters:

  • is_async (bool, default: False ) –

    If True, the node inputs and outputs are loaded and saved asynchronously with threads. Defaults to False.

Source code in kedro/runner/runner.py
def __init__(
    self,
    is_async: bool = False,
):
    """Instantiates the runner class.

    Args:
        is_async: If True, the node inputs and outputs are loaded and saved
            asynchronously with threads. Defaults to False.
    """
    self._is_async = is_async

run

run(pipeline, catalog, hook_manager=None, run_id=None, only_missing_outputs=False)

Run the Pipeline using the datasets provided by catalog and save results back to the same objects.

Parameters:

  • pipeline (Pipeline) –

    The Pipeline to run.

  • catalog (CatalogProtocol | SharedMemoryCatalogProtocol) –

    An implemented instance of CatalogProtocol or SharedMemoryCatalogProtocol from which to fetch data.

  • hook_manager (PluginManager | None, default: None ) –

    The PluginManager to activate hooks.

  • run_id (str | None, default: None ) –

    The id of the run.

  • only_missing_outputs (bool, default: False ) –

    Run only nodes with missing outputs.

Raises:

  • ValueError

    Raised when Pipeline inputs cannot be satisfied.

Returns:

  • dict[str, Any]

    Dictionary with pipeline outputs, where keys are dataset names and values are dataset objects.

Source code in kedro/runner/runner.py
def run(
    self,
    pipeline: Pipeline,
    catalog: CatalogProtocol | SharedMemoryCatalogProtocol,
    hook_manager: PluginManager | None = None,
    run_id: str | None = None,
    only_missing_outputs: bool = False,
) -> dict[str, Any]:
    """Run the ``Pipeline`` using the datasets provided by ``catalog``
    and save results back to the same objects.

    Args:
        pipeline: The ``Pipeline`` to run.
        catalog: An implemented instance of ``CatalogProtocol`` or ``SharedMemoryCatalogProtocol`` from which to fetch data.
        hook_manager: The ``PluginManager`` to activate hooks.
        run_id: The id of the run.
        only_missing_outputs: Run only nodes with missing outputs.

    Raises:
        ValueError: Raised when ``Pipeline`` inputs cannot be satisfied.

    Returns:
        Dictionary with pipeline outputs, where keys are dataset names
        and values are dataset object.
    """
    # Apply missing outputs filtering if requested
    if only_missing_outputs:
        pipeline = self._filter_pipeline_for_missing_outputs(pipeline, catalog)

    # Check which datasets used in the pipeline are in the catalog or match
    # a pattern in the catalog, not including extra dataset patterns
    # Run a warm-up to materialize all datasets in the catalog before run
    warmed_up_ds = []
    for ds in pipeline.datasets():
        if ds in catalog:
            warmed_up_ds.append(ds)
        _ = catalog.get(ds, fallback_to_runtime_pattern=True)

    # Check if there are any input datasets that aren't in the catalog and
    # don't match a pattern in the catalog.
    unsatisfied = pipeline.inputs() - set(warmed_up_ds)

    if unsatisfied:
        raise ValueError(
            f"Pipeline input(s) {unsatisfied} not found in the {catalog.__class__.__name__}"
        )

    hook_or_null_manager = hook_manager or _NullPluginManager()

    if self._is_async:
        self._logger.info(
            "Asynchronous mode is enabled for loading and saving data."
        )

    start_time = perf_counter()
    self._run(pipeline, catalog, hook_or_null_manager, run_id)  # type: ignore[arg-type]
    end_time = perf_counter()
    run_duration = end_time - start_time

    self._logger.info(
        f"Pipeline execution completed successfully in {run_duration:.1f} sec."
    )

    # Now we return all pipeline outputs, but we do not load datasets data
    run_output = {ds_name: catalog[ds_name] for ds_name in pipeline.outputs()}

    return run_output
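
For illustration, run() can also be called directly on a concrete runner outside of a KedroSession; the node, dataset names and values below are placeholders, and the returned dictionary maps free output names to dataset objects rather than loaded data.

from kedro.io import DataCatalog, MemoryDataset
from kedro.pipeline import Pipeline, node
from kedro.runner import SequentialRunner


def double(x: int) -> int:
    return x * 2


# A one-node pipeline reading "a" and producing "b"
demo_pipeline = Pipeline([node(double, inputs="a", outputs="b", name="double_node")])
catalog = DataCatalog(datasets={"a": MemoryDataset(data=2)})

outputs = SequentialRunner().run(demo_pipeline, catalog)
print(outputs)  # e.g. {'b': <MemoryDataset ...>}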

BaseSessionStore

BaseSessionStore(path, session_id)

Bases: UserDict

BaseSessionStore is the base class for all session stores. It is an ephemeral store implementation that doesn't persist the session data.

Source code in kedro/framework/session/store.py
def __init__(self, path: str, session_id: str):
    self._path = path
    self._session_id = session_id
    super().__init__(self.read())

read

read()

Read the data from the session store.

Returns:

  • dict[str, Any]

    A mapping containing the session store data.

Source code in kedro/framework/session/store.py
def read(self) -> dict[str, Any]:
    """Read the data from the session store.

    Returns:
        A mapping containing the session store data.
    """
    self._logger.debug(
        "'read()' not implemented for '%s'. Assuming empty store.",
        self.__class__.__name__,
    )
    return {}

save

save()

Persist the session store

Source code in kedro/framework/session/store.py
def save(self) -> None:
    """Persist the session store"""
    self._logger.debug(
        "'save()' not implemented for '%s'. Skipping the step.",
        self.__class__.__name__,
    )
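
As a sketch of how a persistent store could look, the subclass below writes session data to a JSON file under the given path; the class name and file layout are assumptions, and a project would typically point SESSION_STORE_CLASS in settings.py at such a class.

import json
from pathlib import Path
from typing import Any

from kedro.framework.session.store import BaseSessionStore


class JSONSessionStore(BaseSessionStore):
    """Hypothetical store persisting to <path>/<session_id>.json."""

    @property
    def _file(self) -> Path:
        return Path(self._path) / f"{self._session_id}.json"

    def read(self) -> dict[str, Any]:
        # Load previously saved data if it exists, otherwise start empty
        if self._file.exists():
            return json.loads(self._file.read_text())
        return {}

    def save(self) -> None:
        # Persist the in-memory dict held by UserDict in ``self.data``
        self._file.parent.mkdir(parents=True, exist_ok=True)
        self._file.write_text(json.dumps(self.data, default=str))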

KedroContext

KedroContext is the base class which holds the configuration and Kedro's main functionality.

Create a context object by providing the root of a Kedro project and the environment configuration subfolders (see kedro.config.OmegaConfigLoader).

Parameters:

  • project_path –

    Project path to define the context for.

  • config_loader –

    Kedro's OmegaConfigLoader for loading the configuration files.

  • env –

    Optional argument for configuration default environment to be used for running the pipeline. If not specified, it defaults to "local".

  • package_name –

    Package name for the Kedro project the context is created for.

  • hook_manager –

    The PluginManager to activate hooks, supplied by the session.

  • runtime_params –

    Optional dictionary containing runtime project parameters. If specified, will update (and therefore take precedence over) the parameters retrieved from the project configuration.

Raises:

  • KedroContextError

    If there is a mismatch between Kedro project version and package version.

catalog property

catalog

Read-only property referring to Kedro's catalog for this context.

Returns:

  • The catalog defined for this context.

Raises:

  • KedroContextError

    Incorrect catalog registered for the project.

params property

params

Read-only property referring to Kedro's parameters for this context.

Returns:

  • dict[str, Any]

    Parameters defined in parameters.yml with the addition of any extra parameters passed at initialization.
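
An illustrative way to reach these properties is through a session created inside a Kedro project; the project path and dataset name below are placeholders.

from pathlib import Path

from kedro.framework.session import KedroSession
from kedro.framework.startup import bootstrap_project

bootstrap_project(Path("<project_root>"))
with KedroSession.create() as session:
    context = session.load_context()
    print(context.params)                            # parameters.yml plus any runtime_params
    data = context.catalog.load("example_dataset")   # hypothetical dataset name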

KedroSession

KedroSession(session_id, package_name=None, project_path=None, save_on_close=False, conf_source=None)

KedroSession is the object that is responsible for managing the lifecycle of a Kedro run. Use KedroSession.create() as a context manager to construct a new KedroSession with session data provided (see the example below).

Example: ::

>>> from kedro.framework.session import KedroSession
>>> from kedro.framework.startup import bootstrap_project
>>> from pathlib import Path

>>> # If you are creating a session outside of a Kedro project (i.e. not using
>>> # `kedro run` or `kedro jupyter`), you need to run `bootstrap_project` to
>>> # let Kedro find your configuration.
>>> bootstrap_project(Path("<project_root>"))
>>> with KedroSession.create() as session:
>>>     session.run()
Source code in kedro/framework/session/session.py
def __init__(
    self,
    session_id: str,
    package_name: str | None = None,
    project_path: Path | str | None = None,
    save_on_close: bool = False,
    conf_source: str | None = None,
):
    self._project_path = Path(
        project_path or find_kedro_project(Path.cwd()) or Path.cwd()
    ).resolve()
    self.session_id = session_id
    self.save_on_close = save_on_close
    self._package_name = package_name
    self._store = self._init_store()
    self._run_called = False

    hook_manager = _create_hook_manager()
    _register_hooks(hook_manager, settings.HOOKS)
    _register_hooks_entry_points(hook_manager, settings.DISABLE_HOOKS_FOR_PLUGINS)
    self._hook_manager = hook_manager

    self._conf_source = conf_source or str(
        self._project_path / settings.CONF_SOURCE
    )

store property

store

Return a copy of the internal store.

close

close()

Close the current session and save its store to disk if save_on_close attribute is True.

Source code in kedro/framework/session/session.py
def close(self) -> None:
    """Close the current session and save its store to disk
    if `save_on_close` attribute is True.
    """
    if self.save_on_close:
        self._store.save()
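
A sketch of managing the session lifecycle manually instead of via a with block; whether anything is written on close depends on save_on_close and the configured session store.

from kedro.framework.session import KedroSession

session = KedroSession.create(save_on_close=True)
try:
    session.run()
finally:
    session.close()  # persists the session store only if save_on_close is True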

create classmethod

create(project_path=None, save_on_close=True, env=None, runtime_params=None, conf_source=None)

Create a new instance of KedroSession with the session data.

Parameters:

  • project_path (Path | str | None, default: None ) –

    Path to the project root directory. Default is current working directory Path.cwd().

  • save_on_close (bool, default: True ) –

    Whether or not to save the session when it's closed.

  • conf_source (str | None, default: None ) –

    Path to a directory containing configuration

  • env (str | None, default: None ) –

    Environment for the KedroContext.

  • runtime_params (dict[str, Any] | None, default: None ) –

    Optional dictionary containing extra project parameters for underlying KedroContext. If specified, will update (and therefore take precedence over) the parameters retrieved from the project configuration.

Returns:

  • KedroSession

    A new KedroSession instance.

Source code in kedro/framework/session/session.py
@classmethod
def create(
    cls,
    project_path: Path | str | None = None,
    save_on_close: bool = True,
    env: str | None = None,
    runtime_params: dict[str, Any] | None = None,
    conf_source: str | None = None,
) -> KedroSession:
    """Create a new instance of ``KedroSession`` with the session data.

    Args:
        project_path: Path to the project root directory. Default is
            current working directory Path.cwd().
        save_on_close: Whether or not to save the session when it's closed.
        conf_source: Path to a directory containing configuration
        env: Environment for the KedroContext.
        runtime_params: Optional dictionary containing extra project parameters
            for underlying KedroContext. If specified, will update (and therefore
            take precedence over) the parameters retrieved from the project
            configuration.

    Returns:
        A new ``KedroSession`` instance.
    """
    validate_settings()

    session = cls(
        project_path=project_path,
        session_id=generate_timestamp(),
        save_on_close=save_on_close,
        conf_source=conf_source,
    )

    # have to explicitly type session_data otherwise mypy will complain
    # possibly related to this: https://github.com/python/mypy/issues/1430
    session_data: dict[str, Any] = {
        "project_path": session._project_path,
        "session_id": session.session_id,
    }

    ctx = click.get_current_context(silent=True)
    if ctx:
        session_data["cli"] = _jsonify_cli_context(ctx)

    env = env or os.getenv("KEDRO_ENV")
    if env:
        session_data["env"] = env

    if runtime_params:
        session_data["runtime_params"] = runtime_params

    try:
        session_data["username"] = getpass.getuser()
    except Exception as exc:
        logging.getLogger(__name__).debug(
            "Unable to get username. Full exception: %s", exc
        )

    session_data.update(**_describe_git(session._project_path))
    session._store.update(session_data)

    return session
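
A hedged sketch of passing an environment and runtime parameters when creating a session; the environment name and parameter keys are placeholders, and bootstrap_project must already have been run when working outside of kedro commands, as in the class-level example above.

from kedro.framework.session import KedroSession

with KedroSession.create(
    env="staging",
    runtime_params={"model_options": {"n_estimators": 200}},
) as session:
    session.run()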

load_context

load_context()

Return an instance of the project context.

Source code in kedro/framework/session/session.py
def load_context(self) -> KedroContext:
    """An instance of the project context."""
    env = self.store.get("env")
    runtime_params = self.store.get("runtime_params")
    config_loader = self._get_config_loader()
    context_class = settings.CONTEXT_CLASS
    context = context_class(
        package_name=self._package_name,
        project_path=self._project_path,
        config_loader=config_loader,
        env=env,
        runtime_params=runtime_params,
        hook_manager=self._hook_manager,
    )
    self._hook_manager.hook.after_context_created(context=context)

    return context  # type: ignore[no-any-return]

run

run(pipeline_name=None, tags=None, runner=None, node_names=None, from_nodes=None, to_nodes=None, from_inputs=None, to_outputs=None, load_versions=None, namespaces=None, only_missing_outputs=False)

Runs the pipeline with a specified runner.

Parameters:

  • pipeline_name (str | None, default: None ) –

    Name of the pipeline that is being run.

  • tags (Iterable[str] | None, default: None ) –

    An optional list of node tags which should be used to filter the nodes of the Pipeline. If specified, only the nodes containing any of these tags will be run.

  • runner (AbstractRunner | None, default: None ) –

    An optional parameter specifying the runner that you want to run the pipeline with.

  • node_names (Iterable[str] | None, default: None ) –

    An optional list of node names which should be used to filter the nodes of the Pipeline. If specified, only the nodes with these names will be run.

  • from_nodes (Iterable[str] | None, default: None ) –

    An optional list of node names which should be used as a starting point of the new Pipeline.

  • to_nodes (Iterable[str] | None, default: None ) –

    An optional list of node names which should be used as an end point of the new Pipeline.

  • from_inputs (Iterable[str] | None, default: None ) –

    An optional list of input datasets which should be used as a starting point of the new Pipeline.

  • to_outputs (Iterable[str] | None, default: None ) –

    An optional list of output datasets which should be used as an end point of the new Pipeline.

  • load_versions (dict[str, str] | None, default: None ) –

    An optional flag to specify a particular dataset version timestamp to load.

  • namespaces (Iterable[str] | None, default: None ) –

    The namespaces of the nodes that are being run.

  • only_missing_outputs (bool, default: False ) –

    Run only nodes with missing outputs.

Raises:

  • ValueError

    If the named or __default__ pipeline is not defined by register_pipelines.

  • Exception

    Any uncaught exception during the run will be re-raised after being passed to on_pipeline_error hook.

  • KedroSessionError

    If more than one run is attempted to be executed during a single session.

Returns:

  • dict[str, Any]

    Any node outputs that cannot be processed by the DataCatalog. These are returned in a dictionary, where the keys are defined by the node outputs.

Source code in kedro/framework/session/session.py
def run(  # noqa: PLR0913
    self,
    pipeline_name: str | None = None,
    tags: Iterable[str] | None = None,
    runner: AbstractRunner | None = None,
    node_names: Iterable[str] | None = None,
    from_nodes: Iterable[str] | None = None,
    to_nodes: Iterable[str] | None = None,
    from_inputs: Iterable[str] | None = None,
    to_outputs: Iterable[str] | None = None,
    load_versions: dict[str, str] | None = None,
    namespaces: Iterable[str] | None = None,
    only_missing_outputs: bool = False,
) -> dict[str, Any]:
    """Runs the pipeline with a specified runner.

    Args:
        pipeline_name: Name of the pipeline that is being run.
        tags: An optional list of node tags which should be used to
            filter the nodes of the ``Pipeline``. If specified, only the nodes
            containing *any* of these tags will be run.
        runner: An optional parameter specifying the runner that you want to run
            the pipeline with.
        node_names: An optional list of node names which should be used to
            filter the nodes of the ``Pipeline``. If specified, only the nodes
            with these names will be run.
        from_nodes: An optional list of node names which should be used as a
            starting point of the new ``Pipeline``.
        to_nodes: An optional list of node names which should be used as an
            end point of the new ``Pipeline``.
        from_inputs: An optional list of input datasets which should be
            used as a starting point of the new ``Pipeline``.
        to_outputs: An optional list of output datasets which should be
            used as an end point of the new ``Pipeline``.
        load_versions: An optional flag to specify a particular dataset
            version timestamp to load.
        namespaces: The namespaces of the nodes that are being run.
        only_missing_outputs: Run only nodes with missing outputs.
    Raises:
        ValueError: If the named or `__default__` pipeline is not
            defined by `register_pipelines`.
        Exception: Any uncaught exception during the run will be re-raised
            after being passed to ``on_pipeline_error`` hook.
        KedroSessionError: If more than one run is attempted to be executed during
            a single session.
    Returns:
        Any node outputs that cannot be processed by the ``DataCatalog``.
        These are returned in a dictionary, where the keys are defined
        by the node outputs.
    """
    # Report project name
    self._logger.info("Kedro project %s", self._project_path.name)

    if self._run_called:
        raise KedroSessionError(
            "A run has already been completed as part of the"
            " active KedroSession. KedroSession has a 1-1 mapping with"
            " runs, and thus only one run should be executed per session."
        )

    session_id = self.store["session_id"]
    save_version = session_id
    runtime_params = self.store.get("runtime_params") or {}
    context = self.load_context()

    name = pipeline_name or "__default__"

    try:
        pipeline = pipelines[name]
    except KeyError as exc:
        raise ValueError(
            f"Failed to find the pipeline named '{name}'. "
            f"It needs to be generated and returned "
            f"by the 'register_pipelines' function."
        ) from exc

    filtered_pipeline = pipeline.filter(
        tags=tags,
        from_nodes=from_nodes,
        to_nodes=to_nodes,
        node_names=node_names,
        from_inputs=from_inputs,
        to_outputs=to_outputs,
        node_namespaces=namespaces,
    )

    record_data = {
        "session_id": session_id,
        "project_path": self._project_path.as_posix(),
        "env": context.env,
        "kedro_version": kedro_version,
        "tags": tags,
        "from_nodes": from_nodes,
        "to_nodes": to_nodes,
        "node_names": node_names,
        "from_inputs": from_inputs,
        "to_outputs": to_outputs,
        "load_versions": load_versions,
        "runtime_params": runtime_params,
        "pipeline_name": pipeline_name,
        "namespaces": namespaces,
        "runner": getattr(runner, "__name__", str(runner)),
        "only_missing_outputs": only_missing_outputs,
    }

    runner = runner or SequentialRunner()
    if not isinstance(runner, AbstractRunner):
        raise KedroSessionError(
            "KedroSession expect an instance of Runner instead of a class."
            "Have you forgotten the `()` at the end of the statement?"
        )

    catalog_class = (
        SharedMemoryDataCatalog
        if isinstance(runner, ParallelRunner)
        else settings.DATA_CATALOG_CLASS
    )

    catalog = context._get_catalog(
        catalog_class=catalog_class,
        save_version=save_version,
        load_versions=load_versions,
    )

    # Run the runner
    hook_manager = self._hook_manager
    hook_manager.hook.before_pipeline_run(
        run_params=record_data, pipeline=filtered_pipeline, catalog=catalog
    )
    try:
        run_result = runner.run(
            filtered_pipeline,
            catalog,
            hook_manager,
            run_id=session_id,
            only_missing_outputs=only_missing_outputs,
        )
        self._run_called = True
    except Exception as error:
        hook_manager.hook.on_pipeline_error(
            error=error,
            run_params=record_data,
            pipeline=filtered_pipeline,
            catalog=catalog,
        )
        raise

    hook_manager.hook.after_pipeline_run(
        run_params=record_data,
        run_result=run_result,
        pipeline=filtered_pipeline,
        catalog=catalog,
    )
    return run_result
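
Illustrative usage of run() with a filtered pipeline; the pipeline name, tag and node name below are placeholders for whatever exists in a given project.

from kedro.framework.session import KedroSession

with KedroSession.create() as session:
    session.run(
        pipeline_name="data_processing",
        tags=["preprocessing"],
        from_nodes=["clean_raw_data_node"],
        only_missing_outputs=True,
    )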

KedroSessionError

Bases: Exception

KedroSessionError is raised by KedroSession when more than one run is attempted in a single session.

ParallelRunner

ParallelRunner(max_workers=None, is_async=False)

Bases: AbstractRunner

ParallelRunner is an AbstractRunner implementation. It can be used to run the Pipeline in parallel groups formed by toposort. Please note that this runner implementation validates the datasets using the _validate_catalog method, which checks if any of the datasets are single-process only using the _SINGLE_PROCESS dataset attribute.

Parameters:

  • max_workers (int | None, default: None ) –

    Number of worker processes to spawn. If not set, calculated automatically based on the pipeline configuration and CPU core count. On Windows machines, the max_workers value cannot be larger than 61 and will be set to min(61, max_workers).

  • is_async (bool, default: False ) –

    If True, the node inputs and outputs are loaded and saved asynchronously with threads. Defaults to False.

Raises:

  • ValueError

    Raised when bad parameters are passed.

Source code in kedro/runner/parallel_runner.py
def __init__(
    self,
    max_workers: int | None = None,
    is_async: bool = False,
):
    """
    Instantiates the runner by creating a Manager.

    Args:
        max_workers: Number of worker processes to spawn. If not set,
            calculated automatically based on the pipeline configuration
            and CPU core count. On windows machines, the max_workers value
            cannot be larger than 61 and will be set to min(61, max_workers).
        is_async: If True, the node inputs and outputs are loaded and saved
            asynchronously with threads. Defaults to False.
    Raises:
        ValueError: bad parameters passed
    """
    super().__init__(is_async=is_async)
    self._manager = ParallelRunnerManager()
    self._manager.start()

    self._max_workers = self._validate_max_workers(max_workers)
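
A minimal sketch of selecting this runner for a session run; note that an instance must be passed (with the parentheses), not the class itself.

from kedro.framework.session import KedroSession
from kedro.runner import ParallelRunner

with KedroSession.create() as session:
    session.run(runner=ParallelRunner(max_workers=2))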

SequentialRunner

SequentialRunner(is_async=False)

Bases: AbstractRunner

SequentialRunner is an AbstractRunner implementation. It can be used to run the Pipeline in a sequential manner using a topological sort of provided nodes.

Parameters:

  • is_async (bool, default: False ) –

    If True, the node inputs and outputs are loaded and saved asynchronously with threads. Defaults to False.

Source code in kedro/runner/sequential_runner.py
def __init__(
    self,
    is_async: bool = False,
):
    """Instantiates the runner class.

    Args:
        is_async: If True, the node inputs and outputs are loaded and saved
            asynchronously with threads. Defaults to False.
    """
    super().__init__(is_async=is_async)

SharedMemoryDataCatalog

SharedMemoryDataCatalog(datasets=None, config_resolver=None, load_versions=None, save_version=None)

Bases: DataCatalog

A specialized DataCatalog for managing datasets in a shared memory context.

The SharedMemoryDataCatalog extends the base DataCatalog to support multiprocessing by ensuring that datasets are serializable and synchronized across threads or processes. It provides additional functionality for managing shared memory datasets, such as setting a multiprocessing manager and validating dataset compatibility with multiprocessing.

Attributes:

  • default_runtime_patterns (ClassVar) –

    A dictionary defining the default runtime pattern for datasets of type kedro.io.SharedMemoryDataset.

Example: ::

>>> from multiprocessing.managers import SyncManager
>>> from kedro.io import MemoryDataset
>>> from kedro.io.data_catalog import SharedMemoryDataCatalog
>>>
>>> # Create a shared memory catalog
>>> catalog = SharedMemoryDataCatalog(
...     datasets={"shared_data": MemoryDataset(data=[1, 2, 3])}
... )
>>>
>>> # Set a multiprocessing manager
>>> manager = SyncManager()
>>> manager.start()
>>> catalog.set_manager_datasets(manager)
>>>
>>> # Validate the catalog for multiprocessing compatibility
>>> catalog.validate_catalog()
Source code in kedro/io/data_catalog.py
def __init__(
    self,
    datasets: dict[str, AbstractDataset] | None = None,
    config_resolver: CatalogConfigResolver | None = None,
    load_versions: dict[str, str] | None = None,
    save_version: str | None = None,
) -> None:
    """Initializes a ``DataCatalog`` to manage datasets with loading, saving, and versioning capabilities.

    This catalog combines datasets passed directly via the `datasets` argument and dynamic datasets
    resolved from config (e.g., from YAML).

    If a dataset name is present in both `datasets` and the resolved config, the dataset from `datasets`
    takes precedence. A warning is logged, and the config-defined dataset is skipped and removed from
    the internal config.

    Args:
        datasets: A dictionary of dataset names and dataset instances.
        config_resolver: An instance of CatalogConfigResolver to resolve dataset factory patterns and configurations.
        load_versions: A mapping between dataset names and versions
            to load. Has no effect on datasets without enabled versioning.
        save_version: Version string to be used for ``save`` operations
            by all datasets with enabled versioning. It must: a) be a
            case-insensitive string that conforms with operating system
            filename limitations, b) always return the latest version when
            sorted in lexicographical order.

    Example:
    ::

        >>> from kedro.io import DataCatalog, MemoryDataset
        >>> from kedro_datasets.pandas import CSVDataset

        >>> # Define datasets
        >>> datasets = {
        ...     "cars": CSVDataset(filepath="cars.csv"),
        ...     "planes": MemoryDataset(data={"type": "jet", "capacity": 200}),
        ... }

        >>> # Initialize the catalog
        >>> catalog = DataCatalog(
        ...     datasets=datasets,
        ...     load_versions={"cars": "2023-01-01T00.00.00"},
        ...     save_version="2023-01-02T00.00.00",
        ... )

        >>> print(catalog)
    """
    self._config_resolver = config_resolver or CatalogConfigResolver(
        default_runtime_patterns=self.default_runtime_patterns
    )
    self._datasets: dict[str, AbstractDataset] = datasets or {}
    self._lazy_datasets: dict[str, _LazyDataset] = {}
    self._load_versions, self._save_version = self._validate_versions(
        datasets, load_versions or {}, save_version
    )

    self._use_rich_markup = _has_rich_handler()

    for ds_name in list(self._config_resolver.config):
        if ds_name in self._datasets:
            self._logger.warning(
                f"Cannot register dataset '{ds_name}' from config: a dataset with the same name "
                f"was already provided in the `datasets` argument."
            )
            self._config_resolver.config.pop(ds_name)
        else:
            self._add_from_config(ds_name, self._config_resolver.config[ds_name])

set_manager_datasets

set_manager_datasets(manager)

Associate a multiprocessing manager with all shared memory datasets in the catalog.

This method iterates through all datasets in the catalog and sets the provided multiprocessing manager for datasets of type SharedMemoryDataset. This ensures that these datasets are properly synchronized across threads or processes.

Parameters:

  • manager (SyncManager) –

    A multiprocessing manager to be associated with shared memory datasets.

Example: ::

>>> from multiprocessing.managers import SyncManager
>>> from kedro.io.data_catalog import SharedMemoryDataCatalog
>>>
>>> catalog = SharedMemoryDataCatalog(
...     datasets={"shared_data": MemoryDataset(data=[1, 2, 3])}
... )
>>>
>>> manager = SyncManager()
>>> manager.start()
>>> catalog.set_manager_datasets(manager)
>>> print(catalog)
# {'shared_data': kedro.io.memory_dataset.MemoryDataset(data='<list>')}
Source code in kedro/io/data_catalog.py
def set_manager_datasets(self, manager: SyncManager) -> None:
    """
    Associate a multiprocessing manager with all shared memory datasets in the catalog.

    This method iterates through all datasets in the catalog and sets the provided
    multiprocessing manager for datasets of type `SharedMemoryDataset`. This ensures
    that these datasets are properly synchronized across threads or processes.

    Args:
        manager: A multiprocessing manager to be associated with
            shared memory datasets.

    Example:
    ::

        >>> from multiprocessing.managers import SyncManager
        >>> from kedro.io.data_catalog import SharedMemoryDataCatalog
        >>>
        >>> catalog = SharedMemoryDataCatalog(
        ...     datasets={"shared_data": MemoryDataset(data=[1, 2, 3])}
        ... )
        >>>
        >>> manager = SyncManager()
        >>> manager.start()
        >>> catalog.set_manager_datasets(manager)
        >>> print(catalog)
        # {'shared_data': kedro.io.memory_dataset.MemoryDataset(data='<list>')}
    """
    for _, ds in self._datasets.items():
        if isinstance(ds, SharedMemoryDataset):
            ds.set_manager(manager)

validate_catalog

validate_catalog()

Validate the catalog to ensure all datasets are serializable and compatible with multiprocessing.

This method checks that all datasets in the catalog are serializable and do not include non-proxied memory datasets as outputs. Non-serializable datasets or datasets that rely on single-process memory cannot be used in a multiprocessing context. If any such datasets are found, an exception is raised with details.

Raises:

  • AttributeError

    If any datasets are found to be non-serializable or incompatible with multiprocessing.

Example: ::

>>> from kedro.io.data_catalog import SharedMemoryDataCatalog
>>>
>>> catalog = SharedMemoryDataCatalog(
...     datasets={"shared_data": MemoryDataset(data=[1, 2, 3])}
... )
>>>
>>> try:
...     catalog.validate_catalog()
... except AttributeError as e:
...     print(f"Validation failed: {e}")
# No error
Source code in kedro/io/data_catalog.py
def validate_catalog(self) -> None:
    """
    Validate the catalog to ensure all datasets are serializable and compatible with multiprocessing.

    This method checks that all datasets in the catalog are serializable and do not
    include non-proxied memory datasets as outputs. Non-serializable datasets or
    datasets that rely on single-process memory cannot be used in a multiprocessing
    context. If any such datasets are found, an exception is raised with details.

    Raises:
        AttributeError: If any datasets are found to be non-serializable or incompatible
            with multiprocessing.

    Example:
    ::

        >>> from kedro.io.data_catalog import SharedMemoryDataCatalog
        >>>
        >>> catalog = SharedMemoryDataCatalog(
        ...     datasets={"shared_data": MemoryDataset(data=[1, 2, 3])}
        ... )
        >>>
        >>> try:
        ...     catalog.validate_catalog()
        ... except AttributeError as e:
        ...     print(f"Validation failed: {e}")
        # No error
    """
    unserialisable = []
    for name, dataset in self._datasets.items():
        if getattr(dataset, "_SINGLE_PROCESS", False):  # SKIP_IF_NO_SPARK
            unserialisable.append(name)
            continue
        try:
            ForkingPickler.dumps(dataset)
        except (AttributeError, PicklingError):
            unserialisable.append(name)

    if unserialisable:
        raise AttributeError(
            f"The following datasets cannot be used with multiprocessing: "
            f"{sorted(unserialisable)}\nIn order to utilize multiprocessing you "
            f"need to make sure all datasets are serialisable, i.e. datasets "
            f"should not make use of lambda functions, nested functions, closures "
            f"etc.\nIf you are using custom decorators ensure they are correctly "
            f"decorated using functools.wraps()."
        )

_create_hook_manager

_create_hook_manager()

Create a new PluginManager instance and register Kedro's hook specs.

Source code in kedro/framework/hooks/manager.py
def _create_hook_manager() -> PluginManager:
    """Create a new PluginManager instance and register Kedro's hook specs."""
    manager = PluginManager(HOOK_NAMESPACE)
    manager.trace.root.setwriter(
        logger.debug if logger.getEffectiveLevel() == logging.DEBUG else None
    )
    manager.enable_tracing()
    manager.add_hookspecs(NodeSpecs)
    manager.add_hookspecs(PipelineSpecs)
    manager.add_hookspecs(DataCatalogSpecs)
    manager.add_hookspecs(DatasetSpecs)
    manager.add_hookspecs(KedroContextSpecs)
    return manager

_describe_git

_describe_git(project_path)
Source code in kedro/framework/session/session.py
def _describe_git(project_path: Path) -> dict[str, dict[str, Any]]:
    path = str(project_path)
    try:
        res = subprocess.check_output(  # noqa: S603
            ["git", "rev-parse", "--short", "HEAD"],  # noqa: S607
            cwd=path,
            stderr=subprocess.STDOUT,
        )
        git_data: dict[str, Any] = {"commit_sha": res.decode().strip()}
        git_status_res = subprocess.check_output(  # noqa: S603
            ["git", "status", "--short"],  # noqa: S607
            cwd=path,
            stderr=subprocess.STDOUT,
        )
        git_data["dirty"] = bool(git_status_res.decode().strip())

    # `subprocess.check_output()` raises `NotADirectoryError` on Windows
    except Exception:
        logger = logging.getLogger(__name__)
        logger.debug("Unable to git describe %s", path)
        logger.debug(traceback.format_exc())
        return {}

    return {"git": git_data}

_jsonify_cli_context

_jsonify_cli_context(ctx)
Source code in kedro/framework/session/session.py
def _jsonify_cli_context(ctx: click.core.Context) -> dict[str, Any]:
    return {
        "args": ctx.args,
        "params": ctx.params,
        "command_name": ctx.command.name,
        "command_path": " ".join(["kedro"] + sys.argv[1:]),
    }

_register_hooks

_register_hooks(hook_manager, hooks)

Register all hooks as specified in hooks with the global hook_manager.

Parameters:

  • hook_manager (PluginManager) –

    Hook manager instance to register the hooks with.

  • hooks (Iterable[Any]) –

    Hooks that need to be registered.

Source code in kedro/framework/hooks/manager.py
def _register_hooks(hook_manager: PluginManager, hooks: Iterable[Any]) -> None:
    """Register all hooks as specified in ``hooks`` with the global ``hook_manager``.

    Args:
        hook_manager: Hook manager instance to register the hooks with.
        hooks: Hooks that need to be registered.

    """
    for hooks_collection in hooks:
        # Sometimes users might call hook registration more than once, in which
        # case hooks have already been registered, so we perform a simple check
        # here to avoid an error being raised and break user's workflow.
        if not hook_manager.is_registered(hooks_collection):
            if isclass(hooks_collection):
                raise TypeError(
                    "KedroSession expects hooks to be registered as instances. "
                    "Have you forgotten the `()` when registering a hook class ?"
                )
            hook_manager.register(hooks_collection)
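
For context, the hooks iterable passed here usually comes from settings.HOOKS in a project's settings.py; a hedged sketch of such a declaration is shown below, where ProjectHooks is a hypothetical hook class defined by the project.

# settings.py (sketch)
from my_project.hooks import ProjectHooks  # assumption: user-defined hooks module

HOOKS = (ProjectHooks(),)  # instances, not classes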

_register_hooks_entry_points

_register_hooks_entry_points(hook_manager, disabled_plugins)

Register pluggy hooks from python package entrypoints.

Parameters:

  • hook_manager (PluginManager) –

    Hook manager instance to register the hooks with.

  • disabled_plugins (Iterable[str]) –

    An iterable returning the names of plugins which hooks must not be registered; any already registered hooks will be unregistered.

Source code in kedro/framework/hooks/manager.py
def _register_hooks_entry_points(
    hook_manager: PluginManager, disabled_plugins: Iterable[str]
) -> None:
    """Register pluggy hooks from python package entrypoints.

    Args:
        hook_manager: Hook manager instance to register the hooks with.
        disabled_plugins: An iterable returning the names of plugins
            which hooks must not be registered; any already registered
            hooks will be unregistered.

    """
    already_registered = hook_manager.get_plugins()
    # Method name is misleading:
    # entry points are standard and don't require setuptools,
    # see https://packaging.python.org/en/latest/specifications/entry-points/
    hook_manager.load_setuptools_entrypoints(_PLUGIN_HOOKS)
    disabled_plugins = set(disabled_plugins)

    # Get list of plugin/distinfo tuples for all registered plugins.
    plugininfo = hook_manager.list_plugin_distinfo()
    plugin_names = set()
    disabled_plugin_names = set()
    for plugin, dist in plugininfo:
        if dist.project_name in disabled_plugins:
            # `unregister()` is used instead of `set_blocked()` because
            # we want to disable hooks for specific plugin based on project
            # name and not `entry_point` name. Also, we log project names with
            # version for which hooks were registered.
            hook_manager.unregister(plugin=plugin)
            disabled_plugin_names.add(f"{dist.project_name}-{dist.version}")
        elif plugin not in already_registered:
            plugin_names.add(f"{dist.project_name}-{dist.version}")

    if disabled_plugin_names:
        logger.debug(
            "Hooks are disabled for plugin(s): %s",
            ", ".join(sorted(disabled_plugin_names)),
        )

    if plugin_names:
        logger.debug(
            "Registered hooks from %d installed plugin(s): %s",
            len(plugin_names),
            ", ".join(sorted(plugin_names)),
        )

find_kedro_project

find_kedro_project(current_dir)

Given a path, find a Kedro project associated with it.

Can be:
  • Itself, if the path is the root directory of a Kedro project.
  • One of its parents, if the path itself is not a Kedro project but one of its parent paths is.
  • None, if neither the path nor any of its parents is a Kedro project.

Returns:

  • Any

    The Kedro project root associated with the given path, or None if no relevant Kedro project is found.

Source code in kedro/utils.py
def find_kedro_project(current_dir: Path) -> Any:  # pragma: no cover
    """Given a path, find a Kedro project associated with it.

    Can be:
        - Itself, if a path is a root directory of a Kedro project.
        - One of its parents, if self is not a Kedro project but one of the parent path is.
        - None, if neither self nor any parent path is a Kedro project.

    Returns:
        Kedro project associated with a given path,
        or None if no relevant Kedro project is found.
    """
    paths_to_check = [current_dir, *list(current_dir.parents)]
    for parent_dir in paths_to_check:
        if is_kedro_project(parent_dir):
            return parent_dir
    return None

generate_timestamp

generate_timestamp()

Generate the timestamp to be used by versioning.

Returns:

  • str

    String representation of the current timestamp.

Source code in kedro/io/core.py
def generate_timestamp() -> str:
    """Generate the timestamp to be used by versioning.

    Returns:
        String representation of the current timestamp.

    """
    current_ts = datetime.now(tz=timezone.utc).strftime(VERSION_FORMAT)
    return current_ts[:-4] + current_ts[-1:]  # Don't keep microseconds

validate_settings

validate_settings()

Eagerly validate that the settings module is importable if it exists. This is desirable to surface any syntax or import errors early. In particular, without eagerly importing the settings module, dynaconf would silence any import error (e.g. missing dependency, missing/mislabelled pipeline), and users would instead get a cryptic error message Expected an instance of `ConfigLoader`, got `NoneType` instead. More info on the dynaconf issue: https://github.com/dynaconf/dynaconf/issues/460

Source code in kedro/framework/project/__init__.py
def validate_settings() -> None:
    """Eagerly validate that the settings module is importable if it exists. This is desirable to
    surface any syntax or import errors early. In particular, without eagerly importing
    the settings module, dynaconf would silence any import error (e.g. missing
    dependency, missing/mislabelled pipeline), and users would instead get a cryptic
    error message ``Expected an instance of `ConfigLoader`, got `NoneType` instead``.
    More info on the dynaconf issue: https://github.com/dynaconf/dynaconf/issues/460
    """
    if PACKAGE_NAME is None:
        raise ValueError(
            "Package name not found. Make sure you have configured the project using "
            "'bootstrap_project'. This should happen automatically if you are using "
            "Kedro command line interface."
        )
    # Check if file exists, if it does, validate it.
    if importlib.util.find_spec(f"{PACKAGE_NAME}.settings") is not None:
        importlib.import_module(f"{PACKAGE_NAME}.settings")
    else:
        logger = logging.getLogger(__name__)
        logger.warning("No 'settings.py' found, defaults will be used.")

kedro.framework.session.store

This module implements a dict-like store object used to persist Kedro sessions.

BaseSessionStore

BaseSessionStore(path, session_id)

Bases: UserDict

BaseSessionStore is the base class for all session stores. It is an ephemeral store implementation that doesn't persist the session data.

Source code in kedro/framework/session/store.py
def __init__(self, path: str, session_id: str):
    self._path = path
    self._session_id = session_id
    super().__init__(self.read())

read

read()

Read the data from the session store.

Returns:

  • dict[str, Any]

    A mapping containing the session store data.

Source code in kedro/framework/session/store.py
def read(self) -> dict[str, Any]:
    """Read the data from the session store.

    Returns:
        A mapping containing the session store data.
    """
    self._logger.debug(
        "'read()' not implemented for '%s'. Assuming empty store.",
        self.__class__.__name__,
    )
    return {}

save

save()

Persist the session store

Source code in kedro/framework/session/store.py
def save(self) -> None:
    """Persist the session store"""
    self._logger.debug(
        "'save()' not implemented for '%s'. Skipping the step.",
        self.__class__.__name__,
    )