Trevor

Reputation: 361

Write and read from a stream asynchronously

I want to write to and read from a stream asynchronously. I am using GCP's storage API, specifically StorageClient.DownloadObjectAsync. It takes a destination stream that it writes the downloaded bytes to. I want to pass those bytes to another API I am calling as they are written.

What I am struggling with is the reading, since it seems streams (MemoryStream in this case) do not support simultaneous reading and writing. Is there another option for this?

Here are the pertinent snippets:

Downloader

using (StorageClient sc = StorageClient.Create(cred))
{
    await sc.DownloadObjectAsync(
        "841f1487-d1b3-434d-8848-8a4bb0749b20",
        @"filename",
        stream);
}

Writer

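// Use a BinaryReader?
using BinaryReader br = new BinaryReader(stream);
int current = 0;
int total = 0;
do
{
    // Read returns 0 once no more bytes are available at the stream's current position.
    current = br.Read(buffer, 0, buffer.Length);
    if (current > 0)
    {
        // Forward only the bytes actually read; the last chunk is usually shorter than the buffer.
        fileStore.WriteFile(out int numberOfBytesWritten, directoryHandle, total, buffer.AsSpan(0, current).ToArray());
        total += current;
    }
} while (current > 0);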

Edit - adding additional info

When I try to read while the file is downloading, I only get 1% of the bytes before the BinaryReader stops reading even though the file is fully in memory (and while this file can fit in memory, not all files will be able to).

Upvotes: 2

Views: 1407

Answers (2)

Trevor

Reputation: 361

Finally found what I wanted. The answer is using the Pipe class.

It came down to three steps: create a new pipe (Pipe pipe = new Pipe()); modify the downloader to take a PipeWriter instead of a plain stream, so that I can call pw.CompleteAsync() when I'm done writing (the stream is still available through pw.AsStream()); and hand the reading side to the consumer as a stream with pipe.Reader.AsStream().
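For anyone who wants to see the shape of it, here is a minimal sketch of the Pipe approach described above. It is not my actual code: ProduceAsync is a hypothetical stand-in for the GCP download (in the real thing you would pass writer.AsStream() to DownloadObjectAsync), the sink stream stands in for the second API, and the names PipeDemo, ProduceAsync, ConsumeAsync and RunAsync are made up for illustration.

```csharp
using System;
using System.IO;
using System.IO.Pipelines;
using System.Threading.Tasks;

public static class PipeDemo
{
    // Hypothetical stand-in for the download: writes the payload to the
    // pipe's writer stream in chunks, then completes the writer.
    static async Task ProduceAsync(PipeWriter writer, byte[] payload)
    {
        try
        {
            // leaveOpen: true so the writer is completed explicitly below.
            Stream destination = writer.AsStream(leaveOpen: true);
            for (int offset = 0; offset < payload.Length; offset += 4096)
            {
                int count = Math.Min(4096, payload.Length - offset);
                await destination.WriteAsync(payload, offset, count);
            }
        }
        catch (Exception ex)
        {
            // Propagate the failure to the reader instead of leaving it waiting.
            await writer.CompleteAsync(ex);
            throw;
        }

        // Signals end-of-data; the reader stream then returns 0 from ReadAsync.
        await writer.CompleteAsync();
    }

    // Reads from the pipe as bytes arrive and forwards them to the sink.
    static async Task<long> ConsumeAsync(PipeReader reader, Stream sink)
    {
        using Stream source = reader.AsStream();
        byte[] buffer = new byte[8192];
        long total = 0;
        int read;
        while ((read = await source.ReadAsync(buffer, 0, buffer.Length)) > 0)
        {
            await sink.WriteAsync(buffer, 0, read);
            total += read;
        }
        return total;
    }

    // Runs producer and consumer concurrently over one pipe.
    public static async Task<long> RunAsync(byte[] payload, Stream sink)
    {
        var pipe = new Pipe();
        Task producer = ProduceAsync(pipe.Writer, payload);
        Task<long> consumer = ConsumeAsync(pipe.Reader, sink);
        await Task.WhenAll(producer, consumer);
        return await consumer;
    }
}
```

The pipe buffers only what has been written but not yet read, and pauses the writer when the reader falls behind, so the whole file never has to sit in memory at once.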

Upvotes: 3

Stephen Cleary

Reputation: 456322

Is there another option for this?

The code is currently downloading to a memory stream and then writing it out to disk. If that's all you need to do with it, then passing a file stream instead of a memory stream should be sufficient.

What I am struggling with is the reading, since it seems streams (MemoryStream in this case) do not support simultaneous reading and writing.

This is correct. No built-in stream types support simultaneous reads and writes. One reason is that there's a notion of a single "position" that is updated by both reads and writes.
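To illustrate the shared position for MemoryStream specifically, here is a small sketch (the PositionDemo class and Run method are hypothetical names used only for this example). After a write, the position sits at the end of the data, so a read on the same stream finds nothing until the position is moved back:

```csharp
using System;
using System.IO;

public static class PositionDemo
{
    public static (int readAtEnd, int readAfterRewind) Run()
    {
        using var stream = new MemoryStream();

        // Writing four bytes advances Position from 0 to 4.
        stream.Write(new byte[] { 1, 2, 3, 4 }, 0, 4);

        var buffer = new byte[4];

        // The read starts at Position 4, which is the end of the data,
        // so it returns 0 even though four bytes are in memory.
        int readAtEnd = stream.Read(buffer, 0, buffer.Length);

        // Rewinding makes the bytes readable, but a concurrent writer
        // would now write at the rewound position as well.
        stream.Position = 0;
        int readAfterRewind = stream.Read(buffer, 0, buffer.Length);

        return (readAtEnd, readAfterRewind);
    }
}
```

A reader that polls such a stream while something else is writing to it sees a return value of 0 whenever it catches up with the position, which looks exactly like end-of-stream.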

It should be possible to write a concurrent stream type. You'd need to handle concurrent access as well as having two positions rather than one, and some operations might not be supported for concurrent streams. I've thought about writing a type like this a few times, but didn't feel it was sufficiently useful in a world where Pipelines, Channels, Dataflow, and async streams already exist.

So, I'd say:

  1. If possible, just pass it a file stream and get rid of the memory stream completely.
  2. Otherwise, explore the GCP API and see if there's a non-stream-based solution you can use to download.
  3. If the GCP API must download to a stream and your code must do in-memory processing during the save, then you'll need to write a concurrent stream type.

Upvotes: 1
