diff --git a/server/src/main/java/module-info.java b/server/src/main/java/module-info.java index e6a44a0fba7a..ef3d5da1c953 100644 --- a/server/src/main/java/module-info.java +++ b/server/src/main/java/module-info.java @@ -431,7 +431,8 @@ module org.elasticsearch.server { org.elasticsearch.search.SearchFeatures, org.elasticsearch.script.ScriptFeatures, org.elasticsearch.search.retriever.RetrieversFeatures, - org.elasticsearch.action.admin.cluster.stats.ClusterStatsFeatures; + org.elasticsearch.action.admin.cluster.stats.ClusterStatsFeatures, + org.elasticsearch.ingest.IngestFeatures; uses org.elasticsearch.plugins.internal.SettingsExtension; uses RestExtension; diff --git a/server/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java b/server/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java index 06cefe35e6ac..92d2a8a45197 100644 --- a/server/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java +++ b/server/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java @@ -48,6 +48,7 @@ class SimulateExecutionService { pipeline.getVersion(), pipeline.getMetadata(), verbosePipelineProcessor, + pipeline.getFieldAccessPattern(), pipeline.getDeprecated() ); ingestDocument.executePipeline(verbosePipeline, (result, e) -> { diff --git a/server/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineRequest.java b/server/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineRequest.java index 926c1c2e8500..fbf0f7bc2f04 100644 --- a/server/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineRequest.java +++ b/server/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineRequest.java @@ -21,6 +21,7 @@ import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.core.RestApiVersion; import org.elasticsearch.core.UpdateForV10; +import org.elasticsearch.features.NodeFeature; import org.elasticsearch.index.VersionType; import org.elasticsearch.ingest.ConfigurationUtils; import org.elasticsearch.ingest.IngestDocument; @@ -38,6 +39,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.function.Predicate; public class SimulatePipelineRequest extends LegacyActionRequest implements ToXContentObject { private static final DeprecationLogger deprecationLogger = DeprecationLogger.getLogger(SimulatePipelineRequest.class); @@ -154,7 +156,8 @@ public class SimulatePipelineRequest extends LegacyActionRequest implements ToXC Map config, boolean verbose, IngestService ingestService, - RestApiVersion restApiVersion + RestApiVersion restApiVersion, + Predicate hasFeature ) throws Exception { Map pipelineConfig = ConfigurationUtils.readMap(null, null, config, Fields.PIPELINE); Pipeline pipeline = Pipeline.create( @@ -162,7 +165,8 @@ public class SimulatePipelineRequest extends LegacyActionRequest implements ToXC pipelineConfig, ingestService.getProcessorFactories(), ingestService.getScriptService(), - projectId + projectId, + hasFeature ); List ingestDocumentList = parseDocs(config, restApiVersion); return new Parsed(pipeline, ingestDocumentList, verbose); diff --git a/server/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineTransportAction.java b/server/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineTransportAction.java index f8e7668379af..b9586571fa9c 100644 --- a/server/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineTransportAction.java +++ b/server/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineTransportAction.java @@ -18,11 +18,13 @@ import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.project.ProjectResolver; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Randomness; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.features.FeatureService; import org.elasticsearch.ingest.IngestService; import org.elasticsearch.injection.guice.Inject; import org.elasticsearch.tasks.Task; @@ -51,6 +53,8 @@ public class SimulatePipelineTransportAction extends HandledTransportAction random = ThreadLocal.withInitial(Randomness::get); @@ -61,7 +65,9 @@ public class SimulatePipelineTransportAction extends HandledTransportAction featureService.clusterHasFeature(clusterService.state(), feature) ); } executionService.execute(simulateRequest, listener); diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestFeatures.java b/server/src/main/java/org/elasticsearch/ingest/IngestFeatures.java new file mode 100644 index 000000000000..bee1a2a272b2 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/ingest/IngestFeatures.java @@ -0,0 +1,27 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.ingest; + +import org.elasticsearch.cluster.metadata.DataStream; +import org.elasticsearch.features.FeatureSpecification; +import org.elasticsearch.features.NodeFeature; + +import java.util.Set; + +public class IngestFeatures implements FeatureSpecification { + @Override + public Set getFeatures() { + if (DataStream.LOGS_STREAM_FEATURE_FLAG) { + return Set.of(IngestService.FIELD_ACCESS_PATTERN); + } else { + return Set.of(); + } + } +} diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestPipelineFieldAccessPattern.java b/server/src/main/java/org/elasticsearch/ingest/IngestPipelineFieldAccessPattern.java new file mode 100644 index 000000000000..878a7b9a3fb8 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/ingest/IngestPipelineFieldAccessPattern.java @@ -0,0 +1,51 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.ingest; + +import java.util.Map; + +public enum IngestPipelineFieldAccessPattern { + /** + * Field names will be split on the `.` character into their contingent parts. Resolution will strictly check + * for nested objects following the field path. + */ + CLASSIC("classic"), + /** + * Field names will be split on the `.` character into their contingent parts. Resolution will flexibly check + * for nested objects following the field path. If nested objects are not found for a key, the access pattern + * will fall back to joining subsequent path elements together until it finds the next object that matches the + * concatenated path. Allows for simple resolution of dotted field names. + */ + FLEXIBLE("flexible"); + + private final String key; + + IngestPipelineFieldAccessPattern(String key) { + this.key = key; + } + + public String getKey() { + return key; + } + + private static final Map NAME_REGISTRY = Map.of(CLASSIC.key, CLASSIC, FLEXIBLE.key, FLEXIBLE); + + public static boolean isValidAccessPattern(String accessPatternName) { + return NAME_REGISTRY.containsKey(accessPatternName); + } + + public static IngestPipelineFieldAccessPattern getAccessPattern(String accessPatternName) { + IngestPipelineFieldAccessPattern accessPattern = NAME_REGISTRY.get(accessPatternName); + if (accessPattern == null) { + throw new IllegalArgumentException("Invalid ingest pipeline access pattern name [" + accessPatternName + "] given"); + } + return accessPattern; + } +} diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index 59cd9c11c9b6..1847368e9c40 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -66,6 +66,7 @@ import org.elasticsearch.core.Tuple; import org.elasticsearch.core.UpdateForV10; import org.elasticsearch.env.Environment; import org.elasticsearch.features.FeatureService; +import org.elasticsearch.features.NodeFeature; import org.elasticsearch.gateway.GatewayService; import org.elasticsearch.grok.MatcherWatchdog; import org.elasticsearch.index.IndexSettings; @@ -119,6 +120,25 @@ public class IngestService implements ClusterStateApplier, ReportingService taskQueue; private final ClusterService clusterService; private final ScriptService scriptService; @@ -376,6 +396,10 @@ public class IngestService implements ClusterStateApplier, ReportingService featureService.clusterHasFeature(state, n) + ); List exceptions = new ArrayList<>(); for (Processor processor : pipeline.flattenAllProcessors()) { @@ -1428,7 +1459,8 @@ public class IngestService implements ClusterStateApplier, ReportingService featureService.clusterHasFeature(clusterService.state(), nodeFeature) ); newPipelines.put(newConfiguration.getId(), new PipelineHolder(newConfiguration, newPipeline)); @@ -1557,7 +1589,14 @@ public class IngestService implements ClusterStateApplier, ReportingService featureService.clusterHasFeature(state, nodeFeature) + ); ImmutableOpenMap updatedPipelines = ImmutableOpenMap.builder(originalPipelines) .fPut(id, new PipelineHolder(holder.configuration, updatedPipeline)) .build(); diff --git a/server/src/main/java/org/elasticsearch/ingest/Pipeline.java b/server/src/main/java/org/elasticsearch/ingest/Pipeline.java index b0747ddc2058..e1ade4393328 100644 --- a/server/src/main/java/org/elasticsearch/ingest/Pipeline.java +++ b/server/src/main/java/org/elasticsearch/ingest/Pipeline.java @@ -12,6 +12,7 @@ package org.elasticsearch.ingest; import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.core.Nullable; +import org.elasticsearch.features.NodeFeature; import org.elasticsearch.script.ScriptService; import java.util.Arrays; @@ -19,6 +20,7 @@ import java.util.List; import java.util.Map; import java.util.function.BiConsumer; import java.util.function.LongSupplier; +import java.util.function.Predicate; /** * A pipeline is a list of {@link Processor} instances grouped under a unique id. @@ -30,6 +32,7 @@ public final class Pipeline { public static final String VERSION_KEY = "version"; public static final String ON_FAILURE_KEY = "on_failure"; public static final String META_KEY = "_meta"; + public static final String FIELD_ACCESS_PATTERN = "field_access_pattern"; public static final String DEPRECATED_KEY = "deprecated"; private final String id; @@ -42,6 +45,7 @@ public final class Pipeline { private final CompoundProcessor compoundProcessor; private final IngestPipelineMetric metrics; private final LongSupplier relativeTimeProvider; + private final IngestPipelineFieldAccessPattern fieldAccessPattern; @Nullable private final Boolean deprecated; @@ -52,7 +56,7 @@ public final class Pipeline { @Nullable Map metadata, CompoundProcessor compoundProcessor ) { - this(id, description, version, metadata, compoundProcessor, null); + this(id, description, version, metadata, compoundProcessor, IngestPipelineFieldAccessPattern.CLASSIC, null); } public Pipeline( @@ -61,9 +65,10 @@ public final class Pipeline { @Nullable Integer version, @Nullable Map metadata, CompoundProcessor compoundProcessor, + IngestPipelineFieldAccessPattern fieldAccessPattern, @Nullable Boolean deprecated ) { - this(id, description, version, metadata, compoundProcessor, System::nanoTime, deprecated); + this(id, description, version, metadata, compoundProcessor, System::nanoTime, fieldAccessPattern, deprecated); } // package private for testing @@ -74,6 +79,7 @@ public final class Pipeline { @Nullable Map metadata, CompoundProcessor compoundProcessor, LongSupplier relativeTimeProvider, + IngestPipelineFieldAccessPattern fieldAccessPattern, @Nullable Boolean deprecated ) { this.id = id; @@ -83,20 +89,50 @@ public final class Pipeline { this.version = version; this.metrics = new IngestPipelineMetric(); this.relativeTimeProvider = relativeTimeProvider; + this.fieldAccessPattern = fieldAccessPattern; this.deprecated = deprecated; } + /** + * @deprecated To be removed after Logstash has transitioned fully to the logstash-bridge library. Functionality will be relocated to + * there. Use {@link Pipeline#create(String, Map, Map, ScriptService, ProjectId, Predicate)} instead. + */ + @Deprecated public static Pipeline create( String id, Map config, Map processorFactories, ScriptService scriptService, ProjectId projectId + ) throws Exception { + return create(id, config, processorFactories, scriptService, projectId, IngestService::locallySupportedIngestFeature); + } + + public static Pipeline create( + String id, + Map config, + Map processorFactories, + ScriptService scriptService, + ProjectId projectId, + Predicate hasFeature ) throws Exception { String description = ConfigurationUtils.readOptionalStringProperty(null, null, config, DESCRIPTION_KEY); Integer version = ConfigurationUtils.readIntProperty(null, null, config, VERSION_KEY, null); Map metadata = ConfigurationUtils.readOptionalMap(null, null, config, META_KEY); Boolean deprecated = ConfigurationUtils.readOptionalBooleanProperty(null, null, config, DEPRECATED_KEY); + String fieldAccessPatternRaw = ConfigurationUtils.readOptionalStringProperty(null, null, config, FIELD_ACCESS_PATTERN); + if (fieldAccessPatternRaw != null && hasFeature.test(IngestService.FIELD_ACCESS_PATTERN) == false) { + throw new ElasticsearchParseException( + "pipeline [" + id + "] doesn't support one or more provided configuration parameters [field_access_pattern]" + ); + } else if (fieldAccessPatternRaw != null && IngestPipelineFieldAccessPattern.isValidAccessPattern(fieldAccessPatternRaw) == false) { + throw new ElasticsearchParseException( + "pipeline [" + id + "] doesn't support value of [" + fieldAccessPatternRaw + "] for parameter [field_access_pattern]" + ); + } + IngestPipelineFieldAccessPattern accessPattern = fieldAccessPatternRaw == null + ? IngestPipelineFieldAccessPattern.CLASSIC + : IngestPipelineFieldAccessPattern.getAccessPattern(fieldAccessPatternRaw); List> processorConfigs = ConfigurationUtils.readList(null, null, config, PROCESSORS_KEY); List processors = ConfigurationUtils.readProcessorConfigs( processorConfigs, @@ -123,7 +159,7 @@ public final class Pipeline { throw new ElasticsearchParseException("pipeline [" + id + "] cannot have an empty on_failure option defined"); } CompoundProcessor compoundProcessor = new CompoundProcessor(false, processors, onFailureProcessors); - return new Pipeline(id, description, version, metadata, compoundProcessor, deprecated); + return new Pipeline(id, description, version, metadata, compoundProcessor, accessPattern, deprecated); } /** @@ -215,6 +251,13 @@ public final class Pipeline { return metrics; } + /** + * The field access pattern that the pipeline will use to retrieve and set fields on documents. + */ + public IngestPipelineFieldAccessPattern getFieldAccessPattern() { + return fieldAccessPattern; + } + public Boolean getDeprecated() { return deprecated; } diff --git a/server/src/main/java/org/elasticsearch/ingest/SimulateIngestService.java b/server/src/main/java/org/elasticsearch/ingest/SimulateIngestService.java index c4fa1358d821..2f91e106248d 100644 --- a/server/src/main/java/org/elasticsearch/ingest/SimulateIngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/SimulateIngestService.java @@ -58,7 +58,9 @@ public class SimulateIngestService extends IngestService { entry.getValue(), ingestService.getProcessorFactories(), ingestService.getScriptService(), - ingestService.getProjectResolver().getProjectId() + ingestService.getProjectResolver().getProjectId(), + (nodeFeature) -> ingestService.getFeatureService() + .clusterHasFeature(ingestService.getClusterService().state(), nodeFeature) ); parsedPipelineSubstitutions.put(pipelineId, pipeline); } diff --git a/server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java b/server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java index 00304d4cb845..73dae3437acc 100644 --- a/server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java +++ b/server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java @@ -216,6 +216,7 @@ public final class TrackingResultProcessor implements Processor { pipeline.getVersion(), pipeline.getMetadata(), verbosePipelineProcessor, + pipeline.getFieldAccessPattern(), pipeline.getDeprecated() ); ingestDocument.executePipeline(verbosePipeline, handler); diff --git a/server/src/main/resources/META-INF/services/org.elasticsearch.features.FeatureSpecification b/server/src/main/resources/META-INF/services/org.elasticsearch.features.FeatureSpecification index 14749330f09d..5a258a4cd774 100644 --- a/server/src/main/resources/META-INF/services/org.elasticsearch.features.FeatureSpecification +++ b/server/src/main/resources/META-INF/services/org.elasticsearch.features.FeatureSpecification @@ -18,3 +18,4 @@ org.elasticsearch.search.retriever.RetrieversFeatures org.elasticsearch.script.ScriptFeatures org.elasticsearch.cluster.routing.RoutingFeatures org.elasticsearch.action.admin.cluster.stats.ClusterStatsFeatures +org.elasticsearch.ingest.IngestFeatures diff --git a/server/src/test/java/org/elasticsearch/action/ingest/SimulatePipelineRequestParsingTests.java b/server/src/test/java/org/elasticsearch/action/ingest/SimulatePipelineRequestParsingTests.java index a71a2f1c6e26..b9a7607563bd 100644 --- a/server/src/test/java/org/elasticsearch/action/ingest/SimulatePipelineRequestParsingTests.java +++ b/server/src/test/java/org/elasticsearch/action/ingest/SimulatePipelineRequestParsingTests.java @@ -10,6 +10,7 @@ package org.elasticsearch.action.ingest; import org.elasticsearch.ElasticsearchParseException; +import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.core.RestApiVersion; import org.elasticsearch.index.VersionType; import org.elasticsearch.ingest.CompoundProcessor; @@ -195,7 +196,8 @@ public class SimulatePipelineRequestParsingTests extends ESTestCase { requestContent, false, ingestService, - RestApiVersion.current() + RestApiVersion.current(), + (nodeFeature) -> DataStream.LOGS_STREAM_FEATURE_FLAG ); assertThat(actualRequest.verbose(), equalTo(false)); assertThat(actualRequest.documents().size(), equalTo(numDocs)); @@ -268,7 +270,14 @@ public class SimulatePipelineRequestParsingTests extends ESTestCase { requestContent.put(Fields.PIPELINE, pipelineConfig); Exception e1 = expectThrows( IllegalArgumentException.class, - () -> SimulatePipelineRequest.parse(projectId, requestContent, false, ingestService, RestApiVersion.current()) + () -> SimulatePipelineRequest.parse( + projectId, + requestContent, + false, + ingestService, + RestApiVersion.current(), + (nodeFeature) -> DataStream.LOGS_STREAM_FEATURE_FLAG + ) ); assertThat(e1.getMessage(), equalTo("must specify at least one document in [docs]")); @@ -279,7 +288,14 @@ public class SimulatePipelineRequestParsingTests extends ESTestCase { requestContent.put(Fields.PIPELINE, pipelineConfig); Exception e2 = expectThrows( IllegalArgumentException.class, - () -> SimulatePipelineRequest.parse(projectId, requestContent, false, ingestService, RestApiVersion.current()) + () -> SimulatePipelineRequest.parse( + projectId, + requestContent, + false, + ingestService, + RestApiVersion.current(), + (nodeFeature) -> DataStream.LOGS_STREAM_FEATURE_FLAG + ) ); assertThat(e2.getMessage(), equalTo("malformed [docs] section, should include an inner object")); @@ -288,7 +304,14 @@ public class SimulatePipelineRequestParsingTests extends ESTestCase { requestContent.put(Fields.PIPELINE, pipelineConfig); Exception e3 = expectThrows( ElasticsearchParseException.class, - () -> SimulatePipelineRequest.parse(projectId, requestContent, false, ingestService, RestApiVersion.current()) + () -> SimulatePipelineRequest.parse( + projectId, + requestContent, + false, + ingestService, + RestApiVersion.current(), + (nodeFeature) -> DataStream.LOGS_STREAM_FEATURE_FLAG + ) ); assertThat(e3.getMessage(), containsString("required property is missing")); } @@ -367,7 +390,8 @@ public class SimulatePipelineRequestParsingTests extends ESTestCase { requestContent, false, ingestService, - RestApiVersion.V_8 + RestApiVersion.V_8, + (nodeFeature) -> DataStream.LOGS_STREAM_FEATURE_FLAG ); assertThat(actualRequest.verbose(), equalTo(false)); assertThat(actualRequest.documents().size(), equalTo(numDocs)); diff --git a/server/src/test/java/org/elasticsearch/ingest/PipelineFactoryTests.java b/server/src/test/java/org/elasticsearch/ingest/PipelineFactoryTests.java index 21bdbe2972a2..901285f0aa86 100644 --- a/server/src/test/java/org/elasticsearch/ingest/PipelineFactoryTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/PipelineFactoryTests.java @@ -10,6 +10,7 @@ package org.elasticsearch.ingest; import org.elasticsearch.ElasticsearchParseException; +import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.core.Tuple; import org.elasticsearch.script.ScriptService; @@ -46,8 +47,20 @@ public class PipelineFactoryTests extends ESTestCase { } pipelineConfig.put(Pipeline.DEPRECATED_KEY, deprecated); pipelineConfig.put(Pipeline.PROCESSORS_KEY, List.of(Map.of("test", processorConfig0), Map.of("test", processorConfig1))); + IngestPipelineFieldAccessPattern expectedAccessPattern = IngestPipelineFieldAccessPattern.CLASSIC; + if (DataStream.LOGS_STREAM_FEATURE_FLAG) { + expectedAccessPattern = randomFrom(IngestPipelineFieldAccessPattern.values()); + pipelineConfig.put(Pipeline.FIELD_ACCESS_PATTERN, expectedAccessPattern.getKey()); + } Map processorRegistry = Map.of("test", new TestProcessor.Factory()); - Pipeline pipeline = Pipeline.create("_id", pipelineConfig, processorRegistry, scriptService, null); + Pipeline pipeline = Pipeline.create( + "_id", + pipelineConfig, + processorRegistry, + scriptService, + null, + nodeFeature -> DataStream.LOGS_STREAM_FEATURE_FLAG + ); assertThat(pipeline.getId(), equalTo("_id")); assertThat(pipeline.getDescription(), equalTo("_description")); assertThat(pipeline.getVersion(), equalTo(version)); @@ -57,6 +70,7 @@ public class PipelineFactoryTests extends ESTestCase { assertThat(pipeline.getProcessors().get(0).getTag(), equalTo("first-processor")); assertThat(pipeline.getProcessors().get(1).getType(), equalTo("test-processor")); assertThat(pipeline.getProcessors().get(1).getTag(), nullValue()); + assertThat(pipeline.getFieldAccessPattern(), equalTo(expectedAccessPattern)); } public void testCreateWithNoProcessorsField() throws Exception { @@ -67,7 +81,7 @@ public class PipelineFactoryTests extends ESTestCase { pipelineConfig.put(Pipeline.META_KEY, metadata); } try { - Pipeline.create("_id", pipelineConfig, Map.of(), scriptService, null); + Pipeline.create("_id", pipelineConfig, Map.of(), scriptService, null, nodeFeature -> DataStream.LOGS_STREAM_FEATURE_FLAG); fail("should fail, missing required [processors] field"); } catch (ElasticsearchParseException e) { assertThat(e.getMessage(), equalTo("[processors] required property is missing")); @@ -82,7 +96,14 @@ public class PipelineFactoryTests extends ESTestCase { pipelineConfig.put(Pipeline.META_KEY, metadata); } pipelineConfig.put(Pipeline.PROCESSORS_KEY, List.of()); - Pipeline pipeline = Pipeline.create("_id", pipelineConfig, null, scriptService, null); + Pipeline pipeline = Pipeline.create( + "_id", + pipelineConfig, + null, + scriptService, + null, + nodeFeature -> DataStream.LOGS_STREAM_FEATURE_FLAG + ); assertThat(pipeline.getId(), equalTo("_id")); assertThat(pipeline.getDescription(), equalTo("_description")); assertThat(pipeline.getVersion(), equalTo(version)); @@ -100,7 +121,14 @@ public class PipelineFactoryTests extends ESTestCase { pipelineConfig.put(Pipeline.PROCESSORS_KEY, List.of(Map.of("test", processorConfig))); pipelineConfig.put(Pipeline.ON_FAILURE_KEY, List.of(Map.of("test", processorConfig))); Map processorRegistry = Map.of("test", new TestProcessor.Factory()); - Pipeline pipeline = Pipeline.create("_id", pipelineConfig, processorRegistry, scriptService, null); + Pipeline pipeline = Pipeline.create( + "_id", + pipelineConfig, + processorRegistry, + scriptService, + null, + nodeFeature -> DataStream.LOGS_STREAM_FEATURE_FLAG + ); assertThat(pipeline.getId(), equalTo("_id")); assertThat(pipeline.getDescription(), equalTo("_description")); assertThat(pipeline.getVersion(), equalTo(version)); @@ -123,7 +151,14 @@ public class PipelineFactoryTests extends ESTestCase { Map processorRegistry = Map.of("test", new TestProcessor.Factory()); Exception e = expectThrows( ElasticsearchParseException.class, - () -> Pipeline.create("_id", pipelineConfig, processorRegistry, scriptService, null) + () -> Pipeline.create( + "_id", + pipelineConfig, + processorRegistry, + scriptService, + null, + nodeFeature -> DataStream.LOGS_STREAM_FEATURE_FLAG + ) ); assertThat(e.getMessage(), equalTo("pipeline [_id] cannot have an empty on_failure option defined")); } @@ -141,7 +176,14 @@ public class PipelineFactoryTests extends ESTestCase { Map processorRegistry = Map.of("test", new TestProcessor.Factory()); Exception e = expectThrows( ElasticsearchParseException.class, - () -> Pipeline.create("_id", pipelineConfig, processorRegistry, scriptService, null) + () -> Pipeline.create( + "_id", + pipelineConfig, + processorRegistry, + scriptService, + null, + nodeFeature -> DataStream.LOGS_STREAM_FEATURE_FLAG + ) ); assertThat(e.getMessage(), equalTo("[on_failure] processors list cannot be empty")); } @@ -159,7 +201,14 @@ public class PipelineFactoryTests extends ESTestCase { } pipelineConfig.put(Pipeline.PROCESSORS_KEY, List.of(Map.of("test", processorConfig))); - Pipeline pipeline = Pipeline.create("_id", pipelineConfig, processorRegistry, scriptService, null); + Pipeline pipeline = Pipeline.create( + "_id", + pipelineConfig, + processorRegistry, + scriptService, + null, + nodeFeature -> DataStream.LOGS_STREAM_FEATURE_FLAG + ); assertThat(pipeline.getId(), equalTo("_id")); assertThat(pipeline.getDescription(), equalTo("_description")); assertThat(pipeline.getVersion(), equalTo(version)); @@ -171,6 +220,56 @@ public class PipelineFactoryTests extends ESTestCase { assertThat(processor.getProcessors().get(0).getType(), equalTo("test-processor")); } + public void testCreateUnsupportedFieldAccessPattern() throws Exception { + Map processorConfig = new HashMap<>(); + processorConfig.put(ConfigurationUtils.TAG_KEY, "test-processor"); + Map pipelineConfig = new HashMap<>(); + pipelineConfig.put(Pipeline.DESCRIPTION_KEY, "_description"); + pipelineConfig.put(Pipeline.VERSION_KEY, versionString); + pipelineConfig.put(Pipeline.FIELD_ACCESS_PATTERN, "random"); + if (metadata != null) { + pipelineConfig.put(Pipeline.META_KEY, metadata); + } + pipelineConfig.put(Pipeline.PROCESSORS_KEY, List.of(Map.of("test", processorConfig))); + Map processorRegistry = Map.of("test", new TestProcessor.Factory()); + Exception e = expectThrows( + ElasticsearchParseException.class, + // All node features disabled + () -> Pipeline.create( + "_id", + pipelineConfig, + processorRegistry, + scriptService, + null, + nodeFeature -> DataStream.LOGS_STREAM_FEATURE_FLAG + ) + ); + assertThat(e.getMessage(), equalTo("pipeline [_id] doesn't support value of [random] for parameter [field_access_pattern]")); + } + + public void testCreateUnsupportedPipelineOptions() throws Exception { + Map processorConfig = new HashMap<>(); + processorConfig.put(ConfigurationUtils.TAG_KEY, "test-processor"); + Map pipelineConfig = new HashMap<>(); + pipelineConfig.put(Pipeline.DESCRIPTION_KEY, "_description"); + pipelineConfig.put(Pipeline.VERSION_KEY, versionString); + pipelineConfig.put(Pipeline.FIELD_ACCESS_PATTERN, IngestPipelineFieldAccessPattern.FLEXIBLE.getKey()); + if (metadata != null) { + pipelineConfig.put(Pipeline.META_KEY, metadata); + } + pipelineConfig.put(Pipeline.PROCESSORS_KEY, List.of(Map.of("test", processorConfig))); + Map processorRegistry = Map.of("test", new TestProcessor.Factory()); + Exception e = expectThrows( + ElasticsearchParseException.class, + // All node features disabled + () -> Pipeline.create("_id", pipelineConfig, processorRegistry, scriptService, null, nodeFeature -> false) + ); + assertThat( + e.getMessage(), + equalTo("pipeline [_id] doesn't support one or more provided configuration parameters [field_access_pattern]") + ); + } + public void testCreateUnusedProcessorOptions() throws Exception { Map processorConfig = new HashMap<>(); processorConfig.put("unused", "value"); @@ -184,7 +283,14 @@ public class PipelineFactoryTests extends ESTestCase { Map processorRegistry = Map.of("test", new TestProcessor.Factory()); Exception e = expectThrows( ElasticsearchParseException.class, - () -> Pipeline.create("_id", pipelineConfig, processorRegistry, scriptService, null) + () -> Pipeline.create( + "_id", + pipelineConfig, + processorRegistry, + scriptService, + null, + nodeFeature -> DataStream.LOGS_STREAM_FEATURE_FLAG + ) ); assertThat(e.getMessage(), equalTo("processor [test] doesn't support one or more provided configuration parameters [unused]")); } @@ -201,7 +307,14 @@ public class PipelineFactoryTests extends ESTestCase { } pipelineConfig.put(Pipeline.PROCESSORS_KEY, List.of(Map.of("test", processorConfig))); Map processorRegistry = Map.of("test", new TestProcessor.Factory()); - Pipeline pipeline = Pipeline.create("_id", pipelineConfig, processorRegistry, scriptService, null); + Pipeline pipeline = Pipeline.create( + "_id", + pipelineConfig, + processorRegistry, + scriptService, + null, + nodeFeature -> DataStream.LOGS_STREAM_FEATURE_FLAG + ); assertThat(pipeline.getId(), equalTo("_id")); assertThat(pipeline.getDescription(), equalTo("_description")); assertThat(pipeline.getVersion(), equalTo(version)); diff --git a/server/src/test/java/org/elasticsearch/ingest/PipelineProcessorTests.java b/server/src/test/java/org/elasticsearch/ingest/PipelineProcessorTests.java index cf11c29978c7..c3eb779e9c19 100644 --- a/server/src/test/java/org/elasticsearch/ingest/PipelineProcessorTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/PipelineProcessorTests.java @@ -166,6 +166,7 @@ public class PipelineProcessorTests extends ESTestCase { null, new CompoundProcessor(pipeline1Processor), relativeTimeProvider, + IngestPipelineFieldAccessPattern.CLASSIC, null ); @@ -181,13 +182,14 @@ public class PipelineProcessorTests extends ESTestCase { ingestDocument.setFieldValue(key1, randomInt()); }), pipeline2Processor), List.of()), relativeTimeProvider, + IngestPipelineFieldAccessPattern.CLASSIC, null ); relativeTimeProvider = mock(LongSupplier.class); when(relativeTimeProvider.getAsLong()).thenReturn(0L, TimeUnit.MILLISECONDS.toNanos(2)); Pipeline pipeline3 = new Pipeline(pipeline3Id, null, null, null, new CompoundProcessor(new TestProcessor(ingestDocument -> { throw new RuntimeException("error"); - })), relativeTimeProvider, null); + })), relativeTimeProvider, IngestPipelineFieldAccessPattern.CLASSIC, null); when(ingestService.getPipeline(pipeline1Id)).thenReturn(pipeline1); when(ingestService.getPipeline(pipeline2Id)).thenReturn(pipeline2); when(ingestService.getPipeline(pipeline3Id)).thenReturn(pipeline3); diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/common/validation/SourceDestValidatorTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/common/validation/SourceDestValidatorTests.java index 7a318af6d6e7..960e3e0a6450 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/common/validation/SourceDestValidatorTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/common/validation/SourceDestValidatorTests.java @@ -19,6 +19,7 @@ import org.elasticsearch.client.internal.RemoteClusterClient; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.AliasMetadata; +import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.Metadata; @@ -600,7 +601,14 @@ public class SourceDestValidatorTests extends ESTestCase { ); Map processorRegistry = Collections.singletonMap("test", new TestProcessor.Factory()); var projectId = randomProjectIdOrDefault(); - Pipeline pipeline = Pipeline.create("missing-pipeline", pipelineConfig, processorRegistry, null, projectId); + Pipeline pipeline = Pipeline.create( + "missing-pipeline", + pipelineConfig, + processorRegistry, + null, + projectId, + nodeFeature -> DataStream.LOGS_STREAM_FEATURE_FLAG + ); when(ingestService.getPipeline("missing-pipeline")).thenReturn(pipeline); assertValidation(