1

flink-1.0-SNAPSHOTを使用して、カフカからのデータを消費しています。データは、後で使用するために thrift に渡されるSnappy 圧縮byte[] として入ってきます。

flinkを使用してデータを取得すると、データが破損または誤って処理され、解凍できなくなります。コードはこのサンプルから派生したもので、次のとおりです。

DataStream<String> messageStream = env.addSource(new FlinkKafkaConsumer081<>(parameterTool.getRequired("topic"), new SimpleStringSchema(), parameterTool.getProperties()));

messageStream.rebalance().map(new MapFunction<String, String>() {

    @Override public String map(String value) throws Exception {
    boolean bvalid = Snappy.isValidCompressedBuffer(value.getBytes());
 });

isValidCompressedBufferは毎回 false を返します

データは、他の方法で消費された場合に良好であることが知られています。

私は何を取りこぼしたか?


解決:

を使用した例が見つからなかったので、これを投稿しますRawSchema

public static void main(String[] args) throws Exception {
    // create execution environment
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // parse user parameters
    ParameterTool parameterTool = ParameterTool.fromArgs(args);

    DataStream<byte[]> dataStream = env.addSource(new FlinkKafkaConsumer081<>(parameterTool.getRequired("topic"), new RawSchema(), parameterTool.getProperties()));

    dataStream.map(new MapFunction<byte[], Object>() {
        @Override
        public Object map(byte[] bytes) throws Exception {
            boolean bvali = Snappy.isValidCompressedBuffer(bytes);

            });
            return 0;
        }
    }).print();
    env.execute();
}
4

1 に答える 1

2

バイト メッセージを文字列として読み取ることは正しくありません。バイトをそのまま読み取ってから解凍する必要があります。

public Object map(byte[] bytes) throws Exception {
    boolean bvalid = Snappy.isValidCompressedBuffer(bytes);
    ...
于 2015-10-28T06:13:03.137 に答える