-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Influxdb configurable prefix #2586
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
/** @file | ||
String formatting a-la MQTT topic | ||
|
||
Copyright (C) 2019 Christian Zuckschwerdt | ||
|
||
This program is free software; you can redistribute it and/or modify | ||
it under the terms of the GNU General Public License as published by | ||
the Free Software Foundation; either version 2 of the License, or | ||
(at your option) any later version. | ||
*/ | ||
|
||
#include "data.h" | ||
|
||
typedef char *(expand_string_sanitizer)(char *, char *); | ||
|
||
char *expand_topic_string(char *topic, char const *format, data_t *data, char const *hostname, expand_string_sanitizer sanitizer); | ||
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -40,6 +40,7 @@ add_library(r_433 STATIC | |
rfraw.c | ||
samp_grab.c | ||
sdr.c | ||
string_expand.c | ||
term_ctl.c | ||
util.c | ||
write_sigrok.c | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,6 +18,7 @@ | |
#include "logger.h" | ||
#include "fatal.h" | ||
#include "r_util.h" | ||
#include "string_expand.h" | ||
|
||
#include <stdlib.h> | ||
#include <stdio.h> | ||
|
@@ -28,15 +29,19 @@ | |
|
||
/* InfluxDB client abstraction / printer */ | ||
|
||
#define RTL433_INFLUX_URL_LENGTH 399 | ||
#define RTL433_INFLUX_METRIC_LENGTH 99 | ||
|
||
typedef struct { | ||
struct data_output output; | ||
struct mg_mgr *mgr; | ||
struct mg_connection *conn; | ||
int prev_status; | ||
int prev_resp_code; | ||
char hostname[64]; | ||
char url[400]; | ||
char url[RTL433_INFLUX_URL_LENGTH + 1]; | ||
char extra_headers[150]; | ||
char metric_format[RTL433_INFLUX_METRIC_LENGTH + 1]; | ||
tls_opts_t tls_opts; | ||
int databufidxfill; | ||
struct mbuf databufs[2]; | ||
|
@@ -89,11 +94,11 @@ static void influx_client_event(struct mg_connection *nc, int ev, void *ev_data) | |
} | ||
} | ||
|
||
static influx_client_t *influx_client_init(influx_client_t *ctx, char const *url, char const *token) | ||
static influx_client_t *influx_client_init(influx_client_t *ctx, char const *url, char const *token, const char *metric_format) | ||
{ | ||
strncpy(ctx->url, url, sizeof(ctx->url)); | ||
ctx->url[sizeof(ctx->url) - 1] = '\0'; | ||
strncpy(ctx->url, url, RTL433_INFLUX_URL_LENGTH); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not sure about getting rid of the zero termination. We kind of know that the influx_client_t should be zero filled. But glancing at a strncpy without accompanying termination it looks like a bug. |
||
snprintf(ctx->extra_headers, sizeof (ctx->extra_headers), "Authorization: Token %s\r\n", token); | ||
strncpy(ctx->metric_format, metric_format, RTL433_INFLUX_METRIC_LENGTH); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Clang is right, this is a bug ;) |
||
|
||
return ctx; | ||
} | ||
|
@@ -309,15 +314,20 @@ static void R_API_CALLCONV print_influx_data(data_output_t *output, data_t *data | |
data_time = d; | ||
} | ||
|
||
if (!data_model) { | ||
if (influx->metric_format[0]) { | ||
char metric_name[1000]; | ||
mbuf_reserve(buf, sizeof (metric_name) + 1000); | ||
expand_topic_string(metric_name, influx->metric_format, data, influx->hostname, influx_sanitize_tag); | ||
mbuf_snprintf(buf, "%s", metric_name); | ||
} else if (!data_model) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not sure about this? |
||
// data isn't from device (maybe report for example) | ||
// use hostname for measurement | ||
|
||
mbuf_reserve(buf, 20000); | ||
mbuf_snprintf(buf, "rtl_433_%s", influx->hostname); | ||
} | ||
else { | ||
// use model for measurement | ||
// default: use "model" for metric name | ||
|
||
mbuf_reserve(buf, 1000); | ||
str = &buf->buf[buf->len]; | ||
|
@@ -327,13 +337,25 @@ static void R_API_CALLCONV print_influx_data(data_output_t *output, data_t *data | |
|
||
// write tags | ||
while (data) { | ||
if (!strcmp(data->key, "model") | ||
|| !strcmp(data->key, "time")) { | ||
if ((!strcmp(data->key, "model") && strstr(influx->metric_format, "[model")) | ||
|| (!strcmp(data->key, "type") && strstr(influx->metric_format, "[type")) | ||
|| (!strcmp(data->key, "subtype") && strstr(influx->metric_format, "[subtype")) | ||
|| (!strcmp(data->key, "hostname") && strstr(influx->metric_format, "[hostname")) | ||
|| (!strcmp(data->key, "channel") && strstr(influx->metric_format, "[channel")) | ||
|| (!strcmp(data->key, "id") && strstr(influx->metric_format, "[id")) | ||
|| (!strcmp(data->key, "protocol") && strstr(influx->metric_format, "[protocol"))) { | ||
// this field is already encoded in the metric name -> skip | ||
} else if (!strcmp(data->key, "model") && !influx->metric_format[0]) { | ||
// non-configurable metric format uses "model" for metric name -> skip | ||
} else if (!strcmp(data->key, "time")) { | ||
// skip | ||
} | ||
else if (!strcmp(data->key, "type") | ||
|| !strcmp(data->key, "subtype") | ||
|| !strcmp(data->key, "protocol") | ||
|| !strcmp(data->key, "id") | ||
|| !strcmp(data->key, "dst_id") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You're right that these are not metadata, but to me, the real question is whether this is something that I, as a consumer of these data, should be filtering on, or whether it's some runtime data that I want to plot. In InfluxDB terms (cf. line protocol), these "filterable" items correspond to tags, in Prometheus these are called labels. I don't use Somfy-IOC myself (it's just some neighbor's roller shutters that I hear clearly in this apartment building), but the data looked pretty static and I was thinking that it makes more sense to produce separate time series based on the (source, destination) pairs. That way users can track communication of the individual device pairs as separate series (if they choose to). I'll move this to a separate patch, though, and if you think that it's still a wrong idea, I'm OK with dropping it as well. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It just struck me as pretty arbitrary to filter this one random key. It might look similar to the "id" key but is really just a random data field like all the others. So I assumed an understandable mixup. Yes, let's discuss (universal) handling in another PR. |
||
|| !strcmp(data->key, "src_id") | ||
|| !strcmp(data->key, "channel") | ||
|| !strcmp(data->key, "mic")) { | ||
str = mbuf_snprintf(buf, ",%s=", data->key); | ||
|
@@ -362,7 +384,10 @@ static void R_API_CALLCONV print_influx_data(data_output_t *output, data_t *data | |
} | ||
else if (!strcmp(data->key, "type") | ||
|| !strcmp(data->key, "subtype") | ||
|| !strcmp(data->key, "protocol") | ||
|| !strcmp(data->key, "id") | ||
|| !strcmp(data->key, "dst_id") | ||
|| !strcmp(data->key, "src_id") | ||
|| !strcmp(data->key, "channel") | ||
|| !strcmp(data->key, "mic")) { | ||
// skip | ||
|
@@ -457,6 +482,7 @@ struct data_output *data_output_influx_create(struct mg_mgr *mgr, char *opts) | |
influx_sanitize_tag(influx->hostname, NULL); | ||
|
||
char *token = NULL; | ||
char *metric_format = NULL; | ||
|
||
// param/opts starts with URL | ||
char *url = opts; | ||
|
@@ -486,6 +512,11 @@ struct data_output *data_output_influx_create(struct mg_mgr *mgr, char *opts) | |
exit(1); | ||
} | ||
|
||
if (strlen(url) > RTL433_INFLUX_URL_LENGTH) { | ||
print_logf(LOG_FATAL, __func__, "InfluxDB URL too long, sorry"); | ||
exit(1); | ||
} | ||
|
||
// parse auth and format options | ||
char *key, *val; | ||
while (getkwargs(&opts, &key, &val)) { | ||
|
@@ -495,6 +526,8 @@ struct data_output *data_output_influx_create(struct mg_mgr *mgr, char *opts) | |
continue; | ||
else if (!strcasecmp(key, "t") || !strcasecmp(key, "token")) | ||
token = val; | ||
else if (!strcasecmp(key, "metric")) | ||
metric_format = val; | ||
else if (!tls_param(&influx->tls_opts, key, val)) { | ||
// ok | ||
} | ||
|
@@ -504,17 +537,24 @@ struct data_output *data_output_influx_create(struct mg_mgr *mgr, char *opts) | |
} | ||
} | ||
|
||
if (metric_format && strlen(metric_format) > RTL433_INFLUX_METRIC_LENGTH) { | ||
print_logf(LOG_FATAL, __func__, "InfluxDB \"metric\" formatter too long, sorry"); | ||
exit(1); | ||
} | ||
|
||
influx->output.print_data = print_influx_data; | ||
influx->output.print_array = print_influx_array; | ||
influx->output.print_string = print_influx_string; | ||
influx->output.print_double = print_influx_double; | ||
influx->output.print_int = print_influx_int; | ||
influx->output.output_free = data_output_influx_free; | ||
|
||
print_logf(LOG_CRITICAL, "InfluxDB", "Publishing data to InfluxDB (%s)", url); | ||
print_logf(LOG_CRITICAL, "InfluxDB", "Publishing data to InfluxDB (%s, %s%s)", url, | ||
metric_format ? "dynamic metric " : "static metric", | ||
metric_format ? metric_format : ""); | ||
|
||
influx->mgr = mgr; | ||
influx_client_init(influx, url, token); | ||
influx_client_init(influx, url, token, metric_format); | ||
|
||
return &influx->output; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we could run the sanitizer after each expand_topic_string call (it would be more efficient for multiple expansions and less if there was no expansion, but overall neglible). Less nesting/passing of calls and a clear order of expand then sanitize in the calling code. What do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, good as it is. We need to run the sanitizer on each trailing part so we don't remove the e.g. slashes in MQTT topics.