Skip to content

Commit

Permalink
chore: use uuid to stop saving unnecessary subscriptions
Browse files Browse the repository at this point in the history
  • Loading branch information
im-adithya committed Jun 1, 2024
1 parent 9b53494 commit ac691e4
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 30 deletions.
43 changes: 14 additions & 29 deletions internal/nostr/nostr.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"sync"
"time"

"github.com/google/uuid"
"github.com/joho/godotenv"
"github.com/kelseyhightower/envconfig"
"github.com/labstack/echo/v4"
Expand Down Expand Up @@ -44,7 +45,7 @@ type Service struct {
Relay *nostr.Relay
Cfg *Config
Logger *logrus.Logger
subscriptions map[uint]*nostr.Subscription
subscriptions map[string]*nostr.Subscription
subscriptionsMutex sync.Mutex
relayMutex sync.Mutex
}
Expand Down Expand Up @@ -112,7 +113,7 @@ func NewService(ctx context.Context) (*Service, error) {
return nil, err
}

subscriptions := make(map[uint]*nostr.Subscription)
subscriptions := make(map[string]*nostr.Subscription)

var wg sync.WaitGroup
svc := &Service{
Expand Down Expand Up @@ -321,19 +322,12 @@ func (svc *Service) NIP47Handler(c echo.Context) error {
})
}

subscription, err := svc.prepareNIP47Subscription(NIP47WebhookRequest{
subscription := svc.prepareNIP47Subscription(NIP47WebhookRequest{
RelayUrl: requestData.RelayUrl,
WalletPubkey: requestData.WalletPubkey,
SignedEvent: requestData.SignedEvent,
}, requestEvent)

if err != nil {
return c.JSON(http.StatusInternalServerError, ErrorResponse{
Message: "Failed to store subscription",
Error: err.Error(),
})
}

ctx, cancel := context.WithTimeout(c.Request().Context(), 90*time.Second)
defer cancel()
go svc.startSubscription(ctx, &subscription, svc.publishEvent, svc.handleResponseEvent)
Expand Down Expand Up @@ -424,14 +418,7 @@ func (svc *Service) NIP47WebhookHandler(c echo.Context) error {
})
}

subscription, err := svc.prepareNIP47Subscription(requestData, requestEvent)

if err != nil {
return c.JSON(http.StatusInternalServerError, ErrorResponse{
Message: "Failed to store subscription",
Error: err.Error(),
})
}
subscription := svc.prepareNIP47Subscription(requestData, requestEvent)

ctx, cancel := context.WithTimeout(svc.Ctx, 90*time.Second)
defer cancel()
Expand All @@ -442,8 +429,8 @@ func (svc *Service) NIP47WebhookHandler(c echo.Context) error {
})
}

func (svc *Service) prepareNIP47Subscription(requestData NIP47WebhookRequest, requestEvent RequestEvent) (Subscription, error) {
subscription := Subscription{
func (svc *Service) prepareNIP47Subscription(requestData NIP47WebhookRequest, requestEvent RequestEvent) (Subscription) {
return Subscription{
RelayUrl: requestData.RelayUrl,
WebhookUrl: requestData.WebhookUrl,
Open: true,
Expand All @@ -455,9 +442,8 @@ func (svc *Service) prepareNIP47Subscription(requestData NIP47WebhookRequest, re
RequestEvent: requestData.SignedEvent,
RequestEventDB: requestEvent,
EventChan: make(chan *nostr.Event, 1),
Uuid: uuid.New().String(),
}

return subscription, svc.db.Create(&subscription).Error
}

func (svc *Service) NIP47NotificationHandler(c echo.Context) error {
Expand Down Expand Up @@ -627,10 +613,10 @@ func (svc *Service) StopSubscriptionHandler(c echo.Context) error {

func (svc *Service) stopSubscription(subscription *Subscription) error {
svc.subscriptionsMutex.Lock()
sub, exists := svc.subscriptions[subscription.ID]
sub, exists := svc.subscriptions[subscription.Uuid]
if exists {
sub.Unsub()
delete(svc.subscriptions, subscription.ID)
delete(svc.subscriptions, subscription.Uuid)
}
svc.subscriptionsMutex.Unlock()

Expand Down Expand Up @@ -686,7 +672,7 @@ func (svc *Service) startSubscription(ctx context.Context, subscription *Subscri
}

svc.subscriptionsMutex.Lock()
svc.subscriptions[subscription.ID] = sub
svc.subscriptions[subscription.Uuid] = sub
svc.subscriptionsMutex.Unlock()

svc.Logger.WithFields(logrus.Fields{
Expand Down Expand Up @@ -728,7 +714,7 @@ func (svc *Service) startSubscription(ctx context.Context, subscription *Subscri

func (svc *Service) publishEvent(ctx context.Context, subscription *Subscription) {
svc.subscriptionsMutex.Lock()
sub := svc.subscriptions[subscription.ID]
sub := svc.subscriptions[subscription.Uuid]
svc.subscriptionsMutex.Unlock()
err := sub.Relay.Publish(ctx, *subscription.RequestEvent)
if err != nil {
Expand Down Expand Up @@ -758,7 +744,6 @@ func (svc *Service) handleResponseEvent(event *nostr.Event, subscription *Subscr
Content: event.Content,
RepliedAt: event.CreatedAt.Time(),
RequestId: &subscription.RequestEventDB.ID,
SubscriptionId: &subscription.ID,
}
svc.db.Save(&responseEvent)
if subscription.WebhookUrl != "" {
Expand All @@ -767,7 +752,7 @@ func (svc *Service) handleResponseEvent(event *nostr.Event, subscription *Subscr
subscription.EventChan <- event
}
svc.subscriptionsMutex.Lock()
svc.subscriptions[subscription.ID].Unsub()
svc.subscriptions[subscription.Uuid].Unsub()
svc.subscriptionsMutex.Unlock()
}

Expand All @@ -789,7 +774,7 @@ func (svc *Service) handleSubscribedEvent(event *nostr.Event, subscription *Subs

func (svc *Service) processEvents(ctx context.Context, subscription *Subscription, onReceiveEOS OnReceiveEOSFunc, handleEvent HandleEventFunc) error {
svc.subscriptionsMutex.Lock()
sub := svc.subscriptions[subscription.ID]
sub := svc.subscriptions[subscription.Uuid]
svc.subscriptionsMutex.Unlock()

go func(){
Expand Down
2 changes: 1 addition & 1 deletion internal/nostr/nostr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func setupTestService() *Service {
Wg: &wg,
Logger: logger,
Relay: relay,
subscriptions: make(map[uint]*nostr.Subscription),
subscriptions: make(map[string]*nostr.Subscription),
}

privateKey = nostr.GeneratePrivateKey()
Expand Down

0 comments on commit ac691e4

Please sign in to comment.