React to basic.cancel from server by UNSUBSCRIB(E)ing

This commit is contained in:
Steve Powell 2012-07-11 11:44:12 +01:00
parent 72f08da72e
commit e33b0df580
3 changed files with 79 additions and 46 deletions

View File

@ -28,11 +28,14 @@
-define(HEADER_MESSAGE_ID, "message-id").
-define(HEADER_PASSCODE, "passcode").
-define(HEADER_PERSISTENT, "persistent").
-define(HEADER_PREFETCH_COUNT, "prefetch-count").
-define(HEADER_PRIORITY, "priority").
-define(HEADER_RECEIPT, "receipt").
-define(HEADER_REPLY_TO, "reply-to").
-define(HEADER_SERVER, "server").
-define(HEADER_SESSION, "session").
-define(HEADER_SUBSCRIPTION, "subscription").
-define(HEADER_TRANSACTION, "transaction").
-define(HEADER_VERSION, "version").
-define(MESSAGE_ID_SEPARATOR, "@@").

View File

@ -138,6 +138,8 @@ handle_info(#'basic.ack'{delivery_tag = Tag, multiple = IsMulti}, State) ->
handle_info({Delivery = #'basic.deliver'{},
#amqp_msg{props = Props, payload = Payload}}, State) ->
{noreply, send_delivery(Delivery, Props, Payload, State), hibernate};
handle_info(#'basic.cancel'{consumer_tag = Ctag}, State) ->
{noreply, server_cancel_consumer(Ctag, State), hibernate};
handle_info({'EXIT', Conn,
{shutdown, {server_initiated_close, Code, Explanation}}},
State = #state{connection = Conn}) ->
@ -150,7 +152,6 @@ handle_info({inet_reply, _, ok}, State) ->
handle_info({bump_credit, Msg}, State) ->
credit_flow:handle_bump_msg(Msg),
{noreply, State, hibernate};
handle_info({inet_reply, _, Status}, State) ->
{stop, Status, State}.
@ -340,6 +341,24 @@ ack_action(Command, Frame,
%% Internal helpers for processing frames callbacks
%%----------------------------------------------------------------------------
server_cancel_consumer(ConsumerTag, State = #state{subscriptions = Subs}) ->
case dict:find(ConsumerTag, Subs) of
error ->
error("Server cancelled unknown subscription",
"Consumer tag ~p is not associated with a subscription.\n",
[ConsumerTag],
State);
{ok, Subscription = #subscription{description = Description}} ->
send_error_frame("Server cancelled subscription",
[{?HEADER_SUBSCRIPTION, ConsumerTag}],
"The server has cancelled a subscription.\n"
"No more messages will be delivered for ~p.\n",
[Description],
State),
cancel_found_subscription(ConsumerTag, Subscription,
#stomp_frame{}, State)
end.
cancel_subscription({error, invalid_prefix}, _Frame, State) ->
error("Invalid id",
"UNSUBSCRIBE 'id' may not start with ~s~n",
@ -352,8 +371,7 @@ cancel_subscription({error, _}, _Frame, State) ->
State);
cancel_subscription({ok, ConsumerTag, Description}, Frame,
State = #state{channel = MainChannel,
subscriptions = Subs}) ->
State = #state{subscriptions = Subs}) ->
case dict:find(ConsumerTag, Subs) of
error ->
error("No subscription found",
@ -361,43 +379,50 @@ cancel_subscription({ok, ConsumerTag, Description}, Frame,
"Subscription to ~p not found.\n",
[Description],
State);
{ok, #subscription{dest_hdr = DestHdr, channel = SubChannel}} ->
case amqp_channel:call(SubChannel,
#'basic.cancel'{
consumer_tag = ConsumerTag}) of
#'basic.cancel_ok'{consumer_tag = ConsumerTag} ->
ok = ensure_subchannel_closed(SubChannel, MainChannel),
NewSubs = dict:erase(ConsumerTag, Subs),
maybe_delete_durable_sub(DestHdr, Frame,
State#state{
subscriptions = NewSubs});
_ ->
error("Failed to cancel subscription",
"UNSUBSCRIBE to ~p failed.\n",
[Description],
State)
end
{ok, Subscription} ->
cancel_found_subscription(ConsumerTag, Subscription, Frame, State)
end.
maybe_delete_durable_sub(DestHdr, Frame, State = #state{channel = Channel}) ->
case rabbit_stomp_util:parse_destination(DestHdr) of
{ok, {topic, Name}} ->
case rabbit_stomp_frame:boolean_header(Frame,
?HEADER_PERSISTENT, false) of
true ->
{ok, Id} = rabbit_stomp_frame:header(Frame, ?HEADER_ID),
QName =
rabbit_stomp_util:durable_subscription_queue(Name, Id),
amqp_channel:call(Channel, #'queue.delete'{queue = QName,
nowait = false}),
ok(State);
false ->
ok(State)
end;
cancel_found_subscription(ConsumerTag,
#subscription{dest_hdr = DestHdr,
channel = SubChannel,
description = Description},
Frame,
State = #state{channel = MainChannel,
subscriptions = Subs}) ->
case amqp_channel:call(SubChannel,
#'basic.cancel'{
consumer_tag = ConsumerTag}) of
#'basic.cancel_ok'{consumer_tag = ConsumerTag} ->
ok = ensure_subchannel_closed(SubChannel, MainChannel),
NewSubs = dict:erase(ConsumerTag, Subs),
{ok, Dest} =
rabbit_stomp_util:parse_destination(DestHdr),
maybe_delete_durable_sub(Dest, Frame,
State#state{subscriptions = NewSubs});
_ ->
ok(State)
error("Failed to cancel subscription",
"UNSUBSCRIBE to ~p failed.\n",
[Description],
State)
end.
maybe_delete_durable_sub({topic, Name}, Frame,
State = #state{channel = Channel}) ->
case rabbit_stomp_frame:boolean_header(Frame,
?HEADER_PERSISTENT, false) of
true ->
{ok, Id} = rabbit_stomp_frame:header(Frame, ?HEADER_ID),
QName = rabbit_stomp_util:durable_subscription_queue(Name, Id),
amqp_channel:call(Channel, #'queue.delete'{queue = QName,
nowait = false}),
ok(State);
false ->
ok(State)
end;
maybe_delete_durable_sub(_Destination, _Frame, State) ->
ok(State).
ensure_subchannel_closed(SubChannel, MainChannel)
when SubChannel == MainChannel ->
ok;
@ -499,8 +524,8 @@ do_subscribe(Destination, DestHdr, Frame,
connection = Connection,
channel = MainChannel}) ->
Prefetch =
rabbit_stomp_frame:integer_header(Frame, "prefetch-count", undefined),
rabbit_stomp_frame:integer_header(Frame, ?HEADER_PREFETCH_COUNT,
undefined),
Channel = case Prefetch of
undefined ->
MainChannel;
@ -767,7 +792,7 @@ accumulate_receipts1(DeliveryTag, {_Key, Value, PR}, Acc) ->
%%----------------------------------------------------------------------------
transactional(Frame) ->
case rabbit_stomp_frame:header(Frame, "transaction") of
case rabbit_stomp_frame:header(Frame, ?HEADER_TRANSACTION) of
{ok, Transaction} -> {yes, Transaction};
not_found -> no
end.
@ -889,7 +914,7 @@ ensure_queue(_, {queue, Name}, _Frame, Channel, DestQs) ->
end,
{ok, Queue, DestQs1};
ensure_queue(subscribe, {topic, Name}, Frame, Channel, DestQs) ->
%% Create queue for SUBSCRIBE on /topic destinations Queues are
%% Create queue for SUBSCRIBE on /topic destinations. Queues are
%% anonymous, auto_delete and exclusive for transient
%% subscriptions. Durable subscriptions get shared, named, durable
%% queues.
@ -903,9 +928,7 @@ ensure_queue(subscribe, {topic, Name}, Frame, Channel, DestQs) ->
false ->
#'queue.declare'{auto_delete = true, exclusive = true}
end,
#'queue.declare_ok'{queue = Queue} =
amqp_channel:call(Channel, Method),
#'queue.declare_ok'{queue = Queue} = amqp_channel:call(Channel, Method),
{ok, Queue, DestQs};
ensure_queue(send, {topic, _}, _Frame, _Channel, DestQs) ->
%% Don't create queues on SEND for /topic destinations
@ -979,12 +1002,19 @@ send_frame(Frame, State = #state{send_fun = SendFun}) ->
SendFun(async, rabbit_stomp_frame:serialize(Frame)),
State.
send_error(Message, Detail, State) ->
send_error_frame(Message, ExtraHeaders, Format, Args, State) ->
send_error_frame(Message, ExtraHeaders, rabbit_misc:format(Format, Args), State).
send_error_frame(Message, ExtraHeaders, Detail, State) ->
send_frame("ERROR", [{"message", Message},
{"content-type", "text/plain"},
{"version", string:join(?SUPPORTED_VERSIONS, ",")}],
Detail, State).
{"version", string:join(?SUPPORTED_VERSIONS, ",")}]
++ ExtraHeaders,
Detail, State).
send_error(Message, Detail, State) ->
send_error_frame(Message, [], Detail, State).
send_error(Message, Format, Args, State) ->
send_error(Message, rabbit_misc:format(Format, Args), State).

View File

@ -103,7 +103,7 @@ message_headers(SessionId,
end,
case ConsumerTag of
<<?INTERNAL_TAG_PREFIX, Id/binary>> ->
[{"subscription", binary_to_list(Id)} | Basic];
[{?HEADER_SUBSCRIPTION, binary_to_list(Id)} | Basic];
_ ->
Basic
end,