from __future__ import annotations
from typing import TYPE_CHECKING, Any, Literal
if TYPE_CHECKING:
import numpy as np
import pyarrow.fs
import ray.data
from ray.data.datasource.file_based_datasource import FileShuffleConfig, _validate_shuffle_arg
from ray.data.read_api import _resolve_parquet_args, read_datasource
from xpark.dataset.dataset import Dataset
from xpark.dataset.datasource import Partitioning, PartitionStyle, PathPartitionFilter
from xpark.dataset.datasource.parquet_datasource import ParquetDatasource
from xpark.dataset.utils import copy_sig
@copy_sig(ray.data.from_arrow)
def from_arrow(*args, **kwargs) -> Dataset:
    """Create a :class:`~xpark.dataset.Dataset` from in-memory Arrow tables.

    Thin wrapper: forwards all arguments to :func:`ray.data.from_arrow` and
    wraps the resulting Ray dataset in an xpark :class:`Dataset`.
    """
    return Dataset(ray.data.from_arrow(*args, **kwargs))
@copy_sig(ray.data.from_arrow_refs)
def from_arrow_refs(*args, **kwargs) -> Dataset:
    """Create a :class:`~xpark.dataset.Dataset` from Arrow table object refs.

    Delegates to :func:`ray.data.from_arrow_refs` and wraps the result.
    """
    ray_ds = ray.data.from_arrow_refs(*args, **kwargs)
    return Dataset(ray_ds)
@copy_sig(ray.data.from_blocks)
def from_blocks(*args, **kwargs) -> Dataset:
    """Create a :class:`~xpark.dataset.Dataset` from a list of blocks.

    Thin wrapper: forwards all arguments to :func:`ray.data.from_blocks` and
    wraps the resulting Ray dataset in an xpark :class:`Dataset`.
    """
    return Dataset(ray.data.from_blocks(*args, **kwargs))
@copy_sig(ray.data.from_huggingface)
def from_huggingface(*args, **kwargs) -> Dataset:
    """Create a :class:`~xpark.dataset.Dataset` from a HuggingFace dataset.

    Thin wrapper: forwards all arguments to :func:`ray.data.from_huggingface`
    and wraps the resulting Ray dataset in an xpark :class:`Dataset`.
    """
    return Dataset(ray.data.from_huggingface(*args, **kwargs))
@copy_sig(ray.data.from_items)
def from_items(*args, **kwargs) -> Dataset:
    """Create a :class:`~xpark.dataset.Dataset` from a list of local items.

    Thin wrapper: forwards all arguments to :func:`ray.data.from_items` and
    wraps the resulting Ray dataset in an xpark :class:`Dataset`.
    """
    return Dataset(ray.data.from_items(*args, **kwargs))
@copy_sig(ray.data.range)
def from_range(*args, **kwargs) -> Dataset:
    """Create a :class:`~xpark.dataset.Dataset` over a range of integers.

    Thin wrapper: forwards all arguments to :func:`ray.data.range` (note the
    name difference) and wraps the result in an xpark :class:`Dataset`.
    """
    return Dataset(ray.data.range(*args, **kwargs))
@copy_sig(ray.data.range_tensor)
def from_range_tensor(*args, **kwargs) -> Dataset:
    """Create a :class:`~xpark.dataset.Dataset` of tensors over an int range.

    Thin wrapper: forwards all arguments to :func:`ray.data.range_tensor`
    (note the name difference) and wraps the result in an xpark
    :class:`Dataset`.
    """
    return Dataset(ray.data.range_tensor(*args, **kwargs))
@copy_sig(ray.data.from_pandas)
def from_pandas(*args, **kwargs) -> Dataset:
    """Create a :class:`~xpark.dataset.Dataset` from pandas DataFrames.

    Thin wrapper: forwards all arguments to :func:`ray.data.from_pandas` and
    wraps the resulting Ray dataset in an xpark :class:`Dataset`.
    """
    return Dataset(ray.data.from_pandas(*args, **kwargs))
@copy_sig(ray.data.from_numpy)
def from_numpy(*args, **kwargs) -> Dataset:
    """Create a :class:`~xpark.dataset.Dataset` from NumPy ndarrays.

    Thin wrapper: forwards all arguments to :func:`ray.data.from_numpy` and
    wraps the resulting Ray dataset in an xpark :class:`Dataset`.
    """
    return Dataset(ray.data.from_numpy(*args, **kwargs))
@copy_sig(ray.data.read_json)
def read_json(*args, **kwargs) -> Dataset:
    """Create a :class:`~xpark.dataset.Dataset` from JSON/JSONL files.

    Thin wrapper: forwards all arguments to :func:`ray.data.read_json` and
    wraps the resulting Ray dataset in an xpark :class:`Dataset`.
    """
    return Dataset(ray.data.read_json(*args, **kwargs))
@copy_sig(ray.data.read_audio)
def read_audio(*args, **kwargs) -> Dataset:
    """Create a :class:`~xpark.dataset.Dataset` from audio files.

    Thin wrapper: forwards all arguments to :func:`ray.data.read_audio` and
    wraps the resulting Ray dataset in an xpark :class:`Dataset`.
    """
    return Dataset(ray.data.read_audio(*args, **kwargs))
@copy_sig(ray.data.read_videos)
def read_video(*args, **kwargs) -> Dataset:
    """Create a :class:`~xpark.dataset.Dataset` from video files.

    Thin wrapper: forwards all arguments to :func:`ray.data.read_videos`
    (note the singular/plural name difference) and wraps the result in an
    xpark :class:`Dataset`.
    """
    return Dataset(ray.data.read_videos(*args, **kwargs))
@copy_sig(ray.data.read_images)
def read_image(*args, **kwargs) -> Dataset:
    """Create a :class:`~xpark.dataset.Dataset` from image files.

    Thin wrapper: forwards all arguments to :func:`ray.data.read_images`
    (note the singular/plural name difference) and wraps the result in an
    xpark :class:`Dataset`.
    """
    return Dataset(ray.data.read_images(*args, **kwargs))
def read_parquet(
    paths: str | list[str],
    *,
    filesystem: pyarrow.fs.FileSystem | None = None,
    columns: list[str] | None = None,
    parallelism: int = -1,
    num_cpus: float | None = None,
    num_gpus: float | None = None,
    memory: float | None = None,
    ray_remote_args: dict[str, Any] | None = None,
    tensor_column_schema: dict[str, tuple[np.dtype, tuple[int, ...]]] | None = None,
    partition_filter: PathPartitionFilter | None = None,
    # NOTE(review): this default instance is created once at import time and
    # shared across calls — assumes Partitioning is immutable; confirm.
    partitioning: Partitioning = Partitioning(PartitionStyle.HIVE),
    shuffle: Literal["files"] | FileShuffleConfig | None = None,
    include_paths: bool = False,
    file_extensions: list[str] | None = None,
    concurrency: int | None = None,
    override_num_blocks: int | None = None,
    dynamic_uid: str | None = None,
    dynamic_uid_start: int = 0,
    **arrow_parquet_args,
) -> Dataset:
    """Creates a :class:`~xpark.dataset.Dataset` from parquet files.

    Examples:
        Read a file in remote storage.

        >>> import xpark
        >>> ds = xpark.dataset.read_parquet("s3://anonymous@xpark-example-data/iris.parquet")
        >>> ds.schema()
        Column Type
        ------ ----
        sepal.length double
        sepal.width double
        petal.length double
        petal.width double
        variety string

        Read a directory in remote storage.

        >>> ds = xpark.dataset.read_parquet("s3://anonymous@xpark-example-data/iris-parquet/")

        Read multiple local files.

        >>> xpark.dataset.read_parquet(
        ...     ["local:///path/to/file1", "local:///path/to/file2"]) # doctest: +SKIP

        Specify a schema for the parquet file.

        >>> import pyarrow as pa
        >>> fields = [("sepal.length", pa.float32()),
        ...           ("sepal.width", pa.float32()),
        ...           ("petal.length", pa.float32()),
        ...           ("petal.width", pa.float32()),
        ...           ("variety", pa.string())]
        >>> ds = xpark.dataset.read_parquet("s3://anonymous@xpark-example-data/iris.parquet",
        ...     schema=pa.schema(fields))
        >>> ds.schema()
        Column Type
        ------ ----
        sepal.length float
        sepal.width float
        petal.length float
        petal.width float
        variety string

        The Parquet reader also supports projection and filter pushdown, allowing column
        selection and row filtering to be pushed down to the file scan.

        .. testcode::

            import pyarrow as pa

            # Create a Dataset by reading a Parquet file, pushing column selection and
            # row filtering down to the file scan.
            ds = xpark.dataset.read_parquet(
                "s3://anonymous@xpark-example-data/iris.parquet",
                columns=["sepal.length", "variety"],
                filter=pa.dataset.field("sepal.length") > 5.0,
            )

            ds.show(2)

        .. testoutput::

            {'sepal.length': 5.1, 'variety': 'Setosa'}
            {'sepal.length': 5.4, 'variety': 'Setosa'}

        For further arguments you can pass to PyArrow as a keyword argument, see the
        `PyArrow API reference <https://arrow.apache.org/docs/python/generated/\
pyarrow.dataset.Scanner.html#pyarrow.dataset.Scanner.from_fragment>`_.

    Args:
        paths: A single file path or directory, or a list of file paths. Multiple
            directories are not supported.
        filesystem: The PyArrow filesystem
            implementation to read from. These filesystems are specified in the
            `pyarrow docs <https://arrow.apache.org/docs/python/api/\
filesystems.html#filesystem-implementations>`_. Specify this parameter if
            you need to provide specific configurations to the filesystem. By default,
            the filesystem is automatically selected based on the scheme of the paths.
            For example, if the path begins with ``s3://``, the ``S3FileSystem`` is
            used. If ``None``, this function uses a system-chosen implementation.
        columns: A list of column names to read. Only the specified columns are
            read during the file scan.
        parallelism: This argument is deprecated. Use ``override_num_blocks`` argument.
        num_cpus: The number of CPUs to reserve for each parallel read worker.
        num_gpus: The number of GPUs to reserve for each parallel read worker. For
            example, specify `num_gpus=1` to request 1 GPU for each parallel read
            worker.
        memory: The heap memory in bytes to reserve for each parallel read worker.
        ray_remote_args: kwargs passed to :func:`ray.remote` in the read tasks.
        tensor_column_schema: A dict of column name to PyArrow dtype and shape
            mappings for converting a Parquet column containing serialized
            tensors (ndarrays) as their elements to PyArrow tensors. This function
            assumes that the tensors are serialized in the raw
            NumPy array format in C-contiguous order (e.g., via
            `arr.tobytes()`).
        partition_filter: A
            :class:`~xpark.dataset.datasource.partitioning.PathPartitionFilter`. Use
            with a custom callback to read only selected partitions of a dataset.
        partitioning: A :class:`~xpark.dataset.datasource.partitioning.Partitioning` object
            that describes how paths are organized. Defaults to HIVE partitioning.
        shuffle: If setting to "files", randomly shuffle input files order before read.
            If setting to :class:`~xpark.dataset.FileShuffleConfig`, you can pass a seed to
            shuffle the input files. Defaults to not shuffle with ``None``.
        arrow_parquet_args: Other parquet read options to pass to PyArrow. For the full
            set of arguments, see the `PyArrow API <https://arrow.apache.org/docs/\
python/generated/pyarrow.dataset.Scanner.html\
#pyarrow.dataset.Scanner.from_fragment>`_
        include_paths: If ``True``, include the path to each file. File paths are
            stored in the ``'path'`` column.
        file_extensions: A list of file extensions to filter files by.
        concurrency: The maximum number of Xpark tasks to run concurrently. Set this
            to control number of tasks to run concurrently. This doesn't change the
            total number of tasks run or the total number of output blocks. By default,
            concurrency is dynamically decided based on the available resources.
        override_num_blocks: Override the number of output blocks from all read tasks.
            By default, the number of output blocks is dynamically decided based on
            input data size and available resources. You shouldn't manually set this
            value in most cases.
        dynamic_uid: If specified, a column with this name will be added to the dataset
            containing a contiguous integer unique identifier (int64) for each row, starting
            from 0. The UIDs are assigned based on file order and row position, ensuring that
            the same data will always produce the same UIDs across multiple reads. The UIDs
            will range from dynamic_uid_start to (dynamic_uid_start + total_rows - 1).
        dynamic_uid_start: Only available if dynamic_uid is specified. This is the first
            uid generated for this dataset.

    Returns:
        :class:`~xpark.dataset.Dataset` producing records read from the specified parquet
        files.
    """
    _validate_shuffle_arg(shuffle)
    # Fold the tensor-column conversion options into the PyArrow read options.
    arrow_parquet_args = _resolve_parquet_args(
        tensor_column_schema,
        **arrow_parquet_args,
    )
    # Pop the options that are consumed by the datasource itself; whatever
    # remains in ``arrow_parquet_args`` is forwarded to PyArrow batching.
    dataset_kwargs = arrow_parquet_args.pop("dataset_kwargs", None)
    _block_udf = arrow_parquet_args.pop("_block_udf", None)
    schema = arrow_parquet_args.pop("schema", None)
    datasource = ParquetDatasource(
        paths,
        columns=columns,
        dataset_kwargs=dataset_kwargs,
        to_batch_kwargs=arrow_parquet_args,
        dynamic_uid=dynamic_uid,
        dynamic_uid_start=dynamic_uid_start,
        _block_udf=_block_udf,
        filesystem=filesystem,
        schema=schema,
        partition_filter=partition_filter,
        partitioning=partitioning,
        shuffle=shuffle,
        include_paths=include_paths,
        file_extensions=file_extensions,
    )
    return Dataset(
        read_datasource(
            datasource,
            num_cpus=num_cpus,
            num_gpus=num_gpus,
            memory=memory,
            parallelism=parallelism,
            ray_remote_args=ray_remote_args,
            concurrency=concurrency,
            override_num_blocks=override_num_blocks,
        )
    )
@copy_sig(ray.data.read_iceberg)
def read_iceberg(*args, **kwargs) -> Dataset:
    """Create a :class:`~xpark.dataset.Dataset` from an Iceberg table.

    Thin wrapper: forwards all arguments to :func:`ray.data.read_iceberg` and
    wraps the resulting Ray dataset in an xpark :class:`Dataset`.
    """
    return Dataset(ray.data.read_iceberg(*args, **kwargs))
@copy_sig(ray.data.read_lance)
def read_lance(*args, **kwargs) -> Dataset:
    """Create a :class:`~xpark.dataset.Dataset` from a Lance dataset.

    Thin wrapper: forwards all arguments to :func:`ray.data.read_lance` and
    wraps the resulting Ray dataset in an xpark :class:`Dataset`.
    """
    return Dataset(ray.data.read_lance(*args, **kwargs))
@copy_sig(ray.data.read_binary_files)
def read_files(*args, **kwargs) -> Dataset:
    """Create a :class:`~xpark.dataset.Dataset` of raw bytes from files.

    Thin wrapper: forwards all arguments to :func:`ray.data.read_binary_files`
    (note the name difference) and wraps the result in an xpark
    :class:`Dataset`.
    """
    return Dataset(ray.data.read_binary_files(*args, **kwargs))
def read_lerobot(
    path: str,
    *,
    episodes: list[int] | None = None,
    columns: list[str] | None = None,
    include_video_paths: bool = True,
    decode_video: bool = False,
    parallelism: int = -1,
    num_cpus: float | None = None,
    num_gpus: float | None = None,
    memory: float | None = None,
    ray_remote_args: dict[str, Any] | None = None,
    concurrency: int | None = None,
    override_num_blocks: int | None = None,
) -> Dataset:
    """Creates a :class:`~xpark.dataset.Dataset` from a LeRobot format dataset.

    LeRobot is a framework for robot learning that stores datasets in a specific
    format with Parquet files for tabular data and MP4 videos for visual
    observations. This function reads LeRobot datasets from either HuggingFace
    Hub or local storage.

    Dataset Structure:
        LeRobot datasets are structured as::

            dataset_root/
            ├── meta/
            │   ├── info.json         # Dataset metadata (fps, features, etc.)
            │   ├── stats.json        # Statistics for normalization
            │   ├── episodes.jsonl    # Episode metadata
            │   └── tasks.jsonl       # Task definitions
            ├── data/
            │   └── chunk-000/
            │       ├── episode_000000.parquet
            │       └── ...
            └── videos/
                └── observation.images.top/
                    └── chunk-000/
                        ├── episode_000000.mp4
                        └── ...

    Examples:
        Read a dataset from HuggingFace Hub:

        >>> from xpark.dataset import read_lerobot
        >>> ds = read_lerobot("hf://datasets/lerobot/pusht")
        >>> ds.schema()
        Column Type
        ------ ----
        observation.state list<float32>
        action list<float32>
        episode_index int64
        frame_index int64
        timestamp float32
        observation.images.top_path string

        Read specific episodes:

        >>> ds = read_lerobot("hf://datasets/lerobot/pusht", episodes=[0, 1, 2])

        Read specific columns:

        >>> ds = read_lerobot(
        ...     "hf://datasets/lerobot/pusht",
        ...     columns=["observation.state", "action", "episode_index"]
        ... )

        Read from local path:

        >>> ds = xd.read_lerobot("/path/to/local/dataset")

        Read with video decoding:

        >>> ds = xd.read_lerobot("hf://datasets/lerobot/pusht", decode_video=True)

        Read from COS:

        >>> ds = xd.read_lerobot("cos://bucket/path/to/dataset")

    Args:
        path: Dataset path. Can be:

            - Local filesystem path (e.g., "/path/to/dataset")
            - Remote path with protocol prefix:

              - HuggingFace: "hf://datasets/lerobot/pusht"
              - COS: "cos://bucket/path"
              - S3: "s3://bucket/path"
        episodes: List of episode indices to load. If None, loads all episodes.
            This is useful for loading a subset of data for debugging or
            validation splits.
        columns: List of column names to read. If None, reads all columns.
            Common columns include "observation.state", "action",
            "episode_index", "frame_index", "timestamp".
        include_video_paths: If True, adds columns with video file paths for
            each video key in the dataset (e.g., "observation.images.top_path").
            These paths can be used with video decoding processors.
        decode_video: If True, decode video frames into tensors (numpy arrays).
            Requires PyAV and PyTorch. When enabled, video columns will contain
            decoded frame data instead of path/timestamp references.
        parallelism: The requested parallelism of the read. Parallelism may be
            limited by the number of files in the dataset. Defaults to -1,
            which automatically determines the optimal parallelism.
        num_cpus: The number of CPUs to reserve for each parallel read task.
        num_gpus: The number of GPUs to reserve for each parallel read task.
        memory: The heap memory in bytes to reserve for each parallel read task.
        ray_remote_args: Additional kwargs passed to :func:`ray.remote` for
            the read tasks.
        concurrency: The maximum number of concurrent read tasks.
        override_num_blocks: Override the number of output blocks.

    Returns:
        :class:`~xpark.dataset.Dataset` containing the LeRobot dataset records.

    Raises:
        FileNotFoundError: If the dataset cannot be found locally and cannot
            be downloaded from HuggingFace Hub.
        ImportError: If huggingface_hub is not installed when trying to download
            a dataset from HuggingFace Hub.

    See Also:
        - LeRobot documentation: https://github.com/huggingface/lerobot
    """
    # Imported lazily so the LeRobot-specific dependencies are only required
    # when this reader is actually used.
    from xpark.dataset.datasource.lerobot_datasource import LeRobotDatasource

    # Always read with decode_video=False in datasource.
    # Video decoding is applied via with_column after dataset construction.
    # Decoding requires the path columns, so force them on when decode_video
    # is requested even if the caller disabled include_video_paths.
    datasource = LeRobotDatasource(
        path=path,
        episodes=episodes,
        columns=columns,
        include_video_paths=include_video_paths or decode_video,
    )
    ds = Dataset(
        read_datasource(
            datasource,
            parallelism=parallelism,
            num_cpus=num_cpus,
            num_gpus=num_gpus,
            memory=memory,
            ray_remote_args=ray_remote_args,
            concurrency=concurrency,
            override_num_blocks=override_num_blocks,
        )
    )
    # Apply video decoding via with_column if requested.
    # This is more efficient than decoding inside read_fn.
    if decode_video and datasource.metadata.video_keys:
        from xpark.dataset.expressions import col
        from xpark.dataset.processors.video_compute import VideoCompute

        for video_key in datasource.metadata.video_keys:
            path_col = f"{video_key}_path"
            from_ts_col = f"{video_key}_from_timestamp"
            # Decode video frame using VideoCompute.decode_frame_at.
            # Add from_timestamp offset to get correct position in video file.
            # This is needed for LeRobot v3.0+ where multiple episodes
            # are stored sequentially in a single video file.
            ds = ds.with_column(
                video_key,
                VideoCompute.decode_frame_at(
                    col(path_col),
                    col("timestamp") + col(from_ts_col),
                ),
            )
    return ds