Skip to content

Commit

Permalink
added loop to the listen method
Browse files Browse the repository at this point in the history
  • Loading branch information
aleksei-okatiev committed Nov 12, 2024
1 parent e640dc3 commit eca10b8
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 13 deletions.
2 changes: 1 addition & 1 deletion Gemfile.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: .
specs:
redis_stream (0.1.5)
redis_stream (0.2.0)

GEM
remote: https://rubygems.org/
Expand Down
16 changes: 9 additions & 7 deletions lib/redis_stream/subscriber.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,15 @@ def self.listen(streams:, group: nil, consumer: nil, &block)
create_group(stream_key, group)
end

# listen for up to 10 messages forever
ids = Array.new(streams.length, ">")
messages = RedisStream.client.xreadgroup(group, consumer, streams, ids, count: 1, block: 0, noack: true)

messages.each do |stream, stream_messages|
stream_messages.each do |message_id, message_hash|
yield(stream, message_id, message_hash["name"], JSON.parse(message_hash["json"]))
loop do
# listen for up to 10 messages forever
ids = Array.new(streams.length, ">")
messages = RedisStream.client.xreadgroup(group, consumer, streams, ids, count: 1, block: 0, noack: true)

messages.each do |stream, stream_messages|
stream_messages.each do |message_id, message_hash|
yield(stream, message_id, message_hash["name"], JSON.parse(message_hash["json"]))
end
end
end
end
Expand Down
2 changes: 1 addition & 1 deletion lib/redis_stream/version.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# frozen_string_literal: true

module RedisStream
VERSION = "0.1.5"
VERSION = "0.2.0"
end
20 changes: 16 additions & 4 deletions spec/redis_stream/subscriber_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,24 @@

describe ".listen" do
it "subscribes to the stream" do
described_class.listen(streams: "stream_test") do |message|
expect(message).to eq("test")
allow(described_class).to receive(:loop).and_yield.twice

allow(RedisStream.client).to receive(:xreadgroup).and_return([
["stream_test", [["message_id", {"name" => "test", "json" => "{}"}]]]
])

described_class.listen(streams: "stream_test") do |stream, message_id, name, json|
expect(stream).to eq("stream_test")
expect(message_id).to eq("message_id")
expect(name).to eq("test")
expect(json).to eq({})
end

described_class.listen(streams: ["stream_test"]) do |message|
expect(message).to eq("test")
described_class.listen(streams: ["stream_test"]) do |stream, message_id, name, json|
expect(stream).to eq("stream_test")
expect(message_id).to eq("message_id")
expect(name).to eq("test")
expect(json).to eq({})
end
end
end
Expand Down

0 comments on commit eca10b8

Please sign in to comment.