3

TransformBlock<Uri, string>(それ自体がの実装であるIPropagatorBlock<Uri, string>)から始めて、(これはWebクローラーです)Uriのコンテンツを取得するとします。string

var downloader = new TransformBlock<Uri, string>(async uri => {
    // Download and return string asynchronously...
});

文字列のコンテンツを取得したら、リンクを解析します。ページには複数のリンクを含めることができるため、aを使用しTransformManyBlock<string, Uri>て単一の結果(コンテンツ)を多くのリンクにマップします。

// The discovered item block.
var parser = new TransformManyBlock<string, Uri>(s => {
    // Parse the content here, return an IEnumerable<Uri>.
});

パーサーの鍵は、空のシーケンスを返すことができることです。これは、解析する必要のあるアイテムがこれ以上ないことを示します。

ただし、これはツリーのブランチ(またはWebのセクション)の場合のみです。

次に、ダウンローダーをパーサーにリンクしてから、次のようにダウンローダーに戻します。

downloader.LinkTo(parser);
parser.LinkTo(downloader);

今、私はすべてをブロックの外側で停止させることができることを知っています(それらの1つを呼び出すことによって)が、ブロックの内側Completeからそれが完了したことをどのように知らせることができますか?

それとも、どういうわけか自分でこの状態を管理する必要がありますか?

すべてのコンテンツがダウンロードされて解析された後、ダウンローダーブロックが不足しているため、現在はハングしています。

これが完全に含まれているテストメソッドで、次の呼び出しでハングしますWait

[TestMethod]
public void TestSpider()
{
    // The list of numbers.
    var numbers = new[] { 1, 2 };

    // Transforms from an int to a string.
    var downloader = new TransformBlock<Tuple<int, string>, string>(
        t => t.Item2 + t.Item1.ToString(CultureInfo.InvariantCulture),

        // Let's assume four downloads to a domain at a time.
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 }
    );

    // Gets the next set of strings.
    var parser = new TransformManyBlock<string, Tuple<int, string>>(s => {
        // If the string length is greater than three, return an
        // empty sequence.
        // This is the signal for this branch to stop.
        if (s.Length > 3) return Enumerable.Empty<Tuple<int, string>>();

        // Branch out.
        return numbers.Select(n => new Tuple<int, string>(n, s));
    }, 
    // These are simple transformations/parsing, no need to not parallelize.
    // The dataflow blocks will handle the task allocation.
    new ExecutionDataflowBlockOptions {
        MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
    });

    // For broadcasting to an action.
    var parserBroadcaster = new BroadcastBlock<Tuple<int, string>>(
        // Clone.
        t => new Tuple<int, string>(t.Item1, t.Item2));

    // Indicate what was parsed.
    var parserConsumer = new ActionBlock<Tuple<int, string>>(
        t => Debug.WriteLine(
            string.Format(CultureInfo.InvariantCulture, 
                "Consumed - Item1: {0}, Item2: \"{1}\"",
            t.Item1, t.Item2)));

    // Link downloader to parser.
    downloader.LinkTo(parser);

    // Parser to broadcaster.
    parser.LinkTo(parserBroadcaster);

    // Broadcaster to consumer.
    parserBroadcaster.LinkTo(parserConsumer);

    // Broadcaster back to the downloader.
    parserBroadcaster.LinkTo(downloader);

    // Start the downloader.
    downloader.Post(new Tuple<int, string>(1, ""));

    // Wait on the consumer to finish.
    parserConsumer.Completion.Wait();
}

(予想どおり、ハングする前の)出力は次のとおりです。

Consumed - Item1: 1, Item2: "1"
Consumed - Item1: 2, Item2: "1"
Consumed - Item1: 1, Item2: "11"
Consumed - Item1: 2, Item2: "11"
Consumed - Item1: 1, Item2: "12"
Consumed - Item1: 2, Item2: "12"
Consumed - Item1: 1, Item2: "111"
Consumed - Item1: 2, Item2: "111"
Consumed - Item1: 1, Item2: "112"
Consumed - Item1: 2, Item2: "112"
Consumed - Item1: 1, Item2: "121"
Consumed - Item1: 2, Item2: "121"
Consumed - Item1: 1, Item2: "122"
Consumed - Item1: 2, Item2: "122"
4

1 に答える 1

2

TPL データフロー フレームワークには、これをすぐに処理できるものはありません。それは、状態管理の問題です。

とはいえ、重要なのは、ダウンロードされた URL と、まだダウンロードが必要な URL を追跡することです。

これを処理する理想的な場所は、パーサー ブロックです。これは、コンテンツ (ダウンロードするためのリンクに変換されます) と、コンテンツのダウンロード元の URL があるポイントです。

上記のサンプルに取り組んで、ダウンロード結果とそれがダウンロードされた URI を取得する方法を導入する必要があります (私は a を使用していましたTupleが、混乱を招きすぎていたでしょう):

public class DownloadResult
{
    public Tuple<int, string> Uri { get; set; }
    public string Content { get; set; }
}

そこから、ダウンロード ブロックはほとんど同じですが、上記の構造を出力するように更新されているだけです。

[TestMethod]
public void TestSpider2()
{
    // The list of numbers.
    var numbers = new[] { 1, 2 };

    // Performs the downloading.
    var downloader = new TransformBlock<Tuple<int, string>, DownloadResult>(
        t => new DownloadResult { 
            Uri = t, 
            Content = t.Item2 + 
                t.Item1.ToString(CultureInfo.InvariantCulture) 
        },

        // Let's assume four downloads to a domain at a time.
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 }
    );

パーサーの消費者は変更する必要はありませんが、以前に宣言する必要があります。これは、パーサーが消費を停止する必要があることを消費者に通知する必要があり、パーサーに渡されたクロージャーでそれをキャプチャする必要があるためです。

// Indicate what was parsed.
var parserConsumer = new ActionBlock<Tuple<int, string>>(
    t => Debug.WriteLine(
        string.Format(CultureInfo.InvariantCulture, 
            "Consumed - Item1: {0}, Item2: \"{1}\"",
            t.Item1, t.Item2)));

ここで、ステート マネージャーを導入する必要があります。

// The dictionary indicating what needs to be processed.
var itemsToProcess = new HashSet<Tuple<int, string>>();

最初は、 だけで行こうと思ったのですConcurrentDictionary<TKey, TValue>が、削除複数の追加の前後でアトミック操作を実行する必要があり、必要なものが提供されませんでした。ここでは、簡単なlockステートメントが最適なオプションです。

パーサーは、最も変化するものです。アイテムは通常どおり解析されますが、次のこともアトミックに実行されます。

それは次のようになります。

// Changes content into items and new URLs to download.
var parser = new TransformManyBlock<DownloadResult, Tuple<int, string>>(
    r => {
        // The parsed items.
        IEnumerable<Tuple<int, string>> parsedItems;

        // If the string length is greater than three, return an
        // empty sequence.
        // This is the signal for this branch to stop.
        parsedItems = (r.Uri.Item2.Length > 3) ? 
            Enumerable.Empty<Tuple<int, string>>() :
            numbers.Select(n => new Tuple<int, string>(n, r.Content));

        // Materialize the list.
        IList<Tuple<int, string>> materializedParsedItems = 
            parsedItems.ToList();

        // Lock here, need to make sure the removal from
        // from the items to process dictionary and
        // the addition of the new items are atomic.
        lock (itemsToProcess)
        {
            // Remove the item.
            itemsToProcess.Remove(r.Uri);

            // If the materialized list has zero items, and the new
            // list has zero items, finish the action block.
            if (materializedParsedItems.Count == 0 && 
                itemsToProcess.Count == 0)
            {
                // Complete the consumer block.
                parserConsumer.Complete();
            }

            // Add the items.
            foreach (Tuple<int, string> newItem in materializedParsedItems) 
                itemsToProcess.Add(newItem);

                // Return the items.
                return materializedParsedItems;
            }
        }, 

        // These are simple transformations/parsing, no need to not 
        // parallelize.  The dataflow blocks will handle the task 
        // allocation.
        new ExecutionDataflowBlockOptions {
            MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
        });

ブロードキャスターとリンクは同じです。

// For broadcasting to an action.
var parserBroadcaster = new BroadcastBlock<Tuple<int, string>>(
    // Clone.
    t => new Tuple<int, string>(t.Item1, t.Item2));

// Link downloader to parser.
downloader.LinkTo(parser);

// Parser to broadcaster.
parser.LinkTo(parserBroadcaster);

// Broadcaster to consumer.
parserBroadcaster.LinkTo(parserConsumer);

// Broadcaster back to the downloader.
parserBroadcaster.LinkTo(downloader);

Postブロックを開始するとき、ルートがメソッドに渡される前に、ダウンロードする URL をステート マシンに準備する必要があります。

// The initial post to download.
var root = new Tuple<int, string>(1, "");

// Add to the items to process.
itemsToProcess.Add(root);

// Post to the downloader.
downloader.Post(root);

そして、クラスのWaitメソッドへの呼び出しは同じで、ハングすることなく完了するようになりました:Task

    // Wait on the consumer to finish.
    parserConsumer.Completion.Wait();
}
于 2012-11-04T15:34:53.353 に答える