2

圧縮された csv を mongo コレクションにインポートする必要がありますが、キャッチがあります。すべてのレコードには太平洋時間のタイムスタンプが含まれており、同じレコードにある (経度、緯度) ペアに対応する現地時間に変換する必要があります。

コードは次のようになります。

def read_csv_zip(path, timezones):
  with ZipFile(path) as z, z.open(z.namelist()[0]) as input:
    csv_rows = csv.reader(input)
    header = csv_rows.next()
    check,converters = get_aux_stuff(header)
    for csv_row in csv_rows:
      if check(csv_row):
        row = {
          converter[0]:converter[1](value) 
          for converter, value in zip(converters, csv_row) 
          if allow_field(converter)
        }
        ts = row['ts']
        lng, lat = row['loc']
        found_tz_entry = timezones.find_one(SON({'loc': {'$within': {'$box': [[lng-tz_lookup_radius, lat-tz_lookup_radius],[lng+tz_lookup_radius, lat+tz_lookup_radius]]}}}))
        if found_tz_entry:
          tz_name = found_tz_entry['tz']
          local_ts = ts.astimezone(timezone(tz_name)).replace(tzinfo=None)
          row['tz'] = tz_name
        else:
          local_ts = (ts.astimezone(utc) + timedelta(hours = int(lng/15))).replace(tzinfo = None)
        row['local_ts'] = local_ts
        yield row

def insert_documents(collection, source, batch_size):
  while True:
    items = list(itertools.islice(source, batch_size))
    if len(items) == 0:
      break;
    try:
      collection.insert(items)
    except:
      for item in items:
        try:
          collection.insert(item)
        except Exception as exc:
          print("Failed to insert record {0} - {1}".format(item['_id'], exc))

def main(zip_path):
  with Connection() as connection:
    data = connection.mydb.data
    timezones = connection.timezones.data
    insert_documents(data, read_csv_zip(zip_path, timezones), 1000)

コードは次のように進みます。

  1. csv から読み取られたすべてのレコードがチェックされ、ディクショナリに変換されます。一部のフィールドはスキップされ、一部のタイトルは (csv ヘッダーに表示されるものから) 名前が変更され、一部の値は (datetime、integer、float に) 変換されます。など...)
  2. csv から読み取られたレコードごとに、タイムゾーンコレクションが検索され、レコードの場所がそれぞれのタイム ゾーンにマップされます。マッピングが成功した場合、そのタイムゾーンを使用して、レコードのタイムスタンプ (太平洋時間) がそれぞれのローカル タイムスタンプに変換されます。マッピングが見つからない場合 - 大まかな概算が計算されます。

もちろん、timezonesコレクションは適切にインデックス化されています - 呼び出すexplain()ことで確認できます。

プロセスは遅いです。当然のことながら、すべてのレコードに対してtimezonesコレクションを照会する必要があると、パフォーマンスが低下します。どうすれば改善できるのか、アドバイスをお待ちしています。

ありがとう。

編集

timezones コレクションには 8176040 レコードが含まれており、それぞれに次の 4 つの値が含まれています。

> db.data.findOne()
{ "_id" : 3038814, "loc" : [ 1.48333, 42.5 ], "tz" : "Europe/Andorra" }

EDIT2

OK、http://toblerity.github.com/rtree/ のリリース ビルドをコンパイルし rtree パッケージを構成しました。次に、タイムゾーン コレクションに対応するファイルの rtree dat/idx ペアを作成しました。そのため、呼び出す代わりに をcollection.find_one呼び出しますindex.intersection。驚いたことに、改善がないだけでなく、動作がさらに遅くなりました! dat/idx ペア全体を RAM (704M) にロードするように rtree を微調整できるかもしれませんが、その方法はわかりません。それまでは、代替手段ではありません。

一般に、ソリューションにはタスクの並列化が必要だと思います。

EDIT3

使用時のプロファイル出力collection.find_one:

>>> p.sort_stats('cumulative').print_stats(10)
Tue Apr 10 14:28:39 2012    ImportDataIntoMongo.profile

         64549590 function calls (64549180 primitive calls) in 1231.257 seconds

   Ordered by: cumulative time
   List reduced from 730 to 10 due to restriction <10>

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
        1    0.012    0.012 1231.257 1231.257 ImportDataIntoMongo.py:1(<module>)
        1    0.001    0.001 1230.959 1230.959 ImportDataIntoMongo.py:187(main)
        1  853.558  853.558  853.558  853.558 {raw_input}
        1    0.598    0.598  370.510  370.510 ImportDataIntoMongo.py:165(insert_documents)
   343407    9.965    0.000  359.034    0.001 ImportDataIntoMongo.py:137(read_csv_zip)
   343408    2.927    0.000  287.035    0.001 c:\python27\lib\site-packages\pymongo\collection.py:489(find_one)
   343408    1.842    0.000  274.803    0.001 c:\python27\lib\site-packages\pymongo\cursor.py:699(next)
   343408    2.542    0.000  271.212    0.001 c:\python27\lib\site-packages\pymongo\cursor.py:644(_refresh)
   343408    4.512    0.000  253.673    0.001 c:\python27\lib\site-packages\pymongo\cursor.py:605(__send_message)
   343408    0.971    0.000  242.078    0.001 c:\python27\lib\site-packages\pymongo\connection.py:871(_send_message_with_response)

使用時のプロファイル出力index.intersection:

>>> p.sort_stats('cumulative').print_stats(10)
Wed Apr 11 16:21:31 2012    ImportDataIntoMongo.profile

         41542960 function calls (41542536 primitive calls) in 2889.164 seconds

   Ordered by: cumulative time
   List reduced from 778 to 10 due to restriction <10>

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
        1    0.028    0.028 2889.164 2889.164 ImportDataIntoMongo.py:1(<module>)
        1    0.017    0.017 2888.679 2888.679 ImportDataIntoMongo.py:202(main)
        1 2365.526 2365.526 2365.526 2365.526 {raw_input}
        1    0.766    0.766  502.817  502.817 ImportDataIntoMongo.py:180(insert_documents)
   343407    9.147    0.000  491.433    0.001 ImportDataIntoMongo.py:152(read_csv_zip)
   343406    0.571    0.000  391.394    0.001 c:\python27\lib\site-packages\rtree-0.7.0-py2.7.egg\rtree\index.py:384(intersection)
   343406  379.957    0.001  390.824    0.001 c:\python27\lib\site-packages\rtree-0.7.0-py2.7.egg\rtree\index.py:435(_intersection_obj)
   686513   22.616    0.000   38.705    0.000 c:\python27\lib\site-packages\rtree-0.7.0-py2.7.egg\rtree\index.py:451(_get_objects)
   343406    6.134    0.000   33.326    0.000 ImportDataIntoMongo.py:162(<dictcomp>)
      346    0.396    0.001   30.665    0.089 c:\python27\lib\site-packages\pymongo\collection.py:240(insert)

EDIT4

コードを並列化しましたが、結果はまだあまり有望ではありません。もっとうまくできると確信しています。詳細については、この質問に対する私自身の回答を参照してください。

4

1 に答える 1

0

OK、コードを並列化しましたが、実行速度は 2 倍しかありません。これが私の解決策です。

write_batch_size=100
read_batch_size=100
count_parsed_csv_consumers=15
count_data_records_consumers=1
parsed_csv_queue = Queue()
data_record_queue = Queue()

def get_parsed_csv_consumer(converters, timezones):
  def do_work(csv_row):
    row = {
      converter[0]:converter[1](value) 
      for converter, value in zip(converters, csv_row) 
      if allow_field(converter)
    }
    ts = row['ts']
    lng, lat = row['loc']
    found_tz_entry = timezones.find_one(SON({'loc': {'$within': {'$box': [[lng-tz_lookup_radius, lat-tz_lookup_radius],[lng+tz_lookup_radius, lat+tz_lookup_radius]]}}}))
    if found_tz_entry:
      tz_name = found_tz_entry['tz']
      local_ts = ts.astimezone(timezone(tz_name)).replace(tzinfo=None)
      row['tz'] = tz_name
    else:
      local_ts = (ts.astimezone(utc) + timedelta(hours = int(lng/15))).replace(tzinfo = None)
    row['local_ts'] = local_ts
    return row
  def worker():
    while True:
      csv_rows = parsed_csv_queue.get();
      try:
        rows=[]
        for csv_row in csv_rows:
          rows.append(do_work(csv_row))
        data_record_queue.put_nowait(rows)
      except Exception as exc:
        print(exc)
      parsed_csv_queue.task_done()
  return worker

def get_data_record_consumer(collection):
  items = []
  def do_work(row):
    items.append(row)
    if len(items) == write_batch_size:
      persist_items()
  def persist_items():
    try:
      collection.insert(items)
    except:
      for item in items:
        try:
          collection.insert(item)
        except Exception as exc:
          print("Failed to insert record {0} - {1}".format(item['_id'], exc))
    del items[:]
  def data_record_consumer():
    collection    # explicit capture
    while True:
      rows = data_record_queue.get()
      try:
        if rows:
          for row in rows:
            do_work(row)
        elif items:
          persist_items()
      except Exception as exc:
        print(exc)
      data_record_queue.task_done()
  return data_record_consumer

def import_csv_zip_to_collection(path, timezones, collection):
  def get_threads(count, target, name):
    acc = []
    for i in range(count):
      x = Thread(target=target, name=name + " " + str(i))
      x.daemon = True
      x.start()
      acc.append(x)
    return acc

  with ZipFile(path) as z, z.open(z.namelist()[0]) as input:
    csv_rows = csv.reader(input)
    header = next(csv_rows)
    check,converters = get_aux_stuff(header)

    parsed_csv_consumer_threads = get_threads(count_parsed_csv_consumers, get_parsed_csv_consumer(converters, timezones), "parsed csv consumer")
    data_record_consumer_threads = get_threads(count_data_records_consumers, get_data_record_consumer(collection), "data record consumer")

    read_batch = []
    for csv_row in csv_rows:
      if check(csv_row):
        read_batch.append(csv_row)
        if len(read_batch) == read_batch_size:
          parsed_csv_queue.put_nowait(read_batch)
          read_batch = []
    if len(read_batch) > 0:
      parsed_csv_queue.put_nowait(read_batch)
      read_batch = []
    parsed_csv_queue.join()
    data_record_queue.join()
    # data record consumers may have some items cached. All of them must flush their caches now.
    # we do it by enqueing a special item, which when fetched causes the respective consumer to
    # terminate its operation
    for i in range(len(data_record_consumer_threads)):
      data_record_queue.put_nowait(None)
    data_record_queue.join()

プロセスは次のようになります。

  1. 解析された csv 行はバッチ処理されます (バッチのサイズは によって決まりますread_batch_size)
  2. 解析された csv 行のバッチがいっぱいにparsed_csv_queueなると、複数のコンシューマーによって消費されるように配置されます。parsed_csv_consumer_threads
  3. 解析された csv 行コンシューマーは、mongo クエリを使用してタイムゾーンを検索する必要があるため、低速です ( ) 。正確にtimezones.find_oneは、それらの多くが存在します。count_parsed_csv_consumers
  4. 解析された csv コンシューマは、その入力をデータ レコードに変換します。変換されたレコードはバッチ処理され (バッチ サイズは保持されますread_batch_size)、バッチがいっぱいになると、別のキューに配置されます。data_record_queue
  5. データ レコード コンシューマは、データ レコードのバッチを取得data_record_queueし、宛先の mongo コレクションに挿入します。
  6. データ レコード コンシューマーは、解析された csv レコード コンシューマーよりもはるかに高速であるため、はるかに少ない数です。実際、私は 1 つしか使用していませんが、count_data_records_consumers定数を使用して変更できます。

最初のバージョンでは、個々のレコードをキューに配置していましたが、プロファイリングにより、これQueue.put_nowaitは非常にコストがかかることが判明したため、レコードをバッチ処理することで、プットの数を減らすことを余儀なくされました。

とにかく、パフォーマンスは 2 倍高速ですが、もっと良い結果を期待していました。プロファイリングの結果は次のとおりです。

>>> p.sort_stats('cumulative').print_stats(10)
Fri Apr 13 13:31:17 2012    ImportOoklaIntoMongo.profile

         3782711 function calls (3782429 primitive calls) in 310.209 seconds

   Ordered by: cumulative time
   List reduced from 737 to 10 due to restriction <10>

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
        1    0.016    0.016  310.209  310.209 .\ImportOoklaIntoMongo.py:1(<module>)
        1    0.004    0.004  309.833  309.833 .\ImportOoklaIntoMongo.py:272(main)
        1   17.829   17.829  220.432  220.432 .\ImportOoklaIntoMongo.py:225(import_csv_zip_to_collection)
   386081   28.049    0.000  135.297    0.000 c:\python27\lib\zipfile.py:508(readline)
   107008    7.588    0.000  102.938    0.001 c:\python27\lib\zipfile.py:570(read)
   107008   50.716    0.000   95.302    0.001 c:\python27\lib\zipfile.py:598(read1)
    71240    3.820    0.000   95.292    0.001 c:\python27\lib\zipfile.py:558(peek)
        1   89.382   89.382   89.382   89.382 {raw_input}
   386079   43.564    0.000   54.706    0.000 .\ImportOoklaIntoMongo.py:103(check)
    35767   40.286    0.001   40.286    0.001 {built-in method decompress}

メインスレッドの結果だけが表示されているように見えるため、プロファイラーの出力には少し疑いがあります。実際、Python でマルチスレッド プログラムをプロファイリングするにはどうすればよいですか?

于 2012-04-13T11:21:25.220 に答える