問題タブ [akka-stream]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
scala - Akka Stream と HTTP Scala: ルートからアクターにメッセージを送信する方法
akka-stream-and-http-experimental
1.0で遊んでいます。これまでのところ、HTTP 要求を受け入れて応答できるユーザー サービスができました。また、アポイントメントを管理できるアポイントメントサービスも予定しています。予約するには、既存のユーザーである必要があります。予約サービスは、ユーザーが存在するかどうかをユーザー サービスに確認します。これは明らかに HTTP 経由で実行できますが、予約サービスがユーザー サービスにメッセージを送信するようにしたいと考えています。akka-http
これに慣れていないので、メッセージを送受信するためにアクターを (抽象化として) 使用する方法が明確ではありません。ドキュメントにはActorRef
andについての言及がありますが、前者と後者の例はありません。私のコードは次のようになり、Githubにあります。ActorPublisher
編集:メッセージを送信する方法を見つけました。これは、を使用して実行できますSource.actorRef
。これは、メッセージをストリームに送信するだけです。私がやりたいのは、ルート ハンドラー クラスが応答を受け取ることです。そうすれば、予約サービスを作成するときに、そのアクターはユーザー サービス アクターを呼び出して、例のユーザー ルート ハンドラーと同じ方法で応答を受け取ることができます。擬似コード:
val src = Source.single(name) \\ How to send this to an actor and get the response
編集2:@yardenaの回答に基づいて、次のことを思いつきましたが、最後の行はコンパイルされません。私のアクター パブリッシャーは、でラップされ、 としてルート ハンドラーに配信されるとFuture
推測している を返します。Promise
Future
scala - どうすれば Akka ストリームを継続的に具体化できますか?
Scala でAkka Streamsを使用して、 AWS Java SDKを使用してAWS SQSキューからポーリングしています。2 秒間隔でメッセージをデキューするActorPublisherを作成しました。
私のアプリケーションでは、同様に 2 秒間隔でフローを実行しようとしています。
ただし、アプリケーションを実行するjava.util.concurrent.TimeoutException: Futures timed out after [20000 milliseconds]
と、ActorMaterializer
.
Akka Stream を継続的にマテリアライズするための推奨されるアプローチはありますか?
scala - Akka Http が応答しない
レスポンシブな Akka Http アプリがあり、いくつかの変更を行った後、次のメッセージが表示されました。
それらが何を意味するのか理解できません。それらはドキュメントに記載されておらず、Google のヒットもありません。誰か洞察を共有できますか?
scala - Akka Stream Graph から具体化された結果を取得するには?
scala Akka Stream グラフから具体化された結果を取得する方法を理解しようとしています。
を使用して"com.typesafe.akka" %% "akka-stream-experimental" % "1.0"
います。
ドキュメントを見ましたが、例が見つかりませんでした。
だから、私はコードを持っているとしましょう
グラフから結果を取得したいのですが、g
Unit が返されます。それに対処する方法は?
ありがとうございました。
scala - PushPullStage から複数のオブジェクトを放出する
私は Akka-Streams で遊んでいてFlow
、独自の を実装してカスタムを作ろうとしていPushPullStage
ます。Flow
アップストリームから受信したオブジェクトをリストに蓄積し、アップストリームが完了したときにグループをダウンストリームに放出する前に、関数に従ってそれらをグループ化する必要があります。
実装するのは非常に簡単なことのように思えますが、その方法がわかりません! から複数のオブジェクトを放出する方法はないようですPushPullStage
。
これまでの私の実装は次のとおりです。
編集
圧力を考慮してコードを変更しましたが、現在はすべて機能しています。基本的には、ダウンストリームFlow
が意図したとおりに動作し、要素をプルし続ける必要がありました。
scala - 非同期ステージで Akka ストリームの真のプル ストリームを作成する方法
OAuth2 トークンを提供し、期限切れのトークンの更新も処理する Source を作成しようとしています。現在、私のコードは次のようになっています
このコードの出力は次のようになります
明らかに、この mapAsync(1) は予期しないときに需要を生み出しています (プリフェッチ?)
2 つの問題があります。
- 要求により、上流で不要なトークン要求が発生します
- トークンのプリフェッチ/キャッシュは、特定の時間のみ有効であるため問題があります。
では、この関数のように動作する真のプル ストリームを作成するにはどうすればよいでしょうか?
def tokenSource: () => Future[Token]
json - Akka HTTP: Json 形式の応答をドメイン オブジェクトにアンマーシャリングする方法
私は Akka HTTP を試しており、HttpResponse でドメイン オブジェクトの Json 配列を返すサービスを作成しました。クライアントでは、それをドメイン オブジェクトのソースに変換して、後続のフローとシンクで使用できるようにしたいと考えています。
Json サポートセクションを参照: http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/scala/http/common/json-support.html
暗黙的な RootJsonReader などを定義する必要はありましたが、FromEntityUnmarshaller を使用する方法がわかりません。
私のコードはここにあります: https://github.com/charlesxucheng/akka-http-microservice
akka-http-microservice activator テンプレートに基づいています。Service2.scala は私のサーバー実装であり、機能しています。AkkaHttpClient.scala はクライアントの実装であり、不完全です。
build.sbt が最新ではないため、ビルドには Gradle を使用してください。
ありがとう。
scala - Akka/Scala: この Akka Streams フローで何が起こっているのか説明できますか?
私は Akka ストリームに慣れていて、次のようにフィボナッチ パブリッシャー/サブスクライバーの例を作成しました。しかし、需要が最初にどのように生成され、それが加入者の要求戦略とどのような関係にあるのかはまだよくわかっていません。誰か説明してくれませんか?
フィボナッチ出版社:
フィボナッチ購読者:
フィボナッチ アプリ:
サンプル実行:質問: 4 の最初の需要はどこから来たのですか?