現在、HttpWebRequest/Responseを使用するマルチスレッドダウンローダークラスがあります。すべて正常に動作します。超高速ですが、問題は、別のアプリにダウンロードするときにデータをストリーミングする必要があることです。つまり、最初のチャンクが最初に、次にキューの次のチャンクが正しい順序でストリーミングされる必要があります。現在、私のダウンローダークラスはsyncであり、Download()はbyte[]を返します。たとえば、非同期マルチスレッドクラスでは、4つの空の要素(スロット用)をリストし、Download()関数を使用してスロットの各インデックスを各スレッドに渡します。これは同期をシミュレートしますが、それは私が必要としているものではありません。最初のチャンクがダウンロードを開始するとすぐにデータがストリーミングされるようにするには、キューをどのように実行すればよいですか。
3 に答える
どのスレッドが最初のチャンクをダウンロードしているか、そしてその最初のチャンクがいつ使用できるようになるかを判断する方法について質問がある場合は、スレッドごとにイベントを使用して、どのスレッドに割り当てたチャンクを追跡します。最初のスレッド(データの最初のチャンクをダウンロードする)に渡すイベント、2番目のスレッド(データの2番目のチャンク)に渡すイベントなどを追跡します。メインスレッドまたは別のバックグラウンドスレッドを使用します。 (UIスレッドのブロックを回避するため)、最初のイベントを待ちます。最初のスレッドがチャンクのダウンロードを完了すると、最初のイベントを設定/通知します。その後、待機中のスレッドがウェイクアップし、データの最初のチャンクを使用できるようになります。
他のダウンロードスレッドも同じことを実行でき、実行が完了するとそれぞれのイベントを通知します。手動リセットイベントを使用して、誰も待機していない場合でもイベントが通知されたままになるようにします。順番にデータブロックを必要とするスレッドが最初のデータブロックの処理を終了すると、2番目のイベントを待機できます。2番目のイベントがすでに通知されている場合、待機はすぐに戻り、スレッドは2番目のデータブロックの処理を開始できます。
非常に大規模なダウンロードの場合は、ラウンドロビン方式でイベントとスレッドを再利用できます。データチャンクを消費するスレッドがデータチャンクを順番に消費し、それぞれのイベントを順番に待機している限り、それらが終了する順序は重要ではありません。
賢くて注意が必要な場合は、おそらく1つのイベントのみを使用してこれらすべてを実行できます。最初にnullに設定されたデータチャンクポインター/オブジェクトのグローバル配列を作成し、ワーカースレッドがデータのチャンクをダウンロードして、完成したチャンクをそれぞれのスロットに割り当てます。グローバル配列で、共有イベントを通知します。コンシューマースレッドはデータチャンクカウンターを保持するため、次に処理する必要のあるデータチャンクを認識し、共有イベントを待機し、シグナルが送信されると、グローバル配列の次のスロットを調べて、そこにデータが表示されているかどうかを確認します。次のスロットに順番にデータがない場合、コンシューマスレッドはイベントの待機に戻ります。また、ワーカースレッドが次にダウンロードする必要があるデータブロックを知る方法も必要です。ミューテックスで保護されているか、interlockedadd/exchangeを使用してアクセスされるグローバルカウンターで十分です。
同期されたマルチスレッドダウンローダーを作成するには、正しいデータ構造を作成する必要がありbyte[]
、データだけでは不十分です。
手順:
- コンテンツのサイズまたは各スレッドによってダウンロードされた約500KBの固定サイズのコンテンツダウンローダーに基づいて、ダウンロードを複数のチャンクに分割します。
- スレッドを開始するときに、チャンクインデックスを指定します-1番目の部分、2番目の部分など
- ダウンロードが可能になったら、チャンクインデックスに基づいて最終的なコンテンツを調整します。
興味がある場合は、prozilla(C、Linuxベース-at)またはAxelのコードを確認することをお勧めします。
ダウンロードを行うコードと、複数の非同期スレッドを開始するコードを表示できますか?
たぶん私はあなたのシナリオを完全には理解していませんが、私があなたなら、Async(responseStreamのBeginRead)を使用します。次に、次のことを行います。
void StartReading(Stream responseStream)
{
byte [] buffer = new byte[1024];
Context ctx = new Context();
ctx.Buffer = buffer;
ctx.InputStream = responseStream;
ctx.OutputStream = new MemoryStream(); // change this to be your output stream
responseStream.BeginRead(buffer, 0, buffer.Length; new AsyncCallback(ReadCallback), ctx);
}
void ReadCallback(IAsyncResult ar)
{
Context ctx = (Context)ar.AsyncState;
int read = 0;
try {
read = ctx.InputStream.EndRead(ar);
if (read > 0)
{
ctx.OutputStream.Write(ctx.Buffer, 0, read);
// kick off another async read
ctx.InputStream.BeginRead(ctx.Buffer, 0, ctx.Buffer.Length, new AsyncCallback(ReadCallback), ctx);
} else {
ctx.InputStream.Close();
ctx.OutputStream.Close();
}
} catch {
}
}
}