2

私のユースケースは、Avro データを Kafka から HDFS にプッシュすることです。Camus は適切なツールのようですが、機能させることができません。私はcamusが初めてで、camus-exampleを機能させようとしてい ますhttps://github.com/linkedin/camus

今、camus-example を機能させようとしています。しかし、私はまだ問題に直面しています。

DummyLogKafkaProducerClient のコード スニペット

package com.linkedin.camus.example.schemaregistry;

import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Random;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import com.linkedin.camus.etl.kafka.coders.KafkaAvroMessageEncoder;
import com.linkedin.camus.example.records.DummyLog;

public class DummyLogKafkaProducerClient {


    public static void main(String[] args) {

        Properties props = new Properties();

        props.put("metadata.broker.list", "localhost:6667");
        // props.put("serializer.class", "kafka.serializer.StringEncoder");
        // props.put("partitioner.class", "example.producer.SimplePartitioner");
        //props.put("request.required.acks", "1");

        ProducerConfig config = new ProducerConfig(props);

        Producer<String, byte[]> producer = new Producer<String, byte[]>(config);

        KafkaAvroMessageEncoder encoder = get_DUMMY_LOG_Encoder();

        for (int i = 0; i < 500; i++) {
            KeyedMessage<String, byte[]> data = new KeyedMessage<String, byte[]>("DUMMY_LOG", encoder.toBytes(getDummyLog()));
            producer.send(data);

        }
    }

    public static DummyLog getDummyLog() {
        Random random = new Random();
        DummyLog dummyLog = DummyLog.newBuilder().build();
        dummyLog.setId(random.nextLong());
        dummyLog.setLogTime(new Date().getTime());
        Map<CharSequence, CharSequence> machoStuff = new HashMap<CharSequence, CharSequence>();
        machoStuff.put("macho1", "abcd");
        machoStuff.put("macho2", "xyz");
        dummyLog.setMuchoStuff(machoStuff);
        return dummyLog;
    }

    public static KafkaAvroMessageEncoder get_DUMMY_LOG_Encoder() {
        KafkaAvroMessageEncoder encoder = new KafkaAvroMessageEncoder("DUMMY_LOG", null);
        Properties props = new Properties();
        props.put(KafkaAvroMessageEncoder.KAFKA_MESSAGE_CODER_SCHEMA_REGISTRY_CLASS, "com.linkedin.camus.example.schemaregistry.DummySchemaRegistry");
        encoder.init(props, "DUMMY_LOG");
        return encoder;

    }
}

インスタンス化例外を与えていたため、DummySchemaRegistryのデフォルトの引数なしコンストラクターも追加されました

package com.linkedin.camus.example.schemaregistry;

import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;

import com.linkedin.camus.example.records.DummyLog;
import com.linkedin.camus.example.records.DummyLog2;
import com.linkedin.camus.schemaregistry.MemorySchemaRegistry;

/**
 * This is a little dummy registry that just uses a memory-backed schema registry to store two dummy Avro schemas. You
 * can use this with camus.properties
 */
public class DummySchemaRegistry extends MemorySchemaRegistry<Schema> {
    public DummySchemaRegistry(Configuration conf) {
        super();
        super.register("DUMMY_LOG", DummyLog.newBuilder().build().getSchema());
        super.register("DUMMY_LOG_2", DummyLog2.newBuilder().build()
                .getSchema());
    }
    public DummySchemaRegistry() {
        super();
        super.register("DUMMY_LOG", DummyLog.newBuilder().build().getSchema());
        super.register("DUMMY_LOG_2", DummyLog2.newBuilder().build().getSchema());
    }
}

プログラムの実行後に取得する例外トレースの下

スレッド「メイン」の例外 com.linkedin.camus.coders.MessageEncoderException: org.apache.avro.AvroRuntimeException: org.apache.avro.AvroRuntimeException: フィールド ID タイプ:LONG pos:0 が設定されておらず、com にデフォルト値がありません。 linkedin.camus.etl.kafka.coders.KafkaAvroMessageEncoder.init(KafkaAvroMessageEncoder.java:55) com.linkedin.camus.example.schemaregistry.DummyLogKafkaProducerClient.get_DUMMY_LOG_Encoder(DummyLogKafkaProducerClient.java:57) at com.linkedin.camus.example. schemaregistry.DummyLogKafkaProducerClient.main(DummyLogKafkaProducerClient.java:32) 原因: org.apache.avro.AvroRuntimeException: org.apache.avro.AvroRuntimeException: フィールド ID タイプ:LONG pos:0 が設定されておらず、com.linkedin にデフォルト値がありません.camus.example.records.DummyLog$Builder.build(DummyLog.java:214) com.linkedin.camus.example.schemaregistry.DummySchemaRegistry.(DummySchemaRegistry.java:16) の sun.reflect.NativeConstructorAccessorImpl.newInstance0(ネイティブ メソッド) の sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) の sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl) .java:45) で java.lang.reflect.Constructor.newInstance(Constructor.java:408) で java.lang.Class.newInstance(Class.java:438) で com.linkedin.camus.etl.kafka.coders. KafkaAvroMessageEncoder.init(KafkaAvroMessageEncoder.java:52) ... 2 つ以上 原因: org.apache.avro.AvroRuntimeException: フィールド ID タイプ:LONG pos:0 が設定されておらず、org.apache.avro.data にデフォルト値がありません。 com.linkedin.camus.example.records.DummyLog$Builder の RecordBuilderBase.defaultValue(RecordBuilderBase.java:151)。build(DummyLog.java:209) ... 9 もっと見る

4

4 に答える 4

1

camus は、Avro スキーマにデフォルト値があることを期待していると思います。私は私のdummyLog.avscを次のように変更し、再コンパイルしました-

{ "namespace": "com.linkedin.camus.example.records", "type": "record", "name": "DummyLog", "doc": "あまり重要でないログ", "fields" : [ { "name": "id", "type": "int", "default": 0 }, { "name": "logTime", "type": "int", "default": 0 } ] }

それがあなたのために働くかどうか私に知らせてください。

ありがとう、アンバーリッシュ

于 2015-02-09T23:56:14.833 に答える
0

次のように、任意の String フィールドまたは Long フィールドをデフォルト設定できます。

  {"type":"record","name":"CounterData","namespace":"org.avro.usage.tutorial","fields":[{"name":"word","type":["string","null"]},{"name":"count","type":["long","null"]}]}
于 2015-03-24T05:14:56.977 に答える
0

Camus は、スキーマにデフォルト値があるとは想定していません。私は最近カミュを使用しましたが、同じ問題が見つかりました。実際、スキーマ レジストリでの使用方法は、デフォルトの例では正しくありません。Camus コードにいくつかの変更を加えました。 https://github.com/chandanbansal/camusを確認してください。機能させるための小さな変更があります。Avro レコードのデコーダーはありません。私もそう書いています。

于 2015-09-30T10:53:34.247 に答える