I am going through a dataset and trying to compute features from a database (postgres).

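The problem is that the program occasionally gets stuck somewhere (confirmed from the database log I enabled: no new queries are issued for a long time). When I press ctrl+c, the program seems to resume normally (there are many rows, so I have not yet verified that the computed values are correct). It does not get stuck at the same place each time; the pattern appears to be random. What might I be doing wrong?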

I have two files, main.py and NAC.py.

main.py:

import time

import NAC
from dateutil.parser import parse
from datetime import timedelta
rows = fc.Read_CSV_to_Dict(input_file) #just a wrapper around csv.DictReader
i = 0
start_time = time.time()
for row in rows: #rows has about 600,000 rows
    ret1, ret2 = NAC.function(row['key1'], ...) #and other parameters
    #new keys
    row['newKey1'], row['newKey2'] = ret1
    row['newKey3'], row['newKey4'] = ret2 #unpacking
    i += 1
    if i % 10000 == 0: #progress monitor
        print(i)
print((time.time() - start_time) / 60) #elapsed minutes
NAC.db_close()

NAC.py:

from dateutil.parser import parse
from datetime import timedelta
import psycopg2
import psycopg2.extras

def function(param1, ...):
    """
    Returns:
        2 element list, each a list by itself
    """
    nsclist = [0]*param2_count
    naclist = [0]*param2_count
    for i in range(param2_count):
        #boundaries of the i-th interval
        stime = (begintime + timedelta(seconds = 60*intervalPeriod * i))
        etime = (begintime + timedelta(seconds = 60*intervalPeriod * (i+1)))
        table1_query = "select sum(count)from table1 where column1= '{0}' and column2>'{1}'::TIMESTAMP WITH TIME ZONE and column2<='{2}'::TIMESTAMP WITH TIME ZONE"
        cur.execute(table1_query.format(param1,stime,etime))
        nsclist[i] = cur.fetchone()[0]
        if nsclist[i] is None: #sum() is NULL when no rows match
            nsclist[i] = 0
        table2_query = "select sum(count)from table2 where column1 = '{0}' and column2 >'{1}'::TIMESTAMP WITH TIME ZONE and column2 <='{2}'::TIMESTAMP WITH TIME ZONE"
        cur.execute(table2_query.format(param1,stime,etime))
        naclist[i] = cur.fetchone()[0]
        if naclist[i] is None: #sum() is NULL when no rows match
            naclist[i] = 0
    return nsclist, naclist

def db_close():
    cur.close()
    conn.close()

intervalPeriod = 5 #minutes
conn = psycopg2.connect(cs.local_connstr)
cur = conn.cursor(cursor_factory=psycopg2.extras.DictCursor)

Timestamps from the DB log:

2013-07-01 18:26:01 PDT LOG:  statement: select sum(count)from ...
2013-07-01 18:26:01 PDT LOG:  statement: select sum(count)from ...
2013-07-01 18:26:01 PDT LOG:  statement: select sum(count)from ...
2013-07-01 18:26:01 PDT LOG:  statement: select sum(count)from ...
2013-07-01 18:26:01 PDT LOG:  statement: select sum(count)from ...
2013-07-01 18:26:01 PDT LOG:  statement: select sum(count)from ...
2013-07-01 18:26:01 PDT LOG:  statement: select sum(count)from ...
2013-07-01 18:26:01 PDT LOG:  statement: select sum(count)from ...
2013-07-01 18:29:30 ctrl+c pressed (manually added... not in the log)
2013-07-01 18:29:30 PDT LOG:  statement: select sum(count)from ...
2013-07-01 18:29:30 PDT LOG:  statement: select sum(count)from ...
2013-07-01 18:29:30 PDT LOG:  statement: select sum(count)from ...
2013-07-01 18:29:30 PDT LOG:  statement: select sum(count)from ...
2013-07-01 18:29:30 PDT LOG:  statement: select sum(count)from ...
2013-07-01 18:29:30 PDT LOG:  statement: select sum(count)from ...
1 Answer

It turned out that the problem was with the cursor. I had to open and close the cursor on every function call. I do not know why.

from dateutil.parser import parse
from datetime import timedelta
import psycopg2
import psycopg2.extras

def function(param1, ...):
    """
    Returns:
        2 element list, each a list by itself
    """
    #open a fresh cursor for this call only
    cur = conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
    nsclist = [0]*param2_count
    naclist = [0]*param2_count
    for i in range(param2_count):
        #boundaries of the i-th interval (same as in the question)
        stime = (begintime + timedelta(seconds = 60*intervalPeriod * i))
        etime = (begintime + timedelta(seconds = 60*intervalPeriod * (i+1)))
        table1_query = "select sum(count)from table1 where column1= '{0}' and column2>'{1}'::TIMESTAMP WITH TIME ZONE and column2<='{2}'::TIMESTAMP WITH TIME ZONE"
        cur.execute(table1_query.format(param1,stime,etime))
        nsclist[i] = cur.fetchone()[0]
        if nsclist[i] is None: #sum() is NULL when no rows match
            nsclist[i] = 0
        table2_query = "select sum(count)from table2 where column1 = '{0}' and column2 >'{1}'::TIMESTAMP WITH TIME ZONE and column2 <='{2}'::TIMESTAMP WITH TIME ZONE"
        cur.execute(table2_query.format(param1,stime,etime))
        naclist[i] = cur.fetchone()[0]
        if naclist[i] is None: #sum() is NULL when no rows match
            naclist[i] = 0
    #close the cursor before returning
    cur.close()
    return nsclist, naclist

def db_close():
    conn.close()

intervalPeriod = 5 #minutes
conn = psycopg2.connect(cs.local_connstr)
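
For what it's worth, here is a rough sketch of how the same per-call cursor could be written so that the cursor is closed even when a query raises, and so that the values are passed as query parameters instead of being formatted into the SQL string. This is not the code I actually ran. The helper names `interval_bounds` and `sum_counts` are made up for illustration, `conn` is assumed to be an open psycopg2 connection, the timestamps are assumed to be `datetime` objects, and `coalesce(..., 0)` stands in for the Python-side check for an empty sum.

```python
from contextlib import closing
from datetime import timedelta

# Hypothetical query text; the column names follow the question.
# The %s placeholders are the parameter style psycopg2 uses.
QUERY = (
    "select coalesce(sum(count), 0) from {table} "
    "where column1 = %s and column2 > %s and column2 <= %s"
)


def interval_bounds(begintime, interval_minutes, n):
    """Return n consecutive (start, end) pairs, each interval_minutes long."""
    step = timedelta(minutes=interval_minutes)
    return [(begintime + step * i, begintime + step * (i + 1)) for i in range(n)]


def sum_counts(conn, table, key, bounds):
    """Sum `count` per interval with a cursor that lives only for this call."""
    totals = []
    # closing() calls cur.close() on exit, including when execute() raises.
    with closing(conn.cursor()) as cur:
        for stime, etime in bounds:
            # Only the table name is formatted into the SQL, so it must come
            # from trusted code; the values are sent as parameters.
            cur.execute(QUERY.format(table=table), (key, stime, etime))
            totals.append(cur.fetchone()[0])
    return totals
```

Under those assumptions, the body of `function` would reduce to one `interval_bounds(begintime, intervalPeriod, param2_count)` call followed by `sum_counts(conn, "table1", param1, bounds)` and `sum_counts(conn, "table2", param1, bounds)`. I have not checked whether this changes the hanging behaviour.
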
answered 2013-07-02T10:25:49.907