KAFKA-13414: Replace PowerMock/EasyMock with Mockito in connect.storage.KafkaOffsetBackingStoreTest (#12418)

Reviewers: Chris Egerton <chrise@aiven.io>
Christo Lolov 2022-11-15 21:30:52 +00:00 committed by GitHub
parent 09da44ed80
commit 1894856d0e
3 changed files with 152 additions and 226 deletions

build.gradle

@@ -406,7 +406,7 @@ subprojects {
// connect tests
"**/ConnectorPluginsResourceTest.*",
"**/DistributedHerderTest.*", "**/FileOffsetBakingStoreTest.*",
"**/ErrorHandlingTaskTest.*", "**/KafkaConfigBackingStoreTest.*", "**/KafkaOffsetBackingStoreTest.*",
"**/ErrorHandlingTaskTest.*", "**/KafkaConfigBackingStoreTest.*",
"**/KafkaBasedLogTest.*", "**/OffsetStorageWriterTest.*", "**/StandaloneHerderTest.*",
"**/SourceTaskOffsetCommitterTest.*",
"**/WorkerSinkTaskTest.*", "**/WorkerSinkTaskThreadedTest.*",

connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java

@@ -139,7 +139,8 @@ public class KafkaOffsetBackingStore implements OffsetBackingStore {
}
protected KafkaBasedLog<byte[], byte[]> offsetLog;
private final HashMap<ByteBuffer, ByteBuffer> data = new HashMap<>();
// Visible for testing
final HashMap<ByteBuffer, ByteBuffer> data = new HashMap<>();
private final Supplier<TopicAdmin> topicAdminSupplier;
private final Supplier<String> clientIdBase;
private SharedTopicAdmin ownTopicAdmin;
@@ -230,7 +231,8 @@ public class KafkaOffsetBackingStore implements OffsetBackingStore {
this.offsetLog = createKafkaBasedLog(topic, producerProps, consumerProps, consumedCallback, topicDescription, adminSupplier);
}
private KafkaBasedLog<byte[], byte[]> createKafkaBasedLog(String topic, Map<String, Object> producerProps,
// Visible for testing
KafkaBasedLog<byte[], byte[]> createKafkaBasedLog(String topic, Map<String, Object> producerProps,
Map<String, Object> consumerProps,
Callback<ConsumerRecord<byte[], byte[]>> consumedCallback,
final NewTopic topicDescription, Supplier<TopicAdmin> adminSupplier) {
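
Both production-side changes widen visibility from private to package-private ("visible for testing") so the test, which lives in the same package, no longer needs PowerMock: the data map can be read directly instead of through Whitebox reflection, and createKafkaBasedLog becomes a seam that a plain Mockito spy can stub. A minimal sketch of the pattern, with hypothetical simplified names (SimpleStore, createBackingLog) standing in for the real classes:

import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;

import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified store illustrating the "package-private seam" pattern.
class SimpleStore {
    // Visible for testing
    final Map<String, String> data = new HashMap<>();

    // Visible for testing: an overridable seam instead of a PowerMock private mock
    String createBackingLog(String topic) {
        return "real-log-for-" + topic;
    }

    void start(String topic) {
        data.put("log", createBackingLog(topic));
    }
}

// In a test in the same package:
SimpleStore store = spy(new SimpleStore());
doReturn("mock-log").when(store).createBackingLog("connect-offsets");
store.start("connect-offsets");
assertEquals("mock-log", store.data.get("log"));  // direct field access, no Whitebox

The trade-off is slightly weaker encapsulation in exchange for testing without bytecode manipulation.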

connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java

@@ -29,25 +29,20 @@ import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.KafkaBasedLog;
import org.apache.kafka.connect.util.SharedTopicAdmin;
import org.apache.kafka.connect.util.TopicAdmin;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.easymock.PowerMock;
import org.powermock.api.easymock.annotation.Mock;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.powermock.reflect.Whitebox;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.MockedStatic;
import org.mockito.junit.MockitoJUnitRunner;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
@@ -64,12 +59,21 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.AdditionalMatchers.aryEq;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.Mockito.CALLS_REAL_METHODS;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
@RunWith(PowerMockRunner.class)
@PrepareForTest({KafkaOffsetBackingStore.class, WorkerConfig.class})
@PowerMockIgnore({"javax.management.*", "javax.crypto.*"})
@RunWith(MockitoJUnitRunner.StrictStubs.class)
public class KafkaOffsetBackingStoreTest {
private static final String CLIENT_ID_BASE = "test-client-id-";
private static final String TOPIC = "connect-offsets";
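
The import hunks above (removed EasyMock/PowerMock imports interleaved with their Mockito replacements) show the mechanical mapping of the migration: EasyMock's Capture becomes Mockito's ArgumentCaptor declared with @Captor, PowerMockRunner plus @PrepareForTest becomes MockitoJUnitRunner.StrictStubs plus an inline MockedStatic, and @PowerMockIgnore disappears because Mockito does not load classes through a custom classloader. A self-contained sketch of @Captor under the strict-stubs runner (this test class and its contents are illustrative, not part of the commit):

import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.verify;

import java.util.List;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;

@RunWith(MockitoJUnitRunner.StrictStubs.class)
public class CaptorExampleTest {
    @Mock
    private List<String> log;                 // instantiated by the runner

    @Captor
    private ArgumentCaptor<String> captured;  // instantiated by the runner

    @Test
    public void capturesArgument() {
        log.add("hello");
        verify(log).add(captured.capture());  // record the argument at verify time
        assertEquals("hello", captured.getValue());
    }
}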
@@ -110,41 +114,59 @@ public class KafkaOffsetBackingStoreTest {
KafkaBasedLog<byte[], byte[]> storeLog;
private KafkaOffsetBackingStore store;
private Capture<String> capturedTopic = EasyMock.newCapture();
private Capture<Map<String, Object>> capturedProducerProps = EasyMock.newCapture();
private Capture<Map<String, Object>> capturedConsumerProps = EasyMock.newCapture();
private Capture<Supplier<TopicAdmin>> capturedAdminSupplier = EasyMock.newCapture();
private Capture<NewTopic> capturedNewTopic = EasyMock.newCapture();
private Capture<Callback<ConsumerRecord<byte[], byte[]>>> capturedConsumedCallback = EasyMock.newCapture();
private MockedStatic<WorkerConfig> workerConfigMockedStatic;
@Captor
private ArgumentCaptor<String> capturedTopic;
@Captor
private ArgumentCaptor<Map<String, Object>> capturedProducerProps;
@Captor
private ArgumentCaptor<Map<String, Object>> capturedConsumerProps;
@Captor
private ArgumentCaptor<Supplier<TopicAdmin>> capturedAdminSupplier;
@Captor
private ArgumentCaptor<NewTopic> capturedNewTopic;
@Captor
private ArgumentCaptor<Callback<ConsumerRecord<byte[], byte[]>>> capturedConsumedCallback;
@Captor
private ArgumentCaptor<Callback<Void>> storeLogCallbackArgumentCaptor;
@Before
public void setUp() throws Exception {
Supplier<SharedTopicAdmin> adminSupplier = () -> {
public void setup() throws Exception {
Supplier<TopicAdmin> adminSupplier = () -> {
fail("Should not attempt to instantiate admin in these tests");
// Should never be reached; only add this thrown exception to satisfy the compiler
throw new AssertionError();
};
Supplier<String> clientIdBase = () -> CLIENT_ID_BASE;
store = PowerMock.createPartialMock(
KafkaOffsetBackingStore.class,
new String[] {"createKafkaBasedLog"},
adminSupplier, clientIdBase
);
store = spy(new KafkaOffsetBackingStore(adminSupplier, clientIdBase));
doReturn(storeLog).when(store).createKafkaBasedLog(capturedTopic.capture(), capturedProducerProps.capture(),
capturedConsumerProps.capture(), capturedConsumedCallback.capture(),
capturedNewTopic.capture(), capturedAdminSupplier.capture());
workerConfigMockedStatic = mockStatic(WorkerConfig.class, CALLS_REAL_METHODS);
workerConfigMockedStatic.when(() -> WorkerConfig.lookupKafkaClusterId(any())).thenReturn("test-cluster");
}
@After
public void tearDown() {
workerConfigMockedStatic.close();
verifyNoMoreInteractions(storeLog);
}
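
A MockedStatic stays active on the current thread until it is closed, so tearDown() must call close() or the stubbed WorkerConfig.lookupKafkaClusterId would leak into later tests; verifyNoMoreInteractions(storeLog) additionally forces every test to account for each start/stop/send/readToEnd it triggers. Where a static mock is needed in only one test, the usual alternative is try-with-resources, sketched here as a fragment with the same classes:

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.CALLS_REAL_METHODS;
import static org.mockito.Mockito.mockStatic;

import org.apache.kafka.connect.runtime.WorkerConfig;
import org.mockito.MockedStatic;

// Scoped alternative to the @Before/@After pairing used above:
try (MockedStatic<WorkerConfig> mocked =
         mockStatic(WorkerConfig.class, CALLS_REAL_METHODS)) {
    mocked.when(() -> WorkerConfig.lookupKafkaClusterId(any()))
          .thenReturn("test-cluster");
    // ... exercise code that ends up calling WorkerConfig.lookupKafkaClusterId ...
}   // close() runs here automatically, restoring the real static behavior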
@Test
public void testStartStop() throws Exception {
expectConfigure();
expectStart(Collections.emptyList());
expectStop();
expectClusterId();
PowerMock.replayAll();
public void testStartStop() {
Map<String, String> settings = new HashMap<>(DEFAULT_PROPS);
settings.put("offset.storage.min.insync.replicas", "3");
settings.put("offset.storage.max.message.bytes", "1001");
store.configure(new DistributedConfig(settings));
store.start();
verify(storeLog).start();
assertEquals(TOPIC, capturedTopic.getValue());
assertEquals("org.apache.kafka.common.serialization.ByteArraySerializer", capturedProducerProps.getValue().get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG));
assertEquals("org.apache.kafka.common.serialization.ByteArraySerializer", capturedProducerProps.getValue().get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG));
@@ -155,103 +177,55 @@ public class KafkaOffsetBackingStoreTest {
assertEquals(TOPIC_PARTITIONS, capturedNewTopic.getValue().numPartitions());
assertEquals(TOPIC_REPLICATION_FACTOR, capturedNewTopic.getValue().replicationFactor());
store.start();
store.stop();
PowerMock.verifyAll();
verify(storeLog).stop();
}
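
Note that the captors are wired into the doReturn(...).when(...) stubbing in setup() rather than into verify(...) calls: their values are recorded at the moment configure() invokes the stubbed createKafkaBasedLog, which is why testStartStop can assert on the captured topic, producer, and consumer properties with no replay/verify phase.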
@Test
public void testReloadOnStart() throws Exception {
expectConfigure();
expectStart(Arrays.asList(
new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, TP0_KEY.array(), TP0_VALUE.array(),
new RecordHeaders(), Optional.empty()),
new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, 0, 0, TP1_KEY.array(), TP1_VALUE.array(),
new RecordHeaders(), Optional.empty()),
new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0, 0, TP0_KEY.array(), TP0_VALUE_NEW.array(),
new RecordHeaders(), Optional.empty()),
new ConsumerRecord<>(TOPIC, 1, 1, 0L, TimestampType.CREATE_TIME, 0, 0, TP1_KEY.array(), TP1_VALUE_NEW.array(),
new RecordHeaders(), Optional.empty())
));
expectStop();
expectClusterId();
PowerMock.replayAll();
public void testReloadOnStart() {
doAnswer(invocation -> {
capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, TP0_KEY.array(), TP0_VALUE.array(),
new RecordHeaders(), Optional.empty()));
capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, 0, 0, TP1_KEY.array(), TP1_VALUE.array(),
new RecordHeaders(), Optional.empty()));
capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0, 0, TP0_KEY.array(), TP0_VALUE_NEW.array(),
new RecordHeaders(), Optional.empty()));
capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 1, 1, 0L, TimestampType.CREATE_TIME, 0, 0, TP1_KEY.array(), TP1_VALUE_NEW.array(),
new RecordHeaders(), Optional.empty()));
return null;
}).when(storeLog).start();
store.configure(DEFAULT_DISTRIBUTED_CONFIG);
store.start();
HashMap<ByteBuffer, ByteBuffer> data = Whitebox.getInternalState(store, "data");
HashMap<ByteBuffer, ByteBuffer> data = store.data;
assertEquals(TP0_VALUE_NEW, data.get(TP0_KEY));
assertEquals(TP1_VALUE_NEW, data.get(TP1_KEY));
store.stop();
PowerMock.verifyAll();
verify(storeLog).stop();
}
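
The doAnswer block replaces the old expectStart(preexistingRecords) helper: when storeLog.start() is called, the answer replays four records through the consumed callback that the store handed to createKafkaBasedLog, and the later record for each key overwrites the earlier one in store.data. The shape of the pattern, reduced to a fragment with illustrative types (Log, consumedCallback):

import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;

import java.util.function.Consumer;

// Illustrative stand-ins for KafkaBasedLog and the consumed callback:
interface Log { void start(); }

Consumer<String> consumedCallback = rec -> System.out.println("replayed: " + rec);
Log log = mock(Log.class);

// When start() is called, synchronously replay "pre-existing" records
// through the callback before returning, mimicking a log that reads to
// the end of its topic during startup.
doAnswer(invocation -> {
    consumedCallback.accept("record-1");
    consumedCallback.accept("record-2");
    return null;
}).when(log).start();

log.start();  // both records are replayed, then start() returns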
@Test
public void testGetSet() throws Exception {
expectConfigure();
expectStart(Collections.emptyList());
expectStop();
// First get() against an empty store
final Capture<Callback<Void>> firstGetReadToEndCallback = EasyMock.newCapture();
storeLog.readToEnd(EasyMock.capture(firstGetReadToEndCallback));
PowerMock.expectLastCall().andAnswer(() -> {
firstGetReadToEndCallback.getValue().onCompletion(null, null);
return null;
});
// Set offsets
Capture<org.apache.kafka.clients.producer.Callback> callback0 = EasyMock.newCapture();
storeLog.send(EasyMock.aryEq(TP0_KEY.array()), EasyMock.aryEq(TP0_VALUE.array()), EasyMock.capture(callback0));
PowerMock.expectLastCall();
Capture<org.apache.kafka.clients.producer.Callback> callback1 = EasyMock.newCapture();
storeLog.send(EasyMock.aryEq(TP1_KEY.array()), EasyMock.aryEq(TP1_VALUE.array()), EasyMock.capture(callback1));
PowerMock.expectLastCall();
// Second get() should get the produced data and return the new values
final Capture<Callback<Void>> secondGetReadToEndCallback = EasyMock.newCapture();
storeLog.readToEnd(EasyMock.capture(secondGetReadToEndCallback));
PowerMock.expectLastCall().andAnswer(() -> {
capturedConsumedCallback.getValue().onCompletion(null,
new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, TP0_KEY.array(), TP0_VALUE.array(),
new RecordHeaders(), Optional.empty()));
capturedConsumedCallback.getValue().onCompletion(null,
new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, 0, 0, TP1_KEY.array(), TP1_VALUE.array(),
new RecordHeaders(), Optional.empty()));
secondGetReadToEndCallback.getValue().onCompletion(null, null);
return null;
});
// Third get() should pick up data produced by someone else and return those values
final Capture<Callback<Void>> thirdGetReadToEndCallback = EasyMock.newCapture();
storeLog.readToEnd(EasyMock.capture(thirdGetReadToEndCallback));
PowerMock.expectLastCall().andAnswer(() -> {
capturedConsumedCallback.getValue().onCompletion(null,
new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0, 0, TP0_KEY.array(), TP0_VALUE_NEW.array(),
new RecordHeaders(), Optional.empty()));
capturedConsumedCallback.getValue().onCompletion(null,
new ConsumerRecord<>(TOPIC, 1, 1, 0L, TimestampType.CREATE_TIME, 0, 0, TP1_KEY.array(), TP1_VALUE_NEW.array(),
new RecordHeaders(), Optional.empty()));
thirdGetReadToEndCallback.getValue().onCompletion(null, null);
return null;
});
expectClusterId();
PowerMock.replayAll();
store.configure(DEFAULT_DISTRIBUTED_CONFIG);
store.start();
verify(storeLog).start();
doAnswer(invocation -> {
// First get() against an empty store
storeLogCallbackArgumentCaptor.getValue().onCompletion(null, null);
return null;
}).when(storeLog).readToEnd(storeLogCallbackArgumentCaptor.capture());
// Getting from empty store should return nulls
Map<ByteBuffer, ByteBuffer> offsets = store.get(Arrays.asList(TP0_KEY, TP1_KEY)).get(10000, TimeUnit.MILLISECONDS);
// Since we didn't read them yet, these will be null
assertNull(offsets.get(TP0_KEY));
assertNull(offsets.get(TP1_KEY));
// Set some offsets
Map<ByteBuffer, ByteBuffer> toSet = new HashMap<>();
toSet.put(TP0_KEY, TP0_VALUE);
@@ -259,6 +233,11 @@ public class KafkaOffsetBackingStoreTest {
final AtomicBoolean invoked = new AtomicBoolean(false);
Future<Void> setFuture = store.set(toSet, (error, result) -> invoked.set(true));
assertFalse(setFuture.isDone());
// Set offsets
ArgumentCaptor<org.apache.kafka.clients.producer.Callback> callback0 = ArgumentCaptor.forClass(org.apache.kafka.clients.producer.Callback.class);
verify(storeLog).send(aryEq(TP0_KEY.array()), aryEq(TP0_VALUE.array()), callback0.capture());
ArgumentCaptor<org.apache.kafka.clients.producer.Callback> callback1 = ArgumentCaptor.forClass(org.apache.kafka.clients.producer.Callback.class);
verify(storeLog).send(aryEq(TP1_KEY.array()), aryEq(TP1_VALUE.array()), callback1.capture());
// Out of order callbacks shouldn't matter, should still require all to be invoked before invoking the callback
// for the store's set callback
callback1.getValue().onCompletion(null, null);
@@ -267,11 +246,35 @@ public class KafkaOffsetBackingStoreTest {
setFuture.get(10000, TimeUnit.MILLISECONDS);
assertTrue(invoked.get());
doAnswer(invocation -> {
// Second get() should get the produced data and return the new values
capturedConsumedCallback.getValue().onCompletion(null,
new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, TP0_KEY.array(), TP0_VALUE.array(),
new RecordHeaders(), Optional.empty()));
capturedConsumedCallback.getValue().onCompletion(null,
new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, 0, 0, TP1_KEY.array(), TP1_VALUE.array(),
new RecordHeaders(), Optional.empty()));
storeLogCallbackArgumentCaptor.getValue().onCompletion(null, null);
return null;
}).when(storeLog).readToEnd(storeLogCallbackArgumentCaptor.capture());
// Getting data should read to end of our published data and return it
offsets = store.get(Arrays.asList(TP0_KEY, TP1_KEY)).get(10000, TimeUnit.MILLISECONDS);
assertEquals(TP0_VALUE, offsets.get(TP0_KEY));
assertEquals(TP1_VALUE, offsets.get(TP1_KEY));
doAnswer(invocation -> {
// Third get() should pick up data produced by someone else and return those values
capturedConsumedCallback.getValue().onCompletion(null,
new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0, 0, TP0_KEY.array(), TP0_VALUE_NEW.array(),
new RecordHeaders(), Optional.empty()));
capturedConsumedCallback.getValue().onCompletion(null,
new ConsumerRecord<>(TOPIC, 1, 1, 0L, TimestampType.CREATE_TIME, 0, 0, TP1_KEY.array(), TP1_VALUE_NEW.array(),
new RecordHeaders(), Optional.empty()));
storeLogCallbackArgumentCaptor.getValue().onCompletion(null, null);
return null;
}).when(storeLog).readToEnd(storeLogCallbackArgumentCaptor.capture());
// Getting data should read to end of our published data and return it
offsets = store.get(Arrays.asList(TP0_KEY, TP1_KEY)).get(10000, TimeUnit.MILLISECONDS);
assertEquals(TP0_VALUE_NEW, offsets.get(TP0_KEY));
@@ -279,44 +282,28 @@ public class KafkaOffsetBackingStoreTest {
store.stop();
PowerMock.verifyAll();
verify(storeLog).stop();
}
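
Where EasyMock had to queue three readToEnd expectations up front, Mockito allows re-stubbing between phases: each doAnswer(...).when(storeLog).readToEnd(...) replaces the previous answer, so the first get() sees an empty log, the second sees this test's own writes, and the third sees records produced elsewhere. Re-stubbing mid-test is ordinary Mockito behavior; a minimal illustration, with Supplier standing in for the mocked collaborator:

import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;

import java.util.function.Supplier;

@SuppressWarnings("unchecked")
Supplier<String> source = (Supplier<String>) mock(Supplier.class);

doAnswer(inv -> "empty").when(source).get();
assert "empty".equals(source.get());          // phase 1

doAnswer(inv -> "loaded").when(source).get(); // re-stub: the new answer replaces the old
assert "loaded".equals(source.get());         // phase 2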
@Test
public void testGetSetNull() throws Exception {
expectConfigure();
expectStart(Collections.emptyList());
// Set offsets
Capture<org.apache.kafka.clients.producer.Callback> callback0 = EasyMock.newCapture();
storeLog.send(EasyMock.isNull(byte[].class), EasyMock.aryEq(TP0_VALUE.array()), EasyMock.capture(callback0));
PowerMock.expectLastCall();
Capture<org.apache.kafka.clients.producer.Callback> callback1 = EasyMock.newCapture();
storeLog.send(EasyMock.aryEq(TP1_KEY.array()), EasyMock.isNull(byte[].class), EasyMock.capture(callback1));
PowerMock.expectLastCall();
// Second get() should get the produced data and return the new values
final Capture<Callback<Void>> secondGetReadToEndCallback = EasyMock.newCapture();
storeLog.readToEnd(EasyMock.capture(secondGetReadToEndCallback));
PowerMock.expectLastCall().andAnswer(() -> {
doAnswer(invocation -> {
capturedConsumedCallback.getValue().onCompletion(null,
new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, null, TP0_VALUE.array(),
new RecordHeaders(), Optional.empty()));
capturedConsumedCallback.getValue().onCompletion(null,
new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, 0, 0, TP1_KEY.array(), null,
new RecordHeaders(), Optional.empty()));
secondGetReadToEndCallback.getValue().onCompletion(null, null);
storeLogCallbackArgumentCaptor.getValue().onCompletion(null, null);
return null;
});
expectStop();
expectClusterId();
PowerMock.replayAll();
}).when(storeLog).readToEnd(storeLogCallbackArgumentCaptor.capture());
store.configure(DEFAULT_DISTRIBUTED_CONFIG);
store.start();
verify(storeLog).start();
// Set offsets using null keys and values
Map<ByteBuffer, ByteBuffer> toSet = new HashMap<>();
toSet.put(null, TP0_VALUE);
@@ -324,6 +311,12 @@ public class KafkaOffsetBackingStoreTest {
final AtomicBoolean invoked = new AtomicBoolean(false);
Future<Void> setFuture = store.set(toSet, (error, result) -> invoked.set(true));
assertFalse(setFuture.isDone());
// Set offsets
ArgumentCaptor<org.apache.kafka.clients.producer.Callback> callback0 = ArgumentCaptor.forClass(org.apache.kafka.clients.producer.Callback.class);
verify(storeLog).send(isNull(), aryEq(TP0_VALUE.array()), callback0.capture());
ArgumentCaptor<org.apache.kafka.clients.producer.Callback> callback1 = ArgumentCaptor.forClass(org.apache.kafka.clients.producer.Callback.class);
verify(storeLog).send(aryEq(TP1_KEY.array()), isNull(), callback1.capture());
// Out of order callbacks shouldn't matter, should still require all to be invoked before invoking the callback
// for the store's set callback
callback1.getValue().onCompletion(null, null);
@@ -331,7 +324,6 @@ public class KafkaOffsetBackingStoreTest {
callback0.getValue().onCompletion(null, null);
setFuture.get(10000, TimeUnit.MILLISECONDS);
assertTrue(invoked.get());
// Getting data should read to end of our published data and return it
Map<ByteBuffer, ByteBuffer> offsets = store.get(Arrays.asList(null, TP1_KEY)).get(10000, TimeUnit.MILLISECONDS);
assertEquals(TP0_VALUE, offsets.get(null));
@@ -339,33 +331,16 @@ public class KafkaOffsetBackingStoreTest {
store.stop();
PowerMock.verifyAll();
verify(storeLog).stop();
}
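
For null keys and values only the matchers change shape: EasyMock.isNull(byte[].class) becomes Mockito's untyped isNull(), and EasyMock.aryEq maps directly to Mockito's AdditionalMatchers.aryEq, which matches the byte arrays by content.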
@Test
public void testSetFailure() throws Exception {
expectConfigure();
expectStart(Collections.emptyList());
expectStop();
// Set offsets
Capture<org.apache.kafka.clients.producer.Callback> callback0 = EasyMock.newCapture();
storeLog.send(EasyMock.aryEq(TP0_KEY.array()), EasyMock.aryEq(TP0_VALUE.array()), EasyMock.capture(callback0));
PowerMock.expectLastCall();
Capture<org.apache.kafka.clients.producer.Callback> callback1 = EasyMock.newCapture();
storeLog.send(EasyMock.aryEq(TP1_KEY.array()), EasyMock.aryEq(TP1_VALUE.array()), EasyMock.capture(callback1));
PowerMock.expectLastCall();
Capture<org.apache.kafka.clients.producer.Callback> callback2 = EasyMock.newCapture();
storeLog.send(EasyMock.aryEq(TP2_KEY.array()), EasyMock.aryEq(TP2_VALUE.array()), EasyMock.capture(callback2));
PowerMock.expectLastCall();
expectClusterId();
PowerMock.replayAll();
public void testSetFailure() {
store.configure(DEFAULT_DISTRIBUTED_CONFIG);
store.start();
verify(storeLog).start();
// Set some offsets
Map<ByteBuffer, ByteBuffer> toSet = new HashMap<>();
toSet.put(TP0_KEY, TP0_VALUE);
@@ -379,6 +354,13 @@ public class KafkaOffsetBackingStoreTest {
invokedFailure.set(true);
});
assertFalse(setFuture.isDone());
// Set offsets
ArgumentCaptor<org.apache.kafka.clients.producer.Callback> callback0 = ArgumentCaptor.forClass(org.apache.kafka.clients.producer.Callback.class);
verify(storeLog).send(aryEq(TP0_KEY.array()), aryEq(TP0_VALUE.array()), callback0.capture());
ArgumentCaptor<org.apache.kafka.clients.producer.Callback> callback1 = ArgumentCaptor.forClass(org.apache.kafka.clients.producer.Callback.class);
verify(storeLog).send(aryEq(TP1_KEY.array()), aryEq(TP1_VALUE.array()), callback1.capture());
ArgumentCaptor<org.apache.kafka.clients.producer.Callback> callback2 = ArgumentCaptor.forClass(org.apache.kafka.clients.producer.Callback.class);
verify(storeLog).send(aryEq(TP2_KEY.array()), aryEq(TP2_VALUE.array()), callback2.capture());
// Out of order callbacks shouldn't matter, should still require all to be invoked before invoking the callback
// for the store's set callback
callback1.getValue().onCompletion(null, null);
@@ -387,91 +369,64 @@ public class KafkaOffsetBackingStoreTest {
assertTrue(invoked.get());
assertTrue(invokedFailure.get());
callback0.getValue().onCompletion(null, null);
try {
setFuture.get(10000, TimeUnit.MILLISECONDS);
fail("Should have seen KafkaException thrown when waiting on KafkaOffsetBackingStore.set() future");
} catch (ExecutionException e) {
// expected
ExecutionException e = assertThrows(ExecutionException.class, () -> setFuture.get(10000, TimeUnit.MILLISECONDS));
assertNotNull(e.getCause());
assertTrue(e.getCause() instanceof KafkaException);
}
store.stop();
PowerMock.verifyAll();
verify(storeLog).stop();
}
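
The try/fail/catch idiom collapses into assertThrows, which fails the test when no exception is thrown and returns the caught ExecutionException so its cause can be checked to be a KafkaException in two plain assertions.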
@Test
public void testConsumerPropertiesInsertedByDefaultWithExactlyOnceSourceEnabled() throws Exception {
public void testConsumerPropertiesInsertedByDefaultWithExactlyOnceSourceEnabled() {
Map<String, String> workerProps = new HashMap<>(DEFAULT_PROPS);
workerProps.put(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "enabled");
workerProps.remove(ISOLATION_LEVEL_CONFIG);
DistributedConfig config = new DistributedConfig(workerProps);
expectConfigure();
expectClusterId();
PowerMock.replayAll();
store.configure(config);
assertEquals(
IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT),
capturedConsumerProps.getValue().get(ISOLATION_LEVEL_CONFIG)
);
PowerMock.verifyAll();
}
@Test
public void testConsumerPropertiesOverrideUserSuppliedValuesWithExactlyOnceSourceEnabled() throws Exception {
public void testConsumerPropertiesOverrideUserSuppliedValuesWithExactlyOnceSourceEnabled() {
Map<String, String> workerProps = new HashMap<>(DEFAULT_PROPS);
workerProps.put(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "enabled");
workerProps.put(ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_UNCOMMITTED.name().toLowerCase(Locale.ROOT));
DistributedConfig config = new DistributedConfig(workerProps);
expectConfigure();
expectClusterId();
PowerMock.replayAll();
store.configure(config);
assertEquals(
IsolationLevel.READ_COMMITTED.name().toLowerCase(Locale.ROOT),
capturedConsumerProps.getValue().get(ISOLATION_LEVEL_CONFIG)
);
PowerMock.verifyAll();
}
@Test
public void testConsumerPropertiesNotInsertedByDefaultWithoutExactlyOnceSourceEnabled() throws Exception {
public void testConsumerPropertiesNotInsertedByDefaultWithoutExactlyOnceSourceEnabled() {
Map<String, String> workerProps = new HashMap<>(DEFAULT_PROPS);
workerProps.put(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "disabled");
workerProps.remove(ISOLATION_LEVEL_CONFIG);
DistributedConfig config = new DistributedConfig(workerProps);
expectConfigure();
expectClusterId();
PowerMock.replayAll();
store.configure(config);
assertNull(capturedConsumerProps.getValue().get(ISOLATION_LEVEL_CONFIG));
PowerMock.verifyAll();
}
@Test
public void testConsumerPropertiesDoNotOverrideUserSuppliedValuesWithoutExactlyOnceSourceEnabled() throws Exception {
public void testConsumerPropertiesDoNotOverrideUserSuppliedValuesWithoutExactlyOnceSourceEnabled() {
Map<String, String> workerProps = new HashMap<>(DEFAULT_PROPS);
workerProps.put(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "disabled");
workerProps.put(ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_UNCOMMITTED.name().toLowerCase(Locale.ROOT));
DistributedConfig config = new DistributedConfig(workerProps);
expectConfigure();
expectClusterId();
PowerMock.replayAll();
store.configure(config);
assertEquals(
@@ -479,51 +434,20 @@ public class KafkaOffsetBackingStoreTest {
capturedConsumerProps.getValue().get(ISOLATION_LEVEL_CONFIG)
);
PowerMock.verifyAll();
}
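
The four consumer-property tests pin down behavior the migration must not change: with exactly.once.source.support enabled, the store forces the consumer's isolation.level to read_committed even over a user-supplied read_uncommitted; with it disabled, the store neither injects the property nor overrides the user's value.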
@Test
public void testClientIds() throws Exception {
public void testClientIds() {
Map<String, String> workerProps = new HashMap<>(DEFAULT_PROPS);
DistributedConfig config = new DistributedConfig(workerProps);
expectConfigure();
expectClusterId();
PowerMock.replayAll();
workerConfigMockedStatic.when(() -> WorkerConfig.lookupKafkaClusterId(any())).thenReturn("test-cluster");
store.configure(config);
final String expectedClientId = CLIENT_ID_BASE + "offsets";
assertEquals(expectedClientId, capturedProducerProps.getValue().get(CLIENT_ID_CONFIG));
assertEquals(expectedClientId, capturedConsumerProps.getValue().get(CLIENT_ID_CONFIG));
PowerMock.verifyAll();
}
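
testClientIds re-points the static WorkerConfig stub (redundantly, since setup() installs the same answer; with a MockedStatic the latest registration wins) and then checks that the store derives a single client ID, the injected base plus the "offsets" suffix, and applies it to both the producer and consumer configs it builds.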
private void expectConfigure() throws Exception {
PowerMock.expectPrivate(store, "createKafkaBasedLog", EasyMock.capture(capturedTopic), EasyMock.capture(capturedProducerProps),
EasyMock.capture(capturedConsumerProps), EasyMock.capture(capturedConsumedCallback),
EasyMock.capture(capturedNewTopic), EasyMock.capture(capturedAdminSupplier))
.andReturn(storeLog);
}
private void expectStart(final List<ConsumerRecord<byte[], byte[]>> preexistingRecords) {
storeLog.start();
PowerMock.expectLastCall().andAnswer(() -> {
for (ConsumerRecord<byte[], byte[]> rec : preexistingRecords)
capturedConsumedCallback.getValue().onCompletion(null, rec);
return null;
});
}
private void expectStop() {
storeLog.stop();
PowerMock.expectLastCall();
}
private void expectClusterId() {
PowerMock.mockStaticPartial(WorkerConfig.class, "lookupKafkaClusterId");
EasyMock.expect(WorkerConfig.lookupKafkaClusterId(EasyMock.anyObject())).andReturn("test-cluster").anyTimes();
}
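
The deleted expect* helpers existed only to drive EasyMock's record/replay/verify lifecycle; with Mockito the same intent moves inline to the point of use, doReturn/doAnswer before the call and verify(...) after it, so the helpers disappear rather than being translated.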
private static ByteBuffer buffer(String v) {