5

軽量で処理中の非同期メッセージバスを探していたところ、TPLデータフローに出くわしました。

私の現在の実装は以下のとおりです(https://gist.github.com/4416655の完全な例)。

public class Bus
{
    private readonly BroadcastBlock<object> broadcast =
        new BroadcastBlock<object>(message => message);

    private readonly ConcurrentDictionary<Guid, IDisposable> subscriptions
        = new ConcurrentDictionary<Guid, IDisposable>();

    public Task SendAsync<TMessage>(TMessage message)
    {
        return SendAsync<TMessage>(message, CancellationToken.None);
    }

    public Task SendAsync<TMessage>(TMessage message, CancellationToken cancellationToken)
    {
        return broadcast.SendAsync(message, cancellationToken);
    }

    public Guid Subscribe<TMessage>(Action<TMessage> handlerAction)
    {
        var handler = new ActionBlock<object>(message => handlerAction((TMessage)message));

        var subscription = broadcast.LinkTo(handler, 
            new DataflowLinkOptions { PropagateCompletion = true }, 
            message => message is TMessage);

        return AddSubscription(subscription);
    }

    public void Unsubscribe(Guid subscriptionId)
    {
        IDisposable subscription;
        if (subscriptions.TryRemove(subscriptionId, out subscription))
        {
            subscription.Dispose();
        }
    }

    private Guid AddSubscription(IDisposable subscription)
    {
        var subscriptionId = Guid.NewGuid();
        subscriptions.TryAdd(subscriptionId, subscription);
        return subscriptionId;
    }
}

メッセージングシナリオでのTPLデータフローの使用に関する一般的な質問がいくつかあります。

  • BroadcastBlock<T>複数のハンドラーに同時にメッセージを送信するための推奨ソースですか?これは私がこの投稿に基づいて得た結論でした。
  • 私の実装ではBroadcastBlock<T>、すべてのメッセージタイプに単一のインスタンスを使用しています。これにより、多数のメッセージを処理するときに問題が発生する可能性がありますか?メッセージタイプごとに個別のインスタンスを作成する必要がありますか?
  • BroadcastBlock<T>送信された最後のアイテムを常に保存します。これは、新しいサブスクリプション(リンク)にこのメッセージが自動的に渡されることを意味します。この動作を変更することは可能です(新しいサブスクリプションは新しいメッセージのみを受信する必要があります)。
  • 私のテストアプリケーションでは、最初のハンドラーに遅延を導入しました。

        // Subscribe to Message type
        var subscription1 = bus.Subscribe<Message>(async m => { 
            await Task.Delay(2000);
            Console.WriteLine("{0} Handler 1: {1}.", m.TimeStamp, m.Content);
        });
    

    メッセージを送信するとき、私は各メッセージが2秒刻みでコンソールに1つずつ出力されることを期待していました。代わりに、2秒後にすべてのメッセージが一度に出力されました。これは、基盤となるスケジューラーによって実行される並列処理が原因であると思いますが、これらの設定を変更する方法に興味があります(設定MaxDegreeOfParallelism = 1に違いはありません)。

  • 最後に、メッセージの送信SendAsyncを待つことはできますが、ターゲット()の完了を待つことはできません。これでいいと思いましたが、そうではないようです。理想的には、メッセージのすべてのハンドラーがいつ実行されたかを知りたいです。ActionBlock<T>PropagateCompletion

アップデート

期待どおりの動作が得られなかった理由は、これがすべてのハンドラーの処理ではなく、ハンドラーTask.Delayの実行を遅らせていたためです。私が必要としていたものでした。Thread.Sleep

4

1 に答える 1

7

質問に答えた後(以下を参照)、TPLDataflowブロックを使用して設計をモデル化することはおそらく良い考えではないことに気付きました。TDFは、単一のメッセージを追跡する組み込みの方法がなく、ほとんど独立したブロックによるメッセージの処理に適しています。しかし、それはあなたが望んでいるように思われることです。各メッセージの完了を追跡しながら、ハンドラーによってメッセージを順番に処理します。

そのため、データフローネットワーク全体を作成するのではなく、単一のネットワークをActionBlock非同期メッセージプロセッサとして使用する必要があると思います。

public class Bus
{
    class Subscription
    {
        public Guid Id { get; private set; }
        public Func<object, Task> HandlerAction { get; private set; }

        public Subscription(Guid id, Func<object, Task> handlerAction)
        {
            Id = id;
            HandlerAction = handlerAction;
        }
    }

    private readonly ConcurrentQueue<Subscription> m_handlersToSubscribe = new ConcurrentQueue<Subscription>();
    private readonly ConcurrentQueue<Guid> m_idsToUnsubscribe = new ConcurrentQueue<Guid>();

    private readonly ActionBlock<Tuple<object, Action>> m_messageProcessor;

    public Bus()
    {
        // subscriptions is accessed only from the (single-threaded) ActionBlock, so it is thread-safe
        var subscriptions = new List<Subscription>();

        m_messageProcessor = new ActionBlock<Tuple<object, Action>>(
            async tuple =>
            {
                var message = tuple.Item1;
                var completedAction = tuple.Item2;

                // could be made more efficient, probably doesn't matter
                Guid idToUnsubscribe;
                while (m_idsToUnsubscribe.TryDequeue(out idToUnsubscribe))
                {
                    subscriptions.RemoveAll(s => s.Id == idToUnsubscribe);
                }

                Subscription handlerToSubscribe;
                while (m_handlersToSubscribe.TryDequeue(out handlerToSubscribe))
                {
                    subscriptions.Add(handlerToSubscribe);
                }

                foreach (var subscription in subscriptions)
                {
                    await subscription.HandlerAction(message);
                }

                completedAction();
            });
    }

    public Task SendAsync<TMessage>(TMessage message)
    {
        var tcs = new TaskCompletionSource<bool>();
        Action completedAction = () => tcs.SetResult(true);

        m_messageProcessor.Post(new Tuple<object, Action>(message, completedAction));

        return tcs.Task;
    }

    public Guid Subscribe<TMessage>(Action<TMessage> handlerAction)
    {
        return Subscribe<TMessage>(
            message =>
            {
                handlerAction(message);
                // we need a completed non-generic Task; this is a simple, efficient way to get it
                // another option would be to use async lambda with no await,
                // but that's less efficient and produces a warning
                return Task.FromResult(false);
            });
    }

    public Guid Subscribe<TMessage>(Func<TMessage, Task> handlerAction)
    {
        Func<object, Task> actionWithCheck = async message =>
        {
            if (message is TMessage)
                await handlerAction((TMessage)message);
        };

        var id = Guid.NewGuid();
        m_handlersToSubscribe.Enqueue(new Subscription(id, actionWithCheck));
        return id;
    }

    public void Unsubscribe(Guid subscriptionId)
    {
        m_idsToUnsubscribe.Enqueue(subscriptionId);
    }
}

(メッセージの処理中にハンドラーのリストが変更されないように、サブスクライブとサブスクライブ解除にキューを使用することにしました。)

あなたの質問への回答

BroadcastBlock<T>複数のハンドラーに同時にメッセージを送信するための推奨ソースですか?

ええ、一見すると、それBroadcastBlock<T>はあなたが望むもののように聞こえます。確かに、TPLデータフローに直接同様のブロックはありません。

私の実装では、すべてのメッセージタイプに単一のBroadcastBlockインスタンスを使用しています。これにより、多数のメッセージを処理するときに問題が発生する可能性がありますか?メッセージタイプごとに個別のインスタンスを作成する必要がありますか?

すべてのメッセージタイプに単一のブロックを使用すると、単一のスレッドでより多くの作業(すべてのハンドラーに送信)を実行できます。メッセージタイプごとに1つのブロックを使用すると、複数のスレッドで実行できる作業が少なくなります(正しいハンドラーにのみ送信されます)。そのため、後者の方が速いと考えるのが妥当だと思います。

ただし、アプリケーションのパフォーマンス最適化のルールを忘れないでください。まず、シンプルで読みやすいコードを記述します。実際に遅いことが判明した場合にのみ、最適化してみてください。また、2つの選択肢を比較するときは、常にプロファイリングを使用して、どちらが実際に高速であるかを判断します。どちらが高速であるかを推測するだけではありません。

BroadcastBlock<T>送信された最後のアイテムを常に保存します。これは、新しいサブスクリプション(リンク)にこのメッセージが自動的に渡されることを意味します。この動作を変更することは可能ですか(新しいサブスクリプションは新しいメッセージのみを受信する必要があります)?

いいえ、それBroadcastBlock<T>を行うように構成する方法はありません。のすべての機能が必要ない場合BroadcastBlock<T>(容量が制限されているブロックに送信し、一時的にいっぱいになる可能性があり、欲張りでないブロックをターゲットとしてサポートする)、BroadcastBlock<T>これを行うためのカスタムバージョンを作成することをお勧めします。

メッセージを送信するとき、私は各メッセージが2秒刻みでコンソールに1つずつ出力されることを期待していました。代わりに、2秒後にすべてのメッセージが一度に出力されました。これは、基盤となるスケジューラーによって実行される並列処理が原因であると思いますが、これらの設定を変更する方法に興味があります(設定MaxDegreeOfParallelism = 1に違いはありません)。

TDFのポイントの1つは、各ブロックが独立しているため、複数のブロックを複数のスレッドで実行できることです。ActionBlock<T>それが望ましくない場合は、ハンドラーごとに個別に使用するのが最善の解決策ではない可能性があります。実際、TDFはまったく最善の解決策ではないかもしれません。

また、Subscribe()を受け入れますAction<TMessage>。これは、ラムダがメソッドとしてコンパイルされることを意味しますasync void。これらは、他のオプションがない特定の(そして比較的まれな)場合にのみ使用する必要があります。asyncハンドラーをサポートする場合は、async Taskメソッドを受け入れる必要がありますFunc<TMessage, Task>

期待どおりの動作が得られなかった理由は、これがすべてのハンドラーの処理ではなく、ハンドラーTask.Delayの実行を遅らせていたためです。私が必要としていたものでした。Thread.Sleep

を使用Thread.Sleep()すると、非同期の概念全体に反するため、可能であれば使用しないでください。また、実際には期待どおりに機能したとは思いません。各スレッドに遅延が発生しましたが、TPL Dataflowは複数のスレッドを使用するため、意図したとおりに動作しません。

最後にSendAsync、メッセージの送信を待つことはできますが、ターゲット()の完了を待つことはできませんActionBlock<T>。これでいいPropagateCompletionと思いましたが、そうではないようです。理想的には、メッセージのすべてのハンドラーがいつ実行されたかを知りたいです。

PropagateCompletion、と一緒Complete()Completion、単一のメッセージの処理ではなく、ブロック全体の完了を処理するためのものです。その理由の1つは、より複雑なデータフローネットワークの場合、メッセージがいつ正確に処理されるかが明確でない場合があります。たとえば、メッセージがの現在のすべてのターゲットにすでに送信されているがBroadcastBlock<T>、新しく追加されたすべてのターゲットにも送信される場合、メッセージは完了したと見なす必要がありますか?

これを実行したい場合は、おそらくを使用して、何らかの方法で手動で実行する必要がありTaskCompletionSourceます。

于 2012-12-31T16:44:50.697 に答える