TransformBlock<Uri, string>
(それ自体がの実装であるIPropagatorBlock<Uri, string>
)から始めて、(これはWebクローラーです)Uri
のコンテンツを取得するとします。string
var downloader = new TransformBlock<Uri, string>(async uri => {
// Download and return string asynchronously...
});
文字列のコンテンツを取得したら、リンクを解析します。ページには複数のリンクを含めることができるため、aを使用しTransformManyBlock<string, Uri>
て単一の結果(コンテンツ)を多くのリンクにマップします。
// The discovered item block.
var parser = new TransformManyBlock<string, Uri>(s => {
// Parse the content here, return an IEnumerable<Uri>.
});
パーサーの鍵は、空のシーケンスを返すことができることです。これは、解析する必要のあるアイテムがこれ以上ないことを示します。
ただし、これはツリーのブランチ(またはWebのセクション)の場合のみです。
次に、ダウンローダーをパーサーにリンクしてから、次のようにダウンローダーに戻します。
downloader.LinkTo(parser);
parser.LinkTo(downloader);
今、私はすべてをブロックの外側で停止させることができることを知っています(それらの1つを呼び出すことによって)が、ブロックの内側Complete
からそれが完了したことをどのように知らせることができますか?
それとも、どういうわけか自分でこの状態を管理する必要がありますか?
すべてのコンテンツがダウンロードされて解析された後、ダウンローダーブロックが不足しているため、現在はハングしています。
これが完全に含まれているテストメソッドで、次の呼び出しでハングしますWait
。
[TestMethod]
public void TestSpider()
{
// The list of numbers.
var numbers = new[] { 1, 2 };
// Transforms from an int to a string.
var downloader = new TransformBlock<Tuple<int, string>, string>(
t => t.Item2 + t.Item1.ToString(CultureInfo.InvariantCulture),
// Let's assume four downloads to a domain at a time.
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 }
);
// Gets the next set of strings.
var parser = new TransformManyBlock<string, Tuple<int, string>>(s => {
// If the string length is greater than three, return an
// empty sequence.
// This is the signal for this branch to stop.
if (s.Length > 3) return Enumerable.Empty<Tuple<int, string>>();
// Branch out.
return numbers.Select(n => new Tuple<int, string>(n, s));
},
// These are simple transformations/parsing, no need to not parallelize.
// The dataflow blocks will handle the task allocation.
new ExecutionDataflowBlockOptions {
MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
});
// For broadcasting to an action.
var parserBroadcaster = new BroadcastBlock<Tuple<int, string>>(
// Clone.
t => new Tuple<int, string>(t.Item1, t.Item2));
// Indicate what was parsed.
var parserConsumer = new ActionBlock<Tuple<int, string>>(
t => Debug.WriteLine(
string.Format(CultureInfo.InvariantCulture,
"Consumed - Item1: {0}, Item2: \"{1}\"",
t.Item1, t.Item2)));
// Link downloader to parser.
downloader.LinkTo(parser);
// Parser to broadcaster.
parser.LinkTo(parserBroadcaster);
// Broadcaster to consumer.
parserBroadcaster.LinkTo(parserConsumer);
// Broadcaster back to the downloader.
parserBroadcaster.LinkTo(downloader);
// Start the downloader.
downloader.Post(new Tuple<int, string>(1, ""));
// Wait on the consumer to finish.
parserConsumer.Completion.Wait();
}
(予想どおり、ハングする前の)出力は次のとおりです。
Consumed - Item1: 1, Item2: "1"
Consumed - Item1: 2, Item2: "1"
Consumed - Item1: 1, Item2: "11"
Consumed - Item1: 2, Item2: "11"
Consumed - Item1: 1, Item2: "12"
Consumed - Item1: 2, Item2: "12"
Consumed - Item1: 1, Item2: "111"
Consumed - Item1: 2, Item2: "111"
Consumed - Item1: 1, Item2: "112"
Consumed - Item1: 2, Item2: "112"
Consumed - Item1: 1, Item2: "121"
Consumed - Item1: 2, Item2: "121"
Consumed - Item1: 1, Item2: "122"
Consumed - Item1: 2, Item2: "122"