Skip to content

Commit

Permalink
return IncomingEvent struct from SimplePool methods containing the re…
Browse files Browse the repository at this point in the history
…lay.
  • Loading branch information
fiatjaf committed Sep 30, 2023
1 parent 18cee74 commit 978d782
Showing 1 changed file with 14 additions and 9 deletions.
23 changes: 14 additions & 9 deletions pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ type SimplePool struct {
cancel context.CancelFunc
}

type IncomingEvent struct {
*Event
Relay *Relay
}

func NewSimplePool(ctx context.Context) *SimplePool {
ctx, cancel := context.WithCancel(ctx)

Expand Down Expand Up @@ -52,8 +57,8 @@ func (pool *SimplePool) EnsureRelay(url string) (*Relay, error) {

// SubMany opens a subscription with the given filters to multiple relays
// the subscriptions only end when the context is canceled
func (pool *SimplePool) SubMany(ctx context.Context, urls []string, filters Filters) chan *Event {
uniqueEvents := make(chan *Event)
func (pool *SimplePool) SubMany(ctx context.Context, urls []string, filters Filters) chan IncomingEvent {
uniqueEvents := make(chan IncomingEvent)
seenAlready := xsync.NewMapOf[bool]()

pending := xsync.NewCounter()
Expand All @@ -74,7 +79,7 @@ func (pool *SimplePool) SubMany(ctx context.Context, urls []string, filters Filt
for evt := range sub.Events {
// dispatch unique events to client
if _, ok := seenAlready.LoadOrStore(evt.ID, true); !ok {
uniqueEvents <- evt
uniqueEvents <- IncomingEvent{Event: evt, Relay: relay}
}
}

Expand All @@ -89,10 +94,10 @@ func (pool *SimplePool) SubMany(ctx context.Context, urls []string, filters Filt
}

// SubManyEose is like SubMany, but it stops subscriptions and closes the channel when gets a EOSE
func (pool *SimplePool) SubManyEose(ctx context.Context, urls []string, filters Filters) chan *Event {
func (pool *SimplePool) SubManyEose(ctx context.Context, urls []string, filters Filters) chan IncomingEvent {
ctx, cancel := context.WithCancel(ctx)

uniqueEvents := make(chan *Event)
uniqueEvents := make(chan IncomingEvent)
seenAlready := xsync.NewMapOf[bool]()
wg := sync.WaitGroup{}
wg.Add(len(urls))
Expand Down Expand Up @@ -133,7 +138,7 @@ func (pool *SimplePool) SubManyEose(ctx context.Context, urls []string, filters
// dispatch unique events to client
if _, ok := seenAlready.LoadOrStore(evt.ID, true); !ok {
select {
case uniqueEvents <- evt:
case uniqueEvents <- IncomingEvent{Event: evt, Relay: relay}:
case <-ctx.Done():
return
}
Expand All @@ -147,11 +152,11 @@ func (pool *SimplePool) SubManyEose(ctx context.Context, urls []string, filters
}

// QuerySingle returns the first event returned by the first relay, cancels everything else.
func (pool *SimplePool) QuerySingle(ctx context.Context, urls []string, filter Filter) *Event {
func (pool *SimplePool) QuerySingle(ctx context.Context, urls []string, filter Filter) *IncomingEvent {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
for evt := range pool.SubManyEose(ctx, urls, Filters{filter}) {
return evt
for ievt := range pool.SubManyEose(ctx, urls, Filters{filter}) {
return &ievt
}
return nil
}

0 comments on commit 978d782

Please sign in to comment.