1

すべてのjsonファイルが存在するスプールディレクトリがあります.着信ファイルは毎秒このディレクトリに追加されます.着信jsonファイルをデシリアライズし、必要なフィールドを取得してHDFSディレクトリに追加する必要があります.

私がしたことは、ソースとしてスプール ディレクトリからファイルを取得し、1 つのシンクを使用して json ファイルを HDFS に直接配置する Flume conf ファイルを作成することでした。

この json を Sink の前に構造体形式にして、HDFS に配置する必要があります。最も重要なことは、それはツイッターのデータではないということです。そして、純粋に Flume を実装する必要があります。

以下のflume構成を使用して、仕事を完了しました。

agent_slave_1.channels.fileChannel1_1.type = file 
agent_slave_1.channels.fileChannel1_1.capacity = 200000
agent_slave_1.channels.fileChannel1_1.transactionCapacity = 1000
agent_slave_1.sources.source1_1.type = spooldir

agent_slave_1.sources.source1_1.spoolDir = /home/cloudera/runs/
agent_slave_1.sources.source1_1.fileHeader = false
agent_slave_1.sources.source1_1.fileSuffix = .COMPLETED
agent_slave_1.sinks.hdfs-sink1_1.type = hdfs
agent_slave_1.sinks.hdfs-sink1_1.hdfs.path =hdfs://localhost.localdomain:8020/user/cloudera/runs_scored/
agent_slave_1.sinks.hdfs-sink1_1.hdfs.batchSize = 1000
agent_slave_1.sinks.hdfs-sink1_1.hdfs.rollSize = 268435456
agent_slave_1.sinks.hdfs-sink1_1.hdfs.rollInterval = 0
agent_slave_1.sinks.hdfs-sink1_1.hdfs.rollCount = 50000000
agent_slave_1.sinks.hdfs-sink1_1.hdfs.writeFormat=Text

agent_slave_1.sinks.hdfs-sink1_1.hdfsfileType = DataStream
agent_slave_1.sources.source1_1.channels = fileChannel1_1
agent_slave_1.sinks.hdfs-sink1_1.channel = fileChannel1_1

agent_slave_1.sinks =  hdfs-sink1_1
agent_slave_1.sources = source1_1
agent_slave_1.channels = fileChannel1_1

しかし、デシリアライザーの使い方がわかりません。

Incoming Json ファイルを逆シリアル化する方法を教えてもらえますか? Javaでコードを書く必要がある場合は、私を助けてください。どのインターフェースを使用する必要がありますか? 可能であれば、いくつかのヒントを与えてください。

4

1 に答える 1

1

最良の推測は、JSON を目的の HDFS 形式に変換するカスタム インターセプターを作成することです。また、hdfs パスで使用できるヘッダーを設定するという利点もあります。

インターセプターを構成する方法は次のとおりです。

agent_slave_1.sources.source1_1.interceptors = my_intercptor
agent_slave_1.sources.source1_1.interceptors.my_intercptor.type = com.mycompany.MyInteceptor

クラスは次のようになります。

public class MyInteceptor implements Interceptor, Interceptor.Builder {

    private MyInteceptor interceptor;

    @Override
    public void initialize() {


    }

    @Override
    public Event intercept(Event event) {
        String bjson = event.getBody()));
        // decode your json, e.g. Jackson
        MyDecodedJsonObject record; // pseudo class
        event.getHeaders().put("timestamp", record.getTimestamp().toString());
        String newBody = record.getA() + "\t" + record.getB();
        event.setBody(newBody.getBytes())
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {

        for (Iterator<Event> iterator = events.iterator(); iterator.hasNext();) {
            Event next = intercept(iterator.next());
            if (next == null) {
                iterator.remove();
            }
        }
        return events;
    }

    @Override
    public void close() {


    }

    @Override
    public Interceptor build() {
        return interceptor;
    }

    @Override
    public void configure(Context context) {

        interceptor = new MyInteceptor();
    }

}

このクラスを jar にパッケージ化し、flume の lib ディレクトリにドロップすることを忘れないでください。

于 2015-01-13T11:04:43.370 に答える