0
func main() {
    jobs := []Job{job1, job2, job3}
    numOfJobs := len(jobs)
    resultsChan := make(chan *Result, numOfJobs)
    jobChan := make(chan *job, numOfJobs)
    go consume(numOfJobs, jobChan, resultsChan)
    for i := 0; i < numOfJobs; i++ {
        jobChan <- jobs[i]
    }
    close(jobChan)

    for i := 0; i < numOfJobs; i++ {
        <-resultsChan
    }
    close(resultsChan)
}

func (b *Blockchain) consume(num int, jobChan chan *Job, resultsChan chan *Result) {
    for i := 0; i < num; i++ {
        go func() {
            job := <-jobChan
            resultsChan <- doJob(job)
        }()
    }
}

上記の例では、ジョブが jobChan にプッシュされ、ゴルーチンがそれを jobChan から取り出して同時にジョブを実行し、結果を resultsChan にプッシュします。次に、resultsChan から結果を引き出します。

質問1:

私のコードでは、シリアル化/線形化された結果はありません。ジョブは、ジョブ 1、ジョブ 2、ジョブ 3 の順に進みますが。結果は、最も時間がかかるジョブに応じて、ジョブ 3、ジョブ 1、ジョブ 2 のようになります。

ジョブを同時に実行したいのですが、結果がジョブとして入ったのと同じ順序で resultsChan から出てくることを確認する必要があります。

質問2:

約 30 万のジョブがあります。これは、コードが最大 30 万のゴルーチンを生成することを意味します。非常に多くのゴルーチンを使用することは効率的ですか、それともジョブを 100 程度のスライスにグループ化し、各ゴルーチンを 1 ではなく 100 を通過させる方がよいでしょうか。

4

1 に答える 1

1

これは私がシリアライゼーションを処理した方法です(また、限られた数のワーカーを設定しています)。いくつかのワーカー オブジェクトに入力フィールドと出力フィールド、および同期チャネルを設定し、それらをラウンド ロビン方式で処理して、完了した作業をピックアップし、新しいジョブを与えます。次に、それらを最後に 1 回通過して、残っている完了したジョブをピックアップします。異常に長いジョブが 1 つある場合でも、すべてのリソースを少しの間ビジー状態に保つことができるように、ワーカー数をコア数よりも若干多くしたい場合があることに注意してください。コードはhttp://play.golang.org/p/PM9y4ieMxw以下にあります。

これは毛むくじゃらです (座って例を書く前に覚えているよりも毛むくじゃらです!) - 他の誰かが持っているもの、より良い実装、または目標を達成するためのまったく異なる方法を見てみたいです。

package main

import (
    "fmt"
    "math/rand"
    "runtime"
    "time"
)

type Worker struct {
    in     int
    out    int
    inited bool

    jobReady chan bool
    done     chan bool
}

func (w *Worker) work() {
    time.Sleep(time.Duration(rand.Float32() * float32(time.Second)))
    w.out = w.in + 1000
}
func (w *Worker) listen() {
    for <-w.jobReady {
        w.work()
        w.done <- true
    }
}
func doSerialJobs(in chan int, out chan int) {
    concurrency := 23
    workers := make([]Worker, concurrency)
    i := 0
    // feed in and get out items
    for workItem := range in {
        w := &workers[i%
            concurrency]
        if w.inited {
            <-w.done
            out <- w.out
        } else {
            w.jobReady = make(chan bool)
            w.done = make(chan bool)
            w.inited = true
            go w.listen()
        }
        w.in = workItem
        w.jobReady <- true
        i++
    }
    // get out any job results left over after we ran out of input
    for n := 0; n < concurrency; n++ {
        w := &workers[i%concurrency]
        if w.inited {
            <-w.done
            out <- w.out
        }
        close(w.jobReady)
        i++
    }
    close(out)
}
func main() {
    runtime.GOMAXPROCS(10)
    in, out := make(chan int), make(chan int)
    allFinished := make(chan bool)
    go doSerialJobs(in, out)
    go func() {
        for result := range out {
            fmt.Println(result)
        }
        allFinished <- true
    }()
    for i := 0; i < 100; i++ {
        in <- i
    }
    close(in)
    <-allFinished
}

inこの例では、 とだけoutが実際のデータを運ぶことに注意してください。他のすべてのチャネルは同期のためだけのものです。

于 2014-01-07T19:33:35.510 に答える