From 1bdd13e221d462b573c68b15ce0abb313944ef47 Mon Sep 17 00:00:00 2001 From: Benyamin Ginzburg Date: Tue, 18 Oct 2022 00:07:21 +0300 Subject: [PATCH 1/3] [WIP] ws tracking --- Pipfile | 10 +++++----- israel_transport_api/main.py | 4 ++-- israel_transport_api/siri/client.py | 25 ++++++++++++++++++++++--- israel_transport_api/siri/models.py | 14 ++++++++++++++ israel_transport_api/siri/router.py | 28 +++++++++++++++++++++++++--- 5 files changed, 68 insertions(+), 13 deletions(-) diff --git a/Pipfile b/Pipfile index 65d783a..f0bc840 100644 --- a/Pipfile +++ b/Pipfile @@ -1,15 +1,15 @@ [[source]] -url = "https://pypi.org/simple" +url = 'https://pypi.org/simple' verify_ssl = true -name = "pypi" +name = 'pypi' [packages] fastapi = '==0.67.0' # untill https://github.com/tiangolo/fastapi/issues/3665 will closed pydantic = {extras = ["dotenv"],version = '*'} -uvicorn = '*' +uvicorn = {extras = ['websockets'], version = '*'} httpx = '*' -odmantic = "*" -motor = "*" +odmantic = '*' +motor = '*' betterlogging = '*' apscheduler = '*' diff --git a/israel_transport_api/main.py b/israel_transport_api/main.py index 8ad5ebc..bb9e954 100644 --- a/israel_transport_api/main.py +++ b/israel_transport_api/main.py @@ -1,7 +1,7 @@ import asyncio import os -import betterlogging as logging +import betterlogging as bl import uvicorn from fastapi import FastAPI from motor.motor_asyncio import AsyncIOMotorClient @@ -46,6 +46,6 @@ async def on_startup(): host='0.0.0.0' if os.getenv('DOCKER_MODE') else '127.0.0.1', port=8000, use_colors=True, - log_level=logging.DEBUG, + log_level=bl.DEBUG, log_config='../uvicorn_logger.json' ) diff --git a/israel_transport_api/siri/client.py b/israel_transport_api/siri/client.py index 65a2b77..ec53a7a 100644 --- a/israel_transport_api/siri/client.py +++ b/israel_transport_api/siri/client.py @@ -10,8 +10,7 @@ from israel_transport_api.misc import http_client from israel_transport_api.siri.exceptions import SiriException from israel_transport_api.siri.models import IncomingRoute, IncomingRoutesResponse -from israel_transport_api.siri.siri_models import MonitoredStopVisit - +from israel_transport_api.siri.siri_models import MonitoredStopVisit, VehicleLocation RETRY_COUNT = 5 logger = logging.getLogger('siri_client') @@ -55,7 +54,27 @@ async def get_incoming_routes(stop_code: int, monitoring_interval: int = 30) -> arrival_time = stop_visit.monitored_vehicle_journey.monitored_call.expected_arrival_time.replace(tzinfo=None) eta = (arrival_time - dt.now()).seconds // 60 route = routes_repository.find_route_by_id(stop_visit.monitored_vehicle_journey.line_ref) - incoming_routes.append(IncomingRoute(eta=eta, route=route)) + incoming_routes.append( + IncomingRoute( + eta=eta, + route=route, + plate_number=stop_visit.monitored_vehicle_journey.vehicle_ref + ) + ) resp = IncomingRoutesResponse(stop_info=stop_info, incoming_routes=sorted(incoming_routes, key=lambda r: r.eta)) return resp + + +async def get_vehicle_location(vehicle_plate_number: str, stop_code: int) -> VehicleLocation: + siri_data = await _make_request(stop_code, 30) + vehicle = list( + filter( + lambda m: m.monitored_vehicle_journey.vehicle_ref == vehicle_plate_number, + siri_data + ) + ) + if len(vehicle) == 0: + raise ValueError # todo + current_location = vehicle[0].monitored_vehicle_journey.vehicle_location + return current_location diff --git a/israel_transport_api/siri/models.py b/israel_transport_api/siri/models.py index 269b462..4b644d6 100644 --- a/israel_transport_api/siri/models.py +++ b/israel_transport_api/siri/models.py @@ -4,6 +4,7 @@ from pydantic import BaseModel, Field from israel_transport_api.gtfs.models import Stop, Route +from israel_transport_api.siri.siri_models import VehicleLocation class IncomingRoutesResponse(BaseModel): @@ -14,7 +15,20 @@ class IncomingRoutesResponse(BaseModel): class IncomingRoute(BaseModel): eta: int + plate_number: str route: Route +class VehicleLocationResponse(BaseModel): + latitude: float + longitude: float + + @classmethod + def from_siri_model(cls, siri_model: VehicleLocation): + return cls( + latitude=siri_model.latitude, + longitude=siri_model.longitude + ) + + IncomingRoutesResponse.update_forward_refs() diff --git a/israel_transport_api/siri/router.py b/israel_transport_api/siri/router.py index 7e74d77..204c3af 100644 --- a/israel_transport_api/siri/router.py +++ b/israel_transport_api/siri/router.py @@ -1,7 +1,12 @@ -from fastapi import APIRouter, Path, Query +from asyncio import sleep -from israel_transport_api.siri.client import get_incoming_routes -from israel_transport_api.siri.models import IncomingRoutesResponse +from fastapi import APIRouter, Path, Query, WebSocket, WebSocketDisconnect + +from israel_transport_api.siri.client import get_incoming_routes, get_vehicle_location +from israel_transport_api.siri.models import IncomingRoutesResponse, VehicleLocationResponse + + +WS_UPDATE_INTERVAL = 3 siri_router = APIRouter(prefix='/siri', tags=['Siri']) @@ -12,3 +17,20 @@ async def get_routes_for_stop( monitoring_interval: int = Query(30, description='Monitoring interval in minutes') ) -> IncomingRoutesResponse: return await get_incoming_routes(stop_code, monitoring_interval) + + +@siri_router.websocket('/track_vehicle/{stop_code}/{vehicle_plate_number}') +async def track_vehicle( + ws: WebSocket, + stop_code: int = Path(..., description='Stop code for tracking'), + vehicle_plate_number: str = Path(..., description='Vehicle plate number') +): + await ws.accept() + try: + while True: + resp = await get_vehicle_location(vehicle_plate_number, stop_code) + resp and await ws.send_json(resp.dict()) + await sleep(WS_UPDATE_INTERVAL) + except WebSocketDisconnect: + pass + From eb089fb91a3fe79804689c5bee938fa0a09e6217 Mon Sep 17 00:00:00 2001 From: Benyamin Ginzburg Date: Tue, 18 Oct 2022 00:07:21 +0300 Subject: [PATCH 2/3] [WIP] ws tracking --- Pipfile | 0 israel_transport_api/siri/client.py | 25 ++++++++++++++++++++++--- israel_transport_api/siri/models.py | 14 ++++++++++++++ israel_transport_api/siri/router.py | 28 +++++++++++++++++++++++++--- 4 files changed, 61 insertions(+), 6 deletions(-) create mode 100644 Pipfile diff --git a/Pipfile b/Pipfile new file mode 100644 index 0000000..e69de29 diff --git a/israel_transport_api/siri/client.py b/israel_transport_api/siri/client.py index f6c44c5..eaeceab 100644 --- a/israel_transport_api/siri/client.py +++ b/israel_transport_api/siri/client.py @@ -10,8 +10,7 @@ from israel_transport_api.misc import http_client from israel_transport_api.siri.exceptions import SiriException from israel_transport_api.siri.models import IncomingRoute, IncomingRoutesResponse -from israel_transport_api.siri.siri_models import MonitoredStopVisit - +from israel_transport_api.siri.siri_models import MonitoredStopVisit, VehicleLocation RETRY_COUNT = 5 logger = logging.getLogger('siri_client') @@ -59,7 +58,27 @@ async def get_incoming_routes( arrival_time = stop_visit.monitored_vehicle_journey.monitored_call.expected_arrival_time.replace(tzinfo=None) eta = (arrival_time - dt.now()).seconds // 60 route = await routes_repository.find_route_by_id(int(stop_visit.monitored_vehicle_journey.line_ref), conn) - incoming_routes.append(IncomingRoute(eta=eta, route=route)) + incoming_routes.append( + IncomingRoute( + eta=eta, + route=route, + plate_number=stop_visit.monitored_vehicle_journey.vehicle_ref + ) + ) resp = IncomingRoutesResponse(stop_info=stop_info, incoming_routes=sorted(incoming_routes, key=lambda r: r.eta)) return resp + + +async def get_vehicle_location(vehicle_plate_number: str, stop_code: int) -> VehicleLocation: + siri_data = await _make_request(stop_code, 30) + vehicle = list( + filter( + lambda m: m.monitored_vehicle_journey.vehicle_ref == vehicle_plate_number, + siri_data + ) + ) + if len(vehicle) == 0: + raise ValueError # todo + current_location = vehicle[0].monitored_vehicle_journey.vehicle_location + return current_location diff --git a/israel_transport_api/siri/models.py b/israel_transport_api/siri/models.py index 15ef401..bcbc57e 100644 --- a/israel_transport_api/siri/models.py +++ b/israel_transport_api/siri/models.py @@ -3,6 +3,7 @@ from pydantic import BaseModel, Field from israel_transport_api.gtfs.models import Stop, Route +from israel_transport_api.siri.siri_models import VehicleLocation class IncomingRoutesResponse(BaseModel): @@ -13,7 +14,20 @@ class IncomingRoutesResponse(BaseModel): class IncomingRoute(BaseModel): eta: int + plate_number: str route: Route +class VehicleLocationResponse(BaseModel): + latitude: float + longitude: float + + @classmethod + def from_siri_model(cls, siri_model: VehicleLocation): + return cls( + latitude=siri_model.latitude, + longitude=siri_model.longitude + ) + + IncomingRoutesResponse.update_forward_refs() diff --git a/israel_transport_api/siri/router.py b/israel_transport_api/siri/router.py index 666cbb4..c75464a 100644 --- a/israel_transport_api/siri/router.py +++ b/israel_transport_api/siri/router.py @@ -1,7 +1,12 @@ -from fastapi import APIRouter, Path, Query, Request +from asyncio import sleep -from israel_transport_api.siri.client import get_incoming_routes -from israel_transport_api.siri.models import IncomingRoutesResponse +from fastapi import APIRouter, Path, Query, Request, WebSocket, WebSocketDisconnect + +from israel_transport_api.siri.client import get_incoming_routes, get_vehicle_location +from israel_transport_api.siri.models import IncomingRoutesResponse, VehicleLocationResponse + + +WS_UPDATE_INTERVAL = 3 siri_router = APIRouter(prefix='/siri', tags=['Siri']) @@ -13,3 +18,20 @@ async def get_routes_for_stop( monitoring_interval: int = Query(30, description='Monitoring interval in minutes') ) -> IncomingRoutesResponse: return await get_incoming_routes(request.app.state.conn, stop_code, monitoring_interval) + + +@siri_router.websocket('/track_vehicle/{stop_code}/{vehicle_plate_number}') +async def track_vehicle( + ws: WebSocket, + stop_code: int = Path(..., description='Stop code for tracking'), + vehicle_plate_number: str = Path(..., description='Vehicle plate number') +): + await ws.accept() + try: + while True: + resp = await get_vehicle_location(vehicle_plate_number, stop_code) + resp and await ws.send_json(resp.dict()) + await sleep(WS_UPDATE_INTERVAL) + except WebSocketDisconnect: + pass + From dab3ccfb1146c113823fd577d2ea5dd869cac2f3 Mon Sep 17 00:00:00 2001 From: Benyamin Ginzburg Date: Mon, 12 Aug 2024 23:09:51 +0300 Subject: [PATCH 3/3] Implement vehicle tracking --- Pipfile | 0 israel_transport_api/config.py | 2 +- israel_transport_api/siri/router.py | 12 ++++++------ pdm.lock | 17 ++++++++++++++++- pyproject.toml | 2 +- 5 files changed, 24 insertions(+), 9 deletions(-) delete mode 100644 Pipfile diff --git a/Pipfile b/Pipfile deleted file mode 100644 index e69de29..0000000 diff --git a/israel_transport_api/config.py b/israel_transport_api/config.py index 2d68c0a..20ab05c 100644 --- a/israel_transport_api/config.py +++ b/israel_transport_api/config.py @@ -13,7 +13,7 @@ class Env(BaseSettings): SCHED_HOURS: int SCHED_MINS: int - DB_BATCH_SIZE: int = 300_000 + WS_UPDATE_INTERVAL: int = 5 env = Env() diff --git a/israel_transport_api/siri/router.py b/israel_transport_api/siri/router.py index c75464a..bb32e98 100644 --- a/israel_transport_api/siri/router.py +++ b/israel_transport_api/siri/router.py @@ -3,11 +3,10 @@ from fastapi import APIRouter, Path, Query, Request, WebSocket, WebSocketDisconnect from israel_transport_api.siri.client import get_incoming_routes, get_vehicle_location -from israel_transport_api.siri.models import IncomingRoutesResponse, VehicleLocationResponse +from israel_transport_api.siri.models import IncomingRoutesResponse +from israel_transport_api.config import env -WS_UPDATE_INTERVAL = 3 - siri_router = APIRouter(prefix='/siri', tags=['Siri']) @@ -28,10 +27,11 @@ async def track_vehicle( ): await ws.accept() try: + previous_resp = None while True: resp = await get_vehicle_location(vehicle_plate_number, stop_code) - resp and await ws.send_json(resp.dict()) - await sleep(WS_UPDATE_INTERVAL) + resp and previous_resp != resp and await ws.send_json(resp.model_dump()) + previous_resp = resp + await sleep(env.WS_UPDATE_INTERVAL) except WebSocketDisconnect: pass - diff --git a/pdm.lock b/pdm.lock index 89d7182..eb88bcd 100644 --- a/pdm.lock +++ b/pdm.lock @@ -5,7 +5,7 @@ groups = ["default", "dev"] strategy = ["inherit_metadata"] lock_version = "4.5.0" -content_hash = "sha256:f8cac5a8cc879acc895c40a249607f4b51ff8840fbb8dfe88bd87757070cc4a0" +content_hash = "sha256:ec4bcae5b4c2c1440fff8c7b0911e3695cbffb821186680dd2aca906dac8dc60" [[metadata.targets]] requires_python = "==3.12.*" @@ -168,6 +168,21 @@ files = [ {file = "fastapi_cli-0.0.4.tar.gz", hash = "sha256:e2e9ffaffc1f7767f488d6da34b6f5a377751c996f397902eb6abb99a67bde32"}, ] +[[package]] +name = "fastapi" +version = "0.111.1" +extras = ["websockets"] +requires_python = ">=3.8" +summary = "FastAPI framework, high performance, easy to learn, fast to code, ready for production" +groups = ["default"] +dependencies = [ + "fastapi==0.111.1", +] +files = [ + {file = "fastapi-0.111.1-py3-none-any.whl", hash = "sha256:4f51cfa25d72f9fbc3280832e84b32494cf186f50158d364a8765aabf22587bf"}, + {file = "fastapi-0.111.1.tar.gz", hash = "sha256:ddd1ac34cb1f76c2e2d7f8545a4bcb5463bce4834e81abf0b189e0c359ab2413"}, +] + [[package]] name = "h11" version = "0.14.0" diff --git a/pyproject.toml b/pyproject.toml index 3517322..8e4b667 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -6,7 +6,7 @@ authors = [ {name = "Benyamin Ginzburg", email = "benyomin.94@gmail.com"}, ] dependencies = [ - "fastapi==0.111.1", + "fastapi[websockets]==0.111.1", "pydantic==2.8.2", "pydantic-settings==2.4.0", "uvicorn==0.30.4",