Skip to content

Subscriptions

Matt Bartos edited this page May 19, 2017 · 1 revision

The subscription subcomponent enables real-time alerts, automatic triggers, and streaming data output to on-line models.

To handle subscriptions, the Open Storm platform uses Kapacitor: part of the influx TICK stack.

Network health alerts

The subscription service can be used to monitor the ''health'' of the wireless sensor network. This system monitors device health statistics (such as battery voltage and connection attempts) for each node on the network. Alerts can be pushed to a variety of endpoints, including email and Slack.

Current network health subscriptions include a battery voltage monitor, and a 'deadman's switch'. The battery voltage monitor generates an alert when the battery voltage of any node drops below a specified threshold. The 'deadman's switch' generates an alert when the data throughput ceases for a given node (indicating that the node has stopped reporting).

Subscriptions are implemented as Kapacitor TICK scripts. Below, we've included some example TICK scripts that are used on our own sensor networks. See the Kapacitor docs for more information on writing Kapacitor TICK scripts.

Generic write detection

The following Kapacitor tick script detects if the variable test_measurement has been written to a database and generates an alert. This alert can be chained to different output methods to initiate arbitrary 'trigger' actions.

stream
    |from()
	.measurement('test_measurement')
	|alert()
	    .info(lambda: bool('true'))
	    .log('/tmp/alerts.log')

Define and enable the script:

kapacitor define write_detect -type stream -tick write_detect.tick -dbrp _TEST.autogen
kapacitor enable write_detect

Battery voltage monitor

The following Kapacitor tick script is used to push an alert to Slack when the battery voltage of any node drops below a set of predefined thresholds:

stream
    |from()
        .measurement('v_bat')
        .groupBy('node_id')
    |alert()
        .info(lambda: "value" < 3.7)
        .warn(lambda: "value" < 3.6)
        .crit(lambda: "value" < 3.5)
        .message('{{ .Level}}: {{ .ID }} is {{ index .Fields "value" | printf "%0.3f" }} V')
        .stateChangesOnly()
        .log('/tmp/alerts.log')
        .slack()
        .channel('#alerts')

Define and enable the script:

kapacitor define vbat_alert -type stream -tick vbat_alert.tick -dbrp _TEST.autogen
kapacitor enable vbat_alert

Deadman's switch

The following Kapacitor tick script is used to push an alert to Slack when the throughput of a node drops below 1 reading in the last 6 hours:

var data = stream
    |from()
        .measurement('conn_attempts')
        .groupBy('node_id')
data
    |deadman(1.0, 6h)
        .stateChangesOnly()
        .slack()
        .channel('#alerts')
        .log('/tmp/alerts.log')

Define and enable the script:

kapacitor define deadman -type stream -tick deadman.tick -dbrp _TEST.autogen
kapacitor enable deadman

Data forwarding

Forward measurements to another database on the influxdb instance

stream
    |from()
        .database('DATABASE_1')
        .measurement('test_measurement')
    |influxDBOut()
        .database('DATABASE_2')
        .tag('source', 'kapacitor')

Define and enable the script:

kapacitor define db_forward -type stream -tick db_forward.tick -dbrp DATABASE_1.autogen -dbrp DATABASE_2.autogen
kapacitor enable db_forward

Real-time control

Kapacitor can be used to generate real-time control signals. The following TICK script generates a PID control signal and writes it to the originating database. It also writes relevant statistics (derivative, integral, etc.) to a separate database for debugging purposes.

PID Controller

var P = 3.0
var I = 0.1
var D = 0.5

var dur = 1ms
var dur_factor = 1000.0

var data = stream
    |from()
        .database('_TEST')
        .measurement('test_measurement')
        .groupBy('node_id')

var deriv = data
    |derivative('value')

var diff = data
    |difference('value')

var elapse = data
    |elapsed('value', dur)

var integral = data
    |join(diff, elapse)
        .as('data', 'diff', 'elapse')
        .streamName('integral')
    |eval(lambda: (float("elapse.elapsed") / dur_factor) * ("data.value" - float("diff.difference") / 2.0))
        .as('value')

var control_signal = data
    |join(deriv, integral)
        .as('data', 'deriv', 'integral')
        .streamName('control_signal')
    |eval(lambda: P*float("data.value") + I*float("integral.value") + D*float("deriv.value"))
        .as('value')

deriv
    |influxDBOut()
        .database('_VAR')
        .measurement('deriv')
        .tag('source', 'kapacitor')

diff
    |influxDBOut()
        .database('_VAR')
        .measurement('diff')
        .tag('source', 'kapacitor')

elapse
    |influxDBOut()
        .database('_VAR')
        .measurement('elapse')
        .tag('source', 'kapacitor')

integral
    |influxDBOut()
        .database('_VAR')
        .measurement('integral')
        .tag('source', 'kapacitor')

control_signal
    |influxDBOut()
        .database('_TEST')
        .measurement('control_signal')
        .tag('source', 'kapacitor')

Define and enable the script:

kapacitor define pid -type stream -tick pid.tick -dbrp _TEST.autogen -dbrp _VAR.autogen
kapacitor enable pid