Skip to content

Commit

Permalink
Register a resource for EPollEventLoopGroup
Browse files Browse the repository at this point in the history
  • Loading branch information
rvansa committed Aug 14, 2023
1 parent 913af68 commit 7c4b82b
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 0 deletions.
5 changes: 5 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,11 @@
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-classes-epoll</artifactId>
<version>4.1.96.Final</version>
</dependency>
</dependencies>
</dependencyManagement>

Expand Down
45 changes: 45 additions & 0 deletions src/main/java/io/vertx/core/net/impl/transport/EpollTransport.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,26 @@
import io.netty.channel.epoll.EpollChannelOption;
import io.netty.channel.epoll.EpollDatagramChannel;
import io.netty.channel.epoll.EpollDomainSocketChannel;
import io.netty.channel.epoll.EpollEventLoop;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerDomainSocketChannel;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.epoll.EpollSocketChannel;
import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.InternetProtocolFamily;
import io.netty.channel.unix.DomainSocketAddress;
import io.netty.util.concurrent.EventExecutor;
import io.vertx.core.datagram.DatagramSocketOptions;
import io.vertx.core.net.ClientOptionsBase;
import io.vertx.core.net.NetServerOptions;
import io.vertx.core.net.impl.SocketAddressImpl;
import org.crac.Context;
import org.crac.Core;
import org.crac.Resource;

import java.net.SocketAddress;
import java.util.WeakHashMap;
import java.util.concurrent.Phaser;
import java.util.concurrent.ThreadFactory;

/**
Expand All @@ -40,6 +47,10 @@ class EpollTransport extends Transport {

private static volatile int pendingFastOpenRequestsThreshold = 256;

// Keeps the EpollResource alive until the EpollEventLoopGroup is GC'ed
@SuppressWarnings("MismatchedQueryAndUpdateOfCollection")
private final WeakHashMap<EpollEventLoopGroup, EpollResource> resources = new WeakHashMap<>();

/**
* Return the number of of pending TFO connections in SYN-RCVD state for TCP_FASTOPEN.
*
Expand Down Expand Up @@ -95,6 +106,9 @@ public Throwable unavailabilityCause() {
public EventLoopGroup eventLoopGroup(int type, int nThreads, ThreadFactory threadFactory, int ioRatio) {
EpollEventLoopGroup eventLoopGroup = new EpollEventLoopGroup(nThreads, threadFactory);
eventLoopGroup.setIoRatio(ioRatio);
synchronized (resources) {
resources.put(eventLoopGroup, new EpollResource(eventLoopGroup));
}
return eventLoopGroup;
}

Expand Down Expand Up @@ -155,4 +169,35 @@ public void configure(ClientOptionsBase options, boolean domainSocket, Bootstrap
}
super.configure(options, domainSocket, bootstrap);
}

private static class EpollResource implements Resource {
private final EpollEventLoopGroup eventLoopGroup;
private final Phaser phaser;

public EpollResource(EpollEventLoopGroup eventLoopGroup) {
this.eventLoopGroup = eventLoopGroup;
// contrary to Barrier the Phaser supports uninterruptible waiting
this.phaser = new Phaser(eventLoopGroup.executorCount() + 1);
Core.getGlobalContext().register(this);
}

@Override
public void beforeCheckpoint(Context<? extends Resource> context) throws Exception {
for (EventExecutor executor : eventLoopGroup) {
EpollEventLoop eventLoop = (EpollEventLoop) executor;
executor.execute(() -> {
eventLoop.closeFileDescriptors();
phaser.arriveAndAwaitAdvance();
phaser.arriveAndAwaitAdvance();
eventLoop.openFileDescriptors();
});
}
phaser.arriveAndAwaitAdvance();
}

@Override
public void afterRestore(Context<? extends Resource> context) throws Exception {
phaser.arriveAndAwaitAdvance();
}
}
}

0 comments on commit 7c4b82b

Please sign in to comment.