diff --git a/Procfile b/Procfile
index 3ff9146f5..39e9b4924 100644
--- a/Procfile
+++ b/Procfile
@@ -1,4 +1,13 @@
 # if you add new lines to this file, add them to the ps:scale command in dokku-scripts/push.sh!
 web: bin/start-pgbouncer gunicorn --timeout 500 --pythonpath mcweb --bind 0.0.0.0:$PORT mcweb.wsgi
-# can run multiple queues/workers w/ --queue "name":
-worker: python mcweb/manage.py process_tasks
+#
+# workers for queues defined in mcweb/backend/util/tasks.py:
+# NOTE! names must end in "worker" for dokku-scripts/push.sh!!!
+#
+# painful to delete old "procs", so "worker" runs system-slow queue
+worker: python mcweb/manage.py process_tasks --queue system-slow
+system-fast-worker: python mcweb/manage.py process_tasks --queue system-fast
+admin-slow-worker: python mcweb/manage.py process_tasks --queue admin-slow
+admin-fast-worker: python mcweb/manage.py process_tasks --queue admin-fast
+user-slow-worker: python mcweb/manage.py process_tasks --queue user-slow
+user-fast-worker: python mcweb/manage.py process_tasks --queue user-fast
diff --git a/dokku-scripts/push.sh b/dokku-scripts/push.sh
index 397f78428..28866b527 100755
--- a/dokku-scripts/push.sh
+++ b/dokku-scripts/push.sh
@@ -358,8 +358,11 @@ prod)
     ;;
 esac
 
+# extract list of task workers from Procfile
+TASK_WORKERS=$(grep -o '^[a-z-]*worker:' Procfile | sed 's/:/=1/')
+
 # add new Procfile entries to next line!!
-GOALS="web=$WEB_PROCS worker=1"
+GOALS="web=$WEB_PROCS $TASK_WORKERS"
 
 # avoid unnecessary redeploys
 SCALE=$(dokku ps:scale $APP | awk -v "goals=$GOALS" -f $SCRIPT_DIR/scale.awk)
diff --git a/mcweb/backend/search/tasks.py b/mcweb/backend/search/tasks.py
index f727885fe..64fe8fb10 100644
--- a/mcweb/backend/search/tasks.py
+++ b/mcweb/backend/search/tasks.py
@@ -11,7 +11,6 @@
 
 # PyPI
 import mc_providers
-from background_task import background
 
 # mcweb/backend/search (local directory)
 from .utils import (
@@ -25,7 +24,11 @@
 
 # mcweb/backend
 from ..users.models import QuotaHistory
-from ..sources.tasks import _return_task
+from backend.util.tasks import (
+    USER_SLOW,
+    background,
+    return_task
+)
 
 # mcweb/util
 from util.send_emails import send_zipped_large_download_email
@@ -37,9 +40,9 @@
 # by frontend sendTotalAttentionDataEmail
 def download_all_large_content_csv(queryState: list[dict], user_id: int, user_isStaff: bool, email: str):
     task = _download_all_large_content_csv(queryState, user_id, user_isStaff, email)
-    return {'task': _return_task(task)}
+    return {'task': return_task(task)}  # XXX double wraps {task: {task: TASK_DATA}}??
 
-@background(remove_existing_tasks=True)
+@background(queue=USER_SLOW, remove_existing_tasks=True)
 def _download_all_large_content_csv(queryState: list[dict], user_id: int, user_isStaff: bool, email: str):
     parsed_queries = [parsed_query_from_dict(q) for q in queryState]
     # code from: https://stackoverflow.com/questions/17584550/attach-generated-csv-file-to-email-and-send-with-django
@@ -100,7 +103,7 @@
 
 def download_all_queries_csv_task(data, request):
     task = _download_all_queries_csv(data, request.user.id, request.user.is_staff, request.user.email)
-    return {'task': _return_task(task)}
+    return {'task': return_task(task)}  # XXX double wraps {task: {task: TASK_DATA}}??
 
 # Phil writes: As I found it, this function used query.thing, which I
 # don't think could have worked (was a regular tuple)!  It also (and
@@ -114,7 +117,7 @@ def download_all_queries_csv_task(data, request):
 
 
 # All of the above makes me think this is dead code!
-@background(remove_existing_tasks=True)
+@background(queue=USER_SLOW, remove_existing_tasks=True)
 def _download_all_queries_csv(data: list[ParsedQuery], user_id, is_staff, email):
     for pq in data:
         provider = pq_provider(pq)
diff --git a/mcweb/backend/sources/api.py b/mcweb/backend/sources/api.py
index 80a45ac32..c86b6f2b5 100644
--- a/mcweb/backend/sources/api.py
+++ b/mcweb/backend/sources/api.py
@@ -1,32 +1,43 @@
-import time
+# Python
+import datetime as dt
 import json
 import os
+import time
+from typing import List, Optional
+from urllib.parse import urlparse, parse_qs
+
+# PyPI
+import mcmetadata.urls as urls
 import requests
 import requests.auth
-import datetime as dt
-from urllib.parse import urlparse, parse_qs
-from django.db.models import Count
+from django.db.models import Case, Count, When, Q
 from django.shortcuts import get_object_or_404
-from rest_framework.response import Response
+from mc_providers import PLATFORM_REDDIT, PLATFORM_TWITTER, PLATFORM_YOUTUBE
+from rest_framework import viewsets, permissions
 from rest_framework.decorators import action, permission_classes
+from rest_framework.exceptions import APIException
 from rest_framework.permissions import IsAuthenticated
-from django.db.models import Case, When, Q
-from rest_framework import viewsets, permissions
-import mcmetadata.urls as urls
 from rest_framework.renderers import JSONRenderer
-from typing import List, Optional
-from rest_framework.exceptions import APIException
-from .serializer import CollectionSerializer, FeedSerializer, SourceSerializer, SourcesViewSerializer, CollectionWriteSerializer
-from backend.util import csv_stream
+from rest_framework.response import Response
+
+# mcweb
+from settings import RSS_FETCHER_URL, RSS_FETCHER_USER, RSS_FETCHER_PASS # mcweb.settings
+
+# mcweb/util
 from util.cache import cache_by_kwargs
+from util.send_emails import send_source_upload_email
+
+# mcweb/backend/util
+from backend.util import csv_stream
+from backend.util.tasks import get_completed_tasks, get_pending_tasks
+
+# local directory (mcweb/backend/sources)
+from .serializer import CollectionSerializer, FeedSerializer, SourceSerializer, SourcesViewSerializer, CollectionWriteSerializer
 from .models import Collection, Feed, Source
 from .permissions import IsGetOrIsStaff
 from .rss_fetcher_api import RssFetcherApi
-from util.send_emails import send_source_upload_email
+from .tasks import schedule_scrape_source, schedule_scrape_collection
-from mc_providers import PLATFORM_REDDIT, PLATFORM_TWITTER, PLATFORM_YOUTUBE
-from .tasks import schedule_scrape_source, get_completed_tasks, get_pending_tasks, schedule_scrape_collection
-from settings import RSS_FETCHER_URL, RSS_FETCHER_USER, RSS_FETCHER_PASS # mcweb.settings
 
 
 def _featured_collection_ids(platform: Optional[str]) -> List:
     this_dir = os.path.dirname(os.path.realpath(__file__))
diff --git a/mcweb/backend/sources/tasks.py b/mcweb/backend/sources/tasks.py
index c94eea8ff..f6a137a65 100644
--- a/mcweb/backend/sources/tasks.py
+++ b/mcweb/backend/sources/tasks.py
@@ -12,19 +12,27 @@
 import traceback
 
 # PyPI:
-from background_task import background
-from background_task.models import Task, CompletedTask
 from mcmetadata.feeds import normalize_url
 from django.core.management import call_command
 from django.contrib.auth.models import User
 from django.utils import timezone
-#import pandas as pd # not currently used
 import numpy as np
 
 # mcweb/backend/sources
 from .models import Feed, Source, Collection
 from .rss_fetcher_api import RssFetcherApi
 
+# mcweb/backend/util
+from backend.util.tasks import (
+    SYSTEM_FAST,
+    SYSTEM_SLOW,
+    ADMIN_FAST,
+    ADMIN_SLOW,
+    background,
+    return_error,
+    return_task
+)
+
 # mcweb/
 from util.send_emails import send_alert_email, send_rescrape_email
 from settings import (
@@ -48,28 +56,7 @@
 
 logger = logging.getLogger(__name__)
 
-# see https://django-background-tasks.readthedocs.io/en/latest/
-
-# @background decorator takes optional arguments:
-#  queue='queue-name'
-#  schedule=TIME (seconds delay, timedelta, django.utils.timezone.now())
-#  name=None (defaults to "module.function")
-#  remove_existing_tasks=False
-#    if True, all unlocked tasks with the identical task hash
-#    (based on task name and args ONLY) will be removed.
-#
-# calling decorated function schedules a background task, can be passed:
-#  verbose_name="description"
-#  creator=user (any model object?)
-#  repeat=SECONDS, repeat_until=Optional[dt.datetime]
-#    available constants for repeat: Task.NEVER, Task.HOURLY,
-#    Task.DAILY, Task.WEEKLY, Task.EVERY_2_WEEKS, Task.EVERY_4_WEEKS
-# decorated function return value is not saved!
-# and returns a background_task.models.Task object.
-#
-# calling decorated_function.now() invokes decorated function synchronously.
-
-@background()
+@background(queue=ADMIN_FAST) # admin user initiated
 def _scrape_source(source_id: int, homepage: str, name: str, user_email: str) -> None:
     t0 = time.monotonic()
     logger.info(f"==== starting _scrape_source {source_id} ({name}) {homepage} for {user_email}")
@@ -104,9 +91,7 @@ def _add_scrape_error_rcpts(users: list[str]) -> None:
         if u not in users:
             users.append(u)
 
-# Phil: this could take quite a while;
-# pass queue="slow-lane" to decorator (and run another process_tasks worker in Procfile)??
-@background()
+@background(queue=ADMIN_SLOW) # admin user initiated
 def _scrape_collection(collection_id: int, user_email: str) -> None:
     t0 = time.monotonic()
     logger.info(f"==== starting _scrape_collection({collection_id}) for {user_email}")
@@ -115,7 +100,7 @@ def _scrape_collection(collection_id: int, user_email: str) -> None:
 
     if not collection: # now checked in schedule_scrape_collection
         logger.error(f"_scrape_collection collection {collection_id} not found")
-        # was _return_error here, but could not be seen! check done in caller.
+        # was return_error here, but could not be seen! check done in caller.
         return
 
     sources = collection.source_set.all()
@@ -160,14 +145,6 @@ def add_body_chunk(chunk):
 
     logger.info(f"==== finished _scrape_collection({collection.id}, {collection.name}) for {user_email} in {sec:.3f}")
 
-# Phil: not used:
-#run_at = dt.time(hour=14, minute=32)
-## Calculate the number of days until next Friday
-#today = dt.date.today()
-#days_until_friday = (4 - today.weekday()) % 7
-## Calculate the datetime when the task should run
-#next_friday = today + dt.timedelta(days=days_until_friday)
-#run_datetime = dt.datetime.combine(next_friday, run_at)
 
 def run_alert_system():
     user = User.objects.get(username=ALERTS_TASK_USERNAME)
@@ -179,12 +156,12 @@ def run_alert_system():
                           creator= user,
                           verbose_name=f"source alert system {dt.datetime.now()}",
                           remove_existing_tasks=True)
-    return _return_task(task)
+    return return_task(task)
 
 def _rss_fetcher_api():
     return RssFetcherApi(RSS_FETCHER_URL, RSS_FETCHER_USER, RSS_FETCHER_PASS)
 
-@background()
+@background(queue=SYSTEM_SLOW)
 def _alert_system(collection_ids):
     sources = set()
     for collection_id in collection_ids:
@@ -276,17 +253,16 @@ def update_stories_per_week():
                           creator= user,
                           verbose_name=f"update stories per week {dt.datetime.now()}",
                           remove_existing_tasks=True)
-    return _return_task(task)
+    return return_task(task)
 
-@background()
+@background(queue=SYSTEM_FAST)
 def _update_stories_counts():
     with _rss_fetcher_api() as rss:
         stories_by_source = rss.stories_by_source()
         # This will generate tuples with (source_id and stories_per_day)
         for source_tuple in stories_by_source:
             source_id, stories_per_day = source_tuple
-            print(source_id, stories_per_day)
             weekly_count = int(stories_per_day * 7)
-            print(weekly_count)
+            print(source_id, stories_per_day, weekly_count)
             Source.update_stories_per_week(int(source_id), weekly_count)
 
@@ -299,43 +275,17 @@ def _calculate_stories_last_week(stories_fetched):
     sum_count = sum(day_data['stories'] for day_data in last_7_days_data)
     return sum_count
 
-def _serialize_task(task):
-    """
-    helper to return JSON representation of a Task.
-    """
-    # probably should return a subset of fields?
-    # (or provide a serializer?)
-    return { key: (value.isoformat() if isinstance(value, dt.datetime) else value)
-             for key, value in task.__dict__.items() if key[0] != '_' }
-
-_serialize_completed_task = _serialize_task
-
-def _return_error(message):
-    """
-    helper to return JSON representation for an error
-    """
-    logger.info(f"_return_error {message}")
-    return {'error': message}
-
-def _return_task(task):
-    """
-    formulate "task" return (analagous to _return_error)
-    returns dict that "task" with serialized task
-    """
-    return {'task': _serialize_task(task)}
-
 def schedule_scrape_collection(collection_id, user):
     """
     call this function from a view action to schedule a (re)scrape for a collection
     """
     collection = Collection.objects.get(id=collection_id)
     if not collection:
-        return _return_error(f"collection {collection_id} not found")
+        return return_error(f"collection {collection_id} not found")
     name_or_id = collection.name or str(collection_id)
     task = _scrape_collection(collection_id, user.email,
                               creator=user,
                               verbose_name=f"rescrape collection {name_or_id}",
                               remove_existing_tasks=True)
-
-    return _return_task(task)
+    return return_task(task)
 
 def schedule_scrape_source(source_id, user):
@@ -344,13 +294,13 @@
     """
     source = Source.objects.get(id=source_id)
     if not source:
-        return _return_error(f"source {source_id} not found")
+        return return_error(f"source {source_id} not found")
 
     if not source.homepage:
-        return _return_error(f"source {source_id} missing homepage")
+        return return_error(f"source {source_id} missing homepage")
 
     if source.url_search_string:
-        return _return_error(f"source {source_id} has url_search_string")
+        return return_error(f"source {source_id} has url_search_string")
 
     # maybe check if re-scraped recently????
@@ -363,27 +313,5 @@
                           creator=user,
                           verbose_name=f"rescrape source {name_or_home}",
                           remove_existing_tasks=True)
-    return _return_task(task)
-
-
-
-def get_completed_tasks(user):
-    """
-    return ALL completed tasks.
-    If user set, return just tasks for that user.
-    """
-    tasks = CompletedTask.objects
-    if user:
-        tasks = tasks.created_by(user)
-    return {'completed_tasks': [_serialize_completed_task(task) for task in tasks]}
-
+    return return_task(task)
 
-def get_pending_tasks(user):
-    """
-    return ALL pending tasks.
-    If user set, return just tasks for that user.
-    """
-    tasks = Task.objects
-    if user:
-        tasks = tasks.created_by(user)
-    return {'tasks': [_serialize_task(task) for task in tasks]}
diff --git a/mcweb/backend/util/tasks.py b/mcweb/backend/util/tasks.py
new file mode 100644
index 000000000..d9834c12a
--- /dev/null
+++ b/mcweb/backend/util/tasks.py
@@ -0,0 +1,116 @@
+"""
+Utilities for background tasks
+"""
+
+# Python
+import datetime as dt
+import logging
+
+# PyPI
+import background_task
+from background_task.models import Task, CompletedTask
+
+logger = logging.getLogger(__name__)
+
+# NOTE! To enable/add a new queue you must also add a new line to Procfile!!
+
+# periodic system tasks, not launched on demand
+SYSTEM_FAST = 'system-fast' # eg stories/week update
+SYSTEM_SLOW = 'system-slow' # eg sources alerts
+
+# admin/collections tasks, launched on demand
+ADMIN_FAST = 'admin-fast'   # eg scrape source
+ADMIN_SLOW = 'admin-slow'   # eg scrape collection
+
+# ordinary user tasks, launched on demand
+USER_FAST = 'user-fast'
+USER_SLOW = 'user-slow'     # eg email query results
+
+def background(queue: str, **kws):
+    """
+    @background decorator for background task functions.
+
+    (wrapper for background_task.background
+    with required queue name argument!)
+
+    see https://django-background-tasks.readthedocs.io/en/latest/
+    optional arguments:
+     schedule=TIME (seconds delay, timedelta, django.utils.timezone.now())
+     name=None (defaults to "module.function")
+     remove_existing_tasks=False
+       if True, all unlocked tasks with the identical task hash
+       (based on task name and args ONLY) will be removed.
+
+    calling decorated function schedules a background task, can be passed:
+     verbose_name="description"
+     creator=user (any model object?)
+     repeat=SECONDS, repeat_until=Optional[dt.datetime]
+       available constants for repeat: Task.NEVER, Task.HOURLY,
+       Task.DAILY, Task.WEEKLY, Task.EVERY_2_WEEKS, Task.EVERY_4_WEEKS
+
+    decorated function return value is not saved!
+    background_task.models.Task is returned to caller.
+    calling decorated_function.now() invokes decorated function synchronously.
+
+    NOTE!!! to avoid problems with previously queued tasks,
+    decorated functions must not:
+    * gain additional required arguments
+    * be renamed
+    """
+    return background_task.background(queue=queue, **kws)
+
+
+def _serialize_task(task):
+    """
+    helper to return JSON representation of a Task.
+    """
+    # probably should return a subset of fields?
+    # (or provide a serializer?)
+    return { key: (value.isoformat() if isinstance(value, dt.datetime) else value)
+             for key, value in task.__dict__.items() if key[0] != '_' }
+
+
+_serialize_completed_task = _serialize_task
+
+
+def return_error(message):
+    """
+    helper to return JSON representation for an error
+    NOT task specific, but used by task functions
+    """
+    logger.info(f"return_error {message}")
+    return {'error': message}
+
+
+def return_task(task):
+    """
+    formulate "task" return (analogous to return_error)
+    returns dict with "task" key w/ serialized task (dict)
+    """
+    return {'task': _serialize_task(task)}
+
+
+def get_completed_tasks(user: str | None) -> dict:
+    """
+    return ALL completed tasks.
+    If user provided, return just tasks for that user.
+
+    Currently available thru SourcesViewSet for historical reasons.
+    """
+    tasks = CompletedTask.objects
+    if user:
+        tasks = tasks.created_by(user)
+    return {'completed_tasks': [_serialize_completed_task(task) for task in tasks]}
+
+
+def get_pending_tasks(user: str | None) -> dict[str, list[dict]]:
+    """
+    return ALL pending tasks.
+    If user provided, return just tasks for that user.
+
+    Currently available thru SourcesViewSet for historical reasons.
+    """
+    tasks = Task.objects
+    if user:
+        tasks = tasks.created_by(user)
+    return {'tasks': [_serialize_task(task) for task in tasks]}
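Taken together, the changes above establish a single pattern for defining and scheduling background work. Here is a minimal sketch of how a hypothetical new task module would plug into backend/util/tasks.py, following the schedule_scrape_source pattern; the _example_task and schedule_example_task names are illustrative only and not part of this diff:

# hypothetical module, e.g. mcweb/backend/foo/tasks.py
import logging

from backend.util.tasks import (
    USER_FAST,      # queue name; its worker entry already exists in Procfile
    background,     # wrapper that makes the queue argument required
    return_error,
    return_task
)

logger = logging.getLogger(__name__)

@background(queue=USER_FAST, remove_existing_tasks=True)
def _example_task(item_id: int, user_email: str) -> None:
    # runs in the "process_tasks --queue user-fast" worker, not the web process;
    # per the wrapper docstring: never rename this function or add required
    # arguments, since already-queued Task rows reference it by name and args
    logger.info(f"==== starting _example_task {item_id} for {user_email}")

def schedule_example_task(item_id, user):
    """
    call this from a view action to queue an example task for a user
    """
    if item_id is None:
        return return_error("no item id given")
    # calling the decorated function queues it and returns a Task object
    task = _example_task(item_id, user.email,
                         creator=user,
                         verbose_name=f"example task {item_id}",
                         remove_existing_tasks=True)
    return return_task(task)

Because every queue constant has a matching "*-worker" proc in the Procfile, the TASK_WORKERS grep in push.sh scales the worker automatically; only a brand-new queue name requires a new Procfile line (named to end in "worker", per the comment there).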