Kafka からのログを処理するために Spark をバッチとして使用しています。各サイクルで、コードは kafka コンシューマーに到達するものをすべて取得する必要があります。ただし、各サイクルで kafka から取得するデータの量に制限を加えたいと考えています。5 GB または 500000 ログ行としましょう..
offsetRanges = []
def storeOffsetRanges(rdd):
global offsetRanges
offsetRanges = rdd.offsetRanges()
WRITE OFFSETS TO DISK
return rdd
while True:
host = "localhost:9092"
offset = OffsetRange(topic, 0, fromOffset, untilOffset)
kafka_content = KafkaUtils.createRDD(sc, {"metadata.broker.list": host}, [offset])
kafka_content.transform(storeOffsetRanges)
RDD TRANSFORMATIONS..
ドライバーに障害が発生した場合に備えて、メモリとディスクにオフセットを保存します。しかし、これらのカフカオフセットを課して、サイクルごとの最大データを制限するにはどうすればよいですか? カフカのオフセット範囲の単位は??
前もって感謝します!