2

複数のジェネリック型を受け入れ、実行する作業単位を引数として取るメソッドを作成しようとしています。

作業単位は、それ自体が一般的な共通関数であるという考え方です。例として、次のようなものだとしましょう。

def loadModelRdd[T: TypeTag](sc: SparkContext): RDD[T] = {
  ...
}

loadModelRdd() は、モデル情報のロードなどの内部処理の後に、指定されたタイプの RDD を構築します。

私がハッキングしてきたプロトタイプ メソッドは、次のようなものです (動作していません)。

def forkAll[A : Manifest, B : Manifest](work: => RDD[_]): (RDD[A], RDD[B]) = {
  def aFuture = Future { work } // How can I notify that this work call returns type A?
  def bFuture = Future { work } // How can I notify that this work call returns type B?

  val res = for {
    a <- aFuture
    b <- bFuture
  } yield (a.asInstanceOf[A], b.asInstanceOf[B])

  Await.result(res, 10.seconds)
}

これは私が取り組んでいるコードの短縮版です。実際には 10 もの異なる型を受け入れることを検討しています。

ご覧のとおり、forkAll メソッドの全体的な目標は、Future で作業単位をラップし、タイプごとに作業単位の実行を fork-join し、結果を Tuple された結果として返すことです。消費者ステートメントの例は次のとおりです。

val (a, b) = forkAll[ClassA, ClassB](loadModelRdd)

つまり、この時点で fork-join して結果を待ちたいのですが、実行を並行して実行してからドライバー (具体的には Spark ドライバー) に戻してほしいと考えています。

問題は、Future {} ブロックを構築するときに forkAll 内の作業単位によって返される型を強制する方法がわからないことです。forkAll がない場合、実装は次のようになります。

val resA = loadModelRdd[ClassA](sc)
val resB = loadModelRdd[ClassB](sc)
...

私は2つの理由でこれを行うことを検討しています:

  1. このモデルに一致する作業単位の fork-join の詳細を抽象化します。
  2. 作業単位が何であるかを明示的に示すこのコードのバージョンは、本番環境で機能しており、長時間実行されるブロックの実行を半分近く削減する役割を果たしていました。このパターンを適用できる実行ステップがいくつかあります

これは Scala の型システムで可能なことですか? それとも、この問題を別の視点から見る必要がありますか? 私はいくつかの実装を試しました (ここで説明されている実装を含む) が、問題に対する現在の見解に適合するものを見つけることができませんでした

追加情報が必要な場合はお知らせください。

ありがとう!

4

1 に答える 1