Skip to content

Commit

Permalink
pool.SubMany(Eose)NonUnique()
Browse files Browse the repository at this point in the history
  • Loading branch information
fiatjaf committed Oct 2, 2023
1 parent c68e876 commit e45921c
Showing 1 changed file with 41 additions and 13 deletions.
54 changes: 41 additions & 13 deletions pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,17 @@ func (pool *SimplePool) EnsureRelay(url string) (*Relay, error) {

// SubMany opens a subscription with the given filters to multiple relays
// the subscriptions only end when the context is canceled
func (pool *SimplePool) SubMany(ctx context.Context, urls []string, filters Filters) chan IncomingEvent {
uniqueEvents := make(chan IncomingEvent)
func (pool *SimplePool) SubMany(ctx context.Context, urls []string, filters Filters, unique bool) chan IncomingEvent {
return pool.subMany(ctx, urls, filters, true)
}

// SubManyNonUnique is like SubMany, but returns duplicate events if they come from different relays
func (pool *SimplePool) SubManyNonUnique(ctx context.Context, urls []string, filters Filters, unique bool) chan IncomingEvent {
return pool.subMany(ctx, urls, filters, false)
}

func (pool *SimplePool) subMany(ctx context.Context, urls []string, filters Filters, unique bool) chan IncomingEvent {
events := make(chan IncomingEvent)
seenAlready := xsync.NewMapOf[bool]()

pending := xsync.NewCounter()
Expand All @@ -77,27 +86,43 @@ func (pool *SimplePool) SubMany(ctx context.Context, urls []string, filters Filt
}

for evt := range sub.Events {
// dispatch unique events to client
if _, ok := seenAlready.LoadOrStore(evt.ID, true); !ok {
uniqueEvents <- IncomingEvent{Event: evt, Relay: relay}
stop := true
if unique {
_, stop = seenAlready.LoadOrStore(evt.ID, true)
}
if !stop {
select {
case events <- IncomingEvent{Event: evt, Relay: relay}:
case <-ctx.Done():
return
}
}
}

pending.Dec()
if pending.Value() == 0 {
close(uniqueEvents)
close(events)
}
}(NormalizeURL(url))
}

return uniqueEvents
return events
}

// SubManyEose is like SubMany, but it stops subscriptions and closes the channel when gets a EOSE
func (pool *SimplePool) SubManyEose(ctx context.Context, urls []string, filters Filters) chan IncomingEvent {
return pool.subManyEose(ctx, urls, filters, true)
}

// SubManyEoseNonUnique is like SubManyEose, but returns duplicate events if they come from different relays
func (pool *SimplePool) SubManyEoseNonUnique(ctx context.Context, urls []string, filters Filters) chan IncomingEvent {
return pool.subManyEose(ctx, urls, filters, false)
}

func (pool *SimplePool) subManyEose(ctx context.Context, urls []string, filters Filters, unique bool) chan IncomingEvent {
ctx, cancel := context.WithCancel(ctx)

uniqueEvents := make(chan IncomingEvent)
events := make(chan IncomingEvent)
seenAlready := xsync.NewMapOf[bool]()
wg := sync.WaitGroup{}
wg.Add(len(urls))
Expand All @@ -106,7 +131,7 @@ func (pool *SimplePool) SubManyEose(ctx context.Context, urls []string, filters
// this will happen when all subscriptions get an eose (or when they die)
wg.Wait()
cancel()
close(uniqueEvents)
close(events)
}()

for _, url := range urls {
Expand Down Expand Up @@ -135,10 +160,13 @@ func (pool *SimplePool) SubManyEose(ctx context.Context, urls []string, filters
return
}

// dispatch unique events to client
if _, ok := seenAlready.LoadOrStore(evt.ID, true); !ok {
stop := true
if unique {
_, stop = seenAlready.LoadOrStore(evt.ID, true)
}
if !stop {
select {
case uniqueEvents <- IncomingEvent{Event: evt, Relay: relay}:
case events <- IncomingEvent{Event: evt, Relay: relay}:
case <-ctx.Done():
return
}
Expand All @@ -148,7 +176,7 @@ func (pool *SimplePool) SubManyEose(ctx context.Context, urls []string, filters
}(NormalizeURL(url))
}

return uniqueEvents
return events
}

// QuerySingle returns the first event returned by the first relay, cancels everything else.
Expand Down

0 comments on commit e45921c

Please sign in to comment.