mirror of https://github.com/apache/kafka.git
KAFKA-16969: Log error if config conficts with MV (#16366)
When broker configuration is incompatible with the current Metadata Version the Broker should log an error-level message but avoid shutting down. Reviewers: Luke Chen <showuon@gmail.com>
This commit is contained in:
parent
029bbb16e9
commit
ceab1fe658
|
@ -57,15 +57,13 @@ public class MetadataVersionConfigValidator implements MetadataPublisher {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("ThrowableNotThrown")
|
||||||
private void onMetadataVersionChanged(MetadataVersion metadataVersion) {
|
private void onMetadataVersionChanged(MetadataVersion metadataVersion) {
|
||||||
try {
|
try {
|
||||||
this.config.validateWithMetadataVersion(metadataVersion);
|
this.config.validateWithMetadataVersion(metadataVersion);
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
RuntimeException exception = this.faultHandler.handleFault(
|
this.faultHandler.handleFault(
|
||||||
"Broker configuration does not support the cluster MetadataVersion", t);
|
"Broker configuration does not support the cluster MetadataVersion", t);
|
||||||
if (exception != null) {
|
|
||||||
throw exception;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -27,6 +27,7 @@ import org.apache.kafka.server.common.MetadataVersion;
|
||||||
import org.apache.kafka.server.fault.FaultHandler;
|
import org.apache.kafka.server.fault.FaultHandler;
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
|
import static org.mockito.ArgumentMatchers.any;
|
||||||
import static org.mockito.ArgumentMatchers.eq;
|
import static org.mockito.ArgumentMatchers.eq;
|
||||||
import static org.mockito.BDDMockito.willAnswer;
|
import static org.mockito.BDDMockito.willAnswer;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
|
@ -85,6 +86,7 @@ public class MetadataVersionConfigValidatorTest {
|
||||||
KafkaConfig config = mock(KafkaConfig.class);
|
KafkaConfig config = mock(KafkaConfig.class);
|
||||||
FaultHandler faultHandler = mock(FaultHandler.class);
|
FaultHandler faultHandler = mock(FaultHandler.class);
|
||||||
|
|
||||||
|
when(faultHandler.handleFault(any(), any())).thenReturn(new RuntimeException("returned exception"));
|
||||||
when(config.brokerId()).thenReturn(8);
|
when(config.brokerId()).thenReturn(8);
|
||||||
willAnswer(invocation -> {
|
willAnswer(invocation -> {
|
||||||
throw exception;
|
throw exception;
|
||||||
|
|
|
@ -102,6 +102,12 @@
|
||||||
If READ is not authorized, checkpointing is limited to offsets mirrorred after the start of the task.
|
If READ is not authorized, checkpointing is limited to offsets mirrorred after the start of the task.
|
||||||
See <a href="https://issues.apache.org/jira/browse/KAFKA-15905">KAFKA-15905</a> for more details.
|
See <a href="https://issues.apache.org/jira/browse/KAFKA-15905">KAFKA-15905</a> for more details.
|
||||||
</li>
|
</li>
|
||||||
|
<li>
|
||||||
|
JBOD support in KRaft was introduced from Metadata Version (MV) 3.7-IV2.
|
||||||
|
Configuring Brokers with multiple log directories can lead to indefinite unavailability.
|
||||||
|
Brokers will now detect this situation and log an error.
|
||||||
|
See <a href="https://issues.apache.org/jira/browse/KAFKA-16606">KAFKA-16606</a> for more details.
|
||||||
|
</li>
|
||||||
</ul>
|
</ul>
|
||||||
|
|
||||||
<h5><a id="upgrade_370_notable" href="#upgrade_370_notable">Notable changes in 3.7.0</a></h5>
|
<h5><a id="upgrade_370_notable" href="#upgrade_370_notable">Notable changes in 3.7.0</a></h5>
|
||||||
|
|
Loading…
Reference in New Issue