diff --git a/.idea/misc.xml b/.idea/misc.xml index 165dd0c..bcd8661 100644 --- a/.idea/misc.xml +++ b/.idea/misc.xml @@ -1,4 +1,7 @@ + + \ No newline at end of file diff --git a/config/defaults.ini b/config/defaults.ini index 60e8c36..86d35cd 100644 --- a/config/defaults.ini +++ b/config/defaults.ini @@ -9,10 +9,10 @@ node.call = ${mycall}-1 [network] mesh.ttl = 7 mesh.hello.interval = 10 -mesh.dead.interval = 30 -mesh.advert.interval = 30 -mesh.advert.max.age = 100 -mesh.query.interval = 60 +mesh.dead.interval = 40 +mesh.advert.interval = 120 +mesh.advert.max.age = 600 +mesh.query.interval = 120 [app:demo] # The 'app' configs are used to locate, run, and connect the app to the network diff --git a/config/logging.ini b/config/logging.ini index 2db0751..d31a516 100644 --- a/config/logging.ini +++ b/config/logging.ini @@ -51,7 +51,7 @@ args=(sys.stdout,) [handler_node] class=FileHandler -level=NOTSET +level=DEBUG formatter=standard args=('%(log.dir)s/node.log', 'w') @@ -80,7 +80,7 @@ formatter=standard args=('%(log.dir)s/netrom.log', 'w') [formatter_standard] -format=%(levelname)-8s %(asctime)s -- %(message)s +format=%(levelname)-8s %(asctime)s %(threadName)-8s -- %(message)s datefmt= class=logging.Formatter diff --git a/tarpn/application/node.py b/tarpn/application/node.py new file mode 100644 index 0000000..19ae744 --- /dev/null +++ b/tarpn/application/node.py @@ -0,0 +1,188 @@ +import math +import statistics +import time +import urllib +from typing import Optional, Dict + +from flask import Flask, Response, request +from flask.testing import EnvironBuilder +from pyformance import global_registry + +from tarpn.datalink import L2Queuing +from tarpn.network.mesh import MeshAddress +from tarpn.network.mesh.protocol import MeshProtocol +from tarpn.scheduler import Scheduler +from tarpn.settings import Settings +from tarpn.transport import Protocol, Transport +from tarpn.transport.mesh_l4 import MeshTransportManager + + +class TarpnCoreProtocol(Protocol): + def __init__(self, + settings: Settings, port_queues: Dict[int, L2Queuing], + network: MeshProtocol, transport_manager: MeshTransportManager, scheduler: Scheduler): + self.settings = settings + self.port_queues = port_queues + self.network = network + self.transport_manager = transport_manager + self.scheduler = scheduler + + self.transport: Optional[Transport] = None + self.app: Flask = Flask("tarpn-core") + self.register_handlers() + + def register_handlers(self): + @self.app.route("/") + def index(): + return "welcome" + + @self.app.route("/healthcheck") + def healhcheck(): + return "ok" + + @self.app.route("/ports") + def ports(): + port_data = [] + ports_settings = self.settings.port_configs() + for port, queue in self.port_queues.items(): + port_config = ports_settings[port] + port_data.append({ + "id": port, + "name": port_config.port_name(), + "mtu": queue.mtu(), + "qsize": queue.qsize(), + "config": port_config.as_dict() + }) + return {"ports": port_data} + + @self.app.route("/network") + def network(): + out = { + "address": str(self.network.our_address), + "epoch": self.network.our_link_state_epoch + } + neighbors = [] + for addr, neighbor in self.network.neighbors.items(): + neighbors.append({ + "address": str(addr), + "name": neighbor.name, + "last_seen": neighbor.last_seen, + "last_update": neighbor.last_update, + "status": str(neighbor.state) + }) + out["neighbors"] = neighbors + + nodes = [] + for node, link_states in self.network.valid_link_states().items(): + epoch = self.network.link_state_epochs.get(node) + nodes.append({ + "address": str(node), + "epoch": epoch, + "link_states": str(link_states) # TODO fix this + }) + out["nodes"] = nodes + return out + + @self.app.route("/routes") + def routes(): + out = [] + + for node, link_states in self.network.valid_link_states().items(): + epoch = self.network.link_state_epochs.get(node) + path, path_cost = self.network.route_to(node) + if len(path) == 0: + continue + link = self.network.neighbors.get(path[1]).link_id + device_id = self.network.link_multiplexer.get_link_device_id(link) + out.append({ + "address": str(node), + "epoch": epoch, + "path": [str(n) for n in path[1:]], + "port": device_id, + "cost": path_cost + }) + return {"routes": out} + + @self.app.route("/ping", methods=["POST"]) + def ping(): + address = request.args.get("address") + count = int(request.args.get("count", 3)) + run_async = request.args.get("async", None) + timeout = int(request.args.get("timeout", 1000)) + size = int(request.args.get("size", 100)) + + node = MeshAddress.parse(address) + seqs = [] + times = [] + lost = 0 + + def do_ping(): + nonlocal lost + t0 = time.time_ns() + seq = self.network.ping_protocol.send_ping(node, size=size) + found = self.network.ping_protocol.wait_for_ping(node, seq, timeout_ms=timeout) + t1 = time.time_ns() + if found: + dt = int((t1 - t0) / 1000000.) + seqs.append(seq) + times.append(dt) + else: + lost += 1 + + if run_async: + futures = [] + for _ in range(count): + futures.append(self.scheduler.run(do_ping)) + for fut in futures: + fut.result(3000) + else: + for _ in range(count): + do_ping() + + out = { + "address": address, + "seqs": seqs, + "times": times, + "lost": lost + } + if count > 1: + out["min"] = min(times) + out["max"] = max(times) + out["avg"] = statistics.mean(times) + out["stdev"] = statistics.stdev(times) + return out + + @self.app.route("/metrics") + def metrics(): + return global_registry().dump_metrics() + + def connection_made(self, transport: Transport): + self.transport = transport + + def connection_lost(self, exc): + self.transport = None + + def data_received(self, data: bytes): + raw_lines = data.splitlines() + first = raw_lines[0].decode("ascii") + parts = first.split(" ") + verb, path, http_version = parts[0:3] + url_parts = urllib.parse.urlparse(path) + # TODO parse headers? + builder = EnvironBuilder(app=self.app, + path=url_parts.path, + query_string=url_parts.query, + method=verb, + data=b"\r\n".join(raw_lines[2:]), + content_type="text/plain") + env = builder.get_environ() + + with self.app.request_context(env): + resp: Response = self.app.full_dispatch_request() + buf = bytearray() + buf.extend(f"HTTP/1.1 {resp.status}".encode("ISO-8859-1")) + buf.extend(b"\r\n") + buf.extend(str(resp.headers).encode("ISO-8859-1")) + if resp.data: + buf.extend(resp.data) + self.transport.write(buf) # TODO transport was None? diff --git a/tarpn/application/shell.py b/tarpn/application/shell.py index a425ba3..34a314a 100644 --- a/tarpn/application/shell.py +++ b/tarpn/application/shell.py @@ -8,9 +8,9 @@ from tarpn.network.mesh import MeshAddress from tarpn.network.mesh.protocol import MeshProtocol +from tarpn.transport import DatagramProtocol as DProtocol from tarpn.transport import Protocol, Transport, L4Address from tarpn.transport.mesh_l4 import MeshTransportManager -from tarpn.transport import DatagramProtocol as DProtocol class ShellDatagramProtocol(DProtocol): diff --git a/tarpn/datalink/__init__.py b/tarpn/datalink/__init__.py index acb43ac..86e1374 100644 --- a/tarpn/datalink/__init__.py +++ b/tarpn/datalink/__init__.py @@ -52,6 +52,12 @@ def take_inbound(self) -> Tuple[FrameData, int]: """ raise NotImplementedError + def qsize(self) -> int: + raise NotImplementedError + + def mtu(self) -> int: + raise NotImplementedError + def close(self): raise NotImplementedError @@ -104,6 +110,12 @@ def __init__(self, queue_size, max_payload_size): self._lock: threading.Lock = threading.Lock() self._not_empty: threading.Condition = threading.Condition(self._lock) + def qsize(self) -> int: + return self._outbound.maxsize + + def mtu(self) -> int: + return self._max_payload_size + def offer_outbound(self, frame: FrameData) -> bool: if len(frame.data) > self._max_payload_size: self.error(f"Payload too large, dropping. Size is {len(frame.data)}, max is {self._max_payload_size}") @@ -134,6 +146,7 @@ def offer_inbound(self, frame: FrameData) -> None: # Since the deque is bounded, this will eject old items to make room for this frame with self._lock: self.meter("inbound", "offer").mark() + #frame.timer = self.timer("inbound", "wait").time() self._inbound.append((frame, next(self._inbound_count))) self._not_empty.notify() @@ -142,6 +155,7 @@ def take_inbound(self) -> Tuple[FrameData, int]: while not len(self._inbound): self._not_empty.wait() inbound, count = self._inbound.popleft() + #inbound.timer.stop() dropped = count - self._last_inbound - 1 self._last_inbound = count self.meter("inbound", "take").mark() diff --git a/tarpn/datalink/protocol.py b/tarpn/datalink/protocol.py index 716e236..a37043c 100644 --- a/tarpn/datalink/protocol.py +++ b/tarpn/datalink/protocol.py @@ -5,6 +5,7 @@ from tarpn.datalink import FrameData, L2Queuing, L2Address from tarpn.log import LoggingMixin +from tarpn.metrics import MetricsMixin from tarpn.network import L3Payload, L3Queueing from tarpn.scheduler import Scheduler, CloseableThreadLoop from tarpn.util import BackoffGenerator @@ -121,7 +122,7 @@ def remove_link(self, link_id: int) -> None: raise NotImplementedError -class DefaultLinkMultiplexer(LinkMultiplexer): +class DefaultLinkMultiplexer(LinkMultiplexer, MetricsMixin): def __init__(self, queue_factory: Callable[[], L3Queueing], scheduler: Scheduler): self.queue_factory = queue_factory self.link_id_counter = itertools.count() diff --git a/tarpn/io/serial.py b/tarpn/io/serial.py index ab5d75e..6c3d7c1 100644 --- a/tarpn/io/serial.py +++ b/tarpn/io/serial.py @@ -9,26 +9,36 @@ from tarpn.io import IOProtocol from tarpn.log import LoggingMixin +from tarpn.metrics import MetricsMixin from tarpn.scheduler import Scheduler, CloseableThreadLoop from tarpn.util import CountDownLatch, BackoffGenerator +class DummyLock: + def __enter__(self): + return None + + def __exit__(self, _type, value, traceback): + pass + + class SerialLoop(CloseableThreadLoop, ABC): def __init__(self, name: str, ser: serial.Serial, protocol: IOProtocol, - open_event: threading.Event, error_event: threading.Event, closed_latch: CountDownLatch): + open_event: threading.Event, error_event: threading.Event, closed_latch: CountDownLatch, lock: threading.Lock): super().__init__(name=name) self.ser = ser self.protocol = protocol self.open_event = open_event self.error_event = error_event self.closed_latch = closed_latch + self.lock = lock def close(self): super().close() self.closed_latch.countdown() -class SerialReadLoop(SerialLoop, LoggingMixin): +class SerialReadLoop(SerialLoop, LoggingMixin, MetricsMixin): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) LoggingMixin.__init__(self, @@ -38,16 +48,19 @@ def __init__(self, *args, **kwargs): def iter_loop(self): if self.open_event.wait(3.0): try: - data = self.ser.read(1024) - if len(data) > 0: - self.debug(f"Read {len(data)} bytes: {data}") - self.protocol.handle_bytes(data) + with self.lock: + data = self.ser.read(1024) + if len(data) > 0: + self.debug(f"Read {len(data)} bytes: {data}") + self.meter("bytes", "in").mark(len(data)) # TODO add port number + self.protocol.handle_bytes(data) except serial.SerialException: self.exception("Failed to read bytes from serial device") + self.meter("error").mark() self.error_event.set() -class SerialWriteLoop(SerialLoop, LoggingMixin): +class SerialWriteLoop(SerialLoop, LoggingMixin, MetricsMixin): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) LoggingMixin.__init__(self, @@ -64,26 +77,31 @@ def iter_loop(self): to_write = self.protocol.next_bytes_to_write() if to_write is not None and len(to_write) > 0: try: - self.ser.write(to_write) - self.ser.flush() - self.debug(f"Wrote {len(to_write)} bytes: {to_write}") - self.retry_backoff.reset() + with self.lock: + self.ser.write(to_write) + self.ser.flush() + self.meter("bytes", "out").mark(len(to_write)) # TODO add port number + self.debug(f"Wrote {len(to_write)} bytes: {to_write}") + self.retry_backoff.reset() except serial.SerialTimeoutException as e: self.unsent.append(to_write) t = next(self.retry_backoff) self.exception(f"Failed to write bytes to serial device, serial timed out. Retrying after {t}s.", e) + self.meter("timeout").mark() sleep(t) except serial.SerialException as e: self.exception("Failed to write bytes to serial device", e) + self.meter("error").mark() self.error_event.set() -class SerialDevice(CloseableThreadLoop, LoggingMixin): +class SerialDevice(CloseableThreadLoop, LoggingMixin, MetricsMixin): def __init__(self, protocol: IOProtocol, device_name: str, speed: int, timeout: float, + duplex: bool, scheduler: Scheduler): LoggingMixin.__init__(self) CloseableThreadLoop.__init__(self, f"Serial Device {device_name}") @@ -94,14 +112,18 @@ def __init__(self, self._ser = serial.Serial(port=None, baudrate=speed, timeout=timeout, write_timeout=timeout) self._ser.port = device_name self._closed_latch = CountDownLatch(2) + if not duplex: + self._duplex_lock = threading.Lock() + else: + self._duplex_lock = DummyLock() self._open_event = threading.Event() self._error_event = threading.Event() self._open_backoff = BackoffGenerator(0.100, 1.2, 5.000) # Submit the reader and writer threads first, so they will be shutdown first self._scheduler.submit(SerialReadLoop(f"Serial Reader {self._ser.name}", self._ser, - self._protocol, self._open_event, self._error_event, self._closed_latch)) + self._protocol, self._open_event, self._error_event, self._closed_latch, self._duplex_lock)) self._scheduler.submit(SerialWriteLoop(f"Serial Writer {self._ser.name}", self._ser, - self._protocol, self._open_event, self._error_event, self._closed_latch)) + self._protocol, self._open_event, self._error_event, self._closed_latch, self._duplex_lock)) self._scheduler.submit(self) def close(self) -> None: @@ -132,6 +154,7 @@ def iter_loop(self): try: self._ser.open() self.info(f"Opened serial port {self._device_name}") + # TODO metrics self._open_event.set() self._open_backoff.reset() except serial.SerialException as err: diff --git a/tarpn/main.py b/tarpn/main.py index d532ced..9982937 100644 --- a/tarpn/main.py +++ b/tarpn/main.py @@ -3,16 +3,20 @@ import logging.config import os import shutil +import signal import sys from functools import partial +from pyformance.reporters import ConsoleReporter + import tarpn.netrom.router from tarpn.application import TransportMultiplexer, MultiplexingProtocol, ApplicationProtocol from tarpn.application.command import NodeCommandProcessor +from tarpn.application.node import TarpnCoreProtocol from tarpn.application.shell import TarpnShellProtocol from tarpn.ax25 import AX25Call from tarpn.datalink import L2FIFOQueue -from tarpn.datalink.ax25_l2 import AX25Protocol, DefaultLinkMultiplexer, AX25Address +from tarpn.datalink.ax25_l2 import AX25Protocol, DefaultLinkMultiplexer from tarpn.datalink.protocol import L2IOLoop from tarpn.io.kiss import KISSProtocol from tarpn.io.serial import SerialDevice @@ -41,7 +45,7 @@ def main(): parser = argparse.ArgumentParser(description='Decode packets from a serial port') parser.add_argument("config", nargs="?", default="config/node.ini", help="Config file") parser.add_argument("--verbose", action="store_true", help="Enable debug logging") - parser.add_argument("--profile", action="store_true", help="Attache a profiler to the process") + parser.add_argument("--profile", action="store_true", help="Attach a profiler to the process") args = parser.parse_args() if args.profile: import cProfile @@ -112,7 +116,7 @@ def run_node(args): intercept_dests = {} interceptor = lambda frame: None - for port_config in s.port_configs(): + for port_id, port_config in s.port_configs().items(): if port_config.get_boolean("port.enabled") and port_config.get("port.type") == "serial": l2_queueing = L2FIFOQueue(20, AX25Protocol.maximum_frame_size()) port_queues[port_config.port_id()] = l2_queueing @@ -123,6 +127,7 @@ def run_node(args): port_config.get("serial.device"), port_config.get_int("serial.speed"), port_config.get_float("serial.timeout"), + port_config.get_boolean("serial.duplex", True), scheduler) scheduler.submit(L2IOLoop(l2_queueing, l2)) @@ -186,13 +191,23 @@ def run_node(args): sock = node_settings.get("node.sock") print(f"Binding node terminal to {sock}") - scheduler.submit(UnixServerThread(sock, TarpnShellProtocol(mesh_l3, mesh_l4))) + #scheduler.submit(UnixServerThread(sock, TarpnShellProtocol(mesh_l3, mesh_l4))) + scheduler.submit(UnixServerThread(sock, TarpnCoreProtocol(s, port_queues, mesh_l3, mesh_l4, scheduler))) + # Start a metrics reporter - #reporter = ConsoleReporter(reporting_interval=300) - #scheduler.timer(10_000, reporter.start, True) + #reporter = ConsoleReporter(reporting_interval=300, stream=sys.stdout) + #reporter.start() #scheduler.add_shutdown_hook(reporter.stop) + def signal_handler(signum, frame): + logger.info(f"Shutting down due to {signal.Signals(signum).name}") + scheduler.shutdown() + + signal.signal(signal.SIGTERM, signal_handler) + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGSTOP, signal_handler) + logger.info("Finished Startup") try: # Wait for all threads diff --git a/tarpn/metrics.py b/tarpn/metrics.py index 12d36aa..2060aa2 100644 --- a/tarpn/metrics.py +++ b/tarpn/metrics.py @@ -1,29 +1,29 @@ -from typing import Optional +from typing import Optional, Callable from pyformance import global_registry -from pyformance.meters import Meter, Counter, Timer +from pyformance.meters import Meter, Counter, Timer, Histogram, Gauge, CallbackGauge class MetricsMixin: - def _name(self, name): + def get_key(self, name, *extra): pkg = self.__class__.__module__ clazz = self.__class__.__qualname__ - return f"{pkg}:{clazz}:{name}" - - def meter(self, name: str, event: Optional[str] = None) -> Meter: - if event is not None: - return global_registry().meter(self._name(f"{name}.{event}")) - else: - return global_registry().meter(self._name(name)) - - def counter(self, name: str, event: Optional[str] = None) -> Counter: - if event is not None: - return global_registry().counter(self._name(f"{name}.{event}")) - else: - return global_registry().counter(self._name(name)) - - def timer(self, name: str, event: Optional[str] = None) -> Timer: - if event is not None: - return global_registry().timer(self._name(f"{name}.{event}")) - else: - return global_registry().timer(self._name(name)) + name_parts = [name] + name_parts.extend([str(e) for e in extra]) + joined_name = ".".join(name_parts) + return f"{pkg}.{clazz}:{joined_name}" + + def meter(self, name: str, *args) -> Meter: + return global_registry().meter(self.get_key(name, *args)) + + def counter(self, name: str, *args) -> Counter: + return global_registry().counter(self.get_key(name, *args)) + + def timer(self, name: str, *args) -> Timer: + return global_registry().timer(self.get_key(name, *args)) + + def hist(self, name: str, *args) -> Histogram: + return global_registry().histogram(self.get_key(name, *args)) + + def gauge(self, fun: Callable[[], float], name: str, *args) -> Gauge: + return global_registry().gauge(key=self.get_key(name, *args), gauge=CallbackGauge(fun)) \ No newline at end of file diff --git a/tarpn/network/__init__.py b/tarpn/network/__init__.py index 6e23ec5..aadddfd 100644 --- a/tarpn/network/__init__.py +++ b/tarpn/network/__init__.py @@ -3,9 +3,11 @@ from enum import IntEnum from typing import List, Optional, Tuple +from pyformance.meters.timer import TimerContext + from tarpn.datalink import L2Payload from tarpn.log import LoggingMixin -#from tarpn.transport import L4Protocol +from tarpn.metrics import MetricsMixin class QoS(IntEnum): @@ -30,6 +32,7 @@ class L3Payload: link_id: int = field(compare=False) qos: QoS = field(compare=True, default=QoS.Default) reliable: bool = field(compare=False, default=True) + timer: TimerContext = field(compare=False, default=None) class L3Queueing: @@ -40,21 +43,35 @@ def maybe_take(self) -> Optional[L3Payload]: raise NotImplementedError -class L3PriorityQueue(L3Queueing): +class L3PriorityQueue(L3Queueing, MetricsMixin, LoggingMixin): def __init__(self, max_size=20): + LoggingMixin.__init__(self) self._queue = queue.PriorityQueue(max_size) + self.max_size = 0 + self.gauge(self._queue.qsize, "qsize") + self.gauge(self.max_qsize, "max") + + def max_qsize(self) -> int: + return self.max_size def offer(self, packet: L3Payload): try: + packet.timer = self.timer("qtime").time() + self.meter("offer").mark() self._queue.put(packet, False, None) + self.max_size = max(self.max_size, self._queue.qsize()) return True except queue.Full: - print("Full!") + self.meter("full").mark() + self.error(f"Queue full, dropping {packet}") return False def maybe_take(self) -> Optional[L3Payload]: try: - return self._queue.get(True, 1.0) + packet = self._queue.get(True, 1.0) + packet.timer.stop() + self.meter("take").mark() + return packet except queue.Empty: return None diff --git a/tarpn/network/mesh/header.py b/tarpn/network/mesh/header.py index e4e4ba9..add3e00 100644 --- a/tarpn/network/mesh/header.py +++ b/tarpn/network/mesh/header.py @@ -94,10 +94,14 @@ class ControlType(IntEnum): # Up to 0x7F PING = 0x00 LOOKUP = 0x01 + UNREACHABLE = 0x02 def _missing_(cls, value): return None + def __str__(self): + return f"{self.name} ({self.value:02x})" + @dataclass(eq=True, frozen=True) class ControlHeader(Header): @@ -123,6 +127,9 @@ def encode(self, data: BytesIO): ByteUtils.write_uint8(data, self.extra_length) data.write(self.extra) + def __str__(self): + return f"CTRL {str(self.control_type)} req={self.is_request}" + @dataclass(eq=True, frozen=True) class HelloHeader(Header): @@ -147,6 +154,9 @@ def encode(self, data: BytesIO): for neighbor in self.neighbors: ByteUtils.write_uint16(data, neighbor.id) + def __str__(self): + return f"HELLO {self.name} {self.neighbors}" + @dataclass(eq=True, frozen=True) class LinkStateHeader(Header): @@ -171,6 +181,9 @@ def encode(self, data: BytesIO): ByteUtils.write_uint16(data, self.via.id) ByteUtils.write_uint8(data, self.quality) + def __repr__(self): + return f"(node={self.node} via={self.via} q={self.quality})" + @dataclass(eq=True, frozen=True) class LinkStateAdvertisementHeader(Header): @@ -206,6 +219,9 @@ def encode(self, data: BytesIO): for link_state in self.link_states: link_state.encode(data) + def __str__(self): + return f"ADVERT {self.node} epoch={self.epoch} links={self.link_states}" + @dataclass(eq=True, frozen=True) class LinkStateQueryHeader(Header): @@ -237,6 +253,9 @@ def encode(self, data: BytesIO): ByteUtils.write_uint16(data, node.id) ByteUtils.write_int8(data, epoch) + def __str__(self): + return f"QUERY {self.node} {self.epoch}" + class RecordType(IntEnum): # up to 0xFF diff --git a/tarpn/network/mesh/ping.py b/tarpn/network/mesh/ping.py index e00c9f2..f5b8d0b 100644 --- a/tarpn/network/mesh/ping.py +++ b/tarpn/network/mesh/ping.py @@ -12,7 +12,7 @@ from tarpn.network import QoS from tarpn.network.mesh import MeshAddress from tarpn.network.mesh.header import ControlHeader, ControlType, NetworkHeader, Protocol -from tarpn.util import secure_random_byte +from tarpn.util import secure_random_byte, secure_random_data @dataclasses.dataclass @@ -41,8 +41,8 @@ def __init__(self, network): self.stats: Dict[MeshAddress, PingStats] = defaultdict(PingStats) LoggingMixin.__init__(self) - def send_ping(self, node: MeshAddress) -> int: - ctrl = ControlHeader(True, ControlType.PING, 1, bytes([secure_random_byte()])) + def send_ping(self, node: MeshAddress, size: int = 1) -> int: + ctrl = ControlHeader(True, ControlType.PING, size, secure_random_data(size)) network_header = NetworkHeader( version=0, qos=QoS.Higher, @@ -59,7 +59,7 @@ def send_ping(self, node: MeshAddress) -> int: stream.seek(0) buffer = stream.read() self.stats[node].results.append( - PingResult(ctrl.extra[0], time.time_ns(), None, threading.Condition(self.mutex))) + PingResult(ctrl.extra[0], time.time_ns(), None, threading.Condition())) self.network.send(network_header, buffer) return ctrl.extra[0] @@ -81,6 +81,7 @@ def handle_ping(self, network_header: NetworkHeader, ctrl: ControlHeader): now = time.time_ns() result = self.stats.get(network_header.source).get(ctrl.extra[0]) if result is not None: + self.debug(f"Got ping response {ctrl}") result.end_ns = now with result.cond: result.cond.notify_all() @@ -95,7 +96,7 @@ def wait_for_ping(self, node: MeshAddress, seq: int, timeout_ms: int) -> Optiona if done: return result else: - self.warning(f"Timed out waiting for ping {seq} from {node}") + self.warning(f"Timed out waiting for ping {seq} from {node} after {timeout_ms}ms") return None else: self.warning(f"No ping from {node} found with seq {seq}") diff --git a/tarpn/network/mesh/protocol.py b/tarpn/network/mesh/protocol.py index fbe71b3..1653f75 100644 --- a/tarpn/network/mesh/protocol.py +++ b/tarpn/network/mesh/protocol.py @@ -3,7 +3,7 @@ import queue import threading import time -from datetime import datetime +from datetime import datetime, timedelta from functools import partial from io import BytesIO from typing import Tuple, List, Dict, Optional, Set, cast @@ -15,6 +15,7 @@ from tarpn.datalink.ax25_l2 import AX25Address from tarpn.datalink.protocol import LinkMultiplexer from tarpn.log import LoggingMixin +from tarpn.metrics import MetricsMixin from tarpn.network import L3Protocol, L3Address, L3Payload, QoS from tarpn.network.mesh import MeshAddress from tarpn.network.mesh.header import Protocol, NetworkHeader, Header, HelloHeader, LinkStateHeader, \ @@ -50,6 +51,9 @@ class NeighborState(enum.Enum): INIT = 2 # We've seen HELLO UP = 3 # We've seen ourselves in a HELLO + def __repr__(self): + return self.name + @dataclasses.dataclass class Neighbor: @@ -62,7 +66,7 @@ class Neighbor: state: NeighborState -class MeshProtocol(CloseableThreadLoop, L3Protocol, LoggingMixin): +class MeshProtocol(CloseableThreadLoop, L3Protocol, LoggingMixin, MetricsMixin): """ A simple protocol for a partially connected mesh network. @@ -87,8 +91,8 @@ def __init__(self, link_multiplexer: LinkMultiplexer, l4_handlers: L4Handlers, scheduler: Scheduler): - LoggingMixin.__init__(self, extra_func=self.log_ident) CloseableThreadLoop.__init__(self, name="MeshNetwork") + LoggingMixin.__init__(self, extra_func=self.log_ident) self.time = time self.config = config @@ -124,14 +128,21 @@ def __init__(self, self.last_hello_time = datetime.fromtimestamp(0) self.last_advert = datetime.utcnow() - self.last_query = datetime.utcnow() + self.last_query = datetime.utcnow() # TODO keep track of these per-neighbor self.last_epoch_bump = datetime.utcnow() + self.gauge(self._get_queue_idle_value, "queue", "idle", "ratio") + self.gauge(self.get_current_epoch, "epoch") + + # Start this thread a little bit in the future self.scheduler.timer(1_000, partial(self.scheduler.submit, self), True) def __repr__(self): return f"" + def get_current_epoch(self) -> int: + return self.our_link_state_epoch + def log_ident(self) -> str: return f"[MeshProtocol {self.our_address}]" @@ -186,27 +197,44 @@ def close(self): CloseableThreadLoop.close(self) def iter_loop(self) -> bool: + loop_t0 = time.time_ns() + # Check if we need to take some periodic action like sending a HELLO now = datetime.utcnow() deadline = self.deadline(now) + # self.debug(f"Getting next event with deadline {deadline}") # Now wait at most the deadline for the next action for new incoming packets try: + self.meter("queue", "deadline").mark(deadline) + qtimer = self.timer("queue", "block").time() event = self.queue.get(block=True, timeout=deadline) + qtimer.stop() if event is not None: + timer = self.timer("queue", "process").time() self.process_incoming(event) + timer.stop() return True except queue.Empty: return False + def _get_queue_idle_value(self) -> float: + """Derived metric for L3 queue idle time percentage""" + busy = self.timer("queue", "process").get_mean() + idle = self.timer("queue", "block").get_mean() + if busy == 0 or idle == 0: + return 0.0 + else: + return 1.0 * idle / (busy + idle) + def deadline(self, now: datetime) -> int: # TODO use time.time_ns instead of datetime return min([ self.check_dead_neighbors(now), self.check_hello(now), self.check_epoch(now), - self.check_advert(now), - self.check_query(now) + self.check_query(now), + self.check_advert(now) ]) def wakeup(self): @@ -215,9 +243,11 @@ def wakeup(self): def bump_epoch(self, now: datetime, reason: str = ""): if now != self.last_epoch_bump: + self.meter("epoch", "bump").mark() self.our_link_state_epoch = next(self.our_link_state_epoch_generator) self.last_epoch_bump = now self.debug(f"Bumping our epoch to {self.our_link_state_epoch} due to {reason}") + self.last_advert = datetime.fromtimestamp(0) # Force our advert to go out def check_dead_neighbors(self, now: datetime) -> int: min_deadline = self.config.get_int("mesh.dead.interval") @@ -228,10 +258,9 @@ def check_dead_neighbors(self, now: datetime) -> int: deadline = self.config.get_int("mesh.dead.interval") - (now - neighbor.last_seen).seconds if deadline <= 0: self.info(f"Neighbor {neighbor.address} detected as DOWN!") - + self.meter(f"neighbors.{neighbor.address}.down").mark() neighbor.state = NeighborState.DOWN self.bump_epoch(datetime.utcnow(), "dead neighbor") - self.last_advert = datetime.fromtimestamp(0) # Force our advert to go out else: min_deadline = min(deadline, min_deadline) return min_deadline @@ -264,7 +293,6 @@ def check_epoch(self, now: datetime) -> int: deadline = int(max_age * .80) - (now - self.last_epoch_bump).seconds if deadline <= 0: self.bump_epoch(now, "max epoch age") - self.send_advertisement() return int(max_age * .80) else: return deadline @@ -304,14 +332,24 @@ def process_incoming(self, payload: L2Payload): self.error(f"Could not decode network packet from {payload}.", e) return + # Collect some metrics + self.meter("protocols", "in", network_header.protocol.name).mark() + self.meter("bytes", "in").mark(len(payload.l3_data)) + self.meter("packets", "in").mark() + if network_header.destination in (self.our_address, self.BroadcastAddress): + self.meter("source", str(network_header.source), "in").mark() + # Handle L3 protocols first if network_header.destination == self.our_address and network_header.protocol == Protocol.CONTROL: ctrl = ControlHeader.decode(stream) - self.info(f"Got {ctrl} from {network_header.source}") + self.debug(f"< {network_header.source} {network_header.destination} {ctrl}") if ctrl.control_type == ControlType.PING: self.ping_protocol.handle_ping(network_header, ctrl) + elif ctrl.control_type == ControlType.UNREACHABLE: + self.warning(f"< UNREACHABLE") else: self.warning(f"Ignoring unsupported control packet: {ctrl.control_type}") + self.meter("drop", "unknown").mark() return if network_header.protocol == Protocol.HELLO: @@ -328,17 +366,19 @@ def process_incoming(self, payload: L2Payload): # Now decide if we should handle or drop if self.header_cache.contains(hash(network_header)): + self.meter("drop", "duplicate").mark() self.debug(f"Dropping duplicate {network_header}") return # If the packet is addressed to us, handle it if network_header.destination == self.our_address: - self.debug(f"Handling {network_header}") + # TODO check for supported protocols here? + self.debug(f"Handling L4 {network_header}") self.l4_handlers.handle_l4(network_header, network_header.protocol, stream) return if network_header.destination == self.BroadcastAddress: - self.debug(f"Handling broadcast {network_header}") + self.debug(f"Handling L4 broadcast {network_header}") self.l4_handlers.handle_l4(network_header, network_header.protocol, stream) if network_header.ttl > 1: # Decrease the TTL and re-broadcast on all links except where we heard it @@ -348,8 +388,12 @@ def process_incoming(self, payload: L2Payload): stream.seek(0) self.send(header_copy, stream.read(), exclude_link_id=payload.link_id) else: + self.meter("drop", "ttl").mark() self.debug("Not re-broadcasting due to TTL") else: + # If not addressed to us, forward it along + self.meter("packets", "forward").mark() + self.debug(f"> {network_header.source} {network_header.destination} {network_header.protocol.name} Forwarding") header_copy = dataclasses.replace(network_header, ttl=network_header.ttl - 1) stream.seek(0) header_copy.encode(stream) @@ -357,7 +401,7 @@ def process_incoming(self, payload: L2Payload): self.send(header_copy, stream.read(), exclude_link_id=payload.link_id) def handle_hello(self, link_id: int, network_header: NetworkHeader, hello: HelloHeader): - self.debug(f"Handling hello {hello}") + self.debug(f"< {hello}") now = datetime.utcnow() sender = network_header.source if network_header.source not in self.neighbors: @@ -371,27 +415,28 @@ def handle_hello(self, link_id: int, network_header: NetworkHeader, hello: Hello last_update=now, state=NeighborState.INIT ) - self.bump_epoch(now, "new neighbor") else: self.neighbors[sender].neighbors = hello.neighbors self.neighbors[sender].last_seen = now if self.our_address in hello.neighbors: - delay = 100 if self.neighbors[sender].state != NeighborState.UP: self.info(f"Neighbor {sender} ({self.neighbors[sender].name}) is UP!") - self.scheduler.timer(delay, partial(self.send_query, sender), auto_start=True) + # Set last_query to the max query interval minus a bit + self.last_query = now - timedelta(seconds=(self.config.get_int("mesh.query.interval") - 10)) + self.meter(f"neighbors.{sender}.up").mark() self.neighbors[sender].state = NeighborState.UP self.neighbors[sender].last_update = now - delay *= 1.2 + self.bump_epoch(now, "new neighbor") else: self.debug(f"Neighbor {sender} is initializing...") self.neighbors[sender].state = NeighborState.INIT self.neighbors[sender].last_update = now + self.send_hello() def send_hello(self): - self.debug("Sending Hello") hello = HelloHeader(self.config.get("host.name"), self.alive_neighbors()) + self.debug(f"> {hello}") network_header = NetworkHeader( version=0, qos=QoS.Lower, @@ -414,6 +459,8 @@ def handle_advertisement(self, link_id: int, network_header: NetworkHeader, adve if advert.node == self.our_address: return + self.debug(f"< {advert}") + latest_epoch = self.link_state_epochs.get(advert.node) if latest_epoch is None: self.info(f"New link for node {advert.node}") @@ -468,7 +515,7 @@ def send_advertisement(self): link_states=link_states ) - self.debug("Sending Advertisement {}".format(advertisement)) + self.debug(f"> {advertisement}") network_header = NetworkHeader( version=0, @@ -489,7 +536,7 @@ def send_advertisement(self): self.send(network_header, buffer) def handle_query(self, link_id: int, network_header: NetworkHeader, query: LinkStateQueryHeader): - self.debug(f"Handling {query}") + self.debug(f"< {query}") dest = network_header.source adverts = [] @@ -536,7 +583,7 @@ def handle_query(self, link_id: int, network_header: NetworkHeader, query: LinkS dest = network_header.source for advert in adverts: - self.debug(f"Sending {advert} advert to {network_header.source}") + self.debug(f"> {advert}") resp_header = NetworkHeader( version=0, qos=QoS.Lower, @@ -556,7 +603,6 @@ def handle_query(self, link_id: int, network_header: NetworkHeader, query: LinkS self.send(resp_header, buffer) def send_query(self, neighbor: MeshAddress): - self.debug(f"Querying {neighbor} for link states") known_link_states = dict() for node, link_states in self.valid_link_states().items(): known_link_states[node] = self.link_state_epochs[node] @@ -567,6 +613,7 @@ def send_query(self, neighbor: MeshAddress): link_nodes=list(known_link_states.keys()), link_epochs=list(known_link_states.values()) ) + self.debug(f"> {query}") network_header = NetworkHeader( version=0, @@ -586,7 +633,7 @@ def send_query(self, neighbor: MeshAddress): buffer = stream.read() self.send(network_header, buffer) - def route_to(self, address: MeshAddress) -> List[MeshAddress]: + def route_to(self, address: MeshAddress) -> Tuple[List[MeshAddress], int]: g = nx.DiGraph() # Other's links for node, link_states in self.valid_link_states().items(): @@ -600,12 +647,12 @@ def route_to(self, address: MeshAddress) -> List[MeshAddress]: try: # Compute the shortest path path = nx.dijkstra_path(g, self.our_address, address) - + path_weight = nx.path_weight(g, path, "weight") # Ensure we have a return path nx.dijkstra_path(g, address, self.our_address) - return path + return path, path_weight except NetworkXException: - return [] + return [], 0 def send(self, header: NetworkHeader, buffer: bytes, exclude_link_id: Optional[int] = None): """ @@ -623,21 +670,31 @@ def send(self, header: NetworkHeader, buffer: bytes, exclude_link_id: Optional[i neighbor = self.neighbors.get(header.destination) links = [neighbor.link_id] else: - best_route = self.route_to(header.destination) - self.debug(f"Routing {header} via {best_route}") + best_route, cost = self.route_to(header.destination) + self.debug(f"Routing {header} via {best_route} with cost {cost}") if len(best_route) > 1: next_hop = best_route[1] hop_neighbor = self.neighbors.get(next_hop) if hop_neighbor is not None: links = [hop_neighbor.link_id] + self.hist("route", "cost").add(cost) + self.hist("route", "cost", str(header.destination)).add(cost) else: self.error(f"Calculated route including {next_hop}, but we're missing that neighbor.") links = [] else: self.warning(f"No route to {header.destination}, dropping.") + self.meter("drop", "no-route").mark() + self.meter("drop", "no-route", str(header.destination)).mark() links = [] for link_id in links: + self.meter("protocols", "out", header.protocol.name).mark() + self.meter("bytes", "out").mark(len(buffer)) + self.meter("packets", "out").mark() + self.hist("packets", "ttl").add(header.ttl) + self.meter("source", str(header.source), "out").mark() + self.meter("dest", str(header.destination), "out").mark() payload = L3Payload( source=header.source, destination=header.destination, @@ -646,7 +703,7 @@ def send(self, header: NetworkHeader, buffer: bytes, exclude_link_id: Optional[i link_id=link_id, qos=QoS(header.qos), reliable=False) - self.debug(f"Sending {payload}") + #self.debug(f"Sending {payload}") self.link_multiplexer.offer(payload) def failed_send(self, neighbor: MeshAddress): @@ -662,7 +719,7 @@ def route_packet(self, address: L3Address) -> Tuple[bool, int]: if address == MeshProtocol.BroadcastAddress: return True, l3_mtu else: - path = self.route_to(cast(MeshAddress, address)) + path, cost = self.route_to(cast(MeshAddress, address)) return len(path) > 0, l3_mtu def send_packet(self, payload: L3Payload) -> bool: diff --git a/tarpn/scheduler.py b/tarpn/scheduler.py index 36936b4..f909e09 100644 --- a/tarpn/scheduler.py +++ b/tarpn/scheduler.py @@ -59,8 +59,10 @@ def submit(self, thread: CloseableThread): thread.start() self.threads.append(thread) - def run(self, runnable: Callable[..., Any]): - self._futures.append(self.executor.submit(runnable)) + def run(self, runnable: Callable[..., Any]) -> Future: + fut = self.executor.submit(runnable) + self._futures.append(fut) + return fut def add_shutdown_hook(self, runnable: Callable[..., Any]): self.shutdown_tasks.append(runnable) diff --git a/tarpn/settings.py b/tarpn/settings.py index ee700eb..7461970 100644 --- a/tarpn/settings.py +++ b/tarpn/settings.py @@ -23,7 +23,8 @@ _default_port_settings = { "port.enabled": True, - "serial.timeout": 0.100 + "serial.timeout": 0.100, + "serial.duplex": True } @@ -38,81 +39,6 @@ def _default_basedir(app_name): return os.path.expanduser(os.path.join("~", "." + app_name.lower())) -class Settings: - def __init__(self, basedir: str = None, paths: List[str] = None, defaults: Dict = None): - self._init_basedir(basedir) - self._configfiles = [os.path.join(self._basedir, path) for path in paths] - self._config: Optional[configparser.ConfigParser] = None - if defaults is None: - self._defaults = dict() - else: - self._defaults = defaults - self.load() - - def _init_basedir(self, basedir): - if basedir is not None: - self._basedir = basedir - else: - self._basedir = _default_basedir("TARPN") - - if not os.path.isdir(self._basedir): - try: - os.makedirs(self._basedir) - except Exception: - print(f"Could not create base folder at {self._basedir}. This is a fatal error, TARPN " - "cannot run without a writable base folder.") - raise - - def load(self): - self._config = configparser.ConfigParser(defaults=self._defaults, - interpolation=configparser.ExtendedInterpolation(), - inline_comment_prefixes=";", - default_section="default") - self._config.read_dict(_default_settings) - for path in self._configfiles: - if os.path.exists(path): - self._config.read(path) - else: - raise RuntimeError(f"No such config file {path}") - - def save(self): - # self._config.write() - return - - def node_config(self): - return NodeConfig(self._config["node"]) - - def port_configs(self): - ports = [] - for section in self._config.sections(): - m = re.match(r"port:(\d+)", section) - if m: - ports.append(int(m.group(1))) - port_configs = [] - for port in ports: - port_sect = self._config[f"port:{port}"] - port_configs.append(PortConfig.from_dict(port, port_sect)) - return port_configs - - def network_configs(self): - return NetworkConfig(self._config["network"]) - - def app_configs(self): - apps = [] - for section in self._config.sections(): - m = re.match(r"app:(\w[\w\d]*)", section) - if m: - apps.append(m.group(1)) - app_configs = [] - for app in apps: - app_sect = self._config[f"app:{app}"] - app_configs.append(AppConfig.from_dict(app, app_sect)) - return app_configs - - def config_section(self, name): - return Config(name, self._config[name]) - - class Config(Mapping): def __init__(self, section_name, config_section): self._section = section_name @@ -187,14 +113,17 @@ def admin_listen(self) -> str: class PortConfig(Config): - def __init__(self, port_id, port_config): + def __init__(self, port_id: int, port_config): super().__init__(f"port:{port_id}", port_config) self._port_id = port_id - def port_id(self): + def port_id(self) -> int: return self._port_id - def port_type(self): + def port_name(self) -> str: + return super().get("port.name", f"port-{self._port_id}") + + def port_type(self) -> str: return super().get("port.type") @classmethod @@ -264,3 +193,78 @@ def from_dict(cls, app_name: str, configs: dict): parser.read_dict({f"app:{app_name}": configs}) config = parser[f"app:{app_name}"] return cls(app_name, config) + + +class Settings: + def __init__(self, basedir: str = None, paths: List[str] = None, defaults: Dict = None): + self._init_basedir(basedir) + self._configfiles = [os.path.join(self._basedir, path) for path in paths] + self._config: Optional[configparser.ConfigParser] = None + if defaults is None: + self._defaults = dict() + else: + self._defaults = defaults + self.load() + + def _init_basedir(self, basedir): + if basedir is not None: + self._basedir = basedir + else: + self._basedir = _default_basedir("TARPN") + + if not os.path.isdir(self._basedir): + try: + os.makedirs(self._basedir) + except Exception: + print(f"Could not create base folder at {self._basedir}. This is a fatal error, TARPN " + "cannot run without a writable base folder.") + raise + + def load(self): + self._config = configparser.ConfigParser(defaults=self._defaults, + interpolation=configparser.ExtendedInterpolation(), + inline_comment_prefixes=";", + default_section="default") + self._config.read_dict(_default_settings) + for path in self._configfiles: + if os.path.exists(path): + self._config.read(path) + else: + raise RuntimeError(f"No such config file {path}") + + def save(self): + # self._config.write() + return + + def node_config(self): + return NodeConfig(self._config["node"]) + + def port_configs(self) -> Dict[int, PortConfig]: + ports = [] + for section in self._config.sections(): + m = re.match(r"port:(\d+)", section) + if m: + ports.append(int(m.group(1))) + port_configs = {} + for port in ports: + port_sect = self._config[f"port:{port}"] + port_configs[port] = PortConfig.from_dict(port, port_sect) + return port_configs + + def network_configs(self): + return NetworkConfig(self._config["network"]) + + def app_configs(self): + apps = [] + for section in self._config.sections(): + m = re.match(r"app:(\w[\w\d]*)", section) + if m: + apps.append(m.group(1)) + app_configs = [] + for app in apps: + app_sect = self._config[f"app:{app}"] + app_configs.append(AppConfig.from_dict(app, app_sect)) + return app_configs + + def config_section(self, name): + return Config(name, self._config[name]) diff --git a/tarpn/util.py b/tarpn/util.py index ecfbe59..58f9f76 100644 --- a/tarpn/util.py +++ b/tarpn/util.py @@ -433,4 +433,8 @@ def lollipop_compare(old_epoch: int, new_epoch: int) -> int: def secure_random_byte() -> int: - return struct.unpack(' bytes: + return secrets.token_bytes(size) \ No newline at end of file diff --git a/tests/docker/Dockerfile b/tests/docker/Dockerfile index 21ff1fd..4d2d331 100644 --- a/tests/docker/Dockerfile +++ b/tests/docker/Dockerfile @@ -1,16 +1,22 @@ FROM python:3.7-slim-bullseye -RUN apt-get update; apt-get install -y socat netcat iproute2 dnsutils +RUN apt-get update; apt-get install -y socat netcat iproute2 dnsutils curl jq procps vim + +RUN curl -sSL https://get.docker.com/ | sh RUN pip install virtualenv RUN python3 -m virtualenv /opt/tarpn +WORKDIR /opt/tarpn + +RUN ./bin/pip install pytest pytest-timeout + COPY dist /dist -COPY tests/docker/bin /opt/tarpn/bin +COPY tests/docker/bin ./bin -WORKDIR /opt/tarpn +COPY tests/docker/test_curl.py ./tests/test_curl.py RUN ./bin/pip install /dist/*.whl diff --git a/tests/docker/bin/launch-tarpn.sh b/tests/docker/bin/launch-tarpn.sh index 6de77b7..10a2009 100755 --- a/tests/docker/bin/launch-tarpn.sh +++ b/tests/docker/bin/launch-tarpn.sh @@ -4,11 +4,13 @@ # The arguments for the socat commands are given as SOCAT_ARGS delimited by a pipe (|). IFS="|" +i=0 for SOCAT_ARG in $SOCAT_ARGS do IFS=" " - socat $SOCAT_ARG & + socat -x -d -d $SOCAT_ARG > /var/log/socat-${i}.log 2>&1 & IFS="|" + i=$((i+1)) done if [[ -v SLEEP ]]; @@ -16,4 +18,15 @@ then sleep $SLEEP fi -/opt/tarpn/bin/tarpn-node +cleanup() { + killall socat +} + +trap 'cleanup' SIGTERM + +if [[ -v DEBUG ]]; +then + exec env PYTHONFAULTHANDLER=true /opt/tarpn/bin/tarpn-node --verbose +else + exec /opt/tarpn/bin/tarpn-node +fi diff --git a/tests/docker/bin/tc.sh b/tests/docker/bin/tc.sh new file mode 100755 index 0000000..c08ada3 --- /dev/null +++ b/tests/docker/bin/tc.sh @@ -0,0 +1,42 @@ +#!/bin/bash + +OPTS="" + +if [[ -z DURATION ]]; +then + echo "Missing DURATION" + exit 1 +fi + +if [[ -v LOSS ]]; +then + OPTS="$OPTS loss $LOSS" +fi + +if [[ -v DELAY ]]; +then + OPTS="$OPTS delay ${DELAY}ms ${DELAY_JITTER:-10}ms ${DELAY_CORRELATION:-20}.00" +fi + +OPTS="$OPTS rate ${RATE:-1}kbit" + +docker ps + +for CONTAINER in $CONTAINERS +do + echo "Running on $CONTAINER: tc qdisc add dev eth0 root netem $OPTS " + docker exec $CONTAINER tc qdisc add dev eth0 root netem $OPTS +done + +cleanup() { + echo "cleanup!" + for CONTAINER in $CONTAINERS + do + echo "Running on $CONTAINER: tc qdisc del dev eth0 root netem" + docker exec $CONTAINER tc qdisc del dev eth0 root netem + done +} + +trap 'cleanup' EXIT + +sleep $DURATION \ No newline at end of file diff --git a/tests/docker/config/alice.ini b/tests/docker/config/alice.ini index a1a01c4..a5b1c58 100644 --- a/tests/docker/config/alice.ini +++ b/tests/docker/config/alice.ini @@ -2,36 +2,26 @@ mycall=AL1CE [node] -log.dir = /tmp/tarpn-logs-alice +log.dir = logs log.config = config/logging.ini node.call = ${mycall}-9 node.alias = ALICE node.sock = /tmp/socks/tarpn-shell-alice.sock - [port:1] port.enabled = True port.type = serial port.framing = kiss port.bitrate = 9600 kiss.checksum = false -serial.device = /tmp/vmodem_A0 -serial.speed = 9600 - -[port:2] -port.enabled = True -port.type = serial -port.framing = kiss -port.bitrate = 9600 -kiss.checksum = false -serial.device = /tmp/vmodem_B1 +serial.device = /tmp/vmodem1 serial.speed = 9600 [network] host.name = alice mesh.enabled = True -mesh.address = 00.ab +mesh.address = 00.aa mesh.ttl = 7 [app:demo] diff --git a/tests/docker/config/bob.ini b/tests/docker/config/bob.ini index e0524c0..8aa786c 100644 --- a/tests/docker/config/bob.ini +++ b/tests/docker/config/bob.ini @@ -2,27 +2,35 @@ mycall=B0B [node] -log.dir = /tmp/tarpn-logs-bob +log.dir = logs log.config = config/logging.ini node.call = ${mycall}-9 node.alias = BOB node.sock = /tmp/socks/tarpn-shell-bob.sock - [port:1] port.enabled = True port.type = serial port.framing = kiss port.bitrate = 9600 kiss.checksum = false -serial.device = /tmp/vmodem_A1 +serial.device = /tmp/vmodem1 +serial.speed = 9600 + +[port:2] +port.enabled = True +port.type = serial +port.framing = kiss +port.bitrate = 9600 +kiss.checksum = false +serial.device = /tmp/vmodem2 serial.speed = 9600 [network] host.name = bob mesh.enabled = True -mesh.address = 00.aa +mesh.address = 00.ab mesh.ttl = 7 diff --git a/tests/docker/config/carol.ini b/tests/docker/config/carol.ini index 9b3dc9e..7c1c690 100644 --- a/tests/docker/config/carol.ini +++ b/tests/docker/config/carol.ini @@ -2,35 +2,38 @@ mycall=C4ROL [node] -log.dir = /tmp/tarpn-logs-carol +log.dir = logs log.config = config/logging.ini node.call = ${mycall}-9 node.alias = CAROL -node.sock = /tmp/tarpn-shell-carol.sock +node.sock = /tmp/socks/tarpn-shell-carol.sock [port:1] port.enabled = True port.type = serial port.framing = kiss +port.bitrate = 9600 kiss.checksum = false -serial.device = /tmp/vmodem_B0 +serial.device = /tmp/vmodem1 serial.speed = 9600 [port:2] -port.enabled = False +port.enabled = True port.type = serial port.framing = kiss +port.bitrate = 9600 kiss.checksum = false -serial.device = /tmp/vmodem_C1 +serial.device = /tmp/vmodem2 serial.speed = 9600 [port:3] -port.enabled = False +port.enabled = True port.type = serial port.framing = kiss +port.bitrate = 9600 kiss.checksum = false -serial.device = /tmp/vmodem_G1 +serial.device = /tmp/vmodem3 serial.speed = 9600 [network] diff --git a/tests/docker/config/dave.ini b/tests/docker/config/dave.ini new file mode 100644 index 0000000..96ba209 --- /dev/null +++ b/tests/docker/config/dave.ini @@ -0,0 +1,33 @@ +[default] +mycall=D4VE + +[node] +log.dir = logs +log.config = config/logging.ini + +node.call = ${mycall}-9 +node.alias = DAVE +node.sock = /tmp/socks/tarpn-shell-dave.sock + +[port:1] +port.enabled = True +port.type = serial +port.framing = kiss +port.bitrate = 9600 +kiss.checksum = false +serial.device = /tmp/vmodem1 +serial.speed = 9600 + +[network] +host.name = dave +mesh.enabled = True +mesh.address = 00.ad +mesh.ttl = 7 + +[app:demo] +app.address = mesh://ff.ff:100 +app.call = ${mycall}-7 +app.alias = DEMO +app.sock = /tmp/socks/tarpn-demo-d.sock +app.module = plugins.demo +app.class = DemoApp diff --git a/tests/docker/config/eve.ini b/tests/docker/config/eve.ini new file mode 100644 index 0000000..c77563a --- /dev/null +++ b/tests/docker/config/eve.ini @@ -0,0 +1,33 @@ +[default] +mycall=E4E + +[node] +log.dir = logs +log.config = config/logging.ini + +node.call = ${mycall}-9 +node.alias = EVE +node.sock = /tmp/socks/tarpn-shell-eve.sock + +[port:1] +port.enabled = True +port.type = serial +port.framing = kiss +port.bitrate = 9600 +kiss.checksum = false +serial.device = /tmp/vmodem1 +serial.speed = 9600 + +[network] +host.name = eve +mesh.enabled = True +mesh.address = 00.ae +mesh.ttl = 7 + +[app:demo] +app.address = mesh://ff.ff:100 +app.call = ${mycall}-7 +app.alias = DEMO +app.sock = /tmp/socks/tarpn-demo-e.sock +app.module = plugins.demo +app.class = DemoApp diff --git a/tests/docker/config/network.yml b/tests/docker/config/network.yml deleted file mode 100644 index e69de29..0000000 diff --git a/tests/docker/docker-compose-add-latency.yml b/tests/docker/docker-compose-add-latency.yml new file mode 100644 index 0000000..6a2e0ef --- /dev/null +++ b/tests/docker/docker-compose-add-latency.yml @@ -0,0 +1,14 @@ +services: + tc: + image: tarpn/tarpn-test:latest + environment: + - DURATION=300 + #- LOSS=5 + - DELAY=100 + - CONTAINERS=alice bob carol dave eve + cap_add: + - NET_ADMIN + volumes: + - /tmp/socks/:/tmp/socks/ + - /var/run/docker.sock:/var/run/docker.sock + entrypoint: /opt/tarpn/bin/tc.sh diff --git a/tests/docker/docker-compose-curl-tests.yml b/tests/docker/docker-compose-curl-tests.yml new file mode 100644 index 0000000..c1a2152 --- /dev/null +++ b/tests/docker/docker-compose-curl-tests.yml @@ -0,0 +1,10 @@ +services: + test: + container_name: test + depends_on: + alice: + condition: service_healthy + image: tarpn/tarpn-test:latest + volumes: + - /tmp/socks/:/tmp/socks/ + command: /opt/tarpn/bin/pytest /opt/tarpn/tests/test_curl.py diff --git a/tests/docker/docker-compose.yml b/tests/docker/docker-compose.yml index f92b32b..57af3c6 100644 --- a/tests/docker/docker-compose.yml +++ b/tests/docker/docker-compose.yml @@ -1,27 +1,76 @@ services: alice: + # Connected to bob image: tarpn/tarpn-test:latest + container_name: alice + cap_add: + - NET_ADMIN environment: - - SOCAT_ARGS=PTY,raw,echo=1,link=/tmp/vmodem_A0 udp:bob:12345|PTY,raw,echo=1,link=/tmp/vmodem_B1 udp-listen:10000 - - SLEEP=3 + - SOCAT_ARGS=PTY,raw,echo=1,link=/tmp/vmodem1 udp-listen:12345 + - SLEEP=1 + - DEBUG=true volumes: - ./config/alice.ini:/opt/tarpn/config/node.ini - /tmp/socks/:/tmp/socks/ command: /opt/tarpn/bin/launch-tarpn.sh + healthcheck: + test: [ "CMD", "curl", "--unix-socket", "/tmp/socks/tarpn-shell-alice.sock", "http://dummy/healthcheck" ] + interval: 10s + timeout: 5s + retries: 5 bob: + # Connected to alice and carol image: tarpn/tarpn-test:latest + container_name: bob + cap_add: + - NET_ADMIN environment: - - SOCAT_ARGS=PTY,raw,echo=1,link=/tmp/vmodem_A1 udp-listen:12345 + - SOCAT_ARGS=PTY,raw,echo=1,link=/tmp/vmodem1 udp:alice:12345|PTY,raw,echo=1,link=/tmp/vmodem2 udp:carol:10000 + - SLEEP=2 + - DEBUG=true volumes: - ./config/bob.ini:/opt/tarpn/config/node.ini - /tmp/socks/:/tmp/socks/ command: /opt/tarpn/bin/launch-tarpn.sh carol: + # Connected to alice and dave and eve image: tarpn/tarpn-test:latest + container_name: carol + cap_add: + - NET_ADMIN environment: - - SOCAT_ARGS=PTY,raw,echo=1,link=/tmp/vmodem_B0 udp:alice:10000 - - SLEEP=1 + - SOCAT_ARGS=PTY,raw,echo=1,link=/tmp/vmodem1 udp-listen:10000|PTY,raw,echo=1,link=/tmp/vmodem2 udp-listen:10001|PTY,raw,echo=1,link=/tmp/vmodem3 udp:eve:10002 + - SLEEP=3 + - DEBUG=true volumes: - ./config/carol.ini:/opt/tarpn/config/node.ini - /tmp/socks/:/tmp/socks/ command: /opt/tarpn/bin/launch-tarpn.sh + dave: + # Connected to carol + image: tarpn/tarpn-test:latest + container_name: dave + cap_add: + - NET_ADMIN + environment: + - SOCAT_ARGS=PTY,raw,echo=1,link=/tmp/vmodem1 udp:carol:10001 + - SLEEP=1 + - DEBUG=true + volumes: + - ./config/dave.ini:/opt/tarpn/config/node.ini + - /tmp/socks/:/tmp/socks/ + command: /opt/tarpn/bin/launch-tarpn.sh + eve: + # Connected to carol + image: tarpn/tarpn-test:latest + container_name: eve + cap_add: + - NET_ADMIN + environment: + - SOCAT_ARGS=PTY,raw,echo=1,link=/tmp/vmodem1 udp-listen:10002 + - SLEEP=2 + - DEBUG=true + volumes: + - ./config/eve.ini:/opt/tarpn/config/node.ini + - /tmp/socks/:/tmp/socks/ + command: /opt/tarpn/bin/launch-tarpn.sh \ No newline at end of file diff --git a/tests/docker/smoke-test.bats b/tests/docker/smoke-test.bats deleted file mode 100644 index e69de29..0000000 diff --git a/tests/docker/test_curl.py b/tests/docker/test_curl.py new file mode 100644 index 0000000..9a73895 --- /dev/null +++ b/tests/docker/test_curl.py @@ -0,0 +1,41 @@ +import json +import shlex +import subprocess +import unittest + +import pytest + + +def curl(sock: str, url: str): + command = f"curl --retry 5 --max-time 10 --retry-delay 0 --retry-max-time 40 -v --unix-socket {sock} {url}" + result = subprocess.run(shlex.split(command), stdout=subprocess.PIPE) + return result.stdout + + +class DockerTest(unittest.TestCase): + def test_alice_alive(self): + result = curl("/tmp/socks/tarpn-shell-alice.sock", "http://dummy/healthcheck") + assert result == b"ok" + + def test_bob_alive(self): + result = curl("/tmp/socks/tarpn-shell-bob.sock", "http://dummy/healthcheck") + assert result == b"ok" + + def test_carol_alive(self): + result = curl("/tmp/socks/tarpn-shell-carol.sock", "http://dummy/healthcheck") + assert result == b"ok" + + @pytest.mark.timeout(120) + def test_convergence(self): + done = False + while not done: + alice_net = curl("/tmp/socks/tarpn-shell-alice.sock", "http://dummy/network") + bob_net = curl("/tmp/socks/tarpn-shell-bob.sock", "http://dummy/network") + carol_net = curl("/tmp/socks/tarpn-shell-carol.sock", "http://dummy/network") + + alice_node_data = json.loads(alice_net).get("nodes", []) + bob_node_data = json.loads(bob_net).get("nodes", []) + carol_node_data = json.loads(carol_net).get("nodes", []) + + if len(alice_node_data) == 2 and len(bob_node_data) == 2 and len(carol_node_data) == 2: + break