This file is intended to tell you How It All Works, concentrating on
the things you might not expect.

The theory
==========

The 'x-federation' exchange is defined in
rabbit_federation_exchange. This starts up a bunch of link processes
(one for each upstream) which:

* Connect to the upstream broker
* Create a queue and bind it to the upstream exchange
* Keep bindings in sync with the downstream exchange
* Consume messages from the upstream queue and republish them to the
  downstream exchange (matching confirms with acks)

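For illustration, here is a minimal sketch of that bring-up sequence
using the Erlang amqp_client. The queue name is a made-up placeholder,
the bindings are made directly from the queue (the real link goes via
an internal exchange, as described below), and republishing with
confirms happens on a separate downstream channel that is not shown:

  -include_lib("amqp_client/include/amqp_client.hrl").

  %% Sketch only: connect to the upstream, declare the federation
  %% queue, bind it for the wanted routing keys and start consuming.
  bring_up(UpstreamHost, UpstreamX, Keys) ->
      {ok, Conn} = amqp_connection:start(
                     #amqp_params_network{host = UpstreamHost}),
      {ok, Ch} = amqp_connection:open_channel(Conn),
      #'queue.declare_ok'{queue = Q} =
          amqp_channel:call(Ch, #'queue.declare'{
                                   queue   = <<"federation: my-exchange">>,
                                   durable = true}),
      [#'queue.bind_ok'{} =
           amqp_channel:call(Ch, #'queue.bind'{queue       = Q,
                                               exchange    = UpstreamX,
                                               routing_key = Key})
          || Key <- Keys],
      #'basic.consume_ok'{} =
          amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q}, self()),
      {Conn, Ch, Q}.
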
Each link process monitors the connections / channels it opens, and
dies if they go down. We use a supervisor2 to ensure that we get some
backoff when restarting.

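A minimal sketch of that monitoring inside the link's gen_server
follows; monitor_upstream/2 and the stop reason are invented names,
and the supervisor2 child spec that supplies the restart delay is not
shown:

  %% Sketch: watch the upstream connection and channel; if either one
  %% goes down, the link process stops too, and supervisor2 restarts
  %% it after a delay.
  monitor_upstream(Conn, Ch) ->
      erlang:monitor(process, Conn),
      erlang:monitor(process, Ch),
      ok.

  handle_info({'DOWN', _MRef, process, _Pid, Reason}, State) ->
      %% Stopping here turns a lost connection or channel into a
      %% supervised (and therefore backed-off) restart rather than a
      %% half-dead link.
      {stop, {upstream_down, Reason}, State}.
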
We use process groups to identify all link processes for a certain
exchange, as well as all link processes together.

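For example, a sketch using OTP's pg2 module (the plugin wraps process
groups behind its own helpers, and the group names here are made up):

  %% Sketch: one group per federated exchange plus one global group,
  %% so we can later send a message to the links for one exchange, or
  %% to every link at once.
  join_groups(ExchangeName, LinkPid) ->
      AllGroup = rabbit_federation_links,
      XGroup   = {rabbit_federation_exchange, ExchangeName},
      ok = pg2:create(AllGroup),
      ok = pg2:create(XGroup),
      ok = pg2:join(AllGroup, LinkPid),
      ok = pg2:join(XGroup, LinkPid).

  %% Used later, e.g. to prod every link when rabbitmq_federation
  %% starts (see the first wrinkle below).
  notify_all(Msg) ->
      [Pid ! Msg || Pid <- pg2:get_members(rabbit_federation_links)],
      ok.
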
However, there are a bunch of wrinkles:


Wrinkle: The exchange will be recovered when the Erlang client is not available
===============================================================================

Exchange recovery happens within the rabbit application - therefore at
the time the exchange is recovered, we can't make any connections,
since the amqp_client application has not yet started. Each link
therefore has an initial 'not_started' state. When a link is created
it checks whether the rabbitmq_federation application is running. If
so, it starts fully; if not, it stays in the 'not_started' state. When
rabbitmq_federation starts, it sends a 'go' message to all links,
prodding them to bring up the link.

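A sketch of how that can look in the link's gen_server; start_fully/1
is an invented helper, and the running-application check shown is just
one way to perform the test:

  %% Sketch: if the federation app (and hence amqp_client) is not yet
  %% running, park in 'not_started' instead of connecting.
  init(Args) ->
      case lists:keymember(rabbitmq_federation, 1,
                           application:which_applications()) of
          true  -> {ok, start_fully(Args)};
          false -> {ok, {not_started, Args}}
      end.

  %% Sent to every link (e.g. via the process group) once
  %% rabbitmq_federation has started.
  handle_info(go, {not_started, Args}) ->
      {noreply, start_fully(Args)};
  handle_info(go, State) ->
      {noreply, State}.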


Wrinkle: On reconnect we want to assert bindings atomically
===========================================================

If the link goes down for whatever reason, then by the time it comes
up again the bindings downstream may no longer be in sync with those
upstream. Therefore on link establishment we want to ensure that a
certain set of bindings exists. (Of course bringing up a link for the
first time is a simple case of this.) And we want to do this with AMQP
methods. But if we were to tear down all bindings and recreate them,
there would be a period during which messages would not be forwarded,
even for bindings that *do* still exist before and after.

We use exchange-to-exchange bindings to work around this:

We bind the upstream exchange (X) to the upstream queue (Q) via an
internal fanout exchange (IXA), like so (with routing keys R1 and R2):

X----R1,R2--->IXA---->Q

This has the same effect as binding the queue to the exchange directly.

Now imagine the link has gone down, and is about to be
reestablished. In the meantime, routing has changed downstream so
that we now want routing keys R1 and R3. On link reconnection we can
create and bind another internal fanout exchange IXB:

X----R1,R2--->IXA---->Q
|                     ^
|                     |
\----R1,R3--->IXB-----/

and then delete the original exchange IXA:

X                     Q
|                     ^
|                     |
\----R1,R3--->IXB-----/

This means that messages matching R1 are always routed during the
switchover. Messages for R3 will start being routed as soon as we bind
the second exchange, and messages for R2 will be stopped in a timely
way. Of course this could lag the downstream situation somewhat, in
which case some R2 messages will get thrown away downstream since they
are unroutable. However, this lag is inevitable when the link goes
down.

The downstream therefore only needs to keep track of whether the
upstream is currently going via internal exchange A or B. This is
held in the exchange scratch space in Mnesia.

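A sketch of the switchover expressed as AMQP methods on the upstream
channel; the argument names are invented, and how the current internal
exchange is read from and written to the scratch space is not shown:

  %% Sketch: declare the new internal fanout exchange, move the set of
  %% bindings over, attach the queue, then drop the old internal
  %% exchange. Bindings for keys present in both the old and new sets
  %% are never absent, so matching messages keep flowing.
  switch_internal_exchange(Ch, UpstreamX, Q, OldIX, NewIX, NewKeys) ->
      #'exchange.declare_ok'{} =
          amqp_channel:call(Ch, #'exchange.declare'{exchange = NewIX,
                                                    type     = <<"fanout">>,
                                                    durable  = true,
                                                    internal = true}),
      [#'exchange.bind_ok'{} =
           amqp_channel:call(Ch, #'exchange.bind'{destination = NewIX,
                                                  source      = UpstreamX,
                                                  routing_key = Key})
          || Key <- NewKeys],
      #'queue.bind_ok'{} =
          amqp_channel:call(Ch, #'queue.bind'{queue    = Q,
                                              exchange = NewIX}),
      #'exchange.delete_ok'{} =
          amqp_channel:call(Ch, #'exchange.delete'{exchange = OldIX}),
      ok.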


Wrinkle: We need to amalgamate bindings
=======================================

Since we only bind to one exchange upstream, but the downstream
exchange can be bound to many queues, we can have duplicated bindings
downstream (same source, routing key and args but different
destination) that cannot be duplicated upstream (since the destination
is the same). The link therefore maintains a mapping of (Key, Args) to
set(Dest). Duplicated bindings do not get repeated upstream, and are
only unbound upstream when the last one goes away downstream.

Furthermore, this works as an optimisation, since it will tend to
reduce the upstream binding count and churn.

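A sketch of that bookkeeping as a map from binding key to a set of
destinations; bind_upstream/2 and unbind_upstream/2 stand in for the
actual upstream (un)bind calls:

  %% Sketch: only issue an upstream bind for the first downstream
  %% binding with a given (Key, Args), and only an upstream unbind
  %% when the last one goes away.
  add_binding(Key, Args, Dest, Map) ->
      K = {Key, Args},
      Dests = maps:get(K, Map, sets:new()),
      case sets:size(Dests) of
          0 -> bind_upstream(Key, Args);   % first downstream binding
          _ -> ok                          % already bound upstream
      end,
      maps:put(K, sets:add_element(Dest, Dests), Map).

  remove_binding(Key, Args, Dest, Map) ->
      K = {Key, Args},
      Dests = sets:del_element(Dest, maps:get(K, Map, sets:new())),
      case sets:size(Dests) of
          0 -> unbind_upstream(Key, Args), % last one gone
               maps:remove(K, Map);
          _ -> maps:put(K, Dests, Map)
      end.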


Wrinkle: We may receive binding events out of order
===================================================

The rabbit_federation_exchange callbacks are invoked by channel
processes within rabbit. Therefore they can be executed concurrently,
and can arrive at the link processes in an order that does not
correspond to the wall clock.

We need to keep the state of the link in sync with Mnesia. Therefore
not only do we need to impose an ordering on these events, we need to
impose Mnesia's ordering on them. We therefore added a function to the
callback interface, serialise_events. When this returns true, the
callback mechanism inside rabbit increments a per-exchange counter
within an Mnesia transaction, and returns the value as part of the
add_binding and remove_binding callbacks. The link process then queues
up these events, and replays them in order. The link process's state
thus always follows Mnesia (it may be delayed, but the effects happen
in the same order).

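For illustration, one way to buffer and replay events in serial order,
assuming the serials start at 0 and increase by one; apply_event/1
stands in for actually acting on the binding change:

  %% Sketch: hold events keyed by their Mnesia-assigned serial and
  %% only apply each one once every earlier serial has been applied.
  -record(evq, {next = 0, pending = gb_trees:empty()}).

  enqueue(Serial, Event, Q = #evq{pending = P}) ->
      drain(Q#evq{pending = gb_trees:insert(Serial, Event, P)}).

  drain(Q = #evq{next = N, pending = P}) ->
      case gb_trees:is_empty(P) of
          true  -> Q;
          false ->
              case gb_trees:smallest(P) of
                  {N, Event} ->
                      apply_event(Event),
                      drain(Q#evq{next    = N + 1,
                                  pending = gb_trees:delete(N, P)});
                  _ ->
                      Q
              end
      end.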


Other issues
============

Since links are implemented in terms of AMQP, link failure may cause
messages to be redelivered. If you're unlucky this could lead to
duplication.

Message duplication can also happen with some topologies. In some
cases it may not be possible to set max_hops such that messages arrive
once at every node.

While we correctly order bind / unbind events, we don't do the same
thing for exchange creation / deletion. (This is harder - if you
delete and recreate an exchange with the same name, is it the same
exchange? What about if its type changes?) This would only be an issue
if exchanges churn rapidly; however we could get into a state where
Mnesia sees create-delete-create-delete (CDCD) but we see CDDC, and
leave a link process running when we shouldn't.