Skip to content

Commit

Permalink
Merge branch 'main' into task-eos
Browse files Browse the repository at this point in the history
  • Loading branch information
im-adithya committed Dec 20, 2024
2 parents 0aec5b8 + 308c12d commit 49b5a30
Showing 1 changed file with 108 additions and 73 deletions.
181 changes: 108 additions & 73 deletions internal/nostr/nostr.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ func (svc *Service) InfoHandler(c echo.Context) error {

select {
case <-ctx.Done():
svc.Logger.WithFields(logrus.Fields{
svc.Logger.WithError(ctx.Err()).WithFields(logrus.Fields{
"relay_url": requestData.RelayUrl,
"wallet_pubkey": requestData.WalletPubkey,
}).Error("Exiting info subscription without receiving event")
Expand All @@ -227,7 +227,7 @@ func (svc *Service) InfoHandler(c echo.Context) error {
svc.Logger.WithFields(logrus.Fields{
"relay_url": requestData.RelayUrl,
"wallet_pubkey": requestData.WalletPubkey,
"event_id": event.ID,
"response_event_id": event.ID,
}).Info("Received info event")
sub.Unsub()
return c.JSON(http.StatusOK, InfoResponse{
Expand Down Expand Up @@ -362,15 +362,16 @@ func (svc *Service) NIP47Handler(c echo.Context) error {
})
}

subscription := svc.prepareNIP47Subscription(requestData.RelayUrl, requestData.WalletPubkey, "", requestEvent)
subscription := svc.prepareNIP47Subscription(requestData.RelayUrl, requestData.WalletPubkey, "", &requestEvent)

ctx, cancel := context.WithTimeout(c.Request().Context(), 90*time.Second)
defer cancel()
go svc.startSubscription(ctx, &subscription, svc.publishRequestEvent, svc.handleResponseEvent)

select {
case <-ctx.Done():
svc.Logger.WithFields(logrus.Fields{
svc.Logger.WithError(ctx.Err()).WithFields(logrus.Fields{
"subscription_id": subscription.Uuid,
"request_event_id": requestData.SignedEvent.ID,
"client_pubkey": requestData.SignedEvent.PubKey,
"wallet_pubkey": requestData.WalletPubkey,
Expand Down Expand Up @@ -462,7 +463,7 @@ func (svc *Service) NIP47WebhookHandler(c echo.Context) error {
})
}

subscription := svc.prepareNIP47Subscription(requestData.RelayUrl, requestData.WalletPubkey, requestData.WebhookUrl, requestEvent)
subscription := svc.prepareNIP47Subscription(requestData.RelayUrl, requestData.WalletPubkey, requestData.WebhookUrl, &requestEvent)

ctx, cancel := context.WithTimeout(svc.Ctx, 90*time.Second)

Expand All @@ -472,14 +473,15 @@ func (svc *Service) NIP47WebhookHandler(c echo.Context) error {
defer cancel()
select {
case <-ctx.Done():
svc.Logger.WithFields(logrus.Fields{
svc.Logger.WithError(ctx.Err()).WithFields(logrus.Fields{
"subscription_id": subscription.Uuid,
"request_event_id": requestData.SignedEvent.ID,
"client_pubkey": requestData.SignedEvent.PubKey,
"wallet_pubkey": requestData.WalletPubkey,
"relay_url": requestData.RelayUrl,
}).Error("Stopped subscription without receiving event")
case event := <-subscription.EventChan:
svc.postEventToWebhook(event, subscription.WebhookUrl)
svc.postEventToWebhook(event, &subscription)
}
}()

Expand All @@ -488,19 +490,19 @@ func (svc *Service) NIP47WebhookHandler(c echo.Context) error {
})
}

func (svc *Service) prepareNIP47Subscription(relayUrl, walletPubkey, webhookUrl string, requestEvent RequestEvent) (Subscription) {
func (svc *Service) prepareNIP47Subscription(relayUrl, walletPubkey, webhookUrl string, requestEvent *RequestEvent) (Subscription) {
return Subscription{
RelayUrl: relayUrl,
WebhookUrl: webhookUrl,
Open: true,
Authors: &[]string{walletPubkey},
Kinds: &[]int{NIP_47_RESPONSE_KIND},
Tags: &nostr.TagMap{"e": []string{requestEvent.NostrId}},
Since: time.Now(),
Limit: 1,
RequestEvent: &requestEvent,
EventChan: make(chan *nostr.Event, 1),
Uuid: uuid.New().String(),
RelayUrl: relayUrl,
WebhookUrl: webhookUrl,
Open: true,
Authors: &[]string{walletPubkey},
Kinds: &[]int{NIP_47_RESPONSE_KIND},
Tags: &nostr.TagMap{"e": []string{requestEvent.NostrId}},
Since: time.Now(),
Limit: 1,
RequestEvent: requestEvent,
EventChan: make(chan *nostr.Event, 1),
Uuid: uuid.New().String(),
}
}

Expand Down Expand Up @@ -663,14 +665,14 @@ func (svc *Service) StopSubscriptionHandler(c echo.Context) error {
}

svc.Logger.WithFields(logrus.Fields{
"subscription_id": subscription.ID,
"subscription_id": subscription.Uuid,
}).Debug("Stopping subscription")

err := svc.stopSubscription(&subscription)
if err != nil {
svc.Logger.WithFields(logrus.Fields{
"subscription_id": subscription.ID,
}).Debug("Subscription is stopped already")
"subscription_id": subscription.Uuid,
}).Error("Subscription is stopped already")

return c.JSON(http.StatusAlreadyReported, StopSubscriptionResponse{
Message: "Subscription is already closed",
Expand All @@ -683,7 +685,7 @@ func (svc *Service) StopSubscriptionHandler(c echo.Context) error {
// delete svix app

svc.Logger.WithFields(logrus.Fields{
"subscription_id": subscription.ID,
"subscription_id": subscription.Uuid,
}).Info("Stopped subscription")

return c.JSON(http.StatusOK, StopSubscriptionResponse{
Expand Down Expand Up @@ -712,72 +714,62 @@ func (svc *Service) stopSubscription(subscription *Subscription) error {
}

func (svc *Service) startSubscription(ctx context.Context, subscription *Subscription, onReceiveEOS OnReceiveEOSFunc, handleEvent HandleEventFunc) {
requestEventId := ""
if subscription.RequestEvent != nil {
requestEventId = subscription.RequestEvent.NostrId
}
svc.Logger.WithFields(logrus.Fields{
"subscription_id": subscription.ID,
"relay_url": subscription.RelayUrl,
"request_event_id": requestEventId,
"subscription_id": subscription.Uuid,
"relay_url": subscription.RelayUrl,
}).Debug("Starting subscription")

filter := svc.subscriptionToFilter(subscription)

var relay *nostr.Relay
var isCustomRelay bool
var err error
waitToReconnectSeconds := 0

for {
// context expiration has no effect on relays
if relay != nil && relay.Connection != nil && isCustomRelay {
relay.Close()
}
if ctx.Err() != nil {
svc.Logger.WithFields(logrus.Fields{
"subscription_id": subscription.ID,
"relay_url": subscription.RelayUrl,
}).Debug("Context canceled, stopping subscription")
svc.Logger.WithError(ctx.Err()).WithFields(logrus.Fields{
"request_event_id": requestEventId,
"subscription_id": subscription.Uuid,
"relay_url": subscription.RelayUrl,
}).Error("Stopping subscription")
svc.stopSubscription(subscription)
return
}
time.Sleep(time.Duration(waitToReconnectSeconds) * time.Second)
relay, isCustomRelay, err = svc.getRelayConnection(ctx, subscription.RelayUrl)
if err != nil {
// TODO: notify user about relay failure
waitToReconnectSeconds = max(waitToReconnectSeconds, 1)
waitToReconnectSeconds = min(waitToReconnectSeconds * 2, 900)
svc.Logger.WithError(err).WithFields(logrus.Fields{
"subscription_id": subscription.ID,
"relay_url": subscription.RelayUrl,
}).Errorf("Failed to connect to relay, retrying in %vs...", waitToReconnectSeconds)
continue
}

relaySubscription, err := relay.Subscribe(ctx, []nostr.Filter{*filter})
if err != nil {
// TODO: notify user about subscription failure
waitToReconnectSeconds = max(waitToReconnectSeconds, 1)
waitToReconnectSeconds = min(waitToReconnectSeconds * 2, 900)
svc.Logger.WithError(err).WithFields(logrus.Fields{
"subscription_id": subscription.ID,
"relay_url": subscription.RelayUrl,
}).Errorf("Failed to subscribe to relay, retrying in %vs...", waitToReconnectSeconds)
continue
}

subscription.RelaySubscription = relaySubscription

svc.Logger.WithFields(logrus.Fields{
"subscription_id": subscription.ID,
"relay_url": subscription.RelayUrl,
"request_event_id": requestEventId,
"subscription_id": subscription.Uuid,
"relay_url": subscription.RelayUrl,
}).Debug("Started subscription")

waitToReconnectSeconds = 0

err = svc.processEvents(ctx, subscription, onReceiveEOS, handleEvent)

if err != nil {
// TODO: notify user about subscription failure
svc.Logger.WithError(err).WithFields(logrus.Fields{
"subscription_id": subscription.ID,
"relay_url": subscription.RelayUrl,
"request_event_id": requestEventId,
"subscription_id": subscription.Uuid,
"relay_url": subscription.RelayUrl,
}).Error("Subscription stopped due to relay error, reconnecting...")
continue
} else {
Expand All @@ -787,8 +779,9 @@ func (svc *Service) startSubscription(ctx context.Context, subscription *Subscri
// stop the subscription if it's an NIP47 request
if (subscription.RequestEvent != nil) {
svc.Logger.WithFields(logrus.Fields{
"subscription_id": subscription.ID,
"relay_url": subscription.RelayUrl,
"request_event_id": requestEventId,
"subscription_id": subscription.Uuid,
"relay_url": subscription.RelayUrl,
}).Debug("Stopping subscription")
svc.stopSubscription(subscription)
}
Expand All @@ -805,6 +798,7 @@ func (svc *Service) publishRequestEvent(ctx context.Context, subscription *Subsc
if err != nil {
// TODO: notify user about publish failure
svc.Logger.WithError(err).WithFields(logrus.Fields{
"subscription_id": subscription.Uuid,
"request_event_id": subscription.RequestEvent.NostrId,
"relay_url": subscription.RelayUrl,
"wallet_pubkey": walletPubkey,
Expand All @@ -814,6 +808,7 @@ func (svc *Service) publishRequestEvent(ctx context.Context, subscription *Subsc
relaySubscription.Unsub()
} else {
svc.Logger.WithFields(logrus.Fields{
"subscription_id": subscription.Uuid,
"request_event_id": subscription.RequestEvent.NostrId,
"relay_url": subscription.RelayUrl,
"wallet_pubkey": walletPubkey,
Expand Down Expand Up @@ -851,10 +846,10 @@ func (svc *Service) handleResponseEvent(event *nostr.Event, subscription *Subscr

func (svc *Service) handleSubscribedEvent(event *nostr.Event, subscription *Subscription) {
svc.Logger.WithFields(logrus.Fields{
"event_id": event.ID,
"event_kind": event.Kind,
"subscription_id": subscription.ID,
"relay_url": subscription.RelayUrl,
"response_event_id": event.ID,
"response_event_kind": event.Kind,
"subscription_id": subscription.Uuid,
"relay_url": subscription.RelayUrl,
}).Info("Received subscribed event")
responseEvent := ResponseEvent{
NostrId: event.ID,
Expand All @@ -863,7 +858,7 @@ func (svc *Service) handleSubscribedEvent(event *nostr.Event, subscription *Subs
SubscriptionId: &subscription.ID,
}
svc.db.Save(&responseEvent)
svc.postEventToWebhook(event, subscription.WebhookUrl)
svc.postEventToWebhook(event, subscription)
}

func (svc *Service) processEvents(ctx context.Context, subscription *Subscription, onReceiveEOS OnReceiveEOSFunc, handleEvent HandleEventFunc) error {
Expand All @@ -875,7 +870,7 @@ func (svc *Service) processEvents(ctx context.Context, subscription *Subscriptio
if (onReceiveEOS != nil && subscription.RequestEvent.State != REQUEST_EVENT_PUBLISH_CONFIRMED) {
<-relaySubscription.EndOfStoredEvents
svc.Logger.WithFields(logrus.Fields{
"subscription_id": subscription.ID,
"subscription_id": subscription.Uuid,
"relay_url": subscription.RelayUrl,
}).Debug("Received EOS")

Expand All @@ -888,7 +883,7 @@ func (svc *Service) processEvents(ctx context.Context, subscription *Subscriptio
}

svc.Logger.WithFields(logrus.Fields{
"subscription_id": subscription.ID,
"subscription_id": subscription.Uuid,
"relay_url": subscription.RelayUrl,
}).Debug("Relay subscription events channel ended")
}()
Expand All @@ -908,7 +903,7 @@ func (svc *Service) getRelayConnection(ctx context.Context, customRelayURL strin
svc.Logger.WithFields(logrus.Fields{
"custom_relay_url": customRelayURL,
}).Info("Connecting to custom relay")
relay, err := nostr.RelayConnect(ctx, customRelayURL)
relay, err := svc.relayConnectWithBackoff(ctx, customRelayURL)
return relay, true, err // true means custom and the relay should be closed
}
// use mutex otherwise the svc.Relay will be reconnected more than once
Expand All @@ -919,40 +914,80 @@ func (svc *Service) getRelayConnection(ctx context.Context, customRelayURL strin
return svc.Relay, false, nil
} else {
svc.Logger.Info("Lost connection to default relay, reconnecting...")
relay, err := nostr.RelayConnect(svc.Ctx, svc.Cfg.DefaultRelayURL)
relay, err := svc.relayConnectWithBackoff(svc.Ctx, svc.Cfg.DefaultRelayURL)
if err == nil {
svc.Relay = relay
}
return svc.Relay, false, err
}
}

func (svc *Service) postEventToWebhook(event *nostr.Event, webhookURL string) {
func (svc *Service) relayConnectWithBackoff(ctx context.Context, relayURL string) (relay *nostr.Relay, err error) {
waitToReconnectSeconds := 0
for {
select {
case <-ctx.Done():
svc.Logger.WithError(err).WithFields(logrus.Fields{
"relay_url": relayURL,
}).Errorf("Context canceled, exiting attempt to connect to relay")
return nil, ctx.Err()
default:
time.Sleep(time.Duration(waitToReconnectSeconds) * time.Second)
relay, err = nostr.RelayConnect(ctx, relayURL)
if err != nil {
// TODO: notify user about relay failure
waitToReconnectSeconds = max(waitToReconnectSeconds, 1)
waitToReconnectSeconds = min(waitToReconnectSeconds * 2, 900)
svc.Logger.WithError(err).WithFields(logrus.Fields{
"relay_url": relayURL,
}).Errorf("Failed to connect to relay, retrying in %vs...", waitToReconnectSeconds)
continue
}
svc.Logger.WithFields(logrus.Fields{
"relay_url": relayURL,
}).Info("Relay connection successful.")
return relay, nil
}
}
}

func (svc *Service) postEventToWebhook(event *nostr.Event, subscription *Subscription) {
eventData, err := json.Marshal(event)
requestEventId := ""
if subscription.RequestEvent != nil {
requestEventId = subscription.RequestEvent.NostrId
}

if err != nil {
svc.Logger.WithError(err).WithFields(logrus.Fields{
"event_id": event.ID,
"event_kind": event.Kind,
"webhook_url": webhookURL,
"subscription_id": subscription.Uuid,
"request_event_id": requestEventId,
"response_event_id": event.ID,
"response_event_kind": event.Kind,
"webhook_url": subscription.WebhookUrl,
}).Error("Failed to marshal event for webhook")
return
}

// TODO: add svix functionality
_, err = http.Post(webhookURL, "application/json", bytes.NewBuffer(eventData))
_, err = http.Post(subscription.WebhookUrl, "application/json", bytes.NewBuffer(eventData))
if err != nil {
svc.Logger.WithError(err).WithFields(logrus.Fields{
"event_id": event.ID,
"event_kind": event.Kind,
"webhook_url": webhookURL,
"subscription_id": subscription.Uuid,
"request_event_id": requestEventId,
"response_event_id": event.ID,
"response_event_kind": event.Kind,
"webhook_url": subscription.WebhookUrl,
}).Error("Failed to post event to webhook")
return
}

svc.Logger.WithFields(logrus.Fields{
"event_id": event.ID,
"event_kind": event.Kind,
"webhook_url": webhookURL,
"subscription_id": subscription.Uuid,
"request_event_id": requestEventId,
"response_event_id": event.ID,
"response_event_kind": event.Kind,
"webhook_url": subscription.WebhookUrl,
}).Info("Posted event to webhook")
}

Expand Down

0 comments on commit 49b5a30

Please sign in to comment.