Akka
SyncWriteJournal
現在、 API との接続を実装するための独自のプラグインを作成中HSQLDB
です。
問題は、メソッドの要件を理解していないことですdoAsyncReplayMessages
。future を返す必要があり、すべてのメッセージは によって呼び出される必要があると述べていますreplayCallback
。
メッセージのリストを返すクエリがあるとしましょう: List<Message> messages
. replayCallback
を使用する方法と、Future
そのリストを使用してメソッドを正しく実装する方法の最小限の例 (説明付き) を誰かが提供できますか? どのようreplayCallback
にFuture
連携し、メソッドによって何を返す必要がありdoAsyncReplayMessages
ますか?
ありがとう!
-編集-
いくつかのコメントの助けを借りて、完全ではないが提案されたアイデアを組み込んだ実装を作成しました。
public Future<Void> doAsyncReplayMessages(final String persistenceId, long fromSequenceNr, long toSequenceNr, long max,
final Procedure<PersistentRepr> replayCallback) {
final ExecutionContext ec = context().system().dispatcher();
final Future<Void> future = Futures.future(new Callable<Void>() {
@Override
public Void call() throws Exception {
final List<Message> messages = getMessages();
for (int i = 0; i < feedbackList.size(); i++) {
replayCallback.apply(
new PersistentImpl(messages.get(i), i, persistenceId, false, null, null));
}
return null;
}
}, ec);
return future;
}
お気づきかもしれませんが、私がまだ見逃しているいくつかの重要な概念が見落とされています。PersistentImpl には、静止している 1 つの引数が必要Seq<String> confirm
ですnull
。そしておそらくもっと重要なことはnull
、将来は戻り値の型として期待Void
されているため、私は戻ります。それをどのように実装するかわかりません。現在、NPE をスローします。
[ERROR] [08/28/2014 12:31:19.582] [akkaSystem-akka.actor.default-dispatcher-7] [akka://akkaSystem/system/journal] null
java.lang.NullPointerException
at akka.persistence.journal.japi.AsyncRecovery.asyncReadHighestSequenceNr(AsyncRecovery.scala:26)
at akka.persistence.journal.SyncWriteJournal$$anonfun$receive$1.applyOrElse(SyncWriteJournal.scala:53)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at akka.persistence.journal.japi.SyncWriteJournal.aroundReceive(SyncWriteJournal.scala:16)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:724)