mirror of https://github.com/apache/kafka.git
This reverts commit 0927049a61
.
Reviewers: Luke Chen <showuon@gmail.com>
This commit is contained in:
parent
84351efd51
commit
c13b49f2d1
|
@ -16,10 +16,10 @@
|
|||
{
|
||||
"type": "data",
|
||||
"name": "QuorumStateData",
|
||||
// Version 1 removes clusterId field.
|
||||
"validVersions": "0-1",
|
||||
"validVersions": "0",
|
||||
"flexibleVersions": "0+",
|
||||
"fields": [
|
||||
{"name": "ClusterId", "type": "string", "versions": "0+"},
|
||||
{"name": "LeaderId", "type": "int32", "versions": "0+", "default": "-1"},
|
||||
{"name": "LeaderEpoch", "type": "int32", "versions": "0+", "default": "-1"},
|
||||
{"name": "VotedId", "type": "int32", "versions": "0+", "default": "-1"},
|
||||
|
|
|
@ -16,13 +16,6 @@
|
|||
*/
|
||||
package org.apache.kafka.raft;
|
||||
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import java.io.BufferedWriter;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.OutputStreamWriter;
|
||||
import java.io.UncheckedIOException;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import org.apache.kafka.common.utils.Utils;
|
||||
import org.apache.kafka.test.TestUtils;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
|
@ -97,45 +90,6 @@ public class FileBasedStateStoreTest {
|
|||
assertFalse(stateFile.exists());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCompatibilityWithClusterId() throws IOException {
|
||||
final File stateFile = TestUtils.tempFile();
|
||||
stateStore = new FileBasedStateStore(stateFile);
|
||||
|
||||
// We initialized a state from the metadata log
|
||||
assertTrue(stateFile.exists());
|
||||
|
||||
final int epoch = 2;
|
||||
final int leaderId = 1;
|
||||
Set<Integer> voters = Utils.mkSet(leaderId);
|
||||
String jsonString = "{\"clusterId\":\"abc\",\"leaderId\":" + leaderId + ",\"leaderEpoch\":" + epoch + ",\"votedId\":-1,\"appliedOffset\":0,\"currentVoters\":[],\"data_version\":0}";
|
||||
writeToStateFile(stateFile, jsonString);
|
||||
|
||||
// verify that we can read the state file that contains the removed "cluserId" field.
|
||||
assertEquals(ElectionState.withElectedLeader(epoch, leaderId, voters), stateStore.readElectionState());
|
||||
|
||||
stateStore.clear();
|
||||
assertFalse(stateFile.exists());
|
||||
}
|
||||
|
||||
private void writeToStateFile(final File stateFile, String jsonString) {
|
||||
try (final FileOutputStream fileOutputStream = new FileOutputStream(stateFile);
|
||||
final BufferedWriter writer = new BufferedWriter(
|
||||
new OutputStreamWriter(fileOutputStream, StandardCharsets.UTF_8))) {
|
||||
ObjectMapper mapper = new ObjectMapper();
|
||||
JsonNode node = mapper.readTree(jsonString);
|
||||
|
||||
writer.write(node.toString());
|
||||
writer.flush();
|
||||
fileOutputStream.getFD().sync();
|
||||
|
||||
} catch (IOException e) {
|
||||
throw new UncheckedIOException(
|
||||
String.format("Error while writing to Quorum state file %s",
|
||||
stateFile.getAbsolutePath()), e);
|
||||
}
|
||||
}
|
||||
|
||||
@AfterEach
|
||||
public void cleanup() throws IOException {
|
||||
if (stateStore != null) {
|
||||
|
|
Loading…
Reference in New Issue