2

Worker ロールで EventProcessorHost を使用する方法についてのガイダンスを期待していました。基本的に、EventProcessorHost がパーティションを並行して処理することを望んでいます。ワーカー ロール内でこのタイプのコードをどこに配置すればよいか、何かキーが不足していないかどうか疑問に思っています。

    var manager = NamespaceManager.CreateFromConnectionString(connectionString);
    var desc = manager.CreateEventHubIfNotExistsAsync(path).Result;
    var client = Microsoft.ServiceBus.Messaging.EventHubClient.CreateFromConnectionString(connectionString, path);
    var host = new EventProcessorHost(hostname, path, consumerGroup, connectionString, blobStorageConnectionString);
    EventHubProcessorFactory<EventData> factory = new EventHubProcessorFactory<EventData>();
    host.RegisterEventProcessorFactoryAsync(factory);

私が読んだことはすべて、EventProcessorHost が独自にパーティションを分割すると言っていますが、上記のコードはすべてのパーティションを非同期的に処理するのに十分ですか?

4

1 に答える 1

3

Worker ロールからイベント ハブを処理する方法を簡略化したバージョンを次に示します。インスタンスを mainWorker ロールに保持し、IEventProcessor を呼び出して処理を開始します。

このようにして、ワーカーがシャットダウン イベントなどに応答したときに、それを呼び出して閉じることができます。

編集:

並列処理に関しては、IEventProcessor クラスは、現在のイベントの処理が終了したときに、イベント ハブからさらに 10 個のイベントを取得します。豪華なパーティションのリースをすべて処理します。

これは同期ワークフローです。複数のワーカー ロールにスケーリングすると、パーティションがインスタンス間で分割され、高速化するなどの現象が見られます。イベント ハブを別の方法で処理する場合は、独自のソリューションを展開する必要があります。 .

public class WorkerRole : RoleEntryPoint
{
    private readonly CancellationTokenSource _cancellationTokenSource = new CancellationTokenSource();
    private readonly ManualResetEvent _runCompleteEvent = new ManualResetEvent(false);
    private EventProcessorHost _eventProcessorHost;

     public override bool OnStart()
    {
        ThreadPool.SetMaxThreads(4096, 2048);
        ServicePointManager.DefaultConnectionLimit = 500;
        ServicePointManager.UseNagleAlgorithm = false;
        ServicePointManager.Expect100Continue = false;

        var eventClient = EventHubClient.CreateFromConnectionString("consumersConnectionString",
                            "eventHubName");
        _eventProcessorHost = new EventProcessorHost(Dns.GetHostName(), eventClient.Path, 
               eventClient.GetDefaultConsumerGroup().GroupName,
               "consumersConnectionString", "blobLeaseConnectionString");
        return base.OnStart();
    }

    public override void Run()
    {
        try
        {
            RunAsync(this._cancellationTokenSource.Token).Wait();
        }
        finally
        {
            _runCompleteEvent.Set();
        }
    }

    private async Task RunAsync(CancellationToken cancellationToken)
    {
        // starts processing here
        await _eventProcessorHost.RegisterEventProcessorAsync<EventProcessor>();
        while (!cancellationToken.IsCancellationRequested)
        {
            await Task.Delay(TimeSpan.FromMinutes(1));
        }
    }

    public override void OnStop()
    {
        _eventProcessorHost.UnregisterEventProcessorAsync().Wait();
        _cancellationTokenSource.Cancel();
        _runCompleteEvent.WaitOne();
        base.OnStop();
    }
}

特定のパーティション用に複数のプロセッサがあります (この方法で FIFO を保証できます) が、独自のロジックを簡単に実装できます。つまり、私の例では EventDataProcessor クラスとディクショナリ ルックアップの使用をスキップし、ProcessEventsAsync メソッド内にいくつかのロジックを実装するだけです。 .

public class EventProcessor : IEventProcessor
{
    private readonly Dictionary<string, IEventDataProcessor> _eventDataProcessors; 

    public EventProcessor()
    {
        _eventDataProcessors = new Dictionary<string, IEventDataProcessor>
        {
            {"A", new EventDataProcessorA()},
            {"B", new EventDataProcessorB()},
            {"C", new EventDataProcessorC()}
        }
    }

    public Task OpenAsync(PartitionContext context)
    {
        return Task.FromResult<object>(null);
    }

    public async Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
    {
        foreach(EventData eventData in messages)
        {
            // implement your own logic here, you could just process the data here, just remember that they will all be from the same partition in this block
            try
            {
                IEventDataProcessor eventDataProcessor;
                if(_eventDataProcessors.TryGetValue(eventData.PartitionKey, out eventDataProcessor))
                {
                    await eventDataProcessor.ProcessMessage(eventData);
                }
            }
            catch (Exception ex)
            {
                _//log exception
            }
        }
        await context.CheckpointAsync();
    }

    public async Task CloseAsync(PartitionContext context, CloseReason reason)
    {
        if (reason == CloseReason.Shutdown)
            await context.CheckpointAsync();
    }
}

EventDataProcessors の 1 つの例

public interface IEventDataProcessor
{
    Task ProcessMessage(EventData eventData);
}

public class EventDataProcessorA : IEventDataProcessor
{
    public async Task ProcessMessage(EventData eventData)
    {
        // Do Something specific with data from Partition "A"
    }
}

public class EventDataProcessorB : IEventDataProcessor
{
    public async Task ProcessMessage(EventData eventData)
    {
        // Do Something specific with data from Partition "B"
    }
}

これが役に立てば幸いです。これまでのところ、私たちにとっては堅実であり、複数のインスタンスに簡単にスケーリングできます

于 2015-06-01T19:05:50.160 に答える