8

顧客からの電話のデータを保持する Hive テーブルがあります。簡単にするために、2 つの列があると考えてください。最初の列には顧客 ID が保持され、2 番目の列には呼び出しのタイムスタンプ (UNIX タイムスタンプ) が保持されます。

このテーブルにクエリを実行して、各顧客のすべての通話を見つけることができます。

SELECT * FROM mytable SORT BY customer_id, call_time;

結果は次のとおりです。

Customer1    timestamp11
Customer1    timestamp12
Customer1    timestamp13
Customer2    timestamp21
Customer3    timestamp31
Customer3    timestamp32
...

顧客ごとに、2 番目の呼び出しから開始して、2 つの連続する呼び出し間の時間間隔を返す Hive クエリを作成することは可能ですか? 上記の例では、クエリは次を返す必要があります。

Customer1    timestamp12-timestamp11
Customer1    timestamp13-timestamp12
Customer3    timestamp32-timestamp31
...

私はSQLソリューションからソリューションを適応させようとしましたが、Hiveの制限に固執しています.FROMでのみサブクエリを受け入れ結合には等式のみを含める必要があります.

ありがとうございました。

EDIT1:

Hive UDF 関数を使用しようとしました。

public class DeltaComputerUDF extends UDF {
private String previousCustomerId;
private long previousCallTime;

public String evaluate(String customerId, LongWritable callTime) {
    long callTimeValue = callTime.get();
    String timeDifference = null;

    if (customerId.equals(previousCustomerId)) {
        timeDifference = new Long(callTimeValue - previousCallTime).toString();
    }

    previousCustomerId = customerId;
    previousCallTime = callTimeValue;

    return timeDifference;
}}

「デルタ」という名前で使用します。

しかし、(ログと結果から) MAP 時に使用されているようです。このことから 2 つの問題が発生します。

最初:この関数を使用する前に、テーブル データを顧客 ID とタイムスタンプで並べ替える必要があります。クエリ:

 SELECT customer_id, call_time, delta(customer_id, call_time) FROM mytable DISTRIBUTE BY customer_id SORT BY customer_id, call_time;

私の機能が使用されてからずっと後、ソート部分がREDUCE時に実行されるため、機能しません。

関数を使用する前にテーブル データを並べ替えることができますが、回避したいオーバーヘッドであるため、これには満足できません。

2 つ目:分散 Hadoop 構成の場合、データは使用可能なジョブ トラッカー間で分割されます。したがって、この関数には複数のインスタンスがあり、マッパーごとに 1 つあると思います。そのため、同じ顧客データを 2 つのマッパーに分割することができます。この場合、顧客からの電話を失うことになりますが、これは受け入れられません。

この問題を解決する方法がわかりません。DISTRIBUTE BY は、特定の値を持つすべてのデータが同じレデューサーに送信されることを保証することを知っています (したがって、SORT が期待どおりに機能することを保証します)。

次に、reduce スクリプトを使用するという libjack の提案に従う予定です。この「計算」は、他のいくつかのハイブクエリ間で必要になるため、Balaswamy vaddeman が提案したように、別のツールに移動する前に、Hive が提供するすべてのものを試してみたいと思います。

EDIT2:

カスタム スクリプト ソリューションの調査を開始しました。しかし、プログラミング Hive ブックの第 14 章の最初のページ (この章ではカスタム スクリプトについて説明します) で、次の段落を見つけました。

通常、ストリーミングは、同等の UDF または InputFormat オブジェクトをコーディングするよりも効率的ではありません。データをパイプに出し入れするためにデータをシリアライズおよびデシリアライズすることは、比較的非効率的です。また、プログラム全体を統一的にデバッグすることも困難です。ただし、プロトタイプを迅速に作成したり、Java で記述されていない既存のコードを活用したりする場合には便利です。Java コードを書きたくない Hive ユーザーにとって、これは非常に効果的なアプローチになる可能性があります。

したがって、カスタム スクリプトが効率の点で最適なソリューションではないことは明らかでした。

しかし、UDF 機能を維持しながら、分散 Hadoop 構成で期待どおりに機能することを確認するにはどうすればよいでしょうか? この質問に対する答えは、Language Manual UDF wiki ページの UDF Internals セクションにあります。クエリを書くと:

 SELECT customer_id, call_time, delta(customer_id, call_time) FROM (SELECT customer_id, call_time FROM mytable DISTRIBUTE BY customer_id SORT BY customer_id, call_time) t;

これは REDUCE 時に実行され、DISTRIBUTE BY および SORT BY コンストラクトは、同じ顧客からのすべてのレコードが呼び出し順に同じリデューサーによって処理されることを保証します。

したがって、上記のUDFとこのクエリ構造は私の問題を解決します.

(リンクを貼れなくて申し訳ありませんが、名声点が足りないので貼れません)

4

3 に答える 3

13

これは古い質問ですが、今後の参考のために、別の命題をここに書きます。

Hiveウィンドウ関数を使用すると、クエリで前/次の値を使用できます。

同様のコード クエリは次のようになります。

SELECT customer_id, call_time - LAG(call_time, 1, 0) OVER (PARTITION BY customer_id ORDER BY call_time) FROM mytable;
于 2014-09-02T16:13:28.440 に答える
1

MAP-REDUCEJava や Python などの他のプログラミング言語で明示的に使用できます。マップ{cutomer_id,call_time}から出力し、レデューサーで取得する場所{customer_id,list{time_stamp}}と、レデューサーでこれらのタイムスタンプを並べ替えて、データを処理できます。

于 2013-07-03T17:01:14.003 に答える
0

誰かが同様の要件に遭遇した可能性があります。私が見つけた解決策は次のとおりです。

1)カスタム関数を作成します。

package com.example;
// imports (they depend on the hive version)
@Description(name = "delta", value = "_FUNC_(customer id column, call time column) "
    + "- computes the time passed between two succesive records from the same customer. "
    + "It generates 3 columns: first contains the customer id, second contains call time "
    + "and third contains the time passed from the previous call. This function returns only "
    + "the records that have a previous call from the same customer (requirements are not applicable "
    + "to the first call)", extended = "Example:\n> SELECT _FUNC_(customer_id, call_time) AS"
    + "(customer_id, call_time, time_passed) FROM (SELECT customer_id, call_time FROM mytable "
    + "DISTRIBUTE BY customer_id SORT BY customer_id, call_time) t;")
public class DeltaComputerUDTF extends GenericUDTF {
private static final int NUM_COLS = 3;

private Text[] retCols; // array of returned column values
private ObjectInspector[] inputOIs; // input ObjectInspectors
private String prevCustomerId;
private Long prevCallTime;

@Override
public StructObjectInspector initialize(ObjectInspector[] ois) throws UDFArgumentException {
    if (ois.length != 2) {
        throw new UDFArgumentException(
                "There must be 2 arguments: customer Id column name and call time column name");
    }

    inputOIs = ois;

    // construct the output column data holders
    retCols = new Text[NUM_COLS];
    for (int i = 0; i < NUM_COLS; ++i) {
        retCols[i] = new Text();
    }

    // construct output object inspector
    List<String> fieldNames = new ArrayList<String>(NUM_COLS);
    List<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>(NUM_COLS);
    for (int i = 0; i < NUM_COLS; ++i) {
        // column name can be anything since it will be named by UDTF as clause
        fieldNames.add("c" + i);
        // all returned type will be Text
        fieldOIs.add(PrimitiveObjectInspectorFactory.writableStringObjectInspector);
    }

    return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
}

@Override
public void process(Object[] args) throws HiveException {
    String customerId = ((StringObjectInspector) inputOIs[0]).getPrimitiveJavaObject(args[0]);
    Long callTime = ((LongObjectInspector) inputOIs[1]).get(args[1]);

    if (customerId.equals(prevCustomerId)) {
        retCols[0].set(customerId);
        retCols[1].set(callTime.toString());
        retCols[2].set(new Long(callTime - prevCallTime).toString());
        forward(retCols);
    }

    // Store the current customer data, for the next line
    prevCustomerId = customerId;
    prevCallTime = callTime;
}

@Override
public void close() throws HiveException {
    // TODO Auto-generated method stub

}

}

2)この関数を含むjarを作成します。jarnameがmyjar.jarであるとします。

3)jarをHiveを使用してマシンにコピーします。/tmpに配置されているとします

4)Hive内でカスタム関数を定義します。

ADD JAR /tmp/myjar.jar;
CREATE TEMPORARY FUNCTION delta AS 'com.example.DeltaComputerUDTF';

5)クエリを実行します。

SELECT delta(customer_id, call_time) AS (customer_id, call_time, time_difference) FROM 
  (SELECT customer_id, call_time FROM mytable DISTRIBUTE BY customer_id SORT BY customer_id, call_time) t;

備考:

a。call_time列にはデータがbigintとして格納されていると想定しました。文字列の場合、プロセス関数で(customerIdの場合と同様に)文字列として取得し、Longに解析します。

b。UDFの代わりにUDTFを使用することにしました。これにより、必要なすべてのデータが生成されます。それ以外の場合(UDFを使用)、生成されたデータをフィルタリングしてNULL値をスキップする必要があります。したがって、元の投稿の最初の編集で説明されているUDF関数(DeltaComputerUDF)を使用すると、クエリは次のようになります。

SELECT customer_id, call_time, time_difference 
FROM 
  (
    SELECT delta(customer_id, call_time) AS (customer_id, call_time, time_difference) 
    FROM 
      (
         SELECT customer_id, call_time FROM mytable
         DISTRIBUTE BY customer_id
         SORT BY customer_id, call_time
       ) t
   ) u 
WHERE time_difference IS NOT NULL;

c. Both functions (UDF and UDTF) work as desired, no matter the order of rows inside the table (so there is no requirement that table data to be sorted by customer id and call time before using delta functions)

于 2013-02-13T12:43:41.473 に答える