Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Maint: python version, CI/CD, tiled dependency #40

Merged
merged 18 commits into from
May 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 8 additions & 10 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,23 +11,21 @@ on:

jobs:
unit-tests:

runs-on: ubuntu-latest

strategy:
matrix:
python-version: ["3.9"] # TODO: expand versions
matrix:
python-version: ["3.9", "3.10", "3.11", "3.12"]

fail-fast: false
steps:
fail-fast: false

steps:
- uses: actions/checkout@v4

# TODO (maffettone): Change host after CD into bluesky/bluesky-pods
- name: Download and build bluesky-pods
run: |
docker pull ghcr.io/maffettone/bluesky-pods-bluesky:latest
docker tag ghcr.io/maffettone/bluesky-pods-bluesky:latest bluesky:latest
docker pull ghcr.io/bluesky/bluesky-pods-bluesky:main
docker tag ghcr.io/bluesky/bluesky-pods-bluesky:main bluesky:latest

- name: Start Bluesky containers
run: |
Expand Down Expand Up @@ -84,10 +82,10 @@ jobs:
qserver environment open
qserver permissions reload


- name: Test with pytest
shell: bash -l {0}
run: |
set -vxeuo pipefail
coverage run -m pytest -s -v
coverage run -m pytest -v
coverage report

29 changes: 22 additions & 7 deletions bluesky_adaptive/agents/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import msgpack
import numpy as np
import tiled
import tiled.client.node
from bluesky_kafka import Publisher, RemoteDispatcher
from bluesky_queueserver_api import BPlan
from bluesky_queueserver_api.api_threads import API_Threads_Mixin
Expand Down Expand Up @@ -207,9 +206,9 @@ class Agent(ABC):
kafka messages to trigger agent directives.
kafka_producer : Optional[Publisher]
Bluesky Kafka publisher to produce document stream of agent actions for optional Adjudicator.
tiled_data_node : tiled.client.node.Node
tiled_data_node : tiled.client.container.Container
Tiled node to serve as source of data (BlueskyRuns) for the agent.
tiled_agent_node : tiled.client.node.Node
tiled_agent_node : tiled.client.container.Container
Tiled node to serve as storage for the agent documents.
qserver : bluesky_queueserver_api.api_threads.API_Threads_Mixin
Object to manage communication with Queue Server
Expand Down Expand Up @@ -247,8 +246,8 @@ def __init__(
self,
*,
kafka_consumer: AgentConsumer,
tiled_data_node: tiled.client.node.Node,
tiled_agent_node: tiled.client.node.Node,
tiled_data_node: tiled.client.container.Container,
tiled_agent_node: tiled.client.container.Container,
qserver: API_Threads_Mixin,
kafka_producer: Optional[Publisher],
agent_run_suffix: Optional[str] = None,
Expand Down Expand Up @@ -521,7 +520,13 @@ def _write_event(self, stream, doc, uid=None):
return event_doc["uid"]

def _add_to_queue(
self, next_points, uid, re_manager=None, position: Optional[Union[int, Literal["front", "back"]]] = None
self,
next_points,
uid,
*,
re_manager=None,
position: Optional[Union[int, Literal["front", "back"]]] = None,
plan_factory: Optional[Callable] = None,
):
"""
Adds a single set of points to the queue as bluesky plans
Expand All @@ -535,12 +540,16 @@ def _add_to_queue(
Defaults to self.re_manager
position : Optional[Union[int, Literal['front', 'back']]]
Defaults to self.queue_add_position
plan_factory : Optional[Callable]
Function to generate plans from points. Defaults to self.measurement_plan.
Callable should return a tuple of (plan_name, args, kwargs)

Returns
-------

"""
for point in next_points:
plan_factory = plan_factory or self.measurement_plan
plan_name, args, kwargs = self.measurement_plan(point)
kwargs.setdefault("md", {})
kwargs["md"].update(self.default_plan_md)
Expand Down Expand Up @@ -1049,7 +1058,13 @@ def add_suggestions_to_subject_queue(self, batch_size: int):
"""Calls ask, adds suggestions to queue, and writes out event"""
next_points, uid = self._ask_and_write_events(batch_size, self.subject_ask, "subject_ask")
logger.info("Issued ask to subject and adding to the queue. {uid}")
self._add_to_queue(next_points, uid, re_manager=self.subject_re_manager, position="front")
self._add_to_queue(
next_points,
uid,
re_manager=self.subject_re_manager,
position="front",
plan_factory=self.subject_measurement_plan,
)

def _on_stop_router(self, name, doc):
ret = super()._on_stop_router(name, doc)
Expand Down
11 changes: 6 additions & 5 deletions bluesky_adaptive/tests/test_adjudicators.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,11 +191,12 @@ def test_adjudicator_receipt(temporary_topics, kafka_bootstrap_servers, kafka_pr
assert len(adjudicator.consumed_documents) == 1


@pytest.mark.xfail(
os.environ.get("GITHUB_ACTIONS") == "true",
raises=TimeoutError,
reason="Kafka timeout awaiting messages to arrive",
) # Allow timeout in GHA CI/CD
# @pytest.mark.xfail(
# os.environ.get("GITHUB_ACTIONS") == "true",
# raises=TimeoutError,
# reason="Kafka timeout awaiting messages to arrive",
# ) # Allow timeout in GHA CI/CD
@pytest.mark.skip(reason="Segmentation fault in Github Actions") # TODO (maffettone): revisit this test
def test_adjudicator_by_name(temporary_topics, kafka_bootstrap_servers, kafka_producer_config):
with temporary_topics(topics=["test.adjudicator", "test.data"]) as (adj_topic, bs_topic):
re_manager = REManagerAPI(http_server_uri=None)
Expand Down
23 changes: 12 additions & 11 deletions bluesky_adaptive/tests/test_sklearn_agents.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import os
import time as ttime
from typing import Tuple, Union

Expand Down Expand Up @@ -109,11 +108,12 @@ def test_decomp_agent(
agent.stop()


@pytest.mark.xfail(
os.environ.get("GITHUB_ACTIONS") == "true",
raises=TimeoutError,
reason="Kafka timeout awaiting messages to arrive",
) # Allow timeout in GHA CI/CD
# @pytest.mark.xfail(
# os.environ.get("GITHUB_ACTIONS") == "true",
# raises=TimeoutError,
# reason="Kafka timeout awaiting messages to arrive",
# ) # Allow timeout in GHA CI/CD
@pytest.mark.skip(reason="Segfaults on GitHub Actions") # TODO(maffettone): revisit
@pytest.mark.parametrize("estimator", [PCA(2), NMF(2)], ids=["PCA", "NMF"])
def test_decomp_remodel_from_report(
estimator,
Expand Down Expand Up @@ -215,11 +215,12 @@ def test_cluster_agent(
agent.stop()


@pytest.mark.xfail(
os.environ.get("GITHUB_ACTIONS") == "true",
raises=TimeoutError,
reason="Kafka timeout awaiting messages to arrive",
) # Allow timeout in GHA CI/CD
# @pytest.mark.xfail(
# os.environ.get("GITHUB_ACTIONS") == "true",
# raises=TimeoutError,
# reason="Kafka timeout awaiting messages to arrive",
# ) # Allow timeout in GHA CI/CD
@pytest.mark.skip(reason="Segfaults on GitHub Actions") # TODO(maffettone): revisit
@pytest.mark.parametrize("estimator", [KMeans(2)], ids=["KMeans"])
def test_cluster_remodel_from_report(
estimator,
Expand Down
13 changes: 11 additions & 2 deletions requirements-dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,14 @@ databroker
ophyd
scipy
pre-commit
databroker @ git+https://github.com/bluesky/[email protected]#egg=databroker
tiled[all]
databroker @ git+https://github.com/bluesky/[email protected]#egg=databroker
# This is required for the `tiled` package to make a Mongo client (some components from tiled[server]).
asgi_correlation_id
python-jose[cryptography]
sqlalchemy[asyncio] >=2
pydantic-settings >=2, <3
jmespath
openpyxl
cachetools
pymongo
prometheus_client
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,6 @@ bluesky-widgets
bluesky-kafka
bluesky-queueserver-api
xkcdpass
tiled
tiled[client]
numpy
pydantic
Loading