次のコードで問題に直面しました。
public IActionResult Consume(string topic)
{
try
{
var config = new ConsumerConfig
{
BootstrapServers = "kafka-01:9092,kafka-02:9092,kafka-03:9092",
SecurityProtocol = SecurityProtocol.SaslPlaintext,
SaslMechanism = SaslMechanism.ScramSha256,
SaslUsername = "my-consumer",
SaslPassword = "pass",
GroupId = "my-test-group"
};
using IConsumer<Null, string> consumer = new ConsumerBuilder<Null, string>(config).Build();
consumer.Subscribe(topic);
var cts = new CancellationTokenSource();
cts.CancelAfter(TimeSpan.FromSeconds(60));
ConsumeResult<Null, string> result = consumer.Consume(cts.Token);
consumer.Close();
return Ok(result.Message.Value);
}
catch (OperationCanceledException cancelEx)
{
return NoContent();
}
catch (ConsumeException consumeEx)
{
return BadRequest(consumeEx.ToString());
}
}
しかし、消費者は機能しません!デバッグでわかるように、cancelationToken の有効期限が切れるまでフリーズし、OperationCancelledException をスローし、結果を受け取りません。
プロデューサーは動作しているようで、フリーズしたりエラーを返したりしません (同様の構成を使用)。
なぜ機能しないのですか?