xpark.dataset.DatasetContext#

class xpark.dataset.DatasetContext(target_max_block_size: int | None = 134217728, target_min_block_size: int = 1048576, streaming_read_buffer_size: int = 33554432, enable_pandas_block: bool = True, actor_prefetcher_enabled: bool = False, iter_get_block_batch_size: int = 32, autoscaling_config: ray.data.context.AutoscalingConfig = <factory>, use_push_based_shuffle: bool = False, _shuffle_strategy: ray.data.context.ShuffleStrategy = <ShuffleStrategy.HASH_SHUFFLE: 'hash_shuffle'>, pipeline_push_based_shuffle_reduce_tasks: bool = True, default_hash_shuffle_parallelism: int = 200, max_hash_shuffle_aggregators: int | None = None, min_hash_shuffle_aggregator_wait_time_in_s: int = 300, hash_shuffle_aggregator_health_warning_interval_s: int = 30, max_hash_shuffle_finalization_batch_size: int | None = None, join_operator_actor_num_cpus_override: float = None, hash_shuffle_operator_actor_num_cpus_override: float = None, hash_aggregate_operator_actor_num_cpus_override: float = None, scheduling_strategy: Union[NoneType, str, ray.util.scheduling_strategies.PlacementGroupSchedulingStrategy, ray.util.scheduling_strategies.NodeAffinitySchedulingStrategy, ray.util.scheduling_strategies.NodeLabelSchedulingStrategy] = 'SPREAD', scheduling_strategy_large_args: Union[NoneType, str, ray.util.scheduling_strategies.PlacementGroupSchedulingStrategy, ray.util.scheduling_strategies.NodeAffinitySchedulingStrategy, ray.util.scheduling_strategies.NodeLabelSchedulingStrategy] = 'DEFAULT', large_args_threshold: int = 52428800, use_polars: bool = False, use_polars_sort: bool = False, eager_free: bool = False, decoding_size_estimation: bool = True, min_parallelism: int = 200, read_op_min_num_blocks: int = 200, enable_tensor_extension_casting: bool = True, use_arrow_tensor_v2: bool = True, enable_fallback_to_arrow_object_ext_type: bool | None = None, enable_auto_log_stats: bool = False, verbose_stats_logs: bool = False, trace_allocations: bool = False, execution_options: 'ExecutionOptions' = <factory>, use_ray_tqdm: bool = True, enable_progress_bars: bool = True, enable_operator_progress_bars: bool = True, enable_progress_bar_name_truncation: bool = True, enable_rich_progress_bars: bool = False, progress_bar_log_interval: int = 5, enable_get_object_locations_for_metrics: bool = False, write_file_retry_on_errors: List[str] = ('AWS Error INTERNAL_FAILURE', 'AWS Error NETWORK_CONNECTION', 'AWS Error SLOW_DOWN', 'AWS Error UNKNOWN (HTTP status 503)'), warn_on_driver_memory_usage_bytes: int = 2147483648, actor_task_retry_on_errors: Union[bool, List[BaseException]] = False, actor_init_retry_on_errors: bool = False, actor_init_max_retries: int = 3, op_resource_reservation_enabled: bool = True, op_resource_reservation_ratio: float = 0.5, max_errored_blocks: int = 0, log_internal_stack_trace_to_stdout: bool = False, raise_original_map_exception: bool = False, print_on_execution_start: bool = True, s3_try_create_dir: bool = False, wait_for_min_actors_s: int = -1, max_tasks_in_flight_per_actor: int | None = None, retried_io_errors: List[str] = <factory>, enable_per_node_metrics: bool = False, override_object_store_memory_limit_fraction: float = None, memory_usage_poll_interval_s: float | None = 1, dataset_logger_id: str | None = None, _enable_actor_pool_on_exit_hook: bool = False, issue_detectors_config: 'IssueDetectorsConfiguration' = <factory>, downstream_capacity_backpressure_ratio: float = None, downstream_capacity_backpressure_max_queued_bundles: int = None, enable_dynamic_output_queue_size_backpressure: bool = False, enforce_schemas: bool = False, pandas_block_ignore_metadata: bool = False, storage_options: 'dict[str, Any]' = <factory>)[source]#

Methods

copy()

Create a copy of the current DataContext.

get_config(key[, default])

Get the value for a key-value style config.

get_current()

Get or create the current DataContext.

remove_config(key)

Remove a key-value style config.

set_config(key, value)

Set the value for a key-value style config.

set_dataset_logger_id(dataset_id)

Set the current dataset logger id.

Attributes

actor_init_max_retries

actor_init_retry_on_errors

actor_prefetcher_enabled

actor_task_retry_on_errors

decoding_size_estimation

default_hash_shuffle_parallelism

eager_free

enable_auto_log_stats

enable_dynamic_output_queue_size_backpressure

enable_get_object_locations_for_metrics

enable_operator_progress_bars

enable_pandas_block

enable_per_node_metrics

enable_progress_bar_name_truncation

enable_progress_bars

enable_rich_progress_bars

enable_tensor_extension_casting

enforce_schemas

hash_shuffle_aggregator_health_warning_interval_s

iter_get_block_batch_size

large_args_threshold

log_internal_stack_trace_to_stdout

max_errored_blocks

memory_usage_poll_interval_s

min_hash_shuffle_aggregator_wait_time_in_s

min_parallelism

op_resource_reservation_enabled

op_resource_reservation_ratio

pandas_block_ignore_metadata

pipeline_push_based_shuffle_reduce_tasks

print_on_execution_start

progress_bar_log_interval

raise_original_map_exception

read_op_min_num_blocks

s3_try_create_dir

scheduling_strategy

scheduling_strategy_large_args

shuffle_strategy

streaming_read_buffer_size

target_max_block_size

target_min_block_size

trace_allocations

use_arrow_tensor_v2

use_polars

use_polars_sort

use_push_based_shuffle

use_ray_tqdm

verbose_stats_logs

wait_for_min_actors_s

warn_on_driver_memory_usage_bytes

write_file_retry_on_errors

storage_options

autoscaling_config

execution_options

retried_io_errors

issue_detectors_config

static get_current() → DatasetContext[source]#

Get or create the current DataContext.

When a Dataset is created, it captures (seals) the current DataContext. Subsequent changes to DataContext.get_current() will not affect existing Datasets.

Examples

context = DataContext.get_current()

context.target_max_block_size = 100 * 1024 ** 2
ds1 = from_range(1)
context.target_max_block_size = 1 * 1024 ** 2
ds2 = from_range(1)

# ds1's target_max_block_size will be 100MB
ds1.take_all()
# ds2's target_max_block_size will be 1MB
ds2.take_all()

Developer notes: Avoid calling DataContext.get_current() inside Data internal components; instead, use the DataContext object captured in the Dataset and pass it around as an argument.
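The pattern recommended above can be sketched as follows. The function names and the dict-based `ctx` stand-in are illustrative only, not part of the real API:

```python
# Sketch: capture the context once at the API boundary and thread it
# through internal components as an argument, rather than calling
# get_current() deep inside the implementation. All names here are
# hypothetical stand-ins for illustration.

def plan_read(ctx, num_files):
    # Internal component: receives the context explicitly, so it always
    # sees the context sealed into the Dataset, not a later mutation.
    return max(1, num_files // ctx["read_op_min_num_blocks"])

def create_dataset(num_files, ctx=None):
    # API boundary: capture the current context exactly once.
    if ctx is None:
        ctx = {"read_op_min_num_blocks": 200}  # stand-in for get_current()
    return plan_read(ctx, num_files)

print(create_dataset(1000))  # 1000 files // 200 min blocks -> 5
```

Passing the context explicitly keeps internal components deterministic and testable: they never observe a global that may have changed between Dataset creation and execution.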

storage_options: dict[str, Any]#