Skip to content

Commit

Permalink
Add 3d ball position estimation WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
DarwinsBuddy committed Oct 14, 2023
1 parent b4cbed5 commit 3ccb730
Show file tree
Hide file tree
Showing 6 changed files with 134 additions and 62 deletions.
3 changes: 1 addition & 2 deletions foosball/arUcos/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1 @@
from .calibration import Calibration
from .models import Aruco
from .calibration import Calibration
33 changes: 19 additions & 14 deletions foosball/arUcos/calibration.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,9 @@
import yaml
from tqdm import tqdm

from .models import Aruco
from ..models import Frame
from ..models import Frame, Aruco
from ..utils import ensure_cpu


logger = logging.getLogger(__name__)
DEFAULT_MARKER_SEPARATION_CM = 1.0

Expand All @@ -34,7 +32,7 @@ class Calibration:
dist_coefficients: np.array = None
_image_markers: List[List[Aruco]] = []

def __init__(self, board: aruco.GridBoard, camera_matrix: np.array = None, dist_coefficients: np.array = None):
def __init__(self, board: aruco.GridBoard = None, camera_matrix: np.array = None, dist_coefficients: np.array = None):
self.camera_matrix = camera_matrix
self.dist_coefficients = dist_coefficients
self.board = board
Expand Down Expand Up @@ -62,8 +60,7 @@ def recalibrate(self, shape: np.array, sample_size: Optional[int] = None) -> boo
logger.debug("Calibrating camera ....")
if len(corners_list) > 0:
try:
ret, mtx, dist, rvecs, tvecs = aruco.calibrateCameraAruco(corners_list, id_list, np.array(counter),
self.board, shape, None, None)
ret, mtx, dist, rvecs, tvecs = aruco.calibrateCameraAruco(corners_list, id_list, np.array(counter), self.board, shape, None, None)
self.camera_matrix = mtx
self.dist_coefficients = dist
return True
Expand Down Expand Up @@ -127,13 +124,15 @@ def draw_markers(img: Frame, arucos: list[Aruco], calib: Calibration = None) ->
return img


def detect_markers(image, detector: aruco.ArucoDetector) -> list[Aruco]:
def detect_markers(image, detector: aruco.ArucoDetector, calib: Calibration) -> list[Aruco]:
corners, ids, rejected_img_points = detector.detectMarkers(image)
ids = ensure_cpu(ids)
# if rejected_img_points is not None:
# logger.debug(f"Marker detection rejected {len(rejected_img_points)}")

if ids is not None:
return [Aruco(np.array(i[0]), c) for i, c in list(zip(ids, corners))]
arucos = [Aruco(np.array(i[0]), c, None, None) for i, c in list(zip(ids, corners))]
return estimate_markers_poses(arucos, calib)
else:
return []

Expand All @@ -143,8 +142,8 @@ def estimate_markers_poses(arucos: List[Aruco], calib: Calibration):
# Estimate pose of each marker and return the values rotation_vector and translation_vector
# (different from those of camera coefficients)
rotation_vectors, translation_vectors, marker_points = aruco.estimatePoseSingleMarkers(corners, 0.02,
calib.camera_matrix,
calib.dist_coefficients)
calib.camera_matrix,
calib.dist_coefficients)
if rotation_vectors is not None and translation_vectors is not None:
for i in range(0, len(arucos)):
arucos[i].rotation_vector = np.array([rotation_vectors[i]])
Expand All @@ -157,22 +156,29 @@ def init_aruco_detector(aruco_dictionary, aruco_params):
detector = aruco.ArucoDetector(aruco_dict, aruco_params)
return detector, aruco_dict


def generate_aruco_board(aruco_dict, marker_length_cm, marker_separation_cm):
board = aruco.GridBoard((4, 5), marker_length_cm, marker_separation_cm, aruco_dict)
return board


def generate_aruco_board_image(board):
board_img = aruco.drawPlanarBoard(board, (864, 1080), marginSize=0, borderBits=1)
return board_img

def print_aruco_board(filename='aruco.png', aruco_dictionary=aruco.DICT_4X4_1000, aruco_params=aruco.DetectorParameters(), marker_length_cm=DEFAULT_MARKER_LENGTH_CM, marker_separation_cm=DEFAULT_MARKER_SEPARATION_CM):

def print_aruco_board(filename='aruco.png', aruco_dictionary=aruco.DICT_4X4_1000,
aruco_params=aruco.DetectorParameters(), marker_length_cm=DEFAULT_MARKER_LENGTH_CM,
marker_separation_cm=DEFAULT_MARKER_SEPARATION_CM):
detector, aruco_dict = init_aruco_detector(aruco_dictionary=aruco_dictionary, aruco_params=aruco_params)
board = generate_aruco_board(aruco_dict, marker_length_cm, marker_separation_cm)
board_img = generate_aruco_board_image(board)
cv2.imwrite(filename, board_img)


def calibrate_camera(camera_id=None, calibration_video_path=None, calibration_images_path=None, headless=False,
aruco_dictionary=aruco.DICT_4X4_1000, marker_length_cm=DEFAULT_MARKER_LENGTH_CM, marker_separation_cm=DEFAULT_MARKER_SEPARATION_CM,
aruco_dictionary=aruco.DICT_4X4_1000, marker_length_cm=DEFAULT_MARKER_LENGTH_CM,
marker_separation_cm=DEFAULT_MARKER_SEPARATION_CM,
aruco_params=aruco.DetectorParameters(), recording_time=5, sample_size=None):
print("CAMERA: ", camera_id)
print("images: ", calibration_images_path)
Expand Down Expand Up @@ -215,9 +221,8 @@ def calibrate_camera(camera_id=None, calibration_video_path=None, calibration_im
while (camera_id is None or (time.time() - start) < recording_time) and ret:
img_gray = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)
shape = img_gray.shape
arucos = detect_markers(img_gray, detector)
arucos = detect_markers(img_gray, detector, calib)
if not headless:
arucos = estimate_markers_poses(arucos, calib)
img = draw_markers(img, arucos, calib)
calib.add_image_markers(arucos)
if not headless:
Expand Down
8 changes: 0 additions & 8 deletions foosball/arUcos/models.py
Original file line number Diff line number Diff line change
@@ -1,9 +1 @@
from dataclasses import dataclass
import numpy as np

@dataclass
class Aruco:
id: np.array
corners: np.array
rotation_vector: np.array = None
translation_vector: np.array = None
10 changes: 10 additions & 0 deletions foosball/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,14 @@ def hsv2rgb(hsv: HSV) -> RGB:
return cv2.cvtColor(np.uint8([[hsv]]), cv2.COLOR_HSV2RGB)[0][0]


@dataclass
class Aruco:
id: np.array
corners: np.array
rotation_vector: np.array
translation_vector: np.array


CPUFrame = np.array
GPUFrame = cv2.UMat
Frame = Union[CPUFrame, GPUFrame]
Expand All @@ -38,6 +46,7 @@ class Team(Enum):


Point = [int, int]
Point3D = [int, int, int]
Rect = (Point, Point, Point, Point)
BBox = [int, int, int, int] # x y width height

Expand Down Expand Up @@ -174,6 +183,7 @@ class AnalyzeResult:
class PreprocessResult:
original: CPUFrame
preprocessed: Optional[CPUFrame]
arucos: list[Aruco]
homography_matrix: Optional[np.ndarray] # 3x3 matrix used to warp the image and project points
goals: Optional[Goals]
info: Info
73 changes: 39 additions & 34 deletions foosball/tracking/preprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@
import numpy as np

from .colordetection import detect_goals
from ..arUcos import calibration, Aruco
from ..models import Frame, PreprocessResult, Point, Rect, GoalConfig, Blob, Goals, FrameDimensions, ScaleDirection
from ..arUcos import calibration, Calibration
from ..models import Frame, PreprocessResult, Point, Rect, GoalConfig, Blob, Goals, FrameDimensions, ScaleDirection, \
Aruco
from ..pipe.BaseProcess import BaseProcess, Msg
from ..pipe.Pipe import clear
from ..utils import ensure_cpu, generate_processor_switches, relative_change, scale
Expand Down Expand Up @@ -61,6 +62,7 @@ def __init__(self, dims: FrameDimensions, goal_config: GoalConfig, headless=True
self.frames_since_last_marker_detection = 0
self.goals = None
self.calibration = kwargs.get('calibration')
self.camera_calibration = Calibration().load()
self.verbose = kwargs.get('verbose')
self.goals_calibration = self.calibration == "goal"
self.calibration_out = Queue() if self.goals_calibration else None
Expand All @@ -79,7 +81,7 @@ def close(self) -> None:

def detect_markers(self, frame: Frame) -> list[Aruco]:
img_gray = cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY)
return calibration.detect_markers(img_gray, self.detector)
return calibration.detect_markers(img_gray, self.detector, self.camera_calibration)

def mask_frame(self, frame: Frame) -> Frame:
"""
Expand Down Expand Up @@ -141,41 +143,16 @@ def process(self, msg: Msg) -> Msg:
except Exception as e:
self.logger.error(f"Error in preprocess {e}")
traceback.print_exc()
return Msg(kwargs={"time": timestamp, "result": PreprocessResult(self.iproc(frame), self.iproc(preprocessed), self.homography_matrix, self.goals, info)})

@staticmethod
def corners2pt(corners) -> [int, int]:
moments = cv2.moments(corners)
c_x = int(moments["m10"] / moments["m00"])
c_y = int(moments["m01"] / moments["m00"])
return [c_x, c_y]

@staticmethod
def order_points(pts: np.ndarray) -> np.ndarray:
# initialize a list of coordinates that will be ordered
# such that the first entry in the list is the top-left,
# the second entry is the top-right, the third is the
# bottom-right, and the fourth is the bottom-left
rect = np.zeros((4, 2), dtype="float32")
# the top-left point will have the smallest sum, whereas
# the bottom-right point will have the largest sum
s = pts.sum(axis=1)
rect[0] = pts[np.argmin(s)]
rect[2] = pts[np.argmax(s)]
# now, compute the difference between the points, the
# top-right point will have the smallest difference,
# whereas the bottom-left will have the largest difference
diff = np.diff(pts, axis=1)
rect[1] = pts[np.argmin(diff)]
rect[3] = pts[np.argmax(diff)]
# return the ordered coordinates
return rect
return Msg(kwargs={
"time": timestamp,
"result": PreprocessResult(self.iproc(frame), self.iproc(preprocessed), self.markers, self.homography_matrix, self.goals, info)
})

def four_point_transform(self, frame: Frame, markers: list[Aruco]) -> tuple[Frame, [int, int]]:
pts = np.array([self.corners2pt(marker.corners) for marker in markers])
pts = np.array([corners2pt(marker.corners) for marker in markers])
# obtain a consistent order of the points and unpack them
# individually
src_pts = self.order_points(pts)
src_pts = order_points(pts)
# pad
src_pts = pad_rect(src_pts, self.xpad, self.ypad)
(tl, tr, br, bl) = src_pts
Expand Down Expand Up @@ -219,6 +196,34 @@ def four_point_transform(self, frame: Frame, markers: list[Aruco]) -> tuple[Fram
return warped, homography_matrix


def corners2pt(corners) -> [int, int]:
moments = cv2.moments(corners)
c_x = int(moments["m10"] / moments["m00"])
c_y = int(moments["m01"] / moments["m00"])
return [c_x, c_y]


def order_points(pts: np.ndarray) -> np.ndarray:
# initialize a list of coordinates that will be ordered
# such that the first entry in the list is the top-left,
# the second entry is the top-right, the third is the
# bottom-right, and the fourth is the bottom-left
rect = np.zeros((4, 2), dtype="float32")
# the top-left point will have the smallest sum, whereas
# the bottom-right point will have the largest sum
s = pts.sum(axis=1)
rect[0] = pts[np.argmin(s)]
rect[2] = pts[np.argmax(s)]
# now, compute the difference between the points, the
# top-right point will have the smallest difference,
# whereas the bottom-left will have the largest difference
diff = np.diff(pts, axis=1)
rect[1] = pts[np.argmin(diff)]
rect[3] = pts[np.argmax(diff)]
# return the ordered coordinates
return rect


def project_point(pt: Point, homography_matrix: np.array, mode: WarpMode):
H = homography_matrix if mode == WarpMode.WARP else np.linalg.inv(homography_matrix)
src = np.array([pt[0], pt[1], 1]) # add a dimension to convert into homogenous coordinates
Expand Down
69 changes: 65 additions & 4 deletions foosball/tracking/tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,14 @@
from multiprocessing import Queue
from queue import Empty

import cv2
import numpy as np
from cv2.typing import Point3d

from .colordetection import detect_ball
from .preprocess import WarpMode, project_blob
from ..models import TrackResult, Track, BallConfig, Info, Blob, Goals
from .preprocess import WarpMode, project_blob, corners2pt
from ..arUcos import Calibration
from ..models import Aruco, TrackResult, Track, BallConfig, Info, Blob, Goals, Point, Point3D
from ..pipe.BaseProcess import BaseProcess, Msg
from ..pipe.Pipe import clear
from ..utils import generate_processor_switches
Expand All @@ -22,9 +27,11 @@ def __init__(self, ball_bounds: BallConfig, useGPU: bool = False, **kwargs):
super().__init__(name="Tracker")
self.kwargs = kwargs
self.ball_track = Track(maxlen=kwargs.get('buffer'))
self.ball_track_3d = Track(maxlen=kwargs.get('buffer'))
self.off = kwargs.get('off')
self.verbose = kwargs.get("verbose")
self.calibration = kwargs.get("calibration")
self.camera_matrix = Calibration().load().camera_matrix
[self.proc, self.iproc] = generate_processor_switches(useGPU)
# define the lower_ball and upper_ball boundaries of the
# ball in the HSV color space, then initialize the
Expand All @@ -41,7 +48,12 @@ def close(self) -> None:
clear(self.calibration_out)
self.calibration_out.close()

def update_ball_track(self, detected_ball: Blob) -> Track:
def update_ball_tracks(self, detected_ball: Blob, ball3d: Point3d) -> Track:
# TODO: refactor this
self.ball_track_3d.appendleft(ball3d)
if len(self.ball_track_3d) > 1 and self.ball_track_3d[-2] is not None and self.ball_track_3d[-1] is not None:
# TODO: catch on here
print("Distance travelled ", self.ball_track_3d[-2] - self.ball_track_3d[-1])
if detected_ball is not None:
center = detected_ball.center
# update the points queue (track history)
Expand Down Expand Up @@ -89,6 +101,10 @@ def process(self, msg: Msg) -> Msg:
f = self.proc(preprocess_result.preprocessed if preprocess_result.preprocessed is not None else preprocess_result.original)
ball_detection_result = detect_ball(f, self.ball_bounds)
ball = ball_detection_result.ball
# if we have some markers detected in preprocess step, we can determine the 3d position of the ball
ball3d = None
if ball is not None and preprocess_result.arucos is not None:
ball3d = get_3d_position(ball.center, preprocess_result.arucos, self.camera_matrix)
# do not forget to project detected points onto the original frame on rendering
if not self.verbose:
if ball is not None and preprocess_result.homography_matrix is not None:
Expand All @@ -101,7 +117,7 @@ def process(self, msg: Msg) -> Msg:
if self.ball_calibration:
self.calibration_out.put_nowait(self.iproc(ball_detection_result.frame))
# copy deque, since we otherwise run into odd tracks displayed
ball_track = self.update_ball_track(ball).copy()
ball_track = self.update_ball_tracks(ball, ball3d).copy()
info = preprocess_result.info + self.get_info(ball_track)
except Exception as e:
logger.error(f"Error in track {e}")
Expand All @@ -110,3 +126,48 @@ def process(self, msg: Msg) -> Msg:
return Msg(kwargs={"time": timestamp, "result": TrackResult(preprocess_result.original, goals, ball_track, ball, info)})
else:
return Msg(kwargs={"time": timestamp, "result": TrackResult(preprocess_result.preprocessed, goals, ball_track, ball, info)})


def get_3d_position(point2d: Point, arucos: list[Aruco], camera_matrix) -> Point3D:
"""
Calculate the 3D position of the point within the area covered by the ArUco markers
"""
# TODO: refactor this to be more efficient
try:
aruco_marker_size_cm = 5.0 # TODO let this come from somewhere fixed or arguments
aruco_points = [corners2pt(a.corners) for a in arucos]
point = np.array([*point2d, 1])
scale_factors = [aruco_marker_size_cm / np.linalg.norm(t) for t in [a.translation_vector for a in arucos]]
rvecs = [a.rotation_vector * scale_factors[i] for i, a in enumerate(arucos)]
tvecs = [a.translation_vector * scale_factors[i] for i, a in enumerate(arucos)]
if arucos is not None:
# Define the transformation matrices for the ArUco markers
transformation_matrices = []
for i in range(4):
R, _ = cv2.Rodrigues(rvecs[i])
T = tvecs[i]
extrinsic_matrix = np.hstack((R, T.reshape(3, 1)))
transformation_matrices.append(extrinsic_matrix)

# Define the 3D positions of the ArUco markers
marker_positions = []
for i in range(4):
homogeneous_marker = np.hstack((aruco_points[i], 1))
marker_position = np.dot(np.linalg.inv(camera_matrix), homogeneous_marker)
marker_positions.append(marker_position)

# Calculate the 3D position of the input point
point_homogeneous = np.hstack((point, 1))
estimated_point = np.zeros(3)
for i in range(4):
inv_extrinsic = np.linalg.inv(np.vstack((transformation_matrices[i], [0, 0, 0, 1])))
result = np.dot(inv_extrinsic, point_homogeneous)
estimated_point += result[:3] * result[3] / marker_positions[i][2]

estimated_point /= 4 # Average the estimated positions from all 4 markers

return estimated_point
except Exception as e:
logging.error("Dang it!", e)
traceback.print_exc()
return None

0 comments on commit 3ccb730

Please sign in to comment.