1

SQLAlchemy を使用してアクセスしている SQLite データベースの約 200,000 エントリに対してテキスト処理を実行しようとしています。並列化したいのですが (Parallel Python を見ています)、正確な方法がわかりません。

エントリが処理されるたびにセッションをコミットしたいので、スクリプトを停止する必要がある場合でも、既に行った作業が失われることはありません。ただし、 session.commit() コマンドをコールバック関数に渡そうとすると、うまくいかないようです。

from assignDB import *
from sqlalchemy.orm import sessionmaker
import pp, sys, fuzzy_substring

def matchIng(rawIng, ingreds):
maxScore = 0
choice = ""
for (ingred, parentIng) in ingreds.iteritems():
    score = len(ingred)/(fuzzy_substring(ingred,rawIng)+1)
    if score > maxScore:
        maxScore = score
        choice = ingred
        refIng = parentIng  
return (refIng, choice, maxScore)

def callbackFunc(match, session, inputTuple):
    print inputTuple
    match.refIng_id = inputTuple[0]
    match.refIng_name = inputTuple[1]
    match.matchScore = inputTuple[2]
    session.commit()

# tuple of all parallel python servers to connect with
ppservers = ()
#ppservers = ("10.0.0.1",)

if len(sys.argv) > 1:
    ncpus = int(sys.argv[1])
    # Creates jobserver with ncpus workers
    job_server = pp.Server(ncpus, ppservers=ppservers)
else:
    # Creates jobserver with automatically detected number of workers
    job_server = pp.Server(ppservers=ppservers)

print "Starting pp with", job_server.get_ncpus(), "workers"

ingreds = {}
for synonym, parentIng in session.query(IngSyn.synonym, IngSyn.parentIng): 
    ingreds[synonym] = parentIng

jobs = []
for match in session.query(Ingredient).filter(Ingredient.refIng_id == None):
    rawIng = match.ingredient
    jobs.append((match, job_server.submit(matchIng,(rawIng,ingreds),    (fuzzy_substring,),callback=callbackFunc,callbackargs=(match,session))))

セッションは からインポートされassignDBます。データベースを更新していないだけで、エラーは発生していません。

ご協力いただきありがとうございます。

更新 fuzzy_substring のコードは次のとおりです

def fuzzy_substring(needle, haystack):
    """Calculates the fuzzy match of needle in haystack,
    using a modified version of the Levenshtein distance
    algorithm.
    The function is modified from the levenshtein function
    in the bktree module by Adam Hupp"""
    m, n = len(needle), len(haystack)

    # base cases
    if m == 1:
        return not needle in haystack
    if not n:
        return m

    row1 = [0] * (n+1)
    for i in range(0,m):
        row2 = [i+1]
        for j in range(0,n):
            cost = ( needle[i] != haystack[j] )

            row2.append( min(row1[j+1]+1, # deletion
                               row2[j]+1, #insertion
                               row1[j]+cost) #substitution
                           )
        row1 = row2
    return min(row1)

ここから得たもの:Fuzzy Substring。私の場合、「針」は最大 8000 の可能な選択肢の 1 つですが、干し草の山は一致させようとしている生の文字列です。考えられるすべての「針」をループして、最高のスコアを持つものを選択します。

4

2 に答える 2

3

特定のコードを見なくても、次のように言えます。

  1. サーバーレス SQLite の使用と
  2. 並列処理による書き込みパフォーマンスの向上を求める

相容れない欲求です。SQLite FAQを引用します。

… ただし、クライアント/サーバー データベース エンジン (PostgreSQL、MySQL、Oracle など) は通常、より高いレベルの同時実行をサポートし、複数のプロセスが同じデータベースに同時に書き込むことを許可します。アクセスを調整するための適切に制御された単一のサーバー プロセスが常に存在するため、クライアント/サーバー データベースではこれが可能です。アプリケーションで多くの同時実行が必要な場合は、クライアント/サーバー データベースの使用を検討する必要があります。しかし、経験上、ほとんどのアプリケーションは、設計者が想像するよりもはるかに少ない並行性を必要とすることが示唆されています。…</p>

そしてそれは、SQLAlchemy が使用するゲーティングと順序付けがなくてもです。また、Parallel Python ジョブが完了したとしても、それがいつ完了するかはまったくわかりません。

私の提案:最初に正しく動作するようにしてから、最適化を探します。特に、ppたとえ完璧に機能していたとしても、秘密のソースがまったくあなたを買っていないかもしれない場合.

コメントに応じて追加:

マッチングがボトルネックである場合fuzzy_substring、それはデータベース アクセスから完全に切り離されているように見えるので、その点に留意する必要があります。何が行われているかを確認しなくても、シングルスレッド プログラミングを計算的に実行可能にするアルゴリズムの改善を行うことができるというのが、最初の前提として適切fuzzy_substringです。近似文字列マッチングは非常によく研究されている問題であり、適切なアルゴリズムを選択することは、多くの場合、「より多くのプロセッサを投入する」よりもはるかに優れています。

この意味では、よりクリーンなコードが得られ、問題のセグメント化と再アセンブルのオーバーヘッドを無駄にせず、より拡張可能でデバッグ可能なプログラムが最終的に得られます。

于 2012-07-15T10:05:26.320 に答える
0

@msw は問題の優れた概要を提供し、並列化について考える一般的な方法を提供しています。

これらのコメントにもかかわらず、これが私が最終的に取り組むことになったものです:

from assignDB import *
from sqlalchemy.orm import sessionmaker
import pp, sys, fuzzy_substring  

def matchIng(rawIng, ingreds):
    maxScore = 0
    choice = ""
    for (ingred, parentIng) in ingreds.iteritems():
        score = len(ingred)/(fuzzy_substring(ingred,rawIng)+1)
        if score > maxScore:
            maxScore = score
            choice = ingred
            refIng = parentIng  
    return (refIng, choice, maxScore)

# tuple of all parallel python servers to connect with
ppservers = ()
#ppservers = ("10.0.0.1",)

if len(sys.argv) > 1:
    ncpus = int(sys.argv[1])
    # Creates jobserver with ncpus workers
    job_server = pp.Server(ncpus, ppservers=ppservers)
else:
    # Creates jobserver with automatically detected number of workers
    job_server = pp.Server(ppservers=ppservers)

print "Starting pp with", job_server.get_ncpus(), "workers"

ingreds = {}
for synonym, parentIng in session.query(IngSyn.synonym, IngSyn.parentIng): 
    ingreds[synonym] = parentIng

rawIngredients = session.query(Ingredient).filter(Ingredient.refIng_id == None)
numIngredients = session.query(Ingredient).filter(Ingredient.refIng_id == None).count()
stepSize = 30

for i in range(0, numIngredients, stepSize):
    print i
    print numIngredients

    if i + stepSize > numIngredients:
        stop = numIngredients
    else:
        stop = i + stepSize

    jobs = []
    for match in rawIngredients[i:stop]:
        rawIng = match.ingredient
        jobs.append((match, job_server.submit(matchIng,(rawIng,ingreds),    (fuzzy_substring,))))

    job_server.wait()

    for match, job in jobs:
        inputTuple = job()
        print match.ingredient
        print inputTuple
        match.refIng_id = inputTuple[0]
        match.refIng_name = inputTuple[1]
        match.matchScore = inputTuple[2]
    session.commit()

基本的に、私は問題をチャンクに切り刻みました。30 個の部分文字列を並行して照合した後、結果が返され、データベースにコミットされます。30 という数字を任意に選択したので、その数を最適化することでメリットが得られる可能性があります。現在、プロセッサの 3 つ (!) のコアすべてを使用しているため、かなり高速化されたようです。

于 2012-07-15T18:49:27.583 に答える