This change falls back to the old field-caps action if the remote cluster has not been updated to 8.16 or later.
This commit is contained in:
parent
84087bda71
commit
fca2f43756
|
@ -22,6 +22,7 @@ import org.elasticsearch.action.support.ActionFilters;
|
|||
import org.elasticsearch.action.support.ChannelActionListener;
|
||||
import org.elasticsearch.action.support.HandledTransportAction;
|
||||
import org.elasticsearch.action.support.RefCountingRunnable;
|
||||
import org.elasticsearch.client.internal.RemoteClusterClient;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockLevel;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
|
@ -113,23 +114,28 @@ public class TransportFieldCapabilitiesAction extends HandledTransportAction<Fie
|
|||
|
||||
@Override
|
||||
protected void doExecute(Task task, FieldCapabilitiesRequest request, final ActionListener<FieldCapabilitiesResponse> listener) {
|
||||
executeRequest(task, request, REMOTE_TYPE, listener);
|
||||
executeRequest(
|
||||
task,
|
||||
request,
|
||||
(remoteClient, remoteRequest, remoteListener) -> remoteClient.execute(REMOTE_TYPE, remoteRequest, remoteListener),
|
||||
listener
|
||||
);
|
||||
}
|
||||
|
||||
public void executeRequest(
|
||||
Task task,
|
||||
FieldCapabilitiesRequest request,
|
||||
RemoteClusterActionType<FieldCapabilitiesResponse> remoteAction,
|
||||
RemoteRequestExecutor remoteRequestExecutor,
|
||||
ActionListener<FieldCapabilitiesResponse> listener
|
||||
) {
|
||||
// workaround for https://github.com/elastic/elasticsearch/issues/97916 - TODO remove this when we can
|
||||
searchCoordinationExecutor.execute(ActionRunnable.wrap(listener, l -> doExecuteForked(task, request, remoteAction, l)));
|
||||
searchCoordinationExecutor.execute(ActionRunnable.wrap(listener, l -> doExecuteForked(task, request, remoteRequestExecutor, l)));
|
||||
}
|
||||
|
||||
private void doExecuteForked(
|
||||
Task task,
|
||||
FieldCapabilitiesRequest request,
|
||||
RemoteClusterActionType<FieldCapabilitiesResponse> remoteAction,
|
||||
RemoteRequestExecutor remoteRequestExecutor,
|
||||
ActionListener<FieldCapabilitiesResponse> listener
|
||||
) {
|
||||
if (ccsCheckCompatibility) {
|
||||
|
@ -282,8 +288,8 @@ public class TransportFieldCapabilitiesAction extends HandledTransportAction<Fie
|
|||
handleIndexFailure.accept(RemoteClusterAware.buildRemoteIndexName(clusterAlias, index), ex);
|
||||
}
|
||||
});
|
||||
remoteClusterClient.execute(
|
||||
remoteAction,
|
||||
remoteRequestExecutor.executeRemoteRequest(
|
||||
remoteClusterClient,
|
||||
remoteRequest,
|
||||
// The underlying transport service may call onFailure with a thread pool other than search_coordinator.
|
||||
// This fork is a workaround to ensure that the merging of field-caps always occurs on the search_coordinator.
|
||||
|
@ -298,6 +304,14 @@ public class TransportFieldCapabilitiesAction extends HandledTransportAction<Fie
|
|||
}
|
||||
}
|
||||
|
||||
public interface RemoteRequestExecutor {
|
||||
void executeRemoteRequest(
|
||||
RemoteClusterClient remoteClient,
|
||||
FieldCapabilitiesRequest remoteRequest,
|
||||
ActionListener<FieldCapabilitiesResponse> remoteListener
|
||||
);
|
||||
}
|
||||
|
||||
private static void checkIndexBlocks(ClusterState clusterState, String[] concreteIndices) {
|
||||
var blocks = clusterState.blocks();
|
||||
if (blocks.global().isEmpty() && blocks.indices().isEmpty()) {
|
||||
|
|
|
@ -18,8 +18,8 @@ dependencies {
|
|||
}
|
||||
|
||||
def supportedVersion = bwcVersion -> {
|
||||
// ESQL requires its own resolve_fields API
|
||||
return bwcVersion.onOrAfter(Version.fromString("8.16.0"));
|
||||
// CCS in ES|QL available in 8.13
|
||||
return bwcVersion.onOrAfter(Version.fromString("8.13.0"));
|
||||
}
|
||||
|
||||
BuildParams.bwcVersions.withWireCompatible(supportedVersion) { bwcVersion, baseName ->
|
||||
|
|
|
@ -104,7 +104,6 @@ public class MultiClusterSpecIT extends EsqlSpecTestCase {
|
|||
protected void shouldSkipTest(String testName) throws IOException {
|
||||
super.shouldSkipTest(testName);
|
||||
checkCapabilities(remoteClusterClient(), remoteFeaturesService(), testName, testCase);
|
||||
assumeTrue("CCS requires its own resolve_fields API", remoteFeaturesService().clusterHasFeature("esql.resolve_fields_api"));
|
||||
assumeFalse("can't test with _index metadata", hasIndexMetadata(testCase.query));
|
||||
assumeTrue(
|
||||
"Test " + testName + " is skipped on " + Clusters.oldVersion(),
|
||||
|
|
|
@ -67,7 +67,6 @@ public class MultiClustersIT extends ESRestTestCase {
|
|||
|
||||
@Before
|
||||
public void setUpIndices() throws Exception {
|
||||
assumeTrue("CCS requires its own resolve_fields API", remoteFeaturesService().clusterHasFeature("esql.resolve_fields_api"));
|
||||
final String mapping = """
|
||||
"properties": {
|
||||
"data": { "type": "long" },
|
||||
|
|
|
@ -6,6 +6,7 @@
|
|||
*/
|
||||
package org.elasticsearch.xpack.esql.action;
|
||||
|
||||
import org.elasticsearch.TransportVersions;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.ActionType;
|
||||
import org.elasticsearch.action.RemoteClusterActionType;
|
||||
|
@ -14,6 +15,7 @@ import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse;
|
|||
import org.elasticsearch.action.fieldcaps.TransportFieldCapabilitiesAction;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.HandledTransportAction;
|
||||
import org.elasticsearch.client.internal.RemoteClusterClient;
|
||||
import org.elasticsearch.common.util.concurrent.EsExecutors;
|
||||
import org.elasticsearch.injection.guice.Inject;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
|
@ -27,7 +29,7 @@ import org.elasticsearch.transport.TransportService;
|
|||
public class EsqlResolveFieldsAction extends HandledTransportAction<FieldCapabilitiesRequest, FieldCapabilitiesResponse> {
|
||||
public static final String NAME = "indices:data/read/esql/resolve_fields";
|
||||
public static final ActionType<FieldCapabilitiesResponse> TYPE = new ActionType<>(NAME);
|
||||
public static final RemoteClusterActionType<FieldCapabilitiesResponse> REMOTE_TYPE = new RemoteClusterActionType<>(
|
||||
public static final RemoteClusterActionType<FieldCapabilitiesResponse> RESOLVE_REMOTE_TYPE = new RemoteClusterActionType<>(
|
||||
NAME,
|
||||
FieldCapabilitiesResponse::new
|
||||
);
|
||||
|
@ -47,6 +49,19 @@ public class EsqlResolveFieldsAction extends HandledTransportAction<FieldCapabil
|
|||
|
||||
@Override
|
||||
protected void doExecute(Task task, FieldCapabilitiesRequest request, final ActionListener<FieldCapabilitiesResponse> listener) {
|
||||
fieldCapsAction.executeRequest(task, request, REMOTE_TYPE, listener);
|
||||
fieldCapsAction.executeRequest(task, request, this::executeRemoteRequest, listener);
|
||||
}
|
||||
|
||||
void executeRemoteRequest(
|
||||
RemoteClusterClient remoteClient,
|
||||
FieldCapabilitiesRequest remoteRequest,
|
||||
ActionListener<FieldCapabilitiesResponse> remoteListener
|
||||
) {
|
||||
remoteClient.getConnection(remoteRequest, remoteListener.delegateFailure((l, conn) -> {
|
||||
var remoteAction = conn.getTransportVersion().onOrAfter(TransportVersions.ESQL_ORIGINAL_INDICES)
|
||||
? RESOLVE_REMOTE_TYPE
|
||||
: TransportFieldCapabilitiesAction.REMOTE_TYPE;
|
||||
remoteClient.execute(conn, remoteAction, remoteRequest, l);
|
||||
}));
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue