Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tdl 20939 #238

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
98 changes: 69 additions & 29 deletions tests/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ class TestClient():
START_DATE_FORMAT = "%Y-%m-%dT00:00:00Z"
V3_DEALS_PROPERTY_PREFIXES = {'hs_date_entered', 'hs_date_exited', 'hs_time_in'}
BOOKMARK_DATE_FORMAT = '%Y-%m-%dT%H:%M:%S.%fZ'
#time_difference = 0

##########################################################################
### CORE METHODS
Expand Down Expand Up @@ -176,7 +177,8 @@ def datatype_transformations(self, stream, records):
##########################################################################
### GET
##########################################################################
def read(self, stream, parent_ids=[], since=''):
### Take pagination parameter to limit the pagination to first 2 pages tdl-16124 ###
def read(self, stream, parent_ids=[], since='', pagination=False):

# Resets the access_token if the expiry time is less than or equal to the current time
if self.CONFIG["token_expires"] <= datetime.datetime.utcnow():
Expand All @@ -187,29 +189,29 @@ def read(self, stream, parent_ids=[], since=''):
elif stream == 'owners':
return self.get_owners()
elif stream == 'companies':
return self.get_companies(since)
return self.get_companies(since, pagination)
elif stream == 'contact_lists':
return self.get_contact_lists(since)
return self.get_contact_lists(since, pagination=pagination)
elif stream == 'contacts_by_company':
return self.get_contacts_by_company(parent_ids)
return self.get_contacts_by_company(parent_ids, pagination)
elif stream == 'engagements':
return self.get_engagements()
return self.get_engagements(pagination)
elif stream == 'campaigns':
return self.get_campaigns()
elif stream == 'deals':
return self.get_deals()
elif stream == 'workflows':
return self.get_workflows()
elif stream == 'contacts':
return self.get_contacts()
return self.get_contacts(pagination)
elif stream == 'deal_pipelines':
return self.get_deal_pipelines()
elif stream == 'email_events':
return self.get_email_events()
return self.get_email_events(pagination)
elif stream == 'subscription_changes':
return self.get_subscription_changes(since)
return self.get_subscription_changes(since, pagination)
elif stream == "tickets":
return self.get_tickets()
return self.get_tickets(pagination)
else:
raise NotImplementedError

Expand Down Expand Up @@ -238,10 +240,11 @@ def _get_company_by_id(self, company_id):
response = self.get(url)
return response

def get_companies(self, since=''):
def get_companies(self, since='', pagination=False):
"""
Get all companies by paginating using 'hasMore' and 'offset'.
"""
page_size = self.BaseTest.expected_metadata().get('companies', {}).get(self.BaseTest.EXPECTED_PAGE_SIZE)
url = f"{BASE_URL}/companies/v2/companies/paged"
if not since:
since = self.start_date_strf
Expand Down Expand Up @@ -273,6 +276,8 @@ def get_companies(self, since=''):

has_more = response['has-more']
params['offset'] = response['offset']
if pagination and len(companies) > page_size+10:
break

# get the details of each company
for company in companies:
Expand All @@ -283,10 +288,11 @@ def get_companies(self, since=''):

return records

def get_contact_lists(self, since='', list_id=''):
def get_contact_lists(self, since='', list_id='', pagination=False):
"""
Get all contact_lists by paginating using 'has-more' and 'offset'.
"""
page_size = self.BaseTest.expected_metadata().get('contact_lists',{}).get(self.BaseTest.EXPECTED_PAGE_SIZE)
url = f"{BASE_URL}/contacts/v1/lists"

if list_id:
Expand All @@ -296,7 +302,7 @@ def get_contact_lists(self, since='', list_id=''):
return response

if since == 'all':
params = {'count': 250}
params = {'count': page_size}
else:
if not since:
since = self.start_date_strf
Expand All @@ -305,15 +311,15 @@ def get_contact_lists(self, since='', list_id=''):
since = datetime.datetime.strptime(since, self.START_DATE_FORMAT)

since = str(since.timestamp() * 1000).split(".")[0]
params = {'since': since, 'count': 250}
params = {'since': since, 'count': page_size}

records = []
replication_key = list(self.replication_keys['contact_lists'])[0]

# paginating through all the contact_lists
has_more = True
while has_more:

response = self.get(url, params=params)
for record in response['lists']:

Expand All @@ -322,6 +328,8 @@ def get_contact_lists(self, since='', list_id=''):

has_more = response['has-more']
params['offset'] = response['offset']
if pagination and len(records) > page_size+10:
break

return records

Expand Down Expand Up @@ -354,16 +362,17 @@ def _get_contacts_by_pks(self, pks):

return records[0]

def get_contacts(self):
def get_contacts(self, pagination=False):
"""
Get all contact vids by paginating using 'has-more' and 'vid-offset/vidOffset'.
Then use the vids to grab the detailed contacts records.
"""
page_size = self.BaseTest.expected_metadata().get('contacts',{}).get(self.BaseTest.EXPECTED_PAGE_SIZE)
url_1 = f"{BASE_URL}/contacts/v1/lists/all/contacts/all"
params_1 = {
'showListMemberships': True,
'includeVersion': True,
'count': 100,
'count': page_size,
}
vids = []
url_2 = f"{BASE_URL}/contacts/v1/contact/vids/batch/"
Expand All @@ -379,18 +388,22 @@ def get_contacts(self):
response_1 = self.get(url_1, params=params_1)
vids = [record['vid'] for record in response_1['contacts']
if record['versionTimestamp'] >= self.start_date]

has_more = response_1['has-more']
params_1['vidOffset'] = response_1['vid-offset']

# get the detailed contacts records by vids
params_2['vid'] = vids
response_2 = self.get(url_2, params=params_2)
records.extend([record for record in response_2.values()])
if pagination and len(records) > page_size+10:
LOGGER.info("haven't met the page size")
break

records = self.denest_properties('contacts', records)
return records

def get_contacts_by_company(self, parent_ids):
def get_contacts_by_company(self, parent_ids, pagination=False):
"""
Get all contacts_by_company iterating over companyId's and
paginating using 'hasMore' and 'vidOffset'. This stream is essentially
Expand All @@ -400,8 +413,9 @@ def get_contacts_by_company(self, parent_ids):
pulling the 'companyId' from each record to perform the corresponding get here.
"""

page_size = self.BaseTest.expected_metadata().get('contacts_by_company', {}).get(self.BaseTest.EXPECTED_PAGE_SIZE)
url = f"{BASE_URL}/companies/v2/companies/{{}}/vids"
params = dict()
params = {'count': page_size}
records = []

for parent_id in parent_ids:
Expand All @@ -416,8 +430,10 @@ def get_contacts_by_company(self, parent_ids):

has_more = response['hasMore']
params['vidOffset'] = response['vidOffset']
if pagination and len(records) > page_size+10:
break

params = dict()
params = {'count': page_size}

return records

Expand Down Expand Up @@ -512,13 +528,14 @@ def get_deals(self):
records = self.denest_properties('deals', records)
return records

def get_email_events(self, recipient=''):
def get_email_events(self, recipient='', pagination=False):
"""
Get all email_events by paginating using 'hasMore' and 'offset'.
"""
page_size = self.BaseTest.expected_metadata().get('email_events',{}).get(self.BaseTest.EXPECTED_PAGE_SIZE)
url = f"{BASE_URL}/email/public/v1/events"
replication_key = list(self.replication_keys['email_events'])[0]
params = dict()
params = {'count': page_size}
if recipient:
params['recipient'] = recipient
records = []
Expand All @@ -532,6 +549,8 @@ def get_email_events(self, recipient=''):

has_more = response['hasMore']
params['offset'] = response['offset']
if pagination and len(records) > page_size+10:
break

return records

Expand All @@ -549,13 +568,14 @@ def _get_engagements_by_pk(self, engagement_id):

return response

def get_engagements(self):
def get_engagements(self, pagination=False):
"""
Get all engagements by paginating using 'hasMore' and 'offset'.
"""
page_size = self.BaseTest.expected_metadata().get('engagements',{}).get(self.BaseTest.EXPECTED_PAGE_SIZE)
url = f"{BASE_URL}/engagements/v1/engagements/paged"
replication_key = list(self.replication_keys['engagements'])[0]
params = {'limit': 250}
params = {'limit': page_size}
records = []

has_more = True
Expand All @@ -570,6 +590,8 @@ def get_engagements(self):

has_more = response['hasMore']
params['offset'] = response['offset']
if pagination and len(records) > page_size+10:
break

return records

Expand Down Expand Up @@ -606,13 +628,14 @@ def get_owners(self):
transformed_records = self.datatype_transformations('owners', records)
return transformed_records

def get_subscription_changes(self, since=''):
def get_subscription_changes(self, since='', pagination=False):
"""
Get all subscription_changes from 'since' date by paginating using 'hasMore' and 'offset'.
Default since date is one week ago
"""
page_size = self.BaseTest.expected_metadata().get('subscription_changes',{}).get(self.BaseTest.EXPECTED_PAGE_SIZE)
url = f"{BASE_URL}/email/public/v1/subscriptions/timeline"
params = dict()
params = {'count': page_size}
records = []
replication_key = list(self.replication_keys['subscription_changes'])[0]
if not since:
Expand All @@ -632,6 +655,8 @@ def get_subscription_changes(self, since=''):
# this won't be feasible until BUG_TDL-14938 is addressed
if int(since) <= record['timestamp']:
records.append(record)
if pagination and len(records) > page_size+10:
break

return records

Expand Down Expand Up @@ -677,18 +702,19 @@ def get_tickets_properties(self):

return ",".join([record["name"] for record in records["results"]])

def get_tickets(self):
def get_tickets(self, pagination=False):
"""
Get all tickets.
HubSpot API https://developers.hubspot.com/docs/api/crm/tickets
"""
page_size = self.BaseTest.expected_metadata().get('tickets',{}).get(self.BaseTest.EXPECTED_PAGE_SIZE)
url = f"{BASE_URL}/crm/v4/objects/tickets"
replication_key = list(self.replication_keys["tickets"])[0]
records = []

# response = self.get(url)

params = {"limit": 100, "associations": "contact,company,deals", 'properties': self.get_tickets_properties()}
params = {"limit": page_size, "associations": "contact,company,deals", 'properties': self.get_tickets_properties()}
while True:
response = self.get(url, params=params)

Expand All @@ -698,6 +724,8 @@ def get_tickets(self):

if not response.get("paging"):
break
if page_size and len(records) > page_size+10:
break
params["after"] = response.get("paging").get("next").get("after")

records = self.denest_properties('tickets', records)
Expand Down Expand Up @@ -797,6 +825,10 @@ def create_contacts(self):
]
}

# Get the current UTC time as seconds since the Unix epoch
date= datetime.datetime.utcnow() - datetime.datetime(1970, 1, 1)
seconds =(date.total_seconds())

# generate a contacts record
response = self.post(url, data)
records = [response]
Expand All @@ -805,6 +837,12 @@ def create_contacts(self):
params = {'includeVersion': True}
get_resp = self.get(get_url, params=params)

# Record the created time and its offset from the request time, to monitor clock drift - tdl-20939
created_time = get_resp.get('properties').get('createdate').get('value')
ts=int(created_time)/1000
LOGGER.info("Created Time %s", datetime.datetime.utcfromtimestamp(ts))
self.time_difference = ts-seconds

converted_versionTimestamp = self.BaseTest.datetime_from_timestamp(
get_resp['versionTimestamp'] / 1000, self.BOOKMARK_DATE_FORMAT
)
Expand Down Expand Up @@ -880,7 +918,8 @@ def create_contacts_by_company(self, company_ids=[], contact_records=[], times=1
if not company_ids:
company_ids = [company['companyId'] for company in self.get_companies()]
if not contact_records:
contact_records = self.get_contacts()
page_size = self.BaseTest.expected_metadata().get('contacts_by_company',{}).get(self.BaseTest.EXPECTED_PAGE_SIZE)
contact_records = self.get_contacts(page_size)

records = []
for _ in range(times):
Expand Down Expand Up @@ -1041,7 +1080,8 @@ def create_engagements(self):
record_uuid = str(uuid.uuid4()).replace('-', '')

# gather all contacts and randomly choose one that has not hit the limit
contact_records = self.get_contacts()
page_size = self.BaseTest.expected_metadata().get('engagements',{}).get(self.BaseTest.EXPECTED_PAGE_SIZE)
contact_records = self.get_contacts(page_size)
contact_ids = [contact['vid']
for contact in contact_records
if contact['vid'] != 2304] # contact 2304 has hit the 10,000 assoc limit
Expand Down
4 changes: 2 additions & 2 deletions tests/test_hubspot_all_fields.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,9 +191,9 @@ def setUp(self):
# Get all records
if stream == 'contacts_by_company':
company_ids = [company['companyId'] for company in self.expected_records['companies']]
self.expected_records[stream] = test_client.read(stream, parent_ids=company_ids)
self.expected_records[stream] = test_client.read(stream, self.expected_page_limits().get(stream), parent_ids=company_ids)
else:
self.expected_records[stream] = test_client.read(stream)
self.expected_records[stream] = test_client.read(stream, self.expected_page_limits().get(stream))

for stream, records in self.expected_records.items():
LOGGER.info("The test client found %s %s records.", len(records), stream)
Expand Down
Loading