0

ConsistentHashingRoutingLogicAkka を使用して、同じキーを持つメッセージが同じアクターにルーティングされることを保証しようとしています。同じキーを持つメッセージが FIFO 順で処理されることが重要です。異なるキーを持つメッセージを異なるアクタにルーティングし、自由に並列処理できます。私は分散モードで Akka を使用していません。

メッセージは実際には RabbitMQ ブローカーから読み取られる JSON メッセージであるため、マスター アクターは AMQP メッセージを受信し、ルーティング キーをメッセージ キーとして使用します。同じキーがメッセージ自体にもあります。アクターは Spring アプリケーションの一部です。

私のマスター アクターは次のようになります。

@Named("MessageHandlerMaster")
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class MessageHandlerMaster extends UntypedActor {

  private static final Logger log = LoggerFactory.getLogger(MessageHandlerMaster.class);

  private Router router;

  @Autowired
  public MessageHandlerMaster(final SpringProps springProps) {

  List<Routee> routees = Stream.generate(() -> {
      ActorRef worker = getContext().actorOf(springProps.create(MessageHandlerWorker.class));
      getContext().watch(worker);
      return new ActorRefRoutee(worker);
    }).limit(5) //todo: configurable number of workers
      .collect(Collectors.toList());

    router = new Router(new ConsistentHashingRoutingLogic(getContext().system()), routees);
  }

  public void onReceive(Object message) {
    if (message instanceof Message) {
      Message amqpMessage = (Message) message;
      String encoding = getMessageEncoding(amqpMessage);
      try {
        String json = new String(amqpMessage.getBody(), encoding);
        String routingKey = amqpMessage.getMessageProperties().getReceivedRoutingKey();
        log.debug("Routing message based on routing key " + routingKey);
        router.route(new ConsistentHashingRouter.ConsistentHashableEnvelope(json, routingKey), getSender());
      } catch (UnsupportedEncodingException e) {
        log.warn("Unknown content encoding sent in message! {}", encoding);
      }
    } else if (message instanceof Terminated) {
      //if one of the routee's died, remove it and replace it
      log.debug("Actor routee terminated!");
      router.removeRoutee(((Terminated) message).actor());
      ActorRef r = getContext().actorOf(Props.create(MessageHandlerWorker.class));
      getContext().watch(r);
      router = router.addRoutee(new ActorRefRoutee(r));
    }
  }

  private static String getMessageEncoding(Message message) {
    String encoding = message.getMessageProperties().getContentEncoding();
    if ((encoding == null) || (encoding.equals(""))) {
      encoding = "UTF-8";
    }
    return encoding;
  }
}

私は最初にマスターを一度取得しています:

this.master = actorSystem.actorOf(springProps.create(MessageHandlerMaster.class), "master");

次に、次の方法でメッセージを送信します。

master.tell(message, ActorRef.noSender());

しかし、ワーカーのログを印刷するonReceive()と、同じキーに対して異なるディスパッチャ スレッドが使用されていることがわかります。

また、マスター アクターとワーカー アクターに同じディスパッチャー スレッドが使用されることがある理由も明確ではありません。これはスレッド間の非同期メッセージ パッシングではないでしょうか。

16:45:13.359 [aggregator-akka.actor.default-dispatcher-9] DEBUG c.u.o.a.actors.MessageHandlerMaster - Routing message based on routing key 10420186
16:45:13.359 [aggregator-akka.actor.default-dispatcher-9] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186
16:45:13.360 [aggregator-akka.actor.default-dispatcher-9] DEBUG c.u.o.a.actors.MessageHandlerMaster - Routing message based on routing key 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-9] DEBUG c.u.o.a.actors.MessageHandlerMaster - Routing message based on routing key 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-9] DEBUG c.u.o.a.actors.MessageHandlerMaster - Routing message based on routing key 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-9] DEBUG c.u.o.a.actors.MessageHandlerMaster - Routing message based on routing key 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-9] DEBUG c.u.o.a.actors.MessageHandlerMaster - Routing message based on routing key 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-9] DEBUG c.u.o.a.actors.MessageHandlerMaster - Routing message based on routing key 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerMaster - Routing message based on routing key 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186

ここでわかるように、キー 10420186 を持つワーカー処理メッセージのディスパッチャー スレッドは、9 の場合もあれば 10 の場合もありました。マスター アクターもこれら 2 つのスレッドを使用する場合がありました。

ConsistentHashingRoutingLogicが実際に機能していて、同じスレッドが同じキーでメッセージを処理していることを確認するにはどうすればよいですか? ルーターの初期化で何か間違ったことをしていますか?

4

1 に答える 1

0

だから@vrudkovskは彼のコメントに正しい. スレッドとアクターの間で混乱していると思います。アクターは、アドレスとメールボックスを持つメモリ内の単なるオブジェクトです。ディスパッチャは基本的に、アクターでアクションを実行するスレッド プールです。アクションの例は次のとおりです。

  • アクターで処理するためにメールボックスからメッセージをデキューする
  • メッセージをメールボックスにエンキューします。

異なるスレッドが同じアクターに対してアクションを実行できます。それは派遣会社が決めることです。Akka は、アクター内で一度に 1 つのスレッドだけがメッセージを処理することを保証します。これは、常に同じスレッドになるという意味ではありません。

同じ. _ context.self.path_ context.self.path.addressActorSystem

于 2016-10-20T16:46:50.493 に答える