From 43bd548dccc70141dcb12679edb0e2a352acab2a Mon Sep 17 00:00:00 2001 From: David Ansari Date: Mon, 10 Oct 2022 16:10:02 +0000 Subject: [PATCH] Handle deprecated classic queue delivery when feature flag classic_queue_type_delivery_support is disabled. --- deps/rabbit/src/rabbit_classic_queue.erl | 2 +- deps/rabbit/src/rabbit_core_ff.erl | 3 ++- deps/rabbitmq_mqtt/src/rabbit_mqtt.erl | 3 --- deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl | 8 +++++++- deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl | 3 +++ deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_handler.erl | 4 ++++ 6 files changed, 17 insertions(+), 6 deletions(-) diff --git a/deps/rabbit/src/rabbit_classic_queue.erl b/deps/rabbit/src/rabbit_classic_queue.erl index e349b5e572..b7d3d1f8b0 100644 --- a/deps/rabbit/src/rabbit_classic_queue.erl +++ b/deps/rabbit/src/rabbit_classic_queue.erl @@ -545,7 +545,7 @@ send_rejection(Pid, QName, MsgSeqNo) -> end. deliver_to_consumer(Pid, QName, CTag, AckRequired, Message) -> - case has_classic_queue_type_delivery_support() of + case has_classic_queue_type_delivery_support() of true -> Deliver = {deliver, CTag, AckRequired, [Message]}, Evt = {queue_event, QName, Deliver}, diff --git a/deps/rabbit/src/rabbit_core_ff.erl b/deps/rabbit/src/rabbit_core_ff.erl index 1ec4d5f065..651da09250 100644 --- a/deps/rabbit/src/rabbit_core_ff.erl +++ b/deps/rabbit/src/rabbit_core_ff.erl @@ -34,7 +34,8 @@ {stream_queue, #{desc => "Support queues of type `stream`", doc_url => "https://www.rabbitmq.com/stream.html", - stability => stable, + %%TODO remove compatibility code + stability => required, depends_on => [quorum_queue] }}). diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt.erl index c7f5877b7e..a8c5e3ab4d 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt.erl @@ -13,9 +13,6 @@ close_local_client_connections/1]). start(normal, []) -> - %%TODO make feature flag stream_queue 'required' for 3.12 - %% because we rely on rabbit_queue_type interface. - ok = rabbit_feature_flags:enable(stream_queue), rabbit_global_counters:init([{protocol, mqtt}]), {ok, Listeners} = application:get_env(tcp_listeners), {ok, SslListeners} = application:get_env(ssl_listeners), diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl index 05d1d00253..cfaa8cfa7f 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl @@ -11,7 +11,8 @@ -export([info/2, initial_state/2, initial_state/4, process_frame/2, serialise/2, send_will/1, terminate/1, handle_pre_hibernate/0, - handle_ra_event/2, handle_down/2, handle_queue_event/2]). + handle_ra_event/2, handle_down/2, handle_queue_event/2, + handle_deprecated_delivery/2]). %%TODO Use single queue per MQTT subscriber connection? %% * when publishing we store in x-mqtt-publish-qos header the publishing QoS @@ -1197,6 +1198,11 @@ handle_down({'DOWN', _MRef, process, QPid, Reason}, PState0#proc_state{queue_states = QStates} end. +%% Handle deprecated delivery from classic queue. This function is to be +%% removed when feature flag classic_queue_type_delivery_support becomes required. +handle_deprecated_delivery({deliver, ?CONSUMER_TAG, AckRequired, Msg}, PState) -> + {ok, deliver_one_to_client(Msg, AckRequired, PState)}. + handle_queue_event({queue_event, QName, Evt}, PState0 = #proc_state{queue_states = QStates0, unacked_client_pubs = U0}) -> diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl index 29cf66ecf5..549bd3eaf5 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl @@ -127,6 +127,9 @@ handle_cast(QueueEvent = {queue_event, _, _}, State = #state{proc_state = PState}) -> callback_reply(State, rabbit_mqtt_processor:handle_queue_event(QueueEvent, PState)); +handle_cast(Delivery = {deliver, _, _, _}, State = #state{proc_state = PState}) -> + callback_reply(State, rabbit_mqtt_processor:handle_deprecated_delivery(Delivery, PState)); + handle_cast(Msg, State) -> {stop, {mqtt_unexpected_cast, Msg}, State}. diff --git a/deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_handler.erl b/deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_handler.erl index 4ceecbae06..ac523fbd8c 100644 --- a/deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_handler.erl +++ b/deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_handler.erl @@ -170,6 +170,10 @@ websocket_info({'$gen_cast', QueueEvent = {queue_event, _, _}}, [State#state.conn_name, Reason]), stop(State#state{proc_state = PState}) end; +websocket_info({'$gen_cast', Delivery = {deliver, _, _, _}}, + State = #state{proc_state = PState0}) -> + {ok, PState} = rabbit_mqtt_processor:handle_deprecated_delivery(Delivery, PState0), + {[], State#state{proc_state = PState}, hibernate}; websocket_info({'$gen_cast', duplicate_id}, State = #state{ proc_state = ProcState, conn_name = ConnName }) -> rabbit_log_connection:warning("Web MQTT disconnecting a client with duplicate ID '~s' (~p)",