4

0 から 100,000 までの値の大きなリストがあります (わかりやすくするために、ここでは文字で表しています)。各入力には数千の項目がある場合があります。

[a a a a b b b b c f d b c f ... ]

特定のしきい値を超える数の数を見つけたい。たとえば、しきい値が 3 の場合、答えは{a: 4, b: 5}です。

これを行うための明白な方法は、ID でグループ化し、各グループをカウントしてからフィルター処理することです。

これは言語にとらわれない質問ですが、Clojure では (Clojure を知らなくても気にしないでください!):

(filter (fn [[k cnt]] (> cnt threshold)) (frequencies input))

この関数は、非常に多数の入力に対して実行されます。各入力は非常に大きいため、グループ化とフィルタリングはコストのかかる操作です。入力が特定のしきい値を超える出力を生成できない場合、または問題空間を分割できない場合に、早期に返されるある種のガード関数を見つけたいと考えています。たとえば、最も単純なのはif the size of the input is less than the size of the threshold return nil.

入力が出力を生成できない場合に計算をスキップする、より優れたガード関数を探しています。または、出力を生成するより迅速な方法。

明らかに、グループ化自体よりも安価でなければなりません。優れた解決策の 1 つは、個別の入力セットによる入力のカウントに関係していましたが、最終的にはグループ化と同じくらいコストがかかりました...

確率的なデータ構造が鍵を握る可能性があるという考えがあります。何か案は?

(hyerloglog をタグ付けしましたが、カウントが提供されないため該当しないと思います)

4

3 に答える 3

1

Narratorを見たいと思うかもしれません。これは、「データ ストリームの分析と集計」のために設計されています。

query-seqあなたが最初にしたいことをするのは簡単です:

(require '[narrator.query :refer [query-seq query-stream]])
(require '[narrator.operators :as n])

(def my-seq [:a :a :b :b :b :b :c :a :b :c])
(query-seq (n/group-by identity n/rate) my-seq)
==> {:a 3, :b 5, :c 2}

あなたが提案したようにフィルタリングできます。

を使用quasi-cardinalityして、サンプル内の一意のアイテムの数 (したがって、パーティションの質問) をすばやく決定できます。これには、HyperLogLog カーディナリティ推定アルゴリズムが使用されます。

(query-seq (n/quasi-cardinality) my-seq)
==> 3

quasi-frequency-byここに示されています:

(defn freq-in-seq
  "returns a function that, when given a value, returns the frequency of that value in the sequence s
   e.g. ((freq-in-seq [:a :a :b :c]) :a)  ==> 2"
  [s]
  (query-seq (n/quasi-frequency-by identity) s))

((freq-in-seq my-seq) :a) ==> 3

quasi-distinct-by:

(query-seq (n/quasi-distinct-by identity) my-seq)
==> [:a :b :c]

を使用したリアルタイムのストリーム分析もありquery-streamます。

ストリームをサンプリングして、「期間」値の変更数を取得する方法を次に示します。

(s/stream->seq 
  (->> my-seq
       (map #(hash-map :timestamp %1 :value %2) (range))
       (query-stream (n/group-by identity n/rate) 
                     {:value :value :timestamp :timestamp :period 3})))
==> ({:timestamp 3, :value {:a 2, :b 1}} {:timestamp 6, :value {:b 3}} {:timestamp 9, :value {:a 1, :b 1, :c 1}} {:timestamp 12, :value {:c 1}})

結果は、適切なタイムスタンプを持つ 3 項目ごと (期間 3) の一連の変更です。

カスタム ストリーム アグリゲーターを作成することもできます。これは、おそらく上記のストリームで値を蓄積する方法です。私はこれらを簡単に試してみましたが、それを機能させるのにひどく失敗しました(現時点では昼休みのみ)が、これは代わりに機能します:

(defn lazy-value-accum
  ([s] (lazy-value-accum s {}))
  ([s m]
   (when-not (empty? s)
     (lazy-seq
      (let [new-map (merge-with + m (:value (first s)))]
        (cons new-map
              (lazy-value-accum (rest s) new-map))))))


(lazy-value-accum
  (s/stream->seq 
    (->> my-seq
         (map #(hash-map :timestamp %1 :value %2) (range))
         (query-stream (n/group-by identity n/rate) 
                       {:value :value :timestamp :timestamp :period 3}))))
==> ({:a 2, :b 1} {:a 2, :b 4} {:a 3, :b 5, :c 1} {:a 3, :b 5, :c 2})

periodこれは、遅延して読み取ることができる、サンプルごとに各値のカウントが徐々に蓄積されることを示しています。

于 2015-10-12T13:51:26.267 に答える
0

単一ノードでの作業を高速化したい場合は、このブログ投稿が示すように、reducers または core.async を検討してください。

これが非常に大きなデータセットであり、この操作が頻繁に必要であり、マルチノード クラスターを持つリソースがある場合は、Storm または Onyx のいずれかをセットアップすることを検討できます。

現実的には、reducer は最小の作業量で最大の利益をもたらすように思えます。私がリストしたすべてのオプションを使用すると、より強力で柔軟で高速なソリューションを理解するために、事前により多くの時間が必要になります。最も単純なものから最も強力なものの順に、reducer、core.async、Storm、Onyx です。

于 2015-10-17T06:56:43.520 に答える