2

プロセスを並行して実行していますが、書き込み先の CPU プロセスごとにデータベースを作成する必要があります。各サーバーに割り当てられたCPUと同じ数のデータベースのみが必要なので、後でマージできる3つのデータベースに100のジョブが書き込まれます。

各ワーカーを識別できるワーカー ID 番号またはコア ID はありますか?

def workerProcess(job):
  if workerDBexist(r'c:\temp\db\' + workerid):
    ##processjob into this database
  else:
    makeDB(r'c:\temp\db\' + workerid)
    ##first time this 'worker/ core' used, make DB then process

import pp
ppservers = ()
ncpus = 3
job_server = pp.Server(ncpus, ppservers=ppservers)

for work in 100WorkItems:
  job_server.submit(workerProcess, (work,))
4

1 に答える 1

2

私の知る限りpp、その API にはそのような機能はありません。

代わりに stdlib モジュールを使用すると、作業がずっと簡単になります。たとえば、各プロセスのデータベースを初期化するために使用できる引数multiprocessing.Poolを取り、initializer各タスクが使用できる変数として使用できます。

ただし、比較的簡単な回避策があります。

各プロセスには、(少なくとも実行中は) 一意のプロセス ID があります。* Python では、現在のプロセスのプロセス ID にos.getpid(). したがって、各タスクでは、次のようなことができます。

dbname = 'database{}'.format(os.getpid())

次にdbname、データベースを開く/作成するために使用します。dbm「データベース」とは、ファイル、sqlite3ファイル、MySQL サーバー上のデータベース、または何を意味するのかわかりません。たとえば、親で a を作成し、tempfile.TemporaryDirectoryそれをすべての子に渡し、それらos.path.joinを dbname にする必要がある場合があります (したがって、すべての子が完了したら、すべてを で取得できますos.listdir(the_temp_dir))。


これに関する問題pp.Serverは、プロセスの 1 つを再起動すると、データベースが 3 つではなく 4 つになることです。おそらく大した問題ではありませんが、コードでその可能性に対処する必要があります。(通常、IIRC はpp.Serverを渡さない限りプロセスを再起動しませんrestart=True、たとえばプロセスの 1 つがクラッシュした場合は再起動する場合があります。)

しかし、3 つのプロセスのプールを使用するのではなく、実際に各タスクをまったく新しいプロセスで実行している場合はどうなるでしょうか? そうなると、プロセスと同じ数のデータベースが作成されることになりますが、これはおそらく望んでいないことです。ここでの本当の問題は、3 つのプロセスのプールを使用していないことです。これを修正する必要があります。しかし、あなたが望むものを手に入れることができる他の方法はありますか? 多分。

たとえば、おそらくロックファイルとして、データベースごとに 1 つずつ、合計 3 つのロックを作成したとします。次に、各タスクは次の擬似コードを実行できます。

for i, lockfile in enumerate(lockfiles):
    try:
        with lockfile:
            do stuff with databases[i]
            break
    except AlreadyLockedError:
        pass
else:
    assert False, "oops, couldn't get any of the locks"

実際にデータベース自体を (群れや、関連するデータベースの API などを使用して) ロックできる場合は、さらに簡単です: そのうちの 1 つが成功するまで順番に接続を試みてください。

あなたのコードが実際に segfaulting などをしていない限り**、実際に一度に 3 つ以上のタスクを実行していなければ、3 つのロックファイルすべてをロックする方法はありません。


* これは正確ではありませんが、目的には十分です。たとえば、Windows では、各プロセスには一意HANDLEの があり、要求すると、まだ存在しない場合は生成されますpid一部の *nix では、各スレッドに固有のスレッドID があり、プロセスのpidスレッド ID は最初のスレッドのスレッド ID です。等々。しかし、コードからわかる限り、各プロセスには一意pidの があり、これが重要です。

** コードクラッシュしたとしても、それに対処することはできますが、それはもっと複雑です。たとえば、空のロックファイルの代わりに pidfile を使用します。pidfile の読み取りロックを取得してから、書き込みロックへのアップグレードを試みます。失敗した場合は、ファイルから pid を読み取り、そのようなプロセスが存在するかどうかを確認し (たとえば、*nix で発生した場合os.kill(pid, 0)、そのようなプロセスはありません)、存在する場合は強制的にロックを解除します。いずれにせよ、書き込みロックを取得したので、pid をファイルに書き込みます。

于 2013-12-18T23:01:22.893 に答える