Fix MetricWriterMessageHandler to deal with reset
Update MetricWriterMessageHandler to deal with 'reset' messages and to log unsupported payload types. Fixes gh-3378
This commit is contained in:
parent
fa7199ddac
commit
3ef667f0d8
|
|
@ -1,5 +1,5 @@
|
||||||
/*
|
/*
|
||||||
* Copyright 2012-2013 the original author or authors.
|
* Copyright 2012-2015 the original author or authors.
|
||||||
*
|
*
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
* you may not use this file except in compliance with the License.
|
* you may not use this file except in compliance with the License.
|
||||||
|
|
@ -18,7 +18,6 @@ package org.springframework.boot.actuate.metrics.writer;
|
||||||
|
|
||||||
import org.springframework.boot.actuate.metrics.Metric;
|
import org.springframework.boot.actuate.metrics.Metric;
|
||||||
import org.springframework.messaging.MessageChannel;
|
import org.springframework.messaging.MessageChannel;
|
||||||
import org.springframework.messaging.support.MessageBuilder;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A {@link MetricWriter} that publishes the metric updates on a {@link MessageChannel}.
|
* A {@link MetricWriter} that publishes the metric updates on a {@link MessageChannel}.
|
||||||
|
|
@ -26,13 +25,10 @@ import org.springframework.messaging.support.MessageBuilder;
|
||||||
* carry an additional header "metricName" with the name of the metric in it.
|
* carry an additional header "metricName" with the name of the metric in it.
|
||||||
*
|
*
|
||||||
* @author Dave Syer
|
* @author Dave Syer
|
||||||
|
* @see MetricWriterMessageHandler
|
||||||
*/
|
*/
|
||||||
public class MessageChannelMetricWriter implements MetricWriter {
|
public class MessageChannelMetricWriter implements MetricWriter {
|
||||||
|
|
||||||
private static final String METRIC_NAME = "metricName";
|
|
||||||
|
|
||||||
private final String DELETE = "delete";
|
|
||||||
|
|
||||||
private final MessageChannel channel;
|
private final MessageChannel channel;
|
||||||
|
|
||||||
public MessageChannelMetricWriter(MessageChannel channel) {
|
public MessageChannelMetricWriter(MessageChannel channel) {
|
||||||
|
|
@ -41,20 +37,17 @@ public class MessageChannelMetricWriter implements MetricWriter {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void increment(Delta<?> delta) {
|
public void increment(Delta<?> delta) {
|
||||||
this.channel.send(MessageBuilder.withPayload(delta)
|
this.channel.send(MetricMessage.forIncrement(delta));
|
||||||
.setHeader(METRIC_NAME, delta.getName()).build());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void set(Metric<?> value) {
|
public void set(Metric<?> value) {
|
||||||
this.channel.send(MessageBuilder.withPayload(value)
|
this.channel.send(MetricMessage.forSet(value));
|
||||||
.setHeader(METRIC_NAME, value.getName()).build());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void reset(String metricName) {
|
public void reset(String metricName) {
|
||||||
this.channel.send(MessageBuilder.withPayload(this.DELETE)
|
this.channel.send(MetricMessage.forReset(metricName));
|
||||||
.setHeader(METRIC_NAME, metricName).build());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,70 @@
|
||||||
|
/*
|
||||||
|
* Copyright 2012-2015 the original author or authors.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.springframework.boot.actuate.metrics.writer;
|
||||||
|
|
||||||
|
import org.springframework.boot.actuate.metrics.Metric;
|
||||||
|
import org.springframework.messaging.Message;
|
||||||
|
import org.springframework.messaging.support.MessageBuilder;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A metric message sent via Spring Integration.
|
||||||
|
*
|
||||||
|
* @author Phillip Webb
|
||||||
|
*/
|
||||||
|
class MetricMessage {
|
||||||
|
|
||||||
|
private static final String METRIC_NAME = "metricName";
|
||||||
|
|
||||||
|
private static final String DELETE = "delete";
|
||||||
|
|
||||||
|
private final Message<?> message;
|
||||||
|
|
||||||
|
public MetricMessage(Message<?> message) {
|
||||||
|
this.message = message;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isReset() {
|
||||||
|
return DELETE.equals(getPayload());
|
||||||
|
}
|
||||||
|
|
||||||
|
public Object getPayload() {
|
||||||
|
return this.message.getPayload();
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getMetricName() {
|
||||||
|
return this.message.getHeaders().get(METRIC_NAME, String.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static Message<?> forIncrement(Delta<?> delta) {
|
||||||
|
return forPayload(delta.getName(), delta);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static Message<?> forSet(Metric<?> value) {
|
||||||
|
return forPayload(value.getName(), value);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static Message<?> forReset(String metricName) {
|
||||||
|
return forPayload(metricName, DELETE);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static Message<?> forPayload(String metricName, Object payload) {
|
||||||
|
MessageBuilder<Object> builder = MessageBuilder.withPayload(payload);
|
||||||
|
builder.setHeader(METRIC_NAME, metricName);
|
||||||
|
return builder.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
@ -1,5 +1,5 @@
|
||||||
/*
|
/*
|
||||||
* Copyright 2012-2013 the original author or authors.
|
* Copyright 2012-2015 the original author or authors.
|
||||||
*
|
*
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
* you may not use this file except in compliance with the License.
|
* you may not use this file except in compliance with the License.
|
||||||
|
|
@ -16,6 +16,8 @@
|
||||||
|
|
||||||
package org.springframework.boot.actuate.metrics.writer;
|
package org.springframework.boot.actuate.metrics.writer;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.springframework.boot.actuate.metrics.Metric;
|
import org.springframework.boot.actuate.metrics.Metric;
|
||||||
import org.springframework.messaging.Message;
|
import org.springframework.messaging.Message;
|
||||||
import org.springframework.messaging.MessageHandler;
|
import org.springframework.messaging.MessageHandler;
|
||||||
|
|
@ -26,9 +28,12 @@ import org.springframework.messaging.MessagingException;
|
||||||
* {@link MetricWriter}.
|
* {@link MetricWriter}.
|
||||||
*
|
*
|
||||||
* @author Dave Syer
|
* @author Dave Syer
|
||||||
|
* @see MessageChannelMetricWriter
|
||||||
*/
|
*/
|
||||||
public final class MetricWriterMessageHandler implements MessageHandler {
|
public final class MetricWriterMessageHandler implements MessageHandler {
|
||||||
|
|
||||||
|
private static final Log logger = LogFactory.getLog(MetricWriterMessageHandler.class);
|
||||||
|
|
||||||
private final MetricWriter observer;
|
private final MetricWriter observer;
|
||||||
|
|
||||||
public MetricWriterMessageHandler(MetricWriter observer) {
|
public MetricWriterMessageHandler(MetricWriter observer) {
|
||||||
|
|
@ -37,14 +42,28 @@ public final class MetricWriterMessageHandler implements MessageHandler {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void handleMessage(Message<?> message) throws MessagingException {
|
public void handleMessage(Message<?> message) throws MessagingException {
|
||||||
|
handleMessage(new MetricMessage(message));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void handleMessage(MetricMessage message) {
|
||||||
Object payload = message.getPayload();
|
Object payload = message.getPayload();
|
||||||
if (payload instanceof Delta) {
|
if (message.isReset()) {
|
||||||
|
this.observer.reset(message.getMetricName());
|
||||||
|
}
|
||||||
|
else if (payload instanceof Delta) {
|
||||||
Delta<?> value = (Delta<?>) payload;
|
Delta<?> value = (Delta<?>) payload;
|
||||||
this.observer.increment(value);
|
this.observer.increment(value);
|
||||||
}
|
}
|
||||||
else {
|
else if (payload instanceof Metric) {
|
||||||
Metric<?> value = (Metric<?>) payload;
|
Metric<?> value = (Metric<?>) payload;
|
||||||
this.observer.set(value);
|
this.observer.set(value);
|
||||||
}
|
}
|
||||||
|
else {
|
||||||
|
if (logger.isWarnEnabled()) {
|
||||||
|
logger.warn("Unsupported metric payload "
|
||||||
|
+ (payload == null ? "null" : payload.getClass().getName()));
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,5 +1,5 @@
|
||||||
/*
|
/*
|
||||||
* Copyright 2012-2013 the original author or authors.
|
* Copyright 2012-2015 the original author or authors.
|
||||||
*
|
*
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
* you may not use this file except in compliance with the License.
|
* you may not use this file except in compliance with the License.
|
||||||
|
|
@ -16,35 +16,73 @@
|
||||||
|
|
||||||
package org.springframework.boot.actuate.metrics.writer;
|
package org.springframework.boot.actuate.metrics.writer;
|
||||||
|
|
||||||
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
import org.mockito.Mock;
|
||||||
|
import org.mockito.MockitoAnnotations;
|
||||||
|
import org.mockito.invocation.InvocationOnMock;
|
||||||
|
import org.mockito.stubbing.Answer;
|
||||||
import org.springframework.boot.actuate.metrics.Metric;
|
import org.springframework.boot.actuate.metrics.Metric;
|
||||||
import org.springframework.messaging.Message;
|
import org.springframework.messaging.Message;
|
||||||
import org.springframework.messaging.MessageChannel;
|
import org.springframework.messaging.MessageChannel;
|
||||||
|
|
||||||
|
import static org.mockito.BDDMockito.given;
|
||||||
import static org.mockito.Matchers.any;
|
import static org.mockito.Matchers.any;
|
||||||
import static org.mockito.Mockito.mock;
|
|
||||||
import static org.mockito.Mockito.verify;
|
import static org.mockito.Mockito.verify;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
* Tests for {@link MessageChannelMetricWriter} and {@link MetricWriterMessageHandler}.
|
||||||
|
*
|
||||||
* @author Dave Syer
|
* @author Dave Syer
|
||||||
*/
|
*/
|
||||||
public class MessageChannelMetricWriterTests {
|
public class MessageChannelMetricWriterTests {
|
||||||
|
|
||||||
private final MessageChannel channel = mock(MessageChannel.class);
|
@Mock
|
||||||
|
private MessageChannel channel;
|
||||||
|
|
||||||
private final MessageChannelMetricWriter observer = new MessageChannelMetricWriter(
|
@Mock
|
||||||
this.channel);
|
private MetricWriter observer;
|
||||||
|
|
||||||
|
private MessageChannelMetricWriter writer;
|
||||||
|
|
||||||
|
private MetricWriterMessageHandler handler;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setup() {
|
||||||
|
MockitoAnnotations.initMocks(this);
|
||||||
|
given(this.channel.send(any(Message.class))).willAnswer(new Answer<Object>() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Object answer(InvocationOnMock invocation) throws Throwable {
|
||||||
|
MessageChannelMetricWriterTests.this.handler.handleMessage(invocation
|
||||||
|
.getArgumentAt(0, Message.class));
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
});
|
||||||
|
this.writer = new MessageChannelMetricWriter(this.channel);
|
||||||
|
this.handler = new MetricWriterMessageHandler(this.observer);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void messageSentOnAdd() {
|
public void messageSentOnAdd() {
|
||||||
this.observer.increment(new Delta<Integer>("foo", 1));
|
this.writer.increment(new Delta<Integer>("foo", 1));
|
||||||
verify(this.channel).send(any(Message.class));
|
verify(this.channel).send(any(Message.class));
|
||||||
|
verify(this.observer).increment(any(Delta.class));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void messageSentOnSet() {
|
public void messageSentOnSet() {
|
||||||
this.observer.set(new Metric<Double>("foo", 1d));
|
this.writer.set(new Metric<Double>("foo", 1d));
|
||||||
verify(this.channel).send(any(Message.class));
|
verify(this.channel).send(any(Message.class));
|
||||||
|
verify(this.observer).set(any(Metric.class));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void messageSentOnReset() throws Exception {
|
||||||
|
this.writer.reset("foo");
|
||||||
|
verify(this.channel).send(any(Message.class));
|
||||||
|
verify(this.observer).reset("foo");
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue