diff --git a/pool.go b/pool.go index b1b6cf0..e6a9a6a 100644 --- a/pool.go +++ b/pool.go @@ -74,6 +74,7 @@ func (pool *SimplePool) subMany(ctx context.Context, urls []string, filters Filt events := make(chan IncomingEvent) seenAlready := xsync.NewMapOf[Timestamp]() ticker := time.NewTicker(seenAlreadyDropTick) + eose := false pending := xsync.NewCounter() initial := len(urls) @@ -98,17 +99,21 @@ func (pool *SimplePool) subMany(ctx context.Context, urls []string, filters Filt } select { + case <-sub.EndOfStoredEvents: + eose = true case <-ticker.C: - del := map[string]struct{}{} - old := Timestamp(time.Now().Add(-seenAlreadyDropTick).Unix()) - seenAlready.Range(func(key string, value Timestamp) bool { - if value < old { - del[evt.ID] = struct{}{} + if eose { + del := map[string]struct{}{} + old := Timestamp(time.Now().Add(-seenAlreadyDropTick).Unix()) + seenAlready.Range(func(key string, value Timestamp) bool { + if value < old { + del[evt.ID] = struct{}{} + } + return true + }) + for k := range del { + seenAlready.Delete(k) } - return true - }) - for k := range del { - seenAlready.Delete(k) } case events <- IncomingEvent{Event: evt, Relay: relay}: case <-ctx.Done():