Fix test flake session_takeover_v3_v5
Prior to this commit the follwing test was flaky:
```
bazel test //deps/rabbitmq_mqtt:v5_SUITE -t- --test_sharding_strategy=disabled \
--test_env FOCUS="-group [mqtt,cluster_size_3] -case session_takeover_v3_v5" \
--test_env RABBITMQ_METADATA_STORE=khepri --config=rbe-26 --runs_per_test=20
```
because rabbit_misc:maps_any/2 filtered out a destination queue after routing if
that destination queue wasn't associated with any matched binding key.
This commit makes the test green.
However, the root cause of this issue isn't solved:
MQTT 5.0 requires the topic exchange to return matched binding keys for
destination queues such that feature NoLocal, and Subscription
Identifiers work correctly.
The current MQTT plugin relies on session state to be stored
consistently in the database. When a new client connects, the session
state is retrieved from the database and appropriate actions are taken:
e.g. consume from a queue, modify binding arguments, etc.
With Mnesia this consistency was guaranteed thanks to sync transactions
when updating queues and bindings.
Khepri has only eventual consistency semantics. This is problematic for the
MQTT plugin in the session_takeover_v3_v5 test scenario:
1. Client subscribes on node 1 (with v3). Node 1 returns subscription
success to client.
2. **Thereafter**, another client with the same MQTT client ID connects
to node 0 (with v5). "Proper" session takeover should take place.
However due to eventual consistency, the subscription / binding isn't
present yet on node 0. Therefore the session upgrade from v3 to v5
does not take place and leads to binding keys being absent when
messages are routed to the session's queue.
This commit is contained in:
parent
70047f5245
commit
a5f4317c3f
|
|
@ -1581,7 +1581,8 @@ drop_local(QNames, #state{subscriptions = Subs,
|
|||
"qos", _:1/binary >>},
|
||||
#{binding_keys := BindingKeys}})
|
||||
when Vhost0 =:= Vhost andalso
|
||||
ClientId0 =:= ClientId ->
|
||||
ClientId0 =:= ClientId andalso
|
||||
map_size(BindingKeys) > 0 ->
|
||||
rabbit_misc:maps_any(
|
||||
fun(BKey, true) ->
|
||||
TopicFilter = amqp_to_mqtt(BKey),
|
||||
|
|
|
|||
Loading…
Reference in New Issue