Convert MARC Export to use Celery (PP-1472) (#2017)
This PR takes the approach of processing `batch_size` (default: 500) records in one task, saving the output to Redis, and then re-queuing the task to process the next `batch_size` records. Once the data in Redis is large enough, a multipart upload to S3 is started, and the multipart upload data is cached in Redis. This continues until the file is completely generated.
jonathangreen authored Sep 4, 2024
1 parent b190c49 commit 1ac4d03
Showing 38 changed files with 4,162 additions and 2,076 deletions.
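
The key constraint driving the Redis buffering is S3's multipart API: every part except the last must be at least 5 MiB. Below is a minimal sketch of that part-upload step, using boto3 directly; the names `maybe_upload_part` and `MIN_PART_SIZE` are hypothetical, and the actual logic lives in `MarcUploadManager` / `MarcFileUploadSession` in the diff below, behind the `S3Service` wrapper.

```python
import boto3

# S3 requires every part of a multipart upload except the last to be at
# least 5 MiB, which is why record output is buffered (here, in Redis)
# until a part this large can be sent.
MIN_PART_SIZE = 5 * 1024 * 1024

s3 = boto3.client("s3")


def maybe_upload_part(
    bucket: str, key: str, upload_id: str, buffer: bytes, part_number: int
) -> dict | None:
    """Upload the buffered bytes as one part once they cross the size floor."""
    if len(buffer) < MIN_PART_SIZE:
        return None  # not enough data yet; keep buffering
    response = s3.upload_part(
        Bucket=bucket,
        Key=key,
        UploadId=upload_id,
        PartNumber=part_number,
        Body=buffer,
    )
    # Each part's ETag and number must be retained (the PR caches them in
    # Redis) for the final complete_multipart_upload call.
    return {"ETag": response["ETag"], "PartNumber": part_number}
```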
3 changes: 3 additions & 0 deletions .gitignore
```diff
@@ -78,3 +78,6 @@ docs/source/*
 .DS_Store
 
 src/palace/manager/core/_version.py
+
+# Celery beat schedule file
+celerybeat-schedule.db
```
2 changes: 1 addition & 1 deletion README.md
````diff
@@ -149,7 +149,7 @@ grant all privileges on database circ to palace;
 Redis is used as the broker for Celery and the caching layer. You can run Redis with docker using the following command:
 
 ```sh
-docker run -d --name redis -p 6379:6379 redis
+docker run -d --name redis -p 6379:6379 redis/redis-stack-server
 ```
 
 ### Environment variables
````
6 changes: 0 additions & 6 deletions bin/cache_marc_files

This file was deleted.

2 changes: 1 addition & 1 deletion docker-compose.yml
```diff
@@ -110,7 +110,7 @@ services:
       retries: 5
 
   redis:
-    image: "redis:7"
+    image: "redis/redis-stack-server:7.4.0-v0"
     healthcheck:
       test: ["CMD", "redis-cli", "ping"]
       interval: 30s
```
3 changes: 0 additions & 3 deletions docker/services/cron/cron.d/circulation
```diff
@@ -36,9 +36,6 @@ HOME=/var/www/circulation
 # Sync a library's collection with NoveList
 0 0 * * 0 root bin/run -d 60 novelist_update >> /var/log/cron.log 2>&1
 
-# Generate MARC files for libraries that have a MARC exporter configured.
-0 3,11 * * * root bin/run cache_marc_files >> /var/log/cron.log 2>&1
-
 # The remaining scripts keep the circulation manager in sync with
 # specific types of collections.
 
```
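With the cron entry removed, scheduling moves to Celery beat (note the `celerybeat-schedule.db` entry added to `.gitignore` above). Here is a sketch of what an equivalent beat entry might look like; the entry name and timing are assumptions that simply mirror the old cron line, and the real schedule is defined in the Celery service configuration, not in this diff. The task name follows Celery's default module-path naming for `shared_task`.

```python
from celery.schedules import crontab

# Hypothetical beat entry standing in for the removed cron line.
beat_schedule = {
    "marc_export": {
        "task": "palace.manager.celery.tasks.marc.marc_export",
        "schedule": crontab(minute="0", hour="3,11"),  # mirrors 0 3,11 * * *
    },
}
```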
4 changes: 2 additions & 2 deletions src/palace/manager/api/admin/controller/catalog_services.py
```diff
@@ -8,9 +8,9 @@
 )
 from palace.manager.api.admin.form_data import ProcessFormData
 from palace.manager.api.admin.problem_details import MULTIPLE_SERVICES_FOR_LIBRARY
-from palace.manager.core.marc import MARCExporter
 from palace.manager.integration.goals import Goals
 from palace.manager.integration.settings import BaseSettings
+from palace.manager.marc.exporter import MarcExporter
 from palace.manager.sqlalchemy.listeners import site_configuration_has_changed
 from palace.manager.sqlalchemy.model.integration import (
     IntegrationConfiguration,
@@ -21,7 +21,7 @@
 
 
 class CatalogServicesController(
-    IntegrationSettingsController[MARCExporter],
+    IntegrationSettingsController[MarcExporter],
     AdminPermissionsControllerMixin,
 ):
     def process_catalog_services(self) -> Response | ProblemDetail:
```
5 changes: 4 additions & 1 deletion src/palace/manager/api/circulation_manager.py
```diff
@@ -343,7 +343,10 @@ def setup_one_time_controllers(self):
         """
         self.index_controller = IndexController(self)
         self.opds_feeds = OPDSFeedController(self)
-        self.marc_records = MARCRecordController(self.services.storage.public())
+        self.marc_records = MARCRecordController(
+            self.services.storage.public(),
+            self.services.integration_registry.catalog_services(),
+        )
         self.loans = LoanController(self)
         self.annotations = AnnotationController(self)
         self.urn_lookup = URNLookupController(self)
```
16 changes: 11 additions & 5 deletions src/palace/manager/api/controller/marc.py
```diff
@@ -9,8 +9,11 @@
 from sqlalchemy import select
 from sqlalchemy.orm import Session
 
-from palace.manager.core.marc import MARCExporter
 from palace.manager.integration.goals import Goals
+from palace.manager.marc.exporter import MarcExporter
+from palace.manager.service.integration_registry.catalog_services import (
+    CatalogServicesRegistry,
+)
 from palace.manager.service.storage.s3 import S3Service
 from palace.manager.sqlalchemy.model.collection import Collection
 from palace.manager.sqlalchemy.model.integration import (
@@ -49,21 +52,24 @@ class MARCRecordController:
     </body>
     </html>"""
 
-    def __init__(self, storage_service: S3Service | None) -> None:
+    def __init__(
+        self, storage_service: S3Service | None, registry: CatalogServicesRegistry
+    ) -> None:
         self.storage_service = storage_service
+        self.registry = registry
 
     @staticmethod
     def library() -> Library:
         return flask.request.library  # type: ignore[no-any-return,attr-defined]
 
-    @staticmethod
-    def has_integration(session: Session, library: Library) -> bool:
+    def has_integration(self, session: Session, library: Library) -> bool:
+        protocols = self.registry.get_protocols(MarcExporter)
         integration_query = (
             select(IntegrationLibraryConfiguration)
             .join(IntegrationConfiguration)
             .where(
                 IntegrationConfiguration.goal == Goals.CATALOG_GOAL,
-                IntegrationConfiguration.protocol == MARCExporter.__name__,
+                IntegrationConfiguration.protocol.in_(protocols),
                 IntegrationLibraryConfiguration.library == library,
             )
         )
```
161 changes: 161 additions & 0 deletions src/palace/manager/celery/tasks/marc.py
```python
import datetime
from typing import Any

from celery import shared_task

from palace.manager.celery.task import Task
from palace.manager.marc.exporter import LibraryInfo, MarcExporter
from palace.manager.marc.uploader import MarcUploadManager
from palace.manager.service.celery.celery import QueueNames
from palace.manager.service.redis.models.marc import (
    MarcFileUploadSession,
    MarcFileUploadState,
)
from palace.manager.util.datetime_helpers import utc_now


@shared_task(queue=QueueNames.default, bind=True)
def marc_export(task: Task, force: bool = False) -> None:
    """
    Export MARC records for all collections that have the `export_marc_records`
    flag set to True and whose libraries have a MARC exporter integration enabled.
    """

    with task.session() as session:
        registry = task.services.integration_registry.catalog_services()
        start_time = utc_now()
        collections = MarcExporter.enabled_collections(session, registry)
        for collection in collections:
            # Collection.id should never be None here, but mypy doesn't know
            # that, so we assert it for mypy's benefit.
            assert collection.id is not None
            upload_session = MarcFileUploadSession(
                task.services.redis.client(), collection.id
            )
            with upload_session.lock() as acquired:
                if not acquired:
                    task.log.info(
                        f"Skipping collection {collection.name} ({collection.id}) because another task holds its lock."
                    )
                    continue

                if (
                    upload_state := upload_session.state()
                ) != MarcFileUploadState.INITIAL:
                    task.log.info(
                        f"Skipping collection {collection.name} ({collection.id}) because it is already being "
                        f"processed (state: {upload_state})."
                    )
                    continue

                libraries_info = MarcExporter.enabled_libraries(
                    session, registry, collection.id
                )
                needs_update = (
                    any(info.needs_update for info in libraries_info) or force
                )

                if not needs_update:
                    task.log.info(
                        f"Skipping collection {collection.name} ({collection.id}) because it has been updated recently."
                    )
                    continue

                works = MarcExporter.query_works(
                    session,
                    collection.id,
                    work_id_offset=0,
                    batch_size=1,
                )
                if not works:
                    task.log.info(
                        f"Skipping collection {collection.name} ({collection.id}) because it has no works."
                    )
                    continue

                task.log.info(
                    f"Generating MARC records for collection {collection.name} ({collection.id})."
                )
                upload_session.set_state(MarcFileUploadState.QUEUED)
                marc_export_collection.delay(
                    collection_id=collection.id,
                    start_time=start_time,
                    libraries=[l.dict() for l in libraries_info],
                )


@shared_task(queue=QueueNames.default, bind=True)
def marc_export_collection(
    task: Task,
    collection_id: int,
    start_time: datetime.datetime,
    libraries: list[dict[str, Any]],
    batch_size: int = 500,
    last_work_id: int | None = None,
    update_number: int = 0,
) -> None:
    """
    Export MARC records for a single collection.

    This task is designed to be re-queued until all works in the collection
    have been processed. That can take some time, but each individual task
    should complete quickly, so that it doesn't block other tasks from running.
    """

    base_url = task.services.config.sitewide.base_url()
    storage_service = task.services.storage.public()
    libraries_info = [LibraryInfo.parse_obj(l) for l in libraries]
    upload_manager = MarcUploadManager(
        storage_service,
        MarcFileUploadSession(
            task.services.redis.client(), collection_id, update_number
        ),
    )
    with upload_manager.begin():
        if not upload_manager.locked:
            task.log.info(
                f"Skipping collection {collection_id} because another task is already processing it."
            )
            return

        with task.session() as session:
            works = MarcExporter.query_works(
                session,
                collection_id,
                work_id_offset=last_work_id,
                batch_size=batch_size,
            )
            for work in works:
                MarcExporter.process_work(
                    work, libraries_info, base_url, upload_manager=upload_manager
                )

        # Sync the upload_manager to ensure that all the data is written to storage.
        upload_manager.sync()

        if len(works) == batch_size:
            # This task is complete, but there are more works waiting to be
            # exported, so we requeue ourselves to process the next batch.
            raise task.replace(
                marc_export_collection.s(
                    collection_id=collection_id,
                    start_time=start_time,
                    libraries=[l.dict() for l in libraries_info],
                    batch_size=batch_size,
                    last_work_id=works[-1].id,
                    update_number=upload_manager.update_number,
                )
            )

        # If we got here, we have finished generating MARC records. Clean up and exit.
        with task.transaction() as session:
            collection = MarcExporter.collection(session, collection_id)
            collection_name = collection.name if collection else "unknown"
            completed_uploads = upload_manager.complete()
            MarcExporter.create_marc_upload_records(
                session, start_time, collection_id, libraries_info, completed_uploads
            )
            upload_manager.remove_session()
        task.log.info(
            f"Finished generating MARC records for collection '{collection_name}' ({collection_id})."
        )
```