0

Scala 2.11.8 で akka 2.4.7 を使用しています。

イベントは extractShardId と extractEntityId に送信されますが、アクターの receive メソッドには伝達されません。それが何であるかについてのアイデアはありますか?

https://bitbucket.org/kuzelac/apt-billing/overviewブランチの価格-シャード

クラスタ シャードを定義しました。

val dailyPriceAggregateActor: ActorRef = ClusterSharding(context.system).start(
typeName = "DailyPriceAggregateActor",
entityProps = DailyPriceAggregateActor(),
settings = ClusterShardingSettings(context.system),
extractEntityId = DailyPriceAggregateActor.extractEntityId,
extractShardId = DailyPriceAggregateActor.extractShardId)

アクターオブジェクトは

object DailyPriceAggregateActor {
  def apply() = Props(classOf[DailyPriceAggregateActor])

val extractEntityId: ShardRegion.ExtractEntityId = {
  case e@DailyPriceSaved(userId, unitId, _, _, _) => (s"$userId$unitId", e)
  case e@LookupPriceForDay(userId, unitId, _) => (s"$userId$unitId", e)
}

val extractShardId: ShardRegion.ExtractShardId = {
  case _ => "one"
}
}

confをに設定しました

akka.actor.provider = "akka.cluster.ClusterActorRefProvider"

アクターは永続クエリによってシードされています

  def startSync(actor: ActorRef) = {
    val queries = PersistenceQuery(context.system).readJournalFor[ScalaDslMongoReadJournal](MongoReadJournal.Identifier)

    val src: Source[EventEnvelope, NotUsed] =
  queries.eventsByPersistenceId(PriceAggregateActor.persistenceId, 0L, Long.MaxValue)

    src.runForeach(actor ! _.event)
  }
4

1 に答える 1