問題タブ [producer-consumer]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
c# - レビュー依頼:スレッドセーフキュー(パラレルプッシュポップ)
これは、スレッド セーフ キューの実装です。プッシュとポップはお互いをブロックしません。ただし、キューが空の場合、アイテムがプッシュされるまで pop は待機します。複数のプロデューサーとコンシューマーがキューを使用できます。何か問題があれば教えてください。
更新:回答に従って編集1)「キューがいっぱい」の状況の問題を解決しました。2) .NET4 にはBlockingCollection<T>
andがありConcurrentQueue<T>
ます。したがって、車輪を再発明する必要はありません(.NET4の場合)
java - JMS は永続的なブロッキング キューの必要性に対する答えですか?
イベントをリモート サーバーに非同期的に送信する Log4J アペンダーで構成されるライブラリを作成しています。ログステートメントが作成されると、アペンダーはイベントをローカルキューに非同期に記録し、コンシューマーのプールが取得してリモートに送信します。
完全なインメモリ ソリューションは、同時実行の問題を処理する BlockingQueue を作成することです。ただし、キューを永続化して、リモートサーバーが使用できない場合にキューを無制限に拡張したり、キューが制限されている場合にメッセージを破棄したりしないようにしたいと考えています。
組み込みの H2 データベースを使用してイベントをローカルに保存し、ポーリング メカニズムを使用してイベントを取得し、リモートに送信することを考えていました。データベース テーブルをポーリングするよりも、むしろ BlockingQueue を使用したいと思います。
JMS が答えですか?
編集:
JMS が答えであり、そのように進んでいると思われる場合、処理中のメッセージのみを受け入れるように構成できる、軽量で埋め込み可能な JMS ソリューションに関する推奨事項はありますか? 言い換えれば、リッスンする TCP ソケットを開きたくないし、おそらく許可されないでしょう。
編集:
現在、ActiveMQ が組み込まれており、動作しているようです。皆さんありがとう。
c# - プロデューサー-バリエーションのあるコンシューマー-スレッド信号/待機と同期する方法は?
大規模なプロジェクトに取り組んでいると、将来的に予定されている電話をたくさんかけていることに気づきました。これらはかなり軽量だったので、別のスケジューラーを使用する方が良いのではないかと思いました。
そこで、独自のスレッドで実行され、これらのイベントを実行する別のスケジューラークラスを作成しました。別々のスレッドから共有キューにアクセスする2つの関数があります。ロックを使用しますが、スレッドの1つがスリープ待機する必要があるため、ロックを解除する方法がわかりませんでした。
これはC#を対象としていますが、これに対する一般的な解決策を聞いてうれしいです。ありがとう!
producer-consumer - 春のアプリケーションイベントを介した生産者消費者
Spring のアプリケーション イベントと threadpoolexecutors を使用して、アクター モデル パターン (プロデューサー コンシューマーと多少マッシュアップ) を実装しようとしています。私の主な目的は、各レイヤーを切り離すことです。私のアーキテクチャは次のとおりです。残りの API を介してビジネス トランザクションの要求を受信する戦争が展開されています。任意の時点で、X は構成可能な数である X 数のトランザクションが存在する可能性があり、実際の実行は非同期である必要があり、各トランザクションは別のスレッドにあります。リクエスト自体は FIFO 方式で処理されますが、一部のリクエストは他のリクエストが処理されるまで待機する必要があるため、多少の複雑さがありますが、それは他のリクエストを処理できないという意味ではありません。
入金(2) 出金(2) 入金(3)
ここで、数字は口座番号です。次の順序で処理したいと思います。
入金(2) 入金(3) 出金(2)
私はこのようにアーキテクチャを構築しました:ヒットを取得してDB(DBに状態を持たなければならない分散システム)に書き込む残りのAPIがあり、アプリケーションコンテキストでclientrequesteventを公開しますプロデューサーイベントの発行と、彼が送信したイベントの数の監視を担当しています(つまり、同時プロセスの数を制限し、上記のロジックを実装することを担当しています)。など)後者によって発行されたイベントをリッスンし、完了したイベントを発行します。
すべてがうまく機能しますすべてが完了し、異なるスレッドとすべてのフローが素晴らしいですが、空きスロットがあるかどうかを判断する中間層に問題があります同期メソッドを持ちたくないし、したくありませんアトミックロングなどのトリックを行いたいのですが、ブロッキングキューを使用してイベントを公開したいのですが、イベントがいつ完了したかを判断する良い方法が見つからないので、新しいイベントを元に戻すことができます。最大の問題は、新しい作業を要求するために、DB にアクセスする必要があることです。これは、このシステムが重い負荷の下で動作する必要があるためです。私はどういうわけかブロッキングキューとスレッドプールを利用したいので、1つのスロットが空いている分、スレッドでサイズが制限されたキューから取得します
これを処理する良い方法は何でしょうか? 前もって感謝します
c++ - 中断または参加した後、Boostスレッド(スレッドプールから)を再利用します
現在、リアルタイムグラフィックスアプリケーションのレンダリング部分にプロデューサーコンシューマーモデルを使用しています。コンシューマーは、キュー内のデータを継続的に検索します(無限ループ)。ただし、これによりシミュレーションがメインループの同期から外れる可能性があるのではないかと心配しています。これは、高速生産者と低速消費者の問題だと思います。シミュレーションが一定の時間に制限されているという事実によってさらに複雑になっています。
質問-これをすべてバランスを保ち、消費者が終了するのに十分な時間を確保するだけでなく、現在のフレームのレンダリングが完了する前にシミュレーションが次のフレームに移動しないようにするための最良の方法は何ですか(または少なくともこれを検出して次のフレームのレンダリングをスキップする-またはレンダリングされている現在のフレームを中断する)現在、各コンシューマーが終了した後、中断して参加しています
2番目の質問:以下のコードを見ると、現在、キューにレンダリングジョブを追加した後、割り込みと参加を呼び出していることがわかります。これにより、スレッドは操作を完了し、割り込みに応答する必要があります。終わった時に。その後、interrupt_allとjoin_allが呼び出された後、スレッドプール内のスレッドを再利用するにはどうすればよいですか?(つまり、drawNextFrameを再度呼び出す場合)
プロデューサーは実行のメインスレッドの一部です(これは何にも影響しないと思います)
コンシューマークラスを確認する必要がある場合は、以下を参照してください。
私はこれを簡単で素早く消化できるようにしようとしました、お時間をいただきありがとうございます
java - 同期を使用する生産者/消費者
生産者/消費者問題を実装するコードを記述しましたが、同期しなくても正常に機能しているようですが、可能ですか?
コードをテストして、実際に正しく機能しているかどうかを確認するにはどうすればよいですか?デッドロックが発生するかどうかはどうすればわかりますか?現在、私はループから抜け出していません(つまり、プロデューサーは挿入を続け、コンシューマーは無限ループで消費し続けます)。共有リソースとしてサイズ3(簡単にするため)の循環キューを使用しています。
これが私のコードです:
c# - C# プロデューサー/コンシューマー/オブザーバー?
特定の種類のオブジェクトがあることを除いて、プロデューサー/コンシューマー キューがあります。したがって、すべてのコンシューマーが追加されたオブジェクトを消費できるわけではありません。種類が多すぎるため、種類ごとに特定のキューを作成したくありません。(生産者/消費者の定義を拡張するようなものですが、正しい用語が何であるかはわかりません。)
パラメータでパルスを許可する EventWaitHandle のようなものはありますか? 例えばmyHandle.Set(AddedType = "foo")
。現在、私は使用してMonitor.Wait
おり、各消費者はパルスが実際にそれらを意図したものであるかどうかを確認していますが、それはちょっと無意味に思えます.
私が今持っているものの疑似コードバージョン:
ご覧のとおり、辞書に他のものが追加されると、パルスが発生することがあります。MyType が dict に追加されたときだけ気にします。それを行う方法はありますか?大したことではありませんが、たとえば、タイムアウト内でロックを取得するたびに成功する可能性があるため、手動でタイムアウトを処理する必要がありますが、内MyType
の辞書に追加されることはありませんtimeout
。
c - 「セマフォの使用法は微妙に間違っています」
この過去の学期、私はCでOS実習を受講していました。この実習では、最初のプロジェクトでスレッドパッケージを作成し、次に機能を実証するために複数の生産者/消費者プログラムを作成しました。しかし、採点フィードバックを受け取った後、「セマフォの使用法が微妙に間違っている」と「プログラムがプリエンプションを想定している(たとえば、制御を変更するためにyieldを使用する)」(非プリエンプティブスレッドパッケージから始めて、後でプリエンプションを追加した)のポイントを失いました。コメントと例は互いに矛盾していることに注意してください。どちらも想定しておらず、両方の環境で機能すると思います)。
これは長い間私を悩ませてきました-コースのスタッフはちょっと圧倒されていたので、学期を通してこれの何が問題なのかを彼らに尋ねることはできませんでした。私はこれについて長い間考えてきました、そして私は問題を見ることができません。誰かが見てエラーを指摘したり、実際には問題がないことを私に安心させてくれたら、本当にありがたいです。
構文はスレッドパッケージ関数(ミニスレッドとセマフォ)に関してはかなり標準的であるはずですが、混乱することがあれば教えてください。
message-queue - RabbitMQ: 非ポーリング戦略を使用して、1 つのキューに複数のコンシューマーを配置できますか?
RabbitMQ を使用して、1 台のマシン上のプロデューサーから、複数のマシンに分散された少数のコンシューマー グループにジョブを送信します。
プロデューサはジョブを生成してキューに配置し、コンシューマは 10 ミリ秒ごとにキューをチェックして、要求されていないジョブがあるかどうかを確認し、ジョブが利用可能な場合は一度にジョブをフェッチします。1 つの特定のワーカーがジョブの処理に時間がかかりすぎる場合 (GC の一時停止またはその他の一時的な問題)、他のコンシューマーはキューからジョブを自由に削除できるため、ジョブ全体のスループットは高く保たれます。
このシステムを最初にセットアップしたとき、キュー上の複数のコンシューマーに対してサブスクライバー関係をセットアップする方法を理解できませんでした。これにより、ポーリングしてわずかな余分なレイテンシーを導入する必要がなくなります。
ドキュメントを調べても、満足のいく答えは得られませんでした。メッセージ キューを使用するのは初めてで、上記のシナリオを正確に説明する言葉を知らない可能性があります。これは黒板システムのようなものですが、この場合、「スペシャリスト」はすべて同一であり、互いの結果を消費することはありません。結果は常にジョブ プロデューサーに報告されます。
何か案は?
objective-c - NSData を使用して生産者と消費者の問題を解決する (オーディオ ストリーミング用)
AVAssetReader を使用して PCM データを iPod トラックからバッファにコピーし、RemoteIO オーディオ ユニットで再生しています。サウンドデータをロードするための別のスレッドを作成しようとしています。これにより、ロード中にバッファからデータにアクセスして再生できます。
現在、最終的に曲のデータ全体を保持する大きな NSMutableData オブジェクトがあります。現在、次のように NSOperation を使用して別のスレッドにオーディオ データをロードします。
- AVAssetReaderOutput は、一度に最大 8192 バイトを CMBlockBuffer にコピーします
- これらのバイトを NSData オブジェクトにコピーします
- この NSData オブジェクトをより大きな NSMutableData オブジェクト (最終的に曲全体を保持する) に追加します。
- 終了したら、NSMutableData オブジェクトの各パケットにアクセスして曲を再生します
これらのバイトをコピーしながら曲を再生できるようにしようとしています。同時にファイルへの書き込みと読み取りを行う良い方法が何であるかはわかりません。
私が持っていた短いアイデア:
- それぞれ長さが 8192 バイトの 3 つの NSData オブジェクトをバッファとして作成し、埋めます。
- プレイを開始します。最初のバッファの再生が終了したら、新しいデータを最初のバッファにロードします。
- 2 番目のバッファーの再生が終了したら、新しいデータを 2 番目のバッファーに読み込みます。三代目も同じ
- 再び最初のバッファから再生を開始し、3 番目のバッファを埋めます。等々。
または、3 * 8192 PCM ユニットを保持する 1 つの NSData オブジェクトを作成し、何らかの方法で 2 つの異なるスレッドで同時に書き込みおよび読み取りを行います。
現在、2 つの異なるスレッドでコードを実行しています。再生を押すまで配列にデータを追加します。その時点で停止し (おそらくスレッドがブロックされているためですが、今はわかりません)、ロードしたものの最後に到達するまで再生され、EXC_BAD_ACCESS 例外が発生します。
要するに、コピー中に PCM データを再生する正しい方法を見つけたいと思っています。たとえば、一度に 8192 バイトです。おそらく別のスレッドで行う必要があります(現在NSOperationを使用しています)が、できればより高いレベルのObjective-Cメソッドを使用して、同時にバッファへの書き込みと読み取りを行う方法が不明です。