Skip to content

Commit

Permalink
Add docker compose file for network testing
Browse files Browse the repository at this point in the history
  • Loading branch information
mumrah committed Jan 10, 2022
1 parent e5da53b commit 6e8a258
Show file tree
Hide file tree
Showing 14 changed files with 266 additions and 19 deletions.
3 changes: 2 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ def read_file_contents(path):
'tarpn-serial-dump = tarpn.tools.serial_dump:main',
'tarpn-packet-dump = tarpn.tools.packet_dump:main',
'tarpn-node = tarpn.main:main',
'tarpn-app = tarpn.app.runner:main'
'tarpn-app = tarpn.app.runner:main',
'tarpn-shell = tarpn.tools.shell:main'
]},
python_requires='>=3.7',
install_requires=[
Expand Down
2 changes: 1 addition & 1 deletion tarpn/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ def run_node(args):
scheduler.submit(UnixServerThread(app_config.app_socket(), app_protocol))
multiplexer_protocol = partial(MultiplexingProtocol, app_multiplexer)
# TODO bind or connect?
mesh_l4.connect(multiplexer_protocol, app_address.address, MeshAddress.parse("00.a2"), app_address.port)
mesh_l4.connect(multiplexer_protocol, app_address.address, app_address.address, app_address.port)

sock = node_settings.get("node.sock")
print(f"Binding node terminal to {sock}")
Expand Down
27 changes: 17 additions & 10 deletions tarpn/network/mesh/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ def can_handle(self, protocol: int) -> bool:
def pre_close(self):
# Erase our neighbors and send ADVERT
self.neighbors.clear()
self.our_link_state_epoch = next(self.our_link_state_epoch_generator)
self.bump_epoch(datetime.utcnow(), "closing")
self.send_advertisement()
time.sleep(1) # TODO better solution is to wait for L3 queue to drain in close

Expand Down Expand Up @@ -213,6 +213,12 @@ def wakeup(self):
"""Wake up the main thread"""
self.queue.put(None)

def bump_epoch(self, now: datetime, reason: str = ""):
if now != self.last_epoch_bump:
self.our_link_state_epoch = next(self.our_link_state_epoch_generator)
self.last_epoch_bump = now
self.debug(f"Bumping our epoch to {self.our_link_state_epoch} due to {reason}")

def check_dead_neighbors(self, now: datetime) -> int:
min_deadline = self.config.get_int("mesh.dead.interval")
for neighbor in list(self.neighbors.values()):
Expand All @@ -224,8 +230,7 @@ def check_dead_neighbors(self, now: datetime) -> int:
self.info(f"Neighbor {neighbor.address} detected as DOWN!")

neighbor.state = NeighborState.DOWN
self.our_link_state_epoch = next(self.our_link_state_epoch_generator)
self.last_epoch_bump = datetime.utcnow()
self.bump_epoch(datetime.utcnow(), "dead neighbor")
self.last_advert = datetime.fromtimestamp(0) # Force our advert to go out
else:
min_deadline = min(deadline, min_deadline)
Expand All @@ -246,18 +251,19 @@ def check_epoch(self, now: datetime) -> int:
for node, links in self.link_states.items():
for link in list(links):
if (now - link.created).seconds > max_age:
self.info(f"Lost link to {node}")
self.debug(f"Expiring link state {link} for {node}")
links.remove(link)
if len(links) == 0:
to_delete.append(node)

for node in to_delete:
del self.link_states[node]
del self.link_state_epochs[node]

deadline = int(max_age * .80) - (now - self.last_epoch_bump).seconds
if deadline <= 0:
self.our_link_state_epoch = next(self.our_link_state_epoch_generator)
self.last_epoch_bump = now
self.bump_epoch(now, "max epoch age")
self.send_advertisement()
return int(max_age * .80)
else:
Expand Down Expand Up @@ -355,7 +361,7 @@ def handle_hello(self, link_id: int, network_header: NetworkHeader, hello: Hello
now = datetime.utcnow()
sender = network_header.source
if network_header.source not in self.neighbors:
self.info(f"Saw new neighbor {sender} ({hello.name})")
self.info(f"New neighbor {sender} ({hello.name})")
self.neighbors[sender] = Neighbor(
address=sender,
name=hello.name,
Expand All @@ -365,22 +371,21 @@ def handle_hello(self, link_id: int, network_header: NetworkHeader, hello: Hello
last_update=now,
state=NeighborState.INIT
)
self.our_link_state_epoch = next(self.our_link_state_epoch_generator)
self.last_epoch_bump = now
self.bump_epoch(now, "new neighbor")
else:
self.neighbors[sender].neighbors = hello.neighbors
self.neighbors[sender].last_seen = now

if self.our_address in hello.neighbors:
delay = 100
if self.neighbors[sender].state != NeighborState.UP:
self.info(f"Neighbor {sender} is UP!")
self.info(f"Neighbor {sender} ({self.neighbors[sender].name}) is UP!")
self.scheduler.timer(delay, partial(self.send_query, sender), auto_start=True)
self.neighbors[sender].state = NeighborState.UP
self.neighbors[sender].last_update = now
delay *= 1.2
else:
self.info(f"Neighbor {sender} is initializing...")
self.debug(f"Neighbor {sender} is initializing...")
self.neighbors[sender].state = NeighborState.INIT
self.neighbors[sender].last_update = now

Expand Down Expand Up @@ -411,6 +416,7 @@ def handle_advertisement(self, link_id: int, network_header: NetworkHeader, adve

latest_epoch = self.link_state_epochs.get(advert.node)
if latest_epoch is None:
self.info(f"New link for node {advert.node}")
self.debug(f"Initializing link state for {advert.node} with epoch {advert.epoch}")
update = True
else:
Expand All @@ -420,6 +426,7 @@ def handle_advertisement(self, link_id: int, network_header: NetworkHeader, adve
self.debug(f"Updating link state for {advert.node}. "
f"Received epoch is {advert.epoch}, last known was {latest_epoch}")
elif epoch_cmp == 0:
self.info(f"Reset link for node {advert.node}")
self.debug(f"Resetting link state for {advert.node}. "
f"Received epoch is {advert.epoch}, last known was {latest_epoch}")
else:
Expand Down
74 changes: 74 additions & 0 deletions tarpn/tools/shell.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import argparse
import asyncio
import signal
import sys
from asyncio import Protocol, Transport
from typing import Optional

from tarpn.util import shutdown


class TTY(Protocol):
def __init__(self):
self.transport: Optional[Transport] = None

def connection_made(self, transport: Transport):
self.transport = transport
sys.stderr.write(f"Connected to {transport.get_extra_info('peername')}\r\n")
sys.stderr.flush()

def connection_lost(self, exc):
sys.stderr.write(f"Lost connection to {self.transport.get_extra_info('peername')}\r\n")
sys.stderr.flush()
self.transport = None
signal.raise_signal(signal.SIGTERM)

def data_received(self, data: bytes) -> None:
msg = data.decode("utf-8")
for line in msg.splitlines():
if line.startswith("Welcome to the TARPN shell") or line.startswith("(tarpn)"):
return
else:
sys.stdout.write(line.strip())
sys.stdout.flush()

def handle_stdin(self):
line = sys.stdin.readline()
if self.transport is not None:
if line == "": # Got a ^D
self.transport.close()
signal.raise_signal(signal.SIGTERM)
else:
line = line.strip()
self.transport.write(line + "\r\n")
else:
sys.stdout.write("Not connected\r\n")
sys.stdout.flush()

def handle_signal(self, loop, scheduler):
if self.transport is not None:
self.transport.close()
asyncio.create_task(shutdown(loop))
scheduler.shutdown()


async def async_main():
parser = argparse.ArgumentParser(description='Open a shell to a running tarpn-core')
parser.add_argument("sock", help="Path to unix socket")
parser.add_argument("cmd", help="Command to send")
args = parser.parse_args()

tty = TTY()
loop = asyncio.get_event_loop()
loop.add_reader(sys.stdin, tty.handle_stdin)
(transport, protocol) = await loop.create_unix_connection(lambda: tty, path=args.sock)
transport.write(args.cmd.encode("utf-8"))
await asyncio.sleep(1)


def main():
asyncio.run(async_main())


if __name__ == "__main__":
main()
14 changes: 9 additions & 5 deletions tarpn/transport/mesh_l4.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from tarpn.log import LoggingMixin
from tarpn.network import L3Protocol, L3Address
from tarpn.network.mesh import MeshAddress
from tarpn.network.mesh.header import DatagramHeader, Datagram, BroadcastHeader
from tarpn.network.mesh.header import DatagramHeader, Datagram
from tarpn.transport.mesh.broadcast import BroadcastProtocol
from tarpn.transport.mesh.datagram import DatagramProtocol
from tarpn.transport import DatagramProtocol as DProtocol
Expand Down Expand Up @@ -107,8 +107,8 @@ def remote_address(self) -> Optional[L3Address]:

class BroadcastTransport(BTransport):
"""
A channel for sending and receiving datagrams
"""
A channel for broadcasting datagrams
"""

def __init__(self,
network: L3Protocol,
Expand All @@ -133,6 +133,7 @@ def close(self) -> None:
self.closing = True

def write(self, data: Any) -> None:
# Route only for the purposes of getting the MTU
_, mtu = self.network.route_packet(MeshAddress.parse("ff.ff"))
if isinstance(data, str):
encoded_data = data.encode("utf-8")
Expand Down Expand Up @@ -188,15 +189,18 @@ def connect(self, protocol_factory: Callable[[], DProtocol],
return protocol

def broadcast(self, protocol_factory: Callable[[], DProtocol],
local_address: MeshAddress, port: int) -> DProtocol:
local_address: MeshAddress, port: int) -> DProtocol:
"""
Create a broadcast transport for a given local address and port
"""
if port in self.connections:
if self.connections[port][0].is_closing():
del self.connections[port]
else:
raise RuntimeError(f"Connection to {port} is already open")
protocol = protocol_factory()
transport = BroadcastTransport(self.l3_protocol, self.broadcast_protocol, self.datagram_protocol, port,
local_address, MeshAddress.parse("ff.ff"))
local_address, MeshAddress.parse("ff.ff"))
protocol.connection_made(transport)
self.connections[port] = (transport, protocol)
return protocol
Expand Down
2 changes: 1 addition & 1 deletion tests/docker/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
FROM python:3.7-slim-bullseye

RUN apt-get update; apt-get install -y socat netcat iproute2
RUN apt-get update; apt-get install -y socat netcat iproute2 dnsutils

RUN pip install virtualenv

Expand Down
5 changes: 4 additions & 1 deletion tests/docker/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,7 @@ Connect to one of the nodes using a domain socket
Inject some network errors using Pumba

> docker run -it --rm -v /var/run/docker.sock:/var/run/docker.sock gaiaadm/pumba --interval=5m --log-level=info netem --duration 100s loss -p 100 tarpn-node-controller_david_1
> docker run -it --rm -v /var/run/docker.sock:/var/run/docker.sock gaiaadm/pumba --interval=5m --log-level=info netem --duration 100s loss -p 100 tarpn-node-controller_david_1

dig +short bob
5 changes: 5 additions & 0 deletions tests/docker/bin/launch-tarpn.sh
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,9 @@ do
IFS="|"
done

if [[ -v SLEEP ]];
then
sleep $SLEEP
fi

/opt/tarpn/bin/tarpn-node
43 changes: 43 additions & 0 deletions tests/docker/config/alice.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
[default]
mycall=AL1CE

[node]
log.dir = /tmp/tarpn-logs-alice
log.config = config/logging.ini

node.call = ${mycall}-9
node.alias = ALICE
node.sock = /tmp/socks/tarpn-shell-alice.sock


[port:1]
port.enabled = True
port.type = serial
port.framing = kiss
port.bitrate = 9600
kiss.checksum = false
serial.device = /tmp/vmodem_A0
serial.speed = 9600

[port:2]
port.enabled = True
port.type = serial
port.framing = kiss
port.bitrate = 9600
kiss.checksum = false
serial.device = /tmp/vmodem_B1
serial.speed = 9600

[network]
host.name = alice
mesh.enabled = True
mesh.address = 00.ab
mesh.ttl = 7

[app:demo]
app.address = mesh://ff.ff:100
app.call = ${mycall}-7
app.alias = DEMO
app.sock = /tmp/socks/tarpn-demo-a.sock
app.module = plugins.demo
app.class = DemoApp
35 changes: 35 additions & 0 deletions tests/docker/config/bob.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
[default]
mycall=B0B

[node]
log.dir = /tmp/tarpn-logs-bob
log.config = config/logging.ini

node.call = ${mycall}-9
node.alias = BOB
node.sock = /tmp/socks/tarpn-shell-bob.sock


[port:1]
port.enabled = True
port.type = serial
port.framing = kiss
port.bitrate = 9600
kiss.checksum = false
serial.device = /tmp/vmodem_A1
serial.speed = 9600

[network]
host.name = bob
mesh.enabled = True
mesh.address = 00.aa
mesh.ttl = 7


[app:demo]
app.address = mesh://ff.ff:100
app.call = ${mycall}-7
app.alias = DEMO
app.sock = /tmp/socks/tarpn-demo-b.sock
app.module = plugins.demo
app.class = DemoApp
Loading

0 comments on commit 6e8a258

Please sign in to comment.