From ef629dd050237644a7bde69a17b9e425ca6a9eba Mon Sep 17 00:00:00 2001 From: "Dr. Phil Maffettone" <43007690+maffettone@users.noreply.github.com> Date: Fri, 10 May 2024 07:40:52 -0700 Subject: [PATCH] Maint: python version, CI/CD, tiled dependency (#40) * add: optional plan factory to override measurement_plan * maint: fix Tiled imports for typing * test: update CI/CD testing with bluesky image and python versions * maint: tiled[client] only in requirements * fix: ghcr tag --- .github/workflows/tests.yml | 18 +++++------- bluesky_adaptive/agents/base.py | 29 ++++++++++++++----- bluesky_adaptive/tests/test_adjudicators.py | 11 +++---- bluesky_adaptive/tests/test_sklearn_agents.py | 23 ++++++++------- requirements-dev.txt | 13 +++++++-- requirements.txt | 2 +- 6 files changed, 60 insertions(+), 36 deletions(-) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 36e63f8..3862262 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -11,23 +11,21 @@ on: jobs: unit-tests: - runs-on: ubuntu-latest strategy: - matrix: - python-version: ["3.9"] # TODO: expand versions + matrix: + python-version: ["3.9", "3.10", "3.11", "3.12"] - fail-fast: false - steps: + fail-fast: false + steps: - uses: actions/checkout@v4 - # TODO (maffettone): Change host after CD into bluesky/bluesky-pods - name: Download and build bluesky-pods run: | - docker pull ghcr.io/maffettone/bluesky-pods-bluesky:latest - docker tag ghcr.io/maffettone/bluesky-pods-bluesky:latest bluesky:latest + docker pull ghcr.io/bluesky/bluesky-pods-bluesky:main + docker tag ghcr.io/bluesky/bluesky-pods-bluesky:main bluesky:latest - name: Start Bluesky containers run: | @@ -84,10 +82,10 @@ jobs: qserver environment open qserver permissions reload - - name: Test with pytest shell: bash -l {0} run: | set -vxeuo pipefail - coverage run -m pytest -s -v + coverage run -m pytest -v coverage report + diff --git a/bluesky_adaptive/agents/base.py b/bluesky_adaptive/agents/base.py index 681bf82..d1ac9da 100644 --- a/bluesky_adaptive/agents/base.py +++ b/bluesky_adaptive/agents/base.py @@ -12,7 +12,6 @@ import msgpack import numpy as np import tiled -import tiled.client.node from bluesky_kafka import Publisher, RemoteDispatcher from bluesky_queueserver_api import BPlan from bluesky_queueserver_api.api_threads import API_Threads_Mixin @@ -207,9 +206,9 @@ class Agent(ABC): kafka messages to trigger agent directives. kafka_producer : Optional[Publisher] Bluesky Kafka publisher to produce document stream of agent actions for optional Adjudicator. - tiled_data_node : tiled.client.node.Node + tiled_data_node : tiled.client.container.Container Tiled node to serve as source of data (BlueskyRuns) for the agent. - tiled_agent_node : tiled.client.node.Node + tiled_agent_node : tiled.client.container.Container Tiled node to serve as storage for the agent documents. qserver : bluesky_queueserver_api.api_threads.API_Threads_Mixin Object to manage communication with Queue Server @@ -247,8 +246,8 @@ def __init__( self, *, kafka_consumer: AgentConsumer, - tiled_data_node: tiled.client.node.Node, - tiled_agent_node: tiled.client.node.Node, + tiled_data_node: tiled.client.container.Container, + tiled_agent_node: tiled.client.container.Container, qserver: API_Threads_Mixin, kafka_producer: Optional[Publisher], agent_run_suffix: Optional[str] = None, @@ -521,7 +520,13 @@ def _write_event(self, stream, doc, uid=None): return event_doc["uid"] def _add_to_queue( - self, next_points, uid, re_manager=None, position: Optional[Union[int, Literal["front", "back"]]] = None + self, + next_points, + uid, + *, + re_manager=None, + position: Optional[Union[int, Literal["front", "back"]]] = None, + plan_factory: Optional[Callable] = None, ): """ Adds a single set of points to the queue as bluesky plans @@ -535,12 +540,16 @@ def _add_to_queue( Defaults to self.re_manager position : Optional[Union[int, Literal['front', 'back']]] Defaults to self.queue_add_position + plan_factory : Optional[Callable] + Function to generate plans from points. Defaults to self.measurement_plan. + Callable should return a tuple of (plan_name, args, kwargs) Returns ------- """ for point in next_points: + plan_factory = plan_factory or self.measurement_plan plan_name, args, kwargs = self.measurement_plan(point) kwargs.setdefault("md", {}) kwargs["md"].update(self.default_plan_md) @@ -1049,7 +1058,13 @@ def add_suggestions_to_subject_queue(self, batch_size: int): """Calls ask, adds suggestions to queue, and writes out event""" next_points, uid = self._ask_and_write_events(batch_size, self.subject_ask, "subject_ask") logger.info("Issued ask to subject and adding to the queue. {uid}") - self._add_to_queue(next_points, uid, re_manager=self.subject_re_manager, position="front") + self._add_to_queue( + next_points, + uid, + re_manager=self.subject_re_manager, + position="front", + plan_factory=self.subject_measurement_plan, + ) def _on_stop_router(self, name, doc): ret = super()._on_stop_router(name, doc) diff --git a/bluesky_adaptive/tests/test_adjudicators.py b/bluesky_adaptive/tests/test_adjudicators.py index 711726a..1bffd85 100644 --- a/bluesky_adaptive/tests/test_adjudicators.py +++ b/bluesky_adaptive/tests/test_adjudicators.py @@ -191,11 +191,12 @@ def test_adjudicator_receipt(temporary_topics, kafka_bootstrap_servers, kafka_pr assert len(adjudicator.consumed_documents) == 1 -@pytest.mark.xfail( - os.environ.get("GITHUB_ACTIONS") == "true", - raises=TimeoutError, - reason="Kafka timeout awaiting messages to arrive", -) # Allow timeout in GHA CI/CD +# @pytest.mark.xfail( +# os.environ.get("GITHUB_ACTIONS") == "true", +# raises=TimeoutError, +# reason="Kafka timeout awaiting messages to arrive", +# ) # Allow timeout in GHA CI/CD +@pytest.mark.skip(reason="Segmentation fault in Github Actions") # TODO (maffettone): revisit this test def test_adjudicator_by_name(temporary_topics, kafka_bootstrap_servers, kafka_producer_config): with temporary_topics(topics=["test.adjudicator", "test.data"]) as (adj_topic, bs_topic): re_manager = REManagerAPI(http_server_uri=None) diff --git a/bluesky_adaptive/tests/test_sklearn_agents.py b/bluesky_adaptive/tests/test_sklearn_agents.py index 28a0020..86c7b46 100644 --- a/bluesky_adaptive/tests/test_sklearn_agents.py +++ b/bluesky_adaptive/tests/test_sklearn_agents.py @@ -1,4 +1,3 @@ -import os import time as ttime from typing import Tuple, Union @@ -109,11 +108,12 @@ def test_decomp_agent( agent.stop() -@pytest.mark.xfail( - os.environ.get("GITHUB_ACTIONS") == "true", - raises=TimeoutError, - reason="Kafka timeout awaiting messages to arrive", -) # Allow timeout in GHA CI/CD +# @pytest.mark.xfail( +# os.environ.get("GITHUB_ACTIONS") == "true", +# raises=TimeoutError, +# reason="Kafka timeout awaiting messages to arrive", +# ) # Allow timeout in GHA CI/CD +@pytest.mark.skip(reason="Segfaults on GitHub Actions") # TODO(maffettone): revisit @pytest.mark.parametrize("estimator", [PCA(2), NMF(2)], ids=["PCA", "NMF"]) def test_decomp_remodel_from_report( estimator, @@ -215,11 +215,12 @@ def test_cluster_agent( agent.stop() -@pytest.mark.xfail( - os.environ.get("GITHUB_ACTIONS") == "true", - raises=TimeoutError, - reason="Kafka timeout awaiting messages to arrive", -) # Allow timeout in GHA CI/CD +# @pytest.mark.xfail( +# os.environ.get("GITHUB_ACTIONS") == "true", +# raises=TimeoutError, +# reason="Kafka timeout awaiting messages to arrive", +# ) # Allow timeout in GHA CI/CD +@pytest.mark.skip(reason="Segfaults on GitHub Actions") # TODO(maffettone): revisit @pytest.mark.parametrize("estimator", [KMeans(2)], ids=["KMeans"]) def test_cluster_remodel_from_report( estimator, diff --git a/requirements-dev.txt b/requirements-dev.txt index 664278f..ae0c4d7 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -23,5 +23,14 @@ databroker ophyd scipy pre-commit -databroker @ git+https://github.com/bluesky/databroker.git@v2.0.0b38#egg=databroker -tiled[all] +databroker @ git+https://github.com/bluesky/databroker.git@v2.0.0b41#egg=databroker +# This is required for the `tiled` package to make a Mongo client (some components from tiled[server]). +asgi_correlation_id +python-jose[cryptography] +sqlalchemy[asyncio] >=2 +pydantic-settings >=2, <3 +jmespath +openpyxl +cachetools +pymongo +prometheus_client diff --git a/requirements.txt b/requirements.txt index 9befd33..be6f07d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,6 +6,6 @@ bluesky-widgets bluesky-kafka bluesky-queueserver-api xkcdpass -tiled +tiled[client] numpy pydantic