csv ファイルがあります。私は pyspark で DataFrame(df) に変換します。いくつかの変換の後; df に列を追加したい。これは単純な行 ID (0 または 1 から N まで) である必要があります。
rdd で df を変換し、「zipwithindex」を使用します。結果のrddをdfに変換しました。このアプローチは機能しますが、250k のタスクが生成され、実行に多くの時間がかかります。実行時間を短縮する他の方法があるかどうか疑問に思っていました。
以下は私のコードのスニペットです。私が処理しているcsvファイルはBIGです。何十億もの行が含まれています。
debug_csv_rdd = (sc.textFile("debug.csv")
.filter(lambda x: x.find('header') == -1)
.map(lambda x : x.replace("NULL","0")).map(lambda p: p.split(','))
.map(lambda x:Row(c1=int(x[0]),c2=int(x[1]),c3=int(x[2]),c4=int(x[3]))))
debug_csv_df = sqlContext.createDataFrame(debug_csv_rdd)
debug_csv_df.registerTempTable("debug_csv_table")
sqlContext.cacheTable("debug_csv_table")
r0 = sqlContext.sql("SELECT c2 FROM debug_csv_table WHERE c1 = 'str'")
r0.registerTempTable("r0_table")
r0_1 = (r0.flatMap(lambda x:x)
.zipWithIndex()
.map(lambda x: Row(c1=x[0],id=int(x[1]))))
r0_df=sqlContext.createDataFrame(r0_2)
r0_df.show(10)