diff --git a/entrypoint.sh b/entrypoint.sh
index a5b935a..63db785 100755
--- a/entrypoint.sh
+++ b/entrypoint.sh
@@ -34,8 +34,14 @@ echo "Load waarnemingen observation data via: python manage.py load_waarnemingen
 # Start Gunicorn
 echo "Starting Gunicorn..."
 gunicorn --workers 3 \
+    --worker-class gthread \
+    --threads 4 \
+    --worker-connections 1000 \
     --timeout 1800 \
+    --graceful-timeout 300 \
     --keep-alive 65 \
+    --max-requests 1000 \
+    --max-requests-jitter 50 \
     --bind 0.0.0.0:8000 \
     vespadb.wsgi:application &
 
diff --git a/nginx.conf b/nginx.conf
index 1946f62..5bd3146 100644
--- a/nginx.conf
+++ b/nginx.conf
@@ -113,4 +113,4 @@ http {
             proxy_busy_buffers_size 256k;
         }
     }
-}
\ No newline at end of file
+}
diff --git a/poetry.lock b/poetry.lock
index f403b7d..c30e111 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -726,6 +726,20 @@ files = [
 asgiref = ">=3.6"
 django = ">=4.2"
 
+[[package]]
+name = "django-extensions"
+version = "3.2.3"
+description = "Extensions for Django"
+optional = false
+python-versions = ">=3.6"
+files = [
+    {file = "django-extensions-3.2.3.tar.gz", hash = "sha256:44d27919d04e23b3f40231c4ab7af4e61ce832ef46d610cc650d53e68328410a"},
+    {file = "django_extensions-3.2.3-py3-none-any.whl", hash = "sha256:9600b7562f79a92cbf1fde6403c04fee314608fefbb595502e34383ae8203401"},
+]
+
+[package.dependencies]
+Django = ">=3.2"
+
 [[package]]
 name = "django-filter"
 version = "23.5"
@@ -2796,4 +2810,4 @@ brotli = ["brotli"]
 [metadata]
 lock-version = "2.0"
 python-versions = ">=3.11.6,<4.0"
-content-hash = "55104070732a4b70d487543187b0ef0936fbc4f1ac660a7886bd4508ec4260d4"
+content-hash = "550711048bca44df4e5917c00b7b05b0016617e390d57297ef3285cd214347b0"
diff --git a/pyproject.toml b/pyproject.toml
index 362ed32..9aa4a70 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -46,6 +46,7 @@ types-python-dateutil = "^2.9.0.20240316"
 whitenoise = "^6.6.0"
 django-ses = "^4.2.0"
 tenacity = "^9.0.0"
+django-extensions = "^3.2.3"
 [tool.poetry.group.dev.dependencies] # https://python-poetry.org/docs/master/managing-dependencies/
 coverage = { extras = ["toml"], version = ">=7.4.1" }
 ipython = ">=8.20.0"
diff --git a/src/components/MapPage.vue b/src/components/MapPage.vue
index 5d76b27..1f2c61d 100644
--- a/src/components/MapPage.vue
+++ b/src/components/MapPage.vue
@@ -15,6 +15,9 @@
Observaties laden...
+
+ Exporteren... +
Gerapporteerd nest @@ -61,6 +64,7 @@ export default { setup() { const vespaStore = useVespaStore(); const searchQuery = ref(''); + const isExporting = computed(() => vespaStore.isExporting); const router = useRouter(); const selectedObservation = computed(() => vespaStore.selectedObservation); const isEditing = computed(() => vespaStore.isEditing); @@ -327,6 +331,7 @@ export default { updateMarkerColor, searchQuery, searchAddress, + isExporting, }; }, }; diff --git a/src/components/NavbarComponent.vue b/src/components/NavbarComponent.vue index f1f4130..b11420c 100644 --- a/src/components/NavbarComponent.vue +++ b/src/components/NavbarComponent.vue @@ -19,7 +19,15 @@ Export
@@ -67,6 +75,7 @@ export default { const isModalVisible = ref(false); const modalTitle = ref(''); const modalMessage = ref(''); + const isExporting = computed(() => vespaStore.isExporting); watch(() => vespaStore.error, (newError) => { if (newError) { @@ -94,10 +103,18 @@ export default { }; const exportData = async (format) => { - await vespaStore.exportData(format); - }; + try { + if (vespaStore.isExporting) return; + + await vespaStore.exportData(format); + } catch (error) { + modalTitle.value = 'Error'; + modalMessage.value = 'Er is een fout opgetreden tijdens het exporteren.'; + isModalVisible.value = true; + } + }; - return { isLoggedIn, loadingAuth, username, logout, navigateToChangePassword, exportData, fileInput, isModalVisible, modalTitle, modalMessage }; + return { isLoggedIn, loadingAuth, username, logout, navigateToChangePassword, exportData, fileInput, isModalVisible, modalTitle, modalMessage, isExporting }; }, mounted() { var dropdownElementList = [].slice.call(document.querySelectorAll('.dropdown-toggle')); diff --git a/src/stores/vespaStore.js b/src/stores/vespaStore.js index d132687..26d3205 100644 --- a/src/stores/vespaStore.js +++ b/src/stores/vespaStore.js @@ -27,6 +27,7 @@ export const useVespaStore = defineStore('vespaStore', { isEditing: false, map: null, viewMode: 'map', + isExporting: false, filters: { municipalities: [], provinces: [], @@ -307,21 +308,55 @@ export const useVespaStore = defineStore('vespaStore', { } }, async exportData(format) { - const filterQuery = this.createFilterQuery(); - const url = `/observations/export?export_format=${format}&${filterQuery}`; - try { - const response = await ApiService.get(url, { responseType: 'blob' }); - const blob = new Blob([response.data], { type: response.headers['content-type'] }); - const downloadUrl = window.URL.createObjectURL(blob); - const link = document.createElement('a'); - link.href = downloadUrl; - link.setAttribute('download', `export.${format}`); - document.body.appendChild(link); - link.click(); - link.remove(); + this.isExporting = true; // Start loading indicator + const response = await ApiService.get( + `/observations/export?${this.createFilterQuery()}` + ); + + if (response.status === 200) { + const { export_id } = response.data; + + const checkStatus = async () => { + const statusResponse = await ApiService.get( + `/observations/export_status?export_id=${export_id}` + ); + + if (statusResponse.data.status === 'completed') { + const downloadResponse = await ApiService.get( + `/observations/download_export/?export_id=${export_id}`, + { responseType: 'blob' } + ); + + const blob = new Blob([downloadResponse.data], { type: 'text/csv' }); + const url = window.URL.createObjectURL(blob); + const link = document.createElement('a'); + link.href = url; + link.setAttribute('download', `observations_export_${new Date().getTime()}.csv`); + document.body.appendChild(link); + link.click(); + document.body.removeChild(link); + window.URL.revokeObjectURL(url); + this.isExporting = false; // Stop loading indicator + return true; + } else if (statusResponse.data.status === 'failed') { + this.isExporting = false; // Stop loading indicator on error + throw new Error(statusResponse.data.error || 'Export failed'); + } + + return new Promise(resolve => { + setTimeout(async () => { + resolve(await checkStatus()); + }, 2000); + }); + }; + + await checkStatus(); + } } catch (error) { + this.isExporting = false; // Stop loading indicator on error console.error('Error exporting data:', error); + throw error; } }, async 
fetchMunicipalitiesByProvinces(provinceIds) { diff --git a/vespadb/observations/migrations/0033_export.py b/vespadb/observations/migrations/0033_export.py new file mode 100644 index 0000000..b3b0306 --- /dev/null +++ b/vespadb/observations/migrations/0033_export.py @@ -0,0 +1,31 @@ +# Generated by Django 5.1.4 on 2024-12-18 16:03 + +import django.db.models.deletion +from django.conf import settings +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('observations', '0032_rename_wn_notes_observation_notes'), + migrations.swappable_dependency(settings.AUTH_USER_MODEL), + ] + + operations = [ + migrations.CreateModel( + name='Export', + fields=[ + ('id', models.AutoField(primary_key=True, serialize=False)), + ('filters', models.JSONField(default=dict, help_text='Filters applied to the export')), + ('status', models.CharField(choices=[('pending', 'Pending'), ('processing', 'Processing'), ('completed', 'Completed'), ('failed', 'Failed')], default='pending', help_text='Status of the export', max_length=20)), + ('progress', models.IntegerField(default=0, help_text='Progress percentage of the export')), + ('file_path', models.CharField(blank=True, help_text='Path to the exported file', max_length=255, null=True)), + ('created_at', models.DateTimeField(auto_now_add=True, help_text='Datetime when the export was created')), + ('completed_at', models.DateTimeField(blank=True, help_text='Datetime when the export was completed', null=True)), + ('error_message', models.TextField(blank=True, help_text='Error message if the export failed', null=True)), + ('task_id', models.CharField(blank=True, help_text='Celery task ID for the export', max_length=255, null=True)), + ('user', models.ForeignKey(blank=True, help_text='User who requested the export', null=True, on_delete=django.db.models.deletion.SET_NULL, to=settings.AUTH_USER_MODEL)), + ], + ), + ] diff --git a/vespadb/observations/models.py b/vespadb/observations/models.py index a6c9684..55ab414 100644 --- a/vespadb/observations/models.py +++ b/vespadb/observations/models.py @@ -389,3 +389,29 @@ def save(self, *args: Any, **kwargs: Any) -> None: self.province = municipality.province if municipality else None super().save(*args, **kwargs) + +class Export(models.Model): + """Model for tracking observation exports.""" + STATUS_CHOICES = ( + ('pending', 'Pending'), + ('processing', 'Processing'), + ('completed', 'Completed'), + ('failed', 'Failed'), + ) + + id = models.AutoField(primary_key=True) + user = models.ForeignKey( + settings.AUTH_USER_MODEL, + on_delete=models.SET_NULL, + null=True, + blank=True, + help_text="User who requested the export", + ) + filters = models.JSONField(default=dict, help_text="Filters applied to the export") + status = models.CharField(max_length=20, choices=STATUS_CHOICES, default='pending', help_text="Status of the export") + progress = models.IntegerField(default=0, help_text="Progress percentage of the export") + file_path = models.CharField(max_length=255, blank=True, null=True, help_text="Path to the exported file") + created_at = models.DateTimeField(auto_now_add=True, help_text="Datetime when the export was created") + completed_at = models.DateTimeField(blank=True, null=True, help_text="Datetime when the export was completed") + error_message = models.TextField(blank=True, null=True, help_text="Error message if the export failed") + task_id = models.CharField(max_length=255, blank=True, null=True, help_text="Celery task ID for the export") diff --git 
a/vespadb/observations/serializers.py b/vespadb/observations/serializers.py index 5f0cedb..71332aa 100644 --- a/vespadb/observations/serializers.py +++ b/vespadb/observations/serializers.py @@ -13,7 +13,7 @@ from rest_framework.request import Request from vespadb.observations.helpers import parse_and_convert_to_cet, parse_and_convert_to_utc -from vespadb.observations.models import EradicationResultEnum, Municipality, Observation, Province +from vespadb.observations.models import EradicationResultEnum, Municipality, Observation, Province, Export from vespadb.observations.utils import get_municipality_from_coordinates from vespadb.users.models import VespaUser @@ -484,3 +484,9 @@ class Meta: model = Province fields = ["id", "name"] + + +class ExportSerializer(serializers.ModelSerializer): + class Meta: + model = Export + fields = '__all__' diff --git a/vespadb/observations/tasks/generate_export.py b/vespadb/observations/tasks/generate_export.py new file mode 100644 index 0000000..21aef1f --- /dev/null +++ b/vespadb/observations/tasks/generate_export.py @@ -0,0 +1,230 @@ +import csv +import logging +from datetime import datetime, timedelta +from typing import Optional, Dict, Any, List, Set, Iterator +from django.core.cache import cache +from django.db import models, transaction +from django.utils import timezone +from celery import shared_task +from vespadb.observations.models import Observation, Export +from vespadb.users.models import VespaUser as User +from vespadb.observations.serializers import user_read_fields, public_read_fields + +logger = logging.getLogger(__name__) + +CSV_HEADERS = [ + "id", "created_datetime", "modified_datetime", "latitude", "longitude", + "source", "source_id", "nest_height", "nest_size", "nest_location", + "nest_type", "observation_datetime", "province", "eradication_date", + "municipality", "images", "anb_domain", "notes", "eradication_result", + "wn_id", "wn_validation_status", "nest_status" +] + +class Echo: + """An object that implements just the write method of the file-like interface.""" + def write(self, value): + """Write the value by returning it, instead of storing in a buffer.""" + return value + +def get_status(observation: Observation) -> str: + """Get observation status string.""" + if observation.eradication_result: + return "eradicated" + if observation.reserved_by: + return "reserved" + return "untreated" + +def _prepare_row_data( + observation: Observation, + is_admin: bool, + user_municipality_ids: Set[str] +) -> List[str]: + """ + Prepare a single row of data for the CSV export with error handling. 
+ """ + try: + # Determine allowed fields based on permissions + if is_admin or (observation.municipality_id in user_municipality_ids): + allowed_fields = user_read_fields + else: + allowed_fields = public_read_fields + + allowed_fields.extend(["source_id", "latitude", "longitude", "anb_domain", "nest_status"]) + + row_data = [] + for field in CSV_HEADERS: + try: + if field not in allowed_fields: + row_data.append("") + continue + + if field == "latitude": + row_data.append(str(observation.location.y) if observation.location else "") + elif field == "longitude": + row_data.append(str(observation.location.x) if observation.location else "") + elif field in ["created_datetime", "modified_datetime", "observation_datetime"]: + datetime_val = getattr(observation, field, None) + if datetime_val: + datetime_val = datetime_val.replace(microsecond=0) + row_data.append(datetime_val.isoformat() + "Z") + else: + row_data.append("") + elif field == "province": + row_data.append(observation.province.name if observation.province else "") + elif field == "municipality": + row_data.append(observation.municipality.name if observation.municipality else "") + elif field == "anb_domain": + row_data.append(str(observation.anb)) + elif field == "nest_status": + row_data.append(get_status(observation)) + elif field == "source_id": + row_data.append(str(observation.source_id) if observation.source_id is not None else "") + else: + value = getattr(observation, field, "") + row_data.append(str(value) if value is not None else "") + except Exception as e: + logger.warning(f"Error processing field {field} for observation {observation.id}: {str(e)}") + row_data.append("") + + return row_data + except Exception as e: + logger.error(f"Error preparing row data for observation {observation.id}: {str(e)}") + return [""] * len(CSV_HEADERS) + +def parse_boolean(value: str) -> bool: + """ + Convert a string value to a boolean. + """ + if isinstance(value, bool): + return value + if isinstance(value, str): + value_lower = value.lower() + if value_lower in {"true", "1"}: + return True + elif value_lower in {"false", "0"}: + return False + raise ValueError(f"Invalid boolean value: {value}") + +def generate_rows(queryset, is_admin: bool, user_municipality_ids: set) -> Iterator[List[str]]: + """Generate rows for CSV streaming.""" + # First yield the headers + yield CSV_HEADERS + + # Then yield the data rows + for observation in queryset: + try: + row = _prepare_row_data(observation, is_admin, user_municipality_ids) + yield row + except Exception as e: + logger.error(f"Error processing observation {observation.id}: {str(e)}") + continue + +@shared_task( + name="generate_export", + max_retries=3, + default_retry_delay=60, + soft_time_limit=1700, + time_limit=1800, + acks_late=True +) +def generate_export(export_id: int, filters: Dict[str, Any], user_id: Optional[int] = None) -> Dict[str, Any]: + """ + Generate CSV export of observations based on filters. 
+ + Args: + export_id: ID of the Export record + filters: Dictionary of filters to apply to the queryset + user_id: Optional ID of the user requesting the export + + Returns: + Dictionary containing export status and details + """ + logger.info(f"Starting export {export_id} for user {user_id}") + export = Export.objects.get(id=export_id) + + try: + # Update export status + export.status = 'processing' + export.save() + logger.info(f"Export {export_id} status set to processing") + + # Validate and preprocess filters + valid_fields = {field.name: field for field in Observation._meta.get_fields()} + processed_filters = {} + for key, value in filters.items(): + if key in valid_fields: + field = valid_fields[key] + if isinstance(field, models.BooleanField): + try: + processed_filters[key] = parse_boolean(value) + except ValueError: + logger.error(f"Invalid boolean value for filter {key}: {value}") + continue + else: + processed_filters[key] = value + + # Prepare queryset with optimizations + queryset = (Observation.objects + .filter(**processed_filters) + .select_related('province', 'municipality', 'reserved_by') + .order_by('id')) + + total = queryset.count() + processed = 0 + + is_admin = False + user_municipality_ids = set() + if user_id: + try: + user = User.objects.get(id=user_id) + is_admin = user.is_superuser + user_municipality_ids = set(user.municipalities.values_list('id', flat=True)) + except User.DoesNotExist: + pass + + logger.info(f"Processing {total} observations for export {export_id}") + + # Generate CSV data + rows = list(generate_rows(queryset, is_admin, user_municipality_ids)) + + # Store in cache + cache_key = f'export_{export_id}_data' + cache.set(cache_key, rows, timeout=3600) # Store for 1 hour + + # Update export record + with transaction.atomic(): + export.status = 'completed' + export.completed_at = timezone.now() + export.progress = 100 + export.save() + + logger.info(f"Export {export_id} completed successfully") + + return { + 'status': 'completed', + 'cache_key': cache_key, + 'total_processed': total + } + + except Exception as e: + logger.exception(f"Export {export_id} failed: {str(e)}") + export.status = 'failed' + export.error_message = str(e) + export.save() + raise + +@shared_task +def cleanup_old_exports() -> None: + """Clean up exports older than 24 hours.""" + logger.info("Starting cleanup of old exports") + cutoff = timezone.now() - timedelta(days=1) + old_exports = Export.objects.filter(created_at__lt=cutoff) + + for export in old_exports: + # Remove from cache if exists + cache_key = f'export_{export.id}_data' + cache.delete(cache_key) + + # Delete the export record + export.delete() + logger.info(f"Cleaned up export {export.id}") diff --git a/vespadb/observations/views.py b/vespadb/observations/views.py index 9e5f00a..28b6b9b 100644 --- a/vespadb/observations/views.py +++ b/vespadb/observations/views.py @@ -1,50 +1,36 @@ """Views for the observations app.""" + +import csv import datetime import io import json +import time import logging -import json import csv -from typing import TYPE_CHECKING, Any, Any, Union, TextIO, Union, List, Set, Optional -import datetime -import tempfile +import json +from typing import TYPE_CHECKING, Any, Union, Optional +from django.conf import settings +from django.http import FileResponse, HttpResponseNotFound import os -import logging -from tenacity import retry, stop_after_attempt, wait_exponential -from tenacity import ( - retry, - stop_after_attempt, - wait_exponential, - retry_if_exception_type, - before_log, - 
after_log, -) -import time -from typing import Generator, Optional -from django.db import OperationalError, connection, transaction -from django.core.exceptions import ValidationError -import psycopg2 -from django.http import FileResponse -import os -import tempfile +from django.conf import settings from django.contrib.gis.db.models.functions import Transform from django.contrib.gis.geos import GEOSGeometry from django.core.cache import cache from django.core.exceptions import PermissionDenied, ValidationError from django.core.files.uploadedfile import InMemoryUploadedFile -from django.core.paginator import Paginator from django.db import transaction from django.db.models import CharField, OuterRef, QuerySet, Subquery, Value from django.db.models.functions import Coalesce from django.db.utils import IntegrityError -from django.http import HttpResponse, JsonResponse, StreamingHttpResponse, HttpRequest +from django.http import HttpResponse, JsonResponse, HttpRequest from django.db import connection from django.utils.decorators import method_decorator from django.utils.timezone import now from django.views.decorators.http import require_GET from django_filters.rest_framework import DjangoFilterBackend from django_ratelimit.decorators import ratelimit +from django.http import StreamingHttpResponse, HttpResponseBadRequest, HttpResponseNotFound, HttpResponseServerError from drf_yasg import openapi from drf_yasg.utils import swagger_auto_schema from geopy.exc import GeocoderServiceError, GeocoderTimedOut @@ -59,17 +45,20 @@ from rest_framework.serializers import BaseSerializer from rest_framework.viewsets import ModelViewSet, ReadOnlyModelViewSet from rest_framework_gis.filters import DistanceToPointFilter -from vespadb.observations.serializers import user_read_fields, public_read_fields from vespadb.observations.cache import invalidate_geojson_cache, invalidate_observation_cache from vespadb.observations.filters import ObservationFilter from vespadb.observations.helpers import parse_and_convert_to_utc -from vespadb.observations.models import Municipality, Observation, Province, EradicationResultEnum -from vespadb.observations.serializers import ( - MunicipalitySerializer, - ObservationSerializer, - ProvinceSerializer, -) +from vespadb.observations.models import Municipality, Observation, Province, Export +from vespadb.observations.models import Export +from vespadb.observations.tasks.generate_export import generate_export +from vespadb.observations.serializers import ObservationSerializer, MunicipalitySerializer, ProvinceSerializer + +from django.utils.decorators import method_decorator +from django_ratelimit.decorators import ratelimit +from rest_framework.decorators import action +from rest_framework.permissions import AllowAny +from django.shortcuts import get_object_or_404 if TYPE_CHECKING: from geopy.location import Location @@ -77,24 +66,23 @@ logger = logging.getLogger(__name__) logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") +class Echo: + """An object that implements just the write method of the file-like interface.""" + def write(self, value): + """Write the value by returning it, instead of storing in a buffer.""" + return value + + BBOX_LENGTH = 4 GEOJSON_REDIS_CACHE_EXPIRATION = 900 # 15 minutes GET_REDIS_CACHE_EXPIRATION = 86400 # 1 day +BATCH_SIZE = 150 CSV_HEADERS = [ "id", "created_datetime", "modified_datetime", "latitude", "longitude", "source", "source_id", "nest_height", "nest_size", "nest_location", "nest_type", 
"observation_datetime", "province", "eradication_date", "municipality", "images", "anb_domain", "notes", "eradication_result", "wn_id", "wn_validation_status", "nest_status" ] -BATCH_SIZE = 1000 -class ExportError(Exception): - """Custom exception for export-related errors.""" - pass - -class QueryTimeoutError(Exception): - """Custom exception for query timeout errors.""" - pass - class ObservationsViewSet(ModelViewSet): # noqa: PLR0904 """ViewSet for the Observation model.""" @@ -653,362 +641,130 @@ def save_observations(self, valid_data: list[dict[str, Any]]) -> Response: {"error": f"An error occurred during bulk import: {e!s}"}, status=status.HTTP_500_INTERNAL_SERVER_ERROR ) - @retry( - stop=stop_after_attempt(3), - wait=wait_exponential(multiplier=1, min=4, max=10), - retry_error_callback=lambda retry_state: None - ) - def write_batch_to_file( - self, - writer: Any, - batch: List[Observation], - is_admin: bool, - user_municipality_ids: Set[str] - ) -> int: - """ - Write a batch of observations to the CSV file with retry logic. - Returns number of successfully written records. - """ - successful_writes = 0 - for observation in batch: - try: - row_data = self._prepare_row_data(observation, is_admin, user_municipality_ids) - writer.writerow(row_data) - successful_writes += 1 - except Exception as e: - logger.error(f"Error processing observation {observation.id}: {str(e)}") - continue - return successful_writes - - def _prepare_row_data( - self, - observation: Observation, - is_admin: bool, - user_municipality_ids: set[str] - ) -> list[str]: - """ - Prepare a single row of data for the CSV export with error handling. - """ - try: - # Determine allowed fields based on permissions - if is_admin or (observation.municipality_id in user_municipality_ids): - allowed_fields = user_read_fields - else: - allowed_fields = public_read_fields - - allowed_fields.extend(["source_id", "latitude", "longitude", "anb_domain", "nest_status"]) - - row_data = [] - for field in CSV_HEADERS: - try: - if field not in allowed_fields: - row_data.append("") - continue - - if field == "latitude": - row_data.append(str(observation.location.y) if observation.location else "") - elif field == "longitude": - row_data.append(str(observation.location.x) if observation.location else "") - elif field in ["created_datetime", "modified_datetime", "observation_datetime"]: - datetime_val = getattr(observation, field, None) - if datetime_val: - datetime_val = datetime_val.replace(microsecond=0) - row_data.append(datetime_val.isoformat() + "Z") - else: - row_data.append("") - elif field == "province": - row_data.append(observation.province.name if observation.province else "") - elif field == "municipality": - row_data.append(observation.municipality.name if observation.municipality else "") - elif field == "anb_domain": - row_data.append(str(observation.anb)) - elif field == "nest_status": - row_data.append(self.get_status(observation)) - elif field == "source_id": - row_data.append(str(observation.source_id) if observation.source_id is not None else "") - else: - value = getattr(observation, field, "") - row_data.append(str(value) if value is not None else "") - except Exception as e: - logger.warning(f"Error processing field {field} for observation {observation.id}: {str(e)}") - row_data.append("") - - return row_data - except Exception as e: - logger.error(f"Error preparing row data for observation {observation.id}: {str(e)}") - return [""] * len(CSV_HEADERS) # Return empty row in case of error - - @retry( - 
stop=stop_after_attempt(3), - wait=wait_exponential(multiplier=1, min=4, max=10), - retry=retry_if_exception_type((OperationalError, psycopg2.OperationalError)), - before=before_log(logger, logging.INFO), - after=before_log(logger, logging.INFO) - ) - def get_queryset_count(self, queryset: QuerySet) -> int: - """Get queryset count with retry logic.""" - try: - with transaction.atomic(), connection.cursor() as cursor: - cursor.execute('SET statement_timeout TO 30000') # 30 seconds timeout - return int(queryset.count()) - except (OperationalError, psycopg2.OperationalError) as e: - logger.error(f"Error getting queryset count: {str(e)}") - raise QueryTimeoutError("Query timed out while getting count") from e - - def get_chunk_with_retries( - self, - queryset: QuerySet, - start: int, - batch_size: int, - max_retries: int = 3 - ) -> Optional[List[Observation]]: - """Get a chunk of data with retries and error handling.""" - for attempt in range(max_retries): - try: - with transaction.atomic(), connection.cursor() as cursor: - cursor.execute('SET statement_timeout TO 30000') - chunk = list( - queryset.select_related( - 'province', - 'municipality', - 'reserved_by' - )[start:start + batch_size] - ) - return chunk - except (OperationalError, psycopg2.OperationalError) as e: - if attempt == max_retries - 1: - logger.error(f"Failed to get chunk after {max_retries} attempts: {str(e)}") - return None - wait_time = (2 ** attempt) * 1 # Exponential backoff - logger.warning(f"Retry {attempt + 1}/{max_retries} after {wait_time}s") - time.sleep(wait_time) - return None - - def _generate_csv_content( - self, - queryset: QuerySet, - is_admin: bool, - user_municipality_ids: Set[str] - ) -> Generator[str, None, None]: - """Generate CSV content in smaller chunks.""" - buffer = io.StringIO() - writer = csv.writer(buffer) - - # Write headers first - writer.writerow(CSV_HEADERS) - data = buffer.getvalue() - buffer.seek(0) - buffer.truncate() - yield data - - # Process in smaller chunks - chunk_size = 100 # Kleinere chunk size - total = queryset.count() - - for start in range(0, total, chunk_size): - chunk = queryset.select_related( - 'province', - 'municipality', - 'reserved_by' - )[start:start + chunk_size] - - for observation in chunk: - try: - row_data = self._prepare_row_data( - observation, - is_admin, - user_municipality_ids - ) - writer.writerow(row_data) - data = buffer.getvalue() - buffer.seek(0) - buffer.truncate() - yield data - except Exception as e: - logger.error(f"Error processing observation {observation.id}: {str(e)}") - continue - - buffer.close() - - def create_csv_generator( - self, - queryset: QuerySet, - is_admin: bool, - user_municipality_ids: Set[str], - batch_size: int = BATCH_SIZE - ) -> Generator[str, None, None]: - """Create a generator for CSV streaming with improved error handling.""" - buffer = io.StringIO() - writer = csv.writer(buffer) - - # Write headers - writer.writerow(CSV_HEADERS) - yield buffer.getvalue() - buffer.seek(0) - buffer.truncate(0) - - total_processed = 0 - successful_writes = 0 - error_count = 0 - - try: - total_count = self.get_queryset_count(queryset) - - # Process in chunks - start = 0 - while True: - chunk = self.get_chunk_with_retries(queryset, start, batch_size) - if not chunk: - break - - for observation in chunk: - try: - row_data = self._prepare_row_data( - observation, - is_admin, - user_municipality_ids - ) - writer.writerow(row_data) - successful_writes += 1 - except Exception as e: - error_count += 1 - logger.error(f"Error processing observation 
{observation.id}: {str(e)}") - if error_count > total_count * 0.1: # If more than 10% errors - raise ExportError("Too many errors during export") - continue - - data = buffer.getvalue() - yield data - buffer.seek(0) - buffer.truncate(0) - - total_processed += len(chunk) - progress = (total_processed / total_count) * 100 if total_count else 0 - logger.info( - f"Export progress: {progress:.1f}% ({total_processed}/{total_count}). " - f"Successful: {successful_writes}, Errors: {error_count}" - ) - - start += batch_size - - except Exception as e: - logger.exception("Error in CSV generator") - raise ExportError(f"Export failed: {str(e)}") from e - finally: - buffer.close() - @method_decorator(ratelimit(key="ip", rate="60/m", method="GET", block=True)) @action(detail=False, methods=["get"], permission_classes=[AllowAny]) - def export(self, request: HttpRequest) -> Union[FileResponse, JsonResponse]: - """Export observations as CSV using temporary file approach.""" - temp_file = None - temp_file_path = None - - try: - # Create temporary file - temp_file = tempfile.NamedTemporaryFile(mode='w+', delete=False, encoding='utf-8-sig') - temp_file_path = temp_file.name - - writer = csv.writer(temp_file) - writer.writerow(CSV_HEADERS) + def export(self, request: HttpRequest) -> JsonResponse: + """Initiate the export of observations and trigger a Celery task.""" + # Initialize the filterset + filterset = self.filterset_class(data=request.GET, queryset=self.get_queryset()) + + # Validate the filterset + if not filterset.is_valid(): + return JsonResponse({"error": filterset.errors}, status=400) + + # Prepare the filter parameters + filters = {key: value for key, value in request.GET.items()} + + # Create an Export record + export = Export.objects.create( + user=request.user if request.user.is_authenticated else None, + filters=filters, + status='pending', + ) - # Get user permissions - if request.user.is_authenticated: - user_municipality_ids = set(request.user.municipalities.values_list("id", flat=True)) - is_admin = request.user.is_superuser - else: - user_municipality_ids = set() - is_admin = False + # Trigger the Celery task + task = generate_export.delay( + export.id, + filters, + user_id=request.user.id if request.user.is_authenticated else None + ) + + # Update the Export record with the task ID + export.task_id = task.id + export.save() + + return JsonResponse({ + 'export_id': export.id, + 'task_id': task.id, + }) - # Get filtered queryset with optimizations - queryset = self.filter_queryset( - self.get_queryset().select_related('province', 'municipality', 'reserved_by') + @swagger_auto_schema( + operation_description="Check the status of an export.", + manual_parameters=[ + openapi.Parameter( + 'export_id', + openapi.IN_QUERY, + description="The ID of the export to check the status of.", + type=openapi.TYPE_INTEGER, + required=True, ) + ], + responses={ + 200: openapi.Schema( + type=openapi.TYPE_OBJECT, + properties={ + 'status': openapi.Schema(type=openapi.TYPE_STRING), + 'progress': openapi.Schema(type=openapi.TYPE_INTEGER), + 'error': openapi.Schema(type=openapi.TYPE_STRING, nullable=True), + 'download_url': openapi.Schema(type=openapi.TYPE_STRING, nullable=True), + }, + ), + 400: "Bad Request", + 404: "Export not found", + }, + ) + @action(detail=False, methods=["get"]) + def export_status(self, request: HttpRequest) -> JsonResponse: + """Check export status.""" + export_id = request.GET.get('export_id') + if not export_id: + logger.error("Export ID not provided") + return JsonResponse({"error": 
"Export ID is required"}, status=400) + + try: + export = get_object_or_404(Export, id=export_id) + except Exception as e: + logger.exception(f"Export ID {export_id} not found or invalid: {str(e)}") + return JsonResponse({"error": f"Export ID {export_id} not found"}, status=404) + + if export.status == 'completed': + download_url = request.build_absolute_uri(f'/observations/download_export/?export_id={export_id}') + return JsonResponse({ + 'status': 'completed', + 'download_url': download_url + }) + + return JsonResponse({ + 'status': export.status, + 'progress': export.progress, + 'error': export.error_message + }) - # Use much smaller chunk size - chunk_size = 100 - total_count = queryset.count() - processed = 0 + @action(detail=False, methods=["get"]) + def download_export(self, request: HttpRequest) -> Union[StreamingHttpResponse, HttpResponse]: + """Stream the export directly to the user.""" + export_id = request.GET.get('export_id') + if not export_id: + return HttpResponseBadRequest("Export ID is required") - # Process in chunks with periodic flushes - for start in range(0, total_count, chunk_size): - chunk = queryset[start:start + chunk_size] - - for observation in chunk: - try: - row_data = self._prepare_row_data( - observation, - is_admin, - user_municipality_ids - ) - writer.writerow(row_data) - except Exception as e: - logger.error(f"Error processing observation {observation.id}: {str(e)}") - continue - - # Flush after each chunk - temp_file.flush() - os.fsync(temp_file.fileno()) - - processed += len(chunk) - logger.info(f"Export progress: {(processed/total_count)*100:.1f}%") - - # Make sure all data is written and file is closed - temp_file.flush() - os.fsync(temp_file.fileno()) - temp_file.close() - - # Open the file for reading and create response - response = FileResponse( - open(temp_file_path, 'rb'), + try: + export = Export.objects.get(id=export_id) + if export.status != 'completed': + return HttpResponseBadRequest("Export is not ready") + + # Get the data iterator from cache + cache_key = f'export_{export_id}_data' + rows = cache.get(cache_key) + if not rows: + return HttpResponseNotFound("Export data not found or expired") + + # Create the streaming response + pseudo_buffer = Echo() + writer = csv.writer(pseudo_buffer) + response = StreamingHttpResponse( + (writer.writerow(row) for row in rows), content_type='text/csv' ) - # Set explicit headers - filename = f"observations_export_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}.csv" - response['Content-Disposition'] = f'attachment; filename="{filename}"; filename*=UTF-8\'\'{filename}' - response['Content-Type'] = 'text/csv; charset=utf-8' - response['Content-Length'] = os.path.getsize(temp_file_path) - response['Cache-Control'] = 'no-cache, no-store, must-revalidate' - response['Pragma'] = 'no-cache' - response['Expires'] = '0' - response['X-Accel-Buffering'] = 'no' - - # Schedule file cleanup after response is sent - def cleanup_temp_file(response): - try: - os.unlink(temp_file_path) - except: - pass - return response - - response.close = cleanup_temp_file.__get__(response, FileResponse) - + timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S') + response['Content-Disposition'] = f'attachment; filename="observations_export_{timestamp}.csv"' return response + except Export.DoesNotExist: + return HttpResponseNotFound("Export not found") except Exception as e: - logger.exception("Export failed") - # Cleanup in case of error - if temp_file: - temp_file.close() - if temp_file_path and 
os.path.exists(temp_file_path): - try: - os.unlink(temp_file_path) - except: - pass - return JsonResponse( - {"error": f"Export failed: {str(e)}. Please try again or contact support."}, - status=500 - ) - - def get_status(self, observation: Observation) -> str: - """Determine observation status based on eradication data.""" - logger.debug("Getting status for observation %s", observation.eradication_result) - if observation.eradication_result: - return "eradicated" - if observation.reserved_by: - return "reserved" - return "untreated" + logger.error(f"Error streaming export: {str(e)}") + return HttpResponseServerError("Error generating export") @require_GET def search_address(request: Request) -> JsonResponse: diff --git a/vespadb/settings.py b/vespadb/settings.py index 6519d11..7682211 100644 --- a/vespadb/settings.py +++ b/vespadb/settings.py @@ -57,6 +57,7 @@ "django.contrib.messages", "django.contrib.staticfiles", "django_filters", + 'django_extensions', "django_celery_beat", "django_celery_results", "rest_framework", @@ -269,3 +270,6 @@ AWS_SES_REGION_ENDPOINT = "email.eu-west-1.amazonaws.com" DEFAULT_FROM_EMAIL = secrets["DEFAULT_FROM_EMAIL"] SERVER_EMAIL = secrets["DEFAULT_FROM_EMAIL"] +MEDIA_URL = '/media/' +MEDIA_ROOT = os.path.join(BASE_DIR, 'media') +EXPORTS_DIR = os.path.join(MEDIA_ROOT, 'exports')
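
Notes on the changes above. The Gunicorn flags added in entrypoint.sh switch the server to threaded (gthread) workers and add periodic worker recycling. For reference only, the same options expressed as a `gunicorn.conf.py` would look roughly like the sketch below; this file is not part of the change, and the repository keeps the options inline on the command line.

```python
# Sketch only: gunicorn.conf.py equivalent of the flags set in entrypoint.sh.
workers = 3
worker_class = "gthread"    # threaded workers: 3 processes x 4 threads serve requests concurrently
threads = 4
worker_connections = 1000
timeout = 1800
graceful_timeout = 300
keepalive = 65
max_requests = 1000         # recycle workers periodically to bound memory growth
max_requests_jitter = 50    # stagger the recycling so workers do not restart at once
bind = "0.0.0.0:8000"
wsgi_app = "vespadb.wsgi:application"
```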
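The export flow itself is now asynchronous: `export` creates an `Export` row and queues the `generate_export` Celery task, `export_status` reports progress, and `download_export` streams the cached CSV. The Vue store above polls every two seconds; the same round trip from any other HTTP client could look roughly like this sketch (the base URL, timeouts, and the `requests` dependency are assumptions, not part of the diff).

```python
# Sketch of the new async export round trip, client side. Endpoints, query parameters and
# response fields come from the views above; the base URL and timeouts are assumptions.
import time

import requests

BASE_URL = "http://localhost:8000"  # assumed; adjust to the deployed host


def download_observations_csv(filter_params: dict, out_path: str = "observations.csv") -> None:
    # 1. Kick off the export; the server creates an Export row and queues the Celery task.
    resp = requests.get(f"{BASE_URL}/observations/export", params=filter_params, timeout=30)
    resp.raise_for_status()
    export_id = resp.json()["export_id"]

    # 2. Poll export_status until the task reports completed or failed
    #    (the Vue store uses the same 2-second interval).
    while True:
        status = requests.get(
            f"{BASE_URL}/observations/export_status",
            params={"export_id": export_id},
            timeout=30,
        ).json()
        if status["status"] == "completed":
            break
        if status["status"] == "failed":
            raise RuntimeError(status.get("error") or "Export failed")
        time.sleep(2)

    # 3. Stream the finished CSV to disk.
    with requests.get(
        f"{BASE_URL}/observations/download_export/",
        params={"export_id": export_id},
        stream=True,
        timeout=60,
    ) as download:
        download.raise_for_status()
        with open(out_path, "wb") as fh:
            for chunk in download.iter_content(chunk_size=8192):
                fh.write(chunk)
```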
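`cleanup_old_exports` is declared as a plain `@shared_task`, and no schedule for it appears in this diff. Since the project already installs `django_celery_beat`, the periodic run can be configured through the admin, or with a static entry along the lines of the sketch below (the task path and the daily cadence are assumptions).

```python
# Sketch: a static beat entry for the cleanup task in vespadb/settings.py. The diff registers
# no schedule; with django_celery_beat the same schedule can instead be created in the admin.
from celery.schedules import crontab

CELERY_BEAT_SCHEDULE = {
    "cleanup-old-exports": {
        # Assumed task path: a @shared_task without an explicit name is registered
        # under its module path.
        "task": "vespadb.observations.tasks.generate_export.cleanup_old_exports",
        "schedule": crontab(hour=3, minute=0),  # daily, matching the 24 h retention window
    },
}
```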
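A completed export never touches disk: `generate_export` stores the full row list under the cache key `export_<id>_data` with a one-hour timeout, and `download_export` streams those rows back through the `Echo` pseudo-buffer. A quick way to check whether a finished export is still downloadable, e.g. from `python manage.py shell`, is a sketch like:

```python
# Django shell sketch: check whether a finished export's rows are still in the cache.
# The key format export_<id>_data comes from generate_export; everything else is standard ORM.
from django.core.cache import cache

from vespadb.observations.models import Export

export = Export.objects.filter(status="completed").order_by("-completed_at").first()
if export is None:
    print("No completed exports yet")
else:
    rows = cache.get(f"export_{export.id}_data")  # None once the 1-hour timeout has passed
    if rows is None:
        print(f"Export {export.id} has expired from the cache")
    else:
        header, *data = rows
        print(f"Export {export.id}: {len(data)} rows, columns: {header}")
```

Because rows are cached with `timeout=3600` while `cleanup_old_exports` only removes exports older than 24 hours, an `Export` row can still read `completed` after its cached data has expired; that is the case the "Export data not found or expired" branch in `download_export` covers.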