1

いくつかのデータを含む 1 つの Apache アクセス ログ ファイルがあり、それは継続的に増加しています。Apache Spark Streaming API を使用してそのデータを分析したいと考えています。

そして、Spark は私にとって新しいものであり、 jssc.textFileStream(directory)関数を使用してログ データを取得する 1 つのプログラムを作成しました。しかし、私の要件では機能しません。

spark を使用してそのログ ファイルを分析する方法をいくつか提案してください。

これが私のコードです。

SparkConf conf = new SparkConf()
                .setMaster("spark://192.168.1.9:7077")
                .setAppName("log streaming")
                .setSparkHome("/usr/local/spark")
                .setJars(new String[] { "target/sparkstreamingdemo-0.0.1.jar" });
        StreamingContext ssc = new StreamingContext(conf, new Duration(5000));
        DStream<String> filerdd = ssc.textFileStream("/home/user/logs");
        filerdd.print();
        ssc.start();
        ssc.awaitTermination();

このコードは、既存のファイルからデータを返しません。これは、新しいファイルを作成するときにのみ機能しますが、その新しいファイルを更新すると、プログラムは更新されたデータを返しません。

4

1 に答える 1

3

ファイルがリアルタイムで変更される場合は、Apache Commons IO のTailerを使用できます。これが最も単純なサンプルです。

     public void readLogs(File f, long delay) {
        TailerListener listener = new MyTailerListener();
        Tailer tailer = new Tailer(f, listener, delay);

        // stupid executor impl. for demo purposes
        Executor executor = new Executor() {
            public void execute(Runnable command) {
                command.run();
             }
        };
        executor.execute(tailer);       
    }

    public class MyTailerListener extends TailerListenerAdapter {
        public void handle(String line) {
            System.out.println(line);
        }
    }

上記のコードは、Apache Flumeのログ リーダーとして使用でき、ソースとして適用できます。次に、収集したログを Spark ストリームにリダイレクトするように Flume シンクを構成し、Spark を適用して Flume ストリームからのデータを分析する必要があります ( http://spark.apache.org/docs/latest/streaming-flume-integration.html ) 。

Flume セットアップの詳細については、次の記事を参照してください: Apache Spark Streaming を使用したリアルタイム ログ処理

于 2015-02-24T02:02:32.287 に答える