Reputation: 3682
I have a long-running application that basically:
A very common use-case indeed - except both the data size and data rate can be quite large. To avoid overflow of the memory and improve efficiency, I am thinking of a dual buffer design, where buffer A and B alternate: while A is holding networking packet, B is processed for output. Once buffer A reaches a soft bound, A is due for output processing, and B will be used for holding network packets.
I am not particularly experienced on concurrency/multi-thread program paradigm. I have read some past discussion on circular buffer that handle multiple-producer and multiple consumer case. I am not sure if that is the best solution and It seems the dual buffer design is simpler.
My question is: is there a design pattern I can follow to tackle the problem? or better design for that matter? If possible, please use pseudo code to help to illustrate the solution. Thanks.
Upvotes: 0
Views: 623
Reputation: 24907
Find a producer-consumer queue class that works. Use one to create a buffer pool to improve performance and control memory use. Use another to transfer the buffers from the network thread to the disk thread:
#define CnumBuffs 128
#define CbufSize 8182
#define CcacheLineSize 128
public class netBuf{
private char cacheLineFiller[CcacheLineSize]; // anti false-sharing space
public int dataLen;
public char bigBuf[CbufSize];
};
PCqueue pool;
PCqueue diskQueue;
netThread Thread;
diskThread Thread;
pool=new(PCqueue);
diskQueue=new(PCqueue);
// make an object pool
for(i=0;i<CnumBuffs,i++){
pool->push(new(netBuf));
};
netThread=new(netThread);
diskThread=new(diskThread);
netThread->start();
diskThread->start();
..
void* netThread.run{
netbuf *thisBuf;
for(;;){
pool->pop(&thisBuf}; // blocks if pool empty
netBuf->datalen=network.read(&thisBuf.bigBuf,sizeof(thisBuf.bigBuf));
diskQueue->push(thisBuf);
};
};
void* diskThread.run{
fileStream *myFile;
diskBuf *thisBuf;
new myFile("someFolder\fileSpec",someEnumWrite);
for(;;){
diskQueue->pop(&thisBuf}; // blocks until buffer available
myFile.write(&thisBuf.bigBuf,thisBuf.dataLen);
pool->push(thisBuf};
};
};
Upvotes: 0
Reputation: 175
I suggest that you should, instead of assuming "two" (or any fixed number of ...) buffers, simply use a queue, and therefore a "producer/consumer" relationship.
The process that is receiving packets simply adds them to a buffer of some certain size, and, either when the buffer is sufficiently full or a specified (short...) time interval has elapsed, places the (non-empty) buffer onto a queue for processing by the other. It then allocates a new buffer for its own use.
The receiving ("other...") process is woken up any time there might be a new buffer in the queue for it to process. It removes the buffer, processes it, then checks the queue again. It goes to sleep only when it finds that the queue is empty. (Use care to be sure that the process cannot decide to go to sleep at the precise instant that the other process decides to signal it... there must be no "race condition" here.)
Consider simply allocating storage "per-message" (whatever a "message" may mean to you), and putting that "message" onto the queue, so that there is no unnecessary delay in processing caused by "waiting for a buffer to fill up."
Upvotes: 2
Reputation: 18717
It might be worth mentioning a technique used in real-time audio processing/recording, which uses a single ring buffer (or fifo if you prefer that term) of sufficient size can be used for this case.
You will need then a read and write cursor. (Whether you actually need a lock or can do with volatile plus memory barriers is a touchy subject, but the people at portaudio suggest you do this without locks if performance is important.)
You can use one thread to read and another thread to write. The read thread should consume as much of the buffer as possible. You will be safe unless you run out of buffer space, but that exists for the dual-buffer solution as well. So the underlying assumption is that you can write to disk faster then the input comes in, or you will need to expand on the solution.
Upvotes: 1