トピック
Akka HTTP 経由で Akka Actor と対話したいと思います。アイデアは、HTTP クライアントが Akka HTTP サーバー メソッドを呼び出し、Akka アクターへの要求を処理するシステムを持つことです。アクターはメッセージを処理し、HTTP クライアントに応答する呼び出し元 (Akka HTTP) に応答します。上記のようにできましたが、実装がブロックされているように見えるため、正しく実行していないと思います。
より適切に説明します。多くの同時 HTTP リクエストを作成すると、Akka HTTP が「キューを作成」し、アクターがリクエストを処理するのを待ってから、次のように送信することがわかります。
代わりに取得したいのは、Akka HTTP サーバーが HTTP クライアントからのリクエストをターゲットの akka アクターに即座に転送することです。アクターが精緻化を終了するのを待つ必要はありません。 アクターのメールボックス容量パラメーターを使用して、メッセージ キューの大きさを判断し、メッセージが多すぎる場合はメッセージを拒否したいと考えています。
したがって、Akka HTTP でアクターの応答を非同期に待機させる方法が必要になります。
メールボックスの容量が正しく機能していることはわかっています。なぜなら、単純なactor2.tell("Prova1", system.deadLetters()) (テスト用)を使用してアクターに多くのリクエストを行うと、メールボックスのサイズを超えるリクエストが正しく処理されるからです。拒否されました。
参考文献
システムをテストするために、akka のドキュメントで提供されている最小限の例に従って、簡単な構成を作成しました。akka http の場合: https://doc.akka.io/docs/akka-http/current/routing-dsl/index.html#minimal-example
そして私のアクターを作成するための以下: https://doc.akka.io/docs/akka/current/actors.html#creating-actors
私のコード
私が最初にしたことは、次のように 1 つのアクター (actor1) が akka HTTP を構成するシステムを作成することでした。
public class TestActor {
private static ActorSystem system;
public static void main(String[] args) throws InterruptedException
{
String httpBindAddress = "0.0.0.0";
int httpPort = 8086;
system = ActorSystem.create("deupnp");
ActorMaterializer materializer = ActorMaterializer.create(system);
Http http = Http.get(system);
AllDirectives app = new AllDirectives() {
};
Route routeActor = app.get(() ->
app.pathPrefix("mysuburl", () ->
app.pathPrefix(akka.http.javadsl.unmarshalling.StringUnmarshallers.STRING, actor ->
app.path(akka.http.javadsl.unmarshalling.StringUnmarshallers.STRING, message ->
app.onSuccess(() ->
CompletableFuture.supplyAsync(() -> actorFunctionCall(actor, message)), response ->
app.complete(StatusCodes.get(200), response))))));
Flow<HttpRequest, HttpResponse, NotUsed> routeFlow = app.route(routeActor).flow(system, materializer);
CompletionStage<ServerBinding> binding = http.bindAndHandle(routeFlow, ConnectHttp.toHost(httpBindAddress, httpPort), materializer);
// create system with one actor
ActorRef actor1 = system.actorOf(Props.create(ActorTest.class,"actor1").withMailbox("my-mailbox"),"actor1");
}
private static String actorFunctionCall(String actor, String message)
{
try {
Inbox inbox = Inbox.create(system);
system.actorSelection("user/"+actor).tell(message, inbox.getRef());
String response = (String) inbox.receive(Duration.create(20000, TimeUnit.SECONDS));
return response;
} catch (Exception e) {
//return new ResponseMessage(204,"Error");
e.printStackTrace();
return null;
}
}
}
私の ActorTest は次のとおりです。
public class ActorTest extends AbstractActor {
private String myName = "";
public ActorTest(String nome){
this.myName = nome;
}
@Override
public void preStart()
{
}
@Override
public Receive createReceive() {
return receiveBuilder()
.match(String.class,
message -> {
Thread.sleep(5000l);
System.out.println(this.getClass().getName() + " >> " + myName + " >> " + message);
})
.matchAny(mex->{
System.out.println("Error");
})
.build();
}
}
私のapplication.confは非常に単純です:
akka
{
stdout-loglevel = "DEBUG"
loglevel = "DEBUG"
actor {
default-dispatcher {
throughput = 10
}
}
}
my-mailbox {
mailbox-type = "akka.dispatch.NonBlockingBoundedMailbox"
mailbox-capacity = 1
}
予想された結果
ご覧のとおり、mailbox-capacity = 1の場合、複数の同時要求を行った場合、1 つのみが処理され、残りは破棄されることが予想されます。
http://127.0.0.1/mysuburl/actor1/my_msgで HTTP リクエストを受信するために Akka HTTP ルーティングを使用してから、 Inboxを使用してメッセージを送信するため、上記のコードは取得したいものに対して正しくないと思います応答を待ちます。
私の質問は、Akka HTTP リクエストを Akka Actor アクター 1 に非同期でリンクする正しい方法はどれですか?
詳細が必要な場合はお知らせください。
ノート
次の記事も読みました: https://doc.akka.io/docs/akka-http/current/handling-blocking-operations-in-akka-http-routes.html
これは、複数のブロッキング要求に対処するために有限数のスレッドを作成する方法を説明していますが、これは私のコードの影響を「軽減」するだけであり、ブロッキングではなく、ブロッキングではない方法で記述する必要があると思います.