7

複合キーと値の形式のタプルがたくさんあります。例えば、

tfile.collect() = [(('id1','pd1','t1'),5.0), 
     (('id2','pd2','t2'),6.0),
     (('id1','pd1','t2'),7.5),
     (('id1','pd1','t3'),8.1)  ]

id[1..n] または pd[1..n] に基づいて情報を集約できる、このコレクションに対して sql のような操作を実行したいと考えています。SQLContext を使用せずに、バニラの pyspark API を使用して実装したいと考えています。私の現在の実装では、一連のファイルから読み取り、RDD をマージしています。

def readfile():
    fr = range(6,23)
    tfile = sc.union([sc.textFile(basepath+str(f)+".txt")
                        .map(lambda view: set_feature(view,f)) 
                        .reduceByKey(lambda a, b: a+b)
                        for f in fr])
    return tfile

集約された配列を値として作成するつもりです。例えば、

agg_tfile = [((id1,pd1),[5.0,7.5,8.1])]

ここで、5.0,7.5,8.1 は [t1,t2,t3] を表します。私は現在、辞書を使用してバニラのpythonコードで同じことを達成しています。小さいデータセットでは問題なく機能します。しかし、これはより大きなデータセットに対応できない可能性があるため、心配しています。pyspark apis を使用して同じことを達成する効率的な方法はありますか?

4

2 に答える 2

13

私の推測では、複数のフィールドに従ってデータを転置したいと考えています。

簡単な方法は、グループ化するターゲット フィールドを連結し、それをペアの RDD のキーにすることです。例えば:

lines = sc.parallelize(['id1,pd1,t1,5.0', 'id2,pd2,t2,6.0', 'id1,pd1,t2,7.5', 'id1,pd1,t3,8.1'])
rdd = lines.map(lambda x: x.split(',')).map(lambda x: (x[0] + ', ' + x[1], x[3])).reduceByKey(lambda a, b: a + ', ' + b)
print rdd.collect()

次に、転置された結果が得られます。

[('id1, pd1', '5.0, 7.5, 8.1'), ('id2, pd2', '6.0')]
于 2015-04-08T13:10:25.063 に答える