0

これは、公式にはまだサポートされていない maprfs と連携する spring-xd リリース 1.0.1 に関する質問です。それでも私はそれを機能させたいと思っています。

これが私たちがしたことです:

1) パラメータを受け入れるように xd-shell と xd-worker と xd-singlenode シェル スクリプトを調整しました。--hadoopDistro mapr

2) ライブラリを新しいディレクトリ $XD_HOME/lib/mapr に追加しました

avro-1.7.4.jar                              jersey-core-1.9.jar
hadoop-annotations-2.2.0.jar                jersey-server-1.9.jar
hadoop-core-1.0.3-mapr-3.0.2.jar            jetty-util-6.1.26.jar
hadoop-distcp-2.2.0.jar                     maprfs-1.0.3-mapr-3.0.2.jar
hadoop-hdfs-2.2.0.jar                       protobuf-java-2.5.0.jar
hadoop-mapreduce-client-core-2.2.0.jar      spring-data-hadoop-2.0.2.RELEASE-hadoop24.jar
hadoop-streaming-2.2.0.jar                  spring-data-hadoop-batch-2.0.2.RELEASE-hadoop24.jar
hadoop-yarn-api-2.2.0.jar                   spring-data-hadoop-core-2.0.2.RELEASE-hadoop24.jar
hadoop-yarn-common-2.2.0.jar                spring-data-hadoop-store-2.0.2.RELEASE-hadoop24.jar

3) と を実行bin/xd-singlenode --hadoopDistro maprshell/bin/xd-shell --hadoopDistro maprます。

を介してストリームを作成およびデプロイするとstream create foo --definition "time | hdfs" --deploy、データは maprfs のファイル tmp/xd/foo/foo-1.txt.tmp に書き込まれます。ただし、ストリームをアンデプロイすると、次の例外が表示されます。

org.springframework.data.hadoop.store.StoreException: Failed renaming from /xd/foo/foo-1.txt.tmp to /xd/foo/foo-1.txt; nested exception is java.io.FileNotFoundException: Requested file /xd/foo/foo-1.txt does not exist.
at org.springframework.data.hadoop.store.support.OutputStoreObjectSupport.renameFile(OutputStoreObjectSupport.java:261)
at org.springframework.data.hadoop.store.output.TextFileWriter.close(TextFileWriter.java:92)
at org.springframework.xd.integration.hadoop.outbound.HdfsDataStoreMessageHandler.doStop(HdfsDataStoreMessageHandler.java:58)
at org.springframework.xd.integration.hadoop.outbound.HdfsStoreMessageHandler.stop(HdfsStoreMessageHandler.java:94)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:317)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:201)
at com.sun.proxy.$Proxy120.stop(Unknown Source)
at org.springframework.integration.endpoint.EventDrivenConsumer.doStop(EventDrivenConsumer.java:64)
at org.springframework.integration.endpoint.AbstractEndpoint.stop(AbstractEndpoint.java:100)
at org.springframework.integration.endpoint.AbstractEndpoint.stop(AbstractEndpoint.java:115)
at org.springframework.integration.config.ConsumerEndpointFactoryBean.stop(ConsumerEndpointFactoryBean.java:303)
at org.springframework.context.support.DefaultLifecycleProcessor.doStop(DefaultLifecycleProcessor.java:229)
at org.springframework.context.support.DefaultLifecycleProcessor.access$300(DefaultLifecycleProcessor.java:51)
at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.stop(DefaultLifecycleProcessor.java:363)
at org.springframework.context.support.DefaultLifecycleProcessor.stopBeans(DefaultLifecycleProcessor.java:202)
at org.springframework.context.support.DefaultLifecycleProcessor.stop(DefaultLifecycleProcessor.java:106)
at org.springframework.context.support.AbstractApplicationContext.stop(AbstractApplicationContext.java:1186)
at org.springframework.xd.module.core.SimpleModule.stop(SimpleModule.java:234)
at org.springframework.xd.dirt.module.ModuleDeployer.destroyModule(ModuleDeployer.java:132)
at org.springframework.xd.dirt.module.ModuleDeployer.handleUndeploy(ModuleDeployer.java:111)
at org.springframework.xd.dirt.module.ModuleDeployer.undeploy(ModuleDeployer.java:83)
at org.springframework.xd.dirt.server.ContainerRegistrar.undeployModule(ContainerRegistrar.java:261)
at org.springframework.xd.dirt.server.ContainerRegistrar$StreamModuleWatcher.process(ContainerRegistrar.java:884)
at org.apache.curator.framework.imps.NamespaceWatcher.process(NamespaceWatcher.java:67)
at org.apache.zookeeper.ClientCnxn$EventThread.processEvent(ClientCnxn.java:522)
at org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:498)
Caused by: java.io.FileNotFoundException: Requested file /xd/foo/foo-1.txt does not exist.
at com.mapr.fs.MapRFileSystem.getMapRFileStatus(MapRFileSystem.java:805)
at com.mapr.fs.MapRFileSystem.delete(MapRFileSystem.java:629)
at org.springframework.data.hadoop.store.support.OutputStoreObjectSupport.renameFile(OutputStoreObjectSupport.java:258)
... 29 more

機能を見てみましたOutputStoreObjectSupport.renameFile()。hdfs 上のファイルが完成すると、このメソッドはファイル /xd/foo/foo-1.txt.tmp の名前を xd/foo/foo1.txt に変更しようとします。これは関連するコードです:

        try {
        FileSystem fs = path.getFileSystem(getConfiguration());

        boolean succeed;
        try {
            fs.delete(toPath, false);
            log.info("Renaming path=[" + path + "] toPath=[" + toPath + "]");
            succeed = fs.rename(path, toPath);
        } catch (Exception e) {
            throw new StoreException("Failed renaming from " + path + " to " + toPath, e);
        }
        if (!succeed) {
            throw new StoreException("Failed renaming from " + path + " to " + toPath + " because hdfs returned false");
        }
    }

対象のファイルが hdfs 上に存在しない場合、maprfs を呼び出すと例外がスローされるようfs.delete(toPath, false)です。ただし、この場合に例外をスローしても意味がありません。他のファイルシステムの実装では動作が異なると思いますが、これはまだ確認する必要がある点です。残念ながら、MapRFileSystem.java のソースが見つかりません。これはクローズドソースですか?これは、問題をよりよく理解するのに役立ちます。spring-xd から maprfs に書き込んだ経験のある人はいますか? または、spring-data-hadoop を使用して maprfs のファイルの名前を変更しますか?

編集

簡単なテスト ケース (以下を参照) を使用して、Spring XD 以外で問題を再現することができました。この例外は、inWritingSuffix または inWritingPrefix が設定されている場合にのみスローされることに注意してください。そうしないと、spring-hadoop はファイルの名前を変更しようとしません。したがって、これはまだ私にとっては不十分な回避策です。inWritingPrefixes と inWritingSuffixes の使用を控えてください。

@ContextConfiguration("context.xml")
@RunWith(SpringJUnit4ClassRunner.class)
public class MaprfsSinkTest {

    @Autowired 
    Configuration configuration;

    @Autowired
    FileSystem filesystem;

    @Autowired 
    DataStoreWriter<String >storeWriter;

    @Test
    public void testRenameOnMaprfs() throws IOException, InterruptedException {
        Path testPath = new Path("/tmp/foo.txt");
        filesystem.delete(testPath, true);
        TextFileWriter writer = new TextFileWriter(configuration, testPath, null);
        writer.setInWritingSuffix("tmp");
        writer.write("some entity");
        writer.close();
    }

    @Test
    public void testStoreWriter() throws IOException {
        this.storeWriter.write("something");
    }

}
4

1 に答える 1

0

maprfs をサポートする spring-hadoop の新しいブランチを作成しました。

https://github.com/blinse/spring-hadoop/tree/origin/2.0.2.RELEASE-mapr

このリリースをビルドし、結果の jar を使用すると、hdfs シンクで正常に動作します。

于 2015-04-17T08:10:12.193 に答える