7

Go のセマフォはチャネルで実装されます。

例はこれです: https://sites.google.com/site/gopatterns/concurrency/semaphores

環境:

数百のサーバーがあり、アクセスを制限したい共有リソースがあります。したがって、特定のリソースに対して、セマフォを使用して、これらのサーバーによる同時アクセスを 5 つだけに制限したいと考えています。そのために、ロックサーバーを使用する予定です。マシンがリソースにアクセスすると、最初にキーによってリソースにアクセスしていることをロック サーバーに登録します。そして、それが完了すると、ロックサーバーに別のリクエストを送信して、完了したことを伝え、セマフォを解放します。これにより、これらのリソースへのアクセスを同時アクセスの最大数に制限できます。

問題:何か問題が発生した場合に、これを適切に処理したい。

質問:

セマフォにタイムアウトを実装するにはどうすればよいですか?

例:

セマフォのサイズが 5 だとしましょう。セマフォでロックを取得しようとするプロセスが同時に 10 個あるため、この場合は 5 つだけがロックを取得します。

場合によっては、プロセスが応答せずに終了することがあります (本当の理由は説明が少し複雑ですが、基本的にプロセスがロックを解除しない場合があります)。そのため、セマフォ内のスペースが永久にロックされるため、問題が発生します。

したがって、これにはタイムアウトが必要です。ここにいくつかの問題があります:

プロセスは 2 秒から 60 分の間で実行されます。

タイムアウトになってからプロセスがロックを解除しようとすると、セマフォを 1 回ではなく 2 回ロック解除したことになるため、いくつかの競合状態があります。逆に、最初にロックを解除してからタイムアウトします。

上記の提案されたパターンを使用して、これをタイムアウト付きのスレッドセーフなセマフォに変えるにはどうすればよいですか?

4

5 に答える 5

1

分散ロック サービスを作成しているため、ロック サーバーはポートでリッスンし、accept() 接続をループすると、接続ごとにゴルーチンでコマンドを待機すると仮定します。そして、そのゴルーチンは、ソケットがドロップされると終了します (つまり、リモートノードのクラッシュ)

したがって、それが真実であると仮定すると、いくつかのことができます。

1) 同時ロックの数に一致する深さを持つチャネルを作成します 2) ロックすると、チャネルにメッセージを送信します (満杯の場合はブロックされます) 3) ロックを解除すると、チャネルからメッセージを読み取るだけです 4) できる" defer release()" (すでにロックされている場合、release はメッセージを消費します)

これは大まかな作業例ですが、ソケットのもの以外はすべてです。うまくいけば、それは理にかなっています。 http://play.golang.org/p/DLOX7m8m6q

package main

import "fmt"

import "time"

type Locker struct {
    ch chan int
    locked bool
}

func (l *Locker) lock(){
    l.ch <- 1
    l.locked=true
}
func (l *Locker) unlock() {
    if l.locked { // called directly or via defer, make sure we don't unlock if we don't have the lock
        l.locked = false // avoid unlocking twice if socket crashes after unlock
        <- l.ch
    }
}

func dostuff(name string, locker Locker) {
    locker.lock()
    defer locker.unlock()
    fmt.Println(name,"Doing stuff")
    time.Sleep(1 * time.Second)
}

func main() {
    ch := make(chan int, 2)
    go dostuff("1",Locker{ch,false})
    go dostuff("2",Locker{ch,false})
    go dostuff("3",Locker{ch,false})
    go dostuff("4",Locker{ch,false})
    time.Sleep(4 * time.Second)
}
于 2013-10-29T01:17:28.240 に答える
0

これは役立つかもしれませんが、この実装は広すぎると思い
ます。コードに関する提案をいただければ幸いです。

package main

import (
   "fmt"
   "time"
   "math/rand"
   "strconv"
)

type Empty interface{}

type Semaphore struct {
    dur time.Duration
    ch  chan Empty
}

func NewSemaphore(max int, dur time.Duration) (sem *Semaphore) {
    sem = new(Semaphore)
    sem.dur = dur
    sem.ch = make(chan Empty, max)
    return
}

type Timeout struct{}

type Work struct{}

var null Empty
var timeout Timeout
var work Work

var global = time.Now()

func (sem *Semaphore) StartJob(id int, job func()) {
    sem.ch <- null
    go func() {
        ch := make(chan interface{})
        go func() {
            time.Sleep(sem.dur)
            ch <- timeout
        }()
        go func() {
            fmt.Println("Job ", strconv.Itoa(id), " is started", time.Since(global))
            job()
            ch <- work
        }()
        switch (<-ch).(type) {
        case Timeout:
            fmt.Println("Timeout for job ", strconv.Itoa(id), time.Since(global))
        case Work:
            fmt.Println("Job ", strconv.Itoa(id), " is finished", time.Since(global))
        }
        <-sem.ch
    }()
}

func main() {
    rand.Seed(time.Now().Unix())
    sem := NewSemaphore(3, 3*time.Second)
    for i := 0; i < 10; i++ {
        id := i
        go sem.StartJob(i, func() {
            seconds := 2 + rand.Intn(5)
            fmt.Println("For job ", strconv.Itoa(id), " was allocated ", seconds, " secs")
            time.Sleep(time.Duration(seconds) * time.Second)
        })
    }
    time.Sleep(30 * time.Second)
}
于 2015-03-22T12:36:27.320 に答える