mirror of https://github.com/apache/kafka.git
Reviewers: Colin P. McCabe <cmccabe@apache.org>
This commit is contained in:
parent
064afe2c65
commit
042be5b9ac
|
@ -199,6 +199,8 @@
|
||||||
<allow pkg="io.opentelemetry.proto"/>
|
<allow pkg="io.opentelemetry.proto"/>
|
||||||
<!-- for testing -->
|
<!-- for testing -->
|
||||||
<allow pkg="org.apache.kafka.common.telemetry" />
|
<allow pkg="org.apache.kafka.common.telemetry" />
|
||||||
|
<!-- for IncrementalAlterConfigsRequest and AlterUserScramCredentialsRequest -->
|
||||||
|
<allow pkg="com.fasterxml.jackson.databind" />
|
||||||
</subpackage>
|
</subpackage>
|
||||||
|
|
||||||
<subpackage name="serialization">
|
<subpackage name="serialization">
|
||||||
|
|
|
@ -138,7 +138,7 @@ public abstract class AbstractRequest implements AbstractRequestResponse {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public final String toString() {
|
public String toString() {
|
||||||
return toString(true);
|
return toString(true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -17,10 +17,14 @@
|
||||||
package org.apache.kafka.common.requests;
|
package org.apache.kafka.common.requests;
|
||||||
|
|
||||||
import org.apache.kafka.common.message.AlterUserScramCredentialsRequestData;
|
import org.apache.kafka.common.message.AlterUserScramCredentialsRequestData;
|
||||||
|
import org.apache.kafka.common.message.AlterUserScramCredentialsRequestDataJsonConverter;
|
||||||
import org.apache.kafka.common.message.AlterUserScramCredentialsResponseData;
|
import org.apache.kafka.common.message.AlterUserScramCredentialsResponseData;
|
||||||
import org.apache.kafka.common.protocol.ApiKeys;
|
import org.apache.kafka.common.protocol.ApiKeys;
|
||||||
import org.apache.kafka.common.protocol.Readable;
|
import org.apache.kafka.common.protocol.Readable;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.JsonNode;
|
||||||
|
import com.fasterxml.jackson.databind.node.ObjectNode;
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
@ -81,4 +85,16 @@ public class AlterUserScramCredentialsRequest extends AbstractRequest {
|
||||||
.collect(Collectors.toList());
|
.collect(Collectors.toList());
|
||||||
return new AlterUserScramCredentialsResponse(new AlterUserScramCredentialsResponseData().setResults(results));
|
return new AlterUserScramCredentialsResponse(new AlterUserScramCredentialsResponseData().setResults(results));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Do not print salt or saltedPassword
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
JsonNode json = AlterUserScramCredentialsRequestDataJsonConverter.write(data, version()).deepCopy();
|
||||||
|
|
||||||
|
for (JsonNode upsertion : json.get("upsertions")) {
|
||||||
|
((ObjectNode) upsertion).put("salt", "");
|
||||||
|
((ObjectNode) upsertion).put("saltedPassword", "");
|
||||||
|
}
|
||||||
|
return AlterUserScramCredentialsRequestDataJsonConverter.read(json, version()).toString();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,11 +21,15 @@ import org.apache.kafka.clients.admin.AlterConfigOp;
|
||||||
import org.apache.kafka.common.config.ConfigResource;
|
import org.apache.kafka.common.config.ConfigResource;
|
||||||
import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData;
|
import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData;
|
||||||
import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterConfigsResource;
|
import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterConfigsResource;
|
||||||
|
import org.apache.kafka.common.message.IncrementalAlterConfigsRequestDataJsonConverter;
|
||||||
import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData;
|
import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData;
|
||||||
import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.AlterConfigsResourceResponse;
|
import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.AlterConfigsResourceResponse;
|
||||||
import org.apache.kafka.common.protocol.ApiKeys;
|
import org.apache.kafka.common.protocol.ApiKeys;
|
||||||
import org.apache.kafka.common.protocol.Readable;
|
import org.apache.kafka.common.protocol.Readable;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.JsonNode;
|
||||||
|
import com.fasterxml.jackson.databind.node.ObjectNode;
|
||||||
|
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
|
@ -106,4 +110,16 @@ public class IncrementalAlterConfigsRequest extends AbstractRequest {
|
||||||
}
|
}
|
||||||
return new IncrementalAlterConfigsResponse(response);
|
return new IncrementalAlterConfigsResponse(response);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// It is not safe to print all config values
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
JsonNode json = IncrementalAlterConfigsRequestDataJsonConverter.write(data, version()).deepCopy();
|
||||||
|
for (JsonNode resource : json.get("resources")) {
|
||||||
|
for (JsonNode config : resource.get("configs")) {
|
||||||
|
((ObjectNode) config).put("value", "REDACTED");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return IncrementalAlterConfigsRequestDataJsonConverter.read(json, version()).toString();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -131,11 +131,21 @@ class RequestChannelTest {
|
||||||
op: OpType,
|
op: OpType,
|
||||||
entries: Map[String, String],
|
entries: Map[String, String],
|
||||||
expectedValues: Map[String, String]): Unit = {
|
expectedValues: Map[String, String]): Unit = {
|
||||||
val alterConfigs = request(incrementalAlterConfigs(resource, entries, op))
|
val alterConfigs = incrementalAlterConfigs(resource, entries, op)
|
||||||
val loggableAlterConfigs = alterConfigs.loggableRequest.asInstanceOf[IncrementalAlterConfigsRequest]
|
val alterConfigsString = alterConfigs.toString
|
||||||
|
entries.foreach { entry =>
|
||||||
|
if (!alterConfigsString.contains(entry._1)) {
|
||||||
|
fail("Config names should be in the request string")
|
||||||
|
}
|
||||||
|
if (entry._2 != null && alterConfigsString.contains(entry._2)) {
|
||||||
|
fail("Config values should not be in the request string")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
val req = request(alterConfigs)
|
||||||
|
val loggableAlterConfigs = req.loggableRequest.asInstanceOf[IncrementalAlterConfigsRequest]
|
||||||
val loggedConfig = loggableAlterConfigs.data.resources.find(resource.`type`.id, resource.name).configs
|
val loggedConfig = loggableAlterConfigs.data.resources.find(resource.`type`.id, resource.name).configs
|
||||||
assertEquals(expectedValues, toMap(loggedConfig))
|
assertEquals(expectedValues, toMap(loggedConfig))
|
||||||
val alterConfigsDesc = RequestConvertToJson.requestDesc(alterConfigs.header, alterConfigs.requestLog.toJava, alterConfigs.isForwarded).toString
|
val alterConfigsDesc = RequestConvertToJson.requestDesc(req.header, req.requestLog.toJava, req.isForwarded).toString
|
||||||
assertFalse(alterConfigsDesc.contains(sensitiveValue), s"Sensitive config logged $alterConfigsDesc")
|
assertFalse(alterConfigsDesc.contains(sensitiveValue), s"Sensitive config logged $alterConfigsDesc")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -244,6 +244,9 @@ class AlterUserScramCredentialsRequestTest extends BaseRequestTest {
|
||||||
// create a bunch of credentials
|
// create a bunch of credentials
|
||||||
val request1_0 = new AlterUserScramCredentialsRequest.Builder(
|
val request1_0 = new AlterUserScramCredentialsRequest.Builder(
|
||||||
new AlterUserScramCredentialsRequestData()
|
new AlterUserScramCredentialsRequestData()
|
||||||
|
.setDeletions(util.Arrays.asList(
|
||||||
|
new AlterUserScramCredentialsRequestData.ScramCredentialDeletion()
|
||||||
|
.setName(user2).setMechanism(ScramMechanism.SCRAM_SHA_256.`type`)))
|
||||||
.setUpsertions(util.Arrays.asList(
|
.setUpsertions(util.Arrays.asList(
|
||||||
new AlterUserScramCredentialsRequestData.ScramCredentialUpsertion()
|
new AlterUserScramCredentialsRequestData.ScramCredentialUpsertion()
|
||||||
.setName(user1).setMechanism(ScramMechanism.SCRAM_SHA_256.`type`)
|
.setName(user1).setMechanism(ScramMechanism.SCRAM_SHA_256.`type`)
|
||||||
|
@ -251,10 +254,15 @@ class AlterUserScramCredentialsRequestTest extends BaseRequestTest {
|
||||||
.setSalt(saltBytes)
|
.setSalt(saltBytes)
|
||||||
.setSaltedPassword(saltedPasswordBytes),
|
.setSaltedPassword(saltedPasswordBytes),
|
||||||
))).build()
|
))).build()
|
||||||
|
assertEquals("AlterUserScramCredentialsRequestData(" +
|
||||||
|
"deletions=[ScramCredentialDeletion(name='" + user2 + "', mechanism=" + ScramMechanism.SCRAM_SHA_256.`type` + ")], " +
|
||||||
|
"upsertions=[ScramCredentialUpsertion(name='" + user1 + "', mechanism=" + ScramMechanism.SCRAM_SHA_256.`type` +
|
||||||
|
", iterations=4096, salt=[], saltedPassword=[])])", request1_0.toString)
|
||||||
val results1_0 = sendAlterUserScramCredentialsRequest(request1_0).data.results
|
val results1_0 = sendAlterUserScramCredentialsRequest(request1_0).data.results
|
||||||
assertEquals(1, results1_0.size)
|
assertEquals(2, results1_0.size)
|
||||||
checkNoErrorsAlteringCredentials(results1_0)
|
assertEquals(1, results1_0.asScala.count(_.errorCode == Errors.RESOURCE_NOT_FOUND.code()))
|
||||||
checkUserAppearsInAlterResults(results1_0, user1)
|
checkUserAppearsInAlterResults(results1_0, user1)
|
||||||
|
checkUserAppearsInAlterResults(results1_0, user2)
|
||||||
|
|
||||||
// When creating credentials, do not update the same user more than once per request
|
// When creating credentials, do not update the same user more than once per request
|
||||||
val request1_1 = new AlterUserScramCredentialsRequest.Builder(
|
val request1_1 = new AlterUserScramCredentialsRequest.Builder(
|
||||||
|
@ -276,6 +284,8 @@ class AlterUserScramCredentialsRequestTest extends BaseRequestTest {
|
||||||
.setSalt(saltBytes)
|
.setSalt(saltBytes)
|
||||||
.setSaltedPassword(saltedPasswordBytes),
|
.setSaltedPassword(saltedPasswordBytes),
|
||||||
))).build()
|
))).build()
|
||||||
|
assertFalse(request1_1.toString.contains(saltBytes))
|
||||||
|
assertFalse(request1_1.toString.contains(saltedPasswordBytes))
|
||||||
val results1_1 = sendAlterUserScramCredentialsRequest(request1_1).data.results
|
val results1_1 = sendAlterUserScramCredentialsRequest(request1_1).data.results
|
||||||
assertEquals(3, results1_1.size)
|
assertEquals(3, results1_1.size)
|
||||||
checkNoErrorsAlteringCredentials(results1_1)
|
checkNoErrorsAlteringCredentials(results1_1)
|
||||||
|
|
|
@ -9752,7 +9752,12 @@ class KafkaApisTest extends Logging {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
def testEmptyIncrementalAlterConfigsRequestWithKRaft(): Unit = {
|
def testEmptyIncrementalAlterConfigsRequestWithKRaft(): Unit = {
|
||||||
val request = buildRequest(new IncrementalAlterConfigsRequest(new IncrementalAlterConfigsRequestData(), 1.toShort))
|
val alterConfigsRequest = new IncrementalAlterConfigsRequest(new IncrementalAlterConfigsRequestData(), 1.toShort)
|
||||||
|
assertEquals(
|
||||||
|
"IncrementalAlterConfigsRequestData(resources=[], validateOnly=false)",
|
||||||
|
alterConfigsRequest.toString
|
||||||
|
)
|
||||||
|
val request = buildRequest(alterConfigsRequest)
|
||||||
metadataCache = new KRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
|
metadataCache = new KRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
|
||||||
when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
|
when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
|
||||||
any[Long])).thenReturn(0)
|
any[Long])).thenReturn(0)
|
||||||
|
@ -9764,15 +9769,23 @@ class KafkaApisTest extends Logging {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
def testLog4jIncrementalAlterConfigsRequestWithKRaft(): Unit = {
|
def testLog4jIncrementalAlterConfigsRequestWithKRaft(): Unit = {
|
||||||
val request = buildRequest(new IncrementalAlterConfigsRequest(new IncrementalAlterConfigsRequestData().
|
val alterConfigsRequest = new IncrementalAlterConfigsRequest(new IncrementalAlterConfigsRequestData().
|
||||||
setValidateOnly(true).
|
setValidateOnly(true).
|
||||||
setResources(new IAlterConfigsResourceCollection(asList(new IAlterConfigsResource().
|
setResources(new IAlterConfigsResourceCollection(asList(new IAlterConfigsResource().
|
||||||
setResourceName(brokerId.toString).
|
setResourceName(brokerId.toString).
|
||||||
setResourceType(BROKER_LOGGER.id()).
|
setResourceType(BROKER_LOGGER.id()).
|
||||||
setConfigs(new IAlterableConfigCollection(asList(new IAlterableConfig().
|
setConfigs(new IAlterableConfigCollection(asList(new IAlterableConfig().
|
||||||
setName(LoggingController.ROOT_LOGGER).
|
setName(LoggingController.ROOT_LOGGER).
|
||||||
setValue("TRACE")).iterator()))).iterator())),
|
setValue("TRACE")).iterator()))).iterator())), 1.toShort)
|
||||||
1.toShort))
|
assertEquals(
|
||||||
|
"IncrementalAlterConfigsRequestData(resources=[" +
|
||||||
|
"AlterConfigsResource(resourceType=" + BROKER_LOGGER.id() + ", " +
|
||||||
|
"resourceName='"+ brokerId + "', " +
|
||||||
|
"configs=[AlterableConfig(name='" + LoggingController.ROOT_LOGGER + "', configOperation=0, value='REDACTED')])], " +
|
||||||
|
"validateOnly=true)",
|
||||||
|
alterConfigsRequest.toString
|
||||||
|
)
|
||||||
|
val request = buildRequest(alterConfigsRequest)
|
||||||
metadataCache = new KRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
|
metadataCache = new KRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
|
||||||
when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
|
when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
|
||||||
any[Long])).thenReturn(0)
|
any[Long])).thenReturn(0)
|
||||||
|
|
Loading…
Reference in New Issue