xpark.dataset.TextEmbedding
- class xpark.dataset.TextEmbedding(_local_model: str | None = None, /, *, base_url: str | None = None, model: str | None = None, api_key: str = 'NOT_SET', batch_rows: int = 10, max_qps: int | None = None, max_retries: int = 0, **kwargs: dict[str, Any])
Text embedding processor for local CPU or GPU inference and for remote HTTP requests.
- Parameters:
_local_model – The name of the embedding model to run locally on CPU or GPU. Available models: ['Qwen/Qwen3-Embedding-0.6B', 'Qwen/Qwen3-Embedding-4B', 'Qwen/Qwen3-Embedding-8B']
base_url – The base URL of the LLM server.
model – The request model name.
api_key – The request API key.
batch_rows – The number of rows to send in a single request.
max_qps – The maximum number of requests per second.
max_retries – The maximum number of retries per request in the event of failures. Failed requests are retried with exponential backoff, up to this maximum number of retries.
**kwargs – Keyword arguments to pass to the openai.AsyncClient.embeddings.create API.
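The retry behavior described for max_retries can be sketched in plain Python. This is an illustrative sketch only, not xpark's implementation: the request_fn, base_delay, and helper names here are hypothetical, and only the retry-with-exponential-backoff semantics mirror the parameter description above.

```python
import time


def embed_with_retries(request_fn, batch, max_retries=3, base_delay=0.01):
    """Retry a failed batch request with exponential backoff,
    up to `max_retries` retries (illustrative sketch, not xpark API)."""
    attempt = 0
    while True:
        try:
            return request_fn(batch)
        except Exception:
            if attempt >= max_retries:
                raise  # retries exhausted, surface the failure
            time.sleep(base_delay * (2 ** attempt))  # 1x, 2x, 4x, ... delay
            attempt += 1


# Example: a flaky request function that succeeds on the third attempt.
calls = {"n": 0}


def flaky(batch):
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("transient failure")
    return [[0.0] * 4 for _ in batch]  # dummy 4-dim embeddings


embeddings = embed_with_retries(flaky, ["a", "b"], max_retries=3)
```

With max_retries=0 (the default above), the first failure would propagate immediately.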
Examples
from xpark.dataset.expressions import col
from xpark.dataset import TextEmbedding, from_items

ds = from_items([
    "what is the advantage of using the GPU rendering options in Android?",
    "Blank video when converting uncompressed AVI files with ffmpeg",
])
ds = ds.with_column(
    "embedding",
    TextEmbedding(
        # Local embedding model.
        "Qwen/Qwen3-Embedding-0.6B",
        # For remote embedding requests.
        base_url="http://127.0.0.1:9997/v1",
        model="qwen3",
    )
    # One IO worker for HTTP requests, 10 CPU workers for local embedding.
    .options(num_workers={"CPU": 10, "IO": 1})
    .with_column(col("item")),
)
print(ds.take(2))
Methods
__call__(prompt)
    Call self as a function.
options(**kwargs)
with_column(prompt)

- __call__(prompt: pa.ChunkedArray) → pa.Array
  Call self as a function.
- options(**kwargs: Unpack[ExprUDFOptions]) → Self
- with_column(prompt: pa.ChunkedArray) → pa.Array
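The batch_rows parameter determines how input rows are grouped before each call to the embedding backend. A minimal, standalone illustration of that chunking (plain Python lists stand in for Arrow arrays; this is not xpark's code):

```python
def iter_batches(rows, batch_rows=10):
    """Yield consecutive slices of at most `batch_rows` rows,
    mirroring how rows are grouped into one request per batch."""
    for start in range(0, len(rows), batch_rows):
        yield rows[start:start + batch_rows]


# 25 rows with batch_rows=10 produce batches of sizes 10, 10, and 5.
batches = list(iter_batches(list(range(25)), batch_rows=10))
```

A larger batch_rows means fewer, bigger requests; a smaller value trades throughput for lower per-request latency and memory.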