diff --git a/interlocking/interlockingcontroller/flankprotectioncontroller.py b/interlocking/interlockingcontroller/flankprotectioncontroller.py index 78a8205..4e3063d 100644 --- a/interlocking/interlockingcontroller/flankprotectioncontroller.py +++ b/interlocking/interlockingcontroller/flankprotectioncontroller.py @@ -18,24 +18,34 @@ async def add_flank_protection_for_point(self, point: Point, point_orientation: signals, points = self._get_flank_protection_elements_of_point(point, point_orientation, route) results = [] for signal in signals: - signal.state = OccupancyState.FLANK_PROTECTION - signal.used_by.add(train_id) - results.append(await self.signal_controller.set_signal_halt(signal)) + #signal.state = OccupancyState.FLANK_PROTECTION + #signal.used_by.add(train_id) + change_successful = await self.signal_controller.set_signal_halt(signal) + results.append(change_successful) + if change_successful: + signal.is_used_for_flank_protection = True for point in points: occupancy_state, orientation = points[point] - point.state = occupancy_state - point.used_by.add(train_id) + #point.state = occupancy_state + #point.used_by.add(train_id) if orientation is not None: # In case of a Schutztansportweiche the orientation is not relevant (None). - results.append(await self.point_controller.turn_point(point, orientation)) + change_successful = await self.point_controller.turn_point(point, orientation) + results.append(change_successful) + if change_successful: + point.is_used_for_flank_protection = True + else: + point.is_used_for_flank_protection = True return all(results) def free_flank_protection_of_point(self, point: Point, point_orientation: str, route: Route, train_id: str): signals, points = self._get_flank_protection_elements_of_point(point, point_orientation, route) for signal in signals: - self.signal_controller.free_signal(signal, train_id) + #self.signal_controller.free_signal(signal, train_id) + signal.is_used_for_flank_protection = False for point in points: - self.point_controller.set_point_free(point, train_id) + #self.point_controller.set_point_free(point, train_id) + point.is_used_for_flank_protection = False def _get_flank_protection_elements_of_point(self, point: Point, diff --git a/interlocking/interlockingcontroller/pointcontroller.py b/interlocking/interlockingcontroller/pointcontroller.py index 3386147..1b3a17f 100644 --- a/interlocking/interlockingcontroller/pointcontroller.py +++ b/interlocking/interlockingcontroller/pointcontroller.py @@ -57,6 +57,10 @@ async def turn_point(self, point, orientation): if point.orientation == orientation: # Everything is fine return True + if point.is_used_for_flank_protection is True: + logging.error(f"Can not turn point of point {point.point_id} to {orientation}, " + f"since it is used for flank protection.") + return False logging.info(f"--- Move point {point.point_id} to {orientation}") # tasks = [] results = [] diff --git a/interlocking/interlockingcontroller/signalcontroller.py b/interlocking/interlockingcontroller/signalcontroller.py index 69e97ee..9beb889 100644 --- a/interlocking/interlockingcontroller/signalcontroller.py +++ b/interlocking/interlockingcontroller/signalcontroller.py @@ -31,6 +31,10 @@ async def set_signal_aspect(self, signal, signal_aspect): if signal.signal_aspect == signal_aspect: # Everything is fine return True + if signal.is_used_for_flank_protection is True: + logging.error(f"Can not set signal aspect of signal {signal.yaramo_signal.name} to {signal_aspect}, " + f"since it is used for flank protection.") + return False logging.info(f"--- Set signal {signal.yaramo_signal.name} to {signal_aspect}") results = [] diff --git a/interlocking/model/point.py b/interlocking/model/point.py index 4382f37..55c1558 100644 --- a/interlocking/model/point.py +++ b/interlocking/model/point.py @@ -11,6 +11,7 @@ def __init__(self, yaramo_node): self.orientation = "undefined" # either left, right or undefined self.state = OccupancyState.FREE self.used_by = set() # If point is reserved, occupied or part of an overlap, this contains the train numbers. + self.is_used_for_flank_protection = False self.head = None self.left = None self.right = None diff --git a/interlocking/model/signal.py b/interlocking/model/signal.py index aa99823..76cdc72 100644 --- a/interlocking/model/signal.py +++ b/interlocking/model/signal.py @@ -11,4 +11,5 @@ def __init__(self, yaramo_signal: YaramoSignal): self.signal_aspect: str = "undefined" # Either halt or go self.state: OccupancyState = OccupancyState.FREE self.used_by = set() # If point is reserved, occupied or part of an overlap, this contains the train numbers. + self.is_used_for_flank_protection = False self.track: Track or None = None diff --git a/test/flank-protection_test.py b/test/flank-protection_test.py index 1f219a0..18a990f 100644 --- a/test/flank-protection_test.py +++ b/test/flank-protection_test.py @@ -1,6 +1,6 @@ # Example 1: Halt zeigendes Signal # Example 2: Schutzweiche -# Example 3: Schutztransportweiche mit Schutzweise und halt zeigendem Signal +# Example 3: Schutztransportweiche mit Schutzweiche und halt zeigendem Signal # Example 4: Two routes, that do not conflict, but each point is also flank protection of the other route from .helper import topologyhelper, interlockinghelper @@ -54,8 +54,9 @@ def test_example_1(): flank_protection_signal = interlockinghelper.get_interlocking_signal_by_name(interlocking, "99N3") assert flank_protection_signal.signal_aspect == "halt" assert track_operations_ip.was_signal_set_to_aspect(flank_protection_signal.yaramo_signal, "halt") - assert flank_protection_signal.state == OccupancyState.FLANK_PROTECTION - assert "RB101" in flank_protection_signal.used_by + # assert flank_protection_signal.state == OccupancyState.FLANK_PROTECTION + assert flank_protection_signal.is_used_for_flank_protection + # assert "RB101" in flank_protection_signal.used_by def test_example_2(): @@ -79,8 +80,9 @@ def test_example_2(): flank_protection_point_id = "aed72" flank_protection_point = interlocking.point_controller.points[flank_protection_point_id] assert flank_protection_point.orientation == "left" - assert flank_protection_point.state == OccupancyState.FLANK_PROTECTION - assert "RB101" in flank_protection_point.used_by + # assert flank_protection_point.state == OccupancyState.FLANK_PROTECTION + assert flank_protection_point.is_used_for_flank_protection + # assert "RB101" in flank_protection_point.used_by def test_example_3(): @@ -103,20 +105,23 @@ def test_example_3(): flank_protection_transport_point_id = "8fc1f" # n3 flank_protection_transport_point = interlocking.point_controller.points[flank_protection_transport_point_id] - assert flank_protection_transport_point.state == OccupancyState.FLANK_PROTECTION_TRANSPORT - assert "RB101" in flank_protection_transport_point.used_by + # assert flank_protection_transport_point.state == OccupancyState.FLANK_PROTECTION_TRANSPORT + assert flank_protection_transport_point.is_used_for_flank_protection + # assert "RB101" in flank_protection_transport_point.used_by flank_protection_point_id = "da301" # n6 flank_protection_point = interlocking.point_controller.points[flank_protection_point_id] assert point.orientation == "right" - assert flank_protection_point.state == OccupancyState.FLANK_PROTECTION - assert "RB101" in flank_protection_point.used_by + # assert flank_protection_point.state == OccupancyState.FLANK_PROTECTION + assert flank_protection_point.is_used_for_flank_protection + # assert "RB101" in flank_protection_point.used_by flank_protection_signal = interlockinghelper.get_interlocking_signal_by_name(interlocking, "60E2") assert flank_protection_signal.signal_aspect == "halt" assert track_operations_ip.was_signal_set_to_aspect(flank_protection_signal.yaramo_signal, "halt") - assert flank_protection_signal.state == OccupancyState.FLANK_PROTECTION - assert "RB101" in flank_protection_signal.used_by + # assert flank_protection_signal.state == OccupancyState.FLANK_PROTECTION + assert flank_protection_signal.is_used_for_flank_protection + # assert "RB101" in flank_protection_signal.used_by def test_example_4():