Job Distributor
別のメッセージを公開する人がいますChannels
。
Consumers
さらに、異なるタスクで動作し、異なるマシンで実行される2人(将来的にはそれ以上)が必要です。(現在、私は1つしか持っておらず、スケーリングする必要があります)
これらのタスクに名前を付けましょう(単なる例):
FIBONACCI
(フィボナッチ数を生成します)RANDOMBOOKS
(本を書くためにランダムな文を生成します)
これらのタスクは最大2〜3時間実行され、それぞれに均等に分割する必要がありますConsumer
。
すべてのコンシューマーは、これらのタスクに取り組むためのx
並列スレッドを持つことができます。だから私は言います:(これらの数字は単なる例であり、変数に置き換えられます)
- マシン1は、3つの並列ジョブ
FIBONACCI
と5つの並列ジョブを消費できます。RANDOMBOOKS
- マシン2は、7つの並列ジョブ
FIBONACCI
と3つの並列ジョブを消費できます。RANDOMBOOKS
どうすればこれを達成できますか?
それぞれをリッスンするx
ために、それぞれのスレッドを開始する必要がありますか?Channel
Consumer
いつそれを確認する必要がありますか?
私の現在のアプローチは1つだけConsumer
です。各タスクのスレッドを開始x
します-各スレッドは、を実装するDefaultconsumerRunnable
です。このhandleDelivery
メソッドでは、呼び出しbasicAck(deliveryTag,false)
てから作業を行います。
さらに:私はいくつかのタスクを特別な消費者に送りたいです。上記のように公正な分配と組み合わせてそれをどのように達成できますか?
これは私のコードですpublishing
String QUEUE_NAME = "FIBONACCI";
Channel channel = this.clientManager.getRabbitMQConnection().createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
channel.basicPublish("", QUEUE_NAME,
MessageProperties.BASIC,
Control.getBytes(this.getArgument()));
channel.close();
これは私のコードですConsumer
public final class Worker extends DefaultConsumer implements Runnable {
@Override
public void run() {
try {
this.getChannel().queueDeclare(this.jobType.toString(), true, false, false, null);
this.getChannel().basicConsume(this.jobType.toString(), this);
this.getChannel().basicQos(1);
} catch (IOException e) {
// catch something
}
while (true) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Control.getLogger().error("Exception!", e);
}
}
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] bytes) throws IOException {
String routingKey = envelope.getRoutingKey();
String contentType = properties.getContentType();
this.getChannel().basicAck(deliveryTag, false); // Is this right?
// Start new Thread for this task with my own ExecutorService
}
}
この場合、クラスWorker
は2回開始されます。1回はFIBUNACCI
1回、もう1回はRANDOMBOOKS
アップデート
回答が述べているように、RabbitMQはこれに最適なソリューションではありませんが、CouchbaseまたはMongoDBプルアプローチが最適です。私はそれらのシステムに不慣れですが、これがどのように達成されるかを私に説明できる人はいますか?