0

このようなファイルにドライバー/メインクラスがあります。(基本的にはSTORMとAKKAをミックスしようとしています)。TenderEventSpout2 クラスで、アクターとの間でメッセージを送受信しようとしています。

public class TenderEventSpout2 extends BaseRichSpout {  
        ActorSystemHandle actorSystemHandle;
        ActorSystem _system;
        ActorRef eventSpoutActor;
        Future<Object> future;
        Timeout timeout;
        String result;

    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        //String[] message = {"WATCH_DIR"};
        timeout = new Timeout(Duration.create(60, "seconds"));
        List<Object> messageList = new ArrayList<Object>(); 

        messageList.add("WATCH_DIR");

        messageList.add(this.inputDirName);

        actorSystemHandle = new ActorSystemHandle();
        _system = actorSystemHandle.getActorSystem();
        eventSpoutActor = _system.actorOf(Props.create(EventSpoutActor.class));


        future = Patterns.ask(eventSpoutActor, messageList, timeout);

    }

    @Override
    public void nextTuple() {
        String result = null;
        try{
            result = (String) Await.result(future, timeout.duration());
        }
        catch(Exception e){
            e.printStackTrace();
        }
}

私のアクターは:

public class EventSpoutActor extends UntypedActor {
public ConcurrentLinkedQueue<String> eventQueue = new ConcurrentLinkedQueue<>();
Inbox inbox;
@Override 
public void onReceive(Object message){// throws IOException {
    if (message instanceof List<?>) {
        System.out.println(((List<Object>)message).get(0)+"*******************");
        if(((List<Object>)message).get(0).equals("WATCH_DIR")){
            final List<Object> msg = (List<Object>)message;
            Thread fileWatcher = new Thread(new Runnable(){

                @Override
                public void run() {

                        System.out.println(msg.get(1)+"*******************");
                        try {

                            String result = "Hello";
                            System.out.println("Before Sending Message *******************");
                            getSender().tell(result, getSelf());
                            } 
                        catch (Exception e) {
                            getSender().tell(new akka.actor.Status.Failure(e), getSelf());
                            throw e;
                            }

                }
            });
            fileWatcher.setDaemon(true);
            fileWatcher.start();
            System.out.println("Started file watcher");
        }
    } 
    else{
        System.out.println("Unhandled !!");
        unhandled(message);
    }
}

}

EventSpoutActor にメッセージを送信できます。しかし、メッセージの受信に問題が発生しています。何故ですか??コンソールに次のメッセージが表示されます。

[EventProcessorActorSystem-akka.actor.default-dispatcher-3]
[akka://EventProcessorActorSystem/deadLetters] Message [java.lang.String]      
from Actor[akka://EventProcessorActorSystem/user/$a#-1284357486] to  
Actor[akka://EventProcessorActorSystem/deadLetters] was not delivered. [1] 
dead letters encountered.
This logging can be turned off or adjusted with configuration settings 
'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
4

1 に答える 1

0

それで、メッセージが配信されなかった理由がわかりました。

 getSender().tell(result, getSelf());

メッセージを送信者に送信するはずのこの行は、スレッド コード内で使用されると、そのコンテキスト データを失いました。

 Thread fileWatcher = new Thread(new Runnable(){

            @Override
            public void run() {

                    System.out.println(msg.get(1)+"*******************");
                    try {

                        String result = "Hello";
                        System.out.println("Before Sending Message *******************");
                        getSender().tell(result, getSelf());

「tell」コードをスレッドの外に移動すると、機能しました。

于 2016-05-23T12:00:21.200 に答える