1

私は C++ に関する知識が乏しいため、これに 2 日間苦労しました。私がする必要があるのは、protobuf C++ API を使用して、何百万ものメッセージを含む可能性のある大きなファイルから一連のメッセージを解析することです。この投稿で説明されているように、いつでも「ReadVarInt32」を実行してサイズを取得し、CodedInputStream にプッシュされた制限で ParseFromCodedStream を実行できるため、ファイルから直接読み取るのは簡単です。ただし、私が使用している I/O レベルの API (実際には libuv) では、読み取りコールバック アクションごとに固定サイズのバッファーを割り当てる必要があります。どうやらそのブロックサイズは、私が読んでいるメッセージサイズとは何の関係もありません。

これは私の人生を困難にします。基本的に、ファイルから読み取り、固定サイズのバッファー (16K など) を埋めるたびに、そのバッファーにはおそらく数百の完全な protobuf メッセージが含まれますが、そのバッファーの最後のチャンクは不完全なメッセージになる可能性があります。それで、私がすべきことは、できる限り多くのメッセージを読み取ろうとすることであり、最後に、最後のチャンクを抽出して、読み取った次の 16K バッファの先頭にアタッチし、EOF に到達するまで続けます。ファイル。ReadVarInt32() を使用してサイズを取得し、その数値を残りのバッファー サイズと比較します。メッセージ サイズが小さい場合は、読み取りを続けます。

GetDirectBufferPointerという API があるので、これを使用して、次のメッセージのサイズを読み取るにポインターの位置を記録しようとします。ただし、エンディアンの奇妙さのために、ポインターが開始して次のチャンクにアタッチする場所からバイト配列の残りを抽出するだけでは、Parse は成功せず、実際には最初の数バイト (8 だと思います) が完全に台無しになっていると思われます。 .

あるいは、codedStream.ReadRaw() を実行して残りのストリームをバッファーに書き込み、新しいチャンクの先頭にアタッチすると、データが破損することはありません。しかし問題は、今回は「ReadVarInt32」で既に「読み取られている」ため、「サイズ」バイト情報が失われることです! そして、前回読んだサイズ情報を覚えていて、次の反復 message.ParseFromCodedStream() で直接呼び出しても、1 バイト少なくなり、一部が破損してオブジェクトを正常に復元できません。

std::vector<char> mCheckBuffer;
std::vector<char> mResidueBuffer;
char bResidueBuffer[READ_BUFFER_SIZE];
char temp[READ_BUFFER_SIZE];
google::protobuf::uint32 size;
//"in" is the file input stream
while (in.good()) {
    in.read(mReadBuffer.data(), READ_BUFFER_SIZE);
    mCheckBuffer.clear();
    //merge the last remaining chunk that contains incomplete message with
    //the new data chunk I got out from buffer. Excuse my terrible C++ foo
    std::merge(mResidueBuffer.begin(), mResidueBuffer.end(),  
    mReadBuffer.begin(), mReadBuffer.end(), std::back_inserter(mCheckBuffer));

    //Treat the new merged buffer array as the new CIS
    google::protobuf::io::ArrayInputStream ais(&mCheckBuffer[0], 
    mCheckBuffer.size());
    google::protobuf::io::CodedInputStream cis(&ais);
    //Record the pointer location on CIS in bResidueBuffer
    cis.GetDirectBufferPointer((const void**)&bResidueBuffer,
    &bResidueBufSize);

    //No size information, probably first time or last iteration  
    //coincidentally read a complete message out. Otherwise I simply 
    //skip reading size again as I've already populated that from last 
    //iteration when I got an incomplete message
    if(size == 0) {
         cis.ReadVarint32(&size);
    }
    //Have to read this again to get remaining buffer size
    cis.GetDirectBufferPointer((const void**)&temp, &mResidueBufSize);

    //Compare the next message size with how much left in the buffer, if      
    //message size is smaller, I know I can read at least one more message 
    //out, keep reading until I run out of buffer, or, it's the end of message 
    //and my buffer just allocated larger so size should be 0
    while (size <= mResidueBufSize && size != 0) {
        //If this cis I constructed didn't have the size info at the beginning, 
        //and I just read straight from it hoping to get the message out from 
        //the "size" I got from last iteration, it simply doesn't work
        //(read one less byte in fact, and some part of the message corrupted)
        //push the size constraint to the input stream;
        int limit = cis.PushLimit(size);
        //parse message from the input stream
        message.ParseFromCodedStream(&cis);  
        cis.PopLimit(limit);
        google::protobuf::TextFormat::PrintToString(message, &str);
        printf("%s", str.c_str());
        //do something with the parsed object
        //Now I have to record the new pointer location again
        cis.GetDirectBufferPointer((const void**)&bResidueBuffer, 
        &bResidueBufSize);
        //Read another time the next message's size and go back to while loop check
        cis.ReadVarint32(&size);

    }
    //If I do the next line, bResidueBuffer will have the correct CIS information 
    //copied over, but not having the "already read" size info
    cis.ReadRaw(bResidueBuffer, bResidueBufSize);
    mResidueBuffer.clear();
    //I am constructing a new vector that receives the residual chunk of the 
    //current buffer that isn't enough to restore a message
    //If I don't do ReadRaw, this copy completely messes up at least the first 8 
    //bytes of the copied buffer's value, due to I suspect endianness
    mResidueBuffer.insert(mResidueBuffer.end(), &bResidueBuffer[0], 
    &bResidueBuffer[bResidueBufSize]);
}

私は今、本当に考えがわかりません。固定サイズの中間バッファーをまったく必要とする API で protobuf を適切に使用することさえ可能ですか? ご意見をお寄せいただきありがとうございます。

4

2 に答える 2

1

あなたのコードには 2 つの大きな問題があります。

std::merge(mResidueBuffer.begin(), mResidueBuffer.end(),  
mReadBuffer.begin(), mReadBuffer.end(), std::back_inserter(mCheckBuffer));

バッファを連結することを期待しているように見えますstd::mergeが、実際には、この関数は、MergeSort の意味で、2 つの並べ替えられた配列を 1 つの並べ替えられた配列にマージします。これは、このコンテキストでは意味がありません。mCheckBuffer にはナンセンスが含まれてしまいます。

cis.GetDirectBufferPointer((const void**)&bResidueBuffer,
&bResidueBufSize);

&bResidueBufferここでは、互換性のないポインター型にキャストしています。bResidueBufferは char 配列であるため&bResidueBuffer、ポインターへのポインターではないchar 配列へのポインターです。配列は暗黙的にポインターに変換できるため (ポインターは配列の最初の要素を指す)、これは確かに混乱を招きますが、これは実際には変換bResidueBufferです。それ自体はポインターではなく、ポインターに変換するだけです。

あなたも何をするのか誤解していると思いますGetDirectBufferPointer()。残りのバッファを にコピーするように見えますbResidueBufferが、メソッドはデータをコピーしません。このメソッドは、元のバッファーを指すポインターを返します。

それを呼び出す正しい方法は、次のようなものです。

const void* ptr;
int size;
cis.GetDirectBufferPointer(&ptr, &size);

ptrこれで、元のバッファーがポイントされます。これをバッファの先頭へのポインタと比較して、ストリーム内のどこにいるかを確認できます。次のようにします。

size_t pos = (const char*)ptr - &mCheckBuffer[0];

しかし、まさにこの目的のためCodedInputStreamのメソッドが既にあるため、それを行うべきではありません。CurrentPosition()これにより、バッファ内の現在のバイト オフセットが返されます。したがって、代わりにそれを使用してください。

于 2015-03-20T07:46:46.543 に答える