Source code for xpark.dataset.context
from __future__ import annotations
from dataclasses import dataclass, field, fields
from typing import Any
from ray.data import context as _context
[docs]
@dataclass
class DatasetContext(_context.DatasetContext):
    """Ray Data context extended with an extra ``storage_options`` field.

    The class subclasses the context type exported by ``ray.data.context``
    and overrides :meth:`get_current` so that the process-wide default
    context stored in that module is always an instance of this subclass.
    """

    # Exposed as a regular dataclass field; the original author considers
    # this much better than going through the set_config / get_config API.
    # NOTE(review): presumably these options are forwarded to the storage /
    # filesystem layer -- confirm against the readers and writers.
    storage_options: dict[str, Any] = field(default_factory=dict)

    @staticmethod
    def get_current() -> "DatasetContext":
        """Get or create the current DatasetContext.

        When a Dataset is created, the current DatasetContext will be sealed.
        Changes to `DatasetContext.get_current()` will not impact existing
        Datasets.

        If the default context held by ``ray.data.context`` already exists
        but is not a :class:`DatasetContext`, a new :class:`DatasetContext`
        is created, every dataclass field value of the existing context is
        copied onto it, and it replaces the existing default.

        Examples:

            .. testcode::

                from xpark.dataset import DatasetContext, from_range
                context = DatasetContext.get_current()
                context.target_max_block_size = 100 * 1024 ** 2
                ds1 = from_range(1)
                context.target_max_block_size = 1 * 1024 ** 2
                ds2 = from_range(1)
                # ds1's target_max_block_size will be 100MB
                ds1.take_all()
                # ds2's target_max_block_size will be 1MB
                ds2.take_all()

        Developer notes: Avoid using `DatasetContext.get_current()` in data
        internal components, use the DatasetContext object captured in the
        Dataset and pass it around as arguments.

        Returns:
            The default context, guaranteed to be a :class:`DatasetContext`.
        """
        # The default context is shared module state in ray.data.context, so
        # both the check and the replacement happen under that module's lock.
        with _context._context_lock:
            current = _context._default_context
            if current is None:
                _context._default_context = DatasetContext()
            elif not isinstance(current, DatasetContext):
                # Upgrade a plain Ray context in place of the default,
                # carrying over the settings that were already applied to it.
                # NOTE(review): only dataclass fields are copied; instance
                # attributes that are not declared fields are not carried
                # over -- confirm that this is intended.
                upgraded = DatasetContext()
                for spec in fields(current):
                    setattr(upgraded, spec.name, getattr(current, spec.name))
                _context._default_context = upgraded
            return _context._default_context