From 68737f83aaf0bfacb863c8da009059faa6462d1c Mon Sep 17 00:00:00 2001 From: Gabriel Fosse <67290377+majesticio@users.noreply.github.com> Date: Sat, 10 Feb 2024 05:25:57 -0800 Subject: [PATCH] Sensors by location (#297) * Added ability to increase the working memory for a given endpoint (#282) resolves #281 * Added ability to increase the working memory for a given endpoint resolves #281 * update staging deploy to deploy on release branches (#284) * Added timer (#280) * included order_by in v1 and v2 measurements (#263) Co-authored-by: Gabriel Fosse * manufacturers resource (#273) * manufacturers resource resolves #252 --------- Co-authored-by: Gabriel Fosse * Adding `v3/instruments` resource (#271) * instruments resource resolves #270 --------- Co-authored-by: Gabriel Fosse * Added timer * Added owner router --------- Co-authored-by: Gabriel Fosse <67290377+majesticio@users.noreply.github.com> Co-authored-by: Gabriel Fosse Co-authored-by: Russ Biggs * Add sort and order by query params to v3 endpoints (#285) * order by classes * refactor query builder to handle sort and order by statements * add sort and order by to v3 endpoints resolves #163 --------- Co-authored-by: Gabriel Fosse * Feature/working memory query 281 (#287) * Moved the tr.start out of if block Fixes bug #286 --------- Co-authored-by: Christian Parker * fix missing start variable and return empty string for order (#288) * Remove count fields (#289) * removed count fields --------- Co-authored-by: Gabriel Fosse * fix failing unit test (#292) * rate limiter logging fix (#293) * location sensors query * location sensors path query * Cleaned up the sensors endpoints Rewrote the queries to speed them up a litte and added the measurements method * Removed old method that I had renamed --------- Co-authored-by: Christian Parker Co-authored-by: Russ Biggs Co-authored-by: Gabriel Fosse --- openaq_api/openaq_api/db.py | 2 + openaq_api/openaq_api/main.py | 1 + openaq_api/openaq_api/v3/models/responses.py | 2 +- openaq_api/openaq_api/v3/routers/locations.py | 2 +- openaq_api/openaq_api/v3/routers/sensors.py | 181 ++++++++++-------- 5 files changed, 102 insertions(+), 86 deletions(-) diff --git a/openaq_api/openaq_api/db.py b/openaq_api/openaq_api/db.py index 7259d1a..cde570d 100644 --- a/openaq_api/openaq_api/db.py +++ b/openaq_api/openaq_api/db.py @@ -21,6 +21,7 @@ allowed_config_params = ["work_mem"] + DEFAULT_CONNECTION_TIMEOUT = 6 MAX_CONNECTION_TIMEOUT = 15 @@ -32,6 +33,7 @@ def default(obj): # config is required as a placeholder here because of this # function is used in the `cached` decorator and without it # we will get a number of arguments error + def dbkey(m, f, query, args, timeout=None, config=None): j = orjson.dumps( args, option=orjson.OPT_OMIT_MICROSECONDS, default=default diff --git a/openaq_api/openaq_api/main.py b/openaq_api/openaq_api/main.py index 47317c4..9a1f04b 100644 --- a/openaq_api/openaq_api/main.py +++ b/openaq_api/openaq_api/main.py @@ -1,3 +1,4 @@ +from contextlib import asynccontextmanager import datetime import logging import time diff --git a/openaq_api/openaq_api/v3/models/responses.py b/openaq_api/openaq_api/v3/models/responses.py index 7be07bb..7f85509 100644 --- a/openaq_api/openaq_api/v3/models/responses.py +++ b/openaq_api/openaq_api/v3/models/responses.py @@ -73,6 +73,7 @@ class Summary(JsonBase): q75: float | None = None q98: float | None = None max: float | None = None + avg: float | None = None sd: float | None = None @@ -180,7 +181,6 @@ class Sensor(SensorBase): datetime_first: DatetimeObject datetime_last: DatetimeObject coverage: Coverage - # period: Period latest: Latest summary: Summary diff --git a/openaq_api/openaq_api/v3/routers/locations.py b/openaq_api/openaq_api/v3/routers/locations.py index abf4aa9..c5bc74b 100644 --- a/openaq_api/openaq_api/v3/routers/locations.py +++ b/openaq_api/openaq_api/v3/routers/locations.py @@ -128,7 +128,7 @@ async def fetch_locations(query, db): , bbox(geom) as bounds , datetime_first , datetime_last - {query_builder.fields() or ''} + {query_builder.fields() or ''} {query_builder.total()} FROM locations_view_cached {query_builder.where()} diff --git a/openaq_api/openaq_api/v3/routers/sensors.py b/openaq_api/openaq_api/v3/routers/sensors.py index 985c7bb..4406f05 100644 --- a/openaq_api/openaq_api/v3/routers/sensors.py +++ b/openaq_api/openaq_api/v3/routers/sensors.py @@ -4,8 +4,19 @@ from fastapi import APIRouter, Depends, Path from openaq_api.db import DB -from openaq_api.v3.models.queries import QueryBaseModel, QueryBuilder -from openaq_api.v3.models.responses import SensorsResponse +from openaq_api.v3.models.queries import ( + DateFromQuery, + DateToQuery, + Paging, + PeriodNameQuery, + QueryBaseModel, + QueryBuilder, +) + +from openaq_api.v3.models.responses import ( + SensorsResponse, + MeasurementsResponse, + ) from openaq_api.v3.routers.measurements import fetch_measurements logger = logging.getLogger("sensors") @@ -23,25 +34,51 @@ class SensorQuery(QueryBaseModel): ) def where(self): - return "m.sensors_id = :sensors_id" + return "s.sensors_id = :sensors_id" + +class LocationSensorQuery(QueryBaseModel): + locations_id: int = Path( + ..., description="Limit the results to a specific sensors id", ge=1 + ) + + def where(self): + return "n.sensor_nodes_id = :locations_id" + +class SensorMeasurementsQueries( + Paging, + SensorQuery, + DateFromQuery, + DateToQuery, + PeriodNameQuery, +): + ... -# class SensorsQueries(Paging, CountryQuery): -# ... +@router.get( + "/sensors/{sensors_id}/measurements", + response_model=MeasurementsResponse, + summary="Get measurements by sensor ID", + description="Provides a list of measurements by sensor ID", +) +async def sensor_measurements_get( + sensors: Annotated[SensorMeasurementsQueries, Depends(SensorMeasurementsQueries.depends())], + db: DB = Depends(), +): + response = await fetch_measurements(sensors, db) + return response -# @router.get( -# "/sensors/{id}/measurements", -# response_model=MeasurementsResponse, -# summary="Get measurements by sensor ID", -# description="Provides a list of measurements by sensor ID", -# ) -# async def sensor_measurements_get( -# sensor: SensorsQueries = Depends(SensorsQueries.depends()), -# db: DB = Depends(), -# ): -# response = await fetch_measurements(sensor, db) -# return response +@router.get( + "/locations/{locations_id}/sensors", + response_model=SensorsResponse, + summary="Get sensors by location ID", + description="Provides a list of sensors by location ID", +) +async def sensors_get( + location_sensors: Annotated[LocationSensorQuery, Depends(LocationSensorQuery.depends())], + db: DB = Depends(), +): + return await fetch_sensors(location_sensors, db) @router.get( @@ -54,76 +91,52 @@ async def sensor_get( sensors: Annotated[SensorQuery, Depends(SensorQuery.depends())], db: DB = Depends(), ): - response = await fetch_sensors(sensors, db) - return response + return await fetch_sensors(sensors, db) async def fetch_sensors(q, db): query = QueryBuilder(q) + logger.debug(query.params()) sql = f""" - WITH sensor AS ( - SELECT - m.sensors_id - , MIN(datetime - '1sec'::interval) as datetime_first - , MAX(datetime - '1sec'::interval) as datetime_last - , COUNT(1) as value_count - , AVG(value_avg) as value_avg - , STDDEV(value_avg) as value_sd - , MIN(value_avg) as value_min - , MAX(value_avg) as value_max - , PERCENTILE_CONT(0.02) WITHIN GROUP(ORDER BY value_avg) as value_p02 - , PERCENTILE_CONT(0.25) WITHIN GROUP(ORDER BY value_avg) as value_p25 - , PERCENTILE_CONT(0.5) WITHIN GROUP(ORDER BY value_avg) as value_p50 - , PERCENTILE_CONT(0.75) WITHIN GROUP(ORDER BY value_avg) as value_p75 - , PERCENTILE_CONT(0.98) WITHIN GROUP(ORDER BY value_avg) as value_p98 - , current_timestamp as calculated_on - FROM hourly_data m - {query.where()} - GROUP BY 1) - SELECT c.sensors_id as id - , 'sensor' as name - , c.value_avg as value - , get_datetime_object(c.datetime_first, ts.tzid) as datetime_first - , get_datetime_object(c.datetime_last, ts.tzid) as datetime_last - , json_build_object( - 'datetime', get_datetime_object(r.datetime_last, ts.tzid) - , 'value', r.value_latest - , 'coordinates', jsonb_build_object( - 'lat', st_y(r.geom_latest) - , 'lon', st_x(r.geom_latest) - ) - ) as latest - , json_build_object( - 'id', s.measurands_id - , 'units', m.units - , 'name', m.measurand - , 'display_name', m.display - ) as parameter - , json_build_object( - 'sd', c.value_sd - , 'min', c.value_min - , 'q02', c.value_p02 - , 'q25', c.value_p25 - , 'median', c.value_p50 - , 'q75', c.value_p75 - , 'q98', c.value_p98 - , 'max', c.value_max - ) as summary - , calculate_coverage( - c.value_count::int - , s.data_averaging_period_seconds - , s.data_logging_period_seconds - , EXTRACT(EPOCH FROM c.datetime_last - c.datetime_first) - ) as coverage - FROM sensors s - JOIN sensor_systems sy ON (s.sensor_systems_id = sy.sensor_systems_id) - JOIN sensor_nodes sn ON (sy.sensor_nodes_id = sn.sensor_nodes_id) - JOIN timezones ts ON (sn.timezones_id = ts.gid) - JOIN measurands m ON (s.measurands_id = m.measurands_id) - LEFT JOIN sensors_rollup r ON (s.sensors_id = r.sensors_id) - LEFT JOIN sensor c ON (c.sensors_id = s.sensors_id) - WHERE s.sensors_id = :sensors_id; - """ - response = await db.fetchPage(sql, query.params()) - return response + SELECT s.sensors_id as id + , m.measurand||' '||m.units as name + , json_build_object( + 'id', m.measurands_id + , 'name', m.measurand + , 'units', m.units + , 'display_name', m.display + ) as parameter + , s.sensors_id + , json_build_object( + 'min', r.value_min + , 'max', r.value_max + , 'avg', r.value_avg + , 'sd', r.value_sd + ) as summary + , calculate_coverage( + r.value_count + , s.data_averaging_period_seconds + , s.data_logging_period_seconds + , r.datetime_first + , r.datetime_last + ) as coverage + , get_datetime_object(r.datetime_first, t.tzid) as datetime_first + , get_datetime_object(r.datetime_last, t.tzid) as datetime_last + , json_build_object( + 'datetime', get_datetime_object(r.datetime_last, t.tzid) + , 'value', r.value_latest + , 'coordinates', json_build_object( + 'latitude', st_y(COALESCE(r.geom_latest, n.geom)) + ,'longitude', st_x(COALESCE(r.geom_latest, n.geom)) + )) as latest + FROM sensors s + JOIN sensor_systems sy ON (s.sensor_systems_id = sy.sensor_systems_id) + JOIN sensor_nodes n ON (sy.sensor_nodes_id = n.sensor_nodes_id) + JOIN timezones t ON (n.timezones_id = t.gid) + JOIN measurands m ON (s.measurands_id = m.measurands_id) + LEFT JOIN sensors_rollup r ON (s.sensors_id = r.sensors_id) + {query.where()} + {query.pagination()} + """ + return await db.fetchPage(sql, query.params())