3

8 つの変数を含む netCDF ファイルがあります。(申し訳ありませんが、実際のファイルを共有することはできません) 各変数には、時間と駅の 2 つの次元があります。時間は約 14 ステップで、ステーションは現在 38000 の異なる ID です。したがって、38000 の異なる「場所」(実際には単なる ID) に対して、8 つの変数と 14 の異なる時間があります。

$ncdump -h stationdata.nc
netcdf stationdata {
dimensions:
    station = 38000 ;
    name_strlen = 40 ;
    time = UNLIMITED ; // (14 currently)
variables:
    int time(time) ;
            time:long_name = "time" ;
            time:units = "seconds since 1970-01-01" ;
    char station_name(station, name_strlen) ;
            station_name:long_name = "station_name" ;
            station_name:cf_role = "timeseries_id" ;
    float var1(time, station) ;
            var1:long_name = "Variable 1" ;
            var1:units = "m3/s" ;
    float var2(time, station) ;
            var2:long_name = "Variable 2" ;
            var2:units = "m3/s" ;
...

このデータを PostGres データベースにロードして、後で視覚化するために station_name に一致するジオメトリにデータを結合できるようにする必要があります。

現在、netCDF4-module を使用して Python でこれを行っています。機能しますが、永遠にかかります!今、私はこのようにループしています:

times = rootgrp.variables['time']
stations = rootgrp.variables['station_name']
for timeindex, time in enumerate(times):
    stations = rootgrp.variables['station_name']
    for stationindex, stationnamearr in enumerate(stations):
        var1val = var1[timeindex][stationindex]
        print "INSERT INTO ncdata (validtime, stationname, var1) \
            VALUES ('%s','%s', %s);" % \
            ( time, stationnamearr, var1val )

私のマシンではこれを実行するのに数分かかりますが、もっと賢い方法で実行できると感じています。

これをよりスマートな方法で行う方法について、誰にもアイデアがありますか? できればPythonで。

4

3 に答える 3

3

これが正しい方法かどうかはわかりませんが、これを解決する良い方法を見つけたので、共有する必要があると思いました.

最初のバージョンでは、スクリプトの実行に約 1 時間かかりました。コードを書き直した後、30 秒以内に実行されるようになりました。

大きなことは、numpy 配列を使用し、NetCDF リーダーからの変数配列を変換して行にし、すべての列を 1 つの行列に積み重ねることでした。このマトリックスは、psycopg2 copy_from 関数を使用してデータベースにロードされました。この質問からそのコードを取得しました

psycopg2 でバイナリ COPY テーブル FROM を使用する

私のコードの一部:

dates = num2date(rootgrp.variables['time'][:],units=rootgrp.variables['time'].units)
var1=rootgrp.variables['var1']
var2=rootgrp.variables['var2']

cpy = cStringIO.StringIO()

for timeindex, time in enumerate(dates):

    validtimes=np.empty(var1[timeindex].size, dtype="object")
    validtimes.fill(time)

    #  Transponse and stack the arrays of parameters
    #    [a,a,a,a]        [[a,b,c],
    #    [b,b,b,b]  =>     [a,b,c],
    #    [c,c,c,c]         [a,b,c],
    #                      [a,b,c]]

    a = np.hstack((
              validtimes.reshape(validtimes.size,1),
              stationnames.reshape(stationnames.size,1),
              var1[timeindex].reshape(var1[timeindex].size,1),
              var2[timeindex].reshape(var2[timeindex].size,1)
    ))

    # Fill the cStringIO with text representation of the created array
    for row in a:
            cpy.write(row[0].strftime("%Y-%m-%d %H:%M")+'\t'+ row[1] +'\t' + '\t'.join([str(x) for x in row[2:]]) + '\n')


conn = psycopg2.connect("host=postgresserver dbname=nc user=user password=passwd")
curs = conn.cursor()

cpy.seek(0)
curs.copy_from(cpy, 'ncdata', columns=('validtime', 'stationname', 'var1', 'var2'))
conn.commit()
于 2013-04-26T08:41:42.747 に答える
2

これを高速化するためにできるいくつかの簡単な改善があります。これらはすべて独立しており、すべてまたはいくつかを試して、十分に高速かどうかを確認できます。それらはおおよそ難易度の昇順です。

  • psycopg2データベースドライバーを使用してください。より高速です
  • 挿入のブロック全体をトランザクションにラップします。を使用しpsycopg2ている場合は、すでにこれを行っています。最後に必要なトランザクションを自動的に開きますcommit
  • 配列内の数行分の値を収集し、n 行ごとに多値 INSERT を実行します。
  • ヘルパー プロセスを介して挿入を行うには、複数の接続を使用します -multiprocessingモジュールを参照してください。GIL (グローバル インタープリター ロック) の問題により、スレッドはうまく機能しません。

1 つの大きなトランザクションを使用したくない場合は、ディスク フラッシュが実際に完了する前に接続が戻るようにsynchronous_commit = off設定して設定できます。commit_delay1 つのトランザクションですべての作業を行っている場合、これはあまり役に立ちません。

多値挿入

Psycopg2 は多値を直接サポートしていませんINSERTが、次のように書くことができます:

curs.execute("""
INSERT INTO blah(a,b) VALUES
(%s,%s),
(%s,%s),
(%s,%s),
(%s,%s),
(%s,%s);
""", parms);

次のようなものでループします。

parms = []
rownum = 0
for x in input_data:
    parms.extend([x.firstvalue, x.secondvalue])
    rownum += 1
    if rownum % 5 == 0:
        curs.execute("""INSERT ...""", tuple(parms))
        del(parms[:])
于 2013-04-15T23:47:20.777 に答える