From 0da908da764d98ef6f4357be121a37fca5e40e39 Mon Sep 17 00:00:00 2001 From: David Arthur Date: Mon, 3 Jan 2022 19:50:32 -0500 Subject: [PATCH 01/16] Add Docker build --- .github/workflows/build.yml | 17 ++++++++++++++++- Dockerfile | 4 +--- tarpn/io/udp.py | 1 - tarpn/main.py | 3 ++- tests/docker/Dockerfile | 17 +++++++++++++++++ tests/docker/README.md | 17 +++++++++++++++++ tests/docker/bin/launch-tarpn.sh | 14 ++++++++++++++ 7 files changed, 67 insertions(+), 6 deletions(-) create mode 100644 tests/docker/Dockerfile create mode 100644 tests/docker/README.md create mode 100755 tests/docker/bin/launch-tarpn.sh diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 8fa660f..8f54634 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -81,5 +81,20 @@ jobs: prerelease: true env: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} - + docker-build: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + - name: Set up QEMU + id: qemu + uses: docker/setup-qemu-action@v1 + with: + platforms: amd64,arm,arm64 + - name: Docker build + id: docker_build + uses: docker/build-push-action@v2 + with: + push: false + tags: tarpn/tarpn-core:latest + diff --git a/Dockerfile b/Dockerfile index b8918d7..97bd14d 100644 --- a/Dockerfile +++ b/Dockerfile @@ -4,10 +4,8 @@ RUN pip install virtualenv RUN python3 -m virtualenv /opt/tarpn -ADD dist /dist - WORKDIR /opt/tarpn -RUN ./bin/pip install /dist/*.whl +RUN ./bin/pip install https://github.com/tarpn/tarpn-node-controller/releases/download/v0.1.4/tarpn-core-0.1.4.tar.gz CMD [ "./bin/tarpn-node" ] diff --git a/tarpn/io/udp.py b/tarpn/io/udp.py index 511d278..93025af 100644 --- a/tarpn/io/udp.py +++ b/tarpn/io/udp.py @@ -11,7 +11,6 @@ from tarpn.log import LoggingMixin from tarpn.scheduler import CloseableThread - packet_logger = logging.getLogger("packet") udp_logger = logging.getLogger("udp") diff --git a/tarpn/main.py b/tarpn/main.py index 0d674a1..2ce26e1 100644 --- a/tarpn/main.py +++ b/tarpn/main.py @@ -67,7 +67,7 @@ def run_node(args): "https://github.com/tarpn/tarpn-node-controller") sys.exit(1) else: - print(f"Loaded configuration for {node_call}") + print(f"Loaded configuration for {node_call} from {args.config}") # Setup logging logging_config_file = node_settings.get("log.config", "not_set") @@ -75,6 +75,7 @@ def run_node(args): log_dir = node_settings.get("log.dir") if not os.path.exists(log_dir): os.makedirs(log_dir) + print(f"Loading log configuration from {logging_config_file}") logging.config.fileConfig( logging_config_file, defaults={"log.dir": log_dir}, disable_existing_loggers=False) diff --git a/tests/docker/Dockerfile b/tests/docker/Dockerfile new file mode 100644 index 0000000..8e3b5ac --- /dev/null +++ b/tests/docker/Dockerfile @@ -0,0 +1,17 @@ +FROM python:3.7-slim-bullseye + +RUN apt-get update; apt-get install -y socat netcat iproute2 + +RUN pip install virtualenv + +RUN python3 -m virtualenv /opt/tarpn + +COPY dist /dist + +COPY tests/docker/bin /opt/tarpn/bin + +WORKDIR /opt/tarpn + +RUN ./bin/pip install /dist/*.whl + +CMD [ "./bin/tarpn-node" ] diff --git a/tests/docker/README.md b/tests/docker/README.md new file mode 100644 index 0000000..1307039 --- /dev/null +++ b/tests/docker/README.md @@ -0,0 +1,17 @@ +# Testing with Docker + +Build the test image from the project root directory: + +> docker build -f tests/docker/Dockerfile -t tarpn/tarpn-test:latest . + +Define a network as done in docker-compose.yml, run it + +> docker-compose -f tests/docker/docker-compose.yml up + +Connect to one of the nodes using a domain socket + +> docker run -i -t -v /tmp/socks/:/tmp/socks/ tarpn/tarpn-core:latest nc -U /tmp/socks/tarpn-shell-david.sock + +Inject some network errors using Pumba + +> docker run -it --rm -v /var/run/docker.sock:/var/run/docker.sock gaiaadm/pumba --interval=5m --log-level=info netem --duration 100s loss -p 100 tarpn-node-controller_david_1 \ No newline at end of file diff --git a/tests/docker/bin/launch-tarpn.sh b/tests/docker/bin/launch-tarpn.sh new file mode 100755 index 0000000..640f30d --- /dev/null +++ b/tests/docker/bin/launch-tarpn.sh @@ -0,0 +1,14 @@ +#!/bin/bash + +# Launcher script for tarpn-node. Launches some number of socat processes before running tarpn-node. +# The arguments for the socat commands are given as SOCAT_ARGS delimited by a pipe (|). + +IFS="|" +for SOCAT_ARG in $SOCAT_ARGS +do + IFS=" " + socat $SOCAT_ARG & + IFS="|" +done + +/opt/tarpn/bin/tarpn-node From dc83cd55df484a1331a3b884e2664389e4880ad6 Mon Sep 17 00:00:00 2001 From: David Arthur Date: Mon, 3 Jan 2022 20:08:23 -0500 Subject: [PATCH 02/16] Add test docker image --- .github/workflows/build.yml | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 8f54634..9055ed9 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -96,5 +96,12 @@ jobs: with: push: false tags: tarpn/tarpn-core:latest + - name: Docker test build + id: docker_build + uses: docker/build-push-action@v2 + with: + push: false + file: tests/docker/Dockerfile + tags: tarpn/tarpn-test:latest From 0e928f7c8519babb3c69cf75c8b2cf3915cb301b Mon Sep 17 00:00:00 2001 From: David Arthur Date: Mon, 3 Jan 2022 20:18:55 -0500 Subject: [PATCH 03/16] Split into two workflows --- .github/workflows/build.yml | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 9055ed9..4d2b13a 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -90,12 +90,19 @@ jobs: uses: docker/setup-qemu-action@v1 with: platforms: amd64,arm,arm64 + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v1 - name: Docker build id: docker_build uses: docker/build-push-action@v2 with: push: false tags: tarpn/tarpn-core:latest + cache-from: type=registry,ref=user/app:latest + cache-to: type=inline + docker-test: + runs-on: ubuntu-latest + steps: - name: Docker test build id: docker_build uses: docker/build-push-action@v2 @@ -103,5 +110,7 @@ jobs: push: false file: tests/docker/Dockerfile tags: tarpn/tarpn-test:latest + cache-from: type=registry,ref=user/app:latest + cache-to: type=inline From 4a8f4e84c895505967a247d3393eb121282e7647 Mon Sep 17 00:00:00 2001 From: David Arthur Date: Mon, 3 Jan 2022 20:21:22 -0500 Subject: [PATCH 04/16] Fix cache name --- .github/workflows/build.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 4d2b13a..1e32d1e 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -98,7 +98,7 @@ jobs: with: push: false tags: tarpn/tarpn-core:latest - cache-from: type=registry,ref=user/app:latest + cache-from: type=registry,ref=tarpn/tarpn-core:latest cache-to: type=inline docker-test: runs-on: ubuntu-latest @@ -110,7 +110,7 @@ jobs: push: false file: tests/docker/Dockerfile tags: tarpn/tarpn-test:latest - cache-from: type=registry,ref=user/app:latest + cache-from: type=registry,ref=tarpn/tarpn-core:latest cache-to: type=inline From d7789d8e33ff7f9b625c0ba8a5ce502ef5477e9c Mon Sep 17 00:00:00 2001 From: David Arthur Date: Mon, 3 Jan 2022 20:50:52 -0500 Subject: [PATCH 05/16] Try registry cache --- .github/workflows/build.yml | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 1e32d1e..8c8bf4b 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -98,11 +98,13 @@ jobs: with: push: false tags: tarpn/tarpn-core:latest - cache-from: type=registry,ref=tarpn/tarpn-core:latest - cache-to: type=inline + cache-from: type=registry,ref=tarpn/tarpn-core:buildcache + cache-to: type=registry,ref=tarpn/tarpn-core:buildcache,mode=max docker-test: runs-on: ubuntu-latest + needs: docker-build steps: + - uses: actions/checkout@v2 - name: Docker test build id: docker_build uses: docker/build-push-action@v2 @@ -110,7 +112,7 @@ jobs: push: false file: tests/docker/Dockerfile tags: tarpn/tarpn-test:latest - cache-from: type=registry,ref=tarpn/tarpn-core:latest - cache-to: type=inline + cache-from: type=registry,ref=tarpn/tarpn-core:buildcache + cache-to: type=registry,ref=tarpn/tarpn-core:buildcache,mode=max From fd1408b9918dd5ac6197b8e2560265d96385772e Mon Sep 17 00:00:00 2001 From: David Arthur Date: Mon, 3 Jan 2022 20:55:50 -0500 Subject: [PATCH 06/16] Try inline again --- .github/workflows/build.yml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 8c8bf4b..ea1891a 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -98,8 +98,8 @@ jobs: with: push: false tags: tarpn/tarpn-core:latest - cache-from: type=registry,ref=tarpn/tarpn-core:buildcache - cache-to: type=registry,ref=tarpn/tarpn-core:buildcache,mode=max + cache-from: type=registry,ref=tarpn/tarpn-core:latest + cache-to: type=inline docker-test: runs-on: ubuntu-latest needs: docker-build @@ -112,7 +112,7 @@ jobs: push: false file: tests/docker/Dockerfile tags: tarpn/tarpn-test:latest - cache-from: type=registry,ref=tarpn/tarpn-core:buildcache - cache-to: type=registry,ref=tarpn/tarpn-core:buildcache,mode=max + cache-from: type=inline + cache-to: type=inline From a1efaa426adb31712b045c5deb2db9cb6d5a9a03 Mon Sep 17 00:00:00 2001 From: David Arthur Date: Mon, 3 Jan 2022 20:59:13 -0500 Subject: [PATCH 07/16] fixup test dockerfile --- .github/workflows/build.yml | 2 +- tests/docker/Dockerfile | 10 +--------- 2 files changed, 2 insertions(+), 10 deletions(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index ea1891a..4a79ca3 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -112,7 +112,7 @@ jobs: push: false file: tests/docker/Dockerfile tags: tarpn/tarpn-test:latest - cache-from: type=inline + cache-from: type=registry,ref=tarpn/tarpn-core:latest cache-to: type=inline diff --git a/tests/docker/Dockerfile b/tests/docker/Dockerfile index 8e3b5ac..ba11292 100644 --- a/tests/docker/Dockerfile +++ b/tests/docker/Dockerfile @@ -1,17 +1,9 @@ -FROM python:3.7-slim-bullseye +FROM tarpn/tarpn-core:latest RUN apt-get update; apt-get install -y socat netcat iproute2 -RUN pip install virtualenv - -RUN python3 -m virtualenv /opt/tarpn - -COPY dist /dist - COPY tests/docker/bin /opt/tarpn/bin WORKDIR /opt/tarpn -RUN ./bin/pip install /dist/*.whl - CMD [ "./bin/tarpn-node" ] From f3743a11f8629212391d285ed0ef0a9802907086 Mon Sep 17 00:00:00 2001 From: David Arthur Date: Mon, 3 Jan 2022 21:10:37 -0500 Subject: [PATCH 08/16] Trying a different approach --- .github/workflows/build.yml | 15 ++++++++------- tests/docker/Dockerfile | 10 +++++++++- 2 files changed, 17 insertions(+), 8 deletions(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 4a79ca3..8e29261 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -81,7 +81,7 @@ jobs: prerelease: true env: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} - docker-build: + docker-release-build: runs-on: ubuntu-latest steps: - uses: actions/checkout@v2 @@ -98,21 +98,22 @@ jobs: with: push: false tags: tarpn/tarpn-core:latest - cache-from: type=registry,ref=tarpn/tarpn-core:latest - cache-to: type=inline - docker-test: + docker-source-build: runs-on: ubuntu-latest - needs: docker-build steps: - uses: actions/checkout@v2 + - name: Set up QEMU + id: qemu + uses: docker/setup-qemu-action@v1 + with: + platforms: amd64,arm,arm64 - name: Docker test build id: docker_build uses: docker/build-push-action@v2 with: push: false + context: . file: tests/docker/Dockerfile tags: tarpn/tarpn-test:latest - cache-from: type=registry,ref=tarpn/tarpn-core:latest - cache-to: type=inline diff --git a/tests/docker/Dockerfile b/tests/docker/Dockerfile index ba11292..8e3b5ac 100644 --- a/tests/docker/Dockerfile +++ b/tests/docker/Dockerfile @@ -1,9 +1,17 @@ -FROM tarpn/tarpn-core:latest +FROM python:3.7-slim-bullseye RUN apt-get update; apt-get install -y socat netcat iproute2 +RUN pip install virtualenv + +RUN python3 -m virtualenv /opt/tarpn + +COPY dist /dist + COPY tests/docker/bin /opt/tarpn/bin WORKDIR /opt/tarpn +RUN ./bin/pip install /dist/*.whl + CMD [ "./bin/tarpn-node" ] From 9eab045445f88e03688280cc6c0f60200cd36396 Mon Sep 17 00:00:00 2001 From: David Arthur Date: Mon, 3 Jan 2022 21:13:56 -0500 Subject: [PATCH 09/16] add missing buildx --- .github/workflows/build.yml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 8e29261..e27f636 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -107,7 +107,9 @@ jobs: uses: docker/setup-qemu-action@v1 with: platforms: amd64,arm,arm64 - - name: Docker test build + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v1 + - name: Docker build id: docker_build uses: docker/build-push-action@v2 with: From daf7ad78fceead88ca0900e596f9d104236b94c0 Mon Sep 17 00:00:00 2001 From: David Arthur Date: Mon, 3 Jan 2022 21:17:20 -0500 Subject: [PATCH 10/16] Add compile step for source image --- .github/workflows/build.yml | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index e27f636..74ec60b 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -102,6 +102,15 @@ jobs: runs-on: ubuntu-latest steps: - uses: actions/checkout@v2 + - name: Install dependencies + run: | + python3.7 -m pip install --upgrade pip + python3.7 -m pip install virtualenv + python3.7 -m virtualenv venv + source venv/bin/activate + pip install -e .[develop] + python setup.py bdist_wheel + python setup.py sdist - name: Set up QEMU id: qemu uses: docker/setup-qemu-action@v1 From 0f8e58fc45a446664a8ba8e37fc4e84841320203 Mon Sep 17 00:00:00 2001 From: David Arthur Date: Mon, 3 Jan 2022 21:18:23 -0500 Subject: [PATCH 11/16] install python --- .github/workflows/build.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 74ec60b..89c73d4 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -104,6 +104,7 @@ jobs: - uses: actions/checkout@v2 - name: Install dependencies run: | + sudo apt-get install -y python3.7 python3.7 -m pip install --upgrade pip python3.7 -m pip install virtualenv python3.7 -m virtualenv venv From 76dab13f189311e77df72f9a0e8eaecede3b95a9 Mon Sep 17 00:00:00 2001 From: David Arthur Date: Mon, 3 Jan 2022 21:19:21 -0500 Subject: [PATCH 12/16] fix python name --- .github/workflows/build.yml | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 89c73d4..fe2e47d 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -104,10 +104,9 @@ jobs: - uses: actions/checkout@v2 - name: Install dependencies run: | - sudo apt-get install -y python3.7 - python3.7 -m pip install --upgrade pip - python3.7 -m pip install virtualenv - python3.7 -m virtualenv venv + python3 -m pip install --upgrade pip + python3 -m pip install virtualenv + python3 -m virtualenv venv source venv/bin/activate pip install -e .[develop] python setup.py bdist_wheel From e5da53b0613bdb9c6022b4e454af6ec286930a52 Mon Sep 17 00:00:00 2001 From: David Arthur Date: Mon, 3 Jan 2022 21:21:52 -0500 Subject: [PATCH 13/16] add buildx platforms --- .github/workflows/build.yml | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index fe2e47d..3b7042c 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -88,8 +88,6 @@ jobs: - name: Set up QEMU id: qemu uses: docker/setup-qemu-action@v1 - with: - platforms: amd64,arm,arm64 - name: Set up Docker Buildx uses: docker/setup-buildx-action@v1 - name: Docker build @@ -98,6 +96,10 @@ jobs: with: push: false tags: tarpn/tarpn-core:latest + platforms: | + linux/amd64 + linux/arm/v7 + linux/arm64 docker-source-build: runs-on: ubuntu-latest steps: @@ -126,5 +128,9 @@ jobs: context: . file: tests/docker/Dockerfile tags: tarpn/tarpn-test:latest + platforms: | + linux/amd64 + linux/arm/v7 + linux/arm64 From 6e8a258ae89d93d129633701bd5e221b0b606fa3 Mon Sep 17 00:00:00 2001 From: David Arthur Date: Mon, 3 Jan 2022 21:36:29 -0500 Subject: [PATCH 14/16] Add docker compose file for network testing --- setup.py | 3 +- tarpn/main.py | 2 +- tarpn/network/mesh/protocol.py | 27 +++++++----- tarpn/tools/shell.py | 74 ++++++++++++++++++++++++++++++++ tarpn/transport/mesh_l4.py | 14 +++--- tests/docker/Dockerfile | 2 +- tests/docker/README.md | 5 ++- tests/docker/bin/launch-tarpn.sh | 5 +++ tests/docker/config/alice.ini | 43 +++++++++++++++++++ tests/docker/config/bob.ini | 35 +++++++++++++++ tests/docker/config/carol.ini | 48 +++++++++++++++++++++ tests/docker/config/network.yml | 0 tests/docker/docker-compose.yml | 27 ++++++++++++ tests/docker/smoke-test.bats | 0 14 files changed, 266 insertions(+), 19 deletions(-) create mode 100644 tarpn/tools/shell.py create mode 100644 tests/docker/config/alice.ini create mode 100644 tests/docker/config/bob.ini create mode 100644 tests/docker/config/carol.ini create mode 100644 tests/docker/config/network.yml create mode 100644 tests/docker/docker-compose.yml create mode 100644 tests/docker/smoke-test.bats diff --git a/setup.py b/setup.py index e8967e3..e3cd1fe 100644 --- a/setup.py +++ b/setup.py @@ -49,7 +49,8 @@ def read_file_contents(path): 'tarpn-serial-dump = tarpn.tools.serial_dump:main', 'tarpn-packet-dump = tarpn.tools.packet_dump:main', 'tarpn-node = tarpn.main:main', - 'tarpn-app = tarpn.app.runner:main' + 'tarpn-app = tarpn.app.runner:main', + 'tarpn-shell = tarpn.tools.shell:main' ]}, python_requires='>=3.7', install_requires=[ diff --git a/tarpn/main.py b/tarpn/main.py index 2ce26e1..d532ced 100644 --- a/tarpn/main.py +++ b/tarpn/main.py @@ -182,7 +182,7 @@ def run_node(args): scheduler.submit(UnixServerThread(app_config.app_socket(), app_protocol)) multiplexer_protocol = partial(MultiplexingProtocol, app_multiplexer) # TODO bind or connect? - mesh_l4.connect(multiplexer_protocol, app_address.address, MeshAddress.parse("00.a2"), app_address.port) + mesh_l4.connect(multiplexer_protocol, app_address.address, app_address.address, app_address.port) sock = node_settings.get("node.sock") print(f"Binding node terminal to {sock}") diff --git a/tarpn/network/mesh/protocol.py b/tarpn/network/mesh/protocol.py index aebd96c..fbe71b3 100644 --- a/tarpn/network/mesh/protocol.py +++ b/tarpn/network/mesh/protocol.py @@ -177,7 +177,7 @@ def can_handle(self, protocol: int) -> bool: def pre_close(self): # Erase our neighbors and send ADVERT self.neighbors.clear() - self.our_link_state_epoch = next(self.our_link_state_epoch_generator) + self.bump_epoch(datetime.utcnow(), "closing") self.send_advertisement() time.sleep(1) # TODO better solution is to wait for L3 queue to drain in close @@ -213,6 +213,12 @@ def wakeup(self): """Wake up the main thread""" self.queue.put(None) + def bump_epoch(self, now: datetime, reason: str = ""): + if now != self.last_epoch_bump: + self.our_link_state_epoch = next(self.our_link_state_epoch_generator) + self.last_epoch_bump = now + self.debug(f"Bumping our epoch to {self.our_link_state_epoch} due to {reason}") + def check_dead_neighbors(self, now: datetime) -> int: min_deadline = self.config.get_int("mesh.dead.interval") for neighbor in list(self.neighbors.values()): @@ -224,8 +230,7 @@ def check_dead_neighbors(self, now: datetime) -> int: self.info(f"Neighbor {neighbor.address} detected as DOWN!") neighbor.state = NeighborState.DOWN - self.our_link_state_epoch = next(self.our_link_state_epoch_generator) - self.last_epoch_bump = datetime.utcnow() + self.bump_epoch(datetime.utcnow(), "dead neighbor") self.last_advert = datetime.fromtimestamp(0) # Force our advert to go out else: min_deadline = min(deadline, min_deadline) @@ -246,6 +251,7 @@ def check_epoch(self, now: datetime) -> int: for node, links in self.link_states.items(): for link in list(links): if (now - link.created).seconds > max_age: + self.info(f"Lost link to {node}") self.debug(f"Expiring link state {link} for {node}") links.remove(link) if len(links) == 0: @@ -253,11 +259,11 @@ def check_epoch(self, now: datetime) -> int: for node in to_delete: del self.link_states[node] + del self.link_state_epochs[node] deadline = int(max_age * .80) - (now - self.last_epoch_bump).seconds if deadline <= 0: - self.our_link_state_epoch = next(self.our_link_state_epoch_generator) - self.last_epoch_bump = now + self.bump_epoch(now, "max epoch age") self.send_advertisement() return int(max_age * .80) else: @@ -355,7 +361,7 @@ def handle_hello(self, link_id: int, network_header: NetworkHeader, hello: Hello now = datetime.utcnow() sender = network_header.source if network_header.source not in self.neighbors: - self.info(f"Saw new neighbor {sender} ({hello.name})") + self.info(f"New neighbor {sender} ({hello.name})") self.neighbors[sender] = Neighbor( address=sender, name=hello.name, @@ -365,8 +371,7 @@ def handle_hello(self, link_id: int, network_header: NetworkHeader, hello: Hello last_update=now, state=NeighborState.INIT ) - self.our_link_state_epoch = next(self.our_link_state_epoch_generator) - self.last_epoch_bump = now + self.bump_epoch(now, "new neighbor") else: self.neighbors[sender].neighbors = hello.neighbors self.neighbors[sender].last_seen = now @@ -374,13 +379,13 @@ def handle_hello(self, link_id: int, network_header: NetworkHeader, hello: Hello if self.our_address in hello.neighbors: delay = 100 if self.neighbors[sender].state != NeighborState.UP: - self.info(f"Neighbor {sender} is UP!") + self.info(f"Neighbor {sender} ({self.neighbors[sender].name}) is UP!") self.scheduler.timer(delay, partial(self.send_query, sender), auto_start=True) self.neighbors[sender].state = NeighborState.UP self.neighbors[sender].last_update = now delay *= 1.2 else: - self.info(f"Neighbor {sender} is initializing...") + self.debug(f"Neighbor {sender} is initializing...") self.neighbors[sender].state = NeighborState.INIT self.neighbors[sender].last_update = now @@ -411,6 +416,7 @@ def handle_advertisement(self, link_id: int, network_header: NetworkHeader, adve latest_epoch = self.link_state_epochs.get(advert.node) if latest_epoch is None: + self.info(f"New link for node {advert.node}") self.debug(f"Initializing link state for {advert.node} with epoch {advert.epoch}") update = True else: @@ -420,6 +426,7 @@ def handle_advertisement(self, link_id: int, network_header: NetworkHeader, adve self.debug(f"Updating link state for {advert.node}. " f"Received epoch is {advert.epoch}, last known was {latest_epoch}") elif epoch_cmp == 0: + self.info(f"Reset link for node {advert.node}") self.debug(f"Resetting link state for {advert.node}. " f"Received epoch is {advert.epoch}, last known was {latest_epoch}") else: diff --git a/tarpn/tools/shell.py b/tarpn/tools/shell.py new file mode 100644 index 0000000..51d7ba9 --- /dev/null +++ b/tarpn/tools/shell.py @@ -0,0 +1,74 @@ +import argparse +import asyncio +import signal +import sys +from asyncio import Protocol, Transport +from typing import Optional + +from tarpn.util import shutdown + + +class TTY(Protocol): + def __init__(self): + self.transport: Optional[Transport] = None + + def connection_made(self, transport: Transport): + self.transport = transport + sys.stderr.write(f"Connected to {transport.get_extra_info('peername')}\r\n") + sys.stderr.flush() + + def connection_lost(self, exc): + sys.stderr.write(f"Lost connection to {self.transport.get_extra_info('peername')}\r\n") + sys.stderr.flush() + self.transport = None + signal.raise_signal(signal.SIGTERM) + + def data_received(self, data: bytes) -> None: + msg = data.decode("utf-8") + for line in msg.splitlines(): + if line.startswith("Welcome to the TARPN shell") or line.startswith("(tarpn)"): + return + else: + sys.stdout.write(line.strip()) + sys.stdout.flush() + + def handle_stdin(self): + line = sys.stdin.readline() + if self.transport is not None: + if line == "": # Got a ^D + self.transport.close() + signal.raise_signal(signal.SIGTERM) + else: + line = line.strip() + self.transport.write(line + "\r\n") + else: + sys.stdout.write("Not connected\r\n") + sys.stdout.flush() + + def handle_signal(self, loop, scheduler): + if self.transport is not None: + self.transport.close() + asyncio.create_task(shutdown(loop)) + scheduler.shutdown() + + +async def async_main(): + parser = argparse.ArgumentParser(description='Open a shell to a running tarpn-core') + parser.add_argument("sock", help="Path to unix socket") + parser.add_argument("cmd", help="Command to send") + args = parser.parse_args() + + tty = TTY() + loop = asyncio.get_event_loop() + loop.add_reader(sys.stdin, tty.handle_stdin) + (transport, protocol) = await loop.create_unix_connection(lambda: tty, path=args.sock) + transport.write(args.cmd.encode("utf-8")) + await asyncio.sleep(1) + + +def main(): + asyncio.run(async_main()) + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/tarpn/transport/mesh_l4.py b/tarpn/transport/mesh_l4.py index 1ab06b7..d1179d2 100644 --- a/tarpn/transport/mesh_l4.py +++ b/tarpn/transport/mesh_l4.py @@ -6,7 +6,7 @@ from tarpn.log import LoggingMixin from tarpn.network import L3Protocol, L3Address from tarpn.network.mesh import MeshAddress -from tarpn.network.mesh.header import DatagramHeader, Datagram, BroadcastHeader +from tarpn.network.mesh.header import DatagramHeader, Datagram from tarpn.transport.mesh.broadcast import BroadcastProtocol from tarpn.transport.mesh.datagram import DatagramProtocol from tarpn.transport import DatagramProtocol as DProtocol @@ -107,8 +107,8 @@ def remote_address(self) -> Optional[L3Address]: class BroadcastTransport(BTransport): """ - A channel for sending and receiving datagrams - """ + A channel for broadcasting datagrams + """ def __init__(self, network: L3Protocol, @@ -133,6 +133,7 @@ def close(self) -> None: self.closing = True def write(self, data: Any) -> None: + # Route only for the purposes of getting the MTU _, mtu = self.network.route_packet(MeshAddress.parse("ff.ff")) if isinstance(data, str): encoded_data = data.encode("utf-8") @@ -188,7 +189,10 @@ def connect(self, protocol_factory: Callable[[], DProtocol], return protocol def broadcast(self, protocol_factory: Callable[[], DProtocol], - local_address: MeshAddress, port: int) -> DProtocol: + local_address: MeshAddress, port: int) -> DProtocol: + """ + Create a broadcast transport for a given local address and port + """ if port in self.connections: if self.connections[port][0].is_closing(): del self.connections[port] @@ -196,7 +200,7 @@ def broadcast(self, protocol_factory: Callable[[], DProtocol], raise RuntimeError(f"Connection to {port} is already open") protocol = protocol_factory() transport = BroadcastTransport(self.l3_protocol, self.broadcast_protocol, self.datagram_protocol, port, - local_address, MeshAddress.parse("ff.ff")) + local_address, MeshAddress.parse("ff.ff")) protocol.connection_made(transport) self.connections[port] = (transport, protocol) return protocol diff --git a/tests/docker/Dockerfile b/tests/docker/Dockerfile index 8e3b5ac..21ff1fd 100644 --- a/tests/docker/Dockerfile +++ b/tests/docker/Dockerfile @@ -1,6 +1,6 @@ FROM python:3.7-slim-bullseye -RUN apt-get update; apt-get install -y socat netcat iproute2 +RUN apt-get update; apt-get install -y socat netcat iproute2 dnsutils RUN pip install virtualenv diff --git a/tests/docker/README.md b/tests/docker/README.md index 1307039..ca19c53 100644 --- a/tests/docker/README.md +++ b/tests/docker/README.md @@ -14,4 +14,7 @@ Connect to one of the nodes using a domain socket Inject some network errors using Pumba -> docker run -it --rm -v /var/run/docker.sock:/var/run/docker.sock gaiaadm/pumba --interval=5m --log-level=info netem --duration 100s loss -p 100 tarpn-node-controller_david_1 \ No newline at end of file +> docker run -it --rm -v /var/run/docker.sock:/var/run/docker.sock gaiaadm/pumba --interval=5m --log-level=info netem --duration 100s loss -p 100 tarpn-node-controller_david_1 + + +dig +short bob \ No newline at end of file diff --git a/tests/docker/bin/launch-tarpn.sh b/tests/docker/bin/launch-tarpn.sh index 640f30d..6de77b7 100755 --- a/tests/docker/bin/launch-tarpn.sh +++ b/tests/docker/bin/launch-tarpn.sh @@ -11,4 +11,9 @@ do IFS="|" done +if [[ -v SLEEP ]]; +then + sleep $SLEEP +fi + /opt/tarpn/bin/tarpn-node diff --git a/tests/docker/config/alice.ini b/tests/docker/config/alice.ini new file mode 100644 index 0000000..a1a01c4 --- /dev/null +++ b/tests/docker/config/alice.ini @@ -0,0 +1,43 @@ +[default] +mycall=AL1CE + +[node] +log.dir = /tmp/tarpn-logs-alice +log.config = config/logging.ini + +node.call = ${mycall}-9 +node.alias = ALICE +node.sock = /tmp/socks/tarpn-shell-alice.sock + + +[port:1] +port.enabled = True +port.type = serial +port.framing = kiss +port.bitrate = 9600 +kiss.checksum = false +serial.device = /tmp/vmodem_A0 +serial.speed = 9600 + +[port:2] +port.enabled = True +port.type = serial +port.framing = kiss +port.bitrate = 9600 +kiss.checksum = false +serial.device = /tmp/vmodem_B1 +serial.speed = 9600 + +[network] +host.name = alice +mesh.enabled = True +mesh.address = 00.ab +mesh.ttl = 7 + +[app:demo] +app.address = mesh://ff.ff:100 +app.call = ${mycall}-7 +app.alias = DEMO +app.sock = /tmp/socks/tarpn-demo-a.sock +app.module = plugins.demo +app.class = DemoApp diff --git a/tests/docker/config/bob.ini b/tests/docker/config/bob.ini new file mode 100644 index 0000000..e0524c0 --- /dev/null +++ b/tests/docker/config/bob.ini @@ -0,0 +1,35 @@ +[default] +mycall=B0B + +[node] +log.dir = /tmp/tarpn-logs-bob +log.config = config/logging.ini + +node.call = ${mycall}-9 +node.alias = BOB +node.sock = /tmp/socks/tarpn-shell-bob.sock + + +[port:1] +port.enabled = True +port.type = serial +port.framing = kiss +port.bitrate = 9600 +kiss.checksum = false +serial.device = /tmp/vmodem_A1 +serial.speed = 9600 + +[network] +host.name = bob +mesh.enabled = True +mesh.address = 00.aa +mesh.ttl = 7 + + +[app:demo] +app.address = mesh://ff.ff:100 +app.call = ${mycall}-7 +app.alias = DEMO +app.sock = /tmp/socks/tarpn-demo-b.sock +app.module = plugins.demo +app.class = DemoApp \ No newline at end of file diff --git a/tests/docker/config/carol.ini b/tests/docker/config/carol.ini new file mode 100644 index 0000000..9b3dc9e --- /dev/null +++ b/tests/docker/config/carol.ini @@ -0,0 +1,48 @@ +[default] +mycall=C4ROL + +[node] +log.dir = /tmp/tarpn-logs-carol +log.config = config/logging.ini + +node.call = ${mycall}-9 +node.alias = CAROL +node.sock = /tmp/tarpn-shell-carol.sock + +[port:1] +port.enabled = True +port.type = serial +port.framing = kiss +kiss.checksum = false +serial.device = /tmp/vmodem_B0 +serial.speed = 9600 + +[port:2] +port.enabled = False +port.type = serial +port.framing = kiss +kiss.checksum = false +serial.device = /tmp/vmodem_C1 +serial.speed = 9600 + +[port:3] +port.enabled = False +port.type = serial +port.framing = kiss +kiss.checksum = false +serial.device = /tmp/vmodem_G1 +serial.speed = 9600 + +[network] +host.name = carol +mesh.enabled = True +mesh.address = 00.ac +mesh.ttl = 7 + +[app:demo] +app.address = mesh://ff.ff:100 +app.call = ${mycall}-7 +app.alias = DEMO +app.sock = /tmp/socks/tarpn-demo-c.sock +app.module = plugins.demo +app.class = DemoApp diff --git a/tests/docker/config/network.yml b/tests/docker/config/network.yml new file mode 100644 index 0000000..e69de29 diff --git a/tests/docker/docker-compose.yml b/tests/docker/docker-compose.yml new file mode 100644 index 0000000..f92b32b --- /dev/null +++ b/tests/docker/docker-compose.yml @@ -0,0 +1,27 @@ +services: + alice: + image: tarpn/tarpn-test:latest + environment: + - SOCAT_ARGS=PTY,raw,echo=1,link=/tmp/vmodem_A0 udp:bob:12345|PTY,raw,echo=1,link=/tmp/vmodem_B1 udp-listen:10000 + - SLEEP=3 + volumes: + - ./config/alice.ini:/opt/tarpn/config/node.ini + - /tmp/socks/:/tmp/socks/ + command: /opt/tarpn/bin/launch-tarpn.sh + bob: + image: tarpn/tarpn-test:latest + environment: + - SOCAT_ARGS=PTY,raw,echo=1,link=/tmp/vmodem_A1 udp-listen:12345 + volumes: + - ./config/bob.ini:/opt/tarpn/config/node.ini + - /tmp/socks/:/tmp/socks/ + command: /opt/tarpn/bin/launch-tarpn.sh + carol: + image: tarpn/tarpn-test:latest + environment: + - SOCAT_ARGS=PTY,raw,echo=1,link=/tmp/vmodem_B0 udp:alice:10000 + - SLEEP=1 + volumes: + - ./config/carol.ini:/opt/tarpn/config/node.ini + - /tmp/socks/:/tmp/socks/ + command: /opt/tarpn/bin/launch-tarpn.sh diff --git a/tests/docker/smoke-test.bats b/tests/docker/smoke-test.bats new file mode 100644 index 0000000..e69de29 From 38e27ca97270b1234a4a4d31560708084efa33bb Mon Sep 17 00:00:00 2001 From: David Arthur Date: Sun, 9 Jan 2022 21:55:15 -0500 Subject: [PATCH 15/16] Add a HTTP command interface over unix socket --- .idea/misc.xml | 3 + config/defaults.ini | 8 +- config/logging.ini | 4 +- tarpn/application/node.py | 188 ++++++++++++++++++++ tarpn/application/shell.py | 2 +- tarpn/datalink/__init__.py | 14 ++ tarpn/datalink/protocol.py | 3 +- tarpn/io/serial.py | 51 ++++-- tarpn/main.py | 27 ++- tarpn/metrics.py | 44 ++--- tarpn/network/__init__.py | 25 ++- tarpn/network/mesh/header.py | 19 ++ tarpn/network/mesh/ping.py | 11 +- tarpn/network/mesh/protocol.py | 117 ++++++++---- tarpn/scheduler.py | 6 +- tarpn/settings.py | 162 +++++++++-------- tarpn/util.py | 6 +- tests/docker/Dockerfile | 12 +- tests/docker/bin/launch-tarpn.sh | 17 +- tests/docker/bin/tc.sh | 42 +++++ tests/docker/config/alice.ini | 16 +- tests/docker/config/bob.ini | 16 +- tests/docker/config/carol.ini | 17 +- tests/docker/config/dave.ini | 33 ++++ tests/docker/config/eve.ini | 33 ++++ tests/docker/config/network.yml | 0 tests/docker/docker-compose-add-latency.yml | 14 ++ tests/docker/docker-compose-curl-tests.yml | 10 ++ tests/docker/docker-compose.yml | 59 +++++- tests/docker/smoke-test.bats | 0 tests/docker/test_curl.py | 41 +++++ 31 files changed, 795 insertions(+), 205 deletions(-) create mode 100644 tarpn/application/node.py create mode 100755 tests/docker/bin/tc.sh create mode 100644 tests/docker/config/dave.ini create mode 100644 tests/docker/config/eve.ini delete mode 100644 tests/docker/config/network.yml create mode 100644 tests/docker/docker-compose-add-latency.yml create mode 100644 tests/docker/docker-compose-curl-tests.yml delete mode 100644 tests/docker/smoke-test.bats create mode 100644 tests/docker/test_curl.py diff --git a/.idea/misc.xml b/.idea/misc.xml index 165dd0c..bcd8661 100644 --- a/.idea/misc.xml +++ b/.idea/misc.xml @@ -1,4 +1,7 @@ + + \ No newline at end of file diff --git a/config/defaults.ini b/config/defaults.ini index 60e8c36..86d35cd 100644 --- a/config/defaults.ini +++ b/config/defaults.ini @@ -9,10 +9,10 @@ node.call = ${mycall}-1 [network] mesh.ttl = 7 mesh.hello.interval = 10 -mesh.dead.interval = 30 -mesh.advert.interval = 30 -mesh.advert.max.age = 100 -mesh.query.interval = 60 +mesh.dead.interval = 40 +mesh.advert.interval = 120 +mesh.advert.max.age = 600 +mesh.query.interval = 120 [app:demo] # The 'app' configs are used to locate, run, and connect the app to the network diff --git a/config/logging.ini b/config/logging.ini index 2db0751..d31a516 100644 --- a/config/logging.ini +++ b/config/logging.ini @@ -51,7 +51,7 @@ args=(sys.stdout,) [handler_node] class=FileHandler -level=NOTSET +level=DEBUG formatter=standard args=('%(log.dir)s/node.log', 'w') @@ -80,7 +80,7 @@ formatter=standard args=('%(log.dir)s/netrom.log', 'w') [formatter_standard] -format=%(levelname)-8s %(asctime)s -- %(message)s +format=%(levelname)-8s %(asctime)s %(threadName)-8s -- %(message)s datefmt= class=logging.Formatter diff --git a/tarpn/application/node.py b/tarpn/application/node.py new file mode 100644 index 0000000..19ae744 --- /dev/null +++ b/tarpn/application/node.py @@ -0,0 +1,188 @@ +import math +import statistics +import time +import urllib +from typing import Optional, Dict + +from flask import Flask, Response, request +from flask.testing import EnvironBuilder +from pyformance import global_registry + +from tarpn.datalink import L2Queuing +from tarpn.network.mesh import MeshAddress +from tarpn.network.mesh.protocol import MeshProtocol +from tarpn.scheduler import Scheduler +from tarpn.settings import Settings +from tarpn.transport import Protocol, Transport +from tarpn.transport.mesh_l4 import MeshTransportManager + + +class TarpnCoreProtocol(Protocol): + def __init__(self, + settings: Settings, port_queues: Dict[int, L2Queuing], + network: MeshProtocol, transport_manager: MeshTransportManager, scheduler: Scheduler): + self.settings = settings + self.port_queues = port_queues + self.network = network + self.transport_manager = transport_manager + self.scheduler = scheduler + + self.transport: Optional[Transport] = None + self.app: Flask = Flask("tarpn-core") + self.register_handlers() + + def register_handlers(self): + @self.app.route("/") + def index(): + return "welcome" + + @self.app.route("/healthcheck") + def healhcheck(): + return "ok" + + @self.app.route("/ports") + def ports(): + port_data = [] + ports_settings = self.settings.port_configs() + for port, queue in self.port_queues.items(): + port_config = ports_settings[port] + port_data.append({ + "id": port, + "name": port_config.port_name(), + "mtu": queue.mtu(), + "qsize": queue.qsize(), + "config": port_config.as_dict() + }) + return {"ports": port_data} + + @self.app.route("/network") + def network(): + out = { + "address": str(self.network.our_address), + "epoch": self.network.our_link_state_epoch + } + neighbors = [] + for addr, neighbor in self.network.neighbors.items(): + neighbors.append({ + "address": str(addr), + "name": neighbor.name, + "last_seen": neighbor.last_seen, + "last_update": neighbor.last_update, + "status": str(neighbor.state) + }) + out["neighbors"] = neighbors + + nodes = [] + for node, link_states in self.network.valid_link_states().items(): + epoch = self.network.link_state_epochs.get(node) + nodes.append({ + "address": str(node), + "epoch": epoch, + "link_states": str(link_states) # TODO fix this + }) + out["nodes"] = nodes + return out + + @self.app.route("/routes") + def routes(): + out = [] + + for node, link_states in self.network.valid_link_states().items(): + epoch = self.network.link_state_epochs.get(node) + path, path_cost = self.network.route_to(node) + if len(path) == 0: + continue + link = self.network.neighbors.get(path[1]).link_id + device_id = self.network.link_multiplexer.get_link_device_id(link) + out.append({ + "address": str(node), + "epoch": epoch, + "path": [str(n) for n in path[1:]], + "port": device_id, + "cost": path_cost + }) + return {"routes": out} + + @self.app.route("/ping", methods=["POST"]) + def ping(): + address = request.args.get("address") + count = int(request.args.get("count", 3)) + run_async = request.args.get("async", None) + timeout = int(request.args.get("timeout", 1000)) + size = int(request.args.get("size", 100)) + + node = MeshAddress.parse(address) + seqs = [] + times = [] + lost = 0 + + def do_ping(): + nonlocal lost + t0 = time.time_ns() + seq = self.network.ping_protocol.send_ping(node, size=size) + found = self.network.ping_protocol.wait_for_ping(node, seq, timeout_ms=timeout) + t1 = time.time_ns() + if found: + dt = int((t1 - t0) / 1000000.) + seqs.append(seq) + times.append(dt) + else: + lost += 1 + + if run_async: + futures = [] + for _ in range(count): + futures.append(self.scheduler.run(do_ping)) + for fut in futures: + fut.result(3000) + else: + for _ in range(count): + do_ping() + + out = { + "address": address, + "seqs": seqs, + "times": times, + "lost": lost + } + if count > 1: + out["min"] = min(times) + out["max"] = max(times) + out["avg"] = statistics.mean(times) + out["stdev"] = statistics.stdev(times) + return out + + @self.app.route("/metrics") + def metrics(): + return global_registry().dump_metrics() + + def connection_made(self, transport: Transport): + self.transport = transport + + def connection_lost(self, exc): + self.transport = None + + def data_received(self, data: bytes): + raw_lines = data.splitlines() + first = raw_lines[0].decode("ascii") + parts = first.split(" ") + verb, path, http_version = parts[0:3] + url_parts = urllib.parse.urlparse(path) + # TODO parse headers? + builder = EnvironBuilder(app=self.app, + path=url_parts.path, + query_string=url_parts.query, + method=verb, + data=b"\r\n".join(raw_lines[2:]), + content_type="text/plain") + env = builder.get_environ() + + with self.app.request_context(env): + resp: Response = self.app.full_dispatch_request() + buf = bytearray() + buf.extend(f"HTTP/1.1 {resp.status}".encode("ISO-8859-1")) + buf.extend(b"\r\n") + buf.extend(str(resp.headers).encode("ISO-8859-1")) + if resp.data: + buf.extend(resp.data) + self.transport.write(buf) # TODO transport was None? diff --git a/tarpn/application/shell.py b/tarpn/application/shell.py index a425ba3..34a314a 100644 --- a/tarpn/application/shell.py +++ b/tarpn/application/shell.py @@ -8,9 +8,9 @@ from tarpn.network.mesh import MeshAddress from tarpn.network.mesh.protocol import MeshProtocol +from tarpn.transport import DatagramProtocol as DProtocol from tarpn.transport import Protocol, Transport, L4Address from tarpn.transport.mesh_l4 import MeshTransportManager -from tarpn.transport import DatagramProtocol as DProtocol class ShellDatagramProtocol(DProtocol): diff --git a/tarpn/datalink/__init__.py b/tarpn/datalink/__init__.py index acb43ac..86e1374 100644 --- a/tarpn/datalink/__init__.py +++ b/tarpn/datalink/__init__.py @@ -52,6 +52,12 @@ def take_inbound(self) -> Tuple[FrameData, int]: """ raise NotImplementedError + def qsize(self) -> int: + raise NotImplementedError + + def mtu(self) -> int: + raise NotImplementedError + def close(self): raise NotImplementedError @@ -104,6 +110,12 @@ def __init__(self, queue_size, max_payload_size): self._lock: threading.Lock = threading.Lock() self._not_empty: threading.Condition = threading.Condition(self._lock) + def qsize(self) -> int: + return self._outbound.maxsize + + def mtu(self) -> int: + return self._max_payload_size + def offer_outbound(self, frame: FrameData) -> bool: if len(frame.data) > self._max_payload_size: self.error(f"Payload too large, dropping. Size is {len(frame.data)}, max is {self._max_payload_size}") @@ -134,6 +146,7 @@ def offer_inbound(self, frame: FrameData) -> None: # Since the deque is bounded, this will eject old items to make room for this frame with self._lock: self.meter("inbound", "offer").mark() + #frame.timer = self.timer("inbound", "wait").time() self._inbound.append((frame, next(self._inbound_count))) self._not_empty.notify() @@ -142,6 +155,7 @@ def take_inbound(self) -> Tuple[FrameData, int]: while not len(self._inbound): self._not_empty.wait() inbound, count = self._inbound.popleft() + #inbound.timer.stop() dropped = count - self._last_inbound - 1 self._last_inbound = count self.meter("inbound", "take").mark() diff --git a/tarpn/datalink/protocol.py b/tarpn/datalink/protocol.py index 716e236..a37043c 100644 --- a/tarpn/datalink/protocol.py +++ b/tarpn/datalink/protocol.py @@ -5,6 +5,7 @@ from tarpn.datalink import FrameData, L2Queuing, L2Address from tarpn.log import LoggingMixin +from tarpn.metrics import MetricsMixin from tarpn.network import L3Payload, L3Queueing from tarpn.scheduler import Scheduler, CloseableThreadLoop from tarpn.util import BackoffGenerator @@ -121,7 +122,7 @@ def remove_link(self, link_id: int) -> None: raise NotImplementedError -class DefaultLinkMultiplexer(LinkMultiplexer): +class DefaultLinkMultiplexer(LinkMultiplexer, MetricsMixin): def __init__(self, queue_factory: Callable[[], L3Queueing], scheduler: Scheduler): self.queue_factory = queue_factory self.link_id_counter = itertools.count() diff --git a/tarpn/io/serial.py b/tarpn/io/serial.py index ab5d75e..6c3d7c1 100644 --- a/tarpn/io/serial.py +++ b/tarpn/io/serial.py @@ -9,26 +9,36 @@ from tarpn.io import IOProtocol from tarpn.log import LoggingMixin +from tarpn.metrics import MetricsMixin from tarpn.scheduler import Scheduler, CloseableThreadLoop from tarpn.util import CountDownLatch, BackoffGenerator +class DummyLock: + def __enter__(self): + return None + + def __exit__(self, _type, value, traceback): + pass + + class SerialLoop(CloseableThreadLoop, ABC): def __init__(self, name: str, ser: serial.Serial, protocol: IOProtocol, - open_event: threading.Event, error_event: threading.Event, closed_latch: CountDownLatch): + open_event: threading.Event, error_event: threading.Event, closed_latch: CountDownLatch, lock: threading.Lock): super().__init__(name=name) self.ser = ser self.protocol = protocol self.open_event = open_event self.error_event = error_event self.closed_latch = closed_latch + self.lock = lock def close(self): super().close() self.closed_latch.countdown() -class SerialReadLoop(SerialLoop, LoggingMixin): +class SerialReadLoop(SerialLoop, LoggingMixin, MetricsMixin): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) LoggingMixin.__init__(self, @@ -38,16 +48,19 @@ def __init__(self, *args, **kwargs): def iter_loop(self): if self.open_event.wait(3.0): try: - data = self.ser.read(1024) - if len(data) > 0: - self.debug(f"Read {len(data)} bytes: {data}") - self.protocol.handle_bytes(data) + with self.lock: + data = self.ser.read(1024) + if len(data) > 0: + self.debug(f"Read {len(data)} bytes: {data}") + self.meter("bytes", "in").mark(len(data)) # TODO add port number + self.protocol.handle_bytes(data) except serial.SerialException: self.exception("Failed to read bytes from serial device") + self.meter("error").mark() self.error_event.set() -class SerialWriteLoop(SerialLoop, LoggingMixin): +class SerialWriteLoop(SerialLoop, LoggingMixin, MetricsMixin): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) LoggingMixin.__init__(self, @@ -64,26 +77,31 @@ def iter_loop(self): to_write = self.protocol.next_bytes_to_write() if to_write is not None and len(to_write) > 0: try: - self.ser.write(to_write) - self.ser.flush() - self.debug(f"Wrote {len(to_write)} bytes: {to_write}") - self.retry_backoff.reset() + with self.lock: + self.ser.write(to_write) + self.ser.flush() + self.meter("bytes", "out").mark(len(to_write)) # TODO add port number + self.debug(f"Wrote {len(to_write)} bytes: {to_write}") + self.retry_backoff.reset() except serial.SerialTimeoutException as e: self.unsent.append(to_write) t = next(self.retry_backoff) self.exception(f"Failed to write bytes to serial device, serial timed out. Retrying after {t}s.", e) + self.meter("timeout").mark() sleep(t) except serial.SerialException as e: self.exception("Failed to write bytes to serial device", e) + self.meter("error").mark() self.error_event.set() -class SerialDevice(CloseableThreadLoop, LoggingMixin): +class SerialDevice(CloseableThreadLoop, LoggingMixin, MetricsMixin): def __init__(self, protocol: IOProtocol, device_name: str, speed: int, timeout: float, + duplex: bool, scheduler: Scheduler): LoggingMixin.__init__(self) CloseableThreadLoop.__init__(self, f"Serial Device {device_name}") @@ -94,14 +112,18 @@ def __init__(self, self._ser = serial.Serial(port=None, baudrate=speed, timeout=timeout, write_timeout=timeout) self._ser.port = device_name self._closed_latch = CountDownLatch(2) + if not duplex: + self._duplex_lock = threading.Lock() + else: + self._duplex_lock = DummyLock() self._open_event = threading.Event() self._error_event = threading.Event() self._open_backoff = BackoffGenerator(0.100, 1.2, 5.000) # Submit the reader and writer threads first, so they will be shutdown first self._scheduler.submit(SerialReadLoop(f"Serial Reader {self._ser.name}", self._ser, - self._protocol, self._open_event, self._error_event, self._closed_latch)) + self._protocol, self._open_event, self._error_event, self._closed_latch, self._duplex_lock)) self._scheduler.submit(SerialWriteLoop(f"Serial Writer {self._ser.name}", self._ser, - self._protocol, self._open_event, self._error_event, self._closed_latch)) + self._protocol, self._open_event, self._error_event, self._closed_latch, self._duplex_lock)) self._scheduler.submit(self) def close(self) -> None: @@ -132,6 +154,7 @@ def iter_loop(self): try: self._ser.open() self.info(f"Opened serial port {self._device_name}") + # TODO metrics self._open_event.set() self._open_backoff.reset() except serial.SerialException as err: diff --git a/tarpn/main.py b/tarpn/main.py index d532ced..9982937 100644 --- a/tarpn/main.py +++ b/tarpn/main.py @@ -3,16 +3,20 @@ import logging.config import os import shutil +import signal import sys from functools import partial +from pyformance.reporters import ConsoleReporter + import tarpn.netrom.router from tarpn.application import TransportMultiplexer, MultiplexingProtocol, ApplicationProtocol from tarpn.application.command import NodeCommandProcessor +from tarpn.application.node import TarpnCoreProtocol from tarpn.application.shell import TarpnShellProtocol from tarpn.ax25 import AX25Call from tarpn.datalink import L2FIFOQueue -from tarpn.datalink.ax25_l2 import AX25Protocol, DefaultLinkMultiplexer, AX25Address +from tarpn.datalink.ax25_l2 import AX25Protocol, DefaultLinkMultiplexer from tarpn.datalink.protocol import L2IOLoop from tarpn.io.kiss import KISSProtocol from tarpn.io.serial import SerialDevice @@ -41,7 +45,7 @@ def main(): parser = argparse.ArgumentParser(description='Decode packets from a serial port') parser.add_argument("config", nargs="?", default="config/node.ini", help="Config file") parser.add_argument("--verbose", action="store_true", help="Enable debug logging") - parser.add_argument("--profile", action="store_true", help="Attache a profiler to the process") + parser.add_argument("--profile", action="store_true", help="Attach a profiler to the process") args = parser.parse_args() if args.profile: import cProfile @@ -112,7 +116,7 @@ def run_node(args): intercept_dests = {} interceptor = lambda frame: None - for port_config in s.port_configs(): + for port_id, port_config in s.port_configs().items(): if port_config.get_boolean("port.enabled") and port_config.get("port.type") == "serial": l2_queueing = L2FIFOQueue(20, AX25Protocol.maximum_frame_size()) port_queues[port_config.port_id()] = l2_queueing @@ -123,6 +127,7 @@ def run_node(args): port_config.get("serial.device"), port_config.get_int("serial.speed"), port_config.get_float("serial.timeout"), + port_config.get_boolean("serial.duplex", True), scheduler) scheduler.submit(L2IOLoop(l2_queueing, l2)) @@ -186,13 +191,23 @@ def run_node(args): sock = node_settings.get("node.sock") print(f"Binding node terminal to {sock}") - scheduler.submit(UnixServerThread(sock, TarpnShellProtocol(mesh_l3, mesh_l4))) + #scheduler.submit(UnixServerThread(sock, TarpnShellProtocol(mesh_l3, mesh_l4))) + scheduler.submit(UnixServerThread(sock, TarpnCoreProtocol(s, port_queues, mesh_l3, mesh_l4, scheduler))) + # Start a metrics reporter - #reporter = ConsoleReporter(reporting_interval=300) - #scheduler.timer(10_000, reporter.start, True) + #reporter = ConsoleReporter(reporting_interval=300, stream=sys.stdout) + #reporter.start() #scheduler.add_shutdown_hook(reporter.stop) + def signal_handler(signum, frame): + logger.info(f"Shutting down due to {signal.Signals(signum).name}") + scheduler.shutdown() + + signal.signal(signal.SIGTERM, signal_handler) + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGSTOP, signal_handler) + logger.info("Finished Startup") try: # Wait for all threads diff --git a/tarpn/metrics.py b/tarpn/metrics.py index 12d36aa..2060aa2 100644 --- a/tarpn/metrics.py +++ b/tarpn/metrics.py @@ -1,29 +1,29 @@ -from typing import Optional +from typing import Optional, Callable from pyformance import global_registry -from pyformance.meters import Meter, Counter, Timer +from pyformance.meters import Meter, Counter, Timer, Histogram, Gauge, CallbackGauge class MetricsMixin: - def _name(self, name): + def get_key(self, name, *extra): pkg = self.__class__.__module__ clazz = self.__class__.__qualname__ - return f"{pkg}:{clazz}:{name}" - - def meter(self, name: str, event: Optional[str] = None) -> Meter: - if event is not None: - return global_registry().meter(self._name(f"{name}.{event}")) - else: - return global_registry().meter(self._name(name)) - - def counter(self, name: str, event: Optional[str] = None) -> Counter: - if event is not None: - return global_registry().counter(self._name(f"{name}.{event}")) - else: - return global_registry().counter(self._name(name)) - - def timer(self, name: str, event: Optional[str] = None) -> Timer: - if event is not None: - return global_registry().timer(self._name(f"{name}.{event}")) - else: - return global_registry().timer(self._name(name)) + name_parts = [name] + name_parts.extend([str(e) for e in extra]) + joined_name = ".".join(name_parts) + return f"{pkg}.{clazz}:{joined_name}" + + def meter(self, name: str, *args) -> Meter: + return global_registry().meter(self.get_key(name, *args)) + + def counter(self, name: str, *args) -> Counter: + return global_registry().counter(self.get_key(name, *args)) + + def timer(self, name: str, *args) -> Timer: + return global_registry().timer(self.get_key(name, *args)) + + def hist(self, name: str, *args) -> Histogram: + return global_registry().histogram(self.get_key(name, *args)) + + def gauge(self, fun: Callable[[], float], name: str, *args) -> Gauge: + return global_registry().gauge(key=self.get_key(name, *args), gauge=CallbackGauge(fun)) \ No newline at end of file diff --git a/tarpn/network/__init__.py b/tarpn/network/__init__.py index 6e23ec5..aadddfd 100644 --- a/tarpn/network/__init__.py +++ b/tarpn/network/__init__.py @@ -3,9 +3,11 @@ from enum import IntEnum from typing import List, Optional, Tuple +from pyformance.meters.timer import TimerContext + from tarpn.datalink import L2Payload from tarpn.log import LoggingMixin -#from tarpn.transport import L4Protocol +from tarpn.metrics import MetricsMixin class QoS(IntEnum): @@ -30,6 +32,7 @@ class L3Payload: link_id: int = field(compare=False) qos: QoS = field(compare=True, default=QoS.Default) reliable: bool = field(compare=False, default=True) + timer: TimerContext = field(compare=False, default=None) class L3Queueing: @@ -40,21 +43,35 @@ def maybe_take(self) -> Optional[L3Payload]: raise NotImplementedError -class L3PriorityQueue(L3Queueing): +class L3PriorityQueue(L3Queueing, MetricsMixin, LoggingMixin): def __init__(self, max_size=20): + LoggingMixin.__init__(self) self._queue = queue.PriorityQueue(max_size) + self.max_size = 0 + self.gauge(self._queue.qsize, "qsize") + self.gauge(self.max_qsize, "max") + + def max_qsize(self) -> int: + return self.max_size def offer(self, packet: L3Payload): try: + packet.timer = self.timer("qtime").time() + self.meter("offer").mark() self._queue.put(packet, False, None) + self.max_size = max(self.max_size, self._queue.qsize()) return True except queue.Full: - print("Full!") + self.meter("full").mark() + self.error(f"Queue full, dropping {packet}") return False def maybe_take(self) -> Optional[L3Payload]: try: - return self._queue.get(True, 1.0) + packet = self._queue.get(True, 1.0) + packet.timer.stop() + self.meter("take").mark() + return packet except queue.Empty: return None diff --git a/tarpn/network/mesh/header.py b/tarpn/network/mesh/header.py index e4e4ba9..add3e00 100644 --- a/tarpn/network/mesh/header.py +++ b/tarpn/network/mesh/header.py @@ -94,10 +94,14 @@ class ControlType(IntEnum): # Up to 0x7F PING = 0x00 LOOKUP = 0x01 + UNREACHABLE = 0x02 def _missing_(cls, value): return None + def __str__(self): + return f"{self.name} ({self.value:02x})" + @dataclass(eq=True, frozen=True) class ControlHeader(Header): @@ -123,6 +127,9 @@ def encode(self, data: BytesIO): ByteUtils.write_uint8(data, self.extra_length) data.write(self.extra) + def __str__(self): + return f"CTRL {str(self.control_type)} req={self.is_request}" + @dataclass(eq=True, frozen=True) class HelloHeader(Header): @@ -147,6 +154,9 @@ def encode(self, data: BytesIO): for neighbor in self.neighbors: ByteUtils.write_uint16(data, neighbor.id) + def __str__(self): + return f"HELLO {self.name} {self.neighbors}" + @dataclass(eq=True, frozen=True) class LinkStateHeader(Header): @@ -171,6 +181,9 @@ def encode(self, data: BytesIO): ByteUtils.write_uint16(data, self.via.id) ByteUtils.write_uint8(data, self.quality) + def __repr__(self): + return f"(node={self.node} via={self.via} q={self.quality})" + @dataclass(eq=True, frozen=True) class LinkStateAdvertisementHeader(Header): @@ -206,6 +219,9 @@ def encode(self, data: BytesIO): for link_state in self.link_states: link_state.encode(data) + def __str__(self): + return f"ADVERT {self.node} epoch={self.epoch} links={self.link_states}" + @dataclass(eq=True, frozen=True) class LinkStateQueryHeader(Header): @@ -237,6 +253,9 @@ def encode(self, data: BytesIO): ByteUtils.write_uint16(data, node.id) ByteUtils.write_int8(data, epoch) + def __str__(self): + return f"QUERY {self.node} {self.epoch}" + class RecordType(IntEnum): # up to 0xFF diff --git a/tarpn/network/mesh/ping.py b/tarpn/network/mesh/ping.py index e00c9f2..f5b8d0b 100644 --- a/tarpn/network/mesh/ping.py +++ b/tarpn/network/mesh/ping.py @@ -12,7 +12,7 @@ from tarpn.network import QoS from tarpn.network.mesh import MeshAddress from tarpn.network.mesh.header import ControlHeader, ControlType, NetworkHeader, Protocol -from tarpn.util import secure_random_byte +from tarpn.util import secure_random_byte, secure_random_data @dataclasses.dataclass @@ -41,8 +41,8 @@ def __init__(self, network): self.stats: Dict[MeshAddress, PingStats] = defaultdict(PingStats) LoggingMixin.__init__(self) - def send_ping(self, node: MeshAddress) -> int: - ctrl = ControlHeader(True, ControlType.PING, 1, bytes([secure_random_byte()])) + def send_ping(self, node: MeshAddress, size: int = 1) -> int: + ctrl = ControlHeader(True, ControlType.PING, size, secure_random_data(size)) network_header = NetworkHeader( version=0, qos=QoS.Higher, @@ -59,7 +59,7 @@ def send_ping(self, node: MeshAddress) -> int: stream.seek(0) buffer = stream.read() self.stats[node].results.append( - PingResult(ctrl.extra[0], time.time_ns(), None, threading.Condition(self.mutex))) + PingResult(ctrl.extra[0], time.time_ns(), None, threading.Condition())) self.network.send(network_header, buffer) return ctrl.extra[0] @@ -81,6 +81,7 @@ def handle_ping(self, network_header: NetworkHeader, ctrl: ControlHeader): now = time.time_ns() result = self.stats.get(network_header.source).get(ctrl.extra[0]) if result is not None: + self.debug(f"Got ping response {ctrl}") result.end_ns = now with result.cond: result.cond.notify_all() @@ -95,7 +96,7 @@ def wait_for_ping(self, node: MeshAddress, seq: int, timeout_ms: int) -> Optiona if done: return result else: - self.warning(f"Timed out waiting for ping {seq} from {node}") + self.warning(f"Timed out waiting for ping {seq} from {node} after {timeout_ms}ms") return None else: self.warning(f"No ping from {node} found with seq {seq}") diff --git a/tarpn/network/mesh/protocol.py b/tarpn/network/mesh/protocol.py index fbe71b3..1653f75 100644 --- a/tarpn/network/mesh/protocol.py +++ b/tarpn/network/mesh/protocol.py @@ -3,7 +3,7 @@ import queue import threading import time -from datetime import datetime +from datetime import datetime, timedelta from functools import partial from io import BytesIO from typing import Tuple, List, Dict, Optional, Set, cast @@ -15,6 +15,7 @@ from tarpn.datalink.ax25_l2 import AX25Address from tarpn.datalink.protocol import LinkMultiplexer from tarpn.log import LoggingMixin +from tarpn.metrics import MetricsMixin from tarpn.network import L3Protocol, L3Address, L3Payload, QoS from tarpn.network.mesh import MeshAddress from tarpn.network.mesh.header import Protocol, NetworkHeader, Header, HelloHeader, LinkStateHeader, \ @@ -50,6 +51,9 @@ class NeighborState(enum.Enum): INIT = 2 # We've seen HELLO UP = 3 # We've seen ourselves in a HELLO + def __repr__(self): + return self.name + @dataclasses.dataclass class Neighbor: @@ -62,7 +66,7 @@ class Neighbor: state: NeighborState -class MeshProtocol(CloseableThreadLoop, L3Protocol, LoggingMixin): +class MeshProtocol(CloseableThreadLoop, L3Protocol, LoggingMixin, MetricsMixin): """ A simple protocol for a partially connected mesh network. @@ -87,8 +91,8 @@ def __init__(self, link_multiplexer: LinkMultiplexer, l4_handlers: L4Handlers, scheduler: Scheduler): - LoggingMixin.__init__(self, extra_func=self.log_ident) CloseableThreadLoop.__init__(self, name="MeshNetwork") + LoggingMixin.__init__(self, extra_func=self.log_ident) self.time = time self.config = config @@ -124,14 +128,21 @@ def __init__(self, self.last_hello_time = datetime.fromtimestamp(0) self.last_advert = datetime.utcnow() - self.last_query = datetime.utcnow() + self.last_query = datetime.utcnow() # TODO keep track of these per-neighbor self.last_epoch_bump = datetime.utcnow() + self.gauge(self._get_queue_idle_value, "queue", "idle", "ratio") + self.gauge(self.get_current_epoch, "epoch") + + # Start this thread a little bit in the future self.scheduler.timer(1_000, partial(self.scheduler.submit, self), True) def __repr__(self): return f"" + def get_current_epoch(self) -> int: + return self.our_link_state_epoch + def log_ident(self) -> str: return f"[MeshProtocol {self.our_address}]" @@ -186,27 +197,44 @@ def close(self): CloseableThreadLoop.close(self) def iter_loop(self) -> bool: + loop_t0 = time.time_ns() + # Check if we need to take some periodic action like sending a HELLO now = datetime.utcnow() deadline = self.deadline(now) + # self.debug(f"Getting next event with deadline {deadline}") # Now wait at most the deadline for the next action for new incoming packets try: + self.meter("queue", "deadline").mark(deadline) + qtimer = self.timer("queue", "block").time() event = self.queue.get(block=True, timeout=deadline) + qtimer.stop() if event is not None: + timer = self.timer("queue", "process").time() self.process_incoming(event) + timer.stop() return True except queue.Empty: return False + def _get_queue_idle_value(self) -> float: + """Derived metric for L3 queue idle time percentage""" + busy = self.timer("queue", "process").get_mean() + idle = self.timer("queue", "block").get_mean() + if busy == 0 or idle == 0: + return 0.0 + else: + return 1.0 * idle / (busy + idle) + def deadline(self, now: datetime) -> int: # TODO use time.time_ns instead of datetime return min([ self.check_dead_neighbors(now), self.check_hello(now), self.check_epoch(now), - self.check_advert(now), - self.check_query(now) + self.check_query(now), + self.check_advert(now) ]) def wakeup(self): @@ -215,9 +243,11 @@ def wakeup(self): def bump_epoch(self, now: datetime, reason: str = ""): if now != self.last_epoch_bump: + self.meter("epoch", "bump").mark() self.our_link_state_epoch = next(self.our_link_state_epoch_generator) self.last_epoch_bump = now self.debug(f"Bumping our epoch to {self.our_link_state_epoch} due to {reason}") + self.last_advert = datetime.fromtimestamp(0) # Force our advert to go out def check_dead_neighbors(self, now: datetime) -> int: min_deadline = self.config.get_int("mesh.dead.interval") @@ -228,10 +258,9 @@ def check_dead_neighbors(self, now: datetime) -> int: deadline = self.config.get_int("mesh.dead.interval") - (now - neighbor.last_seen).seconds if deadline <= 0: self.info(f"Neighbor {neighbor.address} detected as DOWN!") - + self.meter(f"neighbors.{neighbor.address}.down").mark() neighbor.state = NeighborState.DOWN self.bump_epoch(datetime.utcnow(), "dead neighbor") - self.last_advert = datetime.fromtimestamp(0) # Force our advert to go out else: min_deadline = min(deadline, min_deadline) return min_deadline @@ -264,7 +293,6 @@ def check_epoch(self, now: datetime) -> int: deadline = int(max_age * .80) - (now - self.last_epoch_bump).seconds if deadline <= 0: self.bump_epoch(now, "max epoch age") - self.send_advertisement() return int(max_age * .80) else: return deadline @@ -304,14 +332,24 @@ def process_incoming(self, payload: L2Payload): self.error(f"Could not decode network packet from {payload}.", e) return + # Collect some metrics + self.meter("protocols", "in", network_header.protocol.name).mark() + self.meter("bytes", "in").mark(len(payload.l3_data)) + self.meter("packets", "in").mark() + if network_header.destination in (self.our_address, self.BroadcastAddress): + self.meter("source", str(network_header.source), "in").mark() + # Handle L3 protocols first if network_header.destination == self.our_address and network_header.protocol == Protocol.CONTROL: ctrl = ControlHeader.decode(stream) - self.info(f"Got {ctrl} from {network_header.source}") + self.debug(f"< {network_header.source} {network_header.destination} {ctrl}") if ctrl.control_type == ControlType.PING: self.ping_protocol.handle_ping(network_header, ctrl) + elif ctrl.control_type == ControlType.UNREACHABLE: + self.warning(f"< UNREACHABLE") else: self.warning(f"Ignoring unsupported control packet: {ctrl.control_type}") + self.meter("drop", "unknown").mark() return if network_header.protocol == Protocol.HELLO: @@ -328,17 +366,19 @@ def process_incoming(self, payload: L2Payload): # Now decide if we should handle or drop if self.header_cache.contains(hash(network_header)): + self.meter("drop", "duplicate").mark() self.debug(f"Dropping duplicate {network_header}") return # If the packet is addressed to us, handle it if network_header.destination == self.our_address: - self.debug(f"Handling {network_header}") + # TODO check for supported protocols here? + self.debug(f"Handling L4 {network_header}") self.l4_handlers.handle_l4(network_header, network_header.protocol, stream) return if network_header.destination == self.BroadcastAddress: - self.debug(f"Handling broadcast {network_header}") + self.debug(f"Handling L4 broadcast {network_header}") self.l4_handlers.handle_l4(network_header, network_header.protocol, stream) if network_header.ttl > 1: # Decrease the TTL and re-broadcast on all links except where we heard it @@ -348,8 +388,12 @@ def process_incoming(self, payload: L2Payload): stream.seek(0) self.send(header_copy, stream.read(), exclude_link_id=payload.link_id) else: + self.meter("drop", "ttl").mark() self.debug("Not re-broadcasting due to TTL") else: + # If not addressed to us, forward it along + self.meter("packets", "forward").mark() + self.debug(f"> {network_header.source} {network_header.destination} {network_header.protocol.name} Forwarding") header_copy = dataclasses.replace(network_header, ttl=network_header.ttl - 1) stream.seek(0) header_copy.encode(stream) @@ -357,7 +401,7 @@ def process_incoming(self, payload: L2Payload): self.send(header_copy, stream.read(), exclude_link_id=payload.link_id) def handle_hello(self, link_id: int, network_header: NetworkHeader, hello: HelloHeader): - self.debug(f"Handling hello {hello}") + self.debug(f"< {hello}") now = datetime.utcnow() sender = network_header.source if network_header.source not in self.neighbors: @@ -371,27 +415,28 @@ def handle_hello(self, link_id: int, network_header: NetworkHeader, hello: Hello last_update=now, state=NeighborState.INIT ) - self.bump_epoch(now, "new neighbor") else: self.neighbors[sender].neighbors = hello.neighbors self.neighbors[sender].last_seen = now if self.our_address in hello.neighbors: - delay = 100 if self.neighbors[sender].state != NeighborState.UP: self.info(f"Neighbor {sender} ({self.neighbors[sender].name}) is UP!") - self.scheduler.timer(delay, partial(self.send_query, sender), auto_start=True) + # Set last_query to the max query interval minus a bit + self.last_query = now - timedelta(seconds=(self.config.get_int("mesh.query.interval") - 10)) + self.meter(f"neighbors.{sender}.up").mark() self.neighbors[sender].state = NeighborState.UP self.neighbors[sender].last_update = now - delay *= 1.2 + self.bump_epoch(now, "new neighbor") else: self.debug(f"Neighbor {sender} is initializing...") self.neighbors[sender].state = NeighborState.INIT self.neighbors[sender].last_update = now + self.send_hello() def send_hello(self): - self.debug("Sending Hello") hello = HelloHeader(self.config.get("host.name"), self.alive_neighbors()) + self.debug(f"> {hello}") network_header = NetworkHeader( version=0, qos=QoS.Lower, @@ -414,6 +459,8 @@ def handle_advertisement(self, link_id: int, network_header: NetworkHeader, adve if advert.node == self.our_address: return + self.debug(f"< {advert}") + latest_epoch = self.link_state_epochs.get(advert.node) if latest_epoch is None: self.info(f"New link for node {advert.node}") @@ -468,7 +515,7 @@ def send_advertisement(self): link_states=link_states ) - self.debug("Sending Advertisement {}".format(advertisement)) + self.debug(f"> {advertisement}") network_header = NetworkHeader( version=0, @@ -489,7 +536,7 @@ def send_advertisement(self): self.send(network_header, buffer) def handle_query(self, link_id: int, network_header: NetworkHeader, query: LinkStateQueryHeader): - self.debug(f"Handling {query}") + self.debug(f"< {query}") dest = network_header.source adverts = [] @@ -536,7 +583,7 @@ def handle_query(self, link_id: int, network_header: NetworkHeader, query: LinkS dest = network_header.source for advert in adverts: - self.debug(f"Sending {advert} advert to {network_header.source}") + self.debug(f"> {advert}") resp_header = NetworkHeader( version=0, qos=QoS.Lower, @@ -556,7 +603,6 @@ def handle_query(self, link_id: int, network_header: NetworkHeader, query: LinkS self.send(resp_header, buffer) def send_query(self, neighbor: MeshAddress): - self.debug(f"Querying {neighbor} for link states") known_link_states = dict() for node, link_states in self.valid_link_states().items(): known_link_states[node] = self.link_state_epochs[node] @@ -567,6 +613,7 @@ def send_query(self, neighbor: MeshAddress): link_nodes=list(known_link_states.keys()), link_epochs=list(known_link_states.values()) ) + self.debug(f"> {query}") network_header = NetworkHeader( version=0, @@ -586,7 +633,7 @@ def send_query(self, neighbor: MeshAddress): buffer = stream.read() self.send(network_header, buffer) - def route_to(self, address: MeshAddress) -> List[MeshAddress]: + def route_to(self, address: MeshAddress) -> Tuple[List[MeshAddress], int]: g = nx.DiGraph() # Other's links for node, link_states in self.valid_link_states().items(): @@ -600,12 +647,12 @@ def route_to(self, address: MeshAddress) -> List[MeshAddress]: try: # Compute the shortest path path = nx.dijkstra_path(g, self.our_address, address) - + path_weight = nx.path_weight(g, path, "weight") # Ensure we have a return path nx.dijkstra_path(g, address, self.our_address) - return path + return path, path_weight except NetworkXException: - return [] + return [], 0 def send(self, header: NetworkHeader, buffer: bytes, exclude_link_id: Optional[int] = None): """ @@ -623,21 +670,31 @@ def send(self, header: NetworkHeader, buffer: bytes, exclude_link_id: Optional[i neighbor = self.neighbors.get(header.destination) links = [neighbor.link_id] else: - best_route = self.route_to(header.destination) - self.debug(f"Routing {header} via {best_route}") + best_route, cost = self.route_to(header.destination) + self.debug(f"Routing {header} via {best_route} with cost {cost}") if len(best_route) > 1: next_hop = best_route[1] hop_neighbor = self.neighbors.get(next_hop) if hop_neighbor is not None: links = [hop_neighbor.link_id] + self.hist("route", "cost").add(cost) + self.hist("route", "cost", str(header.destination)).add(cost) else: self.error(f"Calculated route including {next_hop}, but we're missing that neighbor.") links = [] else: self.warning(f"No route to {header.destination}, dropping.") + self.meter("drop", "no-route").mark() + self.meter("drop", "no-route", str(header.destination)).mark() links = [] for link_id in links: + self.meter("protocols", "out", header.protocol.name).mark() + self.meter("bytes", "out").mark(len(buffer)) + self.meter("packets", "out").mark() + self.hist("packets", "ttl").add(header.ttl) + self.meter("source", str(header.source), "out").mark() + self.meter("dest", str(header.destination), "out").mark() payload = L3Payload( source=header.source, destination=header.destination, @@ -646,7 +703,7 @@ def send(self, header: NetworkHeader, buffer: bytes, exclude_link_id: Optional[i link_id=link_id, qos=QoS(header.qos), reliable=False) - self.debug(f"Sending {payload}") + #self.debug(f"Sending {payload}") self.link_multiplexer.offer(payload) def failed_send(self, neighbor: MeshAddress): @@ -662,7 +719,7 @@ def route_packet(self, address: L3Address) -> Tuple[bool, int]: if address == MeshProtocol.BroadcastAddress: return True, l3_mtu else: - path = self.route_to(cast(MeshAddress, address)) + path, cost = self.route_to(cast(MeshAddress, address)) return len(path) > 0, l3_mtu def send_packet(self, payload: L3Payload) -> bool: diff --git a/tarpn/scheduler.py b/tarpn/scheduler.py index 36936b4..f909e09 100644 --- a/tarpn/scheduler.py +++ b/tarpn/scheduler.py @@ -59,8 +59,10 @@ def submit(self, thread: CloseableThread): thread.start() self.threads.append(thread) - def run(self, runnable: Callable[..., Any]): - self._futures.append(self.executor.submit(runnable)) + def run(self, runnable: Callable[..., Any]) -> Future: + fut = self.executor.submit(runnable) + self._futures.append(fut) + return fut def add_shutdown_hook(self, runnable: Callable[..., Any]): self.shutdown_tasks.append(runnable) diff --git a/tarpn/settings.py b/tarpn/settings.py index ee700eb..7461970 100644 --- a/tarpn/settings.py +++ b/tarpn/settings.py @@ -23,7 +23,8 @@ _default_port_settings = { "port.enabled": True, - "serial.timeout": 0.100 + "serial.timeout": 0.100, + "serial.duplex": True } @@ -38,81 +39,6 @@ def _default_basedir(app_name): return os.path.expanduser(os.path.join("~", "." + app_name.lower())) -class Settings: - def __init__(self, basedir: str = None, paths: List[str] = None, defaults: Dict = None): - self._init_basedir(basedir) - self._configfiles = [os.path.join(self._basedir, path) for path in paths] - self._config: Optional[configparser.ConfigParser] = None - if defaults is None: - self._defaults = dict() - else: - self._defaults = defaults - self.load() - - def _init_basedir(self, basedir): - if basedir is not None: - self._basedir = basedir - else: - self._basedir = _default_basedir("TARPN") - - if not os.path.isdir(self._basedir): - try: - os.makedirs(self._basedir) - except Exception: - print(f"Could not create base folder at {self._basedir}. This is a fatal error, TARPN " - "cannot run without a writable base folder.") - raise - - def load(self): - self._config = configparser.ConfigParser(defaults=self._defaults, - interpolation=configparser.ExtendedInterpolation(), - inline_comment_prefixes=";", - default_section="default") - self._config.read_dict(_default_settings) - for path in self._configfiles: - if os.path.exists(path): - self._config.read(path) - else: - raise RuntimeError(f"No such config file {path}") - - def save(self): - # self._config.write() - return - - def node_config(self): - return NodeConfig(self._config["node"]) - - def port_configs(self): - ports = [] - for section in self._config.sections(): - m = re.match(r"port:(\d+)", section) - if m: - ports.append(int(m.group(1))) - port_configs = [] - for port in ports: - port_sect = self._config[f"port:{port}"] - port_configs.append(PortConfig.from_dict(port, port_sect)) - return port_configs - - def network_configs(self): - return NetworkConfig(self._config["network"]) - - def app_configs(self): - apps = [] - for section in self._config.sections(): - m = re.match(r"app:(\w[\w\d]*)", section) - if m: - apps.append(m.group(1)) - app_configs = [] - for app in apps: - app_sect = self._config[f"app:{app}"] - app_configs.append(AppConfig.from_dict(app, app_sect)) - return app_configs - - def config_section(self, name): - return Config(name, self._config[name]) - - class Config(Mapping): def __init__(self, section_name, config_section): self._section = section_name @@ -187,14 +113,17 @@ def admin_listen(self) -> str: class PortConfig(Config): - def __init__(self, port_id, port_config): + def __init__(self, port_id: int, port_config): super().__init__(f"port:{port_id}", port_config) self._port_id = port_id - def port_id(self): + def port_id(self) -> int: return self._port_id - def port_type(self): + def port_name(self) -> str: + return super().get("port.name", f"port-{self._port_id}") + + def port_type(self) -> str: return super().get("port.type") @classmethod @@ -264,3 +193,78 @@ def from_dict(cls, app_name: str, configs: dict): parser.read_dict({f"app:{app_name}": configs}) config = parser[f"app:{app_name}"] return cls(app_name, config) + + +class Settings: + def __init__(self, basedir: str = None, paths: List[str] = None, defaults: Dict = None): + self._init_basedir(basedir) + self._configfiles = [os.path.join(self._basedir, path) for path in paths] + self._config: Optional[configparser.ConfigParser] = None + if defaults is None: + self._defaults = dict() + else: + self._defaults = defaults + self.load() + + def _init_basedir(self, basedir): + if basedir is not None: + self._basedir = basedir + else: + self._basedir = _default_basedir("TARPN") + + if not os.path.isdir(self._basedir): + try: + os.makedirs(self._basedir) + except Exception: + print(f"Could not create base folder at {self._basedir}. This is a fatal error, TARPN " + "cannot run without a writable base folder.") + raise + + def load(self): + self._config = configparser.ConfigParser(defaults=self._defaults, + interpolation=configparser.ExtendedInterpolation(), + inline_comment_prefixes=";", + default_section="default") + self._config.read_dict(_default_settings) + for path in self._configfiles: + if os.path.exists(path): + self._config.read(path) + else: + raise RuntimeError(f"No such config file {path}") + + def save(self): + # self._config.write() + return + + def node_config(self): + return NodeConfig(self._config["node"]) + + def port_configs(self) -> Dict[int, PortConfig]: + ports = [] + for section in self._config.sections(): + m = re.match(r"port:(\d+)", section) + if m: + ports.append(int(m.group(1))) + port_configs = {} + for port in ports: + port_sect = self._config[f"port:{port}"] + port_configs[port] = PortConfig.from_dict(port, port_sect) + return port_configs + + def network_configs(self): + return NetworkConfig(self._config["network"]) + + def app_configs(self): + apps = [] + for section in self._config.sections(): + m = re.match(r"app:(\w[\w\d]*)", section) + if m: + apps.append(m.group(1)) + app_configs = [] + for app in apps: + app_sect = self._config[f"app:{app}"] + app_configs.append(AppConfig.from_dict(app, app_sect)) + return app_configs + + def config_section(self, name): + return Config(name, self._config[name]) diff --git a/tarpn/util.py b/tarpn/util.py index ecfbe59..58f9f76 100644 --- a/tarpn/util.py +++ b/tarpn/util.py @@ -433,4 +433,8 @@ def lollipop_compare(old_epoch: int, new_epoch: int) -> int: def secure_random_byte() -> int: - return struct.unpack(' bytes: + return secrets.token_bytes(size) \ No newline at end of file diff --git a/tests/docker/Dockerfile b/tests/docker/Dockerfile index 21ff1fd..4d2d331 100644 --- a/tests/docker/Dockerfile +++ b/tests/docker/Dockerfile @@ -1,16 +1,22 @@ FROM python:3.7-slim-bullseye -RUN apt-get update; apt-get install -y socat netcat iproute2 dnsutils +RUN apt-get update; apt-get install -y socat netcat iproute2 dnsutils curl jq procps vim + +RUN curl -sSL https://get.docker.com/ | sh RUN pip install virtualenv RUN python3 -m virtualenv /opt/tarpn +WORKDIR /opt/tarpn + +RUN ./bin/pip install pytest pytest-timeout + COPY dist /dist -COPY tests/docker/bin /opt/tarpn/bin +COPY tests/docker/bin ./bin -WORKDIR /opt/tarpn +COPY tests/docker/test_curl.py ./tests/test_curl.py RUN ./bin/pip install /dist/*.whl diff --git a/tests/docker/bin/launch-tarpn.sh b/tests/docker/bin/launch-tarpn.sh index 6de77b7..10a2009 100755 --- a/tests/docker/bin/launch-tarpn.sh +++ b/tests/docker/bin/launch-tarpn.sh @@ -4,11 +4,13 @@ # The arguments for the socat commands are given as SOCAT_ARGS delimited by a pipe (|). IFS="|" +i=0 for SOCAT_ARG in $SOCAT_ARGS do IFS=" " - socat $SOCAT_ARG & + socat -x -d -d $SOCAT_ARG > /var/log/socat-${i}.log 2>&1 & IFS="|" + i=$((i+1)) done if [[ -v SLEEP ]]; @@ -16,4 +18,15 @@ then sleep $SLEEP fi -/opt/tarpn/bin/tarpn-node +cleanup() { + killall socat +} + +trap 'cleanup' SIGTERM + +if [[ -v DEBUG ]]; +then + exec env PYTHONFAULTHANDLER=true /opt/tarpn/bin/tarpn-node --verbose +else + exec /opt/tarpn/bin/tarpn-node +fi diff --git a/tests/docker/bin/tc.sh b/tests/docker/bin/tc.sh new file mode 100755 index 0000000..c08ada3 --- /dev/null +++ b/tests/docker/bin/tc.sh @@ -0,0 +1,42 @@ +#!/bin/bash + +OPTS="" + +if [[ -z DURATION ]]; +then + echo "Missing DURATION" + exit 1 +fi + +if [[ -v LOSS ]]; +then + OPTS="$OPTS loss $LOSS" +fi + +if [[ -v DELAY ]]; +then + OPTS="$OPTS delay ${DELAY}ms ${DELAY_JITTER:-10}ms ${DELAY_CORRELATION:-20}.00" +fi + +OPTS="$OPTS rate ${RATE:-1}kbit" + +docker ps + +for CONTAINER in $CONTAINERS +do + echo "Running on $CONTAINER: tc qdisc add dev eth0 root netem $OPTS " + docker exec $CONTAINER tc qdisc add dev eth0 root netem $OPTS +done + +cleanup() { + echo "cleanup!" + for CONTAINER in $CONTAINERS + do + echo "Running on $CONTAINER: tc qdisc del dev eth0 root netem" + docker exec $CONTAINER tc qdisc del dev eth0 root netem + done +} + +trap 'cleanup' EXIT + +sleep $DURATION \ No newline at end of file diff --git a/tests/docker/config/alice.ini b/tests/docker/config/alice.ini index a1a01c4..a5b1c58 100644 --- a/tests/docker/config/alice.ini +++ b/tests/docker/config/alice.ini @@ -2,36 +2,26 @@ mycall=AL1CE [node] -log.dir = /tmp/tarpn-logs-alice +log.dir = logs log.config = config/logging.ini node.call = ${mycall}-9 node.alias = ALICE node.sock = /tmp/socks/tarpn-shell-alice.sock - [port:1] port.enabled = True port.type = serial port.framing = kiss port.bitrate = 9600 kiss.checksum = false -serial.device = /tmp/vmodem_A0 -serial.speed = 9600 - -[port:2] -port.enabled = True -port.type = serial -port.framing = kiss -port.bitrate = 9600 -kiss.checksum = false -serial.device = /tmp/vmodem_B1 +serial.device = /tmp/vmodem1 serial.speed = 9600 [network] host.name = alice mesh.enabled = True -mesh.address = 00.ab +mesh.address = 00.aa mesh.ttl = 7 [app:demo] diff --git a/tests/docker/config/bob.ini b/tests/docker/config/bob.ini index e0524c0..8aa786c 100644 --- a/tests/docker/config/bob.ini +++ b/tests/docker/config/bob.ini @@ -2,27 +2,35 @@ mycall=B0B [node] -log.dir = /tmp/tarpn-logs-bob +log.dir = logs log.config = config/logging.ini node.call = ${mycall}-9 node.alias = BOB node.sock = /tmp/socks/tarpn-shell-bob.sock - [port:1] port.enabled = True port.type = serial port.framing = kiss port.bitrate = 9600 kiss.checksum = false -serial.device = /tmp/vmodem_A1 +serial.device = /tmp/vmodem1 +serial.speed = 9600 + +[port:2] +port.enabled = True +port.type = serial +port.framing = kiss +port.bitrate = 9600 +kiss.checksum = false +serial.device = /tmp/vmodem2 serial.speed = 9600 [network] host.name = bob mesh.enabled = True -mesh.address = 00.aa +mesh.address = 00.ab mesh.ttl = 7 diff --git a/tests/docker/config/carol.ini b/tests/docker/config/carol.ini index 9b3dc9e..7c1c690 100644 --- a/tests/docker/config/carol.ini +++ b/tests/docker/config/carol.ini @@ -2,35 +2,38 @@ mycall=C4ROL [node] -log.dir = /tmp/tarpn-logs-carol +log.dir = logs log.config = config/logging.ini node.call = ${mycall}-9 node.alias = CAROL -node.sock = /tmp/tarpn-shell-carol.sock +node.sock = /tmp/socks/tarpn-shell-carol.sock [port:1] port.enabled = True port.type = serial port.framing = kiss +port.bitrate = 9600 kiss.checksum = false -serial.device = /tmp/vmodem_B0 +serial.device = /tmp/vmodem1 serial.speed = 9600 [port:2] -port.enabled = False +port.enabled = True port.type = serial port.framing = kiss +port.bitrate = 9600 kiss.checksum = false -serial.device = /tmp/vmodem_C1 +serial.device = /tmp/vmodem2 serial.speed = 9600 [port:3] -port.enabled = False +port.enabled = True port.type = serial port.framing = kiss +port.bitrate = 9600 kiss.checksum = false -serial.device = /tmp/vmodem_G1 +serial.device = /tmp/vmodem3 serial.speed = 9600 [network] diff --git a/tests/docker/config/dave.ini b/tests/docker/config/dave.ini new file mode 100644 index 0000000..96ba209 --- /dev/null +++ b/tests/docker/config/dave.ini @@ -0,0 +1,33 @@ +[default] +mycall=D4VE + +[node] +log.dir = logs +log.config = config/logging.ini + +node.call = ${mycall}-9 +node.alias = DAVE +node.sock = /tmp/socks/tarpn-shell-dave.sock + +[port:1] +port.enabled = True +port.type = serial +port.framing = kiss +port.bitrate = 9600 +kiss.checksum = false +serial.device = /tmp/vmodem1 +serial.speed = 9600 + +[network] +host.name = dave +mesh.enabled = True +mesh.address = 00.ad +mesh.ttl = 7 + +[app:demo] +app.address = mesh://ff.ff:100 +app.call = ${mycall}-7 +app.alias = DEMO +app.sock = /tmp/socks/tarpn-demo-d.sock +app.module = plugins.demo +app.class = DemoApp diff --git a/tests/docker/config/eve.ini b/tests/docker/config/eve.ini new file mode 100644 index 0000000..c77563a --- /dev/null +++ b/tests/docker/config/eve.ini @@ -0,0 +1,33 @@ +[default] +mycall=E4E + +[node] +log.dir = logs +log.config = config/logging.ini + +node.call = ${mycall}-9 +node.alias = EVE +node.sock = /tmp/socks/tarpn-shell-eve.sock + +[port:1] +port.enabled = True +port.type = serial +port.framing = kiss +port.bitrate = 9600 +kiss.checksum = false +serial.device = /tmp/vmodem1 +serial.speed = 9600 + +[network] +host.name = eve +mesh.enabled = True +mesh.address = 00.ae +mesh.ttl = 7 + +[app:demo] +app.address = mesh://ff.ff:100 +app.call = ${mycall}-7 +app.alias = DEMO +app.sock = /tmp/socks/tarpn-demo-e.sock +app.module = plugins.demo +app.class = DemoApp diff --git a/tests/docker/config/network.yml b/tests/docker/config/network.yml deleted file mode 100644 index e69de29..0000000 diff --git a/tests/docker/docker-compose-add-latency.yml b/tests/docker/docker-compose-add-latency.yml new file mode 100644 index 0000000..6a2e0ef --- /dev/null +++ b/tests/docker/docker-compose-add-latency.yml @@ -0,0 +1,14 @@ +services: + tc: + image: tarpn/tarpn-test:latest + environment: + - DURATION=300 + #- LOSS=5 + - DELAY=100 + - CONTAINERS=alice bob carol dave eve + cap_add: + - NET_ADMIN + volumes: + - /tmp/socks/:/tmp/socks/ + - /var/run/docker.sock:/var/run/docker.sock + entrypoint: /opt/tarpn/bin/tc.sh diff --git a/tests/docker/docker-compose-curl-tests.yml b/tests/docker/docker-compose-curl-tests.yml new file mode 100644 index 0000000..c1a2152 --- /dev/null +++ b/tests/docker/docker-compose-curl-tests.yml @@ -0,0 +1,10 @@ +services: + test: + container_name: test + depends_on: + alice: + condition: service_healthy + image: tarpn/tarpn-test:latest + volumes: + - /tmp/socks/:/tmp/socks/ + command: /opt/tarpn/bin/pytest /opt/tarpn/tests/test_curl.py diff --git a/tests/docker/docker-compose.yml b/tests/docker/docker-compose.yml index f92b32b..57af3c6 100644 --- a/tests/docker/docker-compose.yml +++ b/tests/docker/docker-compose.yml @@ -1,27 +1,76 @@ services: alice: + # Connected to bob image: tarpn/tarpn-test:latest + container_name: alice + cap_add: + - NET_ADMIN environment: - - SOCAT_ARGS=PTY,raw,echo=1,link=/tmp/vmodem_A0 udp:bob:12345|PTY,raw,echo=1,link=/tmp/vmodem_B1 udp-listen:10000 - - SLEEP=3 + - SOCAT_ARGS=PTY,raw,echo=1,link=/tmp/vmodem1 udp-listen:12345 + - SLEEP=1 + - DEBUG=true volumes: - ./config/alice.ini:/opt/tarpn/config/node.ini - /tmp/socks/:/tmp/socks/ command: /opt/tarpn/bin/launch-tarpn.sh + healthcheck: + test: [ "CMD", "curl", "--unix-socket", "/tmp/socks/tarpn-shell-alice.sock", "http://dummy/healthcheck" ] + interval: 10s + timeout: 5s + retries: 5 bob: + # Connected to alice and carol image: tarpn/tarpn-test:latest + container_name: bob + cap_add: + - NET_ADMIN environment: - - SOCAT_ARGS=PTY,raw,echo=1,link=/tmp/vmodem_A1 udp-listen:12345 + - SOCAT_ARGS=PTY,raw,echo=1,link=/tmp/vmodem1 udp:alice:12345|PTY,raw,echo=1,link=/tmp/vmodem2 udp:carol:10000 + - SLEEP=2 + - DEBUG=true volumes: - ./config/bob.ini:/opt/tarpn/config/node.ini - /tmp/socks/:/tmp/socks/ command: /opt/tarpn/bin/launch-tarpn.sh carol: + # Connected to alice and dave and eve image: tarpn/tarpn-test:latest + container_name: carol + cap_add: + - NET_ADMIN environment: - - SOCAT_ARGS=PTY,raw,echo=1,link=/tmp/vmodem_B0 udp:alice:10000 - - SLEEP=1 + - SOCAT_ARGS=PTY,raw,echo=1,link=/tmp/vmodem1 udp-listen:10000|PTY,raw,echo=1,link=/tmp/vmodem2 udp-listen:10001|PTY,raw,echo=1,link=/tmp/vmodem3 udp:eve:10002 + - SLEEP=3 + - DEBUG=true volumes: - ./config/carol.ini:/opt/tarpn/config/node.ini - /tmp/socks/:/tmp/socks/ command: /opt/tarpn/bin/launch-tarpn.sh + dave: + # Connected to carol + image: tarpn/tarpn-test:latest + container_name: dave + cap_add: + - NET_ADMIN + environment: + - SOCAT_ARGS=PTY,raw,echo=1,link=/tmp/vmodem1 udp:carol:10001 + - SLEEP=1 + - DEBUG=true + volumes: + - ./config/dave.ini:/opt/tarpn/config/node.ini + - /tmp/socks/:/tmp/socks/ + command: /opt/tarpn/bin/launch-tarpn.sh + eve: + # Connected to carol + image: tarpn/tarpn-test:latest + container_name: eve + cap_add: + - NET_ADMIN + environment: + - SOCAT_ARGS=PTY,raw,echo=1,link=/tmp/vmodem1 udp-listen:10002 + - SLEEP=2 + - DEBUG=true + volumes: + - ./config/eve.ini:/opt/tarpn/config/node.ini + - /tmp/socks/:/tmp/socks/ + command: /opt/tarpn/bin/launch-tarpn.sh \ No newline at end of file diff --git a/tests/docker/smoke-test.bats b/tests/docker/smoke-test.bats deleted file mode 100644 index e69de29..0000000 diff --git a/tests/docker/test_curl.py b/tests/docker/test_curl.py new file mode 100644 index 0000000..9a73895 --- /dev/null +++ b/tests/docker/test_curl.py @@ -0,0 +1,41 @@ +import json +import shlex +import subprocess +import unittest + +import pytest + + +def curl(sock: str, url: str): + command = f"curl --retry 5 --max-time 10 --retry-delay 0 --retry-max-time 40 -v --unix-socket {sock} {url}" + result = subprocess.run(shlex.split(command), stdout=subprocess.PIPE) + return result.stdout + + +class DockerTest(unittest.TestCase): + def test_alice_alive(self): + result = curl("/tmp/socks/tarpn-shell-alice.sock", "http://dummy/healthcheck") + assert result == b"ok" + + def test_bob_alive(self): + result = curl("/tmp/socks/tarpn-shell-bob.sock", "http://dummy/healthcheck") + assert result == b"ok" + + def test_carol_alive(self): + result = curl("/tmp/socks/tarpn-shell-carol.sock", "http://dummy/healthcheck") + assert result == b"ok" + + @pytest.mark.timeout(120) + def test_convergence(self): + done = False + while not done: + alice_net = curl("/tmp/socks/tarpn-shell-alice.sock", "http://dummy/network") + bob_net = curl("/tmp/socks/tarpn-shell-bob.sock", "http://dummy/network") + carol_net = curl("/tmp/socks/tarpn-shell-carol.sock", "http://dummy/network") + + alice_node_data = json.loads(alice_net).get("nodes", []) + bob_node_data = json.loads(bob_net).get("nodes", []) + carol_node_data = json.loads(carol_net).get("nodes", []) + + if len(alice_node_data) == 2 and len(bob_node_data) == 2 and len(carol_node_data) == 2: + break From f85508db11eca23d4f67e358803a52a9fb0ebb06 Mon Sep 17 00:00:00 2001 From: David Arthur Date: Sun, 9 Jan 2022 22:06:35 -0500 Subject: [PATCH 16/16] Fix docker readme --- tests/docker/README.md | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/tests/docker/README.md b/tests/docker/README.md index ca19c53..910f528 100644 --- a/tests/docker/README.md +++ b/tests/docker/README.md @@ -8,13 +8,13 @@ Define a network as done in docker-compose.yml, run it > docker-compose -f tests/docker/docker-compose.yml up -Connect to one of the nodes using a domain socket +In a separate shell, run this docker-compose to simulate slow links -> docker run -i -t -v /tmp/socks/:/tmp/socks/ tarpn/tarpn-core:latest nc -U /tmp/socks/tarpn-shell-david.sock +> docker-compose -f tests/docker/docker-compose-add-latency.yml up -Inject some network errors using Pumba +Now, connect to the unix socket with curl from (yet another) docker container to inspect a node -> docker run -it --rm -v /var/run/docker.sock:/var/run/docker.sock gaiaadm/pumba --interval=5m --log-level=info netem --duration 100s loss -p 100 tarpn-node-controller_david_1 +> docker run -v /tmp/socks:/tmp/socks tarpn/tarpn-test:latest curl --unix-socket /tmp/socks/tarpn-shell-alice.sock "http://dummy/metrics" - -dig +short bob \ No newline at end of file +The reason for running curl within a container is that there are some issues with a Mac host and linux Docker container +sharing a domain socket. Not sure if it's curl or the socket itself.