parent
							
								
									70538c587f
								
							
						
					
					
						commit
						f20f415576
					
				| 
						 | 
					@ -52,9 +52,12 @@ end_per_group(_Group, _Config) ->
 | 
				
			||||||
    ok.
 | 
					    ok.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
init_per_testcase(_TestCase, Config) ->
 | 
					init_per_testcase(_TestCase, Config) ->
 | 
				
			||||||
 | 
					    ok = meck:new(rabbit_feature_flags),
 | 
				
			||||||
 | 
					    meck:expect(rabbit_feature_flags, is_enabled, fun (_) -> true end),
 | 
				
			||||||
    Config.
 | 
					    Config.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
end_per_testcase(_TestCase, _Config) ->
 | 
					end_per_testcase(_TestCase, _Config) ->
 | 
				
			||||||
 | 
					    meck:unload(),
 | 
				
			||||||
    ok.
 | 
					    ok.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
simple_sac_test(_) ->
 | 
					simple_sac_test(_) ->
 | 
				
			||||||
| 
						 | 
					@ -71,7 +74,7 @@ simple_sac_test(_) ->
 | 
				
			||||||
        rabbit_stream_sac_coordinator:apply(Command0, State0),
 | 
					        rabbit_stream_sac_coordinator:apply(Command0, State0),
 | 
				
			||||||
    ?assert(Active1),
 | 
					    ?assert(Active1),
 | 
				
			||||||
    ?assertEqual([consumer(ConnectionPid, 0, true)], Consumers1),
 | 
					    ?assertEqual([consumer(ConnectionPid, 0, true)], Consumers1),
 | 
				
			||||||
    assertSendMessageEffect(ConnectionPid, 0, true, Effects1),
 | 
					    assertSendMessageEffect(ConnectionPid, 0, Stream, ConsumerName, true, Effects1),
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    Command1 =
 | 
					    Command1 =
 | 
				
			||||||
        register_consumer_command(Stream, -1, ConsumerName, ConnectionPid, 1),
 | 
					        register_consumer_command(Stream, -1, ConsumerName, ConnectionPid, 1),
 | 
				
			||||||
| 
						 | 
					@ -107,7 +110,7 @@ simple_sac_test(_) ->
 | 
				
			||||||
    ?assertEqual([consumer(ConnectionPid, 1, true),
 | 
					    ?assertEqual([consumer(ConnectionPid, 1, true),
 | 
				
			||||||
                  consumer(ConnectionPid, 2, false)],
 | 
					                  consumer(ConnectionPid, 2, false)],
 | 
				
			||||||
                 Consumers4),
 | 
					                 Consumers4),
 | 
				
			||||||
    assertSendMessageEffect(ConnectionPid, 1, true, Effects4),
 | 
					    assertSendMessageEffect(ConnectionPid, 1, Stream, ConsumerName, true, Effects4),
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    Command4 =
 | 
					    Command4 =
 | 
				
			||||||
        unregister_consumer_command(Stream, ConsumerName, ConnectionPid, 1),
 | 
					        unregister_consumer_command(Stream, ConsumerName, ConnectionPid, 1),
 | 
				
			||||||
| 
						 | 
					@ -116,7 +119,7 @@ simple_sac_test(_) ->
 | 
				
			||||||
     ok, Effects5} =
 | 
					     ok, Effects5} =
 | 
				
			||||||
        rabbit_stream_sac_coordinator:apply(Command4, State4),
 | 
					        rabbit_stream_sac_coordinator:apply(Command4, State4),
 | 
				
			||||||
    ?assertEqual([consumer(ConnectionPid, 2, true)], Consumers5),
 | 
					    ?assertEqual([consumer(ConnectionPid, 2, true)], Consumers5),
 | 
				
			||||||
    assertSendMessageEffect(ConnectionPid, 2, true, Effects5),
 | 
					    assertSendMessageEffect(ConnectionPid, 2, Stream, ConsumerName, true, Effects5),
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    Command5 =
 | 
					    Command5 =
 | 
				
			||||||
        unregister_consumer_command(Stream, ConsumerName, ConnectionPid, 2),
 | 
					        unregister_consumer_command(Stream, ConsumerName, ConnectionPid, 2),
 | 
				
			||||||
| 
						 | 
					@ -141,7 +144,7 @@ super_stream_partition_sac_test(_) ->
 | 
				
			||||||
        rabbit_stream_sac_coordinator:apply(Command0, State0),
 | 
					        rabbit_stream_sac_coordinator:apply(Command0, State0),
 | 
				
			||||||
    ?assert(Active1),
 | 
					    ?assert(Active1),
 | 
				
			||||||
    ?assertEqual([consumer(ConnectionPid, 0, true)], Consumers1),
 | 
					    ?assertEqual([consumer(ConnectionPid, 0, true)], Consumers1),
 | 
				
			||||||
    assertSendMessageEffect(ConnectionPid, 0, true, Effects1),
 | 
					    assertSendMessageEffect(ConnectionPid, 0, Stream, ConsumerName, true, Effects1),
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    Command1 =
 | 
					    Command1 =
 | 
				
			||||||
        register_consumer_command(Stream, 1, ConsumerName, ConnectionPid, 1),
 | 
					        register_consumer_command(Stream, 1, ConsumerName, ConnectionPid, 1),
 | 
				
			||||||
| 
						 | 
					@ -155,7 +158,7 @@ super_stream_partition_sac_test(_) ->
 | 
				
			||||||
    ?assertEqual([consumer(ConnectionPid, 0, false),
 | 
					    ?assertEqual([consumer(ConnectionPid, 0, false),
 | 
				
			||||||
                  consumer(ConnectionPid, 1, false)],
 | 
					                  consumer(ConnectionPid, 1, false)],
 | 
				
			||||||
                 Consumers2),
 | 
					                 Consumers2),
 | 
				
			||||||
    assertSendMessageSteppingDownEffect(ConnectionPid, 0, Effects2),
 | 
					    assertSendMessageSteppingDownEffect(ConnectionPid, 0, Stream, ConsumerName, Effects2),
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    Command2 = activate_consumer_command(Stream, ConsumerName),
 | 
					    Command2 = activate_consumer_command(Stream, ConsumerName),
 | 
				
			||||||
    {#?STATE{groups = #{GroupId := #group{consumers = Consumers3}}} =
 | 
					    {#?STATE{groups = #{GroupId := #group{consumers = Consumers3}}} =
 | 
				
			||||||
| 
						 | 
					@ -167,7 +170,7 @@ super_stream_partition_sac_test(_) ->
 | 
				
			||||||
    ?assertEqual([consumer(ConnectionPid, 0, false),
 | 
					    ?assertEqual([consumer(ConnectionPid, 0, false),
 | 
				
			||||||
                  consumer(ConnectionPid, 1, true)],
 | 
					                  consumer(ConnectionPid, 1, true)],
 | 
				
			||||||
                 Consumers3),
 | 
					                 Consumers3),
 | 
				
			||||||
    assertSendMessageEffect(ConnectionPid, 1, true, Effects3),
 | 
					    assertSendMessageEffect(ConnectionPid, 1, Stream, ConsumerName, true, Effects3),
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    Command3 =
 | 
					    Command3 =
 | 
				
			||||||
        register_consumer_command(Stream, 1, ConsumerName, ConnectionPid, 2),
 | 
					        register_consumer_command(Stream, 1, ConsumerName, ConnectionPid, 2),
 | 
				
			||||||
| 
						 | 
					@ -197,7 +200,7 @@ super_stream_partition_sac_test(_) ->
 | 
				
			||||||
                  consumer(ConnectionPid, 2, false)],
 | 
					                  consumer(ConnectionPid, 2, false)],
 | 
				
			||||||
                 Consumers5),
 | 
					                 Consumers5),
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    assertSendMessageSteppingDownEffect(ConnectionPid, 1, Effects5),
 | 
					    assertSendMessageSteppingDownEffect(ConnectionPid, 1, Stream, ConsumerName, Effects5),
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    Command5 = activate_consumer_command(Stream, ConsumerName),
 | 
					    Command5 = activate_consumer_command(Stream, ConsumerName),
 | 
				
			||||||
    {#?STATE{groups = #{GroupId := #group{consumers = Consumers6}}} =
 | 
					    {#?STATE{groups = #{GroupId := #group{consumers = Consumers6}}} =
 | 
				
			||||||
| 
						 | 
					@ -208,7 +211,7 @@ super_stream_partition_sac_test(_) ->
 | 
				
			||||||
    ?assertEqual([consumer(ConnectionPid, 1, false),
 | 
					    ?assertEqual([consumer(ConnectionPid, 1, false),
 | 
				
			||||||
                  consumer(ConnectionPid, 2, true)],
 | 
					                  consumer(ConnectionPid, 2, true)],
 | 
				
			||||||
                 Consumers6),
 | 
					                 Consumers6),
 | 
				
			||||||
    assertSendMessageEffect(ConnectionPid, 2, true, Effects6),
 | 
					    assertSendMessageEffect(ConnectionPid, 2, Stream, ConsumerName, true, Effects6),
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    Command6 =
 | 
					    Command6 =
 | 
				
			||||||
        unregister_consumer_command(Stream, ConsumerName, ConnectionPid, 1),
 | 
					        unregister_consumer_command(Stream, ConsumerName, ConnectionPid, 1),
 | 
				
			||||||
| 
						 | 
					@ -310,7 +313,9 @@ ensure_monitors_test(_) ->
 | 
				
			||||||
    ok.
 | 
					    ok.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
handle_connection_down_test(_) ->
 | 
					handle_connection_down_test(_) ->
 | 
				
			||||||
    GroupId = {<<"/">>, <<"stream">>, <<"app">>},
 | 
					    Stream = <<"stream">>,
 | 
				
			||||||
 | 
					    ConsumerName = <<"app">>,
 | 
				
			||||||
 | 
					    GroupId = {<<"/">>, Stream, ConsumerName},
 | 
				
			||||||
    Pid0 = self(),
 | 
					    Pid0 = self(),
 | 
				
			||||||
    Pid1 = spawn(fun() -> ok end),
 | 
					    Pid1 = spawn(fun() -> ok end),
 | 
				
			||||||
    Group =
 | 
					    Group =
 | 
				
			||||||
| 
						 | 
					@ -326,7 +331,7 @@ handle_connection_down_test(_) ->
 | 
				
			||||||
        rabbit_stream_sac_coordinator:handle_connection_down(Pid0, State0),
 | 
					        rabbit_stream_sac_coordinator:handle_connection_down(Pid0, State0),
 | 
				
			||||||
    assertSize(1, PidsGroups1),
 | 
					    assertSize(1, PidsGroups1),
 | 
				
			||||||
    assertSize(1, maps:get(Pid1, PidsGroups1)),
 | 
					    assertSize(1, maps:get(Pid1, PidsGroups1)),
 | 
				
			||||||
    assertSendMessageEffect(Pid1, 1, true, Effects1),
 | 
					    assertSendMessageEffect(Pid1, 1, Stream, ConsumerName, true, Effects1),
 | 
				
			||||||
    ?assertEqual(#{GroupId => cgroup([consumer(Pid1, 1, true)])},
 | 
					    ?assertEqual(#{GroupId => cgroup([consumer(Pid1, 1, true)])},
 | 
				
			||||||
                 Groups1),
 | 
					                 Groups1),
 | 
				
			||||||
    {#?STATE{pids_groups = PidsGroups2, groups = Groups2} = _State2,
 | 
					    {#?STATE{pids_groups = PidsGroups2, groups = Groups2} = _State2,
 | 
				
			||||||
| 
						 | 
					@ -397,22 +402,28 @@ activate_consumer_command(Stream, ConsumerName) ->
 | 
				
			||||||
                               stream = Stream,
 | 
					                               stream = Stream,
 | 
				
			||||||
                               consumer_name = ConsumerName}.
 | 
					                               consumer_name = ConsumerName}.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
assertSendMessageEffect(Pid, SubId, Active, [Effect]) ->
 | 
					assertSendMessageEffect(Pid, SubId, Stream, ConsumerName, Active, [Effect]) ->
 | 
				
			||||||
    ?assertEqual({mod_call,
 | 
					    ?assertEqual({mod_call,
 | 
				
			||||||
                  rabbit_stream_sac_coordinator,
 | 
					                  rabbit_stream_sac_coordinator,
 | 
				
			||||||
                  send_message,
 | 
					                  send_message,
 | 
				
			||||||
                  [Pid,
 | 
					                  [Pid,
 | 
				
			||||||
                   {sac,
 | 
					                   {sac,
 | 
				
			||||||
                    {{subscription_id, SubId}, {active, Active},
 | 
					                    #{subscription_id => SubId,
 | 
				
			||||||
                     {extra, []}}}]},
 | 
					                      stream => Stream,
 | 
				
			||||||
 | 
					                      consumer_name => ConsumerName,
 | 
				
			||||||
 | 
					                      active => Active}
 | 
				
			||||||
 | 
					                    }]},
 | 
				
			||||||
                 Effect).
 | 
					                 Effect).
 | 
				
			||||||
 | 
					
 | 
				
			||||||
assertSendMessageSteppingDownEffect(Pid, SubId, [Effect]) ->
 | 
					assertSendMessageSteppingDownEffect(Pid, SubId, Stream, ConsumerName, [Effect]) ->
 | 
				
			||||||
    ?assertEqual({mod_call,
 | 
					    ?assertEqual({mod_call,
 | 
				
			||||||
                  rabbit_stream_sac_coordinator,
 | 
					                  rabbit_stream_sac_coordinator,
 | 
				
			||||||
                  send_message,
 | 
					                  send_message,
 | 
				
			||||||
                  [Pid,
 | 
					                  [Pid,
 | 
				
			||||||
                   {sac,
 | 
					                   {sac,
 | 
				
			||||||
                    {{subscription_id, SubId}, {active, false},
 | 
					                    #{subscription_id => SubId,
 | 
				
			||||||
                     {extra, [{stepping_down, true}]}}}]},
 | 
					                      stream => Stream,
 | 
				
			||||||
 | 
					                      consumer_name => ConsumerName,
 | 
				
			||||||
 | 
					                      active => false,
 | 
				
			||||||
 | 
					                      stepping_down => true}}]},
 | 
				
			||||||
                 Effect).
 | 
					                 Effect).
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
		Reference in New Issue