Skip to content

Commit

Permalink
Mqtt5 Canary Update (#721)
Browse files Browse the repository at this point in the history
  • Loading branch information
xiazhvera authored Nov 16, 2023
1 parent d85ea46 commit efdde24
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 38 deletions.
6 changes: 2 additions & 4 deletions codebuild/mqtt5-java-canary-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,10 @@ phases:
- export S3_DST=$(aws secretsmanager get-secret-value --secret-id "$CANARY_BUILD_S3_DST" --query "SecretString" | cut -f2,3 -d":" | sed -e 's/[\\\"\}]//g')
- export GIT_HASH=$(git rev-parse HEAD)
# Get the endpoint, certificate, and key for connecting to IoT Core
- export ENDPOINT=$(aws secretsmanager get-secret-value --secret-id "unit-test/endpoint" --query "SecretString" | cut -f2 -d":" | sed -e 's/[\\\"\}]//g')
- cert=$(aws secretsmanager get-secret-value --secret-id "unit-test/certificate" --query "SecretString" | cut -f2 -d":" | cut -f2 -d\") && echo -e "$cert" > /tmp/certificate.pem
- key=$(aws secretsmanager get-secret-value --secret-id "unit-test/privatekey" --query "SecretString" | cut -f2 -d":" | cut -f2 -d\") && echo -e "$key" > /tmp/privatekey.pem
- export ENDPOINT=$(aws secretsmanager get-secret-value --secret-id ${CANARY_SERVER_ARN} --query "SecretString" | cut -f2 -d":" | sed -e 's/[\\\"\}]//g')
# Run the Canary
- cd ./utils/Canary
- python3 ../../codebuild/CanaryWrapper.py --canary_executable mvn --canary_arguments "compile exec:java -Dexec.mainClass=canary.mqtt5.Mqtt5Canary -Dexec.args=\"--endpoint \"${ENDPOINT}\" --port 8883 --cert /tmp/certificate.pem --key /tmp/privatekey.pem --seconds ${CANARY_DURATION} --threads ${CANARY_THREADS} --tps ${CANARY_TPS} --clients ${CANARY_CLIENT_COUNT}\" -Daws.crt.debugnative=true -Daws.crt.log.destination=Stdout -Daws.crt.log.level=${CANARY_LOG_LEVEL}" --git_hash ${GIT_HASH} --git_repo_name $PACKAGE_NAME --codebuild_log_path $CODEBUILD_LOG_PATH --ticket_item "${CODEBUILD_TICKET_ITEM}"
- python3 ../../codebuild/CanaryWrapper.py --canary_executable mvn --canary_arguments "compile exec:java -Dexec.mainClass=canary.mqtt5.Mqtt5Canary -Dexec.args=\"--endpoint \"${ENDPOINT}\" --port 1883 --seconds ${CANARY_DURATION} --threads ${CANARY_THREADS} --tps ${CANARY_TPS} --clients ${CANARY_CLIENT_COUNT}\" -Daws.crt.debugnative=true -Daws.crt.log.destination=Stdout -Daws.crt.log.level=${CANARY_LOG_LEVEL}" --git_hash ${GIT_HASH} --git_repo_name $PACKAGE_NAME --codebuild_log_path $CODEBUILD_LOG_PATH --ticket_item "${CODEBUILD_TICKET_ITEM}"
post_build:
commands:
- echo Build completed on `date`
72 changes: 38 additions & 34 deletions utils/Canary/src/main/java/canary/mqtt5/Mqtt5Canary.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,27 +76,29 @@ public class Mqtt5Canary {

static int operationFutureWaitTime = 30;

private static final int MAX_PAYLOAD_SIZE = 65535; // Use UINT64_MAX for the payload size

static void printUsage() {
System.out.println(
"Usage:\n" +
" --help This message\n"+
" --endpoint MQTT5 endpoint hostname (optional, default=localhost)\n"+
" --port MQTT5 endpoint port to use (optional, default=1883)\n"+
" --ca_file A path to a CA certificate file (optional)\n"+
" --cert A path to a certificate file (optional, will use mTLS if defined)\n" +
" --key A path to a private key file (optional, will use mTLS if defined)\n" +
" --clientID The ClientID to connect with (optional, default=MQTT5_Sample_Java_<UUID>)\n"+
" --use_websockets If defined, websockets will be used (optional)\n"+
" --use_tls If defined, TLS (or mTLS) will be used (optional)\n"+
"\n"+
" --threads The number of EventLoop group threads to use (optional, default=8)\n"+
" --clients The number of clients to use (optional, default=3, max=50)\n"+
" --tps The number of seconds to wait after performing an operation (optional, default=12)\n"+
" --seconds The number of seconds to run the Canary test (optional, default=600)\n"+
" --log_console If defined, logging will print to stdout (optional, default=true, type=boolean)\n"+
" --log_aws If defined, logging will occur using the AWS Logger (optional, type=boolean) \n"+
" --log_aws_level If defined, logging to AWS Logger will use that log level (optional, default=Debug)\n"+
" --log_file If defined, logging will be written to this file (optional)"
"Usage:\n" +
" --help This message\n"+
" --endpoint MQTT5 endpoint hostname (optional, default=localhost)\n"+
" --port MQTT5 endpoint port to use (optional, default=1883)\n"+
" --ca_file A path to a CA certificate file (optional)\n"+
" --cert A path to a certificate file (optional, will use mTLS if defined)\n" +
" --key A path to a private key file (optional, will use mTLS if defined)\n" +
" --clientID The ClientID to connect with (optional, default=MQTT5_Sample_Java_<UUID>)\n"+
" --use_websockets If defined, websockets will be used (optional)\n"+
" --use_tls If defined, TLS (or mTLS) will be used (optional)\n"+
"\n"+
" --threads The number of EventLoop group threads to use (optional, default=8)\n"+
" --clients The number of clients to use (optional, default=3, max=50)\n"+
" --tps The number of seconds to wait after performing an operation (optional, default=12)\n"+
" --seconds The number of seconds to run the Canary test (optional, default=600)\n"+
" --log_console If defined, logging will print to stdout (optional, default=true, type=boolean)\n"+
" --log_aws If defined, logging will occur using the AWS Logger (optional, type=boolean) \n"+
" --log_aws_level If defined, logging to AWS Logger will use that log level (optional, default=Debug)\n"+
" --log_file If defined, logging will be written to this file (optional)"
);
}

Expand Down Expand Up @@ -154,7 +156,12 @@ static void parseCommandLine(String[] args) {
break;
case "--tps":
if (idx + 1 < args.length) {
configTps = Integer.parseInt(args[++idx]);
int tps = Integer.parseInt(args[++idx]);
if (tps == 0) {
configTps = 0;
} else {
configTps = 1000 / tps;
}
}
break;
case "--seconds":
Expand Down Expand Up @@ -256,7 +263,7 @@ public void onConnectionSuccess(Mqtt5Client client, OnConnectionSuccessReturn on
@Override
public void onConnectionFailure(Mqtt5Client client, OnConnectionFailureReturn onConnectionFailureReturn) {
int clientIdx = clients.indexOf(client);
PrintLog("[Lifecycle event] Client ID " + clientIdx + " connection failed...");
PrintLog("[Lifecycle event] Client ID " + clientIdx + " connection failed with errorCode : " + onConnectionFailureReturn.getErrorCode());
clientsData.get(clientIdx).connectedFuture.completeExceptionally(new Exception("Connection failure"));
clientsData.get(clientIdx).subscribedToTopics = false;
}
Expand Down Expand Up @@ -286,8 +293,7 @@ public void onMessageReceived(Mqtt5Client client, PublishReturn publishReturn) {
PublishPacket publishPacket = publishReturn.getPublishPacket();
int clientIdx = clients.indexOf(client);
PrintLog("[Publish event] Client ID " + clientIdx + " message received:\n" +
" Topic: " + publishPacket.getTopic() + "\n" +
" Payload: " + new String(publishPacket.getPayload()));
" Topic: " + publishPacket.getTopic() + "\n");
}
}

Expand Down Expand Up @@ -364,8 +370,6 @@ public void accept(Mqtt5WebsocketHandshakeTransformArgs t) {

public static void setupOperations() {
// For now have everything evenly distributed
clientsOperationsList.add(CANARY_OPERATIONS.OPERATION_NULL);
clientsOperationsList.add(CANARY_OPERATIONS.OPERATION_START);
clientsOperationsList.add(CANARY_OPERATIONS.OPERATION_STOP);
clientsOperationsList.add(CANARY_OPERATIONS.OPERATION_SUBSCRIBE);
clientsOperationsList.add(CANARY_OPERATIONS.OPERATION_UNSUBSCRIBE);
Expand Down Expand Up @@ -410,7 +414,6 @@ public static void OperationStart(int clientIdx) {
if (configFilePrinter != null) {
ex.printStackTrace(configFilePrinter);
}
exitWithError(1);
}
PrintLog("[OP] Started client ID " + clientIdx);
clientsData.get(clientIdx).isWaitingForOperation = false;
Expand Down Expand Up @@ -438,7 +441,6 @@ public static void OperationStop(int clientIdx) {
if (configFilePrinter != null) {
ex.printStackTrace(configFilePrinter);
}
exitWithError(1);
}
PrintLog("[OP] Stopped client ID " + clientIdx);
clientsData.get(clientIdx).isWaitingForOperation = false;
Expand Down Expand Up @@ -471,7 +473,6 @@ public static void OperationSubscribe(int clientIdx) {
if (configFilePrinter != null) {
ex.printStackTrace(configFilePrinter);
}
exitWithError(1);
}
clientsData.get(clientIdx).subscribedToTopics = true;
PrintLog("[OP] Subscribed client ID " + clientIdx);
Expand Down Expand Up @@ -505,7 +506,6 @@ public static void OperationUnsubscribe(int clientIdx) {
if (configFilePrinter != null) {
ex.printStackTrace(configFilePrinter);
}
exitWithError(1);
}
clientsData.get(clientIdx).subscribedToTopics = false;
PrintLog("[OP] Unsubscribed client ID " + clientIdx);
Expand Down Expand Up @@ -535,7 +535,6 @@ public static void OperationUnsubscribeBad(int clientIdx) {
if (configFilePrinter != null) {
ex.printStackTrace(configFilePrinter);
}
exitWithError(1);
}
PrintLog("[OP] Unsubscribed (bad) client ID " + clientIdx);
clientsData.get(clientIdx).isWaitingForOperation = false;
Expand All @@ -557,7 +556,13 @@ public static void OperationPublish(int clientIdx, QOS qos, String topic) {
PrintLog("[OP] About to publish client ID " + clientIdx + " with QoS " + qos + " with topic " + topic);
PublishPacketBuilder publishPacketBuilder = new PublishPacketBuilder();
publishPacketBuilder.withQOS(qos);
publishPacketBuilder.withPayload("Hello World".getBytes());

int payload_size = random.nextInt(MAX_PAYLOAD_SIZE);
byte[] payload_bytes = new byte[payload_size];
for (int i = 0; i < payload_size; i++) {
payload_bytes[i] = (byte)random.nextInt(128);
}
publishPacketBuilder.withPayload(payload_bytes);
publishPacketBuilder.withTopic(topic);

// Add user properties!
Expand All @@ -575,7 +580,6 @@ public static void OperationPublish(int clientIdx, QOS qos, String topic) {
if (configFilePrinter != null) {
ex.printStackTrace(configFilePrinter);
}
exitWithError(1);
}
PrintLog("[OP] Published client ID " + clientIdx + " with QoS " + qos + " with topic " + topic);
clientsData.get(clientIdx).isWaitingForOperation = false;
Expand Down Expand Up @@ -734,9 +738,9 @@ public static void main(String[] args) {
PerformRandomOperation();

try {
Thread.sleep(configTps * 1000);
Thread.sleep(configTps);
} catch (Exception ex) {
PrintLog("[OP] Could not sleep for " + (configTps*1000) + " seconds due to exception! Exception: " + ex);
PrintLog("[OP] Could not sleep for " + (configTps) + " seconds due to exception! Exception: " + ex);
exitWithError(1);
}
}
Expand Down

0 comments on commit efdde24

Please sign in to comment.