spark-jobserver
低レイテンシーのコンテキストを使用して集約 Spark ジョブを実行する必要があります。Java クラスから Java メソッドを使用してジョブを実行するためのこの Scala ランナーがあります。
object AggregationRunner extends SparkJob {
def main(args: Array[String]) {
val ctx = new SparkContext("local[4]", "spark-jobs")
val config = ConfigFactory.parseString("")
val results = runJob(ctx, config)
}
override def validate(sc: SparkContext, config: Config): SparkJobValidation = {
SparkJobValid;
}
override def runJob(sc: SparkContext, config: Config): Any = {
val context = new JavaSparkContext(sc)
val aggJob = new ServerAggregationJob()
val id = config.getString("input.string").split(" ")(0)
val field = config.getString("input.string").split(" ")(1)
return aggJob.aggregate(context, id, field)
}
}
ただし、次のエラーが発生します。Javaメソッドで返されたコンテンツを取り出してみましたが、テスト文字列を返すだけですが、まだ機能しません:
{
"status": "ERROR",
"result": {
"message": "Ask timed out on [Actor[akka://JobServer/user/context-supervisor/single-context#1243999360]] after [10000 ms]",
"errorClass": "akka.pattern.AskTimeoutException",
"stack": ["akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:333)", "akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)", "scala.concurrent.Future$InternalCallbackExecutor$.scala$concurrent$Future$InternalCallbackExecutor$$unbatchedExecute(Future.scala:694)", "scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:691)", "akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:467)", "akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:419)", "akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:423)", "akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:375)", "java.lang.Thread.run(Thread.java:745)"]
}
}
文字列を返すだけなので、タイムアウトが発生する理由がよくわかりません。
編集
そのため、JAR を更新する前に作成された Spark コンテキストを使用していたため、問題が発生していることがわかりました。ただし、Spark ジョブ内で JavaSparkContext を使用しようとすると、上記のエラーに戻ります。
エラーを取り除くための恒久的な方法は何でしょうか。
また、ローカルの docker コンテナーで重い Spark ジョブを実行しているという事実が、タイムアウトのもっともらしい理由になるでしょうか。