Redis Clone: Improved IO Control
We left off in the previous post with unsuccessful attempts to improve the IO using only virtual threads and the classic blocking IO classes.
This time we use the NIO network API to get more fine-grained control over the IO pattern. As a recap: we try to avoid blocking while flushing each individual response. This way we take advantage of pipelined requests: we get a bunch of requests from the client, answer all of them, and amortize the flush over multiple responses.
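For illustration, here is roughly what such a pipelined exchange looks like on the wire (the key and value are made up; the replies match what this clone sends, not real Redis):

Client sends, back to back, without waiting for replies:
*3\r\n$3\r\nSET\r\n$3\r\nfoo\r\n$3\r\nbar\r\n
*2\r\n$3\r\nGET\r\n$3\r\nfoo\r\n
*2\r\n$3\r\nGET\r\n$3\r\nbaz\r\n

Server answers all three and flushes once:
$-1\r\n
$3\r\nbar\r\n
$-1\r\n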
Unfortunately, the NIO API is not great. It felt clunky 20 years ago (yes, it was introduced with Java 1.4 in 2002), and it feels very clunky by now. There are improved APIs like the AsynchronousSocketChannel added later, but they didn’t quite fit my approach. Generally, I recommend using a library like Netty, Grizzly or others. I actually ended up using Netty’s ByteBuf library, just to have a better buffer abstraction than Java’s ByteBuffer.
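As a quick illustration of why ByteBuf is the nicer abstraction (this snippet is mine, not from the clone): a ByteBuf keeps separate reader and writer indexes, so there is no flip()/compact() mode switching like with ByteBuffer.

// java.nio.ByteBuffer: a single position, so you flip between write and read mode
var nioBuf = java.nio.ByteBuffer.allocate(64);
nioBuf.put("hello".getBytes(UTF_8));
nioBuf.flip();                 // switch to read mode before reading
var first = nioBuf.get();
nioBuf.compact();              // switch back before the next write

// Netty ByteBuf: independent readerIndex and writerIndex, no mode switching
var nettyBuf = Unpooled.buffer(64);
nettyBuf.writeBytes("hello".getBytes(UTF_8));
var b = nettyBuf.readByte();   // reading does not disturb subsequent writes
nettyBuf.writeBytes(" world".getBytes(UTF_8));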
Non-blocking NIO is intended for multiplexing multiple socket operations on a single thread. That usually ends up as callback / event-driven code. However, I wanted to keep the classic blocking code style, with virtual threads doing the multiplexing. The NIO API certainly wasn’t designed for that =).
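For contrast, the intended use looks roughly like the classic selector loop below: one thread owns the Selector and dispatches events for all connections. This is only a sketch; serverChannel is assumed to be an open, non-blocking ServerSocketChannel.

var selector = Selector.open();
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
    selector.select();
    var keys = selector.selectedKeys().iterator();
    while (keys.hasNext()) {
        var key = keys.next();
        keys.remove();
        if (key.isAcceptable()) {
            var client = ((ServerSocketChannel) key.channel()).accept();
            client.configureBlocking(false);
            client.register(selector, SelectionKey.OP_READ);
        } else if (key.isReadable()) {
            // read whatever is available and feed it to a per-connection state machine
        }
    }
}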
Anyway, onwards with the code.
The code now uses the ServerSocketChannel API instead of the ServerSocket API. We start with the quite regular accepting of connections, scheduling each connection on a virtual thread.
var scheduler = Executors.newVirtualThreadPerTaskExecutor();
var socket = ServerSocketChannel.open();
socket.bind(new InetSocketAddress("0.0.0.0", 16379));
System.out.println("App is listening on 0.0.0.0:16379");

var clone = new RedisClone();
while (true) {
    var client = socket.accept();
    scheduler.execute(() -> {
        try (client) {
            clone.handleConnection(client);
        } catch (Exception e) {
            e.printStackTrace();
        }
    });
}
The main operation loop stays more or less the same, using classic Java blocking-style code and relying on virtual threads for the multiplexing onto actual kernel threads. But we’ll use our special Writer and Reader classes to do the IO operations.
public void handleConnection(SocketChannel socket) throws Exception {
    var args = new ArrayList<String>();
    // Configure the channel to be non-blocking: our Writer and Reader classes will control the blocking
    socket.configureBlocking(false);
    // Replace the JDK IO streams with our Reader and Writer
    var writer = new Writer(socket);
    var reader = new Reader(socket, writer);
    while (true) {
        args.clear();
        var line = reader.readLine();
        if (line == null)
            break;
        // Existing parsing code from before (see the snippet below)
        var reply = executeCommand(args);
        if (reply == null) {
            writer.write("$-1\r\n");
        } else {
            writer.write("$" + reply.length() + "\r\n" + reply + "\r\n");
        }
    }
}
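The parsing the placeholder comment refers to is unchanged from the previous post; for reference, this is what runs between readLine() and executeCommand() (it also appears in the full listing at the end):

if (line.charAt(0) != '*')
    throw new RuntimeException("Cannot understand arg batch: " + line);
var argsv = Integer.parseInt(line.substring(1));
for (int i = 0; i < argsv; i++) {
    line = reader.readLine();
    if (line == null || line.charAt(0) != '$')
        throw new RuntimeException("Cannot understand arg length: " + line);
    var argLen = Integer.parseInt(line.substring(1));
    line = reader.readLine();
    if (line == null || line.length() != argLen)
        throw new RuntimeException("Wrong arg length expected " + argLen + " got: " + line);
    args.add(line);
}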
The Writer writes the data into a buffer. If enough data has accumulated in the buffer, we write it to the socket without blocking. Since the answers are small, we already flush after a kilobyte in this example. Plus there is a method to explicitly flush.
class Writer {
    final SocketChannel socket;
    final ByteBuf writeBuffer = Unpooled.buffer(4 * 1024);

    public Writer(SocketChannel socket) throws IOException {
        this.socket = socket;
        assert !socket.isBlocking();
    }

    public void write(String toWrite) throws Exception {
        var bytes = toWrite.getBytes(UTF_8);
        ensureAvailableWriteSpace(writeBuffer);
        writeBuffer.writeBytes(bytes);
        final var AUTO_FLUSH_LIMIT = 1024;
        if (AUTO_FLUSH_LIMIT < writeBuffer.readableBytes()) {
            // A bit confusing at this use site: we read the buffer's content into the channel, aka write to the channel
            var written = writeBuffer.readBytes(socket, writeBuffer.readableBytes());
            // Proper handling of back pressure would wait here for the channel to become writable again.
            // But for this example we ignore such concerns and just let the writeBuffer grow.
        }
    }

    public void flush() throws IOException {
        if (writeBuffer.readableBytes() > 0) {
            writeBuffer.readBytes(socket, writeBuffer.readableBytes());
        }
    }

    // The Netty ByteBufs are not circular buffers: writes always go to the end and may grow the buffer.
    // I assume the underlying reason is to make it more efficient to interact with Java NIO.
    // So, if we're running out of writable space, discard the bytes already read and
    // copy the not-yet-read bytes to the start of the buffer, giving us enough space to write more at the end.
    static int ensureAvailableWriteSpace(ByteBuf buf) {
        final var MIN_WRITE_SPACE = 1024;
        if (buf.writableBytes() < MIN_WRITE_SPACE) {
            buf.discardReadBytes();
        }
        return Math.max(MIN_WRITE_SPACE, buf.writableBytes());
    }
}
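As the comment in write() says, proper back pressure would mean waiting for the channel to become writable instead of growing the buffer. That was left out on purpose, but a sketch of how the waiting could look in this blocking-on-a-virtual-thread style (hypothetical, not part of the actual code) would be:

// Hypothetical helper: park this (virtual) thread until the kernel send buffer has room again.
// A real implementation would reuse a Selector instead of opening one per call.
static void awaitWritable(SocketChannel socket) throws IOException {
    try (var selector = Selector.open()) {
        socket.register(selector, SelectionKey.OP_WRITE);
        selector.select(key -> {
            // woken up once the socket can accept more data
        });
    }
}

The write method would then loop: as long as the buffer still has readable bytes and the channel accepts nothing, call awaitWritable and retry, instead of letting the buffer grow.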
The Reader is a bit more elaborate. We check if we find newline characters in our read buffer and return the lines. Otherwise, we read from the socket into our read buffer. If we can’t read more data, we flush any pending answers and then wait until more data arrives. Here we use the NIO Selector purely to wait for more data, not as a multiplexer as intended.
class Reader {
    final SocketChannel socket;
    final Writer writer;
    final ByteBuf readBuffer = Unpooled.buffer(8 * 1024);
    private final Selector selector;

    public Reader(SocketChannel socket, Writer writer) throws IOException {
        this.socket = socket;
        this.writer = writer;
        this.selector = Selector.open();
        socket.register(selector, SelectionKey.OP_READ, this);
    }

    public String readLine() throws Exception {
        var eof = false;
        while (!eof) {
            var readIndex = readBuffer.readerIndex();
            var toIndex = readBuffer.readableBytes();
            // Find the next line in the already read content
            var foundNewLine = readBuffer.indexOf(readIndex, readIndex + toIndex, (byte) '\n');
            if (foundNewLine >= 0) {
                var length = foundNewLine - readIndex;
                String line = readBuffer.toString(readIndex, length - 1, UTF_8);
                readBuffer.readerIndex(readIndex + length + 1);
                return line;
            } else {
                // Otherwise, read from the socket
                int readSize = ensureAvailableWriteSpace(readBuffer);
                // A bit confusing in this use case: we write the content of the socket into the buffer, aka read from the channel
                var read = readBuffer.writeBytes(socket, readSize);
                if (read < 0) {
                    eof = true;
                } else if (read == 0) {
                    // If we read nothing, ensure we flushed our previous responses
                    writer.flush();
                    // And then wait until the socket becomes readable again
                    selector.select(key -> {
                        if (!key.isReadable()) {
                            throw new AssertionError("Expect to be readable again");
                        }
                    });
                }
            }
        }
        return null;
    }
}
Performance Numbers
What is the performance after all this work? Throughput improved by up to 15% over our original naive solution. Plus the p99 latency nearly halved, from 8ms to 4.7ms.
============================================================================================================================
Type         Ops/sec     Hits/sec   Misses/sec   Avg. Latency   p50 Latency   p99 Latency   p99.9 Latency      KB/sec
----------------------------------------------------------------------------------------------------------------------------
Sets        382280.79         ---          ---        1.82333       1.59900       4.79900        45.31100   113448.09
Gets       3822767.78  1548404.66   2274363.12        1.82050       1.59900       4.79900        45.05500   540549.78
Waits            0.00         ---          ---            ---           ---           ---             ---         ---
Totals     4205048.56  1548404.66   2274363.12        1.82076       1.59900       4.79900        45.05500   653997.87
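The table above is memtier_benchmark output, measured the same way as in the earlier posts. I am not reproducing the exact invocation here; a pipelined run against this server would look something like the following, with illustrative (not the actual) client and pipeline settings:

memtier_benchmark -s 127.0.0.1 -p 16379 --pipeline=10 --threads=4 --clients=50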
Profiling Again
I also profiled this new solution, as before.
This solution also improved the memory pressure as a side effect: we have less GC pressure and less memory churn than before. However, a lot of boxed integers are allocated, grrr. These boxed integers seem to come from Java’s virtual thread implementation. Again, virtual threads are cheap and give you more understandable code, but they are not free.
Also, if we look at the wall-clock profiling, almost all the time is now spent waiting in 'select' for the socket to become readable. The reading, lookup, writing, and flushing are a tiny sliver time-wise by now. Great, that was the goal.
Next Steps
The next planned step is to reduce the memory churn further.
Stay tuned ;)
Full Code
package info.gamlor.redis;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;

import static info.gamlor.redis.Writer.ensureAvailableWriteSpace;
import static java.nio.charset.StandardCharsets.UTF_8;

public class RedisMain {
    public static void main(String[] args) throws Exception {
        var scheduler = Executors.newVirtualThreadPerTaskExecutor();
        var socket = ServerSocketChannel.open();
        socket.bind(new InetSocketAddress("0.0.0.0", 16379));
        System.out.println("App is listening on 0.0.0.0:16379");

        var clone = new RedisClone();
        while (true) {
            var client = socket.accept();
            scheduler.execute(() -> {
                try (client) {
                    clone.handleConnection(client);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
        }
    }
}

class RedisClone {
    private final ConcurrentHashMap<String, String> state = new ConcurrentHashMap<>();

    public void handleConnection(SocketChannel socket) throws Exception {
        var args = new ArrayList<String>();
        // Configure the channel to be non-blocking: our Writer and Reader classes will control the blocking
        socket.configureBlocking(false);
        // Replace the JDK IO streams with our Reader and Writer
        var writer = new Writer(socket);
        var reader = new Reader(socket, writer);
        while (true) {
            args.clear();
            var line = reader.readLine();
            if (line == null)
                break;
            if (line.charAt(0) != '*')
                throw new RuntimeException("Cannot understand arg batch: " + line);
            var argsv = Integer.parseInt(line.substring(1));
            for (int i = 0; i < argsv; i++) {
                line = reader.readLine();
                if (line == null || line.charAt(0) != '$')
                    throw new RuntimeException("Cannot understand arg length: " + line);
                var argLen = Integer.parseInt(line.substring(1));
                line = reader.readLine();
                if (line == null || line.length() != argLen)
                    throw new RuntimeException("Wrong arg length expected " + argLen + " got: " + line);
                args.add(line);
            }
            var reply = executeCommand(args);
            if (reply == null) {
                writer.write("$-1\r\n");
            } else {
                writer.write("$" + reply.length() + "\r\n" + reply + "\r\n");
            }
        }
    }

    String executeCommand(List<String> args) {
        switch (args.get(0)) {
            case "GET":
                return state.get(args.get(1));
            case "SET":
                state.put(args.get(1), args.get(2));
                return null;
            default:
                throw new IllegalArgumentException("Unknown command: " + args.get(0));
        }
    }
}

class Writer {
    final SocketChannel socket;
    final ByteBuf writeBuffer = Unpooled.buffer(4 * 1024);

    public Writer(SocketChannel socket) throws IOException {
        this.socket = socket;
        assert !socket.isBlocking();
    }

    public void write(String toWrite) throws Exception {
        var bytes = toWrite.getBytes(UTF_8);
        ensureAvailableWriteSpace(writeBuffer);
        writeBuffer.writeBytes(bytes);
        final var AUTO_FLUSH_LIMIT = 1024;
        if (AUTO_FLUSH_LIMIT < writeBuffer.readableBytes()) {
            // A bit confusing in this use case: we read the buffer's content into the socket, aka write to the socket
            var written = writeBuffer.readBytes(socket, writeBuffer.readableBytes());
            // Proper handling of back pressure would wait here for the channel to become writable again.
            // But for this example we ignore such concerns and just let the writeBuffer grow.
        }
    }

    public void flush() throws IOException {
        if (writeBuffer.readableBytes() > 0) {
            writeBuffer.readBytes(socket, writeBuffer.readableBytes());
        }
    }

    // The Netty ByteBufs are not circular buffers: writes always go to the end and may grow the buffer.
    // I assume the underlying reason is to make it more efficient to interact with Java NIO.
    // So, if we're running out of writable space, discard the bytes already read and
    // copy the not-yet-read bytes to the start of the buffer, giving us enough space to write more at the end.
    static int ensureAvailableWriteSpace(ByteBuf buf) {
        final var MIN_WRITE_SPACE = 1024;
        if (buf.writableBytes() < MIN_WRITE_SPACE) {
            buf.discardReadBytes();
        }
        return Math.max(MIN_WRITE_SPACE, buf.writableBytes());
    }
}

class Reader {
    final SocketChannel socket;
    final Writer writer;
    final ByteBuf readBuffer = Unpooled.buffer(8 * 1024);
    private final Selector selector;

    public Reader(SocketChannel socket, Writer writer) throws IOException {
        this.socket = socket;
        this.writer = writer;
        this.selector = Selector.open();
        socket.register(selector, SelectionKey.OP_READ, this);
    }

    public String readLine() throws Exception {
        var eof = false;
        while (!eof) {
            var readIndex = readBuffer.readerIndex();
            var toIndex = readBuffer.readableBytes();
            // Find the next line in the already read content
            var foundNewLine = readBuffer.indexOf(readIndex, readIndex + toIndex, (byte) '\n');
            if (foundNewLine >= 0) {
                var length = foundNewLine - readIndex;
                String line = readBuffer.toString(readIndex, length - 1, UTF_8);
                readBuffer.readerIndex(readIndex + length + 1);
                return line;
            } else {
                // Otherwise, read from the socket
                int readSize = ensureAvailableWriteSpace(readBuffer);
                // A bit confusing in this use case: we write the content of the socket into the buffer, aka read from the channel
                var read = readBuffer.writeBytes(socket, readSize);
                if (read < 0) {
                    eof = true;
                } else if (read == 0) {
                    // If we read nothing, ensure we flushed our previous responses
                    writer.flush();
                    // And then wait until the socket becomes readable again
                    selector.select(key -> {
                        if (!key.isReadable()) {
                            throw new AssertionError("Expect to be readable again");
                        }
                    });
                }
            }
        }
        return null;
    }
}
Twitter Comments
Ron Pressler, who’s working on Java’s virtual threads, had a few good comments on Twitter.
Summary:
The Integers are not allocated by the virtual threads. That is from something else ;)
The Selector API isn’t intended to be consumed on virtual threads. Use something like Netty on regular threads if you want a nicer API.