SharedMemoryDataCatalog
kedro.io.SharedMemoryDataCatalog ¶
SharedMemoryDataCatalog(datasets=None, config_resolver=None, load_versions=None, save_version=None)
Bases: DataCatalog
A specialized DataCatalog for managing datasets in a shared memory context.
The SharedMemoryDataCatalog extends the base DataCatalog to support multiprocessing by ensuring that datasets are serializable and synchronized across threads or processes. It provides additional functionality for managing shared memory datasets, such as setting a multiprocessing manager and validating dataset compatibility with multiprocessing.
Attributes:
- default_runtime_patterns (ClassVar) – A dictionary defining the default runtime pattern for datasets of type kedro.io.SharedMemoryDataset.
Example: ::
>>> from multiprocessing.managers import SyncManager
>>> from kedro.io import MemoryDataset
>>> from kedro.io.data_catalog import SharedMemoryDataCatalog
>>>
>>> # Create a shared memory catalog
>>> catalog = SharedMemoryDataCatalog(
... datasets={"shared_data": MemoryDataset(data=[1, 2, 3])}
... )
>>>
>>> # Set a multiprocessing manager
>>> manager = SyncManager()
>>> manager.start()
>>> catalog.set_manager_datasets(manager)
>>>
>>> # Validate the catalog for multiprocessing compatibility
>>> catalog.validate_catalog()
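For completeness, a minimal end-to-end sketch that also shuts the manager down once the shared datasets are no longer needed; the try/finally cleanup is an assumption about how you manage the SyncManager yourself, not part of the catalog API: ::
>>> from multiprocessing.managers import SyncManager
>>> from kedro.io import MemoryDataset
>>> from kedro.io.data_catalog import SharedMemoryDataCatalog
>>>
>>> catalog = SharedMemoryDataCatalog(
...     datasets={"shared_data": MemoryDataset(data=[1, 2, 3])}
... )
>>> manager = SyncManager()
>>> manager.start()
>>> try:
...     catalog.set_manager_datasets(manager)  # attach the manager to shared memory datasets
...     catalog.validate_catalog()  # check multiprocessing compatibility
... finally:
...     manager.shutdown()  # release the manager process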
Source code in kedro/io/data_catalog.py
default_runtime_patterns class-attribute instance-attribute ¶
default_runtime_patterns = {'{default}': {'type': 'kedro.io.SharedMemoryDataset'}}
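The pattern can be inspected directly on the class; the snippet below simply reads the class attribute shown above. In effect, dataset names that are not explicitly registered fall back to a SharedMemoryDataset at runtime: ::
>>> from kedro.io.data_catalog import SharedMemoryDataCatalog
>>> SharedMemoryDataCatalog.default_runtime_patterns
{'{default}': {'type': 'kedro.io.SharedMemoryDataset'}}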
set_manager_datasets ¶
set_manager_datasets(manager)
Associate a multiprocessing manager with all shared memory datasets in the catalog.
This method iterates through all datasets in the catalog and sets the provided multiprocessing manager for datasets of type SharedMemoryDataset. This ensures that these datasets are properly synchronized across threads or processes.
Parameters:
- manager (SyncManager) – A multiprocessing manager to be associated with shared memory datasets.
Example: ::
>>> from multiprocessing.managers import SyncManager
>>> from kedro.io import MemoryDataset
>>> from kedro.io.data_catalog import SharedMemoryDataCatalog
>>>
>>> catalog = SharedMemoryDataCatalog(
... datasets={"shared_data": MemoryDataset(data=[1, 2, 3])}
... )
>>>
>>> manager = SyncManager()
>>> manager.start()
>>> catalog.set_manager_datasets(manager)
>>> print(catalog)
# {'shared_data': kedro.io.memory_dataset.MemoryDataset(data='<list>')}
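Note that the manager must already be started (via manager.start()) before it is passed in, and, as with any proxied multiprocessing objects, the same manager should stay alive for as long as worker processes need to read or write the shared datasets.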
Source code in kedro/io/data_catalog.py
validate_catalog ¶
validate_catalog()
Validate the catalog to ensure all datasets are serializable and compatible with multiprocessing.
This method checks that all datasets in the catalog are serializable and do not include non-proxied memory datasets as outputs. Non-serializable datasets or datasets that rely on single-process memory cannot be used in a multiprocessing context. If any such datasets are found, an exception is raised with details.
Raises:
- AttributeError – If any datasets are found to be non-serializable or incompatible with multiprocessing.
Example: ::
>>> from kedro.io import MemoryDataset
>>> from kedro.io.data_catalog import SharedMemoryDataCatalog
>>>
>>> catalog = SharedMemoryDataCatalog(
... datasets={"shared_data": MemoryDataset(data=[1, 2, 3])}
... )
>>>
>>> try:
... catalog.validate_catalog()
... except AttributeError as e:
... print(f"Validation failed: {e}")
# No error
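As a counterpoint, a hypothetical failure sketch, assuming the serializability check rejects a dataset whose in-memory data cannot be pickled (here a lambda); the dataset name unpicklable is illustrative only: ::
>>> from kedro.io import MemoryDataset
>>> from kedro.io.data_catalog import SharedMemoryDataCatalog
>>>
>>> bad_catalog = SharedMemoryDataCatalog(
...     datasets={"unpicklable": MemoryDataset(data=lambda x: x)}
... )
>>> try:
...     bad_catalog.validate_catalog()
... except AttributeError as e:
...     print(f"Validation failed: {e}")
# Expected to report the dataset as non-serializable if the check pickles each dataset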
Source code in kedro/io/data_catalog.py