Initial import
This commit is contained in:
parent
b9de5bc80b
commit
ab98aecd17
|
|
@ -15,3 +15,42 @@ deps
|
|||
.rebar3
|
||||
_build/
|
||||
_checkouts/
|
||||
|
||||
erl_crash.dump
|
||||
.sw?
|
||||
.*.sw?
|
||||
*.beam
|
||||
/.erlang.mk/
|
||||
/cover/
|
||||
/deps/
|
||||
/ebin/
|
||||
/logs/
|
||||
/plugins/
|
||||
/xrefr
|
||||
elvis
|
||||
callgrind*
|
||||
ct.coverdata
|
||||
test/ct.cover.spec
|
||||
_build
|
||||
|
||||
rabbitmq_stream.d
|
||||
*.plt
|
||||
*.d
|
||||
|
||||
*.jar
|
||||
|
||||
|
||||
*~
|
||||
.sw?
|
||||
.*.sw?
|
||||
*.beam
|
||||
*.class
|
||||
*.dat
|
||||
*.dump
|
||||
*.iml
|
||||
*.ipr
|
||||
*.iws
|
||||
.DS_Store
|
||||
\#~
|
||||
/.idea/
|
||||
/deps/
|
||||
|
|
|
|||
|
|
@ -0,0 +1,44 @@
|
|||
# Contributor Code of Conduct
|
||||
|
||||
As contributors and maintainers of this project, and in the interest of fostering an open
|
||||
and welcoming community, we pledge to respect all people who contribute through reporting
|
||||
issues, posting feature requests, updating documentation, submitting pull requests or
|
||||
patches, and other activities.
|
||||
|
||||
We are committed to making participation in this project a harassment-free experience for
|
||||
everyone, regardless of level of experience, gender, gender identity and expression,
|
||||
sexual orientation, disability, personal appearance, body size, race, ethnicity, age,
|
||||
religion, or nationality.
|
||||
|
||||
Examples of unacceptable behavior by participants include:
|
||||
|
||||
* The use of sexualized language or imagery
|
||||
* Personal attacks
|
||||
* Trolling or insulting/derogatory comments
|
||||
* Public or private harassment
|
||||
* Publishing other's private information, such as physical or electronic addresses,
|
||||
without explicit permission
|
||||
* Other unethical or unprofessional conduct
|
||||
|
||||
Project maintainers have the right and responsibility to remove, edit, or reject comments,
|
||||
commits, code, wiki edits, issues, and other contributions that are not aligned to this
|
||||
Code of Conduct, or to ban temporarily or permanently any contributor for other behaviors
|
||||
that they deem inappropriate, threatening, offensive, or harmful.
|
||||
|
||||
By adopting this Code of Conduct, project maintainers commit themselves to fairly and
|
||||
consistently applying these principles to every aspect of managing this project. Project
|
||||
maintainers who do not follow or enforce the Code of Conduct may be permanently removed
|
||||
from the project team.
|
||||
|
||||
This Code of Conduct applies both within project spaces and in public spaces when an
|
||||
individual is representing the project or its community.
|
||||
|
||||
Instances of abusive, harassing, or otherwise unacceptable behavior may be reported by
|
||||
contacting a project maintainer at [info@rabbitmq.com](mailto:info@rabbitmq.com). All complaints will
|
||||
be reviewed and investigated and will result in a response that is deemed necessary and
|
||||
appropriate to the circumstances. Maintainers are obligated to maintain confidentiality
|
||||
with regard to the reporter of an incident.
|
||||
|
||||
This Code of Conduct is adapted from the
|
||||
[Contributor Covenant](http://contributor-covenant.org), version 1.3.0, available at
|
||||
[contributor-covenant.org/version/1/3/0/](http://contributor-covenant.org/version/1/3/0/)
|
||||
|
|
@ -0,0 +1,38 @@
|
|||
## Overview
|
||||
|
||||
RabbitMQ projects use pull requests to discuss, collaborate on and accept code contributions.
|
||||
Pull requests is the primary place of discussing code changes.
|
||||
|
||||
## How to Contribute
|
||||
|
||||
The process is fairly standard:
|
||||
|
||||
* Fork the repository or repositories you plan on contributing to
|
||||
* Clone [RabbitMQ umbrella repository](https://github.com/rabbitmq/rabbitmq-public-umbrella)
|
||||
* `cd umbrella`, `make co`
|
||||
* Create a branch with a descriptive name in the relevant repositories
|
||||
* Make your changes, run tests, commit with a [descriptive message](http://tbaggery.com/2008/04/19/a-note-about-git-commit-messages.html), push to your fork
|
||||
* Submit pull requests with an explanation what has been changed and **why**
|
||||
* Submit a filled out and signed [Contributor Agreement](https://github.com/rabbitmq/ca#how-to-submit) if needed (see below)
|
||||
* Be patient. We will get to your pull request eventually
|
||||
|
||||
If what you are going to work on is a substantial change, please first ask the core team
|
||||
of their opinion on [RabbitMQ mailing list](https://groups.google.com/forum/#!forum/rabbitmq-users).
|
||||
|
||||
|
||||
## Code of Conduct
|
||||
|
||||
See [CODE_OF_CONDUCT.md](./CODE_OF_CONDUCT.md).
|
||||
|
||||
|
||||
## Contributor Agreement
|
||||
|
||||
If you want to contribute a non-trivial change, please submit a signed copy of our
|
||||
[Contributor Agreement](https://github.com/rabbitmq/ca#how-to-submit) around the time
|
||||
you submit your pull request. This will make it much easier (in some cases, possible)
|
||||
for the RabbitMQ team at Pivotal to merge your contribution.
|
||||
|
||||
|
||||
## Where to Ask Questions
|
||||
|
||||
If something isn't clear, feel free to ask on our [mailing list](https://groups.google.com/forum/#!forum/rabbitmq-users).
|
||||
|
|
@ -0,0 +1,5 @@
|
|||
This package, the RabbitMQ server is licensed under the MPL 1.1. For the
|
||||
MPL 1.1, please see LICENSE-MPL-RabbitMQ.
|
||||
|
||||
If you have any questions regarding licensing, please contact us at
|
||||
info@rabbitmq.com.
|
||||
|
|
@ -0,0 +1,31 @@
|
|||
PROJECT = rabbitmq_stream
|
||||
PROJECT_DESCRIPTION = RabbitMQ Stream
|
||||
PROJECT_MOD = rabbit_stream
|
||||
|
||||
define PROJECT_ENV
|
||||
[
|
||||
{tcp_listeners, [5555]},
|
||||
{num_tcp_acceptors, 10},
|
||||
{num_ssl_acceptors, 10},
|
||||
{tcp_listen_options, [{backlog, 128},
|
||||
{nodelay, true}]},
|
||||
{initial_credits, 50000},
|
||||
{credits_required_for_unblocking, 12500}
|
||||
]
|
||||
endef
|
||||
|
||||
|
||||
DEPS = rabbit
|
||||
TEST_DEPS = rabbitmq_ct_helpers
|
||||
|
||||
DEP_EARLY_PLUGINS = rabbit_common/mk/rabbitmq-early-plugin.mk
|
||||
DEP_PLUGINS = rabbit_common/mk/rabbitmq-plugin.mk
|
||||
|
||||
# FIXME: Use erlang.mk patched for RabbitMQ, while waiting for PRs to be
|
||||
# reviewed and merged.
|
||||
|
||||
ERLANG_MK_REPO = https://github.com/rabbitmq/erlang.mk.git
|
||||
ERLANG_MK_COMMIT = rabbitmq-tmp
|
||||
|
||||
include rabbitmq-components.mk
|
||||
include erlang.mk
|
||||
|
|
@ -1,2 +1,2 @@
|
|||
# rabbitmq-stream
|
||||
RabbitMQ Stream Plugin
|
||||
# RabbitMQ Stream Plugin
|
||||
|
||||
|
|
|
|||
File diff suppressed because it is too large
Load Diff
|
|
@ -0,0 +1,337 @@
|
|||
ifeq ($(.DEFAULT_GOAL),)
|
||||
# Define default goal to `all` because this file defines some targets
|
||||
# before the inclusion of erlang.mk leading to the wrong target becoming
|
||||
# the default.
|
||||
.DEFAULT_GOAL = all
|
||||
endif
|
||||
|
||||
# PROJECT_VERSION defaults to:
|
||||
# 1. the version exported by rabbitmq-server-release;
|
||||
# 2. the version stored in `git-revisions.txt`, if it exists;
|
||||
# 3. a version based on git-describe(1), if it is a Git clone;
|
||||
# 4. 0.0.0
|
||||
|
||||
PROJECT_VERSION := $(RABBITMQ_VERSION)
|
||||
|
||||
ifeq ($(PROJECT_VERSION),)
|
||||
PROJECT_VERSION := $(shell \
|
||||
if test -f git-revisions.txt; then \
|
||||
head -n1 git-revisions.txt | \
|
||||
awk '{print $$$(words $(PROJECT_DESCRIPTION) version);}'; \
|
||||
else \
|
||||
(git describe --dirty --abbrev=7 --tags --always --first-parent \
|
||||
2>/dev/null || echo rabbitmq_v0_0_0) | \
|
||||
sed -e 's/^rabbitmq_v//' -e 's/^v//' -e 's/_/./g' -e 's/-/+/' \
|
||||
-e 's/-/./g'; \
|
||||
fi)
|
||||
endif
|
||||
|
||||
# --------------------------------------------------------------------
|
||||
# RabbitMQ components.
|
||||
# --------------------------------------------------------------------
|
||||
|
||||
# For RabbitMQ repositories, we want to checkout branches which match
|
||||
# the parent project. For instance, if the parent project is on a
|
||||
# release tag, dependencies must be on the same release tag. If the
|
||||
# parent project is on a topic branch, dependencies must be on the same
|
||||
# topic branch or fallback to `stable` or `master` whichever was the
|
||||
# base of the topic branch.
|
||||
|
||||
dep_amqp_client = git_rmq rabbitmq-erlang-client $(current_rmq_ref) $(base_rmq_ref) master
|
||||
dep_amqp10_client = git_rmq rabbitmq-amqp1.0-client $(current_rmq_ref) $(base_rmq_ref) master
|
||||
dep_amqp10_common = git_rmq rabbitmq-amqp1.0-common $(current_rmq_ref) $(base_rmq_ref) master
|
||||
dep_rabbit = git_rmq rabbitmq-server $(current_rmq_ref) $(base_rmq_ref) master
|
||||
dep_rabbit_common = git_rmq rabbitmq-common $(current_rmq_ref) $(base_rmq_ref) master
|
||||
dep_rabbitmq_amqp1_0 = git_rmq rabbitmq-amqp1.0 $(current_rmq_ref) $(base_rmq_ref) master
|
||||
dep_rabbitmq_auth_backend_amqp = git_rmq rabbitmq-auth-backend-amqp $(current_rmq_ref) $(base_rmq_ref) master
|
||||
dep_rabbitmq_auth_backend_cache = git_rmq rabbitmq-auth-backend-cache $(current_rmq_ref) $(base_rmq_ref) master
|
||||
dep_rabbitmq_auth_backend_http = git_rmq rabbitmq-auth-backend-http $(current_rmq_ref) $(base_rmq_ref) master
|
||||
dep_rabbitmq_auth_backend_ldap = git_rmq rabbitmq-auth-backend-ldap $(current_rmq_ref) $(base_rmq_ref) master
|
||||
dep_rabbitmq_auth_backend_oauth2 = git_rmq rabbitmq-auth-backend-oauth2 $(current_rmq_ref) $(base_rmq_ref) master
|
||||
dep_rabbitmq_auth_mechanism_ssl = git_rmq rabbitmq-auth-mechanism-ssl $(current_rmq_ref) $(base_rmq_ref) master
|
||||
dep_rabbitmq_aws = git_rmq rabbitmq-aws $(current_rmq_ref) $(base_rmq_ref) master
|
||||
dep_rabbitmq_boot_steps_visualiser = git_rmq rabbitmq-boot-steps-visualiser $(current_rmq_ref) $(base_rmq_ref) master
|
||||
dep_rabbitmq_cli = git_rmq rabbitmq-cli $(current_rmq_ref) $(base_rmq_ref) master
|
||||
dep_rabbitmq_codegen = git_rmq rabbitmq-codegen $(current_rmq_ref) $(base_rmq_ref) master
|
||||
dep_rabbitmq_consistent_hash_exchange = git_rmq rabbitmq-consistent-hash-exchange $(current_rmq_ref) $(base_rmq_ref) master
|
||||
dep_rabbitmq_ct_client_helpers = git_rmq rabbitmq-ct-client-helpers $(current_rmq_ref) $(base_rmq_ref) master
|
||||
dep_rabbitmq_ct_helpers = git_rmq rabbitmq-ct-helpers $(current_rmq_ref) $(base_rmq_ref) master
|
||||
dep_rabbitmq_delayed_message_exchange = git_rmq rabbitmq-delayed-message-exchange $(current_rmq_ref) $(base_rmq_ref) master
|
||||
dep_rabbitmq_dotnet_client = git_rmq rabbitmq-dotnet-client $(current_rmq_ref) $(base_rmq_ref) master
|
||||
dep_rabbitmq_event_exchange = git_rmq rabbitmq-event-exchange $(current_rmq_ref) $(base_rmq_ref) master
|
||||
dep_rabbitmq_federation = git_rmq rabbitmq-federation $(current_rmq_ref) $(base_rmq_ref) master
|
||||
dep_rabbitmq_federation_management = git_rmq rabbitmq-federation-management $(current_rmq_ref) $(base_rmq_ref) master
|
||||
dep_rabbitmq_java_client = git_rmq rabbitmq-java-client $(current_rmq_ref) $(base_rmq_ref) master
|
||||
dep_rabbitmq_jms_client = git_rmq rabbitmq-jms-client $(current_rmq_ref) $(base_rmq_ref) master
|
||||
dep_rabbitmq_jms_cts = git_rmq rabbitmq-jms-cts $(current_rmq_ref) $(base_rmq_ref) master
|
||||
dep_rabbitmq_jms_topic_exchange = git_rmq rabbitmq-jms-topic-exchange $(current_rmq_ref) $(base_rmq_ref) master
|
||||
dep_rabbitmq_lvc_exchange = git_rmq rabbitmq-lvc-exchange $(current_rmq_ref) $(base_rmq_ref) master
|
||||
dep_rabbitmq_management = git_rmq rabbitmq-management $(current_rmq_ref) $(base_rmq_ref) master
|
||||
dep_rabbitmq_management_agent = git_rmq rabbitmq-management-agent $(current_rmq_ref) $(base_rmq_ref) master
|
||||
dep_rabbitmq_management_exchange = git_rmq rabbitmq-management-exchange $(current_rmq_ref) $(base_rmq_ref) master
|
||||
dep_rabbitmq_management_themes = git_rmq rabbitmq-management-themes $(current_rmq_ref) $(base_rmq_ref) master
|
||||
dep_rabbitmq_message_timestamp = git_rmq rabbitmq-message-timestamp $(current_rmq_ref) $(base_rmq_ref) master
|
||||
dep_rabbitmq_metronome = git_rmq rabbitmq-metronome $(current_rmq_ref) $(base_rmq_ref) master
|
||||
dep_rabbitmq_mqtt = git_rmq rabbitmq-mqtt $(current_rmq_ref) $(base_rmq_ref) master
|
||||
dep_rabbitmq_objc_client = git_rmq rabbitmq-objc-client $(current_rmq_ref) $(base_rmq_ref) master
|
||||
dep_rabbitmq_peer_discovery_aws = git_rmq rabbitmq-peer-discovery-aws $(current_rmq_ref) $(base_rmq_ref) master
|
||||
dep_rabbitmq_peer_discovery_common = git_rmq rabbitmq-peer-discovery-common $(current_rmq_ref) $(base_rmq_ref) master
|
||||
dep_rabbitmq_peer_discovery_consul = git_rmq rabbitmq-peer-discovery-consul $(current_rmq_ref) $(base_rmq_ref) master
|
||||
dep_rabbitmq_peer_discovery_etcd = git_rmq rabbitmq-peer-discovery-etcd $(current_rmq_ref) $(base_rmq_ref) master
|
||||
dep_rabbitmq_peer_discovery_k8s = git_rmq rabbitmq-peer-discovery-k8s $(current_rmq_ref) $(base_rmq_ref) master
|
||||
dep_rabbitmq_prometheus = git_rmq rabbitmq-prometheus $(current_rmq_ref) $(base_rmq_ref) master
|
||||
dep_rabbitmq_random_exchange = git_rmq rabbitmq-random-exchange $(current_rmq_ref) $(base_rmq_ref) master
|
||||
dep_rabbitmq_recent_history_exchange = git_rmq rabbitmq-recent-history-exchange $(current_rmq_ref) $(base_rmq_ref) master
|
||||
dep_rabbitmq_routing_node_stamp = git_rmq rabbitmq-routing-node-stamp $(current_rmq_ref) $(base_rmq_ref) master
|
||||
dep_rabbitmq_rtopic_exchange = git_rmq rabbitmq-rtopic-exchange $(current_rmq_ref) $(base_rmq_ref) master
|
||||
dep_rabbitmq_server_release = git_rmq rabbitmq-server-release $(current_rmq_ref) $(base_rmq_ref) master
|
||||
dep_rabbitmq_sharding = git_rmq rabbitmq-sharding $(current_rmq_ref) $(base_rmq_ref) master
|
||||
dep_rabbitmq_shovel = git_rmq rabbitmq-shovel $(current_rmq_ref) $(base_rmq_ref) master
|
||||
dep_rabbitmq_shovel_management = git_rmq rabbitmq-shovel-management $(current_rmq_ref) $(base_rmq_ref) master
|
||||
dep_rabbitmq_stomp = git_rmq rabbitmq-stomp $(current_rmq_ref) $(base_rmq_ref) master
|
||||
dep_rabbitmq_toke = git_rmq rabbitmq-toke $(current_rmq_ref) $(base_rmq_ref) master
|
||||
dep_rabbitmq_top = git_rmq rabbitmq-top $(current_rmq_ref) $(base_rmq_ref) master
|
||||
dep_rabbitmq_tracing = git_rmq rabbitmq-tracing $(current_rmq_ref) $(base_rmq_ref) master
|
||||
dep_rabbitmq_trust_store = git_rmq rabbitmq-trust-store $(current_rmq_ref) $(base_rmq_ref) master
|
||||
dep_rabbitmq_test = git_rmq rabbitmq-test $(current_rmq_ref) $(base_rmq_ref) master
|
||||
dep_rabbitmq_web_dispatch = git_rmq rabbitmq-web-dispatch $(current_rmq_ref) $(base_rmq_ref) master
|
||||
dep_rabbitmq_web_stomp = git_rmq rabbitmq-web-stomp $(current_rmq_ref) $(base_rmq_ref) master
|
||||
dep_rabbitmq_web_stomp_examples = git_rmq rabbitmq-web-stomp-examples $(current_rmq_ref) $(base_rmq_ref) master
|
||||
dep_rabbitmq_web_mqtt = git_rmq rabbitmq-web-mqtt $(current_rmq_ref) $(base_rmq_ref) master
|
||||
dep_rabbitmq_web_mqtt_examples = git_rmq rabbitmq-web-mqtt-examples $(current_rmq_ref) $(base_rmq_ref) master
|
||||
dep_rabbitmq_website = git_rmq rabbitmq-website $(current_rmq_ref) $(base_rmq_ref) live master
|
||||
dep_toke = git_rmq toke $(current_rmq_ref) $(base_rmq_ref) master
|
||||
|
||||
dep_rabbitmq_public_umbrella = git_rmq rabbitmq-public-umbrella $(current_rmq_ref) $(base_rmq_ref) master
|
||||
|
||||
# Third-party dependencies version pinning.
|
||||
#
|
||||
# We do that in this file, which is copied in all projects, to ensure
|
||||
# all projects use the same versions. It avoids conflicts and makes it
|
||||
# possible to work with rabbitmq-public-umbrella.
|
||||
|
||||
dep_accept = hex 0.3.5
|
||||
dep_cowboy = hex 2.6.1
|
||||
dep_cowlib = hex 2.7.0
|
||||
dep_jsx = hex 2.9.0
|
||||
dep_lager = hex 3.8.0
|
||||
dep_prometheus = git https://github.com/deadtrickster/prometheus.erl.git master
|
||||
dep_ra = git https://github.com/rabbitmq/ra.git master
|
||||
dep_ranch = hex 1.7.1
|
||||
dep_recon = hex 2.5.0
|
||||
dep_observer_cli = hex 1.5.3
|
||||
dep_stdout_formatter = hex 0.2.2
|
||||
dep_sysmon_handler = hex 1.2.0
|
||||
|
||||
RABBITMQ_COMPONENTS = amqp_client \
|
||||
amqp10_common \
|
||||
amqp10_client \
|
||||
rabbit \
|
||||
rabbit_common \
|
||||
rabbitmq_amqp1_0 \
|
||||
rabbitmq_auth_backend_amqp \
|
||||
rabbitmq_auth_backend_cache \
|
||||
rabbitmq_auth_backend_http \
|
||||
rabbitmq_auth_backend_ldap \
|
||||
rabbitmq_auth_backend_oauth2 \
|
||||
rabbitmq_auth_mechanism_ssl \
|
||||
rabbitmq_aws \
|
||||
rabbitmq_boot_steps_visualiser \
|
||||
rabbitmq_cli \
|
||||
rabbitmq_codegen \
|
||||
rabbitmq_consistent_hash_exchange \
|
||||
rabbitmq_ct_client_helpers \
|
||||
rabbitmq_ct_helpers \
|
||||
rabbitmq_delayed_message_exchange \
|
||||
rabbitmq_dotnet_client \
|
||||
rabbitmq_event_exchange \
|
||||
rabbitmq_federation \
|
||||
rabbitmq_federation_management \
|
||||
rabbitmq_java_client \
|
||||
rabbitmq_jms_client \
|
||||
rabbitmq_jms_cts \
|
||||
rabbitmq_jms_topic_exchange \
|
||||
rabbitmq_lvc_exchange \
|
||||
rabbitmq_management \
|
||||
rabbitmq_management_agent \
|
||||
rabbitmq_management_exchange \
|
||||
rabbitmq_management_themes \
|
||||
rabbitmq_message_timestamp \
|
||||
rabbitmq_metronome \
|
||||
rabbitmq_mqtt \
|
||||
rabbitmq_objc_client \
|
||||
rabbitmq_peer_discovery_aws \
|
||||
rabbitmq_peer_discovery_common \
|
||||
rabbitmq_peer_discovery_consul \
|
||||
rabbitmq_peer_discovery_etcd \
|
||||
rabbitmq_peer_discovery_k8s \
|
||||
rabbitmq_prometheus \
|
||||
rabbitmq_random_exchange \
|
||||
rabbitmq_recent_history_exchange \
|
||||
rabbitmq_routing_node_stamp \
|
||||
rabbitmq_rtopic_exchange \
|
||||
rabbitmq_server_release \
|
||||
rabbitmq_sharding \
|
||||
rabbitmq_shovel \
|
||||
rabbitmq_shovel_management \
|
||||
rabbitmq_stomp \
|
||||
rabbitmq_toke \
|
||||
rabbitmq_top \
|
||||
rabbitmq_tracing \
|
||||
rabbitmq_trust_store \
|
||||
rabbitmq_web_dispatch \
|
||||
rabbitmq_web_mqtt \
|
||||
rabbitmq_web_mqtt_examples \
|
||||
rabbitmq_web_stomp \
|
||||
rabbitmq_web_stomp_examples \
|
||||
rabbitmq_website
|
||||
|
||||
# Erlang.mk does not rebuild dependencies by default, once they were
|
||||
# compiled once, except for those listed in the `$(FORCE_REBUILD)`
|
||||
# variable.
|
||||
#
|
||||
# We want all RabbitMQ components to always be rebuilt: this eases
|
||||
# the work on several components at the same time.
|
||||
|
||||
FORCE_REBUILD = $(RABBITMQ_COMPONENTS)
|
||||
|
||||
# Several components have a custom erlang.mk/build.config, mainly
|
||||
# to disable eunit. Therefore, we can't use the top-level project's
|
||||
# erlang.mk copy.
|
||||
NO_AUTOPATCH += $(RABBITMQ_COMPONENTS)
|
||||
|
||||
ifeq ($(origin current_rmq_ref),undefined)
|
||||
ifneq ($(wildcard .git),)
|
||||
current_rmq_ref := $(shell (\
|
||||
ref=$$(LANG=C git branch --list | awk '/^\* \(.*detached / {ref=$$0; sub(/.*detached [^ ]+ /, "", ref); sub(/\)$$/, "", ref); print ref; exit;} /^\* / {ref=$$0; sub(/^\* /, "", ref); print ref; exit}');\
|
||||
if test "$$(git rev-parse --short HEAD)" != "$$ref"; then echo "$$ref"; fi))
|
||||
else
|
||||
current_rmq_ref := master
|
||||
endif
|
||||
endif
|
||||
export current_rmq_ref
|
||||
|
||||
ifeq ($(origin base_rmq_ref),undefined)
|
||||
ifneq ($(wildcard .git),)
|
||||
possible_base_rmq_ref := master
|
||||
ifeq ($(possible_base_rmq_ref),$(current_rmq_ref))
|
||||
base_rmq_ref := $(current_rmq_ref)
|
||||
else
|
||||
base_rmq_ref := $(shell \
|
||||
(git rev-parse --verify -q master >/dev/null && \
|
||||
git rev-parse --verify -q $(possible_base_rmq_ref) >/dev/null && \
|
||||
git merge-base --is-ancestor $$(git merge-base master HEAD) $(possible_base_rmq_ref) && \
|
||||
echo $(possible_base_rmq_ref)) || \
|
||||
echo master)
|
||||
endif
|
||||
else
|
||||
base_rmq_ref := master
|
||||
endif
|
||||
endif
|
||||
export base_rmq_ref
|
||||
|
||||
# Repository URL selection.
|
||||
#
|
||||
# First, we infer other components' location from the current project
|
||||
# repository URL, if it's a Git repository:
|
||||
# - We take the "origin" remote URL as the base
|
||||
# - The current project name and repository name is replaced by the
|
||||
# target's properties:
|
||||
# eg. rabbitmq-common is replaced by rabbitmq-codegen
|
||||
# eg. rabbit_common is replaced by rabbitmq_codegen
|
||||
#
|
||||
# If cloning from this computed location fails, we fallback to RabbitMQ
|
||||
# upstream which is GitHub.
|
||||
|
||||
# Macro to transform eg. "rabbit_common" to "rabbitmq-common".
|
||||
rmq_cmp_repo_name = $(word 2,$(dep_$(1)))
|
||||
|
||||
# Upstream URL for the current project.
|
||||
RABBITMQ_COMPONENT_REPO_NAME := $(call rmq_cmp_repo_name,$(PROJECT))
|
||||
RABBITMQ_UPSTREAM_FETCH_URL ?= https://github.com/rabbitmq/$(RABBITMQ_COMPONENT_REPO_NAME).git
|
||||
RABBITMQ_UPSTREAM_PUSH_URL ?= git@github.com:rabbitmq/$(RABBITMQ_COMPONENT_REPO_NAME).git
|
||||
|
||||
# Current URL for the current project. If this is not a Git clone,
|
||||
# default to the upstream Git repository.
|
||||
ifneq ($(wildcard .git),)
|
||||
git_origin_fetch_url := $(shell git config remote.origin.url)
|
||||
git_origin_push_url := $(shell git config remote.origin.pushurl || git config remote.origin.url)
|
||||
RABBITMQ_CURRENT_FETCH_URL ?= $(git_origin_fetch_url)
|
||||
RABBITMQ_CURRENT_PUSH_URL ?= $(git_origin_push_url)
|
||||
else
|
||||
RABBITMQ_CURRENT_FETCH_URL ?= $(RABBITMQ_UPSTREAM_FETCH_URL)
|
||||
RABBITMQ_CURRENT_PUSH_URL ?= $(RABBITMQ_UPSTREAM_PUSH_URL)
|
||||
endif
|
||||
|
||||
# Macro to replace the following pattern:
|
||||
# 1. /foo.git -> /bar.git
|
||||
# 2. /foo -> /bar
|
||||
# 3. /foo/ -> /bar/
|
||||
subst_repo_name = $(patsubst %/$(1)/%,%/$(2)/%,$(patsubst %/$(1),%/$(2),$(patsubst %/$(1).git,%/$(2).git,$(3))))
|
||||
|
||||
# Macro to replace both the project's name (eg. "rabbit_common") and
|
||||
# repository name (eg. "rabbitmq-common") by the target's equivalent.
|
||||
#
|
||||
# This macro is kept on one line because we don't want whitespaces in
|
||||
# the returned value, as it's used in $(dep_fetch_git_rmq) in a shell
|
||||
# single-quoted string.
|
||||
dep_rmq_repo = $(if $(dep_$(2)),$(call subst_repo_name,$(PROJECT),$(2),$(call subst_repo_name,$(RABBITMQ_COMPONENT_REPO_NAME),$(call rmq_cmp_repo_name,$(2)),$(1))),$(pkg_$(1)_repo))
|
||||
|
||||
dep_rmq_commits = $(if $(dep_$(1)), \
|
||||
$(wordlist 3,$(words $(dep_$(1))),$(dep_$(1))), \
|
||||
$(pkg_$(1)_commit))
|
||||
|
||||
define dep_fetch_git_rmq
|
||||
fetch_url1='$(call dep_rmq_repo,$(RABBITMQ_CURRENT_FETCH_URL),$(1))'; \
|
||||
fetch_url2='$(call dep_rmq_repo,$(RABBITMQ_UPSTREAM_FETCH_URL),$(1))'; \
|
||||
if test "$$$$fetch_url1" != '$(RABBITMQ_CURRENT_FETCH_URL)' && \
|
||||
git clone -q -n -- "$$$$fetch_url1" $(DEPS_DIR)/$(call dep_name,$(1)); then \
|
||||
fetch_url="$$$$fetch_url1"; \
|
||||
push_url='$(call dep_rmq_repo,$(RABBITMQ_CURRENT_PUSH_URL),$(1))'; \
|
||||
elif git clone -q -n -- "$$$$fetch_url2" $(DEPS_DIR)/$(call dep_name,$(1)); then \
|
||||
fetch_url="$$$$fetch_url2"; \
|
||||
push_url='$(call dep_rmq_repo,$(RABBITMQ_UPSTREAM_PUSH_URL),$(1))'; \
|
||||
fi; \
|
||||
cd $(DEPS_DIR)/$(call dep_name,$(1)) && ( \
|
||||
$(foreach ref,$(call dep_rmq_commits,$(1)), \
|
||||
git checkout -q $(ref) >/dev/null 2>&1 || \
|
||||
) \
|
||||
(echo "error: no valid pathspec among: $(call dep_rmq_commits,$(1))" \
|
||||
1>&2 && false) ) && \
|
||||
(test "$$$$fetch_url" = "$$$$push_url" || \
|
||||
git remote set-url --push origin "$$$$push_url")
|
||||
endef
|
||||
|
||||
# --------------------------------------------------------------------
|
||||
# Component distribution.
|
||||
# --------------------------------------------------------------------
|
||||
|
||||
list-dist-deps::
|
||||
@:
|
||||
|
||||
prepare-dist::
|
||||
@:
|
||||
|
||||
# --------------------------------------------------------------------
|
||||
# Umbrella-specific settings.
|
||||
# --------------------------------------------------------------------
|
||||
|
||||
# If this project is under the Umbrella project, we override $(DEPS_DIR)
|
||||
# to point to the Umbrella's one. We also disable `make distclean` so
|
||||
# $(DEPS_DIR) is not accidentally removed.
|
||||
|
||||
ifneq ($(wildcard ../../UMBRELLA.md),)
|
||||
UNDER_UMBRELLA = 1
|
||||
DEPS_DIR ?= $(abspath ..)
|
||||
else ifneq ($(wildcard ../../../../UMBRELLA.md),)
|
||||
UNDER_UMBRELLA = 1
|
||||
DEPS_DIR ?= $(abspath ../../..)
|
||||
else ifneq ($(wildcard UMBRELLA.md),)
|
||||
UNDER_UMBRELLA = 1
|
||||
endif
|
||||
|
||||
ifeq ($(UNDER_UMBRELLA),1)
|
||||
ifneq ($(filter distclean distclean-deps,$(MAKECMDGOALS)),)
|
||||
SKIP_DEPS = 1
|
||||
endif
|
||||
endif
|
||||
|
|
@ -0,0 +1,39 @@
|
|||
%% The contents of this file are subject to the Mozilla Public License
|
||||
%% Version 1.1 (the "License"); you may not use this file except in
|
||||
%% compliance with the License. You may obtain a copy of the License
|
||||
%% at https://www.mozilla.org/MPL/
|
||||
%%
|
||||
%% Software distributed under the License is distributed on an "AS IS"
|
||||
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
|
||||
%% the License for the specific language governing rights and
|
||||
%% limitations under the License.
|
||||
%%
|
||||
%% The Original Code is RabbitMQ.
|
||||
%%
|
||||
%% The Initial Developer of the Original Code is GoPivotal, Inc.
|
||||
%% Copyright (c) 2020 VMware, Inc. or its affiliates. All rights reserved.
|
||||
%%
|
||||
|
||||
-module(rabbit_stream).
|
||||
-behaviour(application).
|
||||
|
||||
-export([start/2, port/0]).
|
||||
-export([stop/1]).
|
||||
|
||||
-include_lib("rabbit_common/include/rabbit.hrl").
|
||||
|
||||
start(_Type, _Args) ->
|
||||
rabbit_stream_sup:start_link().
|
||||
|
||||
port() ->
|
||||
Listeners = rabbit_networking:node_listeners(node()),
|
||||
Port = lists:foldl(fun(#listener{port = Port, protocol = stream}, _Acc) ->
|
||||
Port;
|
||||
(_, Acc) ->
|
||||
Acc
|
||||
end, undefined, Listeners),
|
||||
Port.
|
||||
|
||||
|
||||
stop(_State) ->
|
||||
ok.
|
||||
|
|
@ -0,0 +1,186 @@
|
|||
%% The contents of this file are subject to the Mozilla Public License
|
||||
%% Version 1.1 (the "License"); you may not use this file except in
|
||||
%% compliance with the License. You may obtain a copy of the License
|
||||
%% at https://www.mozilla.org/MPL/
|
||||
%%
|
||||
%% Software distributed under the License is distributed on an "AS IS"
|
||||
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
|
||||
%% the License for the specific language governing rights and
|
||||
%% limitations under the License.
|
||||
%%
|
||||
%% The Original Code is RabbitMQ.
|
||||
%%
|
||||
%% The Initial Developer of the Original Code is GoPivotal, Inc.
|
||||
%% Copyright (c) 2020 VMware, Inc. or its affiliates. All rights reserved.
|
||||
%%
|
||||
|
||||
-module(rabbit_stream_manager).
|
||||
-behaviour(gen_server).
|
||||
|
||||
%% API
|
||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).
|
||||
-export([start_link/1, create/1, register/0, delete/1, lookup/1, unregister/0, init_mnesia_tables/0]).
|
||||
|
||||
-record(state, {
|
||||
configuration, listeners, monitors
|
||||
}).
|
||||
|
||||
-record(?MODULE, {name, leader_pid, leader, replicas}).
|
||||
|
||||
-define(TABLE, ?MODULE).
|
||||
|
||||
-rabbit_boot_step(
|
||||
{rabbit_exchange_type_consistent_hash_mnesia,
|
||||
[{description, "rabbitmq stream: shared state"},
|
||||
{mfa, {?MODULE, init_mnesia_tables, []}},
|
||||
{requires, database},
|
||||
{enables, external_infrastructure}]}).
|
||||
|
||||
init_mnesia_tables() ->
|
||||
mnesia:create_table(?TABLE,
|
||||
[{attributes, record_info(fields, ?MODULE)},
|
||||
{type, set}]),
|
||||
mnesia:add_table_copy(?TABLE, node(), ram_copies),
|
||||
mnesia:wait_for_tables([?TABLE], 30000).
|
||||
|
||||
start_link(Conf) ->
|
||||
gen_server:start_link({local, ?MODULE}, ?MODULE, [Conf], []).
|
||||
|
||||
init([Conf]) ->
|
||||
GetAllRecords = fun() ->
|
||||
mnesia:foldl(fun(Record, Acc) ->
|
||||
[Record] ++ Acc
|
||||
end,
|
||||
[], ?TABLE)
|
||||
end,
|
||||
Records = mnesia:activity(sync_transaction, GetAllRecords),
|
||||
[begin
|
||||
#?MODULE{
|
||||
name = Name, leader = Leader, replicas = Replicas, leader_pid = LeaderPidInRecord
|
||||
} = Record,
|
||||
error_logger:info_msg("Loading from database ~p~n", [Record]),
|
||||
case node() of
|
||||
Leader ->
|
||||
case is_process_alive(LeaderPidInRecord) of
|
||||
true ->
|
||||
error_logger:info_msg("Process still alive, doing nothing~n"),
|
||||
ok;
|
||||
false ->
|
||||
error_logger:info_msg("Starting the Osiris cluster~n"),
|
||||
Reference = binary_to_list(Name),
|
||||
|
||||
OsirisConf = #{leader_node => Leader,
|
||||
reference => Reference, name => Reference,
|
||||
replica_nodes => Replicas},
|
||||
|
||||
{ok, #{leader_pid := LeaderPid}} = osiris:start_cluster(OsirisConf),
|
||||
|
||||
error_logger:info_msg("New PID for ~p is ~p~n", [Name, LeaderPid]),
|
||||
|
||||
UpdateFunction = fun() ->
|
||||
mnesia:write(Record#?MODULE{leader_pid = LeaderPid}) end,
|
||||
mnesia:activity(sync_transaction, UpdateFunction)
|
||||
end;
|
||||
_ ->
|
||||
error_logger:info_msg("Node not leader, not starting it~n"),
|
||||
ok
|
||||
end
|
||||
end || Record <- Records],
|
||||
|
||||
{ok, #state{configuration = Conf, listeners = [], monitors = #{}}}.
|
||||
|
||||
create(Reference) ->
|
||||
gen_server:call(?MODULE, {create, Reference}).
|
||||
|
||||
delete(Reference) ->
|
||||
gen_server:call(?MODULE, {delete, Reference}).
|
||||
|
||||
register() ->
|
||||
gen_server:call(?MODULE, {register, self()}).
|
||||
|
||||
unregister() ->
|
||||
gen_server:call(?MODULE, {unregister, self()}).
|
||||
|
||||
lookup(Target) ->
|
||||
gen_server:call(?MODULE, {lookup, Target}).
|
||||
|
||||
replicas_for_current_node() ->
|
||||
rabbit_mnesia:cluster_nodes(all) -- [node()].
|
||||
|
||||
read(Name) ->
|
||||
mnesia:activity(sync_transaction, fun() -> mnesia:read({?TABLE, Name}) end).
|
||||
|
||||
handle_call({create, Reference}, _From, State) ->
|
||||
Key = list_to_binary(Reference),
|
||||
case read(Key) of
|
||||
[] ->
|
||||
LeaderNode = node(),
|
||||
Replicas = replicas_for_current_node(),
|
||||
error_logger:info_msg("Creating ~p cluster on ~p with replica(s) ~p~n", [Key, LeaderNode, Replicas]),
|
||||
OsirisConf = #{leader_node => node(),
|
||||
reference => Reference, name => Reference,
|
||||
replica_nodes => Replicas},
|
||||
{ok, #{leader_pid := LeaderPid}} = Res = osiris:start_cluster(OsirisConf),
|
||||
ClusterRecord = #?MODULE{name = Key, leader_pid = LeaderPid, leader = LeaderNode, replicas = Replicas},
|
||||
F = fun() ->
|
||||
mnesia:write(ClusterRecord)
|
||||
end,
|
||||
mnesia:activity(sync_transaction, F),
|
||||
{reply, Res, State};
|
||||
[_ClusterRecord] ->
|
||||
{reply, {error, reference_already_exists}, State}
|
||||
end;
|
||||
handle_call({delete, Reference}, _From, #state{listeners = Listeners} = State) ->
|
||||
Key = list_to_binary(Reference),
|
||||
case read(Key) of
|
||||
[] ->
|
||||
{reply, {error, reference_not_found}, State};
|
||||
[ClusterRecord] ->
|
||||
Conf = #{
|
||||
name => Reference,
|
||||
reference => Reference,
|
||||
replica_nodes => ClusterRecord#?MODULE.replicas,
|
||||
leader_pid => ClusterRecord#?MODULE.leader_pid,
|
||||
leader_node => ClusterRecord#?MODULE.leader},
|
||||
ok = osiris:delete_cluster(Conf),
|
||||
[Pid ! {stream_manager, cluster_deleted, Reference} || Pid <- Listeners],
|
||||
F = fun() ->
|
||||
mnesia:delete({?TABLE, Key})
|
||||
end,
|
||||
mnesia:activity(sync_transaction, F),
|
||||
{reply, {ok, deleted}, State}
|
||||
end;
|
||||
handle_call({register, Pid}, _From, #state{listeners = Listeners, monitors = Monitors} = State) ->
|
||||
case lists:member(Pid, Listeners) of
|
||||
false ->
|
||||
MonitorRef = erlang:monitor(process, Pid),
|
||||
{reply, ok, State#state{listeners = [Pid | Listeners], monitors = Monitors#{Pid => MonitorRef}}};
|
||||
true ->
|
||||
{reply, ok, State}
|
||||
end;
|
||||
handle_call({unregister, Pid}, _From, #state{listeners = Listeners, monitors = Monitors} = State) ->
|
||||
Monitors1 = case maps:get(Pid, Monitors, undefined) of
|
||||
undefined ->
|
||||
Monitors;
|
||||
MonitorRef ->
|
||||
erlang:demonitor(MonitorRef, [flush]),
|
||||
maps:remove(Pid, Monitors)
|
||||
end,
|
||||
{reply, ok, State#state{listeners = lists:delete(Pid, Listeners), monitors = Monitors1}};
|
||||
handle_call({lookup, Target}, _From, State) ->
|
||||
Res = case read(Target) of
|
||||
[] ->
|
||||
cluster_not_found;
|
||||
[#?MODULE{leader_pid = LeaderPid}] ->
|
||||
LeaderPid
|
||||
end,
|
||||
{reply, Res, State}.
|
||||
|
||||
handle_cast(_, State) ->
|
||||
{noreply, State}.
|
||||
|
||||
handle_info({'DOWN', _MonitorRef, process, Pid, _Info}, #state{listeners = Listeners, monitors = Monitors} = State) ->
|
||||
{noreply, State#state{listeners = lists:delete(Pid, Listeners), monitors = maps:remove(Pid, Monitors)}};
|
||||
handle_info(Info, State) ->
|
||||
error_logger:info_msg("Received info ~p~n", [Info]),
|
||||
{noreply, State}.
|
||||
|
|
@ -0,0 +1,531 @@
|
|||
%% The contents of this file are subject to the Mozilla Public License
|
||||
%% Version 1.1 (the "License"); you may not use this file except in
|
||||
%% compliance with the License. You may obtain a copy of the License
|
||||
%% at https://www.mozilla.org/MPL/
|
||||
%%
|
||||
%% Software distributed under the License is distributed on an "AS IS"
|
||||
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
|
||||
%% the License for the specific language governing rights and
|
||||
%% limitations under the License.
|
||||
%%
|
||||
%% The Original Code is RabbitMQ.
|
||||
%%
|
||||
%% The Initial Developer of the Original Code is GoPivotal, Inc.
|
||||
%% Copyright (c) 2020 VMware, Inc. or its affiliates. All rights reserved.
|
||||
%%
|
||||
|
||||
-module(rabbit_stream_protocol).
|
||||
|
||||
-behaviour(ranch_protocol).
|
||||
|
||||
-export([start_link/4]).
|
||||
-export([init/3]).
|
||||
|
||||
-record(connection, {
|
||||
listen_socket, socket, clusters, data, consumers, max_offsets,
|
||||
target_subscriptions, credits,
|
||||
blocked
|
||||
}).
|
||||
|
||||
-record(consumer, {
|
||||
socket, leader, offset, subscription_id, segment, credit, target
|
||||
}).
|
||||
|
||||
-record(configuration, {
|
||||
initial_credits, credits_required_for_unblocking
|
||||
}).
|
||||
|
||||
-define(COMMAND_PUBLISH, 0).
|
||||
-define(COMMAND_PUBLISH_CONFIRM, 1).
|
||||
-define(COMMAND_SUBSCRIBE, 2).
|
||||
-define(COMMAND_DELIVER, 3).
|
||||
-define(COMMAND_CREDIT, 4).
|
||||
-define(COMMAND_UNSUBSCRIBE, 5).
|
||||
-define(COMMAND_PUBLISH_ERROR, 6).
|
||||
-define(COMMAND_METADATA_UPDATE, 7).
|
||||
-define(COMMAND_METADATA, 8).
|
||||
-define(COMMAND_CREATE_TARGET, 998).
|
||||
-define(COMMAND_DELETE_TARGET, 999).
|
||||
|
||||
-define(VERSION_0, 0).
|
||||
|
||||
-define(RESPONSE_CODE_OK, 0).
|
||||
-define(RESPONSE_CODE_TARGET_DOES_NOT_EXIST, 1).
|
||||
-define(RESPONSE_CODE_SUBSCRIPTION_ID_ALREADY_EXISTS, 2).
|
||||
-define(RESPONSE_CODE_SUBSCRIPTION_ID_DOES_NOT_EXIST, 3).
|
||||
-define(RESPONSE_CODE_TARGET_ALREADY_EXISTS, 4).
|
||||
-define(RESPONSE_CODE_TARGET_DELETED, 5).
|
||||
|
||||
-define(RESPONSE_FRAME_SIZE, 10). % 2 (key) + 2 (version) + 4 (correlation ID) + 2 (response code)
|
||||
|
||||
%% ranch_protocol callback: spawn the connection process.
%% The socket argument is ignored; init/3 obtains the socket itself via
%% ranch:handshake/1 (ranch's "new" protocol start convention).
start_link(Ref, _Socket, Transport, Opts) ->
    Pid = spawn_link(?MODULE, init, [Ref, Transport, Opts]),
    {ok, Pid}.
||||
|
||||
%% Connection-process entry point (spawned by start_link/4).
%% Completes the ranch handshake to obtain the socket, registers with the
%% stream manager (to receive cluster_deleted notifications), sets up the
%% publish-credit counter, arms the socket and enters the receive loop.
init(Ref, Transport, _Opts = #{initial_credits := InitialCredits,
    credits_required_for_unblocking := CreditsRequiredBeforeUnblocking} = _ServerConfiguration) ->
    {ok, Socket} = ranch:handshake(Ref),
    rabbit_stream_manager:register(),
    %% one-slot signed atomics array holding the remaining publish credits
    Credits = atomics:new(1, [{signed, true}]),
    init_credit(Credits, InitialCredits),
    State = #connection{socket = Socket, data = none,
        clusters = #{},
        consumers = #{}, max_offsets = #{}, target_subscriptions = #{},
        blocked = false, credits = Credits},
    Transport:setopts(Socket, [{active, once}]),
    %% application env can still override the values the supervisor passed
    listen_loop(Transport, State, #configuration{
        initial_credits = application:get_env(rabbitmq_stream, initial_credits, InitialCredits),
        credits_required_for_unblocking = application:get_env(rabbitmq_stream, credits_required_for_unblocking, CreditsRequiredBeforeUnblocking)
    }).
|
||||
|
||||
%% Publish flow control: the connection keeps its credit budget in a
%% one-slot signed atomics array so it can be read and updated without a
%% message round-trip.

%% Set the initial credit budget.
init_credit(CreditReference, Credits) ->
    atomics:put(CreditReference, 1, Credits).

%% Consume Credits units (called when publishes are written to osiris).
sub_credits(CreditReference, Credits) ->
    atomics:sub(CreditReference, 1, Credits).

%% Return Credits units (called when osiris confirms writes).
add_credits(CreditReference, Credits) ->
    atomics:add(CreditReference, 1, Credits).

%% true while the budget is strictly positive.
has_credits(CreditReference) ->
    atomics:get(CreditReference, 1) > 0.

%% true once enough credits have been returned for a blocked socket to be
%% re-armed.
has_enough_credits_to_unblock(CreditReference, CreditsRequiredForUnblocking) ->
    atomics:get(CreditReference, 1) > CreditsRequiredForUnblocking.
|
||||
|
||||
%% Main receive loop of the connection process. Multiplexes:
%%   * TCP data from ranch (frame parsing + publish flow control),
%%   * osiris write confirmations and offset notifications,
%%   * stream-manager cluster_deleted events,
%%   * socket closed / socket error.
%% Flow control: the socket runs in {active, once}; it is only re-armed
%% while publish credits remain, and once blocked it stays passive until
%% enough credits have been returned by confirmations.
listen_loop(Transport, #connection{socket = S, consumers = Consumers, max_offsets = MaxOffsets,
        target_subscriptions = TargetSubscriptions, credits = Credits, blocked = Blocked} = State,
        #configuration{credits_required_for_unblocking = CreditsRequiredForUnblocking} = Configuration) ->
    {OK, Closed, Error} = Transport:messages(),
    receive
        {OK, S, Data} ->
            State1 = handle_inbound_data(Transport, State, Data),
            State2 = case Blocked of
                         true ->
                             case has_enough_credits_to_unblock(Credits, CreditsRequiredForUnblocking) of
                                 true ->
                                     Transport:setopts(S, [{active, once}]),
                                     State1#connection{blocked = false};
                                 false ->
                                     State1
                             end;
                         false ->
                             case has_credits(Credits) of
                                 true ->
                                     Transport:setopts(S, [{active, once}]),
                                     State1;
                                 false ->
                                     State1#connection{blocked = true}
                             end
                     end,
            listen_loop(Transport, State2, Configuration);
        {stream_manager, cluster_deleted, ClusterReference} ->
            %% a target was deleted elsewhere: clean local state and, if we
            %% actually held state for it, tell the client via metadata update
            Target = list_to_binary(ClusterReference),
            State1 = case clean_state_after_target_deletion(Target, State) of
                         {cleaned, NewState} ->
                             TargetSize = byte_size(Target),
                             FrameSize = 2 + 2 + 2 + 2 + TargetSize,
                             Transport:send(S, [<<FrameSize:32, ?COMMAND_METADATA_UPDATE:16, ?VERSION_0:16,
                                 ?RESPONSE_CODE_TARGET_DELETED:16, TargetSize:16, Target/binary>>]),
                             NewState;
                         {not_cleaned, SameState} ->
                             SameState
                     end,
            listen_loop(Transport, State1, Configuration);
        {osiris_written, _Name, CorrelationIdList} ->
            %% publish confirmations: ack the client, give the credits back
            %% and possibly unblock the socket
            CorrelationIdBinaries = [<<CorrelationId:64>> || CorrelationId <- CorrelationIdList],
            CorrelationIdCount = length(CorrelationIdList),
            FrameSize = 2 + 2 + 4 + CorrelationIdCount * 8,
            Transport:send(S, [<<FrameSize:32, ?COMMAND_PUBLISH_CONFIRM:16, ?VERSION_0:16>>, <<CorrelationIdCount:32>>, CorrelationIdBinaries]),
            add_credits(Credits, CorrelationIdCount),
            State1 = case Blocked of
                         true ->
                             case has_enough_credits_to_unblock(Credits, CreditsRequiredForUnblocking) of
                                 true ->
                                     Transport:setopts(S, [{active, once}]),
                                     State#connection{blocked = false};
                                 false ->
                                     State
                             end;
                         false ->
                             State
                     end,
            listen_loop(Transport, State1, Configuration);
        {osiris_offset, TargetName, -1} ->
            %% -1 means no committed offset yet; nothing to deliver
            error_logger:info_msg("received osiris offset event for ~p with offset ~p~n", [TargetName, -1]),
            listen_loop(Transport, State, Configuration);
        {osiris_offset, TargetName, Offset} when Offset > -1 ->
            State1 = case maps:get(TargetName, TargetSubscriptions, undefined) of
                         undefined ->
                             error_logger:info_msg("osiris offset event for ~p, but no subscription (leftover messages after unsubscribe?)", [TargetName]),
                             unregister_offset_listener(State, TargetName),
                             State#connection{max_offsets = maps:remove(TargetName, MaxOffsets)};
                         [] ->
                             error_logger:info_msg("osiris offset event for ~p, but no registered consumers!", [TargetName]),
                             unregister_offset_listener(State, TargetName),
                             State#connection{target_subscriptions = maps:remove(TargetName, TargetSubscriptions),
                                 max_offsets = maps:remove(TargetName, MaxOffsets)};
                         SubscriptionIds when is_list(SubscriptionIds) ->
                             %% push newly available chunks to every consumer of
                             %% this target that still has credit
                             Consumers1 = lists:foldl(fun(SubscriptionId, ConsumersAcc) ->
                                 #{SubscriptionId := Consumer} = ConsumersAcc,
                                 #consumer{credit = Credit} = Consumer,
                                 Consumer1 = case Credit of
                                                 0 ->
                                                     Consumer;
                                                 _ ->
                                                     {{segment, Segment1}, {credit, Credit1}} = send_chunks(
                                                         Transport,
                                                         Consumer,
                                                         Offset
                                                     ),
                                                     Consumer#consumer{segment = Segment1, credit = Credit1}
                                             end,
                                 ConsumersAcc#{SubscriptionId => Consumer1}
                                                      end,
                                 Consumers,
                                 SubscriptionIds),
                             State#connection{consumers = Consumers1, max_offsets = MaxOffsets#{TargetName => Offset}}
                     end,
            listen_loop(Transport, State1, Configuration);
        {Closed, S} ->
            %% unregister from osiris clusters, then drop out of the loop
            maps:fold(fun(Target, _SubIds, _Acc) ->
                unregister_offset_listener(State, Target)
                      end, 0, TargetSubscriptions),
            rabbit_stream_manager:unregister(),
            error_logger:info_msg("Socket ~w closed [~w]~n", [S, self()]),
            ok;
        {Error, S, Reason} ->
            %% unregister from osiris clusters, then drop out of the loop
            maps:fold(fun(Target, _SubIds, _Acc) ->
                unregister_offset_listener(State, Target)
                      end, 0, TargetSubscriptions),
            rabbit_stream_manager:unregister(),
            %% FIX: the original format string had two placeholders
            %% ("~p [~w]") for three arguments, so this log call itself
            %% raised badarg instead of logging the socket error.
            error_logger:info_msg("Socket error ~p ~w [~w]~n", [Reason, S, self()]);
        M ->
            error_logger:warning_msg("Unknown message ~p~n", [M]),
            listen_loop(Transport, State, Configuration)
    end.
|
||||
|
||||
%% Stop receiving osiris offset events for Target.
%% Targets arrive either as binaries (protocol frames) or as lists
%% (osiris events); the per-connection cluster cache is keyed by binary.
unregister_offset_listener(State, Target) ->
    TargetKey = if
                    is_binary(Target) -> Target;
                    is_list(Target) -> list_to_binary(Target)
                end,
    case lookup_cluster(TargetKey, State) of
        {ClusterLeader, _} ->
            osiris_writer:unregister_offset_listener(ClusterLeader);
        cluster_not_found ->
            ok
    end.
|
||||
|
||||
%% Reassemble length-prefixed frames from raw TCP data.
%% #connection.data holds either 'none' or the leftover bytes of an
%% incomplete frame carried over from the previous TCP packet.
handle_inbound_data(_Transport, State, <<>>) ->
    State;
handle_inbound_data(Transport, #connection{data = none} = State, <<Size:32, Frame:Size/binary, Rest/bits>>) ->
    %% a complete frame is available: dispatch it, then continue with Rest
    {State1, Rest1} = handle_frame(Transport, State, Frame, Rest),
    handle_inbound_data(Transport, State1, Rest1);
handle_inbound_data(_Transport, #connection{data = none} = State, Data) ->
    %% incomplete frame: stash the bytes until more data arrives
    State#connection{data = Data};
handle_inbound_data(Transport, #connection{data = Leftover} = State, Data) ->
    State1 = State#connection{data = none},
    %% FIXME avoid concatenation to avoid a new binary allocation
    %% see osiris_replica:parse_chunk/3
    handle_inbound_data(Transport, State1, <<Leftover/binary, Data/binary>>).
|
||||
|
||||
%% Write every [PublishingId:64, Size:32, Payload] entry of a publish
%% frame body to the osiris cluster leader, in order.
write_messages(_ClusterLeader, <<>>) ->
    ok;
write_messages(ClusterLeader, <<PublishingId:64, MessageSize:32, Message:MessageSize/binary, Rest/binary>>) ->
    % FIXME handle write error
    ok = osiris:write(ClusterLeader, PublishingId, Message),
    write_messages(ClusterLeader, Rest).
|
||||
|
||||
%% Build the body of a publish-error frame: for each message of a publish
%% batch, emit its publishing id followed by the target-does-not-exist
%% response code.
generate_publishing_error_details(Acc, <<>>) ->
    Acc;
generate_publishing_error_details(Acc, <<PubId:64, Size:32, _Body:Size/binary, Remainder/binary>>) ->
    Entry = <<PubId:64, ?RESPONSE_CODE_TARGET_DOES_NOT_EXIST:16>>,
    generate_publishing_error_details(<<Acc/binary, Entry/binary>>, Remainder).
|
||||
|
||||
%% PUBLISH: write a batch of messages to the target's osiris leader.
%% Frame body: target, message count, then per message
%% [PublishingId:64, Size:32, Payload]. Credits are consumed here and
%% returned when the osiris_written confirmation arrives in listen_loop.
handle_frame(Transport, #connection{socket = S, credits = Credits} = State,
    <<?COMMAND_PUBLISH:16, ?VERSION_0:16,
        TargetSize:16, Target:TargetSize/binary,
        MessageCount:32, Messages/binary>>, Rest) ->
    case lookup_cluster(Target, State) of
        cluster_not_found ->
            %% report every publishing id of the batch as failed
            FrameSize = 2 + 2 + 4 + (8 + 2) * MessageCount,
            Details = generate_publishing_error_details(<<>>, Messages),
            Transport:send(S, [<<FrameSize:32, ?COMMAND_PUBLISH_ERROR:16, ?VERSION_0:16,
                MessageCount:32, Details/binary>>]),
            {State, Rest};
        {ClusterLeader, State1} ->
            write_messages(ClusterLeader, Messages),
            sub_credits(Credits, MessageCount),
            {State1, Rest}
    end;
%% SUBSCRIBE: attach a consumer to a target at a given offset with an
%% initial credit. If this connection has already seen an offset for the
%% target, available chunks are pushed immediately; otherwise an offset
%% listener is registered and delivery starts on the first offset event.
handle_frame(Transport, #connection{socket = Socket, consumers = Consumers, target_subscriptions = TargetSubscriptions,
    max_offsets = MaxOffsets} = State,
    <<?COMMAND_SUBSCRIBE:16, ?VERSION_0:16, CorrelationId:32, SubscriptionId:32, TargetSize:16, Target:TargetSize/binary, Offset:64/unsigned, Credit:16>>, Rest) ->
    case lookup_cluster(Target, State) of
        cluster_not_found ->
            response(Transport, State, ?COMMAND_SUBSCRIBE, CorrelationId, ?RESPONSE_CODE_TARGET_DOES_NOT_EXIST),
            {State, Rest};
        {ClusterLeader, State1} ->
            % offset message uses a list for the target, so storing this in the state for easier retrieval
            TargetKey = binary_to_list(Target),
            case subscription_exists(TargetSubscriptions, SubscriptionId) of
                true ->
                    response(Transport, State1, ?COMMAND_SUBSCRIBE, CorrelationId, ?RESPONSE_CODE_SUBSCRIPTION_ID_ALREADY_EXISTS),
                    {State1, Rest};
                false ->
                    Segment = osiris_writer:init_reader(ClusterLeader, Offset),
                    ConsumerState = #consumer{
                        leader = ClusterLeader, offset = Offset, subscription_id = SubscriptionId, socket = Socket,
                        segment = Segment,
                        credit = Credit,
                        target = TargetKey
                    },
                    error_logger:info_msg("registering consumer ~p in ~p~n", [ConsumerState, self()]),
                    response_ok(Transport, State1, ?COMMAND_SUBSCRIBE, CorrelationId),
                    Consumers1 =
                        case MaxOffsets of
                            #{TargetKey := MaxOffset} ->
                                %% already received messages from this target in this connection, pushing what we can now
                                {{segment, Segment1}, {credit, Credit1}} = send_chunks(
                                    Transport,
                                    ConsumerState,
                                    MaxOffset
                                ),
                                Consumers#{SubscriptionId => ConsumerState#consumer{segment = Segment1, credit = Credit1}};
                            _ ->
                                osiris_writer:register_offset_listener(ClusterLeader),
                                %% first registration for this target, messages will be sent when receiving offset message
                                Consumers#{SubscriptionId => ConsumerState}
                        end,
                    TargetSubscriptions1 =
                        case TargetSubscriptions of
                            #{TargetKey := SubscriptionIds} ->
                                TargetSubscriptions#{TargetKey => [SubscriptionId] ++ SubscriptionIds};
                            _ ->
                                TargetSubscriptions#{TargetKey => [SubscriptionId]}
                        end,
                    {State1#connection{consumers = Consumers1, target_subscriptions = TargetSubscriptions1}, Rest}
            end
    end;
|
||||
%% UNSUBSCRIBE: remove the subscription, releasing target-level state
%% (offset listener, max-offset cache, cluster cache entry) when the last
%% subscription on that target goes away.
handle_frame(Transport, #connection{consumers = Consumers, target_subscriptions = TargetSubscriptions,
    max_offsets = MaxOffsets, clusters = Clusters} = State,
    <<?COMMAND_UNSUBSCRIBE:16, ?VERSION_0:16, CorrelationId:32, SubscriptionId:32>>, Rest) ->
    case subscription_exists(TargetSubscriptions, SubscriptionId) of
        false ->
            response(Transport, State, ?COMMAND_UNSUBSCRIBE, CorrelationId, ?RESPONSE_CODE_SUBSCRIPTION_ID_DOES_NOT_EXIST),
            {State, Rest};
        true ->
            #{SubscriptionId := Consumer} = Consumers,
            Target = Consumer#consumer.target,
            #{Target := SubscriptionsForThisTarget} = TargetSubscriptions,
            SubscriptionsForThisTarget1 = lists:delete(SubscriptionId, SubscriptionsForThisTarget),
            {TargetSubscriptions1, MaxOffsets1, Clusters1} =
                case SubscriptionsForThisTarget1 of
                    [] ->
                        %% last subscription on this target: drop all target state
                        %% (clusters is keyed by the binary form of the target)
                        unregister_offset_listener(State, Target),
                        {maps:remove(Target, TargetSubscriptions), maps:remove(Target, MaxOffsets),
                            maps:remove(list_to_binary(Target), Clusters)
                        };
                    _ ->
                        {TargetSubscriptions#{Target => SubscriptionsForThisTarget1}, MaxOffsets, Clusters}
                end,
            Consumers1 = maps:remove(SubscriptionId, Consumers),
            %% FIX: the response must carry the UNSUBSCRIBE command id; the
            %% original echoed ?COMMAND_SUBSCRIBE, breaking client-side
            %% correlation of the reply.
            response_ok(Transport, State, ?COMMAND_UNSUBSCRIBE, CorrelationId),
            {State#connection{consumers = Consumers1,
                target_subscriptions = TargetSubscriptions1,
                max_offsets = MaxOffsets1,
                clusters = Clusters1
            }, Rest}
    end;
|
||||
%% CREDIT: top up a subscription's credit and immediately push as many
%% chunks as the combined credit allows, up to the last known offset.
handle_frame(Transport, #connection{consumers = Consumers, max_offsets = MaxOffsets} = State,
    <<?COMMAND_CREDIT:16, ?VERSION_0:16, SubscriptionId:32, Credit:16>>, Rest) ->
    case Consumers of
        #{SubscriptionId := Consumer} ->
            #consumer{target = Target, credit = AvailableCredit} = Consumer,
            #{Target := MaxOffset} = MaxOffsets,
            {{segment, Segment1}, {credit, Credit1}} = send_chunks(
                Transport,
                Consumer,
                MaxOffset,
                AvailableCredit + Credit
            ),
            Consumer1 = Consumer#consumer{segment = Segment1, credit = Credit1},
            {State#connection{consumers = Consumers#{SubscriptionId => Consumer1}}, Rest};
        _ ->
            %% FIXME find a way to tell the client it's crediting an unknown subscription
            error_logger:warning_msg("Giving credit to unknown subscription: ~p~n", [SubscriptionId]),
            {State, Rest}
    end;
%% CREATE_TARGET: create the osiris cluster via the stream manager and
%% report OK or target-already-exists to the client.
handle_frame(Transport, State,
    <<?COMMAND_CREATE_TARGET:16, ?VERSION_0:16, CorrelationId:32, TargetSize:16, Target:TargetSize/binary>>, Rest) ->
    case rabbit_stream_manager:create(binary_to_list(Target)) of
        {ok, #{leader_pid := LeaderPid, replica_pids := ReturnedReplicas}} ->
            error_logger:info_msg("Created cluster with leader ~p and replicas ~p~n", [LeaderPid, ReturnedReplicas]),
            response_ok(Transport, State, ?COMMAND_CREATE_TARGET, CorrelationId),
            {State, Rest};
        {error, reference_already_exists} ->
            response(Transport, State, ?COMMAND_CREATE_TARGET, CorrelationId, ?RESPONSE_CODE_TARGET_ALREADY_EXISTS),
            {State, Rest}
    end;
|
||||
%% DELETE_TARGET: delete the osiris cluster via the stream manager, then
%% clean local state and notify the client with a metadata-update frame.
handle_frame(Transport, #connection{socket = S} = State,
    <<?COMMAND_DELETE_TARGET:16, ?VERSION_0:16, CorrelationId:32, TargetSize:16, Target:TargetSize/binary>>, Rest) ->
    case rabbit_stream_manager:delete(binary_to_list(Target)) of
        {ok, deleted} ->
            response_ok(Transport, State, ?COMMAND_DELETE_TARGET, CorrelationId),
            State1 = case clean_state_after_target_deletion(Target, State) of
                {cleaned, NewState} ->
                    %% TargetSize is already bound from the frame pattern,
                    %% so this match is an assertion, not a new binding
                    TargetSize = byte_size(Target),
                    FrameSize = 2 + 2 + 2 + 2 + TargetSize,
                    Transport:send(S, [<<FrameSize:32, ?COMMAND_METADATA_UPDATE:16, ?VERSION_0:16,
                        ?RESPONSE_CODE_TARGET_DELETED:16, TargetSize:16, Target/binary>>]),
                    NewState;
                {not_cleaned, SameState} ->
                    SameState
            end,
            {State1, Rest};
        {error, reference_not_found} ->
            response(Transport, State, ?COMMAND_DELETE_TARGET, CorrelationId, ?RESPONSE_CODE_TARGET_DOES_NOT_EXIST),
            {State, Rest}
    end;
%% METADATA: describe the cluster brokers and, for each requested target,
%% its leader and replica broker indexes.
handle_frame(Transport, #connection{socket = S} = State,
    <<?COMMAND_METADATA:16, ?VERSION_0:16, CorrelationId:32, TargetCount:32, BinaryTargets/binary>>, Rest) ->
    %% FIXME: rely only on rabbit_networking to discover the listeners
    Nodes = rabbit_mnesia:cluster_nodes(all),
    %% index every node with its hostname and stream port, fetched via rpc
    {NodesInfo, _} = lists:foldl(fun(Node, {Acc, Index}) ->
        {ok, Host} = rpc:call(Node, inet, gethostname, []),
        Port = rpc:call(Node, rabbit_stream, port, []),
        {Acc#{Node => {{index, Index}, {host, list_to_binary(Host)}, {port, Port}}}, Index + 1}
                                 end, {#{}, 0}, Nodes),
    BrokersCount = length(Nodes),
    BrokersBin = maps:fold(fun(_K, {{index, Index}, {host, Host}, {port, Port}}, Acc) ->
        HostLength = byte_size(Host),
        <<Acc/binary, Index:16, HostLength:16, Host:HostLength/binary, Port:32>>
                           end, <<BrokersCount:32>>, NodesInfo),
    Targets = extract_target_list(BinaryTargets, []),
    MetadataBin = lists:foldl(fun(Target, Acc) ->
        TargetLength = byte_size(Target),
        case lookup_cluster(Target, State) of
            cluster_not_found ->
                %% -1:16 wraps to 65535 on the wire — presumably the
                %% client-side marker for "no leader"; confirm in client code
                <<Acc/binary, TargetLength:16, Target:TargetLength/binary, ?RESPONSE_CODE_TARGET_DOES_NOT_EXIST:16,
                    -1:16, 0:32>>;
            {Cluster, _} ->
                LeaderNode = node(Cluster),
                #{LeaderNode := NodeInfo} = NodesInfo,
                {{index, LeaderIndex}, {host, _}, {port, _}} = NodeInfo,
                %% every non-leader node is reported as a replica here —
                %% NOTE(review): assumes replicas live on all other nodes
                Replicas = maps:without([LeaderNode], NodesInfo),
                ReplicasBinary = lists:foldl(fun(NI, Bin) ->
                    {{index, ReplicaIndex}, {host, _}, {port, _}} = NI,
                    <<Bin/binary, ReplicaIndex:16>>
                                             end, <<>>, maps:values(Replicas)),
                ReplicasCount = maps:size(Replicas),
                <<Acc/binary, TargetLength:16, Target:TargetLength/binary, ?RESPONSE_CODE_OK:16,
                    LeaderIndex:16, ReplicasCount:32, ReplicasBinary/binary>>
        end
                              end, <<TargetCount:32>>, Targets),
    Frame = <<?COMMAND_METADATA:16, ?VERSION_0:16, CorrelationId:32, BrokersBin/binary, MetadataBin/binary>>,
    FrameSize = byte_size(Frame),
    Transport:send(S, <<FrameSize:32, Frame/binary>>),
    {State, Rest};
%% Catch-all: log and skip unknown frames rather than crash the connection.
handle_frame(_Transport, State, Frame, Rest) ->
    error_logger:warning_msg("unknown frame ~p ~p, ignoring.~n", [Frame, Rest]),
    {State, Rest}.
|
||||
|
||||
%% Parse a [Length:16, Target:Length/binary]* sequence into a list of
%% target binaries. The result is in reverse order of appearance.
extract_target_list(<<>>, Targets) ->
    Targets;
extract_target_list(<<Len:16, Target:Len/binary, Remainder/binary>>, Targets) ->
    extract_target_list(Remainder, [Target | Targets]).
|
||||
|
||||
%% Remove all per-connection state attached to a deleted target.
%% Returns {cleaned, State1} when something was removed, {not_cleaned,
%% State} otherwise. Note the dual keying: clusters is keyed by the
%% binary target, while target_subscriptions/max_offsets use the list form.
clean_state_after_target_deletion(Target, #connection{clusters = Clusters, target_subscriptions = TargetSubscriptions,
    consumers = Consumers, max_offsets = MaxOffsets} = State) ->
    TargetAsList = binary_to_list(Target),
    case maps:is_key(TargetAsList, TargetSubscriptions) of
        true ->
            unregister_offset_listener(State, Target),
            #{TargetAsList := SubscriptionIds} = TargetSubscriptions,
            {cleaned, State#connection{
                clusters = maps:remove(Target, Clusters),
                target_subscriptions = maps:remove(TargetAsList, TargetSubscriptions),
                consumers = maps:without(SubscriptionIds, Consumers),
                max_offsets = maps:remove(TargetAsList, MaxOffsets)
            }};
        false ->
            {not_cleaned, State}
    end.
|
||||
|
||||
%% Resolve Target (binary) to its osiris cluster leader pid, maintaining
%% a per-connection cache in #connection.clusters. Returns {Pid, State1}
%% or cluster_not_found.
lookup_cluster(Target, #connection{clusters = Clusters} = State) ->
    case Clusters of
        #{Target := ClusterPid} ->
            %% cache hit: no state change
            {ClusterPid, State};
        _ ->
            case lookup_cluster_from_manager(Target) of
                cluster_not_found ->
                    cluster_not_found;
                ClusterPid ->
                    %% cache the resolved pid for subsequent frames
                    {ClusterPid, State#connection{clusters = Clusters#{Target => ClusterPid}}}
            end
    end.
|
||||
|
||||
%% Ask the stream manager for the cluster leader pid of Target.
%% Returns the pid or the atom cluster_not_found.
lookup_cluster_from_manager(Target) ->
    rabbit_stream_manager:lookup(Target).

%% Send a response frame carrying the OK code.
response_ok(Transport, State, CommandId, CorrelationId) ->
    response(Transport, State, CommandId, CorrelationId, ?RESPONSE_CODE_OK).

%% Send a fixed-size response frame: command key, protocol version,
%% correlation id and response code.
response(Transport, #connection{socket = S}, CommandId, CorrelationId, ResponseCode) ->
    Transport:send(S, [<<?RESPONSE_FRAME_SIZE:32, CommandId:16, ?VERSION_0:16>>, <<CorrelationId:32>>, <<ResponseCode:16>>]).
|
||||
|
||||
%% true if SubscriptionId is registered against any target on this
%% connection. TargetSubscriptions maps target name -> [subscription id].
%% Uses lists:member/2 instead of the original hand-rolled
%% lists:any/2 + closure — same behavior, simpler and allocation-free.
subscription_exists(TargetSubscriptions, SubscriptionId) ->
    SubscriptionIds = lists:flatten(maps:values(TargetSubscriptions)),
    lists:member(SubscriptionId, SubscriptionIds).
|
||||
|
||||
%% Return the fun that osiris_segment:send_file/4 invokes before streaming
%% a chunk: it writes the deliver-frame header sized to cover the Size
%% bytes of chunk data that follow on the socket.
send_file_callback(Transport, #consumer{socket = S, subscription_id = SubscriptionId}) ->
    fun(Size) ->
        FrameSize = 2 + 2 + 4 + Size,
        FrameBeginning = <<FrameSize:32, ?COMMAND_DELIVER:16, ?VERSION_0:16, SubscriptionId:32>>,
        Transport:send(S, FrameBeginning)
    end.

%% Stream chunks up to MaxOffset using the consumer's current credit.
send_chunks(Transport, #consumer{credit = Credit} = State, MaxOffset) ->
    send_chunks(Transport, State, MaxOffset, Credit).

%% As above but with an explicit credit amount (used when topping up).
%% Zero credit short-circuits without touching the segment.
send_chunks(_Transport, #consumer{segment = Segment}, _MaxOffset, 0) ->
    {{segment, Segment}, {credit, 0}};
send_chunks(Transport, #consumer{segment = Segment} = State, MaxOffset, Credit) ->
    send_chunks(Transport, State, Segment, MaxOffset, Credit).

%% Send one chunk per credit unit until credit or data runs out.
%% Returns the updated segment position and the remaining credit.
send_chunks(_Transport, _State, Segment, _MaxOffset, 0 = _Credit) ->
    {{segment, Segment}, {credit, 0}};
send_chunks(Transport, #consumer{socket = S} = State, Segment, MaxOffset, Credit) ->
    case osiris_segment:send_file(S, Segment, MaxOffset, send_file_callback(Transport, State)) of
        {ok, Segment1} ->
            send_chunks(
                Transport,
                State,
                Segment1,
                MaxOffset,
                Credit - 1
            );
        {end_of_stream, Segment1} ->
            %% no more data before MaxOffset: keep the unspent credit
            {{segment, Segment1}, {credit, Credit}}
    end.
|
||||
|
||||
|
||||
|
||||
|
|
@ -0,0 +1,60 @@
|
|||
%% The contents of this file are subject to the Mozilla Public License
|
||||
%% Version 1.1 (the "License"); you may not use this file except in
|
||||
%% compliance with the License. You may obtain a copy of the License
|
||||
%% at https://www.mozilla.org/MPL/
|
||||
%%
|
||||
%% Software distributed under the License is distributed on an "AS IS"
|
||||
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
|
||||
%% the License for the specific language governing rights and
|
||||
%% limitations under the License.
|
||||
%%
|
||||
%% The Original Code is RabbitMQ.
|
||||
%%
|
||||
%% The Initial Developer of the Original Code is GoPivotal, Inc.
|
||||
%% Copyright (c) 2020 VMware, Inc. or its affiliates. All rights reserved.
|
||||
%%
|
||||
|
||||
-module(rabbit_stream_sup).
|
||||
-behaviour(supervisor).
|
||||
|
||||
-export([start_link/0]).
|
||||
-export([init/1]).
|
||||
|
||||
-define(INITIAL_CREDITS, 50000).
|
||||
-define(CREDITS_REQUIRED_FOR_UNBLOCKING, 12500).
|
||||
|
||||
%% Start the plugin's top-level supervisor, registered locally under the
%% module name.
start_link() ->
    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
|
||||
|
||||
%% Supervisor init: one stream-manager worker plus one ranch TCP listener
%% per configured address, restarted one_for_all.
init([]) ->
    {ok, Listeners} = application:get_env(rabbitmq_stream, tcp_listeners),
    NumTcpAcceptors = application:get_env(rabbitmq_stream, num_tcp_acceptors, 10),
    {ok, SocketOpts} = application:get_env(rabbitmq_stream, tcp_listen_options),
    Nodes = rabbit_mnesia:cluster_nodes(all),
    OsirisConf = #{nodes => Nodes},
    %% per-connection flow-control settings handed to rabbit_stream_protocol
    ServerConfiguration = #{
        initial_credits => application:get_env(rabbitmq_stream, initial_credits, ?INITIAL_CREDITS),
        credits_required_for_unblocking => application:get_env(rabbitmq_stream, credits_required_for_unblocking, ?CREDITS_REQUIRED_FOR_UNBLOCKING)
    },
    StreamManager = #{id => rabbit_stream_manager,
        type => worker,
        start => {rabbit_stream_manager, start_link, [OsirisConf]}},
    {ok, {{one_for_all, 10, 10},
        [StreamManager] ++
            listener_specs(fun tcp_listener_spec/1,
                [SocketOpts, ServerConfiguration, NumTcpAcceptors], Listeners)}}.
|
||||
|
||||
%% Expand each configured listener into one child spec per resolved
%% address, prepending the address to the shared argument list.
listener_specs(Fun, Args, Listeners) ->
    lists:flatmap(
        fun(Listener) ->
            Addresses = rabbit_networking:tcp_listener_addresses(Listener),
            [Fun([Address | Args]) || Address <- Addresses]
        end, Listeners).
|
||||
|
||||
%% Build the ranch child spec for a single TCP listener address, wiring
%% rabbit_stream_protocol in as the connection protocol module.
tcp_listener_spec([Address, SocketOpts, Configuration, NumAcceptors]) ->
    rabbit_networking:tcp_listener_spec(
        rabbit_stream_listener_sup, Address, SocketOpts,
        ranch_tcp, rabbit_stream_protocol, Configuration,
        stream, NumAcceptors, "Stream TCP listener").
|
||||
|
||||
|
|
@ -0,0 +1,142 @@
|
|||
%% The contents of this file are subject to the Mozilla Public License
|
||||
%% Version 1.1 (the "License"); you may not use this file except in
|
||||
%% compliance with the License. You may obtain a copy of the License
|
||||
%% at https://www.mozilla.org/MPL/
|
||||
%%
|
||||
%% Software distributed under the License is distributed on an "AS IS"
|
||||
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
|
||||
%% the License for the specific language governing rights and
|
||||
%% limitations under the License.
|
||||
%%
|
||||
%% The Original Code is RabbitMQ.
|
||||
%%
|
||||
%% The Initial Developer of the Original Code is GoPivotal, Inc.
|
||||
%% Copyright (c) 2020 VMware, Inc. or its affiliates. All rights reserved.
|
||||
%%
|
||||
|
||||
-module(rabbit_stream_SUITE).
|
||||
|
||||
-include_lib("common_test/include/ct.hrl").
|
||||
|
||||
-compile(export_all).
|
||||
|
||||
%% Test matrix: the same scenario runs on a single node and on a 3-node
%% cluster.
all() ->
    [
        {group, single_node},
        {group, cluster}
    ].

groups() ->
    [
        {single_node, [], [test_stream]},
        {cluster, [], [test_stream]}
    ].

init_per_suite(Config) ->
    rabbit_ct_helpers:log_environment(),
    Config.

end_per_suite(Config) ->
    Config.

%% single_node: one unclustered broker.
init_per_group(single_node, Config) ->
    Config1 = rabbit_ct_helpers:set_config(Config, [{rmq_nodes_clustered, false}]),
    rabbit_ct_helpers:run_setup_steps(Config1,
        rabbit_ct_broker_helpers:setup_steps());
%% cluster: three clustered brokers, node names suffixed with the group.
init_per_group(cluster = Group, Config) ->
    Config1 = rabbit_ct_helpers:set_config(Config, [{rmq_nodes_clustered, true}]),
    Config2 = rabbit_ct_helpers:set_config(Config1,
        [{rmq_nodes_count, 3},
            {rmq_nodename_suffix, Group},
            {tcp_ports_base}]),
    rabbit_ct_helpers:run_setup_steps(Config2,
        rabbit_ct_broker_helpers:setup_steps()).

end_per_group(_, Config) ->
    rabbit_ct_helpers:run_steps(Config,
        rabbit_ct_broker_helpers:teardown_steps()).

init_per_testcase(_TestCase, Config) ->
    Config.

end_per_testcase(_Test, _Config) ->
    ok.
|
||||
|
||||
%% Exercise the full protocol round trip against node 0's stream port.
test_stream(Config) ->
    Port = get_stream_port(Config),
    test_server(Port),
    ok.

%% Stream listener TCP port of the first broker node.
get_stream_port(Config) ->
    rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_stream).
|
||||
|
||||
%% End-to-end happy path over a raw TCP socket: create a target, publish
%% one confirmed message, subscribe, receive the delivery, delete the
%% target and observe the metadata-update notification.
test_server(Port) ->
    {ok, S} = gen_tcp:connect("localhost", Port, [{active, false},
        {mode, binary}]),
    Target = <<"target1">>,
    test_create_target(S, Target),
    Body = <<"hello">>,
    test_publish_confirm(S, Target, Body),
    SubscriptionId = 42,
    test_subscribe(S, SubscriptionId, Target),
    test_deliver(S, SubscriptionId, Body),
    test_delete_target(S, Target),
    test_metadata_update_target_deleted(S, Target),
    ok.
|
||||
|
||||
%% Send CREATE_TARGET (command 998) with correlation id 1 and expect an
%% OK (code 0) response echoing that correlation id.
test_create_target(S, Target) ->
    TargetSize = byte_size(Target),
    CreateTargetFrame = <<998:16, 0:16, 1:32, TargetSize:16, Target:TargetSize/binary>>,
    FrameSize = byte_size(CreateTargetFrame),
    gen_tcp:send(S, <<FrameSize:32, CreateTargetFrame/binary>>),
    {ok, <<_Size:32, 998:16, 0:16, 1:32, 0:16>>} = gen_tcp:recv(S, 0, 5000).

%% Send DELETE_TARGET (command 999); read exactly the 14-byte response so
%% the metadata-update frame that follows stays in the socket buffer.
test_delete_target(S, Target) ->
    TargetSize = byte_size(Target),
    DeleteTargetFrame = <<999:16, 0:16, 1:32, TargetSize:16, Target:TargetSize/binary>>,
    FrameSize = byte_size(DeleteTargetFrame),
    gen_tcp:send(S, <<FrameSize:32, DeleteTargetFrame/binary>>),
    ResponseFrameSize = 10,
    {ok, <<ResponseFrameSize:32, 999:16, 0:16, 1:32, 0:16>>} = gen_tcp:recv(S, 4 + 10, 5000).

%% Publish one message (command 0) with publishing id 1 and expect a
%% publish-confirm frame (command 1) carrying that id.
test_publish_confirm(S, Target, Body) ->
    BodySize = byte_size(Body),
    TargetSize = byte_size(Target),
    PublishFrame = <<0:16, 0:16, TargetSize:16, Target:TargetSize/binary, 1:32, 1:64, BodySize:32, Body:BodySize/binary>>,
    FrameSize = byte_size(PublishFrame),
    gen_tcp:send(S, <<FrameSize:32, PublishFrame/binary>>),
    {ok, <<_Size:32, 1:16, 0:16, 1:32, 1:64>>} = gen_tcp:recv(S, 0, 5000).

%% Subscribe (command 2) from offset 0 with credit 10; expect OK.
test_subscribe(S, SubscriptionId, Target) ->
    TargetSize = byte_size(Target),
    SubscribeFrame = <<2:16, 0:16, 1:32, SubscriptionId:32, TargetSize:16, Target:TargetSize/binary, 0:64, 10:16>>,
    FrameSize = byte_size(SubscribeFrame),
    gen_tcp:send(S, <<FrameSize:32, SubscribeFrame/binary>>),
    Res = gen_tcp:recv(S, 0, 5000),
    {ok, <<_Size:32, 2:16, 0:16, 1:32, 0:16>>} = Res.

%% Match a whole deliver frame (command 3): frame header, then what looks
%% like the osiris chunk header (magic/version nibbles, entry counts,
%% epoch, chunk id, crc, data length — assumed layout, confirm against
%% osiris) and finally the single message body.
test_deliver(S, SubscriptionId, Body) ->
    BodySize = byte_size(Body),
    Frame = read_frame(S, <<>>),
    <<48:32, 3:16, 0:16, SubscriptionId:32, 5:4/unsigned, 0:4/unsigned, 1:16, 1:32, _Epoch:64, 0:64, _Crc:32, _DataLength:32,
        0:1, BodySize:31/unsigned, Body/binary>> = Frame.

%% After deletion the server pushes a metadata update (command 7) with
%% code 5 (target deleted) naming the target.
test_metadata_update_target_deleted(S, Target) ->
    TargetSize = byte_size(Target),
    {ok, <<15:32, 7:16, 0:16, 5:16, TargetSize:16, Target/binary>>} = gen_tcp:recv(S, 0, 5000).
|
||||
|
||||
%% Read exactly one complete length-prefixed frame in {active, once}
%% mode, accumulating partial TCP packets in Buffer.
%% FIX: on timeout the original fell through and returned the result of
%% inet:setopts/2 (the atom 'ok'), which only surfaced later as an
%% unrelated badmatch in the caller; now the helper exits with a
%% descriptive reason at the point of failure.
read_frame(S, Buffer) ->
    inet:setopts(S, [{active, once}]),
    receive
        {tcp, S, Received} ->
            Data = <<Buffer/binary, Received/binary>>,
            case Data of
                <<Size:32, _Body:Size/binary>> ->
                    %% exactly one whole frame accumulated
                    Data;
                _ ->
                    read_frame(S, Data)
            end
    after
        1000 ->
            inet:setopts(S, [{active, false}]),
            exit({timeout_waiting_for_frame, Buffer})
    end.
|
||||
Loading…
Reference in New Issue