multiprocessing モジュールの Pool を使用して、大きな csv ファイルの読み取りを高速化しようとしています。このために、(py2kからの)例を採用しましたが、csv.dictreaderオブジェクトには長さがないようです。それを繰り返すことしかできないということですか?まだチャンクする方法はありますか?
これらの質問は関連しているように見えましたが、実際には私の質問には答えませんでした: Number of lines in csv.DictReader , How to chunk a list in Python 3?
私のコードはこれをやろうとしました:
source = open('/scratch/data.txt','r')
def csv2nodes(r):
strptime = time.strptime
mktime = time.mktime
l = []
ppl = set()
for row in r:
cell = int(row['cell'])
id = int(row['seq_ei'])
st = mktime(strptime(row['dat_deb_occupation'],'%d/%m/%Y'))
ed = mktime(strptime(row['dat_fin_occupation'],'%d/%m/%Y'))
# collect list
l.append([(id,cell,{1:st,2: ed})])
# collect separate sets
ppl.add(id)
return (l,ppl)
def csv2graph(source):
r = csv.DictReader(source,delimiter=',')
MG=nx.MultiGraph()
l = []
ppl = set()
# Remember that I use integers for edge attributes, to save space! Dic above.
# start: 1
# end: 2
p = Pool(processes=4)
node_divisor = len(p._pool)*4
node_chunks = list(chunks(r,int(len(r)/int(node_divisor))))
num_chunks = len(node_chunks)
pedgelists = p.map(csv2nodes,
zip(node_chunks))
ll = []
for l in pedgelists:
ll.append(l[0])
ppl.update(l[1])
MG.add_edges_from(ll)
return (MG,ppl)