Add active consumer fields if necessary

When calling node is 3.8 and one of the nodes is < 3.8, active and
activity status fields need to be set with default values.

[#163298456]
This commit is contained in:
Arnaud Cogoluègnes 2019-02-05 15:06:51 +01:00
parent aa93258f16
commit 97c048cf37
2 changed files with 67 additions and 4 deletions

View File

@ -50,14 +50,15 @@ defmodule RabbitMQ.CLI.Ctl.Commands.ListConsumersCommand do
info_keys = InfoKeys.prepare_info_keys(args)
Helpers.with_nodes_in_cluster(node_name, fn nodes ->
RpcStream.receive_list_items(
RpcStream.receive_list_items_with_fun(
node_name,
:rabbit_amqqueue,
[{:rabbit_amqqueue,
:emit_consumers_all,
[nodes, vhost],
[nodes, vhost]}],
timeout,
info_keys,
Kernel.length(nodes)
Kernel.length(nodes),
fn item -> fill_consumer_active_fields(item) end
)
end)
end
@ -75,4 +76,28 @@ defmodule RabbitMQ.CLI.Ctl.Commands.ListConsumersCommand do
end
def banner(_, %{vhost: vhost}), do: "Listing consumers on vhost #{vhost} ..."
# add missing fields if response comes from node < 3.8
def fill_consumer_active_fields({[], {chunk, :continue}}) do
{[], {chunk, :continue}}
end
def fill_consumer_active_fields({items, {chunk, :continue}}) do
{Enum.map(items, fn item ->
case Keyword.has_key?(item, :active) do
true ->
item
false ->
Keyword.drop(item, [:arguments])
++ [active: true, activity_status: :up]
++ [arguments: Keyword.get(item, :arguments, [])]
end
end), {chunk, :continue}}
end
def fill_consumer_active_fields(v) do
v
end
end

View File

@ -169,5 +169,43 @@ defmodule ListConsumersCommandTest do
end
end
test "fill_consumer_active_fields: add missing fields if necessary" do
consumer38 = [
queue_name: {:resource, "/", :queue, "queue1"},
channel_pid: "",
consumer_tag: "ctag1",
ack_required: false,
prefetch_count: 0,
active: true,
activity_status: :up,
arguments: []
]
assert @command.fill_consumer_active_fields({[
consumer38
], {1, :continue}}) == {[consumer38], {1, :continue}}
assert @command.fill_consumer_active_fields({[
[
queue_name: {:resource, "/", :queue, "queue2"},
channel_pid: "",
consumer_tag: "ctag2",
ack_required: false,
prefetch_count: 0,
arguments: []
]
], {1, :continue}}) == {[
[
queue_name: {:resource, "/", :queue, "queue2"},
channel_pid: "",
consumer_tag: "ctag2",
ack_required: false,
prefetch_count: 0,
active: true,
activity_status: :up,
arguments: []
]
], {1, :continue}}
end
end