99

これは、@Jimt によって書かれた Go のワーカーとコントローラー モードの良い例です

package main

import (
    "fmt"
    "runtime"
    "sync"
    "time"
)

// Possible worker states.
const (
    Stopped = 0
    Paused  = 1
    Running = 2
)

// Maximum number of workers.
const WorkerCount = 1000

func main() {
    // Launch workers.
    var wg sync.WaitGroup
    wg.Add(WorkerCount + 1)

    workers := make([]chan int, WorkerCount)
    for i := range workers {
        workers[i] = make(chan int)

        go func(i int) {
            worker(i, workers[i])
            wg.Done()
        }(i)
    }

    // Launch controller routine.
    go func() {
        controller(workers)
        wg.Done()
    }()

    // Wait for all goroutines to finish.
    wg.Wait()
}

func worker(id int, ws <-chan int) {
    state := Paused // Begin in the paused state.

    for {
        select {
        case state = <-ws:
            switch state {
            case Stopped:
                fmt.Printf("Worker %d: Stopped\n", id)
                return
            case Running:
                fmt.Printf("Worker %d: Running\n", id)
            case Paused:
                fmt.Printf("Worker %d: Paused\n", id)
            }

        default:
            // We use runtime.Gosched() to prevent a deadlock in this case.
            // It will not be needed of work is performed here which yields
            // to the scheduler.
            runtime.Gosched()

            if state == Paused {
                break
            }

            // Do actual work here.
        }
    }
}

// controller handles the current state of all workers. They can be
// instructed to be either running, paused or stopped entirely.
func controller(workers []chan int) {
    // Start workers
    for i := range workers {
        workers[i] <- Running
    }

    // Pause workers.
    <-time.After(1e9)
    for i := range workers {
        workers[i] <- Paused
    }

    // Unpause workers.
    <-time.After(1e9)
    for i := range workers {
        workers[i] <- Running
    }

    // Shutdown workers.
    <-time.After(1e9)
    for i := range workers {
        close(workers[i])
    }
}

しかし、このコードには問題もあります。終了workers時にワーカー チャネルを削除しようとするworker()と、デッド ロックが発生します。

close(workers[i])すると、次にコントローラーが書き込みを行うとパニックが発生します。これは、go が閉じたチャネルに書き込むことができないためです。ミューテックスを使用して保護すると、がチャネルから何も読み取らず、書き込みがブロックされるためスタックされ、ミューテックスはデッドロックを引き起こしますworkers[i] <- Runningworker回避策として、より大きなバッファーをチャネルに与えることもできますが、十分ではありません。

したがって、これを解決する最善の方法はworker()、終了時にチャネルを閉じることだと思います。コントローラがチャネルが閉じていることを検出した場合、それを飛び越えて何もしません。しかし、この状況でチャネルが既に閉じられているかどうかを確認する方法が見つかりません。コントローラーでチャネルを読み取ろうとすると、コントローラーがブロックされる可能性があります。だから私は今とても混乱しています。

PS: 発生したパニックを回復することは私が試みたものですが、パニックを発生させたゴルーチンを閉じます。この場合はコントローラーになるので使い物になりません。

それでも、次のバージョンの Go でこの機能を実装することは、Go チームにとって役立つと思います。

4

10 に答える 10

86

チャネルと対話せずに、チャネルが開いているかどうかを知る必要がある安全なアプリケーションを作成する方法はありません。

やりたいことを実行する最善の方法は、2 つのチャネルを使用することです。1 つは作業用で、もう 1 つは状態を変更したいという希望を示すためのものです (重要な場合は、その状態変更の完了も同様です)。

チャンネルは安いです。セマンティクスをオーバーロードする複雑な設計はそうではありません。

[また]

<-time.After(1e9)

非常に紛らわしく、わかりにくい書き方です

time.Sleep(time.Second)

物事をシンプルに保ち、全員 (あなたを含む) が理解できるようにします。

于 2013-04-21T02:19:40.800 に答える
79

ハックな方法で、発生したパニックを回復することにより、書き込みを試みるチャネルに対して実行できます。ただし、読み取りチャネルから読み取ることなく、読み取りチャネルが閉じているかどうかを確認することはできません。

どちらか

  • 最終的にそこから「真の」値を読み取ります ( v <- c)
  • 「真」の値と「閉じていない」インジケーターを読み取ります ( v, ok <- c)
  • ゼロ値と「クローズ」インジケータ ( v, ok <- c)を読み取ります()
  • チャネル読み取りを永久にブロックします ( v <- c)

技術的には最後のものだけがチャネルから読み取られませんが、それはほとんど役に立ちません。

于 2013-04-19T13:25:49.987 に答える
8

私はこの答えがとても遅いことを知っています, 私はこの解決策を書きました, Hacking Go run-time , 安全ではありません. クラッシュする可能性があります:

import (
    "unsafe"
    "reflect"
)


func isChanClosed(ch interface{}) bool {
    if reflect.TypeOf(ch).Kind() != reflect.Chan {
        panic("only channels!")
    }
    
    // get interface value pointer, from cgo_export 
    // typedef struct { void *t; void *v; } GoInterface;
    // then get channel real pointer
    cptr := *(*uintptr)(unsafe.Pointer(
        unsafe.Pointer(uintptr(unsafe.Pointer(&ch)) + unsafe.Sizeof(uint(0))),
    ))
    
    // this function will return true if chan.closed > 0
    // see hchan on https://github.com/golang/go/blob/master/src/runtime/chan.go 
    // type hchan struct {
    // qcount   uint           // total data in the queue
    // dataqsiz uint           // size of the circular queue
    // buf      unsafe.Pointer // points to an array of dataqsiz elements
    // elemsize uint16
    // closed   uint32
    // **
    
    cptr += unsafe.Sizeof(uint(0))*2
    cptr += unsafe.Sizeof(unsafe.Pointer(uintptr(0)))
    cptr += unsafe.Sizeof(uint16(0))
    return *(*uint32)(unsafe.Pointer(cptr)) > 0
}
于 2016-08-08T18:54:47.647 に答える
2

複数の同時ゴルーチンでこの問題が頻繁に発生しました。

これは良いパターンかもしれませんし、そうでないかもしれませんが、私はワーカー用の struct を定義し、ワーカーの状態を示す quit チャネルとフィールドを使用します。

type Worker struct {
    data chan struct
    quit chan bool
    stopped bool
}

次に、コントローラーでワーカーの停止関数を呼び出すことができます。

func (w *Worker) Stop() {
    w.quit <- true
    w.stopped = true
}

func (w *Worker) eventloop() {
    for {
        if w.Stopped {
            return
        }
        select {
            case d := <-w.data:
                //DO something
                if w.Stopped {
                    return
                }
            case <-w.quit:
                return
        }
    }
}

これにより、何もハングしたりエラーを生成したりすることなく、ワーカーをきれいに停止させることができます。これは、コンテナーで実行する場合に特に適しています。

于 2021-02-26T14:17:11.493 に答える
-5

チャネルに要素があるかどうかを最初に確認する方が簡単です。これにより、チャネルが有効であることを確認できます。

func isChanClosed(ch chan interface{}) bool {
    if len(ch) == 0 {
        select {
        case _, ok := <-ch:
            return !ok
        }
    }
    return false 
}
于 2015-07-20T11:58:28.340 に答える
-8

このチャンネルを聞くと、チャンネルが閉鎖されたことをいつでも知ることができます。

case state, opened := <-ws:
    if !opened {
         // channel was closed 
         // return or made some final work
    }
    switch state {
        case Stopped:

ただし、1 つのチャネルを 2 回閉じることはできないことに注意してください。これはパニックを引き起こします。

于 2013-04-19T13:19:07.203 に答える