Adding Extensions to Support Main Pipeline #99

Merged
merged 129 commits into from
Sep 5, 2024
fdaef4d
watchdog setup
Sohambutala May 27, 2024
53df868
File Monitoring Extension
Sohambutala Jun 3, 2024
ff9de79
Change file name from extension
Sohambutala Jun 3, 2024
1121d5c
Linting
Sohambutala Jun 3, 2024
3354849
Added `fail-safe` flag
Sohambutala Jun 3, 2024
873632c
Implemented Fail Safe
Sohambutala Jun 4, 2024
8e7022f
Removed Model Definition
Sohambutala Jun 4, 2024
5d7013d
Deployable Echodataflow Trigger and Linting
Sohambutala Jun 5, 2024
8368c50
Added cleanup option
Sohambutala Jun 20, 2024
b1b5724
Changed for Prefect block compatibility
Sohambutala Jun 20, 2024
a117010
Changed to use Blocks instead of variables
Sohambutala Jun 20, 2024
1a080a4
Merge branch 'main' into realtime
Sohambutala Jun 20, 2024
85965ba
Added new enum EDF Run for File Monitoring metadata
Sohambutala Jun 20, 2024
caa353b
EDFRun Block reading util
Sohambutala Jun 20, 2024
c6f8146
Added concurrency setting to control number of files being downloaded
Sohambutala Jun 22, 2024
0578254
Merge branch 'realtime' of https://github.com/OSOceanAcoustics/echoda…
Sohambutala Jun 22, 2024
bc4cc0c
EDF downloader order maintaining
Sohambutala Jun 24, 2024
4678df1
Updated threshold condition
Sohambutala Jun 24, 2024
2820d44
Update time on every run
Sohambutala Jun 28, 2024
efc9ea8
Systemd process deploy scripts
Sohambutala Jun 28, 2024
a2cc1d5
Block Support for File Downloader
Sohambutala Jul 1, 2024
02ba832
New store after error threshold count
Sohambutala Jul 1, 2024
b4009a7
Added monitoring for specific extension
Sohambutala Jul 3, 2024
0a1c7aa
Merge branch 'main' into realtime
Sohambutala Jul 3, 2024
49afb49
Workflow version upgrade
Sohambutala Jul 3, 2024
6772b27
Merge branch 'realtime' of https://github.com/OSOceanAcoustics/echoda…
Sohambutala Jul 3, 2024
4c37b61
Merge branch 'main' into realtime
Sohambutala Jul 3, 2024
7dd6096
Readme badge
Sohambutala Jul 3, 2024
ae8d316
Added Echodataflow Service deploy script
Sohambutala Jul 3, 2024
2a330ff
Merge Main into Realtime
Sohambutala Jul 3, 2024
c1fa3e7
Updated deployment scripts specific to environment
Sohambutala Jul 3, 2024
9b03ea5
Updated default rules
Sohambutala Jul 3, 2024
87cb1a3
Added replace flag for downloader
Sohambutala Jul 4, 2024
8b0e35d
Update model path from yaml
Sohambutala Jul 4, 2024
cf84921
Updated default model
Sohambutala Jul 4, 2024
51fdeeb
Parameter to change store file name
Sohambutala Jul 4, 2024
b96ea4d
Small Fixes
Sohambutala Jul 4, 2024
7e023cd
Fixed: non-breaking flow, ping_time from external params
Sohambutala Jul 5, 2024
7b0f4cc
Store logic updated
Sohambutala Jul 8, 2024
1052c1c
Updated datetime convention used for file names
Sohambutala Jul 9, 2024
fa114bb
ML Hot fix; Slicing depth and including background component in hake …
Sohambutala Jul 9, 2024
f801a25
Timestamp quotes issue fixed
Sohambutala Jul 9, 2024
d832396
ML Hot fix; Slicing depth and including background component in hake …
Sohambutala Jul 9, 2024
b948c25
Updated NaN replacement values from -36 to -70
Sohambutala Jul 11, 2024
315b1d6
Sv Pipeline - Squashed stage
Sohambutala Jul 16, 2024
2c5e129
rclone support
Sohambutala Jul 17, 2024
d399c8d
Support for reprocessing unfinished last file
Sohambutala Jul 17, 2024
d2f6238
Support for reading individual Sv or MVBS files in folder treating as…
Sohambutala Jul 17, 2024
ad6c22a
Setting echopype log to verbose
Sohambutala Jul 17, 2024
fc1f1f2
Updated Requirements and Imports
Sohambutala Jul 17, 2024
4b53613
Null checks if no files at source
Sohambutala Jul 17, 2024
dd60135
Utilize UTC timezone
Sohambutala Jul 18, 2024
4b145ca
Lasker hotfixes
leewujung Jul 18, 2024
0054f1b
Rclone copy command
Sohambutala Jul 18, 2024
893ef90
MVBS combine, missing attrs fixed
leewujung Jul 18, 2024
5c3e648
Merge branch 'realtime' of https://github.com/OSOceanAcoustics/echoda…
leewujung Jul 18, 2024
30c987f
Configurable block name for ease of deployment duplication
Sohambutala Jul 18, 2024
ba22c48
Deployment renamed
Sohambutala Jul 18, 2024
0db0026
Deployment renamed
leewujung Jul 18, 2024
4a41470
Reverting hardcoded status change
Sohambutala Jul 19, 2024
66b188b
Floor end time based on window size
Sohambutala Jul 19, 2024
ef43369
rclone command support
Sohambutala Jul 20, 2024
c844f26
File Monitor fixes
Sohambutala Jul 20, 2024
8ce0278
Slicing path details from block storage
Sohambutala Jul 20, 2024
b3f0d44
Removing default sequential processing
Sohambutala Jul 20, 2024
f872f05
Reducing extra path data from metadata
Sohambutala Jul 20, 2024
818ddbc
rclone command support
Sohambutala Jul 20, 2024
a8bbd61
Pydantic improvement
Sohambutala Jul 20, 2024
b17e076
Run rclone command from edf-data-transfer
leewujung Jul 22, 2024
20bb904
Model Path comma removed
Sohambutala Jul 22, 2024
696a557
Updated start index calculation
Sohambutala Jul 22, 2024
21849bd
Write MVBS_Slice and delete at the end
leewujung Jul 23, 2024
97dce8b
Updated stage name to have MVBS Slice to delete later
leewujung Jul 23, 2024
192e9bc
Cancel scheduled flow if already running
Sohambutala Jul 23, 2024
7fe73af
Convert error to str
leewujung Jul 23, 2024
79e1c32
mvbs_slice reference missing
leewujung Jul 23, 2024
e2a84e9
Max number of files flag for file monitor
Sohambutala Jul 23, 2024
e2b237d
Merge branch 'realtime' of https://github.com/OSOceanAcoustics/echoda…
Sohambutala Jul 23, 2024
42aa115
Timeout for edf-data-transfer
Sohambutala Jul 23, 2024
f38bcab
Added timeout for edf-data-transfer
Sohambutala Jul 23, 2024
95483ce
Added flag `time_rounding_flag` to skip end time rounding
Sohambutala Jul 23, 2024
130fa86
Reduce count of last file assuming it is still not complete
Sohambutala Aug 1, 2024
8e2afce
Echoshader Integration
Sohambutala Aug 1, 2024
2f1e6ce
File modification time extracted from filename
Sohambutala Aug 8, 2024
bb74619
Timezone info
leewujung Aug 8, 2024
6f4e066
added print statements
leewujung Aug 8, 2024
32d2ff4
Removed print statement
leewujung Aug 10, 2024
db58937
Echopop initial draft
Sohambutala Aug 10, 2024
b6e5bde
fix dimension order of MVBS dataset, use .clone() to remove torch war…
leewujung Aug 12, 2024
369f394
reordering depth ping_time sequence in dims
valentina-s Aug 13, 2024
83ede9f
Echopop integration changes
Sohambutala Aug 13, 2024
9d7f4cb
File Monitor Batch processing support
Sohambutala Aug 14, 2024
4a40a53
Echopop remote repo support
Sohambutala Aug 14, 2024
74d4f32
Flow change from on-ship to on-cloud
Sohambutala Aug 14, 2024
0459ad3
Panel Updated code
Sohambutala Aug 14, 2024
369f98a
data refresh changes
Sohambutala Aug 14, 2024
1735b55
Shlex split to parse commands correctly
Sohambutala Aug 15, 2024
856754a
fix mask using xr.where
leewujung Aug 16, 2024
49872fd
Echopop panel app first version
Sohambutala Aug 17, 2024
7309b41
predictions working version
valentina-s Aug 17, 2024
672727d
adding predictions stage
valentina-s Aug 18, 2024
4af2a92
removing comments
valentina-s Aug 18, 2024
724ddd9
add predictions
leewujung Aug 18, 2024
1ee56cb
adding panel app
leewujung Aug 18, 2024
a3be75c
adding no chunking while saving
leewujung Aug 18, 2024
b4b06c4
Added sorting and remote fs monitoring options
Sohambutala Aug 19, 2024
7bf4304
Query Echopop DB to mark files as processed
Sohambutala Aug 19, 2024
1b450d9
Cleanup
Sohambutala Aug 19, 2024
75ff6e0
panel with buttons
leewujung Aug 19, 2024
24126d2
ordering multi frequencies
leewujung Aug 19, 2024
c81efe0
frequencies to list
leewujung Aug 19, 2024
d1020f3
fix dim order and slice depth to 590 in combine_datasets
leewujung Aug 20, 2024
babab77
change dims sequence
leewujung Aug 20, 2024
68ce7ed
add one more compute() statement when reading data
leewujung Aug 20, 2024
40bc57a
adding chunking in the saving statement
leewujung Aug 20, 2024
369934d
panel_apps
leewujung Aug 28, 2024
6b55013
panel app compute
leewujung Aug 28, 2024
cac065f
panel app resolve conflict
leewujung Aug 28, 2024
628a4af
Using regex instead of str split
leewujung Aug 29, 2024
31c1105
Merge branch 'realtime' into echoshader
Sohambutala Sep 4, 2024
6a845b9
Merge Echopop into Realtime branch
Sohambutala Sep 4, 2024
ff78cec
Prefect version change
Sohambutala Sep 4, 2024
8254560
Indentation error
Sohambutala Sep 4, 2024
b9c84b6
Update main.yaml
Sohambutala Sep 4, 2024
871cbf7
Update and rename docker-release.yml to docker-build.yml
Sohambutala Sep 4, 2024
f766946
Update docker-build.yml
Sohambutala Sep 4, 2024
2f14c9b
Merge remote-tracking branch 'origin/echoshader' into realtime
Sohambutala Sep 4, 2024
082c072
Merge remote-tracking branch 'origin/echopop' into realtime
Sohambutala Sep 5, 2024
83bb7d1
Merge remote-tracking branch 'origin/echoshader_predictions' into rea…
Sohambutala Sep 5, 2024
@@ -1,10 +1,12 @@
-name: Docker Image CI
+name: Build Docker Image

 on:
   push:
-    branches: [ "main" ]
+    branches:
+      - '*'
   pull_request:
-    branches: [ "main" ]
+    branches:
+      - '*'

 jobs:

@@ -16,3 +18,4 @@ jobs:
- uses: actions/checkout@v4
- name: Build the Docker image
run: docker build . --file Dockerfile --tag my-image-name:$(date +%s)

15 changes: 13 additions & 2 deletions .github/workflows/main.yaml
@@ -18,8 +18,19 @@ jobs:
uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python-version }}
- name: Install echodataflow
run: pip install .[all]

- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install pytest pytest-cov
pip install -r requirements.txt
pip install -e .

- name: Initialize
run: |
echodataflow init
prefect profiles create echodataflow-local

- name: Run unit tests
env:
FORCE_COLOR: 3
15 changes: 15 additions & 0 deletions deployment/Linux/FileDownloader.service
@@ -0,0 +1,15 @@
[Unit]
Description=Prefect serve
After=network.target

[Service]
User=exouser
WorkingDirectory=/home/exouser/Desktop/Echodataflow
ExecStart=/home/exouser/Desktop/echodataflowv2/bin/python /home/exouser/Desktop/echodataflowv2/lib/python3.10/site-packages/echodataflow/extensions/file_downloader.py
Restart=always
StandardOutput=syslog
StandardError=syslog
SyslogIdentifier=echodataflow

[Install]
WantedBy=multi-user.target
15 changes: 15 additions & 0 deletions deployment/Linux/FileMonitor.service
@@ -0,0 +1,15 @@
[Unit]
Description=Prefect serve
After=network.target

[Service]
User=exouser
WorkingDirectory=/home/exouser/Desktop/Echodataflow
ExecStart=/home/exouser/Desktop/echodataflowv2/bin/python /home/exouser/Desktop/echodataflowv2/lib/python3.10/site-packages/echodataflow/extensions/file_monitor.py
Restart=always
StandardOutput=syslog
StandardError=syslog
SyslogIdentifier=echodataflow

[Install]
WantedBy=multi-user.target
15 changes: 15 additions & 0 deletions deployment/Linux/echodataflow.service
@@ -0,0 +1,15 @@
[Unit]
Description=Prefect serve
After=network.target

[Service]
User=exouser
WorkingDirectory=/home/exouser/Desktop/Echodataflow/
ExecStart=/home/exouser/Desktop/echodataflowv2/bin/python /home/exouser/Desktop/Echodataflow/main.py
Restart=always
StandardOutput=syslog
StandardError=syslog
SyslogIdentifier=echodataflow

[Install]
WantedBy=multi-user.target
25 changes: 25 additions & 0 deletions deployment/MacOS/FileDownloader.plist
@@ -0,0 +1,25 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd">
<plist version="1.0">
<dict>
<key>Label</key>
<string>com.example.filedownloader</string>
<key>UserName</key>
<string>exouser</string>
<key>WorkingDirectory</key>
<string>/Users/exouser/Desktop/Echodataflow</string>
<key>ProgramArguments</key>
<array>
<string>/Users/exouser/Desktop/echodataflowv2/bin/python</string>
<string>/Users/exouser/Desktop/echodataflowv2/lib/python3.10/site-packages/echodataflow/extensions/file_downloader.py</string>
</array>
<key>RunAtLoad</key>
<true/>
<key>KeepAlive</key>
<true/>
<key>StandardOutPath</key>
<string>/var/log/filedownloader.log</string>
<key>StandardErrorPath</key>
<string>/var/log/filedownloader.err</string>
</dict>
</plist>
25 changes: 25 additions & 0 deletions deployment/MacOS/FileMonitor.plist
@@ -0,0 +1,25 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd">
<plist version="1.0">
<dict>
<key>Label</key>
<string>com.example.filemonitor</string>
<key>UserName</key>
<string>exouser</string>
<key>WorkingDirectory</key>
<string>/Users/exouser/Desktop/Echodataflow</string>
<key>ProgramArguments</key>
<array>
<string>/Users/exouser/Desktop/echodataflowv2/bin/python</string>
<string>/Users/exouser/Desktop/echodataflowv2/lib/python3.10/site-packages/echodataflow/extensions/file_monitor.py</string>
</array>
<key>RunAtLoad</key>
<true/>
<key>KeepAlive</key>
<true/>
<key>StandardOutPath</key>
<string>/var/log/filemonitor.log</string>
<key>StandardErrorPath</key>
<string>/var/log/filemonitor.err</string>
</dict>
</plist>
25 changes: 25 additions & 0 deletions deployment/MacOS/echodataflow.plist
@@ -0,0 +1,25 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd">
<plist version="1.0">
<dict>
<key>Label</key>
<string>com.example.echodataflow</string>
<key>UserName</key>
<string>exouser</string>
<key>WorkingDirectory</key>
<string>/Users/exouser/Desktop/Echodataflow/</string>
<key>ProgramArguments</key>
<array>
<string>/Users/exouser/Desktop/echodataflowv2/bin/python</string>
<string>/Users/exouser/Desktop/Echodataflow/main.py</string>
</array>
<key>RunAtLoad</key>
<true/>
<key>KeepAlive</key>
<true/>
<key>StandardOutPath</key>
<string>/var/log/echodataflow.log</string>
<key>StandardErrorPath</key>
<string>/var/log/echodataflow.err</string>
</dict>
</plist>
56 changes: 56 additions & 0 deletions deployment/docker_trigger.py
@@ -0,0 +1,56 @@
import os
import json
from pathlib import Path
from echoflow import echoflow_start
from echoflow.stages.echoflow_trigger import echoflow_trigger
from prefect import flow
from prefect.task_runners import SequentialTaskRunner
from typing import Any, Dict, Optional, Union

@flow(name="Docker-Trigger", task_runner=SequentialTaskRunner())
def docker_trigger(
    dataset_config: Union[dict, str, Path],
    pipeline_config: Union[dict, str, Path],
    logging_config: Union[dict, str, Path] = None,
    storage_options: Optional[dict] = None,
    options: Optional[dict] = {},
    json_data_path: Union[str, Path] = None
):
    return echoflow_trigger(
        dataset_config=dataset_config,
        pipeline_config=pipeline_config,
        logging_config=logging_config,
        storage_options=storage_options,
        options=options,
        json_data_path=json_data_path
    )

def convert_to_proper_type(input_str):
    if input_str is None:
        return None

    # Try to parse input as JSON
    try:
        return json.loads(input_str)
    except (ValueError, TypeError):
        pass

    # If it's not valid JSON, check if it's a file path
    if os.path.exists(input_str):
        return Path(input_str)

    # If not JSON or a valid file path, return the input as is
    return input_str

if __name__ == "__main__":
    print("Entered")
    # Get inputs from environment variables or command-line arguments
    dataset_config = convert_to_proper_type(os.getenv('DATASET_CONFIG', '{}'))
    pipeline_config = convert_to_proper_type(os.getenv('PIPELINE_CONFIG', '{}'))
    logging_config = convert_to_proper_type(os.getenv('LOGGING_CONFIG', None))
    storage_options = convert_to_proper_type(os.getenv('STORAGE_OPTIONS', None))
    options = convert_to_proper_type(os.getenv('OPTIONS', '{}'))
    json_data_path = convert_to_proper_type(os.getenv('JSON_DATA_PATH', None))

    # Call the docker_trigger function
    docker_trigger(dataset_config, pipeline_config, logging_config, storage_options, options, json_data_path)
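The `convert_to_proper_type` helper above decides how each environment variable is interpreted: JSON first, then an existing file path, then the raw string. The sketch below copies the helper so it runs on its own and exercises each branch. The sample values (`retries`, the `s3://` string, the temporary file) are made up for illustration and are not taken from the PR.

```python
import json
import os
import tempfile
from pathlib import Path


def convert_to_proper_type(input_str):
    """Copy of the helper in the diff: JSON first, then existing path, else raw string."""
    if input_str is None:
        return None
    try:
        return json.loads(input_str)
    except (ValueError, TypeError):
        pass
    if os.path.exists(input_str):
        return Path(input_str)
    return input_str


# A JSON object string becomes a dict.
as_dict = convert_to_proper_type('{"retries": 3}')

# Any valid JSON scalar is parsed too, so a bare number becomes an int.
as_int = convert_to_proper_type("42")

# A non-JSON string naming an existing file becomes a pathlib.Path.
with tempfile.NamedTemporaryFile(suffix=".yaml") as tmp:
    as_path = convert_to_proper_type(tmp.name)

# Anything else (hypothetical remote URL here) is returned unchanged.
as_str = convert_to_proper_type("s3://bucket/config.yaml")

# An unset variable read with a None default stays None.
as_none = convert_to_proper_type(None)
```

One consequence worth keeping in mind when setting the environment variables: because JSON parsing is tried first, values such as `42` or `true` are converted to Python scalars rather than kept as strings.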
28 changes: 28 additions & 0 deletions deployment/main.py
@@ -0,0 +1,28 @@
import logging
from echodataflow import echodataflow_start
from prefect import flow
from prefect.blocks.core import Block
from typing import Any, Dict, Optional, Union
from pathlib import Path

logging.basicConfig(filename='/home/exouser/Desktop/Echodataflow/echodataflow/logs/service.log', level=logging.DEBUG, format='%(asctime)s %(levelname)s:%(message)s')

@flow
def echodataflow(dataset_config: Union[Dict[str, Any], str, Path],
                 pipeline_config: Union[Dict[str, Any], str, Path],
                 logging_config: Union[Dict[str, Any], str, Path] = None,
                 storage_options: Union[Dict[str, Any], Block] = None,
                 options: Optional[Dict[str, Any]] = {},
                 json_data_path: Union[str, Path] = None):

    echodataflow_start(dataset_config=dataset_config, pipeline_config=pipeline_config, logging_config=logging_config
                       , storage_options=storage_options, options=options, json_data_path=json_data_path)

if __name__ == "__main__":
    logging.debug("Starting the Echodataflow service")
    try:
        echodataflow.serve(name="Echodataflow")
        logging.debug("Echodataflow service started successfully")
    except Exception as e:
        logging.error(f"Error starting the Echodataflow service: {e}")
        raise
4 changes: 2 additions & 2 deletions echodataflow/aspects/echodataflow_aspect.py
@@ -46,18 +46,18 @@ def my_function(arg1, arg2):
# Function code here
pass
"""

def decorator(func=None):
def before_function_call(
gea: Singleton_Echodataflow, type: str, processing_stage: str, *args, **kwargs
):

if gea:
gea.log(
msg=f"Entering with memory at {gea.log_memory_usage()}: ",
extra={"mod_name": func.__module__, "func_name": func.__name__},
level=logging.DEBUG,
)

# Deprecating, since we have check before starting the pipeline

# if type == "FLOW" and processing_stage!= "DEFAULT":
27 changes: 12 additions & 15 deletions echodataflow/aspects/singleton_echodataflow.py
@@ -55,20 +55,21 @@ class Singleton_Echodataflow:
dataset: Dataset
db_log: DB_Log
logger: logging.Logger = None
log_level: int = 0
log_level: int = 0

def __new__(
cls,
log_file: Union[Dict[str, str], str] = None,
pipeline: Recipe = None,
dataset: Dataset = None,
dataset: Dataset = None
) -> "Singleton_Echodataflow":

if cls._instance is None:
cls._instance = super().__new__(cls)
if log_file is not None:
cls._instance.logger = cls._instance.logger_init(log_file)
cls._instance.log_level = cls._instance.logger.level

cls._instance.pipeline = pipeline
cls._instance.dataset = dataset
# cls._instance.db_log = cls._instance.setup_echodataflow_db()
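The `__new__` override in this hunk follows the common Python singleton pattern: the first call creates and configures the instance, and later calls return the same object. Below is a minimal sketch of that pattern using a hypothetical `SingletonSketch` class. It assumes the attributes are assigned only on first construction; the flattened diff does not preserve indentation, so whether `Singleton_Echodataflow` behaves this way is an assumption.

```python
class SingletonSketch:
    """Hypothetical stand-in for a class that keeps one shared instance."""

    _instance = None

    def __new__(cls, pipeline=None, dataset=None):
        if cls._instance is None:
            # First call: create the object and store the configuration.
            cls._instance = super().__new__(cls)
            cls._instance.pipeline = pipeline
            cls._instance.dataset = dataset
        # Every call returns the same object.
        return cls._instance


first = SingletonSketch(pipeline="p1", dataset="d1")
second = SingletonSketch(pipeline="p2", dataset="d2")

same_object = first is second
kept_pipeline = second.pipeline
```

Under that assumption, arguments passed to later constructor calls are silently ignored, which is easy to overlook when a flow is re-run in the same process.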
@@ -84,7 +85,7 @@ def get_instance(self) -> "Singleton_Echodataflow":
Singleton_Echodataflow: The Singleton_Echodataflow instance.
"""
return self._instance

@classmethod
def get_logger(self) -> "logging.Logger":
"""
@@ -143,10 +144,7 @@ def log(self, msg, level, extra):
extra: Extra information to include in the log record.
"""
         if self.logger is not None:
-            self.logger.log(level=logging.DEBUG, msg=msg, extra=extra)
-            self.logger.log(level=logging.ERROR, msg=msg, extra=extra)
-            self.logger.log(level=logging.INFO, msg=msg, extra=extra)
-            self.logger.log(level=logging.WARNING, msg=msg, extra=extra)
+            self.logger.log(level=level, msg=msg, extra=extra)
         else:
             print(f"{extra} : {msg}")
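The change to `log` forwards the caller's `level` to `Logger.log` instead of using hard-coded levels. A small standalone sketch of the effect, using only the standard `logging` module; the logger name, the `ListHandler` class, and the sample message are hypothetical and exist only to make the result inspectable:

```python
import logging


class ListHandler(logging.Handler):
    """Collects emitted records in memory so they can be inspected."""

    def __init__(self):
        super().__init__()
        self.records = []

    def emit(self, record):
        self.records.append(record)


logger = logging.getLogger("echodataflow_sketch")  # hypothetical logger name
logger.setLevel(logging.DEBUG)
logger.propagate = False  # keep the demo output out of the root logger
handler = ListHandler()
logger.addHandler(handler)


def log(msg, level, extra=None):
    # Forward the requested level rather than hard-coding one.
    logger.log(level=level, msg=msg, extra=extra)


log("disk almost full", logging.WARNING, {"mod_name": "demo", "func_name": "check"})

levels = [record.levelname for record in handler.records]
```

With the level passed through, one call produces exactly one record at the requested severity, so handler and logger level filters behave as a caller would expect.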

@@ -182,6 +180,7 @@ def insert_log_data(self):
print(e)
finally:
conn.close()


def log_memory_usage(self):
"""
@@ -193,7 +192,7 @@ def log_memory_usage(self):
process = psutil.Process()
mem = process.memory_info().rss
return mem

def get_possible_next_functions(self, function_name: str):
"""
Get Possible Next Functions
@@ -212,7 +211,7 @@ def get_possible_next_functions(self, function_name: str):
next_functions = executor.get_possible_next_functions(function_name="data_download")
"""
return self.rengine.get_possible_next_functions(function_name)

def load(self):
"""
Load Rules and Dependencies
@@ -228,12 +227,10 @@ def load(self):
loaded_rengine = executor.load()
"""
         rengine: DependencyEngine = DependencyEngine()
-        rules_file_path = os.path.expanduser(
-            os.path.join("~", ".echodataflow", "echodataflow_rules.txt")
-        )
+        rules_file_path = os.path.expanduser(os.path.join("~", ".echodataflow", "echodataflow_rules.txt"))

-        with open(rules_file_path, "r") as file:
+        with open(rules_file_path, 'r') as file:
             for line in file:
-                target, dependent = line.strip("\r\n").split(":")
+                target, dependent = line.strip().split(':')
                 rengine.add_dependency(target_function=target, dependent_function=dependent)
         return rengine
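The last hunk changes how each rule line is cleaned before splitting: `strip("\r\n")` removes only line-ending characters, while `strip()` also removes surrounding spaces and tabs. The sketch below illustrates the difference on made-up `target:dependent` lines. The stage names and the `parse_rules` helper are hypothetical, and a plain dict stands in for `DependencyEngine`, whose internals are not shown in this diff.

```python
from collections import defaultdict

# Hypothetical rule lines; the real echodataflow_rules.txt is not shown here.
sample_lines = [
    "stage_a:stage_b\n",
    "stage_a:stage_c\r\n",
    "  stage_b:stage_d  \n",
]


def parse_rules(lines):
    """Map each target to its dependents, using the new strip()/split(':') logic."""
    deps = defaultdict(list)
    for line in lines:
        target, dependent = line.strip().split(":")
        deps[target].append(dependent)
    return dict(deps)


rules = parse_rules(sample_lines)

# The old cleaning step keeps the padding on an indented line.
old_style = sample_lines[2].strip("\r\n").split(":")
new_style = sample_lines[2].strip().split(":")
```

Both versions unpack exactly two fields, so a blank line or a line containing more than one colon would raise `ValueError` in either form.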