Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TDL-15454 removed the buffer system #77

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
9 changes: 8 additions & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,19 @@ jobs:
virtualenv -p python3 /usr/local/share/virtualenvs/tap-zendesk
source /usr/local/share/virtualenvs/tap-zendesk/bin/activate
pip install .[test]
pip install coverage
- run:
name: 'pylint'
command: |
source /usr/local/share/virtualenvs/tap-zendesk/bin/activate
make test
pylint tap_zendesk -d missing-docstring,invalid-name,line-too-long,too-many-locals,too-few-public-methods,fixme,stop-iteration-return,too-many-branches,useless-import-alias,no-else-return,logging-not-lazy
nosetests --with-coverage --cover-erase --cover-package=tap_zendesk --cover-html-dir=htmlcov test/unittests
coverage html
- add_ssh_keys
- store_test_results:
path: test_output/report.xml
- store_artifacts:
path: htmlcov
- run:
name: 'Integration Tests'
command: |
Expand Down
55 changes: 5 additions & 50 deletions tap_zendesk/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import singer
from singer import metadata
from singer import utils
from singer.metrics import Point
from tap_zendesk import metrics as zendesk_metrics
from tap_zendesk import http

Expand Down Expand Up @@ -259,30 +258,6 @@ class Tickets(CursorBasedExportStream):
item_key = "tickets"
endpoint = "https://{}.zendesk.com/api/v2/incremental/tickets/cursor.json"

last_record_emit = {}
buf = {}
buf_time = 60
def _buffer_record(self, record):
stream_name = record[0].tap_stream_id
if self.last_record_emit.get(stream_name) is None:
self.last_record_emit[stream_name] = utils.now()

if self.buf.get(stream_name) is None:
self.buf[stream_name] = []
self.buf[stream_name].append(record)

if (utils.now() - self.last_record_emit[stream_name]).total_seconds() > self.buf_time:
self.last_record_emit[stream_name] = utils.now()
return True

return False

def _empty_buffer(self):
for stream_name, stream_buf in self.buf.items():
for rec in stream_buf:
yield rec
self.buf[stream_name] = []

def sync(self, state): #pylint: disable=too-many-statements
bookmark = self.get_bookmark(state)
tickets = self.get_objects(bookmark)
Expand All @@ -291,14 +266,6 @@ def sync(self, state): #pylint: disable=too-many-statements
metrics_stream = TicketMetrics(self.client, self.config)
comments_stream = TicketComments(self.client, self.config)

def emit_sub_stream_metrics(sub_stream):
if sub_stream.is_selected():
singer.metrics.log(LOGGER, Point(metric_type='counter',
metric=singer.metrics.Metric.record_count,
value=sub_stream.count,
tags={'endpoint':sub_stream.stream.tap_stream_id}))
sub_stream.count = 0

if audits_stream.is_selected():
LOGGER.info("Syncing ticket_audits per ticket...")

Expand All @@ -308,12 +275,12 @@ def emit_sub_stream_metrics(sub_stream):
self.update_bookmark(state, utils.strftime(generated_timestamp_dt))

ticket.pop('fields') # NB: Fields is a duplicate of custom_fields, remove before emitting
should_yield = self._buffer_record((self.stream, ticket))
yield (self.stream, ticket)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason for yielding self.stream? instead of just ticket

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

because the buffer_records also had self.stream passed as an argument. And it helps in identifying which stream is yielded.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes because earlier it is required to identify the stream in function _buffer_record as there is login on the stream name, I don't see that logic now, do we still need this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added a comment regarding why we are yielding stream name and rec in the code


if audits_stream.is_selected():
try:
for audit in audits_stream.sync(ticket["id"]):
self._buffer_record(audit)
yield audit
except HTTPError as e:
if e.response.status_code == 404:
LOGGER.warning("Unable to retrieve audits for ticket (ID: %s), record not found", ticket['id'])
Expand All @@ -323,7 +290,7 @@ def emit_sub_stream_metrics(sub_stream):
if metrics_stream.is_selected():
try:
for metric in metrics_stream.sync(ticket["id"]):
self._buffer_record(metric)
yield metric
except HTTPError as e:
if e.response.status_code == 404:
LOGGER.warning("Unable to retrieve metrics for ticket (ID: %s), record not found", ticket['id'])
Expand All @@ -335,26 +302,14 @@ def emit_sub_stream_metrics(sub_stream):
# add ticket_id to ticket_comment so the comment can
# be linked back to it's corresponding ticket
for comment in comments_stream.sync(ticket["id"]):
self._buffer_record(comment)
yield comment
except HTTPError as e:
if e.response.status_code == 404:
LOGGER.warning("Unable to retrieve comments for ticket (ID: %s), record not found", ticket['id'])
else:
raise e

if should_yield:
for rec in self._empty_buffer():
yield rec
emit_sub_stream_metrics(audits_stream)
emit_sub_stream_metrics(metrics_stream)
emit_sub_stream_metrics(comments_stream)
singer.write_state(state)

for rec in self._empty_buffer():
yield rec
emit_sub_stream_metrics(audits_stream)
emit_sub_stream_metrics(metrics_stream)
emit_sub_stream_metrics(comments_stream)
singer.write_state(state)
singer.write_state(state)

class TicketAudits(Stream):
Expand Down
186 changes: 186 additions & 0 deletions test/unittests/test_yield_records.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
from tap_zendesk import LOGGER, oauth_auth
import unittest
from unittest import mock
from unittest.mock import patch
from tap_zendesk.streams import Stream, Tickets, zendesk_metrics
from tap_zendesk.sync import sync_stream
from tap_zendesk import Zenpy
import json

class Zenpy():
def __init__(self) -> None:
pass

def mocked_sync_audits(ticket_id=None):
"""
Mock the audit records which are retrieved in the sync function of the Audits stream
"""
ticket_audits = [
{
"author_id":387494208358,
"created_at":"2021-10-11T12:23:20.000000Z",
"id":910518732098,
"ticket_id":2
},
{
"author_id":387494208358,
"created_at":"2021-10-11T12:24:05.000000Z",
"id":910519204898,
"ticket_id":2,
}
]
for audit in ticket_audits:
yield ('ticket_audits', audit)

def mocked_sync_metrics(ticket_id=None):
"""
Mock the metric records which are retrieved in the sync function of the Audits stream
"""
ticket_metrics = [
{
"author_id":387494208358,
"created_at":"2021-10-11T12:23:20.000000Z",
"id":910518732090,
"ticket_id":2
},
{
"author_id":387494208358,
"created_at":"2021-10-11T12:24:05.000000Z",
"id":910519204892,
"ticket_id":2,
}
]
for metric in ticket_metrics:
yield ('ticket_metrics', metric)

def mocked_sync_comments(ticket_id=None):
"""
Mock the comment records which are retrieved in the sync function of the Audits stream
"""
ticket_comments = [
{
"author_id":387494208356,
"created_at":"2021-10-11T12:23:20.000000Z",
"id":910518732090,
"ticket_id":2
},
{
"author_id":387494208354,
"created_at":"2021-10-11T12:24:05.000000Z",
"id":910519204892,
"ticket_id":2,
}
]
for comment in ticket_comments:
yield ('ticket_comments', comment)

@mock.patch('tap_zendesk.streams.Stream.update_bookmark')
@mock.patch('tap_zendesk.streams.Stream.get_bookmark')
@mock.patch('tap_zendesk.streams.TicketAudits.is_selected')
@mock.patch('tap_zendesk.streams.TicketMetrics.is_selected')
@mock.patch('tap_zendesk.streams.TicketComments.is_selected')
@mock.patch('tap_zendesk.streams.TicketAudits.sync')
@mock.patch('tap_zendesk.streams.TicketMetrics.sync')
@mock.patch('tap_zendesk.streams.TicketComments.sync')
@mock.patch('tap_zendesk.streams.CursorBasedExportStream.get_objects')
def test_yield_records(mock_objects, mock_comments_sync, mock_metrics_sync, mock_audits_sync, mock_comments, mock_metrics, mock_audits, mock_get_bookmark, mock_update_bookmark):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add comments in unit test

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

"""
This function tests that the Tickets and its substreams' records are yielded properly.
"""
ticket_stream = Tickets(Zenpy(), {})
# mocked ticket record for get_objects() function
tickets = [{
"url":"https://talend1234.zendesk.com/api/v2/tickets/1.json",
"id":2,
"external_id":"None",
"created_at":"2021-10-11T12:12:31Z",
"updated_at":"2021-10-12T08:37:28Z",
"requester_id":387331462257,
"submitter_id":387494208358,
"assignee_id":387494208358,
"organization_id":"None",
"group_id":360010350357,
"due_at":"None",
"ticket_form_id":360003740737,
"brand_id":360004806057,
"generated_timestamp":1634027848,
"fields": []
}]
mock_objects.return_value = tickets

# expected audit records after yield
expected_audits = [
{
"author_id":387494208358,
"created_at":"2021-10-11T12:23:20.000000Z",
"id":910518732098,
"ticket_id":2
},
{
"author_id":387494208358,
"created_at":"2021-10-11T12:24:05.000000Z",
"id":910519204898,
"ticket_id":2,
}
]

# expected metric records after yield
expected_metrics = [
{
"author_id":387494208358,
"created_at":"2021-10-11T12:23:20.000000Z",
"id":910518732090,
"ticket_id":2
},
{
"author_id":387494208358,
"created_at":"2021-10-11T12:24:05.000000Z",
"id":910519204892,
"ticket_id":2,
}
]

# expected comment records after yield
expected_comments = [
{
"author_id":387494208356,
"created_at":"2021-10-11T12:23:20.000000Z",
"id":910518732090,
"ticket_id":2
},
{
"author_id":387494208354,
"created_at":"2021-10-11T12:24:05.000000Z",
"id":910519204892,
"ticket_id":2,
}
]
mock_metrics.return_value = True
mock_audits.return_value = True
mock_comments.return_value = True
mock_update_bookmark.side_effect = None
mock_metrics_sync.side_effect = mocked_sync_metrics
mock_audits_sync.side_effect = mocked_sync_audits
mock_comments_sync.side_effect = mocked_sync_comments

expected_tickets = list(ticket_stream.sync(state={}))
audits = []
metrics = []
comments = []

# the yield returns a list with the first element as the parent stream tickets record
# and other elements as a tuple with the first element as the name of the stream and the second element
# as the record of that stream. Hence we are checking if each element of the stream and appending in our
# custom list and asserting all the lists at last.
for count, each in enumerate(expected_tickets):
if count == 0:
continue
if each[0] == 'ticket_audits':
audits.append(each[1])
if each[0] == 'ticket_metrics':
metrics.append(each[1])
if each[0] == 'ticket_comments':
comments.append(each[1])
assert expected_audits == audits
assert expected_metrics == metrics
assert expected_comments == comments