SharedMemoryDataCatalog

kedro.io.SharedMemoryDataCatalog

SharedMemoryDataCatalog(datasets=None, config_resolver=None, load_versions=None, save_version=None)

Bases: DataCatalog

A specialized DataCatalog for managing datasets in a shared memory context.

The SharedMemoryDataCatalog extends the base DataCatalog to support multiprocessing by ensuring that datasets are serializable and synchronized across threads or processes. It provides additional functionality for managing shared memory datasets, such as setting a multiprocessing manager and validating dataset compatibility with multiprocessing.

Attributes:

  • default_runtime_patterns (ClassVar) –

    A dictionary defining the default runtime pattern for datasets of type kedro.io.SharedMemoryDataset.

Example:

>>> from multiprocessing.managers import SyncManager
>>> from kedro.io import MemoryDataset
>>> from kedro.io.data_catalog import SharedMemoryDataCatalog
>>>
>>> # Create a shared memory catalog
>>> catalog = SharedMemoryDataCatalog(
...     datasets={"shared_data": MemoryDataset(data=[1, 2, 3])}
... )
>>>
>>> # Set a multiprocessing manager
>>> manager = SyncManager()
>>> manager.start()
>>> catalog.set_manager_datasets(manager)
>>>
>>> # Validate the catalog for multiprocessing compatibility
>>> catalog.validate_catalog()
Source code in kedro/io/data_catalog.py
def __init__(
    self,
    datasets: dict[str, AbstractDataset] | None = None,
    config_resolver: CatalogConfigResolver | None = None,
    load_versions: dict[str, str] | None = None,
    save_version: str | None = None,
) -> None:
    """Initializes a ``DataCatalog`` to manage datasets with loading, saving, and versioning capabilities.

    This catalog combines datasets passed directly via the `datasets` argument and dynamic datasets
    resolved from config (e.g., from YAML).

    If a dataset name is present in both `datasets` and the resolved config, the dataset from `datasets`
    takes precedence. A warning is logged, and the config-defined dataset is skipped and removed from
    the internal config.

    Args:
        datasets: A dictionary of dataset names and dataset instances.
        config_resolver: An instance of CatalogConfigResolver to resolve dataset factory patterns and configurations.
        load_versions: A mapping between dataset names and versions
            to load. Has no effect on datasets without enabled versioning.
        save_version: Version string to be used for ``save`` operations
            by all datasets with enabled versioning. It must: a) be a
            case-insensitive string that conforms with operating system
            filename limitations, b) always return the latest version when
            sorted in lexicographical order.

    Example:
    ::

        >>> from kedro.io import DataCatalog, MemoryDataset
        >>> from kedro_datasets.pandas import CSVDataset

        >>> # Define datasets
        >>> datasets = {
        ...     "cars": CSVDataset(filepath="cars.csv"),
        ...     "planes": MemoryDataset(data={"type": "jet", "capacity": 200}),
        ... }

        >>> # Initialize the catalog
        >>> catalog = DataCatalog(
        ...     datasets=datasets,
        ...     load_versions={"cars": "2023-01-01T00.00.00"},
        ...     save_version="2023-01-02T00.00.00",
        ... )

        >>> print(catalog)
    """
    self._config_resolver = config_resolver or CatalogConfigResolver(
        default_runtime_patterns=self.default_runtime_patterns
    )
    self._datasets: dict[str, AbstractDataset] = datasets or {}
    self._lazy_datasets: dict[str, _LazyDataset] = {}
    self._load_versions, self._save_version = self._validate_versions(
        datasets, load_versions or {}, save_version
    )

    self._use_rich_markup = _has_rich_handler()

    for ds_name in list(self._config_resolver.config):
        if ds_name in self._datasets:
            self._logger.warning(
                f"Cannot register dataset '{ds_name}' from config: a dataset with the same name "
                f"was already provided in the `datasets` argument."
            )
            self._config_resolver.config.pop(ds_name)
        else:
            self._add_from_config(ds_name, self._config_resolver.config[ds_name])

default_runtime_patterns (class attribute)

default_runtime_patterns = {'{default}': {'type': 'kedro.io.SharedMemoryDataset'}}
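The '{default}' key matches any dataset name, so datasets that are not explicitly registered in the catalog are created as kedro.io.SharedMemoryDataset instances at runtime, whereas the base DataCatalog defaults to plain in-memory datasets. A minimal sketch, inspecting only the class attribute shown above:

>>> from kedro.io.data_catalog import SharedMemoryDataCatalog
>>>
>>> # Any unregistered dataset name falls under the '{default}' pattern
>>> # and is resolved to a SharedMemoryDataset at runtime.
>>> SharedMemoryDataCatalog.default_runtime_patterns
{'{default}': {'type': 'kedro.io.SharedMemoryDataset'}}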

set_manager_datasets

set_manager_datasets(manager)

Associate a multiprocessing manager with all shared memory datasets in the catalog.

This method iterates through all datasets in the catalog and sets the provided multiprocessing manager for datasets of type SharedMemoryDataset. This ensures that these datasets are properly synchronized across threads or processes.

Parameters:

  • manager (SyncManager) –

    A multiprocessing manager to be associated with shared memory datasets.

Example:

>>> from multiprocessing.managers import SyncManager
>>> from kedro.io import MemoryDataset
>>> from kedro.io.data_catalog import SharedMemoryDataCatalog
>>>
>>> catalog = SharedMemoryDataCatalog(
...     datasets={"shared_data": MemoryDataset(data=[1, 2, 3])}
... )
>>>
>>> manager = SyncManager()
>>> manager.start()
>>> catalog.set_manager_datasets(manager)
>>> print(catalog)
# {'shared_data': kedro.io.memory_dataset.MemoryDataset(data='<list>')}
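Note that set_manager_datasets only affects datasets of type SharedMemoryDataset; the plain MemoryDataset in the example above is left untouched. A minimal sketch with a dataset that actually receives the manager, assuming SharedMemoryDataset is importable from kedro.io (as its type string in default_runtime_patterns suggests) and can be constructed without arguments:

>>> from multiprocessing.managers import SyncManager
>>> from kedro.io import SharedMemoryDataset
>>> from kedro.io.data_catalog import SharedMemoryDataCatalog
>>>
>>> catalog = SharedMemoryDataCatalog(
...     datasets={"shared_data": SharedMemoryDataset()}
... )
>>> manager = SyncManager()
>>> manager.start()
>>> # The manager is attached to every SharedMemoryDataset in the catalog.
>>> catalog.set_manager_datasets(manager)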
Source code in kedro/io/data_catalog.py
def set_manager_datasets(self, manager: SyncManager) -> None:
    """
    Associate a multiprocessing manager with all shared memory datasets in the catalog.

    This method iterates through all datasets in the catalog and sets the provided
    multiprocessing manager for datasets of type `SharedMemoryDataset`. This ensures
    that these datasets are properly synchronized across threads or processes.

    Args:
        manager: A multiprocessing manager to be associated with
            shared memory datasets.

    Example:
    ::

        >>> from multiprocessing.managers import SyncManager
        >>> from kedro.io import MemoryDataset
        >>> from kedro.io.data_catalog import SharedMemoryDataCatalog
        >>>
        >>> catalog = SharedMemoryDataCatalog(
        ...     datasets={"shared_data": MemoryDataset(data=[1, 2, 3])}
        ... )
        >>>
        >>> manager = SyncManager()
        >>> manager.start()
        >>> catalog.set_manager_datasets(manager)
        >>> print(catalog)
        # {'shared_data': kedro.io.memory_dataset.MemoryDataset(data='<list>')}
    """
    for _, ds in self._datasets.items():
        if isinstance(ds, SharedMemoryDataset):
            ds.set_manager(manager)

validate_catalog

validate_catalog()

Validate the catalog to ensure all datasets are serializable and compatible with multiprocessing.

This method checks that all datasets in the catalog are serializable and do not include non-proxied memory datasets as outputs. Non-serializable datasets or datasets that rely on single-process memory cannot be used in a multiprocessing context. If any such datasets are found, an exception is raised with details.

Raises:

  • AttributeError

    If any datasets are found to be non-serializable or incompatible with multiprocessing.

Example:

>>> from kedro.io import MemoryDataset
>>> from kedro.io.data_catalog import SharedMemoryDataCatalog
>>>
>>> catalog = SharedMemoryDataCatalog(
...     datasets={"shared_data": MemoryDataset(data=[1, 2, 3])}
... )
>>>
>>> try:
...     catalog.validate_catalog()
... except AttributeError as e:
...     print(f"Validation failed: {e}")
# No error
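Validation fails when a dataset cannot be pickled for transfer to worker processes. A minimal sketch of the failure path, assuming a MemoryDataset holding a lambda (which the standard pickler cannot serialize):

>>> from kedro.io import MemoryDataset
>>> from kedro.io.data_catalog import SharedMemoryDataCatalog
>>>
>>> # A lambda cannot be pickled, so the dataset is reported as unusable
>>> # for multiprocessing and validate_catalog raises AttributeError.
>>> bad_catalog = SharedMemoryDataCatalog(
...     datasets={"bad_data": MemoryDataset(data=lambda x: x)}
... )
>>> try:
...     bad_catalog.validate_catalog()
... except AttributeError as e:
...     print("Validation failed")
Validation failed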
Source code in kedro/io/data_catalog.py
def validate_catalog(self) -> None:
    """
    Validate the catalog to ensure all datasets are serializable and compatible with multiprocessing.

    This method checks that all datasets in the catalog are serializable and do not
    include non-proxied memory datasets as outputs. Non-serializable datasets or
    datasets that rely on single-process memory cannot be used in a multiprocessing
    context. If any such datasets are found, an exception is raised with details.

    Raises:
        AttributeError: If any datasets are found to be non-serializable or incompatible
            with multiprocessing.

    Example:
    ::

        >>> from kedro.io import MemoryDataset
        >>> from kedro.io.data_catalog import SharedMemoryDataCatalog
        >>>
        >>> catalog = SharedMemoryDataCatalog(
        ...     datasets={"shared_data": MemoryDataset(data=[1, 2, 3])}
        ... )
        >>>
        >>> try:
        ...     catalog.validate_catalog()
        ... except AttributeError as e:
        ...     print(f"Validation failed: {e}")
        # No error
    """
    unserialisable = []
    for name, dataset in self._datasets.items():
        if getattr(dataset, "_SINGLE_PROCESS", False):  # SKIP_IF_NO_SPARK
            unserialisable.append(name)
            continue
        try:
            ForkingPickler.dumps(dataset)
        except (AttributeError, PicklingError):
            unserialisable.append(name)

    if unserialisable:
        raise AttributeError(
            f"The following datasets cannot be used with multiprocessing: "
            f"{sorted(unserialisable)}\nIn order to utilize multiprocessing you "
            f"need to make sure all datasets are serialisable, i.e. datasets "
            f"should not make use of lambda functions, nested functions, closures "
            f"etc.\nIf you are using custom decorators ensure they are correctly "
            f"decorated using functools.wraps()."
        )