From 653f1288a97a76422e6417dce09e430c5bd36342 Mon Sep 17 00:00:00 2001 From: Rose Yemelyanova Date: Mon, 27 Feb 2023 17:47:19 +0000 Subject: [PATCH 1/4] added a zebra object --- src/ophyd_epics_devices/zebra.py | 307 +++++++++++++++++++++++++++++++ 1 file changed, 307 insertions(+) create mode 100644 src/ophyd_epics_devices/zebra.py diff --git a/src/ophyd_epics_devices/zebra.py b/src/ophyd_epics_devices/zebra.py new file mode 100644 index 0000000..d5c0935 --- /dev/null +++ b/src/ophyd_epics_devices/zebra.py @@ -0,0 +1,307 @@ +from enum import Enum +from typing import List, Type + +import numpy as np +import numpy.typing as npt + +from ophyd.v2.core import StandardReadable, T +from ophyd.v2.epics import EpicsSignalR, EpicsSignalRW +from functools import partialmethod + + +def zebra_rw(datatype: Type[T], suffix: str) -> EpicsSignalRW[T]: + return EpicsSignalRW(datatype, suffix) + + +def zebra_r(datatype: Type[T], suffix: str) -> EpicsSignalR[T]: + return EpicsSignalR(datatype, suffix) + + +class Active(Enum): + no = "No" + yes = "Yes" + + +class GatePulseSelect(Enum): + time = "Time" + position = "Position" + external = "External" + + +class ArmSelect(Enum): + soft = "Soft" + external = "External" + + +class Direction(Enum): + positive = "Positive" + negative = "Negative" + + +class EncoderType(Enum): + enc1 = "Enc1" + enc2 = "Enc2" + enc3 = "Enc3" + enc4 = "Enc4" + enc1_4av = "Enc1-4Av" + + +class TimeUnits(Enum): + ms = "ms" + s = "s" + s10 = "10s" + + +class Capture(StandardReadable): + def __init__(self, prefix: str, name="") -> None: + self.enc1 = zebra_rw(Active, "B0") + self.enc2 = zebra_rw(Active, "B1") + self.enc3 = zebra_rw(Active, "B2") + self.enc4 = zebra_rw(Active, "B3") + self.sys1 = zebra_rw(Active, "B4") + self.sys2 = zebra_rw(Active, "B5") + self.div1 = zebra_rw(Active, "B6") + self.div2 = zebra_rw(Active, "B7") + self.div3 = zebra_rw(Active, "B8") + self.div4 = zebra_rw(Active, "B9") + super().__init__(prefix, name) + + +class ArrayOuts(StandardReadable): + def __init__(self, prefix: str, name="") -> None: + def make_pv(suffix: str, common_prefix: str = "PC_"): + return zebra_rw(npt.NDArray[np.float64], common_prefix + suffix) + + self.enc1 = make_pv("ENC1") + self.enc2 = make_pv("ENC2") + self.enc3 = make_pv("ENC3") + self.enc4 = make_pv("ENC4") + self.sys1 = make_pv("SYS1") + self.sys2 = make_pv("SYS2") + self.div1 = make_pv("DIV1") + self.div2 = make_pv("DIV2") + self.div3 = make_pv("DIV3") + self.div4 = make_pv("DIV4") + super().__init__(prefix, name) + + +class ZebraOutputPanel(StandardReadable): + def __init__(self, prefix: str, name="") -> None: + self.pulse_1_input = zebra_rw(int, "PULSE1_INP") + + self.out_1 = zebra_rw(int, "OUT1_TTL") + self.out_2 = zebra_rw(int, "OUT2_TTL") + self.out_3 = zebra_rw(int, "OUT3_TTL") + self.out_4 = zebra_rw(int, "OUT4_TTL") + super().__init__(prefix, name) + + @property + def out_pvs(self) -> List[EpicsSignalRW[int]]: + """A list of all the output TTL PVs. Note that as the PVs are 1 indexed + `out_pvs[0]` is `None`. NOTE: technically typing for this is wrong. + i.e. first index is not an EpicsSignal... + """ + return [None, self.out_1, self.out_2, self.out_3, self.out_4] + + async def reset(self): + ... + + +class GateControl(StandardReadable): + def __init__(self, prefix: str, pv_prefix: str, name=""): + self.enable = zebra_rw(int, "_ENA") + self.source_1 = zebra_rw(int, "_INP1") + self.source_2 = zebra_rw(int, "_INP2") + self.source_3 = zebra_rw(int, "_INP3") + self.source_4 = zebra_rw(int, "_INP4") + self.invert = zebra_rw(int, "_INV") + super().__init__(prefix, name) + + @property + def sources(self): + return [self.source_1, self.source_2, self.source_3, self.source_4] + + +def boolean_array_to_integer(values: List[bool]) -> int: + """Converts a boolean array to integer by interpretting it in binary with LSB 0 bit + numbering. + Args: + values (List[bool]): The list of booleans to convert. + Returns: + int: The interpretted integer. + """ + return sum(v << i for i, v in enumerate(values)) + + +class GateType(Enum): + AND = "AND" + OR = "OR" + + +class LogicGateConfiguration: + NUMBER_OF_INPUTS = 4 + + def __init__(self, input_source: int, invert: bool = False) -> None: + self.sources: List[int] = [] + self.invert: List[bool] = [] + self.add_input(input_source, invert) + + def add_input( + self, input_source: int, invert: bool = False + ) -> "LogicGateConfiguration": + """Add an input to the gate. This will throw an assertion error if more than 4 + inputs are added to the Zebra. + Args: + input_source (int): The source for the input (must be between 0 and 63). + invert (bool, optional): Whether the input should be inverted. Default + False. + Returns: + LogicGateConfiguration: A description of the gate configuration. + """ + assert len(self.sources) < 4 + assert 0 <= input_source <= 63 + self.sources.append(input_source) + self.invert.append(invert) + return self + + def __str__(self) -> str: + input_strings = [] + for input, (source, invert) in enumerate(zip(self.sources, self.invert)): + input_strings.append(f"INP{input+1}={'!' if invert else ''}{source}") + + return ", ".join(input_strings) + + +class LogicGateConfigurer(StandardReadable): + DEFAULT_SOURCE_IF_GATE_NOT_USED = 0 + + def __init__(self, prefix: str, name=""): + self.and_gate_1 = GateControl(prefix, "AND1", name) + self.and_gate_2 = GateControl(prefix, "AND2", name) + self.and_gate_3 = GateControl(prefix, "AND3", name) + self.and_gate_4 = GateControl(prefix, "AND4", name) + + self.or_gate_1 = GateControl(prefix, "OR1", name) + self.or_gate_2 = GateControl(prefix, "OR2", name) + self.or_gate_3 = GateControl(prefix, "OR3", name) + self.or_gate_4 = GateControl(prefix, "OR4", name) + + self.all_gates = { + GateType.AND: [ + self.and_gate_1, + self.and_gate_2, + self.and_gate_3, + self.and_gate_4, + ], + GateType.OR: [ + self.or_gate_1, + self.or_gate_2, + self.or_gate_3, + self.or_gate_4, + ], + } + + super().__init__(prefix, name) + + async def apply_logic_gate_config( + self, type: GateType, gate_number: int, config: LogicGateConfiguration + ): + """Uses the specified `LogicGateConfiguration` to configure a gate on the Zebra. + Args: + type (GateType): The type of gate e.g. AND/OR + gate_number (int): Which gate to configure. + config (LogicGateConfiguration): A configuration for the gate. + """ + gate: GateControl = self.all_gates[type][gate_number - 1] + + await gate.enable.set(boolean_array_to_integer([True] * len(config.sources))) + + # Input Source + for source_number, source_pv in enumerate(gate.sources): + try: + await source_pv.set(config.sources[source_number]) + except IndexError: + await source_pv.set(self.DEFAULT_SOURCE_IF_GATE_NOT_USED) + + # Invert + await gate.invert.set(boolean_array_to_integer(config.invert)) + + async def reset(self): + ... + + apply_and_gate_config = partialmethod(apply_logic_gate_config, GateType.AND) + apply_or_gate_config = partialmethod(apply_logic_gate_config, GateType.OR) + + +class PositionCompare(StandardReadable): + def __init__(self, prefix: str, name="") -> None: + self.capture = Capture("PC_BIT_CAP:") + self.array_outputs = ArrayOuts("") + + self.direction = zebra_rw(Direction, "PC_DIR") + self.time_units = zebra_rw(TimeUnits, "PC_TSPRE") + self.time = zebra_rw(float, "PC_TIME") + + self.pulse_sel = zebra_rw(GatePulseSelect, "PC_PULSE_SEL") + self.pulse_input = zebra_rw(int, "PC_PULSE_INP") + self.pulse_start = zebra_rw(float, "PC_PULSE_START") + self.pulse_width = zebra_rw(float, "PC_PULSE_WID") + self.pulse_delay = zebra_rw(int, "PC_PULSE_DLY") + self.pulse_step = zebra_rw(int, "PC_PULSE_STEP") + self.pulse_max = zebra_rw(int, "PC_PULSE_MAX") + + self.num_gates = zebra_rw(int, "PC_GATE_NGATE") + self.gate_trigger = zebra_rw(EncoderType, "PC_ENC") + self.gate_sel = zebra_rw(GatePulseSelect, "PC_GATE_SEL") + self.gate_input = zebra_rw(int, "PC_GATE_INP") + self.gate_start = zebra_rw(float, "PC_GATE_START") + self.gate_width = zebra_rw(float, "PC_GATE_WID") + self.gate_step = zebra_rw(int, "PC_GATE_STEP") + + self.arm_sel = zebra_rw(ArmSelect, "PC_ARM_SEL") + self.arm = zebra_rw(int, "PC_ARM") + self.disarm = zebra_rw(int, "PC_DISARM") + self.armed = zebra_rw(int, "PC_ARM_OUT") + + self.captured = zebra_r(int, "PC_NUM_CAP") + self.downloaded = zebra_r(int, "PC_NUM_DOWN") + super().__init__(prefix, name) + + # want a mapping between the encoder, capture and array pvs? + + async def reset(self): + await self.time_units.set(TimeUnits.ms) + await self.pulse_sel.set(GatePulseSelect.time) + await self.gate_sel.set(GatePulseSelect.position) + await self.disarm.set(1) + await self.arm_sel.set(ArmSelect.soft) + await self.pulse_start.set(0.0) + + +class System(StandardReadable): + def __init__(self, prefix: str, name: str = ""): + self.sys_reset = zebra_rw(int, "SYS_RESET.PROC") + self.config_file = zebra_rw(str, "CONFIG_FILE") + self.config_read = zebra_rw(int, "CONFIG_READ.PROC") + self.config_status = zebra_rw(str, "CONFIG_STATUS") + + super().__init__(prefix, name) + + async def reset(self): + await self.sys_reset.set(1) + + +class Zebra(StandardReadable): + def __init__(self, prefix: str, name: str = ""): + self.pc = PositionCompare("") + self.output = ZebraOutputPanel("") + self.logic_gates = LogicGateConfigurer("") + self.sys = System("") + + super().__init__(prefix, name) + + async def reset(self): + await self.pc.reset() + await self.output.reset() + await self.logic_gates.reset() + await self.sys.reset() From 8a7e3772d1a52695cb2efa789114694aab80d284 Mon Sep 17 00:00:00 2001 From: Rose Yemelyanova Date: Thu, 3 Aug 2023 13:52:25 +0000 Subject: [PATCH 2/4] Add base Zebra device compatible with epics screens --- pyproject.toml | 2 +- src/ophyd_epics_devices/zebra.py | 600 ++++++++++++++++++------------- tests/test_zebra.py | 114 ++++++ 3 files changed, 473 insertions(+), 243 deletions(-) create mode 100644 tests/test_zebra.py diff --git a/pyproject.toml b/pyproject.toml index ba0bfa3..1f261c8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -14,7 +14,7 @@ classifiers = [ ] description = "Cross-institution EPICS Devices for Ophyd" dependencies = [ - "ophyd[ca] @ git+https://github.com/bluesky/ophyd.git", + "ophyd @ git+https://github.com/bluesky/ophyd.git@sim-backend-numpy-typing", "bluesky", "event-model", "p4p", diff --git a/src/ophyd_epics_devices/zebra.py b/src/ophyd_epics_devices/zebra.py index d5c0935..c193f3e 100644 --- a/src/ophyd_epics_devices/zebra.py +++ b/src/ophyd_epics_devices/zebra.py @@ -1,23 +1,15 @@ +import asyncio from enum import Enum -from typing import List, Type import numpy as np import numpy.typing as npt +from bluesky import RunEngine +from ophyd.v2.core import Device, DeviceCollector, DeviceVector, SignalR, SignalRW +from ophyd.v2.epics import epics_signal_r, epics_signal_rw +from typing_extensions import TypeAlias -from ophyd.v2.core import StandardReadable, T -from ophyd.v2.epics import EpicsSignalR, EpicsSignalRW -from functools import partialmethod - -def zebra_rw(datatype: Type[T], suffix: str) -> EpicsSignalRW[T]: - return EpicsSignalRW(datatype, suffix) - - -def zebra_r(datatype: Type[T], suffix: str) -> EpicsSignalR[T]: - return EpicsSignalR(datatype, suffix) - - -class Active(Enum): +class Bool(Enum): no = "No" yes = "Yes" @@ -52,256 +44,380 @@ class TimeUnits(Enum): s10 = "10s" -class Capture(StandardReadable): - def __init__(self, prefix: str, name="") -> None: - self.enc1 = zebra_rw(Active, "B0") - self.enc2 = zebra_rw(Active, "B1") - self.enc3 = zebra_rw(Active, "B2") - self.enc4 = zebra_rw(Active, "B3") - self.sys1 = zebra_rw(Active, "B4") - self.sys2 = zebra_rw(Active, "B5") - self.div1 = zebra_rw(Active, "B6") - self.div2 = zebra_rw(Active, "B7") - self.div3 = zebra_rw(Active, "B8") - self.div4 = zebra_rw(Active, "B9") - super().__init__(prefix, name) - - -class ArrayOuts(StandardReadable): - def __init__(self, prefix: str, name="") -> None: - def make_pv(suffix: str, common_prefix: str = "PC_"): - return zebra_rw(npt.NDArray[np.float64], common_prefix + suffix) - - self.enc1 = make_pv("ENC1") - self.enc2 = make_pv("ENC2") - self.enc3 = make_pv("ENC3") - self.enc4 = make_pv("ENC4") - self.sys1 = make_pv("SYS1") - self.sys2 = make_pv("SYS2") - self.div1 = make_pv("DIV1") - self.div2 = make_pv("DIV2") - self.div3 = make_pv("DIV3") - self.div4 = make_pv("DIV4") - super().__init__(prefix, name) - - -class ZebraOutputPanel(StandardReadable): +class UpdateRate(Enum): + passive = "Passive" + event = "Event" + io_intr = "I/O Intr" + s10 = "10 second" + s5 = "5 second" + s2 = "2 second" + s1 = "1 second" + s0_5 = ".5 second" + s0_2 = ".2 second" + s0_1 = ".1 second" + + +CapturePvType: TypeAlias = DeviceVector[SignalRW[Bool]] +ArrayOutPvType: TypeAlias = DeviceVector[SignalRW[npt.NDArray[np.float64]]] + + +####################################################################################### + + +class PcSetupCapture(Device): def __init__(self, prefix: str, name="") -> None: - self.pulse_1_input = zebra_rw(int, "PULSE1_INP") - - self.out_1 = zebra_rw(int, "OUT1_TTL") - self.out_2 = zebra_rw(int, "OUT2_TTL") - self.out_3 = zebra_rw(int, "OUT3_TTL") - self.out_4 = zebra_rw(int, "OUT4_TTL") - super().__init__(prefix, name) - - @property - def out_pvs(self) -> List[EpicsSignalRW[int]]: - """A list of all the output TTL PVs. Note that as the PVs are 1 indexed - `out_pvs[0]` is `None`. NOTE: technically typing for this is wrong. - i.e. first index is not an EpicsSignal... - """ - return [None, self.out_1, self.out_2, self.out_3, self.out_4] + self.enc: CapturePvType = DeviceVector( + { + idx + 1: epics_signal_rw(Bool, f"{prefix}:B{key}") + for idx, key in enumerate(range(4)) + } + ) + self.sys = DeviceVector( + { + idx + 1: epics_signal_rw(Bool, f"{prefix}:B{key}") + for idx, key in enumerate(range(4, 6)) + } + ) + self.div = DeviceVector( + { + idx + 1: epics_signal_rw(Bool, f"{prefix}:B{key}") + for idx, key in enumerate(range(6, 10)) + } + ) + + +class PcSetup(Device): + def __init__(self, prefix: str) -> None: + self.capture = PcSetupCapture(f"{prefix}:PC_BIT_CAP") + self.posn_trig = epics_signal_rw(EncoderType, f"{prefix}:PC_ENC") + self.posn_dir = epics_signal_rw(Direction, f"{prefix}:PC_DIR") + self.time_units = epics_signal_rw(TimeUnits, f"{prefix}:PC_TSPRE") async def reset(self): - ... - - -class GateControl(StandardReadable): - def __init__(self, prefix: str, pv_prefix: str, name=""): - self.enable = zebra_rw(int, "_ENA") - self.source_1 = zebra_rw(int, "_INP1") - self.source_2 = zebra_rw(int, "_INP2") - self.source_3 = zebra_rw(int, "_INP3") - self.source_4 = zebra_rw(int, "_INP4") - self.invert = zebra_rw(int, "_INV") - super().__init__(prefix, name) - - @property - def sources(self): - return [self.source_1, self.source_2, self.source_3, self.source_4] - - -def boolean_array_to_integer(values: List[bool]) -> int: - """Converts a boolean array to integer by interpretting it in binary with LSB 0 bit - numbering. - Args: - values (List[bool]): The list of booleans to convert. - Returns: - int: The interpretted integer. - """ - return sum(v << i for i, v in enumerate(values)) - - -class GateType(Enum): - AND = "AND" - OR = "OR" - - -class LogicGateConfiguration: - NUMBER_OF_INPUTS = 4 - - def __init__(self, input_source: int, invert: bool = False) -> None: - self.sources: List[int] = [] - self.invert: List[bool] = [] - self.add_input(input_source, invert) - - def add_input( - self, input_source: int, invert: bool = False - ) -> "LogicGateConfiguration": - """Add an input to the gate. This will throw an assertion error if more than 4 - inputs are added to the Zebra. - Args: - input_source (int): The source for the input (must be between 0 and 63). - invert (bool, optional): Whether the input should be inverted. Default - False. - Returns: - LogicGateConfiguration: A description of the gate configuration. - """ - assert len(self.sources) < 4 - assert 0 <= input_source <= 63 - self.sources.append(input_source) - self.invert.append(invert) - return self - - def __str__(self) -> str: - input_strings = [] - for input, (source, invert) in enumerate(zip(self.sources, self.invert)): - input_strings.append(f"INP{input+1}={'!' if invert else ''}{source}") - - return ", ".join(input_strings) - - -class LogicGateConfigurer(StandardReadable): - DEFAULT_SOURCE_IF_GATE_NOT_USED = 0 - - def __init__(self, prefix: str, name=""): - self.and_gate_1 = GateControl(prefix, "AND1", name) - self.and_gate_2 = GateControl(prefix, "AND2", name) - self.and_gate_3 = GateControl(prefix, "AND3", name) - self.and_gate_4 = GateControl(prefix, "AND4", name) - - self.or_gate_1 = GateControl(prefix, "OR1", name) - self.or_gate_2 = GateControl(prefix, "OR2", name) - self.or_gate_3 = GateControl(prefix, "OR3", name) - self.or_gate_4 = GateControl(prefix, "OR4", name) - - self.all_gates = { - GateType.AND: [ - self.and_gate_1, - self.and_gate_2, - self.and_gate_3, - self.and_gate_4, - ], - GateType.OR: [ - self.or_gate_1, - self.or_gate_2, - self.or_gate_3, - self.or_gate_4, - ], - } - - super().__init__(prefix, name) - - async def apply_logic_gate_config( - self, type: GateType, gate_number: int, config: LogicGateConfiguration - ): - """Uses the specified `LogicGateConfiguration` to configure a gate on the Zebra. - Args: - type (GateType): The type of gate e.g. AND/OR - gate_number (int): Which gate to configure. - config (LogicGateConfiguration): A configuration for the gate. - """ - gate: GateControl = self.all_gates[type][gate_number - 1] - - await gate.enable.set(boolean_array_to_integer([True] * len(config.sources))) + await self.time_units.set(TimeUnits.ms) - # Input Source - for source_number, source_pv in enumerate(gate.sources): - try: - await source_pv.set(config.sources[source_number]) - except IndexError: - await source_pv.set(self.DEFAULT_SOURCE_IF_GATE_NOT_USED) - # Invert - await gate.invert.set(boolean_array_to_integer(config.invert)) +class PcArm(Device): + def __init__(self, prefix: str) -> None: + self.trig_source = epics_signal_rw(ArmSelect, f"{prefix}:PC_ARM_SEL") + self.arm = epics_signal_rw(float, f"{prefix}:PC_ARM") + self.disarm = epics_signal_rw(float, f"{prefix}:PC_DISARM") + self.arm_status = epics_signal_rw(float, f"{prefix}:PC_ARM_OUT") async def reset(self): - ... + await self.disarm.set(1) + await self.trig_source.set(ArmSelect.soft) - apply_and_gate_config = partialmethod(apply_logic_gate_config, GateType.AND) - apply_or_gate_config = partialmethod(apply_logic_gate_config, GateType.OR) +class PcGate(Device): + def __init__(self, prefix: str) -> None: + self.trig_source = epics_signal_rw(GatePulseSelect, f"{prefix}:PC_GATE_SEL") + self.gate_start = epics_signal_rw(float, f"{prefix}:PC_GATE_START") + self.gate_width = epics_signal_rw(float, f"{prefix}:PC_GATE_WID") + self.num_gates = epics_signal_rw(float, f"{prefix}:PC_GATE_NGATE") + # self.gate_input = epics_signal_rw(int, "PC_GATE_INP") + self.gate_step = epics_signal_rw(float, f"{prefix}:PC_GATE_STEP") + self.gate_status = epics_signal_rw(float, f"{prefix}:PC_GATE_OUT") -class PositionCompare(StandardReadable): - def __init__(self, prefix: str, name="") -> None: - self.capture = Capture("PC_BIT_CAP:") - self.array_outputs = ArrayOuts("") - - self.direction = zebra_rw(Direction, "PC_DIR") - self.time_units = zebra_rw(TimeUnits, "PC_TSPRE") - self.time = zebra_rw(float, "PC_TIME") - - self.pulse_sel = zebra_rw(GatePulseSelect, "PC_PULSE_SEL") - self.pulse_input = zebra_rw(int, "PC_PULSE_INP") - self.pulse_start = zebra_rw(float, "PC_PULSE_START") - self.pulse_width = zebra_rw(float, "PC_PULSE_WID") - self.pulse_delay = zebra_rw(int, "PC_PULSE_DLY") - self.pulse_step = zebra_rw(int, "PC_PULSE_STEP") - self.pulse_max = zebra_rw(int, "PC_PULSE_MAX") - - self.num_gates = zebra_rw(int, "PC_GATE_NGATE") - self.gate_trigger = zebra_rw(EncoderType, "PC_ENC") - self.gate_sel = zebra_rw(GatePulseSelect, "PC_GATE_SEL") - self.gate_input = zebra_rw(int, "PC_GATE_INP") - self.gate_start = zebra_rw(float, "PC_GATE_START") - self.gate_width = zebra_rw(float, "PC_GATE_WID") - self.gate_step = zebra_rw(int, "PC_GATE_STEP") - - self.arm_sel = zebra_rw(ArmSelect, "PC_ARM_SEL") - self.arm = zebra_rw(int, "PC_ARM") - self.disarm = zebra_rw(int, "PC_DISARM") - self.armed = zebra_rw(int, "PC_ARM_OUT") - - self.captured = zebra_r(int, "PC_NUM_CAP") - self.downloaded = zebra_r(int, "PC_NUM_DOWN") - super().__init__(prefix, name) - - # want a mapping between the encoder, capture and array pvs? + async def reset(self): + await self.trig_source.set(GatePulseSelect.position) + + +class PcPulse(Device): + def __init__(self, prefix: str) -> None: + self.trig_source = epics_signal_rw(GatePulseSelect, f"{prefix}:PC_PULSE_SEL") + self.pulse_start = epics_signal_rw(float, f"{prefix}:PC_PULSE_START") + self.pulse_width = epics_signal_rw(float, f"{prefix}:PC_PULSE_WID") + self.pulse_step = epics_signal_rw(float, f"{prefix}:PC_PULSE_STEP") + self.capt_delay = epics_signal_rw(float, f"{prefix}:PC_PULSE_DLY") + # self.pulse_input = epics_signal_rw(int, "PC_PULSE_INP") + self.max_pulses = epics_signal_rw(float, f"{prefix}:PC_PULSE_MAX") + self.pulse_status = epics_signal_rw(float, f"{prefix}:PC_PULSE_OUT") async def reset(self): - await self.time_units.set(TimeUnits.ms) - await self.pulse_sel.set(GatePulseSelect.time) - await self.gate_sel.set(GatePulseSelect.position) - await self.disarm.set(1) - await self.arm_sel.set(ArmSelect.soft) + await self.trig_source.set(GatePulseSelect.time) await self.pulse_start.set(0.0) -class System(StandardReadable): - def __init__(self, prefix: str, name: str = ""): - self.sys_reset = zebra_rw(int, "SYS_RESET.PROC") - self.config_file = zebra_rw(str, "CONFIG_FILE") - self.config_read = zebra_rw(int, "CONFIG_READ.PROC") - self.config_status = zebra_rw(str, "CONFIG_STATUS") +class ArrayOuts(Device): + def __init__(self, prefix: str) -> None: + def make_pv(suffix: str): + # return epics_signal_rw(npt.NDArray[np.float64], f"{prefix}:PC_{suffix}") + return epics_signal_rw(npt.NDArray[np.float64], f"{prefix}:PC_{suffix}") + + self.enc: ArrayOutPvType = DeviceVector( + { + idx + 1: make_pv(name) + for idx, name in enumerate(["ENC1", "ENC2", "ENC3", "ENC4"]) + } + ) + self.sys: ArrayOutPvType = DeviceVector( + {idx + 1: make_pv(name) for idx, name in enumerate(["SYS1", "SYS2"])} + ) + self.div: ArrayOutPvType = DeviceVector( + { + idx + 1: make_pv(name) + for idx, name in enumerate(["DIV1", "DIV2", "DIV3", "DIV4"]) + } + ) + + +class PcDownload(Device): + def __init__(self, prefix: str) -> None: + self.array_outputs = ArrayOuts(prefix) + self.captured = epics_signal_r(float, f"{prefix}:PC_NUM_CAP") + self.downloaded = epics_signal_r(float, f"{prefix}:PC_NUM_DOWN") + self.in_progress = epics_signal_r(float, f"{prefix}:ARRAY_ACQ") + self.update_rate = epics_signal_rw(UpdateRate, f"{prefix}:ARRAY_UPDATE.SCAN") + + +class PositionCompare(Device): + def __init__(self, prefix: str) -> None: + self.setup = PcSetup(prefix) + self.arm = PcArm(prefix) + self.gate = PcGate(prefix) + self.pulse = PcPulse(prefix) + self.download = PcDownload(prefix) + + async def reset(self): + await self.setup.reset() + await self.arm.reset() + await self.gate.reset() + await self.pulse.reset() + + +####################################################################################### + + +class Input(Device): + """Designed to represent the 'INP' fields in GATE tab for example.""" + + def __init__(self, prefix: str): + self.input = epics_signal_rw(float, f"{prefix}") + self.source = epics_signal_rw(str, f"{prefix}:STR") + self.status = epics_signal_rw(float, f"{prefix}:STA") + + +class LogicGatePanelInput(Input): + def __init__(self, prefix: str, number: int): + self.use = epics_signal_rw(Bool, f"{prefix}_ENA:B{number-1}") + self.invert = epics_signal_rw(Bool, f"{prefix}_INV:B{number-1}") + super().__init__(f"{prefix}_INP{number}") + + +class LogicGatePanel(Device): + def __init__(self, prefix: str): + self.inp = DeviceVector( + {channel: LogicGatePanelInput(prefix, channel) for channel in range(1, 5)} + ) + + +####################################################################################### + + +class Gate(Device): + def __init__(self, prefix: str, number: int): + self.inp1 = Input(f"{prefix}:GATE{number}_INP1") + self.inp1_trigger = epics_signal_rw(Bool, f"{prefix}:POLARITY:B{number-1}") + self.inp2 = Input(f"{prefix}:GATE{number}_INP2") + self.inp2_trigger = epics_signal_rw(Bool, f"{prefix}:POLARITY:B{number+3}") + self.out = epics_signal_r(float, f"{prefix}:GATE{number}_OUT") - super().__init__(prefix, name) + +####################################################################################### + + +class Div(Device): + triggers = {1: "8", 2: "9", 3: "A", 4: "B"} + + def __init__(self, prefix: str, number: int): + self.input = Input(f"{prefix}:DIV{number}_INP") + self.trigger = epics_signal_rw( + Bool, f"{prefix}:POLARITY:B{self.triggers[number]}" + ) + self.divisor = epics_signal_rw(float, f"{prefix}:DIV{number}_DIV") + self.first_pulse = epics_signal_rw(Bool, f"{prefix}:DIV_FIRST:B{number-1}") + self.outd = epics_signal_r(float, f"{prefix}:DIV{number}_OUTD") + self.outn = epics_signal_r(float, f"{prefix}:DIV{number}_OUTN") + + +####################################################################################### + + +class Pulse(Device): + triggers = {1: "C", 2: "D", 3: "E", 4: "F"} + + def __init__(self, prefix: str, number: int): + self.input = Input(f"{prefix}:PULSE{number}_INP") + self.trigger = epics_signal_rw( + Bool, f"{prefix}:POLARITY:B{self.triggers[number]}" + ) + self.delay_before = epics_signal_rw(float, f"{prefix}:PULSE{number}_DLY") + self.pulse_width = epics_signal_rw(float, f"{prefix}:PULSE{number}_WID") + self.time_units = epics_signal_rw(TimeUnits, f"{prefix}:PULSE{number}_PRE") + + self.trig_while_active = epics_signal_r( + int, f"{prefix}:SYS_STATERR.B{number-1}" + ) + self.output_pulse = epics_signal_r(float, f"{prefix}:PULSE{number}_OUT") + + +####################################################################################### + + +class EachMotor(Device): + def __init__(self, prefix: str, number: int): + self.title = epics_signal_r(str, f"{prefix}:M{number}") + self.description = epics_signal_r(str, f"{prefix}:M{number}:DESC") + self.motor_current_pos = epics_signal_r(float, f"{prefix}:M{number}:RBV") + self.set_zebra_pos = epics_signal_rw(float, f"{prefix}:POS{number}_SET") + self.copy_motor_pos_to_zebra = epics_signal_rw( + int, f"{prefix}:M{number}:SETPOS.PROC" + ) + + +class Quad(Device): + def __init__(self, prefix: str): + self.step = Input(f"{prefix}:QUAD_STEP") + self.dir = Input(f"{prefix}:QUAD_DIR") + + self.outa = epics_signal_r(float, f"{prefix}:QUAD_OUTA") + self.outb = epics_signal_r(float, f"{prefix}:QUAD_OUTB") + + +class Enc(Device): + def __init__(self, prefix: str): + self.pos: DeviceVector[EachMotor] = DeviceVector( + {number: EachMotor(prefix, number) for number in range(1, 5)} + ) + self.quad = Quad(prefix) + + +####################################################################################### + + +class SysFrontPanelOutputs(Device): + def __init__(self, prefix: str) -> None: + self.out_ttl: DeviceVector[Input] = DeviceVector( + {channel: Input(f"{prefix}:OUT{channel}_TTL") for channel in range(1, 5)} + ) + self.out_nim: DeviceVector[Input] = DeviceVector( + {channel: Input(f"{prefix}:OUT{channel}_NIM") for channel in [1, 2, 4]} + ) + self.out_lvds: DeviceVector[Input] = DeviceVector( + {channel: Input(f"{prefix}:OUT{channel}_LVDS") for channel in [1, 2, 3]} + ) + self.out_oc: DeviceVector[Input] = DeviceVector({3: Input(f"{prefix}:OUT3_OC")}) + self.out_pecl: DeviceVector[Input] = DeviceVector( + {4: Input(f"{prefix}:OUT4_PECL")} + ) + + +class SysRearPanelOutputs(Device): + def __init__(self, prefix: str) -> None: + self.out_enca: DeviceVector[Input] = DeviceVector( + {channel: Input(f"{prefix}:OUT{channel}_ENCA") for channel in range(5, 9)} + ) + self.out_encb: DeviceVector[Input] = DeviceVector( + {channel: Input(f"{prefix}:OUT{channel}_ENCB") for channel in range(5, 9)} + ) + self.out_encz: DeviceVector[Input] = DeviceVector( + {channel: Input(f"{prefix}:OUT{channel}_ENCZ") for channel in range(5, 9)} + ) + self.out_conn: DeviceVector[Input] = DeviceVector( + {channel: Input(f"{prefix}:OUT{channel}_CONN") for channel in range(5, 9)} + ) + + +class SysWriteRegsToFileFlash(Device): + def __init__(self, prefix: str): + self.file = epics_signal_rw(str, f"{prefix}:CONFIG_FILE") + self.store_to_file = epics_signal_rw(int, f"{prefix}:CONFIG_WRITE.PROC") + self.restore_from_file = epics_signal_rw(int, f"{prefix}:CONFIG_READ.PROC") + self.status = epics_signal_r(str, f"{prefix}:CONFIG_STATUS") + self.store_to_flash = epics_signal_rw(int, f"{prefix}:STORE.PROC") + self.restore_from_flash = epics_signal_rw(int, f"{prefix}:RESTORE.PROC") + + +class Sys(Device): + def __init__(self, prefix: str): + self.front_panel_outputs = SysFrontPanelOutputs(prefix) + self.rear_panel_outputs = SysRearPanelOutputs(prefix) + self.write_regs_to_file_or_flash = SysWriteRegsToFileFlash(prefix) + + self.version = epics_signal_r(float, f"{prefix}:SYS_VER") + self.initial_poll_done = epics_signal_r(Bool, f"{prefix}:INITIAL_POLL_DONE") async def reset(self): - await self.sys_reset.set(1) + return None + + +####################################################################################### + +class SoftIn(Device): + def __init__(self, prefix: str): + self.input: DeviceVector[epics_signal_rw[Bool]] = DeviceVector( + { + number: epics_signal_rw(Bool, f"{prefix}:B{number-1}") + for number in range(1, 5) + } + ) -class Zebra(StandardReadable): - def __init__(self, prefix: str, name: str = ""): - self.pc = PositionCompare("") - self.output = ZebraOutputPanel("") - self.logic_gates = LogicGateConfigurer("") - self.sys = System("") - super().__init__(prefix, name) +class Zebra(Device): + def __init__(self, prefix: str): + """ + Designed to pair well with the epics EDM screens for zebras. + """ + self.pc = PositionCompare(prefix) + + self.and_gates: DeviceVector[LogicGatePanel] = DeviceVector( + { + channel: LogicGatePanel(f"{prefix}:AND{channel}") + for channel in range(1, 5) + } + ) + self.or_gates: DeviceVector[LogicGatePanel] = DeviceVector( + { + channel: LogicGatePanel(f"{prefix}:OR{channel}") + for channel in range(1, 5) + } + ) + self.gate: DeviceVector[Gate] = DeviceVector( + {number: Gate(prefix, number) for number in range(1, 5)} + ) + self.div: DeviceVector[Div] = DeviceVector( + {number: Div(prefix, number) for number in range(1, 5)} + ) + self.pulse: DeviceVector[Pulse] = DeviceVector( + {number: Pulse(prefix, number) for number in range(1, 5)} + ) + self.enc = Enc(prefix) + self.sys = Sys(prefix) + + self.soft_in = SoftIn(f"{prefix}:SOFT_IN") + self.block_state = epics_signal_rw(int, f"{prefix}:SYS_RESET.PROC") async def reset(self): await self.pc.reset() - await self.output.reset() - await self.logic_gates.reset() await self.sys.reset() + + +# RE = RunEngine() + + +# async def somefunc(): +# async with DeviceCollector(): +# # I think I'd like to do and_screen.and[1].inp[1]... +# # so let's start with inp[1]... + +# # and_gates[1].inp[1] + +# # want to do, inp[1].use for example... +# setup_cap = Zebra("BL03S-EA-ZEBRA-01") +# return setup_cap + + +# zebra = asyncio.run(somefunc()) +# print("aha") diff --git a/tests/test_zebra.py b/tests/test_zebra.py new file mode 100644 index 0000000..f666598 --- /dev/null +++ b/tests/test_zebra.py @@ -0,0 +1,114 @@ +from typing import Any + +import pytest +from ophyd.v2.core import DeviceCollector + +from ophyd_epics_devices.zebra import ( + ArmSelect, + Bool, + Direction, + EncoderType, + GatePulseSelect, + TimeUnits, + Zebra, +) + +# Long enough for multiple asyncio event loop cycles to run so +# all the tasks have a chance to run +A_WHILE = 0.001 + + +@pytest.fixture +async def sim_zebra(): + async with DeviceCollector(sim=True): + sim_zebra = Zebra("BLxxI-MO-TABLE-01") + # Signals connected here + + assert sim_zebra.name == "sim_zebra" + await sim_zebra.reset() + + yield sim_zebra + + +@pytest.mark.parametrize( + "value", + [(Direction.positive), (Direction.negative)], +) +async def test_setting_direction(value: Any, sim_zebra: Zebra) -> None: + await sim_zebra.pc.setup.posn_dir.set(value) + assert await sim_zebra.pc.setup.posn_dir.get_value() == value + + +@pytest.mark.parametrize( + "value", + [TimeUnits.ms, TimeUnits.s, TimeUnits.s10], +) +async def test_setting_time_units(value: Any, sim_zebra: Zebra) -> None: + await sim_zebra.pc.setup.time_units.set(value) + assert await sim_zebra.pc.setup.time_units.get_value() == value + + +@pytest.mark.parametrize( + "value", + [GatePulseSelect.time, GatePulseSelect.external, GatePulseSelect.position], +) +async def test_setting_pulse_and_gate_selection(value: Any, sim_zebra: Zebra) -> None: + await sim_zebra.pc.pulse.trig_source.set(value) + assert await sim_zebra.pc.pulse.trig_source.get_value() == value + + await sim_zebra.pc.gate.trig_source.set(value) + assert await sim_zebra.pc.gate.trig_source.get_value() == value + + +@pytest.mark.parametrize( + "value", + [ArmSelect.external, ArmSelect.soft], +) +async def test_setting_arm_selection(value: Any, sim_zebra: Zebra) -> None: + await sim_zebra.pc.arm.trig_source.set(value) + assert await sim_zebra.pc.arm.trig_source.get_value() == value + + +@pytest.mark.parametrize( + "value", + [EncoderType.enc1, EncoderType.enc2, EncoderType.enc3, EncoderType.enc4], +) +async def test_setting_gate_trigger(value: Any, sim_zebra: Zebra) -> None: + await sim_zebra.pc.gate.trig_source.set(value) + assert await sim_zebra.pc.gate.trig_source.get_value() == value + + +@pytest.mark.parametrize( + "value", + [Bool.yes, Bool.no], +) +async def test_setting_capture_pvs(value: Any, sim_zebra: Zebra) -> None: + await sim_zebra.pc.setup.capture.div[1].set(value) + assert await sim_zebra.pc.setup.capture.div[1].get_value() == value + + await sim_zebra.pc.setup.capture.div[2].set(value) + assert await sim_zebra.pc.setup.capture.div[2].get_value() == value + + await sim_zebra.pc.setup.capture.div[3].set(value) + assert await sim_zebra.pc.setup.capture.div[3].get_value() == value + + await sim_zebra.pc.setup.capture.div[4].set(value) + assert await sim_zebra.pc.setup.capture.div[4].get_value() == value + + await sim_zebra.pc.setup.capture.sys[1].set(value) + assert await sim_zebra.pc.setup.capture.sys[1].get_value() == value + + await sim_zebra.pc.setup.capture.sys[2].set(value) + assert await sim_zebra.pc.setup.capture.sys[2].get_value() == value + + await sim_zebra.pc.setup.capture.enc[1].set(value) + assert await sim_zebra.pc.setup.capture.enc[1].get_value() == value + + await sim_zebra.pc.setup.capture.enc[2].set(value) + assert await sim_zebra.pc.setup.capture.enc[2].get_value() == value + + await sim_zebra.pc.setup.capture.enc[3].set(value) + assert await sim_zebra.pc.setup.capture.enc[3].get_value() == value + + await sim_zebra.pc.setup.capture.enc[4].set(value) + assert await sim_zebra.pc.setup.capture.enc[4].get_value() == value From 344de068248f4e1251f6a12ec33eac8f9d759dca Mon Sep 17 00:00:00 2001 From: Rose Yemelyanova Date: Thu, 3 Aug 2023 16:28:34 +0100 Subject: [PATCH 3/4] Modify pyproject toml to use correct ophyd package --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 1f261c8..ba0bfa3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -14,7 +14,7 @@ classifiers = [ ] description = "Cross-institution EPICS Devices for Ophyd" dependencies = [ - "ophyd @ git+https://github.com/bluesky/ophyd.git@sim-backend-numpy-typing", + "ophyd[ca] @ git+https://github.com/bluesky/ophyd.git", "bluesky", "event-model", "p4p", From 9b244adc57b0148deb303b99954dc48544387733 Mon Sep 17 00:00:00 2001 From: Rose Yemelyanova Date: Thu, 3 Aug 2023 16:31:10 +0100 Subject: [PATCH 4/4] Fix linting --- src/ophyd_epics_devices/zebra.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/ophyd_epics_devices/zebra.py b/src/ophyd_epics_devices/zebra.py index c193f3e..7a5aa8b 100644 --- a/src/ophyd_epics_devices/zebra.py +++ b/src/ophyd_epics_devices/zebra.py @@ -1,10 +1,8 @@ -import asyncio from enum import Enum import numpy as np import numpy.typing as npt -from bluesky import RunEngine -from ophyd.v2.core import Device, DeviceCollector, DeviceVector, SignalR, SignalRW +from ophyd.v2.core import Device, DeviceVector, SignalRW from ophyd.v2.epics import epics_signal_r, epics_signal_rw from typing_extensions import TypeAlias