xpark.dataset.expressions.udf#
- xpark.dataset.expressions.udf(*, return_dtype: DataType, batch_size: Literal['default'] | int | None = None, num_workers: dict[Literal['CPU', 'GPU', 'IO'], tuple[int, int] | int] | None = None, worker_ray_remote_args: dict[Literal['CPU', 'GPU', 'IO'], dict] | None = None)[source]#
Decorator to convert a UDF into an expression-compatible function.
This decorator allows UDFs to be used seamlessly within the expression system, enabling schema inference and integration with other expressions.
IMPORTANT: UDFs operate on batches of data, not individual rows. When your UDF is called, each column argument will be passed as a PyArrow Array containing multiple values from that column across the batch. Under the hood, when working with multiple columns, they get translated to PyArrow arrays (one array per column).
- Parameters:
return_dtype – The data type of the return value of the UDF
batch_size – The desired number of rows in each batch, or
Noneto use entire blocks as batches (blocks may contain different numbers of rows). The actual size of the batch provided toprocessormay be smaller thanbatch_sizeifbatch_sizedoesn’t evenly divide the block(s) sent to a given map task. Defaultbatch_sizeisNone.num_workers – The number of worker processes to use for batch inference, the available worker types are
CPU,GPUandIO. Actual number of workers will be equal or less thannum_workers.worker_ray_remote_args – Additional resource requirements for each type of map worker. See
ray.remote()for details.
- Returns:
A callable that creates UDFExpr instances when called with expressions
Example
>>> from xpark.dataset import from_items >>> from xpark.dataset.expressions import col, udf >>> import pyarrow as pa >>> import pyarrow.compute as pc >>> import ray >>> >>> # UDF that operates on a batch of values (PyArrow Array) >>> @udf(return_dtype=DataType.int32()) ... def add_one(x: pa.Array) -> pa.Array: ... return pc.add(x, 1) # Vectorized operation on the entire Array >>> >>> # UDF that combines multiple columns (each as a PyArrow Array) >>> @udf(return_dtype=DataType.string()) ... def format_name(first: pa.Array, last: pa.Array) -> pa.Array: ... return pc.binary_join_element_wise(first, last, " ") # Vectorized string concatenation >>> >>> # Use in dataset operations >>> ds = from_items([ ... {"value": 5, "first": "John", "last": "Doe"}, ... {"value": 10, "first": "Jane", "last": "Smith"} ... ]) >>> >>> # Single column transformation (operates on batches) >>> ds_incremented = ds.with_column("value_plus_one", add_one(col("value"))) >>> >>> # Multi-column transformation (each column becomes a PyArrow Array) >>> ds_formatted = ds.with_column("full_name", format_name(col("first"), col("last"))) >>> >>> # Can also be used in complex expressions >>> ds_complex = ds.with_column("doubled_plus_one", add_one(col("value")) * 2) >>> >>> # UDF can be an actor >>> @udf(return_dtype=DataType.int32(), num_workers={"CPU": 1}) ... class Add: ... def __init__(self, value): ... self.value = value ... ... def __call__(self, array: pa.Array) -> pa.Array: ... return pc.add(array, self.value) >>> >>> add_two = Add(2) >>> ds_actor_udf = ds.with_column("value_plus_two", add_two(col("value")))