特定のアクティビティを使用して、ワークフローから Service Bus キューとトピックにアクセスしたいと考えています。
このシナリオに適合するものは見つかりませんでした (この MSDN の記事とRoman Kiss によるこの記事) が最も近いものです。
async/await パターンで実装された BeginReceive メソッドを使用して、QueueClient を使用して仲介メッセージを非同期的に受信するカスタム アクティビティを設計したいと思います (それについての私の質問を参照してください)。
まず最初に、希望する方法 (QueueClient を使用) ではなく、提案された方法 (適応された WCF) を好む理由があるかどうかを尋ねたいと思います。
次に、永続化に適した方法で設計するのを手伝っていただければ幸いです。
アップデート:
これは私がこれまでに試したことです:
public class AsyncReceiveBrokeredMessage : AsyncCodeActivity<BrokeredMessage>
{
[RequiredArgument]
public InArgument<string> ConnectionString { get; set; }
[RequiredArgument]
public InArgument<string> Path { get; set; }
protected sealed override IAsyncResult BeginExecute(AsyncCodeActivityContext context, AsyncCallback callback, object state)
{
var connectionString = this.ConnectionString.Get(context);
var path = this.Path.Get(context);
var queueClient = QueueClient.CreateFromConnectionString(connectionString, path);
var cts = new CancellationTokenSource();
context.UserState = new ReceiveState
{
CancellationTokenSource = cts,
QueueClient = queueClient
};
var task = ExecuteAsync(context, cts.Token);
var tcs = new TaskCompletionSource<BrokeredMessage>(state);
task.ContinueWith(
t =>
{
if (t.IsFaulted)
{
tcs.TrySetException(t.Exception.InnerExceptions);
}
else if (t.IsCanceled)
{
tcs.TrySetCanceled();
}
else
{
tcs.TrySetResult(t.Result);
}
if (callback != null)
{
callback(tcs.Task);
}
});
return tcs.Task;
}
protected sealed override BrokeredMessage EndExecute(AsyncCodeActivityContext context, IAsyncResult result)
{
var task = (Task<BrokeredMessage>)result;
try
{
return task.Result;
}
catch (OperationCanceledException)
{
if (context.IsCancellationRequested)
{
context.MarkCanceled();
}
else
{
throw;
}
return null; // or throw?
}
catch (AggregateException exception)
{
if (exception.InnerException is OperationCanceledException)
{
if (context.IsCancellationRequested)
{
context.MarkCanceled();
}
else
{
throw;
}
return null; // or throw?
}
ExceptionDispatchInfo.Capture(exception.InnerException).Throw();
throw;
}
}
protected override void Cancel(AsyncCodeActivityContext context)
{
var state = (ReceiveState)context.UserState;
state.CancellationTokenSource.Cancel();
}
private async Task<BrokeredMessage> ExecuteAsync(
AsyncCodeActivityContext context, CancellationToken cancellationToken)
{
var receiveState = context.UserState as ReceiveState;
var receiveTask = Task<BrokeredMessage>.Factory.FromAsync(
receiveState.QueueClient.BeginReceive, receiveState.QueueClient.EndReceive, null);
var completionTask = receiveTask.ContinueWith(
t =>
{
BrokeredMessage result;
if (t.IsCanceled)
{
context.MarkCanceled();
result = null;
}
else if (t.IsFaulted)
{
result = null;
}
else
{
t.Result.Complete();
result = t.Result;
}
receiveState.QueueClient.Close();
return result;
},
cancellationToken);
return await completionTask;
}
private class ReceiveState
{
public CancellationTokenSource CancellationTokenSource { get; set; }
public QueueClient QueueClient { get; set; }
}
}
そして、この方法でテストしました (ローカル Windows Server Service Bus を使用):
var connectionString = new Variable<string>
{
Default = connectionStringValue
};
var path = new Variable<string>
{
Default = pathValue
};
var test = new While
{
Body =
new Pick
{
Branches =
{
new PickBranch
{
Trigger =
new AsyncReceiveBrokeredMessage
{
ConnectionString = new InArgument<string>(connectionString),
Path = new InArgument<string>(path)
},
Action =
new WriteLine
{
Text =
"Received message"
}
},
new PickBranch
{
Trigger =
new Delay
{
Duration = TimeSpan.FromSeconds(10)
},
Action =
new WriteLine
{
Text =
"Timeout!"
}
}
}
},
Condition = true,
Variables = { connectionString, path }
};
WorkflowInvoker.Invoke(test);
メッセージを継続的に送信すると、期待どおりにメッセージを受信します。最初のタイムアウトの後に問題が発生します。これは、メッセージを受信していないためです。説明をいただければ幸いです。