-
Notifications
You must be signed in to change notification settings - Fork 68
/
camera_client.py
77 lines (63 loc) · 2.35 KB
/
camera_client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# Author: Jimmy Wu
# Date: February 2023
import argparse
import time
from multiprocessing import resource_tracker, shared_memory
from multiprocessing.connection import Client
import cv2 as cv
import numpy as np
from constants import CONN_AUTHKEY
# See https://bugs.python.org/issue38119
def remove_shm_from_resource_tracker():
    """Monkey-patch ``multiprocessing.resource_tracker`` to ignore shared memory.

    After this call, the module-level ``resource_tracker.register`` and
    ``resource_tracker.unregister`` silently drop requests whose resource type
    is ``"shared_memory"`` and forward every other resource type to the
    process-wide tracker instance unchanged. The ``"shared_memory"`` entry is
    also removed from the tracker's cleanup table if present.

    This is the workaround discussed in https://bugs.python.org/issue38119;
    presumably it is needed so that a process that only attaches to a shared
    memory block does not have that block cleaned up on its behalf.

    The patch is global to the current process and is safe to apply more than
    once.
    """
    # pylint: disable=all

    def fix_register(name, rtype):
        if rtype == "shared_memory":
            return None
        # `register` is a bound method of the tracker instance, so it must be
        # called with (name, rtype) only -- there is no `self` in this scope.
        return resource_tracker._resource_tracker.register(name, rtype)

    def fix_unregister(name, rtype):
        if rtype == "shared_memory":
            return None
        return resource_tracker._resource_tracker.unregister(name, rtype)

    resource_tracker.register = fix_register
    resource_tracker.unregister = fix_unregister

    # Drop the cleanup hook for shared memory, if the tracker still has one.
    resource_tracker._CLEANUP_FUNCS.pop("shared_memory", None)
class CameraClient:
    """Client that reads camera images from a camera server via shared memory.

    The client talks to the server over an authenticated
    ``multiprocessing.connection`` socket on localhost and attaches to a
    shared memory block whose name, shape and dtype the server reports during
    the handshake.

    Attributes:
        conn: Connection to the camera server.
        shm: Shared memory block the client attached to (never unlinked here).
        image: NumPy array backed directly by ``shm``'s buffer.
    """

    def __init__(self, port):
        """Connect to the server on ``localhost:port`` and attach to its image buffer.

        Args:
            port: TCP port on localhost that the camera server listens on.
        """
        # Connect to camera server
        self.conn = Client(('localhost', port), authkey=CONN_AUTHKEY)

        # Set up shared memory block for reading camera images.
        # Handshake: send a request, receive a dict with the block's
        # 'name', 'shape' and 'dtype'.
        self.conn.send(None)
        data = self.conn.recv()

        # Must run before attaching so this process's resource tracker does
        # not take ownership of the block (see remove_shm_from_resource_tracker).
        remove_shm_from_resource_tracker()
        self.shm = shared_memory.SharedMemory(name=data['name'])
        self.image = np.ndarray(data['shape'], dtype=data['dtype'], buffer=self.shm.buf)

        # Second message completes the handshake
        # (presumably requests the first frame -- confirm against the server).
        self.conn.send(None)

    def get_image(self):
        """Wait for the server's next message, acknowledge it, and return the image.

        Returns:
            The same ``self.image`` array on every call. It is a view onto the
            shared memory buffer, not a copy, so its contents presumably change
            when the server writes the next frame; copy it if it must be kept.
        """
        self.conn.recv()  # Blocks until the server sends a message
        self.conn.send(None)
        return self.image

    def close(self):
        """Release the shared memory mapping and close the server connection.

        The shared memory block is only closed, not unlinked. The connection
        is closed even if closing the shared memory raises.
        """
        try:
            self.shm.close()
        finally:
            self.conn.close()
def main(args):
    """Fetch images from the camera server in a loop, displaying or timing them.

    In the default mode each image is shown in an OpenCV window until Esc is
    pressed or the window is no longer visible. With ``args.benchmark`` set,
    nothing is displayed; the time taken by each ``get_image()`` call is
    printed in milliseconds and the loop runs until interrupted.

    Args:
        args: Parsed command-line arguments providing ``port`` (int) and
            ``benchmark`` (bool).
    """
    camera_client = CameraClient(args.port)
    window_name = 'out'
    try:
        while True:
            # perf_counter is monotonic and high-resolution, unlike the wall
            # clock, so it is the right clock for measuring short intervals.
            start_time = time.perf_counter()
            image = camera_client.get_image()

            if args.benchmark:
                print(f'{1000 * (time.perf_counter() - start_time):.1f} ms')
                continue

            cv.imshow(window_name, image)
            # 27 is the key code for Esc; the visibility check ends the loop
            # once the window is gone (presumably closed by the user).
            if cv.waitKey(1) == 27 or cv.getWindowProperty(window_name, cv.WND_PROP_VISIBLE) < 0.5:
                break
    finally:
        # Tear down the windows even if closing the client raises.
        try:
            camera_client.close()
        finally:
            cv.destroyAllWindows()
if __name__ == '__main__':
    # Script entry point: read --port (int, default 6000) and the --benchmark
    # flag from the command line, then hand the parsed options to main().
    cli = argparse.ArgumentParser()
    cli.add_argument('--port', default=6000, type=int)
    cli.add_argument('--benchmark', action='store_true')
    options = cli.parse_args()
    main(options)