From b9baf36dc6c3bbdee8c673a0c4ed07211d289560 Mon Sep 17 00:00:00 2001 From: Radim Vansa Date: Thu, 8 Jun 2023 14:41:04 +0200 Subject: [PATCH] Register a resource for EPollEventLoopGroup --- pom.xml | 5 +++ .../net/impl/transport/EpollTransport.java | 45 +++++++++++++++++++ 2 files changed, 50 insertions(+) diff --git a/pom.xml b/pom.xml index db991286e3f..994e68f6cf3 100644 --- a/pom.xml +++ b/pom.xml @@ -79,6 +79,11 @@ pom import + + io.netty + netty-transport-classes-epoll + 4.1.96.Final + diff --git a/src/main/java/io/vertx/core/net/impl/transport/EpollTransport.java b/src/main/java/io/vertx/core/net/impl/transport/EpollTransport.java index 83a1324870c..e3b2998ea38 100644 --- a/src/main/java/io/vertx/core/net/impl/transport/EpollTransport.java +++ b/src/main/java/io/vertx/core/net/impl/transport/EpollTransport.java @@ -18,6 +18,7 @@ import io.netty.channel.epoll.EpollChannelOption; import io.netty.channel.epoll.EpollDatagramChannel; import io.netty.channel.epoll.EpollDomainSocketChannel; +import io.netty.channel.epoll.EpollEventLoop; import io.netty.channel.epoll.EpollEventLoopGroup; import io.netty.channel.epoll.EpollServerDomainSocketChannel; import io.netty.channel.epoll.EpollServerSocketChannel; @@ -25,12 +26,18 @@ import io.netty.channel.socket.DatagramChannel; import io.netty.channel.socket.InternetProtocolFamily; import io.netty.channel.unix.DomainSocketAddress; +import io.netty.util.concurrent.EventExecutor; import io.vertx.core.datagram.DatagramSocketOptions; import io.vertx.core.net.ClientOptionsBase; import io.vertx.core.net.NetServerOptions; import io.vertx.core.net.impl.SocketAddressImpl; +import org.crac.Context; +import org.crac.Core; +import org.crac.Resource; import java.net.SocketAddress; +import java.util.WeakHashMap; +import java.util.concurrent.Phaser; import java.util.concurrent.ThreadFactory; /** @@ -40,6 +47,10 @@ class EpollTransport extends Transport { private static volatile int pendingFastOpenRequestsThreshold = 256; + // Keeps the EpollResource alive until the EpollEventLoopGroup is GC'ed + @SuppressWarnings("MismatchedQueryAndUpdateOfCollection") + private final WeakHashMap resources = new WeakHashMap<>(); + /** * Return the number of of pending TFO connections in SYN-RCVD state for TCP_FASTOPEN. * @@ -95,6 +106,9 @@ public Throwable unavailabilityCause() { public EventLoopGroup eventLoopGroup(int type, int nThreads, ThreadFactory threadFactory, int ioRatio) { EpollEventLoopGroup eventLoopGroup = new EpollEventLoopGroup(nThreads, threadFactory); eventLoopGroup.setIoRatio(ioRatio); + synchronized (resources) { + resources.put(eventLoopGroup, new EpollResource(eventLoopGroup)); + } return eventLoopGroup; } @@ -155,4 +169,35 @@ public void configure(ClientOptionsBase options, boolean domainSocket, Bootstrap } super.configure(options, domainSocket, bootstrap); } + + private static class EpollResource implements Resource { + private final EpollEventLoopGroup eventLoopGroup; + private final Phaser phaser; + + public EpollResource(EpollEventLoopGroup eventLoopGroup) { + this.eventLoopGroup = eventLoopGroup; + // contrary to Barrier the Phaser supports uninterruptible waiting + this.phaser = new Phaser(eventLoopGroup.executorCount() + 1); + Core.getGlobalContext().register(this); + } + + @Override + public void beforeCheckpoint(Context context) throws Exception { + for (EventExecutor executor : eventLoopGroup) { + EpollEventLoop eventLoop = (EpollEventLoop) executor; + executor.execute(() -> { + eventLoop.closeFileDescriptors(); + phaser.arriveAndAwaitAdvance(); + phaser.arriveAndAwaitAdvance(); + eventLoop.openFileDescriptors(); + }); + } + phaser.arriveAndAwaitAdvance(); + } + + @Override + public void afterRestore(Context context) throws Exception { + phaser.arriveAndAwaitAdvance(); + } + } }