0

sidekiq サーバーの構成に問題があります。ページを更新するとすぐにプロセスがフォアグラウンドで実行されているようです。/consumers/fetch 永久にバックグラウンドに置く必要があります。

consumer_controller.rb

require 'kafka'

class ConsumersController < ApplicationController

  def fetch

    @consumer = Kafka::Consumer.new( { :host => ENV["host"],
        :port => ENV["port"],
        :topic => ENV["topic"]})

    @consumer.loop do |message|
      logger.info "-------------#{message.inspect}--------------"
      logger.info "-------------#{message.first.payload.inspect}--------------"
      unless message.blank?
        ConsumerWorker.perform_async(message.first.payload)
      end
    end
  end
end

consumer_worker.rb

class ConsumerWorker

  include Sidekiq::Worker

  def perform(message)
    payload = message.first["payload"]
    hash = JSON.parse(payload)
    return @message = Message.new(hash) if hash["concern"] == 'order_create' or hash["concern"] == 'first_payment'
  end

end

メッセージ.rb

class Message

  attr_reader :bundle_id, :order_id, :order_number, :event

  def initialize(message)
    @payload = message["payload"]
    @bundle_id = @payload["bundle_id"]
    @order_id  = @payload["order_id"]
    @order_number = @payload["order_number"]
    @event = message["concern"]
  end
end
4

1 に答える 1

0

このブロックを移動する必要があると思います

@consumer.loop do |message|
end

ブロックの実行後に消費が行われると思うので、どういうわけかワーカー内で。

于 2013-08-01T09:33:28.297 に答える