3

私は Go でいくつかのストリーム処理を行っていますが、ロックなしで「Go way」を実行する方法を見つけようとして行き詰まりました。

この不自然な例は、私が直面している問題を示しています。

  • 一度に1つずつ取得thingします。
  • それらを と呼ばれるスライスにバッファリングするゴルーチンがありますthings
  • thingsいっぱいになると何とかlen(things) == 100処理してリセット
  • いっぱいになる前にnアクセスする必要がある同時ゴルーチンの数がありますthings
  • things他のゴルーチンからの「不完全」へのアクセスは予測できません。
  • 変異する必要doSomethingWithPartialもないdoSomethingWithCompletethings

コード:

var m sync.Mutex
var count int64
things := make([]int64, 0, 100)

// slices of data are constantly being generated and used
go func() {
  for {
    m.Lock()
    if len(things) == 100 {
      // doSomethingWithComplete does not modify things
      doSomethingWithComplete(things)
      things = make([]int64, 0, 100)
    }
    things = append(things, count)
    m.Unlock()
    count++
  }
}()

// doSomethingWithPartial needs to access the things before they're ready
for {
  m.Lock()
  // doSomethingWithPartial does not modify things
  doSomethingWithPartial(things)
  m.Unlock()
}
  1. スライスは不変であることを知っているので、ミューテックスを削除して、それがまだ機能することを期待できることを意味します (私はノーと仮定します)

  2. これをリファクタリングして、ミューテックスの代わりにチャネルを使用するにはどうすればよいですか。

編集:ミューテックスを使用しない、私が思いついたソリューションは次のとおりです

package main

import (
    "fmt"
    "sync"
    "time"
)

func Incrementor() chan int {
    ch := make(chan int)
    go func() {
        count := 0
        for {
            ch <- count
            count++
        }
    }()
    return ch
}

type Foo struct {
    things   []int
    requests chan chan []int
    stream   chan int
    C        chan []int
}

func NewFoo() *Foo {
    foo := &Foo{
        things:   make([]int, 0, 100),
        requests: make(chan chan []int),
        stream:   Incrementor(),
        C:        make(chan []int),
    }
    go foo.Launch()
    return foo
}

func (f *Foo) Launch() {
    for {
        select {
        case ch := <-f.requests:
            ch <- f.things
        case thing := <-f.stream:
            if len(f.things) == 100 {
                f.C <- f.things
                f.things = make([]int, 0, 100)
            }
            f.things = append(f.things, thing)
        }
    }
}

func (f *Foo) Things() []int {
    ch := make(chan []int)
    f.requests <- ch
    return <-ch
}

func main() {

    foo := NewFoo()

    var wg sync.WaitGroup
    wg.Add(10)

    for i := 0; i < 10; i++ {
        go func(i int) {
            time.Sleep(time.Millisecond * time.Duration(i) * 100)
            things := foo.Things()
            fmt.Println("got things:", len(things))
            wg.Done()
        }(i)
    }

    go func() {
        for _ = range foo.C {
            // do something with things
        }
    }()

    wg.Wait()
}
4

1 に答える 1

1

「行く方法」は、おそらくこれにミューテックスを使用することであることに注意してください。チャネルでそれを行う方法を考えるのは楽しいですが、この特定の問題についてはおそらくミューテックスの方が単純で簡単に推論できます。

于 2013-05-19T02:51:01.337 に答える