3

以下のコードは、実行から 15 分以内に EC2 インスタンスで OOO をスローします (java config xms 1024 xmx2G) が、intellij での実行時にエラーをスローしません。

SqsSource(queueUrl,
      //parallelism = maxBufferSize / maxBatchSize 20 10
      SqsSourceSettings().withWaitTime(10 seconds)
        .withMaxBatchSize(10).withMaxBufferSize(20)
    ).map {
      msg => {
        val out = Source.single(msg)
          .via(messageToLambdaRequest)
          .via(lambdaRequestToLambdaResp)
          .via(lambdaRespToAggregationKeySet)
          .via(workFlow)
          .log("error while consuming events internally.")
          .withAttributes(ActorAttributes.supervisionStrategy(decider))
          .runWith(Sink.seq)

        val reducedResponse = out.map(response => {
          response.foldLeft[Response](OK)((a, b) =>
            if (a == OK && b == OK) OK else NotOK)
        })

        val messageAction = reducedResponse
          .map(res =>
            if (res == OK) {
              //log.info("finally response is OK hence message action delete is prepared. {}.", msg.messageId())
              delete(msg)
            } else
              ignore(msg)
          )
        messageAction
      }
    }
      .mapAsync(1)(identity)
      .withAttributes(ActorAttributes.supervisionStrategy(decider))
      // For the SQS Sinks and Sources the sum of all parallelism (Source) and maxInFlight (Sink)
      // must be less than or equal to the thread pool size.
      .log("error log")
      .runWith(SqsAckSink(queueUrl, SqsAckSettings(1)))

  }

これを 1.0-M3 と 1.0-RC1 の両方で試しました。これの回避策はありますか?

jhat を使用した上位 5 つのオブジェクト作成ヒストグラム -

Class   Instance Count  Total Size
class [C    1376284 2068640582
class software.amazon.awssdk.services.sqs.model.Message 332718  18632208
class java.lang.String  1375675 16508100
class [Lakka.dispatch.forkjoin.ForkJoinTask;    227 14880304
class scala.collection.immutable.$colon$colon   334396  5350336

また、ここで同様の問題を見つけました- https://github.com/akka/alpakka/issues/1588

これを解決するために利用できる代替手段があるかどうか疑問に思っています。

4

2 に答える 2