大きなテキスト ファイルを読み取り、検索語を含む行を検索するための小さなユーティリティを作成しました。これを TPL Dataflow を学ぶ機会として利用しています。
検索用語がファイルの最後近くにない限り、コードは正常に機能します。その場合、ブレークポイントが存在しない限りuiResult
、アクション ブロックは呼び出されません。
私の理解では、データは from に投稿されuiResult
、searcher
その後完了searcher
します (データの最後のブロックを処理しました)。データはポストされているためuiResult
、そのデータが処理されるまで完全にはなりません。
質問
uiResult
データがポストされているのに完了になるのはなぜですか(でブレークポイントが設定されていない場合uiResult
)。
コード
可能な限り切り詰めた関連コードを次に示します。
ActionBlock<LineInfo> uiResult = new ActionBlock<LineInfo>(li =>
{
// If match found near end of file, the following line only runs
// if a breakpoint is set on it:
if (results != null) results.Add(li);
},
new ExecutionDataflowBlockOptions()
{
MaxDegreeOfParallelism = 1,
CancellationToken = cancelSource.Token,
TaskScheduler = TaskScheduler.FromCurrentSynchronizationContext()
});
BatchBlock<LineInfo> batcher = new BatchBlock<LineInfo>(5000);
ActionBlock<LineInfo[]> searcher = new ActionBlock<LineInfo[]>(lines =>
{
foreach (LineInfo li in lines)
{
if (li.TextOfLine.Contains(searchTerm))
{
uiResult.Post(li);
}
}
},
new ExecutionDataflowBlockOptions()
{
MaxDegreeOfParallelism = 1,
CancellationToken = cancelSource.Token
});
batcher.LinkTo(searcher);
batcher.Completion.ContinueWith(t =>
{
if (t.IsFaulted) ((IDataflowBlock)searcher).Fault(t.Exception);
else searcher.Complete();
if (t.IsFaulted) ((IDataflowBlock)uiResult).Fault(t.Exception);
else uiResult.Complete();
});
Task.Run(() =>
{
using (FileStream fs = File.Open(filePath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite))
using (BufferedStream bs = new BufferedStream(fs))
using (StreamReader sr = new StreamReader(bs))
{
string line;
while ((line = sr.ReadLine()) != null && cancelSource.IsCancellationRequested == false)
{
batcher.Post(new LineInfo() { LineNumber = lineNumber, OffsetOfLine = offset, TextOfLine = line });
}
batcher.Complete();
try
{
searcher.Completion.Wait();
uiResult.Completion.Wait();
}
catch (AggregateException ae)
{
TaskCanceledException taskCancelled = ae.InnerException as TaskCanceledException;
if (taskCancelled != null)
{
// Swallow the Exception if is just a user cancellation
throw;
}
}
finally
{
signalDone();
}
}
});