
CachedDataset

kedro.io.CachedDataset

CachedDataset(dataset, version=None, copy_mode=None, metadata=None)

Bases: AbstractDataset

CachedDataset is a dataset wrapper that caches the saved data in memory, so that subsequent loads avoid I/O operations against slow storage media.

You can also specify a CachedDataset in catalog.yml:

test_ds:
  type: CachedDataset
  versioned: true
  dataset:
    type: pandas.CSVDataset
    filepath: example.csv

Please note that if your dataset is versioned, the versioned flag must be set on the CachedDataset wrapper, as shown above, not on the wrapped dataset.
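
The same configuration can be built programmatically. A minimal sketch, assuming the kedro-datasets package is installed and that example.csv exists:

from kedro.io import CachedDataset
from kedro_datasets.pandas import CSVDataset

# Wrap an existing dataset object...
cached = CachedDataset(dataset=CSVDataset(filepath="example.csv"))

# ...or pass the same dict representation used in catalog.yml.
cached_from_dict = CachedDataset(
    dataset={"type": "pandas.CSVDataset", "filepath": "example.csv"}
)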

Parameters:

  • dataset (AbstractDataset | dict) –

    A Kedro Dataset object or a dictionary to cache.

  • version (Version | None, default: None ) –

If specified, should be an instance of kedro.io.core.Version. If its load attribute is None, the latest version will be loaded. If its save attribute is None, the save version will be autogenerated.

  • copy_mode (str | None, default: None ) –

    The copy mode used to copy the data. Possible values are: "deepcopy", "copy" and "assign". If not provided, it is inferred based on the data type.

  • metadata (dict[str, Any] | None, default: None ) –

    Any arbitrary metadata. This is ignored by Kedro, but may be consumed by users or external plugins.

Raises:

  • ValueError

    If the provided dataset is not a valid dict/YAML representation of a dataset or an actual dataset.
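
The copy_mode argument is forwarded to the internal MemoryDataset and controls what load returns from the cache. A minimal sketch of the difference, assuming pandas and kedro-datasets are installed:

import pandas as pd

from kedro.io import CachedDataset
from kedro_datasets.pandas import CSVDataset

ds = CachedDataset(
    dataset=CSVDataset(filepath="example.csv"),
    copy_mode="assign",  # hand back the cached object itself, no copying
)

df = pd.DataFrame({"a": [1, 2]})
ds.save(df)             # writes example.csv and primes the cache
assert ds.load() is df  # "assign" returns the very same object;
                        # "deepcopy" would return an independent copy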

Source code in kedro/io/cached_dataset.py
def __init__(
    self,
    dataset: AbstractDataset | dict,
    version: Version | None = None,
    copy_mode: str | None = None,
    metadata: dict[str, Any] | None = None,
):
    """Creates a new instance of ``CachedDataset`` pointing to the
    provided Python object.

    Args:
        dataset: A Kedro Dataset object or a dictionary to cache.
        version: If specified, should be an instance of
            ``kedro.io.core.Version``. If its ``load`` attribute is
            None, the latest version will be loaded. If its ``save``
            attribute is None, save version will be autogenerated.
        copy_mode: The copy mode used to copy the data. Possible
            values are: "deepcopy", "copy" and "assign". If not
            provided, it is inferred based on the data type.
        metadata: Any arbitrary metadata.
            This is ignored by Kedro, but may be consumed by users or external plugins.

    Raises:
        ValueError: If the provided dataset is not a valid dict/YAML
            representation of a dataset or an actual dataset.
    """
    self._EPHEMERAL = True

    if isinstance(dataset, dict):
        self._dataset = self._from_config(dataset, version)
    elif isinstance(dataset, AbstractDataset):
        self._dataset = dataset
    else:
        raise ValueError(
            "The argument type of 'dataset' should be either a dict/YAML "
            "representation of the dataset, or the actual dataset object."
        )
    self._cache = MemoryDataset(copy_mode=copy_mode)  # type: ignore[abstract]
    self.metadata = metadata

_EPHEMERAL instance-attribute

_EPHEMERAL = True

_SINGLE_PROCESS class-attribute instance-attribute

_SINGLE_PROCESS = True

_cache instance-attribute

_cache = MemoryDataset(copy_mode=copy_mode)

_dataset instance-attribute

_dataset = _from_config(dataset, version)

metadata instance-attribute

metadata = metadata

__getstate__

__getstate__()
Source code in kedro/io/cached_dataset.py
def __getstate__(self) -> dict[str, Any]:
    # clearing the cache can be prevented by modifying
    # how parallel runner handles datasets (not trivial!)
    logging.getLogger(__name__).warning("%s: clearing cache to pickle.", str(self))
    self._cache.release()
    return self.__dict__
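
As the comment above notes, the in-memory cache is released whenever the wrapper is pickled, for example when a runner serialises datasets across processes. A sketch, assuming the wrapped dataset itself is picklable:

import pickle

from kedro.io import CachedDataset
from kedro_datasets.pandas import CSVDataset

ds = CachedDataset(dataset=CSVDataset(filepath="example.csv"))
ds.load()                         # populates the cache
payload = pickle.dumps(ds)        # logs a "clearing cache to pickle" warning and releases the cache
restored = pickle.loads(payload)  # its next load() falls back to the wrapped dataset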

__repr__

__repr__()
Source code in kedro/io/cached_dataset.py
def __repr__(self) -> str:
    object_description = {
        "dataset": self._dataset._pretty_repr(self._dataset._describe()),
        "cache": self._dataset._pretty_repr(self._cache._describe()),
    }
    return self._pretty_repr(object_description)

_describe

_describe()
Source code in kedro/io/cached_dataset.py
def _describe(self) -> dict[str, Any]:
    return {"dataset": self._dataset._describe(), "cache": self._cache._describe()}

_exists

_exists()
Source code in kedro/io/cached_dataset.py
def _exists(self) -> bool:
    return self._cache.exists() or self._dataset.exists()

_from_config staticmethod

_from_config(config, version)
Source code in kedro/io/cached_dataset.py
@staticmethod
def _from_config(config: dict, version: Version | None) -> AbstractDataset:
    if VERSIONED_FLAG_KEY in config:
        raise ValueError(
            "Cached datasets should specify that they are versioned in the "
            "'CachedDataset', not in the wrapped dataset."
        )
    if version:
        config[VERSIONED_FLAG_KEY] = True
        return AbstractDataset.from_config(
            "_cached", config, version.load, version.save
        )
    return AbstractDataset.from_config("_cached", config)
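
A sketch of the rule this method enforces: declaring versioned on the wrapped dataset's config raises a ValueError, since the flag belongs on the wrapper:

from kedro.io import CachedDataset

try:
    CachedDataset(
        dataset={
            "type": "pandas.CSVDataset",
            "filepath": "example.csv",
            "versioned": True,  # wrong place: set this on the CachedDataset entry
        }
    )
except ValueError as err:
    print(err)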

_release

_release()
Source code in kedro/io/cached_dataset.py
def _release(self) -> None:
    self._cache.release()
    self._dataset.release()

load

load()
Source code in kedro/io/cached_dataset.py
def load(self) -> Any:
    data = self._cache.load() if self._cache.exists() else self._dataset.load()

    if not self._cache.exists():
        self._cache.save(data)

    return data
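
Loading is read-through: a cache miss loads from the wrapped dataset and populates the cache, so later loads skip storage entirely. A minimal sketch, assuming example.csv exists and kedro-datasets is installed:

from kedro.io import CachedDataset
from kedro_datasets.pandas import CSVDataset

ds = CachedDataset(dataset=CSVDataset(filepath="example.csv"))
first = ds.load()   # cache miss: reads example.csv, then caches the data
second = ds.load()  # cache hit: served from memory, no file I/O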

save

save(data)
Source code in kedro/io/cached_dataset.py
def save(self, data: Any) -> None:
    self._dataset.save(data)
    self._cache.save(data)
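
Saving is write-through: the data is persisted by the wrapped dataset and simultaneously primes the cache, so a save followed by a load never touches storage again. A sketch, assuming pandas and kedro-datasets are installed:

import pandas as pd

from kedro.io import CachedDataset
from kedro_datasets.pandas import CSVDataset

ds = CachedDataset(dataset=CSVDataset(filepath="example.csv"))
ds.save(pd.DataFrame({"a": [1, 2]}))  # writes example.csv and fills the cache
df = ds.load()                        # returned from the cache, not re-read from disk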