Skip to content

Commit

Permalink
TDL-20059: Implement currently_syncing (#55)
Browse files Browse the repository at this point in the history
* added currently syncing functionality

* added interrupted sync tap-tester test

* updated test case

* Updated the tests method to use common test name to use token chaining

* Updated the test method to use common test name to use token chaining

* updated interrupted sync test

* updated interrupted sync test

* added docstring for test case name

* updated comment indentation

* updated tap-tester tests

* updated interrupted sync test case

* updated interrupted sync test case

* formatted test case

* resolve unittest failure

Co-authored-by: RushT007 <[email protected]>
  • Loading branch information
hpatel41 and RushT007 authored Sep 27, 2022
1 parent cc22323 commit bc20ae5
Show file tree
Hide file tree
Showing 12 changed files with 298 additions and 56 deletions.
7 changes: 7 additions & 0 deletions tap_quickbooks/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ def do_sync(client, config, state, catalog):
)

LOGGER.info("Syncing stream: %s", stream_id)
# Set currently syncing stream
state = singer.set_currently_syncing(state, stream_id)
singer.write_state(state)

with Transformer() as transformer:
for rec in stream_object.sync():
Expand All @@ -36,3 +39,7 @@ def do_sync(client, config, state, catalog):
transformer.transform(rec,
stream.schema.to_dict(),
metadata.to_map(stream.metadata)))

# Remove currently syncing after successful sync
state = singer.set_currently_syncing(state, None)
singer.write_state(state)
35 changes: 29 additions & 6 deletions tests/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import time
from datetime import datetime as dt
from datetime import timedelta
import dateutil.parser
import pytz

import tap_tester.menagerie as menagerie
import tap_tester.connections as connections
Expand Down Expand Up @@ -31,6 +33,13 @@ class TestQuickbooksBase(unittest.TestCase):
"%Y-%m-%dT%H:%M:%S%z"
}

def name(self):
    """
    Return the test name shared by every tap-tester test of this tap.

    Quickbooks uses token chaining to get the existing token, which requires
    all tests to have the same name, so do not override the value below.
    """
    shared_test_name = "tap_tester_quickbooks_combined_test"
    return shared_test_name

def setUp(self):
missing_envs = [x for x in [
'TAP_QUICKBOOKS_OAUTH_CLIENT_ID',
Expand All @@ -49,12 +58,17 @@ def get_type():
def tap_name():
return "tap-quickbooks"

def get_properties(self):
return {
'start_date' : '2016-06-02T00:00:00Z',
'sandbox': 'true'
}

def get_properties(self, original=True):
    """
    Return the connection properties used to configure the tap.

    When `original` is truthy the fixed default start date is used;
    otherwise the start date is read from `self.start_date`.
    """
    # The conditional expression only touches `self.start_date` when needed.
    chosen_start_date = '2016-06-02T00:00:00Z' if original else self.start_date
    return {
        'start_date': chosen_start_date,
        'sandbox': 'true',
    }

def get_credentials(self):
return {
Expand Down Expand Up @@ -255,3 +269,12 @@ def dt_to_ts(self, dtime):

def is_report_stream(self, stream):
    """Return True when `stream` is one of the report streams (currently only `profit_loss_report`)."""
    report_streams = ("profit_loss_report",)
    return stream in report_streams

def convert_state_to_utc(self, date_str):
    """
    Convert a saved bookmark value such as '2020-08-25T13:17:36-07:00' into a
    UTC datetime string formatted as '%Y-%m-%dT%H:%M:%SZ', so that it can be
    compared against the JSON-formatted datetime values.
    """
    parsed_bookmark = dateutil.parser.parse(date_str)
    # Shift to UTC first, then render with the trailing literal 'Z'.
    return parsed_bookmark.astimezone(tz=pytz.UTC).strftime("%Y-%m-%dT%H:%M:%SZ")
3 changes: 0 additions & 3 deletions tests/test_quickbooks_all_fields.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,9 +186,6 @@ class TestQuickbooksAllFields(TestQuickbooksBase):
]
}

def name(self):
return "tap_tester_quickbooks_combined_test"

def test_run(self):
"""
Testing that all fields mentioned in the catalog are synced from the tap
Expand Down
4 changes: 0 additions & 4 deletions tests/test_quickbooks_automatic_fields.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,6 @@
page_size_key = 'max_results'

class TestQuickbooksAutomaticFields(TestQuickbooksBase):
"""Test case to verify we are replicating automatic fields data when all the fields are not selected"""

def name(self):
return "tap_tester_quickbooks_combined_test"

@staticmethod
def get_selected_fields_from_metadata(metadata):
Expand Down
14 changes: 0 additions & 14 deletions tests/test_quickbooks_bookmarks.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,24 +10,10 @@


class TestQuickbooksBookmarks(TestQuickbooksBase):
"""Test case to verify the Tap is writing bookmark as expectation"""

def name(self):
return "tap_tester_quickbooks_combined_test"

def expected_streams(self):
return self.expected_check_streams().difference({'budgets'})

def convert_state_to_utc(self, date_str):
"""
Convert a saved bookmark value of the form '2020-08-25T13:17:36-07:00' to
a string formatted utc datetime,
in order to compare aginast json formatted datetime values
"""
date_object = dateutil.parser.parse(date_str)
date_object_utc = date_object.astimezone(tz=pytz.UTC)
return datetime.datetime.strftime(date_object_utc, "%Y-%m-%dT%H:%M:%SZ")

def calculated_states_by_stream(self, current_state):
"""
Look at the bookmarks from a previous sync and set a new bookmark
Expand Down
4 changes: 0 additions & 4 deletions tests/test_quickbooks_discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,6 @@
from base import TestQuickbooksBase

class TestQuickbooksDiscovery(TestQuickbooksBase):
"""Test case to verify the Tap is creating the catalog file as expected"""

def name(self):
return "tap_tester_quickbooks_combined_test"

def test_run(self):
conn_id = self.ensure_connection()
Expand Down
168 changes: 168 additions & 0 deletions tests/test_quickbooks_interrupted_sync.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
from datetime import datetime as dt
from tap_tester import runner, connections, menagerie
from base import TestQuickbooksBase

class TestQuickbooksInterruptedSyncTest(TestQuickbooksBase):
    """Verify that a sync resumed from a state containing `currently_syncing` completes as expected."""

    def assertIsDateFormat(self, value, str_format):
        """
        Assert that `value` is a datetime string that parses with `str_format`.

        Raises an AssertionError, chained to the original ValueError, when the
        value does not conform to the format.
        """
        try:
            dt.strptime(value, str_format)
        except ValueError as parse_error:
            failure = AssertionError(f"Value: {value} does not conform to expected format: {str_format}")
            raise failure from parse_error

    def test_run(self):
        """
        Scenario: A sync job is interrupted. The state is saved with `currently_syncing`.
                  The next sync job kicks off and the tap picks back up on that `currently_syncing` stream.
        Expected State Structure:
            {
                "currently_syncing": "stream-name",
                "bookmarks": {
                    "stream-name-1": {"LastUpdatedTime": "2021-06-10T11:46:26-07:00"},
                    "stream-name-2": {"LastUpdatedTime": "2021-06-10T11:46:26-07:00"}
                }
            }
        Test Cases:
        - Verify an interrupted sync can resume based on the `currently_syncing` and stream level bookmark value.
        - Verify only records with replication-key values greater than or equal to the stream level bookmark are
          replicated on the resuming sync for the interrupted stream.
        - Verify the yet-to-be-synced streams are replicated following the interrupted stream in the resuming sync.
        """
        # Formats of the record replication key and of the UTC start date / converted bookmarks
        record_time_format = "%Y-%m-%dT%H:%M:%S.%fZ"
        utc_format = "%Y-%m-%dT%H:%M:%SZ"

        def last_updated(record):
            """Parse the `MetaData.LastUpdatedTime` value of a synced record."""
            return dt.strptime(record.get('MetaData').get('LastUpdatedTime'), record_time_format)

        def verify_resumed_records(records, reference_records, lower_bound, failure_msg):
            """Check every resumed record is not older than `lower_bound` and exists in the full sync."""
            for record in records:
                self.assertGreaterEqual(last_updated(record), lower_bound)

                self.assertIn(record, reference_records, msg=failure_msg)

        self.start_date = "2021-01-01T00:00:00Z"
        start_date_datetime = dt.strptime(self.start_date, utc_format)

        conn_id = self.ensure_connection(original=False)

        expected_streams = {"accounts", "bill_payments", "payments"}

        # Run in check mode
        check_job_name = runner.run_check_mode(self, conn_id)

        # Verify check exit codes
        check_exit_status = menagerie.get_exit_status(conn_id, check_job_name)
        menagerie.verify_check_exit_status(self, check_exit_status, check_job_name)

        found_catalogs = menagerie.get_catalogs(conn_id)
        self.assertGreater(
            len(found_catalogs), 0,
            msg="unable to locate schemas for connection {}".format(conn_id))

        # Select only the catalog entries of the expected streams
        catalog_entries = [
            entry for entry in found_catalogs
            if entry['tap_stream_id'] in expected_streams
        ]

        # Catalog selection
        self.select_all_streams_and_fields(conn_id, catalog_entries)

        # Run a sync job using orchestrator
        full_sync_job_name = runner.run_sync_mode(self, conn_id)
        synced_records_full_sync = runner.get_records_from_target_output()

        # Verify tap and target exit codes
        full_sync_exit_status = menagerie.get_exit_status(conn_id, full_sync_job_name)
        menagerie.verify_sync_exit_status(self, full_sync_exit_status, full_sync_job_name)

        full_sync_state = menagerie.get_state(conn_id)

        # State to run 2nd sync
        #   bill_payments: currently syncing
        #   accounts: synced records successfully
        #   payments: remaining to sync
        state = {
            "currently_syncing": "bill_payments",
            "bookmarks": {"accounts": {"LastUpdatedTime": "2021-08-10T01:10:04-07:00"}}
        }
        interrupted_stream = state['currently_syncing']

        # Set state for 2nd sync
        menagerie.set_state(conn_id, state)

        # Run sync after interruption
        resumed_job_name = runner.run_sync_mode(self, conn_id)
        synced_records_interrupted_sync = runner.get_records_from_target_output()
        record_count_by_stream_interrupted_sync = runner.examine_target_output_file(
            self, conn_id, self.expected_streams(), self.expected_primary_keys())

        # Verify tap and target exit codes
        resumed_exit_status = menagerie.get_exit_status(conn_id, resumed_job_name)
        menagerie.verify_sync_exit_status(self, resumed_exit_status, resumed_job_name)

        final_state = menagerie.get_state(conn_id)
        currently_syncing = final_state.get('currently_syncing')

        # Checking resuming the sync resulted in a successfully saved state
        with self.subTest():

            # Verify sync is not interrupted by checking currently_syncing in the state for sync
            self.assertIsNone(currently_syncing)

            # Verify bookmarks are saved
            self.assertIsNotNone(final_state.get('bookmarks'))

            # Verify final_state is equal to uninterrupted sync's state
            # (This is what the value would have been without an interruption and proves resuming succeeds)
            self.assertDictEqual(final_state, full_sync_state)

        # Stream level assertions
        for stream in expected_streams:
            with self.subTest(stream=stream):

                # Gather actual results
                full_messages = synced_records_full_sync.get(stream, {}).get('messages', [])
                full_records = [message['data'] for message in full_messages]
                interrupted_messages = synced_records_interrupted_sync.get(stream, {}).get('messages', [])
                interrupted_records = [message['data'] for message in interrupted_messages]
                interrupted_record_count = record_count_by_stream_interrupted_sync.get(stream, 0)

                # Final bookmark after interrupted sync
                final_stream_bookmark = final_state['bookmarks'][stream]['LastUpdatedTime']

                # NOTE: All streams are INCREMENTAL streams for this Tap.

                # Verify the saved final bookmark matches the formatting standards for resuming sync
                self.assertIsNotNone(final_stream_bookmark)
                self.assertIsInstance(final_stream_bookmark, str)
                # Bookmark is being saved with timezone
                self.assertIsDateFormat(final_stream_bookmark, "%Y-%m-%dT%H:%M:%S%z")

                if stream == interrupted_stream:

                    # The interrupted stream has no bookmark in the state, so it resumes from the start date
                    interrupted_stream_datetime = start_date_datetime

                    # - Verify resuming sync only replicates records with replication key values greater or
                    #   equal to the state for streams that were replicated during the interrupted sync.
                    # - Verify the interrupted sync replicates the expected record set:
                    #   all interrupted records are in full records
                    verify_resumed_records(
                        interrupted_records, full_records, interrupted_stream_datetime,
                        'Incremental table record in interrupted sync not found in full sync')

                    # Record count for all streams of interrupted sync match expectations
                    full_records_after_interrupted_bookmark = sum(
                        1 for record in full_records
                        if last_updated(record) >= interrupted_stream_datetime
                    )

                    self.assertEqual(
                        full_records_after_interrupted_bookmark, interrupted_record_count,
                        msg='Expected {} records in each sync'.format(full_records_after_interrupted_bookmark))
                    continue

                # Get the date to start 2nd sync for non-interrupted streams
                synced_stream_bookmark = state['bookmarks'].get(stream, {}).get('LastUpdatedTime')
                synced_stream_datetime = (
                    dt.strptime(self.convert_state_to_utc(synced_stream_bookmark), utc_format)
                    if synced_stream_bookmark
                    else start_date_datetime
                )

                # Verify we replicated some records for the non-interrupted streams
                self.assertGreater(interrupted_record_count, 0)

                # - Verify resuming sync only replicates records with replication key values greater or equal to
                #   the state for streams that were replicated during the interrupted sync.
                # - Verify every record replicated by the resuming sync was also found in the full
                #   (non-interrupted) sync
                verify_resumed_records(
                    interrupted_records, full_records, synced_stream_datetime,
                    'Unexpected record replicated in resuming sync.')
4 changes: 0 additions & 4 deletions tests/test_quickbooks_pagination.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,6 @@
page_size_key = 'max_results'

class TestQuickbooksPagination(TestQuickbooksBase):
"""Test case to verify the pagination is working as expected"""

def name(self):
return "tap_tester_quickbooks_combined_test"

def expected_streams(self):
"""
Expand Down
4 changes: 0 additions & 4 deletions tests/test_quickbooks_start_date.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,6 @@
from base import TestQuickbooksBase

class TestQuickbooksStartDate(TestQuickbooksBase):
"""Test case to verify the Tap respected the start date provided in the config"""

def name(self):
return "tap_tester_quickbooks_combined_test"

def expected_streams(self):
"""All streams are under test"""
Expand Down
4 changes: 0 additions & 4 deletions tests/test_quickbooks_sync_all.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,6 @@
from base import TestQuickbooksBase

class TestQuickbooksSyncAll(TestQuickbooksBase):
"""Test case to verify the working of the Tap"""

def name(self):
return "tap_tester_quickbooks_combined_test"

def test_run(self):
conn_id = self.ensure_connection()
Expand Down
Loading

0 comments on commit bc20ae5

Please sign in to comment.