1

このから始めて、重複したドキュメントを見つけるために Pyspark で Locality-Sensitive Hashing (LSH) を使用しました。
私の DB に関するメモ: 4M のテキスト ファイルがあります。各ファイルの平均文字数は 20,000 文字です。現在、各ドキュメントの最初の 500 文字のみを検討しています。
文字数を 500 から 1000 に増やすと、メモリ エラーが発生します。
パイプラインのパラメーターに取り組んでみました。Ngramでnを増やし、MinHashLSHでNumHashTablesを減らすメモリエラーを回避できることを私は知っています。ただし、これにより偽陰性が増えすぎます。
パイプラインにパフォーマンスを改善できる他のステップはありますか?
私の目的は、メモリ エラーや非常に長い計算時間 (理想的には、計算時間 < 6 時間) を発生させずに、文字数を 500 から 2000 に増やすことです。
これは偽のデータを含む私のコードです:

# Prameters
# NGram
n_gram = 2 #really, i use n_gram=8 because i have 500char per each document 
# MinHashLSH
hash_tables = 10 #really, i use hash_tables=3 to avoid memory error and too long computational time 
# jaccard treshold
treshold_test = 0.5
#Fake dataframe
df = spark.createDataFrame([
  (0, "Hi I heard about Spark Hi I heard about Spark Hi I heard about Spark Hi I heard about Spark"),
  (1, "I wish Java could use case classes I wish Java could use case classes!!"),
  (2, "Logistic, regression, models, are, neat, etc, etc, etc, etc, etc, etc, etc, etc"),
  (3, "Hi I heard about Spork Hi I heard about Spark Hi I heard about Spark Hi I heard about Spark"),
  (4, "Hi I heard about Java Hi I heard about Java Hi I heard about Java Hi     I heard about Java")
], ["id", "text"])
# cleaning puntuactions and double spaces
df = df.withColumn("text", regexp_replace('text', r'\p{Punct}', ''))
df = df.withColumn("text", regexp_replace('text', r' (?= |$)', ''))
#trim whitespaces and filtering out text too short
df = df.withColumn("text", trim(col("text")))\
.filter((col('text') != "") & (length(col('text')) > n_gram*3))
df.show(5,False)
# LSH pipeline
from pyspark.ml import Pipeline
from pyspark.ml.feature import RegexTokenizer, NGram, HashingTF, MinHashLSH

db = df
query = df

model = Pipeline(stages=[
        RegexTokenizer(
        pattern="", inputCol="text", outputCol="tokens", minTokenLength=1
    ),
    NGram(n=n_gram, inputCol="tokens", outputCol="ngrams"),
    HashingTF(inputCol="ngrams", outputCol="vectors"), 
    MinHashLSH(inputCol="vectors", outputCol="lsh", numHashTables=hash_tables)]).fit(db)

db_hashed = model.transform(db)
query_hashed = model.transform(query)
output = model.stages[-1].approxSimilarityJoin(db_hashed, query_hashed, treshold_test)

# similar pairs of documents:
output.filter(col('datasetA.id') != col('datasetB.id'))\
.select(col("datasetA.id").alias("idA"),
                col("datasetB.id").alias("idB"),
                col("datasetA.text").alias("textA"),
                col("datasetB.text").alias("textB"),
                col("distCol")).sort(col("distCol"))\
.withColumn('comb', sort_array(array(*('idA', 'idB')))).dropDuplicates(['comb']).show()
4

0 に答える 0