Skip to content

Commit

Permalink
Merge pull request #11867 from rabbitmq/mqtt-credential-expiration
Browse files Browse the repository at this point in the history
Disconnect MQTT client when its credential expires
  • Loading branch information
michaelklishin authored Jul 31, 2024
2 parents d7f2942 + 7fb7833 commit f914982
Show file tree
Hide file tree
Showing 7 changed files with 119 additions and 11 deletions.
2 changes: 1 addition & 1 deletion deps/rabbit/src/rabbit_amqp_reader.erl
Original file line number Diff line number Diff line change
Expand Up @@ -910,7 +910,7 @@ ensure_credential_expiry_timer(User) ->
ok;
false ->
protocol_error(?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS,
"Credential expired ~b ms ago", [Time])
"Credential expired ~b ms ago", [abs(Time)])
end
end.

Expand Down
2 changes: 1 addition & 1 deletion deps/rabbitmq_auth_backend_oauth2/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ eunit(

broker_for_integration_suites(
extra_plugins = [
"//deps/rabbitmq_mqtt:erlang_app",
"//deps/rabbitmq_web_mqtt:erlang_app",
],
)

Expand Down
2 changes: 1 addition & 1 deletion deps/rabbitmq_auth_backend_oauth2/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ export BUILD_WITHOUT_QUIC
LOCAL_DEPS = inets public_key
BUILD_DEPS = rabbit_common
DEPS = rabbit cowlib jose base64url oauth2_client
TEST_DEPS = cowboy rabbitmq_web_dispatch rabbitmq_ct_helpers rabbitmq_ct_client_helpers amqp_client rabbitmq_mqtt emqtt
TEST_DEPS = cowboy rabbitmq_web_dispatch rabbitmq_ct_helpers rabbitmq_ct_client_helpers amqp_client rabbitmq_web_mqtt emqtt

PLT_APPS += rabbitmqctl

Expand Down
78 changes: 73 additions & 5 deletions deps/rabbitmq_auth_backend_oauth2/test/system_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,10 @@ groups() ->
test_failed_connection_with_a_non_token,
test_failed_connection_with_a_token_with_insufficient_vhost_permission,
test_failed_connection_with_a_token_with_insufficient_resource_permission,
more_than_one_resource_server_id_not_allowed_in_one_token
more_than_one_resource_server_id_not_allowed_in_one_token,
mqtt_expirable_token,
web_mqtt_expirable_token,
mqtt_expired_token
]},

{token_refresh, [], [
Expand Down Expand Up @@ -422,15 +425,80 @@ mqtt(Config) ->
{ok, Pub} = emqtt:start_link([{clientid, <<"mqtt-publisher">>} | Opts]),
{ok, _} = emqtt:connect(Pub),
{ok, _} = emqtt:publish(Pub, Topic, Payload, at_least_once),
receive
{publish, #{client_pid := Sub,
topic := Topic,
payload := Payload}} -> ok
receive {publish, #{client_pid := Sub,
topic := Topic,
payload := Payload}} -> ok
after 1000 -> ct:fail("no publish received")
end,
ok = emqtt:disconnect(Sub),
ok = emqtt:disconnect(Pub).

mqtt_expirable_token(Config) ->
mqtt_expirable_token0(tcp_port_mqtt,
[],
fun emqtt:connect/1,
Config).

web_mqtt_expirable_token(Config) ->
mqtt_expirable_token0(tcp_port_web_mqtt,
[{ws_path, "/ws"}],
fun emqtt:ws_connect/1,
Config).

mqtt_expirable_token0(Port, AdditionalOpts, Connect, Config) ->
Topic = <<"test/topic">>,
Payload = <<"mqtt-test-message">>,

Seconds = 4,
Millis = Seconds * 1000,
{_Algo, Token} = generate_expirable_token(Config,
[<<"rabbitmq.configure:*/*/*">>,
<<"rabbitmq.write:*/*/*">>,
<<"rabbitmq.read:*/*/*">>],
Seconds),

Opts = [{port, rabbit_ct_broker_helpers:get_node_config(Config, 0, Port)},
{proto_ver, v5},
{username, <<"">>},
{password, Token}] ++ AdditionalOpts,
{ok, Sub} = emqtt:start_link([{clientid, <<"my subscriber">>} | Opts]),
{ok, _} = Connect(Sub),
{ok, _, [1]} = emqtt:subscribe(Sub, Topic, at_least_once),
{ok, Pub} = emqtt:start_link([{clientid, <<"my publisher">>} | Opts]),
{ok, _} = Connect(Pub),
{ok, _} = emqtt:publish(Pub, Topic, Payload, at_least_once),
receive {publish, #{client_pid := Sub,
topic := Topic,
payload := Payload}} -> ok
after 1000 -> ct:fail("no publish received")
end,

%% reason code "Maximum connect time" defined in
%% https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901208
ReasonCode = 16#A0,
true = unlink(Sub),
true = unlink(Pub),

%% In 4 seconds from now, we expect that RabbitMQ disconnects us because our token expired.
receive {disconnected, ReasonCode, _} -> ok
after Millis * 2 -> ct:fail("missing DISCONNECT packet from server")
end,
receive {disconnected, ReasonCode, _} -> ok
after Millis * 2 -> ct:fail("missing DISCONNECT packet from server")
end.

mqtt_expired_token(Config) ->
{_Algo, Token} = generate_expired_token(Config),
Opts = [{port, rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_mqtt)},
{proto_ver, v5},
{username, <<"">>},
{password, Token}],
ClientId = atom_to_binary(?FUNCTION_NAME),
{ok, C} = emqtt:start_link([{clientid, ClientId} | Opts]),
true = unlink(C),
?assertMatch({error, {bad_username_or_password, _}},
emqtt:connect(C)).

test_successful_connection_with_complex_claim_as_a_map(Config) ->
{_Algo, Token} = generate_valid_token_with_extra_fields(
Config,
Expand Down
22 changes: 22 additions & 0 deletions deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ process_connect(
ok ?= check_user_connection_limit(Username),
{ok, AuthzCtx} ?= check_vhost_access(VHost, User, ClientId, PeerIp),
ok ?= check_user_loopback(Username, PeerIp),
ok ?= ensure_credential_expiry_timer(User, PeerIp),
rabbit_core_metrics:auth_attempt_succeeded(PeerIp, Username, mqtt),
ok = register_client_id(VHost, ClientId, CleanStart, WillProps),
{ok, WillMsg} ?= make_will_msg(Packet),
Expand Down Expand Up @@ -1086,6 +1087,27 @@ check_user_loopback(Username, PeerIp) ->
{error, ?RC_NOT_AUTHORIZED}
end.


ensure_credential_expiry_timer(User = #user{username = Username}, PeerIp) ->
case rabbit_access_control:expiry_timestamp(User) of
never ->
ok;
Ts when is_integer(Ts) ->
Time = (Ts - os:system_time(second)) * 1000,
?LOG_DEBUG("Credential expires in ~b ms frow now "
"(absolute timestamp = ~b seconds since epoch)",
[Time, Ts]),
case Time > 0 of
true ->
_TimerRef = erlang:send_after(Time, self(), credential_expired),
ok;
false ->
auth_attempt_failed(PeerIp, Username),
?LOG_WARNING("Credential expired ~b ms ago", [abs(Time)]),
{error, ?RC_NOT_AUTHORIZED}
end
end.

get_vhost(UserBin, none, Port) ->
get_vhost_no_ssl(UserBin, Port);
get_vhost(UserBin, SslLogin, Port) ->
Expand Down
11 changes: 10 additions & 1 deletion deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,8 @@ handle_cast({duplicate_id, SendWill},
{stop, {shutdown, duplicate_id}, {SendWill, State}};

handle_cast({close_connection, Reason},
State = #state{conn_name = ConnName, proc_state = PState}) ->
State = #state{conn_name = ConnName,
proc_state = PState}) ->
?LOG_WARNING("MQTT disconnecting client ~tp with client ID '~ts', reason: ~ts",
[ConnName, rabbit_mqtt_processor:info(client_id, PState), Reason]),
case Reason of
Expand Down Expand Up @@ -209,6 +210,14 @@ handle_info({keepalive, Req}, State = #state{proc_state = PState,
{stop, Reason, State}
end;

handle_info(credential_expired,
State = #state{conn_name = ConnName,
proc_state = PState}) ->
?LOG_WARNING("MQTT disconnecting client ~tp with client ID '~ts' because credential expired",
[ConnName, rabbit_mqtt_processor:info(client_id, PState)]),
rabbit_mqtt_processor:send_disconnect(?RC_MAXIMUM_CONNECT_TIME, PState),
{stop, {shutdown, {disconnect, server_initiated}}, State};

handle_info(login_timeout, State = #state{proc_state = connect_packet_unprocessed,
conn_name = ConnName}) ->
%% The connection is also closed if the CONNECT packet happens to
Expand Down
13 changes: 11 additions & 2 deletions deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_handler.erl
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,9 @@ websocket_info({'$gen_cast', {duplicate_id, SendWill}},
rabbit_mqtt_processor:send_disconnect(?RC_SESSION_TAKEN_OVER, ProcState),
defer_close(?CLOSE_NORMAL, SendWill),
{[], State};
websocket_info({'$gen_cast', {close_connection, Reason}}, State = #state{proc_state = ProcState,
conn_name = ConnName}) ->
websocket_info({'$gen_cast', {close_connection, Reason}},
State = #state{proc_state = ProcState,
conn_name = ConnName}) ->
?LOG_WARNING("Web MQTT disconnecting client with ID '~s' (~p), reason: ~s",
[rabbit_mqtt_processor:info(client_id, ProcState), ConnName, Reason]),
case Reason of
Expand Down Expand Up @@ -215,6 +216,14 @@ websocket_info({keepalive, Req}, State = #state{proc_state = ProcState,
[ConnName, Reason]),
stop(State)
end;
websocket_info(credential_expired,
State = #state{proc_state = ProcState,
conn_name = ConnName}) ->
?LOG_WARNING("Web MQTT disconnecting client with ID '~s' (~p) because credential expired",
[rabbit_mqtt_processor:info(client_id, ProcState), ConnName]),
rabbit_mqtt_processor:send_disconnect(?RC_MAXIMUM_CONNECT_TIME, ProcState),
defer_close(?CLOSE_NORMAL),
{[], State};
websocket_info(emit_stats, State) ->
{[], emit_stats(State), hibernate};
websocket_info({{'DOWN', _QName}, _MRef, process, _Pid, _Reason} = Evt,
Expand Down

0 comments on commit f914982

Please sign in to comment.