問題タブ [akka-kafka]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票する
1 に答える
416 参照

scala - 1 秒あたりのメッセージ数を調整できる Kafka プロデューサー

安定しているが調整可能な出力で Apache Kafka プロデューサーを作成する最良の方法は何ですか。

例:プロデューサーは、一定の 1000 メッセージ/秒をブローカーに送信する必要があります。ランタイム中、出力は 10 または 10000 メッセージ/秒に調整できる必要があります。

1 つの方法として、1 秒ごとに実行されるスケジューラを設定し、事前定義された量のメッセージをバッチで送信する方法があります。

追加:このプロデューサはパフォーマンス テスト フレームワークの一部である必要があるため、送信する必要があるメッセージの量は非常に多くなります。非常に高い負荷をどのように処理しますか? そのためにAkkaを使用することは有益でしょうか?

ターゲット言語は Scala ですが、どの言語のサンプル コードも大歓迎です。

0 投票する
0 に答える
353 参照

akka-stream - Akka Kafka Producer でのエラー処理

私は react-kafka-core 0.10.1 を使用しています (Kafka 0.9.x を対象としています)。コールバック関数からエラーが発生するたびに、Kafka プロデューサー アクターが停止しているようです。この動作をカスタマイズする方法はありますか? 私たちの使用例は、メッセージを回復して再送信しようとすることです。

}

0 投票する
1 に答える
1882 参照

scala - NoSuchMethodError: KafkaConsumer.subscribe

次の依存関係を使用します。

そして俳優があります:

次に、上記のアクターを開始しようとすると、次の例外が発生します。

次の問題が見つかりましたが、役に立ちません。この API の競合を解決するにはどうすればよいですか?

0 投票する
0 に答える
208 参照

scala - Chain Akka Streams Kafka with Akka-http by Source[Bytestring]

サービスに送信したい Kafka Reactive Streams コンシューマからバイト文字列としてファイルを受信して​​います。

Kafka Reactive Stream Consumer から Source[Bytestring, Any] を抽出して、Bytestring 全体をメモリにロードしてから akka-http リクエストを実行することなく、Kafka から Akka-http にストリームをチェーンできるようにする方法はありますか?

0 投票する
3 に答える
696 参照

scala - Akka Stream TCP + Akka Stream Kafka プロデューサーが停止せず、メッセージを発行せず、エラーも発生しない

次のストリームがあります。

しばらくは正常に動作し、Kafka トピックに取り込まれたメッセージを消費できます。しかし、ときどきランダムな間隔でメッセージが発行されなくなり、このコードはエラーをログに記録しません (printAndByeBye は渡されたメッセージを出力し、アクター システムを終了します)。フロー。

ここで何が起こっているのかを知る方法について何か考えはありますか?

編集: Kamon を配置すると、次の動作が見られました。

アクターあたりのメールボックス サイズ

アクターごとのメールボックスの時間

アクターあたりの処理時間

ストリームを停止するように通知せずに何かが停止したように見えますが、明示的にストリームを停止する方法がわかりません。

0 投票する
2 に答える
654 参照

scala - Akka Kafka Producersettings : オーバーロードされたメソッドの値は代替で適用されます:

コードにプロデューサー設定を入れると、何度も問題が発生します。私がそれを持っていないときは、すべて正常に動作します。以下に、すべてのコードを含む単一のファイルをファイルに示しました。ファイルをkafkaストリームに書き込もうとしています。そして、このコンパイルエラーが発生します。

エラーは

0 投票する
1 に答える
504 参照

akka - Akka Kafka ストリーム監視戦略が機能しない

私は Akka Streams Kafka アプリケーションを実行しています。ブローカが停止し、ストリーム コンシューマが停止タイムアウト後に停止した場合、スーパーバイザがコンシューマを再起動できるように、ストリーム コンシューマに監視戦略を組み込みたいと考えています。

ここに私の完全なコードがあります:

UserEventStream:

StreamProcessorSupervisor(これはクラスのスーパーバイザー クラスですUserEventStream):

App(メイン アプリケーション クラス):

application.conf:

アプリケーションを実行した後、意図的に Kafka ブローカーを強制終了しましたが、30 秒後に攻撃者がポイズン ピルを送信して自身を停止していることに気付きました。BackoffSupervisorしかし不思議なことに作戦通りに再起動しない。

ここで何が問題になる可能性がありますか?