1149

I have tried to puzzle out an answer to this question for many months while learning pandas. I use SAS for my day-to-day work and it is great for it's out-of-core support. However, SAS is horrible as a piece of software for numerous other reasons.

One day I hope to replace my use of SAS with python and pandas, but I currently lack an out-of-core workflow for large datasets. I'm not talking about "big data" that requires a distributed network, but rather files too large to fit in memory but small enough to fit on a hard-drive.

My first thought is to use HDFStore to hold large datasets on disk and pull only the pieces I need into dataframes for analysis. Others have mentioned MongoDB as an easier to use alternative. My question is this:

What are some best-practice workflows for accomplishing the following:

  1. Loading flat files into a permanent, on-disk database structure
  2. Querying that database to retrieve data to feed into a pandas data structure
  3. Updating the database after manipulating pieces in pandas

Real-world examples would be much appreciated, especially from anyone who uses pandas on "large data".

Edit -- an example of how I would like this to work:

  1. Iteratively import a large flat-file and store it in a permanent, on-disk database structure. These files are typically too large to fit in memory.
  2. In order to use Pandas, I would like to read subsets of this data (usually just a few columns at a time) that can fit in memory.
  3. I would create new columns by performing various operations on the selected columns.
  4. I would then have to append these new columns into the database structure.

I am trying to find a best-practice way of performing these steps. Reading links about pandas and pytables it seems that appending a new column could be a problem.

Edit -- Responding to Jeff's questions specifically:

  1. I am building consumer credit risk models. The kinds of data include phone, SSN and address characteristics; property values; derogatory information like criminal records, bankruptcies, etc... The datasets I use every day have nearly 1,000 to 2,000 fields on average of mixed data types: continuous, nominal and ordinal variables of both numeric and character data. I rarely append rows, but I do perform many operations that create new columns.
  2. Typical operations involve combining several columns using conditional logic into a new, compound column. For example, if var1 > 2 then newvar = 'A' elif var2 = 4 then newvar = 'B'. The result of these operations is a new column for every record in my dataset.
  3. Finally, I would like to append these new columns into the on-disk data structure. I would repeat step 2, exploring the data with crosstabs and descriptive statistics trying to find interesting, intuitive relationships to model.
  4. A typical project file is usually about 1GB. Files are organized into such a manner where a row consists of a record of consumer data. Each row has the same number of columns for every record. This will always be the case.
  5. It's pretty rare that I would subset by rows when creating a new column. However, it's pretty common for me to subset on rows when creating reports or generating descriptive statistics. For example, I might want to create a simple frequency for a specific line of business, say Retail credit cards. To do this, I would select only those records where the line of business = retail in addition to whichever columns I want to report on. When creating new columns, however, I would pull all rows of data and only the columns I need for the operations.
  6. The modeling process requires that I analyze every column, look for interesting relationships with some outcome variable, and create new compound columns that describe those relationships. The columns that I explore are usually done in small sets. For example, I will focus on a set of say 20 columns just dealing with property values and observe how they relate to defaulting on a loan. Once those are explored and new columns are created, I then move on to another group of columns, say college education, and repeat the process. What I'm doing is creating candidate variables that explain the relationship between my data and some outcome. At the very end of this process, I apply some learning techniques that create an equation out of those compound columns.

It is rare that I would ever add rows to the dataset. I will nearly always be creating new columns (variables or features in statistics/machine learning parlance).

4

16 に答える 16

700

私は日常的に数十ギガバイトのデータをこの方法で使用しています。たとえば、クエリを介して読み取り、データを作成して追加するディスク上のテーブルがあります。

データの保存方法に関するいくつかの提案については、ドキュメントを読む価値があり、このスレッドの後半に記載されています。

データの保存方法に影響を与える詳細は次のとおり
です。そして私はあなたが構造を開発するのを手伝うことができます.

  1. データのサイズ、行数、列数、列の種類。行を追加していますか、それとも単に列を追加していますか?
  2. 典型的な操作はどのようになりますか。たとえば、列に対してクエリを実行して一連の行と特定の列を選択し、操作 (インメモリ) を実行して新しい列を作成し、これらを保存します。
    (おもちゃの例を挙げれば、より具体的な推奨事項を提供できる可能性があります。)
  3. その処理の後、あなたは何をしますか?ステップ 2 はアドホックですか、それとも繰り返し可能ですか?
  4. 入力フラット ファイル: いくつ、おおよその合計サイズ (Gb)。これらはどのように記録ごとに整理されていますか? それぞれに異なるフィールドが含まれていますか、それともファイルごとにいくつかのレコードがあり、各ファイルにすべてのフィールドがありますか?
  5. 基準に基づいて行 (レコード) のサブセットを選択したことがありますか (たとえば、フィールド A > 5 の行を選択するなど)? それから何かをしますか、それともすべてのレコードでフィールド A、B、C を選択しますか (そして何かをしますか)?
  6. すべての列を (グループで) 「作業」していますか、それともレポートにのみ使用できる十分な割合がありますか (たとえば、データを保持したいが、その列を明示的に取得する必要はありません)。最終結果時間)?

解決

少なくともパンダが0.10.1インストールされていることを確認してください。

ファイルをチャンク単位で繰り返し読み取り、複数のテーブル クエリを実行します。

pytables は行単位 (クエリ対象) で動作するように最適化されているため、フィールドのグループごとにテーブルを作成します。このようにして、フィールドの小さなグループを簡単に選択できます (大きなテーブルでも機能しますが、この方法で行う方が効率的です... 将来この制限を修正できると思います... これはとにかくもっと直観的です):
(以下は疑似コードです。)

import numpy as np
import pandas as pd

# create a store
store = pd.HDFStore('mystore.h5')

# this is the key to your storage:
#    this maps your fields to a specific group, and defines 
#    what you want to have as data_columns.
#    you might want to create a nice class wrapping this
#    (as you will want to have this map and its inversion)  
group_map = dict(
    A = dict(fields = ['field_1','field_2',.....], dc = ['field_1',....,'field_5']),
    B = dict(fields = ['field_10',......        ], dc = ['field_10']),
    .....
    REPORTING_ONLY = dict(fields = ['field_1000','field_1001',...], dc = []),

)

group_map_inverted = dict()
for g, v in group_map.items():
    group_map_inverted.update(dict([ (f,g) for f in v['fields'] ]))

ファイルを読み込んでストレージを作成する (基本的に何をするかappend_to_multiple):

for f in files:
   # read in the file, additional options may be necessary here
   # the chunksize is not strictly necessary, you may be able to slurp each 
   # file into memory in which case just eliminate this part of the loop 
   # (you can also change chunksize if necessary)
   for chunk in pd.read_table(f, chunksize=50000):
       # we are going to append to each table by group
       # we are not going to create indexes at this time
       # but we *ARE* going to create (some) data_columns

       # figure out the field groupings
       for g, v in group_map.items():
             # create the frame for this group
             frame = chunk.reindex(columns = v['fields'], copy = False)    

             # append it
             store.append(g, frame, index=False, data_columns = v['dc'])

これで、すべてのテーブルがファイルに含まれるようになりました (実際には、必要に応じてそれらを別々のファイルに保存できます。ファイル名を group_map に追加する必要があるかもしれませんが、おそらくこれは必要ありません)。

これは、列を取得して新しい列を作成する方法です。

frame = store.select(group_that_I_want)
# you can optionally specify:
# columns = a list of the columns IN THAT GROUP (if you wanted to
#     select only say 3 out of the 20 columns in this sub-table)
# and a where clause if you want a subset of the rows

# do calculations on this frame
new_frame = cool_function_on_frame(frame)

# to 'add columns', create a new group (you probably want to
# limit the columns in this new_group to be only NEW ones
# (e.g. so you don't overlap from the other tables)
# add this info to the group_map
store.append(new_group, new_frame.reindex(columns = new_columns_created, copy = False), data_columns = new_columns_created)

post_processing の準備ができたら:

# This may be a bit tricky; and depends what you are actually doing.
# I may need to modify this function to be a bit more general:
report_data = store.select_as_multiple([groups_1,groups_2,.....], where =['field_1>0', 'field_1000=foo'], selector = group_1)

data_columns については、実際にはdata_columnsを定義する必要はありません。列に基づいて行をサブ選択できます。例:

store.select(group, where = ['field_1000=foo', 'field_1001>0'])

それらは、最終的なレポート生成段階で最も興味深いものになる可能性があります (基本的に、データ列は他の列から分離されているため、多く定義すると効率に多少影響する可能性があります)。

次のこともできます。

  • フィールドのリストを取得し、groups_map でグループを検索し、これらを選択して結果を連結し、結果のフレームを取得する関数を作成します (これは本質的に select_as_multiple が行うことです)。このようにして、構造はかなり透過的になります。
  • 特定のデータ列のインデックス (行のサブセット化が大幅に高速化されます)。
  • 圧縮を有効にします。

ご不明な点がございましたらお知らせください。

于 2013-01-10T22:57:22.433 に答える
166

上記の回答には、私が非常に役立つと思った単純なアプローチが欠けていると思います。

大きすぎてメモリに読み込めないファイルがある場合、そのファイルを複数の小さなファイルに分割します (行ごとまたは列ごと)。

例: ~30GB のサイズの 30 日分の取引データの場合、1 日 ~1GB のサイズのファイルに分割します。その後、各ファイルを個別に処理し、最後に結果を集計します

最大の利点の 1 つは、ファイルの並列処理 (複数のスレッドまたはプロセス) が可能になることです。

もう 1 つの利点は、ファイル操作 (例での日付の追加/削除など) を通常のシェル コマンドで実行できることです。これは、より高度で複雑なファイル形式では不可能です。

このアプローチはすべてのシナリオをカバーしているわけではありませんが、多くのシナリオで非常に役立ちます

于 2013-12-19T19:46:48.987 に答える
115

質問から2年後、「コア外」のパンダに相当するものがあります:dask。それは素晴らしいです!pandas のすべての機能をサポートしているわけではありませんが、非常に便利です。更新: 過去 2 年間、Dask は一貫して維持されており、Dask を使用する実質的なユーザー コミュニティがあります。

そして今、質問から4年後、 Vaexに相当する別の高性能の「アウトオブコア」パンダがあります。「メモリ マッピング、ゼロ メモリ コピー ポリシー、遅延計算を使用して最高のパフォーマンスを実現します (メモリを無駄にしません)」。数十億行のデータセットを処理でき、それらをメモリに保存しません (最適化されていないハードウェアで分析を行うことさえ可能にします)。

于 2016-03-23T20:30:53.543 に答える
66

これが古いスレッドであることは承知していますが、 Blazeライブラリはチェックする価値があると思います。このような状況のために構築されています。

ドキュメントから:

Blaze は、NumPy と Pandas の使いやすさを分散コンピューティングとアウトオブコア コンピューティングに拡張します。Blaze は、NumPy ND-Array または Pandas DataFrame と同様のインターフェースを提供しますが、これらの使い慣れたインターフェースを Postgres や Spark などの他のさまざまな計算エンジンにマップします。

編集:ちなみに、ContinuumIO と NumPy の作成者である Travis Oliphant によってサポートされています。

于 2014-12-03T22:09:40.410 に答える
59

これは pymongo の場合です。また、python で sql server、sqlite、HDF、ORM (SQLAlchemy) を使用してプロトタイプを作成しました。何よりもまず、pymongo はドキュメント ベースの DB であるため、各人物は (dict属性の) ドキュメントになります。多くの人がコレクションを形成し、多くのコレクション (人、株式市場、収入) を持つことができます。

pd.dateframe -> pymongo 注: 私はchunksizeinを使用read_csvして 5 から 10k レコードに保持します (pymongo は、それより大きい場合はソケットをドロップします)。

aCollection.insert((a[1].to_dict() for a in df.iterrows()))

クエリ: gt = より大きい...

pd.DataFrame(list(mongoCollection.find({'anAttribute':{'$gt':2887000, '$lt':2889000}})))

.find()ichunkedはイテレータを返すので、私は通常、より小さなイテレータに分割 するために使用します。

通常は 10 個のデータ ソースをまとめて貼り付けるので、結合はどうでしょうか。

aJoinDF = pandas.DataFrame(list(mongoCollection.find({'anAttribute':{'$in':Att_Keys}})))

次に(私の場合aJoinDF、「マージ可能」になる前に最初にアグする必要がある場合があります。)

df = pandas.merge(df, aJoinDF, on=aKey, how='left')

次に、以下の更新メソッドを使用して、新しい情報をメイン コレクションに書き込むことができます。(論理コレクションと物理データソース)。

collection.update({primarykey:foo},{key:change})

小規模なルックアップでは、非正規化します。たとえば、ドキュメントにコードがあり、フィールド コード テキストを追加して、ドキュメントdictを作成するときにルックアップを実行するとします。

これで、人に基づいた優れたデータセットができました。各ケースでロジックを解き放ち、より多くの属性を作成できます。最後に、最大キー インジケーターを 3 つまで pandas に読み込み、ピボット/集計/データ探索を行うことができます。これは、数字/大きなテキスト/カテゴリ/コード/フロート/...を含む300万レコードで機能します

MongoDB に組み込まれている 2 つのメソッド (MapReduce と集約フレームワーク) を使用することもできます。集約フレームワークの詳細については、こちらを参照してください。これは、MapReduce よりも簡単で、迅速な集約作業に便利です。フィールドやリレーションを定義する必要がなく、アイテムをドキュメントに追加できることに注意してください。急速に変化する numpy、pandas、python ツールセットの現在の状態では、MongoDB は仕事に取り掛かるのに役立ちます :)

于 2013-01-11T22:11:52.907 に答える
54

大規模なデータの使用例で役立つとわかった 1 つのトリックは、浮動小数点数の精度を 32 ビットに減らしてデータの量を減らすことです。すべての場合に適用できるわけではありませんが、多くのアプリケーションでは 64 ビットの精度は過剰であり、2 倍のメモリ節約はそれだけの価値があります。明白な点をさらに明白にするために:

>>> df = pd.DataFrame(np.random.randn(int(1e8), 5))
>>> df.info()
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 100000000 entries, 0 to 99999999
Data columns (total 5 columns):
...
dtypes: float64(5)
memory usage: 3.7 GB

>>> df.astype(np.float32).info()
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 100000000 entries, 0 to 99999999
Data columns (total 5 columns):
...
dtypes: float32(5)
memory usage: 1.9 GB
于 2017-03-26T05:59:45.757 に答える
49

私はこれを少し遅れて発見しましたが、同様の問題 (住宅ローンの前払いモデル) に取り組んでいます。私の解決策は、pandas HDFStore レイヤーをスキップして、ストレート pytables を使用することでした。各列を個別の HDF5 配列として最終ファイルに保存します。

私の基本的なワークフローは、まずデータベースから CSV ファイルを取得することです。私はそれをgzipするので、それほど大きくはありません。次に、それを Python で反復処理し、各行を実際のデータ型に変換して HDF5 ファイルに書き込むことにより、それを行指向の HDF5 ファイルに変換します。これには数十分かかりますが、行単位でしか動作しないため、メモリを使用しません。次に、行指向の HDF5 ファイルを列指向の HDF5 ファイルに「転置」します。

テーブル転置は次のようになります。

def transpose_table(h_in, table_path, h_out, group_name="data", group_path="/"):
    # Get a reference to the input data.
    tb = h_in.getNode(table_path)
    # Create the output group to hold the columns.
    grp = h_out.createGroup(group_path, group_name, filters=tables.Filters(complevel=1))
    for col_name in tb.colnames:
        logger.debug("Processing %s", col_name)
        # Get the data.
        col_data = tb.col(col_name)
        # Create the output array.
        arr = h_out.createCArray(grp,
                                 col_name,
                                 tables.Atom.from_dtype(col_data.dtype),
                                 col_data.shape)
        # Store the data.
        arr[:] = col_data
    h_out.flush()

それを読み返すと、次のようになります。

def read_hdf5(hdf5_path, group_path="/data", columns=None):
    """Read a transposed data set from a HDF5 file."""
    if isinstance(hdf5_path, tables.file.File):
        hf = hdf5_path
    else:
        hf = tables.openFile(hdf5_path)

    grp = hf.getNode(group_path)
    if columns is None:
        data = [(child.name, child[:]) for child in grp]
    else:
        data = [(child.name, child[:]) for child in grp if child.name in columns]

    # Convert any float32 columns to float64 for processing.
    for i in range(len(data)):
        name, vec = data[i]
        if vec.dtype == np.float32:
            data[i] = (name, vec.astype(np.float64))

    if not isinstance(hdf5_path, tables.file.File):
        hf.close()
    return pd.DataFrame.from_items(data)

現在、私は通常、大量のメモリを搭載したマシンでこれを実行しているため、メモリの使用には十分注意していない可能性があります。たとえば、デフォルトでは、ロード操作はデータ セット全体を読み取ります。

これは一般的に私にとってはうまくいきますが、少し扱いに​​くく、派手な pytables マジックを使用することはできません。

編集: レコードの配列の pytables のデフォルトに対するこのアプローチの本当の利点は、テーブルを処理できない h5r を使用してデータを R にロードできることです。または、少なくとも、異種テーブルをロードすることができませんでした。

于 2013-03-21T21:19:30.510 に答える
37

他の人が指摘したように、数年後、「コア外」の pandas に相当するものが登場しました: dask。dask は pandas とそのすべての機能を簡単に置き換えるものではありませんが、いくつかの理由で際立っています。

Dask は、NumPy、Pandas、または Python イテレーターなどの一般的なインターフェイスをより大きなものに拡張する並列配列、データフレーム、リストなどの「ビッグ データ」コレクションのインタラクティブな計算ワークロードの動的タスク スケジューリング用に最適化された、分析コンピューティング用の柔軟な並列コンピューティング ライブラリです。メモリ不足または分散環境に対応し、ラップトップからクラスターまで拡張できます。

ダスクは次の美徳を強調しています。

  • おなじみ: 並列化された NumPy 配列と Pandas DataFrame オブジェクトを提供します
  • 柔軟性: より多くのカスタム ワークロードおよび他のプロジェクトとの統合のためのタスク スケジューリング インターフェイスを提供します。
  • ネイティブ: PyData スタックにアクセスして、Pure Python で分散コンピューティングを有効にします。
  • 高速: 高速な数値アルゴリズムに必要な低オーバーヘッド、低レイテンシ、および最小限のシリアライゼーションで動作します
  • スケールアップ: 数千のコアを備えたクラスターで回復力のある実行 スケールダウン: 1 つのプロセスでラップトップをセットアップして実行するのは簡単
  • レスポンシブ: インタラクティブ コンピューティングを念頭に置いて設計されており、人間を支援するための迅速なフィードバックと診断を提供します。

簡単なコードサンプルを追加するには:

import dask.dataframe as dd
df = dd.read_csv('2015-*-*.csv')
df.groupby(df.user_id).value.mean().compute()

次のようないくつかの pandas コードを置き換えます。

import pandas as pd
df = pd.read_csv('2015-01-01.csv')
df.groupby(df.user_id).value.mean()

そして、特に注目すべきは、concurrent.futuresインターフェースを介して、カスタム タスクを送信するための一般的なインフラストラクチャを提供することです。

from dask.distributed import Client
client = Client('scheduler:port')

futures = []
for fn in filenames:
    future = client.submit(load, fn)
    futures.append(future)

summary = client.submit(summarize, futures)
summary.result()
于 2017-11-22T03:55:40.313 に答える
21

もう一つのバリエーション

pandas で実行される操作の多くは、db クエリ (sql、mongo) としても実行できます。

RDBMS または mongodb を使用すると、DB クエリで集計の一部を実行できます (これは大規模なデータ用に最適化されており、キャッシュとインデックスを効率的に使用します)。

後で、pandas を使用して後処理を実行できます。

この方法の利点は、高レベルの宣言構文でロジックを定義しながら、大規模なデータを操作するための DB 最適化を実現できることです。また、メモリ内で何を実行し、何を実行するかを決定する詳細に対処する必要はありません。コアの。

また、クエリ言語と pandas は異なりますが、通常、ロジックの一部を別の言語に変換することは複雑ではありません。

于 2015-04-28T05:22:21.403 に答える