Added sync_all for queue_index, trap exits in amqqueue_process, make sure that terminate calls terminate in variable_queue which calls through into terminate in queue_process.
This commit is contained in:
parent
bd006be70b
commit
a10db26a6d
|
|
@ -98,6 +98,7 @@ start_link(Q) ->
|
||||||
|
|
||||||
init(Q = #amqqueue { name = QName }) ->
|
init(Q = #amqqueue { name = QName }) ->
|
||||||
?LOGDEBUG("Queue starting - ~p~n", [Q]),
|
?LOGDEBUG("Queue starting - ~p~n", [Q]),
|
||||||
|
process_flag(trap_exit, true),
|
||||||
ok = rabbit_memory_manager:register
|
ok = rabbit_memory_manager:register
|
||||||
(self(), false, rabbit_amqqueue, set_storage_mode, [self()]),
|
(self(), false, rabbit_amqqueue, set_storage_mode, [self()]),
|
||||||
VQS = rabbit_variable_queue:init(QName),
|
VQS = rabbit_variable_queue:init(QName),
|
||||||
|
|
@ -113,6 +114,8 @@ init(Q = #amqqueue { name = QName }) ->
|
||||||
{ok, State, hibernate,
|
{ok, State, hibernate,
|
||||||
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
|
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
|
||||||
|
|
||||||
|
terminate(shutdown, #q{variable_queue_state = VQS}) ->
|
||||||
|
_VQS = rabbit_variable_queue:terminate(VQS);
|
||||||
terminate(_Reason, State = #q{variable_queue_state = VQS}) ->
|
terminate(_Reason, State = #q{variable_queue_state = VQS}) ->
|
||||||
%% FIXME: How do we cancel active subscriptions?
|
%% FIXME: How do we cancel active subscriptions?
|
||||||
%% Ensure that any persisted tx messages are removed;
|
%% Ensure that any persisted tx messages are removed;
|
||||||
|
|
|
||||||
|
|
@ -32,7 +32,7 @@
|
||||||
-module(rabbit_queue_index).
|
-module(rabbit_queue_index).
|
||||||
|
|
||||||
-export([init/1, terminate/1, terminate_and_erase/1, write_published/4,
|
-export([init/1, terminate/1, terminate_and_erase/1, write_published/4,
|
||||||
write_delivered/2, write_acks/2, flush_journal/1,
|
write_delivered/2, write_acks/2, flush_journal/1, sync_all/1,
|
||||||
read_segment_entries/2, next_segment_boundary/1, segment_size/0,
|
read_segment_entries/2, next_segment_boundary/1, segment_size/0,
|
||||||
find_lowest_seq_id_seg_and_next_seq_id/1, start_msg_store/1]).
|
find_lowest_seq_id_seg_and_next_seq_id/1, start_msg_store/1]).
|
||||||
|
|
||||||
|
|
@ -229,6 +229,16 @@ full_flush_journal(State) ->
|
||||||
{false, State1} -> State1
|
{false, State1} -> State1
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
sync_all(State = #qistate { hc_state = HCState, seg_num_handles = SegHdls }) ->
|
||||||
|
HCState1 =
|
||||||
|
dict:fold(
|
||||||
|
fun (_Key, Hdl, HCStateN) ->
|
||||||
|
{ok, HCStateM} =
|
||||||
|
horrendously_dumb_file_handle_cache:sync(Hdl, HCStateN),
|
||||||
|
HCStateM
|
||||||
|
end, HCState, SegHdls),
|
||||||
|
State #qistate { hc_state = HCState1 }.
|
||||||
|
|
||||||
flush_journal(State = #qistate { journal_ack_count = 0 }) ->
|
flush_journal(State = #qistate { journal_ack_count = 0 }) ->
|
||||||
{false, State};
|
{false, State};
|
||||||
flush_journal(State = #qistate { journal_ack_dict = JAckDict,
|
flush_journal(State = #qistate { journal_ack_dict = JAckDict,
|
||||||
|
|
|
||||||
|
|
@ -31,10 +31,10 @@
|
||||||
|
|
||||||
-module(rabbit_variable_queue).
|
-module(rabbit_variable_queue).
|
||||||
|
|
||||||
-export([init/1, publish/2, publish_delivered/2, set_queue_ram_duration_target/2,
|
-export([init/1, terminate/1, publish/2, publish_delivered/2,
|
||||||
remeasure_egress_rate/1, fetch/1, ack/2, len/1, is_empty/1,
|
set_queue_ram_duration_target/2, remeasure_egress_rate/1, fetch/1,
|
||||||
maybe_start_prefetcher/1, purge/1, delete/1, requeue/2,
|
ack/2, len/1, is_empty/1, maybe_start_prefetcher/1, purge/1, delete/1,
|
||||||
tx_publish/2, tx_rollback/2, tx_commit/4, do_tx_commit/4]).
|
requeue/2, tx_publish/2, tx_rollback/2, tx_commit/4, do_tx_commit/4]).
|
||||||
|
|
||||||
%%----------------------------------------------------------------------------
|
%%----------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
@ -140,6 +140,9 @@ init(QueueName) ->
|
||||||
},
|
},
|
||||||
maybe_load_next_segment(State).
|
maybe_load_next_segment(State).
|
||||||
|
|
||||||
|
terminate(State = #vqstate { index_state = IndexState }) ->
|
||||||
|
State #vqstate { index_state = rabbit_queue_index:terminate(IndexState) }.
|
||||||
|
|
||||||
publish(Msg, State) ->
|
publish(Msg, State) ->
|
||||||
publish(Msg, false, false, State).
|
publish(Msg, false, false, State).
|
||||||
|
|
||||||
|
|
@ -383,9 +386,10 @@ do_tx_commit(Pubs, AckTags, From, State) ->
|
||||||
{[SeqId | SeqIdsAcc], StateN1}
|
{[SeqId | SeqIdsAcc], StateN1}
|
||||||
end, {[], State}, Pubs),
|
end, {[], State}, Pubs),
|
||||||
%% TODO need to do something here about syncing the queue index, PubSeqIds
|
%% TODO need to do something here about syncing the queue index, PubSeqIds
|
||||||
State2 = ack(AckTags, State1),
|
State2 = #vqstate { index_state = IndexState } = ack(AckTags, State1),
|
||||||
|
IndexState1 = rabbit_queue_index:sync_all(IndexState),
|
||||||
gen_server2:reply(From, ok),
|
gen_server2:reply(From, ok),
|
||||||
State2.
|
State2 #vqstate { index_state = IndexState1 }.
|
||||||
|
|
||||||
%%----------------------------------------------------------------------------
|
%%----------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue