Skip to content

Commit

Permalink
Implement ElaboratableWrapper
Browse files Browse the repository at this point in the history
Signed-off-by: Krzysztof Obłonczek <[email protected]>
  • Loading branch information
koblonczek committed Jan 2, 2024
1 parent ac313b2 commit 6385a04
Show file tree
Hide file tree
Showing 6 changed files with 572 additions and 1 deletion.
2 changes: 1 addition & 1 deletion .flake8
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
; Copyright (C) 2023 Antmicro
; SPDX-License-Identifier: Apache-2.0
[flake8]
ignore = E203, E266, E501, W503, F403, F401
ignore = E203, E266, E501, W503, F403, F401, F405
max-line-length = 100
max-complexity = 27
select = B,C,E,F,W,T4,B9
Expand Down
12 changes: 12 additions & 0 deletions docs/source/elaboratable_wrapper.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# ElaboratableWrapper class

{class}`ElaboratableWrapper` encapsulates an Amaranth's Elaboratable and exposes an interface compatible with other wrappers which allows making connections with them.
Supplied elaboratable must contain a `signature` property and a conforming interface as specified by [Amaranth docs](https://amaranth-lang.org/rfcs/0002-interfaces.html).
Ports' directionality, their names and widths are inferred from it.

```{eval-rst}
.. autoclass:: fpga_topwrap.elaboratable_wrapper.ElaboratableWrapper
:members:
.. automethod:: __init__
```
1 change: 1 addition & 0 deletions docs/source/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ pipeline_manager
wrapper_port
ipwrapper
ipconnect
elaboratable_wrapper
helpers
fusesoc
```
Expand Down
174 changes: 174 additions & 0 deletions fpga_topwrap/elaboratable_wrapper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
# Copyright (C) 2023 Antmicro
# SPDX-License-Identifier: Apache-2.0

from functools import cache
from typing import Iterable, Mapping, Union

from amaranth import *
from amaranth.build import Platform
from amaranth.hdl.ast import Assign, Shape
from amaranth.lib import wiring

from .amaranth_helpers import DIR_IN, DIR_OUT, WrapperPort
from .wrapper import Wrapper

SignalMapping = Mapping[str, Union[Signal, "SignalMapping"]]
InterfaceLike = Union[wiring.PureInterface, Elaboratable]


class ElaboratableWrapper(Wrapper):
"""Allows connecting an Amaranth's Elaboratable with other
classes derived from Wrapper.
"""

def __init__(self, name: str, elaboratable: Elaboratable) -> None:
"""
:param name: name of this wrapper
:param elaboratable: Amaranth's Elaboratable object to wrap
"""
super().__init__(name)
self.elaboratable = elaboratable
self.clk = self._cached_wrapper(
port_width=1, port_flow=wiring.In, name="clk", port_name="clk", iface_name=""
)
self.rst = self._cached_wrapper(
port_width=1, port_flow=wiring.In, name="rst", port_name="rst", iface_name=""
)

def get_ports(self) -> list[WrapperPort]:
"""Return a list of external ports."""
return self._flatten_hier(self.get_ports_hier())

def get_ports_hier(self) -> SignalMapping:
"""Maps elaboratable's Signature to a nested dictionary of WrapperPorts.
See _gather_signature_ports for more details.
"""
return self._gather_signature_ports(self.elaboratable.signature) | {
"clk": self.clk,
"rst": self.rst,
}

@cache
def _cached_wrapper(
self, port_width: int, port_flow: wiring.Flow, name: str, port_name: str, iface_name: str
) -> WrapperPort:
"""Constructs a WrapperPort, but only one instance per set of parameters in
a module is ever created. Multiple calls to this function with the identical
parameters return the same object.
:param port_width: width of the port
:param port_flow: directionality of the port, one of: wiring.In, wiring.Out
:param name: name of the port
:param port_name: original port name as it appears in the signature
:param iface_name: name of the interface the ports belongs to
"""
return WrapperPort(
bounds=[port_width - 1, 0, port_width - 1, 0],
name=name,
internal_name=port_name,
interface_name=iface_name,
direction=DIR_IN if port_flow == wiring.In else DIR_OUT,
)

def _gather_signature_ports(
self, signature: wiring.Signature, prefix: str = ""
) -> SignalMapping:
"""Maps a signature to a nested dictionary of WrapperPorts.
For example, an elaboratable with this signature:
Signature({
"data": Out(Signature({
"payload": Out(7),
"chksum": Out(1)
})),
"ready": In(1),
"valid": Out(1)
})
Translates to this dictionary structure (some details omitted for clarity):
{
"data": {
"payload": WrapperPort(
bounds=[6, 0, 6, 0],
name="data_payload",
internal_name="payload",
interface_name="data",
direction=DIR_OUT
),
"chksum": WrapperPort(...)
},
"ready": WrapperPort(
bounds=[0, 0, 0, 0],
name="ready",
internal_name="ready",
interface_name="",
direction=DIR_IN
),
"valid": WrapperPort(...)
}
:param signature: Amaranth's Signature to map to a dictionary
:param prefix: optional interface prefix to prepend to the name of all ports
"""
iface = {}
for port_name, port in signature.members.items():
name = f"{prefix}_{port_name}" if prefix else port_name
if port.is_signature:
inner_iface = self._gather_signature_ports(port.signature, prefix=name)
iface[port_name] = inner_iface
else:
iface[port_name] = self._cached_wrapper(
Shape.cast(port.shape).width, port.flow, name, port_name, prefix
)
return iface

def _flatten_hier(self, hier: SignalMapping) -> Iterable[Signal]:
"""Flattens a nested dictionary with WrapperPorts.
:param hier: a (nested) dictionary of WrapperPorts
"""
ports = []
try:
for _, port in hier.items():
ports += self._flatten_hier(port)
except AttributeError:
ports += [hier]
return ports

def _connect_ports(self, ports: SignalMapping, iface: InterfaceLike) -> list[Assign]:
"""Returns a list of amaranth assignments between the wrapped elaboratable and external ports.
:param ports: nested dictionary of WrapperPorts mirroring that of iface's signature
:param iface: Amaranth Interface to make connections with
"""
conns = []
for port_name, port in iface.signature.members.items():
iface_port = getattr(iface, port_name)
if port.is_signature:
conns += self._connect_ports(ports[port_name], iface_port)
else:
if port.flow == wiring.In:
conns.append(iface_port.eq(ports[port_name]))
elif port.flow == wiring.Out:
conns.append(ports[port_name].eq(iface_port))
else:
raise TypeError(f"Invalid InOut flow direction in signal '{port_name}'")
return conns

def elaborate(self, platform: Platform) -> Module:
m = Module()

# create an internal clock domain that doesn't propagate upwards in the submodule
# tree and assign clk and rst specified by the user to the internal domain signals
cd = ClockDomain(self.name, local=True)
m.d.comb += ClockSignal(self.name).eq(self.clk)
m.d.comb += ResetSignal(self.name).eq(self.rst)
m.domains += cd

# make the elaboratable use the new clock domain internally
m.submodules += DomainRenamer(self.name)(self.elaboratable)

m.d.comb += self._connect_ports(self.get_ports_hier(), self.elaboratable)

return m
48 changes: 48 additions & 0 deletions fpga_topwrap/wrapper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# Copyright (C) 2021 Antmicro
# SPDX-License-Identifier: Apache-2.0

from typing import List

from amaranth import *

from .amaranth_helpers import WrapperPort


class Wrapper(Elaboratable):
"""Base class for modules that want to connect to each other.
Derived classes must implement get_ports method that returns
a list of WrapperPort's - external ports of a class that can
be used as endpoints for connections.
"""

def __init__(self, name: str) -> None:
self.name = name

@property
def get_ports(self) -> List[WrapperPort]:
"""Return a list of external ports."""
raise NotImplementedError('Derived classes must implement "get_ports" method')

def get_port_by_name(self, name: str) -> WrapperPort:
"""Given port's name, return the port as WrapperPort object.
:raises ValueError: If such port doesn't exist.
"""
try:
port = {signal.name: signal for signal in self.get_ports()}[name]
except KeyError:
raise ValueError(f"Port named '{name}' couldn't be found in the hierarchy: {self.name}")
return port

def get_ports_of_interface(self, iface_name: str) -> List[WrapperPort]:
"""Return a list of ports of specific interface.
:raises ValueError: if such interface doesn't exist.
"""
ports = [
port for port in filter(lambda x: x.interface_name == iface_name, self.get_ports())
]
if not ports:
raise ValueError(f"No ports could be found for this interface name: {iface_name}")
return ports
Loading

0 comments on commit 6385a04

Please sign in to comment.