Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[TDL-25607] : API version upgrade #67

Merged
merged 21 commits into from
May 23, 2024
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 10 additions & 17 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@ version: 2
jobs:
build:
docker:
- image: 218546966473.dkr.ecr.us-east-1.amazonaws.com/circle-ci:tap-tester-v4
- image: 218546966473.dkr.ecr.us-east-1.amazonaws.com/circle-ci:stitch-tap-tester
steps:
- checkout
- run:
name: 'Setup'
command: |
virtualenv -p python3 ~/.virtualenvs/tap-klaviyo
source ~/.virtualenvs/tap-klaviyo/bin/activate
virtualenv -p python3 /usr/local/share/virtualenvs/tap-klaviyo
source /usr/local/share/virtualenvs/tap-klaviyo/bin/activate
pip install -U pip 'setuptools<51.0.0'
pip install .
pip install pylint
- run:
Expand All @@ -20,15 +21,14 @@ jobs:
- run:
name: 'Pylint'
command: |
source ~/.virtualenvs/tap-klaviyo/bin/activate
pylint tap_klaviyo -d C,R,'unspecified-encoding'
source /usr/local/share/virtualenvs/tap-klaviyo/bin/activate
pylint tap_klaviyo -d C,R,W
- run:
name: 'Unit Tests'
command: |
source ~/.virtualenvs/tap-klaviyo/bin/activate
pip install nose coverage
nosetests --with-coverage --cover-erase --cover-package=tap_klaviyo --cover-html-dir=htmlcov tests/unittests
coverage html
source /usr/local/share/virtualenvs/tap-klaviyo/bin/activate
pip install nose2 parameterized nose2[coverage_plugin]>=0.6.5
nose2 --with-coverage -v -s tests/unittests
- store_test_results:
path: test_output/report.xml
- store_artifacts:
Expand All @@ -40,14 +40,7 @@ jobs:
aws s3 cp s3://com-stitchdata-dev-deployment-assets/environments/tap-tester/tap_tester_sandbox dev_env.sh
source dev_env.sh
source /usr/local/share/virtualenvs/tap-tester/bin/activate
run-test --tap=tap-klaviyo \
--target=target-stitch \
--orchestrator=stitch-orchestrator \
[email protected] \
--password=$SANDBOX_PASSWORD \
--client-id=50 \
--token=$STITCH_API_TOKEN \
tests
run-test --tap=tap-klaviyo tests

workflows:
version: 2
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ data from the Klaviyo API following the [Singer
spec](https://github.com/singer-io/getting-started/blob/master/SPEC.md).

This tap:
- Pulls raw data from the [Klaviyo metrics API](https://www.klaviyo.com/docs/api/metrics)
- Pulls raw data from the [Klaviyo metrics API](https://developers.klaviyo.com/en/reference/api_overview)
- Outputs the schema for each resource
- Incrementally pulls data based on the input state for incremental endpoints
- Updates full tables for global exclusions and lists endpoints
Expand Down
4 changes: 2 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
url='http://singer.io',
classifiers=['Programming Language :: Python :: 3 :: Only'],
py_modules=['tap_klaviyo'],
install_requires=['singer-python==5.8.1',
'requests==2.20.0'],
install_requires=['singer-python==6.0.0',
'requests==2.31.0'],
entry_points='''
[console_scripts]
tap-klaviyo=tap_klaviyo:main
Expand Down
62 changes: 34 additions & 28 deletions tap_klaviyo/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,37 +5,41 @@
import sys
import singer
from singer import metadata
from tap_klaviyo.utils import get_incremental_pull, get_full_pulls, get_all_pages
from tap_klaviyo.utils import get_incremental_pull, get_full_pulls, get_all_using_next

LOGGER = singer.get_logger()

API_VERSION = "2024-02-15"

# For stream global_exclusions, data related to suppressed users can be found in the /api/profiles endpoint
ENDPOINTS = {
'global_exclusions': 'https://a.klaviyo.com/api/v1/people/exclusions',
'lists': 'https://a.klaviyo.com/api/v1/lists',
'metrics': 'https://a.klaviyo.com/api/v1/metrics',
'metric': 'https://a.klaviyo.com/api/v1/metric/',
'campaigns': 'https://a.klaviyo.com/api/v1/campaigns'
'global_exclusions': 'https://a.klaviyo.com/api/profiles',
shantanu73 marked this conversation as resolved.
Show resolved Hide resolved
'lists': 'https://a.klaviyo.com/api/lists',
'metrics': 'https://a.klaviyo.com/api/metrics',
'events': 'https://a.klaviyo.com/api/events',
'campaigns': 'https://a.klaviyo.com/api/campaigns'
}

EVENT_MAPPINGS = {
"Received Email": "receive",
"Clicked Email": "click",
"Opened Email": "open",
"Bounced Email": "bounce",
"Unsubscribed": "unsubscribe",
"Unsubscribed from Email Marketing": "unsubscribe",
"Marked Email as Spam": "mark_as_spam",
"Unsubscribed from List": "unsub_list",
"Subscribed to Email Marketing": "subscribed_to_email",
"Subscribed to List": "subscribe_list",
"Updated Email Preferences": "update_email_preferences",
"Dropped Email": "dropped_email",
"Clicked SMS": "clicked_sms",
"Consented to Receive SMS": "consented_to_receive",
"Subscribed to SMS Marketing": "subscribed_to_sms",
"Failed to Deliver SMS": "failed_to_deliver",
"Failed to deliver Automated Response SMS": "failed_to_deliver_automated_response",
"Received Automated Response SMS": "received_automated_response",
"Received SMS": "received_sms",
"Sent SMS": "sent_sms",
"Unsubscribed from SMS": "unsubscribed_from_sms"
"Unsubscribed from SMS Marketing": "unsubscribed_from_sms"
}


Expand Down Expand Up @@ -83,7 +87,7 @@ def to_catalog_dict(self):
GLOBAL_EXCLUSIONS = Stream(
'global_exclusions',
'global_exclusions',
'email',
'id',
'FULL_TABLE'
)

Expand Down Expand Up @@ -126,8 +130,7 @@ def load_shared_schema_refs():

return shared_schema_refs

def do_sync(config, state, catalog):
api_key = config['api_key']
def do_sync(config, state, catalog, headers):
start_date = config['start_date'] if 'start_date' in config else None

stream_ids_to_sync = set()
Expand All @@ -147,21 +150,21 @@ def do_sync(config, state, catalog):
)

if stream['stream'] in EVENT_MAPPINGS.values():
get_incremental_pull(stream, ENDPOINTS['metric'], state,
api_key, start_date)
get_incremental_pull(stream, ENDPOINTS['events'], state,
headers, start_date)
else:
get_full_pulls(stream, ENDPOINTS[stream['stream']], api_key)
get_full_pulls(stream, ENDPOINTS[stream['stream']], headers)


def get_available_metrics(api_key):
def get_available_metrics(headers):
metric_streams = []
for response in get_all_pages('metric_list',
ENDPOINTS['metrics'], api_key):
for response in get_all_using_next('metric_list',
ENDPOINTS['metrics'], headers, {}):
for metric in response.json().get('data'):
if metric['name'] in EVENT_MAPPINGS:
if metric['attributes']['name'] in EVENT_MAPPINGS:
metric_streams.append(
Stream(
stream=EVENT_MAPPINGS[metric['name']],
stream=EVENT_MAPPINGS[metric['attributes']['name']],
tap_stream_id=metric['id'],
key_properties="id",
replication_method='INCREMENTAL',
Expand All @@ -172,29 +175,32 @@ def get_available_metrics(api_key):
return metric_streams


def discover(api_key):
metric_streams = get_available_metrics(api_key)
def discover(headers):
metric_streams = get_available_metrics(headers)
return {"streams": [a.to_catalog_dict()
for a in metric_streams + FULL_STREAMS]}


def do_discover(api_key):
print(json.dumps(discover(api_key), indent=2))
def do_discover(headers):
print(json.dumps(discover(headers), indent=2))

@singer.utils.handle_top_exception(LOGGER)
def main():

args = singer.utils.parse_args(REQUIRED_CONFIG_KEYS)
headers = {
"Authorization": f"Klaviyo-API-Key {args.config.get('api_key')}",
"revision": API_VERSION
}

if args.discover:
do_discover(args.config['api_key'])
do_discover(headers)

else:
catalog = args.catalog.to_dict() if args.catalog else discover(
args.config['api_key'])
catalog = args.catalog.to_dict() if args.catalog else discover(headers)

state = args.state if args.state else {"bookmarks": {}}
do_sync(args.config, state, catalog)
do_sync(args.config, state, catalog, headers)

if __name__ == '__main__':
main()
Loading
Loading