diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index dc674ab997a..ab6ebff7433 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -194,6 +194,8 @@
+
+
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
index 38da78efb1b..4ae02c5145c 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
@@ -136,7 +136,7 @@ public abstract class AbstractRequest implements AbstractRequestResponse {
}
@Override
- public final String toString() {
+ public String toString() {
return toString(true);
}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AlterUserScramCredentialsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AlterUserScramCredentialsRequest.java
index 1ca7ea77aa4..c779f6d9a84 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AlterUserScramCredentialsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AlterUserScramCredentialsRequest.java
@@ -17,10 +17,14 @@
package org.apache.kafka.common.requests;
import org.apache.kafka.common.message.AlterUserScramCredentialsRequestData;
+import org.apache.kafka.common.message.AlterUserScramCredentialsRequestDataJsonConverter;
import org.apache.kafka.common.message.AlterUserScramCredentialsResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Set;
@@ -82,4 +86,16 @@ public class AlterUserScramCredentialsRequest extends AbstractRequest {
.collect(Collectors.toList());
return new AlterUserScramCredentialsResponse(new AlterUserScramCredentialsResponseData().setResults(results));
}
+
+ // Do not print salt or saltedPassword
+ @Override
+ public String toString() {
+ JsonNode json = AlterUserScramCredentialsRequestDataJsonConverter.write(data, version()).deepCopy();
+
+ for (JsonNode upsertion : json.get("upsertions")) {
+ ((ObjectNode) upsertion).put("salt", "");
+ ((ObjectNode) upsertion).put("saltedPassword", "");
+ }
+ return AlterUserScramCredentialsRequestDataJsonConverter.read(json, version()).toString();
+ }
}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/IncrementalAlterConfigsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/IncrementalAlterConfigsRequest.java
index 222097502b2..54540837756 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/IncrementalAlterConfigsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/IncrementalAlterConfigsRequest.java
@@ -21,11 +21,15 @@ import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData;
import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterConfigsResource;
+import org.apache.kafka.common.message.IncrementalAlterConfigsRequestDataJsonConverter;
import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData;
import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.AlterConfigsResourceResponse;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Map;
@@ -107,4 +111,16 @@ public class IncrementalAlterConfigsRequest extends AbstractRequest {
}
return new IncrementalAlterConfigsResponse(response);
}
+
+ // It is not safe to print all config values
+ @Override
+ public String toString() {
+ JsonNode json = IncrementalAlterConfigsRequestDataJsonConverter.write(data, version()).deepCopy();
+ for (JsonNode resource : json.get("resources")) {
+ for (JsonNode config : resource.get("configs")) {
+ ((ObjectNode) config).put("value", "REDACTED");
+ }
+ }
+ return IncrementalAlterConfigsRequestDataJsonConverter.read(json, version()).toString();
+ }
}
diff --git a/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala b/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala
index 1ab0f0ae8e8..ecea412e989 100644
--- a/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala
+++ b/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala
@@ -131,11 +131,21 @@ class RequestChannelTest {
op: OpType,
entries: Map[String, String],
expectedValues: Map[String, String]): Unit = {
- val alterConfigs = request(incrementalAlterConfigs(resource, entries, op))
- val loggableAlterConfigs = alterConfigs.loggableRequest.asInstanceOf[IncrementalAlterConfigsRequest]
+ val alterConfigs = incrementalAlterConfigs(resource, entries, op)
+ val alterConfigsString = alterConfigs.toString
+ entries.foreach { entry =>
+ if (!alterConfigsString.contains(entry._1)) {
+ fail("Config names should be in the request string")
+ }
+ if (entry._2 != null && alterConfigsString.contains(entry._2)) {
+ fail("Config values should not be in the request string")
+ }
+ }
+ val req = request(alterConfigs)
+ val loggableAlterConfigs = req.loggableRequest.asInstanceOf[IncrementalAlterConfigsRequest]
val loggedConfig = loggableAlterConfigs.data.resources.find(resource.`type`.id, resource.name).configs
assertEquals(expectedValues, toMap(loggedConfig))
- val alterConfigsDesc = RequestConvertToJson.requestDesc(alterConfigs.header, alterConfigs.requestLog.toJava, alterConfigs.isForwarded).toString
+ val alterConfigsDesc = RequestConvertToJson.requestDesc(req.header, req.requestLog.toJava, req.isForwarded).toString
assertFalse(alterConfigsDesc.contains(sensitiveValue), s"Sensitive config logged $alterConfigsDesc")
}
diff --git a/core/src/test/scala/unit/kafka/server/AlterUserScramCredentialsRequestTest.scala b/core/src/test/scala/unit/kafka/server/AlterUserScramCredentialsRequestTest.scala
index ced78873510..4ebe65ec9a9 100644
--- a/core/src/test/scala/unit/kafka/server/AlterUserScramCredentialsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AlterUserScramCredentialsRequestTest.scala
@@ -244,6 +244,9 @@ class AlterUserScramCredentialsRequestTest extends BaseRequestTest {
// create a bunch of credentials
val request1_0 = new AlterUserScramCredentialsRequest.Builder(
new AlterUserScramCredentialsRequestData()
+ .setDeletions(util.Arrays.asList(
+ new AlterUserScramCredentialsRequestData.ScramCredentialDeletion()
+ .setName(user2).setMechanism(ScramMechanism.SCRAM_SHA_256.`type`)))
.setUpsertions(util.Arrays.asList(
new AlterUserScramCredentialsRequestData.ScramCredentialUpsertion()
.setName(user1).setMechanism(ScramMechanism.SCRAM_SHA_256.`type`)
@@ -251,10 +254,15 @@ class AlterUserScramCredentialsRequestTest extends BaseRequestTest {
.setSalt(saltBytes)
.setSaltedPassword(saltedPasswordBytes),
))).build()
+ assertEquals("AlterUserScramCredentialsRequestData(" +
+ "deletions=[ScramCredentialDeletion(name='" + user2 + "', mechanism=" + ScramMechanism.SCRAM_SHA_256.`type` + ")], " +
+ "upsertions=[ScramCredentialUpsertion(name='" + user1 + "', mechanism=" + ScramMechanism.SCRAM_SHA_256.`type` +
+ ", iterations=4096, salt=[], saltedPassword=[])])", request1_0.toString)
val results1_0 = sendAlterUserScramCredentialsRequest(request1_0).data.results
- assertEquals(1, results1_0.size)
- checkNoErrorsAlteringCredentials(results1_0)
+ assertEquals(2, results1_0.size)
+ assertEquals(1, results1_0.asScala.count(_.errorCode == Errors.RESOURCE_NOT_FOUND.code()))
checkUserAppearsInAlterResults(results1_0, user1)
+ checkUserAppearsInAlterResults(results1_0, user2)
// When creating credentials, do not update the same user more than once per request
val request1_1 = new AlterUserScramCredentialsRequest.Builder(
@@ -276,6 +284,8 @@ class AlterUserScramCredentialsRequestTest extends BaseRequestTest {
.setSalt(saltBytes)
.setSaltedPassword(saltedPasswordBytes),
))).build()
+ assertFalse(request1_1.toString.contains(saltBytes))
+ assertFalse(request1_1.toString.contains(saltedPasswordBytes))
val results1_1 = sendAlterUserScramCredentialsRequest(request1_1).data.results
assertEquals(3, results1_1.size)
checkNoErrorsAlteringCredentials(results1_1)
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 20a030714f7..bb1f4105ba8 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -9710,7 +9710,12 @@ class KafkaApisTest extends Logging {
@Test
def testEmptyIncrementalAlterConfigsRequestWithKRaft(): Unit = {
- val request = buildRequest(new IncrementalAlterConfigsRequest(new IncrementalAlterConfigsRequestData(), 1.toShort))
+ val alterConfigsRequest = new IncrementalAlterConfigsRequest(new IncrementalAlterConfigsRequestData(), 1.toShort)
+ assertEquals(
+ "IncrementalAlterConfigsRequestData(resources=[], validateOnly=false)",
+ alterConfigsRequest.toString
+ )
+ val request = buildRequest(alterConfigsRequest)
metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
any[Long])).thenReturn(0)
@@ -9722,7 +9727,7 @@ class KafkaApisTest extends Logging {
@Test
def testLog4jIncrementalAlterConfigsRequestWithKRaft(): Unit = {
- val request = buildRequest(new IncrementalAlterConfigsRequest(new IncrementalAlterConfigsRequestData().
+ val alterConfigsRequest = new IncrementalAlterConfigsRequest(new IncrementalAlterConfigsRequestData().
setValidateOnly(true).
setResources(new IAlterConfigsResourceCollection(asList(new IAlterConfigsResource().
setResourceName(brokerId.toString).
@@ -9730,7 +9735,16 @@ class KafkaApisTest extends Logging {
setConfigs(new IAlterableConfigCollection(asList(new IAlterableConfig().
setName(Log4jController.ROOT_LOGGER).
setValue("TRACE")).iterator()))).iterator())),
- 1.toShort))
+ 1.toShort)
+ assertEquals(
+ "IncrementalAlterConfigsRequestData(resources=[" +
+ "AlterConfigsResource(resourceType=" + BROKER_LOGGER.id() + ", " +
+ "resourceName='"+ brokerId + "', " +
+ "configs=[AlterableConfig(name='" + Log4jController.ROOT_LOGGER + "', configOperation=0, value='REDACTED')])], " +
+ "validateOnly=true)",
+ alterConfigsRequest.toString
+ )
+ val request = buildRequest(alterConfigsRequest)
metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
any[Long])).thenReturn(0)