私が取り組んでいるプロジェクトでは、SQS からのメッセージを読み取る必要があり、Akka を使用してこれらのメッセージの処理を分散することにしました。
SQS は Camel によってサポートされており、Consumer クラスには Akka で使用するための機能が組み込まれているため、エンドポイントを実装してこの方法でメッセージを読み取るのが最善だと思いましたが、そうしている例はあまり見たことがありませんでした。
私の問題は、キューを空またはほぼ空に保つのに十分な速さでキューをポーリングできないことです。私が最初に考えたのは、X/s の速度で SQS から Camel 経由で Consumer にメッセージを受信させることができるということでした。そこから、必要なメッセージ処理速度に到達するために、さらに多くのコンシューマーを作成するだけで済みます。
私の消費者:
import akka.camel.{CamelMessage, Consumer}
import akka.actor.{ActorRef, ActorPath}
class MyConsumer() extends Consumer {
def endpointUri = "aws-sqs://my_queue?delay=1&maxMessagesPerPoll=10&accessKey=myKey&secretKey=RAW(mySecret)"
var count = 0
def receive = {
case msg: CamelMessage => {
count += 1
}
case _ => {
println("Got something else")
}
}
override def postStop(){
println("Count for actor: " + count)
}
}
示されているように、メッセージのレートを向上させるために設定delay=1
しましたが、同じエンドポイントで複数のコンシューマーを生成することはできません。&maxMessagesPerPoll=10
私はドキュメントを読みましたが、これは SQS エンドポイントにも当てはまると思いますBy default endpoints are assumed not to support multiple consumers.
。複数のコンシューマーを生成すると、システムを 1 分間実行した後に.Count for actor: x
Count for actor: 0
これがまったく役立つ場合。単一のコンシューマーでのこの現在の実装では、毎秒約 33 メッセージを読み取ることができます。
これは、Akka の SQS キューからメッセージを読み取る適切な方法ですか? もしそうなら、メッセージの消費率を毎秒 900 メッセージに近づけるために、これを外側に拡張する方法はありますか?