From efb145837853827fb7816acbb3bd6a6801eb9b7d Mon Sep 17 00:00:00 2001
From: savan-chovatiya
Date: Fri, 23 Jul 2021 19:12:01 +0530
Subject: [PATCH 01/12] TDL-13967: Added ProfitAndLoss report stream

---
 .../schemas/profit_loss_report.json    |  28 +++
 tap_quickbooks/streams.py              | 135 ++++++++++-
 tests/base.py                          |  35 ++-
 tests/test_quickbooks_bookmarks.py     |  21 +-
 tests/test_quickbooks_pagination.py    |  22 +-
 tests/test_quickbooks_start_date.py    |  16 +-
 tests/unittests/test_reports_parser.py | 225 ++++++++++++++++++
 7 files changed, 458 insertions(+), 24 deletions(-)
 create mode 100644 tap_quickbooks/schemas/profit_loss_report.json
 create mode 100644 tests/unittests/test_reports_parser.py

diff --git a/tap_quickbooks/schemas/profit_loss_report.json b/tap_quickbooks/schemas/profit_loss_report.json
new file mode 100644
index 0000000..39a089b
--- /dev/null
+++ b/tap_quickbooks/schemas/profit_loss_report.json
@@ -0,0 +1,28 @@
+{
+  "type": [
+    "null",
+    "object"
+  ],
+  "properties": {
+    "ReportDate": {
+      "type": [
+        "null",
+        "string"
+      ],
+      "format": "date-time"
+    },
+    "AccountingMethod": {
+      "type": [
+        "null",
+        "string"
+      ]
+    },
+    "Details": {
+      "type": [
+        "null",
+        "object"
+      ],
+      "properties": {}
+    }
+  }
+}
\ No newline at end of file
diff --git a/tap_quickbooks/streams.py b/tap_quickbooks/streams.py
index 21f1347..9f8210b 100644
--- a/tap_quickbooks/streams.py
+++ b/tap_quickbooks/streams.py
@@ -1,6 +1,11 @@
 import tap_quickbooks.query_builder as query_builder
 import singer
+from singer import utils
+from singer.utils import strptime_to_utc, strftime
+from datetime import timedelta
+
+DATE_WINDOW_SIZE = 30
 
 
 class Stream:
     endpoint = '/v3/company/{realm_id}/query'
@@ -193,6 +198,133 @@ class Vendors(Stream):
     table_name = 'Vendor'
     additional_where = "Active IN (true, false)"
 
+class ReportStream(Stream):
+    parsed_metadata = {
+        'dates': [],
+        'data': []
+    }
+    key_properties = []
+    replication_method = 'INCREMENTAL'
+    # replication keys is ReportDate, manually created from data
+    replication_keys = ['ReportDate']
+
+    def sync(self):
+
+        start_dttm_str = singer.get_bookmark(self.state, self.stream_name, 'LastUpdatedTime', self.config.get('start_date'))
+        start_dttm = strptime_to_utc(start_dttm_str)
+
+        end_dttm = start_dttm + timedelta(days=DATE_WINDOW_SIZE)
+        now_dttm = utils.now()
+
+        if end_dttm > now_dttm:
+            end_dttm = now_dttm
+        params = {
+            'summarize_column_by': 'Days'
+        }
+
+        while start_dttm < now_dttm:
+            self.parsed_metadata = {
+                'dates': [],
+                'data': []
+            }
+
+            start_tm_str = strftime(start_dttm)[0:10]
+            end_tm_str = strftime(end_dttm)[0:10]
+
+            params["start_date"] = start_tm_str
+            params["end_date"] = end_tm_str
+
+            resp = self.client.get(self.endpoint, params=params)
+            self.parse_report_metadata(resp)
+
+            for report in self.day_wise_reports():
+                yield report
+
+            self.state = singer.write_bookmark(self.state, self.stream_name, 'LastUpdatedTime', strptime_to_utc(report.get('ReportDate')).isoformat())
+            singer.write_state(self.state)
+
+            start_dttm = end_dttm
+            end_dttm = start_dttm + timedelta(days=DATE_WINDOW_SIZE)
+
+            if end_dttm > now_dttm:
+                end_dttm = now_dttm
+
+        singer.write_state(self.state)
+
+    def parse_report_metadata(self, pileOfRows):
+        '''
+        Restructure data from report response on daily basis and update self.parsed_metadata dictionary
+        {
+            "dates": ["2021-07-01", "2021-07-02", "2021-07-03"],
+            "data": [ {
+                "name": "Total Income",
+                "values": ["4.00", "4.00", "4.00", "12.00"]
+            }, {
+                "name": "Gross Profit",
+                "values": ["4.00", "4.00", "4.00", "12.00"]
+            }, {
+                "name": "Total Expenses",
+                "values": ["1.00", "1.00", "1.00", "3.00"]
+            }, {
+                "name": "Net Income",
+                "values": ["3.00", "3.00", "3.00", "9.00"]
+            }]
+        }
+        '''
+
+        if isinstance(pileOfRows, list):
+            for x in pileOfRows:
+                self.parse_report_metadata(x)
+
+        else:
+
+            if 'Rows' in pileOfRows.keys():
+                self.parse_report_metadata(pileOfRows['Rows'])
+
+            if 'Row' in pileOfRows.keys():
+                self.parse_report_metadata(pileOfRows['Row'])
+
+            if 'Summary' in pileOfRows.keys():
+                self.parse_report_metadata(pileOfRows['Summary'])
+
+            if 'ColData' in pileOfRows.keys():
+                d = dict()
+                d['name'] = pileOfRows['ColData'][0]['value']
+                vals = []
+                for x in pileOfRows['ColData'][1:]:
+                    vals.append(x['value'])
+                d['values'] = vals
+                self.parsed_metadata['data'].append(d)
+
+            if 'Columns' in pileOfRows.keys():
+                self.parse_report_metadata(pileOfRows['Columns'])
+
+            if 'Column' in pileOfRows.keys():
+                for x in pileOfRows['Column']:
+                    if 'MetaData' in x.keys():
+                        for md in x['MetaData']:
+                            if md['Name'] in ['StartDate']:
+                                self.parsed_metadata['dates'].append(md['Value'])
+
+    def day_wise_reports(self):
+        '''
+        Return record for every day formed using output of parse_report_metadata
+        '''
+        for i, date in enumerate(self.parsed_metadata['dates']):
+            report = dict()
+            report['ReportDate'] = date
+            report['AccountingMethod'] = 'Accrual'
+            report['Details'] = {}
+
+            for data in self.parsed_metadata['data']:
+                report['Details'][data['name']] = data['values'][i]
+
+            yield report
+
+
+class ProfitAndLossReport(ReportStream):
+    stream_name = 'profit_loss_report'
+    endpoint = '/v3/company/{realm_id}/reports/ProfitAndLoss'
 
 STREAM_OBJECTS = {
     "accounts": Accounts,
@@ -222,5 +354,6 @@ class Vendors(Stream):
     "time_activities": TimeActivities,
     "transfers": Transfers,
     "vendor_credits": VendorCredits,
-    "vendors": Vendors
+    "vendors": Vendors,
+    "profit_loss_report": ProfitAndLossReport
 }
diff --git a/tests/base.py b/tests/base.py
index eee3d9e..3de2dbb 100644
--- a/tests/base.py
+++ b/tests/base.py
@@ -1,5 +1,6 @@
 import os
 import unittest
+import time
 from datetime import datetime as dt
 from datetime import timedelta
 
@@ -23,6 +24,12 @@ class TestQuickbooksBase(unittest.TestCase):
     INCREMENTAL = "INCREMENTAL"
     FULL = "FULL_TABLE"
     START_DATE_FORMAT = "%Y-%m-%dT00:00:00Z" # %H:%M:%SZ
+    DATETIME_FMT = {
+        "%Y-%m-%dT%H:%M:%SZ",
+        "%Y-%m-%d %H:%M:%S",
+        "%Y-%m-%dT%H:%M:%S.000000Z",
+        "%Y-%m-%dT%H:%M:%S%z"
+    }
 
     def setUp(self):
         missing_envs = [x for x in [
@@ -89,6 +96,7 @@ def expected_check_streams():
             "transfers",
             "vendor_credits",
             "vendors",
+            "profit_loss_report"
         }
 
     def expected_metadata(self):
         mdata = {}
         for stream in self.expected_check_streams():
-            mdata[stream] = {
-                self.PRIMARY_KEYS: {'Id'},
-                self.REPLICATION_METHOD: self.INCREMENTAL,
-                self.REPLICATION_KEYS: {'MetaData'},
-            }
+            if self.is_report_stream(stream):
+                mdata[stream] = {
+                    self.REPLICATION_METHOD: self.INCREMENTAL,
+                    self.REPLICATION_KEYS: {'ReportDate'},
+                }
+            else:
+                mdata[stream] = {
+                    self.PRIMARY_KEYS: {'Id'},
+                    self.REPLICATION_METHOD: self.INCREMENTAL,
+                    self.REPLICATION_KEYS: {'MetaData'},
+                }
 
         return mdata
 
@@ -222,3 +236,14 @@ def minimum_record_count_by_stream(self):
         record_counts["vendors"] = 26
 
         return record_counts
+
+    def dt_to_ts(self, dtime):
+        for date_format in self.DATETIME_FMT:
+            try:
+                date_stripped = int(time.mktime(dt.strptime(dtime, date_format).timetuple()))
+                return date_stripped
+            except ValueError:
+                continue
+
+    def is_report_stream(self, stream):
+        return stream in ["profit_loss_report"]
diff --git 
a/tests/test_quickbooks_bookmarks.py b/tests/test_quickbooks_bookmarks.py index bbe0220..d85419d 100644 --- a/tests/test_quickbooks_bookmarks.py +++ b/tests/test_quickbooks_bookmarks.py @@ -138,20 +138,29 @@ def test_run(self): # Verify the second sync records respect the previous (simulated) bookmark value simulated_bookmark_value = new_state['bookmarks'][stream][sub_level_replication_key] for message in second_sync_messages: - replication_key_value = message.get('data').get(top_level_replication_key).get(sub_level_replication_key) - self.assertGreaterEqual(replication_key_value, simulated_bookmark_value, + if self.is_report_stream(stream): + replication_key_value = message.get('data').get('ReportDate') + else: + replication_key_value = message.get('data').get(top_level_replication_key).get(sub_level_replication_key) + self.assertGreaterEqual(self.dt_to_ts(replication_key_value), self.dt_to_ts(simulated_bookmark_value), msg="Second sync records do not repect the previous bookmark.") # Verify the first sync bookmark value is the max replication key value for a given stream for message in first_sync_messages: - replication_key_value = message.get('data').get(top_level_replication_key).get(sub_level_replication_key) - self.assertLessEqual(replication_key_value, first_bookmark_value_utc, + if self.is_report_stream(stream): + replication_key_value = message.get('data').get('ReportDate') + else: + replication_key_value = message.get('data').get(top_level_replication_key).get(sub_level_replication_key) + self.assertLessEqual(self.dt_to_ts(replication_key_value), self.dt_to_ts(first_bookmark_value_utc), msg="First sync bookmark was set incorrectly, a record with a greater rep key value was synced") # Verify the second sync bookmark value is the max replication key value for a given stream for message in second_sync_messages: - replication_key_value = message.get('data').get(top_level_replication_key).get(sub_level_replication_key) - self.assertLessEqual(replication_key_value, second_bookmark_value_utc, + if self.is_report_stream(stream): + replication_key_value = message.get('data').get('ReportDate') + else: + replication_key_value = message.get('data').get(top_level_replication_key).get(sub_level_replication_key) + self.assertLessEqual(self.dt_to_ts(replication_key_value), self.dt_to_ts(second_bookmark_value_utc), msg="Second sync bookmark was set incorrectly, a record with a greater rep key value was synced") # Verify the number of records in the 2nd sync is less then the first diff --git a/tests/test_quickbooks_pagination.py b/tests/test_quickbooks_pagination.py index 7a8d06a..8bb4bca 100644 --- a/tests/test_quickbooks_pagination.py +++ b/tests/test_quickbooks_pagination.py @@ -67,18 +67,24 @@ def test_run(self): sync_messages = sync_records.get(stream, {'messages': []}).get('messages') - primary_key = self.expected_primary_keys().get(stream).pop() - # Verify the sync meets or exceeds the default record count self.assertLessEqual(expected_count, record_count) # Verify the number or records exceeds the max_results (api limit) - pagination_threshold = int(self.get_properties().get(page_size_key)) + if self.is_report_stream(stream): + #Tap is making API call in 30 days window for reports stream + pagination_threshold = 30 + else: + pagination_threshold = int(self.get_properties().get(page_size_key)) self.assertGreater(record_count, pagination_threshold, msg="Record count not large enough to gaurantee pagination.") - # Verify we did not duplicate any records across pages - records_pks_set = 
{message.get('data').get(primary_key) for message in sync_messages} - records_pks_list = [message.get('data').get(primary_key) for message in sync_messages] - self.assertCountEqual(records_pks_set, records_pks_list, - msg="We have duplicate records for {}".format(stream)) + # ProfitAndLoss report stream doesn't have any primary key + if not self.is_report_stream(stream): + primary_key = self.expected_primary_keys().get(stream).pop() + + # Verify we did not duplicate any records across pages + records_pks_set = {message.get('data').get(primary_key) for message in sync_messages} + records_pks_list = [message.get('data').get(primary_key) for message in sync_messages] + self.assertCountEqual(records_pks_set, records_pks_list, + msg="We have duplicate records for {}".format(stream)) diff --git a/tests/test_quickbooks_start_date.py b/tests/test_quickbooks_start_date.py index fe2e5b2..8ac94f4 100644 --- a/tests/test_quickbooks_start_date.py +++ b/tests/test_quickbooks_start_date.py @@ -89,7 +89,9 @@ def test_run(self): # start dates start_date_1 = self.get_properties()['start_date'] + start_date_1_epoch = self.dt_to_ts(start_date_1) start_date_2 = self.get_properties(original=False)['start_date'] + start_date_2_epoch = self.dt_to_ts(start_date_2) # Verify by stream that our first sync meets or exceeds the default record count self.assertLessEqual(expected_first_sync_count, first_sync_count) @@ -99,10 +101,16 @@ def test_run(self): # Verify by stream that all records have a rep key that is equal to or greater than that sync's start_date for message in first_sync_messages: - rep_key_value = message.get('data').get('MetaData').get('LastUpdatedTime') - self.assertGreaterEqual(rep_key_value, start_date_1, + if self.is_report_stream(stream): + rep_key_value = message.get('data').get('ReportDate') + else : + rep_key_value = message.get('data').get('MetaData').get('LastUpdatedTime') + self.assertGreaterEqual(self.dt_to_ts(rep_key_value), start_date_1_epoch, msg="A record was replicated with a replication key value prior to the start date") for message in second_sync_messages: - rep_key_value = message.get('data').get('MetaData').get('LastUpdatedTime') - self.assertGreaterEqual(rep_key_value, start_date_2, + if self.is_report_stream(stream): + rep_key_value = message.get('data').get('ReportDate') + else : + rep_key_value = message.get('data').get('MetaData').get('LastUpdatedTime') + self.assertGreaterEqual(self.dt_to_ts(rep_key_value), start_date_2_epoch, msg="A record was replicated with a replication key value prior to the start date") diff --git a/tests/unittests/test_reports_parser.py b/tests/unittests/test_reports_parser.py new file mode 100644 index 0000000..f5be82c --- /dev/null +++ b/tests/unittests/test_reports_parser.py @@ -0,0 +1,225 @@ +import unittest +import tap_quickbooks.streams as streams + + +class TestReportsParser(unittest.TestCase): + + def test_day_wise_reports(self): + reports = streams.ReportStream("", "", "") + reports.parsed_metadata = { + "dates": ["2021-07-01", "2021-07-02", "2021-07-03"], + "data": [ + { + "name": "Total Income", + "values": ["4.00", "5.00", "6.00", "15.00"] + }, { + "name": "Gross Profit", + "values": ["3.00", "4.00", "5.00", "12.00"] + } + ] + } + + expected_records = [ + { + "ReportDate": "2021-07-01", + "AccountingMethod": "Accrual", + "Details": {"Total Income": "4.00", "Gross Profit": "3.00"} + }, + { + "ReportDate": "2021-07-02", + "AccountingMethod": "Accrual", + "Details": {"Total Income": "5.00", "Gross Profit": "4.00"} + }, + { + "ReportDate": 
"2021-07-03", + "AccountingMethod": "Accrual", + "Details": {"Total Income": "6.00", "Gross Profit": "5.00"} + } + ] + + records = list(reports.day_wise_reports()) + self.assertListEqual(expected_records, records) + + def test_report_parser(self): + reports = streams.ReportStream("", "", "") + + response = { + "Header": { + "Time": "2021-07-22T05:51:37-07:00", + "ReportName": "ProfitAndLoss", + "ReportBasis": "Accrual", + "StartPeriod": "2021-07-20", + "EndPeriod": "2021-07-21", + "SummarizeColumnsBy": "Days", + "Currency": "USD", + "Option": [{ + "Name": "AccountingStandard", + "Value": "GAAP" + }, { + "Name": "NoReportData", + "Value": "true" + }] + }, + "Columns": { + "Column": [{ + "ColTitle": "", + "ColType": "Account", + "MetaData": [{ + "Name": "ColKey", + "Value": "account" + }] + }, { + "ColTitle": "Jul 20, 2021", + "ColType": "Money", + "MetaData": [{ + "Name": "StartDate", + "Value": "2021-07-20" + }, { + "Name": "EndDate", + "Value": "2021-07-20" + }, { + "Name": "ColKey", + "Value": "Jul 20, 2021" + }] + }, { + "ColTitle": "Jul 21, 2021", + "ColType": "Money", + "MetaData": [{ + "Name": "StartDate", + "Value": "2021-07-21" + }, { + "Name": "EndDate", + "Value": "2021-07-21" + }, { + "Name": "ColKey", + "Value": "Jul 21, 2021" + }] + }, { + "ColTitle": "Total", + "ColType": "Money", + "MetaData": [{ + "Name": "ColKey", + "Value": "total" + }] + }] + }, + "Rows": { + "Row": [{ + "Header": { + "ColData": [{ + "value": "Income" + }, { + "value": "" + }, { + "value": "" + }, { + "value": "" + }] + }, + "Summary": { + "ColData": [{ + "value": "Total Income" + }, { + "value": "" + }, { + "value": "" + }, { + "value": "0.00" + }] + }, + "type": "Section", + "group": "Income" + }, { + "Summary": { + "ColData": [{ + "value": "Gross Profit" + }, { + "value": "0.00" + }, { + "value": "0.00" + }, { + "value": "0.00" + }] + }, + "type": "Section", + "group": "GrossProfit" + }, { + "Header": { + "ColData": [{ + "value": "Expenses" + }, { + "value": "" + }, { + "value": "" + }, { + "value": "" + }] + }, + "Summary": { + "ColData": [{ + "value": "Total Expenses" + }, { + "value": "" + }, { + "value": "" + }, { + "value": "0.00" + }] + }, + "type": "Section", + "group": "Expenses" + }, { + "Summary": { + "ColData": [{ + "value": "Net Operating Income" + }, { + "value": "0.00" + }, { + "value": "0.00" + }, { + "value": "0.00" + }] + }, + "type": "Section", + "group": "NetOperatingIncome" + }, { + "Summary": { + "ColData": [{ + "value": "Net Income" + }, { + "value": "0.00" + }, { + "value": "0.00" + }, { + "value": "0.00" + }] + }, + "type": "Section", + "group": "NetIncome" + }] + } + } + + expected_data = { + "dates": ["2021-07-20", "2021-07-21"], + "data": [{ + "name": "Total Income", + "values": ["", "", "0.00"] + }, { + "name": "Gross Profit", + "values": ["0.00", "0.00", "0.00"] + }, { + "name": "Total Expenses", + "values": ["", "", "0.00"] + }, { + "name": "Net Operating Income", + "values": ["0.00", "0.00", "0.00"] + }, { + "name": "Net Income", + "values": ["0.00", "0.00", "0.00"] + }] + } + + reports.parse_report_metadata(response) + self.assertEqual(reports.parsed_metadata, expected_data) + From 4f4af3269834da2c2c39a0fe297cd336f73839a5 Mon Sep 17 00:00:00 2001 From: savan-chovatiya Date: Mon, 26 Jul 2021 11:56:23 +0530 Subject: [PATCH 02/12] TDL-13967: Handled state updation if no records found --- tap_quickbooks/streams.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/tap_quickbooks/streams.py b/tap_quickbooks/streams.py index 9f8210b..d750d91 100644 
--- a/tap_quickbooks/streams.py +++ b/tap_quickbooks/streams.py @@ -237,11 +237,12 @@ def sync(self): resp = self.client.get(self.endpoint, params=params) self.parse_report_metadata(resp) - for report in self.day_wise_reports(): - yield report - - self.state = singer.write_bookmark(self.state, self.stream_name, 'LastUpdatedTime', strptime_to_utc(report.get('ReportDate')).isoformat()) - singer.write_state(self.state) + reports = self.day_wise_reports() + if reports: + for report in reports: + yield report + self.state = singer.write_bookmark(self.state, self.stream_name, 'LastUpdatedTime', strptime_to_utc(report.get('ReportDate')).isoformat()) + singer.write_state(self.state) start_dttm = end_dttm end_dttm = start_dttm + timedelta(days=DATE_WINDOW_SIZE) From f25d3c058cb9e675b72a8fb3a6b71cffeb2e54bf Mon Sep 17 00:00:00 2001 From: savan-chovatiya Date: Wed, 4 Aug 2021 16:10:17 +0530 Subject: [PATCH 03/12] Added comments in unittests --- tap_quickbooks/streams.py | 4 ++-- tests/unittests/test_reports_parser.py | 6 ++++++ 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/tap_quickbooks/streams.py b/tap_quickbooks/streams.py index d750d91..de332ec 100644 --- a/tap_quickbooks/streams.py +++ b/tap_quickbooks/streams.py @@ -311,14 +311,14 @@ def day_wise_reports(self): ''' Return record for every day formed using output of parse_report_metadata ''' - for i, date in enumerate(self.parsed_metadata['dates']): + for index, date in enumerate(self.parsed_metadata['dates']): report = dict() report['ReportDate'] = date report['AccountingMethod'] = 'Accrual' report['Details'] = {} for data in self.parsed_metadata['data']: - report['Details'][data['name']] = data['values'][i] + report['Details'][data['name']] = data['values'][index] yield report diff --git a/tests/unittests/test_reports_parser.py b/tests/unittests/test_reports_parser.py index f5be82c..3f3354f 100644 --- a/tests/unittests/test_reports_parser.py +++ b/tests/unittests/test_reports_parser.py @@ -5,6 +5,9 @@ class TestReportsParser(unittest.TestCase): def test_day_wise_reports(self): + """ + Test that day wise reports are generated from formatted parsed_metadata + """ reports = streams.ReportStream("", "", "") reports.parsed_metadata = { "dates": ["2021-07-01", "2021-07-02", "2021-07-03"], @@ -41,6 +44,9 @@ def test_day_wise_reports(self): self.assertListEqual(expected_records, records) def test_report_parser(self): + """ + Test that metadata returned from report API is formatted in desired format + """ reports = streams.ReportStream("", "", "") response = { From 2237b71a7bcf4b13b3805dc7dbfdcd20c7172feb Mon Sep 17 00:00:00 2001 From: savan-chovatiya Date: Tue, 17 Aug 2021 19:53:29 +0530 Subject: [PATCH 04/12] Updated sync function for report streams --- tap_quickbooks/streams.py | 27 +++++++++++++++++++-------- tests/base.py | 1 + tests/test_quickbooks_pagination.py | 16 +++++++--------- 3 files changed, 27 insertions(+), 17 deletions(-) diff --git a/tap_quickbooks/streams.py b/tap_quickbooks/streams.py index de332ec..8ee90c4 100644 --- a/tap_quickbooks/streams.py +++ b/tap_quickbooks/streams.py @@ -5,7 +5,7 @@ from singer.utils import strptime_to_utc, strftime from datetime import timedelta -DATE_WINDOW_SIZE = 30 +DATE_WINDOW_SIZE = 29 class Stream: endpoint = '/v3/company/{realm_id}/query' @@ -203,24 +203,35 @@ class ReportStream(Stream): 'dates': [], 'data': [] } - key_properties = [] + key_properties = ['ReportDate'] replication_method = 'INCREMENTAL' # replication keys is ReportDate, manually created from data replication_keys = 
['ReportDate'] def sync(self): - start_dttm_str = singer.get_bookmark(self.state, self.stream_name, 'LastUpdatedTime', self.config.get('start_date')) - start_dttm = strptime_to_utc(start_dttm_str) + is_start_date_used = False + params = { + 'summarize_column_by': 'Days' + } + + start_dttm_str = singer.get_bookmark(self.state, self.stream_name, 'LastUpdatedTime') + if start_dttm_str is None: + start_dttm_str = self.config.get('start_date') + is_start_date_used = True + start_dttm = strptime_to_utc(start_dttm_str) end_dttm = start_dttm + timedelta(days=DATE_WINDOW_SIZE) now_dttm = utils.now() - + + # Fetch records for minimum 30 days + # if bookmark from state file is used and it's less than 30 days away + # Fetch records for start_date to current date + # if start_date is used and it'sless than 30 days away if end_dttm > now_dttm: end_dttm = now_dttm - params = { - 'summarize_column_by': 'Days' - } + if not is_start_date_used: + start_dttm = end_dttm - timedelta(days=DATE_WINDOW_SIZE) while start_dttm < now_dttm: self.parsed_metadata = { diff --git a/tests/base.py b/tests/base.py index 3de2dbb..e743c79 100644 --- a/tests/base.py +++ b/tests/base.py @@ -106,6 +106,7 @@ def expected_metadata(self): for stream in self.expected_check_streams(): if self.is_report_stream(stream): mdata[stream] = { + self.PRIMARY_KEYS: {'ReportDate'}, self.REPLICATION_METHOD: self.INCREMENTAL, self.REPLICATION_KEYS: {'ReportDate'}, } diff --git a/tests/test_quickbooks_pagination.py b/tests/test_quickbooks_pagination.py index 8bb4bca..1715869 100644 --- a/tests/test_quickbooks_pagination.py +++ b/tests/test_quickbooks_pagination.py @@ -67,6 +67,8 @@ def test_run(self): sync_messages = sync_records.get(stream, {'messages': []}).get('messages') + primary_key = self.expected_primary_keys().get(stream).pop() + # Verify the sync meets or exceeds the default record count self.assertLessEqual(expected_count, record_count) @@ -79,12 +81,8 @@ def test_run(self): self.assertGreater(record_count, pagination_threshold, msg="Record count not large enough to gaurantee pagination.") - # ProfitAndLoss report stream doesn't have any primary key - if not self.is_report_stream(stream): - primary_key = self.expected_primary_keys().get(stream).pop() - - # Verify we did not duplicate any records across pages - records_pks_set = {message.get('data').get(primary_key) for message in sync_messages} - records_pks_list = [message.get('data').get(primary_key) for message in sync_messages] - self.assertCountEqual(records_pks_set, records_pks_list, - msg="We have duplicate records for {}".format(stream)) + # Verify we did not duplicate any records across pages + records_pks_set = {message.get('data').get(primary_key) for message in sync_messages} + records_pks_list = [message.get('data').get(primary_key) for message in sync_messages] + self.assertCountEqual(records_pks_set, records_pks_list, + msg="We have duplicate records for {}".format(stream)) From 9ea6c517e6681276c0ed1a6d325d035bd426b7bb Mon Sep 17 00:00:00 2001 From: savan-chovatiya Date: Wed, 18 Aug 2021 12:32:59 +0530 Subject: [PATCH 05/12] Fixed integration tests --- tap_quickbooks/streams.py | 2 +- tests/test_quickbooks_bookmarks.py | 19 ++++++++++++------- 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/tap_quickbooks/streams.py b/tap_quickbooks/streams.py index 8ee90c4..7e39d42 100644 --- a/tap_quickbooks/streams.py +++ b/tap_quickbooks/streams.py @@ -255,7 +255,7 @@ def sync(self): self.state = singer.write_bookmark(self.state, self.stream_name, 'LastUpdatedTime', 
strptime_to_utc(report.get('ReportDate')).isoformat()) singer.write_state(self.state) - start_dttm = end_dttm + start_dttm = end_dttm + timedelta(days=1) # one record is emitted for every day so start from next day end_dttm = start_dttm + timedelta(days=DATE_WINDOW_SIZE) if end_dttm > now_dttm: diff --git a/tests/test_quickbooks_bookmarks.py b/tests/test_quickbooks_bookmarks.py index d85419d..69bac36 100644 --- a/tests/test_quickbooks_bookmarks.py +++ b/tests/test_quickbooks_bookmarks.py @@ -128,21 +128,26 @@ def test_run(self): # bookmarked states (actual values) first_bookmark_value = first_bookmark_key_value.get(sub_level_replication_key) second_bookmark_value = second_bookmark_key_value.get(sub_level_replication_key) - # bookmarked values as utc for comparing against records - first_bookmark_value_utc = self.convert_state_to_utc(first_bookmark_value) - second_bookmark_value_utc = self.convert_state_to_utc(second_bookmark_value) + # bookmarked values as epoch of utc for comparing against records + first_bookmark_value_utc = self.dt_to_ts(self.convert_state_to_utc(first_bookmark_value)) + second_bookmark_value_utc = self.dt_to_ts(self.convert_state_to_utc(second_bookmark_value)) # Verify the second sync bookmark is Equal to the first sync bookmark self.assertEqual(second_bookmark_value, first_bookmark_value) # assumes no changes to data during test # Verify the second sync records respect the previous (simulated) bookmark value - simulated_bookmark_value = new_state['bookmarks'][stream][sub_level_replication_key] + simulated_bookmark_value = self.dt_to_ts(new_state['bookmarks'][stream][sub_level_replication_key]) + + # Decrease 30 days from expected epoch time for reports stream as tap sync minimum data for last 30 days in bookmark scenario + if self.is_report_stream(stream): + simulated_bookmark_value -= 2592000 + for message in second_sync_messages: if self.is_report_stream(stream): replication_key_value = message.get('data').get('ReportDate') else: replication_key_value = message.get('data').get(top_level_replication_key).get(sub_level_replication_key) - self.assertGreaterEqual(self.dt_to_ts(replication_key_value), self.dt_to_ts(simulated_bookmark_value), + self.assertGreaterEqual(self.dt_to_ts(replication_key_value), simulated_bookmark_value, msg="Second sync records do not repect the previous bookmark.") # Verify the first sync bookmark value is the max replication key value for a given stream @@ -151,7 +156,7 @@ def test_run(self): replication_key_value = message.get('data').get('ReportDate') else: replication_key_value = message.get('data').get(top_level_replication_key).get(sub_level_replication_key) - self.assertLessEqual(self.dt_to_ts(replication_key_value), self.dt_to_ts(first_bookmark_value_utc), + self.assertLessEqual(self.dt_to_ts(replication_key_value), first_bookmark_value_utc, msg="First sync bookmark was set incorrectly, a record with a greater rep key value was synced") # Verify the second sync bookmark value is the max replication key value for a given stream @@ -160,7 +165,7 @@ def test_run(self): replication_key_value = message.get('data').get('ReportDate') else: replication_key_value = message.get('data').get(top_level_replication_key).get(sub_level_replication_key) - self.assertLessEqual(self.dt_to_ts(replication_key_value), self.dt_to_ts(second_bookmark_value_utc), + self.assertLessEqual(self.dt_to_ts(replication_key_value), second_bookmark_value_utc, msg="Second sync bookmark was set incorrectly, a record with a greater rep key value was synced") # Verify the 
number of records in the 2nd sync is less then the first From 4ac953036600ff072ff88d76eac3eb9bb4b297de Mon Sep 17 00:00:00 2001 From: savan-chovatiya Date: Wed, 18 Aug 2021 17:32:35 +0530 Subject: [PATCH 06/12] Added stream into readme --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index 7343437..301526c 100644 --- a/README.md +++ b/README.md @@ -37,6 +37,7 @@ This tap: * Transfers * VendorCredits * Vendors + * ProfitAndLossReport - Includes a schema for each resource reflecting most recent tested data retrieved using the api. See [the schema folder](https://github.com/singer-io/tap-quickbooks/tree/master/tap_quickbooks/schemas) for details. - Incrementally pulls data based on the input state From 18365fe88849b7b72ecc789bee94d3047906786c Mon Sep 17 00:00:00 2001 From: savan-chovatiya Date: Fri, 27 Aug 2021 14:33:26 +0530 Subject: [PATCH 07/12] Break down report parser for rows and columns --- tap_quickbooks/streams.py | 40 +++++++++++++++----------- tests/unittests/test_reports_parser.py | 14 +++++++-- 2 files changed, 34 insertions(+), 20 deletions(-) diff --git a/tap_quickbooks/streams.py b/tap_quickbooks/streams.py index 7e39d42..9635104 100644 --- a/tap_quickbooks/streams.py +++ b/tap_quickbooks/streams.py @@ -246,7 +246,8 @@ def sync(self): params["end_date"] = end_tm_str resp = self.client.get(self.endpoint, params=params) - self.parse_report_metadata(resp) + self.parse_report_columns(resp.get('Columns', {})) + self.parse_report_rows(resp.get('Rows', {})) reports = self.day_wise_reports() if reports: @@ -263,7 +264,22 @@ def sync(self): singer.write_state(self.state) - def parse_report_metadata(self, pileOfRows): + def parse_report_columns(self, pileOfColumns): + ''' + Restructure columns data in list of dates and update self.parsed_metadata dictionary. 
+ { + "dates": ["2021-07-01", "2021-07-02", "2021-07-03"], + "data": [] + } + ''' + columns = pileOfColumns.get('Column', []) + for column in columns: + metadatas = column.get('MetaData', []) + for md in metadatas: + if md['Name'] in ['StartDate']: + self.parsed_metadata['dates'].append(md['Value']) + + def parse_report_rows(self, pileOfRows): ''' Restructure data from report response on daily basis and update self.parsed_metadata dictionary { @@ -286,18 +302,18 @@ def parse_report_metadata(self, pileOfRows): if isinstance(pileOfRows, list): for x in pileOfRows: - self.parse_report_metadata(x) + self.parse_report_rows(x) else: if 'Rows' in pileOfRows.keys(): - self.parse_report_metadata(pileOfRows['Rows']) + self.parse_report_rows(pileOfRows['Rows']) if 'Row' in pileOfRows.keys(): - self.parse_report_metadata(pileOfRows['Row']) + self.parse_report_rows(pileOfRows['Row']) if 'Summary' in pileOfRows.keys(): - self.parse_report_metadata(pileOfRows['Summary']) + self.parse_report_rows(pileOfRows['Summary']) if 'ColData' in pileOfRows.keys(): d = dict() @@ -308,19 +324,9 @@ def parse_report_metadata(self, pileOfRows): d['values'] = vals self.parsed_metadata['data'].append(d) - if 'Columns' in pileOfRows.keys(): - self.parse_report_metadata(pileOfRows['Columns']) - - if 'Column' in pileOfRows.keys(): - for x in pileOfRows['Column']: - if 'MetaData' in x.keys(): - for md in x['MetaData']: - if md['Name'] in ['StartDate']: - self.parsed_metadata['dates'].append(md['Value']) - def day_wise_reports(self): ''' - Return record for every day formed using output of parse_report_metadata + Return record for every day formed using output of parse_report_columns and parse_report_rows ''' for index, date in enumerate(self.parsed_metadata['dates']): report = dict() diff --git a/tests/unittests/test_reports_parser.py b/tests/unittests/test_reports_parser.py index 3f3354f..6020b58 100644 --- a/tests/unittests/test_reports_parser.py +++ b/tests/unittests/test_reports_parser.py @@ -206,7 +206,12 @@ def test_report_parser(self): } } - expected_data = { + expected_data_after_parse_columns = { + "dates": ["2021-07-20", "2021-07-21"], + "data": [] + } + + expected_data_after_parse_rows = { "dates": ["2021-07-20", "2021-07-21"], "data": [{ "name": "Total Income", @@ -226,6 +231,9 @@ def test_report_parser(self): }] } - reports.parse_report_metadata(response) - self.assertEqual(reports.parsed_metadata, expected_data) + reports.parse_report_columns(response['Columns']) + self.assertEqual(reports.parsed_metadata, expected_data_after_parse_columns) + + reports.parse_report_rows(response["Rows"]) + self.assertEqual(reports.parsed_metadata, expected_data_after_parse_rows) From fd07ad1595ab77349b80395c2cbb1fe82cbcedaa Mon Sep 17 00:00:00 2001 From: savan-chovatiya Date: Tue, 31 Aug 2021 12:15:55 +0530 Subject: [PATCH 08/12] Resolved circleci failure --- tests/base.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/tests/base.py b/tests/base.py index 7698778..8ec2f3b 100644 --- a/tests/base.py +++ b/tests/base.py @@ -96,11 +96,8 @@ def expected_check_streams(): "transfers", "vendor_credits", "vendors", -<<<<<<< HEAD - "profit_loss_report" -======= + "profit_loss_report", "deleted_objects" ->>>>>>> 7fa0b1c4e139280c731f6ec095a5cdbd5eca8d64 } def expected_metadata(self): From bf7d920df1b0c104a64c06fd98539583acd7e0a0 Mon Sep 17 00:00:00 2001 From: savan-chovatiya Date: Wed, 22 Sep 2021 16:29:32 +0530 Subject: [PATCH 09/12] Updated date retrival from datetime object --- tap_quickbooks/streams.py | 4 ++-- 1 
file changed, 2 insertions(+), 2 deletions(-) diff --git a/tap_quickbooks/streams.py b/tap_quickbooks/streams.py index e61bf60..7c31d76 100644 --- a/tap_quickbooks/streams.py +++ b/tap_quickbooks/streams.py @@ -239,8 +239,8 @@ def sync(self): 'data': [] } - start_tm_str = strftime(start_dttm)[0:10] - end_tm_str = strftime(end_dttm)[0:10] + start_tm_str = str(start_dttm.date()) + end_tm_str = str(end_dttm.date()) params["start_date"] = start_tm_str params["end_date"] = end_tm_str From 2cf255d48f40bbbce99740a605a6097a9162445e Mon Sep 17 00:00:00 2001 From: savan-chovatiya Date: Wed, 22 Sep 2021 16:35:01 +0530 Subject: [PATCH 10/12] Break down dev and test ddependancy in setup --- .circleci/config.yml | 2 +- setup.py | 6 ++++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 0cafce2..32f1940 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -11,7 +11,7 @@ jobs: python3 -mvenv /usr/local/share/virtualenvs/tap-quickbooks source /usr/local/share/virtualenvs/tap-quickbooks/bin/activate pip install -U pip setuptools - pip install .[dev] + pip install .[test] - run: name: 'JSON Validator' command: | diff --git a/setup.py b/setup.py index 273b18f..c09f5c9 100644 --- a/setup.py +++ b/setup.py @@ -15,10 +15,12 @@ 'requests_oauthlib==1.3.0', ], extras_require={ - 'dev': [ - 'ipdb==0.11', + 'test': [ 'pylint==2.5.3', 'nose' + ], + 'dev': [ + 'ipdb' ] }, entry_points=''' From 604c67d7a7280777445798853147198e1ac96fe1 Mon Sep 17 00:00:00 2001 From: savan-chovatiya Date: Tue, 5 Oct 2021 12:07:39 +0530 Subject: [PATCH 11/12] TDL-13967: Resolved review comments --- tap_quickbooks/streams.py | 33 +++++++++++++++++++-------------- 1 file changed, 19 insertions(+), 14 deletions(-) diff --git a/tap_quickbooks/streams.py b/tap_quickbooks/streams.py index 7c31d76..68702c5 100644 --- a/tap_quickbooks/streams.py +++ b/tap_quickbooks/streams.py @@ -215,11 +215,13 @@ def sync(self): 'summarize_column_by': 'Days' } + # Get bookmark for the stream start_dttm_str = singer.get_bookmark(self.state, self.stream_name, 'LastUpdatedTime') if start_dttm_str is None: start_dttm_str = self.config.get('start_date') is_start_date_used = True + # Set start_date and end_date for first date window of API calls start_dttm = strptime_to_utc(start_dttm_str) end_dttm = start_dttm + timedelta(days=DATE_WINDOW_SIZE) now_dttm = utils.now() @@ -246,16 +248,17 @@ def sync(self): params["end_date"] = end_tm_str resp = self.client.get(self.endpoint, params=params) - self.parse_report_columns(resp.get('Columns', {})) - self.parse_report_rows(resp.get('Rows', {})) + self.parse_report_columns(resp.get('Columns', {})) # parse report columns from response's metadata + self.parse_report_rows(resp.get('Rows', {})) # parse report rows from response's metadata - reports = self.day_wise_reports() + reports = self.day_wise_reports() # get reports for every days from parsed metadata if reports: for report in reports: yield report self.state = singer.write_bookmark(self.state, self.stream_name, 'LastUpdatedTime', strptime_to_utc(report.get('ReportDate')).isoformat()) singer.write_state(self.state) + # Set start_date and end_date of date window for next API call start_dttm = end_dttm + timedelta(days=1) # one record is emitted for every day so start from next day end_dttm = start_dttm + timedelta(days=DATE_WINDOW_SIZE) @@ -271,13 +274,14 @@ def parse_report_columns(self, pileOfColumns): "dates": ["2021-07-01", "2021-07-02", "2021-07-03"], "data": [] } + Reference for report metadata: 
https://developer.intuit.com/app/developer/qbo/docs/api/accounting/report-entities/profitandloss ''' columns = pileOfColumns.get('Column', []) for column in columns: metadatas = column.get('MetaData', []) - for md in metadatas: - if md['Name'] in ['StartDate']: - self.parsed_metadata['dates'].append(md['Value']) + for metadata in metadatas: + if metadata['Name'] in ['StartDate']: + self.parsed_metadata['dates'].append(metadata['Value']) def parse_report_rows(self, pileOfRows): ''' @@ -298,11 +302,12 @@ def parse_report_rows(self, pileOfRows): "values": ["3.00", "3.00", "3.00", "9.00"] }] } + Reference for report metadata: https://developer.intuit.com/app/developer/qbo/docs/api/accounting/report-entities/profitandloss ''' if isinstance(pileOfRows, list): - for x in pileOfRows: - self.parse_report_rows(x) + for row in pileOfRows: + self.parse_report_rows(row) else: @@ -316,13 +321,13 @@ def parse_report_rows(self, pileOfRows): self.parse_report_rows(pileOfRows['Summary']) if 'ColData' in pileOfRows.keys(): - d = dict() - d['name'] = pileOfRows['ColData'][0]['value'] + entry_data = dict() + entry_data['name'] = pileOfRows['ColData'][0]['value'] vals = [] - for x in pileOfRows['ColData'][1:]: - vals.append(x['value']) - d['values'] = vals - self.parsed_metadata['data'].append(d) + for column_value in pileOfRows['ColData'][1:]: + vals.append(column_value['value']) + entry_data['values'] = vals + self.parsed_metadata['data'].append(entry_data) def day_wise_reports(self): ''' From b6e8e4ae2228e71d8aae9de4521ed25c4179b3b1 Mon Sep 17 00:00:00 2001 From: savan-chovatiya Date: Tue, 5 Oct 2021 12:14:30 +0530 Subject: [PATCH 12/12] TDL-13967: Added code comments --- tap_quickbooks/streams.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tap_quickbooks/streams.py b/tap_quickbooks/streams.py index 68702c5..56bc1c0 100644 --- a/tap_quickbooks/streams.py +++ b/tap_quickbooks/streams.py @@ -221,7 +221,7 @@ def sync(self): start_dttm_str = self.config.get('start_date') is_start_date_used = True - # Set start_date and end_date for first date window of API calls + # Set start_date and end_date for first date window(30 days) of API calls start_dttm = strptime_to_utc(start_dttm_str) end_dttm = start_dttm + timedelta(days=DATE_WINDOW_SIZE) now_dttm = utils.now() @@ -235,6 +235,7 @@ def sync(self): if not is_start_date_used: start_dttm = end_dttm - timedelta(days=DATE_WINDOW_SIZE) + # Make a API call in 30 days date window until reach current_time while start_dttm < now_dttm: self.parsed_metadata = { 'dates': [], @@ -244,6 +245,7 @@ def sync(self): start_tm_str = str(start_dttm.date()) end_tm_str = str(end_dttm.date()) + # Set date window params["start_date"] = start_tm_str params["end_date"] = end_tm_str
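For reference, the sketch below shows the windowing-and-flattening idea behind ReportStream in a standalone form. It is illustrative only, not the tap's code: date_windows, flatten_report and the stub dict are hypothetical names and data made up for this example, the stub covers only the report fields the parser reads (Columns.Column[].MetaData StartDate entries and Rows.Row[].Summary.ColData), and the window arithmetic is simplified compared with the sync() method in the patches above.

from datetime import date, timedelta

DATE_WINDOW_SIZE = 30  # days per report request, mirroring the constant used in the patch


def date_windows(start, end, size=DATE_WINDOW_SIZE):
    # Yield (window_start, window_end) date pairs that cover start..end inclusive.
    cur = start
    while cur <= end:
        win_end = min(cur + timedelta(days=size - 1), end)
        yield cur, win_end
        cur = win_end + timedelta(days=1)


def flatten_report(report):
    # Turn a day-summarized, ProfitAndLoss-style payload into one record per day.
    dates = [
        md["Value"]
        for col in report.get("Columns", {}).get("Column", [])
        for md in col.get("MetaData", [])
        if md["Name"] == "StartDate"
    ]
    summaries = [
        row["Summary"]["ColData"]
        for row in report.get("Rows", {}).get("Row", [])
        if "Summary" in row
    ]
    for i, day in enumerate(dates):
        # Column 0 of each summary row is the row name; columns 1..n line up with the dates.
        yield {
            "ReportDate": day,
            "Details": {cols[0]["value"]: cols[i + 1]["value"] for cols in summaries},
        }


# Hand-made stub trimmed to the fields read above (not a real API response).
stub = {
    "Columns": {"Column": [
        {"MetaData": [{"Name": "ColKey", "Value": "account"}]},
        {"MetaData": [{"Name": "StartDate", "Value": "2021-07-20"}]},
        {"MetaData": [{"Name": "StartDate", "Value": "2021-07-21"}]},
    ]},
    "Rows": {"Row": [
        {"Summary": {"ColData": [{"value": "Net Income"},
                                 {"value": "3.00"}, {"value": "4.00"}, {"value": "7.00"}]}},
    ]},
}

for window in date_windows(date(2021, 7, 1), date(2021, 7, 31)):
    print(window)    # (2021-07-01, 2021-07-30) then (2021-07-31, 2021-07-31)
for record in flatten_report(stub):
    print(record)    # one dict per report day, e.g. {'ReportDate': '2021-07-20', 'Details': {'Net Income': '3.00'}}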