Skip to content

AbstractVersionedDataset

kedro.io.AbstractVersionedDataset

AbstractVersionedDataset(filepath, version, exists_function=None, glob_function=None)

Bases: AbstractDataset[_DI, _DO], ABC

AbstractVersionedDataset is the base class for all versioned dataset implementations.

All datasets that implement versioning should extend this abstract class and implement the methods marked as abstract.

Example:

>>> from pathlib import Path, PurePosixPath
>>> import pandas as pd
>>> from kedro.io import AbstractVersionedDataset
>>>
>>>
>>> class MyOwnDataset(AbstractVersionedDataset):
>>>     def __init__(self, filepath, version, param1, param2=True):
>>>         super().__init__(PurePosixPath(filepath), version)
>>>         self._param1 = param1
>>>         self._param2 = param2
>>>
>>>     def load(self) -> pd.DataFrame:
>>>         load_path = self._get_load_path()
>>>         return pd.read_csv(load_path)
>>>
>>>     def save(self, df: pd.DataFrame) -> None:
>>>         save_path = self._get_save_path()
>>>         df.to_csv(str(save_path))
>>>
>>>     def _exists(self) -> bool:
>>>         path = self._get_load_path()
>>>         return Path(path.as_posix()).exists()
>>>
>>>     def _describe(self):
>>>         return dict(version=self._version, param1=self._param1, param2=self._param2)

Example catalog.yml specification:

my_dataset:
    type: <path-to-my-own-dataset>.MyOwnDataset
    filepath: data/01_raw/my_data.csv
    versioned: true
    param1: <param1-value> # param1 is a required argument
    # param2 will be True by default

Parameters:

  • filepath (PurePosixPath) –

    Filepath in POSIX format to a file.

  • version (Version | None) –

    If specified, should be an instance of kedro.io.core.Version. If its load attribute is None, the latest version will be loaded. If its save attribute is None, the save version will be autogenerated.

  • exists_function (Callable[[str], bool] | None, default: None ) –

    Function that is used for determining whether a path exists in a filesystem.

  • glob_function (Callable[[str], list[str]] | None, default: None ) –

    Function that is used for finding all paths in a filesystem that match a given pattern.

Source code in kedro/io/core.py
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
def __init__(
    self,
    filepath: PurePosixPath,
    version: Version | None,
    exists_function: Callable[[str], bool] | None = None,
    glob_function: Callable[[str], list[str]] | None = None,
):
    """Set up the state shared by every versioned dataset.

    Args:
        filepath: POSIX-style path to the dataset file.
        version: Optional ``kedro.io.core.Version``. When its ``load``
            attribute is None the latest version is loaded; when its
            ``save`` attribute is None the save version is autogenerated.
        exists_function: Callable that reports whether a path exists in
            a filesystem. ``_local_exists`` is used when this is falsy.
        glob_function: Callable that finds the paths in a filesystem
            that match a given pattern. ``iglob`` is used when this is
            falsy.
    """
    self._filepath = filepath
    self._version = version
    self._exists_function = exists_function if exists_function else _local_exists
    self._glob_function = glob_function if glob_function else iglob
    # Room for exactly two entries: one load version and one save version.
    self._version_cache: Cache = Cache(maxsize=2)

_exists_function instance-attribute

_exists_function = exists_function or _local_exists

_filepath instance-attribute

_filepath = filepath

_glob_function instance-attribute

_glob_function = glob_function or iglob

_version instance-attribute

_version = version

_version_cache instance-attribute

_version_cache = Cache(maxsize=2)

_fetch_latest_load_version

_fetch_latest_load_version()
Source code in kedro/io/core.py
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
@cachedmethod(cache=attrgetter("_version_cache"), key=partial(hashkey, "load"))
def _fetch_latest_load_version(self) -> str:
    """Look up the newest existing version of this dataset.

    Used when the load version is not pinned. The result is cached in
    ``self._version_cache`` under a key built from ``"load"``.

    Returns:
        The name of the parent directory of the first existing path,
        taken from the glob matches sorted in descending order.

    Raises:
        VersionNotFoundError: If the glob call fails, or if none of the
            matched paths exists.
    """
    glob_pattern = str(self._get_versioned_path("*"))
    try:
        # Descending sort puts the greatest (i.e. latest) version path first.
        candidates = sorted(self._glob_function(glob_pattern), reverse=True)
    except Exception as exc:
        raise VersionNotFoundError(
            f"Did not find any versions for {self}. This could be "
            f"due to insufficient permission. Exception: {exc}"
        ) from exc

    newest = None
    for candidate in candidates:
        if self._exists_function(candidate):
            newest = candidate
            break

    if not newest:
        raise VersionNotFoundError(f"Did not find any versions for {self}")

    # The version is the directory that directly contains the matched file.
    return PurePath(newest).parent.name

_fetch_latest_save_version

_fetch_latest_save_version()

Generate and cache the current save version

Source code in kedro/io/core.py
764
765
766
767
@cachedmethod(cache=attrgetter("_version_cache"), key=partial(hashkey, "save"))
def _fetch_latest_save_version(self) -> str:
    """Generate and cache the current save version.

    The value returned by ``generate_timestamp()`` is stored in
    ``self._version_cache`` under a key built from ``"save"``, so it is
    reused until that cache is cleared.
    """
    return generate_timestamp()

_get_load_path

_get_load_path()
Source code in kedro/io/core.py
777
778
779
780
781
782
783
def _get_load_path(self) -> PurePosixPath:
    """Return the path this dataset should be read from.

    Returns:
        The versioned path for the resolved load version when versioning
        is enabled, otherwise the original ``filepath``.
    """
    if self._version:
        resolved = self.resolve_load_version()
        return self._get_versioned_path(resolved)  # type: ignore[arg-type]

    # Versioning is disabled, so read straight from the original filepath.
    return self._filepath

_get_save_path

_get_save_path()
Source code in kedro/io/core.py
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
def _get_save_path(self) -> PurePosixPath:
    """Return the path this dataset should be written to.

    Returns:
        The original ``filepath`` when versioning is disabled, otherwise
        the versioned path for the resolved save version.

    Raises:
        DatasetError: If versioning is enabled and the versioned path
            already exists according to ``self._exists_function``.
    """
    if self._version:
        target = self._get_versioned_path(self.resolve_save_version())  # type: ignore[arg-type]
        if not self._exists_function(str(target)):
            return target
        # A versioned save must never overwrite an existing version.
        raise DatasetError(
            f"Save path '{target}' for {self!s} must not exist if "
            f"versioning is enabled."
        )

    # Versioning is disabled, so write straight to the original filepath.
    return self._filepath

_get_versioned_path

_get_versioned_path(version)
Source code in kedro/io/core.py
809
810
def _get_versioned_path(self, version: str) -> PurePosixPath:
    """Build the versioned location ``<filepath>/<version>/<filename>``.

    Args:
        version: Name of the version directory to insert.

    Returns:
        ``self._filepath`` extended with ``version`` and then with its
        own file name.
    """
    base = self._filepath
    return base.joinpath(version, base.name)

_release

_release()
Source code in kedro/io/core.py
870
871
872
def _release(self) -> None:
    """Run the parent class's ``_release`` and then empty the version cache."""
    super()._release()
    # Forget the cached load/save versions so they are resolved again on next use.
    self._version_cache.clear()

_save_wrapper classmethod

_save_wrapper(save_func)

Decorate save_func with logging and error handling code.

Source code in kedro/io/core.py
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
@classmethod
def _save_wrapper(
    cls, save_func: Callable[[Self, _DI], None]
) -> Callable[[Self, _DI], None]:
    """Decorate `save_func` with logging and error handling code.

    Args:
        save_func: The save implementation to wrap.

    Returns:
        A ``save`` function that clears the version cache, resolves the
        save version, delegates to the parent class's wrapper around
        ``save_func``, and finally warns if the resolved load version
        differs from the save version.
    """

    @wraps(save_func)
    def save(self: Self, data: _DI) -> None:
        # Drop previously cached versions so this save resolves them afresh.
        self._version_cache.clear()
        save_version = (
            self.resolve_save_version()
        )  # Make sure last save version is set
        try:
            # Delegate to the parent class's wrapper around ``save_func``.
            super()._save_wrapper(save_func)(self, data)
        except (FileNotFoundError, NotADirectoryError) as err:
            # FileNotFoundError raised in Win, NotADirectoryError raised in Unix
            # Placeholder shown in the error message to illustrate the versioned layout.
            _default_version = "YYYY-MM-DDThh.mm.ss.sssZ"
            raise DatasetError(
                f"Cannot save versioned dataset '{self._filepath.name}' to "
                f"'{self._filepath.parent.as_posix()}' because a file with the same "
                f"name already exists in the directory. This is likely because "
                f"versioning was enabled on a dataset already saved previously. Either "
                f"remove '{self._filepath.name}' from the directory or manually "
                f"convert it into a versioned dataset by placing it in a versioned "
                f"directory (e.g. with default versioning format "
                f"'{self._filepath.as_posix()}/{_default_version}/{self._filepath.name}"
                f"')."
            ) from err

        # After saving, the load version is resolved and compared with the
        # save version; a mismatch is reported as a warning.
        load_version = self.resolve_load_version()
        if load_version != save_version:
            warnings.warn(
                _CONSISTENCY_WARNING.format(save_version, load_version, str(self))
            )
            # Discard the mismatching cached versions.
            self._version_cache.clear()

    return save

exists

exists()

Checks whether a dataset's output already exists by calling the provided _exists() method.

Returns:

  • bool

    Flag indicating whether the output already exists.

Raises:

  • DatasetError

    When the underlying _exists() method raises an error.

Source code in kedro/io/core.py
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
def exists(self) -> bool:
    """Report whether the dataset's output is already present.

    The check is delegated to the ``_exists()`` method.

    Returns:
        The result of ``_exists()``, or False when no version could be
        found.

    Raises:
        DatasetError: If ``_exists()`` raises any other exception.

    """
    self._logger.debug("Checking whether target of %s exists", str(self))
    try:
        found = self._exists()
    except VersionNotFoundError:
        # A missing version simply means there is nothing there yet.
        return False
    except Exception as exc:  # SKIP_IF_NO_SPARK
        raise DatasetError(
            f"Failed during exists check for dataset {self!s}.\n{exc!s}"
        ) from exc
    return found

resolve_load_version

resolve_load_version()

Compute the version the dataset should be loaded with.

Source code in kedro/io/core.py
769
770
771
772
773
774
775
def resolve_load_version(self) -> str | None:
    """Compute the version the dataset should be loaded with."""
    if not self._version:
        return None
    if self._version.load:
        return self._version.load  # type: ignore[no-any-return]
    return self._fetch_latest_load_version()

resolve_save_version

resolve_save_version()

Compute the version the dataset should be saved with.

Source code in kedro/io/core.py
785
786
787
788
789
790
791
def resolve_save_version(self) -> str | None:
    """Compute the version the dataset should be saved with."""
    if not self._version:
        return None
    if self._version.save:
        return self._version.save  # type: ignore[no-any-return]
    return self._fetch_latest_save_version()