Skip to content

Commit

Permalink
enhancement(zmq): Add zmq hook WIP
Browse files Browse the repository at this point in the history
Add HookManager
Add ZMQHook
Add JsonSerializable to serialize msg properly
  • Loading branch information
DarwinsBuddy committed Nov 23, 2024
1 parent 7d7f31f commit 3cf3880
Show file tree
Hide file tree
Showing 7 changed files with 192 additions and 41 deletions.
45 changes: 45 additions & 0 deletions foosball/hooks/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
import threading
from abc import ABC, abstractmethod

import asyncio


class Hook(ABC):

def __init__(self):
pass
@abstractmethod
def invoke(self, *args, **kwargs):
pass
Expand All @@ -13,3 +19,42 @@ def start(self, *args, **kwargs):
@abstractmethod
def stop(self, *args, **kwargs):
pass


class HookManager(threading.Thread):
def __init__(self):
super().__init__(daemon=True)
self.hooks = []
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
self.stop_event = asyncio.Event()

def run(self):
for hook in self.hooks:
self.loop.call_soon_threadsafe(hook.start)
self.loop.run_until_complete(self.stop_event.wait())

def extend(self, hooks: [Hook], start=False):
self.hooks.extend(hooks)
for hook in hooks:
if start:
self.loop.call_soon_threadsafe(hook.start)

def add(self, hook: Hook, start=False):
self.hooks.append(hook)
if start:
self.loop.call_soon_threadsafe(hook.start)

def remove(self, hook: Hook):
self.loop.call_soon_threadsafe(hook.stop)
self.hooks.remove(hook)

def invoke(self, *args, **kwargs):
for hook in self.hooks:
self.loop.call_soon_threadsafe(hook.invoke(*args, **kwargs))

def stop(self):
for hook in self.hooks:
self.loop.call_soon_threadsafe(hook.stop)
self.hooks = []
self.stop_event.set()
80 changes: 67 additions & 13 deletions foosball/hooks/zmq/__init__.py
Original file line number Diff line number Diff line change
@@ -1,43 +1,97 @@
import json
import logging
import threading
import time
from multiprocessing import Queue
from queue import Full, Empty

import zmq

from .. import Hook


class ZMQHook(Hook):

def __init__(self, host="localhost", port=5555, topic="ws"):
super().__init__()
self.zmq_pub = ZMQPub(address=f"tcp://{host}:{port}")
self.topic = topic
self.zmq = ZMQDispatcher(host, port, topic)
self.started = False
self.start_lock = threading.Lock()

def invoke(self, message: dict | str, *args, **kwargs):
# TODO: send here FrameStats
self.zmq_pub.publish(message, topic=self.topic)
self.zmq.send(message)

def start(self, *args, **kwargs):
pass
with self.start_lock:
self.started = True
self.zmq.start()

def stop(self, *args, **kwargs):
self.zmq_pub.close()
self.zmq.stop()

class ZMQDispatcher(threading.Thread):
def __init__(self, host="localhost", port=5555, topic="ws"):
super().__init__(daemon=True)
self.zmq_pub = ZMQPub(address=f"tcp://{host}:{port}")
self.topic = topic
self.q = Queue()
self.stopped = threading.Event()

def send(self, message: dict | str, *args, **kwargs):
try:
if not self.stopped.is_set():
self.q.put_nowait(message)
except Full:
logging.error("ZMQHook queue is full")

def run(self):
logging.debug("ZMQDispatcher started")
try:
self.zmq_pub.start()
while not self.stopped.is_set():
try:
message = self.q.get(timeout=0.1)
self.zmq_pub.publish(message, topic=self.topic)
except Empty:
pass
except Exception as e:
logging.error(f"Error in ZMQDispatcher {e}")
self.stopped.set()
finally:
try:
while True:
self.q.get_nowait()
except Empty:
pass
self.zmq_pub.close()

def stop(self):
self.stopped.is_set()


class ZMQPub:
def __init__(self, address="tcp://127.0.0.1:5555"):
self.address = address
self.context = zmq.Context()
self.socket = self.context.socket(zmq.PUB)
self.socket.setsockopt(zmq.LINGER, 0)
logging.info(f"ZMQ Publisher started at {self.address}")

def start(self):
self.socket.bind(self.address) # Bind the socket to the address
time.sleep(0.5)

def publish(self, message: str | dict, topic=None):
msg = json.dumps(message) if type(message) is dict else message
logging.debug(f"ZMQ publish {msg} to {topic} ({self.address})")
if topic is not None:
self.socket.send_string(f"{topic} {msg}")
else:
# Send the message
self.socket.send_string(msg)
try:
msg = json.dumps(message) if type(message) is dict else message
# logging.debug(f"ZMQ publish {msg} to {topic} ({self.address})")
if topic is not None:
self.socket.send_string(f"{topic} {msg}")
else:
# Send the message
self.socket.send_string(msg)
except Exception as e:
logging.error(f"Error in ZMQ Publisher {e}")

def close(self):
self.socket.close()
Expand Down
34 changes: 29 additions & 5 deletions foosball/models.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import collections
from abc import ABC, abstractmethod
from dataclasses import dataclass
from enum import Enum
from typing import Union
Expand Down Expand Up @@ -39,9 +40,14 @@ class Team(Enum):
Rect = (Point, Point, Point, Point)
BBox = [int, int, int, int] # x y width height

class JsonSerializable(ABC):
@abstractmethod
def to_json(self) -> dict | list:
pass


@dataclass
class Score:
class Score(JsonSerializable):
blue: int = 0
red: int = 0

Expand All @@ -58,34 +64,52 @@ def inc(self, team: Team):
def to_string(self):
return f"{self.blue} : {self.red}"

def to_json(self) -> dict:
return self.__dict__


@dataclass
class FrameDimensions:
class FrameDimensions(JsonSerializable):
original: [int, int]
scaled: [int, int]
scale: float

def to_json(self) -> dict:
return self.__dict__


@dataclass
class Blob:
class Blob(JsonSerializable):
center: Point
bbox: BBox

def area(self):
[_, _, w, h] = self.bbox
return w * h

def to_json(self) -> dict:
return self.__dict__

Goal = Blob


@dataclass
class Goals:
class Goals(JsonSerializable):
left: Goal
right: Goal

def to_json(self) -> dict:
return { "left": self.left.to_json(), "right": self.right.to_json() }

class Track(collections.deque, JsonSerializable):
def to_json(self) -> list:
return [x for x in self]

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)

Track = collections.deque
def __str__(self):
return str([x for x in self])


class Verbosity(Enum):
Expand Down
2 changes: 1 addition & 1 deletion foosball/tracking/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def __init__(self, stream, dims: FrameDimensions, goal_detector: GoalDetector, b
self.preprocessor = PreProcessor(dims, goal_detector, mask=mask, headless=headless, useGPU='preprocess' in gpu_flags,
calibrationMode=calibrationMode, **kwargs)
self.tracker = Tracker(ball_detector, useGPU='tracker' in gpu_flags, calibrationMode=calibrationMode, **kwargs)
self.analyzer = Analyzer(goal_grace_period_sec=goalGracePeriod, **kwargs)
self.analyzer = Analyzer(dims, goal_grace_period_sec=goalGracePeriod, **kwargs)
self.renderer = Renderer(dims, headless=headless, useGPU='render' in gpu_flags, **kwargs)

self.stream = stream
Expand Down
55 changes: 36 additions & 19 deletions foosball/tracking/analyzer/ScoreAnalyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@
from typing import Optional

from . import AbstractAnalyzer
from ...hooks.zmq import ZMQHook
from .models import FrameStats
from ...hooks import HookManager
from ...hooks.zmq import ZMQHook, ZMQDispatcher
from ...hooks.webhook import Webhook
from ...hooks.audio import AudioHook
from ...models import Team, Goals, Score, Track, Verbosity, Info
from ...models import Team, Goals, Score, Track, Verbosity, Info, FrameDimensions
from ...pipe.BaseProcess import Msg
from ...utils import contains

Expand All @@ -21,27 +23,34 @@ class ScoreAnalyzerResult:

class ScoreAnalyzer(AbstractAnalyzer):
def close(self):
if self.hooks is not None and isinstance(self.hooks, list):
for h in self.hooks:
h.stop()
self.goal_hooks.stop()
self.frame_hooks.stop()
self.zmq_dispatcher.stop()

def __init__(self, goal_grace_period_sec: float = 1.0, *args, **kwargs):
def __init__(self, dims: FrameDimensions, goal_grace_period_sec: float = 1.0, *args, **kwargs):
super().__init__(name="ScoreAnalyzer")
self.dims = dims
self.kwargs = kwargs
self.goal_grace_period_sec = goal_grace_period_sec
self.score = Score()
self.score_reset = multiprocessing.Event()
self.last_track_sighting: dt.datetime | None = None
self.last_track: Optional[Track] = None
self.goal_candidate = None
self.goal_hooks = HookManager()
self.frame_hooks = HookManager()
self.zmq_hook = ZMQHook(host="localhost", port=5555, topic="ws")
# TODO: create them somewhere else and pass them down
self.hooks = [
self.goal_hooks.extend([
AudioHook("goal"),
Webhook.load_webhook('goal_webhook.yaml'),
ZMQHook(host="localhost", port=5555, topic="ws")
]
for h in self.hooks:
h.start()
self.zmq_hook
])
self.frame_hooks.extend([
self.zmq_hook
])
self.goal_hooks.start()
self.frame_hooks.start()

@staticmethod
def is_track_empty(track: Track):
Expand All @@ -66,6 +75,7 @@ def goal_shot(self, goals: Goals, track: Track) -> Optional[Team]:
def analyze(self, msg: Msg, timestamp: dt.datetime) -> [ScoreAnalyzerResult, [Info]]:
goals = msg.data["Tracker"].goals
track = msg.data["Tracker"].ball_track
ball = msg.data["Tracker"].ball
team_scored = None
try:
self.check_reset_score()
Expand All @@ -91,6 +101,15 @@ def analyze(self, msg: Msg, timestamp: dt.datetime) -> [ScoreAnalyzerResult, [In
self.logger.error("Error in analyzer ", e)
traceback.print_exc()
self.last_track = track
stats = FrameStats(
goals=goals,
track=track,
score=self.score,
ball=ball,
dims=self.dims,
timestamp=timestamp.strftime("%Y-%m-%dT%H:%M:%S")
)
self.frame_hooks.invoke(stats.to_dict())
return [
ScoreAnalyzerResult(score=self.score, team_scored=team_scored),
[Info(verbosity=Verbosity.INFO, title="Score", value=self.score.to_string())]
Expand All @@ -103,14 +122,12 @@ def check_reset_score(self):

def count_goal(self, team: Team, timestamp: dt.datetime):
self.score.inc(team)
for h in self.hooks:
try:
h.invoke({"team": team.value}, timestamp)
except Exception as e:
self.logger.error("Error in Analyzer - effects ")
print(e)
print(type(h))
traceback.print_exc()
try:
self.goal_hooks.invoke({"team": team.value}, timestamp)
except Exception as e:
self.logger.error("Error in Analyzer - effects")
self.logger.error(e)
traceback.print_exc()

def reset(self):
self.score_reset.set()
5 changes: 3 additions & 2 deletions foosball/tracking/analyzer/analyze.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import traceback

from ...models import FrameDimensions
from .ScoreAnalyzer import ScoreAnalyzer
from ...pipe.BaseProcess import BaseProcess, Msg

Expand All @@ -8,10 +9,10 @@ class Analyzer(BaseProcess):
def close(self):
pass

def __init__(self, goal_grace_period_sec: float = 1.0, *args, **kwargs):
def __init__(self, dims: FrameDimensions, goal_grace_period_sec: float = 1.0, *args, **kwargs):
super().__init__(name="Analyzer")
self.kwargs = kwargs
self.analyzers = [ScoreAnalyzer(goal_grace_period_sec, args, kwargs)]
self.analyzers = [ScoreAnalyzer(dims, goal_grace_period_sec, args, kwargs)]

def reset_score(self):
for a in self.analyzers:
Expand Down
Loading

0 comments on commit 3cf3880

Please sign in to comment.