xpark.dataset.TextExtract
- class xpark.dataset.TextExtract(labels_or_schema: list[str | dict[str, str]] | dict, /, *, ensure_ascii: bool = False, base_url: str, model: str, api_key: str = 'NOT_SET', max_qps: int | None = None, max_retries: int = 0, fallback_response: str | None = '{}', **kwargs: dict[str, Any])
- The TextExtract processor uses an LLM to extract structured information from text,
based on user-defined labels or a JSON Schema, and returns each result as a JSON string.
- Parameters:
labels_or_schema –
The labels to extract from the text. Accepts three formats:
- list[str]: plain label names, e.g. ["person", "location"].
- list[dict]: dicts with "label" (required) and "description" (optional), e.g. [{"label": "person", "description": "the person's full name"}]. Descriptions are injected into the prompt to guide the model when label names alone are ambiguous.
- dict: a JSON Schema object. The schema is passed to the model so that it outputs JSON conforming to that schema, e.g. {"type": "object", "properties": {...}}.
ensure_ascii – If True, the output JSON will escape all non-ASCII characters. If False (default), non-ASCII characters will be preserved in the output. This is useful when working with multilingual text to maintain readability.
base_url – The base URL of the LLM server.
model – The request model name.
api_key – The request API key.
max_qps – The maximum number of requests per second.
max_retries – The maximum number of retries per request when a request fails. Retries use exponential backoff, up to the specified maximum number of retries.
fallback_response – The response value to return when the LLM request fails. If set to None, the exception will be raised instead.
**kwargs – Keyword arguments to pass to the openai.AsyncClient.chat.completions.create API.
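The way max_retries and fallback_response interact can be pictured with a minimal sketch. This is not xpark's implementation: `call_with_retries`, `make_flaky_request`, and the `base_delay` parameter are hypothetical names introduced for illustration, and the doubling delay is only one common form of exponential backoff.

```python
import asyncio


async def call_with_retries(request, *, max_retries=0, fallback_response="{}", base_delay=0.01):
    """Hypothetical helper: retry `request` with exponential backoff.

    Makes one initial attempt plus up to `max_retries` retries. If every
    attempt fails, returns `fallback_response`, or re-raises the last
    exception when `fallback_response` is None.
    """
    for attempt in range(max_retries + 1):
        try:
            return await request()
        except Exception:
            if attempt == max_retries:
                if fallback_response is None:
                    raise
                return fallback_response
            # Delay doubles after each failed attempt: base, 2*base, 4*base, ...
            await asyncio.sleep(base_delay * 2 ** attempt)


def make_flaky_request(failures):
    """Hypothetical stand-in for an LLM call that fails `failures` times."""
    state = {"calls": 0}

    async def request():
        state["calls"] += 1
        if state["calls"] <= failures:
            raise ConnectionError("simulated failure")
        return '{"person": "John Doe"}'

    return request, state
```

Under these assumptions, a request that fails twice succeeds with max_retries=2 on the third attempt, while a request that never succeeds yields the fallback string, or raises if fallback_response is None.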
Examples
import os

from xpark.dataset.expressions import col
from xpark.dataset import TextExtract, from_items

ds = from_items(["John Doe lives in New York and works for Acme Corp"])

# Plain labels
ds = ds.with_column(
    "extracted_plain",
    TextExtract(
        ["person", "location", "organization"],
        model="deepseek-v3-0324",
        base_url=os.getenv("LLM_ENDPOINT"),
        api_key=os.getenv("LLM_API_KEY"),
    )
    .options(num_workers={"IO": 1}, batch_size=1)
    .with_column(col("item")),
)

# Labels with descriptions
ds = ds.with_column(
    "extracted_with_desc",
    TextExtract(
        [
            {"label": "person", "description": "the person's full name"},
            {"label": "location", "description": "city or country"},
            {"label": "organization"},
        ],
        model="deepseek-v3-0324",
        base_url=os.getenv("LLM_ENDPOINT"),
        api_key=os.getenv("LLM_API_KEY"),
    )
    .options(num_workers={"IO": 1}, batch_size=1)
    .with_column(col("item")),
)

# JSON Schema
ds = ds.with_column(
    "extracted_schema",
    TextExtract(
        {
            "type": "object",
            "properties": {
                "person": {"description": "the person's full name", "type": "string"},
                "location": {"description": "city or country", "type": "string"},
            },
        },
        model="deepseek-v3-0324",
        base_url=os.getenv("LLM_ENDPOINT"),
        api_key=os.getenv("LLM_API_KEY"),
    )
    .options(num_workers={"IO": 1}, batch_size=1)
    .with_column(col("item")),
)

print(ds.take_all())
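Because each extracted value is returned as a JSON string, downstream code will usually parse it before use. The following is a minimal sketch using only the standard library. `parse_extraction` is a hypothetical helper, the sample strings are made up for illustration and are not real model output, and the sketch assumes ensure_ascii behaves like the argument of the same name in `json.dumps`.

```python
import json


def parse_extraction(raw, fallback_response="{}"):
    """Hypothetical helper: parse one TextExtract result string.

    Returns (data, is_fallback). A result equal to the configured
    fallback_response is flagged, although a genuinely empty extraction
    could look identical.
    """
    data = json.loads(raw)
    return data, raw == fallback_response


# Illustrative result strings, not real model output.
ok, is_fallback = parse_extraction('{"person": "José", "location": "São Paulo"}')
failed, failed_is_fallback = parse_extraction("{}")

# Assumed effect of ensure_ascii, shown here with json.dumps:
escaped = json.dumps(ok, ensure_ascii=True)     # non-ASCII written as \uXXXX escapes
preserved = json.dumps(ok, ensure_ascii=False)  # non-ASCII characters kept as-is
```

Both serialized forms decode to the same data, so the choice affects only how readable the raw column is for multilingual text.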
Methods
__call__(texts): Call self as a function.
options(**kwargs)
with_column(texts)
- __call__(texts: pa.ChunkedArray) -> pa.Array
Call self as a function.
- options(**kwargs: Unpack[ExprUDFOptions]) -> Self
- with_column(texts: pa.ChunkedArray) -> pa.Array