mirror of https://github.com/apache/kafka.git
				
				
				
			Reviewers: Colin P. McCabe <cmccabe@apache.org>
```
Conflicts:
    clients/src/main/java/org/apache/kafka/common/requests/AlterUserScramCredentialsRequest.java - import statement 
    clients/src/main/java/org/apache/kafka/common/requests/IncrementalAlterConfigsRequest.java - import statement
    core/src/test/scala/unit/kafka/network/RequestChannelTest.scala - difference in unrelated parameter
    core/src/test/scala/unit/kafka/server/KafkaApisTest.scala - different logging and metadatacache instantiation
```
Cherry-Picked-From: 042be5b9ac
Cherry-Picked-By: Alyssa Huang <ahuang@confluent.io>
Cherry-Picked-At: Mon May 12 13:48:31 2025 -0700
			
			
This commit is contained in:
		
							parent
							
								
									ab1a4053ad
								
							
						
					
					
						commit
						2e8ae0ed45
					
				|  | @ -192,6 +192,8 @@ | |||
|       <allow pkg="io.opentelemetry.proto"/> | ||||
|       <!-- for testing --> | ||||
|       <allow pkg="org.apache.kafka.common.telemetry" /> | ||||
|       <!-- for IncrementalAlterConfigsRequest and AlterUserScramCredentialsRequest --> | ||||
|       <allow pkg="com.fasterxml.jackson.databind" /> | ||||
|     </subpackage> | ||||
| 
 | ||||
|     <subpackage name="serialization"> | ||||
|  |  | |||
|  | @ -136,7 +136,7 @@ public abstract class AbstractRequest implements AbstractRequestResponse { | |||
|     } | ||||
| 
 | ||||
|     @Override | ||||
|     public final String toString() { | ||||
|     public String toString() { | ||||
|         return toString(true); | ||||
|     } | ||||
| 
 | ||||
|  |  | |||
|  | @ -17,10 +17,14 @@ | |||
| package org.apache.kafka.common.requests; | ||||
| 
 | ||||
| import org.apache.kafka.common.message.AlterUserScramCredentialsRequestData; | ||||
| import org.apache.kafka.common.message.AlterUserScramCredentialsRequestDataJsonConverter; | ||||
| import org.apache.kafka.common.message.AlterUserScramCredentialsResponseData; | ||||
| import org.apache.kafka.common.protocol.ApiKeys; | ||||
| import org.apache.kafka.common.protocol.ByteBufferAccessor; | ||||
| 
 | ||||
| import com.fasterxml.jackson.databind.JsonNode; | ||||
| import com.fasterxml.jackson.databind.node.ObjectNode; | ||||
| 
 | ||||
| import java.nio.ByteBuffer; | ||||
| import java.util.List; | ||||
| import java.util.Set; | ||||
|  | @ -82,4 +86,16 @@ public class AlterUserScramCredentialsRequest extends AbstractRequest { | |||
|                         .collect(Collectors.toList()); | ||||
|         return new AlterUserScramCredentialsResponse(new AlterUserScramCredentialsResponseData().setResults(results)); | ||||
|     } | ||||
| 
 | ||||
|     // Do not print salt or saltedPassword | ||||
|     @Override | ||||
|     public String toString() { | ||||
|         JsonNode json = AlterUserScramCredentialsRequestDataJsonConverter.write(data, version()).deepCopy(); | ||||
| 
 | ||||
|         for (JsonNode upsertion : json.get("upsertions")) { | ||||
|             ((ObjectNode) upsertion).put("salt", ""); | ||||
|             ((ObjectNode) upsertion).put("saltedPassword", ""); | ||||
|         } | ||||
|         return AlterUserScramCredentialsRequestDataJsonConverter.read(json, version()).toString(); | ||||
|     } | ||||
| } | ||||
|  |  | |||
|  | @ -21,11 +21,15 @@ import org.apache.kafka.clients.admin.AlterConfigOp; | |||
| import org.apache.kafka.common.config.ConfigResource; | ||||
| import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData; | ||||
| import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterConfigsResource; | ||||
| import org.apache.kafka.common.message.IncrementalAlterConfigsRequestDataJsonConverter; | ||||
| import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData; | ||||
| import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.AlterConfigsResourceResponse; | ||||
| import org.apache.kafka.common.protocol.ApiKeys; | ||||
| import org.apache.kafka.common.protocol.ByteBufferAccessor; | ||||
| 
 | ||||
| import com.fasterxml.jackson.databind.JsonNode; | ||||
| import com.fasterxml.jackson.databind.node.ObjectNode; | ||||
| 
 | ||||
| import java.nio.ByteBuffer; | ||||
| import java.util.Collection; | ||||
| import java.util.Map; | ||||
|  | @ -107,4 +111,16 @@ public class IncrementalAlterConfigsRequest extends AbstractRequest { | |||
|         } | ||||
|         return new IncrementalAlterConfigsResponse(response); | ||||
|     } | ||||
| 
 | ||||
|     // It is not safe to print all config values | ||||
|     @Override | ||||
|     public String toString() { | ||||
|         JsonNode json = IncrementalAlterConfigsRequestDataJsonConverter.write(data, version()).deepCopy(); | ||||
|         for (JsonNode resource : json.get("resources")) { | ||||
|             for (JsonNode config : resource.get("configs")) { | ||||
|                 ((ObjectNode) config).put("value", "REDACTED"); | ||||
|             } | ||||
|         } | ||||
|         return IncrementalAlterConfigsRequestDataJsonConverter.read(json, version()).toString(); | ||||
|     } | ||||
| } | ||||
|  |  | |||
|  | @ -128,11 +128,21 @@ class RequestChannelTest { | |||
|                      op: OpType, | ||||
|                      entries: Map[String, String], | ||||
|                      expectedValues: Map[String, String]): Unit = { | ||||
|       val alterConfigs = request(incrementalAlterConfigs(resource, entries, op)) | ||||
|       val loggableAlterConfigs = alterConfigs.loggableRequest.asInstanceOf[IncrementalAlterConfigsRequest] | ||||
|       val alterConfigs = incrementalAlterConfigs(resource, entries, op) | ||||
|       val alterConfigsString = alterConfigs.toString | ||||
|       entries.foreach { entry => | ||||
|         if (!alterConfigsString.contains(entry._1)) { | ||||
|           fail("Config names should be in the request string") | ||||
|         } | ||||
|         if (entry._2 != null && alterConfigsString.contains(entry._2)) { | ||||
|           fail("Config values should not be in the request string") | ||||
|         } | ||||
|       } | ||||
|       val req = request(alterConfigs) | ||||
|       val loggableAlterConfigs = req.loggableRequest.asInstanceOf[IncrementalAlterConfigsRequest] | ||||
|       val loggedConfig = loggableAlterConfigs.data.resources.find(resource.`type`.id, resource.name).configs | ||||
|       assertEquals(expectedValues, toMap(loggedConfig)) | ||||
|       val alterConfigsDesc = RequestConvertToJson.requestDesc(alterConfigs.header, alterConfigs.requestLog, alterConfigs.isForwarded).toString | ||||
|       val alterConfigsDesc = RequestConvertToJson.requestDesc(req.header, req.requestLog, req.isForwarded).toString | ||||
|       assertFalse(alterConfigsDesc.contains(sensitiveValue), s"Sensitive config logged $alterConfigsDesc") | ||||
|     } | ||||
| 
 | ||||
|  |  | |||
|  | @ -255,6 +255,9 @@ class AlterUserScramCredentialsRequestTest extends BaseRequestTest { | |||
|     // create a bunch of credentials | ||||
|     val request1_0 = new AlterUserScramCredentialsRequest.Builder( | ||||
|       new AlterUserScramCredentialsRequestData() | ||||
|         .setDeletions(util.Arrays.asList( | ||||
|           new AlterUserScramCredentialsRequestData.ScramCredentialDeletion() | ||||
|             .setName(user2).setMechanism(ScramMechanism.SCRAM_SHA_256.`type`))) | ||||
|         .setUpsertions(util.Arrays.asList( | ||||
|           new AlterUserScramCredentialsRequestData.ScramCredentialUpsertion() | ||||
|             .setName(user1).setMechanism(ScramMechanism.SCRAM_SHA_256.`type`) | ||||
|  | @ -262,10 +265,15 @@ class AlterUserScramCredentialsRequestTest extends BaseRequestTest { | |||
|             .setSalt(saltBytes) | ||||
|             .setSaltedPassword(saltedPasswordBytes), | ||||
|         ))).build() | ||||
|     assertEquals("AlterUserScramCredentialsRequestData(" + | ||||
|       "deletions=[ScramCredentialDeletion(name='" + user2 + "', mechanism=" + ScramMechanism.SCRAM_SHA_256.`type` + ")], " + | ||||
|       "upsertions=[ScramCredentialUpsertion(name='" + user1 + "', mechanism=" + ScramMechanism.SCRAM_SHA_256.`type` + | ||||
|       ", iterations=4096, salt=[], saltedPassword=[])])", request1_0.toString) | ||||
|     val results1_0 = sendAlterUserScramCredentialsRequest(request1_0).data.results | ||||
|     assertEquals(1, results1_0.size) | ||||
|     checkNoErrorsAlteringCredentials(results1_0) | ||||
|     assertEquals(2, results1_0.size) | ||||
|     assertEquals(1, results1_0.asScala.count(_.errorCode == Errors.RESOURCE_NOT_FOUND.code())) | ||||
|     checkUserAppearsInAlterResults(results1_0, user1) | ||||
|     checkUserAppearsInAlterResults(results1_0, user2) | ||||
| 
 | ||||
|     // When creating credentials, do not update the same user more than once per request | ||||
|     val request1_1 = new AlterUserScramCredentialsRequest.Builder( | ||||
|  | @ -287,6 +295,8 @@ class AlterUserScramCredentialsRequestTest extends BaseRequestTest { | |||
|             .setSalt(saltBytes) | ||||
|             .setSaltedPassword(saltedPasswordBytes), | ||||
|         ))).build() | ||||
|     assertFalse(request1_1.toString.contains(saltBytes)) | ||||
|     assertFalse(request1_1.toString.contains(saltedPasswordBytes)) | ||||
|     val results1_1 = sendAlterUserScramCredentialsRequest(request1_1).data.results | ||||
|     assertEquals(3, results1_1.size) | ||||
|     checkNoErrorsAlteringCredentials(results1_1) | ||||
|  |  | |||
|  | @ -6908,7 +6908,12 @@ class KafkaApisTest extends Logging { | |||
| 
 | ||||
|   @Test | ||||
|   def testEmptyIncrementalAlterConfigsRequestWithKRaft(): Unit = { | ||||
|     val request = buildRequest(new IncrementalAlterConfigsRequest(new IncrementalAlterConfigsRequestData(), 1.toShort)) | ||||
|     val alterConfigsRequest = new IncrementalAlterConfigsRequest(new IncrementalAlterConfigsRequestData(), 1.toShort) | ||||
|     assertEquals( | ||||
|       "IncrementalAlterConfigsRequestData(resources=[], validateOnly=false)", | ||||
|       alterConfigsRequest.toString | ||||
|     ) | ||||
|     val request = buildRequest(alterConfigsRequest) | ||||
|     metadataCache = MetadataCache.kRaftMetadataCache(brokerId) | ||||
|     when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](), | ||||
|       any[Long])).thenReturn(0) | ||||
|  | @ -6920,7 +6925,7 @@ class KafkaApisTest extends Logging { | |||
| 
 | ||||
|   @Test | ||||
|   def testLog4jIncrementalAlterConfigsRequestWithKRaft(): Unit = { | ||||
|     val request = buildRequest(new IncrementalAlterConfigsRequest(new IncrementalAlterConfigsRequestData(). | ||||
|     val alterConfigsRequest = new IncrementalAlterConfigsRequest(new IncrementalAlterConfigsRequestData(). | ||||
|       setValidateOnly(true). | ||||
|       setResources(new IAlterConfigsResourceCollection(asList(new IAlterConfigsResource(). | ||||
|         setResourceName(brokerId.toString). | ||||
|  | @ -6928,7 +6933,16 @@ class KafkaApisTest extends Logging { | |||
|         setConfigs(new IAlterableConfigCollection(asList(new IAlterableConfig(). | ||||
|           setName(Log4jController.ROOT_LOGGER). | ||||
|           setValue("TRACE")).iterator()))).iterator())), | ||||
|         1.toShort)) | ||||
|       1.toShort) | ||||
|     assertEquals( | ||||
|       "IncrementalAlterConfigsRequestData(resources=[" + | ||||
|         "AlterConfigsResource(resourceType=" + BROKER_LOGGER.id() + ", " + | ||||
|         "resourceName='"+ brokerId + "', " + | ||||
|         "configs=[AlterableConfig(name='" + Log4jController.ROOT_LOGGER + "', configOperation=0, value='REDACTED')])], " + | ||||
|         "validateOnly=true)", | ||||
|       alterConfigsRequest.toString | ||||
|     ) | ||||
|     val request = buildRequest(alterConfigsRequest) | ||||
|     metadataCache = MetadataCache.kRaftMetadataCache(brokerId) | ||||
|     when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](), | ||||
|       any[Long])).thenReturn(0) | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue