いくつかの入力トピックを含むカフカ ストリームがあります。これは、kafka ストリームを受け入れるために私が書いたコードです。
conf = SparkConf().setAppName(appname)
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc)
kvs = KafkaUtils.createDirectStream(ssc, topics,\
{"metadata.broker.list": brokers})
次に、元のストリームのキーと値の 2 つの DStream を作成します。
keys = kvs.map(lambda x: x[0].split(" "))
values = kvs.map(lambda x: x[1].split(" "))
次に、値 DStream でいくつかの計算を実行します。例えば、
val = values.flatMap(lambda x: x*2)
ここで、キーと val DStream を組み合わせて、結果を Kafka ストリームの形式で返す必要があります。
対応するキーに val を結合する方法は?