いくつかの計算を並列化しようとしていますが、私のバージョンの 1 つ (より高速であるはずだと思っていた) がなぜより遅いのか理解できません。
要するに、userId のリスト (200 前後) と placeId のリスト (200 万前後) があります。各ペアのユーザー/場所のスコアを計算する必要があります。良い点は、計算が互いに完全に独立していることです (アルゴリズムの実装方法によっては、返される結果さえ必要ありません)。
これには2つのアプローチを試しました。
最初のアプローチ
- メインスレッドのすべての場所とすべてのユーザーをプルします
すべてのユーザーをループし、x スレッドを生成します (私の場合、私の小さな macbook 8 が最適なようです)
with cf.ThreadPoolExecutor(max_workers=8) as executor: futures = [executor.submit(task,userId, placeIds) for userId in userIds]
すべての先物が完了したら、それらすべてをループし、結果をデータベースに挿入します (ワーカー タスクはリスト [userId, placeId, score] を返します)。
すべての場所をループして結果を返すタスクがあります
def task(userId, placeIds): connection = pool.getconn() cursor = conn.cursor() #loop through all the places and call makeCalculation(cur, userId, placeId) pool.putconn(conn) return results
この紳士淑女は、ユーザー/場所のすべてのセットを 10 分で計算します (ちなみに、シーケンシャルでは 1.30 時間ではありません :))
しかし、私は..スコア計算を並列化しないのはなぜですか? したがって、タスクが 2000 のすべての場所を 1 つずつループする必要がある代わりに、たとえば、他の 8 つのスレッドで計算を生成します。
2 番目のアプローチ:
基本的に、このアプローチは「タスク」関数のループを次のように置き換えています。
with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
futures = [ executor.submit(calculateScores,userId,placeId) for placeId in placeIds]
私がしなければならなかった他の変更は、calculateScores関数にあります
def calculateScores(userId,placeId):
**connection = pool.getconn()
cursor = connecton.cursor()**
...
make a bunch of calculation by calling the database 1 or 2 times
pool.putconn(conn)
return [userId, placeId, score]
ご覧のとおり、calculateScores 自体が 8 つの // スレッドになるため、データベース接続を共有できません。そうしないと、競合状態エラーが発生します (そして、スクリプトは 4 回中 3 回クラッシュします)。
このアプローチは、より高速になると思っていましたが、25分かかります.....(単純なforループでは10分ではなく...)
すべてのタスクがプールからデータベース接続を取得するようになり、これはどういうわけか非常に高価であるため、速度が低下するため、これが遅くなることは 90% 確信しています。
私のシナリオで並列化を最大限に活用するための最良の方法について誰かがアドバイスをくれますか?
タスクが結果を返すようにすることは良い考えですか? または、calculateScores関数で準備ができたらすぐにデータベースに挿入する必要がありますか?
ThreadPool 内に Threadpool を配置することをお勧めしますか?
マルチプロセスを実行する必要がありますか?
ありがとう!