PySparkでRDDから行を削除するにはどうすればよいですか? 特に最初の行は、データセットに列名が含まれる傾向があるためです。API を熟読しても、これを行う簡単な方法が見つからないようです。もちろん、Bash / HDFS 経由でこれを行うこともできますが、PySpark 内からこれを行うことができるかどうかを知りたいだけです。
6 に答える
私の知る限り、これを行う「簡単な」方法はありません。
ただし、これでうまくいくはずです:
val header = data.first
val rows = data.filter(line => line != header)
Python 3 を使用していると仮定して、PySpark (Python API) でこれを実現する簡単な方法:
noHeaderRDD = rawRDD.zipWithIndex().filter(lambda row_index: row_index[1] > 0).keys()
私はさまざまなソリューションでいくつかのプロファイリングを行い、次のものを持っています
クラスタ構成
クラスター
- クラスター 1: 4 コア 16 GB
- クラスター 2 : 4 コア 16 GB
- クラスター 3 : 4 コア 16 GB
- クラスタ 4 : 2 コア 8 GB
データ
700 万行、4 列
#Solution 1
# Time Taken : 40 ms
data=sc.TextFile('file1.txt')
firstRow=data.first()
data=data.filter(lambda row:row != firstRow)
#Solution 2
# Time Taken : 3 seconds
data=sc.TextFile('file1.txt')
def dropFirstRow(index,iterator):
return iter(list(iterator)[1:]) if index==0 else iterator
data=data.mapPartitionsWithIndex(dropFirstRow)
#Solution 3
# Time Taken : 0.3 seconds
data=sc.TextFile('file1.txt')
def dropFirstRow(index,iterator):
if(index==0):
for subIndex,item in enumerate(iterator):
if subIndex > 0:
yield item
else:
yield iterator
data=data.mapPartitionsWithIndex(dropFirstRow)
ソリューション 3 が最もスケーラブルだと思います
個人的には、フィルターを使用してこのようなものを取り除くのが最も簡単な方法だと思います. しかし、あなたのコメントによると、別のアプローチがあります。各パーティションが配列になるようにRDDをグロムし(パーティションごとに1つのファイルがあり、各ファイルには問題のある行が一番上にあると仮定しています)、最初の要素をスキップします(これはscala APIを使用しています)。
data.glom().map(x => for (elem <- x.drop(1){/*do stuff*/}) //x is an array so just skip the 0th index
RDD の大きな特徴の 1 つは不変であることを覚えておいてください。
更新:
より良い解決策。
rdd.mapPartions(x => for (elem <- x.drop(1){/*do stuff*/} )
グロムと同じですが、この場合 x は反復子であるため、すべてを配列に入れるオーバーヘッドはありません。