Narratorを見たいと思うかもしれません。これは、「データ ストリームの分析と集計」のために設計されています。
query-seq
あなたが最初にしたいことをするのは簡単です:
(require '[narrator.query :refer [query-seq query-stream]])
(require '[narrator.operators :as n])
(def my-seq [:a :a :b :b :b :b :c :a :b :c])
(query-seq (n/group-by identity n/rate) my-seq)
==> {:a 3, :b 5, :c 2}
あなたが提案したようにフィルタリングできます。
を使用quasi-cardinality
して、サンプル内の一意のアイテムの数 (したがって、パーティションの質問) をすばやく決定できます。これには、HyperLogLog カーディナリティ推定アルゴリズムが使用されます。
(query-seq (n/quasi-cardinality) my-seq)
==> 3
quasi-frequency-by
ここに示されています:
(defn freq-in-seq
"returns a function that, when given a value, returns the frequency of that value in the sequence s
e.g. ((freq-in-seq [:a :a :b :c]) :a) ==> 2"
[s]
(query-seq (n/quasi-frequency-by identity) s))
((freq-in-seq my-seq) :a) ==> 3
quasi-distinct-by
:
(query-seq (n/quasi-distinct-by identity) my-seq)
==> [:a :b :c]
を使用したリアルタイムのストリーム分析もありquery-stream
ます。
ストリームをサンプリングして、「期間」値の変更数を取得する方法を次に示します。
(s/stream->seq
(->> my-seq
(map #(hash-map :timestamp %1 :value %2) (range))
(query-stream (n/group-by identity n/rate)
{:value :value :timestamp :timestamp :period 3})))
==> ({:timestamp 3, :value {:a 2, :b 1}} {:timestamp 6, :value {:b 3}} {:timestamp 9, :value {:a 1, :b 1, :c 1}} {:timestamp 12, :value {:c 1}})
結果は、適切なタイムスタンプを持つ 3 項目ごと (期間 3) の一連の変更です。
カスタム ストリーム アグリゲーターを作成することもできます。これは、おそらく上記のストリームで値を蓄積する方法です。私はこれらを簡単に試してみましたが、それを機能させるのにひどく失敗しました(現時点では昼休みのみ)が、これは代わりに機能します:
(defn lazy-value-accum
([s] (lazy-value-accum s {}))
([s m]
(when-not (empty? s)
(lazy-seq
(let [new-map (merge-with + m (:value (first s)))]
(cons new-map
(lazy-value-accum (rest s) new-map))))))
(lazy-value-accum
(s/stream->seq
(->> my-seq
(map #(hash-map :timestamp %1 :value %2) (range))
(query-stream (n/group-by identity n/rate)
{:value :value :timestamp :timestamp :period 3}))))
==> ({:a 2, :b 1} {:a 2, :b 4} {:a 3, :b 5, :c 1} {:a 3, :b 5, :c 2})
period
これは、遅延して読み取ることができる、サンプルごとに各値のカウントが徐々に蓄積されることを示しています。