Merge pull request #30792 from StefanBratanov

* gh-30792:
  Polish "Add WebClient based sender for Zipkin"
  Add WebClient based sender for Zipkin

Closes gh-30792
This commit is contained in:
Moritz Halbritter 2022-06-22 10:25:55 +02:00
commit 611b029a73
8 changed files with 573 additions and 132 deletions

View File

@ -0,0 +1,151 @@
/*
* Copyright 2012-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.boot.actuate.autoconfigure.tracing.zipkin;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.zip.GZIPOutputStream;
import zipkin2.Call;
import zipkin2.CheckResult;
import zipkin2.codec.Encoding;
import zipkin2.reporter.BytesMessageEncoder;
import zipkin2.reporter.ClosedSenderException;
import zipkin2.reporter.Sender;
import org.springframework.http.HttpHeaders;
import org.springframework.util.unit.DataSize;
/**
* A Zipkin {@link Sender} that uses an HTTP client to send JSON spans. Supports automatic
* compression with gzip.
*
* @author Moritz Halbritter
* @author Stefan Bratanov
*/
abstract class HttpSender extends Sender {
private static final DataSize MESSAGE_MAX_BYTES = DataSize.ofKilobytes(512);
private volatile boolean closed;
@Override
public Encoding encoding() {
return Encoding.JSON;
}
@Override
public int messageMaxBytes() {
return (int) MESSAGE_MAX_BYTES.toBytes();
}
@Override
public int messageSizeInBytes(List<byte[]> encodedSpans) {
return encoding().listSizeInBytes(encodedSpans);
}
@Override
public int messageSizeInBytes(int encodedSizeInBytes) {
return encoding().listSizeInBytes(encodedSizeInBytes);
}
@Override
public CheckResult check() {
try {
sendSpans(List.of()).execute();
return CheckResult.OK;
}
catch (IOException | RuntimeException ex) {
return CheckResult.failed(ex);
}
}
@Override
public void close() throws IOException {
this.closed = true;
}
/**
* The returned {@link HttpPostCall} will send span(s) as a POST to a zipkin endpoint
* when executed.
* @param batchedEncodedSpans list of encoded spans as a byte array
* @return an instance of a Zipkin {@link Call} which can be executed
*/
protected abstract HttpPostCall sendSpans(byte[] batchedEncodedSpans);
@Override
public Call<Void> sendSpans(List<byte[]> encodedSpans) {
if (this.closed) {
throw new ClosedSenderException();
}
return sendSpans(BytesMessageEncoder.JSON.encode(encodedSpans));
}
abstract static class HttpPostCall extends Call.Base<Void> {
/**
* Only use gzip compression on data which is bigger than this in bytes.
*/
private static final DataSize COMPRESSION_THRESHOLD = DataSize.ofKilobytes(1);
private final byte[] body;
HttpPostCall(byte[] body) {
this.body = body;
}
protected byte[] getBody() {
if (needsCompression()) {
return compress(this.body);
}
return this.body;
}
protected byte[] getUncompressedBody() {
return this.body;
}
protected HttpHeaders getDefaultHeaders() {
HttpHeaders headers = new HttpHeaders();
headers.set("b3", "0");
headers.set("Content-Type", "application/json");
if (needsCompression()) {
headers.set("Content-Encoding", "gzip");
}
return headers;
}
private boolean needsCompression() {
return this.body.length > COMPRESSION_THRESHOLD.toBytes();
}
private byte[] compress(byte[] input) {
ByteArrayOutputStream result = new ByteArrayOutputStream();
try (GZIPOutputStream gzip = new GZIPOutputStream(result)) {
gzip.write(input);
}
catch (IOException ex) {
throw new UncheckedIOException(ex);
}
return result.toByteArray();
}
}
}

View File

@ -28,22 +28,26 @@ import zipkin2.reporter.urlconnection.URLConnectionSender;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnWebApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.boot.web.client.RestTemplateBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.web.client.RestTemplate;
import org.springframework.web.reactive.function.client.WebClient;
/**
* Configurations for Zipkin. Those are imported by {@link ZipkinAutoConfiguration}.
*
* @author Moritz Halbritter
* @author Stefan Bratanov
*/
class ZipkinConfigurations {
@Configuration(proxyBeanMethods = false)
@Import({ UrlConnectionSenderConfiguration.class, RestTemplateSenderConfiguration.class })
@Import({ UrlConnectionSenderConfiguration.class, RestTemplateSenderConfiguration.class,
WebClientSenderConfiguration.class })
static class SenderConfiguration {
}
@ -80,6 +84,21 @@ class ZipkinConfigurations {
}
@Configuration(proxyBeanMethods = false)
@ConditionalOnWebApplication(type = ConditionalOnWebApplication.Type.REACTIVE)
@EnableConfigurationProperties(ZipkinProperties.class)
static class WebClientSenderConfiguration {
@Bean
@ConditionalOnMissingBean(Sender.class)
@ConditionalOnBean(WebClient.Builder.class)
ZipkinWebClientSender webClientSender(ZipkinProperties properties, WebClient.Builder webClientBuilder) {
WebClient webClient = webClientBuilder.build();
return new ZipkinWebClientSender(properties.getEndpoint(), webClient);
}
}
@Configuration(proxyBeanMethods = false)
static class ReporterConfiguration {

View File

@ -16,129 +16,57 @@
package org.springframework.boot.actuate.autoconfigure.tracing.zipkin;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.List;
import java.util.zip.GZIPOutputStream;
import zipkin2.Call;
import zipkin2.Callback;
import zipkin2.CheckResult;
import zipkin2.codec.Encoding;
import zipkin2.reporter.BytesMessageEncoder;
import zipkin2.reporter.ClosedSenderException;
import zipkin2.reporter.Sender;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.util.unit.DataSize;
import org.springframework.web.client.RestTemplate;
/**
* A Zipkin {@link Sender} which uses {@link RestTemplate} for HTTP communication.
* Supports automatic compression with gzip.
* A {@link HttpSender} which uses {@link RestTemplate} for HTTP communication.
*
* @author Moritz Halbritter
* @author Stefan Bratanov
*/
class ZipkinRestTemplateSender extends Sender {
private static final DataSize MESSAGE_MAX_BYTES = DataSize.ofKilobytes(512);
class ZipkinRestTemplateSender extends HttpSender {
private final String endpoint;
private final RestTemplate restTemplate;
private volatile boolean closed;
ZipkinRestTemplateSender(String endpoint, RestTemplate restTemplate) {
this.endpoint = endpoint;
this.restTemplate = restTemplate;
}
@Override
public Encoding encoding() {
return Encoding.JSON;
public HttpPostCall sendSpans(byte[] batchedEncodedSpans) {
return new RestTemplateHttpPostCall(this.endpoint, batchedEncodedSpans, this.restTemplate);
}
@Override
public int messageMaxBytes() {
return (int) MESSAGE_MAX_BYTES.toBytes();
}
@Override
public int messageSizeInBytes(List<byte[]> encodedSpans) {
return encoding().listSizeInBytes(encodedSpans);
}
@Override
public int messageSizeInBytes(int encodedSizeInBytes) {
return encoding().listSizeInBytes(encodedSizeInBytes);
}
@Override
public Call<Void> sendSpans(List<byte[]> encodedSpans) {
if (this.closed) {
throw new ClosedSenderException();
}
return new HttpCall(this.endpoint, BytesMessageEncoder.JSON.encode(encodedSpans), this.restTemplate);
}
@Override
public CheckResult check() {
try {
sendSpans(List.of()).execute();
return CheckResult.OK;
}
catch (IOException | RuntimeException ex) {
return CheckResult.failed(ex);
}
}
@Override
public void close() throws IOException {
this.closed = true;
}
private static class HttpCall extends Call.Base<Void> {
/**
* Only use gzip compression on data which is bigger than this in bytes.
*/
private static final DataSize COMPRESSION_THRESHOLD = DataSize.ofKilobytes(1);
private static class RestTemplateHttpPostCall extends HttpPostCall {
private final String endpoint;
private final byte[] body;
private final RestTemplate restTemplate;
HttpCall(String endpoint, byte[] body, RestTemplate restTemplate) {
RestTemplateHttpPostCall(String endpoint, byte[] body, RestTemplate restTemplate) {
super(body);
this.endpoint = endpoint;
this.body = body;
this.restTemplate = restTemplate;
}
@Override
protected Void doExecute() throws IOException {
HttpHeaders headers = new HttpHeaders();
headers.set("b3", "0");
headers.set("Content-Type", "application/json");
byte[] body;
if (needsCompression(this.body)) {
headers.set("Content-Encoding", "gzip");
body = compress(this.body);
}
else {
body = this.body;
}
HttpEntity<byte[]> request = new HttpEntity<>(body, headers);
this.restTemplate.exchange(this.endpoint, HttpMethod.POST, request, Void.class);
return null;
public Call<Void> clone() {
return new RestTemplateHttpPostCall(this.endpoint, getUncompressedBody(), this.restTemplate);
}
private boolean needsCompression(byte[] body) {
return body.length > COMPRESSION_THRESHOLD.toBytes();
@Override
protected Void doExecute() {
HttpEntity<byte[]> request = new HttpEntity<>(getBody(), getDefaultHeaders());
this.restTemplate.exchange(this.endpoint, HttpMethod.POST, request, Void.class);
return null;
}
@Override
@ -147,24 +75,11 @@ class ZipkinRestTemplateSender extends Sender {
doExecute();
callback.onSuccess(null);
}
catch (IOException | RuntimeException ex) {
catch (Exception ex) {
callback.onError(ex);
}
}
@Override
public Call<Void> clone() {
return new HttpCall(this.endpoint, this.body, this.restTemplate);
}
private byte[] compress(byte[] input) throws IOException {
ByteArrayOutputStream result = new ByteArrayOutputStream();
try (GZIPOutputStream gzip = new GZIPOutputStream(result)) {
gzip.write(input);
}
return result.toByteArray();
}
}
}

View File

@ -0,0 +1,82 @@
/*
* Copyright 2012-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.boot.actuate.autoconfigure.tracing.zipkin;
import reactor.core.publisher.Mono;
import zipkin2.Call;
import zipkin2.Callback;
import org.springframework.http.ResponseEntity;
import org.springframework.web.reactive.function.client.WebClient;
/**
* A {@link HttpSender} which uses {@link WebClient} for HTTP communication.
*
* @author Stefan Bratanov
*/
class ZipkinWebClientSender extends HttpSender {
private final String endpoint;
private final WebClient webClient;
ZipkinWebClientSender(String endpoint, WebClient webClient) {
this.endpoint = endpoint;
this.webClient = webClient;
}
@Override
public HttpPostCall sendSpans(byte[] batchedEncodedSpans) {
return new WebClientHttpPostCall(this.endpoint, batchedEncodedSpans, this.webClient);
}
private static class WebClientHttpPostCall extends HttpPostCall {
private final String endpoint;
private final WebClient webClient;
WebClientHttpPostCall(String endpoint, byte[] body, WebClient webClient) {
super(body);
this.endpoint = endpoint;
this.webClient = webClient;
}
@Override
public Call<Void> clone() {
return new WebClientHttpPostCall(this.endpoint, getUncompressedBody(), this.webClient);
}
@Override
protected Void doExecute() {
sendRequest().block();
return null;
}
@Override
protected void doEnqueue(Callback<Void> callback) {
sendRequest().subscribe((__) -> callback.onSuccess(null), callback::onError);
}
private Mono<ResponseEntity<Void>> sendRequest() {
return this.webClient.post().uri(this.endpoint).headers((headers) -> headers.addAll(getDefaultHeaders()))
.bodyValue(getBody()).retrieve().toBodilessEntity();
}
}
}

View File

@ -24,9 +24,11 @@ import org.springframework.boot.actuate.autoconfigure.tracing.zipkin.ZipkinConfi
import org.springframework.boot.autoconfigure.AutoConfigurations;
import org.springframework.boot.test.context.FilteredClassLoader;
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
import org.springframework.boot.test.context.runner.ReactiveWebApplicationContextRunner;
import org.springframework.boot.web.client.RestTemplateBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.client.WebClient;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
@ -41,6 +43,9 @@ class ZipkinConfigurationsSenderConfigurationTests {
private final ApplicationContextRunner contextRunner = new ApplicationContextRunner()
.withConfiguration(AutoConfigurations.of(SenderConfiguration.class));
private final ReactiveWebApplicationContextRunner reactiveContextRunner = new ReactiveWebApplicationContextRunner()
.withConfiguration(AutoConfigurations.of(SenderConfiguration.class));
@Test
void shouldSupplyBeans() {
this.contextRunner.run((context) -> {
@ -51,7 +56,26 @@ class ZipkinConfigurationsSenderConfigurationTests {
}
@Test
void shouldUseRestTemplateSenderIfUrlConnectionSenderIsNotAvailable() {
void shouldUseWebClientSenderIfWebApplicationIsReactive() {
this.reactiveContextRunner.withUserConfiguration(WebClientConfiguration.class)
.withClassLoader(new FilteredClassLoader("zipkin2.reporter.urlconnection")).run((context) -> {
assertThat(context).doesNotHaveBean(URLConnectionSender.class);
assertThat(context).hasSingleBean(Sender.class);
assertThat(context).hasSingleBean(ZipkinWebClientSender.class);
});
}
@Test
void shouldNotUseWebClientSenderIfNoBuilderIsAvailable() {
this.reactiveContextRunner.run((context) -> {
assertThat(context).doesNotHaveBean(ZipkinWebClientSender.class);
assertThat(context).hasSingleBean(Sender.class);
assertThat(context).hasSingleBean(URLConnectionSender.class);
});
}
@Test
void shouldUseRestTemplateSenderIfUrlConnectionSenderIsNotAvailableAndWebAppIsNotReactive() {
this.contextRunner.withUserConfiguration(RestTemplateConfiguration.class)
.withClassLoader(new FilteredClassLoader("zipkin2.reporter.urlconnection")).run((context) -> {
assertThat(context).doesNotHaveBean(URLConnectionSender.class);
@ -78,6 +102,16 @@ class ZipkinConfigurationsSenderConfigurationTests {
}
@Configuration(proxyBeanMethods = false)
private static class WebClientConfiguration {
@Bean
WebClient.Builder webClientBuilder() {
return WebClient.builder();
}
}
@Configuration(proxyBeanMethods = false)
private static class CustomConfiguration {

View File

@ -0,0 +1,96 @@
/*
* Copyright 2012-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.boot.actuate.autoconfigure.tracing.zipkin;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import zipkin2.Callback;
import zipkin2.reporter.ClosedSenderException;
import zipkin2.reporter.Sender;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
/**
* Abstract base test class which is used for testing the different implementations of the
* {@link HttpSender}.
*
* @author Stefan Bratanov
*/
abstract class ZipkinHttpSenderTests {
protected Sender sut;
abstract Sender createSut();
@BeforeEach
void setUp() {
this.sut = createSut();
}
@Test
void sendSpansShouldThrowIfCloseWasCalled() throws IOException {
this.sut.close();
assertThatThrownBy(() -> this.sut.sendSpans(List.of())).isInstanceOf(ClosedSenderException.class);
}
protected void makeRequest(List<byte[]> encodedSpans, boolean async) throws IOException {
if (async) {
CallbackResult callbackResult = this.makeAsyncRequest(encodedSpans);
assertThat(callbackResult.success()).isTrue();
}
else {
this.makeSyncRequest(encodedSpans);
}
}
protected CallbackResult makeAsyncRequest(List<byte[]> encodedSpans) {
AtomicReference<CallbackResult> callbackResult = new AtomicReference<>();
this.sut.sendSpans(encodedSpans).enqueue(new Callback<>() {
@Override
public void onSuccess(Void value) {
callbackResult.set(new CallbackResult(true, null));
}
@Override
public void onError(Throwable t) {
callbackResult.set(new CallbackResult(false, t));
}
});
return Awaitility.await().atMost(Duration.ofSeconds(5)).until(callbackResult::get, Objects::nonNull);
}
protected void makeSyncRequest(List<byte[]> encodedSpans) throws IOException {
this.sut.sendSpans(encodedSpans).execute();
}
protected byte[] toByteArray(String input) {
return input.getBytes(StandardCharsets.UTF_8);
}
record CallbackResult(boolean success, Throwable error) {
}
}

View File

@ -17,15 +17,15 @@
package org.springframework.boot.actuate.autoconfigure.tracing.zipkin;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.List;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import zipkin2.CheckResult;
import zipkin2.reporter.ClosedSenderException;
import zipkin2.reporter.Sender;
import org.springframework.http.HttpMethod;
import org.springframework.http.HttpStatus;
@ -44,20 +44,19 @@ import static org.springframework.test.web.client.response.MockRestResponseCreat
* Tests for {@link ZipkinRestTemplateSender}.
*
* @author Moritz Halbritter
* @author Stefan Bratanov
*/
class ZipkinRestTemplateSenderTests {
class ZipkinRestTemplateSenderTests extends ZipkinHttpSenderTests {
private static final String ZIPKIN_URL = "http://localhost:9411/api/v2/spans";
private MockRestServiceServer mockServer;
private ZipkinRestTemplateSender sut;
@BeforeEach
void setUp() {
@Override
Sender createSut() {
RestTemplate restTemplate = new RestTemplate();
this.mockServer = MockRestServiceServer.createServer(restTemplate);
this.sut = new ZipkinRestTemplateSender(ZIPKIN_URL, restTemplate);
return new ZipkinRestTemplateSender(ZIPKIN_URL, restTemplate);
}
@AfterEach
@ -81,30 +80,33 @@ class ZipkinRestTemplateSenderTests {
assertThat(result.error()).hasMessageContaining("500 Internal Server Error");
}
@Test
void sendSpansShouldSendSpansToZipkin() throws IOException {
@ParameterizedTest
@ValueSource(booleans = { true, false })
void sendSpansShouldSendSpansToZipkin(boolean async) throws IOException {
this.mockServer.expect(requestTo(ZIPKIN_URL)).andExpect(method(HttpMethod.POST))
.andExpect(content().contentType("application/json")).andExpect(content().string("[span1,span2]"))
.andRespond(withStatus(HttpStatus.ACCEPTED));
this.sut.sendSpans(List.of(toByteArray("span1"), toByteArray("span2"))).execute();
this.makeRequest(List.of(toByteArray("span1"), toByteArray("span2")), async);
}
@Test
void sendSpansShouldThrowOnHttpFailure() {
@ParameterizedTest
@ValueSource(booleans = { true, false })
void sendSpansShouldHandleHttpFailures(boolean async) {
this.mockServer.expect(requestTo(ZIPKIN_URL)).andExpect(method(HttpMethod.POST))
.andRespond(withStatus(HttpStatus.INTERNAL_SERVER_ERROR));
assertThatThrownBy(() -> this.sut.sendSpans(List.of()).execute())
.hasMessageContaining("500 Internal Server Error");
if (async) {
CallbackResult callbackResult = this.makeAsyncRequest(List.of());
assertThat(callbackResult.success()).isFalse();
assertThat(callbackResult.error()).isNotNull().hasMessageContaining("500 Internal Server Error");
}
else {
assertThatThrownBy(() -> this.makeSyncRequest(List.of())).hasMessageContaining("500 Internal Server Error");
}
}
@Test
void sendSpansShouldThrowIfCloseWasCalled() throws IOException {
this.sut.close();
assertThatThrownBy(() -> this.sut.sendSpans(List.of())).isInstanceOf(ClosedSenderException.class);
}
@Test
void sendSpansShouldCompressData() throws IOException {
@ParameterizedTest
@ValueSource(booleans = { true, false })
void sendSpansShouldCompressData(boolean async) throws IOException {
String uncompressed = "a".repeat(10000);
// This is gzip compressed 10000 times 'a'
byte[] compressed = Base64.getDecoder()
@ -112,11 +114,7 @@ class ZipkinRestTemplateSenderTests {
this.mockServer.expect(requestTo(ZIPKIN_URL)).andExpect(method(HttpMethod.POST))
.andExpect(header("Content-Encoding", "gzip")).andExpect(content().contentType("application/json"))
.andExpect(content().bytes(compressed)).andRespond(withStatus(HttpStatus.ACCEPTED));
this.sut.sendSpans(List.of(toByteArray(uncompressed))).execute();
}
private byte[] toByteArray(String input) {
return input.getBytes(StandardCharsets.UTF_8);
this.makeRequest(List.of(toByteArray(uncompressed)), async);
}
}

View File

@ -0,0 +1,146 @@
/*
* Copyright 2012-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.boot.actuate.autoconfigure.tracing.zipkin;
import java.io.IOException;
import java.util.Base64;
import java.util.List;
import java.util.function.Consumer;
import okhttp3.mockwebserver.MockResponse;
import okhttp3.mockwebserver.MockWebServer;
import okhttp3.mockwebserver.RecordedRequest;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import zipkin2.CheckResult;
import zipkin2.reporter.Sender;
import org.springframework.web.reactive.function.client.WebClient;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
/**
* Tests for {@link ZipkinWebClientSender}.
*
* @author Stefan Bratanov
*/
class ZipkinWebClientSenderTests extends ZipkinHttpSenderTests {
private static MockWebServer mockBackEnd;
public static String ZIPKIN_URL;
@BeforeAll
static void beforeAll() throws IOException {
mockBackEnd = new MockWebServer();
mockBackEnd.start();
ZIPKIN_URL = "http://localhost:%s/api/v2/spans".formatted(mockBackEnd.getPort());
}
@AfterAll
static void tearDown() throws IOException {
mockBackEnd.shutdown();
}
@Override
Sender createSut() {
WebClient webClient = WebClient.builder().build();
return new ZipkinWebClientSender(ZIPKIN_URL, webClient);
}
@Test
void checkShouldSendEmptySpanList() throws InterruptedException {
mockBackEnd.enqueue(new MockResponse());
assertThat(this.sut.check()).isEqualTo(CheckResult.OK);
requestAssertions((request) -> {
assertThat(request.getMethod()).isEqualTo("POST");
assertThat(request.getBody().readUtf8()).isEqualTo("[]");
});
}
@Test
void checkShouldNotRaiseException() throws InterruptedException {
mockBackEnd.enqueue(new MockResponse().setResponseCode(500));
CheckResult result = this.sut.check();
assertThat(result.ok()).isFalse();
assertThat(result.error()).hasMessageContaining("500 Internal Server Error");
requestAssertions((request) -> assertThat(request.getMethod()).isEqualTo("POST"));
}
@ParameterizedTest
@ValueSource(booleans = { true, false })
void sendSpansShouldSendSpansToZipkin(boolean async) throws IOException, InterruptedException {
mockBackEnd.enqueue(new MockResponse());
List<byte[]> encodedSpans = List.of(toByteArray("span1"), toByteArray("span2"));
this.makeRequest(encodedSpans, async);
requestAssertions((request) -> {
assertThat(request.getMethod()).isEqualTo("POST");
assertThat(request.getHeader("Content-Type")).isEqualTo("application/json");
assertThat(request.getBody().readUtf8()).isEqualTo("[span1,span2]");
});
}
@ParameterizedTest
@ValueSource(booleans = { true, false })
void sendSpansShouldHandleHttpFailures(boolean async) throws InterruptedException {
mockBackEnd.enqueue(new MockResponse().setResponseCode(500));
if (async) {
CallbackResult callbackResult = this.makeAsyncRequest(List.of());
assertThat(callbackResult.success()).isFalse();
assertThat(callbackResult.error()).isNotNull().hasMessageContaining("500 Internal Server Error");
}
else {
assertThatThrownBy(() -> this.makeSyncRequest(List.of())).hasMessageContaining("500 Internal Server Error");
}
requestAssertions((request) -> assertThat(request.getMethod()).isEqualTo("POST"));
}
@ParameterizedTest
@ValueSource(booleans = { true, false })
void sendSpansShouldCompressData(boolean async) throws IOException, InterruptedException {
String uncompressed = "a".repeat(10000);
// This is gzip compressed 10000 times 'a'
byte[] compressed = Base64.getDecoder()
.decode("H4sIAAAAAAAA/+3BMQ0AAAwDIKFLj/k3UR8NcA8AAAAAAAAAAAADUsAZfeASJwAA");
mockBackEnd.enqueue(new MockResponse());
this.makeRequest(List.of(toByteArray(uncompressed)), async);
requestAssertions((request) -> {
assertThat(request.getMethod()).isEqualTo("POST");
assertThat(request.getHeader("Content-Type")).isEqualTo("application/json");
assertThat(request.getHeader("Content-Encoding")).isEqualTo("gzip");
assertThat(request.getBody().readByteArray()).isEqualTo(compressed);
});
}
private void requestAssertions(Consumer<RecordedRequest> assertions) throws InterruptedException {
RecordedRequest request = mockBackEnd.takeRequest();
assertThat(request).satisfies(assertions);
}
}