2

Go言語を使用してデータインポートジョブに取り組んでいます。各ステップをクロージャーとして記述し、通信にチャネルを使用します。つまり、各ステップは並行しています。問題は次の構造で定義できます。

  1. データソースから ウィジェットを取得する
    1. ソース1からウィジェットに翻訳を追加します。
    2. ソース2からウィジェットに翻訳を追加します。
    3. ソース1からウィジェットに価格を追加します。
    4. WidgetRevisionsウィジェットに追加します。
      1. ソース1からWidgetRevisionsに翻訳を追加します
      2. ソース2からWidgetRevisionsに翻訳を追加します

この質問の目的のために、私は新しいウィジェットで取らなければならない最初の3つのステップだけを扱っています。その上で、ステップ4はパイプラインステップとして実装でき、それ自体が*WidgetRevision*を制御するためのサブ3ステップパイプラインの観点から実装されると思います。

そのために、私は次のAPIを提供するためのコードを少し書いています。

// A Pipeline is just a list of closures, and a smart 
// function to set them all off, keeping channels of
// communication between them.
p, e, d := NewPipeline()

// Add the three steps of the process
p.Add(whizWidgets)
p.Add(popWidgets)
p.Add(bangWidgets)

// Start putting things on the channel, kick off
// the pipeline, and drain the output channel
// (probably to disk, or a database somewhere)
go emit(e)
p.Execute()
drain(d)

私はすでにそれを実装しました(GistまたはGo Playgroundのコード)が、100%の成功失敗率でデッドロックしています

p.Execute()おそらく、チャネルの1つが何もすることがなく、どのチャネルにも何も送信されず、何もする必要がないため、呼び出し時にデッドロックが発生します...

とに数行のデバッグ出力を追加するemit()drain()、次の出力が表示されます。クロージャ呼び出し間のパイプライン処理は正しいと思います。また、一部のウィジェットが省略されているのがわかります。

Emitting A Widget
Input Will Be Emitted On 0x420fdc80
Emitting A Widget
Emitting A Widget
Emitting A Widget
Output Will Drain From 0x420fdcd0
Pipeline reading from 0x420fdc80 writing to 0x420fdd20
Pipeline reading from 0x420fdd20 writing to 0x420fddc0
Pipeline reading from 0x420fddc0 writing to 0x42157000

このアプローチについて私が知っていることがいくつかあります。

  • このデザインがコルーチンを「飢えさせる」ことは珍しいことではないと私は信じています。それがこれが行き詰まっている理由だと思います。
  • そもそもパイプラインに物が供給されていればいいのですが(APIはPipeline.Process(*Widget)
    • 私がその仕事をすることができれば、ドレインは次の関数に何も渡さなかった「ステップ」である可能性があり、それはよりクリーンなAPIである可能性があります
  • ラングバッファを実装していないことはわかっているので、マシンの使用可能なメモリをオーバーロードする可能性は十分にあります。
  • これが良いGoスタイルだとは思いませんが...多くのGo機能を利用しているようですが、それは実際にはメリットではありません
  • WidgetRevisionsにもパイプラインが必要なため、パイプラインをより汎用的にしたいと思います。おそらくinterface{}タイプが解決策であるかもしれませんが、それが賢明かどうかを判断するのに十分なことはわかりません。
  • 競合状態を防ぐためにミューテックスを実装することを検討するようにアドバイスされましたが、クロージャはそれぞれWidget構造体の特定のユニットで動作するため、節約できると思いますが、そのトピックについて教育を受けてうれしいです。 。

要約:このコードを修正するにはどうすればよいですか、このコードを修正する必要があります。あなたが私より経験豊富なgoプログラマーである場合、この「順次作業単位」の問題をどのように解決しますか?

4

1 に答える 1

2

チャネルから遠く離れた抽象化を構築したとは思いません。明示的にパイプします。

次のように、実際のパイプ操作すべてに対して1つの関数を簡単に作成できます。

type StageMangler func(*Widget)

func stage(f StageMangler, chi <-chan *Widget, cho chan<- *Widget) {
    for widget := range chi {
                f(widget)
                cho <- widget
    }
    close(cho)
}

次にfunc(w *Widget) { w.Whiz = true}、ステージビルダーを渡すか、同様の方法で渡すことができます。

その時点addで、これらとそのワーカー数のコレクションを作成できるため、特定のステージでn人のワーカーをより簡単に作成できます。

実行時にこれらのパイプラインを構築しない限り、これがチャネルを直接つなぎ合わせるよりも簡単かどうかはわかりません。

于 2012-12-11T05:43:10.083 に答える