diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/pom.xml b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/pom.xml index 7bc9e659ed..aa27c29baf 100644 --- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/pom.xml +++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/pom.xml @@ -27,13 +27,14 @@ 0.1.0-SNAPSHOT - 0.33.5 - 5.6.2 - 3.16.1 - 3.3.3 + 0.33.6 + 5.7.0 + 3.17.2 + 3.5.11 1.2.3 3.8.1 2.22.2 + 2.2.0 UTF-8 @@ -110,6 +111,20 @@ ${maven-surefire-plugin.version} + + com.diffplug.spotless + spotless-maven-plugin + ${spotless.version} + + + + 1.9 + + + + + + diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/FailureTest.java b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/FailureTest.java index ecabe0ee5c..c7a390f00d 100644 --- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/FailureTest.java +++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/FailureTest.java @@ -16,12 +16,11 @@ package com.rabbitmq.stream; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.fail; + import com.rabbitmq.stream.codec.WrapperMessageBuilder; import com.rabbitmq.stream.impl.Client; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; - import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.*; @@ -29,88 +28,100 @@ import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.fail; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; @ExtendWith(TestUtils.StreamTestInfrastructureExtension.class) public class FailureTest { - TestUtils.ClientFactory cf; - String stream; - ExecutorService executorService; + TestUtils.ClientFactory cf; + String stream; + ExecutorService executorService; - static void wait(Duration duration) { - try { - Thread.sleep(duration.toMillis()); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } + static void wait(Duration duration) { + try { + Thread.sleep(duration.toMillis()); + } catch (InterruptedException e) { + throw new RuntimeException(e); } + } - @AfterEach - void tearDown() { - if (executorService != null) { - executorService.shutdownNow(); - } + @AfterEach + void tearDown() { + if (executorService != null) { + executorService.shutdownNow(); } + } - @Test - void leaderFailureWhenPublisherConnectedToReplica() throws Exception { - Set messages = new HashSet<>(); - Client client = cf.get(new Client.ClientParameters() - .port(TestUtils.streamPortNode1()) - ); - Map metadata = client.metadata(stream); - Client.StreamMetadata streamMetadata = metadata.get(stream); - assertThat(streamMetadata).isNotNull(); + @Test + void leaderFailureWhenPublisherConnectedToReplica() throws Exception { + Set messages = new HashSet<>(); + Client client = cf.get(new Client.ClientParameters().port(TestUtils.streamPortNode1())); + Map metadata = client.metadata(stream); + Client.StreamMetadata streamMetadata = metadata.get(stream); + assertThat(streamMetadata).isNotNull(); - 
assertThat(streamMetadata.getLeader().getPort()).isEqualTo(TestUtils.streamPortNode1()); - assertThat(streamMetadata.getReplicas()).isNotEmpty(); - Client.Broker replica = streamMetadata.getReplicas().get(0); - assertThat(replica.getPort()).isNotEqualTo(TestUtils.streamPortNode1()); + assertThat(streamMetadata.getLeader().getPort()).isEqualTo(TestUtils.streamPortNode1()); + assertThat(streamMetadata.getReplicas()).isNotEmpty(); + Client.Broker replica = streamMetadata.getReplicas().get(0); + assertThat(replica.getPort()).isNotEqualTo(TestUtils.streamPortNode1()); - AtomicReference confirmLatch = new AtomicReference<>(new CountDownLatch(1)); + AtomicReference confirmLatch = new AtomicReference<>(new CountDownLatch(1)); - CountDownLatch metadataLatch = new CountDownLatch(1); - Client publisher = cf.get(new Client.ClientParameters() + CountDownLatch metadataLatch = new CountDownLatch(1); + Client publisher = + cf.get( + new Client.ClientParameters() .port(replica.getPort()) .metadataListener((stream, code) -> metadataLatch.countDown()) - .publishConfirmListener((publisherId, publishingId) -> confirmLatch.get().countDown())); - String message = "all nodes available"; - messages.add(message); - publisher.publish(stream, (byte) 1, - Collections.singletonList(publisher.messageBuilder().addData(message.getBytes(StandardCharsets.UTF_8)).build())); - assertThat(confirmLatch.get().await(10, TimeUnit.SECONDS)).isTrue(); - confirmLatch.set(null); + .publishConfirmListener( + (publisherId, publishingId) -> confirmLatch.get().countDown())); + String message = "all nodes available"; + messages.add(message); + publisher.publish( + stream, + (byte) 1, + Collections.singletonList( + publisher.messageBuilder().addData(message.getBytes(StandardCharsets.UTF_8)).build())); + assertThat(confirmLatch.get().await(10, TimeUnit.SECONDS)).isTrue(); + confirmLatch.set(null); - try { - Host.rabbitmqctl("stop_app"); - try { - cf.get(new Client.ClientParameters().port(TestUtils.streamPortNode1())); - fail("Node app stopped, connecting should not be possible"); - } catch (Exception e) { - // OK - } + try { + Host.rabbitmqctl("stop_app"); + try { + cf.get(new Client.ClientParameters().port(TestUtils.streamPortNode1())); + fail("Node app stopped, connecting should not be possible"); + } catch (Exception e) { + // OK + } - assertThat(metadataLatch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(metadataLatch.await(10, TimeUnit.SECONDS)).isTrue(); - // wait until there's a new leader - TestUtils.waitAtMost(Duration.ofSeconds(10), () -> { - Client.StreamMetadata m = publisher.metadata(stream).get(stream); - return m.getLeader() != null && m.getLeader().getPort() != TestUtils.streamPortNode1(); - }); + // wait until there's a new leader + TestUtils.waitAtMost( + Duration.ofSeconds(10), + () -> { + Client.StreamMetadata m = publisher.metadata(stream).get(stream); + return m.getLeader() != null && m.getLeader().getPort() != TestUtils.streamPortNode1(); + }); - confirmLatch.set(new CountDownLatch(1)); - message = "2 nodes available"; - messages.add(message); - publisher.publish(stream, (byte) 1, Collections.singletonList(publisher.messageBuilder() - .addData(message.getBytes(StandardCharsets.UTF_8)).build())); - assertThat(confirmLatch.get().await(10, TimeUnit.SECONDS)).isTrue(); - confirmLatch.set(null); - } finally { - Host.rabbitmqctl("start_app"); - } + confirmLatch.set(new CountDownLatch(1)); + message = "2 nodes available"; + messages.add(message); + publisher.publish( + stream, + (byte) 1, + Collections.singletonList( + 
publisher + .messageBuilder() + .addData(message.getBytes(StandardCharsets.UTF_8)) + .build())); + assertThat(confirmLatch.get().await(10, TimeUnit.SECONDS)).isTrue(); + confirmLatch.set(null); + } finally { + Host.rabbitmqctl("start_app"); + } // wait until all the replicas are there TestUtils.waitAtMost( @@ -120,350 +131,411 @@ public class FailureTest { return m.getReplicas().size() == 2; }); - confirmLatch.set(new CountDownLatch(1)); - message = "all nodes are back"; - messages.add(message); - publisher.publish(stream, (byte) 1, Collections.singletonList(publisher.messageBuilder() - .addData(message.getBytes(StandardCharsets.UTF_8)).build())); - assertThat(confirmLatch.get().await(10, TimeUnit.SECONDS)).isTrue(); - confirmLatch.set(null); + confirmLatch.set(new CountDownLatch(1)); + message = "all nodes are back"; + messages.add(message); + publisher.publish( + stream, + (byte) 1, + Collections.singletonList( + publisher.messageBuilder().addData(message.getBytes(StandardCharsets.UTF_8)).build())); + assertThat(confirmLatch.get().await(10, TimeUnit.SECONDS)).isTrue(); + confirmLatch.set(null); - CountDownLatch consumeLatch = new CountDownLatch(2); - Set bodies = ConcurrentHashMap.newKeySet(); - Client consumer = cf.get(new Client.ClientParameters() + CountDownLatch consumeLatch = new CountDownLatch(2); + Set bodies = ConcurrentHashMap.newKeySet(); + Client consumer = + cf.get( + new Client.ClientParameters() .port(TestUtils.streamPortNode1()) - .messageListener((subscriptionId, offset, msg) -> { - bodies.add(new String(msg.getBodyAsBinary(), StandardCharsets.UTF_8)); - consumeLatch.countDown(); - })); + .messageListener( + (subscriptionId, offset, msg) -> { + bodies.add(new String(msg.getBodyAsBinary(), StandardCharsets.UTF_8)); + consumeLatch.countDown(); + })); - TestUtils.waitAtMost(Duration.ofSeconds(5), () -> { - Client.Response response = consumer.subscribe((byte) 1, stream, OffsetSpecification.first(), 10); - return response.isOk(); + TestUtils.waitAtMost( + Duration.ofSeconds(5), + () -> { + Client.Response response = + consumer.subscribe((byte) 1, stream, OffsetSpecification.first(), 10); + return response.isOk(); }); - assertThat(consumeLatch.await(10, TimeUnit.SECONDS)).isTrue(); - assertThat(bodies).hasSize(3).contains("all nodes available", "2 nodes available", "all nodes are back"); - } + assertThat(consumeLatch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(bodies) + .hasSize(3) + .contains("all nodes available", "2 nodes available", "all nodes are back"); + } - @Test - void noLostConfirmedMessagesWhenLeaderGoesAway() throws Exception { - executorService = Executors.newCachedThreadPool(); - Client client = cf.get(new Client.ClientParameters() - .port(TestUtils.streamPortNode1()) - ); - Map metadata = client.metadata(stream); - Client.StreamMetadata streamMetadata = metadata.get(stream); - assertThat(streamMetadata).isNotNull(); + @Test + void noLostConfirmedMessagesWhenLeaderGoesAway() throws Exception { + executorService = Executors.newCachedThreadPool(); + Client client = cf.get(new Client.ClientParameters().port(TestUtils.streamPortNode1())); + Map metadata = client.metadata(stream); + Client.StreamMetadata streamMetadata = metadata.get(stream); + assertThat(streamMetadata).isNotNull(); - assertThat(streamMetadata.getLeader()).isNotNull(); - assertThat(streamMetadata.getLeader().getPort()).isEqualTo(TestUtils.streamPortNode1()); + assertThat(streamMetadata.getLeader()).isNotNull(); + 
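
// --- illustrative sketch, not part of the original diff ---
// noLostConfirmedMessagesWhenLeaderGoesAway and consumerReattachesToOtherReplicaWhenReplicaGoesAway
// correlate publish confirms with the messages that were sent. A minimal version of that pattern,
// with the generic type parameters written out (Long and Message are inferred from how the
// collections are used in the tests and should be read as assumptions):
Map<Long, Message> published = new ConcurrentHashMap<>();
Set<Message> confirmed = ConcurrentHashMap.newKeySet();
Client.PublishConfirmListener confirmListener =
    (publisherId, publishingId) -> {
      // a confirm can arrive before the publishing id has been stored after publish(),
      // hence the short retry loop in the real tests; a plain null check keeps the sketch simple
      Message confirmedMessage = published.remove(publishingId);
      if (confirmedMessage != null) {
        confirmed.add(confirmedMessage);
      }
    };
// --- end sketch ---
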
assertThat(streamMetadata.getLeader().getPort()).isEqualTo(TestUtils.streamPortNode1()); - Map published = new ConcurrentHashMap<>(); - Set confirmed = ConcurrentHashMap.newKeySet(); + Map published = new ConcurrentHashMap<>(); + Set confirmed = ConcurrentHashMap.newKeySet(); - Client.PublishConfirmListener publishConfirmListener = (publisherId, publishingId) -> { - Message confirmedMessage; - int attempts = 0; - while ((confirmedMessage = published.remove(publishingId)) == null && attempts < 10) { - wait(Duration.ofMillis(5)); - attempts++; - } - confirmed.add(confirmedMessage); + Client.PublishConfirmListener publishConfirmListener = + (publisherId, publishingId) -> { + Message confirmedMessage; + int attempts = 0; + while ((confirmedMessage = published.remove(publishingId)) == null && attempts < 10) { + wait(Duration.ofMillis(5)); + attempts++; + } + confirmed.add(confirmedMessage); }; - AtomicLong generation = new AtomicLong(0); - AtomicLong sequence = new AtomicLong(0); - AtomicBoolean connected = new AtomicBoolean(true); - AtomicReference publisher = new AtomicReference<>(); - CountDownLatch reconnectionLatch = new CountDownLatch(1); - AtomicReference shutdownListenerReference = new AtomicReference<>(); - Client.ShutdownListener shutdownListener = shutdownContext -> { - if (shutdownContext.getShutdownReason() == Client.ShutdownContext.ShutdownReason.UNKNOWN) { - // avoid long-running task in the IO thread - executorService.submit(() -> { - connected.set(false); + AtomicLong generation = new AtomicLong(0); + AtomicLong sequence = new AtomicLong(0); + AtomicBoolean connected = new AtomicBoolean(true); + AtomicReference publisher = new AtomicReference<>(); + CountDownLatch reconnectionLatch = new CountDownLatch(1); + AtomicReference shutdownListenerReference = new AtomicReference<>(); + Client.ShutdownListener shutdownListener = + shutdownContext -> { + if (shutdownContext.getShutdownReason() + == Client.ShutdownContext.ShutdownReason.UNKNOWN) { + // avoid long-running task in the IO thread + executorService.submit( + () -> { + connected.set(false); - Client locator = cf.get(new Client.ClientParameters().port(TestUtils.streamPortNode2())); - // wait until there's a new leader - try { - TestUtils.waitAtMost(Duration.ofSeconds(5), () -> { - Client.StreamMetadata m = locator.metadata(stream).get(stream); - return m.getLeader() != null && m.getLeader().getPort() != TestUtils.streamPortNode1(); + Client locator = + cf.get(new Client.ClientParameters().port(TestUtils.streamPortNode2())); + // wait until there's a new leader + try { + TestUtils.waitAtMost( + Duration.ofSeconds(5), + () -> { + Client.StreamMetadata m = locator.metadata(stream).get(stream); + return m.getLeader() != null + && m.getLeader().getPort() != TestUtils.streamPortNode1(); }); - } catch (Throwable e) { - reconnectionLatch.countDown(); - return; - } - - int newLeaderPort = locator.metadata(stream).get(stream).getLeader().getPort(); - Client newPublisher = cf.get(new Client.ClientParameters() - .port(newLeaderPort) - .shutdownListener(shutdownListenerReference.get()) - .publishConfirmListener(publishConfirmListener) - ); - - generation.incrementAndGet(); - published.clear(); - publisher.set(newPublisher); - connected.set(true); - + } catch (Throwable e) { reconnectionLatch.countDown(); - }); - } - }; - shutdownListenerReference.set(shutdownListener); + return; + } - client = cf.get(new Client.ClientParameters() + int newLeaderPort = locator.metadata(stream).get(stream).getLeader().getPort(); + Client newPublisher = + 
cf.get( + new Client.ClientParameters() + .port(newLeaderPort) + .shutdownListener(shutdownListenerReference.get()) + .publishConfirmListener(publishConfirmListener)); + + generation.incrementAndGet(); + published.clear(); + publisher.set(newPublisher); + connected.set(true); + + reconnectionLatch.countDown(); + }); + } + }; + shutdownListenerReference.set(shutdownListener); + + client = + cf.get( + new Client.ClientParameters() .port(streamMetadata.getLeader().getPort()) .shutdownListener(shutdownListener) .publishConfirmListener(publishConfirmListener)); - publisher.set(client); + publisher.set(client); - AtomicBoolean keepPublishing = new AtomicBoolean(true); + AtomicBoolean keepPublishing = new AtomicBoolean(true); - executorService.submit(() -> { - while (keepPublishing.get()) { - if (connected.get()) { - Message message = publisher.get().messageBuilder() - .properties().messageId(sequence.getAndIncrement()) - .messageBuilder().applicationProperties().entry("generation", generation.get()) - .messageBuilder().build(); - try { - long publishingId = publisher.get().publish(stream, (byte) 1, Collections.singletonList(message)).get(0); - published.put(publishingId, message); - } catch (Exception e) { - // keep going - } - wait(Duration.ofMillis(10)); - } else { - wait(Duration.ofSeconds(1)); - } + executorService.submit( + () -> { + while (keepPublishing.get()) { + if (connected.get()) { + Message message = + publisher + .get() + .messageBuilder() + .properties() + .messageId(sequence.getAndIncrement()) + .messageBuilder() + .applicationProperties() + .entry("generation", generation.get()) + .messageBuilder() + .build(); + try { + long publishingId = + publisher + .get() + .publish(stream, (byte) 1, Collections.singletonList(message)) + .get(0); + published.put(publishingId, message); + } catch (Exception e) { + // keep going + } + wait(Duration.ofMillis(10)); + } else { + wait(Duration.ofSeconds(1)); } + } }); - // let's publish for a bit of time - Thread.sleep(2000); + // let's publish for a bit of time + Thread.sleep(2000); - assertThat(confirmed).isNotEmpty(); - int confirmedCount = confirmed.size(); + assertThat(confirmed).isNotEmpty(); + int confirmedCount = confirmed.size(); - try { - Host.rabbitmqctl("stop_app"); + try { + Host.rabbitmqctl("stop_app"); - assertThat(reconnectionLatch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(reconnectionLatch.await(10, TimeUnit.SECONDS)).isTrue(); - // let's publish for a bit of time - Thread.sleep(2000); + // let's publish for a bit of time + Thread.sleep(2000); - } finally { - Host.rabbitmqctl("start_app"); - } - assertThat(confirmed).hasSizeGreaterThan(confirmedCount); - confirmedCount = confirmed.size(); - - Client metadataClient = cf.get(new Client.ClientParameters() - .port(TestUtils.streamPortNode2()) - ); - // wait until all the replicas are there - TestUtils.waitAtMost(Duration.ofSeconds(5), () -> { - Client.StreamMetadata m = metadataClient.metadata(stream).get(stream); - return m.getReplicas().size() == 2; - }); - - // let's publish for a bit of time - Thread.sleep(2000); - - assertThat(confirmed).hasSizeGreaterThan(confirmedCount); - - keepPublishing.set(false); - - Queue consumed = new ConcurrentLinkedQueue<>(); - Set generations = ConcurrentHashMap.newKeySet(); - CountDownLatch consumedLatch = new CountDownLatch(1); - Client.StreamMetadata m = metadataClient.metadata(stream).get(stream); - Client consumer = cf.get(new Client.ClientParameters() - .port(m.getReplicas().get(0).getPort()) - .chunkListener((client1, 
subscriptionId, offset, messageCount, dataSize) -> client1.credit(subscriptionId, 1)) - .messageListener((subscriptionId, offset, message) -> { - consumed.add(message); - generations.add((Long) message.getApplicationProperties().get("generation")); - if (consumed.size() == confirmed.size()) { - consumedLatch.countDown(); - } - }) - ); - - Client.Response response = consumer.subscribe((byte) 1, stream, OffsetSpecification.first(), 10); - assertThat(response.isOk()).isTrue(); - - assertThat(consumedLatch.await(5, TimeUnit.SECONDS)).isTrue(); - assertThat(generations).hasSize(2).contains(0L, 1L); - assertThat(consumed).hasSizeGreaterThanOrEqualTo(confirmed.size()); - long lastMessageId = -1; - for (Message message : consumed) { - long messageId = message.getProperties().getMessageIdAsLong(); - assertThat(messageId).isGreaterThanOrEqualTo(lastMessageId); - lastMessageId = messageId; - } - assertThat(lastMessageId).isPositive().isLessThanOrEqualTo(sequence.get()); + } finally { + Host.rabbitmqctl("start_app"); } + assertThat(confirmed).hasSizeGreaterThan(confirmedCount); + confirmedCount = confirmed.size(); - @Test - void consumerReattachesToOtherReplicaWhenReplicaGoesAway() throws Exception { - executorService = Executors.newCachedThreadPool(); - Client metadataClient = cf.get(new Client.ClientParameters() - .port(TestUtils.streamPortNode1()) - ); - Map metadata = metadataClient.metadata(stream); - Client.StreamMetadata streamMetadata = metadata.get(stream); - assertThat(streamMetadata).isNotNull(); - - assertThat(streamMetadata.getLeader()).isNotNull(); - assertThat(streamMetadata.getLeader().getPort()).isEqualTo(TestUtils.streamPortNode1()); - - Map published = new ConcurrentHashMap<>(); - Set confirmed = ConcurrentHashMap.newKeySet(); - Set confirmedIds = ConcurrentHashMap.newKeySet(); - Client.PublishConfirmListener publishConfirmListener = (publisherId, publishingId) -> { - Message confirmedMessage; - int attempts = 0; - while ((confirmedMessage = published.remove(publishingId)) == null && attempts < 10) { - wait(Duration.ofMillis(5)); - attempts++; - } - confirmed.add(confirmedMessage); - confirmedIds.add(confirmedMessage.getProperties().getMessageIdAsLong()); - }; - - Client publisher = cf.get(new Client.ClientParameters() - .port(streamMetadata.getLeader().getPort()) - .publishConfirmListener(publishConfirmListener) - ); - - AtomicLong generation = new AtomicLong(0); - AtomicLong sequence = new AtomicLong(0); - AtomicBoolean keepPublishing = new AtomicBoolean(true); - CountDownLatch publishingLatch = new CountDownLatch(1); - - executorService.submit(() -> { - while (keepPublishing.get()) { - Message message = new WrapperMessageBuilder() - .properties().messageId(sequence.getAndIncrement()) - .messageBuilder().applicationProperties().entry("generation", generation.get()) - .messageBuilder().build(); - try { - long publishingId = publisher.publish(stream, (byte) 1, Collections.singletonList(message)).get(0); - published.put(publishingId, message); - } catch (Exception e) { - // keep going - } - wait(Duration.ofMillis(10)); - } - publishingLatch.countDown(); + Client metadataClient = cf.get(new Client.ClientParameters().port(TestUtils.streamPortNode2())); + // wait until all the replicas are there + TestUtils.waitAtMost( + Duration.ofSeconds(5), + () -> { + Client.StreamMetadata m = metadataClient.metadata(stream).get(stream); + return m.getReplicas().size() == 2; }); - Queue consumed = new ConcurrentLinkedQueue<>(); + // let's publish for a bit of time + Thread.sleep(2000); - 
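
// --- illustrative sketch, not part of the original diff ---
// The consumers in these tests use credit-based flow control: the subscription starts with a
// few credits and the chunk listener grants one credit back per delivered chunk so the broker
// keeps dispatching. A minimal consumer wiring in the style of the tests (cf and stream come
// from the test fixture; reading the trailing 10 passed to subscribe() as the initial credit
// is an assumption based on how the tests use it):
Client consumer =
    cf.get(
        new Client.ClientParameters()
            .port(TestUtils.streamPortNode1())
            .chunkListener(
                (client, subscriptionId, offset, messageCount, dataSize) ->
                    client.credit(subscriptionId, 1)) // renew one credit per delivered chunk
            .messageListener(
                (subscriptionId, offset, message) ->
                    System.out.println(
                        new String(message.getBodyAsBinary(), StandardCharsets.UTF_8))));
Client.Response response = consumer.subscribe((byte) 1, stream, OffsetSpecification.first(), 10);
// --- end sketch ---
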
Client.Broker replica = streamMetadata.getReplicas().stream() - .filter(broker -> broker.getPort() == TestUtils.streamPortNode2()) - .findFirst() - .orElseThrow(() -> new NoSuchElementException()); + assertThat(confirmed).hasSizeGreaterThan(confirmedCount); - AtomicLong lastProcessedOffset = new AtomicLong(-1); - Set generations = ConcurrentHashMap.newKeySet(); - Set consumedIds = ConcurrentHashMap.newKeySet(); - Client.MessageListener messageListener = (subscriptionId, offset, message) -> { - consumed.add(message); - generations.add((Long) message.getApplicationProperties().get("generation")); - consumedIds.add(message.getProperties().getMessageIdAsLong()); - lastProcessedOffset.set(offset); + keepPublishing.set(false); + + Queue consumed = new ConcurrentLinkedQueue<>(); + Set generations = ConcurrentHashMap.newKeySet(); + CountDownLatch consumedLatch = new CountDownLatch(1); + Client.StreamMetadata m = metadataClient.metadata(stream).get(stream); + Client consumer = + cf.get( + new Client.ClientParameters() + .port(m.getReplicas().get(0).getPort()) + .chunkListener( + (client1, subscriptionId, offset, messageCount, dataSize) -> + client1.credit(subscriptionId, 1)) + .messageListener( + (subscriptionId, offset, message) -> { + consumed.add(message); + generations.add((Long) message.getApplicationProperties().get("generation")); + if (consumed.size() == confirmed.size()) { + consumedLatch.countDown(); + } + })); + + Client.Response response = + consumer.subscribe((byte) 1, stream, OffsetSpecification.first(), 10); + assertThat(response.isOk()).isTrue(); + + assertThat(consumedLatch.await(5, TimeUnit.SECONDS)).isTrue(); + assertThat(generations).hasSize(2).contains(0L, 1L); + assertThat(consumed).hasSizeGreaterThanOrEqualTo(confirmed.size()); + long lastMessageId = -1; + for (Message message : consumed) { + long messageId = message.getProperties().getMessageIdAsLong(); + assertThat(messageId).isGreaterThanOrEqualTo(lastMessageId); + lastMessageId = messageId; + } + assertThat(lastMessageId).isPositive().isLessThanOrEqualTo(sequence.get()); + } + + @Test + void consumerReattachesToOtherReplicaWhenReplicaGoesAway() throws Exception { + executorService = Executors.newCachedThreadPool(); + Client metadataClient = cf.get(new Client.ClientParameters().port(TestUtils.streamPortNode1())); + Map metadata = metadataClient.metadata(stream); + Client.StreamMetadata streamMetadata = metadata.get(stream); + assertThat(streamMetadata).isNotNull(); + + assertThat(streamMetadata.getLeader()).isNotNull(); + assertThat(streamMetadata.getLeader().getPort()).isEqualTo(TestUtils.streamPortNode1()); + + Map published = new ConcurrentHashMap<>(); + Set confirmed = ConcurrentHashMap.newKeySet(); + Set confirmedIds = ConcurrentHashMap.newKeySet(); + Client.PublishConfirmListener publishConfirmListener = + (publisherId, publishingId) -> { + Message confirmedMessage; + int attempts = 0; + while ((confirmedMessage = published.remove(publishingId)) == null && attempts < 10) { + wait(Duration.ofMillis(5)); + attempts++; + } + confirmed.add(confirmedMessage); + confirmedIds.add(confirmedMessage.getProperties().getMessageIdAsLong()); }; - CountDownLatch reconnectionLatch = new CountDownLatch(1); - AtomicReference shutdownListenerReference = new AtomicReference<>(); - Client.ShutdownListener shutdownListener = shutdownContext -> { - if (shutdownContext.getShutdownReason() == Client.ShutdownContext.ShutdownReason.UNKNOWN) { - // avoid long-running task in the IO thread - executorService.submit(() -> { - Client.StreamMetadata 
m = metadataClient.metadata(stream).get(stream); - int newReplicaPort = m.getReplicas().get(0).getPort(); + Client publisher = + cf.get( + new Client.ClientParameters() + .port(streamMetadata.getLeader().getPort()) + .publishConfirmListener(publishConfirmListener)); - Client newConsumer = cf.get(new Client.ClientParameters() - .port(newReplicaPort) - .shutdownListener(shutdownListenerReference.get()) - .chunkListener((client1, subscriptionId, offset, messageCount, dataSize) -> client1.credit(subscriptionId, 1)) - .messageListener(messageListener) - ); + AtomicLong generation = new AtomicLong(0); + AtomicLong sequence = new AtomicLong(0); + AtomicBoolean keepPublishing = new AtomicBoolean(true); + CountDownLatch publishingLatch = new CountDownLatch(1); - newConsumer.subscribe((byte) 1, stream, OffsetSpecification.offset(lastProcessedOffset.get() + 1), 10); - - generation.incrementAndGet(); - reconnectionLatch.countDown(); - }); + executorService.submit( + () -> { + while (keepPublishing.get()) { + Message message = + new WrapperMessageBuilder() + .properties() + .messageId(sequence.getAndIncrement()) + .messageBuilder() + .applicationProperties() + .entry("generation", generation.get()) + .messageBuilder() + .build(); + try { + long publishingId = + publisher.publish(stream, (byte) 1, Collections.singletonList(message)).get(0); + published.put(publishingId, message); + } catch (Exception e) { + // keep going } - }; - shutdownListenerReference.set(shutdownListener); + wait(Duration.ofMillis(10)); + } + publishingLatch.countDown(); + }); - Client consumer = cf.get(new Client.ClientParameters() + Queue consumed = new ConcurrentLinkedQueue<>(); + + Client.Broker replica = + streamMetadata.getReplicas().stream() + .filter(broker -> broker.getPort() == TestUtils.streamPortNode2()) + .findFirst() + .orElseThrow(() -> new NoSuchElementException()); + + AtomicLong lastProcessedOffset = new AtomicLong(-1); + Set generations = ConcurrentHashMap.newKeySet(); + Set consumedIds = ConcurrentHashMap.newKeySet(); + Client.MessageListener messageListener = + (subscriptionId, offset, message) -> { + consumed.add(message); + generations.add((Long) message.getApplicationProperties().get("generation")); + consumedIds.add(message.getProperties().getMessageIdAsLong()); + lastProcessedOffset.set(offset); + }; + + CountDownLatch reconnectionLatch = new CountDownLatch(1); + AtomicReference shutdownListenerReference = new AtomicReference<>(); + Client.ShutdownListener shutdownListener = + shutdownContext -> { + if (shutdownContext.getShutdownReason() + == Client.ShutdownContext.ShutdownReason.UNKNOWN) { + // avoid long-running task in the IO thread + executorService.submit( + () -> { + Client.StreamMetadata m = metadataClient.metadata(stream).get(stream); + int newReplicaPort = m.getReplicas().get(0).getPort(); + + Client newConsumer = + cf.get( + new Client.ClientParameters() + .port(newReplicaPort) + .shutdownListener(shutdownListenerReference.get()) + .chunkListener( + (client1, subscriptionId, offset, messageCount, dataSize) -> + client1.credit(subscriptionId, 1)) + .messageListener(messageListener)); + + newConsumer.subscribe( + (byte) 1, + stream, + OffsetSpecification.offset(lastProcessedOffset.get() + 1), + 10); + + generation.incrementAndGet(); + reconnectionLatch.countDown(); + }); + } + }; + shutdownListenerReference.set(shutdownListener); + + Client consumer = + cf.get( + new Client.ClientParameters() .port(replica.getPort()) .shutdownListener(shutdownListener) - .chunkListener((client1, subscriptionId, 
offset, messageCount, dataSize) -> client1.credit(subscriptionId, 1)) - .messageListener(messageListener) - ); + .chunkListener( + (client1, subscriptionId, offset, messageCount, dataSize) -> + client1.credit(subscriptionId, 1)) + .messageListener(messageListener)); - Client.Response response = consumer.subscribe((byte) 1, stream, OffsetSpecification.first(), 10); - assertThat(response.isOk()).isTrue(); + Client.Response response = + consumer.subscribe((byte) 1, stream, OffsetSpecification.first(), 10); + assertThat(response.isOk()).isTrue(); - // let's publish for a bit of time - Thread.sleep(2000); + // let's publish for a bit of time + Thread.sleep(2000); - assertThat(confirmed).isNotEmpty(); - assertThat(consumed).isNotEmpty(); - int confirmedCount = confirmed.size(); + assertThat(confirmed).isNotEmpty(); + assertThat(consumed).isNotEmpty(); + int confirmedCount = confirmed.size(); - try { - Host.rabbitmqctl("stop_app", Host.node2name()); + try { + Host.rabbitmqctl("stop_app", Host.node2name()); - assertThat(reconnectionLatch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(reconnectionLatch.await(10, TimeUnit.SECONDS)).isTrue(); - // let's publish for a bit of time - Thread.sleep(2000); + // let's publish for a bit of time + Thread.sleep(2000); - } finally { - Host.rabbitmqctl("start_app", Host.node2name()); - } - assertThat(confirmed).hasSizeGreaterThan(confirmedCount); - confirmedCount = confirmed.size(); + } finally { + Host.rabbitmqctl("start_app", Host.node2name()); + } + assertThat(confirmed).hasSizeGreaterThan(confirmedCount); + confirmedCount = confirmed.size(); - // wait until all the replicas are there - TestUtils.waitAtMost(Duration.ofSeconds(5), () -> { - Client.StreamMetadata m = metadataClient.metadata(stream).get(stream); - return m.getReplicas().size() == 2; + // wait until all the replicas are there + TestUtils.waitAtMost( + Duration.ofSeconds(5), + () -> { + Client.StreamMetadata m = metadataClient.metadata(stream).get(stream); + return m.getReplicas().size() == 2; }); - // let's publish for a bit of time - Thread.sleep(2000); + // let's publish for a bit of time + Thread.sleep(2000); - assertThat(confirmed).hasSizeGreaterThan(confirmedCount); + assertThat(confirmed).hasSizeGreaterThan(confirmedCount); - keepPublishing.set(false); + keepPublishing.set(false); - assertThat(publishingLatch.await(5, TimeUnit.SECONDS)).isTrue(); + assertThat(publishingLatch.await(5, TimeUnit.SECONDS)).isTrue(); - TestUtils.waitAtMost(Duration.ofSeconds(5), () -> consumed.size() >= confirmed.size()); + TestUtils.waitAtMost(Duration.ofSeconds(5), () -> consumed.size() >= confirmed.size()); - assertThat(generations).hasSize(2).contains(0L, 1L); - assertThat(consumed).hasSizeGreaterThanOrEqualTo(confirmed.size()); - long lastMessageId = -1; - for (Message message : consumed) { - long messageId = message.getProperties().getMessageIdAsLong(); - assertThat(messageId).isGreaterThanOrEqualTo(lastMessageId); - lastMessageId = messageId; - } - assertThat(lastMessageId).isPositive().isLessThanOrEqualTo(sequence.get()); - - confirmedIds.forEach(confirmedId -> assertThat(consumedIds).contains(confirmedId)); + assertThat(generations).hasSize(2).contains(0L, 1L); + assertThat(consumed).hasSizeGreaterThanOrEqualTo(confirmed.size()); + long lastMessageId = -1; + for (Message message : consumed) { + long messageId = message.getProperties().getMessageIdAsLong(); + assertThat(messageId).isGreaterThanOrEqualTo(lastMessageId); + lastMessageId = messageId; } + 
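
// --- illustrative sketch, not part of the original diff ---
// The reattachment technique exercised by consumerReattachesToOtherReplicaWhenReplicaGoesAway,
// condensed: on an unexpected shutdown, fetch the stream members from another node, connect to
// one of the remaining replicas, and resubscribe from the offset just after the last one the
// message listener processed, so nothing confirmed is skipped. lastProcessedOffset, cf, stream,
// executorService, metadataClient and messageListener are the test's own fields; the real test
// also re-registers the shutdown listener on the new connection so a second failure is handled.
Client.ShutdownListener onUnexpectedShutdown =
    shutdownContext -> {
      if (shutdownContext.getShutdownReason() == Client.ShutdownContext.ShutdownReason.UNKNOWN) {
        // do the recovery off the IO thread
        executorService.submit(
            () -> {
              Client.StreamMetadata m = metadataClient.metadata(stream).get(stream);
              Client newConsumer =
                  cf.get(
                      new Client.ClientParameters()
                          .port(m.getReplicas().get(0).getPort())
                          .chunkListener(
                              (c, subscriptionId, offset, messageCount, dataSize) ->
                                  c.credit(subscriptionId, 1))
                          .messageListener(messageListener));
              newConsumer.subscribe(
                  (byte) 1, stream, OffsetSpecification.offset(lastProcessedOffset.get() + 1), 10);
            });
      }
    };
// --- end sketch ---
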
assertThat(lastMessageId).isPositive().isLessThanOrEqualTo(sequence.get()); + confirmedIds.forEach(confirmedId -> assertThat(consumedIds).contains(confirmedId)); + } } diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/Host.java b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/Host.java index 1c89f5d165..0134038a8b 100644 --- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/Host.java +++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/Host.java @@ -25,88 +25,93 @@ import java.net.UnknownHostException; public class Host { - private static String capture(InputStream is) - throws IOException { - BufferedReader br = new BufferedReader(new InputStreamReader(is)); - String line; - StringBuilder buff = new StringBuilder(); - while ((line = br.readLine()) != null) { - buff.append(line).append("\n"); - } - return buff.toString(); + private static String capture(InputStream is) throws IOException { + BufferedReader br = new BufferedReader(new InputStreamReader(is)); + String line; + StringBuilder buff = new StringBuilder(); + while ((line = br.readLine()) != null) { + buff.append(line).append("\n"); } + return buff.toString(); + } - private static Process executeCommand(String command) throws IOException { - Process pr = executeCommandProcess(command); + private static Process executeCommand(String command) throws IOException { + Process pr = executeCommandProcess(command); - int ev = waitForExitValue(pr); - if (ev != 0) { - String stdout = capture(pr.getInputStream()); - String stderr = capture(pr.getErrorStream()); - throw new IOException("unexpected command exit value: " + ev + - "\ncommand: " + command + "\n" + - "\nstdout:\n" + stdout + - "\nstderr:\n" + stderr + "\n"); - } - return pr; + int ev = waitForExitValue(pr); + if (ev != 0) { + String stdout = capture(pr.getInputStream()); + String stderr = capture(pr.getErrorStream()); + throw new IOException( + "unexpected command exit value: " + + ev + + "\ncommand: " + + command + + "\n" + + "\nstdout:\n" + + stdout + + "\nstderr:\n" + + stderr + + "\n"); } + return pr; + } - private static int waitForExitValue(Process pr) { - while (true) { - try { - pr.waitFor(); - break; - } catch (InterruptedException ignored) { - } - } - return pr.exitValue(); + private static int waitForExitValue(Process pr) { + while (true) { + try { + pr.waitFor(); + break; + } catch (InterruptedException ignored) { + } } + return pr.exitValue(); + } - private static Process executeCommandProcess(String command) throws IOException { - String[] finalCommand; - if (System.getProperty("os.name").toLowerCase().contains("windows")) { - finalCommand = new String[4]; - finalCommand[0] = "C:\\winnt\\system32\\cmd.exe"; - finalCommand[1] = "/y"; - finalCommand[2] = "/c"; - finalCommand[3] = command; - } else { - finalCommand = new String[3]; - finalCommand[0] = "/bin/sh"; - finalCommand[1] = "-c"; - finalCommand[2] = command; - } - return Runtime.getRuntime().exec(finalCommand); + private static Process executeCommandProcess(String command) throws IOException { + String[] finalCommand; + if (System.getProperty("os.name").toLowerCase().contains("windows")) { + finalCommand = new String[4]; + finalCommand[0] = "C:\\winnt\\system32\\cmd.exe"; + finalCommand[1] = "/y"; + finalCommand[2] = "/c"; + finalCommand[3] = command; + } else { + finalCommand = new String[3]; + finalCommand[0] = "/bin/sh"; + finalCommand[1] = "-c"; + 
finalCommand[2] = command; } + return Runtime.getRuntime().exec(finalCommand); + } - public static Process rabbitmqctl(String command) throws IOException { - return rabbitmqctl(command, node1name()); + public static Process rabbitmqctl(String command) throws IOException { + return rabbitmqctl(command, node1name()); + } + + public static Process rabbitmqctl(String command, String nodename) throws IOException { + return executeCommand(rabbitmqctlCommand() + " -n '" + nodename + "'" + " " + command); + } + + public static String node1name() { + try { + return System.getProperty( + "node1.name", "rabbit-1@" + InetAddress.getLocalHost().getHostName()); + } catch (UnknownHostException e) { + throw new RuntimeException(e); } + } - public static Process rabbitmqctl(String command, String nodename) throws IOException { - return executeCommand(rabbitmqctlCommand() + - " -n '" + nodename + "'" + - " " + command); - } - - public static String node1name() { - try { - return System.getProperty("node1.name", "rabbit-1@" + InetAddress.getLocalHost().getHostName()); - } catch (UnknownHostException e) { - throw new RuntimeException(e); - } - } - - public static String node2name() { - try { - return System.getProperty("node2.name", "rabbit-2@" + InetAddress.getLocalHost().getHostName()); - } catch (UnknownHostException e) { - throw new RuntimeException(e); - } - } - - static String rabbitmqctlCommand() { - return System.getProperty("rabbitmqctl.bin"); + public static String node2name() { + try { + return System.getProperty( + "node2.name", "rabbit-2@" + InetAddress.getLocalHost().getHostName()); + } catch (UnknownHostException e) { + throw new RuntimeException(e); } + } + static String rabbitmqctlCommand() { + return System.getProperty("rabbitmqctl.bin"); + } } diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/StreamTest.java b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/StreamTest.java index ac5ba238e7..08024a12bf 100644 --- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/StreamTest.java +++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/StreamTest.java @@ -16,13 +16,9 @@ package com.rabbitmq.stream; -import com.rabbitmq.stream.impl.Client; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.Arguments; -import org.junit.jupiter.params.provider.MethodSource; +import static org.assertj.core.api.Assertions.assertThat; +import com.rabbitmq.stream.impl.Client; import java.nio.charset.StandardCharsets; import java.util.*; import java.util.concurrent.ConcurrentHashMap; @@ -32,106 +28,146 @@ import java.util.function.BiConsumer; import java.util.function.Function; import java.util.stream.IntStream; import java.util.stream.Stream; - -import static org.assertj.core.api.Assertions.assertThat; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; @ExtendWith(TestUtils.StreamTestInfrastructureExtension.class) public class StreamTest { - String stream; - TestUtils.ClientFactory cf; + String stream; + TestUtils.ClientFactory cf; - static Stream shouldBePossibleToPublishFromAnyNodeAndConsumeFromAnyMember() { - return Stream.of( - 
brokers("leader", metadata -> metadata.getLeader(), "leader", metadata -> metadata.getLeader()), - brokers("leader", metadata -> metadata.getLeader(), "replica", metadata -> metadata.getReplicas().iterator().next()), - brokers("replica", metadata -> metadata.getReplicas().iterator().next(), "leader", metadata -> metadata.getLeader()), - brokers("replica", metadata -> new ArrayList<>(metadata.getReplicas()).get(0), "replica", metadata -> new ArrayList<>(metadata.getReplicas()).get(1)) - ); - } + static Stream shouldBePossibleToPublishFromAnyNodeAndConsumeFromAnyMember() { + return Stream.of( + brokers( + "leader", metadata -> metadata.getLeader(), "leader", metadata -> metadata.getLeader()), + brokers( + "leader", + metadata -> metadata.getLeader(), + "replica", + metadata -> metadata.getReplicas().iterator().next()), + brokers( + "replica", + metadata -> metadata.getReplicas().iterator().next(), + "leader", + metadata -> metadata.getLeader()), + brokers( + "replica", + metadata -> new ArrayList<>(metadata.getReplicas()).get(0), + "replica", + metadata -> new ArrayList<>(metadata.getReplicas()).get(1))); + } - static Arguments brokers(String dp, Function publisherBroker, - String dc, Function consumerBroker) { - return Arguments.of(new FunctionWithToString<>(dp, publisherBroker), new FunctionWithToString<>(dc, consumerBroker)); - } + static Arguments brokers( + String dp, + Function publisherBroker, + String dc, + Function consumerBroker) { + return Arguments.of( + new FunctionWithToString<>(dp, publisherBroker), + new FunctionWithToString<>(dc, consumerBroker)); + } - @ParameterizedTest - @MethodSource - void shouldBePossibleToPublishFromAnyNodeAndConsumeFromAnyMember(Function publisherBroker, - Function consumerBroker) throws Exception { + @ParameterizedTest + @MethodSource + void shouldBePossibleToPublishFromAnyNodeAndConsumeFromAnyMember( + Function publisherBroker, + Function consumerBroker) + throws Exception { - int messageCount = 10_000; - Client client = cf.get(new Client.ClientParameters().port(TestUtils.streamPortNode1())); - Map metadata = client.metadata(stream); - assertThat(metadata).hasSize(1).containsKey(stream); - Client.StreamMetadata streamMetadata = metadata.get(stream); + int messageCount = 10_000; + Client client = cf.get(new Client.ClientParameters().port(TestUtils.streamPortNode1())); + Map metadata = client.metadata(stream); + assertThat(metadata).hasSize(1).containsKey(stream); + Client.StreamMetadata streamMetadata = metadata.get(stream); - CountDownLatch publishingLatch = new CountDownLatch(messageCount); - Client publisher = cf.get(new Client.ClientParameters() + CountDownLatch publishingLatch = new CountDownLatch(messageCount); + Client publisher = + cf.get( + new Client.ClientParameters() .port(publisherBroker.apply(streamMetadata).getPort()) - .publishConfirmListener((publisherId, publishingId) -> publishingLatch.countDown())); + .publishConfirmListener( + (publisherId, publishingId) -> publishingLatch.countDown())); - IntStream.range(0, messageCount).forEach(i -> publisher.publish(stream, (byte) 1, Collections.singletonList( - publisher.messageBuilder().addData(("hello " + i).getBytes(StandardCharsets.UTF_8)).build()))); + IntStream.range(0, messageCount) + .forEach( + i -> + publisher.publish( + stream, + (byte) 1, + Collections.singletonList( + publisher + .messageBuilder() + .addData(("hello " + i).getBytes(StandardCharsets.UTF_8)) + .build()))); - assertThat(publishingLatch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(publishingLatch.await(10, 
TimeUnit.SECONDS)).isTrue(); - CountDownLatch consumingLatch = new CountDownLatch(messageCount); - Set bodies = ConcurrentHashMap.newKeySet(messageCount); - Client consumer = cf.get(new Client.ClientParameters() + CountDownLatch consumingLatch = new CountDownLatch(messageCount); + Set bodies = ConcurrentHashMap.newKeySet(messageCount); + Client consumer = + cf.get( + new Client.ClientParameters() .port(consumerBroker.apply(streamMetadata).getPort()) - .chunkListener((client1, subscriptionId, offset, messageCount1, dataSize) -> client1.credit(subscriptionId, 10)) - .messageListener((subscriptionId, offset, message) -> { - bodies.add(new String(message.getBodyAsBinary(), StandardCharsets.UTF_8)); - consumingLatch.countDown(); - }) - ); + .chunkListener( + (client1, subscriptionId, offset, messageCount1, dataSize) -> + client1.credit(subscriptionId, 10)) + .messageListener( + (subscriptionId, offset, message) -> { + bodies.add(new String(message.getBodyAsBinary(), StandardCharsets.UTF_8)); + consumingLatch.countDown(); + })); - consumer.subscribe((byte) 1, stream, OffsetSpecification.first(), 10); + consumer.subscribe((byte) 1, stream, OffsetSpecification.first(), 10); - assertThat(consumingLatch.await(10, TimeUnit.SECONDS)).isTrue(); - assertThat(bodies).hasSize(messageCount); - IntStream.range(0, messageCount).forEach(i -> assertThat(bodies.contains("hello " + i))); - } + assertThat(consumingLatch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(bodies).hasSize(messageCount); + IntStream.range(0, messageCount).forEach(i -> assertThat(bodies.contains("hello " + i))); + } - @Test - void metadataOnClusterShouldReturnLeaderAndReplicas() { - Client client = cf.get(new Client.ClientParameters().port(TestUtils.streamPortNode1())); - Map metadata = client.metadata(stream); - assertThat(metadata).hasSize(1).containsKey(stream); - Client.StreamMetadata streamMetadata = metadata.get(stream); - assertThat(streamMetadata.getResponseCode()).isEqualTo(Constants.RESPONSE_CODE_OK); - assertThat(streamMetadata.getReplicas()).hasSize(2); + @Test + void metadataOnClusterShouldReturnLeaderAndReplicas() { + Client client = cf.get(new Client.ClientParameters().port(TestUtils.streamPortNode1())); + Map metadata = client.metadata(stream); + assertThat(metadata).hasSize(1).containsKey(stream); + Client.StreamMetadata streamMetadata = metadata.get(stream); + assertThat(streamMetadata.getResponseCode()).isEqualTo(Constants.RESPONSE_CODE_OK); + assertThat(streamMetadata.getReplicas()).hasSize(2); - BiConsumer assertNodesAreDifferent = (node, anotherNode) -> { - assertThat(node.getHost()).isEqualTo(anotherNode.getHost()); - assertThat(node.getPort()).isNotEqualTo(anotherNode.getPort()); + BiConsumer assertNodesAreDifferent = + (node, anotherNode) -> { + assertThat(node.getHost()).isEqualTo(anotherNode.getHost()); + assertThat(node.getPort()).isNotEqualTo(anotherNode.getPort()); }; - streamMetadata.getReplicas().forEach(replica -> assertNodesAreDifferent.accept(replica, streamMetadata.getLeader())); - List replicas = new ArrayList<>(streamMetadata.getReplicas()); - assertNodesAreDifferent.accept(replicas.get(0), replicas.get(1)); + streamMetadata + .getReplicas() + .forEach(replica -> assertNodesAreDifferent.accept(replica, streamMetadata.getLeader())); + List replicas = new ArrayList<>(streamMetadata.getReplicas()); + assertNodesAreDifferent.accept(replicas.get(0), replicas.get(1)); + } + + static class FunctionWithToString implements Function { + + final String toString; + final Function delegate; + + 
FunctionWithToString(String toString, Function delegate) { + this.toString = toString; + this.delegate = delegate; } - static class FunctionWithToString implements Function { - - final String toString; - final Function delegate; - - FunctionWithToString(String toString, Function delegate) { - this.toString = toString; - this.delegate = delegate; - } - - @Override - public R apply(T t) { - return delegate.apply(t); - } - - @Override - public String toString() { - return toString; - } + @Override + public R apply(T t) { + return delegate.apply(t); } + @Override + public String toString() { + return toString; + } + } } diff --git a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/TestUtils.java b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/TestUtils.java index da3eb28f02..c49a8d5832 100644 --- a/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/TestUtils.java +++ b/deps/rabbitmq_stream/test/rabbit_stream_SUITE_data/src/test/java/com/rabbitmq/stream/TestUtils.java @@ -16,162 +16,164 @@ package com.rabbitmq.stream; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.fail; + import com.rabbitmq.stream.impl.Client; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; -import org.junit.jupiter.api.extension.*; - import java.lang.reflect.Field; import java.time.Duration; import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.function.BooleanSupplier; - -import static java.util.concurrent.TimeUnit.SECONDS; -import static org.assertj.core.api.Assertions.assertThat; -import static org.junit.jupiter.api.Assertions.fail; +import org.junit.jupiter.api.extension.*; public class TestUtils { - static int streamPortNode1() { - String port = System.getProperty("node1.stream.port", "5555"); - return Integer.valueOf(port); + static int streamPortNode1() { + String port = System.getProperty("node1.stream.port", "5555"); + return Integer.valueOf(port); + } + + static int streamPortNode2() { + String port = System.getProperty("node2.stream.port", "5556"); + return Integer.valueOf(port); + } + + static void waitAtMost(Duration duration, BooleanSupplier condition) throws InterruptedException { + if (condition.getAsBoolean()) { + return; + } + int waitTime = 100; + int waitedTime = 0; + long timeoutInMs = duration.toMillis(); + while (waitedTime <= timeoutInMs) { + Thread.sleep(waitTime); + if (condition.getAsBoolean()) { + return; + } + waitedTime += waitTime; + } + fail("Waited " + duration.getSeconds() + " second(s), condition never got true"); + } + + static class StreamTestInfrastructureExtension + implements BeforeAllCallback, AfterAllCallback, BeforeEachCallback, AfterEachCallback { + + private static final ExtensionContext.Namespace NAMESPACE = + ExtensionContext.Namespace.create(StreamTestInfrastructureExtension.class); + + private static ExtensionContext.Store store(ExtensionContext extensionContext) { + return extensionContext.getRoot().getStore(NAMESPACE); } - static int streamPortNode2() { - String port = System.getProperty("node2.stream.port", "5556"); - return Integer.valueOf(port); + private static EventLoopGroup eventLoopGroup(ExtensionContext context) { + return (EventLoopGroup) store(context).get("nettyEventLoopGroup"); } - static void waitAtMost(Duration duration, BooleanSupplier condition) 
throws InterruptedException { - if (condition.getAsBoolean()) { - return; - } - int waitTime = 100; - int waitedTime = 0; - long timeoutInMs = duration.toMillis(); - while (waitedTime <= timeoutInMs) { - Thread.sleep(waitTime); - if (condition.getAsBoolean()) { - return; - } - waitedTime += waitTime; - } - fail("Waited " + duration.getSeconds() + " second(s), condition never got true"); + @Override + public void beforeAll(ExtensionContext context) { + store(context).put("nettyEventLoopGroup", new NioEventLoopGroup()); } - static class StreamTestInfrastructureExtension implements BeforeAllCallback, AfterAllCallback, BeforeEachCallback, AfterEachCallback { + @Override + public void beforeEach(ExtensionContext context) throws Exception { + try { + Field streamField = + context.getTestInstance().get().getClass().getDeclaredField("eventLoopGroup"); + streamField.setAccessible(true); + streamField.set(context.getTestInstance().get(), eventLoopGroup(context)); + } catch (NoSuchFieldException e) { - private static final ExtensionContext.Namespace NAMESPACE = ExtensionContext.Namespace.create(StreamTestInfrastructureExtension.class); + } + try { + Field streamField = context.getTestInstance().get().getClass().getDeclaredField("stream"); + streamField.setAccessible(true); + String stream = UUID.randomUUID().toString(); + streamField.set(context.getTestInstance().get(), stream); + Client client = + new Client( + new Client.ClientParameters() + .eventLoopGroup(eventLoopGroup(context)) + .port(streamPortNode1())); + Client.Response response = client.create(stream); + assertThat(response.isOk()).isTrue(); + client.close(); + store(context).put("testMethodStream", stream); + } catch (NoSuchFieldException e) { - private static ExtensionContext.Store store(ExtensionContext extensionContext) { - return extensionContext.getRoot().getStore(NAMESPACE); + } + + for (Field declaredField : context.getTestInstance().get().getClass().getDeclaredFields()) { + if (declaredField.getType().equals(ClientFactory.class)) { + declaredField.setAccessible(true); + ClientFactory clientFactory = new ClientFactory(eventLoopGroup(context)); + declaredField.set(context.getTestInstance().get(), clientFactory); + store(context).put("testClientFactory", clientFactory); + break; } - - private static EventLoopGroup eventLoopGroup(ExtensionContext context) { - return (EventLoopGroup) store(context).get("nettyEventLoopGroup"); - } - - @Override - public void beforeAll(ExtensionContext context) { - store(context).put("nettyEventLoopGroup", new NioEventLoopGroup()); - } - - @Override - public void beforeEach(ExtensionContext context) throws Exception { - try { - Field streamField = context.getTestInstance().get().getClass().getDeclaredField("eventLoopGroup"); - streamField.setAccessible(true); - streamField.set(context.getTestInstance().get(), eventLoopGroup(context)); - } catch (NoSuchFieldException e) { - - } - try { - Field streamField = context.getTestInstance().get().getClass().getDeclaredField("stream"); - streamField.setAccessible(true); - String stream = UUID.randomUUID().toString(); - streamField.set(context.getTestInstance().get(), stream); - Client client = new Client(new Client.ClientParameters().eventLoopGroup(eventLoopGroup(context)) - .port(streamPortNode1()) - ); - Client.Response response = client.create(stream); - assertThat(response.isOk()).isTrue(); - client.close(); - store(context).put("testMethodStream", stream); - } catch (NoSuchFieldException e) { - - } - - for (Field declaredField : 
context.getTestInstance().get().getClass().getDeclaredFields()) { - if (declaredField.getType().equals(ClientFactory.class)) { - declaredField.setAccessible(true); - ClientFactory clientFactory = new ClientFactory(eventLoopGroup(context)); - declaredField.set(context.getTestInstance().get(), clientFactory); - store(context).put("testClientFactory", clientFactory); - break; - } - } - - } - - @Override - public void afterEach(ExtensionContext context) throws Exception { - try { - Field streamField = context.getTestInstance().get().getClass().getDeclaredField("stream"); - streamField.setAccessible(true); - String stream = (String) streamField.get(context.getTestInstance().get()); - Client client = new Client(new Client.ClientParameters().eventLoopGroup(eventLoopGroup(context)) - .port(streamPortNode1()) - ); - Client.Response response = client.delete(stream); - assertThat(response.isOk()).isTrue(); - client.close(); - store(context).remove("testMethodStream"); - } catch (NoSuchFieldException e) { - - } - - ClientFactory clientFactory = (ClientFactory) store(context).get("testClientFactory"); - if (clientFactory != null) { - clientFactory.close(); - } - } - - @Override - public void afterAll(ExtensionContext context) throws Exception { - EventLoopGroup eventLoopGroup = eventLoopGroup(context); - eventLoopGroup.shutdownGracefully(1, 10, SECONDS).get(10, SECONDS); - } - + } } - static class ClientFactory { + @Override + public void afterEach(ExtensionContext context) throws Exception { + try { + Field streamField = context.getTestInstance().get().getClass().getDeclaredField("stream"); + streamField.setAccessible(true); + String stream = (String) streamField.get(context.getTestInstance().get()); + Client client = + new Client( + new Client.ClientParameters() + .eventLoopGroup(eventLoopGroup(context)) + .port(streamPortNode1())); + Client.Response response = client.delete(stream); + assertThat(response.isOk()).isTrue(); + client.close(); + store(context).remove("testMethodStream"); + } catch (NoSuchFieldException e) { - private final EventLoopGroup eventLoopGroup; - private final Set clients = ConcurrentHashMap.newKeySet(); + } - - public ClientFactory(EventLoopGroup eventLoopGroup) { - this.eventLoopGroup = eventLoopGroup; - } - - public Client get() { - return get(new Client.ClientParameters()); - } - - public Client get(Client.ClientParameters parameters) { - // don't set the port, it would override the caller's port setting - Client client = new Client(parameters.eventLoopGroup(eventLoopGroup)); - clients.add(client); - return client; - } - - private void close() { - for (Client c : clients) { - c.close(); - } - } + ClientFactory clientFactory = (ClientFactory) store(context).get("testClientFactory"); + if (clientFactory != null) { + clientFactory.close(); + } } + @Override + public void afterAll(ExtensionContext context) throws Exception { + EventLoopGroup eventLoopGroup = eventLoopGroup(context); + eventLoopGroup.shutdownGracefully(1, 10, SECONDS).get(10, SECONDS); + } + } + + static class ClientFactory { + + private final EventLoopGroup eventLoopGroup; + private final Set clients = ConcurrentHashMap.newKeySet(); + + public ClientFactory(EventLoopGroup eventLoopGroup) { + this.eventLoopGroup = eventLoopGroup; + } + + public Client get() { + return get(new Client.ClientParameters()); + } + + public Client get(Client.ClientParameters parameters) { + // don't set the port, it would override the caller's port setting + Client client = new Client(parameters.eventLoopGroup(eventLoopGroup)); + 
      clients.add(client);
+      return client;
+    }
+
+    private void close() {
+      for (Client c : clients) {
+        c.close();
+      }
+    }
+  }
 }
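
The infrastructure in TestUtils.StreamTestInfrastructureExtension works by reflection-based field
injection: before each test it looks for well-known fields on the test instance ("eventLoopGroup",
"stream", and a ClientFactory-typed field), populates them, and creates and deletes a uniquely
named stream around every test method. A minimal sketch of that injection mechanism, reduced to a
single "stream" field (the class name below is illustrative, not part of the suite):

import java.lang.reflect.Field;
import java.util.UUID;
import org.junit.jupiter.api.extension.BeforeEachCallback;
import org.junit.jupiter.api.extension.ExtensionContext;

public class StreamNameInjectionExtension implements BeforeEachCallback {

  @Override
  public void beforeEach(ExtensionContext context) throws Exception {
    Object testInstance = context.getTestInstance().get();
    try {
      // inject a fresh stream name into the "stream" field, if the test class declares one
      Field streamField = testInstance.getClass().getDeclaredField("stream");
      streamField.setAccessible(true);
      streamField.set(testInstance, UUID.randomUUID().toString());
    } catch (NoSuchFieldException e) {
      // the test class does not declare the field: nothing to inject
    }
  }
}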