Getting Started

Welcome to Xpark — a multimodal AI data processing platform designed to streamline and optimize data workflows for AI applications.

This guide walks you through installing Xpark, loading your first dataset, applying transformations, and running AI processors.

Installation

Note

Xpark is not yet fully open-sourced and has not been published to the public PyPI registry. External users can access Xpark through Tencent EMR.

In an environment where the Xpark package is available, install it with pip:

pip install xpark

# To use AI processors, install the extra dependencies:
pip install "xpark[ai]"

Quick Start

Xpark runs on a Ray cluster, which you can initialize yourself before running Xpark. The following example uses single-machine (local) mode: it creates a simple dataset, applies a transformation, and prints the results.

from xpark.dataset import from_items
from xpark.dataset.expressions import col

# Create a dataset from a list of dictionaries
ds = from_items([
    {"text": "Hello, world!"},
    {"text": "Xpark makes data processing easy."},
    {"text": "Multimodal AI at scale."},
])

# Add an uppercase column using the string namespace
ds = ds.with_column("upper_text", col("text").str.upper())
ds.show()

{'text': 'Hello, world!', 'upper_text': 'HELLO, WORLD!'}
{'text': 'Xpark makes data processing easy.', 'upper_text': 'XPARK MAKES DATA PROCESSING EASY.'}
{'text': 'Multimodal AI at scale.', 'upper_text': 'MULTIMODAL AI AT SCALE.'}

Loading Data

Xpark provides a rich set of readers for various data formats and modalities.

From Python objects

from xpark.dataset import from_items, from_range

# From a list of dicts
ds = from_items([{"id": 1, "value": 10}, {"id": 2, "value": 20}])

# From a numeric range
ds = from_range(100)

From files

from xpark.dataset import read_json, read_parquet, read_image, read_audio

# Structured data
ds = read_json("/path/to/data.json")
ds = read_parquet("/path/to/data.parquet")

# Multimodal data
ds = read_image("/path/to/images/")
ds = read_audio("/path/to/audio/")

From third-party sources

from xpark.dataset import from_pandas, from_numpy, from_huggingface

import pandas as pd
import numpy as np

ds = from_pandas(pd.DataFrame({"a": [1, 2, 3]}))
ds = from_numpy(np.array([1, 2, 3]))
ds = from_huggingface("squad", split="train")

From distributed sources

Xpark’s multimodal operators support reading from various file types (audio, video, and images) across remote distributed file systems and object storage services, such as S3, HDFS, and COS.

import os

from xpark.dataset import DatasetContext, from_items
from xpark.dataset.expressions import col
from xpark.dataset.processors.video_compute import VideoCompute

ctx = DatasetContext.get_current()

ctx.storage_options = {
    "cos": {
        "endpoint_url": "http://cos.ap-guangzhou.myqcloud.com",
        "key": os.getenv("TEST_COS_KEY"),
        "secret": os.getenv("TEST_COS_SECRET"),
        "use_ssl": True,
        "config_kwargs": {"s3": {"addressing_style": "virtual", "signature_version": "v4"}},
    }
}

ds = from_items([{"video": "cos://bucket_name/path/to/video.mp4"}])
ds = ds.with_column("audio", VideoCompute.extract_audio(col("video")))
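
Because the credentials above come from environment variables, a missing variable silently becomes None in the options dictionary. The sketch below shows one way to fail early instead. The helper build_cos_options is hypothetical and not part of Xpark; it only builds a dictionary with the same shape as the example above, using the same environment variable names.

```python
import os


def build_cos_options(
    endpoint_url: str,
    key_var: str = "TEST_COS_KEY",
    secret_var: str = "TEST_COS_SECRET",
) -> dict:
    """Hypothetical helper: build a storage_options mapping shaped like the one above."""
    # Collect the names of any credential variables that are unset or empty.
    missing = [name for name in (key_var, secret_var) if not os.getenv(name)]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    return {
        "cos": {
            "endpoint_url": endpoint_url,
            "key": os.environ[key_var],
            "secret": os.environ[secret_var],
            "use_ssl": True,
            "config_kwargs": {
                "s3": {"addressing_style": "virtual", "signature_version": "v4"}
            },
        }
    }
```

The returned dictionary could then be assigned to ctx.storage_options in place of the literal shown above, so that a missing credential raises an error before any remote file is read.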

Transforming Data

Use with_column() to add or overwrite a column from an expression, and filter() to keep only the rows that match a predicate.

from xpark.dataset import from_items
from xpark.dataset.expressions import col, lit

ds = from_items([{"score": 85}, {"score": 42}, {"score": 91}])

# Multiply a column by a constant
ds = ds.with_column("scaled_score", col("score") * lit(2))

# Filter rows
ds = ds.filter(col("score") > lit(50))

ds.show()
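
To make the expected result concrete, the following plain-Python sketch mirrors the two steps above on an ordinary list of dicts. It does not use Xpark. It assumes that with_column() adds one derived value per row and that filter() keeps the rows for which the predicate is true.

```python
# Plain-Python model of the pipeline above (no Xpark involved).
rows = [{"score": 85}, {"score": 42}, {"score": 91}]

# with_column("scaled_score", col("score") * lit(2)): add one derived value per row.
rows = [{**row, "scaled_score": row["score"] * 2} for row in rows]

# filter(col("score") > lit(50)): keep only the rows that satisfy the predicate.
rows = [row for row in rows if row["score"] > 50]

for row in rows:
    print(row)
# {'score': 85, 'scaled_score': 170}
# {'score': 91, 'scaled_score': 182}
```

Under those assumptions, the row with a score of 42 is dropped and the two remaining rows each carry the new scaled_score column.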

Applying AI/Data Processors

Xpark ships with ready-to-use AI/Data processors for text, image, audio, and video.

Text Embedding

from xpark.dataset import TextEmbedding, from_items
from xpark.dataset.expressions import col

ds = from_items([
    "What is the capital of France?",
    "Explain the theory of relativity.",
])
ds = ds.with_column(
    "embedding",
    TextEmbedding("Qwen/Qwen3-Embedding-0.6B")
    .options(num_workers={"CPU": 1})
    .with_column(col("item")),
)
output = ds.take_all()

Video Extract Audio

from xpark.dataset import (
    VideoCompute,
    from_items,
)
from xpark.dataset.expressions import col

# Step 1: Build the video dataset
ds = from_items([{"video": "/path/to/video.mp4"}])

# Step 2: Extract audio from videos (Video → Audio)
# VideoCompute.extract_audio returns audio binary data (bytes)
ds = ds.with_column(
    "audio",
    VideoCompute.extract_audio(col("video"), codec="mp3"),
)

audio_bytes = ds.take_all()[0]["audio"]
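
Since the extracted audio is returned as bytes, it can be written to disk with the standard library. The sketch below is illustrative only: save_audio is a hypothetical helper, and the payload is a placeholder standing in for the value of audio_bytes, not real MP3 data.

```python
import tempfile
from pathlib import Path


def save_audio(audio_bytes: bytes, path: Path) -> int:
    """Hypothetical helper: write raw audio bytes to disk, return the byte count."""
    path.parent.mkdir(parents=True, exist_ok=True)
    return path.write_bytes(audio_bytes)


# Placeholder payload standing in for ds.take_all()[0]["audio"]; not real MP3 data.
placeholder_audio = b"\x00" * 16

out_path = Path(tempfile.mkdtemp()) / "extracted.mp3"
written = save_audio(placeholder_audio, out_path)
print(f"wrote {written} bytes to {out_path}")
```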

Transforming Data with UDFs

In addition to built-in expressions, Xpark supports custom transformations via User-Defined Functions (UDFs). UDFs can be used directly inside with_column(), just like any other expression.

There are two forms of UDFs:

Function UDF — a plain function decorated with udf(). Each argument receives a PyArrow Array (one per column), and the function must return a PyArrow Array.

from xpark.dataset import from_items
from xpark.dataset.expressions import col, udf
from xpark.dataset.datatype import DataType
import pyarrow as pa
import pyarrow.compute as pc


@udf(return_dtype=DataType.string())
def format_name(first: pa.Array, last: pa.Array) -> pa.Array:
    return pc.binary_join_element_wise(first, last, " ")


ds = from_items(
    [
        {"first": "John", "last": "Doe"},
        {"first": "Jane", "last": "Smith"},
    ]
)
ds = ds.with_column("full_name", format_name(col("first"), col("last")))
ds.show()

Class UDF — a class decorated with udf(). This form is useful when the UDF needs to maintain state across batches, such as loading a model in __init__ and reusing it in __call__. Use num_workers to control how many Ray actor workers are launched.

The recommended way to specify num_workers is at call time via .options(), which keeps the UDF definition reusable across different resource configurations:

from xpark.dataset import from_range
from xpark.dataset.expressions import col, udf
from xpark.dataset.datatype import DataType
import pyarrow as pa
import pyarrow.compute as pc


@udf(return_dtype=DataType.int32())
class Multiplier:
    def __init__(self, factor: int):
        self.factor = factor

    def __call__(self, array: pa.Array) -> pa.Array:
        return pc.multiply(array, self.factor)


ds = from_range(5)
ds = ds.with_column("tripled", Multiplier(3).options(num_workers={"CPU": 2})(col("id")))
ds.show()

num_workers can also be set in the @udf decorator directly, though this is less common since it couples the resource configuration to the UDF definition:

@udf(return_dtype=DataType.int32(), num_workers={"CPU": 2})
class Multiplier:
    ...

UDF results can also be composed with other expressions:

# UDF output participates in further arithmetic
ds = ds.with_column("tripled_plus_ten", Multiplier(3)(col("id")) + 10)

Collecting Results

# Write to Parquet files
ds.write_parquet("/path/to/parquets/")

# Collect all rows as a list of dicts
results = ds.take_all()

# Collect the first N rows
first_five = ds.take(5)

# Print rows to stdout
ds.show()

Next Steps