1

Scala と Flink 1.0-SNAPSHOT を使用して DataSet で leftOuterJoin を実行すると、次の例外が発生します。

    11:54:15,921 INFO  org.apache.flink.runtime.taskmanager.Task - CHAIN DataSource (at com.mycompany.FlinkTest$.main(FlinkTest.scala:99) (org.apache.flink.api.java.io.CollectionInputFormat)) -> Map (Map at select('date as 'date,'dsCode as 'dsCode,'datatype as 'datatype,'quote as 'quote,'name as 'name)) (1/1) switched to FAILED with exception.
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply$mcV$sp(JobManager.scala:563)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply(JobManager.scala:509)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$5.apply(JobManager.scala:509)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.Exception: Call to registerInputOutput() of invokable failed
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:529)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalStateException: unread block data
    at java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2431)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1383)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:294)
    at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:255)
    at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:282)
    at org.apache.flink.runtime.operators.DataSourceTask.initInputFormat(DataSourceTask.java:241)
    at org.apache.flink.runtime.operators.DataSourceTask.registerInputOutput(DataSourceTask.java:81)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:526)

DataSet の型として単純な Scala ケース クラスを使用します。

case class RawValue(date: String, dsCode: String, datatype: String, quote: Double, name: String)

ケース クラスのインスタンスを生成するには、次のメソッドを使用します。

def getRawValuesFromZipFile(fileName: String) : Array[RawValue]

環境を初期化し、次の方法で DataSet[RawValue] を作成します。

val env = ExecutionEnvironment.createLocalEnvironment(4)
val rawValues = env.fromCollection(getRawValuesFromZipFile("filename.zip"))
rawValues.print

シリアル化の問題がエラーの原因であると思われます。プロジェクトをコンパイルするために Scala 2.10.5 および Java 7 システム ライブラリを使用しています。私は Eclipse を使用しています。プロジェクトは、サンプル プロジェクト生成スクリプトによって生成されました。

問題を解決するための助けやヒントをいただければ幸いです:-) ありがとう、ダニエル

4

1 に答える 1

3

このenv.fromCollection()呼び出しは、ユース ケースにはあまり適していない可能性があります。データはジョブと一緒に出荷されるため、データが大きくなると壊れます。ワーカー ノードでデータが並列に読み取られることはありません。

https://ci.apache.org/projects/flink/flink-docs-master/apis/programming_guide.html#read-compressed-filesを見て、それがあなたのケースで機能するかどうかを確認してください。gzip のみをサポートしていますが、代わりにその形式でデータを圧縮することもできます。

于 2015-10-30T13:57:16.673 に答える