14

私がやりたいのは、一連のプロデューサーゴルーチン(一部は完了している場合と完了していない場合があります)とコンシューマールーチンを用意することです。問題は括弧内の警告にあります-答えを返す総数はわかりません。

だから私がしたいのはこれです:

package main

import (
  "fmt"
  "math/rand"
)

func producer(c chan int) {
  // May or may not produce.
  success := rand.Float32() > 0.5
  if success {
    c <- rand.Int()
  }
}

func main() {
  c := make(chan int, 10)
  for i := 0; i < 10; i++ {
    go producer(c, signal)
  }

  // If we include a close, then that's WRONG. Chan will be closed
  // but a producer will try to write to it. Runtime error.
  close(c)

  // If we don't close, then that's WRONG. All goroutines will
  // deadlock, since the range keyword will look for a close.
  for num := range c {
    fmt.Printf("Producer produced: %d\n", num)
  }
  fmt.Println("All done.")
}

したがって、問題は、閉じた場合、閉じない場合、それでも間違っているということです(コードのコメントを参照)。

さて、解決策は、すべてのプロデューサーが次のように書き込む帯域外信号チャネルになります。

package main

import (
  "fmt"
  "math/rand"
)

func producer(c chan int, signal chan bool) {
  success := rand.Float32() > 0.5
  if success {
    c <- rand.Int()
  }
  signal <- true
}

func main() {
  c := make(chan int, 10)
  signal := make(chan bool, 10)
  for i := 0; i < 10; i++ {
    go producer(c, signal)
  }

  // This is basically a 'join'.
  num_done := 0
  for num_done < 10 {
    <- signal
    num_done++
  }
  close(c)

  for num := range c {
    fmt.Printf("Producer produced: %d\n", num)
  }
  fmt.Println("All done.")
}

そして、それは完全に私が望むことをします!しかし、私にはそれは一口のように思えます。私の質問は次のとおりです。より簡単な方法で同様のことを行うことができるイディオム/トリックはありますか?

http://golang.org/doc/codewalk/sharemem/ そして、completechan(の先頭で初期化されたmain)が範囲で使用されているようですが、閉じられていないようです。方法がわかりません。

誰かが何か洞察を持っているなら、私はそれを大いに感謝します。乾杯!


編集:fls0815に答えがあり、クローズレスチャネル範囲がどのように機能するかという質問にも答えています。

上記の私のコードは動作するように変更されました(fls0815が親切に提供したコードの前に実行されました):

package main

import (
  "fmt"
  "math/rand"
  "sync"
)

var wg_prod sync.WaitGroup
var wg_cons sync.WaitGroup

func producer(c chan int) {
  success := rand.Float32() > 0.5
  if success {
    c <- rand.Int()
  }
  wg_prod.Done()
}

func main() {
  c := make(chan int, 10)
  wg_prod.Add(10)
  for i := 0; i < 10; i++ {
    go producer(c)
  }

  wg_cons.Add(1)
  go func() {
    for num := range c {
      fmt.Printf("Producer produced: %d\n", num)
    }
    wg_cons.Done()
  } ()

  wg_prod.Wait()
  close(c)
  wg_cons.Wait()
  fmt.Println("All done.")
}
4

5 に答える 5

16

プロデューサーのみがチャネルを閉じる必要があります。rangeプロデューサーが開始されたら、結果のチャネルを反復 ( ) するコンシューマーを呼び出すことで、目標を達成できます。メイン スレッドではsync.WaitGroup、コンシューマー/プロデューサーが作業を完了するまで待機します (「参考文献」を参照)。プロデューサが終了したら、結果のチャネルを閉じます。これにより、コンシューマが強制的に終了します (rangeチャネルが閉じられ、バッファリングされたアイテムがなくなると終了します)。

コード例:

package main

import (
    "log"
    "sync"
    "time"
    "math/rand"
    "runtime"
)

func consumer() {
    defer consumer_wg.Done()

    for item := range resultingChannel {
        log.Println("Consumed:", item)
    }
}

func producer() {
    defer producer_wg.Done()

    success := rand.Float32() > 0.5
    if success {
        resultingChannel <- rand.Int()
    }
}

var resultingChannel = make(chan int)
var producer_wg sync.WaitGroup
var consumer_wg sync.WaitGroup

func main() {
    rand.Seed(time.Now().Unix())

    for c := 0; c < runtime.NumCPU(); c++ {
        producer_wg.Add(1)  
        go producer()
    }

    for c := 0; c < runtime.NumCPU(); c++ {
        consumer_wg.Add(1)
        go consumer()
    }

    producer_wg.Wait()

    close(resultingChannel)

    consumer_wg.Wait()
}

-statement を main 関数に入れた理由はclose、複数のプロデューサーがあるためです。上記の例で 1 つのプロデューサーでチャネルを閉じると、既に遭遇した問題が発生します (クローズされたチャネルに書き込みます。その理由は、まだデータを生成している 1 つのプロデューサーが残っている可能性があるためです)。チャネルは、プロデューサーが残っていない場合にのみ閉じる必要があります (したがって、プロデューサーのみがチャネルを閉じることをお勧めします)。これは、Go でチャネルが構築される方法です。ここでは、チャネルの閉鎖に関する詳細情報を確認できます。


sharemem の例に関連する: AFAICS この例は、リソースを何度も再キューイングすることによって無限に実行されます (保留中 -> 完了 -> 保留中 -> 完了... など)。これは、main-func の最後の反復が行うことです。完了したリソースを受け取り、Resource.Sleep() を使用して再キューイングして保留状態にします。完了したリソースがない場合、新しいリソースが完了するのを待機してブロックします。したがって、チャネルは常に使用されているため、チャネルを閉じる必要はありません。

于 2012-06-18T00:51:16.327 に答える
1

これらの問題を解決する方法は常にたくさんあります。これは、Go の基本である単純な同期チャネルを使用したソリューションです。バッファリングされたチャネル、終了チャネル、WaitGroups はありません。

それはあなたの「一口」の解決策からそれほど離れていません.-失望して申し訳ありません--それほど小さくはありません. コンシューマーを独自のゴルーチンに配置するため、プロデューサーが数値を生成するときにコンシューマーが数値を消費できます。また、生産の「試行」が成功または失敗のいずれかで終了する可能性があることも区別します。生産が失敗した場合、試行はすぐに行われます。成功した場合、数が消費されるまで試行は行われません。

package main

import (
    "fmt"
    "math/rand"
)

func producer(c chan int, fail chan bool) {
    if success := rand.Float32() > 0.5; success {
        c <- rand.Int()
    } else {
        fail <- true
    }
}

func consumer(c chan int, success chan bool) {
    for {
        num := <-c
        fmt.Printf("Producer produced: %d\n", num)
        success <- true
    }
}

func main() {
    const nTries = 10
    c := make(chan int)
    done := make(chan bool)
    for i := 0; i < nTries; i++ {
        go producer(c, done)
    }
    go consumer(c, done)

    for i := 0; i < nTries; i++ {
        <-done
    }
    fmt.Println("All done.")
}
于 2012-06-18T04:47:32.060 に答える
0

既存の回答ではいくつかのことが明確にならないため、これを追加しています。まず、コードウォークの例の範囲ループは単なる無限イベント ループであり、同じ URL リストの再チェックと更新を永遠に続けます。

次に、チャネルは、それ自体で、既にGoの慣用的なコンシューマー/プロデューサー キューです。チャネルをバッキングする非同期バッファのサイズによって、プロデューサが背圧を受ける前に生成できる量が決まります。以下で N = 0 を設定すると、誰も先を争ったり遅れたりすることなく、ロックステップの生産者と消費者を確認できます。そのままでは、N = 10 の場合、プロデューサーはブロックする前に最大 10 個の製品を生産できます。

最後に、Go で通信するシーケンシャル プロセスを記述するための優れた慣用句がいくつかあります (たとえば、go ルーチンを開始する関数や、for/select パターンを使用して通信し、制御コマンドを受け入れる)。私は WaitGroups を不器用だと考えており、代わりに慣用的な例を見たいと思っています。

package main

import (
    "fmt"
    "time"
)

type control int
const  (
    sleep control = iota
    die // receiver will close the control chan in response to die, to ack.
)

func (cmd control) String() string {
    switch cmd {
    case sleep: return "sleep"
    case die: return "die"
    }
    return fmt.Sprintf("%d",cmd)
}

func ProduceTo(writechan chan<- int, ctrl chan control, done chan bool) {
    var product int
    go func() {
        for {
            select {
        case writechan <- product:
            fmt.Printf("Producer produced %v\n", product)
            product++
        case cmd:= <- ctrl:
            fmt.Printf("Producer got control cmd: %v\n", cmd)
            switch cmd {
            case sleep:
                fmt.Printf("Producer sleeping 2 sec.\n")
                time.Sleep(2000 * time.Millisecond)
            case die:
                fmt.Printf("Producer dies.\n")
                close(done)
                return
            }
            }
        }
    }()
}

func ConsumeFrom(readchan <-chan int, ctrl chan control, done chan bool) {
    go func() {
        var product int
        for {
            select {
            case product = <-readchan:
                fmt.Printf("Consumer consumed %v\n", product)
            case cmd:= <- ctrl:
                fmt.Printf("Consumer got control cmd: %v\n", cmd)
                switch cmd {
                case sleep:
                    fmt.Printf("Consumer sleeping 2 sec.\n")
                    time.Sleep(2000 * time.Millisecond)
                case die:
                    fmt.Printf("Consumer dies.\n")
                    close(done)
                    return
                }

            }
        }
    }()
}

func main() {

    N := 10
    q := make(chan int, N)

    prodCtrl := make(chan control)
    consCtrl := make(chan control)

    prodDone := make(chan bool)
    consDone := make(chan bool)


    ProduceTo(q, prodCtrl, prodDone)
    ConsumeFrom(q, consCtrl, consDone)

    // wait for a moment, to let them produce and consume
    timer := time.NewTimer(10 * time.Millisecond)
    <-timer.C

    // tell producer to pause
    fmt.Printf("telling producer to pause\n")
    prodCtrl <- sleep

    // wait for a second
    timer = time.NewTimer(1 * time.Second)
    <-timer.C

    // tell consumer to pause
    fmt.Printf("telling consumer to pause\n")
    consCtrl <- sleep


    // tell them both to finish
    prodCtrl <- die
    consCtrl <- die

    // wait for that to actually happen
    <-prodDone
    <-consDone
}
于 2014-05-09T01:52:39.433 に答える
0

fanIn 関数でジェネレーター パターンを使用する場合、待機グループなしで単純なバッファーなしチャネルを使用できます。

ジェネレーター パターンでは、各プロデューサーがチャネルを返し、それを閉じる責任があります。次に、fanIn 関数がこれらのチャネルを繰り返し処理し、返された値を単一のチャネルに転送します。

もちろん問題は、各チャネルが閉じられたときに、fanIn 関数がチャネル タイプ (int) のゼロ値を転送することです。

チャネル タイプのゼロ値をセンチネル値として使用し、ゼロ値でない場合は fanIn チャネルからの結果のみを使用することで、この問題を回避できます。

次に例を示します。

package main

import (
    "fmt"
    "math/rand"
)

const offset = 1

func producer() chan int {
    cout := make(chan int)
    go func() {
        defer close(cout)
        // May or may not produce.
        success := rand.Float32() > 0.5
        if success {
            cout <- rand.Int() + offset
        }
    }()
    return cout
}

func fanIn(cin []chan int) chan int {
    cout := make(chan int)
    go func() {
        defer close(cout)
        for _, c := range cin {
            cout <- <-c
        }
    }()
    return cout
}

func main() {
    chans := make([]chan int, 0)
    for i := 0; i < 10; i++ {
        chans = append(chans, producer())
    }

    for num := range fanIn(chans) {
        if num > offset {
            fmt.Printf("Producer produced: %d\n", num)
        }
    }
    fmt.Println("All done.")
}
于 2016-06-10T10:31:31.743 に答える