3

PostgreSQL に同時にロードしたい多数のファイル (4000 以上) があります。それらを 4 つの異なるファイル リストに分けました。データをロードする各リストをスレッドで反復処理する必要があります。

私が抱えている問題は、 os.system を使用してロードプログラムを呼び出すことですが、これにより他のスレッドが同時に実行されなくなります。subprocess.Popen を使用すると、それらは同時に実行されますが、スレッドは実行が終了したと認識しているため、スクリプトの次の部分に進みます。

私はこれを正しい方法でやっていますか?または、スレッド内からサブプロセスを呼び出すより良い方法はありますか。

def thread1Load(self, thread1fileList):
    connectionstring = settings.connectionstring
    postgreshost = settings.postgreshost
    postgresdatabase = settings.postgresdatabase
    postgresport = settings.postgresport
    postgresusername = settings.postgresusername
    postgrespassword = settings.postgrespassword

    tablename = None
    encoding = None
    connection = psycopg2.connect(connectionstring)

    for filename in thread1fileList:
        load_cmd = #load command
        run = subprocess.Popen(load_cmd, shell=True)
    print "finished loading thread 1"


def thread2Load(self, thread2fileList):
    connectionstring = settings.connectionstring
    postgreshost = settings.postgreshost
    postgresdatabase = settings.postgresdatabase
    postgresport = settings.postgresport
    postgresusername = settings.postgresusername
    postgrespassword = settings.postgrespassword

    tablename = None

    connection = psycopg2.connect(connectionstring)
    for filename in thread2fileList:
        load_cmd = #load command            
        run = subprocess.Popen(load_cmd, shell=True)
    print "finished loading thread 2"


def thread3Load(self, thread3fileList):
    connectionstring = settings.connectionstring
    postgreshost = settings.postgreshost
    postgresdatabase = settings.postgresdatabase
    postgresport = settings.postgresport
    postgresusername = settings.postgresusername
    postgrespassword = settings.postgrespassword

    tablename = None
    connection = psycopg2.connect(connectionstring)

    for shapefilename in thread3fileList:
        load_cmd = #load command
        run = subprocess.Popen(load_cmd, shell=True)
    print "finished loading thread 3"

def thread4Load(self, thread4fileList):
    connectionstring = settings.connectionstring
    postgreshost = settings.postgreshost
    postgresdatabase = settings.postgresdatabase
    postgresport = settings.postgresport
    postgresusername = settings.postgresusername
    postgrespassword = settings.postgrespassword

    tablename = None

    connection = psycopg2.connect(connectionstring)

    for filename in thread4fileList:
        load_cmd = #load command
        run = subprocess.Popen(load_cmd, shell=True)

    print "finished loading thread 4"


def finishUp(self):
    print 'finishing up'


def main():
load = Loader()

thread1 = threading.Thread(target=(load.thread1Load), args=(thread1fileList, ))
thread2 = threading.Thread(target=(load.thread2Load), args=(thread2fileList, ))
thread3 = threading.Thread(target=(load.thread3Load), args=(thread3fileList, ))
thread4 = threading.Thread(target=(load.thread4Load), args=(thread4fileList, ))
threads = [thread1, thread2, thread3, thread4]
for thread in threads:
    thread.start()
    thread.join()


load.finishUp(connectionstring)

if __name__ == '__main__':
main()
4

1 に答える 1

7
  • 繰り返さないでください。1 つのthreadLoad方法で十分です。そうすれば、メソッド内の何かを変更する必要がある場合でも、4 つの異なる場所で同じ変更を行う必要はありません。
  • run.communicate()サブプロセスが完了するまでブロックするために使用します。
  • これにより、1 つのスレッドが開始され、そのスレッドが終了するまでブロックされてから、別のスレッドが開始されます。

    for thread in threads:
        thread.start()
        thread.join()
    

    代わりに、最初にすべてのスレッドを開始してから、すべてのスレッドに参加します。

    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    

import subprocess
import threading


class Loader(object):
    def threadLoad(self, threadfileList):
        connectionstring = settings.connectionstring
        ...
        connection = psycopg2.connect(connectionstring)

        for filename in threadfileList:
            load_cmd =  # load command
            run = subprocess.Popen(load_cmd, shell=True)
            # block until subprocess is done
            run.communicate()
        name = threading.current_thread().name
        print "finished loading {n}".format(n=name)

    def finishUp(self):
        print 'finishing up'


def main():
    load = Loader()
    threads = [threading.Thread(target=load.threadLoad, args=(fileList, ))
               for fileList in (thread1fileList, thread2fileList,
                                thread3fileList, thread4fileList)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()

    load.finishUp(connectionstring)

if __name__ == '__main__':
    main()
于 2013-03-27T17:42:22.660 に答える