.. _get_started:

===============
Getting Started
===============

Welcome to **Xpark** — a multimodal AI data processing platform designed to streamline and optimize data workflows for AI applications. This guide walks you through installing Xpark, loading your first dataset, applying transformations, and running AI processors.

Installation
------------

.. note::

    Xpark is not yet fully open-sourced and has not been published to the public PyPI registry. External users can access Xpark through **Tencent EMR**.

Once you have access, install Xpark via pip:

.. code-block:: bash

    pip install xpark

    # To use AI processors, install the extra dependencies:
    pip install "xpark[ai]"

Quick Start
-----------

Xpark runs on a Ray cluster; the example below uses single-machine (local) mode. It creates a simple dataset, applies a transformation, and collects the results.

.. testcode::

    from xpark.dataset import from_items
    from xpark.dataset.expressions import col

    # Create a dataset from a list of dictionaries
    ds = from_items([
        {"text": "Hello, world!"},
        {"text": "Xpark makes data processing easy."},
        {"text": "Multimodal AI at scale."},
    ])

    # Add an uppercase column using the string namespace
    ds = ds.with_column("upper_text", col("text").str.upper())

    ds.show()

.. testoutput::

    {'text': 'Hello, world!', 'upper_text': 'HELLO, WORLD!'}
    {'text': 'Xpark makes data processing easy.', 'upper_text': 'XPARK MAKES DATA PROCESSING EASY.'}
    {'text': 'Multimodal AI at scale.', 'upper_text': 'MULTIMODAL AI AT SCALE.'}

Loading Data
------------

Xpark provides a rich set of readers for various data formats and modalities.

**From Python objects**

.. code-block:: python

    from xpark.dataset import from_items, from_range

    # From a list of dicts
    ds = from_items([{"id": 1, "value": 10}, {"id": 2, "value": 20}])

    # From a numeric range
    ds = from_range(100)
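The list-of-dicts form accepted by ``from_items`` is row-oriented, while dataset engines generally execute over columns. As a toy illustration (plain Python, not Xpark internals), pivoting rows into columns looks like this:

```python
# Toy illustration (not Xpark internals): row-oriented input like the
# list of dicts passed to from_items can be pivoted into columns, the
# shape that columnar engines such as Arrow operate on.
rows = [{"id": 1, "value": 10}, {"id": 2, "value": 20}]

# Pivot rows into a mapping of column name -> list of values
columns = {key: [row[key] for row in rows] for key in rows[0]}

print(columns)  # {'id': [1, 2], 'value': [10, 20]}
```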
**From files**

.. code-block:: python

    from xpark.dataset import read_json, read_parquet, read_image, read_audio

    # Structured data
    ds = read_json("/path/to/data.json")
    ds = read_parquet("/path/to/data.parquet")

    # Multimodal data
    ds = read_image("/path/to/images/")
    ds = read_audio("/path/to/audio/")

**From third-party sources**

.. code-block:: python

    from xpark.dataset import from_pandas, from_numpy, from_huggingface
    import pandas as pd
    import numpy as np

    ds = from_pandas(pd.DataFrame({"a": [1, 2, 3]}))
    ds = from_numpy(np.array([1, 2, 3]))
    ds = from_huggingface("squad", split="train")

**From distributed sources**

Xpark's multimodal operators support reading various file types (audio, video, and images) from remote distributed file systems and object storage services, such as S3, HDFS, and COS.

.. code-block:: python

    import os

    from xpark.dataset import DatasetContext, from_items
    from xpark.dataset.expressions import col
    from xpark.dataset.processors.video_compute import VideoCompute

    ctx = DatasetContext.get_current()
    ctx.storage_options = {
        "cos": {
            "endpoint_url": "http://cos.ap-guangzhou.myqcloud.com",
            "key": os.getenv("TEST_COS_KEY"),
            "secret": os.getenv("TEST_COS_SECRET"),
            "use_ssl": True,
            "config_kwargs": {"s3": {"addressing_style": "virtual", "signature_version": "v4"}},
        }
    }

    ds = from_items([{"video": "cos://bucket_name/path/to/video.mp4"}])
    ds = ds.with_column("audio", VideoCompute.extract_audio(col("video")))
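The ``cos://bucket_name/...`` paths above are ordinary object-storage URIs: the scheme selects the matching ``storage_options`` entry, the authority is the bucket, and the remainder is the object key. A stdlib sketch with a hypothetical helper (``split_object_uri`` is illustrative, not part of Xpark's API):

```python
from urllib.parse import urlparse

# Hypothetical helper (not an Xpark API): decomposes an object-storage
# URI into the pieces that storage_options must be able to resolve.
def split_object_uri(uri: str) -> tuple[str, str, str]:
    parsed = urlparse(uri)
    # scheme selects the storage_options entry ("cos", "s3", ...);
    # netloc is the bucket; path is the object key inside the bucket.
    return parsed.scheme, parsed.netloc, parsed.path.lstrip("/")

print(split_object_uri("cos://bucket_name/path/to/video.mp4"))
# ('cos', 'bucket_name', 'path/to/video.mp4')
```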
Transforming Data
-----------------

Use :meth:`~xpark.dataset.Dataset.with_column` to add or overwrite columns using :ref:`expressions`.

.. code-block:: python

    from xpark.dataset import from_items
    from xpark.dataset.expressions import col, lit

    ds = from_items([{"score": 85}, {"score": 42}, {"score": 91}])

    # Multiply a column by a constant
    ds = ds.with_column("scaled_score", col("score") * lit(2))

    # Filter rows
    ds = ds.filter(col("score") > lit(50))

    ds.show()

Applying AI/Data Processors
---------------------------

Xpark ships with ready-to-use AI/Data processors for text, image, audio, and video.

**Text Embedding**

.. code-block:: python

    from xpark.dataset import TextEmbedding, from_items
    from xpark.dataset.expressions import col

    ds = from_items([
        "What is the capital of France?",
        "Explain the theory of relativity.",
    ])

    ds = ds.with_column(
        "embedding",
        TextEmbedding("Qwen/Qwen3-Embedding-0.6B")
        .options(num_workers={"CPU": 1})
        .with_column(col("item")),
    )

    output = ds.take_all()

**Video Extract Audio**

.. code-block:: python

    from xpark.dataset import (
        VideoCompute,
        from_items,
    )
    from xpark.dataset.expressions import col

    # Step 1: Build the video dataset
    ds = from_items([{"video": "/path/to/video.mp4"}])

    # Step 2: Extract audio from videos (Video → Audio)
    # VideoCompute.extract_audio returns audio binary data (bytes)
    ds = ds.with_column(
        "audio",
        VideoCompute.extract_audio(col("video"), codec="mp3"),
    )

    audio_bytes = ds.take_all()[0]["audio"]
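Embedding vectors collected with ``take_all()``, as in the Text Embedding example above, are typically compared downstream by cosine similarity. A minimal pure-Python sketch, assuming each embedding is a plain list of floats (``cosine_similarity`` is illustrative, not an Xpark API):

```python
import math

# Illustrative helper (not an Xpark API): cosine similarity between
# two embedding vectors, computed as dot(a, b) / (|a| * |b|).
def cosine_similarity(a: list[float], b: list[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(x * x for x in b))
    return dot / (norm_a * norm_b)

# Identical vectors score 1.0; orthogonal vectors score 0.0
print(cosine_similarity([1.0, 0.0], [1.0, 0.0]))  # 1.0
print(cosine_similarity([1.0, 0.0], [0.0, 1.0]))  # 0.0
```

In practice the per-row ``"embedding"`` values returned by ``take_all()`` would be passed to such a function pairwise, e.g. for nearest-neighbor search.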
Transforming Data By UDF
------------------------

In addition to built-in expressions, Xpark supports custom transformations via User-Defined Functions (UDFs). UDFs can be used directly inside :meth:`~xpark.dataset.Dataset.with_column`, just like any other expression. There are two forms of UDFs:

**Function UDF** — a plain function decorated with :func:`~xpark.dataset.expressions.udf`. Each argument receives a PyArrow ``Array`` (one per column), and the function must return a PyArrow ``Array``.

.. code-block:: python

    from xpark.dataset import from_items
    from xpark.dataset.expressions import col, udf
    from xpark.dataset.datatype import DataType
    import pyarrow as pa
    import pyarrow.compute as pc

    @udf(return_dtype=DataType.string())
    def format_name(first: pa.Array, last: pa.Array) -> pa.Array:
        return pc.binary_join_element_wise(first, last, " ")

    ds = from_items(
        [
            {"first": "John", "last": "Doe"},
            {"first": "Jane", "last": "Smith"},
        ]
    )
    ds = ds.with_column("full_name", format_name(col("first"), col("last")))
    ds.show()

**Class UDF** — a class decorated with :func:`~xpark.dataset.expressions.udf`. This form is useful when the UDF needs to maintain state across batches, such as loading a model in ``__init__`` and reusing it in ``__call__``. Use ``num_workers`` to control how many Ray actor workers are launched.

The recommended way to specify ``num_workers`` is at call time via ``.options()``, which keeps the UDF definition reusable across different resource configurations:

.. code-block:: python

    from xpark.dataset import from_range
    from xpark.dataset.expressions import col, udf
    from xpark.dataset.datatype import DataType
    import pyarrow as pa
    import pyarrow.compute as pc

    @udf(return_dtype=DataType.int32())
    class Multiplier:
        def __init__(self, factor: int):
            self.factor = factor

        def __call__(self, array: pa.Array) -> pa.Array:
            return pc.multiply(array, self.factor)

    ds = from_range(5)
    ds = ds.with_column("tripled", Multiplier(3).options(num_workers={"CPU": 2})(col("id")))
    ds.show()

``num_workers`` can also be set in the ``@udf`` decorator directly, though this is less common since it couples the resource configuration to the UDF definition:

.. code-block:: python

    @udf(return_dtype=DataType.int32(), num_workers={"CPU": 2})
    class Multiplier:
        ...
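The lifecycle that makes class UDFs worthwhile can be pictured without Ray or Xpark at all: setup in ``__init__`` runs once per worker, while ``__call__`` runs once per batch. A pure-Python sketch of that pattern (plain lists stand in for Arrow arrays; this is not Xpark's actual execution path):

```python
# Pure-Python sketch of the class-UDF lifecycle (no Ray, no Xpark):
# __init__ runs once per worker, __call__ runs once per batch.
class Multiplier:
    def __init__(self, factor: int):
        # In a real class UDF this is where a model would be loaded.
        self.factor = factor
        self.batches_seen = 0

    def __call__(self, batch: list[int]) -> list[int]:
        self.batches_seen += 1
        return [x * self.factor for x in batch]

udf_instance = Multiplier(3)  # expensive setup happens exactly once
out = [udf_instance(batch) for batch in ([0, 1], [2, 3, 4])]
print(out)                       # [[0, 3], [6, 9, 12]]
print(udf_instance.batches_seen) # 2
```

Because the instance persists across calls, state such as a loaded model is paid for once rather than once per batch, which is the motivation for the class form over the function form.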
UDF results can also be composed with other expressions:

.. code-block:: python

    # UDF output participates in further arithmetic
    ds = ds.with_column("tripled_plus_ten", Multiplier(3)(col("id")) + 10)

Collecting Results
------------------

.. code-block:: python

    # Write to Parquet files
    ds.write_parquet("/path/to/parquets/")

    # Collect all rows as a list of dicts
    results = ds.take_all()

    # Collect the first N rows
    first_five = ds.take(5)

    # Print rows to stdout
    ds.show()

Next Steps
----------

- :ref:`reference_index` — Full Dataset API reference
- :ref:`processors` — All built-in Data and AI Processors