Skip to content

Commit

Permalink
Creates code for launching navigation algorithms with visualization
Browse files Browse the repository at this point in the history
- Creates utils for running decentralized discrete navigation
- Creates utils for solution visualization
- Adds A* algorithm implementation
- Adds basic navigation politics (random, random + obstacle collision avoidance, A*-based)
- Adds jupyter notebook with random problem example
  • Loading branch information
haiot4105 committed Apr 4, 2024
1 parent 033d520 commit 7bde6c5
Show file tree
Hide file tree
Showing 12 changed files with 1,291 additions and 1 deletion.
8 changes: 8 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
.idea/*
exp/__pycache__/*
exp/.ipynb_checkpoints/*
dec_tswap/__pycache__/*
exp/animated_trajectories.apng
exp/animated_trajectories.gif
exp/animated_trajectories.png
exp/example-Copy1.ipynb
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1 +1 @@
# Decentralized TSWAP
# Decentralized TSWAP
Empty file added dec_tswap/__init__.py
Empty file.
179 changes: 179 additions & 0 deletions dec_tswap/agent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
from typing import Callable, Dict, Iterable, List, Optional, Tuple, Type, Union
from manavlib.gen.params import DiscreteAgentParams, BaseAlgParams
import numpy.typing as npt
from enum import Enum
import numpy as np
import random

from dec_tswap.map import Map
from dec_tswap.astar_algorithm import astar_search, manhattan_distance, make_path


class DecTSWAPParams(BaseAlgParams):
def __init__(self) -> None:
super().__init__()
pass


class Message:
def __init__(self):
self.pos: npt.NDArray | None = None
self.next_pos: npt.NDArray | None = None
self.priority: int | None = None


class Action(Enum):
WAIT = (0, 0)
UP = (-1, 0)
DOWN = (1, 0)
LEFT = (0, -1)
RIGHT = (0, 1)


class Agent:
def __init__(self,
a_id: int,
ag_params: DiscreteAgentParams,
alg_params: BaseAlgParams,
grid_map: npt.NDArray,
goals: npt.NDArray):
self.a_id = a_id
self.ag_params = ag_params
self.alg_params = alg_params
self.grid_map = grid_map
self.goals = goals

def update_neighbors_info(self, neighbors_info: List[Message]) -> None:
raise NotImplementedError

def compute_action(self) -> Action:
raise NotImplementedError

def update_state_info(self, new_pos: npt.NDArray) -> None:
raise NotImplementedError

def send_message(self) -> Message:
raise NotImplementedError


class RandomAgent(Agent):
def __init__(self,
a_id: int,
ag_params: DiscreteAgentParams,
alg_params: BaseAlgParams,
grid_map: npt.NDArray,
goals: npt.NDArray):
super().__init__(a_id, ag_params, alg_params, grid_map, goals)
pass

def update_neighbors_info(self, neighbors_info: List[Message]) -> None:
pass

def compute_action(self) -> npt.NDArray:
return np.array(random.choice(list(Action)).value)

def update_state_info(self, new_pos: npt.NDArray) -> None:
pass

def send_message(self) -> Message:
return Message()


class SmartRandomAgent(Agent):
def __init__(self,
a_id: int,
ag_params: DiscreteAgentParams,
alg_params: BaseAlgParams,
grid_map: npt.NDArray,
goals: npt.NDArray):
super().__init__(a_id, ag_params, alg_params, grid_map, goals)
self.pos = None

def update_neighbors_info(self, neighbors_info: List[Message]) -> None:
pass

def compute_action(self) -> npt.NDArray:
actions = list(Action)
actions.remove(Action.WAIT)
while len(actions):
action = random.choice(actions)
actions.remove(action)
action = np.array(action.value)
predicted_pos = self.pos + action
h, w = self.grid_map.shape
i, j = predicted_pos
if not ((0 <= i < h) and (0 <= j < w)):
continue
if self.grid_map[i, j]:
continue

return action

return np.array(Action.WAIT.value)

def update_state_info(self, new_pos: npt.NDArray) -> None:
self.pos = new_pos

def send_message(self) -> Message:
return Message()


class AStarAgent(Agent):
def __init__(self,
a_id: int,
ag_params: DiscreteAgentParams,
alg_params: BaseAlgParams,
grid_map: npt.NDArray,
goals: npt.NDArray):
super().__init__(a_id, ag_params, alg_params, grid_map, goals)
self.pos = None
self.neighbors_info = None
self.path = []
self.goal_chosen = False
self.goal = None
self.search_map = Map(self.grid_map)
self.path_exist = False

def update_neighbors_info(self, neighbors_info: List[Message]) -> None:
self.neighbors_info = neighbors_info

def compute_action(self) -> npt.NDArray:

if not self.goal_chosen:
self.choose_goal()
start_i, start_j = self.pos
goal_i, goal_j = self.goal
path_found, last_node, length = astar_search(self.search_map, start_i, start_j, goal_i, goal_j,
manhattan_distance)
self.path = make_path(last_node)[:-1]

if not self.path_exist or len(self.path) == 0:
return np.array(Action.WAIT.value)
next_pos = np.array(self.path.pop())
action = (next_pos - self.pos)
return action

def update_state_info(self, new_pos: npt.NDArray) -> None:
self.pos = new_pos

def send_message(self) -> Message:
message = Message()
message.pos = self.pos
return message

def choose_goal(self) -> None:
if self.goal_chosen:
return

start_i, start_j = self.pos
min_len = np.inf

for goal_i, goal_j in self.goals:
path_found, last_node, length = astar_search(self.search_map, start_i, start_j, goal_i, goal_j,
manhattan_distance)
if not path_found:
continue
self.path_exist = True
if length < min_len:
min_len = length
self.goal = np.array((goal_i, goal_j))
108 changes: 108 additions & 0 deletions dec_tswap/astar_algorithm.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
from dec_tswap.map import Map, compute_cost
from dec_tswap.search_tree import SearchTree
from dec_tswap.node import Node
from typing import Callable, Dict, Iterable, List, Optional, Tuple, Type, Union
import numpy as np


def manhattan_distance(i1: int, j1: int, i2: int, j2: int) -> int:
"""
Computes the Manhattan distance between two cells on a grid.
Parameters
----------
i1, j1 : int
(i, j) coordinates of the first cell on the grid.
i2, j2 : int
(i, j) coordinates of the second cell on the grid.
Returns
-------
int
Manhattan distance between the two cells.
"""
return abs(i1 - i2) + abs(j1 - j2)


def astar_search(
task_map: Map,
start_i: int,
start_j: int,
goal_i: int,
goal_j: int,
heuristic_func: Callable
) -> Tuple[bool, Optional[Node], Optional[int]]:
"""
Implements the A* search algorithm.
Parameters
----------
task_map : Map
The grid or map being searched.
start_i, start_j : int, int
Starting coordinates.
goal_i, goal_j : int, int
Goal coordinates.
heuristic_func : Callable
Heuristic function for estimating the distance from a node to the goal.
Returns
-------
Tuple[bool, Optional[Node], int, int, Optional[Iterable[Node]], Optional[Iterable[Node]]]
Tuple containing:
- A boolean indicating if a path was found.
- The last node in the found path or None.
- Path length
"""
ast = SearchTree()
steps = 0
start_node = Node(start_i, start_j, g=0, h=heuristic_func(start_i, start_j, goal_i, goal_j))
ast.add_to_open(start_node)

while not ast.open_is_empty():
current = ast.get_best_node_from_open()

if current is None:
break

ast.add_to_closed(current)

if (current.i, current.j) == (goal_i, goal_j):
return True, current, current.g

for i, j in task_map.get_neighbors(current.i, current.j):
new_node = Node(i, j)
if not ast.was_expanded(new_node):
new_node.g = current.g + compute_cost(current.i, current.j, i, j)
new_node.h = heuristic_func(i, j, goal_i, goal_j)
new_node.f = new_node.g + new_node.h
new_node.parent = current
ast.add_to_open(new_node)

steps += 1

return False, None, None


def make_path(goal: Node) -> List[Node]:
"""
Creates a path by tracing parent pointers from the goal node to the start node.
It also returns the path's length.
Parameters
----------
goal : Node
Pointer to the goal node in the search tree.
Returns
-------
Tuple[List[Node], float]
Path and its length.
"""
current = goal
path = []
while current.parent:
path.append((current.i, current.j))
current = current.parent
path.append(current)
return path
Loading

0 comments on commit 7bde6c5

Please sign in to comment.