数百万行を含む大きなファイルを読み取り、各行を解析し ( numpy array
)、double の配列に変換し ( python array
)、後で に書き込む単純なプログラムがありますhdf5 file
。このループを何日も繰り返します。各ファイルを読み取った後、すべてのオブジェクトを削除し、ガベージ コレクターを呼び出します。プログラムを実行すると、初日はエラーなしで解析されますが、2 日目にはMemoryError
. プログラムのメモリ使用量を監視しました。解析の初日、メモリ使用量は約1.5 GBでした。初日の解析が終了すると、メモリ使用量は50 MBに減少します。2日目が始まると、取得したファイルから行を読み取ろうとしますMemoryError
。以下は、プログラムの出力です。
source file extracted at C:\rfadump\au\2012.08.07.txt
parsing started
current time: 2012-09-16 22:40:16.829000
500000 lines parsed
1000000 lines parsed
1500000 lines parsed
2000000 lines parsed
2500000 lines parsed
3000000 lines parsed
3500000 lines parsed
4000000 lines parsed
4500000 lines parsed
5000000 lines parsed
parsing done.
end time is 2012-09-16 23:34:19.931000
total time elapsed 0:54:03.102000
repacking file
done
> s:\users\aaj\projects\pythonhf\rfadumptohdf.py(132)generateFiles()
-> while single_date <= self.end_date:
(Pdb) c
*** 2012-08-08 ***
source file extracted at C:\rfadump\au\2012.08.08.txt
cought an exception while generating file for day 2012-08-08.
Traceback (most recent call last):
File "rfaDumpToHDF.py", line 175, in generateFile
lines = self.rawfile.read().split('|\n')
MemoryError
Windows システム タスク マネージャーが、このプロセスのメモリ使用量を50 MBと表示していることは確かです。Python のガベージ コレクタまたはメモリ マネージャが空きメモリを正しく計算していないようです。空きメモリがたくさんあるはずですが、十分ではないと考えています。
何か案が?
編集
ここに私のコードを追加する
コードの一部を配置します。私は python を初めて使用します。私の python コーディング スタイルをご容赦ください。
モジュール 1
def generateFile(self, current_date):
try:
print "*** %s ***" % current_date.strftime("%Y-%m-%d")
weekday=current_date.weekday()
if weekday >= 5:
print "skipping weekend"
return
self.taqdb = taqDB(self.index, self.offset)
cache_filename = os.path.join(self.cache_dir,current_date.strftime("%Y.%m.%d.h5"))
outputFile = config.hdf5.filePath(self.index, date=current_date)
print "cache file: ", cache_filename
print "output file: ", outputFile
tempdir = "C:\\rfadump\\"+self.region+"\\"
input_filename = tempdir + filename
print "source file extracted at %s " % input_filename
## universe
reader = rfaTextToTAQ.rfaTextToTAQ(self.tickobj) ## PARSER
count = 0
self.rawfile = open(input_filename, 'r')
lines = self.rawfile.read().split('|\n')
total_lines = len(lines)
self.rawfile.close()
del self.rawfile
print "parsing started"
start_time = dt.datetime.now()
print "current time: %s" % start_time
#while(len(lines) > 0):
while(count < total_lines):
#line = lines.pop(0) ## This slows down processing
result = reader.parseline(lines[count]+"|")
count += 1
if(count % 500000 == 0):
print "%d lines parsed" %(count)
if(result == None):
continue
ric, timestamp, quotes, trades, levelsUpdated, tradeupdate = result
if(len(levelsUpdated) == 0 and tradeupdate == False):
continue
self.taqdb.insert(result)
## write to hdf5 TODO
writer = h5Writer.h5Writer(cache_filename, self.tickobj)
writer.write(self.taqdb.groups)
writer.close()
del lines
del self.taqdb, self.tickobj
##########################################################
print "parsing done."
end_time = dt.datetime.now()
print "end time is %s" % end_time
print "total time elapsed %s" % (end_time - start_time)
defragger = hdf.HDF5Defragmenter()
defragger.Defrag(cache_filename,outputFile)
del defragger
print "done"
gc.collect(2)
except:
print "cought an exception while generating file for day %s." % current_date.strftime("%Y-%m-%d")
tb = traceback.format_exc()
print tb
モジュール 2 - taqdb - 解析されたデータを配列に格納する
class taqDB:
def __init__(self, index, offset):
self.index = index
self.tickcfg = config.hdf5.getTickConfig(index)
self.offset = offset
self.groups = {}
def getGroup(self,ric):
if (self.groups.has_key(ric) == False):
self.groups[ric] = {}
return self.groups[ric]
def getOrderbookArray(self, ric, group):
datasetname = orderBookName
prodtype = self.tickcfg.getProdType(ric)
if(prodtype == ProdType.INDEX):
return
orderbookArrayShape = self.tickcfg.getOrderBookArrayShape(prodtype)
if(group.has_key(datasetname) == False):
group[datasetname] = array.array("d")
orderbookArray = self.tickcfg.getOrderBookArray(prodtype)
return orderbookArray
else:
orderbookArray = group[datasetname]
if(len(orderbookArray) == 0):
return self.tickcfg.getOrderBookArray(prodtype)
lastOrderbook = orderbookArray[-orderbookArrayShape[1]:]
return np.array([lastOrderbook])
def addToDataset(self, group, datasetname, timestamp, arr):
if(group.has_key(datasetname) == False):
group[datasetname] = array.array("d")
arr[0,0]=timestamp
a1 = group[datasetname]
a1.extend(arr[0])
def addToOrderBook(self, group, timestamp, arr):
self.addToDataset(self, group, orderBookName, timestamp, arr)
def insert(self, data):
ric, timestamp, quotes, trades, levelsUpdated, tradeupdate = data
delta = dt.timedelta(hours=timestamp.hour,minutes=timestamp.minute, seconds=timestamp.second, microseconds=(timestamp.microsecond/1000))
timestamp = float(str(delta.seconds)+'.'+str(delta.microseconds)) + self.offset
## write to array
group = self.getGroup(ric)
orderbookUpdate = False
orderbookArray = self.getOrderbookArray(ric, group)
nonzero = quotes.nonzero()
orderbookArray[nonzero] = quotes[nonzero]
if(np.any(nonzero)):
self.addToDataset(group, orderBookName, timestamp, orderbookArray)
if(tradeupdate == True):
self.addToDataset(group, tradeName, timestamp, trades)
モジュール 3 - パーサー
class rfaTextToTAQ:
"""RFA Raw dump file reader. Readers single line (record) and returns an array or array of fid value pairs."""
def __init__(self,tickconfig):
self.tickconfig = tickconfig
self.token = ''
self.state = ReadState.SEQ_NUM
self.fvstate = fvstate.FID
self.quotes = np.array([]) # read from tickconfig
self.trades = np.array([]) # read from tickconfig
self.prodtype = ProdType.STOCK
self.allquotes = {}
self.alltrades = {}
self.acvol = 0
self.levelsUpdated = []
self.quoteUpdate = False
self.tradeUpdate = False
self.depth = 0
def updateLevel(self, index):
if(self.levelsUpdated.__contains__(index) == False):
self.levelsUpdated.append(index)
def updateQuote(self, fidindex, field):
self.value = float(self.value)
if(self.depth == 1):
index = fidindex[0]+(len(self.tickconfig.stkQuotes)*(self.depth - 1))
self.quotes[index[0]][fidindex[1][0]] = self.value
self.updateLevel(index[0])
else:
self.quotes[fidindex] = self.value
self.updateLevel(fidindex[0][0])
self.quoteUpdate = True
def updateTrade(self, fidindex, field):
#self.value = float(self.value)
if(self.tickconfig.tradeUpdate(self.depth) == False):
return
newacvol = float(self.value)
if(field == acvol):
if(self.value > self.acvol):
tradesize = newacvol - self.acvol
self.acvol = newacvol
self.trades[fidindex] = tradesize
if(self.trades.__contains__(0) == False):
self.tradeUpdate = True
else:
self.trades[fidindex] = self.value
if(not (self.trades[0,1]==0 or self.trades[0,2]==0)):
self.tradeUpdate = True
def updateResult(self):
field = ''
valid, field = field_dict.FIDToField(int(self.fid), field)
if(valid == False):
return
if(self.value == '0'):
return
if(self.prodtype == ProdType.STOCK):
fidindex = np.where(self.tickconfig.stkQuotes == field)
if(len(fidindex[0]) == 0):
fidindex = np.where(self.tickconfig.stkTrades == field)
if(len(fidindex[0]) == 0):
return
else:
self.updateTrade(fidindex, field)
else:
self.updateQuote(fidindex, field)
else:
fidindex = np.where(self.tickconfig.futQuotes == field)
if(len(fidindex[0]) == 0):
fidindex = np.where(self.tickconfig.futTrades == field)
if(len(fidindex[0]) == 0):
return
else:
self.updateTrade(fidindex, field)
else:
self.updateQuote(fidindex, field)
def getOrderBookTrade(self):
if (self.allquotes.has_key(self.ric) == False):
acvol = 0
self.allquotes[self.ric] = self.tickconfig.getOrderBookArray(self.prodtype)
trades = self.tickconfig.getTradesArray()
self.alltrades[self.ric] = [trades, acvol]
return self.allquotes[self.ric], self.alltrades[self.ric]
def parseline(self, line):
self.tradeUpdate = False
self.levelsUpdated = []
pos = 0
length = len(line)
self.state = ReadState.SEQ_NUM
self.fvstate = fvstate.FID
self.token = ''
ch = ''
while(pos < length):
prevChar = ch
ch = line[pos]
pos += 1
#SEQ_NUM
if(self.state == ReadState.SEQ_NUM):
if(ch != ','):
self.token += ch
else:
self.seq_num = int(self.token)
self.state = ReadState.TIMESTAMP
self.token = ''
# TIMESTAMP
elif(self.state == ReadState.TIMESTAMP):
if(ch == ' '):
self.token = ''
elif(ch != ','):
self.token += ch
else:
if(len(self.token) != 12):
print "Invalid timestamp format. %s. skipping line.\n", self.token
self.state = ReadState.SKIPLINE
else:
self.timestamp = datetime.strptime(self.token,'%H:%M:%S.%f')
self.state = ReadState.RIC
self.token = ''
# RIC
elif(self.state == ReadState.RIC):
if(ch != ','):
self.token += ch
else:
self.ric = self.token
self.token = ''
self.ric, self.depth = self.tickconfig.replaceRic(self.ric)
self.prodtype = self.tickconfig.getProdType(self.ric)
if(self.tickconfig.subscribed(self.ric)):
self.state = ReadState.UPDATE_TYPE
self.quotes, trades = self.getOrderBookTrade()
self.trades = trades[0]
self.acvol = trades[1]
else:
self.state = ReadState.SKIPLINE
# UPDATE_TYPE
elif(self.state == ReadState.UPDATE_TYPE):
if(ch != '|'):
self.token += ch
else:
self.update_type = self.token
self.token = ''
self.state = ReadState.FVPAIRS
#SKIPLINE
elif(self.state == ReadState.SKIPLINE):
return None
# FV PAIRS
elif(self.state == ReadState.FVPAIRS):
# FID
if(self.fvstate == fvstate.FID):
if(ch != ','):
if(ch.isdigit() == False):
self.token = self.value+ch
self.fvstate = fvstate.FIDVALUE
self.state = ReadState.FVPAIRS
else:
self.token += ch
else:
self.fid = self.token
self.token = ''
self.fvstate = fvstate.FIDVALUE
self.state = ReadState.FVPAIRS
# FIDVALUE
elif(self.fvstate == fvstate.FIDVALUE):
if(ch != '|'):
self.token += ch
else:
self.value = self.token
self.token = ''
self.state = ReadState.FVPAIRS
self.fvstate = fvstate.FID
# TODO set value
self.updateResult()
return self.ric, self.timestamp, self.quotes, self.trades, self.levelsUpdated, self.tradeUpdate
ありがとう。