diff --git a/.hgignore b/.hgignore index 3323e2fc59..d77d4e5de4 100644 --- a/.hgignore +++ b/.hgignore @@ -1,18 +1,17 @@ syntax: glob *.beam *~ +*.swp +*.patch erl_crash.dump syntax: regexp ^cover/ ^dist/ -^include/rabbit_framing.hrl$ -^src/rabbit_framing.erl$ -^rabbit.plt$ -^ebin/rabbit.app$ -^ebin/rabbit.rel$ -^ebin/rabbit.boot$ -^ebin/rabbit.script$ +^include/rabbit_framing\.hrl$ +^src/rabbit_framing\.erl$ +^rabbit\.plt$ +^ebin/rabbit\.(app|rel|boot|script)$ ^plugins/ ^priv/plugins/ diff --git a/Makefile b/Makefile index 5c7f629327..f070275681 100644 --- a/Makefile +++ b/Makefile @@ -20,10 +20,10 @@ PYTHON=python ifndef USE_SPECS # our type specs rely on features / bug fixes in dialyzer that are -# only available in R12B-3 upwards +# only available in R13B upwards (R13B is eshell 5.7.1) # # NB: the test assumes that version number will only contain single digits -USE_SPECS=$(shell if [ $$(erl -noshell -eval 'io:format(erlang:system_info(version)), halt().') \> "5.6.2" ]; then echo "true"; else echo "false"; fi) +USE_SPECS=$(shell if [ $$(erl -noshell -eval 'io:format(erlang:system_info(version)), halt().') \> "5.7.0" ]; then echo "true"; else echo "false"; fi) endif #other args: +native +"{hipe,[o3,verbose]}" -Ddebug=true +debug_info +no_strict_record_tests @@ -39,9 +39,6 @@ AMQP_SPEC_JSON_PATH=$(AMQP_CODEGEN_DIR)/amqp-0.8.json ERL_CALL=erl_call -sname $(RABBITMQ_NODENAME) -e -# for the moment we don't use boot files because they introduce a -# dependency on particular versions of OTP applications -#all: $(EBIN_DIR)/rabbit.boot all: $(TARGETS) $(EBIN_DIR)/rabbit.app: $(EBIN_DIR)/rabbit_app.in $(BEAM_TARGETS) generate_app @@ -101,7 +98,8 @@ run-tests: all start-background-node: $(BASIC_SCRIPT_ENVIRONMENT_SETTINGS) \ RABBITMQ_NODE_ONLY=true \ - ./scripts/rabbitmq-server -detached; sleep 1 + RABBITMQ_SERVER_START_ARGS="$(RABBITMQ_SERVER_START_ARGS) -detached" \ + ./scripts/rabbitmq-server ; sleep 1 start-rabbit-on-node: all echo "rabbit:start()." | $(ERL_CALL) @@ -115,8 +113,11 @@ force-snapshot: all stop-node: -$(ERL_CALL) -q +# code coverage will be created for subdirectory "ebin" of COVER_DIR +COVER_DIR=. + start-cover: all - echo "cover:start(), rabbit_misc:enable_cover()." | $(ERL_CALL) + echo "cover:start(), rabbit_misc:enable_cover([\"$(COVER_DIR)\"])." | $(ERL_CALL) stop-cover: all echo "rabbit_misc:report_cover(), cover:stop()." | $(ERL_CALL) @@ -136,7 +137,7 @@ srcdist: distclean sed -i.save 's/%%VSN%%/$(VERSION)/' $(TARGET_SRC_DIR)/ebin/rabbit_app.in && rm -f $(TARGET_SRC_DIR)/ebin/rabbit_app.in.save cp -r $(AMQP_CODEGEN_DIR)/* $(TARGET_SRC_DIR)/codegen/ - cp codegen.py Makefile generate_app $(TARGET_SRC_DIR) + cp codegen.py Makefile generate_app calculate-relative $(TARGET_SRC_DIR) cp -r scripts $(TARGET_SRC_DIR) cp -r docs $(TARGET_SRC_DIR) @@ -147,7 +148,7 @@ srcdist: distclean rm -rf $(TARGET_SRC_DIR) distclean: clean - make -C $(AMQP_CODEGEN_DIR) distclean + $(MAKE) -C $(AMQP_CODEGEN_DIR) distclean rm -rf dist find . -regex '.*\(~\|#\|\.swp\|\.dump\)' -exec rm {} \; @@ -162,7 +163,8 @@ distclean: clean docs_all: $(MANPAGES) -install: all docs_all +install: SCRIPTS_REL_PATH=$(shell ./calculate-relative $(TARGET_DIR)/sbin $(SBIN_DIR)) +install: all docs_all install_dirs @[ -n "$(TARGET_DIR)" ] || (echo "Please set TARGET_DIR."; false) @[ -n "$(SBIN_DIR)" ] || (echo "Please set SBIN_DIR."; false) @[ -n "$(MAN_DIR)" ] || (echo "Please set MAN_DIR."; false) @@ -171,13 +173,17 @@ install: all docs_all cp -r ebin include LICENSE LICENSE-MPL-RabbitMQ INSTALL $(TARGET_DIR) chmod 0755 scripts/* - mkdir -p $(SBIN_DIR) - cp scripts/rabbitmq-server $(SBIN_DIR) - cp scripts/rabbitmqctl $(SBIN_DIR) - cp scripts/rabbitmq-multi $(SBIN_DIR) + for script in rabbitmq-env rabbitmq-server rabbitmqctl rabbitmq-multi rabbitmq-activate-plugins; do \ + cp scripts/$$script $(TARGET_DIR)/sbin; \ + [ -e $(SBIN_DIR)/$$script ] || ln -s $(SCRIPTS_REL_PATH)/$$script $(SBIN_DIR)/$$script; \ + done for section in 1 5; do \ mkdir -p $(MAN_DIR)/man$$section; \ for manpage in docs/*.$$section.pod; do \ cp docs/`basename $$manpage .pod`.gz $(MAN_DIR)/man$$section; \ done; \ done + +install_dirs: + mkdir -p $(SBIN_DIR) + mkdir -p $(TARGET_DIR)/sbin diff --git a/calculate-relative b/calculate-relative new file mode 100755 index 0000000000..3af18e8ff8 --- /dev/null +++ b/calculate-relative @@ -0,0 +1,45 @@ +#!/usr/bin/env python +# +# relpath.py +# R.Barran 30/08/2004 +# Retrieved from http://code.activestate.com/recipes/302594/ + +import os +import sys + +def relpath(target, base=os.curdir): + """ + Return a relative path to the target from either the current dir or an optional base dir. + Base can be a directory specified either as absolute or relative to current dir. + """ + + if not os.path.exists(target): + raise OSError, 'Target does not exist: '+target + + if not os.path.isdir(base): + raise OSError, 'Base is not a directory or does not exist: '+base + + base_list = (os.path.abspath(base)).split(os.sep) + target_list = (os.path.abspath(target)).split(os.sep) + + # On the windows platform the target may be on a completely different drive from the base. + if os.name in ['nt','dos','os2'] and base_list[0] <> target_list[0]: + raise OSError, 'Target is on a different drive to base. Target: '+target_list[0].upper()+', base: '+base_list[0].upper() + + # Starting from the filepath root, work out how much of the filepath is + # shared by base and target. + for i in range(min(len(base_list), len(target_list))): + if base_list[i] <> target_list[i]: break + else: + # If we broke out of the loop, i is pointing to the first differing path elements. + # If we didn't break out of the loop, i is pointing to identical path elements. + # Increment i so that in all cases it points to the first differing path elements. + i+=1 + + rel_list = [os.pardir] * (len(base_list)-i) + target_list[i:] + if (len(rel_list) == 0): + return "." + return os.path.join(*rel_list) + +if __name__ == "__main__": + print(relpath(sys.argv[1], sys.argv[2])) diff --git a/codegen.py b/codegen.py index 84741ea28d..533192c507 100644 --- a/codegen.py +++ b/codegen.py @@ -117,6 +117,13 @@ def genErl(spec): def genMethodHasContent(m): print "method_has_content(%s) -> %s;" % (m.erlangName(), str(m.hasContent).lower()) + + def genMethodIsSynchronous(m): + hasNoWait = "nowait" in fieldNameList(m.arguments) + if m.isSynchronous and hasNoWait: + print "is_method_synchronous(#%s{nowait = NoWait}) -> not(NoWait);" % (m.erlangName()) + else: + print "is_method_synchronous(#%s{}) -> %s;" % (m.erlangName(), str(m.isSynchronous).lower()) def genMethodFieldTypes(m): """Not currently used - may be useful in future?""" @@ -246,6 +253,7 @@ def genErl(spec): -export([method_id/1]). -export([method_has_content/1]). +-export([is_method_synchronous/1]). -export([method_fieldnames/1]). -export([decode_method_fields/2]). -export([decode_properties/2]). @@ -266,6 +274,9 @@ bitvalue(undefined) -> 0. for m in methods: genMethodHasContent(m) print "method_has_content(Name) -> exit({unknown_method_name, Name})." + for m in methods: genMethodIsSynchronous(m) + print "is_method_synchronous(Name) -> exit({unknown_method_name, Name})." + for m in methods: genMethodFieldNames(m) print "method_fieldnames(Name) -> exit({unknown_method_name, Name})." diff --git a/docs/rabbitmq-activate-plugins.1.pod b/docs/rabbitmq-activate-plugins.1.pod new file mode 100644 index 0000000000..58ffea79d7 --- /dev/null +++ b/docs/rabbitmq-activate-plugins.1.pod @@ -0,0 +1,37 @@ +=head1 NAME + +rabbitmq-activate-plugins - command line tool for activating plugins +in a RabbitMQ broker + +=head1 SYNOPSIS + +rabbitmq-activate-plugins + +=head1 DESCRIPTION + +RabbitMQ is an implementation of AMQP, the emerging standard for high +performance enterprise messaging. The RabbitMQ server is a robust and +scalable implementation of an AMQP broker. + +rabbitmq-activate-plugins is a command line tool for activating +plugins installed into the broker's plugins directory. + +=head1 EXAMPLES + +To activate all of the installed plugins in the current RabbitMQ install, +execute: + + rabbitmq-activate-plugins + +=head1 SEE ALSO + +L, L, L, +L + +=head1 AUTHOR + +The RabbitMQ Team + +=head1 REFERENCES + +RabbitMQ Web Site: L diff --git a/docs/rabbitmq-multi.1.pod b/docs/rabbitmq-multi.1.pod index 23fd96ed65..640609eef9 100644 --- a/docs/rabbitmq-multi.1.pod +++ b/docs/rabbitmq-multi.1.pod @@ -15,22 +15,30 @@ scalable implementation of an AMQP broker. rabbitmq-multi scripts allows for easy set-up of a cluster on a single machine. -See also rabbitmq-server(1) for configuration information. +See also L for configuration information. =head1 COMMANDS -start_all I - start count nodes with unique names, listening on all IP addresses - and on sequential ports starting from 5672. +=over -status - print the status of all running RabbitMQ nodes +=item start_all I -stop_all - stop all local RabbitMQ nodes +Start count nodes with unique names, listening on all IP addresses and +on sequential ports starting from 5672. -rotate_logs - rotate log files for all local and running RabbitMQ nodes +=item status + +Print the status of all running RabbitMQ nodes. + +=item stop_all + +Stop all local RabbitMQ nodes, + +=item rotate_logs + +Rotate log files for all local and running RabbitMQ nodes. + +=back =head1 EXAMPLES @@ -40,7 +48,7 @@ Start 3 local RabbitMQ nodes with unique, sequential port numbers: =head1 SEE ALSO -rabbitmq.conf(5), rabbitmq-server(1), rabbitmqctl(1) +L, L, L =head1 AUTHOR @@ -48,4 +56,4 @@ The RabbitMQ Team =head1 REFERENCES -RabbitMQ Web Site: http://www.rabbitmq.com +RabbitMQ Web Site: L diff --git a/docs/rabbitmq-server.1.pod b/docs/rabbitmq-server.1.pod index 99a7ceccf3..d74ab8d94f 100644 --- a/docs/rabbitmq-server.1.pod +++ b/docs/rabbitmq-server.1.pod @@ -16,43 +16,57 @@ Running rabbitmq-server in the foreground displays a banner message, and reports on progress in the startup sequence, concluding with the message "broker running", indicating that the RabbitMQ broker has been started successfully. To shut down the server, just terminate the -process or use rabbitmqctl(1). +process or use L. =head1 ENVIRONMENT -B - Defaults to /var/lib/rabbitmq/mnesia. Set this to the directory - where Mnesia database files should be placed. +=over -B - Defaults to /var/log/rabbitmq. Log files generated by the server - will be placed in this directory. +=item B -B - Defaults to rabbit. This can be useful if you want to run more - than one node per machine - B should be unique - per erlang-node-and-machine combination. See clustering on a - single machine guide at - http://www.rabbitmq.com/clustering.html#single-machine for - details. +Defaults to F. Set this to the directory where +Mnesia database files should be placed. -B - Defaults to 0.0.0.0. This can be changed if you only want to bind - to one network interface. +=item B -B - Defaults to 5672. +Defaults to F. Log files generated by the server will +be placed in this directory. -B - Defaults to /etc/rabbitmq/rabbitmq_cluster.config. If this file is - present it is used by the server to auto-configure a RabbitMQ - cluster. - See the clustering guide at http://www.rabbitmq.com/clustering.html - for details. +=item B + +Defaults to rabbit. This can be useful if you want to run more than +one node per machine - B should be unique per +erlang-node-and-machine combination. See clustering on a single +machine guide at +L for details. + +=item B + +Defaults to 0.0.0.0. This can be changed if you only want to bind to +one network interface. + +=item B + +Defaults to 5672. + +=item B + +Defaults to F. If this file is +present it is used by the server to auto-configure a RabbitMQ cluster. +See the clustering guide at L +for details. + +=back =head1 OPTIONS -B<-detached> start the server process in the background +=over + +=item B<-detached> + +start the server process in the background + +=back =head1 EXAMPLES @@ -62,7 +76,7 @@ Run RabbitMQ AMQP server in the background: =head1 SEE ALSO -rabbitmq.conf(5), rabbitmq-multi(1), rabbitmqctl(1) +L, L, L =head1 AUTHOR @@ -70,4 +84,5 @@ The RabbitMQ Team =head1 REFERENCES -RabbitMQ Web Site: http://www.rabbitmq.com +RabbitMQ Web Site: L + diff --git a/docs/rabbitmq.conf.5.pod b/docs/rabbitmq.conf.5.pod index 9b2536c383..a7bf4c0942 100644 --- a/docs/rabbitmq.conf.5.pod +++ b/docs/rabbitmq.conf.5.pod @@ -1,10 +1,11 @@ =head1 NAME -/etc/rabbitmq/rabbitmq.conf - default settings for RabbitMQ AMQP server +F - default settings for RabbitMQ AMQP +server =head1 DESCRIPTION -/etc/rabbitmq/rabbitmq.conf contains variable settings that override the +F contains variable settings that override the defaults built in to the RabbitMQ startup scripts. The file is interpreted by the system shell, and so should consist of @@ -13,27 +14,35 @@ syntax is permitted (since the file is sourced using the shell "." operator), including line comments starting with "#". In order of preference, the startup scripts get their values from the -environment, from /etc/rabbitmq/rabbitmq.conf and finally from the -built-in default values. For example, for the B setting, +environment, from F and finally from the +built-in default values. For example, for the B +setting, -B - from the environment is checked first. If it is absent or equal to - the empty string, then +=over -B - from /etc/rabbitmq/rabbitmq.conf is checked. If it is also absent - or set equal to the empty string then the default value from - the startup script is used. +=item B + +from the environment is checked first. If it is absent or equal to the +empty string, then + +=item B + +from L is checked. If it is also absent +or set equal to the empty string then the default value from the +startup script is used. The variable names in /etc/rabbitmq/rabbitmq.conf are always equal to the environment variable names, with the B prefix removed: B from the environment becomes B in the -/etc/rabbitmq/rabbitmq.conf file, etc. +F file, etc. + +=back =head1 EXAMPLES -The following is an example of a complete /etc/rabbitmq/rabbitmq.conf file -that overrides the default Erlang node name from "rabbit" to "hare": +The following is an example of a complete +F file that overrides the default Erlang +node name from "rabbit" to "hare": # I am a complete /etc/rabbitmq/rabbitmq.conf file. # Comment lines start with a hash character. @@ -42,7 +51,7 @@ that overrides the default Erlang node name from "rabbit" to "hare": =head1 SEE ALSO -rabbitmq-server(1), rabbitmq-multi(1), rabbitmqctl(1) +L, L, L =head1 AUTHOR @@ -57,4 +66,4 @@ info@rabbitmq.com. =head1 REFERENCES -RabbitMQ Web Site: http://www.rabbitmq.com +RabbitMQ Web Site: L diff --git a/docs/rabbitmqctl.1.pod b/docs/rabbitmqctl.1.pod index 421568960e..c43ed2ea25 100644 --- a/docs/rabbitmqctl.1.pod +++ b/docs/rabbitmqctl.1.pod @@ -18,269 +18,388 @@ It performs all actions by connecting to one of the broker's nodes. =head1 OPTIONS -B<-n> I - default node is C, where server is the local host. - On a host named C, the node name of the - RabbitMQ Erlang node will usually be rabbit@server (unless - RABBITMQ_NODENAME has been set to some non-default value at broker - startup time). The output of hostname -s is usually the correct - suffix to use after the "@" sign. See rabbitmq-server(1) for - details of configuring the RabbitMQ broker. +=over -B<-q> - quiet output mode is selected with the B<-q> flag. Informational - messages are suppressed when quiet mode is in effect. +=item B<-n> I + +Default node is C, where server is the local host. On +a host named C, the node name of the RabbitMQ +Erlang node will usually be rabbit@server (unless RABBITMQ_NODENAME +has been set to some non-default value at broker startup time). The +output of hostname -s is usually the correct suffix to use after the +"@" sign. See rabbitmq-server(1) for details of configuring the +RabbitMQ broker. + +=item B<-q> + +Quiet output mode is selected with the B<-q> flag. Informational +messages are suppressed when quiet mode is in effect. + +=back =head1 COMMANDS =head2 APPLICATION AND CLUSTER MANAGEMENT -stop - stop the Erlang node on which RabbitMQ broker is running. +=over -stop_app - stop the RabbitMQ application, leaving the Erlang node running. - This command is typically run prior to performing other management - actions that require the RabbitMQ application to be stopped, - e.g. I. +=item stop -start_app - start the RabbitMQ application. - This command is typically run prior to performing other management - actions that require the RabbitMQ application to be stopped, - e.g. I. +Stop the Erlang node on which RabbitMQ broker is running. -status - display various information about the RabbitMQ broker, such as - whether the RabbitMQ application on the current node, its version - number, what nodes are part of the broker, which of these are - running. +=item stop_app -force - return a RabbitMQ node to its virgin state. - Removes the node from any cluster it belongs to, removes all data - from the management database, such as configured users, vhosts and - deletes all persistent messages. +Stop the RabbitMQ application, leaving the Erlang node running. This +command is typically run prior to performing other management actions +that require the RabbitMQ application to be stopped, e.g. I. -force_reset - the same as I command, but resets the node unconditionally, - regardless of the current management database state and cluster - configuration. - It should only be used as a last resort if the database or cluster - configuration has been corrupted. +=item start_app -rotate_logs [suffix] - instruct the RabbitMQ node to rotate the log files. The RabbitMQ - broker will attempt to append the current contents of the log file - to the file with the name composed of the original name and the - suffix. It will create a new file if such a file does not already - exist. When no I is specified, the empty log file is - simply created at the original location; no rotation takes place. - When an error occurs while appending the contents of the old log - file, the operation behaves in the same way as if no I was - specified. - This command might be helpful when you are e.g. writing your own - logrotate script and you do not want to restart the RabbitMQ node. +Start the RabbitMQ application. This command is typically run prior +to performing other management actions that require the RabbitMQ +application to be stopped, e.g. I. -cluster I ... - instruct the node to become member of a cluster with the specified - nodes determined by I option(s). - See http://www.rabbitmq.com/clustering.html for more information - about clustering. +=item status + +Display various information about the RabbitMQ broker, such as whether +the RabbitMQ application on the current node, its version number, what +nodes are part of the broker, which of these are running. + +=item reset + +Return a RabbitMQ node to its virgin state. Removes the node from any +cluster it belongs to, removes all data from the management database, +such as configured users, vhosts and deletes all persistent messages. + +=item force_reset + +The same as I command, but resets the node unconditionally, +regardless of the current management database state and cluster +configuration. It should only be used as a last resort if the +database or cluster configuration has been corrupted. + +=item rotate_logs [suffix] + +Instruct the RabbitMQ node to rotate the log files. The RabbitMQ +broker will attempt to append the current contents of the log file to +the file with the name composed of the original name and the +suffix. It will create a new file if such a file does not already +exist. When no I is specified, the empty log file is simply +created at the original location; no rotation takes place. When an +error occurs while appending the contents of the old log file, the +operation behaves in the same way as if no I was specified. +This command might be helpful when you are e.g. writing your own +logrotate script and you do not want to restart the RabbitMQ node. + +=item cluster I ... + +Instruct the node to become member of a cluster with the specified +nodes determined by I option(s). See +L for more information about +clustering. + +=back =head2 USER MANAGEMENT -add_user I I - create a user named I with (initial) password I. +=over -delete_user I - delete the user named I. +=item add_user I I -change_password I I - change the password for the user named I to I. +Create a user named I with (initial) password I. -list_users - list all users. +=item delete_user I + +Delete the user named I. + +=item change_password I I + +Change the password for the user named I to I. + +=item list_users + +List all users, one per line. + +=back =head2 ACCESS CONTROL -add_vhost I - create a new virtual host called I. +=over -delete_vhost I - delete a virtual host I. - That command deletes also all its exchanges, queues and user - mappings. - -list_vhosts - list all virtual hosts. +=item add_vhost I -set_permissions [-p I] I I I I - set the permissions for the user named I in the virtual - host I, granting 'configure', 'write' and 'read' access - to resources with names matching the first, second and third - I, respectively. +Create a new virtual host called I. -clear_permissions [-p I] I - remove the permissions for the user named I in the - virtual host I. +=item delete_vhost I -list_permissions [-p I] - list all the users and their permissions in the virtual host - I. +Delete a virtual host I. This command deletes also all its +exchanges, queues and user mappings. -list_user_permissions I - list the permissions of the user named I across all - virtual hosts. +=item list_vhosts + +List all virtual hosts, one per line. + +=item set_permissions [-p I] I I I I + +Set the permissions for the user named I in the virtual host +I, granting I, I and I access to +resources with names matching the first, second and third I, +respectively. + +=item clear_permissions [-p I] I + +Remove the permissions for the user named I in the virtual +host I. + +=item list_permissions [-p I] + +List all the users and their permissions in the virtual host +I. Each output line contains the username and their +I, I and I access regexps, separated by tab +characters. + +=item list_user_permissions I + +List the permissions of the user named I across all virtual +hosts. + +=back =head2 SERVER STATUS -list_queues [-p I] [I ...] - list queue information by virtual host. If no Is - are specified then then name and number of messages is displayed - for each queue. +=over + +=item list_queues [-p I] [I ...] + +List queue information by virtual host. Each line printed +describes a queue, with the requested I values +separated by tab characters. If no Is are +specified then I and I are assumed. + +=back =head3 Queue information items -=over 4 +=over -name - URL-encoded name of the queue +=item name -durable - whether the queue survives server restarts +name of the queue -auto_delete - whether the queue will be deleted when no longer used +=item durable -arguments - queue arguments +whether the queue survives server restarts -node - node on which the process associated with the queue resides +=item auto_delete -messages_ready - number of messages ready to be delivered to clients +whether the queue will be deleted when no longer used -messages_unacknowledged - number of messages delivered to clients but not yet - acknowledged +=item arguments -messages_uncommitted - number of messages published in as yet uncommitted transactions +queue arguments -messages - sum of ready, unacknowledged and uncommitted messages +=item node -acks_uncommitted - number of acknowledgements received in as yet uncommitted - transactions +node on which the process associated with the queue resides -consumers - number of consumers +=item messages_ready -transactions - number of transactions +number of messages ready to be delivered to clients -memory - bytes of memory consumed by the Erlang process for the queue, - including stack, heap and internal structures +=item messages_unacknowledged + +number of messages delivered to clients but not yet acknowledged + +=item messages_uncommitted + +number of messages published in as yet uncommitted transactions + +=item messages + +sum of ready, unacknowledged and uncommitted messages + +=item acks_uncommitted + +number of acknowledgements received in as yet uncommitted transactions + +=item consumers + +number of consumers + +=item transactions + +number of transactions + +=item memory + +bytes of memory consumed by the Erlang process for the queue, +including stack, heap and internal structures =back -list_exchanges [-p I] [I ...] - list exchange information by virtual host. If no - Is are specified then name and type is displayed - for each exchange. +=over + +=item list_exchanges [-p I] [I ...] + +List queue information by virtual host. Each line printed describes an +exchange, with the requested I values separated by +tab characters. If no Is are specified then I +and I are assumed. + +=back =head3 Exchange information items -=over 4 +=over -name - URL-encoded name of the exchange +=item name -type - exchange type (B, B, B, or B) +name of the exchange -durable - whether the exchange survives server restarts +=item type -auto_delete - whether the exchange is deleted when no longer used +exchange type (B, B, B, or B) -arguments - exchange arguments +=item durable + +whether the exchange survives server restarts + +=item auto_delete + +whether the exchange is deleted when no longer used + +=item arguments + +exchange arguments =back -list_bindings [-p I] - list bindings by virtual host. Each line contains exchange name, - routing key and queue name (all URL encoded) and arguments. +=over -list_connections [I ...] - list connection information. If no Is are - specified then the user, peer address and peer port are displayed. +=item list_bindings [-p I] + +List bindings by virtual host. Each line printed describes a binding, +with the exchange name, routing key, queue name and arguments, +separated by tab characters. + +=item list_connections [I ...] + +List queue information by virtual host. Each line printed describes an +connection, with the requested I values separated +by tab characters. If no Is are specified then +I, I, I and I are assumed. + +=back =head3 Connection information items -=over 4 +=over -node - node on which the process associated with the connection resides +=item node -address - server IP number +node on which the process associated with the connection resides -port - server port +=item address -peer_address - peer address +server IP number -peer_port - peer port +=item port -state - connection state (B, B, B, B, - B, B, B) +server port -channels - number of channels using the connection +=item peer_address -user - username associated with the connection +peer address -vhost - URL-encoded virtual host +=item peer_port -timeout - connection timeout +peer port -frame_max - maximum frame size (bytes) +=item state -recv_oct - octets received +connection state (B, B, B, B, +B, B, B) -recv_cnt - packets received +=item channels -send_oct - octets sent +number of channels using the connection -send_cnt - packets sent +=item user -send_pend - send queue size +username associated with the connection + +=item vhost + +virtual host + +=item timeout + +connection timeout + +=item frame_max + +maximum frame size (bytes) + +=item recv_oct + +octets received + +=item recv_cnt + +packets received + +=item send_oct + +octets sent + +=item send_cnt + +packets sent + +=item send_pend + +send queue size =back The list_queues, list_exchanges and list_bindings commands accept an -optional virtual host parameter for which to display results, defaulting -to I<"/">. The default can be overridden with the B<-p> flag. Result -columns for these commands and list_connections are tab-separated. +optional virtual host parameter for which to display results, +defaulting to I<"/">. The default can be overridden with the B<-p> +flag. + +=head1 OUTPUT ESCAPING + +Various items that may appear in the output of rabbitmqctl can contain +arbitrary octets. If a octet corresponds to a non-printing ASCII +character (values 0 to 31, and 127), it will be escaped in the output, +using a sequence consisting of a backslash character followed by three +octal digits giving the octet's value (i.e., as used in string +literals in the C programming language). An octet corresponding to +the backslash character (i.e. with value 92) will be escaped using a +sequence of two backslash characters. Octets with a value of 128 or +above are not escaped, in order to preserve strings encoded with +UTF-8. + +The items to which this escaping scheme applies are: + +=over + +=item * +Usernames + +=item * +Virtual host names + +=item * +Queue names + +=item * +Exchange names + +=item * +Regular expressions used for access control + +=back =head1 EXAMPLES @@ -309,4 +428,4 @@ The RabbitMQ Team =head1 REFERENCES -RabbitMQ Web Site: http://www.rabbitmq.com +RabbitMQ Web Site: L diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in index 0057ea0478..6fc6e464f4 100644 --- a/ebin/rabbit_app.in +++ b/ebin/rabbit_app.in @@ -15,6 +15,8 @@ %% actually want to start it {mod, {rabbit, []}}, {env, [{tcp_listeners, [{"0.0.0.0", 5672}]}, + {ssl_listeners, []}, + {ssl_options, []}, {extra_startup_steps, []}, {default_user, <<"guest">>}, {default_pass, <<"guest">>}, diff --git a/include/rabbit.hrl b/include/rabbit.hrl index 784c21b39d..d1a2f3bde1 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -64,6 +64,7 @@ -record(basic_message, {exchange_name, routing_key, content, persistent_key}). +-record(ssl_socket, {tcp, ssl}). -record(delivery, {mandatory, immediate, txn, sender, message}). %%---------------------------------------------------------------------------- @@ -74,7 +75,8 @@ -type(maybe(T) :: T | 'none'). -type(erlang_node() :: atom()). --type(socket() :: port()). +-type(ssl_socket() :: #ssl_socket{}). +-type(socket() :: port() | ssl_socket()). -type(thunk(T) :: fun(() -> T)). -type(info_key() :: atom()). -type(info() :: {info_key(), any()}). diff --git a/packaging/RPMS/Fedora/Makefile b/packaging/RPMS/Fedora/Makefile index c74d453361..fa2844fddf 100644 --- a/packaging/RPMS/Fedora/Makefile +++ b/packaging/RPMS/Fedora/Makefile @@ -1,7 +1,8 @@ -VERSION=0.0.0 -SOURCE_TARBALL_DIR=../../../dist +TARBALL_DIR=../../../dist +TARBALL=$(notdir $(wildcard $(TARBALL_DIR)/rabbitmq-server-[0-9.]*.tar.gz)) COMMON_DIR=../../common -TARBALL=$(SOURCE_TARBALL_DIR)/rabbitmq-server-$(VERSION).tar.gz +VERSION=$(shell echo $(TARBALL) | sed -e 's:rabbitmq-server-\(.*\)\.tar\.gz:\1:g') + TOP_DIR=$(shell pwd) #Under debian we do not want to check build dependencies, since that #only checks build-dependencies using rpms, not debs @@ -23,13 +24,16 @@ rpms: clean server prepare: mkdir -p BUILD SOURCES SPECS SRPMS RPMS tmp - cp $(TOP_DIR)/$(TARBALL) SOURCES + cp $(TARBALL_DIR)/$(TARBALL) SOURCES cp rabbitmq-server.spec SPECS sed -i 's|%%VERSION%%|$(VERSION)|;s|%%REQUIRES%%|$(REQUIRES)|' \ SPECS/rabbitmq-server.spec - cp init.d SOURCES/rabbitmq-server.init cp ${COMMON_DIR}/* SOURCES/ + sed -i \ + -e 's|^DEFAULTS_FILE=.*$$|DEFAULTS_FILE=/etc/sysconfig/rabbitmq|' \ + -e 's|^LOCK_FILE=.*$$|LOCK_FILE=/var/lock/subsys/$$NAME|' \ + SOURCES/rabbitmq-server.init cp rabbitmq-server.logrotate SOURCES/rabbitmq-server.logrotate server: prepare diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec index eb953b81fa..7f442831de 100644 --- a/packaging/RPMS/Fedora/rabbitmq-server.spec +++ b/packaging/RPMS/Fedora/rabbitmq-server.spec @@ -9,6 +9,7 @@ Source: http://www.rabbitmq.com/releases/rabbitmq-server/v%{version}/%{name}-%{v Source1: rabbitmq-server.init Source2: rabbitmq-script-wrapper Source3: rabbitmq-server.logrotate +Source4: rabbitmq-asroot-script-wrapper URL: http://www.rabbitmq.com/ BuildRequires: erlang, python-simplejson Requires: erlang, logrotate @@ -22,9 +23,10 @@ RabbitMQ is an implementation of AMQP, the emerging standard for high performance enterprise messaging. The RabbitMQ server is a robust and scalable implementation of an AMQP broker. -%define _rabbit_erllibdir %{_libdir}/erlang/lib/rabbitmq_server-%{version} +%define _rabbit_erllibdir %{_libdir}/rabbitmq/lib/rabbitmq_server-%{version} %define _rabbit_libdir %{_libdir}/rabbitmq %define _rabbit_wrapper %{_builddir}/`basename %{S:2}` +%define _rabbit_asroot_wrapper %{_builddir}/`basename %{S:4}` %define _maindir %{buildroot}%{_rabbit_erllibdir} @@ -34,6 +36,8 @@ scalable implementation of an AMQP broker. %build cp %{S:2} %{_rabbit_wrapper} sed -i 's|/usr/lib/|%{_libdir}/|' %{_rabbit_wrapper} +cp %{S:4} %{_rabbit_asroot_wrapper} +sed -i 's|/usr/lib/|%{_libdir}/|' %{_rabbit_asroot_wrapper} make %{?_smp_mflags} %install @@ -51,6 +55,7 @@ install -p -D -m 0755 %{S:1} %{buildroot}%{_initrddir}/rabbitmq-server install -p -D -m 0755 %{_rabbit_wrapper} %{buildroot}%{_sbindir}/rabbitmqctl install -p -D -m 0755 %{_rabbit_wrapper} %{buildroot}%{_sbindir}/rabbitmq-server install -p -D -m 0755 %{_rabbit_wrapper} %{buildroot}%{_sbindir}/rabbitmq-multi +install -p -D -m 0755 %{_rabbit_asroot_wrapper} %{buildroot}%{_sbindir}/rabbitmq-activate-plugins install -p -D -m 0644 %{S:3} %{buildroot}%{_sysconfdir}/logrotate.d/rabbitmq-server diff --git a/packaging/common/rabbitmq-asroot-script-wrapper b/packaging/common/rabbitmq-asroot-script-wrapper new file mode 100644 index 0000000000..0dd1c0fbda --- /dev/null +++ b/packaging/common/rabbitmq-asroot-script-wrapper @@ -0,0 +1,53 @@ +#!/bin/bash +## The contents of this file are subject to the Mozilla Public License +## Version 1.1 (the "License"); you may not use this file except in +## compliance with the License. You may obtain a copy of the License at +## http://www.mozilla.org/MPL/ +## +## Software distributed under the License is distributed on an "AS IS" +## basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +## License for the specific language governing rights and limitations +## under the License. +## +## The Original Code is RabbitMQ. +## +## The Initial Developers of the Original Code are LShift Ltd, +## Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +## +## Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +## Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +## are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +## Technologies LLC, and Rabbit Technologies Ltd. +## +## Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +## Ltd. Portions created by Cohesive Financial Technologies LLC are +## Copyright (C) 2007-2009 Cohesive Financial Technologies +## LLC. Portions created by Rabbit Technologies Ltd are Copyright +## (C) 2007-2009 Rabbit Technologies Ltd. +## +## All Rights Reserved. +## +## Contributor(s): ______________________________________. +## + +# Escape spaces and quotes, because shell is revolting. +for arg in "$@" ; do + # Escape quotes in parameters, so that they're passed through cleanly. + arg=$(sed -e 's/"/\\"/' <<-END + $arg + END + ) + CMDLINE="${CMDLINE} \"${arg}\"" +done + +cd /var/lib/rabbitmq + +SCRIPT=`basename $0` + +if [ `id -u` = 0 ] ; then + /usr/lib/rabbitmq/bin/${SCRIPT} ${CMDLINE} +else + echo -e "\nOnly root should run ${SCRIPT}\n" + exit 1 +fi + diff --git a/packaging/common/rabbitmq-script-wrapper b/packaging/common/rabbitmq-script-wrapper index fbb300ace6..94d72f169a 100644 --- a/packaging/common/rabbitmq-script-wrapper +++ b/packaging/common/rabbitmq-script-wrapper @@ -1,4 +1,35 @@ #!/bin/sh +## The contents of this file are subject to the Mozilla Public License +## Version 1.1 (the "License"); you may not use this file except in +## compliance with the License. You may obtain a copy of the License at +## http://www.mozilla.org/MPL/ +## +## Software distributed under the License is distributed on an "AS IS" +## basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +## License for the specific language governing rights and limitations +## under the License. +## +## The Original Code is RabbitMQ. +## +## The Initial Developers of the Original Code are LShift Ltd, +## Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +## +## Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +## Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +## are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +## Technologies LLC, and Rabbit Technologies Ltd. +## +## Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +## Ltd. Portions created by Cohesive Financial Technologies LLC are +## Copyright (C) 2007-2009 Cohesive Financial Technologies +## LLC. Portions created by Rabbit Technologies Ltd are Copyright +## (C) 2007-2009 Rabbit Technologies Ltd. +## +## All Rights Reserved. +## +## Contributor(s): ______________________________________. +## + # Escape spaces and quotes, because shell is revolting. for arg in "$@" ; do # Escape quotes in parameters, so that they're passed through cleanly. diff --git a/packaging/RPMS/Fedora/init.d b/packaging/common/rabbitmq-server.init similarity index 83% rename from packaging/RPMS/Fedora/init.d rename to packaging/common/rabbitmq-server.init index 77a6a89af1..e71562f834 100644 --- a/packaging/RPMS/Fedora/init.d +++ b/packaging/common/rabbitmq-server.init @@ -8,10 +8,10 @@ ### BEGIN INIT INFO # Provides: rabbitmq-server -# Default-Start: -# Default-Stop: # Required-Start: $remote_fs $network # Required-Stop: $remote_fs $network +# Default-Start: +# Default-Stop: # Description: RabbitMQ broker # Short-Description: Enable AMQP service provided by RabbitMQ broker ### END INIT INFO @@ -24,13 +24,14 @@ USER=rabbitmq NODE_COUNT=1 ROTATE_SUFFIX= -LOCK_FILE=/var/lock/subsys/$NAME +DEFAULTS_FILE= # This is filled in when building packages +LOCK_FILE= # This is filled in when building packages test -x $DAEMON || exit 0 # Include rabbitmq defaults if available -if [ -f /etc/sysconfig/rabbitmq ] ; then - . /etc/sysconfig/rabbitmq +if [ -f "$DEFAULTS_FILE" ] ; then + . $DEFAULTS_FILE fi RETVAL=0 @@ -41,7 +42,8 @@ start_rabbitmq () { $DAEMON start_all ${NODE_COUNT} > /var/log/rabbitmq/startup_log 2> /var/log/rabbitmq/startup_err case "$?" in 0) - echo SUCCESS && touch $LOCK_FILE + echo SUCCESS + [ -n "$LOCK_FILE" ] && touch $LOCK_FILE RETVAL=0 ;; 1) @@ -52,7 +54,7 @@ start_rabbitmq () { echo FAILED - check /var/log/rabbitmq/startup_log, _err RETVAL=1 ;; - esac + esac set -e } @@ -62,10 +64,12 @@ stop_rabbitmq () { if [ $RETVAL = 0 ] ; then $DAEMON stop_all > /var/log/rabbitmq/shutdown_log 2> /var/log/rabbitmq/shutdown_err RETVAL=$? - if [ $RETVAL != 0 ] ; then - echo FAILED - check /var/log/rabbitmq/shutdown_log, _err + if [ $RETVAL = 0 ] ; then + # Try to stop epmd if run by the rabbitmq user + pkill -u rabbitmq epmd || : + [ -n "$LOCK_FILE" ] && rm -rf $LOCK_FILE else - rm -rf $LOCK_FILE + echo FAILED - check /var/log/rabbitmq/shutdown_log, _err fi else echo No nodes running @@ -119,19 +123,14 @@ case "$1" in echo -n "Rotating log files for $DESC: " rotate_logs_rabbitmq ;; - force-reload|reload|restart) - echo -n "Restarting $DESC: " - restart_rabbitmq - echo "$NAME." - ;; - condrestart|try-restart) + force-reload|reload|restart|condrestart|try-restart) echo -n "Restarting $DESC: " restart_rabbitmq echo "$NAME." ;; *) echo "Usage: $0 {start|stop|status|rotate-logs|restart|condrestart|try-restart|reload|force-reload}" >&2 - RETVAL=2 + RETVAL=1 ;; esac diff --git a/packaging/debs/Debian/Makefile b/packaging/debs/Debian/Makefile index 67fabae0aa..dafaf9cef4 100644 --- a/packaging/debs/Debian/Makefile +++ b/packaging/debs/Debian/Makefile @@ -1,8 +1,9 @@ TARBALL_DIR=../../../dist -TARBALL=$(shell (cd $(TARBALL_DIR); echo rabbitmq-server-[0-9]*.tar.gz)) +TARBALL=$(notdir $(wildcard $(TARBALL_DIR)/rabbitmq-server-[0-9.]*.tar.gz)) COMMON_DIR=../../common -DEBIAN_ORIG_TARBALL=$(shell echo $(TARBALL) | sed -e 's:\(.*\)-\(.*\)\(\.tar\.gz\):\1_\2\.orig\3:g') VERSION=$(shell echo $(TARBALL) | sed -e 's:rabbitmq-server-\(.*\)\.tar\.gz:\1:g') + +DEBIAN_ORIG_TARBALL=$(shell echo $(TARBALL) | sed -e 's:\(.*\)-\(.*\)\(\.tar\.gz\):\1_\2\.orig\3:g') UNPACKED_DIR=rabbitmq-server-$(VERSION) PACKAGENAME=rabbitmq-server SIGNING_KEY_ID=056E8E56 @@ -21,6 +22,10 @@ package: clean tar -zxvf $(DEBIAN_ORIG_TARBALL) cp -r debian $(UNPACKED_DIR) cp $(COMMON_DIR)/* $(UNPACKED_DIR)/debian/ + sed -i \ + -e 's|^DEFAULTS_FILE=.*$$|DEFAULTS_FILE=/etc/default/rabbitmq|' \ + -e 's|^LOCK_FILE=.*$$|LOCK_FILE=|' \ + $(UNPACKED_DIR)/debian/rabbitmq-server.init chmod a+x $(UNPACKED_DIR)/debian/rules UNOFFICIAL_RELEASE=$(UNOFFICIAL_RELEASE) VERSION=$(VERSION) ./check-changelog.sh rabbitmq-server $(UNPACKED_DIR) cd $(UNPACKED_DIR); GNUPGHOME=$(GNUPG_PATH)/.gnupg dpkg-buildpackage -rfakeroot $(SIGNING) diff --git a/packaging/debs/Debian/debian/init.d b/packaging/debs/Debian/debian/init.d deleted file mode 100644 index a35a60ec68..0000000000 --- a/packaging/debs/Debian/debian/init.d +++ /dev/null @@ -1,122 +0,0 @@ -#!/bin/sh -### BEGIN INIT INFO -# Provides: rabbitmq -# Required-Start: $remote_fs $network -# Required-Stop: $remote_fs $network -# Default-Start: 2 3 4 5 -# Default-Stop: 0 1 6 -# Description: RabbitMQ broker -# Short-Description: Enable AMQP service provided by RabbitMQ broker -### END INIT INFO - -PATH=/sbin:/usr/sbin:/bin:/usr/bin -DAEMON=/usr/sbin/rabbitmq-multi -NAME=rabbitmq-server -DESC=rabbitmq-server -USER=rabbitmq -NODE_COUNT=1 -ROTATE_SUFFIX= - -test -x $DAEMON || exit 0 - -# Include rabbitmq defaults if available -if [ -f /etc/default/rabbitmq ] ; then - . /etc/default/rabbitmq -fi - -RETVAL=0 -set -e - -start_rabbitmq () { - set +e - $DAEMON start_all ${NODE_COUNT} > /var/log/rabbitmq/startup_log 2> /var/log/rabbitmq/startup_err - case "$?" in - 0) - echo SUCCESS - RETVAL=0 - ;; - 1) - echo TIMEOUT - check /var/log/rabbitmq/startup_\{log,err\} - RETVAL=1 - ;; - *) - echo FAILED - check /var/log/rabbitmq/startup_log, _err - RETVAL=1 - ;; - esac - set -e -} - -stop_rabbitmq () { - set +e - status_rabbitmq quiet - if [ $RETVAL = 0 ] ; then - $DAEMON stop_all > /var/log/rabbitmq/shutdown_log 2> /var/log/rabbitmq/shutdown_err - RETVAL=$? - if [ $RETVAL != 0 ] ; then - echo FAILED - check /var/log/rabbitmq/shutdown_log, _err - fi - else - echo No nodes running - RETVAL=0 - fi - set -e -} - -status_rabbitmq() { - set +e - if [ "$1" != "quiet" ] ; then - $DAEMON status 2>&1 - else - $DAEMON status > /dev/null 2>&1 - fi - if [ $? != 0 ] ; then - RETVAL=1 - fi - set -e -} - -rotate_logs_rabbitmq() { - set +e - $DAEMON rotate_logs ${ROTATE_SUFFIX} - if [ $? != 0 ] ; then - RETVAL=1 - fi - set -e -} - -restart_rabbitmq() { - stop_rabbitmq - start_rabbitmq -} - -case "$1" in - start) - echo -n "Starting $DESC: " - start_rabbitmq - echo "$NAME." - ;; - stop) - echo -n "Stopping $DESC: " - stop_rabbitmq - echo "$NAME." - ;; - status) - status_rabbitmq - ;; - rotate-logs) - echo -n "Rotating log files for $DESC: " - rotate_logs_rabbitmq - ;; - force-reload|restart) - echo -n "Restarting $DESC: " - restart_rabbitmq - echo "$NAME." - ;; - *) - echo "Usage: $0 {start|stop|status|rotate-logs|restart|force-reload}" >&2 - RETVAL=1 - ;; -esac - -exit $RETVAL diff --git a/packaging/debs/Debian/debian/rules b/packaging/debs/Debian/debian/rules index 31904851a9..365eea6eb4 100644 --- a/packaging/debs/Debian/debian/rules +++ b/packaging/debs/Debian/debian/rules @@ -3,7 +3,7 @@ include /usr/share/cdbs/1/rules/debhelper.mk include /usr/share/cdbs/1/class/makefile.mk -RABBIT_LIB=$(DEB_DESTDIR)usr/lib/erlang/lib/rabbitmq_server-$(DEB_UPSTREAM_VERSION)/ +RABBIT_LIB=$(DEB_DESTDIR)usr/lib/rabbitmq/lib/rabbitmq_server-$(DEB_UPSTREAM_VERSION)/ RABBIT_BIN=$(DEB_DESTDIR)usr/lib/rabbitmq/bin/ DEB_MAKE_INSTALL_TARGET := install TARGET_DIR=$(RABBIT_LIB) SBIN_DIR=$(RABBIT_BIN) MAN_DIR=$(DEB_DESTDIR)usr/share/man/ @@ -17,3 +17,6 @@ install/rabbitmq-server:: for script in rabbitmqctl rabbitmq-server rabbitmq-multi; do \ install -p -D -m 0755 debian/rabbitmq-script-wrapper $(DEB_DESTDIR)usr/sbin/$$script; \ done + for script in rabbitmq-activate-plugins; do \ + install -p -D -m 0755 debian/rabbitmq-asroot-script-wrapper $(DEB_DESTDIR)usr/sbin/$$script; \ + done diff --git a/packaging/generic-unix/Makefile b/packaging/generic-unix/Makefile index b398869693..4eade6c744 100644 --- a/packaging/generic-unix/Makefile +++ b/packaging/generic-unix/Makefile @@ -4,10 +4,10 @@ TARGET_DIR=rabbitmq_server-$(VERSION) TARGET_TARBALL=rabbitmq-server-generic-unix-$(VERSION) dist: - make -C ../.. VERSION=$(VERSION) srcdist + $(MAKE) -C ../.. VERSION=$(VERSION) srcdist tar -zxvf ../../dist/$(SOURCE_DIR).tar.gz - make -C $(SOURCE_DIR) \ + $(MAKE) -C $(SOURCE_DIR) \ TARGET_DIR=`pwd`/$(TARGET_DIR) \ SBIN_DIR=`pwd`/$(TARGET_DIR)/sbin \ MAN_DIR=`pwd`/$(TARGET_DIR)/share/man \ diff --git a/packaging/macports/net/rabbitmq-server/Portfile b/packaging/macports/net/rabbitmq-server/Portfile index b8096d206d..1826d5c415 100644 --- a/packaging/macports/net/rabbitmq-server/Portfile +++ b/packaging/macports/net/rabbitmq-server/Portfile @@ -42,7 +42,7 @@ use_parallel_build yes build.args PYTHON=${prefix}/bin/python2.5 destroot.destdir \ - TARGET_DIR=${destroot}${prefix}/lib/erlang/lib/rabbitmq_server-${version} \ + TARGET_DIR=${destroot}${prefix}/lib/rabbitmq/lib/rabbitmq_server-${version} \ SBIN_DIR=${sbindir} \ MAN_DIR=${destroot}${prefix}/share/man @@ -61,9 +61,7 @@ post-destroot { xinstall -d -g [existsgroup ${servergroup}] -m 775 ${destroot}${mnesiadbdir} reinplace -E "s:(/etc/rabbitmq/rabbitmq.conf):${prefix}\\1:g" \ - ${sbindir}/rabbitmq-multi \ - ${sbindir}/rabbitmq-server \ - ${sbindir}/rabbitmqctl + ${sbindir}/rabbitmq-env reinplace -E "s:(CLUSTER_CONFIG_FILE)=/:\\1=${prefix}/:" \ ${sbindir}/rabbitmq-multi \ ${sbindir}/rabbitmq-server \ @@ -83,14 +81,19 @@ post-destroot { xinstall -m 555 ${filespath}/rabbitmq-script-wrapper \ ${wrappersbin}/rabbitmq-multi + xinstall -m 555 ${filespath}/rabbitmq-asroot-script-wrapper \ + ${wrappersbin}/rabbitmq-activate-plugins reinplace -E "s:/usr/lib/rabbitmq/bin/:${prefix}/lib/rabbitmq/bin/:" \ ${wrappersbin}/rabbitmq-multi reinplace -E "s:/var/lib/rabbitmq:${prefix}/var/lib/rabbitmq:" \ ${wrappersbin}/rabbitmq-multi + reinplace -E "s:/usr/lib/rabbitmq/bin/:${prefix}/lib/rabbitmq/bin/:" \ + ${wrappersbin}/rabbitmq-activate-plugins + reinplace -E "s:/var/lib/rabbitmq:${prefix}/var/lib/rabbitmq:" \ + ${wrappersbin}/rabbitmq-activate-plugins file copy ${wrappersbin}/rabbitmq-multi ${wrappersbin}/rabbitmq-server file copy ${wrappersbin}/rabbitmq-multi ${wrappersbin}/rabbitmqctl - } pre-install { diff --git a/packaging/macports/net/rabbitmq-server/files/rabbitmq-asroot-script-wrapper b/packaging/macports/net/rabbitmq-server/files/rabbitmq-asroot-script-wrapper new file mode 100644 index 0000000000..c4488dcbe5 --- /dev/null +++ b/packaging/macports/net/rabbitmq-server/files/rabbitmq-asroot-script-wrapper @@ -0,0 +1,12 @@ +#!/bin/bash +cd /var/lib/rabbitmq + +SCRIPT=`basename $0` + +if [ `id -u` = 0 ] ; then + /usr/lib/rabbitmq/bin/${SCRIPT} "$@" +else + echo -e "\nOnly root should run ${SCRIPT}\n" + exit 1 +fi + diff --git a/packaging/windows/Makefile b/packaging/windows/Makefile index 59101cb2c6..387becb333 100644 --- a/packaging/windows/Makefile +++ b/packaging/windows/Makefile @@ -4,15 +4,16 @@ TARGET_DIR=rabbitmq_server-$(VERSION) TARGET_ZIP=rabbitmq-server-windows-$(VERSION) dist: - make -C ../.. VERSION=$(VERSION) srcdist + $(MAKE) -C ../.. VERSION=$(VERSION) srcdist tar -zxvf ../../dist/$(SOURCE_DIR).tar.gz - make -C $(SOURCE_DIR) + $(MAKE) -C $(SOURCE_DIR) mkdir $(SOURCE_DIR)/sbin mv $(SOURCE_DIR)/scripts/rabbitmq-server.bat $(SOURCE_DIR)/sbin mv $(SOURCE_DIR)/scripts/rabbitmq-service.bat $(SOURCE_DIR)/sbin mv $(SOURCE_DIR)/scripts/rabbitmqctl.bat $(SOURCE_DIR)/sbin mv $(SOURCE_DIR)/scripts/rabbitmq-multi.bat $(SOURCE_DIR)/sbin + mv $(SOURCE_DIR)/scripts/rabbitmq-activate-plugins.bat $(SOURCE_DIR)/sbin rm -rf $(SOURCE_DIR)/scripts rm -rf $(SOURCE_DIR)/codegen* $(SOURCE_DIR)/Makefile rm -f $(SOURCE_DIR)/README diff --git a/scripts/activate-plugins b/scripts/rabbitmq-activate-plugins similarity index 81% rename from scripts/activate-plugins rename to scripts/rabbitmq-activate-plugins index 52f7ddbe61..5ce64c686c 100755 --- a/scripts/activate-plugins +++ b/scripts/rabbitmq-activate-plugins @@ -30,18 +30,18 @@ ## Contributor(s): ______________________________________. ## -[ -f /etc/rabbitmq/rabbitmq.conf ] && . /etc/rabbitmq/rabbitmq.conf +. `dirname $0`/rabbitmq-env -RABBITMQ_EBIN=`dirname $0`/../ebin -[ "x" = "x$RABBITMQ_PLUGINS_DIR" ] && RABBITMQ_PLUGINS_DIR="`dirname $0`/../plugins" -[ "x" = "x$RABBITMQ_PLUGINS_EXPAND_DIR" ] && RABBITMQ_PLUGINS_EXPAND_DIR="`dirname $0`/../priv/plugins" +RABBITMQ_EBIN=${RABBITMQ_HOME}/ebin +[ "x" = "x$RABBITMQ_PLUGINS_DIR" ] && RABBITMQ_PLUGINS_DIR="${RABBITMQ_HOME}/plugins" +[ "x" = "x$RABBITMQ_PLUGINS_EXPAND_DIR" ] && RABBITMQ_PLUGINS_EXPAND_DIR="${RABBITMQ_HOME}/priv/plugins" exec erl \ -pa "$RABBITMQ_EBIN" \ -rabbit plugins_dir "\"$RABBITMQ_PLUGINS_DIR\"" \ - -rabbit plugins_expand_dir "\"$RABBITMQ_PLUGINS_EXPAND_DIR\"" \ - -rabbit rabbit_ebin "\"$RABBITMQ_EBIN\"" \ - -noinput \ + -rabbit plugins_expand_dir "\"$RABBITMQ_PLUGINS_EXPAND_DIR\"" \ + -rabbit rabbit_ebin "\"$RABBITMQ_EBIN\"" \ + -noinput \ -hidden \ -s rabbit_plugin_activator \ -extra "$@" diff --git a/scripts/activate-plugins.bat b/scripts/rabbitmq-activate-plugins.bat similarity index 94% rename from scripts/activate-plugins.bat rename to scripts/rabbitmq-activate-plugins.bat index 8bef4ad266..3540bf2d9b 100644 --- a/scripts/activate-plugins.bat +++ b/scripts/rabbitmq-activate-plugins.bat @@ -30,10 +30,6 @@ REM REM Contributor(s): ______________________________________. REM -if "%ERLANG_HOME%"=="" ( - set ERLANG_HOME=%~dp0%..\..\.. -) - if not exist "%ERLANG_HOME%\bin\erl.exe" ( echo. echo ****************************** diff --git a/scripts/rabbitmq-env b/scripts/rabbitmq-env new file mode 100755 index 0000000000..69ddbcfed1 --- /dev/null +++ b/scripts/rabbitmq-env @@ -0,0 +1,53 @@ +#!/bin/sh +## The contents of this file are subject to the Mozilla Public License +## Version 1.1 (the "License"); you may not use this file except in +## compliance with the License. You may obtain a copy of the License at +## http://www.mozilla.org/MPL/ +## +## Software distributed under the License is distributed on an "AS IS" +## basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +## License for the specific language governing rights and limitations +## under the License. +## +## The Original Code is RabbitMQ. +## +## The Initial Developers of the Original Code are LShift Ltd, +## Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +## +## Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +## Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +## are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +## Technologies LLC, and Rabbit Technologies Ltd. +## +## Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +## Ltd. Portions created by Cohesive Financial Technologies LLC are +## Copyright (C) 2007-2009 Cohesive Financial Technologies +## LLC. Portions created by Rabbit Technologies Ltd are Copyright +## (C) 2007-2009 Rabbit Technologies Ltd. +## +## All Rights Reserved. +## +## Contributor(s): ______________________________________. +## + +# Determine where this script is really located +SCRIPT_PATH="$0" +while [ -h "$SCRIPT_PATH" ] ; do + FULL_PATH=`readlink -f $SCRIPT_PATH 2>/dev/null` + if [ "$?" != "0" ]; then + REL_PATH=`readlink $SCRIPT_PATH` + if expr "$REL_PATH" : '/.*' > /dev/null; then + SCRIPT_PATH="$REL_PATH" + else + SCRIPT_PATH="`dirname "$SCRIPT_PATH"`/$REL_PATH" + fi + else + SCRIPT_PATH=$FULL_PATH + fi +done + +SCRIPT_DIR=`dirname $SCRIPT_PATH` +RABBITMQ_HOME="${SCRIPT_DIR}/.." + +# Load configuration from the rabbitmq.conf file +[ -f /etc/rabbitmq/rabbitmq.conf ] && . /etc/rabbitmq/rabbitmq.conf diff --git a/scripts/rabbitmq-multi b/scripts/rabbitmq-multi index 1d0c785f6b..7db4cb70b2 100755 --- a/scripts/rabbitmq-multi +++ b/scripts/rabbitmq-multi @@ -37,7 +37,7 @@ PIDS_FILE=/var/lib/rabbitmq/pids MULTI_ERL_ARGS= MULTI_START_ARGS= -[ -f /etc/rabbitmq/rabbitmq.conf ] && . /etc/rabbitmq/rabbitmq.conf +. `dirname $0`/rabbitmq-env [ "x" = "x$RABBITMQ_NODENAME" ] && RABBITMQ_NODENAME=${NODENAME} [ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ] && RABBITMQ_NODE_IP_ADDRESS=${NODE_IP_ADDRESS} @@ -60,7 +60,7 @@ export \ set -f exec erl \ - -pa "`dirname $0`/../ebin" \ + -pa "${RABBITMQ_HOME}/ebin" \ -noinput \ -hidden \ ${RABBITMQ_MULTI_ERL_ARGS} \ diff --git a/scripts/rabbitmq-multi.bat b/scripts/rabbitmq-multi.bat index a30c0889ab..8abf13f192 100755 --- a/scripts/rabbitmq-multi.bat +++ b/scripts/rabbitmq-multi.bat @@ -49,10 +49,6 @@ if "%RABBITMQ_NODE_PORT%"=="" ( set RABBITMQ_PIDS_FILE=%RABBITMQ_BASE%\rabbitmq.pids set RABBITMQ_SCRIPT_HOME=%~sdp0% -if "%ERLANG_HOME%"=="" ( - set ERLANG_HOME=%~dp0%..\..\.. -) - if not exist "%ERLANG_HOME%\bin\erl.exe" ( echo. echo ****************************** diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server index 41e84639ba..547220b4a5 100755 --- a/scripts/rabbitmq-server +++ b/scripts/rabbitmq-server @@ -41,7 +41,7 @@ LOG_BASE=/var/log/rabbitmq MNESIA_BASE=/var/lib/rabbitmq/mnesia SERVER_START_ARGS= -[ -f /etc/rabbitmq/rabbitmq.conf ] && . /etc/rabbitmq/rabbitmq.conf +. `dirname $0`/rabbitmq-env [ "x" = "x$RABBITMQ_NODENAME" ] && RABBITMQ_NODENAME=${NODENAME} [ "x" = "x$RABBITMQ_NODE_IP_ADDRESS" ] && RABBITMQ_NODE_IP_ADDRESS=${NODE_IP_ADDRESS} @@ -75,7 +75,7 @@ fi RABBITMQ_START_RABBIT= [ "x" = "x$RABBITMQ_NODE_ONLY" ] && RABBITMQ_START_RABBIT='-noinput -s rabbit' -RABBITMQ_EBIN_ROOT="`dirname $0`/../ebin" +RABBITMQ_EBIN_ROOT="${RABBITMQ_HOME}/ebin" if [ -f "${RABBITMQ_EBIN_ROOT}/rabbit.boot" ]; then RABBITMQ_BOOT_FILE="${RABBITMQ_EBIN_ROOT}/rabbit" RABBITMQ_EBIN_PATH="" diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat index b4868841d7..a784fee349 100755 --- a/scripts/rabbitmq-server.bat +++ b/scripts/rabbitmq-server.bat @@ -46,10 +46,6 @@ if "%RABBITMQ_NODE_PORT%"=="" ( set RABBITMQ_NODE_PORT=5672 ) -if "%ERLANG_HOME%"=="" ( - set ERLANG_HOME=%~dp0%..\..\.. -) - if not exist "%ERLANG_HOME%\bin\erl.exe" ( echo. echo ****************************** diff --git a/scripts/rabbitmqctl b/scripts/rabbitmqctl index c57978c050..9c45e73dd5 100755 --- a/scripts/rabbitmqctl +++ b/scripts/rabbitmqctl @@ -30,12 +30,12 @@ ## Contributor(s): ______________________________________. ## -[ -f /etc/rabbitmq/rabbitmq.conf ] && . /etc/rabbitmq/rabbitmq.conf +. `dirname $0`/rabbitmq-env [ "x" = "x$RABBITMQ_CTL_ERL_ARGS" ] && RABBITMQ_CTL_ERL_ARGS=${CTL_ERL_ARGS} exec erl \ - -pa "`dirname $0`/../ebin" \ + -pa "${RABBITMQ_HOME}/ebin" \ -noinput \ -hidden \ ${RABBITMQ_CTL_ERL_ARGS} \ diff --git a/scripts/rabbitmqctl.bat b/scripts/rabbitmqctl.bat index e4dccfba64..5111724f7d 100755 --- a/scripts/rabbitmqctl.bat +++ b/scripts/rabbitmqctl.bat @@ -30,10 +30,6 @@ REM REM Contributor(s): ______________________________________. REM -if "%ERLANG_HOME%"=="" ( - set ERLANG_HOME=%~dp0%..\..\.. -) - if not exist "%ERLANG_HOME%\bin\erl.exe" ( echo. echo ****************************** diff --git a/src/gen_server2.erl b/src/gen_server2.erl index 36fb4fa8c3..a2d9350c4e 100644 --- a/src/gen_server2.erl +++ b/src/gen_server2.erl @@ -437,7 +437,10 @@ unregister_name({local,Name}) -> unregister_name({global,Name}) -> _ = global:unregister_name(Name); unregister_name(Pid) when is_pid(Pid) -> - Pid. + Pid; +% Under R12 let's just ignore it, as we have a single term as Name. +% On R13 it will never get here, as we get tuple with 'local/global' atom. +unregister_name(_Name) -> ok. extend_backoff(undefined) -> undefined; diff --git a/src/priority_queue.erl b/src/priority_queue.erl index 732757c41c..c74b39a957 100644 --- a/src/priority_queue.erl +++ b/src/priority_queue.erl @@ -55,7 +55,8 @@ -module(priority_queue). --export([new/0, is_queue/1, is_empty/1, len/1, to_list/1, in/2, in/3, out/1]). +-export([new/0, is_queue/1, is_empty/1, len/1, to_list/1, in/2, in/3, + out/1, join/2]). %%---------------------------------------------------------------------------- @@ -73,6 +74,7 @@ -spec(in/2 :: (any(), pqueue()) -> pqueue()). -spec(in/3 :: (any(), priority(), pqueue()) -> pqueue()). -spec(out/1 :: (pqueue()) -> {empty | {value, any()}, pqueue()}). +-spec(join/2 :: (pqueue(), pqueue()) -> pqueue()). -endif. @@ -147,6 +149,42 @@ out({pqueue, [{P, Q} | Queues]}) -> end, {R, NewQ}. +join(A, {queue, [], []}) -> + A; +join({queue, [], []}, B) -> + B; +join({queue, AIn, AOut}, {queue, BIn, BOut}) -> + {queue, BIn, AOut ++ lists:reverse(AIn, BOut)}; +join(A = {queue, _, _}, {pqueue, BPQ}) -> + {Pre, Post} = lists:splitwith(fun ({P, _}) -> P < 0 end, BPQ), + Post1 = case Post of + [] -> [ {0, A} ]; + [ {0, ZeroQueue} | Rest ] -> [ {0, join(A, ZeroQueue)} | Rest ]; + _ -> [ {0, A} | Post ] + end, + {pqueue, Pre ++ Post1}; +join({pqueue, APQ}, B = {queue, _, _}) -> + {Pre, Post} = lists:splitwith(fun ({P, _}) -> P < 0 end, APQ), + Post1 = case Post of + [] -> [ {0, B} ]; + [ {0, ZeroQueue} | Rest ] -> [ {0, join(ZeroQueue, B)} | Rest ]; + _ -> [ {0, B} | Post ] + end, + {pqueue, Pre ++ Post1}; +join({pqueue, APQ}, {pqueue, BPQ}) -> + {pqueue, merge(APQ, BPQ, [])}. + +merge([], BPQ, Acc) -> + lists:reverse(Acc, BPQ); +merge(APQ, [], Acc) -> + lists:reverse(Acc, APQ); +merge([{P, A}|As], [{P, B}|Bs], Acc) -> + merge(As, Bs, [ {P, join(A, B)} | Acc ]); +merge([{PA, A}|As], Bs = [{PB, _}|_], Acc) when PA < PB -> + merge(As, Bs, [ {PA, A} | Acc ]); +merge(As = [{_, _}|_], [{PB, B}|Bs], Acc) -> + merge(As, Bs, [ {PB, B} | Acc ]). + r2f([]) -> {queue, [], []}; r2f([_] = R) -> {queue, [], R}; r2f([X,Y]) -> {queue, [X], [Y]}; diff --git a/src/rabbit.erl b/src/rabbit.erl index b0d62b5ab8..ef1e004996 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -116,8 +116,6 @@ start(normal, []) -> print_banner(), - {ok, ExtraSteps} = application:get_env(extra_startup_steps), - lists:foreach( fun ({Msg, Thunk}) -> io:format("starting ~-20s ...", [Msg]), @@ -135,13 +133,13 @@ start(normal, []) -> ok = start_child(rabbit_log), ok = rabbit_hooks:start(), - ok = rabbit_amqqueue:start(), + ok = rabbit_binary_generator: + check_empty_content_body_frame_size(), {ok, MemoryAlarms} = application:get_env(memory_alarms), ok = rabbit_alarm:start(MemoryAlarms), - ok = rabbit_binary_generator: - check_empty_content_body_frame_size(), + ok = rabbit_amqqueue:start(), ok = start_child(rabbit_router), ok = start_child(rabbit_node_monitor) @@ -170,14 +168,28 @@ start(normal, []) -> {"TCP listeners", fun () -> ok = rabbit_networking:start(), - {ok, TCPListeners} = application:get_env(tcp_listeners), + {ok, TcpListeners} = application:get_env(tcp_listeners), lists:foreach( fun ({Host, Port}) -> ok = rabbit_networking:start_tcp_listener(Host, Port) end, - TCPListeners) - end}] - ++ ExtraSteps), + TcpListeners) + end}, + {"SSL listeners", + fun () -> + case application:get_env(ssl_listeners) of + {ok, []} -> + ok; + {ok, SslListeners} -> + ok = rabbit_misc:start_applications([crypto, ssl]), + + {ok, SslOpts} = application:get_env(ssl_options), + + [rabbit_networking:start_ssl_listener + (Host, Port, SslOpts) || {Host, Port} <- SslListeners], + ok + end + end}]), io:format("~nbroker running~n"), diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl index 21999f16c3..309c9a0e80 100644 --- a/src/rabbit_alarm.erl +++ b/src/rabbit_alarm.erl @@ -41,7 +41,7 @@ -define(MEMSUP_CHECK_INTERVAL, 1000). %% OSes on which we know memory alarms to be trustworthy --define(SUPPORTED_OS, [{unix, linux}]). +-define(SUPPORTED_OS, [{unix, linux}, {unix, darwin}]). -record(alarms, {alertees, system_memory_high_watermark = false}). @@ -136,33 +136,35 @@ code_change(_OldVsn, State, _Extra) -> %%---------------------------------------------------------------------------- start_memsup() -> - Mod = case os:type() of - %% memsup doesn't take account of buffers or cache when - %% considering "free" memory - therefore on Linux we can - %% get memory alarms very easily without any pressure - %% existing on memory at all. Therefore we need to use - %% our own simple memory monitor. - %% - {unix, linux} -> rabbit_memsup_linux; + {Mod, Args} = + case os:type() of + %% memsup doesn't take account of buffers or cache when + %% considering "free" memory - therefore on Linux we can + %% get memory alarms very easily without any pressure + %% existing on memory at all. Therefore we need to use + %% our own simple memory monitor. + %% + {unix, linux} -> {rabbit_memsup, [rabbit_memsup_linux]}; + {unix, darwin} -> {rabbit_memsup, [rabbit_memsup_darwin]}; - %% Start memsup programmatically rather than via the - %% rabbitmq-server script. This is not quite the right - %% thing to do as os_mon checks to see if memsup is - %% available before starting it, but as memsup is - %% available everywhere (even on VXWorks) it should be - %% ok. - %% - %% One benefit of the programmatic startup is that we - %% can add our alarm_handler before memsup is running, - %% thus ensuring that we notice memory alarms that go - %% off on startup. - %% - _ -> memsup - end, + %% Start memsup programmatically rather than via the + %% rabbitmq-server script. This is not quite the right + %% thing to do as os_mon checks to see if memsup is + %% available before starting it, but as memsup is + %% available everywhere (even on VXWorks) it should be + %% ok. + %% + %% One benefit of the programmatic startup is that we + %% can add our alarm_handler before memsup is running, + %% thus ensuring that we notice memory alarms that go + %% off on startup. + %% + _ -> {memsup, []} + end, %% This is based on os_mon:childspec(memsup, true) {ok, _} = supervisor:start_child( os_mon_sup, - {memsup, {Mod, start_link, []}, + {memsup, {Mod, start_link, Args}, permanent, 2000, worker, [Mod]}), ok. diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 4903c2c57f..f05f7880b7 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -303,10 +303,10 @@ basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) -> infinity). notify_sent(QPid, ChPid) -> - gen_server2:cast(QPid, {notify_sent, ChPid}). + gen_server2:pcast(QPid, 8, {notify_sent, ChPid}). unblock(QPid, ChPid) -> - gen_server2:cast(QPid, {unblock, ChPid}). + gen_server2:pcast(QPid, 8, {unblock, ChPid}). internal_delete(QueueName) -> rabbit_misc:execute_mnesia_transaction( diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 58b9423460..1285064f43 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -126,7 +126,7 @@ handle_cast({method, Method, Content}, State) -> {stop, normal, State#ch{state = terminating}} catch exit:{amqp, Error, Explanation, none} -> - ok = notify_queues(internal_rollback(State)), + ok = rollback_and_notify(State), Reason = {amqp, Error, Explanation, rabbit_misc:method_record_type(Method)}, State#ch.reader_pid ! {channel_exit, State#ch.channel, Reason}, @@ -157,6 +157,10 @@ handle_cast({conserve_memory, Conserve}, State) -> State#ch.writer_pid, #'channel.flow'{active = not(Conserve)}), noreply(State). +handle_info({'EXIT', WriterPid, Reason = {writer, send_failed, _Error}}, + State = #ch{writer_pid = WriterPid}) -> + State#ch.reader_pid ! {channel_exit, State#ch.channel, Reason}, + {stop, normal, State}; handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}; @@ -171,7 +175,7 @@ terminate(_Reason, #ch{writer_pid = WriterPid, limiter_pid = LimiterPid, terminate(Reason, State = #ch{writer_pid = WriterPid, limiter_pid = LimiterPid}) -> - Res = notify_queues(internal_rollback(State)), + Res = rollback_and_notify(State), case Reason of normal -> ok = Res; _ -> ok @@ -293,7 +297,7 @@ handle_method(_Method, _, #ch{state = starting}) -> rabbit_misc:protocol_error(channel_error, "expected 'channel.open'", []); handle_method(#'channel.close'{}, _, State = #ch{writer_pid = WriterPid}) -> - ok = notify_queues(internal_rollback(State)), + ok = rollback_and_notify(State), ok = rabbit_writer:send_command(WriterPid, #'channel.close_ok'{}), stop; @@ -868,6 +872,11 @@ internal_rollback(State = #ch{transaction_id = TxnKey, internal_error, "rollback failed: ~w", [Errors]) end. +rollback_and_notify(State = #ch{transaction_id = none}) -> + notify_queues(State); +rollback_and_notify(State) -> + notify_queues(internal_rollback(State)). + fold_per_queue(F, Acc0, UAQ) -> D = lists:foldl( fun ({_DTag, _CTag, diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index 37e4d18993..69e91803d6 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -164,7 +164,7 @@ exchange name, routing key, queue name and arguments, in that order. must be a member of the list [node, address, port, peer_address, peer_port, state, channels, user, vhost, timeout, frame_max, recv_oct, recv_cnt, send_oct, send_cnt, send_pend]. The default is to display -user, peer_address and peer_port. +user, peer_address, peer_port and state. "), halt(1). @@ -270,8 +270,9 @@ action(list_bindings, Node, Args, Inform) -> action(list_connections, Node, Args, Inform) -> Inform("Listing connections", []), - ArgAtoms = list_replace(node, pid, - default_if_empty(Args, [user, peer_address, peer_port])), + ArgAtoms = list_replace(node, pid, + default_if_empty(Args, [user, peer_address, + peer_port, state])), display_info_list(rpc_call(Node, rabbit_networking, connection_info_all, [ArgAtoms]), ArgAtoms); @@ -314,7 +315,7 @@ default_if_empty(List, Default) when is_list(List) -> end. display_info_list(Results, InfoItemKeys) when is_list(Results) -> - lists:foreach(fun (Result) -> display_row([format_info_item(Result, X) || + lists:foreach(fun (Result) -> display_row([format_info_item(X, Result) || X <- InfoItemKeys]) end, Results), ok; @@ -325,26 +326,29 @@ display_row(Row) -> io:fwrite(lists:flatten(rabbit_misc:intersperse("\t", Row))), io:nl(). -format_info_item(Items, Key) -> - {value, Info = {Key, Value}} = lists:keysearch(Key, 1, Items), - case Info of - {_, #resource{name = Name}} -> - url_encode(Name); - _ when Key =:= address; Key =:= peer_address andalso is_tuple(Value) -> +format_info_item(Key, Items) -> + case proplists:get_value(Key, Items) of + #resource{name = Name} -> + escape(Name); + Value when Key =:= address; Key =:= peer_address andalso + is_tuple(Value) -> inet_parse:ntoa(Value); - _ when is_pid(Value) -> + Value when is_pid(Value) -> atom_to_list(node(Value)); - _ when is_binary(Value) -> - url_encode(Value); - _ -> + Value when is_binary(Value) -> + escape(Value); + Value when is_atom(Value) -> + escape(atom_to_list(Value)); + Value -> io_lib:format("~w", [Value]) end. display_list(L) when is_list(L) -> lists:foreach(fun (I) when is_binary(I) -> - io:format("~s~n", [url_encode(I)]); + io:format("~s~n", [escape(I)]); (I) when is_tuple(I) -> - display_row([url_encode(V) || V <- tuple_to_list(I)]) + display_row([escape(V) + || V <- tuple_to_list(I)]) end, lists:sort(L)), ok; @@ -356,32 +360,25 @@ call(Node, {Mod, Fun, Args}) -> rpc_call(Node, Mod, Fun, Args) -> rpc:call(Node, Mod, Fun, Args, ?RPC_TIMEOUT). -%% url_encode is lifted from ibrowse, modified to preserve some characters -url_encode(Bin) when binary(Bin) -> - url_encode_char(lists:reverse(binary_to_list(Bin)), []). +%% escape does C-style backslash escaping of non-printable ASCII +%% characters. We don't escape characters above 127, since they may +%% form part of UTF-8 strings. -url_encode_char([X | T], Acc) when X >= $a, X =< $z -> - url_encode_char(T, [X | Acc]); -url_encode_char([X | T], Acc) when X >= $A, X =< $Z -> - url_encode_char(T, [X | Acc]); -url_encode_char([X | T], Acc) when X >= $0, X =< $9 -> - url_encode_char(T, [X | Acc]); -url_encode_char([X | T], Acc) - when X == $-; X == $_; X == $.; X == $~; - X == $!; X == $*; X == $'; X == $(; - X == $); X == $;; X == $:; X == $@; - X == $&; X == $=; X == $+; X == $$; - X == $,; X == $/; X == $?; X == $%; - X == $#; X == $[; X == $] -> - url_encode_char(T, [X | Acc]); -url_encode_char([X | T], Acc) -> - url_encode_char(T, [$%, d2h(X bsr 4), d2h(X band 16#0f) | Acc]); -url_encode_char([], Acc) -> +escape(Bin) when binary(Bin) -> + escape(binary_to_list(Bin)); +escape(L) when is_list(L) -> + escape_char(lists:reverse(L), []). + +escape_char([$\\ | T], Acc) -> + escape_char(T, [$\\, $\\ | Acc]); +escape_char([X | T], Acc) when X > 32, X /= 127 -> + escape_char(T, [X | Acc]); +escape_char([X | T], Acc) -> + escape_char(T, [$\\, $0 + (X bsr 6), $0 + (X band 8#070 bsr 3), + $0 + (X band 7) | Acc]); +escape_char([], Acc) -> Acc. -d2h(N) when N<10 -> N+$0; -d2h(N) -> N+$a-10. - list_replace(Find, Replace, List) -> [case X of Find -> Replace; _ -> X end || X <- List]. diff --git a/src/rabbit_guid.erl b/src/rabbit_guid.erl index 2be005034e..b789fbd1e0 100644 --- a/src/rabbit_guid.erl +++ b/src/rabbit_guid.erl @@ -42,6 +42,7 @@ terminate/2, code_change/3]). -define(SERVER, ?MODULE). +-define(SERIAL_FILENAME, "rabbit_serial"). -record(state, {serial}). @@ -59,17 +60,28 @@ %%---------------------------------------------------------------------------- start_link() -> - %% The persister can get heavily loaded, and we don't want that to - %% impact guid generation. We therefore keep the serial in a - %% separate process rather than calling rabbit_persister:serial/0 - %% directly in the functions below. gen_server:start_link({local, ?SERVER}, ?MODULE, - [rabbit_persister:serial()], []). + [update_disk_serial()], []). + +update_disk_serial() -> + Filename = filename:join(rabbit_mnesia:dir(), ?SERIAL_FILENAME), + Serial = case rabbit_misc:read_term_file(Filename) of + {ok, [Num]} -> Num; + {error, enoent} -> rabbit_persister:serial(); + {error, Reason} -> + throw({error, {cannot_read_serial_file, Filename, Reason}}) + end, + case rabbit_misc:write_term_file(Filename, [Serial + 1]) of + ok -> ok; + {error, Reason1} -> + throw({error, {cannot_write_serial_file, Filename, Reason1}}) + end, + Serial. %% generate a guid that is monotonically increasing per process. %% %% The id is only unique within a single cluster and as long as the -%% persistent message store hasn't been deleted. +%% serial store hasn't been deleted. guid() -> %% We don't use erlang:now() here because a) it may return %% duplicates when the system clock has been rewound prior to a @@ -77,7 +89,7 @@ guid() -> %% now() to move ahead of the system time), and b) it is really %% slow since it takes a global lock and makes a system call. %% - %% rabbit_persister:serial/0, in combination with self/0 (which + %% A persisted serial number, in combination with self/0 (which %% includes the node name) uniquely identifies a process in space %% and time. We combine that with a process-local counter to give %% us a GUID that is monotonically increasing per process. diff --git a/src/rabbit_heartbeat.erl b/src/rabbit_heartbeat.erl index 0a68c9adad..ed0066fe07 100644 --- a/src/rabbit_heartbeat.erl +++ b/src/rabbit_heartbeat.erl @@ -53,7 +53,7 @@ start_heartbeat(Sock, TimeoutSec) -> spawn_link(fun () -> heartbeater(Sock, TimeoutSec * 1000 div 2, send_oct, 0, fun () -> - catch gen_tcp:send(Sock, rabbit_binary_generator:build_heartbeat_frame()), + catch rabbit_net:send(Sock, rabbit_binary_generator:build_heartbeat_frame()), continue end, erlang:monitor(process, Parent)) end), @@ -73,7 +73,7 @@ heartbeater(Sock, TimeoutMillisec, StatName, Threshold, Handler, MonitorRef) -> {'DOWN', MonitorRef, process, _Object, _Info} -> ok; Other -> exit({unexpected_message, Other}) after TimeoutMillisec -> - case inet:getstat(Sock, [StatName]) of + case rabbit_net:getstat(Sock, [StatName]) of {ok, [{StatName, NewStatVal}]} -> if NewStatVal =/= StatVal -> F({NewStatVal, 0}); diff --git a/src/rabbit_memsup.erl b/src/rabbit_memsup.erl new file mode 100644 index 0000000000..b0d57cb27e --- /dev/null +++ b/src/rabbit_memsup.erl @@ -0,0 +1,142 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2009 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_memsup). + +-behaviour(gen_server). + +-export([start_link/1]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-export([update/0]). + +-record(state, {memory_fraction, + timeout, + timer, + mod, + mod_state, + alarmed + }). + +-define(SERVER, memsup). %% must be the same as the standard memsup + +-define(DEFAULT_MEMORY_CHECK_INTERVAL, 1000). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(start_link/1 :: (atom()) -> {'ok', pid()} | 'ignore' | {'error', any()}). +-spec(update/0 :: () -> 'ok'). + +-endif. + +%%---------------------------------------------------------------------------- + +start_link(Args) -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [Args], []). + +update() -> + gen_server:cast(?SERVER, update). + +%%---------------------------------------------------------------------------- + +init([Mod]) -> + Fraction = os_mon:get_env(memsup, system_memory_high_watermark), + TRef = start_timer(?DEFAULT_MEMORY_CHECK_INTERVAL), + InitState = Mod:init(), + State = #state { memory_fraction = Fraction, + timeout = ?DEFAULT_MEMORY_CHECK_INTERVAL, + timer = TRef, + mod = Mod, + mod_state = InitState, + alarmed = false }, + {ok, internal_update(State)}. + +start_timer(Timeout) -> + {ok, TRef} = timer:apply_interval(Timeout, ?MODULE, update, []), + TRef. + +%% Export the same API as the real memsup. Note that +%% get_sysmem_high_watermark gives an int in the range 0 - 100, while +%% set_sysmem_high_watermark takes a float in the range 0.0 - 1.0. +handle_call(get_sysmem_high_watermark, _From, State) -> + {reply, trunc(100 * State#state.memory_fraction), State}; + +handle_call({set_sysmem_high_watermark, Float}, _From, State) -> + {reply, ok, State#state{memory_fraction = Float}}; + +handle_call(get_check_interval, _From, State) -> + {reply, State#state.timeout, State}; + +handle_call({set_check_interval, Timeout}, _From, State) -> + {ok, cancel} = timer:cancel(State#state.timer), + {reply, ok, State#state{timeout = Timeout, timer = start_timer(Timeout)}}; + +handle_call(get_memory_data, _From, + State = #state { mod = Mod, mod_state = ModState }) -> + {reply, Mod:get_memory_data(ModState), State}; + +handle_call(_Request, _From, State) -> + {noreply, State}. + +handle_cast(update, State) -> + {noreply, internal_update(State)}; + +handle_cast(_Request, State) -> + {noreply, State}. + +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +internal_update(State = #state { memory_fraction = MemoryFraction, + alarmed = Alarmed, + mod = Mod, mod_state = ModState }) -> + ModState1 = Mod:update(ModState), + {MemTotal, MemUsed, _BigProc} = Mod:get_memory_data(ModState1), + NewAlarmed = MemUsed / MemTotal > MemoryFraction, + case {Alarmed, NewAlarmed} of + {false, true} -> + alarm_handler:set_alarm({system_memory_high_watermark, []}); + {true, false} -> + alarm_handler:clear_alarm(system_memory_high_watermark); + _ -> + ok + end, + State #state { mod_state = ModState1, alarmed = NewAlarmed }. diff --git a/src/rabbit_memsup_darwin.erl b/src/rabbit_memsup_darwin.erl new file mode 100644 index 0000000000..3de2d8430e --- /dev/null +++ b/src/rabbit_memsup_darwin.erl @@ -0,0 +1,88 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2009 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_memsup_darwin). + +-export([init/0, update/1, get_memory_data/1]). + +-record(state, {total_memory, + allocated_memory}). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-type(state() :: #state { total_memory :: ('undefined' | non_neg_integer()), + allocated_memory :: ('undefined' | non_neg_integer()) + }). + +-spec(init/0 :: () -> state()). +-spec(update/1 :: (state()) -> state()). +-spec(get_memory_data/1 :: (state()) -> {non_neg_integer(), non_neg_integer(), + ('undefined' | pid())}). + +-endif. + +%%---------------------------------------------------------------------------- + +init() -> + #state{total_memory = undefined, + allocated_memory = undefined}. + +update(State) -> + File = os:cmd("/usr/bin/vm_stat"), + Lines = string:tokens(File, "\n"), + Dict = dict:from_list(lists:map(fun parse_line/1, Lines)), + [PageSize, Inactive, Active, Free, Wired] = + [dict:fetch(Key, Dict) || + Key <- [page_size, 'Pages inactive', 'Pages active', 'Pages free', + 'Pages wired down']], + MemTotal = PageSize * (Inactive + Active + Free + Wired), + MemUsed = PageSize * (Active + Wired), + State#state{total_memory = MemTotal, allocated_memory = MemUsed}. + +get_memory_data(State) -> + {State#state.total_memory, State#state.allocated_memory, undefined}. + +%%---------------------------------------------------------------------------- + +%% A line looks like "Foo bar: 123456." +parse_line(Line) -> + [Name, RHS | _Rest] = string:tokens(Line, ":"), + case Name of + "Mach Virtual Memory Statistics" -> + ["(page", "size", "of", PageSize, "bytes)"] = + string:tokens(RHS, " "), + {page_size, list_to_integer(PageSize)}; + _ -> + [Value | _Rest1] = string:tokens(RHS, " ."), + {list_to_atom(Name), list_to_integer(Value)} + end. diff --git a/src/rabbit_memsup_linux.erl b/src/rabbit_memsup_linux.erl index ffdc7e9946..ca942d7caa 100644 --- a/src/rabbit_memsup_linux.erl +++ b/src/rabbit_memsup_linux.erl @@ -31,104 +31,44 @@ -module(rabbit_memsup_linux). --behaviour(gen_server). +-export([init/0, update/1, get_memory_data/1]). --export([start_link/0]). - --export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). - --export([update/0]). - --define(SERVER, memsup). %% must be the same as the standard memsup - --define(DEFAULT_MEMORY_CHECK_INTERVAL, 1000). - --record(state, {memory_fraction, alarmed, timeout, timer}). +-record(state, {total_memory, + allocated_memory}). %%---------------------------------------------------------------------------- -ifdef(use_specs). --spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}). --spec(update/0 :: () -> 'ok'). - +-type(state() :: #state { total_memory :: ('undefined' | non_neg_integer()), + allocated_memory :: ('undefined' | non_neg_integer()) + }). + +-spec(init/0 :: () -> state()). +-spec(update/1 :: (state()) -> state()). +-spec(get_memory_data/1 :: (state()) -> {non_neg_integer(), non_neg_integer(), + ('undefined' | pid())}). + -endif. %%---------------------------------------------------------------------------- -start_link() -> - gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). +init() -> + #state{total_memory = undefined, + allocated_memory = undefined}. - -update() -> - gen_server:cast(?SERVER, update). - -%%---------------------------------------------------------------------------- - -init(_Args) -> - Fraction = os_mon:get_env(memsup, system_memory_high_watermark), - TRef = start_timer(?DEFAULT_MEMORY_CHECK_INTERVAL), - {ok, #state{alarmed = false, - memory_fraction = Fraction, - timeout = ?DEFAULT_MEMORY_CHECK_INTERVAL, - timer = TRef}}. - -start_timer(Timeout) -> - {ok, TRef} = timer:apply_interval(Timeout, ?MODULE, update, []), - TRef. - -%% Export the same API as the real memsup. Note that -%% get_sysmem_high_watermark gives an int in the range 0 - 100, while -%% set_sysmem_high_watermark takes a float in the range 0.0 - 1.0. -handle_call(get_sysmem_high_watermark, _From, State) -> - {reply, trunc(100 * State#state.memory_fraction), State}; - -handle_call({set_sysmem_high_watermark, Float}, _From, State) -> - {reply, ok, State#state{memory_fraction = Float}}; - -handle_call(get_check_interval, _From, State) -> - {reply, State#state.timeout, State}; - -handle_call({set_check_interval, Timeout}, _From, State) -> - {ok, cancel} = timer:cancel(State#state.timer), - {reply, ok, State#state{timeout = Timeout, timer = start_timer(Timeout)}}; - -handle_call(_Request, _From, State) -> - {noreply, State}. - -handle_cast(update, State = #state{alarmed = Alarmed, - memory_fraction = MemoryFraction}) -> +update(State) -> File = read_proc_file("/proc/meminfo"), Lines = string:tokens(File, "\n"), Dict = dict:from_list(lists:map(fun parse_line/1, Lines)), - MemTotal = dict:fetch('MemTotal', Dict), - MemUsed = MemTotal - - dict:fetch('MemFree', Dict) - - dict:fetch('Buffers', Dict) - - dict:fetch('Cached', Dict), - NewAlarmed = MemUsed / MemTotal > MemoryFraction, - case {Alarmed, NewAlarmed} of - {false, true} -> - alarm_handler:set_alarm({system_memory_high_watermark, []}); - {true, false} -> - alarm_handler:clear_alarm(system_memory_high_watermark); - _ -> - ok - end, - {noreply, State#state{alarmed = NewAlarmed}}; + [MemTotal, MemFree, Buffers, Cached] = + [dict:fetch(Key, Dict) || + Key <- ['MemTotal', 'MemFree', 'Buffers', 'Cached']], + MemUsed = MemTotal - MemFree - Buffers - Cached, + State#state{total_memory = MemTotal, allocated_memory = MemUsed}. -handle_cast(_Request, State) -> - {noreply, State}. - -handle_info(_Info, State) -> - {noreply, State}. - -terminate(_Reason, _State) -> - ok. - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. +get_memory_data(State) -> + {State#state.total_memory, State#state.allocated_memory, undefined}. %%---------------------------------------------------------------------------- @@ -152,5 +92,10 @@ read_proc_file(IoDevice, Acc) -> %% A line looks like "FooBar: 123456 kB" parse_line(Line) -> - [Name, Value | _] = string:tokens(Line, ": "), - {list_to_atom(Name), list_to_integer(Value)}. + [Name, RHS | _Rest] = string:tokens(Line, ":"), + [Value | UnitsRest] = string:tokens(RHS, " "), + Value1 = case UnitsRest of + [] -> list_to_integer(Value); %% no units + ["kB"] -> list_to_integer(Value) * 1024 + end, + {list_to_atom(Name), Value1}. diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index abf4c7ccfa..95a274e37e 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -50,9 +50,11 @@ -export([intersperse/2, upmap/2, map_in_order/2]). -export([table_foreach/2]). -export([dirty_read_all/1, dirty_foreach_key/2, dirty_dump_log/1]). +-export([read_term_file/1, write_term_file/2]). -export([append_file/2, ensure_parent_dirs_exist/1]). -export([format_stderr/2]). -export([start_applications/1, stop_applications/1]). +-export([unfold/2, ceil/1]). -import(mnesia). -import(lists). @@ -65,6 +67,8 @@ -include_lib("kernel/include/inet.hrl"). +-type(ok_or_error() :: 'ok' | {'error', any()}). + -spec(method_record_type/1 :: (tuple()) -> atom()). -spec(polite_pause/0 :: () -> 'done'). -spec(polite_pause/1 :: (non_neg_integer()) -> 'done'). @@ -88,9 +92,9 @@ -spec(r_arg/4 :: (vhost() | r(atom()), K, amqp_table(), binary()) -> undefined | r(K) when is_subtype(K, atom())). -spec(rs/1 :: (r(atom())) -> string()). --spec(enable_cover/0 :: () -> 'ok' | {'error', any()}). +-spec(enable_cover/0 :: () -> ok_or_error()). -spec(report_cover/0 :: () -> 'ok'). --spec(enable_cover/1 :: (string()) -> 'ok' | {'error', any()}). +-spec(enable_cover/1 :: (string()) -> ok_or_error()). -spec(report_cover/1 :: (string()) -> 'ok'). -spec(throw_on_error/2 :: (atom(), thunk({error, any()} | {ok, A} | A)) -> A). @@ -100,7 +104,7 @@ -spec(with_vhost/2 :: (vhost(), thunk(A)) -> A). -spec(with_user_and_vhost/3 :: (username(), vhost(), thunk(A)) -> A). -spec(execute_mnesia_transaction/1 :: (thunk(A)) -> A). --spec(ensure_ok/2 :: ('ok' | {'error', any()}, atom()) -> 'ok'). +-spec(ensure_ok/2 :: (ok_or_error(), atom()) -> 'ok'). -spec(localnode/1 :: (atom()) -> erlang_node()). -spec(tcp_name/3 :: (atom(), ip_address(), ip_port()) -> atom()). -spec(intersperse/2 :: (A, [A]) -> [A]). @@ -110,12 +114,16 @@ -spec(dirty_read_all/1 :: (atom()) -> [any()]). -spec(dirty_foreach_key/2 :: (fun ((any()) -> any()), atom()) -> 'ok' | 'aborted'). --spec(dirty_dump_log/1 :: (string()) -> 'ok' | {'error', any()}). --spec(append_file/2 :: (string(), string()) -> 'ok' | {'error', any()}). +-spec(dirty_dump_log/1 :: (string()) -> ok_or_error()). +-spec(read_term_file/1 :: (string()) -> {'ok', [any()]} | {'error', any()}). +-spec(write_term_file/2 :: (string(), [any()]) -> ok_or_error()). +-spec(append_file/2 :: (string(), string()) -> ok_or_error()). -spec(ensure_parent_dirs_exist/1 :: (string()) -> 'ok'). -spec(format_stderr/2 :: (string(), [any()]) -> 'ok'). -spec(start_applications/1 :: ([atom()]) -> 'ok'). -spec(stop_applications/1 :: ([atom()]) -> 'ok'). +-spec(unfold/2 :: (fun ((A) -> ({'true', B, A} | 'false')), A) -> {[B], A}). +-spec(ceil/1 :: (number()) -> number()). -endif. @@ -360,7 +368,9 @@ dirty_foreach_key1(F, TableName, K) -> end. dirty_dump_log(FileName) -> - {ok, LH} = disk_log:open([{name, dirty_dump_log}, {mode, read_only}, {file, FileName}]), + {ok, LH} = disk_log:open([{name, dirty_dump_log}, + {mode, read_only}, + {file, FileName}]), dirty_dump_log1(LH, disk_log:chunk(LH, start)), disk_log:close(LH). @@ -374,6 +384,12 @@ dirty_dump_log1(LH, {K, Terms, BadBytes}) -> dirty_dump_log1(LH, disk_log:chunk(LH, K)). +read_term_file(File) -> file:consult(File). + +write_term_file(File, Terms) -> + file:write_file(File, list_to_binary([io_lib:format("~w.~n", [Term]) || + Term <- Terms])). + append_file(File, Suffix) -> case file:read_file_info(File) of {ok, FInfo} -> append_file(File, FInfo#file_info.size, Suffix); @@ -444,3 +460,18 @@ stop_applications(Apps) -> cannot_stop_application, Apps). +unfold(Fun, Init) -> + unfold(Fun, [], Init). + +unfold(Fun, Acc, Init) -> + case Fun(Init) of + {true, E, I} -> unfold(Fun, [E|Acc], I); + false -> {Acc, Init} + end. + +ceil(N) -> + T = trunc(N), + case N - T of + 0 -> N; + _ -> 1 + T + end. diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 575ecb0adc..37e20335bb 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -149,6 +149,11 @@ table_definitions() -> table_names() -> [Tab || {Tab, _} <- table_definitions()]. +replicated_table_names() -> + [Tab || {Tab, Attrs} <- table_definitions(), + not lists:member({local_content, true}, Attrs) + ]. + dir() -> mnesia:system_info(directory). ensure_mnesia_dir() -> @@ -192,28 +197,16 @@ cluster_nodes_config_filename() -> create_cluster_nodes_config(ClusterNodes) -> FileName = cluster_nodes_config_filename(), - Handle = case file:open(FileName, [write]) of - {ok, Device} -> Device; - {error, Reason} -> - throw({error, {cannot_create_cluster_nodes_config, - FileName, Reason}}) - end, - try - ok = io:write(Handle, ClusterNodes), - ok = io:put_chars(Handle, [$.]) - after - case file:close(Handle) of - ok -> ok; - {error, Reason1} -> - throw({error, {cannot_close_cluster_nodes_config, - FileName, Reason1}}) - end - end, - ok. + case rabbit_misc:write_term_file(FileName, [ClusterNodes]) of + ok -> ok; + {error, Reason} -> + throw({error, {cannot_create_cluster_nodes_config, + FileName, Reason}}) + end. read_cluster_nodes_config() -> FileName = cluster_nodes_config_filename(), - case file:consult(FileName) of + case rabbit_misc:read_term_file(FileName) of {ok, [ClusterNodes]} -> ClusterNodes; {error, enoent} -> case application:get_env(cluster_config) of @@ -250,12 +243,10 @@ delete_cluster_nodes_config() -> %% standalone disk node, or disk or ram node connected to the %% specified cluster nodes. init_db(ClusterNodes) -> - WasDiskNode = mnesia:system_info(use_dir), - IsDiskNode = ClusterNodes == [] orelse - lists:member(node(), ClusterNodes), case mnesia:change_config(extra_db_nodes, ClusterNodes -- [node()]) of {ok, []} -> - if WasDiskNode and IsDiskNode -> + case mnesia:system_info(use_dir) of + true -> case check_schema_integrity() of ok -> ok; @@ -270,22 +261,18 @@ init_db(ClusterNodes) -> ok = move_db(), ok = create_schema() end; - WasDiskNode -> - throw({error, {cannot_convert_disk_node_to_ram_node, - ClusterNodes}}); - IsDiskNode -> - ok = create_schema(); - true -> - throw({error, {unable_to_contact_cluster_nodes, - ClusterNodes}}) + false -> + ok = create_schema() end; {ok, [_|_]} -> - ok = wait_for_tables(), - ok = create_local_table_copies( - case IsDiskNode of - true -> disc; - false -> ram - end); + IsDiskNode = ClusterNodes == [] orelse + lists:member(node(), ClusterNodes), + ok = wait_for_replicated_tables(), + ok = create_local_table_copy(schema, disc_copies), + ok = create_local_table_copies(case IsDiskNode of + true -> disc; + false -> ram + end); {error, Reason} -> %% one reason we may end up here is if we try to join %% nodes together that are currently running standalone or @@ -336,40 +323,36 @@ create_tables() -> table_definitions()), ok. +table_has_copy_type(TabDef, DiscType) -> + lists:member(node(), proplists:get_value(DiscType, TabDef, [])). + create_local_table_copies(Type) -> - ok = if Type /= ram -> create_local_table_copy(schema, disc_copies); - true -> ok - end, lists:foreach( fun({Tab, TabDef}) -> - HasDiscCopies = - lists:keymember(disc_copies, 1, TabDef), - HasDiscOnlyCopies = - lists:keymember(disc_only_copies, 1, TabDef), + HasDiscCopies = table_has_copy_type(TabDef, disc_copies), + HasDiscOnlyCopies = table_has_copy_type(TabDef, disc_only_copies), + LocalTab = proplists:get_bool(local_content, TabDef), StorageType = - case Type of - disc -> + if + Type =:= disc orelse LocalTab -> if - HasDiscCopies -> disc_copies; + HasDiscCopies -> disc_copies; HasDiscOnlyCopies -> disc_only_copies; - true -> ram_copies + true -> ram_copies end; %% unused code - commented out to keep dialyzer happy -%% disc_only -> +%% Type =:= disc_only -> %% if %% HasDiscCopies or HasDiscOnlyCopies -> %% disc_only_copies; %% true -> ram_copies %% end; - ram -> + Type =:= ram -> ram_copies end, ok = create_local_table_copy(Tab, StorageType) end, table_definitions()), - ok = if Type == ram -> create_local_table_copy(schema, ram_copies); - true -> ok - end, ok. create_local_table_copy(Tab, Type) -> @@ -384,10 +367,14 @@ create_local_table_copy(Tab, Type) -> end, ok. -wait_for_tables() -> +wait_for_replicated_tables() -> wait_for_tables(replicated_table_names()). + +wait_for_tables() -> wait_for_tables(table_names()). + +wait_for_tables(TableNames) -> case check_schema_integrity() of ok -> - case mnesia:wait_for_tables(table_names(), 30000) of + case mnesia:wait_for_tables(TableNames, 30000) of ok -> ok; {timeout, BadTabs} -> throw({error, {timeout_waiting_for_tables, BadTabs}}); diff --git a/src/rabbit_multi.erl b/src/rabbit_multi.erl index d91975359a..b1cc4d028f 100644 --- a/src/rabbit_multi.erl +++ b/src/rabbit_multi.erl @@ -114,12 +114,13 @@ action(status, [], RpcTimeout) -> io:format("Status of all running nodes...~n", []), call_all_nodes( fun({Node, Pid}) -> - Status = rpc:call(Node, rabbit, status, [], RpcTimeout), + RabbitRunning = + case is_rabbit_running(Node, RpcTimeout) of + false -> not_running; + true -> running + end, io:format("Node '~p' with Pid ~p: ~p~n", - [Node, Pid, case parse_status(Status) of - false -> not_running; - true -> running - end]) + [Node, Pid, RabbitRunning]) end); action(stop_all, [], RpcTimeout) -> @@ -197,7 +198,7 @@ start_node(NodeName, NodePort, RpcTimeout) -> wait_for_rabbit_to_start(_ , RpcTimeout, _) when RpcTimeout < 0 -> false; wait_for_rabbit_to_start(Node, RpcTimeout, Port) -> - case parse_status(rpc:call(Node, rabbit, status, [])) of + case is_rabbit_running(Node, RpcTimeout) of true -> true; false -> receive {'EXIT', Port, PosixCode} -> @@ -211,22 +212,20 @@ wait_for_rabbit_to_start(Node, RpcTimeout, Port) -> run_cmd(FullPath) -> erlang:open_port({spawn, FullPath}, [nouse_stdio]). -parse_status({badrpc, _}) -> - false; - -parse_status(Status) -> - case lists:keysearch(running_applications, 1, Status) of - {value, {running_applications, Apps}} -> - lists:keymember(rabbit, 1, Apps); - _ -> - false +is_rabbit_running(Node, RpcTimeout) -> + case rpc:call(Node, rabbit, status, [], RpcTimeout) of + {badrpc, _} -> false; + Status -> case proplists:get_value(running_applications, Status) of + undefined -> false; + Apps -> lists:keymember(rabbit, 1, Apps) + end end. with_os(Handlers) -> {OsFamily, _} = os:type(), - case lists:keysearch(OsFamily, 1, Handlers) of - {value, {_, Handler}} -> Handler(); - false -> throw({unsupported_os, OsFamily}) + case proplists:get_value(OsFamily, Handlers) of + undefined -> throw({unsupported_os, OsFamily}); + Handler -> Handler() end. script_filename() -> diff --git a/src/rabbit_net.erl b/src/rabbit_net.erl new file mode 100644 index 0000000000..a5ccc8e9ae --- /dev/null +++ b/src/rabbit_net.erl @@ -0,0 +1,132 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2009 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_net). +-include("rabbit.hrl"). +-include_lib("kernel/include/inet.hrl"). + +-export([async_recv/3, close/1, controlling_process/2, + getstat/2, peername/1, port_command/2, + send/2, sockname/1]). +%%--------------------------------------------------------------------------- + +-ifdef(use_specs). + +-type(stat_option() :: + 'recv_cnt' | 'recv_max' | 'recv_avg' | 'recv_oct' | 'recv_dvi' | + 'send_cnt' | 'send_max' | 'send_avg' | 'send_oct' | 'send_pend'). +-type(error() :: {'error', any()}). + +-spec(async_recv/3 :: (socket(), integer(), timeout()) -> {'ok', any()}). +-spec(close/1 :: (socket()) -> 'ok' | error()). +-spec(controlling_process/2 :: (socket(), pid()) -> 'ok' | error()). +-spec(port_command/2 :: (socket(), iolist()) -> 'true'). +-spec(send/2 :: (socket(), binary() | iolist()) -> 'ok' | error()). +-spec(peername/1 :: (socket()) -> + {'ok', {ip_address(), non_neg_integer()}} | error()). +-spec(sockname/1 :: (socket()) -> + {'ok', {ip_address(), non_neg_integer()}} | error()). +-spec(getstat/2 :: (socket(), [stat_option()]) -> + {'ok', [{stat_option(), integer()}]} | error()). + +-endif. + +%%--------------------------------------------------------------------------- + + +async_recv(Sock, Length, Timeout) when is_record(Sock, ssl_socket) -> + Pid = self(), + Ref = make_ref(), + + spawn(fun() -> Pid ! {inet_async, Sock, Ref, + ssl:recv(Sock#ssl_socket.ssl, Length, Timeout)} + end), + + {ok, Ref}; + +async_recv(Sock, Length, infinity) when is_port(Sock) -> + prim_inet:async_recv(Sock, Length, -1); + +async_recv(Sock, Length, Timeout) when is_port(Sock) -> + prim_inet:async_recv(Sock, Length, Timeout). + +close(Sock) when is_record(Sock, ssl_socket) -> + ssl:close(Sock#ssl_socket.ssl); + +close(Sock) when is_port(Sock) -> + gen_tcp:close(Sock). + + +controlling_process(Sock, Pid) when is_record(Sock, ssl_socket) -> + ssl:controlling_process(Sock#ssl_socket.ssl, Pid); + +controlling_process(Sock, Pid) when is_port(Sock) -> + gen_tcp:controlling_process(Sock, Pid). + + +getstat(Sock, Stats) when is_record(Sock, ssl_socket) -> + inet:getstat(Sock#ssl_socket.tcp, Stats); + +getstat(Sock, Stats) when is_port(Sock) -> + inet:getstat(Sock, Stats). + + +peername(Sock) when is_record(Sock, ssl_socket) -> + ssl:peername(Sock#ssl_socket.ssl); + +peername(Sock) when is_port(Sock) -> + inet:peername(Sock). + + +port_command(Sock, Data) when is_record(Sock, ssl_socket) -> + case ssl:send(Sock#ssl_socket.ssl, Data) of + ok -> + self() ! {inet_reply, Sock, ok}, + true; + {error, Reason} -> + erlang:error(Reason) + end; + +port_command(Sock, Data) when is_port(Sock) -> + erlang:port_command(Sock, Data). + +send(Sock, Data) when is_record(Sock, ssl_socket) -> + ssl:send(Sock#ssl_socket.ssl, Data); + +send(Sock, Data) when is_port(Sock) -> + gen_tcp:send(Sock, Data). + + +sockname(Sock) when is_record(Sock, ssl_socket) -> + ssl:sockname(Sock#ssl_socket.ssl); + +sockname(Sock) when is_port(Sock) -> + inet:sockname(Sock). diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index 2dbd5a5af2..eed21a01cd 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -31,18 +31,28 @@ -module(rabbit_networking). --export([start/0, start_tcp_listener/2, stop_tcp_listener/2, - on_node_down/1, active_listeners/0, node_listeners/1, - connections/0, connection_info/1, connection_info/2, - connection_info_all/0, connection_info_all/1]). +-export([start/0, start_tcp_listener/2, start_ssl_listener/3, + stop_tcp_listener/2, on_node_down/1, active_listeners/0, + node_listeners/1, connections/0, connection_info/1, + connection_info/2, connection_info_all/0, + connection_info_all/1]). %%used by TCP-based transports, e.g. STOMP adapter -export([check_tcp_listener_address/3]). --export([tcp_listener_started/2, tcp_listener_stopped/2, start_client/1]). +-export([tcp_listener_started/2, ssl_connection_upgrade/2, + tcp_listener_stopped/2, start_client/1]). -include("rabbit.hrl"). -include_lib("kernel/include/inet.hrl"). +-define(RABBIT_TCP_OPTS, [ + binary, + {packet, raw}, % no packaging + {reuseaddr, true}, % allow rebind without waiting + %% {nodelay, true}, % TCP_NODELAY - disable Nagle's alg. + %% {delay_send, true}, + {exit_on_close, false} + ]). %%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -52,6 +62,7 @@ -spec(start/0 :: () -> 'ok'). -spec(start_tcp_listener/2 :: (host(), ip_port()) -> 'ok'). +-spec(start_ssl_listener/3 :: (host(), ip_port(), [info()]) -> 'ok'). -spec(stop_tcp_listener/2 :: (host(), ip_port()) -> 'ok'). -spec(active_listeners/0 :: () -> [listener()]). -spec(node_listeners/1 :: (erlang_node()) -> [listener()]). @@ -96,21 +107,24 @@ check_tcp_listener_address(NamePrefix, Host, Port) -> {IPAddress, Name}. start_tcp_listener(Host, Port) -> - {IPAddress, Name} = check_tcp_listener_address(rabbit_tcp_listener_sup, Host, Port), + start_listener(Host, Port, "TCP Listener", + {?MODULE, start_client, []}). + +start_ssl_listener(Host, Port, SslOpts) -> + start_listener(Host, Port, "SSL Listener", + {?MODULE, ssl_connection_upgrade, [SslOpts]}). + +start_listener(Host, Port, Label, OnConnect) -> + {IPAddress, Name} = + check_tcp_listener_address(rabbit_tcp_listener_sup, Host, Port), {ok,_} = supervisor:start_child( rabbit_sup, {Name, {tcp_listener_sup, start_link, - [IPAddress, Port, - [binary, - {packet, raw}, % no packaging - {reuseaddr, true}, % allow rebind without waiting - %% {nodelay, true}, % TCP_NODELAY - disable Nagle's alg. - %% {delay_send, true}, - {exit_on_close, false}], + [IPAddress, Port, ?RABBIT_TCP_OPTS , {?MODULE, tcp_listener_started, []}, {?MODULE, tcp_listener_stopped, []}, - {?MODULE, start_client, []}]}, + OnConnect, Label]}, transient, infinity, supervisor, [tcp_listener_sup]}), ok. @@ -148,10 +162,27 @@ on_node_down(Node) -> start_client(Sock) -> {ok, Child} = supervisor:start_child(rabbit_tcp_client_sup, []), - ok = gen_tcp:controlling_process(Sock, Child), + ok = rabbit_net:controlling_process(Sock, Child), Child ! {go, Sock}, Child. +ssl_connection_upgrade(SslOpts, Sock) -> + {ok, {PeerAddress, PeerPort}} = rabbit_net:peername(Sock), + PeerIp = inet_parse:ntoa(PeerAddress), + + case ssl:ssl_accept(Sock, SslOpts) of + {ok, SslSock} -> + rabbit_log:info("upgraded TCP connection from ~s:~p to SSL~n", + [PeerIp, PeerPort]), + RabbitSslSock = #ssl_socket{tcp = Sock, ssl = SslSock}, + start_client(RabbitSslSock); + {error, Reason} -> + gen_tcp:close(Sock), + rabbit_log:error("failed to upgrade TCP connection from ~s:~p " + "to SSL: ~n~p~n", [PeerIp, PeerPort, Reason]), + {error, Reason} + end. + connections() -> [Pid || {_, Pid, _, _} <- supervisor:which_children( rabbit_tcp_client_sup)]. diff --git a/src/rabbit_plugin_activator.erl b/src/rabbit_plugin_activator.erl index 71278bfb2a..0206f73e9f 100644 --- a/src/rabbit_plugin_activator.erl +++ b/src/rabbit_plugin_activator.erl @@ -68,7 +68,7 @@ start() -> AppList end, AppVersions = [determine_version(App) || App <- AllApps], - {value, {rabbit, RabbitVersion}} = lists:keysearch(rabbit, 1, AppVersions), + {rabbit, RabbitVersion} = proplists:lookup(rabbit, AppVersions), %% Build the overall release descriptor RDesc = {release, diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 426b99eba1..690e6f0e38 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -200,7 +200,7 @@ inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F). peername(Sock) -> try - {Address, Port} = inet_op(fun () -> inet:peername(Sock) end), + {Address, Port} = inet_op(fun () -> rabbit_net:peername(Sock) end), AddressS = inet_parse:ntoa(Address), {AddressS, Port} catch @@ -286,7 +286,7 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) -> %% since this termination is initiated by our parent it is %% probably more important to exit quickly. exit(Reason); - {'EXIT', _Pid, E = {writer, send_failed, _Error}} -> + {channel_exit, _Chan, E = {writer, send_failed, _Error}} -> throw(E); {channel_exit, Channel, Reason} -> mainloop(Parent, Deb, handle_channel_exit(Channel, Reason, State)); @@ -323,8 +323,8 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) -> end. switch_callback(OldState, NewCallback, Length) -> - Ref = inet_op(fun () -> prim_inet:async_recv( - OldState#v1.sock, Length, -1) end), + Ref = inet_op(fun () -> rabbit_net:async_recv( + OldState#v1.sock, Length, infinity) end), OldState#v1{callback = NewCallback, recv_ref = Ref}. @@ -539,7 +539,7 @@ handle_input(handshake, <<"AMQP",1,1,ProtocolMajor,ProtocolMinor>>, end; handle_input(handshake, Other, #v1{sock = Sock}) -> - ok = inet_op(fun () -> gen_tcp:send( + ok = inet_op(fun () -> rabbit_net:send( Sock, <<"AMQP",1,1, ?PROTOCOL_VERSION_MAJOR, ?PROTOCOL_VERSION_MINOR>>) end), @@ -675,23 +675,23 @@ infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. i(pid, #v1{}) -> self(); i(address, #v1{sock = Sock}) -> - {ok, {A, _}} = inet:sockname(Sock), + {ok, {A, _}} = rabbit_net:sockname(Sock), A; i(port, #v1{sock = Sock}) -> - {ok, {_, P}} = inet:sockname(Sock), + {ok, {_, P}} = rabbit_net:sockname(Sock), P; i(peer_address, #v1{sock = Sock}) -> - {ok, {A, _}} = inet:peername(Sock), + {ok, {A, _}} = rabbit_net:peername(Sock), A; i(peer_port, #v1{sock = Sock}) -> - {ok, {_, P}} = inet:peername(Sock), + {ok, {_, P}} = rabbit_net:peername(Sock), P; i(SockStat, #v1{sock = Sock}) when SockStat =:= recv_oct; SockStat =:= recv_cnt; SockStat =:= send_oct; SockStat =:= send_cnt; SockStat =:= send_pend -> - case inet:getstat(Sock, [SockStat]) of + case rabbit_net:getstat(Sock, [SockStat]) of {ok, [{SockStat, StatVal}]} -> StatVal; {error, einval} -> undefined; {error, Error} -> throw({cannot_get_socket_stats, Error}) @@ -703,7 +703,7 @@ i(channels, #v1{}) -> i(user, #v1{connection = #connection{user = #user{username = Username}}}) -> Username; i(user, #v1{connection = #connection{user = none}}) -> - none; + ''; i(vhost, #v1{connection = #connection{vhost = VHost}}) -> VHost; i(timeout, #v1{connection = #connection{timeout_sec = Timeout}}) -> diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index e5100ccd16..b4cd30bc92 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -49,6 +49,7 @@ test_content_prop_roundtrip(Datum, Binary) -> all_tests() -> passed = test_priority_queue(), + passed = test_unfold(), passed = test_parsing(), passed = test_topic_matching(), passed = test_log_management(), @@ -75,7 +76,8 @@ test_priority_queue() -> %% 1-element priority Q Q1 = priority_queue:in(foo, 1, priority_queue:new()), - {true, false, 1, [{1, foo}], [foo]} = test_priority_queue(Q1), + {true, false, 1, [{1, foo}], [foo]} = + test_priority_queue(Q1), %% 2-element same-priority Q Q2 = priority_queue:in(bar, 1, Q1), @@ -91,6 +93,71 @@ test_priority_queue() -> Q4 = priority_queue:in(foo, -1, priority_queue:new()), {true, false, 1, [{-1, foo}], [foo]} = test_priority_queue(Q4), + %% merge 2 * 1-element no-priority Qs + Q5 = priority_queue:join(priority_queue:in(foo, Q), + priority_queue:in(bar, Q)), + {true, false, 2, [{0, foo}, {0, bar}], [foo, bar]} = + test_priority_queue(Q5), + + %% merge 1-element no-priority Q with 1-element priority Q + Q6 = priority_queue:join(priority_queue:in(foo, Q), + priority_queue:in(bar, 1, Q)), + {true, false, 2, [{1, bar}, {0, foo}], [bar, foo]} = + test_priority_queue(Q6), + + %% merge 1-element priority Q with 1-element no-priority Q + Q7 = priority_queue:join(priority_queue:in(foo, 1, Q), + priority_queue:in(bar, Q)), + {true, false, 2, [{1, foo}, {0, bar}], [foo, bar]} = + test_priority_queue(Q7), + + %% merge 2 * 1-element same-priority Qs + Q8 = priority_queue:join(priority_queue:in(foo, 1, Q), + priority_queue:in(bar, 1, Q)), + {true, false, 2, [{1, foo}, {1, bar}], [foo, bar]} = + test_priority_queue(Q8), + + %% merge 2 * 1-element different-priority Qs + Q9 = priority_queue:join(priority_queue:in(foo, 1, Q), + priority_queue:in(bar, 2, Q)), + {true, false, 2, [{2, bar}, {1, foo}], [bar, foo]} = + test_priority_queue(Q9), + + %% merge 2 * 1-element different-priority Qs (other way around) + Q10 = priority_queue:join(priority_queue:in(bar, 2, Q), + priority_queue:in(foo, 1, Q)), + {true, false, 2, [{2, bar}, {1, foo}], [bar, foo]} = + test_priority_queue(Q10), + + %% merge 2 * 2-element multi-different-priority Qs + Q11 = priority_queue:join(Q6, Q5), + {true, false, 4, [{1, bar}, {0, foo}, {0, foo}, {0, bar}], + [bar, foo, foo, bar]} = test_priority_queue(Q11), + + %% and the other way around + Q12 = priority_queue:join(Q5, Q6), + {true, false, 4, [{1, bar}, {0, foo}, {0, bar}, {0, foo}], + [bar, foo, bar, foo]} = test_priority_queue(Q12), + + %% merge with negative priorities + Q13 = priority_queue:join(Q4, Q5), + {true, false, 3, [{0, foo}, {0, bar}, {-1, foo}], [foo, bar, foo]} = + test_priority_queue(Q13), + + %% and the other way around + Q14 = priority_queue:join(Q5, Q4), + {true, false, 3, [{0, foo}, {0, bar}, {-1, foo}], [foo, bar, foo]} = + test_priority_queue(Q14), + + %% joins with empty queues: + Q1 = priority_queue:join(Q, Q1), + Q1 = priority_queue:join(Q1, Q), + + %% insert with priority into non-empty zero-priority queue + Q15 = priority_queue:in(baz, 1, Q5), + {true, false, 3, [{1, baz}, {0, foo}, {0, bar}], [baz, foo, bar]} = + test_priority_queue(Q15), + passed. priority_queue_in_all(Q, L) -> @@ -116,6 +183,14 @@ test_simple_n_element_queue(N) -> {true, false, N, ToListRes, Items} = test_priority_queue(Q), passed. +test_unfold() -> + {[], test} = rabbit_misc:unfold(fun (_V) -> false end, test), + List = lists:seq(2,20,2), + {List, 0} = rabbit_misc:unfold(fun (0) -> false; + (N) -> {true, N*2, N-1} + end, 10), + passed. + test_parsing() -> passed = test_content_properties(), passed. @@ -408,19 +483,17 @@ test_cluster_management() -> end, ClusteringSequence), - %% attempt to convert a disk node into a ram node + %% convert a disk node into a ram node ok = control_action(reset, []), ok = control_action(start_app, []), ok = control_action(stop_app, []), - {error, {cannot_convert_disk_node_to_ram_node, _}} = - control_action(cluster, ["invalid1@invalid", - "invalid2@invalid"]), + ok = control_action(cluster, ["invalid1@invalid", + "invalid2@invalid"]), - %% attempt to join a non-existing cluster as a ram node + %% join a non-existing cluster as a ram node ok = control_action(reset, []), - {error, {unable_to_contact_cluster_nodes, _}} = - control_action(cluster, ["invalid1@invalid", - "invalid2@invalid"]), + ok = control_action(cluster, ["invalid1@invalid", + "invalid2@invalid"]), SecondaryNode = rabbit_misc:localnode(hare), case net_adm:ping(SecondaryNode) of @@ -436,11 +509,12 @@ test_cluster_management2(SecondaryNode) -> NodeS = atom_to_list(node()), SecondaryNodeS = atom_to_list(SecondaryNode), - %% attempt to convert a disk node into a ram node + %% make a disk node ok = control_action(reset, []), ok = control_action(cluster, [NodeS]), - {error, {unable_to_join_cluster, _, _}} = - control_action(cluster, [SecondaryNodeS]), + %% make a ram node + ok = control_action(reset, []), + ok = control_action(cluster, [SecondaryNodeS]), %% join cluster as a ram node ok = control_action(reset, []), @@ -453,21 +527,21 @@ test_cluster_management2(SecondaryNode) -> ok = control_action(start_app, []), ok = control_action(stop_app, []), - %% attempt to join non-existing cluster as a ram node - {error, _} = control_action(cluster, ["invalid1@invalid", - "invalid2@invalid"]), - + %% join non-existing cluster as a ram node + ok = control_action(cluster, ["invalid1@invalid", + "invalid2@invalid"]), %% turn ram node into disk node + ok = control_action(reset, []), ok = control_action(cluster, [SecondaryNodeS, NodeS]), ok = control_action(start_app, []), ok = control_action(stop_app, []), - %% attempt to convert a disk node into a ram node - {error, {cannot_convert_disk_node_to_ram_node, _}} = - control_action(cluster, ["invalid1@invalid", - "invalid2@invalid"]), + %% convert a disk node into a ram node + ok = control_action(cluster, ["invalid1@invalid", + "invalid2@invalid"]), %% turn a disk node into a ram node + ok = control_action(reset, []), ok = control_action(cluster, [SecondaryNodeS]), ok = control_action(start_app, []), ok = control_action(stop_app, []), diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl index e338ddfe9d..1679ce7c15 100644 --- a/src/rabbit_writer.erl +++ b/src/rabbit_writer.erl @@ -169,7 +169,7 @@ assemble_frames(Channel, MethodRecord, Content, FrameMax) -> tcp_send(Sock, Data) -> rabbit_misc:throw_on_error(inet_error, - fun () -> gen_tcp:send(Sock, Data) end). + fun () -> rabbit_net:send(Sock, Data) end). internal_send_command(Sock, Channel, MethodRecord) -> ok = tcp_send(Sock, assemble_frames(Channel, MethodRecord)). @@ -206,6 +206,6 @@ internal_send_command_async(Sock, Channel, MethodRecord, Content, FrameMax) -> ok. port_cmd(Sock, Data) -> - try erlang:port_command(Sock, Data) + try rabbit_net:port_command(Sock, Data) catch error:Error -> exit({writer, send_failed, Error}) end. diff --git a/src/tcp_listener.erl b/src/tcp_listener.erl index 92a47cf127..4a2e149bb8 100644 --- a/src/tcp_listener.erl +++ b/src/tcp_listener.erl @@ -33,28 +33,28 @@ -behaviour(gen_server). --export([start_link/7]). +-export([start_link/8]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --record(state, {sock, on_startup, on_shutdown}). +-record(state, {sock, on_startup, on_shutdown, label}). %%-------------------------------------------------------------------- start_link(IPAddress, Port, SocketOpts, ConcurrentAcceptorCount, AcceptorSup, - OnStartup, OnShutdown) -> + OnStartup, OnShutdown, Label) -> gen_server:start_link( ?MODULE, {IPAddress, Port, SocketOpts, ConcurrentAcceptorCount, AcceptorSup, - OnStartup, OnShutdown}, []). + OnStartup, OnShutdown, Label}, []). %%-------------------------------------------------------------------- init({IPAddress, Port, SocketOpts, ConcurrentAcceptorCount, AcceptorSup, - {M,F,A} = OnStartup, OnShutdown}) -> + {M,F,A} = OnStartup, OnShutdown, Label}) -> process_flag(trap_exit, true), case gen_tcp:listen(Port, SocketOpts ++ [{ip, IPAddress}, {active, false}]) of @@ -65,15 +65,16 @@ init({IPAddress, Port, SocketOpts, end, lists:duplicate(ConcurrentAcceptorCount, dummy)), {ok, {LIPAddress, LPort}} = inet:sockname(LSock), - error_logger:info_msg("started TCP listener on ~s:~p~n", - [inet_parse:ntoa(LIPAddress), LPort]), + error_logger:info_msg("started ~s on ~s:~p~n", + [Label, inet_parse:ntoa(LIPAddress), LPort]), apply(M, F, A ++ [IPAddress, Port]), - {ok, #state{sock=LSock, - on_startup = OnStartup, on_shutdown = OnShutdown}}; + {ok, #state{sock = LSock, + on_startup = OnStartup, on_shutdown = OnShutdown, + label = Label}}; {error, Reason} -> error_logger:error_msg( - "failed to start TCP listener on ~s:~p - ~p~n", - [inet_parse:ntoa(IPAddress), Port, Reason]), + "failed to start ~s on ~s:~p - ~p~n", + [Label, inet_parse:ntoa(IPAddress), Port, Reason]), {stop, {cannot_listen, IPAddress, Port, Reason}} end. @@ -86,11 +87,11 @@ handle_cast(_Msg, State) -> handle_info(_Info, State) -> {noreply, State}. -terminate(_Reason, #state{sock=LSock, on_shutdown = {M,F,A}}) -> +terminate(_Reason, #state{sock=LSock, on_shutdown = {M,F,A}, label=Label}) -> {ok, {IPAddress, Port}} = inet:sockname(LSock), gen_tcp:close(LSock), - error_logger:info_msg("stopped TCP listener on ~s:~p~n", - [inet_parse:ntoa(IPAddress), Port]), + error_logger:info_msg("stopped ~s on ~s:~p~n", + [Label, inet_parse:ntoa(IPAddress), Port]), apply(M, F, A ++ [IPAddress, Port]). code_change(_OldVsn, State, _Extra) -> diff --git a/src/tcp_listener_sup.erl b/src/tcp_listener_sup.erl index 901a0da3b7..d6bbac080f 100644 --- a/src/tcp_listener_sup.erl +++ b/src/tcp_listener_sup.erl @@ -33,23 +33,23 @@ -behaviour(supervisor). --export([start_link/6, start_link/7]). +-export([start_link/7, start_link/8]). -export([init/1]). start_link(IPAddress, Port, SocketOpts, OnStartup, OnShutdown, - AcceptCallback) -> + AcceptCallback, Label) -> start_link(IPAddress, Port, SocketOpts, OnStartup, OnShutdown, - AcceptCallback, 1). + AcceptCallback, 1, Label). start_link(IPAddress, Port, SocketOpts, OnStartup, OnShutdown, - AcceptCallback, ConcurrentAcceptorCount) -> + AcceptCallback, ConcurrentAcceptorCount, Label) -> supervisor:start_link( ?MODULE, {IPAddress, Port, SocketOpts, OnStartup, OnShutdown, - AcceptCallback, ConcurrentAcceptorCount}). + AcceptCallback, ConcurrentAcceptorCount, Label}). init({IPAddress, Port, SocketOpts, OnStartup, OnShutdown, - AcceptCallback, ConcurrentAcceptorCount}) -> + AcceptCallback, ConcurrentAcceptorCount, Label}) -> %% This is gross. The tcp_listener needs to know about the %% tcp_acceptor_sup, and the only way I can think of accomplishing %% that without jumping through hoops is to register the @@ -62,5 +62,5 @@ init({IPAddress, Port, SocketOpts, OnStartup, OnShutdown, {tcp_listener, {tcp_listener, start_link, [IPAddress, Port, SocketOpts, ConcurrentAcceptorCount, Name, - OnStartup, OnShutdown]}, + OnStartup, OnShutdown, Label]}, transient, 100, worker, [tcp_listener]}]}}.