1

有限数のメッセージを Kafka に送信してから終了するアプリケーションがあります。何らかの理由で、プロデューサーを閉じても Kafka 接続が維持されます。私の実装(Scalaで)は多かれ少なかれ

object Kafka {
  private val props = new Properties()

  props.put("compression.codec", DefaultCompressionCodec.codec.toString)
  props.put("producer.type", "sync")
  props.put("metadata.broker.list", "localhost:9092")
  props.put("batch.num.messages", "200")
  props.put("message.send.max.retries", "3")
  props.put("request.required.acks", "-1")
  props.put("client.id", "myclient")

  private val producer = new Producer[Array[Byte], Array[Byte]](new ProducerConfig(props))
  private def encode(msg: Message) = new KeyedMessage("topic", msg.id.getBytes, write(msg).getBytes)

  def send(msg: Message) = Try(producer.send(encode(msg)))
  def close() = producer.close()
}

これMessageは単純なケース クラスであり、それをバイト配列に変換する方法は実際には関係ありません。

メッセージは届きますが、最終的に を呼び出すKafka.close()と、アプリケーションは終了せず、接続が解放されていないようです。

Kafka に明示的に接続を終了するように依頼する方法はありますか?

4

1 に答える 1

1

def close() = producer.close()

これにより、 Producer.close() を呼び出す「close」という関数が作成されます。コードが実際にプロデューサーを閉じるという証拠はありません。

あなたはただ呼び出す必要があります: producer.close

于 2014-11-06T14:49:47.593 に答える