ActorPublisher
ソースであるストリームを作成しました。SupervisionStrategy
ストリームのマテリアライザーを設定しました:
public class TestStream {
static class MyActorPublisher extends AbstractActorPublisher<String> {
public MyActorPublisher() {
receive(ReceiveBuilder
.match(ActorPublisherMessage.Request.class, request -> publish())
.match(ActorPublisherMessage.Cancel.class, cancel -> {
context().stop(self());
})
.build()
);
}
public void publish() {
if (totalDemand() > 0) {
throw new RuntimeException(" test exc ");
}
}
@Override
public SupervisorStrategy supervisorStrategy() {
return new OneForOneStrategy(2, Duration.create(1, TimeUnit.MINUTES), param -> {
return SupervisorStrategy.restart();
});
}
}
public static void main(String[] args) {
final ActorSystem actorSystem = ActorSystem.create("actorSystem");
final ActorMaterializerSettings settings = ActorMaterializerSettings
.create(actorSystem)
.withInputBuffer(1, 1024)
.withSupervisionStrategy(JFunction.func((t) -> {
System.out.println("Supervisor found an error");
return Supervision.resume();
}));
final Materializer materializer = ActorMaterializer.create(settings, actorSystem);
final Source<String, ActorRef> benchmarkSource = Source.actorPublisher(
Props.create(MyActorPublisher.class)
);
benchmarkSource.map(item -> {
System.out.println(item);
return item + "1";
})
.to(Sink.foreach(item -> System.out.println(item)))
.run(materializer);
}}
ActorPublisher
ただし、例外がスローされている場合、作成された戦略は使用されません。でオーバーライドも試みましsupervisorStrategy()
たMyActorPublisher
。しかし、それは子役にのみ使用され、機能していないことを理解しました。