From 19708c1e9cc5429f5aa367b2bc5427e52c0ecc21 Mon Sep 17 00:00:00 2001 From: Niels Bauman <33722607+nielsbauman@users.noreply.github.com> Date: Tue, 1 Jul 2025 09:36:02 -0300 Subject: [PATCH] Make rollup APIs project-aware (#130365) Updates the rollup APIs to work in a multi-project context (even though that will never actually happen, as rollup is deprecated and the plugin is excluded in serverless). --- .../TransportDeleteRollupJobAction.java | 13 +++- .../action/TransportGetRollupCapsAction.java | 13 +++- .../TransportGetRollupIndexCapsAction.java | 14 ++-- .../action/TransportPutRollupJobAction.java | 41 ++++++++---- .../action/TransportRollupSearchAction.java | 9 ++- .../action/PutJobStateMachineTests.java | 66 +++++++++++++++---- .../TransportPutRollupJobActionTests.java | 35 +++++----- .../build.gradle | 1 + 8 files changed, 137 insertions(+), 55 deletions(-) diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportDeleteRollupJobAction.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportDeleteRollupJobAction.java index de737cafcb27..304223d1b605 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportDeleteRollupJobAction.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportDeleteRollupJobAction.java @@ -15,6 +15,7 @@ import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.tasks.TransportTasksAction; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.project.ProjectResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.logging.DeprecationCategory; import org.elasticsearch.common.logging.DeprecationLogger; @@ -44,8 +45,15 @@ public class TransportDeleteRollupJobAction extends TransportTasksAction< private static final DeprecationLogger DEPRECATION_LOGGER = DeprecationLogger.getLogger(TransportDeleteRollupJobAction.class); + private final ProjectResolver projectResolver; + @Inject - public TransportDeleteRollupJobAction(TransportService transportService, ActionFilters actionFilters, ClusterService clusterService) { + public TransportDeleteRollupJobAction( + TransportService transportService, + ActionFilters actionFilters, + ClusterService clusterService, + ProjectResolver projectResolver + ) { super( DeleteRollupJobAction.NAME, clusterService, @@ -55,6 +63,7 @@ public class TransportDeleteRollupJobAction extends TransportTasksAction< DeleteRollupJobAction.Response::new, EsExecutors.DIRECT_EXECUTOR_SERVICE ); + this.projectResolver = projectResolver; } @Override @@ -64,7 +73,7 @@ public class TransportDeleteRollupJobAction extends TransportTasksAction< final DiscoveryNodes nodes = state.nodes(); if (nodes.isLocalNodeElectedMaster()) { - PersistentTasksCustomMetadata pTasksMeta = state.getMetadata().getProject().custom(PersistentTasksCustomMetadata.TYPE); + PersistentTasksCustomMetadata pTasksMeta = projectResolver.getProjectMetadata(state).custom(PersistentTasksCustomMetadata.TYPE); if (pTasksMeta != null && pTasksMeta.getTask(request.getId()) != null) { super.doExecute(task, request, listener); } else { diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportGetRollupCapsAction.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportGetRollupCapsAction.java index 79d5011459fb..13e2bd2da2f4 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportGetRollupCapsAction.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportGetRollupCapsAction.java @@ -13,6 +13,7 @@ import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.MappingMetadata; import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.project.ProjectResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.logging.DeprecationCategory; import org.elasticsearch.common.logging.DeprecationLogger; @@ -43,9 +44,15 @@ public class TransportGetRollupCapsAction extends HandledTransportAction listener) { Transports.assertNotTransportThread("retrieving rollup job caps may be expensive"); - Map allCaps = getCaps(indexPattern, clusterService.state().getMetadata().getProject().indices()); + final var project = projectResolver.getProjectMetadata(clusterService.state()); + Map allCaps = getCaps(indexPattern, project.indices()); listener.onResponse(new GetRollupCapsAction.Response(allCaps)); } diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportGetRollupIndexCapsAction.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportGetRollupIndexCapsAction.java index dd87a850378e..d61a91478e7e 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportGetRollupIndexCapsAction.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportGetRollupIndexCapsAction.java @@ -13,6 +13,7 @@ import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.project.ProjectResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.logging.DeprecationCategory; import org.elasticsearch.common.logging.DeprecationLogger; @@ -46,13 +47,15 @@ public class TransportGetRollupIndexCapsAction extends HandledTransportAction< private final ClusterService clusterService; private final IndexNameExpressionResolver resolver; private final Executor managementExecutor; + private final ProjectResolver projectResolver; @Inject public TransportGetRollupIndexCapsAction( TransportService transportService, ClusterService clusterService, ActionFilters actionFilters, - IndexNameExpressionResolver indexNameExpressionResolver + IndexNameExpressionResolver indexNameExpressionResolver, + ProjectResolver projectResolver ) { // TODO replace SAME when removing workaround for https://github.com/elastic/elasticsearch/issues/97916 super( @@ -65,6 +68,7 @@ public class TransportGetRollupIndexCapsAction extends HandledTransportAction< this.clusterService = clusterService; this.managementExecutor = transportService.getThreadPool().executor(ThreadPool.Names.MANAGEMENT); this.resolver = indexNameExpressionResolver; + this.projectResolver = projectResolver; } @Override @@ -80,11 +84,9 @@ public class TransportGetRollupIndexCapsAction extends HandledTransportAction< private void doExecuteForked(IndicesRequest request, ActionListener listener) { Transports.assertNotTransportThread("retrieving rollup job index caps may be expensive"); - String[] indices = resolver.concreteIndexNames(clusterService.state(), request.indicesOptions(), request); - Map allCaps = getCapsByRollupIndex( - Arrays.asList(indices), - clusterService.state().getMetadata().getProject().indices() - ); + final var project = projectResolver.getProjectMetadata(clusterService.state()); + String[] indices = resolver.concreteIndexNames(project, request.indicesOptions(), request); + Map allCaps = getCapsByRollupIndex(Arrays.asList(indices), project.indices()); listener.onResponse(new GetRollupIndexCapsAction.Response(allCaps)); } diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportPutRollupJobAction.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportPutRollupJobAction.java index 4880454bf81c..45b8260691ee 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportPutRollupJobAction.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportPutRollupJobAction.java @@ -29,7 +29,9 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.MappingMetadata; -import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.metadata.ProjectId; +import org.elasticsearch.cluster.metadata.ProjectMetadata; +import org.elasticsearch.cluster.project.ProjectResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.logging.DeprecationCategory; import org.elasticsearch.common.logging.DeprecationLogger; @@ -78,6 +80,7 @@ public class TransportPutRollupJobAction extends AcknowledgedTransportMasterNode private final PersistentTasksService persistentTasksService; private final Client client; + private final ProjectResolver projectResolver; @Inject public TransportPutRollupJobAction( @@ -86,7 +89,8 @@ public class TransportPutRollupJobAction extends AcknowledgedTransportMasterNode ActionFilters actionFilters, ClusterService clusterService, PersistentTasksService persistentTasksService, - Client client + Client client, + ProjectResolver projectResolver ) { super( PutRollupJobAction.NAME, @@ -99,7 +103,7 @@ public class TransportPutRollupJobAction extends AcknowledgedTransportMasterNode ); this.persistentTasksService = persistentTasksService; this.client = client; - + this.projectResolver = projectResolver; } @Override @@ -113,10 +117,11 @@ public class TransportPutRollupJobAction extends AcknowledgedTransportMasterNode XPackPlugin.checkReadyForXPackCustomMetadata(clusterState); checkForDeprecatedTZ(request); - int numberOfCurrentRollupJobs = RollupUsageTransportAction.findNumberOfRollupJobs(clusterState.metadata().getProject()); + final var project = projectResolver.getProjectMetadata(clusterState); + int numberOfCurrentRollupJobs = RollupUsageTransportAction.findNumberOfRollupJobs(project); if (numberOfCurrentRollupJobs == 0) { try { - boolean hasRollupIndices = hasRollupIndices(clusterState.getMetadata()); + boolean hasRollupIndices = hasRollupIndices(project); if (hasRollupIndices == false) { listener.onFailure( new IllegalArgumentException( @@ -135,6 +140,7 @@ public class TransportPutRollupJobAction extends AcknowledgedTransportMasterNode .fields(request.getConfig().getAllFields().toArray(new String[0])); fieldCapsRequest.setParentTask(clusterService.localNode().getId(), task.getId()); + final var projectId = project.id(); client.fieldCaps(fieldCapsRequest, listener.delegateFailure((l, fieldCapabilitiesResponse) -> { ActionRequestValidationException validationException = request.validateMappings(fieldCapabilitiesResponse.get()); if (validationException != null) { @@ -143,7 +149,7 @@ public class TransportPutRollupJobAction extends AcknowledgedTransportMasterNode } RollupJob job = createRollupJob(request.getConfig(), threadPool); - createIndex(job, l, persistentTasksService, client, LOGGER); + createIndex(projectId, job, l, persistentTasksService, client, LOGGER); })); } @@ -177,6 +183,7 @@ public class TransportPutRollupJobAction extends AcknowledgedTransportMasterNode } static void createIndex( + ProjectId projectId, RollupJob job, ActionListener listener, PersistentTasksService persistentTasksService, @@ -196,10 +203,10 @@ public class TransportPutRollupJobAction extends AcknowledgedTransportMasterNode client.execute( TransportCreateIndexAction.TYPE, request, - ActionListener.wrap(createIndexResponse -> startPersistentTask(job, listener, persistentTasksService), e -> { + ActionListener.wrap(createIndexResponse -> startPersistentTask(projectId, job, listener, persistentTasksService), e -> { if (e instanceof ResourceAlreadyExistsException) { logger.debug("Rolled index already exists for rollup job [" + job.getConfig().getId() + "], updating metadata."); - updateMapping(job, listener, persistentTasksService, client, logger, request.masterNodeTimeout()); + updateMapping(projectId, job, listener, persistentTasksService, client, logger, request.masterNodeTimeout()); } else { String msg = "Could not create index for rollup job [" + job.getConfig().getId() + "]"; logger.error(msg); @@ -245,6 +252,7 @@ public class TransportPutRollupJobAction extends AcknowledgedTransportMasterNode @SuppressWarnings("unchecked") static void updateMapping( + ProjectId projectId, RollupJob job, ActionListener listener, PersistentTasksService persistentTasksService, @@ -301,7 +309,10 @@ public class TransportPutRollupJobAction extends AcknowledgedTransportMasterNode client.execute( TransportPutMappingAction.TYPE, request, - ActionListener.wrap(putMappingResponse -> startPersistentTask(job, listener, persistentTasksService), listener::onFailure) + ActionListener.wrap( + putMappingResponse -> startPersistentTask(projectId, job, listener, persistentTasksService), + listener::onFailure + ) ); }; @@ -314,17 +325,19 @@ public class TransportPutRollupJobAction extends AcknowledgedTransportMasterNode } static void startPersistentTask( + ProjectId projectId, RollupJob job, ActionListener listener, PersistentTasksService persistentTasksService ) { assertNoAuthorizationHeader(job.getHeaders()); - persistentTasksService.sendStartRequest( + persistentTasksService.sendProjectStartRequest( + projectId, job.getConfig().getId(), RollupField.TASK_NAME, job, TimeValue.THIRTY_SECONDS /* TODO should this be configurable? longer by default? infinite? */, - ActionListener.wrap(rollupConfigPersistentTask -> waitForRollupStarted(job, listener, persistentTasksService), e -> { + ActionListener.wrap(rollupConfigPersistentTask -> waitForRollupStarted(projectId, job, listener, persistentTasksService), e -> { if (e instanceof ResourceAlreadyExistsException) { e = new ElasticsearchStatusException( "Cannot create job [" + job.getConfig().getId() + "] because it has already been created (task exists)", @@ -338,11 +351,13 @@ public class TransportPutRollupJobAction extends AcknowledgedTransportMasterNode } private static void waitForRollupStarted( + ProjectId projectId, RollupJob job, ActionListener listener, PersistentTasksService persistentTasksService ) { persistentTasksService.waitForPersistentTaskCondition( + projectId, job.getConfig().getId(), Objects::nonNull, job.getConfig().getTimeout(), @@ -369,9 +384,9 @@ public class TransportPutRollupJobAction extends AcknowledgedTransportMasterNode ); } - static boolean hasRollupIndices(Metadata metadata) throws IOException { + static boolean hasRollupIndices(ProjectMetadata project) throws IOException { // Sniffing logic instead of invoking sourceAsMap(), which would materialize the entire mapping as map of maps. - for (var imd : metadata.getProject()) { + for (var imd : project) { if (imd.mapping() == null) { continue; } diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportRollupSearchAction.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportRollupSearchAction.java index 38e36e8225dc..743a8678bab1 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportRollupSearchAction.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportRollupSearchAction.java @@ -20,6 +20,7 @@ import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.project.ProjectResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.BytesStreamOutput; @@ -88,6 +89,7 @@ public class TransportRollupSearchAction extends TransportAction listener) { DEPRECATION_LOGGER.warn(DeprecationCategory.API, DEPRECATION_KEY, DEPRECATION_MESSAGE); String[] indices = resolver.concreteIndexNames(clusterService.state(), request); - RollupSearchContext rollupSearchContext = separateIndices(indices, clusterService.state().getMetadata().getProject().indices()); + final var project = projectResolver.getProjectMetadata(clusterService.state()); + RollupSearchContext rollupSearchContext = separateIndices(indices, project.indices()); MultiSearchRequest msearch = createMSearchRequest(request, registry, rollupSearchContext); diff --git a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/action/PutJobStateMachineTests.java b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/action/PutJobStateMachineTests.java index 5868a762ed51..55288f16b97f 100644 --- a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/action/PutJobStateMachineTests.java +++ b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/action/PutJobStateMachineTests.java @@ -52,6 +52,7 @@ public class PutJobStateMachineTests extends ESTestCase { @SuppressWarnings({ "unchecked", "rawtypes" }) public void testCreateIndexException() { + final var projectId = randomProjectIdOrDefault(); RollupJob job = new RollupJob(ConfigTestHelpers.randomRollupJobConfig(random(), "foo"), Collections.emptyMap()); ActionListener testListener = ActionListener.wrap(response -> { @@ -70,7 +71,7 @@ public class PutJobStateMachineTests extends ESTestCase { return null; }).when(client).execute(eq(TransportCreateIndexAction.TYPE), any(CreateIndexRequest.class), requestCaptor.capture()); - TransportPutRollupJobAction.createIndex(job, testListener, mock(PersistentTasksService.class), client, logger); + TransportPutRollupJobAction.createIndex(projectId, job, testListener, mock(PersistentTasksService.class), client, logger); // ResourceAlreadyExists should trigger a GetMapping next verify(client).execute(eq(TransportCreateIndexAction.TYPE), any(CreateIndexRequest.class), any()); @@ -78,6 +79,7 @@ public class PutJobStateMachineTests extends ESTestCase { @SuppressWarnings({ "unchecked", "rawtypes" }) public void testIndexAlreadyExists() { + final var projectId = randomProjectIdOrDefault(); RollupJob job = new RollupJob(ConfigTestHelpers.randomRollupJobConfig(random()), Collections.emptyMap()); ActionListener testListener = ActionListener.wrap(response -> { @@ -100,7 +102,7 @@ public class PutJobStateMachineTests extends ESTestCase { return null; }).when(client).execute(eq(GetMappingsAction.INSTANCE), any(GetMappingsRequest.class), requestCaptor2.capture()); - TransportPutRollupJobAction.createIndex(job, testListener, mock(PersistentTasksService.class), client, logger); + TransportPutRollupJobAction.createIndex(projectId, job, testListener, mock(PersistentTasksService.class), client, logger); // ResourceAlreadyExists should trigger a GetMapping next verify(client).execute(eq(GetMappingsAction.INSTANCE), any(GetMappingsRequest.class), any()); @@ -108,6 +110,7 @@ public class PutJobStateMachineTests extends ESTestCase { @SuppressWarnings({ "unchecked", "rawtypes" }) public void testIndexMetadata() throws InterruptedException { + final var projectId = randomProjectIdOrDefault(); RollupJob job = new RollupJob(ConfigTestHelpers.randomRollupJobConfig(random()), Collections.emptyMap()); ActionListener testListener = ActionListener.wrap(response -> { @@ -140,7 +143,7 @@ public class PutJobStateMachineTests extends ESTestCase { return null; }).when(client).execute(eq(GetMappingsAction.INSTANCE), any(GetMappingsRequest.class), requestCaptor2.capture()); - TransportPutRollupJobAction.createIndex(job, testListener, mock(PersistentTasksService.class), client, logger); + TransportPutRollupJobAction.createIndex(projectId, job, testListener, mock(PersistentTasksService.class), client, logger); // ResourceAlreadyExists should trigger a GetMapping next verify(client).execute(eq(GetMappingsAction.INSTANCE), any(GetMappingsRequest.class), any()); @@ -149,6 +152,7 @@ public class PutJobStateMachineTests extends ESTestCase { @SuppressWarnings({ "unchecked", "rawtypes" }) public void testGetMappingFails() { + final var projectId = randomProjectIdOrDefault(); RollupJob job = new RollupJob(ConfigTestHelpers.randomRollupJobConfig(random(), "foo"), Collections.emptyMap()); ActionListener testListener = ActionListener.wrap(response -> { @@ -168,6 +172,7 @@ public class PutJobStateMachineTests extends ESTestCase { }).when(client).execute(eq(GetMappingsAction.INSTANCE), any(GetMappingsRequest.class), requestCaptor.capture()); TransportPutRollupJobAction.updateMapping( + projectId, job, testListener, mock(PersistentTasksService.class), @@ -180,6 +185,7 @@ public class PutJobStateMachineTests extends ESTestCase { @SuppressWarnings({ "unchecked", "rawtypes" }) public void testNoMetadataInMapping() { + final var projectId = randomProjectIdOrDefault(); RollupJob job = new RollupJob(ConfigTestHelpers.randomRollupJobConfig(random()), Collections.emptyMap()); ActionListener testListener = ActionListener.wrap(response -> { @@ -210,6 +216,7 @@ public class PutJobStateMachineTests extends ESTestCase { }).when(client).execute(eq(GetMappingsAction.INSTANCE), any(GetMappingsRequest.class), requestCaptor.capture()); TransportPutRollupJobAction.updateMapping( + projectId, job, testListener, mock(PersistentTasksService.class), @@ -222,6 +229,7 @@ public class PutJobStateMachineTests extends ESTestCase { @SuppressWarnings({ "unchecked", "rawtypes" }) public void testMetadataButNotRollup() { + final var projectId = randomProjectIdOrDefault(); RollupJob job = new RollupJob(ConfigTestHelpers.randomRollupJobConfig(random()), Collections.emptyMap()); ActionListener testListener = ActionListener.wrap(response -> { @@ -254,6 +262,7 @@ public class PutJobStateMachineTests extends ESTestCase { }).when(client).execute(eq(GetMappingsAction.INSTANCE), any(GetMappingsRequest.class), requestCaptor.capture()); TransportPutRollupJobAction.updateMapping( + projectId, job, testListener, mock(PersistentTasksService.class), @@ -266,6 +275,7 @@ public class PutJobStateMachineTests extends ESTestCase { @SuppressWarnings({ "unchecked", "rawtypes" }) public void testJobAlreadyInMapping() { + final var projectId = randomProjectIdOrDefault(); RollupJob job = new RollupJob(ConfigTestHelpers.randomRollupJobConfig(random(), "foo"), Collections.emptyMap()); ActionListener testListener = ActionListener.wrap(response -> { @@ -295,6 +305,7 @@ public class PutJobStateMachineTests extends ESTestCase { }).when(client).execute(eq(GetMappingsAction.INSTANCE), any(GetMappingsRequest.class), requestCaptor.capture()); TransportPutRollupJobAction.updateMapping( + projectId, job, testListener, mock(PersistentTasksService.class), @@ -307,6 +318,7 @@ public class PutJobStateMachineTests extends ESTestCase { @SuppressWarnings({ "unchecked", "rawtypes" }) public void testAddJobToMapping() { + final var projectId = randomProjectIdOrDefault(); final RollupJobConfig unrelatedJob = ConfigTestHelpers.randomRollupJobConfig( random(), ESTestCase.randomAlphaOfLength(10), @@ -348,6 +360,7 @@ public class PutJobStateMachineTests extends ESTestCase { }).when(client).execute(eq(TransportPutMappingAction.TYPE), any(PutMappingRequest.class), requestCaptor2.capture()); TransportPutRollupJobAction.updateMapping( + projectId, job, testListener, mock(PersistentTasksService.class), @@ -361,6 +374,7 @@ public class PutJobStateMachineTests extends ESTestCase { @SuppressWarnings({ "unchecked", "rawtypes" }) public void testTaskAlreadyExists() { + final var projectId = randomProjectIdOrDefault(); RollupJob job = new RollupJob(ConfigTestHelpers.randomRollupJobConfig(random(), "foo"), Collections.emptyMap()); ActionListener testListener = ActionListener.wrap(response -> { @@ -374,14 +388,29 @@ public class PutJobStateMachineTests extends ESTestCase { requestCaptor.getValue().onFailure(new ResourceAlreadyExistsException(job.getConfig().getRollupIndex())); return null; }).when(tasksService) - .sendStartRequest(eq(job.getConfig().getId()), eq(RollupField.TASK_NAME), eq(job), isNotNull(), requestCaptor.capture()); + .sendProjectStartRequest( + eq(projectId), + eq(job.getConfig().getId()), + eq(RollupField.TASK_NAME), + eq(job), + isNotNull(), + requestCaptor.capture() + ); - TransportPutRollupJobAction.startPersistentTask(job, testListener, tasksService); - verify(tasksService).sendStartRequest(eq(job.getConfig().getId()), eq(RollupField.TASK_NAME), eq(job), isNotNull(), any()); + TransportPutRollupJobAction.startPersistentTask(projectId, job, testListener, tasksService); + verify(tasksService).sendProjectStartRequest( + eq(projectId), + eq(job.getConfig().getId()), + eq(RollupField.TASK_NAME), + eq(job), + isNotNull(), + any() + ); } @SuppressWarnings({ "unchecked", "rawtypes" }) public void testStartTask() { + final var projectId = randomProjectIdOrDefault(); RollupJob job = new RollupJob(ConfigTestHelpers.randomRollupJobConfig(random()), Collections.emptyMap()); ActionListener testListener = ActionListener.wrap(response -> { @@ -402,7 +431,14 @@ public class PutJobStateMachineTests extends ESTestCase { requestCaptor.getValue().onResponse(response); return null; }).when(tasksService) - .sendStartRequest(eq(job.getConfig().getId()), eq(RollupField.TASK_NAME), eq(job), isNotNull(), requestCaptor.capture()); + .sendProjectStartRequest( + eq(projectId), + eq(job.getConfig().getId()), + eq(RollupField.TASK_NAME), + eq(job), + isNotNull(), + requestCaptor.capture() + ); ArgumentCaptor requestCaptor2 = ArgumentCaptor.forClass( PersistentTasksService.WaitForPersistentTaskListener.class @@ -411,11 +447,19 @@ public class PutJobStateMachineTests extends ESTestCase { // Bail here with an error, further testing will happen through tests of #startPersistentTask requestCaptor2.getValue().onFailure(new RuntimeException("Ending")); return null; - }).when(tasksService).waitForPersistentTaskCondition(eq(job.getConfig().getId()), any(), any(), requestCaptor2.capture()); + }).when(tasksService) + .waitForPersistentTaskCondition(eq(projectId), eq(job.getConfig().getId()), any(), any(), requestCaptor2.capture()); - TransportPutRollupJobAction.startPersistentTask(job, testListener, tasksService); - verify(tasksService).sendStartRequest(eq(job.getConfig().getId()), eq(RollupField.TASK_NAME), eq(job), isNotNull(), any()); - verify(tasksService).waitForPersistentTaskCondition(eq(job.getConfig().getId()), any(), any(), any()); + TransportPutRollupJobAction.startPersistentTask(projectId, job, testListener, tasksService); + verify(tasksService).sendProjectStartRequest( + eq(projectId), + eq(job.getConfig().getId()), + eq(RollupField.TASK_NAME), + eq(job), + isNotNull(), + any() + ); + verify(tasksService).waitForPersistentTaskCondition(eq(projectId), eq(job.getConfig().getId()), any(), any(), any()); } public void testDeprecatedTimeZone() { diff --git a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/action/TransportPutRollupJobActionTests.java b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/action/TransportPutRollupJobActionTests.java index 017924e461e5..2ae74498e8c1 100644 --- a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/action/TransportPutRollupJobActionTests.java +++ b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/action/TransportPutRollupJobActionTests.java @@ -8,8 +8,7 @@ package org.elasticsearch.xpack.rollup.action; import org.elasticsearch.cluster.metadata.IndexMetadata; -import org.elasticsearch.cluster.metadata.Metadata; -import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.cluster.metadata.ProjectMetadata; import org.elasticsearch.index.IndexVersion; import org.elasticsearch.test.ESTestCase; @@ -35,8 +34,8 @@ public class TransportPutRollupJobActionTests extends ESTestCase { } } """; - var metadata = createMetadata(mappings); - assertTrue(TransportPutRollupJobAction.hasRollupIndices(metadata)); + var project = createProject(mappings); + assertTrue(TransportPutRollupJobAction.hasRollupIndices(project)); } { String mappings = """ @@ -55,8 +54,8 @@ public class TransportPutRollupJobActionTests extends ESTestCase { } } """; - var metadata = createMetadata(mappings); - assertTrue(TransportPutRollupJobAction.hasRollupIndices(metadata)); + var project = createProject(mappings); + assertTrue(TransportPutRollupJobAction.hasRollupIndices(project)); } { String mappings = """ @@ -76,8 +75,8 @@ public class TransportPutRollupJobActionTests extends ESTestCase { } } """; - var metadata = createMetadata(mappings); - assertTrue(TransportPutRollupJobAction.hasRollupIndices(metadata)); + var project = createProject(mappings); + assertTrue(TransportPutRollupJobAction.hasRollupIndices(project)); } { String mappings = """ @@ -91,8 +90,8 @@ public class TransportPutRollupJobActionTests extends ESTestCase { } } """; - var metadata = createMetadata(mappings); - assertFalse(TransportPutRollupJobAction.hasRollupIndices(metadata)); + var project = createProject(mappings); + assertFalse(TransportPutRollupJobAction.hasRollupIndices(project)); } { String mappings = """ @@ -101,8 +100,8 @@ public class TransportPutRollupJobActionTests extends ESTestCase { } } """; - var metadata = createMetadata(mappings); - assertFalse(TransportPutRollupJobAction.hasRollupIndices(metadata)); + var project = createProject(mappings); + assertFalse(TransportPutRollupJobAction.hasRollupIndices(project)); } { String mappings = """ @@ -119,17 +118,15 @@ public class TransportPutRollupJobActionTests extends ESTestCase { } } """; - var metadata = createMetadata(mappings); - assertFalse(TransportPutRollupJobAction.hasRollupIndices(metadata)); + var project = createProject(mappings); + assertFalse(TransportPutRollupJobAction.hasRollupIndices(project)); } } - private static Metadata createMetadata(String mappings) { - Settings.Builder b = Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, IndexVersion.current()); - var metadata = Metadata.builder() - .put(IndexMetadata.builder("my-rollup-index").settings(b).numberOfShards(1).numberOfReplicas(0).putMapping(mappings)) + private static ProjectMetadata createProject(String mappings) { + return ProjectMetadata.builder(randomProjectIdOrDefault()) + .put(IndexMetadata.builder("my-rollup-index").settings(indexSettings(IndexVersion.current(), 1, 0)).putMapping(mappings)) .build(); - return metadata; } } diff --git a/x-pack/qa/multi-project/xpack-rest-tests-with-multiple-projects/build.gradle b/x-pack/qa/multi-project/xpack-rest-tests-with-multiple-projects/build.gradle index c6e9482bc818..eca07a541e40 100644 --- a/x-pack/qa/multi-project/xpack-rest-tests-with-multiple-projects/build.gradle +++ b/x-pack/qa/multi-project/xpack-rest-tests-with-multiple-projects/build.gradle @@ -97,6 +97,7 @@ tasks.named("yamlRestTest").configure { '^monitoring/bulk/10_basic/*', '^monitoring/bulk/20_privileges/*', '^profiling/10_basic/*', + // These rollup tests work in MP mode, they just don't work with security enabled. '^rollup/delete_job/*', '^rollup/get_jobs/*', '^rollup/get_rollup_caps/*',