feature(locksmith): Event deployment worker #15259

Merged: 8 commits, Dec 17, 2024
Changes from all commits
241 changes: 200 additions & 41 deletions locksmith/src/worker/jobs/locks.ts
@@ -1,14 +1,35 @@
import { Op } from 'sequelize'
import { networks } from '@unlock-protocol/networks'
-import { Hook, ProcessedHookItem } from '../../models'
import {
Hook,
ProcessedHookItem,
EventData,
CheckoutConfig,
} from '../../models'
import { TOPIC_LOCKS } from '../topics'
import { notifyHook, filterHooksByTopic } from '../helpers'
import { logger } from '../../logger'
import { OrderDirection, SubgraphService } from '@unlock-protocol/unlock-js'
import { LockOrderBy } from '@unlock-protocol/unlock-js'
import { EventStatus } from '@unlock-protocol/types'
import { PaywallConfigType } from '@unlock-protocol/core'
import { getEventUrl } from '../../utils/eventHelpers'
import { sendEmail } from '../../operations/wedlocksOperations'

/**
* Number of locks to fetch in each batch
*/
const FETCH_LIMIT = 25

/**
* Fetches all unprocessed locks for a given network
* Uses pagination to fetch locks in batches
* Filters out any locks that have already been processed
*
* @param network - Network ID to fetch locks from
* @param page - Page number for pagination
* @returns Array of unprocessed locks
*/
async function fetchUnprocessedLocks(network: number, page = 0) {
const subgraph = new SubgraphService()

@@ -40,65 +61,203 @@ async function fetchUnprocessedLocks(network: number, page = 0) {
return unprocessedLocks
}

-async function notifyHooksOfAllUnprocessedLocks(
/**
* Updates pending events with new lock information
* Creates checkout configs for matching events
* Updates event status to DEPLOYED when matches are found
*
* @param locks - Array of new locks to check against pending events
* @param network - Network ID where locks exist
*/
async function updatePendingEvents(locks: any[], network: number) {
const pendingEvents = await EventData.findAll({
where: {
status: EventStatus.PENDING,
transactionHash: {
[Op.not]: null,
},
},
})

for (const event of pendingEvents) {
const matchingLock = locks.find(
(lock) =>
lock.creationTransactionHash?.toLowerCase() ===
event.transactionHash?.toLowerCase()
)

if (matchingLock) {
const config: PaywallConfigType = {
title: 'Registration',
locks: {
[matchingLock.address]: {
network,
},
},
}

// create checkout config for the event
const checkoutConfig = await CheckoutConfig.create({
id: `${event.slug}-${Date.now()}`,
name: `Checkout config for ${event.name} (${event.slug})`,
config,
createdBy: event.createdBy,
})

// update event status to DEPLOYED
await event.update({
status: EventStatus.DEPLOYED,
checkoutConfigId: checkoutConfig.id,
})

// send email notification to event creator
await sendEmail({
template: 'eventDeployed',
recipient: event.data.replyTo,
params: {
eventName: event!.name,
eventDate: event!.data.ticket.event_start_date,
eventTime: event!.data.ticket.event_start_time,
eventUrl: getEventUrl(event!),
},
attachments: [],
})

logger.info(
`Updated event ${event.slug} with lock ${matchingLock.address}`
)
}
}
}

/**
* Notifies all relevant hooks about new locks
* Sends parallel notifications to each hook
*
* @param hooks - Array of hooks to notify
* @param locks - Array of new locks to notify about
* @param network - Network ID where locks exist
*/
async function notifyHooksOfNewLocks(
hooks: Hook[],
locks: any[],
network: number
) {
return Promise.all(
hooks.map(async (hook) => {
return notifyHook(hook, {
data: locks,
network,
})
})
)
}

/**
* Marks locks as processed
* Creates bulk records to track processed locks
*
* @param locks - Array of locks to mark as processed
* @param network - Network ID where locks exist
*/
async function markLocksAsProcessed(locks: any[], network: number) {
await ProcessedHookItem.bulkCreate(
locks.map((lock: any) => ({
network,
type: 'lock',
objectId: lock.id,
}))
)
}

/**
* Finalizes the processing of new locks
* Handles both event updates and marking locks as processed
* Both tasks run in parallel for efficiency
*
* @param locks - Array of locks to finalize
* @param network - Network ID where locks exist
*/
async function finalizeLockProcessing(locks: any[], network: number) {
await Promise.all([
updatePendingEvents(locks, network),
markLocksAsProcessed(locks, network),
])
}

/**
* Processes a single batch of newly discovered locks
* Runs webhook notifications and lock finalization in parallel
*
* @param hooks - Relevant webhooks to notify
* @param locks - Array of new locks to process
* @param network - Network ID where locks were found
*/
async function processLockBatch(hooks: Hook[], locks: any[], network: number) {
await Promise.all([
notifyHooksOfNewLocks(hooks, locks, network),
finalizeLockProcessing(locks, network),
])
}

/**
* Handles lock processing for a specific network
* Implements pagination to process all unprocessed locks in batches
*
* Process:
* 1. Fetches unprocessed locks in batches of FETCH_LIMIT
* 2. For each batch:
* - Notifies relevant webhooks
* - Updates associated events
* - Marks locks as processed
* 3. Continues until no new locks are found
*
* @param hooks - Array of webhooks for this specific network
* @param network - Network ID being processed
*/
async function handleNetworkLocks(hooks: Hook[], network: number) {
let page = 0
while (true) {
-logger.info(`Running job on ${network}`)
-const locks = await fetchUnprocessedLocks(network, page)
logger.info(`Processing locks for network ${network}`)
const newLocks = await fetchUnprocessedLocks(network, page)

// If empty, break the loop and return as there are no more new locks to process.
-if (!locks.length) {
-logger.info(`No new locks for, ${network}`)
if (!newLocks.length) {
logger.info(`No new locks found for network ${network}`)
break
}

logger.info('Found new locks', {
-locks: locks.map((lock: any) => [network, lock.id]),
locks: newLocks.map((lock: any) => [network, lock.id]),
})

-await Promise.all(
-hooks.map(async (hook) => {
-const data = locks
-const hookEvent = await notifyHook(hook, {
-data,
-network,
-})
-return hookEvent
-})
-)
-
-const processedHookItems = locks.map((lock: any) => {
-return {
-network,
-type: 'lock',
-objectId: lock.id,
-}
-})
-
-await ProcessedHookItem.bulkCreate(processedHookItems)

await processLockBatch(hooks, newLocks, network)
page += 1
}
}

/**
* Main entry point for lock notifications system
* Orchestrates the processing of new locks across all supported networks
*
* Process:
* 1. Filters hooks by relevant topic and network
* 2. For each network, initiates parallel processing
* 3. Skips local development network (31337)
*
* @param hooks - Array of webhook configurations to be notified
*/
export async function notifyOfLocks(hooks: Hook[]) {
const subscribedHooks = filterHooksByTopic(hooks, TOPIC_LOCKS)
-const tasks: Promise<void>[] = []
const networkTasks: Promise<void>[] = []

for (const network of Object.values(networks)) {
-if (network.id !== 31337) {
-const hooksFilteredByNetwork = subscribedHooks.filter(
-(hook) => hook.network === network.id
-)
-const task = notifyHooksOfAllUnprocessedLocks(
-hooksFilteredByNetwork,
-network.id
-)
-tasks.push(task)
-}
if (network.id === 31337) continue // Skip local network

const networkHooks = subscribedHooks.filter(
(hook) => hook.network === network.id
)
networkTasks.push(handleNetworkLocks(networkHooks, network.id))
}

-await Promise.allSettled(tasks)
await Promise.allSettled(networkTasks)
}
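
For readers who want the shape of the change without the diff noise, the sketch below restates, in plain TypeScript, the two patterns the new worker relies on: a paginated fetch-and-process loop per network, and a Promise.allSettled fan-out across networks so one failing network does not abort the others. It is an illustration only; the type and helper names (Lock, processAllNewLocks, fetchPage, processBatch, runAcrossNetworks) are hypothetical and do not exist in the locksmith codebase.

// Illustrative sketch, not part of the PR.
type Lock = { id: string; creationTransactionHash?: string }

// Paginate through unprocessed locks and hand each non-empty batch to a processor,
// mirroring the loop in handleNetworkLocks.
async function processAllNewLocks(
  fetchPage: (page: number) => Promise<Lock[]>, // e.g. a subgraph query returning up to FETCH_LIMIT locks
  processBatch: (locks: Lock[]) => Promise<void> // webhooks, event updates, bookkeeping
) {
  let page = 0
  while (true) {
    const batch = await fetchPage(page)
    if (batch.length === 0) break // nothing left to process on this network
    await processBatch(batch)
    page += 1
  }
}

// Fan out across networks; Promise.allSettled keeps one failing network from
// aborting the others, as in notifyOfLocks above.
async function runAcrossNetworks(
  networkIds: number[],
  run: (networkId: number) => Promise<void>
) {
  await Promise.allSettled(
    networkIds.filter((id) => id !== 31337).map((id) => run(id))
  )
}

In the PR itself, fetchUnprocessedLocks fills the fetchPage role, processLockBatch fills the processBatch role, and ProcessedHookItem rows provide the deduplication between runs.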