Use stream instead of target
This commit is contained in:
parent
b61a79b9ff
commit
2d19e85925
|
|
@ -13,17 +13,17 @@
|
|||
-define(COMMAND_OPEN, 12).
|
||||
-define(COMMAND_CLOSE, 13).
|
||||
-define(COMMAND_HEARTBEAT, 14).
|
||||
-define(COMMAND_CREATE_TARGET, 998).
|
||||
-define(COMMAND_DELETE_TARGET, 999).
|
||||
-define(COMMAND_CREATE_STREAM, 998).
|
||||
-define(COMMAND_DELETE_STREAM, 999).
|
||||
|
||||
-define(VERSION_0, 0).
|
||||
|
||||
-define(RESPONSE_CODE_OK, 0).
|
||||
-define(RESPONSE_CODE_TARGET_DOES_NOT_EXIST, 1).
|
||||
-define(RESPONSE_CODE_STREAM_DOES_NOT_EXIST, 1).
|
||||
-define(RESPONSE_CODE_SUBSCRIPTION_ID_ALREADY_EXISTS, 2).
|
||||
-define(RESPONSE_CODE_SUBSCRIPTION_ID_DOES_NOT_EXIST, 3).
|
||||
-define(RESPONSE_CODE_TARGET_ALREADY_EXISTS, 4).
|
||||
-define(RESPONSE_CODE_TARGET_DELETED, 5).
|
||||
-define(RESPONSE_CODE_STREAM_ALREADY_EXISTS, 4).
|
||||
-define(RESPONSE_CODE_STREAM_DELETED, 5).
|
||||
-define(RESPONSE_SASL_MECHANISM_NOT_SUPPORTED, 6).
|
||||
-define(RESPONSE_AUTHENTICATION_FAILURE, 7).
|
||||
-define(RESPONSE_SASL_ERROR, 8).
|
||||
|
|
|
|||
|
|
@ -101,8 +101,8 @@ register() ->
|
|||
unregister() ->
|
||||
gen_server:call(?MODULE, {unregister, self()}).
|
||||
|
||||
lookup(Target) ->
|
||||
gen_server:call(?MODULE, {lookup, Target}).
|
||||
lookup(Stream) ->
|
||||
gen_server:call(?MODULE, {lookup, Stream}).
|
||||
|
||||
replicas_for_current_node() ->
|
||||
rabbit_mnesia:cluster_nodes(all) -- [node()].
|
||||
|
|
@ -167,8 +167,8 @@ handle_call({unregister, Pid}, _From, #state{listeners = Listeners, monitors = M
|
|||
maps:remove(Pid, Monitors)
|
||||
end,
|
||||
{reply, ok, State#state{listeners = lists:delete(Pid, Listeners), monitors = Monitors1}};
|
||||
handle_call({lookup, Target}, _From, State) ->
|
||||
Res = case read(Target) of
|
||||
handle_call({lookup, Stream}, _From, State) ->
|
||||
Res = case read(Stream) of
|
||||
[] ->
|
||||
cluster_not_found;
|
||||
[#?MODULE{leader_pid = LeaderPid}] ->
|
||||
|
|
|
|||
|
|
@ -7,7 +7,7 @@
|
|||
name,
|
||||
helper_sup,
|
||||
listen_socket, socket, clusters, data, consumers,
|
||||
target_subscriptions, credits,
|
||||
stream_subscriptions, credits,
|
||||
blocked,
|
||||
authentication_state, user, virtual_host,
|
||||
connection_step, % tcp_connected, authenticating, authenticated, tuning, tuned, opened, failure, closing, closing_done
|
||||
|
|
@ -16,7 +16,7 @@
|
|||
}).
|
||||
|
||||
-record(consumer, {
|
||||
socket, leader, offset, subscription_id, segment, credit, target
|
||||
socket, leader, offset, subscription_id, segment, credit, stream
|
||||
}).
|
||||
|
||||
-record(configuration, {
|
||||
|
|
@ -53,7 +53,7 @@ init([KeepaliveSup, Transport, Ref, #{initial_credits := InitialCredits,
|
|||
helper_sup = KeepaliveSup,
|
||||
socket = RealSocket, data = none,
|
||||
clusters = #{},
|
||||
consumers = #{}, target_subscriptions = #{},
|
||||
consumers = #{}, stream_subscriptions = #{},
|
||||
blocked = false, credits = Credits,
|
||||
authentication_state = none, user = none,
|
||||
connection_step = tcp_connected,
|
||||
|
|
@ -125,7 +125,7 @@ close(Transport, S) ->
|
|||
Transport:close(S).
|
||||
|
||||
listen_loop_post_auth(Transport, #stream_connection{socket = S, consumers = Consumers,
|
||||
target_subscriptions = TargetSubscriptions, credits = Credits, blocked = Blocked, heartbeater = Heartbeater} = State,
|
||||
stream_subscriptions = StreamSubscriptions, credits = Credits, blocked = Blocked, heartbeater = Heartbeater} = State,
|
||||
#configuration{credits_required_for_unblocking = CreditsRequiredForUnblocking} = Configuration) ->
|
||||
{OK, Closed, Error} = Transport:messages(),
|
||||
receive
|
||||
|
|
@ -160,13 +160,13 @@ listen_loop_post_auth(Transport, #stream_connection{socket = S, consumers = Cons
|
|||
listen_loop_post_auth(Transport, State2, Configuration)
|
||||
end;
|
||||
{stream_manager, cluster_deleted, ClusterReference} ->
|
||||
Target = list_to_binary(ClusterReference),
|
||||
State1 = case clean_state_after_target_deletion(Target, State) of
|
||||
Stream = list_to_binary(ClusterReference),
|
||||
State1 = case clean_state_after_stream_deletion(Stream, State) of
|
||||
{cleaned, NewState} ->
|
||||
TargetSize = byte_size(Target),
|
||||
FrameSize = 2 + 2 + 2 + 2 + TargetSize,
|
||||
StreamSize = byte_size(Stream),
|
||||
FrameSize = 2 + 2 + 2 + 2 + StreamSize,
|
||||
Transport:send(S, [<<FrameSize:32, ?COMMAND_METADATA_UPDATE:16, ?VERSION_0:16,
|
||||
?RESPONSE_CODE_TARGET_DELETED:16, TargetSize:16, Target/binary>>]),
|
||||
?RESPONSE_CODE_STREAM_DELETED:16, StreamSize:16, Stream/binary>>]),
|
||||
NewState;
|
||||
{not_cleaned, SameState} ->
|
||||
SameState
|
||||
|
|
@ -194,17 +194,17 @@ listen_loop_post_auth(Transport, #stream_connection{socket = S, consumers = Cons
|
|||
State
|
||||
end,
|
||||
listen_loop_post_auth(Transport, State1, Configuration);
|
||||
{osiris_offset, TargetName, -1} ->
|
||||
error_logger:info_msg("received osiris offset event for ~p with offset ~p~n", [TargetName, -1]),
|
||||
{osiris_offset, StreamName, -1} ->
|
||||
error_logger:info_msg("received osiris offset event for ~p with offset ~p~n", [StreamName, -1]),
|
||||
listen_loop_post_auth(Transport, State, Configuration);
|
||||
{osiris_offset, TargetName, Offset} when Offset > -1 ->
|
||||
State1 = case maps:get(TargetName, TargetSubscriptions, undefined) of
|
||||
{osiris_offset, StreamName, Offset} when Offset > -1 ->
|
||||
State1 = case maps:get(StreamName, StreamSubscriptions, undefined) of
|
||||
undefined ->
|
||||
error_logger:info_msg("osiris offset event for ~p, but no subscription (leftover messages after unsubscribe?)", [TargetName]),
|
||||
error_logger:info_msg("osiris offset event for ~p, but no subscription (leftover messages after unsubscribe?)", [StreamName]),
|
||||
State;
|
||||
[] ->
|
||||
error_logger:info_msg("osiris offset event for ~p, but no registered consumers!", [TargetName]),
|
||||
State#stream_connection{target_subscriptions = maps:remove(TargetName, TargetSubscriptions)};
|
||||
error_logger:info_msg("osiris offset event for ~p, but no registered consumers!", [StreamName]),
|
||||
State#stream_connection{stream_subscriptions = maps:remove(StreamName, StreamSubscriptions)};
|
||||
CorrelationIds when is_list(CorrelationIds) ->
|
||||
Consumers1 = lists:foldl(fun(CorrelationId, ConsumersAcc) ->
|
||||
#{CorrelationId := Consumer} = ConsumersAcc,
|
||||
|
|
@ -311,7 +311,7 @@ generate_publishing_error_details(Acc, <<>>) ->
|
|||
Acc;
|
||||
generate_publishing_error_details(Acc, <<PublishingId:64, MessageSize:32, _Message:MessageSize/binary, Rest/binary>>) ->
|
||||
generate_publishing_error_details(
|
||||
<<Acc/binary, PublishingId:64, ?RESPONSE_CODE_TARGET_DOES_NOT_EXIST:16>>,
|
||||
<<Acc/binary, PublishingId:64, ?RESPONSE_CODE_STREAM_DOES_NOT_EXIST:16>>,
|
||||
Rest).
|
||||
|
||||
handle_frame_pre_auth(Transport, #stream_connection{socket = S} = State,
|
||||
|
|
@ -441,9 +441,9 @@ handle_frame_pre_auth(_Transport, State, Frame, Rest) ->
|
|||
|
||||
handle_frame_post_auth(Transport, #stream_connection{socket = S, credits = Credits} = State,
|
||||
<<?COMMAND_PUBLISH:16, ?VERSION_0:16,
|
||||
TargetSize:16, Target:TargetSize/binary,
|
||||
StreamSize:16, Stream:StreamSize/binary,
|
||||
MessageCount:32, Messages/binary>>, Rest) ->
|
||||
case lookup_cluster(Target, State) of
|
||||
case lookup_cluster(Stream, State) of
|
||||
cluster_not_found ->
|
||||
FrameSize = 2 + 2 + 4 + (8 + 2) * MessageCount,
|
||||
Details = generate_publishing_error_details(<<>>, Messages),
|
||||
|
|
@ -455,16 +455,16 @@ handle_frame_post_auth(Transport, #stream_connection{socket = S, credits = Credi
|
|||
sub_credits(Credits, MessageCount),
|
||||
{State1, Rest}
|
||||
end;
|
||||
handle_frame_post_auth(Transport, #stream_connection{socket = Socket, consumers = Consumers, target_subscriptions = TargetSubscriptions} = State,
|
||||
<<?COMMAND_SUBSCRIBE:16, ?VERSION_0:16, CorrelationId:32, SubscriptionId:32, TargetSize:16, Target:TargetSize/binary, Offset:64/unsigned, Credit:16/signed>>, Rest) ->
|
||||
case lookup_cluster(Target, State) of
|
||||
handle_frame_post_auth(Transport, #stream_connection{socket = Socket, consumers = Consumers, stream_subscriptions = StreamSubscriptions} = State,
|
||||
<<?COMMAND_SUBSCRIBE:16, ?VERSION_0:16, CorrelationId:32, SubscriptionId:32, StreamSize:16, Stream:StreamSize/binary, Offset:64/unsigned, Credit:16/signed>>, Rest) ->
|
||||
case lookup_cluster(Stream, State) of
|
||||
cluster_not_found ->
|
||||
response(Transport, State, ?COMMAND_SUBSCRIBE, CorrelationId, ?RESPONSE_CODE_TARGET_DOES_NOT_EXIST),
|
||||
response(Transport, State, ?COMMAND_SUBSCRIBE, CorrelationId, ?RESPONSE_CODE_STREAM_DOES_NOT_EXIST),
|
||||
{State, Rest};
|
||||
{ClusterLeader, State1} ->
|
||||
% offset message uses a list for the target, so storing this in the state for easier retrieval
|
||||
TargetKey = binary_to_list(Target),
|
||||
case subscription_exists(TargetSubscriptions, SubscriptionId) of
|
||||
% offset message uses a list for the stream, so storing this in the state for easier retrieval
|
||||
StreamKey = binary_to_list(Stream),
|
||||
case subscription_exists(StreamSubscriptions, SubscriptionId) of
|
||||
true ->
|
||||
response(Transport, State1, ?COMMAND_SUBSCRIBE, CorrelationId, ?RESPONSE_CODE_SUBSCRIPTION_ID_ALREADY_EXISTS),
|
||||
{State1, Rest};
|
||||
|
|
@ -475,7 +475,7 @@ handle_frame_post_auth(Transport, #stream_connection{socket = Socket, consumers
|
|||
leader = ClusterLeader, offset = Offset, subscription_id = SubscriptionId, socket = Socket,
|
||||
segment = Segment,
|
||||
credit = Credit,
|
||||
target = TargetKey
|
||||
stream = StreamKey
|
||||
},
|
||||
error_logger:info_msg("registering consumer ~p in ~p~n", [ConsumerState, self()]),
|
||||
|
||||
|
|
@ -487,41 +487,41 @@ handle_frame_post_auth(Transport, #stream_connection{socket = Socket, consumers
|
|||
),
|
||||
Consumers1 = Consumers#{SubscriptionId => ConsumerState#consumer{segment = Segment1, credit = Credit1}},
|
||||
|
||||
TargetSubscriptions1 =
|
||||
case TargetSubscriptions of
|
||||
#{TargetKey := SubscriptionIds} ->
|
||||
TargetSubscriptions#{TargetKey => [SubscriptionId] ++ SubscriptionIds};
|
||||
StreamSubscriptions1 =
|
||||
case StreamSubscriptions of
|
||||
#{StreamKey := SubscriptionIds} ->
|
||||
StreamSubscriptions#{StreamKey => [SubscriptionId] ++ SubscriptionIds};
|
||||
_ ->
|
||||
TargetSubscriptions#{TargetKey => [SubscriptionId]}
|
||||
StreamSubscriptions#{StreamKey => [SubscriptionId]}
|
||||
end,
|
||||
{State1#stream_connection{consumers = Consumers1, target_subscriptions = TargetSubscriptions1}, Rest}
|
||||
{State1#stream_connection{consumers = Consumers1, stream_subscriptions = StreamSubscriptions1}, Rest}
|
||||
end
|
||||
end;
|
||||
handle_frame_post_auth(Transport, #stream_connection{consumers = Consumers, target_subscriptions = TargetSubscriptions, clusters = Clusters} = State,
|
||||
handle_frame_post_auth(Transport, #stream_connection{consumers = Consumers, stream_subscriptions = StreamSubscriptions, clusters = Clusters} = State,
|
||||
<<?COMMAND_UNSUBSCRIBE:16, ?VERSION_0:16, CorrelationId:32, SubscriptionId:32>>, Rest) ->
|
||||
case subscription_exists(TargetSubscriptions, SubscriptionId) of
|
||||
case subscription_exists(StreamSubscriptions, SubscriptionId) of
|
||||
false ->
|
||||
response(Transport, State, ?COMMAND_UNSUBSCRIBE, CorrelationId, ?RESPONSE_CODE_SUBSCRIPTION_ID_DOES_NOT_EXIST),
|
||||
{State, Rest};
|
||||
true ->
|
||||
#{SubscriptionId := Consumer} = Consumers,
|
||||
Target = Consumer#consumer.target,
|
||||
#{Target := SubscriptionsForThisTarget} = TargetSubscriptions,
|
||||
SubscriptionsForThisTarget1 = lists:delete(SubscriptionId, SubscriptionsForThisTarget),
|
||||
{TargetSubscriptions1, Clusters1} =
|
||||
case length(SubscriptionsForThisTarget1) of
|
||||
Stream = Consumer#consumer.stream,
|
||||
#{Stream := SubscriptionsForThisStream} = StreamSubscriptions,
|
||||
SubscriptionsForThisStream1 = lists:delete(SubscriptionId, SubscriptionsForThisStream),
|
||||
{StreamSubscriptions1, Clusters1} =
|
||||
case length(SubscriptionsForThisStream1) of
|
||||
0 ->
|
||||
%% no more subscriptions for this target
|
||||
{maps:remove(Target, TargetSubscriptions),
|
||||
maps:remove(list_to_binary(Target), Clusters)
|
||||
%% no more subscriptions for this stream
|
||||
{maps:remove(Stream, StreamSubscriptions),
|
||||
maps:remove(list_to_binary(Stream), Clusters)
|
||||
};
|
||||
_ ->
|
||||
{TargetSubscriptions#{Target => SubscriptionsForThisTarget1}, Clusters}
|
||||
{StreamSubscriptions#{Stream => SubscriptionsForThisStream1}, Clusters}
|
||||
end,
|
||||
Consumers1 = maps:remove(SubscriptionId, Consumers),
|
||||
response_ok(Transport, State, ?COMMAND_SUBSCRIBE, CorrelationId),
|
||||
{State#stream_connection{consumers = Consumers1,
|
||||
target_subscriptions = TargetSubscriptions1,
|
||||
stream_subscriptions = StreamSubscriptions1,
|
||||
clusters = Clusters1
|
||||
}, Rest}
|
||||
end;
|
||||
|
|
@ -546,38 +546,38 @@ handle_frame_post_auth(Transport, #stream_connection{consumers = Consumers} = St
|
|||
{State, Rest}
|
||||
end;
|
||||
handle_frame_post_auth(Transport, State,
|
||||
<<?COMMAND_CREATE_TARGET:16, ?VERSION_0:16, CorrelationId:32, TargetSize:16, Target:TargetSize/binary>>, Rest) ->
|
||||
case rabbit_stream_manager:create(binary_to_list(Target)) of
|
||||
<<?COMMAND_CREATE_STREAM:16, ?VERSION_0:16, CorrelationId:32, StreamSize:16, Stream:StreamSize/binary>>, Rest) ->
|
||||
case rabbit_stream_manager:create(binary_to_list(Stream)) of
|
||||
{ok, #{leader_pid := LeaderPid, replica_pids := ReturnedReplicas}} ->
|
||||
error_logger:info_msg("Created cluster with leader ~p and replicas ~p~n", [LeaderPid, ReturnedReplicas]),
|
||||
response_ok(Transport, State, ?COMMAND_CREATE_TARGET, CorrelationId),
|
||||
response_ok(Transport, State, ?COMMAND_CREATE_STREAM, CorrelationId),
|
||||
{State, Rest};
|
||||
{error, reference_already_exists} ->
|
||||
response(Transport, State, ?COMMAND_CREATE_TARGET, CorrelationId, ?RESPONSE_CODE_TARGET_ALREADY_EXISTS),
|
||||
response(Transport, State, ?COMMAND_CREATE_STREAM, CorrelationId, ?RESPONSE_CODE_STREAM_ALREADY_EXISTS),
|
||||
{State, Rest}
|
||||
end;
|
||||
handle_frame_post_auth(Transport, #stream_connection{socket = S} = State,
|
||||
<<?COMMAND_DELETE_TARGET:16, ?VERSION_0:16, CorrelationId:32, TargetSize:16, Target:TargetSize/binary>>, Rest) ->
|
||||
case rabbit_stream_manager:delete(binary_to_list(Target)) of
|
||||
<<?COMMAND_DELETE_STREAM:16, ?VERSION_0:16, CorrelationId:32, StreamSize:16, Stream:StreamSize/binary>>, Rest) ->
|
||||
case rabbit_stream_manager:delete(binary_to_list(Stream)) of
|
||||
{ok, deleted} ->
|
||||
response_ok(Transport, State, ?COMMAND_DELETE_TARGET, CorrelationId),
|
||||
State1 = case clean_state_after_target_deletion(Target, State) of
|
||||
response_ok(Transport, State, ?COMMAND_DELETE_STREAM, CorrelationId),
|
||||
State1 = case clean_state_after_stream_deletion(Stream, State) of
|
||||
{cleaned, NewState} ->
|
||||
TargetSize = byte_size(Target),
|
||||
FrameSize = 2 + 2 + 2 + 2 + TargetSize,
|
||||
StreamSize = byte_size(Stream),
|
||||
FrameSize = 2 + 2 + 2 + 2 + StreamSize,
|
||||
Transport:send(S, [<<FrameSize:32, ?COMMAND_METADATA_UPDATE:16, ?VERSION_0:16,
|
||||
?RESPONSE_CODE_TARGET_DELETED:16, TargetSize:16, Target/binary>>]),
|
||||
?RESPONSE_CODE_STREAM_DELETED:16, StreamSize:16, Stream/binary>>]),
|
||||
NewState;
|
||||
{not_cleaned, SameState} ->
|
||||
SameState
|
||||
end,
|
||||
{State1, Rest};
|
||||
{error, reference_not_found} ->
|
||||
response(Transport, State, ?COMMAND_DELETE_TARGET, CorrelationId, ?RESPONSE_CODE_TARGET_DOES_NOT_EXIST),
|
||||
response(Transport, State, ?COMMAND_DELETE_STREAM, CorrelationId, ?RESPONSE_CODE_STREAM_DOES_NOT_EXIST),
|
||||
{State, Rest}
|
||||
end;
|
||||
handle_frame_post_auth(Transport, #stream_connection{socket = S} = State,
|
||||
<<?COMMAND_METADATA:16, ?VERSION_0:16, CorrelationId:32, TargetCount:32, BinaryTargets/binary>>, Rest) ->
|
||||
<<?COMMAND_METADATA:16, ?VERSION_0:16, CorrelationId:32, StreamCount:32, BinaryStreams/binary>>, Rest) ->
|
||||
%% FIXME: rely only on rabbit_networking to discover the listeners
|
||||
Nodes = rabbit_mnesia:cluster_nodes(all),
|
||||
{NodesInfo, _} = lists:foldl(fun(Node, {Acc, Index}) ->
|
||||
|
|
@ -592,13 +592,13 @@ handle_frame_post_auth(Transport, #stream_connection{socket = S} = State,
|
|||
<<Acc/binary, Index:16, HostLength:16, Host:HostLength/binary, Port:32>>
|
||||
end, <<BrokersCount:32>>, NodesInfo),
|
||||
|
||||
Targets = extract_target_list(BinaryTargets, []),
|
||||
Streams = extract_stream_list(BinaryStreams, []),
|
||||
|
||||
MetadataBin = lists:foldl(fun(Target, Acc) ->
|
||||
TargetLength = byte_size(Target),
|
||||
case lookup_cluster(Target, State) of
|
||||
MetadataBin = lists:foldl(fun(Stream, Acc) ->
|
||||
StreamLength = byte_size(Stream),
|
||||
case lookup_cluster(Stream, State) of
|
||||
cluster_not_found ->
|
||||
<<Acc/binary, TargetLength:16, Target:TargetLength/binary, ?RESPONSE_CODE_TARGET_DOES_NOT_EXIST:16,
|
||||
<<Acc/binary, StreamLength:16, Stream:StreamLength/binary, ?RESPONSE_CODE_STREAM_DOES_NOT_EXIST:16,
|
||||
-1:16, 0:32>>;
|
||||
{Cluster, _} ->
|
||||
LeaderNode = node(Cluster),
|
||||
|
|
@ -611,11 +611,11 @@ handle_frame_post_auth(Transport, #stream_connection{socket = S} = State,
|
|||
end, <<>>, maps:values(Replicas)),
|
||||
ReplicasCount = maps:size(Replicas),
|
||||
|
||||
<<Acc/binary, TargetLength:16, Target:TargetLength/binary, ?RESPONSE_CODE_OK:16,
|
||||
<<Acc/binary, StreamLength:16, Stream:StreamLength/binary, ?RESPONSE_CODE_OK:16,
|
||||
LeaderIndex:16, ReplicasCount:32, ReplicasBinary/binary>>
|
||||
end
|
||||
|
||||
end, <<TargetCount:32>>, Targets),
|
||||
end, <<StreamCount:32>>, Streams),
|
||||
Frame = <<?COMMAND_METADATA:16, ?VERSION_0:16, CorrelationId:32, BrokersBin/binary, MetadataBin/binary>>,
|
||||
FrameSize = byte_size(Frame),
|
||||
Transport:send(S, <<FrameSize:32, Frame/binary>>),
|
||||
|
|
@ -670,41 +670,41 @@ auth_mechanism_to_module(TypeBin, Sock) ->
|
|||
end
|
||||
end.
|
||||
|
||||
extract_target_list(<<>>, Targets) ->
|
||||
Targets;
|
||||
extract_target_list(<<Length:16, Target:Length/binary, Rest/binary>>, Targets) ->
|
||||
extract_target_list(Rest, [Target | Targets]).
|
||||
extract_stream_list(<<>>, Streams) ->
|
||||
Streams;
|
||||
extract_stream_list(<<Length:16, Stream:Length/binary, Rest/binary>>, Streams) ->
|
||||
extract_stream_list(Rest, [Stream | Streams]).
|
||||
|
||||
clean_state_after_target_deletion(Target, #stream_connection{clusters = Clusters, target_subscriptions = TargetSubscriptions,
|
||||
clean_state_after_stream_deletion(Stream, #stream_connection{clusters = Clusters, stream_subscriptions = StreamSubscriptions,
|
||||
consumers = Consumers} = State) ->
|
||||
TargetAsList = binary_to_list(Target),
|
||||
case maps:is_key(TargetAsList, TargetSubscriptions) of
|
||||
StreamAsList = binary_to_list(Stream),
|
||||
case maps:is_key(StreamAsList, StreamSubscriptions) of
|
||||
true ->
|
||||
#{TargetAsList := SubscriptionIds} = TargetSubscriptions,
|
||||
#{StreamAsList := SubscriptionIds} = StreamSubscriptions,
|
||||
{cleaned, State#stream_connection{
|
||||
clusters = maps:remove(Target, Clusters),
|
||||
target_subscriptions = maps:remove(TargetAsList, TargetSubscriptions),
|
||||
clusters = maps:remove(Stream, Clusters),
|
||||
stream_subscriptions = maps:remove(StreamAsList, StreamSubscriptions),
|
||||
consumers = maps:without(SubscriptionIds, Consumers)
|
||||
}};
|
||||
false ->
|
||||
{not_cleaned, State}
|
||||
end.
|
||||
|
||||
lookup_cluster(Target, #stream_connection{clusters = Clusters} = State) ->
|
||||
case maps:get(Target, Clusters, undefined) of
|
||||
lookup_cluster(Stream, #stream_connection{clusters = Clusters} = State) ->
|
||||
case maps:get(Stream, Clusters, undefined) of
|
||||
undefined ->
|
||||
case lookup_cluster_from_manager(Target) of
|
||||
case lookup_cluster_from_manager(Stream) of
|
||||
cluster_not_found ->
|
||||
cluster_not_found;
|
||||
ClusterPid ->
|
||||
{ClusterPid, State#stream_connection{clusters = Clusters#{Target => ClusterPid}}}
|
||||
{ClusterPid, State#stream_connection{clusters = Clusters#{Stream => ClusterPid}}}
|
||||
end;
|
||||
ClusterPid ->
|
||||
{ClusterPid, State}
|
||||
end.
|
||||
|
||||
lookup_cluster_from_manager(Target) ->
|
||||
rabbit_stream_manager:lookup(Target).
|
||||
lookup_cluster_from_manager(Stream) ->
|
||||
rabbit_stream_manager:lookup(Stream).
|
||||
|
||||
frame(Transport, #stream_connection{socket = S}, Frame) ->
|
||||
FrameSize = byte_size(Frame),
|
||||
|
|
@ -716,8 +716,8 @@ response_ok(Transport, State, CommandId, CorrelationId) ->
|
|||
response(Transport, #stream_connection{socket = S}, CommandId, CorrelationId, ResponseCode) ->
|
||||
Transport:send(S, [<<?RESPONSE_FRAME_SIZE:32, CommandId:16, ?VERSION_0:16>>, <<CorrelationId:32>>, <<ResponseCode:16>>]).
|
||||
|
||||
subscription_exists(TargetSubscriptions, SubscriptionId) ->
|
||||
SubscriptionIds = lists:flatten(maps:values(TargetSubscriptions)),
|
||||
subscription_exists(StreamSubscriptions, SubscriptionId) ->
|
||||
SubscriptionIds = lists:flatten(maps:values(StreamSubscriptions)),
|
||||
lists:any(fun(Id) -> Id =:= SubscriptionId end, SubscriptionIds).
|
||||
|
||||
send_file_callback(Transport, #consumer{socket = S, subscription_id = SubscriptionId}) ->
|
||||
|
|
|
|||
|
|
@ -75,15 +75,15 @@ test_server(Port) ->
|
|||
{ok, S} = gen_tcp:connect("localhost", Port, [{active, false},
|
||||
{mode, binary}]),
|
||||
test_authenticate(S),
|
||||
Target = <<"target1">>,
|
||||
test_create_target(S, Target),
|
||||
Stream = <<"stream1">>,
|
||||
test_create_stream(S, Stream),
|
||||
Body = <<"hello">>,
|
||||
test_publish_confirm(S, Target, Body),
|
||||
test_publish_confirm(S, Stream, Body),
|
||||
SubscriptionId = 42,
|
||||
test_subscribe(S, SubscriptionId, Target),
|
||||
test_subscribe(S, SubscriptionId, Stream),
|
||||
test_deliver(S, SubscriptionId, Body),
|
||||
test_delete_target(S, Target),
|
||||
test_metadata_update_target_deleted(S, Target),
|
||||
test_delete_stream(S, Stream),
|
||||
test_metadata_update_stream_deleted(S, Stream),
|
||||
test_close(S),
|
||||
closed = wait_for_socket_close(S, 10),
|
||||
ok.
|
||||
|
|
@ -142,32 +142,32 @@ test_authenticate(S) ->
|
|||
{ok, <<10:32, ?COMMAND_OPEN:16, ?VERSION_0:16, 3:32, ?RESPONSE_CODE_OK:16>>} = gen_tcp:recv(S, 0, 5000).
|
||||
|
||||
|
||||
test_create_target(S, Target) ->
|
||||
TargetSize = byte_size(Target),
|
||||
CreateTargetFrame = <<?COMMAND_CREATE_TARGET:16, ?VERSION_0:16, 1:32, TargetSize:16, Target:TargetSize/binary>>,
|
||||
FrameSize = byte_size(CreateTargetFrame),
|
||||
gen_tcp:send(S, <<FrameSize:32, CreateTargetFrame/binary>>),
|
||||
{ok, <<_Size:32, ?COMMAND_CREATE_TARGET:16, ?VERSION_0:16, 1:32, ?RESPONSE_CODE_OK:16>>} = gen_tcp:recv(S, 0, 5000).
|
||||
test_create_stream(S, Stream) ->
|
||||
StreamSize = byte_size(Stream),
|
||||
CreateStreamFrame = <<?COMMAND_CREATE_STREAM:16, ?VERSION_0:16, 1:32, StreamSize:16, Stream:StreamSize/binary>>,
|
||||
FrameSize = byte_size(CreateStreamFrame),
|
||||
gen_tcp:send(S, <<FrameSize:32, CreateStreamFrame/binary>>),
|
||||
{ok, <<_Size:32, ?COMMAND_CREATE_STREAM:16, ?VERSION_0:16, 1:32, ?RESPONSE_CODE_OK:16>>} = gen_tcp:recv(S, 0, 5000).
|
||||
|
||||
test_delete_target(S, Target) ->
|
||||
TargetSize = byte_size(Target),
|
||||
DeleteTargetFrame = <<?COMMAND_DELETE_TARGET:16, ?VERSION_0:16, 1:32, TargetSize:16, Target:TargetSize/binary>>,
|
||||
FrameSize = byte_size(DeleteTargetFrame),
|
||||
gen_tcp:send(S, <<FrameSize:32, DeleteTargetFrame/binary>>),
|
||||
test_delete_stream(S, Stream) ->
|
||||
StreamSize = byte_size(Stream),
|
||||
DeleteStreamFrame = <<?COMMAND_DELETE_STREAM:16, ?VERSION_0:16, 1:32, StreamSize:16, Stream:StreamSize/binary>>,
|
||||
FrameSize = byte_size(DeleteStreamFrame),
|
||||
gen_tcp:send(S, <<FrameSize:32, DeleteStreamFrame/binary>>),
|
||||
ResponseFrameSize = 10,
|
||||
{ok, <<ResponseFrameSize:32, ?COMMAND_DELETE_TARGET:16, ?VERSION_0:16, 1:32, ?RESPONSE_CODE_OK:16>>} = gen_tcp:recv(S, 4 + 10, 5000).
|
||||
{ok, <<ResponseFrameSize:32, ?COMMAND_DELETE_STREAM:16, ?VERSION_0:16, 1:32, ?RESPONSE_CODE_OK:16>>} = gen_tcp:recv(S, 4 + 10, 5000).
|
||||
|
||||
test_publish_confirm(S, Target, Body) ->
|
||||
test_publish_confirm(S, Stream, Body) ->
|
||||
BodySize = byte_size(Body),
|
||||
TargetSize = byte_size(Target),
|
||||
PublishFrame = <<?COMMAND_PUBLISH:16, ?VERSION_0:16, TargetSize:16, Target:TargetSize/binary, 1:32, 1:64, BodySize:32, Body:BodySize/binary>>,
|
||||
StreamSize = byte_size(Stream),
|
||||
PublishFrame = <<?COMMAND_PUBLISH:16, ?VERSION_0:16, StreamSize:16, Stream:StreamSize/binary, 1:32, 1:64, BodySize:32, Body:BodySize/binary>>,
|
||||
FrameSize = byte_size(PublishFrame),
|
||||
gen_tcp:send(S, <<FrameSize:32, PublishFrame/binary>>),
|
||||
{ok, <<_Size:32, ?COMMAND_PUBLISH_CONFIRM:16, ?VERSION_0:16, 1:32, 1:64>>} = gen_tcp:recv(S, 0, 5000).
|
||||
|
||||
test_subscribe(S, SubscriptionId, Target) ->
|
||||
TargetSize = byte_size(Target),
|
||||
SubscribeFrame = <<?COMMAND_SUBSCRIBE:16, ?VERSION_0:16, 1:32, SubscriptionId:32, TargetSize:16, Target:TargetSize/binary, 0:64, 10:16>>,
|
||||
test_subscribe(S, SubscriptionId, Stream) ->
|
||||
StreamSize = byte_size(Stream),
|
||||
SubscribeFrame = <<?COMMAND_SUBSCRIBE:16, ?VERSION_0:16, 1:32, SubscriptionId:32, StreamSize:16, Stream:StreamSize/binary, 0:64, 10:16>>,
|
||||
FrameSize = byte_size(SubscribeFrame),
|
||||
gen_tcp:send(S, <<FrameSize:32, SubscribeFrame/binary>>),
|
||||
Res = gen_tcp:recv(S, 0, 5000),
|
||||
|
|
@ -179,9 +179,9 @@ test_deliver(S, SubscriptionId, Body) ->
|
|||
<<48:32, ?COMMAND_DELIVER:16, ?VERSION_0:16, SubscriptionId:32, 5:4/unsigned, 0:4/unsigned, 1:16, 1:32, _Epoch:64, 0:64, _Crc:32, _DataLength:32,
|
||||
0:1, BodySize:31/unsigned, Body/binary>> = Frame.
|
||||
|
||||
test_metadata_update_target_deleted(S, Target) ->
|
||||
TargetSize = byte_size(Target),
|
||||
{ok, <<15:32, ?COMMAND_METADATA_UPDATE:16, ?VERSION_0:16, ?RESPONSE_CODE_TARGET_DELETED:16, TargetSize:16, Target/binary>>} = gen_tcp:recv(S, 0, 5000).
|
||||
test_metadata_update_stream_deleted(S, Stream) ->
|
||||
StreamSize = byte_size(Stream),
|
||||
{ok, <<15:32, ?COMMAND_METADATA_UPDATE:16, ?VERSION_0:16, ?RESPONSE_CODE_STREAM_DELETED:16, StreamSize:16, Stream/binary>>} = gen_tcp:recv(S, 0, 5000).
|
||||
|
||||
test_close(S) ->
|
||||
CloseReason = <<"OK">>,
|
||||
|
|
|
|||
Loading…
Reference in New Issue