9

kafka-net プラグインを備えた Red Hat VM で kafka 0.8.1.1 を使用しています。カフカからの以前のメッセージの受信を停止するようにコンシューマーを構成するにはどうすればよいですか?

私の消費者コード:

var options = new KafkaOptions(new Uri("tcp://199.53.249.150:9092"), new Uri("tcp://199.53.249.151:9092"));

Stopwatch sp = new Stopwatch();
var router = new BrokerRouter(options);
var consumer = new Consumer(new ConsumerOptions("Test", router));

ThreadStart start2 = () =>
{
    while (true)
    {
        sp.Start();
        foreach (var message in consumer.Consume())
        {
            if (MessageDecoderReceiver.MessageBase(message.Value) != null)
            {
                PrintMessage(MessageDecoderReceiver.MessageBase(message.Value).ToString());
            }
            else
            {
                Console.WriteLine(message.Value);
            }
        }
        sp.Stop();
    }
};
var thread2 = new Thread(start2);
thread2.Start();
4

1 に答える 1

12

Kafka-net の Consumer は現在、消費されているオフセットを自動追跡しません。オフセット トラッキングを手動で実装する必要があります。

オフセットをkafkaバージョン0.8.1に保存するには:

 var commit = new OffsetCommitRequest
            {
                ConsumerGroup = consumerGroup,
                OffsetCommits = new List<OffsetCommit>
                            {
                                new OffsetCommit
                                    {
                                        PartitionId = partitionId,
                                        Topic = IntegrationConfig.IntegrationTopic,
                                        Offset = offset,
                                        Metadata = metadata
                                    }
                            }
            };

var commitResponse = conn.Connection.SendAsync(commit).Result.FirstOrDefault();

特定のオフセット ポイントでインポートを開始するようにコンシューマを設定するには:

var offsets = consumer.GetTopicOffsetAsync(IntegrationConfig.IntegrationTopic).Result
                    .Select(x => new OffsetPosition(x.PartitionId, x.Offsets.Max())).ToArray();

var consumer = new Consumer(new ConsumerOptions(IntegrationConfig.IntegrationTopic, router), offsets);

上記のコードは、コンシューマーがログの最後で消費を開始するように設定し、事実上、新しいメッセージのみを受信することに注意してください。

于 2015-04-08T18:36:03.690 に答える