From 2edc0fb713f0a772d607d1e2ed9eb90145f06f70 Mon Sep 17 00:00:00 2001 From: fiatjaf Date: Wed, 25 Sep 2024 22:38:31 -0300 Subject: [PATCH] sdk: optimize caching lists (so we don't fetch twice in a row). --- sdk/helpers.go | 42 +++++++++++++++++++++++++------------- sdk/list.go | 43 ++++++++++++++++++++++++++------------- sdk/outbox.go | 7 ++----- sdk/replaceable_loader.go | 10 +++------ sdk/utils.go | 39 +++++++++++------------------------ 5 files changed, 74 insertions(+), 67 deletions(-) diff --git a/sdk/helpers.go b/sdk/helpers.go index ecfcf50..e669a7e 100644 --- a/sdk/helpers.go +++ b/sdk/helpers.go @@ -1,21 +1,35 @@ package sdk -import ( - "strings" -) +import "time" -// IsVirtualRelay returns true if the given normalized relay URL shouldn't be considered for outbox-model calculations. -func IsVirtualRelay(url string) bool { - if len(url) < 6 { - // this is just invalid - return true - } +var serial = 0 + +func pickNext(list []string) string { + serial++ + return list[serial%len(list)] +} - if strings.HasPrefix(url, "wss://feeds.nostr.band") || - strings.HasPrefix(url, "wss://filter.nostr.wine") || - strings.HasPrefix(url, "wss://cache") { - return true +func doThisNotMoreThanOnceAnHour(key string) (doItNow bool) { + if _dtnmtoah == nil { + go func() { + _dtnmtoah = make(map[string]time.Time) + for { + time.Sleep(time.Minute * 10) + _dtnmtoahLock.Lock() + now := time.Now() + for k, v := range _dtnmtoah { + if v.Before(now) { + delete(_dtnmtoah, k) + } + } + _dtnmtoahLock.Unlock() + } + }() } - return false + _dtnmtoahLock.Lock() + defer _dtnmtoahLock.Unlock() + + _, exists := _dtnmtoah[key] + return !exists } diff --git a/sdk/list.go b/sdk/list.go index 5d8c74b..7e7976a 100644 --- a/sdk/list.go +++ b/sdk/list.go @@ -3,6 +3,7 @@ package sdk import ( "context" "slices" + "sync" "time" "github.com/nbd-wtf/go-nostr" @@ -20,6 +21,11 @@ type TagItemWithValue interface { Value() string } +var ( + genericListMutexes = [24]sync.Mutex{} + valueWasJustCached = [24]bool{} +) + func fetchGenericList[I TagItemWithValue]( sys *System, ctx context.Context, @@ -29,36 +35,45 @@ func fetchGenericList[I TagItemWithValue]( cache cache.Cache32[GenericList[I]], skipFetch bool, ) (fl GenericList[I], fromInternal bool) { - if cache != nil { - if v, ok := cache.Get(pubkey); ok { - return v, true - } + // we have 24 mutexes, so we can load up to 24 lists at the same time, but if we do the same exact + // call that will do it only once, the subsequent ones will wait for a result to be cached + // and then return it from cache -- 13 is an arbitrary index for the pubkey + lockIdx := (int(pubkey[13]) + kind) % 24 + genericListMutexes[lockIdx].Lock() + + if valueWasJustCached[lockIdx] { + // this ensures the cache has had time to commit the values + // so we don't repeat a fetch immediately after the other + valueWasJustCached[lockIdx] = false + time.Sleep(time.Millisecond * 10) } + defer genericListMutexes[lockIdx].Unlock() + + if v, ok := cache.Get(pubkey); ok { + return v, true + } + + v := GenericList[I]{PubKey: pubkey} + events, _ := sys.StoreRelay.QuerySync(ctx, nostr.Filter{Kinds: []int{kind}, Authors: []string{pubkey}}) if len(events) != 0 { items := parseItemsFromEventTags(events[0], parseTag) - v := GenericList[I]{ - PubKey: pubkey, - Event: events[0], - Items: items, - } - cache.SetWithTTL(pubkey, v, time.Hour*6) + v.Event = events[0] + v.Items = items + valueWasJustCached[lockIdx] = true return v, true } - v := GenericList[I]{PubKey: pubkey} if !skipFetch { thunk := sys.replaceableLoaders[kind].Load(ctx, pubkey) evt, err := thunk() if err == nil { items := parseItemsFromEventTags(evt, parseTag) v.Items = items - if cache != nil { - cache.SetWithTTL(pubkey, v, time.Hour*6) - } sys.StoreRelay.Publish(ctx, *evt) } + valueWasJustCached[lockIdx] = true } return v, false diff --git a/sdk/outbox.go b/sdk/outbox.go index 4fb9299..49cabb9 100644 --- a/sdk/outbox.go +++ b/sdk/outbox.go @@ -17,13 +17,10 @@ func (sys *System) FetchOutboxRelays(ctx context.Context, pubkey string, n int) return relays } - if rl, ok := sys.RelayListCache.Get(pubkey); !ok || (rl.Event != nil && rl.Event.CreatedAt < nostr.Now()-60*60*24*7) { - // try to fetch relays list again if we don't have one or if ours is a week old - fetchGenericList(sys, ctx, pubkey, 10002, parseRelayFromKind10002, sys.RelayListCache, false) - } + // if we have it cached that means we have at least tried to fetch recently and it won't be tried again + fetchGenericList(sys, ctx, pubkey, 10002, parseRelayFromKind10002, sys.RelayListCache, false) relays := sys.Hints.TopN(pubkey, 6) - if len(relays) == 0 { return []string{"wss://relay.damus.io", "wss://nos.lol"} } diff --git a/sdk/replaceable_loader.go b/sdk/replaceable_loader.go index 2ab70c3..be11275 100644 --- a/sdk/replaceable_loader.go +++ b/sdk/replaceable_loader.go @@ -22,11 +22,8 @@ func (sys *System) initializeDataloaders() { func (sys *System) createReplaceableDataloader(kind int) *dataloader.Loader[string, *nostr.Event] { return dataloader.NewBatchedLoader( - func( - ctx context.Context, - pubkeys []string, - ) []*dataloader.Result[*nostr.Event] { - return sys.batchLoadReplaceableEvents(ctx, kind, pubkeys) + func(_ context.Context, pubkeys []string) []*dataloader.Result[*nostr.Event] { + return sys.batchLoadReplaceableEvents(kind, pubkeys) }, dataloader.WithBatchCapacity[string, *nostr.Event](60), dataloader.WithClearCacheOnBatch[string, *nostr.Event](), @@ -35,7 +32,6 @@ func (sys *System) createReplaceableDataloader(kind int) *dataloader.Loader[stri } func (sys *System) batchLoadReplaceableEvents( - ctx context.Context, kind int, pubkeys []string, ) []*dataloader.Result[*nostr.Event] { @@ -67,7 +63,7 @@ func (sys *System) batchLoadReplaceableEvents( } // save attempts here so we don't try the same failed query over and over - if doItNow := DoThisNotMoreThanOnceAnHour("repl:" + strconv.Itoa(kind) + pubkey); !doItNow { + if doItNow := doThisNotMoreThanOnceAnHour("repl:" + strconv.Itoa(kind) + pubkey); !doItNow { results[i] = &dataloader.Result[*nostr.Event]{ Error: fmt.Errorf("last attempt failed, waiting more to try again"), } diff --git a/sdk/utils.go b/sdk/utils.go index 65f74a1..6956195 100644 --- a/sdk/utils.go +++ b/sdk/utils.go @@ -1,6 +1,7 @@ package sdk import ( + "strings" "sync" "time" ) @@ -10,34 +11,18 @@ var ( _dtnmtoahLock sync.Mutex ) -func DoThisNotMoreThanOnceAnHour(key string) (doItNow bool) { - if _dtnmtoah == nil { - go func() { - _dtnmtoah = make(map[string]time.Time) - for { - time.Sleep(time.Minute * 10) - _dtnmtoahLock.Lock() - now := time.Now() - for k, v := range _dtnmtoah { - if v.Before(now) { - delete(_dtnmtoah, k) - } - } - _dtnmtoahLock.Unlock() - } - }() +// IsVirtualRelay returns true if the given normalized relay URL shouldn't be considered for outbox-model calculations. +func IsVirtualRelay(url string) bool { + if len(url) < 6 { + // this is just invalid + return true } - _dtnmtoahLock.Lock() - defer _dtnmtoahLock.Unlock() - - _, exists := _dtnmtoah[key] - return !exists -} - -var serial = 0 + if strings.HasPrefix(url, "wss://feeds.nostr.band") || + strings.HasPrefix(url, "wss://filter.nostr.wine") || + strings.HasPrefix(url, "wss://cache") { + return true + } -func pickNext(list []string) string { - serial++ - return list[serial%len(list)] + return false }