4

Web ベースのスライド ショー用のアプリケーションを作成しています。1 人の「マスター」ユーザーがスライド間を移動でき、全員のブラウザーがそれに従います。これを行うために、メッセージを送信するグローバル チャネルに websockets と Redis を使用しています。接続する各クライアントには、配列に格納された情報があります@clients。次に、Redis チャネルをサブスクライブするための別のスレッドがあります。ここには、配列内の全員にメッセージを送信する必要がある「on.message」ブロックが定義されています@clientsが、その配列はこのブロック内では空です (他の場所では空ではありません)。モジュール)。

この例にほぼ従っています: https://devcenter.heroku.com/articles/ruby-websockets

カスタムミドルウェアクラスにある関連コード:

require 'faye/websocket'
require 'redis'

class WsCommunication
  KEEPALIVE_TIME = 15 #seconds
  CHANNEL = 'vip-deck'

  def initialize(app)
    @app = app
    @clients = []

    uri = URI.parse(ENV['REDISCLOUD_URL'])
    Thread.new do
      redis_sub = Redis.new(host: uri.host, port: uri.port, password: uri.password)
      redis_sub.subscribe(CHANNEL) do |on|
        on.message do |channel, msg|
          puts @clients.count
          ### prints '0,' no clients receive msg
          @clients.each { |ws| ws.send(msg) }
        end
      end
    end
  end

  def call(env)
    if Faye::WebSocket.websocket?(env)
    ws = Faye::WebSocket.new(env, nil, {ping: KEEPALIVE_TIME})
  
    ws.on :open do |event|
      @clients << ws
      puts @clients.count
      ### prints actual number of clients
    end

    ws.on :message do |event|
      $redis.publish(CHANNEL, event.data)
    end

    ws.on :close do |event|
      @clients.delete(ws)
      ws = nil
    end

    ws.rack_response
  else
    @app.call(env)
  end
end
end

@clientsインスタンス変数はスレッド間で共有されないため、新しいスレッド内でアクセスすると配列は空になりますか? もしそうなら、どうすればスレッド間で変数を共有できますか?

また、 $clients (グローバル変数、スレッド間でアクセスできる必要があります) を使用してみましたが、役に立ちませんでした。

4

2 に答える 2

0

@client はすべてのスレッドで共有する必要があります。クライアントが誤ってアレイから削除されていないことを確認してください。ws.on :close ブロックに「client deleted」を入れてテストしてみてください。また、@client 変数が次のように使用されているミューテックスを使用することもできます: http://ruby-doc.org/core-2.2.0/Mutex.html

于 2016-07-25T14:38:13.310 に答える
0

最後に更新された編集: 作業コードを表示します。デバッグ コードを除いて変更されていないメイン モジュール。注: 終了前にサブスクリプションを解除する必要があることに関して、既に述べた問題が発生しました。

コードは正しいようです。あなたがそれをどのようにインスタンス化しているかを知りたいです。

config/application.rb には、おそらく少なくとも次のようなものがあります。

require 'ws_communication'
config.middleware.use WsCommunication

次に、JavaScript クライアントでは、次のようになります。

var ws = new WebSocket(uri);

WsCommunication の別のインスタンスをインスタンス化しますか? @clients が空の配列に設定され、症状が現れる可能性があります。次のようなものは正しくありません。

var ws = new WsCommunication;

この投稿が役に立たない場合は、クライアントと、おそらく config/application.rb を表示していただけると助かります。

ところで、 @clients は、読み取りではないにしても、更新時にミューテックスによって保護されるべきであるというコメントに同意します。これは、イベント駆動型システムでいつでも変更できる動的構造です。 redis-mutexは良いオプションです。(Github は現時点ですべてに 500 エラーをスローしているように見えるため、リンクが正しいことを願っています。)

$redis.publish は、メッセージを受信したクライアント数の整数値を返すことにも注意してください。

最後に、解約前にチャンネルの登録を解除する必要がある場合があります。同じチャンネルへの以前のサブスクリプションがクリーンアップされていなかったために、各メッセージを複数回、さらには何度も送信することになった状況がありました。スレッド内でチャネルをサブスクライブしているため、同じスレッド内でサブスクライブを解除する必要があります。そうしないと、適切なスレッドが魔法のように表示されるのを待ってプロセスが「ハング」します。「購読解除」フラグを設定してからメッセージを送信することで、その状況に対処しています。次に、on.message ブロック内で unsubscribe フラグをテストし、そこで unsubscribe を発行します。

わずかなデバッグ変更のみを加えた、提供したモジュール:

require 'faye/websocket'
require 'redis'

class WsCommunication
  KEEPALIVE_TIME = 15 #seconds
  CHANNEL = 'vip-deck'

  def initialize(app)
    @app = app
    @clients = []
    uri = URI.parse(ENV['REDISCLOUD_URL'])
    $redis = Redis.new(host: uri.host, port: uri.port, password: uri.password)
    Thread.new do
      redis_sub = Redis.new(host: uri.host, port: uri.port, password: uri.password)
      redis_sub.subscribe(CHANNEL) do |on|
        on.message do |channel, msg|
          puts "Message event. Clients receiving:#{@clients.count};"
          @clients.each { |ws| ws.send(msg) }
        end
      end
    end
  end

  def call(env)
    if Faye::WebSocket.websocket?(env)
      ws = Faye::WebSocket.new(env, nil, {ping: KEEPALIVE_TIME})

      ws.on :open do |event|
        @clients << ws
        puts "Open event. Clients open:#{@clients.count};"
      end

      ws.on :message do |event|
        receivers = $redis.publish(CHANNEL, event.data)
        puts "Message published:#{event.data}; Receivers:#{receivers};"
      end

      ws.on :close do |event|
        @clients.delete(ws)
        puts "Close event. Clients open:#{@clients.count};"
        ws = nil
      end

      ws.rack_response
    else
      @app.call(env)
    end
  end
end

私が提供したテスト サブスクライバー コード:

# encoding: UTF-8
puts "Starting client-subscriber.rb"
$:.unshift File.expand_path '../lib', File.dirname(__FILE__)
require 'rubygems'
require 'eventmachine'
require 'websocket-client-simple'

puts "websocket-client-simple v#{WebSocket::Client::Simple::VERSION}"

url = ARGV.shift || 'ws://localhost:3000'

EM.run do

  ws = WebSocket::Client::Simple.connect url

  ws.on :message do |msg|
    puts msg
  end

  ws.on :open do
    puts "-- Subscriber open (#{ws.url})"
  end

  ws.on :close do |e|
    puts "-- Subscriber close (#{e.inspect})"
    exit 1
  end

  ws.on :error do |e|
    puts "-- Subscriber error (#{e.inspect})"
  end

end

私が提供したテスト発行者コード。パブリッシャーとサブスクライバーは単なるテストであるため、簡単に組み合わせることができます。

# encoding: UTF-8
puts "Starting client-publisher.rb"
$:.unshift File.expand_path '../lib', File.dirname(__FILE__)
require 'rubygems'
require 'eventmachine'
require 'json'
require 'websocket-client-simple'

puts "websocket-client-simple v#{WebSocket::Client::Simple::VERSION}"

url = ARGV.shift || 'ws://localhost:3000'

EM.run do
  count ||= 0
  timer = EventMachine.add_periodic_timer(5+rand(5)) do
    count += 1
    send({"MESSAGE": "COUNT:#{count};"})
  end

  @ws = WebSocket::Client::Simple.connect url

  @ws.on :message do |msg|
    puts msg
  end

  @ws.on :open do
    puts "-- Publisher open"
  end

  @ws.on :close do |e|
    puts "-- Publisher close (#{e.inspect})"
    exit 1
  end

  @ws.on :error do |e|
    puts "-- Publisher error (#{e.inspect})"
    @ws.close
  end

  def self.send message
    payload = message.is_a?(Hash) ? message : {payload: message}
    @ws.send(payload.to_json)
  end
end

これらすべてをラック ミドルウェア レイヤーで実行するサンプル config.ru:

require './controllers/main'
require './middlewares/ws_communication'
use WsCommunication
run Main.new

これがメインです。実行中のバージョンから削除したので、使用する場合は微調整が必​​要になる場合があります。

%w(rubygems bundler sinatra/base json erb).each { |m| require m }
ENV['RACK_ENV'] ||= 'development'
Bundler.require
$: << File.expand_path('../', __FILE__)
$: << File.expand_path('../lib', __FILE__)

Dir["./lib/*.rb", "./lib/**/*.rb"].each { |file| require file }
env = ENV['OS'] == 'Windows_NT' ? 'development' : ENV['RACK_ENV']

  class Main < Sinatra::Base

    env = ENV['OS'] == 'Windows_NT' ? 'development' : ENV['RACK_ENV']
    get "/" do
      erb :"index.html"
    end

    get "/assets/js/application.js" do
      content_type :js
      @scheme = env == "production" ? "wss://" : "ws://"
      erb :"application.js"
    end
  end
于 2016-07-28T05:22:41.227 に答える