私がどこから来たのかについてのいくつかの文脈。コードスニペットは最後にあります。
可能であれば、H2Oなどのオープンソースツールを使用して超高性能の並列CSVファイル読み取りを行うことを好みますが、このツールの機能セットには制限があります。教師あり学習を適切に行うためにH2Oクラスターにフィードする前に、データサイエンスパイプラインを作成するための多くのコードを作成することになります。
マルチプロセッシングライブラリのプールオブジェクトとマップ関数で多くの並列処理を追加することにより、UCIリポジトリから8GBHIGGSデータセットやデータサイエンス目的の40GBCSVファイルなどのファイルを大幅に高速に読み取っています。たとえば、最近傍探索を使用したクラスタリング、およびDBSCANとマルコフのクラスタリングアルゴリズムでは、深刻な問題を抱えるメモリと実時間の問題を回避するために、並列プログラミングの精巧さが必要です。
私は通常、最初にgnuツールを使用してファイルを行ごとに分割し、次にそれらすべてをglob-filemaskして、Pythonプログラムでそれらを並行して検索して読み取るのが好きです。私は一般的に1000以上の部分ファイルのようなものを使用します。これらのトリックを実行すると、処理速度とメモリ制限に非常に役立ちます。
pandas dataframe.read_csvはシングルスレッドであるため、並列実行のためにmap()を実行することで、これらのトリックを実行してパンダを非常に高速にすることができます。htopを使用すると、プレーンな古いシーケンシャルパンダdataframe.read_csvで、1つのコアの100%CPUがpd.read_csvの実際のボトルネックであり、ディスクではないことがわかります。
SATA6バスで回転するHDではなく、高速ビデオカードバスでSSDを使用し、さらに16個のCPUコアを使用していることを追加する必要があります。
また、一部のアプリケーションでうまく機能することを発見した別の手法は、1つの大きなファイルを多くのパーツファイルに事前に分割するのではなく、並列CSVファイルが1つの巨大なファイル内ですべてを読み取り、各ワーカーをファイルの異なるオフセットで開始することです。各並列ワーカーでpythonのファイルseek()とtell()を使用して、ビッグファイル内の異なるバイトオフセットの開始バイトと終了バイトの位置で、同時にビッグテキストファイルをストリップで読み取ります。バイトに対して正規表現のfindallを実行し、改行の数を返すことができます。これは部分的な合計です。最後に、部分的な合計を合計して、ワーカーの終了後にmap関数が戻ったときにグローバルな合計を取得します。
以下は、並列バイトオフセットトリックを使用したベンチマークの例です。
私は2つのファイルを使用しています:HIGGS.csvは8GBです。これは、UCI機械学習リポジトリからのものです。all_bin.csvは40.4GBで、現在のプロジェクトのものです。私は2つのプログラムを使用しています。Linuxに付属しているGNUwcプログラムと、私が開発した純粋なpythonfastread.pyプログラムです。
HP-Z820:/mnt/fastssd/fast_file_reader$ ls -l /mnt/fastssd/nzv/HIGGS.csv
-rw-rw-r-- 1 8035497980 Jan 24 16:00 /mnt/fastssd/nzv/HIGGS.csv
HP-Z820:/mnt/fastssd$ ls -l all_bin.csv
-rw-rw-r-- 1 40412077758 Feb 2 09:00 all_bin.csv
ga@ga-HP-Z820:/mnt/fastssd$ time python fastread.py --fileName="all_bin.csv" --numProcesses=32 --balanceFactor=2
2367496
real 0m8.920s
user 1m30.056s
sys 2m38.744s
In [1]: 40412077758. / 8.92
Out[1]: 4530501990.807175
これは、約4.5 GB / s、つまり45 Gb/sのファイルスラップ速度です。それは回転するハードディスクではありません、私の友人。それは実際にはSamsungPro950SSDです。
以下は、純粋なCコンパイル済みプログラムであるgnuwcによって行カウントされている同じファイルの速度ベンチマークです。
この場合、私の純粋なpythonプログラムがgnuwcでコンパイルされたCプログラムの速度と本質的に一致していることがわかります。Pythonは解釈されますが、Cはコンパイルされているので、これは非常に興味深い速度の偉業です。あなたも同意すると思います。もちろん、wcは本当に並列プログラムに変更する必要があります。そうすれば、それは私のpythonプログラムの靴下を本当に打ち負かすでしょう。しかし、現在のところ、gnuwcは単なるシーケンシャルプログラムです。あなたはできることをします、そしてpythonは今日並行して行うことができます。Cythonのコンパイルは私を助けることができるかもしれません(しばらくの間)。また、メモリマップトファイルはまだ調査されていません。
HP-Z820:/mnt/fastssd$ time wc -l all_bin.csv
2367496 all_bin.csv
real 0m8.807s
user 0m1.168s
sys 0m7.636s
HP-Z820:/mnt/fastssd/fast_file_reader$ time python fastread.py --fileName="HIGGS.csv" --numProcesses=16 --balanceFactor=2
11000000
real 0m2.257s
user 0m12.088s
sys 0m20.512s
HP-Z820:/mnt/fastssd/fast_file_reader$ time wc -l HIGGS.csv
11000000 HIGGS.csv
real 0m1.820s
user 0m0.364s
sys 0m1.456s
結論:Cプログラムと比較して、純粋なPythonプログラムの速度は良好です。ただし、少なくとも行数を数える目的で、Cプログラムよりも純粋なPythonプログラムを使用するだけでは十分ではありません。通常、この手法は他のファイル処理にも使用できるため、このPythonコードは引き続き優れています。
質問:正規表現を一度だけコンパイルしてすべてのワーカーに渡すと、速度が向上しますか?回答:正規表現の事前コンパイルは、このアプリケーションでは役に立ちません。その理由は、すべてのワーカーのプロセスのシリアル化と作成のオーバーヘッドが支配的であるためだと思います。
もう一つ。並列CSVファイルの読み取りも役立ちますか?ディスクがボトルネックですか、それともCPUですか?stackoverflowに関するいわゆる一流の回答の多くには、ファイルを読み取るのに1つのスレッドしか必要ないという一般的な開発の知恵が含まれています。でも、確かですか?
確認してみましょう:
HP-Z820:/mnt/fastssd/fast_file_reader$ time python fastread.py --fileName="HIGGS.csv" --numProcesses=16 --balanceFactor=2
11000000
real 0m2.256s
user 0m10.696s
sys 0m19.952s
HP-Z820:/mnt/fastssd/fast_file_reader$ time python fastread.py --fileName="HIGGS.csv" --numProcesses=1 --balanceFactor=1
11000000
real 0m17.380s
user 0m11.124s
sys 0m6.272s
そうそう、そうです。並列ファイル読み取りは非常にうまく機能します。さて、あなたは行きます!
追伸 知りたい人がいる場合、単一のワーカープロセスを使用しているときにbalanceFactorが2だったとしたらどうでしょうか。まあ、それは恐ろしいです:
HP-Z820:/mnt/fastssd/fast_file_reader$ time python fastread.py --fileName="HIGGS.csv" --numProcesses=1 --balanceFactor=2
11000000
real 1m37.077s
user 0m12.432s
sys 1m24.700s
fastread.py pythonプログラムの重要な部分:
fileBytes = stat(fileName).st_size # Read quickly from OS how many bytes are in a text file
startByte, endByte = PartitionDataToWorkers(workers=numProcesses, items=fileBytes, balanceFactor=balanceFactor)
p = Pool(numProcesses)
partialSum = p.starmap(ReadFileSegment, zip(startByte, endByte, repeat(fileName))) # startByte is already a list. fileName is made into a same-length list of duplicates values.
globalSum = sum(partialSum)
print(globalSum)
def ReadFileSegment(startByte, endByte, fileName, searchChar='\n'): # counts number of searchChar appearing in the byte range
with open(fileName, 'r') as f:
f.seek(startByte-1) # seek is initially at byte 0 and then moves forward the specified amount, so seek(5) points at the 6th byte.
bytes = f.read(endByte - startByte + 1)
cnt = len(re.findall(searchChar, bytes)) # findall with implicit compiling runs just as fast here as re.compile once + re.finditer many times.
return cnt
PartitionDataToWorkersのdefは、通常のシーケンシャルコードです。他の誰かが並列プログラミングとは何かについて練習したい場合に備えて、私はそれを省略しました。私はあなたの学習の利益のために、より難しい部分を無料で配りました:テストされて動作する並列コード。
おかげで:オープンソースのH2Oプロジェクト、ArnoとCliff、およびH2Oスタッフによる、優れたソフトウェアと教育ビデオのおかげで、上記のような純粋なpython高性能パラレルバイトオフセットリーダーのインスピレーションを得ることができました。H2Oは、Javaを使用して並列ファイル読み取りを行い、PythonおよびRプログラムで呼び出すことができ、大きなCSVファイルを読み取る際に地球上で最も高速です。