Hooks
kedro.framework.hooks ¶
kedro.framework.hooks provides primitives to use hooks to extend KedroContext's behaviour.
Module | Description
---|---
kedro.framework.hooks.manager | Provides a utility function to retrieve the global hook_manager singleton in Kedro's execution process.
kedro.framework.hooks.markers | Provides markers to declare Kedro's hook specs and implementations.
kedro.framework.hooks.specs | Contains specifications for all callable hooks in Kedro's execution timeline.
kedro.framework.hooks.manager ¶
This module provides a utility function to retrieve the global hook_manager singleton in Kedro's execution process.
_NullPluginManager ¶
_NullPluginManager(*args, **kwargs)
This class creates an empty hook_manager that will ignore all calls to hooks, allowing the runner to function if no hook_manager has been instantiated.
Source code in kedro/framework/hooks/manager.py, lines 114–115
_create_hook_manager ¶
_create_hook_manager()
Create a new PluginManager instance and register Kedro's hook specs.
Source code in kedro/framework/hooks/manager.py, lines 26–38
_register_hooks ¶
_register_hooks(hook_manager, hooks)
Register all hooks as specified in hooks with the global hook_manager.
Parameters:
- hook_manager (PluginManager) – Hook manager instance to register the hooks with.
- hooks (Iterable[Any]) – Hooks that need to be registered.
Source code in kedro/framework/hooks/manager.py, lines 41–59
_register_hooks_entry_points ¶
_register_hooks_entry_points(hook_manager, disabled_plugins)
Register pluggy hooks from Python package entry points.
Parameters:
- hook_manager (PluginManager) – Hook manager instance to register the hooks with.
- disabled_plugins (Iterable[str]) – An iterable returning the names of plugins whose hooks must not be registered; any already registered hooks will be unregistered.
Source code in kedro/framework/hooks/manager.py, lines 62–107
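These helpers are internal to Kedro's session machinery; in a project, hooks are registered declaratively in settings.py and the session wires them into the global hook manager. A minimal sketch, assuming a ProjectHooks class in a hypothetical my_project.hooks module: ::

    # settings.py of a Kedro project ("my_project" is illustrative)
    from my_project.hooks import ProjectHooks

    # The session instantiates the hook manager and passes each of these
    # objects to it, as _register_hooks() above describes.
    HOOKS = (ProjectHooks(),)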
kedro.framework.hooks.markers ¶
This module provides markers to declare Kedro's hook specs and implementations. For more information, please see Pluggy's documentation.
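The marker most plugin authors need is hook_impl, which tags a method as an implementation of one of the specs below. A minimal sketch (the class name is illustrative; implementations may declare only the spec arguments they use): ::

    import logging

    from kedro.framework.hooks import hook_impl

    class LoggingHooks:
        @hook_impl
        def after_catalog_created(self, catalog):
            # Only a subset of the spec's arguments needs to be declared.
            logging.getLogger(__name__).info("Catalog created: %r", catalog)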
kedro.framework.hooks.specs ¶
A module containing specifications for all callable hooks in Kedro's execution timeline. For more information about these specifications, please visit Pluggy's documentation.
CatalogProtocol ¶
Bases: Protocol[_C, _DS]
__contains__ ¶
__contains__(ds_name)
Check if a dataset is in the catalog.
Source code in kedro/io/core.py, lines 950–952
__getitem__ ¶
__getitem__(ds_name)
Get a dataset by name from an internal collection of datasets.
Source code in kedro/io/core.py, lines 970–972
__iter__ ¶
__iter__()
Returns an iterator for the object.
Source code in kedro/io/core.py, lines 966–968
__repr__ ¶
__repr__()
Returns the canonical string representation of the object.
Source code in kedro/io/core.py, lines 946–948
__setitem__ ¶
__setitem__(key, value)
Adds a dataset using the given key as the dataset name and the provided data as its value.
Source code in kedro/io/core.py, lines 974–976
confirm ¶
confirm(name)
Confirm a dataset by its name.
Source code in kedro/io/core.py, lines 999–1001
exists ¶
exists(name)
Checks whether a registered dataset exists by calling its exists() method.
Source code in kedro/io/core.py, lines 1003–1005
from_config classmethod ¶
from_config(catalog)
Create a catalog instance from configuration.
Source code in kedro/io/core.py, lines 978–981
get ¶
get(key, fallback_to_runtime_pattern=False)
Get a dataset by name from an internal collection of datasets.
Source code in kedro/io/core.py, lines 983–985
items ¶
items()
List all dataset names and datasets registered in the catalog.
Source code in kedro/io/core.py, lines 962–964
keys ¶
keys()
List all dataset names registered in the catalog.
Source code in kedro/io/core.py, lines 954–956
load ¶
load(name, version=None)
Load data from a registered dataset.
Source code in kedro/io/core.py, lines 991–993
release ¶
release(name)
Release any cached data associated with a dataset.
Source code in kedro/io/core.py, lines 995–997
save ¶
save(name, data)
Save data to a registered dataset.
Source code in kedro/io/core.py, lines 987–989
values ¶
values()
List all datasets registered in the catalog.
Source code in kedro/io/core.py, lines 958–960
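Any catalog conforming to this protocol can be driven through the same small surface, for example inside a hook. A hedged sketch using Kedro's DataCatalog with an in-memory dataset (exact import paths may vary by Kedro version): ::

    from kedro.io import DataCatalog, MemoryDataset

    catalog = DataCatalog({"example": MemoryDataset()})

    catalog.save("example", [1, 2, 3])   # save() on a registered dataset
    assert "example" in catalog          # __contains__
    data = catalog.load("example")       # load() returns the saved data
    catalog.release("example")           # drop any cached copy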
DataCatalogSpecs ¶
Namespace that defines all specifications for a data catalog's lifecycle hooks.
after_catalog_created ¶
after_catalog_created(catalog, conf_catalog, conf_creds, parameters, save_version, load_versions)
Hooks to be invoked after a data catalog is created.
It receives the catalog as well as all the arguments for KedroContext._create_catalog.
Parameters:
- catalog (CatalogProtocol) – The catalog that was created.
- conf_catalog (dict[str, Any]) – The config from which the catalog was created.
- conf_creds (dict[str, Any]) – The credentials conf from which the catalog was created.
- parameters (dict[str, Any]) – The parameters that are added to the catalog after creation.
- save_version (str) – The save_version used in save operations for all datasets in the catalog.
- load_versions (dict[str, str]) – The load_versions used in load operations for each dataset in the catalog.
Source code in kedro/framework/hooks/specs.py, lines 22–46
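A sketch of implementing this spec (the class name is illustrative): ::

    from kedro.framework.hooks import hook_impl

    class CatalogHooks:
        @hook_impl
        def after_catalog_created(self, catalog, save_version):
            # Inspect what the run will see; keys() comes from CatalogProtocol.
            print(f"{len(list(catalog.keys()))} datasets, save_version={save_version}")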
DatasetSpecs ¶
Namespace that defines all specifications for a dataset's lifecycle hooks.
after_dataset_loaded ¶
after_dataset_loaded(dataset_name, data, node)
Hook to be invoked after a dataset is loaded from the catalog.
Parameters:
- dataset_name (str) – Name of the dataset that was loaded from the catalog.
- data (Any) – The actual data that was loaded from the catalog.
- node (Node) – The Node to run.
Source code in kedro/framework/hooks/specs.py, lines 260–269
after_dataset_saved ¶
after_dataset_saved(dataset_name, data, node)
Hook to be invoked after a dataset is saved in the catalog.
Parameters:
- dataset_name (str) – Name of the dataset that was saved to the catalog.
- data (Any) – The actual data that was saved to the catalog.
- node (Node) – The Node that ran.
Source code in kedro/framework/hooks/specs.py, lines 282–291
before_dataset_loaded ¶
before_dataset_loaded(dataset_name, node)
Hook to be invoked before a dataset is loaded from the catalog.
Parameters:
- dataset_name (str) – Name of the dataset to be loaded from the catalog.
- node (Node) – The Node to run.
Source code in kedro/framework/hooks/specs.py, lines 250–258
before_dataset_saved ¶
before_dataset_saved(dataset_name, data, node)
Hook to be invoked before a dataset is saved to the catalog.
Parameters:
- dataset_name (str) – Name of the dataset to be saved to the catalog.
- data (Any) – The actual data to be saved to the catalog.
- node (Node) – The Node that ran.
Source code in kedro/framework/hooks/specs.py, lines 271–280
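The before/after pairs above lend themselves to I/O instrumentation. A hedged sketch that times each dataset load (the class name is illustrative): ::

    import logging
    import time

    from kedro.framework.hooks import hook_impl

    class DatasetTimingHooks:
        def __init__(self):
            self._started = {}

        @hook_impl
        def before_dataset_loaded(self, dataset_name):
            self._started[dataset_name] = time.perf_counter()

        @hook_impl
        def after_dataset_loaded(self, dataset_name):
            elapsed = time.perf_counter() - self._started.pop(dataset_name)
            logging.getLogger(__name__).info("Loaded %s in %.3fs", dataset_name, elapsed)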
KedroContext ¶
KedroContext is the base class which holds the configuration and Kedro's main functionality.
Create a context object by providing the root of a Kedro project and the environment configuration subfolders (see kedro.config.OmegaConfigLoader).
Raises:
- KedroContextError – If there is a mismatch between the Kedro project version and the package version.
Parameters:
- project_path – Project path to define the context for.
- config_loader – Kedro's OmegaConfigLoader for loading the configuration files.
- env – Optional argument for the configuration default environment to be used for running the pipeline. If not specified, it defaults to "local".
- package_name – Package name for the Kedro project the context is created for.
- hook_manager – The PluginManager to activate hooks, supplied by the session.
- runtime_params – Optional dictionary containing runtime project parameters. If specified, it will update (and therefore take precedence over) the parameters retrieved from the project configuration.
catalog property ¶
Read-only property referring to Kedro's catalog for this context.
Returns:
- CatalogProtocol – The catalog defined in catalog.yml.
Raises:
- KedroContextError – Incorrect catalog registered for the project.
params property ¶
Read-only property referring to Kedro's parameters for this context.
Returns:
- dict[str, Any] – Parameters defined in parameters.yml with the addition of any extra parameters passed at initialization.
KedroContextSpecs ¶
Namespace that defines all specifications for a Kedro context's lifecycle hooks.
after_context_created ¶
after_context_created(context)
Hooks to be invoked after a KedroContext is created. This is the earliest hook triggered within a Kedro run. The KedroContext stores useful information such as credentials, config_loader and env.
Parameters:
- context (KedroContext) – The context that was created.
Source code in kedro/framework/hooks/specs.py, lines 297–308
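Since this fires before the catalog exists, it is a natural place to read configuration off the context. A minimal sketch (the class name is illustrative): ::

    from kedro.framework.hooks import hook_impl

    class ContextHooks:
        @hook_impl
        def after_context_created(self, context):
            # The earliest hook in a run: env and config_loader are available.
            print(f"Running in env: {context.env}")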
Node ¶
Node(func, inputs, outputs, *, name=None, tags=None, confirms=None, namespace=None)
Node is an auxiliary class facilitating the operations required to run user-provided functions as part of Kedro pipelines.
Parameters:
- func (Callable) – A function that corresponds to the node logic. The function should have at least one input or output.
- inputs (str | list[str] | dict[str, str] | None) – The name or the list of the names of variables used as inputs to the function. The number of names should match the number of arguments in the definition of the provided function. When dict[str, str] is provided, variable names will be mapped to function argument names.
- outputs (str | list[str] | dict[str, str] | None) – The name or the list of the names of variables used as outputs of the function. The number of names should match the number of outputs returned by the provided function. When dict[str, str] is provided, variable names will be mapped to the named outputs the function returns.
- name (str | None, default: None) – Optional node name to be used when displaying the node in logs or any other visualisations. A valid node name must contain only letters, digits, hyphens, underscores and/or fullstops.
- tags (str | Iterable[str] | None, default: None) – Optional set of tags to be applied to the node. A valid node tag must contain only letters, digits, hyphens, underscores and/or fullstops.
- confirms (str | list[str] | None, default: None) – Optional name or the list of the names of the datasets that should be confirmed. This will result in calling the confirm() method of the corresponding dataset instance. Specified dataset names do not necessarily need to be present in the node inputs or outputs.
- namespace (str | None, default: None) – Optional node namespace.
Raises:
- ValueError – Raised in the following cases: a) When the provided arguments do not conform to the format suggested by the type hint of the argument. b) When the node produces multiple outputs with the same name. c) When an input has the same name as an output. d) When the given node name violates the requirements: it must contain only letters, digits, hyphens, underscores and/or fullstops.
Source code in kedro/pipeline/node.py, lines 46–171
confirms property ¶
Return dataset names to confirm as a list.
Returns:
- list[str] – Dataset names to confirm as a list.
func property writable ¶
Exposes the underlying function of the node.
Returns:
- Callable – The underlying function of the node.
inputs cached property ¶
Return node inputs as a list, in the order required to bind them properly to the node's function.
Returns:
- list[str] – Node input names as a list.
name property ¶
Node's name.
Returns:
- str – Node's name if provided, or the name of its function.
namespace property ¶
Node's namespace.
Returns:
- str | None – String representing the node's namespace, typically from outer to inner scopes.
outputs property ¶
Return node outputs as a list, preserving the original order if possible.
Returns:
- list[str] – Node output names as a list.
short_name property ¶
Node's short name.
Returns:
- str – A short, user-friendly name that is not guaranteed to be unique; the namespace is stripped out of the node name.
tags property ¶
Return the tags assigned to the node.
Returns:
- set[str] – The set of all tags assigned to the node.
run ¶
run(inputs=None)
Run this node using the provided inputs and return its results in a dictionary.
Parameters:
- inputs (dict[str, Any] | None, default: None) – Dictionary of inputs as specified at the creation of the node.
Raises:
- ValueError – In the following cases: a) The node function inputs are incompatible with the node input definition. Example 1: node definition input is a list of 2 DataFrames, whereas only 1 was provided or 2 different ones were provided. b) The node function outputs are incompatible with the node output definition. Example 1: node function definition is a dictionary, whereas the function returns a list. Example 2: node definition output is a list of 5 strings, whereas the function returns a list of 4 objects.
- Exception – Any exception thrown during execution of the node.
Returns:
- dict[str, Any] – All produced node outputs are returned in a dictionary, where the keys are defined by the node outputs.
Source code in kedro/pipeline/node.py, lines 373–432
tag ¶
tag(tags)
Create a new Node which is an exact copy of the current one, but with more tags added to it.
Parameters:
- tags (str | Iterable[str]) – The tags to be added to the new node.
Returns:
- Node – A copy of the current Node object with the tags added.
Source code in kedro/pipeline/node.py, lines 281–292
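A short sketch tying the constructor and run() together (names are illustrative): ::

    from kedro.pipeline import node

    def drop_nulls(raw):
        return [x for x in raw if x is not None]

    clean = node(drop_nulls, inputs="raw_data", outputs="clean_data", name="clean_node")

    # run() binds inputs by dataset name and keys outputs the same way.
    result = clean.run({"raw_data": [1, None, 2]})
    assert result == {"clean_data": [1, 2]}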
NodeSpecs ¶
Namespace that defines all specifications for a node's lifecycle hooks.
after_node_run ¶
after_node_run(node, catalog, inputs, outputs, is_async, run_id)
Hook to be invoked after a node runs.
Parameters:
- node (Node) – The Node that ran.
- catalog (CatalogProtocol) – An implemented instance of CatalogProtocol containing the node's inputs and outputs.
- inputs (dict[str, Any]) – The dictionary of node inputs. The keys are dataset names and the values are the actual loaded input data, not the dataset instances.
- outputs (dict[str, Any]) – The dictionary of node outputs. The keys are dataset names and the values are the actual computed output data, not the dataset instances.
- is_async (bool) – Whether the node was run in async mode.
- run_id (str) – The id of the run.
Source code in kedro/framework/hooks/specs.py, lines 79–103
before_node_run ¶
before_node_run(node, catalog, inputs, is_async, run_id)
Hook to be invoked before a node runs.
Parameters:
- node (Node) – The Node to run.
- catalog (CatalogProtocol) – An implemented instance of CatalogProtocol containing the node's inputs and outputs.
- inputs (dict[str, Any]) – The dictionary of node inputs. The keys are dataset names and the values are the actual loaded input data, not the dataset instances.
- is_async (bool) – Whether the node will be run in async mode.
- run_id (str) – The id of the run.
Returns:
- dict[str, Any] | None – Either None or a dictionary mapping dataset name(s) to new value(s). If returned, this dictionary will be used to update the node inputs, which allows the node inputs to be overwritten.
Source code in kedro/framework/hooks/specs.py, lines 52–77
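Because before_node_run may return a mapping, it can swap in replacement inputs. A sketch that truncates one hypothetical input (the node and dataset names are illustrative): ::

    from kedro.framework.hooks import hook_impl

    class InputCappingHooks:
        @hook_impl
        def before_node_run(self, node, inputs):
            # Returning a dict overwrites the matching node inputs.
            if node.name == "train_model_node" and "training_data" in inputs:
                return {"training_data": inputs["training_data"][:1000]}
            return None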
on_node_error ¶
on_node_error(error, node, catalog, inputs, is_async, run_id)
Hook to be invoked if a node run throws an uncaught error.
The signature of this error hook should match the signature of before_node_run, along with the error that was raised.
Parameters:
- error (Exception) – The uncaught exception thrown during the node run.
- node (Node) – The Node to run.
- catalog (CatalogProtocol) – An implemented instance of CatalogProtocol containing the node's inputs and outputs.
- inputs (dict[str, Any]) – The dictionary of node inputs. The keys are dataset names and the values are the actual loaded input data, not the dataset instances.
- is_async (bool) – Whether the node was run in async mode.
- run_id (str) – The id of the run.
Source code in kedro/framework/hooks/specs.py, lines 105–129
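A sketch of an error hook that records which node failed and on what inputs (the class name is illustrative): ::

    import logging

    from kedro.framework.hooks import hook_impl

    class NodeErrorHooks:
        @hook_impl
        def on_node_error(self, error, node, inputs):
            # inputs holds the already-loaded data the node received.
            logging.getLogger(__name__).error(
                "Node %s failed with %r; inputs: %s", node.name, error, sorted(inputs)
            )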
Pipeline ¶
Pipeline(nodes, *, inputs=None, outputs=None, parameters=None, tags=None, namespace=None, prefix_datasets_with_namespace=True)
A Pipeline is defined as a collection of Node objects. This class treats nodes as part of a graph representation and provides inputs, outputs and execution order.
Parameters:
- nodes (Iterable[Node | Pipeline] | Pipeline) – The iterable of nodes the Pipeline will be made of. If you provide pipelines among the list of nodes, those pipelines will be expanded and all their nodes will become part of this new pipeline.
- inputs (str | set[str] | dict[str, str] | None, default: None) – A name or collection of input names to be exposed as connection points to other pipelines upstream. This is optional; if not provided, the pipeline inputs are automatically inferred from the pipeline structure. When str or set[str] is provided, the listed input names will stay the same as they are named in the provided pipeline. When dict[str, str] is provided, current input names will be mapped to new names. Must only refer to the pipeline's free inputs.
- outputs (str | set[str] | dict[str, str] | None, default: None) – A name or collection of names to be exposed as connection points to other pipelines downstream. This is optional; if not provided, the pipeline outputs are automatically inferred from the pipeline structure. When str or set[str] is provided, the listed output names will stay the same as they are named in the provided pipeline. When dict[str, str] is provided, current output names will be mapped to new names. Can refer to both the pipeline's free outputs, as well as intermediate results that need to be exposed.
- parameters (str | set[str] | dict[str, str] | None, default: None) – A name or collection of parameters to namespace. When str or set[str] are provided, the listed parameter names will stay the same as they are named in the provided pipeline. When dict[str, str] is provided, current parameter names will be mapped to new names. The parameters can be specified without the params: prefix.
- tags (str | Iterable[str] | None, default: None) – Optional set of tags to be applied to all the pipeline nodes.
- namespace (str | None, default: None) – A prefix to give to all dataset names, except those explicitly named with the inputs/outputs arguments, and parameter references (params: and parameters).
- prefix_datasets_with_namespace (bool, default: True) – A flag to specify if the inputs, outputs, and parameters of the nodes should be prefixed with the namespace. It is set to True by default. It is useful to turn off when namespacing is used for grouping nodes for deployment purposes.
Raises:
- ValueError – When an empty list of nodes is provided, or when not all nodes have unique names.
- CircularDependencyError – When visiting all the nodes is not possible due to the existence of a circular dependency.
- OutputNotUniqueError – When multiple Node instances produce the same output.
- ConfirmNotUniqueError – When multiple Node instances attempt to confirm the same dataset.
- PipelineError – When inputs, outputs or parameters are incorrectly specified, or they do not exist on the original pipeline.
Example: ::
>>> from kedro.pipeline import Pipeline
>>> from kedro.pipeline import node
>>>
>>> # In the following scenario first_ds and second_ds
>>> # are datasets provided by io. Pipeline will pass these
>>> # datasets to first_node function and provides the result
>>> # to the second_node as input.
>>>
>>> def first_node(first_ds, second_ds):
>>> return dict(third_ds=first_ds+second_ds)
>>>
>>> def second_node(third_ds):
>>> return third_ds
>>>
>>> pipeline = Pipeline([
>>> node(first_node, ['first_ds', 'second_ds'], ['third_ds']),
>>> node(second_node, dict(third_ds='third_ds'), 'fourth_ds')])
>>>
>>> pipeline.describe()
>>>
Source code in kedro/pipeline/pipeline.py, lines 140–293
grouped_nodes property ¶
Return a list of the pipeline nodes in topologically ordered groups, i.e. if node A needs to be run before node B, it will appear in an earlier group.
Returns:
- list[list[Node]] – The pipeline nodes in topologically ordered groups.
node_dependencies property ¶
nodes property ¶
Return a list of the pipeline nodes in topological order, i.e. if node A needs to be run before node B, it will appear earlier in the list.
Returns:
- list[Node] – The list of all pipeline nodes in topological order.
__repr__ ¶
__repr__()
Pipeline ([node1, ..., node10 ...], name='pipeline_name')
Source code in kedro/pipeline/pipeline.py, lines 342–352
all_inputs ¶
all_inputs()
All inputs for all nodes in the pipeline.
Returns:
- set[str] – All node input names as a set.
Source code in kedro/pipeline/pipeline.py, lines 379–386
all_outputs ¶
all_outputs()
All outputs of all nodes in the pipeline.
Returns:
- set[str] – All node outputs.
Source code in kedro/pipeline/pipeline.py, lines 388–395
datasets ¶
datasets()
The names of all datasets used by the Pipeline, including inputs and outputs.
Returns:
- set[str] – The set of all pipeline datasets.
Source code in kedro/pipeline/pipeline.py, lines 426–434
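A short sketch contrasting the set-returning helpers on a two-node pipeline (dataset names are illustrative): ::

    from kedro.pipeline import Pipeline, node

    pipe = Pipeline([
        node(lambda a: a, "A", "B", name="n1"),
        node(lambda b: b, "B", "C", name="n2"),
    ])

    assert pipe.inputs() == {"A"}              # free inputs only
    assert pipe.outputs() == {"C"}             # final outputs only
    assert pipe.all_inputs() == {"A", "B"}     # every node input
    assert pipe.datasets() == {"A", "B", "C"}  # all datasets used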
describe ¶
describe(names_only=True)
Obtain the order of execution and expected free input variables in a loggable pre-formatted string. The order of nodes matches the order of execution given by the topological sort.
Parameters:
- names_only (bool, default: True) – The flag to describe the pipeline with just node names.
Example: ::
>>> pipeline = Pipeline([ ... ])
>>>
>>> logger = logging.getLogger(__name__)
>>>
>>> logger.info(pipeline.describe())
After invocation the following will be printed as an info level log statement: ::
#### Pipeline execution order ####
Inputs: C, D
func1([C]) -> [A]
func2([D]) -> [B]
func3([A, D]) -> [E]
Outputs: B, E
##################################
Returns:
- str – The pipeline description as a formatted string.
Source code in kedro/pipeline/pipeline.py, lines 439–496
filter ¶
filter(tags=None, from_nodes=None, to_nodes=None, node_names=None, from_inputs=None, to_outputs=None, node_namespaces=None)
Creates a new Pipeline object with the nodes that meet all of the specified filtering conditions.
The new pipeline object is the intersection of pipelines that meet each filtering condition. This is distinct from chaining multiple filters together.
Parameters:
- tags (Iterable[str] | None, default: None) – A list of node tags which should be used to lookup the nodes of the new Pipeline.
- from_nodes (Iterable[str] | None, default: None) – A list of node names which should be used as a starting point of the new Pipeline.
- to_nodes (Iterable[str] | None, default: None) – A list of node names which should be used as an end point of the new Pipeline.
- node_names (Iterable[str] | None, default: None) – A list of node names which should be selected for the new Pipeline.
- from_inputs (Iterable[str] | None, default: None) – A list of inputs which should be used as a starting point of the new Pipeline.
- to_outputs (Iterable[str] | None, default: None) – A list of outputs which should be the final outputs of the new Pipeline.
- node_namespaces (Iterable[str] | None, default: None) – A list of node namespaces which should be used to select nodes in the new Pipeline.
Returns:
- Pipeline – A new Pipeline object with nodes that meet all of the specified filtering conditions.
Raises:
- ValueError – When the filtered Pipeline has no nodes.
Example: ::
>>> pipeline = Pipeline(
>>> [
>>> node(func, "A", "B", name="node1"),
>>> node(func, "B", "C", name="node2"),
>>> node(func, "C", "D", name="node3"),
>>> ]
>>> )
>>> pipeline.filter(node_names=["node1", "node3"], from_inputs=["A"])
>>> # Gives a new pipeline object containing node1 and node3.
Source code in kedro/pipeline/pipeline.py, lines 940–1029
from_inputs ¶
from_inputs(*inputs)
Create a new Pipeline object with the nodes which depend directly or transitively on the provided inputs.
If provided a name, but no format, for a transcoded input, it includes all the nodes that use inputs with that name; otherwise it matches the fully-qualified name only (i.e. name@format).
Parameters:
- *inputs (str, default: ()) – A list of inputs which should be used as a starting point of the new Pipeline.
Raises:
- ValueError – Raised when any of the given inputs do not exist in the Pipeline object.
Returns:
- Pipeline – A new Pipeline object, containing a subset of the nodes of the current one such that only nodes depending directly or transitively on the provided inputs are being copied.
Source code in kedro/pipeline/pipeline.py, lines 777–815
from_nodes ¶
from_nodes(*node_names)
Create a new Pipeline object with the nodes which depend directly or transitively on the provided nodes.
Parameters:
- *node_names (str, default: ()) – A list of node names which should be used as a starting point of the new Pipeline.
Raises:
- ValueError – Raised when any of the given names do not exist in the Pipeline object.
Returns:
- Pipeline – A new Pipeline object, containing a subset of the nodes of the current one such that only nodes depending directly or transitively on the provided nodes are being copied.
Source code in kedro/pipeline/pipeline.py, lines 881–900
group_nodes_by ¶
group_nodes_by(group_by='namespace')
Return a list of grouped nodes based on the specified strategy.
Parameters:
- group_by (str | None, default: 'namespace') – Strategy for grouping. Supported values: "namespace" groups nodes by their top-level namespace; None or "none" applies no grouping, so each node is its own group.
Returns:
- list[GroupedNodes] – A list of GroupedNodes instances.
Source code in kedro/pipeline/pipeline.py, lines 549–567
inputs ¶
inputs()
The names of free inputs that must be provided at runtime so that the pipeline is runnable. Does not include intermediate inputs which are produced and consumed by the inner pipeline nodes. Resolves transcoded names where necessary.
Returns:
- set[str] – The set of free input names needed by the pipeline.
Source code in kedro/pipeline/pipeline.py, lines 403–413
only_nodes ¶
only_nodes(*node_names)
Create a new Pipeline which will contain only the specified nodes by name.
Parameters:
- *node_names (str, default: ()) – One or more node names. The returned Pipeline will only contain these nodes.
Raises:
- ValueError – When some invalid node name is given.
Returns:
- Pipeline – A new Pipeline, containing only the specified nodes.
Source code in kedro/pipeline/pipeline.py, lines 612–649
only_nodes_with_inputs ¶
only_nodes_with_inputs(*inputs)
Create a new Pipeline object with the nodes which depend directly on the provided inputs.
If provided a name, but no format, for a transcoded input, it includes all the nodes that use inputs with that name; otherwise it matches the fully-qualified name only (i.e. name@format).
Parameters:
- *inputs (str, default: ()) – A list of inputs which should be used as a starting point of the new Pipeline.
Raises:
- ValueError – Raised when any of the given inputs do not exist in the Pipeline object.
Returns:
- Pipeline – A new Pipeline object, containing a subset of the nodes of the current one such that only nodes depending directly on the provided inputs are being copied.
Source code in kedro/pipeline/pipeline.py, lines 751–775
only_nodes_with_namespaces ¶
only_nodes_with_namespaces(node_namespaces)
Creates a new Pipeline containing only nodes with the specified namespaces.
Parameters:
- node_namespaces (list[str]) – A list of node namespaces.
Raises:
- ValueError – When the pipeline contains no nodes with the specified namespaces.
Returns:
- Pipeline – A new Pipeline containing nodes with the specified namespaces.
Source code in kedro/pipeline/pipeline.py, lines 651–685
only_nodes_with_outputs ¶
only_nodes_with_outputs(*outputs)
Create a new Pipeline object with the nodes which are directly required to produce the provided outputs.
If provided a name, but no format, for a transcoded dataset, it includes all the nodes that output to that name; otherwise it matches the fully-qualified name only (i.e. name@format).
Parameters:
- *outputs (str, default: ()) – A list of outputs which should be the final outputs of the new Pipeline.
Raises:
- ValueError – Raised when any of the given outputs do not exist in the Pipeline object.
Returns:
- Pipeline – A new Pipeline object, containing a subset of the nodes of the current one such that only nodes which are directly required to produce the provided outputs are being copied.
Source code in kedro/pipeline/pipeline.py, lines 817–840
only_nodes_with_tags ¶
only_nodes_with_tags(*tags)
Creates a new Pipeline object with the nodes which contain any of the provided tags. The resulting Pipeline is empty if no tags are provided.
Parameters:
- *tags (str, default: ()) – A list of node tags which should be used to lookup the nodes of the new Pipeline.
Returns:
- Pipeline – A new Pipeline object, containing a subset of the nodes of the current one such that only nodes containing any of the tags provided are being copied.
Source code in kedro/pipeline/pipeline.py, lines 923–938
outputs ¶
outputs()
The names of outputs produced when the whole pipeline is run. Does not include intermediate outputs that are consumed by other pipeline nodes. Resolves transcoded names where necessary.
Returns:
- set[str] – The set of final pipeline outputs.
Source code in kedro/pipeline/pipeline.py, lines 415–424
tag ¶
tag(tags)
Tags all the nodes in the pipeline.
Parameters:
- tags (str | Iterable[str]) – The tags to be added to the nodes.
Returns:
- Pipeline – New Pipeline object with nodes tagged.
Source code in kedro/pipeline/pipeline.py, lines 1031–1041
to_json ¶
to_json()
Return a JSON representation of the pipeline.
Source code in kedro/pipeline/pipeline.py, lines 1043–1059
to_nodes ¶
to_nodes(*node_names)
Create a new Pipeline object with the nodes required directly or transitively by the provided nodes.
Parameters:
- *node_names (str, default: ()) – A list of node names which should be used as an end point of the new Pipeline.
Raises:
- ValueError – Raised when any of the given names do not exist in the Pipeline object.
Returns:
- Pipeline – A new Pipeline object, containing a subset of the nodes of the current one such that only nodes required directly or transitively by the provided nodes are being copied.
Source code in kedro/pipeline/pipeline.py, lines 902–921
to_outputs ¶
to_outputs(*outputs)
Create a new Pipeline object with the nodes which are directly or transitively required to produce the provided outputs.
If provided a name, but no format, for a transcoded dataset, it includes all the nodes that output to that name; otherwise it matches the fully-qualified name only (i.e. name@format).
Parameters:
- *outputs (str, default: ()) – A list of outputs which should be the final outputs of the new Pipeline.
Raises:
- ValueError – Raised when any of the given outputs do not exist in the Pipeline object.
Returns:
- Pipeline – A new Pipeline object, containing a subset of the nodes of the current one such that only nodes which are directly or transitively required to produce the provided outputs are being copied.
Source code in kedro/pipeline/pipeline.py, lines 842–879
PipelineSpecs ¶
Namespace that defines all specifications for a pipeline's lifecycle hooks.
after_pipeline_run ¶
after_pipeline_run(run_params, run_result, pipeline, catalog)
Hook to be invoked after a pipeline runs.
Parameters:
- run_params (dict[str, Any]) – The params used to run the pipeline. Should have the following schema: ::

      {
          "run_id": str,
          "project_path": str,
          "env": str,
          "kedro_version": str,
          "tags": Optional[List[str]],
          "from_nodes": Optional[List[str]],
          "to_nodes": Optional[List[str]],
          "node_names": Optional[List[str]],
          "from_inputs": Optional[List[str]],
          "to_outputs": Optional[List[str]],
          "load_versions": Optional[List[str]],
          "runtime_params": Optional[Dict[str, Any]],
          "pipeline_name": str,
          "namespace": Optional[str],
          "runner": str,
      }

- run_result (dict[str, Any]) – The output of the Pipeline run.
- pipeline (Pipeline) – The Pipeline that was run.
- catalog (CatalogProtocol) – An implemented instance of CatalogProtocol used during the run.
Source code in kedro/framework/hooks/specs.py, lines 168–204
before_pipeline_run ¶
before_pipeline_run(run_params, pipeline, catalog)
Hook to be invoked before a pipeline runs.
Parameters:
- run_params (dict[str, Any]) – The params used to run the pipeline. Should have the following schema: ::

      {
          "run_id": str,
          "project_path": str,
          "env": str,
          "kedro_version": str,
          "tags": Optional[List[str]],
          "from_nodes": Optional[List[str]],
          "to_nodes": Optional[List[str]],
          "node_names": Optional[List[str]],
          "from_inputs": Optional[List[str]],
          "to_outputs": Optional[List[str]],
          "load_versions": Optional[List[str]],
          "runtime_params": Optional[Dict[str, Any]],
          "pipeline_name": str,
          "namespace": Optional[str],
          "runner": str,
      }

- pipeline (Pipeline) – The Pipeline that will be run.
- catalog (CatalogProtocol) – An implemented instance of CatalogProtocol to be used during the run.
Source code in kedro/framework/hooks/specs.py, lines 135–166
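A sketch pairing this spec with after_pipeline_run to time a whole run via run_params (the class name is illustrative): ::

    import time

    from kedro.framework.hooks import hook_impl

    class RunTimingHooks:
        @hook_impl
        def before_pipeline_run(self, run_params):
            self._started = time.perf_counter()
            print(f"Starting pipeline {run_params['pipeline_name']!r}")

        @hook_impl
        def after_pipeline_run(self, run_params):
            elapsed = time.perf_counter() - self._started
            print(f"Pipeline {run_params['pipeline_name']!r} finished in {elapsed:.1f}s")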
on_pipeline_error ¶
on_pipeline_error(error, run_params, pipeline, catalog)
Hook to be invoked if a pipeline run throws an uncaught Exception.
The signature of this error hook should match the signature of before_pipeline_run, along with the error that was raised.
Parameters:
- error (Exception) – The uncaught exception thrown during the pipeline run.
- run_params (dict[str, Any]) – The params used to run the pipeline. Should have the following schema: ::

      {
          "run_id": str,
          "project_path": str,
          "env": str,
          "kedro_version": str,
          "tags": Optional[List[str]],
          "from_nodes": Optional[List[str]],
          "to_nodes": Optional[List[str]],
          "node_names": Optional[List[str]],
          "from_inputs": Optional[List[str]],
          "to_outputs": Optional[List[str]],
          "load_versions": Optional[List[str]],
          "runtime_params": Optional[Dict[str, Any]],
          "pipeline_name": str,
          "namespace": Optional[str],
          "runner": str,
      }

- pipeline (Pipeline) – The Pipeline that was run.
- catalog (CatalogProtocol) – An implemented instance of CatalogProtocol used during the run.
Source code in kedro/framework/hooks/specs.py, lines 206–244