13

アイデアは、スライスに可変数のチャネルを持ち、それらを介して受信した各値を単一のチャネルにプッシュし、入力チャネルの最後のチャネルが閉じられたら、この出力チャネルを閉じることです。このようなものですが、2 つ以上のチャンネルの場合は次のようになります。

func multiplex(cin1, cin2, cout chan int) {
    n := 2
    for {
        select {
        case v, ok := <-cin1:
            if ok {
                cout <- v
            } else {
                n -= 1
            }

        case v, ok := <-cin2:
            if ok {
                cout <- v
            } else {
                n -= 1
            }
        }

        if n == 0 {
            close(cout)
            break
        }
    }
}

上記のコードは、defaultケースがないため、ビジー ループを回避します。これは良いことです (編集: ", ok" の存在により、選択ステートメントが非ブロックになり、ループがビジー状態になっているように見えます。しかし、例のために、コードをブロックするかのように考えてください)。任意の数の入力チャネルでも同様の機能を実現できますか? 明らかに、これはスライスをペアごとに 1 つのチャネルに減らすことで実行できますが、可能であれば、より単純なソリューションにもっと興味があります。

4

3 に答える 3

27

このスニペットはあなたが探しているものだと思います。入力と出力が一方向の通信にのみ使用されるべきであることを明確にするために、署名を変更しました。の追加に注意してください。sync.WaitGroupすべての入力が完了したことを知らせる何らかの方法が必要です。これは非常に簡単です。

func combine(inputs []<-chan int, output chan<- int) {
  var group sync.WaitGroup
  for i := range inputs {
    group.Add(1)
    go func(input <-chan int) {
      for val := range input {
        output <- val
      }
      group.Done()
    } (inputs[i])
  }
  go func() {
    group.Wait()
    close(output)
  } ()
}
于 2012-06-11T18:12:57.260 に答える
3

編集: ペアワイズ リダクションのサンプル コードを追加し、回答の一部を並べ替えました。

推奨される解決策は、「チャネルのスライスを持たないように再構築する」という非回答です。再構築では、多くの場合、複数のゴルーチンが単一のチャネルに送信できる機能を利用できます。したがって、各ソースを別々のチャネルで送信してから、一連のチャネルからの受信を処理する代わりに、チャネルを 1 つ作成して、すべてのソースがそのチャネルで送信できるようにします。

Go は、チャネルのスライスから受信する機能を提供していません。これはよくある質問であり、上記の解決策が推奨されますが、それをプログラムする方法もあります。元の質問で「ペアごとにスライスを減らす」と言って提案していると思った解決策は、バイナリの分割と征服の解決策です。2 つのチャネルを 1 つに多重化するソリューションがある限り、これは問題なく機能します。このためのサンプルコードは、動作に非常に近いです。

サンプルコードを機能させるための小さなトリックが 1 つ欠けているだけです。n をデクリメントする場所に、チャネル変数を nil に設定する行を追加します。たとえば、コードを読み取らせました

    case v, ok := <-cin1:
        if ok {
            cout <- v
        } else {
            n--
            cin1 = nil
        }
    case v, ok := <-cin2:
        if ok {
            cout <- v
        } else {
            n--
            cin2 = nil
        }
    }

このソリューションは、あなたが望むことを行い、忙しく待っているわけではありません。

それでは、このソリューションをスライスを多重化する関数に組み込んだ完全な例を次に示します。

package main

import (
    "fmt"
    "time"
)

func multiplex(cin []chan int, cout chan int) {
    var cin0, cin1 chan int
    switch len(cin) {
    case 2:
        cin1 = cin[1]
        fallthrough
    case 1:
        cin0 = cin[0]
    case 0:
    default:
        cin0 = make(chan int)
        cin1 = make(chan int)
        half := len(cin) / 2
        go multiplex(cin[:half], cin0)
        go multiplex(cin[half:], cin1)
    }
    for cin0 != nil || cin1 != nil {
        select {
        case v, ok := <-cin0:
            if ok {
                cout <- v
            } else {
                cin0 = nil
            }
        case v, ok := <-cin1:
            if ok {
                cout <- v
            } else {
                cin1 = nil
            }
        }
    }
    close(cout)
}

func main() {
    cin := []chan int{
        make(chan int),
        make(chan int),
        make(chan int),
    }
    cout := make(chan int)
    for i, c := range cin {
        go func(x int, cx chan int) {
            for i := 1; i <= 3; i++ {
                time.Sleep(100 * time.Millisecond)
                cx <- x*10 + i
            }
            close(cx)
        }(i, c)
    }
    go multiplex(cin, cout)
    for v := range cout {
        fmt.Println("main gets", v)
    }
}
于 2012-06-11T16:29:54.727 に答える
0

ゴルーチンを使ってこれを作りました。それはあなたが望むものですか?

package main

import (
    "fmt"
)

func multiplex(cin []chan int, cout chan int) {
    n := len(cin)
    for _, ch := range cin {
        go func(src chan int) {
            for {
                v, ok := <-src
                if ok {
                    cout <- v
                } else {
                    n-- // a little dangerous. Maybe use a channel to avoid missed decrements
                    if n == 0 {
                        close(cout)
                    }
                    break
                }
            }
        }(ch)
    }
}

// a main to test the multiplex
func main() {
    cin := make([]chan int, 3)
    cin[0] = make(chan int, 2)
    cin[1] = make(chan int, 2)
    cin[2] = make(chan int, 2)
    cout := make(chan int, 2)
    multiplex(cin, cout)
    cin[1] <- 1
    cin[0] <- 2
    cin[2] <- 3
    cin[1] <- 4
    cin[0] <- 5
    close(cin[1])
    close(cin[0])
    close(cin[2])
    for {
        v, ok := <-cout
        if ok {
            fmt.Println(v)
        } else {
            break
        }
    }
}

編集:参照:

http://golang.org/ref/spec#Receive_operator

http://golang.org/ref/spec#Close

于 2012-06-11T12:20:45.770 に答える