Large-Scale Text Data Deduplication#

Background#

Deduplicating training corpora is essential when training large language models (LLMs). Xpark provides high-performance deduplication capabilities, including the industry-first distributed implementations described below.

Dedup Operations#

  • MinHash Dedup: A fuzzy deduplication method based on MinHash LSH, suitable for near-duplicate detection at scale.

  • Exact Substring Dedup: A distributed exact substring deduplication method that identifies and removes documents sharing long common substrings. For more details, refer to the paper Deduplicating Training Data Makes Language Models Better.
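To build intuition for the MinHash idea behind fuzzy dedup, here is a minimal, self-contained sketch — not Xpark's implementation, and `shingles`/`minhash_signature` are illustrative names, not Xpark APIs — that estimates the Jaccard similarity of two documents from their MinHash signatures:

```python
import hashlib

def shingles(text: str, k: int = 5) -> set:
    """k-character shingles of the lowercased text."""
    text = text.lower()
    return {text[i:i + k] for i in range(len(text) - k + 1)}

def minhash_signature(items: set, num_perm: int = 128) -> list:
    """One seeded hash per slot; keep the minimum hash value per slot.
    (Seeding by string prefix is a simplification of true permutations.)"""
    sig = []
    for seed in range(num_perm):
        sig.append(min(
            int.from_bytes(
                hashlib.blake2b((str(seed) + s).encode(), digest_size=8).digest(),
                "big",
            )
            for s in items
        ))
    return sig

def estimated_jaccard(sig_a, sig_b) -> float:
    """Fraction of matching slots estimates Jaccard similarity of the sets."""
    return sum(x == y for x, y in zip(sig_a, sig_b)) / len(sig_a)

a = shingles("the quick brown fox jumps over the lazy dog")
b = shingles("the quick brown fox jumped over the lazy dog")
sim = estimated_jaccard(minhash_signature(a), minhash_signature(b))
```

Because near-duplicate documents share most of their shingles, their signatures agree in most slots — this is what MinHash LSH exploits to bucket likely duplicates without all-pairs comparison.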

Why Use Xpark#

  • MinHash Dedup: deduplicates 8.5× faster than Data-Juicer (v1.4.3)

  • Exact Substring Dedup: Xpark is the first in the industry to provide a distributed implementation of this operator; other open-source projects support only single-machine execution. It deduplicates 1.2 TB of data in 2,813 seconds on a 3-node cluster.

Text Fuzzy Dedup#

The following example demonstrates how to perform fuzzy deduplication using Xpark:

from xpark.dataset import TextFuzzyDedup, read_parquet
from xpark.dataset.expressions import col

ds = read_parquet("/path/to/parquets/", dynamic_uid="uid")
ds.filter(
    TextFuzzyDedup().with_column(uid_column=col("uid"), text_column=col("text"))
).drop_columns("uid").write_parquet("/path/to/output/deduped_parquets/")

For additional parameters supported by Text Fuzzy Dedup, see the documentation of xpark.dataset.filters.dedup.text_fuzzy_dedup.TextFuzzyDedup.

Benchmark Results#

Xpark significantly improves deduplication efficiency through several optimizations:

  • a C++-based BTS implementation

  • unique UID retrieval based on Parquet metadata (eliminating one extra data read pass)

  • a Bloom-filter-based pre-filter to skip redundant documents
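As a rough illustration of the Bloom-filter optimization above, here is a minimal standalone Bloom filter — an assumed structure for illustration only; Xpark's internal filter may differ. Membership tests can return false positives but never false negatives, so items reported as unseen can safely skip a lookup in the full dedup index:

```python
import hashlib

class BloomFilter:
    """Minimal Bloom filter: k hash functions over an m-bit array.
    False positives are possible; false negatives are not."""

    def __init__(self, m_bits: int = 1 << 20, k: int = 4):
        self.m = m_bits
        self.k = k
        self.bits = bytearray(m_bits // 8)

    def _positions(self, item: str):
        # Derive k independent bit positions via blake2b personalization.
        for i in range(self.k):
            h = hashlib.blake2b(
                item.encode(), digest_size=8, person=i.to_bytes(8, "little")
            ).digest()
            yield int.from_bytes(h, "big") % self.m

    def add(self, item: str):
        for p in self._positions(item):
            self.bits[p // 8] |= 1 << (p % 8)

    def might_contain(self, item: str) -> bool:
        # True means "possibly seen"; False means "definitely unseen".
        return all(self.bits[p // 8] & (1 << (p % 8)) for p in self._positions(item))
```

In a dedup pipeline, a document whose key fails `might_contain` is guaranteed new and can bypass the expensive duplicate check entirely.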

The benchmark was conducted under the following environment:

Benchmark parameters:

  • shingling window size: 5

  • lowercase: True

  • jaccard threshold: 0.7

  • minhash num perm: 256

  • minhash seed: 42
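With these parameters, a MinHash LSH implementation typically splits the 256-slot signature into b bands of r rows and buckets documents by band hash; the midpoint of the resulting S-curve is roughly (1/b)^(1/r). The snippet below illustrates the common banding heuristic — not necessarily Xpark's exact scheme — picking the split whose midpoint is closest to the 0.7 Jaccard threshold:

```python
# Choose bands b and rows r with b * r = 256 so the LSH S-curve
# midpoint (1/b) ** (1/r) lands closest to the 0.7 Jaccard threshold.
num_perm, threshold = 256, 0.7
b, r = min(
    ((cand, num_perm // cand) for cand in range(1, num_perm + 1) if num_perm % cand == 0),
    key=lambda br: abs((1 / br[0]) ** (1 / br[1]) - threshold),
)
midpoint = (1 / b) ** (1 / r)
```

For 256 permutations this selects 32 bands of 8 rows, with a midpoint near 0.65 — pairs above the 0.7 threshold are very likely to collide in at least one band.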

| Dedup Method | Remaining Rows | Time Taken (seconds) | Peak Memory Usage (GB) |
| --- | --- | --- | --- |
| Data-Juicer (space tokenizer) | 9,070,657 | 2,927.93 | 116.7 |
| Xpark (space tokenizer) | 9,151,558 | 341.24 | 71.2 |
| Xpark (CJK tokenizer) | 9,151,602 | 330.62 | 78.0 |

Text Exact Substring Dedup#

By design, exact substring dedup is a three-stage process that produces multiple outputs. For details on the operator implementation and parameters, refer to xpark.dataset.filters.dedup.text_exact_substring_dedup.TextExactSubstringDedup.

The following example demonstrates how to perform exact substring deduplication using Xpark:

from xpark.dataset import read_parquet
from xpark.dataset.filters.dedup.text_exact_substring_dedup import TextExactSubstringDedup
from xpark.dataset.expressions import col

ds = read_parquet("/path/to/parquets/", dynamic_uid="uid")
dedup_op = TextExactSubstringDedup(min_match_tokens=64).with_column(uid=col("uid"), text=col("text"))

# Get documents that have no duplicate substrings with any other documents
ds.filter(dedup_op).drop_columns("uid").write_parquet("/path/to/output/unique_data")

# For potentially duplicated documents, deduplicate using suffix array
ds_connected_deduped = dedup_op.collect_sa_deduped_dataset()
ds_connected_deduped.drop_columns("uid").write_parquet("/path/to/output/deduped_by_sa_data")
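The core idea behind exact substring dedup, following the referenced paper, is a suffix array over the concatenated corpus: any sufficiently long substring occurring in two documents shows up as a long common prefix of two adjacent suffixes. Below is a naive, quadratic, single-machine sketch of that idea (character-level, for illustration only — Xpark's distributed three-stage operator is organized very differently at scale):

```python
def find_shared_substrings(docs: list, min_len: int) -> set:
    """Return substrings of length >= min_len shared by two different docs.
    Naive O(n^2 log n) construction — illustration only."""
    sep = "\x00"  # separator assumed absent from the text
    corpus = sep.join(docs)
    # Record which document owns each character position.
    owner, did = [], 0
    for ch in corpus:
        owner.append(did)
        if ch == sep:
            did += 1
    # Naive suffix array: sort suffix start positions lexicographically.
    sa = sorted(range(len(corpus)), key=lambda i: corpus[i:])
    hits = set()
    for a, b in zip(sa, sa[1:]):
        # Common prefix of two adjacent suffixes, not crossing separators.
        l = 0
        while (a + l < len(corpus) and b + l < len(corpus)
               and corpus[a + l] == corpus[b + l] and corpus[a + l] != sep):
            l += 1
        if l >= min_len and owner[a] != owner[b]:
            hits.add(corpus[a:a + l])
    return hits
```

For example, with `min_len=6`, `find_shared_substrings(["abcdefgh123", "xyzabcdefgh", "qqqq"], 6)` reports `"abcdefgh"` (plus its long suffixes) as duplicated across the first two documents. The real operator works at token granularity (`min_match_tokens`) and distributes both suffix-array construction and the scan.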

Benchmark Results#

The benchmark was conducted on Tencent EMR with 2 to 3 nodes of the following specifications:

Benchmark parameters:

  • min match tokens: 200

  • cjk: True
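The cjk flag implies token counting that handles CJK text specially. As a purely hypothetical sketch of such a tokenizer — Xpark's actual tokenization is not documented here — one common approach counts each CJK character as its own token and splits the remaining text on whitespace:

```python
import re

CJK = r"[\u4e00-\u9fff]"  # basic CJK Unified Ideographs block (simplified view)

def count_tokens(text: str, cjk: bool = True) -> int:
    """Hypothetical token count: with cjk=True, each CJK character is one
    token and the rest is split on whitespace; otherwise whitespace only."""
    if not cjk:
        return len(text.split())
    cjk_chars = re.findall(CJK, text)
    rest = re.sub(CJK, " ", text)
    return len(cjk_chars) + len(rest.split())
```

Under this scheme a mixed string like "hello 世界" counts as three tokens with cjk=True but only two with cjk=False, which is why the flag matters when enforcing a token-length threshold such as min match tokens.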

| Dataset Size | Number of Nodes | Total Lines | Total Time Cost (s) | Peak Memory (GB) |
| --- | --- | --- | --- | --- |
| 489.09 GB (400 files) | 2 | 226,404,291 | 1,102.78 | 937 |
| 1,210.24 GB (1000 files) | 3 | 566,030,018 | 2,813.54 | 1,780.12 |