概要:
財務データ分析プロジェクト用に多数の変数 (幅広いデータセット) を作成しようとしています。140,000 行の pandas データフレーム「position_history」があり、それぞれに株と売買の日付/価格が含まれています。
入力 (stock、buy_date、sell_date) を受け取る関数 create_domain があります。
- 私のSQLite3データベースにクエリを作成して、それらの日付を指定してその株の時系列を抽出します。
- 時系列を使用して変数を構築します
df.apply を使用して、関数 create_domain を position_history に適用します
コードを順番に実行すると、変数を構築するのに約 4 時間かかります。これを何度も行う必要があり、より広いデータセットが必要になる可能性があるため、複数のプロセスを使用してこれを高速化したいと考えています。
複数のプロセスの場合、position_history を縦にチャンクに分割し、データフレームのリストを作成します。このリストを joblib (マルチプロセッシング) に渡します。私のコードはほとんどの場合、エラーをスローすることなく無期限にハングします (ただし、小さなサンプルで実行される場合もあります)。
ワーカー プロセスが同じ SQL テーブルを同時に読み込もうとしているために問題が発生しているのではないかと疑っています。
私は次の救済策を試しました:
関数 create_domain 内で新しい接続 (sqlalchemy.create_engine) を開いたので、各ワーカー プロセスは独自のエンジン/接続を取得します。
sqlalchemy のドキュメントに従ってください ( http://docs.sqlalchemy.org/en/latest/faq/connections.html#how-do-i-use-engines-connections-sessions-with-python-multiprocessing-or-os-fork )
- poolclass を NullPool に変更しました。これにより、1 つのエンジン内の複数の接続が無効になり、エンジンはトランザクションごとに新しい接続を強制的に開いたり閉じたりします。
- 念のため、各子に engine.dispose() を使用して、すべてのエンジン接続を確実にフラッシュし、新しい接続を取得しました。
- 注 - これらのソリューションは、sqlite3 がスレッド間で接続を共有しないようにすることを目的としており、これは機能しません (複数の Python プロセス間で SQLite データベースを共有するために SQLAlchemy を使用することは可能ですか? )
joblib の下 ( https://pythonhosted.org/joblib/parallel.html )
- 「マルチプロセッシング」の代わりに「スレッド」バックエンドを使用してみました。これは機能しましたが、コードの速度は大幅に向上しませんでした。ここでスレッド化について読んだことから(Multiprocessing vs Threading Python)、スレッド化では実際には複数のCPUを使用できないため、意味があります。
- pandasにはnumpyが含まれているため、memmapingテクニックを使用してみました。これは私にとって適切な修正ではなかったと思います
関連するスタックオーバーフロー エントリ:
- 説明したマルチスレッドではなく、複数のプロセスを使用したいと思います:
python multiple threaded processes for running executables ;
Python sqlite3 と同時実行性。
SQLite3 とマルチプロセッシング; *この最後の答えがわかりません
- 私は読み込もうとしていて、書き込もうとはしていないので、説明したように、SQLite ロック メカニズムの管理が必要かどうかはわかりません: SQLite は同時読み取りに適していますか? ; sqlite3 との同時書き込み
- この投稿では、理論的には機能するはずだと述べています。注 - 私は読んでいるだけなので、WAL モードは役に立たないと思います。
(擬似) コード スニペット
joblib への私の呼び出し:
x = Parallel(n_jobs =4)(delayed(create_domain)(chunk, other inputs) for chunk in chunks)
# where each chunk is a portion of the position_history df
私の create_domain 関数:
def create_domain (df=position_history, inputs):
# create vars using row x of position_history
f = lambda x: sql_query_and_create_vars(inputs, x['column'])
result = df.apply(f, axis=1)
return result
概要: コードが永久にハングアップし、カーネルがクラッシュし、エラーは発生しません。以下に関する洞察をいただければ幸いです。
- なぜこれが起こっているのですか?
- どうすれば修正できますか?
- 私がやろうとしていることを行うためのより良い方法はありますか? SQL クエリのベクトル化と最適化に最善を尽くしました。
- これは SQLite3 の問題ですか? MySQL のようなものがうまく機能しますか?
- 任意のヒント; コーディング/Python/データ サイエンスは初めてです。
詳細: 私はスーパー コンピューター クラスターで Linux を実行し、Python 3.4.3 で iPython を使用しています。
これは私の最初のスタック オーバーフローの質問です。