Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(NOBIDS) feat(notification): separate processes per network #2952

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions db/subscriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,8 +252,8 @@ func GetSubscriptions(filter GetSubscriptionsFilter) ([]*types.Subscription, err
}

// UpdateSubscriptionsLastSent updates `last_sent_ts` column of the `users_subscriptions` table.
func UpdateSubscriptionsLastSent(subscriptionIDs []uint64, sent time.Time, epoch uint64, useDB *sqlx.DB) error {
_, err := useDB.Exec(`
func UpdateSubscriptionsLastSent(subscriptionIDs []uint64, sent time.Time, epoch uint64) error {
_, err := FrontendWriterDB.Exec(`
UPDATE users_subscriptions
SET last_sent_ts = TO_TIMESTAMP($1), last_sent_epoch = $2
WHERE id = ANY($3)`, sent.Unix(), epoch, pq.Array(subscriptionIDs))
Expand Down
91 changes: 45 additions & 46 deletions services/notifications.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
gcp_bigtable "cloud.google.com/go/bigtable"
"firebase.google.com/go/v4/messaging"
"github.com/ethereum/go-ethereum/common"
"github.com/jmoiron/sqlx"
"github.com/lib/pq"
"github.com/rocket-pool/rocketpool-go/utils/eth"
"golang.org/x/text/cases"
Expand Down Expand Up @@ -107,7 +106,7 @@ func notificationCollector() {
break
}

queueNotifications(notifications, db.FrontendWriterDB) // this caused the collected notifications to be queued and sent
queueNotifications(notifications) // this caused the collected notifications to be queued and sent

// Network DB Notifications (user related, must only run on one instance ever!!!!)
if utils.Config.Notifications.UserDBNotifications {
Expand All @@ -120,7 +119,7 @@ func notificationCollector() {
continue
}

queueNotifications(userNotifications, db.FrontendWriterDB)
queueNotifications(userNotifications)
}

logger.
Expand All @@ -142,7 +141,7 @@ func notificationSender() {
start := time.Now()
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5)

conn, err := db.FrontendWriterDB.Conn(ctx)
conn, err := db.WriterDb.Conn(ctx)
if err != nil {
logger.WithError(err).Error("error creating connection")
cancel()
Expand All @@ -162,12 +161,12 @@ func notificationSender() {
}

logger.Info("lock obtained")
err = dispatchNotifications(db.FrontendWriterDB)
err = dispatchNotifications()
if err != nil {
logger.WithError(err).Error("error dispatching notifications")
}

err = garbageCollectNotificationQueue(db.FrontendWriterDB)
err = garbageCollectNotificationQueue()
if err != nil {
logger.WithError(err).Errorf("error garbage collecting the notification queue")
}
Expand Down Expand Up @@ -385,7 +384,7 @@ func collectUserDbNotifications(epoch uint64) (map[uint64]map[types.EventName][]
return notificationsByUserID, nil
}

func queueNotifications(notificationsByUserID map[uint64]map[types.EventName][]types.Notification, useDB *sqlx.DB) {
func queueNotifications(notificationsByUserID map[uint64]map[types.EventName][]types.Notification) {
subByEpoch := map[uint64][]uint64{}

// prevent multiple events being sent with the same subscription id
Expand All @@ -408,17 +407,17 @@ func queueNotifications(notificationsByUserID map[uint64]map[types.EventName][]t
}
}

err := queueEmailNotifications(notificationsByUserID, useDB)
err := queueEmailNotifications(notificationsByUserID)
if err != nil {
logger.WithError(err).Error("error queuing email notifications")
}

err = queuePushNotification(notificationsByUserID, useDB)
err = queuePushNotification(notificationsByUserID)
if err != nil {
logger.WithError(err).Error("error queuing push notifications")
}

err = queueWebhookNotifications(notificationsByUserID, useDB)
err = queueWebhookNotifications(notificationsByUserID)
if err != nil {
logger.WithError(err).Error("error queuing webhook notifications")
}
Expand All @@ -437,7 +436,7 @@ func queueNotifications(notificationsByUserID map[uint64]map[types.EventName][]t
}
for epoch, subIDs := range subByEpoch {
// update that we've queued the subscription (last sent rather means last queued)
err := db.UpdateSubscriptionsLastSent(subIDs, time.Now(), epoch, useDB)
err := db.UpdateSubscriptionsLastSent(subIDs, time.Now(), epoch)
if err != nil {
logger.Errorf("error updating sent-time of sent notifications: %v", err)
metrics.Errors.WithLabelValues("notifications_updating_sent_time").Inc()
Expand Down Expand Up @@ -475,24 +474,24 @@ func queueNotifications(notificationsByUserID map[uint64]map[types.EventName][]t
}
}

func dispatchNotifications(useDB *sqlx.DB) error {
func dispatchNotifications() error {

err := sendEmailNotifications(useDB)
err := sendEmailNotifications()
if err != nil {
return fmt.Errorf("error sending email notifications, err: %w", err)
}

err = sendPushNotifications(useDB)
err = sendPushNotifications()
if err != nil {
return fmt.Errorf("error sending push notifications, err: %w", err)
}

err = sendWebhookNotifications(useDB)
err = sendWebhookNotifications()
if err != nil {
return fmt.Errorf("error sending webhook notifications, err: %w", err)
}

err = sendDiscordNotifications(useDB)
err = sendDiscordNotifications()
if err != nil {
return fmt.Errorf("error sending webhook discord notifications, err: %w", err)
}
Expand All @@ -501,9 +500,9 @@ func dispatchNotifications(useDB *sqlx.DB) error {
}

// garbageCollectNotificationQueue deletes entries from the notification queue that have been processed
func garbageCollectNotificationQueue(useDB *sqlx.DB) error {
func garbageCollectNotificationQueue() error {

rows, err := useDB.Exec(`DELETE FROM notification_queue WHERE (sent < now() - INTERVAL '30 minutes') OR (created < now() - INTERVAL '1 hour')`)
rows, err := db.WriterDb.Exec(`DELETE FROM notification_queue WHERE (sent < now() - INTERVAL '30 minutes') OR (created < now() - INTERVAL '1 hour')`)
if err != nil {
return fmt.Errorf("error deleting from notification_queue %w", err)
}
Expand All @@ -523,7 +522,7 @@ func getNetwork() string {
return ""
}

func queuePushNotification(notificationsByUserID map[uint64]map[types.EventName][]types.Notification, useDB *sqlx.DB) error {
func queuePushNotification(notificationsByUserID map[uint64]map[types.EventName][]types.Notification) error {
userIDs := []uint64{}
for userID := range notificationsByUserID {
userIDs = append(userIDs, userID)
Expand Down Expand Up @@ -576,7 +575,7 @@ func queuePushNotification(notificationsByUserID map[uint64]map[types.EventName]
Messages: batch,
}

_, err = useDB.Exec(`INSERT INTO notification_queue (created, channel, content) VALUES ($1, 'push', $2)`, time.Now(), transitPushContent)
_, err = db.WriterDb.Exec(`INSERT INTO notification_queue (created, channel, content) VALUES ($1, 'push', $2)`, time.Now(), transitPushContent)
if err != nil {
logger.WithError(err).Errorf("error writing transit push notification to db")
return
Expand All @@ -586,10 +585,10 @@ func queuePushNotification(notificationsByUserID map[uint64]map[types.EventName]
return nil
}

func sendPushNotifications(useDB *sqlx.DB) error {
func sendPushNotifications() error {
var notificationQueueItem []types.TransitPush

err := useDB.Select(&notificationQueueItem, `SELECT
err := db.WriterDb.Select(&notificationQueueItem, `SELECT
id,
created,
sent,
Expand Down Expand Up @@ -619,7 +618,7 @@ func sendPushNotifications(useDB *sqlx.DB) error {
metrics.NotificationsSent.WithLabelValues("push", "200").Add(float64(len(n.Content.Messages)))
}

_, err = useDB.Exec(`UPDATE notification_queue SET sent = now() WHERE id = $1`, n.Id)
_, err = db.WriterDb.Exec(`UPDATE notification_queue SET sent = now() WHERE id = $1`, n.Id)
if err != nil {
return fmt.Errorf("error updating sent status for push notification with id: %v, err: %w", n.Id, err)
}
Expand All @@ -628,7 +627,7 @@ func sendPushNotifications(useDB *sqlx.DB) error {
return nil
}

func queueEmailNotifications(notificationsByUserID map[uint64]map[types.EventName][]types.Notification, useDB *sqlx.DB) error {
func queueEmailNotifications(notificationsByUserID map[uint64]map[types.EventName][]types.Notification) error {
userIDs := []uint64{}
for userID := range notificationsByUserID {
userIDs = append(userIDs, userID)
Expand Down Expand Up @@ -765,7 +764,7 @@ func queueEmailNotifications(notificationsByUserID map[uint64]map[types.EventNam
Attachments: attachments,
}

_, err = useDB.Exec(`INSERT INTO notification_queue (created, channel, content) VALUES ($1, 'email', $2)`, time.Now(), transitEmailContent)
_, err = db.WriterDb.Exec(`INSERT INTO notification_queue (created, channel, content) VALUES ($1, 'email', $2)`, time.Now(), transitEmailContent)
if err != nil {
logger.WithError(err).Errorf("error writing transit email to db")
}
Expand All @@ -774,10 +773,10 @@ func queueEmailNotifications(notificationsByUserID map[uint64]map[types.EventNam
return nil
}

func sendEmailNotifications(useDb *sqlx.DB) error {
func sendEmailNotifications() error {
var notificationQueueItem []types.TransitEmail

err := useDb.Select(&notificationQueueItem, `SELECT
err := db.WriterDb.Select(&notificationQueueItem, `SELECT
id,
created,
sent,
Expand All @@ -800,18 +799,18 @@ func sendEmailNotifications(useDb *sqlx.DB) error {
metrics.NotificationsSent.WithLabelValues("email", "200").Inc()
}
}
_, err = useDb.Exec(`UPDATE notification_queue set sent = now() where id = $1`, n.Id)
_, err = db.WriterDb.Exec(`UPDATE notification_queue set sent = now() where id = $1`, n.Id)
if err != nil {
return fmt.Errorf("error updating sent status for email notification with id: %v, err: %w", n.Id, err)
}
}
return nil
}

func queueWebhookNotifications(notificationsByUserID map[uint64]map[types.EventName][]types.Notification, useDB *sqlx.DB) error {
func queueWebhookNotifications(notificationsByUserID map[uint64]map[types.EventName][]types.Notification) error {
for userID, userNotifications := range notificationsByUserID {
var webhooks []types.UserWebhook
err := useDB.Select(&webhooks, `
err := db.FrontendWriterDB.Select(&webhooks, `
SELECT
id,
user_id,
Expand Down Expand Up @@ -849,7 +848,7 @@ func queueWebhookNotifications(notificationsByUserID map[uint64]map[types.EventN
if len(notifications) > 0 {
// reset Retries
if w.Retries > 5 && w.LastSent.Valid && w.LastSent.Time.Add(time.Hour).Before(time.Now()) {
_, err = useDB.Exec(`UPDATE users_webhooks SET retries = 0 WHERE id = $1;`, w.ID)
_, err = db.FrontendWriterDB.Exec(`UPDATE users_webhooks SET retries = 0 WHERE id = $1;`, w.ID)
if err != nil {
logger.WithError(err).Errorf("error updating users_webhooks table; setting retries to zero")
continue
Expand Down Expand Up @@ -926,7 +925,7 @@ func queueWebhookNotifications(notificationsByUserID map[uint64]map[types.EventN
}
// process notifs
for _, n := range notifs {
_, err = useDB.Exec(`INSERT INTO notification_queue (created, channel, content) VALUES (now(), $1, $2);`, n.Channel, n.Content)
_, err = db.WriterDb.Exec(`INSERT INTO notification_queue (created, channel, content) VALUES (now(), $1, $2);`, n.Channel, n.Content)
if err != nil {
logger.WithError(err).Errorf("error inserting into webhooks_queue")
} else {
Expand All @@ -936,7 +935,7 @@ func queueWebhookNotifications(notificationsByUserID map[uint64]map[types.EventN
// process discord notifs
for _, dNotifs := range discordNotifMap {
for _, n := range dNotifs {
_, err = useDB.Exec(`INSERT INTO notification_queue (created, channel, content) VALUES (now(), 'webhook_discord', $1);`, n)
_, err = db.WriterDb.Exec(`INSERT INTO notification_queue (created, channel, content) VALUES (now(), 'webhook_discord', $1);`, n)
if err != nil {
logger.WithError(err).Errorf("error inserting into webhooks_queue (discord)")
continue
Expand All @@ -949,10 +948,10 @@ func queueWebhookNotifications(notificationsByUserID map[uint64]map[types.EventN
return nil
}

func sendWebhookNotifications(useDB *sqlx.DB) error {
func sendWebhookNotifications() error {
var notificationQueueItem []types.TransitWebhook

err := useDB.Select(&notificationQueueItem, `SELECT
err := db.WriterDb.Select(&notificationQueueItem, `SELECT
id,
created,
sent,
Expand All @@ -969,7 +968,7 @@ func sendWebhookNotifications(useDB *sqlx.DB) error {
for _, n := range notificationQueueItem {
// do not retry after 5 attempts
if n.Content.Webhook.Retries > 5 {
_, err := db.FrontendWriterDB.Exec(`DELETE FROM notification_queue WHERE id = $1`, n.Id)
_, err := db.WriterDb.Exec(`DELETE FROM notification_queue WHERE id = $1`, n.Id)
if err != nil {
return fmt.Errorf("error deleting from notification queue: %w", err)
}
Expand All @@ -985,7 +984,7 @@ func sendWebhookNotifications(useDB *sqlx.DB) error {

_, err = url.Parse(n.Content.Webhook.Url)
if err != nil {
_, err := db.FrontendWriterDB.Exec(`DELETE FROM notification_queue WHERE id = $1`, n.Id)
_, err := db.WriterDb.Exec(`DELETE FROM notification_queue WHERE id = $1`, n.Id)
if err != nil {
return fmt.Errorf("error deleting from notification queue: %w", err)
}
Expand All @@ -1003,14 +1002,14 @@ func sendWebhookNotifications(useDB *sqlx.DB) error {
metrics.NotificationsSent.WithLabelValues("webhook", resp.Status).Inc()
}

_, err = useDB.Exec(`UPDATE notification_queue SET sent = now() WHERE id = $1`, n.Id)
_, err = db.WriterDb.Exec(`UPDATE notification_queue SET sent = now() WHERE id = $1`, n.Id)
if err != nil {
logger.WithError(err).Errorf("error updating notification_queue table")
return
}

if resp != nil && resp.StatusCode < 400 {
_, err = useDB.Exec(`UPDATE users_webhooks SET retries = 0, last_sent = now() WHERE id = $1;`, n.Content.Webhook.ID)
_, err = db.FrontendWriterDB.Exec(`UPDATE users_webhooks SET retries = 0, last_sent = now() WHERE id = $1;`, n.Content.Webhook.ID)
if err != nil {
logger.WithError(err).Errorf("error updating users_webhooks table; setting retries to zero")
return
Expand All @@ -1028,7 +1027,7 @@ func sendWebhookNotifications(useDB *sqlx.DB) error {
errResp.Body = string(b)
}

_, err = useDB.Exec(`UPDATE users_webhooks SET retries = retries + 1, last_sent = now(), request = $2, response = $3 WHERE id = $1;`, n.Content.Webhook.ID, n.Content, errResp)
_, err = db.FrontendWriterDB.Exec(`UPDATE users_webhooks SET retries = retries + 1, last_sent = now(), request = $2, response = $3 WHERE id = $1;`, n.Content.Webhook.ID, n.Content, errResp)
if err != nil {
logger.WithError(err).Errorf("error updating users_webhooks table; increasing retries")
return
Expand All @@ -1040,10 +1039,10 @@ func sendWebhookNotifications(useDB *sqlx.DB) error {
return nil
}

func sendDiscordNotifications(useDB *sqlx.DB) error {
func sendDiscordNotifications() error {
var notificationQueueItem []types.TransitDiscord

err := useDB.Select(&notificationQueueItem, `SELECT
err := db.WriterDb.Select(&notificationQueueItem, `SELECT
id,
created,
sent,
Expand All @@ -1064,7 +1063,7 @@ func sendDiscordNotifications(useDB *sqlx.DB) error {
for _, n := range notificationQueueItem {
// purge the event from existence if the retry counter is over 5
if n.Content.Webhook.Retries > 5 {
db.FrontendWriterDB.Exec(`DELETE FROM notification_queue where id = $1`, n.Id)
db.WriterDb.Exec(`DELETE FROM notification_queue where id = $1`, n.Id)
continue
}
if _, exists := webhookMap[n.Content.Webhook.ID]; !exists {
Expand All @@ -1079,7 +1078,7 @@ func sendDiscordNotifications(useDB *sqlx.DB) error {
go func(webhook types.UserWebhook, reqs []types.TransitDiscord) {
defer func() {
// update retries counters in db based on end result
_, err = useDB.Exec(`UPDATE users_webhooks SET retries = $1, last_sent = now() WHERE id = $2;`, webhook.Retries, webhook.ID)
_, err = db.FrontendWriterDB.Exec(`UPDATE users_webhooks SET retries = $1, last_sent = now() WHERE id = $2;`, webhook.Retries, webhook.ID)
if err != nil {
logger.Warnf("failed to update retries counter to %v for webhook %v: %v", webhook.Retries, webhook.ID, err)
}
Expand All @@ -1089,7 +1088,7 @@ func sendDiscordNotifications(useDB *sqlx.DB) error {
for _, req := range reqs {
ids = append(ids, req.Id)
}
_, err = db.FrontendWriterDB.Exec(`UPDATE notification_queue SET sent = now() where id = ANY($1)`, pq.Array(ids))
_, err = db.WriterDb.Exec(`UPDATE notification_queue SET sent = now() where id = ANY($1)`, pq.Array(ids))
if err != nil {
logger.Warnf("failed to update sent for notifcations in queue: %v", err)
}
Expand Down Expand Up @@ -1142,7 +1141,7 @@ func sendDiscordNotifications(useDB *sqlx.DB) error {
} else {
utils.LogError(nil, "error pushing discord webhook", 0, map[string]interface{}{"errResp.Body": errResp.Body, "webhook.Url": webhook.Url})
}
_, err = useDB.Exec(`UPDATE users_webhooks SET request = $2, response = $3 WHERE id = $1;`, webhook.ID, reqs[i].Content.DiscordRequest, errResp)
_, err = db.FrontendWriterDB.Exec(`UPDATE users_webhooks SET request = $2, response = $3 WHERE id = $1;`, webhook.ID, reqs[i].Content.DiscordRequest, errResp)
if err != nil {
logger.Errorf("error storing failure data in users_webhooks table: %v", err)
}
Expand Down
Loading