0

複数のプロデューサーを介してメッセージを発行する Pulsar トピックがあります。これらのプロデューサーは単なるゴルーチンでありSendAsync、プロデューサーでメソッドを使用してメッセージを送信していますが、Got unexpected send receipt for message警告が表示され続けます。クライアントのライブラリ コード (Pulsar の golang クライアント) を調べたところ、この警告は、プロデューサーがこのマップでリスナーとして登録されていない場合にのみ表示されることがわかりましたmap[uint64]ConnectionListener。しかし、プロデューサーがリスナーとして登録されていない場合、どうすれば公開できますか?

func benchmarkConcurrentProducers(b *testing.B, pool []pulsar.Producer, concur int, payload []byte) {
    wg := &sync.WaitGroup{}
    wg.Add(concur)

    for i := 0; i < concur; i++ {
        go Produce(pool[i], payload, wg)
    }
    wg.Wait()
}

func BenchmarkProducer(b *testing.B) {
    type config struct {
        Pulsarcfg struct {
            Url string `required: "true"`
        }
    }
    cfg := &config{}
    err := configor.Load(cfg, "../config.yml")
    require.Nil(b, err)

    client, err := pulsar.NewClient(pulsar.ClientOptions{
        URL:               cfg.Pulsarcfg.Url,
        OperationTimeout:  30 * time.Second,
        ConnectionTimeout: 30 * time.Second,
    })
    require.Nil(b, err)
    defer client.Close()

    concur := 4

    producerPool := make([]pulsar.Producer, concur)

    for i := 0; i < concur; i++ {
        p, err := New(client, string(i))
        require.Nil(b, err)
        defer p.Close()

        producerPool[i] = p
    }

    path, err := filepath.Abs("../events.json")
    if err != nil {
        log.Fatalf("invalid payload file path, %v", err)
    }
    f, err := ioutil.ReadFile(path)
    if err != nil {
        log.Fatalf("bad payload: %v", err)
    }

    b.Run("benchmark producer", func(b *testing.B) {
        // run the producer b.N times
        for n := 0; n < b.N; n++ {
            benchmarkConcurrentProducers(b, producerPool, concur, f)
        }
    })
}
4

2 に答える 2

1

でプロデューサーがリスナーとして登録されない理由は 2 つありますmap[uint64]ConnectionListener。プロデューサーが作成されなかったか、プロデューサーが閉じられました。私の場合、プロデューサーが実際に作成されたことを意味するメッセージを公開できましたが、SendAsyncメソッドを使用してメッセージを送信していたため、承認を受信するためにブロックされませんでした。そして、deferすべてのメッセージを生成した後にプロデューサーを閉じるために使用していましたが (wait groupプロデューサーを閉じる前にすべてのメッセージが送信されたことを確認するために a を使用しました)、確認を受け取る前にプロデューサーが閉じられたため、警告が表示されました。

于 2020-05-10T13:39:20.970 に答える
0

まったく同じエラーメッセージが表示されました。私の場合、同じクライアントを使用してリーダーとプロデューサーを作成しました。新しいメッセージを生成しているときにリーダーはすでに閉じていましたが、この警告を受け取り続け、さらに悪いことに、への呼び出しでハングしましたproducer.Send()。読み取り用と生成用に別々のクライアントを使用することで、これを解決しました。

于 2020-07-26T13:24:47.467 に答える