Skip to content

Commit

Permalink
make attribution-window user-configurable
Browse files Browse the repository at this point in the history
  • Loading branch information
haleemur committed Nov 14, 2023
1 parent 2d218b3 commit f581d75
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 28 deletions.
45 changes: 24 additions & 21 deletions tap_marketo/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#!/usr/bin/env python3

# Marketo Docs are located at http://developers.marketo.com/rest-api/

from datetime import datetime, timedelta
import itertools
import re

Expand Down Expand Up @@ -35,39 +37,40 @@


# User-facing description of the `attribution_window` config value.
# Only the `<N> day(s) HH:MM:SS` notation is documented here; the compact
# `1d6h55m` examples were dropped so the text no longer advertises two
# incompatible syntaxes.
ATTRIBUTION_WINDOW_README = """
`attribution_window` may be specified by a combination of days, hours, minutes and seconds,
written as an optional day count followed by an optional `HH:MM:SS` time. This parameter is
quite useful in moderate-frequency incremental bulk extracts (e.g. once an hour)
to allow users a way to avoid extracting all leads updated 1 day prior (i.e. default attribution window)
examples of valid attribution_windows: `1 day`, `1 days`, `2 day`, `10 days`, `10:00:00`, `1 day 05:00:00`
"""
# Regex fragments for a one- or two-digit count followed by a unit letter:
# `d`, `h` and `m`, in that order.
ATTRIBUTION_WINDOW_SPEC_RE_ATOMS = [
    r'\d{1,2}d',
    r'\d{1,2}h',
    r'\d{1,2}m',
]
# One compiled pattern per order-preserving combination of 1, 2 or 3 atoms
# (e.g. `1d`, `1h30m`, `1d6h55m`), listed shortest combinations first.
ATTRIBUTION_WINDOW_ALLOWED_RE = list(map(
    re.compile,
    map(
        ''.join,
        itertools.chain.from_iterable(
            itertools.combinations(ATTRIBUTION_WINDOW_SPEC_RE_ATOMS, size)
            for size in range(1, 4)
        ),
    ),
))


def parse_attribution_window(attribution_window_string):
    """
    Parse optional config parameter `attribution_window` into a timedelta.

    Attribution window is used to set an earlier export_start
    for incremental replication of the leads stream.

    The window is written as an optional day count (`<N> day` or `<N> days`)
    followed by an optional `HH:MM:SS` time, for example `1 day`, `10 days`,
    `10:00:00` or `1 day 05:00:00`. Leading and trailing whitespace is ignored.

    :param attribution_window_string: the window specification, as a string.
    :return: a `datetime.timedelta` equal to the day count plus the time part.
    :raises ValueError: if the string is blank, does not follow the format
        above, or holds an out-of-range time (e.g. `25:00:00`) or day count.
    """
    errstr = f"`{attribution_window_string}` is not a valid attribution window."
    # fullmatch anchors both ends, so no explicit ^/$ is needed; the raw
    # string keeps \d and \s from being read as (invalid) string escapes.
    match = re.fullmatch(
        r'(?:(?P<day>\d+)\s+days?)?\s*(?P<time>\d{2}:\d{2}:\d{2})?',
        attribution_window_string.strip(),
    )
    if match is None:
        raise ValueError(errstr)

    day_part = match.group("day")
    time_part = match.group("time")
    # Both parts are optional in the pattern, so a blank string matches;
    # reject it rather than silently returning a zero-length window.
    if day_part is None and time_part is None:
        raise ValueError(errstr)

    try:
        # strptime validates the ranges: hours 0-23, minutes/seconds 0-59.
        parsed_time = datetime.strptime(time_part or '00:00:00', '%H:%M:%S')
        return timedelta(
            days=int(day_part or '0'),
            hours=parsed_time.hour,
            minutes=parsed_time.minute,
            seconds=parsed_time.second,
        )
    except (ValueError, OverflowError) as e:
        # timedelta raises OverflowError for absurdly large day counts;
        # report every bad input uniformly as ValueError, keeping the cause.
        raise ValueError(errstr) from e




def validate_state(config, catalog, state):
Expand Down
9 changes: 7 additions & 2 deletions tap_marketo/sync.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from datetime import timedelta

import csv
import json
import pendulum
Expand Down Expand Up @@ -307,8 +309,11 @@ def sync_leads(client, state, stream, config):
initial_bookmark = pendulum.parse(bookmarks.get_bookmark(state, "leads", replication_key))
export_start = pendulum.parse(bookmarks.get_bookmark(state, "leads", replication_key))
if client.use_corona:
attribution_window = config.get('attribution_window', {'days': ATTRIBUTION_WINDOW_DAYS})
export_start = export_start.subtract(**attribution_window)
aw = config.get(
'attribution_window',
timedelta(days=ATTRIBUTION_WINDOW_DAYS)
)
export_start = export_start.subtract(days=aw.days, seconds=aw.seconds)

# job_started is truncated to the microsecond
# in get_export_end the field export_end is also truncated to the microsecond
Expand Down
29 changes: 28 additions & 1 deletion tests/test_startup.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
from datetime import timedelta
import unittest

import pendulum
import requests_mock

from tap_marketo import validate_state
from tap_marketo import validate_state, parse_attribution_window
from tap_marketo.sync import determine_replication_key

class TestValidateState(unittest.TestCase):
Expand Down Expand Up @@ -268,3 +269,29 @@ def test_validate_state(self):

self.assertDictEqual(validate_state(mock_config, mock_catalog, mock_state_2),
expected_state_2)

def test_parse_attribution_window_parses(self):
    """Verify attribution window is successfully parsed for valid patterns."""

    # Each valid spelling is paired with the timedelta it must produce.
    cases = (
        ('3 day 20:00:00', timedelta(days=3, hours=20)),
        ('1 days', timedelta(days=1)),
        ('10:10:10', timedelta(hours=10, minutes=10, seconds=10)),
    )
    for window, expected in cases:
        # assertEqual (not bare assert) still runs under `python -O` and
        # reports both values on failure.
        self.assertEqual(parse_attribution_window(window), expected)

def test_parse_attribution_window_raises(self):
    """Verify parse_attribution_window raises ValueError for invalid patterns."""

    # Leading junk, a single-digit hour field, and trailing junk.
    invalid_windows = (
        'foobar 3 day 20:00:00',
        '3 day 1:00:00',
        '10:00:00 foobar',
    )
    for window in invalid_windows:
        with self.assertRaises(ValueError):
            parse_attribution_window(window)
4 changes: 0 additions & 4 deletions tests/test_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,6 @@
from tap_marketo.sync import *


SAMPLE_LEADS_CSV = """
"""

def parse_params(request):
    """Return the query-string parameters of ``request.query`` as a dict.

    When a parameter name repeats, the last value wins.
    """
    pairs = urllib.parse.parse_qsl(request.query)
    return {name: value for name, value in pairs}

Expand Down Expand Up @@ -56,7 +53,6 @@ class TestMarketoExport(unittest.TestCase):
def setUp(self):
self.client = Client("123-ABC-789", "id", "secret")
self.client._use_corona = True
self.sample_leads_csv = SAMPLE_LEADS_CSV
self.mock_status_completed = {
"result": [
{
Expand Down

0 comments on commit f581d75

Please sign in to comment.