2

TPLに関するドキュメントと多くのチュートリアルを読みましたが、達成したいモデルをカバーしているものはありません。

一部のアルゴリズムでは、常に反復回数が固定されていました。

常に実行中のスレッドが必要です (できるだけ多く):

while(真)

  • MAIN スレッドからデータを取得する
  • 時間のかかる重いタスクを実行する (別のスレッドで)
  • メインスレッド情報を更新

さらに、目覚まし時計を設定できるメカニズムが必要です(たとえば、5秒)。5 秒後、すべての作業を一時中断してから再開する必要があります。

同じタスクで Task.ContinueWith を使用する必要がありますか? しかし、私は前のタスクの起動の結果を処理していませんが、代わりにメインスレッドのデータ構造を更新してから、新しいタスクの反復の入力になるものを決定します...

最高の効率を得るためにいくつのタスクを作成する必要があるかをTPLの決定に任せるにはどうすればよいですか?

いいえ、私は BackgroundWorkers を使用しています。なぜなら、彼らには素晴らしい RunEventCompleted イベントがあるためです。その内部ではメイン スレッドにいるので、MAIN 構造を更新し、時間の制約を確認し、最終的に完了した BackgroundWorker で StartAsync を再度呼び出すことができます。それは素晴らしく明確ですが、おそらく非常に非効率的です。マルチプロセッサ、マルチコア サーバーで高効率にする必要があります。

1 つの問題は、計算が常にオンラインであり、停止しないことです。MAIN構造の現在の状態をリモートで尋ねることができるネットワークもあります。

2番目の問題は、重要な時間制御です(正確なタイマーが必要です-停止すると、スレッドを再開できなくなります)。それが終了した後、特別な優先度の高いタスクが来て、すべての作業が再開されます。

3 つ目の問題は、実行する操作に上限がないことです。

私が観察したところによると、これらの 3 つの制約は TPL にうまく適合しません。コレクションはタスク自体の結果によってリアルタイムで変更されるため、Parallel.For のようなものは使用できません...結合方法もわかりません:

  • 作成するスレッド数を TPL に決定させる機能
  • スレッドのライフタイム実行の一種 (連続する再起動間の一時停止と同期ポイント)
  • スレッドを最初に 1 回だけ作成する (常に新しいパラメーターを使用してのみ再起動する必要があります)

誰かが私に手がかりを与えることができますか? 私はそれを悪い、非効率的な方法で行う方法を知っています。私が説明したいくつかの小さな要件があり、これを正しく行うことができません。私は少し混乱しています。

4

2 に答える 2

3

メッセージング + アクター + スケジューラー imo を使用する必要があります。そして、それが可能な言語を使用する必要があります。Azure Service Bus から非同期に受信し、共有キューにエンキューし、アクターを通じてランタイム状態を管理するこのコードを見てください。

列をなして:

同じタスクで Task.ContinueWith を使用する必要がありますか?

いいえ、ContinueWith は、各継続パス内の例外処理に基づいてプログラムを強制終了します。TPL には、失敗した状態を呼び出し側/メイン スレッドにマーシャリングする良い方法はありません。

しかし、私は前のタスクの起動の結果を処理していませんが、代わりにメインスレッドのデータ構造を更新してから、新しいタスクの反復の入力になるものを決定します...

問題に多くの時間を費やすつもりがない限り、これを行うにはスレッド化を超えて移動する必要があります。

最高の効率を得るためにいくつのタスクを作成する必要があるかをTPLの決定に任せるにはどうすればよいですか?

これは、非同期ワークフローを実行するフレームワークによって処理されます。

いいえ、私は BackgroundWorkers を使用しています。なぜなら、彼らには素晴らしい RunEventCompleted イベントがあるためです。その内部ではメイン スレッドにいるので、MAIN 構造を更新し、時間の制約を確認し、最終的に完了した BackgroundWorker で StartAsync を再度呼び出すことができます。それは素晴らしく明確ですが、おそらく非常に非効率的です。マルチプロセッサ、マルチコア サーバーで高効率にする必要があります。

1 つの問題は、計算が常にオンラインであり、停止しないことです。MAIN構造の現在の状態をリモートで尋ねることができるネットワークもあります。2番目の問題は、重要な時間制御です(正確なタイマーが必要です-停止すると、スレッドを再開できなくなります)。

すべてを非同期で実行する場合は、中断するアクターにメッセージを渡すことができます。スケジューリング アクターは、スケジュールされたメッセージですべてのサブスクライバーを呼び出す責任があります。pausedリンクされたコードの状態を見てください。未処理のリクエストがある場合は、それらにキャンセル トークンを渡し、その方法で「ハード」キャンセル/ソケット アボートを処理できます。

それが終了した後、特別な優先度の高いタスクが来て、すべての作業が再開されます。私が観察したところによると、これらの 2 つの制約は TPL にうまく対応していません。コレクションはタスク自体の結果によってリアルタイムで変更されるため、Parallel.For のようなものは使用できません...

おそらく、pipes-and-filters と呼ばれるパターンが必要です。入力を一連のワーカー (アクター) にパイプします。各ワーカーは、他のワーカーの出力から消費します。シグナリングは、コントロール チャネルを使用して行われます (私の場合、それはアクターの受信トレイです)。

于 2012-04-05T10:41:59.620 に答える
0

読むべきだと思います

MSDN: プロデューサー/コンシューマー データフロー パターンの実装方法

私も同じ問題を抱えていました。1 人の生産者が商品を生産し、複数の消費者がそれらを消費して、他の消費者に送ることにしました。各コンシューマは、他のコンシューマから独立して非同期で動作していました。

あなたの主な仕事はプロデューサーです。彼は、あなたの他のタスクが処理すべきアイテムを作成します。メインタスクのコードを持つクラスには関数があります:

public async Task ProduceOutputAsync(...)

メイン プログラムは、次を使用してこのタスクを開始します。

var producerTask = Task.Run( () => MyProducer.ProduceOutputAsync(...)

これが呼び出されると、プロデューサー タスクは出力の生成を開始します。その間、メイン プログラムは、たとえばコンシューマーを開始するなど、他のことを続けることができます。

しかし、最初に Producer タスクに注目しましょう。

プロデューサー タスクは、他のタスクによって処理されるタイプ T のアイテムを生成します。それらは、ITargetBlock を実装するオブジェクトを使用して他のタスクに引き継がれます。

プロデューサー タスクが型 T のオブジェクトの作成を完了するたびに、ITargetBlock.Post、またはできれば非同期バージョンを使用して、それをターゲット ブロックに送信します。

while (continueProducing())
{
    T product = await CreateProduct(...)
    bool accepted = await this.TargetBlock(product)
    // process the return value
}
// if here, nothing to produce anymore. Notify the consumers:
this.TargetBlock.Complete();

プロデューサには ITargetBlock <T> が必要です。私のアプリケーションでは、BufferBlock <T> で十分でした。他の可能なターゲットについては、MSDN を確認してください。

とにかく、データ フロー ブロックも ISourceBlock <T> を実装する必要があります。レシーバーは入力がソースに到着するのを待ち、それを取得して処理します。完了すると、結果を独自のターゲット ブロックに送信し、入力が予期されなくなるまで次の入力を待機できます。もちろん、コンシューマーが出力を生成しない場合、ターゲットに何も送信する必要はありません。

入力待ちは次のように行われます。

ISourceBlock`<T`> mySource = ...;
while (await mySource.ReceiveAsync())
{   // a object of type T is available at the source
    T objectToProcess = await mySource.ReceiveAsync();
    // keep in mind that someone else might have fetched your object
    // so only process it if you've got it.
    if (objectToProcess != null)
    {
        await ProcessAsync(objectToProcess);

        // if your processing produces output send the output to your target:
        var myOutput = await ProduceOutput(objectToprocess);
        await myTarget.SendAsync(myOutput);
    }
}
// if here, no input expected anymore, notify my consumers:
myTarget.Complete();
  • プロデューサーを構築する
  • すべてのコンシューマーを構築する
  • 生産者にその出力を送信するための BufferBlock を与える
  • プロデューサー MyProducer.ProduceOutputAsync(...) を開始します。
  • プロデューサーが出力を生成し、それをバッファー ブロックに送信している間:
  • コンシューマーに同じ BufferBlock を与える
  • コンシューマーを別のタスクとして開始する
  • await Task.WhenAll(...) を使用して、すべてのタスクが完了するまで待機します。

各コンシューマーは、入力がもう期待されていないことを聞くとすぐに停止します。すべてのタスクが完了すると、メイン関数は結果を読み取って返すことができます

于 2015-07-29T08:38:06.053 に答える