原則として、少なくともクローラーと同じように機能するジョブ/タスク ディストリビューターを構築しています。ここに私が学んだいくつかのことがあります:
すべてのイベントを定義
サーバーとクローラー間の通信は、サーバーからクローラーへの作業のディスパッチや、サーバーへのハートビート メッセージの送信など、システムで発生するさまざまな事柄に基づいています。システムのイベント タイプを定義します。それらはユースケースです:
DISPATCH_WORK_TO_CRAWLER_EVENT
CRAWLER_NODE_STATUS_EVENT
...
メッセージ標準を定義する
サーバーとクローラー間のすべての通信は ZMsg を使用して行う必要があるため、次のようなフレームを編成する標準を定義します。
Frame1: "Crawler v1.0" //this is a static header
Frame2: <event type> //ex: "CRAWLER_NODE_STATUS_EVENT"
Frame3: <content xml/json/binary> //content that applies to this event (if any)
すべてのメッセージが従わなければならない標準的な規則があるため、ピア間で受信した ZMsgs を検証するメッセージ バリデーターを作成できるようになりました。
サーバ
ROUTER
クローラーとの非同期および双方向通信には、サーバー上で単一の を使用します。また、PUB
ハートビート メッセージのブロードキャストにソケットを使用します。
ROUTER ソケットでブロックしないでください。a を使用しPOLLER
て 5 秒ごとにループするなどしてください。これにより、サーバーはハートビート イベントをクローラーにブロードキャストするなど、他のことを定期的に実行できます。このようなもの:
Socket rtr = .. //ZMQ.ROUTER
Socket pub = .. //ZMQ.PUB
ZMQ.Poller poller = new ZMQ.Poller(2)
poller.register( rtr, ZMQ.Poller.POLLIN)
poller.register( pub, ZMQ.Poller.POLLIN)
while (true) {
ZMsg msg = null
poller.poll(5000)
if( poller.pollin(0)){
//messages from crawlers
msg = ZMsg.recvMsg(rtr)
}
//send heartbeat messages
ZMsg hearbeatMsg = ...
//create message content here,
//publish to all crawlers
heartbeatMsg.send(pub)
}
ワーカーの認識に関する質問に答えるには、FIFO スタックとハートビート メッセージを使用する簡単で効果的な方法があります。このようなもの:
- サーバーは単純な FIFO スタックをメモリに保持します
- サーバーはハートビートを送信します。クローラーはノード名で応答します。ROUTER は自動的にノードのアドレスもメッセージに挿入します (メッセージのエンベロープを参照してください) 。
- ノード名とノード アドレスを含むスタックに 1 つのオブジェクトをプッシュします。
- サーバーが作業をクローラーにディスパッチしたい場合は、スタックから次のオブジェクトをポップし、メッセージを作成し、アドレスを (ノード アドレスを使用して) 適切に作成し、そのワーカーに送信します。
- 同じ方法で他のクローラーにさらに作業をディスパッチします。クローラーがサーバーに応答したら、ノード名/アドレスを持つ別のオブジェクトをスタックにプッシュします。他のワーカーは応答するまで利用できないため、気にする必要はありません。
これは、やみくもに作業を送信するのではなく、ワーカーの可用性に基づいて作業を分散するための単純ですが効果的な方法です。lbbroker.phpの例を確認してください。概念は同じです。
クローラー (ワーカー)
ワーカーは、単一のDEALER
ソケットとSUB
. はDEALER
非同期通信のメイン ソケットであり、SUB はサーバーからのハートビート メッセージをサブスクライブします。ワーカーは、ハートビート メッセージを受信すると、DEALER ソケットでサーバーに応答します。
Socket dlr = .. //ZMQ.DEALER
Socket sub = .. //ZMQ.SUB
ZMQ.Poller poller = new ZMQ.Poller(2)
poller.register( dlr, ZMQ.Poller.POLLIN)
poller.register( sub, ZMQ.Poller.POLLIN)
while (true) {
ZMsg msg = null
poller.poll(5000)
if( poller.pollin(0)){
//message from server
msg = ZMsg.recvMsg(dlr)
}
if( poller.pollin(1)){
//heartbeat message from server
msg = ZMsg.recvMsg(sub)
//reply back with status
ZMsg statusMsg = ...
statusMsg.send(dlr)
}
残りは自分で把握できます。PHP の例に取り組み、何かを構築し、それを壊し、さらに構築してください。それがあなたが学ぶ唯一の方法です!
楽しんでください、お役に立てば幸いです!