Synchronizing File Writes
Assume we have a simple task: write events to a file, one event after another. Events can be written from multiple threads.
```java
import java.time.ZonedDateTime;
import java.util.UUID;

// Extends AutoCloseable so that implementations can override close()
public interface EventLog extends AutoCloseable {
    public final class Event {
        public final UUID id;
        public final ZonedDateTime created;
        public final String message;

        public Event(UUID id, ZonedDateTime created, String message) {
            this.id = id;
            this.created = created;
            this.message = message;
        }
    }

    void recordEvent(Event event);

    // Null logger doing nothing, to have a baseline
    EventLog NULL_LOGGER = new NullLogger();

    class NullLogger implements EventLog {
        @Override
        public void recordEvent(Event event) {
            // do nothing
        }

        @Override
        public void close() throws Exception {
        }
    }
}
```
For this blog post, I write bursts of events from different threads. Every few milliseconds, a few hundred events are written, and I record the latency of writing each burst. I’m using the HdrHistogram library to do that.
I ran my tests on a 16-core DigitalOcean cloud machine with OpenJDK 13. Do not trust any of these numbers. You need to benchmark your specific scenario on your own hardware, and you might want to use a benchmark framework like JMH. For this small blog post, I’m sticking with quick-and-dirty measurements: warm up for 2 minutes, then measure for 1 minute.
```java
// Benchmark loop, executed by the threads that write events.
// isRunning, rnd, histogram and logger are defined elsewhere in the benchmark class (not shown).
while (isRunning) {
    // Random event burst once every few milliseconds
    try {
        Thread.sleep(1 + rnd.nextInt(5));
    } catch (InterruptedException e) {
        isRunning = false;
        Thread.currentThread().interrupt();
    }
    // Record the time used to write one burst of events
    var start = System.nanoTime();
    eventBurst(rnd);
    var time = System.nanoTime() - start;
    histogram.recordValue(time);
}

// Simulate writing a burst of events: 200 iterations with 3 events each, i.e. 600 events
private void eventBurst(Random rnd) {
    for (int i = 0; i < 200; i++) {
        byte[] data = new byte[96];
        rnd.nextBytes(data);
        var dataBase64 = Base64.getEncoder().encodeToString(data);
        var time = System.currentTimeMillis();
        var eventNo = time + "." + i;
        var timestamp = ZonedDateTime.now(ZoneOffset.UTC);
        logger.recordEvent(new EventLog.Event(UUID.randomUUID(), timestamp,
                "EventNo: " + eventNo + " Data:" + dataBase64));
        logger.recordEvent(new EventLog.Event(UUID.randomUUID(), timestamp,
                "EventNo: " + eventNo + " Data: Small " + rnd.nextLong()));
        logger.recordEvent(new EventLog.Event(UUID.randomUUID(), timestamp,
                "EventNo: " + eventNo + " Data: medium " + dataBase64.substring(0, 48)));
    }
}
```
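The post records the burst latencies with HdrHistogram. To show the overall shape of such a measurement using only the JDK, here is a hypothetical harness sketch: several threads run a burst, pause 1 to 5 ms, and repeat; bursts that start during a warm-up phase are executed but not recorded; mean and maximum are aggregated with `LongAdder` and `LongAccumulator`. The class `BurstHarness` and its parameters are illustrative assumptions, not part of the original benchmark, and unlike a histogram it only reports mean and max.

```java
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAccumulator;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical JDK-only harness; the post itself records into an HdrHistogram.
final class BurstHarness {
    static final class Result {
        final long count, meanNanos, maxNanos;
        Result(long count, long meanNanos, long maxNanos) {
            this.count = count;
            this.meanNanos = meanNanos;
            this.maxNanos = maxNanos;
        }
    }

    // Runs `burst` repeatedly in `threads` threads, pausing 1-5 ms between bursts.
    // Bursts that start during the warm-up phase are executed but not recorded.
    static Result measure(Runnable burst, int threads, long warmupMillis, long measureMillis) {
        var count = new LongAdder();
        var total = new LongAdder();
        var max = new LongAccumulator(Math::max, 0L);
        long measureStart = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(warmupMillis);
        long end = measureStart + TimeUnit.MILLISECONDS.toNanos(measureMillis);
        var workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                var rnd = ThreadLocalRandom.current();
                while (System.nanoTime() < end) {
                    try {
                        Thread.sleep(1 + rnd.nextInt(5));
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                    long start = System.nanoTime();
                    burst.run();
                    long time = System.nanoTime() - start;
                    if (start >= measureStart) {
                        count.increment();
                        total.add(time);
                        max.accumulate(time);
                    }
                }
            });
            workers[t].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for workers", e);
            }
        }
        long n = count.sum();
        return new Result(n, n == 0 ? 0 : total.sum() / n, max.get());
    }
}
```

With the post’s timing this would be a call like `measure(burst, threads, 120_000, 60_000)`, where `burst` wraps the event burst; the thread count is up to you.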
Simple Synchronized Writes
Let’s start with a straightforward implementation. We write each event to a file. To coordinate between threads, we use a plain lock via the synchronized keyword.
LockedWriter.java
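```java
import java.io.IOException;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.time.format.DateTimeFormatter;

public class LockedWriter implements EventLog {
    private final Writer stream;

    public LockedWriter(String file) throws Exception {
        this.stream = Files.newBufferedWriter(Paths.get(file), StandardCharsets.UTF_8,
                StandardOpenOption.CREATE, StandardOpenOption.WRITE);
    }

    // synchronized: formatting the event and writing it both happen while holding the lock
    @Override
    public synchronized void recordEvent(Event event) {
        try {
            var date = DateTimeFormatter.ISO_DATE_TIME.format(event.created);
            this.stream.append(date);
            this.stream.append(' ');
            this.stream.append(event.id.toString());
            this.stream.append(' ');
            this.stream.append(event.message);
            this.stream.append('\n');
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    // ...
}
```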
Here are my latency numbers and latency graph:
#[Mean = 19.843, StdDeviation = 8.568] #[Max = 241.566, Total count = 41896] #[Buckets = 18, SubBuckets = 2048]
Synchronize Writes
Let’s try to do better. The simple implementation converted the event into bytes while holding the lock. Instead of holding the lock that long, we convert the message first and only lock the stream for the final write:
```java
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.time.format.DateTimeFormatter;

public class LockedStream implements EventLog {
    // The final output stream, writes are synchronized on it.
    private final OutputStream stream;
    // A thread local buffer to assemble the messages. Avoids allocating a buffer for every invocation.
    private final ThreadLocal<Buffer> buffer = ThreadLocal.withInitial(Buffer::new);

    static class Buffer {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        // Explicit UTF-8, like LockedWriter, instead of the platform default charset
        OutputStreamWriter writer = new OutputStreamWriter(bytes, StandardCharsets.UTF_8);
    }

    public LockedStream(String file) throws Exception {
        this.stream = new BufferedOutputStream(new FileOutputStream(file));
    }

    @Override
    public void recordEvent(Event event) {
        Buffer buffer = this.buffer.get();
        var bytes = buffer.bytes;
        var writer = buffer.writer;
        try {
            // Convert the event to bytes without holding any lock
            addToBuffer(event, writer);
            // Only the copy into the shared stream is synchronized
            synchronized (this.stream) {
                bytes.writeTo(this.stream);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            // Clear the buffer even if the write failed, so stale data is not written twice
            bytes.reset();
        }
    }

    private void addToBuffer(Event event, Writer writer) throws IOException {
        var date = DateTimeFormatter.ISO_DATE_TIME.format(event.created);
        writer.append(date);
        writer.append(' ');
        writer.append(event.id.toString());
        writer.append(' ');
        writer.append(event.message);
        writer.append('\n');
        // Flush to ensure the bytes are written to the buffer
        writer.flush();
    }
    //...
}
```
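This change is a good improvement in the tail latency: the maximum drops from 241.6 ms to 62.2 ms. This is probably because we don’t get as much contention on the lock, since each thread holds it only while copying its already encoded bytes.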
#[Mean = 17.440, StdDeviation = 3.846] #[Max = 62.194, Total count = 46795] #[Buckets = 16, SubBuckets = 2048]
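The pattern behind LockedStream, encoding into a per-thread buffer and holding the lock only for the copy, can be tried in isolation. The sketch below is a hypothetical simplification (`LockOnlyForCopy` is not from the post) that writes plain lines to any `OutputStream`, which makes it easy to check that concurrent lines never interleave.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.nio.charset.StandardCharsets;

// Hypothetical, simplified variant of LockedStream that writes plain lines to any OutputStream.
final class LockOnlyForCopy {
    // Per-thread scratch space, reused across calls like LockedStream's Buffer
    private static final class Buffer {
        final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        final Writer writer = new OutputStreamWriter(bytes, StandardCharsets.UTF_8);
    }

    private final OutputStream out;
    private final ThreadLocal<Buffer> buffer = ThreadLocal.withInitial(Buffer::new);

    LockOnlyForCopy(OutputStream out) {
        this.out = out;
    }

    void record(String line) {
        Buffer b = buffer.get();
        try {
            // Encoding to UTF-8 happens outside the lock
            b.writer.append(line).append('\n');
            b.writer.flush();
            // Only the copy of the finished bytes is serialized between threads
            synchronized (out) {
                b.bytes.writeTo(out);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            b.bytes.reset();
        }
    }
}
```

Each line is copied as one unit under the lock, so a parallel stream calling `record` produces whole lines in some order, never fragments of two lines mixed together.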
Lock-Free Writes
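So far, we have locked around the writes. We can actually remove the locks entirely. Instead, each thread reserves a write position by atomically advancing a shared position counter and then writes its bytes at that position. Because every reservation covers a distinct range of the file, concurrent writes never overlap.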
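The core idea can be tried on its own. The sketch below is a hypothetical, minimal class (`ReserveAndWrite` is not from the post) that writes plain text lines: `AtomicLong.getAndAdd` hands each caller a disjoint byte range, and `FileChannel.write(ByteBuffer, long)` writes at that absolute position without touching the channel’s shared position. It also opens the file with `TRUNCATE_EXISTING`, an addition of this sketch, so that a rerun does not leave old bytes behind.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical minimal version of the lock-free idea: reserve a region, then write into it.
final class ReserveAndWrite implements AutoCloseable {
    private final FileChannel channel;
    private final AtomicLong writePos = new AtomicLong();

    ReserveAndWrite(Path file) {
        try {
            channel = FileChannel.open(file, StandardOpenOption.CREATE,
                    StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    void record(String line) {
        ByteBuffer data = ByteBuffer.wrap((line + "\n").getBytes(StandardCharsets.UTF_8));
        // getAndAdd gives each caller a disjoint byte range; no lock is taken
        long pos = writePos.getAndAdd(data.remaining());
        try {
            // A positional write may in principle write fewer bytes than requested, so loop
            while (data.hasRemaining()) {
                pos += channel.write(data, pos);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    @Override
    public void close() {
        try {
            channel.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```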
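```java
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.atomic.AtomicLong;

public class AtomicPositionWrites implements EventLog {
    private final FileChannel file;
    // A thread local buffer to assemble the messages. Avoids allocating a buffer for every invocation.
    private final ThreadLocal<Buffer> buffer = ThreadLocal.withInitial(Buffer::new);
    // Position where the next write is placed.
    private final AtomicLong writePos = new AtomicLong();

    public AtomicPositionWrites(String file) throws IOException {
        this.file = FileChannel.open(Paths.get(file), StandardOpenOption.CREATE, StandardOpenOption.WRITE);
    }

    // ByteArrayBuffer: helper not shown in the post, exposes its bytes via toByteBuffer()
    static class Buffer {
        ByteArrayBuffer bytes = new ByteArrayBuffer();
        OutputStreamWriter writer = new OutputStreamWriter(bytes, StandardCharsets.UTF_8);
    }

    @Override
    public void recordEvent(Event event) {
        var buffer = this.buffer.get();
        var byteBuf = buffer.bytes;
        try {
            addToBuffer(event, buffer.writer);
            // Reserve a file region no other thread will get, then write into it
            var pos = writePos.getAndAdd(byteBuf.size());
            var data = byteBuf.toByteBuffer();
            // A positional write may in principle write fewer bytes than requested
            while (data.hasRemaining()) {
                pos += file.write(data, pos);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            byteBuf.reset();
        }
    }
    //...
}
```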
Indeed, the worst-case latency got better (36.2 ms instead of 62.2 ms). However, the average latency is slightly higher (18.9 ms instead of 17.4 ms).
#[Mean = 18.896, StdDeviation = 2.133] #[Max = 36.209, Total count = 43633] #[Buckets = 16, SubBuckets = 2048]
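AtomicPositionWrites and the following variants use a `ByteArrayBuffer` class that the post does not show. A plausible reconstruction, and only an assumption about what it looks like, is a `ByteArrayOutputStream` subclass that wraps its internal array instead of copying it. `ByteArrayOutputStream` does expose its data to subclasses through the protected fields `buf` and `count`.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;

// Hypothetical reconstruction of the ByteArrayBuffer helper used by the writers above.
class ByteArrayBuffer extends ByteArrayOutputStream {
    ByteArrayBuffer() {
        super();
    }

    ByteArrayBuffer(int initialSize) {
        super(initialSize);
    }

    // Wraps the valid part of the internal array without copying it.
    // The view is only meaningful until the next reset() followed by new writes.
    synchronized ByteBuffer toByteBuffer() {
        return ByteBuffer.wrap(buf, 0, count);
    }
}
```

Because the returned `ByteBuffer` shares the internal array, it has to be consumed before the buffer is reset and refilled, which is the order in which the writers above use it.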
Buffering the Writes
With the switch to the atomic position writes, we also dropped the buffering provided by BufferedOutputStream. The implementation now issues a write for every single event. That creates tons of syscalls and might add to the latency. Let’s try to add the buffering back:
```java
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicLong;

public class BufferedWrites implements EventLog {
    // How much data is buffered before it's written to disk
    private final static int BlockSize = 8*1024;
    private final FileChannel channel;
    // Keeps a list of buffers created by each thread, to flush pending data on close.
    // Synchronized access!
    private final ArrayList<Buffer> buffers = new ArrayList<>();
    // A thread local buffer to assemble the messages. Avoids allocating a buffer for every invocation.
    private final ThreadLocal<Buffer> buffer = ThreadLocal.withInitial(() -> {
        var b = new Buffer();
        synchronized (buffers) {
            buffers.add(b);
        }
        return b;
    });
    // Position where the next write is placed.
    private final AtomicLong writePos = new AtomicLong();

    public BufferedWrites(String file) throws IOException {
        this.channel = FileChannel.open(Paths.get(file), StandardOpenOption.CREATE, StandardOpenOption.WRITE);
    }

    // ByteArrayBuffer: helper not shown in the post, exposes its bytes via toByteBuffer()
    static class Buffer {
        ByteArrayBuffer bytes = new ByteArrayBuffer();
        OutputStreamWriter writer = new OutputStreamWriter(bytes, StandardCharsets.UTF_8);
    }

    @Override
    public void recordEvent(Event event) {
        var bf = this.buffer.get();
        try {
            var byteBuf = bf.bytes;
            addToBuffer(event, bf.writer);
            // Only write once a whole block has accumulated
            if (byteBuf.size() > BlockSize) {
                write(byteBuf);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void close() throws Exception {
        // On close, flush the buffers first. Assumes no thread is still recording events.
        synchronized (buffers) {
            for (Buffer buff : buffers) {
                write(buff.bytes);
            }
        }
        this.channel.close();
    }

    private void write(ByteArrayBuffer byteBuf) throws IOException {
        var pos = writePos.getAndAdd(byteBuf.size());
        var data = byteBuf.toByteBuffer();
        // A positional write may in principle write fewer bytes than requested
        while (data.hasRemaining()) {
            pos += channel.write(data, pos);
        }
        byteBuf.reset();
    }
    // ...
}
```
Buffering reduces the mean latency (11.9 ms instead of 18.9 ms). However, it creates higher latency outliers: when a buffer fills up, the event that crosses the 8 KB threshold pays for writing the whole block.
#[Mean = 11.851, StdDeviation = 4.852] #[Max = 56.820, Total count = 64255] #[Buckets = 16, SubBuckets = 2048]
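To get a feel for how many write calls the buffering saves, here is a hypothetical model, not a measurement. It follows a single thread’s buffer, uses the 600 events of one burst from `eventBurst`, and assumes every event is exactly 100 bytes (real events vary in size). The flush rule mirrors `BufferedWrites.recordEvent`: write once the buffer exceeds the block size, and write the remainder on close.

```java
// Hypothetical model, not a measurement: counts write calls for one thread's events.
final class WriteCallModel {
    static final int BLOCK_SIZE = 8 * 1024; // same threshold as BufferedWrites.BlockSize

    // AtomicPositionWrites: one positional write per event
    static int unbufferedWrites(int events) {
        return events;
    }

    // BufferedWrites: append to a buffer, write once it exceeds BLOCK_SIZE, flush the rest on close
    static int bufferedWrites(int events, int eventSize) {
        int writes = 0;
        int buffered = 0;
        for (int i = 0; i < events; i++) {
            buffered += eventSize;
            if (buffered > BLOCK_SIZE) {
                writes++;
                buffered = 0;
            }
        }
        if (buffered > 0) {
            writes++; // close() writes out the partially filled buffer
        }
        return writes;
    }
}
```

For 600 events of 100 bytes, the model gives 600 write calls without buffering and 8 with it: 7 full writes of 8,200 bytes each (82 events) plus one final write of 2,600 bytes. Each of those 7 full writes is the extra work that one unlucky event pays for.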
Background Writes
We might avoid the latency spikes of the bulk writes by doing them in the background. When a buffer is full, we queue it. A background thread takes filled buffers off the queue and writes them out. That way, the application threads do not wait for the writes.
BufferedWritesOffThread.java
```java
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

public class BufferedWritesOffThread implements EventLog {
    // How much data is buffered before it's written to disk
    private final static int BlockSize = 8*1024;
    // Background thread writing to disk
    private final WriterThread writer;
    // Filled buffers ready to be written by the WriterThread
    private final ConcurrentLinkedQueue<Buffer> complete = new ConcurrentLinkedQueue<>();
    // After writing, a buffer is cleared and reused, to avoid allocating tons of buffers
    private final ConcurrentLinkedQueue<Buffer> free = new ConcurrentLinkedQueue<>();
    // Buffers which have some data in them but are not yet full. The close method writes these buffers out
    private final ConcurrentHashMap<Thread, Buffer> dirty = new ConcurrentHashMap<>();

    public BufferedWritesOffThread(String file) throws IOException {
        var channel = FileChannel.open(Paths.get(file), StandardOpenOption.CREATE, StandardOpenOption.WRITE);
        this.writer = new WriterThread(channel);
        this.writer.start();
    }

    // ByteArrayBuffer: helper not shown in the post, exposes its bytes via toByteBuffer()
    static class Buffer {
        ByteArrayBuffer bytes = new ByteArrayBuffer();
        OutputStreamWriter writer = new OutputStreamWriter(bytes, StandardCharsets.UTF_8);
    }

    class WriterThread extends Thread {
        private final FileChannel channel;
        private volatile boolean isRunning = true;

        WriterThread(FileChannel channel) {
            this.channel = channel;
        }

        @Override
        public void run() {
            try {
                // Keep running until close() was called AND every queued buffer is written;
                // otherwise the buffers queued by close() would be lost
                while (isRunning || !complete.isEmpty()) {
                    var buffer = complete.poll();
                    if (buffer == null) {
                        Thread.sleep(1);
                    } else {
                        var bytes = buffer.bytes;
                        try {
                            // Single writer thread, so plain sequential writes are enough
                            var data = bytes.toByteBuffer();
                            while (data.hasRemaining()) {
                                channel.write(data);
                            }
                            bytes.reset();
                            free.offer(buffer);
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    }
                }
            } catch (InterruptedException e) {
                // Completed
            }
        }

        public void close() throws IOException {
            this.isRunning = false;
            try {
                this.join();
                this.channel.close();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    @Override
    public void recordEvent(Event event) {
        var bf = dirty.computeIfAbsent(Thread.currentThread(), (t) -> newBuffer());
        try {
            var byteBuf = bf.bytes;
            addToBuffer(event, bf.writer);
            if (byteBuf.size() > BlockSize) {
                // Hand the full buffer to the writer thread and continue with a fresh one
                complete.offer(bf);
                this.dirty.put(Thread.currentThread(), newBuffer());
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void close() throws Exception {
        // Queue the partially filled buffers; the writer thread drains them before exiting
        for (Buffer buff : dirty.values()) {
            complete.offer(buff);
        }
        this.writer.close();
    }

    private Buffer newBuffer() {
        var buffer = free.poll();
        if (buffer == null) {
            buffer = new Buffer();
        }
        return buffer;
    }
    // ...
}
```
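This indeed reduces the latency: the mean is the lowest of all variants, and the worst case is back close to that of the unbuffered lock-free writes (39.7 ms vs. 36.2 ms). The worst case still seems slightly higher, but verifying an actual difference would need a better benchmark.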
BufferedWritesOffThread Latency
#[Mean = 11.322, StdDeviation = 4.152] #[Max = 39.682, Total count = 66660] #[Buckets = 16, SubBuckets = 2048]
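The WriterThread polls the queue and sleeps for 1 ms whenever it is empty. A common alternative, swapped in here and not what the post measured, is a `BlockingQueue` whose `take()` blocks until work arrives, plus a sentinel ("poison pill") element that tells the writer to stop only after everything queued before it has been written. The sketch is hypothetical: `BackgroundWriter`, `submit` and the `byte[]` blocks are illustrative, it does not recycle buffers, and it writes to any `OutputStream` instead of a `FileChannel`.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical alternative to WriterThread: block on a queue instead of polling with sleep(1),
// and use a sentinel ("poison pill") so that close() drains everything queued before it.
final class BackgroundWriter implements AutoCloseable {
    private static final byte[] POISON = new byte[0];
    private final BlockingQueue<byte[]> queue = new LinkedBlockingQueue<>();
    private final Thread thread;

    BackgroundWriter(OutputStream out) {
        thread = new Thread(() -> {
            try {
                while (true) {
                    byte[] block = queue.take();
                    if (block == POISON) {
                        break; // identity check: only the sentinel object ends the loop
                    }
                    out.write(block);
                }
                out.flush();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }, "background-writer");
        thread.start();
    }

    // Called by application threads with a filled block; never waits for I/O
    void submit(byte[] block) {
        queue.add(block);
    }

    @Override
    public void close() {
        queue.add(POISON); // FIFO order: everything submitted earlier is written first
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Blocking on `take()` removes the up-to-1 ms polling delay and stops an idle writer from waking up every millisecond; whether that changes the latency numbers above would need to be measured.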
Final Results
Implementation | Mean (ms) | Max (ms) | Std. deviation (ms) | Total count (bursts)
---|---|---|---|---
LockedWriter | 19.843 | 241.566 | 8.568 | 41896
LockedStream | 17.440 | 62.194 | 3.846 | 46795
AtomicPositionWrites | 18.896 | 36.209 | 2.133 | 43633
BufferedWrites | 11.851 | 56.820 | 4.852 | 64255
BufferedWritesOffThread | 11.322 | 39.682 | 4.152 | 66660
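To put the table in perspective, here are the ratios between the LockedWriter baseline and the best maximum (AtomicPositionWrites) and the best mean (BufferedWritesOffThread):

$$
\frac{241.566\ \text{ms}}{36.209\ \text{ms}} \approx 6.67
\qquad
\frac{19.843\ \text{ms}}{11.322\ \text{ms}} \approx 1.75
$$

So the worst case shrank by a factor of roughly 6.7, while the mean improved by a factor of roughly 1.75.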
Conclusion
With a tiny bit of thought and less lock contention, you can improve the tail latency massively: the maximum dropped from 241.6 ms with LockedWriter to 36.2 ms with AtomicPositionWrites.