190 lines
		
	
	
		
			7.8 KiB
		
	
	
	
		
			Erlang
		
	
	
	
			
		
		
	
	
			190 lines
		
	
	
		
			7.8 KiB
		
	
	
	
		
			Erlang
		
	
	
	
%% The contents of this file are subject to the Mozilla Public License
 | 
						|
%% at https://www.mozilla.org/en-US/MPL/2.0/
 | 
						|
%%
 | 
						|
%% Software distributed under the License is distributed on an "AS IS"
 | 
						|
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
 | 
						|
%% the License for the specific language governing rights and
 | 
						|
%% limitations under the License.
 | 
						|
%%
 | 
						|
%% The Original Code is RabbitMQ.
 | 
						|
%%
 | 
						|
%% The Initial Developer of the Original Code is Pivotal Software, Inc.
 | 
						|
%% Copyright (c) 2024-2025 Broadcom. All Rights Reserved.
 | 
						|
%% The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
 | 
						|
%%
 | 
						|
 | 
						|
-module(rabbit_stream_reader_SUITE).
 | 
						|
 | 
						|
-compile(export_all).
 | 
						|
 | 
						|
-include_lib("eunit/include/eunit.hrl").
 | 
						|
-include_lib("rabbitmq_stream/src/rabbit_stream_reader.hrl").
 | 
						|
-include_lib("rabbitmq_stream_common/include/rabbit_stream.hrl").
 | 
						|
 | 
						|
-import(rabbit_stream_reader, [ensure_token_expiry_timer/2]).
 | 
						|
 | 
						|
%%%===================================================================
 | 
						|
%%% Common Test callbacks
 | 
						|
%%%===================================================================
 | 
						|
 | 
						|
all() ->
 | 
						|
    [{group, tests}].
 | 
						|
 | 
						|
%% replicate eunit like test resolution
 | 
						|
all_tests() ->
 | 
						|
    [F
 | 
						|
     || {F, _} <- ?MODULE:module_info(functions),
 | 
						|
        re:run(atom_to_list(F), "_test$") /= nomatch].
 | 
						|
 | 
						|
groups() ->
 | 
						|
    [{tests, [], all_tests()}].
 | 
						|
 | 
						|
init_per_suite(Config) ->
 | 
						|
    Config.
 | 
						|
 | 
						|
end_per_suite(_Config) ->
 | 
						|
    ok.
 | 
						|
 | 
						|
init_per_group(_Group, Config) ->
 | 
						|
    Config.
 | 
						|
 | 
						|
end_per_group(_Group, _Config) ->
 | 
						|
    ok.
 | 
						|
 | 
						|
init_per_testcase(_TestCase, Config) ->
 | 
						|
    Config.
 | 
						|
 | 
						|
end_per_testcase(_TestCase, _Config) ->
 | 
						|
    meck:unload(),
 | 
						|
    ok.
 | 
						|
 | 
						|
ensure_token_expiry_timer_test(_) ->
 | 
						|
    ok = meck:new(rabbit_access_control),
 | 
						|
 | 
						|
    meck:expect(rabbit_access_control, permission_cache_can_expire, fun (_) -> false end),
 | 
						|
    {_, #stream_connection{token_expiry_timer = TR1}} = ensure_token_expiry_timer(#user{}, #stream_connection{}),
 | 
						|
    ?assertEqual(undefined, TR1),
 | 
						|
 | 
						|
    meck:expect(rabbit_access_control, permission_cache_can_expire, fun (_) -> true end),
 | 
						|
    meck:expect(rabbit_access_control, expiry_timestamp, fun (_) -> never end),
 | 
						|
    {_, #stream_connection{token_expiry_timer = TR2}} = ensure_token_expiry_timer(#user{}, #stream_connection{}),
 | 
						|
    ?assertEqual(undefined, TR2),
 | 
						|
 | 
						|
    Now = os:system_time(second),
 | 
						|
    meck:expect(rabbit_access_control, expiry_timestamp, fun (_) -> Now + 60 end),
 | 
						|
    {_, #stream_connection{token_expiry_timer = TR3}} = ensure_token_expiry_timer(#user{}, #stream_connection{}),
 | 
						|
    Cancel3 = erlang:cancel_timer(TR3, [{async, false}, {info, true}]),
 | 
						|
    ?assert(is_integer(Cancel3)),
 | 
						|
 | 
						|
    meck:expect(rabbit_access_control, expiry_timestamp, fun (_) -> Now - 60 end),
 | 
						|
    {_, #stream_connection{token_expiry_timer = TR4}} = ensure_token_expiry_timer(#user{}, #stream_connection{}),
 | 
						|
    ?assertEqual(undefined, TR4),
 | 
						|
 | 
						|
    DummyTRef = erlang:send_after(1_000 * 1_000, self(), dummy),
 | 
						|
    meck:expect(rabbit_access_control, permission_cache_can_expire, fun (_) -> false end),
 | 
						|
    {Cancel5, #stream_connection{token_expiry_timer = TR5}} = ensure_token_expiry_timer(#user{},
 | 
						|
                                                                                        #stream_connection{token_expiry_timer = DummyTRef}),
 | 
						|
    ?assertEqual(undefined, TR5),
 | 
						|
    ?assert(is_integer(Cancel5)),
 | 
						|
 | 
						|
    ok.
 | 
						|
 | 
						|
evaluate_state_after_secret_update_test(_) ->
 | 
						|
    Mod = rabbit_stream_reader,
 | 
						|
    meck:new(Mod, [passthrough]),
 | 
						|
 | 
						|
    ModUtils = rabbit_stream_utils,
 | 
						|
    meck:new(ModUtils, [passthrough]),
 | 
						|
    CheckFun = fun(N) ->
 | 
						|
                       case binary:match(N, <<"ok_">>) of
 | 
						|
                           nomatch ->
 | 
						|
                               error;
 | 
						|
                           _ ->
 | 
						|
                               ok
 | 
						|
                       end
 | 
						|
               end,
 | 
						|
    meck:expect(ModUtils, check_write_permitted, fun(#resource{name = N}, _) -> CheckFun(N) end),
 | 
						|
    meck:expect(ModUtils, check_read_permitted, fun(#resource{name = N}, _, _) -> CheckFun(N) end),
 | 
						|
 | 
						|
    ModAccess = rabbit_access_control,
 | 
						|
    meck:new(ModAccess),
 | 
						|
    meck:expect(ModAccess, permission_cache_can_expire, 1, false),
 | 
						|
 | 
						|
    meck:new(rabbit_stream_metrics, [stub_all]),
 | 
						|
    meck:new(rabbit_global_counters, [stub_all]),
 | 
						|
 | 
						|
    ModTransport = dummy_transport,
 | 
						|
    meck:new(ModTransport, [non_strict]),
 | 
						|
    meck:expect(ModTransport, send, 2, ok),
 | 
						|
 | 
						|
    ModLog = osiris_log,
 | 
						|
    meck:new(ModLog),
 | 
						|
    meck:expect(ModLog, init, 1, ok),
 | 
						|
    put(close_log_count, 0),
 | 
						|
    meck:expect(ModLog, close, fun(_) -> put(close_log_count, get(close_log_count) + 1) end),
 | 
						|
 | 
						|
    ModCore = rabbit_stream_core,
 | 
						|
    meck:new(ModCore),
 | 
						|
    put(metadata_update, []),
 | 
						|
    meck:expect(ModCore, frame, fun(Cmd) -> put(metadata_update, [Cmd | get(metadata_update)]) end),
 | 
						|
 | 
						|
    Publishers = #{0 => #publisher{stream = <<"ok_publish">>},
 | 
						|
                   1 => #publisher{stream = <<"ko_publish">>},
 | 
						|
                   2 => #publisher{stream = <<"ok_publish_consume">>},
 | 
						|
                   3 => #publisher{stream = <<"ko_publish_consume">>}},
 | 
						|
    Subscriptions = #{<<"ok_consume">> => [0],
 | 
						|
                      <<"ko_consume">> => [1],
 | 
						|
                      <<"ok_publish_consume">> => [2],
 | 
						|
                      <<"ko_publish_consume">> => [3]},
 | 
						|
    Consumers = #{0 => consumer(<<"ok_consume">>),
 | 
						|
                  1 => consumer(<<"ko_consume">>),
 | 
						|
                  2 => consumer(<<"ok_publish_consume">>),
 | 
						|
                  3 => consumer(<<"ko_publish_consume">>)},
 | 
						|
 | 
						|
    {C1, S1} = Mod:evaluate_state_after_secret_update(ModTransport, #user{},
 | 
						|
                                                      #stream_connection{publishers = Publishers,
 | 
						|
                                                                         stream_subscriptions = Subscriptions,
 | 
						|
                                                                         user = #user{}},
 | 
						|
                                                      #stream_connection_state{consumers = Consumers}),
 | 
						|
 | 
						|
    meck:validate(ModLog),
 | 
						|
    ?assertEqual(2, get(close_log_count)),
 | 
						|
    erase(close_log_count),
 | 
						|
 | 
						|
    Cmds = get(metadata_update),
 | 
						|
    ?assertEqual(3, length(Cmds)),
 | 
						|
    ?assert(lists:member({metadata_update, <<"ko_publish">>, ?RESPONSE_CODE_STREAM_NOT_AVAILABLE}, Cmds)),
 | 
						|
    ?assert(lists:member({metadata_update, <<"ko_consume">>, ?RESPONSE_CODE_STREAM_NOT_AVAILABLE}, Cmds)),
 | 
						|
    ?assert(lists:member({metadata_update, <<"ko_publish_consume">>, ?RESPONSE_CODE_STREAM_NOT_AVAILABLE}, Cmds)),
 | 
						|
    erase(metadata_update),
 | 
						|
 | 
						|
    #stream_connection{token_expiry_timer = TRef1,
 | 
						|
                       publishers = Pubs1,
 | 
						|
                       stream_subscriptions = Subs1} = C1,
 | 
						|
    ?assertEqual(undefined, TRef1), %% no expiry set in the mock
 | 
						|
    ?assertEqual(2, maps:size(Pubs1)),
 | 
						|
    ?assertEqual(#publisher{stream = <<"ok_publish">>}, maps:get(0, Pubs1)),
 | 
						|
    ?assertEqual(#publisher{stream = <<"ok_publish_consume">>}, maps:get(2, Pubs1)),
 | 
						|
 | 
						|
    #stream_connection_state{consumers = Cons1} = S1,
 | 
						|
    ?assertEqual([0], maps:get(<<"ok_consume">>, Subs1)),
 | 
						|
    ?assertEqual([2], maps:get(<<"ok_publish_consume">>, Subs1)),
 | 
						|
    ?assertEqual(consumer(<<"ok_consume">>), maps:get(0, Cons1)),
 | 
						|
    ?assertEqual(consumer(<<"ok_publish_consume">>), maps:get(2, Cons1)),
 | 
						|
 | 
						|
    %% making sure the token expiry timer is set if the token expires
 | 
						|
    meck:expect(ModAccess, permission_cache_can_expire, 1, true),
 | 
						|
    Now = os:system_time(second),
 | 
						|
    meck:expect(rabbit_access_control, expiry_timestamp, fun (_) -> Now + 60 end),
 | 
						|
    {C2, _} = Mod:evaluate_state_after_secret_update(ModTransport, #user{},
 | 
						|
                                                     #stream_connection{user = #user{}},
 | 
						|
                                                     #stream_connection_state{}),
 | 
						|
    #stream_connection{token_expiry_timer = TRef2} = C2,
 | 
						|
    Cancel2 = erlang:cancel_timer(TRef2, [{async, false}, {info, true}]),
 | 
						|
    ?assert(is_integer(Cancel2)),
 | 
						|
    ok.
 | 
						|
 | 
						|
consumer(S) ->
 | 
						|
    #consumer{configuration = #consumer_configuration{stream = S},
 | 
						|
              log = osiris_log:init(#{})}.
 |