Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[TDL-25607] : API version upgrade #67

Merged
merged 21 commits into from
May 23, 2024
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 33 additions & 28 deletions tap_klaviyo/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,37 +5,40 @@
import sys
import singer
from singer import metadata
from tap_klaviyo.utils import get_incremental_pull, get_full_pulls, get_all_pages
from tap_klaviyo.utils import get_incremental_pull, get_full_pulls, get_all_using_next

LOGGER = singer.get_logger()

API_VERSION = "2024-02-15"

ENDPOINTS = {
'global_exclusions': 'https://a.klaviyo.com/api/v1/people/exclusions',
'lists': 'https://a.klaviyo.com/api/v1/lists',
'metrics': 'https://a.klaviyo.com/api/v1/metrics',
'metric': 'https://a.klaviyo.com/api/v1/metric/',
'campaigns': 'https://a.klaviyo.com/api/v1/campaigns'
'global_exclusions': 'https://a.klaviyo.com/api/profiles',
shantanu73 marked this conversation as resolved.
Show resolved Hide resolved
'lists': 'https://a.klaviyo.com/api/lists',
'metrics': 'https://a.klaviyo.com/api/metrics',
'events': 'https://a.klaviyo.com/api/events',
'campaigns': 'https://a.klaviyo.com/api/campaigns'
}

EVENT_MAPPINGS = {
"Received Email": "receive",
"Clicked Email": "click",
"Opened Email": "open",
"Bounced Email": "bounce",
"Unsubscribed": "unsubscribe",
"Unsubscribed from Email Marketing": "unsubscribe",
"Marked Email as Spam": "mark_as_spam",
"Unsubscribed from List": "unsub_list",
"Subscribed to Email Marketing": "subscribed_to_email",
"Subscribed to List": "subscribe_list",
"Updated Email Preferences": "update_email_preferences",
"Dropped Email": "dropped_email",
"Clicked SMS": "clicked_sms",
"Consented to Receive SMS": "consented_to_receive",
"Subscribed to SMS Marketing": "subscribed_to_sms",
"Failed to Deliver SMS": "failed_to_deliver",
"Failed to deliver Automated Response SMS": "failed_to_deliver_automated_response",
"Received Automated Response SMS": "received_automated_response",
"Received SMS": "received_sms",
"Sent SMS": "sent_sms",
"Unsubscribed from SMS": "unsubscribed_from_sms"
"Unsubscribed from SMS Marketing": "unsubscribed_from_sms"
}


Expand Down Expand Up @@ -83,7 +86,7 @@ def to_catalog_dict(self):
GLOBAL_EXCLUSIONS = Stream(
'global_exclusions',
'global_exclusions',
'email',
'id',
'FULL_TABLE'
)

Expand Down Expand Up @@ -126,8 +129,7 @@ def load_shared_schema_refs():

return shared_schema_refs

def do_sync(config, state, catalog):
api_key = config['api_key']
def do_sync(config, state, catalog, headers):
start_date = config['start_date'] if 'start_date' in config else None

stream_ids_to_sync = set()
Expand All @@ -147,21 +149,21 @@ def do_sync(config, state, catalog):
)

if stream['stream'] in EVENT_MAPPINGS.values():
get_incremental_pull(stream, ENDPOINTS['metric'], state,
api_key, start_date)
get_incremental_pull(stream, ENDPOINTS['events'], state,
headers, start_date)
else:
get_full_pulls(stream, ENDPOINTS[stream['stream']], api_key)
get_full_pulls(stream, ENDPOINTS[stream['stream']], headers)


def get_available_metrics(api_key):
def get_available_metrics(headers):
metric_streams = []
for response in get_all_pages('metric_list',
ENDPOINTS['metrics'], api_key):
for response in get_all_using_next('metric_list',
ENDPOINTS['metrics'], headers, {}):
for metric in response.json().get('data'):
if metric['name'] in EVENT_MAPPINGS:
if metric['attributes']['name'] in EVENT_MAPPINGS:
metric_streams.append(
Stream(
stream=EVENT_MAPPINGS[metric['name']],
stream=EVENT_MAPPINGS[metric['attributes']['name']],
tap_stream_id=metric['id'],
key_properties="id",
replication_method='INCREMENTAL',
Expand All @@ -172,29 +174,32 @@ def get_available_metrics(api_key):
return metric_streams


def discover(api_key):
metric_streams = get_available_metrics(api_key)
def discover(headers):
metric_streams = get_available_metrics(headers)
return {"streams": [a.to_catalog_dict()
for a in metric_streams + FULL_STREAMS]}


def do_discover(api_key):
print(json.dumps(discover(api_key), indent=2))
def do_discover(headers):
print(json.dumps(discover(headers), indent=2))

@singer.utils.handle_top_exception(LOGGER)
def main():

args = singer.utils.parse_args(REQUIRED_CONFIG_KEYS)
headers = {
"Authorization": f"Klaviyo-API-Key {args.config.get('api_key')}",
"revision": API_VERSION
}

if args.discover:
do_discover(args.config['api_key'])
do_discover(headers)

else:
catalog = args.catalog.to_dict() if args.catalog else discover(
args.config['api_key'])
catalog = args.catalog.to_dict() if args.catalog else discover(headers)

state = args.state if args.state else {"bookmarks": {}}
do_sync(args.config, state, catalog)
do_sync(args.config, state, catalog, headers)

if __name__ == '__main__':
main()
Loading
Loading