Worker ロールからイベント ハブを処理する方法を簡略化したバージョンを次に示します。インスタンスを mainWorker ロールに保持し、IEventProcessor を呼び出して処理を開始します。
このようにして、ワーカーがシャットダウン イベントなどに応答したときに、それを呼び出して閉じることができます。
編集:
並列処理に関しては、IEventProcessor クラスは、現在のイベントの処理が終了したときに、イベント ハブからさらに 10 個のイベントを取得します。豪華なパーティションのリースをすべて処理します。
これは同期ワークフローです。複数のワーカー ロールにスケーリングすると、パーティションがインスタンス間で分割され、高速化するなどの現象が見られます。イベント ハブを別の方法で処理する場合は、独自のソリューションを展開する必要があります。 .
public class WorkerRole : RoleEntryPoint
{
private readonly CancellationTokenSource _cancellationTokenSource = new CancellationTokenSource();
private readonly ManualResetEvent _runCompleteEvent = new ManualResetEvent(false);
private EventProcessorHost _eventProcessorHost;
public override bool OnStart()
{
ThreadPool.SetMaxThreads(4096, 2048);
ServicePointManager.DefaultConnectionLimit = 500;
ServicePointManager.UseNagleAlgorithm = false;
ServicePointManager.Expect100Continue = false;
var eventClient = EventHubClient.CreateFromConnectionString("consumersConnectionString",
"eventHubName");
_eventProcessorHost = new EventProcessorHost(Dns.GetHostName(), eventClient.Path,
eventClient.GetDefaultConsumerGroup().GroupName,
"consumersConnectionString", "blobLeaseConnectionString");
return base.OnStart();
}
public override void Run()
{
try
{
RunAsync(this._cancellationTokenSource.Token).Wait();
}
finally
{
_runCompleteEvent.Set();
}
}
private async Task RunAsync(CancellationToken cancellationToken)
{
// starts processing here
await _eventProcessorHost.RegisterEventProcessorAsync<EventProcessor>();
while (!cancellationToken.IsCancellationRequested)
{
await Task.Delay(TimeSpan.FromMinutes(1));
}
}
public override void OnStop()
{
_eventProcessorHost.UnregisterEventProcessorAsync().Wait();
_cancellationTokenSource.Cancel();
_runCompleteEvent.WaitOne();
base.OnStop();
}
}
特定のパーティション用に複数のプロセッサがあります (この方法で FIFO を保証できます) が、独自のロジックを簡単に実装できます。つまり、私の例では EventDataProcessor クラスとディクショナリ ルックアップの使用をスキップし、ProcessEventsAsync メソッド内にいくつかのロジックを実装するだけです。 .
public class EventProcessor : IEventProcessor
{
private readonly Dictionary<string, IEventDataProcessor> _eventDataProcessors;
public EventProcessor()
{
_eventDataProcessors = new Dictionary<string, IEventDataProcessor>
{
{"A", new EventDataProcessorA()},
{"B", new EventDataProcessorB()},
{"C", new EventDataProcessorC()}
}
}
public Task OpenAsync(PartitionContext context)
{
return Task.FromResult<object>(null);
}
public async Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
{
foreach(EventData eventData in messages)
{
// implement your own logic here, you could just process the data here, just remember that they will all be from the same partition in this block
try
{
IEventDataProcessor eventDataProcessor;
if(_eventDataProcessors.TryGetValue(eventData.PartitionKey, out eventDataProcessor))
{
await eventDataProcessor.ProcessMessage(eventData);
}
}
catch (Exception ex)
{
_//log exception
}
}
await context.CheckpointAsync();
}
public async Task CloseAsync(PartitionContext context, CloseReason reason)
{
if (reason == CloseReason.Shutdown)
await context.CheckpointAsync();
}
}
EventDataProcessors の 1 つの例
public interface IEventDataProcessor
{
Task ProcessMessage(EventData eventData);
}
public class EventDataProcessorA : IEventDataProcessor
{
public async Task ProcessMessage(EventData eventData)
{
// Do Something specific with data from Partition "A"
}
}
public class EventDataProcessorB : IEventDataProcessor
{
public async Task ProcessMessage(EventData eventData)
{
// Do Something specific with data from Partition "B"
}
}
これが役に立てば幸いです。これまでのところ、私たちにとっては堅実であり、複数のインスタンスに簡単にスケーリングできます