Skip to content

Commit

Permalink
feat: add method to list next systemd time executions
Browse files Browse the repository at this point in the history
This will eventually resolve something similar to #96 for systemd
timers.
  • Loading branch information
languitar committed Jul 24, 2022
1 parent 5a1fa6f commit 74a0aee
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 2 deletions.
35 changes: 34 additions & 1 deletion src/autosuspend/util/systemd.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from typing import Iterable, Tuple, TYPE_CHECKING
from datetime import datetime, timedelta, timezone
from typing import Dict, Iterable, Optional, Tuple, TYPE_CHECKING


if TYPE_CHECKING:
Expand Down Expand Up @@ -43,3 +44,35 @@ def list_logind_sessions() -> Iterable[Tuple[str, dict]]:
raise LogindDBusException(error) from error

return results


def next_timer_executions() -> Dict[str, datetime]:
import dbus

bus = _get_bus()

systemd = bus.get_object("org.freedesktop.systemd1", "/org/freedesktop/systemd1")
units = systemd.ListUnits(dbus_interface="org.freedesktop.systemd1.Manager")
timers = [unit for unit in units if unit[0].endswith(".timer")]

result: Dict[str, datetime] = {}
for timer in timers:
obj = bus.get_object("org.freedesktop.systemd1", timer[6])
properties_interface = dbus.Interface(obj, "org.freedesktop.DBus.Properties")
props = properties_interface.GetAll("org.freedesktop.systemd1.Timer")

next_time: Optional[datetime] = None
if props["NextElapseUSecRealtime"]:
next_time = datetime.fromtimestamp(
props["NextElapseUSecRealtime"] / 1000000,
tz=timezone.utc,
)
elif props["NextElapseUSecMonotonic"]:
next_time = datetime.now(tz=timezone.utc) + timedelta(
seconds=props["NextElapseUSecMonotonic"] / 1000000
)

if next_time:
result[str(props["Unit"])] = next_time

return result
14 changes: 13 additions & 1 deletion tests/test_util_systemd.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
from dbus.proxies import ProxyObject
import pytest

from autosuspend.util.systemd import list_logind_sessions, LogindDBusException
from autosuspend.util.systemd import (
list_logind_sessions,
LogindDBusException,
next_timer_executions,
)


def test_list_logind_sessions_empty(logind: ProxyObject) -> None:
Expand All @@ -17,3 +21,11 @@ def test_list_logind_sessions_empty(logind: ProxyObject) -> None:
def test_list_logind_sessions_dbus_error() -> None:
with pytest.raises(LogindDBusException):
list_logind_sessions()


def test_next_timer_executions() -> None:
pytest.importorskip("dbus")
pytest.importorskip("gi")
# no working dbus mock interface exists for list units. Therefore, this is not easy
# to test.
assert next_timer_executions() is not None

0 comments on commit 74a0aee

Please sign in to comment.