3

ライブ ストリーミング サーバーを構築する clojure の方法を探しています。私が苦労している特定の問題は、単一のプロバイダー (Web カメラ) から未定義の数のスレッド (接続されたクライアント) に値を送信する方法です。明らかに、クライアントが接続するとき、Web カメラからの完全なビデオ ファイルには関心がありません。基本的には、ヘッダーを送信する必要があり、その瞬間に Web カメラから到着するパッケージは何でも送信する必要があります。

ストレートJavaでは、簡単だと思います。クライアントが接続するたびにアレイに接続を追加し、切断するとアレイから接続を削除し、Web カメラから新しいパッケージが到着するたびにそれをアレイ内の各エントリに送信します。配列をロックして、エントリを追加/削除するか、ループしてパケットを送信します。もちろん、同じものを clojure で作成することもできますが、これは非常に悪いことのように思えます。

メッセージ パッシング マルチスレッド アーキテクチャでは、これは同様に簡単に思えます。

clojure で考えられる唯一の解決策は、promise の遅延シーケンスを使用することです。確かにそれは機能しますが、よりクリーンなコードとより多くのclojure-zenにつながる別の方法があるかどうか疑問に思っていました:)

説明のために: Promise と Atom を使用した簡略化された問題:

データを生成する 1 つのプロバイダー関数、このデータを読み取る 1 つのスレッド。後で、この最初のスレッドからデータを取得したいが、取得できない他のスレッドがいくつか作成されます。

(defn provider []                                                                                                                                                                                                                                                                                                             
  (lazy-seq                                                                                                                                                                                                                                                                                                                   
    (do                                                                                                                                                                                                                                                                                                                       
      (Thread/sleep 100)                                                                                                                                                                                                                                                                                                      
      (cons (rand) (provider)))))                                                                                                                                                                                                                                                                                             

(def printer (agent nil))                                                                                                                                                                                                                                                                                                     
(defn log [& line]                                                                                                                                                                                                                                                                                                            
  (send-off printer (fn [x] (apply println line))))                                                                                                                                                                                                                                                                           

(def promises (atom (repeatedly promise)))                                                                                                                                                                                                                                                                                    

(defn client-connected-thread [x input]                                                                                                                                                                                                                                                                                       
  (log "Client connection " x " is connected with the provider and just received" @(first input))                                                                                                                                                                                                                             
  (recur x (rest input)))                                                                                                                                                                                                                                                                                                     

(.start (Thread. (fn []                                                                                                                                                                                                                                                                                                       
                   (loop [stream (provider)]                                                                                                                                                                                                                                                                                  
                     (when-let [item (first stream)]                                                                                                                                                                                                                                                                          
                       (log "I received " item", will share now")                                                                                                                                                                                                                                                             
                       (deliver (first @promises) item)                                                                                                                                                                                                                                                                       
                       (swap! promises rest))                                                                                                                                                                                                                                                                                 
                       (recur (rest stream))))))                                                                                                                                                                                                                                                                              


(Thread/sleep 300)                                                                                                                                                                                                                                                                                                            
(.start (Thread. #(client-connected-thread 1 @promises)))                                                                                                                                                                                                                                                                     
(Thread/sleep 100)                                                                                                                                                                                                                                                                                                            
(.start (Thread. #(client-connected-thread 2 @promises)))                                                                                                                                                                                                                                                                     
(Thread/sleep 50)                                                                                                                                                                                                                                                                                                             
(.start (Thread. #(client-connected-thread 3 @promises)))            

したがって、基本的に問題は次のとおりです。これは、この問題に取り組む正しい方法ですか?

また、ここではストリーミング メディア サーバーについて話しているので、プロバイダー機能は 1 秒あたり数万のアイテムを提供し、10 のクライアントが接続されている可能性があります。promise-system は、そのように頻繁に使用するためのものですか?

4

2 に答える 2

1

アレフを見てください。これは、必要なシナリオの実装に役立つ「非同期チャネル」を提供するためのライブラリです。

于 2012-03-06T06:46:25.740 に答える
1

Clojure には、情報を非同期的に送信する必要がある状況に対応するエージェントが用意されており、ユース ケースに適していると思われます。

あなたは確かに非常に近いですが、作業エージェントをいくつかの場所に貼り付けて終了しました。

「ストレートなClojureでは、簡単だと思います。クライアントが接続するたびに、エージェントのエージェントのベクトルに接続を追加し、切断するとエージェントのエージェントから接続を削除し、Webカメラから新しいパッケージが到着するたびにエージェント内の各エージェントに送信します。」

スレッドプールを空にしないようにするsend-off代わりに、必ず使用してください。send

これには、「配列をロックする」アプローチよりも多くの利点があります。

  • 遅いクライアントが 1 つあるだけで、接続の追加、削除、または追加ができなくなることはありません
  • クライアントは最終的に、それぞれを個別に追跡する必要なく、すべてのフレームを取得します
  • ロックを心配する必要はありません
  • 手動でスレッドを割り当てる必要はありません
  • アルゴリズムの単純なコアを変更せずに、時計などを使用してパフォーマンスを報告できます。

大まかなアウトラインは次のようになります。

user> (def connections-stub (range))
user> (def connections (agent []))
#'user/connections
user> (defn accept-connection [connection] 
    (send connections conj (agent connection)))
#'user/accept-connection
user> (map accept-connection (take 10 connections-stub))
(#<Agent@2a81df82: [#<Agent@3478e59b: 0> #<Agent@6d2a3e06: 1>]> #<Agent@2a81df82: [#<Agent@3478e59b: 0> #<Agent@6d2a3e06: 1> #<Agent@704e33e7: 2>]> #<Agent@2a81df82: [#<Agent@3478e59b: 0> #<Agent@6d2a3e06: 1> #<Agent@704e33e7: 2> #<Agent@1e31bc4b: 3>]> #<Agent@2a81df82: [#<Agent@3478e59b: 0> #<Agent@6d2a3e06: 1> #<Agent@704e33e7: 2> #<Agent@1e31bc4b: 3> #<Agent@5340ef69: 4>]> #<Agent@2a81df82: [#<Agent@3478e59b: 0> #<Agent@6d2a3e06: 1> #<Agent@704e33e7: 2> #<Agent@1e31bc4b: 3> #<Agent@5340ef69: 4> #<Agent@4c260132: 5>]> #<Agent@2a81df82: [#<Agent@3478e59b: 0> #<Agent@6d2a3e06: 1> #<Agent@704e33e7: 2> #<Agent@1e31bc4b: 3> #<Agent@5340ef69: 4> #<Agent@4c260132: 5> #<Agent@5318a0ac: 6>]> #<Agent@2a81df82: [#<Agent@3478e59b: 0> #<Agent@6d2a3e06: 1> #<Agent@704e33e7: 2> #<Agent@1e31bc4b: 3> #<Agent@5340ef69: 4> #<Agent@4c260132: 5> #<Agent@5318a0ac: 6> #<Agent@75dca6d2: 7>]> #<Agent@2a81df82: [#<Agent@3478e59b: 0> #<Agent@6d2a3e06: 1> #<Agent@704e33e7: 2> #<Agent@1e31bc4b: 3> #<Agent@5340ef69: 4> #<Agent@4c260132: 5> #<Agent@5318a0ac: 6> #<Agent@75dca6d2: 7> #<Agent@694c6171: 8>]> #<Agent@2a81df82: [#<Agent@3478e59b: 0> #<Agent@6d2a3e06: 1> #<Agent@704e33e7: 2> #<Agent@1e31bc4b: 3> #<Agent@5340ef69: 4> #<Agent@4c260132: 5> #<Agent@5318a0ac: 6> #<Agent@75dca6d2: 7> #<Agent@694c6171: 8> #<Agent@159177b9: 9>]> #<Agent@2a81df82: [#<Agent@3478e59b: 0> #<Agent@6d2a3e06: 1> #<Agent@704e33e7: 2> #<Agent@1e31bc4b: 3> #<Agent@5340ef69: 4> #<Agent@4c260132: 5> #<Agent@5318a0ac: 6> #<Agent@75dca6d2: 7> #<Agent@694c6171: 8> #<Agent@159177b9: 9>]>)

user> (defn send-frame [con-agent frame] 
       (send con-agent 
         (fn [connection frame] 
           (println "sending " frame " to " connection) connection) frame))
#'user/send-frame

user> (send-frame (first @connections) "hello")
sending  hello  to  0
#<Agent@da69a9c: 0>

user> (defn dispatch-frame [frame] 
        (doall (map #(send-frame % frame) @connections)))
#'user/dispatch-frame

user> (dispatch-frame "hello")
sending  hello  to  0
sending  hello  to  1
sending  hello  to  2
sending  hello  to  3
sending  hello  to  4
sending  hello  to  5
sending  hello  to  6
sending  hello  to  7
sending  hello  to  8
sending  hello  to  9
(#<Agent@da69a9c: 0> #<Agent@34f07ec4: 1> #<Agent@11ee68d1: 2> #<Agent@3b237a89: 3> #<Agent@1641d6b4: 4> #<Agent@3c76ced6: 5> #<Agent@1c05629d: 6> #<Agent@258d3fca: 7> #<Agent@5c56fa08: 8> #<Agent@52395294: 9>)
user> 
于 2012-03-06T01:10:49.103 に答える