66

パンダでライブ受信データを処理する最も推奨される/pythonicな方法はどれですか?

数秒ごとに、次の形式のデータ ポイントを受信して​​います。

{'time' :'2013-01-01 00:00:00', 'stock' : 'BLAH',
 'high' : 4.0, 'low' : 3.0, 'open' : 2.0, 'close' : 1.0}

それを既存の DataFrame に追加してから、分析を実行したいと思います。

問題は、DataFrame.append で行を追加するだけで、そのすべてのコピーでパフォーマンスの問題が発生する可能性があることです。

私が試したこと:

何人かは、大きな DataFrame を事前に割り当て、データが入ってきたら更新することを提案しました。

In [1]: index = pd.DatetimeIndex(start='2013-01-01 00:00:00', freq='S', periods=5)

In [2]: columns = ['high', 'low', 'open', 'close']

In [3]: df = pd.DataFrame(index=t, columns=columns)

In [4]: df
Out[4]: 
                    high  low open close
2013-01-01 00:00:00  NaN  NaN  NaN   NaN
2013-01-01 00:00:01  NaN  NaN  NaN   NaN
2013-01-01 00:00:02  NaN  NaN  NaN   NaN
2013-01-01 00:00:03  NaN  NaN  NaN   NaN
2013-01-01 00:00:04  NaN  NaN  NaN   NaN

In [5]: data = {'time' :'2013-01-01 00:00:02', 'stock' : 'BLAH', 'high' : 4.0, 'low' : 3.0, 'open' : 2.0, 'close' : 1.0}

In [6]: data_ = pd.Series(data)

In [7]: df.loc[data['time']] = data_

In [8]: df
Out[8]: 
                    high  low open close
2013-01-01 00:00:00  NaN  NaN  NaN   NaN
2013-01-01 00:00:01  NaN  NaN  NaN   NaN
2013-01-01 00:00:02    4    3    2     1
2013-01-01 00:00:03  NaN  NaN  NaN   NaN
2013-01-01 00:00:04  NaN  NaN  NaN   NaN

もう 1 つの方法は、辞書のリストを作成することです。着信データをリストに追加し、それを小さな DataFrame にスライスして作業を行うだけです。

In [9]: ls = []

In [10]: for n in range(5):
   .....:     # Naive stuff ahead =)
   .....:     time = '2013-01-01 00:00:0' + str(n)
   .....:     d = {'time' : time, 'stock' : 'BLAH', 'high' : np.random.rand()*10, 'low' : np.random.rand()*10, 'open' : np.random.rand()*10, 'close' : np.random.rand()*10}
   .....:     ls.append(d)

In [11]: df = pd.DataFrame(ls[1:3]).set_index('time')

In [12]: df
Out[12]: 
                        close      high       low      open stock
time                                                             
2013-01-01 00:00:01  3.270078  1.008289  7.486118  2.180683  BLAH
2013-01-01 00:00:02  3.883586  2.215645  0.051799  2.310823  BLAH

またはそのようなもの、おそらく入力をもう少し処理します。

4

2 に答える 2

25

次のように HDF5/pytables を使用します。

  1. 「可能な限り」データをpythonリストとして保持します。
  2. 結果をそのリストに追加します。
  3. それが「大きく」なったとき:
    • pandas io (および追加可能なテーブル) を使用して HDF5 ストアにプッシュします。
    • リストをクリアします。
  4. 繰り返す。

実際、私が定義する関数は、各「キー」のリストを使用して、同じプロセスで複数の DataFrame を HDF5 ストアに格納できるようにします。


各行で呼び出す関数を定義しますd

CACHE = {}
STORE = 'store.h5'   # Note: another option is to keep the actual file open

def process_row(d, key, max_len=5000, _cache=CACHE):
    """
    Append row d to the store 'key'.

    When the number of items in the key's cache reaches max_len,
    append the list of rows to the HDF5 store and clear the list.

    """
    # keep the rows for each key separate.
    lst = _cache.setdefault(key, [])
    if len(lst) >= max_len:
        store_and_clear(lst, key)
    lst.append(d)

def store_and_clear(lst, key):
    """
    Convert key's cache list to a DataFrame and append that to HDF5.
    """
    df = pd.DataFrame(lst)
    with pd.HDFStore(STORE) as store:
        store.append(key, df)
    lst.clear()

注: with ステートメントを使用して、各書き込み後にストアを自動的に閉じます。開いたままにしておく方が速いかもしれませんが、そうであれば定期的にフラッシュすることをお勧めします (閉じたフラッシュ)。また、リストではなくコレクションの両端キューを使用した方が読みやすいかもしれませんが、ここではリストのパフォーマンスがわずかに向上します。

これを使用するには、次のように呼び出します。

process_row({'time' :'2013-01-01 00:00:00', 'stock' : 'BLAH', 'high' : 4.0, 'low' : 3.0, 'open' : 2.0, 'close' : 1.0},
            key="df")

注:「df」は、pytables ストアで使用される保存されたキーです。

ジョブが終了したらstore_and_clear、残りのキャッシュを確認します。

for k, lst in CACHE.items():  # you can instead use .iteritems() in python 2
    store_and_clear(lst, k)

これで、完全な DataFrame が次の方法で利用できるようになりました。

with pd.HDFStore(STORE) as store:
    df = store["df"]                    # other keys will be store[key]

いくつかのコメント:

  • 5000 は調整できます。ニーズに合わせて小さい/大きい数値を試してみてください。
  • リストの追加は O(1)、データフレームの追加は O( len(df)) です。
  • 統計やデータ変更を行うまでは、パンダは必要ありません。最速のものを使用してください。
  • このコードは、入ってくる複数のキー (データ ポイント) で機能します。
  • これは非常に小さなコードであり、バニラの python リストと pandas データフレームにとどまっています...

さらに、最新の読み取りを取得するには、読み取り前に保存してクリアする get メソッドを定義できます。このようにして、最新のデータを取得します。

def get_latest(key, _cache=CACHE):
    store_and_clear(_cache[key], key)
    with pd.HDFStore(STORE) as store:
        return store[key]

今あなたがアクセスするとき:

df = get_latest("df")

最新の「df」が利用可能になります。


別のオプションはもう少し複雑です: バニラ pytables でカスタム テーブルを定義します。チュートリアルを参照してください。

注:列記述子を作成するには、フィールド名を知る必要があります。

于 2015-12-15T06:22:12.697 に答える