5

現在のプロジェクトで使用するために、Parallel.Invoke と一般的な並列処理について自分自身を教育しています。必要に応じてより多くの並列「スレッド」を動的に\インテリジェントに割り当てる方法を理解するには、正しい方向へのプッシュが必要です。

例として。大きなログ ファイルを解析しているとします。これには、ファイルからの読み取り、返された行のある種の解析、および最終的なデータベースへの書き込みが含まれます。

私にとって、これは並列処理の恩恵を受けることができる典型的な問題です。

簡単な最初のパスとして、次のコードでこれを実装します。

Parallel.Invoke(
  ()=> readFileLinesToBuffer(),
  ()=> parseFileLinesFromBuffer(),
  ()=> updateResultsToDatabase()    
);

舞台裏

  1. readFileLinesToBuffer() は各行を読み取り、バッファーに格納します。
  2. parseFileLinesFromBuffer が来て、バッファから行を消費し、 updateResultsToDatabase() が来てこのバッファを消費できるように、それらを別のバッファに置くとしましょう。

したがって、示されているコードは、3 つのステップのそれぞれが同じ時間/リソースを使用することを前提としていますが、parseFileLinesFromBuffer() は長時間実行されるプロセスであるため、これらのメソッドの 1 つだけを実行するのではなく、2 つを並行して実行する必要があります。

コードが認識する可能性のあるボトルネックに基づいて、これを実行することをコードにインテリジェントに決定させるにはどうすればよいでしょうか?

概念的には、バッファー サイズを監視するいくつかのアプローチがどのように機能するかを確認できます。たとえば、新しい「スレッド」を生成してバッファーを増加した速度で消費します...しかし、TPL ライブラリをまとめる際にこの種の問題が考慮されていると思います。 .

いくつかのサンプル コードは素晴らしいものですが、次に調査すべき概念についての手がかりが必要です。System.Threading.Tasks.TaskScheduler が鍵を握っているように見えますか?

4

3 に答える 3

4

リアクティブエクステンションを試しましたか?

http://msdn.microsoft.com/en-us/data/gg577609.aspx

Rxは、Microsoftの新しいテクノロジーであり、公式サイトに記載されているように焦点が当てられています。

Reactive Extensions(Rx)... ...は、監視可能なコレクションとLINQスタイルのクエリ演算子を使用して非同期のイベントベースのプログラムを作成するためのライブラリです。

Nugetパッケージとしてダウンロードできます

https://nuget.org/packages/Rx-Main/1.0.11226

私は現在Rxを学習しているので、この例を取り上げてコードを記述したかったので、最終的に作成したコードは実際には並列で実行されませんが、完全に非同期であり、ソース行が順番に実行されることが保証されます。

おそらくこれは最良の実装ではありませんが、私がRxを学んでいると言ったように、(スレッドセーフは良い改善になるはずです)

これは、バックグラウンドスレッドからデータを返すために使用しているDTOです

class MyItem
{
    public string Line { get; set; }
    public int CurrentThread { get; set; }
}

これらは実際の作業を行う基本的なメソッドです。単純なもので時間をシミュレートし、Thread.Sleep各メソッドの実行に使用されたスレッドを返しますThread.CurrentThread.ManagedThreadId。タイマーProcessLineは4秒で、最も時間のかかる操作であることに注意してください

private IEnumerable<MyItem> ReadLinesFromFile(string fileName)
{
    var source = from e in Enumerable.Range(1, 10)
                 let v = e.ToString()
                 select v;

    foreach (var item in source)
    {
        Thread.Sleep(1000);
        yield return new MyItem { CurrentThread = Thread.CurrentThread.ManagedThreadId, Line = item };
    }
}

private MyItem UpdateResultToDatabase(string processedLine)
{
    Thread.Sleep(700);
    return new MyItem { Line = "s" + processedLine, CurrentThread = Thread.CurrentThread.ManagedThreadId };
}

private MyItem ProcessLine(string line)
{
    Thread.Sleep(4000);
    return new MyItem { Line = "p" + line, CurrentThread = Thread.CurrentThread.ManagedThreadId };
}

UIを更新するためだけに使用している次の方法

private void DisplayResults(MyItem myItem, Color color, string message)
{
    this.listView1.Items.Add(
        new ListViewItem(
            new[]
            {
                message, 
                myItem.Line ,
                myItem.CurrentThread.ToString(), 
                Thread.CurrentThread.ManagedThreadId.ToString()
            }
        )
        {
            ForeColor = color
        }
    );
}

そして最後に、これはRxAPIを呼び出すメソッドです

private void PlayWithRx()
{
    // we init the observavble with the lines read from the file
    var source = this.ReadLinesFromFile("some file").ToObservable(Scheduler.TaskPool);

    source.ObserveOn(this).Subscribe(x =>
    {
        // for each line read, we update the UI
        this.DisplayResults(x, Color.Red, "Read");

        // for each line read, we subscribe the line to the ProcessLine method
        var process = Observable.Start(() => this.ProcessLine(x.Line), Scheduler.TaskPool)
            .ObserveOn(this).Subscribe(c =>
            {
                // for each line processed, we update the UI
                this.DisplayResults(c, Color.Blue, "Processed");

                // for each line processed we subscribe to the final process the UpdateResultToDatabase method
                // finally, we update the UI when the line processed has been saved to the database
                var persist = Observable.Start(() => this.UpdateResultToDatabase(c.Line), Scheduler.TaskPool)
                    .ObserveOn(this).Subscribe(z => this.DisplayResults(z, Color.Black, "Saved"));
            });
    });
}

このプロセスは完全にバックグラウンドで実行されます。これは生成された出力です。

ここに画像の説明を入力してください

于 2012-05-31T04:45:57.367 に答える
0

のカップルを使用しBlockingCollectionます。 ここに例があります

アイデアは、producerデータをコレクションに入れる を作成することです

while (true) {
    var data = ReadData();
    blockingCollection1.Add(data);
}

次に、コレクションから読み取る任意の数のコンシューマーを作成します

while (true) {
    var data = blockingCollection1.Take();
    var processedData = ProcessData(data);
    blockingCollection2.Add(processedData);
}

等々

Parallel.Foreach を使用して、TPL にコンシューマーの数を処理させることもできます。

Parallel.ForEach(blockingCollection1.GetConsumingPartitioner(),
                 data => {
                          var processedData = ProcessData(data);
                          blockingCollection2.Add(processedData);
                 });

GetConsumingPartitioner( notを使用する必要があることに注意してくださいGetConsumingEnumerable(こちらを参照)

于 2012-05-31T09:33:25.580 に答える
0

非同期/待機の世界では、次のようなものがあります。

public async Task ProcessFileAsync(string filename)
{
    var lines = await ReadLinesFromFileAsync(filename);
    var parsed = await ParseLinesAsync(lines);
    await UpdateDatabaseAsync(parsed);
}

次に、呼び出し元は var tasks = filenames.Select(ProcessFileAsync).ToArray(); を実行できます。および何でも(コンテキストに応じて、WaitAll、WhenAllなど)

于 2012-05-31T05:19:24.853 に答える