65

私が理解する限りでは;

  • レデューサーでのみ並べ替える

  • グローバルに注文するが、すべてを1つのレデューサーに押し込む

  • cluster byは、キーハッシュによってレデューサーにインテリジェントにデータを分散し、並べ替えを行います。

だから私の質問は、クラスターはグローバルな秩序を保証するのですか?同じキーを同じレデューサーに配置して配布しますが、隣接するキーはどうですか?

私がこれについて見つけることができる唯一の文書はここにあり、例からそれはそれらを世界的に注文しているようです。しかし、定義からすると、必ずしもそうとは限らないように感じます。

4

8 に答える 8

168

短い答え:はい、CLUSTER BY複数の出力ファイルを自分で結合する意思がある場合は、グローバルな順序付けが保証されます。

長いバージョン:

  • ORDER BY x:グローバルな順序付けを保証しますが、これはすべてのデータを1つのレデューサーにプッシュすることで実現します。これは、大規模なデータセットでは基本的に受け入れられません。最終的に、1つのソートされたファイルが出力として作成されます。
  • SORT BY x:N個のレデューサーのそれぞれでデータを注文しますが、各レデューサーは重複する範囲のデータを受信できます。範囲が重複するN個以上のソートされたファイルになります。
  • DISTRIBUTE BY x:N個のレデューサーのそれぞれが重複しない範囲のを取得することを保証xしますが、各レデューサーの出力はソートしません。範囲が重複しないN個以上のソートされていないファイルになります。
  • CLUSTER BY x:N個のレデューサーのそれぞれが重複しない範囲を取得するようにし、レデューサーでそれらの範囲で並べ替えます。DISTRIBUTE BY xこれにより、グローバルな順序付けが可能になり、(および)を実行するのと同じになりSORT BY xます。範囲が重複しないN個以上のソートされたファイルになります。

わかる?つまりCLUSTER BY、基本的にはよりスケーラブルなバージョンですORDER BY

于 2013-09-17T01:09:05.527 に答える
15

最初に明確にclustered byしておきます。キーを異なるバケットにのみ配布し、clustered by ... sorted byバケットを並べ替えます。

簡単な実験(以下を参照)を使用すると、デフォルトではグローバルオーダーを取得できないことがわかります。その理由は、デフォルトのパーティショナーが実際のキーの順序に関係なくハッシュコードを使用してキーを分割するためです。

ただし、データを完全に注文することはできます。

動機は、Tom Whiteによる「Hadoop:The Definitive Guide」(第3版、第8章、274ページ、Total Sort)で、TotalOrderPartitionerについて説明しています。

最初にTotalOrderingの質問に答えてから、私が行ったソート関連のHive実験について説明します。

ここで説明しているのは「概念実証」であり、ClauderaのCDH3ディストリビューションを使用して1つの例を処理することができました。

もともと私はorg.apache.hadoop.mapred.lib.TotalOrderPartitionerがうまくいくことを望んでいました。残念ながら、キーではなく値でHiveパーティションのように見えるため、そうではありませんでした。だから私はそれにパッチを当てます(サブクラスがあるはずですが、そのための時間がありません):

交換

public int getPartition(K key, V value, int numPartitions) {
    return partitions.findPartition(key);
}

public int getPartition(K key, V value, int numPartitions) {
    return partitions.findPartition(value);
}

これで、TotalOrderPartitionerをHiveパーティショナーとして設定(パッチ)できます。

hive> set hive.mapred.partitioner=org.apache.hadoop.mapred.lib.TotalOrderPartitioner;

hive> set total.order.partitioner.natural.order=false

hive> set total.order.partitioner.path=/user/yevgen/out_data2

私も使用しました

hive> set hive.enforce.bucketing = true; 

hive> set mapred.reduce.tasks=4;

私のテストでは。

ファイルout_data2は、TotalOrderPartitionerに値をバケット化する方法を指示します。データをサンプリングしてout_data2を生成します。私のテストでは、0から10までの4つのバケットとキーを使用しました。アドホックアプローチを使用してout_data2を生成しました。

import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.hive.ql.io.HiveKey;
import org.apache.hadoop.fs.FileSystem;


public class TotalPartitioner extends Configured implements Tool{
    public static void main(String[] args) throws Exception{
            ToolRunner.run(new TotalPartitioner(), args);
    }

    @Override
    public int run(String[] args) throws Exception {
        Path partFile = new Path("/home/yevgen/out_data2");
        FileSystem fs = FileSystem.getLocal(getConf());

        HiveKey key = new HiveKey();
        NullWritable value = NullWritable.get();

        SequenceFile.Writer writer = SequenceFile.createWriter(fs, getConf(), partFile, HiveKey.class, NullWritable.class);
        key.set( new byte[]{1,3}, 0, 2);//partition at 3; 1 came from Hive -- do not know why
        writer.append(key, value);
        key.set( new byte[]{1, 6}, 0, 2);//partition at 6
        writer.append(key, value);
        key.set( new byte[]{1, 9}, 0, 2);//partition at 9
        writer.append(key, value);
        writer.close();
        return 0;
    }

}

次に、結果のout_data2をHDFS(/ user / yevgen / out_data2)にコピーしました。

これらの設定を使用して、データをバケット化/並べ替えました(実験リストの最後の項目を参照)。

これが私の実験です。

  • サンプルデータを作成する

    bash> echo -e "1 \ n3 \ n2 \ n4 \ n5 \ n7 \ n6 \ n8 \ n9 \ n0"> data.txt

  • 基本的なテストテーブルを作成します。

    ハイブ>テーブルテストの作成(x int); ハイブ>データローカルインパス'data.txt'をテーブルテストにロードします。

基本的に、このテーブルには0から9までの値が順序なしで含まれています。

  • テーブルのコピーがどのように機能するかを示します(使用するreduceタスクの最大数を設定する実際のmapred.reduce.tasksパラメーター)

    ハイブ>テーブルtest2(x int);を作成します。

    ハイブ>setmapred.reduce.tasks = 4;

    ハイブ>挿入上書きテーブルtest2selectax from test a join test b on ax = bx; --自明でないマップを強制するための愚かな参加-reduce

    bash> hadoop fs -cat / user / hive / Warehouse / test2 / 000001_0

    1

    5

    9

  • バケット化を示します。キーが並べ替え順序なしでランダムに割り当てられていることがわかります。

    ハイブ>(x)によって4つのバケットにクラスター化されたテーブルtest3(x int)を作成します。

    hive> set hive.enforce.bucketing = true;

    ハイブ>挿入上書きテーブルtest3select* from test;

    bash> hadoop fs -cat / user / hive / Warehouse / test3 / 000000_0

    4

    8

    0

  • 並べ替えによるバケット化。結果は完全にソートされているのではなく、部分的にソートされています

    ハイブ>(x)でクラスター化されたテーブルtest4(x int)を(x desc)で4つのバケットにソートして作成します。

    ハイブ>挿入上書きテーブルtest4select* from test;

    bash> hadoop fs -cat / user / hive / Warehouse / test4 / 000001_0

    1

    5

    9

値が昇順で並べ替えられていることがわかります。CDH3のHiveバグのように見えますか?

  • ステートメントごとにクラスターなしで部分的にソートされる:

    hive> create table test5 as select x from test distribution by x sort by x desc;

    bash> hadoop fs -cat / user / hive / Warehouse / test5 / 000001_0

    9

    5

    1

  • パッチを適用したTotalOrderParitionerを使用します。

    hive> set hive.mapred.partitioner = org.apache.hadoop.mapred.lib.TotalOrderPartitioner;

    ハイブ>settotal.order.partitioner.natural.order = false

    ハイブ>settotal.order.partitioner.path = / user / training / out_data2

    ハイブ>(x)でクラスター化されたテーブルtest6(x int)を(x)で4つのバケットにソートして作成します。

    ハイブ>挿入上書きテーブルtest6select* from test;

    bash> hadoop fs -cat / user / hive / Warehouse / test6 / 000000_0

    1

    2

    0

    bash> hadoop fs -cat / user / hive / Warehouse / test6 / 000001_0

    3

    4

    5

    bash> hadoop fs -cat / user / hive / Warehouse / test6 / 000002_0

    7

    6

    8

    bash> hadoop fs -cat / user / hive / Warehouse / test6 / 000003_0

    9

于 2012-12-05T09:25:11.200 に答える
10

CLUSTERBYはグローバル順序を生成しません。

受け入れられた回答(Lars Yenckenによる)は、レデューサーが重複しない範囲を受け取ると述べて誤解を招きます。Anton ZaviriukhinがBucketedTablesのドキュメントを正しく指しているため、CLUSTER BYは基本的にDISTRIBUTEBY(バケット化と同じ)と各バケット/リデューサー内のSORTBYです。また、DISTRIBUTE BYは、単純にハッシュとmodをバケットに入れます。ハッシュ関数順序を保持できますが(i> jの場合はiのハッシュ>jのハッシュ)、ハッシュ値のmodは保持しません。

重複する範囲を示すより良い例を次に示します

http://myitlearnings.com/bucketing-in-hive/

于 2016-08-04T15:59:38.757 に答える
5

私が理解しているように、短い答えは「いいえ」です。範囲が重複します。

SortByドキュメントから:「ClusterByは、DistributeByとSortByの両方のショートカットです。」「同じDistributeBy列を持つすべての行は、同じレデューサーに移動します。」ただし、重複しない範囲を保証して配布するという情報はありません。

さらに、DDL BucketedTablesのドキュメントから:「Hiveはどのようにバケット間で行を分散しますか?一般に、バケット番号は式hash_function(bucketing_column)modnum_bucketsによって決定されます。」Cluster by in Selectステートメントは、バケット化されたテーブルにデータを取り込むことが主な用途であるため、同じ原則を使用してレデューサー間で行を分散すると思います。

1つの整数列「a」を持つテーブルを作成し、そこに0から9までの数値を挿入しました。

次に、レデューサーの数を2に設定しまし set mapred.reduce.tasks = 2;た。

そしてselect、このテーブルのデータとCluster byselect * from my_tab cluster by a;

そして、私が期待した結果を受け取りました:

0
2
4
6
8
1
3
5
7
9

したがって、最初のレデューサー(番号0)は偶数になります(モード2は0を与えるため)

そして2番目のレデューサー(番号1)は奇数を取得しました(モード2は1を与えるため)

これが「DistributeBy」の仕組みです。

次に、「並べ替え」は各レデューサー内の結果を並べ替えます。

于 2014-07-25T15:43:44.633 に答える
2

ユースケース:大きなデータセットがある場合は、並べ替えのように並べ替えを行う必要があります。すべてのセットレデューサーは、一緒にクラブする前にデータを内部で並べ替えます。これにより、パフォーマンスが向上します。Order byの場合、すべてのデータが単一のレデューサーを通過するため、より大きなデータセットのパフォーマンスが低下します。これにより、負荷が増加し、クエリの実行に時間がかかります。以下の11ノードクラスターの例を参照してください。 ここに画像の説明を入力してください

これはOrderByの例の出力ですここに画像の説明を入力してください

これは並べ替えの例の出力ですここに画像の説明を入力してください

これはクラスター例ですここに画像の説明を入力してください

私が観察したのは、ソート、クラスター、ディストリビューションの数字同じですが、内部メカニズムは異なります。DISTRIBUTE BYの場合:同じ列の行が1つのレデューサーに送られます。DISTRIBUTE BY(City)-1列のバンガロールデータ、1つのレデューサーのデリーデータ:

ここに画像の説明を入力してください

于 2019-03-11T07:52:55.750 に答える
0

Cluster byは、グローバルではなく、レデューサーごとの並べ替えです。多くの本でも、それは間違ってまたは紛らわしく言及されています。これは、各部門を特定のレデューサーに分散してから、各部門の従業員名で並べ替え、クラスターを使用する部門の順序を気にせず、ワークロードがレデューサー間で分散されるため、パフォーマンスが向上する場合に特に使用されます。 。

于 2017-04-19T14:25:56.350 に答える
0

SortBy:範囲が重複するN個以上の並べ替えられたファイル。

OrderBy:単一の出力、つまり完全に順序付けられています。

Distribute By:Distribute By N個のレデューサーのそれぞれを保護することにより、列の重複しない範囲を取得しますが、各レデューサーの出力をソートしません。

詳細については、http://commandstech.com/hive-sortby-vs-orderby-vs-distributeby-vs-clusterby/

ClusterBy:上記と同じ例を参照してください。ClusterByxを使用すると、2つのレデューサーがx上の行をさらに並べ替えます。

于 2019-02-02T16:17:36.940 に答える
0

正しく理解できたら

1.並べ替え-レデューサー内のデータのみを並べ替えます

2.order by-データセット全体を単一のレデューサーにプッシュすることにより、グローバルに注文します。大量のデータ(スキュー)がある場合、このプロセスには多くの時間がかかります。

  1. cluster by-キーハッシュによってレデューサーにインテリジェントにデータを分散し、並べ替えを行いますが、グローバルな順序付けは許可されません。1つのキー(k1)を2つのレデューサーに配置できます。最初のレデューサーは10KK1データを取得し、2番目のレデューサーは1Kk1データを取得する可能性があります。
于 2021-07-29T05:37:06.830 に答える