1

0mq pub/sub のパブリッシャーをノードで冗長にするために使用される一般的なパターンはありますか? 動機は、定期的に失敗したり再起動したりする可能性のあるパブリッシャーで複数のプロセスを実行できるようにすることです。

私の最初の考えは、マスターにフォワーダーを作成し、ワーカー パブリッシャーからそれに接続することです。

var cluster = require('cluster')
  , zmq = require('zmq')
  , endpointIn = 'ipc:///tmp/cluster_pub_sub'
  , endpointOut = 'tcp://127.0.0.1:7777';

if (cluster.isMaster) {
  for (var i = 0; i < 2; i++) cluster.fork();
  startPubSubForwarder();
} else {
  startPublisher();
}

function startPublisher() {
  var socket = zmq.socket('pub');
  socket.connect(endpointIn);
  setInterval(function () {
    socket.send('pid=' + process.pid);
  }, 1000);
}

function startPubSubForwarder() {
  var sIn = zmq.socket('sub')
    , sOut = zmq.socket('pub');

  // incoming
  sIn.subscribe('');
  sIn.bind(endpointIn, function (err) {
    if (err) throw err;
  });
  sIn.on('message', function (data) {
    sOut.send(data);
  });

  // outgoing
  sOut.bind(endpointOut, function (err) {
    if (err) throw err;
  });
}

これを行う他の/より良い方法はありますか?

4

2 に答える 2

0

サンプル コードから、XPUB/XSUB 0MQ パターンが最適だと思います。これは、同じ「startPubSubForwarder()」ブロックを実現するより効率的な方法であり、サブスクライバー側は、パブリッシャーのバックエンド側で特定のパターンを直接サブスクライブできるという利点があります。ここに、publishers/xpub-xsub (プロキシ方式)/subscriptors の例のリンクを残しました: https://github.com/krakatoa/node_zmq_workshop/tree/master/03.3_news_proxy。これは NodeJS コードです (これは私のものです。詳しくはお尋ねください!)。

于 2014-05-09T03:30:20.777 に答える
0

あなたの懸念がメッセージの耐久性である場合、複数のパブリッシャーを持つことについてはあまり気にせず、パブリッシャーが死んだときにメッセージが失われないようにすることについてより気にかけていると思います。パブリッシャーをすぐに再起動して、中断したところから再開することができます。また、どのメッセージが正常に送信されたかを知る必要もあります。

これに必要なのは、1) 永続的なストレージと、2) 受信側でメッセージが受信されたこと (および場合によっては処理が完了したこと) をパブリッシャーに通知する手段です。このセットアップは、信頼性の要求を解決するはずです。

大規模化も実現したい場合は、アーキテクチャを少し拡張する必要があります。送信者と受信者が 1:1 である送受信シナリオではより簡単ですが、1:N ラウンドロビン/負荷分散シナリオを実行する必要がある場合はもう少し複雑で、おそらくそれがスケーリングに必要なものです。

スケールアウト シナリオを達成するための私の意見は、次のセットアップを行うことです。

sender_process--(1:1)-->distributor--(1:N)-->receiver_process(es)

ここで、ディストリビューターは送信者からのメッセージの受信を確認してから、受信者プロセスにファンアウトします。

これらの各プロセスの前にキューを配置することで、これを実現することができます。したがって、プロセスに送信しません。プロセスが読み取るキューに送信します。送信者は、ディストリビューター キューにデータを書き込みます。ディストリビューターは、レシーバーのキューにデータを置きます。各ポイントで、各プロセスが処理を試みます。最大再試行回数を超えて失敗した場合は、エラー キューに入れられます。

これらすべてを行うために、rabbitmq / amqp を使用します。ここで 1:1 および 1:N 送信を行うために使用するバスのオープン ソース化を開始しました: https://github.com/mateodelnorte/servicebus。今後数日間で readme とその他のテストを追加する予定です。

于 2012-06-28T18:56:11.873 に答える