mirror of https://github.com/apache/kafka.git
MINOR: Small cleanups in Connect (#15128)
Reviewers: Divij Vaidya <diviv@amazon.com>
This commit is contained in:
parent
c8d61a5cbe
commit
eccfb03e9a
|
@ -25,7 +25,6 @@ import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import javax.security.auth.login.Configuration;
|
import javax.security.auth.login.Configuration;
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.function.Supplier;
|
import java.util.function.Supplier;
|
||||||
|
|
||||||
|
@ -100,7 +99,7 @@ public class BasicAuthSecurityRestExtension implements ConnectRestExtension {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void close() throws IOException {
|
public void close() {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -34,7 +34,6 @@ import org.apache.kafka.connect.errors.ConnectException;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
import java.util.Base64;
|
import java.util.Base64;
|
||||||
|
|
||||||
|
@ -42,7 +41,6 @@ import javax.security.auth.callback.Callback;
|
||||||
import javax.security.auth.callback.CallbackHandler;
|
import javax.security.auth.callback.CallbackHandler;
|
||||||
import javax.security.auth.callback.NameCallback;
|
import javax.security.auth.callback.NameCallback;
|
||||||
import javax.security.auth.callback.PasswordCallback;
|
import javax.security.auth.callback.PasswordCallback;
|
||||||
import javax.security.auth.callback.UnsupportedCallbackException;
|
|
||||||
import javax.security.auth.login.LoginContext;
|
import javax.security.auth.login.LoginContext;
|
||||||
import javax.security.auth.login.LoginException;
|
import javax.security.auth.login.LoginException;
|
||||||
import javax.ws.rs.Priorities;
|
import javax.ws.rs.Priorities;
|
||||||
|
@ -87,7 +85,7 @@ public class JaasBasicAuthFilter implements ContainerRequestFilter {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void filter(ContainerRequestContext requestContext) throws IOException {
|
public void filter(ContainerRequestContext requestContext) {
|
||||||
if (isInternalRequest(requestContext)) {
|
if (isInternalRequest(requestContext)) {
|
||||||
log.trace("Skipping authentication for internal request");
|
log.trace("Skipping authentication for internal request");
|
||||||
return;
|
return;
|
||||||
|
@ -119,8 +117,8 @@ public class JaasBasicAuthFilter implements ContainerRequestFilter {
|
||||||
|
|
||||||
public static class BasicAuthCallBackHandler implements CallbackHandler {
|
public static class BasicAuthCallBackHandler implements CallbackHandler {
|
||||||
|
|
||||||
private String username;
|
private final String username;
|
||||||
private String password;
|
private final String password;
|
||||||
|
|
||||||
public BasicAuthCallBackHandler(BasicAuthCredentials credentials) {
|
public BasicAuthCallBackHandler(BasicAuthCredentials credentials) {
|
||||||
username = credentials.username();
|
username = credentials.username();
|
||||||
|
@ -128,7 +126,7 @@ public class JaasBasicAuthFilter implements ContainerRequestFilter {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
|
public void handle(Callback[] callbacks) {
|
||||||
List<Callback> unsupportedCallbacks = new ArrayList<>();
|
List<Callback> unsupportedCallbacks = new ArrayList<>();
|
||||||
for (Callback callback : callbacks) {
|
for (Callback callback : callbacks) {
|
||||||
if (callback instanceof NameCallback) {
|
if (callback instanceof NameCallback) {
|
||||||
|
|
|
@ -54,7 +54,7 @@ public class PropertyFileLoginModule implements LoginModule {
|
||||||
private String fileName;
|
private String fileName;
|
||||||
private boolean authenticated;
|
private boolean authenticated;
|
||||||
|
|
||||||
private static Map<String, Properties> credentialPropertiesMap = new ConcurrentHashMap<>();
|
private static final Map<String, Properties> CREDENTIAL_PROPERTIES = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void initialize(Subject subject, CallbackHandler callbackHandler, Map<String, ?> sharedState, Map<String, ?> options) {
|
public void initialize(Subject subject, CallbackHandler callbackHandler, Map<String, ?> sharedState, Map<String, ?> options) {
|
||||||
|
@ -64,7 +64,7 @@ public class PropertyFileLoginModule implements LoginModule {
|
||||||
throw new ConfigException("Property Credentials file must be specified");
|
throw new ConfigException("Property Credentials file must be specified");
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!credentialPropertiesMap.containsKey(fileName)) {
|
if (!CREDENTIAL_PROPERTIES.containsKey(fileName)) {
|
||||||
log.trace("Opening credential properties file '{}'", fileName);
|
log.trace("Opening credential properties file '{}'", fileName);
|
||||||
Properties credentialProperties = new Properties();
|
Properties credentialProperties = new Properties();
|
||||||
try {
|
try {
|
||||||
|
@ -72,7 +72,7 @@ public class PropertyFileLoginModule implements LoginModule {
|
||||||
log.trace("Parsing credential properties file '{}'", fileName);
|
log.trace("Parsing credential properties file '{}'", fileName);
|
||||||
credentialProperties.load(inputStream);
|
credentialProperties.load(inputStream);
|
||||||
}
|
}
|
||||||
credentialPropertiesMap.putIfAbsent(fileName, credentialProperties);
|
CREDENTIAL_PROPERTIES.putIfAbsent(fileName, credentialProperties);
|
||||||
if (credentialProperties.isEmpty())
|
if (credentialProperties.isEmpty())
|
||||||
log.warn("Credential properties file '{}' is empty; all requests will be permitted",
|
log.warn("Credential properties file '{}' is empty; all requests will be permitted",
|
||||||
fileName);
|
fileName);
|
||||||
|
@ -101,7 +101,7 @@ public class PropertyFileLoginModule implements LoginModule {
|
||||||
String username = ((NameCallback) callbacks[0]).getName();
|
String username = ((NameCallback) callbacks[0]).getName();
|
||||||
char[] passwordChars = ((PasswordCallback) callbacks[1]).getPassword();
|
char[] passwordChars = ((PasswordCallback) callbacks[1]).getPassword();
|
||||||
String password = passwordChars != null ? new String(passwordChars) : null;
|
String password = passwordChars != null ? new String(passwordChars) : null;
|
||||||
Properties credentialProperties = credentialPropertiesMap.get(fileName);
|
Properties credentialProperties = CREDENTIAL_PROPERTIES.get(fileName);
|
||||||
|
|
||||||
if (credentialProperties.isEmpty()) {
|
if (credentialProperties.isEmpty()) {
|
||||||
log.trace("Not validating credentials for user '{}' as credential properties file '{}' is empty",
|
log.trace("Not validating credentials for user '{}' as credential properties file '{}' is empty",
|
||||||
|
@ -132,17 +132,17 @@ public class PropertyFileLoginModule implements LoginModule {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean commit() throws LoginException {
|
public boolean commit() {
|
||||||
return authenticated;
|
return authenticated;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean abort() throws LoginException {
|
public boolean abort() {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean logout() throws LoginException {
|
public boolean logout() {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -131,7 +131,7 @@ public class JaasBasicAuthFilterTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testUnknownCredentialsFile() throws IOException {
|
public void testUnknownCredentialsFile() {
|
||||||
JaasBasicAuthFilter jaasBasicAuthFilter = setupJaasFilter("KafkaConnect", "/tmp/testcrednetial");
|
JaasBasicAuthFilter jaasBasicAuthFilter = setupJaasFilter("KafkaConnect", "/tmp/testcrednetial");
|
||||||
ContainerRequestContext requestContext = setMock("Basic", "user", "password");
|
ContainerRequestContext requestContext = setMock("Basic", "user", "password");
|
||||||
jaasBasicAuthFilter.filter(requestContext);
|
jaasBasicAuthFilter.filter(requestContext);
|
||||||
|
@ -142,7 +142,7 @@ public class JaasBasicAuthFilterTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testNoFileOption() throws IOException {
|
public void testNoFileOption() {
|
||||||
JaasBasicAuthFilter jaasBasicAuthFilter = setupJaasFilter("KafkaConnect", null);
|
JaasBasicAuthFilter jaasBasicAuthFilter = setupJaasFilter("KafkaConnect", null);
|
||||||
ContainerRequestContext requestContext = setMock("Basic", "user", "password");
|
ContainerRequestContext requestContext = setMock("Basic", "user", "password");
|
||||||
jaasBasicAuthFilter.filter(requestContext);
|
jaasBasicAuthFilter.filter(requestContext);
|
||||||
|
|
|
@ -40,7 +40,6 @@ public class FileStreamSinkTaskTest {
|
||||||
|
|
||||||
private FileStreamSinkTask task;
|
private FileStreamSinkTask task;
|
||||||
private ByteArrayOutputStream os;
|
private ByteArrayOutputStream os;
|
||||||
private PrintStream printStream;
|
|
||||||
|
|
||||||
@TempDir
|
@TempDir
|
||||||
public Path topDir;
|
public Path topDir;
|
||||||
|
@ -49,7 +48,7 @@ public class FileStreamSinkTaskTest {
|
||||||
@BeforeEach
|
@BeforeEach
|
||||||
public void setup() {
|
public void setup() {
|
||||||
os = new ByteArrayOutputStream();
|
os = new ByteArrayOutputStream();
|
||||||
printStream = new PrintStream(os);
|
PrintStream printStream = new PrintStream(os);
|
||||||
task = new FileStreamSinkTask(printStream);
|
task = new FileStreamSinkTask(printStream);
|
||||||
outputFile = topDir.resolve("connect.output").toAbsolutePath().toString();
|
outputFile = topDir.resolve("connect.output").toAbsolutePath().toString();
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue