0

以下のコードのどこかにバグがあります (Akka で再帰を試みています)。アルゴリズムが停止し、システム モニターから強制終了しない限り、JVM でプロセス (Java アプリケーション) が永久に実行されます。それを修正するのは非常に簡単なハックであるべきだと思います。

これは、並列 Pi 近似に Akka を使用する方法の例です。以下は、Akka が再帰アクターでどのように機能するかを示す試みです。したがって、マスターは 2 つのワーカーを作成し、同じメッセージを送信してint値を減らします。彼らはそれを並行して行い、整数値が 0 でないかどうかをチェックします。そうであれば、結果の整数値 (0) をマスターに返すか、両方とも 2 つのワーカーを再度作成し、最近減分された値を送信します。このツリーの深さが 1 より大きい (整数値が 1 より大きい) 場合、ワーカーは結果を呼び出し元のワーカーに送信し、最後にマスターにのみ送信します。以下のように非常に簡単です (Decrement、NewIntValue、FinalIntValue は基本的に同じですが、わかりやすいように名前が異なります)。

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.actor.UntypedActorFactory;
import akka.routing.RoundRobinRouter;

public class StackOverFlow {

    public static void main(String[] args) {
        StackOverFlow rid = new StackOverFlow();
        rid.start(2);
    }

    public void start(final int workersNumber) {
        // create an Akka system
        ActorSystem system = ActorSystem.create("IntDec");
        // create the result listener, which will print the result and shutdown the system
        final ActorRef listener = system.actorOf(new Props(Listener.class), "listener");
        // create the master
        ActorRef master = system.actorOf(new Props(new UntypedActorFactory() {
            public UntypedActor create() {
                return new Master(workersNumber, listener);
            }
        }), "master");
        // start the computation
        master.tell(new Compute());
    }

    static class Compute {}

    static class Decrement {
        private final int intValue;
        public Decrement(int value) {
            this.intValue = value;
        }
        public int getValue() {
            return intValue;
        }
    }

    static class NewIntValue {
        private final int intValue;
        public NewIntValue(int value) {
            intValue = value;
        }
        public int getValue() {
            return intValue;
        }
    }

    static class FinalIntValue {
        private final int intValue;
        public FinalIntValue(int value) {
            intValue = value;
        }
        public int getValue() {
            return intValue;
        }
    }

    public static class Worker extends UntypedActor {

        private int resultsNumber = 0;
        private final int messagesNumber = 2;

        private final ActorRef workerRouter;

        public Worker(final int workersNumber) {

            workerRouter = getContext().actorOf(
                    new Props(new UntypedActorFactory() {
                        public UntypedActor create() {
                            return new Worker(workersNumber);
                        }
                    }).withRouter(
                        new RoundRobinRouter(workersNumber)
                    ), "workerRouter");

        }

        public void onReceive(Object message) {

            if (message instanceof Decrement) {
                // get and decrement the int value
                Decrement job = (Decrement) message;
                int intValue = job.getValue();
                System.out.println("\tWorker:Decrement " + intValue);
                intValue--;
                if (intValue == 0) {
                    // we are finished
                    getSender().tell(new NewIntValue(intValue), getSelf());
                    // stop this actor and all its supervised children
                    getContext().stop(getSelf());
                } else {
                    for (int i = 0; i < messagesNumber; i++) {
                        // notify a worker
                        workerRouter.tell(new Decrement(intValue), getSelf());
                    }
                }

            } else if (message instanceof NewIntValue) {

                NewIntValue newInt = (NewIntValue) message;
                int intValue = newInt.getValue();

                System.out.println("\tWorker:NewIntValue!!! " + intValue);

                resultsNumber++;
                if (resultsNumber == messagesNumber) {
                    // we are finished
                    getSender().tell(new NewIntValue(intValue), getSelf());
                    // stop this actor and all its supervised children
                    getContext().stop(getSelf());
                }

            } else unhandled(message);
        }

    }

    public static class Master extends UntypedActor {

        private int resultsNumber = 0;
        private final int messagesNumber = 2;

        private int intValue = 2;

        private final ActorRef listener;
        private final ActorRef workerRouter;

        public Master(final int workersNumber, ActorRef listener) {

            this.listener = listener;

            workerRouter = getContext().actorOf(
                    new Props(new UntypedActorFactory() {
                        public UntypedActor create() {
                            return new Worker(workersNumber);
                        }
                    }).withRouter(
                        new RoundRobinRouter(workersNumber)
                    ), "workerRouter");

        }

        public void onReceive(Object message) {

            if (message instanceof Compute) {

                System.out.println("\tMaster:Compute " + intValue);

                System.out.println(
                        "\n\tInitial integer value: " + intValue);

                for (int i = 0; i < messagesNumber; i++) {
                    workerRouter.tell(new Decrement(intValue), getSelf());
                }

            } else if (message instanceof NewIntValue) {

                NewIntValue newInt = (NewIntValue) message;
                intValue = newInt.getValue();

                System.out.println("\tMaster:NewIntValue " + intValue);

                resultsNumber++;
                if (resultsNumber == messagesNumber) {
                    // send the result to the listener
                    listener.tell(new FinalIntValue(intValue), getSelf());
                    // stop this actor and all its supervised children
                    getContext().stop(getSelf());
                }

            } else unhandled(message);

        }

    }

    public static class Listener extends UntypedActor {

        public void onReceive(Object message) {

            if (message instanceof FinalIntValue) {
                FinalIntValue finalInt = (FinalIntValue) message;
                System.out.println(
                        "\n\tFinal integer value: " + finalInt.getValue());
                getContext().system().shutdown();
            } else {
                unhandled(message);
            }

        }

    }

}
4

1 に答える 1

1
  1. クラスに追加private ActorRef sender;します。Worker
  2. メッセージsender = getSender();の先頭に追加します。Decrement
  3. クラスのメソッドに 変更getSender()します。senderNewIntValueWorker
于 2012-08-12T07:00:14.757 に答える