Skip to content

Commit

Permalink
Add a HTTP command interface over unix socket
Browse files Browse the repository at this point in the history
  • Loading branch information
mumrah committed Jan 10, 2022
1 parent 6e8a258 commit 38e27ca
Show file tree
Hide file tree
Showing 31 changed files with 795 additions and 205 deletions.
3 changes: 3 additions & 0 deletions .idea/misc.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions config/defaults.ini
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ node.call = ${mycall}-1
[network]
mesh.ttl = 7
mesh.hello.interval = 10
mesh.dead.interval = 30
mesh.advert.interval = 30
mesh.advert.max.age = 100
mesh.query.interval = 60
mesh.dead.interval = 40
mesh.advert.interval = 120
mesh.advert.max.age = 600
mesh.query.interval = 120

[app:demo]
# The 'app' configs are used to locate, run, and connect the app to the network
Expand Down
4 changes: 2 additions & 2 deletions config/logging.ini
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ args=(sys.stdout,)

[handler_node]
class=FileHandler
level=NOTSET
level=DEBUG
formatter=standard
args=('%(log.dir)s/node.log', 'w')

Expand Down Expand Up @@ -80,7 +80,7 @@ formatter=standard
args=('%(log.dir)s/netrom.log', 'w')

[formatter_standard]
format=%(levelname)-8s %(asctime)s -- %(message)s
format=%(levelname)-8s %(asctime)s %(threadName)-8s -- %(message)s
datefmt=
class=logging.Formatter

Expand Down
188 changes: 188 additions & 0 deletions tarpn/application/node.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
import math
import statistics
import time
import urllib
from typing import Optional, Dict

from flask import Flask, Response, request
from flask.testing import EnvironBuilder
from pyformance import global_registry

from tarpn.datalink import L2Queuing
from tarpn.network.mesh import MeshAddress
from tarpn.network.mesh.protocol import MeshProtocol
from tarpn.scheduler import Scheduler
from tarpn.settings import Settings
from tarpn.transport import Protocol, Transport
from tarpn.transport.mesh_l4 import MeshTransportManager


class TarpnCoreProtocol(Protocol):
def __init__(self,
settings: Settings, port_queues: Dict[int, L2Queuing],
network: MeshProtocol, transport_manager: MeshTransportManager, scheduler: Scheduler):
self.settings = settings
self.port_queues = port_queues
self.network = network
self.transport_manager = transport_manager
self.scheduler = scheduler

self.transport: Optional[Transport] = None
self.app: Flask = Flask("tarpn-core")
self.register_handlers()

def register_handlers(self):
@self.app.route("/")
def index():
return "welcome"

@self.app.route("/healthcheck")
def healhcheck():
return "ok"

@self.app.route("/ports")
def ports():
port_data = []
ports_settings = self.settings.port_configs()
for port, queue in self.port_queues.items():
port_config = ports_settings[port]
port_data.append({
"id": port,
"name": port_config.port_name(),
"mtu": queue.mtu(),
"qsize": queue.qsize(),
"config": port_config.as_dict()
})
return {"ports": port_data}

@self.app.route("/network")
def network():
out = {
"address": str(self.network.our_address),
"epoch": self.network.our_link_state_epoch
}
neighbors = []
for addr, neighbor in self.network.neighbors.items():
neighbors.append({
"address": str(addr),
"name": neighbor.name,
"last_seen": neighbor.last_seen,
"last_update": neighbor.last_update,
"status": str(neighbor.state)
})
out["neighbors"] = neighbors

nodes = []
for node, link_states in self.network.valid_link_states().items():
epoch = self.network.link_state_epochs.get(node)
nodes.append({
"address": str(node),
"epoch": epoch,
"link_states": str(link_states) # TODO fix this
})
out["nodes"] = nodes
return out

@self.app.route("/routes")
def routes():
out = []

for node, link_states in self.network.valid_link_states().items():
epoch = self.network.link_state_epochs.get(node)
path, path_cost = self.network.route_to(node)
if len(path) == 0:
continue
link = self.network.neighbors.get(path[1]).link_id
device_id = self.network.link_multiplexer.get_link_device_id(link)
out.append({
"address": str(node),
"epoch": epoch,
"path": [str(n) for n in path[1:]],
"port": device_id,
"cost": path_cost
})
return {"routes": out}

@self.app.route("/ping", methods=["POST"])
def ping():
address = request.args.get("address")
count = int(request.args.get("count", 3))
run_async = request.args.get("async", None)
timeout = int(request.args.get("timeout", 1000))
size = int(request.args.get("size", 100))

node = MeshAddress.parse(address)
seqs = []
times = []
lost = 0

def do_ping():
nonlocal lost
t0 = time.time_ns()
seq = self.network.ping_protocol.send_ping(node, size=size)
found = self.network.ping_protocol.wait_for_ping(node, seq, timeout_ms=timeout)
t1 = time.time_ns()
if found:
dt = int((t1 - t0) / 1000000.)
seqs.append(seq)
times.append(dt)
else:
lost += 1

if run_async:
futures = []
for _ in range(count):
futures.append(self.scheduler.run(do_ping))
for fut in futures:
fut.result(3000)
else:
for _ in range(count):
do_ping()

out = {
"address": address,
"seqs": seqs,
"times": times,
"lost": lost
}
if count > 1:
out["min"] = min(times)
out["max"] = max(times)
out["avg"] = statistics.mean(times)
out["stdev"] = statistics.stdev(times)
return out

@self.app.route("/metrics")
def metrics():
return global_registry().dump_metrics()

def connection_made(self, transport: Transport):
self.transport = transport

def connection_lost(self, exc):
self.transport = None

def data_received(self, data: bytes):
raw_lines = data.splitlines()
first = raw_lines[0].decode("ascii")
parts = first.split(" ")
verb, path, http_version = parts[0:3]
url_parts = urllib.parse.urlparse(path)
# TODO parse headers?
builder = EnvironBuilder(app=self.app,
path=url_parts.path,
query_string=url_parts.query,
method=verb,
data=b"\r\n".join(raw_lines[2:]),
content_type="text/plain")
env = builder.get_environ()

with self.app.request_context(env):
resp: Response = self.app.full_dispatch_request()
buf = bytearray()
buf.extend(f"HTTP/1.1 {resp.status}".encode("ISO-8859-1"))
buf.extend(b"\r\n")
buf.extend(str(resp.headers).encode("ISO-8859-1"))
if resp.data:
buf.extend(resp.data)
self.transport.write(buf) # TODO transport was None?
2 changes: 1 addition & 1 deletion tarpn/application/shell.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@

from tarpn.network.mesh import MeshAddress
from tarpn.network.mesh.protocol import MeshProtocol
from tarpn.transport import DatagramProtocol as DProtocol
from tarpn.transport import Protocol, Transport, L4Address
from tarpn.transport.mesh_l4 import MeshTransportManager
from tarpn.transport import DatagramProtocol as DProtocol


class ShellDatagramProtocol(DProtocol):
Expand Down
14 changes: 14 additions & 0 deletions tarpn/datalink/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,12 @@ def take_inbound(self) -> Tuple[FrameData, int]:
"""
raise NotImplementedError

def qsize(self) -> int:
raise NotImplementedError

def mtu(self) -> int:
raise NotImplementedError

def close(self):
raise NotImplementedError

Expand Down Expand Up @@ -104,6 +110,12 @@ def __init__(self, queue_size, max_payload_size):
self._lock: threading.Lock = threading.Lock()
self._not_empty: threading.Condition = threading.Condition(self._lock)

def qsize(self) -> int:
return self._outbound.maxsize

def mtu(self) -> int:
return self._max_payload_size

def offer_outbound(self, frame: FrameData) -> bool:
if len(frame.data) > self._max_payload_size:
self.error(f"Payload too large, dropping. Size is {len(frame.data)}, max is {self._max_payload_size}")
Expand Down Expand Up @@ -134,6 +146,7 @@ def offer_inbound(self, frame: FrameData) -> None:
# Since the deque is bounded, this will eject old items to make room for this frame
with self._lock:
self.meter("inbound", "offer").mark()
#frame.timer = self.timer("inbound", "wait").time()
self._inbound.append((frame, next(self._inbound_count)))
self._not_empty.notify()

Expand All @@ -142,6 +155,7 @@ def take_inbound(self) -> Tuple[FrameData, int]:
while not len(self._inbound):
self._not_empty.wait()
inbound, count = self._inbound.popleft()
#inbound.timer.stop()
dropped = count - self._last_inbound - 1
self._last_inbound = count
self.meter("inbound", "take").mark()
Expand Down
3 changes: 2 additions & 1 deletion tarpn/datalink/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from tarpn.datalink import FrameData, L2Queuing, L2Address
from tarpn.log import LoggingMixin
from tarpn.metrics import MetricsMixin
from tarpn.network import L3Payload, L3Queueing
from tarpn.scheduler import Scheduler, CloseableThreadLoop
from tarpn.util import BackoffGenerator
Expand Down Expand Up @@ -121,7 +122,7 @@ def remove_link(self, link_id: int) -> None:
raise NotImplementedError


class DefaultLinkMultiplexer(LinkMultiplexer):
class DefaultLinkMultiplexer(LinkMultiplexer, MetricsMixin):
def __init__(self, queue_factory: Callable[[], L3Queueing], scheduler: Scheduler):
self.queue_factory = queue_factory
self.link_id_counter = itertools.count()
Expand Down
Loading

0 comments on commit 38e27ca

Please sign in to comment.