0

私は ZooKeeper を単純なメッセージ キューとして評価しており、2 つの非常に単純なスクリプト、mq feeder と mq consumer を作成しました。以下のフィーダーは、20 個のジョブをキューにプッシュし、キューのステータス (ジョブが消費されている) を監視しています。

from kazoo.client import KazooClient

zk = KazooClient(hosts='xxx')
zk.start()

for i in xrange(20):
  zk.create("/queue/%s" % i, b"%s" % i)

while 1:
  print zk.get_children('/queue')

以下のコンシューマーは数回起動され (私のテストでは最大 3 つの同時プロセス)、ジョブリストを取得し、それを繰り返し処理してロックされていないジョブを見つけ、処理します (いくつかの作業をシミュレートするためにランダムな秒数スリープします)。完了したら、ジョブを削除してからロックを削除します。

from kazoo.client import KazooClient
from kazoo.exceptions import NodeExistsError
from time import sleep
import random

zk = KazooClient(hosts='xxx')
zk.start()
zk.ensure_path("/locks")
zk.ensure_path("/queue")

while 1:
  jobs = sorted(zk.get_children('/queue'))
  if jobs:
    for i in jobs:
      print "Checking job: %s" % i
      try:
        zk.create("/locks/%s" % i)
      except NodeExistsError:
        print "Job is locked, skipping!"
        pass
      else:
        print "Job is unlocked, processing."
        sleep(random.randrange(5))
        zk.delete("/queue/%s" % i)
        print "Deleted processed job, deleting the lock."
        zk.delete("/locks/%s" % i)
        pass
  else:
    print "There's no locks in the queue."
    pass

私が見ている、追跡できない問題は、消費者プロセスが次のように終了していることです。

Traceback (most recent call last):
  File "zk_consumer.py", line 24, in <module>
    zk.delete("/queue/%s" % i)
  File "/Library/Python/2.7/site-packages/kazoo/client.py", line 1055, in delete
    return self.delete_async(path, version).get()
  File "/Library/Python/2.7/site-packages/kazoo/handlers/threading.py", line 107, in get
    raise self._exception
kazoo.exceptions.NoNodeError: ((), {})

最後のプロセスは単一のジョブを永遠にチェックし続けますが、そのジョブはキューに残りますが、常にロックされます。明らかに、競合状態につながると思われる論理エラーがいくつかありますが、それにしばらく時間を費やしましたが、それを見つけることができないようです。ここで何か間違ったことをしていますか、それとも ZooKeeper は単純なジョブ キューの実行可能なソリューションではありませんか?

4

1 に答える 1

1

あなたのコードは際どいです。このシーケンスを考えてみましょう。

T1                      T2
read queue/1     
                        read queue/1
                        write lock/1
                        delete queue/1
                        delete lock/1
write lock/1 
delete queue/1 (FAIL, no node!)

ロックした後、他の人がキュー 1 を削除していないことを確認するためにもう一度読み取る必要があります。

于 2013-05-04T04:39:28.173 に答える