Add missing exchange write access check
when an MQTT will message is published.
This commit is contained in:
parent
ea57cb65f4
commit
dd372619f8
|
|
@ -255,39 +255,24 @@ process_request(?PUBLISH,
|
|||
packet_id = PacketId },
|
||||
payload = Payload},
|
||||
State0 = #state{unacked_client_pubs = U,
|
||||
cfg = #cfg{retainer_pid = RPid,
|
||||
proto_ver = ProtoVer}}) ->
|
||||
cfg = #cfg{proto_ver = ProtoVer}}) ->
|
||||
EffectiveQos = maybe_downgrade_qos(Qos),
|
||||
rabbit_global_counters:messages_received(ProtoVer, 1),
|
||||
State = maybe_increment_publisher(State0),
|
||||
Publish = fun() ->
|
||||
Msg = #mqtt_msg{retain = Retain,
|
||||
qos = EffectiveQos,
|
||||
topic = Topic,
|
||||
dup = Dup,
|
||||
packet_id = PacketId,
|
||||
payload = Payload},
|
||||
case publish_to_queues(Msg, State) of
|
||||
{ok, _} = Ok ->
|
||||
case Retain of
|
||||
false ->
|
||||
ok;
|
||||
true ->
|
||||
hand_off_to_retainer(RPid, Topic, Msg)
|
||||
end,
|
||||
Ok;
|
||||
Error ->
|
||||
Error
|
||||
end
|
||||
end,
|
||||
Msg = #mqtt_msg{retain = Retain,
|
||||
qos = EffectiveQos,
|
||||
topic = Topic,
|
||||
dup = Dup,
|
||||
packet_id = PacketId,
|
||||
payload = Payload},
|
||||
case EffectiveQos of
|
||||
?QOS_0 ->
|
||||
publish_to_queues_with_checks(Topic, Publish, State);
|
||||
publish_to_queues_with_checks(Msg, State);
|
||||
?QOS_1 ->
|
||||
rabbit_global_counters:messages_received_confirm(ProtoVer, 1),
|
||||
case rabbit_mqtt_confirms:contains(PacketId, U) of
|
||||
false ->
|
||||
publish_to_queues_with_checks(Topic, Publish, State);
|
||||
publish_to_queues_with_checks(Msg, State);
|
||||
true ->
|
||||
%% Client re-sent this PUBLISH packet.
|
||||
%% We already sent this message to target queues awaiting confirmations.
|
||||
|
|
@ -1226,25 +1211,16 @@ terminate(SendWill, ConnName, ProtoFamily,
|
|||
maybe_decrement_publisher(State),
|
||||
maybe_delete_mqtt_qos0_queue(State).
|
||||
|
||||
maybe_send_will(
|
||||
true, ConnStr,
|
||||
#state{cfg = #cfg{retainer_pid = RPid,
|
||||
will_msg = WillMsg = #mqtt_msg{retain = Retain,
|
||||
topic = Topic}}
|
||||
} = State) ->
|
||||
?LOG_DEBUG("sending MQTT will message to topic ~s on connection ~s",
|
||||
[Topic, ConnStr]),
|
||||
case check_topic_access(Topic, write, State) of
|
||||
ok ->
|
||||
_ = publish_to_queues(WillMsg, State),
|
||||
case Retain of
|
||||
false ->
|
||||
ok;
|
||||
true ->
|
||||
hand_off_to_retainer(RPid, Topic, WillMsg)
|
||||
end;
|
||||
{error, access_refused = Reason} ->
|
||||
?LOG_ERROR("failed to send will message: ~p", [Reason])
|
||||
-spec maybe_send_will(boolean(), binary(), state()) -> ok.
|
||||
maybe_send_will(true, ConnStr,
|
||||
State = #state{cfg = #cfg{will_msg = WillMsg = #mqtt_msg{topic = Topic}}}) ->
|
||||
case publish_to_queues_with_checks(WillMsg, State) of
|
||||
{ok, _} ->
|
||||
?LOG_DEBUG("sent MQTT will message to topic ~s on connection ~s",
|
||||
[Topic, ConnStr]);
|
||||
{error, Reason, _} ->
|
||||
?LOG_DEBUG("failed to send MQTT will message to topic ~s on connection ~s: ~p",
|
||||
[Topic, ConnStr, Reason])
|
||||
end;
|
||||
maybe_send_will(_, _, _) ->
|
||||
ok.
|
||||
|
|
@ -1546,17 +1522,31 @@ trace_tap_out(Msg0 = {?QUEUE_TYPE_QOS_0, _, _, _, _},
|
|||
trace_tap_out(Msg, State)
|
||||
end.
|
||||
|
||||
-spec publish_to_queues_with_checks(mqtt_msg(), state()) ->
|
||||
{ok, state()} | {error, any(), state()}.
|
||||
publish_to_queues_with_checks(
|
||||
TopicName, PublishFun,
|
||||
#state{cfg = #cfg{exchange = Exchange},
|
||||
auth_state = #auth_state{user = User,
|
||||
authz_ctx = AuthzCtx}
|
||||
} = State) ->
|
||||
Msg = #mqtt_msg{topic = Topic,
|
||||
retain = Retain},
|
||||
State = #state{cfg = #cfg{exchange = Exchange,
|
||||
retainer_pid = RPid},
|
||||
auth_state = #auth_state{user = User,
|
||||
authz_ctx = AuthzCtx}}) ->
|
||||
case check_resource_access(User, Exchange, write, AuthzCtx) of
|
||||
ok ->
|
||||
case check_topic_access(TopicName, write, State) of
|
||||
case check_topic_access(Topic, write, State) of
|
||||
ok ->
|
||||
PublishFun();
|
||||
case publish_to_queues(Msg, State) of
|
||||
{ok, _} = Ok ->
|
||||
case Retain of
|
||||
false ->
|
||||
ok;
|
||||
true ->
|
||||
hand_off_to_retainer(RPid, Topic, Msg)
|
||||
end,
|
||||
Ok;
|
||||
Error ->
|
||||
Error
|
||||
end;
|
||||
{error, access_refused} ->
|
||||
{error, unauthorized, State}
|
||||
end;
|
||||
|
|
|
|||
|
|
@ -77,6 +77,7 @@ groups() ->
|
|||
no_queue_delete_permission,
|
||||
no_queue_declare_permission,
|
||||
no_publish_permission,
|
||||
no_publish_permission_will_message,
|
||||
no_topic_read_permission,
|
||||
no_topic_write_permission,
|
||||
topic_write_permission_variable_expansion,
|
||||
|
|
@ -339,6 +340,7 @@ end_per_testcase(Testcase, Config) when Testcase == no_queue_bind_permission;
|
|||
Testcase == no_queue_delete_permission;
|
||||
Testcase == no_queue_declare_permission;
|
||||
Testcase == no_publish_permission;
|
||||
Testcase == no_publish_permission_will_message;
|
||||
Testcase == no_topic_read_permission;
|
||||
Testcase == no_topic_write_permission;
|
||||
Testcase == topic_write_permission_variable_expansion;
|
||||
|
|
@ -699,6 +701,43 @@ no_publish_permission(Config) ->
|
|||
]),
|
||||
ok.
|
||||
|
||||
%% Test that publish permission checks are performed for the will message.
|
||||
no_publish_permission_will_message(Config) ->
|
||||
%% Allow write access to queue.
|
||||
%% Disallow write access to exchange.
|
||||
set_permissions(".*", "^mqtt-subscription.*qos1$", ".*", Config),
|
||||
Topic = <<"will/topic">>,
|
||||
Opts = [{will_topic, Topic},
|
||||
{will_payload, <<"will payload">>},
|
||||
{will_qos, 0}],
|
||||
{ok, C} = connect_user(?config(mqtt_user, Config),
|
||||
?config(mqtt_password, Config),
|
||||
Config,
|
||||
<<"client-with-will">>,
|
||||
Opts),
|
||||
{ok, _} = emqtt:connect(C),
|
||||
timer:sleep(100),
|
||||
[ServerPublisherPid] = util:all_connection_pids(Config),
|
||||
|
||||
Sub = open_mqtt_connection(Config),
|
||||
{ok, _, [1]} = emqtt:subscribe(Sub, Topic, qos1),
|
||||
|
||||
unlink(C),
|
||||
%% Trigger sending of will message.
|
||||
erlang:exit(ServerPublisherPid, test_will),
|
||||
|
||||
%% We expect to not receive a will message because of missing publish permission.
|
||||
receive Unexpected -> ct:fail("Received unexpectedly: ~p", [Unexpected])
|
||||
after 300 -> ok
|
||||
end,
|
||||
|
||||
wait_log(Config,
|
||||
[{["MQTT resource access refused: write access to exchange "
|
||||
"'amq.topic' in vhost 'mqtt-vhost' refused for user 'mqtt-user'"],
|
||||
fun () -> stop end}
|
||||
]),
|
||||
ok = emqtt:disconnect(Sub).
|
||||
|
||||
no_topic_read_permission(Config) ->
|
||||
set_permissions(".*", ".*", ".*", Config),
|
||||
set_topic_permissions("^allow-write\\..*", "^allow-read\\..*", Config),
|
||||
|
|
|
|||
|
|
@ -243,10 +243,10 @@ pubsub_separate_connections(Config) ->
|
|||
will_with_disconnect(Config) ->
|
||||
LastWillTopic = <<"/topic/last-will">>,
|
||||
LastWillMsg = <<"last will message">>,
|
||||
PubOpts = [{will_topic, LastWillTopic},
|
||||
{will_payload, LastWillMsg},
|
||||
{will_qos, 1}],
|
||||
Pub = connect(<<(atom_to_binary(?FUNCTION_NAME))/binary, "_publisher">>, Config, PubOpts),
|
||||
Opts = [{will_topic, LastWillTopic},
|
||||
{will_payload, LastWillMsg},
|
||||
{will_qos, 1}],
|
||||
Pub = connect(<<(atom_to_binary(?FUNCTION_NAME))/binary, "_publisher">>, Config, Opts),
|
||||
Sub = connect(<<(atom_to_binary(?FUNCTION_NAME))/binary, "_subscriber">>, Config),
|
||||
{ok, _, [1]} = emqtt:subscribe(Sub, LastWillTopic, qos1),
|
||||
|
||||
|
|
@ -260,10 +260,10 @@ will_with_disconnect(Config) ->
|
|||
will_without_disconnect(Config) ->
|
||||
LastWillTopic = <<"/topic/last-will">>,
|
||||
LastWillMsg = <<"last will message">>,
|
||||
PubOpts = [{will_topic, LastWillTopic},
|
||||
{will_payload, LastWillMsg},
|
||||
{will_qos, 1}],
|
||||
Pub = connect(<<(atom_to_binary(?FUNCTION_NAME))/binary, "_publisher">>, Config, PubOpts),
|
||||
Opts = [{will_topic, LastWillTopic},
|
||||
{will_payload, LastWillMsg},
|
||||
{will_qos, 1}],
|
||||
Pub = connect(<<(atom_to_binary(?FUNCTION_NAME))/binary, "_publisher">>, Config, Opts),
|
||||
timer:sleep(100),
|
||||
[ServerPublisherPid] = all_connection_pids(Config),
|
||||
Sub = connect(<<(atom_to_binary(?FUNCTION_NAME))/binary, "_subscriber">>, Config),
|
||||
|
|
|
|||
Loading…
Reference in New Issue