rabbitmq-server/deps/rabbit/src/gm.erl


%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%
-module(gm).
%% Guaranteed Multicast
%% ====================
%%
%% This module provides the ability to create named groups of
%% processes to which members can be dynamically added and removed,
%% and for messages to be broadcast within the group that are
%% guaranteed to reach all members of the group during the lifetime of
%% the message. The lifetime of a message is defined as being, at a
%% minimum, the time from which the message is first sent to any
%% member of the group, up until the time at which it is known by the
%% member who published the message that the message has reached all
%% group members.
%%
%% The guarantee given is that provided a message, once sent, makes it
%% to members who do not all leave the group, the message will
%% continue to propagate to all group members.
%%
%% Another way of stating the guarantee is that if member P publishes
%% messages m and m', then for all members P', if P' is a member of
%% the group prior to the publication of m, and P' receives m', then
%% P' will receive m.
%%
%% Note that only local-ordering is enforced: i.e. if member P sends
%% message m and then message m', then for-all members P', if P'
%% receives m and m', then they will receive m' after m. Causality
%% ordering is _not_ enforced. I.e. if member P receives message m
%% and as a result publishes message m', there is no guarantee that
%% other members P' will receive m before m'.
%%
%%
%% API Use
%% -------
%%
%% Mnesia must be started. Use the idempotent create_tables/0 function
%% to create the tables required.
%%
%% start_link/4
%% Provide the group name, the callback module name, any arguments
%% you wish to be passed into the callback module's functions, and a
%% transaction executor fun (of the txn_fun() type defined below). The
%% joined/2 function will be called when we have joined the group,
%% with the arguments passed to start_link and a list of the current
%% members of the group. See the callbacks specs and the comments
%% below for further details of the callback functions.
%%
%% leave/1
%% Provide the Pid. Removes the Pid from the group. The callback
%% handle_terminate/2 function will be called.
%%
%% broadcast/2
%% Provide the Pid and a Message. The message will be sent to all
%% members of the group as per the guarantees given above. This is a
%% cast and the function call will return immediately. There is no
%% guarantee that the message will reach any member of the group.
%%
%% confirmed_broadcast/2
%% Provide the Pid and a Message. As per broadcast/2 except that this
%% is a call, not a cast, and only returns 'ok' once the Message has
%% reached every member of the group. Do not call
%% confirmed_broadcast/2 directly from the callback module otherwise
%% you will deadlock the entire group.
%%
%% info/1
%% Provide the Pid. Returns a proplist with various facts, including
%% the group name and the current group members.
%%
%% validate_members/2
%% Check whether a given member list agrees with the chosen member's
%% view. Any differences will be communicated via the members_changed
%% callback. If there are no differences then there will be no reply.
%% Note that members will not necessarily share the same view.
%%
%% forget_group/1
%% Provide the group name. Removes its mnesia record. Makes no attempt
%% to ensure the group is empty.
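%%
%% A minimal callback module sketch (the module name, group name,
%% message and transaction executor below are illustrative only; the
%% executor simply runs its fun in a mnesia transaction):
%%
%%   -module(gm_example).
%%   -behaviour(gm).
%%   -export([start/0, joined/2, members_changed/3, handle_msg/3,
%%            handle_terminate/2]).
%%
%%   start() ->
%%       ok = gm:create_tables(),
%%       TxnFun = fun (Fun) ->
%%                        {atomic, Result} = mnesia:sync_transaction(Fun),
%%                        Result
%%                end,
%%       {ok, Pid} = gm:start_link(example_group, ?MODULE, [], TxnFun),
%%       %% The join is asynchronous; broadcasts sent before it
%%       %% completes carry no delivery guarantee (see broadcast/2).
%%       ok = gm:broadcast(Pid, hello),
%%       {ok, Pid}.
%%
%%   joined(_Args, _Members)                  -> ok.
%%   members_changed(_Args, _Births, _Deaths) -> ok.
%%   handle_msg(_Args, _From, _Msg)           -> ok.
%%   handle_terminate(_Args, _Reason)         -> ok.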
%%
%% Implementation Overview
%% -----------------------
%%
%% One possible means of implementation would be a fan-out from the
%% sender to every member of the group. This would require that the
%% group is fully connected, and, in the event that the original
%% sender of the message disappears from the group before the message
%% has made it to every member of the group, raises questions as to
%% who is responsible for sending on the message to new group members.
%% In particular, the issue is with [ Pid ! Msg || Pid <- Members ] -
%% if the sender dies part way through, who is responsible for
%% ensuring that the remaining Members receive the Msg? In the event
%% that within the group, messages sent are broadcast from a subset of
%% the members, the fan-out arrangement has the potential to
%% substantially impact the CPU and network workload of such members,
%% as such members would have to accommodate the cost of sending each
%% message to every group member.
%%
%% Instead, if the members of the group are arranged in a chain, then
%% it becomes easier to reason about who within the group has received
%% each message and who has not. It eases issues of responsibility: in
%% the event of a group member disappearing, the nearest upstream
%% member of the chain is responsible for ensuring that messages
%% continue to propagate down the chain. It also results in equal
%% distribution of sending and receiving workload, even if all
%% messages are being sent from just a single group member. This
%% configuration has the further advantage that it is not necessary
%% for every group member to know of every other group member, and
%% even that a group member does not have to be accessible from all
%% other group members.
%%
%% Performance is kept high by permitting pipelining and all
%% communication between joined group members is asynchronous. In the
%% chain A -> B -> C -> D, if A sends a message to the group, it will
%% not directly contact C or D. However, it must know that D receives
%% the message (in addition to B and C) before it can consider the
%% message fully sent. A simplistic implementation would require that
%% D replies to C, C replies to B and B then replies to A. This would
%% result in a propagation delay of twice the length of the chain. It
%% would also require, in the event of the failure of C, that D knows
%% to directly contact B and issue the necessary replies. Instead, the
%% chain forms a ring: D sends the message on to A: D does not
%% distinguish A as the sender, merely as the next member (downstream)
%% within the chain (which has now become a ring). When A receives
%% from D messages that A sent, it knows that all members have
%% received the message. However, the message is not dead yet: if C
%% died as B was sending to C, then B would need to detect the death
%% of C and forward the message on to D instead: thus every node has
%% to remember every message published until it is told that it can
%% forget about the message. This is essential not just for dealing
%% with failure of members, but also for the addition of new members.
%%
%% Thus once A receives the message back again, it then sends to B an
%% acknowledgement for the message, indicating that B can now forget
%% about the message. B does so, and forwards the ack to C. C forgets
%% the message, and forwards the ack to D, which forgets the message
%% and finally forwards the ack back to A. At this point, A takes no
%% further action: the message and its acknowledgement have made it to
%% every member of the group. The message is now dead, and any new
%% member joining the group at this point will not receive the
%% message.
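%%
%% A compact trace of the above for the ring A -> B -> C -> D, where A
%% publishes message m:
%%
%%   pub: A --m--> B --m--> C --m--> D --m--> A   (m has reached all)
%%   ack: A -ack-> B -ack-> C -ack-> D -ack-> A   (m is now dead)
%%
%% Each member forgets m as it forwards ack(m); when the ack arrives
%% back at A, A takes no further action.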
%%
%% We therefore have two roles:
%%
%% 1. The sender, who upon receiving their own messages back, must
%% then send out acknowledgements, and upon receiving their own
%% acknowledgements back perform no further action.
%%
%% 2. The other group members who upon receiving messages and
%% acknowledgements must update their own internal state accordingly
%% (the sending member must also do this in order to be able to
%% accommodate failures), and forward messages on to their downstream
%% neighbours.
%%
%%
%% Implementation: It gets trickier
%% --------------------------------
%%
%% Chain A -> B -> C -> D
%%
%% A publishes a message which B receives. A now dies. B and D will
%% detect the death of A, and will link up, thus the chain is now B ->
%% C -> D. B forwards A's message on to C, who forwards it to D, who
%% forwards it to B. Thus B is now responsible for A's messages - both
%% publications and acknowledgements that were in flight at the point
%% at which A died. Even worse is that this is transitive: after B
%% forwards A's message to C, B dies as well. Now C is not only
%% responsible for B's in-flight messages, but is also responsible for
%% A's in-flight messages.
%%
%% Lemma 1: A member can only determine which dead members they have
%% inherited responsibility for if there is a total ordering on the
%% conflicting additions and subtractions of members from the group.
%%
%% Consider the simultaneous death of B and addition of B' that
%% transitions a chain from A -> B -> C to A -> B' -> C. Either B' or
%% C is responsible for in-flight messages from B. It is easy to
%% ensure that at least one of them thinks they have inherited B, but
%% if we do not ensure that exactly one of them inherits B, then we
%% could have B' converting publishes to acks, which then will crash C
%% as C does not believe it has issued acks for those messages.
%%
%% More complex scenarios are easy to concoct: A -> B -> C -> D -> E
%% becoming A -> C' -> E. Who has inherited which of B, C and D?
%%
%% However, for non-conflicting membership changes, only a partial
%% ordering is required. For example, A -> B -> C becoming A -> A' ->
%% B. The addition of A', between A and B can have no conflicts with
%% the death of C: it is clear that A has inherited C's messages.
%%
%% For ease of implementation, we adopt the simple solution, of
%% imposing a total order on all membership changes.
%%
%% On the death of a member, it is ensured the dead member's
%% neighbours become aware of the death, and the upstream neighbour
%% now sends to its new downstream neighbour its state, including the
%% messages pending acknowledgement. The downstream neighbour can then
%% use this to calculate which publishes and acknowledgements it has
%% missed out on, due to the death of its old upstream. Thus the
%% downstream can catch up, and continues the propagation of messages
%% through the group.
%%
%% Lemma 2: When a member is joining, it must synchronously
%% communicate with its upstream member in order to receive its
%% starting state atomically with its addition to the group.
%%
%% New members must start with the same state as their nearest
%% upstream neighbour. This ensures that they are not surprised by
%% acknowledgements they are sent, and that, should their downstream
%% neighbour die, they are able to send the correct state to their new
%% downstream neighbour to ensure it can catch up. Thus in the
%% transition A -> B -> C becomes A -> A' -> B -> C becomes A -> A' ->
%% C, A' must start with the state of A, so that it can send C the
%% correct state when B dies, allowing C to detect any missed
%% messages.
%%
%% If A' starts by adding itself to the group membership, A could then
%% die, without A' having received the necessary state from A. This
%% would leave A' responsible for in-flight messages from A, but
%% having the least knowledge of all, of those messages. Thus A' must
%% start by synchronously calling A, which then immediately sends A'
%% back its state. A then adds A' to the group. If A dies at this
%% point then A' will be able to see this (as A' will fail to appear
%% in the group membership), and thus A' will ignore the state it
%% receives from A, and will simply repeat the process, trying to now
%% join downstream from some other member. This ensures that should
%% the upstream die as soon as the new member has been joined, the new
%% member is guaranteed to receive the correct state, allowing it to
%% correctly process messages inherited due to the death of its
%% upstream neighbour.
%%
%% The canonical definition of the group membership is held by a
%% distributed database. Whilst this allows the total ordering of
%% changes to be achieved, it is nevertheless undesirable to have to
%% query this database for the current view, upon receiving each
%% message. Instead, we wish for members to be able to cache a view of
%% the group membership, which then requires a cache invalidation
%% mechanism. Each member maintains its own view of the group
%% membership. Thus when the group's membership changes, members may
%% need to become aware of such changes in order to be able to
%% accurately process messages they receive. Because of the
%% requirement of a total ordering of conflicting membership changes,
%% it is not possible to use the guaranteed broadcast mechanism to
%% communicate these changes: to achieve the necessary ordering, it
%% would be necessary for such messages to be published by exactly one
%% member, which can not be guaranteed given that such a member could
%% die.
%%
%% The total ordering we enforce on membership changes gives rise to a
%% view version number: every change to the membership creates a
%% different view, and the total ordering permits a simple
%% monotonically increasing view version number.
%%
%% Lemma 3: If a message is sent from a member that holds view version
%% N, it can be correctly processed by any member receiving the
%% message with a view version >= N.
%%
%% Initially, let us suppose that each view contains the ordering of
%% every member that was ever part of the group. Dead members are
%% marked as such. Thus we have a ring of members, some of which are
%% dead, and are thus inherited by the nearest alive downstream
%% member.
%%
%% In the chain A -> B -> C, all three members initially have view
%% version 1, which reflects reality. B publishes a message, which is
%% forwarded by C to A. B now dies, which A notices very quickly. Thus A
%% updates the view, creating version 2. It now forwards B's
%% publication, sending that message to its new downstream neighbour,
%% C. This happens before C is aware of the death of B. C must become
%% aware of the view change before it interprets the message it has
%% received, otherwise it will fail to learn of the death of B, and
%% thus will not realise it has inherited B's messages (and will
%% likely crash).
%%
%% Thus very simply, we have that each subsequent view contains more
%% information than the preceding view.
%%
%% However, to avoid the views growing indefinitely, we need to be
%% able to delete members which have died _and_ for which no messages
%% are in-flight. This requires that upon inheriting a dead member, we
%% know the last publication sent by the dead member (this is easy: we
%% inherit a member because we are the nearest downstream member which
%% implies that we know at least as much as everyone else about the
%% publications of the dead member), and we know the earliest message
%% for which the acknowledgement is still in flight.
%%
%% In the chain A -> B -> C, when B dies, A will send to C its state
%% (as C is the new downstream from A), allowing C to calculate which
%% messages it has missed out on (described above). At this point, C
%% also inherits B's messages. If that state from A also includes the
%% last message published by B for which an acknowledgement has been
%% seen, then C knows exactly which further acknowledgements it must
%% receive (also including issuing acknowledgements for publications
%% still in-flight that it receives), after which it is known there
%% are no more messages in flight for B, thus all evidence that B was
%% ever part of the group can be safely removed from the canonical
%% group membership.
%%
%% Thus, for every message that a member sends, it includes with that
%% message its view version. When a member receives a message it will
%% update its view from the canonical copy, should its view be older
%% than the view version included in the message it has received.
%%
%% The state held by each member therefore includes the messages from
%% each publisher pending acknowledgement, the last publication seen
%% from that publisher, and the last acknowledgement from that
%% publisher. In the case of the member's own publications or
%% inherited members, this last acknowledgement seen state indicates
%% the last acknowledgement retired, rather than sent.
%%
%%
%% Proof sketch
%% ------------
%%
%% We need to prove that with the provided operational semantics, we
%% can never reach a state that is not well formed from a well-formed
%% starting state.
%%
%% Operational semantics (small step): straight-forward message
%% sending, process monitoring, state updates.
%%
%% Well formed state: dead members inherited by exactly one non-dead
%% member; for every entry in anyone's pending-acks, either (the
%% publication of the message is in-flight downstream from the member
%% and upstream from the publisher) or (the acknowledgement of the
%% message is in-flight downstream from the publisher and upstream
%% from the member).
%%
%% Proof by induction on the applicable operational semantics.
%%
%%
%% Related work
%% ------------
%%
%% The ring configuration and double traversal of messages around the
%% ring is similar (though developed independently) to the LCR
%% protocol by [Levy 2008]. However, LCR differs in several
%% ways. Firstly, by using vector clocks, it enforces a total order of
%% message delivery, which is unnecessary for our purposes. More
%% significantly, it is built on top of a "group communication system"
%% which performs the group management functions, taking
%% responsibility away from the protocol as to how to cope with safely
%% adding and removing members. When membership changes do occur, the
%% protocol stipulates that every member must perform communication
%% with every other member of the group, to ensure all outstanding
%% deliveries complete, before the entire group transitions to the new
%% view. This, in total, requires two sets of all-to-all synchronous
%% communications.
%%
%% This is not only rather inefficient, but also does not explain what
%% happens upon the failure of a member during this process. It does
%% though entirely avoid the need for inheritance of responsibility of
%% dead members that our protocol incorporates.
%%
%% In [Marandi et al 2010], a Paxos-based protocol is described. This
%% work explicitly focuses on the efficiency of communication. LCR
%% (and our protocol too) are more efficient, but at the cost of
%% higher latency. The Ring-Paxos protocol is itself built on top of
%% IP-multicast, which rules it out for many applications where
%% point-to-point communication is all that can be relied upon. They also
%% have an excellent related work section which I really ought to
%% read...
%%
%%
%% [Levy 2008] The Complexity of Reliable Distributed Storage, 2008.
%% [Marandi et al 2010] Ring Paxos: A High-Throughput Atomic Broadcast
%% Protocol
-behaviour(gen_server2).
-export([create_tables/0, start_link/4, leave/1, broadcast/2, broadcast/3,
confirmed_broadcast/2, info/1, validate_members/2, forget_group/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3, prioritise_info/3]).
%% For INSTR_MOD callbacks
-export([call/3, cast/2, monitor/1, demonitor/1]).
-export([table_definitions/0]).
-define(GROUP_TABLE, gm_group).
-define(MAX_BUFFER_SIZE, 100000000). %% 100MB
-define(BROADCAST_TIMER, 25).
-define(FORCE_GC_TIMER, 250).
2011-12-12 22:02:38 +08:00
-define(VERSION_START, 0).
-define(SETS, ordsets).
-record(state,
{ self,
left,
right,
group_name,
module,
view,
pub_count,
members_state,
callback_args,
confirms,
broadcast_buffer,
broadcast_buffer_sz,
broadcast_timer,
force_gc_timer,
txn_executor,
shutting_down
}).
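%% #gm_group{} is the canonical group membership, held in Mnesia and
%% versioned on every membership change. #view_member{} is one entry in
%% a member's cached view of the ring: its left and right neighbours,
%% plus the aliases set of dead members it has inherited. #member{} is
%% the per-publisher state described above: the queue of publications
%% pending acknowledgement, plus the last publication and last
%% acknowledgement seen from that publisher.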
-record(gm_group, { name, version, members }).
-record(view_member, { id, aliases, left, right }).
-record(member, { pending_ack, last_pub, last_ack }).
-define(TABLE, {?GROUP_TABLE, [{record_name, gm_group},
{attributes, record_info(fields, gm_group)}]}).
-define(TABLE_MATCH, {match, #gm_group { _ = '_' }}).
-define(TAG, '$gm').
-export_type([group_name/0]).
-type group_name() :: any().
-type txn_fun() :: fun((fun(() -> any())) -> any()).
%% The joined, members_changed and handle_msg callbacks can all return
%% any of the following terms:
%%
%% 'ok' - the callback function returns normally
%%
%% {'stop', Reason} - the callback indicates the member should stop
%% with reason Reason and should leave the group.
%%
%% {'become', Module, Args} - the callback indicates that the callback
%% module should be changed to Module and that the callback functions
%% should now be passed the arguments Args. This allows the callback
%% module to be dynamically changed.
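%%
%% For example, a callback module could hand the group over to a new
%% callback module with a handle_msg/3 clause like this (the upgrade
%% message and target module are hypothetical):
%%
%%   handle_msg(Args, _From, upgrade) -> {become, gm_example_v2, Args};
%%   handle_msg(_Args, _From, _Msg)   -> ok.
%%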
%% Called when we've successfully joined the group. Supplied with Args
%% provided in start_link, plus current group members.
-callback joined(Args :: term(), Members :: [pid()]) ->
ok | {stop, Reason :: term()} | {become, Module :: atom(), Args :: any()}.
%% Supplied with Args provided in start_link, the list of new members
%% and the list of members previously known to us that have since
%% died. Note that if a member joins and dies very quickly, it's
%% possible that we will never see that member appear in either births
%% or deaths. However we are guaranteed that (1) we will see a member
%% joining either in the births here, or in the members passed to
%% joined/2 before receiving any messages from it; and (2) we will not
%% see members die that we have not seen born (or supplied in the
%% members to joined/2).
-callback members_changed(Args :: term(),
Births :: [pid()], Deaths :: [pid()]) ->
ok | {stop, Reason :: term()} | {become, Module :: atom(), Args :: any()}.
%% Supplied with Args provided in start_link, the sender, and the
%% message. This does get called for messages injected by this member,
%% however, in such cases, there is no special significance of this
%% invocation: it does not indicate that the message has made it to
%% any other members, let alone all other members.
-callback handle_msg(Args :: term(), From :: pid(), Message :: term()) ->
ok | {stop, Reason :: term()} | {become, Module :: atom(), Args :: any()}.
%% Called on gm member termination as per rules in gen_server, with
%% the Args provided in start_link plus the termination Reason.
-callback handle_terminate(Args :: term(), Reason :: term()) ->
ok | term().
-spec create_tables() -> 'ok' | {'aborted', any()}.
create_tables() ->
create_tables([?TABLE]).
create_tables([]) ->
ok;
create_tables([{Table, Attributes} | Tables]) ->
case mnesia:create_table(Table, Attributes) of
{atomic, ok} -> create_tables(Tables);
{aborted, {already_exists, Table}} -> create_tables(Tables);
Err -> Err
end.
table_definitions() ->
{Name, Attributes} = ?TABLE,
[{Name, [?TABLE_MATCH | Attributes]}].
-spec start_link(group_name(), atom(), any(), txn_fun()) ->
rabbit_types:ok_pid_or_error().
start_link(GroupName, Module, Args, TxnFun) ->
gen_server2:start_link(?MODULE, [GroupName, Module, Args, TxnFun],
[{spawn_opt, [{fullsweep_after, 0}]}]).
-spec leave(pid()) -> 'ok'.
leave(Server) ->
gen_server2:cast(Server, leave).
-spec broadcast(pid(), any()) -> 'ok'.
broadcast(Server, Msg) -> broadcast(Server, Msg, 0).
broadcast(Server, Msg, SizeHint) ->
gen_server2:cast(Server, {broadcast, Msg, SizeHint}).
-spec confirmed_broadcast(pid(), any()) -> 'ok'.
confirmed_broadcast(Server, Msg) ->
gen_server2:call(Server, {confirmed_broadcast, Msg}, infinity).
-spec info(pid()) -> rabbit_types:infos().
info(Server) ->
gen_server2:call(Server, info, infinity).
-spec validate_members(pid(), [pid()]) -> 'ok'.
validate_members(Server, Members) ->
gen_server2:cast(Server, {validate_members, Members}).
-spec forget_group(group_name()) -> 'ok'.
forget_group(GroupName) ->
{atomic, ok} = mnesia:sync_transaction(
fun () ->
mnesia:delete({?GROUP_TABLE, GroupName})
end),
ok.
init([GroupName, Module, Args, TxnFun]) ->
put(process_name, {?MODULE, GroupName}),
Self = make_member(GroupName),
gen_server2:cast(self(), join),
{ok, #state { self = Self,
left = {Self, undefined},
right = {Self, undefined},
group_name = GroupName,
module = Module,
view = undefined,
pub_count = -1,
members_state = undefined,
callback_args = Args,
confirms = queue:new(),
broadcast_buffer = [],
broadcast_buffer_sz = 0,
broadcast_timer = undefined,
force_gc_timer = undefined,
txn_executor = TxnFun,
shutting_down = false }}.
handle_call({confirmed_broadcast, _Msg}, _From,
State = #state { shutting_down = {true, _} }) ->
reply(shutting_down, State);
handle_call({confirmed_broadcast, _Msg}, _From,
State = #state { members_state = undefined }) ->
reply(not_joined, State);
handle_call({confirmed_broadcast, Msg}, _From,
State = #state { self = Self,
right = {Self, undefined},
module = Module,
callback_args = Args }) ->
handle_callback_result({Module:handle_msg(Args, get_pid(Self), Msg),
ok, State});
handle_call({confirmed_broadcast, Msg}, From, State) ->
{Result, State1 = #state { pub_count = PubCount, confirms = Confirms }} =
internal_broadcast(Msg, 0, State),
Confirms1 = queue:in({PubCount, From}, Confirms),
handle_callback_result({Result, flush_broadcast_buffer(
State1 #state { confirms = Confirms1 })});
handle_call(info, _From,
State = #state { members_state = undefined }) ->
reply(not_joined, State);
handle_call(info, _From, State = #state { group_name = GroupName,
module = Module,
view = View }) ->
reply([{group_name, GroupName},
{module, Module},
{group_members, get_pids(alive_view_members(View))}], State);
handle_call({add_on_right, _NewMember}, _From,
State = #state { members_state = undefined }) ->
reply(not_ready, State);
handle_call({add_on_right, NewMember}, _From,
State = #state { self = Self,
group_name = GroupName,
members_state = MembersState,
txn_executor = TxnFun }) ->
try
Group = record_new_member_in_group(
NewMember, Self, GroupName, TxnFun),
View1 = group_to_view(check_membership(Self, Group)),
MembersState1 = remove_erased_members(MembersState, View1),
ok = send_right(NewMember, View1,
{catchup, Self, prepare_members_state(MembersState1)}),
{Result, State1} = change_view(View1, State #state {
members_state = MembersState1 }),
handle_callback_result({Result, {ok, Group}, State1})
catch
lost_membership ->
{stop, shutdown, State}
end.
%% add_on_right causes a catchup to be sent immediately from the left,
%% so we can never see this from the left neighbour. However, it's
%% possible for the right neighbour to send us a check_neighbours
%% immediately before that. We can't possibly handle it, but if we're
%% in this state we know a catchup is coming imminently anyway. So
%% just ignore it.
handle_cast({?TAG, _ReqVer, check_neighbours},
State = #state { members_state = undefined }) ->
noreply(State);
handle_cast({?TAG, ReqVer, Msg},
State = #state { view = View,
self = Self,
members_state = MembersState,
group_name = GroupName }) ->
try
{Result, State1} =
case needs_view_update(ReqVer, View) of
true ->
View1 = group_to_view(
check_membership(Self,
dirty_read_group(GroupName))),
MemberState1 = remove_erased_members(MembersState, View1),
change_view(View1, State #state {
members_state = MemberState1 });
false -> {ok, State}
end,
handle_callback_result(
if_callback_success(
Result, fun handle_msg_true/3, fun handle_msg_false/3, Msg, State1))
catch
lost_membership ->
{stop, shutdown, State}
end;
handle_cast({broadcast, _Msg, _SizeHint},
State = #state { shutting_down = {true, _} }) ->
noreply(State);
handle_cast({broadcast, _Msg, _SizeHint},
State = #state { members_state = undefined }) ->
noreply(State);
handle_cast({broadcast, Msg, _SizeHint},
State = #state { self = Self,
right = {Self, undefined},
module = Module,
callback_args = Args }) ->
handle_callback_result({Module:handle_msg(Args, get_pid(Self), Msg),
State});
handle_cast({broadcast, Msg, SizeHint}, State) ->
{Result, State1} = internal_broadcast(Msg, SizeHint, State),
handle_callback_result({Result, maybe_flush_broadcast_buffer(State1)});
handle_cast(join, State = #state { self = Self,
group_name = GroupName,
members_state = undefined,
module = Module,
callback_args = Args,
txn_executor = TxnFun }) ->
try
View = join_group(Self, GroupName, TxnFun),
MembersState =
case alive_view_members(View) of
[Self] -> blank_member_state();
_ -> undefined
end,
State1 = check_neighbours(State #state { view = View,
members_state = MembersState }),
handle_callback_result(
{Module:joined(Args, get_pids(all_known_members(View))), State1})
catch
lost_membership ->
{stop, shutdown, State}
end;
handle_cast({validate_members, OldMembers},
State = #state { view = View,
module = Module,
callback_args = Args }) ->
NewMembers = get_pids(all_known_members(View)),
Births = NewMembers -- OldMembers,
Deaths = OldMembers -- NewMembers,
case {Births, Deaths} of
{[], []} -> noreply(State);
_ -> Result = Module:members_changed(Args, Births, Deaths),
handle_callback_result({Result, State})
end;
handle_cast(leave, State) ->
{stop, normal, State}.
handle_info(force_gc, State) ->
garbage_collect(),
noreply(State #state { force_gc_timer = undefined });
handle_info(flush, State) ->
noreply(
flush_broadcast_buffer(State #state { broadcast_timer = undefined }));
handle_info(timeout, State) ->
noreply(flush_broadcast_buffer(State));
handle_info({'DOWN', _MRef, process, _Pid, _Reason},
State = #state { shutting_down =
{true, {shutdown, ring_shutdown}} }) ->
noreply(State);
handle_info({'DOWN', MRef, process, _Pid, Reason},
State = #state { self = Self,
left = Left,
right = Right,
group_name = GroupName,
confirms = Confirms,
txn_executor = TxnFun }) ->
try
check_membership(GroupName),
Member = case {Left, Right} of
{{Member1, MRef}, _} -> Member1;
{_, {Member1, MRef}} -> Member1;
_ -> undefined
end,
case {Member, Reason} of
{undefined, _} ->
noreply(State);
{_, {shutdown, ring_shutdown}} ->
noreply(State);
_ ->
%% In the event of a partial partition we could see another member
%% go down and then remove them from Mnesia. While they can
%% recover from this they'd have to restart the queue - not
%% ideal. So let's sleep here briefly just in case this was caused
%% by a partial partition; in which case by the time we record the
%% member death in Mnesia we will probably be in a full
%% partition and will not be assassinating another member.
timer:sleep(100),
View1 = group_to_view(record_dead_member_in_group(Self,
Member, GroupName, TxnFun, true)),
handle_callback_result(
case alive_view_members(View1) of
[Self] -> maybe_erase_aliases(
State #state {
members_state = blank_member_state(),
confirms = purge_confirms(Confirms) },
View1);
_ -> change_view(View1, State)
end)
end
catch
lost_membership ->
{stop, shutdown, State}
end;
handle_info(_, State) ->
%% Discard any unexpected messages, such as late replies from neighbour_call/2
%% TODO: For #gm_group{} related info messages, it could be worthwhile to
%% change_view/2, as this might reflect an alteration in the gm group, meaning
%% we now need to update our state. see rabbitmq-server#914.
noreply(State).
terminate(Reason, #state { module = Module, callback_args = Args }) ->
Module:handle_terminate(Args, Reason).
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
prioritise_info(flush, _Len, _State) ->
1;
%% DOWN messages should not overtake initial catchups; if they do we
%% will receive a DOWN we do not know what to do with.
prioritise_info({'DOWN', _MRef, process, _Pid, _Reason}, _Len,
#state { members_state = undefined }) ->
0;
%% We should not prioritise DOWN messages from our left since
%% otherwise the DOWN can overtake any last activity from the left,
%% causing that activity to be lost.
prioritise_info({'DOWN', _MRef, process, LeftPid, _Reason}, _Len,
#state { left = {{_LeftVer, LeftPid}, _MRef2} }) ->
0;
%% But prioritise all other DOWNs - we want to make sure we are not
%% sending activity into the void for too long because our right is
%% down but we don't know it.
prioritise_info({'DOWN', _MRef, process, _Pid, _Reason}, _Len, _State) ->
1;
prioritise_info(_, _Len, _State) ->
0.
handle_msg(check_neighbours, State) ->
%% no-op - it's already been done by the calling handle_cast
{ok, State};
handle_msg({catchup, Left, MembersStateLeft},
State = #state { self = Self,
left = {Left, _MRefL},
right = {Right, _MRefR},
view = View,
members_state = undefined }) ->
ok = send_right(Right, View, {catchup, Self, MembersStateLeft}),
MembersStateLeft1 = build_members_state(MembersStateLeft),
{ok, State #state { members_state = MembersStateLeft1 }};
handle_msg({catchup, Left, MembersStateLeft},
State = #state { self = Self,
left = {Left, _MRefL},
view = View,
members_state = MembersState })
when MembersState =/= undefined ->
MembersStateLeft1 = build_members_state(MembersStateLeft),
AllMembers = lists:usort(maps:keys(MembersState) ++
maps:keys(MembersStateLeft1)),
{MembersState1, Activity} =
lists:foldl(
fun (Id, MembersStateActivity) ->
#member { pending_ack = PALeft, last_ack = LA } =
find_member_or_blank(Id, MembersStateLeft1),
with_member_acc(
fun (#member { pending_ack = PA } = Member, Activity1) ->
case is_member_alias(Id, Self, View) of
true ->
{_AcksInFlight, Pubs, _PA1} =
find_prefix_common_suffix(PALeft, PA),
{Member #member { last_ack = LA },
activity_cons(Id, pubs_from_queue(Pubs),
[], Activity1)};
false ->
{Acks, _Common, Pubs} =
find_prefix_common_suffix(PA, PALeft),
{Member,
activity_cons(Id, pubs_from_queue(Pubs),
acks_from_queue(Acks),
Activity1)}
end
end, Id, MembersStateActivity)
end, {MembersState, activity_nil()}, AllMembers),
handle_msg({activity, Left, activity_finalise(Activity)},
State #state { members_state = MembersState1 });
handle_msg({catchup, _NotLeft, _MembersState}, State) ->
{ok, State};
handle_msg({activity, Left, Activity},
State = #state { self = Self,
group_name = GroupName,
left = {Left, _MRefL},
view = View,
members_state = MembersState,
confirms = Confirms })
when MembersState =/= undefined ->
try
%% If we have to stop, do it asap so we avoid any ack confirmation.
%% Membership must be checked again by erase_members_in_group, as the
%% node can be marked as dead in the meantime
check_membership(GroupName),
{MembersState1, {Confirms1, Activity1}} =
calculate_activity(MembersState, Confirms, Activity, Self, View),
State1 = State #state { members_state = MembersState1,
confirms = Confirms1 },
Activity3 = activity_finalise(Activity1),
ok = maybe_send_activity(Activity3, State1),
{Result, State2} = maybe_erase_aliases(State1, View),
if_callback_success(
Result, fun activity_true/3, fun activity_false/3, Activity3, State2)
catch
lost_membership ->
{{stop, shutdown}, State}
end;
handle_msg({activity, _NotLeft, _Activity}, State) ->
{ok, State}.
noreply(State) ->
{noreply, ensure_timers(State), flush_timeout(State)}.
reply(Reply, State) ->
{reply, Reply, ensure_timers(State), flush_timeout(State)}.
ensure_timers(State) ->
ensure_force_gc_timer(ensure_broadcast_timer(State)).
flush_timeout(#state{broadcast_buffer = []}) -> infinity;
flush_timeout(_) -> 0.
ensure_force_gc_timer(State = #state { force_gc_timer = TRef })
when is_reference(TRef) ->
State;
ensure_force_gc_timer(State = #state { force_gc_timer = undefined }) ->
TRef = erlang:send_after(?FORCE_GC_TIMER, self(), force_gc),
State #state { force_gc_timer = TRef }.
ensure_broadcast_timer(State = #state { broadcast_buffer = [],
broadcast_timer = undefined }) ->
State;
ensure_broadcast_timer(State = #state { broadcast_buffer = [],
broadcast_timer = TRef }) ->
_ = erlang:cancel_timer(TRef),
State #state { broadcast_timer = undefined };
ensure_broadcast_timer(State = #state { broadcast_timer = undefined }) ->
TRef = erlang:send_after(?BROADCAST_TIMER, self(), flush),
State #state { broadcast_timer = TRef };
ensure_broadcast_timer(State) ->
State.
internal_broadcast(Msg, SizeHint,
State = #state { self = Self,
pub_count = PubCount,
module = Module,
callback_args = Args,
broadcast_buffer = Buffer,
broadcast_buffer_sz = BufferSize }) ->
PubCount1 = PubCount + 1,
{Module:handle_msg(Args, get_pid(Self), Msg),
State #state { pub_count = PubCount1,
broadcast_buffer = [{PubCount1, Msg} | Buffer],
broadcast_buffer_sz = BufferSize + SizeHint}}.
%% The Erlang distribution mechanism has an interesting quirk - it
%% will kill the VM cold with "Absurdly large distribution output data
%% buffer" if you attempt to send a message which serialises out to
%% more than 2^31 bytes in size. It's therefore a very good idea to
%% make sure that we don't exceed that size!
%%
%% Now, we could figure out the size of messages as they come in using
%% size(term_to_binary(Msg)) or similar. The trouble is, that requires
%% us to serialise the message only to throw the serialised form
%% away. Hard to believe that's a sensible thing to do. So instead we
%% accept a size hint from the application, via broadcast/3. This size
%% hint can be the size of anything in the message which we expect
%% could be large, and we just ignore the size of any small bits of
%% the message term. Therefore MAX_BUFFER_SIZE is set somewhat
%% conservatively at 100MB - but the buffer is only to allow us to
%% buffer tiny messages anyway, so 100MB is plenty.
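%%
%% For example (the payload variable is illustrative), a caller whose
%% messages carry one large binary can pass that binary's size as the
%% hint:
%%
%%   ok = gm:broadcast(GmPid, {deliver, Payload}, byte_size(Payload)).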
maybe_flush_broadcast_buffer(State = #state{broadcast_buffer_sz = Size}) ->
case Size > ?MAX_BUFFER_SIZE of
true -> flush_broadcast_buffer(State);
false -> State
end.
flush_broadcast_buffer(State = #state { broadcast_buffer = [] }) ->
State;
flush_broadcast_buffer(State = #state { self = Self,
members_state = MembersState,
broadcast_buffer = Buffer,
pub_count = PubCount }) ->
[{PubCount, _Msg}|_] = Buffer, %% ASSERTION match on PubCount
Pubs = lists:reverse(Buffer),
Activity = activity_cons(Self, Pubs, [], activity_nil()),
ok = maybe_send_activity(activity_finalise(Activity), State),
MembersState1 = with_member(
fun (Member = #member { pending_ack = PA }) ->
PA1 = queue:join(PA, queue:from_list(Pubs)),
Member #member { pending_ack = PA1,
last_pub = PubCount }
end, Self, MembersState),
State #state { members_state = MembersState1,
broadcast_buffer = [],
broadcast_buffer_sz = 0 }.
%% ---------------------------------------------------------------------------
%% View construction and inspection
%% ---------------------------------------------------------------------------
needs_view_update(ReqVer, {Ver, _View}) -> Ver < ReqVer.
view_version({Ver, _View}) -> Ver.
is_member_alive({dead, _Member}) -> false;
is_member_alive(_) -> true.
is_member_alias(Self, Self, _View) ->
true;
is_member_alias(Member, Self, View) ->
?SETS:is_element(Member,
((fetch_view_member(Self, View)) #view_member.aliases)).
dead_member_id({dead, Member}) -> Member.
store_view_member(VMember = #view_member { id = Id }, {Ver, View}) ->
{Ver, maps:put(Id, VMember, View)}.
with_view_member(Fun, View, Id) ->
store_view_member(Fun(fetch_view_member(Id, View)), View).
fetch_view_member(Id, {_Ver, View}) -> maps:get(Id, View).
find_view_member(Id, {_Ver, View}) -> maps:find(Id, View).
blank_view(Ver) -> {Ver, maps:new()}.
alive_view_members({_Ver, View}) -> maps:keys(View).
all_known_members({_Ver, View}) ->
maps:fold(
fun (Member, #view_member { aliases = Aliases }, Acc) ->
?SETS:to_list(Aliases) ++ [Member | Acc]
end, [], View).
group_to_view(#gm_group { members = Members, version = Ver }) ->
Alive = lists:filter(fun is_member_alive/1, Members),
[_|_] = Alive, %% ASSERTION - can't have all dead members
add_aliases(link_view(Alive ++ Alive ++ Alive, blank_view(Ver)), Members).
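
%% link_view/2 consumes [Left, Middle, Right | Rest] windows, so passing
%% the alive list three times lets the chain wrap into a ring. A hedged
%% sketch with Alive = [A, B, C]: the windows over [A,B,C,A,B,C,A,B,C]
%% yield B {left A, right C}, C {left B, right A} and A {left C, right B},
%% and the recursion stops once it revisits a member already in the view.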
link_view([Left, Middle, Right | Rest], View) ->
case find_view_member(Middle, View) of
error ->
link_view(
[Middle, Right | Rest],
store_view_member(#view_member { id = Middle,
aliases = ?SETS:new(),
left = Left,
right = Right }, View));
{ok, _} ->
View
end;
link_view(_, View) ->
View.
add_aliases(View, Members) ->
Members1 = ensure_alive_suffix(Members),
{EmptyDeadSet, View1} =
lists:foldl(
fun (Member, {DeadAcc, ViewAcc}) ->
case is_member_alive(Member) of
true ->
{?SETS:new(),
with_view_member(
fun (VMember =
#view_member { aliases = Aliases }) ->
VMember #view_member {
aliases = ?SETS:union(Aliases, DeadAcc) }
end, ViewAcc, Member)};
false ->
{?SETS:add_element(dead_member_id(Member), DeadAcc),
ViewAcc}
end
end, {?SETS:new(), View}, Members1),
0 = ?SETS:size(EmptyDeadSet), %% ASSERTION
View1.
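
%% A hedged example: ensure_alive_suffix([{dead, X}, A, {dead, Y}])
%% rotates to [{dead, Y}, {dead, X}, A], guaranteeing the list ends with
%% a live member; the left-to-right fold above then absorbs Y and X as
%% aliases of A instead of leaving them dangling at the tail.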
ensure_alive_suffix(Members) ->
queue:to_list(ensure_alive_suffix1(queue:from_list(Members))).
ensure_alive_suffix1(MembersQ) ->
{{value, Member}, MembersQ1} = queue:out_r(MembersQ),
case is_member_alive(Member) of
true -> MembersQ;
false -> ensure_alive_suffix1(queue:in_r(Member, MembersQ1))
end.
%% ---------------------------------------------------------------------------
%% View modification
%% ---------------------------------------------------------------------------
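%% Joining reads the persisted #gm_group{}: pick a random live member,
%% ask it (via add_on_right) to splice us in as its right neighbour, and
%% if that member has died in the meantime, record it as dead and retry.
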
join_group(Self, GroupName, TxnFun) ->
join_group(Self, GroupName, dirty_read_group(GroupName), TxnFun).
join_group(Self, GroupName, {error, not_found}, TxnFun) ->
join_group(Self, GroupName,
prune_or_create_group(Self, GroupName, TxnFun), TxnFun);
join_group(Self, _GroupName, #gm_group { members = [Self] } = Group, _TxnFun) ->
group_to_view(Group);
join_group(Self, GroupName, #gm_group { members = Members } = Group, TxnFun) ->
case lists:member(Self, Members) of
true ->
group_to_view(Group);
false ->
case lists:filter(fun is_member_alive/1, Members) of
[] ->
join_group(Self, GroupName,
prune_or_create_group(Self, GroupName, TxnFun),
TxnFun);
Alive ->
Left = lists:nth(rand:uniform(length(Alive)), Alive),
Handler =
fun () ->
join_group(
Self, GroupName,
record_dead_member_in_group(Self,
Left, GroupName, TxnFun, false),
TxnFun)
end,
try
case neighbour_call(Left, {add_on_right, Self}) of
{ok, Group1} -> group_to_view(Group1);
not_ready -> join_group(Self, GroupName, TxnFun)
end
catch
exit:{R, _}
when R =:= noproc; R =:= normal; R =:= shutdown ->
Handler();
exit:{{R, _}, _}
when R =:= nodedown; R =:= shutdown ->
Handler()
end
end
end.
dirty_read_group(GroupName) ->
case mnesia:dirty_read(?GROUP_TABLE, GroupName) of
[] -> {error, not_found};
[Group] -> Group
end.
read_group(GroupName) ->
case mnesia:read({?GROUP_TABLE, GroupName}) of
[] -> {error, not_found};
[Group] -> Group
end.
write_group(Group) -> mnesia:write(?GROUP_TABLE, Group, write), Group.
prune_or_create_group(Self, GroupName, TxnFun) ->
TxnFun(
fun () ->
GroupNew = #gm_group { name = GroupName,
members = [Self],
version = get_version(Self) },
case read_group(GroupName) of
{error, not_found} ->
write_group(GroupNew);
Group = #gm_group { members = Members } ->
case lists:any(fun is_member_alive/1, Members) of
true -> Group;
false -> write_group(GroupNew)
end
end
end).
record_dead_member_in_group(Self, Member, GroupName, TxnFun, Verify) ->
Fun =
fun () ->
try
Group = #gm_group { members = Members, version = Ver } =
case Verify of
true ->
check_membership(Self, read_group(GroupName));
false ->
check_group(read_group(GroupName))
end,
case lists:splitwith(
fun (Member1) -> Member1 =/= Member end, Members) of
{_Members1, []} -> %% not found - already recorded dead
Group;
{Members1, [Member | Members2]} ->
Members3 = Members1 ++ [{dead, Member} | Members2],
write_group(Group #gm_group { members = Members3,
version = Ver + 1 })
end
catch
lost_membership ->
                        %% The transaction must not crash abruptly; return an
                        %% error instead, so the gen_server can stop normally
{error, lost_membership}
end
end,
handle_lost_membership_in_txn(TxnFun, Fun).
handle_lost_membership_in_txn(TxnFun, Fun) ->
case TxnFun(Fun) of
{error, lost_membership = T} ->
throw(T);
Any ->
Any
end.
record_new_member_in_group(NewMember, Left, GroupName, TxnFun) ->
Fun =
fun () ->
try
Group = #gm_group { members = Members, version = Ver } =
check_membership(Left, read_group(GroupName)),
case lists:member(NewMember, Members) of
true ->
                            %% This avoids duplicates during partial partitions,
                            %% when members may hold inconsistent views
rabbit_log:warning("(~tp) GM avoiding duplicate of ~tp",
[self(), NewMember]),
Group;
false ->
{Prefix, [Left | Suffix]} =
lists:splitwith(fun (M) -> M =/= Left end, Members),
write_group(Group #gm_group {
members = Prefix ++ [Left, NewMember | Suffix],
version = Ver + 1 })
end
catch
lost_membership ->
                        %% The transaction must not crash abruptly; return an
                        %% error instead, so the gen_server can stop normally
{error, lost_membership}
end
end,
handle_lost_membership_in_txn(TxnFun, Fun).
erase_members_in_group(Self, Members, GroupName, TxnFun) ->
DeadMembers = [{dead, Id} || Id <- Members],
Fun =
fun () ->
try
Group = #gm_group { members = [_|_] = Members1, version = Ver } =
check_membership(Self, read_group(GroupName)),
case Members1 -- DeadMembers of
Members1 -> Group;
Members2 -> write_group(
Group #gm_group { members = Members2,
version = Ver + 1 })
end
catch
lost_membership ->
                        %% The transaction must not crash abruptly; return an
                        %% error instead, so the gen_server can stop normally
{error, lost_membership}
end
end,
handle_lost_membership_in_txn(TxnFun, Fun).
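
%% A dead alias may be erased once everything it published has been
%% acked (last_pub =:= last_ack, per can_erase_view_member/4), since no
%% member can still be owed one of its messages.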
maybe_erase_aliases(State = #state { self = Self,
group_name = GroupName,
members_state = MembersState,
txn_executor = TxnFun }, View) ->
#view_member { aliases = Aliases } = fetch_view_member(Self, View),
{Erasable, MembersState1}
= ?SETS:fold(
fun (Id, {ErasableAcc, MembersStateAcc} = Acc) ->
#member { last_pub = LP, last_ack = LA } =
find_member_or_blank(Id, MembersState),
case can_erase_view_member(Self, Id, LA, LP) of
true -> {[Id | ErasableAcc],
erase_member(Id, MembersStateAcc)};
false -> Acc
end
end, {[], MembersState}, Aliases),
View1 = case Erasable of
[] -> View;
_ -> group_to_view(
erase_members_in_group(Self, Erasable, GroupName, TxnFun))
end,
change_view(View1, State #state { members_state = MembersState1 }).
can_erase_view_member(Self, Self, _LA, _LP) -> false;
can_erase_view_member(_Self, _Id, N, N) -> true;
can_erase_view_member(_Self, _Id, _LA, _LP) -> false.
neighbour_cast(N, Msg) -> ?INSTR_MOD:cast(get_pid(N), Msg).
neighbour_call(N, Msg) -> ?INSTR_MOD:call(get_pid(N), Msg, infinity).
%% ---------------------------------------------------------------------------
%% View monitoring and maintenance
%% ---------------------------------------------------------------------------
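%% Reconcile the actual {Neighbour, MRef} pair with what the view now
%% says it should be: demonitor any stale neighbour, monitor the new
%% one, and cast check_neighbours so the affected peers re-examine
%% their own links.
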
ensure_neighbour(_Ver, Self, {Self, undefined}, Self) ->
{Self, undefined};
ensure_neighbour(Ver, Self, {Self, undefined}, RealNeighbour) ->
ok = neighbour_cast(RealNeighbour, {?TAG, Ver, check_neighbours}),
{RealNeighbour, maybe_monitor(RealNeighbour, Self)};
ensure_neighbour(_Ver, _Self, {RealNeighbour, MRef}, RealNeighbour) ->
{RealNeighbour, MRef};
ensure_neighbour(Ver, Self, {RealNeighbour, MRef}, Neighbour) ->
true = ?INSTR_MOD:demonitor(MRef),
Msg = {?TAG, Ver, check_neighbours},
ok = neighbour_cast(RealNeighbour, Msg),
ok = case Neighbour of
Self -> ok;
_ -> neighbour_cast(Neighbour, Msg)
end,
{Neighbour, maybe_monitor(Neighbour, Self)}.
maybe_monitor( Self, Self) -> undefined;
maybe_monitor(Other, _Self) -> ?INSTR_MOD:monitor(get_pid(Other)).
check_neighbours(State = #state { self = Self,
left = Left,
right = Right,
view = View,
broadcast_buffer = Buffer }) ->
#view_member { left = VLeft, right = VRight }
= fetch_view_member(Self, View),
Ver = view_version(View),
Left1 = ensure_neighbour(Ver, Self, Left, VLeft),
Right1 = ensure_neighbour(Ver, Self, Right, VRight),
Buffer1 = case Right1 of
{Self, undefined} -> [];
_ -> Buffer
end,
State1 = State #state { left = Left1, right = Right1,
broadcast_buffer = Buffer1 },
ok = maybe_send_catchup(Right, State1),
State1.
maybe_send_catchup(Right, #state { right = Right }) ->
ok;
maybe_send_catchup(_Right, #state { self = Self,
right = {Self, undefined} }) ->
ok;
maybe_send_catchup(_Right, #state { members_state = undefined }) ->
ok;
maybe_send_catchup(_Right, #state { self = Self,
right = {Right, _MRef},
view = View,
members_state = MembersState }) ->
send_right(Right, View,
{catchup, Self, prepare_members_state(MembersState)}).
%% ---------------------------------------------------------------------------
%% Catch_up delta detection
%% ---------------------------------------------------------------------------
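%% A hedged worked example, writing queues as lists of {PubNum, Msg}
%% pairs: with A = [{1,m1},{2,m2},{3,m3}] and B = [{3,m3},{4,m4}],
%% find_prefix_common_suffix(A, B) returns Prefix [{1,m1},{2,m2}],
%% Common [{3,m3}] and Suffix [{4,m4}] (each again as a queue).
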
find_prefix_common_suffix(A, B) ->
{Prefix, A1} = find_prefix(A, B, queue:new()),
{Common, Suffix} = find_common(A1, B, queue:new()),
{Prefix, Common, Suffix}.
%% Returns the elements of A that occur before the first element of B,
%% plus the remainder of A.
find_prefix(A, B, Prefix) ->
case {queue:out(A), queue:out(B)} of
{{{value, Val}, _A1}, {{value, Val}, _B1}} ->
{Prefix, A};
{{empty, A1}, {{value, _A}, _B1}} ->
{Prefix, A1};
{{{value, {NumA, _MsgA} = Val}, A1},
{{value, {NumB, _MsgB}}, _B1}} when NumA < NumB ->
find_prefix(A1, B, queue:in(Val, Prefix));
{_, {empty, _B1}} ->
            {A, Prefix} %% Prefix will be empty here
end.
%% A should be a prefix of B. Returns the commonality plus the
%% remainder of B.
find_common(A, B, Common) ->
case {queue:out(A), queue:out(B)} of
{{{value, Val}, A1}, {{value, Val}, B1}} ->
find_common(A1, B1, queue:in(Val, Common));
{{empty, _A}, _} ->
{Common, B};
%% Drop value from B.
%% Match value to avoid infinite loop, since {empty, B} = queue:out(B).
{_, {{value, _}, B1}} ->
find_common(A, B1, Common);
        %% Drop value from A. Empty A should be matched by the second clause.
{{{value, _}, A1}, _} ->
find_common(A1, B, Common)
end.
%% ---------------------------------------------------------------------------
%% Members helpers
%% ---------------------------------------------------------------------------
with_member(Fun, Id, MembersState) ->
store_member(
Id, Fun(find_member_or_blank(Id, MembersState)), MembersState).
with_member_acc(Fun, Id, {MembersState, Acc}) ->
{MemberState, Acc1} = Fun(find_member_or_blank(Id, MembersState), Acc),
{store_member(Id, MemberState, MembersState), Acc1}.
find_member_or_blank(Id, MembersState) ->
case maps:find(Id, MembersState) of
{ok, Result} -> Result;
error -> blank_member()
end.
erase_member(Id, MembersState) -> maps:remove(Id, MembersState).
blank_member() ->
#member { pending_ack = queue:new(), last_pub = -1, last_ack = -1 }.
blank_member_state() -> maps:new().
store_member(Id, MemberState, MembersState) ->
maps:put(Id, MemberState, MembersState).
prepare_members_state(MembersState) -> maps:to_list(MembersState).
build_members_state(MembersStateList) -> maps:from_list(MembersStateList).
make_member(GroupName) ->
{case dirty_read_group(GroupName) of
#gm_group { version = Version } -> Version;
{error, not_found} -> ?VERSION_START
end, self()}.
remove_erased_members(MembersState, View) ->
lists:foldl(fun (Id, MembersState1) ->
store_member(Id, find_member_or_blank(Id, MembersState),
MembersState1)
end, blank_member_state(), all_known_members(View)).
get_version({Version, _Pid}) -> Version.
get_pid({_Version, Pid}) -> Pid.
get_pids(Ids) -> [Pid || {_Version, Pid} <- Ids].
%% ---------------------------------------------------------------------------
%% Activity assembly
%% ---------------------------------------------------------------------------
activity_nil() -> queue:new().
activity_cons( _Id, [], [], Tail) -> Tail;
activity_cons(Sender, Pubs, Acks, Tail) -> queue:in({Sender, Pubs, Acks}, Tail).
activity_finalise(Activity) -> queue:to_list(Activity).
maybe_send_activity([], _State) ->
ok;
maybe_send_activity(Activity, #state { self = Self,
right = {Right, _MRefR},
view = View }) ->
send_right(Right, View, {activity, Self, Activity}).
send_right(Right, View, Msg) ->
ok = neighbour_cast(Right, {?TAG, view_version(View), Msg}).
calculate_activity(MembersState, Confirms, Activity, Self, View) ->
lists:foldl(
fun ({Id, Pubs, Acks}, MembersStateConfirmsActivity) ->
with_member_acc(
fun (Member = #member { pending_ack = PA,
last_pub = LP,
last_ack = LA },
{Confirms2, Activity2}) ->
case is_member_alias(Id, Self, View) of
true ->
{ToAck, PA1} =
find_common(queue_from_pubs(Pubs), PA,
queue:new()),
LA1 = last_ack(Acks, LA),
AckNums = acks_from_queue(ToAck),
Confirms3 = maybe_confirm(
Self, Id, Confirms2, AckNums),
{Member #member { pending_ack = PA1,
last_ack = LA1 },
{Confirms3,
activity_cons(
Id, [], AckNums, Activity2)}};
false ->
PA1 = apply_acks(Acks, join_pubs(PA, Pubs)),
LA1 = last_ack(Acks, LA),
LP1 = last_pub(Pubs, LP),
{Member #member { pending_ack = PA1,
last_pub = LP1,
last_ack = LA1 },
{Confirms2,
activity_cons(Id, Pubs, Acks, Activity2)}}
end
end, Id, MembersStateConfirmsActivity)
end, {MembersState, {Confirms, activity_nil()}}, Activity).
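
%% Deliver the published messages to the callback module. The fold
%% threads {Args, Module, ok} so a handle_msg return of
%% {become, Module3, Args3} swaps the callback mid-stream, while
%% {stop, Reason} aborts delivery and is propagated to the caller.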
callback(Args, Module, Activity) ->
Result =
lists:foldl(
fun ({Id, Pubs, _Acks}, {Args1, Module1, ok}) ->
lists:foldl(fun ({_PubNum, Pub}, Acc = {Args2, Module2, ok}) ->
case Module2:handle_msg(
Args2, get_pid(Id), Pub) of
ok ->
Acc;
{become, Module3, Args3} ->
{Args3, Module3, ok};
{stop, _Reason} = Error ->
Error
end;
(_, Error = {stop, _Reason}) ->
Error
end, {Args1, Module1, ok}, Pubs);
(_, Error = {stop, _Reason}) ->
Error
end, {Args, Module, ok}, Activity),
case Result of
{Args, Module, ok} -> ok;
{Args1, Module1, ok} -> {become, Module1, Args1};
{stop, _Reason} = Error -> Error
end.
change_view(View, State = #state { view = View0,
module = Module,
callback_args = Args }) ->
OldMembers = all_known_members(View0),
NewMembers = all_known_members(View),
Births = NewMembers -- OldMembers,
Deaths = OldMembers -- NewMembers,
Result = case {Births, Deaths} of
{[], []} -> ok;
_ -> Module:members_changed(
Args, get_pids(Births), get_pids(Deaths))
end,
{Result, check_neighbours(State #state { view = View })}.
handle_callback_result({Result, State}) ->
if_callback_success(
Result, fun no_reply_true/3, fun no_reply_false/3, undefined, State);
handle_callback_result({Result, Reply, State}) ->
if_callback_success(
Result, fun reply_true/3, fun reply_false/3, Reply, State).
no_reply_true (_Result, _Undefined, State) -> noreply(State).
no_reply_false({stop, Reason}, _Undefined, State) -> {stop, Reason, State}.
reply_true (_Result, Reply, State) -> reply(Reply, State).
reply_false({stop, Reason}, Reply, State) -> {stop, Reason, Reply, State}.
handle_msg_true (_Result, Msg, State) -> handle_msg(Msg, State).
handle_msg_false(Result, _Msg, State) -> {Result, State}.
activity_true(_Result, Activity, State = #state { module = Module,
callback_args = Args }) ->
{callback(Args, Module, Activity), State}.
activity_false(Result, _Activity, State) ->
{Result, State}.
if_callback_success(Result, True, False, Arg, State) ->
{NewResult, NewState} = maybe_stop(Result, State),
if_callback_success1(NewResult, True, False, Arg, NewState).
if_callback_success1(ok, True, _False, Arg, State) ->
True(ok, Arg, State);
if_callback_success1(
{become, Module, Args} = Result, True, _False, Arg, State) ->
True(Result, Arg, State #state { module = Module,
callback_args = Args });
if_callback_success1({stop, _Reason} = Result, _True, False, Arg, State) ->
False(Result, Arg, State).
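
%% Defer a requested stop while this member still holds unflushed or
%% unacknowledged publications; the Reason is parked in shutting_down
%% and replayed once has_pending_messages/1 reports the buffers drained.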
maybe_stop({stop, Reason}, #state{ shutting_down = false } = State) ->
ShuttingDown = {true, Reason},
case has_pending_messages(State) of
true -> {ok, State #state{ shutting_down = ShuttingDown }};
false -> {{stop, Reason}, State #state{ shutting_down = ShuttingDown }}
end;
maybe_stop(Result, #state{ shutting_down = false } = State) ->
{Result, State};
maybe_stop(Result, #state{ shutting_down = {true, Reason} } = State) ->
case has_pending_messages(State) of
true -> {Result, State};
false -> {{stop, Reason}, State}
end.
has_pending_messages(#state{ broadcast_buffer = Buffer })
when Buffer =/= [] ->
true;
has_pending_messages(#state{ members_state = MembersState }) ->
MembersWithPubAckMismatches = maps:filter(fun(_Id, #member{last_pub = LP, last_ack = LA}) ->
LP =/= LA
end, MembersState),
0 =/= maps:size(MembersWithPubAckMismatches).
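
%% Reply to callers of confirmed_broadcast/2 whose publications are now
%% known to have completed a full circuit. Confirms is a queue of
%% {PubNum, From} pairs in ascending order; acked pub numbers with no
%% waiting caller (PubNum1 > PubNum) are simply skipped.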
maybe_confirm(_Self, _Id, Confirms, []) ->
Confirms;
maybe_confirm(Self, Self, Confirms, [PubNum | PubNums]) ->
case queue:out(Confirms) of
{empty, _Confirms} ->
Confirms;
{{value, {PubNum, From}}, Confirms1} ->
gen_server2:reply(From, ok),
maybe_confirm(Self, Self, Confirms1, PubNums);
{{value, {PubNum1, _From}}, _Confirms} when PubNum1 > PubNum ->
maybe_confirm(Self, Self, Confirms, PubNums)
end;
maybe_confirm(_Self, _Id, Confirms, _PubNums) ->
Confirms.
purge_confirms(Confirms) ->
_ = [gen_server2:reply(From, ok) || {_PubNum, From} <- queue:to_list(Confirms)],
queue:new().
%% ---------------------------------------------------------------------------
%% Msg transformation
%% ---------------------------------------------------------------------------
acks_from_queue(Q) -> [PubNum || {PubNum, _Msg} <- queue:to_list(Q)].
pubs_from_queue(Q) -> queue:to_list(Q).
queue_from_pubs(Pubs) -> queue:from_list(Pubs).
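
%% Hedged example: apply_acks([1, 2], queue_from_pubs([{1,m1},{2,m2},{3,m3}]))
%% drops the two acked publications, leaving [{3,m3}].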
apply_acks( [], Pubs) -> Pubs;
apply_acks(List, Pubs) -> {_, Pubs1} = queue:split(length(List), Pubs),
Pubs1.
join_pubs(Q, []) -> Q;
join_pubs(Q, Pubs) -> queue:join(Q, queue_from_pubs(Pubs)).
last_ack( [], LA) -> LA;
last_ack(List, LA) -> LA1 = lists:last(List),
true = LA1 > LA, %% ASSERTION
LA1.
last_pub( [], LP) -> LP;
last_pub(List, LP) -> {PubNum, _Msg} = lists:last(List),
true = PubNum > LP, %% ASSERTION
PubNum.
%% ---------------------------------------------------------------------------
%% Uninstrumented versions
%% ---------------------------------------------------------------------------
call(Pid, Msg, Timeout) -> gen_server2:call(Pid, Msg, Timeout).
cast(Pid, Msg) -> gen_server2:cast(Pid, Msg).
monitor(Pid) -> erlang:monitor(process, Pid).
demonitor(MRef) -> erlang:demonitor(MRef).
check_membership(Self, #gm_group{members = M} = Group) ->
case lists:member(Self, M) of
true ->
Group;
false ->
throw(lost_membership)
end;
check_membership(_Self, {error, not_found}) ->
throw(lost_membership).
check_membership(GroupName) ->
case dirty_read_group(GroupName) of
#gm_group{members = M} ->
case lists:keymember(self(), 2, M) of
true ->
ok;
false ->
throw(lost_membership)
end;
{error, not_found} ->
throw(lost_membership)
end.
check_group({error, not_found}) ->
throw(lost_membership);
check_group(Any) ->
Any.