8

このメモの長さについて前もってお詫び申し上げます。短くするのにかなりの時間を費やしましたが、これは可能な限り小さくなりました。

私には謎があり、あなたの助けに感謝します。observerこのミステリーは、私が Clojure で書いた rxjava がobservable、オンライン サンプルから引用したいくつかの簡単な s に対して動作することに起因しています。

1 つのオブザーバブルは、そのオブザーバーのハンドラーにメッセージを同期的に送信しonNext、私の原則的なオブザーバーは期待どおりに動作します。

もう一方のオブザーバブルは、Clojure を介して、別のスレッドで同じことを非同期に行いますfuture。まったく同じオブザーバーが、その に投稿されたすべてのイベントをキャプチャーするわけではありませんonNext。末尾のランダムな数のメッセージが失われるようです。

promise以下では、 dの待機の満了と、コレクターonCompletedに送信されるすべてのイベントの待機の満了との間に意図的な競合があります。が勝ったagent場合、. が勝った場合、のキューからのすべてのメッセージが表示されることを期待しています。私が期待していない 1 つの結果は、AND からの短いキューです。しかし、マーフィーは眠らない、それはまさに私が見ているものです. ガベージ コレクションに問題があるのか​​、それとも Clojure の STM への内部キューイングなのか、それとも私の愚かさなのか、それともまったく別の何かなのか、私にはわかりません。promisefalseonCompletedagentagenttrueonCompletedagenttrueonCompletedagent

を介して直接実行できるように、ソースを自己完結型の順序でここに示しますlein repl。最初に、 Netflix の rxjava のバージョンへのproject.clj依存関係を宣言するleiningen プロジェクト ファイルです。0.9.0

(defproject expt2 "0.1.0-SNAPSHOT"
  :description "FIXME: write description"
  :url "http://example.com/FIXME"
  :license {:name "Eclipse Public License"
            :url "http://www.eclipse.org/legal/epl-v10.html"}
  :dependencies [[org.clojure/clojure               "1.5.1"]
                 [com.netflix.rxjava/rxjava-clojure "0.9.0"]]
  :main expt2.core)

これで、名前空間と Clojure 要件、および Java インポートが次のようになります。

(ns expt2.core
  (:require clojure.pprint)
  (:refer-clojure :exclude [distinct])
  (:import [rx Observable subscriptions.Subscriptions]))

最後に、コンソールへの出力用のマクロ:

(defmacro pdump [x]
  `(let [x# ~x]
     (do (println "----------------")
         (clojure.pprint/pprint '~x)
         (println "~~>")
         (clojure.pprint/pprint x#)
         (println "----------------")
         x#)))

最後に、私のオブザーバーに。を使用しagentて、任意のオブザーバブルから送信されたメッセージを収集しますonNext。を使用しatomてポテンシャルを収集しonErrorます。オブザーバーの外部の消費者がそれを待つことができるように、 promiseforを使用します。onCompleted

(defn- subscribe-collectors [obl]
  (let [;; Keep a sequence of all values sent:
        onNextCollector      (agent [])
        ;; Only need one value if the observable errors out:
        onErrorCollector     (atom nil)
        ;; Use a promise for 'completed' so we can wait for it on
        ;; another thread:
        onCompletedCollector (promise)]
    (letfn [;; When observable sends a value, relay it to our agent"
            (collect-next      [item] (send onNextCollector (fn [state] (conj state item))))
            ;; If observable errors out, just set our exception;
            (collect-error     [excp] (reset!  onErrorCollector     excp))
            ;; When observable completes, deliver on the promise:
            (collect-completed [    ] (deliver onCompletedCollector true))
            ;; In all cases, report out the back end with this:
            (report-collectors [    ]
              (pdump
               ;; Wait for everything that has been sent to the agent
               ;; to drain (presumably internal message queues):
               {:onNext      (do (await-for 1000 onNextCollector)
                                 ;; Then produce the results:
                                 @onNextCollector)
                ;; If we ever saw an error, here it is:
                :onError     @onErrorCollector
                ;; Wait at most 1 second for the promise to complete;
                ;; if it does not complete, then produce 'false'.
                ;; I expect if this times out before the agent
                ;; times out to see an 'onCompleted' of 'false'.
                :onCompleted (deref onCompletedCollector 1000 false)
                }))]
      ;; Recognize that the observable 'obl' may run on another thread:
      (-> obl
          (.subscribe collect-next collect-error collect-completed))
      ;; Therefore, produce results that wait, with timeouts, on both
      ;; the completion event and on the draining of the (presumed)
      ;; message queue to the agent.
      (report-collectors))))

さて、ここに同期オブザーバブルがあります。それは 25 のメッセージをonNextオブザーバーの喉に送り込み、次に を呼び出しますonCompleted

(defn- customObservableBlocking []
  (Observable/create
    (fn [observer]                       ; This is the 'subscribe' method.
      ;; Send 25 strings to the observer's onNext:
      (doseq [x (range 25)]
        (-> observer (.onNext (str "SynchedValue_" x))))
      ; After sending all values, complete the sequence:
      (-> observer .onCompleted)
      ; return a NoOpSubsription since this blocks and thus
      ; can't be unsubscribed (disposed):
      (Subscriptions/empty))))

オブザーバーをこのオブザーバブルにサブスクライブします。

;;; The value of the following is the list of all 25 events:
(-> (customObservableBlocking)
    (subscribe-collectors))

期待どおりに動作し、コンソールに次の結果が表示されます

{:onNext (do (await-for 1000 onNextCollector) @onNextCollector),
 :onError @onErrorCollector,
 :onCompleted (deref onCompletedCollector 1000 false)}
~~>
{:onNext
 ["SynchedValue_0"
  "SynchedValue_1"
  "SynchedValue_2"
  "SynchedValue_3"
  "SynchedValue_4"
  "SynchedValue_5"
  "SynchedValue_6"
  "SynchedValue_7"
  "SynchedValue_8"
  "SynchedValue_9"
  "SynchedValue_10"
  "SynchedValue_11"
  "SynchedValue_12"
  "SynchedValue_13"
  "SynchedValue_14"
  "SynchedValue_15"
  "SynchedValue_16"
  "SynchedValue_17"
  "SynchedValue_18"
  "SynchedValue_19"
  "SynchedValue_20"
  "SynchedValue_21"
  "SynchedValue_22"
  "SynchedValue_23"
  "SynchedValue_24"],
 :onError nil,
 :onCompleted true}
----------------

futureのスレッドでのみ、まったく同じことを行う非同期オブザーバブルを次に示します。

(defn- customObservableNonBlocking []
  (Observable/create
    (fn [observer]                       ; This is the 'subscribe' method
      (let [f (future
                ;; On another thread, send 25 strings:
                (doseq [x (range 25)]
                  (-> observer (.onNext (str "AsynchValue_" x))))
                ; After sending all values, complete the sequence:
                (-> observer .onCompleted))]
        ; Return a disposable (unsubscribe) that cancels the future:
        (Subscriptions/create #(future-cancel f))))))

;;; For unknown reasons, the following does not produce all 25 events:
(-> (customObservableNonBlocking)
    (subscribe-collectors))

しかし、驚いたことに、コンソールに次のように表示されtrueます。ただし、非同期メッセージの一部のみです。表示されるメッセージの実際の数は実行ごとに異なります。これは、何らかの同時実行現象が発生していることを示しています。手がかりに感謝します。onCompletedpromise

----------------
{:onNext (do (await-for 1000 onNextCollector) @onNextCollector),
 :onError @onErrorCollector,
 :onCompleted (deref onCompletedCollector 1000 false)}
~~>
{:onNext
 ["AsynchValue_0"
  "AsynchValue_1"
  "AsynchValue_2"
  "AsynchValue_3"
  "AsynchValue_4"
  "AsynchValue_5"
  "AsynchValue_6"],
 :onError nil,
 :onCompleted true}
----------------
4

1 に答える 1

7

await-foron エージェントは、これまで(このスレッドまたはエージェントから) エージェントにディスパッチされたすべてのアクションが発生するまで、現在のスレッドをブロックすることを意味します。エージェントとそれがあなたのケースで起こっていることです。エージェントの待機が終了し、マップのキーでその値を逆参照した:onNext後、待機後に true であることが判明した on completed promise を待ちますが、その間に他のメッセージがエージェントにディスパッチされましたベクターにまとめます。

キーをマップの最初のキーにすることでこれを解決でき:onCompletedます。これは基本的に、完了を待ってからエージェントを待つことを意味しsendます。

{:onCompleted (deref onCompletedCollector 1000 false)
 :onNext      (do (await-for 0 onNextCollector)
                                 @onNextCollector)
 :onError     @onErrorCollector
 }
于 2013-06-01T08:45:17.613 に答える