xpark.dataset.TextMask

class xpark.dataset.TextMask(labels: list[str | dict[str, str]], /, *, base_url: str, model: str, api_key: str = 'NOT_SET', max_qps: int | None = None, max_retries: int = 0, fallback_response: str | None = None, **kwargs: dict[str, Any])

The TextMask processor replaces sensitive information in the input text with [MASKED], according to the given labels.

Parameters:
  • labels

    The labels to mask. Accepts two formats:

    • list[str]: plain label names, e.g. ["email", "phone_num"]

    • list[dict]: dicts with "label" (required) and "description" (optional), e.g. [{"label": "email", "description": "email address"}]

    Descriptions are injected into the prompt to guide the model when label names alone are ambiguous.

  • base_url – The base URL of the LLM server.

  • model – The name of the model to request.

  • api_key – The API key sent with each request.

  • max_qps – The maximum number of requests per second.

  • max_retries – The maximum number of retries per request when a request fails. Retries use exponential backoff, up to this maximum.

  • fallback_response – The response value to return when the LLM request fails. If set to None, the exception will be raised instead.

  • **kwargs – Keyword arguments to pass to the openai.AsyncClient.chat.completions.create API.
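The interaction between max_retries and fallback_response can be sketched in plain Python. This is only an illustration of the behavior described above, not xpark's implementation: the helper name call_with_retries, the base_delay value, and the injectable sleep argument are all assumptions made for the example, and the actual backoff schedule is not documented here.

```python
import time
from typing import Callable, Optional


def call_with_retries(
    request: Callable[[], str],
    max_retries: int = 0,
    fallback_response: Optional[str] = None,
    base_delay: float = 0.5,  # assumed value; the real delay is not documented
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Call `request` once, then retry up to `max_retries` more times."""
    if max_retries < 0:
        raise ValueError("max_retries must be >= 0")
    for attempt in range(max_retries + 1):
        try:
            return request()
        except Exception:
            if attempt == max_retries:
                # Out of attempts: use the fallback if one is set,
                # otherwise let the original exception propagate.
                if fallback_response is not None:
                    return fallback_response
                raise
            # Exponential backoff: base_delay, 2 * base_delay, 4 * base_delay, ...
            sleep(base_delay * 2 ** attempt)
    raise AssertionError("unreachable")
```

With max_retries=0 and fallback_response=None (the defaults in the signature), the first failure is raised immediately, which matches the description of fallback_response above.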

Examples

import os

from xpark.dataset.expressions import col
from xpark.dataset import TextMask, from_items

ds = from_items(["My email is rarity@example.com and my phone is 123-456-7890"])

# Plain labels
ds = ds.with_column(
    "masked_text",
    TextMask(
        ["email", "phone_num"],
        model="deepseek-v3-0324",
        base_url=os.getenv("LLM_ENDPOINT"),
        api_key=os.getenv("LLM_API_KEY"),
    )
    .options(num_workers={"IO": 1}, batch_size=1)
    .with_column(col("item")),
)

# Labels with descriptions
ds = ds.with_column(
    "masked_text",
    TextMask(
        [
            {"label": "email", "description": "email address"},
            {"label": "phone_num", "description": "phone number"},
        ],
        model="deepseek-v3-0324",
        base_url=os.getenv("LLM_ENDPOINT"),
        api_key=os.getenv("LLM_API_KEY"),
    )
    .options(num_workers={"IO": 1}, batch_size=1)
    .with_column(col("item")),
)

print(ds.take_all())
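The two label formats used in the examples can be reduced to a single shape before they are placed in a prompt. The following is a minimal sketch under stated assumptions: normalize_labels and describe_labels are hypothetical helpers, and the bullet-style prompt layout is invented for illustration. How xpark actually builds its prompt is not shown in this page.

```python
from typing import Union

Label = Union[str, dict]


def normalize_labels(labels: list) -> list:
    """Turn plain names and label dicts into uniform {"label", "description"} dicts."""
    normalized = []
    for entry in labels:
        if isinstance(entry, str):
            normalized.append({"label": entry, "description": ""})
        elif isinstance(entry, dict):
            if "label" not in entry:
                raise ValueError('label dicts require a "label" key')
            normalized.append(
                {
                    "label": entry["label"],
                    # "description" is optional, so default to an empty string.
                    "description": entry.get("description", ""),
                }
            )
        else:
            raise TypeError(f"unsupported label type: {type(entry).__name__}")
    return normalized


def describe_labels(labels: list) -> str:
    """Render labels as prompt lines (hypothetical layout)."""
    lines = []
    for item in normalize_labels(labels):
        if item["description"]:
            lines.append(f"- {item['label']}: {item['description']}")
        else:
            lines.append(f"- {item['label']}")
    return "\n".join(lines)
```

Mixing both formats in one list works in this sketch, for example describe_labels(["email", {"label": "phone_num", "description": "phone number"}]); whether TextMask itself accepts mixed lists is not stated in the source.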

Methods

__call__(texts)

Call self as a function.

options(**kwargs)

with_column(texts)

__call__(texts: pa.ChunkedArray) → pa.Array

Call self as a function.

options(**kwargs: Unpack[ExprUDFOptions]) → Self

with_column(texts: pa.ChunkedArray) → pa.Array