Skip to content

Commit

Permalink
more typing
Browse files Browse the repository at this point in the history
  • Loading branch information
Christian-B committed Dec 20, 2024
1 parent 4ccf971 commit 8e5febc
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# limitations under the License.

from enum import Enum
from typing import Optional
from spinn_utilities.abstract_base import AbstractBase, abstractmethod


Expand Down Expand Up @@ -95,7 +96,7 @@ def device_control_max_value(self) -> float:

@property
@abstractmethod
def device_control_timesteps_between_sending(self) -> int:
def device_control_timesteps_between_sending(self) -> Optional[int]:
"""
The number of timesteps between sending commands to the device.
This defines the "sampling interval" for the device.
Expand Down Expand Up @@ -124,10 +125,8 @@ def device_control_scaling_factor(self) -> int: # pragma: no cover
return 1

@property
def device_control_first_send_timestep(self) -> int:
def device_control_first_send_timestep(self) -> Optional[int]:
"""
The first timestep that the device should send in (0 by default).
:rtype: int
"""
return 0
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Iterable

from spynnaker.pyNN.external_devices_models import ExternalDeviceLifControl
from spynnaker.pyNN.external_devices_models.push_bot.ethernet import (
PushBotEthernetDevice)
from spynnaker.pyNN.protocols import MunichIoSpiNNakerLinkProtocol
from spynnaker.pyNN.models.defaults import default_initial_values

Expand Down Expand Up @@ -41,12 +45,15 @@ class PushBotLifSpinnakerLink(ExternalDeviceLifControl):

@default_initial_values({"v", "isyn_exc", "isyn_inh"})
def __init__(
self, protocol, devices,
self, protocol: MunichIoSpiNNakerLinkProtocol,
devices: Iterable[PushBotEthernetDevice],

# default params for the neuron model type
tau_m=20.0, cm=1.0, v_rest=0.0, v_reset=0.0, tau_syn_E=5.0,
tau_syn_I=5.0, tau_refrac=0.1, i_offset=0.0, v=0.0,
isyn_exc=0.0, isyn_inh=0.0):
tau_m: float = 20.0, cm: float = 1.0, v_rest: float = 0.0,
v_reset: float = 0.0, tau_syn_E: float = 5.0,
tau_syn_I: float = 5.0, tau_refrac: float = 0.1,
i_offset: float = 0.0, v: float = 0.0, isyn_exc: float = 0.0,
isyn_inh: float = 0.0):
# pylint: disable=too-many-arguments

command_protocol = MunichIoSpiNNakerLinkProtocol(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,16 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Optional

from spinn_utilities.overrides import overrides
from spinn_utilities.abstract_base import AbstractBase, abstractmethod
from spynnaker.pyNN.external_devices_models import (
AbstractMulticastControllableDevice, SendType)
from spynnaker.pyNN.external_devices_models.push_bot import (
AbstractPushBotOutputDevice)
from spynnaker.pyNN.protocols import MunichIoSpiNNakerLinkProtocol


# The default timestep to use for first send. Avoids clashes with other
# control commands.
_DEFAULT_FIRST_SEND_TIMESTEP = 100
Expand All @@ -31,9 +34,10 @@ class PushBotEthernetDevice(
"""

def __init__(
self, protocol: MunichIoSpiNNakerLinkProtocol, device,
uses_payload, time_between_send,
first_send_timestep=_DEFAULT_FIRST_SEND_TIMESTEP):
self, protocol: MunichIoSpiNNakerLinkProtocol,
device: AbstractPushBotOutputDevice,
uses_payload: bool, time_between_send: Optional[int],
first_send_timestep: Optional[int] =_DEFAULT_FIRST_SEND_TIMESTEP):
"""
:param MunichIoSpiNNakerLinkProtocol protocol:
The protocol instance to get commands from
Expand Down Expand Up @@ -80,7 +84,7 @@ def device_control_max_value(self) -> float:
@property
@overrides(AbstractMulticastControllableDevice
.device_control_timesteps_between_sending)
def device_control_timesteps_between_sending(self) -> int:
def device_control_timesteps_between_sending(self) -> Optional[int]:
return self.__time_between_send

@property
Expand All @@ -92,7 +96,7 @@ def device_control_send_type(self) -> SendType:
@property
@overrides(AbstractMulticastControllableDevice
.device_control_first_send_timestep)
def device_control_first_send_timestep(self) -> int:
def device_control_first_send_timestep(self) -> Optional[int]:
return self.__first_send_timestep

@property
Expand All @@ -106,7 +110,7 @@ def protocol(self) -> MunichIoSpiNNakerLinkProtocol:

@abstractmethod
def set_command_protocol(
self, command_protocol: MunichIoSpiNNakerLinkProtocol):
self, command_protocol: MunichIoSpiNNakerLinkProtocol) -> None:
"""
Set the protocol use to send setup and shutdown commands,
separately from the protocol used to control the device.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,21 @@
# limitations under the License.

from threading import RLock
from typing import Optional

import numpy

from spinnman.connections import ConnectionListener

from spinn_front_end_common.utilities.connections import LiveEventConnection
from spinn_front_end_common.utilities.constants import BYTES_PER_SHORT

from spynnaker.pyNN.connections import SpynnakerLiveSpikesConnection
from spynnaker.pyNN.external_devices_models.push_bot.parameters import (
PushBotRetinaResolution)

from .push_bot_wifi_connection import PushBotWIFIConnection

# Each value is a 16-bit 1yyyyyyy.pxxxxxxx
_RETINA_PACKET_SIZE = BYTES_PER_SHORT
_MIN_PIXEL_VALUE = 32768
Expand Down Expand Up @@ -54,9 +62,12 @@ class PushBotRetinaConnection(SpynnakerLiveSpikesConnection):
"__ready")

def __init__(
self, retina_injector_label, pushbot_wifi_connection,
resolution=PushBotRetinaResolution.NATIVE_128_X_128,
local_host=None, local_port=None):
self, retina_injector_label: str,
pushbot_wifi_connection: PushBotWIFIConnection,
resolution: PushBotRetinaResolution = (
PushBotRetinaResolution.NATIVE_128_X_128),
local_host: Optional[str] = None,
local_port: Optional[int] = None):
"""
:param str retina_injector_label:
:param PushBotWIFIConnection pushbot_wifi_connection:
Expand Down Expand Up @@ -88,7 +99,7 @@ def __init__(
self.__pushbot_listener.start()
self.__lock = RLock()

self.__next_data = None
self.__next_data: Optional[bytearray] = None
self.__ready = False

self.add_start_resume_callback(
Expand All @@ -97,16 +108,18 @@ def __init__(
retina_injector_label, self.__push_bot_stop)

# pylint: disable=unused-argument
def __push_bot_start(self, label, connection):
def __push_bot_start(
self, label: str, connection: LiveEventConnection) -> None:
with self.__lock:
self.__ready = True

# pylint: disable=unused-argument
def __push_bot_stop(self, label, connection):
def __push_bot_stop(
self, label: str, connection: LiveEventConnection) -> None:
with self.__lock:
self.__ready = False

def _receive_retina_data(self, data):
def _receive_retina_data(self, data: bytearray) -> None:
"""
Receive retina packets from the PushBot and converts them into
neuron spikes within the spike injector system.
Expand Down Expand Up @@ -134,7 +147,7 @@ def _receive_retina_data(self, data):
self.__next_data = data[i:i+1]

# Filter out the usable data
data_filtered = numpy.fromstring(data_all, dtype=numpy.uint16)
data_filtered = numpy.frombuffer(data_all, dtype=numpy.uint16)
y_values = (data_filtered >> self.__orig_y_shift) & self.__y_mask
x_values = (data_filtered >> self.__orig_x_shift) & self.__x_mask
polarity = (data_filtered >> _P_SHIFT) & _P_MASK
Expand Down

0 comments on commit 8e5febc

Please sign in to comment.