Remove use of ra.hrl
As this is an internal header now. Rename use of maybe/1 type to option to avoid confusion with rabbit_types:maybe/1 which is different.
This commit is contained in:
parent
c9002d8fc3
commit
544d38c706
|
|
@ -22,7 +22,6 @@
|
|||
-compile(inline).
|
||||
-compile({no_auto_import, [apply/3]}).
|
||||
|
||||
-include_lib("ra/include/ra.hrl").
|
||||
-include("rabbit_fifo.hrl").
|
||||
-include_lib("rabbit_common/include/rabbit.hrl").
|
||||
|
||||
|
|
@ -68,8 +67,8 @@
|
|||
]).
|
||||
|
||||
%% command records representing all the protocol actions that are supported
|
||||
-record(enqueue, {pid :: maybe(pid()),
|
||||
seq :: maybe(msg_seqno()),
|
||||
-record(enqueue, {pid :: option(pid()),
|
||||
seq :: option(msg_seqno()),
|
||||
msg :: raw_msg()}).
|
||||
-record(checkout, {consumer_id :: consumer_id(),
|
||||
spec :: checkout_spec(),
|
||||
|
|
@ -1604,7 +1603,7 @@ is_over_limit(#?MODULE{cfg = #cfg{max_length = MaxLength,
|
|||
|
||||
messages_ready(State) > MaxLength orelse (BytesEnq > MaxBytes).
|
||||
|
||||
-spec make_enqueue(maybe(pid()), maybe(msg_seqno()), raw_msg()) -> protocol().
|
||||
-spec make_enqueue(option(pid()), option(msg_seqno()), raw_msg()) -> protocol().
|
||||
make_enqueue(Pid, Seq, Msg) ->
|
||||
#enqueue{pid = Pid, seq = Seq, msg = Msg}.
|
||||
-spec make_checkout(consumer_id(),
|
||||
|
|
|
|||
|
|
@ -1,4 +1,6 @@
|
|||
|
||||
-type option(T) :: undefined | T.
|
||||
|
||||
-type raw_msg() :: term().
|
||||
%% The raw message. It is opaque to rabbit_fifo.
|
||||
|
||||
|
|
@ -28,7 +30,7 @@
|
|||
-type msg_size() :: non_neg_integer().
|
||||
%% the size in bytes of the msg payload
|
||||
|
||||
-type indexed_msg() :: {ra_index(), msg()}.
|
||||
-type indexed_msg() :: {ra:index(), msg()}.
|
||||
|
||||
-type prefix_msg() :: {'$prefix_msg', msg_header()}.
|
||||
|
||||
|
|
@ -93,7 +95,7 @@
|
|||
-record(enqueuer,
|
||||
{next_seqno = 1 :: msg_seqno(),
|
||||
% out of order enqueues - sorted list
|
||||
pending = [] :: [{msg_seqno(), ra_index(), raw_msg()}],
|
||||
pending = [] :: [{msg_seqno(), ra:index(), raw_msg()}],
|
||||
status = up :: up | suspected_down
|
||||
}).
|
||||
|
||||
|
|
@ -101,16 +103,16 @@
|
|||
{name :: atom(),
|
||||
resource :: rabbit_types:r('queue'),
|
||||
release_cursor_interval = ?RELEASE_CURSOR_EVERY :: non_neg_integer(),
|
||||
dead_letter_handler :: maybe(applied_mfa()),
|
||||
become_leader_handler :: maybe(applied_mfa()),
|
||||
max_length :: maybe(non_neg_integer()),
|
||||
max_bytes :: maybe(non_neg_integer()),
|
||||
dead_letter_handler :: option(applied_mfa()),
|
||||
become_leader_handler :: option(applied_mfa()),
|
||||
max_length :: option(non_neg_integer()),
|
||||
max_bytes :: option(non_neg_integer()),
|
||||
%% whether single active consumer is on or not for this queue
|
||||
consumer_strategy = competing :: consumer_strategy(),
|
||||
%% the maximum number of unsuccessful delivery attempts permitted
|
||||
delivery_limit :: maybe(non_neg_integer()),
|
||||
max_in_memory_length :: maybe(non_neg_integer()),
|
||||
max_in_memory_bytes :: maybe(non_neg_integer())
|
||||
delivery_limit :: option(non_neg_integer()),
|
||||
max_in_memory_length :: option(non_neg_integer()),
|
||||
max_in_memory_bytes :: option(non_neg_integer())
|
||||
}).
|
||||
|
||||
-record(rabbit_fifo,
|
||||
|
|
@ -119,7 +121,7 @@
|
|||
messages = #{} :: #{msg_in_id() => indexed_msg()},
|
||||
% defines the lowest message in id available in the messages map
|
||||
% that isn't a return
|
||||
low_msg_num :: maybe(msg_in_id()),
|
||||
low_msg_num :: option(msg_in_id()),
|
||||
% defines the next message in id to be added to the messages map
|
||||
next_msg_num = 1 :: msg_in_id(),
|
||||
% list of returned msg_in_ids - when checking out it picks from
|
||||
|
|
@ -139,7 +141,7 @@
|
|||
% for normal appending operations as it's backed by a map
|
||||
ra_indexes = rabbit_fifo_index:empty() :: rabbit_fifo_index:state(),
|
||||
release_cursors = lqueue:new() :: lqueue:lqueue({release_cursor,
|
||||
ra_index(), #rabbit_fifo{}}),
|
||||
ra:index(), #rabbit_fifo{}}),
|
||||
% consumers need to reflect consumer state at time of snapshot
|
||||
% needs to be part of snapshot
|
||||
consumers = #{} :: #{consumer_id() => #consumer{}},
|
||||
|
|
|
|||
|
|
@ -43,7 +43,6 @@
|
|||
stat/1
|
||||
]).
|
||||
|
||||
-include_lib("ra/include/ra.hrl").
|
||||
-include_lib("rabbit_common/include/rabbit.hrl").
|
||||
|
||||
-define(SOFT_LIMIT, 256).
|
||||
|
|
@ -63,8 +62,8 @@
|
|||
delivery_count = 0 :: non_neg_integer()}).
|
||||
|
||||
-record(state, {cluster_name :: cluster_name(),
|
||||
servers = [] :: [ra_server_id()],
|
||||
leader :: maybe(ra_server_id()),
|
||||
servers = [] :: [ra:server_id()],
|
||||
leader :: undefined | ra:server_id(),
|
||||
next_seq = 0 :: seq(),
|
||||
%% Last applied is initialise to -1 to note that no command has yet been
|
||||
%% applied, but allowing to resend messages if the first ones on the sequence
|
||||
|
|
@ -77,7 +76,7 @@
|
|||
{[seq()], [seq()], [seq()]}},
|
||||
soft_limit = ?SOFT_LIMIT :: non_neg_integer(),
|
||||
pending = #{} :: #{seq() =>
|
||||
{maybe(term()), rabbit_fifo:command()}},
|
||||
{term(), rabbit_fifo:command()}},
|
||||
consumer_deliveries = #{} :: #{rabbit_fifo:consumer_tag() =>
|
||||
#consumer{}},
|
||||
block_handler = fun() -> ok end :: fun(() -> term()),
|
||||
|
|
@ -99,7 +98,7 @@
|
|||
%% @param ClusterName the id of the cluster to interact with
|
||||
%% @param Servers The known servers of the queue. If the current leader is known
|
||||
%% ensure the leader node is at the head of the list.
|
||||
-spec init(cluster_name(), [ra_server_id()]) -> state().
|
||||
-spec init(cluster_name(), [ra:server_id()]) -> state().
|
||||
init(ClusterName, Servers) ->
|
||||
init(ClusterName, Servers, ?SOFT_LIMIT).
|
||||
|
||||
|
|
@ -109,7 +108,7 @@ init(ClusterName, Servers) ->
|
|||
%% @param Servers The known servers of the queue. If the current leader is known
|
||||
%% ensure the leader node is at the head of the list.
|
||||
%% @param MaxPending size defining the max number of pending commands.
|
||||
-spec init(cluster_name(), [ra_server_id()], non_neg_integer()) -> state().
|
||||
-spec init(cluster_name(), [ra:server_id()], non_neg_integer()) -> state().
|
||||
init(ClusterName = #resource{}, Servers, SoftLimit) ->
|
||||
Timeout = application:get_env(kernel, net_ticktime, 60000) + 5000,
|
||||
#state{cluster_name = ClusterName,
|
||||
|
|
@ -117,7 +116,7 @@ init(ClusterName = #resource{}, Servers, SoftLimit) ->
|
|||
soft_limit = SoftLimit,
|
||||
timeout = Timeout}.
|
||||
|
||||
-spec init(cluster_name(), [ra_server_id()], non_neg_integer(), fun(() -> ok),
|
||||
-spec init(cluster_name(), [ra:server_id()], non_neg_integer(), fun(() -> ok),
|
||||
fun(() -> ok)) -> state().
|
||||
init(ClusterName = #resource{}, Servers, SoftLimit, BlockFun, UnblockFun) ->
|
||||
Timeout = application:get_env(kernel, net_ticktime, 60000) + 5000,
|
||||
|
|
@ -401,7 +400,7 @@ cancel_checkout(ConsumerTag, #state{consumer_deliveries = CDels} = State0) ->
|
|||
|
||||
%% @doc Purges all the messages from a rabbit_fifo queue and returns the number
|
||||
%% of messages purged.
|
||||
-spec purge(ra_server_id()) -> {ok, non_neg_integer()} | {error | timeout, term()}.
|
||||
-spec purge(ra:server_id()) -> {ok, non_neg_integer()} | {error | timeout, term()}.
|
||||
purge(Node) ->
|
||||
case ra:process_command(Node, rabbit_fifo:make_purge()) of
|
||||
{ok, {purge, Reply}, _} ->
|
||||
|
|
@ -414,7 +413,7 @@ purge(Node) ->
|
|||
pending_size(#state{pending = Pend}) ->
|
||||
maps:size(Pend).
|
||||
|
||||
-spec stat(ra_server_id()) ->
|
||||
-spec stat(ra:server_id()) ->
|
||||
{ok, non_neg_integer(), non_neg_integer()}
|
||||
| {error | timeout, term()}.
|
||||
stat(Leader) ->
|
||||
|
|
@ -458,7 +457,7 @@ update_machine_state(Node, Conf) ->
|
|||
%% end
|
||||
%% '''
|
||||
%%
|
||||
%% @param From the {@link ra_server_id().} of the sending process.
|
||||
%% @param From the {@link ra:server_id().} of the sending process.
|
||||
%% @param Event the body of the `ra_event'.
|
||||
%% @param State the current {@module} state.
|
||||
%%
|
||||
|
|
@ -479,7 +478,7 @@ update_machine_state(Node, Conf) ->
|
|||
%% <li>`MsgId' is a consumer scoped monotonically incrementing id that can be
|
||||
%% used to {@link settle/3.} (roughly: AMQP 0.9.1 ack) message once finished
|
||||
%% with them.</li>
|
||||
-spec handle_ra_event(ra_server_id(), ra_server_proc:ra_event_body(), state()) ->
|
||||
-spec handle_ra_event(ra:server_id(), ra_server_proc:ra_event_body(), state()) ->
|
||||
{internal, Correlators :: [term()], actions(), state()} |
|
||||
{rabbit_fifo:client_msg(), state()} | eol.
|
||||
handle_ra_event(From, {applied, Seqs},
|
||||
|
|
@ -567,7 +566,7 @@ handle_ra_event(_Leader, {machine, eol}, _State0) ->
|
|||
%% @param Msg the message to enqueue.
|
||||
%%
|
||||
%% @returns `ok'
|
||||
-spec untracked_enqueue([ra_server_id()], term()) ->
|
||||
-spec untracked_enqueue([ra:server_id()], term()) ->
|
||||
ok.
|
||||
untracked_enqueue([Node | _], Msg) ->
|
||||
Cmd = rabbit_fifo:make_enqueue(undefined, undefined, Msg),
|
||||
|
|
|
|||
|
|
@ -10,7 +10,6 @@
|
|||
map/2
|
||||
]).
|
||||
|
||||
-include_lib("ra/include/ra.hrl").
|
||||
-compile({no_auto_import, [size/1]}).
|
||||
|
||||
%% the empty atom is a lot smaller (4 bytes) than e.g. `undefined` (13 bytes).
|
||||
|
|
|
|||
|
|
@ -10,7 +10,6 @@
|
|||
|
||||
-include_lib("common_test/include/ct.hrl").
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
-include_lib("ra/include/ra.hrl").
|
||||
-include_lib("rabbit_common/include/rabbit.hrl").
|
||||
-include("src/rabbit_fifo.hrl").
|
||||
|
||||
|
|
|
|||
|
|
@ -8,7 +8,6 @@
|
|||
-include_lib("proper/include/proper.hrl").
|
||||
-include_lib("common_test/include/ct.hrl").
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
-include_lib("ra/include/ra.hrl").
|
||||
-include("src/rabbit_fifo.hrl").
|
||||
|
||||
%%%===================================================================
|
||||
|
|
|
|||
Loading…
Reference in New Issue