Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Register a resource for EPollEventLoopGroup #1

Draft
wants to merge 4 commits into
base: 4.3_crac
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,16 @@
[![Build Status](https://github.com/eclipse-vertx/vert.x/workflows/CI/badge.svg?branch=master)](https://github.com/eclipse-vertx/vert.x/actions?query=workflow%3ACI)

## Fork information

This is a forked version of the Vert.x project intended to be used before the support for CRaC lands in the official version. The artifact should land on these Maven coordinates:

```
<dependency>
<groupId>io.github.crac.io.vertx</groupId>
<artifactId>vertx-core</artifactId>
<version><!-- version matching Vert.x Core release</version>
</dependency>
```


## Vert.x Core

Expand Down
20 changes: 17 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
<version>19</version>
</parent>

<groupId>io.github.crac.io.vertx</groupId>
<artifactId>vertx-core</artifactId>
<version>4.3.9-SNAPSHOT</version>

Expand All @@ -38,9 +39,9 @@
</licenses>

<scm>
<connection>scm:git:[email protected]:eclipse/vert.x.git</connection>
<developerConnection>scm:git:[email protected]:eclipse/vert.x.git</developerConnection>
<url>[email protected]:eclipse/vert.x.git</url>
<connection>scm:git:[email protected]:CRaC/vert.x.git</connection>
<developerConnection>scm:git:[email protected]:CRaC/vert.x.git</developerConnection>
<url>[email protected]:CRaC/vert.x.git</url>
</scm>

<properties>
Expand All @@ -55,6 +56,7 @@
<vertx.testNativeTransport>false</vertx.testNativeTransport>
<vertx.testDomainSockets>false</vertx.testDomainSockets>
<jar.manifest>${project.basedir}/src/main/resources/META-INF/MANIFEST.MF</jar.manifest>
<org.crac.version>0.1.3</org.crac.version>
</properties>

<dependencyManagement>
Expand All @@ -66,6 +68,11 @@
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-classes-epoll</artifactId>
<version>4.1.94.Final-SNAPSHOT</version>
</dependency>
</dependencies>
</dependencyManagement>

Expand Down Expand Up @@ -167,6 +174,13 @@
<optional>true</optional>
</dependency>

<!-- CRaC -->
<dependency>
<groupId>org.crac</groupId>
<artifactId>crac</artifactId>
<version>${org.crac.version}</version>
</dependency>

<!-- Testing -->
<dependency>
<groupId>junit</groupId>
Expand Down
52 changes: 52 additions & 0 deletions src/main/java/io/vertx/core/net/impl/NetClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,17 @@
import io.vertx.core.spi.metrics.Metrics;
import io.vertx.core.spi.metrics.MetricsProvider;
import io.vertx.core.spi.metrics.TCPMetrics;
import org.crac.CheckpointException;
import org.crac.Context;
import org.crac.Core;
import org.crac.Resource;

import java.io.FileNotFoundException;
import java.net.ConnectException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

Expand All @@ -73,6 +80,8 @@ public class NetClientImpl implements MetricsProvider, NetClient, Closeable {
private final CloseFuture closeFuture;
private final Predicate<SocketAddress> proxyFilter;

private final ResourceImpl cracResource = new ResourceImpl();

public NetClientImpl(VertxInternal vertx, TCPMetrics metrics, NetClientOptions options, CloseFuture closeFuture) {
this.vertx = vertx;
this.channelGroup = new DefaultChannelGroup(vertx.getAcceptorEventLoopGroup().next());
Expand All @@ -86,6 +95,7 @@ public NetClientImpl(VertxInternal vertx, TCPMetrics metrics, NetClientOptions o
this.idleTimeoutUnit = options.getIdleTimeoutUnit();
this.closeFuture = closeFuture;
this.proxyFilter = options.getNonProxyHosts() != null ? ProxyFilter.nonProxyHosts(options.getNonProxyHosts()) : ProxyFilter.DEFAULT_PROXY_FILTER;
Core.getGlobalContext().register(cracResource);
}

protected void initChannel(ChannelPipeline pipeline) {
Expand Down Expand Up @@ -258,6 +268,11 @@ private void connectInternal2(ProxyOptions proxyOptions,
EventLoop eventLoop = context.nettyEventLoop();

if (eventLoop.inEventLoop()) {
if (cracResource.delayed != null) {
cracResource.delayed.add(() -> connectInternal2(proxyOptions, remoteAddress, peerAddress, serverName, ssl, useAlpn, registerWriteHandlers, connectHandler, context, remainingAttempts));
return;
}

Objects.requireNonNull(connectHandler, "No null connectHandler accepted");
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoop);
Expand Down Expand Up @@ -319,5 +334,42 @@ private void failed(ContextInternal context, Channel ch, Throwable th, Promise<N
}
context.emit(th, connectHandler::tryFail);
}

private class ResourceImpl implements Resource {
private List<Runnable> delayed;

@Override
public void beforeCheckpoint(Context<? extends Resource> context) {
// We cannot block the whole eventloop because there might be other resources
// using that. We will delay all new connections until restore instead.
CountDownLatch latch = new CountDownLatch(1);
vertx.getOrCreateContext().runOnContext(ignored -> {
delayed = new ArrayList<>();
channelGroup.close();
latch.countDown();
});
try {
latch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

@Override
public void afterRestore(Context<? extends Resource> context) {
CountDownLatch latch = new CountDownLatch(1);
vertx.getOrCreateContext().runOnContext(ignored -> {
List<Runnable> toRun = delayed;
delayed = null;
toRun.forEach(Runnable::run);
latch.countDown();
});
try {
latch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}

14 changes: 14 additions & 0 deletions src/main/java/io/vertx/core/net/impl/pool/ConnectionPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -149,4 +149,18 @@ static <C> ConnectionPool<C> pool(PoolConnector<C> connector, int[] maxSizes, in
* to take decisions, this can be used for statistic or testing purpose
*/
int requests();

/**
* Removes all connections from the pool and returns them in the handler. The pool
* is blocked from creating new connections until {@link #resume()} is invoked.
*
* @param handler the callback handler with the result
*/
void suspend(Handler<AsyncResult<List<Future<C>>>> handler);

/**
* Allows a {@link #suspend suspended} connection pool to continue allocating
* and serving new connections.
*/
void resume();
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,7 @@
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.EventLoopContext;

import java.util.AbstractList;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.*;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;
Expand Down Expand Up @@ -903,4 +899,62 @@ public int size() {
return size;
}
}

@Override
public void suspend(Handler<AsyncResult<List<Future<C>>>> handler) {
execute(new Suspend<>(handler));
}

private static class Suspend<C> implements Executor.Action<SimpleConnectionPool<C>> {
private final Handler<AsyncResult<List<Future<C>>>> handler;

private Suspend(Handler<AsyncResult<List<Future<C>>>> handler) {
this.handler = handler;
}

@Override
public Task execute(SimpleConnectionPool<C> pool) {
if (pool.closed) {
return new Task() {
@Override
public void run() {
handler.handle(Future.succeededFuture(Collections.emptyList()));
}
};
}
List<Future<C>> list = new ArrayList<>();
for (int i = 0; i < pool.size;i++) {
Slot<C> slot = pool.slots[i];
pool.slots[i] = null;
PoolWaiter<C> waiter = slot.initiator;
if (waiter != null) {
pool.waiters.addFirst(slot.initiator);
slot.initiator = null;
}
list.add(slot.result.future());
}
pool.size = 0;
// prevent creating further connections
pool.capacity = pool.maxCapacity;
return new Task() {
@Override
public void run() {
handler.handle(Future.succeededFuture(list));
}
};
}
}

@Override
public void resume() {
execute(new Resume<>());
}

private static class Resume<C> implements Executor.Action<SimpleConnectionPool<C>> {
@Override
public Task execute(SimpleConnectionPool<C> pool) {
pool.capacity = 0;
return null;
}
}
}
45 changes: 45 additions & 0 deletions src/main/java/io/vertx/core/net/impl/transport/EpollTransport.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,26 @@
import io.netty.channel.epoll.EpollChannelOption;
import io.netty.channel.epoll.EpollDatagramChannel;
import io.netty.channel.epoll.EpollDomainSocketChannel;
import io.netty.channel.epoll.EpollEventLoop;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerDomainSocketChannel;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.epoll.EpollSocketChannel;
import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.InternetProtocolFamily;
import io.netty.channel.unix.DomainSocketAddress;
import io.netty.util.concurrent.EventExecutor;
import io.vertx.core.datagram.DatagramSocketOptions;
import io.vertx.core.net.ClientOptionsBase;
import io.vertx.core.net.NetServerOptions;
import io.vertx.core.net.impl.SocketAddressImpl;
import org.crac.Context;
import org.crac.Core;
import org.crac.Resource;

import java.net.SocketAddress;
import java.util.WeakHashMap;
import java.util.concurrent.Phaser;
import java.util.concurrent.ThreadFactory;

/**
Expand All @@ -40,6 +47,10 @@ class EpollTransport extends Transport {

private static volatile int pendingFastOpenRequestsThreshold = 256;

// Keeps the EpollResource alive until the EpollEventLoopGroup is GC'ed
@SuppressWarnings("MismatchedQueryAndUpdateOfCollection")
private final WeakHashMap<EpollEventLoopGroup, EpollResource> resources = new WeakHashMap<>();

/**
* Return the number of of pending TFO connections in SYN-RCVD state for TCP_FASTOPEN.
*
Expand Down Expand Up @@ -95,6 +106,9 @@ public Throwable unavailabilityCause() {
public EventLoopGroup eventLoopGroup(int type, int nThreads, ThreadFactory threadFactory, int ioRatio) {
EpollEventLoopGroup eventLoopGroup = new EpollEventLoopGroup(nThreads, threadFactory);
eventLoopGroup.setIoRatio(ioRatio);
synchronized (resources) {
resources.put(eventLoopGroup, new EpollResource(eventLoopGroup));
}
return eventLoopGroup;
}

Expand Down Expand Up @@ -155,4 +169,35 @@ public void configure(ClientOptionsBase options, boolean domainSocket, Bootstrap
}
super.configure(options, domainSocket, bootstrap);
}

private static class EpollResource implements Resource {
private final EpollEventLoopGroup eventLoopGroup;
private final Phaser phaser;

public EpollResource(EpollEventLoopGroup eventLoopGroup) {
this.eventLoopGroup = eventLoopGroup;
// contrary to Barrier the Phaser supports uninterruptible waiting
this.phaser = new Phaser(eventLoopGroup.executorCount() + 1);
Core.getGlobalContext().register(this);
}

@Override
public void beforeCheckpoint(Context<? extends Resource> context) throws Exception {
for (EventExecutor executor : eventLoopGroup) {
EpollEventLoop eventLoop = (EpollEventLoop) executor;
executor.execute(() -> {
eventLoop.closeFileDescriptors();
phaser.arriveAndAwaitAdvance();
phaser.arriveAndAwaitAdvance();
eventLoop.openFileDescriptors();
});
}
phaser.arriveAndAwaitAdvance();
}

@Override
public void afterRestore(Context<? extends Resource> context) throws Exception {
phaser.arriveAndAwaitAdvance();
}
}
}