2

read_table() の呼び出しを並列化する方法はありますか? 私の場合、日付の解析のために CPU バウンドです。ドキュメントを読んでもそれを達成する方法がわかりません。頭に浮かぶ唯一のことは、入力ファイルを分割し、 read_table を並行して呼び出してから、データフレームを連結することです。

4

1 に答える 1

1

これにより、CSV ファイルが並行して読み取られ、それらが連結されます。厄介なのは、numpy型を処理しないため、日付を解析できないことです。私も同じ問題で苦労していますが、今のところ などのライブラリexecnetは組み込みでない型を扱えないようです。jsonそのため、送信する前にDataFrames を変換します。型を基本的な Python の型に取り除きます。

編集:日付を解析する必要がある場合は、ファイルをリモートで読み取り、日付を解析してハード ドライブにCSV保存するのがより賢明な方法かもしれません。pickle次に、メイン プロセスで pickle ファイルを読み取り、それらを連結することができます。それがパフォーマンスの向上につながるかどうかを確認するためにそれを試したことはありません。

remote_read_csv.py

import cPickle as pickle

if __name__ == '__channelexec__':
    reader = pickle.loads(channel.receive())

    for filename in channel:
        channel.send(reader(filename).to_json())

以下は、上記のモジュールを利用します。IPythonでテストしました。

from pandas import DataFrame, concat, read_csv, read_json
from numpy import random
import execnet
import remote_read_csv
import cPickle as pickle
import itertools
import psutil

### Create dummy data and save to CSV

def rdf():
    return DataFrame((random.rand(4, 3) * 100).astype(int))

d1 = rdf()
d2 = rdf()
d3 = rdf()

dfsl = [d1, d2, d3]
names = 'd1.csv d2.csv d3.csv'.split()
for i in range(3):
    dfsl[i].to_csv(names[i])

### Read CSV files in separate threads then concatenate

reader = pickle.dumps(read_csv)

def set_gateways(remote_module, *channel_sends):
    gateways = []
    channels = []
    for i in range(psutil.NUM_CPUS):
        gateways.append(execnet.makegateway())
        channels.append(gateways[i].remote_exec(remote_module))
        for send in channel_sends:
            channels[i].send(send)
    return (gateways, channels)

def para_read(names):
    gateways, channels = set_gateways(remote_read_csv, reader)
    mch = execnet.MultiChannel(channels)
    queue = mch.make_receive_queue()
    channel_ring = itertools.cycle(mch)
    for f in names:
        channel = channel_ring.next()
        channel.send(f)
    dfs = []
    for i in range(len(names)):
        channel, df = queue.get()
        dfs.append(df)

    [gw.exit() for gw in gateways]
    return concat([read_json(i) for i in dfs], keys=names)

para_read(names)
于 2014-05-02T04:56:21.127 に答える