私は頭を包み込むことができない繊細なスパークの問題を抱えています。
2 つの RDD があります (Cassandra から来ています)。RDD1 にはデータが含まれActions
、RDD2 にはHistoric
データが含まれます。両方とも、一致/結合できる ID を持っています。しかし問題は、2 つのテーブルが N:N の関係にあることです。Actions
には同じ ID を持つ複数の行が含まれていHistoric
ます。両方のテーブルの日付の例を次に示します。
Actions
時間は実際にはタイムスタンプです
id | time | valueX
1 | 12:05 | 500
1 | 12:30 | 500
2 | 12:30 | 125
Historic
set_at は実際にはタイムスタンプです
id | set_at| valueY
1 | 11:00 | 400
1 | 12:15 | 450
2 | 12:20 | 50
2 | 12:25 | 75
このような結果が得られるように、これら 2 つのテーブルをどのように結合できますか?
1 | 100 # 500 - 400 for Actions#1 with time 12:05 because Historic was in that time at 400
1 | 50 # 500 - 450 for Actions#2 with time 12:30 because H. was in that time at 450
2 | 50 # 125 - 75 for Actions#3 with time 12:30 because H. was in that time at 75
巨大なデータセットに対して多くの反復を行わなければ、適切な解決策を思いつくことはできません。私は常にセットから範囲を作成することを考えなければならず、計算を行うために(11:00 - 12:15)などの範囲Historic
に収まるかどうかを何らかの方法で確認する必要があります。Actions
しかし、それは私にはかなり遅いようです。それを行うより効率的な方法はありますか?この種の問題は人気があるように思えますが、これに関するヒントはまだ見つかりませんでした。スパークでこの問題をどのように解決しますか?
これまでの私の現在の試み(途中で完了したコード)
case class Historic(id: String, set_at: Long, valueY: Int)
val historicRDD = sc.cassandraTable[Historic](...)
historicRDD
.map( row => ( row.id, row ) )
.reduceByKey(...)
// transforming to another case which results in something like this; code not finished yet
// (List((Range(0, 12:25), 400), (Range(12:25, NOW), 450)))
// From here we could join with Actions
// And then some .filter maybe to select the right Lists tuple