5

こんにちは、すべての Python Pandas 達人です。Python と並行して SQL を実行し、いくつかの Pandas データフレームを返す方法を探しています。MS SQL サーバー データベースに対して 4 つの SQL クエリを連続して実行する、以下のようなコードがあります。クエリのうち 2 つは、結果を取得するための IO (ネットワーク) 時間に対して実行時間がはるかに長いため、並列化するとコードの実行が最大 2 倍速くなると考えています。クエリを並行して実行する簡単な方法はありますか?

理想的には、プロジェクトのサブディレクトリにあるすべての *.sql ファイルを読み取ってから、クエリを起動して並列に実行し、4 つのデータフレームを使いやすい形式 (リスト?) で返し、さらに詳しく説明できるようにしたいと考えています。操作 (インデックス作成、結合、集計)。

前もって感謝します、ランドール

# imports
import ceODBC
import numpy as np
import pandas as pd
import pandas.io.sql as psql
from ConfigParser import ConfigParser  
import os
import glob

# db connection string
cnxn = 'DRIVER={SQL Server Native Client 11.0}; SERVER=<servername>; DATABASE=<dname>; Trusted_Connection=Yes'

# directories (also should be moved to config)
dataDir = os.getcwd() + '\\data\\'
sqlDir = os.getcwd() + '\\sql\\'

# read sql from external .sql files. Possible to read all *.sql files in a sql dir into a list (or other structure...)?
with open(sqlDir + 'q1.sql', 'r') as f: q1sql = f.read()
with open(sqlDir + 'q2.sql', 'r') as f: q2sql = f.read()
with open(sqlDir + 'q3.sql', 'r') as f: q3sql = f.read()
with open(sqlDir + 'q4.sql', 'r') as f: q4sql = f.read()

# Connect to db, run SQL, assign result into dataframe, close connection. 
cnxn = ceODBC.connect(cnxn)
cursor = cnxn.cursor()

# execute the queries and close the connection. Parallelize?
df1 = psql.frame_query(q1sql, cnxn)
df2 = psql.frame_query(q2sql, cnxn) 
df3 = psql.frame_query(q3sql, cnxn)
df4 = psql.frame_query(q4sql, cnxn) 

# close connection
cnxn.close()
4

1 に答える 1

3

N 個のスレッドで N 個の接続を使用します。次に、広告を結合して結果を処理します。

# imports
import ceODBC
import numpy as np
import pandas as pd
import pandas.io.sql as psql
from ConfigParser import ConfigParser  
import os
import glob
import threading
enter code here


# db connection string
cnxn_string = 'DRIVER={SQL Server Native Client 11.0}; SERVER=<servername>; DATABASE=<dname>; Trusted_Connection=Yes'

# directories (also should be moved to config)
dataDir = os.getcwd() + '\\data\\'
sqlDir = os.getcwd() + '\\sql\\'

#variable to store results
responses={}
responses_lock=threading.Lock()

maxconnections = 8
pool_sema = BoundedSemaphore(value=maxconnections)


def task(fname):

    with open(fname, 'r') as f: sql = f.read()

    # Connect to db, run SQL, assign result into dataframe, close connection. 
    # to limit connections on DB used semaphore
    pool_sema.acquire()
    cnxn = ceODBC.connect(cnxn_string)
    cursor = cnxn.cursor()
    # execute the queries and close the connection. Parallelize?
    df = psql.frame_query(sql, cnxn)
    # close connection
    cnxn.close()
    pool_sema.release()

    # to ensure that only one thread can modify global variable
    responses_lock.acquire()
    responses[fname] = df
    responses_lock.release()


pool = []

#find sql files and spawn theads
for fname im glob.glob( os.path.join(sqlDir,'*sql')):
    #create new thread with task
    thread = threading.Thread(target=task,args=(fname,))
    thread.daemon = True
    # store thread in pool 
    pool.append(thread)
    #thread started
    thread.start()

#wait for all threads tasks done
for thread in pool:
    thread.join()

# results of each execution stored in responses dict

各ファイルは個別のスレッドで実行されます。結果は 1 つの変数に格納されます。

withステートメントを含む関数と同等:

def task(fname):

    with open(fname, 'r') as f: sql = f.read()

    # Connect to db, run SQL, assign result into dataframe, close connection. 
    # to limit connections on DB used semaphore
    with pool_sema:
        cnxn = ceODBC.connect(cnxn_string)
        cursor = cnxn.cursor()
        # execute the queries and close the connection. Parallelize?
        df = psql.frame_query(sql, cnxn)
        # close connection
        cnxn.close()


    # to ensure that only one thread can modify global variable
    with responses_lock:
        responses[fname] = df

multiprocessing.Pool重いタスクを分散するのは簡単ですが、それ自体でより多くの IO 操作があります。

于 2013-07-28T22:51:39.987 に答える