eventmachine ベースの http サーバーでタイムアウトを処理するにはどうすればよいですか? 私は基本的に、HTTP 要求情報を処理時にキューに配置しています。その後、処理によってコールバック関数が呼び出される場合と呼び出されない場合があります。タイムアウト時間を設定できますが、タイムアウト ハンドラまたはタイムアウト コールバックを追加する方法がわかりません。
私はドキュメントに目を通しましたが、それらから有用なものを集めることができませんでした。unbind が呼び出されるまでにリクエストが完了しているため、unbind メソッドにロジックを配置しても明らかに機能しませんでした。また、コールバック作成コードの隣に EM::error_handler を追加しても機能しませんでした。
タイムアウト イベントをキャッチし、タイムアウト イベントで特定の json を返したいと思います。
これが私のコードです-HTTPリクエストハンドラー
class HTTPRequestHandler < EventMachine::Connection
def initialize(s,q,h)
@tcpserver = s
@queue = q
@callback_hash = h
self.comm_inactivity_timeout = API_REQUEST_TIMEOUT
end
def post_init
@parser = RequestParser.new
end
def receive_data(data)
handle_http_request if @parser.parse(data)
end
def parse_query_parms(query_str)
begin
rethash = {}
query_arr = query_str.split(/&/)
query_arr.each { |element|
e_arr = element.split(/\=/)
rethash[e_arr[0]] = e_arr[1]
}
return rethash
rescue
return nil
end
end
def handle_http_request
result = parse_query_parms(@parser.env["QUERY_STRING"]) # hash
if result
if result.has_key?('id') and result.has_key?('rid') and result.has_key?('json')
puts result
# Callback to handle this
cb = EM.Callback{ |rid,rtime,msg|
data = "{\"rid\":\"#{rid}\",\"rtime\":\"#{rtime}\",\"msg\":#{msg}}"
send_data("HTTP/1.1 200 OK\r\n")
send_data("Content-Type: application/json\r\n")
send_data("Content-Length: #{data.bytesize}\r\n")
send_data("\r\n")
send_data(data)
close_connection_after_writing
}
# Add callback to hash
@callback_hash[result['rid']] = cb
# Unencode jsonin url
json_from_api = result['json']
json_from_api = URI.decode(json_from_api)
# Push request onto queue
qreq=QueuedRequest.new(result['id'],json_from_api)
@queue.push(qreq)
else
data = "{\"success\":\"false\",\"response\":\"request needs id, rid, json parameters\"}"
send_data("HTTP/1.1 200 OK\r\n")
send_data("Content-Type: application/json\r\n")
send_data("Content-Length: #{data.bytesize}\r\n")
send_data("\r\n")
send_data("#{data}")
close_connection_after_writing
end
else
data = "{\"success\":\"false\",\"response\":\"unable to parse parameters\"}"
send_data("HTTP/1.1 200 OK\r\n")
send_data("Content-Type: application/json\r\n")
send_data("Content-Length: #{data.bytesize}\r\n")
send_data("\r\n")
send_data("#{data}")
close_connection_after_writing
end
end
end
すべてを初期化してキューを処理するメインループ:
EM.synchrony do
h = {} # map of rids -> callbacks for requests
# Intialize TCP and HTTP Servers
q = EM::Queue.new # Queue of messages from HTTP Server
s = TCPProxyServer.new(h)
EM.start_server(LISTEN_HOST_CLIENT, LISTEN_PORT_API, HTTPRequestHandler, s, q, h)
s.start
puts "Server starting (http and tcp)"
# process queue of messages coming in from API (recursive)
process_queue = Proc.new do |qreq|
# Our functions
@operation = lambda do
puts qreq
begin
# Send data to channel
if s.connections_plug[qreq.id]
s.connections_plug[qreq.id].send_data(qreq.json)
else
return "unable to find id:#{qreq.id} in connection"
end
rescue Exception=>e
puts "Unable to send process queued request! #{e}"
end
end
@callback = lambda { |result| }
EM::defer(@operation,@callback)
EM.next_tick{ q.pop(&process_queue) }
end
q.pop(&process_queue)
end