Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TDL-15454 removed the buffer system #77

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
9 changes: 8 additions & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,19 @@ jobs:
virtualenv -p python3 /usr/local/share/virtualenvs/tap-zendesk
source /usr/local/share/virtualenvs/tap-zendesk/bin/activate
pip install .[test]
pip install coverage
- run:
name: 'pylint'
command: |
source /usr/local/share/virtualenvs/tap-zendesk/bin/activate
make test
pylint tap_zendesk -d missing-docstring,invalid-name,line-too-long,too-many-locals,too-few-public-methods,fixme,stop-iteration-return,too-many-branches,useless-import-alias,no-else-return,logging-not-lazy
nosetests --with-coverage --cover-erase --cover-package=tap_zendesk --cover-html-dir=htmlcov test/unittests
coverage html
- add_ssh_keys
- store_test_results:
path: test_output/report.xml
- store_artifacts:
path: htmlcov
- run:
name: 'Integration Tests'
command: |
Expand Down
8 changes: 5 additions & 3 deletions tap_zendesk/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

LOGGER = singer.get_logger()


REQUIRED_CONFIG_KEYS = [
"start_date",
"subdomain",
Expand Down Expand Up @@ -46,9 +47,9 @@ def request_metrics_patch(self, method, url, **kwargs):
Session.request = request_metrics_patch
# end patch

def do_discover(client):
def do_discover(client, config):
LOGGER.info("Starting discover")
catalog = {"streams": discover_streams(client)}
catalog = {"streams": discover_streams(client, config)}
json.dump(catalog, sys.stdout, indent=2)
LOGGER.info("Finished discover")

Expand Down Expand Up @@ -199,7 +200,8 @@ def main():
LOGGER.error("""No suitable authentication keys provided.""")

if parsed_args.discover:
do_discover(client)
# passing the config to check the authentication in the do_discover method
do_discover(client, parsed_args.config)
elif parsed_args.catalog:
state = parsed_args.state
do_sync(client, parsed_args.catalog, state, parsed_args.config)
46 changes: 41 additions & 5 deletions tap_zendesk/discover.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
import os
import json
import singer
import zenpy
from tap_zendesk.streams import STREAMS
from tap_zendesk.http import ZendeskForbiddenError

LOGGER = singer.get_logger()

def get_abs_path(path):
return os.path.join(os.path.dirname(os.path.realpath(__file__)), path)
Expand All @@ -20,12 +24,44 @@ def load_shared_schema_refs():

return shared_schema_refs

def discover_streams(client):
def discover_streams(client, config):
streams = []
error_list = []
refs = load_shared_schema_refs()

for s in STREAMS.values():
s = s(client)
schema = singer.resolve_schema_references(s.load_schema(), refs)
streams.append({'stream': s.name, 'tap_stream_id': s.name, 'schema': schema, 'metadata': s.load_metadata()})

for stream in STREAMS.values():
# for each stream in the `STREAMS` check if the user has the permission to access the data of that stream
stream = stream(client, config)
schema = singer.resolve_schema_references(stream.load_schema(), refs)
try:
# Here it call the check_access method to check whether stream have read permission or not.
# If stream does not have read permission then append that stream name to list and at the end of all streams
# raise forbidden error with proper message containing stream names.
stream.check_access()
except ZendeskForbiddenError as e:
error_list.append(stream.name) # Append stream name to the error_list
except zenpy.lib.exception.APIException as e:
args0 = json.loads(e.args[0])
err = args0.get('error')

# check if the error is of type dictionary and the message retrieved from the dictionary
# is the expected message. If so, only then print the logger message and return the schema
if isinstance(err, dict):
if err.get('message', None) == "You do not have access to this page. Please contact the account owner of this help desk for further help.":
error_list.append(stream.name)
elif args0.get('description') == "You are missing the following required scopes: read":
error_list.append(stream.name)
else:
raise e from None # raise error if it is other than 403 forbidden error

streams.append({'stream': stream.name, 'tap_stream_id': stream.name, 'schema': schema, 'metadata': stream.load_metadata()})

if error_list:
streams_name = ", ".join(error_list)
message = "HTTP-error-code: 403, Error: You are missing the following required scopes: read. "\
"The account credentials supplied do not have read access for the following stream(s): {}".format(streams_name)
raise ZendeskForbiddenError(message)


return streams
169 changes: 148 additions & 21 deletions tap_zendesk/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,32 +2,154 @@
import backoff
import requests
import singer
from requests.exceptions import Timeout, HTTPError



LOGGER = singer.get_logger()


class ZendeskError(Exception):
def __init__(self, message=None, response=None):
super().__init__(message)
self.message = message
self.response = response

class ZendeskBackoffError(ZendeskError):
pass

class ZendeskBadRequestError(ZendeskError):
pass

class ZendeskUnauthorizedError(ZendeskError):
pass

class ZendeskForbiddenError(ZendeskError):
pass

class ZendeskNotFoundError(ZendeskError):
pass

class ZendeskConflictError(ZendeskError):
pass

class ZendeskUnprocessableEntityError(ZendeskError):
pass

class ZendeskRateLimitError(ZendeskBackoffError):
pass

class ZendeskInternalServerError(ZendeskBackoffError):
pass

class ZendeskNotImplementedError(ZendeskBackoffError):
pass

class ZendeskBadGatewayError(ZendeskBackoffError):
pass

class ZendeskServiceUnavailableError(ZendeskBackoffError):
pass

ERROR_CODE_EXCEPTION_MAPPING = {
400: {
"raise_exception": ZendeskBadRequestError,
"message": "A validation exception has occurred."
},
401: {
"raise_exception": ZendeskUnauthorizedError,
"message": "The access token provided is expired, revoked, malformed or invalid for other reasons."
},
403: {
"raise_exception": ZendeskForbiddenError,
"message": "You are missing the following required scopes: read"
},
404: {
"raise_exception": ZendeskNotFoundError,
"message": "The resource you have specified cannot be found."
},
409: {
"raise_exception": ZendeskConflictError,
"message": "The API request cannot be completed because the requested operation would conflict with an existing item."
},
422: {
"raise_exception": ZendeskUnprocessableEntityError,
"message": "The request content itself is not processable by the server."
},
429: {
"raise_exception": ZendeskRateLimitError,
"message": "The API rate limit for your organisation/application pairing has been exceeded."
},
500: {
"raise_exception": ZendeskInternalServerError,
"message": "The server encountered an unexpected condition which prevented" \
" it from fulfilling the request."
},
501: {
"raise_exception": ZendeskNotImplementedError,
"message": "The server does not support the functionality required to fulfill the request."
},
502: {
"raise_exception": ZendeskBadGatewayError,
"message": "Server received an invalid response."
},
503: {
"raise_exception": ZendeskServiceUnavailableError,
"message": "API service is currently unavailable."
}
}
def is_fatal(exception):
status_code = exception.response.status_code

if status_code == 429:
sleep_time = int(exception.response.headers['Retry-After'])
LOGGER.info("Caught HTTP 429, retrying request in %s seconds", sleep_time)
sleep(sleep_time)
return False

return 400 <= status_code < 500
if status_code in [429, 503]:
# If status_code is 429 or 503 then checking whether response header has 'Retry-After' attribute or not.
# If response header has 'Retry-After' attribute then retry the error otherwise raise the error directly.
retry_after = exception.response.headers.get('Retry-After')
if retry_after:
sleep_time = int(retry_after)
LOGGER.info("Caught HTTP %s, retrying request in %s seconds", status_code, sleep_time)
sleep(sleep_time)
return False
else:
return True

return 400 <=status_code < 500

def raise_for_error(response):
""" Error handling method which throws custom error. Class for each error defined above which extends `ZendeskError`.
This method map the status code with `ERROR_CODE_EXCEPTION_MAPPING` dictionary and accordingly raise the error.
If status_code is 200 then simply return json response.
"""
try:
response_json = response.json()
except Exception: # pylint: disable=broad-except
response_json = {}
if response.status_code != 200:
if response_json.get('error'):
message = "HTTP-error-code: {}, Error: {}".format(response.status_code, response_json.get('error'))
else:
message = "HTTP-error-code: {}, Error: {}".format(
response.status_code,
response_json.get("message", ERROR_CODE_EXCEPTION_MAPPING.get(
response.status_code, {}).get("message", "Unknown Error")))
exc = ERROR_CODE_EXCEPTION_MAPPING.get(
response.status_code, {}).get("raise_exception", ZendeskError)
raise exc(message, response) from None

@backoff.on_exception(backoff.expo,
requests.exceptions.HTTPError,
(HTTPError, ZendeskBackoffError),
max_tries=10,
giveup=is_fatal)
def call_api(url, params, headers):
response = requests.get(url, params=params, headers=headers)
response.raise_for_status()
@backoff.on_exception(backoff.expo,
(ConnectionError, Timeout),#As ConnectionError error and timeout error does not have attribute status_code,
max_tries=10, # here we added another backoff expression.
factor=2)
def call_api(url, request_timeout, params, headers):
response = requests.get(url, params=params, headers=headers, timeout=request_timeout) # Pass request timeout
raise_for_error(response)
return response

def get_cursor_based(url, access_token, cursor=None, **kwargs):
def get_cursor_based(url, access_token, request_timeout, cursor=None, **kwargs):
headers = {
'Content-Type': 'application/json',
'Accept': 'application/json',
Expand All @@ -43,7 +165,7 @@ def get_cursor_based(url, access_token, cursor=None, **kwargs):
if cursor:
params['page[after]'] = cursor

response = call_api(url, params=params, headers=headers)
response = call_api(url, request_timeout, params=params, headers=headers)
response_json = response.json()

yield response_json
Expand All @@ -54,13 +176,13 @@ def get_cursor_based(url, access_token, cursor=None, **kwargs):
cursor = response_json['meta']['after_cursor']
params['page[after]'] = cursor

response = call_api(url, params=params, headers=headers)
response = call_api(url, request_timeout, params=params, headers=headers)
response_json = response.json()

yield response_json
has_more = response_json['meta']['has_more']

def get_offset_based(url, access_token, **kwargs):
def get_offset_based(url, access_token, request_timeout, **kwargs):
headers = {
'Content-Type': 'application/json',
'Accept': 'application/json',
Expand All @@ -73,21 +195,21 @@ def get_offset_based(url, access_token, **kwargs):
**kwargs.get('params', {})
}

response = call_api(url, params=params, headers=headers)
response = call_api(url, request_timeout, params=params, headers=headers)
response_json = response.json()

yield response_json

next_url = response_json.get('next_page')

while next_url:
response = call_api(next_url, params=None, headers=headers)
response = call_api(next_url, request_timeout, params=None, headers=headers)
response_json = response.json()

yield response_json
next_url = response_json.get('next_page')

def get_incremental_export(url, access_token, start_time):
def get_incremental_export(url, access_token, request_timeout, start_time):
headers = {
'Content-Type': 'application/json',
'Accept': 'application/json',
Expand All @@ -96,7 +218,7 @@ def get_incremental_export(url, access_token, start_time):

params = {'start_time': start_time.timestamp()}

response = call_api(url, params=params, headers=headers)
response = call_api(url, request_timeout, params=params, headers=headers)
response_json = response.json()

yield response_json
Expand All @@ -107,8 +229,13 @@ def get_incremental_export(url, access_token, start_time):
cursor = response_json['after_cursor']

params = {'cursor': cursor}
response = requests.get(url, params=params, headers=headers)
response.raise_for_status()
# Replaced below line of code with call_api method
# response = requests.get(url, params=params, headers=headers)
# response.raise_for_status()
# Because it doing the same as call_api. So, now error handling will work properly with backoff
# as earlier backoff was not possible
response = call_api(url, request_timeout, params=params, headers=headers)

response_json = response.json()

yield response_json
Expand Down
Loading