軽量で処理中の非同期メッセージバスを探していたところ、TPLデータフローに出くわしました。
私の現在の実装は以下のとおりです(https://gist.github.com/4416655の完全な例)。
public class Bus
{
private readonly BroadcastBlock<object> broadcast =
new BroadcastBlock<object>(message => message);
private readonly ConcurrentDictionary<Guid, IDisposable> subscriptions
= new ConcurrentDictionary<Guid, IDisposable>();
public Task SendAsync<TMessage>(TMessage message)
{
return SendAsync<TMessage>(message, CancellationToken.None);
}
public Task SendAsync<TMessage>(TMessage message, CancellationToken cancellationToken)
{
return broadcast.SendAsync(message, cancellationToken);
}
public Guid Subscribe<TMessage>(Action<TMessage> handlerAction)
{
var handler = new ActionBlock<object>(message => handlerAction((TMessage)message));
var subscription = broadcast.LinkTo(handler,
new DataflowLinkOptions { PropagateCompletion = true },
message => message is TMessage);
return AddSubscription(subscription);
}
public void Unsubscribe(Guid subscriptionId)
{
IDisposable subscription;
if (subscriptions.TryRemove(subscriptionId, out subscription))
{
subscription.Dispose();
}
}
private Guid AddSubscription(IDisposable subscription)
{
var subscriptionId = Guid.NewGuid();
subscriptions.TryAdd(subscriptionId, subscription);
return subscriptionId;
}
}
メッセージングシナリオでのTPLデータフローの使用に関する一般的な質問がいくつかあります。
BroadcastBlock<T>
複数のハンドラーに同時にメッセージを送信するための推奨ソースですか?これは私がこの投稿に基づいて得た結論でした。- 私の実装では
BroadcastBlock<T>
、すべてのメッセージタイプに単一のインスタンスを使用しています。これにより、多数のメッセージを処理するときに問題が発生する可能性がありますか?メッセージタイプごとに個別のインスタンスを作成する必要がありますか? BroadcastBlock<T>
送信された最後のアイテムを常に保存します。これは、新しいサブスクリプション(リンク)にこのメッセージが自動的に渡されることを意味します。この動作を変更することは可能です(新しいサブスクリプションは新しいメッセージのみを受信する必要があります)。私のテストアプリケーションでは、最初のハンドラーに遅延を導入しました。
// Subscribe to Message type var subscription1 = bus.Subscribe<Message>(async m => { await Task.Delay(2000); Console.WriteLine("{0} Handler 1: {1}.", m.TimeStamp, m.Content); });
メッセージを送信するとき、私は各メッセージが2秒刻みでコンソールに1つずつ出力されることを期待していました。代わりに、2秒後にすべてのメッセージが一度に出力されました。これは、基盤となるスケジューラーによって実行される並列処理が原因であると思いますが、これらの設定を変更する方法に興味があります(設定MaxDegreeOfParallelism = 1
に違いはありません)。最後に、メッセージの送信
SendAsync
を待つことはできますが、ターゲット()の完了を待つことはできません。これでいいと思いましたが、そうではないようです。理想的には、メッセージのすべてのハンドラーがいつ実行されたかを知りたいです。ActionBlock<T>
PropagateCompletion
アップデート
期待どおりの動作が得られなかった理由は、これがすべてのハンドラーの処理ではなく、各ハンドラーTask.Delay
の実行を遅らせていたためです。私が必要としていたものでした。Thread.Sleep