ライブ ストリーミング サーバーを構築する clojure の方法を探しています。私が苦労している特定の問題は、単一のプロバイダー (Web カメラ) から未定義の数のスレッド (接続されたクライアント) に値を送信する方法です。明らかに、クライアントが接続するとき、Web カメラからの完全なビデオ ファイルには関心がありません。基本的には、ヘッダーを送信する必要があり、その瞬間に Web カメラから到着するパッケージは何でも送信する必要があります。
ストレートJavaでは、簡単だと思います。クライアントが接続するたびにアレイに接続を追加し、切断するとアレイから接続を削除し、Web カメラから新しいパッケージが到着するたびにそれをアレイ内の各エントリに送信します。配列をロックして、エントリを追加/削除するか、ループしてパケットを送信します。もちろん、同じものを clojure で作成することもできますが、これは非常に悪いことのように思えます。
メッセージ パッシング マルチスレッド アーキテクチャでは、これは同様に簡単に思えます。
clojure で考えられる唯一の解決策は、promise の遅延シーケンスを使用することです。確かにそれは機能しますが、よりクリーンなコードとより多くのclojure-zenにつながる別の方法があるかどうか疑問に思っていました:)
説明のために: Promise と Atom を使用した簡略化された問題:
データを生成する 1 つのプロバイダー関数、このデータを読み取る 1 つのスレッド。後で、この最初のスレッドからデータを取得したいが、取得できない他のスレッドがいくつか作成されます。
(defn provider []
(lazy-seq
(do
(Thread/sleep 100)
(cons (rand) (provider)))))
(def printer (agent nil))
(defn log [& line]
(send-off printer (fn [x] (apply println line))))
(def promises (atom (repeatedly promise)))
(defn client-connected-thread [x input]
(log "Client connection " x " is connected with the provider and just received" @(first input))
(recur x (rest input)))
(.start (Thread. (fn []
(loop [stream (provider)]
(when-let [item (first stream)]
(log "I received " item", will share now")
(deliver (first @promises) item)
(swap! promises rest))
(recur (rest stream))))))
(Thread/sleep 300)
(.start (Thread. #(client-connected-thread 1 @promises)))
(Thread/sleep 100)
(.start (Thread. #(client-connected-thread 2 @promises)))
(Thread/sleep 50)
(.start (Thread. #(client-connected-thread 3 @promises)))
したがって、基本的に問題は次のとおりです。これは、この問題に取り組む正しい方法ですか?
また、ここではストリーミング メディア サーバーについて話しているので、プロバイダー機能は 1 秒あたり数万のアイテムを提供し、10 のクライアントが接続されている可能性があります。promise-system は、そのように頻繁に使用するためのものですか?