xpark.dataset.TextEmbedding#

class xpark.dataset.TextEmbedding(_local_model: str | None = None, /, *, base_url: str | None = None, model: str | None = None, api_key: str = 'NOT_SET', batch_rows: int = 10, max_qps: int | None = None, max_retries: int = 0, **kwargs: dict[str, Any])[source]#

Text embedding processor for CPU, GPU, and remote HTTP requests.

Parameters:
  • _local_model – The embedding model name for CPU or GPU inference. Available models: ['Qwen/Qwen3-Embedding-0.6B', 'Qwen/Qwen3-Embedding-4B', 'Qwen/Qwen3-Embedding-8B']

  • base_url – The base URL of the LLM server.

  • model – The request model name.

  • api_key – The request API key.

  • batch_rows – The number of rows to send per request.

  • max_qps – The maximum number of requests per second.

  • max_retries – The maximum number of retries per request in the event of failures. Requests are retried with exponential backoff, up to this maximum number of retries.

  • **kwargs – Keyword arguments to pass to the openai.AsyncClient.embeddings.create API.
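
The retry behavior described for max_retries can be sketched in plain Python. This is an illustration of exponential backoff, not the library's actual implementation; the base delay and cap below are assumed values.

```python
import time


def call_with_retries(request, max_retries: int, base_delay: float = 0.5, max_delay: float = 8.0):
    """Retry `request` with exponential backoff, allowing up to `max_retries` extra attempts."""
    for attempt in range(max_retries + 1):
        try:
            return request()
        except Exception:
            if attempt == max_retries:
                raise  # out of retries: surface the failure to the caller
            # Double the delay on each failed attempt, capped at max_delay.
            time.sleep(min(base_delay * 2 ** attempt, max_delay))
```

With max_retries=0 (the default) a failing request raises immediately; larger values trade latency for resilience against transient server errors.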

Examples

from xpark.dataset.expressions import col
from xpark.dataset import TextEmbedding, from_items

ds = from_items([
    "what is the advantage of using the GPU rendering options in Android?",
    "Blank video when converting uncompressed AVI files with ffmpeg",
])
ds = ds.with_column(
    "embedding",
    TextEmbedding(
        # Local embedding model.
        "Qwen/Qwen3-Embedding-0.6B",
        # For remote embedding requests.
        base_url="http://127.0.0.1:9997/v1",
        model="qwen3",
    )
    # One IO worker for HTTP requests, 10 CPU workers for local embedding.
    .options(num_workers={"CPU": 10, "IO": 1})
    .with_column(col("item")),
)
print(ds.take(2))
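
When embedding against a remote server, max_qps caps the request rate. A minimal sketch of such a limiter follows; the sleep-based spacing here is an illustration of the concept, not the mechanism the processor actually uses.

```python
import time


class QPSLimiter:
    """Allow at most `max_qps` acquisitions per second by spacing them out."""

    def __init__(self, max_qps: int):
        self.min_interval = 1.0 / max_qps
        self.next_allowed = 0.0  # monotonic timestamp of the next permitted call

    def acquire(self) -> None:
        now = time.monotonic()
        if now < self.next_allowed:
            # Too soon: wait until the next slot opens.
            time.sleep(self.next_allowed - now)
        self.next_allowed = max(now, self.next_allowed) + self.min_interval
```

Calling `acquire()` before each request keeps the steady-state rate at or below max_qps, which is useful when the embedding server enforces rate limits.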

Methods

__call__(prompt)

Call self as a function.

options(**kwargs)

with_column(prompt)

__call__(prompt: pa.ChunkedArray) → pa.Array#

Call self as a function.

options(**kwargs: Unpack[ExprUDFOptions]) → Self#

with_column(prompt: pa.ChunkedArray) → pa.Array#
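
The batch_rows parameter above determines how many rows are grouped into each embeddings request. The grouping can be sketched as follows; this is a plain-Python illustration, not the processor's internals.

```python
def batch_rows(rows, batch_size: int):
    """Yield consecutive slices of `rows`, each with at most `batch_size` items."""
    for start in range(0, len(rows), batch_size):
        yield rows[start:start + batch_size]
```

With the default batch_rows=10, a dataset of 25 texts would be embedded in three requests of sizes 10, 10, and 5.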