mirror of https://github.com/apache/kafka.git
KAFKA-10533; KafkaRaftClient should flush log after appends (#9352)
This patch adds missing flush logic to `KafkaRaftClient`. The initial flushing behavior is simplistic. We guarantee that the leader will not replicate above the last flushed offset and we guarantee that the follower will not fetch data above its own flush point. More sophisticated flush behavior is proposed in KAFKA-10526. We have also extended the simulation test so that it covers flush behavior. When a node is shutdown, all unflushed data is lost. We were able to confirm that the monotonic high watermark invariant fails without the added `flush` calls. This patch also piggybacks a fix to the `TestRaftServer` implementation. The initial check-in contained a bug which caused `RequestChannel` to fail sending responses because the disabled APIs did not have metrics registered. As a result of this, it is impossible to elect leaders. Reviewers: Guozhang Wang <wangguoz@gmail.com>
This commit is contained in:
parent
1457cc6525
commit
a72f0c1eac
|
@ -60,11 +60,11 @@ object RequestChannel extends Logging {
|
|||
val sanitizedUser: String = Sanitizer.sanitize(principal.getName)
|
||||
}
|
||||
|
||||
class Metrics {
|
||||
class Metrics(allowDisabledApis: Boolean = false) {
|
||||
|
||||
private val metricsMap = mutable.Map[String, RequestMetrics]()
|
||||
|
||||
(ApiKeys.enabledApis.asScala.toSeq.map(_.name) ++
|
||||
(ApiKeys.values.toSeq.filter(_.isEnabled || allowDisabledApis).map(_.name) ++
|
||||
Seq(RequestMetrics.consumerFetchMetricName, RequestMetrics.followFetchMetricName)).foreach { name =>
|
||||
metricsMap.put(name, new RequestMetrics(name))
|
||||
}
|
||||
|
@ -311,10 +311,11 @@ object RequestChannel extends Logging {
|
|||
}
|
||||
|
||||
class RequestChannel(val queueSize: Int,
|
||||
val metricNamePrefix : String,
|
||||
time: Time) extends KafkaMetricsGroup {
|
||||
val metricNamePrefix: String,
|
||||
time: Time,
|
||||
allowDisabledApis: Boolean = false) extends KafkaMetricsGroup {
|
||||
import RequestChannel._
|
||||
val metrics = new RequestChannel.Metrics
|
||||
val metrics = new RequestChannel.Metrics(allowDisabledApis)
|
||||
private val requestQueue = new ArrayBlockingQueue[BaseRequest](queueSize)
|
||||
private val processors = new ConcurrentHashMap[Int, Processor]()
|
||||
val requestQueueSizeMetricName = metricNamePrefix.concat(RequestQueueSizeMetric)
|
||||
|
|
|
@ -78,7 +78,8 @@ import scala.util.control.ControlThrowable
|
|||
class SocketServer(val config: KafkaConfig,
|
||||
val metrics: Metrics,
|
||||
val time: Time,
|
||||
val credentialProvider: CredentialProvider)
|
||||
val credentialProvider: CredentialProvider,
|
||||
val allowDisabledApis: Boolean = false)
|
||||
extends Logging with KafkaMetricsGroup with BrokerReconfigurable {
|
||||
|
||||
private val maxQueuedRequests = config.queuedMaxRequests
|
||||
|
@ -94,12 +95,12 @@ class SocketServer(val config: KafkaConfig,
|
|||
// data-plane
|
||||
private val dataPlaneProcessors = new ConcurrentHashMap[Int, Processor]()
|
||||
private[network] val dataPlaneAcceptors = new ConcurrentHashMap[EndPoint, Acceptor]()
|
||||
val dataPlaneRequestChannel = new RequestChannel(maxQueuedRequests, DataPlaneMetricPrefix, time)
|
||||
val dataPlaneRequestChannel = new RequestChannel(maxQueuedRequests, DataPlaneMetricPrefix, time, allowDisabledApis)
|
||||
// control-plane
|
||||
private var controlPlaneProcessorOpt : Option[Processor] = None
|
||||
private[network] var controlPlaneAcceptorOpt : Option[Acceptor] = None
|
||||
val controlPlaneRequestChannelOpt: Option[RequestChannel] = config.controlPlaneListenerName.map(_ =>
|
||||
new RequestChannel(20, ControlPlaneMetricPrefix, time))
|
||||
new RequestChannel(20, ControlPlaneMetricPrefix, time, allowDisabledApis))
|
||||
|
||||
private var nextProcessorId = 0
|
||||
private var connectionQuotas: ConnectionQuotas = _
|
||||
|
@ -430,7 +431,8 @@ class SocketServer(val config: KafkaConfig,
|
|||
credentialProvider,
|
||||
memoryPool,
|
||||
logContext,
|
||||
isPrivilegedListener = isPrivilegedListener
|
||||
isPrivilegedListener = isPrivilegedListener,
|
||||
allowDisabledApis = allowDisabledApis
|
||||
)
|
||||
}
|
||||
|
||||
|
@ -752,7 +754,8 @@ private[kafka] class Processor(val id: Int,
|
|||
memoryPool: MemoryPool,
|
||||
logContext: LogContext,
|
||||
connectionQueueSize: Int = ConnectionQueueSize,
|
||||
isPrivilegedListener: Boolean = false) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
|
||||
isPrivilegedListener: Boolean = false,
|
||||
allowDisabledApis: Boolean = false) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
|
||||
|
||||
private object ConnectionId {
|
||||
def fromString(s: String): Option[ConnectionId] = s.split("-") match {
|
||||
|
@ -943,10 +946,11 @@ private[kafka] class Processor(val id: Int,
|
|||
|
||||
protected def parseRequestHeader(buffer: ByteBuffer): RequestHeader = {
|
||||
val header = RequestHeader.parse(buffer)
|
||||
if (!header.apiKey.isEnabled) {
|
||||
if (header.apiKey.isEnabled || allowDisabledApis) {
|
||||
header
|
||||
} else {
|
||||
throw new InvalidRequestException("Received request for disabled api key " + header.apiKey)
|
||||
}
|
||||
header
|
||||
}
|
||||
|
||||
private def processCompletedReceives(): Unit = {
|
||||
|
|
|
@ -27,9 +27,11 @@ import org.apache.kafka.raft.{LogAppendInfo, LogFetchInfo, LogOffsetMetadata, Is
|
|||
|
||||
import scala.compat.java8.OptionConverters._
|
||||
|
||||
class KafkaMetadataLog(log: Log,
|
||||
topicPartition: TopicPartition,
|
||||
maxFetchSizeInBytes: Int = 1024 * 1024) extends ReplicatedLog {
|
||||
class KafkaMetadataLog(
|
||||
log: Log,
|
||||
topicPartition: TopicPartition,
|
||||
maxFetchSizeInBytes: Int = 1024 * 1024
|
||||
) extends ReplicatedLog {
|
||||
|
||||
override def read(startOffset: Long, readIsolation: Isolation): LogFetchInfo = {
|
||||
val isolation = readIsolation match {
|
||||
|
@ -124,6 +126,14 @@ class KafkaMetadataLog(log: Log,
|
|||
}
|
||||
}
|
||||
|
||||
override def flush(): Unit = {
|
||||
log.flush()
|
||||
}
|
||||
|
||||
override def lastFlushedOffset(): Long = {
|
||||
log.recoveryPoint
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the topic partition associated with the log.
|
||||
*/
|
||||
|
|
|
@ -18,14 +18,13 @@
|
|||
package kafka.tools
|
||||
|
||||
import java.io.File
|
||||
import java.nio.ByteBuffer
|
||||
import java.nio.file.Files
|
||||
import java.util.concurrent.CountDownLatch
|
||||
import java.util.{Properties, Random}
|
||||
|
||||
import joptsimple.OptionParser
|
||||
import kafka.log.{Log, LogConfig, LogManager}
|
||||
import kafka.network.{ConnectionQuotas, Processor, RequestChannel, SocketServer}
|
||||
import kafka.network.SocketServer
|
||||
import kafka.raft.{KafkaFuturePurgatory, KafkaMetadataLog, KafkaNetworkChannel}
|
||||
import kafka.security.CredentialProvider
|
||||
import kafka.server.{BrokerTopicStats, KafkaConfig, KafkaRequestHandlerPool, KafkaServer, LogDirFailureChannel}
|
||||
|
@ -34,12 +33,9 @@ import kafka.utils.{CommandLineUtils, CoreUtils, Exit, KafkaScheduler, Logging,
|
|||
import org.apache.kafka.clients.{ApiVersions, ClientDnsLookup, ManualMetadataUpdater, NetworkClient}
|
||||
import org.apache.kafka.common.TopicPartition
|
||||
import org.apache.kafka.common.config.ConfigException
|
||||
import org.apache.kafka.common.memory.MemoryPool
|
||||
import org.apache.kafka.common.metrics.Metrics
|
||||
import org.apache.kafka.common.network.{ChannelBuilders, ListenerName, NetworkReceive, Selectable, Selector}
|
||||
import org.apache.kafka.common.requests.RequestHeader
|
||||
import org.apache.kafka.common.network.{ChannelBuilders, NetworkReceive, Selectable, Selector}
|
||||
import org.apache.kafka.common.security.JaasContext
|
||||
import org.apache.kafka.common.security.auth.SecurityProtocol
|
||||
import org.apache.kafka.common.security.scram.internals.ScramMechanism
|
||||
import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache
|
||||
import org.apache.kafka.common.utils.{LogContext, Time, Utils}
|
||||
|
@ -80,7 +76,7 @@ class TestRaftServer(val config: KafkaConfig) extends Logging {
|
|||
tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames)
|
||||
credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames, tokenCache)
|
||||
|
||||
socketServer = new RaftSocketServer(config, metrics, time, credentialProvider, logContext)
|
||||
socketServer = new SocketServer(config, metrics, time, credentialProvider, allowDisabledApis = true)
|
||||
socketServer.startup(startProcessingRequests = false)
|
||||
|
||||
val logDirName = Log.logDirName(partition)
|
||||
|
@ -303,50 +299,6 @@ class TestRaftServer(val config: KafkaConfig) extends Logging {
|
|||
|
||||
}
|
||||
|
||||
class RaftSocketServer(
|
||||
config: KafkaConfig,
|
||||
metrics: Metrics,
|
||||
time: Time,
|
||||
credentialProvider: CredentialProvider,
|
||||
logContext: LogContext
|
||||
) extends SocketServer(config, metrics, time, credentialProvider) {
|
||||
override def newProcessor(
|
||||
id: Int,
|
||||
requestChannel: RequestChannel,
|
||||
connectionQuotas: ConnectionQuotas,
|
||||
listenerName: ListenerName,
|
||||
securityProtocol: SecurityProtocol,
|
||||
memoryPool: MemoryPool,
|
||||
isPrivilegedListener: Boolean
|
||||
): Processor = {
|
||||
new Processor(id,
|
||||
time,
|
||||
config.socketRequestMaxBytes,
|
||||
requestChannel,
|
||||
connectionQuotas,
|
||||
config.connectionsMaxIdleMs,
|
||||
config.failedAuthenticationDelayMs,
|
||||
listenerName,
|
||||
securityProtocol,
|
||||
config,
|
||||
metrics,
|
||||
credentialProvider,
|
||||
memoryPool,
|
||||
logContext,
|
||||
isPrivilegedListener = isPrivilegedListener
|
||||
) {
|
||||
// We extend this API to skip the check for only enabled APIs. This
|
||||
// gets us access to Vote, BeginQuorumEpoch, etc. which are not usable
|
||||
// from the Kafka broker yet.
|
||||
override def parseRequestHeader(buffer: ByteBuffer): RequestHeader = {
|
||||
RequestHeader.parse(buffer)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
object TestRaftServer extends Logging {
|
||||
import kafka.utils.Implicits._
|
||||
|
||||
|
|
|
@ -91,19 +91,25 @@ public class FollowerState implements EpochState {
|
|||
fetchTimer.reset(timeoutMs);
|
||||
}
|
||||
|
||||
public void updateHighWatermark(OptionalLong highWatermark) {
|
||||
public boolean updateHighWatermark(OptionalLong highWatermark) {
|
||||
if (!highWatermark.isPresent() && this.highWatermark.isPresent())
|
||||
throw new IllegalArgumentException("Attempt to overwrite current high watermark " + this.highWatermark +
|
||||
" with unknown value");
|
||||
this.highWatermark.ifPresent(previousHighWatermark -> {
|
||||
|
||||
if (this.highWatermark.isPresent()) {
|
||||
long previousHighWatermark = this.highWatermark.getAsLong();
|
||||
long updatedHighWatermark = highWatermark.getAsLong();
|
||||
|
||||
if (updatedHighWatermark < 0)
|
||||
throw new IllegalArgumentException("Illegal negative high watermark update");
|
||||
if (previousHighWatermark > highWatermark.getAsLong())
|
||||
if (previousHighWatermark > updatedHighWatermark)
|
||||
throw new IllegalArgumentException("Non-monotonic update of high watermark attempted");
|
||||
});
|
||||
if (previousHighWatermark == updatedHighWatermark)
|
||||
return false;
|
||||
}
|
||||
|
||||
this.highWatermark = highWatermark;
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -193,8 +193,9 @@ public class KafkaRaftClient implements RaftClient {
|
|||
) {
|
||||
highWatermarkOpt.ifPresent(highWatermark -> {
|
||||
long newHighWatermark = Math.min(endOffset().offset, highWatermark);
|
||||
state.updateHighWatermark(OptionalLong.of(newHighWatermark));
|
||||
updateHighWatermark(state, currentTimeMs);
|
||||
if (state.updateHighWatermark(OptionalLong.of(newHighWatermark))) {
|
||||
updateHighWatermark(state, currentTimeMs);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -203,25 +204,15 @@ public class KafkaRaftClient implements RaftClient {
|
|||
long currentTimeMs
|
||||
) {
|
||||
final LogOffsetMetadata endOffsetMetadata = log.endOffset();
|
||||
|
||||
if (state.updateLocalState(currentTimeMs, endOffsetMetadata)) {
|
||||
updateHighWatermark(state, currentTimeMs);
|
||||
}
|
||||
|
||||
LogOffset endOffset = new LogOffset(endOffset().offset, Isolation.UNCOMMITTED);
|
||||
LogOffset endOffset = new LogOffset(endOffsetMetadata.offset, Isolation.UNCOMMITTED);
|
||||
fetchPurgatory.maybeComplete(endOffset, currentTimeMs);
|
||||
}
|
||||
|
||||
private void updateReplicaEndOffsetAndTimestamp(
|
||||
LeaderState state,
|
||||
int replicaId,
|
||||
LogOffsetMetadata endOffsetMetadata,
|
||||
long currentTimeMs
|
||||
) {
|
||||
if (state.updateReplicaState(replicaId, currentTimeMs, endOffsetMetadata)) {
|
||||
updateHighWatermark(state, currentTimeMs);
|
||||
}
|
||||
}
|
||||
|
||||
private void updateHighWatermark(
|
||||
EpochState state,
|
||||
long currentTimeMs
|
||||
|
@ -299,7 +290,13 @@ public class KafkaRaftClient implements RaftClient {
|
|||
MemoryRecords records = MemoryRecords.withLeaderChangeMessage(
|
||||
currentTimeMs, quorum.epoch(), leaderChangeMessage);
|
||||
|
||||
appendAsLeader(state, records, currentTimeMs);
|
||||
appendAsLeader(records);
|
||||
flushLeaderLog(state, currentTimeMs);
|
||||
}
|
||||
|
||||
private void flushLeaderLog(LeaderState state, long currentTimeMs) {
|
||||
log.flush();
|
||||
updateLeaderEndOffsetAndTimestamp(state, currentTimeMs);
|
||||
}
|
||||
|
||||
private boolean maybeTransitionToLeader(CandidateState state, long currentTimeMs) throws IOException {
|
||||
|
@ -898,7 +895,11 @@ public class KafkaRaftClient implements RaftClient {
|
|||
return buildFetchResponse(Errors.NONE, MemoryRecords.EMPTY, divergingEpoch, state.highWatermark());
|
||||
} else {
|
||||
LogFetchInfo info = log.read(fetchOffset, Isolation.UNCOMMITTED);
|
||||
updateReplicaEndOffsetAndTimestamp(state, replicaId, info.startOffsetMetadata, currentTimeMs);
|
||||
|
||||
if (state.updateReplicaState(replicaId, currentTimeMs, info.startOffsetMetadata)) {
|
||||
updateHighWatermark(state, currentTimeMs);
|
||||
}
|
||||
|
||||
return buildFetchResponse(Errors.NONE, info.records, Optional.empty(), state.highWatermark());
|
||||
}
|
||||
}
|
||||
|
@ -984,11 +985,7 @@ public class KafkaRaftClient implements RaftClient {
|
|||
} else {
|
||||
Records records = (Records) partitionResponse.recordSet();
|
||||
if (records.sizeInBytes() > 0) {
|
||||
LogAppendInfo info = log.appendAsFollower(records);
|
||||
OffsetAndEpoch endOffset = endOffset();
|
||||
kafkaRaftMetrics.updateFetchedRecords(info.lastOffset - info.firstOffset + 1);
|
||||
kafkaRaftMetrics.updateLogEnd(endOffset);
|
||||
logger.trace("Follower end offset updated to {} after append", endOffset);
|
||||
appendAsFollower(records);
|
||||
}
|
||||
OptionalLong highWatermark = partitionResponse.highWatermark() < 0 ?
|
||||
OptionalLong.empty() : OptionalLong.of(partitionResponse.highWatermark());
|
||||
|
@ -1002,14 +999,23 @@ public class KafkaRaftClient implements RaftClient {
|
|||
}
|
||||
}
|
||||
|
||||
private void appendAsFollower(
|
||||
Records records
|
||||
) {
|
||||
LogAppendInfo info = log.appendAsFollower(records);
|
||||
log.flush();
|
||||
|
||||
OffsetAndEpoch endOffset = endOffset();
|
||||
kafkaRaftMetrics.updateFetchedRecords(info.lastOffset - info.firstOffset + 1);
|
||||
kafkaRaftMetrics.updateLogEnd(endOffset);
|
||||
logger.trace("Follower end offset updated to {} after append", endOffset);
|
||||
}
|
||||
|
||||
private LogAppendInfo appendAsLeader(
|
||||
LeaderState state,
|
||||
Records records,
|
||||
long currentTimeMs
|
||||
Records records
|
||||
) {
|
||||
LogAppendInfo info = log.appendAsLeader(records, quorum.epoch());
|
||||
OffsetAndEpoch endOffset = endOffset();
|
||||
updateLeaderEndOffsetAndTimestamp(state, currentTimeMs);
|
||||
kafkaRaftMetrics.updateAppendRecords(info.lastOffset - info.firstOffset + 1);
|
||||
kafkaRaftMetrics.updateLogEnd(endOffset);
|
||||
logger.trace("Leader appended records at base offset {}, new end offset is {}", info.firstOffset, endOffset);
|
||||
|
@ -1439,8 +1445,7 @@ public class KafkaRaftClient implements RaftClient {
|
|||
|
||||
private long pollLeader(long currentTimeMs) {
|
||||
LeaderState state = quorum.leaderStateOrThrow();
|
||||
|
||||
pollPendingAppends(currentTimeMs);
|
||||
pollPendingAppends(state, currentTimeMs);
|
||||
|
||||
return maybeSendRequests(
|
||||
currentTimeMs,
|
||||
|
@ -1587,7 +1592,6 @@ public class KafkaRaftClient implements RaftClient {
|
|||
} else {
|
||||
long currentTimeMs = time.milliseconds();
|
||||
long pollTimeoutMs = pollCurrentState(currentTimeMs);
|
||||
|
||||
kafkaRaftMetrics.updatePollStart(currentTimeMs);
|
||||
List<RaftMessage> inboundMessages = channel.receive(pollTimeoutMs);
|
||||
|
||||
|
@ -1608,7 +1612,7 @@ public class KafkaRaftClient implements RaftClient {
|
|||
unwrittenAppends.clear();
|
||||
}
|
||||
|
||||
private void pollPendingAppends(long currentTimeMs) {
|
||||
private void pollPendingAppends(LeaderState state, long currentTimeMs) {
|
||||
int numAppends = 0;
|
||||
int maxNumAppends = unwrittenAppends.size();
|
||||
|
||||
|
@ -1622,10 +1626,11 @@ public class KafkaRaftClient implements RaftClient {
|
|||
unwrittenAppend.fail(new TimeoutException("Request timeout " + unwrittenAppend.requestTimeoutMs
|
||||
+ " expired before the records could be appended to the log"));
|
||||
} else {
|
||||
LeaderState leaderState = quorum.leaderStateOrThrow();
|
||||
int epoch = quorum.epoch();
|
||||
LogAppendInfo info = appendAsLeader(leaderState, unwrittenAppend.records, currentTimeMs);
|
||||
LogAppendInfo info = appendAsLeader(unwrittenAppend.records);
|
||||
OffsetAndEpoch offsetAndEpoch = new OffsetAndEpoch(info.lastOffset, epoch);
|
||||
long numRecords = info.lastOffset - info.firstOffset + 1;
|
||||
logger.debug("Completed write of {} records at {}", numRecords, offsetAndEpoch);
|
||||
|
||||
if (unwrittenAppend.ackMode == AckMode.LEADER) {
|
||||
unwrittenAppend.complete(offsetAndEpoch);
|
||||
|
@ -1641,13 +1646,11 @@ public class KafkaRaftClient implements RaftClient {
|
|||
unwrittenAppend.fail(exception);
|
||||
} else {
|
||||
long elapsedTime = Math.max(0, completionTimeMs - currentTimeMs);
|
||||
long numCommittedRecords = info.lastOffset - info.firstOffset + 1;
|
||||
double elapsedTimePerRecord = (double) elapsedTime / numCommittedRecords;
|
||||
double elapsedTimePerRecord = (double) elapsedTime / numRecords;
|
||||
kafkaRaftMetrics.updateCommitLatency(elapsedTimePerRecord, currentTimeMs);
|
||||
unwrittenAppend.complete(offsetAndEpoch);
|
||||
|
||||
logger.debug("Completed committing append with {} records at {}",
|
||||
numCommittedRecords, offsetAndEpoch);
|
||||
logger.debug("Completed commit of {} records at {}", numRecords, offsetAndEpoch);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
@ -1655,6 +1658,10 @@ public class KafkaRaftClient implements RaftClient {
|
|||
|
||||
numAppends++;
|
||||
}
|
||||
|
||||
if (numAppends > 0) {
|
||||
flushLeaderLog(state, currentTimeMs);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -17,7 +17,6 @@
|
|||
package org.apache.kafka.raft;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Comparator;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
|
@ -114,14 +113,6 @@ public class LeaderState implements EpochState {
|
|||
return false;
|
||||
}
|
||||
|
||||
private OptionalLong quorumMajorityFetchTimestamp() {
|
||||
// Find the latest timestamp which is fetched by a majority of replicas (the leader counts)
|
||||
ArrayList<ReplicaState> followersByDescendingFetchTimestamp = new ArrayList<>(this.voterReplicaStates.values());
|
||||
followersByDescendingFetchTimestamp.sort(FETCH_TIMESTAMP_COMPARATOR);
|
||||
int indexOfTimestamp = voterReplicaStates.size() / 2;
|
||||
return followersByDescendingFetchTimestamp.get(indexOfTimestamp).lastFetchTimestamp;
|
||||
}
|
||||
|
||||
/**
|
||||
* Update the local replica state.
|
||||
*
|
||||
|
@ -272,18 +263,6 @@ public class LeaderState implements EpochState {
|
|||
}
|
||||
}
|
||||
|
||||
private static final Comparator<ReplicaState> FETCH_TIMESTAMP_COMPARATOR = (state, that) -> {
|
||||
if (state.lastFetchTimestamp.equals(that.lastFetchTimestamp))
|
||||
return Integer.compare(state.nodeId, that.nodeId);
|
||||
else if (!state.lastFetchTimestamp.isPresent())
|
||||
return 1;
|
||||
else if (!that.lastFetchTimestamp.isPresent())
|
||||
return -1;
|
||||
else
|
||||
return Long.compare(that.lastFetchTimestamp.getAsLong(), state.lastFetchTimestamp.getAsLong());
|
||||
};
|
||||
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "Leader(" +
|
||||
|
|
|
@ -103,6 +103,16 @@ public interface ReplicatedLog extends Closeable {
|
|||
*/
|
||||
void updateHighWatermark(LogOffsetMetadata offsetMetadata);
|
||||
|
||||
/**
|
||||
* Flush the current log to disk.
|
||||
*/
|
||||
void flush();
|
||||
|
||||
/**
|
||||
* Get the last offset which has been flushed to disk.
|
||||
*/
|
||||
long lastFlushedOffset();
|
||||
|
||||
/**
|
||||
* Return the topic partition associated with the log.
|
||||
*/
|
||||
|
|
|
@ -215,7 +215,8 @@ public class KafkaRaftClientTest {
|
|||
long electionTimestamp = time.milliseconds();
|
||||
|
||||
// Leader change record appended
|
||||
assertEquals(1, log.endOffset().offset);
|
||||
assertEquals(1L, log.endOffset().offset);
|
||||
assertEquals(1L, log.lastFlushedOffset());
|
||||
|
||||
// Send BeginQuorumEpoch to voters
|
||||
client.poll();
|
||||
|
@ -396,6 +397,7 @@ public class KafkaRaftClientTest {
|
|||
client.append(MemoryRecords.withRecords(CompressionType.NONE, records), AckMode.LEADER, Integer.MAX_VALUE);
|
||||
client.poll();
|
||||
assertEquals(3L, log.endOffset().offset);
|
||||
assertEquals(3L, log.lastFlushedOffset());
|
||||
assertEquals(OptionalLong.of(1L), client.highWatermark());
|
||||
|
||||
validateLocalRead(client, new OffsetAndEpoch(1L, epoch), Isolation.COMMITTED, new SimpleRecord[0]);
|
||||
|
@ -1779,6 +1781,7 @@ public class KafkaRaftClientTest {
|
|||
|
||||
client.poll();
|
||||
assertEquals(2L, log.endOffset().offset);
|
||||
assertEquals(2L, log.lastFlushedOffset());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -43,10 +43,11 @@ import java.util.stream.Collectors;
|
|||
public class MockLog implements ReplicatedLog {
|
||||
private final List<EpochStartOffset> epochStartOffsets = new ArrayList<>();
|
||||
private final List<LogBatch> log = new ArrayList<>();
|
||||
private final TopicPartition topicPartition;
|
||||
|
||||
private UUID nextId = UUID.randomUUID();
|
||||
private LogOffsetMetadata highWatermark = new LogOffsetMetadata(0L, Optional.of(new MockOffsetMetadata(nextId)));
|
||||
private final TopicPartition topicPartition;
|
||||
private LogOffsetMetadata highWatermark = new LogOffsetMetadata(0L, Optional.empty());
|
||||
private long lastFlushedOffset = 0L;
|
||||
|
||||
public MockLog(TopicPartition topicPartition) {
|
||||
this.topicPartition = topicPartition;
|
||||
|
@ -240,6 +241,25 @@ public class MockLog implements ReplicatedLog {
|
|||
return new LogAppendInfo(baseOffset, lastOffset);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void flush() {
|
||||
lastFlushedOffset = endOffset().offset;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long lastFlushedOffset() {
|
||||
return lastFlushedOffset;
|
||||
}
|
||||
|
||||
/**
|
||||
* Reopening the log causes all unflushed data to be lost.
|
||||
*/
|
||||
public void reopen() {
|
||||
log.removeIf(batch -> batch.firstOffset() >= lastFlushedOffset);
|
||||
epochStartOffsets.removeIf(epochStartOffset -> epochStartOffset.startOffset >= lastFlushedOffset);
|
||||
highWatermark = new LogOffsetMetadata(0L, Optional.empty());
|
||||
}
|
||||
|
||||
public List<LogBatch> readBatches(long startOffset, OptionalLong maxOffsetOpt) {
|
||||
verifyOffsetInRange(startOffset);
|
||||
|
||||
|
@ -436,10 +456,4 @@ public class MockLog implements ReplicatedLog {
|
|||
}
|
||||
}
|
||||
|
||||
public void clear() {
|
||||
epochStartOffsets.clear();
|
||||
log.clear();
|
||||
highWatermark = new LogOffsetMetadata(0L);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -382,6 +382,20 @@ public class MockLogTest {
|
|||
assertEquals(Optional.of(new OffsetAndEpoch(5L, 3)), log.endOffsetForEpoch(3));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUnflushedRecordsLostAfterReopen() {
|
||||
appendBatch(5, 1);
|
||||
appendBatch(10, 2);
|
||||
log.flush();
|
||||
|
||||
appendBatch(5, 3);
|
||||
appendBatch(10, 4);
|
||||
log.reopen();
|
||||
|
||||
assertEquals(15L, log.endOffset().offset);
|
||||
assertEquals(2, log.lastFetchedEpoch());
|
||||
}
|
||||
|
||||
private Optional<OffsetRange> readOffsets(long startOffset, Isolation isolation) {
|
||||
Records records = log.read(startOffset, isolation).records;
|
||||
long firstReadOffset = -1L;
|
||||
|
|
|
@ -199,6 +199,56 @@ public class RaftEventSimulationTest {
|
|||
}
|
||||
|
||||
scheduler.runUntil(() -> cluster.allReachedHighWatermark(20));
|
||||
long highWatermark = cluster.maxHighWatermarkReached();
|
||||
|
||||
// Restart the node and verify it catches up
|
||||
cluster.start(leaderId);
|
||||
scheduler.runUntil(() -> cluster.allReachedHighWatermark(highWatermark + 10));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRecoveryAfterAllNodesFailQuorumSizeThree() {
|
||||
testRecoveryAfterAllNodesFail(new QuorumConfig(3));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRecoveryAfterAllNodesFailQuorumSizeFour() {
|
||||
testRecoveryAfterAllNodesFail(new QuorumConfig(4));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRecoveryAfterAllNodesFailQuorumSizeFive() {
|
||||
testRecoveryAfterAllNodesFail(new QuorumConfig(5));
|
||||
}
|
||||
|
||||
private void testRecoveryAfterAllNodesFail(QuorumConfig config) {
|
||||
for (int seed = 0; seed < 100; seed++) {
|
||||
Cluster cluster = new Cluster(config, seed);
|
||||
MessageRouter router = new MessageRouter(cluster);
|
||||
EventScheduler scheduler = schedulerWithDefaultInvariants(cluster);
|
||||
|
||||
// Seed the cluster with some data
|
||||
cluster.startAll();
|
||||
schedulePolling(scheduler, cluster, 3, 5);
|
||||
scheduler.schedule(router::deliverAll, 0, 2, 1);
|
||||
scheduler.schedule(new SequentialAppendAction(cluster), 0, 2, 3);
|
||||
scheduler.runUntil(cluster::hasConsistentLeader);
|
||||
scheduler.runUntil(() -> cluster.anyReachedHighWatermark(10));
|
||||
long highWatermark = cluster.maxHighWatermarkReached();
|
||||
|
||||
// We kill all of the nodes. Then we bring back a majority and verify that
|
||||
// they are able to elect a leader and continue making progress
|
||||
|
||||
cluster.killAll();
|
||||
|
||||
Iterator<Integer> nodeIdsIterator = cluster.nodes().iterator();
|
||||
for (int i = 0; i < cluster.majoritySize(); i++) {
|
||||
Integer nodeId = nodeIdsIterator.next();
|
||||
cluster.start(nodeId);
|
||||
}
|
||||
|
||||
scheduler.runUntil(() -> cluster.allReachedHighWatermark(highWatermark + 10));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -627,6 +677,10 @@ public class RaftEventSimulationTest {
|
|||
return true;
|
||||
}
|
||||
|
||||
void killAll() {
|
||||
running.clear();
|
||||
}
|
||||
|
||||
void kill(int nodeId) {
|
||||
running.remove(nodeId);
|
||||
}
|
||||
|
@ -702,6 +756,8 @@ public class RaftEventSimulationTest {
|
|||
this::nodeAddress
|
||||
));
|
||||
|
||||
persistentState.log.reopen();
|
||||
|
||||
KafkaRaftClient client = new KafkaRaftClient(channel, persistentState.log, quorum, time, metrics,
|
||||
fetchPurgatory, appendPurgatory, voterConnectionMap, ELECTION_JITTER_MS,
|
||||
RETRY_BACKOFF_MS, REQUEST_TIMEOUT_MS, FETCH_MAX_WAIT_MS, logContext, random);
|
||||
|
|
Loading…
Reference in New Issue