David S.

Reputation: 6105

How to use one StreamWriter to write to multiple underlying streams?

I am writing text to a System.IO.StreamWriter.

The underlying stream (passed in as new StreamWriter(underlyingStream)) writes to a remote file. (I don't think its type is relevant, but for completeness I'll mention it's a Microsoft Azure CloudBlobStream.)

Now I want to extend this by also writing a compressed file with the same content, by placing a GZipOutputStream (compressedUnderlyingStream) between the StreamWriter and a second CloudBlobStream.

I was looking for a way to specify both CloudBlobStreams as underlying streams of the StreamWriter, but I could not find one. Is there a stream type that can combine two underlying streams? If not, how else can I approach this? Note that I want everything to stay as streams, to minimize the amount of data held in memory.

// Disregard the lack of using statements for this example's sake.
CloudBlobStream blobStream = blob.OpenWrite();
CloudBlobStream compressedBlobStream = compressedBlob.OpenWrite();
GZipOutputStream compressorStream = new GZipOutputStream(compressedBlobStream);
// I make the StreamWriter here for the regular blob,
// but how could I make the same StreamWriter also write to the compressedBlob at the same time?
TextWriter streamWriter = new StreamWriter(blobStream, Encoding.UTF8);

Upvotes: 3

Views: 4245

Answers (2)

angularsen

Reputation: 8668

MultiWriteStream takes a primary stream, and one or more secondary streams.

Writes and seeks go to all streams, but reads, Length, and the Position getter go to the primary stream only.

This helped us solve response logging in ASP.NET Core, by writing to both the response body (primary) and a temporary MemoryStream leased from RecyclableMemoryStreamManager (secondary), since the response body stream cannot be read out of the box.

#nullable enable
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace MyNamespace;

/// <summary>
///     Write to multiple streams by forwarding writes, reads and seeks.<br />
///     <br />
///     Reads go to the primary stream.<br />
///     Writes and seeks go to all the streams.<br />
///     <see cref="Stream.CanWrite" /> must be true for all streams.<br />
///     <see cref="Stream.CanSeek" /> must be true for secondary streams, if the primary stream is seekable.<br />
/// </summary>
/// <remarks>
///     Legacy async methods are not implemented, such as <see cref="Stream.BeginWrite"/> and <see cref="Stream.EndWrite"/>.
/// </remarks>
public sealed class MultiWriteStream : Stream
{
    private readonly Stream _primaryStream;
    private readonly List<Stream> _secondaryStreams;
    private readonly List<Stream> _streams;

    public MultiWriteStream(Stream primaryStream, Stream[] secondaryStreams)
    {
        _primaryStream = primaryStream ?? throw new ArgumentNullException(nameof(primaryStream));
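        if (secondaryStreams is null) throw new ArgumentNullException(nameof(secondaryStreams));
        if (secondaryStreams.Length == 0) throw new ArgumentException("At least one secondary stream is required.", nameof(secondaryStreams));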

        if (!primaryStream.CanWrite) throw new ArgumentException("Primary stream must be writable.", nameof(primaryStream));
        if (secondaryStreams.Any(ss => !ss.CanWrite)) throw new ArgumentException("Secondary streams must be writable.", nameof(secondaryStreams));

        if (primaryStream.CanSeek && secondaryStreams.Any(ss => !ss.CanSeek))
            throw new ArgumentException("The primary stream is seekable, but at least one of the secondary streams is not.", nameof(secondaryStreams));

        _streams = [primaryStream, ..secondaryStreams];
        _secondaryStreams = secondaryStreams.ToList();

        CanSeek = primaryStream.CanSeek;
        CanRead = primaryStream.CanRead;
        CanWrite = primaryStream.CanWrite;
    }

    public override bool CanRead { get; }
    public override bool CanSeek { get; }
    public override bool CanWrite { get; }

    public override long Length => _primaryStream.Length;

    public override long Position
    {
        get => _primaryStream.Position;
        set { foreach (Stream stream in _streams) stream.Position = value; }
    }

    public override bool CanTimeout => _primaryStream.CanTimeout;
    public override int ReadTimeout => _primaryStream.ReadTimeout;
    public override int WriteTimeout => _primaryStream.WriteTimeout;

    public override void CopyTo(Stream destination, int bufferSize)
    {
        _primaryStream.CopyTo(destination, bufferSize);
    }

    public override Task CopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken)
    {
        return _primaryStream.CopyToAsync(destination, bufferSize, cancellationToken);
    }

    public override void Close()
    {
        foreach (Stream stream in _streams)
            stream.Close();

        base.Close();
    }

    protected override void Dispose(bool disposing)
    {
        if (disposing)
        {
            foreach (Stream stream in _streams)
                stream.Dispose();
        }

        base.Dispose(disposing);
    }

    public override async ValueTask DisposeAsync()
    {
        foreach (Stream stream in _streams)
            await stream.DisposeAsync().ConfigureAwait(true);

        await base.DisposeAsync().ConfigureAwait(false);
    }

    public override void Flush()
    {
        foreach (Stream stream in _streams) stream.Flush();
    }

    public override async Task FlushAsync(CancellationToken cancellationToken)
    {
        foreach (Stream stream in _streams)
            await stream.FlushAsync(cancellationToken).ConfigureAwait(true);
    }

    public override int Read(byte[] buffer, int offset, int count)
    {
        return _primaryStream.Read(buffer, offset, count);
    }

    public override int Read(Span<byte> buffer)
    {
        return _primaryStream.Read(buffer);
    }

    public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
    {
        return _primaryStream.ReadAsync(buffer, offset, count, cancellationToken);
    }

    public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = new CancellationToken())
    {
        return _primaryStream.ReadAsync(buffer, cancellationToken);
    }

    public override int ReadByte()
    {
        return _primaryStream.ReadByte();
    }


    public override long Seek(long offset, SeekOrigin origin)
    {
        foreach (Stream stream in _secondaryStreams)
            stream.Seek(offset, origin);

        return _primaryStream.Seek(offset, origin);
    }

    public override void SetLength(long value)
    {
        foreach (Stream stream in _streams)
            stream.SetLength(value);
    }

    public override void Write(byte[] buffer, int offset, int count)
    {
        foreach (Stream stream in _streams)
            stream.Write(buffer, offset, count);
    }

    public override void Write(ReadOnlySpan<byte> buffer)
    {
        foreach (Stream stream in _streams)
            stream.Write(buffer);
    }

    public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
    {
        foreach (Stream stream in _streams)
            await stream.WriteAsync(buffer, offset, count, cancellationToken).ConfigureAwait(true);
    }

    public override async ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = new())
    {
        foreach (Stream stream in _streams)
            await stream.WriteAsync(buffer, cancellationToken).ConfigureAwait(true);
    }

    public override void WriteByte(byte value)
    {
        foreach (Stream stream in _streams)
            stream.WriteByte(value);
    }
}
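
One thing worth checking before applying this class to the scenario in the question: the constructor rejects a seekable primary stream paired with a non-seekable secondary, and compression streams are typically non-seekable. The sketch below mirrors that constructor check using stand-ins, which are assumptions here: MemoryStream in place of CloudBlobStream, and System.IO.Compression.GZipStream in place of GZipOutputStream. SeekabilityCheck and WouldAccept are hypothetical names introduced only for illustration.

```csharp
using System;
using System.IO;
using System.IO.Compression;

public static class SeekabilityCheck
{
    // Mirrors the constructor rule: a seekable primary requires seekable secondaries.
    public static bool WouldAccept(Stream primary, Stream secondary)
        => !primary.CanSeek || secondary.CanSeek;

    public static void Demo()
    {
        // Stand-ins (assumptions): MemoryStream for the blob stream,
        // GZipStream for the compressing stream.
        using var plain = new MemoryStream();
        using var gzip = new GZipStream(new MemoryStream(), CompressionMode.Compress);

        Console.WriteLine(WouldAccept(plain, gzip)); // False: seekable primary, non-seekable secondary
        Console.WriteLine(WouldAccept(gzip, plain)); // True: a non-seekable primary skips the check
    }
}
```

If the check fails for your streams, one option is to make the compressing stream the primary. The trade-off is that Length and Position are then forwarded to a stream that does not support them.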

Upvotes: 0

PC Luddite

Reputation: 6088

I can't think of one off the top of my head, but it would be trivial to write your own:

class MultiStream : Stream
{
    private List<Stream> streams;

    public MultiStream(IEnumerable<Stream> streams)
    {
        this.streams = new List<Stream>(streams);
    }

    ...

    public override void Write(byte[] buffer, int offset, int count)
    {
        foreach(Stream stream in streams)
            stream.Write(buffer, offset, count);
    }

    ...
}

Edit: Thank you to tenbits who has completed the implementation of the above class. Because it is neither my work nor something I have personally tested, I have only provided a link and will leave the original answer unchanged.
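
For readers who want something runnable without following the link, here is a minimal write-only sketch of how the elided members might be filled in. It is not tenbits' implementation. It assumes the stream only ever needs to be written to, so reads and seeks throw NotSupportedException. MultiStreamDemo and WriteBoth are hypothetical names, and MemoryStream and System.IO.Compression.GZipStream stand in for the question's CloudBlobStream and GZipOutputStream.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.IO.Compression;
using System.Text;

// Write-only stream that forwards every write to all inner streams.
public sealed class MultiStream : Stream
{
    private readonly List<Stream> _streams;

    public MultiStream(IEnumerable<Stream> streams)
    {
        if (streams == null) throw new ArgumentNullException(nameof(streams));
        _streams = new List<Stream>(streams);
    }

    public override bool CanRead => false;
    public override bool CanSeek => false;
    public override bool CanWrite => true;
    public override long Length => throw new NotSupportedException();

    public override long Position
    {
        get => throw new NotSupportedException();
        set => throw new NotSupportedException();
    }

    public override void Write(byte[] buffer, int offset, int count)
    {
        foreach (Stream stream in _streams)
            stream.Write(buffer, offset, count);
    }

    public override void Flush()
    {
        foreach (Stream stream in _streams)
            stream.Flush();
    }

    public override int Read(byte[] buffer, int offset, int count) => throw new NotSupportedException();
    public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
    public override void SetLength(long value) => throw new NotSupportedException();

    // Disposing the MultiStream disposes every inner stream.
    protected override void Dispose(bool disposing)
    {
        if (disposing)
        {
            foreach (Stream stream in _streams)
                stream.Dispose();
        }

        base.Dispose(disposing);
    }
}

public static class MultiStreamDemo
{
    // Writes the text once and returns the plain bytes and the gzip-compressed bytes.
    public static (byte[] Plain, byte[] Compressed) WriteBoth(string text)
    {
        var plain = new MemoryStream();
        var compressedTarget = new MemoryStream();
        var gzip = new GZipStream(compressedTarget, CompressionMode.Compress);

        // UTF8Encoding(false) omits the byte order mark; Encoding.UTF8, as used in
        // the question, would additionally write one at the start of each stream.
        using (var writer = new StreamWriter(new MultiStream(new Stream[] { plain, gzip }), new UTF8Encoding(false)))
        {
            writer.Write(text);
        }

        // Disposing the writer disposed the MultiStream and, through it, both inner
        // streams, which also finalized the gzip data. ToArray still works afterwards.
        return (plain.ToArray(), compressedTarget.ToArray());
    }
}
```

The StreamWriter sees a single stream, so nothing is buffered beyond what the writer and the compressor already hold. Note that the compressing stream has to be disposed (here via the writer) before the compressed output is complete.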

Upvotes: 6
