6

私が取り組んでいるプロジェクトでは、SQS からのメッセージを読み取る必要があり、Akka を使用してこれらのメッセージの処理を分散することにしました。

SQS は Camel によってサポートされており、Consumer クラスには Akka で使用するための機能が組み込まれているため、エンドポイントを実装してこの方法でメッセージを読み取るのが最善だと思いましたが、そうしている例はあまり見たことがありませんでした。

私の問題は、キューを空またはほぼ空に保つのに十分な速さでキューをポーリングできないことです。私が最初に考えたのは、X/s の速度で SQS から Camel 経由で Consumer にメッセージを受信させることができるということでした。そこから、必要なメッセージ処理速度に到達するために、さらに多くのコンシューマーを作成するだけで済みます。

私の消費者:

import akka.camel.{CamelMessage, Consumer}
import akka.actor.{ActorRef, ActorPath}

class MyConsumer() extends Consumer {
  def endpointUri = "aws-sqs://my_queue?delay=1&maxMessagesPerPoll=10&accessKey=myKey&secretKey=RAW(mySecret)"
  var count = 0

  def receive = {
    case msg: CamelMessage => {
      count += 1
    }
    case _ => {
      println("Got something else")
    }
  }

  override def postStop(){
    println("Count for actor: " + count)
  }
}

示されているように、メッセージのレートを向上させるために設定delay=1しましたが、同じエンドポイントで複数のコンシューマーを生成することはできません。&maxMessagesPerPoll=10

私はドキュメントを読みましたが、これは SQS エンドポイントにも当てはまると思いますBy default endpoints are assumed not to support multiple consumers.。複数のコンシューマーを生成すると、システムを 1 分間実行した後に.Count for actor: xCount for actor: 0

これがまったく役立つ場合。単一のコンシューマーでのこの現在の実装では、毎秒約 33 メッセージを読み取ることができます。

これは、Akka の SQS キューからメッセージを読み取る適切な方法ですか? もしそうなら、メッセージの消費率を毎秒 900 メッセージに近づけるために、これを外側に拡張する方法はありますか?

4

2 に答える 2

5

悲しいことに、Camel は現在、SQS でのメッセージの並列消費をサポートしていません。

http://camel.465427.n5.nabble.com/Amazon-SQS-listener-as-multi-threaded-td5741541.html

これに対処するために、aws-java-sdk を使用してバッチ メッセージ SQS をポーリングする独自のアクターを作成しました。

  def receive = {
    case BeginPolling => {
      // re-queue sending asynchronously
      self ! BeginPolling
      // traverse the response
      val deleteMessageList = new ArrayList[DeleteMessageBatchRequestEntry]
      val messages = sqs.receiveMessage(receiveMessageRequest).getMessages
      messages.toList.foreach {
        node => {
          deleteMessageList.add(new DeleteMessageBatchRequestEntry(node.getMessageId, node.getReceiptHandle))
          //log.info("Node body: {}", node.getBody)
          filterSupervisor ! node.getBody
        }
      }
      if(deleteEntryList.size() > 0){
        val deleteMessageBatchRequest = new DeleteMessageBatchRequest(queueName, deleteMessageList)
        sqs.deleteMessageBatch(deleteMessageBatchRequest)
      }
    }

    case _ => {
      log.warning("Unknown message")
    }
  }

これが最適な実装であるかどうかはわかりませんが、リクエストが常に空のキューにヒットしないように改善することはもちろん可能ですが、同じキューからメッセージをポーリングできるという現在のニーズに合っています。

これにより、SQS から約 133 (メッセージ/秒)/アクターを取得します。

于 2013-11-06T16:20:20.243 に答える