Skip to content

Commit

Permalink
Rework AFSelector to normalize and simplify its locking
Browse files Browse the repository at this point in the history
* Place bulk of `select0` operations under `this` and
  `publicSelectedKeys` locks to reduce locking/unlocking thrash given
  frequency of locking operations
* Lock `publicSelectedKeys` instead of `selectedKeys` since that is what
  clients of AFSelector are locking on
* Eschew uses of synchronized methods in favor of locking `this` via
  synchronized blocks
* Use assertions to check lock invariants
  • Loading branch information
ThePumpingLemma committed Nov 7, 2023
1 parent b9695d6 commit 1346d2c
Showing 1 changed file with 88 additions and 78 deletions.
166 changes: 88 additions & 78 deletions junixsocket-common/src/main/java/org/newsclub/net/unix/AFSelector.java
Original file line number Diff line number Diff line change
Expand Up @@ -111,47 +111,51 @@ public int select() throws IOException {

@SuppressWarnings("PMD.CognitiveComplexity")
private int select0(int timeout) throws IOException {
PollFd pfd;
synchronized (this) {
if (!isOpen()) {
throw new ClosedSelectorException();
}
pfd = pollFd = initPollFd(pollFd);
processDeregisterQueue();
selectedKeys.clear();
}
int num;
try {
begin();
num = NativeUnixSocket.poll(pfd, timeout);
} finally {
end();
}
synchronized (this) {
selectedKeys.clear();
pfd = pollFd;
if (pfd != null) {
AFSelectionKey[] keys = pfd.keys;
if (keys != null) {
for (AFSelectionKey key : keys) {
if (key != null && key.hasOpInvalid()) {
SelectableChannel ch = key.channel();
if (ch != null && ch.isOpen()) {
ch.close();

synchronized (publicSelectedKeys) {
PollFd pfd = pollFd = initPollFd(pollFd);
processDeregisterQueue();
publicSelectedKeys.clear();

int num;
try {
begin();
num = NativeUnixSocket.poll(pfd, timeout);
} finally {
end();
}

publicSelectedKeys.clear();
pfd = pollFd;
if (pfd != null) {
AFSelectionKey[] keys = pfd.keys;
if (keys != null) {
for (AFSelectionKey key : keys) {
if (key != null && key.hasOpInvalid()) {
SelectableChannel ch = key.channel();
if (ch != null && ch.isOpen()) {
ch.close();
}
}
}
}
}
if (num > 0) {
consumeAllBytesAfterPoll();
setOpsReady(pfd); // updates keysSelected and numKeysSelected
}
return publicSelectedKeys.size();
}
if (num > 0) {
consumeAllBytesAfterPoll();
setOpsReady(pfd); // updates keysSelected and numKeysSelected
}
return selectedKeys.size();
}
}

private synchronized void consumeAllBytesAfterPoll() throws IOException {
private void consumeAllBytesAfterPoll() throws IOException {
assert Thread.holdsLock(this);

if (pollFd == null) {
return;
}
Expand Down Expand Up @@ -185,7 +189,10 @@ private synchronized void consumeAllBytesAfterPoll() throws IOException {
}
}

private synchronized void setOpsReady(PollFd pfd) {
private void setOpsReady(PollFd pfd) {
assert Thread.holdsLock(this);
assert Thread.holdsLock(publicSelectedKeys);

if (pfd != null) {
for (int i = 1; i < pfd.rops.length; i++) {
int rops = pfd.rops[i];
Expand All @@ -203,65 +210,65 @@ private synchronized void setOpsReady(PollFd pfd) {

@SuppressWarnings({"resource", "PMD.CognitiveComplexity"})
private PollFd initPollFd(PollFd existingPollFd) throws IOException {
synchronized (this) {
for (Iterator<AFSelectionKey> it = registeredKeys.iterator(); it.hasNext();) {
AFSelectionKey key = it.next();
if (!key.getAFCore().fd.valid() || !key.isValid()) {
key.cancel();
it.remove();
existingPollFd = null;
} else {
key.setOpsReady(0);
}
}
assert Thread.holdsLock(this);

if (existingPollFd != null && //
existingPollFd.keys != null && //
(existingPollFd.keys.length - 1) == registeredKeys.size()) {
boolean needsUpdate = false;
int i = 1;
for (AFSelectionKey key : registeredKeys) {
if (existingPollFd.keys[i] != key || !key.isValid()) { // NOPMD
needsUpdate = true;
break;
}
existingPollFd.ops[i] = key.interestOps();
for (Iterator<AFSelectionKey> it = registeredKeys.iterator(); it.hasNext();) {
AFSelectionKey key = it.next();
if (!key.getAFCore().fd.valid() || !key.isValid()) {
key.cancel();
it.remove();
existingPollFd = null;
} else {
key.setOpsReady(0);
}
}

i++;
if (existingPollFd != null && //
existingPollFd.keys != null && //
(existingPollFd.keys.length - 1) == registeredKeys.size()) {
boolean needsUpdate = false;
int i = 1;
for (AFSelectionKey key : registeredKeys) {
if (existingPollFd.keys[i] != key || !key.isValid()) { // NOPMD
needsUpdate = true;
break;
}
existingPollFd.ops[i] = key.interestOps();

if (!needsUpdate) {
return existingPollFd;
}
i++;
}

int keysToPoll = registeredKeys.size();
for (AFSelectionKey key : registeredKeys) {
if (!key.isValid()) {
keysToPoll--;
}
if (!needsUpdate) {
return existingPollFd;
}
}

int keysToPoll = registeredKeys.size();
for (AFSelectionKey key : registeredKeys) {
if (!key.isValid()) {
keysToPoll--;
}
}

int size = keysToPoll + 1;
FileDescriptor[] fds = new FileDescriptor[size];
int[] ops = new int[size];
int size = keysToPoll + 1;
FileDescriptor[] fds = new FileDescriptor[size];
int[] ops = new int[size];

AFSelectionKey[] keys = new AFSelectionKey[size];
fds[0] = selectorPipe.sourceFD();
ops[0] = SelectionKey.OP_READ;
AFSelectionKey[] keys = new AFSelectionKey[size];
fds[0] = selectorPipe.sourceFD();
ops[0] = SelectionKey.OP_READ;

int i = 1;
for (AFSelectionKey key : registeredKeys) {
if (!key.isValid()) {
continue;
}
keys[i] = key;
fds[i] = key.getAFCore().fd;
ops[i] = key.interestOps();
i++;
int i = 1;
for (AFSelectionKey key : registeredKeys) {
if (!key.isValid()) {
continue;
}
return new PollFd(keys, fds, ops);
keys[i] = key;
fds[i] = key.getAFCore().fd;
ops[i] = key.interestOps();
i++;
}
return new PollFd(keys, fds, ops);
}

@Override
Expand Down Expand Up @@ -309,10 +316,13 @@ public void cancel(AFSelectionKey key) {
}

void processDeregisterQueue() {
assert Thread.holdsLock(this);
assert Thread.holdsLock(publicSelectedKeys);

synchronized (cancelledKeys) {
SelectionKey key;
while ((key = cancelledKeys.pollFirst()) != null) {
selectedKeys.remove(key);
publicSelectedKeys.remove(key);
deregister((AFSelectionKey) key);
pollFd = null;
}
Expand Down

0 comments on commit 1346d2c

Please sign in to comment.