問題タブ [sarama]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票する
1 に答える
246 参照

apache-kafka - Sarama ClusterAdmin 接続の問題 - 壊れたパイプ

kafka1.1.0 のトピックを管理するために sarama(1.27) ClusterAdmin を使用しています。kafka トピックを管理する私のアプリケーションは、REST サービスとして実行されています。私のアプリケーションはしばらくの間正常に動作し、トピックを取得/作成/削除できます。

しかし、アクティビティがない状態でしばらく時間が経過すると、新しいトピック リクエストでエラーが発生します。

このHow to fix broker may not be available after broken pipe に遭遇しました。

私のアプリケーションはサービスとして実行されているため、パイプの破損の問題を防ぐにはどうすればよいですか? アプリケーションが終了するときだけ、ClusterAdmin を閉じます。すべての要求を処理するために、同じ ClusterAdmin 接続が使用されます。何らかの理由で nil の場合は、リクエストごとに clusterAdmin を再初期化します (通常、最初の初期化後は nil ではないため、同じ接続が再利用されます)。

各リクエストが処理された後に clusteradmin を閉じて、各トピック リクエストに対して NewClusterAdmin() を開く必要がありますか、それともキープアライブ オプションを使用する必要がありますか?

これが私の既存のコードです:

0 投票する
0 に答える
1125 参照

go - Sarama Kafka ライブラリ: 消費者グループの session.MarkMessage() を単体テストする方法は?

の消費者グループの例からコードを適応させようとしていますが、メソッドgithub.com/Shopify/saramaの機能をテストする単体テストを追加するのに苦労しています/main.go#L160 )。session.MarkMessage()ConsumeClaim

consume()関数を使用した私の適応コードは次のとおりです。

ここに私が書いたいくつかの単体テストがあります:

johnnypark/kafka-zookeeperテストは、次のように Docker コンテナーで Kafka と Zookeeper を実行した後に実行できます。

私が苦労しているのは次のとおりです:行をコメントアウトした場合

テストはまだパスします。https://godoc.org/github.com/Shopify/sarama#ConsumerGroupSessionによるとMarkMessage、メッセージを消費済みとしてマークしますが、これを単体テストでどのようにテストしますか?

0 投票する
2 に答える
1038 参照

go - Sarama を使用してメッセージを個別またはバッチごとにコミットする - Go 用のカフカ クライアント

私は、収集されたメッセージを手動でコミットできるようになった後、特定の時間メッセージを収集するカフカ コンシューマを作成しようとしていました。しかし、メッセージまたはメッセージのバッチをコミットするために使用できるメソッドまたは API が shopify sarama から見つかりませんでした。助けてください

0 投票する
0 に答える
454 参照

amazon-web-services - sarama consumergroup を AWS でホストされているコンフルエントなクラウド kafka に接続します。いいえ

これは、AWS サーバーでホストされている Confluent Kafka クラスターに接続しようとする最初の試みですが、Amazon マネージド ストリーミング サービスを使用していません。

このコードを使用して、古い自己ホスト型クラスターに問題なく接続できました (Config.Net.SASL セットアップを除く)。

何が間違っているのかわかりません。

接続すると、次のエラーが発生します。

これは私のコードです: