ファイルを行数で読み取り、分割/チャンクするにはどうすればよいですか?
行が 2 つ以上のバッファー間で分割されないようにしながら、ファイルを個別のバッファーに分割したいと考えています。これらのバッファーを独自の pthread に渡して、ある種の同時/非同期処理を実行できるようにする予定です。
cを使用してLinuxでチャンクを読み書きする以下の回答を読みましたが、行が2つ以上のバッファーに分割されないようにすることに関する質問に正確に答えているとは思いません。
ファイルを行数で読み取り、分割/チャンクするにはどうすればよいですか?
行が 2 つ以上のバッファー間で分割されないようにしながら、ファイルを個別のバッファーに分割したいと考えています。これらのバッファーを独自の pthread に渡して、ある種の同時/非同期処理を実行できるようにする予定です。
cを使用してLinuxでチャンクを読み書きする以下の回答を読みましたが、行が2つ以上のバッファーに分割されないようにすることに関する質問に正確に答えているとは思いません。
ファイルはどのようにエンコードされていますか? 各バイトが文字を表す場合、次のようにします。
mmap()
。'\n'
.チャンクサイズをバイト単位で選択します。次に、ファイル内の適切な場所を探して、改行を取得するまで、一度にいくつかの小さいバイト数を読み取ります。
最初のチャンクの最後の文字は改行です。2 番目のチャンクの最初の文字は、改行の後の文字です。
常に pagesize() 境界を探し、改行を検索するために一度に pagesize() バイトを読み取ります。これにより、境界を見つけるためにディスクから必要最小限のみを取得するようになります。一度に 128 バイト程度の読み取りを試みることができます。ただし、システムコールをさらに作成するリスクがあります。
文字の頻度をカウントするためにこれを行うサンプルプログラムを作成しました。もちろん、これはほぼ確実にIOバウンドであるため、スレッドに分割することはほとんど無意味です。また、行指向ではないため、改行がどこにあるかも問題ではありません。しかし、それはほんの一例です。また、かなり完全な C++11 実装を持っていることに大きく依存しています。
それらの重要な機能は次のとおりです。
// Find the offset of the next newline given a particular desired offset.
off_t next_linestart(int fd, off_t start)
{
using ::std::size_t;
using ::ssize_t;
using ::pread;
const size_t bufsize = 4096;
char buf[bufsize];
for (bool found = false; !found;) {
const ssize_t result = pread(fd, buf, bufsize, start);
if (result < 0) {
throw ::std::system_error(errno, ::std::system_category(),
"Read failure trying to find newline.");
} else if (result == 0) {
// End of file
found = true;
} else {
const char * const nl_loc = ::std::find(buf, buf + result, '\n');
if (nl_loc != (buf + result)) {
start += ((nl_loc - buf) + 1);
found = true;
} else {
start += result;
}
}
}
return start;
}
を使用していることにも注意してpread
ください。複数のスレッドがファイルの異なる部分から読み取る場合、これは絶対に不可欠です。
ファイル記述子は、スレッド間の共有リソースです。1 つのスレッドが通常の関数を使用してファイルから読み取ると、この共有リソースに関する詳細であるファイル ポインターが変更されます。ファイル ポインタは、次の読み取りが発生するファイル内の位置です。
lseek
毎回読む前に を使用するだけでは、lseek
と の間で競合状態が発生するため、これは役に立ちませんread
。
このpread
関数を使用すると、ファイル内の特定の場所から一連のバイトを読み取ることができます。また、ファイル ポインタはまったく変更されません。ファイルポインタを変更しないという事実は別として、それ以外の点では、同じ呼び出しでanlseek
と aを組み合わせるようなものです。read
バッファのクラスを定義します。それぞれに、ページ サイズの倍数である大きなバッファー スペースと開始/終了インデックス、渡されたストリームからバッファーを読み取るメソッド、および別の *buffer インスタンスをパラメーターとして受け取る 'lineParse' メソッドを与えます。
いくつかの *buffer を作成し、それらをプロデューサー/コンシューマー プール キューに格納します。ファイルを開き、プールからバッファーを取得し、バッファー領域を最初から最後まで読み取ります (エラー/EOF のブール値を返します)。プールから別の *buffer を取得し、それを以前のものの lineparse() に渡します。そこで、データの末尾から逆方向に検索して、newLine を探します。見つかったら、終了インデックスをリロードし、最後の行のフラグメントを memcpy して (ある場合は、幸運なこともあります:)、渡された新しい *buffer に入れ、その開始インデックスを設定します。最初のバッファーには行全体が含まれるようになり、行を処理するスレッドのキューに入れることができます。2 番目のバッファーには、最初のバッファーからコピーされた行のフラグメントが含まれており、より多くのデータをディスクから開始インデックスのバッファー領域に読み取ることができます。
ライン処理スレッドは、「使用済み」*バッファをプールにリサイクルできます。
EOF (またはエラー:) まで続行します。
可能であれば、バッファの処理を行うメソッドをバッファ クラスに追加します。
大きなバッファ クラスを使用して最後から解析する方が、小さなビットを継続的に読み取り、最初から改行を探すよりもはるかに効率的です。スレッド間の通信は遅く、渡すことができるバッファが大きいほど良いです。
バッファーのプールを使用すると、継続的な新規作成/削除が不要になり、フロー制御が提供されます。ディスク読み取りスレッドが処理よりも高速な場合、プールは空になり、ディスク読み取りスレッドは、使用されているバッファーがリサイクルされるまでブロックされます。これにより、メモリの暴走を防ぎます。
複数の処理スレッドを使用する場合、バッファが「順不同」に処理される可能性があることに注意してください。これは問題になる場合とそうでない場合があります。
このシナリオでは、行がディスク読み取りレイテンシーと並行して処理されることの利点が、スレッド間通信のオーバーヘッドよりも大きくなるようにすることによってのみ得ることができます。スレッド間で小さなバッファーを通信することは、逆効果になる可能性が非常に高くなります。
最大のスピードアップは、ネットワーク化されたディスクで全体的に高速ですが、大きなレイテンシーが発生します。