KAFKA-8248; Ensure time updated before sending transactional request (#6613)

This patch fixes a bug in the sending of transactional requests. We need to call `KafkaClient.send` with an updated current time. Failing to do so can result in an `IllegalStateException`, which leaves the producer effectively dead: the in-flight correlation id has been set, but no request has actually been sent. To avoid the same problem in the future, we now update the in-flight correlation id only after sending the request.
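
In short, the corrected send path in `Sender.maybeSendTransactionalRequest` (condensed from the diff below, with explanatory comments added here) looks like this:

```java
if (nextRequestHandler.isRetry())
    time.sleep(nextRequestHandler.retryBackoffMs());

long currentTimeMs = time.milliseconds();   // re-read the clock after any backoff sleep
ClientRequest clientRequest = client.newClientRequest(
    targetNode.idString(), requestBuilder, currentTimeMs, true, requestTimeoutMs, nextRequestHandler);

client.send(clientRequest, currentTimeMs);  // send with an up-to-date timestamp
// Record the in-flight correlation id only after the send succeeded, so a failed send
// cannot leave the TransactionManager waiting on a request that was never sent.
transactionManager.setInFlightCorrelationId(clientRequest.correlationId());
```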

Reviewers: Matthias J. Sax <matthias@confluent.io>, Apurva Mehta <apurva@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
Jason Gustafson, 2019-05-02 09:29:22 -07:00 (committed by GitHub)
parent 093a22536f
commit c34330c548
8 changed files with 571 additions and 484 deletions

ClusterConnectionStates.java

@@ -89,8 +89,8 @@ final class ClusterConnectionStates {
public long connectionDelay(String id, long now) {
NodeConnectionState state = nodeState.get(id);
if (state == null) return 0;
long timeWaited = now - state.lastConnectAttemptMs;
if (state.state.isDisconnected()) {
long timeWaited = now - state.lastConnectAttemptMs;
return Math.max(state.reconnectBackoffMs - timeWaited, 0);
} else {
// When connecting or connected, we should be able to delay indefinitely since other events (connection or
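
The hunk above cuts off mid-comment; for reference, a sketch of the full `connectionDelay` method after this change (the `Long.MAX_VALUE` return and the tail of the comment are inferred, since the hunk does not show them):

```java
public long connectionDelay(String id, long now) {
    NodeConnectionState state = nodeState.get(id);
    if (state == null) return 0;

    if (state.state.isDisconnected()) {
        // Only when disconnected is the delay bounded by the reconnect backoff.
        long timeWaited = now - state.lastConnectAttemptMs;
        return Math.max(state.reconnectBackoffMs - timeWaited, 0);
    } else {
        // When connecting or connected, we should be able to delay indefinitely since other
        // events (connection established, data acked) will cause a wakeup once data can be sent.
        return Long.MAX_VALUE;
    }
}
```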

Sender.java

@@ -237,7 +237,7 @@ public class Sender implements Runnable {
// main loop, runs until close is called
while (running) {
try {
run(time.milliseconds());
runOnce();
} catch (Exception e) {
log.error("Uncaught error in kafka producer I/O thread: ", e);
}
@@ -250,7 +250,7 @@ public class Sender implements Runnable {
// wait until these are completed.
while (!forceClose && ((this.accumulator.hasUndrained() || this.client.inFlightRequestCount() > 0) || hasPendingTransactionalRequests())) {
try {
run(time.milliseconds());
runOnce();
} catch (Exception e) {
log.error("Uncaught error in kafka producer I/O thread: ", e);
}
@@ -263,7 +263,7 @@ public class Sender implements Runnable {
transactionManager.beginAbort();
}
try {
run(time.milliseconds());
runOnce();
} catch (Exception e) {
log.error("Uncaught error in kafka producer I/O thread: ", e);
}
@@ -291,14 +291,14 @@ public class Sender implements Runnable {
/**
* Run a single iteration of sending
*
* @param now The current POSIX time in milliseconds
*/
void run(long now) {
void runOnce() {
if (transactionManager != null) {
try {
if (transactionManager.shouldResetProducerStateAfterResolvingSequences())
// Check if the previous run expired batches which requires a reset of the producer state.
transactionManager.resetProducerId();
if (!transactionManager.isTransactional()) {
// this is an idempotent producer, so make sure we have a producer id
maybeWaitForProducerId();
@@ -306,9 +306,9 @@ public class Sender implements Runnable {
transactionManager.transitionToFatalError(
new KafkaException("The client hasn't received acknowledgment for " +
"some previously sent messages and can no longer retry them. It isn't safe to continue."));
} else if (transactionManager.hasInFlightTransactionalRequest() || maybeSendTransactionalRequest(now)) {
} else if (transactionManager.hasInFlightTransactionalRequest() || maybeSendTransactionalRequest()) {
// as long as there are outstanding transactional requests, we simply wait for them to return
client.poll(retryBackoffMs, now);
client.poll(retryBackoffMs, time.milliseconds());
return;
}
@@ -318,7 +318,7 @@ public class Sender implements Runnable {
RuntimeException lastError = transactionManager.lastError();
if (lastError != null)
maybeAbortBatches(lastError);
client.poll(retryBackoffMs, now);
client.poll(retryBackoffMs, time.milliseconds());
return;
} else if (transactionManager.hasAbortableError()) {
accumulator.abortUndrainedBatches(transactionManager.lastError());
@@ -330,8 +330,9 @@ public class Sender implements Runnable {
}
}
long pollTimeout = sendProducerData(now);
client.poll(pollTimeout, now);
long currentTimeMs = time.milliseconds();
long pollTimeout = sendProducerData(currentTimeMs);
client.poll(pollTimeout, currentTimeMs);
}
private long sendProducerData(long now) {
@@ -415,7 +416,7 @@ public class Sender implements Runnable {
return pollTimeout;
}
private boolean maybeSendTransactionalRequest(long now) {
private boolean maybeSendTransactionalRequest() {
if (transactionManager.isCompleting() && accumulator.hasIncomplete()) {
if (transactionManager.isAborting())
accumulator.abortUndrainedBatches(new KafkaException("Failing batch since transaction was aborted"));
@@ -452,11 +453,12 @@ public class Sender implements Runnable {
if (targetNode != null) {
if (nextRequestHandler.isRetry())
time.sleep(nextRequestHandler.retryBackoffMs());
long currentTimeMs = time.milliseconds();
ClientRequest clientRequest = client.newClientRequest(
targetNode.idString(), requestBuilder, now, true, requestTimeoutMs, nextRequestHandler);
transactionManager.setInFlightTransactionalRequestCorrelationId(clientRequest.correlationId());
targetNode.idString(), requestBuilder, currentTimeMs, true, requestTimeoutMs, nextRequestHandler);
log.debug("Sending transactional request {} to node {}", requestBuilder, targetNode);
client.send(clientRequest, now);
client.send(clientRequest, currentTimeMs);
transactionManager.setInFlightCorrelationId(clientRequest.correlationId());
return true;
}
} catch (IOException e) {

TransactionManager.java

@@ -709,7 +709,7 @@ public class TransactionManager {
lookupCoordinator(request.coordinatorType(), request.coordinatorKey());
}
void setInFlightTransactionalRequestCorrelationId(int correlationId) {
void setInFlightCorrelationId(int correlationId) {
inFlightRequestCorrelationId = correlationId;
}

MockClient.java

@@ -75,15 +75,7 @@ public class MockClient implements KafkaClient {
private int correlation;
private final Time time;
private final MockMetadataUpdater metadataUpdater;
private final Set<String> ready = new HashSet<>();
// Nodes awaiting reconnect backoff, will not be chosen by leastLoadedNode
private final TransientSet<Node> blackedOut;
// Nodes which will always fail to connect, but can be chosen by leastLoadedNode
private final TransientSet<Node> unreachable;
// Nodes which have a delay before ultimately succeeding to connect
private final TransientSet<Node> delayedReady;
private final Map<String, ConnectionState> connections = new HashMap<>();
private final Map<Node, Long> pendingAuthenticationErrors = new HashMap<>();
private final Map<Node, AuthenticationException> authenticationErrors = new HashMap<>();
// Use concurrent queue for requests so that requests may be queried from a different thread
@@ -103,36 +95,30 @@ public class MockClient implements KafkaClient {
public MockClient(Time time, MockMetadataUpdater metadataUpdater) {
this.time = time;
this.metadataUpdater = metadataUpdater;
this.blackedOut = new TransientSet<>(time);
this.unreachable = new TransientSet<>(time);
this.delayedReady = new TransientSet<>(time);
}
private ConnectionState connectionState(String idString) {
ConnectionState connectionState = connections.get(idString);
if (connectionState == null) {
connectionState = new ConnectionState();
connections.put(idString, connectionState);
}
return connectionState;
}
@Override
public boolean isReady(Node node, long now) {
return ready.contains(node.idString());
return connectionState(node.idString()).isReady(now);
}
@Override
public boolean ready(Node node, long now) {
if (blackedOut.contains(node, now))
return false;
if (unreachable.contains(node, now)) {
blackout(node, 100);
return false;
}
if (delayedReady.contains(node, now))
return false;
ready.add(node.idString());
return true;
return connectionState(node.idString()).ready(now);
}
@Override
public long connectionDelay(Node node, long now) {
return blackedOut.expirationDelayMs(node, now);
return connectionState(node.idString()).connectionDelay(now);
}
@Override
@@ -141,16 +127,20 @@ public class MockClient implements KafkaClient {
}
public void blackout(Node node, long durationMs) {
blackedOut.add(node, durationMs);
connectionState(node.idString()).backoff(time.milliseconds() + durationMs);
}
public void setUnreachable(Node node, long durationMs) {
disconnect(node.idString());
unreachable.add(node, durationMs);
connectionState(node.idString()).setUnreachable(time.milliseconds() + durationMs);
}
public void throttle(Node node, long durationMs) {
connectionState(node.idString()).throttle(time.milliseconds() + durationMs);
}
public void delayReady(Node node, long durationMs) {
delayedReady.add(node, durationMs);
connectionState(node.idString()).setReadyDelayed(time.milliseconds() + durationMs);
}
public void authenticationFailed(Node node, long blackoutMs) {
@@ -166,7 +156,7 @@ public class MockClient implements KafkaClient {
@Override
public boolean connectionFailed(Node node) {
return blackedOut.contains(node);
return connectionState(node.idString()).isBackingOff(time.milliseconds());
}
@Override
@@ -187,11 +177,14 @@ public class MockClient implements KafkaClient {
iter.remove();
}
}
ready.remove(node);
connectionState(node).disconnect();
}
@Override
public void send(ClientRequest request, long now) {
if (!connectionState(request.destination()).isReady(now))
throw new IllegalStateException("Cannot send " + request + " since the destination is not ready");
// Check if the request is directed to a node with a pending authentication error.
for (Iterator<Map.Entry<Node, Long>> authErrorIter =
pendingAuthenticationErrors.entrySet().iterator(); authErrorIter.hasNext(); ) {
@@ -437,9 +430,7 @@ public class MockClient implements KafkaClient {
}
public void reset() {
ready.clear();
blackedOut.clear();
unreachable.clear();
connections.clear();
requests.clear();
responses.clear();
futureResponses.clear();
@@ -499,7 +490,7 @@ public class MockClient implements KafkaClient {
@Override
public boolean hasReadyNodes(long now) {
return !ready.isEmpty();
return connections.values().stream().anyMatch(cxn -> cxn.isReady(now));
}
@Override
@@ -537,14 +528,14 @@ public class MockClient implements KafkaClient {
@Override
public void close(String node) {
ready.remove(node);
connections.remove(node);
}
@Override
public Node leastLoadedNode(long now) {
// Consistent with NetworkClient, we do not return nodes awaiting reconnect backoff
for (Node node : metadataUpdater.fetchNodes()) {
if (!blackedOut.contains(node, now))
if (!connectionState(node.idString()).isBackingOff(now))
return node;
}
return null;
@@ -580,45 +571,6 @@ public class MockClient implements KafkaClient {
}
}
private static class TransientSet<T> {
// The elements in the set mapped to their expiration timestamps
private final Map<T, Long> elements = new HashMap<>();
private final Time time;
private TransientSet(Time time) {
this.time = time;
}
boolean contains(T element) {
return contains(element, time.milliseconds());
}
boolean contains(T element, long now) {
return expirationDelayMs(element, now) > 0;
}
void add(T element, long durationMs) {
elements.put(element, time.milliseconds() + durationMs);
}
long expirationDelayMs(T element, long now) {
Long expirationTimeMs = elements.get(element);
if (expirationTimeMs == null) {
return 0;
} else if (now > expirationTimeMs) {
elements.remove(element);
return 0;
} else {
return expirationTimeMs - now;
}
}
void clear() {
elements.clear();
}
}
/**
* This is a dumbed down version of {@link MetadataUpdater} which is used to facilitate
* metadata tracking primarily in order to serve {@link KafkaClient#leastLoadedNode(long)}
@@ -692,4 +644,92 @@ public class MockClient implements KafkaClient {
}
}
private static class ConnectionState {
enum State { CONNECTING, CONNECTED, DISCONNECTED }
private long throttledUntilMs = 0L;
private long readyDelayedUntilMs = 0L;
private long backingOffUntilMs = 0L;
private long unreachableUntilMs = 0L;
private State state = State.DISCONNECTED;
void backoff(long untilMs) {
backingOffUntilMs = untilMs;
}
void throttle(long untilMs) {
throttledUntilMs = untilMs;
}
void setUnreachable(long untilMs) {
unreachableUntilMs = untilMs;
}
void setReadyDelayed(long untilMs) {
readyDelayedUntilMs = untilMs;
}
boolean isReady(long now) {
return state == State.CONNECTED && notThrottled(now);
}
boolean isReadyDelayed(long now) {
return now < readyDelayedUntilMs;
}
boolean notThrottled(long now) {
return now > throttledUntilMs;
}
boolean isBackingOff(long now) {
return now < backingOffUntilMs;
}
boolean isUnreachable(long now) {
return now < unreachableUntilMs;
}
void disconnect() {
state = State.DISCONNECTED;
}
long connectionDelay(long now) {
if (state != State.DISCONNECTED)
return Long.MAX_VALUE;
if (backingOffUntilMs > now)
return backingOffUntilMs - now;
return 0;
}
boolean ready(long now) {
switch (state) {
case CONNECTED:
return notThrottled(now);
case CONNECTING:
if (isReadyDelayed(now))
return false;
state = State.CONNECTED;
return ready(now);
case DISCONNECTED:
if (isBackingOff(now)) {
return false;
} else if (isUnreachable(now)) {
backingOffUntilMs = now + 100;
return false;
}
state = State.CONNECTING;
return ready(now);
default:
throw new IllegalArgumentException("Invalid state: " + state);
}
}
}
}
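
A hypothetical usage sketch (not part of the commit): the per-node `ConnectionState` above makes `MockClient.send` reject requests to nodes that were never readied, mirroring `NetworkClient`. The `MockClient(Time)` constructor and `MetadataRequest.Builder.allTopics()` are assumed here purely for illustration.

```java
import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.utils.MockTime;

public class MockClientReadinessSketch {
    public static void main(String[] args) {
        MockTime time = new MockTime();
        MockClient client = new MockClient(time);
        Node node = new Node(0, "localhost", 9092);

        ClientRequest request = client.newClientRequest(
                node.idString(), MetadataRequest.Builder.allTopics(), time.milliseconds(), true);
        try {
            client.send(request, time.milliseconds()); // node never readied
        } catch (IllegalStateException expected) {
            // the guard added in this patch fires here
        }

        client.ready(node, time.milliseconds());       // drives the mock connection to CONNECTED
        client.send(request, time.milliseconds());     // accepted once the node is ready
    }
}
```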

KafkaProducerTest.java

@@ -32,10 +32,12 @@ import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.message.InitProducerIdResponseData;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.network.Selectable;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.FindCoordinatorResponse;
import org.apache.kafka.common.requests.InitProducerIdResponse;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Serializer;
@@ -643,6 +645,42 @@ public class KafkaProducerTest {
}
}
@Test
public void testInitTransactionWhileThrottled() {
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "some.id");
configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 10000);
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
Time time = new MockTime(1);
MetadataResponse initialUpdateResponse = TestUtils.metadataUpdateWith(1, singletonMap("topic", 1));
ProducerMetadata metadata = newMetadata(0, Long.MAX_VALUE);
MockClient client = new MockClient(time, metadata);
client.updateMetadata(initialUpdateResponse);
Node node = metadata.fetch().nodes().get(0);
client.throttle(node, 5000);
client.prepareResponse(new FindCoordinatorResponse(Errors.NONE, host1));
client.prepareResponse(initProducerIdResponse(1L, (short) 5, Errors.NONE));
try (Producer<String, String> producer = new KafkaProducer<>(configs, new StringSerializer(),
new StringSerializer(), metadata, client, null, time)) {
producer.initTransactions();
}
}
private InitProducerIdResponse initProducerIdResponse(long producerId, short producerEpoch, Errors error) {
InitProducerIdResponseData responseData = new InitProducerIdResponseData()
.setErrorCode(error.code())
.setProducerEpoch(producerEpoch)
.setProducerId(producerId)
.setThrottleTimeMs(0);
return new InitProducerIdResponse(responseData);
}
@Test
public void testOnlyCanExecuteCloseAfterInitTransactionsTimeout() {
Map<String, Object> configs = new HashMap<>();

ReplicaFetcherMockBlockingSend.scala

@@ -16,13 +16,14 @@
*/
package kafka.server.epoch.util
import java.net.SocketTimeoutException
import java.util
import java.util.Collections
import kafka.cluster.BrokerEndPoint
import kafka.server.BlockingSend
import org.apache.kafka.clients.MockClient.MockMetadataUpdater
import org.apache.kafka.clients.{ClientRequest, ClientResponse, MockClient}
import org.apache.kafka.clients.{ClientRequest, ClientResponse, MockClient, NetworkClientUtils}
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record.Records
import org.apache.kafka.common.requests.AbstractRequest.Builder
@@ -38,7 +39,11 @@ import org.apache.kafka.common.{Node, TopicPartition}
* OFFSET_FOR_LEADER_EPOCH with different offsets in response, it should update offsets using
* setOffsetsForNextResponse
*/
class ReplicaFetcherMockBlockingSend(offsets: java.util.Map[TopicPartition, EpochEndOffset], destination: BrokerEndPoint, time: Time) extends BlockingSend {
class ReplicaFetcherMockBlockingSend(offsets: java.util.Map[TopicPartition, EpochEndOffset],
sourceBroker: BrokerEndPoint,
time: Time)
extends BlockingSend {
private val client = new MockClient(new SystemTime, new MockMetadataUpdater {
override def fetchNodes(): util.List[Node] = Collections.emptyList()
override def isUpdateNeeded: Boolean = false
@@ -49,6 +54,7 @@ class ReplicaFetcherMockBlockingSend(offsets: java.util.Map[TopicPartition, Epoc
var lastUsedOffsetForLeaderEpochVersion = -1
var callback: Option[() => Unit] = None
var currentOffsets: java.util.Map[TopicPartition, EpochEndOffset] = offsets
private val sourceNode = new Node(sourceBroker.id, sourceBroker.host, sourceBroker.port)
def setEpochRequestCallback(postEpochFunction: () => Unit){
callback = Some(postEpochFunction)
@@ -59,6 +65,8 @@ class ReplicaFetcherMockBlockingSend(offsets: java.util.Map[TopicPartition, Epoc
}
override def sendRequest(requestBuilder: Builder[_ <: AbstractRequest]): ClientResponse = {
if (!NetworkClientUtils.awaitReady(client, sourceNode, time, 500))
throw new SocketTimeoutException(s"Failed to connect within 500 ms")
//Send the request to the mock client
val clientRequest = request(requestBuilder)
@@ -82,13 +90,13 @@ class ReplicaFetcherMockBlockingSend(offsets: java.util.Map[TopicPartition, Epoc
}
//Use mock client to create the appropriate response object
client.respondFrom(response, new Node(destination.id, destination.host, destination.port))
client.respondFrom(response, sourceNode)
client.poll(30, time.milliseconds()).iterator().next()
}
private def request(requestBuilder: Builder[_ <: AbstractRequest]): ClientRequest = {
client.newClientRequest(
destination.id.toString,
sourceBroker.id.toString,
requestBuilder,
time.milliseconds(),
true)