1

私の pyspark アプリでは、「飛行中」の Kafka メッセージを変換する方法として Spark ストリーミングを使用するつもりです。このような各メッセージは、最初に特定の Kafka トピックから受信されます。このようなメッセージは、いくつかの変換を行う必要があり (つまり、ある文字列を別の文字列に置き換えます)、変換されたバージョンを別の Kafka トピックに投稿する必要があります。最初の部分 (Kafka メッセージの受信) は正常に動作しているようです。

from pyspark import SparkConf, SparkContext

from operator import add
import sys
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
## Constants
APP_NAME = "PythonStreamingDirectKafkaWordCount"
##OTHER FUNCTIONS/CLASSES

def main():
    sc = SparkContext(appName="PythonStreamingDirectKafkaWordCount")
    ssc = StreamingContext(sc, 2)

    brokers, topic = sys.argv[1:]
    kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
    ...

    ssc.start()
    ssc.awaitTermination()
if __name__ == "__main__":

   main()

何か (たとえば、文字列) を別の Kafka トピックに配置するための適切な構文は何ですか? そのようなメソッドは KafkaUtils によって提供される必要がありますか、それとも他の方法で利用できるようになりますか?

4

2 に答える 2