1

特定の条件が満たされた場合にメールを送信するための clojure コード (riemann) があります。イベントを riemann サーバーに渡す際に、いくつかの問題に直面しています。

リーマン符号

(let [email (mailer {"......"})]

  (streams
    (where (service "system_log")

        (by :RefNo
         (smap
          (fn [events]
           (let [count-of-failures (count (filter #(= "Failed" (:Status %)) events))]        
              (assoc (first events)
                :status "Failure"
                 :metric  count-of-failures 
                 :total-fail (>= count-of-failures 2))))

          (where (and (= (:status event) "Failure")
                      (:total-fail event))

            (email "XXXXX@gmail.com"))prn)))))

リーマンサーバーのO/P

WARN [2015-11-18 05:24:49,596] defaultEventExecutorGroup-2-2 - riemann.streams - riemann.streams$smap$stream__3695@7addde9e threw
java.lang.IllegalArgumentException: Key must be integer
        at clojure.lang.APersistentVector.assoc(APersistentVector.java:335)
        at clojure.lang.APersistentVector.assoc(APersistentVector.java:18)

更新 2:

smap を sreduce に変更しただけです。どのように更新すればよいですか。私はこれに慣れていないので、あなたの提案に従ってコードを変更することについて少し混乱しています

(let [email (mailer {"......"})]

      (streams
        (where (service "system_log")

            (by :RefNo
             (sreduce
              (fn [events]
               (let [count-of-failures (count (filter #(= "Failed" (:Status %)) events))]        
                  (assoc (first events)
                    :status "Failure"
                     :metric  count-of-failures 
                     :total-fail (>= count-of-failures 2))))

              (where (and (= (:status event) "Failure")
                          (:total-fail event))

                (email "XXXXX@gmail.com"))prn)))))

更新 3:

coalesceを使用してコードを更新し、smapその子を持っています。エラーは表示されなくなりましたが、メールはトリガーされませんでした。count-of-failuresとして取得してい0ます。私count functionは働いていないと思います。

(let [email (mailer {"......"})]

          (streams
            (where (service "system_log")

                (by :RefNo
                 (coalesce
                   (smap
                  (fn [events]
                   (let [count-of-failures (count (filter #(= "Failed" (:status %)) events))]        
                      (assoc (first events)
                        :status "Failure"
                         :metric  count-of-failures 
                         :total-fail (>= count-of-failures 2))))

                  (where (and (= (:status event) "Failure")
                              (:total-fail event))

                    (email "XXXXX@gmail.com"))))prn))))
4

1 に答える 1

1

私の帽子の上から、byシンボルではなくベクトルを受け入れます:

(by [:Refno] ...

補足として、REPL (例: https://github.com/aphyr/riemann/wiki/playing-with-the-REPL ) を使用することをお勧めします。これにより、REPL で機能をテストしながらストリーム処理を徐々に構築できます。それは私にとってうまくいきました。

smap更新: 「失敗」を割り当てているため、内部に where をネストするべきではないかどうかもわかりませんwhereが、並行して実行されるsmapため、何かが欠けていない限り、表示されないと思います。

更新 2: 次のように Riemann に接続された REPL を実行しました。

(require '[riemann.streams :refer :all])
(def f (stream
        (where (service "system_log")
               (by :RefNo
                   (smap
                    (fn [events]
                      (let [count-of-failures (count (filter #(= "Failed" (:Status %)) events))]
                        (prn events)
                        (assoc (first events)
                               :status "Failure"
                               :metric  count-of-failures 
                               :total-fail (>= count-of-failures 2))))

                    #_(where (and (= (:status event) "Failure")
                                  (:total-fail event)))
                    prn)))))
(f {:RefNo 4444 :service "system_log" :status "Failed"})

あなたが持っているのと同じエラーが発生します。smapに渡された関数がイベントのリストを受け取ると想定しているため、エラーが発生します。そうではなく、単一のイベントを受け取ります (prnそこを参照)。firstベクトルは整数のみをサポートするため、ハッシュマップを呼び出すとベクトルが生成さassocれ、シンボルをキーとして使用しようとするとエラーが発生します。

map過去のイベントが必要なため、この目的で Clojureの通常の関数を使用しないのと同じように、この方法で失敗をカウントすることはできません。

あなたの smap の例と互換性があると私が思うものは次のとおりです。

また:

  1. 合体http://riemann.io/api/riemann.streams.html#var-coalesceと smap をその子として使用します。smap は、あなたが最初に望んでいたように、イベントのリストを受け取ると思います。試したことはありませんが、動作しない理由はありません。

  2. 1 時間の TTL でイベントを送信し、ストリーム内のインデックスをクエリすることで、必要な時間枠 (たとえば、1 時間あたり最大 2 回の失敗) を制御できます。完全な例を次に示します: http://riemann.io/howto.html#query-the-index-from-within-a-stream

それとは別に、:Status小文字にする必要があると思います。お役に立てば幸いです。

于 2015-11-18T14:20:27.853 に答える