Python/SQLAlchemy アプリのプロファイリングの経験がある人はいますか? また、ボトルネックや設計上の欠陥を見つける最善の方法は何ですか?
データベース層が SQLAlchemy によって処理される Python アプリケーションがあります。このアプリケーションはバッチ設計を使用しているため、多数のデータベース リクエストが限られた時間内で順次実行されます。現在、実行に時間がかかりすぎるため、最適化が必要です。ORM 機能は使用せず、データベースは PostgreSQL です。
Python/SQLAlchemy アプリのプロファイリングの経験がある人はいますか? また、ボトルネックや設計上の欠陥を見つける最善の方法は何ですか?
データベース層が SQLAlchemy によって処理される Python アプリケーションがあります。このアプリケーションはバッチ設計を使用しているため、多数のデータベース リクエストが限られた時間内で順次実行されます。現在、実行に時間がかかりすぎるため、最適化が必要です。ORM 機能は使用せず、データベースは PostgreSQL です。
単純な SQL ロギング (python のロギング モジュールまたはecho=True
引数 onを介して有効化create_engine()
) だけで、処理にどれくらいの時間がかかっているかがわかります。たとえば、SQL 操作の直後に何かをログに記録すると、ログに次のように表示されます。
17:37:48,325 INFO [sqlalchemy.engine.base.Engine.0x...048c] SELECT ...
17:37:48,326 INFO [sqlalchemy.engine.base.Engine.0x...048c] {<params>}
17:37:48,660 DEBUG [myapp.somemessage]
myapp.somemessage
操作の直後に ログインした場合は、SQL 部分を完了するのに 334 ミリ秒かかったことがわかります。
ログ SQL は、数十または数百のクエリが発行されているかどうかも示します。これは、結合を介してより少ないクエリに整理することができます。contains_eager()
SQLAlchemy ORM を使用する場合、このアクティビティを部分的に ( ) または完全に ( eagerload()
, ) 自動化するために「eager loading」機能が提供されますがeagerload_all()
、ORM がなければ、複数のテーブルの結果を 1 つの結果セットにロードできるように結合を使用するだけです。深さが追加されるにつれてクエリの数を増やす代わりに(つまりr + r*r2 + r*r2*r3
...)
ログ記録で個々のクエリに時間がかかりすぎることが明らかになった場合、データベース内でクエリの処理、ネットワーク経由での結果の送信、DBAPI による処理、そして最終的に SQLAlchemy の結果セットによる受信に費やされた時間の内訳が必要になります。および/またはORMレイヤー。これらの各段階では、詳細に応じて、独自のボトルネックが発生する可能性があります。
そのためには、cProfile や hotshot などのプロファイリングを使用する必要があります。ここに私が使用するデコレータがあります:
import cProfile as profiler
import gc, pstats, time
def profile(fn):
def wrapper(*args, **kw):
elapsed, stat_loader, result = _profile("foo.txt", fn, *args, **kw)
stats = stat_loader()
stats.sort_stats('cumulative')
stats.print_stats()
# uncomment this to see who's calling what
# stats.print_callers()
return result
return wrapper
def _profile(filename, fn, *args, **kw):
load_stats = lambda: pstats.Stats(filename)
gc.collect()
began = time.time()
profiler.runctx('result = fn(*args, **kw)', globals(), locals(),
filename=filename)
ended = time.time()
return ended - began, load_stats, locals()['result']
コードのセクションをプロファイリングするには、デコレータを使用して関数に配置します。
@profile
def go():
return Session.query(FooClass).filter(FooClass.somevalue==8).all()
myfoos = go()
プロファイリングの出力を使用して、どこで時間が費やされているかを知ることができます。たとえば、すべての時間が 内で費やされている場合cursor.execute()
、それはデータベースへの低レベルの DBAPI 呼び出しであり、インデックスを追加するか、クエリおよび/または基になるスキーマを再構築することにより、クエリを最適化する必要があることを意味します。そのタスクについては、pgadmin とそのグラフィカルな EXPLAIN ユーティリティを使用して、クエリがどのような作業を行っているかを確認することをお勧めします。
行のフェッチに関連する呼び出しが何千もある場合は、クエリが予想よりも多くの行を返している可能性があります。不完全な結合の結果としてのデカルト積がこの問題を引き起こす可能性があります。さらに別の問題は、型処理に費やされる時間です。SQLAlchemy 型などはUnicode
、バインド パラメータと結果列で文字列のエンコード/デコードを実行しますが、これはすべての場合に必要とは限りません。
プロファイルの出力は少し難しいかもしれませんが、練習すれば非常に読みやすくなります。メーリング リストで速度が遅いと主張する人がいたことがあります。彼にプロファイルの結果を投稿してもらったところ、速度の問題はネットワークの遅延によるものであることがわかりました。メソッドは非常に高速でしたが、ほとんどの時間は socket.receive() に費やされました。
あなたが野心的だと感じているなら、http: //www.sqlalchemy.org/trac/browser/sqlalchemy/trunk/test/aaa_profiling を調べてみると、SQLAlchemy 単体テスト内の SQLAlchemy プロファイリングのより複雑な例もあります。そこでは、特定の操作に使用されるメソッド呼び出しの最大数をアサートするデコレーターを使用したテストがあり、非効率的なものがチェックインされた場合、テストでそれが明らかになります (Python では、関数呼び出しが最も高いことに注意することが重要です)。あらゆる操作のオーバーヘッドがあり、呼び出し回数は多くの場合、費やされた時間にほぼ比例します)。注目すべきは、数式から DBAPI のオーバーヘッドを切り取る凝った「SQL キャプチャ」スキームを使用する「zoomark」テストです。
SQLAlchemy wikiに非常に役立つプロファイリング レシピがあります。
いくつかの小さな変更を加えて、
from sqlalchemy import event
from sqlalchemy.engine import Engine
import time
import logging
logging.basicConfig()
logger = logging.getLogger("myapp.sqltime")
logger.setLevel(logging.DEBUG)
@event.listens_for(Engine, "before_cursor_execute")
def before_cursor_execute(conn, cursor, statement,
parameters, context, executemany):
context._query_start_time = time.time()
logger.debug("Start Query:\n%s" % statement)
# Modification for StackOverflow answer:
# Show parameters, which might be too verbose, depending on usage..
logger.debug("Parameters:\n%r" % (parameters,))
@event.listens_for(Engine, "after_cursor_execute")
def after_cursor_execute(conn, cursor, statement,
parameters, context, executemany):
total = time.time() - context._query_start_time
logger.debug("Query Complete!")
# Modification for StackOverflow: times in milliseconds
logger.debug("Total Time: %.02fms" % (total*1000))
if __name__ == '__main__':
from sqlalchemy import *
engine = create_engine('sqlite://')
m1 = MetaData(engine)
t1 = Table("sometable", m1,
Column("id", Integer, primary_key=True),
Column("data", String(255), nullable=False),
)
conn = engine.connect()
m1.create_all(conn)
conn.execute(
t1.insert(),
[{"data":"entry %d" % x} for x in xrange(100000)]
)
conn.execute(
t1.select().where(t1.c.data.between("entry 25", "entry 7800")).order_by(desc(t1.c.data))
)
出力は次のようなものです。
DEBUG:myapp.sqltime:Start Query:
SELECT sometable.id, sometable.data
FROM sometable
WHERE sometable.data BETWEEN ? AND ? ORDER BY sometable.data DESC
DEBUG:myapp.sqltime:Parameters:
('entry 25', 'entry 7800')
DEBUG:myapp.sqltime:Query Complete!
DEBUG:myapp.sqltime:Total Time: 410.46ms
次に、異常に遅いクエリを見つけた場合は、クエリ文字列を取得し、パラメーターでフォーマットし (%
少なくとも psycopg2 の場合、文字列フォーマット演算子を実行できます)、「EXPLAIN ANALYZE」というプレフィックスを付けて、クエリ プランの出力を押し込みます。http://explain.depesz.com/ ( PostgreSQL のパフォーマンスに関するこの優れた記事で見つかりました)
cprofile を使用して、runsnakerun で結果を確認することに成功しました。これにより、少なくとも、どの関数と呼び出しに時間がかかるか、データベースに問題があるかどうかがわかりました。ドキュメントはこちらです。wxpython が必要です。そのプレゼンテーションは、始めるのに適しています。
そのように簡単
import cProfile
command = """foo.run()"""
cProfile.runctx( command, globals(), locals(), filename="output.profile" )
それで
python runsnake.py output.profile
クエリの最適化を検討している場合は、postgrsql プロファイリングが必要になります。
クエリを記録するためにログを記録することも価値がありますが、実行時間の長いクエリを取得するために私が知っているパーサーはありません (また、同時要求には役に立ちません)。
sqlhandler = logging.FileHandler("sql.log")
sqllogger = logging.getLogger('sqlalchemy.engine')
sqllogger.setLevel(logging.info)
sqllogger.addHandler(sqlhandler)
create engine ステートメントに echo = True があることを確認します。
私がやったとき、実際には私のコードが主な問題だったので、cprofile が役に立ちました。
プロファイリングするクエリ時間のみの場合は、コンテキスト マネージャーを使用して、特定のコンテキストで実行されたすべてのクエリをログに記録できます。
"""SQLAlchemy Query profiler and logger."""
import logging
import time
import traceback
import sqlalchemy
class QueryProfiler:
"""Log query duration and SQL as a context manager."""
def __init__(self,
engine: sqlalchemy.engine.Engine,
logger: logging.Logger,
path: str):
"""
Initialize for an engine and logger and filepath.
engine: The sqlalchemy engine for which events should be logged.
You can pass the class `sqlalchemy.engine.Engine` to capture all engines
logger: The logger that should capture the query
path: Only log the stacktrace for files in this path, use `'/'` to log all files
"""
self.engine = engine
self.logger = logger
self.path = path
def _before_cursor_execute(self, conn, cursor, statement, parameters, context, executemany):
"""Set the time on the connection to measure query duration."""
conn._sqla_query_start_time = time.time()
def _after_cursor_execute(self, conn, cursor, statement, parameters, context, executemany):
"""Listen for the 'after_cursor_execute' event and log sqlstatement and time."""
end_time = time.time()
start_time = getattr(conn, '_sqla_query_start_time', end_time)
elapsed_time = round((end_time-start_time) * 1000)
# only include the files in self.path in the stacktrace to reduce the noise
stack = [frame for frame in traceback.extract_stack()[:-1] if frame.filename.startswith(self.path)]
self.logger.debug('Query `%s` took %s ms. Stack: %s', statement, elapsed_time, traceback.format_list(stack))
def __enter__(self, *args, **kwargs):
"""Context manager."""
if isinstance(self.engine, sqlalchemy.engine.Engine):
sqlalchemy.event.listen(self.engine, "before_cursor_execute", self._before_cursor_execute)
sqlalchemy.event.listen(self.engine, "after_cursor_execute", self._after_cursor_execute)
return self
def __exit__(self, *args, **kwargs) -> None:
"""Context manager."""
if isinstance(self.engine, sqlalchemy.engine.Engine):
sqlalchemy.event.remove(self.engine, "before_cursor_execute", self._before_cursor_execute)
sqlalchemy.event.remove(self.engine, "after_cursor_execute", self._after_cursor_execute)
使用法とテスト:
"""Test SQLAlchemy Query profiler and logger."""
import logging
import os
import sqlalchemy
from .sqlaprofiler import QueryProfiler
def test_sqlite_query(caplog):
"""Create logger and sqllite engine and profile the queries."""
logging.basicConfig()
logger = logging.getLogger(f'{__name__}')
logger.setLevel(logging.DEBUG)
caplog.set_level(logging.DEBUG, logger=f'{__name__}')
path = os.path.dirname(os.path.realpath(__file__))
engine = sqlalchemy.create_engine('sqlite://')
metadata = sqlalchemy.MetaData(engine)
table1 = sqlalchemy.Table(
"sometable", metadata,
sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),
sqlalchemy.Column("data", sqlalchemy.String(255), nullable=False),
)
conn = engine.connect()
metadata.create_all(conn)
with QueryProfiler(engine, logger, path):
conn.execute(
table1.insert(),
[{"data": f"entry {i}"} for i in range(100000)]
)
conn.execute(
table1.select()
.where(table1.c.data.between("entry 25", "entry 7800"))
.order_by(sqlalchemy.desc(table1.c.data))
)
assert caplog.messages[0].startswith('Query `INSERT INTO sometable (data) VALUES (?)` took')
assert caplog.messages[1].startswith('Query `SELECT sometable.id, sometable.data \n'
'FROM sometable \n'
'WHERE sometable.data BETWEEN ? AND ? '
'ORDER BY sometable.data DESC` took ')