diff --git a/.github/workflows/build-pr.yml b/.github/workflows/build-pr.yml index bae32c8e..dee8040a 100644 --- a/.github/workflows/build-pr.yml +++ b/.github/workflows/build-pr.yml @@ -19,8 +19,6 @@ jobs: strategy: matrix: java-version: - - 8 - - 11 - 17 - 21 uses: ./.github/workflows/build.yml diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 2319b585..d426d58c 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -22,4 +22,5 @@ jobs: cache: 'maven' - name: 'Build Project' run: | - mvn test + export MAVEN_OPTS="-Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=WARN" + mvn --batch-mode --errors --fail-at-end test diff --git a/README.md b/README.md index 6e8ebe2a..5bfd5ecc 100644 --- a/README.md +++ b/README.md @@ -17,9 +17,11 @@ Features * Supports distributed broadcast across netty-socketio nodes ([Redisson](https://redisson.org), [Hazelcast](https://www.hazelcast.com/)) * Supports OSGi * Supports Spring +* Contains Java module info for JPMS. * Lock-free and thread-safe implementation * Declarative handler configuration via annotations +JAR is compatible with Java 8 but needs Java 11+ for building the module-info. Performance ================================ diff --git a/pom.xml b/pom.xml index deda841d..bd016ea5 100644 --- a/pom.xml +++ b/pom.xml @@ -355,7 +355,7 @@ default-compile - 9 + 11 @@ -373,6 +373,13 @@ + + default-testCompile + process-test-sources + + testCompile + + @@ -398,7 +405,7 @@ org.apache.maven.plugins maven-surefire-plugin - 2.22.2 + 3.2.5 -javaagent:"${settings.localRepository}"/org/jmockit/jmockit/1.46/jmockit-1.46.jar diff --git a/src/main/java/com/corundumstudio/socketio/protocol/PacketDecoder.java b/src/main/java/com/corundumstudio/socketio/protocol/PacketDecoder.java index 8930f18b..73921d26 100644 --- a/src/main/java/com/corundumstudio/socketio/protocol/PacketDecoder.java +++ b/src/main/java/com/corundumstudio/socketio/protocol/PacketDecoder.java @@ -147,23 +147,34 @@ private Packet decode(ClientHead head, ByteBuf frame) throws IOException { || frame.getByte(0) == 4 || frame.getByte(0) == 1) { return parseBinary(head, frame); } - PacketType type = readType(frame); + + final int separatorPos = frame.bytesBefore((byte) 0x1E); + final ByteBuf packetBuf; + if (separatorPos > 0) { + // Multiple packets in one, copy out the next packet to parse + packetBuf = frame.copy(frame.readerIndex(), separatorPos); + frame.readerIndex(frame.readerIndex() + separatorPos + 1); + } else { + packetBuf = frame; + } + + PacketType type = readType(packetBuf); Packet packet = new Packet(type, head.getEngineIOVersion()); if (type == PacketType.PING) { - packet.setData(readString(frame)); + packet.setData(readString(packetBuf)); return packet; } - if (!frame.isReadable()) { + if (!packetBuf.isReadable()) { return packet; } - PacketType innerType = readInnerType(frame); + PacketType innerType = readInnerType(packetBuf); packet.setSubType(innerType); - parseHeader(frame, packet, innerType); - parseBody(head, frame, packet); + parseHeader(packetBuf, packet, innerType); + parseBody(head, packetBuf, packet); return packet; } diff --git a/src/test/java/com/corundumstudio/socketio/transport/HttpTransportTest.java b/src/test/java/com/corundumstudio/socketio/transport/HttpTransportTest.java new file mode 100644 index 00000000..0e1f60b6 --- /dev/null +++ b/src/test/java/com/corundumstudio/socketio/transport/HttpTransportTest.java @@ -0,0 +1,223 @@ +/** + * Copyright (c) 2012-2023 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.corundumstudio.socketio.transport; + +import com.corundumstudio.socketio.Configuration; +import com.corundumstudio.socketio.SocketConfig; +import com.corundumstudio.socketio.SocketIOClient; +import com.corundumstudio.socketio.SocketIOServer; +import com.corundumstudio.socketio.Transport; +import com.corundumstudio.socketio.listener.ExceptionListener; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.netty.channel.ChannelHandlerContext; +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.net.HttpURLConnection; +import java.net.ServerSocket; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.URLConnection; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class HttpTransportTest { + + private SocketIOServer server; + + private ObjectMapper mapper = new ObjectMapper(); + + private Pattern responseJsonMatcher = Pattern.compile("([0-9]+)(\\{.*\\})?"); + + private Pattern multiResponsePattern = Pattern.compile("((?[0-9])(?[0-9]*)(?.+)\\x{1E})*(?[0-9])(?[0-9]*)(?.+)"); + + private final String packetSeparator = new String(new byte[] { 0x1e }); + + private Logger logger = LoggerFactory.getLogger(HttpTransportTest.class); + + @Before + public void createTestServer() { + final int port = findFreePort(); + final Configuration config = new Configuration(); + config.setRandomSession(true); + config.setTransports(Transport.POLLING); + config.setPort(port); + config.setExceptionListener(new ExceptionListener() { + @Override + public void onEventException(Exception e, List args, SocketIOClient client) { + logger.error("eventException", e); + } + + @Override + public void onDisconnectException(Exception e, SocketIOClient client) { + logger.error("disconnectException", e); + } + + @Override + public void onConnectException(Exception e, SocketIOClient client) { + logger.error("connectException", e); + } + + @Override + public void onPingException(Exception e, SocketIOClient client) { + logger.error("pingException", e); + } + + @Override + public void onPongException(Exception e, SocketIOClient client) { + logger.error("pongException", e); + } + + @Override + public boolean exceptionCaught(ChannelHandlerContext ctx, Throwable e) throws Exception { + return false; + } + + @Override + public void onAuthException(Throwable e, SocketIOClient client) { + logger.error("authException", e); + } + }); + + final SocketConfig socketConfig = new SocketConfig(); + socketConfig.setReuseAddress(true); + config.setSocketConfig(socketConfig); + + this.server = new SocketIOServer(config); + this.server.start(); + } + + @After + public void cleanupTestServer() { + this.server.stop(); + } + + private URI createTestServerUri(final String query) throws URISyntaxException { + return new URI("http", null , "localhost", server.getConfiguration().getPort(), server.getConfiguration().getContext() + "/", + query, null); + } + + private String makeSocketIoRequest(final String sessionId, final String bodyForPost) + throws URISyntaxException, IOException, InterruptedException { + final URI uri = createTestServerUri("EIO=4&transport=polling&t=Oqd9eWh" + (sessionId == null ? "" : "&sid=" + sessionId)); + + URLConnection con = uri.toURL().openConnection(); + HttpURLConnection http = (HttpURLConnection)con; + if (bodyForPost != null) { + http.setRequestMethod("POST"); // PUT is another valid option + http.setDoOutput(true); + } + + if (bodyForPost != null) { + byte[] out = bodyForPost.toString().getBytes(StandardCharsets.UTF_8); + http.setFixedLengthStreamingMode(out.length); + http.setRequestProperty("Content-Type", "application/x-www-form-urlencoded; charset=UTF-8"); + http.connect(); + try (OutputStream os = http.getOutputStream()) { + os.write(out); + } + } else { + http.connect(); + } + + try (BufferedReader reader = new BufferedReader(new InputStreamReader(http.getInputStream(), StandardCharsets.UTF_8))) { + return reader.lines().collect(Collectors.joining("\n")); + } + } + + private void postMessage(final String sessionId, final String body) + throws URISyntaxException, IOException, InterruptedException { + final String responseStr = makeSocketIoRequest(sessionId, body); + Assert.assertEquals(responseStr, "ok"); + } + + private String[] pollForListOfResponses(final String sessionId) + throws URISyntaxException, IOException, InterruptedException { + final String responseStr = makeSocketIoRequest(sessionId, null); + return responseStr.split(packetSeparator); + } + + private String connectForSessionId(final String sessionId) + throws URISyntaxException, IOException, InterruptedException { + final String firstMessage = pollForListOfResponses(sessionId)[0]; + final Matcher jsonMatcher = responseJsonMatcher.matcher(firstMessage); + Assert.assertTrue(jsonMatcher.find()); + Assert.assertEquals(jsonMatcher.group(1), "0"); + final JsonNode node = mapper.readTree(jsonMatcher.group(2)); + return node.get("sid").asText(); + } + + @Test + public void testConnect() throws URISyntaxException, IOException, InterruptedException { + final String sessionId = connectForSessionId(null); + Assert.assertNotNull(sessionId); + } + + @Test + public void testMultipleMessages() throws URISyntaxException, IOException, InterruptedException { + server.addEventListener("hello", String.class, (client, data, ackSender) -> + ackSender.sendAckData(data)); + final String sessionId = connectForSessionId(null); + final ArrayList events = new ArrayList<>(); + events.add("420[\"hello\", \"world\"]"); + events.add("421[\"hello\", \"socketio\"]"); + events.add("422[\"hello\", \"socketio\"]"); + postMessage(sessionId, events.stream().collect(Collectors.joining(packetSeparator))); + final String[] responses = pollForListOfResponses(sessionId); + Assert.assertEquals(responses.length, 3); + } + + /** + * Returns a free port number on localhost. + * + * Heavily inspired from org.eclipse.jdt.launching.SocketUtil (to avoid a dependency to JDT just because of this). + * Slightly improved with close() missing in JDT. And throws exception instead of returning -1. + * + * @return a free port number on localhost + * @throws IllegalStateException if unable to find a free port + */ + private static int findFreePort() { + ServerSocket socket = null; + try { + socket = new ServerSocket(0); + socket.setReuseAddress(true); + return socket.getLocalPort(); + } catch (IOException ignored) { + } finally { + if (socket != null) { + try { + socket.close(); + } catch (IOException ignored) { + } + } + } + throw new IllegalStateException("Could not find a free TCP/IP port to start embedded SocketIO Server on"); + } + +}