Skip to content

Commit

Permalink
Support for InfluxDB 2
Browse files Browse the repository at this point in the history
  • Loading branch information
oherrala committed Jun 15, 2022
1 parent 80e83b6 commit e9b8386
Show file tree
Hide file tree
Showing 8 changed files with 361 additions and 1,139 deletions.
1,356 changes: 272 additions & 1,084 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion LICENSE
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
Copyright 2018 Ossi Herrala
Copyright 2018-2022 Ossi Herrala

Permission is hereby granted, free of charge, to any person obtaining
a copy of this software and associated documentation files (the
Expand Down
11 changes: 3 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,9 @@ password = "<your really secret password>"

[influxdb]
url = "<url to InfluxDB, e.g. http://127.0.0.1:8086>"
database = "<your InfluxDB database name>"
```

## Remember to create database

```console
$ influxd run
$ influx -execute 'create database energiatili;'
token = "<access token to InfluxDb>"
org = "<organization in InfluxDB>"
bucket = "<data bucket in InfluxDB>"
```

# Tools
Expand Down
4 changes: 3 additions & 1 deletion energiatili-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ pub struct Energiatili {
#[derive(Default, Serialize, Deserialize)]
pub struct InfluxDB {
pub url: String,
pub database: String,
pub token: String,
pub org: String,
pub bucket: String,
}

impl Config {
Expand Down
2 changes: 1 addition & 1 deletion energiatili-model/src/model.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use std::io::{BufRead, Read};

use chrono::{DateTime, Utc};
use serde_json;
use serde::{Deserialize, Serialize};
use serde_json;

use crate::utils::fix_new_date;

Expand Down
5 changes: 3 additions & 2 deletions influxdb-export/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,6 @@ edition = "2021"
energiatili-config = { path = "../energiatili-config" }
energiatili-model = { path = "../energiatili-model" }
env_logger = "0.9"
influxdb = "0.0.5"
tokio = "0.1"
futures = "0.3"
influxdb2_client = { git = "https://github.com/influxdata/influxdb_iox", rev = "bf39649d64e2890e2ae9e5a4e245c67624d2df43" }
tokio = { version = "1", features = [ "macros", "rt-multi-thread" ] }
4 changes: 0 additions & 4 deletions influxdb-export/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,2 @@

# InfluxDB

```console
> CREATE DATABASE energiatili
```
116 changes: 78 additions & 38 deletions influxdb-export/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use std::io;

use tokio::runtime::current_thread::Runtime;

use energiatili_model::measurement::{Measurements, Resolution, Tariff};
use energiatili_model::model::Model;
use influxdb2_client::models::data_point::{DataPoint, DataPointError};

fn main() {
#[tokio::main]
async fn main() {
env_logger::init();

let config = match energiatili_config::Config::read() {
Expand All @@ -16,53 +16,93 @@ fn main() {
}
};
let db_url = config.influxdb.url;
let db_name = config.influxdb.database;
let db_token = config.influxdb.token;
let db_org = config.influxdb.org;
let db_bucket = config.influxdb.bucket;

let stdin = io::stdin();
let stdin = io::stdin().lock();
let model = Model::from_reader(stdin).expect("read JSON Model");
let measurements = Measurements::from(&model);
let dp_stream = DataPointStream::new(measurements);

let client = influxdb::Client::new(db_url, db_name);
let mut rt = Runtime::new().expect("Unable to create a runtime");

for m in measurements.0 {
let ts = m.timestamp.timestamp_nanos();
let timestamp = influxdb::Timestamp::NANOSECONDS(ts as usize);
let mut measurement = <dyn influxdb::Query>::write_query(timestamp, "energiatili");
let client = influxdb2_client::Client::new(db_url, db_token);
client
.write(&db_org, &db_bucket, dp_stream)
.await
.expect("write to influxdb");
}

measurement = measurement.add_field("consumption", m.consumption);
measurement = measurement.add_field("quality", i64::from(m.quality));
struct DataPointStream {
measurements: std::collections::btree_set::IntoIter<Measurement>,
}

if let Some(price) = m.price.energy {
measurement = measurement.add_field("energy_price", price);
impl DataPointStream {
fn new(measurements: Measurements) -> Self {
Self {
measurements: measurements.0.into_iter(),
}
}
}

if let Some(price) = m.price.transfer {
measurement = measurement.add_field("transfer_price", price);
}
use std::pin::Pin;
use std::task::{Context, Poll};

if let (Some(e), Some(t)) = (m.price.energy, m.price.transfer) {
measurement = measurement.add_field("price", e + t);
}
impl futures::Stream for DataPointStream {
type Item = DataPoint;

if !m.temperature.is_nan() {
measurement = measurement.add_field("temperature", m.temperature);
}
fn size_hint(&self) -> (usize, Option<usize>) {
self.measurements.size_hint()
}

if m.resolution == Resolution::Hour {
measurement = match m.tariff {
Tariff::Day => measurement.add_tag("tariff", "day"),
Tariff::Night => measurement.add_tag("tariff", "night"),
Tariff::Simple => measurement,
};
}
fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let measurement = match self.measurements.next() {
Some(m) => m,
None => return Poll::Ready(None),
};
let dp = convert(&measurement).expect("convert measurement to datapoint");
Poll::Ready(Some(dp))
}
}

measurement = match m.resolution {
Resolution::Hour => measurement.add_tag("resolution", "hour"),
Resolution::Day => measurement.add_tag("resolution", "day"),
Resolution::Month => measurement.add_tag("resolution", "month"),
Resolution::Year => measurement.add_tag("resolution", "year"),
use energiatili_model::measurement::Measurement;

fn convert(m: &Measurement) -> Result<DataPoint, DataPointError> {
let mut dp = influxdb2_client::models::DataPoint::builder("energiatili");

dp = dp.timestamp(m.timestamp.timestamp_nanos());
dp = dp.field("consumption", m.consumption);
dp = dp.field("quality", i64::from(m.quality));

if let Some(price) = m.price.energy {
dp = dp.field("energy_price", price);
}

if let Some(price) = m.price.transfer {
dp = dp.field("transfer_price", price);
}

if let (Some(e), Some(t)) = (m.price.energy, m.price.transfer) {
dp = dp.field("price", e + t);
}

if !m.temperature.is_nan() {
dp = dp.field("temperature", m.temperature);
}

if m.resolution == Resolution::Hour {
dp = match m.tariff {
Tariff::Day => dp.tag("tariff", "day"),
Tariff::Night => dp.tag("tariff", "night"),
Tariff::Simple => dp,
};
rt.block_on(client.query(&measurement)).expect("influxdb write");
}

dp = match m.resolution {
Resolution::Hour => dp.tag("resolution", "hour"),
Resolution::Day => dp.tag("resolution", "day"),
Resolution::Month => dp.tag("resolution", "month"),
Resolution::Year => dp.tag("resolution", "year"),
};

dp.build()
}

0 comments on commit e9b8386

Please sign in to comment.