6

メッセージを投稿する BufferBlock があります。

public class DelimitedFileBlock : ISourceBlock<string>
{
    private ISourceBlock<string> _source;
    _source = new BufferBlock<string>(new DataflowBlockOptions() { BoundedCapacity = 10000 });

    //Read a file
    While(!eof)
        row = read one row 
    //if consumers are slow, then sleep for a while
    while(!(_source as BufferBlock<string>).Post<string>(row))
    {
        Thread.Sleep(5000);
    }
}

これは、2,400 万行の 5 GB のファイルです。

ActionBlock を使用する Target ブロックができました。

public class SolaceTargetBlock : ITargetBlock<string>
       private ActionBlock<IBasicDataContract> _publishToSolaceBlock;

       public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, string messageValue, ISourceBlock<string> source, bool consumeToAccept)
    {
        //post to another block to publish
        bool success = _publishToSolaceBlock.Post(messageValue);

コンソール アプリケーションでは、次のように指定します。

 SolaceTargetBlock solaceTargetBlock1 = new SolaceTargetBlock("someparam", 
            new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 10, BoundedCapacity = 1 });
 SolaceTargetBlock solaceTargetBlock2 = new SolaceTargetBlock("someparam", 
            new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 10, BoundedCapacity = 1 });
 SolaceTargetBlock solaceTargetBlock3 = new SolaceTargetBlock("someparam", 
            new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 10, BoundedCapacity = 1 });

 DelimitedFileBlock delimitedFileBlock = new DelimitedFileBlock(csvFileInfo);

テストのためだけに、制限された容量を 1 のままにしました。

次に、LinkTo を使用して、これら 3 つのコンシューマーをソースにリンクします。

 delimitedFileBlock.LinkTo(solaceTargetBlock1);      
 delimitedFileBlock.LinkTo(solaceTargetBlock2);      
 delimitedFileBlock.LinkTo(solaceTargetBlock3);      

これは、10003 行の後に Thread.Sleep(5000) ステートメントに移動し、while ループの Post は常に false を返します。

私はLinkToを持っているので、solaceTargetBlocksが完了すると次のメッセージを選択できると思っていましたが、LinkToはBufferBlockをクリアしていません。では、どうすれば複数のコンシューマー間で負荷を分散できますか。コンシューマ間で分散するために、単純なロード バランシング ロジックを受信して​​記述する必要がありますか?

4

1 に答える 1

13

クラスのPostメソッドのドキュメントから(強調鉱山):DataflowBlock<T>

このメソッドは、ターゲット ブロックがアイテムを受け入れるか拒否するかを決定すると戻ります。

これは、ターゲットがブロックを拒否することを選択できることを意味します (これは、あなたが見ている動作です)。

さらに、次のように述べています。

提供されたメッセージの延期をサポートするターゲット ブロック、または Post 実装でより多くの処理を行う可能性のあるブロックの場合は、SendAsync の使用を検討してください。これにより、すぐに戻り、ポストされたメッセージをターゲットが延期し、SendAsync が戻った後にそれを消費できるようになります。

これは、メッセージが完全に拒否されるのではなく、延期されても処理される可能性があるという点で、(ターゲット ブロックに応じて) より良い結果が得られる可能性があることを意味します。

と3 つのインスタンスの両方のBoundedCapacityプロパティ設定が、表示されているものと関係があると思います。BufferBlock<T>ActionBlock<TInput>

  • の最大バッファBufferBlock<T>は 10000 です。10,000 個のアイテムをキューに入れると、残りを処理できないため (上記の 2 番目の引用を参照)、残りを拒否します (SendAsync延期するメッセージをバッファリングできないため、ここでも機能しません)。

  • インスタンスの最大バッファActionBlock<TInput>は 1 で、そのうちの 3 つがあります。

10,000 + (1 * 3) = 10,000 + 3 = 10,003

これを回避するには、いくつかのことを行う必要があります。

まず、インスタンスを作成するときに、MaxDegreeOfParallelismプロパティに適切な値を設定する必要があります。ExecutionDataFlowBlockOptionsActionBlock<TInput>

デフォルトでは、MaxDegreeOfParallelismfor anActionBlock<TInput>は 1 に設定されています。これにより、呼び出しがシリアル化されることが保証され、スレッドセーフについて心配する必要がなくなります。ActionBlock<T>でスレッドセーフを考慮したい場合は、この設定をそのままにしてください。

ActionBlock<TInput> スレッド セーフである場合は、スロットリングする理由がなく、 に設定MaxDegreeOfParallelismする必要がありますDataflowBlockOptions.Unbounded

ActionBlock<TInput>制限付きで同時にアクセスできる何らかの共有リソースにアクセスしている場合は、間違ったことをしている可能性があります。

ある種の共有リソースがある場合は、別のブロックを介して実行し、そのMaxDegreeOfParallelismで を設定する必要があります。

次に、スループットに関心があり、ドロップされたアイテムに問題がない場合は、BoundedCapacityプロパティを設定する必要があります。

また、「消費者が遅い場合は、しばらく寝る」ことを示していることに注意してください。ブロックを正しく配線する場合、これを行う理由はありません。データが流れるようにして、必要な場所にのみ制限を配置する必要があります。生産者は消費者の調整に責任を負うべきではありません。消費者に調整の責任を負わせてください。

最後に、あなたのコードは、データフロー ブロック インターフェイスを自分で実装する必要があるようには見えません。次のように構築できます。

// The source, your read lines will be posted here.
var delimitedFileBlock = new BufferBlock<string>();

// The Action for the action blocks.
Action<string> action = 
    s => { /* Do something with the string here. */ };

// Create the action blocks, assuming that
// action is thread-safe, no need to have it process one at a time
// or to bound the capacity.
var solaceActionBlock1 = new ActionBlock<string>(action,
    new ExecutionDataflowBlockOptions { 
        MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded,
    });
var solaceActionBlock2 = new ActionBlock<string>(action,
    new ExecutionDataflowBlockOptions { 
        MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded,
    });
var solaceActionBlock3 = new ActionBlock<string>(action,
    new ExecutionDataflowBlockOptions { 
        MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded,
    });

// Link everything.
delimitedFileBlock.LinkTo(solaceTargetBlock1);
delimitedFileBlock.LinkTo(solaceTargetBlock2);
delimitedFileBlock.LinkTo(solaceTargetBlock3);

// Now read the file, and post to the BufferBlock<T>:
// Note: This is pseudo-code.
while (!eof)
{
    // Read the row.
    string row = ...;

    delimitedFileBlock.Post(read);
}

またActionBlock<TInput>、出力を別のアクションにフィルターする必要がない限り (ここでは行っていません)、3 つのインスタンスを持つことは不必要であることに注意してください。とにかく増加MaxDegreeOfParallelismする):Unbounded

// The source, your read lines will be posted here.
var delimitedFileBlock = new BufferBlock<string>();

// The Action for the action blocks.
Action<string> action = 
    s => { /* Do something with the string here. */ };

// Create the action blocks, assuming that
// action is thread-safe, no need to have it process one at a time
// or to bound the capacity.
var solaceActionBlock1 = new ActionBlock<string>(action,
    new ExecutionDataflowBlockOptions { 
        MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded,
    });

// Link everything.
delimitedFileBlock.LinkTo(solaceTargetBlock);

// Now read the file, and post to the BufferBlock<T>:
// Note: This is pseudo-code.
while (!eof)
{
    // Read the row.
    string row = ...;

    delimitedFileBlock.Post(read);
}
于 2012-11-16T21:09:13.653 に答える