ESQL: List/get query API (#124832)

This PR adds two new REST endpoints, for listing queries and getting information on a current query.

* Resolves #124827 
* Related to #124828 (initial work)

Changes from the API specified in the above issues:
* The get API is pretty initial, as we don't have a way of fetching the memory used or number of rows processed.

List queries response:
```
GET /_query/queries
// returns for each of the running queries
// query_id, start_time, running_time, query

{ "queries" : {
 "abc": {
  "id": "abc",
  "start_time_millis": 14585858875292,
  "running_time_nanos": 762794,
  "query": "FROM logs* | STATS BY hostname"
  },
 "4321": {
  "id":"4321",
  "start_time_millis": 14585858823573,
  "running_time_nanos": 90231,
  "query": "FROM orders | LOOKUP country_code ON country"
  }
 } 
}
```

Get query response:
```
GET /_query/queries/abc

{
 "id" : "abc",
  "start_time_millis": 14585858875292,
  "running_time_nanos": 762794,
  "query": "FROM logs* | STATS BY hostname"
  "coordinating_node": "oTUltX4IQMOUUVeiohTt8A"
  "data_nodes" : [ "DwrYwfytxthse49X4", "i5msnbUyWlpe86e7"]
}
```
This commit is contained in:
Gal Lalouche 2025-04-08 22:21:32 +03:00 committed by GitHub
parent b21e3253a8
commit 953b9fbb83
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
30 changed files with 840 additions and 31 deletions

View File

@ -0,0 +1,6 @@
pr: 124832
summary: List/get query API
area: ES|QL
type: feature
issues:
- 124827

View File

@ -194,6 +194,9 @@ This section lists the privileges that you can assign to a role.
`monitor_enrich`
: All read-only operations related to managing and executing enrich policies.
`monitor_esql`
: All read-only operations related to ES|QL queries.
`monitor_inference`
: All read-only operations related to {{infer}}.

View File

@ -0,0 +1,32 @@
{
"esql.get_query": {
"documentation": {
"url": null,
"description": "Executes a get ESQL query request"
},
"stability": "experimental",
"visibility": "public",
"headers": {
"accept": [],
"content_type": [
"application/json"
]
},
"url": {
"paths": [
{
"path": "/_query/queries/{id}",
"methods": [
"GET"
],
"parts": {
"id": {
"type": "string",
"description": "The query ID"
}
}
}
]
}
}
}

View File

@ -0,0 +1,26 @@
{
"esql.list_queries": {
"documentation": {
"url": null,
"description": "Executes a list ESQL queries request"
},
"stability": "experimental",
"visibility": "public",
"headers": {
"accept": [],
"content_type": [
"application/json"
]
},
"url": {
"paths": [
{
"path": "/_query/queries",
"methods": [
"GET"
]
}
]
}
}
}

View File

@ -0,0 +1,79 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
package org.elasticsearch.test;
import org.hamcrest.BaseMatcher;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.isA;
/**
* A type-agnostic way of comparing integer values, not caring if it's a long or an integer.
*/
public abstract sealed class IntOrLongMatcher<T> extends BaseMatcher<T> {
public static IntOrLongMatcher<Integer> matches(int expected) {
return new IntMatcher(expected);
}
public static IntOrLongMatcher<Long> matches(long expected) {
return new LongMatcher(expected);
}
private static final class IntMatcher extends IntOrLongMatcher<Integer> {
private final int expected;
private IntMatcher(int expected) {
this.expected = expected;
}
@Override
public boolean matches(Object o) {
return switch (o) {
case Integer i -> expected == i;
case Long l -> expected == l;
default -> false;
};
}
@Override
public void describeTo(Description description) {
equalTo(expected).describeTo(description);
}
}
private static final class LongMatcher extends IntOrLongMatcher<Long> {
private final long expected;
LongMatcher(long expected) {
this.expected = expected;
}
@Override
public boolean matches(Object o) {
return switch (o) {
case Integer i -> expected == i;
case Long l -> expected == l;
default -> false;
};
}
@Override
public void describeTo(Description description) {
equalTo(expected).describeTo(description);
}
}
public static Matcher<Object> isIntOrLong() {
return anyOf(isA(Integer.class), isA(Long.class));
}
}

View File

@ -196,6 +196,7 @@ public final class ClientHelper {
public static final String APM_ORIGIN = "apm";
public static final String OTEL_ORIGIN = "otel";
public static final String REINDEX_DATA_STREAM_ORIGIN = "reindex_data_stream";
public static final String ESQL_ORIGIN = "esql";
private ClientHelper() {}

View File

@ -110,6 +110,7 @@ public class ClusterPrivilegeResolver {
private static final Set<String> MONITOR_WATCHER_PATTERN = Set.of("cluster:monitor/xpack/watcher/*");
private static final Set<String> MONITOR_ROLLUP_PATTERN = Set.of("cluster:monitor/xpack/rollup/*");
private static final Set<String> MONITOR_ENRICH_PATTERN = Set.of("cluster:monitor/xpack/enrich/*", "cluster:admin/xpack/enrich/get");
private static final Set<String> MONITOR_ESQL_PATTERN = Set.of("cluster:monitor/xpack/esql/*");
// intentionally cluster:monitor/stats* to match cluster:monitor/stats, cluster:monitor/stats[n] and cluster:monitor/stats/remote
private static final Set<String> MONITOR_STATS_PATTERN = Set.of("cluster:monitor/stats*");
@ -249,6 +250,7 @@ public class ClusterPrivilegeResolver {
public static final NamedClusterPrivilege MONITOR_WATCHER = new ActionClusterPrivilege("monitor_watcher", MONITOR_WATCHER_PATTERN);
public static final NamedClusterPrivilege MONITOR_ROLLUP = new ActionClusterPrivilege("monitor_rollup", MONITOR_ROLLUP_PATTERN);
public static final NamedClusterPrivilege MONITOR_ENRICH = new ActionClusterPrivilege("monitor_enrich", MONITOR_ENRICH_PATTERN);
public static final NamedClusterPrivilege MONITOR_ESQL = new ActionClusterPrivilege("monitor_esql", MONITOR_ESQL_PATTERN);
public static final NamedClusterPrivilege MONITOR_STATS = new ActionClusterPrivilege("monitor_stats", MONITOR_STATS_PATTERN);
public static final NamedClusterPrivilege MANAGE = new ActionClusterPrivilege("manage", ALL_CLUSTER_PATTERN, ALL_SECURITY_PATTERN);
public static final NamedClusterPrivilege MANAGE_INFERENCE = new ActionClusterPrivilege("manage_inference", MANAGE_INFERENCE_PATTERN);
@ -431,6 +433,7 @@ public class ClusterPrivilegeResolver {
MONITOR_WATCHER,
MONITOR_ROLLUP,
MONITOR_ENRICH,
MONITOR_ESQL,
MONITOR_STATS,
MANAGE,
MANAGE_CONNECTOR,

View File

@ -172,6 +172,8 @@ public class AsyncTaskManagementService<
this.threadPool = threadPool;
}
public static String ASYNC_ACTION_SUFFIX = "[a]";
public void asyncExecute(
Request request,
TimeValue waitForCompletionTimeout,
@ -182,7 +184,7 @@ public class AsyncTaskManagementService<
String nodeId = clusterService.localNode().getId();
try (var ignored = threadPool.getThreadContext().newTraceContext()) {
@SuppressWarnings("unchecked")
T searchTask = (T) taskManager.register("transport", action + "[a]", new AsyncRequestWrapper(request, nodeId));
T searchTask = (T) taskManager.register("transport", action + ASYNC_ACTION_SUFFIX, new AsyncRequestWrapper(request, nodeId));
boolean operationStarted = false;
try {
operation.execute(

View File

@ -42,6 +42,7 @@ import static org.elasticsearch.test.MapMatcher.assertMap;
import static org.elasticsearch.test.MapMatcher.matchesMap;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.not;
public class EsqlSecurityIT extends ESRestTestCase {
@ClassRule
@ -69,6 +70,8 @@ public class EsqlSecurityIT extends ESRestTestCase {
.user("logs_foo_after_2021", "x-pack-test-password", "logs_foo_after_2021", false)
.user("logs_foo_after_2021_pattern", "x-pack-test-password", "logs_foo_after_2021_pattern", false)
.user("logs_foo_after_2021_alias", "x-pack-test-password", "logs_foo_after_2021_alias", false)
.user("user_without_monitor_privileges", "x-pack-test-password", "user_without_monitor_privileges", false)
.user("user_with_monitor_privileges", "x-pack-test-password", "user_with_monitor_privileges", false)
.build();
@Override
@ -309,7 +312,7 @@ public class EsqlSecurityIT extends ESRestTestCase {
json.endObject();
Request searchRequest = new Request("GET", "/index-user1,index-user2/_search");
searchRequest.setJsonEntity(Strings.toString(json));
searchRequest.setOptions(RequestOptions.DEFAULT.toBuilder().addHeader("es-security-runas-user", "metadata1_read2"));
setUser(searchRequest, "metadata1_read2");
// ES|QL query on the same index pattern
var esqlResp = expectThrows(ResponseException.class, () -> runESQLCommand("metadata1_read2", "FROM index-user1,index-user2"));
@ -429,13 +432,13 @@ public class EsqlSecurityIT extends ESRestTestCase {
public void testFieldLevelSecurityAllowPartial() throws Exception {
Request request = new Request("GET", "/index*/_field_caps");
request.setOptions(RequestOptions.DEFAULT.toBuilder().addHeader("es-security-runas-user", "fls_user"));
setUser(request, "fls_user");
request.addParameter("error_trace", "true");
request.addParameter("pretty", "true");
request.addParameter("fields", "*");
request = new Request("GET", "/index*/_search");
request.setOptions(RequestOptions.DEFAULT.toBuilder().addHeader("es-security-runas-user", "fls_user"));
setUser(request, "fls_user");
request.addParameter("error_trace", "true");
request.addParameter("pretty", "true");
@ -761,6 +764,36 @@ public class EsqlSecurityIT extends ESRestTestCase {
assertThat(resp.getResponse().getStatusLine().getStatusCode(), equalTo(HttpStatus.SC_BAD_REQUEST));
}
public void testListQueryAllowed() throws Exception {
Request request = new Request("GET", "_query/queries");
setUser(request, "user_with_monitor_privileges");
var resp = client().performRequest(request);
assertOK(resp);
}
public void testListQueryForbidden() throws Exception {
Request request = new Request("GET", "_query/queries");
setUser(request, "user_without_monitor_privileges");
var resp = expectThrows(ResponseException.class, () -> client().performRequest(request));
assertThat(resp.getResponse().getStatusLine().getStatusCode(), equalTo(403));
assertThat(resp.getMessage(), containsString("this action is granted by the cluster privileges [monitor_esql,monitor,manage,all]"));
}
public void testGetQueryAllowed() throws Exception {
// This is a bit tricky, since there is no such running query. We just make sure it didn't fail on forbidden privileges.
Request request = new Request("GET", "_query/queries/foo:1234");
var resp = expectThrows(ResponseException.class, () -> client().performRequest(request));
assertThat(resp.getResponse().getStatusLine().getStatusCode(), not(equalTo(404)));
}
public void testGetQueryForbidden() throws Exception {
Request request = new Request("GET", "_query/queries/foo:1234");
setUser(request, "user_without_monitor_privileges");
var resp = expectThrows(ResponseException.class, () -> client().performRequest(request));
assertThat(resp.getResponse().getStatusLine().getStatusCode(), equalTo(403));
assertThat(resp.getMessage(), containsString("this action is granted by the cluster privileges [monitor_esql,monitor,manage,all]"));
}
private void createEnrichPolicy() throws Exception {
createIndex("songs", Settings.EMPTY, """
"properties":{"song_id": {"type": "keyword"}, "title": {"type": "keyword"}, "artist": {"type": "keyword"} }
@ -837,11 +870,16 @@ public class EsqlSecurityIT extends ESRestTestCase {
json.endObject();
Request request = new Request("POST", "_query");
request.setJsonEntity(Strings.toString(json));
request.setOptions(RequestOptions.DEFAULT.toBuilder().addHeader("es-security-runas-user", user));
setUser(request, user);
request.addParameter("error_trace", "true");
return client().performRequest(request);
}
private static void setUser(Request request, String user) {
request.setOptions(RequestOptions.DEFAULT.toBuilder().addHeader("es-security-runas-user", user));
}
static void addRandomPragmas(XContentBuilder builder) throws IOException {
if (Build.current().isSnapshot()) {
Settings pragmas = randomPragmas();
@ -853,7 +891,7 @@ public class EsqlSecurityIT extends ESRestTestCase {
}
}
static Settings randomPragmas() {
private static Settings randomPragmas() {
Settings.Builder settings = Settings.builder();
if (randomBoolean()) {
settings.put("page_size", between(1, 5));

View File

@ -193,3 +193,10 @@ logs_foo_after_2021_alias:
"@timestamp": {"gte": "2021-01-01T00:00:00"}
}
}
user_without_monitor_privileges:
cluster: []
user_with_monitor_privileges:
cluster:
- monitor_esql

View File

@ -20,7 +20,6 @@ import org.elasticsearch.client.WarningsHandler;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.core.CheckedConsumer;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
@ -40,7 +39,6 @@ import org.junit.Before;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
@ -1395,15 +1393,11 @@ public abstract class RestEsqlTestCase extends ESRestTestCase {
}
protected static Map<String, Object> entityToMap(HttpEntity entity, XContentType expectedContentType) throws IOException {
try (InputStream content = entity.getContent()) {
XContentType xContentType = XContentType.fromMediaType(entity.getContentType().getValue());
assertEquals(expectedContentType, xContentType);
var map = XContentHelper.convertToMap(xContentType.xContent(), content, false);
if (shouldLog()) {
LOGGER.info("entity={}", map);
}
return map;
var result = EsqlTestUtils.entityToMap(entity, expectedContentType);
if (shouldLog()) {
LOGGER.info("entity={}", result);
}
return result;
}
static void addAsyncParameters(RequestObjectBuilder requestObject, boolean keepOnCompletion) throws IOException {
@ -1535,21 +1529,18 @@ public abstract class RestEsqlTestCase extends ESRestTestCase {
}
private static Request prepareRequest(Mode mode) {
Request request = new Request("POST", "/_query" + (mode == ASYNC ? "/async" : ""));
request.addParameter("error_trace", "true"); // Helps with debugging in case something crazy happens on the server.
request.addParameter("pretty", "true"); // Improves error reporting readability
return request;
return finishRequest(new Request("POST", "/_query" + (mode == ASYNC ? "/async" : "")));
}
private static Request prepareAsyncGetRequest(String id) {
Request request = new Request("GET", "/_query/async/" + id + "?wait_for_completion_timeout=60s");
request.addParameter("error_trace", "true"); // Helps with debugging in case something crazy happens on the server.
request.addParameter("pretty", "true"); // Improves error reporting readability
return request;
return finishRequest(new Request("GET", "/_query/async/" + id + "?wait_for_completion_timeout=60s"));
}
private static Request prepareAsyncDeleteRequest(String id) {
Request request = new Request("DELETE", "/_query/async/" + id);
return finishRequest(new Request("DELETE", "/_query/async/" + id));
}
private static Request finishRequest(Request request) {
request.addParameter("error_trace", "true"); // Helps with debugging in case something crazy happens on the server.
request.addParameter("pretty", "true"); // Improves error reporting readability
return request;

View File

@ -7,6 +7,7 @@
package org.elasticsearch.xpack.esql;
import org.apache.http.HttpEntity;
import org.apache.lucene.document.InetAddressPoint;
import org.apache.lucene.sandbox.document.HalfFloatPoint;
import org.apache.lucene.util.BytesRef;
@ -22,6 +23,7 @@ import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.compute.data.AggregateMetricDoubleBlockBuilder;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.BlockUtils;
@ -41,6 +43,7 @@ import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.transport.RemoteTransportException;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xcontent.XContentType;
import org.elasticsearch.xcontent.json.JsonXContent;
import org.elasticsearch.xpack.esql.action.EsqlQueryResponse;
import org.elasticsearch.xpack.esql.analysis.EnrichResolution;
@ -881,6 +884,18 @@ public final class EsqlTestUtils {
return collection.iterator().next();
}
public static Map<String, Object> jsonEntityToMap(HttpEntity entity) throws IOException {
return entityToMap(entity, XContentType.JSON);
}
public static Map<String, Object> entityToMap(HttpEntity entity, XContentType expectedContentType) throws IOException {
try (InputStream content = entity.getContent()) {
XContentType xContentType = XContentType.fromMediaType(entity.getContentType().getValue());
assertEquals(expectedContentType, xContentType);
return XContentHelper.convertToMap(xContentType.xContent(), content, false /* ordered */);
}
}
/**
* Errors from remotes are wrapped in RemoteException while the ones from the local cluster
* aren't. This utility method is useful for unwrapping in such cases.

View File

@ -20,6 +20,7 @@ import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;
import org.elasticsearch.xpack.core.async.DeleteAsyncResultRequest;
import org.elasticsearch.xpack.core.async.GetAsyncResultRequest;
import org.elasticsearch.xpack.core.async.TransportDeleteAsyncResultAction;
import org.elasticsearch.xpack.esql.core.async.AsyncTaskManagementService;
import org.elasticsearch.xpack.esql.plugin.QueryPragmas;
import org.hamcrest.core.IsEqual;
@ -136,7 +137,11 @@ public class AsyncEsqlQueryActionIT extends AbstractPausableIntegTestCase {
.toList();
assertThat(tasks.size(), greaterThanOrEqualTo(1));
});
client().admin().cluster().prepareCancelTasks().setActions(EsqlQueryAction.NAME + "[a]").get();
client().admin()
.cluster()
.prepareCancelTasks()
.setActions(EsqlQueryAction.NAME + AsyncTaskManagementService.ASYNC_ACTION_SUFFIX)
.get();
assertBusy(() -> {
List<TaskInfo> tasks = getEsqlQueryTasks().stream().filter(TaskInfo::cancelled).toList();
assertThat(tasks, not(empty()));

View File

@ -0,0 +1,88 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.xpack.esql.action;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.test.IntOrLongMatcher;
import org.elasticsearch.test.MapMatcher;
import org.elasticsearch.xpack.core.async.GetAsyncResultRequest;
import org.elasticsearch.xpack.esql.EsqlTestUtils;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.core.TimeValue.timeValueSeconds;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.jsonEntityToMap;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.everyItem;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.isA;
public class EsqlListQueriesActionIT extends AbstractPausableIntegTestCase {
private static final String QUERY = "from test | stats sum(pause_me)";
@Override
protected boolean addMockHttpTransport() {
return false;
}
public void testNoRunningQueries() throws Exception {
var request = new Request("GET", "/_query/queries");
var response = getRestClient().performRequest(request);
assertThat(jsonEntityToMap(response.getEntity()), is(Map.of("queries", Map.of())));
}
public void testRunningQueries() throws Exception {
String id = null;
try (var initialResponse = sendAsyncQuery()) {
id = initialResponse.asyncExecutionId().get();
var getResultsRequest = new GetAsyncResultRequest(id);
getResultsRequest.setWaitForCompletionTimeout(timeValueSeconds(1));
client().execute(EsqlAsyncGetResultAction.INSTANCE, getResultsRequest).get().close();
Response listResponse = getRestClient().performRequest(new Request("GET", "/_query/queries"));
@SuppressWarnings("unchecked")
var listResult = (Map<String, Map<String, Object>>) EsqlTestUtils.singleValue(
jsonEntityToMap(listResponse.getEntity()).values()
);
var taskId = new TaskId(EsqlTestUtils.singleValue(listResult.keySet()));
MapMatcher basicMatcher = MapMatcher.matchesMap()
.entry("node", is(taskId.getNodeId()))
.entry("id", IntOrLongMatcher.matches(taskId.getId()))
.entry("query", is(QUERY))
.entry("start_time_millis", IntOrLongMatcher.isIntOrLong())
.entry("running_time_nanos", IntOrLongMatcher.isIntOrLong());
MapMatcher.assertMap(EsqlTestUtils.singleValue(listResult.values()), basicMatcher);
Response getQueryResponse = getRestClient().performRequest(new Request("GET", "/_query/queries/" + taskId));
MapMatcher.assertMap(
jsonEntityToMap(getQueryResponse.getEntity()),
basicMatcher.entry("coordinating_node", isA(String.class))
.entry("data_nodes", allOf(isA(List.class), everyItem(isA(String.class))))
);
} finally {
if (id != null) {
// Finish the query.
scriptPermits.release(numberOfDocs());
var getResultsRequest = new GetAsyncResultRequest(id);
getResultsRequest.setWaitForCompletionTimeout(timeValueSeconds(60));
client().execute(EsqlAsyncGetResultAction.INSTANCE, getResultsRequest).get().close();
}
scriptPermits.drainPermits();
}
}
private EsqlQueryResponse sendAsyncQuery() {
scriptPermits.drainPermits();
scriptPermits.release(between(1, 5));
return EsqlQueryRequestBuilder.newAsyncEsqlQueryRequestBuilder(client()).query(QUERY).execute().actionGet(60, TimeUnit.SECONDS);
}
}

View File

@ -954,7 +954,12 @@ public class EsqlCapabilities {
* the ownership of that block - but didn't account for the fact that the caller might close it, leading to double releases
* in some union type queries. C.f. https://github.com/elastic/elasticsearch/issues/125850
*/
FIX_DOUBLY_RELEASED_NULL_BLOCKS_IN_VALUESOURCEREADER;
FIX_DOUBLY_RELEASED_NULL_BLOCKS_IN_VALUESOURCEREADER,
/**
* Listing queries and getting information on a specific query.
*/
QUERY_MONITORING;
private final boolean enabled;

View File

@ -0,0 +1,20 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.xpack.esql.action;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.xpack.esql.plugin.EsqlGetQueryResponse;
public class EsqlGetQueryAction extends ActionType<EsqlGetQueryResponse> {
public static final EsqlGetQueryAction INSTANCE = new EsqlGetQueryAction();
public static final String NAME = "cluster:monitor/xpack/esql/get_query";
private EsqlGetQueryAction() {
super(NAME);
}
}

View File

@ -0,0 +1,43 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.xpack.esql.action;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.tasks.TaskId;
import java.io.IOException;
public class EsqlGetQueryRequest extends ActionRequest {
private final TaskId id;
public EsqlGetQueryRequest(TaskId id) {
this.id = id;
}
public TaskId id() {
return id;
}
public EsqlGetQueryRequest(StreamInput streamInput) throws IOException {
super(streamInput);
id = TaskId.readFromStream(streamInput);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeWriteable(id);
}
@Override
public ActionRequestValidationException validate() {
return null;
}
}

View File

@ -0,0 +1,20 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.xpack.esql.action;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.xpack.esql.plugin.EsqlListQueriesResponse;
public class EsqlListQueriesAction extends ActionType<EsqlListQueriesResponse> {
public static final EsqlListQueriesAction INSTANCE = new EsqlListQueriesAction();
public static final String NAME = "cluster:monitor/xpack/esql/list_queries";
private EsqlListQueriesAction() {
super(NAME);
}
}

View File

@ -0,0 +1,27 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.xpack.esql.action;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.common.io.stream.StreamInput;
import java.io.IOException;
public class EsqlListQueriesRequest extends ActionRequest {
public EsqlListQueriesRequest() {}
public EsqlListQueriesRequest(StreamInput streamInput) throws IOException {
super(streamInput);
}
@Override
public ActionRequestValidationException validate() {
return null;
}
}

View File

@ -0,0 +1,53 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.xpack.esql.action;
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.logging.LogManager;
import org.elasticsearch.logging.Logger;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.Scope;
import org.elasticsearch.rest.ServerlessScope;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.tasks.TaskId;
import java.io.IOException;
import java.util.List;
import static org.elasticsearch.rest.RestRequest.Method.GET;
@ServerlessScope(Scope.PUBLIC)
public class RestEsqlListQueriesAction extends BaseRestHandler {
private static final Logger LOGGER = LogManager.getLogger(RestEsqlListQueriesAction.class);
@Override
public String getName() {
return "esql_list_queries";
}
@Override
public List<Route> routes() {
return List.of(new Route(GET, "/_query/queries/{id}"), new Route(GET, "/_query/queries"));
}
@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
return restChannelConsumer(request, client);
}
private static RestChannelConsumer restChannelConsumer(RestRequest request, NodeClient client) {
LOGGER.debug("Beginning execution of ESQL list queries.");
String id = request.param("id");
var action = id != null ? EsqlGetQueryAction.INSTANCE : EsqlListQueriesAction.INSTANCE;
var actionRequest = id != null ? new EsqlGetQueryRequest(new TaskId(id)) : new EsqlListQueriesRequest();
return channel -> client.execute(action, actionRequest, new RestToXContentListener<>(channel));
}
}

View File

@ -0,0 +1,59 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.xpack.esql.plugin;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentBuilder;
import java.io.IOException;
import java.util.List;
public class EsqlGetQueryResponse extends ActionResponse implements ToXContentObject {
// This is rather limited at the moment, as we don't extract information such as CPU and memory usage, owning user, etc. for the task.
public record DetailedQuery(
TaskId id,
long startTimeMillis,
long runningTimeNanos,
String query,
String coordinatingNode,
List<String> dataNodes
) implements ToXContentObject {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field("id", id.getId());
builder.field("node", id.getNodeId());
builder.field("start_time_millis", startTimeMillis);
builder.field("running_time_nanos", runningTimeNanos);
builder.field("query", query);
builder.field("coordinating_node", coordinatingNode);
builder.field("data_nodes", dataNodes);
builder.endObject();
return builder;
}
}
private final DetailedQuery query;
public EsqlGetQueryResponse(DetailedQuery query) {
this.query = query;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
throw new AssertionError("should not reach here");
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return query.toXContent(builder, params);
}
}

View File

@ -0,0 +1,57 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.xpack.esql.plugin;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.xcontent.ToXContentFragment;
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentBuilder;
import java.io.IOException;
import java.util.List;
public class EsqlListQueriesResponse extends ActionResponse implements ToXContentObject {
private final List<Query> queries;
public record Query(TaskId taskId, long startTimeMillis, long runningTimeNanos, String query) implements ToXContentFragment {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(taskId.toString());
builder.field("id", taskId.getId());
builder.field("node", taskId.getNodeId());
builder.field("start_time_millis", startTimeMillis);
builder.field("running_time_nanos", runningTimeNanos);
builder.field("query", query);
builder.endObject();
return builder;
}
}
public EsqlListQueriesResponse(List<Query> queries) {
this.queries = queries;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
throw new AssertionError("should not reach here");
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.startObject("queries");
for (Query query : queries) {
query.toXContent(builder, params);
}
builder.endObject();
builder.endObject();
return builder;
}
}

View File

@ -52,6 +52,8 @@ import org.elasticsearch.xpack.esql.EsqlInfoTransportAction;
import org.elasticsearch.xpack.esql.EsqlUsageTransportAction;
import org.elasticsearch.xpack.esql.action.EsqlAsyncGetResultAction;
import org.elasticsearch.xpack.esql.action.EsqlAsyncStopAction;
import org.elasticsearch.xpack.esql.action.EsqlGetQueryAction;
import org.elasticsearch.xpack.esql.action.EsqlListQueriesAction;
import org.elasticsearch.xpack.esql.action.EsqlQueryAction;
import org.elasticsearch.xpack.esql.action.EsqlQueryRequestBuilder;
import org.elasticsearch.xpack.esql.action.EsqlResolveFieldsAction;
@ -59,6 +61,7 @@ import org.elasticsearch.xpack.esql.action.EsqlSearchShardsAction;
import org.elasticsearch.xpack.esql.action.RestEsqlAsyncQueryAction;
import org.elasticsearch.xpack.esql.action.RestEsqlDeleteAsyncResultAction;
import org.elasticsearch.xpack.esql.action.RestEsqlGetAsyncResultAction;
import org.elasticsearch.xpack.esql.action.RestEsqlListQueriesAction;
import org.elasticsearch.xpack.esql.action.RestEsqlQueryAction;
import org.elasticsearch.xpack.esql.action.RestEsqlStopAsyncAction;
import org.elasticsearch.xpack.esql.enrich.EnrichLookupOperator;
@ -227,7 +230,9 @@ public class EsqlPlugin extends Plugin implements ActionPlugin {
new ActionHandler(XPackInfoFeatureAction.ESQL, EsqlInfoTransportAction.class),
new ActionHandler(EsqlResolveFieldsAction.TYPE, EsqlResolveFieldsAction.class),
new ActionHandler(EsqlSearchShardsAction.TYPE, EsqlSearchShardsAction.class),
new ActionHandler(EsqlAsyncStopAction.INSTANCE, TransportEsqlAsyncStopAction.class)
new ActionHandler(EsqlAsyncStopAction.INSTANCE, TransportEsqlAsyncStopAction.class),
new ActionHandler(EsqlListQueriesAction.INSTANCE, TransportEsqlListQueriesAction.class),
new ActionHandler(EsqlGetQueryAction.INSTANCE, TransportEsqlGetQueryAction.class)
);
}
@ -248,7 +253,8 @@ public class EsqlPlugin extends Plugin implements ActionPlugin {
new RestEsqlAsyncQueryAction(),
new RestEsqlGetAsyncResultAction(),
new RestEsqlStopAsyncAction(),
new RestEsqlDeleteAsyncResultAction()
new RestEsqlDeleteAsyncResultAction(),
new RestEsqlListQueriesAction()
);
}

View File

@ -0,0 +1,98 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.xpack.esql.plugin;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskResponse;
import org.elasticsearch.action.admin.cluster.node.tasks.get.TransportGetTaskAction;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.action.admin.cluster.node.tasks.list.TransportListTasksAction;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.compute.operator.DriverTaskRunner;
import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.esql.action.EsqlGetQueryAction;
import org.elasticsearch.xpack.esql.action.EsqlGetQueryRequest;
import org.elasticsearch.xpack.esql.action.EsqlQueryAction;
import static org.elasticsearch.xpack.core.ClientHelper.ESQL_ORIGIN;
public class TransportEsqlGetQueryAction extends HandledTransportAction<EsqlGetQueryRequest, EsqlGetQueryResponse> {
private final NodeClient nodeClient;
@Inject
public TransportEsqlGetQueryAction(TransportService transportService, NodeClient nodeClient, ActionFilters actionFilters) {
super(EsqlGetQueryAction.NAME, transportService, actionFilters, EsqlGetQueryRequest::new, EsExecutors.DIRECT_EXECUTOR_SERVICE);
this.nodeClient = nodeClient;
}
@Override
protected void doExecute(Task task, EsqlGetQueryRequest request, ActionListener<EsqlGetQueryResponse> listener) {
ClientHelper.executeAsyncWithOrigin(
nodeClient,
ESQL_ORIGIN,
TransportGetTaskAction.TYPE,
new GetTaskRequest().setTaskId(request.id()),
new ActionListener<>() {
@Override
public void onResponse(GetTaskResponse response) {
TaskInfo task = response.getTask().getTask();
if (task.action().startsWith(EsqlQueryAction.NAME) == false) {
listener.onFailure(new IllegalArgumentException("Task [" + request.id() + "] is not an ESQL query task"));
return;
}
ClientHelper.executeAsyncWithOrigin(
nodeClient,
ESQL_ORIGIN,
TransportListTasksAction.TYPE,
new ListTasksRequest().setDetailed(true)
.setActions(DriverTaskRunner.ACTION_NAME)
.setTargetParentTaskId(request.id()),
new ActionListener<>() {
@Override
public void onResponse(ListTasksResponse response) {
listener.onResponse(new EsqlGetQueryResponse(toDetailedQuery(task, response)));
}
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
}
);
}
@Override
public void onFailure(Exception e) {
// The underlying root cause is meaningless to the user, but that is what will be shown, so we remove it.
var withoutCause = new Exception(e.getMessage());
listener.onFailure(withoutCause);
}
}
);
}
private static EsqlGetQueryResponse.DetailedQuery toDetailedQuery(TaskInfo task, ListTasksResponse response) {
return new EsqlGetQueryResponse.DetailedQuery(
task.taskId(),
task.startTime(),
task.runningTimeNanos(),
task.description(), // Query
task.node(), // Coordinating node
response.getTasks().stream().map(TaskInfo::node).distinct().toList() // Data nodes
);
}
}

View File

@ -0,0 +1,81 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.xpack.esql.plugin;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.action.admin.cluster.node.tasks.list.TransportListTasksAction;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.esql.action.EsqlListQueriesAction;
import org.elasticsearch.xpack.esql.action.EsqlListQueriesRequest;
import org.elasticsearch.xpack.esql.action.EsqlQueryAction;
import org.elasticsearch.xpack.esql.core.async.AsyncTaskManagementService;
import java.util.List;
import static org.elasticsearch.xpack.core.ClientHelper.ESQL_ORIGIN;
public class TransportEsqlListQueriesAction extends HandledTransportAction<EsqlListQueriesRequest, EsqlListQueriesResponse> {
private final NodeClient nodeClient;
@Inject
public TransportEsqlListQueriesAction(TransportService transportService, NodeClient nodeClient, ActionFilters actionFilters) {
super(
EsqlListQueriesAction.NAME,
transportService,
actionFilters,
EsqlListQueriesRequest::new,
EsExecutors.DIRECT_EXECUTOR_SERVICE
);
this.nodeClient = nodeClient;
}
@Override
protected void doExecute(Task task, EsqlListQueriesRequest request, ActionListener<EsqlListQueriesResponse> listener) {
ClientHelper.executeAsyncWithOrigin(
nodeClient,
ESQL_ORIGIN,
TransportListTasksAction.TYPE,
new ListTasksRequest().setActions(EsqlQueryAction.NAME, EsqlQueryAction.NAME + AsyncTaskManagementService.ASYNC_ACTION_SUFFIX)
.setDetailed(true),
new ActionListener<>() {
@Override
public void onResponse(ListTasksResponse response) {
List<EsqlListQueriesResponse.Query> queries = response.getTasks()
.stream()
.map(TransportEsqlListQueriesAction::toQuery)
.toList();
listener.onResponse(new EsqlListQueriesResponse(queries));
}
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
}
);
}
private static EsqlListQueriesResponse.Query toQuery(TaskInfo taskInfo) {
return new EsqlListQueriesResponse.Query(
taskInfo.taskId(),
taskInfo.startTime(),
taskInfo.runningTimeNanos(),
taskInfo.description()
);
}
}

View File

@ -380,7 +380,7 @@ public class TransportEsqlQueryAction extends HandledTransportAction<EsqlQueryRe
id,
type,
action,
request.getDescription(),
request.query(), // Pass the query as the description
parentTaskId,
headers,
originHeaders,

View File

@ -386,6 +386,8 @@ public class Constants {
"cluster:monitor/xpack/enrich/coordinator_stats",
"cluster:monitor/xpack/enrich/stats",
"cluster:monitor/xpack/eql/stats/dist",
"cluster:monitor/xpack/esql/get_query",
"cluster:monitor/xpack/esql/list_queries",
"cluster:monitor/xpack/esql/stats/dist",
"cluster:monitor/xpack/inference/post",
"cluster:monitor/xpack/inference/get",

View File

@ -32,6 +32,7 @@ import static org.elasticsearch.xpack.core.ClientHelper.CONNECTORS_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.DEPRECATION_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.ENRICH_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.ENT_SEARCH_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.ESQL_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.FLEET_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.IDP_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.INDEX_LIFECYCLE_ORIGIN;
@ -164,6 +165,7 @@ public final class AuthorizationUtils {
case ENT_SEARCH_ORIGIN:
case CONNECTORS_ORIGIN:
case INFERENCE_ORIGIN:
case ESQL_ORIGIN:
case TASKS_ORIGIN: // TODO use a more limited user for tasks
securityContext.executeAsInternalUser(InternalUsers.XPACK_USER, version, consumer);
break;

View File

@ -0,0 +1,40 @@
---
setup:
- requires:
test_runner_features: [ capabilities ]
capabilities:
- method: POST
path: /_query
parameters: [ ]
capabilities: [ query_monitoring ]
reason: "uses query monitoring"
- do:
indices.create:
index: test
body:
mappings:
properties:
message1:
type: keyword
---
# Since this feature requires queries in the background, the yaml tests only test edge cases with
# no running queries. The rest are covered by integration tests (See EsqlListQueriesActionIT).
List with no running queries:
- do:
esql.list_queries: { }
- match: { queries: { } }
---
Get with invalid task ID:
- do:
catch: /malformed task id foobar/
esql.get_query:
id: "foobar"
---
Get with non-existent task ID:
- do:
catch: /task \[foobar:1234\] belongs to the node \[foobar\] which isn't part of the cluster and there is no record of the task/
esql.get_query:
id: "foobar:1234"

View File

@ -15,5 +15,5 @@ setup:
# This is fragile - it needs to be updated every time we add a new cluster/index privilege
# I would much prefer we could just check that specific entries are in the array, but we don't have
# an assertion for that
- length: { "cluster" : 62 }
- length: { "cluster" : 63 }
- length: { "index" : 24 }