私はDSE 3.1.2を使用しています。Pigを使用して、前処理結果の一部をcqlに保存しています。テーブルを作成し、スクリプトを開始しました。これは、少量のデータに対して機能しましたが、数レコードが増加すると、cassandraに保存されません、90% のみ、または出力が cassandra に格納されています。
ここに私のスクリプトがあります
SET default_parallel 10;
result = foreach PreprocUDF generate Primary1,Primary2,col3,col4;
finalresult = foreach result generate TOTUPLE(TOTUPLE('Primary1',PreprocUDF::Primary1),TOTUPLE('Primary2',PreprocUDF::Primary2)),TOTUPLE(PreprocUDF::col3,PreprocUDF::col4);
store finalresult into 'cql://conflux/tbl_test?output_query=update+conflux.tbl_test+set+col3+%3D+%3F+,col4+%3D+%3F' using CqlStorage();
現在、次のエラーが発生しており、レコードの 90% が Cassandra にダンプされています。
ERROR - 2014-04-29 01:53:49.590; org.apache.hadoop.security.UserGroupInformation; PriviledgedActionException as:sarrajen cause:java.io.IOException: java.io.IOException: InvalidRequestException(why:Expected 8 or 0 byte long (4))
WARN - 2014-04-29 01:53:49.590; org.apache.hadoop.mapred.Child; Error running child
java.io.IOException: java.io.IOException: InvalidRequestException(why:Expected 8 or 0 byte long (4))
at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce$Reduce.runPipeline(PigGenericMapReduce.java:465)
at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce$Reduce.processOnePackageOutput(PigGenericMapReduce.java:428)
at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce$Reduce.reduce(PigGenericMapReduce.java:408)
at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce$Reduce.reduce(PigGenericMapReduce.java:262)
at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:176)
at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:652)
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:420)
at org.apache.hadoop.mapred.Child$4.run(Child.java:266)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
at org.apache.hadoop.mapred.Child.main(Child.java:260)
Caused by: java.io.IOException: InvalidRequestException(why:Expected 8 or 0 byte long (4))
at org.apache.cassandra.hadoop.cql3.CqlRecordWriter$RangeClient.run(CqlRecordWriter.java:248)
Caused by: InvalidRequestException(why:Expected 8 or 0 byte long (4))
at org.apache.cassandra.thrift.Cassandra$execute_prepared_cql3_query_result.read(Cassandra.java:42694)
at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:78)
at org.apache.cassandra.thrift.Cassandra$Client.recv_execute_prepared_cql3_query(Cassandra.java:1724)
at org.apache.cassandra.thrift.Cassandra$Client.execute_prepared_cql3_query(Cassandra.java:1709)
at org.apache.cassandra.hadoop.cql3.CqlRecordWriter$RangeClient.run(CqlRecordWriter.java:232)
INFO - 2014-04-29 01:53:49.764; org.apache.hadoop.mapred.Task; Runnning cleanup for the task
Cassandraに手動で挿入すると、実行されたlatレコードは正常に機能します。
最終結果をCFSにダンプすると、すべてが正常に機能します
store result into '/testop'
Output(s):
Successfully stored 56347 records in: "/testop"
また、データをCFSにダンプしてから、CFSからCassandra DBにダンプしようとしましたが、これはうまくいきました。どこが間違っているか教えてください。複合キーを使用してCQLでテーブルを作成しました。列にデータ型を指定したため、コンパレータとバリデータを指定する必要はないと思います。
store result into '/testop'
x = load '/testop' as (Primary1:chararray,Primary2:long,col3:chararray,col4:long);
finalresult = foreach x generate TOTUPLE(TOTUPLE('Primary1',Primary1),TOTUPLE('Primary2',Primary2)),TOTUPLE(col3,col4);
store finalresult into 'cql://conflux/tbl_test?output_query=update+conflux.tbl_test+set+col3+%3D+%3F+,col4+%3D+%3F' using CqlStorage();
これでうまくいきました。
どこが間違っているか教えてください。