Skip to content

Commit

Permalink
feature(locksmith): Event deployment worker (unlock-protocol#15259)
Browse files Browse the repository at this point in the history
* add event deployment worker

* worker registration

* refactor locks job

* update worker entry

* update worker structure for clarity

* send email after updating event status
  • Loading branch information
0xTxbi authored and blahkheart committed Dec 21, 2024
1 parent 4faa3ea commit a61449b
Showing 1 changed file with 200 additions and 41 deletions.
241 changes: 200 additions & 41 deletions locksmith/src/worker/jobs/locks.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,35 @@
import { Op } from 'sequelize'
import { networks } from '@unlock-protocol/networks'
import { Hook, ProcessedHookItem } from '../../models'
import {
Hook,
ProcessedHookItem,
EventData,
CheckoutConfig,
} from '../../models'
import { TOPIC_LOCKS } from '../topics'
import { notifyHook, filterHooksByTopic } from '../helpers'
import { logger } from '../../logger'
import { OrderDirection, SubgraphService } from '@unlock-protocol/unlock-js'
import { LockOrderBy } from '@unlock-protocol/unlock-js'
import { EventStatus } from '@unlock-protocol/types'
import { PaywallConfigType } from '@unlock-protocol/core'
import { getEventUrl } from '../../utils/eventHelpers'
import { sendEmail } from '../../operations/wedlocksOperations'

/**
* Number of locks to fetch in each batch
*/
const FETCH_LIMIT = 25

/**
* Fetches all unprocessed locks for a given network
* Uses pagination to fetch locks in batches
* Filters out any locks that have already been processed
*
* @param network - Network ID to fetch locks from
* @param page - Page number for pagination
* @returns Array of unprocessed locks
*/
async function fetchUnprocessedLocks(network: number, page = 0) {
const subgraph = new SubgraphService()

Expand Down Expand Up @@ -40,65 +61,203 @@ async function fetchUnprocessedLocks(network: number, page = 0) {
return unprocessedLocks
}

async function notifyHooksOfAllUnprocessedLocks(
/**
* Updates pending events with new lock information
* Creates checkout configs for matching events
* Updates event status to DEPLOYED when matches are found
*
* @param locks - Array of new locks to check against pending events
* @param network - Network ID where locks exist
*/
async function updatePendingEvents(locks: any[], network: number) {
const pendingEvents = await EventData.findAll({
where: {
status: EventStatus.PENDING,
transactionHash: {
[Op.not]: null,
},
},
})

for (const event of pendingEvents) {
const matchingLock = locks.find(
(lock) =>
lock.creationTransactionHash?.toLowerCase() ===
event.transactionHash?.toLowerCase()
)

if (matchingLock) {
const config: PaywallConfigType = {
title: 'Registration',
locks: {
[matchingLock.address]: {
network,
},
},
}

// create checkout config for the event
const checkoutConfig = await CheckoutConfig.create({
id: `${event.slug}-${Date.now()}`,
name: `Checkout config for ${event.name} (${event.slug})`,
config,
createdBy: event.createdBy,
})

// update event status to DEPLOYED
await event.update({
status: EventStatus.DEPLOYED,
checkoutConfigId: checkoutConfig.id,
})

// send email notification to event creator
await sendEmail({
template: 'eventDeployed',
recipient: event.data.replyTo,
params: {
eventName: event!.name,
eventDate: event!.data.ticket.event_start_date,
eventTime: event!.data.ticket.event_start_time,
eventUrl: getEventUrl(event!),
},
attachments: [],
})

logger.info(
`Updated event ${event.slug} with lock ${matchingLock.address}`
)
}
}
}

/**
* Notifies all relevant hooks about new locks
* Sends parallel notifications to each hook
*
* @param hooks - Array of hooks to notify
* @param locks - Array of new locks to notify about
* @param network - Network ID where locks exist
*/
async function notifyHooksOfNewLocks(
hooks: Hook[],
locks: any[],
network: number
) {
return Promise.all(
hooks.map(async (hook) => {
return notifyHook(hook, {
data: locks,
network,
})
})
)
}

/**
* Marks locks as processed
* Creates bulk records to track processed locks
*
* @param locks - Array of locks to mark as processed
* @param network - Network ID where locks exist
*/
async function markLocksAsProcessed(locks: any[], network: number) {
await ProcessedHookItem.bulkCreate(
locks.map((lock: any) => ({
network,
type: 'lock',
objectId: lock.id,
}))
)
}

/**
* Finalizes the processing of new locks
* Handles both event updates and marking locks as processed
* Both tasks run in parallel for efficiency
*
* @param locks - Array of locks to finalize
* @param network - Network ID where locks exist
*/
async function finalizeLockProcessing(locks: any[], network: number) {
await Promise.all([
updatePendingEvents(locks, network),
markLocksAsProcessed(locks, network),
])
}

/**
* Processes a single batch of newly discovered locks
* Runs webhook notifications and lock finalization in parallel
*
* @param hooks - Relevant webhooks to notify
* @param locks - Array of new locks to process
* @param network - Network ID where locks were found
*/
async function processLockBatch(hooks: Hook[], locks: any[], network: number) {
await Promise.all([
notifyHooksOfNewLocks(hooks, locks, network),
finalizeLockProcessing(locks, network),
])
}

/**
* Handles lock processing for a specific network
* Implements pagination to process all unprocessed locks in batches
*
* Process:
* 1. Fetches unprocessed locks in batches of FETCH_LIMIT
* 2. For each batch:
* - Notifies relevant webhooks
* - Updates associated events
* - Marks locks as processed
* 3. Continues until no new locks are found
*
* @param hooks - Array of webhooks for this specific network
* @param network - Network ID being processed
*/
async function handleNetworkLocks(hooks: Hook[], network: number) {
let page = 0
while (true) {
logger.info(`Running job on ${network}`)
const locks = await fetchUnprocessedLocks(network, page)
logger.info(`Processing locks for network ${network}`)
const newLocks = await fetchUnprocessedLocks(network, page)

// If empty, break the loop and return as there are no more new locks to process.
if (!locks.length) {
logger.info(`No new locks for, ${network}`)
if (!newLocks.length) {
logger.info(`No new locks found for network ${network}`)
break
}

logger.info('Found new locks', {
locks: locks.map((lock: any) => [network, lock.id]),
})

await Promise.all(
hooks.map(async (hook) => {
const data = locks
const hookEvent = await notifyHook(hook, {
data,
network,
})
return hookEvent
})
)

const processedHookItems = locks.map((lock: any) => {
return {
network,
type: 'lock',
objectId: lock.id,
}
locks: newLocks.map((lock: any) => [network, lock.id]),
})

await ProcessedHookItem.bulkCreate(processedHookItems)

await processLockBatch(hooks, newLocks, network)
page += 1
}
}

/**
* Main entry point for lock notifications system
* Orchestrates the processing of new locks across all supported networks
*
* Process:
* 1. Filters hooks by relevant topic and network
* 2. For each network, initiates parallel processing
* 3. Skips local development network (31337)
*
* @param hooks - Array of webhook configurations to be notified
*/
export async function notifyOfLocks(hooks: Hook[]) {
const subscribedHooks = filterHooksByTopic(hooks, TOPIC_LOCKS)
const tasks: Promise<void>[] = []
const networkTasks: Promise<void>[] = []

for (const network of Object.values(networks)) {
if (network.id !== 31337) {
const hooksFilteredByNetwork = subscribedHooks.filter(
(hook) => hook.network === network.id
)
const task = notifyHooksOfAllUnprocessedLocks(
hooksFilteredByNetwork,
network.id
)
tasks.push(task)
}
if (network.id === 31337) continue // Skip local network

const networkHooks = subscribedHooks.filter(
(hook) => hook.network === network.id
)
networkTasks.push(handleNetworkLocks(networkHooks, network.id))
}
await Promise.allSettled(tasks)

await Promise.allSettled(networkTasks)
}

0 comments on commit a61449b

Please sign in to comment.