diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml
index 8fa660f..3b7042c 100644
--- a/.github/workflows/build.yml
+++ b/.github/workflows/build.yml
@@ -81,5 +81,56 @@ jobs:
prerelease: true
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
-
+ docker-release-build:
+ runs-on: ubuntu-latest
+ steps:
+ - uses: actions/checkout@v2
+ - name: Set up QEMU
+ id: qemu
+ uses: docker/setup-qemu-action@v1
+ - name: Set up Docker Buildx
+ uses: docker/setup-buildx-action@v1
+ - name: Docker build
+ id: docker_build
+ uses: docker/build-push-action@v2
+ with:
+ push: false
+ tags: tarpn/tarpn-core:latest
+ platforms: |
+ linux/amd64
+ linux/arm/v7
+ linux/arm64
+ docker-source-build:
+ runs-on: ubuntu-latest
+ steps:
+ - uses: actions/checkout@v2
+ - name: Install dependencies
+ run: |
+ python3 -m pip install --upgrade pip
+ python3 -m pip install virtualenv
+ python3 -m virtualenv venv
+ source venv/bin/activate
+ pip install -e .[develop]
+ python setup.py bdist_wheel
+ python setup.py sdist
+ - name: Set up QEMU
+ id: qemu
+ uses: docker/setup-qemu-action@v1
+ with:
+ platforms: amd64,arm,arm64
+ - name: Set up Docker Buildx
+ uses: docker/setup-buildx-action@v1
+ - name: Docker build
+ id: docker_build
+ uses: docker/build-push-action@v2
+ with:
+ push: false
+ context: .
+ file: tests/docker/Dockerfile
+ tags: tarpn/tarpn-test:latest
+ platforms: |
+ linux/amd64
+ linux/arm/v7
+ linux/arm64
+
diff --git a/.idea/misc.xml b/.idea/misc.xml
index 165dd0c..bcd8661 100644
--- a/.idea/misc.xml
+++ b/.idea/misc.xml
@@ -1,4 +1,7 @@
+
+
+
\ No newline at end of file
diff --git a/Dockerfile b/Dockerfile
index b8918d7..97bd14d 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -4,10 +4,8 @@ RUN pip install virtualenv
RUN python3 -m virtualenv /opt/tarpn
-ADD dist /dist
-
WORKDIR /opt/tarpn
-RUN ./bin/pip install /dist/*.whl
+RUN ./bin/pip install https://github.com/tarpn/tarpn-node-controller/releases/download/v0.1.4/tarpn-core-0.1.4.tar.gz
CMD [ "./bin/tarpn-node" ]
diff --git a/config/defaults.ini b/config/defaults.ini
index 60e8c36..86d35cd 100644
--- a/config/defaults.ini
+++ b/config/defaults.ini
@@ -9,10 +9,10 @@ node.call = ${mycall}-1
[network]
mesh.ttl = 7
mesh.hello.interval = 10
-mesh.dead.interval = 30
-mesh.advert.interval = 30
-mesh.advert.max.age = 100
-mesh.query.interval = 60
+mesh.dead.interval = 40
+mesh.advert.interval = 120
+mesh.advert.max.age = 600
+mesh.query.interval = 120
[app:demo]
# The 'app' configs are used to locate, run, and connect the app to the network
diff --git a/config/logging.ini b/config/logging.ini
index 2db0751..d31a516 100644
--- a/config/logging.ini
+++ b/config/logging.ini
@@ -51,7 +51,7 @@ args=(sys.stdout,)
[handler_node]
class=FileHandler
-level=NOTSET
+level=DEBUG
formatter=standard
args=('%(log.dir)s/node.log', 'w')
@@ -80,7 +80,7 @@ formatter=standard
args=('%(log.dir)s/netrom.log', 'w')
[formatter_standard]
-format=%(levelname)-8s %(asctime)s -- %(message)s
+format=%(levelname)-8s %(asctime)s %(threadName)-8s -- %(message)s
datefmt=
class=logging.Formatter
diff --git a/setup.py b/setup.py
index e8967e3..e3cd1fe 100644
--- a/setup.py
+++ b/setup.py
@@ -49,7 +49,8 @@ def read_file_contents(path):
'tarpn-serial-dump = tarpn.tools.serial_dump:main',
'tarpn-packet-dump = tarpn.tools.packet_dump:main',
'tarpn-node = tarpn.main:main',
- 'tarpn-app = tarpn.app.runner:main'
+ 'tarpn-app = tarpn.app.runner:main',
+ 'tarpn-shell = tarpn.tools.shell:main'
]},
python_requires='>=3.7',
install_requires=[
diff --git a/tarpn/application/node.py b/tarpn/application/node.py
new file mode 100644
index 0000000..19ae744
--- /dev/null
+++ b/tarpn/application/node.py
@@ -0,0 +1,188 @@
+import math
+import statistics
+import time
+import urllib
+from typing import Optional, Dict
+
+from flask import Flask, Response, request
+from flask.testing import EnvironBuilder
+from pyformance import global_registry
+
+from tarpn.datalink import L2Queuing
+from tarpn.network.mesh import MeshAddress
+from tarpn.network.mesh.protocol import MeshProtocol
+from tarpn.scheduler import Scheduler
+from tarpn.settings import Settings
+from tarpn.transport import Protocol, Transport
+from tarpn.transport.mesh_l4 import MeshTransportManager
+
+
+class TarpnCoreProtocol(Protocol):
+ def __init__(self,
+ settings: Settings, port_queues: Dict[int, L2Queuing],
+ network: MeshProtocol, transport_manager: MeshTransportManager, scheduler: Scheduler):
+ self.settings = settings
+ self.port_queues = port_queues
+ self.network = network
+ self.transport_manager = transport_manager
+ self.scheduler = scheduler
+
+ self.transport: Optional[Transport] = None
+ self.app: Flask = Flask("tarpn-core")
+ self.register_handlers()
+
+ def register_handlers(self):
+ @self.app.route("/")
+ def index():
+ return "welcome"
+
+ @self.app.route("/healthcheck")
+ def healthcheck():
+ return "ok"
+
+ @self.app.route("/ports")
+ def ports():
+ port_data = []
+ ports_settings = self.settings.port_configs()
+ for port, queue in self.port_queues.items():
+ port_config = ports_settings[port]
+ port_data.append({
+ "id": port,
+ "name": port_config.port_name(),
+ "mtu": queue.mtu(),
+ "qsize": queue.qsize(),
+ "config": port_config.as_dict()
+ })
+ return {"ports": port_data}
+
+ @self.app.route("/network")
+ def network():
+ out = {
+ "address": str(self.network.our_address),
+ "epoch": self.network.our_link_state_epoch
+ }
+ neighbors = []
+ for addr, neighbor in self.network.neighbors.items():
+ neighbors.append({
+ "address": str(addr),
+ "name": neighbor.name,
+ "last_seen": neighbor.last_seen,
+ "last_update": neighbor.last_update,
+ "status": str(neighbor.state)
+ })
+ out["neighbors"] = neighbors
+
+ nodes = []
+ for node, link_states in self.network.valid_link_states().items():
+ epoch = self.network.link_state_epochs.get(node)
+ nodes.append({
+ "address": str(node),
+ "epoch": epoch,
+ "link_states": str(link_states) # TODO fix this
+ })
+ out["nodes"] = nodes
+ return out
+
+ @self.app.route("/routes")
+ def routes():
+ out = []
+
+ for node, link_states in self.network.valid_link_states().items():
+ epoch = self.network.link_state_epochs.get(node)
+ path, path_cost = self.network.route_to(node)
+ if len(path) == 0:
+ continue
+ link = self.network.neighbors.get(path[1]).link_id
+ device_id = self.network.link_multiplexer.get_link_device_id(link)
+ out.append({
+ "address": str(node),
+ "epoch": epoch,
+ "path": [str(n) for n in path[1:]],
+ "port": device_id,
+ "cost": path_cost
+ })
+ return {"routes": out}
+
+ @self.app.route("/ping", methods=["POST"])
+ def ping():
+ address = request.args.get("address")
+ count = int(request.args.get("count", 3))
+ run_async = request.args.get("async", None)
+ timeout = int(request.args.get("timeout", 1000))
+ size = int(request.args.get("size", 100))
+
+ node = MeshAddress.parse(address)
+ seqs = []
+ times = []
+ lost = 0
+
+ def do_ping():
+ nonlocal lost
+ t0 = time.time_ns()
+ seq = self.network.ping_protocol.send_ping(node, size=size)
+ found = self.network.ping_protocol.wait_for_ping(node, seq, timeout_ms=timeout)
+ t1 = time.time_ns()
+ if found:
+ dt = int((t1 - t0) / 1000000.)
+ seqs.append(seq)
+ times.append(dt)
+ else:
+ lost += 1
+
+ if run_async:
+ futures = []
+ for _ in range(count):
+ futures.append(self.scheduler.run(do_ping))
+ for fut in futures:
+ fut.result(3000)
+ else:
+ for _ in range(count):
+ do_ping()
+
+ out = {
+ "address": address,
+ "seqs": seqs,
+ "times": times,
+ "lost": lost
+ }
+ if len(times) > 1:
+ out["min"] = min(times)
+ out["max"] = max(times)
+ out["avg"] = statistics.mean(times)
+ out["stdev"] = statistics.stdev(times)
+ return out
+
+ @self.app.route("/metrics")
+ def metrics():
+ return global_registry().dump_metrics()
+
+ def connection_made(self, transport: Transport):
+ self.transport = transport
+
+ def connection_lost(self, exc):
+ self.transport = None
+
+ def data_received(self, data: bytes):
+ raw_lines = data.splitlines()
+ first = raw_lines[0].decode("ascii")
+ parts = first.split(" ")
+ verb, path, http_version = parts[0:3]
+ url_parts = urllib.parse.urlparse(path)
+ # TODO parse headers?
+ builder = EnvironBuilder(app=self.app,
+ path=url_parts.path,
+ query_string=url_parts.query,
+ method=verb,
+ data=b"\r\n".join(raw_lines[2:]),
+ content_type="text/plain")
+ env = builder.get_environ()
+
+ with self.app.request_context(env):
+ resp: Response = self.app.full_dispatch_request()
+ buf = bytearray()
+ buf.extend(f"HTTP/1.1 {resp.status}".encode("ISO-8859-1"))
+ buf.extend(b"\r\n")
+ buf.extend(str(resp.headers).encode("ISO-8859-1"))
+ if resp.data:
+ buf.extend(resp.data)
+ self.transport.write(buf) # TODO transport was None?
diff --git a/tarpn/application/shell.py b/tarpn/application/shell.py
index a425ba3..34a314a 100644
--- a/tarpn/application/shell.py
+++ b/tarpn/application/shell.py
@@ -8,9 +8,9 @@
from tarpn.network.mesh import MeshAddress
from tarpn.network.mesh.protocol import MeshProtocol
+from tarpn.transport import DatagramProtocol as DProtocol
from tarpn.transport import Protocol, Transport, L4Address
from tarpn.transport.mesh_l4 import MeshTransportManager
-from tarpn.transport import DatagramProtocol as DProtocol
class ShellDatagramProtocol(DProtocol):
diff --git a/tarpn/datalink/__init__.py b/tarpn/datalink/__init__.py
index acb43ac..86e1374 100644
--- a/tarpn/datalink/__init__.py
+++ b/tarpn/datalink/__init__.py
@@ -52,6 +52,12 @@ def take_inbound(self) -> Tuple[FrameData, int]:
"""
raise NotImplementedError
+ def qsize(self) -> int:
+ raise NotImplementedError
+
+ def mtu(self) -> int:
+ raise NotImplementedError
+
def close(self):
raise NotImplementedError
@@ -104,6 +110,12 @@ def __init__(self, queue_size, max_payload_size):
self._lock: threading.Lock = threading.Lock()
self._not_empty: threading.Condition = threading.Condition(self._lock)
+ def qsize(self) -> int:
+ return self._outbound.qsize()
+
+ def mtu(self) -> int:
+ return self._max_payload_size
+
def offer_outbound(self, frame: FrameData) -> bool:
if len(frame.data) > self._max_payload_size:
self.error(f"Payload too large, dropping. Size is {len(frame.data)}, max is {self._max_payload_size}")
@@ -134,6 +146,7 @@ def offer_inbound(self, frame: FrameData) -> None:
# Since the deque is bounded, this will eject old items to make room for this frame
with self._lock:
self.meter("inbound", "offer").mark()
+ #frame.timer = self.timer("inbound", "wait").time()
self._inbound.append((frame, next(self._inbound_count)))
self._not_empty.notify()
@@ -142,6 +155,7 @@ def take_inbound(self) -> Tuple[FrameData, int]:
while not len(self._inbound):
self._not_empty.wait()
inbound, count = self._inbound.popleft()
+ #inbound.timer.stop()
dropped = count - self._last_inbound - 1
self._last_inbound = count
self.meter("inbound", "take").mark()
diff --git a/tarpn/datalink/protocol.py b/tarpn/datalink/protocol.py
index 716e236..a37043c 100644
--- a/tarpn/datalink/protocol.py
+++ b/tarpn/datalink/protocol.py
@@ -5,6 +5,7 @@
from tarpn.datalink import FrameData, L2Queuing, L2Address
from tarpn.log import LoggingMixin
+from tarpn.metrics import MetricsMixin
from tarpn.network import L3Payload, L3Queueing
from tarpn.scheduler import Scheduler, CloseableThreadLoop
from tarpn.util import BackoffGenerator
@@ -121,7 +122,7 @@ def remove_link(self, link_id: int) -> None:
raise NotImplementedError
-class DefaultLinkMultiplexer(LinkMultiplexer):
+class DefaultLinkMultiplexer(LinkMultiplexer, MetricsMixin):
def __init__(self, queue_factory: Callable[[], L3Queueing], scheduler: Scheduler):
self.queue_factory = queue_factory
self.link_id_counter = itertools.count()
diff --git a/tarpn/io/serial.py b/tarpn/io/serial.py
index ab5d75e..6c3d7c1 100644
--- a/tarpn/io/serial.py
+++ b/tarpn/io/serial.py
@@ -9,26 +9,36 @@
from tarpn.io import IOProtocol
from tarpn.log import LoggingMixin
+from tarpn.metrics import MetricsMixin
from tarpn.scheduler import Scheduler, CloseableThreadLoop
from tarpn.util import CountDownLatch, BackoffGenerator
+class DummyLock:
+ def __enter__(self):
+ return None
+
+ def __exit__(self, _type, value, traceback):
+ pass
+
+
class SerialLoop(CloseableThreadLoop, ABC):
def __init__(self, name: str, ser: serial.Serial, protocol: IOProtocol,
- open_event: threading.Event, error_event: threading.Event, closed_latch: CountDownLatch):
+ open_event: threading.Event, error_event: threading.Event, closed_latch: CountDownLatch, lock: threading.Lock):
super().__init__(name=name)
self.ser = ser
self.protocol = protocol
self.open_event = open_event
self.error_event = error_event
self.closed_latch = closed_latch
+ self.lock = lock
def close(self):
super().close()
self.closed_latch.countdown()
-class SerialReadLoop(SerialLoop, LoggingMixin):
+class SerialReadLoop(SerialLoop, LoggingMixin, MetricsMixin):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
LoggingMixin.__init__(self,
@@ -38,16 +48,19 @@ def __init__(self, *args, **kwargs):
def iter_loop(self):
if self.open_event.wait(3.0):
try:
- data = self.ser.read(1024)
- if len(data) > 0:
- self.debug(f"Read {len(data)} bytes: {data}")
- self.protocol.handle_bytes(data)
+ with self.lock:
+ data = self.ser.read(1024)
+ if len(data) > 0:
+ self.debug(f"Read {len(data)} bytes: {data}")
+ self.meter("bytes", "in").mark(len(data)) # TODO add port number
+ self.protocol.handle_bytes(data)
except serial.SerialException:
self.exception("Failed to read bytes from serial device")
+ self.meter("error").mark()
self.error_event.set()
-class SerialWriteLoop(SerialLoop, LoggingMixin):
+class SerialWriteLoop(SerialLoop, LoggingMixin, MetricsMixin):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
LoggingMixin.__init__(self,
@@ -64,26 +77,31 @@ def iter_loop(self):
to_write = self.protocol.next_bytes_to_write()
if to_write is not None and len(to_write) > 0:
try:
- self.ser.write(to_write)
- self.ser.flush()
- self.debug(f"Wrote {len(to_write)} bytes: {to_write}")
- self.retry_backoff.reset()
+ with self.lock:
+ self.ser.write(to_write)
+ self.ser.flush()
+ self.meter("bytes", "out").mark(len(to_write)) # TODO add port number
+ self.debug(f"Wrote {len(to_write)} bytes: {to_write}")
+ self.retry_backoff.reset()
except serial.SerialTimeoutException as e:
self.unsent.append(to_write)
t = next(self.retry_backoff)
self.exception(f"Failed to write bytes to serial device, serial timed out. Retrying after {t}s.", e)
+ self.meter("timeout").mark()
sleep(t)
except serial.SerialException as e:
self.exception("Failed to write bytes to serial device", e)
+ self.meter("error").mark()
self.error_event.set()
-class SerialDevice(CloseableThreadLoop, LoggingMixin):
+class SerialDevice(CloseableThreadLoop, LoggingMixin, MetricsMixin):
def __init__(self,
protocol: IOProtocol,
device_name: str,
speed: int,
timeout: float,
+ duplex: bool,
scheduler: Scheduler):
LoggingMixin.__init__(self)
CloseableThreadLoop.__init__(self, f"Serial Device {device_name}")
@@ -94,14 +112,18 @@ def __init__(self,
self._ser = serial.Serial(port=None, baudrate=speed, timeout=timeout, write_timeout=timeout)
self._ser.port = device_name
self._closed_latch = CountDownLatch(2)
+ if not duplex:
+ self._duplex_lock = threading.Lock()
+ else:
+ self._duplex_lock = DummyLock()
self._open_event = threading.Event()
self._error_event = threading.Event()
self._open_backoff = BackoffGenerator(0.100, 1.2, 5.000)
# Submit the reader and writer threads first, so they will be shutdown first
self._scheduler.submit(SerialReadLoop(f"Serial Reader {self._ser.name}", self._ser,
- self._protocol, self._open_event, self._error_event, self._closed_latch))
+ self._protocol, self._open_event, self._error_event, self._closed_latch, self._duplex_lock))
self._scheduler.submit(SerialWriteLoop(f"Serial Writer {self._ser.name}", self._ser,
- self._protocol, self._open_event, self._error_event, self._closed_latch))
+ self._protocol, self._open_event, self._error_event, self._closed_latch, self._duplex_lock))
self._scheduler.submit(self)
def close(self) -> None:
@@ -132,6 +154,7 @@ def iter_loop(self):
try:
self._ser.open()
self.info(f"Opened serial port {self._device_name}")
+ # TODO metrics
self._open_event.set()
self._open_backoff.reset()
except serial.SerialException as err:
diff --git a/tarpn/io/udp.py b/tarpn/io/udp.py
index 511d278..93025af 100644
--- a/tarpn/io/udp.py
+++ b/tarpn/io/udp.py
@@ -11,7 +11,6 @@
from tarpn.log import LoggingMixin
from tarpn.scheduler import CloseableThread
-
packet_logger = logging.getLogger("packet")
udp_logger = logging.getLogger("udp")
diff --git a/tarpn/main.py b/tarpn/main.py
index 0d674a1..9982937 100644
--- a/tarpn/main.py
+++ b/tarpn/main.py
@@ -3,16 +3,20 @@
import logging.config
import os
import shutil
+import signal
import sys
from functools import partial
+from pyformance.reporters import ConsoleReporter
+
import tarpn.netrom.router
from tarpn.application import TransportMultiplexer, MultiplexingProtocol, ApplicationProtocol
from tarpn.application.command import NodeCommandProcessor
+from tarpn.application.node import TarpnCoreProtocol
from tarpn.application.shell import TarpnShellProtocol
from tarpn.ax25 import AX25Call
from tarpn.datalink import L2FIFOQueue
-from tarpn.datalink.ax25_l2 import AX25Protocol, DefaultLinkMultiplexer, AX25Address
+from tarpn.datalink.ax25_l2 import AX25Protocol, DefaultLinkMultiplexer
from tarpn.datalink.protocol import L2IOLoop
from tarpn.io.kiss import KISSProtocol
from tarpn.io.serial import SerialDevice
@@ -41,7 +45,7 @@ def main():
parser = argparse.ArgumentParser(description='Decode packets from a serial port')
parser.add_argument("config", nargs="?", default="config/node.ini", help="Config file")
parser.add_argument("--verbose", action="store_true", help="Enable debug logging")
- parser.add_argument("--profile", action="store_true", help="Attache a profiler to the process")
+ parser.add_argument("--profile", action="store_true", help="Attach a profiler to the process")
args = parser.parse_args()
if args.profile:
import cProfile
@@ -67,7 +71,7 @@ def run_node(args):
"https://github.com/tarpn/tarpn-node-controller")
sys.exit(1)
else:
- print(f"Loaded configuration for {node_call}")
+ print(f"Loaded configuration for {node_call} from {args.config}")
# Setup logging
logging_config_file = node_settings.get("log.config", "not_set")
@@ -75,6 +79,7 @@ def run_node(args):
log_dir = node_settings.get("log.dir")
if not os.path.exists(log_dir):
os.makedirs(log_dir)
+ print(f"Loading log configuration from {logging_config_file}")
logging.config.fileConfig(
logging_config_file, defaults={"log.dir": log_dir}, disable_existing_loggers=False)
@@ -111,7 +116,7 @@ def run_node(args):
intercept_dests = {}
interceptor = lambda frame: None
- for port_config in s.port_configs():
+ for port_id, port_config in s.port_configs().items():
if port_config.get_boolean("port.enabled") and port_config.get("port.type") == "serial":
l2_queueing = L2FIFOQueue(20, AX25Protocol.maximum_frame_size())
port_queues[port_config.port_id()] = l2_queueing
@@ -122,6 +127,7 @@ def run_node(args):
port_config.get("serial.device"),
port_config.get_int("serial.speed"),
port_config.get_float("serial.timeout"),
+ port_config.get_boolean("serial.duplex", True),
scheduler)
scheduler.submit(L2IOLoop(l2_queueing, l2))
@@ -181,17 +187,27 @@ def run_node(args):
scheduler.submit(UnixServerThread(app_config.app_socket(), app_protocol))
multiplexer_protocol = partial(MultiplexingProtocol, app_multiplexer)
# TODO bind or connect?
- mesh_l4.connect(multiplexer_protocol, app_address.address, MeshAddress.parse("00.a2"), app_address.port)
+ mesh_l4.connect(multiplexer_protocol, app_address.address, app_address.address, app_address.port)
sock = node_settings.get("node.sock")
print(f"Binding node terminal to {sock}")
- scheduler.submit(UnixServerThread(sock, TarpnShellProtocol(mesh_l3, mesh_l4)))
+ #scheduler.submit(UnixServerThread(sock, TarpnShellProtocol(mesh_l3, mesh_l4)))
+ scheduler.submit(UnixServerThread(sock, TarpnCoreProtocol(s, port_queues, mesh_l3, mesh_l4, scheduler)))
+
# Start a metrics reporter
- #reporter = ConsoleReporter(reporting_interval=300)
- #scheduler.timer(10_000, reporter.start, True)
+ #reporter = ConsoleReporter(reporting_interval=300, stream=sys.stdout)
+ #reporter.start()
#scheduler.add_shutdown_hook(reporter.stop)
+ def signal_handler(signum, frame):
+ logger.info(f"Shutting down due to {signal.Signals(signum).name}")
+ scheduler.shutdown()
+
+ signal.signal(signal.SIGTERM, signal_handler)
+ signal.signal(signal.SIGINT, signal_handler)
+ signal.signal(signal.SIGHUP, signal_handler)
+
logger.info("Finished Startup")
try:
# Wait for all threads
diff --git a/tarpn/metrics.py b/tarpn/metrics.py
index 12d36aa..2060aa2 100644
--- a/tarpn/metrics.py
+++ b/tarpn/metrics.py
@@ -1,29 +1,29 @@
-from typing import Optional
+from typing import Optional, Callable
from pyformance import global_registry
-from pyformance.meters import Meter, Counter, Timer
+from pyformance.meters import Meter, Counter, Timer, Histogram, Gauge, CallbackGauge
class MetricsMixin:
- def _name(self, name):
+ def get_key(self, name, *extra):
pkg = self.__class__.__module__
clazz = self.__class__.__qualname__
- return f"{pkg}:{clazz}:{name}"
-
- def meter(self, name: str, event: Optional[str] = None) -> Meter:
- if event is not None:
- return global_registry().meter(self._name(f"{name}.{event}"))
- else:
- return global_registry().meter(self._name(name))
-
- def counter(self, name: str, event: Optional[str] = None) -> Counter:
- if event is not None:
- return global_registry().counter(self._name(f"{name}.{event}"))
- else:
- return global_registry().counter(self._name(name))
-
- def timer(self, name: str, event: Optional[str] = None) -> Timer:
- if event is not None:
- return global_registry().timer(self._name(f"{name}.{event}"))
- else:
- return global_registry().timer(self._name(name))
+ name_parts = [name]
+ name_parts.extend([str(e) for e in extra])
+ joined_name = ".".join(name_parts)
+ return f"{pkg}.{clazz}:{joined_name}"
+
+ def meter(self, name: str, *args) -> Meter:
+ return global_registry().meter(self.get_key(name, *args))
+
+ def counter(self, name: str, *args) -> Counter:
+ return global_registry().counter(self.get_key(name, *args))
+
+ def timer(self, name: str, *args) -> Timer:
+ return global_registry().timer(self.get_key(name, *args))
+
+ def hist(self, name: str, *args) -> Histogram:
+ return global_registry().histogram(self.get_key(name, *args))
+
+ def gauge(self, fun: Callable[[], float], name: str, *args) -> Gauge:
+ return global_registry().gauge(key=self.get_key(name, *args), gauge=CallbackGauge(fun))
\ No newline at end of file
diff --git a/tarpn/network/__init__.py b/tarpn/network/__init__.py
index 6e23ec5..aadddfd 100644
--- a/tarpn/network/__init__.py
+++ b/tarpn/network/__init__.py
@@ -3,9 +3,11 @@
from enum import IntEnum
from typing import List, Optional, Tuple
+from pyformance.meters.timer import TimerContext
+
from tarpn.datalink import L2Payload
from tarpn.log import LoggingMixin
-#from tarpn.transport import L4Protocol
+from tarpn.metrics import MetricsMixin
class QoS(IntEnum):
@@ -30,6 +32,7 @@ class L3Payload:
link_id: int = field(compare=False)
qos: QoS = field(compare=True, default=QoS.Default)
reliable: bool = field(compare=False, default=True)
+ timer: TimerContext = field(compare=False, default=None)
class L3Queueing:
@@ -40,21 +43,35 @@ def maybe_take(self) -> Optional[L3Payload]:
raise NotImplementedError
-class L3PriorityQueue(L3Queueing):
+class L3PriorityQueue(L3Queueing, MetricsMixin, LoggingMixin):
def __init__(self, max_size=20):
+ LoggingMixin.__init__(self)
self._queue = queue.PriorityQueue(max_size)
+ self.max_size = 0
+ self.gauge(self._queue.qsize, "qsize")
+ self.gauge(self.max_qsize, "max")
+
+ def max_qsize(self) -> int:
+ return self.max_size
def offer(self, packet: L3Payload):
try:
+ packet.timer = self.timer("qtime").time()
+ self.meter("offer").mark()
self._queue.put(packet, False, None)
+ self.max_size = max(self.max_size, self._queue.qsize())
return True
except queue.Full:
- print("Full!")
+ self.meter("full").mark()
+ self.error(f"Queue full, dropping {packet}")
return False
def maybe_take(self) -> Optional[L3Payload]:
try:
- return self._queue.get(True, 1.0)
+ packet = self._queue.get(True, 1.0)
+ if packet.timer is not None: packet.timer.stop()
+ self.meter("take").mark()
+ return packet
except queue.Empty:
return None
diff --git a/tarpn/network/mesh/header.py b/tarpn/network/mesh/header.py
index e4e4ba9..add3e00 100644
--- a/tarpn/network/mesh/header.py
+++ b/tarpn/network/mesh/header.py
@@ -94,10 +94,14 @@ class ControlType(IntEnum):
# Up to 0x7F
PING = 0x00
LOOKUP = 0x01
+ UNREACHABLE = 0x02
def _missing_(cls, value):
return None
+ def __str__(self):
+ return f"{self.name} ({self.value:02x})"
+
@dataclass(eq=True, frozen=True)
class ControlHeader(Header):
@@ -123,6 +127,9 @@ def encode(self, data: BytesIO):
ByteUtils.write_uint8(data, self.extra_length)
data.write(self.extra)
+ def __str__(self):
+ return f"CTRL {str(self.control_type)} req={self.is_request}"
+
@dataclass(eq=True, frozen=True)
class HelloHeader(Header):
@@ -147,6 +154,9 @@ def encode(self, data: BytesIO):
for neighbor in self.neighbors:
ByteUtils.write_uint16(data, neighbor.id)
+ def __str__(self):
+ return f"HELLO {self.name} {self.neighbors}"
+
@dataclass(eq=True, frozen=True)
class LinkStateHeader(Header):
@@ -171,6 +181,9 @@ def encode(self, data: BytesIO):
ByteUtils.write_uint16(data, self.via.id)
ByteUtils.write_uint8(data, self.quality)
+ def __repr__(self):
+ return f"(node={self.node} via={self.via} q={self.quality})"
+
@dataclass(eq=True, frozen=True)
class LinkStateAdvertisementHeader(Header):
@@ -206,6 +219,9 @@ def encode(self, data: BytesIO):
for link_state in self.link_states:
link_state.encode(data)
+ def __str__(self):
+ return f"ADVERT {self.node} epoch={self.epoch} links={self.link_states}"
+
@dataclass(eq=True, frozen=True)
class LinkStateQueryHeader(Header):
@@ -237,6 +253,9 @@ def encode(self, data: BytesIO):
ByteUtils.write_uint16(data, node.id)
ByteUtils.write_int8(data, epoch)
+ def __str__(self):
+ return f"QUERY {self.node} {self.epoch}"
+
class RecordType(IntEnum):
# up to 0xFF
diff --git a/tarpn/network/mesh/ping.py b/tarpn/network/mesh/ping.py
index e00c9f2..f5b8d0b 100644
--- a/tarpn/network/mesh/ping.py
+++ b/tarpn/network/mesh/ping.py
@@ -12,7 +12,7 @@
from tarpn.network import QoS
from tarpn.network.mesh import MeshAddress
from tarpn.network.mesh.header import ControlHeader, ControlType, NetworkHeader, Protocol
-from tarpn.util import secure_random_byte
+from tarpn.util import secure_random_byte, secure_random_data
@dataclasses.dataclass
@@ -41,8 +41,8 @@ def __init__(self, network):
self.stats: Dict[MeshAddress, PingStats] = defaultdict(PingStats)
LoggingMixin.__init__(self)
- def send_ping(self, node: MeshAddress) -> int:
- ctrl = ControlHeader(True, ControlType.PING, 1, bytes([secure_random_byte()]))
+ def send_ping(self, node: MeshAddress, size: int = 1) -> int:
+ ctrl = ControlHeader(True, ControlType.PING, size, secure_random_data(size))
network_header = NetworkHeader(
version=0,
qos=QoS.Higher,
@@ -59,7 +59,7 @@ def send_ping(self, node: MeshAddress) -> int:
stream.seek(0)
buffer = stream.read()
self.stats[node].results.append(
- PingResult(ctrl.extra[0], time.time_ns(), None, threading.Condition(self.mutex)))
+ PingResult(ctrl.extra[0], time.time_ns(), None, threading.Condition()))
self.network.send(network_header, buffer)
return ctrl.extra[0]
@@ -81,6 +81,7 @@ def handle_ping(self, network_header: NetworkHeader, ctrl: ControlHeader):
now = time.time_ns()
result = self.stats.get(network_header.source).get(ctrl.extra[0])
if result is not None:
+ self.debug(f"Got ping response {ctrl}")
result.end_ns = now
with result.cond:
result.cond.notify_all()
@@ -95,7 +96,7 @@ def wait_for_ping(self, node: MeshAddress, seq: int, timeout_ms: int) -> Optiona
if done:
return result
else:
- self.warning(f"Timed out waiting for ping {seq} from {node}")
+ self.warning(f"Timed out waiting for ping {seq} from {node} after {timeout_ms}ms")
return None
else:
self.warning(f"No ping from {node} found with seq {seq}")
diff --git a/tarpn/network/mesh/protocol.py b/tarpn/network/mesh/protocol.py
index aebd96c..1653f75 100644
--- a/tarpn/network/mesh/protocol.py
+++ b/tarpn/network/mesh/protocol.py
@@ -3,7 +3,7 @@
import queue
import threading
import time
-from datetime import datetime
+from datetime import datetime, timedelta
from functools import partial
from io import BytesIO
from typing import Tuple, List, Dict, Optional, Set, cast
@@ -15,6 +15,7 @@
from tarpn.datalink.ax25_l2 import AX25Address
from tarpn.datalink.protocol import LinkMultiplexer
from tarpn.log import LoggingMixin
+from tarpn.metrics import MetricsMixin
from tarpn.network import L3Protocol, L3Address, L3Payload, QoS
from tarpn.network.mesh import MeshAddress
from tarpn.network.mesh.header import Protocol, NetworkHeader, Header, HelloHeader, LinkStateHeader, \
@@ -50,6 +51,9 @@ class NeighborState(enum.Enum):
INIT = 2 # We've seen HELLO
UP = 3 # We've seen ourselves in a HELLO
+ def __repr__(self):
+ return self.name
+
@dataclasses.dataclass
class Neighbor:
@@ -62,7 +66,7 @@ class Neighbor:
state: NeighborState
-class MeshProtocol(CloseableThreadLoop, L3Protocol, LoggingMixin):
+class MeshProtocol(CloseableThreadLoop, L3Protocol, LoggingMixin, MetricsMixin):
"""
A simple protocol for a partially connected mesh network.
@@ -87,8 +91,8 @@ def __init__(self,
link_multiplexer: LinkMultiplexer,
l4_handlers: L4Handlers,
scheduler: Scheduler):
- LoggingMixin.__init__(self, extra_func=self.log_ident)
CloseableThreadLoop.__init__(self, name="MeshNetwork")
+ LoggingMixin.__init__(self, extra_func=self.log_ident)
self.time = time
self.config = config
@@ -124,14 +128,21 @@ def __init__(self,
self.last_hello_time = datetime.fromtimestamp(0)
self.last_advert = datetime.utcnow()
- self.last_query = datetime.utcnow()
+ self.last_query = datetime.utcnow() # TODO keep track of these per-neighbor
self.last_epoch_bump = datetime.utcnow()
+ self.gauge(self._get_queue_idle_value, "queue", "idle", "ratio")
+ self.gauge(self.get_current_epoch, "epoch")
+
+ # Start this thread a little bit in the future
self.scheduler.timer(1_000, partial(self.scheduler.submit, self), True)
def __repr__(self):
return f""
+ def get_current_epoch(self) -> int:
+ return self.our_link_state_epoch
+
def log_ident(self) -> str:
return f"[MeshProtocol {self.our_address}]"
@@ -177,7 +188,7 @@ def can_handle(self, protocol: int) -> bool:
def pre_close(self):
# Erase our neighbors and send ADVERT
self.neighbors.clear()
- self.our_link_state_epoch = next(self.our_link_state_epoch_generator)
+ self.bump_epoch(datetime.utcnow(), "closing")
self.send_advertisement()
time.sleep(1) # TODO better solution is to wait for L3 queue to drain in close
@@ -186,33 +197,58 @@ def close(self):
CloseableThreadLoop.close(self)
def iter_loop(self) -> bool:
+ loop_t0 = time.time_ns()
+
# Check if we need to take some periodic action like sending a HELLO
now = datetime.utcnow()
deadline = self.deadline(now)
+ # self.debug(f"Getting next event with deadline {deadline}")
# Now wait at most the deadline for the next action for new incoming packets
try:
+ self.meter("queue", "deadline").mark(deadline)
+ qtimer = self.timer("queue", "block").time()
event = self.queue.get(block=True, timeout=deadline)
+ qtimer.stop()
if event is not None:
+ timer = self.timer("queue", "process").time()
self.process_incoming(event)
+ timer.stop()
return True
except queue.Empty:
return False
+ def _get_queue_idle_value(self) -> float:
+ """Derived metric for L3 queue idle time percentage"""
+ busy = self.timer("queue", "process").get_mean()
+ idle = self.timer("queue", "block").get_mean()
+ if busy == 0 or idle == 0:
+ return 0.0
+ else:
+ return 1.0 * idle / (busy + idle)
+
def deadline(self, now: datetime) -> int:
# TODO use time.time_ns instead of datetime
return min([
self.check_dead_neighbors(now),
self.check_hello(now),
self.check_epoch(now),
- self.check_advert(now),
- self.check_query(now)
+ self.check_query(now),
+ self.check_advert(now)
])
def wakeup(self):
"""Wake up the main thread"""
self.queue.put(None)
+ def bump_epoch(self, now: datetime, reason: str = ""):
+ if now != self.last_epoch_bump:
+ self.meter("epoch", "bump").mark()
+ self.our_link_state_epoch = next(self.our_link_state_epoch_generator)
+ self.last_epoch_bump = now
+ self.debug(f"Bumping our epoch to {self.our_link_state_epoch} due to {reason}")
+ self.last_advert = datetime.fromtimestamp(0) # Force our advert to go out
+
def check_dead_neighbors(self, now: datetime) -> int:
min_deadline = self.config.get_int("mesh.dead.interval")
for neighbor in list(self.neighbors.values()):
@@ -222,11 +258,9 @@ def check_dead_neighbors(self, now: datetime) -> int:
deadline = self.config.get_int("mesh.dead.interval") - (now - neighbor.last_seen).seconds
if deadline <= 0:
self.info(f"Neighbor {neighbor.address} detected as DOWN!")
-
+ self.meter(f"neighbors.{neighbor.address}.down").mark()
neighbor.state = NeighborState.DOWN
- self.our_link_state_epoch = next(self.our_link_state_epoch_generator)
- self.last_epoch_bump = datetime.utcnow()
- self.last_advert = datetime.fromtimestamp(0) # Force our advert to go out
+ self.bump_epoch(datetime.utcnow(), "dead neighbor")
else:
min_deadline = min(deadline, min_deadline)
return min_deadline
@@ -246,6 +280,7 @@ def check_epoch(self, now: datetime) -> int:
for node, links in self.link_states.items():
for link in list(links):
if (now - link.created).seconds > max_age:
+ self.info(f"Lost link to {node}")
self.debug(f"Expiring link state {link} for {node}")
links.remove(link)
if len(links) == 0:
@@ -253,12 +288,11 @@ def check_epoch(self, now: datetime) -> int:
for node in to_delete:
del self.link_states[node]
+ del self.link_state_epochs[node]
deadline = int(max_age * .80) - (now - self.last_epoch_bump).seconds
if deadline <= 0:
- self.our_link_state_epoch = next(self.our_link_state_epoch_generator)
- self.last_epoch_bump = now
- self.send_advertisement()
+ self.bump_epoch(now, "max epoch age")
return int(max_age * .80)
else:
return deadline
@@ -298,14 +332,24 @@ def process_incoming(self, payload: L2Payload):
self.error(f"Could not decode network packet from {payload}.", e)
return
+ # Collect some metrics
+ self.meter("protocols", "in", network_header.protocol.name).mark()
+ self.meter("bytes", "in").mark(len(payload.l3_data))
+ self.meter("packets", "in").mark()
+ if network_header.destination in (self.our_address, self.BroadcastAddress):
+ self.meter("source", str(network_header.source), "in").mark()
+
# Handle L3 protocols first
if network_header.destination == self.our_address and network_header.protocol == Protocol.CONTROL:
ctrl = ControlHeader.decode(stream)
- self.info(f"Got {ctrl} from {network_header.source}")
+ self.debug(f"< {network_header.source} {network_header.destination} {ctrl}")
if ctrl.control_type == ControlType.PING:
self.ping_protocol.handle_ping(network_header, ctrl)
+ elif ctrl.control_type == ControlType.UNREACHABLE:
+ self.warning(f"< UNREACHABLE")
else:
self.warning(f"Ignoring unsupported control packet: {ctrl.control_type}")
+ self.meter("drop", "unknown").mark()
return
if network_header.protocol == Protocol.HELLO:
@@ -322,17 +366,19 @@ def process_incoming(self, payload: L2Payload):
# Now decide if we should handle or drop
if self.header_cache.contains(hash(network_header)):
+ self.meter("drop", "duplicate").mark()
self.debug(f"Dropping duplicate {network_header}")
return
# If the packet is addressed to us, handle it
if network_header.destination == self.our_address:
- self.debug(f"Handling {network_header}")
+ # TODO check for supported protocols here?
+ self.debug(f"Handling L4 {network_header}")
self.l4_handlers.handle_l4(network_header, network_header.protocol, stream)
return
if network_header.destination == self.BroadcastAddress:
- self.debug(f"Handling broadcast {network_header}")
+ self.debug(f"Handling L4 broadcast {network_header}")
self.l4_handlers.handle_l4(network_header, network_header.protocol, stream)
if network_header.ttl > 1:
# Decrease the TTL and re-broadcast on all links except where we heard it
@@ -342,8 +388,12 @@ def process_incoming(self, payload: L2Payload):
stream.seek(0)
self.send(header_copy, stream.read(), exclude_link_id=payload.link_id)
else:
+ self.meter("drop", "ttl").mark()
self.debug("Not re-broadcasting due to TTL")
else:
+ # If not addressed to us, forward it along
+ self.meter("packets", "forward").mark()
+ self.debug(f"> {network_header.source} {network_header.destination} {network_header.protocol.name} Forwarding")
header_copy = dataclasses.replace(network_header, ttl=network_header.ttl - 1)
stream.seek(0)
header_copy.encode(stream)
@@ -351,11 +401,11 @@ def process_incoming(self, payload: L2Payload):
self.send(header_copy, stream.read(), exclude_link_id=payload.link_id)
def handle_hello(self, link_id: int, network_header: NetworkHeader, hello: HelloHeader):
- self.debug(f"Handling hello {hello}")
+ self.debug(f"< {hello}")
now = datetime.utcnow()
sender = network_header.source
if network_header.source not in self.neighbors:
- self.info(f"Saw new neighbor {sender} ({hello.name})")
+ self.info(f"New neighbor {sender} ({hello.name})")
self.neighbors[sender] = Neighbor(
address=sender,
name=hello.name,
@@ -365,28 +415,28 @@ def handle_hello(self, link_id: int, network_header: NetworkHeader, hello: Hello
last_update=now,
state=NeighborState.INIT
)
- self.our_link_state_epoch = next(self.our_link_state_epoch_generator)
- self.last_epoch_bump = now
else:
self.neighbors[sender].neighbors = hello.neighbors
self.neighbors[sender].last_seen = now
if self.our_address in hello.neighbors:
- delay = 100
if self.neighbors[sender].state != NeighborState.UP:
- self.info(f"Neighbor {sender} is UP!")
- self.scheduler.timer(delay, partial(self.send_query, sender), auto_start=True)
+ self.info(f"Neighbor {sender} ({self.neighbors[sender].name}) is UP!")
+ # Set last_query to the max query interval minus a bit
+ self.last_query = now - timedelta(seconds=(self.config.get_int("mesh.query.interval") - 10))
+ self.meter(f"neighbors.{sender}.up").mark()
self.neighbors[sender].state = NeighborState.UP
self.neighbors[sender].last_update = now
- delay *= 1.2
+ self.bump_epoch(now, "new neighbor")
else:
- self.info(f"Neighbor {sender} is initializing...")
+ self.debug(f"Neighbor {sender} is initializing...")
self.neighbors[sender].state = NeighborState.INIT
self.neighbors[sender].last_update = now
+ self.send_hello()
def send_hello(self):
- self.debug("Sending Hello")
hello = HelloHeader(self.config.get("host.name"), self.alive_neighbors())
+ self.debug(f"> {hello}")
network_header = NetworkHeader(
version=0,
qos=QoS.Lower,
@@ -409,8 +459,11 @@ def handle_advertisement(self, link_id: int, network_header: NetworkHeader, adve
if advert.node == self.our_address:
return
+ self.debug(f"< {advert}")
+
latest_epoch = self.link_state_epochs.get(advert.node)
if latest_epoch is None:
+ self.info(f"New link for node {advert.node}")
self.debug(f"Initializing link state for {advert.node} with epoch {advert.epoch}")
update = True
else:
@@ -420,6 +473,7 @@ def handle_advertisement(self, link_id: int, network_header: NetworkHeader, adve
self.debug(f"Updating link state for {advert.node}. "
f"Received epoch is {advert.epoch}, last known was {latest_epoch}")
elif epoch_cmp == 0:
+ self.info(f"Reset link for node {advert.node}")
self.debug(f"Resetting link state for {advert.node}. "
f"Received epoch is {advert.epoch}, last known was {latest_epoch}")
else:
@@ -461,7 +515,7 @@ def send_advertisement(self):
link_states=link_states
)
- self.debug("Sending Advertisement {}".format(advertisement))
+ self.debug(f"> {advertisement}")
network_header = NetworkHeader(
version=0,
@@ -482,7 +536,7 @@ def send_advertisement(self):
self.send(network_header, buffer)
def handle_query(self, link_id: int, network_header: NetworkHeader, query: LinkStateQueryHeader):
- self.debug(f"Handling {query}")
+ self.debug(f"< {query}")
dest = network_header.source
adverts = []
@@ -529,7 +583,7 @@ def handle_query(self, link_id: int, network_header: NetworkHeader, query: LinkS
dest = network_header.source
for advert in adverts:
- self.debug(f"Sending {advert} advert to {network_header.source}")
+ self.debug(f"> {advert}")
resp_header = NetworkHeader(
version=0,
qos=QoS.Lower,
@@ -549,7 +603,6 @@ def handle_query(self, link_id: int, network_header: NetworkHeader, query: LinkS
self.send(resp_header, buffer)
def send_query(self, neighbor: MeshAddress):
- self.debug(f"Querying {neighbor} for link states")
known_link_states = dict()
for node, link_states in self.valid_link_states().items():
known_link_states[node] = self.link_state_epochs[node]
@@ -560,6 +613,7 @@ def send_query(self, neighbor: MeshAddress):
link_nodes=list(known_link_states.keys()),
link_epochs=list(known_link_states.values())
)
+ self.debug(f"> {query}")
network_header = NetworkHeader(
version=0,
@@ -579,7 +633,7 @@ def send_query(self, neighbor: MeshAddress):
buffer = stream.read()
self.send(network_header, buffer)
- def route_to(self, address: MeshAddress) -> List[MeshAddress]:
+ def route_to(self, address: MeshAddress) -> Tuple[List[MeshAddress], int]:
g = nx.DiGraph()
# Other's links
for node, link_states in self.valid_link_states().items():
@@ -593,12 +647,12 @@ def route_to(self, address: MeshAddress) -> List[MeshAddress]:
try:
# Compute the shortest path
path = nx.dijkstra_path(g, self.our_address, address)
-
+ path_weight = nx.path_weight(g, path, "weight")
# Ensure we have a return path
nx.dijkstra_path(g, address, self.our_address)
- return path
+ return path, path_weight
except NetworkXException:
- return []
+ return [], 0
def send(self, header: NetworkHeader, buffer: bytes, exclude_link_id: Optional[int] = None):
"""
@@ -616,21 +670,31 @@ def send(self, header: NetworkHeader, buffer: bytes, exclude_link_id: Optional[i
neighbor = self.neighbors.get(header.destination)
links = [neighbor.link_id]
else:
- best_route = self.route_to(header.destination)
- self.debug(f"Routing {header} via {best_route}")
+ best_route, cost = self.route_to(header.destination)
+ self.debug(f"Routing {header} via {best_route} with cost {cost}")
if len(best_route) > 1:
next_hop = best_route[1]
hop_neighbor = self.neighbors.get(next_hop)
if hop_neighbor is not None:
links = [hop_neighbor.link_id]
+ self.hist("route", "cost").add(cost)
+ self.hist("route", "cost", str(header.destination)).add(cost)
else:
self.error(f"Calculated route including {next_hop}, but we're missing that neighbor.")
links = []
else:
self.warning(f"No route to {header.destination}, dropping.")
+ self.meter("drop", "no-route").mark()
+ self.meter("drop", "no-route", str(header.destination)).mark()
links = []
for link_id in links:
+ self.meter("protocols", "out", header.protocol.name).mark()
+ self.meter("bytes", "out").mark(len(buffer))
+ self.meter("packets", "out").mark()
+ self.hist("packets", "ttl").add(header.ttl)
+ self.meter("source", str(header.source), "out").mark()
+ self.meter("dest", str(header.destination), "out").mark()
payload = L3Payload(
source=header.source,
destination=header.destination,
@@ -639,7 +703,7 @@ def send(self, header: NetworkHeader, buffer: bytes, exclude_link_id: Optional[i
link_id=link_id,
qos=QoS(header.qos),
reliable=False)
- self.debug(f"Sending {payload}")
+ #self.debug(f"Sending {payload}")
self.link_multiplexer.offer(payload)
def failed_send(self, neighbor: MeshAddress):
@@ -655,7 +719,7 @@ def route_packet(self, address: L3Address) -> Tuple[bool, int]:
if address == MeshProtocol.BroadcastAddress:
return True, l3_mtu
else:
- path = self.route_to(cast(MeshAddress, address))
+ path, cost = self.route_to(cast(MeshAddress, address))
return len(path) > 0, l3_mtu
def send_packet(self, payload: L3Payload) -> bool:
diff --git a/tarpn/scheduler.py b/tarpn/scheduler.py
index 36936b4..f909e09 100644
--- a/tarpn/scheduler.py
+++ b/tarpn/scheduler.py
@@ -59,8 +59,10 @@ def submit(self, thread: CloseableThread):
thread.start()
self.threads.append(thread)
- def run(self, runnable: Callable[..., Any]):
- self._futures.append(self.executor.submit(runnable))
+ def run(self, runnable: Callable[..., Any]) -> Future:
+ fut = self.executor.submit(runnable)
+ self._futures.append(fut)
+ return fut
def add_shutdown_hook(self, runnable: Callable[..., Any]):
self.shutdown_tasks.append(runnable)
diff --git a/tarpn/settings.py b/tarpn/settings.py
index ee700eb..7461970 100644
--- a/tarpn/settings.py
+++ b/tarpn/settings.py
@@ -23,7 +23,8 @@
_default_port_settings = {
"port.enabled": True,
- "serial.timeout": 0.100
+ "serial.timeout": 0.100,
+ "serial.duplex": True
}
@@ -38,81 +39,6 @@ def _default_basedir(app_name):
return os.path.expanduser(os.path.join("~", "." + app_name.lower()))
-class Settings:
- def __init__(self, basedir: str = None, paths: List[str] = None, defaults: Dict = None):
- self._init_basedir(basedir)
- self._configfiles = [os.path.join(self._basedir, path) for path in paths]
- self._config: Optional[configparser.ConfigParser] = None
- if defaults is None:
- self._defaults = dict()
- else:
- self._defaults = defaults
- self.load()
-
- def _init_basedir(self, basedir):
- if basedir is not None:
- self._basedir = basedir
- else:
- self._basedir = _default_basedir("TARPN")
-
- if not os.path.isdir(self._basedir):
- try:
- os.makedirs(self._basedir)
- except Exception:
- print(f"Could not create base folder at {self._basedir}. This is a fatal error, TARPN "
- "cannot run without a writable base folder.")
- raise
-
- def load(self):
- self._config = configparser.ConfigParser(defaults=self._defaults,
- interpolation=configparser.ExtendedInterpolation(),
- inline_comment_prefixes=";",
- default_section="default")
- self._config.read_dict(_default_settings)
- for path in self._configfiles:
- if os.path.exists(path):
- self._config.read(path)
- else:
- raise RuntimeError(f"No such config file {path}")
-
- def save(self):
- # self._config.write()
- return
-
- def node_config(self):
- return NodeConfig(self._config["node"])
-
- def port_configs(self):
- ports = []
- for section in self._config.sections():
- m = re.match(r"port:(\d+)", section)
- if m:
- ports.append(int(m.group(1)))
- port_configs = []
- for port in ports:
- port_sect = self._config[f"port:{port}"]
- port_configs.append(PortConfig.from_dict(port, port_sect))
- return port_configs
-
- def network_configs(self):
- return NetworkConfig(self._config["network"])
-
- def app_configs(self):
- apps = []
- for section in self._config.sections():
- m = re.match(r"app:(\w[\w\d]*)", section)
- if m:
- apps.append(m.group(1))
- app_configs = []
- for app in apps:
- app_sect = self._config[f"app:{app}"]
- app_configs.append(AppConfig.from_dict(app, app_sect))
- return app_configs
-
- def config_section(self, name):
- return Config(name, self._config[name])
-
-
class Config(Mapping):
def __init__(self, section_name, config_section):
self._section = section_name
@@ -187,14 +113,17 @@ def admin_listen(self) -> str:
class PortConfig(Config):
- def __init__(self, port_id, port_config):
+ def __init__(self, port_id: int, port_config):
super().__init__(f"port:{port_id}", port_config)
self._port_id = port_id
- def port_id(self):
+ def port_id(self) -> int:
return self._port_id
- def port_type(self):
+ def port_name(self) -> str:
+ return super().get("port.name", f"port-{self._port_id}")
+
+ def port_type(self) -> str:
return super().get("port.type")
@classmethod
@@ -264,3 +193,78 @@ def from_dict(cls, app_name: str, configs: dict):
parser.read_dict({f"app:{app_name}": configs})
config = parser[f"app:{app_name}"]
return cls(app_name, config)
+
+
+class Settings:
+ def __init__(self, basedir: str = None, paths: List[str] = None, defaults: Dict = None):
+ self._init_basedir(basedir)
+ self._configfiles = [os.path.join(self._basedir, path) for path in paths]
+ self._config: Optional[configparser.ConfigParser] = None
+ if defaults is None:
+ self._defaults = dict()
+ else:
+ self._defaults = defaults
+ self.load()
+
+ def _init_basedir(self, basedir):
+ if basedir is not None:
+ self._basedir = basedir
+ else:
+ self._basedir = _default_basedir("TARPN")
+
+ if not os.path.isdir(self._basedir):
+ try:
+ os.makedirs(self._basedir)
+ except Exception:
+ print(f"Could not create base folder at {self._basedir}. This is a fatal error, TARPN "
+ "cannot run without a writable base folder.")
+ raise
+
+ def load(self):
+ self._config = configparser.ConfigParser(defaults=self._defaults,
+ interpolation=configparser.ExtendedInterpolation(),
+ inline_comment_prefixes=";",
+ default_section="default")
+ self._config.read_dict(_default_settings)
+ for path in self._configfiles:
+ if os.path.exists(path):
+ self._config.read(path)
+ else:
+ raise RuntimeError(f"No such config file {path}")
+
+ def save(self):
+ # self._config.write()
+ return
+
+ def node_config(self):
+ return NodeConfig(self._config["node"])
+
+ def port_configs(self) -> Dict[int, PortConfig]:
+ ports = []
+ for section in self._config.sections():
+ m = re.match(r"port:(\d+)", section)
+ if m:
+ ports.append(int(m.group(1)))
+ port_configs = {}
+ for port in ports:
+ port_sect = self._config[f"port:{port}"]
+ port_configs[port] = PortConfig.from_dict(port, port_sect)
+ return port_configs
+
+ def network_configs(self):
+ return NetworkConfig(self._config["network"])
+
+ def app_configs(self):
+ apps = []
+ for section in self._config.sections():
+ m = re.match(r"app:(\w[\w\d]*)", section)
+ if m:
+ apps.append(m.group(1))
+ app_configs = []
+ for app in apps:
+ app_sect = self._config[f"app:{app}"]
+ app_configs.append(AppConfig.from_dict(app, app_sect))
+ return app_configs
+
+ def config_section(self, name):
+ return Config(name, self._config[name])
diff --git a/tarpn/tools/shell.py b/tarpn/tools/shell.py
new file mode 100644
index 0000000..51d7ba9
--- /dev/null
+++ b/tarpn/tools/shell.py
@@ -0,0 +1,74 @@
+import argparse
+import asyncio
+import signal
+import sys
+from asyncio import Protocol, Transport
+from typing import Optional
+
+from tarpn.util import shutdown
+
+
+class TTY(Protocol):
+ def __init__(self):
+ self.transport: Optional[Transport] = None
+
+ def connection_made(self, transport: Transport):
+ self.transport = transport
+ sys.stderr.write(f"Connected to {transport.get_extra_info('peername')}\r\n")
+ sys.stderr.flush()
+
+ def connection_lost(self, exc):
+ sys.stderr.write(f"Lost connection to {self.transport.get_extra_info('peername')}\r\n")
+ sys.stderr.flush()
+ self.transport = None
+ signal.raise_signal(signal.SIGTERM)
+
+ def data_received(self, data: bytes) -> None:
+ msg = data.decode("utf-8")
+ for line in msg.splitlines():
+ if line.startswith("Welcome to the TARPN shell") or line.startswith("(tarpn)"):
+            continue
+ else:
+ sys.stdout.write(line.strip())
+ sys.stdout.flush()
+
+ def handle_stdin(self):
+ line = sys.stdin.readline()
+ if self.transport is not None:
+ if line == "": # Got a ^D
+ self.transport.close()
+ signal.raise_signal(signal.SIGTERM)
+ else:
+ line = line.strip()
+                self.transport.write((line + "\r\n").encode("utf-8"))
+ else:
+ sys.stdout.write("Not connected\r\n")
+ sys.stdout.flush()
+
+ def handle_signal(self, loop, scheduler):
+ if self.transport is not None:
+ self.transport.close()
+ asyncio.create_task(shutdown(loop))
+ scheduler.shutdown()
+
+
+async def async_main():
+ parser = argparse.ArgumentParser(description='Open a shell to a running tarpn-core')
+ parser.add_argument("sock", help="Path to unix socket")
+ parser.add_argument("cmd", help="Command to send")
+ args = parser.parse_args()
+
+ tty = TTY()
+ loop = asyncio.get_event_loop()
+ loop.add_reader(sys.stdin, tty.handle_stdin)
+ (transport, protocol) = await loop.create_unix_connection(lambda: tty, path=args.sock)
+ transport.write(args.cmd.encode("utf-8"))
+ await asyncio.sleep(1)
+
+
+def main():
+ asyncio.run(async_main())
+
+
+if __name__ == "__main__":
+ main()
\ No newline at end of file
diff --git a/tarpn/transport/mesh_l4.py b/tarpn/transport/mesh_l4.py
index 1ab06b7..d1179d2 100644
--- a/tarpn/transport/mesh_l4.py
+++ b/tarpn/transport/mesh_l4.py
@@ -6,7 +6,7 @@
from tarpn.log import LoggingMixin
from tarpn.network import L3Protocol, L3Address
from tarpn.network.mesh import MeshAddress
-from tarpn.network.mesh.header import DatagramHeader, Datagram, BroadcastHeader
+from tarpn.network.mesh.header import DatagramHeader, Datagram
from tarpn.transport.mesh.broadcast import BroadcastProtocol
from tarpn.transport.mesh.datagram import DatagramProtocol
from tarpn.transport import DatagramProtocol as DProtocol
@@ -107,8 +107,8 @@ def remote_address(self) -> Optional[L3Address]:
class BroadcastTransport(BTransport):
"""
- A channel for sending and receiving datagrams
- """
+ A channel for broadcasting datagrams
+ """
def __init__(self,
network: L3Protocol,
@@ -133,6 +133,7 @@ def close(self) -> None:
self.closing = True
def write(self, data: Any) -> None:
+ # Route only for the purposes of getting the MTU
_, mtu = self.network.route_packet(MeshAddress.parse("ff.ff"))
if isinstance(data, str):
encoded_data = data.encode("utf-8")
@@ -188,7 +189,10 @@ def connect(self, protocol_factory: Callable[[], DProtocol],
return protocol
def broadcast(self, protocol_factory: Callable[[], DProtocol],
- local_address: MeshAddress, port: int) -> DProtocol:
+ local_address: MeshAddress, port: int) -> DProtocol:
+ """
+ Create a broadcast transport for a given local address and port
+ """
if port in self.connections:
if self.connections[port][0].is_closing():
del self.connections[port]
@@ -196,7 +200,7 @@ def broadcast(self, protocol_factory: Callable[[], DProtocol],
raise RuntimeError(f"Connection to {port} is already open")
protocol = protocol_factory()
transport = BroadcastTransport(self.l3_protocol, self.broadcast_protocol, self.datagram_protocol, port,
- local_address, MeshAddress.parse("ff.ff"))
+ local_address, MeshAddress.parse("ff.ff"))
protocol.connection_made(transport)
self.connections[port] = (transport, protocol)
return protocol
diff --git a/tarpn/util.py b/tarpn/util.py
index ecfbe59..58f9f76 100644
--- a/tarpn/util.py
+++ b/tarpn/util.py
@@ -433,4 +433,8 @@ def lollipop_compare(old_epoch: int, new_epoch: int) -> int:
def secure_random_byte() -> int:
-    return struct.unpack('<B', os.urandom(1))[0]
+def secure_random_bytes(size: int) -> bytes:
+ return secrets.token_bytes(size)
\ No newline at end of file
diff --git a/tests/docker/Dockerfile b/tests/docker/Dockerfile
new file mode 100644
index 0000000..4d2d331
--- /dev/null
+++ b/tests/docker/Dockerfile
@@ -0,0 +1,23 @@
+FROM python:3.7-slim-bullseye
+
+RUN apt-get update; apt-get install -y socat netcat iproute2 dnsutils curl jq procps vim
+
+RUN curl -sSL https://get.docker.com/ | sh
+
+RUN pip install virtualenv
+
+RUN python3 -m virtualenv /opt/tarpn
+
+WORKDIR /opt/tarpn
+
+RUN ./bin/pip install pytest pytest-timeout
+
+COPY dist /dist
+
+COPY tests/docker/bin ./bin
+
+COPY tests/docker/test_curl.py ./tests/test_curl.py
+
+RUN ./bin/pip install /dist/*.whl
+
+CMD [ "./bin/tarpn-node" ]
diff --git a/tests/docker/README.md b/tests/docker/README.md
new file mode 100644
index 0000000..910f528
--- /dev/null
+++ b/tests/docker/README.md
@@ -0,0 +1,20 @@
+# Testing with Docker
+
+Build the test image from the project root directory:
+
+> docker build -f tests/docker/Dockerfile -t tarpn/tarpn-test:latest .
+
+Define a network as done in docker-compose.yml, run it
+
+> docker-compose -f tests/docker/docker-compose.yml up
+
+In a separate shell, run this docker-compose to simulate slow links
+
+> docker-compose -f tests/docker/docker-compose-add-latency.yml up
+
+Now, connect to the unix socket with curl from (yet another) docker container to inspect a node
+
+> docker run -v /tmp/socks:/tmp/socks tarpn/tarpn-test:latest curl --unix-socket /tmp/socks/tarpn-shell-alice.sock "http://dummy/metrics"
+
+The reason for running curl within a container is that there are some issues with a Mac host and linux Docker container
+sharing a domain socket. Not sure if it's curl or the socket itself.
diff --git a/tests/docker/bin/launch-tarpn.sh b/tests/docker/bin/launch-tarpn.sh
new file mode 100755
index 0000000..10a2009
--- /dev/null
+++ b/tests/docker/bin/launch-tarpn.sh
@@ -0,0 +1,32 @@
+#!/bin/bash
+
+# Launcher script for tarpn-node. Launches some number of socat processes before running tarpn-node.
+# The arguments for the socat commands are given as SOCAT_ARGS delimited by a pipe (|).
+
+IFS="|"
+i=0
+for SOCAT_ARG in $SOCAT_ARGS
+do
+ IFS=" "
+ socat -x -d -d $SOCAT_ARG > /var/log/socat-${i}.log 2>&1 &
+ IFS="|"
+ i=$((i+1))
+done
+
+if [[ -v SLEEP ]];
+then
+ sleep $SLEEP
+fi
+
+cleanup() {
+ killall socat
+}
+
+trap 'cleanup' SIGTERM
+
+if [[ -v DEBUG ]];
+then
+ exec env PYTHONFAULTHANDLER=true /opt/tarpn/bin/tarpn-node --verbose
+else
+ exec /opt/tarpn/bin/tarpn-node
+fi
diff --git a/tests/docker/bin/tc.sh b/tests/docker/bin/tc.sh
new file mode 100755
index 0000000..c08ada3
--- /dev/null
+++ b/tests/docker/bin/tc.sh
@@ -0,0 +1,42 @@
+#!/bin/bash
+
+OPTS=""
+
+if [[ ! -v DURATION ]];
+then
+ echo "Missing DURATION"
+ exit 1
+fi
+
+if [[ -v LOSS ]];
+then
+ OPTS="$OPTS loss $LOSS"
+fi
+
+if [[ -v DELAY ]];
+then
+ OPTS="$OPTS delay ${DELAY}ms ${DELAY_JITTER:-10}ms ${DELAY_CORRELATION:-20}.00"
+fi
+
+OPTS="$OPTS rate ${RATE:-1}kbit"
+
+docker ps
+
+for CONTAINER in $CONTAINERS
+do
+ echo "Running on $CONTAINER: tc qdisc add dev eth0 root netem $OPTS "
+ docker exec $CONTAINER tc qdisc add dev eth0 root netem $OPTS
+done
+
+cleanup() {
+ echo "cleanup!"
+ for CONTAINER in $CONTAINERS
+ do
+ echo "Running on $CONTAINER: tc qdisc del dev eth0 root netem"
+ docker exec $CONTAINER tc qdisc del dev eth0 root netem
+ done
+}
+
+trap 'cleanup' EXIT
+
+sleep $DURATION
\ No newline at end of file
diff --git a/tests/docker/config/alice.ini b/tests/docker/config/alice.ini
new file mode 100644
index 0000000..a5b1c58
--- /dev/null
+++ b/tests/docker/config/alice.ini
@@ -0,0 +1,33 @@
+[default]
+mycall=AL1CE
+
+[node]
+log.dir = logs
+log.config = config/logging.ini
+
+node.call = ${mycall}-9
+node.alias = ALICE
+node.sock = /tmp/socks/tarpn-shell-alice.sock
+
+[port:1]
+port.enabled = True
+port.type = serial
+port.framing = kiss
+port.bitrate = 9600
+kiss.checksum = false
+serial.device = /tmp/vmodem1
+serial.speed = 9600
+
+[network]
+host.name = alice
+mesh.enabled = True
+mesh.address = 00.aa
+mesh.ttl = 7
+
+[app:demo]
+app.address = mesh://ff.ff:100
+app.call = ${mycall}-7
+app.alias = DEMO
+app.sock = /tmp/socks/tarpn-demo-a.sock
+app.module = plugins.demo
+app.class = DemoApp
diff --git a/tests/docker/config/bob.ini b/tests/docker/config/bob.ini
new file mode 100644
index 0000000..8aa786c
--- /dev/null
+++ b/tests/docker/config/bob.ini
@@ -0,0 +1,43 @@
+[default]
+mycall=B0B
+
+[node]
+log.dir = logs
+log.config = config/logging.ini
+
+node.call = ${mycall}-9
+node.alias = BOB
+node.sock = /tmp/socks/tarpn-shell-bob.sock
+
+[port:1]
+port.enabled = True
+port.type = serial
+port.framing = kiss
+port.bitrate = 9600
+kiss.checksum = false
+serial.device = /tmp/vmodem1
+serial.speed = 9600
+
+[port:2]
+port.enabled = True
+port.type = serial
+port.framing = kiss
+port.bitrate = 9600
+kiss.checksum = false
+serial.device = /tmp/vmodem2
+serial.speed = 9600
+
+[network]
+host.name = bob
+mesh.enabled = True
+mesh.address = 00.ab
+mesh.ttl = 7
+
+
+[app:demo]
+app.address = mesh://ff.ff:100
+app.call = ${mycall}-7
+app.alias = DEMO
+app.sock = /tmp/socks/tarpn-demo-b.sock
+app.module = plugins.demo
+app.class = DemoApp
\ No newline at end of file
diff --git a/tests/docker/config/carol.ini b/tests/docker/config/carol.ini
new file mode 100644
index 0000000..7c1c690
--- /dev/null
+++ b/tests/docker/config/carol.ini
@@ -0,0 +1,51 @@
+[default]
+mycall=C4ROL
+
+[node]
+log.dir = logs
+log.config = config/logging.ini
+
+node.call = ${mycall}-9
+node.alias = CAROL
+node.sock = /tmp/socks/tarpn-shell-carol.sock
+
+[port:1]
+port.enabled = True
+port.type = serial
+port.framing = kiss
+port.bitrate = 9600
+kiss.checksum = false
+serial.device = /tmp/vmodem1
+serial.speed = 9600
+
+[port:2]
+port.enabled = True
+port.type = serial
+port.framing = kiss
+port.bitrate = 9600
+kiss.checksum = false
+serial.device = /tmp/vmodem2
+serial.speed = 9600
+
+[port:3]
+port.enabled = True
+port.type = serial
+port.framing = kiss
+port.bitrate = 9600
+kiss.checksum = false
+serial.device = /tmp/vmodem3
+serial.speed = 9600
+
+[network]
+host.name = carol
+mesh.enabled = True
+mesh.address = 00.ac
+mesh.ttl = 7
+
+[app:demo]
+app.address = mesh://ff.ff:100
+app.call = ${mycall}-7
+app.alias = DEMO
+app.sock = /tmp/socks/tarpn-demo-c.sock
+app.module = plugins.demo
+app.class = DemoApp
diff --git a/tests/docker/config/dave.ini b/tests/docker/config/dave.ini
new file mode 100644
index 0000000..96ba209
--- /dev/null
+++ b/tests/docker/config/dave.ini
@@ -0,0 +1,33 @@
+[default]
+mycall=D4VE
+
+[node]
+log.dir = logs
+log.config = config/logging.ini
+
+node.call = ${mycall}-9
+node.alias = DAVE
+node.sock = /tmp/socks/tarpn-shell-dave.sock
+
+[port:1]
+port.enabled = True
+port.type = serial
+port.framing = kiss
+port.bitrate = 9600
+kiss.checksum = false
+serial.device = /tmp/vmodem1
+serial.speed = 9600
+
+[network]
+host.name = dave
+mesh.enabled = True
+mesh.address = 00.ad
+mesh.ttl = 7
+
+[app:demo]
+app.address = mesh://ff.ff:100
+app.call = ${mycall}-7
+app.alias = DEMO
+app.sock = /tmp/socks/tarpn-demo-d.sock
+app.module = plugins.demo
+app.class = DemoApp
diff --git a/tests/docker/config/eve.ini b/tests/docker/config/eve.ini
new file mode 100644
index 0000000..c77563a
--- /dev/null
+++ b/tests/docker/config/eve.ini
@@ -0,0 +1,33 @@
+[default]
+mycall=E4E
+
+[node]
+log.dir = logs
+log.config = config/logging.ini
+
+node.call = ${mycall}-9
+node.alias = EVE
+node.sock = /tmp/socks/tarpn-shell-eve.sock
+
+[port:1]
+port.enabled = True
+port.type = serial
+port.framing = kiss
+port.bitrate = 9600
+kiss.checksum = false
+serial.device = /tmp/vmodem1
+serial.speed = 9600
+
+[network]
+host.name = eve
+mesh.enabled = True
+mesh.address = 00.ae
+mesh.ttl = 7
+
+[app:demo]
+app.address = mesh://ff.ff:100
+app.call = ${mycall}-7
+app.alias = DEMO
+app.sock = /tmp/socks/tarpn-demo-e.sock
+app.module = plugins.demo
+app.class = DemoApp
diff --git a/tests/docker/docker-compose-add-latency.yml b/tests/docker/docker-compose-add-latency.yml
new file mode 100644
index 0000000..6a2e0ef
--- /dev/null
+++ b/tests/docker/docker-compose-add-latency.yml
@@ -0,0 +1,14 @@
+services:
+ tc:
+ image: tarpn/tarpn-test:latest
+ environment:
+ - DURATION=300
+ #- LOSS=5
+ - DELAY=100
+ - CONTAINERS=alice bob carol dave eve
+ cap_add:
+ - NET_ADMIN
+ volumes:
+ - /tmp/socks/:/tmp/socks/
+ - /var/run/docker.sock:/var/run/docker.sock
+ entrypoint: /opt/tarpn/bin/tc.sh
diff --git a/tests/docker/docker-compose-curl-tests.yml b/tests/docker/docker-compose-curl-tests.yml
new file mode 100644
index 0000000..c1a2152
--- /dev/null
+++ b/tests/docker/docker-compose-curl-tests.yml
@@ -0,0 +1,10 @@
+services:
+ test:
+ container_name: test
+ depends_on:
+ alice:
+ condition: service_healthy
+ image: tarpn/tarpn-test:latest
+ volumes:
+ - /tmp/socks/:/tmp/socks/
+ command: /opt/tarpn/bin/pytest /opt/tarpn/tests/test_curl.py
diff --git a/tests/docker/docker-compose.yml b/tests/docker/docker-compose.yml
new file mode 100644
index 0000000..57af3c6
--- /dev/null
+++ b/tests/docker/docker-compose.yml
@@ -0,0 +1,76 @@
+services:
+ alice:
+ # Connected to bob
+ image: tarpn/tarpn-test:latest
+ container_name: alice
+ cap_add:
+ - NET_ADMIN
+ environment:
+ - SOCAT_ARGS=PTY,raw,echo=1,link=/tmp/vmodem1 udp-listen:12345
+ - SLEEP=1
+ - DEBUG=true
+ volumes:
+ - ./config/alice.ini:/opt/tarpn/config/node.ini
+ - /tmp/socks/:/tmp/socks/
+ command: /opt/tarpn/bin/launch-tarpn.sh
+ healthcheck:
+ test: [ "CMD", "curl", "--unix-socket", "/tmp/socks/tarpn-shell-alice.sock", "http://dummy/healthcheck" ]
+ interval: 10s
+ timeout: 5s
+ retries: 5
+ bob:
+ # Connected to alice and carol
+ image: tarpn/tarpn-test:latest
+ container_name: bob
+ cap_add:
+ - NET_ADMIN
+ environment:
+ - SOCAT_ARGS=PTY,raw,echo=1,link=/tmp/vmodem1 udp:alice:12345|PTY,raw,echo=1,link=/tmp/vmodem2 udp:carol:10000
+ - SLEEP=2
+ - DEBUG=true
+ volumes:
+ - ./config/bob.ini:/opt/tarpn/config/node.ini
+ - /tmp/socks/:/tmp/socks/
+ command: /opt/tarpn/bin/launch-tarpn.sh
+ carol:
+ # Connected to alice and dave and eve
+ image: tarpn/tarpn-test:latest
+ container_name: carol
+ cap_add:
+ - NET_ADMIN
+ environment:
+ - SOCAT_ARGS=PTY,raw,echo=1,link=/tmp/vmodem1 udp-listen:10000|PTY,raw,echo=1,link=/tmp/vmodem2 udp-listen:10001|PTY,raw,echo=1,link=/tmp/vmodem3 udp:eve:10002
+ - SLEEP=3
+ - DEBUG=true
+ volumes:
+ - ./config/carol.ini:/opt/tarpn/config/node.ini
+ - /tmp/socks/:/tmp/socks/
+ command: /opt/tarpn/bin/launch-tarpn.sh
+ dave:
+ # Connected to carol
+ image: tarpn/tarpn-test:latest
+ container_name: dave
+ cap_add:
+ - NET_ADMIN
+ environment:
+ - SOCAT_ARGS=PTY,raw,echo=1,link=/tmp/vmodem1 udp:carol:10001
+ - SLEEP=1
+ - DEBUG=true
+ volumes:
+ - ./config/dave.ini:/opt/tarpn/config/node.ini
+ - /tmp/socks/:/tmp/socks/
+ command: /opt/tarpn/bin/launch-tarpn.sh
+ eve:
+ # Connected to carol
+ image: tarpn/tarpn-test:latest
+ container_name: eve
+ cap_add:
+ - NET_ADMIN
+ environment:
+ - SOCAT_ARGS=PTY,raw,echo=1,link=/tmp/vmodem1 udp-listen:10002
+ - SLEEP=2
+ - DEBUG=true
+ volumes:
+ - ./config/eve.ini:/opt/tarpn/config/node.ini
+ - /tmp/socks/:/tmp/socks/
+      command: /opt/tarpn/bin/launch-tarpn.sh
diff --git a/tests/docker/test_curl.py b/tests/docker/test_curl.py
new file mode 100644
index 0000000..9a73895
--- /dev/null
+++ b/tests/docker/test_curl.py
@@ -0,0 +1,41 @@
+import json
+import shlex
+import subprocess
+import unittest
+
+import pytest
+
+
+def curl(sock: str, url: str):  # HTTP GET over a unix-domain socket via the curl CLI; returns the raw response body as bytes
+    command = f"curl --retry 5 --max-time 10 --retry-delay 0 --retry-max-time 40 -v --unix-socket {sock} {url}"
+    result = subprocess.run(shlex.split(command), stdout=subprocess.PIPE)  # only stdout is captured; -v diagnostics go to stderr
+    return result.stdout  # NOTE(review): exit status is not checked — returns empty bytes if curl fails
+
+
+class DockerTest(unittest.TestCase):  # integration tests run against the docker-compose tarpn network via mounted unix sockets
+    def test_alice_alive(self):  # alice's HTTP shell answers its healthcheck endpoint
+        result = curl("/tmp/socks/tarpn-shell-alice.sock", "http://dummy/healthcheck")
+        assert result == b"ok"
+
+    def test_bob_alive(self):  # bob's HTTP shell answers its healthcheck endpoint
+        result = curl("/tmp/socks/tarpn-shell-bob.sock", "http://dummy/healthcheck")
+        assert result == b"ok"
+
+    def test_carol_alive(self):  # carol's HTTP shell answers its healthcheck endpoint
+        result = curl("/tmp/socks/tarpn-shell-carol.sock", "http://dummy/healthcheck")
+        assert result == b"ok"
+
+    @pytest.mark.timeout(120)  # requires the pytest-timeout plugin; aborts the poll loop if the mesh never converges
+    def test_convergence(self):  # poll each node's /network view until alice, bob and carol all report exactly 2 entries under "nodes"
+        done = False  # NOTE(review): never set to True — the loop exits only via break or the timeout above
+        while not done:  # busy poll; per-iteration pacing comes only from curl's own retry/--max-time budget
+            alice_net = curl("/tmp/socks/tarpn-shell-alice.sock", "http://dummy/network")
+            bob_net = curl("/tmp/socks/tarpn-shell-bob.sock", "http://dummy/network")
+            carol_net = curl("/tmp/socks/tarpn-shell-carol.sock", "http://dummy/network")
+
+            alice_node_data = json.loads(alice_net).get("nodes", [])
+            bob_node_data = json.loads(bob_net).get("nodes", [])
+            carol_node_data = json.loads(carol_net).get("nodes", [])
+
+            if len(alice_node_data) == 2 and len(bob_node_data) == 2 and len(carol_node_data) == 2:
+                break