Record project deletions in ProjectStateRegistry (#130225)

Project deletions is used to update the Stateless lease blob, as a project deletion 
is a notable event that that our stateless cluster consistency check should consider 
before acknowledging writes. 

Relates ES-11207
This commit is contained in:
Pooya Salehi 2025-07-02 20:08:56 +02:00 committed by GitHub
parent 89f701f4c4
commit 54c9db9a41
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 226 additions and 7 deletions

View File

@ -326,6 +326,7 @@ public class TransportVersions {
public static final TransportVersion ML_INFERENCE_COHERE_API_VERSION = def(9_110_0_00); public static final TransportVersion ML_INFERENCE_COHERE_API_VERSION = def(9_110_0_00);
public static final TransportVersion ESQL_PROFILE_INCLUDE_PLAN = def(9_111_0_00); public static final TransportVersion ESQL_PROFILE_INCLUDE_PLAN = def(9_111_0_00);
public static final TransportVersion MAPPINGS_IN_DATA_STREAMS = def(9_112_0_00); public static final TransportVersion MAPPINGS_IN_DATA_STREAMS = def(9_112_0_00);
public static final TransportVersion PROJECT_STATE_REGISTRY_RECORDS_DELETIONS = def(9_113_0_00);
public static final TransportVersion ESQL_SERIALIZE_TIMESERIES_FIELD_TYPE = def(9_113_0_00); public static final TransportVersion ESQL_SERIALIZE_TIMESERIES_FIELD_TYPE = def(9_113_0_00);
/* /*

View File

@ -20,28 +20,49 @@ import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.xcontent.ToXContent; import org.elasticsearch.xcontent.ToXContent;
import java.io.IOException; import java.io.IOException;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.Map; import java.util.Map;
import java.util.Objects;
import java.util.Set;
/** /**
* Represents a registry for managing and retrieving project-specific state in the cluster state. * Represents a registry for managing and retrieving project-specific state in the cluster state.
*/ */
public class ProjectStateRegistry extends AbstractNamedDiffable<ClusterState.Custom> implements ClusterState.Custom { public class ProjectStateRegistry extends AbstractNamedDiffable<ClusterState.Custom> implements ClusterState.Custom {
public static final String TYPE = "projects_registry"; public static final String TYPE = "projects_registry";
public static final ProjectStateRegistry EMPTY = new ProjectStateRegistry(Collections.emptyMap()); public static final ProjectStateRegistry EMPTY = new ProjectStateRegistry(Collections.emptyMap(), Collections.emptySet(), 0);
private final Map<ProjectId, Settings> projectsSettings; private final Map<ProjectId, Settings> projectsSettings;
// Projects that have been marked for deletion based on their file-based setting
private final Set<ProjectId> projectsMarkedForDeletion;
// A counter that is incremented each time one or more projects are marked for deletion.
private final long projectsMarkedForDeletionGeneration;
public ProjectStateRegistry(StreamInput in) throws IOException { public ProjectStateRegistry(StreamInput in) throws IOException {
projectsSettings = in.readMap(ProjectId::readFrom, Settings::readSettingsFromStream); projectsSettings = in.readMap(ProjectId::readFrom, Settings::readSettingsFromStream);
if (in.getTransportVersion().onOrAfter(TransportVersions.PROJECT_STATE_REGISTRY_RECORDS_DELETIONS)) {
projectsMarkedForDeletion = in.readCollectionAsImmutableSet(ProjectId::readFrom);
projectsMarkedForDeletionGeneration = in.readVLong();
} else {
projectsMarkedForDeletion = Collections.emptySet();
projectsMarkedForDeletionGeneration = 0;
}
} }
private ProjectStateRegistry(Map<ProjectId, Settings> projectsSettings) { private ProjectStateRegistry(
Map<ProjectId, Settings> projectsSettings,
Set<ProjectId> projectsMarkedForDeletion,
long projectsMarkedForDeletionGeneration
) {
this.projectsSettings = projectsSettings; this.projectsSettings = projectsSettings;
this.projectsMarkedForDeletion = projectsMarkedForDeletion;
this.projectsMarkedForDeletionGeneration = projectsMarkedForDeletionGeneration;
} }
/** /**
@ -72,9 +93,11 @@ public class ProjectStateRegistry extends AbstractNamedDiffable<ClusterState.Cus
builder.startObject("settings"); builder.startObject("settings");
entry.getValue().toXContent(builder, new ToXContent.MapParams(Collections.singletonMap("flat_settings", "true"))); entry.getValue().toXContent(builder, new ToXContent.MapParams(Collections.singletonMap("flat_settings", "true")));
builder.endObject(); builder.endObject();
builder.field("marked_for_deletion", projectsMarkedForDeletion.contains(entry.getKey()));
return builder.endObject(); return builder.endObject();
}), }),
Iterators.single((builder, p) -> builder.endArray()) Iterators.single((builder, p) -> builder.endArray()),
Iterators.single((builder, p) -> builder.field("projects_marked_for_deletion_generation", projectsMarkedForDeletionGeneration))
); );
} }
@ -95,12 +118,44 @@ public class ProjectStateRegistry extends AbstractNamedDiffable<ClusterState.Cus
@Override @Override
public void writeTo(StreamOutput out) throws IOException { public void writeTo(StreamOutput out) throws IOException {
out.writeMap(projectsSettings); out.writeMap(projectsSettings);
if (out.getTransportVersion().onOrAfter(TransportVersions.PROJECT_STATE_REGISTRY_RECORDS_DELETIONS)) {
out.writeCollection(projectsMarkedForDeletion);
out.writeVLong(projectsMarkedForDeletionGeneration);
} else {
// There should be no deletion unless all MP nodes are at or after PROJECT_STATE_REGISTRY_RECORDS_DELETIONS
assert projectsMarkedForDeletion.isEmpty();
assert projectsMarkedForDeletionGeneration == 0;
}
} }
public int size() { public int size() {
return projectsSettings.size(); return projectsSettings.size();
} }
public long getProjectsMarkedForDeletionGeneration() {
return projectsMarkedForDeletionGeneration;
}
// visible for testing
Map<ProjectId, Settings> getProjectsSettings() {
return Collections.unmodifiableMap(projectsSettings);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o instanceof ProjectStateRegistry == false) return false;
ProjectStateRegistry that = (ProjectStateRegistry) o;
return projectsMarkedForDeletionGeneration == that.projectsMarkedForDeletionGeneration
&& Objects.equals(projectsSettings, that.projectsSettings)
&& Objects.equals(projectsMarkedForDeletion, that.projectsMarkedForDeletion);
}
@Override
public int hashCode() {
return Objects.hash(projectsSettings, projectsMarkedForDeletion, projectsMarkedForDeletionGeneration);
}
public static Builder builder(ClusterState original) { public static Builder builder(ClusterState original) {
ProjectStateRegistry projectRegistry = original.custom(TYPE, EMPTY); ProjectStateRegistry projectRegistry = original.custom(TYPE, EMPTY);
return builder(projectRegistry); return builder(projectRegistry);
@ -116,13 +171,20 @@ public class ProjectStateRegistry extends AbstractNamedDiffable<ClusterState.Cus
public static class Builder { public static class Builder {
private final ImmutableOpenMap.Builder<ProjectId, Settings> projectsSettings; private final ImmutableOpenMap.Builder<ProjectId, Settings> projectsSettings;
private final Set<ProjectId> projectsMarkedForDeletion;
private final long projectsMarkedForDeletionGeneration;
private boolean newProjectMarkedForDeletion = false;
private Builder() { private Builder() {
this.projectsSettings = ImmutableOpenMap.builder(); this.projectsSettings = ImmutableOpenMap.builder();
projectsMarkedForDeletion = new HashSet<>();
projectsMarkedForDeletionGeneration = 0;
} }
private Builder(ProjectStateRegistry original) { private Builder(ProjectStateRegistry original) {
this.projectsSettings = ImmutableOpenMap.builder(original.projectsSettings); this.projectsSettings = ImmutableOpenMap.builder(original.projectsSettings);
this.projectsMarkedForDeletion = new HashSet<>(original.projectsMarkedForDeletion);
this.projectsMarkedForDeletionGeneration = original.projectsMarkedForDeletionGeneration;
} }
public Builder putProjectSettings(ProjectId projectId, Settings settings) { public Builder putProjectSettings(ProjectId projectId, Settings settings) {
@ -130,8 +192,25 @@ public class ProjectStateRegistry extends AbstractNamedDiffable<ClusterState.Cus
return this; return this;
} }
public Builder markProjectForDeletion(ProjectId projectId) {
if (projectsMarkedForDeletion.add(projectId)) {
newProjectMarkedForDeletion = true;
}
return this;
}
public ProjectStateRegistry build() { public ProjectStateRegistry build() {
return new ProjectStateRegistry(projectsSettings.build()); final var unknownButUnderDeletion = Sets.difference(projectsMarkedForDeletion, projectsSettings.keys());
if (unknownButUnderDeletion.isEmpty() == false) {
throw new IllegalArgumentException(
"Cannot mark projects for deletion that are not in the registry: " + unknownButUnderDeletion
);
}
return new ProjectStateRegistry(
projectsSettings.build(),
projectsMarkedForDeletion,
newProjectMarkedForDeletion ? projectsMarkedForDeletionGeneration + 1 : projectsMarkedForDeletionGeneration
);
} }
} }
} }

View File

@ -809,9 +809,11 @@ public class ClusterStateTests extends ESTestCase {
"settings": { "settings": {
"project.setting": "42", "project.setting": "42",
"project.setting2": "43" "project.setting2": "43"
},
"marked_for_deletion": true
} }
} ],
] "projects_marked_for_deletion_generation": 1
} }
} }
""", """,
@ -927,6 +929,7 @@ public class ClusterStateTests extends ESTestCase {
projectId1, projectId1,
Settings.builder().put(PROJECT_SETTING.getKey(), 42).put(PROJECT_SETTING2.getKey(), 43).build() Settings.builder().put(PROJECT_SETTING.getKey(), 42).put(PROJECT_SETTING2.getKey(), 43).build()
) )
.markProjectForDeletion(projectId1)
.build() .build()
) )
.blocks( .blocks(
@ -2226,7 +2229,7 @@ public class ClusterStateTests extends ESTestCase {
} else if (custom instanceof SnapshotsInProgress snapshotsInProgress) { } else if (custom instanceof SnapshotsInProgress snapshotsInProgress) {
chunkCount += 2 + snapshotsInProgress.asStream().count(); chunkCount += 2 + snapshotsInProgress.asStream().count();
} else if (custom instanceof ProjectStateRegistry projectStateRegistry) { } else if (custom instanceof ProjectStateRegistry projectStateRegistry) {
chunkCount += 2 + projectStateRegistry.size(); chunkCount += 3 + projectStateRegistry.size();
} else { } else {
// could be anything, we have to just try it // could be anything, we have to just try it
chunkCount += Iterables.size( chunkCount += Iterables.size(

View File

@ -0,0 +1,89 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
package org.elasticsearch.cluster.project;
import org.elasticsearch.cluster.ClusterModule;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.Diff;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.SimpleDiffableWireSerializationTestCase;
import java.io.IOException;
import java.util.stream.IntStream;
public class ProjectStateRegistrySerializationTests extends SimpleDiffableWireSerializationTestCase<ClusterState.Custom> {
@Override
protected ClusterState.Custom makeTestChanges(ClusterState.Custom testInstance) {
return mutate((ProjectStateRegistry) testInstance);
}
@Override
protected Writeable.Reader<Diff<ClusterState.Custom>> diffReader() {
return ProjectStateRegistry::readDiffFrom;
}
@Override
protected NamedWriteableRegistry getNamedWriteableRegistry() {
return new NamedWriteableRegistry(ClusterModule.getNamedWriteables());
}
@Override
protected Writeable.Reader<ClusterState.Custom> instanceReader() {
return ProjectStateRegistry::new;
}
@Override
protected ClusterState.Custom createTestInstance() {
return randomProjectStateRegistry();
}
@Override
protected ClusterState.Custom mutateInstance(ClusterState.Custom instance) throws IOException {
return mutate((ProjectStateRegistry) instance);
}
private ProjectStateRegistry mutate(ProjectStateRegistry instance) {
if (randomBoolean() && instance.size() > 0) {
// Remove or mutate a project's settings or deletion flag
var projectId = randomFrom(instance.getProjectsSettings().keySet());
var builder = ProjectStateRegistry.builder(instance);
builder.putProjectSettings(projectId, randomSettings());
if (randomBoolean()) {
// mark for deletion
builder.markProjectForDeletion(projectId);
}
return builder.build();
} else {
// add a new project
return ProjectStateRegistry.builder(instance).putProjectSettings(randomUniqueProjectId(), randomSettings()).build();
}
}
private static ProjectStateRegistry randomProjectStateRegistry() {
final var projects = randomSet(1, 5, ESTestCase::randomUniqueProjectId);
final var projectsUnderDeletion = randomSet(0, 5, ESTestCase::randomUniqueProjectId);
var builder = ProjectStateRegistry.builder();
projects.forEach(projectId -> builder.putProjectSettings(projectId, randomSettings()));
projectsUnderDeletion.forEach(
projectId -> builder.putProjectSettings(projectId, randomSettings()).markProjectForDeletion(projectId)
);
return builder.build();
}
public static Settings randomSettings() {
var builder = Settings.builder();
IntStream.range(0, randomIntBetween(1, 5)).forEach(i -> builder.put(randomIdentifier(), randomIdentifier()));
return builder.build();
}
}

View File

@ -0,0 +1,47 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
package org.elasticsearch.cluster.project;
import org.elasticsearch.test.ESTestCase;
import org.hamcrest.Matchers;
import static org.elasticsearch.cluster.project.ProjectStateRegistrySerializationTests.randomSettings;
public class ProjectStateRegistryTests extends ESTestCase {
public void testBuilder() {
final var projects = randomSet(1, 5, ESTestCase::randomUniqueProjectId);
final var projectsUnderDeletion = randomSet(0, 5, ESTestCase::randomUniqueProjectId);
var builder = ProjectStateRegistry.builder();
projects.forEach(projectId -> builder.putProjectSettings(projectId, randomSettings()));
projectsUnderDeletion.forEach(
projectId -> builder.putProjectSettings(projectId, randomSettings()).markProjectForDeletion(projectId)
);
var projectStateRegistry = builder.build();
var gen1 = projectStateRegistry.getProjectsMarkedForDeletionGeneration();
assertThat(gen1, Matchers.equalTo(projectsUnderDeletion.isEmpty() ? 0L : 1L));
projectStateRegistry = ProjectStateRegistry.builder(projectStateRegistry).markProjectForDeletion(randomFrom(projects)).build();
var gen2 = projectStateRegistry.getProjectsMarkedForDeletionGeneration();
assertThat(gen2, Matchers.equalTo(gen1 + 1));
if (projectsUnderDeletion.isEmpty() == false) {
// re-adding the same projectId should not change the generation
projectStateRegistry = ProjectStateRegistry.builder(projectStateRegistry)
.markProjectForDeletion(randomFrom(projectsUnderDeletion))
.build();
assertThat(projectStateRegistry.getProjectsMarkedForDeletionGeneration(), Matchers.equalTo(gen2));
}
var unknownProjectId = randomUniqueProjectId();
var throwingBuilder = ProjectStateRegistry.builder(projectStateRegistry).markProjectForDeletion(unknownProjectId);
assertThrows(IllegalArgumentException.class, throwingBuilder::build);
}
}