xpark.dataset.TextSimilarity

class xpark.dataset.TextSimilarity(target: str, /, *, use_embedding: bool = False, embedding_model: str | None = None, embedding_batch_rows: int = 10, base_url: str | None = None, model: str | None = None, api_key: str = 'NOT_SET', max_qps: int | None = None, max_retries: int = 0, fallback_response: float | None = 0.0, **kwargs: dict[str, Any])

The TextSimilarity processor calculates the similarity between each input text and a target text, using either an LLM or, when use_embedding is True, an embedding model.

Parameters:
  • target – Target text to be compared with. All input texts will be compared against this reference text.

  • use_embedding – Whether to use an embedding model to calculate similarity. Defaults to False.

  • embedding_model – The name of the embedding model, for CPU or GPU. Used only when use_embedding is True. Available models: {AVAILABLE_MODELS}

  • embedding_batch_rows – The number of rows sent to the embedding model in a single request. Defaults to 10.

  • base_url – The base URL of the LLM server.

  • model – The name of the LLM to request.

  • api_key – The API key sent with each request. Defaults to 'NOT_SET'.

  • max_qps – The maximum number of requests per second.

  • max_retries – The maximum number of retries per request when a request fails. Retries use exponential backoff, up to this maximum. Defaults to 0.

  • fallback_response – The value returned when the LLM request fails or the LLM output is invalid. If set to None, the exception is raised instead. Defaults to 0.0.

  • **kwargs – Keyword arguments to pass to the openai.AsyncClient.chat.completions.create API.
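The interaction between max_retries and fallback_response described above can be illustrated with a small standalone sketch. This is not xpark's implementation: the helper names score_with_retries and make_flaky, the base_delay parameter, and the doubling schedule are all assumptions made for illustration, and the library's actual backoff timing is not documented here.

```python
import asyncio
from typing import Awaitable, Callable, Optional


async def score_with_retries(
    request: Callable[[], Awaitable[float]],
    max_retries: int = 0,
    fallback_response: Optional[float] = 0.0,
    base_delay: float = 0.01,  # hypothetical; the real backoff schedule is an assumption
) -> float:
    """Call `request` once, then retry up to `max_retries` times with exponential backoff."""
    attempt = 0
    while True:
        try:
            return await request()
        except Exception:
            if attempt >= max_retries:
                # Retries exhausted: re-raise when no fallback is configured,
                # otherwise return the fallback value.
                if fallback_response is None:
                    raise
                return fallback_response
            # Wait base_delay, then 2x, 4x, ... before the next attempt.
            await asyncio.sleep(base_delay * 2 ** attempt)
            attempt += 1


def make_flaky(failures: int, value: float):
    """Build a hypothetical request that fails `failures` times, then returns `value`."""
    calls = {"n": 0}

    async def request() -> float:
        calls["n"] += 1
        if calls["n"] <= failures:
            raise RuntimeError("simulated request failure")
        return value

    return request, calls


if __name__ == "__main__":
    # Two failures with max_retries=2: the third attempt succeeds.
    req, calls = make_flaky(failures=2, value=0.8)
    print(asyncio.run(score_with_retries(req, max_retries=2)), calls["n"])

    # Failures outlast the retries: the fallback value is returned.
    req, calls = make_flaky(failures=5, value=0.8)
    print(asyncio.run(score_with_retries(req, max_retries=1)), calls["n"])
```

Under these assumptions, max_retries=0 means a single attempt, and fallback_response=None turns a final failure into a raised exception rather than a default score.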

Examples

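import os

from xpark.dataset import TextSimilarity, from_items
from xpark.dataset.expressions import col

# Build a one-row dataset; the texts are read from the "item" column below.
ds = from_items([""])

# Score each row against the target text and store the result in "similarity".
ds = ds.with_column(
    "similarity",
    TextSimilarity(
        "This is a test text.",
        model="deepseek-v3-0324",
        # Endpoint and credentials are read from the environment.
        base_url=os.getenv("LLM_ENDPOINT"),
        api_key=os.getenv("LLM_API_KEY"),
    )
    .options(num_workers={"IO": 1}, batch_size=1)
    .with_column(col("item")),
)

print(ds.take_all())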

Methods

__call__(texts)

Call self as a function.

options(**kwargs)

with_column(texts)

__call__(texts: pa.ChunkedArray) → pa.Array

Call self as a function.

options(**kwargs: Unpack[ExprUDFOptions]) → Self

with_column(texts: pa.ChunkedArray) → pa.Array