Skip to content

Commit

Permalink
Added fetcher_responses
Browse files Browse the repository at this point in the history
  • Loading branch information
caparker committed Dec 18, 2024
1 parent 566c60e commit d40898d
Show file tree
Hide file tree
Showing 11 changed files with 510 additions and 322 deletions.
9 changes: 8 additions & 1 deletion migra/patch.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
start=`date +%s`
PORT=5432
USER=postgres
#HOST=localhost
DB=openaq

while getopts ":f:h:p:u:" opt
do
Expand All @@ -10,6 +12,7 @@ do
p ) PORT="$OPTARG" ;;
h ) HOST="$OPTARG" ;;
u ) USER="$OPTARG" ;;
d ) DB="$OPTARG" ;;
esac
done

Expand All @@ -19,6 +22,10 @@ if [ -z ${HOST} ]; then
exit 1
fi;

if [ -z ${DB} ]; then
printf '%s\n' "Missing db" >&2
exit 1
fi;

if [ -z ${FILE} ]; then
printf '%s\n' "Missing file" >&2
Expand All @@ -32,7 +39,7 @@ PGOPTIONS='--client-min-messages=warning' psql \
-h $HOST \
-p $PORT \
-U $USER \
-d openaq \
-d $DB \
-c "BEGIN;" \
"${FILES[@]}" \
-c "COMMIT;"
Expand Down
8 changes: 8 additions & 0 deletions openaqdb/cron.sql
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,14 @@ SELECT cron.schedule_in_database(
, 'openaq'
);


-- Retention job: purge fetcher_responses rows older than 14 days,
-- running hourly on the hour in the 'openaq' database.
-- NOTE(review): the job name is misspelled ('reponses' -> 'responses');
-- renaming it here would schedule a second job and leave the old one
-- registered in cron.job, so flagging rather than changing it.
SELECT cron.schedule_in_database(
  'delete-fetcher-reponses'
  , '0 * * * *'
  , $$DELETE FROM fetcher_responses WHERE datetime < (current_date - 14)$$
  , 'openaq'
  );

-- just in case we start having failed ingestions
-- we don't want to keep them open
SELECT cron.schedule_in_database(
Expand Down
5 changes: 5 additions & 0 deletions openaqdb/idempotent/util_functions.sql
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,11 @@ SELECT date_trunc('hour', (tstz + '-1sec'::interval + tz_offset)) - tz_offset
$$ LANGUAGE SQL IMMUTABLE PARALLEL SAFE;


-- Truncate a timestamptz to `period` as observed in time zone `tz`,
-- after shifting by `_offset`, and return the result as a timestamptz.
-- The '-1sec' nudge makes an exact boundary fall into the PREVIOUS
-- period (i.e. the upper bound is treated as exclusive), matching the
-- other truncate helpers in this file.
CREATE OR REPLACE FUNCTION truncate_timestamp(tstz timestamptz, period text, tz text, _offset interval)
RETURNS timestamptz AS $$
SELECT timezone(
    tz
  , date_trunc(
        period
        -- shift into the local zone first so truncation uses local clock time
      , timezone(tz, tstz + ('-1sec'::interval + _offset))
    )
  );
$$ LANGUAGE SQL IMMUTABLE PARALLEL SAFE;


CREATE OR REPLACE FUNCTION truncate_timestamp(tstz timestamptz, period text)
RETURNS timestamptz AS $$
Expand Down
3 changes: 0 additions & 3 deletions openaqdb/idempotent/views.sql
Original file line number Diff line number Diff line change
Expand Up @@ -213,9 +213,6 @@ CREATE INDEX ON measurements_fastapi_base USING GIST (geog);






create or replace view measurements_analyses AS
SELECT * FROM measurements
UNION ALL
Expand Down
2 changes: 2 additions & 0 deletions openaqdb/lookups/measurands_map.sql
Original file line number Diff line number Diff line change
Expand Up @@ -45,5 +45,7 @@ INSERT INTO measurands_map (key, measurands_id, units, source_name) VALUES
, ('co2', 21, '', 'houston')
, ('bc', 11, '', 'houston')
, ('um025', 130, '', 'houston')
, ('relativehumidity', 98, '', 'airqoon')
, ('pressure', 95, '', 'airqoon')
ON CONFLICT DO NOTHING
;
48 changes: 29 additions & 19 deletions openaqdb/mock.sql
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@
WITH locations AS (
SELECT * FROM
(VALUES
('testing/site1', 'testing', 'testing site 1', ST_SetSRID(ST_Point( -151.76306, -16.51516), 4326), 'airnow', 'pm25') -- Kirbati
, ('testing/site2', 'testing', 'testing site 2', ST_SetSRID(ST_Point( -121.8418, 44.75228), 4326), 'airnow', 'pm25') -- America/Los_Angeles
, ('testing/site3', 'testing', 'testing site 3', ST_SetSRID(ST_Point( -71.104, 42.315),4326), 'airnow', 'pm25') -- America/New_York
, ('testing/site4', 'testing', 'testing site 4', ST_SetSRID(ST_Point( -0.107389, 51.487236), 4326), 'airnow', 'pm25') -- Europe/London
, ('testing/site5', 'testing', 'testing site 5', ST_SetSRID(ST_Point( 185.199922, -20.248716), 4326), 'airnow', 'pm25')
, ('testing/site6', 'testing', 'testing site 6', ST_SetSRID(ST_Point( 75.85257, 22.70763), 4326), 'airnow', 'pm25') -- malav, indore
) as t (source_id, source_name, site_name, geom, provider, measurand)
('testing/site1', 'testing', 'testing site 1', 30, ST_SetSRID(ST_Point( -151.76306, -16.51516), 4326), 'airnow', 'pm25') -- Kirbati
, ('testing/site2', 'testing', 'testing site 2', 30, ST_SetSRID(ST_Point( -121.8418, 44.75228), 4326), 'airnow', 'pm25') -- America/Los_Angeles
, ('testing/site3', 'testing', 'testing site 3', 30, ST_SetSRID(ST_Point( -71.104, 42.315),4326), 'airnow', 'pm25') -- America/New_York
, ('testing/site4', 'testing', 'testing site 4', 30, ST_SetSRID(ST_Point( -0.107389, 51.487236), 4326), 'airnow', 'pm25') -- Europe/London
, ('testing/site5', 'testing', 'testing site 5', 30, ST_SetSRID(ST_Point( 185.199922, -20.248716), 4326), 'airnow', 'pm25')
, ('testing/site6', 'testing', 'testing site 6', 30, ST_SetSRID(ST_Point( 75.85257, 22.70763), 4326), 'airnow', 'pm25') -- malav, indore
, ('testing/site7', 'testing', 'testing site 7', 60, ST_SetSRID(ST_Point( -121.8418, 44.75228), 4326), 'airnow', 'pm25') -- America/Los_Angeles
) as t (source_id, source_name, site_name, minutes, geom, provider, measurand)
), inserted_nodes AS (
INSERT INTO sensor_nodes (
source_id
Expand Down Expand Up @@ -40,7 +41,7 @@ WITH locations AS (
, timezones_id = EXCLUDED.timezones_id
, countries_id = EXCLUDED.countries_id
, owner_entities_id = EXCLUDED.owner_entities_id
RETURNING source_id, sensor_nodes_id
RETURNING source_id, sensor_nodes_id, metadata
), inserted_systems AS (
INSERT INTO sensor_systems (sensor_nodes_id, source_id, instruments_id)
SELECT sensor_nodes_id
Expand All @@ -62,25 +63,30 @@ WITH locations AS (
SELECT sensor_systems_id
, source_id||'/'||measurand
, get_measurands_id(l.measurand)
, 60*30
, 60*30
, 60*minutes
, 60*minutes
FROM locations l
JOIN inserted_systems n USING (source_id)
--ON CONFLICT (sensor_systems_id, measurands_id) DO UPDATE
--SET source_id = EXCLUDED.source_id
RETURNING sensor_systems_id, source_id
) SELECT * FROM inserted_sensors;




-- Using the 15m offset to make it easier to debug 30m offset timezones
WITH fake_times AS (
SELECT generate_series('2023-03-01'::date, '2023-04-01'::date, '30min'::interval) + '15m'::interval as datetime
--SELECT generate_series('2023-03-01'::date, '2023-04-01'::date, '30min'::interval) + '30m'::interval as datetime
SELECT sensors_id, datetime + make_interval(secs=>data_logging_period_seconds) as datetime
FROM sensors, generate_series('2023-03-01'::date, '2023-04-01'::date, make_interval(secs=>data_logging_period_seconds)) as datetime
) INSERT INTO measurements (datetime, sensors_id, value)
--SELECT f.datetime, s.sensors_id, date_part('day', as_local(datetime - interval '1sec', t.tzid))
SELECT as_utc(datetime, t.tzid)
, s.sensors_id
, date_part('day', datetime - '1sec'::interval) + date_part('hour', datetime - '1sec'::interval)/100
FROM fake_times f
JOIN sensors s ON (TRUE)
JOIN sensors s ON (s.sensors_id = f.sensors_id)
JOIN sensor_systems sy ON (s.sensor_systems_id = sy.sensor_systems_id)
JOIN sensor_nodes sn ON (sy.sensor_nodes_id = sn.sensor_nodes_id)
JOIN timezones t ON (sn.timezones_id = t.timezones_id)
Expand All @@ -90,34 +96,38 @@ SELECT generate_series('2023-03-01'::date, '2023-04-01'::date, '30min'::interval
-- make sure we have something to test the moy with
WITH fake_times AS (
--SELECT generate_series(current_date - (365 * 2), current_date, '1d'::interval) as datetime
SELECT generate_series('2021-12-25'::date, '2023-01-05'::date, '1d'::interval) + '0m'::interval as datetime
-- SELECT generate_series('2021-12-25'::date, '2023-01-05'::date, '1d'::interval) + '0m'::interval as datetime
SELECT sensors_id, datetime
FROM sensors, generate_series('2021-12-15'::date, '2023-01-05'::date, make_interval(secs=>data_logging_period_seconds)) as datetime
) INSERT INTO measurements (datetime, sensors_id, value)
--SELECT f.datetime, s.sensors_id, date_part('day', as_utc(datetime - interval '1sec', t.tzid))
SELECT as_utc(datetime, t.tzid)
, s.sensors_id
, date_part('day', datetime - '1sec'::interval) + date_part('hour', datetime - '1sec'::interval)/100
FROM fake_times xf
JOIN sensors s ON (TRUE)
FROM fake_times f
JOIN sensors s ON (s.sensors_id = f.sensors_id)
JOIN sensor_systems sy ON (s.sensor_systems_id = sy.sensor_systems_id)
JOIN sensor_nodes sn ON (sy.sensor_nodes_id = sn.sensor_nodes_id)
JOIN timezones t ON (sn.timezones_id = t.timezones_id)
WHERE sensors_id = 1
WHERE f.sensors_id IN (1)
ON CONFLICT DO NOTHING;


WITH fake_times AS (
SELECT generate_series(current_date - 7, current_timestamp, '30min'::interval) + '15m'::interval as datetime
--SELECT generate_series(current_date - 7, current_timestamp, '30min'::interval) + '30m'::interval as datetime
SELECT sensors_id, datetime + make_interval(secs=>data_logging_period_seconds) as datetime
FROM sensors, generate_series(current_date - 7, current_timestamp, make_interval(secs=>data_logging_period_seconds)) as datetime
) INSERT INTO measurements (datetime, sensors_id, value)
--SELECT f.datetime, s.sensors_id, date_part('day', as_utc(datetime - interval '1sec', t.tzid))
SELECT as_utc(datetime, t.tzid)
, s.sensors_id
, date_part('day', datetime - '1sec'::interval) + date_part('hour', datetime - '1sec'::interval)/100
FROM fake_times f
JOIN sensors s ON (TRUE)
JOIN sensors s ON (f.sensors_id = s.sensors_id)
JOIN sensor_systems sy ON (s.sensor_systems_id = sy.sensor_systems_id)
JOIN sensor_nodes sn ON (sy.sensor_nodes_id = sn.sensor_nodes_id)
JOIN timezones t ON (sn.timezones_id = t.timezones_id)
WHERE sensors_id = 1
WHERE f.sensors_id IN (1)
ON CONFLICT DO NOTHING;


Expand Down
67 changes: 67 additions & 0 deletions openaqdb/tables/fetchlogs.sql
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,70 @@ CREATE TABLE IF NOT EXISTS ingest_stats (
, started_on timestamptz DEFAULT now() -- start time for stats
, ingested_on timestamptz DEFAULT now() -- last update for stats
);


-- A table to store responses sent from the fetchers.
-- One row per fetcher run; consumed by fetcher_source_summary() for run
-- statistics and purged after 14 days by a scheduled cron job.
CREATE TABLE IF NOT EXISTS fetcher_responses (
    source_name text NOT NULL                    -- fetcher/source that reported
  , datetime timestamptz NOT NULL DEFAULT now()  -- when the response was recorded
  , message text NOT NULL                        -- status message from the fetcher
  , records int NOT NULL DEFAULT 0               -- records reported; 0 is treated as a failed/empty run downstream
  , locations int                                -- presumably locations covered by the run -- TODO confirm with fetcher
  , datetime_from timestamptz                    -- start of the fetched data window, when reported
  , datetime_to timestamptz                      -- end of the fetched data window, when reported
  , duration_seconds real                        -- run duration, when reported
  , errors json                                  -- raw error payload, if any
  , parameters json                              -- parameters the fetcher ran with
);

-- Every visible consumer filters this table by time (the summary window
-- and the 14-day retention DELETE), so index datetime to avoid full scans
-- as the table grows.
CREATE INDEX IF NOT EXISTS fetcher_responses_datetime_idx
  ON fetcher_responses (datetime);



-- Per-source summary of fetcher run volumes between st and et
-- (both bounds exclusive: datetime > st AND datetime < et).
-- Returns one row per source_name with run count, count of empty runs,
-- success percentage, and distribution statistics over `records`.
CREATE OR REPLACE FUNCTION fetcher_source_summary(st timestamptz, et timestamptz DEFAULT now()) RETURNS TABLE (
source_name text
, n int          -- number of fetcher runs in the window
, zeros int      -- runs that reported zero records
, pct_success double precision  -- share of runs with records > 0, percent
, min double precision
, p02 double precision
, p25 double precision
, p50 double precision
, avg double precision
, sd double precision
, p75 double precision
, p98 double precision
, max double precision
, skew double precision  -- 3*(mean-median)/sd; 0 when sd is not positive
) AS $$
-- first pass: raw aggregates per source
WITH fetcher_agg AS (
SELECT source_name
, COUNT(1) as n
, SUM((records=0)::int) AS zeros
, MIN(records) as min
, PERCENTILE_CONT(0.02) WITHIN GROUP(ORDER BY records) as p02
, PERCENTILE_CONT(0.25) WITHIN GROUP(ORDER BY records) as p25
, PERCENTILE_CONT(0.5) WITHIN GROUP(ORDER BY records) as p50
, AVG(records) as avg
, STDDEV(records) as sd
, PERCENTILE_CONT(0.75) WITHIN GROUP(ORDER BY records) as p75
, PERCENTILE_CONT(0.98) WITHIN GROUP(ORDER BY records) as p98
, MAX(records) as max
FROM fetcher_responses
WHERE datetime > st
AND datetime < et
GROUP BY source_name)
-- second pass: presentation rounding and derived ratios
SELECT source_name
, n
, zeros
-- n is never 0 here (groups always have at least one row), so the
-- division is safe; the zeros=0 branch avoids needless rounding
, CASE WHEN zeros = 0 THEN 100 ELSE ROUND(((1-(zeros::numeric/n::numeric)) * 100.0),1) END as pct_success
, min
, ROUND(p02::numeric,1) as p02
, ROUND(p25::numeric,1) as p25
, ROUND(p50::numeric,1) as p50
, ROUND(avg::numeric,1) as avg
, ROUND(sd::numeric,1) as sd
, ROUND(p75::numeric,1) as p75
, ROUND(p98::numeric,1) as p98
, max
-- Pearson's second skewness coefficient: 3 * (mean - median) / sd
, CASE WHEN sd>0 THEN ROUND(((3*(avg-p50))/sd)::numeric,2) ELSE 0 END AS skew
FROM fetcher_agg;
$$ LANGUAGE SQL;
41 changes: 40 additions & 1 deletion openaqdb/tables/hourly_data_rollups.sql
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,45 @@ $$ LANGUAGE SQL;
);





-- NOTE(review): removed a leftover debugging SELECT against
-- hourly_data_queue (it listed 10 un-calculated queue rows) that executed
-- every time this schema file was applied; it had no effect on schema or
-- data and only produced noise during migrations.



-- Return up to n pending hourly rollup jobs (hour + tz offset), oldest first.
-- A job is pending when it has never been calculated (calculated_on IS NULL);
-- unlike fetch_hourly_data_jobs this variant ignores the modified_on/queued_on
-- bookkeeping entirely.
-- min_hour / max_hour bound the window; max_hour defaults to one hour ago so
-- the current, still-filling hour is not picked up.
-- (Removed a large block of commented-out predicate experiments that was
-- previously kept inline here.)
CREATE OR REPLACE FUNCTION fetch_hourly_data_jobs2(n int DEFAULT 1, min_hour timestamptz DEFAULT NULL, max_hour timestamptz DEFAULT NULL) RETURNS TABLE(
datetime timestamptz
, tz_offset interval
) AS $$
SELECT q.datetime
, q.tz_offset
FROM hourly_data_queue q
WHERE q.datetime >= COALESCE(min_hour, '-infinity'::date)
AND q.datetime <= COALESCE(max_hour, now() - '1hour'::interval)
AND q.calculated_on IS NULL
ORDER BY q.datetime, q.tz_offset
LIMIT n;
$$ LANGUAGE sql;



CREATE OR REPLACE FUNCTION fetch_hourly_data_jobs(n int DEFAULT 1, min_hour timestamptz DEFAULT NULL, max_hour timestamptz DEFAULT NULL) RETURNS TABLE(
datetime timestamptz
, tz_offset interval
Expand All @@ -208,7 +247,7 @@ CREATE OR REPLACE FUNCTION fetch_hourly_data_jobs(n int DEFAULT 1, min_hour time
FROM hourly_data_queue q
-- Its either not been calculated or its been modified
WHERE q.datetime >= COALESCE(min_hour, '-infinity'::date)
AND q.datetime <= COALESCE(max_hour, current_date - '1hour'::interval)
AND q.datetime <= COALESCE(max_hour, date_trunc('hour', now()))
AND (q.calculated_on IS NULL OR q.modified_on > q.calculated_on)
-- either its never been or it was resently modified but not queued
AND (q.queued_on IS NULL -- has not been queued
Expand Down
4 changes: 3 additions & 1 deletion openaqdb/tables/versions.sql
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,11 @@ SELECT r.versions_id
, r.version_date
, r.label as life_cycle
, r.version_rank
, sy.sensor_nodes_id
FROM version_ranks r
JOIN sensors s ON (s.sensors_id = r.sensors_id)
JOIN sensors p ON (p.sensors_id = r.parent_sensors_id);
JOIN sensors p ON (p.sensors_id = r.parent_sensors_id)
JOIN sensor_systems sy ON (s.sensor_systems_id = sy.sensor_systems_id);


CREATE OR REPLACE VIEW stale_versions AS
Expand Down
Loading

0 comments on commit d40898d

Please sign in to comment.