Kafka と Spark-Streaming の間に問題があります。本番環境で低レベルのトラフィック (1 秒あたり約 12000 ~ 15000 レコード) のサービスを使用しています。最初は、トラフィックの消費は正常に見えますが、10 ~ 15 分後、突然ほぼ1/10残りを消費する速度。ネットワークのトラフィックの問題でしょうか?
Kafka の構成:
num.network.threads=2
num.io.threads=8
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600
log.flush.interval .messages=10000
log.flush.interval.ms=1000
log.retention.hours=12
log.segment.bytes=536870912
log.retention.check.interval.ms=60000
log.cleaner.enable=false
log.cleanup.interval .分=1
スパーク ストリーミングの構成 (コンシューマー):
....
val kafkaParams = Map(
"zookeeper.connect" -> zkQuorum,
"group.id" -> group,
"zookeeper.connection.timeout.ms" -> "1000000",
"zookeeper.sync.time.ms" -> "200",
"fetch.message.max.bytes" -> "2097152000",
"queued.max.message.chunks" -> "1000",
"auto.commit.enable" -> "true",
"auto.commit.interval.ms" -> "1000")
try {
KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topics.map((_, partition)).toMap,
StorageLevel.MEMORY_ONLY).map {
case (key, value) => convertTo(key, value)
}.filter {
_ != null
}.foreachRDD(line => saveToHBase(line, INPUT_TABLE))
//}.foreachRDD(line => logger.info("handling testing....."+ line))
} catch {
case e: Exception => logger.error("consumerEx: " + e.printStackTrace)
}