SQLite は過去 4 年間で改善されたため、共有インメモリ データベースが可能になりました。次のコードを確認してください。
import sqlite3
foobar_uri = 'file:foobar_database?mode=memory&cache=shared'
not_really_foobar_uri = 'file:not_really_foobar?mode=memory&cache=shared'
# connect to databases in no particular order
db2 = sqlite3.connect(foobar_uri, uri=True)
db_lol = sqlite3.connect(not_really_foobar_uri, uri=True)
db1 = sqlite3.connect(foobar_uri, uri=True)
# create cursor as db2
cur2 = db2.cursor()
# create table as db2
db2.execute('CREATE TABLE foo (NUMBER bar)')
# insert values as db1
db1.execute('INSERT INTO foo VALUES (42)')
db1.commit()
# and fetch them from db2 through cur2
cur2.execute('SELECT * FROM foo')
print(cur2.fetchone()[0]) # 42
# test that db_lol is not shared with db1 and db2
try:
db_lol.cursor().execute('SELECT * FROM foo')
except sqlite3.OperationalError as exc:
print(exc) # just as expected
データベース アクセスは意図的に絡み合っており、インメモリ データベースへの同じ名前の 2 つの接続が SQLite の観点から同じであることを示しています。
参考文献:
- SQLite URI
- SQLite 共有キャッシュ
残念ながら、URI による接続は Python 3.4 以降でしか利用できません。ただし、Python 2.6 以降 (ただし Python 3 ではない) を使用している場合、組み込みsqlite3
モジュールは引き続き APSW 接続をインポートでき、これを使用して同じ効果を得ることができます。ドロップインsqlite3
モジュールの交換は次のとおりです。
from sqlite3 import *
from sqlite3 import connect as _connect
from apsw import Connection as _ApswConnection
from apsw import SQLITE_OPEN_READWRITE as _SQLITE_OPEN_READWRITE
from apsw import SQLITE_OPEN_CREATE as _SQLITE_OPEN_CREATE
from apsw import SQLITE_OPEN_URI as _SQLITE_OPEN_URI
# APSW and pysqlite use different instances of sqlite3 library, so initializing
# APSW won't help pysqlite. Because pysqlite does not expose any way to
# explicitly call sqlite3_initialize(), here goes an ugly hack. This only has
# to be done once per process.
_connect(':memory:').close()
def connect(database, timeout=5.0, detect_types=0, isolation_level=None,
check_same_thread=True, factory=Connection, cached_statements=100,
uri=False):
flags = _SQLITE_OPEN_READWRITE | _SQLITE_OPEN_CREATE
if uri:
flags |= _SQLITE_OPEN_URI
db = _ApswConnection(database, flags, None, cached_statements)
conn = _connect(db, timeout, detect_types, isolation_level,
check_same_thread, factory, cached_statements)
return conn