First of all, I should mention that I have no internal Stream object available. Instead, I have this interface:
public interface IChannel
{
    void Send(byte[] data);
    event EventHandler<byte[]> Receive;
}
I want to implement a Stream class, one like this:
public class ChannelStream : Stream
{
    private readonly IChannel _channel;

    public ChannelStream(IChannel channel)
    {
        this._channel = channel;
    }

    // TODO: Implement Stream class
}
The functionality I require is very similar to NetworkStream:
Writing bytes to my stream should add these bytes to a buffer and call _channel.Send once Flush() is called.
The Stream will also listen to _channel.Receive events and add the bytes to another internal buffer until they are read from the stream. If the Stream doesn't have any data available, it should block until new data becomes available.
I am, however, struggling with the implementation. I have experimented with using two MemoryStreams internally, but this caused the buffers to keep consuming more and more RAM.
What kind of collection / stream can I use to implement my stream?
Upvotes: 1
Views: 3549
Consider what you need from the collection and go from there.
Here are a few questions you should consider when you need a collection of some sort:
Do you need random access to the items in the collection?
Is the collection going to be accessed by multiple threads?
Do you need to retain the data in the collection after it is read?
Is ordering important? If so, what order - add order, reverse add order, item ordering by some comparison?
For the output buffer in this case the answers are no, yes, no and yes (add order), which pretty much singles out the ConcurrentQueue<T> class. It lets you add objects from one or more sources that do not need to be on the same thread as the code that reads them back out. It doesn't let you arbitrarily index the collection (well, not directly anyway), which you don't appear to need.
I'd use the same type for the input buffer, with a 'current block' buffer to hold the most recently read buffer, wrapped in some simple object locking semantics to handle any threading issues.
The output section looks something like this:
// Output buffer
private readonly ConcurrentQueue<byte[]> _outputBuffer = new ConcurrentQueue<byte[]>();

public override void Write(byte[] buffer, int offset, int count)
{
    // Copy written data to new buffer and add to output queue
    byte[] data = new byte[count];
    Buffer.BlockCopy(buffer, offset, data, 0, count);
    _outputBuffer.Enqueue(data);
}

public override void Flush()
{
    // pull everything out of the queue and send to wherever it is going
    byte[] curr;
    while (_outputBuffer.TryDequeue(out curr))
        internalSendData(curr);
}
The internalSendData method is where the data would then go out to the network.
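As a rough sketch of one way to fill that in, the helper below drains the queue on Flush, merges the pending blocks, and hands them to the transport in a single call, so the other end sees one Send per Flush rather than one per Write. The class name CoalescingOutputBuffer and the Action<byte[]> delegate are my own assumptions for illustration; in the ChannelStream from the question you would presumably pass _channel.Send as the delegate.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

// Hypothetical helper (not part of the original answer): queues written
// blocks and sends them as one merged array when Flush is called.
public sealed class CoalescingOutputBuffer
{
    private readonly ConcurrentQueue<byte[]> _outputBuffer = new ConcurrentQueue<byte[]>();
    private readonly Action<byte[]> _send; // assumed to be e.g. _channel.Send

    public CoalescingOutputBuffer(Action<byte[]> send)
    {
        if (send == null) throw new ArgumentNullException(nameof(send));
        _send = send;
    }

    public void Write(byte[] buffer, int offset, int count)
    {
        if (count <= 0) return; // nothing to queue

        // Copy, because the caller is free to reuse its array after Write returns
        byte[] data = new byte[count];
        Buffer.BlockCopy(buffer, offset, data, 0, count);
        _outputBuffer.Enqueue(data);
    }

    public void Flush()
    {
        // Drain whatever is queued right now
        var blocks = new List<byte[]>();
        int total = 0;
        byte[] curr;
        while (_outputBuffer.TryDequeue(out curr))
        {
            blocks.Add(curr);
            total += curr.Length;
        }
        if (total == 0) return;

        // Merge the blocks in write order and send them in one call
        byte[] merged = new byte[total];
        int pos = 0;
        foreach (byte[] block in blocks)
        {
            Buffer.BlockCopy(block, 0, merged, pos, block.Length);
            pos += block.Length;
        }
        _send(merged);
    }
}
```

Whether merging is worth the extra copy depends on the channel: if each Send carries per-message overhead it probably is, otherwise sending block by block as in the answer is simpler.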
The read buffering is a little more complex:
// collection to hold unread input data
private readonly ConcurrentQueue<byte[]> _inputBuffer = new ConcurrentQueue<byte[]>();
// current data block being read from
private byte[] _inputCurrent = null;
// read offset in current block
private int _inputPos = 0;
// object for locking access to the above.
private readonly object _inputLock = new object();

public override int Read(byte[] buffer, int offset, int count)
{
    int readCount = 0;
    lock (_inputLock)
    {
        while (count > 0)
        {
            if (_inputCurrent == null || _inputCurrent.Length <= _inputPos)
            {
                // read next block from input buffer
                if (!_inputBuffer.TryDequeue(out _inputCurrent))
                    break;
                _inputPos = 0;
            }

            // copy bytes to destination
            int nBytes = Math.Min(count, _inputCurrent.Length - _inputPos);
            Buffer.BlockCopy(_inputCurrent, _inputPos, buffer, offset, nBytes);

            // adjust all the offsets and counters
            readCount += nBytes;
            offset += nBytes;
            count -= nBytes;
            _inputPos += nBytes;
        }
    }
    return readCount;
}
Hopefully that makes sense.
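Note that the Read above returns 0 when the queue is empty, which most Stream consumers treat as end of stream, and it doesn't show how the Receive event feeds the queue. Here is a minimal sketch of one way to cover both, assuming you swap the ConcurrentQueue for a BlockingCollection<byte[]> (which wraps a ConcurrentQueue by default, so add order is preserved). The class name BlockingInputBuffer and the OnReceive handler are hypothetical; in the constructor of ChannelStream you would presumably hook it up with _channel.Receive += OnReceive. There is no shutdown handling here, so a pending Read waits until data arrives.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical helper (not part of the original answer): buffers received
// blocks and blocks the reader until at least one byte is available.
public sealed class BlockingInputBuffer
{
    // BlockingCollection uses a ConcurrentQueue internally by default (FIFO)
    private readonly BlockingCollection<byte[]> _inputBuffer = new BlockingCollection<byte[]>();
    private byte[] _inputCurrent;   // current data block being read from
    private int _inputPos;          // read offset in current block
    private readonly object _inputLock = new object();

    // Matches EventHandler<byte[]>, so it can be attached to IChannel.Receive.
    // Assumes the channel does not reuse the array after raising the event.
    public void OnReceive(object sender, byte[] data)
    {
        if (data != null && data.Length > 0)
            _inputBuffer.Add(data);
    }

    public int Read(byte[] buffer, int offset, int count)
    {
        int readCount = 0;
        lock (_inputLock)
        {
            while (count > 0)
            {
                if (_inputCurrent == null || _inputPos >= _inputCurrent.Length)
                {
                    if (readCount == 0)
                    {
                        // Nothing copied yet: wait for the next block
                        _inputCurrent = _inputBuffer.Take();
                    }
                    else if (!_inputBuffer.TryTake(out _inputCurrent))
                    {
                        // Already have some data: return it rather than wait
                        break;
                    }
                    _inputPos = 0;
                }

                // copy bytes to destination
                int nBytes = Math.Min(count, _inputCurrent.Length - _inputPos);
                Buffer.BlockCopy(_inputCurrent, _inputPos, buffer, offset, nBytes);

                // adjust all the offsets and counters
                readCount += nBytes;
                offset += nBytes;
                count -= nBytes;
                _inputPos += nBytes;
            }
        }
        return readCount;
    }
}
```

Like NetworkStream, this returns as soon as it has at least one byte instead of waiting to fill the whole buffer, so callers that need an exact number of bytes have to loop.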
Using queues for this sort of buffering means that the data is only held in memory for as long as it is waiting to be sent or read. Once you call Flush, the output buffer's memory is released for garbage collection, so you don't have to worry about memory blowouts unless you are trying to send a lot faster than the actual transport mechanism can handle. But if you're queuing up several megabytes of data every second to go out over an ADSL connection, nothing is going to save you :P
I'd add a few refinements to the above, like some checks to make sure that Flush gets called automatically once the buffer is at a reasonable level.
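One possible shape for that automatic flush, as a sketch only: keep a running count of queued bytes and call Flush from Write once it crosses a threshold. The class name AutoFlushOutputBuffer, the flushThreshold parameter and the Action<byte[]> delegate (standing in for internalSendData or _channel.Send) are assumptions of mine, and the right threshold depends entirely on your transport.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical helper (not part of the original answer): flushes on its own
// once the number of queued bytes reaches a configurable threshold.
public sealed class AutoFlushOutputBuffer
{
    private readonly ConcurrentQueue<byte[]> _outputBuffer = new ConcurrentQueue<byte[]>();
    private readonly Action<byte[]> _send;   // assumed to be e.g. _channel.Send
    private readonly int _flushThreshold;    // queued bytes that trigger a flush
    private int _pendingBytes;               // bytes currently waiting in the queue

    public AutoFlushOutputBuffer(Action<byte[]> send, int flushThreshold)
    {
        if (send == null) throw new ArgumentNullException(nameof(send));
        if (flushThreshold <= 0) throw new ArgumentOutOfRangeException(nameof(flushThreshold));
        _send = send;
        _flushThreshold = flushThreshold;
    }

    public void Write(byte[] buffer, int offset, int count)
    {
        if (count <= 0) return; // nothing to queue

        byte[] data = new byte[count];
        Buffer.BlockCopy(buffer, offset, data, 0, count);
        _outputBuffer.Enqueue(data);

        // Interlocked keeps the counter consistent across writer threads
        if (Interlocked.Add(ref _pendingBytes, count) >= _flushThreshold)
            Flush();
    }

    public void Flush()
    {
        byte[] curr;
        while (_outputBuffer.TryDequeue(out curr))
        {
            Interlocked.Add(ref _pendingBytes, -curr.Length);
            _send(curr);
        }
    }
}
```

If several threads can end up in Flush at the same time, blocks may reach the transport out of order; wrapping the body of Flush in a lock would be one way to prevent that.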
Upvotes: 3