2

巨大なファイル (2 ~ 40 GB) を Azure Blob Storage にアップロードしたいと考えています。

まず、各ファイルをチャンクに分割することに成功しました (各チャンク = 2 MB)。次に、チャンクを 1 つずつアップロードします。各チャンクが正常にアップロードされたら、一時ファイルを更新して、アプリケーションが閉じられた場合にアップロードを再開できるようにします。

ここで、アップロード操作をマルチスレッドにしたいと考えています。TPL を見直した後、どこから始めればよいのか混乱しました。

TPLをどこから始めるべきか、何かガイダンスはありますか?

void Upload(int segmentId)
{
    try
    {
        string blockId = GetBlockId(segmentId);
        var segment = GetSegment(FilePath, segmentId, SeqmentSize);
        var md5Hash = CalcMd5Hash(segment);

        var blob = new CloudBlockBlob(_link.Uri);
        using (var memoryStream = new MemoryStream(segment))
        {
            blob.PutBlock(blockId, memoryStream, md5Hash);
        }

        SerializeStatus();
    }
    catch (Exception exception)
    {
        ...
    }
}
4

1 に答える 1

3

私はずっと前に似たようなものを作りました (ただし、TPL ではなく非同期アプローチを使用しました) upload really large blobs with resumable capability。これが私がしたことです:

  1. まず、ブロック サイズに基づいて、ファイルをチャンクに分割します。各チャンクには ID が割り当てられます。次に、チャンク ID とそのチャンクのステータスを保持するオブジェクトを作成しました。簡単にするために、チャンクの次のステータスを保持しました - NotStartedSuccessful、およびFailed.
  2. 次に、これらのチャンクのコレクションを作成し、そのデータをファイルにシリアル化しました。
  3. 並列スレッドの数 (x としましょう) に基づいて、ステータスが であるコレクションから x 個の項目をフェッチしますNotStarted。次に、これらのチャンクを並行して処理しました。チャンク ID をユーザー状態として渡したので、コールバックを受け取ったときに、アップロード ステータスに基づいて、それに応じてコレクションを更新し、データをシリアル化します。
  4. すべてのチャンクがアップロードされたら、失敗したチャンクがあるかどうかを確認しました。ある場合は、それらのチャンクを再試行します。
  5. すべてのチャンクが正常に完了したら、チャンク コレクションからブロック リストを作成し、そのブロック リストをコミットします。コミット ブロック リスト操作が成功した場合は、チャンク データを含むそのファイルを単純に削除しました。

お役に立てれば。

アップデート

この擬似コードを見て、これが役立つかどうかを確認してください。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace UploadLargeBlob
{
    class Program
    {
        static void Main(string[] args)
        {
            List<ChunkInformation> chunksToUpload = new List<ChunkInformation>();
            CreateChunkCollection("MyVeryLargeFile", 2*1024*1024);
            int numberOfParallelThreads = 8;
            do
            {
                var chunksToProcess = chunksToUpload.Where(c => c.Status == ChunkStatus.NotStarted || c.Status == ChunkStatus.Failed).Take(numberOfParallelThreads);
                if (chunksToProcess.Count() == 0)
                {
                    break;
                }
                List<Task> tasks = new List<Task>();
                try
                {
                    foreach (var chunk in chunksToProcess)
                    {
                        tasks.Add(Task.Factory.StartNew(() =>
                            {
                                DoUpload(chunk);
                            }, chunk));
                    }
                    Task.WaitAll(tasks.ToArray());
                }
                catch (AggregateException excep)
                {
                    foreach (var task in tasks)
                    {
                        if (task.Exception != null)
                        {
                            ChunkInformation chunk = task.AsyncState as ChunkInformation;
                            chunk.Status = ChunkStatus.Failed;
                            //Now serialize the data.
                        }
                    }
                }
            }
            while (true);
        }

        static void DoUpload(ChunkInformation chunk)
        {
            //Do the actual upload

            //Update chunk status once chunk is uploaded
            chunk.Status = ChunkStatus.Successful;

            //Serialize the data.
        }

        static void CreateChunkCollection(string fileName, int chunkSize)
        {
        }
    }

    public class ChunkInformation
    {
        public string Id
        {
            get;
            set;
        }

        public ChunkStatus Status
        {
            get;
            set;
        }
    }

    public enum ChunkStatus
    {
        NotStarted,
        Successful,
        Failed
    }
}
于 2013-08-13T11:37:41.943 に答える