xpark.dataset.GroupedData#

class xpark.dataset.GroupedData(ray_grouped_data: GroupedData)[source]#

Represents a grouped dataset created by calling Dataset.groupby().

The actual groupby is deferred until an aggregation is applied.

Methods

aggregate(*aggs)

Implements an accumulator-based aggregation.

count()

Compute count aggregation.

map_groups(fn, *[, zero_copy_batch, ...])

Apply the given function to each group of records of this dataset.

max([on, ignore_nulls])

Compute grouped max aggregation.

mean([on, ignore_nulls])

Compute grouped mean aggregation.

min([on, ignore_nulls])

Compute grouped min aggregation.

std([on, ddof, ignore_nulls])

Compute grouped standard deviation aggregation.

sum([on, ignore_nulls])

Compute grouped sum aggregation.

with_column(column_name, expr, **ray_remote_args)

Add a new column to each group using an expression.

aggregate(*aggs: AggregateFn) → Dataset#

Implements an accumulator-based aggregation.

Parameters:

aggs – Aggregations to apply.

Returns:

The output is a dataset of n + 1 columns, where the first column is the groupby key and the second through n + 1 columns are the results of the aggregations. If the groupby key is None, the key column is omitted from the output.
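For illustration, a hedged sketch of aggregate() with built-in aggregations (the module path xpark.dataset.aggregate and the Max and Mean classes are assumptions, mirroring the per-column helpers below):

>>> import xpark
>>> from xpark.dataset.aggregate import Max, Mean  # assumed module path
>>> ds = xpark.dataset.from_items(
...     [{"A": i % 3, "B": i} for i in range(100)])
>>> # One output row per key in "A"; one result column per aggregation.
>>> ds.groupby("A").aggregate(Max("B"), Mean("B"))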

count() → Dataset#

Compute count aggregation.

Examples

>>> import xpark
>>> xpark.dataset.from_items([
...     {"A": x % 3, "B": x} for x in range(100)]).groupby(
...     "A").count()
Returns:

A dataset of [k, v] columns where k is the groupby key and v is the number of rows with that key. If the groupby key is None, the key column is omitted from the output.

map_groups(fn: Callable[[pyarrow.Table | pandas.DataFrame | Dict[str, ndarray]], pyarrow.Table | pandas.DataFrame | Dict[str, ndarray]] | Callable[[pyarrow.Table | pandas.DataFrame | Dict[str, ndarray]], Iterator[pyarrow.Table | pandas.DataFrame | Dict[str, ndarray]]] | type[_CallableClassProtocol], *, zero_copy_batch: bool = True, compute: str | ComputeStrategy | None = None, batch_format: str | None = 'default', fn_args: Iterable[Any] | None = None, fn_kwargs: Dict[str, Any] | None = None, fn_constructor_args: Iterable[Any] | None = None, fn_constructor_kwargs: Dict[str, Any] | None = None, num_cpus: float | None = None, num_gpus: float | None = None, memory: float | None = None, concurrency: int | Tuple[int, int] | Tuple[int, int, int] | None = None, ray_remote_args_fn: Callable[[], Dict[str, Any]] | None = None, **ray_remote_args) → Dataset#

Apply the given function to each group of records of this dataset.

While map_groups() is very flexible, note that it comes with downsides:

  • It may be slower than using more specific methods such as min() or max().

  • It requires that each group fits in memory on a single node.

In general, prefer to use aggregate() instead of map_groups().

Warning

Specifying both num_cpus and num_gpus for map tasks is experimental, and may result in scheduling or stability issues. Please report any issues to the Xpark team.

Examples

>>> # Return a single record per group (list of multiple records in,
>>> # list of a single record out).
>>> import xpark
>>> import pandas as pd
>>> import numpy as np
>>> # Get first value per group.
>>> ds = xpark.dataset.from_items([
...     {"group": 1, "value": 1},
...     {"group": 1, "value": 2},
...     {"group": 2, "value": 3},
...     {"group": 2, "value": 4}])
>>> ds.groupby("group").map_groups(
...     lambda g: {"result": np.array([g["value"][0]])})
>>> # Return multiple records per group (dataframe in, dataframe out).
>>> df = pd.DataFrame(
...     {"A": ["a", "a", "b"], "B": [1, 1, 3], "C": [4, 6, 5]}
... )
>>> ds = xpark.dataset.from_pandas(df)
>>> grouped = ds.groupby("A")
>>> grouped.map_groups(
...     lambda g: g.apply(
...         lambda c: c / g[c.name].sum() if c.name in ["B", "C"] else c
...     )
... )
Parameters:
  • fn – The function to apply to each group of records, or a class type that can be instantiated to create such a callable. It takes as input a batch of all records from a single group, and returns a batch of zero or more records, similar to map_batches().

  • zero_copy_batch – If True, each group of rows (batch) is provided without making an additional copy.

  • compute

    The compute strategy to use for the map operation; see the sketch at the end of this entry for a usage example.

    • If compute is not specified for a function, will use xpark.dataset.TaskPoolStrategy() to launch concurrent tasks based on the available resources and number of input blocks.

    • Use xpark.dataset.TaskPoolStrategy(size=n) to launch at most n concurrent Xpark tasks.

    • If compute is not specified for a callable class, will use xpark.dataset.ActorPoolStrategy(min_size=1, max_size=None) to launch an autoscaling actor pool from 1 to unlimited workers.

    • Use xpark.dataset.ActorPoolStrategy(size=n) to use a fixed size actor pool of n workers.

    • Use xpark.dataset.ActorPoolStrategy(min_size=m, max_size=n) to use an autoscaling actor pool from m to n workers.

    • Use xpark.dataset.ActorPoolStrategy(min_size=m, max_size=n, initial_size=initial) to use an autoscaling actor pool from m to n workers, with an initial size of initial.

  • batch_format – Specify "default" to use the default block format (NumPy), "pandas" to select pandas.DataFrame, "pyarrow" to select pyarrow.Table, "numpy" to select Dict[str, numpy.ndarray], or None to return the underlying block exactly as is with no additional formatting.

  • fn_args – Arguments to fn.

  • fn_kwargs – Keyword arguments to fn.

  • fn_constructor_args – Positional arguments to pass to fn’s constructor. You can only provide this if fn is a callable class. These arguments are top-level arguments in the underlying Xpark actor construction task.

  • fn_constructor_kwargs – Keyword arguments to pass to fn’s constructor. This can only be provided if fn is a callable class. These arguments are top-level arguments in the underlying Xpark actor construction task.

  • num_cpus – The number of CPUs to reserve for each parallel map worker.

  • num_gpus – The number of GPUs to reserve for each parallel map worker. For example, specify num_gpus=1 to request 1 GPU for each parallel map worker.

  • memory – The heap memory in bytes to reserve for each parallel map worker.

  • concurrency – This argument is deprecated. Use the compute argument instead.

  • ray_remote_args_fn – A function that returns a dictionary of remote args passed to each map worker. The purpose of this argument is to generate dynamic arguments for each actor or task; it is called each time prior to initializing the worker. Args returned from this dict always override the args in ray_remote_args. Note: this is an advanced, experimental feature.

  • ray_remote_args – Additional resource requirements to request from Xpark (e.g., num_gpus=1 to request GPUs for the map tasks). See ray.remote() for details.

Returns:

The return type is determined by the return type of fn, and the returned dataset combines the results from all groups.

See also

GroupedData.aggregate()

Use this method for common aggregation use cases.
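For instance, a hedged sketch combining a callable class with an actor-pool compute strategy and a pandas batch format (the Normalizer class and its normalization logic are illustrative, not part of the API):

>>> import xpark
>>> import pandas as pd
>>> class Normalizer:
...     # Constructed once per actor, via fn_constructor_kwargs.
...     def __init__(self, column: str):
...         self.column = column
...     def __call__(self, g: pd.DataFrame) -> pd.DataFrame:
...         # Scale the target column so it sums to 1 within each group.
...         g[self.column] = g[self.column] / g[self.column].sum()
...         return g
>>> ds = xpark.dataset.from_items([
...     {"group": 1, "value": 1}, {"group": 1, "value": 3},
...     {"group": 2, "value": 2}, {"group": 2, "value": 2}])
>>> ds.groupby("group").map_groups(
...     Normalizer,
...     fn_constructor_kwargs={"column": "value"},
...     batch_format="pandas",
...     compute=xpark.dataset.ActorPoolStrategy(min_size=1, max_size=4))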

max(on: str | List[str] | None = None, ignore_nulls: bool = True) → Dataset#

Compute grouped max aggregation.

Examples

>>> import xpark
>>> xpark.dataset.from_range(100).groupby("id").max()
>>> xpark.dataset.from_items([
...     {"A": i % 3, "B": i, "C": i**2}
...     for i in range(100)]).groupby(
...     "A").max(["B", "C"])
Parameters:
  • on – A column name or a list of column names to aggregate.

  • ignore_nulls – Whether to ignore null values. If True, null values are ignored when computing the max; if False, the output is null if any null value is encountered. We consider np.nan, None, and pd.NaT to be null values. Default is True.

Returns:

The max result.

For different values of on, the return varies:

  • on=None: a dataset containing a groupby key column, "k", and a column-wise max column for each original column in the dataset.

  • on=["col_1", ..., "col_n"]: a dataset of n + 1 columns where the first column is the groupby key and the second through n + 1 columns are the results of the aggregations.

If the groupby key is None, the key column is omitted from the output.
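For instance, a short sketch of the ignore_nulls semantics described above (assuming from_items accepts None for missing values):

>>> import xpark
>>> ds = xpark.dataset.from_items(
...     [{"A": 1, "B": 2}, {"A": 1, "B": None}])
>>> ds.groupby("A").max("B")                      # null ignored; max is 2
>>> ds.groupby("A").max("B", ignore_nulls=False)  # null encountered; result is null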

mean(on: str | List[str] | None = None, ignore_nulls: bool = True) → Dataset#

Compute grouped mean aggregation.

Examples

>>> import xpark
>>> xpark.dataset.from_range(100).groupby("id").mean()
>>> xpark.dataset.from_items([
...     {"A": i % 3, "B": i, "C": i**2}
...     for i in range(100)]).groupby(
...     "A").mean(["B", "C"])
Parameters:
  • on – A column name or a list of column names to aggregate.

  • ignore_nulls – Whether to ignore null values. If True, null values are ignored when computing the mean; if False, the output is null if any null value is encountered. We consider np.nan, None, and pd.NaT to be null values. Default is True.

Returns:

The mean result.

For different values of on, the return varies:

  • on=None: a dataset containing a groupby key column, "k", and a column-wise mean column for each original column in the dataset.

  • on=["col_1", ..., "col_n"]: a dataset of n + 1 columns where the first column is the groupby key and the second through n + 1 columns are the results of the aggregations.

If the groupby key is None, the key column is omitted from the output.

min(on: str | List[str] | None = None, ignore_nulls: bool = True) → Dataset#

Compute grouped min aggregation.

Examples

>>> import xpark
>>> xpark.dataset.from_range(100).groupby("id").min()
>>> xpark.dataset.from_items([
...     {"A": i % 3, "B": i, "C": i**2}
...     for i in range(100)]).groupby(
...     "A").min(["B", "C"])
Parameters:
  • on – A column name or a list of column names to aggregate.

  • ignore_nulls – Whether to ignore null values. If True, null values are ignored when computing the min; if False, the output is null if any null value is encountered. We consider np.nan, None, and pd.NaT to be null values. Default is True.

Returns:

The min result.

For different values of on, the return varies:

  • on=None: a dataset containing a groupby key column, "k", and a column-wise min column for each original column in the dataset.

  • on=["col_1", ..., "col_n"]: a dataset of n + 1 columns where the first column is the groupby key and the second through n + 1 columns are the results of the aggregations.

If the groupby key is None, the key column is omitted from the output.

std(on: str | List[str] | None = None, ddof: int = 1, ignore_nulls: bool = True) → Dataset#

Compute grouped standard deviation aggregation.

Examples

>>> import xpark
>>> xpark.dataset.from_range(100).groupby("id").std(ddof=0)
>>> xpark.dataset.from_items([
...     {"A": i % 3, "B": i, "C": i**2}
...     for i in range(100)]).groupby(
...     "A").std(["B", "C"])

NOTE: This uses Welford's online method for an accumulator-style computation of the standard deviation. This method was chosen for its numerical stability and because it can be computed in a single pass. It may give different (but more accurate) results than NumPy, Pandas, and sklearn, which use a less numerically stable two-pass algorithm. See https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Welford's_online_algorithm
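For intuition, a minimal standalone sketch of Welford's single-pass update (illustrative only; this is not xpark's internal implementation):

>>> import math
>>> def welford_std(values, ddof=1):
...     # Track count, running mean, and M2 (the sum of squared
...     # deviations from the current mean) in a single pass.
...     count, mean, m2 = 0, 0.0, 0.0
...     for x in values:
...         count += 1
...         delta = x - mean
...         mean += delta / count
...         m2 += delta * (x - mean)  # uses the updated mean
...     return math.sqrt(m2 / (count - ddof))
>>> welford_std([1.0, 2.0, 3.0, 4.0], ddof=0)
1.118033988749895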

Parameters:
  • on – A column name or a list of column names to aggregate.

  • ddof – Delta Degrees of Freedom. The divisor used in calculations is N - ddof, where N represents the number of elements.

  • ignore_nulls – Whether to ignore null values. If True, null values are ignored when computing the std; if False, the output is null if any null value is encountered. We consider np.nan, None, and pd.NaT to be null values. Default is True.

Returns:

The standard deviation result.

For different values of on, the return varies:

  • on=None: a dataset containing a groupby key column, "k", and a column-wise std column for each original column in the dataset.

  • on=["col_1", ..., "col_n"]: a dataset of n + 1 columns where the first column is the groupby key and the second through n + 1 columns are the results of the aggregations.

If the groupby key is None, the key column is omitted from the output.

sum(on: str | List[str] | None = None, ignore_nulls: bool = True) → Dataset#

Compute grouped sum aggregation.

Examples

>>> import xpark
>>> xpark.dataset.from_items([
...     {"A": i % 3, "B": i, "C": i**2}
...     for i in range(100)]).groupby(
...     "A").sum("C")
>>> xpark.dataset.from_range(100).groupby("id").sum()
>>> xpark.dataset.from_items([
...     {"A": i % 3, "B": i, "C": i**2}
...     for i in range(100)]).groupby(
...     "A").sum(["B", "C"])
Parameters:
  • on – A column name or a list of column names to aggregate.

  • ignore_nulls – Whether to ignore null values. If True, null values are ignored when computing the sum; if False, the output is null if any null value is encountered. We consider np.nan, None, and pd.NaT to be null values. Default is True.

Returns:

The sum result.

For different values of on, the return varies:

  • on=None: a dataset containing a groupby key column, "k", and a column-wise sum column for each original column in the dataset.

  • on=["col_1", ..., "col_n"]: a dataset of n + 1 columns where the first column is the groupby key and the second through n + 1 columns are the results of the aggregations.

If the groupby key is None, the key column is omitted from the output.

with_column(column_name: str, expr: Expr, **ray_remote_args) → Dataset#

Add a new column to each group using an expression.

The supplied expression is evaluated against every row in each group, and the resulting column is appended to the group’s records. The output dataset preserves the original rows and columns.

Examples

>>> import xpark
>>> from xpark.dataset.expressions import col
>>> ds = (
...     xpark.dataset.from_items([{"group": 1, "value": 1}, {"group": 1, "value": 2}])
...     .groupby("group")
...     .with_column("value_twice", col("value") * 2)
...     .sort(["group", "value"])
... )
>>> ds.take_all()
[{'group': 1, 'value': 1, 'value_twice': 2}, {'group': 1, 'value': 2, 'value_twice': 4}]
Parameters:
  • column_name – Name of the column to add.

  • expr – Expression that yields the values for the new column.

  • **ray_remote_args – Additional resource requirements to request from Xpark for the underlying map tasks (for example, num_gpus=1).

Returns:

A new Dataset containing all existing columns plus the newly computed column.

PublicAPI (alpha): This API is in alpha and may change before becoming stable.