Skip to content

Commit

Permalink
Add flank protection flag
Browse files Browse the repository at this point in the history
  • Loading branch information
arneboockmeyer committed Jun 27, 2024
1 parent 935a44c commit c9affcb
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 19 deletions.
26 changes: 18 additions & 8 deletions interlocking/interlockingcontroller/flankprotectioncontroller.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,34 @@ async def add_flank_protection_for_point(self, point: Point, point_orientation:
signals, points = self._get_flank_protection_elements_of_point(point, point_orientation, route)
results = []
for signal in signals:
signal.state = OccupancyState.FLANK_PROTECTION
signal.used_by.add(train_id)
results.append(await self.signal_controller.set_signal_halt(signal))
#signal.state = OccupancyState.FLANK_PROTECTION
#signal.used_by.add(train_id)
change_successful = await self.signal_controller.set_signal_halt(signal)
results.append(change_successful)
if change_successful:
signal.is_used_for_flank_protection = True
for point in points:
occupancy_state, orientation = points[point]
point.state = occupancy_state
point.used_by.add(train_id)
#point.state = occupancy_state
#point.used_by.add(train_id)
if orientation is not None:
# In case of a Schutztansportweiche the orientation is not relevant (None).
results.append(await self.point_controller.turn_point(point, orientation))
change_successful = await self.point_controller.turn_point(point, orientation)
results.append(change_successful)
if change_successful:
point.is_used_for_flank_protection = True
else:
point.is_used_for_flank_protection = True
return all(results)

def free_flank_protection_of_point(self, point: Point, point_orientation: str, route: Route, train_id: str):
signals, points = self._get_flank_protection_elements_of_point(point, point_orientation, route)
for signal in signals:
self.signal_controller.free_signal(signal, train_id)
#self.signal_controller.free_signal(signal, train_id)
signal.is_used_for_flank_protection = False
for point in points:
self.point_controller.set_point_free(point, train_id)
#self.point_controller.set_point_free(point, train_id)
point.is_used_for_flank_protection = False

def _get_flank_protection_elements_of_point(self,
point: Point,
Expand Down
4 changes: 4 additions & 0 deletions interlocking/interlockingcontroller/pointcontroller.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ async def turn_point(self, point, orientation):
if point.orientation == orientation:
# Everything is fine
return True
if point.is_used_for_flank_protection is True:
logging.error(f"Can not turn point of point {point.point_id} to {orientation}, "
f"since it is used for flank protection.")
return False
logging.info(f"--- Move point {point.point_id} to {orientation}")
# tasks = []
results = []
Expand Down
4 changes: 4 additions & 0 deletions interlocking/interlockingcontroller/signalcontroller.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ async def set_signal_aspect(self, signal, signal_aspect):
if signal.signal_aspect == signal_aspect:
# Everything is fine
return True
if signal.is_used_for_flank_protection is True:
logging.error(f"Can not set signal aspect of signal {signal.yaramo_signal.name} to {signal_aspect}, "
f"since it is used for flank protection.")
return False
logging.info(f"--- Set signal {signal.yaramo_signal.name} to {signal_aspect}")

results = []
Expand Down
1 change: 1 addition & 0 deletions interlocking/model/point.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ def __init__(self, yaramo_node):
self.orientation = "undefined" # either left, right or undefined
self.state = OccupancyState.FREE
self.used_by = set() # If point is reserved, occupied or part of an overlap, this contains the train numbers.
self.is_used_for_flank_protection = False
self.head = None
self.left = None
self.right = None
Expand Down
1 change: 1 addition & 0 deletions interlocking/model/signal.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@ def __init__(self, yaramo_signal: YaramoSignal):
self.signal_aspect: str = "undefined" # Either halt or go
self.state: OccupancyState = OccupancyState.FREE
self.used_by = set() # If point is reserved, occupied or part of an overlap, this contains the train numbers.
self.is_used_for_flank_protection = False
self.track: Track or None = None
27 changes: 16 additions & 11 deletions test/flank-protection_test.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Example 1: Halt zeigendes Signal
# Example 2: Schutzweiche
# Example 3: Schutztransportweiche mit Schutzweise und halt zeigendem Signal
# Example 3: Schutztransportweiche mit Schutzweiche und halt zeigendem Signal
# Example 4: Two routes, that do not conflict, but each point is also flank protection of the other route

from .helper import topologyhelper, interlockinghelper
Expand Down Expand Up @@ -54,8 +54,9 @@ def test_example_1():
flank_protection_signal = interlockinghelper.get_interlocking_signal_by_name(interlocking, "99N3")
assert flank_protection_signal.signal_aspect == "halt"
assert track_operations_ip.was_signal_set_to_aspect(flank_protection_signal.yaramo_signal, "halt")
assert flank_protection_signal.state == OccupancyState.FLANK_PROTECTION
assert "RB101" in flank_protection_signal.used_by
# assert flank_protection_signal.state == OccupancyState.FLANK_PROTECTION
assert flank_protection_signal.is_used_for_flank_protection
# assert "RB101" in flank_protection_signal.used_by


def test_example_2():
Expand All @@ -79,8 +80,9 @@ def test_example_2():
flank_protection_point_id = "aed72"
flank_protection_point = interlocking.point_controller.points[flank_protection_point_id]
assert flank_protection_point.orientation == "left"
assert flank_protection_point.state == OccupancyState.FLANK_PROTECTION
assert "RB101" in flank_protection_point.used_by
# assert flank_protection_point.state == OccupancyState.FLANK_PROTECTION
assert flank_protection_point.is_used_for_flank_protection
# assert "RB101" in flank_protection_point.used_by


def test_example_3():
Expand All @@ -103,20 +105,23 @@ def test_example_3():

flank_protection_transport_point_id = "8fc1f" # n3
flank_protection_transport_point = interlocking.point_controller.points[flank_protection_transport_point_id]
assert flank_protection_transport_point.state == OccupancyState.FLANK_PROTECTION_TRANSPORT
assert "RB101" in flank_protection_transport_point.used_by
# assert flank_protection_transport_point.state == OccupancyState.FLANK_PROTECTION_TRANSPORT
assert flank_protection_transport_point.is_used_for_flank_protection
# assert "RB101" in flank_protection_transport_point.used_by

flank_protection_point_id = "da301" # n6
flank_protection_point = interlocking.point_controller.points[flank_protection_point_id]
assert point.orientation == "right"
assert flank_protection_point.state == OccupancyState.FLANK_PROTECTION
assert "RB101" in flank_protection_point.used_by
# assert flank_protection_point.state == OccupancyState.FLANK_PROTECTION
assert flank_protection_point.is_used_for_flank_protection
# assert "RB101" in flank_protection_point.used_by

flank_protection_signal = interlockinghelper.get_interlocking_signal_by_name(interlocking, "60E2")
assert flank_protection_signal.signal_aspect == "halt"
assert track_operations_ip.was_signal_set_to_aspect(flank_protection_signal.yaramo_signal, "halt")
assert flank_protection_signal.state == OccupancyState.FLANK_PROTECTION
assert "RB101" in flank_protection_signal.used_by
# assert flank_protection_signal.state == OccupancyState.FLANK_PROTECTION
assert flank_protection_signal.is_used_for_flank_protection
# assert "RB101" in flank_protection_signal.used_by


def test_example_4():
Expand Down

0 comments on commit c9affcb

Please sign in to comment.