1

私は Binance API を既存のシステムに統合しています。ほとんどの部分は簡単ですが、データ ストリーミング API は、go-routine に関する私の限られた理解に当てはまります。Binance 用の golang SDK に特別なものがあるとは思いませんが、基本的に必要なのは 2 つの関数だけです。1 つはデータ ストリームを開始し、パラメーターとして指定されたイベント ハンドラーでイベントを処理し、もう 1 つはデータ ストリームを終了します。他のすべての接続が閉じられるため、実際にクライアントをシャットダウンする必要はありません。以前のプロジェクトでは、これには 2 つのメッセージ タイプがありましたが、binance SDK は 2 つの go チャネルを返す実装を使用します。1 つはエラー用で、もう 1 つは名前から推測すると、データ ストラムを停止するためです。

データ ストリームを開始するために私が書いたコードは次のようになります。


func startDataStream(symbol, interval string, wsKlineHandler futures.WsKlineHandler, errHandler futures.ErrHandler) (err error){

    doneC, stopC, err := futures.WsKlineServe(symbol, interval, wsKlineHandler, errHandler)
    if err != nil {
        fmt.Println(err)
        return err
    }

    return nil
}


これは期待どおりに機能し、データをストリーミングします。簡単なテストでそれを確認します。


func runWSDataTest() {
    symbol := "BTCUSDT"
    interval := "15m"
    errHandler := func(err error) {fmt.Println(err)}

    wsKlineHandler := func(event *futures.WsKlineEvent) {fmt.Println(event)}

    _ = startDataStream(symbol, interval, wsKlineHandler, errHandler)
}

主に不完全な理解のために、私にはあまり明確ではないことは、実際にはどうすればストリームを停止できるかということです. 返された stopC チャネルを使用して、たとえばシステム レベルで sigterm に似た終了シグナルを何らかの形で発行すると、ストリームが終了するはずです。

たとえば、シンボルを引数として受け取る stopDataStream 関数があるとします。

func stopDataStream(symbol){

}

5 つのシンボルに対して 5 つのデータ ストリームを開始し、ストリームの 1 つだけを停止したいとします。それは次の疑問を投げかけます:

  1. これらすべての stopC チャネルを追跡するにはどうすればよいですか?

  2. シンボルでキー付けされたコレクションを使用し、stopC チャネルをプルしてから、そのデータ ストリームだけを終了する信号を発行することはできますか?

  3. stop 関数から stopC チャネルに実際に書き込むにはどうすればよいですか?

繰り返しますが、これは特に難しいとは思いません。ドキュメントからまだ理解できていないだけなので、助けていただければ幸いです。

ありがとうございました

4

1 に答える 1

2

(元々@Marvin.Hansenによって書かれた回答)

結局、チャンネルを保存して閉じるだけですべて解決しました。これがいかに簡単であるかには本当に驚きましたが、更新された関数のコードは次のとおりです。

func startDataStream(symbol, interval string, wsKlineHandler futures.WsKlineHandler, errHandler futures.ErrHandler) (err error) {

    _, stopC, err := futures.WsKlineServe(symbol, interval, wsKlineHandler, errHandler)
    if err != nil {
        fmt.Println(err)
        return err
    }
    // just save the stop channel 
    chanMap[symbol] = stopC
    return nil
}

そして、停止機能は本当に恥ずかしいほど些細なことになります。

func stopDataStream(symbol string) {
    stopC := chanMap[symbol]  // load the stop channel for the symbol
    close(stopC) // just close it. 
}

最後に、すべてをテストします。


var (
    chanMap map[string]chan struct{}
)

func runWSDataTest() {
    chanMap = make(map[string]chan struct{})

    symbol := "BTCUSDT"
    interval := "15m"
    errHandler := func(err error) { fmt.Println(err) }
    wsKlineHandler := getKLineHandler()

    println("Start stream")
    _ = startDataStream(symbol, interval, wsKlineHandler, errHandler)

    time.Sleep(3 * time.Second)

    println("Stop stream")
    stopDataStream(symbol)

    time.Sleep(1 * time.Second)
}

これです。

于 2021-07-14T20:11:20.350 に答える