圧縮された csv を mongo コレクションにインポートする必要がありますが、キャッチがあります。すべてのレコードには太平洋時間のタイムスタンプが含まれており、同じレコードにある (経度、緯度) ペアに対応する現地時間に変換する必要があります。
コードは次のようになります。
def read_csv_zip(path, timezones):
with ZipFile(path) as z, z.open(z.namelist()[0]) as input:
csv_rows = csv.reader(input)
header = csv_rows.next()
check,converters = get_aux_stuff(header)
for csv_row in csv_rows:
if check(csv_row):
row = {
converter[0]:converter[1](value)
for converter, value in zip(converters, csv_row)
if allow_field(converter)
}
ts = row['ts']
lng, lat = row['loc']
found_tz_entry = timezones.find_one(SON({'loc': {'$within': {'$box': [[lng-tz_lookup_radius, lat-tz_lookup_radius],[lng+tz_lookup_radius, lat+tz_lookup_radius]]}}}))
if found_tz_entry:
tz_name = found_tz_entry['tz']
local_ts = ts.astimezone(timezone(tz_name)).replace(tzinfo=None)
row['tz'] = tz_name
else:
local_ts = (ts.astimezone(utc) + timedelta(hours = int(lng/15))).replace(tzinfo = None)
row['local_ts'] = local_ts
yield row
def insert_documents(collection, source, batch_size):
while True:
items = list(itertools.islice(source, batch_size))
if len(items) == 0:
break;
try:
collection.insert(items)
except:
for item in items:
try:
collection.insert(item)
except Exception as exc:
print("Failed to insert record {0} - {1}".format(item['_id'], exc))
def main(zip_path):
with Connection() as connection:
data = connection.mydb.data
timezones = connection.timezones.data
insert_documents(data, read_csv_zip(zip_path, timezones), 1000)
コードは次のように進みます。
- csv から読み取られたすべてのレコードがチェックされ、ディクショナリに変換されます。一部のフィールドはスキップされ、一部のタイトルは (csv ヘッダーに表示されるものから) 名前が変更され、一部の値は (datetime、integer、float に) 変換されます。など...)
- csv から読み取られたレコードごとに、タイムゾーンコレクションが検索され、レコードの場所がそれぞれのタイム ゾーンにマップされます。マッピングが成功した場合、そのタイムゾーンを使用して、レコードのタイムスタンプ (太平洋時間) がそれぞれのローカル タイムスタンプに変換されます。マッピングが見つからない場合 - 大まかな概算が計算されます。
もちろん、timezonesコレクションは適切にインデックス化されています - 呼び出すexplain()
ことで確認できます。
プロセスは遅いです。当然のことながら、すべてのレコードに対してtimezonesコレクションを照会する必要があると、パフォーマンスが低下します。どうすれば改善できるのか、アドバイスをお待ちしています。
ありがとう。
編集
timezones コレクションには 8176040 レコードが含まれており、それぞれに次の 4 つの値が含まれています。
> db.data.findOne()
{ "_id" : 3038814, "loc" : [ 1.48333, 42.5 ], "tz" : "Europe/Andorra" }
EDIT2
OK、http://toblerity.github.com/rtree/ のリリース ビルドをコンパイルし、 rtree パッケージを構成しました。次に、タイムゾーン コレクションに対応するファイルの rtree dat/idx ペアを作成しました。そのため、呼び出す代わりに をcollection.find_one
呼び出しますindex.intersection
。驚いたことに、改善がないだけでなく、動作がさらに遅くなりました! dat/idx ペア全体を RAM (704M) にロードするように rtree を微調整できるかもしれませんが、その方法はわかりません。それまでは、代替手段ではありません。
一般に、ソリューションにはタスクの並列化が必要だと思います。
EDIT3
使用時のプロファイル出力collection.find_one
:
>>> p.sort_stats('cumulative').print_stats(10)
Tue Apr 10 14:28:39 2012 ImportDataIntoMongo.profile
64549590 function calls (64549180 primitive calls) in 1231.257 seconds
Ordered by: cumulative time
List reduced from 730 to 10 due to restriction <10>
ncalls tottime percall cumtime percall filename:lineno(function)
1 0.012 0.012 1231.257 1231.257 ImportDataIntoMongo.py:1(<module>)
1 0.001 0.001 1230.959 1230.959 ImportDataIntoMongo.py:187(main)
1 853.558 853.558 853.558 853.558 {raw_input}
1 0.598 0.598 370.510 370.510 ImportDataIntoMongo.py:165(insert_documents)
343407 9.965 0.000 359.034 0.001 ImportDataIntoMongo.py:137(read_csv_zip)
343408 2.927 0.000 287.035 0.001 c:\python27\lib\site-packages\pymongo\collection.py:489(find_one)
343408 1.842 0.000 274.803 0.001 c:\python27\lib\site-packages\pymongo\cursor.py:699(next)
343408 2.542 0.000 271.212 0.001 c:\python27\lib\site-packages\pymongo\cursor.py:644(_refresh)
343408 4.512 0.000 253.673 0.001 c:\python27\lib\site-packages\pymongo\cursor.py:605(__send_message)
343408 0.971 0.000 242.078 0.001 c:\python27\lib\site-packages\pymongo\connection.py:871(_send_message_with_response)
使用時のプロファイル出力index.intersection
:
>>> p.sort_stats('cumulative').print_stats(10)
Wed Apr 11 16:21:31 2012 ImportDataIntoMongo.profile
41542960 function calls (41542536 primitive calls) in 2889.164 seconds
Ordered by: cumulative time
List reduced from 778 to 10 due to restriction <10>
ncalls tottime percall cumtime percall filename:lineno(function)
1 0.028 0.028 2889.164 2889.164 ImportDataIntoMongo.py:1(<module>)
1 0.017 0.017 2888.679 2888.679 ImportDataIntoMongo.py:202(main)
1 2365.526 2365.526 2365.526 2365.526 {raw_input}
1 0.766 0.766 502.817 502.817 ImportDataIntoMongo.py:180(insert_documents)
343407 9.147 0.000 491.433 0.001 ImportDataIntoMongo.py:152(read_csv_zip)
343406 0.571 0.000 391.394 0.001 c:\python27\lib\site-packages\rtree-0.7.0-py2.7.egg\rtree\index.py:384(intersection)
343406 379.957 0.001 390.824 0.001 c:\python27\lib\site-packages\rtree-0.7.0-py2.7.egg\rtree\index.py:435(_intersection_obj)
686513 22.616 0.000 38.705 0.000 c:\python27\lib\site-packages\rtree-0.7.0-py2.7.egg\rtree\index.py:451(_get_objects)
343406 6.134 0.000 33.326 0.000 ImportDataIntoMongo.py:162(<dictcomp>)
346 0.396 0.001 30.665 0.089 c:\python27\lib\site-packages\pymongo\collection.py:240(insert)
EDIT4
コードを並列化しましたが、結果はまだあまり有望ではありません。もっとうまくできると確信しています。詳細については、この質問に対する私自身の回答を参照してください。