I have a driver/main class in a file like the one below (essentially, I am trying to combine Storm and Akka). In the TenderEventSpout2 class, I am trying to send messages to an actor and receive its replies.
public class TenderEventSpout2 extends BaseRichSpout {
    ActorSystemHandle actorSystemHandle;
    ActorSystem _system;
    ActorRef eventSpoutActor;
    Future<Object> future;
    Timeout timeout;
    String result;

    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        //String[] message = {"WATCH_DIR"};
        timeout = new Timeout(Duration.create(60, "seconds"));
        List<Object> messageList = new ArrayList<Object>();
        messageList.add("WATCH_DIR");
        messageList.add(this.inputDirName);
        actorSystemHandle = new ActorSystemHandle();
        _system = actorSystemHandle.getActorSystem();
        eventSpoutActor = _system.actorOf(Props.create(EventSpoutActor.class));
        future = Patterns.ask(eventSpoutActor, messageList, timeout);
    }

    @Override
    public void nextTuple() {
        String result = null;
        try {
            result = (String) Await.result(future, timeout.duration());
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }
My actor is:
public class EventSpoutActor extends UntypedActor {
    public ConcurrentLinkedQueue<String> eventQueue = new ConcurrentLinkedQueue<>();
    Inbox inbox;

    @Override
    public void onReceive(Object message) { // throws IOException {
        if (message instanceof List<?>) {
            System.out.println(((List<Object>) message).get(0) + "*******************");
            if (((List<Object>) message).get(0).equals("WATCH_DIR")) {
                final List<Object> msg = (List<Object>) message;
                Thread fileWatcher = new Thread(new Runnable() {
                    @Override
                    public void run() {
                        System.out.println(msg.get(1) + "*******************");
                        try {
                            String result = "Hello";
                            System.out.println("Before Sending Message *******************");
                            getSender().tell(result, getSelf());
                        }
                        catch (Exception e) {
                            getSender().tell(new akka.actor.Status.Failure(e), getSelf());
                            throw e;
                        }
                    }
                });
                fileWatcher.setDaemon(true);
                fileWatcher.start();
                System.out.println("Started file watcher");
            }
        }
        else {
            System.out.println("Unhandled !!");
            unhandled(message);
        }
    }
}
I can send a message to EventSpoutActor, but I cannot receive its reply. Why? The console shows the following message:
[EventProcessorActorSystem-akka.actor.default-dispatcher-3]
[akka://EventProcessorActorSystem/deadLetters] Message [java.lang.String]
from Actor[akka://EventProcessorActorSystem/user/$a#-1284357486] to
Actor[akka://EventProcessorActorSystem/deadLetters] was not delivered. [1]
dead letters encountered.
This logging can be turned off or adjusted with configuration settings
'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
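
The dead-letter log above is consistent with the reply being sent after onReceive has already returned: in Akka, getSender() is only meaningful while the actor is processing the current message, and the fileWatcher thread calls it from outside that window. Below is a minimal, stdlib-only sketch of that mechanism. It is not Akka code: ToyActor, replyTarget, and the "spout" / "deadLetters" strings are hypothetical stand-ins, and the latch deliberately forces an ordering that would otherwise be a race.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

class SenderCaptureDemo {
    static final String DEAD_LETTERS = "deadLetters";

    /** Toy stand-in for an actor (hypothetical, not an Akka class). */
    static class ToyActor {
        // Set only while a message is being handled, null otherwise.
        private volatile String currentSender;

        void beginHandling(String sender) { currentSender = sender; }

        void endHandling() { currentSender = null; }

        /** Falls back to DEAD_LETTERS when no message is being handled. */
        String getSender() {
            String s = currentSender;
            return (s == null) ? DEAD_LETTERS : s;
        }
    }

    /**
     * Simulates handling one message from "spout" and replying from a
     * background thread. Returns the address the reply would be sent to.
     */
    static String replyTarget(boolean captureSender) {
        ToyActor actor = new ToyActor();
        AtomicReference<String> repliedTo = new AtomicReference<>();
        CountDownLatch handlerReturned = new CountDownLatch(1);

        actor.beginHandling("spout");
        // Read the sender on the handling thread, while it is still valid.
        final String captured = actor.getSender();

        Thread worker = new Thread(() -> {
            try {
                // Force the reply to happen after the handler has returned.
                handlerReturned.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            // Either use the captured value or look the sender up too late.
            repliedTo.set(captureSender ? captured : actor.getSender());
        });
        worker.setDaemon(true);
        worker.start();

        actor.endHandling(); // the handler returns here
        handlerReturned.countDown();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return repliedTo.get();
    }

    public static void main(String[] args) {
        System.out.println(replyTarget(false)); // prints "deadLetters"
        System.out.println(replyTarget(true));  // prints "spout"
    }
}
```

If that reading is right, the corresponding change in EventSpoutActor would be to store getSender() (and getSelf()) in final local variables inside onReceive before creating the thread, and to call tell on those variables from run().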