kcnygaard

Reputation: 814

protobuf-net: Serialize a property of type System.IO.Stream without loading the entire stream into memory

protobuf-net cannot serialize the following class because serializing objects of type Stream is not supported:

[ProtoContract]
class StreamObject
{
    [ProtoMember(1)]
    public Stream StreamProperty { get; set; }
}

I know I can work around this by using a serialized property of type byte[] and reading the stream into that property, as in this question. But that requires the entire byte[] to be loaded into memory, which, if the stream is long, can quickly exhaust system resources.

Is there a way to serialize a stream as an array of bytes in protobuf-net without loading the entire sequence of bytes into memory?

Upvotes: 0

Views: 1394

Answers (1)

dbc

Reputation: 116991

The basic difficulty here isn't protobuf-net, it's the V2 protocol buffer format. There are two ways a repeated element (e.g. a byte array or stream) can be encoded:

  • As a packed repeated element. Here all of the elements of the field are packed into a single key-value pair with wire type 2 (length-delimited). Each element is encoded the same way it would be normally, except without a tag preceding it.

    protobuf-net automatically encodes byte arrays in this format; however, doing so requires knowing the total number of bytes in advance. For a byte stream, this might require loading the entire stream into memory (e.g. when StreamProperty.CanSeek == false), which violates your requirements.

  • As a repeated element. Here the encoded message has zero or more key-value pairs with the same tag number.

    For a byte stream, using this format would cause massive bloat in the encoded message, as each byte would require an additional integer key.
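To put rough numbers on the difference between the two encodings, here is a minimal sketch of the size arithmetic. WireSizeSketch and its methods are hypothetical helpers written for illustration, not part of protobuf-net, and the sketch assumes each byte of a non-packed field is written as a varint preceded by its own key.

```csharp
using System;

// Hypothetical helper, not part of protobuf-net: estimates encoded sizes
// from the protocol buffer wire format rules.
public static class WireSizeSketch
{
    // Number of bytes a base-128 varint needs for the given value.
    public static int VarintSize(ulong value)
    {
        int size = 1;
        while (value >= 0x80) { value >>= 7; size++; }
        return size;
    }

    // A field key is (fieldNumber << 3) | wireType, written as a varint.
    // The wire type only occupies the low 3 bits, so it does not change the size.
    public static int KeySize(int fieldNumber)
    {
        return VarintSize((ulong)fieldNumber << 3);
    }

    // One length-delimited field: key + length prefix + raw bytes.
    public static long LengthDelimitedSize(int fieldNumber, long byteCount)
    {
        return KeySize(fieldNumber) + VarintSize((ulong)byteCount) + byteCount;
    }

    // Non-packed repeated field: every element carries its own key, and each
    // byte value is written as a varint (an assumption of this sketch).
    public static long UnpackedRepeatedSize(int fieldNumber, byte[] data)
    {
        long total = 0;
        foreach (byte b in data)
            total += KeySize(fieldNumber) + VarintSize(b);
        return total;
    }
}
```

Under these assumptions, the four bytes { 1, 200, 127, 128 } in field 1 take 6 bytes as a single length-delimited field and 10 bytes as a non-packed repeated field, so the per-element key roughly doubles the size or worse.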

As you can see, neither default representation meets your needs. Instead, it makes sense to encode a large byte stream as a sequence of "fairly large" chunks, where each chunk is packed, but the overall sequence is not.
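The chunking idea on its own, independent of protobuf-net, can be sketched as follows. ChunkingSketch.ReadChunks is a hypothetical helper name used only for illustration. It reads lazily, so at most one chunk is held by the enumerator at a time, and it treats a short read as "keep going" rather than "end of stream".

```csharp
using System;
using System.Collections.Generic;
using System.IO;

// Hypothetical helper for illustration; not part of protobuf-net.
public static class ChunkingSketch
{
    // Lazily yields the remaining contents of the stream as arrays of at most
    // chunkSize bytes. Because this is an iterator, nothing (including the
    // argument checks) runs until the result is enumerated.
    public static IEnumerable<byte[]> ReadChunks(Stream stream, int chunkSize)
    {
        if (stream == null)
            throw new ArgumentNullException(nameof(stream));
        if (chunkSize <= 0)
            throw new ArgumentOutOfRangeException(nameof(chunkSize));

        while (true)
        {
            var buffer = new byte[chunkSize];
            int read = stream.Read(buffer, 0, buffer.Length);
            if (read <= 0)
                yield break;                     // end of stream
            if (read < buffer.Length)
                Array.Resize(ref buffer, read);  // short read: trim, then keep reading
            yield return buffer;
        }
    }
}
```

For a 10000-byte MemoryStream and a chunk size of 4096, this yields chunks of 4096, 4096 and 1808 bytes. The Data property in this answer uses a similar loop and wraps each chunk in a ByteBuffer.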

The following version of StreamObject does this:

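// Namespaces used by the code in this answer.
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using ProtoBuf;

[ProtoContract]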
class StreamObject
{
    public StreamObject() : this(new MemoryStream()) { }

    public StreamObject(Stream stream)
    {
        if (stream == null)
            throw new ArgumentNullException();
        this.StreamProperty = stream;
    }

    [ProtoIgnore]
    public Stream StreamProperty { get; set; }

    internal static event EventHandler OnDataReadBegin;

    internal static event EventHandler OnDataReadEnd;

    const int ChunkSize = 4096;

    [ProtoMember(1, IsPacked = false, OverwriteList = true)]
    IEnumerable<ByteBuffer> Data
    {
        get
        {
            if (OnDataReadBegin != null)
                OnDataReadBegin(this, new EventArgs());

            while (true)
            {
                byte[] buffer = new byte[ChunkSize];
                int read = StreamProperty.Read(buffer, 0, buffer.Length);
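                if (read <= 0)
                {
                    // Only a read of zero bytes signals the end of the stream.
                    break;
                }
                if (read < buffer.Length)
                {
                    // A short read does not imply end of stream (e.g. for network
                    // streams), so trim the chunk and keep reading.
                    Array.Resize(ref buffer, read);
                }
                yield return new ByteBuffer { Data = buffer };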
            }

            if (OnDataReadEnd != null)
                OnDataReadEnd(this, new EventArgs());
        }
        set
        {
            if (value == null)
                return;
            foreach (var buffer in value)
                StreamProperty.Write(buffer.Data, 0, buffer.Data.Length);
        }
    }
}

[ProtoContract]
struct ByteBuffer
{
    [ProtoMember(1, IsPacked = true)]
    public byte[] Data { get; set; }
}

Notice the OnDataReadBegin and OnDataReadEnd events? I added them for debugging purposes, to make it possible to check that the input stream is actually getting streamed into the output protobuf stream. The following test class does this:

internal class TestClass
{
    public void Test()
    {
        var writeStream = new MemoryStream();

        long beginLength = 0;
        long endLength = 0;

        EventHandler begin = (o, e) => { beginLength = writeStream.Length; Console.WriteLine(string.Format("Begin serialization of Data, writeStream.Length = {0}", writeStream.Length)); };
        EventHandler end = (o, e) => { endLength = writeStream.Length;  Console.WriteLine(string.Format("End serialization of Data, writeStream.Length = {0}", writeStream.Length)); };

        StreamObject.OnDataReadBegin += begin;
        StreamObject.OnDataReadEnd += end;

        try
        {
            int length = 1000000;

            var inputStream = new MemoryStream();
            for (int i = 0; i < length; i++)
            {
                inputStream.WriteByte(unchecked((byte)i));
            }
            inputStream.Position = 0;

            var streamObject = new StreamObject(inputStream);

            Serializer.Serialize(writeStream, streamObject);
            var data = writeStream.ToArray();

            StreamObject newStreamObject;
            using (var s = new MemoryStream(data))
            {
                newStreamObject = Serializer.Deserialize<StreamObject>(s);
            }

            if (beginLength >= endLength)
            {
                throw new InvalidOperationException("inputStream was completely buffered before writing to writeStream");
            }

            inputStream.Position = 0;
            newStreamObject.StreamProperty.Position = 0;

            if (!inputStream.AsEnumerable().SequenceEqual(newStreamObject.StreamProperty.AsEnumerable()))
            {
                throw new InvalidOperationException("!inputStream.AsEnumerable().SequenceEqual(newStreamObject.StreamProperty.AsEnumerable())");
            }
            else
            {
                Console.WriteLine("Streams identical.");
            }
        }
        finally
        {
            StreamObject.OnDataReadBegin -= begin;
            StreamObject.OnDataReadEnd -= end;
        }
    }
}

public static class StreamExtensions
{
    public static IEnumerable<byte> AsEnumerable(this Stream stream)
    {
        if (stream == null)
            throw new ArgumentNullException();
        int b;
        while ((b = stream.ReadByte()) != -1)
            yield return checked((byte)b);
    }
}

And the output of the above is:

Begin serialization of Data, writeStream.Length = 0
End serialization of Data, writeStream.Length = 1000888
Streams identical.

This indicates that the input stream is indeed streamed to the output without being fully loaded into memory at once: writeStream grew from 0 to 1000888 bytes while Data was still being enumerated.
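As a sanity check on the reported length, here is a small sketch of the expected per-chunk overhead. It rests on two assumptions that I have not verified against protobuf-net's source: each ByteBuffer is written as a nested length-delimited message in field 1 with its byte[] also in field 1, and only the full 4096-byte chunks had been flushed to writeStream when the end event fired. ChunkOverheadSketch is a hypothetical helper, not library code.

```csharp
using System;

// Hypothetical helper for illustration; not part of protobuf-net.
public static class ChunkOverheadSketch
{
    // Number of bytes a base-128 varint needs for the given value.
    public static int VarintSize(ulong value)
    {
        int size = 1;
        while (value >= 0x80) { value >>= 7; size++; }
        return size;
    }

    // Assumed layout of one chunk, with both field numbers below 16 so that
    // each key is a single byte:
    //   outer key + outer length + (inner key + inner length + payload)
    public static long EncodedChunkSize(long payloadBytes)
    {
        long inner = 1 + VarintSize((ulong)payloadBytes) + payloadBytes;
        return 1 + VarintSize((ulong)inner) + inner;
    }

    // Estimated size of a whole stream split into chunkSize pieces,
    // including a trailing partial chunk if there is one.
    public static long EncodedStreamSize(long totalBytes, int chunkSize)
    {
        long fullChunks = totalBytes / chunkSize;
        long remainder = totalBytes % chunkSize;
        long size = fullChunks * EncodedChunkSize(chunkSize);
        if (remainder > 0)
            size += EncodedChunkSize(remainder);
        return size;
    }
}
```

With these assumptions a full chunk costs 4102 bytes, and the 244 full chunks in a 1000000-byte stream come to 244 * 4102 = 1000888 bytes, which matches the length printed above. The estimate for the complete message, including the final 576-byte chunk, is 1001470 bytes, so the overhead of chunking is on the order of 0.15%.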

Prototype fiddle.

Is there a mechanism available to write out a packed repeated element incrementally with bytes from a stream, knowing the length in advance?

It appears not. Assuming you have a stream for which CanSeek == true, you could encapsulate it in an IList<byte> that enumerates through the bytes in the stream, provides random access to bytes in the stream, and returns the stream length in IList.Count. There is a sample fiddle here showing such an attempt.

Unfortunately, however, ListDecorator.Write() simply enumerates the list and buffers its encoded contents before writing them to the output stream, which causes the input stream to be loaded completely into memory. I think this happens because protobuf-net encodes a List<byte> differently from a byte[], namely as a length-delimited sequence of Base 128 Varints. Since the Varint representation of a byte sometimes requires more than one byte, the length cannot be computed in advance from the list count. See this answer for additional details on the difference in how byte arrays and lists are encoded.

It should be possible to implement encoding of an IList<byte> in the same way as a byte[]; it just isn't currently available.
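To illustrate why the count alone is not enough, here is a minimal sketch of the payload length of a list of bytes written as packed varints. PackedVarintSketch is a hypothetical name used for illustration and does not reproduce protobuf-net's actual code path.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper for illustration; not part of protobuf-net.
public static class PackedVarintSketch
{
    // Payload length when each byte is written as a base-128 varint:
    // values below 128 fit in one byte, values from 128 to 255 need two.
    public static int PackedVarintPayloadSize(IEnumerable<byte> values)
    {
        if (values == null)
            throw new ArgumentNullException(nameof(values));
        int size = 0;
        foreach (byte b in values)
            size += b < 0x80 ? 1 : 2;
        return size;
    }
}
```

Two lists with the same Count give different lengths here: { 1, 2, 3 } needs 3 payload bytes while { 200, 201, 202 } needs 6, so a writer using this representation has to inspect every element before it can emit the length prefix.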

Upvotes: 2
