5

注 - Go の初心者。

チャネルの配列の出力を 1 つにマージするマルチプレクサを作成しました建設的な批判に満足。

func Mux(channels []chan big.Int) chan big.Int {
    // Count down as each channel closes. When hits zero - close ch.
    n := len(channels)
    // The channel to output to.
    ch := make(chan big.Int, n)

    // Make one go per channel.
    for _, c := range channels {
        go func() {
            // Pump it.
            for x := range c {
                ch <- x
            }
            // It closed.
            n -= 1
            // Close output if all closed now.
            if n == 0 {
                close(ch)
            }
        }()
    }
    return ch
}

私はそれをテストしています:

func fromTo(f, t int) chan big.Int {
    ch := make(chan big.Int)

    go func() {
        for i := f; i < t; i++ {
            fmt.Println("Feed:", i)
            ch <- *big.NewInt(int64(i))
        }
        close(ch)
    }()
    return ch
}

func testMux() {
    r := make([]chan big.Int, 10)
    for i := 0; i < 10; i++ {
        r[i] = fromTo(i*10, i*10+10)
    }
    all := Mux(r)
    // Roll them out.
    for l := range all {
        fmt.Println(l)
    }
}

しかし、私の出力は非常に奇妙です:

Feed: 0
Feed: 10
Feed: 20
Feed: 30
Feed: 40
Feed: 50
Feed: 60
Feed: 70
Feed: 80
Feed: 90
Feed: 91
Feed: 92
Feed: 93
Feed: 94
Feed: 95
Feed: 96
Feed: 97
Feed: 98
Feed: 99
{false [90]}
{false [91]}
{false [92]}
{false [93]}
{false [94]}
{false [95]}
{false [96]}
{false [97]}
{false [98]}
{false [99]}

だから私の質問に:

  • Mux で間違っていることはありますか?
  • 出力チャネルから最後の 10 個しか取得できないのはなぜですか?
  • 給餌が奇妙に見えるのはなぜですか?(各入力チャンネルの最初、最後のチャンネルすべて、その後何もない)
  • これを行うより良い方法はありますか?

すべての入力チャネルが出力チャネルに対して同等の権利を持つ必要があります。つまり、あるチャネルからすべての出力を取得し、次のチャネルからすべての出力を取得することはできません。


興味のある人のために-これは修正後の最終的なコードであり、正しい(おそらく)使用sync.WaitGroup

import (
    "math/big"
    "sync"
)

/*
  Multiplex a number of channels into one.
*/
func Mux(channels []chan big.Int) chan big.Int {
    // Count down as each channel closes. When hits zero - close ch.
    var wg sync.WaitGroup
    wg.Add(len(channels))
    // The channel to output to.
    ch := make(chan big.Int, len(channels))

    // Make one go per channel.
    for _, c := range channels {
        go func(c <-chan big.Int) {
            // Pump it.
            for x := range c {
                ch <- x
            }
            // It closed.
            wg.Done()
        }(c)
    }
    // Close the channel when the pumping is finished.
    go func() {
        // Wait for everyone to be done.
        wg.Wait()
        // Close.
        close(ch)
    }()
    return ch
}
4

3 に答える 3

3

から生成された各ゴルーチンは、ループの反復ごとに更新されるMuxため、同じチャネルからプルすることcになります – それらは の値をキャプチャするだけではありませんc。次のようにチャネルをゴルーチンに渡すと、期待される結果が得られます。

for _, c := range channels {
    go func(c <-chan big.Int) {
        ...
    }(c)
}

この変更をここでテストできます。

もう 1 つの考えられる問題は、n変数の処理です。 を使用して実行している場合GOMAXPROCS != 1、2 つのゴルーチンが同時に変数を更新しようとする可能性があります。型は、sync.WaitGroupゴルーチンが完了するのを待つより安全な方法です。

于 2013-10-05T01:02:24.867 に答える
2

事の少し後、私は知っていますが、これに似た汎用の Multiplex 関数を実装するパッケージを書きました。リフレクション パッケージの "select" 呼び出しを使用して、ロックや待機グループを必要とせずに効率的でバランスの取れた多重化を保証します。

于 2014-03-02T03:56:46.540 に答える
0

James Hentridgeの回答に基づいて構築するには、ステートメントを使用するときに再割り当ての問題を処理する慣用的な方法rangeは、ローカル変数を問題の値に割り当てることです。

for _, c := range channels {
    c := c
    go func() {
    ...
    }()
}
于 2014-08-21T19:30:16.290 に答える