diff --git a/entrypoint.sh b/entrypoint.sh
index a5b935a..63db785 100755
--- a/entrypoint.sh
+++ b/entrypoint.sh
@@ -34,8 +34,14 @@ echo "Load waarnemingen observation data via: python manage.py load_waarnemingen
# Start Gunicorn
echo "Starting Gunicorn..."
gunicorn --workers 3 \
+ --worker-class gthread \
+ --threads 4 \
+ --worker-connections 1000 \
--timeout 1800 \
+ --graceful-timeout 300 \
--keep-alive 65 \
+ --max-requests 1000 \
+ --max-requests-jitter 50 \
--bind 0.0.0.0:8000 \
vespadb.wsgi:application &
diff --git a/nginx.conf b/nginx.conf
index 1946f62..5bd3146 100644
--- a/nginx.conf
+++ b/nginx.conf
@@ -113,4 +113,4 @@ http {
proxy_busy_buffers_size 256k;
}
}
-}
\ No newline at end of file
+}
diff --git a/poetry.lock b/poetry.lock
index f403b7d..c30e111 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -726,6 +726,20 @@ files = [
asgiref = ">=3.6"
django = ">=4.2"
+[[package]]
+name = "django-extensions"
+version = "3.2.3"
+description = "Extensions for Django"
+optional = false
+python-versions = ">=3.6"
+files = [
+ {file = "django-extensions-3.2.3.tar.gz", hash = "sha256:44d27919d04e23b3f40231c4ab7af4e61ce832ef46d610cc650d53e68328410a"},
+ {file = "django_extensions-3.2.3-py3-none-any.whl", hash = "sha256:9600b7562f79a92cbf1fde6403c04fee314608fefbb595502e34383ae8203401"},
+]
+
+[package.dependencies]
+Django = ">=3.2"
+
[[package]]
name = "django-filter"
version = "23.5"
@@ -2796,4 +2810,4 @@ brotli = ["brotli"]
[metadata]
lock-version = "2.0"
python-versions = ">=3.11.6,<4.0"
-content-hash = "55104070732a4b70d487543187b0ef0936fbc4f1ac660a7886bd4508ec4260d4"
+content-hash = "550711048bca44df4e5917c00b7b05b0016617e390d57297ef3285cd214347b0"
diff --git a/pyproject.toml b/pyproject.toml
index 362ed32..9aa4a70 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -46,6 +46,7 @@ types-python-dateutil = "^2.9.0.20240316"
whitenoise = "^6.6.0"
django-ses = "^4.2.0"
tenacity = "^9.0.0"
+django-extensions = "^3.2.3"
[tool.poetry.group.dev.dependencies] # https://python-poetry.org/docs/master/managing-dependencies/
coverage = { extras = ["toml"], version = ">=7.4.1" }
ipython = ">=8.20.0"
diff --git a/src/components/MapPage.vue b/src/components/MapPage.vue
index 5d76b27..1f2c61d 100644
--- a/src/components/MapPage.vue
+++ b/src/components/MapPage.vue
@@ -15,6 +15,9 @@
Gerapporteerd nest
@@ -61,6 +64,7 @@ export default {
setup() {
const vespaStore = useVespaStore();
const searchQuery = ref('');
+ const isExporting = computed(() => vespaStore.isExporting);
const router = useRouter();
const selectedObservation = computed(() => vespaStore.selectedObservation);
const isEditing = computed(() => vespaStore.isEditing);
@@ -327,6 +331,7 @@ export default {
updateMarkerColor,
searchQuery,
searchAddress,
+ isExporting,
};
},
};
diff --git a/src/components/NavbarComponent.vue b/src/components/NavbarComponent.vue
index f1f4130..b11420c 100644
--- a/src/components/NavbarComponent.vue
+++ b/src/components/NavbarComponent.vue
@@ -19,7 +19,15 @@
Export
@@ -67,6 +75,7 @@ export default {
const isModalVisible = ref(false);
const modalTitle = ref('');
const modalMessage = ref('');
+ const isExporting = computed(() => vespaStore.isExporting);
watch(() => vespaStore.error, (newError) => {
if (newError) {
@@ -94,10 +103,18 @@ export default {
};
const exportData = async (format) => {
- await vespaStore.exportData(format);
- };
+ try {
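+        // Ignore repeated clicks while a previous export is still running.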
+ if (vespaStore.isExporting) return;
+
+ await vespaStore.exportData(format);
+ } catch (error) {
+ modalTitle.value = 'Error';
+ modalMessage.value = 'Er is een fout opgetreden tijdens het exporteren.';
+ isModalVisible.value = true;
+ }
+ };
- return { isLoggedIn, loadingAuth, username, logout, navigateToChangePassword, exportData, fileInput, isModalVisible, modalTitle, modalMessage };
+ return { isLoggedIn, loadingAuth, username, logout, navigateToChangePassword, exportData, fileInput, isModalVisible, modalTitle, modalMessage, isExporting };
},
mounted() {
var dropdownElementList = [].slice.call(document.querySelectorAll('.dropdown-toggle'));
diff --git a/src/stores/vespaStore.js b/src/stores/vespaStore.js
index d132687..26d3205 100644
--- a/src/stores/vespaStore.js
+++ b/src/stores/vespaStore.js
@@ -27,6 +27,7 @@ export const useVespaStore = defineStore('vespaStore', {
isEditing: false,
map: null,
viewMode: 'map',
+ isExporting: false,
filters: {
municipalities: [],
provinces: [],
@@ -307,21 +308,55 @@ export const useVespaStore = defineStore('vespaStore', {
}
},
async exportData(format) {
- const filterQuery = this.createFilterQuery();
- const url = `/observations/export?export_format=${format}&${filterQuery}`;
-
try {
- const response = await ApiService.get(url, { responseType: 'blob' });
- const blob = new Blob([response.data], { type: response.headers['content-type'] });
- const downloadUrl = window.URL.createObjectURL(blob);
- const link = document.createElement('a');
- link.href = downloadUrl;
- link.setAttribute('download', `export.${format}`);
- document.body.appendChild(link);
- link.click();
- link.remove();
+ this.isExporting = true; // Start loading indicator
+ const response = await ApiService.get(
+ `/observations/export?${this.createFilterQuery()}`
+ );
+
+ if (response.status === 200) {
+ const { export_id } = response.data;
+
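+        // Poll the status endpoint every 2 seconds until the Celery task reports completed or failed,
+        // then fetch the finished CSV as a blob and trigger a browser download.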
+ const checkStatus = async () => {
+ const statusResponse = await ApiService.get(
+ `/observations/export_status?export_id=${export_id}`
+ );
+
+ if (statusResponse.data.status === 'completed') {
+ const downloadResponse = await ApiService.get(
+ `/observations/download_export/?export_id=${export_id}`,
+ { responseType: 'blob' }
+ );
+
+ const blob = new Blob([downloadResponse.data], { type: 'text/csv' });
+ const url = window.URL.createObjectURL(blob);
+ const link = document.createElement('a');
+ link.href = url;
+ link.setAttribute('download', `observations_export_${new Date().getTime()}.csv`);
+ document.body.appendChild(link);
+ link.click();
+ document.body.removeChild(link);
+ window.URL.revokeObjectURL(url);
+ this.isExporting = false; // Stop loading indicator
+ return true;
+ } else if (statusResponse.data.status === 'failed') {
+ this.isExporting = false; // Stop loading indicator on error
+ throw new Error(statusResponse.data.error || 'Export failed');
+ }
+
+ return new Promise(resolve => {
+ setTimeout(async () => {
+ resolve(await checkStatus());
+ }, 2000);
+ });
+ };
+
+ await checkStatus();
+      } else {
+        this.isExporting = false; // Stop loading indicator if the export could not be started
+      }
} catch (error) {
+ this.isExporting = false; // Stop loading indicator on error
console.error('Error exporting data:', error);
+ throw error;
}
},
async fetchMunicipalitiesByProvinces(provinceIds) {
diff --git a/vespadb/observations/migrations/0033_export.py b/vespadb/observations/migrations/0033_export.py
new file mode 100644
index 0000000..b3b0306
--- /dev/null
+++ b/vespadb/observations/migrations/0033_export.py
@@ -0,0 +1,31 @@
+# Generated by Django 5.1.4 on 2024-12-18 16:03
+
+import django.db.models.deletion
+from django.conf import settings
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+
+ dependencies = [
+ ('observations', '0032_rename_wn_notes_observation_notes'),
+ migrations.swappable_dependency(settings.AUTH_USER_MODEL),
+ ]
+
+ operations = [
+ migrations.CreateModel(
+ name='Export',
+ fields=[
+ ('id', models.AutoField(primary_key=True, serialize=False)),
+ ('filters', models.JSONField(default=dict, help_text='Filters applied to the export')),
+ ('status', models.CharField(choices=[('pending', 'Pending'), ('processing', 'Processing'), ('completed', 'Completed'), ('failed', 'Failed')], default='pending', help_text='Status of the export', max_length=20)),
+ ('progress', models.IntegerField(default=0, help_text='Progress percentage of the export')),
+ ('file_path', models.CharField(blank=True, help_text='Path to the exported file', max_length=255, null=True)),
+ ('created_at', models.DateTimeField(auto_now_add=True, help_text='Datetime when the export was created')),
+ ('completed_at', models.DateTimeField(blank=True, help_text='Datetime when the export was completed', null=True)),
+ ('error_message', models.TextField(blank=True, help_text='Error message if the export failed', null=True)),
+ ('task_id', models.CharField(blank=True, help_text='Celery task ID for the export', max_length=255, null=True)),
+ ('user', models.ForeignKey(blank=True, help_text='User who requested the export', null=True, on_delete=django.db.models.deletion.SET_NULL, to=settings.AUTH_USER_MODEL)),
+ ],
+ ),
+ ]
diff --git a/vespadb/observations/models.py b/vespadb/observations/models.py
index a6c9684..55ab414 100644
--- a/vespadb/observations/models.py
+++ b/vespadb/observations/models.py
@@ -389,3 +389,29 @@ def save(self, *args: Any, **kwargs: Any) -> None:
self.province = municipality.province if municipality else None
super().save(*args, **kwargs)
+
+class Export(models.Model):
+ """Model for tracking observation exports."""
+ STATUS_CHOICES = (
+ ('pending', 'Pending'),
+ ('processing', 'Processing'),
+ ('completed', 'Completed'),
+ ('failed', 'Failed'),
+ )
+
+ id = models.AutoField(primary_key=True)
+ user = models.ForeignKey(
+ settings.AUTH_USER_MODEL,
+ on_delete=models.SET_NULL,
+ null=True,
+ blank=True,
+ help_text="User who requested the export",
+ )
+ filters = models.JSONField(default=dict, help_text="Filters applied to the export")
+ status = models.CharField(max_length=20, choices=STATUS_CHOICES, default='pending', help_text="Status of the export")
+ progress = models.IntegerField(default=0, help_text="Progress percentage of the export")
+ file_path = models.CharField(max_length=255, blank=True, null=True, help_text="Path to the exported file")
+ created_at = models.DateTimeField(auto_now_add=True, help_text="Datetime when the export was created")
+ completed_at = models.DateTimeField(blank=True, null=True, help_text="Datetime when the export was completed")
+ error_message = models.TextField(blank=True, null=True, help_text="Error message if the export failed")
+ task_id = models.CharField(max_length=255, blank=True, null=True, help_text="Celery task ID for the export")
diff --git a/vespadb/observations/serializers.py b/vespadb/observations/serializers.py
index 5f0cedb..71332aa 100644
--- a/vespadb/observations/serializers.py
+++ b/vespadb/observations/serializers.py
@@ -13,7 +13,7 @@
from rest_framework.request import Request
from vespadb.observations.helpers import parse_and_convert_to_cet, parse_and_convert_to_utc
-from vespadb.observations.models import EradicationResultEnum, Municipality, Observation, Province
+from vespadb.observations.models import EradicationResultEnum, Municipality, Observation, Province, Export
from vespadb.observations.utils import get_municipality_from_coordinates
from vespadb.users.models import VespaUser
@@ -484,3 +484,9 @@ class Meta:
model = Province
fields = ["id", "name"]
+
+
+class ExportSerializer(serializers.ModelSerializer):
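+    """Serializer exposing all fields of the Export model."""
+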
+ class Meta:
+ model = Export
+ fields = '__all__'
diff --git a/vespadb/observations/tasks/generate_export.py b/vespadb/observations/tasks/generate_export.py
new file mode 100644
index 0000000..21aef1f
--- /dev/null
+++ b/vespadb/observations/tasks/generate_export.py
@@ -0,0 +1,230 @@
+import csv
+import logging
+from datetime import datetime, timedelta
+from typing import Optional, Dict, Any, List, Set, Iterator
+from django.core.cache import cache
+from django.db import models, transaction
+from django.utils import timezone
+from celery import shared_task
+from vespadb.observations.models import Observation, Export
+from vespadb.users.models import VespaUser as User
+from vespadb.observations.serializers import user_read_fields, public_read_fields
+
+logger = logging.getLogger(__name__)
+
+CSV_HEADERS = [
+ "id", "created_datetime", "modified_datetime", "latitude", "longitude",
+ "source", "source_id", "nest_height", "nest_size", "nest_location",
+ "nest_type", "observation_datetime", "province", "eradication_date",
+ "municipality", "images", "anb_domain", "notes", "eradication_result",
+ "wn_id", "wn_validation_status", "nest_status"
+]
+
+class Echo:
+ """An object that implements just the write method of the file-like interface."""
+ def write(self, value):
+ """Write the value by returning it, instead of storing in a buffer."""
+ return value
+
+def get_status(observation: Observation) -> str:
+ """Get observation status string."""
+ if observation.eradication_result:
+ return "eradicated"
+ if observation.reserved_by:
+ return "reserved"
+ return "untreated"
+
+def _prepare_row_data(
+ observation: Observation,
+ is_admin: bool,
+ user_municipality_ids: Set[str]
+) -> List[str]:
+ """
+ Prepare a single row of data for the CSV export with error handling.
+ """
+ try:
+ # Determine allowed fields based on permissions
+        # Copy the shared field lists so extending them here does not mutate the module-level
+        # serializer field definitions between calls.
+        if is_admin or (observation.municipality_id in user_municipality_ids):
+            allowed_fields = list(user_read_fields)
+        else:
+            allowed_fields = list(public_read_fields)
+
+        allowed_fields.extend(["source_id", "latitude", "longitude", "anb_domain", "nest_status"])
+
+ row_data = []
+ for field in CSV_HEADERS:
+ try:
+ if field not in allowed_fields:
+ row_data.append("")
+ continue
+
+ if field == "latitude":
+ row_data.append(str(observation.location.y) if observation.location else "")
+ elif field == "longitude":
+ row_data.append(str(observation.location.x) if observation.location else "")
+ elif field in ["created_datetime", "modified_datetime", "observation_datetime"]:
+ datetime_val = getattr(observation, field, None)
+ if datetime_val:
+ datetime_val = datetime_val.replace(microsecond=0)
+ row_data.append(datetime_val.isoformat() + "Z")
+ else:
+ row_data.append("")
+ elif field == "province":
+ row_data.append(observation.province.name if observation.province else "")
+ elif field == "municipality":
+ row_data.append(observation.municipality.name if observation.municipality else "")
+ elif field == "anb_domain":
+ row_data.append(str(observation.anb))
+ elif field == "nest_status":
+ row_data.append(get_status(observation))
+ elif field == "source_id":
+ row_data.append(str(observation.source_id) if observation.source_id is not None else "")
+ else:
+ value = getattr(observation, field, "")
+ row_data.append(str(value) if value is not None else "")
+ except Exception as e:
+ logger.warning(f"Error processing field {field} for observation {observation.id}: {str(e)}")
+ row_data.append("")
+
+ return row_data
+ except Exception as e:
+ logger.error(f"Error preparing row data for observation {observation.id}: {str(e)}")
+ return [""] * len(CSV_HEADERS)
+
+def parse_boolean(value: str) -> bool:
+ """
+ Convert a string value to a boolean.
+ """
+ if isinstance(value, bool):
+ return value
+ if isinstance(value, str):
+ value_lower = value.lower()
+ if value_lower in {"true", "1"}:
+ return True
+ elif value_lower in {"false", "0"}:
+ return False
+ raise ValueError(f"Invalid boolean value: {value}")
+
+def generate_rows(queryset, is_admin: bool, user_municipality_ids: set) -> Iterator[List[str]]:
+ """Generate rows for CSV streaming."""
+ # First yield the headers
+ yield CSV_HEADERS
+
+ # Then yield the data rows
+ for observation in queryset:
+ try:
+ row = _prepare_row_data(observation, is_admin, user_municipality_ids)
+ yield row
+ except Exception as e:
+ logger.error(f"Error processing observation {observation.id}: {str(e)}")
+ continue
+
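+# soft_time_limit raises SoftTimeLimitExceeded inside the task shortly before the hard time_limit kills the
+# worker, so the except-block below can still mark the Export record as failed.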
+@shared_task(
+ name="generate_export",
+ max_retries=3,
+ default_retry_delay=60,
+ soft_time_limit=1700,
+ time_limit=1800,
+ acks_late=True
+)
+def generate_export(export_id: int, filters: Dict[str, Any], user_id: Optional[int] = None) -> Dict[str, Any]:
+ """
+ Generate CSV export of observations based on filters.
+
+ Args:
+ export_id: ID of the Export record
+ filters: Dictionary of filters to apply to the queryset
+ user_id: Optional ID of the user requesting the export
+
+ Returns:
+ Dictionary containing export status and details
+ """
+ logger.info(f"Starting export {export_id} for user {user_id}")
+ export = Export.objects.get(id=export_id)
+
+ try:
+ # Update export status
+ export.status = 'processing'
+ export.save()
+ logger.info(f"Export {export_id} status set to processing")
+
+ # Validate and preprocess filters
+ valid_fields = {field.name: field for field in Observation._meta.get_fields()}
+ processed_filters = {}
+ for key, value in filters.items():
+ if key in valid_fields:
+ field = valid_fields[key]
+ if isinstance(field, models.BooleanField):
+ try:
+ processed_filters[key] = parse_boolean(value)
+ except ValueError:
+ logger.error(f"Invalid boolean value for filter {key}: {value}")
+ continue
+ else:
+ processed_filters[key] = value
+
+ # Prepare queryset with optimizations
+ queryset = (Observation.objects
+ .filter(**processed_filters)
+ .select_related('province', 'municipality', 'reserved_by')
+ .order_by('id'))
+
+ total = queryset.count()
+ processed = 0
+
+ is_admin = False
+ user_municipality_ids = set()
+ if user_id:
+ try:
+ user = User.objects.get(id=user_id)
+ is_admin = user.is_superuser
+ user_municipality_ids = set(user.municipalities.values_list('id', flat=True))
+ except User.DoesNotExist:
+ pass
+
+ logger.info(f"Processing {total} observations for export {export_id}")
+
+ # Generate CSV data
+ rows = list(generate_rows(queryset, is_admin, user_municipality_ids))
+
+ # Store in cache
+ cache_key = f'export_{export_id}_data'
+ cache.set(cache_key, rows, timeout=3600) # Store for 1 hour
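+        # download_export streams these cached rows back to the client, so this timeout must outlive the
+        # frontend's polling and download window.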
+
+ # Update export record
+ with transaction.atomic():
+ export.status = 'completed'
+ export.completed_at = timezone.now()
+ export.progress = 100
+ export.save()
+
+ logger.info(f"Export {export_id} completed successfully")
+
+ return {
+ 'status': 'completed',
+ 'cache_key': cache_key,
+ 'total_processed': total
+ }
+
+ except Exception as e:
+ logger.exception(f"Export {export_id} failed: {str(e)}")
+ export.status = 'failed'
+ export.error_message = str(e)
+ export.save()
+ raise
+
+@shared_task
+def cleanup_old_exports() -> None:
+ """Clean up exports older than 24 hours."""
+ logger.info("Starting cleanup of old exports")
+ cutoff = timezone.now() - timedelta(days=1)
+ old_exports = Export.objects.filter(created_at__lt=cutoff)
+
+ for export in old_exports:
+ # Remove from cache if exists
+ cache_key = f'export_{export.id}_data'
+ cache.delete(cache_key)
+
+ # Delete the export record
+ export.delete()
+ logger.info(f"Cleaned up export {export.id}")
diff --git a/vespadb/observations/views.py b/vespadb/observations/views.py
index 9e5f00a..28b6b9b 100644
--- a/vespadb/observations/views.py
+++ b/vespadb/observations/views.py
@@ -1,50 +1,36 @@
"""Views for the observations app."""
+
import datetime
import io
import json
+import time
import logging
-import json
import csv
-from typing import TYPE_CHECKING, Any, Any, Union, TextIO, Union, List, Set, Optional
-import datetime
-import tempfile
+from typing import TYPE_CHECKING, Any, Union, Optional
+from django.conf import settings
+from django.http import FileResponse, HttpResponseNotFound
import os
-import logging
-from tenacity import retry, stop_after_attempt, wait_exponential
-from tenacity import (
- retry,
- stop_after_attempt,
- wait_exponential,
- retry_if_exception_type,
- before_log,
- after_log,
-)
-import time
-from typing import Generator, Optional
-from django.db import OperationalError, connection, transaction
-from django.core.exceptions import ValidationError
-import psycopg2
-from django.http import FileResponse
-import os
-import tempfile
from django.contrib.gis.db.models.functions import Transform
from django.contrib.gis.geos import GEOSGeometry
from django.core.cache import cache
from django.core.exceptions import PermissionDenied, ValidationError
from django.core.files.uploadedfile import InMemoryUploadedFile
-from django.core.paginator import Paginator
from django.db import transaction
from django.db.models import CharField, OuterRef, QuerySet, Subquery, Value
from django.db.models.functions import Coalesce
from django.db.utils import IntegrityError
-from django.http import HttpResponse, JsonResponse, StreamingHttpResponse, HttpRequest
+from django.http import HttpResponse, JsonResponse, HttpRequest
from django.db import connection
from django.utils.decorators import method_decorator
from django.utils.timezone import now
from django.views.decorators.http import require_GET
from django_filters.rest_framework import DjangoFilterBackend
from django_ratelimit.decorators import ratelimit
+from django.http import StreamingHttpResponse, HttpResponseBadRequest, HttpResponseNotFound, HttpResponseServerError
from drf_yasg import openapi
from drf_yasg.utils import swagger_auto_schema
from geopy.exc import GeocoderServiceError, GeocoderTimedOut
@@ -59,17 +45,20 @@
from rest_framework.serializers import BaseSerializer
from rest_framework.viewsets import ModelViewSet, ReadOnlyModelViewSet
from rest_framework_gis.filters import DistanceToPointFilter
-from vespadb.observations.serializers import user_read_fields, public_read_fields
from vespadb.observations.cache import invalidate_geojson_cache, invalidate_observation_cache
from vespadb.observations.filters import ObservationFilter
from vespadb.observations.helpers import parse_and_convert_to_utc
-from vespadb.observations.models import Municipality, Observation, Province, EradicationResultEnum
-from vespadb.observations.serializers import (
- MunicipalitySerializer,
- ObservationSerializer,
- ProvinceSerializer,
-)
+from vespadb.observations.models import Municipality, Observation, Province, Export
+from vespadb.observations.tasks.generate_export import generate_export
+from vespadb.observations.serializers import ObservationSerializer, MunicipalitySerializer, ProvinceSerializer
+
+from rest_framework.decorators import action
+from rest_framework.permissions import AllowAny
+from django.shortcuts import get_object_or_404
if TYPE_CHECKING:
from geopy.location import Location
@@ -77,24 +66,23 @@
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
+class Echo:
+ """An object that implements just the write method of the file-like interface."""
+ def write(self, value):
+ """Write the value by returning it, instead of storing in a buffer."""
+ return value
+
+
BBOX_LENGTH = 4
GEOJSON_REDIS_CACHE_EXPIRATION = 900 # 15 minutes
GET_REDIS_CACHE_EXPIRATION = 86400 # 1 day
+BATCH_SIZE = 150
CSV_HEADERS = [
"id", "created_datetime", "modified_datetime", "latitude", "longitude", "source", "source_id",
"nest_height", "nest_size", "nest_location", "nest_type", "observation_datetime",
"province", "eradication_date", "municipality", "images", "anb_domain",
"notes", "eradication_result", "wn_id", "wn_validation_status", "nest_status"
]
-BATCH_SIZE = 1000
-class ExportError(Exception):
- """Custom exception for export-related errors."""
- pass
-
-class QueryTimeoutError(Exception):
- """Custom exception for query timeout errors."""
- pass
-
class ObservationsViewSet(ModelViewSet): # noqa: PLR0904
"""ViewSet for the Observation model."""
@@ -653,362 +641,130 @@ def save_observations(self, valid_data: list[dict[str, Any]]) -> Response:
{"error": f"An error occurred during bulk import: {e!s}"}, status=status.HTTP_500_INTERNAL_SERVER_ERROR
)
- @retry(
- stop=stop_after_attempt(3),
- wait=wait_exponential(multiplier=1, min=4, max=10),
- retry_error_callback=lambda retry_state: None
- )
- def write_batch_to_file(
- self,
- writer: Any,
- batch: List[Observation],
- is_admin: bool,
- user_municipality_ids: Set[str]
- ) -> int:
- """
- Write a batch of observations to the CSV file with retry logic.
- Returns number of successfully written records.
- """
- successful_writes = 0
- for observation in batch:
- try:
- row_data = self._prepare_row_data(observation, is_admin, user_municipality_ids)
- writer.writerow(row_data)
- successful_writes += 1
- except Exception as e:
- logger.error(f"Error processing observation {observation.id}: {str(e)}")
- continue
- return successful_writes
-
- def _prepare_row_data(
- self,
- observation: Observation,
- is_admin: bool,
- user_municipality_ids: set[str]
- ) -> list[str]:
- """
- Prepare a single row of data for the CSV export with error handling.
- """
- try:
- # Determine allowed fields based on permissions
- if is_admin or (observation.municipality_id in user_municipality_ids):
- allowed_fields = user_read_fields
- else:
- allowed_fields = public_read_fields
-
- allowed_fields.extend(["source_id", "latitude", "longitude", "anb_domain", "nest_status"])
-
- row_data = []
- for field in CSV_HEADERS:
- try:
- if field not in allowed_fields:
- row_data.append("")
- continue
-
- if field == "latitude":
- row_data.append(str(observation.location.y) if observation.location else "")
- elif field == "longitude":
- row_data.append(str(observation.location.x) if observation.location else "")
- elif field in ["created_datetime", "modified_datetime", "observation_datetime"]:
- datetime_val = getattr(observation, field, None)
- if datetime_val:
- datetime_val = datetime_val.replace(microsecond=0)
- row_data.append(datetime_val.isoformat() + "Z")
- else:
- row_data.append("")
- elif field == "province":
- row_data.append(observation.province.name if observation.province else "")
- elif field == "municipality":
- row_data.append(observation.municipality.name if observation.municipality else "")
- elif field == "anb_domain":
- row_data.append(str(observation.anb))
- elif field == "nest_status":
- row_data.append(self.get_status(observation))
- elif field == "source_id":
- row_data.append(str(observation.source_id) if observation.source_id is not None else "")
- else:
- value = getattr(observation, field, "")
- row_data.append(str(value) if value is not None else "")
- except Exception as e:
- logger.warning(f"Error processing field {field} for observation {observation.id}: {str(e)}")
- row_data.append("")
-
- return row_data
- except Exception as e:
- logger.error(f"Error preparing row data for observation {observation.id}: {str(e)}")
- return [""] * len(CSV_HEADERS) # Return empty row in case of error
-
- @retry(
- stop=stop_after_attempt(3),
- wait=wait_exponential(multiplier=1, min=4, max=10),
- retry=retry_if_exception_type((OperationalError, psycopg2.OperationalError)),
- before=before_log(logger, logging.INFO),
- after=before_log(logger, logging.INFO)
- )
- def get_queryset_count(self, queryset: QuerySet) -> int:
- """Get queryset count with retry logic."""
- try:
- with transaction.atomic(), connection.cursor() as cursor:
- cursor.execute('SET statement_timeout TO 30000') # 30 seconds timeout
- return int(queryset.count())
- except (OperationalError, psycopg2.OperationalError) as e:
- logger.error(f"Error getting queryset count: {str(e)}")
- raise QueryTimeoutError("Query timed out while getting count") from e
-
- def get_chunk_with_retries(
- self,
- queryset: QuerySet,
- start: int,
- batch_size: int,
- max_retries: int = 3
- ) -> Optional[List[Observation]]:
- """Get a chunk of data with retries and error handling."""
- for attempt in range(max_retries):
- try:
- with transaction.atomic(), connection.cursor() as cursor:
- cursor.execute('SET statement_timeout TO 30000')
- chunk = list(
- queryset.select_related(
- 'province',
- 'municipality',
- 'reserved_by'
- )[start:start + batch_size]
- )
- return chunk
- except (OperationalError, psycopg2.OperationalError) as e:
- if attempt == max_retries - 1:
- logger.error(f"Failed to get chunk after {max_retries} attempts: {str(e)}")
- return None
- wait_time = (2 ** attempt) * 1 # Exponential backoff
- logger.warning(f"Retry {attempt + 1}/{max_retries} after {wait_time}s")
- time.sleep(wait_time)
- return None
-
- def _generate_csv_content(
- self,
- queryset: QuerySet,
- is_admin: bool,
- user_municipality_ids: Set[str]
- ) -> Generator[str, None, None]:
- """Generate CSV content in smaller chunks."""
- buffer = io.StringIO()
- writer = csv.writer(buffer)
-
- # Write headers first
- writer.writerow(CSV_HEADERS)
- data = buffer.getvalue()
- buffer.seek(0)
- buffer.truncate()
- yield data
-
- # Process in smaller chunks
- chunk_size = 100 # Kleinere chunk size
- total = queryset.count()
-
- for start in range(0, total, chunk_size):
- chunk = queryset.select_related(
- 'province',
- 'municipality',
- 'reserved_by'
- )[start:start + chunk_size]
-
- for observation in chunk:
- try:
- row_data = self._prepare_row_data(
- observation,
- is_admin,
- user_municipality_ids
- )
- writer.writerow(row_data)
- data = buffer.getvalue()
- buffer.seek(0)
- buffer.truncate()
- yield data
- except Exception as e:
- logger.error(f"Error processing observation {observation.id}: {str(e)}")
- continue
-
- buffer.close()
-
- def create_csv_generator(
- self,
- queryset: QuerySet,
- is_admin: bool,
- user_municipality_ids: Set[str],
- batch_size: int = BATCH_SIZE
- ) -> Generator[str, None, None]:
- """Create a generator for CSV streaming with improved error handling."""
- buffer = io.StringIO()
- writer = csv.writer(buffer)
-
- # Write headers
- writer.writerow(CSV_HEADERS)
- yield buffer.getvalue()
- buffer.seek(0)
- buffer.truncate(0)
-
- total_processed = 0
- successful_writes = 0
- error_count = 0
-
- try:
- total_count = self.get_queryset_count(queryset)
-
- # Process in chunks
- start = 0
- while True:
- chunk = self.get_chunk_with_retries(queryset, start, batch_size)
- if not chunk:
- break
-
- for observation in chunk:
- try:
- row_data = self._prepare_row_data(
- observation,
- is_admin,
- user_municipality_ids
- )
- writer.writerow(row_data)
- successful_writes += 1
- except Exception as e:
- error_count += 1
- logger.error(f"Error processing observation {observation.id}: {str(e)}")
- if error_count > total_count * 0.1: # If more than 10% errors
- raise ExportError("Too many errors during export")
- continue
-
- data = buffer.getvalue()
- yield data
- buffer.seek(0)
- buffer.truncate(0)
-
- total_processed += len(chunk)
- progress = (total_processed / total_count) * 100 if total_count else 0
- logger.info(
- f"Export progress: {progress:.1f}% ({total_processed}/{total_count}). "
- f"Successful: {successful_writes}, Errors: {error_count}"
- )
-
- start += batch_size
-
- except Exception as e:
- logger.exception("Error in CSV generator")
- raise ExportError(f"Export failed: {str(e)}") from e
- finally:
- buffer.close()
-
@method_decorator(ratelimit(key="ip", rate="60/m", method="GET", block=True))
@action(detail=False, methods=["get"], permission_classes=[AllowAny])
- def export(self, request: HttpRequest) -> Union[FileResponse, JsonResponse]:
- """Export observations as CSV using temporary file approach."""
- temp_file = None
- temp_file_path = None
-
- try:
- # Create temporary file
- temp_file = tempfile.NamedTemporaryFile(mode='w+', delete=False, encoding='utf-8-sig')
- temp_file_path = temp_file.name
-
- writer = csv.writer(temp_file)
- writer.writerow(CSV_HEADERS)
+ def export(self, request: HttpRequest) -> JsonResponse:
+ """Initiate the export of observations and trigger a Celery task."""
+ # Initialize the filterset
+ filterset = self.filterset_class(data=request.GET, queryset=self.get_queryset())
+
+ # Validate the filterset
+ if not filterset.is_valid():
+ return JsonResponse({"error": filterset.errors}, status=400)
+
+ # Prepare the filter parameters
+ filters = {key: value for key, value in request.GET.items()}
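+        # The raw query parameters are persisted on the Export record and handed to the Celery task,
+        # which re-applies them when rebuilding the queryset asynchronously.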
+
+ # Create an Export record
+ export = Export.objects.create(
+ user=request.user if request.user.is_authenticated else None,
+ filters=filters,
+ status='pending',
+ )
- # Get user permissions
- if request.user.is_authenticated:
- user_municipality_ids = set(request.user.municipalities.values_list("id", flat=True))
- is_admin = request.user.is_superuser
- else:
- user_municipality_ids = set()
- is_admin = False
+ # Trigger the Celery task
+ task = generate_export.delay(
+ export.id,
+ filters,
+ user_id=request.user.id if request.user.is_authenticated else None
+ )
+
+ # Update the Export record with the task ID
+ export.task_id = task.id
+ export.save()
+
+ return JsonResponse({
+ 'export_id': export.id,
+ 'task_id': task.id,
+ })
- # Get filtered queryset with optimizations
- queryset = self.filter_queryset(
- self.get_queryset().select_related('province', 'municipality', 'reserved_by')
+ @swagger_auto_schema(
+ operation_description="Check the status of an export.",
+ manual_parameters=[
+ openapi.Parameter(
+ 'export_id',
+ openapi.IN_QUERY,
+ description="The ID of the export to check the status of.",
+ type=openapi.TYPE_INTEGER,
+ required=True,
)
+ ],
+ responses={
+ 200: openapi.Schema(
+ type=openapi.TYPE_OBJECT,
+ properties={
+ 'status': openapi.Schema(type=openapi.TYPE_STRING),
+ 'progress': openapi.Schema(type=openapi.TYPE_INTEGER),
+ 'error': openapi.Schema(type=openapi.TYPE_STRING, nullable=True),
+ 'download_url': openapi.Schema(type=openapi.TYPE_STRING, nullable=True),
+ },
+ ),
+ 400: "Bad Request",
+ 404: "Export not found",
+ },
+ )
+ @action(detail=False, methods=["get"])
+ def export_status(self, request: HttpRequest) -> JsonResponse:
+ """Check export status."""
+ export_id = request.GET.get('export_id')
+ if not export_id:
+ logger.error("Export ID not provided")
+ return JsonResponse({"error": "Export ID is required"}, status=400)
+
+ try:
+ export = get_object_or_404(Export, id=export_id)
+ except Exception as e:
+ logger.exception(f"Export ID {export_id} not found or invalid: {str(e)}")
+ return JsonResponse({"error": f"Export ID {export_id} not found"}, status=404)
+
+ if export.status == 'completed':
+ download_url = request.build_absolute_uri(f'/observations/download_export/?export_id={export_id}')
+ return JsonResponse({
+ 'status': 'completed',
+ 'download_url': download_url
+ })
+
+ return JsonResponse({
+ 'status': export.status,
+ 'progress': export.progress,
+ 'error': export.error_message
+ })
- # Use much smaller chunk size
- chunk_size = 100
- total_count = queryset.count()
- processed = 0
+ @action(detail=False, methods=["get"])
+ def download_export(self, request: HttpRequest) -> Union[StreamingHttpResponse, HttpResponse]:
+ """Stream the export directly to the user."""
+ export_id = request.GET.get('export_id')
+ if not export_id:
+ return HttpResponseBadRequest("Export ID is required")
- # Process in chunks with periodic flushes
- for start in range(0, total_count, chunk_size):
- chunk = queryset[start:start + chunk_size]
-
- for observation in chunk:
- try:
- row_data = self._prepare_row_data(
- observation,
- is_admin,
- user_municipality_ids
- )
- writer.writerow(row_data)
- except Exception as e:
- logger.error(f"Error processing observation {observation.id}: {str(e)}")
- continue
-
- # Flush after each chunk
- temp_file.flush()
- os.fsync(temp_file.fileno())
-
- processed += len(chunk)
- logger.info(f"Export progress: {(processed/total_count)*100:.1f}%")
-
- # Make sure all data is written and file is closed
- temp_file.flush()
- os.fsync(temp_file.fileno())
- temp_file.close()
-
- # Open the file for reading and create response
- response = FileResponse(
- open(temp_file_path, 'rb'),
+ try:
+ export = Export.objects.get(id=export_id)
+ if export.status != 'completed':
+ return HttpResponseBadRequest("Export is not ready")
+
+ # Get the data iterator from cache
+ cache_key = f'export_{export_id}_data'
+ rows = cache.get(cache_key)
+ if not rows:
+ return HttpResponseNotFound("Export data not found or expired")
+
+ # Create the streaming response
+ pseudo_buffer = Echo()
+ writer = csv.writer(pseudo_buffer)
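+            # csv.writer "writes" to Echo, whose write() simply returns the rendered line, so rows are
+            # streamed to the client as they are serialized instead of being buffered in memory.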
+ response = StreamingHttpResponse(
+ (writer.writerow(row) for row in rows),
content_type='text/csv'
)
- # Set explicit headers
- filename = f"observations_export_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
- response['Content-Disposition'] = f'attachment; filename="{filename}"; filename*=UTF-8\'\'{filename}'
- response['Content-Type'] = 'text/csv; charset=utf-8'
- response['Content-Length'] = os.path.getsize(temp_file_path)
- response['Cache-Control'] = 'no-cache, no-store, must-revalidate'
- response['Pragma'] = 'no-cache'
- response['Expires'] = '0'
- response['X-Accel-Buffering'] = 'no'
-
- # Schedule file cleanup after response is sent
- def cleanup_temp_file(response):
- try:
- os.unlink(temp_file_path)
- except:
- pass
- return response
-
- response.close = cleanup_temp_file.__get__(response, FileResponse)
-
+ timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
+ response['Content-Disposition'] = f'attachment; filename="observations_export_{timestamp}.csv"'
return response
+ except Export.DoesNotExist:
+ return HttpResponseNotFound("Export not found")
except Exception as e:
- logger.exception("Export failed")
- # Cleanup in case of error
- if temp_file:
- temp_file.close()
- if temp_file_path and os.path.exists(temp_file_path):
- try:
- os.unlink(temp_file_path)
- except:
- pass
- return JsonResponse(
- {"error": f"Export failed: {str(e)}. Please try again or contact support."},
- status=500
- )
-
- def get_status(self, observation: Observation) -> str:
- """Determine observation status based on eradication data."""
- logger.debug("Getting status for observation %s", observation.eradication_result)
- if observation.eradication_result:
- return "eradicated"
- if observation.reserved_by:
- return "reserved"
- return "untreated"
+ logger.error(f"Error streaming export: {str(e)}")
+ return HttpResponseServerError("Error generating export")
@require_GET
def search_address(request: Request) -> JsonResponse:
diff --git a/vespadb/settings.py b/vespadb/settings.py
index 6519d11..7682211 100644
--- a/vespadb/settings.py
+++ b/vespadb/settings.py
@@ -57,6 +57,7 @@
"django.contrib.messages",
"django.contrib.staticfiles",
"django_filters",
+    "django_extensions",
"django_celery_beat",
"django_celery_results",
"rest_framework",
@@ -269,3 +270,6 @@
AWS_SES_REGION_ENDPOINT = "email.eu-west-1.amazonaws.com"
DEFAULT_FROM_EMAIL = secrets["DEFAULT_FROM_EMAIL"]
SERVER_EMAIL = secrets["DEFAULT_FROM_EMAIL"]
+MEDIA_URL = "/media/"
+MEDIA_ROOT = os.path.join(BASE_DIR, "media")
+EXPORTS_DIR = os.path.join(MEDIA_ROOT, "exports")