## What?
Support for quorum queues:
* JMS message selectors
* AMQP property filter expressions on header and properties
sections (as already supported in streams)
* message groups
## How?
* performance tests of different PoCs on
https://github.com/ansd/message_selector_perf
* The message selector feature is enabled statically on the QQ via the queue argument x-filter-enabled=true
* The QQ holds application-properties and a subset of well-defined fields of
  the header and properties sections in memory. The latter are defined
  by x-filter-field-names.
* JMS spec: “For best performance, applications should only use message properties when they
need to customize a message’s header. The primary reason for doing this is to support
customized message selection.”
Therefore it should be fine for the session to always include all application-properties
in the Ra enqueue command.
* In the enqueue{} Ra command, provide a #{Key => Value} map
* If the Key is atom(), it is a well-defined header or properties field name
* If the Key is binary(), it is an (arbitrary) application-properties key
* A filter-field-names policy change takes effect only for senders on new sessions
* For both high- and normal-priority messages, we use gb_trees instead of
rabbit_fifo_q
* each QQ consumer remembers its last scanned Ra index and continues
scanning where it left off previously
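The scan-resumption idea can be sketched as follows. This is a Python illustration, not RabbitMQ code: `next_matching` is a hypothetical name, and a plain dict with sorted iteration stands in for the gb_trees keyed by Ra index.

```python
def next_matching(messages, selector, last_index):
    """Return (index, message) for the first message after last_index that
    matches the selector, or None. The caller stores the returned index as
    the consumer's new scan position, so the next checkout continues where
    this one left off instead of rescanning from the start."""
    for index in sorted(messages):  # stand-in for ordered gb_trees iteration
        if index <= last_index:
            continue  # already scanned in a previous checkout
        if selector(messages[index]):
            return index, messages[index]
    return None

msgs = {1: {"colour": "red"}, 2: {"colour": "blue"}, 3: {"colour": "red"}}
is_red = lambda m: m.get("colour") == "red"
assert next_matching(msgs, is_red, 0) == (1, {"colour": "red"})
# The consumer resumes at index 1, so indexes 1 and 2 are not rescanned:
assert next_matching(msgs, is_red, 1) == (3, {"colour": "red"})
```

The ordered key structure is what makes the cursor cheap: resuming is just "iterate keys greater than the stored index".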
* Ideally we would use a skip list instead of gb_trees because it's simple, avoids O(n)
rebalancing, and could even append in O(1). However, an efficient skip list cannot be
implemented in pure Erlang. A NIF would be one solution, but isn't interoperable.
* gb_trees rebalancing should be acceptable given that emitting all Ra live
indexes requires O(N) space anyway
* Use another gb_trees instance for prompt message expiry
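The expiry structure can be sketched like this. Again a Python illustration rather than RabbitMQ code: a heap ordered by (expiry time, Ra index) stands in for the gb_trees described above, and `enqueue`/`expired` are hypothetical names.

```python
import heapq

expiry = []  # entries are (expires_at, ra_index), ordered by expiry time

def enqueue(ra_index, ttl_seconds, now):
    # Record when this message will expire.
    heapq.heappush(expiry, (now + ttl_seconds, ra_index))

def expired(now):
    """Pop and return the Ra indexes of all messages whose TTL has elapsed.
    Because entries are ordered by expiry time, we only inspect the front."""
    out = []
    while expiry and expiry[0][0] <= now:
        _, ra_index = heapq.heappop(expiry)
        out.append(ra_index)
    return out

enqueue(1, ttl_seconds=5, now=0.0)
enqueue(2, ttl_seconds=1, now=0.0)
assert expired(2.0) == [2]   # only the short-TTL message has expired
assert expired(10.0) == [1]
```

The point is that expiry checks never scan the whole queue: only entries at the front of the ordered structure are examined.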
* Add/recommend limits for
* max consumers per queue
* max messages per queue
* max metadata size per message
The latter two provide an overall limit on memory usage.
* Round robin isn't guaranteed in all cases. However, this should be
okay because JMS doesn't define how a broker should dispatch messages:
> "Apart from the requirements of any message selectors, Jakarta
Messaging does not define how messages are distributed between multiple
consumers on the same queue."
* rabbit_quorum_queue parses the JMS SQL expression from the consumer argument.
This way, it’s easy to create consumers with SQL expressions for AMQP 0.9.1 or STOMP, in addition to AMQP 1.0.
Maybe we should prohibit that, though?
* JMS SQL expressions and their grammar are declared declaratively
* leex is used to scan the SQL string into a flat list of tokens.
* yecc parses this flat list of tokens into an Erlang expression tree,
which is then provided to the QQ in the Ra #checkout{} command.
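The shape of that scanner/parser pipeline can be sketched in Python (the real implementation uses leex/yecc in Erlang; `tokenize`, `parse`, and this tiny grammar of comparisons joined by AND are illustrative assumptions, not the actual JMS SQL grammar):

```python
import re

# tokenize() plays the role of the leex scanner, parse() the yecc parser.
TOKEN_RE = re.compile(r"\s*(?:(>=|<=|<>|[=<>])|'([^']*)'|(\d+)|(\w+))")

def tokenize(s):
    tokens, pos = [], 0
    while pos < len(s):
        m = TOKEN_RE.match(s, pos)
        if not m:
            raise ValueError(f"bad token at {pos}")
        op, string, number, word = m.groups()
        if op:
            tokens.append(("op", op))
        elif string is not None:
            tokens.append(("string", string))
        elif number:
            tokens.append(("number", int(number)))
        elif word.upper() == "AND":
            tokens.append(("and",))
        else:
            tokens.append(("ident", word))
        pos = m.end()
    return tokens

def parse(tokens):
    """Comparisons joined by AND, right-associated for simplicity."""
    (_, name), (_, op), (_, value) = tokens[0], tokens[1], tokens[2]
    node = (op, name, value)
    rest = tokens[3:]
    if rest and rest[0] == ("and",):
        return ("and", node, parse(rest[1:]))
    return node

tree = parse(tokenize("colour = 'red' AND weight > 10"))
assert tree == ("and", ("=", "colour", "red"), (">", "weight", 10))
```

The output tree (nested operator tuples) is the kind of structure that gets shipped to the QQ in the #checkout{} command; all of this parsing happens outside the state machine.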
* The QQ then does the simple matching logic by evaluating the already parsed
expression tree (similar to module sjx_evaluator) against a map of keys to values.
* We keep the JMS selector matching and AMQP filter expression matching
separate since they are both sufficiently simple.
* This keeps the state machine simple: parsing the SQL expression for the
consumer and parsing the mc message fields for the enqueuer are both done
outside the state machine.
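The matching step itself is then a small recursive walk over the parsed tree. A Python sketch in the spirit of sjx_evaluator (the operator names and tuple shape are illustrative assumptions, not the actual rabbit_fifo code):

```python
def evaluate(node, msg):
    """Evaluate a parsed selector tree against a map of keys to values."""
    op = node[0]
    if op == "and":
        return evaluate(node[1], msg) and evaluate(node[2], msg)
    if op == "or":
        return evaluate(node[1], msg) or evaluate(node[2], msg)
    if op == "not":
        return not evaluate(node[1], msg)
    if op == "=":
        return msg.get(node[1]) == node[2]
    if op == ">":
        # absent keys never match a comparison
        return node[1] in msg and msg[node[1]] > node[2]
    raise ValueError(f"unknown operator {op!r}")

tree = ("and", ("=", "colour", "red"), (">", "weight", 10))
assert evaluate(tree, {"colour": "red", "weight": 12}) is True
assert evaluate(tree, {"colour": "red", "weight": 5}) is False
```

Because the tree arrives pre-parsed in the #checkout{} command and the message fields arrive pre-extracted in the enqueue command, the state machine only ever runs this cheap, deterministic walk.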
* To keep the state machine deterministic, regexes are avoided for SQL 'LIKE'
expressions. From OTP 28 release notes:
"It is worth noting that the internal format produced by re:compile/2 has
changed in Erlang/OTP 28. It cannot be reused across nodes or OTP versions."
Holding compiled regexes in rabbit_fifo state is problematic because
the snapshot can be sent to another node running a different OTP version.
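A regex-free LIKE match can be done by walking the pattern directly, so no compiled-regex terms ever enter the rabbit_fifo state or its snapshots. A Python sketch (the `like` helper is hypothetical; `%` matches any run of characters, `_` exactly one):

```python
def like(value, pattern):
    # Empty pattern matches only the empty value.
    if not pattern:
        return not value
    head, rest = pattern[0], pattern[1:]
    if head == "%":
        # '%' consumes zero or more characters: try every split point.
        return any(like(value[i:], rest) for i in range(len(value) + 1))
    if not value:
        return False
    if head == "_" or head == value[0]:
        return like(value[1:], rest)
    return False

assert like("quorum.log", "%.log")
assert not like("quorum.log", "%.txt")
assert like("q1", "q_")
```

This naive backtracking sketch is exponential in pathological patterns; a real implementation would precompile the pattern into a more efficient form, but crucially into plain Erlang terms that are portable across OTP versions, unlike re:compile/2 output.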
* Quorum queues support the same AMQP property filter expressions as
streams, that is, on the application-properties and properties sections
of the bare message. Reasons:
1. annotations (delivery-annotations, message-annotations, footer sections)
can have complex types which makes filtering more complex
2. filter expressions for streams support only properties and application-properties
3. all JMS headers and JMS properties are mapped to one of the AMQP header, properties,
or application-properties sections because JMS property values can only be simple types
TODOs:
* new Ra state machine version
* Add some configurable limit for max metadata size per message to hold in memory.
Could be configured by x-filter-max-in-memory-bytes-per-msg.
rabbit_fifo_client could then estimate the memory overhead and directly reject the message if it is too large.
This keeps the state machine simple.
Rejecting the message is fast.
It saves resources: no need to write the message to the log.
Define map iteration order.
Prior to this commit map iteration order was undefined and could
therefore be different on different versions of Erlang/OTP leading to
non-determinism on different QQ members. For example, different QQ
members could potentially return messages in a different order.
Helps clean up/color stdout for parallel targets
TODO: there are obvious races between different nodes' outputs.
In the next iteration I hope to implement cursor tracking for each node.
As a follow-up to my GChat thread about removing the default logger handler to clean CT stdout, I looked at
injecting a logger config with an undefined default handler into ct_run. It is possible, but it breaks cth_styledout - no
nice green things whatsoever. Then I found rabbit_ct_hook, which calls redirect_logger_to_ct_logs, which in turn
calls logger:remove_handler(default) - apparently with zero effect! To cut a long story short, it turned out rabbit_ct_hook
must run before cth_styledout for the remove_handler line to have any effect.