Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve stability #3

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
.venv/
.vscode/
__pycache__/

# configuration files
Expand Down
12 changes: 6 additions & 6 deletions docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -15,28 +15,28 @@ RUN apt-get -y update && apt-get -y upgrade && apt-get -y install \

# install BlueZ with mesh support
WORKDIR /opt/build
COPY ./scripts/install-ell.sh .
COPY ./docker/scripts/install-ell.sh .
RUN sh ./install-ell.sh

WORKDIR /opt/build
COPY ./scripts/install-json-c.sh .
COPY ./docker/scripts/install-json-c.sh .
RUN sh ./install-json-c.sh

WORKDIR /opt/build
COPY ./scripts/install-bluez.sh .
COPY ./docker/scripts/install-bluez.sh .
RUN sh ./install-bluez.sh

# install bridge
WORKDIR /opt/hass-ble-mesh
RUN git clone https://github.com/minims/homeassistant-bluetooth-mesh.git .
RUN git checkout master
COPY ./requirements.txt .
RUN pip3 install -r requirements.txt
COPY ./gateway ./gateway

# mount config
WORKDIR /config
VOLUME /config

# run bluetooth service and bridge
WORKDIR /opt/hass-ble-mesh/gateway
COPY ./scripts/entrypoint.sh .
COPY ./docker/scripts/entrypoint.sh .
ENTRYPOINT [ "/bin/bash", "entrypoint.sh" ]
4 changes: 3 additions & 1 deletion docker/docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
version: "3.9"
services:
app:
build: .
build:
context: ..
dockerfile: docker/Dockerfile
volumes:
- ./config:/config/
restart: "always"
Expand Down
25 changes: 20 additions & 5 deletions gateway/gateway.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ def _load_key(self, keychain, name):
self._new_keys.add(name)
try:
return bytes.fromhex(keychain[name])
except Exception as exp: # pylint: disable=broad-except
except ValueError as exp: # pylint: disable=broad-except
raise InvalidKey(f"Invalid device key: {keychain[name]} / {exp}") from exp

def _initialize(self):
Expand Down Expand Up @@ -233,6 +233,24 @@ async def run(self, args):
await tasks.gather()


async def run(args):
"""
Wrap application startup and cleanup
"""

loop = asyncio.get_event_loop()
app = MqttGateway(loop, args.basedir)

await app.run(args)

orphans = asyncio.all_tasks()
for orphan in orphans:
if orphan != asyncio.current_task():
logging.warn(f"Orphaned task {orphan}")

logging.info("Shutdown complete")


def main():
parser = argparse.ArgumentParser()
parser.add_argument("--leave", action="store_true")
Expand All @@ -248,11 +266,8 @@ def main():

args = parser.parse_args()

loop = asyncio.get_event_loop()
app = MqttGateway(loop, args.basedir)

with suppress(KeyboardInterrupt):
loop.run_until_complete(app.run(args))
asyncio.run(run(args))


if __name__ == "__main__":
Expand Down
4 changes: 3 additions & 1 deletion gateway/mesh/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,9 @@ def add(self, node):
self._nodes[str(node.uuid)] = node

def create(self, uuid, info):
self.add(self._make_node(uuid, info))
node = self._make_node(uuid, info)
self.add(node)
return node

def delete(self, uuid):
del self._nodes[str(uuid)]
Expand Down
13 changes: 12 additions & 1 deletion gateway/mesh/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,21 @@ class Node:
event interface for other application components.
"""

def __init__(self, uuid, type, unicast, count, configured=False, config=None): # pylint: disable=redefined-builtin
def __init__(
self,
uuid,
type,
unicast,
count,
device_key=None,
configured=False,
config=None,
):
self.uuid = uuid
self.type = type
self.unicast = unicast
self.count = count
self.device_key = device_key
self.configured = configured
self.config = config or Config(config={})

Expand Down Expand Up @@ -92,4 +102,5 @@ def yaml(self):
"unicast": self.unicast,
"count": self.count,
"configured": self.configured,
"device_key": self.device_key,
}
2 changes: 1 addition & 1 deletion gateway/modules/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def setup_cli(self, parser):
async def handle_cli(self, args):
try:
uuid = UUID(args.uuid)
except Exception as exp: # pylint: disable=broad-except
except (TypeError, ValueError) as exp: # pylint: disable=broad-except
logging.info(f"Invalid uuid: {exp}")
return

Expand Down
54 changes: 52 additions & 2 deletions gateway/modules/provisioner.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
"""Provisioner"""
import asyncio
import logging
import os

from uuid import UUID

from bluetooth_mesh import models
from bluetooth_mesh.crypto import DeviceKey

from mesh.composition import Composition

from . import Module

Expand Down Expand Up @@ -64,7 +69,7 @@ async def handle_cli(self, args):

try:
uuid = UUID(args.uuid)
except Exception as exp: # pylint: disable=broad-except
except (TypeError, ValueError) as exp: # pylint: disable=broad-except
logging.info(f"Invalid uuid: {uuid}: {exp}")
return

Expand All @@ -78,6 +83,11 @@ async def handle_cli(self, args):
logging.info(f"Unknown node: {node}")
return

if args.task == "import":
await self._import(node)
self.print_node_list()
return

if args.task == "config":
await self._configure(node)
return
Expand All @@ -98,6 +108,13 @@ def print_node_list(self):
for node in self.app.nodes.all():
node.print_info()

def _get_dev_key_file(self, basedir, unicast):
"""
Get local device key file (<basedir>/<uuid>/dev_keys/<unicast>)
"""

return os.path.join(basedir, self.app.uuid.hex, "dev_keys", f"{unicast:04x}")

def _request_prov_data(self, count):
"""
This method is implemented by a Provisioner capable application
Expand Down Expand Up @@ -130,14 +147,25 @@ def _add_node_complete(self, uuid, unicast, count):
"""
_uuid = UUID(bytes=uuid)

self.app.nodes.create(
node = self.app.nodes.create(
_uuid,
{
"type": "generic",
"unicast": unicast,
"count": count,
},
)

# get remote device key after provisioning
storage = self.config.optional("bluetooth.storage")
if storage is not None:
try:
# read binary device key
with open(self._get_dev_key_file(storage, unicast), "rb") as file:
node.device_key = file.read().hex()
except:
logging.exception(f"Failed to retrieve device key for {_uuid}")

self.app.nodes.persist()

logging.info(f"Provisioned {_uuid} as {unicast} ({count})")
Expand Down Expand Up @@ -166,6 +194,28 @@ async def _provision(self, uuid):
await self.app.management_interface.add_node(uuid)
await self.provisioning_done.wait()

async def _import(self, node, device_key=None):
logging.info(f"Importing node {node}...")

# get device key
if device_key is None:
device_key = node.device_key

if device_key is None:
logging.error(f"Missing device key for {node}")
return

# import existing node with device key
await self.app.management_interface.import_remote_node(
node.unicast,
node.count,
DeviceKey(bytes.fromhex(device_key)),
)

# ensure address is not reused
self.store.set("base_address", node.unicast + node.count)
self.store.persist()

async def _configure(self, node):
logging.info(f"Configuring node {node}...")

Expand Down
2 changes: 1 addition & 1 deletion gateway/modules/scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def _scan_result(self, rssi, data, options): # pylint: disable=unused-argument
self._unprovisioned.add(uuid)
logging.info(f"Found unprovisioned node: {uuid}")
logging.info(f"Options not used: {options}")
except ScanException as exp:
except (ValueError, TypeError, ScanException) as exp:
logging.exception(f"Failed to retrieve UUID: {exp}")

async def handle_cli(self, args):
Expand Down
4 changes: 2 additions & 2 deletions gateway/mqtt/bridge.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def _property_change(self, node, property, value): # pylint: disable=redefined-
try:
# get handler from property name
handler = getattr(self, f"_notify_{property}")
except Exception as exp: # pylint: disable=broad-except
except AttributeError as exp: # pylint: disable=broad-except
logging.warning(f"Missing handler for property {property}: {exp}")
return

Expand Down Expand Up @@ -58,7 +58,7 @@ async def listen(self, node):
try:
# get handler from command name
handler = getattr(self, f"_mqtt_{command}")
except Exception as exp: # pylint: disable=broad-except
except AttributeError as exp: # pylint: disable=broad-except
logging.warning(f"Missing handler for command {command}: {exp}")
continue

Expand Down
2 changes: 1 addition & 1 deletion gateway/mqtt/messenger.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ async def shutdown(self):

async def run(self, app):
async with AsyncExitStack() as stack:
tasks = await stack.enter_async_context(Tasks())
tasks = await stack.enter_async_context(Tasks("messenger"))

# connect to MQTT broker
await stack.enter_async_context(self._client)
Expand Down
50 changes: 36 additions & 14 deletions gateway/tools/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,43 +6,65 @@
class Tasks:
"""
Simple task pool

TODO: This class can be extended in order to manage failed tasks.
Currently failed tasks are logged but otherwise ignored.
"""

def __init__(self):
def __init__(self, name="root"):
self._name = name
self._tasks = set()

async def __aenter__(self):
return self

async def __aexit__(self, exc_type, exc, tb): # pylint: disable=invalid-name
await self._shutdown()
async def __aexit__(self, exc_type, exc, tb):
self._invoke_shutdown()

# wait until finished before exiting
for task in self._tasks:
if not task.done():
await task

logging.debug(f"{self._name} finalized")

def _invoke_shutdown(self):
logging.info(f"{self._name}: invoke shutdown...")

async def _shutdown(self):
for task in self._tasks:
if task.done():
continue

try:
# invoke tasks cancellation
task.cancel()
await task
except asyncio.CancelledError:
pass

async def _runner(self, task, name):
if name:
logging.debug(f"Spawning task to {name}...")
logging.debug(f"{self._name}: spawning task to {name}...")
try:
await task
except Exception as exp: # pylint: disable=broad-except
logging.exception(f"Task failed: {exp}")
except asyncio.CancelledError:
# graceful exit
logging.debug(f"{self._name}: {name} cancelled")
return
except:
logging.exception(f"{self._name}: {name} failed")
# force cancellation of all tasks
# depending on the configuration, this should lead to service restart
raise
if name:
logging.debug(f"{name} completed")
logging.debug(f"{self._name}: {name} completed")

def spawn(self, task, name=None):
self._tasks.add(asyncio.create_task(self._runner(task, name)))
self._tasks.add(
asyncio.create_task(
self._runner(task, name),
name=name or f"self._name + {len(self._tasks)}",
)
)

async def gather(self):
logging.info(f"Awaiting {len(self._tasks)} tasks")
logging.info(f"{self._name}: awaiting {len(self._tasks)} tasks")

# wait until all tasks are completed or an exception is caught
await asyncio.gather(*self._tasks)
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
asyncio-mqtt==0.12.1
bitstring==3.1.9
black==22.10.0
git+https://github.com/minims/python-bluetooth-mesh@d6b2bd17bf768f85f2faa45dec739435a5719d4f
git+https://github.com/minims/python-bluetooth-mesh@d813d4860025bf28a7338681229e728feeeed851
cffi==1.15.1
construct==2.9.45
crc==0.3.0
Expand Down