mirror of https://github.com/apache/kafka.git
MINOR: Fix StreamsRebalanceListenerInvoker (#20575)
StreamsRebalanceListenerInvoker was implemented to match the behavior of ConsumerRebalanceListenerInvoker, however StreamsRebalanceListener has a subtly different interface than ConsumerRebalanceListener - it does not throw exceptions, but returns it as an Optional. In the interest of consistency, this change fixes this mismatch by changing the StreamsRebalanceListener interface to behave more like the ConsumerRebalanceListener - throwing exceptions directly. In another minor fix, the StreamsRebalanceListenerInvoker is changed to simply skip callback execution instead of throwing an IllegalStateException when no streamRebalanceListener is defined. This can happen when the consumer is closed before Consumer.subscribe is called. Reviewers: Lianet Magrans <lmagrans@confluent.io>, Matthias J. Sax <matthias@confluent.io>
This commit is contained in:
parent
0a483618b9
commit
1f7631c8c6
|
@ -16,7 +16,6 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.kafka.clients.consumer.internals;
|
package org.apache.kafka.clients.consumer.internals;
|
||||||
|
|
||||||
import java.util.Optional;
|
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -28,22 +27,18 @@ public interface StreamsRebalanceListener {
|
||||||
* Called when tasks are revoked from a stream thread.
|
* Called when tasks are revoked from a stream thread.
|
||||||
*
|
*
|
||||||
* @param tasks The tasks to be revoked.
|
* @param tasks The tasks to be revoked.
|
||||||
* @return The exception thrown during the callback, if any.
|
|
||||||
*/
|
*/
|
||||||
Optional<Exception> onTasksRevoked(final Set<StreamsRebalanceData.TaskId> tasks);
|
void onTasksRevoked(final Set<StreamsRebalanceData.TaskId> tasks);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Called when tasks are assigned from a stream thread.
|
* Called when tasks are assigned from a stream thread.
|
||||||
*
|
*
|
||||||
* @param assignment The tasks assigned.
|
* @param assignment The tasks assigned.
|
||||||
* @return The exception thrown during the callback, if any.
|
|
||||||
*/
|
*/
|
||||||
Optional<Exception> onTasksAssigned(final StreamsRebalanceData.Assignment assignment);
|
void onTasksAssigned(final StreamsRebalanceData.Assignment assignment);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Called when a stream thread loses all assigned tasks.
|
* Called when a stream thread loses all assigned tasks
|
||||||
*
|
|
||||||
* @return The exception thrown during the callback, if any.
|
|
||||||
*/
|
*/
|
||||||
Optional<Exception> onAllTasksLost();
|
void onAllTasksLost();
|
||||||
}
|
}
|
||||||
|
|
|
@ -51,14 +51,14 @@ public class StreamsRebalanceListenerInvoker {
|
||||||
|
|
||||||
public Exception invokeAllTasksRevoked() {
|
public Exception invokeAllTasksRevoked() {
|
||||||
if (listener.isEmpty()) {
|
if (listener.isEmpty()) {
|
||||||
throw new IllegalStateException("StreamsRebalanceListener is not defined");
|
return null;
|
||||||
}
|
}
|
||||||
return invokeTasksRevoked(streamsRebalanceData.reconciledAssignment().activeTasks());
|
return invokeTasksRevoked(streamsRebalanceData.reconciledAssignment().activeTasks());
|
||||||
}
|
}
|
||||||
|
|
||||||
public Exception invokeTasksAssigned(final StreamsRebalanceData.Assignment assignment) {
|
public Exception invokeTasksAssigned(final StreamsRebalanceData.Assignment assignment) {
|
||||||
if (listener.isEmpty()) {
|
if (listener.isEmpty()) {
|
||||||
throw new IllegalStateException("StreamsRebalanceListener is not defined");
|
return null;
|
||||||
}
|
}
|
||||||
log.info("Invoking tasks assigned callback for new assignment: {}", assignment);
|
log.info("Invoking tasks assigned callback for new assignment: {}", assignment);
|
||||||
try {
|
try {
|
||||||
|
@ -78,7 +78,7 @@ public class StreamsRebalanceListenerInvoker {
|
||||||
|
|
||||||
public Exception invokeTasksRevoked(final Set<StreamsRebalanceData.TaskId> tasks) {
|
public Exception invokeTasksRevoked(final Set<StreamsRebalanceData.TaskId> tasks) {
|
||||||
if (listener.isEmpty()) {
|
if (listener.isEmpty()) {
|
||||||
throw new IllegalStateException("StreamsRebalanceListener is not defined");
|
return null;
|
||||||
}
|
}
|
||||||
log.info("Invoking task revoked callback for revoked active tasks {}", tasks);
|
log.info("Invoking task revoked callback for revoked active tasks {}", tasks);
|
||||||
try {
|
try {
|
||||||
|
@ -98,7 +98,7 @@ public class StreamsRebalanceListenerInvoker {
|
||||||
|
|
||||||
public Exception invokeAllTasksLost() {
|
public Exception invokeAllTasksLost() {
|
||||||
if (listener.isEmpty()) {
|
if (listener.isEmpty()) {
|
||||||
throw new IllegalStateException("StreamsRebalanceListener is not defined");
|
return null;
|
||||||
}
|
}
|
||||||
log.info("Invoking tasks lost callback for all tasks");
|
log.info("Invoking tasks lost callback for all tasks");
|
||||||
try {
|
try {
|
||||||
|
|
|
@ -2218,7 +2218,6 @@ public class AsyncKafkaConsumerTest {
|
||||||
try (final MockedStatic<RequestManagers> requestManagers = mockStatic(RequestManagers.class)) {
|
try (final MockedStatic<RequestManagers> requestManagers = mockStatic(RequestManagers.class)) {
|
||||||
consumer = newConsumerWithStreamRebalanceData(requiredConsumerConfigAndGroupId(groupId), streamsRebalanceData);
|
consumer = newConsumerWithStreamRebalanceData(requiredConsumerConfigAndGroupId(groupId), streamsRebalanceData);
|
||||||
StreamsRebalanceListener mockStreamsListener = mock(StreamsRebalanceListener.class);
|
StreamsRebalanceListener mockStreamsListener = mock(StreamsRebalanceListener.class);
|
||||||
when(mockStreamsListener.onTasksRevoked(any())).thenReturn(Optional.empty());
|
|
||||||
consumer.subscribe(singletonList("topic"), mockStreamsListener);
|
consumer.subscribe(singletonList("topic"), mockStreamsListener);
|
||||||
final MemberStateListener groupMetadataUpdateListener = captureGroupMetadataUpdateListener(requestManagers);
|
final MemberStateListener groupMetadataUpdateListener = captureGroupMetadataUpdateListener(requestManagers);
|
||||||
final int memberEpoch = 42;
|
final int memberEpoch = 42;
|
||||||
|
@ -2239,7 +2238,6 @@ public class AsyncKafkaConsumerTest {
|
||||||
try (final MockedStatic<RequestManagers> requestManagers = mockStatic(RequestManagers.class)) {
|
try (final MockedStatic<RequestManagers> requestManagers = mockStatic(RequestManagers.class)) {
|
||||||
consumer = newConsumerWithStreamRebalanceData(requiredConsumerConfigAndGroupId(groupId), streamsRebalanceData);
|
consumer = newConsumerWithStreamRebalanceData(requiredConsumerConfigAndGroupId(groupId), streamsRebalanceData);
|
||||||
StreamsRebalanceListener mockStreamsListener = mock(StreamsRebalanceListener.class);
|
StreamsRebalanceListener mockStreamsListener = mock(StreamsRebalanceListener.class);
|
||||||
when(mockStreamsListener.onAllTasksLost()).thenReturn(Optional.empty());
|
|
||||||
consumer.subscribe(singletonList("topic"), mockStreamsListener);
|
consumer.subscribe(singletonList("topic"), mockStreamsListener);
|
||||||
final MemberStateListener groupMetadataUpdateListener = captureGroupMetadataUpdateListener(requestManagers);
|
final MemberStateListener groupMetadataUpdateListener = captureGroupMetadataUpdateListener(requestManagers);
|
||||||
final int memberEpoch = 0;
|
final int memberEpoch = 0;
|
||||||
|
|
|
@ -28,7 +28,6 @@ import org.mockito.junit.jupiter.MockitoExtension;
|
||||||
import org.mockito.junit.jupiter.MockitoSettings;
|
import org.mockito.junit.jupiter.MockitoSettings;
|
||||||
import org.mockito.quality.Strictness;
|
import org.mockito.quality.Strictness;
|
||||||
|
|
||||||
import java.util.Optional;
|
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
|
@ -73,7 +72,6 @@ public class StreamsRebalanceListenerInvokerTest {
|
||||||
|
|
||||||
StreamsRebalanceData.Assignment mockAssignment = createMockAssignment();
|
StreamsRebalanceData.Assignment mockAssignment = createMockAssignment();
|
||||||
when(streamsRebalanceData.reconciledAssignment()).thenReturn(mockAssignment);
|
when(streamsRebalanceData.reconciledAssignment()).thenReturn(mockAssignment);
|
||||||
when(secondListener.onTasksRevoked(any())).thenReturn(Optional.empty());
|
|
||||||
|
|
||||||
// Set first listener
|
// Set first listener
|
||||||
invoker.setRebalanceListener(firstListener);
|
invoker.setRebalanceListener(firstListener);
|
||||||
|
@ -89,21 +87,10 @@ public class StreamsRebalanceListenerInvokerTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testInvokeMethodsWithNoListener() {
|
public void testInvokeMethodsWithNoListener() {
|
||||||
IllegalStateException exception1 = assertThrows(IllegalStateException.class,
|
assertNull(invoker.invokeAllTasksRevoked());
|
||||||
() -> invoker.invokeAllTasksRevoked());
|
assertNull(invoker.invokeTasksAssigned(createMockAssignment()));
|
||||||
assertEquals("StreamsRebalanceListener is not defined", exception1.getMessage());
|
assertNull(invoker.invokeTasksRevoked(createMockTasks()));
|
||||||
|
assertNull(invoker.invokeAllTasksLost());
|
||||||
IllegalStateException exception2 = assertThrows(IllegalStateException.class,
|
|
||||||
() -> invoker.invokeTasksAssigned(createMockAssignment()));
|
|
||||||
assertEquals("StreamsRebalanceListener is not defined", exception2.getMessage());
|
|
||||||
|
|
||||||
IllegalStateException exception3 = assertThrows(IllegalStateException.class,
|
|
||||||
() -> invoker.invokeTasksRevoked(createMockTasks()));
|
|
||||||
assertEquals("StreamsRebalanceListener is not defined", exception3.getMessage());
|
|
||||||
|
|
||||||
IllegalStateException exception4 = assertThrows(IllegalStateException.class,
|
|
||||||
() -> invoker.invokeAllTasksLost());
|
|
||||||
assertEquals("StreamsRebalanceListener is not defined", exception4.getMessage());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -112,8 +99,7 @@ public class StreamsRebalanceListenerInvokerTest {
|
||||||
|
|
||||||
StreamsRebalanceData.Assignment mockAssignment = createMockAssignment();
|
StreamsRebalanceData.Assignment mockAssignment = createMockAssignment();
|
||||||
when(streamsRebalanceData.reconciledAssignment()).thenReturn(mockAssignment);
|
when(streamsRebalanceData.reconciledAssignment()).thenReturn(mockAssignment);
|
||||||
when(mockListener.onTasksRevoked(any())).thenReturn(Optional.empty());
|
|
||||||
|
|
||||||
Exception result = invoker.invokeAllTasksRevoked();
|
Exception result = invoker.invokeAllTasksRevoked();
|
||||||
|
|
||||||
assertNull(result);
|
assertNull(result);
|
||||||
|
@ -124,8 +110,7 @@ public class StreamsRebalanceListenerInvokerTest {
|
||||||
public void testInvokeTasksAssignedWithListener() {
|
public void testInvokeTasksAssignedWithListener() {
|
||||||
invoker.setRebalanceListener(mockListener);
|
invoker.setRebalanceListener(mockListener);
|
||||||
StreamsRebalanceData.Assignment assignment = createMockAssignment();
|
StreamsRebalanceData.Assignment assignment = createMockAssignment();
|
||||||
when(mockListener.onTasksAssigned(assignment)).thenReturn(Optional.empty());
|
|
||||||
|
|
||||||
Exception result = invoker.invokeTasksAssigned(assignment);
|
Exception result = invoker.invokeTasksAssigned(assignment);
|
||||||
|
|
||||||
assertNull(result);
|
assertNull(result);
|
||||||
|
@ -177,8 +162,7 @@ public class StreamsRebalanceListenerInvokerTest {
|
||||||
public void testInvokeTasksRevokedWithListener() {
|
public void testInvokeTasksRevokedWithListener() {
|
||||||
invoker.setRebalanceListener(mockListener);
|
invoker.setRebalanceListener(mockListener);
|
||||||
Set<StreamsRebalanceData.TaskId> tasks = createMockTasks();
|
Set<StreamsRebalanceData.TaskId> tasks = createMockTasks();
|
||||||
when(mockListener.onTasksRevoked(tasks)).thenReturn(Optional.empty());
|
|
||||||
|
|
||||||
Exception result = invoker.invokeTasksRevoked(tasks);
|
Exception result = invoker.invokeTasksRevoked(tasks);
|
||||||
|
|
||||||
assertNull(result);
|
assertNull(result);
|
||||||
|
@ -229,8 +213,7 @@ public class StreamsRebalanceListenerInvokerTest {
|
||||||
@Test
|
@Test
|
||||||
public void testInvokeAllTasksLostWithListener() {
|
public void testInvokeAllTasksLostWithListener() {
|
||||||
invoker.setRebalanceListener(mockListener);
|
invoker.setRebalanceListener(mockListener);
|
||||||
when(mockListener.onAllTasksLost()).thenReturn(Optional.empty());
|
|
||||||
|
|
||||||
Exception result = invoker.invokeAllTasksLost();
|
Exception result = invoker.invokeAllTasksLost();
|
||||||
|
|
||||||
assertNull(result);
|
assertNull(result);
|
||||||
|
|
|
@ -3912,14 +3912,9 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
|
||||||
consumer.subscribe(
|
consumer.subscribe(
|
||||||
if (topicAsSourceTopic || topicAsRepartitionSourceTopic) util.Set.of(sourceTopic, topic) else util.Set.of(sourceTopic),
|
if (topicAsSourceTopic || topicAsRepartitionSourceTopic) util.Set.of(sourceTopic, topic) else util.Set.of(sourceTopic),
|
||||||
new StreamsRebalanceListener {
|
new StreamsRebalanceListener {
|
||||||
override def onTasksRevoked(tasks: util.Set[StreamsRebalanceData.TaskId]): Optional[Exception] =
|
override def onTasksRevoked(tasks: util.Set[StreamsRebalanceData.TaskId]): Unit = ()
|
||||||
Optional.empty()
|
override def onTasksAssigned(assignment: StreamsRebalanceData.Assignment): Unit = ()
|
||||||
|
override def onAllTasksLost(): Unit = ()
|
||||||
override def onTasksAssigned(assignment: StreamsRebalanceData.Assignment): Optional[Exception] =
|
|
||||||
Optional.empty()
|
|
||||||
|
|
||||||
override def onAllTasksLost(): Optional[Exception] =
|
|
||||||
Optional.empty()
|
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
consumer.poll(Duration.ofMillis(500L))
|
consumer.poll(Duration.ofMillis(500L))
|
||||||
|
|
|
@ -272,13 +272,9 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {
|
||||||
)
|
)
|
||||||
consumer.subscribe(util.Set.of(inputTopic),
|
consumer.subscribe(util.Set.of(inputTopic),
|
||||||
new StreamsRebalanceListener {
|
new StreamsRebalanceListener {
|
||||||
override def onTasksRevoked(tasks: util.Set[StreamsRebalanceData.TaskId]): Optional[Exception] =
|
override def onTasksRevoked(tasks: util.Set[StreamsRebalanceData.TaskId]): Unit = ()
|
||||||
Optional.empty()
|
override def onTasksAssigned(assignment: StreamsRebalanceData.Assignment): Unit = ()
|
||||||
override def onTasksAssigned(assignment: StreamsRebalanceData.Assignment): Optional[Exception] = {
|
override def onAllTasksLost(): Unit = ()
|
||||||
Optional.empty()
|
|
||||||
}
|
|
||||||
override def onAllTasksLost(): Optional[Exception] =
|
|
||||||
Optional.empty()
|
|
||||||
})
|
})
|
||||||
consumer
|
consumer
|
||||||
}
|
}
|
||||||
|
|
|
@ -26,7 +26,6 @@ import org.slf4j.Logger;
|
||||||
|
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Optional;
|
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
import java.util.stream.Stream;
|
import java.util.stream.Stream;
|
||||||
|
@ -52,59 +51,44 @@ public class DefaultStreamsRebalanceListener implements StreamsRebalanceListener
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Optional<Exception> onTasksRevoked(final Set<StreamsRebalanceData.TaskId> tasks) {
|
public void onTasksRevoked(final Set<StreamsRebalanceData.TaskId> tasks) {
|
||||||
try {
|
final Map<TaskId, Set<TopicPartition>> activeTasksToRevokeWithPartitions =
|
||||||
final Map<TaskId, Set<TopicPartition>> activeTasksToRevokeWithPartitions =
|
pairWithTopicPartitions(tasks.stream());
|
||||||
pairWithTopicPartitions(tasks.stream());
|
final Set<TopicPartition> partitionsToRevoke = activeTasksToRevokeWithPartitions.values().stream()
|
||||||
final Set<TopicPartition> partitionsToRevoke = activeTasksToRevokeWithPartitions.values().stream()
|
.flatMap(Collection::stream)
|
||||||
.flatMap(Collection::stream)
|
.collect(Collectors.toSet());
|
||||||
.collect(Collectors.toSet());
|
|
||||||
|
|
||||||
final long start = time.milliseconds();
|
final long start = time.milliseconds();
|
||||||
try {
|
try {
|
||||||
log.info("Revoking active tasks {}.", tasks);
|
log.info("Revoking active tasks {}.", tasks);
|
||||||
taskManager.handleRevocation(partitionsToRevoke);
|
taskManager.handleRevocation(partitionsToRevoke);
|
||||||
} finally {
|
} finally {
|
||||||
log.info("partition revocation took {} ms.", time.milliseconds() - start);
|
log.info("partition revocation took {} ms.", time.milliseconds() - start);
|
||||||
}
|
}
|
||||||
if (streamThread.state() != StreamThread.State.PENDING_SHUTDOWN) {
|
if (streamThread.state() != StreamThread.State.PENDING_SHUTDOWN) {
|
||||||
streamThread.setState(StreamThread.State.PARTITIONS_REVOKED);
|
streamThread.setState(StreamThread.State.PARTITIONS_REVOKED);
|
||||||
}
|
|
||||||
} catch (final Exception exception) {
|
|
||||||
return Optional.of(exception);
|
|
||||||
}
|
}
|
||||||
return Optional.empty();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Optional<Exception> onTasksAssigned(final StreamsRebalanceData.Assignment assignment) {
|
public void onTasksAssigned(final StreamsRebalanceData.Assignment assignment) {
|
||||||
try {
|
final Map<TaskId, Set<TopicPartition>> activeTasksWithPartitions =
|
||||||
final Map<TaskId, Set<TopicPartition>> activeTasksWithPartitions =
|
pairWithTopicPartitions(assignment.activeTasks().stream());
|
||||||
pairWithTopicPartitions(assignment.activeTasks().stream());
|
final Map<TaskId, Set<TopicPartition>> standbyTasksWithPartitions =
|
||||||
final Map<TaskId, Set<TopicPartition>> standbyTasksWithPartitions =
|
pairWithTopicPartitions(Stream.concat(assignment.standbyTasks().stream(), assignment.warmupTasks().stream()));
|
||||||
pairWithTopicPartitions(Stream.concat(assignment.standbyTasks().stream(), assignment.warmupTasks().stream()));
|
|
||||||
|
|
||||||
log.info("Processing new assignment {} from Streams Rebalance Protocol", assignment);
|
log.info("Processing new assignment {} from Streams Rebalance Protocol", assignment);
|
||||||
|
|
||||||
taskManager.handleAssignment(activeTasksWithPartitions, standbyTasksWithPartitions);
|
taskManager.handleAssignment(activeTasksWithPartitions, standbyTasksWithPartitions);
|
||||||
streamThread.setState(StreamThread.State.PARTITIONS_ASSIGNED);
|
streamThread.setState(StreamThread.State.PARTITIONS_ASSIGNED);
|
||||||
taskManager.handleRebalanceComplete();
|
taskManager.handleRebalanceComplete();
|
||||||
streamsRebalanceData.setReconciledAssignment(assignment);
|
streamsRebalanceData.setReconciledAssignment(assignment);
|
||||||
} catch (final Exception exception) {
|
|
||||||
return Optional.of(exception);
|
|
||||||
}
|
|
||||||
return Optional.empty();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Optional<Exception> onAllTasksLost() {
|
public void onAllTasksLost() {
|
||||||
try {
|
taskManager.handleLostAll();
|
||||||
taskManager.handleLostAll();
|
streamsRebalanceData.setReconciledAssignment(StreamsRebalanceData.Assignment.EMPTY);
|
||||||
streamsRebalanceData.setReconciledAssignment(StreamsRebalanceData.Assignment.EMPTY);
|
|
||||||
} catch (final Exception exception) {
|
|
||||||
return Optional.of(exception);
|
|
||||||
}
|
|
||||||
return Optional.empty();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private Map<TaskId, Set<TopicPartition>> pairWithTopicPartitions(final Stream<StreamsRebalanceData.TaskId> taskIdStream) {
|
private Map<TaskId, Set<TopicPartition>> pairWithTopicPartitions(final Stream<StreamsRebalanceData.TaskId> taskIdStream) {
|
||||||
|
|
|
@ -32,8 +32,9 @@ import java.util.Optional;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
|
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
|
||||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||||
import static org.mockito.ArgumentMatchers.any;
|
import static org.mockito.ArgumentMatchers.any;
|
||||||
import static org.mockito.Mockito.doThrow;
|
import static org.mockito.Mockito.doThrow;
|
||||||
import static org.mockito.Mockito.inOrder;
|
import static org.mockito.Mockito.inOrder;
|
||||||
|
@ -84,11 +85,9 @@ public class DefaultStreamsRebalanceListenerTest {
|
||||||
));
|
));
|
||||||
when(streamThread.state()).thenReturn(state);
|
when(streamThread.state()).thenReturn(state);
|
||||||
|
|
||||||
final Optional<Exception> result = defaultStreamsRebalanceListener.onTasksRevoked(
|
assertDoesNotThrow(() -> defaultStreamsRebalanceListener.onTasksRevoked(
|
||||||
Set.of(new StreamsRebalanceData.TaskId("1", 0))
|
Set.of(new StreamsRebalanceData.TaskId("1", 0))
|
||||||
);
|
));
|
||||||
|
|
||||||
assertTrue(result.isEmpty());
|
|
||||||
|
|
||||||
final InOrder inOrder = inOrder(taskManager, streamThread);
|
final InOrder inOrder = inOrder(taskManager, streamThread);
|
||||||
inOrder.verify(taskManager).handleRevocation(
|
inOrder.verify(taskManager).handleRevocation(
|
||||||
|
@ -109,9 +108,9 @@ public class DefaultStreamsRebalanceListenerTest {
|
||||||
|
|
||||||
createRebalanceListenerWithRebalanceData(new StreamsRebalanceData(UUID.randomUUID(), Optional.empty(), Map.of(), Map.of()));
|
createRebalanceListenerWithRebalanceData(new StreamsRebalanceData(UUID.randomUUID(), Optional.empty(), Map.of(), Map.of()));
|
||||||
|
|
||||||
final Optional<Exception> result = defaultStreamsRebalanceListener.onTasksRevoked(Set.of());
|
final Exception actualException = assertThrows(RuntimeException.class, () -> defaultStreamsRebalanceListener.onTasksRevoked(Set.of()));
|
||||||
|
|
||||||
assertTrue(result.isPresent());
|
assertEquals(actualException, exception);
|
||||||
verify(taskManager).handleRevocation(any());
|
verify(taskManager).handleRevocation(any());
|
||||||
verify(streamThread, never()).setState(any());
|
verify(streamThread, never()).setState(any());
|
||||||
}
|
}
|
||||||
|
@ -153,9 +152,7 @@ public class DefaultStreamsRebalanceListenerTest {
|
||||||
Set.of(new StreamsRebalanceData.TaskId("3", 0))
|
Set.of(new StreamsRebalanceData.TaskId("3", 0))
|
||||||
);
|
);
|
||||||
|
|
||||||
final Optional<Exception> result = defaultStreamsRebalanceListener.onTasksAssigned(assignment);
|
assertDoesNotThrow(() -> defaultStreamsRebalanceListener.onTasksAssigned(assignment));
|
||||||
|
|
||||||
assertTrue(result.isEmpty());
|
|
||||||
|
|
||||||
final InOrder inOrder = inOrder(taskManager, streamThread, streamsRebalanceData);
|
final InOrder inOrder = inOrder(taskManager, streamThread, streamsRebalanceData);
|
||||||
inOrder.verify(taskManager).handleAssignment(
|
inOrder.verify(taskManager).handleAssignment(
|
||||||
|
@ -179,11 +176,11 @@ public class DefaultStreamsRebalanceListenerTest {
|
||||||
when(streamsRebalanceData.subtopologies()).thenReturn(Map.of());
|
when(streamsRebalanceData.subtopologies()).thenReturn(Map.of());
|
||||||
createRebalanceListenerWithRebalanceData(streamsRebalanceData);
|
createRebalanceListenerWithRebalanceData(streamsRebalanceData);
|
||||||
|
|
||||||
final Optional<Exception> result = defaultStreamsRebalanceListener.onTasksAssigned(
|
final Exception actualException = assertThrows(RuntimeException.class, () -> defaultStreamsRebalanceListener.onTasksAssigned(
|
||||||
new StreamsRebalanceData.Assignment(Set.of(), Set.of(), Set.of())
|
new StreamsRebalanceData.Assignment(Set.of(), Set.of(), Set.of())
|
||||||
);
|
));
|
||||||
assertTrue(result.isPresent());
|
|
||||||
assertEquals(exception, result.get());
|
assertEquals(exception, actualException);
|
||||||
verify(taskManager).handleAssignment(any(), any());
|
verify(taskManager).handleAssignment(any(), any());
|
||||||
verify(streamThread, never()).setState(StreamThread.State.PARTITIONS_ASSIGNED);
|
verify(streamThread, never()).setState(StreamThread.State.PARTITIONS_ASSIGNED);
|
||||||
verify(taskManager, never()).handleRebalanceComplete();
|
verify(taskManager, never()).handleRebalanceComplete();
|
||||||
|
@ -196,7 +193,7 @@ public class DefaultStreamsRebalanceListenerTest {
|
||||||
when(streamsRebalanceData.subtopologies()).thenReturn(Map.of());
|
when(streamsRebalanceData.subtopologies()).thenReturn(Map.of());
|
||||||
createRebalanceListenerWithRebalanceData(streamsRebalanceData);
|
createRebalanceListenerWithRebalanceData(streamsRebalanceData);
|
||||||
|
|
||||||
assertTrue(defaultStreamsRebalanceListener.onAllTasksLost().isEmpty());
|
assertDoesNotThrow(() -> defaultStreamsRebalanceListener.onAllTasksLost());
|
||||||
|
|
||||||
final InOrder inOrder = inOrder(taskManager, streamsRebalanceData);
|
final InOrder inOrder = inOrder(taskManager, streamsRebalanceData);
|
||||||
inOrder.verify(taskManager).handleLostAll();
|
inOrder.verify(taskManager).handleLostAll();
|
||||||
|
@ -211,9 +208,10 @@ public class DefaultStreamsRebalanceListenerTest {
|
||||||
final StreamsRebalanceData streamsRebalanceData = mock(StreamsRebalanceData.class);
|
final StreamsRebalanceData streamsRebalanceData = mock(StreamsRebalanceData.class);
|
||||||
when(streamsRebalanceData.subtopologies()).thenReturn(Map.of());
|
when(streamsRebalanceData.subtopologies()).thenReturn(Map.of());
|
||||||
createRebalanceListenerWithRebalanceData(streamsRebalanceData);
|
createRebalanceListenerWithRebalanceData(streamsRebalanceData);
|
||||||
final Optional<Exception> result = defaultStreamsRebalanceListener.onAllTasksLost();
|
|
||||||
assertTrue(result.isPresent());
|
final Exception actualException = assertThrows(RuntimeException.class, () -> defaultStreamsRebalanceListener.onAllTasksLost());
|
||||||
assertEquals(exception, result.get());
|
|
||||||
|
assertEquals(exception, actualException);
|
||||||
verify(taskManager).handleLostAll();
|
verify(taskManager).handleLostAll();
|
||||||
verify(streamsRebalanceData, never()).setReconciledAssignment(any());
|
verify(streamsRebalanceData, never()).setReconciledAssignment(any());
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue