diff --git a/codebuild/mqtt5-java-canary-test.yml b/codebuild/mqtt5-java-canary-test.yml index 9e293d939..00a9c2804 100644 --- a/codebuild/mqtt5-java-canary-test.yml +++ b/codebuild/mqtt5-java-canary-test.yml @@ -33,12 +33,10 @@ phases: - export S3_DST=$(aws secretsmanager get-secret-value --secret-id "$CANARY_BUILD_S3_DST" --query "SecretString" | cut -f2,3 -d":" | sed -e 's/[\\\"\}]//g') - export GIT_HASH=$(git rev-parse HEAD) # Get the endpoint, certificate, and key for connecting to IoT Core - - export ENDPOINT=$(aws secretsmanager get-secret-value --secret-id "unit-test/endpoint" --query "SecretString" | cut -f2 -d":" | sed -e 's/[\\\"\}]//g') - - cert=$(aws secretsmanager get-secret-value --secret-id "unit-test/certificate" --query "SecretString" | cut -f2 -d":" | cut -f2 -d\") && echo -e "$cert" > /tmp/certificate.pem - - key=$(aws secretsmanager get-secret-value --secret-id "unit-test/privatekey" --query "SecretString" | cut -f2 -d":" | cut -f2 -d\") && echo -e "$key" > /tmp/privatekey.pem + - export ENDPOINT=$(aws secretsmanager get-secret-value --secret-id ${CANARY_SERVER_ARN} --query "SecretString" | cut -f2 -d":" | sed -e 's/[\\\"\}]//g') # Run the Canary - cd ./utils/Canary - - python3 ../../codebuild/CanaryWrapper.py --canary_executable mvn --canary_arguments "compile exec:java -Dexec.mainClass=canary.mqtt5.Mqtt5Canary -Dexec.args=\"--endpoint \"${ENDPOINT}\" --port 8883 --cert /tmp/certificate.pem --key /tmp/privatekey.pem --seconds ${CANARY_DURATION} --threads ${CANARY_THREADS} --tps ${CANARY_TPS} --clients ${CANARY_CLIENT_COUNT}\" -Daws.crt.debugnative=true -Daws.crt.log.destination=Stdout -Daws.crt.log.level=${CANARY_LOG_LEVEL}" --git_hash ${GIT_HASH} --git_repo_name $PACKAGE_NAME --codebuild_log_path $CODEBUILD_LOG_PATH --ticket_item "${CODEBUILD_TICKET_ITEM}" + - python3 ../../codebuild/CanaryWrapper.py --canary_executable mvn --canary_arguments "compile exec:java -Dexec.mainClass=canary.mqtt5.Mqtt5Canary -Dexec.args=\"--endpoint \"${ENDPOINT}\" --port 1883 --seconds ${CANARY_DURATION} --threads ${CANARY_THREADS} --tps ${CANARY_TPS} --clients ${CANARY_CLIENT_COUNT}\" -Daws.crt.debugnative=true -Daws.crt.log.destination=Stdout -Daws.crt.log.level=${CANARY_LOG_LEVEL}" --git_hash ${GIT_HASH} --git_repo_name $PACKAGE_NAME --codebuild_log_path $CODEBUILD_LOG_PATH --ticket_item "${CODEBUILD_TICKET_ITEM}" post_build: commands: - echo Build completed on `date` diff --git a/utils/Canary/src/main/java/canary/mqtt5/Mqtt5Canary.java b/utils/Canary/src/main/java/canary/mqtt5/Mqtt5Canary.java index c84095d3a..509732d55 100644 --- a/utils/Canary/src/main/java/canary/mqtt5/Mqtt5Canary.java +++ b/utils/Canary/src/main/java/canary/mqtt5/Mqtt5Canary.java @@ -76,27 +76,29 @@ public class Mqtt5Canary { static int operationFutureWaitTime = 30; + private static final int MAX_PAYLOAD_SIZE = 65535; // Use UINT64_MAX for the payload size + static void printUsage() { System.out.println( - "Usage:\n" + - " --help This message\n"+ - " --endpoint MQTT5 endpoint hostname (optional, default=localhost)\n"+ - " --port MQTT5 endpoint port to use (optional, default=1883)\n"+ - " --ca_file A path to a CA certificate file (optional)\n"+ - " --cert A path to a certificate file (optional, will use mTLS if defined)\n" + - " --key A path to a private key file (optional, will use mTLS if defined)\n" + - " --clientID The ClientID to connect with (optional, default=MQTT5_Sample_Java_)\n"+ - " --use_websockets If defined, websockets will be used (optional)\n"+ - " --use_tls If defined, TLS (or mTLS) will be used (optional)\n"+ - "\n"+ - " --threads The number of EventLoop group threads to use (optional, default=8)\n"+ - " --clients The number of clients to use (optional, default=3, max=50)\n"+ - " --tps The number of seconds to wait after performing an operation (optional, default=12)\n"+ - " --seconds The number of seconds to run the Canary test (optional, default=600)\n"+ - " --log_console If defined, logging will print to stdout (optional, default=true, type=boolean)\n"+ - " --log_aws If defined, logging will occur using the AWS Logger (optional, type=boolean) \n"+ - " --log_aws_level If defined, logging to AWS Logger will use that log level (optional, default=Debug)\n"+ - " --log_file If defined, logging will be written to this file (optional)" + "Usage:\n" + + " --help This message\n"+ + " --endpoint MQTT5 endpoint hostname (optional, default=localhost)\n"+ + " --port MQTT5 endpoint port to use (optional, default=1883)\n"+ + " --ca_file A path to a CA certificate file (optional)\n"+ + " --cert A path to a certificate file (optional, will use mTLS if defined)\n" + + " --key A path to a private key file (optional, will use mTLS if defined)\n" + + " --clientID The ClientID to connect with (optional, default=MQTT5_Sample_Java_)\n"+ + " --use_websockets If defined, websockets will be used (optional)\n"+ + " --use_tls If defined, TLS (or mTLS) will be used (optional)\n"+ + "\n"+ + " --threads The number of EventLoop group threads to use (optional, default=8)\n"+ + " --clients The number of clients to use (optional, default=3, max=50)\n"+ + " --tps The number of seconds to wait after performing an operation (optional, default=12)\n"+ + " --seconds The number of seconds to run the Canary test (optional, default=600)\n"+ + " --log_console If defined, logging will print to stdout (optional, default=true, type=boolean)\n"+ + " --log_aws If defined, logging will occur using the AWS Logger (optional, type=boolean) \n"+ + " --log_aws_level If defined, logging to AWS Logger will use that log level (optional, default=Debug)\n"+ + " --log_file If defined, logging will be written to this file (optional)" ); } @@ -154,7 +156,12 @@ static void parseCommandLine(String[] args) { break; case "--tps": if (idx + 1 < args.length) { - configTps = Integer.parseInt(args[++idx]); + int tps = Integer.parseInt(args[++idx]); + if (tps == 0) { + configTps = 0; + } else { + configTps = 1000 / tps; + } } break; case "--seconds": @@ -256,7 +263,7 @@ public void onConnectionSuccess(Mqtt5Client client, OnConnectionSuccessReturn on @Override public void onConnectionFailure(Mqtt5Client client, OnConnectionFailureReturn onConnectionFailureReturn) { int clientIdx = clients.indexOf(client); - PrintLog("[Lifecycle event] Client ID " + clientIdx + " connection failed..."); + PrintLog("[Lifecycle event] Client ID " + clientIdx + " connection failed with errorCode : " + onConnectionFailureReturn.getErrorCode()); clientsData.get(clientIdx).connectedFuture.completeExceptionally(new Exception("Connection failure")); clientsData.get(clientIdx).subscribedToTopics = false; } @@ -286,8 +293,7 @@ public void onMessageReceived(Mqtt5Client client, PublishReturn publishReturn) { PublishPacket publishPacket = publishReturn.getPublishPacket(); int clientIdx = clients.indexOf(client); PrintLog("[Publish event] Client ID " + clientIdx + " message received:\n" + - " Topic: " + publishPacket.getTopic() + "\n" + - " Payload: " + new String(publishPacket.getPayload())); + " Topic: " + publishPacket.getTopic() + "\n"); } } @@ -364,8 +370,6 @@ public void accept(Mqtt5WebsocketHandshakeTransformArgs t) { public static void setupOperations() { // For now have everything evenly distributed - clientsOperationsList.add(CANARY_OPERATIONS.OPERATION_NULL); - clientsOperationsList.add(CANARY_OPERATIONS.OPERATION_START); clientsOperationsList.add(CANARY_OPERATIONS.OPERATION_STOP); clientsOperationsList.add(CANARY_OPERATIONS.OPERATION_SUBSCRIBE); clientsOperationsList.add(CANARY_OPERATIONS.OPERATION_UNSUBSCRIBE); @@ -410,7 +414,6 @@ public static void OperationStart(int clientIdx) { if (configFilePrinter != null) { ex.printStackTrace(configFilePrinter); } - exitWithError(1); } PrintLog("[OP] Started client ID " + clientIdx); clientsData.get(clientIdx).isWaitingForOperation = false; @@ -438,7 +441,6 @@ public static void OperationStop(int clientIdx) { if (configFilePrinter != null) { ex.printStackTrace(configFilePrinter); } - exitWithError(1); } PrintLog("[OP] Stopped client ID " + clientIdx); clientsData.get(clientIdx).isWaitingForOperation = false; @@ -471,7 +473,6 @@ public static void OperationSubscribe(int clientIdx) { if (configFilePrinter != null) { ex.printStackTrace(configFilePrinter); } - exitWithError(1); } clientsData.get(clientIdx).subscribedToTopics = true; PrintLog("[OP] Subscribed client ID " + clientIdx); @@ -505,7 +506,6 @@ public static void OperationUnsubscribe(int clientIdx) { if (configFilePrinter != null) { ex.printStackTrace(configFilePrinter); } - exitWithError(1); } clientsData.get(clientIdx).subscribedToTopics = false; PrintLog("[OP] Unsubscribed client ID " + clientIdx); @@ -535,7 +535,6 @@ public static void OperationUnsubscribeBad(int clientIdx) { if (configFilePrinter != null) { ex.printStackTrace(configFilePrinter); } - exitWithError(1); } PrintLog("[OP] Unsubscribed (bad) client ID " + clientIdx); clientsData.get(clientIdx).isWaitingForOperation = false; @@ -557,7 +556,13 @@ public static void OperationPublish(int clientIdx, QOS qos, String topic) { PrintLog("[OP] About to publish client ID " + clientIdx + " with QoS " + qos + " with topic " + topic); PublishPacketBuilder publishPacketBuilder = new PublishPacketBuilder(); publishPacketBuilder.withQOS(qos); - publishPacketBuilder.withPayload("Hello World".getBytes()); + + int payload_size = random.nextInt(MAX_PAYLOAD_SIZE); + byte[] payload_bytes = new byte[payload_size]; + for (int i = 0; i < payload_size; i++) { + payload_bytes[i] = (byte)random.nextInt(128); + } + publishPacketBuilder.withPayload(payload_bytes); publishPacketBuilder.withTopic(topic); // Add user properties! @@ -575,7 +580,6 @@ public static void OperationPublish(int clientIdx, QOS qos, String topic) { if (configFilePrinter != null) { ex.printStackTrace(configFilePrinter); } - exitWithError(1); } PrintLog("[OP] Published client ID " + clientIdx + " with QoS " + qos + " with topic " + topic); clientsData.get(clientIdx).isWaitingForOperation = false; @@ -734,9 +738,9 @@ public static void main(String[] args) { PerformRandomOperation(); try { - Thread.sleep(configTps * 1000); + Thread.sleep(configTps); } catch (Exception ex) { - PrintLog("[OP] Could not sleep for " + (configTps*1000) + " seconds due to exception! Exception: " + ex); + PrintLog("[OP] Could not sleep for " + (configTps) + " seconds due to exception! Exception: " + ex); exitWithError(1); } }