Skip to content

Commit

Permalink
Handle instantaneous and unsolicited PUBACKs (#158)
Browse files Browse the repository at this point in the history
Prior to this commit, it was possible to crash the read thread
by sending a PUBACK with an unexpected ID, e.g. an ID the client
had not used before, or had already deleted the queue for.
These packets will now simple be ignored.

Furthermore, with QoS 1 or 2, the PUBACK queue for a packet
was only created _after_ the packet had already been published.
If the PUBACK arrived within this tiny gap, the read thread would crash.
We fix this by creating the queue before starting to publish.
  • Loading branch information
leoarnold authored May 19, 2024
1 parent 75c70e6 commit b2e48d0
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 11 deletions.
15 changes: 6 additions & 9 deletions lib/mqtt/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -332,14 +332,11 @@ def publish(topic, payload = '', retain = false, qos = 0)
:payload => payload
)

# Send the packet
res = send_packet(packet)

return if qos.zero?
queue = qos.zero? ? nil : wait_for_puback(packet.id)

queue = Queue.new
res = send_packet(packet)

wait_for_puback packet.id, queue
return unless queue

deadline = current_time + @ack_timeout

Expand Down Expand Up @@ -488,9 +485,9 @@ def receive_packet
Thread.current[:parent].raise(exp)
end

def wait_for_puback(id, queue)
def wait_for_puback(id)
@pubacks_semaphore.synchronize do
@pubacks[id] = queue
@pubacks[id] = Queue.new
end
end

Expand All @@ -502,7 +499,7 @@ def handle_packet(packet)
@last_ping_response = current_time
elsif packet.class == MQTT::Packet::Puback
@pubacks_semaphore.synchronize do
@pubacks[packet.id] << packet
@pubacks[packet.id] << packet if @pubacks[packet.id]
end
end
# Ignore all other packets
Expand Down
18 changes: 16 additions & 2 deletions spec/mqtt_client_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -641,11 +641,11 @@ def inject_puback(packet)
@injected_pubacks[packet.id] = packet
end

def wait_for_puback(id, queue)
def wait_for_puback(id)
packet = @injected_pubacks.fetch(id) {
return super
}
queue << packet
Queue.new << packet
end
end

Expand Down Expand Up @@ -745,6 +745,20 @@ def wait_for_puback(id, queue)
expect(client).to receive(:send_packet) { |packet| expect(packet.id).to eq(2) }
client.publish "topic", "message", false, 1
end

it "does not crash when receiving a PUBACK for a packet it never sent" do
expect { client.send(:handle_packet, MQTT::Packet::Puback.new(:id => 666)) }.to_not raise_error
end

it "does not crash with QoS 1 when the broker sends the PUBACK instantly" do
allow(client).to receive(:send_packet).and_wrap_original do |send_packet, packet, *args, **kwargs, &block|
send_packet.call(packet, *args, **kwargs, &block).tap do
client.send(:handle_packet, MQTT::Packet::Puback.new(:id => packet.id))
end
end

expect { client.publish("topic", "message", false, 1) }.to_not raise_error
end
end

describe "when calling the 'subscribe' method" do
Expand Down

0 comments on commit b2e48d0

Please sign in to comment.