8

次の形式の一意の文字列を含む 2 つのテキスト ファイル (*.txt) があります。

udtvbacfbbxfdffzpwsqzxyznecbqxgebuudzgzn:refmfxaawuuilznjrxuogrjqhlmhslkmprdxbascpoxda
ltswbjfsnejkaxyzwyjyfggjynndwkivegqdarjg:qyktyzugbgclpovyvmgtkihxqisuawesmcvsjzukcbrzi

最初のファイルには5000 万行 (4.3 GB) が含まれ、2 番目のファイルには100 万行 (112 MB) が含まれています。1 行には 40 文字、delimiter :、およびさらに 45 文字が含まれます。

タスク: 両方のファイルの一意の値を取得します。つまり、2 番目のファイルにあり、最初のファイルにはない行を含むcsv または txtファイルが必要です。

私はvaex ( Vaex ) を使用してこれを実行しようとしています:

import vaex

base_files = ['file1.txt']
for i, txt_file in enumerate(base_files, 1):
    for j, dv in enumerate(vaex.from_csv(txt_file, chunk_size=5_000_000, names=['data']), 1):
        dv.export_hdf5(f'hdf5_base/base_{i:02}_{j:02}.hdf5')

check_files = ['file2.txt']
for i, txt_file in enumerate(check_files, 1):
    for j, dv in enumerate(vaex.from_csv(txt_file, chunk_size=5_000_000, names=['data']), 1):
        dv.export_hdf5(f'hdf5_check/check_{i:02}_{j:02}.hdf5')

dv_base = vaex.open('hdf5_base/*.hdf5')
dv_check = vaex.open('hdf5_check/*.hdf5')
dv_result = dv_check.join(dv_base, on='data', how='inner', inplace=True)
dv_result.export(path='result.csv')

その結果、一意の行値を持つresult.csvファイルを取得します。しかし、検証プロセスには非常に長い時間がかかります。さらに、使用可能なすべての RAM とすべてのプロセッサ リソースを使用します。このプロセスをどのように加速できますか? 私は何を間違っていますか?もっとうまくできることは何ですか?このチェックに他のライブラリ (pandas、dask) を使用する価値はありますか?


UPD 10.11.2020 これまでのところ、次のオプションよりも高速なものは見つかりませんでした。

from io import StringIO


def read_lines(filename):
    handle = StringIO(filename)
    for line in handle:
        yield line.rstrip('\n')


def read_in_chunks(file_obj, chunk_size=10485760):
    while True:
        data = file_obj.read(chunk_size)
        if not data:
            break
        yield data


file_check = open('check.txt', 'r', errors='ignore').read()

check_set = {elem for elem in read_lines(file_check)}

with open(file='base.txt', mode='r', errors='ignore') as file_base:
    for idx, chunk in enumerate(read_in_chunks(file_base), 1):
        print(f'Checked [{idx}0 Mb]')
        for elem in read_lines(chunk):
            if elem in check_set:
                check_set.remove(elem)

print(f'Unique rows: [{len(check_set)}]')

UPD 11.11.2020: パフォーマンスを改善するためのヒントを提供してくれた @m9_psy に感謝します。本当に速いです!現在、最速の方法は次のとおりです。

from io import BytesIO

check_set = {elem for elem in BytesIO(open('check.txt', 'rb').read())}

with open('base.txt', 'rb') as file_base:
    for line in file_base:
        if line in check_set:
            check_set.remove(line)

print(f'Unique rows: [{len(check_set)}]')

このプロセスをさらに高速化する方法はありますか?

4

4 に答える 4

6

あなたの例を模倣するデータセットを生成しましょう

import vaex
import numpy as np
N = 50_000_000  # 50 million rows for base
N2 = 1_000_000  # 1 million for check
M = 40+1+45     # chars for each string
N_dup = 10_000  # number of duplicate rows in the checks

s1 = np.random.randint(ord('a'), ord('z'), (N, M), np.uint32).view(f'U{M}').reshape(N)
s2 = np.random.randint(ord('a'), ord('z'), (N2, M), np.uint32).view(f'U{M}').reshape(N2)
# make sure s2 has rows that match s1
dups = np.random.choice(N2, N_dup, replace=False)
s2[dups] = s1[np.random.choice(N, N_dup, replace=False)]

# save the data to disk
vaex.from_arrays(s=s1).export('/data/tmp/base.hdf5')
vaex.from_arrays(s=s2).export('/data/tmp/check.hdf5')

ここで、ベースにないチェック中の行を見つけるために、それらを結合し、一致しなかった行を削除できます。

import vaex
base = vaex.open('/data/tmp/base.hdf5')
check = vaex.open('/data/tmp/check.hdf5')
# joined contains rows where s_other is missing
joined = check.join(base, on='s', how='left', rsuffix='_other')
# drop those
unique = joined.dropmissing(['s_other'])
# and we have everything left
unique
#      s                                                    s_other
0      'hvxursyijiehidlmtqwpfawtuwlmflvwwdokmuvxqyujfh...  'hvxursyijiehidlmtqwpfawtuwlmflvwwdokmuvxqyujfhb...
1      'nslxohrqydxyugngxhvtjwptjtsyuwaljdnprwfjnssikh...  'nslxohrqydxyugngxhvtjwptjtsyuwaljdnprwfjnssikhh...
2      'poevcdxjirulnktmvifdbdaonjwiellqrgnxhbolnjhact...  'poevcdxjirulnktmvifdbdaonjwiellqrgnxhbolnjhactn...
3      'xghcphcvwswlsywgcrrwxglnhwtlpbhlnqhjgsmpivghjk...  'xghcphcvwswlsywgcrrwxglnhwtlpbhlnqhjgsmpivghjku...
4      'gwmkxxqkrfjobkpciqpdahdeuqfenrorqrwajuqdgluwvb...  'gwmkxxqkrfjobkpciqpdahdeuqfenrorqrwajuqdgluwvbs...
...    ...                                                  ...
9,995  'uukjkyaxbjqvmwscnhewxpdgwrhosipoelbhsdnbpjxiwn...  'uukjkyaxbjqvmwscnhewxpdgwrhosipoelbhsdnbpjxiwno...
9,996  'figbmhruheicxkmuqbbnuavgabdlvxxjfudavspdncogms...  'figbmhruheicxkmuqbbnuavgabdlvxxjfudavspdncogmsb...
9,997  'wwgykvwckqqttxslahcojcplnxrjsijupswcyekxooknji...  'wwgykvwckqqttxslahcojcplnxrjsijupswcyekxooknjii...
9,998  'yfopgcfpedonpgbeatweqgweibdesqkgrxwwsikilvvvmv...  'yfopgcfpedonpgbeatweqgweibdesqkgrxwwsikilvvvmvo...
9,999  'qkavooownqwtpbeqketbvpcvxlliptitespfqkcecidfeb...  'qkavooownqwtpbeqketbvpcvxlliptitespfqkcecidfebi...
于 2020-11-06T11:39:41.123 に答える