Reputation: 4529
I'm using BufferedWriter with the default size of 8192 characters to write lines to a local file. The lines are read from a socket InputStream using BufferedReader's readLine method (blocking I/O).
The average line length is 50 characters. It all works well and fast enough (over 1 million lines per second). However, if the client stops writing, the lines currently stored in the BufferedWriter buffer won't be flushed to disk. In fact, the buffered characters won't be flushed until the client resumes writing or the connection is closed. This introduces a delay between the time a line is transmitted by the client and the time it is committed to the file, so long-tail latency goes up.
Is there a way to flush an incomplete BufferedWriter buffer on a timeout, e.g. within 100 milliseconds?
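The setup described above can be sketched roughly as the loop below. This is an illustration, not the asker's actual code: the names `LineCopier` and `copyLines` are hypothetical, and plain `Reader`/`Writer` parameters stand in for the socket stream and the output file. The comment marks where characters sit unflushed while `readLine()` blocks.

```java
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.Reader;
import java.io.Writer;

public class LineCopier {
    // Hypothetical helper: in the real setup `in` wraps the socket's InputStream
    // (e.g. via InputStreamReader) and `out` is a FileWriter for the local file.
    public static long copyLines(Reader in, Writer out) throws IOException {
        BufferedReader reader = new BufferedReader(in);
        BufferedWriter writer = new BufferedWriter(out, 8192); // 8192 chars, the default size
        long count = 0;
        String line;
        // readLine() blocks while the client is idle; during that time nearly
        // 8192 already-received chars can sit in writer's buffer, unflushed.
        while ((line = reader.readLine()) != null) {
            writer.write(line);
            writer.newLine();
            count++;
        }
        writer.flush(); // only reached once the client closes the connection
        return count;
    }
}
```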
Upvotes: 1
Views: 2117
Reputation: 310903
Don't try this complex scheme; it's too hard to get right. Just reduce the size of the buffer by specifying it when constructing the BufferedWriter.
Reduce it until you find the balance between performance and latency that you need.
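A minimal sketch of this suggestion, assuming a hypothetical helper class `SmallBufferWriter` and an arbitrary starting size of 1024 characters. The `BufferedWriter(Writer, int)` constructor is the standard way to set the buffer size.

```java
import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.io.Writer;

public class SmallBufferWriter {
    // Hypothetical tuning knob (the default is 8192): a smaller value means fewer
    // characters can be stranded in the buffer, at the cost of more write calls.
    static final int BUFFER_CHARS = 1024;

    static BufferedWriter open(Writer out) {
        return new BufferedWriter(out, BUFFER_CHARS);
    }

    static BufferedWriter openFile(String path) throws IOException {
        return open(new FileWriter(path, true)); // append to the local file
    }
}
```

With ~50-character lines, a 1024-character buffer holds only about 20 lines, so fewer lines wait when the client goes quiet. Note that this bounds how much data can wait, not how long it waits.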
Upvotes: 0
Reputation: 7576
What about something like this? It's not a real BufferedWriter, but it's a Writer. It works by periodically checking the time of the last write to the underlying (hopefully unbuffered) writer, then flushing the BufferedWriter if that was longer ago than the timeout.
```java
import java.io.BufferedWriter;
import java.io.FilterWriter;
import java.io.IOException;
import java.io.Writer;
import java.util.concurrent.atomic.AtomicLong;

public class PeriodicFlushingBufferedWriter extends Writer {
    protected final MonitoredWriter monitoredWriter;
    protected final BufferedWriter writer;
    protected final long timeout;
    protected final Thread thread;

    public PeriodicFlushingBufferedWriter(Writer out, long timeout) {
        this(out, 8192, timeout);
    }

    public PeriodicFlushingBufferedWriter(Writer out, int sz, final long timeout) {
        monitoredWriter = new MonitoredWriter(out);
        writer = new BufferedWriter(monitoredWriter, sz);
        this.timeout = timeout;
        thread = new Thread(new Runnable() {
            @Override
            public void run() {
                long deadline = System.currentTimeMillis() + timeout;
                while (!Thread.interrupted()) {
                    try {
                        // Sleep until the earliest moment a flush could be due.
                        Thread.sleep(Math.max(deadline - System.currentTimeMillis(), 0));
                    } catch (InterruptedException e) {
                        return; // close() was called
                    }
                    synchronized (PeriodicFlushingBufferedWriter.this) {
                        if (Thread.interrupted()) {
                            return;
                        }
                        long now = System.currentTimeMillis();
                        long lastWrite = monitoredWriter.getLastWrite();
                        if (now - lastWrite >= timeout) {
                            // Nothing has reached the underlying writer for `timeout` ms,
                            // so push out whatever is still sitting in the buffer.
                            try {
                                writer.flush();
                            } catch (IOException e) {
                                // Swallowed here; a persistent failure is likely to
                                // resurface on the caller's next write, flush or close.
                            }
                            // Check again one full period from now (no busy loop if flush fails).
                            deadline = now + timeout;
                        } else {
                            // Earliest time the writer could have been idle for `timeout` ms.
                            deadline = lastWrite + timeout;
                        }
                    }
                }
            }
        }, "periodic-flusher");
        thread.setDaemon(true); // don't keep the JVM alive if close() is never called
        thread.start();
    }

    @Override
    public synchronized void write(char[] cbuf, int off, int len) throws IOException {
        this.writer.write(cbuf, off, len);
    }

    @Override
    public synchronized void write(String str, int off, int len) throws IOException {
        this.writer.write(str, off, len); // skips Writer's copy into a temporary char[]
    }

    @Override
    public synchronized void flush() throws IOException {
        this.writer.flush();
    }

    @Override
    public synchronized void close() throws IOException {
        try {
            thread.interrupt(); // stop the flusher thread first
        } finally {
            this.writer.close();
        }
    }

    // Records the time of the last write or flush that reaches the wrapped writer.
    private static class MonitoredWriter extends FilterWriter {
        protected final AtomicLong lastWrite = new AtomicLong();

        protected MonitoredWriter(Writer out) {
            super(out);
        }

        @Override
        public void write(int c) throws IOException {
            lastWrite.set(System.currentTimeMillis());
            super.write(c);
        }

        @Override
        public void write(char[] cbuf, int off, int len) throws IOException {
            lastWrite.set(System.currentTimeMillis());
            super.write(cbuf, off, len);
        }

        @Override
        public void write(String str, int off, int len) throws IOException {
            lastWrite.set(System.currentTimeMillis());
            super.write(str, off, len);
        }

        @Override
        public void flush() throws IOException {
            lastWrite.set(System.currentTimeMillis());
            super.flush();
        }

        public long getLastWrite() {
            return this.lastWrite.get();
        }
    }
}
```
Upvotes: 3
Reputation: 840
You might apply the Observer, Manager, and Factory patterns here and have a central BufferedWriterManager produce your BufferedWriters and maintain a list of active instances. An internal thread could wake periodically and flush the active instances. This might also be an opportunity to use weak references, so your consumers aren't required to explicitly free the object. Instead, the GC does the work, and your Manager simply needs to handle the case where its internal reference becomes null (i.e. when all strong references have been dropped). Keep in mind that a BufferedWriter collected this way is not flushed first, so anything still in its buffer is lost.
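A rough sketch of such a manager, assuming one shared daemon flusher thread (a `ScheduledExecutorService` standing in for the hand-rolled internal thread) and weak references to the writers it hands out. `BufferedWriterManager`, `create` and `flushAll` are hypothetical names, not an existing library API.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.Writer;
import java.lang.ref.WeakReference;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical manager: hands out BufferedWriters and flushes the live ones on a timer.
public class BufferedWriterManager implements AutoCloseable {
    // Weak references: the manager never keeps a writer alive on its own.
    private final List<WeakReference<BufferedWriter>> writers = new CopyOnWriteArrayList<>();
    private final ScheduledExecutorService scheduler;

    public BufferedWriterManager(long periodMillis) {
        scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "buffered-writer-flusher");
            t.setDaemon(true); // don't keep the JVM alive just for flushing
            return t;
        });
        scheduler.scheduleAtFixedRate(this::flushAll, periodMillis, periodMillis,
                TimeUnit.MILLISECONDS);
    }

    // Factory method: callers obtain managed writers only through here.
    public BufferedWriter create(Writer out, int size) {
        BufferedWriter w = new BufferedWriter(out, size);
        writers.add(new WeakReference<>(w));
        return w;
    }

    // Flushes every writer that is still reachable; drops collected or failed ones.
    void flushAll() {
        for (WeakReference<BufferedWriter> ref : writers) { // iterates a snapshot
            BufferedWriter w = ref.get();
            if (w == null) {
                writers.remove(ref); // collected by the GC (without being flushed)
                continue;
            }
            try {
                w.flush(); // BufferedWriter locks internally, so this is safe alongside writes
            } catch (IOException e) {
                writers.remove(ref); // e.g. the writer was closed; stop tracking it
            }
        }
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```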
Upvotes: 0
Reputation: 4473
@copeg is right: flush it after every line. It is easy to flush on a timer, but what is the point of having only half a record in the file when you cannot process it?
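One way to flush after every line without explicit `flush()` calls is a `PrintWriter` created with `autoFlush` set to `true`, which flushes on every `println()`. This is a sketch: `AutoFlushCopier` is a hypothetical name. Because `PrintWriter` swallows `IOException`s, the code checks `checkError()` at the end; checking it after each line would detect failures sooner.

```java
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.Reader;
import java.io.Writer;

public class AutoFlushCopier {
    // Copies lines from `in` to `out`, flushing after every complete line.
    public static long copyLines(Reader in, Writer out) throws IOException {
        BufferedReader reader = new BufferedReader(in);
        // The BufferedWriter lets a line and its separator go out in one write;
        // autoFlush=true makes every println() flush it.
        PrintWriter writer = new PrintWriter(new BufferedWriter(out), true);
        long count = 0;
        String line;
        while ((line = reader.readLine()) != null) {
            writer.println(line); // writes the line, the separator, then flushes
            count++;
        }
        // PrintWriter never throws IOException; it only records that one happened.
        if (writer.checkError()) {
            throw new IOException("write to output failed");
        }
        return count;
    }
}
```

Each flush typically turns into its own write to the file, so at ~50 characters per line this issues far more write calls than an 8192-character buffer does; measure whether it still meets the throughput you need.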
Upvotes: 0