flink-1.0-SNAPSHOTを使用して、カフカからのデータを消費しています。データは、後で使用するために thrift に渡されるSnappy 圧縮byte[] として入ってきます。
flinkを使用してデータを取得すると、データが破損または誤って処理され、解凍できなくなります。コードはこのサンプルから派生したもので、次のとおりです。
DataStream<String> messageStream = env.addSource(new FlinkKafkaConsumer081<>(parameterTool.getRequired("topic"), new SimpleStringSchema(), parameterTool.getProperties()));
messageStream.rebalance().map(new MapFunction<String, String>() {
@Override public String map(String value) throws Exception {
boolean bvalid = Snappy.isValidCompressedBuffer(value.getBytes());
});
isValidCompressedBufferは毎回 false を返します。
データは、他の方法で消費された場合に良好であることが知られています。
私は何を取りこぼしたか?
解決:
を使用した例が見つからなかったので、これを投稿しますRawSchema
。
public static void main(String[] args) throws Exception {
// create execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// parse user parameters
ParameterTool parameterTool = ParameterTool.fromArgs(args);
DataStream<byte[]> dataStream = env.addSource(new FlinkKafkaConsumer081<>(parameterTool.getRequired("topic"), new RawSchema(), parameterTool.getProperties()));
dataStream.map(new MapFunction<byte[], Object>() {
@Override
public Object map(byte[] bytes) throws Exception {
boolean bvali = Snappy.isValidCompressedBuffer(bytes);
});
return 0;
}
}).print();
env.execute();
}