Skip to content

Commit

Permalink
Merge pull request #132 from getAlby/task-eos
Browse files Browse the repository at this point in the history
fix: only publish and wait till eos if unpublished
  • Loading branch information
im-adithya authored Dec 20, 2024
2 parents 308c12d + 49b5a30 commit 7e89304
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 61 deletions.
49 changes: 25 additions & 24 deletions internal/nostr/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,30 +25,31 @@ const (
)

type Subscription struct {
ID uint
RelayUrl string
WebhookUrl string
PushToken string
IsIOS bool
Open bool
Ids *[]string `gorm:"-"`
Kinds *[]int `gorm:"-"`
Authors *[]string `gorm:"-"` // WalletPubkey is included in this
Tags *nostr.TagMap `gorm:"-"` // RequestEvent ID goes in the "e" tag
Since time.Time
Until time.Time
Limit int
Search string
CreatedAt time.Time
UpdatedAt time.Time
Uuid string `gorm:"type:uuid;default:gen_random_uuid()"`
EventChan chan *nostr.Event `gorm:"-"`
RequestEvent *RequestEvent `gorm:"-"`

IdsJson json.RawMessage `gorm:"type:jsonb"`
KindsJson json.RawMessage `gorm:"type:jsonb"`
AuthorsJson json.RawMessage `gorm:"type:jsonb"`
TagsJson json.RawMessage `gorm:"type:jsonb"`
ID uint
RelayUrl string
WebhookUrl string
PushToken string
IsIOS bool
Open bool
Ids *[]string `gorm:"-"`
Kinds *[]int `gorm:"-"`
Authors *[]string `gorm:"-"` // WalletPubkey is included in this
Tags *nostr.TagMap `gorm:"-"` // RequestEvent ID goes in the "e" tag
Since time.Time
Until time.Time
Limit int
Search string
CreatedAt time.Time
UpdatedAt time.Time
Uuid string `gorm:"type:uuid;default:gen_random_uuid()"`
EventChan chan *nostr.Event `gorm:"-"`
RequestEvent *RequestEvent `gorm:"-"`
RelaySubscription *nostr.Subscription `gorm:"-"`

IdsJson json.RawMessage `gorm:"type:jsonb"`
KindsJson json.RawMessage `gorm:"type:jsonb"`
AuthorsJson json.RawMessage `gorm:"type:jsonb"`
TagsJson json.RawMessage `gorm:"type:jsonb"`
}

func (s *Subscription) BeforeSave(tx *gorm.DB) error {
Expand Down
78 changes: 42 additions & 36 deletions internal/nostr/nostr.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,10 @@ type Service struct {
Relay *nostr.Relay
Cfg *Config
Logger *logrus.Logger
subscriptions map[string]*nostr.Subscription
subscriptionsMutex sync.Mutex
relayMutex sync.Mutex
client *expo.PushClient
subCancelFnMap map[string]context.CancelFunc
}

func NewService(ctx context.Context) (*Service, error) {
Expand Down Expand Up @@ -116,8 +116,6 @@ func NewService(ctx context.Context) (*Service, error) {
return nil, err
}

subscriptions := make(map[string]*nostr.Subscription)

client := expo.NewPushClient(&expo.ClientConfig{
Host: "https://api.expo.dev",
APIURL: "/v2",
Expand All @@ -131,7 +129,6 @@ func NewService(ctx context.Context) (*Service, error) {
Wg: &wg,
Logger: logger,
Relay: relay,
subscriptions: subscriptions,
client: client,
}

Expand All @@ -142,7 +139,7 @@ func NewService(ctx context.Context) (*Service, error) {
logger.WithError(err).Error("Failed to query open subscriptions")
return nil, err
}

cancelFnMap := make(map[string]context.CancelFunc)
for _, sub := range openSubscriptions {
// Create a copy of the loop variable to
// avoid passing address of the same variable
Expand All @@ -151,8 +148,11 @@ func NewService(ctx context.Context) (*Service, error) {
if sub.PushToken != "" {
handleEvent = svc.handleSubscribedEventForPushNotification
}
go svc.startSubscription(svc.Ctx, &subscription, nil, handleEvent)
subCtx, subCancelFn := context.WithCancel(svc.Ctx)
cancelFnMap[subscription.Uuid] = subCancelFn
go svc.startSubscription(subCtx, &subscription, nil, handleEvent)
}
svc.subCancelFnMap = cancelFnMap

return svc, nil
}
Expand Down Expand Up @@ -571,7 +571,11 @@ func (svc *Service) NIP47NotificationHandler(c echo.Context) error {
})
}

go svc.startSubscription(svc.Ctx, &subscription, nil, svc.handleSubscribedEvent)
subCtx, subCancelFn := context.WithCancel(svc.Ctx)
svc.subscriptionsMutex.Lock()
svc.subCancelFnMap[subscription.Uuid] = subCancelFn
svc.subscriptionsMutex.Unlock()
go svc.startSubscription(subCtx, &subscription, nil, svc.handleSubscribedEvent)

return c.JSON(http.StatusOK, SubscriptionResponse{
SubscriptionId: subscription.Uuid,
Expand Down Expand Up @@ -630,7 +634,11 @@ func (svc *Service) SubscriptionHandler(c echo.Context) error {
})
}

go svc.startSubscription(svc.Ctx, &subscription, nil, svc.handleSubscribedEvent)
subCtx, subCancelFn := context.WithCancel(svc.Ctx)
svc.subscriptionsMutex.Lock()
svc.subCancelFnMap[subscription.Uuid] = subCancelFn
svc.subscriptionsMutex.Unlock()
go svc.startSubscription(subCtx, &subscription, nil, svc.handleSubscribedEvent)

return c.JSON(http.StatusOK, SubscriptionResponse{
SubscriptionId: subscription.Uuid,
Expand Down Expand Up @@ -688,14 +696,17 @@ func (svc *Service) StopSubscriptionHandler(c echo.Context) error {

func (svc *Service) stopSubscription(subscription *Subscription) error {
svc.subscriptionsMutex.Lock()
sub, exists := svc.subscriptions[subscription.Uuid]
cancelFn, exists := svc.subCancelFnMap[subscription.Uuid]
svc.subscriptionsMutex.Unlock()
if exists {
sub.Unsub()
delete(svc.subscriptions, subscription.Uuid)
cancelFn()
}

if subscription.RelaySubscription != nil {
subscription.RelaySubscription.Unsub()
}
svc.subscriptionsMutex.Unlock()

if (!exists && !subscription.Open) {
if (!subscription.Open) {
return errors.New(SUBSCRIPTION_ALREADY_CLOSED)
}

Expand Down Expand Up @@ -738,14 +749,12 @@ func (svc *Service) startSubscription(ctx context.Context, subscription *Subscri
continue
}

sub, err := relay.Subscribe(ctx, []nostr.Filter{*filter})
relaySubscription, err := relay.Subscribe(ctx, []nostr.Filter{*filter})
if err != nil {
continue
}

svc.subscriptionsMutex.Lock()
svc.subscriptions[subscription.Uuid] = sub
svc.subscriptionsMutex.Unlock()
subscription.RelaySubscription = relaySubscription

svc.Logger.WithFields(logrus.Fields{
"request_event_id": requestEventId,
Expand Down Expand Up @@ -784,10 +793,8 @@ func (svc *Service) startSubscription(ctx context.Context, subscription *Subscri
func (svc *Service) publishRequestEvent(ctx context.Context, subscription *Subscription) {
walletPubkey, clientPubkey := getPubkeys(subscription)

svc.subscriptionsMutex.Lock()
sub := svc.subscriptions[subscription.Uuid]
svc.subscriptionsMutex.Unlock()
err := sub.Relay.Publish(ctx, *subscription.RequestEvent.SignedEvent)
relaySubscription := subscription.RelaySubscription
err := relaySubscription.Relay.Publish(ctx, *subscription.RequestEvent.SignedEvent)
if err != nil {
// TODO: notify user about publish failure
svc.Logger.WithError(err).WithFields(logrus.Fields{
Expand All @@ -798,7 +805,7 @@ func (svc *Service) publishRequestEvent(ctx context.Context, subscription *Subsc
"client_pubkey": clientPubkey,
}).Error("Failed to publish to relay")
subscription.RequestEvent.State = REQUEST_EVENT_PUBLISH_FAILED
sub.Unsub()
relaySubscription.Unsub()
} else {
svc.Logger.WithFields(logrus.Fields{
"subscription_id": subscription.Uuid,
Expand Down Expand Up @@ -855,24 +862,23 @@ func (svc *Service) handleSubscribedEvent(event *nostr.Event, subscription *Subs
}

func (svc *Service) processEvents(ctx context.Context, subscription *Subscription, onReceiveEOS OnReceiveEOSFunc, handleEvent HandleEventFunc) error {
svc.subscriptionsMutex.Lock()
sub := svc.subscriptions[subscription.Uuid]
svc.subscriptionsMutex.Unlock()
relaySubscription := subscription.RelaySubscription

go func(){
// block till EOS is received
<-sub.EndOfStoredEvents
svc.Logger.WithFields(logrus.Fields{
"subscription_id": subscription.Uuid,
"relay_url": subscription.RelayUrl,
}).Debug("Received EOS")
// block till EOS is received for nip 47 handlers
// only if request event is not yet published
if (onReceiveEOS != nil && subscription.RequestEvent.State != REQUEST_EVENT_PUBLISH_CONFIRMED) {
<-relaySubscription.EndOfStoredEvents
svc.Logger.WithFields(logrus.Fields{
"subscription_id": subscription.Uuid,
"relay_url": subscription.RelayUrl,
}).Debug("Received EOS")

if (onReceiveEOS != nil) {
onReceiveEOS(ctx, subscription)
}

// loop through incoming events
for event := range sub.Events {
for event := range relaySubscription.Events {
go handleEvent(event, subscription)
}

Expand All @@ -883,11 +889,11 @@ func (svc *Service) processEvents(ctx context.Context, subscription *Subscriptio
}()

select {
case <-sub.Relay.Context().Done():
return sub.Relay.ConnectionError
case <-relaySubscription.Relay.Context().Done():
return relaySubscription.Relay.ConnectionError
case <-ctx.Done():
return nil
case <-sub.Context.Done():
case <-relaySubscription.Context.Done():
return nil
}
}
Expand Down
7 changes: 6 additions & 1 deletion internal/nostr/push.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package nostr

import (
"context"
"net/http"
"time"

Expand Down Expand Up @@ -107,7 +108,11 @@ func (svc *Service) NIP47PushNotificationHandler(c echo.Context) error {
})
}

go svc.startSubscription(svc.Ctx, &subscription, nil, svc.handleSubscribedEventForPushNotification)
subCtx, subCancelFn := context.WithCancel(svc.Ctx)
svc.subscriptionsMutex.Lock()
svc.subCancelFnMap[subscription.Uuid] = subCancelFn
svc.subscriptionsMutex.Unlock()
go svc.startSubscription(subCtx, &subscription, nil, svc.handleSubscribedEventForPushNotification)

return c.JSON(http.StatusOK, PushSubscriptionResponse{
SubscriptionId: subscription.Uuid,
Expand Down

0 comments on commit 7e89304

Please sign in to comment.