19

特定のアクティビティを使用して、ワークフローから Service Bus キューとトピックにアクセスしたいと考えています。

このシナリオに適合するものは見つかりませんでした (この MSDN の記事Roman Kiss によるこの記事) が最も近いものです。

async/await パターンで実装された BeginReceive メソッドを使用して、QueueClient を使用して仲介メッセージを非同期的に受信するカスタム アクティビティを設計したいと思います (それについての私の質問を参照してください)。

まず最初に、希望する方法 (QueueClient を使用) ではなく、提案された方法 (適応された WCF) を好む理由があるかどうかを尋ねたいと思います。

次に、永続化に適した方法で設計するのを手伝っていただければ幸いです。

アップデート:

これは私がこれまでに試したことです:

public class AsyncReceiveBrokeredMessage : AsyncCodeActivity<BrokeredMessage>
{
    [RequiredArgument]
    public InArgument<string> ConnectionString { get; set; }

    [RequiredArgument]
    public InArgument<string> Path { get; set; }

    protected sealed override IAsyncResult BeginExecute(AsyncCodeActivityContext context, AsyncCallback callback, object state)
    {
        var connectionString = this.ConnectionString.Get(context);
        var path = this.Path.Get(context);
        var queueClient = QueueClient.CreateFromConnectionString(connectionString, path);
        var cts = new CancellationTokenSource();
        context.UserState = new ReceiveState
                                {
                                    CancellationTokenSource = cts,
                                    QueueClient = queueClient
                                };
        var task = ExecuteAsync(context, cts.Token);
        var tcs = new TaskCompletionSource<BrokeredMessage>(state);
        task.ContinueWith(
            t =>
                {
                    if (t.IsFaulted)
                    {
                        tcs.TrySetException(t.Exception.InnerExceptions);
                    }
                    else if (t.IsCanceled)
                    {
                        tcs.TrySetCanceled();
                    }
                    else
                    {
                        tcs.TrySetResult(t.Result);
                    }

                    if (callback != null)
                    {
                        callback(tcs.Task);
                    }
                });

        return tcs.Task;
    }

    protected sealed override BrokeredMessage EndExecute(AsyncCodeActivityContext context, IAsyncResult result)
    {
        var task = (Task<BrokeredMessage>)result;
        try
        {
            return task.Result;
        }
        catch (OperationCanceledException)
        {
            if (context.IsCancellationRequested)
            {
                context.MarkCanceled();
            }
            else
            {
                throw;
            }

            return null; // or throw?
        }
        catch (AggregateException exception)
        {
            if (exception.InnerException is OperationCanceledException)
            {
                if (context.IsCancellationRequested)
                {
                    context.MarkCanceled();
                }
                else
                {
                    throw;
                }

                return null; // or throw?
            }

            ExceptionDispatchInfo.Capture(exception.InnerException).Throw();
            throw;
        }
    }

    protected override void Cancel(AsyncCodeActivityContext context)
    {
        var state = (ReceiveState)context.UserState;
        state.CancellationTokenSource.Cancel();
    }

    private async Task<BrokeredMessage> ExecuteAsync(
        AsyncCodeActivityContext context, CancellationToken cancellationToken)
    {
        var receiveState = context.UserState as ReceiveState;
        var receiveTask = Task<BrokeredMessage>.Factory.FromAsync(
            receiveState.QueueClient.BeginReceive, receiveState.QueueClient.EndReceive, null);
        var completionTask = receiveTask.ContinueWith(
             t =>
                 {
                     BrokeredMessage result;
                     if (t.IsCanceled)
                     {
                         context.MarkCanceled();
                         result = null;
                     }
                     else if (t.IsFaulted)
                     {
                         result = null;
                     }
                     else
                     {

                         t.Result.Complete();
                         result = t.Result;
                     }

                     receiveState.QueueClient.Close();
                     return result;
                 },
             cancellationToken);
        return await completionTask;
    }

    private class ReceiveState
    {
        public CancellationTokenSource CancellationTokenSource { get; set; }

        public QueueClient QueueClient { get; set; }
    }
}

そして、この方法でテストしました (ローカル Windows Server Service Bus を使用):

var connectionString = new Variable<string>
                                   {
                                       Default = connectionStringValue
                                   };
        var path = new Variable<string>
                       {
                           Default = pathValue
                       };
        var test = new While
                       {
                           Body =
                               new Pick
                                   {
                                       Branches =
                                           {
                                               new PickBranch
                                                   {
                                                       Trigger =
                                                           new AsyncReceiveBrokeredMessage
                                                               {
                                                                   ConnectionString = new InArgument<string>(connectionString),
                                                                   Path = new InArgument<string>(path)
                                                               },
                                                       Action =
                                                           new WriteLine
                                                               {
                                                                   Text =
                                                                       "Received message"
                                                               }
                                                   },
                                               new PickBranch
                                                   {
                                                       Trigger =
                                                           new Delay
                                                               {
                                                                   Duration = TimeSpan.FromSeconds(10)
                                                               },
                                                       Action =
                                                           new WriteLine
                                                               {
                                                                   Text =
                                                                       "Timeout!"
                                                               }
                                                   }
                                           }
                                   },
                           Condition = true,
                           Variables = { connectionString, path }
                       };
        WorkflowInvoker.Invoke(test);

メッセージを継続的に送信すると、期待どおりにメッセージを受信します。最初のタイムアウトの後に問題が発生します。これは、メッセージを受信して​​いないためです。説明をいただければ幸いです。

4

3 に答える 3

1

まず、いくつかの重要なことを知っておく必要があります: 1) ワークフローは、後で一時停止して復元できるようにするための長時間実行されるプロセスです。2) ワークフローを起動して復元する方法はブックマークです。3) 通常、人々は一時停止中もワークフローが持続可能であることを好みます。(永続性を気にしないのなら、なぜとにかく WF を使用しているのですか?ビジュアル デザイン ツールのためだけですか?)

論理的な問題:

すべてのワークフローとそのアクティビティが永続化され、一時停止されている場合、アクティビティ コードはまったく読み込まれません。では、リッスンしているのは誰でしょうか? 回答: ServiceBus キューをリッスンし、ブックマークを再開してワークフローを起動する責任を負うのは、Activity ではなく、別のものでなければなりません。

その何かは、ワークフロー「ホスト」、またはその拡張です。[GUI ボタン​​から] メッセージをリッスンし、ワークフロー アクティビティを起動するようにホストをカスタマイズする方法についてのブログ投稿をいくつか紹介します。

http://blogs.msdn.com/b/tilovell/archive/2011/02/26/wf4-workflow-4-0-hosting-extensions-redux.aspx

http://blogs.msdn.com/b/tilovell/archive/2010/06/08/wf4-workflow-4-0-hosting-extensions.aspx

あなたができることは、このコードを採用し、GUI ボタン​​の代わりに ServiceBus キューをリッスンするように適応させ、PageActivity に類似した独自の ReceiveFromServiceBus アクティビティを起動することです。ちゃんとブックマーク。

すべてかなり面倒です...しかし、私はWFでそれを行う「正しい」方法を信じています。

于 2014-12-06T07:30:12.020 に答える
0

DefaultMessageTimeToLive または TimeToLive プロパティに問題がある可能性があります。

NamespaceManager.CreateSubscription(
        new SubscriptionDescription(TopicName, SubscriptionName)
            {
                LockDuration = TimeSpan.FromMinutes(5),
                DefaultMessageTimeToLive = TimeSpan.FromDays(7),
                EnableDeadLetteringOnMessageExpiration = true
            });
于 2014-01-14T14:28:20.807 に答える
0

キュー エンティティは次の機能を提供します。「メッセージがキューに追加される時間を指定する機能」。

このルールにより、タイムアウト後に受信できない場合がありますか?

5 月の決議は次のとおりです。

インバウンド メッセージの重複の検出。クライアントが同じメッセージを複数回送信しても、悪影響はありません。

于 2014-01-14T14:25:17.463 に答える