Skip to content

Commit

Permalink
common: Add proper support for Virtual Threads (Java 21)
Browse files Browse the repository at this point in the history
Currently, running both server and clients from Virtual Threads is
impossible as they may block each other.

Modify the blocking code to use LockSupport.park/unpark internally, when
necessary.

For now use a naive poll implementation that spawns individual poll
syscalls on system threads (later, we will use epoll/kqueue when
available).

Improve the MassiveParallelTest to exercise the new code.
  • Loading branch information
kohlschuetter committed Apr 18, 2024
1 parent e61a567 commit c5962ba
Show file tree
Hide file tree
Showing 19 changed files with 1,520 additions and 335 deletions.
274 changes: 209 additions & 65 deletions junixsocket-common/src/main/java/org/newsclub/net/unix/AFCore.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,20 @@
import java.io.IOException;
import java.net.SocketAddress;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.eclipse.jdt.annotation.NonNull;
import org.newsclub.net.unix.pool.MutableHolder;
import org.newsclub.net.unix.pool.ObjectPool;
import org.newsclub.net.unix.pool.ObjectPool.Lease;

import com.kohlschutter.annotations.compiletime.SuppressFBWarnings;

/**
* The core functionality of file descriptor based I/O.
*
Expand All @@ -38,6 +44,12 @@ class AFCore extends CleanableState {
private static final ObjectPool<MutableHolder<ByteBuffer>> TL_BUFFER = ObjectPool
.newThreadLocalPool(() -> {
return new MutableHolder<>(null);
}, (o) -> {
ByteBuffer bb = o.get();
if (bb != null) {
bb.clear();
}
return true;
});

private static final String PROP_TL_BUFFER_MAX_CAPACITY =
Expand All @@ -54,7 +66,8 @@ class AFCore extends CleanableState {

private final boolean datagramMode;

private boolean blocking = true;
private final AtomicInteger virtualBlockingLeases = new AtomicInteger(0);
private volatile boolean blocking = true;
private boolean cleanFd = true;

AFCore(Object observed, FileDescriptor fd, AncillaryDataSupport ancillaryDataSupport,
Expand Down Expand Up @@ -120,11 +133,16 @@ synchronized FileDescriptor validFd() {
return null;
}

int read(ByteBuffer dst) throws IOException {
return read(dst, null, 0);
int read(ByteBuffer dst, AFSupplier<Integer> timeout) throws IOException {
return read(dst, timeout, null, 0);
}

int read(ByteBuffer dst, ByteBuffer socketAddressBuffer, int options) throws IOException {
@SuppressWarnings({
"PMD.NcssCount", "PMD.CognitiveComplexity", "PMD.CyclomaticComplexity",
"PMD.NPathComplexity"})
@SuppressFBWarnings("NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE")
int read(ByteBuffer dst, AFSupplier<Integer> timeout, ByteBuffer socketAddressBuffer, int options)
throws IOException {
int remaining = dst.remaining();
if (remaining == 0) {
return 0;
Expand All @@ -137,56 +155,107 @@ int read(ByteBuffer dst, ByteBuffer socketAddressBuffer, int options) throws IOE
int pos;

boolean direct = dst.isDirect();
try (Lease<MutableHolder<ByteBuffer>> lease = direct ? null : getPrivateDirectByteBuffer(
remaining)) {
if (direct) {
buf = dst;
pos = dstPos;
} else {
buf = Objects.requireNonNull(lease).get().get();
remaining = Math.min(remaining, buf.remaining());
pos = buf.position();
}

if (!blocking) {
options |= NativeUnixSocket.OPT_NON_BLOCKING;
}
final boolean virtualBlocking = (ThreadUtil.isVirtualThread() && isBlocking())
|| isVirtualBlocking();
final long now;
if (virtualBlocking) {
now = System.currentTimeMillis();
} else {
now = 0;
}
if (virtualBlocking || !blocking) {
options |= NativeUnixSocket.OPT_NON_BLOCKING;
}

int count = NativeUnixSocket.receive(fdesc, buf, pos, remaining, socketAddressBuffer, options,
ancillaryDataSupport, 0);
if (count == -1) {
return count;
}
boolean park = false;

if (direct) {
if (count < 0) {
throw new IllegalStateException();
int count;
virtualThreadLoop : do {
if (virtualBlocking) {
if (park) {
VirtualThreadPoller.INSTANCE.parkThreadUntilReady(fdesc, SelectionKey.OP_WRITE, now,
timeout);
}
dst.position(pos + count);
} else {
int oldLimit = buf.limit();
if (count < oldLimit) {
buf.limit(count);
configureVirtualBlocking(true);
}

try (Lease<MutableHolder<ByteBuffer>> lease = direct ? null : getPrivateDirectByteBuffer(
remaining)) {
if (direct) {
buf = dst;
pos = dstPos;
} else {
buf = Objects.requireNonNull(Objects.requireNonNull(lease).get().get());
remaining = Math.min(remaining, buf.remaining());
pos = buf.position();
buf.limit(pos + remaining);
}

try {
while (buf.hasRemaining()) {
dst.put(buf);
count = NativeUnixSocket.receive(fdesc, buf, pos, remaining, socketAddressBuffer, options,
ancillaryDataSupport, 0);
if (count == 0 && virtualBlocking) {
// try again
park = true;
continue virtualThreadLoop;
}
} finally {
} catch (SocketTimeoutException e) {
if (virtualBlocking) {
// try again
park = true;
continue virtualThreadLoop;
} else {
throw e;
}
}

if (count == -1 || buf == null) {
return -1;
}

if (direct) {
if (count < 0) {
throw new IllegalStateException();
}
dst.position(pos + count);
} else {
int oldLimit = buf.limit();
if (count < oldLimit) {
buf.limit(oldLimit);
buf.limit(count);
}
try {
while (buf.hasRemaining()) {
dst.put(buf);
}
} finally {
if (count < oldLimit) {
buf.limit(oldLimit);
}
}
}
} finally {
if (virtualBlocking) {
configureVirtualBlocking(false);
}
}
return count;
}

break; // NOPMD.AvoidBranchingStatementAsLastInLoop virtualThreadLoop
} while (true); // NOPMD.WhileLoopWithLiteralBoolean

return count;
}

int write(ByteBuffer src) throws IOException {
return write(src, null, 0);
int write(ByteBuffer src, AFSupplier<Integer> timeout) throws IOException {
return write(src, timeout, null, 0);
}

int write(ByteBuffer src, SocketAddress target, int options) throws IOException {
@SuppressWarnings({
"PMD.NcssCount", "PMD.CognitiveComplexity", "PMD.CyclomaticComplexity",
"PMD.NPathComplexity"})
@SuppressFBWarnings("NP_NULL_ON_SOME_PATH_FROM_RETURN_VALUE")
int write(ByteBuffer src, AFSupplier<Integer> timeout, SocketAddress target, int options)
throws IOException {
int remaining = src.remaining();

if (remaining == 0) {
Expand All @@ -208,41 +277,81 @@ int write(ByteBuffer src, SocketAddress target, int options) throws IOException

// accept "send buffer overflow" as packet loss
// and don't retry (which may slow things down quite a bit)
if (!blocking) {
options |= NativeUnixSocket.OPT_NON_BLOCKING;
}

int pos = src.position();
boolean isDirect = src.isDirect();
ByteBuffer buf;
int bufPos;

try (Lease<MutableHolder<ByteBuffer>> lease = isDirect ? null : getPrivateDirectByteBuffer(
remaining)) {
if (isDirect) {
buf = src;
bufPos = pos;
} else {
buf = Objects.requireNonNull(lease).get().get();
remaining = Math.min(remaining, buf.remaining());
final boolean virtualBlocking = (ThreadUtil.isVirtualThread() && isBlocking())
|| isVirtualBlocking();
final long now;
if (virtualBlocking) {
now = System.currentTimeMillis();
} else {
now = 0;
}
if (virtualBlocking || !blocking) {
options |= NativeUnixSocket.OPT_NON_BLOCKING;
}
if (datagramMode) {
options |= NativeUnixSocket.OPT_DGRAM_MODE;
}

bufPos = buf.position();
int written;

while (src.hasRemaining() && buf.hasRemaining()) {
buf.put(src);
boolean park = false;
virtualThreadLoop : do {
if (virtualBlocking) {
if (park) {
VirtualThreadPoller.INSTANCE.parkThreadUntilReady(fdesc, SelectionKey.OP_WRITE, now,
timeout);
}

buf.position(bufPos);
}
if (datagramMode) {
options |= NativeUnixSocket.OPT_DGRAM_MODE;
configureVirtualBlocking(true);
}

int written = NativeUnixSocket.send(fdesc, buf, bufPos, remaining, addressTo, addressToLen,
options, ancillaryDataSupport);
src.position(pos + written);
return written;
}
try (Lease<MutableHolder<ByteBuffer>> lease = isDirect ? null : getPrivateDirectByteBuffer(
remaining)) {
if (isDirect) {
buf = src;
bufPos = pos;
} else {
buf = Objects.requireNonNull(Objects.requireNonNull(lease).get().get());
remaining = Math.min(remaining, buf.remaining());

bufPos = buf.position();

while (src.hasRemaining() && buf.hasRemaining()) {
buf.put(src);
}

buf.position(bufPos);
}

written = NativeUnixSocket.send(fdesc, buf, bufPos, remaining, addressTo, addressToLen,
options, ancillaryDataSupport);
if (written == 0 && virtualBlocking) {
// try again
park = true;
continue virtualThreadLoop;
}
} catch (SocketTimeoutException e) {
if (virtualBlocking) {
// try again
park = true;
continue virtualThreadLoop;
} else {
throw e;
}
} finally {
if (virtualBlocking) {
configureVirtualBlocking(false);
}
}
break; // NOPMD.AvoidBranchingStatementAsLastInLoop virtualThreadLoop
} while (true); // NOPMD.WhileLoopWithLiteralBoolean
src.position(pos + written);
return written;
}
}

Expand All @@ -256,7 +365,7 @@ int write(ByteBuffer src, SocketAddress target, int options) throws IOException
* @param capacity The desired capacity.
* @return A byte buffer satisfying the requested capacity.
*/
Lease<MutableHolder<ByteBuffer>> getPrivateDirectByteBuffer(int capacity) {
Lease<MutableHolder<@NonNull ByteBuffer>> getPrivateDirectByteBuffer(int capacity) {
if (capacity > TL_BUFFER_MAX_CAPACITY && TL_BUFFER_MAX_CAPACITY > 0) {
// Capacity exceeds configurable maximum limit;
// allocate but do not cache direct buffer.
Expand All @@ -278,8 +387,43 @@ Lease<MutableHolder<ByteBuffer>> getPrivateDirectByteBuffer(int capacity) {
}

void implConfigureBlocking(boolean block) throws IOException {
NativeUnixSocket.configureBlocking(validFdOrException(), block);
this.blocking = block;
if (block && isVirtualBlocking()) {
// do not actually change it here, defer it to when the virtual blocking counter goes to 0
} else {
NativeUnixSocket.configureBlocking(validFdOrException(), block);
}
}

/**
* Increments/decrements the "virtual blocking" counter (calls must be in pairs/balanced using
* try-finally blocks).
*
* @param enabled {@code true} if increment, {@code false} if decrement.
* @throws SocketException on error.
* @throws IOException on error, including count overflow/underflow.
*/
void configureVirtualBlocking(boolean enabled) throws SocketException, IOException {
int v;
if (enabled) {
if ((v = this.virtualBlockingLeases.incrementAndGet()) >= 1 && blocking) {
NativeUnixSocket.configureBlocking(validFdOrException(), false);
}
if (v >= Integer.MAX_VALUE) {
throw new IOException("blocking overflow");
}
} else {
if ((v = this.virtualBlockingLeases.decrementAndGet()) == 0 && blocking) {
NativeUnixSocket.configureBlocking(validFdOrException(), true);
}
if (v < 0) {
throw new IOException("blocking underflow");
}
}
}

boolean isVirtualBlocking() {
return virtualBlockingLeases.get() > 0;
}

boolean isBlocking() {
Expand Down
Loading

0 comments on commit c5962ba

Please sign in to comment.