Skip to content

Commit

Permalink
Fix/staging issues 01 (#111)
Browse files Browse the repository at this point in the history
* Update SQL to skip inactive subscriptions

- Optimize by adding is_active filters for subscriptions qs

* Adjust staging/production k8s resources

* Fix crontab issue

- Crontab requires explicit values for minute and hour

* Fix celery task log typo

* Add Azure storage config in helm template
  • Loading branch information
thenav56 authored Dec 9, 2024
1 parent 4bceb7d commit 2a8cbe0
Show file tree
Hide file tree
Showing 10 changed files with 85 additions and 36 deletions.
4 changes: 2 additions & 2 deletions apps/subscription/emails.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def generate_user_alert_subscription_email_context(
email_frequency: UserAlertSubscription.EmailFrequency,
) -> tuple[bool, dict, models.QuerySet[UserAlertSubscription]]:
# NOTE: Number of subscription is static and less than UserAlertSubscription.LIMIT_PER_USER
subscription_qs = UserAlertSubscription.objects.filter(user=user, email_frequency=email_frequency)
subscription_qs = UserAlertSubscription.objects.filter(is_active=True, user=user, email_frequency=email_frequency)

if email_frequency == UserAlertSubscription.EmailFrequency.DAILY:
from_datetime_threshold = timezone.now() - timedelta(hours=24)
Expand Down Expand Up @@ -104,7 +104,7 @@ def send_user_alert_subscription_email(user: User, email_frequency: UserAlertSub
def send_user_alert_subscriptions_email(email_frequency: UserAlertSubscription.EmailFrequency):
# TODO: Send in parallel if email service supports it?
users_qs = User.objects.filter(
id__in=UserAlertSubscription.objects.filter(email_frequency=email_frequency).values('user'),
id__in=UserAlertSubscription.objects.filter(is_active=True, email_frequency=email_frequency).values('user'),
)

for user in users_qs.iterator():
Expand Down
2 changes: 1 addition & 1 deletion apps/subscription/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class Meta:

def validate_is_active(self, is_active):
if is_active:
qs = UserAlertSubscription.objects.filter(user=self.context["request"].user, is_active=True)
qs = UserAlertSubscription.objects.filter(is_active=True, user=self.context["request"].user)
if self.instance and self.instance.pk:
qs = qs.exclude(pk=self.instance.pk)
if qs.count() >= UserAlertSubscription.LIMIT_PER_USER:
Expand Down
24 changes: 15 additions & 9 deletions apps/subscription/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,10 @@ def _cl_name(field):
alert_data.id AS alert_id
FROM
alert_data
CROSS JOIN {_tb_name(UserAlertSubscription)} AS subscriptions
CROSS JOIN (
SELECT * FROM {_tb_name(UserAlertSubscription)}
WHERE {_cl_name(UserAlertSubscription.is_active)} is True
) AS subscriptions
WHERE
(
subscriptions.{_cl_name(UserAlertSubscription.filter_alert_country)} = alert_data.country_id
Expand Down Expand Up @@ -102,9 +105,10 @@ def process_pending_subscription_alerts():

@shared_task
def send_daily_user_alert_subscriptions_email():
with redis_lock(CacheKey.RedisLockKey.SEND_DAILY_USER_ALERT_SUBSCRIPTION_EMAIL) as acquired:
redis_lock_key = CacheKey.RedisLockKey.SEND_DAILY_USER_ALERT_SUBSCRIPTION_EMAIL
with redis_lock(redis_lock_key) as acquired:
if not acquired:
logger.warning(f'{CacheKey.RedisLockKey.SEND_DAILY_USER_ALERT_SUBSCRIPTION_EMAIL} is already running')
logger.warning(f'{redis_lock_key} is already running')
return
start_time = time.time()
send_user_alert_subscriptions_email(UserAlertSubscription.EmailFrequency.DAILY)
Expand All @@ -113,24 +117,26 @@ def send_daily_user_alert_subscriptions_email():

@shared_task
def send_weekly_user_alert_subscriptions_email():
with redis_lock(CacheKey.RedisLockKey.SEND_WEEKLY_USER_ALERT_SUBSCRIPTION_EMAIL) as acquired:
redis_lock_key = CacheKey.RedisLockKey.SEND_WEEKLY_USER_ALERT_SUBSCRIPTION_EMAIL
with redis_lock(redis_lock_key) as acquired:
if not acquired:
logger.warning(f'{CacheKey.RedisLockKey.SEND_WEEKLY_USER_ALERT_SUBSCRIPTION_EMAIL} is already running')
logger.warning(f'{redis_lock_key} is already running')
return
start_time = time.time()
send_user_alert_subscriptions_email(UserAlertSubscription.EmailFrequency.WEEKLY)
logger.info(f'Send daily user alert subscription email. Runtime: {time.time() - start_time} seconds')
logger.info(f'Send weekly user alert subscription email. Runtime: {time.time() - start_time} seconds')


@shared_task
def send_monthly_user_alert_subscriptions_email():
with redis_lock(CacheKey.RedisLockKey.SEND_MONTHLY_USER_ALERT_SUBSCRIPTION_EMAIL) as acquired:
redis_lock_key = CacheKey.RedisLockKey.SEND_MONTHLY_USER_ALERT_SUBSCRIPTION_EMAIL
with redis_lock(redis_lock_key) as acquired:
if not acquired:
logger.warning(f'{CacheKey.RedisLockKey.SEND_MONTHLY_USER_ALERT_SUBSCRIPTION_EMAIL} is already running')
logger.warning(f'{redis_lock_key} is already running')
return
start_time = time.time()
send_user_alert_subscriptions_email(UserAlertSubscription.EmailFrequency.MONTHLY)
logger.info(f'Send daily user alert subscription email. Runtime: {time.time() - start_time} seconds')
logger.info(f'Send monthly user alert subscription email. Runtime: {time.time() - start_time} seconds')


# TODO: Add tasks to clean up SubscriptionAlert table data for old entries
16 changes: 15 additions & 1 deletion apps/subscription/tests/test_subscription_alert_tagging.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ def test_subscription_alert_tagging(self):
subs = [
UserAlertSubscriptionFactory.create(
user=self.user,
is_active=True,
filter_alert_country=_data[0],
filter_alert_admin1s=_data[1],
filter_alert_urgencies=_data[2],
Expand All @@ -112,8 +113,21 @@ def test_subscription_alert_tagging(self):
]
]

# Noise subscriptions (In active subscriptions)
UserAlertSubscriptionFactory.create_batch(
5,
user=self.user,
is_active=False,
filter_alert_country=self.c_nepal,
filter_alert_admin1s=[],
filter_alert_urgencies=[],
filter_alert_severities=[],
filter_alert_certainties=[],
filter_alert_categories=[],
)

assert Alert.objects.filter(is_processed_by_subscription=False).count() == len(all_alerts)
assert UserAlertSubscription.objects.filter(user=self.user).count() == len(subs)
assert UserAlertSubscription.objects.filter(user=self.user).count() == len(subs) + 5
assert SubscriptionAlert.objects.count() == 0

# Run this twice to make sure re-running doesn't break anything
Expand Down
1 change: 0 additions & 1 deletion helm/linter_values.yaml

This file was deleted.

10 changes: 10 additions & 0 deletions helm/templates/config/configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,16 @@ data:
SENTRY_TRACES_SAMPLE_RATE: {{ required "env.SENTRY_TRACES_SAMPLE_RATE" .Values.env.SENTRY_TRACES_SAMPLE_RATE | quote }}
SENTRY_PROFILE_SAMPLE_RATE: {{ required "env.SENTRY_PROFILE_SAMPLE_RATE" .Values.env.SENTRY_PROFILE_SAMPLE_RATE | quote }}

{{- if .Values.env.USE_AZURE_STORAGE }}
# Azure configs
USE_AZURE_STORAGE: "true"
# TODO: Add support for AZURE_STORAGE_TOKEN_CREDENTIAL
AZURE_STORAGE_MANAGED_IDENTITY: "true"
AZURE_STORAGE_ACCOUNT_NAME: {{ required "env.AZURE_STORAGE_ACCOUNT_NAME" .Values.env.AZURE_STORAGE_ACCOUNT_NAME | quote }}
AZURE_STORAGE_MEDIA_CONTAINER: {{ required "env.AZURE_STORAGE_MEDIA_CONTAINER" .Values.env.AZURE_STORAGE_MEDIA_CONTAINER | quote }}
AZURE_STORAGE_STATIC_CONTAINER: {{ required "env.AZURE_STORAGE_STATIC_CONTAINER" .Values.env.AZURE_STORAGE_STATIC_CONTAINER | quote }}
{{- end }}

# Additional configs
{{- range $name, $value := .Values.envAdditional }}
{{ $name }}: {{ $value | quote }}
Expand Down
40 changes: 30 additions & 10 deletions helm/values-production.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,43 @@ redis:
master:
resourcesPreset: micro # https://github.com/bitnami/charts/blob/main/bitnami/common/templates/_resources.tpl#L16

# FIXME: Use proper requests.cpu
worker:
enabled: true
resources:
requests:
cpu: "2"
memory: 2Gi
limits:
cpu: "4"
memory: 4Gi
beat:
resources:
requests:
cpu: "0.1"
memory: 0.1Gi
memory: 0.5Gi
limits:
cpu: "1"
memory: 1Gi
flower:
resources:
requests:
cpu: "0.1"
memory: 0.5Gi
limits:
cpu: "4"
cpu: "1"
memory: 1Gi
queues:
# NOTE: Make sure keys are lowercase
default:
resources:
requests:
cpu: "1"
memory: 1Gi
limits:
cpu: "2"
memory: 2Gi
feeds:
replicaCount: 2
resources:
requests:
cpu: "1"
memory: 1Gi
limits:
cpu: "4"
memory: 2Gi

api:
resources:
Expand Down
11 changes: 1 addition & 10 deletions helm/values-staging.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ worker:
queues:
# NOTE: Make sure keys are lowercase
default:
replicaCount: 2
resources:
requests:
cpu: "0.1"
Expand All @@ -44,7 +43,7 @@ worker:
cpu: "1"
memory: 2Gi
feeds:
replicaCount: 1
replicaCount: 2
resources:
requests:
cpu: "0.2"
Expand All @@ -63,14 +62,6 @@ api:
cpu: "4"
memory: 2Gi

argoHooks:
# NOTE: Make sure keys are lowercase
db-migrate:
requestsCpu: 0.1
requestsMemory: 1G
limitsCpu: 4
limitsMemory: 2G

env:
DJANGO_DEBUG: false
# Misc
Expand Down
9 changes: 9 additions & 0 deletions helm/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,10 @@ argoHooks:
hook: PostSync
preserveHistory: true
command: ["./manage.py", "migrate"]
requestsCpu: 0.1
requestsMemory: 1G
limitsCpu: 4
limitsMemory: 2G
collect-static:
enabled: true
hook: PostSync
Expand Down Expand Up @@ -129,6 +133,11 @@ env:
# Sentry
SENTRY_TRACES_SAMPLE_RATE: 0.2
SENTRY_PROFILE_SAMPLE_RATE: 0.2
# Azure configs TODO: Add support for AZURE_STORAGE_TOKEN_CREDENTIAL
USE_AZURE_STORAGE: false # NOTE: Make sure this is boolean and not string
AZURE_STORAGE_MEDIA_CONTAINER:
AZURE_STORAGE_STATIC_CONTAINER:
AZURE_STORAGE_ACCOUNT_NAME:
# NOTE: Used to pass additional configs to api/worker containers
# NOTE: Not used by azure vault
envAdditional:
Expand Down
4 changes: 2 additions & 2 deletions main/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,12 @@ def on_configure(self): # type: ignore[reportIncompatibleVariableOverride]
},
f'{INTERNAL_CELERY_TASK_NAME_PREFIX}send_weekly_user_alert_subscriptions_email': {
'task': 'apps.subscription.tasks.send_weekly_user_alert_subscriptions_email',
'schedule': crontab(day_of_week='monday'),
'schedule': crontab(minute=1, hour=1, day_of_week='monday'),
'options': {'queue': 'default'},
},
f'{INTERNAL_CELERY_TASK_NAME_PREFIX}send_monthly_user_alert_subscriptions_email': {
'task': 'apps.subscription.tasks.send_monthly_user_alert_subscriptions_email',
'schedule': crontab(day_of_month='1'),
'schedule': crontab(minute=1, hour=1, day_of_month='1'),
'options': {'queue': 'default'},
},
f'{INTERNAL_CELERY_TASK_NAME_PREFIX}uptime_push': {
Expand Down

0 comments on commit 2a8cbe0

Please sign in to comment.