Implement JDK HttpClient based Zipkin sender

Closes gh-39545
This commit is contained in:
Moritz Halbritter 2024-02-20 14:21:44 +01:00
parent 52648d9d70
commit 013e148526
12 changed files with 392 additions and 10 deletions

View File

@ -1,5 +1,5 @@
/*
* Copyright 2012-2022 the original author or authors.
* Copyright 2012-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -74,7 +74,7 @@ abstract class HttpSender extends BaseHttpSender<URI, byte[]> {
* @param headers headers for the POST request
* @param body list of possibly gzipped, encoded spans.
*/
abstract void postSpans(URI endpoint, HttpHeaders headers, byte[] body);
abstract void postSpans(URI endpoint, HttpHeaders headers, byte[] body) throws IOException;
HttpHeaders getDefaultHeaders() {
HttpHeaders headers = new HttpHeaders();

View File

@ -1,5 +1,5 @@
/*
* Copyright 2012-2023 the original author or authors.
* Copyright 2012-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -16,6 +16,9 @@
package org.springframework.boot.actuate.autoconfigure.tracing.zipkin;
import java.net.http.HttpClient;
import java.net.http.HttpClient.Builder;
import brave.Tag;
import brave.Tags;
import brave.handler.MutableSpan;
@ -53,7 +56,7 @@ class ZipkinConfigurations {
@Configuration(proxyBeanMethods = false)
@Import({ UrlConnectionSenderConfiguration.class, WebClientSenderConfiguration.class,
RestTemplateSenderConfiguration.class })
RestTemplateSenderConfiguration.class, HttpClientSenderConfiguration.class })
static class SenderConfiguration {
}
@ -90,6 +93,7 @@ class ZipkinConfigurations {
@Bean
@ConditionalOnMissingBean(BytesMessageSender.class)
@SuppressWarnings("removal")
ZipkinRestTemplateSender restTemplateSender(ZipkinProperties properties, Encoding encoding,
ObjectProvider<ZipkinRestTemplateBuilderCustomizer> customizers,
ObjectProvider<ZipkinConnectionDetails> connectionDetailsProvider,
@ -106,6 +110,7 @@ class ZipkinConfigurations {
restTemplateBuilder.build());
}
@SuppressWarnings("removal")
private RestTemplateBuilder applyCustomizers(RestTemplateBuilder restTemplateBuilder,
ObjectProvider<ZipkinRestTemplateBuilderCustomizer> customizers) {
Iterable<ZipkinRestTemplateBuilderCustomizer> orderedCustomizers = () -> customizers.orderedStream()
@ -126,6 +131,7 @@ class ZipkinConfigurations {
@Bean
@ConditionalOnMissingBean(BytesMessageSender.class)
@SuppressWarnings("removal")
ZipkinWebClientSender webClientSender(ZipkinProperties properties, Encoding encoding,
ObjectProvider<ZipkinWebClientBuilderCustomizer> customizers,
ObjectProvider<ZipkinConnectionDetails> connectionDetailsProvider,
@ -142,6 +148,29 @@ class ZipkinConfigurations {
}
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(HttpClient.class)
@EnableConfigurationProperties(ZipkinProperties.class)
static class HttpClientSenderConfiguration {
@Bean
@ConditionalOnMissingBean(BytesMessageSender.class)
ZipkinHttpClientSender httpClientSender(ZipkinProperties properties, Encoding encoding,
ObjectProvider<ZipkinHttpClientBuilderCustomizer> customizers,
ObjectProvider<ZipkinConnectionDetails> connectionDetailsProvider,
ObjectProvider<HttpEndpointSupplier.Factory> endpointSupplierFactoryProvider) {
ZipkinConnectionDetails connectionDetails = connectionDetailsProvider
.getIfAvailable(() -> new PropertiesZipkinConnectionDetails(properties));
HttpEndpointSupplier.Factory endpointSupplierFactory = endpointSupplierFactoryProvider
.getIfAvailable(HttpEndpointSuppliers::constantFactory);
Builder httpClientBuilder = HttpClient.newBuilder().connectTimeout(properties.getConnectTimeout());
customizers.orderedStream().forEach((customizer) -> customizer.customize(httpClientBuilder));
return new ZipkinHttpClientSender(encoding, endpointSupplierFactory, connectionDetails.getSpanEndpoint(),
httpClientBuilder.build(), properties.getReadTimeout());
}
}
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(AsyncZipkinSpanHandler.class)
static class BraveConfiguration {

View File

@ -0,0 +1,37 @@
/*
* Copyright 2012-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.boot.actuate.autoconfigure.tracing.zipkin;
import java.net.http.HttpClient;
/**
* Callback interface that can be implemented by beans wishing to customize the
* {@link HttpClient.Builder} used to send spans to Zipkin.
*
* @author Moritz Halbritter
* @since 3.3.0
*/
@FunctionalInterface
public interface ZipkinHttpClientBuilderCustomizer {
/**
* Customize the http client builder.
* @param httpClient the http client builder to customize
*/
void customize(HttpClient.Builder httpClient);
}

View File

@ -0,0 +1,71 @@
/*
* Copyright 2012-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.boot.actuate.autoconfigure.tracing.zipkin;
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpRequest.BodyPublishers;
import java.net.http.HttpRequest.Builder;
import java.net.http.HttpResponse;
import java.net.http.HttpResponse.BodyHandlers;
import java.time.Duration;
import zipkin2.reporter.Encoding;
import zipkin2.reporter.HttpEndpointSupplier.Factory;
import org.springframework.http.HttpHeaders;
/**
* A {@link HttpSender} which uses the JDK {@link HttpClient} for HTTP communication.
*
* @author Moritz Halbritter
*/
class ZipkinHttpClientSender extends HttpSender {
private final HttpClient httpClient;
private final Duration readTimeout;
ZipkinHttpClientSender(Encoding encoding, Factory endpointSupplierFactory, String endpoint, HttpClient httpClient,
Duration readTimeout) {
super(encoding, endpointSupplierFactory, endpoint);
this.httpClient = httpClient;
this.readTimeout = readTimeout;
}
@Override
void postSpans(URI endpoint, HttpHeaders headers, byte[] body) throws IOException {
Builder request = HttpRequest.newBuilder()
.POST(BodyPublishers.ofByteArray(body))
.uri(endpoint)
.timeout(this.readTimeout);
headers.forEach((key, values) -> values.forEach((value) -> request.header(key, value)));
try {
HttpResponse<Void> response = this.httpClient.send(request.build(), BodyHandlers.discarding());
if (response.statusCode() / 100 != 2) {
throw new IOException("Expected HTTP status 2xx, got %d".formatted(response.statusCode()));
}
}
catch (InterruptedException ex) {
Thread.currentThread().interrupt();
throw new IOException("Got interrupted while sending spans", ex);
}
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2012-2022 the original author or authors.
* Copyright 2012-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -24,8 +24,11 @@ import org.springframework.boot.web.client.RestTemplateBuilder;
*
* @author Marcin Grzejszczak
* @since 3.0.0
* @deprecated since 3.3.0 for removal in 3.5.0 in favor of
* {@link ZipkinHttpClientBuilderCustomizer}
*/
@FunctionalInterface
@Deprecated(since = "3.3.0", forRemoval = true)
public interface ZipkinRestTemplateBuilderCustomizer {
/**

View File

@ -1,5 +1,5 @@
/*
* Copyright 2012-2022 the original author or authors.
* Copyright 2012-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -32,6 +32,7 @@ import org.springframework.web.client.RestTemplate;
* @author Moritz Halbritter
* @author Stefan Bratanov
*/
@Deprecated(since = "3.3.0", forRemoval = true)
class ZipkinRestTemplateSender extends HttpSender {
private final RestTemplate restTemplate;

View File

@ -1,5 +1,5 @@
/*
* Copyright 2012-2022 the original author or authors.
* Copyright 2012-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -25,8 +25,11 @@ import org.springframework.web.reactive.function.client.WebClient.Builder;
*
* @author Marcin Grzejszczak
* @since 3.0.0
* @deprecated since 3.3.0 for removal in 3.5.0 in favor of
* {@link ZipkinHttpClientBuilderCustomizer}
*/
@FunctionalInterface
@Deprecated(since = "3.3.0", forRemoval = true)
public interface ZipkinWebClientBuilderCustomizer {
/**

View File

@ -1,5 +1,5 @@
/*
* Copyright 2012-2023 the original author or authors.
* Copyright 2012-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -31,6 +31,7 @@ import org.springframework.web.reactive.function.client.WebClient;
* @author Stefan Bratanov
* @author Moritz Halbritter
*/
@Deprecated(since = "3.3.0", forRemoval = true)
class ZipkinWebClientSender extends HttpSender {
private final WebClient webClient;

View File

@ -50,6 +50,7 @@ import static org.mockito.Mockito.mock;
*
* @author Moritz Halbritter
*/
@SuppressWarnings("removal")
class ZipkinConfigurationsSenderConfigurationTests {
private final ApplicationContextRunner contextRunner = new ApplicationContextRunner()
@ -70,6 +71,20 @@ class ZipkinConfigurationsSenderConfigurationTests {
});
}
@Test
void shouldUseHttpClientIfUrlSenderIsNotAvailable() {
this.contextRunner.withUserConfiguration(HttpClientConfiguration.class)
.withClassLoader(new FilteredClassLoader("zipkin2.reporter.urlconnection", "org.springframework.web.client",
"org.springframework.web.reactive.function.client"))
.run((context) -> {
assertThat(context).doesNotHaveBean(URLConnectionSender.class);
assertThat(context).hasSingleBean(BytesMessageSender.class);
assertThat(context).hasSingleBean(ZipkinHttpClientSender.class);
then(context.getBean(ZipkinHttpClientBuilderCustomizer.class)).should()
.customize(ArgumentMatchers.any());
});
}
@Test
void shouldPreferWebClientSenderIfWebApplicationIsReactiveAndUrlSenderIsNotAvailable() {
this.reactiveContextRunner.withUserConfiguration(RestTemplateConfiguration.class, WebClientConfiguration.class)
@ -220,6 +235,16 @@ class ZipkinConfigurationsSenderConfigurationTests {
}
@Configuration(proxyBeanMethods = false)
private static final class HttpClientConfiguration {
@Bean
ZipkinHttpClientBuilderCustomizer httpClientBuilderCustomizer() {
return mock(ZipkinHttpClientBuilderCustomizer.class);
}
}
@Configuration(proxyBeanMethods = false)
private static final class CustomConfiguration {

View File

@ -0,0 +1,210 @@
/*
* Copyright 2012-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.boot.actuate.autoconfigure.tracing.zipkin;
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.time.Duration;
import java.util.Base64;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import okhttp3.mockwebserver.MockResponse;
import okhttp3.mockwebserver.MockWebServer;
import okhttp3.mockwebserver.QueueDispatcher;
import okhttp3.mockwebserver.RecordedRequest;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import zipkin2.reporter.BytesMessageSender;
import zipkin2.reporter.Encoding;
import zipkin2.reporter.HttpEndpointSupplier;
import zipkin2.reporter.HttpEndpointSuppliers;
import org.springframework.http.HttpHeaders;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatException;
import static org.assertj.core.api.Assertions.assertThatIOException;
/**
* Tests for {@link ZipkinHttpClientSender}.
*
* @author Moritz Halbritter
*/
class ZipkinHttpClientSenderTests extends ZipkinHttpSenderTests {
private static ClearableDispatcher dispatcher;
private static MockWebServer mockBackEnd;
private static String zipkinUrl;
@BeforeAll
static void beforeAll() throws IOException {
dispatcher = new ClearableDispatcher();
mockBackEnd = new MockWebServer();
mockBackEnd.setDispatcher(dispatcher);
mockBackEnd.start();
zipkinUrl = mockBackEnd.url("/api/v2/spans").toString();
}
@AfterAll
static void afterAll() throws IOException {
mockBackEnd.shutdown();
}
@Override
@BeforeEach
void beforeEach() throws Exception {
super.beforeEach();
clearResponses();
clearRequests();
}
@Override
BytesMessageSender createSender() {
return createSender(Encoding.JSON, Duration.ofSeconds(10));
}
ZipkinHttpClientSender createSender(Encoding encoding, Duration timeout) {
return createSender(HttpEndpointSuppliers.constantFactory(), encoding, timeout);
}
ZipkinHttpClientSender createSender(HttpEndpointSupplier.Factory endpointSupplierFactory, Encoding encoding,
Duration timeout) {
HttpClient httpClient = HttpClient.newBuilder().connectTimeout(timeout).build();
return new ZipkinHttpClientSender(encoding, endpointSupplierFactory, zipkinUrl, httpClient, timeout);
}
@Test
void sendShouldSendSpansToZipkin() throws IOException, InterruptedException {
mockBackEnd.enqueue(new MockResponse());
List<byte[]> encodedSpans = List.of(toByteArray("span1"), toByteArray("span2"));
this.sender.send(encodedSpans);
requestAssertions((request) -> {
assertThat(request.getMethod()).isEqualTo("POST");
assertThat(request.getHeader("Content-Type")).isEqualTo("application/json");
assertThat(request.getBody().readUtf8()).isEqualTo("[span1,span2]");
});
}
@Test
void sendShouldSendSpansToZipkinInProto3() throws IOException, InterruptedException {
mockBackEnd.enqueue(new MockResponse());
List<byte[]> encodedSpans = List.of(toByteArray("span1"), toByteArray("span2"));
try (BytesMessageSender sender = createSender(Encoding.PROTO3, Duration.ofSeconds(10))) {
sender.send(encodedSpans);
}
requestAssertions((request) -> {
assertThat(request.getMethod()).isEqualTo("POST");
assertThat(request.getHeader("Content-Type")).isEqualTo("application/x-protobuf");
assertThat(request.getBody().readUtf8()).isEqualTo("span1span2");
});
}
/**
* This tests that a dynamic {@linkplain HttpEndpointSupplier} updates are visible to
* {@link HttpSender#postSpans(URI, HttpHeaders, byte[])}.
*/
@Test
void sendUsesDynamicEndpoint() throws Exception {
mockBackEnd.enqueue(new MockResponse());
mockBackEnd.enqueue(new MockResponse());
AtomicInteger suffix = new AtomicInteger();
try (BytesMessageSender sender = createSender((e) -> new HttpEndpointSupplier() {
@Override
public String get() {
return zipkinUrl + "/" + suffix.incrementAndGet();
}
@Override
public void close() {
}
}, Encoding.JSON, Duration.ofSeconds(10))) {
sender.send(Collections.emptyList());
sender.send(Collections.emptyList());
}
assertThat(mockBackEnd.takeRequest().getPath()).endsWith("/1");
assertThat(mockBackEnd.takeRequest().getPath()).endsWith("/2");
}
@Test
void sendShouldHandleHttpFailures() throws InterruptedException {
mockBackEnd.enqueue(new MockResponse().setResponseCode(500));
assertThatException().isThrownBy(() -> this.sender.send(Collections.emptyList()))
.withMessageContaining("Expected HTTP status 2xx, got 500");
requestAssertions((request) -> assertThat(request.getMethod()).isEqualTo("POST"));
}
@Test
void sendShouldCompressData() throws IOException, InterruptedException {
String uncompressed = "a".repeat(10000);
// This is gzip compressed 10000 times 'a'
byte[] compressed = Base64.getDecoder()
.decode("H4sIAAAAAAAA/+3BMQ0AAAwDIKFLj/k3UR8NcA8AAAAAAAAAAAADUsAZfeASJwAA");
mockBackEnd.enqueue(new MockResponse());
this.sender.send(List.of(toByteArray(uncompressed)));
requestAssertions((request) -> {
assertThat(request.getMethod()).isEqualTo("POST");
assertThat(request.getHeader("Content-Type")).isEqualTo("application/json");
assertThat(request.getHeader("Content-Encoding")).isEqualTo("gzip");
assertThat(request.getBody().readByteArray()).isEqualTo(compressed);
});
}
@Test
void shouldTimeout() throws IOException {
try (BytesMessageSender sender = createSender(Encoding.JSON, Duration.ofMillis(1))) {
MockResponse response = new MockResponse().setResponseCode(200).setHeadersDelay(100, TimeUnit.MILLISECONDS);
mockBackEnd.enqueue(response);
assertThatIOException().isThrownBy(() -> sender.send(Collections.emptyList()))
.withMessageContaining("timed out");
}
}
private void requestAssertions(Consumer<RecordedRequest> assertions) throws InterruptedException {
RecordedRequest request = mockBackEnd.takeRequest();
assertThat(request).satisfies(assertions);
}
private static void clearRequests() throws InterruptedException {
RecordedRequest request;
do {
request = mockBackEnd.takeRequest(0, TimeUnit.SECONDS);
}
while (request != null);
}
private static void clearResponses() {
dispatcher.clear();
}
private static final class ClearableDispatcher extends QueueDispatcher {
void clear() {
getResponseQueue().clear();
}
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2012-2023 the original author or authors.
* Copyright 2012-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -49,6 +49,7 @@ import static org.springframework.test.web.client.response.MockRestResponseCreat
* @author Moritz Halbritter
* @author Stefan Bratanov
*/
@SuppressWarnings("removal")
class ZipkinRestTemplateSenderTests extends ZipkinHttpSenderTests {
private static final String ZIPKIN_URL = "http://localhost:9411/api/v2/spans";

View File

@ -1,5 +1,5 @@
/*
* Copyright 2012-2023 the original author or authors.
* Copyright 2012-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -51,6 +51,7 @@ import static org.assertj.core.api.Assertions.assertThatException;
*
* @author Stefan Bratanov
*/
@SuppressWarnings("removal")
class ZipkinWebClientSenderTests extends ZipkinHttpSenderTests {
private static ClearableDispatcher dispatcher;