diff --git a/generator/amqp_snd_th.c b/generator/amqp_snd_th.c index e9cef4f6..9b5d74ed 100644 --- a/generator/amqp_snd_th.c +++ b/generator/amqp_snd_th.c @@ -29,6 +29,7 @@ static time_t start_time; int batch_count = 0; static const pn_bytes_t SEND_TIME = {sizeof("SendTime") - 1, "SendTime"}; +static const pn_bytes_t METRICS_SENT = {sizeof("AMQPSent") - 1, "AMQPSent"}; /* Close the connection and the listener so so we will get a * PN_PROACTOR_INACTIVE event and exit, once all outstanding events @@ -65,34 +66,32 @@ char *CD_MSG3 = ", \"interval\": 1,\"host\": \""; char *CD_MSG4 = "\", \"plugin\": \""; char *CD_MSG5 = "\", \"plugin_instance\": \"pluginInst0\",\"type\": \"type0\",\"type_instance\": \"typInst0\"}"; -static char MSG_BUFFER[4096]; -static char now_buf[100]; - static char *build_mesg(app_data_t *app, char *time_buf) { - char *p = MSG_BUFFER; + int msg_buf_size = sizeof(app->MSG_BUFFER); + char *p = app->MSG_BUFFER; char val_buff[20]; *p++ = '['; for (int i = 0; i < app->num_cd_per_mesg;) { - p = memccpy(p, CD_MSG1, '\0', sizeof(MSG_BUFFER)); + p = memccpy(p, CD_MSG1, '\0', msg_buf_size); p--; sprintf(val_buff, "%ld", app->host_list[app->curr_host].count++); - p = memccpy(p, val_buff, '\0', sizeof(MSG_BUFFER)); + p = memccpy(p, val_buff, '\0', msg_buf_size); p--; - p = memccpy(p, CD_MSG2, '\0', sizeof(MSG_BUFFER)); + p = memccpy(p, CD_MSG2, '\0', msg_buf_size); p--; - p = memccpy(p, time_buf, '\0', sizeof(MSG_BUFFER)); + p = memccpy(p, time_buf, '\0', msg_buf_size); p--; - p = memccpy(p, CD_MSG3, '\0', sizeof(MSG_BUFFER)); + p = memccpy(p, CD_MSG3, '\0', msg_buf_size); p--; - p = memccpy(p, app->host_list[app->curr_host].hostname, '\0', sizeof(MSG_BUFFER)); + p = memccpy(p, app->host_list[app->curr_host].hostname, '\0', msg_buf_size); p--; - p = memccpy(p, CD_MSG4, '\0', sizeof(MSG_BUFFER)); + p = memccpy(p, CD_MSG4, '\0', msg_buf_size); p--; - p = memccpy(p, app->host_list[app->curr_host].metric, '\0', sizeof(MSG_BUFFER)); + p = memccpy(p, app->host_list[app->curr_host].metric, '\0', msg_buf_size); p--; - p = memccpy(p, CD_MSG5, '\0', sizeof(MSG_BUFFER)); + p = memccpy(p, CD_MSG5, '\0', msg_buf_size); p--; if (++i < app->num_cd_per_mesg) { @@ -107,7 +106,7 @@ static char *build_mesg(app_data_t *app, char *time_buf) { *p++ = ']'; *p = '\0'; - return MSG_BUFFER; + return app->MSG_BUFFER; } static void gen_mesg(pn_rwbytes_t *buf, app_data_t *app, char *time_buf) { @@ -138,6 +137,8 @@ static void send_message(app_data_t *app, pn_link_t *sender, pn_rwbytes_t *data) pn_data_enter(props); pn_data_put_string(props, pn_bytes(SEND_TIME.size, SEND_TIME.start)); pn_data_put_long(props, stime); + pn_data_put_string(props, pn_bytes(METRICS_SENT.size, METRICS_SENT.start)); + pn_data_put_long(props, app->amqp_sent); pn_data_exit(props); pn_data_t *body = pn_message_body(message); @@ -168,24 +169,26 @@ static bool send_burst(app_data_t *app, pn_event_t *event) { struct timespec now; clock_gettime(CLOCK_REALTIME, &now); - time_sprintf(now_buf, now); + time_sprintf(app->now_buf, now); app->total_bursts++; app->burst_credit += credits; while (pn_link_credit(sender) > 0) { if (app->message_count > 0 && app->metrics_sent == app->message_count) { - break; + return 0; } app->amqp_sent++; app->metrics_sent += app->num_cd_per_mesg; /* Use sent counter as unique delivery tag. */ - pn_delivery(sender, pn_dtag((const char *)&app->metrics_sent, + pn_delivery_t *dlv = pn_delivery(sender, pn_dtag((const char *)&app->metrics_sent, sizeof(app->metrics_sent))); pn_rwbytes_t data; - gen_mesg(&data, app, now_buf); + gen_mesg(&data, app, app->now_buf); send_message(app, sender, &data); + if (app->presettled) + pn_delivery_settle(dlv); if (app->burst_size > 0 && ++burst >= app->burst_size) { break; } @@ -207,7 +210,8 @@ static bool handle(app_data_t *app, pn_event_t *event) { if (app->verbose > 1) { printf("PN_LINK_FLOW %d\n", pn_link_credit(sender)); } - send_burst(app, event); + //printf("link_credits: %d, sent: %ld\n",pn_link_credit(sender), app->amqp_sent ); + exit_code = send_burst(app, event); break; } @@ -230,7 +234,8 @@ static bool handle(app_data_t *app, pn_event_t *event) { pn_delivery_t *d = pn_event_delivery(event); if (pn_delivery_remote_state(d) == PN_ACCEPTED) { - pn_delivery_settle(d); + if (app->presettled==false) + pn_delivery_settle(d); app->acknowledged += app->num_cd_per_mesg; if (app->acknowledged == app->message_count) { printf("%ld messages metrics_sent and acknowledged\n", @@ -270,7 +275,7 @@ static bool handle(app_data_t *app, pn_event_t *event) { pn_link_t *sender = pn_sender(s, link_name); app->sender = sender; pn_terminus_set_address(pn_link_target(sender), app->amqp_address); - pn_link_set_snd_settle_mode(sender, PN_SND_UNSETTLED); + pn_link_set_snd_settle_mode(sender, PN_SND_MIXED); pn_link_set_rcv_settle_mode(sender, PN_RCV_FIRST); pn_link_open(sender); } @@ -290,7 +295,7 @@ static bool handle(app_data_t *app, pn_event_t *event) { pn_transport_t *t = pn_event_transport(event); pn_transport_require_auth(t, false); pn_sasl_allowed_mechs(pn_sasl(t), "ANONYMOUS"); - //pn_sasl_set_allow_insecure_mechs(pn_sasl(t), true); + pn_sasl_set_allow_insecure_mechs(pn_sasl(t), true); break; } @@ -405,7 +410,7 @@ static bool handle(app_data_t *app, pn_event_t *event) { void run(app_data_t *app) { /* Loop and handle events */ if (app->verbose) { - printf("%s: %s(..) start...\n", __FILE__, __func__); + printf("%s: %s(%s) start...\n", __FILE__, __func__,app->container_id); } start_time = clock(); @@ -431,13 +436,16 @@ double amqp_snd_clock() { } void amqp_snd_th_cleanup(void *app_ptr) { + char thread_name[16]; + app_data_t *app = (app_data_t *)app_ptr; if (app) { app->amqp_snd_th_running = 0; } + pthread_getname_np(app->amqp_snd_th,thread_name,16); - fprintf(stderr, "Exit AMQP SND thread...\n"); + fprintf(stderr, "Exit %s thread...\n", thread_name ); } void *amqp_snd_th(void *app_ptr) { @@ -451,7 +459,13 @@ void *amqp_snd_th(void *app_ptr) { /* Create the proactor and connect */ app->proactor = pn_proactor(); - pn_proactor_connect2(app->proactor, NULL, NULL, addr); + pn_connection_t *c = pn_connection(); + pn_transport_t *t = pn_transport(); + pn_proactor_connect2(app->proactor, c, t, addr); + + if ( app->verbose > 1 ) { + pn_transport_trace(t,PN_TRACE_FRM); + } run(app); diff --git a/generator/gen.c b/generator/gen.c index 09994e06..a3440b7a 100644 --- a/generator/gen.c +++ b/generator/gen.c @@ -41,13 +41,14 @@ static void usage(void) { " amqp_ip ip address of QDR\n" " amqp_port port number of the QDR\n" "optional args:\n" - " -s standalone mode, no QDR (defaults QDR mode)\n" + " -p pre-settle AMQP mode. Non-reliable delivery (defaults to unsettled)\n" " -i container_id should be unique (defaults to sa-RND)\n" " -a amqp_address AMQP address for endpoint (defaults to collectd/telemetry)\n" " -c count message count to stop (defaults to 0 for continuous)\n" " -n cd_per_mesg number of collectd messages per AMQP message (defaults to 1)\n" " -o num_hosts number of hosts to simulate (defaults to 1)\n" " -m metrics_hosts number of metrics per hosts to simulate (defaults to 100)\n" + " -t num_threads number of independent send pthreads (defaults to 1)\n" " -b burst_size maximum number of AMQP msgs to send per credit interval (defaults to # of credits)\n" " -s sleep_usec number of usec to sleep per credit interval (defaults to 0 for no sleep)\n" " -v verbose, print extra info (additional -v increases verbosity)\n" @@ -56,17 +57,17 @@ static void usage(void) { __func__); } -void gen_hosts(app_data_t *app) { +void gen_hosts(app_data_t *app, char *host_prefix) { app->curr_host = 0; app->host_list_len = app->num_hosts * app->num_metrics; // Allocate the host array - app->host_list = malloc( sizeof(host_info_t) * app->host_list_len ); + app->host_list = malloc(sizeof(host_info_t) * app->host_list_len); for (int i = 0; i < app->num_hosts; i++) { for (int j = 0; j < app->num_metrics; j++) { - int index = (i*app->num_metrics)+j; - asprintf(&app->host_list[index].hostname, "host_%d", i); + int index = (i * app->num_metrics) + j; + asprintf(&app->host_list[index].hostname, "host_%s_%d", host_prefix, i); asprintf(&app->host_list[index].metric, "metric_%d", j); app->host_list[index].count = 0; } @@ -93,19 +94,20 @@ int main(int argc, char **argv) { srand(time(0)); - sprintf(cid_buf, "sagen-%x", rand() % 1024); + sprintf(app.container_id, "sagen-%x", rand() % 1024); - app.container_id = cid_buf; /* Should be unique */ - - app.amqp_address = "collectd/telemetry"; + asprintf(&app.amqp_address, "collectd/telemetry"); app.message_count = 0; app.burst_size = 0; app.sleep_usec = 0; app.num_cd_per_mesg = 1; app.num_hosts = 1; app.num_metrics = 100; + app.presettled = false; + + int num_threads = 1; - while ((opt = getopt(argc, argv, "i:a:c:hvb:s:n:o:m:")) != -1) { + while ((opt = getopt(argc, argv, "i:a:c:hvb:s:n:o:m:pt:")) != -1) { switch (opt) { case 'i': sprintf(cid_buf, optarg); @@ -125,12 +127,18 @@ int main(int argc, char **argv) { case 'o': app.num_hosts = atoi(optarg); break; + case 't': + num_threads = atoi(optarg); + break; case 'm': app.num_metrics = atoi(optarg); break; case 'b': app.burst_size = atoi(optarg); break; + case 'p': + app.presettled = true; + break; case 's': app.sleep_usec = atoi(optarg); break; @@ -153,30 +161,36 @@ int main(int argc, char **argv) { app.host = strdup(argv[optind++]); app.port = strdup(argv[optind++]); - app.amqp_snd_th_running = true; + app_data_t apps[num_threads]; + char host_prefix[100]; - gen_hosts(&app); + for (int i = 0; i < num_threads; i++) { + app.amqp_snd_th_running = true; + apps[i] = app; - pthread_create(&app.amqp_snd_th, NULL, amqp_snd_th, (void *)&app); + snprintf(apps[i].container_id, sizeof(apps[i].container_id), "sagen-%03d", i); + printf("%s\n",apps[i].container_id); + snprintf(host_prefix, sizeof(host_prefix), "%d", i); + gen_hosts(&apps[i], host_prefix); - long last_metrics_sent = 0; - long last_amqp_sent = 0; - long last_acknowledged = 0; + pthread_create(&apps[i].amqp_snd_th, NULL, amqp_snd_th, (void *)&apps[i]); + pthread_setname_np(apps[i].amqp_snd_th, apps[i].container_id); + } while (1) { sleep(1); - - printf("metrics_sent: %ld(%ld), amqp_sent: %ld(%ld), ack'd: %ld(%ld), miss: %ld, burst_size: %f\n", - app.metrics_sent, app.metrics_sent - last_metrics_sent, - app.amqp_sent, app.amqp_sent - last_amqp_sent, - app.acknowledged, app.acknowledged - last_acknowledged, - app.metrics_sent - app.acknowledged, - app.amqp_sent / (float)app.total_bursts ); - - last_metrics_sent = app.metrics_sent; - last_amqp_sent = app.amqp_sent; - last_acknowledged = app.acknowledged; - + for (int i = 0; i < num_threads; i++) { + printf("%*s: %ld(%ld), amqp_sent: %ld(%ld), ack'd: %ld(%ld), miss: %ld, burst_size: %f\n", + (int)strlen(apps[i].container_id), apps[i].container_id, + apps[i].metrics_sent, apps[i].metrics_sent - apps[i].metrics_sent_last, + apps[i].amqp_sent, apps[i].amqp_sent - apps[i].amqp_sent_last, + apps[i].acknowledged, apps[i].acknowledged - apps[i].acknowledged_last, + apps[i].metrics_sent - apps[i].acknowledged, + apps[i].amqp_sent / (float)apps[i].total_bursts); + + sample_app_metrics(&apps[i]); + } + printf("----------------------------------------\n"); if (app.amqp_snd_th_running == 0) { printf("Joining amqp_rcv_th...\n"); pthread_join(app.amqp_snd_th, NULL); diff --git a/generator/gen.h b/generator/gen.h index dae193fa..9d4c1efc 100644 --- a/generator/gen.h +++ b/generator/gen.h @@ -1,5 +1,5 @@ -#ifndef _BRIDGE_H -#define _BRIDGE_H 1 +#ifndef _GEN_H +#define _GEN_H 1 #include #include @@ -15,6 +15,7 @@ typedef struct { } host_info_t; typedef struct { + int presettled; int standalone; int verbose; int max_q_depth; @@ -33,8 +34,9 @@ typedef struct { const char *host, *port; - const char *amqp_address; - const char *container_id; + char *amqp_address; + + char container_id[20]; pn_message_t *message; int message_count; @@ -46,14 +48,19 @@ typedef struct { pn_rwbytes_t msgout; /* Buffers for incoming/outgoing messages */ /* Sender values */ - long metrics_sent; - long amqp_sent; - long acknowledged; + volatile long metrics_sent; + volatile long amqp_sent; + volatile long acknowledged; + volatile long metrics_sent_last; + volatile long amqp_sent_last; + volatile long acknowledged_last; host_info_t *host_list; int host_list_len; int curr_host; + char MSG_BUFFER[4096]; + char now_buf[100]; } app_data_t; #endif diff --git a/generator/utils.c b/generator/utils.c index 47efb947..077e9555 100644 --- a/generator/utils.c +++ b/generator/utils.c @@ -1,6 +1,7 @@ #include #include +#include "gen.h" #include "utils.h" void time_diff(struct timespec t1, struct timespec t2, struct timespec *diff) { @@ -56,4 +57,10 @@ void rand_str(char *dest, size_t length, const char *prefix) { *dest++ = charset[index]; } *dest = '\0'; +} + +void sample_app_metrics(app_data_t *app) { + app->amqp_sent_last = app->amqp_sent; + app->metrics_sent_last = app->metrics_sent; + app->acknowledged_last = app->acknowledged; } \ No newline at end of file diff --git a/generator/utils.h b/generator/utils.h index 64b12de5..4416fd6f 100644 --- a/generator/utils.h +++ b/generator/utils.h @@ -12,5 +12,6 @@ void time_diff(struct timespec t1, struct timespec t2, struct timespec *diff); char *time_sprintf(char *buf, struct timespec t1); int64_t now(); void rand_str(char *dest, size_t length, const char *prefix); +void sample_app_metrics(app_data_t *app); #endif \ No newline at end of file