diff --git a/lib/gnat/jetstream/api/stream.ex b/lib/gnat/jetstream/api/stream.ex index 1114cc0..ca663b4 100644 --- a/lib/gnat/jetstream/api/stream.ex +++ b/lib/gnat/jetstream/api/stream.ex @@ -22,6 +22,8 @@ defmodule Gnat.Jetstream.API.Stream do * `:discard` - determines what happens when a Stream reaches its limits. It has the following options: - `:old` - the default option. Old messages are deleted. - `:new` - refuses new messages. + * `:discard_new_per_subject` - - allows to enable discarding new messages per subject when limits are reached. + Requires `discard: :new` and the `:max_msgs_per_subject` to be configured. * `:domain` - JetStream domain, mainly used for leaf nodes. See [JetStream on Leaf Nodes](https://docs.nats.io/running-a-nats-service/configuration/leafnodes/jetstream_leafnodes). * `:duplicate_window` - the window within which to track duplicate messages, expressed in nanoseconds. @@ -91,7 +93,8 @@ defmodule Gnat.Jetstream.API.Stream do num_replicas: 1, retention: :limits, sealed: false, - storage: :file + storage: :file, + discard_new_per_subject: false ] @type nanoseconds :: non_neg_integer() @@ -127,7 +130,8 @@ defmodule Gnat.Jetstream.API.Stream do sources: nil | list(source()), storage: :file | :memory, subjects: nil | list(binary()), - template_owner: nil | binary() + template_owner: nil | binary(), + discard_new_per_subject: boolean() } @typedoc """ @@ -505,6 +509,7 @@ defmodule Gnat.Jetstream.API.Stream do |> put_if_exist(:allow_rollup_hdrs, stream, "allow_rollup_hdrs") |> put_if_exist(:deny_delete, stream, "deny_delete") |> put_if_exist(:deny_purge, stream, "deny_purge") + |> put_if_exist(:discard_new_per_subject, stream, "discard_new_per_subject") |> put_if_exist(:mirror_direct, stream, "mirror_direct") |> put_if_exist(:sealed, stream, "sealed") end diff --git a/test/jetstream/api/stream_test.exs b/test/jetstream/api/stream_test.exs index b76e3b2..4164547 100644 --- a/test/jetstream/api/stream_test.exs +++ b/test/jetstream/api/stream_test.exs @@ -79,6 +79,52 @@ defmodule Gnat.Jetstream.API.StreamTest do assert :ok = Stream.delete(:gnat, "LIST_OFFSET_TEST_TWO") end + test "create stream with discard_new_per_subject: true" do + stream = %Stream{name: "DISCARD_NEW_PER_SUBJECT_TEST", + subjects: ["STREAM_TEST"], + max_msgs_per_subject: 1, + discard_new_per_subject: true, + discard: :new} + assert {:ok, _response} = Stream.create(:gnat, stream) + + assert {:ok, _} = Gnat.request(:gnat, "STREAM_TEST", "first message") + assert {:ok, response} = + Stream.get_message(:gnat, "DISCARD_NEW_PER_SUBJECT_TEST", %{ + last_by_subj: "STREAM_TEST"}) + %{ + data: "first message", + hdrs: nil, + subject: "STREAM_TEST", + time: %DateTime{} + } = response + + assert {:ok, _} = Gnat.request(:gnat, "STREAM_TEST", "second message") + assert {:ok, response} = + Stream.get_message(:gnat, "DISCARD_NEW_PER_SUBJECT_TEST", %{ + last_by_subj: "STREAM_TEST"}) + %{ + data: "first message", + hdrs: nil, + subject: "STREAM_TEST", + time: %DateTime{} + } = response + + Stream.purge(:gnat, "DISCARD_NEW_PER_SUBJECT_TEST") + + assert {:ok, _} = Gnat.request(:gnat, "STREAM_TEST", "second message") + assert {:ok, response} = + Stream.get_message(:gnat, "DISCARD_NEW_PER_SUBJECT_TEST", %{ + last_by_subj: "STREAM_TEST"}) + %{ + data: "second message", + hdrs: nil, + subject: "STREAM_TEST", + time: %DateTime{} + } = response + + assert :ok = Stream.delete(:gnat, "DISCARD_NEW_PER_SUBJECT_TEST") + end + test "updating a stream" do stream = %Stream{name: "UPDATE_TEST", subjects: ["STREAM_TEST"]} assert {:ok, _response} = Stream.create(:gnat, stream)