4

私の PySpark アプリケーションには、次の 2 つの RDD があります。

  • items - すべての有効なアイテムのアイテム ID とアイテム名が含まれます。約100000アイテム。

  • attributeTable - これには、フィールド ユーザー ID、アイテム ID、およびこの組み合わせの属性値がこの順序で含まれます。これらは、システム内のユーザーとアイテムの組み合わせごとに特定の属性です。このRDDには、数百から数千の行があります。

item RDD の有効なアイテム ID (または名前) に対応しない attributeTable RDD のすべての行を破棄したいと考えています。つまり、アイテム ID による半結合です。たとえば、これらが R データ フレームの場合、semi_join(attributeTable, items, by="itemID") を実行します。

最初に次のアプローチを試しましたが、これが戻るまでに永遠に時間がかかることがわかりました (PC 上の VM で実行されているローカルの Spark インストールで)。膨大な数の比較が含まれているため、当然のことです。

# Create a broadcast variable of all valid item IDs for doing filter in the drivers
validItemIDs = sc.broadcast(items.map(lambda (itemID, itemName): itemID)).collect())
attributeTable = attributeTable.filter(lambda (userID, itemID, attributes): itemID in set(validItemIDs.value))

少しいじった後、次のアプローチがかなり高速に機能することがわかりました(私のシステムでは1分程度)。

# Create a broadcast variable for item ID to item name mapping (dictionary) 
itemIdToNameMap = sc.broadcast(items.collectAsMap())

# From the attribute table, remove records that don't correspond to a valid item name.
# First go over all records in the table and add a dummy field indicating whether the item name is valid
# Then, filter out all rows with invalid names. Finally, remove the dummy field we added.
attributeTable = (attributeTable
                  .map(lambda (userID, itemID, attributes): (userID, itemID, attributes, itemIdToNameMap.value.get(itemID, 'Invalid')))
                  .filter(lambda (userID, itemID, attributes, itemName): itemName != 'Invalid')
                  .map(lambda (userID, itemID, attributes, itemName): (userID, itemID, attributes)))

これは私のアプリケーションでは十分に機能しますが、より汚い回避策のように感じられ、Spark でこれを行うための別のよりクリーンな、または慣用的に正しい (そしておそらくより効率的な) 方法が必要であると確信しています。何を提案しますか?私は Python と Spark の両方に慣れていないので、適切なリソースを教えていただければ、RTFM に関するアドバイスも役に立ちます。

私の Spark バージョンは 1.3.1 です。

4

2 に答える 2