Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Influxdb configurable prefix #2586

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,7 @@ E.g. -X "n=doorbell,m=OOK_PWM,s=400,l=800,r=7000,g=1000,match={24}0xa9878c,repea
Specify InfluxDB 2.0 server with e.g. -F "influx://localhost:9999/api/v2/write?org=<org>&bucket=<bucket>,token=<authtoken>"
Specify InfluxDB 1.x server with e.g. -F "influx://localhost:8086/write?db=<db>&p=<password>&u=<user>"
Additional parameter -M time:unix:usec:utc for correct timestamps in InfluxDB recommended
Additional parameter `metric` supports expanding keys (see MQTT for details).
Specify host/port for syslog with e.g. -F syslog:127.0.0.1:1514


Expand Down
1 change: 1 addition & 0 deletions conf/rtl_433.example.conf
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ signal_grabber none
# Specify InfluxDB 2.0 server with e.g. -F "influx://localhost:9999/api/v2/write?org=<org>&bucket=<bucket>,token=<authtoken>"
# Specify InfluxDB 1.x server with e.g. -F "influx://localhost:8086/write?db=<db>&p=<password>&u=<user>"
# Additional parameter -M time:unix:usec:utc for correct timestamps in InfluxDB recommended
# Additional parameter `metric` supports expanding keys (see MQTT for details).
# Specify host/port for syslog with e.g. -F syslog:127.0.0.1:1514
# default is "kv", multiple outputs can be used.
output json
Expand Down
16 changes: 16 additions & 0 deletions include/string_expand.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/** @file
String formatting a-la MQTT topic

Copyright (C) 2019 Christian Zuckschwerdt

This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
*/

#ifndef INCLUDE_STRING_EXPAND_H_
#define INCLUDE_STRING_EXPAND_H_

#include "data.h"

/// Callback used to clean an expanded part of the output string in place.
///
/// Called with two `char *` arguments; the callers visible alongside this
/// header pass the start of the text to clean and an end pointer.
/// NOTE(review): presumably the end pointer may be NULL to mean "up to the
/// terminating NUL", and the return value is the cleaned string -- confirm
/// against the implementations.
typedef char *(expand_string_sanitizer)(char *, char *);

/// Expand a format string with `[key]` tokens (as used for MQTT topics).
///
/// @param topic output buffer receiving the expanded string; no buffer size
///        is passed, so the caller must provide enough room
/// @param format format string to expand
/// @param data event data used to resolve the tokens
/// @param hostname value substituted for the hostname token
/// @param sanitizer callback applied to expanded parts of the output
/// @return NOTE(review): presumably a pointer to the end of the expanded
///         string in @p topic -- confirm against the implementation
char *expand_topic_string(char *topic, char const *format, data_t *data, char const *hostname, expand_string_sanitizer sanitizer);

#endif /* INCLUDE_STRING_EXPAND_H_ */
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we could run the sanitizer after each expand_topic_string call (it would be more efficient for multiple expansions and less so if there was no expansion, but overall negligible). That would mean less nesting/passing of calls and a clear order of expand, then sanitize, in the calling code. What do you think?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, it's good as it is. We need to run the sanitizer on each trailing part so that we don't remove, e.g., the slashes in MQTT topics.

1 change: 1 addition & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ add_library(r_433 STATIC
rfraw.c
samp_grab.c
sdr.c
string_expand.c
term_ctl.c
util.c
write_sigrok.c
Expand Down
60 changes: 50 additions & 10 deletions src/output_influx.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "logger.h"
#include "fatal.h"
#include "r_util.h"
#include "string_expand.h"

#include <stdlib.h>
#include <stdio.h>
Expand All @@ -28,15 +29,19 @@

/* InfluxDB client abstraction / printer */

#define RTL433_INFLUX_URL_LENGTH 399
#define RTL433_INFLUX_METRIC_LENGTH 99

typedef struct {
struct data_output output;
struct mg_mgr *mgr;
struct mg_connection *conn;
int prev_status;
int prev_resp_code;
char hostname[64];
char url[400];
char url[RTL433_INFLUX_URL_LENGTH + 1];
char extra_headers[150];
char metric_format[RTL433_INFLUX_METRIC_LENGTH + 1];
tls_opts_t tls_opts;
int databufidxfill;
struct mbuf databufs[2];
Expand Down Expand Up @@ -89,11 +94,11 @@ static void influx_client_event(struct mg_connection *nc, int ev, void *ev_data)
}
}

static influx_client_t *influx_client_init(influx_client_t *ctx, char const *url, char const *token)
static influx_client_t *influx_client_init(influx_client_t *ctx, char const *url, char const *token, const char *metric_format)
{
strncpy(ctx->url, url, sizeof(ctx->url));
ctx->url[sizeof(ctx->url) - 1] = '\0';
strncpy(ctx->url, url, RTL433_INFLUX_URL_LENGTH);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure about getting rid of the zero termination. We kind of know that the influx_client_t should be zero filled. But glancing at a strncpy without accompanying termination it looks like a bug.

snprintf(ctx->extra_headers, sizeof (ctx->extra_headers), "Authorization: Token %s\r\n", token);
strncpy(ctx->metric_format, metric_format, RTL433_INFLUX_METRIC_LENGTH);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Clang is right, this is a bug ;)


return ctx;
}
Expand Down Expand Up @@ -309,15 +314,20 @@ static void R_API_CALLCONV print_influx_data(data_output_t *output, data_t *data
data_time = d;
}

if (!data_model) {
if (influx->metric_format[0]) {
char metric_name[1000];
mbuf_reserve(buf, sizeof (metric_name) + 1000);
expand_topic_string(metric_name, influx->metric_format, data, influx->hostname, influx_sanitize_tag);
mbuf_snprintf(buf, "%s", metric_name);
} else if (!data_model) {
Copy link
Collaborator

@zuckschwerdt zuckschwerdt Aug 3, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure about this? influx->metric_format is a user setting but data_model is an event property, is there a logic to mixing those within a condition?

// data isn't from device (maybe report for example)
// use hostname for measurement

mbuf_reserve(buf, 20000);
mbuf_snprintf(buf, "rtl_433_%s", influx->hostname);
}
else {
// use model for measurement
// default: use "model" for metric name

mbuf_reserve(buf, 1000);
str = &buf->buf[buf->len];
Expand All @@ -327,13 +337,25 @@ static void R_API_CALLCONV print_influx_data(data_output_t *output, data_t *data

// write tags
while (data) {
if (!strcmp(data->key, "model")
|| !strcmp(data->key, "time")) {
if ((!strcmp(data->key, "model") && strstr(influx->metric_format, "[model"))
|| (!strcmp(data->key, "type") && strstr(influx->metric_format, "[type"))
|| (!strcmp(data->key, "subtype") && strstr(influx->metric_format, "[subtype"))
|| (!strcmp(data->key, "hostname") && strstr(influx->metric_format, "[hostname"))
|| (!strcmp(data->key, "channel") && strstr(influx->metric_format, "[channel"))
|| (!strcmp(data->key, "id") && strstr(influx->metric_format, "[id"))
|| (!strcmp(data->key, "protocol") && strstr(influx->metric_format, "[protocol"))) {
// this field is already encoded in the metric name -> skip
} else if (!strcmp(data->key, "model") && !influx->metric_format[0]) {
// non-configurable metric format uses "model" for metric name -> skip
} else if (!strcmp(data->key, "time")) {
// skip
}
else if (!strcmp(data->key, "type")
|| !strcmp(data->key, "subtype")
|| !strcmp(data->key, "protocol")
|| !strcmp(data->key, "id")
|| !strcmp(data->key, "dst_id")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dst_id is not metadata; it's part of the Somfy remote commands. There is no src_id?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right that these are not metadata, but to me, the real question is whether this is something that I, as a consumer of these data, should be filtering on, or whether it's some runtime data that I want to plot. In InfluxDB terms (cf. line protocol), these "filterable" items correspond to tags, in Prometheus these are called labels.

I don't use Somfy-IOC myself (it's just some neighbor's roller shutters that I hear clearly in this apartment building), but the data looked pretty static and I was thinking that it makes more sense to produce separate time series based on the (source, destination) pairs. That way users can track communication of the individual device pairs as separate series (if they choose to).

I'll move this to a separate patch, though, and if you think that it's still a wrong idea, I'm OK with dropping it as well.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It just struck me as pretty arbitrary to filter this one random key. It might look similar to the "id" key but is really just a random data field like all the others. So I assumed an understandable mixup. Yes, let's discuss (universal) handling in another PR.

|| !strcmp(data->key, "src_id")
|| !strcmp(data->key, "channel")
|| !strcmp(data->key, "mic")) {
str = mbuf_snprintf(buf, ",%s=", data->key);
Expand Down Expand Up @@ -362,7 +384,10 @@ static void R_API_CALLCONV print_influx_data(data_output_t *output, data_t *data
}
else if (!strcmp(data->key, "type")
|| !strcmp(data->key, "subtype")
|| !strcmp(data->key, "protocol")
|| !strcmp(data->key, "id")
|| !strcmp(data->key, "dst_id")
|| !strcmp(data->key, "src_id")
|| !strcmp(data->key, "channel")
|| !strcmp(data->key, "mic")) {
// skip
Expand Down Expand Up @@ -457,6 +482,7 @@ struct data_output *data_output_influx_create(struct mg_mgr *mgr, char *opts)
influx_sanitize_tag(influx->hostname, NULL);

char *token = NULL;
char *metric_format = NULL;

// param/opts starts with URL
char *url = opts;
Expand Down Expand Up @@ -486,6 +512,11 @@ struct data_output *data_output_influx_create(struct mg_mgr *mgr, char *opts)
exit(1);
}

if (strlen(url) > RTL433_INFLUX_URL_LENGTH) {
print_logf(LOG_FATAL, __func__, "InfluxDB URL too long, sorry");
exit(1);
}

// parse auth and format options
char *key, *val;
while (getkwargs(&opts, &key, &val)) {
Expand All @@ -495,6 +526,8 @@ struct data_output *data_output_influx_create(struct mg_mgr *mgr, char *opts)
continue;
else if (!strcasecmp(key, "t") || !strcasecmp(key, "token"))
token = val;
else if (!strcasecmp(key, "metric"))
metric_format = val;
else if (!tls_param(&influx->tls_opts, key, val)) {
// ok
}
Expand All @@ -504,17 +537,24 @@ struct data_output *data_output_influx_create(struct mg_mgr *mgr, char *opts)
}
}

if (metric_format && strlen(metric_format) > RTL433_INFLUX_METRIC_LENGTH) {
print_logf(LOG_FATAL, __func__, "InfluxDB \"metric\" formatter too long, sorry");
exit(1);
}

influx->output.print_data = print_influx_data;
influx->output.print_array = print_influx_array;
influx->output.print_string = print_influx_string;
influx->output.print_double = print_influx_double;
influx->output.print_int = print_influx_int;
influx->output.output_free = data_output_influx_free;

print_logf(LOG_CRITICAL, "InfluxDB", "Publishing data to InfluxDB (%s)", url);
print_logf(LOG_CRITICAL, "InfluxDB", "Publishing data to InfluxDB (%s, %s%s)", url,
metric_format ? "dynamic metric " : "static metric",
metric_format ? metric_format : "");

influx->mgr = mgr;
influx_client_init(influx, url, token);
influx_client_init(influx, url, token, metric_format);

return &influx->output;
}
127 changes: 6 additions & 121 deletions src/output_mqtt.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include "logger.h"
#include "fatal.h"
#include "r_util.h"
#include "string_expand.h"

#include <stdlib.h>
#include <stdio.h>
Expand Down Expand Up @@ -188,9 +189,9 @@ static void mqtt_client_free(mqtt_client_t *ctx)
/* Helper */

/// clean the topic inplace to [-.A-Za-z0-9], esp. not whitespace, +, #, /, $
static char *mqtt_sanitize_topic(char *topic)
static char *mqtt_sanitize_topic(char *topic, char *end)
{
for (char *p = topic; *p; ++p)
for (char *p = topic; p < end && *p; ++p)
if (*p != '-' && *p != '.' && (*p < 'A' || *p > 'Z') && (*p < 'a' || *p > 'z') && (*p < '0' || *p > '9'))
*p = '_';

Expand Down Expand Up @@ -224,122 +225,6 @@ static void R_API_CALLCONV print_mqtt_array(data_output_t *output, data_array_t
*orig = '\0'; // restore topic
}

/// Append the value of @p data to the topic string at @p topic.
///
/// String values are copied and then sanitized in place; integer values are
/// formatted as decimal. Any other data type is reported as an error and
/// nothing is appended. The output is not bounds checked.
///
/// @param topic write position in the topic buffer
/// @param data item whose value is appended
/// @return the write position after the appended text
static char *append_topic(char *topic, data_t *data)
{
    if (data->type == DATA_STRING) {
        char const *value = data->value.v_ptr;
        strcpy(topic, value);
        mqtt_sanitize_topic(topic);
        // sanitizing replaces characters one for one, so the length is unchanged
        return topic + strlen(value);
    }

    if (data->type == DATA_INT) {
        return topic + sprintf(topic, "%d", data->value.v_int);
    }

    print_logf(LOG_ERROR, __func__, "Can't append data type %d to topic", data->type);
    return topic;
}

/// Expand the topic format string @p format into @p topic.
///
/// Text outside of brackets is copied verbatim. A bracketed token has the
/// form `[key]`, `[Xkey]` or `[Xkey:default]`, where `X` is a single
/// character that is not a lowercase letter (e.g. `/`). The token is replaced
/// by `X` followed by the value of `key`, or by `X` followed by `default` if
/// the key has no value. If there is neither a value nor a default, the whole
/// token (including `X`) expands to nothing.
///
/// Known keys are `hostname` (taken from @p hostname) and the data keys
/// `type`, `model`, `subtype`, `channel`, `id` and `protocol` (taken from the
/// top level of @p data).
///
/// An unterminated or unknown token is fatal: it is logged and the process
/// exits with status 1.
///
/// The output is not bounds checked; the caller must provide a buffer large
/// enough for the expanded string.
///
/// @param topic write position in the output buffer
/// @param format format string, may be NULL (expands to an empty string)
/// @param data list of data items used to resolve the data keys
/// @param hostname string used for the `hostname` key, may be NULL
/// @return pointer to the terminating NUL written to @p topic
static char *expand_topic(char *topic, char const *format, data_t *data, char const *hostname)
{
    // collect well-known top level keys
    data_t *data_type     = NULL;
    data_t *data_model    = NULL;
    data_t *data_subtype  = NULL;
    data_t *data_channel  = NULL;
    data_t *data_id       = NULL;
    data_t *data_protocol = NULL;
    for (data_t *d = data; d; d = d->next) {
        if (!strcmp(d->key, "type"))
            data_type = d;
        else if (!strcmp(d->key, "model"))
            data_model = d;
        else if (!strcmp(d->key, "subtype"))
            data_subtype = d;
        else if (!strcmp(d->key, "channel"))
            data_channel = d;
        else if (!strcmp(d->key, "id"))
            data_id = d;
        else if (!strcmp(d->key, "protocol")) // NOTE: needs "-M protocol"
            data_protocol = d;
    }

    // consume entire format string
    while (format && *format) {
        data_t *data_token       = NULL;
        char const *string_token = NULL;
        int leading_slash        = 0;
        char const *t_start      = NULL;
        char const *t_end        = NULL;
        char const *d_start      = NULL;
        char const *d_end        = NULL;
        // copy until '['
        while (*format && *format != '[')
            *topic++ = *format++;
        // skip '['
        if (!*format)
            break;
        ++format;
        // read the leading separator: any character that is not a lowercase letter.
        // Never consume the terminating NUL here, otherwise a format ending in '['
        // would advance past the end of the string; it is reported as an
        // unterminated token below instead.
        if (*format && (*format < 'a' || *format > 'z')) {
            leading_slash = *format;
            format++;
        }
        // read key until : or ]
        t_start = t_end = format;
        while (*format && *format != ':' && *format != ']' && *format != '[')
            t_end = ++format;
        // read default until ]
        if (*format == ':') {
            d_start = d_end = ++format;
            while (*format && *format != ']' && *format != '[')
                d_end = ++format;
        }
        // check for proper closing
        if (*format != ']') {
            print_log(LOG_FATAL, __func__, "unterminated token");
            exit(1);
        }
        ++format;

        // resolve token
        // NOTE(review): the comparison length is the token length, so a prefix
        // of a key (even an empty token) matches the first key it is a prefix
        // of -- confirm whether this is intended.
        if (!strncmp(t_start, "hostname", t_end - t_start))
            string_token = hostname;
        else if (!strncmp(t_start, "type", t_end - t_start))
            data_token = data_type;
        else if (!strncmp(t_start, "model", t_end - t_start))
            data_token = data_model;
        else if (!strncmp(t_start, "subtype", t_end - t_start))
            data_token = data_subtype;
        else if (!strncmp(t_start, "channel", t_end - t_start))
            data_token = data_channel;
        else if (!strncmp(t_start, "id", t_end - t_start))
            data_token = data_id;
        else if (!strncmp(t_start, "protocol", t_end - t_start))
            data_token = data_protocol;
        else {
            print_logf(LOG_FATAL, __func__, "unknown token \"%.*s\"", (int)(t_end - t_start), t_start);
            exit(1);
        }

        // append token or default; with neither, drop the token and its separator
        if (!data_token && !string_token && !d_start)
            continue;
        if (leading_slash)
            *topic++ = leading_slash;
        if (data_token)
            topic = append_topic(topic, data_token);
        else if (string_token)
            topic += sprintf(topic, "%s", string_token);
        else
            topic += sprintf(topic, "%.*s", (int)(d_end - d_start), d_start);
    }

    *topic = '\0';
    return topic;
}

// <prefix>[/type][/model][/subtype][/channel][/id]/battery: "OK"|"LOW"
static void R_API_CALLCONV print_mqtt_data(data_output_t *output, data_t *data, char const *format)
{
Expand Down Expand Up @@ -368,7 +253,7 @@ static void R_API_CALLCONV print_mqtt_data(data_output_t *output, data_t *data,
return; // NOTE: skip output on alloc failure.
}
data_print_jsons(data, message, message_size);
expand_topic(mqtt->topic, mqtt->states, data, mqtt->hostname);
expand_topic_string(mqtt->topic, mqtt->states, data, mqtt->hostname, mqtt_sanitize_topic);
mqtt_client_publish(mqtt->mqc, mqtt->topic, message);
*mqtt->topic = '\0'; // clear topic
free(message);
Expand All @@ -380,7 +265,7 @@ static void R_API_CALLCONV print_mqtt_data(data_output_t *output, data_t *data,
if (mqtt->events) {
char message[2048]; // we expect the biggest strings to be around 500 bytes.
data_print_jsons(data, message, sizeof(message));
expand_topic(mqtt->topic, mqtt->events, data, mqtt->hostname);
expand_topic_string(mqtt->topic, mqtt->events, data, mqtt->hostname, mqtt_sanitize_topic);
mqtt_client_publish(mqtt->mqc, mqtt->topic, message);
*mqtt->topic = '\0'; // clear topic
}
Expand All @@ -390,7 +275,7 @@ static void R_API_CALLCONV print_mqtt_data(data_output_t *output, data_t *data,
return;
}

end = expand_topic(mqtt->topic, mqtt->devices, data, mqtt->hostname);
end = expand_topic_string(mqtt->topic, mqtt->devices, data, mqtt->hostname, mqtt_sanitize_topic);
}

while (data) {
Expand Down
Loading