KAFKA-12376: Apply atomic append to the log (#10253)

commit 96a2b7aac4
parent be1476869f
Author: José Armando García Sancio
Date:   2021-03-04 10:55:43 -08:00 (committed by GitHub)
15 changed files with 268 additions and 103 deletions

RaftManager.scala

@@ -92,6 +92,11 @@ trait RaftManager[T] {
     listener: RaftClient.Listener[T]
   ): Unit
 
+  def scheduleAtomicAppend(
+    epoch: Int,
+    records: Seq[T]
+  ): Option[Long]
+
   def scheduleAppend(
     epoch: Int,
     records: Seq[T]
@@ -157,16 +162,32 @@ class KafkaRaftManager[T](
     raftClient.register(listener)
   }
 
+  override def scheduleAtomicAppend(
+    epoch: Int,
+    records: Seq[T]
+  ): Option[Long] = {
+    append(epoch, records, true)
+  }
+
   override def scheduleAppend(
     epoch: Int,
     records: Seq[T]
   ): Option[Long] = {
-    val offset: java.lang.Long = raftClient.scheduleAppend(epoch, records.asJava)
-    if (offset == null) {
-      None
+    append(epoch, records, false)
+  }
+
+  private def append(
+    epoch: Int,
+    records: Seq[T],
+    isAtomic: Boolean
+  ): Option[Long] = {
+    val offset = if (isAtomic) {
+      raftClient.scheduleAtomicAppend(epoch, records.asJava)
     } else {
-      Some(Long.unbox(offset))
+      raftClient.scheduleAppend(epoch, records.asJava)
     }
+
+    Option(offset).map(Long.unbox)
   }
 
   override def handleRequest(
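
The Option[Long] above mirrors the RaftClient contract the rest of this patch relies on: scheduleAtomicAppend returns the offset of the last record in the batch, or null when a buffer cannot be allocated for the append. A minimal Java sketch of a caller honoring that contract; the nested Client interface is a stand-in for the relevant slice of org.apache.kafka.raft.RaftClient, and AtomicAppendExample/tryAppend are hypothetical names:

import java.util.List;

final class AtomicAppendExample<T> {
    // Stand-in for the relevant slice of org.apache.kafka.raft.RaftClient.
    interface Client<T> {
        Long scheduleAtomicAppend(int epoch, List<T> records);
    }

    private final Client<T> client;

    AtomicAppendExample(Client<T> client) {
        this.client = client;
    }

    // Returns the offset of the last record in the batch, or -1 when the
    // append could not be scheduled and the caller should back off and retry.
    long tryAppend(int epoch, List<T> records) {
        Long offset = client.scheduleAtomicAppend(epoch, records);
        if (offset == null) {
            // Nothing was appended. Do not split the batch on retry: the
            // records are only correct if they land in one atomic batch.
            return -1L;
        }
        // Either every record in the batch is appended, ending at `offset`,
        // or none are; the write counts as committed once that offset commits.
        return offset;
    }
}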

ClientQuotaControlManager.java

@@ -86,7 +86,7 @@ public class ClientQuotaControlManager {
             }
         });
 
-        return new ControllerResult<>(outputRecords, outputResults);
+        return ControllerResult.atomicOf(outputRecords, outputResults);
     }
 
     /**

ClusterControlManager.java

@@ -213,7 +213,7 @@ public class ClusterControlManager {
 
         List<ApiMessageAndVersion> records = new ArrayList<>();
         records.add(new ApiMessageAndVersion(record, (short) 0));
-        return new ControllerResult<>(records, new BrokerRegistrationReply(brokerEpoch));
+        return ControllerResult.of(records, new BrokerRegistrationReply(brokerEpoch));
     }
 
     public void replay(RegisterBrokerRecord record) {

ConfigurationControlManager.java

@@ -83,7 +83,7 @@ public class ConfigurationControlManager {
                 outputRecords,
                 outputResults);
         }
-        return new ControllerResult<>(outputRecords, outputResults);
+        return ControllerResult.atomicOf(outputRecords, outputResults);
     }
 
     private void incrementalAlterConfigResource(ConfigResource configResource,
@@ -171,7 +171,7 @@ public class ConfigurationControlManager {
                 outputRecords,
                 outputResults);
         }
-        return new ControllerResult<>(outputRecords, outputResults);
+        return ControllerResult.atomicOf(outputRecords, outputResults);
     }
 
     private void legacyAlterConfigResource(ConfigResource configResource,

ControllerResult.java

@@ -19,7 +19,7 @@ package org.apache.kafka.controller;
 
 import org.apache.kafka.metadata.ApiMessageAndVersion;
 
-import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 import java.util.stream.Collectors;
@@ -28,15 +28,13 @@ import java.util.stream.Collectors;
 class ControllerResult<T> {
     private final List<ApiMessageAndVersion> records;
     private final T response;
+    private final boolean isAtomic;
 
-    public ControllerResult(T response) {
-        this(new ArrayList<>(), response);
-    }
-
-    public ControllerResult(List<ApiMessageAndVersion> records, T response) {
+    protected ControllerResult(List<ApiMessageAndVersion> records, T response, boolean isAtomic) {
         Objects.requireNonNull(records);
         this.records = records;
         this.response = response;
+        this.isAtomic = isAtomic;
     }
 
     public List<ApiMessageAndVersion> records() {
@@ -47,6 +45,10 @@ class ControllerResult<T> {
         return response;
     }
 
+    public boolean isAtomic() {
+        return isAtomic;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (o == null || (!o.getClass().equals(getClass()))) {
@@ -54,22 +56,34 @@ class ControllerResult<T> {
         }
         ControllerResult other = (ControllerResult) o;
         return records.equals(other.records) &&
-            Objects.equals(response, other.response);
+            Objects.equals(response, other.response) &&
+            Objects.equals(isAtomic, other.isAtomic);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(records, response);
+        return Objects.hash(records, response, isAtomic);
     }
 
     @Override
     public String toString() {
-        return "ControllerResult(records=" + String.join(",",
-            records.stream().map(r -> r.toString()).collect(Collectors.toList())) +
-            ", response=" + response + ")";
+        return String.format(
+            "ControllerResult(records=%s, response=%s, isAtomic=%s)",
+            String.join(",", records.stream().map(ApiMessageAndVersion::toString).collect(Collectors.toList())),
+            response,
+            isAtomic
+        );
     }
 
     public ControllerResult<T> withoutRecords() {
-        return new ControllerResult<>(new ArrayList<>(), response);
+        return new ControllerResult<>(Collections.emptyList(), response, false);
     }
+
+    public static <T> ControllerResult<T> atomicOf(List<ApiMessageAndVersion> records, T response) {
+        return new ControllerResult<>(records, response, true);
+    }
+
+    public static <T> ControllerResult<T> of(List<ApiMessageAndVersion> records, T response) {
+        return new ControllerResult<>(records, response, false);
+    }
 }
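
The two factories above are the crux of the patch: atomicOf marks a result whose records must reach the log in one all-or-nothing batch, while of permits splitting across batches. A self-contained sketch of the same pattern with simplified types (the Result class and its String records are illustrative stand-ins, not the package-private class above):

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class Result<T> {
    final List<String> records;
    final T response;
    final boolean isAtomic;

    private Result(List<String> records, T response, boolean isAtomic) {
        this.records = records;
        this.response = response;
        this.isAtomic = isAtomic;
    }

    // Records that must become visible together, e.g. the ConfigRecords
    // produced by a single incrementalAlterConfigs call.
    static <T> Result<T> atomicOf(List<String> records, T response) {
        return new Result<>(records, response, true);
    }

    // Records with no all-or-nothing requirement, e.g. those produced by
    // a broker heartbeat.
    static <T> Result<T> of(List<String> records, T response) {
        return new Result<>(records, response, false);
    }

    public static void main(String[] args) {
        Result<String> configs = atomicOf(Arrays.asList("ConfigRecord(abc)", "ConfigRecord(def)"), "ok");
        Result<String> heartbeat = of(Collections.singletonList("FenceBrokerRecord"), "ok");
        System.out.println(configs.isAtomic);   // true
        System.out.println(heartbeat.isAtomic); // false
    }
}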

ControllerResultAndOffset.java

@@ -19,24 +19,15 @@ package org.apache.kafka.controller;
 
 import org.apache.kafka.metadata.ApiMessageAndVersion;
 
-import java.util.ArrayList;
-import java.util.List;
 import java.util.Objects;
 import java.util.stream.Collectors;
 
-class ControllerResultAndOffset<T> extends ControllerResult<T> {
+final class ControllerResultAndOffset<T> extends ControllerResult<T> {
     private final long offset;
 
-    public ControllerResultAndOffset(T response) {
-        super(new ArrayList<>(), response);
-        this.offset = -1;
-    }
-
-    public ControllerResultAndOffset(long offset,
-                                     List<ApiMessageAndVersion> records,
-                                     T response) {
-        super(records, response);
+    private ControllerResultAndOffset(long offset, ControllerResult<T> result) {
+        super(result.records(), result.response(), result.isAtomic());
         this.offset = offset;
     }
 
@@ -52,18 +43,27 @@ class ControllerResultAndOffset<T> extends ControllerResult<T> {
         ControllerResultAndOffset other = (ControllerResultAndOffset) o;
         return records().equals(other.records()) &&
             response().equals(other.response()) &&
+            isAtomic() == other.isAtomic() &&
             offset == other.offset;
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(records(), response(), offset);
+        return Objects.hash(records(), response(), isAtomic(), offset);
     }
 
     @Override
     public String toString() {
-        return "ControllerResultAndOffset(records=" + String.join(",",
-            records().stream().map(r -> r.toString()).collect(Collectors.toList())) +
-            ", response=" + response() + ", offset=" + offset + ")";
+        return String.format(
+            "ControllerResultAndOffset(records=%s, response=%s, isAtomic=%s, offset=%s)",
+            String.join(",", records().stream().map(ApiMessageAndVersion::toString).collect(Collectors.toList())),
+            response(),
+            isAtomic(),
+            offset
+        );
     }
+
+    public static <T> ControllerResultAndOffset<T> of(long offset, ControllerResult<T> result) {
+        return new ControllerResultAndOffset<>(offset, result);
+    }
 }

FeatureControlManager.java

@@ -69,7 +69,8 @@ public class FeatureControlManager {
             results.put(entry.getKey(), updateFeature(entry.getKey(), entry.getValue(),
                 downgradeables.contains(entry.getKey()), brokerFeatures, records));
         }
-        return new ControllerResult<>(records, results);
+
+        return ControllerResult.atomicOf(records, results);
     }
 
     private ApiError updateFeature(String featureName,

QuorumController.java

@@ -17,7 +17,6 @@
 
 package org.apache.kafka.controller;
 
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
@@ -265,7 +264,7 @@ public final class QuorumController implements Controller {
     class ControlEvent implements EventQueue.Event {
         private final String name;
         private final Runnable handler;
-        private long eventCreatedTimeNs = time.nanoseconds();
+        private final long eventCreatedTimeNs = time.nanoseconds();
         private Optional<Long> startProcessingTimeNs = Optional.empty();
 
         ControlEvent(String name, Runnable handler) {
@@ -307,7 +306,7 @@ public final class QuorumController implements Controller {
         private final String name;
         private final CompletableFuture<T> future;
         private final Supplier<T> handler;
-        private long eventCreatedTimeNs = time.nanoseconds();
+        private final long eventCreatedTimeNs = time.nanoseconds();
         private Optional<Long> startProcessingTimeNs = Optional.empty();
 
         ControllerReadEvent(String name, Supplier<T> handler) {
@@ -389,7 +388,7 @@ public final class QuorumController implements Controller {
         private final String name;
         private final CompletableFuture<T> future;
         private final ControllerWriteOperation<T> op;
-        private long eventCreatedTimeNs = time.nanoseconds();
+        private final long eventCreatedTimeNs = time.nanoseconds();
         private Optional<Long> startProcessingTimeNs = Optional.empty();
         private ControllerResultAndOffset<T> resultAndOffset;
 
@@ -423,8 +422,7 @@ public final class QuorumController implements Controller {
                 if (!maybeOffset.isPresent()) {
                     // If the purgatory is empty, there are no pending operations and no
                     // uncommitted state. We can return immediately.
-                    resultAndOffset = new ControllerResultAndOffset<>(-1,
-                        new ArrayList<>(), result.response());
+                    resultAndOffset = ControllerResultAndOffset.of(-1, result);
                     log.debug("Completing read-only operation {} immediately because " +
                         "the purgatory is empty.", this);
                     complete(null);
@@ -432,8 +430,7 @@ public final class QuorumController implements Controller {
                 }
                 // If there are operations in the purgatory, we want to wait for the latest
                 // one to complete before returning our result to the user.
-                resultAndOffset = new ControllerResultAndOffset<>(maybeOffset.get(),
-                    result.records(), result.response());
+                resultAndOffset = ControllerResultAndOffset.of(maybeOffset.get(), result);
                 log.debug("Read-only operation {} will be completed when the log " +
                     "reaches offset {}", this, resultAndOffset.offset());
             } else {
@@ -441,11 +438,15 @@ public final class QuorumController implements Controller {
                 // written before we can return our result to the user. Here, we hand off
                 // the batch of records to the metadata log manager. They will be written
                 // out asynchronously.
-                long offset = logManager.scheduleWrite(controllerEpoch, result.records());
+                final long offset;
+                if (result.isAtomic()) {
+                    offset = logManager.scheduleAtomicWrite(controllerEpoch, result.records());
+                } else {
+                    offset = logManager.scheduleWrite(controllerEpoch, result.records());
+                }
                 op.processBatchEndOffset(offset);
                 writeOffset = offset;
-                resultAndOffset = new ControllerResultAndOffset<>(offset,
-                    result.records(), result.response());
+                resultAndOffset = ControllerResultAndOffset.of(offset, result);
                 for (ApiMessageAndVersion message : result.records()) {
                     replay(message.message());
                 }

ReplicationControlManager.java

@@ -407,7 +407,7 @@ public class ReplicationControlManager {
             resultsPrefix = ", ";
         }
         log.info("createTopics result(s): {}", resultsBuilder.toString());
-        return new ControllerResult<>(records, data);
+        return ControllerResult.atomicOf(records, data);
     }
 
     private ApiError createTopic(CreatableTopic topic,
@@ -626,7 +626,7 @@ public class ReplicationControlManager {
                         setIsr(partitionData.newIsr()));
                 }
             }
-        return new ControllerResult<>(records, response);
+        return ControllerResult.of(records, response);
     }
 
     /**
@@ -780,7 +780,7 @@ public class ReplicationControlManager {
                     setErrorMessage(error.message()));
             }
         }
-        return new ControllerResult<>(records, response);
+        return ControllerResult.of(records, response);
     }
 
     static boolean electionIsUnclean(byte electionType) {
@@ -875,7 +875,7 @@ public class ReplicationControlManager {
             states.next().fenced(),
             states.next().inControlledShutdown(),
             states.next().shouldShutDown());
-        return new ControllerResult<>(records, reply);
+        return ControllerResult.of(records, reply);
     }
 
     int bestLeader(int[] replicas, int[] isr, boolean unclean) {
@@ -904,7 +904,7 @@ public class ReplicationControlManager {
         }
         List<ApiMessageAndVersion> records = new ArrayList<>();
         handleBrokerUnregistered(brokerId, registration.epoch(), records);
-        return new ControllerResult<>(records, null);
+        return ControllerResult.of(records, null);
     }
 
     ControllerResult<Void> maybeFenceStaleBrokers() {
@@ -916,6 +916,6 @@ public class ReplicationControlManager {
             handleBrokerFenced(brokerId, records);
             heartbeatManager.fence(brokerId);
         }
-        return new ControllerResult<>(records, null);
+        return ControllerResult.of(records, null);
     }
 }

LocalLogManager.java

@@ -328,8 +328,21 @@ public final class LocalLogManager implements MetaLogManager, AutoCloseable {
 
     @Override
     public long scheduleWrite(long epoch, List<ApiMessageAndVersion> batch) {
-        return shared.tryAppend(nodeId, leader.epoch(), new LocalRecordBatch(
-            batch.stream().map(r -> r.message()).collect(Collectors.toList())));
+        return scheduleAtomicWrite(epoch, batch);
+    }
+
+    @Override
+    public long scheduleAtomicWrite(long epoch, List<ApiMessageAndVersion> batch) {
+        return shared.tryAppend(
+            nodeId,
+            leader.epoch(),
+            new LocalRecordBatch(
+                batch
+                    .stream()
+                    .map(ApiMessageAndVersion::message)
+                    .collect(Collectors.toList())
+            )
+        );
     }
 
     @Override

MetaLogManager.java

@@ -50,13 +50,30 @@ public interface MetaLogManager {
      * offset before renouncing its leadership. The listener should determine this by
      * monitoring the committed offsets.
      *
-     * @param epoch The controller epoch.
-     * @param batch The batch of messages to write.
+     * @param epoch the controller epoch
+     * @param batch the batch of messages to write
      *
-     * @return The offset of the message.
+     * @return the offset of the last message in the batch
+     * @throws IllegalArgumentException if buffer allocation failed and the client should back off
      */
     long scheduleWrite(long epoch, List<ApiMessageAndVersion> batch);
 
+    /**
+     * Schedule an atomic write to the log.
+     *
+     * The write will be scheduled to happen at some time in the future. All of the messages in the
+     * batch will be appended atomically in one batch. The listener may regard the write as successful
+     * if and only if the MetaLogManager reaches the given offset before renouncing its leadership.
+     * The listener should determine this by monitoring the committed offsets.
+     *
+     * @param epoch the controller epoch
+     * @param batch the batch of messages to write
+     *
+     * @return the offset of the last message in the batch
+     * @throws IllegalArgumentException if buffer allocation failed and the client should back off
+     */
+    long scheduleAtomicWrite(long epoch, List<ApiMessageAndVersion> batch);
+
     /**
      * Renounce the leadership.
     *
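
The @throws contract implies a caller should treat IllegalArgumentException as "nothing was appended, back off and retry the whole batch". A minimal sketch of such a loop, assuming a stand-in MetaLog interface with String messages in place of ApiMessageAndVersion (names and backoff constants are illustrative, not from the patch):

import java.util.List;

final class AtomicWriteBackoff {
    // Stand-in for the slice of MetaLogManager used here.
    interface MetaLog {
        long scheduleAtomicWrite(long epoch, List<String> batch);
    }

    // Retries the whole batch after an allocation failure, per the Javadoc:
    // IllegalArgumentException means nothing was appended and the client
    // should back off.
    static long writeWithBackoff(MetaLog log, long epoch, List<String> batch)
            throws InterruptedException {
        long backoffMs = 10;
        while (true) {
            try {
                // On success: offset of the last message in the batch.
                return log.scheduleAtomicWrite(epoch, batch);
            } catch (IllegalArgumentException e) {
                // Never split the batch on retry; it must stay atomic.
                Thread.sleep(backoffMs);
                backoffMs = Math.min(backoffMs * 2, 1_000);
            }
        }
    }
}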

ConfigurationControlManagerTest.java

@@ -135,18 +135,42 @@ public class ConfigurationControlManagerTest {
         SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
         ConfigurationControlManager manager =
             new ConfigurationControlManager(new LogContext(), snapshotRegistry, CONFIGS);
-        assertEquals(new ControllerResult<Map<ConfigResource, ApiError>>(Collections.singletonList(
-            new ApiMessageAndVersion(new ConfigRecord().
-                setResourceType(TOPIC.id()).setResourceName("mytopic").
-                setName("abc").setValue("123"), (short) 0)),
-            toMap(entry(BROKER0, new ApiError(
-                Errors.INVALID_REQUEST, "A DELETE op was given with a non-null value.")),
-                entry(MYTOPIC, ApiError.NONE))),
-            manager.incrementalAlterConfigs(toMap(entry(BROKER0, toMap(
-                entry("foo.bar", entry(DELETE, "abc")),
-                entry("quux", entry(SET, "abc")))),
-                entry(MYTOPIC, toMap(
-                    entry("abc", entry(APPEND, "123")))))));
+        assertEquals(
+            ControllerResult.atomicOf(
+                Collections.singletonList(
+                    new ApiMessageAndVersion(
+                        new ConfigRecord()
+                            .setResourceType(TOPIC.id())
+                            .setResourceName("mytopic")
+                            .setName("abc")
+                            .setValue("123"),
+                        (short) 0
+                    )
+                ),
+                toMap(
+                    entry(
+                        BROKER0,
+                        new ApiError(
+                            Errors.INVALID_REQUEST,
+                            "A DELETE op was given with a non-null value."
+                        )
+                    ),
+                    entry(MYTOPIC, ApiError.NONE)
+                )
+            ),
+            manager.incrementalAlterConfigs(
+                toMap(
+                    entry(
+                        BROKER0,
+                        toMap(
+                            entry("foo.bar", entry(DELETE, "abc")),
+                            entry("quux", entry(SET, "abc"))
+                        )
+                    ),
+                    entry(MYTOPIC, toMap(entry("abc", entry(APPEND, "123"))))
+                )
+            )
+        );
     }
 
     @Test
@@ -184,20 +208,33 @@ public class ConfigurationControlManagerTest {
             new ApiMessageAndVersion(new ConfigRecord().
                 setResourceType(TOPIC.id()).setResourceName("mytopic").
                 setName("def").setValue("901"), (short) 0));
-        assertEquals(new ControllerResult<Map<ConfigResource, ApiError>>(
+        assertEquals(
+            ControllerResult.atomicOf(
                 expectedRecords1,
-                toMap(entry(MYTOPIC, ApiError.NONE))),
-            manager.legacyAlterConfigs(toMap(entry(MYTOPIC, toMap(
-                entry("abc", "456"), entry("def", "901"))))));
+                toMap(entry(MYTOPIC, ApiError.NONE))
+            ),
+            manager.legacyAlterConfigs(
+                toMap(entry(MYTOPIC, toMap(entry("abc", "456"), entry("def", "901"))))
+            )
+        );
         for (ApiMessageAndVersion message : expectedRecords1) {
             manager.replay((ConfigRecord) message.message());
         }
-        assertEquals(new ControllerResult<Map<ConfigResource, ApiError>>(Arrays.asList(
-            new ApiMessageAndVersion(new ConfigRecord().
-                setResourceType(TOPIC.id()).setResourceName("mytopic").
-                setName("abc").setValue(null), (short) 0)),
-            toMap(entry(MYTOPIC, ApiError.NONE))),
-            manager.legacyAlterConfigs(toMap(entry(MYTOPIC, toMap(
-                entry("def", "901"))))));
+        assertEquals(
+            ControllerResult.atomicOf(
+                Arrays.asList(
+                    new ApiMessageAndVersion(
+                        new ConfigRecord()
+                            .setResourceType(TOPIC.id())
+                            .setResourceName("mytopic")
+                            .setName("abc")
+                            .setValue(null),
+                        (short) 0
+                    )
+                ),
+                toMap(entry(MYTOPIC, ApiError.NONE))
+            ),
+            manager.legacyAlterConfigs(toMap(entry(MYTOPIC, toMap(entry("def", "901")))))
+        );
     }
 }

FeatureControlManagerTest.java

@@ -18,10 +18,8 @@
 package org.apache.kafka.controller;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 
 import org.apache.kafka.common.metadata.FeatureLevelRecord;
@@ -61,11 +59,11 @@ public class FeatureControlManagerTest {
             rangeMap("foo", 1, 2), snapshotRegistry);
         assertEquals(new FeatureMapAndEpoch(new FeatureMap(Collections.emptyMap()), -1),
             manager.finalizedFeatures(-1));
-        assertEquals(new ControllerResult<>(Collections.
+        assertEquals(ControllerResult.atomicOf(Collections.emptyList(), Collections.
             singletonMap("foo", new ApiError(Errors.INVALID_UPDATE_VERSION,
                 "The controller does not support the given feature range."))),
             manager.updateFeatures(rangeMap("foo", 1, 3),
-                new HashSet<>(Arrays.asList("foo")),
+                Collections.singleton("foo"),
                 Collections.emptyMap()));
 
         ControllerResult<Map<String, ApiError>> result = manager.updateFeatures(
             rangeMap("foo", 1, 2, "bar", 1, 1), Collections.emptySet(),
@@ -101,12 +99,24 @@ public class FeatureControlManagerTest {
         SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
         FeatureControlManager manager = new FeatureControlManager(
             rangeMap("foo", 1, 5, "bar", 1, 2), snapshotRegistry);
-        assertEquals(new ControllerResult<>(Collections.
-            singletonMap("foo", new ApiError(Errors.INVALID_UPDATE_VERSION,
-            "Broker 5 does not support the given feature range."))),
-            manager.updateFeatures(rangeMap("foo", 1, 3),
-            new HashSet<>(Arrays.asList("foo")),
-            Collections.singletonMap(5, rangeMap())));
+        assertEquals(
+            ControllerResult.atomicOf(
+                Collections.emptyList(),
+                Collections.singletonMap(
+                    "foo",
+                    new ApiError(
+                        Errors.INVALID_UPDATE_VERSION,
+                        "Broker 5 does not support the given feature range."
+                    )
+                )
+            ),
+            manager.updateFeatures(
+                rangeMap("foo", 1, 3),
+                Collections.singleton("foo"),
+                Collections.singletonMap(5, rangeMap())
+            )
+        );
 
         ControllerResult<Map<String, ApiError>> result = manager.updateFeatures(
             rangeMap("foo", 1, 3), Collections.emptySet(), Collections.emptyMap());
@@ -114,19 +124,31 @@ public class FeatureControlManagerTest {
         manager.replay((FeatureLevelRecord) result.records().get(0).message(), 3);
         snapshotRegistry.createSnapshot(3);
 
-        assertEquals(new ControllerResult<>(Collections.
+        assertEquals(ControllerResult.atomicOf(Collections.emptyList(), Collections.
            singletonMap("foo", new ApiError(Errors.INVALID_UPDATE_VERSION,
                 "Can't downgrade the maximum version of this feature without " +
                 "setting downgradable to true."))),
             manager.updateFeatures(rangeMap("foo", 1, 2),
                 Collections.emptySet(), Collections.emptyMap()));
 
-        assertEquals(new ControllerResult<>(
-            Collections.singletonList(new ApiMessageAndVersion(new FeatureLevelRecord().
-                setName("foo").setMinFeatureLevel((short) 1).setMaxFeatureLevel((short) 2),
-                (short) 0)),
-            Collections.singletonMap("foo", ApiError.NONE)),
-            manager.updateFeatures(rangeMap("foo", 1, 2),
-                new HashSet<>(Collections.singletonList("foo")), Collections.emptyMap()));
+        assertEquals(
+            ControllerResult.atomicOf(
+                Collections.singletonList(
+                    new ApiMessageAndVersion(
+                        new FeatureLevelRecord()
+                            .setName("foo")
+                            .setMinFeatureLevel((short) 1)
+                            .setMaxFeatureLevel((short) 2),
+                        (short) 0
+                    )
+                ),
+                Collections.singletonMap("foo", ApiError.NONE)
+            ),
+            manager.updateFeatures(
+                rangeMap("foo", 1, 2),
+                Collections.singleton("foo"),
+                Collections.emptyMap()
+            )
+        );
     }
 }

LocalLogManager.java

@@ -371,8 +371,21 @@ public final class LocalLogManager implements MetaLogManager, AutoCloseable {
 
     @Override
     public long scheduleWrite(long epoch, List<ApiMessageAndVersion> batch) {
-        return shared.tryAppend(nodeId, leader.epoch(), new LocalRecordBatch(
-            batch.stream().map(r -> r.message()).collect(Collectors.toList())));
+        return scheduleAtomicWrite(epoch, batch);
+    }
+
+    @Override
+    public long scheduleAtomicWrite(long epoch, List<ApiMessageAndVersion> batch) {
+        return shared.tryAppend(
+            nodeId,
+            leader.epoch(),
+            new LocalRecordBatch(
+                batch
+                    .stream()
+                    .map(ApiMessageAndVersion::message)
+                    .collect(Collectors.toList())
+            )
+        );
     }
 
     @Override

MetaLogRaftShim.java

@@ -52,9 +52,35 @@ public class MetaLogRaftShim implements MetaLogManager {
         client.register(new ListenerShim(listener));
     }
 
+    @Override
+    public long scheduleAtomicWrite(long epoch, List<ApiMessageAndVersion> batch) {
+        return write(epoch, batch, true);
+    }
+
     @Override
     public long scheduleWrite(long epoch, List<ApiMessageAndVersion> batch) {
-        return client.scheduleAppend((int) epoch, batch);
+        return write(epoch, batch, false);
+    }
+
+    private long write(long epoch, List<ApiMessageAndVersion> batch, boolean isAtomic) {
+        final Long result;
+        if (isAtomic) {
+            result = client.scheduleAtomicAppend((int) epoch, batch);
+        } else {
+            result = client.scheduleAppend((int) epoch, batch);
+        }
+
+        if (result == null) {
+            throw new IllegalArgumentException(
+                String.format(
+                    "Unable to allocate a buffer for the scheduled write operation: epoch %s, batch %s",
+                    epoch,
+                    batch
+                )
+            );
+        } else {
+            return result;
+        }
+    }
 
     @Override