Source code for xpark.dataset.namespace_expressions.string_namespace

"""String namespace for expression operations on string-typed columns."""

from __future__ import annotations

from dataclasses import dataclass
from typing import TYPE_CHECKING

from ray.data.namespace_expressions.string_namespace import _StringNamespace as RayStringNamespace

from xpark.dataset.common.special_characters import SPECIAL_CHARACTERS
from xpark.dataset.datatype import DataType
from xpark.dataset.expressions import udf
from xpark.dataset.filters.dedup import utils as dedup_utils
from xpark.dataset.import_utils import lazy_import

if TYPE_CHECKING:
    import pyarrow as pa
    from ray.data.expressions import UDFExpr
else:
    pa = lazy_import("pyarrow", rename="pa")


def _map_texts(texts: pa.ChunkedArray, fn, arrow_type: pa.DataType) -> pa.Array:
    """Apply ``fn`` to every non-null string in ``texts``.

    Null rows are passed through as nulls instead of being handed to ``fn``,
    so per-row string logic never has to deal with ``None``.

    Args:
        texts: Column of strings for the current batch.
        fn: Callable taking one ``str`` and returning the per-row value.
        arrow_type: Arrow type of the resulting array.

    Returns:
        An Arrow array with one entry per input row.
    """
    return pa.array(
        [None if text is None else fn(text) for text in texts.to_pylist()],
        arrow_type,
    )


@dataclass
class _StringNamespace(RayStringNamespace):
    """Namespace for string operations on expression columns.

    Extends the Ray Data string namespace with text-statistics helpers
    (word, line and character counts). The helpers defined here are
    implemented as UDFs that process each batch row by row in Python.

    Example:
        >>> from xpark.dataset.expressions import col
        >>> # Convert to uppercase
        >>> expr = col("name").str.upper()
        >>> # Get string length
        >>> expr = col("name").str.len()
        >>> # Check if string starts with a prefix
        >>> expr = col("name").str.starts_with("A")
    """

    def word_count(self, tokenizer: str = "cjk") -> UDFExpr:
        """Count words in texts using the specified tokenizer.

        Args:
            tokenizer: The tokenizer type to use for word segmentation.
                ``"cjk"`` (the default) is currently the only supported
                value; it tokenizes with ``dedup_utils.utf8_split_mixed``.

        Returns:
            An expression yielding the ``int32`` word count of each row.

        Raises:
            ValueError: If ``tokenizer`` is not a supported tokenizer. The
                check runs when the expression is built, so a typo fails
                immediately rather than later on a worker.

        Examples:
            .. code-block:: python

                from xpark.dataset import from_items
                from xpark.dataset.expressions import col

                ds = from_items([{"text": "Hello world"}, {"text": "This is a test"}])
                ds = ds.with_column(
                    "word_count",
                    col("text").str.word_count(tokenizer="cjk"),
                )
                print(ds.take_all())
        """
        # Fail fast: validate before any work is scheduled.
        if tokenizer != "cjk":
            raise ValueError(f"Invalid tokenizer: {tokenizer}")

        @udf(return_dtype=DataType.int32())
        def word_count_wrapped(
            texts: pa.ChunkedArray,
            tokenizer: str,
        ) -> pa.Array:
            # NOTE(review): null handling is whatever utf8_split_mixed does;
            # its behavior on null rows is not visible from this module.
            if tokenizer == "cjk":
                tokens = dedup_utils.utf8_split_mixed(texts)
            else:
                raise ValueError(f"Invalid tokenizer: {tokenizer}")
            return pa.array([len(words) for words in tokens], pa.int32())

        return word_count_wrapped(texts=self._expr, tokenizer=tokenizer)

    def max_line_length(self) -> UDFExpr:
        """Compute the maximum line length for each text.

        Each text is split with ``str.splitlines()`` (which recognizes
        ``\\n``, ``\\r\\n`` and the other universal line boundaries) and the
        length, in characters, of the longest line is returned. A text with
        no lines (the empty string) yields ``0``; a null row yields null.

        Returns:
            An expression yielding an ``int32`` per row.

        Examples:
            .. code-block:: python

                from xpark.dataset import from_items
                from xpark.dataset.expressions import col

                ds = from_items([{"text": "Hello\\nworld"}, {"text": "This is a test"}])
                ds = ds.with_column(
                    "max_line_length",
                    col("text").str.max_line_length(),
                )
                print(ds.take_all())
        """

        @udf(return_dtype=DataType.int32())
        def max_line_length_wrapper(texts: pa.ChunkedArray) -> pa.Array:
            return _map_texts(
                texts,
                lambda text: max(map(len, text.splitlines()), default=0),
                pa.int32(),
            )

        return max_line_length_wrapper(texts=self._expr)

    def avg_line_length(self) -> UDFExpr:
        """Compute the average line length for each text.

        Each text is split with ``str.splitlines()`` and the mean number of
        characters per line is returned. A text with no lines (the empty
        string) yields ``0.0``; a null row yields null.

        Returns:
            An expression yielding a ``float64`` per row.

        Examples:
            .. code-block:: python

                from xpark.dataset import from_items
                from xpark.dataset.expressions import col

                ds = from_items([{"text": "Hello\\nworld"}, {"text": "This is a test"}])
                ds = ds.with_column(
                    "avg_line_length",
                    col("text").str.avg_line_length(),
                )
                print(ds.take_all())
        """

        @udf(return_dtype=DataType.float64())
        def avg_line_length_wrapper(texts: pa.ChunkedArray) -> pa.Array:
            def _avg_line_length(text: str) -> float:
                lines = text.splitlines()
                if not lines:
                    # Avoid dividing by zero for empty texts.
                    return 0.0
                return sum(map(len, lines)) / len(lines)

            return _map_texts(texts, _avg_line_length, pa.float64())

        return avg_line_length_wrapper(texts=self._expr)

    def special_word_count(self) -> UDFExpr:
        """Count the number of special characters in each text.

        Despite the method name, the unit counted is individual characters:
        every character of the text that is a member of the predefined
        ``SPECIAL_CHARACTERS`` collection adds one to the count. A null row
        yields null.

        Returns:
            An expression yielding an ``int32`` per row.

        Examples:
            .. code-block:: python

                from xpark.dataset import from_items
                from xpark.dataset.expressions import col

                ds = from_items([{"text": "Hello, world!"}, {"text": "No specials here"}])
                ds = ds.with_column(
                    "special_word_count",
                    col("text").str.special_word_count(),
                )
                print(ds.take_all())
        """

        @udf(return_dtype=DataType.int32())
        def special_word_count_wrapper(texts: pa.ChunkedArray) -> pa.Array:
            return _map_texts(
                texts,
                lambda text: sum(1 for c in text if c in SPECIAL_CHARACTERS),
                pa.int32(),
            )

        return special_word_count_wrapper(texts=self._expr)

    def alpha_number_count(self) -> UDFExpr:
        """Count the number of alphanumeric characters in each text.

        Uses ``str.isalnum()`` on each character, so letters and digits
        (including Unicode alphanumeric characters) are counted. A null row
        yields null.

        Returns:
            An expression yielding an ``int32`` per row.

        Examples:
            .. code-block:: python

                from xpark.dataset import from_items
                from xpark.dataset.expressions import col

                ds = from_items([{"text": "Hello, world! 123"}, {"text": "abc"}])
                ds = ds.with_column(
                    "alpha_number_count",
                    col("text").str.alpha_number_count(),
                )
                print(ds.take_all())
        """

        @udf(return_dtype=DataType.int32())
        def alpha_number_count_wrapper(texts: pa.ChunkedArray) -> pa.Array:
            # Summing booleans counts the characters for which isalnum() holds.
            return _map_texts(
                texts,
                lambda text: sum(map(str.isalnum, text)),
                pa.int32(),
            )

        return alpha_number_count_wrapper(texts=self._expr)

    def alpha_count(self) -> UDFExpr:
        """Count the number of alphabetic characters in each text.

        Uses ``str.isalpha()`` on each character, so letters (including
        Unicode alphabetic characters) are counted while digits and other
        symbols are not. A null row yields null.

        Returns:
            An expression yielding an ``int32`` per row.

        Examples:
            .. code-block:: python

                from xpark.dataset import from_items
                from xpark.dataset.expressions import col

                ds = from_items([{"text": "Hello, world! 123"}, {"text": "abc"}])
                ds = ds.with_column(
                    "alpha_count",
                    col("text").str.alpha_count(),
                )
                print(ds.take_all())
        """

        @udf(return_dtype=DataType.int32())
        def alpha_count_wrapper(texts: pa.ChunkedArray) -> pa.Array:
            # Summing booleans counts the characters for which isalpha() holds.
            return _map_texts(
                texts,
                lambda text: sum(map(str.isalpha, text)),
                pa.int32(),
            )

        return alpha_count_wrapper(texts=self._expr)