Source code for xpark.dataset.context

from __future__ import annotations

from dataclasses import dataclass, field, fields
from typing import Any

from ray.data import context as _context


@dataclass
class DatasetContext(_context.DatasetContext):
    """Ray Data context extended with an xpark-specific ``storage_options`` field.

    All other settings are inherited unchanged from the Ray base context.
    """

    # Kept as a typed dataclass field on the context itself; this is much
    # better than using the set_config / get_config API.
    # NOTE(review): presumably these are filesystem/storage options consumed
    # by xpark readers and writers -- confirm against the callers.
    storage_options: dict[str, Any] = field(default_factory=dict)

    @staticmethod
    def get_current() -> "DatasetContext":
        """Get or create the current DatasetContext.

        When a Dataset is created, the current context will be sealed.
        Changes to ``DatasetContext.get_current()`` will not impact existing
        Datasets.

        If Ray's module-level default context already exists but is not an
        instance of this class, it is replaced by a new ``DatasetContext``
        whose dataclass fields are copied from the existing one.

        Examples:

            .. testcode::

                from xpark.dataset import DatasetContext, from_range

                context = DatasetContext.get_current()

                context.target_max_block_size = 100 * 1024 ** 2
                ds1 = from_range(1)
                context.target_max_block_size = 1 * 1024 ** 2
                ds2 = from_range(1)

                # ds1's target_max_block_size will be 100MB
                ds1.take_all()
                # ds2's target_max_block_size will be 1MB
                ds2.take_all()

        Developer notes: Avoid using ``DatasetContext.get_current()`` in data
        internal components, use the context object captured in the Dataset
        and pass it around as arguments.

        Returns:
            The ``DatasetContext`` stored as Ray's module-level default
            context after this call.
        """
        # Hold Ray's context lock for the whole read-modify-write of the
        # module-level default.
        with _context._context_lock:
            current = _context._default_context
            if current is None:
                _context._default_context = DatasetContext()
            elif not isinstance(current, DatasetContext):
                # The default was created as some other context type: upgrade
                # it in place, carrying over every dataclass field it declares.
                # Values are assigned by reference, not deep-copied.
                upgraded = DatasetContext()
                for spec in fields(current):
                    setattr(upgraded, spec.name, getattr(current, spec.name))
                _context._default_context = upgraded
            return _context._default_context