pyodbc カーソル出力 (またはから).fetchone
をPython 辞書としてシリアル化するにはどうすればよいですか?.fetchmany
.fetchall
私は bottlepy を使用しており、dict を返す必要があるため、JSON として返すことができます。
pyodbc カーソル出力 (またはから).fetchone
をPython 辞書としてシリアル化するにはどうすればよいですか?.fetchmany
.fetchall
私は bottlepy を使用しており、dict を返す必要があるため、JSON として返すことができます。
前もって列がわからない場合は、Cursor.descriptionを使用して列名のリストを作成し、各行を圧縮して辞書のリストを作成します。例では、接続とクエリが構築されていることを前提としています。
>>> cursor = connection.cursor().execute(sql)
>>> columns = [column[0] for column in cursor.description]
>>> print(columns)
['name', 'create_date']
>>> results = []
>>> for row in cursor.fetchall():
... results.append(dict(zip(columns, row)))
...
>>> print(results)
[{'create_date': datetime.datetime(2003, 4, 8, 9, 13, 36, 390000), 'name': u'master'},
{'create_date': datetime.datetime(2013, 1, 30, 12, 31, 40, 340000), 'name': u'tempdb'},
{'create_date': datetime.datetime(2003, 4, 8, 9, 13, 36, 390000), 'name': u'model'},
{'create_date': datetime.datetime(2010, 4, 2, 17, 35, 8, 970000), 'name': u'msdb'}]
@Beargle の結果を bottlepy で使用して、エンドポイントを公開するこの非常に簡潔なクエリを作成できました。
@route('/api/query/<query_str>')
def query(query_str):
cursor.execute(query_str)
return {'results':
[dict(zip([column[0] for column in cursor.description], row))
for row in cursor.fetchall()]}
これは、使用できる可能性のある短い形式のバージョンです
>>> cursor.select("<your SQL here>")
>>> single_row = dict(zip(zip(*cursor.description)[0], cursor.fetchone()))
>>> multiple_rows = [dict(zip(zip(*cursor.description)[0], row)) for row in cursor.fetchall()]
リストに * を追加すると、基本的にリストが取り除かれ、呼び出している関数のパラメーターとして個々のリスト エントリが残されます。zip を使用することで、1 番目から n 番目のエントリを選択し、パンツのジッパーのようにそれらをまとめてジッパーで留めます。
だから使うことで
zip(*[(a,1,2),(b,1,2)])
# interpreted by python as zip((a,1,2),(b,1,2))
あなたが得る
[('a', 'b'), (1, 1), (2, 2)]
description はタプルを含むタプルであるため、各タプルは各列のヘッダーとデータ型を記述しているため、次のようにして各タプルの最初のものを抽出できます
>>> columns = zip(*cursor.description)[0]
に相当
>>> columns = [column[0] for column in cursor.description]
カーソルが使用できない状況では、たとえば、関数呼び出しまたは内部メソッドによって行が返された場合でも、row.cursor_description を使用して辞書表現を作成できます。
def row_to_dict(row):
return dict(zip([t[0] for t in row.cursor_description], row))
主に @Torxed レスポンスから離れて、スキーマとデータをディクショナリに検索するための完全に一般化された関数のセットを作成しました。
def schema_dict(cursor):
cursor.execute("SELECT sys.objects.name, sys.columns.name FROM sys.objects INNER JOIN sys.columns ON sys.objects.object_id = sys.columns. object_id WHERE sys.objects.type = 'U';")
schema = {}
for it in cursor.fetchall():
if it[0] not in schema:
schema[it[0]]={'scheme':[]}
else:
schema[it[0]]['scheme'].append(it[1])
return schema
def populate_dict(cursor, schema):
for i in schema.keys():
cursor.execute("select * from {table};".format(table=i))
for row in cursor.fetchall():
colindex = 0
for col in schema[i]['scheme']:
if not 'data' in schema[i]:
schema[i]['data']=[]
schema[i]['data'].append(row[colindex])
colindex += 1
return schema
def database_to_dict():
cursor = connect()
schema = populate_dict(cursor, schema_dict(cursor))
行を減らすために、これですべてのコードゴルフを自由に行ってください。それまでの間、それは機能します!
;)
手順は次のとおりです。
from pandas import DataFrame
import pyodbc
import sqlalchemy
db_file = r'xxx.accdb'
odbc_conn_str = 'DRIVER={Microsoft Access Driver (*.mdb, *.accdb)};DBQ=%s' % (db_file)
conn = pyodbc.connect(odbc_conn_str)
cur = conn.cursor()
qry = cur.execute("SELECT * FROM tbl")
columns = [column[0] for column in cur.description]
results = []
for row in cur.fetchall():
results.append(dict(zip(columns, row)))
df = DataFrame(results)
df
列名を知っていると仮定します! また、ここに 3 つの異なるソリューションがあり
ます。おそらく最後のソリューションを参照してください。
colnames = ['city', 'area', 'street']
data = {}
counter = 0
for row in x.fetchall():
if not counter in data:
data[counter] = {}
colcounter = 0
for colname in colnames:
data[counter][colname] = row[colcounter]
colcounter += 1
counter += 1
これはインデックス付きのバージョンであり、最も美しいソリューションではありませんが、機能します。もう 1 つは、行番号順にデータを含む各キー内のリストを使用して、列名をディクショナリ キーとしてインデックス付けすることです。することによって:
colnames = ['city', 'area', 'street']
data = {}
for row in x.fetchall():
colindex = 0
for col in colnames:
if not col in data:
data[col] = []
data[col].append(row[colindex])
colindex += 1
これを書いて、私は行うfor col in colnames
ことを置き換えることができることを理解してfor colindex in range(0, len())
いますが、あなたはその考えを理解しています. 後者の例は、すべてのデータをフェッチするのではなく、一度に 1 行ずつフェッチする場合に役立ちます。たとえば、次のようになります。
def fetchone_dict(stuff):
colnames = ['city', 'area', 'street']
data = {}
for colindex in range(0, colnames):
data[colnames[colindex]] = stuff[colindex]
return data
row = x.fetchone()
print fetchone_dict(row)['city']
テーブル名の取得 (私が思う.. Foo Stack のおかげです):以下のベアグルから
のより直接的な解決策!
cursor.execute("SELECT sys.objects.name, sys.columns.name FROM sys.objects INNER JOIN sys.columns ON sys.objects.object_id = sys.columns. object_id WHERE sys.objects.type = 'U';")
schema = {}
for it in cursor.fetchall():
if it[0] in schema:
schema[it[0]].append(it[1])
else:
schema[it[0]] = [it[1]]