1

私は定期的にmysqlデータベースにデータを照会するpythonスクリプトを持っています(sqlalchemy 0.7.4を使用)。これは、ストアド プロシージャを実行することによって行われます。プロシージャが何かを返す場合、スクリプトはデータを処理しようとし (この部分は DB とは関係ありません)、2 番目のプロシージャを使用して結果を保存します。

この後、一定時間 (通常は 1 分間) スリープ状態になり、停止するまで再びスリープ状態になります。数週間実行できる必要があります。

「無効なトランザクションがロールバックされるまで再接続できません」というエラーがよく発生します。これについて見つけることができるあらゆる種類の情報を使用していくつかの変更を加えましたが、これが私が望むものを達成するための良い方法であるかどうか疑問に思っています:

from sqlalchemy import create_engine, exc
from sqlalchemy.orm import sessionmaker
from sqlalchemy import text, func
import time

class StoredProcedures():
    _engine = None
    _connection = None
    _session = None

    def __init__(self, cs):
        self._engine = create_engine(cs, encoding='utf-8', echo=False, pool_recycle=600)
        self._connection = self._engine.connect()
        Session = sessionmaker(bind=self._engine)
        self._session = Session()

    def sp_test_1(self, user_id):
        t = self._session.begin(subtransactions=True)

        try:
            query = 'call sp_get_files(%d)'%user_id
            result = self._session.execute(query).fetchall()
            t.close()
            return result
        except exc.DBAPIError, e: #Proper way of reconnecting?
            t.rollback()
            time.sleep(5)
            self._connection = self._engine.connect()
            Session = sessionmaker(bind=self._engine)
            self._session = Session()
        except:
            t.rollback()

        return None


cs = "mysql://test:test@127.0.0.1/test_db"
db_stored_procedures = StoredProcedures(cs)

while (True):
    files = db_stored_procedures.sp_test_1(1)
    if len(files) > 0:
        print "This is where processing happens"
        #And this is where the second procedure would be called to store the results
    time.sleep(15)

私はこれをテストしましたが、ほとんど書いたばかりなので、長期的なテストは行っていません。まずはご意見をいただきたいです。

EDIT: もともと私は接続を使用してクエリを実行しました(上記と同じスクリプトのほとんどを省略しました):

def sp_test_1(self, user_id):
    t = self._connection.begin()

    try:
        query = 'call sp_get_files(%d)'%user_id
        result = self._connection.execute(query).fetchall()
        t.close()
        return result
    except exc.DBAPIError, e:
        #same as above
    except:
        t.rollback()

    return None
4

1 に答える 1

0

内部で Transaction オブジェクトを使用する Session インターフェイスを使用しているため、独自のトランザクション管理を行う必要はないと思います。

シンプルなもの以外はあまり必要ないと思います:

def sp_test_1(self, user_id):

    query = 'call sp_get_files(%d)'%user_id
    result = self._session.execute(query).fetchall()
    return result

それが同じ例外を生成する場合は、完全なスタック トレースを投稿すると便利です。例外は友人であり、敵ではありません。:)

于 2013-02-19T13:19:23.893 に答える