2

Confluent dotnet clientを使ってアプリ起動時にトピックを処理したい。次の例を想定します。

    while (true)
    {
        try
        {
            var cr = c.Consume();
            Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
        }
        catch (ConsumeException e)
        {
            Console.WriteLine($"Error occured: {e.Error.Reason}");
        }
    }

Kafka に新しいメッセージがない場合、c.Consume はブロックされます。アプリケーションの起動 (キャッシュのウォームアップなど) に使用したいので、新しいメッセージがないことがわかったときにコードを続行したいと考えています。

タイムアウトを設定するための過負荷があることは知っていc.Consume(timeout)ますが、このアプローチの問題は、トピックにメッセージがあり、メッセージを読む時間がタイムアウトを超えた場合、望ましくない null 出力を受け取ることです。

4

3 に答える 3

2

OnPartitionEOFパーティションの終わりに達したことを示すイベントを使用できます。

CancellationTokenSource source = new CancellationTokenSource();
bool isContinue = true;

c.OnPartitionEOF += (o, e) =>
    {
        Console.WriteLine($"You have reached end of partition");
        isContinue = false;
        source.Cancel();
    };    
while (isContinue)
{
    try
    {
        var cr = c.Consume(source.Token);
        Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
    }
    catch (ConsumeException e)
    {
        Console.WriteLine($"Error occured: {e.Error.Reason}");
    }
}
于 2019-03-24T09:01:07.383 に答える