68

C++とJavaの両方でファイルから複数のProtocolBuffersメッセージを読み書きしようとしています。Googleはメッセージの前に長さのプレフィックスを書くことを提案していますが、デフォルトではそれを行う方法はありません(私が見ることができます)。

ただし、バージョン2.1.0のJava APIは、明らかにその仕事をする一連の「区切られた」I/O関数を受け取りました。

parseDelimitedFrom
mergeDelimitedFrom
writeDelimitedTo

C ++に相当するものはありますか?そうでない場合は、Java APIが付加するサイズプレフィックスのワイヤー形式は何ですか?C ++でそれらのメッセージを解析できますか?


アップデート:

これらは現在google/protobuf/util/delimited_message_util.h、v3.3.0の時点で存在しています。

4

11 に答える 11

81

私はここでパーティーに少し遅れていますが、以下の実装には他の回答から欠落しているいくつかの最適化が含まれており、64MBの入力後に失敗することはありません(ただし、ストリーム全体ではなく、個々のメッセージに64MBの制限が適用されます)。

(私はC++およびJavaprotobufライブラリの作成者ですが、Googleで働いていません。このコードが公式ライブラリに組み込まれなかったことを残念に思います。これがあった場合のようになります。)

bool writeDelimitedTo(
    const google::protobuf::MessageLite& message,
    google::protobuf::io::ZeroCopyOutputStream* rawOutput) {
  // We create a new coded stream for each message.  Don't worry, this is fast.
  google::protobuf::io::CodedOutputStream output(rawOutput);

  // Write the size.
  const int size = message.ByteSize();
  output.WriteVarint32(size);

  uint8_t* buffer = output.GetDirectBufferForNBytesAndAdvance(size);
  if (buffer != NULL) {
    // Optimization:  The message fits in one buffer, so use the faster
    // direct-to-array serialization path.
    message.SerializeWithCachedSizesToArray(buffer);
  } else {
    // Slightly-slower path when the message is multiple buffers.
    message.SerializeWithCachedSizes(&output);
    if (output.HadError()) return false;
  }

  return true;
}

bool readDelimitedFrom(
    google::protobuf::io::ZeroCopyInputStream* rawInput,
    google::protobuf::MessageLite* message) {
  // We create a new coded stream for each message.  Don't worry, this is fast,
  // and it makes sure the 64MB total size limit is imposed per-message rather
  // than on the whole stream.  (See the CodedInputStream interface for more
  // info on this limit.)
  google::protobuf::io::CodedInputStream input(rawInput);

  // Read the size.
  uint32_t size;
  if (!input.ReadVarint32(&size)) return false;

  // Tell the stream not to read beyond that size.
  google::protobuf::io::CodedInputStream::Limit limit =
      input.PushLimit(size);

  // Parse the message.
  if (!message->MergeFromCodedStream(&input)) return false;
  if (!input.ConsumedEntireMessage()) return false;

  // Release the limit.
  input.PopLimit(limit);

  return true;
}
于 2014-04-08T03:49:06.390 に答える
17

さて、私は必要なものを実装するトップレベルのC ++関数を見つけることができませんでしたが、Java APIリファレンスを調べたところ、MessageLiteインターフェイス内で次のことがわかりました。

void writeDelimitedTo(OutputStream output)
/*  Like writeTo(OutputStream), but writes the size of 
    the message as a varint before writing the data.   */

したがって、Javaサイズプレフィックスは(プロトコルバッファ)変数です!

その情報を武器に、C ++ APIを調べて、次のようなCodedStreamヘッダーを見つけました。

bool CodedInputStream::ReadVarint32(uint32 * value)
void CodedOutputStream::WriteVarint32(uint32 value)

それらを使用して、私は仕事をする私自身のC++関数を転がすことができるはずです。

ただし、実際にはこれをメインのメッセージAPIに追加する必要があります。Javaに機能があることを考えると、機能が不足しています。MarcGravellの優れたprotobuf-net C#ポート(SerializeWithLengthPrefixおよびDeserializeWithLengthPrefix経由)も同様です。

于 2010-02-26T12:53:21.057 に答える
12

CodedOutputStream / ArrayOutputStreamを使用してメッセージを(サイズで)書き込み、CodedInputStream / ArrayInputStreamを使用してメッセージを(サイズで)読み取ることで、同じ問題を解決しました。

たとえば、次の擬似コードは、メッセージの後にメッセージサイズを書き込みます。

const unsigned bufLength = 256;
unsigned char buffer[bufLength];
Message protoMessage;

google::protobuf::io::ArrayOutputStream arrayOutput(buffer, bufLength);
google::protobuf::io::CodedOutputStream codedOutput(&arrayOutput);

codedOutput.WriteLittleEndian32(protoMessage.ByteSize());
protoMessage.SerializeToCodedStream(&codedOutput);

書き込むときは、バッファがメッセージ(サイズを含む)に収まるのに十分な大きさであることも確認する必要があります。また、読み取るときは、バッファにメッセージ全体(サイズを含む)が含まれていることを確認する必要があります。

JavaAPIによって提供されるものと同様の便利なメソッドをC++APIに追加すると、間違いなく便利です。

于 2010-02-26T13:19:01.503 に答える
8

IsteamInputStreamは、std :: istreamと一緒に使用すると簡単に発生する、eofsやその他のエラーに対して非常に脆弱です。この後、protobufストリームは永続的に損傷し、すでに使用されているバッファデータはすべて破棄されます。protobufの従来のストリームからの読み取りには適切なサポートがあります。

これを実装して、 CopyingInputStreamAdaptergoogle::protobuf::io::CopyingInputStreamと一緒に使用します。出力バリアントについても同じようにします。

実際には、解析呼び出しはgoogle::protobuf::io::CopyingInputStream::Read(void* buffer, int size)、バッファーが指定された場所で終了します。残された唯一のことは、どういうわけかそれに読み込まれることです。

Asio同期ストリーム( SyncReadStream / SyncWriteStream)で使用する例を次に示します。

#include <google/protobuf/io/zero_copy_stream_impl_lite.h>

using namespace google::protobuf::io;


template <typename SyncReadStream>
class AsioInputStream : public CopyingInputStream {
    public:
        AsioInputStream(SyncReadStream& sock);
        int Read(void* buffer, int size);
    private:
        SyncReadStream& m_Socket;
};


template <typename SyncReadStream>
AsioInputStream<SyncReadStream>::AsioInputStream(SyncReadStream& sock) :
    m_Socket(sock) {}


template <typename SyncReadStream>
int
AsioInputStream<SyncReadStream>::Read(void* buffer, int size)
{
    std::size_t bytes_read;
    boost::system::error_code ec;
    bytes_read = m_Socket.read_some(boost::asio::buffer(buffer, size), ec);

    if(!ec) {
        return bytes_read;
    } else if (ec == boost::asio::error::eof) {
        return 0;
    } else {
        return -1;
    }
}


template <typename SyncWriteStream>
class AsioOutputStream : public CopyingOutputStream {
    public:
        AsioOutputStream(SyncWriteStream& sock);
        bool Write(const void* buffer, int size);
    private:
        SyncWriteStream& m_Socket;
};


template <typename SyncWriteStream>
AsioOutputStream<SyncWriteStream>::AsioOutputStream(SyncWriteStream& sock) :
    m_Socket(sock) {}


template <typename SyncWriteStream>
bool
AsioOutputStream<SyncWriteStream>::Write(const void* buffer, int size)
{   
    boost::system::error_code ec;
    m_Socket.write_some(boost::asio::buffer(buffer, size), ec);
    return !ec;
}

使用法:

AsioInputStream<boost::asio::ip::tcp::socket> ais(m_Socket); // Where m_Socket is a instance of boost::asio::ip::tcp::socket
CopyingInputStreamAdaptor cis_adp(&ais);
CodedInputStream cis(&cis_adp);

Message protoMessage;
uint32_t msg_size;

/* Read message size */
if(!cis.ReadVarint32(&msg_size)) {
    // Handle error
 }

/* Make sure not to read beyond limit of message */
CodedInputStream::Limit msg_limit = cis.PushLimit(msg_size);
if(!msg.ParseFromCodedStream(&cis)) {
    // Handle error
}

/* Remove limit */
cis.PopLimit(msg_limit);
于 2012-11-15T09:07:20.423 に答える
7

どうぞ:

#include <google/protobuf/io/zero_copy_stream_impl.h>
#include <google/protobuf/io/coded_stream.h>

using namespace google::protobuf::io;

class FASWriter 
{
    std::ofstream mFs;
    OstreamOutputStream *_OstreamOutputStream;
    CodedOutputStream *_CodedOutputStream;
public:
    FASWriter(const std::string &file) : mFs(file,std::ios::out | std::ios::binary)
    {
        assert(mFs.good());

        _OstreamOutputStream = new OstreamOutputStream(&mFs);
        _CodedOutputStream = new CodedOutputStream(_OstreamOutputStream);
    }

    inline void operator()(const ::google::protobuf::Message &msg)
    {
        _CodedOutputStream->WriteVarint32(msg.ByteSize());

        if ( !msg.SerializeToCodedStream(_CodedOutputStream) )
            std::cout << "SerializeToCodedStream error " << std::endl;
    }

    ~FASWriter()
    {
        delete _CodedOutputStream;
        delete _OstreamOutputStream;
        mFs.close();
    }
};

class FASReader
{
    std::ifstream mFs;

    IstreamInputStream *_IstreamInputStream;
    CodedInputStream *_CodedInputStream;
public:
    FASReader(const std::string &file), mFs(file,std::ios::in | std::ios::binary)
    {
        assert(mFs.good());

        _IstreamInputStream = new IstreamInputStream(&mFs);
        _CodedInputStream = new CodedInputStream(_IstreamInputStream);      
    }

    template<class T>
    bool ReadNext()
    {
        T msg;
        unsigned __int32 size;

        bool ret;
        if ( ret = _CodedInputStream->ReadVarint32(&size) )
        {   
            CodedInputStream::Limit msgLimit = _CodedInputStream->PushLimit(size);
            if ( ret = msg.ParseFromCodedStream(_CodedInputStream) )
            {
                _CodedInputStream->PopLimit(msgLimit);      
                std::cout << mFeed << " FASReader ReadNext: " << msg.DebugString() << std::endl;
            }
        }

        return ret;
    }

    ~FASReader()
    {
        delete _CodedInputStream;
        delete _IstreamInputStream;
        mFs.close();
    }
};
于 2012-10-02T06:06:25.290 に答える
7

私はC++とPythonの両方で同じ問題に遭遇しました。

C ++バージョンでは、Kenton Vardaがこのスレッドに投稿したコードと、彼がprotobufチームに送信したプルリクエストのコードを組み合わせて使用​​しました(ここに投稿されたバージョンはEOFを処理しませんが、githubに送信したバージョンは処理します) )。

#include <google/protobuf/message_lite.h>
#include <google/protobuf/io/zero_copy_stream.h>
#include <google/protobuf/io/coded_stream.h>


bool writeDelimitedTo(const google::protobuf::MessageLite& message,
    google::protobuf::io::ZeroCopyOutputStream* rawOutput)
{
    // We create a new coded stream for each message.  Don't worry, this is fast.
    google::protobuf::io::CodedOutputStream output(rawOutput);

    // Write the size.
    const int size = message.ByteSize();
    output.WriteVarint32(size);

    uint8_t* buffer = output.GetDirectBufferForNBytesAndAdvance(size);
    if (buffer != NULL)
    {
        // Optimization:  The message fits in one buffer, so use the faster
        // direct-to-array serialization path.
        message.SerializeWithCachedSizesToArray(buffer);
    }

    else
    {
        // Slightly-slower path when the message is multiple buffers.
        message.SerializeWithCachedSizes(&output);
        if (output.HadError())
            return false;
    }

    return true;
}

bool readDelimitedFrom(google::protobuf::io::ZeroCopyInputStream* rawInput, google::protobuf::MessageLite* message, bool* clean_eof)
{
    // We create a new coded stream for each message.  Don't worry, this is fast,
    // and it makes sure the 64MB total size limit is imposed per-message rather
    // than on the whole stream.  (See the CodedInputStream interface for more
    // info on this limit.)
    google::protobuf::io::CodedInputStream input(rawInput);
    const int start = input.CurrentPosition();
    if (clean_eof)
        *clean_eof = false;


    // Read the size.
    uint32_t size;
    if (!input.ReadVarint32(&size))
    {
        if (clean_eof)
            *clean_eof = input.CurrentPosition() == start;
        return false;
    }
    // Tell the stream not to read beyond that size.
    google::protobuf::io::CodedInputStream::Limit limit = input.PushLimit(size);

    // Parse the message.
    if (!message->MergeFromCodedStream(&input)) return false;
    if (!input.ConsumedEntireMessage()) return false;

    // Release the limit.
    input.PopLimit(limit);

    return true;
}

そして、これが私のpython2実装です:

from google.protobuf.internal import encoder
from google.protobuf.internal import decoder

#I had to implement this because the tools in google.protobuf.internal.decoder
#read from a buffer, not from a file-like objcet
def readRawVarint32(stream):
    mask = 0x80 # (1 << 7)
    raw_varint32 = []
    while 1:
        b = stream.read(1)
        #eof
        if b == "":
            break
        raw_varint32.append(b)
        if not (ord(b) & mask):
            #we found a byte starting with a 0, which means it's the last byte of this varint
            break
    return raw_varint32

def writeDelimitedTo(message, stream):
    message_str = message.SerializeToString()
    delimiter = encoder._VarintBytes(len(message_str))
    stream.write(delimiter + message_str)

def readDelimitedFrom(MessageType, stream):
    raw_varint32 = readRawVarint32(stream)
    message = None

    if raw_varint32:
        size, _ = decoder._DecodeVarint32(raw_varint32, 0)

        data = stream.read(size)
        if len(data) < size:
            raise Exception("Unexpected end of file")

        message = MessageType()
        message.ParseFromString(data)

    return message

#In place version that takes an already built protobuf object
#In my tests, this is around 20% faster than the other version 
#of readDelimitedFrom()
def readDelimitedFrom_inplace(message, stream):
    raw_varint32 = readRawVarint32(stream)

    if raw_varint32:
        size, _ = decoder._DecodeVarint32(raw_varint32, 0)

        data = stream.read(size)
        if len(data) < size:
            raise Exception("Unexpected end of file")

        message.ParseFromString(data)

        return message
    else:
        return None

これは見栄えの良いコードではない可能性があり、かなりリファクタリングできると確信していますが、少なくともそれを行う1つの方法を示しているはずです。

今大きな問題:それは遅いです。

python-protobufのC++実装を使用する場合でも、純粋なC++よりも1桁遅くなります。私は、ファイルからそれぞれ最大30バイトの1,000万個のprotobufメッセージを読み取るベンチマークを持っています。C ++では約0.9秒、Pythonでは35秒かかります。

少し速くする1つの方法は、このコードのようにファイルから読み取ってからデコードするのではなく、varintデコーダーを再実装して、ファイルから読み取って一度にデコードすることです。(プロファイリングは、varintエンコーダー/デコーダーでかなりの時間が費やされていることを示しています)。しかし、言うまでもなく、PythonバージョンとC++バージョンの間のギャップを埋めるにはそれだけでは十分ではありません。

それをより速くするためのどんなアイデアも大歓迎です:)

于 2015-12-31T01:13:36.103 に答える
4

完全を期すために、protobufとPython3のマスターバージョンで動作する最新バージョンをここに投稿します

C ++バージョンの場合、delimited_message_utils.h(ここではMWE)のutilsを使用するだけで十分です。

#include <google/protobuf/io/zero_copy_stream_impl.h>
#include <google/protobuf/util/delimited_message_util.h>

#include <stdio.h>
#include <fcntl.h>
#include <unistd.h>

template <typename T>
bool writeManyToFile(std::deque<T> messages, std::string filename) {
    int outfd = open(filename.c_str(), O_WRONLY | O_CREAT | O_TRUNC);
    google::protobuf::io::FileOutputStream fout(outfd);

    bool success;
    for (auto msg: messages) {
        success = google::protobuf::util::SerializeDelimitedToZeroCopyStream(
            msg, &fout);
        if (! success) {
            std::cout << "Writing Failed" << std::endl;
            break;
        }
    }
    fout.Close();
    close(outfd);
    return success;
}

template <typename T>
std::deque<T> readManyFromFile(std::string filename) {
    int infd = open(filename.c_str(), O_RDONLY);

    google::protobuf::io::FileInputStream fin(infd);
    bool keep = true;
    bool clean_eof = true;
    std::deque<T> out;

    while (keep) {
        T msg;
        keep = google::protobuf::util::ParseDelimitedFromZeroCopyStream(
            &msg, &fin, nullptr);
        if (keep)
            out.push_back(msg);
    }
    fin.Close();
    close(infd);
    return out;
}

Python3バージョンの場合、@ firebootの回答に基づいて、変更が必要なのはraw_varint32のデコードだけです。

def getSize(raw_varint32):
    result = 0
    shift = 0
    b = six.indexbytes(raw_varint32, 0)
    result |= ((ord(b) & 0x7f) << shift)
    return result

def readDelimitedFrom(MessageType, stream):
    raw_varint32 = readRawVarint32(stream)
    message = None

    if raw_varint32:
        size = getSize(raw_varint32)

        data = stream.read(size)
        if len(data) < size:
            raise Exception("Unexpected end of file")

        message = MessageType()
        message.ParseFromString(data)

    return message
于 2019-11-27T15:31:51.223 に答える
3

これに対する解決策も探していました。これが私たちのソリューションのコアです。いくつかのJavaコードが多くのMyRecordメッセージをwriteDelimitedToファイルに書き込んだと仮定しています。ファイルを開いてループし、次のようにします。

if(someCodedInputStream-> ReadVarint32(&bytes)){
  CodedInputStream :: Limit msgLimit = someCodedInputStream-> PushLimit(bytes);
  if(myRecord-> ParseFromCodedStream(someCodedInputStream)){
    //解析されたMyRecordインスタンスで作業を行います
  } そうしないと {
    //解析エラーを処理します
  }
  someCodedInputStream-> PopLimit(msgLimit);
} そうしないと {
  //ファイルの終わりかもしれません
}

それが役に立てば幸い。

于 2011-06-30T09:43:42.960 に答える
0

プロトコルバッファのobjective-cバージョンを使用して、この正確な問題に遭遇しました。iOSクライアントから、最初のバイトとして長さを期待するparseDelimitedFromを使用するJavaベースのサーバーに送信するとき、最初にwriteRawByteをCodedOutputStreamに呼び出す必要がありました。この問題に遭遇した他の人を喜んで助けるためにここに投稿してください。この問題に取り組んでいる間、Googleのproto-bufsには、これを行うための単純なフラグが付属していると思うでしょう...

    Request* request = [rBuild build];

    [self sendMessage:request];
} 


- (void) sendMessage:(Request *) request {

    //** get length
    NSData* n = [request data];
    uint8_t len = [n length];

    PBCodedOutputStream* os = [PBCodedOutputStream streamWithOutputStream:outputStream];
    //** prepend it to message, such that Request.parseDelimitedFrom(in) can parse it properly
    [os writeRawByte:len];
    [request writeToCodedOutputStream:os];
    [os flush];
}
于 2013-12-13T21:24:14.350 に答える
0

上記のケントンバルダの答えへのコメントとしてこれを書くことは許可されていないので、彼が投稿したコード(および提供されている他の回答)にバグがあると思います。次のコード:

...
google::protobuf::io::CodedInputStream input(rawInput);

// Read the size.
uint32_t size;
if (!input.ReadVarint32(&size)) return false;

// Tell the stream not to read beyond that size.
google::protobuf::io::CodedInputStream::Limit limit =
    input.PushLimit(size);
...

入力からすでに読み取られているvarint32のサイズが考慮されていないため、誤った制限が設定されます。これにより、次のメッセージの一部である可能性のある追加のバイトがストリームから読み取られるため、データの損失/破損が発生する可能性があります。これを正しく処理する通常の方法は、サイズの読み取りに使用されるCodedInputStreamを削除し、ペイロードを読み取るための新しいサイズを作成することです。

...
uint32_t size;
{
  google::protobuf::io::CodedInputStream input(rawInput);

  // Read the size.
  if (!input.ReadVarint32(&size)) return false;
}

google::protobuf::io::CodedInputStream input(rawInput);

// Tell the stream not to read beyond that size.
google::protobuf::io::CodedInputStream::Limit limit =
    input.PushLimit(size);
...
于 2016-04-05T01:59:28.033 に答える
-7

指定された区切り文字を使用して、ストリームから文字列を読み取るためにgetlineを使用できます。

istream& getline ( istream& is, string& str, char delim );

(ヘッダーで定義)

于 2010-02-26T10:20:05.120 に答える