2

DataStreamGeneric レコード タイプの2 つの s にユニオン演算子を適用しています。

package com.gslab.com.dataSets;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericData.Record;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkBroadcast {
    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        List<String> controlMessageList = new ArrayList<String>();
        controlMessageList.add("controlMessage1");
        controlMessageList.add("controlMessage2");

        List<String> dataMessageList = new ArrayList<String>();
        dataMessageList.add("Person1");
        dataMessageList.add("Person2");
        dataMessageList.add("Person3");
        dataMessageList.add("Person4");

        DataStream<String> controlMessageStream  = env.fromCollection(controlMessageList);
        DataStream<String> dataMessageStream  = env.fromCollection(dataMessageList);

        DataStream<GenericRecord> controlMessageGenericRecordStream = controlMessageStream.map(new MapFunction<String, GenericRecord>() {
            @Override
            public GenericRecord map(String value) throws Exception {
                 Record gr = new GenericData.Record(new Schema.Parser().parse(new File("src/main/resources/controlMessageSchema.avsc")));
                 gr.put("TYPE", value);
                 return gr;
            }
        });

        DataStream<GenericRecord> dataMessageGenericRecordStream = dataMessageStream.map(new MapFunction<String, GenericRecord>() {
            @Override
            public GenericRecord map(String value) throws Exception {
                 Record gr = new GenericData.Record(new Schema.Parser().parse(new File("src/main/resources/dataMessageSchema.avsc")));
                 gr.put("FIRSTNAME", value);
                 gr.put("LASTNAME", value+": lastname");
                 return gr;
            }
        });

        //Displaying Generic records
        dataMessageGenericRecordStream.map(new MapFunction<GenericRecord, GenericRecord>() {
            @Override
            public GenericRecord map(GenericRecord value) throws Exception {
                System.out.println("data before union: "+ value);
                return value;
            }
        });

        controlMessageGenericRecordStream.broadcast().union(dataMessageGenericRecordStream).map(new MapFunction<GenericRecord, GenericRecord>() {
            @Override
            public GenericRecord map(GenericRecord value) throws Exception {
                System.out.println("data after union: " + value);
                return value;
            }
        });
        env.execute("stream");
    }
}

出力:

05/09/2016 13:02:13 Map(2/2) switched to FINISHED 
data after union: {"TYPE": "controlMessage1"}
data before union: {"FIRSTNAME": "Person2", "LASTNAME": "Person2: lastname"}
data after union: {"TYPE": "controlMessage1"}
data before union: {"FIRSTNAME": "Person1", "LASTNAME": "Person1: lastname"}
data after union: {"TYPE": "controlMessage2"}
data after union: {"TYPE": "controlMessage2"}
data after union: {"FIRSTNAME": "Person1", "LASTNAME": "Person1"}
data before union: {"FIRSTNAME": "Person4", "LASTNAME": "Person4: lastname"}
data before union: {"FIRSTNAME": "Person3", "LASTNAME": "Person3: lastname"}
data after union: {"FIRSTNAME": "Person2", "LASTNAME": "Person2"}
data after union: {"FIRSTNAME": "Person3", "LASTNAME": "Person3"}
05/09/2016 13:02:13 Map -> Map(2/2) switched to FINISHED 
data after union: {"FIRSTNAME": "Person4", "LASTNAME": "Person4"}
05/09/2016 13:02:13 Map -> Map(1/2) switched to FINISHED 
05/09/2016 13:02:13 Map(1/2) switched to FINISHED 
05/09/2016 13:02:13 Map(2/2) switched to FINISHED 
05/09/2016 13:02:13 Job execution switched to status FINISHED.

ご覧のとおり、dataMessageGenericRecordStream のレコードは、結合後に正しくありません。すべてのフィールド値が最初のフィールド値に置き換えられています。

4

2 に答える 2

2

別の問題 (ただし、まだ GenericRecord が関係しています) についてこれを調査するのに数日を費やし、根本的な原因と解決策を見つけました。

根本的な原因: Apache Avro "Schema.class" 内では、"field" 位置は TRANSIENT であり、Kryo によってシリアル化されないため、Flink パイプライン内で逆シリアル化されると、位置 "0" として初期化されます。

これについて説明し、特に kyro シリアライゼーションについて言及している JIRA AVRO-1476 を参照してください。

これは Avro 1.7.7 で修正されました。

解決策: Flink は Avro 1.7.7 (またはそれ以降) を使用する必要があります。flink-dist_2.11-1.1.3.jar 内の Avro クラスを置き換えることでローカル マシンの修正を確認したところ、問題が修正されました。

これについて JIRA の問題を更新しました: https://issues.apache.org/jira/browse/FLINK-5039

現在、これに関する PR があります: https://github.com/apache/flink/pull/2953

そして、Flink 1.1.4 および 1.2.0 ビルドに含まれることを期待しています。

于 2016-12-08T16:47:31.670 に答える
1

DataSet API で同様の問題に直面していました。いくつかの Avro ファイルを GenericRecords として読み取っていたところ、この奇妙な動作が見られました。私はこの回避策を使用しました。それらを GenericRecords として読み取る代わりに、特定のレコード (MyAvroObject など) として読み取り、マップを使用して GenericRecords として変換/型キャストしました。

DataSet API を使用してユースケースをテストするコードをいくつか書きましたが、上記の回避策で動作します-

public static void maintest(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(2);

    List<String> queryList1 = new ArrayList<String>();
    queryList1.add("query1");
    queryList1.add("query2");

    List<String> queryList2 = new ArrayList<String>();
    queryList2.add("QUERY1");
    queryList2.add("QUERY2");
    queryList2.add("QUERY3");
    queryList2.add("QUERY4");

    DataSet<String> dataset1  = env.fromCollection(queryList1);
    DataSet<String> dataset2  = env.fromCollection(queryList2);

    DataSet<GenericRecord> genericDS1 = dataset1.map(new MapFunction<String, GenericRecord>() {
        @Override
        public GenericRecord map(String value) throws Exception {
            Query query = Query.newBuilder().setQuery(value).build();
            return (GenericRecord) query;
        }
    });

    DataSet<GenericRecord> genericDS2 = dataset2.map(new MapFunction<String, GenericRecord>() {
        @Override
        public GenericRecord map(String value) throws Exception {
            SearchEngineQuery searchEngineQuery = SearchEngineQuery.newBuilder().setSeQuery(value).build();
            return (GenericRecord) searchEngineQuery;
        }
    });

    genericDS2.map(new MapFunction<GenericRecord, GenericRecord>() {
        @Override
        public GenericRecord map(GenericRecord value) throws Exception {
            System.out.println("DEBUG: data before union: " + value);
            return value;
        }
    });

    genericDS1.union(genericDS2).map(new MapFunction<GenericRecord, GenericRecord>() {
        @Override
        public GenericRecord map(GenericRecord value) throws Exception {
            System.out.println("DEBUG: data after union: " + value);
            return value;
        }
    }).print();
}

Query と SearchEngine クエリは、私の Avro オブジェクトです (コントロール メッセージ リストとデータ メッセージ リストに似ています)。

出力:

{"query": "query1"}
{"se_query": "QUERY1"}
{"se_query": "QUERY3"}
{"query": "query2"}
{"se_query": "QUERY2"}
{"se_query": "QUERY4"}
于 2016-05-12T08:36:41.073 に答える