-
Notifications
You must be signed in to change notification settings - Fork 6
Subscriptions
The subscription subcomponent enables real-time alerts, automatic triggers, and streaming data output to on-line models.
To handle subscriptions, the Open Storm platform uses Kapacitor: part of the influx TICK stack.
The subscription service can be used to monitor the ''health'' of the wireless sensor network. This system monitors device health statistics (such as battery voltage and connection attempts) for each node on the network. Alerts can be pushed to a variety of endpoints, including email and Slack.
Current network health subscriptions include a battery voltage monitor, and a 'deadman's switch'. The battery voltage monitor generates an alert when the battery voltage of any node drops below a specified threshold. The 'deadman's switch' generates an alert when the data throughput ceases for a given node (indicating that the node has stopped reporting).
Subscriptions are implemented as Kapacitor TICK scripts. Below, we've included some example TICK scripts that are used on our own sensor networks. See the Kapacitor docs for more information on writing Kapacitor TICK scripts.
The following Kapacitor tick script detects if the variable test_measurement
has been written to a database and generates an alert. This alert can be chained to different output methods to initiate arbitrary 'trigger' actions.
stream
|from()
.measurement('test_measurement')
|alert()
.info(lambda: bool('true'))
.log('/tmp/alerts.log')
Define and enable the script:
kapacitor define write_detect -type stream -tick write_detect.tick -dbrp _TEST.autogen
kapacitor enable write_detect
The following Kapacitor tick script is used to push an alert to Slack when the battery voltage of any node drops below a set of predefined thresholds:
stream
|from()
.measurement('v_bat')
.groupBy('node_id')
|alert()
.info(lambda: "value" < 3.7)
.warn(lambda: "value" < 3.6)
.crit(lambda: "value" < 3.5)
.message('{{ .Level}}: {{ .ID }} is {{ index .Fields "value" | printf "%0.3f" }} V')
.stateChangesOnly()
.log('/tmp/alerts.log')
.slack()
.channel('#alerts')
Define and enable the script:
kapacitor define vbat_alert -type stream -tick vbat_alert.tick -dbrp _TEST.autogen
kapacitor enable vbat_alert
The following Kapacitor tick script is used to push an alert to Slack when the throughput of a node drops below 1 reading in the last 6 hours:
var data = stream
|from()
.measurement('conn_attempts')
.groupBy('node_id')
data
|deadman(1.0, 6h)
.stateChangesOnly()
.slack()
.channel('#alerts')
.log('/tmp/alerts.log')
Define and enable the script:
kapacitor define deadman -type stream -tick deadman.tick -dbrp _TEST.autogen
kapacitor enable deadman
stream
|from()
.database('DATABASE_1')
.measurement('test_measurement')
|influxDBOut()
.database('DATABASE_2')
.tag('source', 'kapacitor')
Define and enable the script:
kapacitor define db_forward -type stream -tick db_forward.tick -dbrp DATABASE_1.autogen -dbrp DATABASE_2.autogen
kapacitor enable db_forward
Kapacitor can be used to generate real-time control signals. The following TICK script generates a PID control signal and writes it to the originating database. It also writes relevant statistics (derivative, integral, etc.) to a separate database for debugging purposes.
var P = 3.0
var I = 0.1
var D = 0.5
var dur = 1ms
var dur_factor = 1000.0
var data = stream
|from()
.database('_TEST')
.measurement('test_measurement')
.groupBy('node_id')
var deriv = data
|derivative('value')
var diff = data
|difference('value')
var elapse = data
|elapsed('value', dur)
var integral = data
|join(diff, elapse)
.as('data', 'diff', 'elapse')
.streamName('integral')
|eval(lambda: (float("elapse.elapsed") / dur_factor) * ("data.value" - float("diff.difference") / 2.0))
.as('value')
var control_signal = data
|join(deriv, integral)
.as('data', 'deriv', 'integral')
.streamName('control_signal')
|eval(lambda: P*float("data.value") + I*float("integral.value") + D*float("deriv.value"))
.as('value')
deriv
|influxDBOut()
.database('_VAR')
.measurement('deriv')
.tag('source', 'kapacitor')
diff
|influxDBOut()
.database('_VAR')
.measurement('diff')
.tag('source', 'kapacitor')
elapse
|influxDBOut()
.database('_VAR')
.measurement('elapse')
.tag('source', 'kapacitor')
integral
|influxDBOut()
.database('_VAR')
.measurement('integral')
.tag('source', 'kapacitor')
control_signal
|influxDBOut()
.database('_TEST')
.measurement('control_signal')
.tag('source', 'kapacitor')
Define and enable the script:
kapacitor define pid -type stream -tick pid.tick -dbrp _TEST.autogen -dbrp _VAR.autogen
kapacitor enable pid