0

Hadoop マップ/リデュースに渡すために、Spring バッチ ジョブでシーケンス ファイルを生成しようとしています。ファイルをhdfsに手動でコピーすることで、ジョブを一度機能させることができました。そして、ローカル システム テストで実行すると、ローカル ファイルシステムがファイルを検出するため、正常に実行されます。しかし、リモートの Hadoop インスタンスにデプロイしようとすると、次の例外が発生します。

org.apache.hadoop.mapreduce.lib.input.InvalidInputException: 入力パスが存在しません: hdfs://ngram-test:9000/user/hduser/DocumentsPTOgrants2007_2011.seq
    org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:224) で
    org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat.listStatus (SequenceFileInputFormat.java:55) で
    org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplits (FileInputFormat.java:241) で
    org.apache.hadoop.mapred.JobClient.writeNewSplits (JobClient.java:885) で
    org.apache.hadoop.mapred.JobClient.submitJobInternal (JobClient.java:779) で
    org.apache.hadoop.mapreduce.Job.submit (Job.java:432) で
    org.apache.hadoop.mapreduce.Job.waitForCompletion (Job.java:447) で
    com.atsid.hadoop.jobs.AbstractJobRunner.executeJob (AbstractJobRunner.java:70) で
    com.atsid.hadoop.jobs.AbstractJobRunner.run (AbstractJobRunner.java:36) で
    org.apache.hadoop.util.ToolRunner.run (ToolRunner.java:65) で
    org.apache.hadoop.util.ToolRunner.run (ToolRunner.java:79) で
    com.atsid.cloudbase.ngram.ingest.mapreduce.NGramIngestJobRunner.main (NGramIngestJobRunner.java:34) で
    sun.reflect.NativeMethodAccessorImpl.invoke0(ネイティブメソッド)
    sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) で
    sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) で
    java.lang.reflect.Method.invoke(Method.java:597) で
    org.springframework.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:191) で
    org.springframework.data.hadoop.mapreduce.JarExecutor.invokeTargetObject(JarExecutor.java:71) で
    org.springframework.data.hadoop.mapreduce.HadoopCodeExecutor.invokeTarget(HadoopCodeExecutor.java:185) で
    org.springframework.data.hadoop.mapreduce.HadoopCodeExecutor.runCode(HadoopCodeExecutor.java:102) で
    org.springframework.data.hadoop.mapreduce.JarTasklet.execute(JarTasklet.java:32) で
    sun.reflect.NativeMethodAccessorImpl.invoke0(ネイティブメソッド)
    sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) で
    sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) で
    java.lang.reflect.Method.invoke(Method.java:597) で
    org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:317) で
    org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:183) で
    org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:150) で
    org.springframework.aop.support.DelegatingIntroductionInterceptor.doProceed(DelegatingIntroductionInterceptor.java:132) で
    org.springframework.aop.support.DelegatingIntroductionInterceptor.invoke(DelegatingIntroductionInterceptor.java:120) で
    org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172) で
    org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:204) で
    com.sun.proxy.$Proxy43.execute (不明なソース) で
    org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:386) で
    org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:131) で
    org.springframework.batch.core.step.tasklet.TaskletStep$2.doInChunkContext(TaskletStep.java:264) で
    org.springframework.batch.core.scope.context.StepContextRepeatCallback.doInIteration(StepContextRepeatCallback.java:76) で
    org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:367) で
    org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:214) で
    org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:143) で
    org.springframework.batch.core.step.tasklet.TaskletStep.doExecute(TaskletStep.java:250)で
    org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:195) で
    org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:135) で
    org.springframework.batch.core.job.flow.JobFlowExecutor.executeStep(JobFlowExecutor.java:61) で
    org.springframework.batch.core.job.flow.support.state.StepState.handle(StepState.java:60) で
    org.springframework.batch.core.job.flow.support.SimpleFlow.resume(SimpleFlow.java:144) で
    org.springframework.batch.core.job.flow.support.SimpleFlow.start(SimpleFlow.java:124) で
    org.springframework.batch.core.job.flow.FlowJob.doExecute(FlowJob.java:135)で
    org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:293) で
    org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:120) で
    org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:49) で
    org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:114) で
    org.springframework.batch.core.launch.support.CommandLineJobRunner.start(CommandLineJobRunner.java:349) で
    org.springframework.batch.core.launch.support.CommandLineJobRunner.main(CommandLineJobRunner.java:574) で

ステップで使用されるタスクレット構成は次のとおりです。files属性を使用して、入力ファイルを HDFS に渡そうとしています。ファイルは Hadoop ログに表示されます。

<hdp:jar-tasklet id="ingestJarTasklet" scope="step"
                 jar="file:${ingest.job.jar.path}"
                 main-class="com.atsid.cloudbase.ngram.ingest.mapreduce.NGramIngestJobRunner"
                 libs="${ingest.job.libs}"
                 files="#{seqFileLocation.URI.toString()}"
                 configuration-ref="hadoopConfiguration">
    ngram.jobrunner.input.document.sequence.file=${job.file.location}#{seqFileLocation.filename}
</hdp:jar-tasklet>
4

1 に答える 1