Source code for xpark.dataset.namespace_expressions.string_namespace
"""String namespace for expression operations on string-typed columns."""
from __future__ import annotations
from dataclasses import dataclass
from typing import TYPE_CHECKING
from ray.data.namespace_expressions.string_namespace import _StringNamespace as RayStringNamespace
from xpark.dataset.common.special_characters import SPECIAL_CHARACTERS
from xpark.dataset.datatype import DataType
from xpark.dataset.expressions import udf
from xpark.dataset.filters.dedup import utils as dedup_utils
from xpark.dataset.import_utils import lazy_import
if TYPE_CHECKING:
import pyarrow as pa
from ray.data.expressions import UDFExpr
else:
pa = lazy_import("pyarrow", rename="pa")
[docs]
@dataclass
class _StringNamespace(RayStringNamespace):
    """Namespace for string operations on expression columns.

    Extends Ray's ``_StringNamespace`` (imported in this module as
    ``RayStringNamespace``). The methods are reached through the ``.str``
    accessor of an expression, as shown in the example below.

    The methods defined directly on this subclass (``word_count``,
    ``max_line_length``, ``avg_line_length``, ``special_word_count``,
    ``alpha_number_count`` and ``alpha_count``) are implemented as
    ``@udf``-decorated Python functions that build and return a PyArrow array.

    NOTE(review): ``upper``, ``len`` and ``starts_with`` used in the example
    are not defined in this class; presumably they are inherited from the Ray
    base class and backed by PyArrow compute functions -- confirm.

    Example:
        >>> from xpark.dataset.expressions import col
        >>> # Convert to uppercase
        >>> expr = col("name").str.upper()
        >>> # Get string length
        >>> expr = col("name").str.len()
        >>> # Check if string starts with a prefix
        >>> expr = col("name").str.starts_with("A")
    """
[docs]
def word_count(self, tokenizer: str = "cjk") -> UDFExpr:
    """Build an expression that counts the tokens of each text.

    Segmentation is delegated to ``dedup_utils.utf8_split_mixed``; the
    length of every token sequence it returns becomes the count for the
    corresponding entry.

    Args:
        tokenizer: Name of the word-segmentation strategy. ``"cjk"`` (the
            default) is the only value accepted.

    Returns:
        A UDF expression whose result is an ``int32`` array of token counts.

    Raises:
        ValueError: Raised from inside the UDF body (i.e. when the UDF
            runs, not when this method is called) if ``tokenizer`` is not
            ``"cjk"``.

    Examples:
        .. code-block:: python

            from xpark.dataset import from_items
            from xpark.dataset.expressions import col

            ds = from_items(["Hello world", "This is a test"])
            ds = ds.with_column(
                "word_count",
                col("text").str.word_count(tokenizer="cjk"),
            )
            print(ds.take_all())
    """

    @udf(return_dtype=DataType.int32())
    def word_count_wrapped(
        texts: pa.ChunkedArray,
        tokenizer: str,
    ) -> pa.Array:
        # Guard clause: bail out on anything but the single supported tokenizer.
        if tokenizer != "cjk":
            raise ValueError(f"Invalid tokenizer: {tokenizer}")

        segmented = dedup_utils.utf8_split_mixed(texts)
        counts = [len(words) for words in segmented]
        return pa.array(counts, pa.int32())

    return word_count_wrapped(texts=self._expr, tokenizer=tokenizer)
[docs]
def max_line_length(self) -> UDFExpr:
    """Compute the maximum line length for each text.

    Each text is split with ``str.splitlines()`` and the length (in
    characters) of its longest line is returned. A text with no lines, such
    as the empty string, yields ``0``. Null entries stay null in the result
    instead of raising.

    Returns:
        A UDF expression producing an ``int32`` array with one value per
        input text.

    Examples:
        .. code-block:: python

            from xpark.dataset import from_items
            from xpark.dataset.expressions import col

            ds = from_items(["Hello\\nworld", "This is a test"])
            ds = ds.with_column(
                "max_line_length",
                col("text").str.max_line_length(),
            )
            print(ds.take_all())
    """

    @udf(return_dtype=DataType.int32())
    def max_line_length_wrapper(texts: pa.ChunkedArray) -> pa.Array:
        def _max_line_length(text: str | None) -> int | None:
            # ``to_pylist`` turns null entries into ``None``; propagate them
            # as nulls rather than failing on ``None.splitlines()``.
            if text is None:
                return None
            # ``default=0`` covers texts that contain no lines at all.
            return max((len(line) for line in text.splitlines()), default=0)

        return pa.array(
            [_max_line_length(text) for text in texts.to_pylist()],
            pa.int32(),
        )

    return max_line_length_wrapper(texts=self._expr)
[docs]
def avg_line_length(self) -> UDFExpr:
    """Compute the average line length for each text.

    Each text is split with ``str.splitlines()`` and the mean character
    count across the resulting lines is returned. A text with no lines,
    such as the empty string, yields ``0.0``. Null entries stay null in the
    result instead of raising.

    Returns:
        A UDF expression producing a ``float64`` array with one value per
        input text.

    Examples:
        .. code-block:: python

            from xpark.dataset import from_items
            from xpark.dataset.expressions import col

            ds = from_items(["Hello\\nworld", "This is a test"])
            ds = ds.with_column(
                "avg_line_length",
                col("text").str.avg_line_length(),
            )
            print(ds.take_all())
    """

    @udf(return_dtype=DataType.float64())
    def avg_line_length_wrapper(texts: pa.ChunkedArray) -> pa.Array:
        def _avg_line_length(text: str | None) -> float | None:
            # ``to_pylist`` turns null entries into ``None``; propagate them
            # as nulls rather than failing on ``None.splitlines()``.
            if text is None:
                return None
            lines = text.splitlines()
            if not lines:
                # Avoid dividing by zero for texts without any line.
                return 0.0
            return sum(len(line) for line in lines) / len(lines)

        return pa.array(
            [_avg_line_length(text) for text in texts.to_pylist()],
            pa.float64(),
        )

    return avg_line_length_wrapper(texts=self._expr)
[docs]
def special_word_count(self) -> UDFExpr:
    """Count the number of special characters in each text.

    Despite the method name, the count is per character: every character
    of the text that is a member of ``SPECIAL_CHARACTERS`` adds one to the
    result. Null entries stay null in the result instead of raising.

    Returns:
        A UDF expression producing an ``int32`` array with one value per
        input text.

    Examples:
        .. code-block:: python

            from xpark.dataset import from_items
            from xpark.dataset.expressions import col

            ds = from_items(["Hello, world!", "No specials here"])
            ds = ds.with_column(
                "special_word_count",
                col("text").str.special_word_count(),
            )
            print(ds.take_all())
    """

    @udf(return_dtype=DataType.int32())
    def special_word_count_wrapper(texts: pa.ChunkedArray) -> pa.Array:
        # ``to_pylist`` turns null entries into ``None``; keep them null.
        # Counting with a generator avoids building a throwaway list per text.
        counts = [
            None if text is None else sum(1 for c in text if c in SPECIAL_CHARACTERS)
            for text in texts.to_pylist()
        ]
        return pa.array(counts, pa.int32())

    return special_word_count_wrapper(texts=self._expr)
[docs]
def alpha_number_count(self) -> UDFExpr:
    """Count the number of alphanumeric characters in each text.

    A character is counted when ``str.isalnum()`` is true for it, i.e. it
    is a letter or a digit (Unicode letters and digits included). Null
    entries stay null in the result instead of raising.

    Returns:
        A UDF expression producing an ``int32`` array with one value per
        input text.

    Examples:
        .. code-block:: python

            from xpark.dataset import from_items
            from xpark.dataset.expressions import col

            ds = from_items(["Hello, world! 123", "abc"])
            ds = ds.with_column(
                "alpha_number_count",
                col("text").str.alpha_number_count(),
            )
            print(ds.take_all())
    """

    @udf(return_dtype=DataType.int32())
    def alpha_number_count_wrapper(texts: pa.ChunkedArray) -> pa.Array:
        # ``to_pylist`` turns null entries into ``None``; keep them null
        # rather than failing when iterating over ``None``.
        counts = [
            None if text is None else sum(1 for c in text if c.isalnum())
            for text in texts.to_pylist()
        ]
        return pa.array(counts, pa.int32())

    return alpha_number_count_wrapper(texts=self._expr)
[docs]
def alpha_count(self) -> UDFExpr:
    """Count the number of alphabetic characters in each text.

    A character is counted when ``str.isalpha()`` is true for it, i.e. it
    is a letter (Unicode letters included); digits and other symbols are
    not counted. Null entries stay null in the result instead of raising.

    Returns:
        A UDF expression producing an ``int32`` array with one value per
        input text.

    Examples:
        .. code-block:: python

            from xpark.dataset import from_items
            from xpark.dataset.expressions import col

            ds = from_items(["Hello, world! 123", "abc"])
            ds = ds.with_column(
                "alpha_count",
                col("text").str.alpha_count(),
            )
            print(ds.take_all())
    """

    @udf(return_dtype=DataType.int32())
    def alpha_count_wrapper(texts: pa.ChunkedArray) -> pa.Array:
        # ``to_pylist`` turns null entries into ``None``; keep them null
        # rather than failing when iterating over ``None``.
        counts = [
            None if text is None else sum(1 for c in text if c.isalpha())
            for text in texts.to_pylist()
        ]
        return pa.array(counts, pa.int32())

    return alpha_count_wrapper(texts=self._expr)