BufferBlock と ActionBlock を使用してプロデューサー/コンシューマー データフロー ブロックをセットアップしましたが、コンソール アプリケーション内で正常に動作しています。
すべてのアイテムを BurfferBlock に追加し、BufferBlock を他のアクション アイテムとリンクした後。それはうまくいっています。
ここで、このデータフロー ブロック パイプラインが常に稼働している内部サービスを使用したいと考えています。また、外部イベントを介してメッセージが利用可能になると、メッセージは bufferblock 内に入り、処理が開始されます。どうすればこれを達成できますか?
これまでのところ、私は以下を行っています:
public void SetupPipeline()
{
FirstBlock = new ActionBlock<WorkItem>(new Action<WorkItem>(ProcessIncomingMessage),
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
});
BufferBlock = new BufferBlock<WorkItem>();
GroupingDataflowBlockOptions GroupingDataflowBlockOptions = new GroupingDataflowBlockOptions();
GroupingDataflowBlockOptions.Greedy = true;
GroupingDataflowBlockOptions.BoundedCapacity = GroupingDataflowBlockOptions.Unbounded;
CancellationTokenSource = new CancellationTokenSource();
CancellationToken = CancellationTokenSource.Token;
GroupingDataflowBlockOptions.CancellationToken = CancellationToken;
BatchBlock = new BatchBlock<WorkItem>(BoundingCapacity, GroupingDataflowBlockOptions);
ProcessItems = new ActionBlock<WorkItem[]>(WorkItems =>
ProcessWorkItems(WorkItems.ToList<WorkItem>()),
new ExecutionDataflowBlockOptions
{
CancellationToken = CancellationToken
});
Timer = new Timer(_ =>
BatchBlock.TriggerBatch()
);
TimingBlock = new TransformBlock<WorkItem, WorkItem>(WorkItem =>
{
Timer.Change(TimerInterval, Timeout.Infinite);
logger.Debug("Inside TimingBlock : " + WorkItem.ToString());
return WorkItem;
}, new ExecutionDataflowBlockOptions
{
CancellationToken = CancellationToken
});
BatchBlock.LinkTo(ProcessItems);
TimingBlock.LinkTo(BatchBlock);
BufferBlock.LinkTo(TimingBlock);
}