# Source code for xpark.dataset.read_api

from __future__ import annotations

from typing import TYPE_CHECKING, Any, Literal

if TYPE_CHECKING:
    import numpy as np
    import pyarrow.fs


import ray.data
from ray.data.datasource.file_based_datasource import FileShuffleConfig, _validate_shuffle_arg
from ray.data.read_api import _resolve_parquet_args, read_datasource

from xpark.dataset.dataset import Dataset
from xpark.dataset.datasource import Partitioning, PartitionStyle, PathPartitionFilter
from xpark.dataset.datasource.parquet_datasource import ParquetDatasource
from xpark.dataset.utils import copy_sig


@copy_sig(ray.data.from_arrow)
def from_arrow(*args, **kwargs) -> Dataset:
    """Create a :class:`~xpark.dataset.Dataset` from PyArrow tables via :func:`ray.data.from_arrow`."""
    underlying = ray.data.from_arrow(*args, **kwargs)
    return Dataset(underlying)
@copy_sig(ray.data.from_arrow_refs)
def from_arrow_refs(*args, **kwargs) -> Dataset:
    """Create a :class:`~xpark.dataset.Dataset` from PyArrow object refs via :func:`ray.data.from_arrow_refs`."""
    underlying = ray.data.from_arrow_refs(*args, **kwargs)
    return Dataset(underlying)
@copy_sig(ray.data.from_blocks)
def from_blocks(*args, **kwargs) -> Dataset:
    """Create a :class:`~xpark.dataset.Dataset` from blocks via :func:`ray.data.from_blocks`."""
    underlying = ray.data.from_blocks(*args, **kwargs)
    return Dataset(underlying)
@copy_sig(ray.data.from_huggingface)
def from_huggingface(*args, **kwargs) -> Dataset:
    """Create a :class:`~xpark.dataset.Dataset` from a HuggingFace dataset via :func:`ray.data.from_huggingface`."""
    underlying = ray.data.from_huggingface(*args, **kwargs)
    return Dataset(underlying)
@copy_sig(ray.data.from_items)
def from_items(*args, **kwargs) -> Dataset:
    """Create a :class:`~xpark.dataset.Dataset` from local Python items via :func:`ray.data.from_items`."""
    underlying = ray.data.from_items(*args, **kwargs)
    return Dataset(underlying)
@copy_sig(ray.data.range)
def from_range(*args, **kwargs) -> Dataset:
    """Create a :class:`~xpark.dataset.Dataset` of integers via :func:`ray.data.range`."""
    underlying = ray.data.range(*args, **kwargs)
    return Dataset(underlying)
@copy_sig(ray.data.range_tensor)
def from_range_tensor(*args, **kwargs) -> Dataset:
    """Create a tensor-valued :class:`~xpark.dataset.Dataset` via :func:`ray.data.range_tensor`."""
    underlying = ray.data.range_tensor(*args, **kwargs)
    return Dataset(underlying)
@copy_sig(ray.data.from_pandas)
def from_pandas(*args, **kwargs) -> Dataset:
    """Create a :class:`~xpark.dataset.Dataset` from pandas DataFrames via :func:`ray.data.from_pandas`."""
    underlying = ray.data.from_pandas(*args, **kwargs)
    return Dataset(underlying)
@copy_sig(ray.data.from_numpy)
def from_numpy(*args, **kwargs) -> Dataset:
    """Create a :class:`~xpark.dataset.Dataset` from NumPy arrays via :func:`ray.data.from_numpy`."""
    underlying = ray.data.from_numpy(*args, **kwargs)
    return Dataset(underlying)
@copy_sig(ray.data.read_json)
def read_json(*args, **kwargs) -> Dataset:
    """Create a :class:`~xpark.dataset.Dataset` from JSON files via :func:`ray.data.read_json`."""
    underlying = ray.data.read_json(*args, **kwargs)
    return Dataset(underlying)
@copy_sig(ray.data.read_audio)
def read_audio(*args, **kwargs) -> Dataset:
    """Create a :class:`~xpark.dataset.Dataset` from audio files via :func:`ray.data.read_audio`."""
    underlying = ray.data.read_audio(*args, **kwargs)
    return Dataset(underlying)
@copy_sig(ray.data.read_videos)
def read_video(*args, **kwargs) -> Dataset:
    """Create a :class:`~xpark.dataset.Dataset` from video files via :func:`ray.data.read_videos`."""
    underlying = ray.data.read_videos(*args, **kwargs)
    return Dataset(underlying)
@copy_sig(ray.data.read_images)
def read_image(*args, **kwargs) -> Dataset:
    """Create a :class:`~xpark.dataset.Dataset` from image files via :func:`ray.data.read_images`."""
    underlying = ray.data.read_images(*args, **kwargs)
    return Dataset(underlying)
def read_parquet(
    paths: str | list[str],
    *,
    filesystem: pyarrow.fs.FileSystem | None = None,
    columns: list[str] | None = None,
    parallelism: int = -1,
    num_cpus: float | None = None,
    num_gpus: float | None = None,
    memory: float | None = None,
    ray_remote_args: dict[str, Any] | None = None,
    tensor_column_schema: dict[str, tuple[np.dtype, tuple[int, ...]]] | None = None,
    partition_filter: PathPartitionFilter | None = None,
    partitioning: Partitioning = Partitioning(PartitionStyle.HIVE),
    shuffle: Literal["files"] | FileShuffleConfig | None = None,
    include_paths: bool = False,
    file_extensions: list[str] | None = None,
    concurrency: int | None = None,
    override_num_blocks: int | None = None,
    dynamic_uid: str | None = None,
    dynamic_uid_start: int = 0,
    **arrow_parquet_args,
) -> Dataset:
    """Creates a :class:`~xpark.dataset.Dataset` from parquet files.

    The Parquet reader supports projection and filter pushdown: column
    selection (``columns``) and row filtering (a ``filter`` keyword argument)
    are pushed down to the file scan. Any extra keyword argument is forwarded
    to PyArrow; see the `PyArrow API reference
    <https://arrow.apache.org/docs/python/generated/\
pyarrow.dataset.Scanner.html#pyarrow.dataset.Scanner.from_fragment>`_.

    Examples:
        Read a file or a directory in remote storage:

        >>> import xpark
        >>> ds = xpark.dataset.read_parquet("s3://anonymous@xpark-example-data/iris.parquet")
        >>> ds = xpark.dataset.read_parquet("s3://anonymous@xpark-example-data/iris-parquet/")

        Read multiple local files:

        >>> xpark.dataset.read_parquet(
        ...     ["local:///path/to/file1", "local:///path/to/file2"])  # doctest: +SKIP

    Args:
        paths: A single file path or directory, or a list of file paths.
            Multiple directories are not supported.
        filesystem: The PyArrow filesystem implementation to read from. By
            default it is selected from the path scheme (e.g. ``s3://`` uses
            ``S3FileSystem``); pass one explicitly to supply specific
            configuration.
        columns: A list of column names to read. Only these columns are read
            during the file scan.
        parallelism: Deprecated; use ``override_num_blocks`` instead.
        num_cpus: The number of CPUs to reserve for each parallel read worker.
        num_gpus: The number of GPUs to reserve for each parallel read worker.
        memory: The heap memory in bytes to reserve for each parallel read
            worker.
        ray_remote_args: kwargs passed to :func:`ray.remote` in the read tasks.
        tensor_column_schema: A dict of column name to PyArrow dtype and shape
            mappings for converting a Parquet column containing serialized
            tensors (ndarrays) as their elements to PyArrow tensors. Assumes
            raw NumPy C-contiguous serialization (e.g. ``arr.tobytes()``).
        partition_filter: A
            :class:`~xpark.dataset.datasource.partitioning.PathPartitionFilter`
            used with a custom callback to read only selected partitions.
        partitioning: A
            :class:`~xpark.dataset.datasource.partitioning.Partitioning`
            object describing how paths are organized. Defaults to HIVE
            partitioning.
        shuffle: ``"files"`` to randomly shuffle the input file order before
            reading, or a :class:`~xpark.dataset.FileShuffleConfig` to also
            supply a seed. Defaults to ``None`` (no shuffle).
        include_paths: If ``True``, include each file's path in a ``'path'``
            column.
        file_extensions: A list of file extensions to filter files by.
        concurrency: The maximum number of Xpark tasks to run concurrently.
            Doesn't change the total number of tasks or output blocks. By
            default it is decided dynamically from available resources.
        override_num_blocks: Override the number of output blocks from all
            read tasks. Usually best left unset.
        dynamic_uid: If specified, a column with this name is added containing
            a contiguous int64 unique identifier per row, assigned by file
            order and row position so repeated reads of the same data produce
            the same UIDs. UIDs range from ``dynamic_uid_start`` to
            ``dynamic_uid_start + total_rows - 1``.
        dynamic_uid_start: First uid generated for this dataset; only used
            when ``dynamic_uid`` is specified.
        arrow_parquet_args: Other parquet read options to pass to PyArrow.

    Returns:
        :class:`~xpark.dataset.Dataset` producing records read from the
        specified parquet files.
    """
    _validate_shuffle_arg(shuffle)

    # Fold tensor-column conversion info into the raw PyArrow kwargs.
    to_batch_kwargs = _resolve_parquet_args(
        tensor_column_schema,
        **arrow_parquet_args,
    )

    # These options are consumed by the datasource itself rather than being
    # forwarded to PyArrow's batch reader.
    dataset_kwargs = to_batch_kwargs.pop("dataset_kwargs", None)
    block_udf = to_batch_kwargs.pop("_block_udf", None)
    user_schema = to_batch_kwargs.pop("schema", None)

    datasource = ParquetDatasource(
        paths,
        columns=columns,
        dataset_kwargs=dataset_kwargs,
        to_batch_kwargs=to_batch_kwargs,
        dynamic_uid=dynamic_uid,
        dynamic_uid_start=dynamic_uid_start,
        _block_udf=block_udf,
        filesystem=filesystem,
        schema=user_schema,
        partition_filter=partition_filter,
        partitioning=partitioning,
        shuffle=shuffle,
        include_paths=include_paths,
        file_extensions=file_extensions,
    )

    underlying = read_datasource(
        datasource,
        num_cpus=num_cpus,
        num_gpus=num_gpus,
        memory=memory,
        parallelism=parallelism,
        ray_remote_args=ray_remote_args,
        concurrency=concurrency,
        override_num_blocks=override_num_blocks,
    )
    return Dataset(underlying)
@copy_sig(ray.data.read_iceberg)
def read_iceberg(*args, **kwargs) -> Dataset:
    """Create a :class:`~xpark.dataset.Dataset` from an Iceberg table via :func:`ray.data.read_iceberg`."""
    underlying = ray.data.read_iceberg(*args, **kwargs)
    return Dataset(underlying)
@copy_sig(ray.data.read_lance)
def read_lance(*args, **kwargs) -> Dataset:
    """Create a :class:`~xpark.dataset.Dataset` from a Lance dataset via :func:`ray.data.read_lance`."""
    underlying = ray.data.read_lance(*args, **kwargs)
    return Dataset(underlying)
@copy_sig(ray.data.read_binary_files)
def read_files(*args, **kwargs) -> Dataset:
    """Create a :class:`~xpark.dataset.Dataset` of raw file bytes via :func:`ray.data.read_binary_files`."""
    underlying = ray.data.read_binary_files(*args, **kwargs)
    return Dataset(underlying)
def read_lerobot(
    path: str,
    *,
    episodes: list[int] | None = None,
    columns: list[str] | None = None,
    include_video_paths: bool = True,
    decode_video: bool = False,
    parallelism: int = -1,
    num_cpus: float | None = None,
    num_gpus: float | None = None,
    memory: float | None = None,
    ray_remote_args: dict[str, Any] | None = None,
    concurrency: int | None = None,
    override_num_blocks: int | None = None,
) -> Dataset:
    """Creates a :class:`~xpark.dataset.Dataset` from a LeRobot format dataset.

    LeRobot is a framework for robot learning that stores datasets in a
    specific format: Parquet files for tabular data and MP4 videos for visual
    observations. This function reads LeRobot datasets from either HuggingFace
    Hub or local storage.

    Dataset Structure:
        LeRobot datasets are structured as::

            dataset_root/
            ├── meta/
            │   ├── info.json        # Dataset metadata (fps, features, etc.)
            │   ├── stats.json       # Statistics for normalization
            │   ├── episodes.jsonl   # Episode metadata
            │   └── tasks.jsonl      # Task definitions
            ├── data/
            │   └── chunk-000/
            │       ├── episode_000000.parquet
            │       └── ...
            └── videos/
                └── observation.images.top/
                    └── chunk-000/
                        ├── episode_000000.mp4
                        └── ...

    Examples:
        Read a dataset from HuggingFace Hub:

        >>> from xpark.dataset import read_lerobot
        >>> ds = read_lerobot("hf://datasets/lerobot/pusht")

        Read specific episodes or columns:

        >>> ds = read_lerobot("hf://datasets/lerobot/pusht", episodes=[0, 1, 2])
        >>> ds = read_lerobot(
        ...     "hf://datasets/lerobot/pusht",
        ...     columns=["observation.state", "action", "episode_index"]
        ... )

        Read from a local path, with video decoding, or from COS:

        >>> ds = xd.read_lerobot("/path/to/local/dataset")
        >>> ds = xd.read_lerobot("hf://datasets/lerobot/pusht", decode_video=True)
        >>> ds = xd.read_lerobot("cos://bucket/path/to/dataset")

    Args:
        path: Dataset path. Can be a local filesystem path
            (e.g. ``/path/to/dataset``) or a remote path with protocol prefix:
            HuggingFace (``hf://datasets/lerobot/pusht``), COS
            (``cos://bucket/path``), or S3 (``s3://bucket/path``).
        episodes: List of episode indices to load. If None, loads all
            episodes. Useful for loading a subset for debugging or validation
            splits.
        columns: List of column names to read. If None, reads all columns.
            Common columns include ``observation.state``, ``action``,
            ``episode_index``, ``frame_index``, ``timestamp``.
        include_video_paths: If True, adds columns with video file paths for
            each video key in the dataset (e.g.
            ``observation.images.top_path``); these paths can be used with
            video decoding processors.
        decode_video: If True, decode video frames into tensors (numpy
            arrays). Requires PyAV and PyTorch. When enabled, video columns
            contain decoded frame data instead of path/timestamp references.
        parallelism: The requested parallelism of the read, limited by the
            number of files. Defaults to -1 (automatic).
        num_cpus: The number of CPUs to reserve for each parallel read task.
        num_gpus: The number of GPUs to reserve for each parallel read task.
        memory: The heap memory in bytes to reserve for each parallel read
            task.
        ray_remote_args: Additional kwargs passed to :func:`ray.remote` for
            the read tasks.
        concurrency: The maximum number of concurrent read tasks.
        override_num_blocks: Override the number of output blocks.

    Returns:
        :class:`~xpark.dataset.Dataset` containing the LeRobot dataset records.

    Raises:
        FileNotFoundError: If the dataset cannot be found locally and cannot
            be downloaded from HuggingFace Hub.
        ImportError: If huggingface_hub is not installed when trying to
            download a dataset from HuggingFace Hub.

    See Also:
        - LeRobot documentation: https://github.com/huggingface/lerobot
    """
    from xpark.dataset.datasource.lerobot_datasource import LeRobotDatasource

    # The datasource never decodes video itself; decoding is applied with
    # with_column() after dataset construction. Requesting decode_video
    # therefore implies that the path columns must be present.
    datasource = LeRobotDatasource(
        path=path,
        episodes=episodes,
        columns=columns,
        include_video_paths=include_video_paths or decode_video,
    )

    ds = Dataset(
        read_datasource(
            datasource,
            parallelism=parallelism,
            num_cpus=num_cpus,
            num_gpus=num_gpus,
            memory=memory,
            ray_remote_args=ray_remote_args,
            concurrency=concurrency,
            override_num_blocks=override_num_blocks,
        )
    )

    # No decoding requested (or nothing to decode): return as-is.
    if not (decode_video and datasource.metadata.video_keys):
        return ds

    from xpark.dataset.expressions import col
    from xpark.dataset.processors.video_compute import VideoCompute

    # Decoding via with_column is more efficient than decoding inside the
    # read function itself.
    for video_key in datasource.metadata.video_keys:
        # The from_timestamp offset yields the correct position inside the
        # video file — needed for LeRobot v3.0+, where multiple episodes are
        # stored sequentially in a single video file.
        ds = ds.with_column(
            video_key,
            VideoCompute.decode_frame_at(
                col(f"{video_key}_path"),
                col("timestamp") + col(f"{video_key}_from_timestamp"),
            ),
        )
    return ds