Skip to content

Commit

Permalink
Merge pull request #10 from infrawatch/addpresettletogen
Browse files Browse the repository at this point in the history
add presettled mode (-p), add multithread (-t)
  • Loading branch information
csibbitt authored Jul 7, 2020
2 parents 28557b1 + e3047dd commit 38a3d95
Show file tree
Hide file tree
Showing 5 changed files with 103 additions and 60 deletions.
64 changes: 39 additions & 25 deletions generator/amqp_snd_th.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ static time_t start_time;
int batch_count = 0;

static const pn_bytes_t SEND_TIME = {sizeof("SendTime") - 1, "SendTime"};
static const pn_bytes_t METRICS_SENT = {sizeof("AMQPSent") - 1, "AMQPSent"};

/* Close the connection and the listener so so we will get a
* PN_PROACTOR_INACTIVE event and exit, once all outstanding events
Expand Down Expand Up @@ -65,34 +66,32 @@ char *CD_MSG3 = ", \"interval\": 1,\"host\": \"";
char *CD_MSG4 = "\", \"plugin\": \"";
char *CD_MSG5 = "\", \"plugin_instance\": \"pluginInst0\",\"type\": \"type0\",\"type_instance\": \"typInst0\"}";

static char MSG_BUFFER[4096];
static char now_buf[100];

static char *build_mesg(app_data_t *app, char *time_buf) {
char *p = MSG_BUFFER;
int msg_buf_size = sizeof(app->MSG_BUFFER);
char *p = app->MSG_BUFFER;
char val_buff[20];

*p++ = '[';

for (int i = 0; i < app->num_cd_per_mesg;) {
p = memccpy(p, CD_MSG1, '\0', sizeof(MSG_BUFFER));
p = memccpy(p, CD_MSG1, '\0', msg_buf_size);
p--;
sprintf(val_buff, "%ld", app->host_list[app->curr_host].count++);
p = memccpy(p, val_buff, '\0', sizeof(MSG_BUFFER));
p = memccpy(p, val_buff, '\0', msg_buf_size);
p--;
p = memccpy(p, CD_MSG2, '\0', sizeof(MSG_BUFFER));
p = memccpy(p, CD_MSG2, '\0', msg_buf_size);
p--;
p = memccpy(p, time_buf, '\0', sizeof(MSG_BUFFER));
p = memccpy(p, time_buf, '\0', msg_buf_size);
p--;
p = memccpy(p, CD_MSG3, '\0', sizeof(MSG_BUFFER));
p = memccpy(p, CD_MSG3, '\0', msg_buf_size);
p--;
p = memccpy(p, app->host_list[app->curr_host].hostname, '\0', sizeof(MSG_BUFFER));
p = memccpy(p, app->host_list[app->curr_host].hostname, '\0', msg_buf_size);
p--;
p = memccpy(p, CD_MSG4, '\0', sizeof(MSG_BUFFER));
p = memccpy(p, CD_MSG4, '\0', msg_buf_size);
p--;
p = memccpy(p, app->host_list[app->curr_host].metric, '\0', sizeof(MSG_BUFFER));
p = memccpy(p, app->host_list[app->curr_host].metric, '\0', msg_buf_size);
p--;
p = memccpy(p, CD_MSG5, '\0', sizeof(MSG_BUFFER));
p = memccpy(p, CD_MSG5, '\0', msg_buf_size);
p--;

if (++i < app->num_cd_per_mesg) {
Expand All @@ -107,7 +106,7 @@ static char *build_mesg(app_data_t *app, char *time_buf) {
*p++ = ']';
*p = '\0';

return MSG_BUFFER;
return app->MSG_BUFFER;
}

static void gen_mesg(pn_rwbytes_t *buf, app_data_t *app, char *time_buf) {
Expand Down Expand Up @@ -138,6 +137,8 @@ static void send_message(app_data_t *app, pn_link_t *sender, pn_rwbytes_t *data)
pn_data_enter(props);
pn_data_put_string(props, pn_bytes(SEND_TIME.size, SEND_TIME.start));
pn_data_put_long(props, stime);
pn_data_put_string(props, pn_bytes(METRICS_SENT.size, METRICS_SENT.start));
pn_data_put_long(props, app->amqp_sent);
pn_data_exit(props);

pn_data_t *body = pn_message_body(message);
Expand Down Expand Up @@ -168,24 +169,26 @@ static bool send_burst(app_data_t *app, pn_event_t *event) {
struct timespec now;

clock_gettime(CLOCK_REALTIME, &now);
time_sprintf(now_buf, now);
time_sprintf(app->now_buf, now);

app->total_bursts++;
app->burst_credit += credits;
while (pn_link_credit(sender) > 0) {
if (app->message_count > 0 && app->metrics_sent == app->message_count) {
break;
return 0;
}
app->amqp_sent++;
app->metrics_sent += app->num_cd_per_mesg;

/* Use sent counter as unique delivery tag. */
pn_delivery(sender, pn_dtag((const char *)&app->metrics_sent,
pn_delivery_t *dlv = pn_delivery(sender, pn_dtag((const char *)&app->metrics_sent,
sizeof(app->metrics_sent)));
pn_rwbytes_t data;

gen_mesg(&data, app, now_buf);
gen_mesg(&data, app, app->now_buf);
send_message(app, sender, &data);
if (app->presettled)
pn_delivery_settle(dlv);
if (app->burst_size > 0 && ++burst >= app->burst_size) {
break;
}
Expand All @@ -207,7 +210,8 @@ static bool handle(app_data_t *app, pn_event_t *event) {
if (app->verbose > 1) {
printf("PN_LINK_FLOW %d\n", pn_link_credit(sender));
}
send_burst(app, event);
//printf("link_credits: %d, sent: %ld\n",pn_link_credit(sender), app->amqp_sent );
exit_code = send_burst(app, event);
break;
}

Expand All @@ -230,7 +234,8 @@ static bool handle(app_data_t *app, pn_event_t *event) {
pn_delivery_t *d = pn_event_delivery(event);

if (pn_delivery_remote_state(d) == PN_ACCEPTED) {
pn_delivery_settle(d);
if (app->presettled==false)
pn_delivery_settle(d);
app->acknowledged += app->num_cd_per_mesg;
if (app->acknowledged == app->message_count) {
printf("%ld messages metrics_sent and acknowledged\n",
Expand Down Expand Up @@ -270,7 +275,7 @@ static bool handle(app_data_t *app, pn_event_t *event) {
pn_link_t *sender = pn_sender(s, link_name);
app->sender = sender;
pn_terminus_set_address(pn_link_target(sender), app->amqp_address);
pn_link_set_snd_settle_mode(sender, PN_SND_UNSETTLED);
pn_link_set_snd_settle_mode(sender, PN_SND_MIXED);
pn_link_set_rcv_settle_mode(sender, PN_RCV_FIRST);
pn_link_open(sender);
}
Expand All @@ -290,7 +295,7 @@ static bool handle(app_data_t *app, pn_event_t *event) {
pn_transport_t *t = pn_event_transport(event);
pn_transport_require_auth(t, false);
pn_sasl_allowed_mechs(pn_sasl(t), "ANONYMOUS");
//pn_sasl_set_allow_insecure_mechs(pn_sasl(t), true);
pn_sasl_set_allow_insecure_mechs(pn_sasl(t), true);

break;
}
Expand Down Expand Up @@ -405,7 +410,7 @@ static bool handle(app_data_t *app, pn_event_t *event) {
void run(app_data_t *app) {
/* Loop and handle events */
if (app->verbose) {
printf("%s: %s(..) start...\n", __FILE__, __func__);
printf("%s: %s(%s) start...\n", __FILE__, __func__,app->container_id);
}

start_time = clock();
Expand All @@ -431,13 +436,16 @@ double amqp_snd_clock() {
}

void amqp_snd_th_cleanup(void *app_ptr) {
char thread_name[16];

app_data_t *app = (app_data_t *)app_ptr;

if (app) {
app->amqp_snd_th_running = 0;
}
pthread_getname_np(app->amqp_snd_th,thread_name,16);

fprintf(stderr, "Exit AMQP SND thread...\n");
fprintf(stderr, "Exit %s thread...\n", thread_name );
}

void *amqp_snd_th(void *app_ptr) {
Expand All @@ -451,7 +459,13 @@ void *amqp_snd_th(void *app_ptr) {
/* Create the proactor and connect */
app->proactor = pn_proactor();

pn_proactor_connect2(app->proactor, NULL, NULL, addr);
pn_connection_t *c = pn_connection();
pn_transport_t *t = pn_transport();
pn_proactor_connect2(app->proactor, c, t, addr);

if ( app->verbose > 1 ) {
pn_transport_trace(t,PN_TRACE_FRM);
}

run(app);

Expand Down
70 changes: 42 additions & 28 deletions generator/gen.c
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,14 @@ static void usage(void) {
" amqp_ip ip address of QDR\n"
" amqp_port port number of the QDR\n"
"optional args:\n"
" -s standalone mode, no QDR (defaults QDR mode)\n"
" -p pre-settle AMQP mode. Non-reliable delivery (defaults to unsettled)\n"
" -i container_id should be unique (defaults to sa-RND)\n"
" -a amqp_address AMQP address for endpoint (defaults to collectd/telemetry)\n"
" -c count message count to stop (defaults to 0 for continuous)\n"
" -n cd_per_mesg number of collectd messages per AMQP message (defaults to 1)\n"
" -o num_hosts number of hosts to simulate (defaults to 1)\n"
" -m metrics_hosts number of metrics per hosts to simulate (defaults to 100)\n"
" -t num_threads number of independent send pthreads (defaults to 1)\n"
" -b burst_size maximum number of AMQP msgs to send per credit interval (defaults to # of credits)\n"
" -s sleep_usec number of usec to sleep per credit interval (defaults to 0 for no sleep)\n"
" -v verbose, print extra info (additional -v increases verbosity)\n"
Expand All @@ -56,17 +57,17 @@ static void usage(void) {
__func__);
}

void gen_hosts(app_data_t *app) {
void gen_hosts(app_data_t *app, char *host_prefix) {
app->curr_host = 0;

app->host_list_len = app->num_hosts * app->num_metrics;

// Allocate the host array
app->host_list = malloc( sizeof(host_info_t) * app->host_list_len );
app->host_list = malloc(sizeof(host_info_t) * app->host_list_len);
for (int i = 0; i < app->num_hosts; i++) {
for (int j = 0; j < app->num_metrics; j++) {
int index = (i*app->num_metrics)+j;
asprintf(&app->host_list[index].hostname, "host_%d", i);
int index = (i * app->num_metrics) + j;
asprintf(&app->host_list[index].hostname, "host_%s_%d", host_prefix, i);
asprintf(&app->host_list[index].metric, "metric_%d", j);
app->host_list[index].count = 0;
}
Expand All @@ -93,19 +94,20 @@ int main(int argc, char **argv) {

srand(time(0));

sprintf(cid_buf, "sagen-%x", rand() % 1024);
sprintf(app.container_id, "sagen-%x", rand() % 1024);

app.container_id = cid_buf; /* Should be unique */

app.amqp_address = "collectd/telemetry";
asprintf(&app.amqp_address, "collectd/telemetry");
app.message_count = 0;
app.burst_size = 0;
app.sleep_usec = 0;
app.num_cd_per_mesg = 1;
app.num_hosts = 1;
app.num_metrics = 100;
app.presettled = false;

int num_threads = 1;

while ((opt = getopt(argc, argv, "i:a:c:hvb:s:n:o:m:")) != -1) {
while ((opt = getopt(argc, argv, "i:a:c:hvb:s:n:o:m:pt:")) != -1) {
switch (opt) {
case 'i':
sprintf(cid_buf, optarg);
Expand All @@ -125,12 +127,18 @@ int main(int argc, char **argv) {
case 'o':
app.num_hosts = atoi(optarg);
break;
case 't':
num_threads = atoi(optarg);
break;
case 'm':
app.num_metrics = atoi(optarg);
break;
case 'b':
app.burst_size = atoi(optarg);
break;
case 'p':
app.presettled = true;
break;
case 's':
app.sleep_usec = atoi(optarg);
break;
Expand All @@ -153,30 +161,36 @@ int main(int argc, char **argv) {
app.host = strdup(argv[optind++]);
app.port = strdup(argv[optind++]);

app.amqp_snd_th_running = true;
app_data_t apps[num_threads];
char host_prefix[100];

gen_hosts(&app);
for (int i = 0; i < num_threads; i++) {
app.amqp_snd_th_running = true;
apps[i] = app;

pthread_create(&app.amqp_snd_th, NULL, amqp_snd_th, (void *)&app);
snprintf(apps[i].container_id, sizeof(apps[i].container_id), "sagen-%03d", i);
printf("%s\n",apps[i].container_id);
snprintf(host_prefix, sizeof(host_prefix), "%d", i);
gen_hosts(&apps[i], host_prefix);

long last_metrics_sent = 0;
long last_amqp_sent = 0;
long last_acknowledged = 0;
pthread_create(&apps[i].amqp_snd_th, NULL, amqp_snd_th, (void *)&apps[i]);
pthread_setname_np(apps[i].amqp_snd_th, apps[i].container_id);
}

while (1) {
sleep(1);

printf("metrics_sent: %ld(%ld), amqp_sent: %ld(%ld), ack'd: %ld(%ld), miss: %ld, burst_size: %f\n",
app.metrics_sent, app.metrics_sent - last_metrics_sent,
app.amqp_sent, app.amqp_sent - last_amqp_sent,
app.acknowledged, app.acknowledged - last_acknowledged,
app.metrics_sent - app.acknowledged,
app.amqp_sent / (float)app.total_bursts );

last_metrics_sent = app.metrics_sent;
last_amqp_sent = app.amqp_sent;
last_acknowledged = app.acknowledged;

for (int i = 0; i < num_threads; i++) {
printf("%*s: %ld(%ld), amqp_sent: %ld(%ld), ack'd: %ld(%ld), miss: %ld, burst_size: %f\n",
(int)strlen(apps[i].container_id), apps[i].container_id,
apps[i].metrics_sent, apps[i].metrics_sent - apps[i].metrics_sent_last,
apps[i].amqp_sent, apps[i].amqp_sent - apps[i].amqp_sent_last,
apps[i].acknowledged, apps[i].acknowledged - apps[i].acknowledged_last,
apps[i].metrics_sent - apps[i].acknowledged,
apps[i].amqp_sent / (float)apps[i].total_bursts);

sample_app_metrics(&apps[i]);
}
printf("----------------------------------------\n");
if (app.amqp_snd_th_running == 0) {
printf("Joining amqp_rcv_th...\n");
pthread_join(app.amqp_snd_th, NULL);
Expand Down
21 changes: 14 additions & 7 deletions generator/gen.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#ifndef _BRIDGE_H
#define _BRIDGE_H 1
#ifndef _GEN_H
#define _GEN_H 1

#include <proton/condition.h>
#include <proton/listener.h>
Expand All @@ -15,6 +15,7 @@ typedef struct {
} host_info_t;

typedef struct {
int presettled;
int standalone;
int verbose;
int max_q_depth;
Expand All @@ -33,8 +34,9 @@ typedef struct {

const char *host, *port;

const char *amqp_address;
const char *container_id;
char *amqp_address;

char container_id[20];

pn_message_t *message;
int message_count;
Expand All @@ -46,14 +48,19 @@ typedef struct {
pn_rwbytes_t msgout; /* Buffers for incoming/outgoing messages */

/* Sender values */
long metrics_sent;
long amqp_sent;
long acknowledged;
volatile long metrics_sent;
volatile long amqp_sent;
volatile long acknowledged;
volatile long metrics_sent_last;
volatile long amqp_sent_last;
volatile long acknowledged_last;

host_info_t *host_list;
int host_list_len;
int curr_host;

char MSG_BUFFER[4096];
char now_buf[100];
} app_data_t;

#endif
7 changes: 7 additions & 0 deletions generator/utils.c
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include <stdlib.h>
#include <string.h>

#include "gen.h"
#include "utils.h"

void time_diff(struct timespec t1, struct timespec t2, struct timespec *diff) {
Expand Down Expand Up @@ -56,4 +57,10 @@ void rand_str(char *dest, size_t length, const char *prefix) {
*dest++ = charset[index];
}
*dest = '\0';
}

void sample_app_metrics(app_data_t *app) {
app->amqp_sent_last = app->amqp_sent;
app->metrics_sent_last = app->metrics_sent;
app->acknowledged_last = app->acknowledged;
}
1 change: 1 addition & 0 deletions generator/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,6 @@ void time_diff(struct timespec t1, struct timespec t2, struct timespec *diff);
char *time_sprintf(char *buf, struct timespec t1);
int64_t now();
void rand_str(char *dest, size_t length, const char *prefix);
void sample_app_metrics(app_data_t *app);

#endif

0 comments on commit 38a3d95

Please sign in to comment.