Skip to content

Commit

Permalink
Return new_topic in open message when a fresh topic is subscribed to
Browse files Browse the repository at this point in the history
Associated tests
  • Loading branch information
karmanyaahm committed Dec 17, 2023
1 parent 9bbb1ac commit 0d607c7
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 3 deletions.
11 changes: 9 additions & 2 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1218,16 +1218,19 @@ func (s *Server) handleSubscribeHTTP(w http.ResponseWriter, r *http.Request, v *
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

createdNewTopics := false
subscriberIDs := make([]int, 0)
for _, t := range topics {
createdNewTopics = createdNewTopics || t.NeverSubscribed()
subscriberIDs = append(subscriberIDs, t.Subscribe(sub, v.MaybeUserID(), cancel))
}
defer func() {
for i, subscriberID := range subscriberIDs {
topics[i].Unsubscribe(subscriberID) // Order!
}
}()
if err := sub(v, newOpenMessage(topicsStr)); err != nil { // Send out open message
if err := sub(v, newOpenMessage(topicsStr, createdNewTopics)); err != nil { // Send out open message
return err
}
if err := s.sendOldMessages(topics, since, scheduled, v, sub); err != nil {
Expand Down Expand Up @@ -1367,16 +1370,20 @@ func (s *Server) handleSubscribeWS(w http.ResponseWriter, r *http.Request, v *vi
}
return s.sendOldMessages(topics, since, scheduled, v, sub)
}

createdNewTopic := false
subscriberIDs := make([]int, 0)
for _, t := range topics {
createdNewTopic = createdNewTopic || t.NeverSubscribed()
subscriberIDs = append(subscriberIDs, t.Subscribe(sub, v.MaybeUserID(), cancel))
}
defer func() {
for i, subscriberID := range subscriberIDs {
topics[i].Unsubscribe(subscriberID) // Order!
}
}()
if err := sub(v, newOpenMessage(topicsStr)); err != nil { // Send out open message

if err := sub(v, newOpenMessage(topicsStr, createdNewTopic)); err != nil { // Send out open message
return err
}
if err := s.sendOldMessages(topics, since, scheduled, v, sub); err != nil {
Expand Down
52 changes: 51 additions & 1 deletion server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func TestServer_SubscribeOpenAndKeepalive(t *testing.T) {

require.Equal(t, openEvent, messages[0].Event)
require.Equal(t, "mytopic", messages[0].Topic)
require.Equal(t, "", messages[0].Message)
require.Equal(t, "new_topic", messages[0].Message)
require.Equal(t, "", messages[0].Title)
require.Equal(t, 0, messages[0].Priority)
require.Nil(t, messages[0].Tags)
Expand All @@ -147,6 +147,56 @@ func TestServer_SubscribeOpenAndKeepalive(t *testing.T) {
require.Equal(t, "", messages[1].Title)
require.Equal(t, 0, messages[1].Priority)
require.Nil(t, messages[1].Tags)

// The next time subscribing to the same topic will not result in new_topic on open
rr = httptest.NewRecorder()
ctx, cancel = context.WithCancel(context.Background())
req, err = http.NewRequestWithContext(ctx, "GET", "/mytopic/json", nil)
if err != nil {
t.Fatal(err)
}
go func() {
s.handle(rr, req)
doneChan <- true
}()
time.Sleep(300 * time.Millisecond)
cancel()
<-doneChan

messages = toMessages(t, rr.Body.String())
require.Equal(t, 1, len(messages))

require.Equal(t, openEvent, messages[0].Event)
require.Equal(t, "mytopic", messages[0].Topic)
require.Equal(t, "", messages[0].Message)
require.Equal(t, "", messages[0].Title)
require.Equal(t, 0, messages[0].Priority)
require.Nil(t, messages[0].Tags)

// Subscribing to any new topic again will result in new_topic being sent
rr = httptest.NewRecorder()
ctx, cancel = context.WithCancel(context.Background())
req, err = http.NewRequestWithContext(ctx, "GET", "/mytopic,topic2/json", nil)
if err != nil {
t.Fatal(err)
}
go func() {
s.handle(rr, req)
doneChan <- true
}()
time.Sleep(300 * time.Millisecond)
cancel()
<-doneChan

messages = toMessages(t, rr.Body.String())
require.Equal(t, 1, len(messages))

require.Equal(t, openEvent, messages[0].Event)
require.Equal(t, "mytopic,topic2", messages[0].Topic)
require.Equal(t, "new_topic", messages[0].Message)
require.Equal(t, "", messages[0].Title)
require.Equal(t, 0, messages[0].Priority)
require.Nil(t, messages[0].Tags)
}

func TestServer_PublishAndSubscribe(t *testing.T) {
Expand Down

0 comments on commit 0d607c7

Please sign in to comment.