xpark.dataset.Dataset#

class xpark.dataset.Dataset(ray_dataset: Dataset)[source]#

Construct a Dataset from a ray.data.Dataset. In general, you don't need to call this constructor manually; use the Read API to construct a Dataset instead.

Parameters:

ray_dataset – An instance of ray.data.Dataset.

Methods

aggregate(*aggs)

Aggregate values using one or more functions.

columns([fetch_if_missing])

Returns the columns of this Dataset.

copy([deep_copy])

Copy the Dataset.

count()

Count the number of rows in the dataset.

drop_columns(cols, *[, compute, concurrency])

Drop one or more columns from the dataset.

filter(expr[, compute])

Filter out rows that don't satisfy the given predicate.

groupby(key[, num_partitions])

Group rows of a Dataset according to a column.

input_files()

Return the list of input files for the dataset.

iter_batches(*[, prefetch_batches, ...])

Return an iterable over batches of data.

limit(limit)

Truncate the dataset to the first limit rows.

map(fn, *[, compute, fn_args, fn_kwargs, ...])

Apply the given function to each row of this dataset.

map_batches(fn, *[, batch_size, compute, ...])

Apply the given function to batches of data.

materialize()

Execute and materialize this dataset into object store memory.

max([on, ignore_nulls])

Return the maximum of one or more columns.

mean([on, ignore_nulls])

Compute the mean of one or more columns.

min([on, ignore_nulls])

Return the minimum of one or more columns.

repartition([num_blocks, ...])

Repartition the Dataset into exactly this number of blocks.

schema([fetch_if_missing])

Return the schema of the dataset.

select_columns(cols, *[, compute, concurrency])

Select one or more columns from the dataset.

show([limit])

Print up to the given number of rows from the Dataset.

size_bytes()

Return the in-memory size of the dataset.

sort(key[, descending, boundaries])

Sort the dataset by the specified key column or key function.

std([on, ddof, ignore_nulls])

Compute the standard deviation of one or more columns.

sum([on, ignore_nulls])

Compute the sum of one or more columns.

take([limit])

Return up to limit rows from the Dataset.

take_all([limit])

Return all of the rows in this Dataset.

take_batch([batch_size, batch_format])

Return up to batch_size rows from the Dataset in a batch.

to_arrow_refs()

Convert this Dataset into a distributed set of PyArrow tables.

to_pandas([limit])

Convert this Dataset to a single pandas DataFrame.

unique(column[, ignore_nulls])

List the unique elements in a given column.

with_column(column_name, expr, **ray_remote_args)

Add a new column to the dataset via an expression.

write_csv(path, *[, filesystem, ...])

Writes the Dataset to CSV files.

write_iceberg(table_identifier[, ...])

Writes the Dataset to an Iceberg table.

write_json(path, *[, filesystem, ...])

Writes the Dataset to JSON and JSONL files.

write_lance(path, *[, schema, mode, ...])

Write the dataset to a Lance dataset.

write_numpy(path, *, column[, filesystem, ...])

Writes a column of the Dataset to .npy files.

write_parquet(path, *[, partition_cols, ...])

Writes the Dataset to parquet files under the provided path.

aggregate(*aggs: AggregateFn) Any | dict[str, Any][source]#

Aggregate values using one or more functions.

Use this method to compute metrics like the product of a column.

Note

This operation will trigger execution of the lazy transformations performed on this dataset.

Note

This operation requires all inputs to be materialized in object store for it to execute.

Examples

import xpark
from xpark.dataset.aggregate import AggregateFn

ds = xpark.dataset.from_items([{"number": i} for i in range(1, 10)])
aggregation = AggregateFn(
    init=lambda column: 1,
    # Apply this to each row to produce a partial aggregate result
    accumulate_row=lambda a, row: a * row["number"],
    # Apply this to merge partial aggregate results into a final result
    merge=lambda a1, a2: a1 * a2,
    name="prod"
)
print(ds.aggregate(aggregation))
{'prod': 362880}

Time complexity: O(dataset size / parallelism)

Parameters:

*aggs – Aggregations to perform.

Returns:

A dict where each value is the aggregation result for a given column.
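The init/accumulate_row/merge contract can be illustrated in plain Python without xpark: each data block is folded into a partial result with accumulate_row, and the partials are then combined with merge. This is a conceptual sketch of how a distributed aggregation combines per-block results, not the actual xpark implementation.

```python
# Illustrative sketch (not the xpark implementation): fold each block into a
# partial aggregate, then merge the partials into a final result.

def aggregate_blocks(blocks, init, accumulate_row, merge):
    partials = []
    for block in blocks:
        acc = init("number")          # one accumulator per block
        for row in block:
            acc = accumulate_row(acc, row)
        partials.append(acc)
    result = partials[0]
    for partial in partials[1:]:      # combine per-block partial results
        result = merge(result, partial)
    return result

rows = [{"number": i} for i in range(1, 10)]
blocks = [rows[:5], rows[5:]]         # pretend the dataset is split into two blocks
prod = aggregate_blocks(
    blocks,
    init=lambda column: 1,
    accumulate_row=lambda a, row: a * row["number"],
    merge=lambda a1, a2: a1 * a2,
)
print({"prod": prod})                 # 9! = 362880, matching the example above
```

Because merge is associative here, the result is the same no matter how the rows are split into blocks, which is what makes the aggregation parallelizable.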

columns(fetch_if_missing: bool = True) list[str] | None[source]#

Returns the columns of this Dataset.

Note

If this dataset consists of more than a read, or if the schema can’t be determined from the metadata provided by the datasource, or if fetch_if_missing=True (the default), then this operation will trigger execution of the lazy transformations performed on this dataset.

Time complexity: O(1)

Example

>>> import xpark
>>> # Create dataset from synthetic data.
>>> ds = xpark.dataset.from_range(1000)
>>> ds.columns()
['id']
Parameters:

fetch_if_missing – If True, synchronously fetch the column names from the schema if it’s not known. If False, None is returned if the schema is not known. Default is True.

Returns:

A list of the column names for this Dataset or None if schema is not known and fetch_if_missing is False.

copy(deep_copy: bool = False) Dataset[source]#

Copy the Dataset.

count() int[source]#

Count the number of rows in the dataset.

For Datasets which only read Parquet files (created with read_parquet()), this method reads the file metadata to efficiently count the number of rows without reading in the entire data.

Note

If this dataset consists of more than a read, or if the row count can’t be determined from the metadata provided by the datasource, then this operation will trigger execution of the lazy transformations performed on this dataset.

Examples

>>> import xpark
>>> ds = xpark.dataset.from_range(10)
>>> ds.count()
10
Returns:

The number of records in the dataset.

drop_columns(cols: List[str], *, compute: str | None = None, concurrency: int | None = None, **ray_remote_args) Dataset[source]#

Drop one or more columns from the dataset.

Examples

>>> import xpark
>>> ds = xpark.dataset.read_parquet("s3://anonymous@xpark-example-data/iris.parquet")
>>> ds.schema()
Column        Type
------        ----
sepal.length  double
sepal.width   double
petal.length  double
petal.width   double
variety       string
>>> ds.drop_columns(["variety"]).schema()
Column        Type
------        ----
sepal.length  double
sepal.width   double
petal.length  double
petal.width   double

Time complexity: O(dataset size / parallelism)

Parameters:
  • cols – Names of the columns to drop. If any name does not exist, an exception is raised. Column names must be unique.

  • compute – This argument is deprecated. Use concurrency argument.

  • concurrency – The maximum number of Xpark workers to use concurrently.

  • ray_remote_args – Additional resource requirements to request from Xpark (e.g., num_gpus=1 to request GPUs for the map tasks). See ray.remote() for details.

filter(expr: Expr | DedupOp, compute: ComputeStrategy | None = None, **ray_remote_args) Dataset[source]#

Filter out rows that don’t satisfy the given predicate.

You can use a function, a callable class, or an expression to perform the transformation. For functions, Xpark Dataset uses stateless Xpark tasks. For classes, Xpark Dataset uses stateful Xpark actors. For more information, see Stateful Transforms.

Examples

>>> from xpark.dataset import from_range
>>> from xpark.dataset.expressions import col
>>> ds = from_range(100)
>>> # Using predicate expressions (preferred)
>>> ds.filter(expr=(col("id") > 10) & (col("id") < 20)).take_all()
[{'id': 11}, {'id': 12}, {'id': 13}, {'id': 14}, {'id': 15}, {'id': 16}, {'id': 17}, {'id': 18}, {'id': 19}]

Time complexity: O(dataset size / parallelism)

Parameters:
  • expr – An expression that represents a predicate (boolean condition) for filtering. This can be either a predicate expression from xpark.dataset.expressions or a DedupOp.

  • compute

    The compute strategy to use for the map operation.

    • If compute is not specified for a function, will use xpark.dataset.TaskPoolStrategy() to launch concurrent tasks based on the available resources and number of input blocks.

    • Use xpark.dataset.TaskPoolStrategy(size=n) to launch at most n concurrent Xpark tasks.

    • If compute is not specified for a callable class, will use xpark.dataset.ActorPoolStrategy(min_size=1, max_size=None) to launch an autoscaling actor pool from 1 to unlimited workers.

    • Use xpark.dataset.ActorPoolStrategy(size=n) to use a fixed size actor pool of n workers.

    • Use xpark.dataset.ActorPoolStrategy(min_size=m, max_size=n) to use an autoscaling actor pool from m to n workers.

    • Use xpark.dataset.ActorPoolStrategy(min_size=m, max_size=n, initial_size=initial) to use an autoscaling actor pool from m to n workers, with an initial size of initial.

  • ray_remote_args – Additional resource requirements to request from Xpark (e.g., num_gpus=1 to request GPUs for the map tasks). See ray.remote() for details.
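To make the predicate-expression style above concrete, here is a hypothetical miniature version of a col()-style expression API in plain Python. The Expr and col names here are illustrative stand-ins, not xpark's actual implementation; the point is only to show how comparisons and & compose into a predicate that is evaluated per row.

```python
# Hypothetical mini expression API: comparisons build new expressions, and
# `&` combines them, so (col("id") > 10) & (col("id") < 20) is itself an
# expression that can be evaluated against each row.

class Expr:
    def __init__(self, fn):
        self.fn = fn                                  # row -> value or bool

    def __gt__(self, other):
        return Expr(lambda row: self.fn(row) > other)

    def __lt__(self, other):
        return Expr(lambda row: self.fn(row) < other)

    def __and__(self, other):
        return Expr(lambda row: self.fn(row) and other.fn(row))

    def eval(self, row):
        return self.fn(row)

def col(name):
    return Expr(lambda row, _name=name: row[_name])   # column lookup

rows = [{"id": i} for i in range(100)]
pred = (col("id") > 10) & (col("id") < 20)
filtered = [row for row in rows if pred.eval(row)]
print(filtered)  # [{'id': 11}, ..., {'id': 19}]
```

Because the whole predicate is a data structure rather than an opaque Python function, an engine can inspect it and push the filter down to the storage layer, which is why expression-based filtering is preferred.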

groupby(key: str | List[str] | None, num_partitions: int | None = None) GroupedData[source]#

Group rows of a Dataset according to a column.

Use this method to transform data based on a categorical variable.

Note

This operation requires all inputs to be materialized in object store for it to execute.

Examples

import pandas as pd
import xpark

def normalize_variety(group: pd.DataFrame) -> pd.DataFrame:
    for feature in group.drop(columns=["variety"]).columns:
        group[feature] = group[feature] / group[feature].abs().max()
    return group

ds = (
    xpark.dataset.read_parquet("s3://anonymous@xpark-example-data/iris.parquet")
    .groupby("variety")
    .map_groups(normalize_variety, batch_format="pandas")
)

Time complexity: O(dataset size * log(dataset size / parallelism))

Parameters:
  • key – A column name or list of column names. If this is None, place all rows in a single group.

  • num_partitions – The number of partitions to partition the data into (only relevant when the hash-shuffle strategy is used). If not set, defaults to DataContext.min_parallelism.

Returns:

A lazy GroupedData.

See also

map_groups()

Call this method to transform groups of data.
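The groupby-then-map_groups pattern can be sketched with the standard library alone: rows are bucketed by a key column, and a per-group function transforms each bucket. This is an illustrative analog of the pipeline above, using a simple max-normalization in place of the pandas-based normalize_variety.

```python
# Plain-Python sketch of groupby(...).map_groups(...): bucket rows by a key
# column, apply a function to each group, and concatenate the results.
from collections import defaultdict

def groupby_map_groups(rows, key, fn):
    groups = defaultdict(list)
    for row in rows:
        groups[row[key]].append(row)      # bucket rows by the key column
    out = []
    for group_rows in groups.values():
        out.extend(fn(group_rows))        # per-group transformation
    return out

def max_normalize(group):
    # Scale "value" by the group's max, mirroring the normalization idea
    # in the normalize_variety example above.
    m = max(abs(r["value"]) for r in group)
    return [{**r, "value": r["value"] / m} for r in group]

rows = [
    {"variety": "a", "value": 2},
    {"variety": "a", "value": 4},
    {"variety": "b", "value": 10},
]
result = groupby_map_groups(rows, "variety", max_normalize)
print(result)
```

Note that the distributed version must first co-locate all rows of a group, which is why groupby requires its inputs to be materialized.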

input_files() list[str][source]#

Return the list of input files for the dataset.

Note

This operation will trigger execution of the lazy transformations performed on this dataset.

Examples

>>> import xpark
>>> ds = xpark.dataset.read_csv("s3://anonymous@xpark-example-data/iris.csv")
>>> ds.input_files()
['xpark-example-data/iris.csv']
Returns:

The list of input files used to create the dataset, or an empty list if the input files are not known.

iter_batches(*, prefetch_batches: int = 1, batch_size: int | None = 256, batch_format: str | None = 'default', drop_last: bool = False, local_shuffle_buffer_size: int | None = None, local_shuffle_seed: int | None = None, _collate_fn: Callable[[pyarrow.Table | pandas.DataFrame | Dict[str, ndarray]], CollatedData] | None = None) Iterable[pyarrow.Table | pandas.DataFrame | Dict[str, ndarray]][source]#

Return an iterable over batches of data.

This method is useful for model training.

Note

This operation will trigger execution of the lazy transformations performed on this dataset.

Examples

import xpark

ds = xpark.dataset.read_images("example://image-datasets/simple")

for batch in ds.iter_batches(batch_size=2, batch_format="numpy"):
    print(batch)
{'image': array([[[[...]]]], dtype=uint8)}
...
{'image': array([[[[...]]]], dtype=uint8)}

Time complexity: O(1)

Parameters:
  • prefetch_batches – The number of batches to fetch ahead of the current batch. If set to greater than 0, a separate threadpool is used to fetch the objects to the local node and format the batches. Defaults to 1.

  • batch_size – The number of rows in each batch, or None to use entire blocks as batches (blocks may contain different numbers of rows). The final batch may include fewer than batch_size rows if drop_last is False. Defaults to 256.

  • batch_format – If "default" or "numpy", batches are Dict[str, numpy.ndarray]. If "pandas", batches are pandas.DataFrame.

  • drop_last – Whether to drop the last batch if it’s incomplete.

  • local_shuffle_buffer_size – If not None, the data is randomly shuffled using a local in-memory shuffle buffer, and this value serves as the minimum number of rows that must be in the local in-memory shuffle buffer in order to yield a batch. When there are no more rows to add to the buffer, the remaining rows in the buffer are drained.

  • local_shuffle_seed – The seed to use for the local random shuffle.

Returns:

An iterable over batches of data.
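The batch_size, drop_last, and local shuffle buffer semantics described above can be sketched with the standard library. This is an illustrative model, not xpark's implementation: the buffer fills to a threshold, batches are drawn from it at random positions, and the remainder is drained at the end.

```python
# Stdlib sketch of iter_batches semantics: fixed-size batches, drop_last,
# and a minimal local shuffle buffer.
import random

def iter_batches(rows, batch_size, drop_last=False,
                 local_shuffle_buffer_size=None, local_shuffle_seed=None):
    if local_shuffle_buffer_size is None:
        for i in range(0, len(rows), batch_size):
            batch = rows[i:i + batch_size]
            if drop_last and len(batch) < batch_size:
                return                      # drop the incomplete tail
            yield batch
        return
    rng = random.Random(local_shuffle_seed)
    buffer = []
    for row in rows:
        buffer.append(row)
        # Yield once the buffer holds at least the minimum plus one batch.
        if len(buffer) >= local_shuffle_buffer_size + batch_size:
            yield [buffer.pop(rng.randrange(len(buffer)))
                   for _ in range(batch_size)]
    rng.shuffle(buffer)                     # drain the remaining rows
    while buffer and (not drop_last or len(buffer) >= batch_size):
        yield buffer[:batch_size]
        buffer = buffer[batch_size:]

batches = list(iter_batches(list(range(10)), batch_size=4, drop_last=True))
print(batches)  # [[0, 1, 2, 3], [4, 5, 6, 7]] -- the incomplete tail is dropped
```

A larger local_shuffle_buffer_size gives better randomization at the cost of more memory, since a batch is only yielded after the buffer exceeds that minimum size.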

limit(limit: int) Dataset[source]#

Truncate the dataset to the first limit rows.

Unlike take(), this method doesn’t move data to the caller’s machine. Instead, it returns a new Dataset pointing to the truncated distributed data.

Examples

>>> import xpark
>>> ds = xpark.dataset.from_range(1000)
>>> ds.limit(5).count()
5

Time complexity: O(limit specified)

Parameters:

limit – The size of the dataset to truncate to.

Returns:

The truncated dataset.

map(fn: Callable[[Dict[str, Any]], Dict[str, Any]], *, compute: ComputeStrategy | None = None, fn_args: Iterable[Any] | None = None, fn_kwargs: Dict[str, Any] | None = None, fn_constructor_args: Iterable[Any] | None = None, fn_constructor_kwargs: Dict[str, Any] | None = None, num_cpus: float | None = None, num_gpus: float | None = None, memory: float | None = None, concurrency: int | Tuple[int, int] | Tuple[int, int, int] | None = None, ray_remote_args_fn: Callable[[], Dict[str, Any]] | None = None, **ray_remote_args) Dataset[source]#

Apply the given function to each row of this dataset.

Use this method to transform your data. To learn more, see Transforming rows.

You can use either a function or a callable class to perform the transformation. For functions, Xpark Dataset uses stateless Xpark tasks. For classes, Xpark Dataset uses stateful Xpark actors. For more information, see Stateful Transforms.

Tip

If your transformation is vectorized like most NumPy or pandas operations, map_batches() might be faster.

Warning

Specifying both num_cpus and num_gpus for map tasks is experimental, and may result in scheduling or stability issues. Please report any issues to the Xpark team.

Examples

import os
from typing import Any, Dict
import xpark

def parse_filename(row: Dict[str, Any]) -> Dict[str, Any]:
    row["filename"] = os.path.basename(row["path"])
    return row

ds = (
    xpark.dataset.read_images("s3://anonymous@xpark-example-data/image-datasets/simple", include_paths=True)
    .map(parse_filename)
)
print(ds.schema())
Column    Type
------    ----
image     ArrowTensorTypeV2(shape=(32, 32, 3), dtype=uint8)
path      string
filename  string

Time complexity: O(dataset size / parallelism)

Parameters:
  • fn – The function to apply to each row, or a class type that can be instantiated to create such a callable.

  • compute

    The compute strategy to use for the map operation.

    • If compute is not specified for a function, will use xpark.dataset.TaskPoolStrategy() to launch concurrent tasks based on the available resources and number of input blocks.

    • Use xpark.dataset.TaskPoolStrategy(size=n) to launch at most n concurrent Xpark tasks.

    • If compute is not specified for a callable class, will use xpark.dataset.ActorPoolStrategy(min_size=1, max_size=None) to launch an autoscaling actor pool from 1 to unlimited workers.

    • Use xpark.dataset.ActorPoolStrategy(size=n) to use a fixed size actor pool of n workers.

    • Use xpark.dataset.ActorPoolStrategy(min_size=m, max_size=n) to use an autoscaling actor pool from m to n workers.

    • Use xpark.dataset.ActorPoolStrategy(min_size=m, max_size=n, initial_size=initial) to use an autoscaling actor pool from m to n workers, with an initial size of initial.

  • fn_args – Positional arguments to pass to fn after the first argument. These arguments are top-level arguments to the underlying Xpark task.

  • fn_kwargs – Keyword arguments to pass to fn. These arguments are top-level arguments to the underlying Xpark task.

  • fn_constructor_args – Positional arguments to pass to fn’s constructor. You can only provide this if fn is a callable class. These arguments are top-level arguments in the underlying Xpark actor construction task.

  • fn_constructor_kwargs – Keyword arguments to pass to fn’s constructor. This can only be provided if fn is a callable class. These arguments are top-level arguments in the underlying Xpark actor construction task.

  • num_cpus – The number of CPUs to reserve for each parallel map worker.

  • num_gpus – The number of GPUs to reserve for each parallel map worker. For example, specify num_gpus=1 to request 1 GPU for each parallel map worker.

  • memory – The heap memory in bytes to reserve for each parallel map worker.

  • concurrency – This argument is deprecated. Use compute argument.

  • ray_remote_args_fn – A function that returns a dictionary of remote args passed to each map worker. The purpose of this argument is to generate dynamic arguments for each actor/task; it's called each time before a worker is initialized. Arguments returned by this function always override the arguments in ray_remote_args. Note: this is an advanced, experimental feature.

  • ray_remote_args – Additional resource requirements to request from Xpark for each map worker. See ray.remote() for details.

See also

flat_map()

Call this method to create new rows from existing ones. Unlike map(), a function passed to flat_map() can return multiple rows.

map_batches()

Call this method to transform batches of data.

map_batches(fn: Callable[[pyarrow.Table | pandas.DataFrame | Dict[str, ndarray]], pyarrow.Table | pandas.DataFrame | Dict[str, ndarray]] | Callable[[pyarrow.Table | pandas.DataFrame | Dict[str, ndarray]], Iterator[pyarrow.Table | pandas.DataFrame | Dict[str, ndarray]]] | type[_CallableClassProtocol], *, batch_size: int | None | Literal['default'] = None, compute: ComputeStrategy | None = None, batch_format: str | None = 'default', zero_copy_batch: bool = True, fn_args: Iterable[Any] | None = None, fn_kwargs: Dict[str, Any] | None = None, fn_constructor_args: Iterable[Any] | None = None, fn_constructor_kwargs: Dict[str, Any] | None = None, num_cpus: float | None = None, num_gpus: float | None = None, memory: float | None = None, concurrency: int | Tuple[int, int] | Tuple[int, int, int] | None = None, udf_modifying_row_count: bool = False, ray_remote_args_fn: Callable[[], Dict[str, Any]] | None = None, **ray_remote_args) Dataset[source]#

Apply the given function to batches of data.

This method is useful for preprocessing data and performing inference. To learn more, see Transforming batches.

You can use either a function or a callable class to perform the transformation. For functions, Xpark Dataset uses stateless Xpark tasks. For classes, Xpark Dataset uses stateful Xpark actors. For more information, see Stateful Transforms.

Tip

To understand the format of the input to fn, call take_batch() on the dataset to get a batch in the same format as will be passed to fn.

Note

fn should generally avoid modifying data buffers behind its input since these could be zero-copy views into the underlying object residing inside Xpark’s Object Store.

To perform any modifications, it's recommended to copy the data you want to modify.

In rare cases when you can’t copy inside your UDF, you can instead specify zero_copy_batch=False and then Xpark Dataset will copy the whole batch for you, providing fn with a copy rather than a zero-copy view.

Warning

Specifying both num_cpus and num_gpus for map tasks is experimental, and may result in scheduling or stability issues. Please report any issues to the Xpark team.

Examples

Call map_batches() to transform your data.

from typing import Dict
import numpy as np
import xpark

def add_dog_years(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
    batch["age_in_dog_years"] = 7 * batch["age"]
    return batch

ds = (
    xpark.dataset.from_items([
        {"name": "Luna", "age": 4},
        {"name": "Rory", "age": 14},
        {"name": "Scout", "age": 9},
    ])
    .map_batches(add_dog_years)
)
ds.show()
{'name': 'Luna', 'age': 4, 'age_in_dog_years': 28}
{'name': 'Rory', 'age': 14, 'age_in_dog_years': 98}
{'name': 'Scout', 'age': 9, 'age_in_dog_years': 63}

If your function returns large objects, yield outputs in chunks.

from typing import Dict
import xpark
import numpy as np

def map_fn_with_large_output(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
    for i in range(3):
        yield {"large_output": np.ones((100, 1000))}

ds = (
    xpark.dataset.from_items([1])
    .map_batches(map_fn_with_large_output)
)

If you require a stateful transformation, use a Python callable class. The following example shows how to use stateful transforms to create model inference workers without reloading the model on each call.

from typing import Dict
import numpy as np
import torch
import xpark

class TorchPredictor:

    def __init__(self):
        self.model = torch.nn.Identity().cuda()
        self.model.eval()

    def __call__(self, batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
        inputs = torch.as_tensor(batch["data"], dtype=torch.float32).cuda()
        with torch.inference_mode():
            batch["output"] = self.model(inputs).detach().cpu().numpy()
        return batch

ds = (
    xpark.dataset.from_numpy(np.ones((32, 100)))
    .map_batches(
        TorchPredictor,
        # Two workers with one GPU each
        compute=xpark.dataset.ActorPoolStrategy(size=2),
        # Batch size is required if you're using GPUs.
        batch_size=4,
        num_gpus=1
    )
)

To learn more, see End-to-end: Offline Batch Inference.

Parameters:
  • fn – The function or generator to apply to a record batch, or a class type that can be instantiated to create such a callable. Note fn must be pickle-able.

  • batch_size – The desired number of rows in each batch, or None to use entire blocks as batches (blocks may contain different numbers of rows). The actual size of the batch provided to fn may be smaller than batch_size if batch_size doesn’t evenly divide the block(s) sent to a given map task. Default batch_size is None.

  • compute

    The compute strategy to use for the map operation.

    • If compute is not specified for a function, will use xpark.dataset.TaskPoolStrategy() to launch concurrent tasks based on the available resources and number of input blocks.

    • Use xpark.dataset.TaskPoolStrategy(size=n) to launch at most n concurrent Xpark tasks.

    • If compute is not specified for a callable class, will use xpark.dataset.ActorPoolStrategy(min_size=1, max_size=None) to launch an autoscaling actor pool from 1 to unlimited workers.

    • Use xpark.dataset.ActorPoolStrategy(size=n) to use a fixed size actor pool of n workers.

    • Use xpark.dataset.ActorPoolStrategy(min_size=m, max_size=n) to use an autoscaling actor pool from m to n workers.

    • Use xpark.dataset.ActorPoolStrategy(min_size=m, max_size=n, initial_size=initial) to use an autoscaling actor pool from m to n workers, with an initial size of initial.

  • batch_format – If "default" or "numpy", batches are Dict[str, numpy.ndarray]. If "pandas", batches are pandas.DataFrame. If "pyarrow", batches are pyarrow.Table. If batch_format is set to None input block format will be used.

  • zero_copy_batch – Whether fn should be provided zero-copy, read-only batches. If this is True and no copy is required for the batch_format conversion, the batch is a zero-copy, read-only view on data in Xpark’s object store, which can decrease memory utilization and improve performance. Setting this to False makes a copy of the whole batch, allowing the UDF to modify the underlying data buffers (such as tensors and binary arrays) in place. It’s recommended to copy only the data you need to modify instead of copying the whole batch.

  • fn_args – Positional arguments to pass to fn after the first argument. These arguments are top-level arguments to the underlying Xpark task.

  • fn_kwargs – Keyword arguments to pass to fn. These arguments are top-level arguments to the underlying Xpark task.

  • fn_constructor_args – Positional arguments to pass to fn’s constructor. You can only provide this if fn is a callable class. These arguments are top-level arguments in the underlying Xpark actor construction task.

  • fn_constructor_kwargs – Keyword arguments to pass to fn’s constructor. This can only be provided if fn is a callable class. These arguments are top-level arguments in the underlying Xpark actor construction task.

  • num_cpus – The number of CPUs to reserve for each parallel map worker.

  • num_gpus – The number of GPUs to reserve for each parallel map worker. For example, specify num_gpus=1 to request 1 GPU for each parallel map worker.

  • memory – The heap memory in bytes to reserve for each parallel map worker.

  • concurrency – This argument is deprecated. Use compute argument.

  • udf_modifying_row_count – Set to True if the UDF may change the number of rows it receives, so that the limit pushdown optimization is not applied.

  • ray_remote_args_fn – A function that returns a dictionary of remote args passed to each map worker. The purpose of this argument is to generate dynamic arguments for each actor/task; it's called each time before a worker is initialized. Arguments returned by this function always override the arguments in ray_remote_args. Note: this is an advanced, experimental feature.

  • ray_remote_args – Additional resource requirements to request from Xpark for each map worker. See ray.remote() for details.

Note

The size of the batches provided to fn might be smaller than the specified batch_size if batch_size doesn’t evenly divide the block(s) sent to a given map task.

If batch_size is set and each input block is smaller than batch_size, Xpark Dataset bundles up multiple blocks as the input for one task, until their total size is equal to or greater than the given batch_size. If batch_size is not set, no bundling is performed, and each task receives an entire input block as a batch.
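The bundling rule above can be sketched in a few lines of plain Python. This is an illustrative model of how small blocks are coalesced to meet batch_size, not xpark's actual scheduler logic.

```python
# Stdlib sketch of the bundling rule: blocks are coalesced until a bundle
# reaches batch_size; with batch_size=None, each block is its own batch.

def bundle_blocks(blocks, batch_size=None):
    if batch_size is None:
        return [list(block) for block in blocks]   # one batch per block
    bundles, current = [], []
    for block in blocks:
        current.extend(block)
        if len(current) >= batch_size:             # bundle is large enough
            bundles.append(current)
            current = []
    if current:
        bundles.append(current)                    # leftover rows still form a bundle
    return bundles

blocks = [[1, 2], [3], [4, 5, 6], [7]]
print(bundle_blocks(blocks, batch_size=4))   # [[1, 2, 3, 4, 5, 6], [7]]
print(bundle_blocks(blocks, batch_size=None))
```

In the real system, each bundle is then carved into batches of at most batch_size rows before being passed to fn, which is why the batch provided to fn may still be smaller than batch_size.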

See also

iter_batches()

Call this function to iterate over batches of data.

take_batch()

Call this function to get a batch of data from the dataset in the same format as will be passed to the fn function of map_batches().

flat_map()

Call this method to create new records from existing ones. Unlike map(), a function passed to flat_map() can return multiple records.

map()

Call this method to transform one record at time.

materialize() Dataset[source]#

Execute and materialize this dataset into object store memory.

Note

This operation will trigger execution of the lazy transformations performed on this dataset.

This can be used to read all blocks into memory. By default, Dataset doesn’t read blocks from the datasource until the first transform.

Note that this does not mutate the original Dataset. Only the blocks of the returned Dataset class are pinned in memory.

Examples

>>> import xpark
>>> ds = xpark.dataset.from_range(10)
>>> materialized_ds = ds.materialize()
>>> materialized_ds
Dataset(num_blocks=..., num_rows=10, schema={id: int64})
Returns:

A Dataset holding the materialized data blocks.

max(on: str | List[str] | None = None, ignore_nulls: bool = True) Any | dict[str, Any][source]#

Return the maximum of one or more columns.

Note

This operation will trigger execution of the lazy transformations performed on this dataset.

Note

This operation requires all inputs to be materialized in object store for it to execute.

Examples

>>> import xpark
>>> xpark.dataset.from_range(100).max("id")
99
>>> xpark.dataset.from_items([
...     {"A": i, "B": i**2}
...     for i in range(100)
... ]).max(["A", "B"])
{'max(A)': 99, 'max(B)': 9801}
Parameters:
  • on – a column name or a list of column names to aggregate.

  • ignore_nulls – Whether to ignore null values. If True, null values are ignored when computing the max; if False, when a null value is encountered, the output is None. This method considers np.nan, None, and pd.NaT to be null values. Default is True.

Returns:

The max result.

For different values of on, the return varies:

  • on=None: a dict containing the column-wise max of all columns,

  • on="col": a scalar representing the max of all items in column "col",

  • on=["col_1", ..., "col_n"]: an n-column dict containing the column-wise max of the provided columns.

If the dataset is empty, all values are null. If ignore_nulls is False and any value is null, then the output is None.
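The ignore_nulls contract described above is shared by max, min, mean, sum, and std. Here is a plain-Python sketch of the rule for max; "null" here is just None, whereas the real methods also treat np.nan and pd.NaT as null.

```python
# Stdlib sketch of the ignore_nulls contract: with ignore_nulls=True, nulls
# are skipped; with ignore_nulls=False, any null makes the result None.

def max_with_nulls(values, ignore_nulls=True):
    non_null = [v for v in values if v is not None]
    if not ignore_nulls and len(non_null) < len(values):
        return None                                # a null was encountered
    return max(non_null) if non_null else None     # empty input -> null result

values = [3, None, 7, 1]
print(max_with_nulls(values))                      # 7
print(max_with_nulls(values, ignore_nulls=False))  # None
```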

mean(on: str | List[str] | None = None, ignore_nulls: bool = True) Any | dict[str, Any][source]#

Compute the mean of one or more columns.

Note

This operation will trigger execution of the lazy transformations performed on this dataset.

Note

This operation requires all inputs to be materialized in object store for it to execute.

Examples

>>> import xpark
>>> xpark.dataset.from_range(100).mean("id")
49.5
>>> xpark.dataset.from_items([
...     {"A": i, "B": i**2}
...     for i in range(100)
... ]).mean(["A", "B"])
{'mean(A)': 49.5, 'mean(B)': 3283.5}
Parameters:
  • on – a column name or a list of column names to aggregate.

  • ignore_nulls – Whether to ignore null values. If True, null values are ignored when computing the mean; if False, when a null value is encountered, the output is None. This method considers np.nan, None, and pd.NaT to be null values. Default is True.

Returns:

The mean result.

For different values of on, the return varies:

  • on=None: a dict containing the column-wise mean of all columns,

  • on="col": a scalar representing the mean of all items in column "col",

  • on=["col_1", ..., "col_n"]: an n-column dict containing the column-wise mean of the provided columns.

If the dataset is empty, all values are null. If ignore_nulls is False and any value is null, then the output is None.

min(on: str | List[str] | None = None, ignore_nulls: bool = True) Any | dict[str, Any][source]#

Return the minimum of one or more columns.

Note

This operation will trigger execution of the lazy transformations performed on this dataset.

Note

This operation requires all inputs to be materialized in object store for it to execute.

Examples

>>> import xpark
>>> xpark.dataset.from_range(100).min("id")
0
>>> xpark.dataset.from_items([
...     {"A": i, "B": i**2}
...     for i in range(100)
... ]).min(["A", "B"])
{'min(A)': 0, 'min(B)': 0}
Parameters:
  • on – a column name or a list of column names to aggregate.

  • ignore_nulls – Whether to ignore null values. If True, null values are ignored when computing the min; if False, when a null value is encountered, the output is None. This method considers np.nan, None, and pd.NaT to be null values. Default is True.

Returns:

The min result.

For different values of on, the return varies:

  • on=None: a dict containing the column-wise min of all columns,

  • on="col": a scalar representing the min of all items in column "col",

  • on=["col_1", ..., "col_n"]: an n-column dict containing the column-wise min of the provided columns.

If the dataset is empty, all values are null. If ignore_nulls is False and any value is null, then the output is None.

repartition(num_blocks: int | None = None, target_num_rows_per_block: int | None = None, *, shuffle: bool = False, keys: List[str] | None = None, sort: bool = False) Dataset[source]#

Repartition the Dataset into exactly this number of blocks.

This method can be useful to tune the performance of your pipeline. To learn more, see Advanced: Performance Tips and Tuning.

If you’re writing data to files, you can also use this method to change the number of output files. To learn more, see Changing the number of output files.

Note

Repartition has three modes:

  • When num_blocks and shuffle=True are specified, Xpark Dataset performs a full distributed shuffle, producing exactly num_blocks blocks.

  • When num_blocks and shuffle=False are specified, Xpark Dataset does not perform a full shuffle; instead it splits and combines existing blocks, attempting to minimize data movement relative to a full shuffle. Exactly num_blocks blocks are produced.

  • If target_num_rows_per_block is set (mutually exclusive with num_blocks and shuffle), streaming repartitioning is executed: blocks are made to carry no more than target_num_rows_per_block rows, and smaller blocks are combined into bigger ones up to target_num_rows_per_block rows.

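The shuffle=False mode above can be sketched as a split-and-combine re-chunking of adjacent blocks (a minimal illustration, not xpark’s implementation, which also minimizes data movement across nodes):

```python
# Split and combine adjacent blocks so that exactly num_blocks blocks come
# out, without an all-to-all shuffle. Row order across blocks is preserved.
def rechunk(blocks, num_blocks):
    rows = [row for block in blocks for row in block]   # adjacent blocks only
    base, extra = divmod(len(rows), num_blocks)
    out, start = [], 0
    for i in range(num_blocks):
        size = base + (1 if i < extra else 0)
        out.append(rows[start:start + size])
        start += size
    return out

blocks = [[0, 1, 2], [3, 4], [5, 6, 7, 8, 9]]
print(rechunk(blocks, 2))  # [[0, 1, 2, 3, 4], [5, 6, 7, 8, 9]]
```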

Examples

>>> import xpark
>>> ds = xpark.dataset.from_range(100).repartition(10).materialize()
>>> ds.num_blocks()
10

Time complexity: O(dataset size / parallelism)

Parameters:
  • num_blocks – Number of blocks after repartitioning.

  • target_num_rows_per_block

    [Experimental] The target number of rows per block. Performs streaming repartitioning of the dataset (no shuffling). Either num_blocks or target_num_rows_per_block must be set, but not both. When target_num_rows_per_block is set, only Dataset blocks larger than target_num_rows_per_block are repartitioned. The system internally determines the number of rows per block for optimal execution based on the target; this reflects the current implementation and may change in the future.

  • shuffle – Whether to perform a distributed shuffle during the repartition. When shuffle is enabled, each output block contains a subset of data rows from each input block, which requires all-to-all data movement. When shuffle is disabled, output blocks are created from adjacent input blocks, minimizing data movement.

  • keys – List of key columns used to determine which partition each row belongs to after repartitioning (by applying a hash-partitioning algorithm to the whole dataset). Note that this config is only relevant when DataContext.use_hash_based_shuffle is set to True.

  • sort – Whether the blocks should be sorted after repartitioning. By default, blocks are sorted in ascending order.

Note that you must set either num_blocks or target_num_rows_per_block but not both. Additionally note that this operation materializes the entire dataset in memory when you set shuffle to True.

Returns:

The repartitioned Dataset.

schema(fetch_if_missing: bool = True) Schema | None[source]#

Return the schema of the dataset.

Examples

>>> import xpark
>>> ds = xpark.dataset.from_range(10)
>>> ds.schema()
Column  Type
------  ----
id      int64

Note

If this dataset consists of more than a read, or if the schema can’t be determined from the metadata provided by the datasource, or if fetch_if_missing=True (the default), then this operation will trigger execution of the lazy transformations performed on this dataset.

Time complexity: O(1)

Parameters:

fetch_if_missing – If True, synchronously fetch the schema if it’s not known. If False, None is returned if the schema is not known. Default is True.

Returns:

The xpark.dataset.Schema class of the records, or None if the schema is not known and fetch_if_missing is False.

select_columns(cols: str | List[str], *, compute: str | ComputeStrategy = None, concurrency: int | None = None, **ray_remote_args) Dataset[source]#

Select one or more columns from the dataset.

Specified columns must be in the dataset schema.

Tip

If you’re reading parquet files with xpark.dataset.read_parquet(), you might be able to speed it up by using projection pushdown; see Parquet column pruning for details.

Examples

>>> import xpark
>>> ds = xpark.dataset.read_parquet("s3://anonymous@xpark-example-data/iris.parquet")
>>> ds.schema()
Column        Type
------        ----
sepal.length  double
sepal.width   double
petal.length  double
petal.width   double
variety       string
>>> ds.select_columns(["sepal.length", "sepal.width"]).schema()
Column        Type
------        ----
sepal.length  double
sepal.width   double

Time complexity: O(dataset size / parallelism)

Parameters:
  • cols – Names of the columns to select. If a name isn’t in the dataset schema, an exception is raised. Column names must be unique.

  • compute – This argument is deprecated. Use the concurrency argument instead.

  • concurrency – The maximum number of Xpark workers to use concurrently.

  • ray_remote_args – Additional resource requirements to request from Xpark (e.g., num_gpus=1 to request GPUs for the map tasks). See ray.remote() for details.

show(limit: int = 20) None[source]#

Print up to the given number of rows from the Dataset.

This method is useful for inspecting data.

Note

This operation will trigger execution of the lazy transformations performed on this dataset.

Examples

>>> import xpark
>>> ds = xpark.dataset.from_range(100)
>>> ds.show(3)
{'id': 0}
{'id': 1}
{'id': 2}

Time complexity: O(limit specified)

Parameters:

limit – The maximum number of rows to print.

See also

take()

Call this method to get (not print) a given number of rows.

size_bytes() int[source]#

Return the in-memory size of the dataset.

Note

This operation will trigger execution of the lazy transformations performed on this dataset.

Examples

>>> import xpark
>>> ds = xpark.dataset.from_range(10)
>>> ds.size_bytes()
80
Returns:

The in-memory size of the dataset in bytes, or None if the in-memory size is not known.

sort(key: str | List[str], descending: bool | List[bool] = False, boundaries: List[int | float] = None) Dataset[source]#

Sort the dataset by the specified key column or key function. The key parameter must be specified (i.e., it cannot be None).

Note

If provided, the boundaries parameter can only be used to partition the first sort key.

Note

This operation requires all inputs to be materialized in object store for it to execute.

Examples

>>> import ray
>>> import xpark
>>> ds = xpark.dataset.from_range(15)
>>> ds = ds.sort("id", descending=False, boundaries=[5, 10])
>>> for df in ray.get(ds.to_pandas_refs()):
...     print(df)
   id
0   0
1   1
2   2
3   3
4   4
   id
0   5
1   6
2   7
3   8
4   9
   id
0  10
1  11
2  12
3  13
4  14

Time complexity: O(dataset size * log(dataset size / parallelism))

Parameters:
  • key – The column or a list of columns to sort by.

  • descending – Whether to sort in descending order. Must be a boolean or a list of booleans matching the number of the columns.

  • boundaries – The list of values based on which to repartition the dataset. For example, if the input boundary is [10,20], rows with values less than 10 will be divided into the first block, rows with values greater than or equal to 10 and less than 20 will be divided into the second block, and rows with values greater than or equal to 20 will be divided into the third block. If not provided, the boundaries will be sampled from the input blocks. This feature only supports numeric columns right now.
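The boundary rule described above maps each value to a block index; a minimal sketch of that rule in plain Python (illustrative only, not xpark’s implementation):

```python
# With boundaries [10, 20], a value v lands in block bisect_right([10, 20], v):
# v < 10 -> block 0, 10 <= v < 20 -> block 1, v >= 20 -> block 2.
from bisect import bisect_right

def partition_by_boundaries(values, boundaries):
    blocks = [[] for _ in range(len(boundaries) + 1)]
    for v in sorted(values):
        blocks[bisect_right(boundaries, v)].append(v)
    return blocks

print(partition_by_boundaries([5, 25, 10, 19, 20, 9], [10, 20]))
# [[5, 9], [10, 19], [20, 25]]
```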

Returns:

A new, sorted Dataset.

Raises:

ValueError – if the sort key is None.

std(on: str | List[str] | None = None, ddof: int = 1, ignore_nulls: bool = True) Any | dict[str, Any][source]#

Compute the standard deviation of one or more columns.

Note

This method uses Welford’s online method for an accumulator-style computation of the standard deviation. This method has numerical stability and is computable in a single pass. It may give different (but more accurate) results than NumPy, Pandas, and sklearn, which use a less numerically stable two-pass algorithm. To learn more, see the Wikipedia article.
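Welford’s single-pass accumulation mentioned above can be sketched as follows (a minimal illustration, not xpark’s implementation):

```python
# Single-pass Welford accumulator for the standard deviation: maintain a
# running count, mean, and sum of squared deviations (m2).
import math

def welford_std(values, ddof=1):
    count, mean, m2 = 0, 0.0, 0.0
    for x in values:
        count += 1
        delta = x - mean
        mean += delta / count
        m2 += delta * (x - mean)     # uses the updated mean
    return math.sqrt(m2 / (count - ddof))

print(round(welford_std(range(100), ddof=0), 5))  # 28.86607
```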

Note

This operation will trigger execution of the lazy transformations performed on this dataset.

Note

This operation requires all inputs to be materialized in object store for it to execute.

Examples

>>> import xpark
>>> round(xpark.dataset.from_range(100).std("id", ddof=0), 5)
28.86607
>>> result = xpark.dataset.from_items([
...     {"A": i, "B": i**2}
...     for i in range(100)
... ]).std(["A", "B"])
>>> [(key, round(value, 10)) for key, value in result.items()]
[('std(A)', 29.0114919759), ('std(B)', 2968.1748039269)]
Parameters:
  • on – a column name or a list of column names to aggregate.

  • ddof – Delta Degrees of Freedom. The divisor used in calculations is N - ddof, where N represents the number of elements.

  • ignore_nulls – Whether to ignore null values. If True, null values are ignored when computing the std; if False, when a null value is encountered, the output is None. This method considers np.nan, None, and pd.NaT to be null values. Default is True.

Returns:

The standard deviation result.

For different values of on, the return varies:

  • on=None: a dict containing the column-wise std of all columns,

  • on="col": a scalar representing the std of all items in column "col",

  • on=["col_1", ..., "col_n"]: an n-column dict containing the column-wise std of the provided columns.

If the dataset is empty, all values are null. If ignore_nulls is False and any value is null, then the output is None.

sum(on: str | List[str] | None = None, ignore_nulls: bool = True) Any | dict[str, Any][source]#

Compute the sum of one or more columns.

Note

This operation will trigger execution of the lazy transformations performed on this dataset.

Note

This operation requires all inputs to be materialized in object store for it to execute.

Examples

>>> import xpark
>>> xpark.dataset.from_range(100).sum("id")
4950
>>> xpark.dataset.from_items([
...     {"A": i, "B": i**2}
...     for i in range(100)
... ]).sum(["A", "B"])
{'sum(A)': 4950, 'sum(B)': 328350}
Parameters:
  • on – a column name or a list of column names to aggregate.

  • ignore_nulls – Whether to ignore null values. If True, null values are ignored when computing the sum. If False, when a null value is encountered, the output is None. Xpark Dataset considers np.nan, None, and pd.NaT to be null values. Default is True.

Returns:

The sum result.

For different values of on, the return varies:

  • on=None: a dict containing the column-wise sum of all columns,

  • on="col": a scalar representing the sum of all items in column "col",

  • on=["col_1", ..., "col_n"]: an n-column dict containing the column-wise sum of the provided columns.

If the dataset is empty, all values are null. If ignore_nulls is False and any value is null, then the output is None.

take(limit: int = 20) list[dict[str, Any]][source]#

Return up to limit rows from the Dataset.

This method is useful for inspecting data.

Warning

take() moves up to limit rows to the caller’s machine. If limit is large, this method can cause an OutOfMemory error on the caller.

Note

This operation will trigger execution of the lazy transformations performed on this dataset.

Examples

>>> import xpark
>>> ds = xpark.dataset.from_range(100)
>>> ds.take(3)
[{'id': 0}, {'id': 1}, {'id': 2}]

Time complexity: O(limit specified)

Parameters:

limit – The maximum number of rows to return.

Returns:

A list of up to limit rows from the dataset.

See also

take_all()

Call this method to return all rows.

take_all(limit: int | None = None) list[dict[str, Any]][source]#

Return all of the rows in this Dataset.

This method is useful for inspecting small datasets.

Warning

take_all() moves the entire dataset to the caller’s machine. If the dataset is large, this method can cause an OutOfMemory error on the caller.

Note

This operation will trigger execution of the lazy transformations performed on this dataset.

Examples

>>> import xpark
>>> ds = xpark.dataset.from_range(5)
>>> ds.take_all()
[{'id': 0}, {'id': 1}, {'id': 2}, {'id': 3}, {'id': 4}]

Time complexity: O(dataset size)

Parameters:

limit – Raise an error if the size exceeds the specified limit.

Returns:

A list of all the rows in the dataset.

See also

take()

Call this method to return a specific number of rows.

take_batch(batch_size: int = 20, *, batch_format: str | None = 'default') pyarrow.Table | pandas.DataFrame | Dict[str, ndarray][source]#

Return up to batch_size rows from the Dataset in a batch.

Xpark Dataset represents batches as NumPy arrays or pandas DataFrames. You can configure the batch type by specifying batch_format.

This method is useful for inspecting inputs to map_batches().

Warning

take_batch() moves up to batch_size rows to the caller’s machine. If batch_size is large, this method can cause an OutOfMemory error on the caller.

Note

This operation will trigger execution of the lazy transformations performed on this dataset.

Examples

>>> import xpark
>>> ds = xpark.dataset.from_range(100)
>>> ds.take_batch(5)
{'id': array([0, 1, 2, 3, 4])}

Time complexity: O(batch_size specified)

Parameters:
  • batch_size – The maximum number of rows to return.

  • batch_format – If "default" or "numpy", batches are Dict[str, numpy.ndarray]. If "pandas", batches are pandas.DataFrame.

Returns:

A batch of up to batch_size rows from the dataset.

Raises:

ValueError – if the dataset is empty.

to_arrow_refs() List[ObjectRef[pyarrow.Table]][source]#

Convert this Dataset into a distributed set of PyArrow tables.

One PyArrow table is created for each block in this Dataset.

This method is only supported for datasets convertible to PyArrow tables. This function is zero-copy if the existing data is already in PyArrow format. Otherwise, the data is converted to PyArrow format.

Examples

>>> import xpark
>>> ds = xpark.dataset.from_range(10, override_num_blocks=2)
>>> refs = ds.to_arrow_refs()
>>> len(refs)
2

Note

This operation will trigger execution of the lazy transformations performed on this dataset.

Time complexity: O(1) unless conversion is required.

Returns:

A list of remote PyArrow tables created from this dataset.

DeveloperAPI: This API may change across minor Xpark releases.

to_pandas(limit: int = None) pandas.DataFrame[source]#

Convert this Dataset to a single pandas DataFrame.

This method errors if the number of rows exceeds the provided limit. To truncate the dataset beforehand, call limit().

Examples

>>> import xpark
>>> ds = xpark.dataset.from_items([{"a": i} for i in range(3)])
>>> ds.to_pandas()
   a
0  0
1  1
2  2

Note

This operation will trigger execution of the lazy transformations performed on this dataset.

Time complexity: O(dataset size)

Parameters:

limit – The maximum number of rows to return. An error is raised if the dataset has more rows than this limit. Defaults to None, which means no limit.

Returns:

A pandas DataFrame created from this dataset, containing a limited number of rows.

Raises:

ValueError – if the number of rows in the Dataset exceeds limit.

unique(column: str, ignore_nulls: bool = False) list[Any][source]#

List the unique elements in a given column.

Note

This operation will trigger execution of the lazy transformations performed on this dataset.

Note

This operation requires all inputs to be materialized in object store for it to execute.

Examples

>>> import xpark
>>> ds = xpark.dataset.from_items([1, 2, 3, 2, 3])
>>> sorted(ds.unique("item"))
[1, 2, 3]

This function is very useful for computing labels in a machine learning dataset:

>>> import xpark
>>> ds = xpark.dataset.read_csv("s3://anonymous@xpark-example-data/iris.csv")
>>> sorted(ds.unique("target"))
[0, 1, 2]

One common use case is to convert the class labels into integers for training and inference:

>>> classes = {0: 'Setosa', 1: 'Versicolor', 2: 'Virginica'}
>>> def preprocessor(df, classes):
...     df["variety"] = df["target"].map(classes)
...     return df
>>> train_ds = ds.map_batches(
...     preprocessor, fn_kwargs={"classes": classes}, batch_format="pandas")
>>> train_ds.sort("sepal length (cm)").take(1)  # Sort to make it deterministic
[{'sepal length (cm)': 4.3, ..., 'variety': 'Setosa'}]

Time complexity: O(dataset size / parallelism)

Parameters:
  • column – The column to collect unique elements over.

  • ignore_nulls – If True, ignore null values in the column.

Returns:

A list with unique elements in the given column.

with_column(column_name: str, expr: Expr, **ray_remote_args) Dataset[source]#

Add a new column to the dataset via an expression.

This method allows you to add a new column to a dataset by applying an expression. The expression can be composed of existing columns, literals, and user-defined functions (UDFs).

Examples

>>> from xpark.dataset import from_range
>>> from xpark.dataset.expressions import col
>>> ds = from_range(100)
>>> # Add a new column 'id_2' by multiplying 'id' by 2.
>>> ds.with_column("id_2", col("id") * 2).show(2)
{'id': 0, 'id_2': 0}
{'id': 1, 'id_2': 2}
>>> # Using a UDF with with_column
>>> from xpark.dataset.datatype import DataType
>>> from xpark.dataset.expressions import udf
>>> import pyarrow.compute as pc
>>>
>>> @udf(return_dtype=DataType.int32())
... def add_one(column):
...     return pc.add(column, 1)
>>>
>>> ds.with_column("id_plus_one", add_one(col("id"))).show(2)
{'id': 0, 'id_plus_one': 1}
{'id': 1, 'id_plus_one': 2}
>>> # Using an actor UDF with with_column
>>> import pyarrow
>>> @udf(return_dtype=DataType.int32(), num_workers={"CPU": 1})
... class Add:
...     def __init__(self, value):
...         self.value = value
...
...     def __call__(self, input: pyarrow.Array) -> pyarrow.Array:
...         return pc.add(input, self.value)
>>>
>>> ds.with_column("id_plus_two", Add(2)(col("id"))).show(2)
{'id': 0, 'id_plus_two': 2}
{'id': 1, 'id_plus_two': 3}
Parameters:
  • column_name – The name of the new column.

  • expr – An expression that defines the new column values.

  • **ray_remote_args – Additional resource requirements to request from Ray for the map tasks (e.g., num_gpus=1).

Returns:

A new dataset with the added column evaluated via the expression.

write_csv(path: str, *, filesystem: pyarrow.fs.FileSystem | None = None, try_create_dir: bool = True, arrow_open_stream_args: Dict[str, Any] | None = None, filename_provider: FilenameProvider | None = None, arrow_csv_args_fn: Callable[[], Dict[str, Any]] | None = None, min_rows_per_file: int | None = None, ray_remote_args: Dict[str, Any] = None, concurrency: int | None = None, num_rows_per_file: int | None = None, mode: SaveMode = SaveMode.APPEND, **arrow_csv_args) None[source]#

Writes the Dataset to CSV files.

The number of files is determined by the number of blocks in the dataset. To control the number of blocks, call repartition().

This method is only supported for datasets with records that are convertible to pyarrow tables.

By default, the format of the output files is {uuid}_{block_idx}.csv, where uuid is a unique id for the dataset. To modify this behavior, implement a custom FilenameProvider and pass it in as the filename_provider argument.
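A sketch of a custom FilenameProvider (the method name get_filename_for_block and its signature are assumptions modeled on the default {uuid}_{block_idx}.csv pattern; check xpark.dataset.FilenameProvider for the actual interface):

```python
# Hypothetical FilenameProvider-style class: produces deterministic,
# human-readable filenames instead of the default uuid-based ones.
class PrefixFilenameProvider:
    def __init__(self, prefix: str):
        self.prefix = prefix

    def get_filename_for_block(self, block, task_index: int, block_index: int) -> str:
        # Zero-padded indices keep files lexicographically sorted.
        return f"{self.prefix}_{task_index:04d}_{block_index:04d}.csv"

provider = PrefixFilenameProvider("sales")
print(provider.get_filename_for_block(None, 0, 3))  # sales_0000_0003.csv
```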

Note

This operation will trigger execution of the lazy transformations performed on this dataset.

Examples

Write the dataset as CSV files to a local directory.

>>> import xpark
>>> ds = xpark.dataset.from_range(100)
>>> ds.write_csv("local:///tmp/data")

Write the dataset as CSV files to S3.

>>> import xpark
>>> ds = xpark.dataset.from_range(100)
>>> ds.write_csv("s3://bucket/folder/")

Time complexity: O(dataset size / parallelism)

Parameters:
  • path – The path to the destination root directory, where the CSV files are written to.

  • filesystem – The pyarrow filesystem implementation to write to. These filesystems are specified in the pyarrow docs. Specify this if you need to provide specific configurations to the filesystem. By default, the filesystem is automatically selected based on the scheme of the paths. For example, if the path begins with s3://, the S3FileSystem is used.

  • try_create_dir – If True, attempts to create all directories in the destination path. Does nothing if all directories already exist. Defaults to True.

  • arrow_open_stream_args – kwargs passed to pyarrow.fs.FileSystem.open_output_stream, which is used when opening the file to write to.

  • filename_provider – A FilenameProvider implementation. Use this parameter to customize what your filenames look like.

  • arrow_csv_args_fn – Callable that returns a dictionary of write arguments that are provided to pyarrow.csv.write_csv when writing each block to a file. Overrides any duplicate keys from arrow_csv_args. Use this argument instead of arrow_csv_args if any of your write arguments cannot be pickled, or if you’d like to lazily resolve the write arguments for each dataset block.

  • min_rows_per_file – [Experimental] The target minimum number of rows to write to each file. If None, Xpark Dataset writes a system-chosen number of rows to each file. If the number of rows per block is larger than the specified value, Xpark Dataset writes the number of rows per block to each file. The specified value is a hint, not a strict limit. Xpark Dataset might write more or fewer rows to each file.

  • ray_remote_args – kwargs passed to ray.remote() in the write tasks.

  • concurrency – The maximum number of Xpark tasks to run concurrently. Set this to control the number of tasks that run concurrently. This doesn’t change the total number of tasks run. By default, concurrency is dynamically decided based on the available resources.

  • num_rows_per_file – [Deprecated] Use min_rows_per_file instead.

  • arrow_csv_args

    Options to pass to pyarrow.csv.write_csv when writing each block to a file.

  • mode – Determines how to handle existing files. Valid modes are “overwrite”, “error”, “ignore”, “append”. Defaults to “append”. NOTE: This method isn’t atomic. “Overwrite” first deletes all the data before writing to path.

write_iceberg(table_identifier: str, catalog_kwargs: Dict[str, Any] | None = None, snapshot_properties: Dict[str, str] | None = None, mode: SaveMode = SaveMode.APPEND, overwrite_filter: Expr | None = None, upsert_kwargs: Dict[str, Any] | None = None, overwrite_kwargs: Dict[str, Any] | None = None, ray_remote_args: Dict[str, Any] = None, concurrency: int | None = None) None[source]#

Writes the Dataset to an Iceberg table.

Tip

For more details on PyIceberg, see https://py.iceberg.apache.org/.

Note

This operation will trigger execution of the lazy transformations performed on this dataset.

Examples

import xpark
import pandas as pd
from xpark.dataset import SaveMode
from xpark.dataset.expressions import col

# Basic append (default behavior)
docs = [{"id": i, "title": f"Doc {i}"} for i in range(4)]
ds = xpark.dataset.from_pandas(pd.DataFrame(docs))
ds.write_iceberg(
    table_identifier="db_name.table_name",
    catalog_kwargs={"name": "default", "type": "sql"}
)

# Schema evolution is automatic - new columns are added automatically
enriched_docs = [{"id": i, "title": f"Doc {i}", "category": "new"} for i in range(3)]
ds_enriched = xpark.dataset.from_pandas(pd.DataFrame(enriched_docs))
ds_enriched.write_iceberg(
    table_identifier="db_name.table_name",
    catalog_kwargs={"name": "default", "type": "sql"}
)

# Upsert mode - update existing rows or insert new ones
updated_docs = [{"id": 2, "title": "Updated Doc 2"}, {"id": 5, "title": "New Doc 5"}]
ds_updates = xpark.dataset.from_pandas(pd.DataFrame(updated_docs))
ds_updates.write_iceberg(
    table_identifier="db_name.table_name",
    catalog_kwargs={"name": "default", "type": "sql"},
    mode=SaveMode.UPSERT,
    upsert_kwargs={"join_cols": ["id"]},
)

# Partial overwrite with Xpark Dataset expressions
ds.write_iceberg(
    table_identifier="events.user_activity",
    catalog_kwargs={"name": "default", "type": "rest"},
    mode=SaveMode.OVERWRITE,
    overwrite_filter=col("date") >= "2024-10-28"
)
Parameters:
  • table_identifier – Fully qualified table identifier (db_name.table_name)

  • catalog_kwargs – Optional arguments to pass to PyIceberg’s catalog.load_catalog() function (such as name, type, etc.). For the function definition, see pyiceberg catalog.

  • snapshot_properties – Custom properties to write to the snapshot when committing to an Iceberg table.

  • mode

    Write mode using SaveMode enum. Options:

    • SaveMode.APPEND (default): Add new data to the table without checking for duplicates.

    • SaveMode.UPSERT: Update existing rows that match on the join condition (join_cols in upsert_kwargs), or insert new rows if they don’t exist in the table.

    • SaveMode.OVERWRITE: Replace all existing data in the table with new data, or replace data matching overwrite_filter if specified.

  • overwrite_filter – Optional filter for OVERWRITE mode to perform partial overwrites. Must be an Xpark Dataset expression from xpark.dataset.expressions. Only rows matching this filter are replaced. If None with OVERWRITE mode, replaces all table data. Example: col("date") >= "2024-01-01" or (col("region") == "US") & (col("status") == "active")

  • upsert_kwargs – Optional arguments for upsert operations. Supported parameters: join_cols (List[str]), case_sensitive (bool), branch (str). Note: Xpark Dataset uses a copy-on-write strategy that always updates all columns for matched keys and inserts all new keys for optimal parallelism.

  • overwrite_kwargs – Optional arguments to pass through to PyIceberg’s table.overwrite() method. Supported parameters: case_sensitive (bool), branch (str). See PyIceberg documentation for details.

  • ray_remote_args – kwargs passed to ray.remote() in the write tasks.

  • concurrency – The maximum number of Xpark tasks to run concurrently. Set this to control the number of tasks that run concurrently. This doesn’t change the total number of tasks run. By default, concurrency is dynamically decided based on the available resources.
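The UPSERT semantics described above (match on join_cols, replace matched rows, insert new keys) can be sketched in plain Python (illustrative only, not the copy-on-write implementation):

```python
# Rows whose join-column key already exists in the table are replaced;
# rows with new keys are inserted.
def upsert(table_rows, new_rows, join_cols):
    key = lambda row: tuple(row[c] for c in join_cols)
    merged = {key(r): r for r in table_rows}
    for r in new_rows:
        merged[key(r)] = r           # replace on match, insert otherwise
    return list(merged.values())

table = [{"id": 1, "title": "Doc 1"}, {"id": 2, "title": "Doc 2"}]
updates = [{"id": 2, "title": "Updated Doc 2"}, {"id": 5, "title": "New Doc 5"}]
print(upsert(table, updates, ["id"]))
# [{'id': 1, 'title': 'Doc 1'}, {'id': 2, 'title': 'Updated Doc 2'}, {'id': 5, 'title': 'New Doc 5'}]
```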

Note

Schema evolution is automatically enabled. New columns in the incoming data are automatically added to the table schema. The schema is extracted automatically from the data being written.

PublicAPI (alpha): This API is in alpha and may change before becoming stable.

write_json(path: str, *, filesystem: pyarrow.fs.FileSystem | None = None, try_create_dir: bool = True, arrow_open_stream_args: Dict[str, Any] | None = None, filename_provider: FilenameProvider | None = None, pandas_json_args_fn: Callable[[], Dict[str, Any]] | None = None, min_rows_per_file: int | None = None, ray_remote_args: Dict[str, Any] = None, concurrency: int | None = None, num_rows_per_file: int | None = None, mode: SaveMode = SaveMode.APPEND, **pandas_json_args) None[source]#

Writes the Dataset to JSON and JSONL files.

The number of files is determined by the number of blocks in the dataset. To control the number of blocks, call repartition().

This method is only supported for datasets with records that are convertible to pandas dataframes.

By default, the format of the output files is {uuid}_{block_idx}.json, where uuid is a unique id for the dataset. To modify this behavior, implement a custom FilenameProvider and pass it in as the filename_provider argument.

Note

This operation will trigger execution of the lazy transformations performed on this dataset.

Examples

Write the dataset as JSON file to a local directory.

>>> import xpark
>>> import pandas as pd
>>> ds = xpark.dataset.from_pandas([pd.DataFrame({"one": [1], "two": ["a"]})])
>>> ds.write_json("local:///tmp/data")

Write the dataset as JSONL files to a local directory.

>>> ds = xpark.dataset.read_json("s3://anonymous@xpark-example-data/train.jsonl")
>>> ds.write_json("local:///tmp/data")

Time complexity: O(dataset size / parallelism)

Parameters:
  • path – The path to the destination root directory, where the JSON files are written to.

  • filesystem

    The pyarrow filesystem implementation to write to. These filesystems are specified in the pyarrow docs. Specify this if you need to provide specific configurations to the filesystem. By default, the filesystem is automatically selected based on the scheme of the paths. For example, if the path begins with s3://, the S3FileSystem is used.

  • try_create_dir – If True, attempts to create all directories in the destination path. Does nothing if all directories already exist. Defaults to True.

  • arrow_open_stream_args

    kwargs passed to pyarrow.fs.FileSystem.open_output_stream, which is used when opening the file to write to.

  • filename_provider – A FilenameProvider implementation. Use this parameter to customize what your filenames look like.

  • pandas_json_args_fn – Callable that returns a dictionary of write arguments that are provided to pandas.DataFrame.to_json() when writing each block to a file. Overrides any duplicate keys from pandas_json_args. Use this parameter instead of pandas_json_args if any of your write arguments can’t be pickled, or if you’d like to lazily resolve the write arguments for each dataset block.

  • min_rows_per_file – [Experimental] The target minimum number of rows to write to each file. If None, Xpark Dataset writes a system-chosen number of rows to each file. If the number of rows per block is larger than the specified value, Xpark Dataset writes the number of rows per block to each file. The specified value is a hint, not a strict limit. Xpark Dataset might write more or fewer rows to each file.

  • ray_remote_args – kwargs passed to ray.remote() in the write tasks.

  • concurrency – The maximum number of Xpark tasks to run concurrently. Set this to control the number of tasks that run concurrently. This doesn’t change the total number of tasks run. By default, concurrency is dynamically decided based on the available resources.

  • num_rows_per_file – Deprecated. Use min_rows_per_file instead.

  • pandas_json_args

    These args are passed to pandas.DataFrame.to_json(), which is used under the hood to write out each Dataset block. These are dict(orient=”records”, lines=True) by default.

  • mode – Determines how to handle existing files. Valid modes are “overwrite”, “error”, “ignore”, “append”. Defaults to “append”. NOTE: This method isn’t atomic. “Overwrite” first deletes all the data before writing to path.
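For reference, the default pandas_json_args (orient="records", lines=True) produce one JSON object per line (JSONL); the stdlib json module can illustrate the format:

```python
# JSONL: each row becomes one JSON object on its own line. write_json
# produces this via pandas.DataFrame.to_json under the hood; the stdlib
# json module is used here only to show the resulting format.
import json

rows = [{"one": 1, "two": "a"}, {"one": 2, "two": "b"}]
jsonl = "\n".join(json.dumps(row) for row in rows)
print(jsonl)
# {"one": 1, "two": "a"}
# {"one": 2, "two": "b"}
```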

write_lance(path: str, *, schema: pyarrow.Schema | None = None, mode: Literal['create', 'append', 'overwrite'] = 'create', min_rows_per_file: int = 1048576, max_rows_per_file: int = 67108864, data_storage_version: str | None = None, storage_options: Dict[str, Any] | None = None, ray_remote_args: Dict[str, Any] = None, concurrency: int | None = None) None[source]#

Write the dataset to a Lance dataset.

Note

This operation will trigger execution of the lazy transformations performed on this dataset.

Examples

import xpark
import pandas as pd

docs = [{"title": "Lance data sink test"} for _ in range(4)]
ds = xpark.dataset.from_pandas(pd.DataFrame(docs))
ds.write_lance("/tmp/data/")
Parameters:
  • path – The path to the destination Lance dataset.

  • schema – The schema of the dataset. If not provided, it is inferred from the data.

  • mode – The write mode. Can be “create”, “append”, or “overwrite”.

  • min_rows_per_file – The minimum number of rows per file.

  • max_rows_per_file – The maximum number of rows per file.

  • data_storage_version – The version of the data storage format to use. Newer versions are more efficient but require newer versions of lance to read. The default is "legacy", which uses the legacy v1 version. See the user guide for more details.

  • storage_options – The storage options for the writer. Default is None.

write_numpy(path: str, *, column: str, filesystem: pyarrow.fs.FileSystem | None = None, try_create_dir: bool = True, arrow_open_stream_args: Dict[str, Any] | None = None, filename_provider: FilenameProvider | None = None, min_rows_per_file: int | None = None, ray_remote_args: Dict[str, Any] = None, concurrency: int | None = None, num_rows_per_file: int | None = None, mode: SaveMode = SaveMode.APPEND) None[source]#

Writes a column of the Dataset to .npy files.

This is only supported for columns in the dataset that can be converted to NumPy arrays.

The number of files is determined by the number of blocks in the dataset. To control the number of blocks, call repartition().

By default, the format of the output files is {uuid}_{block_idx}.npy, where uuid is a unique id for the dataset. To modify this behavior, implement a custom FilenameProvider and pass it in as the filename_provider argument.

Note

This operation will trigger execution of the lazy transformations performed on this dataset.

Examples

>>> import xpark
>>> ds = xpark.dataset.from_range(100)
>>> ds.write_numpy("local:///tmp/data/", column="id")

Time complexity: O(dataset size / parallelism)

Parameters:
  • path – The path to the destination root directory, where the npy files are written to.

  • column – The name of the column that contains the data to be written.

  • filesystem

    The pyarrow filesystem implementation to write to. These filesystems are specified in the pyarrow docs. Specify this if you need to provide specific configurations to the filesystem. By default, the filesystem is automatically selected based on the scheme of the paths. For example, if the path begins with s3://, the S3FileSystem is used.

  • try_create_dir – If True, attempts to create all directories in destination path. Does nothing if all directories already exist. Defaults to True.

  • arrow_open_stream_args

    kwargs passed to pyarrow.fs.FileSystem.open_output_stream, which is used when opening the file to write to.

  • filename_provider – A FilenameProvider implementation. Use this parameter to customize what your filenames look like.

  • min_rows_per_file – [Experimental] The target minimum number of rows to write to each file. If None, Xpark Dataset writes a system-chosen number of rows to each file. If the number of rows per block is larger than the specified value, Xpark Dataset writes the number of rows per block to each file. The specified value is a hint, not a strict limit. Xpark Dataset might write more or fewer rows to each file.

  • ray_remote_args – kwargs passed to ray.remote() in the write tasks.

  • concurrency – The maximum number of Xpark tasks to run concurrently. Set this to control the number of tasks that run concurrently; it doesn’t change the total number of tasks run. By default, concurrency is decided dynamically based on the available resources.

  • num_rows_per_file – [Deprecated] Use min_rows_per_file instead.

  • mode – Determines how to handle existing files. Valid modes are “overwrite”, “error”, “ignore”, “append”. Defaults to “append”. NOTE: This method isn’t atomic. “Overwrite” first deletes all the data before writing to path.
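The mode semantics described above can be sketched as a small dispatch. This is a hypothetical helper, not xpark code:

```python
def resolve_write_action(mode: str, destination_exists: bool) -> str:
    """Sketch of the SaveMode behavior described above: returns the
    action a write would take for a given mode and destination state."""
    if not destination_exists or mode == "append":
        return "write"
    if mode == "overwrite":
        # NOTE: not atomic -- existing data is deleted before writing.
        return "delete_then_write"
    if mode == "ignore":
        return "skip"
    if mode == "error":
        raise FileExistsError("destination already exists")
    raise ValueError(f"unknown mode: {mode}")
```

Under this sketch, only "overwrite" touches existing data, and "error" is the sole mode that fails when the destination is non-empty.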

write_parquet(path: str, *, partition_cols: List[str] | None = None, filesystem: pyarrow.fs.FileSystem | None = None, try_create_dir: bool = True, arrow_open_stream_args: Dict[str, Any] | None = None, filename_provider: FilenameProvider | None = None, arrow_parquet_args_fn: Callable[[], Dict[str, Any]] | None = None, min_rows_per_file: int | None = None, max_rows_per_file: int | None = None, ray_remote_args: Dict[str, Any] = None, concurrency: int | None = None, num_rows_per_file: int | None = None, mode: SaveMode = SaveMode.APPEND, **arrow_parquet_args) None[source]#

Writes the Dataset to parquet files under the provided path.

The number of files is determined by the number of blocks in the dataset. To control the number of blocks, call repartition().

If pyarrow can’t represent your data, this method errors.

By default, the format of the output files is {uuid}_{block_idx}.parquet, where uuid is a unique id for the dataset. To modify this behavior, implement a custom FilenameProvider and pass it in as the filename_provider argument.

Note

This operation will trigger execution of the lazy transformations performed on this dataset.

Examples

>>> import xpark
>>> ds = xpark.dataset.from_range(100)
>>> ds.write_parquet("local:///tmp/data/")

Time complexity: O(dataset size / parallelism)

Parameters:
  • path – The path to the destination root directory, where parquet files are written to.

  • partition_cols – Column names by which to partition the dataset. Files are written in Hive partition style.

  • filesystem

    The pyarrow filesystem implementation to write to. These filesystems are specified in the pyarrow docs. Specify this if you need to provide specific configurations to the filesystem. By default, the filesystem is automatically selected based on the scheme of the paths. For example, if the path begins with s3://, the S3FileSystem is used.

  • try_create_dir – If True, attempts to create all directories in the destination path. Does nothing if all directories already exist. Defaults to True.

  • arrow_open_stream_args

    kwargs passed to pyarrow.fs.FileSystem.open_output_stream, which is used when opening the file to write to.

  • filename_provider – A FilenameProvider implementation. Use this parameter to customize what your filenames look like. The filename is expected to be templatized with {i} to ensure unique filenames when writing multiple files. If it’s not templatized, Xpark Dataset will add {i} to the filename to ensure compatibility with the pyarrow write_dataset.

  • arrow_parquet_args_fn – Callable that returns a dictionary of write arguments that are provided to pyarrow.parquet.ParquetWriter() when writing each block to a file. Overrides any duplicate keys from arrow_parquet_args. If row_group_size is provided, it will be passed to pyarrow.parquet.ParquetWriter.write_table(). Use this argument instead of arrow_parquet_args if any of your write arguments can’t be pickled, or if you’d like to lazily resolve the write arguments for each dataset block.

  • min_rows_per_file – [Experimental] The target minimum number of rows to write to each file. If None, Xpark Dataset writes a system-chosen number of rows to each file. If the number of rows per block is larger than the specified value, Xpark Dataset writes the number of rows per block to each file. The specified value is a hint, not a strict limit. Xpark Dataset might write more or fewer rows to each file.

  • max_rows_per_file – [Experimental] The target maximum number of rows to write to each file. If None, Xpark Dataset writes a system-chosen number of rows to each file. If the number of rows per block is smaller than the specified value, Xpark Dataset writes the number of rows per block to each file. The specified value is a hint, not a strict limit. Xpark Dataset might write more or fewer rows to each file. If both min_rows_per_file and max_rows_per_file are specified, max_rows_per_file takes precedence when they cannot both be satisfied.

  • ray_remote_args – Kwargs passed to ray.remote() in the write tasks.

  • concurrency – The maximum number of Xpark tasks to run concurrently. Set this to control the number of tasks that run concurrently; it doesn’t change the total number of tasks run. By default, concurrency is decided dynamically based on the available resources.

  • num_rows_per_file – [Deprecated] Use min_rows_per_file instead.

  • arrow_parquet_args

    Options to pass to pyarrow.parquet.ParquetWriter(), which is used to write out each block to a file. See arrow_parquet_args_fn for more detail.

  • mode – Determines how to handle existing files. Valid modes are “overwrite”, “error”, “ignore”, “append”. Defaults to “append”. NOTE: This method isn’t atomic. “Overwrite” first deletes all the data before writing to path.
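The Hive-style layout produced by partition_cols can be illustrated with a small path-building sketch. This is a hypothetical helper, not part of the xpark API:

```python
def hive_partition_path(root: str, partition_values: dict) -> str:
    """Sketch of the Hive-style directory layout that partition_cols
    produces: root/col1=val1/col2=val2/... Parquet files for each
    partition are written under the corresponding leaf directory."""
    parts = [f"{col}={val}" for col, val in partition_values.items()]
    return "/".join([root.rstrip("/")] + parts)

hive_partition_path("/tmp/data", {"year": 2024, "country": "US"})
# → "/tmp/data/year=2024/country=US"
```

With partition_cols=["year", "country"], each distinct (year, country) pair gets its own directory of this shape, and readers that understand Hive partitioning can recover the column values from the path.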