Sergei Rodionov
Sergei Rodionov

Reputation: 4529

Timeout-based BufferedWriter flush

I'm using BufferedWriter with the default size of 8192 characters to write lines to a local file. The lines are read from socket inputstream using BufferedReader readLine method, blocking I/O.

Average line length is 50 characters. It all works well and fast enough (over 1 mln lines per second) however if the client stops writing, lines that are currently stored in BufferedWriter buffer won't be flushed to disk. In fact the buffered characters won't be flushed to disk until the client resumes writing or the connection is closed. This translates into a delay between the time line is transmitted by client and the time this line is committed to file, so long-tail latency goes up.

Is there a way to flush incomplete BufferedWriter buffer on timeout, e.g. within 100 milliseconds?

Upvotes: 1

Views: 2117

Answers (4)

user207421
user207421

Reputation: 310903

Don't try this complex scheme, it's too hard. Just reduce the size of the buffer, by specifying it when constructing the BufferedWriter. Reduce it till you find the balance between performance and latency that you need.

Upvotes: 0

David Ehrmann
David Ehrmann

Reputation: 7576

What about something like this? It's not a real BufferedWriter, but it's a Writer. It works by periodically checking on on the last writer to the underlying, hopefully unbuffered writer, then flushing the BufferedWriter if it's been longer than the timeout.

public class PeriodicFlushingBufferedWriter extends Writer {

  protected final MonitoredWriter monitoredWriter;
  protected final BufferedWriter writer;

  protected final long timeout;
  protected final Thread thread;

  public PeriodicFlushingBufferedWriter(Writer out, long timeout) {
    this(out, 8192, timeout);
  }

  public PeriodicFlushingBufferedWriter(Writer out, int sz, final long timeout) {
    monitoredWriter = new MonitoredWriter(out);
    writer = new BufferedWriter(monitoredWriter, sz);

    this.timeout = timeout;

    thread = new Thread(new Runnable() {
      @Override
      public void run() {
        long deadline = System.currentTimeMillis() + timeout;
        while (!Thread.interrupted()) {
          try {
            Thread.sleep(Math.max(deadline - System.currentTimeMillis(), 0));
          } catch (InterruptedException e) {
            return;
          }

          synchronized (PeriodicFlushingBufferedWriter.this) {
            if (Thread.interrupted()) {
              return;
            }

            long lastWrite = monitoredWriter.getLastWrite();

            if (System.currentTimeMillis() - lastWrite >= timeout) {
              try {
                writer.flush();
              } catch (IOException e) {
              }
            }

            deadline = lastWrite + timeout;
          }
        }
      }
    });

    thread.start();
  }

  @Override
  public synchronized void write(char[] cbuf, int off, int len) throws IOException {
    this.writer.write(cbuf, off, len);
  }

  @Override
  public synchronized void flush() throws IOException {
    this.writer.flush();
  }

  @Override
  public synchronized void close() throws IOException {
    try {
      thread.interrupt();
    } finally {
      this.writer.close();
    }
  }

  private static class MonitoredWriter extends FilterWriter {

    protected final AtomicLong lastWrite = new AtomicLong();

    protected MonitoredWriter(Writer out) {
      super(out);
    }

    @Override
    public void write(int c) throws IOException {
      lastWrite.set(System.currentTimeMillis());
      super.write(c);
    }

    @Override
    public void write(char[] cbuf, int off, int len) throws IOException {
      lastWrite.set(System.currentTimeMillis());
      super.write(cbuf, off, len);
    }

    @Override
    public void write(String str, int off, int len) throws IOException {
      lastWrite.set(System.currentTimeMillis());
      super.write(str, off, len);
    }

    @Override
    public void flush() throws IOException {
      lastWrite.set(System.currentTimeMillis());
      super.flush();
    }

    public long getLastWrite() {
      return this.lastWrite.get();
    }
  }
}

Upvotes: 3

jatal
jatal

Reputation: 840

You might apply Observer, Manager, and Factory patterns here and have a central BufferedWriterManager produce your BufferedWriters and maintain a list of active instances. An internal thread might wake periodically and flush the active instances. This might also be an opportunity for Weak references so there is no requirement for your consumers to explicitly free the object. Instead, the GC will do the work and your Manager simply needs to handle the case when its internal reference becomes null (i.e. when all strong references are dropped).

Upvotes: 0

Alex
Alex

Reputation: 4473

@copeg is right - flush it after every line. It is easy to flush it at time period but what is the sense to have only half record and not be able to proceed it?

Upvotes: 0

Related Questions