Make rollup APIs project-aware (#130365)

Updates the rollup APIs to work in a multi-project context (even though
that will never actually happen, as rollup is deprecated and the plugin
is excluded in serverless).
This commit is contained in:
Niels Bauman 2025-07-01 09:36:02 -03:00 committed by GitHub
parent c0744a1808
commit 19708c1e9c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 137 additions and 55 deletions

View File

@ -15,6 +15,7 @@ import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.tasks.TransportTasksAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.logging.DeprecationCategory;
import org.elasticsearch.common.logging.DeprecationLogger;
@ -44,8 +45,15 @@ public class TransportDeleteRollupJobAction extends TransportTasksAction<
private static final DeprecationLogger DEPRECATION_LOGGER = DeprecationLogger.getLogger(TransportDeleteRollupJobAction.class);
private final ProjectResolver projectResolver;
@Inject
public TransportDeleteRollupJobAction(TransportService transportService, ActionFilters actionFilters, ClusterService clusterService) {
public TransportDeleteRollupJobAction(
TransportService transportService,
ActionFilters actionFilters,
ClusterService clusterService,
ProjectResolver projectResolver
) {
super(
DeleteRollupJobAction.NAME,
clusterService,
@ -55,6 +63,7 @@ public class TransportDeleteRollupJobAction extends TransportTasksAction<
DeleteRollupJobAction.Response::new,
EsExecutors.DIRECT_EXECUTOR_SERVICE
);
this.projectResolver = projectResolver;
}
@Override
@ -64,7 +73,7 @@ public class TransportDeleteRollupJobAction extends TransportTasksAction<
final DiscoveryNodes nodes = state.nodes();
if (nodes.isLocalNodeElectedMaster()) {
PersistentTasksCustomMetadata pTasksMeta = state.getMetadata().getProject().custom(PersistentTasksCustomMetadata.TYPE);
PersistentTasksCustomMetadata pTasksMeta = projectResolver.getProjectMetadata(state).custom(PersistentTasksCustomMetadata.TYPE);
if (pTasksMeta != null && pTasksMeta.getTask(request.getId()) != null) {
super.doExecute(task, request, listener);
} else {

View File

@ -13,6 +13,7 @@ import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.MappingMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.logging.DeprecationCategory;
import org.elasticsearch.common.logging.DeprecationLogger;
@ -43,9 +44,15 @@ public class TransportGetRollupCapsAction extends HandledTransportAction<GetRoll
private final ClusterService clusterService;
private final Executor managementExecutor;
private final ProjectResolver projectResolver;
@Inject
public TransportGetRollupCapsAction(TransportService transportService, ClusterService clusterService, ActionFilters actionFilters) {
public TransportGetRollupCapsAction(
TransportService transportService,
ClusterService clusterService,
ActionFilters actionFilters,
ProjectResolver projectResolver
) {
// TODO replace SAME when removing workaround for https://github.com/elastic/elasticsearch/issues/97916
super(
GetRollupCapsAction.NAME,
@ -56,6 +63,7 @@ public class TransportGetRollupCapsAction extends HandledTransportAction<GetRoll
);
this.clusterService = clusterService;
this.managementExecutor = transportService.getThreadPool().executor(ThreadPool.Names.MANAGEMENT);
this.projectResolver = projectResolver;
}
@Override
@ -67,7 +75,8 @@ public class TransportGetRollupCapsAction extends HandledTransportAction<GetRoll
private void doExecuteForked(String indexPattern, ActionListener<GetRollupCapsAction.Response> listener) {
Transports.assertNotTransportThread("retrieving rollup job caps may be expensive");
Map<String, RollableIndexCaps> allCaps = getCaps(indexPattern, clusterService.state().getMetadata().getProject().indices());
final var project = projectResolver.getProjectMetadata(clusterService.state());
Map<String, RollableIndexCaps> allCaps = getCaps(indexPattern, project.indices());
listener.onResponse(new GetRollupCapsAction.Response(allCaps));
}

View File

@ -13,6 +13,7 @@ import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.logging.DeprecationCategory;
import org.elasticsearch.common.logging.DeprecationLogger;
@ -46,13 +47,15 @@ public class TransportGetRollupIndexCapsAction extends HandledTransportAction<
private final ClusterService clusterService;
private final IndexNameExpressionResolver resolver;
private final Executor managementExecutor;
private final ProjectResolver projectResolver;
@Inject
public TransportGetRollupIndexCapsAction(
TransportService transportService,
ClusterService clusterService,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver
IndexNameExpressionResolver indexNameExpressionResolver,
ProjectResolver projectResolver
) {
// TODO replace SAME when removing workaround for https://github.com/elastic/elasticsearch/issues/97916
super(
@ -65,6 +68,7 @@ public class TransportGetRollupIndexCapsAction extends HandledTransportAction<
this.clusterService = clusterService;
this.managementExecutor = transportService.getThreadPool().executor(ThreadPool.Names.MANAGEMENT);
this.resolver = indexNameExpressionResolver;
this.projectResolver = projectResolver;
}
@Override
@ -80,11 +84,9 @@ public class TransportGetRollupIndexCapsAction extends HandledTransportAction<
private void doExecuteForked(IndicesRequest request, ActionListener<GetRollupIndexCapsAction.Response> listener) {
Transports.assertNotTransportThread("retrieving rollup job index caps may be expensive");
String[] indices = resolver.concreteIndexNames(clusterService.state(), request.indicesOptions(), request);
Map<String, RollableIndexCaps> allCaps = getCapsByRollupIndex(
Arrays.asList(indices),
clusterService.state().getMetadata().getProject().indices()
);
final var project = projectResolver.getProjectMetadata(clusterService.state());
String[] indices = resolver.concreteIndexNames(project, request.indicesOptions(), request);
Map<String, RollableIndexCaps> allCaps = getCapsByRollupIndex(Arrays.asList(indices), project.indices());
listener.onResponse(new GetRollupIndexCapsAction.Response(allCaps));
}

View File

@ -29,7 +29,9 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.MappingMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.logging.DeprecationCategory;
import org.elasticsearch.common.logging.DeprecationLogger;
@ -78,6 +80,7 @@ public class TransportPutRollupJobAction extends AcknowledgedTransportMasterNode
private final PersistentTasksService persistentTasksService;
private final Client client;
private final ProjectResolver projectResolver;
@Inject
public TransportPutRollupJobAction(
@ -86,7 +89,8 @@ public class TransportPutRollupJobAction extends AcknowledgedTransportMasterNode
ActionFilters actionFilters,
ClusterService clusterService,
PersistentTasksService persistentTasksService,
Client client
Client client,
ProjectResolver projectResolver
) {
super(
PutRollupJobAction.NAME,
@ -99,7 +103,7 @@ public class TransportPutRollupJobAction extends AcknowledgedTransportMasterNode
);
this.persistentTasksService = persistentTasksService;
this.client = client;
this.projectResolver = projectResolver;
}
@Override
@ -113,10 +117,11 @@ public class TransportPutRollupJobAction extends AcknowledgedTransportMasterNode
XPackPlugin.checkReadyForXPackCustomMetadata(clusterState);
checkForDeprecatedTZ(request);
int numberOfCurrentRollupJobs = RollupUsageTransportAction.findNumberOfRollupJobs(clusterState.metadata().getProject());
final var project = projectResolver.getProjectMetadata(clusterState);
int numberOfCurrentRollupJobs = RollupUsageTransportAction.findNumberOfRollupJobs(project);
if (numberOfCurrentRollupJobs == 0) {
try {
boolean hasRollupIndices = hasRollupIndices(clusterState.getMetadata());
boolean hasRollupIndices = hasRollupIndices(project);
if (hasRollupIndices == false) {
listener.onFailure(
new IllegalArgumentException(
@ -135,6 +140,7 @@ public class TransportPutRollupJobAction extends AcknowledgedTransportMasterNode
.fields(request.getConfig().getAllFields().toArray(new String[0]));
fieldCapsRequest.setParentTask(clusterService.localNode().getId(), task.getId());
final var projectId = project.id();
client.fieldCaps(fieldCapsRequest, listener.delegateFailure((l, fieldCapabilitiesResponse) -> {
ActionRequestValidationException validationException = request.validateMappings(fieldCapabilitiesResponse.get());
if (validationException != null) {
@ -143,7 +149,7 @@ public class TransportPutRollupJobAction extends AcknowledgedTransportMasterNode
}
RollupJob job = createRollupJob(request.getConfig(), threadPool);
createIndex(job, l, persistentTasksService, client, LOGGER);
createIndex(projectId, job, l, persistentTasksService, client, LOGGER);
}));
}
@ -177,6 +183,7 @@ public class TransportPutRollupJobAction extends AcknowledgedTransportMasterNode
}
static void createIndex(
ProjectId projectId,
RollupJob job,
ActionListener<AcknowledgedResponse> listener,
PersistentTasksService persistentTasksService,
@ -196,10 +203,10 @@ public class TransportPutRollupJobAction extends AcknowledgedTransportMasterNode
client.execute(
TransportCreateIndexAction.TYPE,
request,
ActionListener.wrap(createIndexResponse -> startPersistentTask(job, listener, persistentTasksService), e -> {
ActionListener.wrap(createIndexResponse -> startPersistentTask(projectId, job, listener, persistentTasksService), e -> {
if (e instanceof ResourceAlreadyExistsException) {
logger.debug("Rolled index already exists for rollup job [" + job.getConfig().getId() + "], updating metadata.");
updateMapping(job, listener, persistentTasksService, client, logger, request.masterNodeTimeout());
updateMapping(projectId, job, listener, persistentTasksService, client, logger, request.masterNodeTimeout());
} else {
String msg = "Could not create index for rollup job [" + job.getConfig().getId() + "]";
logger.error(msg);
@ -245,6 +252,7 @@ public class TransportPutRollupJobAction extends AcknowledgedTransportMasterNode
@SuppressWarnings("unchecked")
static void updateMapping(
ProjectId projectId,
RollupJob job,
ActionListener<AcknowledgedResponse> listener,
PersistentTasksService persistentTasksService,
@ -301,7 +309,10 @@ public class TransportPutRollupJobAction extends AcknowledgedTransportMasterNode
client.execute(
TransportPutMappingAction.TYPE,
request,
ActionListener.wrap(putMappingResponse -> startPersistentTask(job, listener, persistentTasksService), listener::onFailure)
ActionListener.wrap(
putMappingResponse -> startPersistentTask(projectId, job, listener, persistentTasksService),
listener::onFailure
)
);
};
@ -314,17 +325,19 @@ public class TransportPutRollupJobAction extends AcknowledgedTransportMasterNode
}
static void startPersistentTask(
ProjectId projectId,
RollupJob job,
ActionListener<AcknowledgedResponse> listener,
PersistentTasksService persistentTasksService
) {
assertNoAuthorizationHeader(job.getHeaders());
persistentTasksService.sendStartRequest(
persistentTasksService.sendProjectStartRequest(
projectId,
job.getConfig().getId(),
RollupField.TASK_NAME,
job,
TimeValue.THIRTY_SECONDS /* TODO should this be configurable? longer by default? infinite? */,
ActionListener.wrap(rollupConfigPersistentTask -> waitForRollupStarted(job, listener, persistentTasksService), e -> {
ActionListener.wrap(rollupConfigPersistentTask -> waitForRollupStarted(projectId, job, listener, persistentTasksService), e -> {
if (e instanceof ResourceAlreadyExistsException) {
e = new ElasticsearchStatusException(
"Cannot create job [" + job.getConfig().getId() + "] because it has already been created (task exists)",
@ -338,11 +351,13 @@ public class TransportPutRollupJobAction extends AcknowledgedTransportMasterNode
}
private static void waitForRollupStarted(
ProjectId projectId,
RollupJob job,
ActionListener<AcknowledgedResponse> listener,
PersistentTasksService persistentTasksService
) {
persistentTasksService.waitForPersistentTaskCondition(
projectId,
job.getConfig().getId(),
Objects::nonNull,
job.getConfig().getTimeout(),
@ -369,9 +384,9 @@ public class TransportPutRollupJobAction extends AcknowledgedTransportMasterNode
);
}
static boolean hasRollupIndices(Metadata metadata) throws IOException {
static boolean hasRollupIndices(ProjectMetadata project) throws IOException {
// Sniffing logic instead of invoking sourceAsMap(), which would materialize the entire mapping as map of maps.
for (var imd : metadata.getProject()) {
for (var imd : project) {
if (imd.mapping() == null) {
continue;
}

View File

@ -20,6 +20,7 @@ import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
@ -88,6 +89,7 @@ public class TransportRollupSearchAction extends TransportAction<SearchRequest,
private final ScriptService scriptService;
private final ClusterService clusterService;
private final IndexNameExpressionResolver resolver;
private final ProjectResolver projectResolver;
private static final Logger logger = LogManager.getLogger(RollupSearchAction.class);
@Inject
@ -99,7 +101,8 @@ public class TransportRollupSearchAction extends TransportAction<SearchRequest,
BigArrays bigArrays,
ScriptService scriptService,
ClusterService clusterService,
IndexNameExpressionResolver resolver
IndexNameExpressionResolver resolver,
ProjectResolver projectResolver
) {
super(RollupSearchAction.NAME, actionFilters, transportService.getTaskManager(), EsExecutors.DIRECT_EXECUTOR_SERVICE);
this.client = client;
@ -108,6 +111,7 @@ public class TransportRollupSearchAction extends TransportAction<SearchRequest,
this.scriptService = scriptService;
this.clusterService = clusterService;
this.resolver = resolver;
this.projectResolver = projectResolver;
transportService.registerRequestHandler(
actionName,
@ -123,7 +127,8 @@ public class TransportRollupSearchAction extends TransportAction<SearchRequest,
protected void doExecute(Task task, SearchRequest request, ActionListener<SearchResponse> listener) {
DEPRECATION_LOGGER.warn(DeprecationCategory.API, DEPRECATION_KEY, DEPRECATION_MESSAGE);
String[] indices = resolver.concreteIndexNames(clusterService.state(), request);
RollupSearchContext rollupSearchContext = separateIndices(indices, clusterService.state().getMetadata().getProject().indices());
final var project = projectResolver.getProjectMetadata(clusterService.state());
RollupSearchContext rollupSearchContext = separateIndices(indices, project.indices());
MultiSearchRequest msearch = createMSearchRequest(request, registry, rollupSearchContext);

View File

@ -52,6 +52,7 @@ public class PutJobStateMachineTests extends ESTestCase {
@SuppressWarnings({ "unchecked", "rawtypes" })
public void testCreateIndexException() {
final var projectId = randomProjectIdOrDefault();
RollupJob job = new RollupJob(ConfigTestHelpers.randomRollupJobConfig(random(), "foo"), Collections.emptyMap());
ActionListener<AcknowledgedResponse> testListener = ActionListener.wrap(response -> {
@ -70,7 +71,7 @@ public class PutJobStateMachineTests extends ESTestCase {
return null;
}).when(client).execute(eq(TransportCreateIndexAction.TYPE), any(CreateIndexRequest.class), requestCaptor.capture());
TransportPutRollupJobAction.createIndex(job, testListener, mock(PersistentTasksService.class), client, logger);
TransportPutRollupJobAction.createIndex(projectId, job, testListener, mock(PersistentTasksService.class), client, logger);
// ResourceAlreadyExists should trigger a GetMapping next
verify(client).execute(eq(TransportCreateIndexAction.TYPE), any(CreateIndexRequest.class), any());
@ -78,6 +79,7 @@ public class PutJobStateMachineTests extends ESTestCase {
@SuppressWarnings({ "unchecked", "rawtypes" })
public void testIndexAlreadyExists() {
final var projectId = randomProjectIdOrDefault();
RollupJob job = new RollupJob(ConfigTestHelpers.randomRollupJobConfig(random()), Collections.emptyMap());
ActionListener<AcknowledgedResponse> testListener = ActionListener.wrap(response -> {
@ -100,7 +102,7 @@ public class PutJobStateMachineTests extends ESTestCase {
return null;
}).when(client).execute(eq(GetMappingsAction.INSTANCE), any(GetMappingsRequest.class), requestCaptor2.capture());
TransportPutRollupJobAction.createIndex(job, testListener, mock(PersistentTasksService.class), client, logger);
TransportPutRollupJobAction.createIndex(projectId, job, testListener, mock(PersistentTasksService.class), client, logger);
// ResourceAlreadyExists should trigger a GetMapping next
verify(client).execute(eq(GetMappingsAction.INSTANCE), any(GetMappingsRequest.class), any());
@ -108,6 +110,7 @@ public class PutJobStateMachineTests extends ESTestCase {
@SuppressWarnings({ "unchecked", "rawtypes" })
public void testIndexMetadata() throws InterruptedException {
final var projectId = randomProjectIdOrDefault();
RollupJob job = new RollupJob(ConfigTestHelpers.randomRollupJobConfig(random()), Collections.emptyMap());
ActionListener<AcknowledgedResponse> testListener = ActionListener.wrap(response -> {
@ -140,7 +143,7 @@ public class PutJobStateMachineTests extends ESTestCase {
return null;
}).when(client).execute(eq(GetMappingsAction.INSTANCE), any(GetMappingsRequest.class), requestCaptor2.capture());
TransportPutRollupJobAction.createIndex(job, testListener, mock(PersistentTasksService.class), client, logger);
TransportPutRollupJobAction.createIndex(projectId, job, testListener, mock(PersistentTasksService.class), client, logger);
// ResourceAlreadyExists should trigger a GetMapping next
verify(client).execute(eq(GetMappingsAction.INSTANCE), any(GetMappingsRequest.class), any());
@ -149,6 +152,7 @@ public class PutJobStateMachineTests extends ESTestCase {
@SuppressWarnings({ "unchecked", "rawtypes" })
public void testGetMappingFails() {
final var projectId = randomProjectIdOrDefault();
RollupJob job = new RollupJob(ConfigTestHelpers.randomRollupJobConfig(random(), "foo"), Collections.emptyMap());
ActionListener<AcknowledgedResponse> testListener = ActionListener.wrap(response -> {
@ -168,6 +172,7 @@ public class PutJobStateMachineTests extends ESTestCase {
}).when(client).execute(eq(GetMappingsAction.INSTANCE), any(GetMappingsRequest.class), requestCaptor.capture());
TransportPutRollupJobAction.updateMapping(
projectId,
job,
testListener,
mock(PersistentTasksService.class),
@ -180,6 +185,7 @@ public class PutJobStateMachineTests extends ESTestCase {
@SuppressWarnings({ "unchecked", "rawtypes" })
public void testNoMetadataInMapping() {
final var projectId = randomProjectIdOrDefault();
RollupJob job = new RollupJob(ConfigTestHelpers.randomRollupJobConfig(random()), Collections.emptyMap());
ActionListener<AcknowledgedResponse> testListener = ActionListener.wrap(response -> {
@ -210,6 +216,7 @@ public class PutJobStateMachineTests extends ESTestCase {
}).when(client).execute(eq(GetMappingsAction.INSTANCE), any(GetMappingsRequest.class), requestCaptor.capture());
TransportPutRollupJobAction.updateMapping(
projectId,
job,
testListener,
mock(PersistentTasksService.class),
@ -222,6 +229,7 @@ public class PutJobStateMachineTests extends ESTestCase {
@SuppressWarnings({ "unchecked", "rawtypes" })
public void testMetadataButNotRollup() {
final var projectId = randomProjectIdOrDefault();
RollupJob job = new RollupJob(ConfigTestHelpers.randomRollupJobConfig(random()), Collections.emptyMap());
ActionListener<AcknowledgedResponse> testListener = ActionListener.wrap(response -> {
@ -254,6 +262,7 @@ public class PutJobStateMachineTests extends ESTestCase {
}).when(client).execute(eq(GetMappingsAction.INSTANCE), any(GetMappingsRequest.class), requestCaptor.capture());
TransportPutRollupJobAction.updateMapping(
projectId,
job,
testListener,
mock(PersistentTasksService.class),
@ -266,6 +275,7 @@ public class PutJobStateMachineTests extends ESTestCase {
@SuppressWarnings({ "unchecked", "rawtypes" })
public void testJobAlreadyInMapping() {
final var projectId = randomProjectIdOrDefault();
RollupJob job = new RollupJob(ConfigTestHelpers.randomRollupJobConfig(random(), "foo"), Collections.emptyMap());
ActionListener<AcknowledgedResponse> testListener = ActionListener.wrap(response -> {
@ -295,6 +305,7 @@ public class PutJobStateMachineTests extends ESTestCase {
}).when(client).execute(eq(GetMappingsAction.INSTANCE), any(GetMappingsRequest.class), requestCaptor.capture());
TransportPutRollupJobAction.updateMapping(
projectId,
job,
testListener,
mock(PersistentTasksService.class),
@ -307,6 +318,7 @@ public class PutJobStateMachineTests extends ESTestCase {
@SuppressWarnings({ "unchecked", "rawtypes" })
public void testAddJobToMapping() {
final var projectId = randomProjectIdOrDefault();
final RollupJobConfig unrelatedJob = ConfigTestHelpers.randomRollupJobConfig(
random(),
ESTestCase.randomAlphaOfLength(10),
@ -348,6 +360,7 @@ public class PutJobStateMachineTests extends ESTestCase {
}).when(client).execute(eq(TransportPutMappingAction.TYPE), any(PutMappingRequest.class), requestCaptor2.capture());
TransportPutRollupJobAction.updateMapping(
projectId,
job,
testListener,
mock(PersistentTasksService.class),
@ -361,6 +374,7 @@ public class PutJobStateMachineTests extends ESTestCase {
@SuppressWarnings({ "unchecked", "rawtypes" })
public void testTaskAlreadyExists() {
final var projectId = randomProjectIdOrDefault();
RollupJob job = new RollupJob(ConfigTestHelpers.randomRollupJobConfig(random(), "foo"), Collections.emptyMap());
ActionListener<AcknowledgedResponse> testListener = ActionListener.wrap(response -> {
@ -374,14 +388,29 @@ public class PutJobStateMachineTests extends ESTestCase {
requestCaptor.getValue().onFailure(new ResourceAlreadyExistsException(job.getConfig().getRollupIndex()));
return null;
}).when(tasksService)
.sendStartRequest(eq(job.getConfig().getId()), eq(RollupField.TASK_NAME), eq(job), isNotNull(), requestCaptor.capture());
.sendProjectStartRequest(
eq(projectId),
eq(job.getConfig().getId()),
eq(RollupField.TASK_NAME),
eq(job),
isNotNull(),
requestCaptor.capture()
);
TransportPutRollupJobAction.startPersistentTask(job, testListener, tasksService);
verify(tasksService).sendStartRequest(eq(job.getConfig().getId()), eq(RollupField.TASK_NAME), eq(job), isNotNull(), any());
TransportPutRollupJobAction.startPersistentTask(projectId, job, testListener, tasksService);
verify(tasksService).sendProjectStartRequest(
eq(projectId),
eq(job.getConfig().getId()),
eq(RollupField.TASK_NAME),
eq(job),
isNotNull(),
any()
);
}
@SuppressWarnings({ "unchecked", "rawtypes" })
public void testStartTask() {
final var projectId = randomProjectIdOrDefault();
RollupJob job = new RollupJob(ConfigTestHelpers.randomRollupJobConfig(random()), Collections.emptyMap());
ActionListener<AcknowledgedResponse> testListener = ActionListener.wrap(response -> {
@ -402,7 +431,14 @@ public class PutJobStateMachineTests extends ESTestCase {
requestCaptor.getValue().onResponse(response);
return null;
}).when(tasksService)
.sendStartRequest(eq(job.getConfig().getId()), eq(RollupField.TASK_NAME), eq(job), isNotNull(), requestCaptor.capture());
.sendProjectStartRequest(
eq(projectId),
eq(job.getConfig().getId()),
eq(RollupField.TASK_NAME),
eq(job),
isNotNull(),
requestCaptor.capture()
);
ArgumentCaptor<PersistentTasksService.WaitForPersistentTaskListener> requestCaptor2 = ArgumentCaptor.forClass(
PersistentTasksService.WaitForPersistentTaskListener.class
@ -411,11 +447,19 @@ public class PutJobStateMachineTests extends ESTestCase {
// Bail here with an error, further testing will happen through tests of #startPersistentTask
requestCaptor2.getValue().onFailure(new RuntimeException("Ending"));
return null;
}).when(tasksService).waitForPersistentTaskCondition(eq(job.getConfig().getId()), any(), any(), requestCaptor2.capture());
}).when(tasksService)
.waitForPersistentTaskCondition(eq(projectId), eq(job.getConfig().getId()), any(), any(), requestCaptor2.capture());
TransportPutRollupJobAction.startPersistentTask(job, testListener, tasksService);
verify(tasksService).sendStartRequest(eq(job.getConfig().getId()), eq(RollupField.TASK_NAME), eq(job), isNotNull(), any());
verify(tasksService).waitForPersistentTaskCondition(eq(job.getConfig().getId()), any(), any(), any());
TransportPutRollupJobAction.startPersistentTask(projectId, job, testListener, tasksService);
verify(tasksService).sendProjectStartRequest(
eq(projectId),
eq(job.getConfig().getId()),
eq(RollupField.TASK_NAME),
eq(job),
isNotNull(),
any()
);
verify(tasksService).waitForPersistentTaskCondition(eq(projectId), eq(job.getConfig().getId()), any(), any(), any());
}
public void testDeprecatedTimeZone() {

View File

@ -8,8 +8,7 @@
package org.elasticsearch.xpack.rollup.action;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.test.ESTestCase;
@ -35,8 +34,8 @@ public class TransportPutRollupJobActionTests extends ESTestCase {
}
}
""";
var metadata = createMetadata(mappings);
assertTrue(TransportPutRollupJobAction.hasRollupIndices(metadata));
var project = createProject(mappings);
assertTrue(TransportPutRollupJobAction.hasRollupIndices(project));
}
{
String mappings = """
@ -55,8 +54,8 @@ public class TransportPutRollupJobActionTests extends ESTestCase {
}
}
""";
var metadata = createMetadata(mappings);
assertTrue(TransportPutRollupJobAction.hasRollupIndices(metadata));
var project = createProject(mappings);
assertTrue(TransportPutRollupJobAction.hasRollupIndices(project));
}
{
String mappings = """
@ -76,8 +75,8 @@ public class TransportPutRollupJobActionTests extends ESTestCase {
}
}
""";
var metadata = createMetadata(mappings);
assertTrue(TransportPutRollupJobAction.hasRollupIndices(metadata));
var project = createProject(mappings);
assertTrue(TransportPutRollupJobAction.hasRollupIndices(project));
}
{
String mappings = """
@ -91,8 +90,8 @@ public class TransportPutRollupJobActionTests extends ESTestCase {
}
}
""";
var metadata = createMetadata(mappings);
assertFalse(TransportPutRollupJobAction.hasRollupIndices(metadata));
var project = createProject(mappings);
assertFalse(TransportPutRollupJobAction.hasRollupIndices(project));
}
{
String mappings = """
@ -101,8 +100,8 @@ public class TransportPutRollupJobActionTests extends ESTestCase {
}
}
""";
var metadata = createMetadata(mappings);
assertFalse(TransportPutRollupJobAction.hasRollupIndices(metadata));
var project = createProject(mappings);
assertFalse(TransportPutRollupJobAction.hasRollupIndices(project));
}
{
String mappings = """
@ -119,17 +118,15 @@ public class TransportPutRollupJobActionTests extends ESTestCase {
}
}
""";
var metadata = createMetadata(mappings);
assertFalse(TransportPutRollupJobAction.hasRollupIndices(metadata));
var project = createProject(mappings);
assertFalse(TransportPutRollupJobAction.hasRollupIndices(project));
}
}
private static Metadata createMetadata(String mappings) {
Settings.Builder b = Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, IndexVersion.current());
var metadata = Metadata.builder()
.put(IndexMetadata.builder("my-rollup-index").settings(b).numberOfShards(1).numberOfReplicas(0).putMapping(mappings))
private static ProjectMetadata createProject(String mappings) {
return ProjectMetadata.builder(randomProjectIdOrDefault())
.put(IndexMetadata.builder("my-rollup-index").settings(indexSettings(IndexVersion.current(), 1, 0)).putMapping(mappings))
.build();
return metadata;
}
}

View File

@ -97,6 +97,7 @@ tasks.named("yamlRestTest").configure {
'^monitoring/bulk/10_basic/*',
'^monitoring/bulk/20_privileges/*',
'^profiling/10_basic/*',
// These rollup tests work in MP mode, they just don't work with security enabled.
'^rollup/delete_job/*',
'^rollup/get_jobs/*',
'^rollup/get_rollup_caps/*',