0

データベースから単一のレコード(行)を取得する必要があるセロリワーカーがたくさんあります。

「last_used」フィールドから 60 秒が経過した場合にのみ返され、返されるとすぐに「last_used」を現在の時刻で更新する必要があります (したがって、0 の遅延で無制限の再試行を行う他のワーカーは同じものを取得しません)。

これをすべて db レベルで行うことは可能ですか? この「last_used」フィールドを更新する他のソースはありません。

単列はこんな感じ。

{"id": "..." "last_used": "2016-06-31 00:37:21.241833", "item": "string"}

私は試した:

conn = r.connect(host='localhost', port=28015)
do = r.db('database').table('tb').order_by(index=("last_used")).limit(1).update(
{'last_used': r.now()}
, return_changes=True).run(conn)

それは機能しません。変更される前に、複数のワーカーが同じ行を返します。

4

1 に答える 1

0

私はそれをやったと思います.0の競合でそれに対して50のワーカーを実行しようとしました. しかし、私はまだ rethinkdb の初心者であり、ここから借りてきたこれについてのあなたの考えを聞きたいと思っています。

@app.task(bind=True, default_retry_delay=0, max_retries=999)
def ss(self):        
    conn = r.connect(host='localhost', port=28015)
    do = r.db('').table('').order_by("last_used").filter(
    r.now() - r.row['last_used'] > 10
    ).filter({'status': 1}).limit(1).update(
    r.branch(r.row["status"] == 1, {'status': 2, "last_used": r.now()}, {}),
    return_changes=True).run(conn)


    try:
        got_item = do['changes'][0]['new_val']['id']
        last_used = do['changes'][0]['new_val']['last_used']
    except:
        # print('error', do)
        raise self.retry()

    if got_item:
        # Do stuff that required unique row here....
        print(got_item,'\n',last_used)
        time.sleep(random.randrange(1,5))
        r.db('').table('').get(got_item).update({"status": 1}).run(conn)

#start workers
for i in range(50):
    ss.delay()

これで、ワーカーが「if got_item」ブロック内の一意のアイテムを操作できることが保証されることを願っています。

于 2016-07-27T04:48:44.430 に答える