merge default into bug21346

This commit is contained in:
Tony Garnock-Jones 2009-09-18 13:05:47 +01:00
commit a95080fdfe
55 changed files with 1553 additions and 766 deletions

View File

@ -1,18 +1,17 @@
syntax: glob
*.beam
*~
*.swp
*.patch
erl_crash.dump
syntax: regexp
^cover/
^dist/
^include/rabbit_framing.hrl$
^src/rabbit_framing.erl$
^rabbit.plt$
^ebin/rabbit.app$
^ebin/rabbit.rel$
^ebin/rabbit.boot$
^ebin/rabbit.script$
^include/rabbit_framing\.hrl$
^src/rabbit_framing\.erl$
^rabbit\.plt$
^ebin/rabbit\.(app|rel|boot|script)$
^plugins/
^priv/plugins/

View File

@ -20,10 +20,10 @@ PYTHON=python
ifndef USE_SPECS
# our type specs rely on features / bug fixes in dialyzer that are
# only available in R12B-3 upwards
# only available in R13B upwards (R13B is eshell 5.7.1)
#
# NB: the test assumes that version number will only contain single digits
USE_SPECS=$(shell if [ $$(erl -noshell -eval 'io:format(erlang:system_info(version)), halt().') \> "5.6.2" ]; then echo "true"; else echo "false"; fi)
USE_SPECS=$(shell if [ $$(erl -noshell -eval 'io:format(erlang:system_info(version)), halt().') \> "5.7.0" ]; then echo "true"; else echo "false"; fi)
endif
#other args: +native +"{hipe,[o3,verbose]}" -Ddebug=true +debug_info +no_strict_record_tests
@ -39,9 +39,6 @@ AMQP_SPEC_JSON_PATH=$(AMQP_CODEGEN_DIR)/amqp-0.8.json
ERL_CALL=erl_call -sname $(RABBITMQ_NODENAME) -e
# for the moment we don't use boot files because they introduce a
# dependency on particular versions of OTP applications
#all: $(EBIN_DIR)/rabbit.boot
all: $(TARGETS)
$(EBIN_DIR)/rabbit.app: $(EBIN_DIR)/rabbit_app.in $(BEAM_TARGETS) generate_app
@ -101,7 +98,8 @@ run-tests: all
start-background-node:
$(BASIC_SCRIPT_ENVIRONMENT_SETTINGS) \
RABBITMQ_NODE_ONLY=true \
./scripts/rabbitmq-server -detached; sleep 1
RABBITMQ_SERVER_START_ARGS="$(RABBITMQ_SERVER_START_ARGS) -detached" \
./scripts/rabbitmq-server ; sleep 1
start-rabbit-on-node: all
echo "rabbit:start()." | $(ERL_CALL)
@ -115,8 +113,11 @@ force-snapshot: all
stop-node:
-$(ERL_CALL) -q
# code coverage will be created for subdirectory "ebin" of COVER_DIR
COVER_DIR=.
start-cover: all
echo "cover:start(), rabbit_misc:enable_cover()." | $(ERL_CALL)
echo "cover:start(), rabbit_misc:enable_cover([\"$(COVER_DIR)\"])." | $(ERL_CALL)
stop-cover: all
echo "rabbit_misc:report_cover(), cover:stop()." | $(ERL_CALL)
@ -136,7 +137,7 @@ srcdist: distclean
sed -i.save 's/%%VSN%%/$(VERSION)/' $(TARGET_SRC_DIR)/ebin/rabbit_app.in && rm -f $(TARGET_SRC_DIR)/ebin/rabbit_app.in.save
cp -r $(AMQP_CODEGEN_DIR)/* $(TARGET_SRC_DIR)/codegen/
cp codegen.py Makefile generate_app $(TARGET_SRC_DIR)
cp codegen.py Makefile generate_app calculate-relative $(TARGET_SRC_DIR)
cp -r scripts $(TARGET_SRC_DIR)
cp -r docs $(TARGET_SRC_DIR)
@ -147,7 +148,7 @@ srcdist: distclean
rm -rf $(TARGET_SRC_DIR)
distclean: clean
make -C $(AMQP_CODEGEN_DIR) distclean
$(MAKE) -C $(AMQP_CODEGEN_DIR) distclean
rm -rf dist
find . -regex '.*\(~\|#\|\.swp\|\.dump\)' -exec rm {} \;
@ -162,7 +163,8 @@ distclean: clean
docs_all: $(MANPAGES)
install: all docs_all
install: SCRIPTS_REL_PATH=$(shell ./calculate-relative $(TARGET_DIR)/sbin $(SBIN_DIR))
install: all docs_all install_dirs
@[ -n "$(TARGET_DIR)" ] || (echo "Please set TARGET_DIR."; false)
@[ -n "$(SBIN_DIR)" ] || (echo "Please set SBIN_DIR."; false)
@[ -n "$(MAN_DIR)" ] || (echo "Please set MAN_DIR."; false)
@ -171,13 +173,17 @@ install: all docs_all
cp -r ebin include LICENSE LICENSE-MPL-RabbitMQ INSTALL $(TARGET_DIR)
chmod 0755 scripts/*
mkdir -p $(SBIN_DIR)
cp scripts/rabbitmq-server $(SBIN_DIR)
cp scripts/rabbitmqctl $(SBIN_DIR)
cp scripts/rabbitmq-multi $(SBIN_DIR)
for script in rabbitmq-env rabbitmq-server rabbitmqctl rabbitmq-multi rabbitmq-activate-plugins; do \
cp scripts/$$script $(TARGET_DIR)/sbin; \
[ -e $(SBIN_DIR)/$$script ] || ln -s $(SCRIPTS_REL_PATH)/$$script $(SBIN_DIR)/$$script; \
done
for section in 1 5; do \
mkdir -p $(MAN_DIR)/man$$section; \
for manpage in docs/*.$$section.pod; do \
cp docs/`basename $$manpage .pod`.gz $(MAN_DIR)/man$$section; \
done; \
done
install_dirs:
mkdir -p $(SBIN_DIR)
mkdir -p $(TARGET_DIR)/sbin

45
calculate-relative Executable file
View File

@ -0,0 +1,45 @@
#!/usr/bin/env python
#
# relpath.py
# R.Barran 30/08/2004
# Retrieved from http://code.activestate.com/recipes/302594/
import os
import sys
def relpath(target, base=os.curdir):
"""
Return a relative path to the target from either the current dir or an optional base dir.
Base can be a directory specified either as absolute or relative to current dir.
"""
if not os.path.exists(target):
raise OSError, 'Target does not exist: '+target
if not os.path.isdir(base):
raise OSError, 'Base is not a directory or does not exist: '+base
base_list = (os.path.abspath(base)).split(os.sep)
target_list = (os.path.abspath(target)).split(os.sep)
# On the windows platform the target may be on a completely different drive from the base.
if os.name in ['nt','dos','os2'] and base_list[0] <> target_list[0]:
raise OSError, 'Target is on a different drive to base. Target: '+target_list[0].upper()+', base: '+base_list[0].upper()
# Starting from the filepath root, work out how much of the filepath is
# shared by base and target.
for i in range(min(len(base_list), len(target_list))):
if base_list[i] <> target_list[i]: break
else:
# If we broke out of the loop, i is pointing to the first differing path elements.
# If we didn't break out of the loop, i is pointing to identical path elements.
# Increment i so that in all cases it points to the first differing path elements.
i+=1
rel_list = [os.pardir] * (len(base_list)-i) + target_list[i:]
if (len(rel_list) == 0):
return "."
return os.path.join(*rel_list)
if __name__ == "__main__":
print(relpath(sys.argv[1], sys.argv[2]))

View File

@ -117,6 +117,13 @@ def genErl(spec):
def genMethodHasContent(m):
print "method_has_content(%s) -> %s;" % (m.erlangName(), str(m.hasContent).lower())
def genMethodIsSynchronous(m):
hasNoWait = "nowait" in fieldNameList(m.arguments)
if m.isSynchronous and hasNoWait:
print "is_method_synchronous(#%s{nowait = NoWait}) -> not(NoWait);" % (m.erlangName())
else:
print "is_method_synchronous(#%s{}) -> %s;" % (m.erlangName(), str(m.isSynchronous).lower())
def genMethodFieldTypes(m):
"""Not currently used - may be useful in future?"""
@ -246,6 +253,7 @@ def genErl(spec):
-export([method_id/1]).
-export([method_has_content/1]).
-export([is_method_synchronous/1]).
-export([method_fieldnames/1]).
-export([decode_method_fields/2]).
-export([decode_properties/2]).
@ -266,6 +274,9 @@ bitvalue(undefined) -> 0.
for m in methods: genMethodHasContent(m)
print "method_has_content(Name) -> exit({unknown_method_name, Name})."
for m in methods: genMethodIsSynchronous(m)
print "is_method_synchronous(Name) -> exit({unknown_method_name, Name})."
for m in methods: genMethodFieldNames(m)
print "method_fieldnames(Name) -> exit({unknown_method_name, Name})."

View File

@ -0,0 +1,37 @@
=head1 NAME
rabbitmq-activate-plugins - command line tool for activating plugins
in a RabbitMQ broker
=head1 SYNOPSIS
rabbitmq-activate-plugins
=head1 DESCRIPTION
RabbitMQ is an implementation of AMQP, the emerging standard for high
performance enterprise messaging. The RabbitMQ server is a robust and
scalable implementation of an AMQP broker.
rabbitmq-activate-plugins is a command line tool for activating
plugins installed into the broker's plugins directory.
=head1 EXAMPLES
To activate all of the installed plugins in the current RabbitMQ install,
execute:
rabbitmq-activate-plugins
=head1 SEE ALSO
L<rabbitmq.conf(5)>, L<rabbitmq-multi(1)>, L<rabbitmq-server(1)>,
L<rabbitmqctl(1)>
=head1 AUTHOR
The RabbitMQ Team <info@rabbitmq.com>
=head1 REFERENCES
RabbitMQ Web Site: L<http://www.rabbitmq.com>

View File

@ -15,22 +15,30 @@ scalable implementation of an AMQP broker.
rabbitmq-multi scripts allows for easy set-up of a cluster on a single
machine.
See also rabbitmq-server(1) for configuration information.
See also L<rabbitmq-server(1)> for configuration information.
=head1 COMMANDS
start_all I<count>
start count nodes with unique names, listening on all IP addresses
and on sequential ports starting from 5672.
=over
status
print the status of all running RabbitMQ nodes
=item start_all I<count>
stop_all
stop all local RabbitMQ nodes
Start count nodes with unique names, listening on all IP addresses and
on sequential ports starting from 5672.
rotate_logs
rotate log files for all local and running RabbitMQ nodes
=item status
Print the status of all running RabbitMQ nodes.
=item stop_all
Stop all local RabbitMQ nodes,
=item rotate_logs
Rotate log files for all local and running RabbitMQ nodes.
=back
=head1 EXAMPLES
@ -40,7 +48,7 @@ Start 3 local RabbitMQ nodes with unique, sequential port numbers:
=head1 SEE ALSO
rabbitmq.conf(5), rabbitmq-server(1), rabbitmqctl(1)
L<rabbitmq.conf(5)>, L<rabbitmq-server(1)>, L<rabbitmqctl(1)>
=head1 AUTHOR
@ -48,4 +56,4 @@ The RabbitMQ Team <info@rabbitmq.com>
=head1 REFERENCES
RabbitMQ Web Site: http://www.rabbitmq.com
RabbitMQ Web Site: L<http://www.rabbitmq.com>

View File

@ -16,43 +16,57 @@ Running rabbitmq-server in the foreground displays a banner message,
and reports on progress in the startup sequence, concluding with the
message "broker running", indicating that the RabbitMQ broker has been
started successfully. To shut down the server, just terminate the
process or use rabbitmqctl(1).
process or use L<rabbitmqctl(1)>.
=head1 ENVIRONMENT
B<RABBITMQ_MNESIA_BASE>
Defaults to /var/lib/rabbitmq/mnesia. Set this to the directory
where Mnesia database files should be placed.
=over
B<RABBITMQ_LOG_BASE>
Defaults to /var/log/rabbitmq. Log files generated by the server
will be placed in this directory.
=item B<RABBITMQ_MNESIA_BASE>
B<RABBITMQ_NODENAME>
Defaults to rabbit. This can be useful if you want to run more
than one node per machine - B<RABBITMQ_NODENAME> should be unique
per erlang-node-and-machine combination. See clustering on a
single machine guide at
http://www.rabbitmq.com/clustering.html#single-machine for
details.
Defaults to F</var/lib/rabbitmq/mnesia>. Set this to the directory where
Mnesia database files should be placed.
B<RABBITMQ_NODE_IP_ADDRESS>
Defaults to 0.0.0.0. This can be changed if you only want to bind
to one network interface.
=item B<RABBITMQ_LOG_BASE>
B<RABBITMQ_NODE_PORT>
Defaults to 5672.
Defaults to F</var/log/rabbitmq>. Log files generated by the server will
be placed in this directory.
B<RABBITMQ_CLUSTER_CONFIG_FILE>
Defaults to /etc/rabbitmq/rabbitmq_cluster.config. If this file is
present it is used by the server to auto-configure a RabbitMQ
cluster.
See the clustering guide at http://www.rabbitmq.com/clustering.html
for details.
=item B<RABBITMQ_NODENAME>
Defaults to rabbit. This can be useful if you want to run more than
one node per machine - B<RABBITMQ_NODENAME> should be unique per
erlang-node-and-machine combination. See clustering on a single
machine guide at
L<http://www.rabbitmq.com/clustering.html#single-machine> for details.
=item B<RABBITMQ_NODE_IP_ADDRESS>
Defaults to 0.0.0.0. This can be changed if you only want to bind to
one network interface.
=item B<RABBITMQ_NODE_PORT>
Defaults to 5672.
=item B<RABBITMQ_CLUSTER_CONFIG_FILE>
Defaults to F</etc/rabbitmq/rabbitmq_cluster.config>. If this file is
present it is used by the server to auto-configure a RabbitMQ cluster.
See the clustering guide at L<http://www.rabbitmq.com/clustering.html>
for details.
=back
=head1 OPTIONS
B<-detached> start the server process in the background
=over
=item B<-detached>
start the server process in the background
=back
=head1 EXAMPLES
@ -62,7 +76,7 @@ Run RabbitMQ AMQP server in the background:
=head1 SEE ALSO
rabbitmq.conf(5), rabbitmq-multi(1), rabbitmqctl(1)
L<rabbitmq.conf(5)>, L<rabbitmq-multi(1)>, L<rabbitmqctl(1)>
=head1 AUTHOR
@ -70,4 +84,5 @@ The RabbitMQ Team <info@rabbitmq.com>
=head1 REFERENCES
RabbitMQ Web Site: http://www.rabbitmq.com
RabbitMQ Web Site: L<http://www.rabbitmq.com>

View File

@ -1,10 +1,11 @@
=head1 NAME
/etc/rabbitmq/rabbitmq.conf - default settings for RabbitMQ AMQP server
F</etc/rabbitmq/rabbitmq.conf> - default settings for RabbitMQ AMQP
server
=head1 DESCRIPTION
/etc/rabbitmq/rabbitmq.conf contains variable settings that override the
F</etc/rabbitmq/rabbitmq.conf> contains variable settings that override the
defaults built in to the RabbitMQ startup scripts.
The file is interpreted by the system shell, and so should consist of
@ -13,27 +14,35 @@ syntax is permitted (since the file is sourced using the shell "."
operator), including line comments starting with "#".
In order of preference, the startup scripts get their values from the
environment, from /etc/rabbitmq/rabbitmq.conf and finally from the
built-in default values. For example, for the B<RABBITMQ_NODENAME> setting,
environment, from F</etc/rabbitmq/rabbitmq.conf> and finally from the
built-in default values. For example, for the B<RABBITMQ_NODENAME>
setting,
B<RABBITMQ_NODENAME>
from the environment is checked first. If it is absent or equal to
the empty string, then
=over
B<NODENAME>
from /etc/rabbitmq/rabbitmq.conf is checked. If it is also absent
or set equal to the empty string then the default value from
the startup script is used.
=item B<RABBITMQ_NODENAME>
from the environment is checked first. If it is absent or equal to the
empty string, then
=item B<NODENAME>
from L</etc/rabbitmq/rabbitmq.conf> is checked. If it is also absent
or set equal to the empty string then the default value from the
startup script is used.
The variable names in /etc/rabbitmq/rabbitmq.conf are always equal to the
environment variable names, with the B<RABBITMQ_> prefix removed:
B<RABBITMQ_NODE_PORT> from the environment becomes B<NODE_PORT> in the
/etc/rabbitmq/rabbitmq.conf file, etc.
F</etc/rabbitmq/rabbitmq.conf> file, etc.
=back
=head1 EXAMPLES
The following is an example of a complete /etc/rabbitmq/rabbitmq.conf file
that overrides the default Erlang node name from "rabbit" to "hare":
The following is an example of a complete
F</etc/rabbitmq/rabbitmq.conf> file that overrides the default Erlang
node name from "rabbit" to "hare":
# I am a complete /etc/rabbitmq/rabbitmq.conf file.
# Comment lines start with a hash character.
@ -42,7 +51,7 @@ that overrides the default Erlang node name from "rabbit" to "hare":
=head1 SEE ALSO
rabbitmq-server(1), rabbitmq-multi(1), rabbitmqctl(1)
L<rabbitmq-server(1)>, L<rabbitmq-multi(1)>, L<rabbitmqctl(1)>
=head1 AUTHOR
@ -57,4 +66,4 @@ info@rabbitmq.com.
=head1 REFERENCES
RabbitMQ Web Site: http://www.rabbitmq.com
RabbitMQ Web Site: L<http://www.rabbitmq.com>

View File

@ -18,269 +18,388 @@ It performs all actions by connecting to one of the broker's nodes.
=head1 OPTIONS
B<-n> I<node>
default node is C<rabbit@server>, where server is the local host.
On a host named C<server.example.com>, the node name of the
RabbitMQ Erlang node will usually be rabbit@server (unless
RABBITMQ_NODENAME has been set to some non-default value at broker
startup time). The output of hostname -s is usually the correct
suffix to use after the "@" sign. See rabbitmq-server(1) for
details of configuring the RabbitMQ broker.
=over
B<-q>
quiet output mode is selected with the B<-q> flag. Informational
messages are suppressed when quiet mode is in effect.
=item B<-n> I<node>
Default node is C<rabbit@server>, where server is the local host. On
a host named C<server.example.com>, the node name of the RabbitMQ
Erlang node will usually be rabbit@server (unless RABBITMQ_NODENAME
has been set to some non-default value at broker startup time). The
output of hostname -s is usually the correct suffix to use after the
"@" sign. See rabbitmq-server(1) for details of configuring the
RabbitMQ broker.
=item B<-q>
Quiet output mode is selected with the B<-q> flag. Informational
messages are suppressed when quiet mode is in effect.
=back
=head1 COMMANDS
=head2 APPLICATION AND CLUSTER MANAGEMENT
stop
stop the Erlang node on which RabbitMQ broker is running.
=over
stop_app
stop the RabbitMQ application, leaving the Erlang node running.
This command is typically run prior to performing other management
actions that require the RabbitMQ application to be stopped,
e.g. I<reset>.
=item stop
start_app
start the RabbitMQ application.
This command is typically run prior to performing other management
actions that require the RabbitMQ application to be stopped,
e.g. I<reset>.
Stop the Erlang node on which RabbitMQ broker is running.
status
display various information about the RabbitMQ broker, such as
whether the RabbitMQ application on the current node, its version
number, what nodes are part of the broker, which of these are
running.
=item stop_app
force
return a RabbitMQ node to its virgin state.
Removes the node from any cluster it belongs to, removes all data
from the management database, such as configured users, vhosts and
deletes all persistent messages.
Stop the RabbitMQ application, leaving the Erlang node running. This
command is typically run prior to performing other management actions
that require the RabbitMQ application to be stopped, e.g. I<reset>.
force_reset
the same as I<force> command, but resets the node unconditionally,
regardless of the current management database state and cluster
configuration.
It should only be used as a last resort if the database or cluster
configuration has been corrupted.
=item start_app
rotate_logs [suffix]
instruct the RabbitMQ node to rotate the log files. The RabbitMQ
broker will attempt to append the current contents of the log file
to the file with the name composed of the original name and the
suffix. It will create a new file if such a file does not already
exist. When no I<suffix> is specified, the empty log file is
simply created at the original location; no rotation takes place.
When an error occurs while appending the contents of the old log
file, the operation behaves in the same way as if no I<suffix> was
specified.
This command might be helpful when you are e.g. writing your own
logrotate script and you do not want to restart the RabbitMQ node.
Start the RabbitMQ application. This command is typically run prior
to performing other management actions that require the RabbitMQ
application to be stopped, e.g. I<reset>.
cluster I<clusternode> ...
instruct the node to become member of a cluster with the specified
nodes determined by I<clusternode> option(s).
See http://www.rabbitmq.com/clustering.html for more information
about clustering.
=item status
Display various information about the RabbitMQ broker, such as whether
the RabbitMQ application on the current node, its version number, what
nodes are part of the broker, which of these are running.
=item reset
Return a RabbitMQ node to its virgin state. Removes the node from any
cluster it belongs to, removes all data from the management database,
such as configured users, vhosts and deletes all persistent messages.
=item force_reset
The same as I<reset> command, but resets the node unconditionally,
regardless of the current management database state and cluster
configuration. It should only be used as a last resort if the
database or cluster configuration has been corrupted.
=item rotate_logs [suffix]
Instruct the RabbitMQ node to rotate the log files. The RabbitMQ
broker will attempt to append the current contents of the log file to
the file with the name composed of the original name and the
suffix. It will create a new file if such a file does not already
exist. When no I<suffix> is specified, the empty log file is simply
created at the original location; no rotation takes place. When an
error occurs while appending the contents of the old log file, the
operation behaves in the same way as if no I<suffix> was specified.
This command might be helpful when you are e.g. writing your own
logrotate script and you do not want to restart the RabbitMQ node.
=item cluster I<clusternode> ...
Instruct the node to become member of a cluster with the specified
nodes determined by I<clusternode> option(s). See
L<http://www.rabbitmq.com/clustering.html> for more information about
clustering.
=back
=head2 USER MANAGEMENT
add_user I<username> I<password>
create a user named I<username> with (initial) password I<password>.
=over
delete_user I<username>
delete the user named I<username>.
=item add_user I<username> I<password>
change_password I<username> I<newpassword>
change the password for the user named I<username> to I<newpassword>.
Create a user named I<username> with (initial) password I<password>.
list_users
list all users.
=item delete_user I<username>
Delete the user named I<username>.
=item change_password I<username> I<newpassword>
Change the password for the user named I<username> to I<newpassword>.
=item list_users
List all users, one per line.
=back
=head2 ACCESS CONTROL
add_vhost I<vhostpath>
create a new virtual host called I<vhostpath>.
=over
delete_vhost I<vhostpath>
delete a virtual host I<vhostpath>.
That command deletes also all its exchanges, queues and user
mappings.
list_vhosts
list all virtual hosts.
=item add_vhost I<vhostpath>
set_permissions [-p I<vhostpath>] I<username> I<regexp> I<regexp> I<regexp>
set the permissions for the user named I<username> in the virtual
host I<vhostpath>, granting 'configure', 'write' and 'read' access
to resources with names matching the first, second and third
I<regexp>, respectively.
Create a new virtual host called I<vhostpath>.
clear_permissions [-p I<vhostpath>] I<username>
remove the permissions for the user named I<username> in the
virtual host I<vhostpath>.
=item delete_vhost I<vhostpath>
list_permissions [-p I<vhostpath>]
list all the users and their permissions in the virtual host
I<vhostpath>.
Delete a virtual host I<vhostpath>. This command deletes also all its
exchanges, queues and user mappings.
list_user_permissions I<username>
list the permissions of the user named I<username> across all
virtual hosts.
=item list_vhosts
List all virtual hosts, one per line.
=item set_permissions [-p I<vhostpath>] I<username> I<regexp> I<regexp> I<regexp>
Set the permissions for the user named I<username> in the virtual host
I<vhostpath>, granting I<configure>, I<write> and I<read> access to
resources with names matching the first, second and third I<regexp>,
respectively.
=item clear_permissions [-p I<vhostpath>] I<username>
Remove the permissions for the user named I<username> in the virtual
host I<vhostpath>.
=item list_permissions [-p I<vhostpath>]
List all the users and their permissions in the virtual host
I<vhostpath>. Each output line contains the username and their
I<configure>, I<write> and I<read> access regexps, separated by tab
characters.
=item list_user_permissions I<username>
List the permissions of the user named I<username> across all virtual
hosts.
=back
=head2 SERVER STATUS
list_queues [-p I<vhostpath>] [I<queueinfoitem> ...]
list queue information by virtual host. If no I<queueinfoitem>s
are specified then then name and number of messages is displayed
for each queue.
=over
=item list_queues [-p I<vhostpath>] [I<queueinfoitem> ...]
List queue information by virtual host. Each line printed
describes a queue, with the requested I<queueinfoitem> values
separated by tab characters. If no I<queueinfoitem>s are
specified then I<name> and I<messages> are assumed.
=back
=head3 Queue information items
=over 4
=over
name
URL-encoded name of the queue
=item name
durable
whether the queue survives server restarts
name of the queue
auto_delete
whether the queue will be deleted when no longer used
=item durable
arguments
queue arguments
whether the queue survives server restarts
node
node on which the process associated with the queue resides
=item auto_delete
messages_ready
number of messages ready to be delivered to clients
whether the queue will be deleted when no longer used
messages_unacknowledged
number of messages delivered to clients but not yet
acknowledged
=item arguments
messages_uncommitted
number of messages published in as yet uncommitted transactions
queue arguments
messages
sum of ready, unacknowledged and uncommitted messages
=item node
acks_uncommitted
number of acknowledgements received in as yet uncommitted
transactions
node on which the process associated with the queue resides
consumers
number of consumers
=item messages_ready
transactions
number of transactions
number of messages ready to be delivered to clients
memory
bytes of memory consumed by the Erlang process for the queue,
including stack, heap and internal structures
=item messages_unacknowledged
number of messages delivered to clients but not yet acknowledged
=item messages_uncommitted
number of messages published in as yet uncommitted transactions
=item messages
sum of ready, unacknowledged and uncommitted messages
=item acks_uncommitted
number of acknowledgements received in as yet uncommitted transactions
=item consumers
number of consumers
=item transactions
number of transactions
=item memory
bytes of memory consumed by the Erlang process for the queue,
including stack, heap and internal structures
=back
list_exchanges [-p I<vhostpath>] [I<exchangeinfoitem> ...]
list exchange information by virtual host. If no
I<exchangeinfoitem>s are specified then name and type is displayed
for each exchange.
=over
=item list_exchanges [-p I<vhostpath>] [I<exchangeinfoitem> ...]
List queue information by virtual host. Each line printed describes an
exchange, with the requested I<exchangeinfoitem> values separated by
tab characters. If no I<exchangeinfoitem>s are specified then I<name>
and I<type> are assumed.
=back
=head3 Exchange information items
=over 4
=over
name
URL-encoded name of the exchange
=item name
type
exchange type (B<direct>, B<topic>, B<fanout>, or B<headers>)
name of the exchange
durable
whether the exchange survives server restarts
=item type
auto_delete
whether the exchange is deleted when no longer used
exchange type (B<direct>, B<topic>, B<fanout>, or B<headers>)
arguments
exchange arguments
=item durable
whether the exchange survives server restarts
=item auto_delete
whether the exchange is deleted when no longer used
=item arguments
exchange arguments
=back
list_bindings [-p I<vhostpath>]
list bindings by virtual host. Each line contains exchange name,
routing key and queue name (all URL encoded) and arguments.
=over
list_connections [I<connectioninfoitem> ...]
list connection information. If no I<connectioninfoitem>s are
specified then the user, peer address and peer port are displayed.
=item list_bindings [-p I<vhostpath>]
List bindings by virtual host. Each line printed describes a binding,
with the exchange name, routing key, queue name and arguments,
separated by tab characters.
=item list_connections [I<connectioninfoitem> ...]
List queue information by virtual host. Each line printed describes an
connection, with the requested I<connectioninfoitem> values separated
by tab characters. If no I<connectioninfoitem>s are specified then
I<user>, I<peer_address>, I<peer_port> and I<state> are assumed.
=back
=head3 Connection information items
=over 4
=over
node
node on which the process associated with the connection resides
=item node
address
server IP number
node on which the process associated with the connection resides
port
server port
=item address
peer_address
peer address
server IP number
peer_port
peer port
=item port
state
connection state (B<pre-init>, B<starting>, B<tuning>, B<opening>,
B<running>, B<closing>, B<closed>)
server port
channels
number of channels using the connection
=item peer_address
user
username associated with the connection
peer address
vhost
URL-encoded virtual host
=item peer_port
timeout
connection timeout
peer port
frame_max
maximum frame size (bytes)
=item state
recv_oct
octets received
connection state (B<pre-init>, B<starting>, B<tuning>, B<opening>,
B<running>, B<closing>, B<closed>)
recv_cnt
packets received
=item channels
send_oct
octets sent
number of channels using the connection
send_cnt
packets sent
=item user
send_pend
send queue size
username associated with the connection
=item vhost
virtual host
=item timeout
connection timeout
=item frame_max
maximum frame size (bytes)
=item recv_oct
octets received
=item recv_cnt
packets received
=item send_oct
octets sent
=item send_cnt
packets sent
=item send_pend
send queue size
=back
The list_queues, list_exchanges and list_bindings commands accept an
optional virtual host parameter for which to display results, defaulting
to I<"/">. The default can be overridden with the B<-p> flag. Result
columns for these commands and list_connections are tab-separated.
optional virtual host parameter for which to display results,
defaulting to I<"/">. The default can be overridden with the B<-p>
flag.
=head1 OUTPUT ESCAPING
Various items that may appear in the output of rabbitmqctl can contain
arbitrary octets. If a octet corresponds to a non-printing ASCII
character (values 0 to 31, and 127), it will be escaped in the output,
using a sequence consisting of a backslash character followed by three
octal digits giving the octet's value (i.e., as used in string
literals in the C programming language). An octet corresponding to
the backslash character (i.e. with value 92) will be escaped using a
sequence of two backslash characters. Octets with a value of 128 or
above are not escaped, in order to preserve strings encoded with
UTF-8.
The items to which this escaping scheme applies are:
=over
=item *
Usernames
=item *
Virtual host names
=item *
Queue names
=item *
Exchange names
=item *
Regular expressions used for access control
=back
=head1 EXAMPLES
@ -309,4 +428,4 @@ The RabbitMQ Team <info@rabbitmq.com>
=head1 REFERENCES
RabbitMQ Web Site: http://www.rabbitmq.com
RabbitMQ Web Site: L<http://www.rabbitmq.com>

View File

@ -15,6 +15,8 @@
%% actually want to start it
{mod, {rabbit, []}},
{env, [{tcp_listeners, [{"0.0.0.0", 5672}]},
{ssl_listeners, []},
{ssl_options, []},
{extra_startup_steps, []},
{default_user, <<"guest">>},
{default_pass, <<"guest">>},

View File

@ -64,6 +64,7 @@
-record(basic_message, {exchange_name, routing_key, content, persistent_key}).
-record(ssl_socket, {tcp, ssl}).
-record(delivery, {mandatory, immediate, txn, sender, message}).
%%----------------------------------------------------------------------------
@ -74,7 +75,8 @@
-type(maybe(T) :: T | 'none').
-type(erlang_node() :: atom()).
-type(socket() :: port()).
-type(ssl_socket() :: #ssl_socket{}).
-type(socket() :: port() | ssl_socket()).
-type(thunk(T) :: fun(() -> T)).
-type(info_key() :: atom()).
-type(info() :: {info_key(), any()}).

View File

@ -1,7 +1,8 @@
VERSION=0.0.0
SOURCE_TARBALL_DIR=../../../dist
TARBALL_DIR=../../../dist
TARBALL=$(notdir $(wildcard $(TARBALL_DIR)/rabbitmq-server-[0-9.]*.tar.gz))
COMMON_DIR=../../common
TARBALL=$(SOURCE_TARBALL_DIR)/rabbitmq-server-$(VERSION).tar.gz
VERSION=$(shell echo $(TARBALL) | sed -e 's:rabbitmq-server-\(.*\)\.tar\.gz:\1:g')
TOP_DIR=$(shell pwd)
#Under debian we do not want to check build dependencies, since that
#only checks build-dependencies using rpms, not debs
@ -23,13 +24,16 @@ rpms: clean server
prepare:
mkdir -p BUILD SOURCES SPECS SRPMS RPMS tmp
cp $(TOP_DIR)/$(TARBALL) SOURCES
cp $(TARBALL_DIR)/$(TARBALL) SOURCES
cp rabbitmq-server.spec SPECS
sed -i 's|%%VERSION%%|$(VERSION)|;s|%%REQUIRES%%|$(REQUIRES)|' \
SPECS/rabbitmq-server.spec
cp init.d SOURCES/rabbitmq-server.init
cp ${COMMON_DIR}/* SOURCES/
sed -i \
-e 's|^DEFAULTS_FILE=.*$$|DEFAULTS_FILE=/etc/sysconfig/rabbitmq|' \
-e 's|^LOCK_FILE=.*$$|LOCK_FILE=/var/lock/subsys/$$NAME|' \
SOURCES/rabbitmq-server.init
cp rabbitmq-server.logrotate SOURCES/rabbitmq-server.logrotate
server: prepare

View File

@ -9,6 +9,7 @@ Source: http://www.rabbitmq.com/releases/rabbitmq-server/v%{version}/%{name}-%{v
Source1: rabbitmq-server.init
Source2: rabbitmq-script-wrapper
Source3: rabbitmq-server.logrotate
Source4: rabbitmq-asroot-script-wrapper
URL: http://www.rabbitmq.com/
BuildRequires: erlang, python-simplejson
Requires: erlang, logrotate
@ -22,9 +23,10 @@ RabbitMQ is an implementation of AMQP, the emerging standard for high
performance enterprise messaging. The RabbitMQ server is a robust and
scalable implementation of an AMQP broker.
%define _rabbit_erllibdir %{_libdir}/erlang/lib/rabbitmq_server-%{version}
%define _rabbit_erllibdir %{_libdir}/rabbitmq/lib/rabbitmq_server-%{version}
%define _rabbit_libdir %{_libdir}/rabbitmq
%define _rabbit_wrapper %{_builddir}/`basename %{S:2}`
%define _rabbit_asroot_wrapper %{_builddir}/`basename %{S:4}`
%define _maindir %{buildroot}%{_rabbit_erllibdir}
@ -34,6 +36,8 @@ scalable implementation of an AMQP broker.
%build
cp %{S:2} %{_rabbit_wrapper}
sed -i 's|/usr/lib/|%{_libdir}/|' %{_rabbit_wrapper}
cp %{S:4} %{_rabbit_asroot_wrapper}
sed -i 's|/usr/lib/|%{_libdir}/|' %{_rabbit_asroot_wrapper}
make %{?_smp_mflags}
%install
@ -51,6 +55,7 @@ install -p -D -m 0755 %{S:1} %{buildroot}%{_initrddir}/rabbitmq-server
install -p -D -m 0755 %{_rabbit_wrapper} %{buildroot}%{_sbindir}/rabbitmqctl
install -p -D -m 0755 %{_rabbit_wrapper} %{buildroot}%{_sbindir}/rabbitmq-server
install -p -D -m 0755 %{_rabbit_wrapper} %{buildroot}%{_sbindir}/rabbitmq-multi
install -p -D -m 0755 %{_rabbit_asroot_wrapper} %{buildroot}%{_sbindir}/rabbitmq-activate-plugins
install -p -D -m 0644 %{S:3} %{buildroot}%{_sysconfdir}/logrotate.d/rabbitmq-server

View File

@ -0,0 +1,53 @@
#!/bin/bash
## The contents of this file are subject to the Mozilla Public License
## Version 1.1 (the "License"); you may not use this file except in
## compliance with the License. You may obtain a copy of the License at
## http://www.mozilla.org/MPL/
##
## Software distributed under the License is distributed on an "AS IS"
## basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
## License for the specific language governing rights and limitations
## under the License.
##
## The Original Code is RabbitMQ.
##
## The Initial Developers of the Original Code are LShift Ltd,
## Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
##
## Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
## Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
## are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
## Technologies LLC, and Rabbit Technologies Ltd.
##
## Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
## Ltd. Portions created by Cohesive Financial Technologies LLC are
## Copyright (C) 2007-2009 Cohesive Financial Technologies
## LLC. Portions created by Rabbit Technologies Ltd are Copyright
## (C) 2007-2009 Rabbit Technologies Ltd.
##
## All Rights Reserved.
##
## Contributor(s): ______________________________________.
##
# Escape spaces and quotes, because shell is revolting.
for arg in "$@" ; do
# Escape quotes in parameters, so that they're passed through cleanly.
arg=$(sed -e 's/"/\\"/' <<-END
$arg
END
)
CMDLINE="${CMDLINE} \"${arg}\""
done
cd /var/lib/rabbitmq
SCRIPT=`basename $0`
if [ `id -u` = 0 ] ; then
/usr/lib/rabbitmq/bin/${SCRIPT} ${CMDLINE}
else
echo -e "\nOnly root should run ${SCRIPT}\n"
exit 1
fi

View File

@ -1,4 +1,35 @@
#!/bin/sh
## The contents of this file are subject to the Mozilla Public License
## Version 1.1 (the "License"); you may not use this file except in
## compliance with the License. You may obtain a copy of the License at
## http://www.mozilla.org/MPL/
##
## Software distributed under the License is distributed on an "AS IS"
## basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
## License for the specific language governing rights and limitations
## under the License.
##
## The Original Code is RabbitMQ.
##
## The Initial Developers of the Original Code are LShift Ltd,
## Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
##
## Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
## Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
## are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
## Technologies LLC, and Rabbit Technologies Ltd.
##
## Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
## Ltd. Portions created by Cohesive Financial Technologies LLC are
## Copyright (C) 2007-2009 Cohesive Financial Technologies
## LLC. Portions created by Rabbit Technologies Ltd are Copyright
## (C) 2007-2009 Rabbit Technologies Ltd.
##
## All Rights Reserved.
##
## Contributor(s): ______________________________________.
##
# Escape spaces and quotes, because shell is revolting.
for arg in "$@" ; do
# Escape quotes in parameters, so that they're passed through cleanly.

View File

@ -8,10 +8,10 @@
### BEGIN INIT INFO
# Provides: rabbitmq-server
# Default-Start:
# Default-Stop:
# Required-Start: $remote_fs $network
# Required-Stop: $remote_fs $network
# Default-Start:
# Default-Stop:
# Description: RabbitMQ broker
# Short-Description: Enable AMQP service provided by RabbitMQ broker
### END INIT INFO
@ -24,13 +24,14 @@ USER=rabbitmq
NODE_COUNT=1
ROTATE_SUFFIX=
LOCK_FILE=/var/lock/subsys/$NAME
DEFAULTS_FILE= # This is filled in when building packages
LOCK_FILE= # This is filled in when building packages
test -x $DAEMON || exit 0
# Include rabbitmq defaults if available
if [ -f /etc/sysconfig/rabbitmq ] ; then
. /etc/sysconfig/rabbitmq
if [ -f "$DEFAULTS_FILE" ] ; then
. $DEFAULTS_FILE
fi
RETVAL=0
@ -41,7 +42,8 @@ start_rabbitmq () {
$DAEMON start_all ${NODE_COUNT} > /var/log/rabbitmq/startup_log 2> /var/log/rabbitmq/startup_err
case "$?" in
0)
echo SUCCESS && touch $LOCK_FILE
echo SUCCESS
[ -n "$LOCK_FILE" ] && touch $LOCK_FILE
RETVAL=0
;;
1)
@ -52,7 +54,7 @@ start_rabbitmq () {
echo FAILED - check /var/log/rabbitmq/startup_log, _err
RETVAL=1
;;
esac
esac
set -e
}
@ -62,10 +64,12 @@ stop_rabbitmq () {
if [ $RETVAL = 0 ] ; then
$DAEMON stop_all > /var/log/rabbitmq/shutdown_log 2> /var/log/rabbitmq/shutdown_err
RETVAL=$?
if [ $RETVAL != 0 ] ; then
echo FAILED - check /var/log/rabbitmq/shutdown_log, _err
if [ $RETVAL = 0 ] ; then
# Try to stop epmd if run by the rabbitmq user
pkill -u rabbitmq epmd || :
[ -n "$LOCK_FILE" ] && rm -rf $LOCK_FILE
else
rm -rf $LOCK_FILE
echo FAILED - check /var/log/rabbitmq/shutdown_log, _err
fi
else
echo No nodes running
@ -119,19 +123,14 @@ case "$1" in
echo -n "Rotating log files for $DESC: "
rotate_logs_rabbitmq
;;
force-reload|reload|restart)
echo -n "Restarting $DESC: "
restart_rabbitmq
echo "$NAME."
;;
condrestart|try-restart)
force-reload|reload|restart|condrestart|try-restart)
echo -n "Restarting $DESC: "
restart_rabbitmq
echo "$NAME."
;;
*)
echo "Usage: $0 {start|stop|status|rotate-logs|restart|condrestart|try-restart|reload|force-reload}" >&2
RETVAL=2
RETVAL=1
;;
esac

View File

@ -1,8 +1,9 @@
TARBALL_DIR=../../../dist
TARBALL=$(shell (cd $(TARBALL_DIR); echo rabbitmq-server-[0-9]*.tar.gz))
TARBALL=$(notdir $(wildcard $(TARBALL_DIR)/rabbitmq-server-[0-9.]*.tar.gz))
COMMON_DIR=../../common
DEBIAN_ORIG_TARBALL=$(shell echo $(TARBALL) | sed -e 's:\(.*\)-\(.*\)\(\.tar\.gz\):\1_\2\.orig\3:g')
VERSION=$(shell echo $(TARBALL) | sed -e 's:rabbitmq-server-\(.*\)\.tar\.gz:\1:g')
DEBIAN_ORIG_TARBALL=$(shell echo $(TARBALL) | sed -e 's:\(.*\)-\(.*\)\(\.tar\.gz\):\1_\2\.orig\3:g')
UNPACKED_DIR=rabbitmq-server-$(VERSION)
PACKAGENAME=rabbitmq-server
SIGNING_KEY_ID=056E8E56
@ -21,6 +22,10 @@ package: clean
tar -zxvf $(DEBIAN_ORIG_TARBALL)
cp -r debian $(UNPACKED_DIR)
cp $(COMMON_DIR)/* $(UNPACKED_DIR)/debian/
sed -i \
-e 's|^DEFAULTS_FILE=.*$$|DEFAULTS_FILE=/etc/default/rabbitmq|' \
-e 's|^LOCK_FILE=.*$$|LOCK_FILE=|' \
$(UNPACKED_DIR)/debian/rabbitmq-server.init
chmod a+x $(UNPACKED_DIR)/debian/rules
UNOFFICIAL_RELEASE=$(UNOFFICIAL_RELEASE) VERSION=$(VERSION) ./check-changelog.sh rabbitmq-server $(UNPACKED_DIR)
cd $(UNPACKED_DIR); GNUPGHOME=$(GNUPG_PATH)/.gnupg dpkg-buildpackage -rfakeroot $(SIGNING)

View File

@ -1,122 +0,0 @@
#!/bin/sh
### BEGIN INIT INFO
# Provides: rabbitmq
# Required-Start: $remote_fs $network
# Required-Stop: $remote_fs $network
# Default-Start: 2 3 4 5
# Default-Stop: 0 1 6
# Description: RabbitMQ broker
# Short-Description: Enable AMQP service provided by RabbitMQ broker
### END INIT INFO
PATH=/sbin:/usr/sbin:/bin:/usr/bin
DAEMON=/usr/sbin/rabbitmq-multi
NAME=rabbitmq-server
DESC=rabbitmq-server
USER=rabbitmq
NODE_COUNT=1
ROTATE_SUFFIX=
test -x $DAEMON || exit 0
# Include rabbitmq defaults if available
if [ -f /etc/default/rabbitmq ] ; then
. /etc/default/rabbitmq
fi
RETVAL=0
set -e
start_rabbitmq () {
set +e
$DAEMON start_all ${NODE_COUNT} > /var/log/rabbitmq/startup_log 2> /var/log/rabbitmq/startup_err
case "$?" in
0)
echo SUCCESS
RETVAL=0
;;
1)
echo TIMEOUT - check /var/log/rabbitmq/startup_\{log,err\}
RETVAL=1
;;
*)
echo FAILED - check /var/log/rabbitmq/startup_log, _err
RETVAL=1
;;
esac
set -e
}
stop_rabbitmq () {
set +e
status_rabbitmq quiet
if [ $RETVAL = 0 ] ; then
$DAEMON stop_all > /var/log/rabbitmq/shutdown_log 2> /var/log/rabbitmq/shutdown_err
RETVAL=$?
if [ $RETVAL != 0 ] ; then
echo FAILED - check /var/log/rabbitmq/shutdown_log, _err
fi
else
echo No nodes running
RETVAL=0
fi
set -e
}
status_rabbitmq() {
set +e
if [ "$1" != "quiet" ] ; then
$DAEMON status 2>&1
else
$DAEMON status > /dev/null 2>&1
fi
if [ $? != 0 ] ; then
RETVAL=1
fi
set -e
}
rotate_logs_rabbitmq() {
set +e
$DAEMON rotate_logs ${ROTATE_SUFFIX}
if [ $? != 0 ] ; then
RETVAL=1
fi
set -e
}
restart_rabbitmq() {
stop_rabbitmq
start_rabbitmq
}
case "$1" in
start)
echo -n "Starting $DESC: "
start_rabbitmq
echo "$NAME."
;;
stop)
echo -n "Stopping $DESC: "
stop_rabbitmq
echo "$NAME."
;;
status)
status_rabbitmq
;;
rotate-logs)
echo -n "Rotating log files for $DESC: "
rotate_logs_rabbitmq
;;
force-reload|restart)
echo -n "Restarting $DESC: "
restart_rabbitmq
echo "$NAME."
;;
*)
echo "Usage: $0 {start|stop|status|rotate-logs|restart|force-reload}" >&2
RETVAL=1
;;
esac
exit $RETVAL

View File

@ -3,7 +3,7 @@
include /usr/share/cdbs/1/rules/debhelper.mk
include /usr/share/cdbs/1/class/makefile.mk
RABBIT_LIB=$(DEB_DESTDIR)usr/lib/erlang/lib/rabbitmq_server-$(DEB_UPSTREAM_VERSION)/
RABBIT_LIB=$(DEB_DESTDIR)usr/lib/rabbitmq/lib/rabbitmq_server-$(DEB_UPSTREAM_VERSION)/
RABBIT_BIN=$(DEB_DESTDIR)usr/lib/rabbitmq/bin/
DEB_MAKE_INSTALL_TARGET := install TARGET_DIR=$(RABBIT_LIB) SBIN_DIR=$(RABBIT_BIN) MAN_DIR=$(DEB_DESTDIR)usr/share/man/
@ -17,3 +17,6 @@ install/rabbitmq-server::
for script in rabbitmqctl rabbitmq-server rabbitmq-multi; do \
install -p -D -m 0755 debian/rabbitmq-script-wrapper $(DEB_DESTDIR)usr/sbin/$$script; \
done
for script in rabbitmq-activate-plugins; do \
install -p -D -m 0755 debian/rabbitmq-asroot-script-wrapper $(DEB_DESTDIR)usr/sbin/$$script; \
done

View File

@ -4,10 +4,10 @@ TARGET_DIR=rabbitmq_server-$(VERSION)
TARGET_TARBALL=rabbitmq-server-generic-unix-$(VERSION)
dist:
make -C ../.. VERSION=$(VERSION) srcdist
$(MAKE) -C ../.. VERSION=$(VERSION) srcdist
tar -zxvf ../../dist/$(SOURCE_DIR).tar.gz
make -C $(SOURCE_DIR) \
$(MAKE) -C $(SOURCE_DIR) \
TARGET_DIR=`pwd`/$(TARGET_DIR) \
SBIN_DIR=`pwd`/$(TARGET_DIR)/sbin \
MAN_DIR=`pwd`/$(TARGET_DIR)/share/man \

View File

@ -42,7 +42,7 @@ use_parallel_build yes
build.args PYTHON=${prefix}/bin/python2.5
destroot.destdir \
TARGET_DIR=${destroot}${prefix}/lib/erlang/lib/rabbitmq_server-${version} \
TARGET_DIR=${destroot}${prefix}/lib/rabbitmq/lib/rabbitmq_server-${version} \
SBIN_DIR=${sbindir} \
MAN_DIR=${destroot}${prefix}/share/man
@ -61,9 +61,7 @@ post-destroot {
xinstall -d -g [existsgroup ${servergroup}] -m 775 ${destroot}${mnesiadbdir}
reinplace -E "s:(/etc/rabbitmq/rabbitmq.conf):${prefix}\\1:g" \
${sbindir}/rabbitmq-multi \
${sbindir}/rabbitmq-server \
${sbindir}/rabbitmqctl
${sbindir}/rabbitmq-env
reinplace -E "s:(CLUSTER_CONFIG_FILE)=/:\\1=${prefix}/:" \
${sbindir}/rabbitmq-multi \
${sbindir}/rabbitmq-server \
@ -83,14 +81,19 @@ post-destroot {
xinstall -m 555 ${filespath}/rabbitmq-script-wrapper \
${wrappersbin}/rabbitmq-multi
xinstall -m 555 ${filespath}/rabbitmq-asroot-script-wrapper \
${wrappersbin}/rabbitmq-activate-plugins
reinplace -E "s:/usr/lib/rabbitmq/bin/:${prefix}/lib/rabbitmq/bin/:" \
${wrappersbin}/rabbitmq-multi
reinplace -E "s:/var/lib/rabbitmq:${prefix}/var/lib/rabbitmq:" \
${wrappersbin}/rabbitmq-multi
reinplace -E "s:/usr/lib/rabbitmq/bin/:${prefix}/lib/rabbitmq/bin/:" \
${wrappersbin}/rabbitmq-activate-plugins
reinplace -E "s:/var/lib/rabbitmq:${prefix}/var/lib/rabbitmq:" \
${wrappersbin}/rabbitmq-activate-plugins
file copy ${wrappersbin}/rabbitmq-multi ${wrappersbin}/rabbitmq-server
file copy ${wrappersbin}/rabbitmq-multi ${wrappersbin}/rabbitmqctl
}
pre-install {

View File

@ -0,0 +1,12 @@
#!/bin/bash
cd /var/lib/rabbitmq
SCRIPT=`basename $0`
if [ `id -u` = 0 ] ; then
/usr/lib/rabbitmq/bin/${SCRIPT} "$@"
else
echo -e "\nOnly root should run ${SCRIPT}\n"
exit 1
fi

View File

@ -4,15 +4,16 @@ TARGET_DIR=rabbitmq_server-$(VERSION)
TARGET_ZIP=rabbitmq-server-windows-$(VERSION)
dist:
make -C ../.. VERSION=$(VERSION) srcdist
$(MAKE) -C ../.. VERSION=$(VERSION) srcdist
tar -zxvf ../../dist/$(SOURCE_DIR).tar.gz
make -C $(SOURCE_DIR)
$(MAKE) -C $(SOURCE_DIR)
mkdir $(SOURCE_DIR)/sbin
mv $(SOURCE_DIR)/scripts/rabbitmq-server.bat $(SOURCE_DIR)/sbin
mv $(SOURCE_DIR)/scripts/rabbitmq-service.bat $(SOURCE_DIR)/sbin
mv $(SOURCE_DIR)/scripts/rabbitmqctl.bat $(SOURCE_DIR)/sbin
mv $(SOURCE_DIR)/scripts/rabbitmq-multi.bat $(SOURCE_DIR)/sbin
mv $(SOURCE_DIR)/scripts/rabbitmq-activate-plugins.bat $(SOURCE_DIR)/sbin
rm -rf $(SOURCE_DIR)/scripts
rm -rf $(SOURCE_DIR)/codegen* $(SOURCE_DIR)/Makefile
rm -f $(SOURCE_DIR)/README

View File

@ -30,18 +30,18 @@
## Contributor(s): ______________________________________.
##
[ -f /etc/rabbitmq/rabbitmq.conf ] && . /etc/rabbitmq/rabbitmq.conf
. `dirname $0`/rabbitmq-env
RABBITMQ_EBIN=`dirname $0`/../ebin
[ "x" = "x$RABBITMQ_PLUGINS_DIR" ] && RABBITMQ_PLUGINS_DIR="`dirname $0`/../plugins"
[ "x" = "x$RABBITMQ_PLUGINS_EXPAND_DIR" ] && RABBITMQ_PLUGINS_EXPAND_DIR="`dirname $0`/../priv/plugins"
RABBITMQ_EBIN=${RABBITMQ_HOME}/ebin
[ "x" = "x$RABBITMQ_PLUGINS_DIR" ] && RABBITMQ_PLUGINS_DIR="${RABBITMQ_HOME}/plugins"
[ "x" = "x$RABBITMQ_PLUGINS_EXPAND_DIR" ] && RABBITMQ_PLUGINS_EXPAND_DIR="${RABBITMQ_HOME}/priv/plugins"
exec erl \
-pa "$RABBITMQ_EBIN" \
-rabbit plugins_dir "\"$RABBITMQ_PLUGINS_DIR\"" \
-rabbit plugins_expand_dir "\"$RABBITMQ_PLUGINS_EXPAND_DIR\"" \
-rabbit rabbit_ebin "\"$RABBITMQ_EBIN\"" \
-noinput \
-rabbit plugins_expand_dir "\"$RABBITMQ_PLUGINS_EXPAND_DIR\"" \
-rabbit rabbit_ebin "\"$RABBITMQ_EBIN\"" \
-noinput \
-hidden \
-s rabbit_plugin_activator \
-extra "$@"

View File

@ -30,10 +30,6 @@ REM
REM Contributor(s): ______________________________________.
REM
if "%ERLANG_HOME%"=="" (
set ERLANG_HOME=%~dp0%..\..\..
)
if not exist "%ERLANG_HOME%\bin\erl.exe" (
echo.
echo ******************************

53
scripts/rabbitmq-env Executable file
View File

@ -0,0 +1,53 @@
#!/bin/sh
## The contents of this file are subject to the Mozilla Public License
## Version 1.1 (the "License"); you may not use this file except in
## compliance with the License. You may obtain a copy of the License at
## http://www.mozilla.org/MPL/
##
## Software distributed under the License is distributed on an "AS IS"
## basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
## License for the specific language governing rights and limitations
## under the License.
##
## The Original Code is RabbitMQ.
##
## The Initial Developers of the Original Code are LShift Ltd,
## Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
##
## Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
## Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
## are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
## Technologies LLC, and Rabbit Technologies Ltd.
##
## Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
## Ltd. Portions created by Cohesive Financial Technologies LLC are
## Copyright (C) 2007-2009 Cohesive Financial Technologies
## LLC. Portions created by Rabbit Technologies Ltd are Copyright
## (C) 2007-2009 Rabbit Technologies Ltd.
##
## All Rights Reserved.
##
## Contributor(s): ______________________________________.
##
# Determine where this script is really located
SCRIPT_PATH="$0"
while [ -h "$SCRIPT_PATH" ] ; do
FULL_PATH=`readlink -f $SCRIPT_PATH 2>/dev/null`
if [ "$?" != "0" ]; then
REL_PATH=`readlink $SCRIPT_PATH`
if expr "$REL_PATH" : '/.*' > /dev/null; then
SCRIPT_PATH="$REL_PATH"
else
SCRIPT_PATH="`dirname "$SCRIPT_PATH"`/$REL_PATH"
fi
else
SCRIPT_PATH=$FULL_PATH
fi
done
SCRIPT_DIR=`dirname $SCRIPT_PATH`
RABBITMQ_HOME="${SCRIPT_DIR}/.."
# Load configuration from the rabbitmq.conf file
[ -f /etc/rabbitmq/rabbitmq.conf ] && . /etc/rabbitmq/rabbitmq.conf

View File

@ -37,7 +37,7 @@ PIDS_FILE=/var/lib/rabbitmq/pids
MULTI_ERL_ARGS=
MULTI_START_ARGS=
[ -f /etc/rabbitmq/rabbitmq.conf ] && . /etc/rabbitmq/rabbitmq.conf
. `dirname $0`/rabbitmq-env
[ "x" = "x$RABBITMQ_NODENAME" ] && RABBITMQ_NODENAME=${NODENAME}
[ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ] && RABBITMQ_NODE_IP_ADDRESS=${NODE_IP_ADDRESS}
@ -60,7 +60,7 @@ export \
set -f
exec erl \
-pa "`dirname $0`/../ebin" \
-pa "${RABBITMQ_HOME}/ebin" \
-noinput \
-hidden \
${RABBITMQ_MULTI_ERL_ARGS} \

View File

@ -49,10 +49,6 @@ if "%RABBITMQ_NODE_PORT%"=="" (
set RABBITMQ_PIDS_FILE=%RABBITMQ_BASE%\rabbitmq.pids
set RABBITMQ_SCRIPT_HOME=%~sdp0%
if "%ERLANG_HOME%"=="" (
set ERLANG_HOME=%~dp0%..\..\..
)
if not exist "%ERLANG_HOME%\bin\erl.exe" (
echo.
echo ******************************

View File

@ -41,7 +41,7 @@ LOG_BASE=/var/log/rabbitmq
MNESIA_BASE=/var/lib/rabbitmq/mnesia
SERVER_START_ARGS=
[ -f /etc/rabbitmq/rabbitmq.conf ] && . /etc/rabbitmq/rabbitmq.conf
. `dirname $0`/rabbitmq-env
[ "x" = "x$RABBITMQ_NODENAME" ] && RABBITMQ_NODENAME=${NODENAME}
[ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ] && RABBITMQ_NODE_IP_ADDRESS=${NODE_IP_ADDRESS}
@ -75,7 +75,7 @@ fi
RABBITMQ_START_RABBIT=
[ "x" = "x$RABBITMQ_NODE_ONLY" ] && RABBITMQ_START_RABBIT='-noinput -s rabbit'
RABBITMQ_EBIN_ROOT="`dirname $0`/../ebin"
RABBITMQ_EBIN_ROOT="${RABBITMQ_HOME}/ebin"
if [ -f "${RABBITMQ_EBIN_ROOT}/rabbit.boot" ]; then
RABBITMQ_BOOT_FILE="${RABBITMQ_EBIN_ROOT}/rabbit"
RABBITMQ_EBIN_PATH=""

View File

@ -46,10 +46,6 @@ if "%RABBITMQ_NODE_PORT%"=="" (
set RABBITMQ_NODE_PORT=5672
)
if "%ERLANG_HOME%"=="" (
set ERLANG_HOME=%~dp0%..\..\..
)
if not exist "%ERLANG_HOME%\bin\erl.exe" (
echo.
echo ******************************

View File

@ -30,12 +30,12 @@
## Contributor(s): ______________________________________.
##
[ -f /etc/rabbitmq/rabbitmq.conf ] && . /etc/rabbitmq/rabbitmq.conf
. `dirname $0`/rabbitmq-env
[ "x" = "x$RABBITMQ_CTL_ERL_ARGS" ] && RABBITMQ_CTL_ERL_ARGS=${CTL_ERL_ARGS}
exec erl \
-pa "`dirname $0`/../ebin" \
-pa "${RABBITMQ_HOME}/ebin" \
-noinput \
-hidden \
${RABBITMQ_CTL_ERL_ARGS} \

View File

@ -30,10 +30,6 @@ REM
REM Contributor(s): ______________________________________.
REM
if "%ERLANG_HOME%"=="" (
set ERLANG_HOME=%~dp0%..\..\..
)
if not exist "%ERLANG_HOME%\bin\erl.exe" (
echo.
echo ******************************

View File

@ -437,7 +437,10 @@ unregister_name({local,Name}) ->
unregister_name({global,Name}) ->
_ = global:unregister_name(Name);
unregister_name(Pid) when is_pid(Pid) ->
Pid.
Pid;
% Under R12 let's just ignore it, as we have a single term as Name.
% On R13 it will never get here, as we get tuple with 'local/global' atom.
unregister_name(_Name) -> ok.
extend_backoff(undefined) ->
undefined;

View File

@ -55,7 +55,8 @@
-module(priority_queue).
-export([new/0, is_queue/1, is_empty/1, len/1, to_list/1, in/2, in/3, out/1]).
-export([new/0, is_queue/1, is_empty/1, len/1, to_list/1, in/2, in/3,
out/1, join/2]).
%%----------------------------------------------------------------------------
@ -73,6 +74,7 @@
-spec(in/2 :: (any(), pqueue()) -> pqueue()).
-spec(in/3 :: (any(), priority(), pqueue()) -> pqueue()).
-spec(out/1 :: (pqueue()) -> {empty | {value, any()}, pqueue()}).
-spec(join/2 :: (pqueue(), pqueue()) -> pqueue()).
-endif.
@ -147,6 +149,42 @@ out({pqueue, [{P, Q} | Queues]}) ->
end,
{R, NewQ}.
join(A, {queue, [], []}) ->
A;
join({queue, [], []}, B) ->
B;
join({queue, AIn, AOut}, {queue, BIn, BOut}) ->
{queue, BIn, AOut ++ lists:reverse(AIn, BOut)};
join(A = {queue, _, _}, {pqueue, BPQ}) ->
{Pre, Post} = lists:splitwith(fun ({P, _}) -> P < 0 end, BPQ),
Post1 = case Post of
[] -> [ {0, A} ];
[ {0, ZeroQueue} | Rest ] -> [ {0, join(A, ZeroQueue)} | Rest ];
_ -> [ {0, A} | Post ]
end,
{pqueue, Pre ++ Post1};
join({pqueue, APQ}, B = {queue, _, _}) ->
{Pre, Post} = lists:splitwith(fun ({P, _}) -> P < 0 end, APQ),
Post1 = case Post of
[] -> [ {0, B} ];
[ {0, ZeroQueue} | Rest ] -> [ {0, join(ZeroQueue, B)} | Rest ];
_ -> [ {0, B} | Post ]
end,
{pqueue, Pre ++ Post1};
join({pqueue, APQ}, {pqueue, BPQ}) ->
{pqueue, merge(APQ, BPQ, [])}.
merge([], BPQ, Acc) ->
lists:reverse(Acc, BPQ);
merge(APQ, [], Acc) ->
lists:reverse(Acc, APQ);
merge([{P, A}|As], [{P, B}|Bs], Acc) ->
merge(As, Bs, [ {P, join(A, B)} | Acc ]);
merge([{PA, A}|As], Bs = [{PB, _}|_], Acc) when PA < PB ->
merge(As, Bs, [ {PA, A} | Acc ]);
merge(As = [{_, _}|_], [{PB, B}|Bs], Acc) ->
merge(As, Bs, [ {PB, B} | Acc ]).
r2f([]) -> {queue, [], []};
r2f([_] = R) -> {queue, [], R};
r2f([X,Y]) -> {queue, [X], [Y]};

View File

@ -116,8 +116,6 @@ start(normal, []) ->
print_banner(),
{ok, ExtraSteps} = application:get_env(extra_startup_steps),
lists:foreach(
fun ({Msg, Thunk}) ->
io:format("starting ~-20s ...", [Msg]),
@ -135,13 +133,13 @@ start(normal, []) ->
ok = start_child(rabbit_log),
ok = rabbit_hooks:start(),
ok = rabbit_amqqueue:start(),
ok = rabbit_binary_generator:
check_empty_content_body_frame_size(),
{ok, MemoryAlarms} = application:get_env(memory_alarms),
ok = rabbit_alarm:start(MemoryAlarms),
ok = rabbit_binary_generator:
check_empty_content_body_frame_size(),
ok = rabbit_amqqueue:start(),
ok = start_child(rabbit_router),
ok = start_child(rabbit_node_monitor)
@ -170,14 +168,28 @@ start(normal, []) ->
{"TCP listeners",
fun () ->
ok = rabbit_networking:start(),
{ok, TCPListeners} = application:get_env(tcp_listeners),
{ok, TcpListeners} = application:get_env(tcp_listeners),
lists:foreach(
fun ({Host, Port}) ->
ok = rabbit_networking:start_tcp_listener(Host, Port)
end,
TCPListeners)
end}]
++ ExtraSteps),
TcpListeners)
end},
{"SSL listeners",
fun () ->
case application:get_env(ssl_listeners) of
{ok, []} ->
ok;
{ok, SslListeners} ->
ok = rabbit_misc:start_applications([crypto, ssl]),
{ok, SslOpts} = application:get_env(ssl_options),
[rabbit_networking:start_ssl_listener
(Host, Port, SslOpts) || {Host, Port} <- SslListeners],
ok
end
end}]),
io:format("~nbroker running~n"),

View File

@ -41,7 +41,7 @@
-define(MEMSUP_CHECK_INTERVAL, 1000).
%% OSes on which we know memory alarms to be trustworthy
-define(SUPPORTED_OS, [{unix, linux}]).
-define(SUPPORTED_OS, [{unix, linux}, {unix, darwin}]).
-record(alarms, {alertees, system_memory_high_watermark = false}).
@ -136,33 +136,35 @@ code_change(_OldVsn, State, _Extra) ->
%%----------------------------------------------------------------------------
start_memsup() ->
Mod = case os:type() of
%% memsup doesn't take account of buffers or cache when
%% considering "free" memory - therefore on Linux we can
%% get memory alarms very easily without any pressure
%% existing on memory at all. Therefore we need to use
%% our own simple memory monitor.
%%
{unix, linux} -> rabbit_memsup_linux;
{Mod, Args} =
case os:type() of
%% memsup doesn't take account of buffers or cache when
%% considering "free" memory - therefore on Linux we can
%% get memory alarms very easily without any pressure
%% existing on memory at all. Therefore we need to use
%% our own simple memory monitor.
%%
{unix, linux} -> {rabbit_memsup, [rabbit_memsup_linux]};
{unix, darwin} -> {rabbit_memsup, [rabbit_memsup_darwin]};
%% Start memsup programmatically rather than via the
%% rabbitmq-server script. This is not quite the right
%% thing to do as os_mon checks to see if memsup is
%% available before starting it, but as memsup is
%% available everywhere (even on VXWorks) it should be
%% ok.
%%
%% One benefit of the programmatic startup is that we
%% can add our alarm_handler before memsup is running,
%% thus ensuring that we notice memory alarms that go
%% off on startup.
%%
_ -> memsup
end,
%% Start memsup programmatically rather than via the
%% rabbitmq-server script. This is not quite the right
%% thing to do as os_mon checks to see if memsup is
%% available before starting it, but as memsup is
%% available everywhere (even on VXWorks) it should be
%% ok.
%%
%% One benefit of the programmatic startup is that we
%% can add our alarm_handler before memsup is running,
%% thus ensuring that we notice memory alarms that go
%% off on startup.
%%
_ -> {memsup, []}
end,
%% This is based on os_mon:childspec(memsup, true)
{ok, _} = supervisor:start_child(
os_mon_sup,
{memsup, {Mod, start_link, []},
{memsup, {Mod, start_link, Args},
permanent, 2000, worker, [Mod]}),
ok.

View File

@ -303,10 +303,10 @@ basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) ->
infinity).
notify_sent(QPid, ChPid) ->
gen_server2:cast(QPid, {notify_sent, ChPid}).
gen_server2:pcast(QPid, 8, {notify_sent, ChPid}).
unblock(QPid, ChPid) ->
gen_server2:cast(QPid, {unblock, ChPid}).
gen_server2:pcast(QPid, 8, {unblock, ChPid}).
internal_delete(QueueName) ->
rabbit_misc:execute_mnesia_transaction(

View File

@ -126,7 +126,7 @@ handle_cast({method, Method, Content}, State) ->
{stop, normal, State#ch{state = terminating}}
catch
exit:{amqp, Error, Explanation, none} ->
ok = notify_queues(internal_rollback(State)),
ok = rollback_and_notify(State),
Reason = {amqp, Error, Explanation,
rabbit_misc:method_record_type(Method)},
State#ch.reader_pid ! {channel_exit, State#ch.channel, Reason},
@ -157,6 +157,10 @@ handle_cast({conserve_memory, Conserve}, State) ->
State#ch.writer_pid, #'channel.flow'{active = not(Conserve)}),
noreply(State).
handle_info({'EXIT', WriterPid, Reason = {writer, send_failed, _Error}},
State = #ch{writer_pid = WriterPid}) ->
State#ch.reader_pid ! {channel_exit, State#ch.channel, Reason},
{stop, normal, State};
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State};
@ -171,7 +175,7 @@ terminate(_Reason, #ch{writer_pid = WriterPid, limiter_pid = LimiterPid,
terminate(Reason, State = #ch{writer_pid = WriterPid,
limiter_pid = LimiterPid}) ->
Res = notify_queues(internal_rollback(State)),
Res = rollback_and_notify(State),
case Reason of
normal -> ok = Res;
_ -> ok
@ -293,7 +297,7 @@ handle_method(_Method, _, #ch{state = starting}) ->
rabbit_misc:protocol_error(channel_error, "expected 'channel.open'", []);
handle_method(#'channel.close'{}, _, State = #ch{writer_pid = WriterPid}) ->
ok = notify_queues(internal_rollback(State)),
ok = rollback_and_notify(State),
ok = rabbit_writer:send_command(WriterPid, #'channel.close_ok'{}),
stop;
@ -868,6 +872,11 @@ internal_rollback(State = #ch{transaction_id = TxnKey,
internal_error, "rollback failed: ~w", [Errors])
end.
rollback_and_notify(State = #ch{transaction_id = none}) ->
notify_queues(State);
rollback_and_notify(State) ->
notify_queues(internal_rollback(State)).
fold_per_queue(F, Acc0, UAQ) ->
D = lists:foldl(
fun ({_DTag, _CTag,

View File

@ -164,7 +164,7 @@ exchange name, routing key, queue name and arguments, in that order.
<ConnectionInfoItem> must be a member of the list [node, address, port,
peer_address, peer_port, state, channels, user, vhost, timeout, frame_max,
recv_oct, recv_cnt, send_oct, send_cnt, send_pend]. The default is to display
user, peer_address and peer_port.
user, peer_address, peer_port and state.
"),
halt(1).
@ -270,8 +270,9 @@ action(list_bindings, Node, Args, Inform) ->
action(list_connections, Node, Args, Inform) ->
Inform("Listing connections", []),
ArgAtoms = list_replace(node, pid,
default_if_empty(Args, [user, peer_address, peer_port])),
ArgAtoms = list_replace(node, pid,
default_if_empty(Args, [user, peer_address,
peer_port, state])),
display_info_list(rpc_call(Node, rabbit_networking, connection_info_all,
[ArgAtoms]),
ArgAtoms);
@ -314,7 +315,7 @@ default_if_empty(List, Default) when is_list(List) ->
end.
display_info_list(Results, InfoItemKeys) when is_list(Results) ->
lists:foreach(fun (Result) -> display_row([format_info_item(Result, X) ||
lists:foreach(fun (Result) -> display_row([format_info_item(X, Result) ||
X <- InfoItemKeys])
end, Results),
ok;
@ -325,26 +326,29 @@ display_row(Row) ->
io:fwrite(lists:flatten(rabbit_misc:intersperse("\t", Row))),
io:nl().
format_info_item(Items, Key) ->
{value, Info = {Key, Value}} = lists:keysearch(Key, 1, Items),
case Info of
{_, #resource{name = Name}} ->
url_encode(Name);
_ when Key =:= address; Key =:= peer_address andalso is_tuple(Value) ->
format_info_item(Key, Items) ->
case proplists:get_value(Key, Items) of
#resource{name = Name} ->
escape(Name);
Value when Key =:= address; Key =:= peer_address andalso
is_tuple(Value) ->
inet_parse:ntoa(Value);
_ when is_pid(Value) ->
Value when is_pid(Value) ->
atom_to_list(node(Value));
_ when is_binary(Value) ->
url_encode(Value);
_ ->
Value when is_binary(Value) ->
escape(Value);
Value when is_atom(Value) ->
escape(atom_to_list(Value));
Value ->
io_lib:format("~w", [Value])
end.
display_list(L) when is_list(L) ->
lists:foreach(fun (I) when is_binary(I) ->
io:format("~s~n", [url_encode(I)]);
io:format("~s~n", [escape(I)]);
(I) when is_tuple(I) ->
display_row([url_encode(V) || V <- tuple_to_list(I)])
display_row([escape(V)
|| V <- tuple_to_list(I)])
end,
lists:sort(L)),
ok;
@ -356,32 +360,25 @@ call(Node, {Mod, Fun, Args}) ->
rpc_call(Node, Mod, Fun, Args) ->
rpc:call(Node, Mod, Fun, Args, ?RPC_TIMEOUT).
%% url_encode is lifted from ibrowse, modified to preserve some characters
url_encode(Bin) when binary(Bin) ->
url_encode_char(lists:reverse(binary_to_list(Bin)), []).
%% escape does C-style backslash escaping of non-printable ASCII
%% characters. We don't escape characters above 127, since they may
%% form part of UTF-8 strings.
url_encode_char([X | T], Acc) when X >= $a, X =< $z ->
url_encode_char(T, [X | Acc]);
url_encode_char([X | T], Acc) when X >= $A, X =< $Z ->
url_encode_char(T, [X | Acc]);
url_encode_char([X | T], Acc) when X >= $0, X =< $9 ->
url_encode_char(T, [X | Acc]);
url_encode_char([X | T], Acc)
when X == $-; X == $_; X == $.; X == $~;
X == $!; X == $*; X == $'; X == $(;
X == $); X == $;; X == $:; X == $@;
X == $&; X == $=; X == $+; X == $$;
X == $,; X == $/; X == $?; X == $%;
X == $#; X == $[; X == $] ->
url_encode_char(T, [X | Acc]);
url_encode_char([X | T], Acc) ->
url_encode_char(T, [$%, d2h(X bsr 4), d2h(X band 16#0f) | Acc]);
url_encode_char([], Acc) ->
escape(Bin) when binary(Bin) ->
escape(binary_to_list(Bin));
escape(L) when is_list(L) ->
escape_char(lists:reverse(L), []).
escape_char([$\\ | T], Acc) ->
escape_char(T, [$\\, $\\ | Acc]);
escape_char([X | T], Acc) when X > 32, X /= 127 ->
escape_char(T, [X | Acc]);
escape_char([X | T], Acc) ->
escape_char(T, [$\\, $0 + (X bsr 6), $0 + (X band 8#070 bsr 3),
$0 + (X band 7) | Acc]);
escape_char([], Acc) ->
Acc.
d2h(N) when N<10 -> N+$0;
d2h(N) -> N+$a-10.
list_replace(Find, Replace, List) ->
[case X of Find -> Replace; _ -> X end || X <- List].

View File

@ -42,6 +42,7 @@
terminate/2, code_change/3]).
-define(SERVER, ?MODULE).
-define(SERIAL_FILENAME, "rabbit_serial").
-record(state, {serial}).
@ -59,17 +60,28 @@
%%----------------------------------------------------------------------------
start_link() ->
%% The persister can get heavily loaded, and we don't want that to
%% impact guid generation. We therefore keep the serial in a
%% separate process rather than calling rabbit_persister:serial/0
%% directly in the functions below.
gen_server:start_link({local, ?SERVER}, ?MODULE,
[rabbit_persister:serial()], []).
[update_disk_serial()], []).
update_disk_serial() ->
Filename = filename:join(rabbit_mnesia:dir(), ?SERIAL_FILENAME),
Serial = case rabbit_misc:read_term_file(Filename) of
{ok, [Num]} -> Num;
{error, enoent} -> rabbit_persister:serial();
{error, Reason} ->
throw({error, {cannot_read_serial_file, Filename, Reason}})
end,
case rabbit_misc:write_term_file(Filename, [Serial + 1]) of
ok -> ok;
{error, Reason1} ->
throw({error, {cannot_write_serial_file, Filename, Reason1}})
end,
Serial.
%% generate a guid that is monotonically increasing per process.
%%
%% The id is only unique within a single cluster and as long as the
%% persistent message store hasn't been deleted.
%% serial store hasn't been deleted.
guid() ->
%% We don't use erlang:now() here because a) it may return
%% duplicates when the system clock has been rewound prior to a
@ -77,7 +89,7 @@ guid() ->
%% now() to move ahead of the system time), and b) it is really
%% slow since it takes a global lock and makes a system call.
%%
%% rabbit_persister:serial/0, in combination with self/0 (which
%% A persisted serial number, in combination with self/0 (which
%% includes the node name) uniquely identifies a process in space
%% and time. We combine that with a process-local counter to give
%% us a GUID that is monotonically increasing per process.

View File

@ -53,7 +53,7 @@ start_heartbeat(Sock, TimeoutSec) ->
spawn_link(fun () -> heartbeater(Sock, TimeoutSec * 1000 div 2,
send_oct, 0,
fun () ->
catch gen_tcp:send(Sock, rabbit_binary_generator:build_heartbeat_frame()),
catch rabbit_net:send(Sock, rabbit_binary_generator:build_heartbeat_frame()),
continue
end,
erlang:monitor(process, Parent)) end),
@ -73,7 +73,7 @@ heartbeater(Sock, TimeoutMillisec, StatName, Threshold, Handler, MonitorRef) ->
{'DOWN', MonitorRef, process, _Object, _Info} -> ok;
Other -> exit({unexpected_message, Other})
after TimeoutMillisec ->
case inet:getstat(Sock, [StatName]) of
case rabbit_net:getstat(Sock, [StatName]) of
{ok, [{StatName, NewStatVal}]} ->
if NewStatVal =/= StatVal ->
F({NewStatVal, 0});

142
src/rabbit_memsup.erl Normal file
View File

@ -0,0 +1,142 @@
%% The contents of this file are subject to the Mozilla Public License
%% Version 1.1 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License at
%% http://www.mozilla.org/MPL/
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
%% License for the specific language governing rights and limitations
%% under the License.
%%
%% The Original Code is RabbitMQ.
%%
%% The Initial Developers of the Original Code are LShift Ltd,
%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
%%
%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
%% Technologies LLC, and Rabbit Technologies Ltd.
%%
%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
%% Ltd. Portions created by Cohesive Financial Technologies LLC are
%% Copyright (C) 2007-2009 Cohesive Financial Technologies
%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
%% (C) 2007-2009 Rabbit Technologies Ltd.
%%
%% All Rights Reserved.
%%
%% Contributor(s): ______________________________________.
%%
-module(rabbit_memsup).
-behaviour(gen_server).
-export([start_link/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-export([update/0]).
-record(state, {memory_fraction,
timeout,
timer,
mod,
mod_state,
alarmed
}).
-define(SERVER, memsup). %% must be the same as the standard memsup
-define(DEFAULT_MEMORY_CHECK_INTERVAL, 1000).
%%----------------------------------------------------------------------------
-ifdef(use_specs).
-spec(start_link/1 :: (atom()) -> {'ok', pid()} | 'ignore' | {'error', any()}).
-spec(update/0 :: () -> 'ok').
-endif.
%%----------------------------------------------------------------------------
start_link(Args) ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [Args], []).
update() ->
gen_server:cast(?SERVER, update).
%%----------------------------------------------------------------------------
init([Mod]) ->
Fraction = os_mon:get_env(memsup, system_memory_high_watermark),
TRef = start_timer(?DEFAULT_MEMORY_CHECK_INTERVAL),
InitState = Mod:init(),
State = #state { memory_fraction = Fraction,
timeout = ?DEFAULT_MEMORY_CHECK_INTERVAL,
timer = TRef,
mod = Mod,
mod_state = InitState,
alarmed = false },
{ok, internal_update(State)}.
start_timer(Timeout) ->
{ok, TRef} = timer:apply_interval(Timeout, ?MODULE, update, []),
TRef.
%% Export the same API as the real memsup. Note that
%% get_sysmem_high_watermark gives an int in the range 0 - 100, while
%% set_sysmem_high_watermark takes a float in the range 0.0 - 1.0.
handle_call(get_sysmem_high_watermark, _From, State) ->
{reply, trunc(100 * State#state.memory_fraction), State};
handle_call({set_sysmem_high_watermark, Float}, _From, State) ->
{reply, ok, State#state{memory_fraction = Float}};
handle_call(get_check_interval, _From, State) ->
{reply, State#state.timeout, State};
handle_call({set_check_interval, Timeout}, _From, State) ->
{ok, cancel} = timer:cancel(State#state.timer),
{reply, ok, State#state{timeout = Timeout, timer = start_timer(Timeout)}};
handle_call(get_memory_data, _From,
State = #state { mod = Mod, mod_state = ModState }) ->
{reply, Mod:get_memory_data(ModState), State};
handle_call(_Request, _From, State) ->
{noreply, State}.
handle_cast(update, State) ->
{noreply, internal_update(State)};
handle_cast(_Request, State) ->
{noreply, State}.
handle_info(_Info, State) ->
{noreply, State}.
terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
internal_update(State = #state { memory_fraction = MemoryFraction,
alarmed = Alarmed,
mod = Mod, mod_state = ModState }) ->
ModState1 = Mod:update(ModState),
{MemTotal, MemUsed, _BigProc} = Mod:get_memory_data(ModState1),
NewAlarmed = MemUsed / MemTotal > MemoryFraction,
case {Alarmed, NewAlarmed} of
{false, true} ->
alarm_handler:set_alarm({system_memory_high_watermark, []});
{true, false} ->
alarm_handler:clear_alarm(system_memory_high_watermark);
_ ->
ok
end,
State #state { mod_state = ModState1, alarmed = NewAlarmed }.

View File

@ -0,0 +1,88 @@
%% The contents of this file are subject to the Mozilla Public License
%% Version 1.1 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License at
%% http://www.mozilla.org/MPL/
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
%% License for the specific language governing rights and limitations
%% under the License.
%%
%% The Original Code is RabbitMQ.
%%
%% The Initial Developers of the Original Code are LShift Ltd,
%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
%%
%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
%% Technologies LLC, and Rabbit Technologies Ltd.
%%
%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
%% Ltd. Portions created by Cohesive Financial Technologies LLC are
%% Copyright (C) 2007-2009 Cohesive Financial Technologies
%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
%% (C) 2007-2009 Rabbit Technologies Ltd.
%%
%% All Rights Reserved.
%%
%% Contributor(s): ______________________________________.
%%
-module(rabbit_memsup_darwin).
-export([init/0, update/1, get_memory_data/1]).
-record(state, {total_memory,
allocated_memory}).
%%----------------------------------------------------------------------------
-ifdef(use_specs).
-type(state() :: #state { total_memory :: ('undefined' | non_neg_integer()),
allocated_memory :: ('undefined' | non_neg_integer())
}).
-spec(init/0 :: () -> state()).
-spec(update/1 :: (state()) -> state()).
-spec(get_memory_data/1 :: (state()) -> {non_neg_integer(), non_neg_integer(),
('undefined' | pid())}).
-endif.
%%----------------------------------------------------------------------------
init() ->
#state{total_memory = undefined,
allocated_memory = undefined}.
update(State) ->
File = os:cmd("/usr/bin/vm_stat"),
Lines = string:tokens(File, "\n"),
Dict = dict:from_list(lists:map(fun parse_line/1, Lines)),
[PageSize, Inactive, Active, Free, Wired] =
[dict:fetch(Key, Dict) ||
Key <- [page_size, 'Pages inactive', 'Pages active', 'Pages free',
'Pages wired down']],
MemTotal = PageSize * (Inactive + Active + Free + Wired),
MemUsed = PageSize * (Active + Wired),
State#state{total_memory = MemTotal, allocated_memory = MemUsed}.
get_memory_data(State) ->
{State#state.total_memory, State#state.allocated_memory, undefined}.
%%----------------------------------------------------------------------------
%% A line looks like "Foo bar: 123456."
parse_line(Line) ->
[Name, RHS | _Rest] = string:tokens(Line, ":"),
case Name of
"Mach Virtual Memory Statistics" ->
["(page", "size", "of", PageSize, "bytes)"] =
string:tokens(RHS, " "),
{page_size, list_to_integer(PageSize)};
_ ->
[Value | _Rest1] = string:tokens(RHS, " ."),
{list_to_atom(Name), list_to_integer(Value)}
end.

View File

@ -31,104 +31,44 @@
-module(rabbit_memsup_linux).
-behaviour(gen_server).
-export([init/0, update/1, get_memory_data/1]).
-export([start_link/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-export([update/0]).
-define(SERVER, memsup). %% must be the same as the standard memsup
-define(DEFAULT_MEMORY_CHECK_INTERVAL, 1000).
-record(state, {memory_fraction, alarmed, timeout, timer}).
-record(state, {total_memory,
allocated_memory}).
%%----------------------------------------------------------------------------
-ifdef(use_specs).
-spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}).
-spec(update/0 :: () -> 'ok').
-type(state() :: #state { total_memory :: ('undefined' | non_neg_integer()),
allocated_memory :: ('undefined' | non_neg_integer())
}).
-spec(init/0 :: () -> state()).
-spec(update/1 :: (state()) -> state()).
-spec(get_memory_data/1 :: (state()) -> {non_neg_integer(), non_neg_integer(),
('undefined' | pid())}).
-endif.
%%----------------------------------------------------------------------------
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
init() ->
#state{total_memory = undefined,
allocated_memory = undefined}.
update() ->
gen_server:cast(?SERVER, update).
%%----------------------------------------------------------------------------
init(_Args) ->
Fraction = os_mon:get_env(memsup, system_memory_high_watermark),
TRef = start_timer(?DEFAULT_MEMORY_CHECK_INTERVAL),
{ok, #state{alarmed = false,
memory_fraction = Fraction,
timeout = ?DEFAULT_MEMORY_CHECK_INTERVAL,
timer = TRef}}.
start_timer(Timeout) ->
{ok, TRef} = timer:apply_interval(Timeout, ?MODULE, update, []),
TRef.
%% Export the same API as the real memsup. Note that
%% get_sysmem_high_watermark gives an int in the range 0 - 100, while
%% set_sysmem_high_watermark takes a float in the range 0.0 - 1.0.
handle_call(get_sysmem_high_watermark, _From, State) ->
{reply, trunc(100 * State#state.memory_fraction), State};
handle_call({set_sysmem_high_watermark, Float}, _From, State) ->
{reply, ok, State#state{memory_fraction = Float}};
handle_call(get_check_interval, _From, State) ->
{reply, State#state.timeout, State};
handle_call({set_check_interval, Timeout}, _From, State) ->
{ok, cancel} = timer:cancel(State#state.timer),
{reply, ok, State#state{timeout = Timeout, timer = start_timer(Timeout)}};
handle_call(_Request, _From, State) ->
{noreply, State}.
handle_cast(update, State = #state{alarmed = Alarmed,
memory_fraction = MemoryFraction}) ->
update(State) ->
File = read_proc_file("/proc/meminfo"),
Lines = string:tokens(File, "\n"),
Dict = dict:from_list(lists:map(fun parse_line/1, Lines)),
MemTotal = dict:fetch('MemTotal', Dict),
MemUsed = MemTotal
- dict:fetch('MemFree', Dict)
- dict:fetch('Buffers', Dict)
- dict:fetch('Cached', Dict),
NewAlarmed = MemUsed / MemTotal > MemoryFraction,
case {Alarmed, NewAlarmed} of
{false, true} ->
alarm_handler:set_alarm({system_memory_high_watermark, []});
{true, false} ->
alarm_handler:clear_alarm(system_memory_high_watermark);
_ ->
ok
end,
{noreply, State#state{alarmed = NewAlarmed}};
[MemTotal, MemFree, Buffers, Cached] =
[dict:fetch(Key, Dict) ||
Key <- ['MemTotal', 'MemFree', 'Buffers', 'Cached']],
MemUsed = MemTotal - MemFree - Buffers - Cached,
State#state{total_memory = MemTotal, allocated_memory = MemUsed}.
handle_cast(_Request, State) ->
{noreply, State}.
handle_info(_Info, State) ->
{noreply, State}.
terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
get_memory_data(State) ->
{State#state.total_memory, State#state.allocated_memory, undefined}.
%%----------------------------------------------------------------------------
@ -152,5 +92,10 @@ read_proc_file(IoDevice, Acc) ->
%% A line looks like "FooBar: 123456 kB"
parse_line(Line) ->
[Name, Value | _] = string:tokens(Line, ": "),
{list_to_atom(Name), list_to_integer(Value)}.
[Name, RHS | _Rest] = string:tokens(Line, ":"),
[Value | UnitsRest] = string:tokens(RHS, " "),
Value1 = case UnitsRest of
[] -> list_to_integer(Value); %% no units
["kB"] -> list_to_integer(Value) * 1024
end,
{list_to_atom(Name), Value1}.

View File

@ -50,9 +50,11 @@
-export([intersperse/2, upmap/2, map_in_order/2]).
-export([table_foreach/2]).
-export([dirty_read_all/1, dirty_foreach_key/2, dirty_dump_log/1]).
-export([read_term_file/1, write_term_file/2]).
-export([append_file/2, ensure_parent_dirs_exist/1]).
-export([format_stderr/2]).
-export([start_applications/1, stop_applications/1]).
-export([unfold/2, ceil/1]).
-import(mnesia).
-import(lists).
@ -65,6 +67,8 @@
-include_lib("kernel/include/inet.hrl").
-type(ok_or_error() :: 'ok' | {'error', any()}).
-spec(method_record_type/1 :: (tuple()) -> atom()).
-spec(polite_pause/0 :: () -> 'done').
-spec(polite_pause/1 :: (non_neg_integer()) -> 'done').
@ -88,9 +92,9 @@
-spec(r_arg/4 :: (vhost() | r(atom()), K, amqp_table(), binary()) ->
undefined | r(K) when is_subtype(K, atom())).
-spec(rs/1 :: (r(atom())) -> string()).
-spec(enable_cover/0 :: () -> 'ok' | {'error', any()}).
-spec(enable_cover/0 :: () -> ok_or_error()).
-spec(report_cover/0 :: () -> 'ok').
-spec(enable_cover/1 :: (string()) -> 'ok' | {'error', any()}).
-spec(enable_cover/1 :: (string()) -> ok_or_error()).
-spec(report_cover/1 :: (string()) -> 'ok').
-spec(throw_on_error/2 ::
(atom(), thunk({error, any()} | {ok, A} | A)) -> A).
@ -100,7 +104,7 @@
-spec(with_vhost/2 :: (vhost(), thunk(A)) -> A).
-spec(with_user_and_vhost/3 :: (username(), vhost(), thunk(A)) -> A).
-spec(execute_mnesia_transaction/1 :: (thunk(A)) -> A).
-spec(ensure_ok/2 :: ('ok' | {'error', any()}, atom()) -> 'ok').
-spec(ensure_ok/2 :: (ok_or_error(), atom()) -> 'ok').
-spec(localnode/1 :: (atom()) -> erlang_node()).
-spec(tcp_name/3 :: (atom(), ip_address(), ip_port()) -> atom()).
-spec(intersperse/2 :: (A, [A]) -> [A]).
@ -110,12 +114,16 @@
-spec(dirty_read_all/1 :: (atom()) -> [any()]).
-spec(dirty_foreach_key/2 :: (fun ((any()) -> any()), atom()) ->
'ok' | 'aborted').
-spec(dirty_dump_log/1 :: (string()) -> 'ok' | {'error', any()}).
-spec(append_file/2 :: (string(), string()) -> 'ok' | {'error', any()}).
-spec(dirty_dump_log/1 :: (string()) -> ok_or_error()).
-spec(read_term_file/1 :: (string()) -> {'ok', [any()]} | {'error', any()}).
-spec(write_term_file/2 :: (string(), [any()]) -> ok_or_error()).
-spec(append_file/2 :: (string(), string()) -> ok_or_error()).
-spec(ensure_parent_dirs_exist/1 :: (string()) -> 'ok').
-spec(format_stderr/2 :: (string(), [any()]) -> 'ok').
-spec(start_applications/1 :: ([atom()]) -> 'ok').
-spec(stop_applications/1 :: ([atom()]) -> 'ok').
-spec(unfold/2 :: (fun ((A) -> ({'true', B, A} | 'false')), A) -> {[B], A}).
-spec(ceil/1 :: (number()) -> number()).
-endif.
@ -360,7 +368,9 @@ dirty_foreach_key1(F, TableName, K) ->
end.
dirty_dump_log(FileName) ->
{ok, LH} = disk_log:open([{name, dirty_dump_log}, {mode, read_only}, {file, FileName}]),
{ok, LH} = disk_log:open([{name, dirty_dump_log},
{mode, read_only},
{file, FileName}]),
dirty_dump_log1(LH, disk_log:chunk(LH, start)),
disk_log:close(LH).
@ -374,6 +384,12 @@ dirty_dump_log1(LH, {K, Terms, BadBytes}) ->
dirty_dump_log1(LH, disk_log:chunk(LH, K)).
read_term_file(File) -> file:consult(File).
write_term_file(File, Terms) ->
file:write_file(File, list_to_binary([io_lib:format("~w.~n", [Term]) ||
Term <- Terms])).
append_file(File, Suffix) ->
case file:read_file_info(File) of
{ok, FInfo} -> append_file(File, FInfo#file_info.size, Suffix);
@ -444,3 +460,18 @@ stop_applications(Apps) ->
cannot_stop_application,
Apps).
unfold(Fun, Init) ->
unfold(Fun, [], Init).
unfold(Fun, Acc, Init) ->
case Fun(Init) of
{true, E, I} -> unfold(Fun, [E|Acc], I);
false -> {Acc, Init}
end.
ceil(N) ->
T = trunc(N),
case N - T of
0 -> N;
_ -> 1 + T
end.

View File

@ -149,6 +149,11 @@ table_definitions() ->
table_names() ->
[Tab || {Tab, _} <- table_definitions()].
replicated_table_names() ->
[Tab || {Tab, Attrs} <- table_definitions(),
not lists:member({local_content, true}, Attrs)
].
dir() -> mnesia:system_info(directory).
ensure_mnesia_dir() ->
@ -192,28 +197,16 @@ cluster_nodes_config_filename() ->
create_cluster_nodes_config(ClusterNodes) ->
FileName = cluster_nodes_config_filename(),
Handle = case file:open(FileName, [write]) of
{ok, Device} -> Device;
{error, Reason} ->
throw({error, {cannot_create_cluster_nodes_config,
FileName, Reason}})
end,
try
ok = io:write(Handle, ClusterNodes),
ok = io:put_chars(Handle, [$.])
after
case file:close(Handle) of
ok -> ok;
{error, Reason1} ->
throw({error, {cannot_close_cluster_nodes_config,
FileName, Reason1}})
end
end,
ok.
case rabbit_misc:write_term_file(FileName, [ClusterNodes]) of
ok -> ok;
{error, Reason} ->
throw({error, {cannot_create_cluster_nodes_config,
FileName, Reason}})
end.
read_cluster_nodes_config() ->
FileName = cluster_nodes_config_filename(),
case file:consult(FileName) of
case rabbit_misc:read_term_file(FileName) of
{ok, [ClusterNodes]} -> ClusterNodes;
{error, enoent} ->
case application:get_env(cluster_config) of
@ -250,12 +243,10 @@ delete_cluster_nodes_config() ->
%% standalone disk node, or disk or ram node connected to the
%% specified cluster nodes.
init_db(ClusterNodes) ->
WasDiskNode = mnesia:system_info(use_dir),
IsDiskNode = ClusterNodes == [] orelse
lists:member(node(), ClusterNodes),
case mnesia:change_config(extra_db_nodes, ClusterNodes -- [node()]) of
{ok, []} ->
if WasDiskNode and IsDiskNode ->
case mnesia:system_info(use_dir) of
true ->
case check_schema_integrity() of
ok ->
ok;
@ -270,22 +261,18 @@ init_db(ClusterNodes) ->
ok = move_db(),
ok = create_schema()
end;
WasDiskNode ->
throw({error, {cannot_convert_disk_node_to_ram_node,
ClusterNodes}});
IsDiskNode ->
ok = create_schema();
true ->
throw({error, {unable_to_contact_cluster_nodes,
ClusterNodes}})
false ->
ok = create_schema()
end;
{ok, [_|_]} ->
ok = wait_for_tables(),
ok = create_local_table_copies(
case IsDiskNode of
true -> disc;
false -> ram
end);
IsDiskNode = ClusterNodes == [] orelse
lists:member(node(), ClusterNodes),
ok = wait_for_replicated_tables(),
ok = create_local_table_copy(schema, disc_copies),
ok = create_local_table_copies(case IsDiskNode of
true -> disc;
false -> ram
end);
{error, Reason} ->
%% one reason we may end up here is if we try to join
%% nodes together that are currently running standalone or
@ -336,40 +323,36 @@ create_tables() ->
table_definitions()),
ok.
table_has_copy_type(TabDef, DiscType) ->
lists:member(node(), proplists:get_value(DiscType, TabDef, [])).
create_local_table_copies(Type) ->
ok = if Type /= ram -> create_local_table_copy(schema, disc_copies);
true -> ok
end,
lists:foreach(
fun({Tab, TabDef}) ->
HasDiscCopies =
lists:keymember(disc_copies, 1, TabDef),
HasDiscOnlyCopies =
lists:keymember(disc_only_copies, 1, TabDef),
HasDiscCopies = table_has_copy_type(TabDef, disc_copies),
HasDiscOnlyCopies = table_has_copy_type(TabDef, disc_only_copies),
LocalTab = proplists:get_bool(local_content, TabDef),
StorageType =
case Type of
disc ->
if
Type =:= disc orelse LocalTab ->
if
HasDiscCopies -> disc_copies;
HasDiscCopies -> disc_copies;
HasDiscOnlyCopies -> disc_only_copies;
true -> ram_copies
true -> ram_copies
end;
%% unused code - commented out to keep dialyzer happy
%% disc_only ->
%% Type =:= disc_only ->
%% if
%% HasDiscCopies or HasDiscOnlyCopies ->
%% disc_only_copies;
%% true -> ram_copies
%% end;
ram ->
Type =:= ram ->
ram_copies
end,
ok = create_local_table_copy(Tab, StorageType)
end,
table_definitions()),
ok = if Type == ram -> create_local_table_copy(schema, ram_copies);
true -> ok
end,
ok.
create_local_table_copy(Tab, Type) ->
@ -384,10 +367,14 @@ create_local_table_copy(Tab, Type) ->
end,
ok.
wait_for_tables() ->
wait_for_replicated_tables() -> wait_for_tables(replicated_table_names()).
wait_for_tables() -> wait_for_tables(table_names()).
wait_for_tables(TableNames) ->
case check_schema_integrity() of
ok ->
case mnesia:wait_for_tables(table_names(), 30000) of
case mnesia:wait_for_tables(TableNames, 30000) of
ok -> ok;
{timeout, BadTabs} ->
throw({error, {timeout_waiting_for_tables, BadTabs}});

View File

@ -114,12 +114,13 @@ action(status, [], RpcTimeout) ->
io:format("Status of all running nodes...~n", []),
call_all_nodes(
fun({Node, Pid}) ->
Status = rpc:call(Node, rabbit, status, [], RpcTimeout),
RabbitRunning =
case is_rabbit_running(Node, RpcTimeout) of
false -> not_running;
true -> running
end,
io:format("Node '~p' with Pid ~p: ~p~n",
[Node, Pid, case parse_status(Status) of
false -> not_running;
true -> running
end])
[Node, Pid, RabbitRunning])
end);
action(stop_all, [], RpcTimeout) ->
@ -197,7 +198,7 @@ start_node(NodeName, NodePort, RpcTimeout) ->
wait_for_rabbit_to_start(_ , RpcTimeout, _) when RpcTimeout < 0 ->
false;
wait_for_rabbit_to_start(Node, RpcTimeout, Port) ->
case parse_status(rpc:call(Node, rabbit, status, [])) of
case is_rabbit_running(Node, RpcTimeout) of
true -> true;
false -> receive
{'EXIT', Port, PosixCode} ->
@ -211,22 +212,20 @@ wait_for_rabbit_to_start(Node, RpcTimeout, Port) ->
run_cmd(FullPath) ->
erlang:open_port({spawn, FullPath}, [nouse_stdio]).
parse_status({badrpc, _}) ->
false;
parse_status(Status) ->
case lists:keysearch(running_applications, 1, Status) of
{value, {running_applications, Apps}} ->
lists:keymember(rabbit, 1, Apps);
_ ->
false
is_rabbit_running(Node, RpcTimeout) ->
case rpc:call(Node, rabbit, status, [], RpcTimeout) of
{badrpc, _} -> false;
Status -> case proplists:get_value(running_applications, Status) of
undefined -> false;
Apps -> lists:keymember(rabbit, 1, Apps)
end
end.
with_os(Handlers) ->
{OsFamily, _} = os:type(),
case lists:keysearch(OsFamily, 1, Handlers) of
{value, {_, Handler}} -> Handler();
false -> throw({unsupported_os, OsFamily})
case proplists:get_value(OsFamily, Handlers) of
undefined -> throw({unsupported_os, OsFamily});
Handler -> Handler()
end.
script_filename() ->

132
src/rabbit_net.erl Normal file
View File

@ -0,0 +1,132 @@
%% The contents of this file are subject to the Mozilla Public License
%% Version 1.1 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License at
%% http://www.mozilla.org/MPL/
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
%% License for the specific language governing rights and limitations
%% under the License.
%%
%% The Original Code is RabbitMQ.
%%
%% The Initial Developers of the Original Code are LShift Ltd,
%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
%%
%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
%% Technologies LLC, and Rabbit Technologies Ltd.
%%
%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
%% Ltd. Portions created by Cohesive Financial Technologies LLC are
%% Copyright (C) 2007-2009 Cohesive Financial Technologies
%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
%% (C) 2007-2009 Rabbit Technologies Ltd.
%%
%% All Rights Reserved.
%%
%% Contributor(s): ______________________________________.
%%
-module(rabbit_net).
-include("rabbit.hrl").
-include_lib("kernel/include/inet.hrl").
-export([async_recv/3, close/1, controlling_process/2,
getstat/2, peername/1, port_command/2,
send/2, sockname/1]).
%%---------------------------------------------------------------------------
-ifdef(use_specs).
-type(stat_option() ::
'recv_cnt' | 'recv_max' | 'recv_avg' | 'recv_oct' | 'recv_dvi' |
'send_cnt' | 'send_max' | 'send_avg' | 'send_oct' | 'send_pend').
-type(error() :: {'error', any()}).
-spec(async_recv/3 :: (socket(), integer(), timeout()) -> {'ok', any()}).
-spec(close/1 :: (socket()) -> 'ok' | error()).
-spec(controlling_process/2 :: (socket(), pid()) -> 'ok' | error()).
-spec(port_command/2 :: (socket(), iolist()) -> 'true').
-spec(send/2 :: (socket(), binary() | iolist()) -> 'ok' | error()).
-spec(peername/1 :: (socket()) ->
{'ok', {ip_address(), non_neg_integer()}} | error()).
-spec(sockname/1 :: (socket()) ->
{'ok', {ip_address(), non_neg_integer()}} | error()).
-spec(getstat/2 :: (socket(), [stat_option()]) ->
{'ok', [{stat_option(), integer()}]} | error()).
-endif.
%%---------------------------------------------------------------------------
async_recv(Sock, Length, Timeout) when is_record(Sock, ssl_socket) ->
Pid = self(),
Ref = make_ref(),
spawn(fun() -> Pid ! {inet_async, Sock, Ref,
ssl:recv(Sock#ssl_socket.ssl, Length, Timeout)}
end),
{ok, Ref};
async_recv(Sock, Length, infinity) when is_port(Sock) ->
prim_inet:async_recv(Sock, Length, -1);
async_recv(Sock, Length, Timeout) when is_port(Sock) ->
prim_inet:async_recv(Sock, Length, Timeout).
close(Sock) when is_record(Sock, ssl_socket) ->
ssl:close(Sock#ssl_socket.ssl);
close(Sock) when is_port(Sock) ->
gen_tcp:close(Sock).
controlling_process(Sock, Pid) when is_record(Sock, ssl_socket) ->
ssl:controlling_process(Sock#ssl_socket.ssl, Pid);
controlling_process(Sock, Pid) when is_port(Sock) ->
gen_tcp:controlling_process(Sock, Pid).
getstat(Sock, Stats) when is_record(Sock, ssl_socket) ->
inet:getstat(Sock#ssl_socket.tcp, Stats);
getstat(Sock, Stats) when is_port(Sock) ->
inet:getstat(Sock, Stats).
peername(Sock) when is_record(Sock, ssl_socket) ->
ssl:peername(Sock#ssl_socket.ssl);
peername(Sock) when is_port(Sock) ->
inet:peername(Sock).
port_command(Sock, Data) when is_record(Sock, ssl_socket) ->
case ssl:send(Sock#ssl_socket.ssl, Data) of
ok ->
self() ! {inet_reply, Sock, ok},
true;
{error, Reason} ->
erlang:error(Reason)
end;
port_command(Sock, Data) when is_port(Sock) ->
erlang:port_command(Sock, Data).
send(Sock, Data) when is_record(Sock, ssl_socket) ->
ssl:send(Sock#ssl_socket.ssl, Data);
send(Sock, Data) when is_port(Sock) ->
gen_tcp:send(Sock, Data).
sockname(Sock) when is_record(Sock, ssl_socket) ->
ssl:sockname(Sock#ssl_socket.ssl);
sockname(Sock) when is_port(Sock) ->
inet:sockname(Sock).

View File

@ -31,18 +31,28 @@
-module(rabbit_networking).
-export([start/0, start_tcp_listener/2, stop_tcp_listener/2,
on_node_down/1, active_listeners/0, node_listeners/1,
connections/0, connection_info/1, connection_info/2,
connection_info_all/0, connection_info_all/1]).
-export([start/0, start_tcp_listener/2, start_ssl_listener/3,
stop_tcp_listener/2, on_node_down/1, active_listeners/0,
node_listeners/1, connections/0, connection_info/1,
connection_info/2, connection_info_all/0,
connection_info_all/1]).
%%used by TCP-based transports, e.g. STOMP adapter
-export([check_tcp_listener_address/3]).
-export([tcp_listener_started/2, tcp_listener_stopped/2, start_client/1]).
-export([tcp_listener_started/2, ssl_connection_upgrade/2,
tcp_listener_stopped/2, start_client/1]).
-include("rabbit.hrl").
-include_lib("kernel/include/inet.hrl").
-define(RABBIT_TCP_OPTS, [
binary,
{packet, raw}, % no packaging
{reuseaddr, true}, % allow rebind without waiting
%% {nodelay, true}, % TCP_NODELAY - disable Nagle's alg.
%% {delay_send, true},
{exit_on_close, false}
]).
%%----------------------------------------------------------------------------
-ifdef(use_specs).
@ -52,6 +62,7 @@
-spec(start/0 :: () -> 'ok').
-spec(start_tcp_listener/2 :: (host(), ip_port()) -> 'ok').
-spec(start_ssl_listener/3 :: (host(), ip_port(), [info()]) -> 'ok').
-spec(stop_tcp_listener/2 :: (host(), ip_port()) -> 'ok').
-spec(active_listeners/0 :: () -> [listener()]).
-spec(node_listeners/1 :: (erlang_node()) -> [listener()]).
@ -96,21 +107,24 @@ check_tcp_listener_address(NamePrefix, Host, Port) ->
{IPAddress, Name}.
start_tcp_listener(Host, Port) ->
{IPAddress, Name} = check_tcp_listener_address(rabbit_tcp_listener_sup, Host, Port),
start_listener(Host, Port, "TCP Listener",
{?MODULE, start_client, []}).
start_ssl_listener(Host, Port, SslOpts) ->
start_listener(Host, Port, "SSL Listener",
{?MODULE, ssl_connection_upgrade, [SslOpts]}).
start_listener(Host, Port, Label, OnConnect) ->
{IPAddress, Name} =
check_tcp_listener_address(rabbit_tcp_listener_sup, Host, Port),
{ok,_} = supervisor:start_child(
rabbit_sup,
{Name,
{tcp_listener_sup, start_link,
[IPAddress, Port,
[binary,
{packet, raw}, % no packaging
{reuseaddr, true}, % allow rebind without waiting
%% {nodelay, true}, % TCP_NODELAY - disable Nagle's alg.
%% {delay_send, true},
{exit_on_close, false}],
[IPAddress, Port, ?RABBIT_TCP_OPTS ,
{?MODULE, tcp_listener_started, []},
{?MODULE, tcp_listener_stopped, []},
{?MODULE, start_client, []}]},
OnConnect, Label]},
transient, infinity, supervisor, [tcp_listener_sup]}),
ok.
@ -148,10 +162,27 @@ on_node_down(Node) ->
start_client(Sock) ->
{ok, Child} = supervisor:start_child(rabbit_tcp_client_sup, []),
ok = gen_tcp:controlling_process(Sock, Child),
ok = rabbit_net:controlling_process(Sock, Child),
Child ! {go, Sock},
Child.
ssl_connection_upgrade(SslOpts, Sock) ->
{ok, {PeerAddress, PeerPort}} = rabbit_net:peername(Sock),
PeerIp = inet_parse:ntoa(PeerAddress),
case ssl:ssl_accept(Sock, SslOpts) of
{ok, SslSock} ->
rabbit_log:info("upgraded TCP connection from ~s:~p to SSL~n",
[PeerIp, PeerPort]),
RabbitSslSock = #ssl_socket{tcp = Sock, ssl = SslSock},
start_client(RabbitSslSock);
{error, Reason} ->
gen_tcp:close(Sock),
rabbit_log:error("failed to upgrade TCP connection from ~s:~p "
"to SSL: ~n~p~n", [PeerIp, PeerPort, Reason]),
{error, Reason}
end.
connections() ->
[Pid || {_, Pid, _, _} <- supervisor:which_children(
rabbit_tcp_client_sup)].

View File

@ -68,7 +68,7 @@ start() ->
AppList
end,
AppVersions = [determine_version(App) || App <- AllApps],
{value, {rabbit, RabbitVersion}} = lists:keysearch(rabbit, 1, AppVersions),
{rabbit, RabbitVersion} = proplists:lookup(rabbit, AppVersions),
%% Build the overall release descriptor
RDesc = {release,

View File

@ -200,7 +200,7 @@ inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F).
peername(Sock) ->
try
{Address, Port} = inet_op(fun () -> inet:peername(Sock) end),
{Address, Port} = inet_op(fun () -> rabbit_net:peername(Sock) end),
AddressS = inet_parse:ntoa(Address),
{AddressS, Port}
catch
@ -286,7 +286,7 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) ->
%% since this termination is initiated by our parent it is
%% probably more important to exit quickly.
exit(Reason);
{'EXIT', _Pid, E = {writer, send_failed, _Error}} ->
{channel_exit, _Chan, E = {writer, send_failed, _Error}} ->
throw(E);
{channel_exit, Channel, Reason} ->
mainloop(Parent, Deb, handle_channel_exit(Channel, Reason, State));
@ -323,8 +323,8 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) ->
end.
switch_callback(OldState, NewCallback, Length) ->
Ref = inet_op(fun () -> prim_inet:async_recv(
OldState#v1.sock, Length, -1) end),
Ref = inet_op(fun () -> rabbit_net:async_recv(
OldState#v1.sock, Length, infinity) end),
OldState#v1{callback = NewCallback,
recv_ref = Ref}.
@ -539,7 +539,7 @@ handle_input(handshake, <<"AMQP",1,1,ProtocolMajor,ProtocolMinor>>,
end;
handle_input(handshake, Other, #v1{sock = Sock}) ->
ok = inet_op(fun () -> gen_tcp:send(
ok = inet_op(fun () -> rabbit_net:send(
Sock, <<"AMQP",1,1,
?PROTOCOL_VERSION_MAJOR,
?PROTOCOL_VERSION_MINOR>>) end),
@ -675,23 +675,23 @@ infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
i(pid, #v1{}) ->
self();
i(address, #v1{sock = Sock}) ->
{ok, {A, _}} = inet:sockname(Sock),
{ok, {A, _}} = rabbit_net:sockname(Sock),
A;
i(port, #v1{sock = Sock}) ->
{ok, {_, P}} = inet:sockname(Sock),
{ok, {_, P}} = rabbit_net:sockname(Sock),
P;
i(peer_address, #v1{sock = Sock}) ->
{ok, {A, _}} = inet:peername(Sock),
{ok, {A, _}} = rabbit_net:peername(Sock),
A;
i(peer_port, #v1{sock = Sock}) ->
{ok, {_, P}} = inet:peername(Sock),
{ok, {_, P}} = rabbit_net:peername(Sock),
P;
i(SockStat, #v1{sock = Sock}) when SockStat =:= recv_oct;
SockStat =:= recv_cnt;
SockStat =:= send_oct;
SockStat =:= send_cnt;
SockStat =:= send_pend ->
case inet:getstat(Sock, [SockStat]) of
case rabbit_net:getstat(Sock, [SockStat]) of
{ok, [{SockStat, StatVal}]} -> StatVal;
{error, einval} -> undefined;
{error, Error} -> throw({cannot_get_socket_stats, Error})
@ -703,7 +703,7 @@ i(channels, #v1{}) ->
i(user, #v1{connection = #connection{user = #user{username = Username}}}) ->
Username;
i(user, #v1{connection = #connection{user = none}}) ->
none;
'';
i(vhost, #v1{connection = #connection{vhost = VHost}}) ->
VHost;
i(timeout, #v1{connection = #connection{timeout_sec = Timeout}}) ->

View File

@ -49,6 +49,7 @@ test_content_prop_roundtrip(Datum, Binary) ->
all_tests() ->
passed = test_priority_queue(),
passed = test_unfold(),
passed = test_parsing(),
passed = test_topic_matching(),
passed = test_log_management(),
@ -75,7 +76,8 @@ test_priority_queue() ->
%% 1-element priority Q
Q1 = priority_queue:in(foo, 1, priority_queue:new()),
{true, false, 1, [{1, foo}], [foo]} = test_priority_queue(Q1),
{true, false, 1, [{1, foo}], [foo]} =
test_priority_queue(Q1),
%% 2-element same-priority Q
Q2 = priority_queue:in(bar, 1, Q1),
@ -91,6 +93,71 @@ test_priority_queue() ->
Q4 = priority_queue:in(foo, -1, priority_queue:new()),
{true, false, 1, [{-1, foo}], [foo]} = test_priority_queue(Q4),
%% merge 2 * 1-element no-priority Qs
Q5 = priority_queue:join(priority_queue:in(foo, Q),
priority_queue:in(bar, Q)),
{true, false, 2, [{0, foo}, {0, bar}], [foo, bar]} =
test_priority_queue(Q5),
%% merge 1-element no-priority Q with 1-element priority Q
Q6 = priority_queue:join(priority_queue:in(foo, Q),
priority_queue:in(bar, 1, Q)),
{true, false, 2, [{1, bar}, {0, foo}], [bar, foo]} =
test_priority_queue(Q6),
%% merge 1-element priority Q with 1-element no-priority Q
Q7 = priority_queue:join(priority_queue:in(foo, 1, Q),
priority_queue:in(bar, Q)),
{true, false, 2, [{1, foo}, {0, bar}], [foo, bar]} =
test_priority_queue(Q7),
%% merge 2 * 1-element same-priority Qs
Q8 = priority_queue:join(priority_queue:in(foo, 1, Q),
priority_queue:in(bar, 1, Q)),
{true, false, 2, [{1, foo}, {1, bar}], [foo, bar]} =
test_priority_queue(Q8),
%% merge 2 * 1-element different-priority Qs
Q9 = priority_queue:join(priority_queue:in(foo, 1, Q),
priority_queue:in(bar, 2, Q)),
{true, false, 2, [{2, bar}, {1, foo}], [bar, foo]} =
test_priority_queue(Q9),
%% merge 2 * 1-element different-priority Qs (other way around)
Q10 = priority_queue:join(priority_queue:in(bar, 2, Q),
priority_queue:in(foo, 1, Q)),
{true, false, 2, [{2, bar}, {1, foo}], [bar, foo]} =
test_priority_queue(Q10),
%% merge 2 * 2-element multi-different-priority Qs
Q11 = priority_queue:join(Q6, Q5),
{true, false, 4, [{1, bar}, {0, foo}, {0, foo}, {0, bar}],
[bar, foo, foo, bar]} = test_priority_queue(Q11),
%% and the other way around
Q12 = priority_queue:join(Q5, Q6),
{true, false, 4, [{1, bar}, {0, foo}, {0, bar}, {0, foo}],
[bar, foo, bar, foo]} = test_priority_queue(Q12),
%% merge with negative priorities
Q13 = priority_queue:join(Q4, Q5),
{true, false, 3, [{0, foo}, {0, bar}, {-1, foo}], [foo, bar, foo]} =
test_priority_queue(Q13),
%% and the other way around
Q14 = priority_queue:join(Q5, Q4),
{true, false, 3, [{0, foo}, {0, bar}, {-1, foo}], [foo, bar, foo]} =
test_priority_queue(Q14),
%% joins with empty queues:
Q1 = priority_queue:join(Q, Q1),
Q1 = priority_queue:join(Q1, Q),
%% insert with priority into non-empty zero-priority queue
Q15 = priority_queue:in(baz, 1, Q5),
{true, false, 3, [{1, baz}, {0, foo}, {0, bar}], [baz, foo, bar]} =
test_priority_queue(Q15),
passed.
priority_queue_in_all(Q, L) ->
@ -116,6 +183,14 @@ test_simple_n_element_queue(N) ->
{true, false, N, ToListRes, Items} = test_priority_queue(Q),
passed.
test_unfold() ->
{[], test} = rabbit_misc:unfold(fun (_V) -> false end, test),
List = lists:seq(2,20,2),
{List, 0} = rabbit_misc:unfold(fun (0) -> false;
(N) -> {true, N*2, N-1}
end, 10),
passed.
test_parsing() ->
passed = test_content_properties(),
passed.
@ -408,19 +483,17 @@ test_cluster_management() ->
end,
ClusteringSequence),
%% attempt to convert a disk node into a ram node
%% convert a disk node into a ram node
ok = control_action(reset, []),
ok = control_action(start_app, []),
ok = control_action(stop_app, []),
{error, {cannot_convert_disk_node_to_ram_node, _}} =
control_action(cluster, ["invalid1@invalid",
"invalid2@invalid"]),
ok = control_action(cluster, ["invalid1@invalid",
"invalid2@invalid"]),
%% attempt to join a non-existing cluster as a ram node
%% join a non-existing cluster as a ram node
ok = control_action(reset, []),
{error, {unable_to_contact_cluster_nodes, _}} =
control_action(cluster, ["invalid1@invalid",
"invalid2@invalid"]),
ok = control_action(cluster, ["invalid1@invalid",
"invalid2@invalid"]),
SecondaryNode = rabbit_misc:localnode(hare),
case net_adm:ping(SecondaryNode) of
@ -436,11 +509,12 @@ test_cluster_management2(SecondaryNode) ->
NodeS = atom_to_list(node()),
SecondaryNodeS = atom_to_list(SecondaryNode),
%% attempt to convert a disk node into a ram node
%% make a disk node
ok = control_action(reset, []),
ok = control_action(cluster, [NodeS]),
{error, {unable_to_join_cluster, _, _}} =
control_action(cluster, [SecondaryNodeS]),
%% make a ram node
ok = control_action(reset, []),
ok = control_action(cluster, [SecondaryNodeS]),
%% join cluster as a ram node
ok = control_action(reset, []),
@ -453,21 +527,21 @@ test_cluster_management2(SecondaryNode) ->
ok = control_action(start_app, []),
ok = control_action(stop_app, []),
%% attempt to join non-existing cluster as a ram node
{error, _} = control_action(cluster, ["invalid1@invalid",
"invalid2@invalid"]),
%% join non-existing cluster as a ram node
ok = control_action(cluster, ["invalid1@invalid",
"invalid2@invalid"]),
%% turn ram node into disk node
ok = control_action(reset, []),
ok = control_action(cluster, [SecondaryNodeS, NodeS]),
ok = control_action(start_app, []),
ok = control_action(stop_app, []),
%% attempt to convert a disk node into a ram node
{error, {cannot_convert_disk_node_to_ram_node, _}} =
control_action(cluster, ["invalid1@invalid",
"invalid2@invalid"]),
%% convert a disk node into a ram node
ok = control_action(cluster, ["invalid1@invalid",
"invalid2@invalid"]),
%% turn a disk node into a ram node
ok = control_action(reset, []),
ok = control_action(cluster, [SecondaryNodeS]),
ok = control_action(start_app, []),
ok = control_action(stop_app, []),

View File

@ -169,7 +169,7 @@ assemble_frames(Channel, MethodRecord, Content, FrameMax) ->
tcp_send(Sock, Data) ->
rabbit_misc:throw_on_error(inet_error,
fun () -> gen_tcp:send(Sock, Data) end).
fun () -> rabbit_net:send(Sock, Data) end).
internal_send_command(Sock, Channel, MethodRecord) ->
ok = tcp_send(Sock, assemble_frames(Channel, MethodRecord)).
@ -206,6 +206,6 @@ internal_send_command_async(Sock, Channel, MethodRecord, Content, FrameMax) ->
ok.
port_cmd(Sock, Data) ->
try erlang:port_command(Sock, Data)
try rabbit_net:port_command(Sock, Data)
catch error:Error -> exit({writer, send_failed, Error})
end.

View File

@ -33,28 +33,28 @@
-behaviour(gen_server).
-export([start_link/7]).
-export([start_link/8]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-record(state, {sock, on_startup, on_shutdown}).
-record(state, {sock, on_startup, on_shutdown, label}).
%%--------------------------------------------------------------------
start_link(IPAddress, Port, SocketOpts,
ConcurrentAcceptorCount, AcceptorSup,
OnStartup, OnShutdown) ->
OnStartup, OnShutdown, Label) ->
gen_server:start_link(
?MODULE, {IPAddress, Port, SocketOpts,
ConcurrentAcceptorCount, AcceptorSup,
OnStartup, OnShutdown}, []).
OnStartup, OnShutdown, Label}, []).
%%--------------------------------------------------------------------
init({IPAddress, Port, SocketOpts,
ConcurrentAcceptorCount, AcceptorSup,
{M,F,A} = OnStartup, OnShutdown}) ->
{M,F,A} = OnStartup, OnShutdown, Label}) ->
process_flag(trap_exit, true),
case gen_tcp:listen(Port, SocketOpts ++ [{ip, IPAddress},
{active, false}]) of
@ -65,15 +65,16 @@ init({IPAddress, Port, SocketOpts,
end,
lists:duplicate(ConcurrentAcceptorCount, dummy)),
{ok, {LIPAddress, LPort}} = inet:sockname(LSock),
error_logger:info_msg("started TCP listener on ~s:~p~n",
[inet_parse:ntoa(LIPAddress), LPort]),
error_logger:info_msg("started ~s on ~s:~p~n",
[Label, inet_parse:ntoa(LIPAddress), LPort]),
apply(M, F, A ++ [IPAddress, Port]),
{ok, #state{sock=LSock,
on_startup = OnStartup, on_shutdown = OnShutdown}};
{ok, #state{sock = LSock,
on_startup = OnStartup, on_shutdown = OnShutdown,
label = Label}};
{error, Reason} ->
error_logger:error_msg(
"failed to start TCP listener on ~s:~p - ~p~n",
[inet_parse:ntoa(IPAddress), Port, Reason]),
"failed to start ~s on ~s:~p - ~p~n",
[Label, inet_parse:ntoa(IPAddress), Port, Reason]),
{stop, {cannot_listen, IPAddress, Port, Reason}}
end.
@ -86,11 +87,11 @@ handle_cast(_Msg, State) ->
handle_info(_Info, State) ->
{noreply, State}.
terminate(_Reason, #state{sock=LSock, on_shutdown = {M,F,A}}) ->
terminate(_Reason, #state{sock=LSock, on_shutdown = {M,F,A}, label=Label}) ->
{ok, {IPAddress, Port}} = inet:sockname(LSock),
gen_tcp:close(LSock),
error_logger:info_msg("stopped TCP listener on ~s:~p~n",
[inet_parse:ntoa(IPAddress), Port]),
error_logger:info_msg("stopped ~s on ~s:~p~n",
[Label, inet_parse:ntoa(IPAddress), Port]),
apply(M, F, A ++ [IPAddress, Port]).
code_change(_OldVsn, State, _Extra) ->

View File

@ -33,23 +33,23 @@
-behaviour(supervisor).
-export([start_link/6, start_link/7]).
-export([start_link/7, start_link/8]).
-export([init/1]).
start_link(IPAddress, Port, SocketOpts, OnStartup, OnShutdown,
AcceptCallback) ->
AcceptCallback, Label) ->
start_link(IPAddress, Port, SocketOpts, OnStartup, OnShutdown,
AcceptCallback, 1).
AcceptCallback, 1, Label).
start_link(IPAddress, Port, SocketOpts, OnStartup, OnShutdown,
AcceptCallback, ConcurrentAcceptorCount) ->
AcceptCallback, ConcurrentAcceptorCount, Label) ->
supervisor:start_link(
?MODULE, {IPAddress, Port, SocketOpts, OnStartup, OnShutdown,
AcceptCallback, ConcurrentAcceptorCount}).
AcceptCallback, ConcurrentAcceptorCount, Label}).
init({IPAddress, Port, SocketOpts, OnStartup, OnShutdown,
AcceptCallback, ConcurrentAcceptorCount}) ->
AcceptCallback, ConcurrentAcceptorCount, Label}) ->
%% This is gross. The tcp_listener needs to know about the
%% tcp_acceptor_sup, and the only way I can think of accomplishing
%% that without jumping through hoops is to register the
@ -62,5 +62,5 @@ init({IPAddress, Port, SocketOpts, OnStartup, OnShutdown,
{tcp_listener, {tcp_listener, start_link,
[IPAddress, Port, SocketOpts,
ConcurrentAcceptorCount, Name,
OnStartup, OnShutdown]},
OnStartup, OnShutdown, Label]},
transient, 100, worker, [tcp_listener]}]}}.