0

私は Akka Streams Kafka アプリケーションを実行しています。ブローカが停止し、ストリーム コンシューマが停止タイムアウト後に停止した場合、スーパーバイザがコンシューマを再起動できるように、ストリーム コンシューマに監視戦略を組み込みたいと考えています。

ここに私の完全なコードがあります:

UserEventStream:

import akka.actor.{Actor, PoisonPill, Props}
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.kafka.scaladsl.Consumer
import akka.stream.scaladsl.Sink
import akka.util.Timeout
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}

import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}
import akka.pattern.ask
import akka.stream.ActorMaterializer

class UserEventStream extends Actor {

  val settings = Settings(context.system).KafkaConsumers
  implicit val timeout: Timeout = Timeout(10 seconds)
  implicit val materializer = ActorMaterializer()

  override def preStart(): Unit = {
    super.preStart()
    println("Starting UserEventStream....s")
  }
  override def receive = {
    case "start" =>
      val consumerConfig = settings.KafkaConsumerInfo
      println(s"ConsumerConfig with $consumerConfig")
      startStreamConsumer(consumerConfig("UserEventMessage" + ".c" + 1))
  }

  def startStreamConsumer(config: Map[String, String]) = {
    println(s"startStreamConsumer with config $config")

    val consumerSource = createConsumerSource(config)
    val consumerSink = createConsumerSink()
    val messageProcessor = context.actorOf(Props[MessageProcessor], "messageprocessor")

    println("START: The UserEventStream processing")
    val future =
      consumerSource
        .mapAsync(parallelism = 50) { message =>
          val m = s"${message.record.value()}"
          messageProcessor ? m
        }
        .runWith(consumerSink)
    future.onComplete {
      case Failure(ex) =>
        println("FAILURE : The UserEventStream processing, stopping the actor.")
        self ! PoisonPill
      case Success(ex) =>
    }
  }

  def createConsumerSource(config: Map[String, String]) = {
    val kafkaMBAddress = config("bootstrap-servers")
    val groupID = config("groupId")
    val topicSubscription = config("subscription-topic").split(',').toList
    println(s"Subscriptiontopics $topicSubscription")

    val consumerSettings = ConsumerSettings(context.system, new ByteArrayDeserializer, new StringDeserializer)
      .withBootstrapServers(kafkaMBAddress)
      .withGroupId(groupID)
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
      .withProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")

    Consumer.committableSource(consumerSettings, Subscriptions.topics(topicSubscription: _*))
  }

  def createConsumerSink() = {
    Sink.foreach(println)
  }
}  

StreamProcessorSupervisor(これはクラスのスーパーバイザー クラスですUserEventStream):

import akka.actor.{Actor, Props}
import akka.pattern.{Backoff, BackoffSupervisor}
import akka.stream.ActorMaterializer
import stream.StreamProcessorSupervisor.StartClient
import scala.concurrent.duration._

object StreamProcessorSupervisor {
  final case object StartSimulator
  final case class StartClient(id: String)
  def props(implicit materializer: ActorMaterializer) =
    Props(classOf[StreamProcessorSupervisor], materializer)
}

class StreamProcessorSupervisor(implicit materializer: ActorMaterializer) extends Actor {
  override def preStart(): Unit = {
    self ! StartClient(self.path.name)
  }

  def receive: Receive = {
    case StartClient(id) =>
      println(s"startCLient with id $id")
      val childProps = Props(classOf[UserEventStream])
      val supervisor = BackoffSupervisor.props(
        Backoff.onFailure(
          childProps,
          childName = "usereventstream",
          minBackoff = 1.second,
          maxBackoff = 1.minutes,
          randomFactor = 0.2
        )
      )
      context.actorOf(supervisor, name = s"$id-backoff-supervisor")
      val userEventStrean = context.actorOf(Props(classOf[UserEventStream]),"usereventstream")
      userEventStrean ! "start"
  }
}

App(メイン アプリケーション クラス):

import akka.actor.{ActorSystem, Props}
import akka.stream.ActorMaterializer

object App extends App {

  implicit val system = ActorSystem("stream-test")
  implicit val materializer = ActorMaterializer()

  system.actorOf(StreamProcessorSupervisor.props,"StreamProcessorSupervisor")
}

application.conf:

kafka {

  consumer {

    num-consumers = "1"
    c1 {
      bootstrap-servers = "localhost:9092"
      bootstrap-servers = ${?KAFKA_CONSUMER_ENDPOINT1}
      groupId = "localakkagroup1"
      subscription-topic = "test"
      subscription-topic = ${?SUBSCRIPTION_TOPIC1}
      message-type = "UserEventMessage"
      poll-interval = 50ms
      poll-timeout = 50ms
      stop-timeout = 30s
      close-timeout = 20s
      commit-timeout = 15s
      wakeup-timeout = 10s
      max-wakeups = 10
      use-dispatcher = "akka.kafka.default-dispatcher"
      kafka-clients {
        enable.auto.commit = true
      }
    }
  }
}

アプリケーションを実行した後、意図的に Kafka ブローカーを強制終了しましたが、30 秒後に攻撃者がポイズン ピルを送信して自身を停止していることに気付きました。BackoffSupervisorしかし不思議なことに作戦通りに再起動しない。

ここで何が問題になる可能性がありますか?

4

1 に答える 1