xpark.dataset.TextExactSubstringDedup#

class xpark.dataset.TextExactSubstringDedup(tokenize_regex_pattern: str | None = None, cjk: bool = True, min_match_tokens: int = 200, union_parallel_num: int | None = None, dedup_parallel_num: int | None = None, union_batch_size: int = 256)[source]#

Implements a scalable, distributed exact substring deduplication algorithm using a divide-and-conquer strategy.

Designed for large-scale text datasets, this class operates on the assumption that duplicated segments constitute a small fraction of the total volume. It employs a two-stage process: coarse-grained clustering followed by fine-grained deduplication.

Algorithm Workflow:

  1. Coarse-grained Filtering: Generates document fingerprints using x-gram shingling (where x = min_match_tokens // 2), Winnowing (Local Min-Hash), and Karp-Rabin hashing. Documents sharing identical fingerprints are treated as connected nodes in a graph.

  2. Connectivity Analysis: Uses a Distributed Union-Find (BTS) to identify Connected Components (subgraphs) of related documents.

     • Isolated Nodes: Documents with no overlaps are passed through immediately to the main output stream.

     • Connected Components: Documents sharing substrings are routed to distributed Actors for buffering.

  3. Fine-grained Deduplication: Within each Actor, a Suffix Array (SA) is constructed for the buffered document cluster. Exact substring deduplication is performed by analyzing the Longest Common Prefix (LCP) array to identify and remove redundant segments.

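The step-1 fingerprinting above can be pictured with a minimal single-machine sketch. The helper below is hypothetical (not part of the xpark API), and the hash base/modulus are assumed values chosen only for illustration; it builds x-grams with x = min_match_tokens // 2, hashes them with a Karp-Rabin rolling hash, and winnows by keeping the minimum hash in each sliding window.

```python
# Hypothetical single-machine sketch of step 1 (not the xpark API):
# x-gram shingling, Karp-Rabin rolling hashes, and winnowing.

def fingerprints(tokens, min_match_tokens=200, window=50):
    """Return the winnowed Karp-Rabin hashes of all x-grams of `tokens`."""
    x = min_match_tokens // 2               # shingle length, per the class docs
    base, mod = 1_000_003, (1 << 61) - 1    # hash parameters (assumed)
    ids = [hash(t) % mod for t in tokens]
    if len(ids) < x:
        return set()
    high = pow(base, x, mod)
    h, hashes = 0, []
    for i, v in enumerate(ids):
        h = (h * base + v) % mod            # extend the rolling hash
        if i >= x:
            h = (h - ids[i - x] * high) % mod   # drop the oldest token
        if i >= x - 1:
            hashes.append(h)                # hash of the x-gram ending at i
    # Winnowing: keep only the minimum hash of each sliding window, so
    # documents sharing a long enough token run share a fingerprint.
    return {min(hashes[i:i + window])
            for i in range(max(len(hashes) - window + 1, 1))}
```

Two documents sharing a sufficiently long token run are then guaranteed to share at least one fingerprint, which is what links them as nodes in the graph.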
Parameters:
  • tokenize_regex_pattern – Regex pattern used to tokenize the text. If None, tokens are split on whitespace.

  • cjk – Whether to apply CJK-aware splitting; only effective when tokenize_regex_pattern is None.

  • min_match_tokens – Minimum number of tokens a segment must share to count as a duplicate match.

  • union_parallel_num – Number of union-find workers to use for parallelization.

  • dedup_parallel_num – Number of dedup actors to use for parallelization.

  • union_batch_size – Batch size for union-find operations.
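The connectivity analysis that union_parallel_num and union_batch_size parallelize can be illustrated with a single-process union-find (a sketch only; the real implementation distributes and batches this work across workers):

```python
# Single-process union-find sketch of step 2; the distributed version
# shards this structure across union_parallel_num workers.

class UnionFind:
    def __init__(self):
        self.parent = {}

    def find(self, x):
        self.parent.setdefault(x, x)
        while self.parent[x] != x:
            self.parent[x] = self.parent[self.parent[x]]  # path halving
            x = self.parent[x]
        return x

    def union(self, a, b):
        ra, rb = self.find(a), self.find(b)
        if ra != rb:
            self.parent[rb] = ra

# Each group below stands for doc uids that shared a fingerprint in step 1.
uf = UnionFind()
for group in [[1, 2], [2, 3], [4]]:
    for other in group[1:]:
        uf.union(group[0], other)

# Docs 1-3 now form one connected component (routed to a dedup actor),
# while doc 4 is an isolated node (passed straight to the output stream).
```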

Example

>>> from xpark.dataset import read_parquet, TextExactSubstringDedup
>>> from xpark.dataset.expressions import col
>>> ds = read_parquet("data/", dynamic_uid="uid")
>>> dedup_op = TextExactSubstringDedup()
>>> ds_isolated = ds.filter(dedup_op.with_column(uid=col("uid"), text=col("text")))
>>> ds_isolated.write_parquet("data_deduped/")
>>> ds_connected_deduped = dedup_op.collect_sa_deduped_dataset()
>>> ds_connected_deduped.write_parquet("data_deduped/")

Methods

collect_sa_deduped_dataset()

Collects the 'Connected Components' dataset from distributed actors following Suffix Array (SA) deduplication.

with_column(uid, text)

Apply the TextExactSubstringDedup filter with specified columns.

collect_sa_deduped_dataset() → Dataset[source]#

Collects the ‘Connected Components’ dataset from distributed actors following Suffix Array (SA) deduplication. This method retrieves documents that were flagged as potential duplicates and offloaded to actors for exact deduplication.
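The suffix-array step the actors perform can be sketched naively: build the suffix array, then scan the LCP values of lexicographically adjacent suffixes to locate repeated spans. This is an illustration-only O(n² log n) version; production code would use a linear-time suffix array construction.

```python
# Illustration-only suffix-array + LCP scan for the actor-side dedup step.

def duplicate_spans(text, min_match=4):
    """Yield (start, length) spans whose text repeats elsewhere in `text`."""
    sa = sorted(range(len(text)), key=lambda i: text[i:])  # naive suffix array
    for a, b in zip(sa, sa[1:]):
        # LCP of lexicographically adjacent suffixes: any substring occurring
        # twice shows up as a long common prefix of some adjacent pair.
        lcp = 0
        while (a + lcp < len(text) and b + lcp < len(text)
               and text[a + lcp] == text[b + lcp]):
            lcp += 1
        if lcp >= min_match:
            yield (max(a, b), lcp)   # the later occurrence is redundant
```

For example, `list(duplicate_spans("abcdefXabcdefY"))` yields `[(7, 6), (8, 5), (9, 4)]`, flagging the second "abcdef" (and its sub-runs) as removable.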

with_column(uid: ColumnExpr, text: ColumnExpr) → TextExactSubstringDedupOp[source]#

Apply the TextExactSubstringDedup filter with specified columns.

Parameters:
  • uid – Column expression for unique identifiers. Should be integer type. Use dynamic_uid when reading data to auto-generate unique IDs.

  • text – Column expression for text content to deduplicate.

Returns:

A deduplication operation that can be used with filter().

Return type:

TextExactSubstringDedupOp

Example

>>> from xpark.dataset import read_parquet, TextExactSubstringDedup
>>> from xpark.dataset.expressions import col
>>> ds = read_parquet("data/", dynamic_uid="uid")
>>> dedup_op = TextExactSubstringDedup()
>>> ds_isolated = ds.filter(dedup_op.with_column(uid=col("uid"), text=col("text")))
>>> ds_isolated.write_parquet("data_deduped/")
>>> ds_connected_deduped = dedup_op.collect_sa_deduped_dataset()
>>> ds_connected_deduped.write_parquet("data_deduped/")