10

Job Distributor別のメッセージを公開する人がいますChannels

Consumersさらに、異なるタスクで動作し、異なるマシンで実行される2人(将来的にはそれ以上)が必要です。(現在、私は1つしか持っておらず、スケーリングする必要があります)

これらのタスクに名前を付けましょう(単なる例):

  • FIBONACCI(フィボナッチ数を生成します)
  • RANDOMBOOKS(本を書くためにランダムな文を生成します)

これらのタスクは最大2〜3時間実行され、それぞれに均等に分割する必要がありますConsumer

すべてのコンシューマーは、これらのタスクに取り組むためのx 並列スレッドを持つことができます。だから私は言います:(これらの数字は単なる例であり、変数に置き換えられます)

  • マシン1は、3つの並列ジョブFIBONACCIと5つの並列ジョブを消費できます。RANDOMBOOKS
  • マシン2は、7つの並列ジョブFIBONACCIと3つの並列ジョブを消費できます。RANDOMBOOKS

どうすればこれを達成できますか?

それぞれをリッスンするxために、それぞれのスレッドを開始する必要がありますか?ChannelConsumer

いつそれを確認する必要がありますか?

私の現在のアプローチは1つだけConsumerです。各タスクのスレッドを開始xします-各スレッドは、を実装するDefaultconsumerRunnableです。このhandleDeliveryメソッドでは、呼び出しbasicAck(deliveryTag,false)てから作業を行います。

さらに:私はいくつかのタスクを特別な消費者に送りたいです。上記のように公正な分配と組み合わせてそれをどのように達成できますか?

これは私のコードですpublishing

String QUEUE_NAME = "FIBONACCI";

Channel channel = this.clientManager.getRabbitMQConnection().createChannel();

channel.queueDeclare(QUEUE_NAME, true, false, false, null);

channel.basicPublish("", QUEUE_NAME,
                MessageProperties.BASIC,
                Control.getBytes(this.getArgument()));

channel.close();

これは私のコードですConsumer

public final class Worker extends DefaultConsumer implements Runnable {
    @Override
    public void run() {

        try {
            this.getChannel().queueDeclare(this.jobType.toString(), true, false, false, null);
            this.getChannel().basicConsume(this.jobType.toString(), this);

            this.getChannel().basicQos(1);
        } catch (IOException e) {
            // catch something
        }
        while (true) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                Control.getLogger().error("Exception!", e);
            }

        }
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] bytes) throws IOException {
        String routingKey = envelope.getRoutingKey();
        String contentType = properties.getContentType();
        this.getChannel().basicAck(deliveryTag, false); // Is this right?
        // Start new Thread for this task with my own ExecutorService

    }
}

この場合、クラスWorkerは2回開始されます。1回はFIBUNACCI1回、もう1回はRANDOMBOOKS

アップデート

回答が述べているように、RabbitMQはこれに最適なソリューションではありませんが、CouchbaseまたはMongoDBプルアプローチが最適です。私はそれらのシステムに不慣れですが、これがどのように達成されるかを私に説明できる人はいますか?

4

5 に答える 5

7

これは、couchbaseでこれを構築する方法の概念図です。

  1. ジョブを処理するためのマシンがいくつかあり、実行するジョブを作成するマシン(おそらく同じマシン)がいくつかあります。
  2. Couchbaseのバケット内の各ジョブのドキュメントを作成できます(そのタイプを「ジョブ」などに設定して、そのバケット内の他のデータと混合する場合)。
  3. 各ジョブの説明には、実行する特定のコマンドとともに、作成された時間、期限(特定の期限がある場合)、および生成された作業値の種類を含めることができます。この作業値は任意の単位になります。
  4. ジョブの各コンシューマーは、一度に実行できるワークユニットの数と、使用可能なワークユニットの数を知っています(他のワーカーが作業している可能性があるため)。
  5. したがって、たとえば、容量が10作業単位で、6作業単位が実行されているマシンは、4作業単位以下のジョブを検索するクエリを実行します。
  6. couchbaseには、段階的に更新されるmap / reduceジョブであるビューがあります。ここでは、マップフェーズのみが必要になると思います。期限、システムに入力された時間、および作業単位の数を照会できるビューを作成します。このようにして、「4ワークユニット以下の最も遅れている仕事」を得ることができます。
  7. この種のクエリは、容量が解放されると、最初に最も延滞しているジョブを取得しますが、最大の延滞ジョブを取得できます。延滞しているジョブがない場合は、最大の延滞していないジョブを取得できます。(ここで、「延滞」とは、現在の時刻とジョブの期日との間のデルタです。)
  8. Couchbaseビューでは、このような非常に高度なクエリが可能です。また、段階的に更新されますが、完全にリアルタイムではありません。したがって、単一の仕事を探すのではなく、仕事の候補者のリストを探します(好きなように並べ替えます)。
  9. したがって、次のステップは、求職者のリストを取得し、2番目の場所(ロックファイルの場合はmembaseバケット(例:RAMキャッシュ、非永続的))を確認することです。ロックファイルには複数のフェーズがあります(ここでは、CRDTまたはニーズに最適な方法を使用してパーティション解決ロジックを少し実行します)。
  10. このバケットはRAMベースであるため、ビューよりも高速で、全体の状態からのラグが少なくなります。ロックファイルがない場合は、ステータスフラグが「暫定」のロックファイルを作成します。
  11. 別のワーカーが同じジョブを取得してロックファイルを確認した場合、そのワーカーはそのジョブ候補をスキップして、リストの次のジョブを実行できます。
  12. どういうわけか2人のワーカーが同じジョブのロックファイルを作成しようとすると、競合が発生します。衝突の場合、あなたはただパントすることができます。または、各ワーカーがロックファイルを更新するロジック(CRDTの解決により、兄弟をマージできるようにこれらのべき等元を作成する)を使用して、乱数または優先順位の数値を入力することもできます。
  13. 指定された時間(おそらく数秒)後、ロックファイルはワーカーによってチェックされ、レース解像度の変更を行う必要がない場合は、ロックファイルのステータスが「暫定」から「取得済み」に変更されます。 「」
  14. 次に、他のワーカーが利用可能なジョブを探しているときにビューに表示されないように、ジョブ自体を「実行済み」などのステータスで更新します。
  15. 最後に、上記で説明したこれらの求人候補を取得するためのクエリを実行する前に、実行されたが関係するワーカーが死亡したジョブを見つけるために特別なクエリを実行する別のステップを追加する必要があります。(例:延滞している仕事)。
  16. ワーカーがいつ死ぬかを知る1つの方法は、membaseバケットに配置されたロックファイルには、最終的には消える有効期限が必要であるということです。おそらく今回は短い可能性があり、ワーカーはそれに触れるだけで有効期限を更新します(これはcouchbase APIでサポートされています)
  17. ワーカーが死亡すると、最終的にそのロックファイルは消滅し、孤立したジョブは「実行済み」としてマークされますが、ロックファイルはありません。これは、ジョブを探しているワーカーが検索できる条件です。

したがって、要約すると、各ワーカーは孤立したジョブのクエリを実行し、孤立したジョブがある場合は、それらのロックファイルがあるかどうかを確認し、ロックファイルがない場合は作成し、上記の通常のロックプロトコルに従います。孤立したジョブがない場合は、期限切れのジョブを探し、ロックプロトコルに従います。期限切れのジョブがない場合は、最も古いジョブを取得し、ロックプロトコルに従います。

もちろん、これは、システムに「期限切れ」などがない場合にも機能します。適時性が重要でない場合は、最も古いジョブを実行する代わりに、別の方法を使用できます。

もう1つの方法は、1-Nの間にランダムな値を作成することです。ここで、Nは適度に大きな数、たとえばワーカー数の4倍であり、各ジョブにその値のタグを付けます。労働者が仕事を探しに行くたびに、サイコロを振って、その番号の仕事があるかどうかを確認することができます。そうでない場合は、その番号の仕事が見つかるまで、再度そうします。このように、複数のワーカーが少数の「最も古い」または最も優先度の高いジョブを争う代わりに、ロック競合の可能性が高くなります。 。

ランダムな方法は、対応する必要のある負荷値がある場合にも適用でき(1台のマシンが過度の負荷をかけないようにするため)、最も古い候補を取得する代わりに、ランダムな候補を取得します。実行可能な仕事のリストとそれをやってみてください。

編集して追加:

私が「おそらく乱数を入れる」と言うステップ12で、私が意味するのは、労働者が優先順位(たとえば、どれが最も仕事をする必要があるか)を知っている場合、これを表す図をファイルに入れることができるということです。仕事を「必要とする」という概念がない場合、彼らは両方ともサイコロを振ることができます。彼らはこのファイルをサイコロの役割で更新します。次に、両方がそれを見て、もう一方が何をロールバックしたかを確認できます。彼らが負けた場合、彼らはパントし、他の労働者はそれがそれを持っていることを知っています。このようにして、多くの複雑なプロトコルやネゴシエーションなしで、どのワーカーが仕事を引き受けるかを解決できます。ここでは、両方のワーカーが同じロックファイルにアクセスしていると想定しています。これは、2つのロックファイルと、それらすべてを検索するクエリを使用して実装できます。しばらくすると、

于 2013-03-16T22:43:01.483 に答える
5

まず、RabbitMQとの通信にJavaを使用したことがないため、コード例を提供することはできません。しかし、それはあなたが求めていることではないので、それは問題ではないはずです。この質問は、アプリケーションの一般的な設計に関するものです。

ここで多くの質問が行われているので、少し分解してみましょう。

異なる消費者のタスクを分割する

これを行う1つの方法はラウンドロビンを使用することですが、これはかなり粗雑であり、タスクが異なれば終了までに時間がかかる可能性があることを考慮していません。じゃあ何をすればいいの。これを行う1つの方法は、をに設定するprefetchこと1です。プリフェッチとは、コンシューマーがメッセージをローカルにキャッシュすることを意味します(注:メッセージはまだ消費されていません)。この値を1に設定すると、プリフェッチは発生しません。これは、消費者が現在作業中のメッセージについてのみ認識し、メモリ内に保持することを意味します。これにより、ワーカーがアイドル状態のときにのみメッセージを受信できるようになります。

いつ確認するか

上記の設定では、キューからメッセージを読み取り、それをスレッドの1つに渡してから、メッセージを確認することができます。使用可能なすべてのスレッドに対してこれを実行します-1。最後のメッセージを確認したくないのは、それは、まだワーカーの1人に渡すことができない別のメッセージを受信するために開かれることを意味するためです。スレッドの1つが終了すると、そのメッセージを確認したときに、この方法で常にスレッドが何かを処理できるようになります。

特別なメッセージを伝える

これはあなたが何をしたくないかにもよりますが、一般的にあなたのプロデューサーは彼らが何を伝えているのかを知っているべきだと思います。これは、メッセージを特定の取引所に送信できるか、特定のルーティングキーを使用して送信できることを意味します。このルーティングキーは、このメッセージを適切なキューに渡し、消費者がそのメッセージの処理方法を知っていることを聞きます。

AMQPとRabbitMQを読むことをお勧めします。これは、良い出発点になる可能性があります。

警告

私の提案とあなたのデザインには大きな欠陥が1つありますACK。それは、実際に処理を完了する前にメッセージを送信することです。これは、アプリケーションが壊れた場合(そうでない場合)、ACKedメッセージを再作成する方法がないことを意味します。これは、開始するスレッドの数が事前にわかっている場合に解決できます。プリフェッチカウントを動的に変更できるかどうかはわかりませんが、どういうわけかそれは疑わしいです。

いくつかの考え

私のRabbitMQの経験は限られていますが、交換やキューを作成することを恐れてはいけません。正しく実行すれば、アプリケーションの設計を大幅に改善および簡素化できます。たぶん、大量のコンシューマースレッドを開始するアプリケーションを用意するべきではありません。代わりに、システムで使用可能なメモリなどに基づいてコンシューマーを起動する、ある種のラッパーが必要になる場合があります。そうすれば、アプリケーションがクラッシュした場合でも、メッセージが失われないようにすることができます。そのようにすると、もちろん、メッセージが終了したときにメッセージを確認することになります。

おすすめの読み物

不明な点がある場合、またはあなたの主張が欠けている場合はお知らせください。可能な場合は、回答を拡大するか、改善するよう努めます。

于 2013-03-13T23:45:22.947 に答える
3

これがあなたの質問に対する私の考えです。@Danielが彼の回答で述べたように、これは実装よりもアーキテクチャの原則の問題であると私は信じています。アーキテクチャが明確になると、実装は簡単になります。

まず、スケジューリング理論に関連することを取り上げたいと思います。ここには非常に長時間実行されるタスクがあり、それらが適切な方法でスケジュールされていない場合、(a)サーバーをフルキャパシティー未満で実行するか、(b)タスクの完了に他の方法よりもはるかに長い時間がかかることになります。 。だから、私はあなたのスケジューリングパラダイムに関連してあなたにいくつかの質問があります:

  1. 各仕事にかかる時間を見積もる能力はありますか?
  2. ジョブには期限が関連付けられていますか?ある場合、それはどのように決定されますか?

この場合、RabbitMQは適切ですか?

RabbitMQが、非常に長時間実行されるジョブをディスパッチするための適切なソリューションであるとは思いません。実際、RabbitMQがその仕事に適したツールではないという事実の結果として、あなたはこれらの質問をしていると思います。デフォルトでは、次に処理する必要があるジョブを決定するためにキューからジョブを削除する前に、ジョブについて十分な洞察がありません。次に、@ Danielの回答で述べたように、RabbitMQサーバーへの接続が失敗するたびにジョブが再キューに入れられるのはおそらく悪いことなので、組み込みのACKメカニズムを使用できない可能性があります。

代わりに、ジョブの「キュー」を保存するために、MongoDBやCouchbaseのようなものを探します。そうすれば、RabbitMQによって適用される組み込みのラウンドロビンに依存するのではなく、ディスパッチングロジックを完全に制御できます。

その他の考慮事項:

さらに、異なるタスクに取り組み、異なるマシンで実行する2つ(および将来的にはそれ以上)のコンシューマーが必要です。(現在、私は1つしか持っておらず、スケーリングする必要があります)

この場合、プッシュベースのコンシューマーは使用したくないと思います。代わりに、プルベースのシステムを使用してください(RabbitMQでは、これはBasic.Getと呼ばれます)。これを行うことにより、ジョブスケジューリングの責任を負うことになります

コンシューマー1には、FIBONACCI用に3つのスレッドがあり、RANDOMBOOKS用に5つのスレッドがあります。コンシューマー2には、FIBONACCI用に7つのスレッドがあり、RANDOMBOOKS用に3つのスレッドがあります。どうすればこれを達成できますか?

この場合、私にはわかりません。あなたは1つfibonacciの仕事を持っていますか、そしてあなたはどういうわけかあなたのサーバー上でそれを並行して実行していますか?それとも、サーバーがfibonacci同時に多くのジョブを実行することを望みますか?後者を想定すると、サーバー上で作業を行うためのスレッドを作成し、すべてのスレッドがいっぱいになるまでそれらにジョブを割り当てます。スレッドが使用可能になると、キューをポーリングして別のジョブを開始します。

あなたが持っていた他の質問:

  • 各コンシューマーでリッスンするには、チャネルごとにxスレッドを開始する必要がありますか?
  • いつそれを確認する必要がありますか?
  • たった1人のコンシューマーに対する私の現在のアプローチは次のとおりです。
  • タスク-各スレッドは、Runnableを実装するDefaultconsumerです。handleDeliveryメソッドで、basicAck(deliveryTag、false)を呼び出してから、作業を行います。
  • さらに:私はいくつかのタスクを特別な消費者に送りたいです。上記のように公正な分配と組み合わせてそれをどのように達成できますか?

上記のように、ディスパッチングの責任をRabbitMQサーバーから個々のコンシューマーに移すと、上記の質問はなくなります(コンシューマーとは、スレッドを消費することを意味します)。さらに、よりデータベース駆動型のもの(Couchbaseなど)を使用する場合は、これらを自分でプログラムすることができ、ロジックを完全に制御できます。

Couchbaseの使用

Couchbaseをキューとして使用する方法の詳細な説明はこの質問の範囲を超えていますが、いくつかの指針を提供することができます。

  • まず、 Couchbaseで読みたいと思うでしょう
  • ジョブをCouchbaseバケットに保存し、インデックス付きのビューを使用して利用可能なジョブを一覧表示することをお勧めします。各ジョブのキーを定義する方法には多くのオプションがありますが、ジョブ自体をJSONにシリアル化する必要があります。おそらくServiceStack.Textを使用します
  • ジョブがプルされて処理される場合、Couchbaseでジョブのステータスをマークするためのロジックが必要になります。CASメソッドを使用して、他の誰かがあなたと同じ時間に処理するための仕事を引き受けていないことを確認する必要があります。
  • キューから失敗したジョブと完了したジョブをクリアするための何らかのポリシーが必要になります。

概要

  1. これにはRabbitMQを使用しないでください
  2. 各ジョブのパラメーターを使用して、インテリジェントなディスパッチングアルゴリズムを考案します。あなたの仕事の性質についてもっと知ったら、私はこれを手伝うことができます。
  3. サーバーからジョブをプッシュするのではなく、#2のアルゴリズムに基づいてワーカーにジョブをプルします。
  4. システム全体のジョブのステータス(キューに入れられた、実行中、失敗した、成功したなど)と、停止したジョブをいつ/再ディスパッチするかを追跡する独自の方法を考え出します。
于 2013-03-14T00:26:54.747 に答える
1

スプリングを使用している場合、またはスプリングを使用する意思がある場合は、スプリングリスナーコンテナサポートを使用してそれを実現できます。それはあなたが探しているのと同様のコールバックの種類のプログラミングモデルを提供します。

SpringAMQPリファレンスドキュメントのサンプルコード

@Configuration
public class ExampleAmqpConfiguration {

    @Bean
    public MessageListenerContainer messageListenerContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(rabbitConnectionFactory());
        container.setQueueName("some.queue");
        container.setMessageListener(exampleListener());
        return container;
    }

    @Bean
    public ConnectionFactory rabbitConnectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        return connectionFactory;
    }

    @Bean
    public MessageListener exampleListener() {
        return new MessageListener() {
            public void onMessage(Message message) {
                System.out.println("received: " + message);
            }
        };
    }
}
于 2012-09-05T09:21:32.170 に答える
0

最近、コールバックがコンシューマー実装に送信される方法を変更するブランチbug18384をプッシュしました。

この変更に続いて、Connectionは、コンシューマーにコールバックを送信するために使用されるディスパッチスレッドを維持します。これにより、コンシューマーは接続とチャネルのブロッキングメソッドを呼び出すことができます。

これを構成可能にして、カスタムエグゼキューターをConnectionFactoryにプラグインできるようにすることについて、Twitterで質問がありました。なぜこれが複雑なのかを概説し、可能な実装について話し合い、多くの関心があるかどうかを確認したいと思いました。

まず、各コンシューマーが単一のスレッドでのみコールバックを受信する必要があることを確立する必要があります。そうでない場合は、混乱が生じ、コンシューマーは初期化の安全性を超えて自分のスレッドの安全性について心配する必要があります。

すべてのコンシューマーに対して単一のディスパッチスレッドしかないため、このコンシューマースレッドのペアリングは簡単に実行できます。

複数のスレッドを導入する場合、各コンシューマーが1つのスレッドのみとペアになっていることを確認する必要があります。Executor抽象化を使用する場合、これにより、使用されるスレッドを保証できないため、各コールバックディスパッチがRunnableにラップされてExecutorに送信されるのを防ぎます。

これを回避するために、エグゼキュータは「n」個の長時間実行タスクを実行するように設定できます(nはエグゼキュータ内のスレッドの数です)。これらの各タスクは、ディスパッチ命令をキューから引き出して実行します。各コンシューマーは、おそらくラウンドロビンベースで割り当てられた1つのディスパッチ命令キューとペアになっています。これはそれほど複雑ではなく、エグゼキュータ内のスレッド間でディスパッチ負荷の単純なバランスを提供します。

今、まだいくつかの問題があります:

  1. エグゼキュータ内のスレッドの数は、必ずしも固定されているわけではありません(ThreadPoolExecutorの場合のように)。
  2. ExecutorまたはExecutorServiceを介して、スレッドの数を確認する方法はありません。したがって、作成するディスパッチ命令キューの数を知ることはできません。

ただし、確かにConnectionFactory.setDispatchThreadCount(int)を導入することはできます。舞台裏では、Executors.newFixedThreadPool()と、正しい数のディスパッチキューおよびディスパッチタスクが作成されます。

私はこれを解決するためのより簡単な方法を見落としていると誰かが思っているかどうか、そして実際にこれが解決する価値があるかどうかを聞くことに興味があります。

于 2013-03-20T10:38:26.507 に答える