3

Python バインディングを使用して zeroMQ でパイプライン パターンを実装しています。

タスクは、次のような無限ループで新しいタスクをリッスンするワーカーにファン アウトされます。

    while True:
        socks = dict(self.poller.poll())
        if self.receiver in socks and socks[self.receiver] == zmq.POLLIN:
            msg = self.receiver.recv_unicode(encoding='utf-8')
            self.process(msg)
        if self.hear in socks and socks[self.hear] == zmq.POLLIN:
            msg = self.hear.recv()
            print self.pid,":",  msg
            sys.exit(0)

シンク ノードからメッセージを受信すると終了し、期待されるすべての結果を受信したことを確認します。

ただし、ワーカーはそのようなメッセージを見逃して終了しない場合があります。ワーカーが知る方法がない場合に、ワーカーが常に終了するようにする最善の方法は何ですか (前述のメッセージ以外では、処理するタスクはこれ以上ありません)。

ワーカーのステータスをチェックするために私が書いたテスト コードは次のとおりです。

#-*- coding:utf-8 -*-
"""
Test module containing tests for all modules of pypln 

"""
import unittest
from servers.ventilator import Ventilator
from subprocess import Popen, PIPE
import time
class testWorkerModules(unittest.TestCase):
    def setUp(self):
        self.nw = 4
        #spawn 4 workers
        self.ws = [Popen(['python', 'workers/dummy_worker.py'], stdout=None) for i in range(self.nw)]
        #spawn a sink
        self.sink = Popen(['python', 'sinks/dummy_sink.py'], stdout=None)
        #start a ventilator
        self.V = Ventilator()
        # wait for workers and sinks to connect
        time.sleep(1)

    def test_send_unicode(self):
        '''
        Pushing unicode strings through workers to sinks.
        '''

        self.V.push_load([u'são joão' for i in xrange(80)])
        time.sleep(1)
        #[p.wait() for p in self.ws]#wait for the workers to terminate
        wsr = [p.poll() for p in self.ws]
        while None in wsr:
            print wsr, [p.pid for p in self.ws if p.poll() == None] #these are the unfinished workers
            time.sleep(0.5)
            wsr = [p.poll() for p in self.ws]
        self.sink.wait()
        self.sink = self.sink.returncode
        self.assertEqual([0]*self.nw, wsr)
        self.assertEqual(0, self.sink)

if __name__ == '__main__':
    unittest.main()
4

1 に答える 1

1

すべてのメッセージングのものは、最終的にはハートビートになります。(ワーカーやシンクなどとして)作業する必要のあるコンポーネントが死んでいることに気付いた場合は、基本的に別の場所に接続するか、自殺することができます。したがって、作業者としてシンクがもうないことに気付いた場合は、終了してください。これは、シンクがまだ存在していても接続が切断されていても、終了できることも意味します。しかし、もっと多くのことができるかどうかはわかりません。おそらく、すべてのタイムアウトをより合理的に設定してください...

于 2013-01-15T01:30:18.217 に答える