1

私はHiveでPyspark 1.2.1を使用しています。(アップグレードはすぐには行われません)。

私が抱えている問題は、Hive テーブルから選択してインデックスを追加すると、pyspark が long 値を int に変更するため、Long 型の列を持つが Integer 型の値を持つ一時テーブルになってしまうことです。(以下のコードを参照)。

私の質問は次のとおりです。(a) long を int に変更せずにインデックスのマージ (コードを参照) を実行するにはどうすればよいですか。または(b)問題を回避する他の方法でインデックスを追加します。または(c)結合する必要なくテーブルの列をランダム化しますか?

私が解決しようとしている根本的な問題は、ハイブ テーブル内の特定の列の順序をランダム化し、それを新しいテーブルに書き込みたいということです。これは、データが個人を特定できないようにするためです。元のテーブルとランダム化された列に増分インデックスを追加し、そのインデックスに結合することでそれを行っています。

テーブルは次のようになります。

primary | longcolumn | randomisecolumn

コードは次のとおりです。

hc = HiveContext(sc)
orig = hc.sql('select * from mytable')
widx = orig.zipWithIndex().map(merge_index_on_row)
sql_context.applySchema(widx, add_index_schema(orig.schema()))
        .registerTempTable('sani_first')

# At this point sani_first has a column longcolumn with type long,
# but (many of) the values are ints

def merge_index_on_row((row, idx), idx_name=INDEX_COL):
    """
    Row is a SchemaRDD row object; idx is an integer;
    schema is the schema for row with an added index col at the end
    returns a version of row applying schema and holding the index in the new row
    """
    as_dict = row.asDict()
    as_dict[idx_name] = idx
    return Row(**as_dict)

def add_index_schema(schema):
    """
    Take a schema, add a column for an index, return the new schema
    """
    return StructType(sorted(schema.fields + [StructField(INDEX_COL, IntegerType(), False)],key=lambda x:x.name))

より良い解決策がない場合、影響を受ける列を Python コードで long 型に強制します。これは...良くない。

4

0 に答える 0