問題タブ [consumer]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
apache-kafka - Kafka の高レベル コンシューマは、Java API を使用してトピックからすべてのメッセージを取得します (--from-beginning と同等)
Kafka サイトの ConsumerGroupExample コードを使用して、Kafka High Level Consumer をテストしています。Kafka サーバー構成にある「test」というトピックに関する既存のメッセージをすべて取得したいと思います。他のブログを見ると、auto.offset.reset を「最小」に設定して、すべてのメッセージを取得できるようにする必要があります。
私が実際に持っている質問は次のとおりです。高レベルのコンシューマーに対する同等の Java API 呼び出しは何ですか。
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
java - Java での複数の生産者と単一の消費者のためのソフトウェア設計
複数のプロデューサーがあり、それぞれが独自のブロッキング キューにデータをプッシュします。各プロデューサーからのデータは個別に処理されます (したがって、個別のキューがあります)。
私の単一のコンシューマーは現在、これらのキューのそれぞれをポーリングし、それぞれにタイムアウトを設定しています。
プログラムは問題なく動作しますが、最適化しようとしています。「ポーリング」が好きではありません。これはこれにアプローチする最も効率的な方法ですか?,
プロデューサーごとにコンシューマーを使用して同じアプリを作成しましたが、プロファイルはわずかに悪化しました。
ありがとう 。
rabbitmq - Windsor Integration の使用時に MassTransit Consumer が呼び出されない
RabbitMQ を介した Mass Transit で Castle Windsor Integration を機能させることができないようです。ウィンザーを写真に紹介するまで、すべてがうまく機能していました. Castle.Windsor 3.2 と MassTransit.WindsorIntegration 2.9 を参照し、アプリケーション内で使用するコンテナーを構成しました。次の方法で MassTransit Consumers を登録しています。
この行の実行後にコンテナをデバッグして調べると、他のすべてのコンポーネントとともにすべてのコンシューマが正常に登録されていることがわかります。次に、サービス バスを初期化して登録する次のコードがあります。
LoadFrom(IWindsorContainer container)
MassTransit.WindsorIntegration が提供する拡張メソッドを使用しています。
私がこれまでに見つけたすべての例はここで終わりであり、これがあなたがしなければならないすべてであることを示しています。残念ながら、コンシューマーが呼び出されることはなく、メッセージがキューに積み上げられます (最終的にはタイムアウトになり、エラー キューに移動します)。コンシューマ キューにメッセージがまったく表示されているという事実 (+ RabbitMQ 管理ツールを介してキューにバインドされている単一のコンシューマが表示されます) は、コンシューマがおそらく適切にサブスクライブされていることを示しています。問題があります。
Windsor と MassTransit の NLog ログを追加しましたが、ログにエラーは表示されません。この時点でトラブルシューティングをどのように進めればよいかわかりません。何か案は?
また、このアプリケーションは現在、開発に Topshelf を使用する単なるコンソール アプリケーションです。最終的には、Windows サービスとしてインストールされます。関係あるかどうかわかりませんが、念のため書いておきます。
アップデート
テストとして、非常に単純なテスト メッセージを処理するためのパラメーターのないコンストラクターを持つ非常に単純な Consumer を作成しました。このコンシューマーは正常に呼び出されました! ただし、「実際の」コンシューマーには、コンストラクターを介して注入する必要がある依存関係があります。コンテナがこれらを解決してくれることを期待していましたが、どうやら何らかの問題が発生しているようです。それについてログに何も表示されないのは奇妙です。乞うご期待...
ruby - ファンアウトを使用してRabbitmqキューイングシステムが消費されていない
ruby コンシューマーを使用してこれらのキューをファンアウトしようとしましたが、非常に単純で、その交換/キューにサブスクライブし、メッセージを受信するだけです。新しいメッセージが発行されるたびに問題になります。つまり、消費しておらず、消費者がリストされていないことを意味します。交換でキューを再度バインドし、Ruby アプリを再起動すると、再び消費が開始されます。それから再びレンボに戻ります!Rubyアプリを数回再起動すると、うまくいくことがあります。何か案が?
以下のコンシューマに使用されるコード:
c - C で独自のセマフォを実装するにはどうすればよいですか?
私はさまざまな方法で生産者/消費者の問題を解決することに取り組んでいます。私が現在取り組んでいるものには、セマフォの down() および up() 関数呼び出しを保護するための Peterson のソリューションと組み合わせた、セマフォの独自の実装が含まれます。
問題は、私は C で作業しており、セマフォを作成する唯一の方法は、down() と up() の関数ポインターを持つ構造体を使用することです。これは正しい考えですか?
c - Producer Consumer マルチスレッドの printf の問題
生産者と消費者の問題をシミュレートするプログラムを作成しましたが、いくつかの問題が発生しています。これは、Win32 API を使用して作成されました。
アイテムが格納されているバッファのカウントを実行するために、満杯と空の 2 つのセマフォを使用しています。クリティカル セクションへのアクセスを制御するミューテックスもあります。2 つの関数があります。1 つは単純な計算 (スレッド * 1000000 + カウント) に基づいてアイテムを作成し、もう 1 つはそれを消費します。
バッファーには 10 個のスペースがありますが、プログラムはさまざまな数のスペースとスレッドに対して機能するはずです。Producer スレッドはバッファを通過し、セマフォが 10 までカウントされるまで最初からやり直します。2 つの問題が発生しており、解決策が見つからないようで、デバッガーで多くの詳細を取得することもできません。
1) printf() 関数を持つコメント部分は、毎回スレッドをクラッシュさせます。コンソールに何も出力されません。他の変数を出力せずに、文字列だけを使用してみました。失敗。マルチスレッド環境で printf() を使用すると問題が発生する可能性があるというドキュメントを見つけましたが、この場合はクリティカル セクション内にあり、最初の試行でクラッシュします。その情報をコンソールに出力するにはどうすればよいですか?
2) print ステートメントを削除してコードを実行すると、スレッドは毎回異なる時点でクラッシュします。理由がわかりません。プロデューサー スレッドは、空のセマフォとミューテックスを待機し、項目を入れてから、完全なセマフォのカウントを増やします (項目が追加されたことを示します)。10 に達したら停止する必要があります。コンシューマ スレッドは、完全なセマフォとミューテックスを待機し、アイテムを削除してから、空のセマフォのカウントを増やします。これらのスレッドの終了を引き起こしている、私が書いた方法に何かがありますか?
これは私が得るものです:
プログラム '[5348] OperatingSystem.exe' はコード 0 (0x0) で終了しました。
.net - RabbitMQ で再試行回数を設定するにはどうすればよいですか?
私は RabbitMQ を使用しており、電子メール メッセージを保持するキューがあります。私の消費者サービスはメッセージをデキューし、それらを送信しようとします。何らかの理由でコンシューマがメッセージを送信できない場合、メッセージを再キューイングして再度送信したいと考えています。basicNack を実行して requeue フラグを true に設定できることはわかっていますが、メッセージを無期限に再キューに入れたくはありません (たとえば、メール システムがダウンした場合に、未送信のメッセージを継続的に再キューに入れたくはありません)。メッセージをキューに入れ直して再送信できる回数を定義したいと思います。ただし、電子メール メッセージ オブジェクトをデキューして nack を送信すると、フィールドを設定できません。更新されたフィールドは、キュー内のメッセージに存在しません。これにアプローチできる他の方法はありますか?前もって感謝します。