3

8 コア x86 システムで実行することを目的とした 32 ビット Linux OS で、C/C++ を使用してバイナリ ファイルから符号なし整数を読み取るコードを作成していました。アプリケーションは、リトル エンディアン形式の符号なし整数を含む入力ファイルを次々と受け取ります。したがって、バイト単位の入力ファイル サイズは 4 の倍数です。ファイルには 10 億個の整数が含まれている可能性があります。すべての整数を読み取って加算し、合計を 64 ビット精度で返す最速の方法は何ですか?

以下は私の実装です。ここでは、破損データのエラー チェックは重要な問題ではなく、この場合、入力ファイルには問題がないと見なされます。

#include <iostream>
#include <fstream>
#include <pthread.h>
#include <string>
#include <string.h>


using namespace std;

string filepath;
unsigned int READBLOCKSIZE = 1024*1024;
unsigned long long nFileLength = 0;

unsigned long long accumulator = 0; // assuming 32 bit OS running on X86-64
unsigned int seekIndex[8] = {};
unsigned int threadBlockSize = 0; 
unsigned long long acc[8] = {};

pthread_t thread[8];
void* threadFunc(void* pThreadNum);

//time_t seconds1;
//time_t seconds2;

int main(int argc, char *argv[])
{
    if (argc < 2) 
    {
        cout << "Please enter a file path\n";
        return -1;
    }

    //seconds1 = time (NULL);
    //cout << "Start Time in seconds since January 1, 1970 -> " << seconds1 << "\n";

    string path(argv[1]);
    filepath = path;
    ifstream ifsReadFile(filepath.c_str(), ifstream::binary);  // Create FileStream for the file to be read
    if(0 == ifsReadFile.is_open()) 
    {
        cout << "Could not find/open input file\n";
        return -1;
    }

    ifsReadFile.seekg (0, ios::end);
    nFileLength = ifsReadFile.tellg();           // get file size
    ifsReadFile.seekg (0, ios::beg);



    if(nFileLength < 16*READBLOCKSIZE)
    {
        //cout << "Using One Thread\n"; //**
        char* readBuf = new char[READBLOCKSIZE];
        if(0 == readBuf) return -1;

        unsigned int startOffset = 0;   
        if(nFileLength >  READBLOCKSIZE)
        {
            while(startOffset + READBLOCKSIZE < nFileLength)
            {
                //ifsReadFile.flush();
                ifsReadFile.read(readBuf, READBLOCKSIZE);  // At this point ifsReadFile is open
                int* num = reinterpret_cast<int*>(readBuf);
                for(unsigned int i = 0 ; i < (READBLOCKSIZE/4) ; i++) 
                {
                    accumulator += *(num + i);  
                }
                startOffset += READBLOCKSIZE;
            }

        }

        if(nFileLength - (startOffset) > 0)
        {
            ifsReadFile.read(readBuf, nFileLength - (startOffset));  
            int* num = reinterpret_cast<int*>(readBuf);
            for(unsigned int i = 0 ; i < ((nFileLength - startOffset)/4) ; ++i) 
            {
                accumulator += *(num + i);  
            }
        }
        delete[] readBuf; readBuf = 0;
    }
    else
    {
        //cout << "Using 8 Threads\n"; //**
        unsigned int currthreadnum[8] = {0,1,2,3,4,5,6,7};
        if(nFileLength > 200000000) READBLOCKSIZE *= 16; // read larger blocks
        //cout << "Read Block Size -> " << READBLOCKSIZE << "\n";       

        if(nFileLength % 28)
        {
            threadBlockSize = (nFileLength / 28);
            threadBlockSize *= 4;
        }
        else
        {   
            threadBlockSize = (nFileLength / 7);
        }

        for(int i = 0; i < 8 ; ++i)
        {
            seekIndex[i] = i*threadBlockSize;
            //cout << seekIndex[i] << "\n";
        }
        pthread_create(&thread[0], NULL, threadFunc, (void*)(currthreadnum + 0));
        pthread_create(&thread[1], NULL, threadFunc, (void*)(currthreadnum + 1));
        pthread_create(&thread[2], NULL, threadFunc, (void*)(currthreadnum + 2));
        pthread_create(&thread[3], NULL, threadFunc, (void*)(currthreadnum + 3));
        pthread_create(&thread[4], NULL, threadFunc, (void*)(currthreadnum + 4));
        pthread_create(&thread[5], NULL, threadFunc, (void*)(currthreadnum + 5));
        pthread_create(&thread[6], NULL, threadFunc, (void*)(currthreadnum + 6));
        pthread_create(&thread[7], NULL, threadFunc, (void*)(currthreadnum + 7));

        pthread_join(thread[0], NULL);
        pthread_join(thread[1], NULL);
        pthread_join(thread[2], NULL);
        pthread_join(thread[3], NULL);
        pthread_join(thread[4], NULL);
        pthread_join(thread[5], NULL);
        pthread_join(thread[6], NULL);
        pthread_join(thread[7], NULL);

        for(int i = 0; i < 8; ++i)
        {
            accumulator += acc[i];
        }
    }

    //seconds2 = time (NULL);
    //cout << "End Time in seconds since January 1, 1970 -> " << seconds2 << "\n";
    //cout << "Total time to add " << nFileLength/4 << " integers -> " << seconds2 - seconds1 << " seconds\n";

    cout << accumulator << "\n";      
    return 0;
}

void* threadFunc(void* pThreadNum)
{
    unsigned int threadNum = *reinterpret_cast<int*>(pThreadNum);
    char* localReadBuf = new char[READBLOCKSIZE];
    unsigned int startOffset = seekIndex[threadNum];
    ifstream ifs(filepath.c_str(), ifstream::binary);  // Create FileStream for the file to be read
    if(0 == ifs.is_open()) 
    {
        cout << "Could not find/open input file\n";
        return 0;
    }   
    ifs.seekg (startOffset, ios::beg); // Seek to the correct offset for this thread
    acc[threadNum] = 0;
    unsigned int endOffset = startOffset + threadBlockSize;
    if(endOffset > nFileLength) endOffset = nFileLength; // for last thread
    //cout << threadNum << "-" << startOffset << "-" << endOffset << "\n"; 
    if((endOffset - startOffset) >  READBLOCKSIZE)
    {
        while(startOffset + READBLOCKSIZE < endOffset)
        {
            ifs.read(localReadBuf, READBLOCKSIZE);  // At this point ifs is open
            int* num = reinterpret_cast<int*>(localReadBuf);
            for(unsigned int i = 0 ; i < (READBLOCKSIZE/4) ; i++) 
            {
                acc[threadNum] += *(num + i);   
            }
            startOffset += READBLOCKSIZE;
        }   
    }

    if(endOffset - startOffset > 0)
    {
        ifs.read(localReadBuf, endOffset - startOffset);
        int* num = reinterpret_cast<int*>(localReadBuf);
        for(unsigned int i = 0 ; i < ((endOffset - startOffset)/4) ; ++i) 
        {
            acc[threadNum] += *(num + i);   
        }
    }

    //cout << "Thread " << threadNum + 1 << " subsum = " << acc[threadNum] << "\n"; //**
    delete[] localReadBuf; localReadBuf = 0;
    return 0;
}

テスト用の入力バイナリ ファイルを生成する小さな C# プログラムを作成しました。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.IO;

namespace BinaryNumWriter
{
    class Program
    {
        static UInt64 total = 0;
        static void Main(string[] args)
        {
            BinaryWriter bw = new BinaryWriter(File.Open("test.txt", FileMode.Create));
            Random rn = new Random();
            for (UInt32 i = 1; i <= 500000000; ++i)
            {
                UInt32 num = (UInt32)rn.Next(0, 0xffff);
                bw.Write(num);
                total += num;
            }
            bw.Flush();
            bw.Close();
        }
    }
}

2 GB RAM および Ubuntu 9.10 32 ビットを搭載した 3.33 Ghz の Core i5 マシン (クアッドコアですが、現時点で取得したもの) でプログラムを実行すると、次のパフォーマンス数値が得られました。

100 個の整数 ~ 0 秒 (そうでなければ本当にしゃくらなければならないでしょう) 100000 個の整数 < 0 秒 100000000 個の整数 ~ 7 秒 500000000 個の整数 ~ 29 秒 (1.86 GB の入力ファイル)

HDDが5400RPMなのか7200RPMなのかわかりません。読み込み用にさまざまなバッファ サイズを試してみたところ、大きな入力ファイルの場合、一度に 16 MB の読み込みが最適であることがわかりました。

全体的なパフォーマンスを向上させるために、ファイルからより速く読み取るためのより良い方法はありますか? 整数の大きな配列をより速く追加し、繰り返し折りたたむよりスマートな方法はありますか? 私がコードを書いた方法でパフォーマンスに大きな障害はありますか/多くの時間を費やしている明らかに間違ったことをしていますか?

このデータの読み取りと追加のプロセスを高速化するにはどうすればよいですか?

ありがとう。

チンメイ

4

2 に答える 2

3

大量のデータを高速に読み取り (または書き込み) たいが、そのデータをあまり処理したくない場合は、バッファー間でのデータの余分なコピーを避ける必要があります。つまり、fstream または FILE の抽象化を避けて (コピーする必要がある余分なバッファーが導入されるため)、カーネルとユーザーのバッファー間で内容をコピーする読み取り/書き込み型の呼び出しを回避する必要があります。

代わりに、Linux では mmap(2) を使用します。64 ビット OS では、ファイル全体をメモリに mmap するだけで、madvise(MADV_SEQUENTIAL)ほぼ順次アクセスすることをカーネルに伝えるために使用できます。32 ビット OS の場合、毎回前のチャンクのマッピングを解除して、チャンクで mmap する必要があります。現在の構造によく似たもので、各スレッドが一度に 1 つの固定サイズのチャンクを mmap すると、うまく機能するはずです。

于 2012-06-08T19:22:54.673 に答える
3

あなたのやり方で複数のスレッドから機械式 HDD にアクセスするには、いくらかの頭の動きが必要です (読む速度を落としてください)。ほぼ確実に IO バウンドです (1.86GB ファイルで 65MBps)。次の方法で戦略を変更してみてください。

  • 8 つのスレッドを開始します - それらを CONSUMERS と呼びます
  • 8 つのスレッドは、データが利用可能になるまで待機します。
  • メインスレッドでファイルのチャンク(たとえば256KB)の読み取りを開始するため、消費者のプロバイダーになります
  • メイン スレッドが EOF に達し、ワーカーに avail データがなくなったことを知らせます。
  • メインスレッドは 8 つのワーカーが参加するのを待ちます。

完璧に動作させるには、かなりの同期が必要です。シーケンシャルなファイルアクセスを行うことで、HDD /ファイルシステムのIO機能を完全に最大限に活用できると思います. 小さいファイルの YMMV は、超高速でキャッシュしてキャッシュから提供できます。

あなたが試すことができる別の方法は、7 つのスレッドだけを開始し、メインスレッドとシステムの残りの部分のために 1 つの空き CPU を残すことです。

..またはSSDを入手してください:)

編集:

簡単にするために、処理なしでシングルスレッドでファイルを読み取る (バッファーを破棄する) 速度を確認してください。それに加えてイプシロンは、これをどれだけ速く実行できるかについての理論上の限界です。

于 2012-06-08T18:25:48.247 に答える