Skip to content

Commit

Permalink
payouts and payments stream enhancements (#122)
Browse files Browse the repository at this point in the history
* bugfix to replicate the payouts records

* Changelog changes

* changes to payments stream implementation to fetch the data for all location_id

* removes unnecessary write_state statements

* initial commit to make payments stream pseudo incremental

* changes in the code to make it work

* added created_at as an alternate replication key

* chsnges in comment added

* retries 5xx error payments

* add retry for cloudflare error code 1101

* changes in payments sync function

* fetching data as per brk date

* adds another error_message to retry

* logging status code when error occurs

* bump version changes

* refactoring and linting issue fixes

* addresses review comment

---------

Co-authored-by: “rdeshmukh15” <“[email protected]”>
Co-authored-by: RushiT0122 <[email protected]>
  • Loading branch information
3 people authored Nov 14, 2024
1 parent a144007 commit e6239d9
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 27 deletions.
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,14 @@
# Changelog

## [v2.2.0] (2024-11-14)

[Full Changelog](https://github.com/singer-io/tap-square/compare/v2.1.1...v2.2.0)

* Bug fix to replicate the `Payouts` stream records [#122](https://github.com/singer-io/tap-square/pull/120)
* Fetches `Payments` stream data for all location IDs
* Updates `Payments` stream implementation to function as a pseudo-incremental stream
* Adds retry logic for handling 5xx errors

## [v2.1.1] (2024-11-04)

[Full Changelog](https://github.com/singer-io/tap-square/compare/v2.1.0...v2.1.1)
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ This tap:
* BankAccounts
* Refunds
* Payments
* Payouts
* Payouts
* ModifierLists
* Inventories
* Orders
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from setuptools import setup

setup(name='tap-square',
version='2.1.1',
version='2.2.0',
description='Singer.io tap for extracting data from the Square API',
author='Stitch',
url='http://singer.io',
Expand Down
53 changes: 37 additions & 16 deletions tap_square/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,18 @@ def _retryable_v2_method(request_method, body, **kwargs):
result = request_method(body, **kwargs)

if result.is_error():
LOGGER.info("HTTP status code when it errors out: %s", result.status_code)
error_message = result.errors if result.errors else result.body
if 'Service Unavailable' in error_message or 'upstream connect error or disconnect/reset before headers' in error_message or result.status_code == 429:

# Refactor the conditions into separate variables for readability
is_service_unavailable = 'Service Unavailable' in error_message
is_upstream_error = 'upstream connect error or disconnect/reset before headers' in error_message
is_cf_error_1101 = '<span class="cf-error-code">1101</span>' in error_message
is_html_error = error_message.startswith('<!DOCTYPE html>')
is_status_429_or_500 = result.status_code == 429 or result.status_code >= 500

retryable_conditions = {is_service_unavailable, is_upstream_error, is_cf_error_1101, is_html_error, is_status_429_or_500}
if any(retryable_conditions):
raise RetryableError(error_message)
else:
raise RuntimeError(error_message)
Expand Down Expand Up @@ -253,23 +263,34 @@ def get_refunds(self, start_time, bookmarked_cursor):
body,
'refunds')

def get_payments(self, start_time, bookmarked_cursor):
start_time = utils.strptime_to_utc(start_time)
start_time = start_time - timedelta(milliseconds=1)
start_time = utils.strftime(start_time)
def get_payments(self, location_id, start_time, bookmarked_cursor):
if bookmarked_cursor:
cursor = bookmarked_cursor
else:
cursor = '__initial__' # initial value so while loop is always entered one time

body = {
}
body['begin_time'] = start_time
end_time = utils.strftime(utils.now(), utils.DATETIME_PARSE)
while cursor:
if cursor == '__initial__':
# Initial text was needed to go into the while loop, but api needs
# it to be a valid bookmarked cursor or None
cursor = bookmarked_cursor

if bookmarked_cursor:
body['cursor'] = bookmarked_cursor
with singer.http_request_timer('GET payments'):
result = self._retryable_v2_method(
lambda bdy: self._client.payments.list_payments(
location_id=location_id,
begin_time=start_time,
end_time=end_time,
cursor=cursor,
limit=100,
),
None,
)

yield from self._get_v2_objects(
'payments',
lambda bdy: self._client.payments.list_payments(**bdy),
body,
'payments')
yield (result.body.get('payments', []), result.body.get('cursor'))

cursor = result.body.get('cursor')

def get_cash_drawer_shifts(self, location_id, start_time, bookmarked_cursor):
if bookmarked_cursor:
Expand Down Expand Up @@ -373,6 +394,6 @@ def get_payouts(self, location_id, start_time, bookmarked_cursor):
None,
)

yield (result.body.get('items', []), result.body.get('cursor'))
yield (result.body.get('payouts', []), result.body.get('cursor'))

cursor = result.body.get('cursor')
36 changes: 27 additions & 9 deletions tap_square/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,15 +75,13 @@ def sync(self, state, stream_schema, stream_metadata, config, transformer):
start_time = singer.get_bookmark(state, self.tap_stream_id, self.replication_key, config['start_date'])
bookmarked_cursor = singer.get_bookmark(state, self.tap_stream_id, 'cursor')

for page, cursor in self.get_pages_safe(state, bookmarked_cursor, start_time):
for page, _ in self.get_pages_safe(state, bookmarked_cursor, start_time):
for record in page:
transformed_record = transformer.transform(record, stream_schema, stream_metadata)
singer.write_record(
self.tap_stream_id,
transformed_record,
)
singer.write_bookmark(state, self.tap_stream_id, 'cursor', cursor)
singer.write_state(state)

state = singer.clear_bookmark(state, self.tap_stream_id, 'cursor')
singer.write_state(state)
Expand Down Expand Up @@ -180,16 +178,36 @@ def get_pages(self, bookmarked_cursor, start_time):
yield from self.client.get_refunds(start_time, bookmarked_cursor)


class Payments(FullTableStream):
class Payments(Stream):
tap_stream_id = 'payments'
key_properties = ['id']
replication_method = 'FULL_TABLE'
valid_replication_keys = []
replication_key = None
replication_method = 'INCREMENTAL'
valid_replication_keys = ['updated_at']
replication_key = 'updated_at'
# If the records are not updated at all since those are created and if it has missing the updated_at field
second_replication_key = 'created_at'
object_type = 'PAYMENT'

def get_pages(self, bookmarked_cursor, start_time):
yield from self.client.get_payments(start_time, bookmarked_cursor)

def sync(self, state, stream_schema, stream_metadata, config, transformer):
bookmarked_time = singer.get_bookmark(state, self.tap_stream_id, self.replication_key, config['start_date'])
max_bookmark_value = bookmarked_time
all_location_ids = Locations.get_all_location_ids(self.client)

for location_id in all_location_ids:
for page, _ in self.client.get_payments(location_id, bookmarked_time, bookmarked_cursor=None):
for record in page:
transformed_record = transformer.transform(record, stream_schema, stream_metadata)

if record.get(self.replication_key, self.second_replication_key) >= bookmarked_time:
singer.write_record(self.tap_stream_id, transformed_record,)
max_bookmark_value = max(transformed_record.get(self.replication_key) or \
transformed_record.get(self.second_replication_key), \
max_bookmark_value)

state = singer.write_bookmark(state, self.tap_stream_id, self.replication_key, max_bookmark_value)
singer.write_state(state)
return state


class Orders(Stream):
Expand Down

0 comments on commit e6239d9

Please sign in to comment.