KAFKA-14273; Close file before atomic move (#14354)

In the Windows OS atomic move are not allowed if the file has another open handle. E.g

__cluster_metadata-0\quorum-state: The process cannot access the file because it is being used by another process
        at java.base/sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:92)
        at java.base/sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:103)
        at java.base/sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:403)
        at java.base/sun.nio.fs.WindowsFileSystemProvider.move(WindowsFileSystemProvider.java:293)
        at java.base/java.nio.file.Files.move(Files.java:1430)
        at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:949)
        at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:932)
        at org.apache.kafka.raft.FileBasedStateStore.writeElectionStateToFile(FileBasedStateStore.java:152)

This is fixed by first closing the temporary quorum-state file before attempting to move it.

Reviewers: Colin Patrick McCabe <cmccabe@apache.org>
Co-Authored-By: Renaldo Baur Filho <renaldobf@gmail.com>
This commit is contained in:
José Armando García Sancio 2023-09-07 16:17:03 -07:00 committed by GitHub
parent a2de7d32c8
commit 7b669e8806
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 35 additions and 12 deletions

View File

@ -64,6 +64,7 @@ public class FileBasedStateStore implements QuorumStateStore {
private final File stateFile;
static final String DATA_VERSION = "data_version";
static final short HIGHEST_SUPPORTED_VERSION = 0;
public FileBasedStateStore(final File stateFile) {
this.stateFile = stateFile;
@ -144,21 +145,27 @@ public class FileBasedStateStore implements QuorumStateStore {
log.trace("Writing tmp quorum state {}", temp.getAbsolutePath());
try (final FileOutputStream fileOutputStream = new FileOutputStream(temp);
final BufferedWriter writer = new BufferedWriter(
new OutputStreamWriter(fileOutputStream, StandardCharsets.UTF_8))) {
short version = state.highestSupportedVersion();
ObjectNode jsonState = (ObjectNode) QuorumStateDataJsonConverter.write(state, version);
jsonState.set(DATA_VERSION, new ShortNode(version));
writer.write(jsonState.toString());
writer.flush();
fileOutputStream.getFD().sync();
try {
try (final FileOutputStream fileOutputStream = new FileOutputStream(temp);
final BufferedWriter writer = new BufferedWriter(
new OutputStreamWriter(fileOutputStream, StandardCharsets.UTF_8)
)
) {
ObjectNode jsonState = (ObjectNode) QuorumStateDataJsonConverter.write(state, HIGHEST_SUPPORTED_VERSION);
jsonState.set(DATA_VERSION, new ShortNode(HIGHEST_SUPPORTED_VERSION));
writer.write(jsonState.toString());
writer.flush();
fileOutputStream.getFD().sync();
}
Utils.atomicMoveWithFallback(temp.toPath(), stateFile.toPath());
} catch (IOException e) {
throw new UncheckedIOException(
String.format("Error while writing the Quorum status from the file %s",
stateFile.getAbsolutePath()), e);
String.format(
"Error while writing the Quorum status from the file %s",
stateFile.getAbsolutePath()
),
e
);
} finally {
// cleanup the temp file when the write finishes (either success or fail).
deleteFileIfExists(temp);

View File

@ -19,7 +19,9 @@ package org.apache.kafka.raft;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.protocol.types.TaggedFields;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.raft.generated.QuorumStateData;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
@ -107,6 +109,20 @@ public class FileBasedStateStoreTest {
assertCantReadQuorumStateVersion(jsonString);
}
@Test
public void testSupportedVersion() {
// If the next few checks fail, please check that they are compatible with previous releases of KRaft
// Check that FileBasedStateStore supports the latest version
assertEquals(FileBasedStateStore.HIGHEST_SUPPORTED_VERSION, QuorumStateData.HIGHEST_SUPPORTED_VERSION);
// Check that the supported versions haven't changed
assertEquals(0, QuorumStateData.HIGHEST_SUPPORTED_VERSION);
assertEquals(0, QuorumStateData.LOWEST_SUPPORTED_VERSION);
// For the latest version check that the number of tagged fields hasn't changed
TaggedFields taggedFields = (TaggedFields) QuorumStateData.SCHEMA_0.get(6).def.type;
assertEquals(0, taggedFields.numFields());
}
public void assertCantReadQuorumStateVersion(String jsonString) throws IOException {
final File stateFile = TestUtils.tempFile();
stateStore = new FileBasedStateStore(stateFile);