リアクティブエクステンションを試しましたか?
http://msdn.microsoft.com/en-us/data/gg577609.aspx
Rxは、Microsoftの新しいテクノロジーであり、公式サイトに記載されているように焦点が当てられています。
Reactive Extensions(Rx)... ...は、監視可能なコレクションとLINQスタイルのクエリ演算子を使用して非同期のイベントベースのプログラムを作成するためのライブラリです。
Nugetパッケージとしてダウンロードできます
https://nuget.org/packages/Rx-Main/1.0.11226
私は現在Rxを学習しているので、この例を取り上げてコードを記述したかったので、最終的に作成したコードは実際には並列で実行されませんが、完全に非同期であり、ソース行が順番に実行されることが保証されます。
おそらくこれは最良の実装ではありませんが、私がRxを学んでいると言ったように、(スレッドセーフは良い改善になるはずです)
これは、バックグラウンドスレッドからデータを返すために使用しているDTOです
class MyItem
{
public string Line { get; set; }
public int CurrentThread { get; set; }
}
これらは実際の作業を行う基本的なメソッドです。単純なもので時間をシミュレートし、Thread.Sleep
各メソッドの実行に使用されたスレッドを返しますThread.CurrentThread.ManagedThreadId
。タイマーProcessLine
は4秒で、最も時間のかかる操作であることに注意してください
private IEnumerable<MyItem> ReadLinesFromFile(string fileName)
{
var source = from e in Enumerable.Range(1, 10)
let v = e.ToString()
select v;
foreach (var item in source)
{
Thread.Sleep(1000);
yield return new MyItem { CurrentThread = Thread.CurrentThread.ManagedThreadId, Line = item };
}
}
private MyItem UpdateResultToDatabase(string processedLine)
{
Thread.Sleep(700);
return new MyItem { Line = "s" + processedLine, CurrentThread = Thread.CurrentThread.ManagedThreadId };
}
private MyItem ProcessLine(string line)
{
Thread.Sleep(4000);
return new MyItem { Line = "p" + line, CurrentThread = Thread.CurrentThread.ManagedThreadId };
}
UIを更新するためだけに使用している次の方法
private void DisplayResults(MyItem myItem, Color color, string message)
{
this.listView1.Items.Add(
new ListViewItem(
new[]
{
message,
myItem.Line ,
myItem.CurrentThread.ToString(),
Thread.CurrentThread.ManagedThreadId.ToString()
}
)
{
ForeColor = color
}
);
}
そして最後に、これはRxAPIを呼び出すメソッドです
private void PlayWithRx()
{
// we init the observavble with the lines read from the file
var source = this.ReadLinesFromFile("some file").ToObservable(Scheduler.TaskPool);
source.ObserveOn(this).Subscribe(x =>
{
// for each line read, we update the UI
this.DisplayResults(x, Color.Red, "Read");
// for each line read, we subscribe the line to the ProcessLine method
var process = Observable.Start(() => this.ProcessLine(x.Line), Scheduler.TaskPool)
.ObserveOn(this).Subscribe(c =>
{
// for each line processed, we update the UI
this.DisplayResults(c, Color.Blue, "Processed");
// for each line processed we subscribe to the final process the UpdateResultToDatabase method
// finally, we update the UI when the line processed has been saved to the database
var persist = Observable.Start(() => this.UpdateResultToDatabase(c.Line), Scheduler.TaskPool)
.ObserveOn(this).Subscribe(z => this.DisplayResults(z, Color.Black, "Saved"));
});
});
}
このプロセスは完全にバックグラウンドで実行されます。これは生成された出力です。