Implement multiple task queues (#813)
* Implement multiple queues for background tasks

* fixes

* fixes

* fix get_pending_tasks docstring paste-o

* cleanup

mcweb/backend/sources/tasks.py: restore docstring
mcweb/backend/util/tasks.py: define SYSTEM_SLOW as 'system-slow'

---------

Authored-by: Phil Budne <[email protected]>
philbudne authored Oct 11, 2024
1 parent c358c61 commit 344dfce
Showing 6 changed files with 192 additions and 122 deletions.
13 changes: 11 additions & 2 deletions Procfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,13 @@
# if you add new lines to this file, add them to the ps:scale command in dokku-scripts/push.sh!
web: bin/start-pgbouncer gunicorn --timeout 500 --pythonpath mcweb --bind 0.0.0.0:$PORT mcweb.wsgi
# can run multiple queues/workers w/ --queue "name":
worker: python mcweb/manage.py process_tasks
#
# workers for queues defined in mcweb/backend/util/task_queues.py:
# NOTE! names must end in "worker" for dokku-scripts/push.sh!!!
#
# painful to delete old "procs", so "worker" runs system-slow queue
worker: python mcweb/manage.py process_tasks --queue system-slow
system-fast-worker: python mcweb/manage.py process_tasks --queue system-fast
admin-slow-worker: python mcweb/manage.py process_tasks --queue admin-slow
admin-fast-worker: python mcweb/manage.py process_tasks --queue admin-fast
user-slow-worker: python mcweb/manage.py process_tasks --queue user-slow
user-fast-worker: python mcweb/manage.py process_tasks --queue user-fast
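
Each entry above runs one process_tasks worker pinned to a single named queue, so slow jobs (collection rescrapes, large CSV downloads) cannot starve fast ones. A task lands on a queue via the queue= argument to the @background decorator, as the Python diffs below show. A minimal sketch of the pattern under django-background-tasks (the queue name here is illustrative):

    from background_task import background

    @background(queue='user-slow')   # consumed only by the user-slow-worker entry above
    def long_running_job(source_id: int) -> None:
        ...                          # runs later, inside a process_tasks worker

    long_running_job(42)       # schedules a background_task.models.Task on 'user-slow'
    long_running_job.now(42)   # invokes the function synchronously, bypassing the queue
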
5 changes: 4 additions & 1 deletion dokku-scripts/push.sh
Original file line number Diff line number Diff line change
Expand Up @@ -358,8 +358,11 @@ prod)
;;
esac

# extract list of task workers from Procfile
TASK_WORKERS=$(grep -o '^[a-z-]*worker:' Procfile | sed 's/:/=1/')

# add new Procfile entries to next line!!
GOALS="web=$WEB_PROCS worker=1"
GOALS="web=$WEB_PROCS $TASK_WORKERS"

# avoid unnecessary redeploys
SCALE=$(dokku ps:scale $APP | awk -v "goals=$GOALS" -f $SCRIPT_DIR/scale.awk)
Expand Down
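
The grep -o pattern keeps only proc names ending in "worker" (hence the NOTE in the Procfile), and sed turns each name into a name=1 scaling goal. A quick re-implementation in Python to sanity-check what $TASK_WORKERS expands to (reading "Procfile" from the current directory is an assumption):

    import re

    procfile = open("Procfile").read()
    task_workers = " ".join(
        name + "=1"  # mirror sed 's/:/=1/'
        for name in re.findall(r"^([a-z-]*worker):", procfile, re.MULTILINE))
    print(task_workers)
    # with the Procfile above:
    # worker=1 system-fast-worker=1 admin-slow-worker=1 admin-fast-worker=1 user-slow-worker=1 user-fast-worker=1

so GOALS becomes e.g. web=$WEB_PROCS worker=1 system-fast-worker=1 ... for dokku ps:scale.
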
15 changes: 9 additions & 6 deletions mcweb/backend/search/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

# PyPI
import mc_providers
from background_task import background

# mcweb/backend/search (local directory)
from .utils import (
Expand All @@ -25,7 +24,11 @@

# mcweb/backend
from ..users.models import QuotaHistory
from ..sources.tasks import _return_task
from backend.util.tasks import (
USER_SLOW,
background,
return_task
)

# mcweb/util
from util.send_emails import send_zipped_large_download_email
Expand All @@ -37,9 +40,9 @@
# by frontend sendTotalAttentionDataEmail
def download_all_large_content_csv(queryState: list[dict], user_id: int, user_isStaff: bool, email: str):
task = _download_all_large_content_csv(queryState, user_id, user_isStaff, email)
return {'task': _return_task(task)}
return {'task': return_task(task)} # XXX double wraps {task: {task: TASK_DATA}}??

@background(remove_existing_tasks=True)
@background(queue=USER_SLOW, remove_existing_tasks=True)
def _download_all_large_content_csv(queryState: list[dict], user_id: int, user_isStaff: bool, email: str):
parsed_queries = [parsed_query_from_dict(q) for q in queryState]
# code from: https://stackoverflow.com/questions/17584550/attach-generated-csv-file-to-email-and-send-with-django
Expand Down Expand Up @@ -100,7 +103,7 @@ def _download_all_large_content_csv(queryState: list[dict], user_id: int, user_i

def download_all_queries_csv_task(data, request):
task = _download_all_queries_csv(data, request.user.id, request.user.is_staff, request.user.email)
return {'task': _return_task(task)}
return {'task': return_task(task)} # XXX double wraps {task: {task: TASK_DATA}}??

# Phil writes: As I found it, this function used query.thing, which I
# don't think could have worked (was a regular tuple)! It also (and
Expand All @@ -114,7 +117,7 @@ def download_all_queries_csv_task(data, request):

# All of the above makes me think this is dead code!

@background(remove_existing_tasks=True)
@background(queue=USER_SLOW, remove_existing_tasks=True)
def _download_all_queries_csv(data: list[ParsedQuery], user_id, is_staff, email):
for pq in data:
provider = pq_provider(pq)
Expand Down
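
The two XXX comments flag a suspicious shape rather than a new bug: return_task() already wraps its argument as {'task': ...} (see the helper being moved in the sources/tasks.py diff below, presumably unchanged apart from the rename), so wrapping its result again yields a doubly nested payload. A small self-contained illustration, with a stand-in serializer body:

    def _serialize_task(task):           # stand-in for the real serializer
        return {'id': 1, 'queue': 'user-slow'}

    def return_task(task):               # same shape as the _return_task it replaces
        return {'task': _serialize_task(task)}

    print({'task': return_task(object())})
    # {'task': {'task': {'id': 1, 'queue': 'user-slow'}}}  <- the double wrap the XXX flags

Note the pre-existing code had the same shape ({'task': _return_task(task)}), so the comments document existing behavior rather than a regression.
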
43 changes: 27 additions & 16 deletions mcweb/backend/sources/api.py
Original file line number Diff line number Diff line change
@@ -1,32 +1,43 @@
import time
# Python
import datetime as dt
import json
import os
import time
from typing import List, Optional
from urllib.parse import urlparse, parse_qs

# PyPI
import mcmetadata.urls as urls
import requests
import requests.auth
import datetime as dt
from urllib.parse import urlparse, parse_qs
from django.db.models import Count
from django.db.models import Case, Count, When, Q
from django.shortcuts import get_object_or_404
from rest_framework.response import Response
from mc_providers import PLATFORM_REDDIT, PLATFORM_TWITTER, PLATFORM_YOUTUBE
from rest_framework import viewsets, permissions
from rest_framework.decorators import action, permission_classes
from rest_framework.exceptions import APIException
from rest_framework.permissions import IsAuthenticated
from django.db.models import Case, When, Q
from rest_framework import viewsets, permissions
import mcmetadata.urls as urls
from rest_framework.renderers import JSONRenderer
from typing import List, Optional
from rest_framework.exceptions import APIException
from .serializer import CollectionSerializer, FeedSerializer, SourceSerializer, SourcesViewSerializer, CollectionWriteSerializer
from backend.util import csv_stream
from rest_framework.response import Response

# mcweb
from settings import RSS_FETCHER_URL, RSS_FETCHER_USER, RSS_FETCHER_PASS # mcweb.settings

# mcweb/util
from util.cache import cache_by_kwargs
from util.send_emails import send_source_upload_email

# mcweb/backend/util
from backend.util import csv_stream
from backend.util.tasks import get_completed_tasks, get_pending_tasks

# local directory (mcweb/backend/sources)
from .serializer import CollectionSerializer, FeedSerializer, SourceSerializer, SourcesViewSerializer, CollectionWriteSerializer
from .models import Collection, Feed, Source
from .permissions import IsGetOrIsStaff
from .rss_fetcher_api import RssFetcherApi
from util.send_emails import send_source_upload_email
from .tasks import schedule_scrape_source, schedule_scrape_collection

from mc_providers import PLATFORM_REDDIT, PLATFORM_TWITTER, PLATFORM_YOUTUBE
from .tasks import schedule_scrape_source, get_completed_tasks, get_pending_tasks, schedule_scrape_collection
from settings import RSS_FETCHER_URL, RSS_FETCHER_USER, RSS_FETCHER_PASS # mcweb.settings

def _featured_collection_ids(platform: Optional[str]) -> List:
this_dir = os.path.dirname(os.path.realpath(__file__))
Expand Down
122 changes: 25 additions & 97 deletions mcweb/backend/sources/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,27 @@
import traceback

# PyPI:
from background_task import background
from background_task.models import Task, CompletedTask
from mcmetadata.feeds import normalize_url
from django.core.management import call_command
from django.contrib.auth.models import User
from django.utils import timezone
#import pandas as pd # not currently used
import numpy as np

# mcweb/backend/sources
from .models import Feed, Source, Collection
from .rss_fetcher_api import RssFetcherApi

# mcweb/backend/util
from backend.util.tasks import (
SYSTEM_FAST,
SYSTEM_SLOW,
ADMIN_FAST,
ADMIN_SLOW,
background,
return_error,
return_task
)

# mcweb/
from util.send_emails import send_alert_email, send_rescrape_email
from settings import (
Expand All @@ -48,28 +56,7 @@

logger = logging.getLogger(__name__)

# see https://django-background-tasks.readthedocs.io/en/latest/

# @background decorator takes optional arguments:
# queue='queue-name'
# schedule=TIME (seconds delay, timedelta, django.utils.timezone.now())
# name=None (defaults to "module.function")
# remove_existing_tasks=False
# if True, all unlocked tasks with the identical task hash
# (based on task name and args ONLY) will be removed.
#
# calling decorated function schedules a background task, can be passed:
# verbose_name="description"
# creator=user (any model object?)
# repeat=SECONDS, repeat_until=Optional[dt.datetime]
# available constants for repeat: Task.NEVER, Task.HOURLY,
# Task.DAILY, Task.WEEKLY, Task.EVERY_2_WEEKS, Task.EVERY_4_WEEKS
# decorated function return value is not saved!
# and returns a background_task.models.Task object.
#
# calling decorated_function.now() invokes decorated function synchronously.

@background()
@background(queue=ADMIN_FAST) # admin user initiated
def _scrape_source(source_id: int, homepage: str, name: str, user_email: str) -> None:
t0 = time.monotonic()
logger.info(f"==== starting _scrape_source {source_id} ({name}) {homepage} for {user_email}")
Expand Down Expand Up @@ -104,9 +91,7 @@ def _add_scrape_error_rcpts(users: list[str]) -> None:
if u not in users:
users.append(u)

# Phil: this could take quite a while;
# pass queue="slow-lane" to decorator (and run another process_tasks worker in Procfile)??
@background()
@background(queue=ADMIN_SLOW) # admin user initiated
def _scrape_collection(collection_id: int, user_email: str) -> None:
t0 = time.monotonic()
logger.info(f"==== starting _scrape_collection({collection_id}) for {user_email}")
Expand All @@ -115,7 +100,7 @@ def _scrape_collection(collection_id: int, user_email: str) -> None:
if not collection:
# now checked in schedule_scrape_collection
logger.error(f"_scrape_collection collection {collection_id} not found")
# was _return_error here, but could not be seen! check done in caller.
# was return_error here, but could not be seen! check done in caller.
return

sources = collection.source_set.all()
Expand Down Expand Up @@ -160,14 +145,6 @@ def add_body_chunk(chunk):

logger.info(f"==== finished _scrape_collection({collection.id}, {collection.name}) for {user_email} in {sec:.3f}")

# Phil: not used:
#run_at = dt.time(hour=14, minute=32)
## Calculate the number of days until next Friday
#today = dt.date.today()
#days_until_friday = (4 - today.weekday()) % 7
## Calculate the datetime when the task should run
#next_friday = today + dt.timedelta(days=days_until_friday)
#run_datetime = dt.datetime.combine(next_friday, run_at)

def run_alert_system():
user = User.objects.get(username=ALERTS_TASK_USERNAME)
Expand All @@ -179,12 +156,12 @@ def run_alert_system():
creator= user,
verbose_name=f"source alert system {dt.datetime.now()}",
remove_existing_tasks=True)
return _return_task(task)
return return_task(task)

def _rss_fetcher_api():
return RssFetcherApi(RSS_FETCHER_URL, RSS_FETCHER_USER, RSS_FETCHER_PASS)

@background()
@background(queue=SYSTEM_SLOW)
def _alert_system(collection_ids):
sources = set()
for collection_id in collection_ids:
Expand Down Expand Up @@ -276,17 +253,16 @@ def update_stories_per_week():
creator= user,
verbose_name=f"update stories per week {dt.datetime.now()}",
remove_existing_tasks=True)
return _return_task(task)
return return_task(task)

@background()
@background(queue=SYSTEM_FAST)
def _update_stories_counts():
with _rss_fetcher_api() as rss:
stories_by_source = rss.stories_by_source() # This will generate tuples with (source_id and stories_per_day)
for source_tuple in stories_by_source:
source_id, stories_per_day = source_tuple
print(source_id, stories_per_day)
weekly_count = int(stories_per_day * 7)
print(weekly_count)
print(source_id, stories_per_day, weekly_count)
Source.update_stories_per_week(int(source_id), weekly_count)


Expand All @@ -299,43 +275,17 @@ def _calculate_stories_last_week(stories_fetched):
sum_count = sum(day_data['stories'] for day_data in last_7_days_data)
return sum_count

def _serialize_task(task):
"""
helper to return JSON representation of a Task.
"""
# probably should return a subset of fields?
# (or provide a serializer?)
return { key: (value.isoformat() if isinstance(value, dt.datetime) else value)
for key, value in task.__dict__.items() if key[0] != '_' }

_serialize_completed_task = _serialize_task

def _return_error(message):
"""
helper to return JSON representation for an error
"""
logger.info(f"_return_error {message}")
return {'error': message}

def _return_task(task):
"""
formulate "task" return (analagous to _return_error)
returns dict that "task" with serialized task
"""
return {'task': _serialize_task(task)}

def schedule_scrape_collection(collection_id, user):
"""
call this function from a view action to schedule a (re)scrape for a collection
"""
collection = Collection.objects.get(id=collection_id)
if not collection:
return _return_error(f"collection {collection_id} not found")
return return_error(f"collection {collection_id} not found")

name_or_id = collection.name or str(collection_id)
task = _scrape_collection(collection_id, user.email, creator=user, verbose_name=f"rescrape collection {name_or_id}", remove_existing_tasks=True)

return _return_task(task)
return return_task(task)


def schedule_scrape_source(source_id, user):
Expand All @@ -344,13 +294,13 @@ def schedule_scrape_source(source_id, user):
"""
source = Source.objects.get(id=source_id)
if not source:
return _return_error(f"source {source_id} not found")
return return_error(f"source {source_id} not found")

if not source.homepage:
return _return_error(f"source {source_id} missing homepage")
return return_error(f"source {source_id} missing homepage")

if source.url_search_string:
return _return_error(f"source {source_id} has url_search_string")
return return_error(f"source {source_id} has url_search_string")

# maybe check if re-scraped recently????

Expand All @@ -363,27 +313,5 @@ def schedule_scrape_source(source_id, user):
creator=user,
verbose_name=f"rescrape source {name_or_home}",
remove_existing_tasks=True)
return _return_task(task)



def get_completed_tasks(user):
"""
return ALL completed tasks.
If user set, return just tasks for that user.
"""
tasks = CompletedTask.objects
if user:
tasks = tasks.created_by(user)
return {'completed_tasks': [_serialize_completed_task(task) for task in tasks]}

return return_task(task)

def get_pending_tasks(user):
"""
return ALL pending tasks.
If user set, return just tasks for that user.
"""
tasks = Task.objects
if user:
tasks = tasks.created_by(user)
return {'tasks': [_serialize_task(task) for task in tasks]}
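
Both schedule_* helpers return either {'error': ...} or {'task': ...}, so a view can branch on the key. A hypothetical call site, assuming DRF wiring like that in api.py (the function name and status-code choice are illustrative, not from this commit):

    # hypothetical view-side usage of the schedule_scrape_source contract
    from rest_framework import status
    from rest_framework.response import Response

    from .tasks import schedule_scrape_source

    def rescrape_source_action(request, source_id: int) -> Response:
        result = schedule_scrape_source(source_id, request.user)
        if 'error' in result:                        # e.g. missing homepage
            return Response(result, status=status.HTTP_400_BAD_REQUEST)
        return Response(result)                      # {'task': {...serialized Task...}}
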
mcweb/backend/util/tasks.py: new file (diff did not render)
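
From the commit message and the imports in the diffs above, this new module centralizes the queue names and the task helpers formerly private to sources/tasks.py. What follows is a speculative sketch, not the committed file: the exported names are confirmed by this commit's other diffs, SYSTEM_SLOW = 'system-slow' by the commit message, and the bodies are assumed to match the helpers removed from mcweb/backend/sources/tasks.py.

    # Hypothetical reconstruction of mcweb/backend/util/tasks.py; the real
    # diff did not render. Names come from this commit's other diffs.
    import datetime as dt
    import logging

    # re-exported so task modules import everything from one place
    # (possibly a thin wrapper rather than a bare re-export)
    from background_task import background
    from background_task.models import Task, CompletedTask

    logger = logging.getLogger(__name__)

    # queue names: SYSTEM_SLOW = 'system-slow' is confirmed by the commit
    # message; the rest are assumed to match the Procfile's --queue arguments
    SYSTEM_FAST = 'system-fast'
    SYSTEM_SLOW = 'system-slow'
    ADMIN_FAST = 'admin-fast'
    ADMIN_SLOW = 'admin-slow'
    USER_FAST = 'user-fast'
    USER_SLOW = 'user-slow'

    def _serialize_task(task):
        """helper to return JSON representation of a Task"""
        return {key: (value.isoformat() if isinstance(value, dt.datetime) else value)
                for key, value in task.__dict__.items() if key[0] != '_'}

    _serialize_completed_task = _serialize_task

    def return_error(message):
        """helper to return JSON representation for an error"""
        logger.info(f"return_error {message}")
        return {'error': message}

    def return_task(task):
        """formulate "task" return (analogous to return_error)"""
        return {'task': _serialize_task(task)}

    def get_completed_tasks(user):
        """return ALL completed tasks. If user set, return just tasks for that user."""
        tasks = CompletedTask.objects
        if user:
            tasks = tasks.created_by(user)
        return {'completed_tasks': [_serialize_completed_task(task) for task in tasks]}

    def get_pending_tasks(user):
        """return ALL pending tasks. If user set, return just tasks for that user."""
        tasks = Task.objects
        if user:
            tasks = tasks.created_by(user)
        return {'tasks': [_serialize_task(task) for task in tasks]}
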
