0

入力ファイルを解析し、Parquet 形式でマッパーのディスクに出力を書き込むレデューサーを使用せずにマップを削減するジョブがあります。このジョブは複数のフォルダーからファイルを入力として取得できるため (日付ごとに 1 つのフォルダー)、次のように出力をフォルダーに分割したいと考えています。

01JAN15
    output-0000
    output-0001

02JAN15
    output-0000
    output-0001

ドキュメントでMultipleOutput形式のクラスを見たのですが、reduce部分で複数のフォルダに書き込むだけでうまくいっているようです。

どういうわけか、同じディレクトリ内の複数のファイルへの書き込みは機能しますが、複数のディレクトリに書き込もうとするとすぐに例外が発生します (おそらく、一部のマッパーが同じディレクトリを同時に作成しようとしているからでしょうか?)。

参考までに、私のコードはマッパーで次のようになります。

mos.write("pb", null, message, date + "/output");

そして、私はそのような出力フォーマットを定義します:

MultipleOutputs.addNamedOutput(job, "pb", ProtoParquetOutputFormat.class,
Void.class, com.google.protobuf.Message.class);

私が得る例外は次のとおりです。

15/01/11 15:05:09 WARN ipc.Client: interrupted waiting to send rpc request to server
java.lang.InterruptedException
    at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:400)
    at java.util.concurrent.FutureTask.get(FutureTask.java:187)
    at org.apache.hadoop.ipc.Client$Connection.sendRpcRequest(Client.java:1046)
    at org.apache.hadoop.ipc.Client.call(Client.java:1441)
    at org.apache.hadoop.ipc.Client.call(Client.java:1399)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:232)
    at com.sun.proxy.$Proxy9.getBlockLocations(Unknown Source)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:254)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
    at com.sun.proxy.$Proxy10.getBlockLocations(Unknown Source)
    at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1220)
    at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1210)
    at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1200)
    at org.apache.hadoop.hdfs.DFSInputStream.fetchLocatedBlocksAndGetLastBlockLength(DFSInputStream.java:271)
    at org.apache.hadoop.hdfs.DFSInputStream.openInfo(DFSInputStream.java:238)
    at org.apache.hadoop.hdfs.DFSInputStream.<init>(DFSInputStream.java:231)
    at org.apache.hadoop.hdfs.DFSClient.open(DFSClient.java:1498)
    at org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:302)
    at org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:298)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:298)
    at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:766)
    at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:272)
    at parquet.hadoop.ParquetFileReader$2.call(ParquetFileReader.java:180)
    at parquet.hadoop.ParquetFileReader$2.call(ParquetFileReader.java:176)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

私がやりたいことが可能かどうか知っていますか?私は何を間違っていますか?ありがとう !

4

3 に答える 3

0

パーティションを使用して、別のファイルに出力できます。

于 2015-01-12T05:36:10.700 に答える
0

出力ファイルは複数のプロセス (マッパーまたはリデューサー) で書き込むことができないため、複数の出力ファイルを生成するには、カスタム パーティショニングを定義するか、リデューサーでデータをグループ化し、出力ファイル名にキーを含める必要があります。マッパーが複数の入力ファイルからのデータを同じファイルに書き込むことはできません。

于 2015-01-30T06:59:46.350 に答える