ディープハッシュ(SSDEEP)http://code.google.com/p/pyssdeepを使用して130万を超えるファイルを処理しようとしています。
それがすることは、です。ハッシュを生成し(3〜6分以内に1.3 milを生成)、次に互いに比較して類似性の結果を取得します。比較は非常に高速ですが、単一のプロセスを実行するだけでは完了しません。そこで、Pythonマルチプロセッシングモジュールを組み込んで処理を実行します。
結果は、30分以内に作成された130万のテキストファイルです。18コアを使用(Quad Xeonプロセッサ、合計24 CPU)
各プロセスの仕組みは次のとおりです。
- SSDEEP合計を生成します。
- それらの合計のリストを5000グループのチャンクに分割します。
- 18プロセス内の各チャンク1と5000を比較します。各反復で18の合計を比較します。
- 類似スコアに基づいて結果をグループ化します(デフォルトは75)
- 次の反復のためにすでにチェックされているファイルを削除しました。
- 次のグループのスコアが75%未満の次のファイルから開始します
- すべてのグループが完了するまで繰り返します。
- 含まれていない(他のファイルとは異なる)ファイルがある場合、それらは残りのリストに追加されます。
すべての処理が完了すると、残りのファイルが結合され、結果がなくなるまで再帰的に相互に比較されます。
問題は、ファイルのリストがより小さな(5000)ファイルにチャンク化される場合です。最初の5000チャンクに含まれているが、別のグループには含まれていないファイルがあり、グループが不完全になっています。
チャンクなしで実行すると、ループが完了するまでに非常に長い時間がかかります。18時間以上、行われていません、。どれくらいかわからない。
アドバイスしてください。
使用されるモジュール:multiprocessing.Pool、ssdeep python
def ssdpComparer(lst, threshold):
s = ssdeep()
check_file = []
result_data = []
lst1 = lst
set_lst = set(lst)
print '>>>START'
for tup1 in lst1:
if tup1 in check_file:
continue
for tup2 in set_lst:
score = s.compare(tup1[0], tup2[0])
if score >= threshold:
result_data.append((score, tup1[2], tup2[2])) #Score, GroupID, FileID
check_file.append(tup2)
set_lst = set_lst.difference(check_file)
print """####### DONE #######"""
remain_lst = set(lst).difference(check_file)
return (result_data, remain_lst)
def parallelProcessing(tochunk_list, total_processes, threshold, source_path, mode, REMAINING_LEN = 0):
result = []
remainining = []
pooled_lst = []
pair = []
chunks_toprocess = []
print 'Total Files:', len(tochunk_list)
if mode == MODE_INTENSIVE:
chunks_toprocess = groupWithBlockID(tochunk_list) #blockID chunks
elif mode == MODE_THOROUGH:
chunks_toprocess = groupSafeLimit(tochunk_list, TOTAL_PROCESSES) #Chunks by processes
elif mode == MODE_FAST:
chunks_toprocess = groupSafeLimit(tochunk_list) #5000 chunks
print 'No. of files group to process: %d' % (len(chunks_toprocess))
pool_obj = Pool(processes = total_processes, initializer = poolInitializer, initargs = [None, threshold, source_path, mode])
pooled_lst = pool_obj.map(matchingProcess, chunks_toprocess) #chunks_toprocess
tmp_rs, tmp_rm = getResultAndRemainingLists(pooled_lst)
result += tmp_rs
remainining += tmp_rm
print 'RESULT LEN: %s, REMAINING LEN: %s, P.R.L: %s' % (len(result), len(remainining), REMAINING_LEN)
tmp_r_len = len(remainining)
if tmp_r_len != REMAINING_LEN and len(result) > 0 :
result += parallelProcessing(remainining, total_processes, threshold, source_path, mode, tmp_r_len)
else:
result += [('','', rf[2]) for rf in remainining]
return result
def getResultAndRemainingLists(pooled_lst):
g_result = []
g_remaining = []
for tup_result in pooled_lst:
tmp_result, tmp_remaining = tup_result
g_result += tmp_result
if tmp_remaining:
g_remaining += tmp_remaining
return (g_result, g_remaining)