Initial commit

This commit is contained in:
Michael Klishin 2017-06-09 03:24:16 +03:00
commit 15e793abf4
9 changed files with 7781 additions and 0 deletions

View File

@ -0,0 +1,20 @@
*~
.sw?
.*.sw?
*.beam
*.coverdata
/.erlang.mk/
/cover/
/deps/
/doc/
/ebin/
/git-revisions.txt
/logs/
/plugins/
/rebar.config
/rebar.lock
/test/ct.cover.spec
/test/config_schema_SUITE_data/schema/
/xrefr
/rabbitmq_peer_discovery_etcd.d

View File

@ -0,0 +1,24 @@
PROJECT = rabbitmq_peer_discovery_etcd
PROJECT_DESCRIPTION = etcd-based RabbitMQ peer discovery backend
PROJECT_MOD = rabbitmq_peer_discovery_etcd_app
# FIXME: Use erlang.mk patched for RabbitMQ, while waiting for PRs to be
# reviewed and merged.
ERLANG_MK_REPO = https://github.com/rabbitmq/erlang.mk.git
ERLANG_MK_COMMIT = rabbitmq-tmp
DEPS = rabbit_common rabbitmq_peer_discovery_common rabbit
TEST_DEPS = rabbitmq_ct_helpers rabbitmq_ct_client_helpers ct_helper
dep_ct_helper = git https://github.com/extend/ct_helper.git master
DEP_PLUGINS = rabbit_common/mk/rabbitmq-plugin.mk
# FIXME: Use erlang.mk patched for RabbitMQ, while waiting for PRs to be
# reviewed and merged.
ERLANG_MK_REPO = https://github.com/rabbitmq/erlang.mk.git
ERLANG_MK_COMMIT = rabbitmq-tmp
include rabbitmq-components.mk
include erlang.mk

View File

@ -0,0 +1,28 @@
# RabbitMQ Peer Discovery Etcd
This is an etcd-based implementation of RabbitMQ [peer discovery interface](https://github.com/rabbitmq/rabbitmq-common/blob/master/src/rabbit_peer_discovery_backend.erl)
(new in 3.7.0, previously available in the [rabbitmq-autocluster plugin](https://github.com/rabbitmq/rabbitmq-autocluster)
by Gavin Roy).
## Supported RabbitMQ Versions
This plugin requires RabbitMQ 3.7.0 or later.
For an etcd-based peer discovery and cluster formation
mechanism that supports 3.6.x, see [rabbitmq-autocluster](https://github.com/rabbitmq/rabbitmq-autocluster).
## Contributing
See [CONTRIBUTING.md](./CONTRIBUTING.md) and our [development process overview](http://www.rabbitmq.com/github.html).
## License
[Licensed under the MPL](LICENSE-MPL-RabbitMQ), same as RabbitMQ server.
## Copyright
(c) Pivotal Software Inc., 2007-2017.

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,328 @@
ifeq ($(.DEFAULT_GOAL),)
# Define default goal to `all` because this file defines some targets
# before the inclusion of erlang.mk leading to the wrong target becoming
# the default.
.DEFAULT_GOAL = all
endif
# PROJECT_VERSION defaults to:
# 1. the version exported by rabbitmq-server-release;
# 2. the version stored in `git-revisions.txt`, if it exists;
# 3. a version based on git-describe(1), if it is a Git clone;
# 4. 0.0.0
PROJECT_VERSION := $(RABBITMQ_VERSION)
ifeq ($(PROJECT_VERSION),)
PROJECT_VERSION := $(shell \
if test -f git-revisions.txt; then \
head -n1 git-revisions.txt | \
awk '{print $$$(words $(PROJECT_DESCRIPTION) version);}'; \
else \
(git describe --dirty --abbrev=7 --tags --always --first-parent \
2>/dev/null || echo rabbitmq_v0_0_0) | \
sed -e 's/^rabbitmq_v//' -e 's/^v//' -e 's/_/./g' -e 's/-/+/' \
-e 's/-/./g'; \
fi)
endif
# --------------------------------------------------------------------
# RabbitMQ components.
# --------------------------------------------------------------------
# For RabbitMQ repositories, we want to checkout branches which match
# the parent project. For instance, if the parent project is on a
# release tag, dependencies must be on the same release tag. If the
# parent project is on a topic branch, dependencies must be on the same
# topic branch or fallback to `stable` or `master` whichever was the
# base of the topic branch.
dep_amqp_client = git_rmq rabbitmq-erlang-client $(current_rmq_ref) $(base_rmq_ref) master
dep_rabbit = git_rmq rabbitmq-server $(current_rmq_ref) $(base_rmq_ref) master
dep_rabbit_common = git_rmq rabbitmq-common $(current_rmq_ref) $(base_rmq_ref) master
dep_rabbitmq_amqp1_0 = git_rmq rabbitmq-amqp1.0 $(current_rmq_ref) $(base_rmq_ref) master
dep_rabbitmq_auth_backend_amqp = git_rmq rabbitmq-auth-backend-amqp $(current_rmq_ref) $(base_rmq_ref) master
dep_rabbitmq_auth_backend_cache = git_rmq rabbitmq-auth-backend-cache $(current_rmq_ref) $(base_rmq_ref) master
dep_rabbitmq_auth_backend_http = git_rmq rabbitmq-auth-backend-http $(current_rmq_ref) $(base_rmq_ref) master
dep_rabbitmq_auth_backend_ldap = git_rmq rabbitmq-auth-backend-ldap $(current_rmq_ref) $(base_rmq_ref) master
dep_rabbitmq_auth_mechanism_ssl = git_rmq rabbitmq-auth-mechanism-ssl $(current_rmq_ref) $(base_rmq_ref) master
dep_rabbitmq_aws = git_rmq rabbitmq-aws $(current_rmq_ref) $(base_rmq_ref) master
dep_rabbitmq_boot_steps_visualiser = git_rmq rabbitmq-boot-steps-visualiser $(current_rmq_ref) $(base_rmq_ref) master
dep_rabbitmq_clusterer = git_rmq rabbitmq-clusterer $(current_rmq_ref) $(base_rmq_ref) master
dep_rabbitmq_cli = git_rmq rabbitmq-cli $(current_rmq_ref) $(base_rmq_ref) master
dep_rabbitmq_codegen = git_rmq rabbitmq-codegen $(current_rmq_ref) $(base_rmq_ref) master
dep_rabbitmq_consistent_hash_exchange = git_rmq rabbitmq-consistent-hash-exchange $(current_rmq_ref) $(base_rmq_ref) master
dep_rabbitmq_ct_client_helpers = git_rmq rabbitmq-ct-client-helpers $(current_rmq_ref) $(base_rmq_ref) master
dep_rabbitmq_ct_helpers = git_rmq rabbitmq-ct-helpers $(current_rmq_ref) $(base_rmq_ref) master
dep_rabbitmq_delayed_message_exchange = git_rmq rabbitmq-delayed-message-exchange $(current_rmq_ref) $(base_rmq_ref) master
dep_rabbitmq_dotnet_client = git_rmq rabbitmq-dotnet-client $(current_rmq_ref) $(base_rmq_ref) master
dep_rabbitmq_event_exchange = git_rmq rabbitmq-event-exchange $(current_rmq_ref) $(base_rmq_ref) master
dep_rabbitmq_federation = git_rmq rabbitmq-federation $(current_rmq_ref) $(base_rmq_ref) master
dep_rabbitmq_federation_management = git_rmq rabbitmq-federation-management $(current_rmq_ref) $(base_rmq_ref) master
dep_rabbitmq_java_client = git_rmq rabbitmq-java-client $(current_rmq_ref) $(base_rmq_ref) master
dep_rabbitmq_jms_client = git_rmq rabbitmq-jms-client $(current_rmq_ref) $(base_rmq_ref) master
dep_rabbitmq_jms_cts = git_rmq rabbitmq-jms-cts $(current_rmq_ref) $(base_rmq_ref) master
dep_rabbitmq_jms_topic_exchange = git_rmq rabbitmq-jms-topic-exchange $(current_rmq_ref) $(base_rmq_ref) master
dep_rabbitmq_lvc = git_rmq rabbitmq-lvc-plugin $(current_rmq_ref) $(base_rmq_ref) master
dep_rabbitmq_management = git_rmq rabbitmq-management $(current_rmq_ref) $(base_rmq_ref) master
dep_rabbitmq_management_agent = git_rmq rabbitmq-management-agent $(current_rmq_ref) $(base_rmq_ref) master
dep_rabbitmq_management_exchange = git_rmq rabbitmq-management-exchange $(current_rmq_ref) $(base_rmq_ref) master
dep_rabbitmq_management_themes = git_rmq rabbitmq-management-themes $(current_rmq_ref) $(base_rmq_ref) master
dep_rabbitmq_management_visualiser = git_rmq rabbitmq-management-visualiser $(current_rmq_ref) $(base_rmq_ref) master
dep_rabbitmq_message_timestamp = git_rmq rabbitmq-message-timestamp $(current_rmq_ref) $(base_rmq_ref) master
dep_rabbitmq_metronome = git_rmq rabbitmq-metronome $(current_rmq_ref) $(base_rmq_ref) master
dep_rabbitmq_mqtt = git_rmq rabbitmq-mqtt $(current_rmq_ref) $(base_rmq_ref) master
dep_rabbitmq_objc_client = git_rmq rabbitmq-objc-client $(current_rmq_ref) $(base_rmq_ref) master
dep_rabbitmq_peer_discovery_aws = git_rmq rabbitmq-peer-discovery-aws $(current_rmq_ref) $(base_rmq_ref) master
dep_rabbitmq_peer_discovery_common = git_rmq rabbitmq-peer-discovery-common $(current_rmq_ref) $(base_rmq_ref) master
dep_rabbitmq_peer_discovery_consul = git_rmq rabbitmq-peer-discovery-consul $(current_rmq_ref) $(base_rmq_ref) master
dep_rabbitmq_recent_history_exchange = git_rmq rabbitmq-recent-history-exchange $(current_rmq_ref) $(base_rmq_ref) master
dep_rabbitmq_routing_node_stamp = git_rmq rabbitmq-routing-node-stamp $(current_rmq_ref) $(base_rmq_ref) master
dep_rabbitmq_rtopic_exchange = git_rmq rabbitmq-rtopic-exchange $(current_rmq_ref) $(base_rmq_ref) master
dep_rabbitmq_server_release = git_rmq rabbitmq-server-release $(current_rmq_ref) $(base_rmq_ref) master
dep_rabbitmq_sharding = git_rmq rabbitmq-sharding $(current_rmq_ref) $(base_rmq_ref) master
dep_rabbitmq_shovel = git_rmq rabbitmq-shovel $(current_rmq_ref) $(base_rmq_ref) master
dep_rabbitmq_shovel_management = git_rmq rabbitmq-shovel-management $(current_rmq_ref) $(base_rmq_ref) master
dep_rabbitmq_stomp = git_rmq rabbitmq-stomp $(current_rmq_ref) $(base_rmq_ref) master
dep_rabbitmq_toke = git_rmq rabbitmq-toke $(current_rmq_ref) $(base_rmq_ref) master
dep_rabbitmq_top = git_rmq rabbitmq-top $(current_rmq_ref) $(base_rmq_ref) master
dep_rabbitmq_tracing = git_rmq rabbitmq-tracing $(current_rmq_ref) $(base_rmq_ref) master
dep_rabbitmq_trust_store = git_rmq rabbitmq-trust-store $(current_rmq_ref) $(base_rmq_ref) master
dep_rabbitmq_test = git_rmq rabbitmq-test $(current_rmq_ref) $(base_rmq_ref) master
dep_rabbitmq_web_dispatch = git_rmq rabbitmq-web-dispatch $(current_rmq_ref) $(base_rmq_ref) master
dep_rabbitmq_web_stomp = git_rmq rabbitmq-web-stomp $(current_rmq_ref) $(base_rmq_ref) master
dep_rabbitmq_web_stomp_examples = git_rmq rabbitmq-web-stomp-examples $(current_rmq_ref) $(base_rmq_ref) master
dep_rabbitmq_web_mqtt = git_rmq rabbitmq-web-mqtt $(current_rmq_ref) $(base_rmq_ref) master
dep_rabbitmq_web_mqtt_examples = git_rmq rabbitmq-web-mqtt-examples $(current_rmq_ref) $(base_rmq_ref) master
dep_rabbitmq_website = git_rmq rabbitmq-website $(current_rmq_ref) $(base_rmq_ref) live master
dep_toke = git_rmq toke $(current_rmq_ref) $(base_rmq_ref) master
dep_rabbitmq_public_umbrella = git_rmq rabbitmq-public-umbrella $(current_rmq_ref) $(base_rmq_ref) master
# Third-party dependencies version pinning.
#
# We do that in this file, which is copied in all projects, to ensure
# all projects use the same versions. It avoids conflicts and makes it
# possible to work with rabbitmq-public-umbrella.
dep_cowboy_commit = 1.0.4
dep_mochiweb = git git://github.com/basho/mochiweb.git v2.9.0p2
# Last commit of PropEr supporting Erlang R16B03.
dep_proper_commit = 735d972758d8bd85b12483626fe1b66450d6a6fe
dep_ranch_commit = 1.3.1
# Last commit of sockjs support Erlang R16B03 and 17.x.
dep_sockjs = git https://github.com/rabbitmq/sockjs-erlang.git 5af2b588c812c318b19bc105b577a759c71c3e0a
dep_webmachine_commit = 1.10.8p2
RABBITMQ_COMPONENTS = amqp_client \
rabbit \
rabbit_common \
rabbitmq_amqp1_0 \
rabbitmq_auth_backend_amqp \
rabbitmq_auth_backend_cache \
rabbitmq_auth_backend_http \
rabbitmq_auth_backend_ldap \
rabbitmq_auth_mechanism_ssl \
rabbitmq_aws \
rabbitmq_boot_steps_visualiser \
rabbitmq_clusterer \
rabbitmq_cli \
rabbitmq_codegen \
rabbitmq_consistent_hash_exchange \
rabbitmq_ct_client_helpers \
rabbitmq_ct_helpers \
rabbitmq_delayed_message_exchange \
rabbitmq_dotnet_client \
rabbitmq_event_exchange \
rabbitmq_federation \
rabbitmq_federation_management \
rabbitmq_java_client \
rabbitmq_jms_client \
rabbitmq_jms_cts \
rabbitmq_jms_topic_exchange \
rabbitmq_lvc \
rabbitmq_management \
rabbitmq_management_agent \
rabbitmq_management_exchange \
rabbitmq_management_themes \
rabbitmq_management_visualiser \
rabbitmq_message_timestamp \
rabbitmq_metronome \
rabbitmq_mqtt \
rabbitmq_objc_client \
rabbitmq_peer_discovery_aws \
rabbitmq_peer_discovery_common \
rabbitmq_peer_discovery_consul \
rabbitmq_recent_history_exchange \
rabbitmq_routing_node_stamp \
rabbitmq_rtopic_exchange \
rabbitmq_server_release \
rabbitmq_sharding \
rabbitmq_shovel \
rabbitmq_shovel_management \
rabbitmq_stomp \
rabbitmq_toke \
rabbitmq_top \
rabbitmq_tracing \
rabbitmq_trust_store \
rabbitmq_web_dispatch \
rabbitmq_web_mqtt \
rabbitmq_web_mqtt_examples \
rabbitmq_web_stomp \
rabbitmq_web_stomp_examples \
rabbitmq_website
# Several components have a custom erlang.mk/build.config, mainly
# to disable eunit. Therefore, we can't use the top-level project's
# erlang.mk copy.
NO_AUTOPATCH += $(RABBITMQ_COMPONENTS)
ifeq ($(origin current_rmq_ref),undefined)
ifneq ($(wildcard .git),)
current_rmq_ref := $(shell (\
ref=$$(git branch --list | awk '/^\* \(.*detached / {ref=$$0; sub(/.*detached [^ ]+ /, "", ref); sub(/\)$$/, "", ref); print ref; exit;} /^\* / {ref=$$0; sub(/^\* /, "", ref); print ref; exit}');\
if test "$$(git rev-parse --short HEAD)" != "$$ref"; then echo "$$ref"; fi))
else
current_rmq_ref := master
endif
endif
export current_rmq_ref
ifeq ($(origin base_rmq_ref),undefined)
ifneq ($(wildcard .git),)
base_rmq_ref := $(shell \
(git rev-parse --verify -q stable >/dev/null && \
git merge-base --is-ancestor $$(git merge-base master HEAD) stable && \
echo stable) || \
echo master)
else
base_rmq_ref := master
endif
endif
export base_rmq_ref
# Repository URL selection.
#
# First, we infer other components' location from the current project
# repository URL, if it's a Git repository:
# - We take the "origin" remote URL as the base
# - The current project name and repository name is replaced by the
# target's properties:
# eg. rabbitmq-common is replaced by rabbitmq-codegen
# eg. rabbit_common is replaced by rabbitmq_codegen
#
# If cloning from this computed location fails, we fallback to RabbitMQ
# upstream which is GitHub.
# Maccro to transform eg. "rabbit_common" to "rabbitmq-common".
rmq_cmp_repo_name = $(word 2,$(dep_$(1)))
# Upstream URL for the current project.
RABBITMQ_COMPONENT_REPO_NAME := $(call rmq_cmp_repo_name,$(PROJECT))
RABBITMQ_UPSTREAM_FETCH_URL ?= https://github.com/rabbitmq/$(RABBITMQ_COMPONENT_REPO_NAME).git
RABBITMQ_UPSTREAM_PUSH_URL ?= git@github.com:rabbitmq/$(RABBITMQ_COMPONENT_REPO_NAME).git
# Current URL for the current project. If this is not a Git clone,
# default to the upstream Git repository.
ifneq ($(wildcard .git),)
git_origin_fetch_url := $(shell git config remote.origin.url)
git_origin_push_url := $(shell git config remote.origin.pushurl || git config remote.origin.url)
RABBITMQ_CURRENT_FETCH_URL ?= $(git_origin_fetch_url)
RABBITMQ_CURRENT_PUSH_URL ?= $(git_origin_push_url)
else
RABBITMQ_CURRENT_FETCH_URL ?= $(RABBITMQ_UPSTREAM_FETCH_URL)
RABBITMQ_CURRENT_PUSH_URL ?= $(RABBITMQ_UPSTREAM_PUSH_URL)
endif
# Macro to replace the following pattern:
# 1. /foo.git -> /bar.git
# 2. /foo -> /bar
# 3. /foo/ -> /bar/
subst_repo_name = $(patsubst %/$(1)/%,%/$(2)/%,$(patsubst %/$(1),%/$(2),$(patsubst %/$(1).git,%/$(2).git,$(3))))
# Macro to replace both the project's name (eg. "rabbit_common") and
# repository name (eg. "rabbitmq-common") by the target's equivalent.
#
# This macro is kept on one line because we don't want whitespaces in
# the returned value, as it's used in $(dep_fetch_git_rmq) in a shell
# single-quoted string.
dep_rmq_repo = $(if $(dep_$(2)),$(call subst_repo_name,$(PROJECT),$(2),$(call subst_repo_name,$(RABBITMQ_COMPONENT_REPO_NAME),$(call rmq_cmp_repo_name,$(2)),$(1))),$(pkg_$(1)_repo))
dep_rmq_commits = $(if $(dep_$(1)), \
$(wordlist 3,$(words $(dep_$(1))),$(dep_$(1))), \
$(pkg_$(1)_commit))
define dep_fetch_git_rmq
fetch_url1='$(call dep_rmq_repo,$(RABBITMQ_CURRENT_FETCH_URL),$(1))'; \
fetch_url2='$(call dep_rmq_repo,$(RABBITMQ_UPSTREAM_FETCH_URL),$(1))'; \
if test "$$$$fetch_url1" != '$(RABBITMQ_CURRENT_FETCH_URL)' && \
git clone -q -n -- "$$$$fetch_url1" $(DEPS_DIR)/$(call dep_name,$(1)); then \
fetch_url="$$$$fetch_url1"; \
push_url='$(call dep_rmq_repo,$(RABBITMQ_CURRENT_PUSH_URL),$(1))'; \
elif git clone -q -n -- "$$$$fetch_url2" $(DEPS_DIR)/$(call dep_name,$(1)); then \
fetch_url="$$$$fetch_url2"; \
push_url='$(call dep_rmq_repo,$(RABBITMQ_UPSTREAM_PUSH_URL),$(1))'; \
fi; \
cd $(DEPS_DIR)/$(call dep_name,$(1)) && ( \
$(foreach ref,$(call dep_rmq_commits,$(1)), \
git checkout -q $(ref) >/dev/null 2>&1 || \
) \
(echo "error: no valid pathspec among: $(call dep_rmq_commits,$(1))" \
1>&2 && false) ) && \
(test "$$$$fetch_url" = "$$$$push_url" || \
git remote set-url --push origin "$$$$push_url")
endef
# --------------------------------------------------------------------
# Component distribution.
# --------------------------------------------------------------------
list-dist-deps::
@:
prepare-dist::
@:
# --------------------------------------------------------------------
# rabbitmq-components.mk checks.
# --------------------------------------------------------------------
# If this project is under the Umbrella project, we override $(DEPS_DIR)
# to point to the Umbrella's one. We also disable `make distclean` so
# $(DEPS_DIR) is not accidentally removed.
ifneq ($(wildcard ../../UMBRELLA.md),)
UNDER_UMBRELLA = 1
else ifneq ($(wildcard UMBRELLA.md),)
UNDER_UMBRELLA = 1
endif
ifeq ($(UNDER_UMBRELLA),1)
ifneq ($(PROJECT),rabbitmq_public_umbrella)
DEPS_DIR ?= $(abspath ..)
endif
ifneq ($(filter distclean distclean-deps,$(MAKECMDGOALS)),)
SKIP_DEPS = 1
endif
endif
UPSTREAM_RMQ_COMPONENTS_MK = $(DEPS_DIR)/rabbit_common/mk/rabbitmq-components.mk
check-rabbitmq-components.mk:
$(verbose) cmp -s rabbitmq-components.mk \
$(UPSTREAM_RMQ_COMPONENTS_MK) || \
(echo "error: rabbitmq-components.mk must be updated!" 1>&2; \
false)
ifeq ($(PROJECT),rabbit_common)
rabbitmq-components-mk:
@:
else
rabbitmq-components-mk:
$(gen_verbose) cp -a $(UPSTREAM_RMQ_COMPONENTS_MK) .
ifeq ($(DO_COMMIT),yes)
$(verbose) git diff --quiet rabbitmq-components.mk \
|| git commit -m 'Update rabbitmq-components.mk' rabbitmq-components.mk
endif
endif

View File

@ -0,0 +1,291 @@
%% The contents of this file are subject to the Mozilla Public License
%% Version 1.1 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License at
%% http://www.mozilla.org/MPL/
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
%% License for the specific language governing rights and limitations
%% under the License.
%%
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is AWeber Communications.
%% Copyright (c) 2015-2016 AWeber Communications
%% Copyright (c) 2016-2017 Pivotal Software, Inc. All rights reserved.
%%
-module(rabbit_peer_discovery_etcd).
-behaviour(rabbit_peer_discovery_backend).
-include_lib("rabbit_common/include/rabbit.hrl").
-include_lib("rabbitmq_peer_discovery_common/include/rabbit_peer_discovery.hrl").
-export([list_nodes/0, supports_registration/0, register/0, unregister/0,
post_registration/0]).
-export([start_node_key_updater/0, update_node_key/0]).
-define(CONFIG_MODULE, rabbit_peer_discovery_config).
-define(UTIL_MODULE, rabbit_peer_discovery_util).
-define(HTTPC_MODULE, rabbit_peer_discovery_httpc).
-define(BACKEND_CONFIG_KEY, peer_discovery_etcd).
-define(CONFIG_MAPPING,
#{
etcd_scheme => #peer_discovery_config_entry_meta{
type = string,
env_variable = "ETCD_SCHEME",
default_value = "http"
},
etcd_host => #peer_discovery_config_entry_meta{
type = string,
env_variable = "ETCD_HOST",
default_value = "localhost"
},
etcd_port => #peer_discovery_config_entry_meta{
type = integer,
env_variable = "ETCD_PORT",
default_value = 2379
},
etcd_prefix => #peer_discovery_config_entry_meta{
type = string,
env_variable = "ETCD_PREFIX",
default_value = "rabbitmq"
},
etcd_node_ttl => #peer_discovery_config_entry_meta{
type = integer,
env_variable = "ETCD_NODE_TTL",
default_value = 30
},
cluster_name => #peer_discovery_config_entry_meta{
type = string,
env_variable = "CLUSTER_NAME",
default_value = "default"
}
}).
%%
%% API
%%
-spec list_nodes() -> {ok, {Nodes :: list(), NodeType :: rabbit_types:node_type()}} | {error, Reason :: string()}.
list_nodes() ->
case application:get_env(rabbit, cluster_formation) of
undefined ->
{ok, {[], disc}};
{ok, ClusterFormation} ->
case proplists:get_value(?BACKEND_CONFIG_KEY, ClusterFormation) of
undefined ->
rabbit_log:warning("Peer discovery backend is set to ~s "
"but final config does not contain rabbit.cluster_formation.peer_discovery_etcd. "
"Cannot discover any nodes because etcd cluster details are not configured!",
[?MODULE]),
{ok, {[], disc}};
Proplist ->
M = maps:from_list(Proplist),
case etcd_get(nodes_path(M), [{recursive, true}], M) of
{ok, Nodes} ->
NodeList = extract_nodes(Nodes),
{ok, NodeList};
{error, "404"} ->
{ok, []};
Error -> Error
end
end
end.
-spec supports_registration() -> boolean().
supports_registration() ->
true.
-spec register() -> ok | {error, Reason :: string()}.
register() ->
M = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY),
case set_etcd_node_key(M) of
{ok, _} ->
rabbit_log:info("Registered node with etcd"),
ok;
{error, Error} ->
rabbit_log:error("Failed to register node with etcd: ~s", [Error]),
{error, Error}
end.
-spec unregister() -> ok | {error, Reason :: string()}.
unregister() ->
M = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY),
rabbit_log:info("Unregistering node with etcd"),
case etcd_delete(node_path(M), [{recursive, true}], M) of
{ok, _} -> ok;
Error -> Error
end.
-spec post_registration() -> ok | {error, Reason :: string()}.
post_registration() ->
start_node_key_updater(),
ok.
%%
%% Implementation
%%
-spec get_config_key(Key :: atom(), Map :: #{atom() => peer_discovery_config_value()})
-> peer_discovery_config_value().
get_config_key(Key, Map) ->
?CONFIG_MODULE:get(Key, ?CONFIG_MAPPING, Map).
%% @doc Update etcd, setting a key for this node with a TTL of etcd_node_ttl
%% @end
-spec set_etcd_node_key(Map :: #{atom() => peer_discovery_config_value()})
-> ok | {error, Reason :: string()}.
set_etcd_node_key(Map) ->
Interval = get_config_key(etcd_node_ttl, Map),
etcd_put(node_path(Map), [{ttl, Interval}], [{value, enabled}], Map).
%% @doc Part of etcd path that allows us to distinguish different
%% cluster using the same etcd server.
%% @end
-spec cluster_name_path_part(Map :: #{atom() => peer_discovery_config_value()}) -> string().
cluster_name_path_part(Map) ->
case get_config_key(cluster_name, Map) of
"undefined" -> "default";
Value -> Value
end.
%% @doc Return a list of path segments that are the base path for all
%% etcd keys related to current cluster.
%% @end
-spec base_path(Map :: #{atom() => peer_discovery_config_value()}) -> [?HTTPC_MODULE:path_component()].
base_path(Map) ->
[v2, keys, get_config_key(etcd_prefix, Map), cluster_name_path_part(Map)].
%% @doc Returns etcd path under which nodes should be registered.
%% @end
-spec nodes_path(Map :: #{atom() => peer_discovery_config_value()}) -> [?HTTPC_MODULE:path_component()].
nodes_path(Map) ->
base_path(Map) ++ [nodes].
%% @doc Returns etcd path under which current node should be registered
%% @end
-spec node_path(Map :: #{atom() => peer_discovery_config_value()}) -> [?HTTPC_MODULE:path_component()].
node_path(Map) ->
nodes_path(Map) ++ [atom_to_list(node())].
%% @doc Return the list of erlang nodes
%% @end
%%
-spec extract_nodes(list(), list()) -> [node()].
extract_nodes([], Nodes) -> Nodes;
extract_nodes([H|T], Nodes) ->
M = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY),
extract_nodes(T, lists:append(Nodes, [get_node_from_key(maps:get(<<"key">>, H), M)])).
%% @doc Return the list of erlang nodes
%% @end
%%
-spec extract_nodes(list()) -> [node()].
extract_nodes([]) -> [];
extract_nodes(Nodes) ->
Dir = maps:get(<<"node">>, Nodes),
case maps:get(<<"nodes">>, Dir, undefined) of
undefined -> [];
Values -> extract_nodes(Values, [])
end.
%% @doc Given an etcd key, return the erlang node name
%% @end
%%
-spec get_node_from_key(binary(), Map :: #{atom() => peer_discovery_config_value()}) -> node().
get_node_from_key(<<"/", V/binary>>, Map) -> get_node_from_key(V, Map);
get_node_from_key(V, Map) ->
%% nodes path is /v2/keys/<etcd-prefix>/<cluster-name>/nodes
%% etcd returns node keys as /<etcd-prefix>/<cluster-name>/nodes/<nodename>
%% We are mapping path components from "<etcd-prefix>" up to "nodes",
%% and discarding the same number of characters from the key returned by etcd.
Path = string:concat(?HTTPC_MODULE:build_path(lists:sublist(nodes_path(Map), 3, 3)), "/"),
?UTIL_MODULE:node_name(string:substr(binary_to_list(V), length(Path))).
%% @doc Generate random string. We are using it for compare-and-change
%% operations in etcd.
%% @end
%% -spec generate_unique_string() -> string().
%% generate_unique_string() ->
%% [ $a - 1 + rand:uniform(26) || _ <- lists:seq(1, 32) ].
-spec etcd_delete(Path, Query, Map)
-> {ok, term()} | {error, string()} when
Path :: [?HTTPC_MODULE:path_component()],
Query :: [?HTTPC_MODULE:query_component()],
Map :: #{atom() => peer_discovery_config_value()}.
etcd_delete(Path, Query, Map) ->
?UTIL_MODULE:stringify_error(
?HTTPC_MODULE:delete(get_config_key(etcd_scheme, Map),
get_config_key(etcd_host, Map),
get_config_key(etcd_port, Map),
Path, Query, "")).
-spec etcd_get(Path, Query, Map)
-> {ok, term()} | {error, string()} when
Path :: [?HTTPC_MODULE:path_component()],
Query :: [?HTTPC_MODULE:query_component()],
Map :: #{atom() => peer_discovery_config_value()}.
etcd_get(Path, Query, Map) ->
?UTIL_MODULE:stringify_error(
?HTTPC_MODULE:get(get_config_key(etcd_scheme, Map),
get_config_key(etcd_host, Map),
get_config_key(etcd_port, Map),
Path, Query)).
-spec etcd_put(Path, Query, Body, Map) -> {ok, term()} | {error, string()} when
Path :: [?HTTPC_MODULE:path_component()],
Query :: [?HTTPC_MODULE:query_component()],
Body :: [?HTTPC_MODULE:query_component()],
Map :: #{atom() => peer_discovery_config_value()}.
etcd_put(Path, Query, Body, Map) ->
?UTIL_MODULE:stringify_error(
?HTTPC_MODULE:put(get_config_key(etcd_scheme, Map),
get_config_key(etcd_host, Map),
get_config_key(etcd_port, Map),
Path, Query, ?HTTPC_MODULE:build_query(Body))).
start_node_key_updater() ->
case rabbit_peer_discovery:backend() of
?MODULE ->
M = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY),
case get_config_key(etcd_node_ttl, M) of
undefined -> ok;
%% in seconds
Interval ->
%% We cannot use timer:apply_interval/4 here because this
%% function is executed in a short live process and when it
%% exits, the timer module will automatically cancel the
%% timer.
%%
%% Instead we delegate to a locally registered gen_server,
%% `rabbitmq_peer_discovery_etcd_health_check_helper`.
%%
%% The value is 1/2 of what's configured to avoid a race
%% condition between check TTL expiration and in flight
%% notifications
rabbitmq_peer_discovery_etcd_health_check_helper:start_timer(Interval * 500),
ok
end;
_ -> ok
end.
-spec update_node_key() -> ok.
update_node_key() ->
M = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY),
set_etcd_node_key(M).

View File

@ -0,0 +1,30 @@
%% The contents of this file are subject to the Mozilla Public License
%% Version 1.1 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License at
%% http://www.mozilla.org/MPL/
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
%% License for the specific language governing rights and limitations
%% under the License.
%%
%% The Original Code is RabbitMQ Management Console.
%%
%% The Initial Developer of the Original Code is GoPivotal, Inc.
%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved.
%%
-module(rabbitmq_peer_discovery_etcd_app).
%%
%% API
%%
-behaviour(application).
-export([start/2, stop/1]).
start(_Type, _StartArgs) ->
rabbitmq_peer_discovery_etcd_sup:start_link().
stop(_State) ->
ok.

View File

@ -0,0 +1,75 @@
%% The contents of this file are subject to the Mozilla Public License
%% Version 1.1 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License at
%% http://www.mozilla.org/MPL/
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
%% License for the specific language governing rights and limitations
%% under the License.
%%
%% The Original Code is RabbitMQ Management Console.
%%
%% The Initial Developer of the Original Code is GoPivotal, Inc.
%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved.
%%
%% This gen_server starts a periodic timer on behalf of
%% a short lived process that kicks off peer discovery.
%% This is so that the timer is not automatically canceled
%% and cleaned up by the timer server when the short lived
%% process terminates.
-module(rabbitmq_peer_discovery_etcd_health_check_helper).
-behaviour(gen_server).
-export([start_link/0, start_timer/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
-record(state, {timer_ref}).
%%
%% API
%%
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
start_timer(Interval) ->
gen_server:call(?MODULE, {start_timer, Interval}, infinity).
init([]) ->
{ok, #state{timer_ref = undefined}}.
handle_call({start_timer, Interval}, _From, #state{timer_ref = undefined} = State) ->
rabbit_log:info("Starting etcd health check notifier (effective interval: ~p milliseconds)", [Interval]),
{ok, TRef} = timer:apply_interval(Interval, rabbit_peer_discovery_etcd,
update_node_key, []),
{reply, ok, State#state{timer_ref = TRef}};
handle_call({start_timer, _Interval}, _From, State) ->
{reply, ok, State};
handle_call(_Msg, _From, State) ->
{reply, not_understood, State}.
handle_cast(_Msg, State) ->
{noreply, State}.
handle_info(_MSg, State) ->
{noreply, State}.
terminate(_Arg, #state{timer_ref = undefined}) ->
ok;
terminate(_Arg, #state{timer_ref = TRef}) ->
timer:cancel(TRef),
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.

View File

@ -0,0 +1,42 @@
%% The contents of this file are subject to the Mozilla Public License
%% Version 1.1 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License at
%% http://www.mozilla.org/MPL/
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
%% License for the specific language governing rights and limitations
%% under the License.
%%
%% The Original Code is RabbitMQ.
%%
%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved.
%%
-module(rabbitmq_peer_discovery_etcd_sup).
-behaviour(supervisor).
-export([init/1, start_link/0]).
-include_lib("rabbit_common/include/rabbit.hrl").
%%
%% API
%%
init([]) ->
Flags = #{strategy => one_for_one,
intensity => 1,
period => 1},
Specs = [#{id => rabbitmq_peer_discovery_etcd_health_check_helper,
start => {rabbitmq_peer_discovery_etcd_health_check_helper, start_link, []},
restart => permanent,
shutdown => ?SUPERVISOR_WAIT,
type => worker,
modules => [rabbitmq_peer_discovery_etcd_health_check_helper]
}],
{ok, {Flags, Specs}}.
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).