0

Kafka からのログを処理するために Spark をバッチとして使用しています。各サイクルで、コードは kafka コンシューマーに到達するものをすべて取得する必要があります。ただし、各サイクルで kafka から取得するデータの量に制限を加えたいと考えています。5 GB または 500000 ログ行としましょう..

offsetRanges = []
def storeOffsetRanges(rdd):
    global offsetRanges
    offsetRanges = rdd.offsetRanges()
    WRITE OFFSETS TO DISK
    return rdd

while True:
    host = "localhost:9092"
    offset = OffsetRange(topic, 0, fromOffset, untilOffset)
    kafka_content = KafkaUtils.createRDD(sc, {"metadata.broker.list": host}, [offset])
    kafka_content.transform(storeOffsetRanges)
    RDD TRANSFORMATIONS..

ドライバーに障害が発生した場合に備えて、メモリとディスクにオフセットを保存します。しかし、これらのカフカオフセットを課して、サイクルごとの最大データを制限するにはどうすればよいですか? カフカのオフセット範囲の単位は??

前もって感謝します!

4

1 に答える 1