Skip to content

Commit

Permalink
Add multiple queue: feeds and default
Browse files Browse the repository at this point in the history
  • Loading branch information
thenav56 committed Dec 6, 2024
1 parent cc60125 commit a8982af
Show file tree
Hide file tree
Showing 8 changed files with 76 additions and 49 deletions.
5 changes: 5 additions & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,8 @@ repos:
rev: v1.1.390
hooks:
- id: pyright

# - repo: https://github.com/gruntwork-io/pre-commit
# rev: v0.1.15
# hooks:
# - id: helmlint
4 changes: 1 addition & 3 deletions deploy/run_worker.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,4 @@ ROOT_DIR=$(dirname "$BASE_DIR")

cd $ROOT_DIR

# concurrency: Number of workers
# max-tasks-per-child: Max number of tasks a worker can run before it is terminated
celery -A main worker -l info --concurrency 4 --max-tasks-per-child 10
celery -A main worker -l info $CELERY_ARGS
1 change: 1 addition & 0 deletions helm/linter_values.yaml
7 changes: 1 addition & 6 deletions helm/templates/worker-beat/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,7 @@ spec:
command: ["/code/deploy/run_worker_beat.sh"]
# TODO: livenessProbe
resources:
requests:
cpu: {{ .Values.worker.beat.resources.requests.cpu }}
memory: {{ .Values.worker.beat.resources.requests.memory }}
limits:
cpu: {{ .Values.worker.beat.resources.limits.cpu }}
memory: {{ .Values.worker.beat.resources.limits.memory }}
{{- toYaml .Values.worker.beat.resources | nindent 12 }}
env:
- name: DJANGO_APP_TYPE
value: worker
Expand Down
39 changes: 24 additions & 15 deletions helm/templates/worker/deployment.yaml
Original file line number Diff line number Diff line change
@@ -1,52 +1,61 @@
{{- if .Values.worker.enabled }}

{{- range $queue_name, $config := .Values.worker.queues }}

apiVersion: apps/v1
kind: Deployment
metadata:
name: {{ template "ifrcgo-alert-hub.fullname" . }}-worker
name: {{ template "ifrcgo-alert-hub.fullname" $ }}-worker-{{ $queue_name }}
labels:
app: {{ include "ifrcgo-alert-hub.fullname" . }}
app: {{ include "ifrcgo-alert-hub.fullname" $ }}
component: worker
environment: {{ .Values.environment }}
release: {{ .Release.Name }}
queue: {{ $queue_name }}
environment: {{ $.Values.environment }}
release: {{ $.Release.Name }}
annotations:
reloader.stakater.com/auto: "true"
spec:
replicas: {{ .Values.worker.replicaCount }}
replicas: {{ $config.replicaCount }}
selector:
matchLabels:
app: {{ include "ifrcgo-alert-hub.fullname" . }}
app: {{ include "ifrcgo-alert-hub.fullname" $ }}
component: worker
queue: {{ $queue_name }}
template:
metadata:
{{- if not .Values.azure.aksSecretsProviderAvailable }}
{{- if not $.Values.azure.aksSecretsProviderAvailable }}
annotations:
checksum/secret: {{ include (print .Template.BasePath "/config/secret.yaml") . | sha256sum }}
checksum/configmap: {{ include (print .Template.BasePath "/config/configmap.yaml") . | sha256sum }}
checksum/secret: {{ include (print $.Template.BasePath "/config/secret.yaml") $ | sha256sum }}
checksum/configmap: {{ include (print $.Template.BasePath "/config/configmap.yaml") $ | sha256sum }}
{{- end }}
labels:
app: {{ include "ifrcgo-alert-hub.fullname" . }}
app: {{ include "ifrcgo-alert-hub.fullname" $ }}
component: worker
queue: {{ $queue_name }}
spec:
containers:
- name: worker
image: "{{ .Values.image.name }}:{{ .Values.image.tag }}"
image: "{{ $.Values.image.name }}:{{ $.Values.image.tag }}"
command: ["/code/deploy/run_worker.sh"]
# TODO: livenessProbe
resources:
{{- toYaml .Values.worker.resources | nindent 12 }}
{{- toYaml $config.resources | nindent 12 }}
env:
- name: DJANGO_APP_TYPE
value: worker
- name: CELERY_ARGS
value: {{ required "worker.queues.*.celeryArgs" $config.celeryArgs | quote }}
envFrom:
- secretRef:
name: {{ template "ifrcgo-alert-hub.secretname" . }}
name: {{ template "ifrcgo-alert-hub.secretname" $ }}
- configMapRef:
name: {{ template "ifrcgo-alert-hub.fullname" . }}-api-configmap
name: {{ template "ifrcgo-alert-hub.fullname" $ }}-api-configmap

{{- with .Values.imagePullSecrets }}
{{- with $.Values.imagePullSecrets }}
imagePullSecrets:
{{- toYaml . | nindent 8 }}
{{- end }}

{{- end }}

{{- end }}
2 changes: 2 additions & 0 deletions helm/values-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ secretsAdditional:
MAGIC_KEY: to-much-fun

argoHooks:
db-migrate:
preserveHistory: false
custom-python-script-01:
enabled: true
hook: PostSync
Expand Down
32 changes: 24 additions & 8 deletions helm/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,6 @@ api:

worker:
enabled: true
replicaCount: 1
resources:
requests:
cpu: "1"
memory: 1Gi
limits:
cpu: "1"
memory: 2Gi
beat:
resources:
requests:
Expand All @@ -69,6 +61,30 @@ worker:
limits:
cpu: "1"
memory: 1Gi
queues:
# NOTE: Make sure keys are lowercase
default:
enabled: true
replicaCount: 1
celeryArgs: "-Q default --concurrency 4 --max-tasks-per-child 10"
resources:
requests:
cpu: "1"
memory: 1Gi
limits:
cpu: "1"
memory: 2Gi
feeds:
enabled: true
replicaCount: 2
celeryArgs: "-Q feeds --concurrency 4 --max-tasks-per-child 10 --soft-time-limit 300 --time-limit 500"
resources:
requests:
cpu: "2"
memory: 1Gi
limits:
cpu: "1"
memory: 2Gi

argoHooks:
# NOTE: Make sure keys are lowercase
Expand Down
35 changes: 18 additions & 17 deletions main/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def on_configure(self): # type: ignore[reportIncompatibleVariableOverride]
},
f'{INTERNAL_CELERY_TASK_NAME_PREFIX}process_pending_subscription_alerts': {
'task': 'apps.subscription.tasks.process_pending_subscription_alerts',
'schedule': timedelta(minutes=30), # TODO: Lower this?
'schedule': timedelta(minutes=10),
'options': {'queue': 'default'},
},
f'{INTERNAL_CELERY_TASK_NAME_PREFIX}send_daily_user_alert_subscriptions_email': {
Expand All @@ -65,6 +65,18 @@ def on_configure(self): # type: ignore[reportIncompatibleVariableOverride]
},
}


class CeleryQueue:
# NOTE: Make sure all queue names are lowercase (They are in k8s)
default = Queue('default')
feeds = Queue('feeds')

ALL_QUEUE = (
default,
feeds,
)


# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
Expand All @@ -74,24 +86,13 @@ def on_configure(self): # type: ignore[reportIncompatibleVariableOverride]
# Load task modules from all registered Django apps.
app.autodiscover_tasks()

app.conf.task_default_queue = 'default'
app.conf.task_queues = (Queue('default', routing_key='poll.#', exchange='poll'),)
app.conf.task_default_exchange = 'poll'
app.conf.task_default_exchange_type = 'topic'
app.conf.task_default_routing_key = 'poll.default'
app.conf.result_expires = settings.CELERY_TASK_EXPIRE
app.conf.task_default_queue = CeleryQueue.default.name

task_routes = {
'apps.cap_feed.tasks.poll_feed': {
'queue': 'default',
'routing_key': 'poll.#',
'exchange': 'poll',
},
'apps.cap_feed.tasks.tag_expired_alerts': {
'queue': 'default',
'routing_key': 'poll.#',
'exchange': 'poll',
},
app.conf.task_queues = CeleryQueue.ALL_QUEUE

app.conf.task_routes = {
'apps.cap_feed.tasks.poll_feed': {'queue': CeleryQueue.feeds},
}


Expand Down

0 comments on commit a8982af

Please sign in to comment.