14

私は頭を包み込むことができない繊細なスパークの問題を抱えています。

2 つの RDD があります (Cassandra から来ています)。RDD1 にはデータが含まれActions、RDD2 にはHistoricデータが含まれます。両方とも、一致/結合できる ID を持っています。しかし問題は、2 つのテーブルが N:N の関係にあることです。Actionsには同じ ID を持つ複数の行が含まれていHistoricます。両方のテーブルの日付の例を次に示します。

Actions 時間は実際にはタイムスタンプです

id  |  time  | valueX
1   |  12:05 | 500
1   |  12:30 | 500
2   |  12:30 | 125

Historic set_at は実際にはタイムスタンプです

id  |  set_at| valueY
1   |  11:00 | 400
1   |  12:15 | 450
2   |  12:20 | 50
2   |  12:25 | 75

このような結果が得られるように、これら 2 つのテーブルをどのように結合できますか?

1   |  100  # 500 - 400 for Actions#1 with time 12:05 because Historic was in that time at 400
1   |  50   # 500 - 450 for Actions#2 with time 12:30 because H. was in that time at 450
2   |  50   # 125 - 75  for Actions#3 with time 12:30 because H. was in that time at 75

巨大なデータセットに対して多くの反復を行わなければ、適切な解決策を思いつくことはできません。私は常にセットから範囲を作成することを考えなければならず、計算を行うために(11:00 - 12:15)などの範囲Historicに収まるかどうかを何らかの方法で確認する必要があります。Actionsしかし、それは私にはかなり遅いようです。それを行うより効率的な方法はありますか?この種の問題は人気があるように思えますが、これに関するヒントはまだ見つかりませんでした。スパークでこの問題をどのように解決しますか?

これまでの私の現在の試み(途中で完了したコード)

case class Historic(id: String, set_at: Long, valueY: Int)
val historicRDD = sc.cassandraTable[Historic](...)

historicRDD
.map( row => ( row.id, row ) )
.reduceByKey(...) 
// transforming to another case which results in something like this; code not finished yet
// (List((Range(0, 12:25), 400), (Range(12:25, NOW), 450)))

// From here we could join with Actions
// And then some .filter maybe to select the right Lists tuple
4

3 に答える 3

4

興味深い問題です。私はまた、アプローチを考え出すのに時間を費やしました。これは私が思いついたものです:

Action(id, time, x)およびのケース クラスが与えられた場合Historic(id, time, y)

  • アクションをヒストリーに結合 (これは重いかもしれません)
  • 特定のアクションに関係のないすべての履歴データをフィルタリングする
  • (id,time) による結果のキー - 異なる時間で同じキーを区別する
  • アクションごとの履歴を最大値まで減らし、特定のアクションに関連する履歴レコードを残します

スパークの場合:

val actionById = actions.keyBy(_.id)
val historyById = historic.keyBy(_.id)
val actionByHistory = actionById.join(historyById)
val filteredActionByidTime = actionByHistory.collect{ case (k,(action,historic)) if (action.time>historic.t) => ((action.id, action.time),(action,historic))}
val topHistoricByAction = filteredActionByidTime.reduceByKey{ case ((a1:Action,h1:Historic),(a2:Action, h2:Historic)) =>  (a1, if (h1.t>h2.t) h1 else h2)}

// we are done, let's produce a report now
val report = topHistoricByAction.map{case ((id,time),(action,historic)) => (id,time,action.X -historic.y)}

上記のデータを使用すると、レポートは次のようになります。

report.collect
Array[(Int, Long, Int)] = Array((1,43500,100), (1,45000,50), (2,45000,50))

(タイムスタンプを単純化するために時間を秒に変換しました)

于 2014-11-27T00:52:57.877 に答える
0

数時間考え、試み、失敗した後、私はこの解決策を思いつきました。それが良いかどうかはわかりませんが、他に選択肢がないため、これが私の解決策です。

まず、case class Historic

case class Historic(id: String, set_at: Long, valueY: Int) {
  val set_at_map = new java.util.TreeMap[Long, Int]() // as it seems Scala doesn't provides something like this with similar operations we'll need a few lines later
  set_at_map.put(0, valueY) // Means from the beginning of Epoch ...
  set_at_map.put(set_at, valueY) // .. to the set_at date

  // This is the fun part. With .getHistoricValue we can pass any timestamp and we will get the a value of the key back that contains the passed date. For more information look at this answer: http://stackoverflow.com/a/13400317/1209327
  def getHistoricValue(date: Long) : Option[Int] = {
    var e = set_at_map.floorEntry(date)                                   
    if (e != null && e.getValue == null) {                                  
      e = set_at_map.lowerEntry(date)                                     
    }                                                                         
    if ( e == null ) None else e.getValue()
  }
}

ケースクラスの準備ができたので、それを実行に移します

val historicRDD = sc.cassandraTable[Historic](...)
  .map( row => ( row.id, row ) )
  .reduceByKey( (row1, row2) =>  {
    row1.set_at_map.put(row2.set_at, row2.valueY) // we add the historic Events up to each id
    row1
  })

// Now we load the Actions and map it by id as we did with Historic
val actionsRDD = sc.cassandraTable[Actions](...)
  .map( row => ( row.id, row ) )

// Now both RDDs have the same key and we can join them
val fin = actionsRDD.join(historicRDD)
  .map( row => {
    ( row._1.id, 
      (
        row._2._1.id, 
        row._2._1.valueX - row._2._2.getHistoricValue(row._2._1.time).get // returns valueY for that timestamp
      )
    )
  })

私は Scala をまったく初めて使用するので、このコードを改善できる箇所があれば教えてください。

于 2014-11-26T15:45:39.540 に答える
0

この質問に回答があったことは知っていますが、私のために働いた別の解決策を追加したいと思います -

あなたのデータ -

Actions 
id  |  time  | valueX
1   |  12:05 | 500
1   |  12:30 | 500
2   |  12:30 | 125

Historic 
id  |  set_at| valueY
1   |  11:00 | 400
1   |  12:15 | 450
2   |  12:20 | 50
2   |  12:25 | 75
  1. ユニオンActionsHistoric
    組み合わせた
    ID | 時間 | 値 X | レコードタイプ
    1 | 12:05 | 500 | アクション
    1 | 12:30 | 500 | アクション
    2 | 12:30 | 125 | アクション
    1 | 11:00 | 400 | ヒストリック
    1 | 12:15 | 450 | ヒストリック
    2 | 12:20 | 50 | ヒストリック
    2 | 12:25 | 75 | ヒストリック
  1. カスタム パーティショナーを作成し、 repartitionAndSortWithinPartitionsを使用して でパーティション分割しidますが、 で並べ替えますtime

    パーティション-1
    1 | 11:00 | 400 | ヒストリック
    1 | 12:05 | 500 | アクション
    1 | 12:15 | 450 | ヒストリック
    1 | 12:30 | 500 | アクション
    パーティション-2
    2 | 12:20 | 50 | ヒストリック
    2 | 12:25 | 75 | ヒストリック
    2 | 12:30 | 125 | アクション
    

  2. パーティションごとにレコードをトラバースします。

レコードの場合は、マップに追加するか、既にその ID がある場合はマップを更新します。パーティションごとにマップを使用して、Historical最新のものを追跡します。valueYid

Actionレコードの場合はvalueY、マップから値を取得してから減算しますvalueX

地図M

Partition-1 traversal in order M={ 1 -> 400} // A new entry in map M 1 | 100 // M(1) = 400; 500-400 M={1 -> 450} // update M, because key already exists 1 | 50 // M(1) Partition-2 traversal in order M={ 2 -> 50} // A new entry in M M={ 2 -> 75} // update M, because key already exists 2 | 50 // M(2) = 75; 125-75

でパーティション分割して並べ替えようとすることもできtimeますが、後でパーティションをマージする必要があります。そして、それはいくつかの複雑さを増す可能性があります.

これは、時間範囲を使用して参加するときに通常得られる多対多の結合よりも好ましいことがわかりました。

于 2017-03-25T08:26:40.383 に答える