xpark.dataset.TextExactSubstringDedup
- class xpark.dataset.TextExactSubstringDedup(tokenize_regex_pattern: str | None = None, cjk: bool = True, min_match_tokens: int = 200, union_parallel_num: int | None = None, dedup_parallel_num: int | None = None, union_batch_size: int = 256)[source]
Implements a scalable, distributed exact substring deduplication algorithm using a divide-and-conquer strategy.
Designed for large-scale text datasets, this class operates on the assumption that duplicated segments constitute a small fraction of the total volume. It employs a two-stage process: coarse-grained clustering followed by fine-grained deduplication.
Algorithm Workflow:
Coarse-grained Filtering: Generates document fingerprints using x-gram shingling (where x = min_match_tokens // 2), Winnowing (local min-hashing), and Karp-Rabin hashing. Documents sharing an identical fingerprint are treated as connected nodes in a graph.
Connectivity Analysis: Uses a distributed Union-Find (BTS) to identify Connected Components (subgraphs) of related documents.
- Isolated Nodes: Documents with no overlaps are passed through immediately to the main output stream.
- Connected Components: Documents sharing substrings are routed to distributed actors for buffering.
Fine-grained Deduplication: Within each actor, a Suffix Array (SA) is constructed for the buffered document cluster. Exact substring deduplication is then performed by analyzing the Longest Common Prefix (LCP) array to identify and remove redundant segments.
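The coarse-grained fingerprinting step can be sketched in plain Python. This is an illustration only, not the xpark implementation: Python's built-in `hash` stands in for Karp-Rabin rolling hashes, and the `window` parameter is a hypothetical knob for the winnowing window, not an xpark option.

```python
def fingerprints(tokens, x, window=4):
    """Toy winnowing fingerprinter: hash every x-gram, then keep the
    minimum hash in each sliding window of consecutive x-gram hashes.
    Documents sharing any selected fingerprint become candidate
    duplicates. Sketch only; the real algorithm uses Karp-Rabin hashing.
    """
    grams = [tuple(tokens[i:i + x]) for i in range(len(tokens) - x + 1)]
    hashes = [hash(g) for g in grams]
    selected = set()
    for i in range(max(1, len(hashes) - window + 1)):
        # Winnowing: one representative (the minimum) per window.
        selected.add(min(hashes[i:i + window]))
    return selected

a = "alpha beta gamma delta epsilon zeta eta".split()
b = "pre alpha beta gamma delta epsilon zeta post".split()
# The two documents share a six-token run, so they share a fingerprint.
print(bool(fingerprints(a, 3, 2) & fingerprints(b, 3, 2)))  # True
```

Winnowing guarantees that any shared run of at least `window + x - 1` tokens produces at least one common fingerprint, while selecting far fewer hashes than full shingling.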
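The connectivity-analysis step amounts to computing connected components with union-find. A minimal single-process sketch (not the distributed BTS union-find xpark uses) of how documents are split into an isolated stream and buffered clusters:

```python
def connected_components(num_docs, fingerprint_pairs):
    """Toy union-find over documents linked by a shared fingerprint.

    Returns (isolated, clusters): isolated documents stream straight to
    the output; each multi-document cluster is buffered for suffix-array
    deduplication. Sketch only, not xpark's distributed implementation.
    """
    parent = list(range(num_docs))

    def find(x):
        while parent[x] != x:
            parent[x] = parent[parent[x]]  # path halving
            x = parent[x]
        return x

    for a, b in fingerprint_pairs:
        parent[find(a)] = find(b)

    groups = {}
    for doc in range(num_docs):
        groups.setdefault(find(doc), []).append(doc)
    isolated = [g[0] for g in groups.values() if len(g) == 1]
    clusters = [g for g in groups.values() if len(g) > 1]
    return isolated, clusters

iso, clus = connected_components(5, [(0, 3), (3, 4)])
print(iso, clus)  # [1, 2] [[0, 3, 4]]
```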
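The fine-grained step can likewise be illustrated with a toy sketch: build a suffix array over a cluster's token stream, walk lexicographically adjacent suffixes, and flag any common prefix that meets the minimum match length. This is plain Python with naive O(n² log n) construction, not xpark's actor-side implementation.

```python
def duplicate_spans(tokens, min_match_tokens):
    """Toy exact-substring detector: suffix array + LCP over a token list.

    Returns (start, length) pairs for repeated spans of at least
    min_match_tokens tokens. Naive construction for clarity; real
    implementations use linear-time suffix-array algorithms.
    """
    n = len(tokens)
    # Suffix array: start indices sorted by the suffix they begin.
    sa = sorted(range(n), key=lambda i: tokens[i:])
    spans = []
    for a, b in zip(sa, sa[1:]):
        # LCP of two lexicographically adjacent suffixes.
        lcp = 0
        while a + lcp < n and b + lcp < n and tokens[a + lcp] == tokens[b + lcp]:
            lcp += 1
        if lcp >= min_match_tokens:
            # The later occurrence is the redundant copy to remove.
            spans.append((max(a, b), lcp))
    return spans

tokens = "the quick fox jumps over the quick fox again".split()
print(duplicate_spans(tokens, 3))  # [(5, 3)] – "the quick fox" repeats
```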
- Parameters:
tokenize_regex_pattern – Regex used to tokenize the text; if None, the text is split on whitespace
cjk – Whether to apply CJK-aware splitting; only effective when tokenize_regex_pattern is None
min_match_tokens – Minimum number of tokens a repeated span must contain to be treated as a duplicate
union_parallel_num – Number of union-find workers to use for parallelization
dedup_parallel_num – Number of dedup actors to use for parallelization
union_batch_size – Batch size for union-find operations
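The tokenization parameters can be illustrated with a small sketch. This is plain Python assuming `re.findall` semantics for the regex path; the actual xpark tokenizer (including its CJK handling) may behave differently.

```python
import re

def tokenize(text, tokenize_regex_pattern=None):
    """Sketch of the documented tokenization choice: whitespace split by
    default, otherwise tokens matched by the supplied regex (assumed
    re.findall semantics; an illustration, not xpark's tokenizer)."""
    if tokenize_regex_pattern is None:
        return text.split()
    return re.findall(tokenize_regex_pattern, text)

print(tokenize("exact substring dedup"))      # ['exact', 'substring', 'dedup']
print(tokenize("foo_bar-baz", r"[a-z]+"))     # ['foo', 'bar', 'baz']
```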
Example
>>> from xpark.dataset import read_parquet, TextExactSubstringDedup
>>> from xpark.dataset.expressions import col
>>> ds = read_parquet("data/", dynamic_uid="uid")
>>> dedup_op = TextExactSubstringDedup()
>>> ds_isolated = ds.filter(dedup_op.with_column(uid=col("uid"), text=col("text")))
>>> ds_isolated.write_parquet("data_deduped/")
>>> ds_connected_deduped = dedup_op.collect_sa_deduped_dataset()
>>> ds_connected_deduped.write_parquet("data_deduped/")
Methods
collect_sa_deduped_dataset(): Collects the 'Connected Components' dataset from distributed actors following Suffix Array (SA) deduplication.
with_column(uid, text): Apply the TextExactSubstringDedup filter with specified columns.
- collect_sa_deduped_dataset() → Dataset[source]
Collects the ‘Connected Components’ dataset from distributed actors following Suffix Array (SA) deduplication. This method retrieves documents that were flagged as potential duplicates and offloaded to actors for exact deduplication.
- with_column(uid: ColumnExpr, text: ColumnExpr) → TextExactSubstringDedupOp[source]
Apply the TextExactSubstringDedup filter with specified columns.
- Parameters:
uid – Column expression for unique identifiers. Should be integer type. Use dynamic_uid when reading data to auto-generate unique IDs.
text – Column expression for text content to deduplicate.
- Returns:
A deduplication operation that can be used with filter().
- Return type:
TextExactSubstringDedupOp
Example
>>> from xpark.dataset import read_parquet, TextExactSubstringDedup
>>> from xpark.dataset.expressions import col
>>> ds = read_parquet("data/", dynamic_uid="uid")
>>> dedup_op = TextExactSubstringDedup()
>>> ds_isolated = ds.filter(dedup_op.with_column(uid=col("uid"), text=col("text")))
>>> ds_isolated.write_parquet("data_deduped/")
>>> ds_connected_deduped = dedup_op.collect_sa_deduped_dataset()
>>> ds_connected_deduped.write_parquet("data_deduped/")