From dd8e6951436a8101c8bb33182f291e216da68a77 Mon Sep 17 00:00:00 2001 From: im-adithya Date: Fri, 20 Dec 2024 02:17:56 +0530 Subject: [PATCH 1/3] fix: only publish and wait till eos if unpublished --- internal/nostr/nostr.go | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/internal/nostr/nostr.go b/internal/nostr/nostr.go index a7eed77..48b68ff 100644 --- a/internal/nostr/nostr.go +++ b/internal/nostr/nostr.go @@ -865,14 +865,15 @@ func (svc *Service) processEvents(ctx context.Context, subscription *Subscriptio svc.subscriptionsMutex.Unlock() go func(){ - // block till EOS is received - <-sub.EndOfStoredEvents - svc.Logger.WithFields(logrus.Fields{ - "subscription_id": subscription.ID, - "relay_url": subscription.RelayUrl, - }).Debug("Received EOS") + // block till EOS is received for nip 47 handlers + // only if request event is not yet published + if (onReceiveEOS != nil && subscription.RequestEvent.State != REQUEST_EVENT_PUBLISH_CONFIRMED) { + <-sub.EndOfStoredEvents + svc.Logger.WithFields(logrus.Fields{ + "subscription_id": subscription.ID, + "relay_url": subscription.RelayUrl, + }).Debug("Received EOS") - if (onReceiveEOS != nil) { onReceiveEOS(ctx, subscription) } From 192ad371a87ab7e1c2c3226e1b18063dd0cfc894 Mon Sep 17 00:00:00 2001 From: im-adithya Date: Fri, 20 Dec 2024 14:21:21 +0530 Subject: [PATCH 2/3] chore: remove all mutexes and store in subscription struct --- internal/nostr/models.go | 49 ++++++++++++++++++++-------------------- internal/nostr/nostr.go | 42 ++++++++++++---------------------- 2 files changed, 39 insertions(+), 52 deletions(-) diff --git a/internal/nostr/models.go b/internal/nostr/models.go index 756f3b6..7ca9f02 100644 --- a/internal/nostr/models.go +++ b/internal/nostr/models.go @@ -25,30 +25,31 @@ const ( ) type Subscription struct { - ID uint - RelayUrl string - WebhookUrl string - PushToken string - IsIOS bool - Open bool - Ids *[]string `gorm:"-"` - Kinds *[]int `gorm:"-"` - Authors *[]string `gorm:"-"` // WalletPubkey is included in this - Tags *nostr.TagMap `gorm:"-"` // RequestEvent ID goes in the "e" tag - Since time.Time - Until time.Time - Limit int - Search string - CreatedAt time.Time - UpdatedAt time.Time - Uuid string `gorm:"type:uuid;default:gen_random_uuid()"` - EventChan chan *nostr.Event `gorm:"-"` - RequestEvent *RequestEvent `gorm:"-"` - - IdsJson json.RawMessage `gorm:"type:jsonb"` - KindsJson json.RawMessage `gorm:"type:jsonb"` - AuthorsJson json.RawMessage `gorm:"type:jsonb"` - TagsJson json.RawMessage `gorm:"type:jsonb"` + ID uint + RelayUrl string + WebhookUrl string + PushToken string + IsIOS bool + Open bool + Ids *[]string `gorm:"-"` + Kinds *[]int `gorm:"-"` + Authors *[]string `gorm:"-"` // WalletPubkey is included in this + Tags *nostr.TagMap `gorm:"-"` // RequestEvent ID goes in the "e" tag + Since time.Time + Until time.Time + Limit int + Search string + CreatedAt time.Time + UpdatedAt time.Time + Uuid string `gorm:"type:uuid;default:gen_random_uuid()"` + EventChan chan *nostr.Event `gorm:"-"` + RequestEvent *RequestEvent `gorm:"-"` + RelaySubscription *nostr.Subscription `gorm:"-"` + + IdsJson json.RawMessage `gorm:"type:jsonb"` + KindsJson json.RawMessage `gorm:"type:jsonb"` + AuthorsJson json.RawMessage `gorm:"type:jsonb"` + TagsJson json.RawMessage `gorm:"type:jsonb"` } func (s *Subscription) BeforeSave(tx *gorm.DB) error { diff --git a/internal/nostr/nostr.go b/internal/nostr/nostr.go index 48b68ff..da521e1 100644 --- a/internal/nostr/nostr.go +++ b/internal/nostr/nostr.go @@ -47,7 +47,6 @@ type Service struct { Relay *nostr.Relay Cfg *Config Logger *logrus.Logger - subscriptions map[string]*nostr.Subscription subscriptionsMutex sync.Mutex relayMutex sync.Mutex client *expo.PushClient @@ -116,8 +115,6 @@ func NewService(ctx context.Context) (*Service, error) { return nil, err } - subscriptions := make(map[string]*nostr.Subscription) - client := expo.NewPushClient(&expo.ClientConfig{ Host: "https://api.expo.dev", APIURL: "/v2", @@ -131,7 +128,6 @@ func NewService(ctx context.Context) (*Service, error) { Wg: &wg, Logger: logger, Relay: relay, - subscriptions: subscriptions, client: client, } @@ -685,15 +681,11 @@ func (svc *Service) StopSubscriptionHandler(c echo.Context) error { } func (svc *Service) stopSubscription(subscription *Subscription) error { - svc.subscriptionsMutex.Lock() - sub, exists := svc.subscriptions[subscription.Uuid] - if exists { - sub.Unsub() - delete(svc.subscriptions, subscription.Uuid) + if subscription.RelaySubscription != nil { + subscription.RelaySubscription.Unsub() } - svc.subscriptionsMutex.Unlock() - if (!exists && !subscription.Open) { + if (!subscription.Open) { return errors.New(SUBSCRIPTION_ALREADY_CLOSED) } @@ -739,7 +731,7 @@ func (svc *Service) startSubscription(ctx context.Context, subscription *Subscri continue } - sub, err := relay.Subscribe(ctx, []nostr.Filter{*filter}) + relaySubscription, err := relay.Subscribe(ctx, []nostr.Filter{*filter}) if err != nil { // TODO: notify user about subscription failure waitToReconnectSeconds = max(waitToReconnectSeconds, 1) @@ -751,9 +743,7 @@ func (svc *Service) startSubscription(ctx context.Context, subscription *Subscri continue } - svc.subscriptionsMutex.Lock() - svc.subscriptions[subscription.Uuid] = sub - svc.subscriptionsMutex.Unlock() + subscription.RelaySubscription = relaySubscription svc.Logger.WithFields(logrus.Fields{ "subscription_id": subscription.ID, @@ -791,10 +781,8 @@ func (svc *Service) startSubscription(ctx context.Context, subscription *Subscri func (svc *Service) publishRequestEvent(ctx context.Context, subscription *Subscription) { walletPubkey, clientPubkey := getPubkeys(subscription) - svc.subscriptionsMutex.Lock() - sub := svc.subscriptions[subscription.Uuid] - svc.subscriptionsMutex.Unlock() - err := sub.Relay.Publish(ctx, *subscription.RequestEvent.SignedEvent) + relaySubscription := subscription.RelaySubscription + err := relaySubscription.Relay.Publish(ctx, *subscription.RequestEvent.SignedEvent) if err != nil { // TODO: notify user about publish failure svc.Logger.WithError(err).WithFields(logrus.Fields{ @@ -804,7 +792,7 @@ func (svc *Service) publishRequestEvent(ctx context.Context, subscription *Subsc "client_pubkey": clientPubkey, }).Error("Failed to publish to relay") subscription.RequestEvent.State = REQUEST_EVENT_PUBLISH_FAILED - sub.Unsub() + relaySubscription.Unsub() } else { svc.Logger.WithFields(logrus.Fields{ "request_event_id": subscription.RequestEvent.NostrId, @@ -860,15 +848,13 @@ func (svc *Service) handleSubscribedEvent(event *nostr.Event, subscription *Subs } func (svc *Service) processEvents(ctx context.Context, subscription *Subscription, onReceiveEOS OnReceiveEOSFunc, handleEvent HandleEventFunc) error { - svc.subscriptionsMutex.Lock() - sub := svc.subscriptions[subscription.Uuid] - svc.subscriptionsMutex.Unlock() + relaySubscription := subscription.RelaySubscription go func(){ // block till EOS is received for nip 47 handlers // only if request event is not yet published if (onReceiveEOS != nil && subscription.RequestEvent.State != REQUEST_EVENT_PUBLISH_CONFIRMED) { - <-sub.EndOfStoredEvents + <-relaySubscription.EndOfStoredEvents svc.Logger.WithFields(logrus.Fields{ "subscription_id": subscription.ID, "relay_url": subscription.RelayUrl, @@ -878,7 +864,7 @@ func (svc *Service) processEvents(ctx context.Context, subscription *Subscriptio } // loop through incoming events - for event := range sub.Events { + for event := range relaySubscription.Events { go handleEvent(event, subscription) } @@ -889,11 +875,11 @@ func (svc *Service) processEvents(ctx context.Context, subscription *Subscriptio }() select { - case <-sub.Relay.Context().Done(): - return sub.Relay.ConnectionError + case <-relaySubscription.Relay.Context().Done(): + return relaySubscription.Relay.ConnectionError case <-ctx.Done(): return nil - case <-sub.Context.Done(): + case <-relaySubscription.Context.Done(): return nil } } From 924c065a87929acfc828d224da7fd259d9bba946 Mon Sep 17 00:00:00 2001 From: im-adithya Date: Fri, 20 Dec 2024 20:59:27 +0530 Subject: [PATCH 3/3] chore: add subCancelFnMap --- internal/nostr/nostr.go | 27 +++++++++++++++++++++++---- internal/nostr/push.go | 7 ++++++- 2 files changed, 29 insertions(+), 5 deletions(-) diff --git a/internal/nostr/nostr.go b/internal/nostr/nostr.go index da521e1..b2b3aa0 100644 --- a/internal/nostr/nostr.go +++ b/internal/nostr/nostr.go @@ -50,6 +50,7 @@ type Service struct { subscriptionsMutex sync.Mutex relayMutex sync.Mutex client *expo.PushClient + subCancelFnMap map[string]context.CancelFunc } func NewService(ctx context.Context) (*Service, error) { @@ -138,7 +139,7 @@ func NewService(ctx context.Context) (*Service, error) { logger.WithError(err).Error("Failed to query open subscriptions") return nil, err } - + cancelFnMap := make(map[string]context.CancelFunc) for _, sub := range openSubscriptions { // Create a copy of the loop variable to // avoid passing address of the same variable @@ -147,8 +148,11 @@ func NewService(ctx context.Context) (*Service, error) { if sub.PushToken != "" { handleEvent = svc.handleSubscribedEventForPushNotification } - go svc.startSubscription(svc.Ctx, &subscription, nil, handleEvent) + subCtx, subCancelFn := context.WithCancel(svc.Ctx) + cancelFnMap[subscription.Uuid] = subCancelFn + go svc.startSubscription(subCtx, &subscription, nil, handleEvent) } + svc.subCancelFnMap = cancelFnMap return svc, nil } @@ -565,7 +569,11 @@ func (svc *Service) NIP47NotificationHandler(c echo.Context) error { }) } - go svc.startSubscription(svc.Ctx, &subscription, nil, svc.handleSubscribedEvent) + subCtx, subCancelFn := context.WithCancel(svc.Ctx) + svc.subscriptionsMutex.Lock() + svc.subCancelFnMap[subscription.Uuid] = subCancelFn + svc.subscriptionsMutex.Unlock() + go svc.startSubscription(subCtx, &subscription, nil, svc.handleSubscribedEvent) return c.JSON(http.StatusOK, SubscriptionResponse{ SubscriptionId: subscription.Uuid, @@ -624,7 +632,11 @@ func (svc *Service) SubscriptionHandler(c echo.Context) error { }) } - go svc.startSubscription(svc.Ctx, &subscription, nil, svc.handleSubscribedEvent) + subCtx, subCancelFn := context.WithCancel(svc.Ctx) + svc.subscriptionsMutex.Lock() + svc.subCancelFnMap[subscription.Uuid] = subCancelFn + svc.subscriptionsMutex.Unlock() + go svc.startSubscription(subCtx, &subscription, nil, svc.handleSubscribedEvent) return c.JSON(http.StatusOK, SubscriptionResponse{ SubscriptionId: subscription.Uuid, @@ -681,6 +693,13 @@ func (svc *Service) StopSubscriptionHandler(c echo.Context) error { } func (svc *Service) stopSubscription(subscription *Subscription) error { + svc.subscriptionsMutex.Lock() + cancelFn, exists := svc.subCancelFnMap[subscription.Uuid] + svc.subscriptionsMutex.Unlock() + if exists { + cancelFn() + } + if subscription.RelaySubscription != nil { subscription.RelaySubscription.Unsub() } diff --git a/internal/nostr/push.go b/internal/nostr/push.go index 1439f90..d473528 100644 --- a/internal/nostr/push.go +++ b/internal/nostr/push.go @@ -1,6 +1,7 @@ package nostr import ( + "context" "net/http" "time" @@ -107,7 +108,11 @@ func (svc *Service) NIP47PushNotificationHandler(c echo.Context) error { }) } - go svc.startSubscription(svc.Ctx, &subscription, nil, svc.handleSubscribedEventForPushNotification) + subCtx, subCancelFn := context.WithCancel(svc.Ctx) + svc.subscriptionsMutex.Lock() + svc.subCancelFnMap[subscription.Uuid] = subCancelFn + svc.subscriptionsMutex.Unlock() + go svc.startSubscription(subCtx, &subscription, nil, svc.handleSubscribedEventForPushNotification) return c.JSON(http.StatusOK, PushSubscriptionResponse{ SubscriptionId: subscription.Uuid,