Streams: adapt tests to the latest Java stream client listener interface

This commit is contained in:
Michael Klishin 2022-08-10 11:43:44 +04:00
parent d666555b40
commit 68969faf94
No known key found for this signature in database
GPG Key ID: 8ADA141E1AD87C94
2 changed files with 4 additions and 4 deletions

View File

@ -164,7 +164,7 @@ public class FailureTest {
new Client.ClientParameters()
.port(TestUtils.streamPortNode1())
.messageListener(
(subscriptionId, offset, chunkTimestamp, msg) -> {
(subscriptionId, offset, chunkTimestamp, committedOffset, message) -> {
bodies.add(new String(msg.getBodyAsBinary(), StandardCharsets.UTF_8));
consumeLatch.countDown();
}));
@ -345,7 +345,7 @@ public class FailureTest {
(client1, subscriptionId, offset, messageCount, dataSize) ->
client1.credit(subscriptionId, 1))
.messageListener(
(subscriptionId, offset, chunkTimestamp, message) -> {
(subscriptionId, offset, chunkTimestamp, committedOffset, message) -> {
consumed.add(message);
generations.add((Long) message.getApplicationProperties().get("generation"));
if (consumed.size() == confirmed.size()) {
@ -447,7 +447,7 @@ public class FailureTest {
Set<Long> generations = ConcurrentHashMap.newKeySet();
Set<Long> consumedIds = ConcurrentHashMap.newKeySet();
Client.MessageListener messageListener =
(subscriptionId, offset, chunkTimestamp, message) -> {
(subscriptionId, offset, chunkTimestamp, committedOffset, message) -> {
consumed.add(message);
generations.add((Long) message.getApplicationProperties().get("generation"));
consumedIds.add(message.getProperties().getMessageIdAsLong());

View File

@ -119,7 +119,7 @@ public class StreamTest {
(client1, subscriptionId, offset, messageCount1, dataSize) ->
client1.credit(subscriptionId, 10))
.messageListener(
(subscriptionId, offset, chunkTimestamp, message) -> {
(subscriptionId, offset, chunkTimestamp, committedOffset, message) -> {
bodies.add(new String(message.getBodyAsBinary(), StandardCharsets.UTF_8));
consumingLatch.countDown();
}));