42

私のアプリケーションは、スコープセッションとSQLALchemyの宣言型スタイルを使用しています。これはWebアプリであり、多くのDB挿入はCeleryタスクスケジューラであるによって実行されます。

通常、オブジェクトを挿入することを決定するとき、私のコードは次の行に沿って何かをするかもしれません:

from schema import Session
from schema.models import Bike

pk = 123 # primary key
bike = Session.query(Bike).filter_by(bike_id=pk).first()
if not bike: # no bike in DB
    new_bike = Bike(pk, "shiny", "bike")
    Session.add(new_bike)
    Session.commit()

ここでの問題は、これの多くが非同期ワーカーによって行われるため、ある作業がBikewithを挿入しても途中で動作しid=123、別の作業がその存在を確認している可能性があることです。この場合、2番目のワーカーは同じ主キーを持つ行を挿入しようとし、SQLAlchemyは。を生成しIntegrityErrorます。

私は私の人生のために交換することを除いてこの問題に対処するための良い方法を見つけることができませんSession.commit()

'''schema/__init__.py'''
from sqlalchemy.orm import scoped_session, sessionmaker
Session = scoped_session(sessionmaker())

def commit(ignore=False):
    try:
        Session.commit()
    except IntegrityError as e:
        reason = e.message
        logger.warning(reason)

        if not ignore:
            raise e

        if "Duplicate entry" in reason:
            logger.info("%s already in table." % e.params[0])
            Session.rollback()

そして、私が持っているすべての場所にSession.commitschema.commit(ignore=True)行が再び挿入されなくてもかまわない場所があります。

文字列チェックのため、これは非常に脆弱に思えます。参考までに、IntegrityErrorを上げると次のようになります。

(IntegrityError) (1062, "Duplicate entry '123' for key 'PRIMARY'")

ですからもちろん、私が挿入していた主キーは次のようなものでした。主キーが重複しているために実際にはそうではなかったものをDuplicate entry is a cool thing見逃す可能性があると思います。IntegrityError

私が使用しているクリーンなSQLAlchemyアプローチを維持するより良いアプローチはありますか(文字列などでステートメントを書き始めるのとは対照的です。..)

DbはMySQLです(ユニットテストではSQLiteを使用するのが好きで、新しいアプローチでその機能を妨げたくありません)。

乾杯!

4

6 に答える 6

38

session.merge(bike)の代わりにを使用するとsession.add(bike)、主キー エラーは生成されません。はbike、必要に応じて取得および更新または作成されます。

于 2012-07-23T21:22:35.863 に答える
10

すべてIntegrityError同じ方法で処理する必要があります。トランザクションをロールバックし、必要に応じて再試行してください。一部のデータベースでは、IntegrityError. 競合する 2 つのトランザクションの開始時に、テーブルのロック、またはデータベースで許可されている場合はより細かいロックを取得することもできます。

ステートメントを使用してwithトランザクションを明示的に開始し、自動的にコミット (または例外でロールバック) します。

from schema import Session
from schema.models import Bike

session = Session()
with session.begin():
    pk = 123 # primary key
    bike = session.query(Bike).filter_by(bike_id=pk).first()
    if not bike: # no bike in DB
        new_bike = Bike(pk, "shiny", "bike")
        session.add(new_bike)
于 2012-04-25T20:13:51.903 に答える
4

以下のコードを使用する必要がある代わりにsession.add(obj)、これははるかにクリーンになり、言及したようなカスタム コミット関数を使用する必要はありません。ただし、これは競合を無視しますが、重複キーだけでなく他のものも無視します。

mysql:

 self.session.execute(insert(self.table, values=values, prefixes=['IGNORE']))

sqlite

self.session.execute(insert(self.table, values=values, prefixes=['OR IGNORE']))
于 2015-09-19T18:54:12.970 に答える
3

ここでの主キーは何らかの形で自然なものであると想定しているため、通常の自動インクリメント手法に頼ることはできません。したがって、問題が実際には挿入する必要がある一意の列の 1 つであるとしましょう。これはより一般的です。

「挿入を試み、失敗時に部分的にロールバックする」ことが必要な場合は、SAVEPOINT を使用します。SQLAlchemy では begin_nested() です。次の rollback() または commit() は、その SAVEPOINT に対してのみ作用し、進行中のより大きなスパンでは作用しません。

ただし、全体として、ここでのパターンは本当に避けるべきパターンの 1 つにすぎません。ここで本当にやりたいことは、3 つのことのうちの 1 つです。1. 挿入する必要がある同じキーを処理する同時ジョブを実行しないでください。2. 作業中の同時キーで何らかの方法でジョブを同期し、3. 共通のサービスを使用して、この特定のタイプの新しいレコードを生成し、ジョブで共有します (または、ジョブが実行される前にそれらがすべて設定されていることを確認します)。

考えてみれば、#2 はいずれにしても高度な分離で行われます。2 つの postgres セッションを開始します。セッション 1:

test=> create table foo(id integer primary key);
NOTICE:  CREATE TABLE / PRIMARY KEY will create implicit index "foo_pkey" for table "foo"
CREATE TABLE
test=> begin;
BEGIN
test=> insert into foo (id) values (1);

セッション 2:

test=> begin;
BEGIN
test=> insert into foo(id) values(1);

PK #1 の行がロックされているため、セッション 2 がブロックされます。MySQL がこれを行うほどスマートかどうかはわかりませんが、それは正しい動作です。OTOH が別の PK を挿入しようとした場合:

^CCancel request sent
ERROR:  canceling statement due to user request
test=> rollback;
ROLLBACK
test=> begin;
BEGIN
test=> insert into foo(id) values(2);
INSERT 0 1
test=> \q

ブロックすることなく問題なく進行します。

要点は、この種の PK/UQ 競合を行っている場合、セロリ タスクはとにかく自分自身をシリアル化するか、少なくともシリアル化する必要があるということです。

于 2012-05-04T01:52:46.417 に答える