9

ページをクロールするために人工呼吸器/ワーカー/シンクのパターンを設定しようとしてきましたが、テスト フェーズを通過することはありませんでした。私のセットアップの特徴の 1 つは、シンクが人工呼吸器と同じプロセスにあることです。すべてのノードは ipc:// トランスポートを使用します。現時点では、テスト メッセージのみが交換されます。人工呼吸器がタスクを送信し、ワーカーがタスクを受信して​​待機し、確認をシンクに送信します。

症状: しばらくすると (通常は 5 分以内)、人工呼吸器がタスクを送信し続け、ワーカーがタスクの受信と確認メッセージの送信を続けても、シンクは確認メッセージの受信を停止します。

シンクを再起動すると、不足しているすべてのメッセージが起動時に取得されるため、確認が送信されることはわかっています。

ZeroMQ は自動再接続を扱っていると思いました。

人工呼吸器/シンク

var push = zmq.socket('push');
var sink = zmq.socket('pull');
var pi = 0;
setInterval(function() {
    push.send(['ping', pi++], zmq.ZMQ_SNDMORE);
    push.send('end');
}, 2000);
push.bind('ipc://crawl.ipc');
sink.bind('ipc://crawl-sink.ipc');
sink.on('message', function() {
    var args = [].slice.apply(arguments).map(function(e) {return e.toString()});
    console.log('got message', args.join(' '));
});

worker.js

var pull = zmq.socket('pull');
var sink = zmq.socket('push');
sink.connect(opt.sink);
pull.connect(opt.push);

pull.on('message', function() {
    var args = [].slice.apply(arguments).map(function(e) {return e.toString()});
    console.log('got job ', args.join(' '));
    setTimeout(function() {
        console.log('job done ', args.join(' '));
        sink.send(['job done', args.join(' ')]);
    }, Math.random() * 5 * 1000);
});

編集シンクを別のプロセスに移動しようとしましたが、うまくいくようです。ただし、同じプロセスに存在することを本当に望んでおり、使用されるパターンに関係なく、プロセスごとに複数のzmqソケットを処理するときに同様の動作を観察しました

編集私はこのモジュールを使用していますhttps://github.com/JustinTulloss/zeromq.node

4

2 に答える 2

4

この回答が受け入れられるとは限りませんが、参考のためにここに配置します。ZeroMQに触発されたAxonと呼ばれる非常に忠実なノードのみのモジュールがあります。

  • Axonにはコンパイルされた依存関係がなく、ZeroMQと同じソケットタイプを再作成します。
  • Axonは、pub / subソケットタイプに基づいて構築され、ネットワークイベントエミッターを作成します。
  • 最後に、ZMQは応答が同期的に発生することを想定しているため、ZMQのreq/repソケットはNode.jsでは機能しません。ネイティブノードであるAxonライブラリは、req/repパターンを適切に処理します。

注: ZMQとAxonは相互運用できません。

于 2013-02-05T17:16:11.353 に答える
2

基本の AMQP クライアントを使用しているか、それを内部で使用するパッケージを使用しているかはわかりませんが、RabbitMQ で同様の問題が発生しています。実際のプロセスはまだ実行中です (setInterval は機能します)

メインプロセスからcluster.forkを介してサービス/ワーカーを実行しています...メインプロセスには、終了時にワーカー/サービスを再起動するリスナーがあります...ワーカー内には、X秒ごとに実行されるsetIntervalがあります、その間に作業が行われなかった場合、ワーカー process.exit があります (ここで、メイン プロセス リスナーが新しいフォークを起動します)。これは私にとって十分な回復力として機能します。複数のワーカーを実行する (キューをリッスンする) ことで、作業は完了します。

別の提案によると、MQ へのすべてのインターフェイスが現在 Node.js を経由しているため、Axon への切り替えを検討しています。私の他のシステムは、NodeJS 駆動の API サービスを介して接続しています。さらに言えば、API サービスを介して必要なものを公開することは、おそらくそれほど難しくありません。

于 2013-03-05T20:46:53.343 に答える