1

eventmachine ベースの http サーバーでタイムアウトを処理するにはどうすればよいですか? 私は基本的に、HTTP 要求情報を処理時にキューに配置しています。その後、処理によってコールバック関数が呼び出される場合と呼び出されない場合があります。タイムアウト時間を設定できますが、タイムアウト ハンドラまたはタイムアウト コールバックを追加する方法がわかりません。

私はドキュメントに目を通しましたが、それらから有用なものを集めることができませんでした。unbind が呼び出されるまでにリクエストが完了しているため、unbind メソッドにロジックを配置しても明らかに機能しませんでした。また、コールバック作成コードの隣に EM::error_handler を追加しても機能しませんでした。

タイムアウト イベントをキャッチし、タイムアウト イベントで特定の json を返したいと思います。

これが私のコードです-HTTPリクエストハンドラー

class HTTPRequestHandler  < EventMachine::Connection

  def initialize(s,q,h)
    @tcpserver = s
    @queue = q
    @callback_hash = h
    self.comm_inactivity_timeout = API_REQUEST_TIMEOUT

  end

  def post_init
      @parser = RequestParser.new
  end

  def receive_data(data)
    handle_http_request if @parser.parse(data)
  end

  def parse_query_parms(query_str)
    begin
      rethash = {}
      query_arr = query_str.split(/&/)
      query_arr.each { |element|
        e_arr = element.split(/\=/)
        rethash[e_arr[0]] = e_arr[1]
      }
      return rethash
    rescue
      return nil
    end
  end

  def handle_http_request
    result = parse_query_parms(@parser.env["QUERY_STRING"]) # hash
    if result
      if result.has_key?('id') and result.has_key?('rid') and result.has_key?('json')
        puts result

        # Callback to handle this
         cb = EM.Callback{ |rid,rtime,msg|
           data = "{\"rid\":\"#{rid}\",\"rtime\":\"#{rtime}\",\"msg\":#{msg}}"
           send_data("HTTP/1.1 200 OK\r\n")
           send_data("Content-Type: application/json\r\n")
           send_data("Content-Length: #{data.bytesize}\r\n")
           send_data("\r\n")
           send_data(data)
           close_connection_after_writing
         }

         # Add callback to hash
         @callback_hash[result['rid']] = cb

         # Unencode jsonin url
         json_from_api = result['json']
         json_from_api = URI.decode(json_from_api)

         # Push request onto queue
         qreq=QueuedRequest.new(result['id'],json_from_api)
         @queue.push(qreq)

      else
        data = "{\"success\":\"false\",\"response\":\"request needs id, rid, json parameters\"}"
        send_data("HTTP/1.1 200 OK\r\n")
        send_data("Content-Type: application/json\r\n")
        send_data("Content-Length: #{data.bytesize}\r\n")
        send_data("\r\n")
        send_data("#{data}")
        close_connection_after_writing
      end
    else
      data = "{\"success\":\"false\",\"response\":\"unable to parse parameters\"}"
      send_data("HTTP/1.1 200 OK\r\n")
      send_data("Content-Type: application/json\r\n")
      send_data("Content-Length: #{data.bytesize}\r\n")
      send_data("\r\n")
      send_data("#{data}")
      close_connection_after_writing
    end
  end

end

すべてを初期化してキューを処理するメインループ:

EM.synchrony do
  h = {} # map of rids -> callbacks for requests

  # Intialize TCP and HTTP Servers
  q = EM::Queue.new # Queue of messages from HTTP Server
  s = TCPProxyServer.new(h)
  EM.start_server(LISTEN_HOST_CLIENT, LISTEN_PORT_API, HTTPRequestHandler, s, q, h)
  s.start
  puts "Server starting (http and tcp)"

  # process queue of messages coming in from API (recursive)
  process_queue = Proc.new do |qreq| 
    # Our functions
    @operation = lambda do
      puts qreq
      begin
        # Send data to channel
        if s.connections_plug[qreq.id]
          s.connections_plug[qreq.id].send_data(qreq.json)
        else
          return "unable to find id:#{qreq.id} in connection"
        end
      rescue Exception=>e
        puts "Unable to send process queued request! #{e}"
      end     
    end
    @callback = lambda { |result| }

    EM::defer(@operation,@callback) 
    EM.next_tick{ q.pop(&process_queue) }
  end
  q.pop(&process_queue)
end
4

1 に答える 1

2

EventMachineグループのアドバイスを受けて、httpリクエストにタイマーを追加しました。

class HTTPRequestHandler  < EventMachine::Connection
  def initialize(s,q,h)
    @tcpserver = s
    @queue = q
    @callback_hash = h
    #self.comm_inactivity_timeout = API_REQUEST_TIMEOUT # handled using one off timer
  end

  def post_init
    @parser = RequestParser.new

    # Use timer to handle timeout
    @timer = EventMachine::Timer.new API_REQUEST_TIMEOUT, proc {
      data = {:err => "timeout"}
      data = data.to_json
      send_data("HTTP/1.1 200 OK\r\n")
      send_data("Content-Type: application/json\r\n")
      send_data("Content-Length: #{data.bytesize}\r\n")
      send_data("\r\n")
      send_data("#{data}")
      close_connection_after_writing
    }
  end

  def unbind
    @timer.cancel()
  end

  def receive_data(data)
  end

end
于 2012-07-07T21:23:54.203 に答える