2

ゴール:

  • スレッドまたはプロセスで SQLAlchemy を使用してデータベースで最大 40 個の巨大なクエリを実行し、対応する SQLA ResultProxiesを Queue.Queue に入れます ( multiprocessing.Managerによって処理されます)
  • 同時に、そのキューを消費する多数のプロセスを含む .csv ファイルに結果を書き込みます。

現在の状態:

  • クエリを実行してデータを書き込む QueryThread クラスと WriteThread クラス。クエリの実行には時間がかかるため、GIL がスレッドを処理する方法によるパフォーマンスの大幅な低下はありません。
  • 一方、ファイルの書き込みには永遠に時間がかかります。実際、元のアイデアは WriteThread クラスの複数のスレッドを実行することでしたが、最高のパフォーマンスは 1 つのスレッドで得られます。

したがって、マルチプロセッシングを使用するというアイデア。出力を同時に書き込み、CPU バウンドではなく I/O バウンドにしたいと考えています。

背景はさておき、ここに問題があります (これは基本的に設計の問題です)。マルチプロセッシング ライブラリは、オブジェクトをピクルしてから、生成された他のプロセスにデータをパイプすることによって機能します。しかし、WriteWorker プロセスで使用しようとしている ResultProxy オブジェクトと共有キューは pickle 化できないため、次のメッセージが表示されます (逐語的ではありませんが、十分に近いものです)。

pickle.PicklingError: Can't pickle object in WriteWorker.start()

親切な人々への質問は、この問題を回避する潜在的な設計パターンまたはアプローチに関するアイデアはありますか? これは単純で古典的な生産者と消費者の問題のように思えます。解決策は簡単だと思いますが、考えすぎているだけです。

ヘルプやフィードバックをいただければ幸いです。ありがとう :)

編集:ここに関連するコードのスニペットがあります。他に提供できるコンテキストがあれば教えてください

親クラスから:

#init manager and queues
self.manager = multiprocessing.Manager()
self.query_queue = self.manager.Queue()
self.write_queue = self.manager.Queue()


def _get_data(self):
    #spawn a pool of query processes, and pass them query queue instance
    for i in xrange(self.NUM_QUERY_THREADS):
        qt = QueryWorker.QueryWorker(self.query_queue, self.write_queue, self.config_values, self.args)
        qt.daemon = True
        # qt.setDaemon(True)
        qt.start()

    #populate query queue
    self.parse_sql_queries()

    #spawn a pool of writer processes, and pass them output queue instance
    for i in range(self.NUM_WRITE_THREADS):
        wt = WriteWorker.WriteWorker(self.write_queue, self.output_path, self.WRITE_BUFFER, self.output_dict)
        wt.daemon = True
        # wt.setDaemon(True)
        wt.start()

    #wait on the queues until everything has been processed
    self.query_queue.join()
    self.write_queue.join()

および QueryWorker クラスから:

def run(self):
    while True:
        #grabs host from query queue
        query_tupe = self.query_queue.get()
        table =  query_tupe[0]
        query = query_tupe[1]
        query_num = query_tupe[2]
        if query and table:
            #grab connection from pool, run the query
            connection = self.engine.connect()
            print 'Running query #' + str(query_num) + ': ' + table
            try:
                result = connection.execute(query)
            except:
                print 'Error while running query #' + str(query_num) + ': \n\t' + str(query) + '\nError: '  + str(sys.exc_info()[1])

            #place result handle tuple into out queue
            self.out_queue.put((table, result))

        #signals to queue job is done
        self.query_queue.task_done()
4

1 に答える 1

1

簡単な答えは、ResultsProxy を直接使用しないことです。代わりに、cursor.fetchall() または cursor.fetchmany(number_to_fetch) を使用して ResultsProxy からデータを取得し、そのデータをマルチプロセッシング キューに渡します。

于 2012-11-28T17:13:55.560 に答える