1

multiprocessing モジュールの Pool を使用して、大きな csv ファイルの読み取りを高速化しようとしています。このために、(py2kからの)を採用しましたが、csv.dictreaderオブジェクトには長さがないようです。それを繰り返すことしかできないということですか?まだチャンクする方法はありますか?

これらの質問は関連しているように見えましたが、実際には私の質問には答えませんでした: Number of lines in csv.DictReader , How to chunk a list in Python 3?

私のコードはこれをやろうとしました:

source = open('/scratch/data.txt','r')
def csv2nodes(r):
    strptime = time.strptime
    mktime = time.mktime
    l = []
    ppl = set()
    for row in r:
        cell = int(row['cell'])
        id = int(row['seq_ei'])
        st = mktime(strptime(row['dat_deb_occupation'],'%d/%m/%Y'))
        ed = mktime(strptime(row['dat_fin_occupation'],'%d/%m/%Y'))
        # collect list
        l.append([(id,cell,{1:st,2: ed})])
        # collect separate sets
        ppl.add(id)
    return (l,ppl)


def csv2graph(source):
    r = csv.DictReader(source,delimiter=',')
    MG=nx.MultiGraph()
    l = []
    ppl = set()
    # Remember that I use integers for edge attributes, to save space! Dic above.
    # start: 1
    # end: 2
    p = Pool(processes=4)
    node_divisor = len(p._pool)*4
    node_chunks = list(chunks(r,int(len(r)/int(node_divisor))))
    num_chunks = len(node_chunks)
    pedgelists = p.map(csv2nodes,
                       zip(node_chunks))
    ll = []
    for l in pedgelists:
        ll.append(l[0])
        ppl.update(l[1])
    MG.add_edges_from(ll)
    return (MG,ppl)
4

1 に答える 1

1

csv.DictReaderドキュメント(およびcsv.readerそれがサブクラス化するクラス) から、クラスは反復子を返します。TypeErrorを呼び出したときに、コードは a をスローしているはずですlen()

データをチャンク化することはできますが、データ全体をメモリに読み込む必要があります。メモリが心配な場合は、に切り替えてcsv.DictReader、作成csv.readerされる辞書のオーバーヘッドをスキップできcsv.DictReaderます。での読みやすさを向上させるためにcsv2nodes()、定数を割り当てて各フィールドのインデックスを指定できます。

CELL = 0
SEQ_EI = 1
DAT_DEB_OCCUPATION = 4
DAT_FIN_OCCUPATION = 5

idまた、組み込み関数名であるため、 とは異なる変数を使用することをお勧めします。

于 2011-11-08T16:30:49.207 に答える