Skip to content

Commit

Permalink
Merge pull request #959 from unverbraucht/fix/v4-multiple-frames-in-http
Browse files Browse the repository at this point in the history
v4 multiple frames in polling body
  • Loading branch information
mrniko authored Feb 26, 2024
2 parents 59233ea + 4c9998d commit 57f3de3
Show file tree
Hide file tree
Showing 6 changed files with 253 additions and 11 deletions.
2 changes: 0 additions & 2 deletions .github/workflows/build-pr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ jobs:
strategy:
matrix:
java-version:
- 8
- 11
- 17
- 21
uses: ./.github/workflows/build.yml
Expand Down
3 changes: 2 additions & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,5 @@ jobs:
cache: 'maven'
- name: 'Build Project'
run: |
mvn test
export MAVEN_OPTS="-Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=WARN"
mvn --batch-mode --errors --fail-at-end test
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@ Features
* Supports distributed broadcast across netty-socketio nodes ([Redisson](https://redisson.org), [Hazelcast](https://www.hazelcast.com/))
* Supports OSGi
* Supports Spring
* Contains Java module info for JPMS.
* Lock-free and thread-safe implementation
* Declarative handler configuration via annotations

JAR is compatible with Java 8 but needs Java 11+ for building the module-info.

Performance
================================
Expand Down
11 changes: 9 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@
<execution>
<id>default-compile</id>
<configuration>
<release>9</release>
<release>11</release>
<!-- no excludes: compile everything to ensure module-info contains right entries -->
</configuration>
</execution>
Expand All @@ -373,6 +373,13 @@
</excludes>
</configuration>
</execution>
<execution>
<id>default-testCompile</id>
<phase>process-test-sources</phase>
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
<!-- defaults for compile and testCompile -->
<configuration>
Expand All @@ -398,7 +405,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.22.2</version>
<version>3.2.5</version>
<configuration>
<argLine>
-javaagent:"${settings.localRepository}"/org/jmockit/jmockit/1.46/jmockit-1.46.jar
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,23 +147,34 @@ private Packet decode(ClientHead head, ByteBuf frame) throws IOException {
|| frame.getByte(0) == 4 || frame.getByte(0) == 1) {
return parseBinary(head, frame);
}
PacketType type = readType(frame);

final int separatorPos = frame.bytesBefore((byte) 0x1E);
final ByteBuf packetBuf;
if (separatorPos > 0) {
// Multiple packets in one, copy out the next packet to parse
packetBuf = frame.copy(frame.readerIndex(), separatorPos);
frame.readerIndex(frame.readerIndex() + separatorPos + 1);
} else {
packetBuf = frame;
}

PacketType type = readType(packetBuf);
Packet packet = new Packet(type, head.getEngineIOVersion());

if (type == PacketType.PING) {
packet.setData(readString(frame));
packet.setData(readString(packetBuf));
return packet;
}

if (!frame.isReadable()) {
if (!packetBuf.isReadable()) {
return packet;
}

PacketType innerType = readInnerType(frame);
PacketType innerType = readInnerType(packetBuf);
packet.setSubType(innerType);

parseHeader(frame, packet, innerType);
parseBody(head, frame, packet);
parseHeader(packetBuf, packet, innerType);
parseBody(head, packetBuf, packet);
return packet;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
/**
* Copyright (c) 2012-2023 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.corundumstudio.socketio.transport;

import com.corundumstudio.socketio.Configuration;
import com.corundumstudio.socketio.SocketConfig;
import com.corundumstudio.socketio.SocketIOClient;
import com.corundumstudio.socketio.SocketIOServer;
import com.corundumstudio.socketio.Transport;
import com.corundumstudio.socketio.listener.ExceptionListener;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.netty.channel.ChannelHandlerContext;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.ServerSocket;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URLConnection;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HttpTransportTest {

private SocketIOServer server;

private ObjectMapper mapper = new ObjectMapper();

private Pattern responseJsonMatcher = Pattern.compile("([0-9]+)(\\{.*\\})?");

private Pattern multiResponsePattern = Pattern.compile("((?<type>[0-9])(?<id>[0-9]*)(?<body>.+)\\x{1E})*(?<lasttype>[0-9])(?<lastid>[0-9]*)(?<lastbody>.+)");

private final String packetSeparator = new String(new byte[] { 0x1e });

private Logger logger = LoggerFactory.getLogger(HttpTransportTest.class);

@Before
public void createTestServer() {
final int port = findFreePort();
final Configuration config = new Configuration();
config.setRandomSession(true);
config.setTransports(Transport.POLLING);
config.setPort(port);
config.setExceptionListener(new ExceptionListener() {
@Override
public void onEventException(Exception e, List<Object> args, SocketIOClient client) {
logger.error("eventException", e);
}

@Override
public void onDisconnectException(Exception e, SocketIOClient client) {
logger.error("disconnectException", e);
}

@Override
public void onConnectException(Exception e, SocketIOClient client) {
logger.error("connectException", e);
}

@Override
public void onPingException(Exception e, SocketIOClient client) {
logger.error("pingException", e);
}

@Override
public void onPongException(Exception e, SocketIOClient client) {
logger.error("pongException", e);
}

@Override
public boolean exceptionCaught(ChannelHandlerContext ctx, Throwable e) throws Exception {
return false;
}

@Override
public void onAuthException(Throwable e, SocketIOClient client) {
logger.error("authException", e);
}
});

final SocketConfig socketConfig = new SocketConfig();
socketConfig.setReuseAddress(true);
config.setSocketConfig(socketConfig);

this.server = new SocketIOServer(config);
this.server.start();
}

@After
public void cleanupTestServer() {
this.server.stop();
}

private URI createTestServerUri(final String query) throws URISyntaxException {
return new URI("http", null , "localhost", server.getConfiguration().getPort(), server.getConfiguration().getContext() + "/",
query, null);
}

private String makeSocketIoRequest(final String sessionId, final String bodyForPost)
throws URISyntaxException, IOException, InterruptedException {
final URI uri = createTestServerUri("EIO=4&transport=polling&t=Oqd9eWh" + (sessionId == null ? "" : "&sid=" + sessionId));

URLConnection con = uri.toURL().openConnection();
HttpURLConnection http = (HttpURLConnection)con;
if (bodyForPost != null) {
http.setRequestMethod("POST"); // PUT is another valid option
http.setDoOutput(true);
}

if (bodyForPost != null) {
byte[] out = bodyForPost.toString().getBytes(StandardCharsets.UTF_8);
http.setFixedLengthStreamingMode(out.length);
http.setRequestProperty("Content-Type", "application/x-www-form-urlencoded; charset=UTF-8");
http.connect();
try (OutputStream os = http.getOutputStream()) {
os.write(out);
}
} else {
http.connect();
}

try (BufferedReader reader = new BufferedReader(new InputStreamReader(http.getInputStream(), StandardCharsets.UTF_8))) {
return reader.lines().collect(Collectors.joining("\n"));
}
}

private void postMessage(final String sessionId, final String body)
throws URISyntaxException, IOException, InterruptedException {
final String responseStr = makeSocketIoRequest(sessionId, body);
Assert.assertEquals(responseStr, "ok");
}

private String[] pollForListOfResponses(final String sessionId)
throws URISyntaxException, IOException, InterruptedException {
final String responseStr = makeSocketIoRequest(sessionId, null);
return responseStr.split(packetSeparator);
}

private String connectForSessionId(final String sessionId)
throws URISyntaxException, IOException, InterruptedException {
final String firstMessage = pollForListOfResponses(sessionId)[0];
final Matcher jsonMatcher = responseJsonMatcher.matcher(firstMessage);
Assert.assertTrue(jsonMatcher.find());
Assert.assertEquals(jsonMatcher.group(1), "0");
final JsonNode node = mapper.readTree(jsonMatcher.group(2));
return node.get("sid").asText();
}

@Test
public void testConnect() throws URISyntaxException, IOException, InterruptedException {
final String sessionId = connectForSessionId(null);
Assert.assertNotNull(sessionId);
}

@Test
public void testMultipleMessages() throws URISyntaxException, IOException, InterruptedException {
server.addEventListener("hello", String.class, (client, data, ackSender) ->
ackSender.sendAckData(data));
final String sessionId = connectForSessionId(null);
final ArrayList<String> events = new ArrayList<>();
events.add("420[\"hello\", \"world\"]");
events.add("421[\"hello\", \"socketio\"]");
events.add("422[\"hello\", \"socketio\"]");
postMessage(sessionId, events.stream().collect(Collectors.joining(packetSeparator)));
final String[] responses = pollForListOfResponses(sessionId);
Assert.assertEquals(responses.length, 3);
}

/**
* Returns a free port number on localhost.
*
* Heavily inspired from org.eclipse.jdt.launching.SocketUtil (to avoid a dependency to JDT just because of this).
* Slightly improved with close() missing in JDT. And throws exception instead of returning -1.
*
* @return a free port number on localhost
* @throws IllegalStateException if unable to find a free port
*/
private static int findFreePort() {
ServerSocket socket = null;
try {
socket = new ServerSocket(0);
socket.setReuseAddress(true);
return socket.getLocalPort();
} catch (IOException ignored) {
} finally {
if (socket != null) {
try {
socket.close();
} catch (IOException ignored) {
}
}
}
throw new IllegalStateException("Could not find a free TCP/IP port to start embedded SocketIO Server on");
}

}

0 comments on commit 57f3de3

Please sign in to comment.