Commit
fetch gps data from OceanLive server and insert it to the local gpsdata db
alischandy committed Nov 27, 2024
1 parent f93d4da commit 952492b
Showing 4 changed files with 254 additions and 117 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -5,4 +5,5 @@ db.db
 secret*
 /videos/
 /ondeck/
+.env

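The rewritten gps_fetch.py below reads its configuration from a .env file, which this ignore rule keeps out of the repository. A minimal sketch of such a file, with variable names taken from the os.getenv calls in the new code and purely placeholder values:

OCEANLIVE_SERVER=oceanlive.example.org
OCEANLIVE_USERNAME=tryolabs
OCEANLIVE_PASSWORD=changeme
OCEANLIVE_GPS_FOLDER=/home/tryolabs/gps
PROCESSED_GPS_LOG=processed_gps_files.txt
GPS_FILES_FOLDER=./gps_files
DBNAME=mydb
DBUSER=mydbuser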
257 changes: 140 additions & 117 deletions gps_fetch.py
@@ -1,122 +1,145 @@
 import os
-import re
 import time
-from datetime import UTC, datetime
-from pathlib import Path
+from datetime import datetime, timezone
 
-import click
+import paramiko
 import psycopg2
 import schedule
-from dateutil.parser import isoparse
-from flask.config import Config as FlaskConfig
-from psycopg2.pool import SimpleConnectionPool
-
-flaskconfig = FlaskConfig(root_path="")
-
-flaskconfig.from_object("config.defaults")
-if "ENVIRONMENT" in os.environ:
-    flaskconfig.from_envvar("ENVIRONMENT")
-
-
-def thalos_gps_filename_date(filename: str) -> datetime:
-    m = re.match(r".*(\d{8}).?(\d{6})\.txt", filename)
-    if not m:
-        return None
-    return isoparse(m[1] + " " + m[2] + "+00:00")
-
-
-def gps_fetch(cpool: SimpleConnectionPool, thalos_dir: Path):
-    conn: psycopg2.connection = cpool.getconn()
-    gps_files = [x for x in thalos_dir.iterdir()]
-    dt_index = {}
-    for gps_file in gps_files:
-        m = re.match(r".*(\d{8}).?(\d{6})\.txt", gps_file.name)
-        if not m:
-            continue
-        dt = datetime.strptime(m[1] + " " + m[2] + "Z", "%Y%m%d %H%M%S%z")
-        dt_index[dt] = gps_file
-
-    new_dts = []
-
-    # dts_tupled = list(map(lambda x: (x,), dt_index.keys()))
-    if len(dt_index.keys()) > 0:
-        try:
-            with conn.cursor() as cur:
-                args = ",".join(cur.mogrify("(%s)", [dt]).decode("utf-8") for dt in dt_index)
-                cur.execute(
-                    """WITH t (file_dt) AS ( VALUES """
-                    + args
-                    + """ )
-                SELECT t.file_dt FROM t
-                LEFT JOIN gpsdata ON t.file_dt = gpsdata.gps_datetime
-                WHERE gpsdata.gps_datetime IS NULL;"""
-                )
-                # print(cur.query)
-                # print(cur.description)
-                rows = cur.fetchall()
-                new_dts.extend(col for cols in rows for col in cols)
-
-            insert_tuples = []
-
-            for new_dt in new_dts:
-                new_file: Path = dt_index[new_dt.astimezone(UTC)]
-                with new_file.open() as data:
-                    line = data.readline()
-                    m = re.match(
-                        r"([+-]?(\d+(\.\d*)?|\.\d+)).*,.*?([+-]?(\d+(\.\d*)?|\.\d+))", line
-                    )
-                    if m:
-                        lat = m[1]
-                        lon = m[4]
-                        insert_tuples.append(
-                            (
-                                new_dt,
-                                lat,
-                                lon,
-                            )
-                        )
-
-            if len(insert_tuples) > 0:
-                click.echo(f"inserting {len(insert_tuples)} new gps coords")
-                with conn.cursor() as cur:
-                    cur.executemany(
-                        "INSERT INTO gpsdata (gps_datetime, lat, lon) VALUES (%s, %s, %s);",
-                        insert_tuples,
-                    )
-                    # print(cur.query)
-                    conn.commit()
-        finally:
-            cpool.putconn(conn)
-
-
-@click.command()
-@click.option("--dbname", default=flaskconfig.get("DBNAME"))
-@click.option("--dbuser", default=flaskconfig.get("DBUSER"))
-@click.option("--thalos_gps_dir", default=flaskconfig.get("THALOS_GPS_DIR"))
-def main(dbname, dbuser, thalos_gps_dir):
-    thalos_gps_dir = Path(thalos_gps_dir)
-
-    cpool = SimpleConnectionPool(1, 1, database=dbname, user=dbuser)
-
-    def runonce(cpool, thalos_gps_dir):
-        gps_fetch(cpool, thalos_gps_dir)
-        return schedule.CancelJob
-
-    schedule.every(1).seconds.do(runonce, cpool, thalos_gps_dir)
-    schedule.every(15).minutes.do(gps_fetch, cpool, thalos_gps_dir)
-
-    while 1:
-        n = schedule.idle_seconds()
-        if n is None:
-            # no more jobs
-            click.echo("No more jobs. exiting")
-            break
-        elif n > 0:
-            # sleep exactly the right amount of time
-            click.echo(f"sleeping for: {n}")
-            time.sleep(n)
-        schedule.run_pending()
-
-
-if __name__ == "__main__":
-    main()
+from dotenv import load_dotenv
+
+# Load environment variables
+load_dotenv(override=True)
+
+# Configuration
+SERVER_HOSTNAME = os.getenv("OCEANLIVE_SERVER")
+USERNAME = os.getenv("OCEANLIVE_USERNAME")
+PASSWORD = os.getenv("OCEANLIVE_PASSWORD")
+REMOTE_PATH = os.getenv("OCEANLIVE_GPS_FOLDER")
+PROCESSED_FILE = os.getenv("PROCESSED_GPS_LOG")
+LOCAL_FOLDER = os.getenv("GPS_FILES_FOLDER")
+DBNAME = os.getenv("DBNAME")
+DBUSER = os.getenv("DBUSER")
+
+# Ensure directories exist
+os.makedirs(LOCAL_FOLDER, exist_ok=True)
+
+
+def connect_to_server() -> paramiko.SSHClient:
+    """Connect to the remote server."""
+    ssh = paramiko.SSHClient()
+    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
+    ssh.connect(SERVER_HOSTNAME, username=USERNAME, password=PASSWORD)
+    return ssh
+
+
+def read_processed_files() -> set:
+    """Read the list of processed GPS files from the log file."""
+    if os.path.exists(PROCESSED_FILE):
+        with open(PROCESSED_FILE) as f:
+            return {line.strip() for line in f}
+    return set()
+
+
+def mark_as_processed(file_name: str) -> None:
+    """Add a file to the processed log file."""
+    with open(PROCESSED_FILE, "a") as f:
+        f.write(f"{file_name}\n")
+
+
+def get_unprocessed_files(sftp: paramiko.SFTPClient, processed_files: set) -> list:
+    """Get the list of GPS files that haven't been processed yet."""
+    try:
+        files = [
+            f"{REMOTE_PATH}/{f}"
+            for f in sftp.listdir(REMOTE_PATH)
+            if f.endswith(".csv") and f not in processed_files
+        ]
+        return sorted(files)  # Sort for chronological processing
+    except FileNotFoundError:
+        print(f"Directory {REMOTE_PATH} not found.")
+        return []
+
+
+def parse_gps_file(file_path: str) -> list:
+    """Parse a GPS file and return data for insertion.
+
+    Example GPS data:
+    tryolabs@oceanlive:~/gps$ cat positions_20241008_1925.csv
+    MBX;2024-10-08 19:25:33;+6.7378291;N;-85.4017213;W; 2.64;121.6° (±24.3°);primary;;S
+    """
+    gps_data = []
+    with open(file_path) as f:
+        for line in f:
+            parts = line.strip().split(";")
+            if len(parts) >= 5:  # a valid line has at least timestamp, lat, N/S, and lon fields
+                dt = datetime.strptime(parts[1], "%Y-%m-%d %H:%M:%S").replace(tzinfo=timezone.utc)
+                lat = float(parts[2])
+                lon = float(parts[4])
+                gps_data.append((dt, lat, lon))
+    return gps_data
+
+
+def insert_gps_data(gps_data: list) -> None:
+    """Insert GPS data into the database."""
+    conn = psycopg2.connect(database=DBNAME, user=DBUSER)
+    try:
+        with conn.cursor() as cur:
+            cur.executemany(
+                "INSERT INTO gpsdata (gps_datetime, lat, lon) VALUES (%s, %s, %s) ON CONFLICT (gps_datetime) DO NOTHING;",
+                gps_data,
+            )
+        conn.commit()
+    finally:
+        conn.close()
+
+
+def check_and_process_gps() -> None:
+    """Main function that will be scheduled."""
+    print("\nChecking for new GPS files...")
+
+    processed_files = read_processed_files()
+
+    try:
+        ssh = connect_to_server()
+        sftp = ssh.open_sftp()
+
+        # Get unprocessed files
+        unprocessed = get_unprocessed_files(sftp, processed_files)
+        print(f"Found {len(unprocessed)} GPS files to process.")
+
+        for remote_file in unprocessed:
+            try:
+                # Download file
+                local_file = f"{LOCAL_FOLDER}/{os.path.basename(remote_file)}"
+                print(f"Downloading {remote_file} to {local_file}...")
+                sftp.get(remote_file, local_file)
+
+                # Parse and insert data
+                gps_data = parse_gps_file(local_file)
+                insert_gps_data(gps_data)
+
+                # Clean up and mark as processed
+                os.remove(local_file)
+                mark_as_processed(os.path.basename(remote_file))
+                print(f"Finished processing {os.path.basename(remote_file)}.")
+
+            except Exception as e:
+                print(f"Error processing {remote_file}: {e}")
+
+        sftp.close()
+        ssh.close()
+
+    except Exception as e:
+        print(f"Connection error: {e}")
+
+
+# Schedule the task
+print("Starting GPS data fetch service...")
+schedule.every(1).hour.do(check_and_process_gps)
+
+# Run the job immediately when starting
+check_and_process_gps()
+
+# Keep the script running
+while True:
+    schedule.run_pending()
+    time.sleep(1)
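Note that the new insert relies on ON CONFLICT (gps_datetime) DO NOTHING, which only works if gpsdata has a unique constraint or unique index on gps_datetime; the schema itself is not part of this commit. A minimal sketch of a compatible table, with assumed column names from the INSERT above and hypothetical types:

import psycopg2

# Hypothetical DDL for illustration only; the actual schema is not in this
# commit. The PRIMARY KEY on gps_datetime is what lets
# "ON CONFLICT (gps_datetime) DO NOTHING" skip duplicate fixes.
conn = psycopg2.connect(database="mydb", user="mydbuser")
with conn, conn.cursor() as cur:  # commits the transaction on success
    cur.execute(
        """
        CREATE TABLE IF NOT EXISTS gpsdata (
            gps_datetime timestamptz PRIMARY KEY,
            lat double precision NOT NULL,
            lon double precision NOT NULL
        );
        """
    )
conn.close()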
3 changes: 3 additions & 0 deletions pyproject.toml
@@ -32,6 +32,9 @@ dev-dependencies = [
     "pytest>=8.3.3",
     "pytest-cov>=5.0.0",
     "ruff>=0.7.0",
+    "paramiko>=3.5.0",
+    "schedule>=1.2.2",
+    "python-dotenv>=1.0.1",
 ]
 
 [tool.pytest.ini_options]
