Skip to content

Commit

Permalink
TDL-26714: Fix ratelimit error and add retry for 5xx errors (#155)
Browse files Browse the repository at this point in the history
* Fix 502 and 429 error

* Fix pylint issue

* Replace batch size with concurrency_limit

* Fix rate limit issue

* Remove 404 log

* Increase buffer wait time

* Skip deleted tickets

* Move forward inf condition

* Add unit tests for deleted tickets

* Address review comments

* Fix pylint
  • Loading branch information
prijendev authored Nov 27, 2024
1 parent 93980f8 commit a40fd4c
Show file tree
Hide file tree
Showing 4 changed files with 345 additions and 43 deletions.
23 changes: 12 additions & 11 deletions tap_zendesk/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@


LOGGER = singer.get_logger()
# Default wait time for backoff
# Default wait time (in seconds) before retrying a 429 or 5xx error
DEFAULT_WAIT = 60
# Default wait time for backoff for conflict error
DEFAULT_WAIT_FOR_CONFLICT_ERROR = 10
Expand All @@ -35,7 +35,7 @@ class ZendeskForbiddenError(ZendeskError):
class ZendeskNotFoundError(ZendeskError):
pass

class ZendeskConflictError(ZendeskError):
class ZendeskConflictError(ZendeskBackoffError):
pass

class ZendeskUnprocessableEntityError(ZendeskError):
Expand Down Expand Up @@ -217,18 +217,18 @@ async def raise_for_error_for_async(response):
"""
Error-handling method that raises a custom exception; a class for each error code is defined above, each extending `ZendeskError`.
"""
response_json = {}
try:
response_json = await response.json()
except (ContentTypeError, ValueError) as e:
LOGGER.warning("Error decoding response from API. Exception: %s", e, exc_info=True)
except (ContentTypeError, ValueError):
# Invalid JSON response
response_json = {}

if response.status == 200:
return response_json
elif response.status == 429:
elif response.status == 429 or response.status >= 500:
# Get the 'Retry-After' header value, defaulting to 60 seconds if not present.
retry_after = response.headers.get("Retry-After", 1)
LOGGER.warning("Caught HTTP 429, retrying request in %s seconds", retry_after)
retry_after = int(response.headers.get("Retry-After", "0")) or DEFAULT_WAIT
LOGGER.warning("Caught HTTP %s, retrying request in %s seconds", response.status, retry_after)
# Wait for the specified time before retrying the request.
await async_sleep(int(retry_after))
elif response.status == 409:
Expand All @@ -254,16 +254,17 @@ async def raise_for_error_for_async(response):
),
),
)

DEFAULT_ERROR_OBJECT = ZendeskError if response.status < 500 else ZendeskBackoffError
exc = ERROR_CODE_EXCEPTION_MAPPING.get(response.status, {}).get(
"raise_exception", ZendeskError
"raise_exception", DEFAULT_ERROR_OBJECT
)
LOGGER.error(message)
raise exc(message, response) from None


@backoff.on_exception(
backoff.constant,
(ZendeskRateLimitError, ZendeskConflictError),
ZendeskBackoffError,
max_tries=5,
interval=0
)
Expand Down
36 changes: 25 additions & 11 deletions tap_zendesk/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import json
import datetime
import asyncio
import time
import pytz
from zenpy.lib.exception import APIException
from aiohttp import ClientSession
Expand All @@ -18,7 +19,9 @@

DEFAULT_PAGE_SIZE = 100
REQUEST_TIMEOUT = 300
DEFAULT_BATCH_SIZE = 700
CONCURRENCY_LIMIT = 20
# Reference: https://developer.zendesk.com/api-reference/introduction/rate-limits/#endpoint-rate-limits:~:text=List%20Audits%20for,requests%20per%20minute
AUDITS_REQUEST_PER_MINUTE = 450
START_DATE_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
HEADERS = {
'Content-Type': 'application/json',
Expand Down Expand Up @@ -266,7 +269,6 @@ class Tickets(CursorBasedExportStream):
replication_key = "generated_timestamp"
item_key = "tickets"
endpoint = "https://{}.zendesk.com/api/v2/incremental/tickets/cursor.json"
batch_size = DEFAULT_BATCH_SIZE

def sync(self, state): #pylint: disable=too-many-statements

Expand All @@ -276,9 +278,6 @@ def sync(self, state): #pylint: disable=too-many-statements
# https://developer.zendesk.com/documentation/ticketing/using-the-zendesk-api/side_loading/#supported-endpoints
tickets = self.get_objects(bookmark, side_load='metric_sets')

# Run this method to set batch size to fetch ticket audits and comments records in async way
self.check_access()

audits_stream = TicketAudits(self.client, self.config)
metrics_stream = TicketMetrics(self.client, self.config)
comments_stream = TicketComments(self.client, self.config)
Expand All @@ -295,6 +294,8 @@ def emit_sub_stream_metrics(sub_stream):
LOGGER.info("Syncing ticket_audits per ticket...")

ticket_ids = []
counter = 0
start_time = time.time()
for ticket in tickets:
zendesk_metrics.capture('ticket')

Expand All @@ -306,6 +307,9 @@ def emit_sub_stream_metrics(sub_stream):
# yielding stream name with record in a tuple as it is used for obtaining only the parent records while sync
yield (self.stream, ticket)

# Skip deleted tickets because they don't have audits or comments
if ticket.get('status') == 'deleted':
continue

if metrics_stream.is_selected() and ticket.get('metric_set'):
zendesk_metrics.capture('ticket_metric')
Expand All @@ -314,7 +318,7 @@ def emit_sub_stream_metrics(sub_stream):

# Check if the number of collected ticket IDs has reached the concurrency limit.
ticket_ids.append(ticket["id"])
if len(ticket_ids) >= self.batch_size:
if len(ticket_ids) >= CONCURRENCY_LIMIT:
# Process audits and comments in batches
records = self.sync_ticket_audits_and_comments(
comments_stream, audits_stream, ticket_ids)
Expand All @@ -327,6 +331,20 @@ def emit_sub_stream_metrics(sub_stream):
ticket_ids = []
# Write state after processing the batch.
singer.write_state(state)
counter += CONCURRENCY_LIMIT

# Check if the number of records processed in a minute has reached the limit.
if counter >= AUDITS_REQUEST_PER_MINUTE:
# Calculate elapsed time
elapsed_time = time.time() - start_time

# Calculate remaining time until the next minute, plus buffer of 2 more seconds
remaining_time = max(0, 60 - elapsed_time + 2)

# Sleep for the calculated time
time.sleep(remaining_time)
start_time = time.time()
counter = 0

# Check if there are any remaining ticket IDs after the loop.
if ticket_ids:
Expand Down Expand Up @@ -357,11 +375,7 @@ def check_access(self):
start_time = datetime.datetime.strptime(self.config['start_date'], START_DATE_FORMAT).timestamp()
HEADERS['Authorization'] = 'Bearer {}'.format(self.config["access_token"])

response = http.call_api(url, self.request_timeout, params={'start_time': start_time, 'per_page': 1}, headers=HEADERS)

# Rate limit are varies according to the zendesk account. So, we need to set the batch size dynamically.
# https://developer.zendesk.com/api-reference/introduction/rate-limits/
self.batch_size = int(response.headers.get('x-rate-limit', DEFAULT_BATCH_SIZE))
http.call_api(url, self.request_timeout, params={'start_time': start_time, 'per_page': 1}, headers=HEADERS)


class TicketAudits(Stream):
Expand Down
106 changes: 106 additions & 0 deletions test/unittests/test_async_ticket_audits.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,112 @@ def test_sync_audits_comments_stream__both_not_selected(
# Assertions
self.assertEqual(len(result), 2)

@patch('tap_zendesk.streams.time.sleep')
@patch("tap_zendesk.streams.Tickets.update_bookmark")
@patch("tap_zendesk.streams.Tickets.get_bookmark")
@patch("tap_zendesk.streams.Tickets.get_objects")
@patch("tap_zendesk.streams.Tickets.check_access")
@patch("tap_zendesk.streams.singer.write_state")
@patch("tap_zendesk.streams.zendesk_metrics.capture")
@patch("tap_zendesk.streams.LOGGER.info")
def test_sync_for_deleted_tickets(
self,
mock_info,
mock_capture,
mock_write_state,
mock_check_access,
mock_get_objects,
mock_get_bookmark,
mock_update_bookmark,
mock_sleep
):
"""
Test that sync does not extract records for audits and comments when both of them are not selected.
"""
# Mock the necessary data
state = {}
bookmark = "2023-01-01T00:00:00Z"
tickets = [
{"id": 1, "generated_timestamp": 1672531200, "fields": "duplicate", "status": "deleted"},
{"id": 2, "generated_timestamp": 1672531300, "fields": "duplicate"},
{"id": 3, "generated_timestamp": 1672531200, "fields": "duplicate", "status": "deleted"},
{"id": 4, "generated_timestamp": 1672531300, "fields": "duplicate"}
]
mock_get_bookmark.return_value = bookmark
mock_get_objects.return_value = tickets
streams.AUDITS_REQUEST_PER_MINUTE = 4
streams.CONCURRENCY_LIMIT = 2

# Create an instance of the Tickets class
instance = streams.Tickets(None, {})
instance.sync_ticket_audits_and_comments = MagicMock(return_value=[
(['audit1', 'audit2'], ['comment1', 'comment2']),
(['audit3'], ['comment3']),
])

# Run the sync method
result = list(instance.sync(state))

# Assertions
self.assertEqual(mock_write_state.call_count, 2)
# 4 tickets, 3 audits, 3 comments
self.assertEqual(len(result), 10)

@patch('tap_zendesk.streams.time.sleep')
@patch("tap_zendesk.streams.Tickets.update_bookmark")
@patch("tap_zendesk.streams.Tickets.get_bookmark")
@patch("tap_zendesk.streams.Tickets.get_objects")
@patch("tap_zendesk.streams.Tickets.check_access")
@patch("tap_zendesk.streams.singer.write_state")
@patch("tap_zendesk.streams.zendesk_metrics.capture")
@patch("tap_zendesk.streams.LOGGER.info")
def test_concurrency_for_audit_stream(
self,
mock_info,
mock_capture,
mock_write_state,
mock_check_access,
mock_get_objects,
mock_get_bookmark,
mock_update_bookmark,
mock_sleep
):
"""
Test that sync does not extract records for audits and comments when both of them are not selected.
"""
# Mock the necessary data
state = {}
bookmark = "2023-01-01T00:00:00Z"
tickets = [
{"id": 1, "generated_timestamp": 1672531200, "fields": "duplicate"},
{"id": 2, "generated_timestamp": 1672531300, "fields": "duplicate"},
{"id": 3, "generated_timestamp": 1672531200, "fields": "duplicate"},
{"id": 4, "generated_timestamp": 1672531300, "fields": "duplicate"},
{"id": 5, "generated_timestamp": 1672531300, "fields": "duplicate"},
{"id": 6, "generated_timestamp": 1672531300, "fields": "duplicate"},
{"id": 7, "generated_timestamp": 1672531300, "fields": "duplicate"},
{"id": 8, "generated_timestamp": 1672531300, "fields": "duplicate"},
{"id": 9, "generated_timestamp": 1672531300, "fields": "duplicate"}
]
mock_get_bookmark.return_value = bookmark
mock_get_objects.return_value = tickets
streams.AUDITS_REQUEST_PER_MINUTE = 4
streams.CONCURRENCY_LIMIT = 2

# Create an instance of the Tickets class
instance = streams.Tickets(None, {})
instance.sync_ticket_audits_and_comments = MagicMock(return_value=[
(['audit1', 'audit2'], ['comment1', 'comment2']),
(['audit3', 'audit4'], ['comment3', 'comment4']),
])

# Run the sync method
result = list(instance.sync(state))

# Assertions
self.assertEqual(mock_write_state.call_count, 5)
self.assertEqual(mock_sleep.call_count, 2)

@patch("tap_zendesk.streams.zendesk_metrics.capture")
@patch("tap_zendesk.streams.LOGGER.warning")
@patch(
Expand Down
Loading

0 comments on commit a40fd4c

Please sign in to comment.