Skip to content

Commit

Permalink
Check ports allocation for NS8 Modules (#742)
Browse files Browse the repository at this point in the history
* Print validation error reason to stderr
   The message is sent to the system journal and also in the API error
   stream, which is visible in the Python exception raised by
   agent.allocate_ports(), too.

* deallocate_ports(): remove ModuleNotFoundError
   Return an empty list if the module has no ports allocation.
   Suppress module-not-found warnings when a module is removed.

Refs NethServer/dev#7092

Co-authored-by: Davide Principi <[email protected]>
  • Loading branch information
tommaso-ascani and DavidePrincipi authored Nov 6, 2024
1 parent 6701c75 commit a724dba
Show file tree
Hide file tree
Showing 8 changed files with 137 additions and 21 deletions.
65 changes: 52 additions & 13 deletions core/imageroot/usr/local/agent/pypkg/node/ports_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,6 @@ def __init__(self, message="Database operation failed."):
self.message = message
super().__init__(self.message)

class ModuleNotFoundError(PortError):
"""Exception raised when a module is not found for deallocation."""
def __init__(self, module_name, message=None):
self.module_name = module_name
if message is None:
message = f"Module '{module_name}' not found."
self.message = message
super().__init__(self.message)

class InvalidPortRequestError(PortError):
"""Exception raised when the requested number of ports is invalid."""
def __init__(self, message="The number of required ports must be at least 1."):
Expand Down Expand Up @@ -81,7 +72,7 @@ def allocate_ports(required_ports: int, module_name: str, protocol: str, keep_ex
try:
with sqlite3.connect('./ports.sqlite', isolation_level='EXCLUSIVE', timeout=30) as database:
cursor = database.cursor()
create_tables(cursor) # Ensure the tables exist
create_tables(cursor) # Ensure the tables exist

# Fetch used ports based on protocol
if protocol == 'tcp':
Expand All @@ -104,7 +95,7 @@ def allocate_ports(required_ports: int, module_name: str, protocol: str, keep_ex
if len(ports_used) == 0:
write_range(range_start, range_start + required_ports - 1, module_name, protocol, database)
return (range_start, range_start + required_ports - 1)

while range_start <= range_end:
# Check if the current port is within an already used range
for port_range in ports_used:
Expand Down Expand Up @@ -139,7 +130,7 @@ def deallocate_ports(module_name: str, protocol: str):
elif protocol == 'udp':
cursor.execute("SELECT start,end,module FROM UDP_PORTS WHERE module=?;", (module_name,))
ports_deallocated = cursor.fetchall()

if ports_deallocated:
# Delete the allocated port range for the module
if protocol == 'tcp':
Expand All @@ -150,7 +141,7 @@ def deallocate_ports(module_name: str, protocol: str):
# remove the name of the module and return a list of tuples
return [(port[0], port[1]) for port in ports_deallocated]
else:
raise ModuleNotFoundError(module_name) # Raise error if the module is not found
return []

except sqlite3.Error as e:
raise StorageError(f"Database error: {e}") from e # Raise custom database error
Expand Down Expand Up @@ -183,3 +174,51 @@ def write_range(start: int, end: int, module: str, protocol: str, database: sqli

except sqlite3.Error as e:
raise StorageError(f"Database error: {e}") from e # Raise custom database error

def get_tcp_ports_by_module(module_name: str) -> int:
"""
Get the number of TCP ports allocated to a specific module.
:param module_name: Name of the module.
:return: Number of TCP ports allocated to the module.
"""
try:
with sqlite3.connect('./ports.sqlite', isolation_level='EXCLUSIVE', timeout=30) as database:
cursor = database.cursor()
create_tables(cursor)

cursor.execute("""
SELECT SUM(end - start + 1)
FROM TCP_PORTS
WHERE module=?;
""", (module_name,))
result = cursor.fetchone()

return result[0] if result[0] is not None else 0

except sqlite3.Error as e:
raise StorageError(f"Database error: {e}") from e

def get_udp_ports_by_module(module_name: str) -> int:
"""
Get the number of UDP ports allocated to a specific module.
:param module_name: Name of the module.
:return: Number of UDP ports allocated to the module.
"""
try:
with sqlite3.connect('./ports.sqlite', isolation_level='EXCLUSIVE', timeout=30) as database:
cursor = database.cursor()
create_tables(cursor)

cursor.execute("""
SELECT SUM(end - start + 1)
FROM UDP_PORTS
WHERE module=?;
""", (module_name,))
result = cursor.fetchone()

return result[0] if result[0] is not None else 0

except sqlite3.Error as e:
raise StorageError(f"Database error: {e}") from e
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,13 @@ os.mkdir(f'/var/lib/nethserver/cluster/ui/apps/{module_id}')
os.chdir(f'/var/lib/nethserver/cluster/ui/apps/{module_id}')
agent.run_helper('extract-ui', image_url).check_returncode()

# Write on redis image ports demands
if tcp_ports_demand > 0:
rdb.hset('cluster/tcp_ports_demand', mapping={module_id: tcp_ports_demand})

if udp_ports_demand > 0:
rdb.hset('cluster/udp_ports_demand', mapping={module_id: udp_ports_demand})

# Wait for the module host to set up the module environment: it
# has to return us the module password hash
add_module_result = agent.tasks.run(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ if module_keys:
# Delete module from favorites
rdb.srem('cluster/favorites', module_id)

# Delete TCP and UDP port demands
rdb.hdel('cluster/tcp_ports_demand', module_id)
rdb.hdel('cluster/udp_ports_demand', module_id)

# Delete module authorizations and node relation
rdb.delete(f'cluster/authorizations/module/{module_id}')
rdb.hdel('cluster/module_node', module_id)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,22 @@ else:

# Retrieve the image org.nethserver.authorizations label
with subprocess.Popen(['podman', 'image', 'inspect', image_url], stdout=subprocess.PIPE, stderr=sys.stderr) as proc:
inspect = json.load(proc.stdout)
inspect_labels = inspect[0]['Labels']
try:
authorizations = json.load(proc.stdout)[0]['Labels']['org.nethserver.authorizations'].split()
authorizations = inspect_labels['org.nethserver.authorizations'].split()
except:
authorizations = []

if 'org.nethserver.tcp-ports-demand' in inspect_labels:
tcp_ports_demand = int(inspect_labels['org.nethserver.tcp-ports-demand'])
else:
tcp_ports_demand = 0

if 'org.nethserver.udp-ports-demand' in inspect_labels:
udp_ports_demand = int(inspect_labels['org.nethserver.udp-ports-demand'])
else:
udp_ports_demand = 0

# Run sanity checks on the org.nethserver.authorizations label:
if not cluster.grants.check_authorizations_sanity(authorizations):
Expand All @@ -68,6 +80,17 @@ if not cluster.grants.check_authorizations_sanity(authorizations):
# Start the module update on each instance
update_tasks = []
for module_id in instances:

if tcp_ports_demand > 0:
rdb.hset('cluster/tcp_ports_demand', mapping={module_id: tcp_ports_demand})
else:
rdb.hdel('cluster/tcp_ports_demand', module_id)

if udp_ports_demand > 0:
rdb.hset('cluster/udp_ports_demand', mapping={module_id: udp_ports_demand})
else:
rdb.hdel('cluster/udp_ports_demand', module_id)

if authorizations:
# Replace existing authorizations with the new image:
rdb.delete(f'cluster/authorizations/module/{module_id}')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,55 @@ import json
import sys
import os

# Load the request from stdin
request = json.load(sys.stdin)

# Get the module environment variable
module_env = os.getenv("AGENT_TASK_USER")

# Check if the agent has permission to change the port allocation
if module_env != "" and module_env != f"module/{request['module_id']}":
print(agent.SD_ERR + f" Agent {module_env} does not have permission to change the port allocation for {request['module_id']}.", file=sys.stderr)
sys.exit(1)
agent.set_status('validation-failed')
json.dump([{'field':'module_id', 'parameter':'module_id', 'value': request['module_id'], 'error':'module_permission_denied'}], fp=sys.stdout)
sys.exit(32)

range = node.ports_manager.allocate_ports(int(request['ports']), request['module_id'], request['protocol'], request['keep_existing'])
# Connect to Redis without privileged access
rdb = agent.redis_connect(privileged=False)

json.dump(range, fp=sys.stdout)
# Get the current node ID and the module's node ID
current_node_id = os.getenv("NODE_ID")
module_node_id = rdb.hget('cluster/module_node', request['module_id'])

# Verify that the module exists
agent.assert_exp(module_node_id not in [None, ''], f"Error: Module {request['module_id']} does not exist.")

# Verify that the module is present on the current node
agent.assert_exp(module_node_id == current_node_id, f"Error: Module {request['module_id']} is not located on the current node {current_node_id}.")

# Get the ports demand based on the protocol
if request['protocol'] == 'tcp':
ports_demand = int(rdb.hget('cluster/tcp_ports_demand', request["module_id"]) or "0")
elif request['protocol'] == 'udp':
ports_demand = int(rdb.hget('cluster/udp_ports_demand', request["module_id"]) or "0")

# Get the ports used by the module if keep_existing is set to true
if request['keep_existing']:
if request['protocol'] == 'tcp':
ports_used = node.ports_manager.get_tcp_ports_by_module(request['module_id']) + request['ports']
elif request['protocol'] == 'udp':
ports_used = node.ports_manager.get_udp_ports_by_module(request['module_id']) + request['ports']
else:
ports_used = 0 + request['ports']

# Check if the total required ports exceed the allowed number of ports
if ports_used > ports_demand:
print(agent.SD_ERR + f"Cannot allocate {ports_used} {request['protocol']} ports to {request['module_id']}. Max allowed is {ports_demand}.", file=sys.stderr)
agent.set_status('validation-failed')
json.dump([{'field':'ports', 'parameter':'ports', 'value': request['ports'], 'error':'max_ports_exceeded'}], fp=sys.stdout)
sys.exit(33)

# Allocate the ports
result = node.ports_manager.allocate_ports(int(request['ports']), request['module_id'], request['protocol'], request['keep_existing'])

json.dump(result, fp=sys.stdout)
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
],
"properties": {
"ports": {
"type": "number",
"title": "Ports number",
"type": "integer",
"title": "Number of TCP/UDP ports",
"description": "How many ports will be allocated on a specific node"
},
"module_id": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,5 @@ try:

# In case of clone/move/import fail ensure rsync port deallocation
node.ports_manager.deallocate_ports(f"{request['module_id']}_rsync", 'tcp')
except node.ports_manager.ModuleNotFoundError as exc:
print(agent.SD_WARNING, exc, file=sys.stderr)
except Exception as exc:
print(agent.SD_WARNING, exc, file=sys.stderr)
2 changes: 2 additions & 0 deletions docs/core/database.md
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,8 @@ subsections for more information.
|cluster/override/modules |HASH |Maps (image name) => (image URL), overriding the standard image name/URL resolution function |
|cluster/subscription |HASH |[Subscription]({{site.baseurl}}/core/subscription) attributes in key/value pairs|
|cluster/apply_updates |HASH |[Scheduled updates]({{site.baseurl}}/core/updates) attributes in key/value pairs|
|cluster/tcp_ports_demand |HASH |Max TCP ports a module can request. Hash key is MODULE_ID, hash value is the max TCP ports.|
|cluster/udp_ports_demand |HASH |Max UDP ports a module can request. Hash key is MODULE_ID, hash value is the max UDP ports.|

#### cluster/smarthost

Expand Down

0 comments on commit a724dba

Please sign in to comment.