-
Notifications
You must be signed in to change notification settings - Fork 23
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
TDL-13967: Add ProfitAndLoss report stream #37
base: master
Are you sure you want to change the base?
Changes from 6 commits
efb1458
4f4af32
f25d3c0
2237b71
9ea6c51
4ac9530
b7b19cd
18365fe
84f2857
fd07ad1
bf7d920
2cf255d
604c67d
b6e8e4a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
{ | ||
"type": [ | ||
"null", | ||
"object" | ||
], | ||
"properties": { | ||
"ReportDate": { | ||
"type": [ | ||
"null", | ||
"string" | ||
], | ||
"format": "date-time" | ||
}, | ||
"AccountingMethod": { | ||
"type": [ | ||
"null", | ||
"string" | ||
] | ||
}, | ||
"Details": { | ||
"type": [ | ||
"null", | ||
"object" | ||
], | ||
"properties": {} | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change | ||
---|---|---|---|---|
@@ -1,6 +1,11 @@ | ||||
import tap_quickbooks.query_builder as query_builder | ||||
|
||||
import singer | ||||
from singer import utils | ||||
from singer.utils import strptime_to_utc, strftime | ||||
from datetime import timedelta | ||||
|
||||
DATE_WINDOW_SIZE = 29 | ||||
|
||||
class Stream: | ||||
endpoint = '/v3/company/{realm_id}/query' | ||||
|
@@ -193,6 +198,145 @@ class Vendors(Stream): | |||
table_name = 'Vendor' | ||||
additional_where = "Active IN (true, false)" | ||||
|
||||
class ReportStream(Stream): | ||||
parsed_metadata = { | ||||
'dates': [], | ||||
'data': [] | ||||
} | ||||
key_properties = ['ReportDate'] | ||||
replication_method = 'INCREMENTAL' | ||||
# replication keys is ReportDate, manually created from data | ||||
replication_keys = ['ReportDate'] | ||||
|
||||
def sync(self): | ||||
|
||||
is_start_date_used = False | ||||
params = { | ||||
'summarize_column_by': 'Days' | ||||
} | ||||
|
||||
start_dttm_str = singer.get_bookmark(self.state, self.stream_name, 'LastUpdatedTime') | ||||
if start_dttm_str is None: | ||||
start_dttm_str = self.config.get('start_date') | ||||
is_start_date_used = True | ||||
|
||||
start_dttm = strptime_to_utc(start_dttm_str) | ||||
end_dttm = start_dttm + timedelta(days=DATE_WINDOW_SIZE) | ||||
now_dttm = utils.now() | ||||
|
||||
# Fetch records for minimum 30 days | ||||
# if bookmark from state file is used and it's less than 30 days away | ||||
# Fetch records for start_date to current date | ||||
# if start_date is used and it's less than 30 days away | ||||
if end_dttm > now_dttm: | ||||
end_dttm = now_dttm | ||||
if not is_start_date_used: | ||||
start_dttm = end_dttm - timedelta(days=DATE_WINDOW_SIZE) | ||||
|
||||
while start_dttm < now_dttm: | ||||
self.parsed_metadata = { | ||||
'dates': [], | ||||
'data': [] | ||||
} | ||||
|
||||
start_tm_str = strftime(start_dttm)[0:10] | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Use str(start_dttm.date()) to get date There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Updated date retrieval as per suggestion. |
||||
end_tm_str = strftime(end_dttm)[0:10] | ||||
|
||||
params["start_date"] = start_tm_str | ||||
params["end_date"] = end_tm_str | ||||
|
||||
resp = self.client.get(self.endpoint, params=params) | ||||
self.parse_report_metadata(resp) | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think having Parsing Parsing There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Split down
|
||||
|
||||
reports = self.day_wise_reports() | ||||
if reports: | ||||
for report in reports: | ||||
yield report | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What is the difference between yield report and yield from report? I've seen in many places using yield from report There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
tap-quickbooks/tap_quickbooks/streams.py Line 260 in b6e8e4a
|
||||
self.state = singer.write_bookmark(self.state, self.stream_name, 'LastUpdatedTime', strptime_to_utc(report.get('ReportDate')).isoformat()) | ||||
singer.write_state(self.state) | ||||
|
||||
start_dttm = end_dttm + timedelta(days=1) # one record is emitted for every day so start from next day | ||||
end_dttm = start_dttm + timedelta(days=DATE_WINDOW_SIZE) | ||||
|
||||
if end_dttm > now_dttm: | ||||
end_dttm = now_dttm | ||||
|
||||
singer.write_state(self.state) | ||||
|
||||
def parse_report_metadata(self, pileOfRows): | ||||
''' | ||||
Restructure data from report response on daily basis and update self.parsed_metadata dictionary | ||||
{ | ||||
"dates": ["2021-07-01", "2021-07-02", "2021-07-03"], | ||||
"data": [ { | ||||
"name": "Total Income", | ||||
"values": ["4.00", "4.00", "4.00", "12.00"] | ||||
}, { | ||||
"name": "Gross Profit", | ||||
"values": ["4.00", "4.00", "4.00", "12.00"] | ||||
}, { | ||||
"name": "Total Expenses", | ||||
"values": ["1.00", "1.00", "1.00", "3.00"] | ||||
}, { | ||||
"name": "Net Income", | ||||
"values": ["3.00", "3.00", "3.00", "9.00"] | ||||
}] | ||||
} | ||||
''' | ||||
|
||||
if isinstance(pileOfRows, list): | ||||
for x in pileOfRows: | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Stop using variable names like x. Please provide more descriptive There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Updated x with descriptive name |
||||
self.parse_report_metadata(x) | ||||
|
||||
else: | ||||
|
||||
if 'Rows' in pileOfRows.keys(): | ||||
self.parse_report_metadata(pileOfRows['Rows']) | ||||
|
||||
if 'Row' in pileOfRows.keys(): | ||||
self.parse_report_metadata(pileOfRows['Row']) | ||||
|
||||
if 'Summary' in pileOfRows.keys(): | ||||
self.parse_report_metadata(pileOfRows['Summary']) | ||||
|
||||
if 'ColData' in pileOfRows.keys(): | ||||
d = dict() | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Stop using variable names like d. Provide more descriptive variable name. Bad coding practice There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I will keep in mind using descriptive variable names and updated above d with a proper name. |
||||
d['name'] = pileOfRows['ColData'][0]['value'] | ||||
vals = [] | ||||
for x in pileOfRows['ColData'][1:]: | ||||
vals.append(x['value']) | ||||
d['values'] = vals | ||||
self.parsed_metadata['data'].append(d) | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No x, d variable names. Provide descriptive names There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Updated x,d types of variables with proper descriptive names. |
||||
|
||||
if 'Columns' in pileOfRows.keys(): | ||||
self.parse_report_metadata(pileOfRows['Columns']) | ||||
|
||||
if 'Column' in pileOfRows.keys(): | ||||
for x in pileOfRows['Column']: | ||||
if 'MetaData' in x.keys(): | ||||
for md in x['MetaData']: | ||||
if md['Name'] in ['StartDate']: | ||||
self.parsed_metadata['dates'].append(md['Value']) | ||||
|
||||
def day_wise_reports(self): | ||||
''' | ||||
Return record for every day formed using output of parse_report_metadata | ||||
''' | ||||
for index, date in enumerate(self.parsed_metadata['dates']): | ||||
report = dict() | ||||
report['ReportDate'] = date | ||||
report['AccountingMethod'] = 'Accrual' | ||||
report['Details'] = {} | ||||
|
||||
for data in self.parsed_metadata['data']: | ||||
report['Details'][data['name']] = data['values'][index] | ||||
|
||||
yield report | ||||
|
||||
|
||||
class ProfitAndLossReport(ReportStream): | ||||
stream_name = 'profit_loss_report' | ||||
endpoint = '/v3/company/{realm_id}/reports/ProfitAndLoss' | ||||
|
||||
STREAM_OBJECTS = { | ||||
"accounts": Accounts, | ||||
|
@@ -222,5 +366,6 @@ class Vendors(Stream): | |||
"time_activities": TimeActivities, | ||||
"transfers": Transfers, | ||||
"vendor_credits": VendorCredits, | ||||
"vendors": Vendors | ||||
"vendors": Vendors, | ||||
"profit_loss_report": ProfitAndLossReport | ||||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,6 @@ | ||
import os | ||
import unittest | ||
import time | ||
from datetime import datetime as dt | ||
from datetime import timedelta | ||
|
||
|
@@ -23,6 +24,12 @@ class TestQuickbooksBase(unittest.TestCase): | |
INCREMENTAL = "INCREMENTAL" | ||
FULL = "FULL_TABLE" | ||
START_DATE_FORMAT = "%Y-%m-%dT00:00:00Z" # %H:%M:%SZ | ||
DATETIME_FMT = { | ||
"%Y-%m-%dT%H:%M:%SZ", | ||
"%Y-%m-%d %H:%M:%S", | ||
"%Y-%m-%dT%H:%M:%S.000000Z", | ||
"%Y-%m-%dT%H:%M:%S%z" | ||
} | ||
|
||
def setUp(self): | ||
missing_envs = [x for x in [ | ||
|
@@ -89,18 +96,26 @@ def expected_check_streams(): | |
"transfers", | ||
"vendor_credits", | ||
"vendors", | ||
"profit_loss_report" | ||
} | ||
|
||
def expected_metadata(self): | ||
"""The expected streams and metadata about the streams""" | ||
|
||
mdata = {} | ||
for stream in self.expected_check_streams(): | ||
mdata[stream] = { | ||
self.PRIMARY_KEYS: {'Id'}, | ||
self.REPLICATION_METHOD: self.INCREMENTAL, | ||
self.REPLICATION_KEYS: {'MetaData'}, | ||
} | ||
if self.is_report_stream(stream): | ||
mdata[stream] = { | ||
self.PRIMARY_KEYS: {'ReportDate'}, | ||
self.REPLICATION_METHOD: self.INCREMENTAL, | ||
self.REPLICATION_KEYS: {'ReportDate'}, | ||
} | ||
else: | ||
mdata[stream] = { | ||
self.PRIMARY_KEYS: {'Id'}, | ||
self.REPLICATION_METHOD: self.INCREMENTAL, | ||
self.REPLICATION_KEYS: {'MetaData'}, | ||
} | ||
|
||
return mdata | ||
|
||
|
@@ -222,3 +237,14 @@ def minimum_record_count_by_stream(self): | |
record_counts["vendors"] = 26 | ||
|
||
return record_counts | ||
|
||
def dt_to_ts(self, dtime): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Raising an exception after this for loop would prevent the test from getting back a None and failing in a confusing way in the case where the datetime format is unaccounted for. |
||
for date_format in self.DATETIME_FMT: | ||
try: | ||
date_stripped = int(time.mktime(dt.strptime(dtime, date_format).timetuple())) | ||
return date_stripped | ||
except ValueError: | ||
continue | ||
|
||
def is_report_stream(self, stream): | ||
return stream in ["profit_loss_report"] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please add more comments to the code to explain at each section what is being done
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added more comments to the code for better code explanation.