mirror of https://github.com/apache/kafka.git
KAFKA-14371: Remove unused clusterId field from quorum-state file (#13102)
Remove clusterId field from the KRaft controller's quorum-state file $LOG_DIR/__cluster_metadata-0/quorum-state Reviewers: Luke Chen <showuon@gmail.com>, dengziming <dengziming1993@gmail.com>, Christo Lolov <christololov@gmail.com>
This commit is contained in:
parent
f7f376f6c1
commit
0927049a61
|
@ -16,10 +16,10 @@
|
||||||
{
|
{
|
||||||
"type": "data",
|
"type": "data",
|
||||||
"name": "QuorumStateData",
|
"name": "QuorumStateData",
|
||||||
"validVersions": "0",
|
// Version 1 removes clusterId field.
|
||||||
|
"validVersions": "0-1",
|
||||||
"flexibleVersions": "0+",
|
"flexibleVersions": "0+",
|
||||||
"fields": [
|
"fields": [
|
||||||
{"name": "ClusterId", "type": "string", "versions": "0+"},
|
|
||||||
{"name": "LeaderId", "type": "int32", "versions": "0+", "default": "-1"},
|
{"name": "LeaderId", "type": "int32", "versions": "0+", "default": "-1"},
|
||||||
{"name": "LeaderEpoch", "type": "int32", "versions": "0+", "default": "-1"},
|
{"name": "LeaderEpoch", "type": "int32", "versions": "0+", "default": "-1"},
|
||||||
{"name": "VotedId", "type": "int32", "versions": "0+", "default": "-1"},
|
{"name": "VotedId", "type": "int32", "versions": "0+", "default": "-1"},
|
||||||
|
|
|
@ -16,6 +16,13 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.kafka.raft;
|
package org.apache.kafka.raft;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.JsonNode;
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
import java.io.BufferedWriter;
|
||||||
|
import java.io.FileOutputStream;
|
||||||
|
import java.io.OutputStreamWriter;
|
||||||
|
import java.io.UncheckedIOException;
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
import org.apache.kafka.common.utils.Utils;
|
import org.apache.kafka.common.utils.Utils;
|
||||||
import org.apache.kafka.test.TestUtils;
|
import org.apache.kafka.test.TestUtils;
|
||||||
import org.junit.jupiter.api.AfterEach;
|
import org.junit.jupiter.api.AfterEach;
|
||||||
|
@ -90,6 +97,45 @@ public class FileBasedStateStoreTest {
|
||||||
assertFalse(stateFile.exists());
|
assertFalse(stateFile.exists());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCompatibilityWithClusterId() throws IOException {
|
||||||
|
final File stateFile = TestUtils.tempFile();
|
||||||
|
stateStore = new FileBasedStateStore(stateFile);
|
||||||
|
|
||||||
|
// We initialized a state from the metadata log
|
||||||
|
assertTrue(stateFile.exists());
|
||||||
|
|
||||||
|
final int epoch = 2;
|
||||||
|
final int leaderId = 1;
|
||||||
|
Set<Integer> voters = Utils.mkSet(leaderId);
|
||||||
|
String jsonString = "{\"clusterId\":\"abc\",\"leaderId\":" + leaderId + ",\"leaderEpoch\":" + epoch + ",\"votedId\":-1,\"appliedOffset\":0,\"currentVoters\":[],\"data_version\":0}";
|
||||||
|
writeToStateFile(stateFile, jsonString);
|
||||||
|
|
||||||
|
// verify that we can read the state file that contains the removed "cluserId" field.
|
||||||
|
assertEquals(ElectionState.withElectedLeader(epoch, leaderId, voters), stateStore.readElectionState());
|
||||||
|
|
||||||
|
stateStore.clear();
|
||||||
|
assertFalse(stateFile.exists());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void writeToStateFile(final File stateFile, String jsonString) {
|
||||||
|
try (final FileOutputStream fileOutputStream = new FileOutputStream(stateFile);
|
||||||
|
final BufferedWriter writer = new BufferedWriter(
|
||||||
|
new OutputStreamWriter(fileOutputStream, StandardCharsets.UTF_8))) {
|
||||||
|
ObjectMapper mapper = new ObjectMapper();
|
||||||
|
JsonNode node = mapper.readTree(jsonString);
|
||||||
|
|
||||||
|
writer.write(node.toString());
|
||||||
|
writer.flush();
|
||||||
|
fileOutputStream.getFD().sync();
|
||||||
|
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new UncheckedIOException(
|
||||||
|
String.format("Error while writing to Quorum state file %s",
|
||||||
|
stateFile.getAbsolutePath()), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@AfterEach
|
@AfterEach
|
||||||
public void cleanup() throws IOException {
|
public void cleanup() throws IOException {
|
||||||
if (stateStore != null) {
|
if (stateStore != null) {
|
||||||
|
|
Loading…
Reference in New Issue