4

Heroku チュートリアルを使用して websocket を実装しました。

Thin では正常に動作しますが、Unicorn と Puma では動作しません。

また、クライアントのメッセージに応答するエコー メッセージも実装されています。各サーバーで適切に動作するため、websockets の実装に問題はありません。

Redis のセットアップも正しいです (すべてのメッセージをキャッチし、subscribeブロック内のコードを実行します)。

今はどのように機能しますか:

サーバーの起動時に、空の@clients配列が初期化されます。次に、Redis をリッスンし、そのメッセージを @clients 配列から対応するユーザーに送信するための新しいスレッドが開始されます。

ページの読み込み時に、新しい websocket 接続が作成され、@clients 配列に保存されます。

ブラウザーからメッセージを受信すると、同じユーザーに接続されているすべてのクライアントにメッセージを送り返します (その部分は、Thin と Puma の両方で適切に機能しています)。

Redis からメッセージを受信した場合、@clients 配列に保存されているすべてのユーザーの接続も検索します。ここで奇妙なことが起こります:

  • Thin で実行している場合、@clients 配列で接続を検出し、それらにメッセージを送信します。

  • Puma/Unicorn で実行している場合、@clients 配列は、その順序で試しても (ページのリロードなどを行わずに) 常に空です。

    1. ブラウザからメッセージを送信 ->@clients.lengthが 1 の場合、メッセージが配信されます
    2. Redis 経由でメッセージを送信 ->@clients.lengthは 0、メッセージは失われます
    3. ブラウザからメッセージを送信 ->@clients.lengthは 1 のままで、メッセージは配信されます

誰かが私に欠けているものを明確にしてもらえますか?

Puma サーバーの関連構成:

workers 1
threads_count = 1
threads threads_count, threads_count

関連するミドルウェア コード:

require 'faye/websocket'

class NotificationsBackend

  def initialize(app)
    @app     = app
    @clients = []
    Thread.new do
      redis_sub = Redis.new
      redis_sub.subscribe(CHANNEL) do |on|
        on.message do |channel, msg|
          # logging @clients.length from here will always return 0
          # [..] retrieve user
          send_message(user.id, { message: "ECHO: #{event.data}"} )
        end
      end
    end
  end

  def call(env)
    if Faye::WebSocket.websocket?(env)
      ws = Faye::WebSocket.new(env, nil, {ping: KEEPALIVE_TIME })
      ws.on :open do |event|
        # [..] retrieve current user
        if user
          # add ws connection to @clients array
        else
          # close ws
        end
      end

      ws.on :message do |event|
        # [..] retrieve current user
        Redis.current.publish({user_id: user.id, { message: "ECHO: #{event.data}"}} )
      end

      ws.rack_response
    else
      @app.call(env)
    end
  end
  def send_message user_id, message
    # logging @clients.length here will always return correct result
    # cs = all connections which belong to that client
    cs.each { |c| c.send(message.to_json) }
  end
end
4

2 に答える 2

4

Unicorn (および明らかに puma) は、マスター プロセスを起動し、1 つまたは複数のワーカーをフォークします。fork はプロセス全体をコピーします (または少なくともコピーの錯覚を示します - 実際のコピーは通常、ページに書き込むときにのみ発生します) が、呼び出されたスレッドのみがfork新しいプロセスに存在します。

アプリが fork される前に初期化されていることは明らかです。これは通常、ワーカーがすばやく起動し、コピー オン ライト メモリを節約できるようにするために行われます。結果として、redis チェック スレッドはマスター プロセスでのみ実行されます@clientsが、子プロセスでは変更されます。

おそらく、redis スレッドの作成を延期するか、アプリのプリロードを無効にすることで、これを回避できますが、設定によって、単一のワーカー プロセスを超えてスケ​​ーリングできないことに注意する必要があります (puma と、jruby のようなスレッド フレンドリーな JVM を使用すると、制約が少ない)

于 2015-06-11T07:14:27.830 に答える
3

誰かが同じ問題に直面する場合に備えて、私が思いついた 2 つの解決策を次に示します。

1.アプリのプリロードを無効にします(これは私が思いついた最初の解決策でした)

preload_app!puma.rb ファイルから削除するだけです。したがって、すべてのスレッドには独自の@clients変数があります。callまた、他のミドルウェア メソッド (など)からアクセスできます。

欠点: アプリのプリロードの利点がすべて失われます。スレッドが 2 つあるワーカーが 1 つまたは 2 つしかない場合は問題ありませんが、多数のスレッドが必要な場合は、アプリをプリロードすることをお勧めします。だから私は私の研究を続けました、そしてここに別の解決策があります:

2.スレッドの初期化をinitializeメソッドの外に移動します(これは私が現在使用しているものです)

たとえば、callメソッドに移動したので、ミドルウェア クラスのコードは次のようになります。

attr_accessor :subscriber

def call(env)
  @subscriber ||= Thread.new do # if no subscriber present, init new one
    redis_sub = Redis.new(url: ENV['REDISCLOUD_URL'])
    redis_sub.subscribe(CHANNEL) do |on|
      on.message do |_, msg|
        # parsing message code here, retrieve user
        send_message(user.id, { message: "ECHO: #{event.data}"} )
      end
    end
  end
  # other code from method
end

どちらのソリューションも同じ問題を解決します。Redis リスニング スレッドは、メイン プロセス (実際にはリクエストを処理していない) ではなく、各 Puma ワーカー/スレッドに対して初期化されます。

于 2015-09-13T08:27:13.483 に答える