diff --git a/db/subscriptions.go b/db/subscriptions.go index 3c8c66be91..d2af788f76 100644 --- a/db/subscriptions.go +++ b/db/subscriptions.go @@ -252,8 +252,8 @@ func GetSubscriptions(filter GetSubscriptionsFilter) ([]*types.Subscription, err } // UpdateSubscriptionsLastSent updates `last_sent_ts` column of the `users_subscriptions` table. -func UpdateSubscriptionsLastSent(subscriptionIDs []uint64, sent time.Time, epoch uint64, useDB *sqlx.DB) error { - _, err := useDB.Exec(` +func UpdateSubscriptionsLastSent(subscriptionIDs []uint64, sent time.Time, epoch uint64) error { + _, err := FrontendWriterDB.Exec(` UPDATE users_subscriptions SET last_sent_ts = TO_TIMESTAMP($1), last_sent_epoch = $2 WHERE id = ANY($3)`, sent.Unix(), epoch, pq.Array(subscriptionIDs)) diff --git a/services/notifications.go b/services/notifications.go index 638075d1fd..3cbc859fd7 100644 --- a/services/notifications.go +++ b/services/notifications.go @@ -31,7 +31,6 @@ import ( gcp_bigtable "cloud.google.com/go/bigtable" "firebase.google.com/go/v4/messaging" "github.com/ethereum/go-ethereum/common" - "github.com/jmoiron/sqlx" "github.com/lib/pq" "github.com/rocket-pool/rocketpool-go/utils/eth" "golang.org/x/text/cases" @@ -107,7 +106,7 @@ func notificationCollector() { break } - queueNotifications(notifications, db.FrontendWriterDB) // this caused the collected notifications to be queued and sent + queueNotifications(notifications) // this caused the collected notifications to be queued and sent // Network DB Notifications (user related, must only run on one instance ever!!!!) if utils.Config.Notifications.UserDBNotifications { @@ -120,7 +119,7 @@ func notificationCollector() { continue } - queueNotifications(userNotifications, db.FrontendWriterDB) + queueNotifications(userNotifications) } logger. @@ -142,7 +141,7 @@ func notificationSender() { start := time.Now() ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5) - conn, err := db.FrontendWriterDB.Conn(ctx) + conn, err := db.WriterDb.Conn(ctx) if err != nil { logger.WithError(err).Error("error creating connection") cancel() @@ -162,12 +161,12 @@ func notificationSender() { } logger.Info("lock obtained") - err = dispatchNotifications(db.FrontendWriterDB) + err = dispatchNotifications() if err != nil { logger.WithError(err).Error("error dispatching notifications") } - err = garbageCollectNotificationQueue(db.FrontendWriterDB) + err = garbageCollectNotificationQueue() if err != nil { logger.WithError(err).Errorf("error garbage collecting the notification queue") } @@ -385,7 +384,7 @@ func collectUserDbNotifications(epoch uint64) (map[uint64]map[types.EventName][] return notificationsByUserID, nil } -func queueNotifications(notificationsByUserID map[uint64]map[types.EventName][]types.Notification, useDB *sqlx.DB) { +func queueNotifications(notificationsByUserID map[uint64]map[types.EventName][]types.Notification) { subByEpoch := map[uint64][]uint64{} // prevent multiple events being sent with the same subscription id @@ -408,17 +407,17 @@ func queueNotifications(notificationsByUserID map[uint64]map[types.EventName][]t } } - err := queueEmailNotifications(notificationsByUserID, useDB) + err := queueEmailNotifications(notificationsByUserID) if err != nil { logger.WithError(err).Error("error queuing email notifications") } - err = queuePushNotification(notificationsByUserID, useDB) + err = queuePushNotification(notificationsByUserID) if err != nil { logger.WithError(err).Error("error queuing push notifications") } - err = queueWebhookNotifications(notificationsByUserID, useDB) + err = queueWebhookNotifications(notificationsByUserID) if err != nil { logger.WithError(err).Error("error queuing webhook notifications") } @@ -437,7 +436,7 @@ func queueNotifications(notificationsByUserID map[uint64]map[types.EventName][]t } for epoch, subIDs := range subByEpoch { // update that we've queued the subscription (last sent rather means last queued) - err := db.UpdateSubscriptionsLastSent(subIDs, time.Now(), epoch, useDB) + err := db.UpdateSubscriptionsLastSent(subIDs, time.Now(), epoch) if err != nil { logger.Errorf("error updating sent-time of sent notifications: %v", err) metrics.Errors.WithLabelValues("notifications_updating_sent_time").Inc() @@ -475,24 +474,24 @@ func queueNotifications(notificationsByUserID map[uint64]map[types.EventName][]t } } -func dispatchNotifications(useDB *sqlx.DB) error { +func dispatchNotifications() error { - err := sendEmailNotifications(useDB) + err := sendEmailNotifications() if err != nil { return fmt.Errorf("error sending email notifications, err: %w", err) } - err = sendPushNotifications(useDB) + err = sendPushNotifications() if err != nil { return fmt.Errorf("error sending push notifications, err: %w", err) } - err = sendWebhookNotifications(useDB) + err = sendWebhookNotifications() if err != nil { return fmt.Errorf("error sending webhook notifications, err: %w", err) } - err = sendDiscordNotifications(useDB) + err = sendDiscordNotifications() if err != nil { return fmt.Errorf("error sending webhook discord notifications, err: %w", err) } @@ -501,9 +500,9 @@ func dispatchNotifications(useDB *sqlx.DB) error { } // garbageCollectNotificationQueue deletes entries from the notification queue that have been processed -func garbageCollectNotificationQueue(useDB *sqlx.DB) error { +func garbageCollectNotificationQueue() error { - rows, err := useDB.Exec(`DELETE FROM notification_queue WHERE (sent < now() - INTERVAL '30 minutes') OR (created < now() - INTERVAL '1 hour')`) + rows, err := db.WriterDb.Exec(`DELETE FROM notification_queue WHERE (sent < now() - INTERVAL '30 minutes') OR (created < now() - INTERVAL '1 hour')`) if err != nil { return fmt.Errorf("error deleting from notification_queue %w", err) } @@ -523,7 +522,7 @@ func getNetwork() string { return "" } -func queuePushNotification(notificationsByUserID map[uint64]map[types.EventName][]types.Notification, useDB *sqlx.DB) error { +func queuePushNotification(notificationsByUserID map[uint64]map[types.EventName][]types.Notification) error { userIDs := []uint64{} for userID := range notificationsByUserID { userIDs = append(userIDs, userID) @@ -576,7 +575,7 @@ func queuePushNotification(notificationsByUserID map[uint64]map[types.EventName] Messages: batch, } - _, err = useDB.Exec(`INSERT INTO notification_queue (created, channel, content) VALUES ($1, 'push', $2)`, time.Now(), transitPushContent) + _, err = db.WriterDb.Exec(`INSERT INTO notification_queue (created, channel, content) VALUES ($1, 'push', $2)`, time.Now(), transitPushContent) if err != nil { logger.WithError(err).Errorf("error writing transit push notification to db") return @@ -586,10 +585,10 @@ func queuePushNotification(notificationsByUserID map[uint64]map[types.EventName] return nil } -func sendPushNotifications(useDB *sqlx.DB) error { +func sendPushNotifications() error { var notificationQueueItem []types.TransitPush - err := useDB.Select(¬ificationQueueItem, `SELECT + err := db.WriterDb.Select(¬ificationQueueItem, `SELECT id, created, sent, @@ -619,7 +618,7 @@ func sendPushNotifications(useDB *sqlx.DB) error { metrics.NotificationsSent.WithLabelValues("push", "200").Add(float64(len(n.Content.Messages))) } - _, err = useDB.Exec(`UPDATE notification_queue SET sent = now() WHERE id = $1`, n.Id) + _, err = db.WriterDb.Exec(`UPDATE notification_queue SET sent = now() WHERE id = $1`, n.Id) if err != nil { return fmt.Errorf("error updating sent status for push notification with id: %v, err: %w", n.Id, err) } @@ -628,7 +627,7 @@ func sendPushNotifications(useDB *sqlx.DB) error { return nil } -func queueEmailNotifications(notificationsByUserID map[uint64]map[types.EventName][]types.Notification, useDB *sqlx.DB) error { +func queueEmailNotifications(notificationsByUserID map[uint64]map[types.EventName][]types.Notification) error { userIDs := []uint64{} for userID := range notificationsByUserID { userIDs = append(userIDs, userID) @@ -765,7 +764,7 @@ func queueEmailNotifications(notificationsByUserID map[uint64]map[types.EventNam Attachments: attachments, } - _, err = useDB.Exec(`INSERT INTO notification_queue (created, channel, content) VALUES ($1, 'email', $2)`, time.Now(), transitEmailContent) + _, err = db.WriterDb.Exec(`INSERT INTO notification_queue (created, channel, content) VALUES ($1, 'email', $2)`, time.Now(), transitEmailContent) if err != nil { logger.WithError(err).Errorf("error writing transit email to db") } @@ -774,10 +773,10 @@ func queueEmailNotifications(notificationsByUserID map[uint64]map[types.EventNam return nil } -func sendEmailNotifications(useDb *sqlx.DB) error { +func sendEmailNotifications() error { var notificationQueueItem []types.TransitEmail - err := useDb.Select(¬ificationQueueItem, `SELECT + err := db.WriterDb.Select(¬ificationQueueItem, `SELECT id, created, sent, @@ -800,7 +799,7 @@ func sendEmailNotifications(useDb *sqlx.DB) error { metrics.NotificationsSent.WithLabelValues("email", "200").Inc() } } - _, err = useDb.Exec(`UPDATE notification_queue set sent = now() where id = $1`, n.Id) + _, err = db.WriterDb.Exec(`UPDATE notification_queue set sent = now() where id = $1`, n.Id) if err != nil { return fmt.Errorf("error updating sent status for email notification with id: %v, err: %w", n.Id, err) } @@ -808,10 +807,10 @@ func sendEmailNotifications(useDb *sqlx.DB) error { return nil } -func queueWebhookNotifications(notificationsByUserID map[uint64]map[types.EventName][]types.Notification, useDB *sqlx.DB) error { +func queueWebhookNotifications(notificationsByUserID map[uint64]map[types.EventName][]types.Notification) error { for userID, userNotifications := range notificationsByUserID { var webhooks []types.UserWebhook - err := useDB.Select(&webhooks, ` + err := db.FrontendWriterDB.Select(&webhooks, ` SELECT id, user_id, @@ -849,7 +848,7 @@ func queueWebhookNotifications(notificationsByUserID map[uint64]map[types.EventN if len(notifications) > 0 { // reset Retries if w.Retries > 5 && w.LastSent.Valid && w.LastSent.Time.Add(time.Hour).Before(time.Now()) { - _, err = useDB.Exec(`UPDATE users_webhooks SET retries = 0 WHERE id = $1;`, w.ID) + _, err = db.FrontendWriterDB.Exec(`UPDATE users_webhooks SET retries = 0 WHERE id = $1;`, w.ID) if err != nil { logger.WithError(err).Errorf("error updating users_webhooks table; setting retries to zero") continue @@ -926,7 +925,7 @@ func queueWebhookNotifications(notificationsByUserID map[uint64]map[types.EventN } // process notifs for _, n := range notifs { - _, err = useDB.Exec(`INSERT INTO notification_queue (created, channel, content) VALUES (now(), $1, $2);`, n.Channel, n.Content) + _, err = db.WriterDb.Exec(`INSERT INTO notification_queue (created, channel, content) VALUES (now(), $1, $2);`, n.Channel, n.Content) if err != nil { logger.WithError(err).Errorf("error inserting into webhooks_queue") } else { @@ -936,7 +935,7 @@ func queueWebhookNotifications(notificationsByUserID map[uint64]map[types.EventN // process discord notifs for _, dNotifs := range discordNotifMap { for _, n := range dNotifs { - _, err = useDB.Exec(`INSERT INTO notification_queue (created, channel, content) VALUES (now(), 'webhook_discord', $1);`, n) + _, err = db.WriterDb.Exec(`INSERT INTO notification_queue (created, channel, content) VALUES (now(), 'webhook_discord', $1);`, n) if err != nil { logger.WithError(err).Errorf("error inserting into webhooks_queue (discord)") continue @@ -949,10 +948,10 @@ func queueWebhookNotifications(notificationsByUserID map[uint64]map[types.EventN return nil } -func sendWebhookNotifications(useDB *sqlx.DB) error { +func sendWebhookNotifications() error { var notificationQueueItem []types.TransitWebhook - err := useDB.Select(¬ificationQueueItem, `SELECT + err := db.WriterDb.Select(¬ificationQueueItem, `SELECT id, created, sent, @@ -969,7 +968,7 @@ func sendWebhookNotifications(useDB *sqlx.DB) error { for _, n := range notificationQueueItem { // do not retry after 5 attempts if n.Content.Webhook.Retries > 5 { - _, err := db.FrontendWriterDB.Exec(`DELETE FROM notification_queue WHERE id = $1`, n.Id) + _, err := db.WriterDb.Exec(`DELETE FROM notification_queue WHERE id = $1`, n.Id) if err != nil { return fmt.Errorf("error deleting from notification queue: %w", err) } @@ -985,7 +984,7 @@ func sendWebhookNotifications(useDB *sqlx.DB) error { _, err = url.Parse(n.Content.Webhook.Url) if err != nil { - _, err := db.FrontendWriterDB.Exec(`DELETE FROM notification_queue WHERE id = $1`, n.Id) + _, err := db.WriterDb.Exec(`DELETE FROM notification_queue WHERE id = $1`, n.Id) if err != nil { return fmt.Errorf("error deleting from notification queue: %w", err) } @@ -1003,14 +1002,14 @@ func sendWebhookNotifications(useDB *sqlx.DB) error { metrics.NotificationsSent.WithLabelValues("webhook", resp.Status).Inc() } - _, err = useDB.Exec(`UPDATE notification_queue SET sent = now() WHERE id = $1`, n.Id) + _, err = db.WriterDb.Exec(`UPDATE notification_queue SET sent = now() WHERE id = $1`, n.Id) if err != nil { logger.WithError(err).Errorf("error updating notification_queue table") return } if resp != nil && resp.StatusCode < 400 { - _, err = useDB.Exec(`UPDATE users_webhooks SET retries = 0, last_sent = now() WHERE id = $1;`, n.Content.Webhook.ID) + _, err = db.FrontendWriterDB.Exec(`UPDATE users_webhooks SET retries = 0, last_sent = now() WHERE id = $1;`, n.Content.Webhook.ID) if err != nil { logger.WithError(err).Errorf("error updating users_webhooks table; setting retries to zero") return @@ -1028,7 +1027,7 @@ func sendWebhookNotifications(useDB *sqlx.DB) error { errResp.Body = string(b) } - _, err = useDB.Exec(`UPDATE users_webhooks SET retries = retries + 1, last_sent = now(), request = $2, response = $3 WHERE id = $1;`, n.Content.Webhook.ID, n.Content, errResp) + _, err = db.FrontendWriterDB.Exec(`UPDATE users_webhooks SET retries = retries + 1, last_sent = now(), request = $2, response = $3 WHERE id = $1;`, n.Content.Webhook.ID, n.Content, errResp) if err != nil { logger.WithError(err).Errorf("error updating users_webhooks table; increasing retries") return @@ -1040,10 +1039,10 @@ func sendWebhookNotifications(useDB *sqlx.DB) error { return nil } -func sendDiscordNotifications(useDB *sqlx.DB) error { +func sendDiscordNotifications() error { var notificationQueueItem []types.TransitDiscord - err := useDB.Select(¬ificationQueueItem, `SELECT + err := db.WriterDb.Select(¬ificationQueueItem, `SELECT id, created, sent, @@ -1064,7 +1063,7 @@ func sendDiscordNotifications(useDB *sqlx.DB) error { for _, n := range notificationQueueItem { // purge the event from existence if the retry counter is over 5 if n.Content.Webhook.Retries > 5 { - db.FrontendWriterDB.Exec(`DELETE FROM notification_queue where id = $1`, n.Id) + db.WriterDb.Exec(`DELETE FROM notification_queue where id = $1`, n.Id) continue } if _, exists := webhookMap[n.Content.Webhook.ID]; !exists { @@ -1079,7 +1078,7 @@ func sendDiscordNotifications(useDB *sqlx.DB) error { go func(webhook types.UserWebhook, reqs []types.TransitDiscord) { defer func() { // update retries counters in db based on end result - _, err = useDB.Exec(`UPDATE users_webhooks SET retries = $1, last_sent = now() WHERE id = $2;`, webhook.Retries, webhook.ID) + _, err = db.FrontendWriterDB.Exec(`UPDATE users_webhooks SET retries = $1, last_sent = now() WHERE id = $2;`, webhook.Retries, webhook.ID) if err != nil { logger.Warnf("failed to update retries counter to %v for webhook %v: %v", webhook.Retries, webhook.ID, err) } @@ -1089,7 +1088,7 @@ func sendDiscordNotifications(useDB *sqlx.DB) error { for _, req := range reqs { ids = append(ids, req.Id) } - _, err = db.FrontendWriterDB.Exec(`UPDATE notification_queue SET sent = now() where id = ANY($1)`, pq.Array(ids)) + _, err = db.WriterDb.Exec(`UPDATE notification_queue SET sent = now() where id = ANY($1)`, pq.Array(ids)) if err != nil { logger.Warnf("failed to update sent for notifcations in queue: %v", err) } @@ -1142,7 +1141,7 @@ func sendDiscordNotifications(useDB *sqlx.DB) error { } else { utils.LogError(nil, "error pushing discord webhook", 0, map[string]interface{}{"errResp.Body": errResp.Body, "webhook.Url": webhook.Url}) } - _, err = useDB.Exec(`UPDATE users_webhooks SET request = $2, response = $3 WHERE id = $1;`, webhook.ID, reqs[i].Content.DiscordRequest, errResp) + _, err = db.FrontendWriterDB.Exec(`UPDATE users_webhooks SET request = $2, response = $3 WHERE id = $1;`, webhook.ID, reqs[i].Content.DiscordRequest, errResp) if err != nil { logger.Errorf("error storing failure data in users_webhooks table: %v", err) }