Port krb5kdc to test container and rework hdfs handling (#106228)

This ports our krb5kdc test fixture to a test container and reworks the HDFS test handling to also be based on test containers.
The YAML REST tests that use HDFS required introducing variable substitution in the YAML REST test parser handling.
Rene Groeschke 2024-03-26 08:39:39 +01:00 committed by GitHub
parent a3d96b9333
commit b39b3731a7
42 changed files with 1913 additions and 1480 deletions
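The variable substitution mentioned in the description is wired up in the reworked YAML suites of this diff: the HDFS fixture and the Elasticsearch cluster are started as JUnit class rules, and fixture-provided values such as the namenode port are handed to the YAML test parser, which substitutes them into the test bodies. The following condensed sketch shows that pattern; the class name is illustrative, while the fixture, cluster builder, and createParameters overload are the ones used in the suites further down.

package org.elasticsearch.repositories.hdfs;

import com.carrotsearch.randomizedtesting.annotations.Name;
import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;

import org.elasticsearch.test.cluster.ElasticsearchCluster;
import org.elasticsearch.test.cluster.local.distribution.DistributionType;
import org.elasticsearch.test.fixtures.hdfs.HdfsClientThreadLeakFilter;
import org.elasticsearch.test.fixtures.hdfs.HdfsFixture;
import org.elasticsearch.test.fixtures.testcontainers.TestContainersThreadFilter;
import org.elasticsearch.test.rest.yaml.ClientYamlTestCandidate;
import org.elasticsearch.test.rest.yaml.ESClientYamlSuiteTestCase;
import org.junit.ClassRule;
import org.junit.rules.RuleChain;
import org.junit.rules.TestRule;

import java.util.Map;

@ThreadLeakFilters(filters = { HdfsClientThreadLeakFilter.class, TestContainersThreadFilter.class })
public class ExampleHdfsRepositoryYamlIT extends ESClientYamlSuiteTestCase {

    // In-process MiniDFS fixture; its namenode port is only known once the fixture has started.
    public static HdfsFixture hdfsFixture = new HdfsFixture();

    public static ElasticsearchCluster cluster = ElasticsearchCluster.local()
        .distribution(DistributionType.DEFAULT)
        .plugin("repository-hdfs")
        .setting("xpack.license.self_generated.type", "trial")
        .setting("xpack.security.enabled", "false")
        .build();

    // Start the fixture before the cluster and tear both down in reverse order.
    @ClassRule
    public static TestRule ruleChain = RuleChain.outerRule(hdfsFixture).around(cluster);

    public ExampleHdfsRepositoryYamlIT(@Name("yaml") ClientYamlTestCandidate testCandidate) {
        super(testCandidate);
    }

    @Override
    protected String getTestRestCluster() {
        return cluster.getHttpAddresses();
    }

    @ParametersFactory
    public static Iterable<Object[]> parameters() throws Exception {
        // The Map argument is the new part: its entries are substituted into the YAML test bodies.
        return createParameters(new String[] { "hdfs_repository" }, Map.of("hdfs_port", hdfsFixture.getPort()));
    }
}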


@@ -0,0 +1,142 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
package org.elasticsearch.gradle.internal.shadow;
import com.github.jengelman.gradle.plugins.shadow.ShadowStats;
import com.github.jengelman.gradle.plugins.shadow.relocation.RelocateClassContext;
import com.github.jengelman.gradle.plugins.shadow.relocation.Relocator;
import com.github.jengelman.gradle.plugins.shadow.transformers.Transformer;
import com.github.jengelman.gradle.plugins.shadow.transformers.TransformerContext;
import org.apache.commons.io.IOUtils;
import org.apache.tools.zip.ZipEntry;
import org.apache.tools.zip.ZipOutputStream;
import org.gradle.api.file.FileTreeElement;
import org.w3c.dom.Document;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import java.io.BufferedInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.List;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.transform.TransformerException;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.dom.DOMSource;
import javax.xml.transform.stream.StreamResult;
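/**
 * Shadow plugin resource {@link Transformer} that rewrites fully qualified class names referenced
 * inside an XML resource (for example Hadoop's {@code core-default.xml} and {@code hdfs-default.xml},
 * see the hdfs-fixture build script) using the configured {@link Relocator}s, so that the shadowed
 * jar's bundled configuration points at the relocated packages.
 */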
public class XmlClassRelocationTransformer implements Transformer {
boolean hasTransformedResource = false;
private Document doc;
private String resource;
@Override
public boolean canTransformResource(FileTreeElement element) {
String path = element.getRelativePath().getPathString();
if (resource != null && resource.equals(path)) {
return true;
}
return false;
}
@Override
public void transform(TransformerContext context) {
try {
BufferedInputStream bis = new BufferedInputStream(context.getIs());
DocumentBuilderFactory dbFactory = DocumentBuilderFactory.newInstance();
DocumentBuilder dBuilder = dbFactory.newDocumentBuilder();
doc = dBuilder.parse(bis);
doc.getDocumentElement().normalize();
Node root = doc.getDocumentElement();
walkThroughNodes(root, context);
if (hasTransformedResource == false) {
this.doc = null;
}
} catch (Exception e) {
throw new RuntimeException("Error parsing xml file in " + context.getIs(), e);
}
}
private static String getRelocatedClass(String className, TransformerContext context) {
List<Relocator> relocators = context.getRelocators();
ShadowStats stats = context.getStats();
if (className != null && className.length() > 0 && relocators != null) {
for (Relocator relocator : relocators) {
if (relocator.canRelocateClass(className)) {
RelocateClassContext relocateClassContext = new RelocateClassContext(className, stats);
return relocator.relocateClass(relocateClassContext);
}
}
}
return className;
}
private void walkThroughNodes(Node node, TransformerContext context) {
if (node.getNodeType() == Node.TEXT_NODE) {
String nodeValue = node.getNodeValue();
if (nodeValue.isBlank() == false) {
String relocatedClass = getRelocatedClass(nodeValue, context);
if (relocatedClass.equals(nodeValue) == false) {
node.setNodeValue(relocatedClass);
hasTransformedResource = true;
}
}
}
NodeList nodeList = node.getChildNodes();
for (int i = 0; i < nodeList.getLength(); i++) {
Node currentNode = nodeList.item(i);
walkThroughNodes(currentNode, context);
}
}
@Override
public boolean hasTransformedResource() {
return hasTransformedResource;
}
@Override
public void modifyOutputStream(ZipOutputStream os, boolean preserveFileTimestamps) {
ZipEntry entry = new ZipEntry(resource);
entry.setTime(TransformerContext.getEntryTimestamp(preserveFileTimestamps, entry.getTime()));
try {
// Write the content back to the XML file
TransformerFactory transformerFactory = TransformerFactory.newInstance();
DOMSource source = new DOMSource(doc);
// Result stream will be a ByteArrayOutputStream
ByteArrayOutputStream baos = new ByteArrayOutputStream();
StreamResult result = new StreamResult(baos);
// Do the transformation and serialization
transformerFactory.newTransformer().transform(source, result);
os.putNextEntry(entry);
IOUtils.write(baos.toByteArray(), os);
os.closeEntry();
} catch (IOException e) {
throw new RuntimeException(e);
} catch (TransformerException e) {
throw new RuntimeException(e);
} finally {
hasTransformedResource = false;
doc = null;
}
}
@Override
public String getName() {
return getClass().getSimpleName();
}
}


@@ -8,7 +8,7 @@
package org.elasticsearch.gradle.internal.test.rest;
import org.elasticsearch.gradle.testclusters.StandaloneRestIntegTestTask;
import org.elasticsearch.gradle.internal.test.RestIntegTestTask;
import org.elasticsearch.gradle.util.GradleUtils;
import org.gradle.api.Plugin;
import org.gradle.api.Project;
@@ -40,13 +40,7 @@ public class InternalJavaRestTestPlugin implements Plugin<Project> {
}
// setup the javaRestTest task
// we use a StandaloneRestIntegTestTask here so that the conventions of RestTestBasePlugin don't create a test cluster
TaskProvider<StandaloneRestIntegTestTask> testTask = registerTestTask(
project,
javaTestSourceSet,
SOURCE_SET_NAME,
StandaloneRestIntegTestTask.class
);
TaskProvider<RestIntegTestTask> testTask = registerTestTask(project, javaTestSourceSet, SOURCE_SET_NAME, RestIntegTestTask.class);
project.getTasks().named(JavaBasePlugin.CHECK_TASK_NAME).configure(check -> check.dependsOn(testTask));


@@ -8,7 +8,7 @@
package org.elasticsearch.gradle.internal.test.rest;
import org.elasticsearch.gradle.testclusters.StandaloneRestIntegTestTask;
import org.elasticsearch.gradle.internal.test.RestIntegTestTask;
import org.elasticsearch.gradle.util.GradleUtils;
import org.gradle.api.Plugin;
import org.gradle.api.Project;
@@ -36,12 +36,7 @@ public class InternalYamlRestTestPlugin implements Plugin<Project> {
SourceSetContainer sourceSets = project.getExtensions().getByType(SourceSetContainer.class);
SourceSet yamlTestSourceSet = sourceSets.create(SOURCE_SET_NAME);
TaskProvider<StandaloneRestIntegTestTask> testTask = registerTestTask(
project,
yamlTestSourceSet,
SOURCE_SET_NAME,
StandaloneRestIntegTestTask.class
);
TaskProvider<RestIntegTestTask> testTask = registerTestTask(project, yamlTestSourceSet, SOURCE_SET_NAME, RestIntegTestTask.class);
project.getTasks().named(JavaBasePlugin.CHECK_TASK_NAME).configure(check -> check.dependsOn(testTask));


@@ -6,17 +6,10 @@
* Side Public License, v 1.
*/
import org.apache.tools.ant.filters.ReplaceTokens
import org.elasticsearch.gradle.internal.info.BuildParams
import org.elasticsearch.gradle.internal.test.RestIntegTestTask
import org.elasticsearch.gradle.internal.util.HdfsUtils
import java.nio.file.Path
import static org.elasticsearch.gradle.PropertyNormalization.IGNORE_VALUE
apply plugin: 'elasticsearch.test.fixtures'
apply plugin: 'elasticsearch.legacy-java-rest-test'
apply plugin: 'elasticsearch.legacy-yaml-rest-test'
apply plugin: 'elasticsearch.internal-java-rest-test'
apply plugin: 'elasticsearch.internal-yaml-rest-test'
esplugin {
description 'The HDFS repository plugin adds support for Hadoop Distributed File-System (HDFS) repositories.'
@@ -27,15 +20,11 @@ versions << [
'hadoop': '3.3.3'
]
final int minTestedHadoopVersion = 2;
final int maxTestedHadoopVersion = 3;
testFixtures.useFixture ":test:fixtures:krb5kdc-fixture", "hdfs"
configurations {
krb5Config
krb5Keytabs
hdfsFixture2
hdfsFixture3
}
dependencies {
api project(path: 'hadoop-client-api', configuration: 'shadow')
if (isEclipse) {
@@ -57,10 +46,27 @@ dependencies {
api 'javax.servlet:javax.servlet-api:3.1.0'
api "org.slf4j:slf4j-api:${versions.slf4j}"
runtimeOnly "org.slf4j:slf4j-nop:${versions.slf4j}"
// runtimeOnly("org.apache.logging.log4j:log4j-slf4j-impl:${versions.log4j}") https://github.com/elastic/elasticsearch/issues/93714
// https://github.com/elastic/elasticsearch/issues/93714
// runtimeOnly("org.apache.logging.log4j:log4j-slf4j-impl:${versions.log4j}")
krb5Keytabs project(path: ':test:fixtures:krb5kdc-fixture', configuration: 'krb5KeytabsHdfsDir')
krb5Config project(path: ':test:fixtures:krb5kdc-fixture', configuration: 'krb5ConfHdfsFile')
testImplementation(project(':test:fixtures:hdfs-fixture'))
javaRestTestCompileOnly(project(':test:fixtures:hdfs-fixture'))
javaRestTestImplementation project(':test:fixtures:krb5kdc-fixture')
javaRestTestImplementation "org.slf4j:slf4j-api:${versions.slf4j}"
javaRestTestRuntimeOnly "com.google.guava:guava:16.0.1"
javaRestTestRuntimeOnly "commons-cli:commons-cli:1.2"
javaRestTestRuntimeOnly "org.apache.logging.log4j:log4j-1.2-api:${versions.log4j}"
yamlRestTestCompileOnly(project(':test:fixtures:hdfs-fixture'))
yamlRestTestImplementation project(':test:fixtures:krb5kdc-fixture')
yamlRestTestImplementation "org.slf4j:slf4j-api:${versions.slf4j}"
yamlRestTestRuntimeOnly "com.google.guava:guava:16.0.1"
yamlRestTestRuntimeOnly "commons-cli:commons-cli:1.2"
yamlRestTestRuntimeOnly "org.apache.logging.log4j:log4j-1.2-api:${versions.log4j}"
hdfsFixture2 project(path: ':test:fixtures:hdfs-fixture', configuration: 'shadowedHdfs2')
hdfsFixture3 project(path: ':test:fixtures:hdfs-fixture', configuration: 'shadow')
}
restResources {
@@ -69,237 +75,39 @@ restResources {
}
}
normalization {
runtimeClasspath {
// ignore generated keytab files for the purposes of build avoidance
ignore '*.keytab'
// ignore fixture ports file which is on the classpath primarily to pacify the security manager
ignore 'ports'
}
}
tasks.named("dependencyLicenses").configure {
mapping from: /hadoop-.*/, to: 'hadoop'
}
// TODO work that into the java rest test plugin when combined with java plugin
sourceSets {
javaRestTest {
compileClasspath = compileClasspath + main.compileClasspath
runtimeClasspath = runtimeClasspath + main.runtimeClasspath + files("src/main/plugin-metadata")
}
tasks.withType(RestIntegTestTask).configureEach {
usesDefaultDistribution()
jvmArgs '--add-exports', 'java.security.jgss/sun.security.krb5=ALL-UNNAMED'
}
tasks.named('javaRestTest').configure {
enabled = false
classpath = sourceSets.javaRestTest.runtimeClasspath + configurations.hdfsFixture3
}
tasks.register("javaRestTestHdfs2", RestIntegTestTask) {
description = "Runs rest tests against an elasticsearch cluster with HDFS version 2"
testClassesDirs = sourceSets.javaRestTest.output.classesDirs
classpath = sourceSets.javaRestTest.runtimeClasspath + configurations.hdfsFixture2
}
tasks.named('yamlRestTest').configure {
enabled = false
classpath = sourceSets.yamlRestTest.runtimeClasspath + configurations.hdfsFixture2
}
String realm = "BUILD.ELASTIC.CO"
String krb5conf = project(':test:fixtures:krb5kdc-fixture').ext.krb5Conf("hdfs")
// Determine HDFS Fixture compatibility for the current build environment.
ext.fixtureSupported = project.provider(() -> HdfsUtils.isHdfsFixtureSupported(project))
for (int hadoopVersion = minTestedHadoopVersion; hadoopVersion <= maxTestedHadoopVersion; hadoopVersion++) {
final int hadoopVer = hadoopVersion
configurations.create("hdfs" + hadoopVersion + "Fixture")
dependencies.add("hdfs" + hadoopVersion + "Fixture", project(':test:fixtures:hdfs' + hadoopVersion + '-fixture'))
for (String fixtureName : ['hdfs' + hadoopVersion + 'Fixture', 'haHdfs' + hadoopVersion + 'Fixture', 'secureHdfs' + hadoopVersion + 'Fixture', 'secureHaHdfs' + hadoopVersion + 'Fixture']) {
project.tasks.register(fixtureName, org.elasticsearch.gradle.internal.test.AntFixture) {
executable = "${BuildParams.runtimeJavaHome}/bin/java"
dependsOn project.configurations.getByName("hdfs" + hadoopVer + "Fixture"), project.configurations.krb5Config, project.configurations.krb5Keytabs
env 'CLASSPATH', "${-> project.configurations.getByName("hdfs" + hadoopVer + "Fixture").asPath}"
maxWaitInSeconds 60
BuildParams.withFipsEnabledOnly(it)
waitCondition = { fixture, ant ->
// the hdfs.MiniHDFS fixture writes the ports file when
// it's ready, so we can just wait for the file to exist
return fixture.portsFile.exists()
}
final List<String> miniHDFSArgs = []
// If it's a secure fixture, then depend on Kerberos Fixture and principals + add the krb5conf to the JVM options
if (name.startsWith('secure')) {
miniHDFSArgs.addAll(["--add-exports", "java.security.jgss/sun.security.krb5=ALL-UNNAMED"])
miniHDFSArgs.add("-Djava.security.krb5.conf=${project.configurations.krb5Config.getSingleFile().getPath()}")
miniHDFSArgs.add("-Dhdfs.config.port=" + getSecureNamenodePortForVersion(hadoopVer))
} else {
miniHDFSArgs.add("-Dhdfs.config.port=" + getNonSecureNamenodePortForVersion(hadoopVer))
}
// If it's an HA fixture, set a nameservice to use in the JVM options
if (name.startsWith('haHdfs') || name.startsWith('secureHaHdfs')) {
miniHDFSArgs.add("-Dha-nameservice=ha-hdfs")
}
// Common options
miniHDFSArgs.add('hdfs.MiniHDFS')
miniHDFSArgs.add(baseDir)
// If it's a secure fixture, then set the principal name and keytab locations to use for auth.
if (name.startsWith('secure')) {
miniHDFSArgs.add("hdfs/hdfs.build.elastic.co@${realm}")
miniHDFSArgs.add(new File(project.configurations.krb5Keytabs.singleFile, "hdfs_hdfs.build.elastic.co.keytab").getPath())
}
args miniHDFSArgs.toArray()
}
}
for (String integTestTaskName : ['javaRestTest' + hadoopVersion, 'javaRestTestSecure' + hadoopVersion]) {
tasks.register(integTestTaskName, RestIntegTestTask) {
description = "Runs rest tests against an elasticsearch cluster with HDFS" + hadoopVer + "-HA"
if (name.contains("Secure")) {
dependsOn "secureHaHdfs" + hadoopVer + "Fixture"
}
File portsFileDir = file("${workingDir}/hdfs" + hadoopVer + "Fixture")
Path portsFile = name.contains("Secure") ?
buildDir.toPath()
.resolve("fixtures")
.resolve("secureHaHdfs" + hadoopVer + "Fixture")
.resolve("ports") :
buildDir.toPath()
.resolve("fixtures")
.resolve("haHdfs" + hadoopVer + "Fixture")
.resolve("ports")
nonInputProperties.systemProperty "test.hdfs-fixture.ports", file("$portsFileDir/ports")
// Copy ports file to separate location which is placed on the test classpath
doFirst {
mkdir(portsFileDir)
copy {
from portsFile
into portsFileDir
}
}
testClassesDirs = sourceSets.javaRestTest.output.classesDirs
// Set the keytab files in the classpath so that we can access them from test code without the security manager
// freaking out.
classpath = sourceSets.javaRestTest.runtimeClasspath +
configurations.krb5Keytabs +
files(portsFileDir)
}
}
for (String integTestTaskName : ['yamlRestTest' + hadoopVersion, 'yamlRestTestSecure' + hadoopVersion]) {
tasks.register(integTestTaskName, RestIntegTestTask) {
description = "Runs rest tests against an elasticsearch cluster with HDFS" + hadoopVer
if (name.contains("Secure")) {
dependsOn "secureHdfs" + hadoopVer + "Fixture"
}
tasks.register("yamlRestTestHdfs2", RestIntegTestTask) {
description = "Runs yaml rest tests against an elasticsearch cluster with HDFS version 2"
testClassesDirs = sourceSets.yamlRestTest.output.classesDirs
classpath = sourceSets.yamlRestTest.runtimeClasspath
}
}
def processHadoopTestResources = tasks.register("processHadoop" + hadoopVer + "TestResources", Copy)
processHadoopTestResources.configure {
Map<String, Object> expansions = [
'hdfs_port' : getNonSecureNamenodePortForVersion(hadoopVer),
'secure_hdfs_port': getSecureNamenodePortForVersion(hadoopVer),
]
inputs.properties(expansions)
filter("tokens": expansions.collectEntries { k, v -> [k, v.toString()]}, ReplaceTokens.class)
it.into("build/resources/yamlRestTest/rest-api-spec/test")
it.into("hdfs_repository_" + hadoopVer) {
from "src/yamlRestTest/resources/rest-api-spec/test/hdfs_repository"
}
it.into("secure_hdfs_repository_" + hadoopVer) {
from "src/yamlRestTest/resources/rest-api-spec/test/secure_hdfs_repository"
}
}
tasks.named("processYamlRestTestResources").configure {
dependsOn(processHadoopTestResources)
}
if (fixtureSupported.get()) {
// Check depends on the HA test. Already depends on the standard test.
tasks.named("check").configure {
dependsOn("javaRestTest" + hadoopVer)
}
// Both standard and HA tests depend on their respective HDFS fixtures
tasks.named("yamlRestTest" + hadoopVer).configure {
dependsOn "hdfs" + hadoopVer + "Fixture"
// The normal test runner only runs the standard hdfs rest tests
systemProperty 'tests.rest.suite', 'hdfs_repository_' + hadoopVer
}
tasks.named("javaRestTest" + hadoopVer).configure {
dependsOn "haHdfs" + hadoopVer + "Fixture"
}
} else {
// The normal integration test runner will just test that the plugin loads
tasks.named("yamlRestTest" + hadoopVer).configure {
systemProperty 'tests.rest.suite', 'hdfs_repository_' + hadoopVer + '/10_basic'
}
// HA fixture is unsupported. Don't run them.
tasks.named("javaRestTestSecure" + hadoopVer).configure {
enabled = false
}
}
tasks.named("check").configure {
dependsOn("yamlRestTest" + hadoopVer, "yamlRestTestSecure" + hadoopVer, "javaRestTestSecure" + hadoopVer)
}
// Run just the secure hdfs rest test suite.
tasks.named("yamlRestTestSecure" + hadoopVer).configure {
systemProperty 'tests.rest.suite', 'secure_hdfs_repository_' + hadoopVer
}
classpath = sourceSets.yamlRestTest.runtimeClasspath + configurations.hdfsFixture2
}
def getSecureNamenodePortForVersion(hadoopVersion) {
return 10002 - (2 * hadoopVersion)
tasks.named("check").configure {
dependsOn(tasks.withType(RestIntegTestTask))
}
def getNonSecureNamenodePortForVersion(hadoopVersion) {
return 10003 - (2 * hadoopVersion)
}
Set disabledIntegTestTaskNames = []
tasks.withType(RestIntegTestTask).configureEach { testTask ->
if (disabledIntegTestTaskNames.contains(name)) {
enabled = false;
}
BuildParams.withFipsEnabledOnly(testTask)
if (name.contains("Secure")) {
if (disabledIntegTestTaskNames.contains(name) == false) {
nonInputProperties.systemProperty "test.krb5.principal.es", "elasticsearch@${realm}"
nonInputProperties.systemProperty "test.krb5.principal.hdfs", "hdfs/hdfs.build.elastic.co@${realm}"
nonInputProperties.systemProperty "java.security.krb5.conf", "${project.configurations.krb5Config.getSingleFile().getPath()}"
nonInputProperties.systemProperty(
"test.krb5.keytab.hdfs",
new File(project.configurations.krb5Keytabs.singleFile, "hdfs_hdfs.build.elastic.co.keytab").getPath()
)
}
}
testClusters.matching { it.name == testTask.name }.configureEach {
if (testTask.name.contains("Secure")) {
systemProperty "java.security.krb5.conf", { configurations.krb5Config.singleFile.getPath() }, IGNORE_VALUE
extraConfigFile(
"repository-hdfs/krb5.keytab",
new File(project.configurations.krb5Keytabs.singleFile, "elasticsearch.keytab"),
IGNORE_VALUE
)
}
}
}
tasks.named("thirdPartyAudit").configure {
ignoreMissingClasses()
ignoreViolations(
@@ -374,8 +182,3 @@ tasks.named("thirdPartyAudit").configure {
'org.apache.hadoop.thirdparty.protobuf.UnsafeUtil$MemoryAccessor'
)
}
tasks.named('resolveAllDependencies') {
// This avoids spinning up the test fixture when downloading all dependencies
configs = project.configurations - [project.configurations.krb5Config]
}


@@ -0,0 +1,75 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
package org.elasticsearch.repositories.hdfs;
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.core.Strings;
import org.elasticsearch.test.fixtures.hdfs.HdfsClientThreadLeakFilter;
import org.elasticsearch.test.fixtures.hdfs.HdfsFixture;
import org.elasticsearch.test.fixtures.testcontainers.TestContainersThreadFilter;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.junit.Assert;
import java.io.IOException;
@ThreadLeakFilters(filters = { HdfsClientThreadLeakFilter.class, TestContainersThreadFilter.class })
abstract class AbstractHaHdfsFailoverTestSuiteIT extends ESRestTestCase {
abstract HdfsFixture getHdfsFixture();
String securityCredentials() {
return "";
}
public void testHAFailoverWithRepository() throws Exception {
getHdfsFixture().setupHA();
RestClient client = client();
createRepository(client);
// Get repository
Response response = client.performRequest(new Request("GET", "/_snapshot/hdfs_ha_repo_read/_all"));
Assert.assertEquals(200, response.getStatusLine().getStatusCode());
// Failover the namenode to the second.
getHdfsFixture().failoverHDFS("nn1", "nn2");
safeSleep(2000);
// Get repository again
response = client.performRequest(new Request("GET", "/_snapshot/hdfs_ha_repo_read/_all"));
Assert.assertEquals(200, response.getStatusLine().getStatusCode());
}
private void createRepository(RestClient client) throws IOException {
Request request = new Request("PUT", "/_snapshot/hdfs_ha_repo_read");
request.setJsonEntity(Strings.format("""
{
"type": "hdfs",
"settings": {
"uri": "hdfs://ha-hdfs/",
"path": "/user/elasticsearch/existing/readonly-repository",
"readonly": "true",
%s
"conf.dfs.nameservices": "ha-hdfs",
"conf.dfs.ha.namenodes.ha-hdfs": "nn1,nn2",
"conf.dfs.namenode.rpc-address.ha-hdfs.nn1": "localhost:%s",
"conf.dfs.namenode.rpc-address.ha-hdfs.nn2": "localhost:%s",
"conf.dfs.client.failover.proxy.provider.ha-hdfs":\
"org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
}
}""", securityCredentials(), getHdfsFixture().getPort(0), getHdfsFixture().getPort(1)));
Response response = client.performRequest(request);
Assert.assertEquals(200, response.getStatusLine().getStatusCode());
}
}


@@ -8,271 +8,41 @@
package org.elasticsearch.repositories.hdfs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.BadFencingConfigurationException;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.HAServiceTarget;
import org.apache.hadoop.ha.NodeFencer;
import org.apache.hadoop.ha.ZKFCProtocol;
import org.apache.hadoop.ha.protocolPB.HAServiceProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.tools.DFSHAAdmin;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.core.PathUtils;
import org.elasticsearch.core.Strings;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.junit.Assert;
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.AccessController;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.List;
import org.elasticsearch.test.cluster.ElasticsearchCluster;
import org.elasticsearch.test.cluster.local.distribution.DistributionType;
import org.elasticsearch.test.fixtures.hdfs.HdfsClientThreadLeakFilter;
import org.elasticsearch.test.fixtures.hdfs.HdfsFixture;
import org.junit.ClassRule;
import org.junit.rules.RuleChain;
import org.junit.rules.TestRule;
/**
* Integration test that runs against an HA-Enabled HDFS instance
*/
public class HaHdfsFailoverTestSuiteIT extends ESRestTestCase {
@ThreadLeakFilters(filters = { HdfsClientThreadLeakFilter.class })
public class HaHdfsFailoverTestSuiteIT extends AbstractHaHdfsFailoverTestSuiteIT {
public void testHAFailoverWithRepository() throws Exception {
RestClient client = client();
public static HdfsFixture hdfsFixture = new HdfsFixture().withHAService("ha-hdfs");
String esKerberosPrincipal = System.getProperty("test.krb5.principal.es");
String hdfsKerberosPrincipal = System.getProperty("test.krb5.principal.hdfs");
String kerberosKeytabLocation = System.getProperty("test.krb5.keytab.hdfs");
String ports = System.getProperty("test.hdfs-fixture.ports");
String nn1Port = "10001";
String nn2Port = "10002";
if (ports.length() > 0) {
final Path path = PathUtils.get(ports);
final List<String> lines = AccessController.doPrivileged((PrivilegedExceptionAction<List<String>>) () -> {
return Files.readAllLines(path);
});
nn1Port = lines.get(0);
nn2Port = lines.get(1);
}
boolean securityEnabled = hdfsKerberosPrincipal != null;
public static ElasticsearchCluster cluster = ElasticsearchCluster.local()
.distribution(DistributionType.DEFAULT)
.plugin("repository-hdfs")
.setting("xpack.license.self_generated.type", "trial")
.setting("xpack.security.enabled", "false")
.build();
Configuration hdfsConfiguration = new Configuration();
hdfsConfiguration.set("dfs.nameservices", "ha-hdfs");
hdfsConfiguration.set("dfs.ha.namenodes.ha-hdfs", "nn1,nn2");
hdfsConfiguration.set("dfs.namenode.rpc-address.ha-hdfs.nn1", "localhost:" + nn1Port);
hdfsConfiguration.set("dfs.namenode.rpc-address.ha-hdfs.nn2", "localhost:" + nn2Port);
hdfsConfiguration.set(
"dfs.client.failover.proxy.provider.ha-hdfs",
"org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
);
@ClassRule
public static TestRule ruleChain = RuleChain.outerRule(hdfsFixture).around(cluster);
AccessController.doPrivileged((PrivilegedExceptionAction<Void>) () -> {
if (securityEnabled) {
// ensure that keytab exists
Path kt = PathUtils.get(kerberosKeytabLocation);
if (Files.exists(kt) == false) {
throw new IllegalStateException("Could not locate keytab at " + kerberosKeytabLocation);
}
if (Files.isReadable(kt) != true) {
throw new IllegalStateException("Could not read keytab at " + kerberosKeytabLocation);
}
logger.info("Keytab Length: " + Files.readAllBytes(kt).length);
// set principal names
hdfsConfiguration.set("dfs.namenode.kerberos.principal", hdfsKerberosPrincipal);
hdfsConfiguration.set("dfs.datanode.kerberos.principal", hdfsKerberosPrincipal);
hdfsConfiguration.set("dfs.data.transfer.protection", "authentication");
SecurityUtil.setAuthenticationMethod(UserGroupInformation.AuthenticationMethod.KERBEROS, hdfsConfiguration);
UserGroupInformation.setConfiguration(hdfsConfiguration);
UserGroupInformation.loginUserFromKeytab(hdfsKerberosPrincipal, kerberosKeytabLocation);
} else {
SecurityUtil.setAuthenticationMethod(UserGroupInformation.AuthenticationMethod.SIMPLE, hdfsConfiguration);
UserGroupInformation.setConfiguration(hdfsConfiguration);
UserGroupInformation.getCurrentUser();
}
return null;
});
// Create repository
{
Request request = new Request("PUT", "/_snapshot/hdfs_ha_repo_read");
request.setJsonEntity(Strings.format("""
{
"type": "hdfs",
"settings": {
"uri": "hdfs://ha-hdfs/",
"path": "/user/elasticsearch/existing/readonly-repository",
"readonly": "true",
%s
"conf.dfs.nameservices": "ha-hdfs",
"conf.dfs.ha.namenodes.ha-hdfs": "nn1,nn2",
"conf.dfs.namenode.rpc-address.ha-hdfs.nn1": "localhost:%s",
"conf.dfs.namenode.rpc-address.ha-hdfs.nn2": "localhost:%s",
"conf.dfs.client.failover.proxy.provider.ha-hdfs": \
"org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
}
}""", securityCredentials(securityEnabled, esKerberosPrincipal), nn1Port, nn2Port));
Response response = client.performRequest(request);
Assert.assertEquals(200, response.getStatusLine().getStatusCode());
}
// Get repository
{
Response response = client.performRequest(new Request("GET", "/_snapshot/hdfs_ha_repo_read/_all"));
Assert.assertEquals(200, response.getStatusLine().getStatusCode());
}
// Failover the namenode to the second.
failoverHDFS("nn1", "nn2", hdfsConfiguration);
// Get repository again
{
Response response = client.performRequest(new Request("GET", "/_snapshot/hdfs_ha_repo_read/_all"));
Assert.assertEquals(200, response.getStatusLine().getStatusCode());
}
}
private String securityCredentials(boolean securityEnabled, String kerberosPrincipal) {
if (securityEnabled) {
return String.format(java.util.Locale.ROOT, """
"security.principal": "%s","conf.dfs.data.transfer.protection": "authentication",""", kerberosPrincipal);
} else {
return "";
}
}
/**
* Wraps an HAServiceTarget, keeping track of any HAServiceProtocol proxies it generates in order
* to close them at the end of the test lifecycle.
*/
private static class CloseableHAServiceTarget extends HAServiceTarget {
private final HAServiceTarget delegate;
private final List<HAServiceProtocol> protocolsToClose = new ArrayList<>();
CloseableHAServiceTarget(HAServiceTarget delegate) {
this.delegate = delegate;
@Override
protected String getTestRestCluster() {
return cluster.getHttpAddresses();
}
@Override
public InetSocketAddress getAddress() {
return delegate.getAddress();
}
@Override
public InetSocketAddress getHealthMonitorAddress() {
return delegate.getHealthMonitorAddress();
}
@Override
public InetSocketAddress getZKFCAddress() {
return delegate.getZKFCAddress();
}
@Override
public NodeFencer getFencer() {
return delegate.getFencer();
}
@Override
public void checkFencingConfigured() throws BadFencingConfigurationException {
delegate.checkFencingConfigured();
}
@Override
public HAServiceProtocol getProxy(Configuration conf, int timeoutMs) throws IOException {
HAServiceProtocol proxy = delegate.getProxy(conf, timeoutMs);
protocolsToClose.add(proxy);
return proxy;
}
@Override
public HAServiceProtocol getHealthMonitorProxy(Configuration conf, int timeoutMs) throws IOException {
return delegate.getHealthMonitorProxy(conf, timeoutMs);
}
@Override
public ZKFCProtocol getZKFCProxy(Configuration conf, int timeoutMs) throws IOException {
return delegate.getZKFCProxy(conf, timeoutMs);
}
@Override
public boolean isAutoFailoverEnabled() {
return delegate.isAutoFailoverEnabled();
}
private void close() {
for (HAServiceProtocol protocol : protocolsToClose) {
if (protocol instanceof HAServiceProtocolClientSideTranslatorPB haServiceProtocolClientSideTranslatorPB) {
haServiceProtocolClientSideTranslatorPB.close();
}
}
}
}
/**
* The default HAAdmin tool does not throw exceptions on failures, and does not close any client connection
* resources when it concludes. This subclass overrides the tool to allow for exception throwing, and to
* keep track of and clean up connection resources.
*/
private static class CloseableHAAdmin extends DFSHAAdmin {
private final List<CloseableHAServiceTarget> serviceTargets = new ArrayList<>();
@Override
protected HAServiceTarget resolveTarget(String nnId) {
CloseableHAServiceTarget target = new CloseableHAServiceTarget(super.resolveTarget(nnId));
serviceTargets.add(target);
return target;
}
@Override
public int run(String[] argv) throws Exception {
return runCmd(argv);
}
public int transitionToStandby(String namenodeID) throws Exception {
return run(new String[] { "-transitionToStandby", namenodeID });
}
public int transitionToActive(String namenodeID) throws Exception {
return run(new String[] { "-transitionToActive", namenodeID });
}
public void close() {
for (CloseableHAServiceTarget serviceTarget : serviceTargets) {
serviceTarget.close();
}
}
}
/**
* Performs a two-phase leading namenode transition.
* @param from Namenode ID to transition to standby
* @param to Namenode ID to transition to active
* @param configuration Client configuration for HAAdmin tool
* @throws IOException In the event of a raised exception during namenode failover.
*/
private void failoverHDFS(String from, String to, Configuration configuration) throws IOException {
logger.info("Swapping active namenodes: [{}] to standby and [{}] to active", from, to);
try {
AccessController.doPrivileged((PrivilegedExceptionAction<Void>) () -> {
CloseableHAAdmin haAdmin = new CloseableHAAdmin();
haAdmin.setConf(configuration);
try {
haAdmin.transitionToStandby(from);
haAdmin.transitionToActive(to);
} finally {
haAdmin.close();
}
return null;
});
} catch (PrivilegedActionException pae) {
throw new IOException("Unable to perform namenode failover", pae);
}
HdfsFixture getHdfsFixture() {
return hdfsFixture;
}
}


@@ -0,0 +1,55 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
package org.elasticsearch.repositories.hdfs;
import org.elasticsearch.test.cluster.ElasticsearchCluster;
import org.elasticsearch.test.cluster.local.distribution.DistributionType;
import org.elasticsearch.test.cluster.util.resource.Resource;
import org.elasticsearch.test.fixtures.hdfs.HdfsFixture;
import org.elasticsearch.test.fixtures.krb5kdc.Krb5kDcContainer;
import org.junit.ClassRule;
import org.junit.rules.RuleChain;
import org.junit.rules.TestRule;
public class SecureHaHdfsFailoverTestSuiteIT extends AbstractHaHdfsFailoverTestSuiteIT {
public static Krb5kDcContainer krb5Fixture = new Krb5kDcContainer();
public static HdfsFixture hdfsFixture = new HdfsFixture().withHAService("ha-hdfs")
.withKerberos(() -> krb5Fixture.getPrincipal(), () -> krb5Fixture.getKeytab());
public static ElasticsearchCluster cluster = ElasticsearchCluster.local()
.distribution(DistributionType.DEFAULT)
.plugin("repository-hdfs")
.setting("xpack.license.self_generated.type", "trial")
.setting("xpack.security.enabled", "false")
.systemProperty("java.security.krb5.conf", () -> krb5Fixture.getConfPath().toString())
.configFile("repository-hdfs/krb5.conf", Resource.fromString(() -> krb5Fixture.getConf()))
.configFile("repository-hdfs/krb5.keytab", Resource.fromFile(() -> krb5Fixture.getEsKeytab()))
.build();
@ClassRule
public static TestRule ruleChain = RuleChain.outerRule(krb5Fixture).around(hdfsFixture).around(cluster);
@Override
HdfsFixture getHdfsFixture() {
return hdfsFixture;
}
@Override
protected String getTestRestCluster() {
return cluster.getHttpAddresses();
}
protected String securityCredentials() {
return String.format(java.util.Locale.ROOT, """
"security.principal": "%s","conf.dfs.data.transfer.protection": "authentication",""", krb5Fixture.getEsPrincipal());
}
}


@@ -24,6 +24,7 @@ import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.core.Streams;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.fixtures.hdfs.HdfsClientThreadLeakFilter;
import org.hamcrest.CoreMatchers;
import org.mockito.AdditionalMatchers;
import org.mockito.Mockito;


@@ -13,6 +13,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.repositories.blobstore.ESBlobStoreRepositoryIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.fixtures.hdfs.HdfsClientThreadLeakFilter;
import java.util.Collection;
import java.util.Collections;

View File

@@ -15,6 +15,7 @@ import org.elasticsearch.common.settings.SecureSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.repositories.AbstractThirdPartyRepositoryTestCase;
import org.elasticsearch.test.fixtures.hdfs.HdfsClientThreadLeakFilter;
import java.util.Collection;

View File

@@ -22,6 +22,7 @@ import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil;
import org.elasticsearch.search.SearchResponseUtils;
import org.elasticsearch.snapshots.SnapshotState;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.test.fixtures.hdfs.HdfsClientThreadLeakFilter;
import java.util.Collection;

View File

@@ -5,22 +5,52 @@
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
package org.elasticsearch.repositories.hdfs;
import com.carrotsearch.randomizedtesting.annotations.Name;
import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
import org.elasticsearch.test.cluster.ElasticsearchCluster;
import org.elasticsearch.test.cluster.local.distribution.DistributionType;
import org.elasticsearch.test.fixtures.hdfs.HdfsClientThreadLeakFilter;
import org.elasticsearch.test.fixtures.hdfs.HdfsFixture;
import org.elasticsearch.test.fixtures.testcontainers.TestContainersThreadFilter;
import org.elasticsearch.test.rest.yaml.ClientYamlTestCandidate;
import org.elasticsearch.test.rest.yaml.ESClientYamlSuiteTestCase;
import org.junit.ClassRule;
import org.junit.rules.RuleChain;
import org.junit.rules.TestRule;
import java.util.Map;
@ThreadLeakFilters(filters = { HdfsClientThreadLeakFilter.class, TestContainersThreadFilter.class })
public class RepositoryHdfsClientYamlTestSuiteIT extends ESClientYamlSuiteTestCase {
public static HdfsFixture hdfsFixture = new HdfsFixture();
public static ElasticsearchCluster cluster = ElasticsearchCluster.local()
.distribution(DistributionType.DEFAULT)
.plugin("repository-hdfs")
.setting("xpack.license.self_generated.type", "trial")
.setting("xpack.security.enabled", "false")
.build();
@ClassRule
public static TestRule ruleChain = RuleChain.outerRule(hdfsFixture).around(cluster);
public RepositoryHdfsClientYamlTestSuiteIT(@Name("yaml") ClientYamlTestCandidate testCandidate) {
super(testCandidate);
}
@Override
protected String getTestRestCluster() {
return cluster.getHttpAddresses();
}
@ParametersFactory
public static Iterable<Object[]> parameters() throws Exception {
return ESClientYamlSuiteTestCase.createParameters();
return createParameters(new String[] { "hdfs_repository" }, Map.of("hdfs_port", hdfsFixture.getPort()));
}
}


@@ -0,0 +1,62 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
package org.elasticsearch.repositories.hdfs;
import com.carrotsearch.randomizedtesting.annotations.Name;
import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
import org.elasticsearch.test.cluster.ElasticsearchCluster;
import org.elasticsearch.test.cluster.local.distribution.DistributionType;
import org.elasticsearch.test.cluster.util.resource.Resource;
import org.elasticsearch.test.fixtures.hdfs.HdfsClientThreadLeakFilter;
import org.elasticsearch.test.fixtures.hdfs.HdfsFixture;
import org.elasticsearch.test.fixtures.krb5kdc.Krb5kDcContainer;
import org.elasticsearch.test.fixtures.testcontainers.TestContainersThreadFilter;
import org.elasticsearch.test.rest.yaml.ClientYamlTestCandidate;
import org.elasticsearch.test.rest.yaml.ESClientYamlSuiteTestCase;
import org.junit.ClassRule;
import org.junit.rules.RuleChain;
import org.junit.rules.TestRule;
import java.util.Map;
@ThreadLeakFilters(filters = { HdfsClientThreadLeakFilter.class, TestContainersThreadFilter.class })
public class SecureRepositoryHdfsClientYamlTestSuiteIT extends ESClientYamlSuiteTestCase {
public static Krb5kDcContainer krb5Fixture = new Krb5kDcContainer();
public static HdfsFixture hdfsFixture = new HdfsFixture().withKerberos(() -> krb5Fixture.getPrincipal(), () -> krb5Fixture.getKeytab());
public static ElasticsearchCluster cluster = ElasticsearchCluster.local()
.distribution(DistributionType.DEFAULT)
.plugin("repository-hdfs")
.setting("xpack.license.self_generated.type", "trial")
.setting("xpack.security.enabled", "false")
.systemProperty("java.security.krb5.conf", () -> krb5Fixture.getConfPath().toString())
.configFile("repository-hdfs/krb5.conf", Resource.fromString(() -> krb5Fixture.getConf()))
.configFile("repository-hdfs/krb5.keytab", Resource.fromFile(() -> krb5Fixture.getEsKeytab()))
.build();
@ClassRule
public static TestRule ruleChain = RuleChain.outerRule(krb5Fixture).around(hdfsFixture).around(cluster);
public SecureRepositoryHdfsClientYamlTestSuiteIT(@Name("yaml") ClientYamlTestCandidate testCandidate) {
super(testCandidate);
}
@Override
protected String getTestRestCluster() {
return cluster.getHttpAddresses();
}
@ParametersFactory
public static Iterable<Object[]> parameters() throws Exception {
return createParameters(new String[] { "secure_hdfs_repository" }, Map.of("secure_hdfs_port", hdfsFixture.getPort()));
}
}


@@ -90,8 +90,7 @@ List projects = [
'test:framework',
'test:fixtures:azure-fixture',
'test:fixtures:gcs-fixture',
'test:fixtures:hdfs2-fixture',
'test:fixtures:hdfs3-fixture',
'test:fixtures:hdfs-fixture',
'test:fixtures:krb5kdc-fixture',
'test:fixtures:minio-fixture',
'test:fixtures:old-elasticsearch',

test/fixtures/hdfs-fixture/build.gradle

@@ -0,0 +1,90 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
apply plugin: 'elasticsearch.java'
apply plugin: 'com.github.johnrengelman.shadow'
import com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar
configurations {
// all {
// transitive = true
// }
hdfs2
hdfs3
consumable("shadowedHdfs2")
}
dependencies {
compileOnly("org.apache.hadoop:hadoop-minicluster:2.8.5")
api("com.carrotsearch.randomizedtesting:randomizedtesting-runner:${versions.randomizedrunner}") {
transitive false
}
compileOnly "junit:junit:${versions.junit}"
hdfs2 "org.apache.hadoop:hadoop-minicluster:2.8.5"
hdfs3 "org.apache.hadoop:hadoop-minicluster:3.3.1"
}
tasks.named("shadowJar").configure {
archiveClassifier.set("hdfs3")
// fix issues with signed jars
relocate("org.apache.hadoop", "fixture.hdfs3.org.apache.hadoop") {
exclude "org.apache.hadoop.hdfs.protocol.ClientProtocol"
exclude "org.apache.hadoop.ipc.StandbyException"
}
configurations << project.configurations.hdfs3
}
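// Second shadowed jar, built against the Hadoop 2 minicluster, so consumers can test both HDFS
// major versions; it is published through the "shadowedHdfs2" configuration (see the artifacts
// block at the end of this file), which the repository-hdfs build consumes as "hdfsFixture2".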
def hdfs2Jar = tasks.register("hdfs2jar", ShadowJar) {
relocate("org.apache.hadoop", "fixture.hdfs2.org.apache.hadoop") {
exclude "org.apache.hadoop.hdfs.protocol.ClientProtocol"
exclude "org.apache.hadoop.ipc.StandbyException"
}
archiveClassifier.set("hdfs2")
from sourceSets.main.output
configurations << project.configurations.hdfs2
}
tasks.withType(ShadowJar) {
dependencies {
// exclude(dependency('commons-io:commons-io:2.8.0'))
exclude(dependency("com.carrotsearch.randomizedtesting:randomizedtesting-runner:.*"))
exclude(dependency("junit:junit:.*"))
exclude(dependency("org.slf4j:slf4j-api:.*"))
exclude(dependency("com.google.guava:guava:.*"))
exclude(dependency("org.apache.commons:commons-compress:.*"))
exclude(dependency("commons-logging:commons-logging:.*"))
exclude(dependency("commons-codec:commons-codec:.*"))
exclude(dependency("org.apache.httpcomponents:httpclient:.*"))
exclude(dependency("org.apache.httpcomponents:httpcore:.*"))
exclude(dependency("org.apache.logging.log4j:log4j-1.2-api:.*"))
exclude(dependency("log4j:log4j:.*"))
exclude(dependency("io.netty:.*:.*"))
exclude(dependency("com.nimbusds:nimbus-jose-jwt:.*"))
exclude(dependency("commons-cli:commons-cli:1.2"))
exclude(dependency("net.java.dev.jna:jna:.*"))
exclude(dependency("org.objenesis:objenesis:.*"))
exclude(dependency('com.fasterxml.jackson.core:.*:.*'))
}
transform(org.elasticsearch.gradle.internal.shadow.XmlClassRelocationTransformer.class) {
resource = "core-default.xml"
enabled = true
}
transform(org.elasticsearch.gradle.internal.shadow.XmlClassRelocationTransformer.class) {
resource = "hdfs-default.xml"
enabled = true
}
}
artifacts {
shadowedHdfs2(hdfs2Jar)
}


@@ -6,7 +6,7 @@
* Side Public License, v 1.
*/
package org.elasticsearch.repositories.hdfs;
package org.elasticsearch.test.fixtures.hdfs;
import com.carrotsearch.randomizedtesting.ThreadFilter;
@@ -29,6 +29,11 @@ public final class HdfsClientThreadLeakFilter implements ThreadFilter {
@Override
public boolean reject(Thread t) {
return t.getName().equals(OFFENDING_THREAD_NAME);
return t.getName().contains(OFFENDING_THREAD_NAME)
|| t.getName().startsWith("LeaseRenewer")
|| t.getName().startsWith("SSL Certificates Store Monitor") // hadoop 3 brings that in
|| t.getName().startsWith("GcTimeMonitor") // hadoop 3
|| t.getName().startsWith("Command processor") // hadoop 3
|| t.getName().startsWith("ForkJoinPool-"); // hadoop 3
}
}


@@ -0,0 +1,438 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
package org.elasticsearch.test.fixtures.hdfs;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclEntryType;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.ha.BadFencingConfigurationException;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.HAServiceTarget;
import org.apache.hadoop.ha.NodeFencer;
import org.apache.hadoop.ha.ZKFCProtocol;
import org.apache.hadoop.ha.protocolPB.HAServiceProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
import org.apache.hadoop.hdfs.tools.DFSHAAdmin;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.junit.Assume;
import org.junit.rules.ExternalResource;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.AccessController;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;
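/**
 * JUnit {@link ExternalResource} that runs an in-process Hadoop {@link MiniDFSCluster} for the
 * repository-hdfs tests. The cluster can optionally be started with an HA namenode topology
 * ({@link #withHAService(String)}) and/or Kerberos security ({@link #withKerberos(Supplier, Supplier)}),
 * and the test is skipped via an assumption when the environment cannot run MiniDFS (Windows without
 * the Hadoop native libraries, or a temporary directory whose path contains spaces).
 */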
public class HdfsFixture extends ExternalResource {
private static final Logger LOGGER = LoggerFactory.getLogger(HdfsFixture.class);
private TemporaryFolder temporaryFolder = new TemporaryFolder();
private MiniDFSCluster dfs;
private String haNameService;
private Supplier<String> principalConfig = null;
private Supplier<Path> keytab = null;
private Configuration cfg;
private Configuration haConfiguration;
private int explicitPort = findAvailablePort();
public HdfsFixture withHAService(String haNameService) {
this.haNameService = haNameService;
return this;
}
public HdfsFixture withKerberos(Supplier<String> principalConfig, Supplier<Path> keytabFile) {
this.principalConfig = principalConfig;
this.keytab = keytabFile;
return this;
}
@Override
protected void before() throws Throwable {
temporaryFolder.create();
assumeHdfsAvailable();
startMinHdfs();
}
private void assumeHdfsAvailable() {
boolean fixtureSupported = false;
if (isWindows()) {
// hdfs fixture will not start without hadoop native libraries on windows
String nativePath = System.getenv("HADOOP_HOME");
if (nativePath != null) {
java.nio.file.Path path = Paths.get(nativePath);
if (Files.isDirectory(path)
&& Files.exists(path.resolve("bin").resolve("winutils.exe"))
&& Files.exists(path.resolve("bin").resolve("hadoop.dll"))
&& Files.exists(path.resolve("bin").resolve("hdfs.dll"))) {
fixtureSupported = true;
} else {
throw new IllegalStateException(
"HADOOP_HOME: " + path + " is invalid, does not contain hadoop native libraries in " + nativePath + "/bin"
);
}
}
} else {
fixtureSupported = true;
}
boolean pathContainsSpaces = temporaryFolder.getRoot().getAbsolutePath().contains(" ");
if (pathContainsSpaces) {
fixtureSupported = false;
}
Assume.assumeTrue("HDFS Fixture is not supported", fixtureSupported);
}
private boolean isWindows() {
return System.getProperty("os.name").toLowerCase().startsWith("windows");
}
/**
* Performs a two-phase leading namenode transition.
* @param from Namenode ID to transition to standby
* @param to Namenode ID to transition to active
* @throws IOException In the event of a raised exception during namenode failover.
*/
public void failoverHDFS(String from, String to) throws IOException {
assert isHA() && haConfiguration != null : "HA Configuration must be set up before performing failover";
LOGGER.info("Swapping active namenodes: [{}] to standby and [{}] to active", from, to);
try {
AccessController.doPrivileged((PrivilegedExceptionAction<Void>) () -> {
CloseableHAAdmin haAdmin = new CloseableHAAdmin();
haAdmin.setConf(haConfiguration);
try {
haAdmin.transitionToStandby(from);
haAdmin.transitionToActive(to);
} finally {
haAdmin.close();
}
return null;
});
} catch (PrivilegedActionException pae) {
throw new IOException("Unable to perform namenode failover", pae);
}
}
public void setupHA() throws IOException {
assert isHA() : "HA Name Service must be set up before setting up HA";
haConfiguration = new Configuration();
haConfiguration.set("dfs.nameservices", haNameService);
haConfiguration.set("dfs.ha.namenodes.ha-hdfs", "nn1,nn2");
haConfiguration.set("dfs.namenode.rpc-address.ha-hdfs.nn1", "localhost:" + getPort(0));
haConfiguration.set("dfs.namenode.rpc-address.ha-hdfs.nn2", "localhost:" + (getPort(1)));
haConfiguration.set(
"dfs.client.failover.proxy.provider.ha-hdfs",
"org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
);
if (isSecure()) {
// ensure that keytab exists
Path kt = this.keytab.get();
if (Files.exists(kt) == false) {
throw new IllegalStateException("Could not locate keytab at " + keytab.get());
}
if (Files.isReadable(kt) != true) {
throw new IllegalStateException("Could not read keytab at " + keytab.get());
}
LOGGER.info("Keytab Length: " + Files.readAllBytes(kt).length);
// set principal names
String hdfsKerberosPrincipal = principalConfig.get();
haConfiguration.set("dfs.namenode.kerberos.principal", hdfsKerberosPrincipal);
haConfiguration.set("dfs.datanode.kerberos.principal", hdfsKerberosPrincipal);
haConfiguration.set("dfs.data.transfer.protection", "authentication");
SecurityUtil.setAuthenticationMethod(UserGroupInformation.AuthenticationMethod.KERBEROS, haConfiguration);
UserGroupInformation.setConfiguration(haConfiguration);
UserGroupInformation.loginUserFromKeytab(hdfsKerberosPrincipal, keytab.get().toString());
} else {
SecurityUtil.setAuthenticationMethod(UserGroupInformation.AuthenticationMethod.SIMPLE, haConfiguration);
UserGroupInformation.setConfiguration(haConfiguration);
UserGroupInformation.getCurrentUser();
}
}
private void startMinHdfs() throws Exception {
Path baseDir = temporaryFolder.newFolder("baseDir").toPath();
if (System.getenv("HADOOP_HOME") == null) {
Path hadoopHome = baseDir.resolve("hadoop-home");
Files.createDirectories(hadoopHome);
System.setProperty("hadoop.home.dir", hadoopHome.toAbsolutePath().toString());
}
// hdfs-data/, where any data is going
Path hdfsHome = baseDir.resolve("hdfs-data");
new File(hdfsHome.toFile(), "data").mkdirs();
// configure cluster
cfg = new Configuration();
cfg.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, hdfsHome.toAbsolutePath().toString());
cfg.set("hadoop.security.group.mapping", "org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback");
// optionally configure security
if (isSecure()) {
String kerberosPrincipal = principalConfig.get();
String keytabFilePath = keytab.get().toString();
cfg.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
cfg.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, "true");
cfg.set(DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY, kerberosPrincipal);
cfg.set(DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY, kerberosPrincipal);
cfg.set(DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY, kerberosPrincipal);
cfg.set(DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY, keytabFilePath);
cfg.set(DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY, keytabFilePath);
cfg.set(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, "true");
cfg.set(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, "true");
cfg.set(DFSConfigKeys.IGNORE_SECURE_PORTS_FOR_TESTING_KEY, "true");
cfg.set(DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY, "true");
}
refreshKrb5Config();
UserGroupInformation.setConfiguration(cfg);
MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(cfg);
// if(isSecure()) {
builder.nameNodePort(explicitPort);
// } else {
// builder.nameNodePort(explicitPort);
// }
if (isHA()) {
MiniDFSNNTopology.NNConf nn1 = new MiniDFSNNTopology.NNConf("nn1").setIpcPort(0);
MiniDFSNNTopology.NNConf nn2 = new MiniDFSNNTopology.NNConf("nn2").setIpcPort(0);
MiniDFSNNTopology.NSConf nameservice = new MiniDFSNNTopology.NSConf(haNameService).addNN(nn1).addNN(nn2);
MiniDFSNNTopology namenodeTopology = new MiniDFSNNTopology().addNameservice(nameservice);
builder.nnTopology(namenodeTopology);
}
dfs = builder.build();
// Configure contents of the filesystem
org.apache.hadoop.fs.Path esUserPath = new org.apache.hadoop.fs.Path("/user/elasticsearch");
FileSystem fs;
if (isHA()) {
dfs.transitionToActive(0);
fs = HATestUtil.configureFailoverFs(dfs, cfg);
} else {
fs = dfs.getFileSystem(0);
}
try {
// Set the elasticsearch user directory up
fs.mkdirs(esUserPath);
if (UserGroupInformation.isSecurityEnabled()) {
List<AclEntry> acls = new ArrayList<>();
acls.add(new AclEntry.Builder().setType(AclEntryType.USER).setName("elasticsearch").setPermission(FsAction.ALL).build());
fs.modifyAclEntries(esUserPath, acls);
}
// Install a pre-existing repository into HDFS
String directoryName = "readonly-repository";
String archiveName = directoryName + ".tar.gz";
URL readOnlyRepositoryArchiveURL = getClass().getClassLoader().getResource(archiveName);
if (readOnlyRepositoryArchiveURL != null) {
Path tempDirectory = Files.createTempDirectory(getClass().getName());
File readOnlyRepositoryArchive = tempDirectory.resolve(archiveName).toFile();
FileUtils.copyURLToFile(readOnlyRepositoryArchiveURL, readOnlyRepositoryArchive);
FileUtil.unTar(readOnlyRepositoryArchive, tempDirectory.toFile());
fs.copyFromLocalFile(
true,
true,
new org.apache.hadoop.fs.Path(tempDirectory.resolve(directoryName).toAbsolutePath().toUri()),
esUserPath.suffix("/existing/" + directoryName)
);
FileUtils.deleteDirectory(tempDirectory.toFile());
}
} finally {
fs.close();
}
}
private boolean isSecure() {
return keytab != null && principalConfig != null;
}
@Override
protected void after() {
if (dfs != null) {
try {
if (isHA()) {
dfs.getFileSystem(0).close();
dfs.getFileSystem(1).close();
} else {
dfs.getFileSystem().close();
}
} catch (IOException e) {
throw new RuntimeException(e);
}
dfs.close();
}
temporaryFolder.delete();
}
private boolean isHA() {
return haNameService != null;
}
public int getPort() {
return dfs == null ? explicitPort : dfs.getNameNodePort(0);
}
// fix port handling to allow parallel hdfs fixture runs
public int getPort(int i) {
return dfs.getNameNodePort(i);
}
/**
* Wraps an HAServiceTarget, keeping track of any HAServiceProtocol proxies it generates in order
* to close them at the end of the test lifecycle.
*/
protected static class CloseableHAServiceTarget extends HAServiceTarget {
private final HAServiceTarget delegate;
private final List<HAServiceProtocol> protocolsToClose = new ArrayList<>();
CloseableHAServiceTarget(HAServiceTarget delegate) {
this.delegate = delegate;
}
@Override
public InetSocketAddress getAddress() {
return delegate.getAddress();
}
@Override
public InetSocketAddress getHealthMonitorAddress() {
return delegate.getHealthMonitorAddress();
}
@Override
public InetSocketAddress getZKFCAddress() {
return delegate.getZKFCAddress();
}
@Override
public NodeFencer getFencer() {
return delegate.getFencer();
}
@Override
public void checkFencingConfigured() throws BadFencingConfigurationException {
delegate.checkFencingConfigured();
}
@Override
public HAServiceProtocol getProxy(Configuration conf, int timeoutMs) throws IOException {
HAServiceProtocol proxy = delegate.getProxy(conf, timeoutMs);
protocolsToClose.add(proxy);
return proxy;
}
@Override
public HAServiceProtocol getHealthMonitorProxy(Configuration conf, int timeoutMs) throws IOException {
return delegate.getHealthMonitorProxy(conf, timeoutMs);
}
@Override
public ZKFCProtocol getZKFCProxy(Configuration conf, int timeoutMs) throws IOException {
return delegate.getZKFCProxy(conf, timeoutMs);
}
@Override
public boolean isAutoFailoverEnabled() {
return delegate.isAutoFailoverEnabled();
}
private void close() {
for (HAServiceProtocol protocol : protocolsToClose) {
if (protocol instanceof HAServiceProtocolClientSideTranslatorPB haServiceProtocolClientSideTranslatorPB) {
haServiceProtocolClientSideTranslatorPB.close();
}
}
}
}
/**
* The default HAAdmin tool does not throw exceptions on failures, and does not close any client connection
* resources when it concludes. This subclass overrides the tool to allow for exception throwing, and to
* keep track of and clean up connection resources.
*/
protected static class CloseableHAAdmin extends DFSHAAdmin {
private final List<CloseableHAServiceTarget> serviceTargets = new ArrayList<>();
@Override
protected HAServiceTarget resolveTarget(String nnId) {
CloseableHAServiceTarget target = new CloseableHAServiceTarget(super.resolveTarget(nnId));
serviceTargets.add(target);
return target;
}
@Override
public int run(String[] argv) throws Exception {
return runCmd(argv);
}
public int transitionToStandby(String namenodeID) throws Exception {
return run(new String[] { "-transitionToStandby", namenodeID });
}
public int transitionToActive(String namenodeID) throws Exception {
return run(new String[] { "-transitionToActive", namenodeID });
}
public void close() {
for (CloseableHAServiceTarget serviceTarget : serviceTargets) {
serviceTarget.close();
}
}
}
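/**
* Forces the JVM to re-read the Kerberos configuration referenced by java.security.krb5.conf.
* The config class is JDK-internal, so it is loaded and refreshed reflectively; IBM JDKs ship
* their own implementation under a different package.
*/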
@SuppressWarnings({ "unchecked", "rawtypes" })
public static void refreshKrb5Config() throws ClassNotFoundException, NoSuchMethodException, IllegalArgumentException,
IllegalAccessException, InvocationTargetException {
Class classRef;
if (System.getProperty("java.vendor").contains("IBM")) {
classRef = Class.forName("com.ibm.security.krb5.internal.Config");
} else {
classRef = Class.forName("sun.security.krb5.Config");
}
Method refreshMethod = classRef.getMethod("refresh");
refreshMethod.invoke(classRef);
}
private static int findAvailablePort() {
try (ServerSocket socket = new ServerSocket(0)) {
return socket.getLocalPort();
} catch (Exception ex) {
LOGGER.error("Failed to find available port", ex);
}
return -1;
}
}

View File

@ -1,13 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
apply plugin: 'elasticsearch.java'
dependencies {
api "org.apache.hadoop:hadoop-minicluster:2.8.5"
}

View File

@ -1,175 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
package hdfs;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclEntryType;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
import org.apache.hadoop.security.UserGroupInformation;
import java.io.File;
import java.lang.management.ManagementFactory;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
/**
* MiniHDFS test fixture. There is a CLI tool, but here we can
* easily properly setup logging, avoid parsing JSON, etc.
*/
public class MiniHDFS {
private static String PORT_FILE_NAME = "ports";
private static String PID_FILE_NAME = "pid";
public static void main(String[] args) throws Exception {
if (args.length != 1 && args.length != 3) {
throw new IllegalArgumentException(
"Expected: MiniHDFS <baseDirectory> [<kerberosPrincipal> <kerberosKeytab>], got: " + Arrays.toString(args)
);
}
boolean secure = args.length == 3;
// configure Paths
Path baseDir = Paths.get(args[0]);
// hadoop-home/, so logs will not complain
if (System.getenv("HADOOP_HOME") == null) {
Path hadoopHome = baseDir.resolve("hadoop-home");
Files.createDirectories(hadoopHome);
System.setProperty("hadoop.home.dir", hadoopHome.toAbsolutePath().toString());
}
// hdfs-data/, where any data is going
Path hdfsHome = baseDir.resolve("hdfs-data");
// configure cluster
Configuration cfg = new Configuration();
cfg.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, hdfsHome.toAbsolutePath().toString());
// lower default permission: TODO: needed?
cfg.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_PERMISSION_KEY, "766");
// optionally configure security
if (secure) {
String kerberosPrincipal = args[1];
String keytabFile = args[2];
cfg.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
cfg.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, "true");
cfg.set(DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY, kerberosPrincipal);
cfg.set(DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY, kerberosPrincipal);
cfg.set(DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY, kerberosPrincipal);
cfg.set(DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY, keytabFile);
cfg.set(DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY, keytabFile);
cfg.set(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, "true");
cfg.set(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, "true");
cfg.set(DFSConfigKeys.IGNORE_SECURE_PORTS_FOR_TESTING_KEY, "true");
cfg.set(DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY, "true");
}
UserGroupInformation.setConfiguration(cfg);
MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(cfg);
String explicitPort = System.getProperty("hdfs.config.port");
if (explicitPort != null) {
builder.nameNodePort(Integer.parseInt(explicitPort));
} else {
if (secure) {
builder.nameNodePort(9998);
} else {
builder.nameNodePort(9999);
}
}
// Configure HA mode
String haNameService = System.getProperty("ha-nameservice");
boolean haEnabled = haNameService != null;
if (haEnabled) {
MiniDFSNNTopology.NNConf nn1 = new MiniDFSNNTopology.NNConf("nn1").setIpcPort(0);
MiniDFSNNTopology.NNConf nn2 = new MiniDFSNNTopology.NNConf("nn2").setIpcPort(0);
MiniDFSNNTopology.NSConf nameservice = new MiniDFSNNTopology.NSConf(haNameService).addNN(nn1).addNN(nn2);
MiniDFSNNTopology namenodeTopology = new MiniDFSNNTopology().addNameservice(nameservice);
builder.nnTopology(namenodeTopology);
}
MiniDFSCluster dfs = builder.build();
// Configure contents of the filesystem
org.apache.hadoop.fs.Path esUserPath = new org.apache.hadoop.fs.Path("/user/elasticsearch");
FileSystem fs;
if (haEnabled) {
dfs.transitionToActive(0);
fs = HATestUtil.configureFailoverFs(dfs, cfg);
} else {
fs = dfs.getFileSystem();
}
try {
// Set the elasticsearch user directory up
fs.mkdirs(esUserPath);
if (UserGroupInformation.isSecurityEnabled()) {
List<AclEntry> acls = new ArrayList<>();
acls.add(new AclEntry.Builder().setType(AclEntryType.USER).setName("elasticsearch").setPermission(FsAction.ALL).build());
fs.modifyAclEntries(esUserPath, acls);
}
// Install a pre-existing repository into HDFS
String directoryName = "readonly-repository";
String archiveName = directoryName + ".tar.gz";
URL readOnlyRepositoryArchiveURL = MiniHDFS.class.getClassLoader().getResource(archiveName);
if (readOnlyRepositoryArchiveURL != null) {
Path tempDirectory = Files.createTempDirectory(MiniHDFS.class.getName());
File readOnlyRepositoryArchive = tempDirectory.resolve(archiveName).toFile();
FileUtils.copyURLToFile(readOnlyRepositoryArchiveURL, readOnlyRepositoryArchive);
FileUtil.unTar(readOnlyRepositoryArchive, tempDirectory.toFile());
fs.copyFromLocalFile(
true,
true,
new org.apache.hadoop.fs.Path(tempDirectory.resolve(directoryName).toAbsolutePath().toUri()),
esUserPath.suffix("/existing/" + directoryName)
);
FileUtils.deleteDirectory(tempDirectory.toFile());
}
} finally {
fs.close();
}
// write our PID file
Path tmp = Files.createTempFile(baseDir, null, null);
String pid = ManagementFactory.getRuntimeMXBean().getName().split("@")[0];
Files.write(tmp, pid.getBytes(StandardCharsets.UTF_8));
Files.move(tmp, baseDir.resolve(PID_FILE_NAME), StandardCopyOption.ATOMIC_MOVE);
// write our port file
String portFileContent = Integer.toString(dfs.getNameNodePort(0));
if (haEnabled) {
portFileContent = portFileContent + "\n" + Integer.toString(dfs.getNameNodePort(1));
}
tmp = Files.createTempFile(baseDir, null, null);
Files.write(tmp, portFileContent.getBytes(StandardCharsets.UTF_8));
Files.move(tmp, baseDir.resolve(PORT_FILE_NAME), StandardCopyOption.ATOMIC_MOVE);
}
}

View File

@ -1,13 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
apply plugin: 'elasticsearch.java'
dependencies {
api "org.apache.hadoop:hadoop-minicluster:3.3.1"
}

View File

@ -1,176 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
package hdfs;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclEntryType;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
import org.apache.hadoop.security.UserGroupInformation;
import java.io.File;
import java.lang.management.ManagementFactory;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
/**
* MiniHDFS test fixture. There is a CLI tool, but here we can
* easily properly setup logging, avoid parsing JSON, etc.
*/
public class MiniHDFS {
private static String PORT_FILE_NAME = "ports";
private static String PID_FILE_NAME = "pid";
public static void main(String[] args) throws Exception {
if (args.length != 1 && args.length != 3) {
throw new IllegalArgumentException(
"Expected: MiniHDFS <baseDirectory> [<kerberosPrincipal> <kerberosKeytab>], got: " + Arrays.toString(args)
);
}
boolean secure = args.length == 3;
// configure Paths
Path baseDir = Paths.get(args[0]);
// hadoop-home/, so logs will not complain
if (System.getenv("HADOOP_HOME") == null) {
Path hadoopHome = baseDir.resolve("hadoop-home");
Files.createDirectories(hadoopHome);
System.setProperty("hadoop.home.dir", hadoopHome.toAbsolutePath().toString());
}
// hdfs-data/, where any data is going
Path hdfsHome = baseDir.resolve("hdfs-data");
// configure cluster
Configuration cfg = new Configuration();
cfg.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, hdfsHome.toAbsolutePath().toString());
// lower default permission: TODO: needed?
cfg.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_PERMISSION_KEY, "766");
// optionally configure security
if (secure) {
String kerberosPrincipal = args[1];
String keytabFile = args[2];
cfg.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
cfg.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, "true");
cfg.set(DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY, kerberosPrincipal);
cfg.set(DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY, kerberosPrincipal);
cfg.set(DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY, kerberosPrincipal);
cfg.set(DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY, keytabFile);
cfg.set(DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY, keytabFile);
cfg.set(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, "true");
cfg.set(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, "true");
cfg.set(DFSConfigKeys.IGNORE_SECURE_PORTS_FOR_TESTING_KEY, "true");
cfg.set(DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY, "true");
cfg.set(DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY, "AES/CTR/NoPadding");
}
UserGroupInformation.setConfiguration(cfg);
MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(cfg);
String explicitPort = System.getProperty("hdfs.config.port");
if (explicitPort != null) {
builder.nameNodePort(Integer.parseInt(explicitPort));
} else {
if (secure) {
builder.nameNodePort(9998);
} else {
builder.nameNodePort(9999);
}
}
// Configure HA mode
String haNameService = System.getProperty("ha-nameservice");
boolean haEnabled = haNameService != null;
if (haEnabled) {
MiniDFSNNTopology.NNConf nn1 = new MiniDFSNNTopology.NNConf("nn1").setIpcPort(0);
MiniDFSNNTopology.NNConf nn2 = new MiniDFSNNTopology.NNConf("nn2").setIpcPort(0);
MiniDFSNNTopology.NSConf nameservice = new MiniDFSNNTopology.NSConf(haNameService).addNN(nn1).addNN(nn2);
MiniDFSNNTopology namenodeTopology = new MiniDFSNNTopology().addNameservice(nameservice);
builder.nnTopology(namenodeTopology);
}
MiniDFSCluster dfs = builder.build();
// Configure contents of the filesystem
org.apache.hadoop.fs.Path esUserPath = new org.apache.hadoop.fs.Path("/user/elasticsearch");
FileSystem fs;
if (haEnabled) {
dfs.transitionToActive(0);
fs = HATestUtil.configureFailoverFs(dfs, cfg);
} else {
fs = dfs.getFileSystem();
}
try {
// Set the elasticsearch user directory up
fs.mkdirs(esUserPath);
if (UserGroupInformation.isSecurityEnabled()) {
List<AclEntry> acls = new ArrayList<>();
acls.add(new AclEntry.Builder().setType(AclEntryType.USER).setName("elasticsearch").setPermission(FsAction.ALL).build());
fs.modifyAclEntries(esUserPath, acls);
}
// Install a pre-existing repository into HDFS
String directoryName = "readonly-repository";
String archiveName = directoryName + ".tar.gz";
URL readOnlyRepositoryArchiveURL = MiniHDFS.class.getClassLoader().getResource(archiveName);
if (readOnlyRepositoryArchiveURL != null) {
Path tempDirectory = Files.createTempDirectory(MiniHDFS.class.getName());
File readOnlyRepositoryArchive = tempDirectory.resolve(archiveName).toFile();
FileUtils.copyURLToFile(readOnlyRepositoryArchiveURL, readOnlyRepositoryArchive);
FileUtil.unTar(readOnlyRepositoryArchive, tempDirectory.toFile());
fs.copyFromLocalFile(
true,
true,
new org.apache.hadoop.fs.Path(tempDirectory.resolve(directoryName).toAbsolutePath().toUri()),
esUserPath.suffix("/existing/" + directoryName)
);
FileUtils.deleteDirectory(tempDirectory.toFile());
}
} finally {
fs.close();
}
// write our PID file
Path tmp = Files.createTempFile(baseDir, null, null);
String pid = ManagementFactory.getRuntimeMXBean().getName().split("@")[0];
Files.write(tmp, pid.getBytes(StandardCharsets.UTF_8));
Files.move(tmp, baseDir.resolve(PID_FILE_NAME), StandardCopyOption.ATOMIC_MOVE);
// write our port file
String portFileContent = Integer.toString(dfs.getNameNodePort(0));
if (haEnabled) {
portFileContent = portFileContent + "\n" + Integer.toString(dfs.getNameNodePort(1));
}
tmp = Files.createTempFile(baseDir, null, null);
Files.write(tmp, portFileContent.getBytes(StandardCharsets.UTF_8));
Files.move(tmp, baseDir.resolve(PORT_FILE_NAME), StandardCopyOption.ATOMIC_MOVE);
}
}

View File

@ -8,37 +8,22 @@ import org.gradle.api.services.internal.BuildServiceRegistryInternal
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
apply plugin: 'elasticsearch.test.fixtures'
apply plugin: 'elasticsearch.java'
apply plugin: 'elasticsearch.cache-test-fixtures'
apply plugin: 'elasticsearch.deploy-test-fixtures'
List<String> services = ["peppa", "hdfs"]
tasks.named("preProcessFixture").configure {
doLast {
// We need to create these up-front because if docker creates them they will be owned by root and we won't be
// able to clean them up
services.each { fixturesDir.dir("shared/${it}").get().getAsFile().mkdirs() }
dockerFixtures {
krb5dc {
dockerContext = projectDir
version = "1.0"
baseImages = ["ubuntu:14.04"]
}
}
tasks.named("postProcessFixture").configure { task ->
inputs.dir(fixturesDir.dir('shared').get().getAsFile())
services.each { service ->
File confTemplate = fixturesDir.file("shared/${service}/krb5.conf.template").get().asFile
File confFile = fixturesDir.file("shared/${service}/krb5.conf").get().asFile
outputs.file(confFile)
doLast {
assert confTemplate.exists()
String confContents = confTemplate.text
.replace("\${MAPPED_PORT}", "${ext."test.fixtures.${service}.udp.88"}")
confFile.text = confContents
}
}
}
project.ext.krb5Conf = { s -> file("$testFixturesDir/shared/${s}/krb5.conf") }
project.ext.krb5Keytabs = { s, fileName -> file("$testFixturesDir/shared/${s}/keytabs/${fileName}") }
configurations {
all {
transitive = false
}
krb5ConfHdfsFile {
canBeConsumed = true
canBeResolved = false
@ -49,11 +34,24 @@ configurations {
}
}
artifacts {
krb5ConfHdfsFile(krb5Conf('hdfs')) {
builtBy("postProcessFixture")
}
krb5KeytabsHdfsDir(file("$testFixturesDir/shared/hdfs/keytabs/")) {
builtBy("postProcessFixture")
}
dependencies {
testImplementation project(':test:framework')
api "junit:junit:${versions.junit}"
api project(':test:fixtures:testcontainer-utils')
api "org.testcontainers:testcontainers:${versions.testcontainer}"
implementation "com.carrotsearch.randomizedtesting:randomizedtesting-runner:${versions.randomizedrunner}"
implementation "org.slf4j:slf4j-api:${versions.slf4j}"
implementation "com.github.docker-java:docker-java-api:${versions.dockerJava}"
implementation "com.fasterxml.jackson.core:jackson-annotations:${versions.jackson}"
runtimeOnly "com.github.docker-java:docker-java-transport-zerodep:${versions.dockerJava}"
runtimeOnly "com.github.docker-java:docker-java-transport:${versions.dockerJava}"
runtimeOnly "com.github.docker-java:docker-java-core:${versions.dockerJava}"
runtimeOnly "org.apache.commons:commons-compress:${versions.commonsCompress}"
runtimeOnly "org.rnorth.duct-tape:duct-tape:${versions.ductTape}"
// ensure we have proper logging when used in tests
runtimeOnly "org.slf4j:slf4j-simple:${versions.slf4j}"
runtimeOnly "org.hamcrest:hamcrest:${versions.hamcrest}"
}

View File

@ -1,32 +0,0 @@
version: '3'
services:
peppa:
hostname: kerberos.build.elastic.co
build:
context: .
dockerfile: Dockerfile
extra_hosts:
- "kerberos.build.elastic.co:127.0.0.1"
command: "bash /fixture/src/main/resources/provision/peppa.sh"
volumes:
- ./testfixtures_shared/shared/peppa:/fixture/build
# containers have bad entropy so mount /dev/urandom. Less secure but this is a test fixture.
- /dev/urandom:/dev/random
ports:
- "4444"
- "88/udp"
hdfs:
hostname: kerberos.build.elastic.co
build:
context: .
dockerfile: Dockerfile
extra_hosts:
- "kerberos.build.elastic.co:127.0.0.1"
command: "bash /fixture/src/main/resources/provision/hdfs.sh"
volumes:
- ./testfixtures_shared/shared/hdfs:/fixture/build
# containers have bad entropy so mount /dev/urandom. Less secure but this is a test fixture.
- /dev/urandom:/dev/random
ports:
- "4444"
- "88/udp"

View File

@ -0,0 +1,172 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
package org.elasticsearch.test.fixtures.krb5kdc;
import com.github.dockerjava.api.model.ExposedPort;
import com.github.dockerjava.api.model.Ports;
import org.elasticsearch.test.fixtures.testcontainers.DockerEnvironmentAwareTestContainer;
import org.junit.rules.TemporaryFolder;
import org.testcontainers.containers.Network;
import org.testcontainers.images.RemoteDockerImage;
import org.testcontainers.shaded.org.apache.commons.io.IOUtils;
import org.testcontainers.utility.MountableFile;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
public final class Krb5kDcContainer extends DockerEnvironmentAwareTestContainer {
public static final String DOCKER_BASE_IMAGE = "docker.elastic.co/elasticsearch-dev/krb5dc-fixture:1.0";
private final TemporaryFolder temporaryFolder = new TemporaryFolder();
private final ProvisioningId provisioningId;
private Path krb5ConfFile;
private Path keytabFile;
private Path esKeytabFile;
public enum ProvisioningId {
HDFS(
"hdfs",
"/fixture/src/main/resources/provision/hdfs.sh",
"/fixture/build/keytabs/hdfs_hdfs.build.elastic.co.keytab",
"/fixture/build/keytabs/elasticsearch.keytab",
"hdfs/hdfs.build.elastic.co@BUILD.ELASTIC.CO"
),
PEPPA(
"peppa",
"/fixture/src/main/resources/provision/peppa.sh",
"/fixture/build/keytabs/peppa.keytab",
"/fixture/build/keytabs/HTTP_localhost.keytab",
"peppa@BUILD.ELASTIC.CO"
);
private final String id;
private final String scriptPath;
private final String keytabPath;
public final String esKeytab;
private final String keytabPrincipal;
ProvisioningId(String id, String scriptPath, String keytabPath, String esKeytab, String principal) {
this.id = id;
this.scriptPath = scriptPath;
this.keytabPath = keytabPath;
this.esKeytab = esKeytab;
this.keytabPrincipal = principal;
}
}
public Krb5kDcContainer() {
this(ProvisioningId.HDFS);
}
public Krb5kDcContainer(ProvisioningId provisioningId) {
super(new RemoteDockerImage(DOCKER_BASE_IMAGE));
this.provisioningId = provisioningId;
withNetwork(Network.newNetwork());
addExposedPorts(88, 4444);
withCreateContainerCmdModifier(cmd -> {
// Add previously exposed ports and UDP port
List<ExposedPort> exposedPorts = new ArrayList<>();
for (ExposedPort p : cmd.getExposedPorts()) {
exposedPorts.add(p);
}
exposedPorts.add(ExposedPort.udp(88));
cmd.withExposedPorts(exposedPorts);
// Add previous port bindings and UDP port binding
Ports ports = cmd.getPortBindings();
ports.bind(ExposedPort.udp(88), Ports.Binding.empty());
cmd.withPortBindings(ports);
});
withNetworkAliases("kerberos.build.elastic.co", "build.elastic.co");
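// containers have poor entropy, so map /dev/urandom over /dev/random; less secure, but this is only a test fixture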
withCopyFileToContainer(MountableFile.forHostPath("/dev/urandom"), "/dev/random");
withExtraHost("kerberos.build.elastic.co", "127.0.0.1");
withCommand("bash", provisioningId.scriptPath);
}
@Override
public void start() {
try {
temporaryFolder.create();
} catch (IOException e) {
throw new RuntimeException(e);
}
super.start();
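// point the JVM at the krb5.conf derived from the container so Kerberos clients in this test JVM use this KDC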
System.setProperty("java.security.krb5.conf", getConfPath().toString());
}
@Override
public void stop() {
super.stop();
System.clearProperty("java.security.krb5.conf");
temporaryFolder.delete();
}
@SuppressWarnings("all")
public String getConf() {
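// read the krb5.conf template from the container and substitute the host-mapped UDP port of the KDC (container port 88/udp)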
var bindings = Arrays.asList(getCurrentContainerInfo().getNetworkSettings().getPorts().getBindings().get(ExposedPort.udp(88)))
.stream()
.findFirst();
String hostPortSpec = bindings.get().getHostPortSpec();
String s = copyFileFromContainer("/fixture/build/krb5.conf.template", i -> IOUtils.toString(i, StandardCharsets.UTF_8));
return s.replace("${MAPPED_PORT}", hostPortSpec);
}
public Path getKeytab() {
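// lazily copy the provisioned service keytab out of the container into the temporary folder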
if (keytabFile != null) {
return keytabFile;
}
try {
String keytabPath = provisioningId.keytabPath;
keytabFile = temporaryFolder.newFile(provisioningId.id + ".keytab").toPath();
copyFileFromContainer(keytabPath, keytabFile.toAbsolutePath().toString());
return keytabFile;
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public Path getEsKeytab() {
if (esKeytabFile != null) {
return esKeytabFile;
}
try {
esKeytabFile = temporaryFolder.newFile("elasticsearch.keytab").toPath();
copyFileFromContainer(provisioningId.esKeytab, esKeytabFile.toAbsolutePath().toString());
return esKeytabFile;
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public Path getConfPath() {
if (krb5ConfFile != null) {
return krb5ConfFile;
}
try {
krb5ConfFile = temporaryFolder.newFile("krb5.conf").toPath();
Files.write(krb5ConfFile, getConf().getBytes(StandardCharsets.UTF_8));
return krb5ConfFile;
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public String getPrincipal() {
return provisioningId.keytabPrincipal;
}
public String getEsPrincipal() {
return "elasticsearch@BUILD.ELASTIC.CO";
}
}

View File

@ -64,10 +64,16 @@ public abstract class DockerEnvironmentAwareTestContainer extends GenericContain
public void start() {
Assume.assumeFalse("Docker support excluded on OS", EXCLUDED_OS);
Assume.assumeTrue("Docker probing succesful", DOCKER_PROBING_SUCCESSFUL);
withLogConsumer(new Slf4jLogConsumer(logger()));
withLogConsumer(new Slf4jLogConsumer(LOGGER));
super.start();
}
@Override
public void stop() {
LOGGER.info("Stopping container {}", getContainerId());
super.stop();
}
@Override
public void cache() {
try {

View File

@ -14,18 +14,19 @@ import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.function.Supplier;
public class FileResource implements Resource {
private final Path file;
private final Supplier<Path> file;
FileResource(Path file) {
FileResource(Supplier<Path> file) {
this.file = file;
}
@Override
public InputStream asStream() {
try {
return Files.newInputStream(file, StandardOpenOption.READ);
return Files.newInputStream(file.get(), StandardOpenOption.READ);
} catch (IOException e) {
throw new UncheckedIOException(e);
}

View File

@ -32,6 +32,10 @@ public interface Resource {
}
static Resource fromFile(Path file) {
return fromFile(() -> file);
}
static Resource fromFile(Supplier<Path> file) {
return new FileResource(file);
}

View File

@ -278,11 +278,18 @@ public abstract class ESClientYamlSuiteTestCase extends ESRestTestCase {
return createParameters(executeableSectionRegistry, null);
}
/**
* Create parameters for this parameterized test.
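* <p>For example (illustrative names), a suite backed by an external fixture might call
* {@code createParameters(new String[] { "secure_hdfs_repository" }, Map.of("hdfs_port", fixture.getPort()))}
* so that {@code @hdfs_port@} placeholders in the yaml specs are substituted at parse time.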
*/
public static Iterable<Object[]> createParameters(String[] testPaths, Map<String, Object> yamlParameters) throws Exception {
return createParameters(ExecutableSection.XCONTENT_REGISTRY, testPaths, yamlParameters);
}
/**
* Create parameters for this parameterized test.
*/
public static Iterable<Object[]> createParameters(String[] testPaths) throws Exception {
return createParameters(ExecutableSection.XCONTENT_REGISTRY, testPaths);
return createParameters(testPaths, Collections.emptyMap());
}
/**
@ -295,6 +302,23 @@ public abstract class ESClientYamlSuiteTestCase extends ESRestTestCase {
*/
public static Iterable<Object[]> createParameters(NamedXContentRegistry executeableSectionRegistry, String[] testPaths)
throws Exception {
return createParameters(executeableSectionRegistry, testPaths, Collections.emptyMap());
}
/**
* Create parameters for this parameterized test.
*
* @param executeableSectionRegistry registry of executable sections
* @param testPaths list of paths to explicitly search for tests. If <code>null</code> then include all tests in root path.
* @param yamlParameters map of parameters used within the yaml specs to be replaced at parsing time.
* @return list of test candidates.
* @throws Exception
*/
public static Iterable<Object[]> createParameters(
NamedXContentRegistry executeableSectionRegistry,
String[] testPaths,
Map<String, ?> yamlParameters
) throws Exception {
if (testPaths != null && System.getProperty(REST_TESTS_SUITE) != null) {
throw new IllegalArgumentException("The '" + REST_TESTS_SUITE + "' system property is not supported with explicit test paths.");
}
@ -308,7 +332,7 @@ public abstract class ESClientYamlSuiteTestCase extends ESRestTestCase {
for (String api : yamlSuites.keySet()) {
List<Path> yamlFiles = new ArrayList<>(yamlSuites.get(api));
for (Path yamlFile : yamlFiles) {
ClientYamlTestSuite suite = ClientYamlTestSuite.parse(executeableSectionRegistry, api, yamlFile);
ClientYamlTestSuite suite = ClientYamlTestSuite.parse(executeableSectionRegistry, api, yamlFile, yamlParameters);
suites.add(suite);
try {
suite.validate();

View File

@ -0,0 +1,295 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
package org.elasticsearch.test.rest.yaml;
import org.elasticsearch.core.CheckedFunction;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.RestApiVersion;
import org.elasticsearch.xcontent.DeprecationHandler;
import org.elasticsearch.xcontent.NamedXContentRegistry;
import org.elasticsearch.xcontent.XContentLocation;
import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.xcontent.XContentType;
import java.io.IOException;
import java.nio.CharBuffer;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import java.util.stream.Collectors;
/**
* A wrapper around YamlXContentParser that allows for parameter replacement in the yaml file.
* String values containing placeholders of the form {@code @key@} are replaced with the corresponding
* entry from the supplied parameter map at parse time.
*/
public class ParameterizableYamlXContentParser implements XContentParser {
private final XContentParser delegate;
private final Map<String, ?> params;
public ParameterizableYamlXContentParser(XContentParser delegate, Map<String, ?> params) {
this.delegate = delegate;
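// wrap each key as @key@ so that only explicit placeholders in the yaml are replaced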
this.params = params.entrySet().stream().collect(Collectors.toMap(e -> "@" + e.getKey() + "@", Map.Entry::getValue));
}
@Override
public XContentType contentType() {
return delegate.contentType();
}
@Override
public void allowDuplicateKeys(boolean allowDuplicateKeys) {
delegate.allowDuplicateKeys(allowDuplicateKeys);
}
@Override
public Token nextToken() throws IOException {
return delegate.nextToken();
}
@Override
@Nullable
public String nextFieldName() throws IOException {
return delegate.nextFieldName();
}
@Override
public void skipChildren() throws IOException {
delegate.skipChildren();
}
@Override
public Token currentToken() {
return delegate.currentToken();
}
@Override
public String currentName() throws IOException {
return delegate.currentName();
}
@Override
public Map<String, Object> map() throws IOException {
return delegate.map();
}
@Override
public Map<String, Object> mapOrdered() throws IOException {
return visitMapForParameterReplacements(delegate.mapOrdered());
}
private Map<String, Object> visitMapForParameterReplacements(Map<String, Object> stringObjectMap) {
var updatedMap = stringObjectMap.entrySet()
.stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> maybeReplaceParams(e.getValue())));
return updatedMap;
}
@SuppressWarnings("unchecked")
private Object maybeReplaceParams(Object inputValue) {
if (inputValue == null) {
return null;
}
if (inputValue instanceof Map) {
return visitMapForParameterReplacements((Map<String, Object>) inputValue);
}
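// strings are only rewritten when they contain at least one @...@ placeholder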
if (inputValue instanceof String) {
if (((String) inputValue).matches(".*@.*@.*")) {
String value = (String) inputValue;
for (String s : params.keySet()) {
if (value.contains(s)) {
value = value.replace(s, params.get(s).toString());
}
}
return value;
}
}
return inputValue;
}
@Override
public Map<String, String> mapStrings() throws IOException {
return delegate.mapStrings();
}
@Override
public <T> Map<String, T> map(Supplier<Map<String, T>> mapFactory, CheckedFunction<XContentParser, T, IOException> mapValueParser)
throws IOException {
return delegate.map(mapFactory, mapValueParser);
}
@Override
public List<Object> list() throws IOException {
return delegate.list();
}
@Override
public List<Object> listOrderedMap() throws IOException {
return delegate.listOrderedMap();
}
@Override
public String text() throws IOException {
return delegate.text();
}
@Override
public String textOrNull() throws IOException {
return delegate.textOrNull();
}
@Override
public CharBuffer charBufferOrNull() throws IOException {
return delegate.charBufferOrNull();
}
@Override
public CharBuffer charBuffer() throws IOException {
return delegate.charBuffer();
}
@Override
public Object objectText() throws IOException {
return delegate.objectText();
}
@Override
public Object objectBytes() throws IOException {
return delegate.objectBytes();
}
@Override
public boolean hasTextCharacters() {
return delegate.hasTextCharacters();
}
@Override
public char[] textCharacters() throws IOException {
return delegate.textCharacters();
}
@Override
public int textLength() throws IOException {
return delegate.textLength();
}
@Override
public int textOffset() throws IOException {
return delegate.textOffset();
}
@Override
public Number numberValue() throws IOException {
return delegate.numberValue();
}
@Override
public NumberType numberType() throws IOException {
return delegate.numberType();
}
@Override
public short shortValue(boolean coerce) throws IOException {
return delegate.shortValue(coerce);
}
@Override
public int intValue(boolean coerce) throws IOException {
return delegate.intValue(coerce);
}
@Override
public long longValue(boolean coerce) throws IOException {
return delegate.longValue(coerce);
}
@Override
public float floatValue(boolean coerce) throws IOException {
return delegate.floatValue(coerce);
}
@Override
public double doubleValue(boolean coerce) throws IOException {
return delegate.doubleValue(coerce);
}
@Override
public short shortValue() throws IOException {
return delegate.shortValue();
}
@Override
public int intValue() throws IOException {
return delegate.intValue();
}
@Override
public long longValue() throws IOException {
return delegate.longValue();
}
@Override
public float floatValue() throws IOException {
return delegate.floatValue();
}
@Override
public double doubleValue() throws IOException {
return delegate.doubleValue();
}
@Override
public boolean isBooleanValue() throws IOException {
return delegate.isBooleanValue();
}
@Override
public boolean booleanValue() throws IOException {
return delegate.booleanValue();
}
@Override
public byte[] binaryValue() throws IOException {
return delegate.binaryValue();
}
@Override
public XContentLocation getTokenLocation() {
return delegate.getTokenLocation();
}
@Override
public <T> T namedObject(Class<T> categoryClass, String name, Object context) throws IOException {
return getXContentRegistry().parseNamedObject(categoryClass, name, this, context);
}
@Override
public NamedXContentRegistry getXContentRegistry() {
return delegate.getXContentRegistry();
}
@Override
public boolean isClosed() {
return delegate.isClosed();
}
@Override
public RestApiVersion getRestApiVersion() {
return delegate.getRestApiVersion();
}
@Override
public DeprecationHandler getDeprecationHandler() {
return delegate.getDeprecationHandler();
}
@Override
public void close() throws IOException {
delegate.close();
}
}

View File

@ -10,6 +10,7 @@ package org.elasticsearch.test.rest.yaml.section;
import org.elasticsearch.client.NodeSelector;
import org.elasticsearch.common.ParsingException;
import org.elasticsearch.common.io.Channels;
import org.elasticsearch.test.rest.yaml.ParameterizableYamlXContentParser;
import org.elasticsearch.xcontent.NamedXContentRegistry;
import org.elasticsearch.xcontent.XContentParseException;
import org.elasticsearch.xcontent.XContentParser;
@ -26,6 +27,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
@ -38,7 +40,8 @@ import java.util.stream.Stream;
* Supports a setup section and multiple test sections.
*/
public class ClientYamlTestSuite {
public static ClientYamlTestSuite parse(NamedXContentRegistry executeableSectionRegistry, String api, Path file) throws IOException {
public static ClientYamlTestSuite parse(NamedXContentRegistry executeableSectionRegistry, String api, Path file, Map<String, ?> params)
throws IOException {
if (Files.isRegularFile(file) == false) {
throw new IllegalArgumentException(file.toAbsolutePath() + " is not a file");
}
@ -63,10 +66,18 @@ public class ClientYamlTestSuite {
}
try (
XContentParser parser = YamlXContent.yamlXContent.createParser(
XContentParser parser = params.isEmpty()
? YamlXContent.yamlXContent.createParser(
XContentParserConfiguration.EMPTY.withRegistry(executeableSectionRegistry),
Files.newInputStream(file)
)
: new ParameterizableYamlXContentParser(
YamlXContent.yamlXContent.createParser(
XContentParserConfiguration.EMPTY.withRegistry(executeableSectionRegistry),
Files.newInputStream(file)
),
params
)
) {
return parse(api, filename, Optional.of(file), parser);
} catch (Exception e) {
@ -103,6 +114,10 @@ public class ClientYamlTestSuite {
return new ClientYamlTestSuite(api, suiteName, file, setupSection, teardownSection, new ArrayList<>(testSections));
}
public static ClientYamlTestSuite parse(NamedXContentRegistry xcontentRegistry, String api, Path filePath) throws IOException {
return parse(xcontentRegistry, api, filePath, Collections.emptyMap());
}
private final String api;
private final String name;
private final Optional<Path> file;

View File

@ -5,28 +5,17 @@
* 2.0.
*/
import org.elasticsearch.gradle.OS
import org.elasticsearch.gradle.internal.info.BuildParams
import org.elasticsearch.gradle.internal.test.RestIntegTestTask
import org.elasticsearch.gradle.internal.util.ports.ReservedPortRange
import java.nio.file.Files
import java.nio.file.Paths
import static org.elasticsearch.gradle.PropertyNormalization.IGNORE_VALUE
apply plugin: 'elasticsearch.test.fixtures'
apply plugin: 'elasticsearch.legacy-java-rest-test'
apply plugin: 'elasticsearch.internal-java-rest-test'
apply plugin: 'elasticsearch.rest-resources'
apply plugin: 'elasticsearch.internal-available-ports'
final Project hdfsFixtureProject = project(':test:fixtures:hdfs2-fixture')
final Project krbFixtureProject = project(':test:fixtures:krb5kdc-fixture')
final Project hdfsRepoPluginProject = project(':plugins:repository-hdfs')
dependencies {
clusterPlugins project(':plugins:repository-hdfs')
javaRestTestImplementation(testArtifact(project(xpackModule('searchable-snapshots'))))
javaRestTestImplementation hdfsRepoPluginProject
javaRestTestImplementation project(path: ':test:fixtures:hdfs-fixture', configuration:"shadowedHdfs2")
javaRestTestImplementation project(':test:fixtures:krb5kdc-fixture')
javaRestTestRuntimeOnly "com.google.guava:guava:16.0.1"
javaRestTestRuntimeOnly "commons-cli:commons-cli:1.2"
}
restResources {
@ -35,152 +24,7 @@ restResources {
}
}
testFixtures.useFixture(krbFixtureProject.path, 'hdfs-snapshot')
configurations {
hdfsFixture
}
dependencies {
hdfsFixture hdfsFixtureProject
// Set the keytab files in the classpath so that we can access them from test code without the security manager freaking out.
if (isEclipse == false) {
javaRestTestRuntimeOnly files(krbFixtureProject.ext.krb5Keytabs("hdfs-snapshot", "hdfs_hdfs.build.elastic.co.keytab").parent){
builtBy ":test:fixtures:krb5kdc-fixture:preProcessFixture"
}
}
}
normalization {
runtimeClasspath {
// ignore generated keytab files for the purposes of build avoidance
ignore '*.keytab'
// ignore fixture ports file which is on the classpath primarily to pacify the security manager
ignore 'ports'
}
}
String realm = "BUILD.ELASTIC.CO"
String krb5conf = krbFixtureProject.ext.krb5Conf("hdfs")
// Create HDFS File System Testing Fixtures
for (String fixtureName : ['hdfsFixture', 'secureHdfsFixture']) {
project.tasks.register(fixtureName, org.elasticsearch.gradle.internal.test.AntFixture) {
dependsOn project.configurations.hdfsFixture, krbFixtureProject.tasks.postProcessFixture
executable = "${BuildParams.runtimeJavaHome}/bin/java"
env 'CLASSPATH', "${-> project.configurations.hdfsFixture.asPath}"
maxWaitInSeconds 60
BuildParams.withFipsEnabledOnly(it)
waitCondition = { fixture, ant ->
// the hdfs.MiniHDFS fixture writes the ports file when
// it's ready, so we can just wait for the file to exist
return fixture.portsFile.exists()
}
final List<String> miniHDFSArgs = []
// If it's a secure fixture, then depend on Kerberos Fixture and principals + add the krb5conf to the JVM options
if (name.equals('secureHdfsFixture')) {
miniHDFSArgs.addAll(["--add-exports", "java.security.jgss/sun.security.krb5=ALL-UNNAMED"])
miniHDFSArgs.add("-Djava.security.krb5.conf=${krb5conf}")
}
// configure port dynamically
def portRange = project.getExtensions().getByType(ReservedPortRange)
miniHDFSArgs.add("-Dhdfs.config.port=${portRange.getOrAllocate(name)}")
// Common options
miniHDFSArgs.add('hdfs.MiniHDFS')
miniHDFSArgs.add(baseDir)
// If it's a secure fixture, then set the principal name and keytab locations to use for auth.
if (name.equals('secureHdfsFixture')) {
miniHDFSArgs.add("hdfs/hdfs.build.elastic.co@${realm}")
miniHDFSArgs.add(project(':test:fixtures:krb5kdc-fixture').ext.krb5Keytabs("hdfs", "hdfs_hdfs.build.elastic.co.keytab"))
}
args miniHDFSArgs.toArray()
}
}
// Disable integration tests when running in FIPS mode
tasks.named("javaRestTest").configure {
description = "Runs rest tests against an elasticsearch cluster with HDFS."
def hdfsPort = project.getExtensions().getByType(ReservedPortRange).getOrAllocate("hdfsFixture")
systemProperty 'test.hdfs.uri', "hdfs://localhost:$hdfsPort"
nonInputProperties.systemProperty 'test.hdfs.path', '/user/elasticsearch/test/searchable_snapshots/simple'
BuildParams.withFipsEnabledOnly(it)
}
tasks.register("javaRestTestSecure", RestIntegTestTask) {
description = "Runs rest tests against an elasticsearch cluster with Secured HDFS."
def hdfsPort = project.getExtensions().getByType(ReservedPortRange).getOrAllocate("secureHdfsFixture")
nonInputProperties.systemProperty 'test.hdfs.uri', "hdfs://localhost:$hdfsPort"
nonInputProperties.systemProperty 'test.hdfs.path', '/user/elasticsearch/test/searchable_snapshots/secure'
nonInputProperties.systemProperty "test.krb5.principal.es", "elasticsearch@${realm}"
nonInputProperties.systemProperty "test.krb5.principal.hdfs", "hdfs/hdfs.build.elastic.co@${realm}"
nonInputProperties.systemProperty(
"test.krb5.keytab.hdfs",
project(':test:fixtures:krb5kdc-fixture').ext.krb5Keytabs("hdfs", "hdfs_hdfs.build.elastic.co.keytab")
)
testClassesDirs = sourceSets.javaRestTest.output.classesDirs
classpath = sourceSets.javaRestTest.runtimeClasspath
BuildParams.withFipsEnabledOnly(it)
}
tasks.named("check").configure { dependsOn("javaRestTestSecure") }
testClusters.configureEach {
testDistribution = 'DEFAULT'
plugin(hdfsRepoPluginProject.path)
setting 'xpack.license.self_generated.type', 'trial'
setting 'xpack.searchable.snapshot.shared_cache.size', '16MB'
setting 'xpack.searchable.snapshot.shared_cache.region_size', '256KB'
setting 'xpack.security.enabled', 'false'
}
testClusters.matching { it.name == "javaRestTestSecure" }.configureEach {
systemProperty "java.security.krb5.conf", krb5conf
jvmArgs "--add-exports", "java.security.jgss/sun.security.krb5=ALL-UNNAMED"
extraConfigFile(
"repository-hdfs/krb5.keytab",
file("${project(':test:fixtures:krb5kdc-fixture').ext.krb5Keytabs("hdfs", "elasticsearch.keytab")}"), IGNORE_VALUE
)
}
// Determine HDFS Fixture compatibility for the current build environment.
boolean fixtureSupported = false
if (OS.current() != OS.WINDOWS) {
// hdfs fixture will not start without hadoop native libraries on windows
String nativePath = System.getenv("HADOOP_HOME")
if (nativePath != null) {
java.nio.file.Path path = Paths.get(nativePath)
if (Files.isDirectory(path) &&
Files.exists(path.resolve("bin").resolve("winutils.exe")) &&
Files.exists(path.resolve("bin").resolve("hadoop.dll")) &&
Files.exists(path.resolve("bin").resolve("hdfs.dll"))) {
fixtureSupported = true
} else {
throw new IllegalStateException("HADOOP_HOME: ${path} is invalid, does not contain hadoop native libraries in \$HADOOP_HOME/bin")
}
}
} else {
fixtureSupported = true
}
boolean legalPath = rootProject.rootDir.toString().contains(" ") == false
if (legalPath == false) {
fixtureSupported = false
}
if (fixtureSupported) {
tasks.named("javaRestTest").configure {dependsOn "hdfsFixture" }
tasks.named("javaRestTestSecure").configure {dependsOn "secureHdfsFixture" }
} else {
tasks.named("javaRestTest").configure {enabled = false }
tasks.named("javaRestTestSecure").configure { enabled = false }
if (legalPath) {
logger.warn("hdfsFixture unsupported, please set HADOOP_HOME and put HADOOP_HOME\\bin in PATH")
} else {
logger.warn("hdfsFixture unsupported since there are spaces in the path: '" + rootProject.rootDir.toString() + "'")
}
usesDefaultDistribution()
jvmArgs '--add-exports', 'java.security.jgss/sun.security.krb5=ALL-UNNAMED'
}

View File

@ -7,13 +7,39 @@
package org.elasticsearch.xpack.searchablesnapshots.hdfs;
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.cluster.ElasticsearchCluster;
import org.elasticsearch.test.cluster.local.distribution.DistributionType;
import org.elasticsearch.test.fixtures.hdfs.HdfsClientThreadLeakFilter;
import org.elasticsearch.test.fixtures.hdfs.HdfsFixture;
import org.elasticsearch.xpack.searchablesnapshots.AbstractSearchableSnapshotsRestTestCase;
import org.junit.ClassRule;
import org.junit.rules.RuleChain;
import org.junit.rules.TestRule;
import static org.hamcrest.Matchers.blankOrNullString;
import static org.hamcrest.Matchers.not;
@ThreadLeakFilters(filters = { HdfsClientThreadLeakFilter.class })
public class HdfsSearchableSnapshotsIT extends AbstractSearchableSnapshotsRestTestCase {
public static HdfsFixture hdfsFixture = new HdfsFixture();
public static ElasticsearchCluster cluster = ElasticsearchCluster.local()
.distribution(DistributionType.DEFAULT)
.plugin("repository-hdfs")
.setting("xpack.searchable.snapshot.shared_cache.size", "16MB")
.setting("xpack.searchable.snapshot.shared_cache.region_size", "256KB")
.setting("xpack.license.self_generated.type", "trial")
.setting("xpack.security.enabled", "false")
.build();
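// the HDFS fixture must be up before the cluster starts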
@ClassRule
public static TestRule ruleChain = RuleChain.outerRule(hdfsFixture).around(cluster);
@Override
protected String getTestRestCluster() {
return cluster.getHttpAddresses();
}
@Override
protected String writeRepositoryType() {
return "hdfs";
@ -21,19 +47,9 @@ public class HdfsSearchableSnapshotsIT extends AbstractSearchableSnapshotsRestTe
@Override
protected Settings writeRepositorySettings() {
final String uri = System.getProperty("test.hdfs.uri");
assertThat(uri, not(blankOrNullString()));
final String path = System.getProperty("test.hdfs.path");
assertThat(path, not(blankOrNullString()));
// Optional based on type of test
final String principal = System.getProperty("test.krb5.principal.es");
final String uri = "hdfs://localhost:" + hdfsFixture.getPort();
final String path = "/user/elasticsearch/test/searchable_snapshots/simple";
Settings.Builder repositorySettings = Settings.builder().put("client", "searchable_snapshots").put("uri", uri).put("path", path);
if (principal != null) {
repositorySettings.put("security.principal", principal);
}
return repositorySettings.build();
}
}

View File

@ -0,0 +1,67 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.xpack.searchablesnapshots.hdfs;
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.cluster.ElasticsearchCluster;
import org.elasticsearch.test.cluster.local.distribution.DistributionType;
import org.elasticsearch.test.cluster.util.resource.Resource;
import org.elasticsearch.test.fixtures.hdfs.HdfsClientThreadLeakFilter;
import org.elasticsearch.test.fixtures.hdfs.HdfsFixture;
import org.elasticsearch.test.fixtures.krb5kdc.Krb5kDcContainer;
import org.elasticsearch.test.fixtures.testcontainers.TestContainersThreadFilter;
import org.elasticsearch.xpack.searchablesnapshots.AbstractSearchableSnapshotsRestTestCase;
import org.junit.ClassRule;
import org.junit.rules.RuleChain;
import org.junit.rules.TestRule;
@ThreadLeakFilters(filters = { HdfsClientThreadLeakFilter.class, TestContainersThreadFilter.class })
public class SecureHdfsSearchableSnapshotsIT extends AbstractSearchableSnapshotsRestTestCase {
public static Krb5kDcContainer krb5Fixture = new Krb5kDcContainer();
public static HdfsFixture hdfsFixture = new HdfsFixture().withKerberos(() -> krb5Fixture.getPrincipal(), () -> krb5Fixture.getKeytab());
public static ElasticsearchCluster cluster = ElasticsearchCluster.local()
.distribution(DistributionType.DEFAULT)
.plugin("repository-hdfs")
.setting("xpack.searchable.snapshot.shared_cache.size", "16MB")
.setting("xpack.searchable.snapshot.shared_cache.region_size", "256KB")
.setting("xpack.license.self_generated.type", "trial")
.setting("xpack.security.enabled", "false")
.systemProperty("java.security.krb5.conf", () -> krb5Fixture.getConfPath().toString())
.configFile("repository-hdfs/krb5.keytab", Resource.fromFile(() -> krb5Fixture.getEsKeytab()))
.build();
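// start the KDC first, then the HDFS fixture (which consumes its keytab), then the cluster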
@ClassRule
public static TestRule ruleChain = RuleChain.outerRule(krb5Fixture).around(hdfsFixture).around(cluster);
@Override
protected String getTestRestCluster() {
return cluster.getHttpAddresses();
}
@Override
protected String writeRepositoryType() {
return "hdfs";
}
@Override
protected Settings writeRepositorySettings() {
final String uri = "hdfs://localhost:" + hdfsFixture.getPort();
final String path = "/user/elasticsearch/test/searchable_snapshots/secure";
Settings.Builder repositorySettings = Settings.builder().put("client", "searchable_snapshots").put("uri", uri).put("path", path);
final String principal = "elasticsearch@BUILD.ELASTIC.CO";
repositorySettings.put("security.principal", principal);
return repositorySettings.build();
}
}

View File

@ -5,29 +5,19 @@
* 2.0.
*/
import org.elasticsearch.gradle.OS
import org.elasticsearch.gradle.internal.info.BuildParams
import org.elasticsearch.gradle.internal.test.RestIntegTestTask
import org.elasticsearch.gradle.internal.util.ports.ReservedPortRange
import java.nio.file.Files
import java.nio.file.Paths
import static org.elasticsearch.gradle.PropertyNormalization.IGNORE_VALUE
apply plugin: 'elasticsearch.test.fixtures'
apply plugin: 'elasticsearch.legacy-java-rest-test'
apply plugin: 'elasticsearch.internal-java-rest-test'
apply plugin: 'elasticsearch.rest-resources'
apply plugin: 'elasticsearch.internal-available-ports'
final Project hdfsFixtureProject = project(':test:fixtures:hdfs2-fixture')
final Project krbFixtureProject = project(':test:fixtures:krb5kdc-fixture')
final Project hdfsRepoPluginProject = project(':plugins:repository-hdfs')
dependencies {
javaRestTestImplementation testArtifact(project(xpackModule('snapshot-repo-test-kit')))
javaRestTestImplementation project(':plugins:repository-hdfs')
javaRestTestImplementation project(path: ':test:fixtures:hdfs-fixture', configuration:"shadow")
javaRestTestImplementation project(':test:fixtures:krb5kdc-fixture')
javaRestTestImplementation "org.slf4j:slf4j-api:${versions.slf4j}"
javaRestTestImplementation "org.slf4j:slf4j-simple:${versions.slf4j}"
javaRestTestRuntimeOnly "com.google.guava:guava:16.0.1"
javaRestTestRuntimeOnly "commons-cli:commons-cli:1.2"
}
restResources {
@ -36,151 +26,15 @@ restResources {
}
}
testFixtures.useFixture(krbFixtureProject.path, 'hdfs-snapshot-repo-tests')
configurations {
hdfsFixture
}
dependencies {
hdfsFixture hdfsFixtureProject
// Set the keytab files in the classpath so that we can access them from test code without the security manager freaking out.
if (isEclipse == false) {
testRuntimeOnly files(krbFixtureProject.ext.krb5Keytabs("hdfs-snapshot-repo-tests", "hdfs_hdfs.build.elastic.co.keytab").parent){
builtBy ":test:fixtures:krb5kdc-fixture:preProcessFixture"
}
}
}
normalization {
runtimeClasspath {
// ignore generated keytab files for the purposes of build avoidance
ignore '*.keytab'
// ignore fixture ports file which is on the classpath primarily to pacify the security manager
ignore 'ports'
}
}
String realm = "BUILD.ELASTIC.CO"
String krb5conf = krbFixtureProject.ext.krb5Conf("hdfs")
// Create HDFS File System Testing Fixtures
for (String fixtureName : ['hdfsFixture', 'secureHdfsFixture']) {
project.tasks.register(fixtureName, org.elasticsearch.gradle.internal.test.AntFixture) {
dependsOn project.configurations.hdfsFixture, krbFixtureProject.tasks.postProcessFixture
executable = "${BuildParams.runtimeJavaHome}/bin/java"
env 'CLASSPATH', "${-> project.configurations.hdfsFixture.asPath}"
maxWaitInSeconds 60
BuildParams.withFipsEnabledOnly(it)
waitCondition = { fixture, ant ->
// the hdfs.MiniHDFS fixture writes the ports file when
// it's ready, so we can just wait for the file to exist
return fixture.portsFile.exists()
}
final List<String> miniHDFSArgs = []
// If it's a secure fixture, then depend on Kerberos Fixture and principals + add the krb5conf to the JVM options
if (name.equals('secureHdfsFixture')) {
onlyIf("Only runtime java version < 16") { BuildParams.runtimeJavaVersion < JavaVersion.VERSION_16 }
miniHDFSArgs.addAll(["--add-exports", "java.security.jgss/sun.security.krb5=ALL-UNNAMED"])
miniHDFSArgs.add("-Djava.security.krb5.conf=${krb5conf}")
}
// configure port dynamically
def portRange = project.getExtensions().getByType(ReservedPortRange)
miniHDFSArgs.add("-Dhdfs.config.port=${portRange.getOrAllocate(name)}")
// Common options
miniHDFSArgs.add('hdfs.MiniHDFS')
miniHDFSArgs.add(baseDir)
// If it's a secure fixture, then set the principal name and keytab locations to use for auth.
if (name.equals('secureHdfsFixture')) {
miniHDFSArgs.add("hdfs/hdfs.build.elastic.co@${realm}")
miniHDFSArgs.add(project(':test:fixtures:krb5kdc-fixture').ext.krb5Keytabs("hdfs", "hdfs_hdfs.build.elastic.co.keytab"))
}
args miniHDFSArgs.toArray()
}
clusterPlugins project(':plugins:repository-hdfs')
}
// Disable integration tests when running in FIPS mode
tasks.named("javaRestTest").configure {
usesDefaultDistribution()
description = "Runs rest tests against an elasticsearch cluster with HDFS."
def hdfsPort = project.getExtensions().getByType(ReservedPortRange).getOrAllocate("hdfsFixture")
systemProperty 'test.hdfs.uri', "hdfs://localhost:$hdfsPort"
nonInputProperties.systemProperty 'test.hdfs.path', '/user/elasticsearch/test/repository_test_kit/simple'
BuildParams.withFipsEnabledOnly(it)
}
tasks.register("javaRestTestSecure", RestIntegTestTask) {
description = "Runs rest tests against an elasticsearch cluster with Secured HDFS."
def hdfsPort = project.getExtensions().getByType(ReservedPortRange).getOrAllocate("secureHdfsFixture")
nonInputProperties.systemProperty 'test.hdfs.uri', "hdfs://localhost:$hdfsPort"
nonInputProperties.systemProperty 'test.hdfs.path', '/user/elasticsearch/test/repository_test_kit/secure'
nonInputProperties.systemProperty "test.krb5.principal.es", "elasticsearch@${realm}"
nonInputProperties.systemProperty "test.krb5.principal.hdfs", "hdfs/hdfs.build.elastic.co@${realm}"
nonInputProperties.systemProperty(
"test.krb5.keytab.hdfs",
project(':test:fixtures:krb5kdc-fixture').ext.krb5Keytabs("hdfs", "hdfs_hdfs.build.elastic.co.keytab")
)
onlyIf("FIPS mode disabled and runtime java < 16") {
BuildParams.inFipsJvm == false && BuildParams.runtimeJavaVersion < JavaVersion.VERSION_16
}
testClassesDirs = sourceSets.javaRestTest.output.classesDirs
classpath = sourceSets.javaRestTest.runtimeClasspath
}
tasks.named("check").configure { dependsOn("javaRestTestSecure") }
testClusters.configureEach {
testDistribution = 'DEFAULT'
plugin(hdfsRepoPluginProject.path)
setting 'xpack.license.self_generated.type', 'trial'
setting 'xpack.security.enabled', 'false'
}
testClusters.matching { it.name == "javaRestTestSecure" }.configureEach {
systemProperty "java.security.krb5.conf", krb5conf
extraConfigFile(
"repository-hdfs/krb5.keytab",
file("${project(':test:fixtures:krb5kdc-fixture').ext.krb5Keytabs("hdfs", "elasticsearch.keytab")}"), IGNORE_VALUE
)
}
// Determine HDFS Fixture compatibility for the current build environment.
boolean fixtureSupported = false
if (OS.current() == OS.WINDOWS) {
// the HDFS fixture will not start without the Hadoop native libraries on Windows
String nativePath = System.getenv("HADOOP_HOME")
if (nativePath != null) {
java.nio.file.Path path = Paths.get(nativePath)
if (Files.isDirectory(path) &&
Files.exists(path.resolve("bin").resolve("winutils.exe")) &&
Files.exists(path.resolve("bin").resolve("hadoop.dll")) &&
Files.exists(path.resolve("bin").resolve("hdfs.dll"))) {
fixtureSupported = true
} else {
throw new IllegalStateException("HADOOP_HOME: ${path} is invalid, does not contain hadoop native libraries in \$HADOOP_HOME/bin")
}
}
} else {
fixtureSupported = true
}
boolean legalPath = rootProject.rootDir.toString().contains(" ") == false
if (legalPath == false) {
fixtureSupported = false
}
if (fixtureSupported) {
tasks.named("javaRestTest").configure {dependsOn "hdfsFixture" }
tasks.named("javaRestTestSecure").configure {dependsOn "secureHdfsFixture" }
} else {
tasks.named("javaRestTest").configure {enabled = false }
tasks.named("javaRestTestSecure").configure { enabled = false }
if (legalPath) {
logger.warn("hdfsFixture unsupported, please set HADOOP_HOME and put HADOOP_HOME\\bin in PATH")
} else {
logger.warn("hdfsFixture unsupported since there are spaces in the path: '" + rootProject.rootDir.toString() + "'")
}
// required for krb5kdc-fixture to work
jvmArgs '--add-exports', 'java.security.jgss/sun.security.krb5=ALL-UNNAMED'
}

View File

@ -0,0 +1,38 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.repositories.blobstore.testkit;
import org.elasticsearch.common.settings.Settings;
import static org.hamcrest.Matchers.blankOrNullString;
import static org.hamcrest.Matchers.not;
public abstract class AbstractHdfsSnapshotRepoTestKitIT extends AbstractSnapshotRepoTestKitRestTestCase {
@Override
protected String repositoryType() {
return "hdfs";
}
@Override
protected Settings repositorySettings() {
final String uri = "hdfs://localhost:" + getHdfsPort();
// final String uri = System.getProperty("test.hdfs.uri");
assertThat(uri, not(blankOrNullString()));
final String path = getRepositoryPath();
assertThat(path, not(blankOrNullString()));
Settings.Builder repositorySettings = Settings.builder().put("client", "repository_test_kit").put("uri", uri).put("path", path);
return repositorySettings.build();
}
protected abstract String getRepositoryPath();
protected abstract int getHdfsPort();
}
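For orientation, the client/uri/path settings built here presumably feed the snapshot repository registration performed by the base test case. A rough, illustrative request via the low-level REST client; the repository name and JSON shape are assumptions, not taken from the test kit:
import org.elasticsearch.client.Request;
class RegisterHdfsRepositorySketch {
    // Illustrative only: the shape of an "hdfs" repository built from such settings.
    static Request registerRepository(String uri, String path) {
        Request request = new Request("PUT", "/_snapshot/repository_test_kit");
        request.setJsonEntity("""
            {
              "type": "hdfs",
              "settings": {
                "client": "repository_test_kit",
                "uri": "%s",
                "path": "%s"
              }
            }""".formatted(uri, path));
        return request;
    }
}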

View File

@ -6,33 +6,43 @@
*/
package org.elasticsearch.repositories.blobstore.testkit;
import org.elasticsearch.common.settings.Settings;
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
import static org.hamcrest.Matchers.blankOrNullString;
import static org.hamcrest.Matchers.not;
import org.elasticsearch.test.cluster.ElasticsearchCluster;
import org.elasticsearch.test.cluster.local.distribution.DistributionType;
import org.elasticsearch.test.fixtures.hdfs.HdfsClientThreadLeakFilter;
import org.elasticsearch.test.fixtures.hdfs.HdfsFixture;
import org.junit.ClassRule;
import org.junit.rules.RuleChain;
import org.junit.rules.TestRule;
public class HdfsSnapshotRepoTestKitIT extends AbstractSnapshotRepoTestKitRestTestCase {
@ThreadLeakFilters(filters = { HdfsClientThreadLeakFilter.class })
public class HdfsSnapshotRepoTestKitIT extends AbstractHdfsSnapshotRepoTestKitIT {
public static HdfsFixture hdfsFixture = new HdfsFixture();
public static ElasticsearchCluster cluster = ElasticsearchCluster.local()
.distribution(DistributionType.DEFAULT)
.plugin("repository-hdfs")
.setting("xpack.license.self_generated.type", "trial")
.setting("xpack.security.enabled", "false")
.build();
@ClassRule
public static TestRule ruleChain = RuleChain.outerRule(hdfsFixture).around(cluster);
@Override
protected String repositoryType() {
return "hdfs";
protected String getTestRestCluster() {
return cluster.getHttpAddresses();
}
@Override
protected Settings repositorySettings() {
final String uri = System.getProperty("test.hdfs.uri");
assertThat(uri, not(blankOrNullString()));
final String path = System.getProperty("test.hdfs.path");
assertThat(path, not(blankOrNullString()));
// Optional based on type of test
final String principal = System.getProperty("test.krb5.principal.es");
Settings.Builder repositorySettings = Settings.builder().put("client", "repository_test_kit").put("uri", uri).put("path", path);
if (principal != null) {
repositorySettings.put("security.principal", principal);
protected String getRepositoryPath() {
return "/user/elasticsearch/test/repository_test_kit/simple";
}
return repositorySettings.build();
@Override
protected int getHdfsPort() {
return hdfsFixture.getPort();
}
}
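The @ThreadLeakFilters annotation added above tells the randomized-testing runner to ignore background threads that the HDFS client is known to leave running. Such a filter is just a ThreadFilter implementation; a rough sketch follows, where the thread-name checks are assumptions and not the real HdfsClientThreadLeakFilter logic:
import com.carrotsearch.randomizedtesting.ThreadFilter;
// Sketch of a thread-leak filter: returning true means the thread is ignored
// by the leak checker instead of failing the suite.
public class ExampleHdfsClientThreadFilter implements ThreadFilter {
    @Override
    public boolean reject(Thread t) {
        String name = t.getName();
        return name.startsWith("IPC Client") || name.startsWith("org.apache.hadoop");
    }
}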

View File

@ -0,0 +1,63 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.repositories.blobstore.testkit;
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.cluster.ElasticsearchCluster;
import org.elasticsearch.test.cluster.local.distribution.DistributionType;
import org.elasticsearch.test.cluster.util.resource.Resource;
import org.elasticsearch.test.fixtures.hdfs.HdfsClientThreadLeakFilter;
import org.elasticsearch.test.fixtures.hdfs.HdfsFixture;
import org.elasticsearch.test.fixtures.krb5kdc.Krb5kDcContainer;
import org.elasticsearch.test.fixtures.testcontainers.TestContainersThreadFilter;
import org.junit.ClassRule;
import org.junit.rules.RuleChain;
import org.junit.rules.TestRule;
@ThreadLeakFilters(filters = { HdfsClientThreadLeakFilter.class, TestContainersThreadFilter.class })
public class SecureHdfsSnapshotRepoTestKitIT extends AbstractHdfsSnapshotRepoTestKitIT {
public static Krb5kDcContainer krb5Fixture = new Krb5kDcContainer();
public static HdfsFixture hdfsFixture = new HdfsFixture().withKerberos(() -> krb5Fixture.getPrincipal(), () -> krb5Fixture.getKeytab());
public static ElasticsearchCluster cluster = ElasticsearchCluster.local()
.distribution(DistributionType.DEFAULT)
.plugin("repository-hdfs")
.setting("xpack.license.self_generated.type", "trial")
.setting("xpack.security.enabled", "false")
.systemProperty("java.security.krb5.conf", () -> krb5Fixture.getConfPath().toString())
.configFile("repository-hdfs/krb5.conf", Resource.fromString(() -> krb5Fixture.getConf()))
.configFile("repository-hdfs/krb5.keytab", Resource.fromFile(() -> krb5Fixture.getEsKeytab()))
.build();
@ClassRule
public static TestRule ruleChain = RuleChain.outerRule(krb5Fixture).around(hdfsFixture).around(cluster);
@Override
protected String getTestRestCluster() {
return cluster.getHttpAddresses();
}
@Override
protected int getHdfsPort() {
return hdfsFixture.getPort();
}
@Override
protected String getRepositoryPath() {
return "/user/elasticsearch/test/repository_test_kit/secure";
}
@Override
protected Settings repositorySettings() {
return Settings.builder().put(super.repositorySettings()).put("security.principal", "elasticsearch@BUILD.ELASTIC.CO").build();
}
}
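The RuleChain here is what enforces startup order: the outer rule starts first and stops last, so the KDC is up before the HDFS fixture and the cluster, which in turn lets the supplier-based systemProperty/configFile values resolve the krb5.conf and keytab paths only after the container has started. A minimal, self-contained illustration of that ordering with dummy resources (the names are made up):
import org.junit.rules.ExternalResource;
import org.junit.rules.RuleChain;
import org.junit.rules.TestRule;
class RuleChainOrderSketch {
    static ExternalResource resource(String name) {
        return new ExternalResource() {
            @Override
            protected void before() {
                System.out.println(name + " starting");
            }
            @Override
            protected void after() {
                System.out.println(name + " stopping");
            }
        };
    }
    // kdc starts first, then hdfs, then the cluster; teardown runs in reverse order
    static final TestRule CHAIN = RuleChain.outerRule(resource("kdc")).around(resource("hdfs")).around(resource("cluster"));
}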

View File

@ -1,51 +1,15 @@
import java.nio.file.Path
import java.nio.file.Paths
apply plugin: 'elasticsearch.internal-java-rest-test'
apply plugin: 'elasticsearch.test.fixtures'
testFixtures.useFixture ":test:fixtures:krb5kdc-fixture", "peppa"
dependencies {
javaRestTestImplementation project(':x-pack:plugin:core')
javaRestTestImplementation(testArtifact(project(xpackModule('core'))))
javaRestTestImplementation(testArtifact(project(xpackModule('security'))))
javaRestTestImplementation project(':test:fixtures:krb5kdc-fixture')
}
normalization {
runtimeClasspath {
ignore 'krb5.conf'
ignore '*.keytab'
}
}
tasks.register("copyKeytabToGeneratedResources", Copy) {
from project(':test:fixtures:krb5kdc-fixture').ext.krb5Keytabs("peppa", "peppa.keytab")
into "$buildDir/generated-resources/keytabs"
from project(':test:fixtures:krb5kdc-fixture').ext.krb5Keytabs("peppa", "HTTP_localhost.keytab")
into "$buildDir/generated-resources/keytabs"
dependsOn ":test:fixtures:krb5kdc-fixture:postProcessFixture"
}
tasks.register("copyConfToGeneratedResources", Copy) {
from project(':test:fixtures:krb5kdc-fixture').ext.krb5Conf("peppa")
into "$buildDir/generated-resources/conf"
dependsOn ":test:fixtures:krb5kdc-fixture:postProcessFixture"
}
String realm = "BUILD.ELASTIC.CO"
tasks.named("javaRestTest").configure {
dependsOn "copyKeytabToGeneratedResources", "copyConfToGeneratedResources"
usesDefaultDistribution()
Path peppaKeytab = Paths.get("${project.buildDir}", "generated-resources", "keytabs", "peppa.keytab")
Path krb5Conf = Paths.get("${project.buildDir}", "generated-resources", "conf", "krb5.conf")
nonInputProperties.systemProperty 'test.userkt', "peppa@${realm}"
nonInputProperties.systemProperty 'test.userkt.keytab', "${peppaKeytab}"
nonInputProperties.systemProperty 'test.userpwd', "george@${realm}"
nonInputProperties.systemProperty 'test.krb5.conf', "${krb5Conf}"
nonInputProperties.systemProperty 'java.security.krb5.conf', "${krb5Conf}"
systemProperty 'test.userpwd.password', "dino_but_longer_than_14_chars"
systemProperty 'sun.security.krb5.debug', true
classpath += files("$buildDir/generated-resources/keytabs")
classpath += files("$buildDir/generated-resources/conf")
description = "Runs rest tests against an elasticsearch cluster with Kerberos."
// required for krb5kdc-fixture to work
jvmArgs '--add-exports', 'java.security.jgss/sun.security.krb5=ALL-UNNAMED'
}

View File

@ -7,6 +7,8 @@
package org.elasticsearch.xpack.security.authc.kerberos;
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.elasticsearch.client.Request;
@ -22,12 +24,16 @@ import org.elasticsearch.core.TimeValue;
import org.elasticsearch.test.cluster.ElasticsearchCluster;
import org.elasticsearch.test.cluster.local.distribution.DistributionType;
import org.elasticsearch.test.cluster.util.resource.Resource;
import org.elasticsearch.test.fixtures.krb5kdc.Krb5kDcContainer;
import org.elasticsearch.test.fixtures.testcontainers.TestContainersThreadFilter;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentType;
import org.ietf.jgss.GSSException;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.rules.RuleChain;
import org.junit.rules.TestRule;
import java.io.IOException;
import java.net.InetAddress;
@ -56,15 +62,16 @@ import static org.hamcrest.Matchers.nullValue;
* Demonstrates login by keytab and login by password for given user principal
* name using rest client.
*/
@ThreadLeakFilters(filters = { TestContainersThreadFilter.class })
public class KerberosAuthenticationIT extends ESRestTestCase {
private static final String ENABLE_KERBEROS_DEBUG_LOGS_KEY = "test.krb.debug";
private static final String TEST_USER_WITH_KEYTAB_KEY = "test.userkt";
private static final String TEST_USER_WITH_KEYTAB_PATH_KEY = "test.userkt.keytab";
private static final String TEST_USER_WITH_PWD_KEY = "test.userpwd";
private static final String TEST_USER_WITH_PWD_PASSWD_KEY = "test.userpwd.password";
private static final String TEST_USER_WITH_KEYTAB_KEY = "peppa@BUILD.ELASTIC.CO";
private static final String TEST_USER_WITH_PWD_KEY = "george@BUILD.ELASTIC.CO";
private static final String TEST_USER_WITH_PWD_PASSWD_KEY = "dino_but_longer_than_14_chars";
private static final String TEST_KERBEROS_REALM_NAME = "kerberos";
@ClassRule
public static Krb5kDcContainer krb5Fixture = new Krb5kDcContainer(Krb5kDcContainer.ProvisioningId.PEPPA);
public static ElasticsearchCluster cluster = ElasticsearchCluster.local()
.distribution(DistributionType.DEFAULT)
// force localhost IPv4 otherwise it is a chicken and egg problem where we need the keytab for the hostname when starting the
@ -81,13 +88,16 @@ public class KerberosAuthenticationIT extends ESRestTestCase {
.setting("xpack.security.authc.realms.kerberos.kerberos.keytab.path", "es.keytab")
.setting("xpack.security.authc.realms.kerberos.kerberos.krb.debug", "true")
.setting("xpack.security.authc.realms.kerberos.kerberos.remove_realm_name", "false")
.systemProperty("java.security.krb5.conf", System.getProperty("test.krb5.conf"))
.systemProperty("java.security.krb5.conf", () -> krb5Fixture.getConfPath().toString())
.systemProperty("sun.security.krb5.debug", "true")
.user("test_admin", "x-pack-test-password")
.user("test_kibana_user", "x-pack-test-password", "kibana_system", false)
.configFile("es.keytab", Resource.fromClasspath("HTTP_localhost.keytab"))
.configFile("es.keytab", Resource.fromFile(() -> krb5Fixture.getEsKeytab()))
.build();
@ClassRule
public static TestRule ruleChain = RuleChain.outerRule(krb5Fixture).around(cluster);
@Override
protected String getTestRestCluster() {
return cluster.getHttpAddresses();
@ -130,20 +140,19 @@ public class KerberosAuthenticationIT extends ESRestTestCase {
}
public void testLoginByKeytab() throws IOException, PrivilegedActionException {
final String userPrincipalName = System.getProperty(TEST_USER_WITH_KEYTAB_KEY);
final String keytabPath = System.getProperty(TEST_USER_WITH_KEYTAB_PATH_KEY);
final boolean enabledDebugLogs = Boolean.parseBoolean(System.getProperty(ENABLE_KERBEROS_DEBUG_LOGS_KEY));
final String keytabPath = krb5Fixture.getKeytab().toString();
final boolean enabledDebugLogs = Boolean.parseBoolean(System.getProperty(ENABLE_KERBEROS_DEBUG_LOGS_KEY));
final SpnegoHttpClientConfigCallbackHandler callbackHandler = new SpnegoHttpClientConfigCallbackHandler(
userPrincipalName,
krb5Fixture.getPrincipal(),
keytabPath,
enabledDebugLogs
);
executeRequestAndVerifyResponse(userPrincipalName, callbackHandler);
executeRequestAndVerifyResponse(krb5Fixture.getPrincipal(), callbackHandler);
}
public void testLoginByUsernamePassword() throws IOException, PrivilegedActionException {
final String userPrincipalName = System.getProperty(TEST_USER_WITH_PWD_KEY);
final String password = System.getProperty(TEST_USER_WITH_PWD_PASSWD_KEY);
final String userPrincipalName = TEST_USER_WITH_PWD_KEY;
final String password = TEST_USER_WITH_PWD_PASSWD_KEY;
final boolean enabledDebugLogs = Boolean.parseBoolean(System.getProperty(ENABLE_KERBEROS_DEBUG_LOGS_KEY));
final SpnegoHttpClientConfigCallbackHandler callbackHandler = new SpnegoHttpClientConfigCallbackHandler(
userPrincipalName,
@ -154,8 +163,8 @@ public class KerberosAuthenticationIT extends ESRestTestCase {
}
public void testGetOauth2TokenInExchangeForKerberosTickets() throws PrivilegedActionException, GSSException, IOException {
final String userPrincipalName = System.getProperty(TEST_USER_WITH_PWD_KEY);
final String password = System.getProperty(TEST_USER_WITH_PWD_PASSWD_KEY);
final String userPrincipalName = TEST_USER_WITH_PWD_KEY;
final String password = TEST_USER_WITH_PWD_PASSWD_KEY;
final boolean enabledDebugLogs = Boolean.parseBoolean(System.getProperty(ENABLE_KERBEROS_DEBUG_LOGS_KEY));
final SpnegoHttpClientConfigCallbackHandler callbackHandler = new SpnegoHttpClientConfigCallbackHandler(
userPrincipalName,