diff --git a/vespadb/observations/helpers.py b/vespadb/observations/helpers.py
index 04c866e..a7164f5 100644
--- a/vespadb/observations/helpers.py
+++ b/vespadb/observations/helpers.py
@@ -62,16 +62,14 @@ def parse_and_convert_to_cet(datetime_str: str) -> datetime:
     return parser.parse(datetime_str).astimezone(cet_tz)
 
 
-def retry_with_backoff(func: Callable[..., T], *args: Any, max_retries: int = 3, backoff_in_seconds: int = 2) -> Any:
+def retry_with_backoff(func: Callable[..., T], retries: int = 3, backoff_in_seconds: int = 2) -> Any:
     """Retry mechanism for retrying a function with a backoff strategy."""
-    attempt = 0
-    while attempt < max_retries:
+    for attempt in range(retries):
         try:
-            return func(*args)  # Call the function with the provided arguments
-        except OperationalError:
-            attempt += 1
-            if attempt < max_retries:
-                time.sleep(backoff_in_seconds)  # Wait before retrying
+            return func()
+        except Exception:
+            if attempt < retries - 1:
+                wait_time = backoff_in_seconds * (2 ** attempt)
+                time.sleep(wait_time)
             else:
                 raise
-    return None
diff --git a/vespadb/observations/views.py b/vespadb/observations/views.py
index 854588c..fcbdc29 100644
--- a/vespadb/observations/views.py
+++ b/vespadb/observations/views.py
@@ -633,38 +633,41 @@ def save_observations(self, valid_data: list[dict[str, Any]]) -> Response:
     @method_decorator(ratelimit(key="ip", rate="60/m", method="GET", block=True))
     @action(detail=False, methods=["get"], permission_classes=[AllowAny])
     def export(self, request: Request) -> Response:
-        export_format = request.query_params.get("export_format", "csv").lower()
-        queryset = self.filter_queryset(self.get_queryset())
-        paginator = Paginator(queryset, 1000)  # Process in batches of 1000 items
+        try:
+            export_format = request.query_params.get("export_format", "csv").lower()
+            queryset = self.filter_queryset(self.get_queryset())
+            paginator = Paginator(queryset, 1000)  # Process in batches of 1000 items
 
-        serialized_data = []
-        errors = []
+            serialized_data = []
+            errors = []
 
-        # Loop through each page in the paginator
-        for page_number in paginator.page_range:
-            page = paginator.page(page_number)
+            # Loop through each page in the paginator
+            for page_number in paginator.page_range:
+                page = paginator.page(page_number)
 
-            try:
-                # Try serializing the page
-                serializer = self.get_serializer(page, many=True)
-                serialized_data.extend(serializer.data)
-            except Exception as e:
-                # Retry with backoff for individual objects on failure
-                for obj in page.object_list:
-                    try:
-                        serialized_obj = retry_with_backoff(lambda: self.get_serializer(obj).data)
-                        serialized_data.append(serialized_obj)
-                    except Exception as inner_error:
-                        errors.append({
-                            "id": obj.id,
-                            "error": str(inner_error)
-                        })
-
-        if export_format == "csv":
-            return self.export_as_csv(serialized_data)
+            try:
+                # Try serializing the page
+                serializer = self.get_serializer(page, many=True)
+                serialized_data.extend(serializer.data)
+            except Exception:
+                # Retry with backoff for individual objects on failure
+                for obj in page.object_list:
+                    try:
+                        serialized_obj = retry_with_backoff(lambda: self.get_serializer(obj).data)
+                        serialized_data.append(serialized_obj)
+                    except Exception as inner_error:
+                        errors.append({
+                            "id": obj.id,
+                            "error": str(inner_error)
+                        })
+
+            if export_format == "csv":
+                return self.export_as_csv(serialized_data)
+
+            return JsonResponse({"data": serialized_data, "errors": errors}, safe=False)
+        except Exception as e:
+            return JsonResponse({"errors": [str(e)]}, safe=False, status=500)
 
-        return JsonResponse({"data": serialized_data, "errors": errors}, safe=False)
-
     def export_as_csv(self, data: list[dict[str, Any]]) -> HttpResponse:
         """Export the data as a CSV file."""
         response = HttpResponse(content_type="text/csv")