2
cities = DBSession.query(City).filter(City.big=='Y').options(joinedload(City.hash)).limit(1)

t0 = time.time()
keyword_statuses = DBSession.query(KeywordStatus).filter(KeywordStatus.status==0).options(joinedload(KeywordStatus.keyword)).with_lockmode("update").limit(1)

for kw_status in keyword_statuses:
    kw_status.status = 1
    DBSession.commit() 

t0 = time.time()
w = SWorker(threads_no=1, network_server='http://192.168.1.242:8180/', keywords=keyword_statuses, cities=cities, saver=MySqlRawSave(DBSession), loglevel='debug')

w.work()

print 'finished'

上記のコードは、select for update を使用してテーブルからキーワード ステータスを選択します。行が更新されるまで行をロックします。

ご覧のとおり、行を更新して変更をコミットします。

kw_status.status = 1
DBSession.commit()    

その後、タスクをキューに入れ、キューを処理する多数のスレッドを作成する SWorker オブジェクトを作成します (ここでは簡単にするために 1 つだけにします)。

処理が終了すると、ワーカーは

kw_status.status = 2
DBSession.commit()

この時点で例外が発生します

(1205, 'Lock wait timeout exceeded; try restarting transaction') 'UPDATE g_search_keyword_status SET status=%s WHERE g_search_keyword_status.keyword_id = %s' (2, 10000001L)

そのため、行がロックされているようです。しかし、ワーカーを開始する前に、ステータスを 1 に更新し、変更をコミットしたので、行のロックを解除する必要があります。

また、scoped_session を使用します

DBSession = scoped_session(
    sessionmaker(
    autoflush=True,
    autocommit=False,
    bind=engine
    )
)
4

1 に答える 1

1

問題は遅延読み込みでした。知らせ

keyword_statuses =  DBSession.query(KeywordStatus).filter(KeywordStatus.status==0).options(joinedload(KeywordStatus.keyword)).with_lockmode("update").limit(1)

for kw_status in keyword_statuses:
    kw_status.status = 1
DBSession.commit() 

上記のコードは結果をメモリにロードせず、実際に必要な場合にのみロードします。

私の SWorker では、keywordStatus.keyword オブジェクトをループするため、テーブルは foreach thread をロックします。

all() を使用して結果をメモリにロードすると、問題は解決しました

keyword_statuses =  DBSession.query(KeywordStatus).filter(KeywordStatus.status==0).options(joinedload(KeywordStatus.keyword)).with_lockmode("update").limit(1).all()
于 2012-07-06T13:44:54.343 に答える