From a9c99d1531e8a5dda38ed410dd85bf7234d086d1 Mon Sep 17 00:00:00 2001 From: Kevin Read Date: Wed, 24 Jan 2024 13:45:58 +0100 Subject: [PATCH 1/9] Add test for opening a local Polling-only socket.IO server and running a test with multiple messages in one HTTP body. Bump surefire to v3 so we can execute tests with Java modules. --- pom.xml | 21 +- .../socketio/transport/HttpTransportTest.java | 209 ++++++++++++++++++ 2 files changed, 228 insertions(+), 2 deletions(-) create mode 100644 src/test/java/com/corundumstudio/socketio/transport/HttpTransportTest.java diff --git a/pom.xml b/pom.xml index a1674c07..7e0a9730 100644 --- a/pom.xml +++ b/pom.xml @@ -50,6 +50,7 @@ true UTF-8 4.1.100.Final + 11 @@ -355,7 +356,7 @@ default-compile - 9 + 11 @@ -373,6 +374,20 @@ + + + default-testCompile + process-test-sources + + testCompile + + + ${testCompileSource} + ${testCompileSource} + ${testCompileSource} + --add-modules=java.net.http + + @@ -398,10 +413,12 @@ org.apache.maven.plugins maven-surefire-plugin - 2.22.2 + 3.2.5 -javaagent:"${settings.localRepository}"/org/jmockit/jmockit/1.46/jmockit-1.46.jar + --add-reads netty.socketio=java.net.http + --add-modules=java.net.http diff --git a/src/test/java/com/corundumstudio/socketio/transport/HttpTransportTest.java b/src/test/java/com/corundumstudio/socketio/transport/HttpTransportTest.java new file mode 100644 index 00000000..7b1d1e64 --- /dev/null +++ b/src/test/java/com/corundumstudio/socketio/transport/HttpTransportTest.java @@ -0,0 +1,209 @@ +/** + * Copyright (c) 2012-2023 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.corundumstudio.socketio.transport; + +import com.corundumstudio.socketio.Configuration; +import com.corundumstudio.socketio.SocketConfig; +import com.corundumstudio.socketio.SocketIOClient; +import com.corundumstudio.socketio.SocketIOServer; +import com.corundumstudio.socketio.Transport; +import com.corundumstudio.socketio.listener.ExceptionListener; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.netty.channel.ChannelHandlerContext; +import java.io.IOException; +import java.net.ServerSocket; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpRequest.BodyPublishers; +import java.net.http.HttpResponse; +import java.net.http.HttpResponse.BodyHandlers; +import java.util.ArrayList; +import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class HttpTransportTest { + + private SocketIOServer server; + + private ObjectMapper mapper = new ObjectMapper(); + + private Pattern responseJsonMatcher = Pattern.compile("([0-9]+)(\\{.*\\})?"); + + private Pattern multiResponsePattern = Pattern.compile("((?[0-9])(?[0-9]*)(?.+)\\x{1E})*(?[0-9])(?[0-9]*)(?.+)"); + + private final String packetSeparator = new String(new byte[] { 0x1e }); + + private Logger logger = LoggerFactory.getLogger(HttpTransportTest.class); + + @Before + public void createTestServer() { + final int port = findFreePort(); + final Configuration config = new Configuration(); + config.setRandomSession(true); + config.setTransports(Transport.POLLING); + config.setPort(port); + config.setExceptionListener(new ExceptionListener() { + @Override + public void onEventException(Exception e, List args, SocketIOClient client) { + logger.error("eventException", e); + } + + @Override + public void onDisconnectException(Exception e, SocketIOClient client) { + logger.error("disconnectException", e); + } + + @Override + public void onConnectException(Exception e, SocketIOClient client) { + logger.error("connectException", e); + } + + @Override + public void onPingException(Exception e, SocketIOClient client) { + logger.error("pingException", e); + } + + @Override + public void onPongException(Exception e, SocketIOClient client) { + logger.error("pongException", e); + } + + @Override + public boolean exceptionCaught(ChannelHandlerContext ctx, Throwable e) throws Exception { + return false; + } + + @Override + public void onAuthException(Throwable e, SocketIOClient client) { + logger.error("authException", e); + } + }); + + final SocketConfig socketConfig = new SocketConfig(); + socketConfig.setReuseAddress(true); + config.setSocketConfig(socketConfig); + + this.server = new SocketIOServer(config); + this.server.start(); + } + + @After + public void cleanupTestServer() { + this.server.stop(); + } + + private URI createTestServerUri(final String query) throws URISyntaxException { + return new URI("http", null , "localhost", server.getConfiguration().getPort(), server.getConfiguration().getContext() + "/", + query, null); + } + + private HttpResponse makeSocketIoRequest(final String sessionId, final String bodyForPost) + throws URISyntaxException, IOException, InterruptedException { + final URI uri = createTestServerUri("EIO=4&transport=polling&t=Oqd9eWh" + (sessionId == null ? "" : "&sid=" + sessionId)); + HttpClient client = HttpClient.newHttpClient(); + final var builder = HttpRequest.newBuilder() + .uri(uri); + if (bodyForPost != null) { + builder.POST(BodyPublishers.ofString(bodyForPost)); + } else { + builder.GET(); + } + return client.send(builder.build(), BodyHandlers.ofString()); + } + + private void postMessage(final String sessionId, final String body) + throws URISyntaxException, IOException, InterruptedException { + HttpResponse response = makeSocketIoRequest(sessionId, body); + final String responseStr = response.body(); + Assert.assertEquals(responseStr, "ok"); + } + + private String[] pollForListOfResponses(final String sessionId) + throws URISyntaxException, IOException, InterruptedException { + HttpResponse response = makeSocketIoRequest(sessionId, null); + final String responseStr = response.body(); + return responseStr.split(packetSeparator); + } + + private String connectForSessionId(final String sessionId) + throws URISyntaxException, IOException, InterruptedException { + final String firstMessage = pollForListOfResponses(sessionId)[0]; + final Matcher jsonMatcher = responseJsonMatcher.matcher(firstMessage); + Assert.assertTrue(jsonMatcher.find()); + Assert.assertEquals(jsonMatcher.group(1), "0"); + final JsonNode node = mapper.readTree(jsonMatcher.group(2)); + return node.get("sid").asText(); + } + + @Test + public void testConnect() throws URISyntaxException, IOException, InterruptedException { + final String sessionId = connectForSessionId(null); + Assert.assertNotNull(sessionId); + } + + @Test + public void testMultipleMessages() throws URISyntaxException, IOException, InterruptedException { + server.addEventListener("hello", String.class, (client, data, ackSender) -> + ackSender.sendAckData(data)); + final String sessionId = connectForSessionId(null); + final ArrayList events = new ArrayList<>(); + events.add("420[\"hello\", \"world\"]"); + events.add("421[\"hello\", \"socketio\"]"); + postMessage(sessionId, events.stream().collect(Collectors.joining(packetSeparator))); + final String[] responses = pollForListOfResponses(sessionId); + Assert.assertEquals(responses.length, 2); + } + + /** + * Returns a free port number on localhost. + * + * Heavily inspired from org.eclipse.jdt.launching.SocketUtil (to avoid a dependency to JDT just because of this). + * Slightly improved with close() missing in JDT. And throws exception instead of returning -1. + * + * @return a free port number on localhost + * @throws IllegalStateException if unable to find a free port + */ + private static int findFreePort() { + ServerSocket socket = null; + try { + socket = new ServerSocket(0); + socket.setReuseAddress(true); + return socket.getLocalPort(); + } catch (IOException ignored) { + } finally { + if (socket != null) { + try { + socket.close(); + } catch (IOException ignored) { + } + } + } + throw new IllegalStateException("Could not find a free TCP/IP port to start embedded SocketIO Server on"); + } + +} From 71755eed53211c496a56404940d4c59d24281a71 Mon Sep 17 00:00:00 2001 From: Kevin Read Date: Wed, 24 Jan 2024 13:46:38 +0100 Subject: [PATCH 2/9] Implement v3/v4 parsing of multiple messages in one HTTP polling body. --- .../socketio/handler/InPacketHandler.java | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/corundumstudio/socketio/handler/InPacketHandler.java b/src/main/java/com/corundumstudio/socketio/handler/InPacketHandler.java index 5e850315..7cd4cdd7 100644 --- a/src/main/java/com/corundumstudio/socketio/handler/InPacketHandler.java +++ b/src/main/java/com/corundumstudio/socketio/handler/InPacketHandler.java @@ -63,7 +63,16 @@ protected void channelRead0(io.netty.channel.ChannelHandlerContext ctx, PacketsM } while (content.isReadable()) { try { - Packet packet = decoder.decodePackets(content, client); + final ByteBuf packetBuf; + final int separatorPos = content.bytesBefore((byte) 0x1E); + if (separatorPos != -1) { + // Multiple packets in one, copy out the next packet to parse + packetBuf = content.copy(content.readerIndex(), separatorPos); + content.readerIndex(separatorPos + 1); + } else { + packetBuf = content; + } + Packet packet = decoder.decodePackets(packetBuf, client); Namespace ns = namespacesHub.get(packet.getNsp()); if (ns == null) { From 314cd71d9a1ea59261623769a16a36628284c804 Mon Sep 17 00:00:00 2001 From: Kevin Read Date: Wed, 24 Jan 2024 15:02:13 +0100 Subject: [PATCH 3/9] Move packet splitting from InPacketHandler to PacketDecoder to not mess with binary packets --- .../socketio/handler/InPacketHandler.java | 11 +-------- .../socketio/protocol/PacketDecoder.java | 23 ++++++++++++++----- 2 files changed, 18 insertions(+), 16 deletions(-) diff --git a/src/main/java/com/corundumstudio/socketio/handler/InPacketHandler.java b/src/main/java/com/corundumstudio/socketio/handler/InPacketHandler.java index 7cd4cdd7..5e850315 100644 --- a/src/main/java/com/corundumstudio/socketio/handler/InPacketHandler.java +++ b/src/main/java/com/corundumstudio/socketio/handler/InPacketHandler.java @@ -63,16 +63,7 @@ protected void channelRead0(io.netty.channel.ChannelHandlerContext ctx, PacketsM } while (content.isReadable()) { try { - final ByteBuf packetBuf; - final int separatorPos = content.bytesBefore((byte) 0x1E); - if (separatorPos != -1) { - // Multiple packets in one, copy out the next packet to parse - packetBuf = content.copy(content.readerIndex(), separatorPos); - content.readerIndex(separatorPos + 1); - } else { - packetBuf = content; - } - Packet packet = decoder.decodePackets(packetBuf, client); + Packet packet = decoder.decodePackets(content, client); Namespace ns = namespacesHub.get(packet.getNsp()); if (ns == null) { diff --git a/src/main/java/com/corundumstudio/socketio/protocol/PacketDecoder.java b/src/main/java/com/corundumstudio/socketio/protocol/PacketDecoder.java index 8930f18b..cc00ae0f 100644 --- a/src/main/java/com/corundumstudio/socketio/protocol/PacketDecoder.java +++ b/src/main/java/com/corundumstudio/socketio/protocol/PacketDecoder.java @@ -147,23 +147,34 @@ private Packet decode(ClientHead head, ByteBuf frame) throws IOException { || frame.getByte(0) == 4 || frame.getByte(0) == 1) { return parseBinary(head, frame); } - PacketType type = readType(frame); + + final int separatorPos = frame.bytesBefore((byte) 0x1E); + final ByteBuf packetBuf; + if (separatorPos != -1) { + // Multiple packets in one, copy out the next packet to parse + packetBuf = frame.copy(frame.readerIndex(), separatorPos); + frame.readerIndex(separatorPos + 1); + } else { + packetBuf = frame; + } + + PacketType type = readType(packetBuf); Packet packet = new Packet(type, head.getEngineIOVersion()); if (type == PacketType.PING) { - packet.setData(readString(frame)); + packet.setData(readString(packetBuf)); return packet; } - if (!frame.isReadable()) { + if (!packetBuf.isReadable()) { return packet; } - PacketType innerType = readInnerType(frame); + PacketType innerType = readInnerType(packetBuf); packet.setSubType(innerType); - parseHeader(frame, packet, innerType); - parseBody(head, frame, packet); + parseHeader(packetBuf, packet, innerType); + parseBody(head, packetBuf, packet); return packet; } From 6a763ec32703c616563d9dd3013956eab35bebdc Mon Sep 17 00:00:00 2001 From: Kevin Read Date: Wed, 24 Jan 2024 15:03:29 +0100 Subject: [PATCH 4/9] Netty version bump from master --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 7e0a9730..c4ae2f47 100644 --- a/pom.xml +++ b/pom.xml @@ -49,7 +49,7 @@ true UTF-8 - 4.1.100.Final + 4.1.105.Final 11 From 084ae1d62f8ca8027f647c3d032bff355599a895 Mon Sep 17 00:00:00 2001 From: Kevin Read Date: Wed, 24 Jan 2024 15:37:05 +0100 Subject: [PATCH 5/9] Fix: wrong assumption about buffer movement when splitting packets --- .../com/corundumstudio/socketio/protocol/PacketDecoder.java | 4 ++-- .../corundumstudio/socketio/transport/HttpTransportTest.java | 3 ++- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/corundumstudio/socketio/protocol/PacketDecoder.java b/src/main/java/com/corundumstudio/socketio/protocol/PacketDecoder.java index cc00ae0f..73921d26 100644 --- a/src/main/java/com/corundumstudio/socketio/protocol/PacketDecoder.java +++ b/src/main/java/com/corundumstudio/socketio/protocol/PacketDecoder.java @@ -150,10 +150,10 @@ private Packet decode(ClientHead head, ByteBuf frame) throws IOException { final int separatorPos = frame.bytesBefore((byte) 0x1E); final ByteBuf packetBuf; - if (separatorPos != -1) { + if (separatorPos > 0) { // Multiple packets in one, copy out the next packet to parse packetBuf = frame.copy(frame.readerIndex(), separatorPos); - frame.readerIndex(separatorPos + 1); + frame.readerIndex(frame.readerIndex() + separatorPos + 1); } else { packetBuf = frame; } diff --git a/src/test/java/com/corundumstudio/socketio/transport/HttpTransportTest.java b/src/test/java/com/corundumstudio/socketio/transport/HttpTransportTest.java index 7b1d1e64..44261559 100644 --- a/src/test/java/com/corundumstudio/socketio/transport/HttpTransportTest.java +++ b/src/test/java/com/corundumstudio/socketio/transport/HttpTransportTest.java @@ -174,9 +174,10 @@ public void testMultipleMessages() throws URISyntaxException, IOException, Inter final ArrayList events = new ArrayList<>(); events.add("420[\"hello\", \"world\"]"); events.add("421[\"hello\", \"socketio\"]"); + events.add("422[\"hello\", \"socketio\"]"); postMessage(sessionId, events.stream().collect(Collectors.joining(packetSeparator))); final String[] responses = pollForListOfResponses(sessionId); - Assert.assertEquals(responses.length, 2); + Assert.assertEquals(responses.length, 3); } /** From 953b031fafc54248ab147d7c05164aea5fadebf9 Mon Sep 17 00:00:00 2001 From: Kevin Read Date: Thu, 25 Jan 2024 10:52:03 +0100 Subject: [PATCH 6/9] Refactor test to use java 8. Run tests with Java 8. --- pom.xml | 10 ----- .../socketio/transport/HttpTransportTest.java | 45 ++++++++++++------- 2 files changed, 29 insertions(+), 26 deletions(-) diff --git a/pom.xml b/pom.xml index 4730c963..bd016ea5 100644 --- a/pom.xml +++ b/pom.xml @@ -50,7 +50,6 @@ true UTF-8 4.1.105.Final - 11 @@ -374,19 +373,12 @@ - default-testCompile process-test-sources testCompile - - ${testCompileSource} - ${testCompileSource} - ${testCompileSource} - --add-modules=java.net.http - @@ -417,8 +409,6 @@ -javaagent:"${settings.localRepository}"/org/jmockit/jmockit/1.46/jmockit-1.46.jar - --add-reads netty.socketio=java.net.http - --add-modules=java.net.http diff --git a/src/test/java/com/corundumstudio/socketio/transport/HttpTransportTest.java b/src/test/java/com/corundumstudio/socketio/transport/HttpTransportTest.java index 44261559..0e1f60b6 100644 --- a/src/test/java/com/corundumstudio/socketio/transport/HttpTransportTest.java +++ b/src/test/java/com/corundumstudio/socketio/transport/HttpTransportTest.java @@ -25,15 +25,16 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import io.netty.channel.ChannelHandlerContext; +import java.io.BufferedReader; import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.net.HttpURLConnection; import java.net.ServerSocket; import java.net.URI; import java.net.URISyntaxException; -import java.net.http.HttpClient; -import java.net.http.HttpRequest; -import java.net.http.HttpRequest.BodyPublishers; -import java.net.http.HttpResponse; -import java.net.http.HttpResponse.BodyHandlers; +import java.net.URLConnection; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; import java.util.regex.Matcher; @@ -122,31 +123,43 @@ private URI createTestServerUri(final String query) throws URISyntaxException { query, null); } - private HttpResponse makeSocketIoRequest(final String sessionId, final String bodyForPost) + private String makeSocketIoRequest(final String sessionId, final String bodyForPost) throws URISyntaxException, IOException, InterruptedException { final URI uri = createTestServerUri("EIO=4&transport=polling&t=Oqd9eWh" + (sessionId == null ? "" : "&sid=" + sessionId)); - HttpClient client = HttpClient.newHttpClient(); - final var builder = HttpRequest.newBuilder() - .uri(uri); + + URLConnection con = uri.toURL().openConnection(); + HttpURLConnection http = (HttpURLConnection)con; + if (bodyForPost != null) { + http.setRequestMethod("POST"); // PUT is another valid option + http.setDoOutput(true); + } + if (bodyForPost != null) { - builder.POST(BodyPublishers.ofString(bodyForPost)); + byte[] out = bodyForPost.toString().getBytes(StandardCharsets.UTF_8); + http.setFixedLengthStreamingMode(out.length); + http.setRequestProperty("Content-Type", "application/x-www-form-urlencoded; charset=UTF-8"); + http.connect(); + try (OutputStream os = http.getOutputStream()) { + os.write(out); + } } else { - builder.GET(); + http.connect(); + } + + try (BufferedReader reader = new BufferedReader(new InputStreamReader(http.getInputStream(), StandardCharsets.UTF_8))) { + return reader.lines().collect(Collectors.joining("\n")); } - return client.send(builder.build(), BodyHandlers.ofString()); } private void postMessage(final String sessionId, final String body) throws URISyntaxException, IOException, InterruptedException { - HttpResponse response = makeSocketIoRequest(sessionId, body); - final String responseStr = response.body(); + final String responseStr = makeSocketIoRequest(sessionId, body); Assert.assertEquals(responseStr, "ok"); } private String[] pollForListOfResponses(final String sessionId) throws URISyntaxException, IOException, InterruptedException { - HttpResponse response = makeSocketIoRequest(sessionId, null); - final String responseStr = response.body(); + final String responseStr = makeSocketIoRequest(sessionId, null); return responseStr.split(packetSeparator); } From 5dc2deffbb2dcd16a3f92ce8fac7663be25723dc Mon Sep 17 00:00:00 2001 From: Kevin Read Date: Thu, 25 Jan 2024 10:57:01 +0100 Subject: [PATCH 7/9] Make maven less verbose in CI and use batch mode --- .github/workflows/build.yml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 2319b585..d426d58c 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -22,4 +22,5 @@ jobs: cache: 'maven' - name: 'Build Project' run: | - mvn test + export MAVEN_OPTS="-Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=WARN" + mvn --batch-mode --errors --fail-at-end test From 3b8cf376875a4c7dfb65971883017b49a18d489e Mon Sep 17 00:00:00 2001 From: Kevin Read Date: Sat, 27 Jan 2024 18:22:20 +0100 Subject: [PATCH 8/9] Do not run tests with JDK 8. Explain about building with Java 11+ but JAR compatible with Java 8. --- .github/workflows/build-pr.yml | 1 - README.md | 2 ++ 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/.github/workflows/build-pr.yml b/.github/workflows/build-pr.yml index bae32c8e..9379c126 100644 --- a/.github/workflows/build-pr.yml +++ b/.github/workflows/build-pr.yml @@ -19,7 +19,6 @@ jobs: strategy: matrix: java-version: - - 8 - 11 - 17 - 21 diff --git a/README.md b/README.md index 6e8ebe2a..5bfd5ecc 100644 --- a/README.md +++ b/README.md @@ -17,9 +17,11 @@ Features * Supports distributed broadcast across netty-socketio nodes ([Redisson](https://redisson.org), [Hazelcast](https://www.hazelcast.com/)) * Supports OSGi * Supports Spring +* Contains Java module info for JPMS. * Lock-free and thread-safe implementation * Declarative handler configuration via annotations +JAR is compatible with Java 8 but needs Java 11+ for building the module-info. Performance ================================ From 4c9998d5dd0ed5ec54d71d819ed042d8b56cb9b5 Mon Sep 17 00:00:00 2001 From: Kevin Read Date: Tue, 30 Jan 2024 10:18:50 +0100 Subject: [PATCH 9/9] Do not run tests with JDK 11 --- .github/workflows/build-pr.yml | 1 - 1 file changed, 1 deletion(-) diff --git a/.github/workflows/build-pr.yml b/.github/workflows/build-pr.yml index 9379c126..dee8040a 100644 --- a/.github/workflows/build-pr.yml +++ b/.github/workflows/build-pr.yml @@ -19,7 +19,6 @@ jobs: strategy: matrix: java-version: - - 11 - 17 - 21 uses: ./.github/workflows/build.yml