TDL-13967: Add ProfitAndLoss report stream #37

Open — wants to merge 14 commits into base: master
Changes from 6 commits
1 change: 1 addition & 0 deletions README.md
@@ -37,6 +37,7 @@ This tap:
* Transfers
* VendorCredits
* Vendors
* ProfitAndLossReport

- Includes a schema for each resource reflecting most recent tested data retrieved using the api. See [the schema folder](https://github.com/singer-io/tap-quickbooks/tree/master/tap_quickbooks/schemas) for details.
- Incrementally pulls data based on the input state
28 changes: 28 additions & 0 deletions tap_quickbooks/schemas/profit_loss_report.json
@@ -0,0 +1,28 @@
{
"type": [
"null",
"object"
],
"properties": {
"ReportDate": {
"type": [
"null",
"string"
],
"format": "date-time"
},
"AccountingMethod": {
"type": [
"null",
"string"
]
},
"Details": {
"type": [
"null",
"object"
],
"properties": {}
}
}
}
147 changes: 146 additions & 1 deletion tap_quickbooks/streams.py
@@ -1,6 +1,11 @@
import tap_quickbooks.query_builder as query_builder

import singer
from singer import utils
from singer.utils import strptime_to_utc, strftime
from datetime import timedelta

DATE_WINDOW_SIZE = 29

class Stream:
endpoint = '/v3/company/{realm_id}/query'
@@ -193,6 +198,145 @@ class Vendors(Stream):
table_name = 'Vendor'
additional_where = "Active IN (true, false)"

class ReportStream(Stream):
Contributor:
Please add more comments to the code to explain at each section what is being done.

Contributor Author (@savan-chovatiya, Oct 5, 2021):
Added more comments to the code for better code explanation.

parsed_metadata = {
'dates': [],
'data': []
}
key_properties = ['ReportDate']
replication_method = 'INCREMENTAL'
# replication key is ReportDate, manually created from the data
replication_keys = ['ReportDate']

def sync(self):

is_start_date_used = False
params = {
'summarize_column_by': 'Days'
}

start_dttm_str = singer.get_bookmark(self.state, self.stream_name, 'LastUpdatedTime')
if start_dttm_str is None:
start_dttm_str = self.config.get('start_date')
is_start_date_used = True

start_dttm = strptime_to_utc(start_dttm_str)
end_dttm = start_dttm + timedelta(days=DATE_WINDOW_SIZE)
now_dttm = utils.now()

# Fetch records for a minimum of 30 days if the bookmark from the state
# file is used and it is less than 30 days away.
# Fetch records from start_date to the current date
# if start_date is used and it is less than 30 days away.
if end_dttm > now_dttm:
end_dttm = now_dttm
if not is_start_date_used:
start_dttm = end_dttm - timedelta(days=DATE_WINDOW_SIZE)

while start_dttm < now_dttm:
self.parsed_metadata = {
'dates': [],
'data': []
}

start_tm_str = strftime(start_dttm)[0:10]
Contributor:
Use str(start_dttm.date()) to get date

Contributor Author:
Updated date retrieval as per suggestion.

end_tm_str = strftime(end_dttm)[0:10]

params["start_date"] = start_tm_str
params["end_date"] = end_tm_str

resp = self.client.get(self.endpoint, params=params)
self.parse_report_metadata(resp)

Contributor:
I think having parse_report_metadata() be recursive is excessive. The response object here is just three keys: Header (which we ignore), Rows, and Columns.

Parsing resp['Columns'] seems like a loop over resp['Columns']['Column'] and filtering the 'Metadata'.

Parsing resp['Rows'] is the only recursive part, so we can repurpose most of the logic in parse_report_metadata() into parse_rows() or something.

Contributor Author:
Split parse_report_metadata() into two functions, parse_report_columns() and parse_report_rows().

parse_report_columns() parses resp['Columns'] with a loop over resp['Columns']['Column'] and filters Metadata.

parse_report_rows() is the recursive one, which parses resp['Rows'].
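A minimal sketch of that split, assuming the response layout described in the comment above (method names follow the author's description; the exact code in later commits may differ):

# Sketch only — methods on ReportStream, not part of this commit
def parse_report_columns(self, resp):
    # Each entry in resp['Columns']['Column'] describes one day; its MetaData
    # carries the StartDate collected as the per-day report date.
    for column in resp.get('Columns', {}).get('Column', []):
        for metadata in column.get('MetaData', []):
            if metadata['Name'] == 'StartDate':
                self.parsed_metadata['dates'].append(metadata['Value'])

def parse_report_rows(self, rows):
    # Rows nest arbitrarily deep, so this part stays recursive.
    if isinstance(rows, list):
        for row in rows:
            self.parse_report_rows(row)
        return
    for key in ('Rows', 'Row', 'Summary'):
        if key in rows:
            self.parse_report_rows(rows[key])
    if 'ColData' in rows:
        self.parsed_metadata['data'].append({
            'name': rows['ColData'][0]['value'],
            'values': [col['value'] for col in rows['ColData'][1:]],
        })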


reports = self.day_wise_reports()
if reports:
for report in reports:
yield report
Contributor:
What is the difference between yield report and yield from report? I've seen yield from report used in many places.

Contributor Author (@savan-chovatiya, Oct 5, 2021):
  • yield from reports and yield inside a for loop both produce generators that emit every element of reports.
  • A Python generator is exhausted once it has been iterated over; here reports is a generator, and the tap uses the last report from reports to store the bookmark.
  • With yield from reports there is no way to retrieve the last report afterwards, so we use yield inside a for loop and read the last report from the loop's local variable report.
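A small standalone illustration of that difference, using hypothetical data (not part of the PR):

# Illustration only — why the loop variable matters for bookmarking
def make_reports():
    # Stand-in for day_wise_reports(): a generator of per-day records
    for day in ('2021-07-01', '2021-07-02', '2021-07-03'):
        yield {'ReportDate': day}

def sync_with_yield_from():
    reports = make_reports()
    yield from reports
    # The generator is exhausted and there is no loop variable,
    # so the last report is no longer reachable for bookmarking.

def sync_with_loop():
    reports = make_reports()
    for report in reports:
        yield report
    # After the loop, `report` still holds the last record emitted,
    # so its ReportDate can be written as the bookmark.
    last_bookmark = report['ReportDate']  # '2021-07-03'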

self.state = singer.write_bookmark(self.state, self.stream_name, 'LastUpdatedTime', strptime_to_utc(report.get('ReportDate')).isoformat())
singer.write_state(self.state)

start_dttm = end_dttm + timedelta(days=1) # one record is emitted for every day so start from next day
end_dttm = start_dttm + timedelta(days=DATE_WINDOW_SIZE)

if end_dttm > now_dttm:
end_dttm = now_dttm

singer.write_state(self.state)

def parse_report_metadata(self, pileOfRows):
'''
Restructure data from the report response on a daily basis and update the self.parsed_metadata dictionary:
{
"dates": ["2021-07-01", "2021-07-02", "2021-07-03"],
"data": [ {
"name": "Total Income",
"values": ["4.00", "4.00", "4.00", "12.00"]
}, {
"name": "Gross Profit",
"values": ["4.00", "4.00", "4.00", "12.00"]
}, {
"name": "Total Expenses",
"values": ["1.00", "1.00", "1.00", "3.00"]
}, {
"name": "Net Income",
"values": ["3.00", "3.00", "3.00", "9.00"]
}]
}
'''

if isinstance(pileOfRows, list):
for x in pileOfRows:
Contributor:
Stop using variable names like x. Please provide more descriptive names.

Contributor Author:
Updated x with the descriptive name row.

self.parse_report_metadata(x)

else:

if 'Rows' in pileOfRows.keys():
self.parse_report_metadata(pileOfRows['Rows'])

if 'Row' in pileOfRows.keys():
self.parse_report_metadata(pileOfRows['Row'])

if 'Summary' in pileOfRows.keys():
self.parse_report_metadata(pileOfRows['Summary'])

if 'ColData' in pileOfRows.keys():
d = dict()
Contributor:
Stop using variable names like d. Provide a more descriptive variable name; this is bad coding practice.

Contributor Author (@savan-chovatiya, Oct 5, 2021):
I will keep descriptive variable names in mind; d above has been updated with a proper name.

d['name'] = pileOfRows['ColData'][0]['value']
vals = []
for x in pileOfRows['ColData'][1:]:
vals.append(x['value'])
d['values'] = vals
self.parsed_metadata['data'].append(d)
Contributor:
No x or d variable names. Provide descriptive names.

Contributor Author:
Updated x- and d-style variables with proper descriptive names.


if 'Columns' in pileOfRows.keys():
self.parse_report_metadata(pileOfRows['Columns'])

if 'Column' in pileOfRows.keys():
for x in pileOfRows['Column']:
if 'MetaData' in x.keys():
for md in x['MetaData']:
if md['Name'] in ['StartDate']:
self.parsed_metadata['dates'].append(md['Value'])

def day_wise_reports(self):
'''
Return a record for every day, formed using the output of parse_report_metadata
'''
for index, date in enumerate(self.parsed_metadata['dates']):
report = dict()
report['ReportDate'] = date
report['AccountingMethod'] = 'Accrual'
report['Details'] = {}

for data in self.parsed_metadata['data']:
report['Details'][data['name']] = data['values'][index]

yield report
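For the sample parsed_metadata shown in the parse_report_metadata docstring above, day_wise_reports() would yield records shaped like this (illustrative values only):

# First record yielded for the sample parsed_metadata above
{
    'ReportDate': '2021-07-01',
    'AccountingMethod': 'Accrual',
    'Details': {
        'Total Income': '4.00',
        'Gross Profit': '4.00',
        'Total Expenses': '1.00',
        'Net Income': '3.00'
    }
}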


class ProfitAndLossReport(ReportStream):
stream_name = 'profit_loss_report'
endpoint = '/v3/company/{realm_id}/reports/ProfitAndLoss'

STREAM_OBJECTS = {
"accounts": Accounts,
@@ -222,5 +366,6 @@ class Vendors(Stream):
"time_activities": TimeActivities,
"transfers": Transfers,
"vendor_credits": VendorCredits,
"vendors": Vendors
"vendors": Vendors,
"profit_loss_report": ProfitAndLossReport
}
36 changes: 31 additions & 5 deletions tests/base.py
@@ -1,5 +1,6 @@
import os
import unittest
import time
from datetime import datetime as dt
from datetime import timedelta

@@ -23,6 +24,12 @@ class TestQuickbooksBase(unittest.TestCase):
INCREMENTAL = "INCREMENTAL"
FULL = "FULL_TABLE"
START_DATE_FORMAT = "%Y-%m-%dT00:00:00Z" # %H:%M:%SZ
DATETIME_FMT = {
"%Y-%m-%dT%H:%M:%SZ",
"%Y-%m-%d %H:%M:%S",
"%Y-%m-%dT%H:%M:%S.000000Z",
"%Y-%m-%dT%H:%M:%S%z"
}

def setUp(self):
missing_envs = [x for x in [
@@ -89,18 +96,26 @@ def expected_check_streams():
"transfers",
"vendor_credits",
"vendors",
"profit_loss_report"
}

def expected_metadata(self):
"""The expected streams and metadata about the streams"""

mdata = {}
for stream in self.expected_check_streams():
mdata[stream] = {
self.PRIMARY_KEYS: {'Id'},
self.REPLICATION_METHOD: self.INCREMENTAL,
self.REPLICATION_KEYS: {'MetaData'},
}
if self.is_report_stream(stream):
mdata[stream] = {
self.PRIMARY_KEYS: {'ReportDate'},
self.REPLICATION_METHOD: self.INCREMENTAL,
self.REPLICATION_KEYS: {'ReportDate'},
}
else:
mdata[stream] = {
self.PRIMARY_KEYS: {'Id'},
self.REPLICATION_METHOD: self.INCREMENTAL,
self.REPLICATION_KEYS: {'MetaData'},
}

return mdata

@@ -222,3 +237,14 @@ def minimum_record_count_by_stream(self):
record_counts["vendors"] = 26

return record_counts

def dt_to_ts(self, dtime):
Contributor:
Raising an exception after this for loop would prevent the test from getting back a None and failing in a confusing way in the case where the datetime format is unaccounted for.

for date_format in self.DATETIME_FMT:
try:
date_stripped = int(time.mktime(dt.strptime(dtime, date_format).timetuple()))
return date_stripped
except ValueError:
continue

def is_report_stream(self, stream):
return stream in ["profit_loss_report"]
34 changes: 24 additions & 10 deletions tests/test_quickbooks_bookmarks.py
Original file line number Diff line number Diff line change
@@ -128,30 +128,44 @@ def test_run(self):
# bookmarked states (actual values)
first_bookmark_value = first_bookmark_key_value.get(sub_level_replication_key)
second_bookmark_value = second_bookmark_key_value.get(sub_level_replication_key)
# bookmarked values as utc for comparing against records
first_bookmark_value_utc = self.convert_state_to_utc(first_bookmark_value)
second_bookmark_value_utc = self.convert_state_to_utc(second_bookmark_value)
# bookmarked values as epoch of utc for comparing against records
first_bookmark_value_utc = self.dt_to_ts(self.convert_state_to_utc(first_bookmark_value))
second_bookmark_value_utc = self.dt_to_ts(self.convert_state_to_utc(second_bookmark_value))

# Verify the second sync bookmark is Equal to the first sync bookmark
self.assertEqual(second_bookmark_value, first_bookmark_value) # assumes no changes to data during test

# Verify the second sync records respect the previous (simulated) bookmark value
simulated_bookmark_value = new_state['bookmarks'][stream][sub_level_replication_key]
simulated_bookmark_value = self.dt_to_ts(new_state['bookmarks'][stream][sub_level_replication_key])

# Subtract 30 days (2592000 seconds) from the expected epoch for report streams, since the tap syncs at least the last 30 days of data in the bookmark scenario
if self.is_report_stream(stream):
simulated_bookmark_value -= 2592000

for message in second_sync_messages:
replication_key_value = message.get('data').get(top_level_replication_key).get(sub_level_replication_key)
self.assertGreaterEqual(replication_key_value, simulated_bookmark_value,
if self.is_report_stream(stream):
replication_key_value = message.get('data').get('ReportDate')
else:
replication_key_value = message.get('data').get(top_level_replication_key).get(sub_level_replication_key)
self.assertGreaterEqual(self.dt_to_ts(replication_key_value), simulated_bookmark_value,
msg="Second sync records do not repect the previous bookmark.")

# Verify the first sync bookmark value is the max replication key value for a given stream
for message in first_sync_messages:
replication_key_value = message.get('data').get(top_level_replication_key).get(sub_level_replication_key)
self.assertLessEqual(replication_key_value, first_bookmark_value_utc,
if self.is_report_stream(stream):
replication_key_value = message.get('data').get('ReportDate')
else:
replication_key_value = message.get('data').get(top_level_replication_key).get(sub_level_replication_key)
self.assertLessEqual(self.dt_to_ts(replication_key_value), first_bookmark_value_utc,
msg="First sync bookmark was set incorrectly, a record with a greater rep key value was synced")

# Verify the second sync bookmark value is the max replication key value for a given stream
for message in second_sync_messages:
replication_key_value = message.get('data').get(top_level_replication_key).get(sub_level_replication_key)
self.assertLessEqual(replication_key_value, second_bookmark_value_utc,
if self.is_report_stream(stream):
replication_key_value = message.get('data').get('ReportDate')
else:
replication_key_value = message.get('data').get(top_level_replication_key).get(sub_level_replication_key)
self.assertLessEqual(self.dt_to_ts(replication_key_value), second_bookmark_value_utc,
msg="Second sync bookmark was set incorrectly, a record with a greater rep key value was synced")

# Verify the number of records in the 2nd sync is less than the first
6 changes: 5 additions & 1 deletion tests/test_quickbooks_pagination.py
@@ -73,7 +73,11 @@ def test_run(self):
self.assertLessEqual(expected_count, record_count)

# Verify the number of records exceeds the max_results (API limit)
pagination_threshold = int(self.get_properties().get(page_size_key))
if self.is_report_stream(stream):
# The tap makes API calls in 30-day windows for report streams
pagination_threshold = 30
else:
pagination_threshold = int(self.get_properties().get(page_size_key))
self.assertGreater(record_count, pagination_threshold,
msg="Record count not large enough to gaurantee pagination.")

16 changes: 12 additions & 4 deletions tests/test_quickbooks_start_date.py
@@ -89,7 +89,9 @@ def test_run(self):

# start dates
start_date_1 = self.get_properties()['start_date']
start_date_1_epoch = self.dt_to_ts(start_date_1)
start_date_2 = self.get_properties(original=False)['start_date']
start_date_2_epoch = self.dt_to_ts(start_date_2)

# Verify by stream that our first sync meets or exceeds the default record count
self.assertLessEqual(expected_first_sync_count, first_sync_count)
@@ -99,10 +101,16 @@

# Verify by stream that all records have a rep key that is equal to or greater than that sync's start_date
for message in first_sync_messages:
rep_key_value = message.get('data').get('MetaData').get('LastUpdatedTime')
self.assertGreaterEqual(rep_key_value, start_date_1,
if self.is_report_stream(stream):
rep_key_value = message.get('data').get('ReportDate')
else:
rep_key_value = message.get('data').get('MetaData').get('LastUpdatedTime')
self.assertGreaterEqual(self.dt_to_ts(rep_key_value), start_date_1_epoch,
msg="A record was replicated with a replication key value prior to the start date")
for message in second_sync_messages:
rep_key_value = message.get('data').get('MetaData').get('LastUpdatedTime')
self.assertGreaterEqual(rep_key_value, start_date_2,
if self.is_report_stream(stream):
rep_key_value = message.get('data').get('ReportDate')
else:
rep_key_value = message.get('data').get('MetaData').get('LastUpdatedTime')
self.assertGreaterEqual(self.dt_to_ts(rep_key_value), start_date_2_epoch,
msg="A record was replicated with a replication key value prior to the start date")