Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. Weโ€™ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

INTERNAL: Refactor lq_detect_buffer struct #808

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 22 additions & 29 deletions lqdetect.c
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,17 @@
#include <pthread.h>
#include <sys/time.h>
#include <assert.h>
#include <math.h>

#include "memcached/util.h"

#define LQ_THRESHOLD_DEFAULT 4000
#define LQ_QUERY_SIZE (64*2+64) /* bop get (longest query) : "<longest bkey>..<longest bkey> efilter <offset> <count> delete" */
#define LQ_KEY_SIZE 250 /* the max size of key string */
#define LQ_SAVE_CNT 20 /* save key count */
#define LQ_INPUT_SIZE 500 /* the size of input(time, ip, command, argument) */
#define LQ_STAT_STRLEN 300 /* max length of stats */
#define LQ_TIME_SIZE 16 /* time string size */
#define LQ_IP_SIZE 39 /* ipv6 max string size */

/* lqdetect state */
#define LQ_EXPLICIT_STOP 0 /* stop by user request */
Expand Down Expand Up @@ -58,13 +60,14 @@ struct lq_detect_stats {
struct lq_detect_argument {
char query[LQ_QUERY_SIZE];
char key[LQ_KEY_SIZE+1];
char time[LQ_TIME_SIZE+1];
char client_ip[LQ_IP_SIZE+1];
uint32_t count;
uint32_t overhead;
};

/* lqdetect buffer structure */
struct lq_detect_buffer {
char *data;
uint32_t offset;
uint32_t ntotal;
uint32_t nsaved;
Expand Down Expand Up @@ -109,9 +112,6 @@ static bool is_command_duplicated(char *key, enum lq_detect_command cmd, struct
}
}
}
if (arg->count == 0) {
snprintf(arg->key, sizeof(arg->key), "%s", key);
}
break;
}
return false;
Expand All @@ -124,7 +124,6 @@ static void do_lqdetect_write(char *client_ip, char *key,
struct timeval val;
struct lq_detect_buffer *buffer = &lqdetect.buffer[cmd];
uint32_t nsaved = buffer->nsaved;
uint32_t length = ((nsaved+1) * LQ_INPUT_SIZE);
int keylen = strlen(key);
char keybuf[LQ_KEY_SIZE+1];
char *keyptr = key;
Expand All @@ -142,11 +141,12 @@ static void do_lqdetect_write(char *client_ip, char *key,
gettimeofday(&val, NULL);
ptm = localtime(&val.tv_sec);

snprintf(buffer->data + buffer->offset, length - buffer->offset,
"%02d:%02d:%02d.%06ld %s <%u> %s %s %s\n",
ptm->tm_hour, ptm->tm_min, ptm->tm_sec, (long)val.tv_usec,
client_ip, arg->overhead, command_str[cmd], keyptr, arg->query);
buffer->offset += strlen(buffer->data + buffer->offset);
buffer->offset += snprintf(arg->time, sizeof(arg->time), "%02d:%02d:%02d.%06ld",
ptm->tm_hour, ptm->tm_min, ptm->tm_sec, (long)val.tv_usec) + 1; /* +1 for space */
buffer->offset += snprintf(arg->client_ip, sizeof(arg->client_ip), "%s", client_ip) + 1; /* +1 for space */
buffer->offset += snprintf(arg->key, sizeof(arg->key), "%s", keyptr) + 1; /* +1 for space */
buffer->offset += (int)log10(arg->overhead) + 1 + 3; /* overhead + < + > + space */
buffer->offset += strlen(command_str[cmd]) + strlen(arg->query) + 2; /* query + command + spaces */
lqdetect.arg[cmd][nsaved] = *arg;
buffer->nsaved += 1;
}
Expand Down Expand Up @@ -220,26 +220,11 @@ int lqdetect_init(EXTENSION_LOGGER_DESCRIPTOR *logger)
memset(lqdetect.buffer, 0, LQ_CMD_NUM * sizeof(struct lq_detect_buffer));
memset(&lqdetect.stats, 0, sizeof(struct lq_detect_stats));
for (int ii = 0; ii < LQ_CMD_NUM; ii++) {
lqdetect.buffer[ii].data = malloc(LQ_SAVE_CNT * LQ_INPUT_SIZE);
if (lqdetect.buffer[ii].data == NULL) {
while (--ii >= 0) {
free(lqdetect.buffer[ii].data);
}
return -1;
}
memset(lqdetect.arg[ii], 0, LQ_SAVE_CNT * sizeof(struct lq_detect_argument));
}
return 0;
}

void lqdetect_final(void)
{
for (int ii = 0; ii < LQ_CMD_NUM; ii++) {
free(lqdetect.buffer[ii].data);
lqdetect.buffer[ii].data = NULL;
}
}

int lqdetect_start(uint32_t threshold, bool *already_started)
{
int ret = 0;
Expand Down Expand Up @@ -320,20 +305,28 @@ char *lqdetect_stats(void)
char *lqdetect_result_get(int *size)
{
int offset = 0;
int length = 32 * LQ_CMD_NUM; // header length
int length = 1; /* for null-terminator */
char *str;

pthread_mutex_lock(&lqdetect.lock);
for (int i = 0; i < LQ_CMD_NUM; i++) {
if (lqdetect.buffer[i].ntotal == 0) {
length += strlen(command_str[i]) + 1 + 4; /* command + space + ": 0\n" */
continue;
}
length += lqdetect.buffer[i].offset;
length += strlen(command_str[i]) + (int)log10(lqdetect.buffer[i].ntotal) + 1 + 4; /* command + space + ": " + number + "\n" */
}
str = (char*)malloc(length);
if (str != NULL) {
for (int i = 0; i < LQ_CMD_NUM; i++) {
struct lq_detect_buffer *ldb = &lqdetect.buffer[i];
offset += snprintf(str + offset, length - offset, "%s : %u\n", command_str[i], ldb->ntotal);
if (ldb->ntotal > 0) {
offset += snprintf(str + offset, length - offset, "%s", ldb->data);
for (int j = 0; j < ldb->nsaved; j++) {
offset += snprintf(str + offset, length - offset,
"%s %s <%u> %s %s %s\n",
lqdetect.arg[i][j].time, lqdetect.arg[i][j].client_ip, lqdetect.arg[i][j].overhead,
command_str[i], lqdetect.arg[i][j].key, lqdetect.arg[i][j].query);
}
}
}
Expand Down
1 change: 0 additions & 1 deletion lqdetect.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
extern bool lqdetect_in_use;

int lqdetect_init(EXTENSION_LOGGER_DESCRIPTOR *logger);
void lqdetect_final(void);
char *lqdetect_result_get(int *size);
int lqdetect_start(uint32_t threshold, bool *already_started);
void lqdetect_stop(bool *already_stopped);
Expand Down
3 changes: 0 additions & 3 deletions memcached.c
Original file line number Diff line number Diff line change
Expand Up @@ -15861,9 +15861,6 @@ int main (int argc, char **argv)
/* 6) destroy data structures */
#ifdef COMMAND_LOGGING
cmdlog_final(); /* finalize command logging */
#endif
#ifdef DETECT_LONG_QUERY
lqdetect_final(); /* finalize long query detection */
#endif
mc_engine.v1->destroy(mc_engine.v0);
mc_logger->log(EXTENSION_LOG_INFO, NULL, "Memcached engine destroyed.\n");
Expand Down