5

Scala でAkka Streamsを使用して、 AWS Java SDKを使用してAWS SQSキューからポーリングしています。2 秒間隔でメッセージをデキューするActorPublisherを作成しました。

class SQSSubscriber(name: String) extends ActorPublisher[Message] {
  implicit val materializer = ActorMaterializer()

  val schedule = context.system.scheduler.schedule(0 seconds, 2 seconds, self, "dequeue")

  val client = new AmazonSQSClient()
  client.setRegion(RegionUtils.getRegion("us-east-1"))
  val url = client.getQueueUrl(name).getQueueUrl

  val MaxBufferSize = 100
  var buf = Vector.empty[Message]

  override def receive: Receive = {
    case "dequeue" =>
      val messages = iterableAsScalaIterable(client.receiveMessage(new ReceiveMessageRequest(url).getMessages).toList
      messages.foreach(self ! _)
    case message: Message if buf.size == MaxBufferSize =>
      log.error("The buffer is full")
    case message: Message =>
      if (buf.isEmpty && totalDemand > 0)
        onNext(message)
      else {
        buf :+= message
        deliverBuf()
      }
    case Request(_) =>
      deliverBuf()
    case Cancel =>
      context.stop(self)
  }

  @tailrec final def deliverBuf(): Unit =
    if (totalDemand > 0) {
      if (totalDemand <= Int.MaxValue) {
        val (use, keep) = buf.splitAt(totalDemand.toInt)
        buf = keep
        use foreach onNext
      } else {
        val (use, keep) = buf.splitAt(Int.MaxValue)
        buf = keep
        use foreach onNext
        deliverBuf()
      }
    }
}

私のアプリケーションでは、同様に 2 秒間隔でフローを実行しようとしています。

val system = ActorSystem("system")
val sqsSource = Source.actorPublisher[Message](SQSSubscriber.props("queue-name"))
val flow = Flow[Message]
  .map { elem => system.log.debug(s"${elem.getBody} (${elem.getMessageId})"); elem }
  .to(Sink.ignore)

system.scheduler.schedule(0 seconds, 2 seconds) {
  flow.runWith(sqsSource)(ActorMaterializer()(system))
}

ただし、アプリケーションを実行するjava.util.concurrent.TimeoutException: Futures timed out after [20000 milliseconds]と、ActorMaterializer.

Akka Stream を継続的にマテリアライズするための推奨されるアプローチはありますか?

4

1 に答える 1

8

ActorPublisher2秒ごとに新しいものを作成する必要はないと思います。これは冗長でメモリの無駄遣いのようです。また、ActorPublisher は必要ないと思います。コードについて私が知る限り、あなたの実装では、同じデータをクエリするストリームの数が増え続けています。クライアントからのそれぞれMessageが N 個の異なる akka Streams によって処理され、さらに悪いことに、N 個は時間の経過とともに大きくなります。

無限ループ クエリの反復子

scala の を使用して、ActorPublisher から同じ動作を得ることができますIterator。クライアントに継続的にクエリを実行する Iterator を作成することができます。

//setup the client
val client = {
  val sqsClient = new AmazonSQSClient()
  sqsClient setRegion (RegionUtils getRegion "us-east-1")
  sqsClient
}

val url = client.getQueueUrl(name).getQueueUrl

//single query
def queryClientForMessages : Iterable[Message] = iterableAsScalaIterable {
  client receiveMessage (new ReceiveMessageRequest(url).getMessages)
}

def messageListIteartor : Iterator[Iterable[Message]] = 
  Iterator continually messageListStream

//messages one-at-a-time "on demand", no timer pushing you around
def messageIterator() : Iterator[Message] = messageListIterator flatMap identity

この実装は、以前のすべてのメッセージが消費された場合にのみクライアントにクエリを実行するため、真にリアクティブです。固定サイズのバッファを追跡する必要はありません。メッセージの作成 (タイマーによる) はメッセージの消費 (println による) から切り離されているため、ソリューションにはバッファーが必要です。私の実装では、作成と消費は背圧によって密接に結合されています。

Akka ストリーム ソース

次に、この Iterator ジェネレーター関数を使用して、akka ストリーム Source をフィードできます。

def messageSource : Source[Message, _] = Source fromIterator messageIterator

流れの形成

そして最後に、この Source を使用して実行できますprintln(補足として: あなたのflow値は実際にはSinksinceFlow + Sink = Sinkです)。flow質問の値を使用して:

messageSource runWith flow

1 つの akka Stream がすべてのメッセージを処理します。

于 2015-11-23T14:03:50.390 に答える