C/CPP でワーク スティーリング キューの適切な実装を探しています。Google を見回しましたが、役立つものは何も見つかりませんでした。
おそらく、誰かが優れたオープンソースの実装に精通しているでしょうか? (元の学術論文から取った疑似コードを実装したくない)。
C/CPP でワーク スティーリング キューの適切な実装を探しています。Google を見回しましたが、役立つものは何も見つかりませんでした。
おそらく、誰かが優れたオープンソースの実装に精通しているでしょうか? (元の学術論文から取った疑似コードを実装したくない)。
フリーランチなし。
紙を盗むオリジナル作品をぜひご覧ください。この論文はわかりにくい。論文には疑似コードではなく理論的証明が含まれていることを私は知っています。ただし、TBBほどシンプルなバージョンはありません。もしあれば、それは最適なパフォーマンスを提供しません。ワーク スチール自体にはある程度のオーバーヘッドが発生するため、最適化とトリックが非常に重要です。特に、デキューはスレッドセーフでなければなりません。高度にスケーラブルでオーバーヘッドの少ない同期を実装することは困難です。
なぜあなたがそれを必要とするのか、私は本当に疑問に思っています。適切な実装とは、TBB や Cilk のようなものだと思います。繰り返しますが、ワーク スティーリングを実装するのは困難です。
「ワークスチール」を実装することは、理論的には難しくありません。より多くの作業を実行するために、計算と他のタスクの生成を組み合わせて実行するタスクを含む一連のキューが必要です。また、新しく生成されたタスクをそれらのキューに配置するには、キューへのアトミック アクセスが必要です。最後に、タスクを実行したスレッドの作業をさらに見つけるために、各タスクが最後に呼び出すプロシージャが必要です。そのプロシージャは、作業を見つけるために作業キューを調べる必要があります。
そのようなワーク・スティーリング・システムのほとんどは、少数のスレッド (通常は実際のプロセッサー・コアによってバックアップされます) があり、スレッドごとに正確に 1 つのワーク・キューがあると想定しています。次に、まず自分のキューから作業を盗もうとし、それが空の場合は、他の人から盗もうとします。注意が必要なのは、どのキューを調べるかを知ることです。それらを順番にスキャンして作業を行うと、かなりコストがかかり、作業を探すスレッド間で大量の競合が発生する可能性があります。
1) コンテキストの切り替え (例えば、「スタック」などのプロセッサ コンテキスト レジスタの設定) は、純粋な C または C++ では記述できません。これは、パッケージの一部をターゲット プラットフォーム固有のマシン コードで記述することに同意することで解決できます。2) マルチプロセッサのキューへのアトミック アクセスは、純粋に C または C++ で行うことはできません (Dekker のアルゴリズムを無視します)。そのため、X86 LOCK XCH や Compare and Swap などのアセンブリ言語同期プリミティブを使用してコーディングする必要があります。さて、安全にアクセスできるようになったら、キューの更新に関連するコードはそれほど複雑ではなく、数行の C で簡単に記述できます。
ただし、混合アセンブラーを使用して C および C++ でそのようなパッケージをコーディングしようとすることは、依然としてかなり非効率的であり、最終的にはアセンブラーで全体をコーディングすることになることがわかると思います。残っているのはすべて C/C++ 互換のエントリ ポイントです:-}
私はPARLANSE並列プログラミング言語のためにこれを行いました。これは、任意の数の並列計算がライブで、いつでも相互作用 (同期) するというアイデアを提供します。これは X86 の舞台裏で CPU ごとに 1 つのスレッドで正確に実装され、実装は完全にアセンブラーで行われます。ワークスティーリング コードはおそらく合計 1000 行であり、非競合の場合に非常に高速にする必要があるため、トリッキーなコードです。
C および C++ の軟膏の実際のハエは、作業を表すタスクを作成するときに、どのくらいのスタック領域を割り当てるかということです。シリアル C/C++ プログラムは、単純に大量 (10Mb など) の1 つを過剰に割り当てることによって、この問題を回避します。線形スタックであり、そのスタック スペースがどれだけ浪費されているかは誰も気にしません。しかし、何千ものタスクを作成し、それらすべてを特定の瞬間に実行できる場合、各タスクに 10Mb を合理的に割り当てることはできません。そのため、タスクに必要なスタック スペースの量を静的に決定する (チューリング ハード) か、スタック チャンクを (たとえば、関数呼び出しごとに) 割り当てる必要がありますが、これは広く利用されている C/C++ コンパイラでは行われません。 (たとえば、使用している可能性が高いもの)。最後の方法は、タスクの作成を抑制して、いつでも数百に制限し、ライブのタスク間で数百の非常に巨大なスタックを多重化することです。タスクがインターロック/サスペンド状態になる可能性がある場合は、しきい値に達するため、最後に行うことはできません。したがって、タスクのみの場合にのみこれを行うことができます計算をします。かなり厳しい制約のようです。
PARLANSE では、関数呼び出しごとにヒープにアクティベーション レコードを割り当てるコンパイラを構築しました。
非常にエレガントな方法でそれを簡単に行うためのツールがあります。これは、非常に短時間でプログラムを並列化するための非常に効果的な方法です。
HPCチャレンジアワード
HPCチャレンジクラス2アワードのCilkエントリーは、2006年の「エレガンスとパフォーマンスのベストコンビネーション」賞を受賞しました。この賞は、2006年11月14日にタンパで開催されたSC'06で行われました。
pthreadまたはboost::thread上に構築されたC++でのスタンドアロンのワークスティーリングキュークラスの実装を探している場合は、幸運を祈ります。私の知る限り、それはありません。
ただし、他の人が言っているように、Cilk、TBB、およびMicrosoftのPPLはすべて、内部でワークスティーリングの実装を持っています。
問題は、ワークスティーリングキューを使用するのか、それとも実装するのかということです。1つだけを使用したい場合は、上記の選択肢が適切な開始点であり、いずれかで「タスク」をスケジュールするだけで十分です。
BlueRajaがPPLのtask_groupとstructured_task_groupがこれを行うと述べたように、これらのクラスは最新バージョンのIntelのTBBでも利用可能であることに注意してください。並列ループ(parallel_for、parallel_for_each)もworkstealingで実装されます。
実装を使用するのではなくソースを調べる必要がある場合、TBBはオープンソースであり、MicrosoftはCRTのソースを出荷しているため、調査を行うことができます。
Joe DuffyのブログでC#の実装を確認することもできます(ただし、C#であり、メモリモデルは異なります)。
-リック
OpenMP は、再帰的並列処理と呼ばれるワークスティーリングを非常によくサポートする可能性があります。
OpenMP 仕様では、タスク コンストラクト (入れ子にすることができるため、再帰的並列処理に非常に適しています) を定義していますが、それらがどのように実装されているかについての詳細は指定されていません。gcc を含む OpenMP の実装では、通常、タスクに対してなんらかの形式のワーク スチールが使用されますが、正確なアルゴリズム (およびその結果のパフォーマンス) は異なる場合があります。
見#pragma omp task
て、#pragma omp taskwait
アップデート
C++ Concurrency in Actionの第 9 章では、「プール スレッドのワーク スティーリング」を実装する方法について説明しています。私は自分で読んだり実装したりしていませんが、それほど難しくはありません。
PPLのstructured_task_groupクラスは、その実装にワーク スティーリング キューを使用します。スレッド化に WSQ が必要な場合は、それをお勧めします。
実際にソースを探しているのであれば、コードが ppl.h で提供されているのか、それともプリコンパイル済みのオブジェクトがあるのかはわかりません。今夜帰宅したら確認しなければなりません。
この C プロジェクトを C++ に移植しました。
Steal
配列が拡張されると、オリジナルはダーティ リードを経験する可能性があります。私はバグを修正しようとしましたが、動的に成長するスタックが実際には必要なかったため、最終的にあきらめました。スペースを割り当てようとする代わりに、このPush
メソッドは単純に を返しますfalse
。次に、呼び出し元はスピン待機を実行できますwhile(!stack->Push(value)){}
。
#pragma once
#include <atomic>
// A lock-free stack.
// Push = single producer
// Pop = single consumer (same thread as push)
// Steal = multiple consumer
// All methods, including Push, may fail. Re-issue the request
// if that occurs (spinwait).
template<class T, size_t capacity = 131072>
class WorkStealingStack {
public:
inline WorkStealingStack() {
_top = 1;
_bottom = 1;
}
WorkStealingStack(const WorkStealingStack&) = delete;
inline ~WorkStealingStack()
{
}
// Single producer
inline bool Push(const T& item) {
auto oldtop = _top.load(std::memory_order_relaxed);
auto oldbottom = _bottom.load(std::memory_order_relaxed);
auto numtasks = oldbottom - oldtop;
if (
oldbottom > oldtop && // size_t is unsigned, validate the result is positive
numtasks >= capacity - 1) {
// The caller can decide what to do, they will probably spinwait.
return false;
}
_values[oldbottom % capacity].store(item, std::memory_order_relaxed);
_bottom.fetch_add(1, std::memory_order_release);
return true;
}
// Single consumer
inline bool Pop(T& result) {
size_t oldtop, oldbottom, newtop, newbottom, ot;
oldbottom = _bottom.fetch_sub(1, std::memory_order_release);
ot = oldtop = _top.load(std::memory_order_acquire);
newtop = oldtop + 1;
newbottom = oldbottom - 1;
// Bottom has wrapped around.
if (oldbottom < oldtop) {
_bottom.store(oldtop, std::memory_order_relaxed);
return false;
}
// The queue is empty.
if (oldbottom == oldtop) {
_bottom.fetch_add(1, std::memory_order_release);
return false;
}
// Make sure that we are not contending for the item.
if (newbottom == oldtop) {
auto ret = _values[newbottom % capacity].load(std::memory_order_relaxed);
if (!_top.compare_exchange_strong(oldtop, newtop, std::memory_order_acquire)) {
_bottom.fetch_add(1, std::memory_order_release);
return false;
}
else {
result = ret;
_bottom.store(newtop, std::memory_order_release);
return true;
}
}
// It's uncontended.
result = _values[newbottom % capacity].load(std::memory_order_acquire);
return true;
}
// Multiple consumer.
inline bool Steal(T& result) {
size_t oldtop, newtop, oldbottom;
oldtop = _top.load(std::memory_order_acquire);
oldbottom = _bottom.load(std::memory_order_relaxed);
newtop = oldtop + 1;
if (oldbottom <= oldtop)
return false;
// Make sure that we are not contending for the item.
if (!_top.compare_exchange_strong(oldtop, newtop, std::memory_order_acquire)) {
return false;
}
result = _values[oldtop % capacity].load(std::memory_order_relaxed);
return true;
}
private:
// Circular array
std::atomic<T> _values[capacity];
std::atomic<size_t> _top; // queue
std::atomic<size_t> _bottom; // stack
};
完全な要点 (単体テストを含む)。強力なアーキテクチャ (x86/64) でのみテストを実行したので、弱いアーキテクチャが進む限り、これを Neon/PPC などで使用しようとすると、走行距離が異なる場合があります。
JobSwarmがワーク スティーリングを使用しているとは思いませんが、これは最初のステップです。この目的のための他のオープン ソース ライブラリについては知りません。