From 076e8a1cb06d14aee35eb55cb137836d9b3f5c90 Mon Sep 17 00:00:00 2001 From: fiatjaf Date: Thu, 12 Sep 2024 17:59:44 -0300 Subject: [PATCH] nip77 nostr-negentropy syncing basics. --- envelopes.go | 12 +- nip77/envelopes.go | 400 +++++++----------------------- nip77/negentropy/negentropy.go | 14 +- nip77/negentropy/whatever_test.go | 4 +- nip77/nip77.go | 93 ++++++- 5 files changed, 199 insertions(+), 324 deletions(-) diff --git a/envelopes.go b/envelopes.go index 7bcacac..8b2d442 100644 --- a/envelopes.go +++ b/envelopes.go @@ -58,11 +58,6 @@ type Envelope interface { String() string } -type EventEnvelope struct { - SubscriptionID *string - Event -} - var ( _ Envelope = (*EventEnvelope)(nil) _ Envelope = (*ReqEnvelope)(nil) @@ -74,6 +69,11 @@ var ( _ Envelope = (*AuthEnvelope)(nil) ) +type EventEnvelope struct { + SubscriptionID *string + Event +} + func (_ EventEnvelope) Label() string { return "EVENT" } func (v *EventEnvelope) UnmarshalJSON(data []byte) error { @@ -96,7 +96,7 @@ func (v EventEnvelope) MarshalJSON() ([]byte, error) { if v.SubscriptionID != nil { w.RawString(`"` + *v.SubscriptionID + `",`) } - v.MarshalEasyJSON(&w) + v.Event.MarshalEasyJSON(&w) w.RawString(`]`) return w.BuildBytes() } diff --git a/nip77/envelopes.go b/nip77/envelopes.go index f8d69e5..4f48c29 100644 --- a/nip77/envelopes.go +++ b/nip77/envelopes.go @@ -2,45 +2,31 @@ package nip77 import ( "bytes" - "encoding/json" "fmt" - "strconv" "github.com/mailru/easyjson" jwriter "github.com/mailru/easyjson/jwriter" + "github.com/nbd-wtf/go-nostr" "github.com/tidwall/gjson" ) -func ParseNegMessage(message []byte) Envelope { +func ParseNegMessage(message []byte) nostr.Envelope { firstComma := bytes.Index(message, []byte{','}) if firstComma == -1 { return nil } label := message[0:firstComma] - var v Envelope + var v nostr.Envelope switch { - case bytes.Contains(label, []byte("EVENT")): - v = &EventEnvelope{} - case bytes.Contains(label, []byte("REQ")): - v = &ReqEnvelope{} - case bytes.Contains(label, []byte("COUNT")): - v = &CountEnvelope{} - case bytes.Contains(label, []byte("NOTICE")): - x := NoticeEnvelope("") - v = &x - case bytes.Contains(label, []byte("EOSE")): - x := EOSEEnvelope("") - v = &x - case bytes.Contains(label, []byte("OK")): - v = &OKEnvelope{} - case bytes.Contains(label, []byte("AUTH")): - v = &AuthEnvelope{} - case bytes.Contains(label, []byte("CLOSED")): - v = &ClosedEnvelope{} - case bytes.Contains(label, []byte("CLOSE")): - x := CloseEnvelope("") - v = &x + case bytes.Contains(label, []byte("NEG-MSG")): + v = &MessageEnvelope{} + case bytes.Contains(label, []byte("NEG-OPEN")): + v = &OpenEnvelope{} + case bytes.Contains(label, []byte("NEG-ERR")): + v = &ErrorEnvelope{} + case bytes.Contains(label, []byte("NEG-CLOSE")): + v = &CloseEnvelope{} default: return nil } @@ -51,345 +37,145 @@ func ParseNegMessage(message []byte) Envelope { return v } -type Envelope interface { - Label() string - UnmarshalJSON([]byte) error - MarshalJSON() ([]byte, error) - String() string -} - -type EventEnvelope struct { - SubscriptionID *string - Event -} - var ( - _ Envelope = (*EventEnvelope)(nil) - _ Envelope = (*ReqEnvelope)(nil) - _ Envelope = (*CountEnvelope)(nil) - _ Envelope = (*NoticeEnvelope)(nil) - _ Envelope = (*EOSEEnvelope)(nil) - _ Envelope = (*CloseEnvelope)(nil) - _ Envelope = (*OKEnvelope)(nil) - _ Envelope = (*AuthEnvelope)(nil) + _ nostr.Envelope = (*OpenEnvelope)(nil) + _ nostr.Envelope = (*MessageEnvelope)(nil) + _ nostr.Envelope = (*CloseEnvelope)(nil) + _ nostr.Envelope = (*ErrorEnvelope)(nil) ) -func (_ EventEnvelope) Label() string { return "EVENT" } - -func (v *EventEnvelope) UnmarshalJSON(data []byte) error { - r := gjson.ParseBytes(data) - arr := r.Array() - switch len(arr) { - case 2: - return easyjson.Unmarshal([]byte(arr[1].Raw), &v.Event) - case 3: - v.SubscriptionID = &arr[1].Str - return easyjson.Unmarshal([]byte(arr[2].Raw), &v.Event) - default: - return fmt.Errorf("failed to decode EVENT envelope") - } -} - -func (v EventEnvelope) MarshalJSON() ([]byte, error) { - w := jwriter.Writer{} - w.RawString(`["EVENT",`) - if v.SubscriptionID != nil { - w.RawString(`"` + *v.SubscriptionID + `",`) - } - v.MarshalEasyJSON(&w) - w.RawString(`]`) - return w.BuildBytes() -} - -type ReqEnvelope struct { +type OpenEnvelope struct { SubscriptionID string - Filters + Filter nostr.Filter + Message string } -func (_ ReqEnvelope) Label() string { return "REQ" } +func (_ OpenEnvelope) Label() string { return "NEG-OPEN" } +func (v OpenEnvelope) String() string { + b, _ := v.MarshalJSON() + return string(b) +} -func (v *ReqEnvelope) UnmarshalJSON(data []byte) error { +func (v *OpenEnvelope) UnmarshalJSON(data []byte) error { r := gjson.ParseBytes(data) arr := r.Array() - if len(arr) < 3 { - return fmt.Errorf("failed to decode REQ envelope: missing filters") - } - v.SubscriptionID = arr[1].Str - v.Filters = make(Filters, len(arr)-2) - f := 0 - for i := 2; i < len(arr); i++ { - if err := easyjson.Unmarshal([]byte(arr[i].Raw), &v.Filters[f]); err != nil { - return fmt.Errorf("%w -- on filter %d", err, f) - } - f++ + if len(arr) != 4 { + return fmt.Errorf("failed to decode NEG-OPEN envelope") } - return nil + v.SubscriptionID = arr[1].Str + v.Message = arr[3].Str + return easyjson.Unmarshal([]byte(arr[2].Raw), &v.Filter) } -func (v ReqEnvelope) MarshalJSON() ([]byte, error) { +func (v OpenEnvelope) MarshalJSON() ([]byte, error) { + res := bytes.NewBuffer(make([]byte, 0, 17+len(v.SubscriptionID)+len(v.Message)+500)) + + res.WriteString(`["NEG-OPEN","`) + res.WriteString(v.SubscriptionID) + res.WriteString(`",`) + w := jwriter.Writer{} - w.RawString(`["REQ",`) - w.RawString(`"` + v.SubscriptionID + `"`) - for _, filter := range v.Filters { - w.RawString(`,`) - filter.MarshalEasyJSON(&w) - } - w.RawString(`]`) - return w.BuildBytes() + v.Filter.MarshalEasyJSON(&w) + w.Buffer.DumpTo(res) + + res.WriteString(`,"`) + res.WriteString(v.Message) + res.WriteString(`"]`) + + return res.Bytes(), nil } -type CountEnvelope struct { +type MessageEnvelope struct { SubscriptionID string - Filters - Count *int64 + Message string } -func (_ CountEnvelope) Label() string { return "COUNT" } -func (c CountEnvelope) String() string { - v, _ := json.Marshal(c) - return string(v) +func (_ MessageEnvelope) Label() string { return "NEG-MSG" } +func (v MessageEnvelope) String() string { + b, _ := v.MarshalJSON() + return string(b) } -func (v *CountEnvelope) UnmarshalJSON(data []byte) error { +func (v *MessageEnvelope) UnmarshalJSON(data []byte) error { r := gjson.ParseBytes(data) arr := r.Array() if len(arr) < 3 { - return fmt.Errorf("failed to decode COUNT envelope: missing filters") + return fmt.Errorf("failed to decode NEG-MSG envelope") } v.SubscriptionID = arr[1].Str - - if len(arr) < 3 { - return fmt.Errorf("COUNT array must have at least 3 items") - } - - var countResult struct { - Count *int64 `json:"count"` - } - if err := json.Unmarshal([]byte(arr[2].Raw), &countResult); err == nil && countResult.Count != nil { - v.Count = countResult.Count - return nil - } - - v.Filters = make(Filters, len(arr)-2) - f := 0 - for i := 2; i < len(arr); i++ { - item := []byte(arr[i].Raw) - - if err := easyjson.Unmarshal(item, &v.Filters[f]); err != nil { - return fmt.Errorf("%w -- on filter %d", err, f) - } - - f++ - } - + v.Message = arr[2].Str return nil } -func (v CountEnvelope) MarshalJSON() ([]byte, error) { - w := jwriter.Writer{} - w.RawString(`["COUNT",`) - w.RawString(`"` + v.SubscriptionID + `"`) - if v.Count != nil { - w.RawString(`,{"count":`) - w.RawString(strconv.FormatInt(*v.Count, 10)) - w.RawString(`}`) - } else { - for _, filter := range v.Filters { - w.RawString(`,`) - filter.MarshalEasyJSON(&w) - } - } - w.RawString(`]`) - return w.BuildBytes() -} - -type NoticeEnvelope string +func (v MessageEnvelope) MarshalJSON() ([]byte, error) { + res := bytes.NewBuffer(make([]byte, 0, 17+len(v.SubscriptionID)+len(v.Message))) -func (_ NoticeEnvelope) Label() string { return "NOTICE" } -func (n NoticeEnvelope) String() string { - v, _ := json.Marshal(n) - return string(v) -} + res.WriteString(`["NEG-MSG","`) + res.WriteString(v.SubscriptionID) + res.WriteString(`","`) + res.WriteString(v.Message) + res.WriteString(`"]`) -func (v *NoticeEnvelope) UnmarshalJSON(data []byte) error { - r := gjson.ParseBytes(data) - arr := r.Array() - if len(arr) < 2 { - return fmt.Errorf("failed to decode NOTICE envelope") - } - *v = NoticeEnvelope(arr[1].Str) - return nil + return res.Bytes(), nil } -func (v NoticeEnvelope) MarshalJSON() ([]byte, error) { - w := jwriter.Writer{} - w.RawString(`["NOTICE",`) - w.Raw(json.Marshal(string(v))) - w.RawString(`]`) - return w.BuildBytes() +type CloseEnvelope struct { + SubscriptionID string } -type EOSEEnvelope string - -func (_ EOSEEnvelope) Label() string { return "EOSE" } -func (e EOSEEnvelope) String() string { - v, _ := json.Marshal(e) - return string(v) +func (_ CloseEnvelope) Label() string { return "NEG-CLOSE" } +func (v CloseEnvelope) String() string { + b, _ := v.MarshalJSON() + return string(b) } -func (v *EOSEEnvelope) UnmarshalJSON(data []byte) error { +func (v *CloseEnvelope) UnmarshalJSON(data []byte) error { r := gjson.ParseBytes(data) arr := r.Array() if len(arr) < 2 { - return fmt.Errorf("failed to decode EOSE envelope") + return fmt.Errorf("failed to decode NEG-CLOSE envelope") } - *v = EOSEEnvelope(arr[1].Str) + v.SubscriptionID = arr[1].Str return nil } -func (v EOSEEnvelope) MarshalJSON() ([]byte, error) { - w := jwriter.Writer{} - w.RawString(`["EOSE",`) - w.Raw(json.Marshal(string(v))) - w.RawString(`]`) - return w.BuildBytes() -} - -type CloseEnvelope string - -func (_ CloseEnvelope) Label() string { return "CLOSE" } -func (c CloseEnvelope) String() string { - v, _ := json.Marshal(c) - return string(v) -} - -func (v *CloseEnvelope) UnmarshalJSON(data []byte) error { - r := gjson.ParseBytes(data) - arr := r.Array() - switch len(arr) { - case 2: - *v = CloseEnvelope(arr[1].Str) - return nil - default: - return fmt.Errorf("failed to decode CLOSE envelope") - } -} - func (v CloseEnvelope) MarshalJSON() ([]byte, error) { - w := jwriter.Writer{} - w.RawString(`["CLOSE",`) - w.Raw(json.Marshal(string(v))) - w.RawString(`]`) - return w.BuildBytes() + res := bytes.NewBuffer(make([]byte, 0, 14+len(v.SubscriptionID))) + res.WriteString(`["NEG-CLOSE","`) + res.WriteString(v.SubscriptionID) + res.WriteString(`"]`) + return res.Bytes(), nil } -type ClosedEnvelope struct { +type ErrorEnvelope struct { SubscriptionID string Reason string } -func (_ ClosedEnvelope) Label() string { return "CLOSED" } -func (c ClosedEnvelope) String() string { - v, _ := json.Marshal(c) - return string(v) +func (_ ErrorEnvelope) Label() string { return "NEG-ERROR" } +func (v ErrorEnvelope) String() string { + b, _ := v.MarshalJSON() + return string(b) } -func (v *ClosedEnvelope) UnmarshalJSON(data []byte) error { +func (v *ErrorEnvelope) UnmarshalJSON(data []byte) error { r := gjson.ParseBytes(data) arr := r.Array() - switch len(arr) { - case 3: - *v = ClosedEnvelope{arr[1].Str, arr[2].Str} - return nil - default: - return fmt.Errorf("failed to decode CLOSED envelope") - } -} - -func (v ClosedEnvelope) MarshalJSON() ([]byte, error) { - w := jwriter.Writer{} - w.RawString(`["CLOSED",`) - w.Raw(json.Marshal(string(v.SubscriptionID))) - w.RawString(`,`) - w.Raw(json.Marshal(v.Reason)) - w.RawString(`]`) - return w.BuildBytes() -} - -type OKEnvelope struct { - EventID string - OK bool - Reason string -} - -func (_ OKEnvelope) Label() string { return "OK" } -func (o OKEnvelope) String() string { - v, _ := json.Marshal(o) - return string(v) -} - -func (v *OKEnvelope) UnmarshalJSON(data []byte) error { - r := gjson.ParseBytes(data) - arr := r.Array() - if len(arr) < 4 { - return fmt.Errorf("failed to decode OK envelope: missing fields") - } - v.EventID = arr[1].Str - v.OK = arr[2].Raw == "true" - v.Reason = arr[3].Str - - return nil -} - -func (v OKEnvelope) MarshalJSON() ([]byte, error) { - w := jwriter.Writer{} - w.RawString(`["OK",`) - w.RawString(`"` + v.EventID + `",`) - ok := "false" - if v.OK { - ok = "true" - } - w.RawString(ok) - w.RawString(`,`) - w.Raw(json.Marshal(v.Reason)) - w.RawString(`]`) - return w.BuildBytes() -} - -type AuthEnvelope struct { - Challenge *string - Event Event -} - -func (_ AuthEnvelope) Label() string { return "AUTH" } -func (a AuthEnvelope) String() string { - v, _ := json.Marshal(a) - return string(v) -} - -func (v *AuthEnvelope) UnmarshalJSON(data []byte) error { - r := gjson.ParseBytes(data) - arr := r.Array() - if len(arr) < 2 { - return fmt.Errorf("failed to decode Auth envelope: missing fields") - } - if arr[1].IsObject() { - return easyjson.Unmarshal([]byte(arr[1].Raw), &v.Event) - } else { - v.Challenge = &arr[1].Str + if len(arr) < 3 { + return fmt.Errorf("failed to decode NEG-ERROR envelope") } + v.SubscriptionID = arr[1].Str + v.Reason = arr[2].Str return nil } -func (v AuthEnvelope) MarshalJSON() ([]byte, error) { - w := jwriter.Writer{} - w.RawString(`["AUTH",`) - if v.Challenge != nil { - w.Raw(json.Marshal(*v.Challenge)) - } else { - v.Event.MarshalEasyJSON(&w) - } - w.RawString(`]`) - return w.BuildBytes() +func (v ErrorEnvelope) MarshalJSON() ([]byte, error) { + res := bytes.NewBuffer(make([]byte, 0, 19+len(v.SubscriptionID)+len(v.Reason))) + res.WriteString(`["NEG-ERROR","`) + res.WriteString(v.SubscriptionID) + res.WriteString(`","`) + res.WriteString(v.Reason) + res.WriteString(`"]`) + return res.Bytes(), nil } diff --git a/nip77/negentropy/negentropy.go b/nip77/negentropy/negentropy.go index 0a66f4f..e382d3f 100644 --- a/nip77/negentropy/negentropy.go +++ b/nip77/negentropy/negentropy.go @@ -60,16 +60,16 @@ func (n *Negentropy) Initiate() []byte { output := bytes.NewBuffer(make([]byte, 0, 1+n.storage.Size()*32)) output.WriteByte(protocolVersion) - n.SplitRange(0, 0, n.storage.Size(), infiniteBound, output) + n.SplitRange(0, n.storage.Size(), infiniteBound, output) return output.Bytes() } -func (n *Negentropy) Reconcile(step int, query []byte) (output []byte, err error) { +func (n *Negentropy) Reconcile(msg []byte) (output []byte, err error) { n.seal() - reader := bytes.NewReader(query) + reader := bytes.NewReader(msg) - output, err = n.reconcileAux(step, reader) + output, err = n.reconcileAux(reader) if err != nil { return nil, err } @@ -83,7 +83,7 @@ func (n *Negentropy) Reconcile(step int, query []byte) (output []byte, err error return output, nil } -func (n *Negentropy) reconcileAux(step int, reader *bytes.Reader) ([]byte, error) { +func (n *Negentropy) reconcileAux(reader *bytes.Reader) ([]byte, error) { n.lastTimestampIn, n.lastTimestampOut = 0, 0 // reset for each message fullOutput := bytes.NewBuffer(make([]byte, 0, 5000)) @@ -153,7 +153,7 @@ func (n *Negentropy) reconcileAux(step int, reader *bytes.Reader) ([]byte, error skip = true } else { doSkip() - n.SplitRange(step, lower, upper, currBound, partialOutput) + n.SplitRange(lower, upper, currBound, partialOutput) } case IdListMode: @@ -248,7 +248,7 @@ func (n *Negentropy) reconcileAux(step int, reader *bytes.Reader) ([]byte, error return fullOutput.Bytes(), nil } -func (n *Negentropy) SplitRange(step int, lower, upper int, upperBound Bound, output *bytes.Buffer) { +func (n *Negentropy) SplitRange(lower, upper int, upperBound Bound, output *bytes.Buffer) { numElems := upper - lower const buckets = 16 diff --git a/nip77/negentropy/whatever_test.go b/nip77/negentropy/whatever_test.go index 4f4bb89..0a6efb7 100644 --- a/nip77/negentropy/whatever_test.go +++ b/nip77/negentropy/whatever_test.go @@ -93,7 +93,7 @@ func runTestWith(t *testing.T, } } - q, err = n2.Reconcile(1, q) + q, err = n2.Reconcile(q) if err != nil { t.Fatal(err) return @@ -114,7 +114,7 @@ func runTestWith(t *testing.T, for n := n1; q != nil; n = invert[n] { i++ - q, err = n.Reconcile(i, q) + q, err = n.Reconcile(q) if err != nil { t.Fatal(err) return diff --git a/nip77/nip77.go b/nip77/nip77.go index dcdd363..acbacf8 100644 --- a/nip77/nip77.go +++ b/nip77/nip77.go @@ -1,6 +1,95 @@ package nip77 -import "github.com/nbd-wtf/go-nostr" +import ( + "context" + "encoding/hex" + "fmt" -func NegentropySync(r *nostr.Relay) { + "github.com/nbd-wtf/go-nostr" + "github.com/nbd-wtf/go-nostr/nip77/negentropy" +) + +func NegentropySync(ctx context.Context, store nostr.RelayStore, url string, filter nostr.Filter) error { + id := "go-nostr-tmp" // for now we can't have more than one subscription in the same connection + + data, err := store.QuerySync(ctx, filter) + if err != nil { + return fmt.Errorf("failed to query our local store: %w", err) + } + + neg := negentropy.NewNegentropy(negentropy.NewVector(), 1024*1024) + for _, evt := range data { + neg.Insert(evt) + } + + result := make(chan error) + + var r *nostr.Relay + r, err = nostr.RelayConnect(ctx, url, nostr.WithCustomHandler(func(data []byte) { + envelope := ParseNegMessage(data) + if envelope == nil { + return + } + switch env := envelope.(type) { + case *OpenEnvelope, *CloseEnvelope: + result <- fmt.Errorf("unexpected %s received from relay", env.Label()) + return + case *ErrorEnvelope: + result <- fmt.Errorf("relay returned a %s: %s", env.Label(), env.Reason) + return + case *MessageEnvelope: + msg, err := hex.DecodeString(env.Message) + if err != nil { + result <- fmt.Errorf("relay sent invalid message: %w", err) + return + } + + nextmsg, err := neg.Reconcile(msg) + if err != nil { + result <- fmt.Errorf("failed to reconcile: %w", err) + return + } + + msgb, _ := MessageEnvelope{id, hex.EncodeToString(nextmsg)}.MarshalJSON() + r.Write(msgb) + } + })) + if err != nil { + return err + } + + msg := neg.Initiate() + open, _ := OpenEnvelope{id, filter, hex.EncodeToString(msg)}.MarshalJSON() + err = <-r.Write(open) + if err != nil { + return fmt.Errorf("failed to write to relay: %w", err) + } + + defer func() { + clse, _ := CloseEnvelope{id}.MarshalJSON() + r.Write(clse) + }() + + for _, p := range []struct { + items chan string + source nostr.RelayStore + target nostr.RelayStore + }{{neg.Haves, store, r}, {neg.HaveNots, r, store}} { + p := p + go func() { + for item := range p.items { + evts, _ := p.source.QuerySync(ctx, nostr.Filter{IDs: []string{item}}) + for _, evt := range evts { + p.target.Publish(ctx, *evt) + } + } + }() + } + + err = <-result + if err != nil { + return err + } + + return nil }