場所の座標をストリーミングしているデバイスを複製し、データを処理してテキスト ファイルに保存しようとしています。私は Kafka と Spark ストリーミング (pyspark 上) を使用しています。これが私のアーキテクチャです。
1-Kafka プロデューサーは、次の文字列形式で test という名前のトピックにデータを発行します。
"LG float LT float" example : LG 8100.25191107 LT 8406.43141483
プロデューサーコード:
from kafka import KafkaProducer
import random
producer = KafkaProducer(bootstrap_servers='localhost:9092')
for i in range(0,10000):
lg_value = str(random.uniform(5000, 10000))
lt_value = str(random.uniform(5000, 10000))
producer.send('test', 'LG '+lg_value+' LT '+lt_value)
producer.flush()
プロデューサーは正常に動作し、ストリーミングされたデータをコンシューマー (さらにはスパーク) で取得します
2- Spark ストリーミングはこのストリームを受信してpprint()
います。
Spark ストリーミング処理コード
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
ssc = StreamingContext(sc, 1)
kvs = KafkaUtils.createDirectStream(ssc, ["test"], {"bootstrap.servers": "localhost:9092"})
lines = kvs.map(lambda x: x[1])
words = lines.flatMap(lambda line: line.split(" "))
words.pprint()
word_pairs = words.map(lambda word: (word, 1))
counts = word_pairs.reduceByKey(lambda a, b: a+b)
results = counts.foreachRDD(lambda word: word.saveAsTextFile("C:\path\spark_test.txt"))
//I tried this kvs.saveAsTextFiles('C:\path\spark_test.txt')
// to copy all stream and it works fine
ssc.start()
ssc.awaitTermination()
エラーとして私は得る:
16/12/26 00:51:53 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
org.apache.spark.SparkException: Python worker did not connect back in time
そして他の例外。
私が実際に望んでいるのは、各エントリ"LG float LT float"
を JSON 形式でファイルに保存することですが、最初に単に座標をファイルに保存したいのですが、それを実現できないようです。何かアイデアはありますか?
必要に応じて完全なスタック トレースを提供できます