diff --git a/.dockerignore b/.dockerignore index 0d9f9e0..1704ceb 100644 --- a/.dockerignore +++ b/.dockerignore @@ -15,4 +15,6 @@ README.md requirements.txt run.ps1 run.sh -maps4fs.zip \ No newline at end of file +maps4fs.zip +.DS_Store +maps/ \ No newline at end of file diff --git a/.gitignore b/.gitignore index 4397cb6..db497a9 100644 --- a/.gitignore +++ b/.gitignore @@ -7,4 +7,6 @@ bot.env logs/ archives/ previews/ -maps4fs.zip \ No newline at end of file +maps4fs.zip +.DS_Store +maps/ \ No newline at end of file diff --git a/.vscode/launch.json b/.vscode/launch.json index e4fc6be..6db8688 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -1,11 +1,22 @@ { "version": "0.2.0", "configurations": [ + { + "name": "Current File", + "type": "python", + "request": "launch", + "program": "${file}", + "console": "integratedTerminal", + "justMyCode": false, + "env": { + "PYTHONPATH": "${workspaceFolder}${pathSeparator}${env:PYTHONPATH}" + } + }, { "name": "Windows Script", "type": "python", "request": "launch", - "program": "src/main.py", + "program": "maps4fs/ui.py", "console": "integratedTerminal", "justMyCode": true, "env": { @@ -29,7 +40,7 @@ "name": "Linux / Mac Script", "type": "python", "request": "launch", - "program": "src/main.py", + "program": "maps4fs/ui.py", "console": "integratedTerminal", "justMyCode": true, "env": { @@ -41,7 +52,7 @@ "name": "Linux / Mac Bot", "type": "python", "request": "launch", - "program": "src/bot.py", + "program": "bot/bot.py", "console": "integratedTerminal", "justMyCode": true, "env": { diff --git a/Dockerfile b/Dockerfile index 7320cb3..a8ae5b9 100644 --- a/Dockerfile +++ b/Dockerfile @@ -14,4 +14,4 @@ COPY . . RUN pip install -r bot_requirements.txt -CMD ["python", "-u", "./src/bot.py"] \ No newline at end of file +CMD ["python", "-u", "./bot/bot.py"] \ No newline at end of file diff --git a/src/bot.py b/bot/bot.py similarity index 91% rename from src/bot.py rename to bot/bot.py index a1d95e9..616a379 100644 --- a/src/bot.py +++ b/bot/bot.py @@ -9,14 +9,18 @@ KeyboardButton, ReplyKeyboardMarkup, ) +from bot_templates import Buttons, Messages from dotenv import load_dotenv -import generate -from bot_templates import Buttons, Messages -from logger import Logger +import maps4fs as mfs -logger = Logger(__name__) +logger = mfs.Logger(__name__, level="DEBUG") working_directory = os.getcwd() +map_template = os.path.join(working_directory, "data", "map-template.zip") +maps_directory = os.path.join(working_directory, "maps") +archives_directory = os.path.join(working_directory, "archives") +os.makedirs(maps_directory, exist_ok=True) +os.makedirs(archives_directory, exist_ok=True) logger.info(f"Working directory: {working_directory}") env_path = os.path.join(working_directory, "bot.env") if os.path.exists(env_path): @@ -25,7 +29,6 @@ if not token: raise RuntimeError("No token provided.") -logger = Logger(__name__) bot = Bot(token=token) dp = Dispatcher(bot=bot) @@ -52,9 +55,10 @@ def __init__(self, telegram_id: int, coordinates: tuple[float, float]): self.telegram_id = telegram_id self.timestamp = int(datetime.now().timestamp()) self.name = f"{self.telegram_id}_{self.timestamp}" + self.map_directory = os.path.join(maps_directory, self.name) self.coordinates = coordinates self.distance = None - self.dem_settings = None + self.max_height = None def run(self) -> tuple[str, str]: """Runs the session and returns paths to the preview and the archive. @@ -62,12 +66,18 @@ def run(self) -> tuple[str, str]: Returns: tuple[str, str]: Paths to the preview and the archive. """ - gm = generate.Map( - working_directory, self.coordinates, self.distance, self.dem_settings, logger, self.name + mp = mfs.Map( + self.coordinates, + self.distance, + self.map_directory, + blur_seed=5, + max_height=self.max_height, + map_template=map_template, + logger=logger, ) - gm.info_sequence() - preview_path = gm.preview() - archive_path = gm.pack() + mp.generate() + preview_path = mp.previews()[0] + archive_path = mp.pack(os.path.join(archives_directory, self.name)) return preview_path, archive_path @@ -193,7 +203,7 @@ async def coordinates(message: types.Message) -> None: telegram_id = message.from_user.id sessions[telegram_id] = Session(telegram_id, (latitude, longitude)) - sizes = generate.MAP_SIZES + sizes = mfs.globals.MAP_SIZES indicators = ["🟢", "🟢", "🟡", "🔴"] buttons = {} # * Slice sizes because VPS can not handle large images. @@ -224,7 +234,7 @@ async def map_size_callback(callback_query: types.CallbackQuery) -> None: return session.distance = int(map_size / 2) - heights = generate.MAX_HEIGHTS + heights = mfs.globals.MAX_HEIGHTS buttons = {} for height, description in heights.items(): buttons[f"max_height_{height}"] = description @@ -251,7 +261,7 @@ async def max_height_callback(callback_query: types.CallbackQuery) -> None: if not session: return - session.dem_settings = generate.DemSettings(5, max_height) + session.max_height = max_height await bot.send_message( callback_query.from_user.id, diff --git a/src/bot_templates.py b/bot/bot_templates.py similarity index 94% rename from src/bot_templates.py rename to bot/bot_templates.py index 4868cae..2650128 100644 --- a/src/bot_templates.py +++ b/bot/bot_templates.py @@ -20,12 +20,12 @@ class Messages(Enum): ENTER_COORDINATES = ( "Enter the coordinates of the center of the map\." "The coordinates are latitude and longitude separated by a comma\.\n\n" - "For example: `52\.520008, 13\.404954`\n\n" + "For example: `45\.2602, 19\.8086`\n\n" "You can obtain them by right\-clicking on the map in [Google Maps](https://www.google.com/maps)\." ) WRONG_COORDINATES = ( "Please enter the coordinates in the correct format\.\n\n" - "For example: `52\.520008, 13\.404954`\n\n" + "For example: `45\.2602, 19\.8086`\n\n" ) SELECT_MAP_SIZE = ( diff --git a/bot_requirements.txt b/bot_requirements.txt index 91b7c48..93f941a 100644 --- a/bot_requirements.txt +++ b/bot_requirements.txt @@ -2,4 +2,5 @@ opencv-python==4.9.0.80 osmnx==1.8.1 rasterio==1.3.9 python_dotenv==1.0.0 -aiogram==2.25.1 \ No newline at end of file +aiogram==2.25.1 +tqdm==4.66.2 \ No newline at end of file diff --git a/build.sh b/build.sh deleted file mode 100644 index 029dc0f..0000000 --- a/build.sh +++ /dev/null @@ -1,5 +0,0 @@ -#!/bin/bash - -echo "Starting building Docker image..." -docker build --platform linux/amd64 -t iwatkot/maps4fs . -echo "Docker image has been successfully built, exiting..." \ No newline at end of file diff --git a/maps4fs/__init__.py b/maps4fs/__init__.py new file mode 100644 index 0000000..7acb703 --- /dev/null +++ b/maps4fs/__init__.py @@ -0,0 +1,2 @@ +from maps4fs.generator.map import Map +from maps4fs.logger import Logger diff --git a/maps4fs/generator/__init__.py b/maps4fs/generator/__init__.py new file mode 100644 index 0000000..e7996ef --- /dev/null +++ b/maps4fs/generator/__init__.py @@ -0,0 +1,6 @@ +from maps4fs.generator.component import Component +from maps4fs.generator.config import Config +from maps4fs.generator.dem import DEM +from maps4fs.generator.texture import Texture + +BaseComponents = [Config, Texture, DEM] diff --git a/maps4fs/generator/component.py b/maps4fs/generator/component.py new file mode 100644 index 0000000..58b9c8b --- /dev/null +++ b/maps4fs/generator/component.py @@ -0,0 +1,34 @@ +from typing import Any + +import maps4fs as mfs + + +class Component: + """Base class for all map generation components. + + Args: + coordinates (tuple[float, float]): The latitude and longitude of the center of the map. + distance (int): The distance from the center to the edge of the map. + map_directory (str): The directory where the map files are stored. + logger (Any, optional): The logger to use. Must have at least three basic methods: debug, + info, warning. If not provided, default logging will be used. + """ + + def __init__( + self, + coordinates: tuple[float, float], + distance: int, + map_directory: str, + logger: Any = None, + **kwargs, + ): + self.coordinates = coordinates + self.distance = distance + self.map_directory = map_directory + + if not logger: + logger = mfs.Logger(__name__, to_stdout=True, to_file=False) + self.logger = logger + + def process(self): + raise NotImplementedError diff --git a/maps4fs/generator/config.py b/maps4fs/generator/config.py new file mode 100644 index 0000000..5db8065 --- /dev/null +++ b/maps4fs/generator/config.py @@ -0,0 +1,47 @@ +import os +from typing import Any +from xml.etree import ElementTree as ET + +from maps4fs.generator import Component + + +class Config(Component): + """Component for map settings and configuration. + + Args: + coordinates (tuple[float, float]): The latitude and longitude of the center of the map. + distance (int): The distance from the center to the edge of the map. + map_directory (str): The directory where the map files are stored. + logger (Any, optional): The logger to use. Must have at least three basic methods: debug, + info, warning. If not provided, default logging will be used. + """ + + def __init__( + self, + coordinates: tuple[float, float], + distance: int, + map_directory: str, + logger: Any = None, + **kwargs, + ): + super().__init__(coordinates, distance, map_directory, logger) + self._map_xml_path = os.path.join(self.map_directory, "maps", "map", "map.xml") + + def process(self): + self._set_map_size() + + def _set_map_size(self): + """Edits map.xml file to set correct map size.""" + if not os.path.isfile(self._map_xml_path): + self.logger.warning(f"Map XML file not found: {self._map_xml_path}.") + return + tree = ET.parse(self._map_xml_path) + self.logger.debug(f"Map XML file loaded from: {self._map_xml_path}.") + root = tree.getroot() + for map_elem in root.iter("map"): + width = height = str(self.distance * 2) + map_elem.set("width", width) + map_elem.set("height", height) + self.logger.debug(f"Map size set to {width}x{height} in Map XML file.") + tree.write(self._map_xml_path) + self.logger.debug(f"Map XML file saved to: {self._map_xml_path}.") diff --git a/maps4fs/generator/dem.py b/maps4fs/generator/dem.py new file mode 100644 index 0000000..268324b --- /dev/null +++ b/maps4fs/generator/dem.py @@ -0,0 +1,196 @@ +import gzip +import math +import os +import shutil +from typing import Any + +import cv2 +import numpy as np +import osmnx as ox +import rasterio +import requests + +import maps4fs.globals as g +from maps4fs.generator import Component + + +class DEM(Component): + """Component for map settings and configuration. + + Args: + coordinates (tuple[float, float]): The latitude and longitude of the center of the map. + distance (int): The distance from the center to the edge of the map. + map_directory (str): The directory where the map files are stored. + logger (Any, optional): The logger to use. Must have at least three basic methods: debug, + info, warning. If not provided, default logging will be used. + """ + + def __init__( + self, + coordinates: tuple[float, float], + distance: int, + map_directory: str, + logger: Any = None, + **kwargs, + ): + super().__init__(coordinates, distance, map_directory, logger) + self._dem_path = os.path.join(self.map_directory, "maps", "map", "data", "map_dem.png") + self.temp_dir = "temp" + self.hgt_dir = os.path.join(self.temp_dir, "hgt") + self.gz_dir = os.path.join(self.temp_dir, "gz") + os.makedirs(self.hgt_dir, exist_ok=True) + os.makedirs(self.gz_dir, exist_ok=True) + + self._blur_seed = kwargs.get("blur_seed") + self._max_height = kwargs.get("max_height") + + def process(self) -> None: + """Reads SRTM file, crops it to map size, normalizes and blurs it, saves to map directory.""" + north, south, east, west = ox.utils_geo.bbox_from_point( + self.coordinates, dist=self.distance + ) + max_y, min_y = max(north, south), min(north, south) + max_x, min_x = max(east, west), min(east, west) + + dem_output_resolution = (self.distance + 1, self.distance + 1) + + tile_path = self._srtm_tile() + if not tile_path: + self.logger.warning("Tile was not downloaded, DEM file will be filled with zeros.") + self._save_empty_dem(dem_output_resolution) + return + + with rasterio.open(tile_path) as src: + self.logger.debug(f"Opened tile, shape: {src.shape}, dtype: {src.dtypes[0]}.") + window = rasterio.windows.from_bounds(min_x, min_y, max_x, max_y, src.transform) + self.logger.debug( + f"Window parameters. Column offset: {window.col_off}, row offset: {window.row_off}, " + f"width: {window.width}, height: {window.height}." + ) + data = src.read(1, window=window) + + if not data.size > 0: + self.logger.warning("DEM data is empty, DEM file will be filled with zeros.") + self._save_empty_dem(dem_output_resolution) + return + + self.logger.debug( + f"DEM data was read from SRTM file. Shape: {data.shape}, dtype: {data.dtype}. " + f"Min: {data.min()}, max: {data.max()}." + ) + + normalized_data = self._normalize_dem(data) + + resampled_data = cv2.resize( + normalized_data, dem_output_resolution, interpolation=cv2.INTER_LINEAR + ) + self.logger.debug( + f"DEM data was resampled. Shape: {resampled_data.shape}, dtype: {resampled_data.dtype}. " + f"Min: {resampled_data.min()}, max: {resampled_data.max()}." + ) + + blurred_data = cv2.GaussianBlur(resampled_data, (self._blur_seed, self._blur_seed), 0) + cv2.imwrite(self._dem_path, blurred_data) + self.logger.debug(f"DEM data was blurred and saved to {self._dem_path}.") + + def _tile_info(self, lat: float, lon: float) -> tuple[str, str]: + """Returns latitude band and tile name for SRTM tile from coordinates. + + Args: + lat (float): Latitude. + lon (float): Longitude. + + Returns: + tuple[str, str]: Latitude band and tile name. + """ + tile_latitude = math.floor(lat) + tile_longitude = math.floor(lon) + + latitude_band = f"N{abs(tile_latitude):02d}" + if lon < 0: + tile_name = f"N{abs(tile_latitude):02d}W{abs(abs(tile_longitude)):03d}" + else: + tile_name = f"N{abs(tile_latitude):02d}E{abs(tile_longitude):03d}" + + self.logger.debug(f"Detected tile name: {tile_name} for coordinates: lat {lat}, lon {lon}.") + return latitude_band, tile_name + + def _download_tile(self) -> str | None: + """Downloads SRTM tile from Amazon S3 using coordinates. + + Returns: + str: Path to compressed tile or None if download failed. + """ + latitude_band, tile_name = self._tile_info(*self.coordinates) + compressed_file_path = os.path.join(self.gz_dir, f"{tile_name}.hgt.gz") + url = g.SRTM.format(latitude_band=latitude_band, tile_name=tile_name) + self.logger.debug(f"Trying to get response from {url}...") + response = requests.get(url, stream=True) + + if response.status_code == 200: + self.logger.debug(f"Response received. Saving to {compressed_file_path}...") + with open(compressed_file_path, "wb") as f: + for chunk in response.iter_content(chunk_size=8192): + f.write(chunk) + self.logger.debug("Compressed tile successfully downloaded.") + else: + self.logger.error(f"Response was failed with status code {response.status_code}.") + return + + return compressed_file_path + + def _srtm_tile(self) -> str | None: + """Determines SRTM tile name from coordinates downloads it if necessary, and decompresses it. + + Returns: + str: Path to decompressed tile or None if download failed. + """ + latitude_band, tile_name = self._tile_info(*self.coordinates) + self.logger.debug(f"SRTM tile name {tile_name} from latitude band {latitude_band}.") + + decompressed_file_path = os.path.join(self.hgt_dir, f"{tile_name}.hgt") + if os.path.isfile(decompressed_file_path): + self.logger.info( + f"Decompressed tile already exists: {decompressed_file_path}, skipping download." + ) + return decompressed_file_path + + compressed_file_path = self._download_tile() + if not compressed_file_path: + self.logger.error("Download from SRTM failed, DEM file will be filled with zeros.") + return + with gzip.open(compressed_file_path, "rb") as f_in: + with open(decompressed_file_path, "wb") as f_out: + shutil.copyfileobj(f_in, f_out) + self.logger.debug(f"Tile decompressed to {decompressed_file_path}.") + return decompressed_file_path + + def _save_empty_dem(self, dem_output_resolution: tuple[int, int]) -> None: + """Saves empty DEM file filled with zeros.""" + dem_data = np.zeros(dem_output_resolution, dtype="uint16") + cv2.imwrite(self.map_dem_path, dem_data) + self.logger.warning(f"DEM data filled with zeros and saved to {self.map_dem_path}.") + + def _normalize_dem(self, data: np.ndarray) -> np.ndarray: + """Normalize DEM data to 16-bit unsigned integer using max height from settings. + + Args: + data (np.ndarray): DEM data from SRTM file after cropping. + + Returns: + np.ndarray: Normalized DEM data. + """ + max_dev = data.max() - data.min() + scaling_factor = max_dev / self._max_height if max_dev < self._max_height else 1 + adjusted_max_height = int(65535 * scaling_factor) + self.logger.debug( + f"Maximum deviation: {max_dev}. Scaling factor: {scaling_factor}. " + f"Adjusted max height: {adjusted_max_height}." + ) + normalized_data = ( + (data - data.min()) / (data.max() - data.min()) * adjusted_max_height + ).astype("uint16") + self.logger.debug( + f"DEM data was normalized to {normalized_data.min()} - {normalized_data.max()}." + ) + return normalized_data diff --git a/maps4fs/generator/map.py b/maps4fs/generator/map.py new file mode 100644 index 0000000..e3922fa --- /dev/null +++ b/maps4fs/generator/map.py @@ -0,0 +1,74 @@ +import os +import shutil +from typing import Any + +from tqdm import tqdm + +import maps4fs as mfs + + +class Map: + def __init__( + self, + coordinates: tuple[float, float], + distance: int, + map_directory: str, + blur_seed: int, + max_height: int, + map_template: str = None, + logger: Any = None, + ): + self.coordinates = coordinates + self.distance = distance + self.map_directory = map_directory + + if not logger: + logger = mfs.Logger(__name__, to_stdout=True, to_file=False) + self.logger = logger + self.components = [] + + os.makedirs(self.map_directory, exist_ok=True) + if map_template: + shutil.unpack_archive(map_template, self.map_directory) + self.logger.info(f"Map template {map_template} unpacked to {self.map_directory}") + else: + self.logger.warning( + "Map template not provided, if directory does not contain required files, " + "it may not work properly in Giants Editor." + ) + + self._add_components(blur_seed, max_height) + + def _add_components(self, blur_seed: int, max_height: int) -> None: + self.logger.debug("Starting adding components...") + for component in mfs.generator.BaseComponents: + active_component = component( + self.coordinates, + self.distance, + self.map_directory, + self.logger, + blur_seed=blur_seed, + max_height=max_height, + ) + setattr(self, component.__name__.lower(), active_component) + self.components.append(active_component) + self.logger.debug(f"Added {len(self.components)} components.") + + def generate(self) -> None: + with tqdm(total=len(self.components), desc="Generating map...") as pbar: + for component in self.components: + try: + component.process() + except Exception as e: + self.logger.error( + f"Error processing component {component.__class__.__name__}: {e}" + ) + pbar.update(1) + + def previews(self) -> list[str]: + return self.texture.previews() + + def pack(self, archive_name: str) -> str: + archive_path = shutil.make_archive(archive_name, "zip", self.map_directory) + self.logger.info(f"Map packed to {archive_name}.zip") + return archive_path diff --git a/maps4fs/generator/texture.py b/maps4fs/generator/texture.py new file mode 100644 index 0000000..ddc41ac --- /dev/null +++ b/maps4fs/generator/texture.py @@ -0,0 +1,413 @@ +import json +import os +import re +import warnings +from typing import Any, Callable, Generator + +import cv2 +import numpy as np +import osmnx as ox +import pandas as pd +import shapely.geometry + +import maps4fs.globals as g +from maps4fs.generator import Component + + +class Texture(Component): + class Layer: + """Class which represents a layer with textures and tags. + It's using to obtain data from OSM using tags and make changes into corresponding textures. + + Args: + name (str): Name of the layer. + tags (dict[str, str | list[str]]): Dictionary of tags to search for. + width (int | None): Width of the polygon in meters (only for LineString). + color (tuple[int, int, int]): Color of the layer in BGR format. + + Attributes: + name (str): Name of the layer. + tags (dict[str, str | list[str]]): Dictionary of tags to search for. + width (int | None): Width of the polygon in meters (only for LineString). + """ + + def __init__( + self, + weights_dir: str, + name: str, + tags: dict[str, str | list[str]], + width: int = None, + color: tuple[int, int, int] = None, + ): + self.weights_dir = weights_dir + self.name = name + self.tags = tags + self.width = width + self.color = color if color else (255, 255, 255) + self._get_paths() + + def _get_paths(self): + """Gets paths to textures of the layer. + + Raises: + FileNotFoundError: If texture is not found. + """ + if self.name == "waterPuddle": + self.paths = [os.path.join(self.weights_dir, "waterPuddle_weight.png")] + return + weight_files = [ + os.path.join(self.weights_dir, f) + for f in os.listdir(self.weights_dir) + if f.endswith("_weight.png") + ] + pattern = re.compile(rf"{self.name}\d{{2}}_weight") + paths = [path for path in weight_files if pattern.search(path)] + if not paths: + raise FileNotFoundError(f"Texture not found: {self.name}") + self.paths = paths + + @property + def path(self) -> str: + """Returns path to the first texture of the layer. + + Returns: + str: Path to the texture. + """ + return self.paths[0] + + def __init__( + self, + coordinates: tuple[float, float], + distance: int, + map_directory: str, + logger: Any = None, + **kwargs, + ): + super().__init__(coordinates, distance, map_directory, logger) + self._weights_dir = os.path.join(self.map_directory, "maps", "map", "data") + self._bbox = ox.utils_geo.bbox_from_point(self.coordinates, dist=self.distance) + self.info_save_path = os.path.join(self.map_directory, "generation_info.json") + + def process(self): + self._prepare_weights() + self._read_parameters() + self.draw() + self.info_sequence() + + def _read_parameters(self) -> None: + """Reads map parameters from OSM data, such as: + - minimum and maximum coordinates in UTM format + - map dimensions in meters + - map coefficients (meters per pixel) + """ + north, south, east, west = ox.utils_geo.bbox_from_point( + self.coordinates, dist=self.distance, project_utm=True + ) + # Parameters of the map in UTM format (meters). + self.minimum_x = min(west, east) + self.minimum_y = min(south, north) + self.maximum_x = max(west, east) + self.maximum_y = max(south, north) + self.logger.debug(f"Map minimum coordinates (XxY): {self.minimum_x} x {self.minimum_y}.") + self.logger.debug(f"Map maximum coordinates (XxY): {self.maximum_x} x {self.maximum_y}.") + + self.height = abs(north - south) + self.width = abs(east - west) + self.logger.info(f"Map dimensions (HxW): {self.height} x {self.width}.") + + self.height_coef = self.height / (self.distance * 2) + self.width_coef = self.width / (self.distance * 2) + self.logger.debug(f"Map coefficients (HxW): {self.height_coef} x {self.width_coef}.") + + self.easting = self.minimum_x < 500000 + self.northing = self.minimum_y < 10000000 + self.logger.debug(f"Map is in {'east' if self.easting else 'west'} of central meridian.") + self.logger.debug(f"Map is in {'north' if self.northing else 'south'} hemisphere.") + + def info_sequence(self) -> None: + """Saves generation info to JSON file "generation_info.json". + + Info sequence contains following attributes: + - coordinates + - bbox + - distance + - minimum_x + - minimum_y + - maximum_x + - maximum_y + - height + - width + - height_coef + - width_coef + - easting + - northing + """ + useful_attributes = [ + "coordinates", + "bbox", + "distance", + "minimum_x", + "minimum_y", + "maximum_x", + "maximum_y", + "height", + "width", + "height_coef", + "width_coef", + "easting", + "northing", + ] + info_sequence = {attr: getattr(self, attr, None) for attr in useful_attributes} + + with open(self.info_save_path, "w") as f: + json.dump(info_sequence, f, indent=4) + self.logger.info(f"Generation info saved to {self.info_save_path}.") + + def _prepare_weights(self): + self.logger.debug("Starting preparing weights...") + for texture_name, layer_numbers in g.TEXTURES.items(): + self._generate_weights(texture_name, layer_numbers) + self.logger.debug(f"Prepared weights for {len(g.TEXTURES)} textures.") + + def _generate_weights(self, texture_name: str, layer_numbers: int) -> None: + """Generates weight files for textures. Each file is a numpy array of zeros and dtype uint8 (0-255). + + Args: + texture_name (str): Name of the texture. + layer_numbers (int): Number of layers in the texture. + """ + size = self.distance * 2 + postfix = "_weight.png" + if layer_numbers == 0: + filepaths = [os.path.join(self._weights_dir, texture_name + postfix)] + else: + filepaths = [ + os.path.join(self._weights_dir, texture_name + str(i).zfill(2) + postfix) + for i in range(1, layer_numbers + 1) + ] + + for filepath in filepaths: + img = np.zeros((size, size), dtype=np.uint8) + cv2.imwrite(filepath, img) + + @property + def layers(self) -> list[Layer]: + """Returns list of layers with textures and tags from textures.json. + + Returns: + list[Layer]: List of layers. + """ + asphalt = self.Layer( + self._weights_dir, + "asphalt", + {"highway": ["motorway", "trunk", "primary"]}, + width=8, + color=(70, 70, 70), + ) + concrete = self.Layer( + self._weights_dir, "concrete", {"building": True}, width=8, color=(130, 130, 130) + ) + dirtDark = self.Layer( + self._weights_dir, + "dirtDark", + {"highway": ["unclassified", "residential", "track"]}, + width=2, + color=(33, 67, 101), + ) + grassDirt = self.Layer( + self._weights_dir, + "grassDirt", + {"natural": ["wood", "tree_row"]}, + width=2, + color=(0, 252, 124), + ) + grass = self.Layer( + self._weights_dir, "grass", {"natural": "grassland"}, color=(34, 255, 34) + ) + forestGround = self.Layer( + self._weights_dir, "forestGround", {"landuse": "farmland"}, color=(47, 107, 85) + ) + gravel = self.Layer( + self._weights_dir, + "gravel", + {"highway": ["secondary", "tertiary", "road"]}, + width=4, + color=(140, 180, 210), + ) + waterPuddle = self.Layer( + self._weights_dir, + "waterPuddle", + {"natural": "water", "waterway": True}, + width=10, + color=(255, 20, 20), + ) + return [asphalt, concrete, dirtDark, forestGround, grass, grassDirt, gravel, waterPuddle] + + def draw(self) -> None: + """Iterates over layers and fills them with polygons from OSM data.""" + for layer in self.layers: + img = cv2.imread(layer.path, cv2.IMREAD_UNCHANGED) + for polygon in self.polygons(layer.tags, layer.width): + cv2.fillPoly(img, [polygon], color=255) + cv2.imwrite(layer.path, img) + self.logger.debug(f"Texture {layer.path} saved.") + + def get_relative_x(self, x: float) -> int: + """Converts UTM X coordinate to relative X coordinate in map image. + + Args: + x (float): UTM X coordinate. + + Returns: + int: Relative X coordinate in map image. + """ + if self.easting: + raw_x = x - self.minimum_x + else: + raw_x = self.minimum_x - x + return int(raw_x * self.height_coef) + + def get_relative_y(self, y: float) -> int: + """Converts UTM Y coordinate to relative Y coordinate in map image. + + Args: + y (float): UTM Y coordinate. + + Returns: + int: Relative Y coordinate in map image. + """ + if self.northing: + raw_y = y - self.minimum_y + else: + raw_y = self.minimum_y - y + return self.height - int(raw_y * self.width_coef) + + def _to_np(self, geometry: shapely.geometry.polygon.Polygon, *args) -> np.ndarray: + """Converts Polygon geometry to numpy array of polygon points. + + Args: + geometry (shapely.geometry.polygon.Polygon): Polygon geometry. + *args: Additional arguments: + - width (int | None): Width of the polygon in meters. + + Returns: + np.ndarray: Numpy array of polygon points. + """ + xs, ys = geometry.exterior.coords.xy + xs = [int(self.get_relative_x(x)) for x in xs.tolist()] + ys = [int(self.get_relative_y(y)) for y in ys.tolist()] + pairs = list(zip(xs, ys)) + return np.array(pairs, dtype=np.int32).reshape((-1, 1, 2)) + + def _to_polygon(self, obj: pd.core.series.Series, width: int | None) -> np.ndarray | None: + """Converts OSM object to numpy array of polygon points. + + Args: + obj (pd.core.series.Series): OSM object. + width (int | None): Width of the polygon in meters. + + Returns: + np.ndarray | None: Numpy array of polygon points. + """ + geometry = obj["geometry"] + geometry_type = geometry.geom_type + converter = self._converters(geometry_type) + if not converter: + self.logger.warning(f"Geometry type {geometry_type} not supported.") + return + return converter(geometry, width) + + def _sequence( + self, + geometry: shapely.geometry.linestring.LineString | shapely.geometry.point.Point, + width: int | None, + ) -> np.ndarray: + """Converts LineString or Point geometry to numpy array of polygon points. + + Args: + geometry (shapely.geometry.linestring.LineString | shapely.geometry.point.Point): + LineString or Point geometry. + width (int | None): Width of the polygon in meters. + + Returns: + np.ndarray: Numpy array of polygon points. + """ + polygon = geometry.buffer(width) + return self._to_np(polygon) + + def _converters(self, geom_type: str) -> Callable[[shapely.geometry, int | None], np.ndarray]: + """Returns a converter function for a given geometry type. + + Args: + geom_type (str): Geometry type. + + Returns: + Callable[[shapely.geometry, int | None], np.ndarray]: Converter function. + """ + converters = {"Polygon": self._to_np, "LineString": self._sequence, "Point": self._sequence} + return converters.get(geom_type) + + def polygons( + self, tags: dict[str, str | list[str]], width: int | None + ) -> Generator[np.ndarray, None, None]: + """Generator which yields numpy arrays of polygons from OSM data. + + Args: + tags (dict[str, str | list[str]]): Dictionary of tags to search for. + width (int | None): Width of the polygon in meters (only for LineString). + + Yields: + Generator[np.ndarray, None, None]: Numpy array of polygon points. + """ + try: + with warnings.catch_warnings(): + warnings.simplefilter("ignore", DeprecationWarning) + objects = ox.features_from_bbox(*self._bbox, tags=tags) + except Exception as e: + self.logger.warning(f"Error fetching objects for tags: {tags}.") + self.logger.warning(e) + return + objects_utm = ox.project_gdf(objects, to_latlong=False) + self.logger.debug(f"Fetched {len(objects_utm)} elements for tags: {tags}.") + + for index, obj in objects_utm.iterrows(): + polygon = self._to_polygon(obj, width) + if polygon is None: + continue + yield polygon + + def previews(self) -> list[str]: + """Invokes methods to generate previews. Returns list of paths to previews. + + Returns: + list[str]: List of paths to previews. + """ + preview_paths = [] + preview_paths.append(self._osm_preview()) + return preview_paths + + def _osm_preview(self) -> str: + """Merges layers into one image and saves it into the png file. + + Returns: + str: Path to the preview. + """ + preview_size = (2048, 2048) + images = [ + cv2.resize(cv2.imread(layer.path, cv2.IMREAD_UNCHANGED), preview_size) + for layer in self.layers + ] + colors = [layer.color for layer in self.layers] + color_images = [] + for img, color in zip(images, colors): + color_img = np.zeros((img.shape[0], img.shape[1], 3), dtype=np.uint8) + color_img[img > 0] = color + color_images.append(color_img) + merged = np.sum(color_images, axis=0, dtype=np.uint8) + self.logger.debug( + f"Merged layers into one image. Shape: {merged.shape}, dtype: {merged.dtype}." + ) + preview_path = os.path.join(self.map_directory, "preview_osm.png") + cv2.imwrite(preview_path, merged) + self.logger.info(f"Preview saved to {preview_path}.") + return preview_path diff --git a/maps4fs/globals.py b/maps4fs/globals.py new file mode 100644 index 0000000..668b307 --- /dev/null +++ b/maps4fs/globals.py @@ -0,0 +1,31 @@ +MAP_SIZES = ["2048", "4096", "8192", "16384"] +MAX_HEIGHTS = { + "100": "🍀 For flatlands", + "200": "🍀 For plains", + "400": "🗻 For hills", + "600": "⛰️ For large hills", + "800": "🏔️ For mountains", +} + +SRTM = "https://elevation-tiles-prod.s3.amazonaws.com/skadi/{latitude_band}/{tile_name}.hgt.gz" + +TEXTURES = { + "animalMud": 4, + "asphalt": 4, + "cobbleStone": 4, + "concrete": 4, + "concreteRubble": 4, + "concreteTiles": 4, + "dirt": 4, + "dirtDark": 2, + "forestGround": 4, + "forestGroundLeaves": 4, + "grass": 4, + "grassDirt": 4, + "gravel": 4, + "groundBricks": 4, + "mountainRock": 4, + "mountainRockDark": 4, + "riverMud": 4, + "waterPuddle": 0, +} diff --git a/src/logger.py b/maps4fs/logger.py similarity index 60% rename from src/logger.py rename to maps4fs/logger.py index 6e79b5d..d174744 100644 --- a/src/logger.py +++ b/maps4fs/logger.py @@ -2,33 +2,38 @@ import os import sys from datetime import datetime +from typing import Literal -working_directory = os.getcwd() -log_directory = os.path.join(working_directory, "logs") +log_directory = os.path.join(os.getcwd(), "logs") os.makedirs(log_directory, exist_ok=True) class Logger(logging.getLoggerClass()): """Handles logging to the file and stroudt with timestamps.""" - def __init__(self, name: str): + def __init__( + self, + name: str, + level: Literal["DEBUG", "INFO", "WARNING", "ERROR"] = "ERROR", + to_stdout: bool = True, + to_file: bool = True, + ): super().__init__(name) - self.setLevel(logging.DEBUG) + self.setLevel(level) self.stdout_handler = logging.StreamHandler(sys.stdout) self.file_handler = logging.FileHandler( filename=self.log_file(), mode="a", encoding="utf-8" ) - formatter = "%(name)s | %(asctime)s | %(message)s" + formatter = "%(name)s | %(levelname)s | %(asctime)s | %(message)s" self.fmt = formatter self.stdout_handler.setFormatter(logging.Formatter(formatter)) self.file_handler.setFormatter(logging.Formatter(formatter)) - self.addHandler(self.stdout_handler) - self.addHandler(self.file_handler) + if to_stdout: + self.addHandler(self.stdout_handler) + if to_file: + self.addHandler(self.file_handler) def log_file(self): today = datetime.now().strftime("%Y-%m-%d") log_file = os.path.join(log_directory, f"{today}.txt") return log_file - - def log(self, message: str): - return self.info(message) diff --git a/src/main.py b/maps4fs/ui.py old mode 100644 new mode 100755 similarity index 66% rename from src/main.py rename to maps4fs/ui.py index d9caf6b..b41fce1 --- a/src/main.py +++ b/maps4fs/ui.py @@ -3,33 +3,33 @@ import tkinter as tk from tkinter import ttk -from rich.console import Console +import maps4fs as mfs -import generate - -logger = Console() -working_directory = os.getcwd() -output_directory = os.path.join(working_directory, "output") +map_directory = os.path.join(os.getcwd(), "output") +map_template = os.path.join(os.getcwd(), "data", "map-template.zip") +if not os.path.isfile(map_template): + raise FileNotFoundError(f"Map template not found: {map_template}") def start() -> None: """Reads the input from the GUI and starts the map generation.""" - lat = float(lat_entry.get()) - lon = float(lon_entry.get()) + lat_lon = lat_lon_entry.get().strip() + lat, lon = map(float, lat_lon.split(",")) size = int(size_var.get()) distance = int(size / 2) blur_seed = int(blur_seed_entry.get()) max_height = int(max_height_var.get()) - dem_settings = generate.DemSettings(blur_seed, max_height) result_label.config(text="Generating...") root.update() - gm = generate.Map(working_directory, (lat, lon), distance, dem_settings, logger) - gm.preview() - gm.info_sequence() + + mp = mfs.Map((lat, lon), distance, map_directory, blur_seed, max_height, map_template) + mp.generate() + mp.previews() + result_label.config(text="Saved in:") - path_label.config(text=f"{output_directory}") + path_label.config(text=f"{map_directory}") for i in range(5, 0, -1): close_time_label.config(text=f"Closing in {i} seconds...") root.update() @@ -39,29 +39,23 @@ def start() -> None: def open_output_dir(event: tk.Event) -> None: """Open the output directory in the file explorer.""" - os.startfile(output_directory) + os.startfile(map_directory) root = tk.Tk() root.geometry("300x300") -lat_label = tk.Label(root, text="Latitude:") -lat_label.grid(row=0, column=0) -lat_entry = tk.Entry(root) -lat_entry.insert(0, "") -lat_entry.grid(row=0, column=1) - -lon_label = tk.Label(root, text="Longitude:") -lon_label.grid(row=1, column=0) -lon_entry = tk.Entry(root) -lon_entry.insert(0, "") -lon_entry.grid(row=1, column=1) +lat_lon_label = tk.Label(root, text="Latitude and longitude:") +lat_lon_label.grid(row=0, column=0) +lat_lon_entry = tk.Entry(root) +lat_lon_entry.insert(0, "45.2602, 19.8086") +lat_lon_entry.grid(row=0, column=1) size_label = tk.Label(root, text="Map size:") size_label.grid(row=2, column=0) size_var = tk.StringVar(root) size_var.set("2048") -size_menu = tk.OptionMenu(root, size_var, *generate.MAP_SIZES) +size_menu = tk.OptionMenu(root, size_var, *mfs.globals.MAP_SIZES) size_menu.grid(row=2, column=1) button = tk.Button(root, text="Generate map", command=start) @@ -93,7 +87,7 @@ def open_output_dir(event: tk.Event) -> None: max_height_label.grid(row=9, column=0) max_height_var = tk.StringVar(root) max_height_var.set("400") -max_height_menu = tk.OptionMenu(root, max_height_var, *list(generate.MAX_HEIGHTS.keys())) +max_height_menu = tk.OptionMenu(root, max_height_var, *list(mfs.globals.MAX_HEIGHTS.keys())) max_height_menu.grid(row=9, column=1) root.mainloop() diff --git a/release.py b/release.py index 0e09998..5187c23 100644 --- a/release.py +++ b/release.py @@ -1,19 +1,18 @@ import os import zipfile -ARCHIVE_NAME = "maps4fs.zip" +ARCHIVE_PATH = os.path.join("/Users/iwatkot/Downloads", "maps4fs.zip") -release_directories = ["data", "src"] +release_directories = ["data", "maps4fs"] release_files = [ "requirements.txt", "run.ps1", "run.sh", - "textures.json", ] -def main(): - with zipfile.ZipFile(ARCHIVE_NAME, "w") as zf: +def main() -> None: + with zipfile.ZipFile(ARCHIVE_PATH, "w") as zf: for directory in release_directories: for root, _, files in os.walk(directory): for file in files: @@ -22,7 +21,7 @@ def main(): for file in release_files: zf.write(file) - print(f"Release archive created: {ARCHIVE_NAME}") + print(f"Release archive created: {ARCHIVE_PATH}") if __name__ == "__main__": diff --git a/requirements.txt b/requirements.txt index cbabf86..27f1f1b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,4 @@ -rich==13.7.0 opencv-python==4.9.0.80 osmnx==1.8.1 -rasterio==1.3.9 \ No newline at end of file +rasterio==1.3.9 +tqdm==4.66.2 \ No newline at end of file diff --git a/run.ps1 b/run.ps1 index 3d5fd23..7f13324 100644 --- a/run.ps1 +++ b/run.ps1 @@ -2,6 +2,7 @@ python -m venv .venv . .\.venv\Scripts\Activate -pip install -r requirements.txt +# pip install -r requirements.txt -python src\main.py \ No newline at end of file +$env:PYTHONPATH = "$(Get-Location)$([IO.Path]::PathSeparator)$env:PYTHONPATH" +python maps4fs\ui.py \ No newline at end of file diff --git a/run.sh b/run.sh index 45028f3..da51cdb 100644 --- a/run.sh +++ b/run.sh @@ -32,4 +32,5 @@ echo "Requirements will be installed..." && \ pip3 install -r requirements.txt && \ echo "Requirements have been successfully installed, VENV ready." && \ -$python_executable src/main.py +export PYTHONPATH="${PWD}:${PYTHONPATH}" +$python_executable maps4fs/ui.py diff --git a/src/generate.py b/src/generate.py deleted file mode 100644 index 3828926..0000000 --- a/src/generate.py +++ /dev/null @@ -1,696 +0,0 @@ -import gzip -import json -import math -import os -import re -import shutil -import warnings -import xml.etree.ElementTree as ET -from collections import namedtuple -from typing import Any, Callable, Generator - -import cv2 -import numpy as np -import osmnx as ox -import pandas as pd -import rasterio -import requests -import shapely - -# region constants -MAP_SIZES = ["2048", "4096", "8192", "16384"] -MAX_HEIGHTS = { - "100": "🍀 For flatlands", - "200": "🍀 For plains", - "400": "🗻 For hills", - "600": "⛰️ For large hills", - "800": "🏔️ For mountains", -} -SRTM = "https://elevation-tiles-prod.s3.amazonaws.com/skadi/{latitude_band}/{tile_name}.hgt.gz" -# endregion - -# region settings -DemSettings = namedtuple("DemSettings", ["blur_seed", "max_height"]) -# endregion - - -class Map: - """Class which represents a map instance. It's using to generate map from coordinates. - It's using OSM data to generate textures and SRTM data to generate DEM. - - Args: - working_directory (str): Path to working directory. - coordinates (tuple[float, float]): Coordinates of the map center. - distance (int): Distance from the map center to the map edge. - dem_settings (DemSettings): Settings for DEM generation. - logger (Any): Logger instance. - name (str, optional): Name of the map instance. Defaults to None. Used for multiple instances. - - Attributes: - working_directory (str): Path to working directory. - coordinates (tuple[float, float]): Coordinates of the map center. - distance (int): Distance from the map center to the map edge. - dem_settings (DemSettings): Settings for DEM generation. - logger (Any): Logger instance. - bbox (tuple[float, float, float, float]): Bounding box of the map. - minimum_x (float): Minimum X coordinate of the map in UTM format. - minimum_y (float): Minimum Y coordinate of the map in UTM format. - maximum_x (float): Maximum X coordinate of the map in UTM format. - maximum_y (float): Maximum Y coordinate of the map in UTM format. - height (float): Height of the map in meters. - width (float): Width of the map in meters. - height_coef (float): Height coefficient (meters per pixel). - width_coef (float): Width coefficient (meters per pixel). - easting (bool): True if map is in east hemisphere, False otherwise. - northing (bool): True if map is in north hemisphere, False otherwise. - output_dir (str): Path to output directory. - gz_dir (str): Path to gz directory. - hgt_dir (str): Path to hgt directory. - data_dir (str): Path to data directory. - weights_dir (str): Path to weights directory. - map_xml_path (str): Path to map.xml file. - map_dem_path (str): Path to map_dem.png file. - layers (list[Layer]): List of layers with textures and tags from textures.json. - """ - - def __init__( - self, - working_directory: str, - coordinates: tuple[float, float], - distance: int, - dem_settings: DemSettings, - logger: Any, - name: str = None, - ): - self.working_directory = working_directory - self.coordinates = coordinates - self.distance = distance - self.dem_settings = dem_settings - self.logger = logger - self.name = name - self._prepare_dirs(name) - self._set_map_size() - self._prepare_weights() - - self.logger.log(f"Fetching map data for coordinates: {coordinates}...") - self.bbox = ox.utils_geo.bbox_from_point(self.coordinates, dist=self.distance) - print(self.bbox) - self._read_parameters() - self._locate_map() - self.draw() - self.dem() - - def info_sequence(self) -> None: - """Saves generation info to JSON file "generation_info.json". - - Info sequence contains following attributes: - - coordinates - - bbox - - distance - - minimum_x - - minimum_y - - maximum_x - - maximum_y - - height - - width - - height_coef - - width_coef - - easting - - northing - - tile_name - """ - useful_attributes = [ - "coordinates", - "bbox", - "distance", - "minimum_x", - "minimum_y", - "maximum_x", - "maximum_y", - "height", - "width", - "height_coef", - "width_coef", - "easting", - "northing", - "tile_name", - ] - info_save_path = os.path.join(self.output_dir, "generation_info.json") - info_sequence = {attr: getattr(self, attr, None) for attr in useful_attributes} - - with open(info_save_path, "w") as f: - json.dump(info_sequence, f, indent=4) - self.logger.log(f"Generation info saved to {info_save_path}.") - - def _prepare_dirs(self, name: str | None) -> None: - """Defines directories for map generation and creates some of them. - Unpacks template archive to output directory. - Following directories are used by the instance: - - output (where template will be unpacked and weights edited) - - output/{name} (if name is provided for multiple instances) - - temp (contains gz and hgt directories) - - temp/gz (where SRTM files will be downloaded) - - temp/hgt (where SRTM files will be extracted) - - data (contains map-template.zip) - """ - self.working_directory = os.getcwd() - self.logger.log(f"Working directory: {self.working_directory}") - - self.output_dir = os.path.join(self.working_directory, "output") - if name: - self.output_dir = os.path.join(self.output_dir, name) - os.makedirs(self.output_dir, exist_ok=True) - self.logger.log(f"Output directory created: {self.output_dir}") - - tmp_dir = os.path.join(self.working_directory, "temp") - self.gz_dir = os.path.join(tmp_dir, "gz") - self.hgt_dir = os.path.join(tmp_dir, "hgt") - os.makedirs(self.gz_dir, exist_ok=True) - os.makedirs(self.hgt_dir, exist_ok=True) - self.logger.log(f"Temporary directories created: {self.gz_dir}, {self.hgt_dir}") - - self.data_dir = os.path.join(self.working_directory, "data") - template_archive = os.path.join(self.data_dir, "map-template.zip") - shutil.unpack_archive(template_archive, self.output_dir) - self.logger.log(f"Template archive unpacked to {self.output_dir}") - - global weights_dir - weights_dir = os.path.join(self.output_dir, "maps", "map", "data") - self.weights_dir = weights_dir - self.logger.log(f"Weights directory: {self.weights_dir}") - self.map_xml_path = os.path.join(self.output_dir, "maps", "map", "map.xml") - self.map_dem_path = os.path.join(self.weights_dir, "map_dem.png") - self.logger.log(f"Map XML file: {self.map_xml_path}, DEM file: {self.map_dem_path}") - - def _set_map_size(self): - """Edits map.xml file to set correct map size.""" - tree = ET.parse(self.map_xml_path) - self.logger.log(f"Map XML file loaded from: {self.map_xml_path}.") - root = tree.getroot() - for map_elem in root.iter("map"): - map_elem.set("width", str(self.distance * 2)) - map_elem.set("height", str(self.distance * 2)) - tree.write(self.map_xml_path) - self.logger.log(f"Map XML file saved to: {self.map_xml_path}.") - - def _prepare_weights(self): - """Prepares weights for textures from textures.json.""" - textures_path = os.path.join(self.working_directory, "textures.json") - textures = json.load(open(textures_path, "r")) - self.logger.log(f"Loaded {len(textures)} textures from {textures_path}.") - for texture_name, layer_numbers in textures.items(): - self._generate_weights(texture_name, layer_numbers) - self.logger.log(f"Generated weights for {len(textures)} textures.") - - global weight_files - weight_files = [ - os.path.join(self.weights_dir, f) - for f in os.listdir(self.weights_dir) - if f.endswith("_weight.png") - ] - self.logger.log(f"Fetched {len(weight_files)} weight files.") - - def _generate_weights(self, texture_name: str, layer_numbers: int) -> None: - """Generates weight files for textures. Each file is a numpy array of zeros and dtype uint8 (0-255). - - Args: - texture_name (str): Name of the texture. - layer_numbers (int): Number of layers in the texture. - """ - size = self.distance * 2 - postfix = "_weight.png" - if layer_numbers == 0: - filepaths = [os.path.join(self.weights_dir, texture_name + postfix)] - else: - filepaths = [ - os.path.join(self.weights_dir, texture_name + str(i).zfill(2) + postfix) - for i in range(1, layer_numbers + 1) - ] - - for filepath in filepaths: - img = np.zeros((size, size), dtype=np.uint8) - cv2.imwrite(filepath, img) - - class Layer: - """Class which represents a layer with textures and tags from textures.json. - It's using to obtain data from OSM using tags and make changes into corresponding textures. - - Args: - name (str): Name of the layer. - tags (dict[str, str | list[str]]): Dictionary of tags to search for. - width (int | None): Width of the polygon in meters (only for LineString). - color (tuple[int, int, int]): Color of the layer in BGR format. - - Attributes: - name (str): Name of the layer. - tags (dict[str, str | list[str]]): Dictionary of tags to search for. - width (int | None): Width of the polygon in meters (only for LineString). - paths (list[str]): List of paths to textures of the layer. - path (str): Path to the first texture of the layer. - """ - - def __init__( - self, - name: str, - tags: dict[str, str | list[str]], - width: int = None, - color: tuple[int, int, int] = None, - ): - self.name = name - self.tags = tags - self.width = width - self.color = color if color else (255, 255, 255) - self._get_paths() - self._check_shapes() - - def _get_paths(self): - """Gets paths to textures of the layer. - - Raises: - FileNotFoundError: If texture is not found. - """ - if self.name == "waterPuddle": - self.paths = [os.path.join(weights_dir, "waterPuddle_weight.png")] - return - pattern = re.compile(rf"{self.name}\d{{2}}_weight") - paths = [path for path in weight_files if pattern.search(path)] - if not paths: - raise FileNotFoundError(f"Texture not found: {self.name}") - self.paths = paths - - def _check_shapes(self) -> None: - """Checks if all textures of the layer have the same shape. - - Raises: - ValueError: If textures have different shapes. - """ - unique_shapes = set() - for path in self.paths: - img = cv2.imread(path, cv2.IMREAD_UNCHANGED) - unique_shapes.add(img.shape) - if len(unique_shapes) > 1: - raise ValueError(f"Texture {self.name} has multiple shapes: {unique_shapes}") - - @property - def path(self) -> str: - """Returns path to the first texture of the layer. - - Returns: - str: Path to the texture. - """ - return self.paths[0] - - @property - def layers(self) -> list[Layer]: - """Returns list of layers with textures and tags from textures.json. - - Returns: - list[Layer]: List of layers. - """ - asphalt = self.Layer( - "asphalt", {"highway": ["motorway", "trunk", "primary"]}, width=8, color=(70, 70, 70) - ) - concrete = self.Layer("concrete", {"building": True}, width=8, color=(130, 130, 130)) - dirtDark = self.Layer( - "dirtDark", - {"highway": ["unclassified", "residential", "track"]}, - width=2, - color=(33, 67, 101), - ) - grassDirt = self.Layer( - "grassDirt", {"natural": ["wood", "tree_row"]}, width=2, color=(0, 252, 124) - ) - grass = self.Layer("grass", {"natural": "grassland"}, color=(34, 255, 34)) - forestGround = self.Layer("forestGround", {"landuse": "farmland"}, color=(47, 107, 85)) - gravel = self.Layer( - "gravel", {"highway": ["secondary", "tertiary", "road"]}, width=4, color=(140, 180, 210) - ) - waterPuddle = self.Layer( - "waterPuddle", {"natural": "water", "waterway": True}, width=10, color=(255, 20, 20) - ) - return [asphalt, concrete, dirtDark, forestGround, grass, grassDirt, gravel, waterPuddle] - - def _read_parameters(self) -> None: - """Reads map parameters from OSM data, such as: - - minimum and maximum coordinates in UTM format - - map dimensions in meters - - map coefficients (meters per pixel) - """ - north, south, east, west = ox.utils_geo.bbox_from_point( - self.coordinates, dist=self.distance, project_utm=True - ) - # Parameters of the map in UTM format (meters). - self.minimum_x = min(west, east) - self.minimum_y = min(south, north) - self.maximum_x = max(west, east) - self.maximum_y = max(south, north) - self.logger.log(f"Map minimum coordinates (XxY): {self.minimum_x} x {self.minimum_y}.") - self.logger.log(f"Map maximum coordinates (XxY): {self.maximum_x} x {self.maximum_y}.") - - self.height = abs(north - south) - self.width = abs(east - west) - self.logger.log(f"Map dimensions (HxW): {self.height} x {self.width}.") - - self.height_coef = self.height / (self.distance * 2) - self.width_coef = self.width / (self.distance * 2) - self.logger.log(f"Map coefficients (HxW): {self.height_coef} x {self.width_coef}.") - - def _locate_map(self) -> None: - """Checks if map is in east or west hemisphere and in north or south hemisphere.""" - self.easting = self.minimum_x < 500000 - self.northing = self.minimum_y < 10000000 - self.logger.log(f"Map is in {'east' if self.easting else 'west'} of central meridian.") - self.logger.log(f"Map is in {'north' if self.northing else 'south'} hemisphere.") - - def get_relative_x(self, x: float) -> int: - """Converts UTM X coordinate to relative X coordinate in map image. - - Args: - x (float): UTM X coordinate. - - Returns: - int: Relative X coordinate in map image. - """ - if self.easting: - raw_x = x - self.minimum_x - else: - raw_x = self.minimum_x - x - return int(raw_x * self.height_coef) - - def get_relative_y(self, y: float) -> int: - """Converts UTM Y coordinate to relative Y coordinate in map image. - - Args: - y (float): UTM Y coordinate. - - Returns: - int: Relative Y coordinate in map image. - """ - if self.northing: - raw_y = y - self.minimum_y - else: - raw_y = self.minimum_y - y - return self.height - int(raw_y * self.width_coef) - - def _to_np(self, geometry: shapely.geometry.polygon.Polygon, *args) -> np.ndarray: - """Converts Polygon geometry to numpy array of polygon points. - - Args: - geometry (shapely.geometry.polygon.Polygon): Polygon geometry. - *args: Additional arguments: - - width (int | None): Width of the polygon in meters. - - Returns: - np.ndarray: Numpy array of polygon points. - """ - xs, ys = geometry.exterior.coords.xy - xs = [int(self.get_relative_x(x)) for x in xs.tolist()] - ys = [int(self.get_relative_y(y)) for y in ys.tolist()] - pairs = list(zip(xs, ys)) - return np.array(pairs, dtype=np.int32).reshape((-1, 1, 2)) - - def _to_polygon(self, obj: pd.core.series.Series, width: int | None) -> np.ndarray | None: - """Converts OSM object to numpy array of polygon points. - - Args: - obj (pd.core.series.Series): OSM object. - width (int | None): Width of the polygon in meters. - - Returns: - np.ndarray | None: Numpy array of polygon points. - """ - geometry = obj["geometry"] - geometry_type = geometry.geom_type - converter = self._converters(geometry_type) - if not converter: - self.logger.log(f"Geometry type {geometry_type} not supported.") - return - return converter(geometry, width) - - def _sequence( - self, - geometry: shapely.geometry.linestring.LineString | shapely.geometry.point.Point, - width: int | None, - ) -> np.ndarray: - """Converts LineString or Point geometry to numpy array of polygon points. - - Args: - geometry (shapely.geometry.linestring.LineString | shapely.geometry.point.Point): - LineString or Point geometry. - width (int | None): Width of the polygon in meters. - - Returns: - np.ndarray: Numpy array of polygon points. - """ - polygon = geometry.buffer(width) - return self._to_np(polygon) - - def _converters(self, geom_type: str) -> Callable[[shapely.geometry, int | None], np.ndarray]: - """Returns a converter function for a given geometry type. - - Args: - geom_type (str): Geometry type. - - Returns: - Callable[[shapely.geometry, int | None], np.ndarray]: Converter function. - """ - converters = {"Polygon": self._to_np, "LineString": self._sequence, "Point": self._sequence} - return converters.get(geom_type) - - def polygons( - self, tags: dict[str, str | list[str]], width: int | None - ) -> Generator[np.ndarray, None, None]: - """Generator which yields numpy arrays of polygons from OSM data. - - Args: - tags (dict[str, str | list[str]]): Dictionary of tags to search for. - width (int | None): Width of the polygon in meters (only for LineString). - - Yields: - Generator[np.ndarray, None, None]: Numpy array of polygon points. - """ - try: - with warnings.catch_warnings(): - warnings.simplefilter("ignore", DeprecationWarning) - objects = ox.features_from_bbox(*self.bbox, tags=tags) - except Exception as e: - self.logger.log(f"Error fetching objects for tags: {tags}.") - self.logger.log(e) - return - objects_utm = ox.project_gdf(objects, to_latlong=False) - self.logger.log(f"Fetched {len(objects_utm)} elements for tags: {tags}.") - - for index, obj in objects_utm.iterrows(): - polygon = self._to_polygon(obj, width) - if polygon is None: - continue - yield polygon - - def draw(self) -> None: - """Iterates over layers and fills them with polygons from OSM data.""" - for layer in self.layers: - img = cv2.imread(layer.path, cv2.IMREAD_UNCHANGED) - for polygon in self.polygons(layer.tags, layer.width): - cv2.fillPoly(img, [polygon], color=255) - cv2.imwrite(layer.path, img) - self.logger.log(f"Texture {layer.path} saved.") - - def _tile_info(self, lat: float, lon: float) -> tuple[str, str]: - """Returns latitude band and tile name for SRTM tile from coordinates. - - Args: - lat (float): Latitude. - lon (float): Longitude. - - Returns: - tuple[str, str]: Latitude band and tile name. - """ - tile_latitude = math.floor(lat) - tile_longitude = math.floor(lon) - - latitude_band = f"N{abs(tile_latitude):02d}" - if lon < 0: - tile_name = f"N{abs(tile_latitude):02d}W{abs(abs(tile_longitude)):03d}" - else: - tile_name = f"N{abs(tile_latitude):02d}E{abs(tile_longitude):03d}" - - self.logger.log(f"Detected tile name: {tile_name} for coordinates: lat {lat}, lon {lon}.") - self.tile_name = tile_name - return latitude_band, tile_name - - def _download_tile(self) -> str | None: - """Downloads SRTM tile from Amazon S3 using coordinates. - - Returns: - str: Path to compressed tile or None if download failed. - """ - latitude_band, tile_name = self._tile_info(*self.coordinates) - compressed_file_path = os.path.join(self.gz_dir, f"{tile_name}.hgt.gz") - url = SRTM.format(latitude_band=latitude_band, tile_name=tile_name) - self.logger.log(f"Trying to get response from {url}...") - response = requests.get(url, stream=True) - - if response.status_code == 200: - self.logger.log(f"Response received. Saving to {compressed_file_path}...") - with open(compressed_file_path, "wb") as f: - for chunk in response.iter_content(chunk_size=8192): - f.write(chunk) - self.logger.log("Compressed tile successfully downloaded.") - else: - self.logger.log(f"Response was failed with status code {response.status_code}.") - return - - return compressed_file_path - - def _srtm_tile(self) -> str | None: - """Determines SRTM tile name from coordinates downloads it if necessary, and decompresses it. - - Returns: - str: Path to decompressed tile or None if download failed. - """ - latitude_band, tile_name = self._tile_info(*self.coordinates) - self.logger.log(f"SRTM tile name {tile_name} from latitude band {latitude_band}.") - - decompressed_file_path = os.path.join(self.hgt_dir, f"{tile_name}.hgt") - if os.path.isfile(decompressed_file_path): - self.logger.log( - f"Decompressed tile already exists: {decompressed_file_path}, skipping download." - ) - return decompressed_file_path - - compressed_file_path = self._download_tile() - if not compressed_file_path: - self.logger.log("Download from SRTM failed, DEM file will be filled with zeros.") - return - with gzip.open(compressed_file_path, "rb") as f_in: - with open(decompressed_file_path, "wb") as f_out: - shutil.copyfileobj(f_in, f_out) - self.logger.log(f"Tile decompressed to {decompressed_file_path}.") - return decompressed_file_path - - def dem(self) -> None: - """Reads SRTM file, crops it to map size, normalizes and blurs it, saves to map directory.""" - north, south, east, west = ox.utils_geo.bbox_from_point( - self.coordinates, dist=self.distance - ) - max_y, min_y = max(north, south), min(north, south) - max_x, min_x = max(east, west), min(east, west) - - dem_output_resolution = (self.distance + 1, self.distance + 1) - - tile_path = self._srtm_tile() - if not tile_path: - self.logger.log("Tile was not downloaded, DEM file will be filled with zeros.") - self._save_empty_dem(dem_output_resolution) - return - - with rasterio.open(tile_path) as src: - self.logger.log(f"Opened tile, shape: {src.shape}, dtype: {src.dtypes[0]}.") - window = rasterio.windows.from_bounds(min_x, min_y, max_x, max_y, src.transform) - self.logger.log( - f"Window parameters. Column offset: {window.col_off}, row offset: {window.row_off}, " - f"width: {window.width}, height: {window.height}." - ) - data = src.read(1, window=window) - - if not data.size > 0: - self.logger.log("DEM data is empty, DEM file will be filled with zeros.") - self._save_empty_dem(dem_output_resolution) - return - - self.logger.log( - f"DEM data was read from SRTM file. Shape: {data.shape}, dtype: {data.dtype}. " - f"Min: {data.min()}, max: {data.max()}." - ) - - normalized_data = self._normalize_dem(data) - - resampled_data = cv2.resize( - normalized_data, dem_output_resolution, interpolation=cv2.INTER_LINEAR - ) - self.logger.log( - f"DEM data was resampled. Shape: {resampled_data.shape}, dtype: {resampled_data.dtype}. " - f"Min: {resampled_data.min()}, max: {resampled_data.max()}." - ) - - blur_seed = self.dem_settings.blur_seed - blurred_data = cv2.GaussianBlur(resampled_data, (blur_seed, blur_seed), 0) - cv2.imwrite(self.map_dem_path, blurred_data) - self.logger.log(f"DEM data was blurred and saved to {self.map_dem_path}.") - - def _save_empty_dem(self, dem_output_resolution: tuple[int, int]) -> None: - """Saves empty DEM file filled with zeros.""" - dem_data = np.zeros(dem_output_resolution, dtype="uint16") - cv2.imwrite(self.map_dem_path, dem_data) - self.logger.log(f"DEM data filled with zeros and saved to {self.map_dem_path}.") - - def _normalize_dem(self, data: np.ndarray) -> np.ndarray: - """Normalize DEM data to 16-bit unsigned integer using max height from settings. - - Args: - data (np.ndarray): DEM data from SRTM file after cropping. - - Returns: - np.ndarray: Normalized DEM data. - """ - max_dev = data.max() - data.min() - max_height = self.dem_settings.max_height - scaling_factor = max_dev / max_height if max_dev < max_height else 1 - adjusted_max_height = int(65535 * scaling_factor) - self.logger.log( - f"Maximum deviation: {max_dev}. Scaling factor: {scaling_factor}. " - f"Adjusted max height: {adjusted_max_height}." - ) - normalized_data = ( - (data - data.min()) / (data.max() - data.min()) * adjusted_max_height - ).astype("uint16") - self.logger.log( - f"DEM data was normalized to {normalized_data.min()} - {normalized_data.max()}." - ) - return normalized_data - - def pack(self) -> str: - """Packs map directory to zip archive. - - Returns: - str: Path to the archive. - """ - archives_dir = os.path.join(self.working_directory, "archives") - os.makedirs(archives_dir, exist_ok=True) - archive_name = self.name if self.name else "map" - archive_name += ".zip" - archive_path = os.path.join(archives_dir, archive_name) - self.logger.log(f"Packing map to {archive_path}...") - shutil.make_archive(archive_path[:-4], "zip", self.output_dir) - self.logger.log(f"Map packed to {archive_path}.") - return archive_path - - def preview(self) -> str: - """Merges layers into one image and saves it to previews directory. - - Returns: - str: Path to the preview. - """ - preview_size = (2048, 2048) - images = [ - cv2.resize(cv2.imread(layer.path, cv2.IMREAD_UNCHANGED), preview_size) - for layer in self.layers - ] - colors = [layer.color for layer in self.layers] - color_images = [] - for img, color in zip(images, colors): - color_img = np.zeros((img.shape[0], img.shape[1], 3), dtype=np.uint8) - color_img[img > 0] = color - color_images.append(color_img) - merged = np.sum(color_images, axis=0, dtype=np.uint8) - self.logger.log( - f"Merged layers into one image. Shape: {merged.shape}, dtype: {merged.dtype}." - ) - previews_dir = os.path.join(self.working_directory, "previews") - os.makedirs(previews_dir, exist_ok=True) - preview_name = self.name if self.name else "preview" - preview_name += ".png" - preview_path = os.path.join(previews_dir, preview_name) - cv2.imwrite(preview_path, merged) - self.logger.log(f"Preview saved to {preview_path}.") - return preview_path diff --git a/textures.json b/textures.json deleted file mode 100644 index 976dabf..0000000 --- a/textures.json +++ /dev/null @@ -1,20 +0,0 @@ -{ - "animalMud": 4, - "asphalt": 4, - "cobbleStone": 4, - "concrete": 4, - "concreteRubble": 4, - "concreteTiles": 4, - "dirt": 4, - "dirtDark": 2, - "forestGround": 4, - "forestGroundLeaves": 4, - "grass": 4, - "grassDirt": 4, - "gravel": 4, - "groundBricks": 4, - "mountainRock": 4, - "mountainRockDark": 4, - "riverMud": 4, - "waterPuddle": 0 -}