メッセージを投稿する BufferBlock があります。
public class DelimitedFileBlock : ISourceBlock<string>
{
private ISourceBlock<string> _source;
_source = new BufferBlock<string>(new DataflowBlockOptions() { BoundedCapacity = 10000 });
//Read a file
While(!eof)
row = read one row
//if consumers are slow, then sleep for a while
while(!(_source as BufferBlock<string>).Post<string>(row))
{
Thread.Sleep(5000);
}
}
これは、2,400 万行の 5 GB のファイルです。
ActionBlock を使用する Target ブロックができました。
public class SolaceTargetBlock : ITargetBlock<string>
private ActionBlock<IBasicDataContract> _publishToSolaceBlock;
public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, string messageValue, ISourceBlock<string> source, bool consumeToAccept)
{
//post to another block to publish
bool success = _publishToSolaceBlock.Post(messageValue);
コンソール アプリケーションでは、次のように指定します。
SolaceTargetBlock solaceTargetBlock1 = new SolaceTargetBlock("someparam",
new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 10, BoundedCapacity = 1 });
SolaceTargetBlock solaceTargetBlock2 = new SolaceTargetBlock("someparam",
new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 10, BoundedCapacity = 1 });
SolaceTargetBlock solaceTargetBlock3 = new SolaceTargetBlock("someparam",
new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 10, BoundedCapacity = 1 });
DelimitedFileBlock delimitedFileBlock = new DelimitedFileBlock(csvFileInfo);
テストのためだけに、制限された容量を 1 のままにしました。
次に、LinkTo を使用して、これら 3 つのコンシューマーをソースにリンクします。
delimitedFileBlock.LinkTo(solaceTargetBlock1);
delimitedFileBlock.LinkTo(solaceTargetBlock2);
delimitedFileBlock.LinkTo(solaceTargetBlock3);
これは、10003 行の後に Thread.Sleep(5000) ステートメントに移動し、while ループの Post は常に false を返します。
私はLinkToを持っているので、solaceTargetBlocksが完了すると次のメッセージを選択できると思っていましたが、LinkToはBufferBlockをクリアしていません。では、どうすれば複数のコンシューマー間で負荷を分散できますか。コンシューマ間で分散するために、単純なロード バランシング ロジックを受信して記述する必要がありますか?