0

今のところローカル モードで Pig 0.11.1 を使用し、CSV からデータをロードしています。

ここまでで、データ セットを読み込んで、必要な計算を実行することができました。次のステップは、データからいくつかのサンプルを取得し、同じ計算を実行することです。既存のプロセスを複製するために、15 分ごとにデータ ポイントを取得します。

ここで問題が発生します。データ ポイントが正確に 15 分間隔である場合に一致するフィルターを Pig で作成できますが、15 分境界に近いデータ ポイントを取得するにはどうすればよいでしょうか?

15 分のマークを見て、そこにあるレコードを取得する必要があります。そのマークのすぐ上にレコードがない場合 (ほとんどの場合)、マークの後の次のレコードを取得する必要があります。

独自のフィルター UDF を作成する必要があると思いますが、UDF はステートフルである必要があるようです。これにより、時間間隔の後に最初の一致が見つかったことがわかるようになります。ステートフルな UDF の例を見つけることができませんでした。最終的に Hadoop に対して実行されたときにデータがどのようにマップ/縮小されるかがわからないことを考えると、おそらく悪い考えだと言えます。

キー/タイムスタンプの値を保存し、それらを解析する Python スクリプトを作成することで、いくつかの手順でこれを行うことができます。ただし、このプロセスをできる限り Pig に保持したいと考えています。

編集:最も基本的なデータは次のようなものです: {id:long, timestamp:long}. 単位はtimestampミリ秒です。データの各セットは でソートされtimestampます。レコード X が最小値 (開始時間) から 15 分の境界に正確に収まる場合はtimestamp、それを取得します。それ以外の場合は、その 15 分の境界の後で、いつでも次のレコードを取得します。手でデータを並べ替える時間がなかったため、期待される結果がどのようなものかを示す良い例がありません。

4

1 に答える 1

1

MapReduce で「それ以外の場合は、その 15 分境界の次のレコードをいつでも取得する」という条件を満たすのは難しいかもしれませんが、少し変更して、「その 15 分境界の前の前のレコードを取得する」に変更すると、それは非常に簡単かもしれません。アイデアは、15 分が 900000 ミリ秒であるということです。そのため、レコードを 900000 ミリ秒をカバーするグループにグループ化し、それらを並べ替えて、上位のものを取ることができます。これが私の頭の上からのスクリプトの例です。

inpt = LOAD '....' AS (id:long, timestamp:long);
intervals = FOREACH inpt GENERATE id, timestamp, timestamp / 900000 as interval;
grp = GROUP intervals BY interval;
result = FOREACH grp {
    sort = ORDER intervals BY timestamp DESC;
    top = LIMIT ord 1;
    GENERATE FLATTEN(top);
};
于 2013-09-05T09:18:12.433 に答える