DStream と RDD の間で結合を実行するために戦ってきました。シーンを設定するには:
- スパーク - 2.3.1
- パイソン-3.6.3
RDD
CSV ファイルから RDD を読み込んで、レコードを分割し、ペアの RDD を作成しています。
sku_prices = sc.textFile("sku-catalog.csv")\
.map(lambda line: line.split(","))\
.map(lambda fields: (fields[0], float(fields[1])))
これは からの出力ですsku_prices.collect()
:
[('0003003001', 19.25),
('0001017002', 2.25),
('0001017003', 3.5),
('0003013001', 18.75),
('0004017002', 16.5),
('0002008001', 2.25),
('0004002001', 10.75),
('0005020002', 10.5),
('0001004002', 3.5),
('0002016003', 14.25)]
Dストリーム
Kafka から DStream を読んでいます。
orders = kstream.map(lambda n: n[1]).map(lambda n: json.loads(n))
items = orders.map(lambda order: order['items'])\
.flatMap(lambda items: [(i['sku'], i['count']) for i in items])\
.reduceByKey(lambda x, y: x + y)
実行pprint()
するorders
と、次のような出力が得られます。
-------------------------------------------
Time: 2018-09-03 06:57:20
-------------------------------------------
('0004002001', 3)
('0002016003', 1)
('0003013001', 1)
加入
今、私はitems
DStream をsku_prices
RDD に参加させたいと思っています。その結合を直接行うことはできないことはわかっていますが、私の読書ではtransform()
、DStream のメソッドを使用してジョブを実行できることが示唆されています。これは私が持っているものです:
items.transform(lambda rdd: rdd.join(sku_prices)).pprint()
次のような DStream を取得することを期待しています。
-------------------------------------------
Time: 2018-09-03 06:57:20
-------------------------------------------
('0004002001', (3, 10.75))
('0002016003', (1, 14.25))
('0003013001', (1, 18.75))
Sparkのドキュメントでは、これが機能するはずであり、実際に機能することが示唆されています。その結果はまさに私が得たものです! :)
チェックポイント
ただし、ステートフルな操作も行いたいので、チェックポイントを導入する必要があります。
ssc.checkpoint("checkpoint")
チェックポイントを追加するだけで、次のエラーが発生しますtransform()
。
RDD をブロードキャストしようとしているか、アクションまたは変換から RDD を参照しようとしているようです。RDD 変換とアクションは、他の変換内ではなく、ドライバーによってのみ呼び出すことができます。たとえば、rdd1.map(lambda x: rdd2.values.count() * x) は無効です。これは、値の変換とカウント アクションを rdd1.map 変換内で実行できないためです。
このスレッドの回答は、チェックポイントと外部 RDD が混在しないことを示唆しています。これを回避する方法はありますか?StreamingContext でチェックポイントが有効になっている場合、DStream と RDD を結合することは可能ですか?
ありがとう、アンドリュー。