12

非常に簡単なように思われるものを実装するのに苦労しています:

私の目標は、2 番目の RDD/データフレームをルックアップ テーブルまたは翻訳辞書として使用して、RDD/データフレームで翻訳を行うことです。これらの翻訳を複数の列で行いたいです。

問題を説明する最も簡単な方法は、例です。入力として次の 2 つの RDD があるとします。

Route SourceCityID DestinationCityID
A     1            2
B     1            3
C     2            1

CityID CityName
1      London
2      Paris
3      Tokyo

私の希望する出力RDDは次のとおりです。

Route SourceCity DestinationCity
A     London     Paris
B     London     Tokyo
C     Paris      London

それを生産するにはどうすればよいですか?

これは SQL では簡単な問題ですが、Spark の RDD を使用した明白な解決策は知りません。joincogroupなどのメソッドは、複数列の RDD には適していないようで、結合する列を指定できません。

何か案は?SQLContext が答えですか?

4

2 に答える 2

7

rdd 方法:

routes = sc.parallelize([("A", 1, 2),("B", 1, 3), ("C", 2, 1) ])
cities = sc.parallelize([(1, "London"),(2, "Paris"), (3, "Tokyo")])


print routes.map(lambda x: (x[1], (x[0], x[2]))).join(cities) \
.map(lambda x: (x[1][0][1], (x[1][0][0], x[1][1]))).join(cities). \
map(lambda x: (x[1][0][0], x[1][0][1], x[1][1])).collect()

どちらが印刷されますか:

[('C', 'Paris', 'London'), ('A', 'London', 'Paris'), ('B', 'London', 'Tokyo')]

そしてSQLContextの方法:

from pyspark.sql import HiveContext
from pyspark.sql import SQLContext

df_routes = sqlContext.createDataFrame(\
routes, ["Route", "SourceCityID", "DestinationCityID"])
df_cities = sqlContext.createDataFrame(\
cities, ["CityID", "CityName"])

temp =  df_routes.join(df_cities, df_routes.SourceCityID == df_cities.CityID) \
.select("Route", "DestinationCityID", "CityName")
.withColumnRenamed("CityName", "SourceCity")

print temp.join(df_cities, temp.DestinationCityID == df_cities.CityID) \
.select("Route", "SourceCity", "CityName")
.withColumnRenamed("CityName", "DestinationCity").collect()

どちらが印刷されますか:

[Row(Route=u'C', SourceCity=u'Paris', DestinationCity=u'London'),
Row(Route=u'A', SourceCity=u'London', DestinationCity=u'Paris'),
Row(Route=u'B', SourceCity=u'London', DestinationCity=u'Tokyo')]
于 2015-10-13T08:32:44.207 に答える