2

Go-Stomp を使用して ActiveMQ(Apollo) をサブスクライブしようとしましたが、読み取りタイムアウト エラーが発生しました。私のアプリは、着信メッセージを処理するために 1 日 24 時間稼働している必要があります。

質問:

  1. キューに存在するメッセージがなくなった場合でも、サブスクリプションを維持する方法はありますか? ConnOpt.HeartBeat を入れようとしてもうまくいかないようです
  2. 読み取りタイムアウトの後、もう 1 つのメッセージを受け入れているように見えるのはなぜですか?

以下は私の手順です:

  • テスト用に 1000 個のメッセージを入力キューに入れました
  • サブスクライバーを実行します。コードは以下に記載されています
  • サブスクライバーは 1000 メッセージの読み取りを終了しました 2 ~ 3 秒後、エラー "2016/10/07 17:12:44 サブスクリプション 1: /queue/hflc-in: ERROR メッセージ: 読み取りタイムアウト" が表示されました。
  • さらに 1000 件のメッセージを送信しますが、サブスクリプションは既に停止しているようです。したがって、処理されていないメッセージはありません

私のコード:

  var(
   serverAddr   = flag.String("server", "10.92.10.10:61613", "STOMP server    endpoint")
   messageCount = flag.Int("count", 10, "Number of messages to send/receive")
   inputQ       = flag.String("inputq", "/queue/hflc-in", "Input queue")
)

var options []func(*stomp.Conn) error = []func(*stomp.Conn) error{
   stomp.ConnOpt.Login("userid", "userpassword"),
   stomp.ConnOpt.Host("mybroker"),
   stomp.ConnOpt.HeartBeat(360*time.Second, 360*time.Second), // I put this but seems no impact
}

func main() {
  flag.Parse()
  jobschan := make(chan bean.Request, 10)
  //my init setup
  go getInput(1, jobschan)
}

func getInput(id int, jobschan chan bean.Request) {
   conn, err := stomp.Dial("tcp", *serverAddr, options...)

   if err != nil {
      println("cannot connect to server", err.Error())
      return
   }
   fmt.Printf("Connected %v \n", id)

   sub, err := conn.Subscribe(*inputQ, stomp.AckClient)
   if err != nil {
     println("cannot subscribe to", *inputQ, err.Error())
     return
   }

   fmt.Printf("Subscribed %v \n", id)
   var messageCount int
   for {
    msg := <-sub.C
    //expectedText := fmt.Sprintf("Message #%d", i)
    if msg != nil {

        actualText := string(msg.Body)
        
        var req bean.Request
        if actualText != "SHUTDOWN" {
            messageCount = messageCount + 1
            var err2 = easyjson.Unmarshal([]byte(actualText), &req)
            if err2 != nil {
                log.Error("Unable unmarshall", zap.Error(err))
                println("message body %v", msg.Body) // what is [0/0]0x0 ?
            } else {
                fmt.Printf("Subscriber %v received message, count %v \n  ", id, messageCount)
                jobschan <- req
            }
        } else {
            logchan <- "got some issue"
        }
    }
   }
  }

エラー :

2016/10/07 17:12:44 サブスクリプション 1: /queue/hflc-in: エラー メッセージ: タイムアウトの読み取り
[E] 2016-10-07T09:12:44Z アンマーシャル
メッセージ本文 %v [0/0]0x0

4

2 に答える 2

1

これらの行を追加することで解決しました:

Apollo では、数秒後に空になった後にキューが削除されたことに気付いたので、 auto_delete_after を apollo.xml に数時間置きます。

<queue id="hflc-in" dlq="dlq-in" nak_limit="3" auto_delete_after="7200"/>
<queue id="hflc-log" dlq="dlq-log" nak_limit="3" auto_delete_after="7200"/>
<queue id="hflc-out" dlq="dlq-out" nak_limit="3" auto_delete_after="7200"/>

Go では、キューにメッセージが見つからない場合、go-stomp はすぐにあきらめることに気付いたので、conn オプションに HeartBeat Error を追加します。

var options []func(*stomp.Conn) error = []func(*stomp.Conn) error{
   //.... original configuration
   stomp.ConnOpt.HeartBeatError(360 * time.Second),
}

ただし、質問2についてはまだ混乱しています。

于 2016-10-10T03:47:07.653 に答える