From 7aa295f5f03f680ca3d02b759538d86d2fa6b408 Mon Sep 17 00:00:00 2001 From: Alexander Kuleshov Date: Thu, 20 Jun 2024 14:10:53 +0500 Subject: [PATCH] Add stream field discard_new_per_subject --- lib/gnat/jetstream/api/stream.ex | 9 ++++-- test/jetstream/api/stream_test.exs | 46 ++++++++++++++++++++++++++++++ 2 files changed, 53 insertions(+), 2 deletions(-) diff --git a/lib/gnat/jetstream/api/stream.ex b/lib/gnat/jetstream/api/stream.ex index 563379e..b8c4e1c 100644 --- a/lib/gnat/jetstream/api/stream.ex +++ b/lib/gnat/jetstream/api/stream.ex @@ -21,6 +21,8 @@ defmodule Gnat.Jetstream.API.Stream do * `:discard` - determines what happens when a Stream reaches its limits. It has the following options: - `:old` - the default option. Old messages are deleted. - `:new` - refuses new messages. + * `:discard_new_per_subject` - - allows to enable discarding new messages per subject when limits are reached. + Requires `discard: :new` and the `:max_msgs_per_subject` to be configured. * `:domain` - JetStream domain, mainly used for leaf nodes. See [JetStream on Leaf Nodes](https://docs.nats.io/running-a-nats-service/configuration/leafnodes/jetstream_leafnodes). * `:duplicate_window` - the window within which to track duplicate messages, expressed in nanoseconds. @@ -87,7 +89,8 @@ defmodule Gnat.Jetstream.API.Stream do num_replicas: 1, retention: :limits, sealed: false, - storage: :file + storage: :file, + discard_new_per_subject: false ] @type nanoseconds :: non_neg_integer() @@ -121,7 +124,8 @@ defmodule Gnat.Jetstream.API.Stream do sources: nil | list(source()), storage: :file | :memory, subjects: nil | list(binary()), - template_owner: nil | binary() + template_owner: nil | binary(), + discard_new_per_subject: boolean() } @typedoc """ @@ -497,6 +501,7 @@ defmodule Gnat.Jetstream.API.Stream do |> put_if_exist(:allow_rollup_hdrs, stream, "allow_rollup_hdrs") |> put_if_exist(:deny_delete, stream, "deny_delete") |> put_if_exist(:deny_purge, stream, "deny_purge") + |> put_if_exist(:discard_new_per_subject, stream, "discard_new_per_subject") |> put_if_exist(:sealed, stream, "sealed") end diff --git a/test/jetstream/api/stream_test.exs b/test/jetstream/api/stream_test.exs index b76e3b2..4164547 100644 --- a/test/jetstream/api/stream_test.exs +++ b/test/jetstream/api/stream_test.exs @@ -79,6 +79,52 @@ defmodule Gnat.Jetstream.API.StreamTest do assert :ok = Stream.delete(:gnat, "LIST_OFFSET_TEST_TWO") end + test "create stream with discard_new_per_subject: true" do + stream = %Stream{name: "DISCARD_NEW_PER_SUBJECT_TEST", + subjects: ["STREAM_TEST"], + max_msgs_per_subject: 1, + discard_new_per_subject: true, + discard: :new} + assert {:ok, _response} = Stream.create(:gnat, stream) + + assert {:ok, _} = Gnat.request(:gnat, "STREAM_TEST", "first message") + assert {:ok, response} = + Stream.get_message(:gnat, "DISCARD_NEW_PER_SUBJECT_TEST", %{ + last_by_subj: "STREAM_TEST"}) + %{ + data: "first message", + hdrs: nil, + subject: "STREAM_TEST", + time: %DateTime{} + } = response + + assert {:ok, _} = Gnat.request(:gnat, "STREAM_TEST", "second message") + assert {:ok, response} = + Stream.get_message(:gnat, "DISCARD_NEW_PER_SUBJECT_TEST", %{ + last_by_subj: "STREAM_TEST"}) + %{ + data: "first message", + hdrs: nil, + subject: "STREAM_TEST", + time: %DateTime{} + } = response + + Stream.purge(:gnat, "DISCARD_NEW_PER_SUBJECT_TEST") + + assert {:ok, _} = Gnat.request(:gnat, "STREAM_TEST", "second message") + assert {:ok, response} = + Stream.get_message(:gnat, "DISCARD_NEW_PER_SUBJECT_TEST", %{ + last_by_subj: "STREAM_TEST"}) + %{ + data: "second message", + hdrs: nil, + subject: "STREAM_TEST", + time: %DateTime{} + } = response + + assert :ok = Stream.delete(:gnat, "DISCARD_NEW_PER_SUBJECT_TEST") + end + test "updating a stream" do stream = %Stream{name: "UPDATE_TEST", subjects: ["STREAM_TEST"]} assert {:ok, _response} = Stream.create(:gnat, stream)