5

大量のデータをストリーミングするための WCF サービスを開発しているため、 protobuf-netシリアル化と組み合わせたWCF ストリーミング機能を使用することにしました。

環境:

一般的には、サービス内のオブジェクトをシリアル化し、それらをストリームに書き込んで送信するというアイデアがあります。一方、呼び出し元は Stream オブジェクトを受け取り、すべてのデータを読み取ることができます。

したがって、現在、サービス メソッドのコードは次のようになっています。

public Result TestMethod(Parameter parameter)
{
    // Create response
    var responseObject = new BusinessResponse { Value = "some very large data"};

    // The resposne have to be serialized in advance to intermediate MemoryStream
    var stream = new MemoryStream();
    serializer.Serialize(stream, responseObject);
    stream.Position = 0;

    // ResultBody is a stream, Result is a MessageContract
    return new Result {ResultBody = stream};
}

BusinessResponse オブジェクトは MemoryStream にシリアル化され、メソッドから返されます。クライアント側では、呼び出しコードは次のようになります。

var parameter = new Parameter();

// Call the service method
var methodResult = channel.TestMethod(parameter);

// protobuf-net deserializer reads from a stream received from a service.
// while reading is performed by protobuf-net, 
// on the service side WCF is actually reading from a 
// memory stream where serialized message is stored
var result = serializer.Deserialize<BusinessResponse>(methodResult.ResultBody);
return result;

したがって、serializer.Deserialize()が呼び出されると、 stream から読み取りますmethodResult.ResultBody。同時に、サービス側で WCF は、 から返された MemoryStream を読み取りTestMethodます。

問題:

MemoryStream私たちが達成したいことは、サービス側でオブジェクト全体の最初のシリアル化を一度に取り除くことです。ストリーミングを使用しているため、送信前にシリアル化されたオブジェクトをメモリに保持することは避けたいと考えています。

考え:

TestMethod()完全な解決策は、シリアル化されるオブジェクト (私の例では 'BusinessResponse' オブジェクト) への参照を使用して、空のカスタムメイドの Stream オブジェクト (から) を返すことです。したがって、WCF がRead()ストリームのメソッドを呼び出すときは、protobuf-net を使用してオブジェクトの一部を内部でシリアル化し、メモリに保存せずに呼び出し元に返します。

ここで問題が発生します。実際に必要なのは、ストリームが読み取られる瞬間にオブジェクトを 1 つずつシリアル化する可​​能性があるためです。これはシリアライゼーションのまったく異なる方法であることを理解しています。オブジェクトをシリアライザーにプッシュする代わりに、シリアライズされたコンテンツを少しずつ要求したいと思います。

そのようなシリアル化は、protobuf-net を使用して何とか可能ですか?

4

1 に答える 1

2

おそらくマークのゲートのアイデアに沿ったコードをいくつか作成しました。

public class PullStream : Stream
{
    private byte[] internalBuffer;
    private bool ended;
    private static ManualResetEvent dataAvailable = new ManualResetEvent(false);
    private static ManualResetEvent dataEmpty = new ManualResetEvent(true);

    public override bool CanRead
    {
        get { return true; }
    }

    public override bool CanSeek
    {
        get { return false; }
    }

    public override bool CanWrite
    {
        get { return true; }
    }

    public override void Flush()
    {
        throw new NotImplementedException();
    }

    public override long Length
    {
        get { throw new NotImplementedException(); }
    }

    public override long Position
    {
        get
        {
            throw new NotImplementedException();
        }
        set
        {
            throw new NotImplementedException();
        }
    }

    public override int Read(byte[] buffer, int offset, int count)
    {
        dataAvailable.WaitOne();
        if ( count >= internalBuffer.Length)
        {
            var retVal = internalBuffer.Length;
            Array.Copy(internalBuffer, buffer, retVal);
            internalBuffer = null;
            dataAvailable.Reset();
            dataEmpty.Set();
            return retVal;
        }
        else
        {
            Array.Copy(internalBuffer, buffer, count);
            internalBuffer = internalBuffer.Skip(count).ToArray(); // i know
            return count;
        }
    }

    public override long Seek(long offset, SeekOrigin origin)
    {
        throw new NotImplementedException();
    }

    public override void SetLength(long value)
    {
        throw new NotImplementedException();
    }

    public override void Write(byte[] buffer, int offset, int count)
    {
        dataEmpty.WaitOne();
        dataEmpty.Reset();

        internalBuffer = new byte[count];
        Array.Copy(buffer, internalBuffer, count);

        Debug.WriteLine("Writing some data");

        dataAvailable.Set();
    }

    public void End()
    {
        dataEmpty.WaitOne();
        dataEmpty.Reset();

        internalBuffer = new byte[0];

        Debug.WriteLine("Ending writes");

        dataAvailable.Set();
    }
}

これは、Read と Write (および End) のみを実装する単純なストリームの子孫クラスです。データが利用できない間は読み取りがブロックされ、データが利用可能な間は書き込みがブロックされます。この方法では、関与するバイト バッファは 1 つだけです。残りの linq コピーは最適化のために開かれています ;-) End メソッドが追加されているため、使用可能なデータがなく、データが書き込まれないときに Read が実行される場所でブロックが発生しません。

別のスレッドからこのストリームに書き込む必要があります。これを以下に示します。

    // create a large object
    var obj = new List<ToSerialize>();
    for(int i = 0; i <= 1000; i ++)
        obj.Add(new ToSerialize { Test = "This is my very loooong message" });
    // create my special stream to read from
    var ms = new PullStream();
    new Thread(x =>
    {
        ProtoBuf.Serializer.Serialize(ms, obj);
        ms.End();
    }).Start();
    var buffer = new byte[100];
    // stream to write back to (just to show deserialization is working too)
    var ws = new MemoryStream();
    int read;
    while ((read = ms.Read(buffer, 0, 100)) != 0)
    {
        ws.Write(buffer, 0, read);
        Debug.WriteLine("read some data");
    }
    ws.Position = 0;
    var back = ProtoBuf.Serializer.Deserialize<List<ToSerialize>>(ws);

これで問題が解決することを願っています :-) とにかく、これをコーディングするのは楽しかったです。

よろしく、ジャッコ

于 2013-03-13T21:52:21.390 に答える