0

いくつかの入力トピックを含むカフカ ストリームがあります。これは、kafka ストリームを受け入れるために私が書いたコードです。

conf = SparkConf().setAppName(appname) 
sc = SparkContext(conf=conf) 
ssc = StreamingContext(sc) 
kvs = KafkaUtils.createDirectStream(ssc, topics,\ 
            {"metadata.broker.list": brokers})

次に、元のストリームのキーと値の 2 つの DStream を作成します。

keys = kvs.map(lambda x: x[0].split(" ")) 
values = kvs.map(lambda x: x[1].split(" "))

次に、値 DStream でいくつかの計算を実行します。例えば、

val = values.flatMap(lambda x: x*2)

ここで、キーと val DStream を組み合わせて、結果を Kafka ストリームの形式で返す必要があります。

対応するキーに val を結合する方法は?

4

1 に答える 1

0

join2 つの DStreams で演算子を使用して、それらをマージすることができます。マップを作成すると、本質的に別のストリームが作成されます。したがって、 join はそれらをマージするのに役立ちます。

例えば:

Joined_Stream = keys.join(values).(any operation like map, flatmap...)
于 2016-08-18T20:16:35.990 に答える