2

Akka Cluster を使用して冗長性を実現し、Java アプリケーションの単一障害点を取り除きます。ただし、あるマシンから別のマシンにメッセージを送信するときに、メッセージのシリアル化に問題があります。

を実装していないJava オブジェクトを送信していますSerializable。そのため、独自のシリアライザーを使用する必要があり、幸運にもオブジェクトは Kryo を使用したシリアライゼーション メソッドを提供します。したがって、このガイドに従って、拡張する独自の akka シリアライザーを作成しましたJSerializer

import akka.serialization.JSerializer;

public class MyObjectClusterSerializer extends JSerializer {

    public int identifier() {
        return 66666;
    }

    public boolean includeManifest() {
        return false;
    }

    public byte[] toBinary(Object obj) {
        MyObject myObject = (MyObject) obj;
        byte[] bytes = null;
        try {
            bytes = MyObjectHelper.serialize(myObject);
        } catch (SerializationException e) {

        }
        return bytes;
    }

    @Override
    public Object fromBinaryJava(byte[] arg0, Class<?> arg1) {
        MyObject myObject = null;
        try {
            myObject = MyObjectHelper.deserialize(arg0);
        } catch (DeserializationException e) {
        }
        return myObject;
    }

}

コードは非常に簡単で、それが問題だとは思いません。次に、シリアライザーとシリアライザー バインディングを構成に追加します。

serializers {
  myobject = "com.my.awesome.project.MyObjectClusterSerializer"
}

serialization-bindings {
  "com.my.awesome.project.MyObject" = myobject
}

単一ノードを実行している場合、問題はありません (明らかに、何もシリアライズしていないためです...)。ただし、クラスターモードでは、次の例外が発生し続けます。

    2014-08-04 01:45:27,498 ERROR [MyApp-akka.actor.default-dispatcher-16] - Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$1.apply$mcV$sp (Slf4jLogger.scala:66) - Transient association error (association remains live)
    java.io.NotSerializableException: com.my.awesome.project.MyObject
            at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183) ~[?:1.7.0_60]
            at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) ~[?:1.7.0_60]
            at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) ~[?:1.7.0_60]
            at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) ~[?:1.7.0_60]
            at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) ~[?:1.7.0_60]
            at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) ~[?:1.7.0_60]
            at akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply$mcV$sp(Serializer.scala:129) ~[akka-actor_2.11-2.3.4.jar:?]
            at akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129) ~[akka-actor_2.11-2.3.4.jar:?]
            at akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:129) ~[akka-actor_2.11-2.3.4.jar:?]
            at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) ~[scala-library-2.11.1.jar:?]
            at akka.serialization.JavaSerializer.toBinary(Serializer.scala:129) ~[akka-actor_2.11-2.3.4.jar:?]
            at akka.remote.serialization.MessageContainerSerializer.serializeSelection(MessageContainerSerializer.scala:36) ~[akka-remote_2.11-2.3.4.jar:?]
            at akka.remote.serialization.MessageContainerSerializer.toBinary(MessageContainerSerializer.scala:25) ~[akka-remote_2.11-2.3.4.jar:?]
            at akka.remote.MessageSerializer$.serialize(MessageSerializer.scala:36) ~[akka-remote_2.11-2.3.4.jar:?]
            at akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:845) ~[akka-remote_2.11-2.3.4.jar:?]
            at akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:845) ~[akka-remote_2.11-2.3.4.jar:?]
            at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) ~[scala-library-2.11.1.jar:?]
            at akka.remote.EndpointWriter.serializeMessage(Endpoint.scala:844) ~[akka-remote_2.11-2.3.4.jar:?]
            at akka.remote.EndpointWriter.writeSend(Endpoint.scala:747) ~[akka-remote_2.11-2.3.4.jar:?]
            at akka.remote.EndpointWriter$$anonfun$4.applyOrElse(Endpoint.scala:722) ~[akka-remote_2.11-2.3.4.jar:?]
            at akka.actor.Actor$class.aroundReceive(Actor.scala:465) ~[akka-actor_2.11-2.3.4.jar:?]
            at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:415) ~[akka-remote_2.11-2.3.4.jar:?]
            at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) [akka-actor_2.11-2.3.4.jar:?]
            at akka.actor.ActorCell.invoke(ActorCell.scala:487) [akka-actor_2.11-2.3.4.jar:?]
            at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) [akka-actor_2.11-2.3.4.jar:?]
            at akka.dispatch.Mailbox.run(Mailbox.scala:220) [akka-actor_2.11-2.3.4.jar:?]
            at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393) [akka-actor_2.11-2.3.4.jar:?]
            at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [scala-library-2.11.1.jar:?]
            at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [scala-library-2.11.1.jar:?]
            at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [scala-library-2.11.1.jar:?]
            at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [scala-library-2.11.1.jar:?]

そして、私にとっては、どのシリアライザーを使用するかを理解できず、実際には汎用の Java シリアライザーを使用していることを意味します。また、関連しているかどうかはわかりませんが、メッセージを ConsistentHashableEnvelope でラップしています。これは、メッセージをクラスター対応のコンシステント ハッシュ グループに送信しているためです。

このことを機能させるための助けはありますか?何か不足していますか?

4

1 に答える 1

3

Akka は、メッセージ クラスごとにシリアライザーを 1 回選択します (スタック トレースを参照)。あなたの場合、ベースメッセージには ConsistentHashableEnvelope タイプがあり、このシリアライザーをすべてのフィールド ( messageおよびhashKey ) に適用するため、akka はJavaSerializerを正しく使用します。

タスクを解決するには、ConsistentHashableEnvelopeのシリアライザーを作成し、メッセージフィールドにMyObjectタイプがある場合にのみMyObjectClusterSerializerを使用し、それ以外の場合はJavaSerializerを使用します

于 2014-12-03T10:51:29.050 に答える