forked from real-logic/artio
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Issue real-logic#503: SessionProxy.sendSequenceReset() not called
The test also tries to reproduce an inversion of SendSequenceReset and ResendRequest messages, but it is not reproduced (it happens only when a SessionProxy causes the ResendRequest to be sent asynchronously).
- Loading branch information
Showing
4 changed files
with
423 additions
and
0 deletions.
There are no files selected for viewing
172 changes: 172 additions & 0 deletions
172
...o-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/RaceResendResetTest.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,172 @@ | ||
package uk.co.real_logic.artio.system_tests; | ||
|
||
import org.agrona.ErrorHandler; | ||
import org.agrona.concurrent.EpochNanoClock; | ||
import org.junit.Test; | ||
import uk.co.real_logic.artio.dictionary.generation.Exceptions; | ||
import uk.co.real_logic.artio.engine.EngineConfiguration; | ||
import uk.co.real_logic.artio.engine.FixEngine; | ||
import uk.co.real_logic.artio.fields.EpochFractionFormat; | ||
import uk.co.real_logic.artio.library.LibraryConfiguration; | ||
import uk.co.real_logic.artio.protocol.GatewayPublication; | ||
import uk.co.real_logic.artio.session.DirectSessionProxy; | ||
import uk.co.real_logic.artio.session.SessionCustomisationStrategy; | ||
import uk.co.real_logic.artio.session.SessionIdStrategy; | ||
import uk.co.real_logic.artio.session.SessionProxy; | ||
import uk.co.real_logic.artio.util.DebugFIXClient; | ||
import uk.co.real_logic.artio.util.DebugServer; | ||
|
||
import java.io.IOException; | ||
|
||
import static org.junit.Assert.assertEquals; | ||
import static org.junit.Assert.assertTrue; | ||
import static uk.co.real_logic.artio.TestFixtures.launchMediaDriver; | ||
import static uk.co.real_logic.artio.messages.InitialAcceptedSessionOwner.SOLE_LIBRARY; | ||
import static uk.co.real_logic.artio.system_tests.SystemTestUtil.ACCEPTOR_ID; | ||
import static uk.co.real_logic.artio.system_tests.SystemTestUtil.INITIATOR_ID; | ||
import static uk.co.real_logic.artio.system_tests.SystemTestUtil.acceptingConfig; | ||
import static uk.co.real_logic.artio.system_tests.SystemTestUtil.acceptingLibraryConfig; | ||
import static uk.co.real_logic.artio.system_tests.SystemTestUtil.connect; | ||
import static uk.co.real_logic.artio.system_tests.SystemTestUtil.initiatingConfig; | ||
import static uk.co.real_logic.artio.system_tests.SystemTestUtil.initiatingLibraryConfig; | ||
|
||
/** | ||
* Try reproducing race between sent ResendRequest and ResetSequence message when both | ||
* parties request a resend. Also checks that SessionProxy is invoked when a ResetSequence | ||
* message must be sent. | ||
*/ | ||
public class RaceResendResetTest extends AbstractGatewayToGatewaySystemTest { | ||
|
||
private boolean sendResendRequestCalled; | ||
private boolean sendSequenceResetCalled; | ||
private boolean useProxy; | ||
|
||
private void launch() { | ||
mediaDriver = launchMediaDriver(); | ||
launchAccepting(); | ||
launchInitiating(); | ||
testSystem = new TestSystem(acceptingLibrary, initiatingLibrary); | ||
} | ||
|
||
private void launchInitiating() { | ||
final EngineConfiguration initiatingConfig = | ||
initiatingConfig(libraryAeronPort, nanoClock) | ||
.deleteLogFileDirOnStart(true) | ||
.initialAcceptedSessionOwner(SOLE_LIBRARY); | ||
initiatingEngine = FixEngine.launch(initiatingConfig); | ||
LibraryConfiguration lib = initiatingLibraryConfig(libraryAeronPort, initiatingHandler, nanoClock); | ||
if (useProxy) | ||
lib.sessionProxyFactory(this::sessionProxyFactory); | ||
initiatingLibrary = connect(lib | ||
); | ||
} | ||
|
||
private void launchAccepting() { | ||
final EngineConfiguration acceptingConfig = acceptingConfig(port, ACCEPTOR_ID, INITIATOR_ID, nanoClock) | ||
.deleteLogFileDirOnStart(true) | ||
.initialAcceptedSessionOwner(SOLE_LIBRARY); | ||
acceptingEngine = FixEngine.launch(acceptingConfig); | ||
|
||
final LibraryConfiguration acceptingLibraryConfig = acceptingLibraryConfig(acceptingHandler, nanoClock); | ||
acceptingLibrary = connect(acceptingLibraryConfig); | ||
} | ||
|
||
/** | ||
* Sanity check that we can connect Artio to a debug server with canned messages. | ||
*/ | ||
@Test | ||
public void testDebugServer() throws IOException { | ||
DebugServer srv = new DebugServer(port); | ||
srv.setWaitForData(true); | ||
srv.addFIXResponse("8=FIX.4.4|9=94|35=A|49=acceptor|56=initiator|34=1|52=20240315-10:52:24.098|98=0|108=10|141=N|35002=0|35003=0|10=024|"); | ||
srv.start(); | ||
|
||
mediaDriver = launchMediaDriver(); | ||
launchInitiating(); | ||
testSystem = new TestSystem(initiatingLibrary); | ||
connectAndAcquire(); | ||
} | ||
|
||
private SessionProxy sessionProxyFactory( | ||
final int sessionBufferSize, | ||
final GatewayPublication gatewayPublication, | ||
final SessionIdStrategy sessionIdStrategy, | ||
final SessionCustomisationStrategy customisationStrategy, | ||
final EpochNanoClock clock, | ||
final long connectionId, | ||
final int libraryId, | ||
final ErrorHandler errorHandler, | ||
final EpochFractionFormat epochFractionPrecision) { | ||
return new DirectSessionProxy(sessionBufferSize, gatewayPublication, sessionIdStrategy, customisationStrategy, | ||
clock, connectionId, libraryId, errorHandler, epochFractionPrecision) { | ||
@Override | ||
public long sendResendRequest(int msgSeqNo, int beginSeqNo, int endSeqNo, int sequenceIndex, int lastMsgSeqNumProcessed) { | ||
sendResendRequestCalled = true; | ||
// try { | ||
// Thread.sleep(10); | ||
// } catch (InterruptedException ignored) { | ||
// } | ||
return super.sendResendRequest(msgSeqNo, beginSeqNo, endSeqNo, sequenceIndex, lastMsgSeqNumProcessed); | ||
} | ||
|
||
@Override | ||
public long sendSequenceReset(int msgSeqNo, int newSeqNo, int sequenceIndex, int lastMsgSeqNumProcessed) { | ||
sendSequenceResetCalled = true; | ||
return super.sendSequenceReset(msgSeqNo, newSeqNo, sequenceIndex, lastMsgSeqNumProcessed); | ||
} | ||
}; | ||
} | ||
|
||
@Test(timeout = TEST_TIMEOUT_IN_MS) | ||
public void shouldNotInvertResendAndReset() throws Exception { | ||
useProxy = false; | ||
reconnectTest(); | ||
} | ||
|
||
@Test(timeout = TEST_TIMEOUT_IN_MS) | ||
public void shouldCallProxySendSequenceReset() throws Exception { | ||
useProxy = true; | ||
reconnectTest(); | ||
} | ||
|
||
private void reconnectTest() throws Exception { | ||
launch(); | ||
|
||
connectAndAcquire(); | ||
|
||
messagesCanBeExchanged(); | ||
|
||
disconnectSessions(); | ||
Exceptions.closeAll(this::closeAcceptingEngine); | ||
|
||
assertEquals(3, acceptingSession.lastReceivedMsgSeqNum()); | ||
assertEquals(3, initiatingSession.lastReceivedMsgSeqNum()); | ||
|
||
DebugServer srv = new DebugServer(port); | ||
srv.setWaitForData(true); | ||
srv.addFIXResponse("8=FIX.4.4|9=94|35=A|49=acceptor|56=initiator|34=5|52=***|98=0|108=10|141=N|35002=0|35003=0|10=024|"); | ||
srv.addFIXResponse("8=FIX.4.4|9=94|35=2|49=acceptor|56=initiator|34=6|52=***|7=4|16=0|10=024|"); | ||
srv.start(); | ||
|
||
connectPersistentSessions(4, 4, false); | ||
|
||
DebugFIXClient exchange = new DebugFIXClient(srv.popClient(5000)); | ||
exchange.popAndAssert("35=A 34=4"); | ||
exchange.popAndAssert("35=2 34=5 7=4 16=0"); | ||
exchange.popAndAssert("35=4 34=4 36=6"); | ||
|
||
exchange.close(); | ||
srv.stop(); | ||
Exceptions.closeAll(this::closeInitiatingEngine, mediaDriver); | ||
|
||
if (useProxy) { | ||
assertTrue("SessionProxy.sendResendRequest() not called", sendResendRequestCalled); | ||
assertTrue("SessionProxy.sendSequenceReset() not called", sendSequenceResetCalled); | ||
} | ||
} | ||
|
||
private void connectAndAcquire() { | ||
connectSessions(); | ||
acceptingSession = acceptingHandler.lastSession(); | ||
} | ||
} |
79 changes: 79 additions & 0 deletions
79
artio-system-tests/src/test/java/uk/co/real_logic/artio/util/DebugFIXClient.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,79 @@ | ||
package uk.co.real_logic.artio.util; | ||
|
||
import org.junit.Assert; | ||
|
||
import java.util.HashMap; | ||
import java.util.Map; | ||
import java.util.Objects; | ||
import java.util.Scanner; | ||
import java.util.concurrent.BlockingQueue; | ||
import java.util.concurrent.LinkedBlockingQueue; | ||
import java.util.concurrent.TimeUnit; | ||
|
||
/** | ||
* Helper to pop FIX messages received on a socket. | ||
* | ||
* @see DebugServer | ||
*/ | ||
public class DebugFIXClient { | ||
private final DebugServer.HasIOStream io; | ||
private final Thread thread; | ||
|
||
private final BlockingQueue<Map<String, String>> messages = new LinkedBlockingQueue<>(); | ||
private volatile boolean disposed; | ||
private String prefix = " <<< "; | ||
|
||
public DebugFIXClient(DebugServer.HasIOStream io) { | ||
this.io = Objects.requireNonNull(io); | ||
thread = new Thread(this::run, "DebugFIXClient"); | ||
thread.start(); | ||
} | ||
|
||
public void close() throws Exception { | ||
disposed = true; | ||
io.in.close(); | ||
io.in.close(); | ||
thread.interrupt(); | ||
thread.join(); | ||
} | ||
|
||
private void run() { | ||
StringBuilder s = new StringBuilder(128); | ||
while (!disposed) { | ||
Scanner scanner = new Scanner(io.in).useDelimiter("\u0001"); | ||
Map<String, String> msg = new HashMap<>(); | ||
while (scanner.hasNext()) { | ||
String fld = scanner.next(); | ||
s.append(fld).append('|'); | ||
int eq = fld.indexOf('='); | ||
String tag = fld.substring(0, eq); | ||
msg.put(tag, fld.substring(eq + 1)); | ||
if (tag.equals("10")) { | ||
messages.add(msg); | ||
msg = new HashMap<>(); | ||
System.out.println(prefix + s); | ||
s.setLength(0); | ||
} | ||
} | ||
} | ||
} | ||
|
||
public Map<String, String> popMessage() throws InterruptedException { | ||
return messages.poll(5, TimeUnit.SECONDS); | ||
} | ||
|
||
public void popAndAssert(String tagValues) throws InterruptedException { | ||
Map<String, String> map = popMessage(); | ||
for (String rule : tagValues.split(" ")) { | ||
String tag = rule.substring(0, rule.indexOf('=')); | ||
if (map == null) | ||
throw new AssertionError("No message received"); | ||
String value = map.get(tag); | ||
Assert.assertEquals(rule, tag + "=" + value); | ||
} | ||
} | ||
|
||
public void setPrefix(String prefix) { | ||
this.prefix = prefix; | ||
} | ||
} |
131 changes: 131 additions & 0 deletions
131
artio-system-tests/src/test/java/uk/co/real_logic/artio/util/DebugServer.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,131 @@ | ||
package uk.co.real_logic.artio.util; | ||
|
||
import java.io.BufferedInputStream; | ||
import java.io.BufferedOutputStream; | ||
import java.io.IOException; | ||
import java.io.InputStream; | ||
import java.io.OutputStream; | ||
import java.net.ServerSocket; | ||
import java.net.Socket; | ||
import java.util.Queue; | ||
import java.util.concurrent.BlockingQueue; | ||
import java.util.concurrent.ConcurrentLinkedQueue; | ||
import java.util.concurrent.LinkedBlockingQueue; | ||
import java.util.concurrent.TimeUnit; | ||
|
||
/** | ||
* A server that accepts TCP connections and is able to reply automatically with canned | ||
* data. It can be used to simulate a FIX server in order to quickly sent specific messages. | ||
*/ | ||
public class DebugServer { | ||
|
||
private final int port; | ||
private final Queue<byte[]> connectResponses; | ||
private final BlockingQueue<HasIOStream> clients; | ||
private final ServerSocket serverSocket; | ||
|
||
/** | ||
* If true, wait until some data is received before sending prepared messages. | ||
*/ | ||
private boolean waitForData; | ||
|
||
/** | ||
* Creates a debug server listening on specified port. | ||
*/ | ||
public DebugServer(int port) throws IOException { | ||
this.port = port; | ||
this.connectResponses = new ConcurrentLinkedQueue<>(); | ||
this.clients = new LinkedBlockingQueue<>(); | ||
this.serverSocket = new ServerSocket(port); | ||
} | ||
|
||
/** | ||
* Adds a message that must be directly sent to connecting clients. Messages | ||
* are sent in the same order they were added. | ||
*/ | ||
public void addConnectResponse(byte[] message) { | ||
connectResponses.add(message); | ||
} | ||
|
||
/** | ||
* Warning: causes problems because SendingTime and checksum needs to be regenerated | ||
* and they are not. | ||
*/ | ||
public void addFIXResponse(String msg) { | ||
addConnectResponse(FixMessageTweak.recycle(msg)); | ||
} | ||
|
||
/** | ||
* Starts the debug server, accepting incoming connections and sending | ||
* prepared data. | ||
*/ | ||
public void start() throws IOException { | ||
new Thread("DebugServer-" + port) { | ||
@Override | ||
public void run() { | ||
try { | ||
while (!serverSocket.isClosed()) { | ||
Socket s = serverSocket.accept(); | ||
System.out.println("Connection accepted from " + s.getInetAddress()); | ||
try { | ||
BufferedInputStream in = new BufferedInputStream(s.getInputStream()); | ||
BufferedOutputStream out = new BufferedOutputStream(s.getOutputStream()); | ||
|
||
if (!connectResponses.isEmpty() && waitForData) { | ||
in.mark(0); | ||
in.read(); | ||
in.reset(); | ||
} | ||
|
||
HasIOStream client = new HasIOStream(in, out); | ||
sendResponses(client.out); | ||
clients.add(client); | ||
} catch (IOException e) { | ||
e.printStackTrace(); | ||
} | ||
} | ||
} catch (IOException e) { | ||
if (!serverSocket.isClosed()) | ||
e.printStackTrace(); | ||
} | ||
} | ||
}.start(); | ||
} | ||
|
||
public void stop() throws IOException { | ||
serverSocket.close(); | ||
} | ||
|
||
/** | ||
* Sends prepared data to the client. | ||
*/ | ||
private void sendResponses(OutputStream outputStream) throws IOException { | ||
for (byte[] response : connectResponses) { | ||
outputStream.write(response); | ||
outputStream.flush(); | ||
} | ||
} | ||
|
||
public HasIOStream popClient(long timeoutMs) throws InterruptedException { | ||
return clients.poll(timeoutMs, TimeUnit.MILLISECONDS); | ||
} | ||
|
||
public int getPort() { | ||
return port; | ||
} | ||
|
||
public void setWaitForData(boolean waitForData) { | ||
this.waitForData = waitForData; | ||
} | ||
|
||
public static class HasIOStream { | ||
|
||
public final InputStream in; | ||
public final OutputStream out; | ||
|
||
public HasIOStream(InputStream in, OutputStream out) { | ||
this.in = in; | ||
this.out = out; | ||
} | ||
} | ||
} |
Oops, something went wrong.