私の PySpark アプリケーションには、次の 2 つの RDD があります。
items - すべての有効なアイテムのアイテム ID とアイテム名が含まれます。約100000アイテム。
attributeTable - これには、フィールド ユーザー ID、アイテム ID、およびこの組み合わせの属性値がこの順序で含まれます。これらは、システム内のユーザーとアイテムの組み合わせごとに特定の属性です。このRDDには、数百から数千の行があります。
item RDD の有効なアイテム ID (または名前) に対応しない attributeTable RDD のすべての行を破棄したいと考えています。つまり、アイテム ID による半結合です。たとえば、これらが R データ フレームの場合、semi_join(attributeTable, items, by="itemID") を実行します。
最初に次のアプローチを試しましたが、これが戻るまでに永遠に時間がかかることがわかりました (PC 上の VM で実行されているローカルの Spark インストールで)。膨大な数の比較が含まれているため、当然のことです。
# Create a broadcast variable of all valid item IDs for doing filter in the drivers
validItemIDs = sc.broadcast(items.map(lambda (itemID, itemName): itemID)).collect())
attributeTable = attributeTable.filter(lambda (userID, itemID, attributes): itemID in set(validItemIDs.value))
少しいじった後、次のアプローチがかなり高速に機能することがわかりました(私のシステムでは1分程度)。
# Create a broadcast variable for item ID to item name mapping (dictionary)
itemIdToNameMap = sc.broadcast(items.collectAsMap())
# From the attribute table, remove records that don't correspond to a valid item name.
# First go over all records in the table and add a dummy field indicating whether the item name is valid
# Then, filter out all rows with invalid names. Finally, remove the dummy field we added.
attributeTable = (attributeTable
.map(lambda (userID, itemID, attributes): (userID, itemID, attributes, itemIdToNameMap.value.get(itemID, 'Invalid')))
.filter(lambda (userID, itemID, attributes, itemName): itemName != 'Invalid')
.map(lambda (userID, itemID, attributes, itemName): (userID, itemID, attributes)))
これは私のアプリケーションでは十分に機能しますが、より汚い回避策のように感じられ、Spark でこれを行うための別のよりクリーンな、または慣用的に正しい (そしておそらくより効率的な) 方法が必要であると確信しています。何を提案しますか?私は Python と Spark の両方に慣れていないので、適切なリソースを教えていただければ、RTFM に関するアドバイスも役に立ちます。
私の Spark バージョンは 1.3.1 です。