3

Cascading を使用してファイルを Parquet に変換しようとしています。しかし、私は以下のエラーが発生しています。

エラー

Exception in thread "main" cascading.flow.planner.PlannerException: tap named: 'Copy', cannot be used as a sink: Hfs["ParquetTupleScheme[['A', 'B']->[ALL]]"]["/user/cloudera/htcountp"]
at cascading.flow.planner.FlowPlanner.verifyTaps(FlowPlanner.java:240)
at cascading.flow.planner.FlowPlanner.verifyAllTaps(FlowPlanner.java:174)
at cascading.flow.hadoop.planner.HadoopPlanner.buildFlow(HadoopPlanner.java:242)
at cascading.flow.hadoop.planner.HadoopPlanner.buildFlow(HadoopPlanner.java:80)
at cascading.flow.FlowConnector.connect(FlowConnector.java:459)
at first.Copy.main(Copy.java:49)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.hadoop.util.RunJar.main(RunJar.java:212)

コード

Scheme sourceScheme = new TextDelimited(new Fields("A","B"), ", ");
Scheme sinkScheme = new ParquetTupleScheme(new Fields("A", "B"));

// create the source tap
Tap inTap = new Hfs(sourceScheme, inPath );

// create the sink tap
Tap outTap = new Hfs( sinkScheme, outPath );

// specify a pipe to connect the taps
Pipe copyPipe = new Pipe("Copy"); 

// connect the taps, pipes, etc., into a flow
FlowDef flowDef = FlowDef.flowDef()
 .addSource( copyPipe, inTap )
 .addTailSink( copyPipe, outTap );

// run the flow
flowConnector.connect( flowDef ).complete();
4

2 に答える 2

1

同じ問題に遭遇しました。ソース コードを見ると、Parquet スキーマを ParquetTupleScheme のコンストラクターに渡して、データを HDFS にシリアル化できるようにする必要があります。このクラスには isSink() メソッドがあり、それが存在することを確認します。それ以外の場合、それはシンクではなく、コードは特定したエラーをスローします。

于 2015-03-26T03:23:45.193 に答える
0

少し遅れた。しかし、私も同じ問題に出くわしました。以下のように、ParquetTupleScheme の宣言には別のバリエーションがあります。

new ParquetTupleScheme(Fields SourceFields, Fields SinkFields, String ParquetSchema)

シンク スキームの宣言を以下のように変更します。

Scheme sinkScheme = new ParquetTupleScheme(new Fields("A", "B"), new Fields("A", "B"), "message FileName { required Binary A , required Binary B }" );
于 2015-11-20T02:49:06.300 に答える