KAFKA-12474: Handle failure to write new session keys gracefully (#10396)

If a distributed worker fails to write a new session key to the config topic (or to read it back), it dies. This fix softens the blow a bit: instead of dying, the worker restarts the herder tick loop and forces a read to the end of the config topic, retrying until that read succeeds.

At that point, if the worker successfully wrote a new session key on its first attempt, it will have read that key back from the config topic and will not write a new key during the next tick iteration. If it was unable to write the key at all, it will try again to write a new key (if it is still the leader).
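Condensed, the guarded write path looks roughly like the sketch below. The names match the DistributedHerder diff further down; the surrounding tick-loop control flow is omitted here.

    // Sketch of the guarded session-key write in DistributedHerder.tick()
    // (condensed from the diff below; surrounding tick-loop logic omitted).
    if (checkForKeyRotation(now)) {
        log.debug("Distributing new session key");
        keyExpiration = Long.MAX_VALUE;
        try {
            configBackingStore.putSessionKey(new SessionKey(keyGenerator.generateKey(), now));
        } catch (Exception e) {
            // Don't let the failure kill the worker: mark the config state as stale and end
            // this tick; the next tick re-reads the config topic before possibly retrying.
            log.info("Failed to write new session key to config topic; forcing a read to the end of the config topic before possibly retrying");
            canReadConfigs = false;
            return;
        }
    }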

Verified with new unit tests for both cases (failure to write, failure to read back after write).

Author: Chris Egerton <chrise@confluent.io>
Reviewers: Greg Harris <gregh@confluent.io>, Randall Hauch <rhauch@gmail.com>
Authored by Chris Egerton on 2021-04-01 13:26:01 -04:00, committed by GitHub
parent 4ed7f2cd01
commit aea059a07b
2 changed files with 97 additions and 5 deletions

DistributedHerder.java

@@ -363,10 +363,16 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
        if (checkForKeyRotation(now)) {
            log.debug("Distributing new session key");
            keyExpiration = Long.MAX_VALUE;
            try {
                configBackingStore.putSessionKey(new SessionKey(
                    keyGenerator.generateKey(),
                    now
                ));
            } catch (Exception e) {
                log.info("Failed to write new session key to config topic; forcing a read to the end of the config topic before possibly retrying");
                canReadConfigs = false;
                return;
            }
        }

        // Process any external requests
@@ -1173,7 +1179,11 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
     * @return true if successful, false if timed out
     */
    private boolean readConfigToEnd(long timeoutMs) {
        if (configState.offset() < assignment.offset()) {
            log.info("Current config state offset {} is behind group assignment {}, reading to end of config log", configState.offset(), assignment.offset());
        } else {
            log.info("Reading to end of config log; current config state offset: {}", configState.offset());
        }
        try {
            configBackingStore.refresh(timeoutMs, TimeUnit.MILLISECONDS);
            configState = configBackingStore.snapshot();
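
For context (not part of this diff), the canReadConfigs flag set above is consumed again at the start of the next tick, roughly along the lines of the sketch below; the exact placement and the workerSyncTimeoutMs timeout used here are simplifications, not code from this change.

    // Rough sketch of how the next tick consumes the flag (simplified; the placement and
    // the workerSyncTimeoutMs timeout shown here are assumptions, not part of this diff).
    if (!canReadConfigs) {
        if (readConfigToEnd(workerSyncTimeoutMs)) {
            canReadConfigs = true;  // caught up; if the key was actually written, it has now been read back
        } else {
            return;                 // still unable to catch up; try again on the next tick
        }
    }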

DistributedHerderTest.java

@@ -23,11 +23,13 @@ import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.connector.policy.NoneConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.errors.AlreadyExistsException;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.NotFoundException;
import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.MockConnectMetrics;
import org.apache.kafka.connect.runtime.SessionKey;
import org.apache.kafka.connect.runtime.SinkConnectorConfig;
import org.apache.kafka.connect.runtime.TargetState;
import org.apache.kafka.connect.runtime.TaskConfig;
@@ -67,6 +69,7 @@ import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.powermock.reflect.Whitebox;
import javax.crypto.SecretKey;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -84,8 +87,10 @@ import java.util.concurrent.TimeoutException;
import static java.util.Collections.singletonList;
import static javax.ws.rs.core.Response.Status.FORBIDDEN;
import static org.apache.kafka.connect.runtime.distributed.ConnectProtocol.CONNECT_PROTOCOL_V0;
import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.INTER_WORKER_KEY_GENERATION_ALGORITHM_DEFAULT;
import static org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V1;
import static org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V2;
import static org.easymock.EasyMock.anyObject;
import static org.easymock.EasyMock.capture;
import static org.easymock.EasyMock.newCapture;
import static org.junit.Assert.assertEquals;
@@ -2176,6 +2181,83 @@ public class DistributedHerderTest {
        PowerMock.verifyAll();
    }

    @Test
    public void testFailedToWriteSessionKey() throws Exception {
        // First tick -- after joining the group, we try to write a new
        // session key to the config topic, and fail
        EasyMock.expect(member.memberId()).andStubReturn("leader");
        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V2);
        expectRebalance(1, Collections.emptyList(), Collections.emptyList());
        expectPostRebalanceCatchup(SNAPSHOT);
        configBackingStore.putSessionKey(anyObject(SessionKey.class));
        EasyMock.expectLastCall().andThrow(new ConnectException("Oh no!"));

        // Second tick -- we read to the end of the config topic first,
        // then ensure we're still active in the group
        // then try a second time to write a new session key,
        // then finally begin polling for group activity
        expectPostRebalanceCatchup(SNAPSHOT);
        member.ensureActive();
        PowerMock.expectLastCall();
        configBackingStore.putSessionKey(anyObject(SessionKey.class));
        EasyMock.expectLastCall();
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        PowerMock.replayAll();

        herder.tick();
        herder.tick();

        PowerMock.verifyAll();
    }
    @Test
    public void testFailedToReadBackNewlyWrittenSessionKey() throws Exception {
        SecretKey secretKey = EasyMock.niceMock(SecretKey.class);
        EasyMock.expect(secretKey.getAlgorithm()).andReturn(INTER_WORKER_KEY_GENERATION_ALGORITHM_DEFAULT);
        EasyMock.expect(secretKey.getEncoded()).andReturn(new byte[32]);
        SessionKey sessionKey = new SessionKey(secretKey, time.milliseconds());
        ClusterConfigState snapshotWithSessionKey = new ClusterConfigState(1, sessionKey, Collections.singletonMap(CONN1, 3),
            Collections.singletonMap(CONN1, CONN1_CONFIG), Collections.singletonMap(CONN1, TargetState.STARTED),
            TASK_CONFIGS_MAP, Collections.emptySet());

        // First tick -- after joining the group, we try to write a new session key to
        // the config topic, and fail (in this case, we're trying to simulate that we've
        // actually written the key successfully, but haven't been able to read it back
        // from the config topic, so to the herder it looks the same as if it'd just failed
        // to write the key)
        EasyMock.expect(member.memberId()).andStubReturn("leader");
        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V2);
        expectRebalance(1, Collections.emptyList(), Collections.emptyList());
        expectPostRebalanceCatchup(SNAPSHOT);
        configBackingStore.putSessionKey(anyObject(SessionKey.class));
        EasyMock.expectLastCall().andThrow(new ConnectException("Oh no!"));

        // Second tick -- we read to the end of the config topic first, and pick up
        // the session key that we were able to write the last time,
        // then ensure we're still active in the group
        // then finally begin polling for group activity
        // Importantly, we do not try to write a new session key this time around
        configBackingStore.refresh(EasyMock.anyLong(), EasyMock.anyObject(TimeUnit.class));
        EasyMock.expectLastCall().andAnswer(() -> {
            configUpdateListener.onSessionKeyUpdate(sessionKey);
            return null;
        });
        EasyMock.expect(configBackingStore.snapshot()).andReturn(snapshotWithSessionKey);
        member.ensureActive();
        PowerMock.expectLastCall();
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        PowerMock.replayAll(secretKey);

        herder.tick();
        herder.tick();

        PowerMock.verifyAll();
    }
    @Test
    public void testKeyExceptionDetection() {
        assertFalse(herder.isPossibleExpiredKeyException(