Adapt stream Java tests to client 0.12.0 snapshot

After changes for
https://github.com/rabbitmq/rabbitmq-stream-java-client/issues/333.
This commit is contained in:
Arnaud Cogoluègnes 2023-07-17 11:04:25 +02:00
parent 8b6d109968
commit c594c77049
No known key found for this signature in database
GPG Key ID: D5C8C4DFAD43AFA8
7 changed files with 61 additions and 29 deletions

View File

@ -26,7 +26,7 @@
</developers>
<properties>
<stream-client.version>0.11.0</stream-client.version>
<stream-client.version>[0.12.0-SNAPSHOT,)</stream-client.version>
<junit.jupiter.version>5.9.3</junit.jupiter.version>
<assertj.version>3.24.2</assertj.version>
<logback.version>1.2.12</logback.version>
@ -115,4 +115,15 @@
</build>
<repositories>
<repository>
<id>ossrh</id>
<url>https://oss.sonatype.org/content/repositories/snapshots</url>
<snapshots><enabled>true</enabled></snapshots>
<releases><enabled>false</enabled></releases>
</repository>
</repositories>
</project>

View File

@ -16,9 +16,8 @@
package com.rabbitmq.stream;
import static com.rabbitmq.stream.TestUtils.*;
import static com.rabbitmq.stream.TestUtils.ResponseConditions.ok;
import static com.rabbitmq.stream.TestUtils.waitAtMost;
import static com.rabbitmq.stream.TestUtils.waitUntil;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;
@ -164,7 +163,7 @@ public class FailureTest {
new Client.ClientParameters()
.port(TestUtils.streamPortNode1())
.messageListener(
(subscriptionId, offset, chunkTimestamp, committedChunkId, msg) -> {
(subscriptionId, offset, chunkTimestamp, committedChunkId, context, msg) -> {
bodies.add(new String(msg.getBodyAsBinary(), StandardCharsets.UTF_8));
consumeLatch.countDown();
}));
@ -341,11 +340,14 @@ public class FailureTest {
cf.get(
new Client.ClientParameters()
.port(m.getReplicas().get(0).getPort())
.chunkListener(
(client1, subscriptionId, offset, messageCount, dataSize) ->
client1.credit(subscriptionId, 1))
.chunkListener(credit())
.messageListener(
(subscriptionId, offset, chunkTimestamp, committedChunkId, message) -> {
(subscriptionId,
offset,
chunkTimestamp,
committedChunkId,
context,
message) -> {
consumed.add(message);
generations.add((Long) message.getApplicationProperties().get("generation"));
if (consumed.size() == confirmed.size()) {
@ -447,7 +449,7 @@ public class FailureTest {
Set<Long> generations = ConcurrentHashMap.newKeySet();
Set<Long> consumedIds = ConcurrentHashMap.newKeySet();
Client.MessageListener messageListener =
(subscriptionId, offset, chunkTimestamp, committedChunkId, message) -> {
(subscriptionId, offset, chunkTimestamp, committedChunkId, context, message) -> {
consumed.add(message);
generations.add((Long) message.getApplicationProperties().get("generation"));
consumedIds.add(message.getProperties().getMessageIdAsLong());
@ -471,9 +473,7 @@ public class FailureTest {
new Client.ClientParameters()
.port(newReplicaPort)
.shutdownListener(shutdownListenerReference.get())
.chunkListener(
(client1, subscriptionId, offset, messageCount, dataSize) ->
client1.credit(subscriptionId, 1))
.chunkListener(credit())
.messageListener(messageListener));
newConsumer.subscribe(
@ -494,9 +494,7 @@ public class FailureTest {
new Client.ClientParameters()
.port(replica.getPort())
.shutdownListener(shutdownListener)
.chunkListener(
(client1, subscriptionId, offset, messageCount, dataSize) ->
client1.credit(subscriptionId, 1))
.chunkListener(credit())
.messageListener(messageListener));
Client.Response response =

View File

@ -16,6 +16,7 @@
package com.rabbitmq.stream;
import static com.rabbitmq.stream.TestUtils.credit;
import static org.assertj.core.api.Assertions.assertThat;
import com.rabbitmq.stream.impl.Client;
@ -115,11 +116,14 @@ public class StreamTest {
cf.get(
new Client.ClientParameters()
.port(consumerBroker.apply(streamMetadata).getPort())
.chunkListener(
(client1, subscriptionId, offset, messageCount1, dataSize) ->
client1.credit(subscriptionId, 10))
.chunkListener(credit())
.messageListener(
(subscriptionId, offset, chunkTimestamp, committedChunkId, message) -> {
(subscriptionId,
offset,
chunkTimestamp,
committedChunkId,
context,
message) -> {
bodies.add(new String(message.getBodyAsBinary(), StandardCharsets.UTF_8));
consumingLatch.countDown();
}));
@ -128,7 +132,8 @@ public class StreamTest {
assertThat(consumingLatch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(bodies).hasSize(messageCount);
IntStream.range(0, messageCount).forEach(i -> assertThat(bodies.contains("hello " + i)));
IntStream.range(0, messageCount)
.forEach(i -> assertThat(bodies.contains("hello " + i)).isTrue());
}
@Test

View File

@ -218,4 +218,11 @@ public class TestUtils {
expectedResponse);
}
}
static Client.ChunkListener credit() {
return (client, subscriptionId, offset, messageCount, dataSize) -> {
client.credit(subscriptionId, 1);
return null;
};
}
}

View File

@ -26,7 +26,7 @@
</developers>
<properties>
<stream-client.version>0.11.0</stream-client.version>
<stream-client.version>[0.12.0-SNAPSHOT,)</stream-client.version>
<junit.jupiter.version>5.9.3</junit.jupiter.version>
<assertj.version>3.24.2</assertj.version>
<logback.version>1.2.12</logback.version>
@ -131,4 +131,15 @@
</build>
<repositories>
<repository>
<id>ossrh</id>
<url>https://oss.sonatype.org/content/repositories/snapshots</url>
<snapshots><enabled>true</enabled></snapshots>
<releases><enabled>false</enabled></releases>
</repository>
</repositories>
</project>

View File

@ -140,11 +140,6 @@ public class HttpTest {
.collect(Collectors.toList());
}
static List<Map<String, Object>> entities(
List<Map<String, Object>> entities, Predicate<Map<String, Object>> filter) {
return entities.stream().filter(filter).collect(Collectors.toList());
}
static Map<String, Object> entity(
List<Map<String, Object>> entities, Predicate<Map<String, Object>> filter) {
return entities.stream().filter(filter).findFirst().orElse(Collections.emptyMap());
@ -558,9 +553,7 @@ public class HttpTest {
cf.get(
new ClientParameters()
.clientProperty("connection_name", connectionProvidedName)
.chunkListener(
(client1, subscriptionId, offset, messageCount, dataSize) ->
client1.credit(subscriptionId, 1))
.chunkListener(TestUtils.credit())
.shutdownListener(shutdownContext -> closed.set(true)));
client.subscribe((byte) 0, stream, OffsetSpecification.first(), 10, subscriptionProperties);

View File

@ -257,4 +257,11 @@ public class TestUtils {
static Condition<Object> isNull() {
return new Condition<>(Objects::isNull, "null");
}
static Client.ChunkListener credit() {
return (client, subscriptionId, offset, messageCount, dataSize) -> {
client.credit(subscriptionId, 1);
return null;
};
}
}