4

Akka SyncWriteJournal現在、 API との接続を実装するための独自のプラグインを作成中HSQLDBです。

問題は、メソッドの要件を理解していないことですdoAsyncReplayMessages。future を返す必要があり、すべてのメッセージは によって呼び出される必要があると述べていますreplayCallback

メッセージのリストを返すクエリがあるとしましょう: List<Message> messages. replayCallbackを使用する方法と、Futureそのリストを使用してメソッドを正しく実装する方法の最小限の例 (説明付き) を誰かが提供できますか? どのようreplayCallbackFuture連携し、メソッドによって何を返す必要がありdoAsyncReplayMessagesますか?

ありがとう!

-編集-

いくつかのコメントの助けを借りて、完全ではないが提案されたアイデアを組み込んだ実装を作成しました。

public Future<Void> doAsyncReplayMessages(final String persistenceId, long fromSequenceNr, long toSequenceNr, long max,
        final Procedure<PersistentRepr> replayCallback) {
    final ExecutionContext ec = context().system().dispatcher();

    final Future<Void> future = Futures.future(new Callable<Void>() {
        @Override
        public Void call() throws Exception {
            final List<Message> messages = getMessages();
            for (int i = 0; i < feedbackList.size(); i++) {
                replayCallback.apply(
                        new PersistentImpl(messages.get(i), i, persistenceId, false, null, null));
            }
            return null;
        }
    }, ec);

    return future;
}

お気づきかもしれませんが、私がまだ見逃しているいくつかの重要な概念が見落とされています。PersistentImpl には、静止している 1 つの引数が必要Seq<String> confirmですnull。そしておそらくもっと重要なことはnull、将来は戻り値の型として期待Voidされているため、私は戻ります。それをどのように実装するかわかりません。現在、NPE をスローします。

[ERROR] [08/28/2014 12:31:19.582] [akkaSystem-akka.actor.default-dispatcher-7] [akka://akkaSystem/system/journal] null
java.lang.NullPointerException
    at akka.persistence.journal.japi.AsyncRecovery.asyncReadHighestSequenceNr(AsyncRecovery.scala:26)
    at akka.persistence.journal.SyncWriteJournal$$anonfun$receive$1.applyOrElse(SyncWriteJournal.scala:53)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at akka.persistence.journal.japi.SyncWriteJournal.aroundReceive(SyncWriteJournal.scala:16)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
    at akka.dispatch.Mailbox.run(Mailbox.scala:220)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:724)
4

1 に答える 1

2

次のように、ブロック操作を Future でラップするだけですFuture { fetchStuff() }

同期ジャーナルの完全な実装については、dnvriend/akka-persistence-jdbc: JdbcSyncWriteJournalを参照してください。

于 2014-08-26T12:12:16.273 に答える