Bump dependencies in Java test suite

And apply Google Java Format with Maven Spotless plugin.
This commit is contained in:
Arnaud Cogoluègnes 2020-09-29 12:05:29 +02:00
parent 03a11e0771
commit b8bdb5ae7b
5 changed files with 775 additions and 645 deletions

View File

@ -27,13 +27,14 @@
<properties>
<stream-client.version>0.1.0-SNAPSHOT</stream-client.version>
<proton-j.version>0.33.5</proton-j.version>
<junit.jupiter.version>5.6.2</junit.jupiter.version>
<assertj.version>3.16.1</assertj.version>
<mockito.version>3.3.3</mockito.version>
<proton-j.version>0.33.6</proton-j.version>
<junit.jupiter.version>5.7.0</junit.jupiter.version>
<assertj.version>3.17.2</assertj.version>
<mockito.version>3.5.11</mockito.version>
<logback.version>1.2.3</logback.version>
<maven.compiler.plugin.version>3.8.1</maven.compiler.plugin.version>
<maven-surefire-plugin.version>2.22.2</maven-surefire-plugin.version>
<spotless.version>2.2.0</spotless.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
@ -110,6 +111,20 @@
<version>${maven-surefire-plugin.version}</version>
</plugin>
<plugin>
<groupId>com.diffplug.spotless</groupId>
<artifactId>spotless-maven-plugin</artifactId>
<version>${spotless.version}</version>
<configuration>
<java>
<googleJavaFormat>
<version>1.9</version>
<style>GOOGLE</style>
</googleJavaFormat>
</java>
</configuration>
</plugin>
</plugins>
</build>

View File

@ -16,12 +16,11 @@
package com.rabbitmq.stream;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;
import com.rabbitmq.stream.codec.WrapperMessageBuilder;
import com.rabbitmq.stream.impl.Client;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.*;
@ -29,88 +28,100 @@ import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@ExtendWith(TestUtils.StreamTestInfrastructureExtension.class)
public class FailureTest {
TestUtils.ClientFactory cf;
String stream;
ExecutorService executorService;
TestUtils.ClientFactory cf;
String stream;
ExecutorService executorService;
static void wait(Duration duration) {
try {
Thread.sleep(duration.toMillis());
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
static void wait(Duration duration) {
try {
Thread.sleep(duration.toMillis());
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
@AfterEach
void tearDown() {
if (executorService != null) {
executorService.shutdownNow();
}
@AfterEach
void tearDown() {
if (executorService != null) {
executorService.shutdownNow();
}
}
@Test
void leaderFailureWhenPublisherConnectedToReplica() throws Exception {
Set<String> messages = new HashSet<>();
Client client = cf.get(new Client.ClientParameters()
.port(TestUtils.streamPortNode1())
);
Map<String, Client.StreamMetadata> metadata = client.metadata(stream);
Client.StreamMetadata streamMetadata = metadata.get(stream);
assertThat(streamMetadata).isNotNull();
@Test
void leaderFailureWhenPublisherConnectedToReplica() throws Exception {
Set<String> messages = new HashSet<>();
Client client = cf.get(new Client.ClientParameters().port(TestUtils.streamPortNode1()));
Map<String, Client.StreamMetadata> metadata = client.metadata(stream);
Client.StreamMetadata streamMetadata = metadata.get(stream);
assertThat(streamMetadata).isNotNull();
assertThat(streamMetadata.getLeader().getPort()).isEqualTo(TestUtils.streamPortNode1());
assertThat(streamMetadata.getReplicas()).isNotEmpty();
Client.Broker replica = streamMetadata.getReplicas().get(0);
assertThat(replica.getPort()).isNotEqualTo(TestUtils.streamPortNode1());
assertThat(streamMetadata.getLeader().getPort()).isEqualTo(TestUtils.streamPortNode1());
assertThat(streamMetadata.getReplicas()).isNotEmpty();
Client.Broker replica = streamMetadata.getReplicas().get(0);
assertThat(replica.getPort()).isNotEqualTo(TestUtils.streamPortNode1());
AtomicReference<CountDownLatch> confirmLatch = new AtomicReference<>(new CountDownLatch(1));
AtomicReference<CountDownLatch> confirmLatch = new AtomicReference<>(new CountDownLatch(1));
CountDownLatch metadataLatch = new CountDownLatch(1);
Client publisher = cf.get(new Client.ClientParameters()
CountDownLatch metadataLatch = new CountDownLatch(1);
Client publisher =
cf.get(
new Client.ClientParameters()
.port(replica.getPort())
.metadataListener((stream, code) -> metadataLatch.countDown())
.publishConfirmListener((publisherId, publishingId) -> confirmLatch.get().countDown()));
String message = "all nodes available";
messages.add(message);
publisher.publish(stream, (byte) 1,
Collections.singletonList(publisher.messageBuilder().addData(message.getBytes(StandardCharsets.UTF_8)).build()));
assertThat(confirmLatch.get().await(10, TimeUnit.SECONDS)).isTrue();
confirmLatch.set(null);
.publishConfirmListener(
(publisherId, publishingId) -> confirmLatch.get().countDown()));
String message = "all nodes available";
messages.add(message);
publisher.publish(
stream,
(byte) 1,
Collections.singletonList(
publisher.messageBuilder().addData(message.getBytes(StandardCharsets.UTF_8)).build()));
assertThat(confirmLatch.get().await(10, TimeUnit.SECONDS)).isTrue();
confirmLatch.set(null);
try {
Host.rabbitmqctl("stop_app");
try {
cf.get(new Client.ClientParameters().port(TestUtils.streamPortNode1()));
fail("Node app stopped, connecting should not be possible");
} catch (Exception e) {
// OK
}
try {
Host.rabbitmqctl("stop_app");
try {
cf.get(new Client.ClientParameters().port(TestUtils.streamPortNode1()));
fail("Node app stopped, connecting should not be possible");
} catch (Exception e) {
// OK
}
assertThat(metadataLatch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(metadataLatch.await(10, TimeUnit.SECONDS)).isTrue();
// wait until there's a new leader
TestUtils.waitAtMost(Duration.ofSeconds(10), () -> {
Client.StreamMetadata m = publisher.metadata(stream).get(stream);
return m.getLeader() != null && m.getLeader().getPort() != TestUtils.streamPortNode1();
});
// wait until there's a new leader
TestUtils.waitAtMost(
Duration.ofSeconds(10),
() -> {
Client.StreamMetadata m = publisher.metadata(stream).get(stream);
return m.getLeader() != null && m.getLeader().getPort() != TestUtils.streamPortNode1();
});
confirmLatch.set(new CountDownLatch(1));
message = "2 nodes available";
messages.add(message);
publisher.publish(stream, (byte) 1, Collections.singletonList(publisher.messageBuilder()
.addData(message.getBytes(StandardCharsets.UTF_8)).build()));
assertThat(confirmLatch.get().await(10, TimeUnit.SECONDS)).isTrue();
confirmLatch.set(null);
} finally {
Host.rabbitmqctl("start_app");
}
confirmLatch.set(new CountDownLatch(1));
message = "2 nodes available";
messages.add(message);
publisher.publish(
stream,
(byte) 1,
Collections.singletonList(
publisher
.messageBuilder()
.addData(message.getBytes(StandardCharsets.UTF_8))
.build()));
assertThat(confirmLatch.get().await(10, TimeUnit.SECONDS)).isTrue();
confirmLatch.set(null);
} finally {
Host.rabbitmqctl("start_app");
}
// wait until all the replicas are there
TestUtils.waitAtMost(
@ -120,350 +131,411 @@ public class FailureTest {
return m.getReplicas().size() == 2;
});
confirmLatch.set(new CountDownLatch(1));
message = "all nodes are back";
messages.add(message);
publisher.publish(stream, (byte) 1, Collections.singletonList(publisher.messageBuilder()
.addData(message.getBytes(StandardCharsets.UTF_8)).build()));
assertThat(confirmLatch.get().await(10, TimeUnit.SECONDS)).isTrue();
confirmLatch.set(null);
confirmLatch.set(new CountDownLatch(1));
message = "all nodes are back";
messages.add(message);
publisher.publish(
stream,
(byte) 1,
Collections.singletonList(
publisher.messageBuilder().addData(message.getBytes(StandardCharsets.UTF_8)).build()));
assertThat(confirmLatch.get().await(10, TimeUnit.SECONDS)).isTrue();
confirmLatch.set(null);
CountDownLatch consumeLatch = new CountDownLatch(2);
Set<String> bodies = ConcurrentHashMap.newKeySet();
Client consumer = cf.get(new Client.ClientParameters()
CountDownLatch consumeLatch = new CountDownLatch(2);
Set<String> bodies = ConcurrentHashMap.newKeySet();
Client consumer =
cf.get(
new Client.ClientParameters()
.port(TestUtils.streamPortNode1())
.messageListener((subscriptionId, offset, msg) -> {
bodies.add(new String(msg.getBodyAsBinary(), StandardCharsets.UTF_8));
consumeLatch.countDown();
}));
.messageListener(
(subscriptionId, offset, msg) -> {
bodies.add(new String(msg.getBodyAsBinary(), StandardCharsets.UTF_8));
consumeLatch.countDown();
}));
TestUtils.waitAtMost(Duration.ofSeconds(5), () -> {
Client.Response response = consumer.subscribe((byte) 1, stream, OffsetSpecification.first(), 10);
return response.isOk();
TestUtils.waitAtMost(
Duration.ofSeconds(5),
() -> {
Client.Response response =
consumer.subscribe((byte) 1, stream, OffsetSpecification.first(), 10);
return response.isOk();
});
assertThat(consumeLatch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(bodies).hasSize(3).contains("all nodes available", "2 nodes available", "all nodes are back");
}
assertThat(consumeLatch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(bodies)
.hasSize(3)
.contains("all nodes available", "2 nodes available", "all nodes are back");
}
@Test
void noLostConfirmedMessagesWhenLeaderGoesAway() throws Exception {
executorService = Executors.newCachedThreadPool();
Client client = cf.get(new Client.ClientParameters()
.port(TestUtils.streamPortNode1())
);
Map<String, Client.StreamMetadata> metadata = client.metadata(stream);
Client.StreamMetadata streamMetadata = metadata.get(stream);
assertThat(streamMetadata).isNotNull();
@Test
void noLostConfirmedMessagesWhenLeaderGoesAway() throws Exception {
executorService = Executors.newCachedThreadPool();
Client client = cf.get(new Client.ClientParameters().port(TestUtils.streamPortNode1()));
Map<String, Client.StreamMetadata> metadata = client.metadata(stream);
Client.StreamMetadata streamMetadata = metadata.get(stream);
assertThat(streamMetadata).isNotNull();
assertThat(streamMetadata.getLeader()).isNotNull();
assertThat(streamMetadata.getLeader().getPort()).isEqualTo(TestUtils.streamPortNode1());
assertThat(streamMetadata.getLeader()).isNotNull();
assertThat(streamMetadata.getLeader().getPort()).isEqualTo(TestUtils.streamPortNode1());
Map<Long, Message> published = new ConcurrentHashMap<>();
Set<Message> confirmed = ConcurrentHashMap.newKeySet();
Map<Long, Message> published = new ConcurrentHashMap<>();
Set<Message> confirmed = ConcurrentHashMap.newKeySet();
Client.PublishConfirmListener publishConfirmListener = (publisherId, publishingId) -> {
Message confirmedMessage;
int attempts = 0;
while ((confirmedMessage = published.remove(publishingId)) == null && attempts < 10) {
wait(Duration.ofMillis(5));
attempts++;
}
confirmed.add(confirmedMessage);
Client.PublishConfirmListener publishConfirmListener =
(publisherId, publishingId) -> {
Message confirmedMessage;
int attempts = 0;
while ((confirmedMessage = published.remove(publishingId)) == null && attempts < 10) {
wait(Duration.ofMillis(5));
attempts++;
}
confirmed.add(confirmedMessage);
};
AtomicLong generation = new AtomicLong(0);
AtomicLong sequence = new AtomicLong(0);
AtomicBoolean connected = new AtomicBoolean(true);
AtomicReference<Client> publisher = new AtomicReference<>();
CountDownLatch reconnectionLatch = new CountDownLatch(1);
AtomicReference<Client.ShutdownListener> shutdownListenerReference = new AtomicReference<>();
Client.ShutdownListener shutdownListener = shutdownContext -> {
if (shutdownContext.getShutdownReason() == Client.ShutdownContext.ShutdownReason.UNKNOWN) {
// avoid long-running task in the IO thread
executorService.submit(() -> {
connected.set(false);
AtomicLong generation = new AtomicLong(0);
AtomicLong sequence = new AtomicLong(0);
AtomicBoolean connected = new AtomicBoolean(true);
AtomicReference<Client> publisher = new AtomicReference<>();
CountDownLatch reconnectionLatch = new CountDownLatch(1);
AtomicReference<Client.ShutdownListener> shutdownListenerReference = new AtomicReference<>();
Client.ShutdownListener shutdownListener =
shutdownContext -> {
if (shutdownContext.getShutdownReason()
== Client.ShutdownContext.ShutdownReason.UNKNOWN) {
// avoid long-running task in the IO thread
executorService.submit(
() -> {
connected.set(false);
Client locator = cf.get(new Client.ClientParameters().port(TestUtils.streamPortNode2()));
// wait until there's a new leader
try {
TestUtils.waitAtMost(Duration.ofSeconds(5), () -> {
Client.StreamMetadata m = locator.metadata(stream).get(stream);
return m.getLeader() != null && m.getLeader().getPort() != TestUtils.streamPortNode1();
Client locator =
cf.get(new Client.ClientParameters().port(TestUtils.streamPortNode2()));
// wait until there's a new leader
try {
TestUtils.waitAtMost(
Duration.ofSeconds(5),
() -> {
Client.StreamMetadata m = locator.metadata(stream).get(stream);
return m.getLeader() != null
&& m.getLeader().getPort() != TestUtils.streamPortNode1();
});
} catch (Throwable e) {
reconnectionLatch.countDown();
return;
}
int newLeaderPort = locator.metadata(stream).get(stream).getLeader().getPort();
Client newPublisher = cf.get(new Client.ClientParameters()
.port(newLeaderPort)
.shutdownListener(shutdownListenerReference.get())
.publishConfirmListener(publishConfirmListener)
);
generation.incrementAndGet();
published.clear();
publisher.set(newPublisher);
connected.set(true);
} catch (Throwable e) {
reconnectionLatch.countDown();
});
}
};
shutdownListenerReference.set(shutdownListener);
return;
}
client = cf.get(new Client.ClientParameters()
int newLeaderPort = locator.metadata(stream).get(stream).getLeader().getPort();
Client newPublisher =
cf.get(
new Client.ClientParameters()
.port(newLeaderPort)
.shutdownListener(shutdownListenerReference.get())
.publishConfirmListener(publishConfirmListener));
generation.incrementAndGet();
published.clear();
publisher.set(newPublisher);
connected.set(true);
reconnectionLatch.countDown();
});
}
};
shutdownListenerReference.set(shutdownListener);
client =
cf.get(
new Client.ClientParameters()
.port(streamMetadata.getLeader().getPort())
.shutdownListener(shutdownListener)
.publishConfirmListener(publishConfirmListener));
publisher.set(client);
publisher.set(client);
AtomicBoolean keepPublishing = new AtomicBoolean(true);
AtomicBoolean keepPublishing = new AtomicBoolean(true);
executorService.submit(() -> {
while (keepPublishing.get()) {
if (connected.get()) {
Message message = publisher.get().messageBuilder()
.properties().messageId(sequence.getAndIncrement())
.messageBuilder().applicationProperties().entry("generation", generation.get())
.messageBuilder().build();
try {
long publishingId = publisher.get().publish(stream, (byte) 1, Collections.singletonList(message)).get(0);
published.put(publishingId, message);
} catch (Exception e) {
// keep going
}
wait(Duration.ofMillis(10));
} else {
wait(Duration.ofSeconds(1));
}
executorService.submit(
() -> {
while (keepPublishing.get()) {
if (connected.get()) {
Message message =
publisher
.get()
.messageBuilder()
.properties()
.messageId(sequence.getAndIncrement())
.messageBuilder()
.applicationProperties()
.entry("generation", generation.get())
.messageBuilder()
.build();
try {
long publishingId =
publisher
.get()
.publish(stream, (byte) 1, Collections.singletonList(message))
.get(0);
published.put(publishingId, message);
} catch (Exception e) {
// keep going
}
wait(Duration.ofMillis(10));
} else {
wait(Duration.ofSeconds(1));
}
}
});
// let's publish for a bit of time
Thread.sleep(2000);
// let's publish for a bit of time
Thread.sleep(2000);
assertThat(confirmed).isNotEmpty();
int confirmedCount = confirmed.size();
assertThat(confirmed).isNotEmpty();
int confirmedCount = confirmed.size();
try {
Host.rabbitmqctl("stop_app");
try {
Host.rabbitmqctl("stop_app");
assertThat(reconnectionLatch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(reconnectionLatch.await(10, TimeUnit.SECONDS)).isTrue();
// let's publish for a bit of time
Thread.sleep(2000);
// let's publish for a bit of time
Thread.sleep(2000);
} finally {
Host.rabbitmqctl("start_app");
}
assertThat(confirmed).hasSizeGreaterThan(confirmedCount);
confirmedCount = confirmed.size();
Client metadataClient = cf.get(new Client.ClientParameters()
.port(TestUtils.streamPortNode2())
);
// wait until all the replicas are there
TestUtils.waitAtMost(Duration.ofSeconds(5), () -> {
Client.StreamMetadata m = metadataClient.metadata(stream).get(stream);
return m.getReplicas().size() == 2;
});
// let's publish for a bit of time
Thread.sleep(2000);
assertThat(confirmed).hasSizeGreaterThan(confirmedCount);
keepPublishing.set(false);
Queue<Message> consumed = new ConcurrentLinkedQueue<>();
Set<Long> generations = ConcurrentHashMap.newKeySet();
CountDownLatch consumedLatch = new CountDownLatch(1);
Client.StreamMetadata m = metadataClient.metadata(stream).get(stream);
Client consumer = cf.get(new Client.ClientParameters()
.port(m.getReplicas().get(0).getPort())
.chunkListener((client1, subscriptionId, offset, messageCount, dataSize) -> client1.credit(subscriptionId, 1))
.messageListener((subscriptionId, offset, message) -> {
consumed.add(message);
generations.add((Long) message.getApplicationProperties().get("generation"));
if (consumed.size() == confirmed.size()) {
consumedLatch.countDown();
}
})
);
Client.Response response = consumer.subscribe((byte) 1, stream, OffsetSpecification.first(), 10);
assertThat(response.isOk()).isTrue();
assertThat(consumedLatch.await(5, TimeUnit.SECONDS)).isTrue();
assertThat(generations).hasSize(2).contains(0L, 1L);
assertThat(consumed).hasSizeGreaterThanOrEqualTo(confirmed.size());
long lastMessageId = -1;
for (Message message : consumed) {
long messageId = message.getProperties().getMessageIdAsLong();
assertThat(messageId).isGreaterThanOrEqualTo(lastMessageId);
lastMessageId = messageId;
}
assertThat(lastMessageId).isPositive().isLessThanOrEqualTo(sequence.get());
} finally {
Host.rabbitmqctl("start_app");
}
assertThat(confirmed).hasSizeGreaterThan(confirmedCount);
confirmedCount = confirmed.size();
@Test
void consumerReattachesToOtherReplicaWhenReplicaGoesAway() throws Exception {
executorService = Executors.newCachedThreadPool();
Client metadataClient = cf.get(new Client.ClientParameters()
.port(TestUtils.streamPortNode1())
);
Map<String, Client.StreamMetadata> metadata = metadataClient.metadata(stream);
Client.StreamMetadata streamMetadata = metadata.get(stream);
assertThat(streamMetadata).isNotNull();
assertThat(streamMetadata.getLeader()).isNotNull();
assertThat(streamMetadata.getLeader().getPort()).isEqualTo(TestUtils.streamPortNode1());
Map<Long, Message> published = new ConcurrentHashMap<>();
Set<Message> confirmed = ConcurrentHashMap.newKeySet();
Set<Long> confirmedIds = ConcurrentHashMap.newKeySet();
Client.PublishConfirmListener publishConfirmListener = (publisherId, publishingId) -> {
Message confirmedMessage;
int attempts = 0;
while ((confirmedMessage = published.remove(publishingId)) == null && attempts < 10) {
wait(Duration.ofMillis(5));
attempts++;
}
confirmed.add(confirmedMessage);
confirmedIds.add(confirmedMessage.getProperties().getMessageIdAsLong());
};
Client publisher = cf.get(new Client.ClientParameters()
.port(streamMetadata.getLeader().getPort())
.publishConfirmListener(publishConfirmListener)
);
AtomicLong generation = new AtomicLong(0);
AtomicLong sequence = new AtomicLong(0);
AtomicBoolean keepPublishing = new AtomicBoolean(true);
CountDownLatch publishingLatch = new CountDownLatch(1);
executorService.submit(() -> {
while (keepPublishing.get()) {
Message message = new WrapperMessageBuilder()
.properties().messageId(sequence.getAndIncrement())
.messageBuilder().applicationProperties().entry("generation", generation.get())
.messageBuilder().build();
try {
long publishingId = publisher.publish(stream, (byte) 1, Collections.singletonList(message)).get(0);
published.put(publishingId, message);
} catch (Exception e) {
// keep going
}
wait(Duration.ofMillis(10));
}
publishingLatch.countDown();
Client metadataClient = cf.get(new Client.ClientParameters().port(TestUtils.streamPortNode2()));
// wait until all the replicas are there
TestUtils.waitAtMost(
Duration.ofSeconds(5),
() -> {
Client.StreamMetadata m = metadataClient.metadata(stream).get(stream);
return m.getReplicas().size() == 2;
});
Queue<Message> consumed = new ConcurrentLinkedQueue<>();
// let's publish for a bit of time
Thread.sleep(2000);
Client.Broker replica = streamMetadata.getReplicas().stream()
.filter(broker -> broker.getPort() == TestUtils.streamPortNode2())
.findFirst()
.orElseThrow(() -> new NoSuchElementException());
assertThat(confirmed).hasSizeGreaterThan(confirmedCount);
AtomicLong lastProcessedOffset = new AtomicLong(-1);
Set<Long> generations = ConcurrentHashMap.newKeySet();
Set<Long> consumedIds = ConcurrentHashMap.newKeySet();
Client.MessageListener messageListener = (subscriptionId, offset, message) -> {
consumed.add(message);
generations.add((Long) message.getApplicationProperties().get("generation"));
consumedIds.add(message.getProperties().getMessageIdAsLong());
lastProcessedOffset.set(offset);
keepPublishing.set(false);
Queue<Message> consumed = new ConcurrentLinkedQueue<>();
Set<Long> generations = ConcurrentHashMap.newKeySet();
CountDownLatch consumedLatch = new CountDownLatch(1);
Client.StreamMetadata m = metadataClient.metadata(stream).get(stream);
Client consumer =
cf.get(
new Client.ClientParameters()
.port(m.getReplicas().get(0).getPort())
.chunkListener(
(client1, subscriptionId, offset, messageCount, dataSize) ->
client1.credit(subscriptionId, 1))
.messageListener(
(subscriptionId, offset, message) -> {
consumed.add(message);
generations.add((Long) message.getApplicationProperties().get("generation"));
if (consumed.size() == confirmed.size()) {
consumedLatch.countDown();
}
}));
Client.Response response =
consumer.subscribe((byte) 1, stream, OffsetSpecification.first(), 10);
assertThat(response.isOk()).isTrue();
assertThat(consumedLatch.await(5, TimeUnit.SECONDS)).isTrue();
assertThat(generations).hasSize(2).contains(0L, 1L);
assertThat(consumed).hasSizeGreaterThanOrEqualTo(confirmed.size());
long lastMessageId = -1;
for (Message message : consumed) {
long messageId = message.getProperties().getMessageIdAsLong();
assertThat(messageId).isGreaterThanOrEqualTo(lastMessageId);
lastMessageId = messageId;
}
assertThat(lastMessageId).isPositive().isLessThanOrEqualTo(sequence.get());
}
@Test
void consumerReattachesToOtherReplicaWhenReplicaGoesAway() throws Exception {
executorService = Executors.newCachedThreadPool();
Client metadataClient = cf.get(new Client.ClientParameters().port(TestUtils.streamPortNode1()));
Map<String, Client.StreamMetadata> metadata = metadataClient.metadata(stream);
Client.StreamMetadata streamMetadata = metadata.get(stream);
assertThat(streamMetadata).isNotNull();
assertThat(streamMetadata.getLeader()).isNotNull();
assertThat(streamMetadata.getLeader().getPort()).isEqualTo(TestUtils.streamPortNode1());
Map<Long, Message> published = new ConcurrentHashMap<>();
Set<Message> confirmed = ConcurrentHashMap.newKeySet();
Set<Long> confirmedIds = ConcurrentHashMap.newKeySet();
Client.PublishConfirmListener publishConfirmListener =
(publisherId, publishingId) -> {
Message confirmedMessage;
int attempts = 0;
while ((confirmedMessage = published.remove(publishingId)) == null && attempts < 10) {
wait(Duration.ofMillis(5));
attempts++;
}
confirmed.add(confirmedMessage);
confirmedIds.add(confirmedMessage.getProperties().getMessageIdAsLong());
};
CountDownLatch reconnectionLatch = new CountDownLatch(1);
AtomicReference<Client.ShutdownListener> shutdownListenerReference = new AtomicReference<>();
Client.ShutdownListener shutdownListener = shutdownContext -> {
if (shutdownContext.getShutdownReason() == Client.ShutdownContext.ShutdownReason.UNKNOWN) {
// avoid long-running task in the IO thread
executorService.submit(() -> {
Client.StreamMetadata m = metadataClient.metadata(stream).get(stream);
int newReplicaPort = m.getReplicas().get(0).getPort();
Client publisher =
cf.get(
new Client.ClientParameters()
.port(streamMetadata.getLeader().getPort())
.publishConfirmListener(publishConfirmListener));
Client newConsumer = cf.get(new Client.ClientParameters()
.port(newReplicaPort)
.shutdownListener(shutdownListenerReference.get())
.chunkListener((client1, subscriptionId, offset, messageCount, dataSize) -> client1.credit(subscriptionId, 1))
.messageListener(messageListener)
);
AtomicLong generation = new AtomicLong(0);
AtomicLong sequence = new AtomicLong(0);
AtomicBoolean keepPublishing = new AtomicBoolean(true);
CountDownLatch publishingLatch = new CountDownLatch(1);
newConsumer.subscribe((byte) 1, stream, OffsetSpecification.offset(lastProcessedOffset.get() + 1), 10);
generation.incrementAndGet();
reconnectionLatch.countDown();
});
executorService.submit(
() -> {
while (keepPublishing.get()) {
Message message =
new WrapperMessageBuilder()
.properties()
.messageId(sequence.getAndIncrement())
.messageBuilder()
.applicationProperties()
.entry("generation", generation.get())
.messageBuilder()
.build();
try {
long publishingId =
publisher.publish(stream, (byte) 1, Collections.singletonList(message)).get(0);
published.put(publishingId, message);
} catch (Exception e) {
// keep going
}
};
shutdownListenerReference.set(shutdownListener);
wait(Duration.ofMillis(10));
}
publishingLatch.countDown();
});
Client consumer = cf.get(new Client.ClientParameters()
Queue<Message> consumed = new ConcurrentLinkedQueue<>();
Client.Broker replica =
streamMetadata.getReplicas().stream()
.filter(broker -> broker.getPort() == TestUtils.streamPortNode2())
.findFirst()
.orElseThrow(() -> new NoSuchElementException());
AtomicLong lastProcessedOffset = new AtomicLong(-1);
Set<Long> generations = ConcurrentHashMap.newKeySet();
Set<Long> consumedIds = ConcurrentHashMap.newKeySet();
Client.MessageListener messageListener =
(subscriptionId, offset, message) -> {
consumed.add(message);
generations.add((Long) message.getApplicationProperties().get("generation"));
consumedIds.add(message.getProperties().getMessageIdAsLong());
lastProcessedOffset.set(offset);
};
CountDownLatch reconnectionLatch = new CountDownLatch(1);
AtomicReference<Client.ShutdownListener> shutdownListenerReference = new AtomicReference<>();
Client.ShutdownListener shutdownListener =
shutdownContext -> {
if (shutdownContext.getShutdownReason()
== Client.ShutdownContext.ShutdownReason.UNKNOWN) {
// avoid long-running task in the IO thread
executorService.submit(
() -> {
Client.StreamMetadata m = metadataClient.metadata(stream).get(stream);
int newReplicaPort = m.getReplicas().get(0).getPort();
Client newConsumer =
cf.get(
new Client.ClientParameters()
.port(newReplicaPort)
.shutdownListener(shutdownListenerReference.get())
.chunkListener(
(client1, subscriptionId, offset, messageCount, dataSize) ->
client1.credit(subscriptionId, 1))
.messageListener(messageListener));
newConsumer.subscribe(
(byte) 1,
stream,
OffsetSpecification.offset(lastProcessedOffset.get() + 1),
10);
generation.incrementAndGet();
reconnectionLatch.countDown();
});
}
};
shutdownListenerReference.set(shutdownListener);
Client consumer =
cf.get(
new Client.ClientParameters()
.port(replica.getPort())
.shutdownListener(shutdownListener)
.chunkListener((client1, subscriptionId, offset, messageCount, dataSize) -> client1.credit(subscriptionId, 1))
.messageListener(messageListener)
);
.chunkListener(
(client1, subscriptionId, offset, messageCount, dataSize) ->
client1.credit(subscriptionId, 1))
.messageListener(messageListener));
Client.Response response = consumer.subscribe((byte) 1, stream, OffsetSpecification.first(), 10);
assertThat(response.isOk()).isTrue();
Client.Response response =
consumer.subscribe((byte) 1, stream, OffsetSpecification.first(), 10);
assertThat(response.isOk()).isTrue();
// let's publish for a bit of time
Thread.sleep(2000);
// let's publish for a bit of time
Thread.sleep(2000);
assertThat(confirmed).isNotEmpty();
assertThat(consumed).isNotEmpty();
int confirmedCount = confirmed.size();
assertThat(confirmed).isNotEmpty();
assertThat(consumed).isNotEmpty();
int confirmedCount = confirmed.size();
try {
Host.rabbitmqctl("stop_app", Host.node2name());
try {
Host.rabbitmqctl("stop_app", Host.node2name());
assertThat(reconnectionLatch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(reconnectionLatch.await(10, TimeUnit.SECONDS)).isTrue();
// let's publish for a bit of time
Thread.sleep(2000);
// let's publish for a bit of time
Thread.sleep(2000);
} finally {
Host.rabbitmqctl("start_app", Host.node2name());
}
assertThat(confirmed).hasSizeGreaterThan(confirmedCount);
confirmedCount = confirmed.size();
} finally {
Host.rabbitmqctl("start_app", Host.node2name());
}
assertThat(confirmed).hasSizeGreaterThan(confirmedCount);
confirmedCount = confirmed.size();
// wait until all the replicas are there
TestUtils.waitAtMost(Duration.ofSeconds(5), () -> {
Client.StreamMetadata m = metadataClient.metadata(stream).get(stream);
return m.getReplicas().size() == 2;
// wait until all the replicas are there
TestUtils.waitAtMost(
Duration.ofSeconds(5),
() -> {
Client.StreamMetadata m = metadataClient.metadata(stream).get(stream);
return m.getReplicas().size() == 2;
});
// let's publish for a bit of time
Thread.sleep(2000);
// let's publish for a bit of time
Thread.sleep(2000);
assertThat(confirmed).hasSizeGreaterThan(confirmedCount);
assertThat(confirmed).hasSizeGreaterThan(confirmedCount);
keepPublishing.set(false);
keepPublishing.set(false);
assertThat(publishingLatch.await(5, TimeUnit.SECONDS)).isTrue();
assertThat(publishingLatch.await(5, TimeUnit.SECONDS)).isTrue();
TestUtils.waitAtMost(Duration.ofSeconds(5), () -> consumed.size() >= confirmed.size());
TestUtils.waitAtMost(Duration.ofSeconds(5), () -> consumed.size() >= confirmed.size());
assertThat(generations).hasSize(2).contains(0L, 1L);
assertThat(consumed).hasSizeGreaterThanOrEqualTo(confirmed.size());
long lastMessageId = -1;
for (Message message : consumed) {
long messageId = message.getProperties().getMessageIdAsLong();
assertThat(messageId).isGreaterThanOrEqualTo(lastMessageId);
lastMessageId = messageId;
}
assertThat(lastMessageId).isPositive().isLessThanOrEqualTo(sequence.get());
confirmedIds.forEach(confirmedId -> assertThat(consumedIds).contains(confirmedId));
assertThat(generations).hasSize(2).contains(0L, 1L);
assertThat(consumed).hasSizeGreaterThanOrEqualTo(confirmed.size());
long lastMessageId = -1;
for (Message message : consumed) {
long messageId = message.getProperties().getMessageIdAsLong();
assertThat(messageId).isGreaterThanOrEqualTo(lastMessageId);
lastMessageId = messageId;
}
assertThat(lastMessageId).isPositive().isLessThanOrEqualTo(sequence.get());
confirmedIds.forEach(confirmedId -> assertThat(consumedIds).contains(confirmedId));
}
}

View File

@ -25,88 +25,93 @@ import java.net.UnknownHostException;
public class Host {
private static String capture(InputStream is)
throws IOException {
BufferedReader br = new BufferedReader(new InputStreamReader(is));
String line;
StringBuilder buff = new StringBuilder();
while ((line = br.readLine()) != null) {
buff.append(line).append("\n");
}
return buff.toString();
private static String capture(InputStream is) throws IOException {
BufferedReader br = new BufferedReader(new InputStreamReader(is));
String line;
StringBuilder buff = new StringBuilder();
while ((line = br.readLine()) != null) {
buff.append(line).append("\n");
}
return buff.toString();
}
private static Process executeCommand(String command) throws IOException {
Process pr = executeCommandProcess(command);
private static Process executeCommand(String command) throws IOException {
Process pr = executeCommandProcess(command);
int ev = waitForExitValue(pr);
if (ev != 0) {
String stdout = capture(pr.getInputStream());
String stderr = capture(pr.getErrorStream());
throw new IOException("unexpected command exit value: " + ev +
"\ncommand: " + command + "\n" +
"\nstdout:\n" + stdout +
"\nstderr:\n" + stderr + "\n");
}
return pr;
int ev = waitForExitValue(pr);
if (ev != 0) {
String stdout = capture(pr.getInputStream());
String stderr = capture(pr.getErrorStream());
throw new IOException(
"unexpected command exit value: "
+ ev
+ "\ncommand: "
+ command
+ "\n"
+ "\nstdout:\n"
+ stdout
+ "\nstderr:\n"
+ stderr
+ "\n");
}
return pr;
}
private static int waitForExitValue(Process pr) {
while (true) {
try {
pr.waitFor();
break;
} catch (InterruptedException ignored) {
}
}
return pr.exitValue();
private static int waitForExitValue(Process pr) {
while (true) {
try {
pr.waitFor();
break;
} catch (InterruptedException ignored) {
}
}
return pr.exitValue();
}
private static Process executeCommandProcess(String command) throws IOException {
String[] finalCommand;
if (System.getProperty("os.name").toLowerCase().contains("windows")) {
finalCommand = new String[4];
finalCommand[0] = "C:\\winnt\\system32\\cmd.exe";
finalCommand[1] = "/y";
finalCommand[2] = "/c";
finalCommand[3] = command;
} else {
finalCommand = new String[3];
finalCommand[0] = "/bin/sh";
finalCommand[1] = "-c";
finalCommand[2] = command;
}
return Runtime.getRuntime().exec(finalCommand);
private static Process executeCommandProcess(String command) throws IOException {
String[] finalCommand;
if (System.getProperty("os.name").toLowerCase().contains("windows")) {
finalCommand = new String[4];
finalCommand[0] = "C:\\winnt\\system32\\cmd.exe";
finalCommand[1] = "/y";
finalCommand[2] = "/c";
finalCommand[3] = command;
} else {
finalCommand = new String[3];
finalCommand[0] = "/bin/sh";
finalCommand[1] = "-c";
finalCommand[2] = command;
}
return Runtime.getRuntime().exec(finalCommand);
}
public static Process rabbitmqctl(String command) throws IOException {
return rabbitmqctl(command, node1name());
public static Process rabbitmqctl(String command) throws IOException {
return rabbitmqctl(command, node1name());
}
public static Process rabbitmqctl(String command, String nodename) throws IOException {
return executeCommand(rabbitmqctlCommand() + " -n '" + nodename + "'" + " " + command);
}
public static String node1name() {
try {
return System.getProperty(
"node1.name", "rabbit-1@" + InetAddress.getLocalHost().getHostName());
} catch (UnknownHostException e) {
throw new RuntimeException(e);
}
}
public static Process rabbitmqctl(String command, String nodename) throws IOException {
return executeCommand(rabbitmqctlCommand() +
" -n '" + nodename + "'" +
" " + command);
}
public static String node1name() {
try {
return System.getProperty("node1.name", "rabbit-1@" + InetAddress.getLocalHost().getHostName());
} catch (UnknownHostException e) {
throw new RuntimeException(e);
}
}
public static String node2name() {
try {
return System.getProperty("node2.name", "rabbit-2@" + InetAddress.getLocalHost().getHostName());
} catch (UnknownHostException e) {
throw new RuntimeException(e);
}
}
static String rabbitmqctlCommand() {
return System.getProperty("rabbitmqctl.bin");
public static String node2name() {
try {
return System.getProperty(
"node2.name", "rabbit-2@" + InetAddress.getLocalHost().getHostName());
} catch (UnknownHostException e) {
throw new RuntimeException(e);
}
}
static String rabbitmqctlCommand() {
return System.getProperty("rabbitmqctl.bin");
}
}

View File

@ -16,13 +16,9 @@
package com.rabbitmq.stream;
import com.rabbitmq.stream.impl.Client;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import static org.assertj.core.api.Assertions.assertThat;
import com.rabbitmq.stream.impl.Client;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
@ -32,106 +28,146 @@ import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import static org.assertj.core.api.Assertions.assertThat;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
@ExtendWith(TestUtils.StreamTestInfrastructureExtension.class)
public class StreamTest {
String stream;
TestUtils.ClientFactory cf;
String stream;
TestUtils.ClientFactory cf;
static Stream<Arguments> shouldBePossibleToPublishFromAnyNodeAndConsumeFromAnyMember() {
return Stream.of(
brokers("leader", metadata -> metadata.getLeader(), "leader", metadata -> metadata.getLeader()),
brokers("leader", metadata -> metadata.getLeader(), "replica", metadata -> metadata.getReplicas().iterator().next()),
brokers("replica", metadata -> metadata.getReplicas().iterator().next(), "leader", metadata -> metadata.getLeader()),
brokers("replica", metadata -> new ArrayList<>(metadata.getReplicas()).get(0), "replica", metadata -> new ArrayList<>(metadata.getReplicas()).get(1))
);
}
static Stream<Arguments> shouldBePossibleToPublishFromAnyNodeAndConsumeFromAnyMember() {
return Stream.of(
brokers(
"leader", metadata -> metadata.getLeader(), "leader", metadata -> metadata.getLeader()),
brokers(
"leader",
metadata -> metadata.getLeader(),
"replica",
metadata -> metadata.getReplicas().iterator().next()),
brokers(
"replica",
metadata -> metadata.getReplicas().iterator().next(),
"leader",
metadata -> metadata.getLeader()),
brokers(
"replica",
metadata -> new ArrayList<>(metadata.getReplicas()).get(0),
"replica",
metadata -> new ArrayList<>(metadata.getReplicas()).get(1)));
}
static Arguments brokers(String dp, Function<Client.StreamMetadata, Client.Broker> publisherBroker,
String dc, Function<Client.StreamMetadata, Client.Broker> consumerBroker) {
return Arguments.of(new FunctionWithToString<>(dp, publisherBroker), new FunctionWithToString<>(dc, consumerBroker));
}
static Arguments brokers(
String dp,
Function<Client.StreamMetadata, Client.Broker> publisherBroker,
String dc,
Function<Client.StreamMetadata, Client.Broker> consumerBroker) {
return Arguments.of(
new FunctionWithToString<>(dp, publisherBroker),
new FunctionWithToString<>(dc, consumerBroker));
}
@ParameterizedTest
@MethodSource
void shouldBePossibleToPublishFromAnyNodeAndConsumeFromAnyMember(Function<Client.StreamMetadata, Client.Broker> publisherBroker,
Function<Client.StreamMetadata, Client.Broker> consumerBroker) throws Exception {
@ParameterizedTest
@MethodSource
void shouldBePossibleToPublishFromAnyNodeAndConsumeFromAnyMember(
Function<Client.StreamMetadata, Client.Broker> publisherBroker,
Function<Client.StreamMetadata, Client.Broker> consumerBroker)
throws Exception {
int messageCount = 10_000;
Client client = cf.get(new Client.ClientParameters().port(TestUtils.streamPortNode1()));
Map<String, Client.StreamMetadata> metadata = client.metadata(stream);
assertThat(metadata).hasSize(1).containsKey(stream);
Client.StreamMetadata streamMetadata = metadata.get(stream);
int messageCount = 10_000;
Client client = cf.get(new Client.ClientParameters().port(TestUtils.streamPortNode1()));
Map<String, Client.StreamMetadata> metadata = client.metadata(stream);
assertThat(metadata).hasSize(1).containsKey(stream);
Client.StreamMetadata streamMetadata = metadata.get(stream);
CountDownLatch publishingLatch = new CountDownLatch(messageCount);
Client publisher = cf.get(new Client.ClientParameters()
CountDownLatch publishingLatch = new CountDownLatch(messageCount);
Client publisher =
cf.get(
new Client.ClientParameters()
.port(publisherBroker.apply(streamMetadata).getPort())
.publishConfirmListener((publisherId, publishingId) -> publishingLatch.countDown()));
.publishConfirmListener(
(publisherId, publishingId) -> publishingLatch.countDown()));
IntStream.range(0, messageCount).forEach(i -> publisher.publish(stream, (byte) 1, Collections.singletonList(
publisher.messageBuilder().addData(("hello " + i).getBytes(StandardCharsets.UTF_8)).build())));
IntStream.range(0, messageCount)
.forEach(
i ->
publisher.publish(
stream,
(byte) 1,
Collections.singletonList(
publisher
.messageBuilder()
.addData(("hello " + i).getBytes(StandardCharsets.UTF_8))
.build())));
assertThat(publishingLatch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(publishingLatch.await(10, TimeUnit.SECONDS)).isTrue();
CountDownLatch consumingLatch = new CountDownLatch(messageCount);
Set<String> bodies = ConcurrentHashMap.newKeySet(messageCount);
Client consumer = cf.get(new Client.ClientParameters()
CountDownLatch consumingLatch = new CountDownLatch(messageCount);
Set<String> bodies = ConcurrentHashMap.newKeySet(messageCount);
Client consumer =
cf.get(
new Client.ClientParameters()
.port(consumerBroker.apply(streamMetadata).getPort())
.chunkListener((client1, subscriptionId, offset, messageCount1, dataSize) -> client1.credit(subscriptionId, 10))
.messageListener((subscriptionId, offset, message) -> {
bodies.add(new String(message.getBodyAsBinary(), StandardCharsets.UTF_8));
consumingLatch.countDown();
})
);
.chunkListener(
(client1, subscriptionId, offset, messageCount1, dataSize) ->
client1.credit(subscriptionId, 10))
.messageListener(
(subscriptionId, offset, message) -> {
bodies.add(new String(message.getBodyAsBinary(), StandardCharsets.UTF_8));
consumingLatch.countDown();
}));
consumer.subscribe((byte) 1, stream, OffsetSpecification.first(), 10);
consumer.subscribe((byte) 1, stream, OffsetSpecification.first(), 10);
assertThat(consumingLatch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(bodies).hasSize(messageCount);
IntStream.range(0, messageCount).forEach(i -> assertThat(bodies.contains("hello " + i)));
}
assertThat(consumingLatch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(bodies).hasSize(messageCount);
IntStream.range(0, messageCount).forEach(i -> assertThat(bodies.contains("hello " + i)));
}
@Test
void metadataOnClusterShouldReturnLeaderAndReplicas() {
Client client = cf.get(new Client.ClientParameters().port(TestUtils.streamPortNode1()));
Map<String, Client.StreamMetadata> metadata = client.metadata(stream);
assertThat(metadata).hasSize(1).containsKey(stream);
Client.StreamMetadata streamMetadata = metadata.get(stream);
assertThat(streamMetadata.getResponseCode()).isEqualTo(Constants.RESPONSE_CODE_OK);
assertThat(streamMetadata.getReplicas()).hasSize(2);
@Test
void metadataOnClusterShouldReturnLeaderAndReplicas() {
Client client = cf.get(new Client.ClientParameters().port(TestUtils.streamPortNode1()));
Map<String, Client.StreamMetadata> metadata = client.metadata(stream);
assertThat(metadata).hasSize(1).containsKey(stream);
Client.StreamMetadata streamMetadata = metadata.get(stream);
assertThat(streamMetadata.getResponseCode()).isEqualTo(Constants.RESPONSE_CODE_OK);
assertThat(streamMetadata.getReplicas()).hasSize(2);
BiConsumer<Client.Broker, Client.Broker> assertNodesAreDifferent = (node, anotherNode) -> {
assertThat(node.getHost()).isEqualTo(anotherNode.getHost());
assertThat(node.getPort()).isNotEqualTo(anotherNode.getPort());
BiConsumer<Client.Broker, Client.Broker> assertNodesAreDifferent =
(node, anotherNode) -> {
assertThat(node.getHost()).isEqualTo(anotherNode.getHost());
assertThat(node.getPort()).isNotEqualTo(anotherNode.getPort());
};
streamMetadata.getReplicas().forEach(replica -> assertNodesAreDifferent.accept(replica, streamMetadata.getLeader()));
List<Client.Broker> replicas = new ArrayList<>(streamMetadata.getReplicas());
assertNodesAreDifferent.accept(replicas.get(0), replicas.get(1));
streamMetadata
.getReplicas()
.forEach(replica -> assertNodesAreDifferent.accept(replica, streamMetadata.getLeader()));
List<Client.Broker> replicas = new ArrayList<>(streamMetadata.getReplicas());
assertNodesAreDifferent.accept(replicas.get(0), replicas.get(1));
}
static class FunctionWithToString<T, R> implements Function<T, R> {
final String toString;
final Function<T, R> delegate;
FunctionWithToString(String toString, Function<T, R> delegate) {
this.toString = toString;
this.delegate = delegate;
}
static class FunctionWithToString<T, R> implements Function<T, R> {
final String toString;
final Function<T, R> delegate;
FunctionWithToString(String toString, Function<T, R> delegate) {
this.toString = toString;
this.delegate = delegate;
}
@Override
public R apply(T t) {
return delegate.apply(t);
}
@Override
public String toString() {
return toString;
}
@Override
public R apply(T t) {
return delegate.apply(t);
}
@Override
public String toString() {
return toString;
}
}
}

View File

@ -16,162 +16,164 @@
package com.rabbitmq.stream;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.fail;
import com.rabbitmq.stream.impl.Client;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import org.junit.jupiter.api.extension.*;
import java.lang.reflect.Field;
import java.time.Duration;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BooleanSupplier;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.fail;
import org.junit.jupiter.api.extension.*;
public class TestUtils {
static int streamPortNode1() {
String port = System.getProperty("node1.stream.port", "5555");
return Integer.valueOf(port);
static int streamPortNode1() {
String port = System.getProperty("node1.stream.port", "5555");
return Integer.valueOf(port);
}
static int streamPortNode2() {
String port = System.getProperty("node2.stream.port", "5556");
return Integer.valueOf(port);
}
static void waitAtMost(Duration duration, BooleanSupplier condition) throws InterruptedException {
if (condition.getAsBoolean()) {
return;
}
int waitTime = 100;
int waitedTime = 0;
long timeoutInMs = duration.toMillis();
while (waitedTime <= timeoutInMs) {
Thread.sleep(waitTime);
if (condition.getAsBoolean()) {
return;
}
waitedTime += waitTime;
}
fail("Waited " + duration.getSeconds() + " second(s), condition never got true");
}
static class StreamTestInfrastructureExtension
implements BeforeAllCallback, AfterAllCallback, BeforeEachCallback, AfterEachCallback {
private static final ExtensionContext.Namespace NAMESPACE =
ExtensionContext.Namespace.create(StreamTestInfrastructureExtension.class);
private static ExtensionContext.Store store(ExtensionContext extensionContext) {
return extensionContext.getRoot().getStore(NAMESPACE);
}
static int streamPortNode2() {
String port = System.getProperty("node2.stream.port", "5556");
return Integer.valueOf(port);
private static EventLoopGroup eventLoopGroup(ExtensionContext context) {
return (EventLoopGroup) store(context).get("nettyEventLoopGroup");
}
static void waitAtMost(Duration duration, BooleanSupplier condition) throws InterruptedException {
if (condition.getAsBoolean()) {
return;
}
int waitTime = 100;
int waitedTime = 0;
long timeoutInMs = duration.toMillis();
while (waitedTime <= timeoutInMs) {
Thread.sleep(waitTime);
if (condition.getAsBoolean()) {
return;
}
waitedTime += waitTime;
}
fail("Waited " + duration.getSeconds() + " second(s), condition never got true");
@Override
public void beforeAll(ExtensionContext context) {
store(context).put("nettyEventLoopGroup", new NioEventLoopGroup());
}
static class StreamTestInfrastructureExtension implements BeforeAllCallback, AfterAllCallback, BeforeEachCallback, AfterEachCallback {
@Override
public void beforeEach(ExtensionContext context) throws Exception {
try {
Field streamField =
context.getTestInstance().get().getClass().getDeclaredField("eventLoopGroup");
streamField.setAccessible(true);
streamField.set(context.getTestInstance().get(), eventLoopGroup(context));
} catch (NoSuchFieldException e) {
private static final ExtensionContext.Namespace NAMESPACE = ExtensionContext.Namespace.create(StreamTestInfrastructureExtension.class);
}
try {
Field streamField = context.getTestInstance().get().getClass().getDeclaredField("stream");
streamField.setAccessible(true);
String stream = UUID.randomUUID().toString();
streamField.set(context.getTestInstance().get(), stream);
Client client =
new Client(
new Client.ClientParameters()
.eventLoopGroup(eventLoopGroup(context))
.port(streamPortNode1()));
Client.Response response = client.create(stream);
assertThat(response.isOk()).isTrue();
client.close();
store(context).put("testMethodStream", stream);
} catch (NoSuchFieldException e) {
private static ExtensionContext.Store store(ExtensionContext extensionContext) {
return extensionContext.getRoot().getStore(NAMESPACE);
}
for (Field declaredField : context.getTestInstance().get().getClass().getDeclaredFields()) {
if (declaredField.getType().equals(ClientFactory.class)) {
declaredField.setAccessible(true);
ClientFactory clientFactory = new ClientFactory(eventLoopGroup(context));
declaredField.set(context.getTestInstance().get(), clientFactory);
store(context).put("testClientFactory", clientFactory);
break;
}
private static EventLoopGroup eventLoopGroup(ExtensionContext context) {
return (EventLoopGroup) store(context).get("nettyEventLoopGroup");
}
@Override
public void beforeAll(ExtensionContext context) {
store(context).put("nettyEventLoopGroup", new NioEventLoopGroup());
}
@Override
public void beforeEach(ExtensionContext context) throws Exception {
try {
Field streamField = context.getTestInstance().get().getClass().getDeclaredField("eventLoopGroup");
streamField.setAccessible(true);
streamField.set(context.getTestInstance().get(), eventLoopGroup(context));
} catch (NoSuchFieldException e) {
}
try {
Field streamField = context.getTestInstance().get().getClass().getDeclaredField("stream");
streamField.setAccessible(true);
String stream = UUID.randomUUID().toString();
streamField.set(context.getTestInstance().get(), stream);
Client client = new Client(new Client.ClientParameters().eventLoopGroup(eventLoopGroup(context))
.port(streamPortNode1())
);
Client.Response response = client.create(stream);
assertThat(response.isOk()).isTrue();
client.close();
store(context).put("testMethodStream", stream);
} catch (NoSuchFieldException e) {
}
for (Field declaredField : context.getTestInstance().get().getClass().getDeclaredFields()) {
if (declaredField.getType().equals(ClientFactory.class)) {
declaredField.setAccessible(true);
ClientFactory clientFactory = new ClientFactory(eventLoopGroup(context));
declaredField.set(context.getTestInstance().get(), clientFactory);
store(context).put("testClientFactory", clientFactory);
break;
}
}
}
@Override
public void afterEach(ExtensionContext context) throws Exception {
try {
Field streamField = context.getTestInstance().get().getClass().getDeclaredField("stream");
streamField.setAccessible(true);
String stream = (String) streamField.get(context.getTestInstance().get());
Client client = new Client(new Client.ClientParameters().eventLoopGroup(eventLoopGroup(context))
.port(streamPortNode1())
);
Client.Response response = client.delete(stream);
assertThat(response.isOk()).isTrue();
client.close();
store(context).remove("testMethodStream");
} catch (NoSuchFieldException e) {
}
ClientFactory clientFactory = (ClientFactory) store(context).get("testClientFactory");
if (clientFactory != null) {
clientFactory.close();
}
}
@Override
public void afterAll(ExtensionContext context) throws Exception {
EventLoopGroup eventLoopGroup = eventLoopGroup(context);
eventLoopGroup.shutdownGracefully(1, 10, SECONDS).get(10, SECONDS);
}
}
}
static class ClientFactory {
@Override
public void afterEach(ExtensionContext context) throws Exception {
try {
Field streamField = context.getTestInstance().get().getClass().getDeclaredField("stream");
streamField.setAccessible(true);
String stream = (String) streamField.get(context.getTestInstance().get());
Client client =
new Client(
new Client.ClientParameters()
.eventLoopGroup(eventLoopGroup(context))
.port(streamPortNode1()));
Client.Response response = client.delete(stream);
assertThat(response.isOk()).isTrue();
client.close();
store(context).remove("testMethodStream");
} catch (NoSuchFieldException e) {
private final EventLoopGroup eventLoopGroup;
private final Set<Client> clients = ConcurrentHashMap.newKeySet();
}
public ClientFactory(EventLoopGroup eventLoopGroup) {
this.eventLoopGroup = eventLoopGroup;
}
public Client get() {
return get(new Client.ClientParameters());
}
public Client get(Client.ClientParameters parameters) {
// don't set the port, it would override the caller's port setting
Client client = new Client(parameters.eventLoopGroup(eventLoopGroup));
clients.add(client);
return client;
}
private void close() {
for (Client c : clients) {
c.close();
}
}
ClientFactory clientFactory = (ClientFactory) store(context).get("testClientFactory");
if (clientFactory != null) {
clientFactory.close();
}
}
@Override
public void afterAll(ExtensionContext context) throws Exception {
EventLoopGroup eventLoopGroup = eventLoopGroup(context);
eventLoopGroup.shutdownGracefully(1, 10, SECONDS).get(10, SECONDS);
}
}
static class ClientFactory {
private final EventLoopGroup eventLoopGroup;
private final Set<Client> clients = ConcurrentHashMap.newKeySet();
public ClientFactory(EventLoopGroup eventLoopGroup) {
this.eventLoopGroup = eventLoopGroup;
}
public Client get() {
return get(new Client.ClientParameters());
}
public Client get(Client.ClientParameters parameters) {
// don't set the port, it would override the caller's port setting
Client client = new Client(parameters.eventLoopGroup(eventLoopGroup));
clients.add(client);
return client;
}
private void close() {
for (Client c : clients) {
c.close();
}
}
}
}