13

私はいくつかのデータ分析に Python を使用しています。私は 2 つのテーブルを持っています。1 つ目 (「A」と呼びましょう) には 1,000 万行と 10 列があり、2 つ目 (「B」) には 7,300 万行と 2 列があります。共通の ID を持つ 1 つの列があり、その列に基づいて 2 つのテーブルを交差させたいと考えています。特に、テーブルの内部結合が必要です。

パンダで通常のマージ機能を使用するために、テーブル B をメモリに pandas データフレームとしてロードできませんでした。テーブル B のファイルをチャンクで読み取り、各チャンクを A と交差させ、これらの交差を連結してみました (内部結合からの出力)。これは速度的には問題ありませんが、時々これにより問題が発生し、セグメンテーション違反が発生します...それほど素晴らしいことではありません. このエラーは再現が困難ですが、2 つの異なるマシン (Mac OS X v10.6 (Snow Leopard) と UNIX、Red Hat Linux) で発生します。

最後に、テーブル B をディスクに書き込み、テーブル A を反復処理してテーブル B から一致する行を選択することで、Pandas と PyTables の組み合わせを試しました。この最後のオプションは機能しますが、遅いです。pytables のテーブル B は、デフォルトで既にインデックスが作成されています。

どうすればこの問題に取り組むことができますか?

4

1 に答える 1

17

これは少し疑似コードですが、かなり高速になるはずです。

ディスク上のすべてのテーブルとの簡単なディスク ベースのマージ。重要なのは、選択自体を行っているのではなく、開始/停止を介してテーブルにインデックスを付けているだけであり、これは非常に高速であることです。

B の基準を満たす行を (A の ID を使用して) 選択することは、それほど高速ではありません。これは、データをカーネル内検索ではなく Python 空間に取り込む可能性があるためです (よくわかりませんが、 pytables.org のカーネル内最適化のセクションで詳しく調べてください (カーネル内で実行されるかどうかを判断する方法があります)。

また、これは非常に並列的な問題です (複数のプロセスから同じファイルに結果を書き込まないでください。pytables はそのために書き込み安全ではありません)。

結合操作の実行が実際に「内部」結合になる方法についてのコメントについては、この回答を参照してください。

merge_a_b 操作には、非常に効率的な標準の pandas 結合を使用できると思います (インメモリの場合)。

もう1つのオプション(Aの「大きさ」に応じて)は、最初のテーブルで小さい(おそらく単一の列を使用する)を使用して、Aを2つの部分(同じインデックスが付けられている)に分割することです。マージ結果自体を保存する代わりに、行インデックスを保存します。後で、必要なデータを引き出すことができます (インデクサーを使用して取得するようなものです)。http://pandas.pydata.org/pandas-docs/stable/io.html#multiple-table-queriesを参照してください

A = HDFStore('A.h5')
B = HDFStore('B.h5')

nrows_a = A.get_storer('df').nrows
nrows_b = B.get_storer('df').nrows
a_chunk_size = 1000000
b_chunk_size = 1000000

def merge_a_b(a,b):
    # Function that returns an operation on passed
    # frames, a and b.
    # It could be a merge, join, concat, or other operation that
    # results in a single frame.


for a in xrange(int(nrows_a / a_chunk_size) + 1):

    a_start_i = a * a_chunk_size
    a_stop_i  = min((a + 1) * a_chunk_size, nrows_a)

    a = A.select('df', start = a_start_i, stop = a_stop_i)

    for b in xrange(int(nrows_b / b_chunk_size) + 1):

        b_start_i = b * b_chunk_size
        b_stop_i = min((b + 1) * b_chunk_size, nrows_b)

        b = B.select('df', start = b_start_i, stop = b_stop_i)

        # This is your result store
        m = merge_a_b(a, b)

        if len(m):
            store.append('df_result', m)
于 2013-01-31T03:24:35.147 に答える