Snapshots support multi-project (#130000)

This PR makes snapshot service code and APIs multi-project compatible. 

Resolves: ES-10225 Resolves: ES-10226
This commit is contained in:
Yang Wang 2025-07-03 18:48:44 +10:00 committed by GitHub
parent ebec2c31ee
commit f15ef7c2ed
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
57 changed files with 806 additions and 506 deletions

View File

@ -15,6 +15,7 @@ import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.DataStreamTestHelper;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings;
@ -138,8 +139,9 @@ public class TransportDeleteDataStreamActionTests extends ESTestCase {
List.of(new Tuple<>(dataStreamName, 2), new Tuple<>(dataStreamName2, 2)),
otherIndices
);
SnapshotsInProgress snapshotsInProgress = SnapshotsInProgress.EMPTY.withAddedEntry(createEntry(dataStreamName, "repo1", false))
.withAddedEntry(createEntry(dataStreamName2, "repo2", true));
SnapshotsInProgress snapshotsInProgress = SnapshotsInProgress.EMPTY.withAddedEntry(
createEntry(dataStreamName, projectId, "repo1", false)
).withAddedEntry(createEntry(dataStreamName2, projectId, "repo2", true));
ClusterState snapshotCs = ClusterState.builder(cs).putCustom(SnapshotsInProgress.TYPE, snapshotsInProgress).build();
DeleteDataStreamAction.Request req = new DeleteDataStreamAction.Request(TEST_REQUEST_TIMEOUT, new String[] { dataStreamName });
@ -157,9 +159,9 @@ public class TransportDeleteDataStreamActionTests extends ESTestCase {
);
}
private SnapshotsInProgress.Entry createEntry(String dataStreamName, String repo, boolean partial) {
private SnapshotsInProgress.Entry createEntry(String dataStreamName, ProjectId projectId, String repo, boolean partial) {
return SnapshotsInProgress.Entry.snapshot(
new Snapshot(repo, new SnapshotId("", "")),
new Snapshot(projectId, repo, new SnapshotId("", "")),
false,
partial,
SnapshotsInProgress.State.SUCCESS,

View File

@ -391,6 +391,7 @@ class S3Repository extends MeteredBlobStoreRepository {
if (SnapshotsService.useShardGenerations(finalizeSnapshotContext.repositoryMetaVersion()) == false) {
final ListenableFuture<Void> metadataDone = new ListenableFuture<>();
wrappedFinalizeContext = new FinalizeSnapshotContext(
finalizeSnapshotContext.serializeProjectMetadata(),
finalizeSnapshotContext.updatedShardGenerations(),
finalizeSnapshotContext.repositoryStateId(),
finalizeSnapshotContext.clusterMetadata(),

View File

@ -890,7 +890,7 @@ public class CloneSnapshotIT extends AbstractSnapshotIntegTestCase {
ShardGeneration generation
) throws IOException {
return BlobStoreRepository.INDEX_SHARD_SNAPSHOTS_FORMAT.read(
repository.getMetadata().name(),
repository.getProjectRepo(),
repository.shardContainer(repositoryShardId.index(), repositoryShardId.shardId()),
generation.getGenerationUUID(),
NamedXContentRegistry.EMPTY

View File

@ -820,7 +820,8 @@ public class CorruptedBlobStoreRepositoryIT extends AbstractSnapshotIntegTestCas
"fallback message",
"org.elasticsearch.repositories.blobstore.BlobStoreRepository",
Level.ERROR,
"index [test-idx-1/*] shard generation [*] in [test-repo][*] not found - falling back to reading all shard snapshots"
"index [test-idx-1/*] shard generation [*] in [default/test-repo][*] not found "
+ "- falling back to reading all shard snapshots"
)
);
mockLog.addExpectation(
@ -828,7 +829,7 @@ public class CorruptedBlobStoreRepositoryIT extends AbstractSnapshotIntegTestCas
"shard blobs list",
"org.elasticsearch.repositories.blobstore.BlobStoreRepository",
Level.ERROR,
"read shard snapshots [*] due to missing shard generation [*] for index [test-idx-1/*] in [test-repo][*]"
"read shard snapshots [*] due to missing shard generation [*] for index [test-idx-1/*] in [default/test-repo][*]"
)
);
if (repairWithDelete) {

View File

@ -189,9 +189,9 @@ public class MetadataLoadingDuringSnapshotRestoreIT extends AbstractSnapshotInte
}
@Override
public Metadata getSnapshotGlobalMetadata(SnapshotId snapshotId) {
public Metadata getSnapshotGlobalMetadata(SnapshotId snapshotId, boolean fromProjectMetadata) {
globalMetadata.computeIfAbsent(snapshotId.getName(), (s) -> new AtomicInteger(0)).incrementAndGet();
return super.getSnapshotGlobalMetadata(snapshotId);
return super.getSnapshotGlobalMetadata(snapshotId, fromProjectMetadata);
}
@Override

View File

@ -141,7 +141,7 @@ public class SnapshotThrottlingIT extends AbstractSnapshotIntegTestCase {
"snapshot speed over recovery speed",
"org.elasticsearch.repositories.blobstore.BlobStoreRepository",
Level.WARN,
"repository [test-repo] has a rate limit [max_snapshot_bytes_per_sec=1gb] per second which is above "
"repository [default/test-repo] has a rate limit [max_snapshot_bytes_per_sec=1gb] per second which is above "
+ "the effective recovery rate limit [indices.recovery.max_bytes_per_sec=100mb] per second, thus the repository "
+ "rate limit will be superseded by the recovery rate limit"
);
@ -152,7 +152,7 @@ public class SnapshotThrottlingIT extends AbstractSnapshotIntegTestCase {
"snapshot restore speed over recovery speed",
"org.elasticsearch.repositories.blobstore.BlobStoreRepository",
Level.WARN,
"repository [test-repo] has a rate limit [max_restore_bytes_per_sec=2gb] per second which is above "
"repository [default/test-repo] has a rate limit [max_restore_bytes_per_sec=2gb] per second which is above "
+ "the effective recovery rate limit [indices.recovery.max_bytes_per_sec=100mb] per second, thus the repository "
+ "rate limit will be superseded by the recovery rate limit"
);

View File

@ -16,6 +16,7 @@ import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.cluster.SnapshotDeletionsInProgress;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.service.MasterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.snapshots.mockstore.MockRepository;
@ -43,7 +44,7 @@ public class SnapshotsServiceIT extends AbstractSnapshotIntegTestCase {
"[does-not-exist]",
SnapshotsService.class.getName(),
Level.INFO,
"deleting snapshots [does-not-exist] from repository [test-repo]"
"deleting snapshots [does-not-exist] from repository [default/test-repo]"
)
);
@ -52,7 +53,7 @@ public class SnapshotsServiceIT extends AbstractSnapshotIntegTestCase {
"[deleting test-snapshot]",
SnapshotsService.class.getName(),
Level.INFO,
"deleting snapshots [test-snapshot] from repository [test-repo]"
"deleting snapshots [test-snapshot] from repository [default/test-repo]"
)
);
@ -61,7 +62,7 @@ public class SnapshotsServiceIT extends AbstractSnapshotIntegTestCase {
"[test-snapshot deleted]",
SnapshotsService.class.getName(),
Level.INFO,
"snapshots [test-snapshot/*] deleted"
"snapshots [test-snapshot/*] deleted in repository [default/test-repo]"
)
);
@ -90,7 +91,7 @@ public class SnapshotsServiceIT extends AbstractSnapshotIntegTestCase {
"[test-snapshot]",
SnapshotsService.class.getName(),
Level.WARN,
"failed to complete snapshot deletion for [test-snapshot] from repository [test-repo]"
"failed to complete snapshot deletion for [test-snapshot] from repository [default/test-repo]"
)
);
@ -176,10 +177,10 @@ public class SnapshotsServiceIT extends AbstractSnapshotIntegTestCase {
return false;
}
if (deleteHasStarted.get() == false) {
deleteHasStarted.set(deletionsInProgress.hasExecutingDeletion(repositoryName));
deleteHasStarted.set(deletionsInProgress.hasExecutingDeletion(ProjectId.DEFAULT, repositoryName));
return false;
} else {
return deletionsInProgress.hasExecutingDeletion(repositoryName) == false;
return deletionsInProgress.hasExecutingDeletion(ProjectId.DEFAULT, repositoryName) == false;
}
});
}

View File

@ -22,12 +22,13 @@ import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.blobstore.DeleteResult;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.ListenableFuture;
import org.elasticsearch.core.FixForMultiProject;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.injection.guice.Inject;
@ -66,6 +67,7 @@ public final class TransportCleanupRepositoryAction extends TransportMasterNodeA
private static final Logger logger = LogManager.getLogger(TransportCleanupRepositoryAction.class);
private final RepositoriesService repositoriesService;
private final ProjectResolver projectResolver;
@Inject
public TransportCleanupRepositoryAction(
@ -73,7 +75,8 @@ public final class TransportCleanupRepositoryAction extends TransportMasterNodeA
ClusterService clusterService,
RepositoriesService repositoriesService,
ThreadPool threadPool,
ActionFilters actionFilters
ActionFilters actionFilters,
ProjectResolver projectResolver
) {
super(
TYPE.name(),
@ -86,6 +89,7 @@ public final class TransportCleanupRepositoryAction extends TransportMasterNodeA
EsExecutors.DIRECT_EXECUTOR_SERVICE
);
this.repositoriesService = repositoriesService;
this.projectResolver = projectResolver;
// We add a state applier that will remove any dangling repository cleanup actions on master failover.
// This is safe to do since cleanups will increment the repository state id before executing any operations to prevent concurrent
// operations from corrupting the repository. This is the same safety mechanism used by snapshot deletes.
@ -134,22 +138,23 @@ public final class TransportCleanupRepositoryAction extends TransportMasterNodeA
ClusterState state,
ActionListener<CleanupRepositoryResponse> listener
) {
cleanupRepo(request.name(), listener.map(CleanupRepositoryResponse::new));
cleanupRepo(projectResolver.getProjectId(), request.name(), listener.map(CleanupRepositoryResponse::new));
}
@Override
protected ClusterBlockException checkBlock(CleanupRepositoryRequest request, ClusterState state) {
// Cluster is not affected but we look up repositories in metadata
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ);
return state.blocks().globalBlockedException(projectResolver.getProjectId(), ClusterBlockLevel.METADATA_READ);
}
/**
* Runs cleanup operations on the given repository.
* @param projectId Project for the repository
* @param repositoryName Repository to clean up
* @param listener Listener for cleanup result
*/
private void cleanupRepo(String repositoryName, ActionListener<RepositoryCleanupResult> listener) {
final Repository repository = repositoriesService.repository(repositoryName);
private void cleanupRepo(ProjectId projectId, String repositoryName, ActionListener<RepositoryCleanupResult> listener) {
final Repository repository = repositoriesService.repository(projectId, repositoryName);
if (repository instanceof BlobStoreRepository == false) {
listener.onFailure(new IllegalArgumentException("Repository [" + repositoryName + "] does not support repository cleanup"));
return;
@ -172,8 +177,10 @@ public final class TransportCleanupRepositoryAction extends TransportMasterNodeA
@Override
public ClusterState execute(ClusterState currentState) {
SnapshotsService.ensureRepositoryExists(repositoryName, currentState);
SnapshotsService.ensureNotReadOnly(currentState, repositoryName);
final ProjectMetadata projectMetadata = currentState.metadata().getProject(projectId);
SnapshotsService.ensureRepositoryExists(repositoryName, projectMetadata);
SnapshotsService.ensureNotReadOnly(projectMetadata, repositoryName);
// Repository cleanup is intentionally cluster wide exclusive
final RepositoryCleanupInProgress repositoryCleanupInProgress = RepositoryCleanupInProgress.get(currentState);
if (repositoryCleanupInProgress.hasCleanupInProgress()) {
throw new IllegalStateException(
@ -200,8 +207,6 @@ public final class TransportCleanupRepositoryAction extends TransportMasterNodeA
"Cannot cleanup [" + repositoryName + "] - a snapshot is currently running in [" + snapshots + "]"
);
}
@FixForMultiProject
final var projectId = ProjectId.DEFAULT;
return ClusterState.builder(currentState)
.putCustom(
RepositoryCleanupInProgress.TYPE,

View File

@ -17,6 +17,7 @@ import org.elasticsearch.action.support.master.AcknowledgedTransportMasterNodeAc
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.injection.guice.Inject;
@ -32,6 +33,7 @@ public final class TransportCloneSnapshotAction extends AcknowledgedTransportMas
public static final ActionType<AcknowledgedResponse> TYPE = new ActionType<>("cluster:admin/snapshot/clone");
private final SnapshotsService snapshotsService;
private final ProjectResolver projectResolver;
@Inject
public TransportCloneSnapshotAction(
@ -39,7 +41,8 @@ public final class TransportCloneSnapshotAction extends AcknowledgedTransportMas
ClusterService clusterService,
ThreadPool threadPool,
SnapshotsService snapshotsService,
ActionFilters actionFilters
ActionFilters actionFilters,
ProjectResolver projectResolver
) {
super(
TYPE.name(),
@ -51,12 +54,13 @@ public final class TransportCloneSnapshotAction extends AcknowledgedTransportMas
EsExecutors.DIRECT_EXECUTOR_SERVICE
);
this.snapshotsService = snapshotsService;
this.projectResolver = projectResolver;
}
@Override
protected ClusterBlockException checkBlock(CloneSnapshotRequest request, ClusterState state) {
// Cluster is not affected but we look up repositories in metadata
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ);
return state.blocks().globalBlockedException(projectResolver.getProjectId(), ClusterBlockLevel.METADATA_READ);
}
@Override
@ -66,6 +70,6 @@ public final class TransportCloneSnapshotAction extends AcknowledgedTransportMas
ClusterState state,
final ActionListener<AcknowledgedResponse> listener
) {
snapshotsService.cloneSnapshot(request, listener.map(v -> AcknowledgedResponse.TRUE));
snapshotsService.cloneSnapshot(projectResolver.getProjectId(), request, listener.map(v -> AcknowledgedResponse.TRUE));
}
}

View File

@ -16,6 +16,7 @@ import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.injection.guice.Inject;
@ -31,6 +32,7 @@ import org.elasticsearch.transport.TransportService;
public class TransportCreateSnapshotAction extends TransportMasterNodeAction<CreateSnapshotRequest, CreateSnapshotResponse> {
public static final ActionType<CreateSnapshotResponse> TYPE = new ActionType<>("cluster:admin/snapshot/create");
private final SnapshotsService snapshotsService;
private final ProjectResolver projectResolver;
@Inject
public TransportCreateSnapshotAction(
@ -38,7 +40,8 @@ public class TransportCreateSnapshotAction extends TransportMasterNodeAction<Cre
ClusterService clusterService,
ThreadPool threadPool,
SnapshotsService snapshotsService,
ActionFilters actionFilters
ActionFilters actionFilters,
ProjectResolver projectResolver
) {
super(
TYPE.name(),
@ -51,12 +54,13 @@ public class TransportCreateSnapshotAction extends TransportMasterNodeAction<Cre
EsExecutors.DIRECT_EXECUTOR_SERVICE
);
this.snapshotsService = snapshotsService;
this.projectResolver = projectResolver;
}
@Override
protected ClusterBlockException checkBlock(CreateSnapshotRequest request, ClusterState state) {
// We only check metadata block, as we want to snapshot closed indices (which have a read block)
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ);
return state.blocks().globalBlockedException(projectResolver.getProjectId(), ClusterBlockLevel.METADATA_READ);
}
@Override
@ -67,9 +71,13 @@ public class TransportCreateSnapshotAction extends TransportMasterNodeAction<Cre
final ActionListener<CreateSnapshotResponse> listener
) {
if (request.waitForCompletion()) {
snapshotsService.executeSnapshot(request, listener.map(CreateSnapshotResponse::new));
snapshotsService.executeSnapshot(projectResolver.getProjectId(), request, listener.map(CreateSnapshotResponse::new));
} else {
snapshotsService.createSnapshot(request, listener.map(snapshot -> new CreateSnapshotResponse((SnapshotInfo) null)));
snapshotsService.createSnapshot(
projectResolver.getProjectId(),
request,
listener.map(snapshot -> new CreateSnapshotResponse((SnapshotInfo) null))
);
}
}
}

View File

@ -18,6 +18,7 @@ import org.elasticsearch.action.support.master.AcknowledgedTransportMasterNodeAc
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.injection.guice.Inject;
@ -32,6 +33,7 @@ import org.elasticsearch.transport.TransportService;
public class TransportDeleteSnapshotAction extends AcknowledgedTransportMasterNodeAction<DeleteSnapshotRequest> {
public static final ActionType<AcknowledgedResponse> TYPE = new ActionType<>("cluster:admin/snapshot/delete");
private final SnapshotsService snapshotsService;
private final ProjectResolver projectResolver;
@Inject
public TransportDeleteSnapshotAction(
@ -39,7 +41,8 @@ public class TransportDeleteSnapshotAction extends AcknowledgedTransportMasterNo
ClusterService clusterService,
ThreadPool threadPool,
SnapshotsService snapshotsService,
ActionFilters actionFilters
ActionFilters actionFilters,
ProjectResolver projectResolver
) {
super(
TYPE.name(),
@ -51,12 +54,13 @@ public class TransportDeleteSnapshotAction extends AcknowledgedTransportMasterNo
EsExecutors.DIRECT_EXECUTOR_SERVICE
);
this.snapshotsService = snapshotsService;
this.projectResolver = projectResolver;
}
@Override
protected ClusterBlockException checkBlock(DeleteSnapshotRequest request, ClusterState state) {
// Cluster is not affected but we look up repositories in metadata
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ);
return state.blocks().globalBlockedException(projectResolver.getProjectId(), ClusterBlockLevel.METADATA_READ);
}
@Override
@ -74,6 +78,6 @@ public class TransportDeleteSnapshotAction extends AcknowledgedTransportMasterNo
ClusterState state,
final ActionListener<AcknowledgedResponse> listener
) {
snapshotsService.deleteSnapshots(request, listener.map(v -> AcknowledgedResponse.TRUE));
snapshotsService.deleteSnapshots(projectResolver.getProjectId(), request, listener.map(v -> AcknowledgedResponse.TRUE));
}
}

View File

@ -19,7 +19,9 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Iterators;
@ -78,6 +80,7 @@ public class TransportGetSnapshotsAction extends TransportMasterNodeAction<GetSn
private static final Logger logger = LogManager.getLogger(TransportGetSnapshotsAction.class);
private final RepositoriesService repositoriesService;
private final ProjectResolver projectResolver;
/*
* [NOTE ON THREADING]
@ -115,7 +118,8 @@ public class TransportGetSnapshotsAction extends TransportMasterNodeAction<GetSn
ClusterService clusterService,
ThreadPool threadPool,
RepositoriesService repositoriesService,
ActionFilters actionFilters
ActionFilters actionFilters,
ProjectResolver projectResolver
) {
super(
TYPE.name(),
@ -128,11 +132,12 @@ public class TransportGetSnapshotsAction extends TransportMasterNodeAction<GetSn
threadPool.executor(ThreadPool.Names.MANAGEMENT) // see [NOTE ON THREADING]
);
this.repositoriesService = repositoriesService;
this.projectResolver = projectResolver;
}
@Override
protected ClusterBlockException checkBlock(GetSnapshotsRequest request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ);
return state.blocks().globalBlockedException(projectResolver.getProjectId(), ClusterBlockLevel.METADATA_READ);
}
@Override
@ -144,13 +149,15 @@ public class TransportGetSnapshotsAction extends TransportMasterNodeAction<GetSn
) {
assert task instanceof CancellableTask : task + " not cancellable";
final var resolvedRepositories = ResolvedRepositories.resolve(state.metadata().getProject(), request.repositories());
final var projectId = projectResolver.getProjectId();
final var resolvedRepositories = ResolvedRepositories.resolve(state.metadata().getProject(projectId), request.repositories());
if (resolvedRepositories.hasMissingRepositories()) {
throw new RepositoryMissingException(String.join(", ", resolvedRepositories.missing()));
}
new GetSnapshotsOperation(
(CancellableTask) task,
projectId,
resolvedRepositories.repositoryMetadata(),
request.snapshots(),
request.ignoreUnavailable(),
@ -178,6 +185,7 @@ public class TransportGetSnapshotsAction extends TransportMasterNodeAction<GetSn
private class GetSnapshotsOperation {
private final CancellableTask cancellableTask;
private final ProjectId projectId;
// repositories
private final List<RepositoryMetadata> repositories;
@ -217,6 +225,7 @@ public class TransportGetSnapshotsAction extends TransportMasterNodeAction<GetSn
GetSnapshotsOperation(
CancellableTask cancellableTask,
ProjectId projectId,
List<RepositoryMetadata> repositories,
String[] snapshots,
boolean ignoreUnavailable,
@ -233,6 +242,7 @@ public class TransportGetSnapshotsAction extends TransportMasterNodeAction<GetSn
EnumSet<SnapshotState> states
) {
this.cancellableTask = cancellableTask;
this.projectId = projectId;
this.repositories = repositories;
this.ignoreUnavailable = ignoreUnavailable;
this.sortBy = sortBy;
@ -310,7 +320,10 @@ public class TransportGetSnapshotsAction extends TransportMasterNodeAction<GetSn
assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.MANAGEMENT);
cancellableTask.ensureNotCancelled();
ensureRequiredNamesPresent(repositoryName, repositoryData);
return getAsyncSnapshotInfoIterator(repositoriesService.repository(repositoryName), repositoryData);
return getAsyncSnapshotInfoIterator(
repositoriesService.repository(projectId, repositoryName),
repositoryData
);
})
.addListener(asyncRepositoryContentsListener)
),
@ -362,7 +375,7 @@ public class TransportGetSnapshotsAction extends TransportMasterNodeAction<GetSn
if (snapshotNamePredicate == SnapshotNamePredicate.MATCH_CURRENT_ONLY) {
listener.onResponse(null);
} else {
repositoriesService.repository(repositoryName).getRepositoryData(executor, listener);
repositoriesService.repository(projectId, repositoryName).getRepositoryData(executor, listener);
}
}
@ -386,7 +399,7 @@ public class TransportGetSnapshotsAction extends TransportMasterNodeAction<GetSn
}
final var unmatchedRequiredNames = new HashSet<>(snapshotNamePredicate.requiredNames());
for (final var snapshotInProgress : snapshotsInProgress.forRepo(repositoryName)) {
for (final var snapshotInProgress : snapshotsInProgress.forRepo(projectId, repositoryName)) {
unmatchedRequiredNames.remove(snapshotInProgress.snapshot().getSnapshotId().getName());
}
if (unmatchedRequiredNames.isEmpty()) {
@ -464,7 +477,7 @@ public class TransportGetSnapshotsAction extends TransportMasterNodeAction<GetSn
ActionListener.completeWith(
listener,
() -> new SnapshotInfo(
new Snapshot(repository.getMetadata().name(), snapshotId),
new Snapshot(repository.getProjectId(), repository.getMetadata().name(), snapshotId),
indicesLookup.getOrDefault(snapshotId, Collections.emptyList()),
Collections.emptyList(),
Collections.emptyList(),
@ -490,18 +503,15 @@ public class TransportGetSnapshotsAction extends TransportMasterNodeAction<GetSn
final var indicesLookup = getIndicesLookup(repositoryData);
return Iterators.concat(
// matching in-progress snapshots first
Iterators.map(
Iterators.filter(snapshotsInProgress.forRepo(repository.getMetadata().name()).iterator(), snapshotInProgress -> {
final var snapshotId = snapshotInProgress.snapshot().getSnapshotId();
if (snapshotNamePredicate.test(snapshotId.getName(), true)) {
matchingInProgressSnapshots.add(snapshotId);
return true;
} else {
return false;
}
}),
this::forSnapshotInProgress
),
Iterators.map(Iterators.filter(snapshotsInProgress.forRepo(repository.getProjectRepo()).iterator(), snapshotInProgress -> {
final var snapshotId = snapshotInProgress.snapshot().getSnapshotId();
if (snapshotNamePredicate.test(snapshotId.getName(), true)) {
matchingInProgressSnapshots.add(snapshotId);
return true;
} else {
return false;
}
}), this::forSnapshotInProgress),
repositoryData == null
// Only returning in-progress snapshots:
? Collections.emptyIterator()

View File

@ -25,12 +25,12 @@ import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.common.util.concurrent.ListenableFuture;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.core.FixForMultiProject;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
import org.elasticsearch.injection.guice.Inject;
@ -76,6 +76,7 @@ public class TransportSnapshotsStatusAction extends TransportMasterNodeAction<Sn
private final RepositoriesService repositoriesService;
private final NodeClient client;
private final ProjectResolver projectResolver;
@Inject
public TransportSnapshotsStatusAction(
@ -84,7 +85,8 @@ public class TransportSnapshotsStatusAction extends TransportMasterNodeAction<Sn
ThreadPool threadPool,
RepositoriesService repositoriesService,
NodeClient client,
ActionFilters actionFilters
ActionFilters actionFilters,
ProjectResolver projectResolver
) {
super(
TYPE.name(),
@ -99,11 +101,12 @@ public class TransportSnapshotsStatusAction extends TransportMasterNodeAction<Sn
);
this.repositoriesService = repositoriesService;
this.client = client;
this.projectResolver = projectResolver;
}
@Override
protected ClusterBlockException checkBlock(SnapshotsStatusRequest request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ);
return state.blocks().globalBlockedException(projectResolver.getProjectId(), ClusterBlockLevel.METADATA_READ);
}
@Override
@ -117,13 +120,24 @@ public class TransportSnapshotsStatusAction extends TransportMasterNodeAction<Sn
final CancellableTask cancellableTask = (CancellableTask) task;
final SnapshotsInProgress snapshotsInProgress = SnapshotsInProgress.get(state);
final ProjectId projectId = projectResolver.getProjectId();
List<SnapshotsInProgress.Entry> currentSnapshots = SnapshotsService.currentSnapshots(
snapshotsInProgress,
projectId,
request.repository(),
Arrays.asList(request.snapshots())
);
if (currentSnapshots.isEmpty()) {
buildResponse(snapshotsInProgress, request, currentSnapshots, null, state.getMinTransportVersion(), cancellableTask, listener);
buildResponse(
snapshotsInProgress,
projectId,
request,
currentSnapshots,
null,
state.getMinTransportVersion(),
cancellableTask,
listener
);
return;
}
@ -153,6 +167,7 @@ public class TransportSnapshotsStatusAction extends TransportMasterNodeAction<Sn
listener.delegateFailureAndWrap(
(l, nodeSnapshotStatuses) -> buildResponse(
snapshotsInProgress,
projectId,
request,
currentSnapshots,
nodeSnapshotStatuses,
@ -165,7 +180,16 @@ public class TransportSnapshotsStatusAction extends TransportMasterNodeAction<Sn
);
} else {
// We don't have any in-progress shards, just return current stats
buildResponse(snapshotsInProgress, request, currentSnapshots, null, state.getMinTransportVersion(), cancellableTask, listener);
buildResponse(
snapshotsInProgress,
projectId,
request,
currentSnapshots,
null,
state.getMinTransportVersion(),
cancellableTask,
listener
);
}
}
@ -173,6 +197,7 @@ public class TransportSnapshotsStatusAction extends TransportMasterNodeAction<Sn
// Package access for testing.
void buildResponse(
SnapshotsInProgress snapshotsInProgress,
ProjectId projectId,
SnapshotsStatusRequest request,
List<SnapshotsInProgress.Entry> currentSnapshotEntries,
TransportNodesSnapshotsStatus.NodesSnapshotStatus nodeSnapshotStatuses,
@ -282,7 +307,7 @@ public class TransportSnapshotsStatusAction extends TransportMasterNodeAction<Sn
// Now add snapshots on disk that are not currently running
final String repositoryName = request.repository();
if (Strings.hasText(repositoryName) && CollectionUtils.isEmpty(request.snapshots()) == false) {
loadRepositoryData(snapshotsInProgress, request, builder, currentSnapshotNames, repositoryName, task, listener);
loadRepositoryData(snapshotsInProgress, request, builder, currentSnapshotNames, projectId, repositoryName, task, listener);
} else {
listener.onResponse(new SnapshotsStatusResponse(Collections.unmodifiableList(builder)));
}
@ -293,14 +318,13 @@ public class TransportSnapshotsStatusAction extends TransportMasterNodeAction<Sn
SnapshotsStatusRequest request,
List<SnapshotStatus> builder,
Set<String> currentSnapshotNames,
ProjectId projectId,
String repositoryName,
CancellableTask task,
ActionListener<SnapshotsStatusResponse> listener
) {
final Set<String> requestedSnapshotNames = Sets.newHashSet(request.snapshots());
final ListenableFuture<RepositoryData> repositoryDataListener = new ListenableFuture<>();
@FixForMultiProject(description = "resolve the actual projectId, ES-10166")
final var projectId = ProjectId.DEFAULT;
repositoriesService.getRepositoryData(projectId, repositoryName, repositoryDataListener);
final Collection<SnapshotId> snapshotIdsToLoad = new ArrayList<>();
repositoryDataListener.addListener(listener.delegateFailureAndWrap((delegate, repositoryData) -> {
@ -329,7 +353,7 @@ public class TransportSnapshotsStatusAction extends TransportMasterNodeAction<Sn
throw new SnapshotMissingException(repositoryName, snapshotName);
}
}
if (snapshotsInProgress.snapshot(new Snapshot(repositoryName, snapshotId)) == null) {
if (snapshotsInProgress.snapshot(new Snapshot(projectId, repositoryName, snapshotId)) == null) {
snapshotIdsToLoad.add(snapshotId);
}
}
@ -338,10 +362,11 @@ public class TransportSnapshotsStatusAction extends TransportMasterNodeAction<Sn
delegate.onResponse(new SnapshotsStatusResponse(Collections.unmodifiableList(builder)));
} else {
final List<SnapshotStatus> threadSafeBuilder = Collections.synchronizedList(builder);
repositoriesService.repository(repositoryName).getSnapshotInfo(snapshotIdsToLoad, true, task::isCancelled, snapshotInfo -> {
final Repository repository = repositoriesService.repository(projectId, repositoryName);
repository.getSnapshotInfo(snapshotIdsToLoad, true, task::isCancelled, snapshotInfo -> {
List<SnapshotIndexShardStatus> shardStatusBuilder = new ArrayList<>();
final Map<ShardId, IndexShardSnapshotStatus.Copy> shardStatuses;
shardStatuses = snapshotShards(repositoryName, repositoryData, task, snapshotInfo);
shardStatuses = snapshotShards(projectId, repositoryName, repositoryData, task, snapshotInfo);
// an exception here stops further fetches of snapshotInfo since context is fail-fast
for (final var shardStatus : shardStatuses.entrySet()) {
IndexShardSnapshotStatus.Copy lastSnapshotStatus = shardStatus.getValue();
@ -360,7 +385,7 @@ public class TransportSnapshotsStatusAction extends TransportMasterNodeAction<Sn
: "Inconsistent timestamps found in SnapshotInfo [" + snapshotInfo + "]";
threadSafeBuilder.add(
new SnapshotStatus(
new Snapshot(repositoryName, snapshotInfo.snapshotId()),
new Snapshot(projectId, repositoryName, snapshotInfo.snapshotId()),
state,
Collections.unmodifiableList(shardStatusBuilder),
snapshotInfo.includeGlobalState(),
@ -382,17 +407,19 @@ public class TransportSnapshotsStatusAction extends TransportMasterNodeAction<Sn
* returns similar information but for already finished snapshots.
* </p>
*
* @param projectId project for the repository
* @param repositoryName repository name
* @param snapshotInfo snapshot info
* @return map of shard id to snapshot status
*/
private Map<ShardId, IndexShardSnapshotStatus.Copy> snapshotShards(
final ProjectId projectId,
final String repositoryName,
final RepositoryData repositoryData,
final CancellableTask task,
final SnapshotInfo snapshotInfo
) throws IOException {
final Repository repository = repositoriesService.repository(repositoryName);
final Repository repository = repositoriesService.repository(projectId, repositoryName);
final Map<ShardId, IndexShardSnapshotStatus.Copy> shardStatus = new HashMap<>();
for (String index : snapshotInfo.indices()) {
IndexId indexId = repositoryData.resolveIndexId(index);

View File

@ -20,6 +20,7 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.repositories.ProjectRepo;
import org.elasticsearch.repositories.RepositoryOperation;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.xcontent.ToXContent;
@ -65,10 +66,10 @@ public class SnapshotDeletionsInProgress extends AbstractNamedDiffable<Custom> i
}
private static boolean assertNoConcurrentDeletionsForSameRepository(List<Entry> entries) {
final Set<String> activeRepositories = new HashSet<>();
final Set<ProjectRepo> activeRepositories = new HashSet<>();
for (Entry entry : entries) {
if (entry.state() == State.STARTED) {
final boolean added = activeRepositories.add(entry.repository());
final boolean added = activeRepositories.add(new ProjectRepo(entry.projectId(), entry.repository()));
assert added : "Found multiple running deletes for a single repository in " + entries;
}
}
@ -114,11 +115,12 @@ public class SnapshotDeletionsInProgress extends AbstractNamedDiffable<Custom> i
/**
* Checks if there is an actively executing delete operation for the given repository
*
* @param projectId project for the repository
* @param repository repository name
*/
public boolean hasExecutingDeletion(String repository) {
public boolean hasExecutingDeletion(ProjectId projectId, String repository) {
for (Entry entry : entries) {
if (entry.state() == State.STARTED && entry.repository().equals(repository)) {
if (entry.state() == State.STARTED && entry.projectId.equals(projectId) && entry.repository().equals(repository)) {
return true;
}
}
@ -133,6 +135,13 @@ public class SnapshotDeletionsInProgress extends AbstractNamedDiffable<Custom> i
return entries.isEmpty() == false;
}
/**
* Similar to {@link #hasDeletionsInProgress()} but checks in the scope of the given project.
*/
public boolean hasDeletionsInProgress(ProjectId projectId) {
return entries.stream().anyMatch(entry -> entry.projectId().equals(projectId));
}
@Override
public String getWriteableName() {
return TYPE;

View File

@ -34,8 +34,8 @@ import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.logging.LogManager;
import org.elasticsearch.logging.Logger;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.ProjectRepo;
import org.elasticsearch.repositories.RepositoryOperation;
import org.elasticsearch.repositories.RepositoryOperation.ProjectRepo;
import org.elasticsearch.repositories.RepositoryShardId;
import org.elasticsearch.repositories.ShardGeneration;
import org.elasticsearch.repositories.ShardSnapshotResult;
@ -60,7 +60,7 @@ import java.util.Objects;
import java.util.Set;
import java.util.stream.Stream;
import static org.elasticsearch.repositories.RepositoryOperation.PROJECT_REPO_SERIALIZER;
import static org.elasticsearch.repositories.ProjectRepo.PROJECT_REPO_SERIALIZER;
/**
* Meta data about snapshots that are currently executing
@ -174,11 +174,15 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
return forRepo(Metadata.DEFAULT_PROJECT_ID, repository);
}
public List<Entry> forRepo(ProjectId projectId, String repository) {
return forRepo(new ProjectRepo(projectId, repository));
}
/**
* Returns the list of snapshots in the specified repository.
*/
public List<Entry> forRepo(ProjectId projectId, String repository) {
return entries.getOrDefault(new ProjectRepo(projectId, repository), ByRepo.EMPTY).entries;
public List<Entry> forRepo(ProjectRepo projectRepo) {
return entries.getOrDefault(projectRepo, ByRepo.EMPTY).entries;
}
public boolean isEmpty() {
@ -197,10 +201,30 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
return () -> Iterators.map(entries.values().iterator(), byRepo -> byRepo.entries);
}
/**
* Similar to {@link #entriesByRepo()} but only returns entries for the specified project.
*/
public Iterable<List<Entry>> entriesByRepo(ProjectId projectId) {
return () -> Iterators.map(
Iterators.filter(entries.entrySet().iterator(), entry -> entry.getKey().projectId().equals(projectId)),
entry -> entry.getValue().entries
);
}
public Stream<Entry> asStream() {
return entries.values().stream().flatMap(t -> t.entries.stream());
}
/**
* Similar to {@link #asStream()} but only returns entries for the specified project.
*/
public Stream<Entry> asStream(ProjectId projectId) {
return entries.entrySet()
.stream()
.filter(entry -> entry.getKey().projectId().equals(projectId))
.flatMap(entry -> entry.getValue().entries.stream());
}
@Nullable
public Entry snapshot(final Snapshot snapshot) {
return findSnapshotInList(snapshot, forRepo(snapshot.getProjectId(), snapshot.getRepository()));
@ -221,25 +245,6 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
return null;
}
/**
* Computes a map of repository shard id to set of shard generations, containing all shard generations that became obsolete and may be
* deleted from the repository as the cluster state moves from the given old value of {@link SnapshotsInProgress} to this instance.
* <p>
* An unique shard generation is created for every in-progress shard snapshot. The shard generation file contains information about all
* the files needed by pre-existing and any new shard snapshots that were in-progress. When a shard snapshot is finalized, its file list
* is promoted to the official shard snapshot list for the index shard. This final list will contain metadata about any other
* in-progress shard snapshots that were not yet finalized when it began. All these other in-progress shard snapshot lists are scheduled
* for deletion now.
*/
@FixForMultiProject
@Deprecated(forRemoval = true)
public Map<RepositoryShardId, Set<ShardGeneration>> obsoleteGenerations(
String repository,
SnapshotsInProgress oldClusterStateSnapshots
) {
return obsoleteGenerations(Metadata.DEFAULT_PROJECT_ID, repository, oldClusterStateSnapshots);
}
/**
* Computes a map of repository shard id to set of shard generations, containing all shard generations that became obsolete and may be
* deleted from the repository as the cluster state moves from the given old value of {@link SnapshotsInProgress} to this instance.

View File

@ -2089,6 +2089,9 @@ public class ProjectMetadata implements Iterable<IndexMetadata>, Diffable<Projec
public static ProjectMetadata fromXContent(XContentParser parser) throws IOException {
XContentParser.Token token = parser.currentToken();
if (token == null) {
token = parser.nextToken();
}
String currentFieldName = null;
XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, token, parser);

View File

@ -14,6 +14,7 @@ import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.core.FixForMultiProject;
import java.util.Objects;
@ -73,7 +74,9 @@ public class SnapshotInProgressAllocationDecider extends AllocationDecider {
return YES_NOT_SNAPSHOTTED;
}
for (final var entriesByRepo : snapshotsInProgress.entriesByRepo()) {
@FixForMultiProject(description = "replace with entriesByRepo(ProjectId), see also ES-12195")
final var entriesByRepoIterable = snapshotsInProgress.entriesByRepo();
for (final var entriesByRepo : entriesByRepoIterable) {
for (final var entry : entriesByRepo) {
if (entry.isClone()) {
// clones do not run on data nodes

View File

@ -1115,7 +1115,8 @@ class NodeConstruction {
repositoriesService,
transportService,
actionModule.getActionFilters(),
systemIndices
systemIndices,
projectResolver.supportsMultipleProjects()
);
SnapshotShardsService snapshotShardsService = new SnapshotShardsService(
settings,

View File

@ -67,8 +67,8 @@ public class FilterRepository implements Repository {
}
@Override
public Metadata getSnapshotGlobalMetadata(SnapshotId snapshotId) {
return in.getSnapshotGlobalMetadata(snapshotId);
public Metadata getSnapshotGlobalMetadata(SnapshotId snapshotId, boolean fromProjectMetadata) {
return in.getSnapshotGlobalMetadata(snapshotId, fromProjectMetadata);
}
@Override

View File

@ -27,6 +27,7 @@ import java.util.Set;
*/
public final class FinalizeSnapshotContext extends DelegatingActionListener<RepositoryData, RepositoryData> {
private final boolean serializeProjectMetadata;
private final UpdatedShardGenerations updatedShardGenerations;
/**
@ -46,6 +47,7 @@ public final class FinalizeSnapshotContext extends DelegatingActionListener<Repo
private final Runnable onDone;
/**
* @param serializeProjectMetadata serialize only the project metadata of the cluster metadata
* @param updatedShardGenerations updated shard generations for both live and deleted indices
* @param repositoryStateId the unique id identifying the state of the repository when the snapshot began
* @param clusterMetadata cluster metadata
@ -57,6 +59,7 @@ public final class FinalizeSnapshotContext extends DelegatingActionListener<Repo
* once all cleanup operations after snapshot completion have executed
*/
public FinalizeSnapshotContext(
boolean serializeProjectMetadata,
UpdatedShardGenerations updatedShardGenerations,
long repositoryStateId,
Metadata clusterMetadata,
@ -66,6 +69,7 @@ public final class FinalizeSnapshotContext extends DelegatingActionListener<Repo
Runnable onDone
) {
super(listener);
this.serializeProjectMetadata = serializeProjectMetadata;
this.updatedShardGenerations = updatedShardGenerations;
this.repositoryStateId = repositoryStateId;
this.clusterMetadata = clusterMetadata;
@ -74,6 +78,10 @@ public final class FinalizeSnapshotContext extends DelegatingActionListener<Repo
this.onDone = onDone;
}
public boolean serializeProjectMetadata() {
return serializeProjectMetadata;
}
public long repositoryStateId() {
return repositoryStateId;
}
@ -107,7 +115,8 @@ public final class FinalizeSnapshotContext extends DelegatingActionListener<Repo
// Now that the updated cluster state may have changed in-progress shard snapshots' shard generations to the latest shard
// generation, let's mark any now unreferenced shard generations as obsolete and ready to be deleted.
obsoleteGenerations.set(
SnapshotsInProgress.get(updatedState).obsoleteGenerations(snapshotInfo.repository(), SnapshotsInProgress.get(state))
SnapshotsInProgress.get(updatedState)
.obsoleteGenerations(snapshotInfo.projectId(), snapshotInfo.repository(), SnapshotsInProgress.get(state))
);
return updatedState;
}

View File

@ -76,7 +76,7 @@ public class InvalidRepository extends AbstractLifecycleComponent implements Rep
}
@Override
public Metadata getSnapshotGlobalMetadata(SnapshotId snapshotId) {
public Metadata getSnapshotGlobalMetadata(SnapshotId snapshotId, boolean fromProjectMetadata) {
throw createCreationException();
}

View File

@ -0,0 +1,57 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
package org.elasticsearch.repositories;
import org.elasticsearch.cluster.DiffableUtils;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import java.io.IOException;
/**
* A project qualified repository
*
* @param projectId The project that the repository belongs to
* @param name Name of the repository
*/
public record ProjectRepo(ProjectId projectId, String name) implements Writeable {
public static final DiffableUtils.KeySerializer<ProjectRepo> PROJECT_REPO_SERIALIZER = new DiffableUtils.KeySerializer<>() {
@Override
public void writeKey(ProjectRepo key, StreamOutput out) throws IOException {
key.writeTo(out);
}
@Override
public ProjectRepo readKey(StreamInput in) throws IOException {
return new ProjectRepo(in);
}
};
public ProjectRepo(StreamInput in) throws IOException {
this(ProjectId.readFrom(in), in.readString());
}
@Override
public void writeTo(StreamOutput out) throws IOException {
projectId.writeTo(out);
out.writeString(name);
}
@Override
public String toString() {
return projectRepoString(projectId, name);
}
public static String projectRepoString(ProjectId projectId, String repositoryName) {
return "[" + projectId + "/" + repositoryName + "]";
}
}

View File

@ -77,7 +77,7 @@ import java.util.stream.Stream;
import static java.util.Collections.unmodifiableMap;
import static org.elasticsearch.core.Strings.format;
import static org.elasticsearch.repositories.RepositoryOperation.projectRepoString;
import static org.elasticsearch.repositories.ProjectRepo.projectRepoString;
import static org.elasticsearch.snapshots.SearchableSnapshotsSettings.SEARCHABLE_SNAPSHOTS_REPOSITORY_NAME_SETTING_KEY;
import static org.elasticsearch.snapshots.SearchableSnapshotsSettings.SEARCHABLE_SNAPSHOTS_REPOSITORY_UUID_SETTING_KEY;
@ -455,8 +455,8 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C
logger.info(
Strings.format(
"Registering repository [%s] with repository UUID [%s] and generation [%d]",
repositoryName,
"Registering repository %s with repository UUID [%s] and generation [%d]",
projectRepoString(projectId, repositoryName),
repositoryData.getUuid(),
repositoryData.getGenId()
)

View File

@ -87,6 +87,13 @@ public interface Repository extends LifecycleComponent {
@Nullable
ProjectId getProjectId();
/**
* Get the project qualified repository
*/
default ProjectRepo getProjectRepo() {
return new ProjectRepo(getProjectId(), getMetadata().name());
}
/**
* Returns metadata about this repository.
*/
@ -138,10 +145,11 @@ public interface Repository extends LifecycleComponent {
/**
* Returns global metadata associated with the snapshot.
*
* @param snapshotId the snapshot id to load the global metadata from
* @param snapshotId the snapshot id to load the global metadata from
* @param fromProjectMetadata The metadata may need to be constructed by first reading the project metadata
* @return the global metadata about the snapshot
*/
Metadata getSnapshotGlobalMetadata(SnapshotId snapshotId);
Metadata getSnapshotGlobalMetadata(SnapshotId snapshotId, boolean fromProjectMetadata);
/**
* Returns the index metadata associated with the snapshot.

View File

@ -8,13 +8,7 @@
*/
package org.elasticsearch.repositories;
import org.elasticsearch.cluster.DiffableUtils;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import java.io.IOException;
/**
* Coordinates of an operation that modifies a repository, assuming that repository at a specific generation.
@ -36,46 +30,4 @@ public interface RepositoryOperation {
*/
long repositoryStateId();
/**
* A project qualified repository
* @param projectId The project that the repository belongs to
* @param name Name of the repository
*/
record ProjectRepo(ProjectId projectId, String name) implements Writeable {
public ProjectRepo(StreamInput in) throws IOException {
this(ProjectId.readFrom(in), in.readString());
}
@Override
public void writeTo(StreamOutput out) throws IOException {
projectId.writeTo(out);
out.writeString(name);
}
@Override
public String toString() {
return projectRepoString(projectId, name);
}
}
static ProjectRepo projectRepo(ProjectId projectId, String repositoryName) {
return new ProjectRepo(projectId, repositoryName);
}
static String projectRepoString(ProjectId projectId, String repositoryName) {
return "[" + projectId + "][" + repositoryName + "]";
}
DiffableUtils.KeySerializer<ProjectRepo> PROJECT_REPO_SERIALIZER = new DiffableUtils.KeySerializer<>() {
@Override
public void writeKey(ProjectRepo key, StreamOutput out) throws IOException {
key.writeTo(out);
}
@Override
public ProjectRepo readKey(StreamInput in) throws IOException {
return new ProjectRepo(in);
}
};
}

View File

@ -74,7 +74,7 @@ public class UnknownTypeRepository extends AbstractLifecycleComponent implements
}
@Override
public Metadata getSnapshotGlobalMetadata(SnapshotId snapshotId) {
public Metadata getSnapshotGlobalMetadata(SnapshotId snapshotId, boolean fromProjectMetadata) {
throw createUnknownTypeException();
}

View File

@ -179,6 +179,7 @@ import static org.elasticsearch.core.Strings.format;
import static org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo;
import static org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo.canonicalName;
import static org.elasticsearch.indices.recovery.RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING;
import static org.elasticsearch.repositories.ProjectRepo.projectRepoString;
/**
* BlobStore - based implementation of Snapshot Repository
@ -371,6 +372,15 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
ChunkedToXContent::wrapAsToXContent
);
public static final ChecksumBlobStoreFormat<ProjectMetadata> PROJECT_METADATA_FORMAT = new ChecksumBlobStoreFormat<>(
"project-metadata",
METADATA_NAME_FORMAT,
(repoName, parser) -> ProjectMetadata.Builder.fromXContent(parser),
projectMetadata -> ChunkedToXContent.wrapAsToXContent(
params -> Iterators.concat(Iterators.single((builder, ignored) -> builder.field("id", projectMetadata.id())))
)
);
public static final ChecksumBlobStoreFormat<IndexMetadata> INDEX_METADATA_FORMAT = new ChecksumBlobStoreFormat<>(
"index-metadata",
METADATA_NAME_FORMAT,
@ -710,7 +720,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
@Override
public void updateState(ClusterState state) {
final Settings previousSettings = metadata.settings();
metadata = getRepoMetadata(state);
metadata = getRepoMetadata(state.metadata().getProject(getProjectId()));
final Settings updatedSettings = metadata.settings();
if (updatedSettings.equals(previousSettings) == false) {
snapshotRateLimiter = getSnapshotRateLimiter();
@ -725,7 +735,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
return;
}
if (bestEffortConsistency) {
long bestGenerationFromCS = bestGeneration(SnapshotsInProgress.get(state).forRepo(this.metadata.name()));
long bestGenerationFromCS = bestGeneration(SnapshotsInProgress.get(state).forRepo(getProjectRepo()));
// Don't use generation from the delete task if we already found a generation for an in progress snapshot.
// In this case, the generation points at the generation the repo will be in after the snapshot finishes so it may not yet
// exist
@ -1282,7 +1292,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
private void getOneShardCount(String indexMetaGeneration) {
try {
updateShardCount(
INDEX_METADATA_FORMAT.read(metadata.name(), indexContainer, indexMetaGeneration, namedXContentRegistry)
INDEX_METADATA_FORMAT.read(getProjectRepo(), indexContainer, indexMetaGeneration, namedXContentRegistry)
.getNumberOfShards()
);
} catch (Exception ex) {
@ -1513,8 +1523,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
} catch (Exception e) {
logger.warn(
() -> format(
"[%s] The following blobs are no longer part of any snapshot [%s] but failed to remove them",
metadata.name(),
"%s The following blobs are no longer part of any snapshot [%s] but failed to remove them",
toStringShort(),
staleRootBlobs
),
e
@ -1542,8 +1552,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
logger.debug("[{}] Cleaned up stale index [{}]", metadata.name(), indexId);
} catch (IOException e) {
logger.warn(() -> format("""
[%s] index %s is no longer part of any snapshot in the repository, \
but failed to clean up its index folder""", metadata.name(), indexId), e);
%s index %s is no longer part of any snapshot in the repository, \
but failed to clean up its index folder""", toStringShort(), indexId), e);
}
}));
}
@ -1616,7 +1626,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
.collect(Collectors.toSet());
final List<String> blobsToLog = blobsToDelete.stream().filter(b -> blobNamesToIgnore.contains(b) == false).toList();
if (blobsToLog.isEmpty() == false) {
logger.info("[{}] Found stale root level blobs {}. Cleaning them up", metadata.name(), blobsToLog);
logger.info("{} Found stale root level blobs {}. Cleaning them up", toStringShort(), blobsToLog);
}
}
}
@ -1749,6 +1759,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
@Override
public void finalizeSnapshot(final FinalizeSnapshotContext finalizeSnapshotContext) {
assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SNAPSHOT);
assert finalizeSnapshotContext.snapshotInfo().projectId().equals(getProjectId())
: "project-id mismatch: " + finalizeSnapshotContext.snapshotInfo().projectId() + " != " + getProjectId();
final long repositoryStateId = finalizeSnapshotContext.repositoryStateId();
final SnapshotInfo snapshotInfo = finalizeSnapshotContext.snapshotInfo();
assert repositoryStateId > RepositoryData.UNKNOWN_REPO_GEN
@ -1813,17 +1825,19 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
// Write global metadata
final Metadata clusterMetadata = finalizeSnapshotContext.clusterMetadata();
executor.execute(
ActionRunnable.run(
allMetaListeners.acquire(),
() -> GLOBAL_METADATA_FORMAT.write(clusterMetadata, blobContainer(), snapshotId.getUUID(), compress)
)
);
final var projectMetadata = clusterMetadata.getProject(getProjectId());
executor.execute(ActionRunnable.run(allMetaListeners.acquire(), () -> {
if (finalizeSnapshotContext.serializeProjectMetadata()) {
PROJECT_METADATA_FORMAT.write(projectMetadata, blobContainer(), snapshotId.getUUID(), compress);
} else {
GLOBAL_METADATA_FORMAT.write(clusterMetadata, blobContainer(), snapshotId.getUUID(), compress);
}
}));
// Write the index metadata for each index in the snapshot
for (IndexId index : indices) {
executor.execute(ActionRunnable.run(allMetaListeners.acquire(), () -> {
final IndexMetadata indexMetaData = clusterMetadata.getProject().index(index.getName());
final IndexMetadata indexMetaData = projectMetadata.index(index.getName());
if (writeIndexGens) {
final String identifiers = IndexMetaDataGenerations.buildUniqueIdentifier(indexMetaData);
String metaUUID = existingRepositoryData.indexMetaDataGenerations().getIndexMetaBlobId(identifiers);
@ -1836,7 +1850,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
metadataWriteResult.indexMetas().put(index, identifiers);
} else {
INDEX_METADATA_FORMAT.write(
clusterMetadata.getProject().index(index.getName()),
clusterMetadata.getProject(getProjectId()).index(index.getName()),
indexContainer(index),
snapshotId.getUUID(),
compress
@ -2014,7 +2028,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
Exception failure = null;
SnapshotInfo snapshotInfo = null;
try {
snapshotInfo = SNAPSHOT_FORMAT.read(metadata.name(), blobContainer(), snapshotId.getUUID(), namedXContentRegistry);
snapshotInfo = SNAPSHOT_FORMAT.read(getProjectRepo(), blobContainer(), snapshotId.getUUID(), namedXContentRegistry);
} catch (NoSuchFileException ex) {
failure = new SnapshotMissingException(metadata.name(), snapshotId, ex);
} catch (IOException | NotXContentException ex) {
@ -2038,9 +2052,19 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
}
@Override
public Metadata getSnapshotGlobalMetadata(final SnapshotId snapshotId) {
public Metadata getSnapshotGlobalMetadata(final SnapshotId snapshotId, boolean fromProjectMetadata) {
try {
return GLOBAL_METADATA_FORMAT.read(metadata.name(), blobContainer(), snapshotId.getUUID(), namedXContentRegistry);
if (fromProjectMetadata) {
final var projectMetadata = PROJECT_METADATA_FORMAT.read(
getProjectRepo(),
blobContainer(),
snapshotId.getUUID(),
namedXContentRegistry
);
return Metadata.builder().put(projectMetadata).build();
} else {
return GLOBAL_METADATA_FORMAT.read(getProjectRepo(), blobContainer(), snapshotId.getUUID(), namedXContentRegistry);
}
} catch (NoSuchFileException ex) {
throw new SnapshotMissingException(metadata.name(), snapshotId, ex);
} catch (IOException ex) {
@ -2052,7 +2076,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
public IndexMetadata getSnapshotIndexMetaData(RepositoryData repositoryData, SnapshotId snapshotId, IndexId index) throws IOException {
try {
return INDEX_METADATA_FORMAT.read(
metadata.name(),
getProjectRepo(),
indexContainer(index),
repositoryData.indexMetaDataGenerations().indexMetaBlobId(snapshotId, index),
namedXContentRegistry
@ -2130,9 +2154,9 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
if (warnIfOverRecovery && effectiveRecoverySpeed.getBytes() > 0) {
if (maxConfiguredBytesPerSec.getBytes() > effectiveRecoverySpeed.getBytes()) {
logger.warn(
"repository [{}] has a rate limit [{}={}] per second which is above the effective recovery rate limit "
"repository {} has a rate limit [{}={}] per second which is above the effective recovery rate limit "
+ "[{}={}] per second, thus the repository rate limit will be superseded by the recovery rate limit",
metadata.name(),
toStringShort(),
settingKey,
maxConfiguredBytesPerSec,
INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey(),
@ -2339,7 +2363,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
@Override
public void onFailure(Exception e) {
logger.warn(
() -> format("[%s] Exception when initializing repository generation in cluster state", metadata.name()),
() -> format("%s Exception when initializing repository generation in cluster state", toStringShort()),
e
);
acquireAndClearRepoDataInitialized().onFailure(e);
@ -2403,11 +2427,11 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
private ClusterState getClusterStateWithUpdatedRepositoryGeneration(ClusterState currentState, RepositoryData repoData) {
// In theory we might have failed over to a different master which initialized the repo and then failed back to this node, so we
// must check the repository generation in the cluster state is still unknown here.
final RepositoryMetadata repoMetadata = getRepoMetadata(currentState);
final var project = currentState.metadata().getProject(getProjectId());
final RepositoryMetadata repoMetadata = getRepoMetadata(project);
if (repoMetadata.generation() != RepositoryData.UNKNOWN_REPO_GEN) {
throw new RepositoryException(repoMetadata.name(), "Found unexpected initialized repo metadata [" + repoMetadata + "]");
}
final var project = currentState.metadata().getProject(getProjectId());
return ClusterState.builder(currentState)
.putProjectMetadata(
ProjectMetadata.builder(project)
@ -2588,56 +2612,53 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
private void markRepoCorrupted(long corruptedGeneration, Exception originalException, ActionListener<Void> listener) {
assert corruptedGeneration != RepositoryData.UNKNOWN_REPO_GEN;
assert bestEffortConsistency == false;
logger.warn(() -> "Marking repository [" + metadata.name() + "] as corrupted", originalException);
submitUnbatchedTask(
"mark repository corrupted [" + metadata.name() + "][" + corruptedGeneration + "]",
new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
final var project = currentState.metadata().getDefaultProject();
final RepositoriesMetadata state = RepositoriesMetadata.get(project);
final RepositoryMetadata repoState = state.repository(metadata.name());
if (repoState.generation() != corruptedGeneration) {
throw new IllegalStateException(
"Tried to mark repo generation ["
+ corruptedGeneration
+ "] as corrupted but its state concurrently changed to ["
+ repoState
+ "]"
);
}
return ClusterState.builder(currentState)
.putProjectMetadata(
ProjectMetadata.builder(project)
.putCustom(
RepositoriesMetadata.TYPE,
state.withUpdatedGeneration(
metadata.name(),
RepositoryData.CORRUPTED_REPO_GEN,
repoState.pendingGeneration()
)
)
)
.build();
}
@Override
public void onFailure(Exception e) {
listener.onFailure(
new RepositoryException(
metadata.name(),
"Failed marking repository state as corrupted",
ExceptionsHelper.useOrSuppress(e, originalException)
)
logger.warn(() -> "Marking repository " + toStringShort() + " as corrupted", originalException);
submitUnbatchedTask("mark repository corrupted " + toStringShort() + "[" + corruptedGeneration + "]", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
final var project = currentState.metadata().getProject(projectId);
final RepositoriesMetadata state = RepositoriesMetadata.get(project);
final RepositoryMetadata repoState = state.repository(metadata.name());
if (repoState.generation() != corruptedGeneration) {
throw new IllegalStateException(
"Tried to mark repo generation ["
+ corruptedGeneration
+ "] as corrupted but its state concurrently changed to ["
+ repoState
+ "]"
);
}
@Override
public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
listener.onResponse(null);
}
return ClusterState.builder(currentState)
.putProjectMetadata(
ProjectMetadata.builder(project)
.putCustom(
RepositoriesMetadata.TYPE,
state.withUpdatedGeneration(
metadata.name(),
RepositoryData.CORRUPTED_REPO_GEN,
repoState.pendingGeneration()
)
)
)
.build();
}
);
@Override
public void onFailure(Exception e) {
listener.onFailure(
new RepositoryException(
metadata.name(),
"Failed marking repository state as corrupted",
ExceptionsHelper.useOrSuppress(e, originalException)
)
);
}
@Override
public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
listener.onResponse(null);
}
});
}
private RepositoryData getRepositoryData(long indexGen) {
@ -2748,7 +2769,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
@Override
public ClusterState execute(ClusterState currentState) {
final RepositoryMetadata meta = getRepoMetadata(currentState);
final var project = currentState.metadata().getProject(projectId);
final RepositoryMetadata meta = getRepoMetadata(project);
final String repoName = metadata.name();
if (RepositoriesService.isReadOnly(meta.settings())) {
@ -2762,9 +2784,9 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
final boolean uninitializedMeta = meta.generation() == RepositoryData.UNKNOWN_REPO_GEN || bestEffortConsistency;
if (uninitializedMeta == false && meta.pendingGeneration() != genInState) {
logger.info(
"Trying to write new repository data over unfinished write, repo [{}] is at "
"Trying to write new repository data over unfinished write, repo {} is at "
+ "safe generation [{}] and pending generation [{}]",
meta.name(),
toStringShort(),
genInState,
meta.pendingGeneration()
);
@ -2788,7 +2810,6 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
+ "] must be larger than latest known generation ["
+ latestKnownRepoGen.get()
+ "]";
final var project = currentState.metadata().getDefaultProject();
return ClusterState.builder(currentState)
.putProjectMetadata(
ProjectMetadata.builder(project)
@ -2897,9 +2918,9 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
assert newRepositoryData.getUuid().equals(RepositoryData.MISSING_UUID) == false;
logger.info(
Strings.format(
"Generated new repository UUID [%s] for repository [%s] in generation [%d]",
"Generated new repository UUID [%s] for repository %s in generation [%d]",
newRepositoryData.getUuid(),
metadata.name(),
toStringShort(),
newGen
)
);
@ -2914,7 +2935,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
submitUnbatchedTask(setSafeGenerationSource, new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
final RepositoryMetadata meta = getRepoMetadata(currentState);
final var project = currentState.metadata().getProject(projectId);
final RepositoryMetadata meta = getRepoMetadata(project);
if (meta.generation() != expectedGen) {
throw new IllegalStateException(
"Tried to update repo generation to [" + newGen + "] but saw unexpected generation in state [" + meta + "]"
@ -2929,7 +2951,6 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
+ "]"
);
}
final var project = currentState.metadata().getDefaultProject();
final RepositoriesMetadata withGenerations = RepositoriesMetadata.get(project)
.withUpdatedGeneration(metadata.name(), newGen, newGen);
final RepositoriesMetadata withUuid = meta.uuid().equals(newRepositoryData.getUuid())
@ -3089,7 +3110,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
boolean changedSnapshots = false;
final List<SnapshotsInProgress.Entry> snapshotEntries = new ArrayList<>();
final SnapshotsInProgress snapshotsInProgress = SnapshotsInProgress.get(state);
for (SnapshotsInProgress.Entry entry : snapshotsInProgress.forRepo(repoName)) {
for (SnapshotsInProgress.Entry entry : snapshotsInProgress.forRepo(getProjectId(), repoName)) {
if (entry.repositoryStateId() == oldGen) {
snapshotEntries.add(entry.withRepoGen(newGen));
changedSnapshots = true;
@ -3098,13 +3119,13 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
}
}
updatedSnapshotsInProgress = changedSnapshots
? snapshotsInProgress.createCopyWithUpdatedEntriesForRepo(repoName, snapshotEntries)
? snapshotsInProgress.createCopyWithUpdatedEntriesForRepo(getProjectId(), repoName, snapshotEntries)
: null;
final SnapshotDeletionsInProgress updatedDeletionsInProgress;
boolean changedDeletions = false;
final List<SnapshotDeletionsInProgress.Entry> deletionEntries = new ArrayList<>();
for (SnapshotDeletionsInProgress.Entry entry : SnapshotDeletionsInProgress.get(state).getEntries()) {
if (entry.repository().equals(repoName) && entry.repositoryStateId() == oldGen) {
if (entry.projectId().equals(getProjectId()) && entry.repository().equals(repoName) && entry.repositoryStateId() == oldGen) {
deletionEntries.add(entry.withRepoGen(newGen));
changedDeletions = true;
} else {
@ -3115,9 +3136,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
return SnapshotsService.updateWithSnapshots(state, updatedSnapshotsInProgress, updatedDeletionsInProgress);
}
private RepositoryMetadata getRepoMetadata(ClusterState state) {
final RepositoryMetadata repositoryMetadata = RepositoriesMetadata.get(state.getMetadata().getProject(getProjectId()))
.repository(metadata.name());
private RepositoryMetadata getRepoMetadata(ProjectMetadata projectMetadata) {
final RepositoryMetadata repositoryMetadata = RepositoriesMetadata.get(projectMetadata).repository(metadata.name());
assert repositoryMetadata != null || lifecycle.stoppedOrClosed()
: "did not find metadata for repo [" + metadata.name() + "] in state [" + lifecycleState() + "]";
return repositoryMetadata;
@ -3189,7 +3209,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
} catch (NumberFormatException nfe) {
// the index- blob wasn't of the format index-N where N is a number,
// no idea what this blob is but it doesn't belong in the repository!
logger.warn("[{}] Unknown blob in the repository: {}", metadata.name(), blobName);
logger.warn("[{}] Unknown blob in the repository: {}", toStringShort(), blobName);
}
}
return latest;
@ -3868,7 +3888,12 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
@Override
public String toString() {
return "BlobStoreRepository[" + "[" + metadata.name() + "], [" + blobStore.get() + ']' + ']';
return "BlobStoreRepository[" + toStringShort() + ", [" + blobStore.get() + ']' + ']';
}
// Package private for testing
String toStringShort() {
return projectRepoString(projectId, metadata.name());
}
/**
@ -3898,7 +3923,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
*/
public BlobStoreIndexShardSnapshot loadShardSnapshot(BlobContainer shardContainer, SnapshotId snapshotId) {
try {
return INDEX_SHARD_SNAPSHOT_FORMAT.read(metadata.name(), shardContainer, snapshotId.getUUID(), namedXContentRegistry);
return INDEX_SHARD_SNAPSHOT_FORMAT.read(getProjectRepo(), shardContainer, snapshotId.getUUID(), namedXContentRegistry);
} catch (NoSuchFileException ex) {
throw new SnapshotMissingException(metadata.name(), snapshotId, ex);
} catch (IOException ex) {
@ -3953,7 +3978,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
try {
return new Tuple<>(
INDEX_SHARD_SNAPSHOTS_FORMAT.read(
metadata.name(),
getProjectRepo(),
shardContainer,
generation.getGenerationUUID(),
namedXContentRegistry
@ -3989,10 +4014,10 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
// keeping hold of its data blobs.
try {
final var message = Strings.format(
"index %s shard generation [%s] in [%s][%s] not found - falling back to reading all shard snapshots",
"index %s shard generation [%s] in %s[%s] not found - falling back to reading all shard snapshots",
indexId,
generation,
metadata.name(),
toStringShort(),
shardContainer.path()
);
logger.error(message, noSuchFileException);
@ -4009,7 +4034,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
&& shardSnapshotBlobName.endsWith(METADATA_BLOB_NAME_SUFFIX)
&& shardSnapshotBlobName.length() == shardSnapshotBlobNameLength) {
final var shardSnapshot = INDEX_SHARD_SNAPSHOT_FORMAT.read(
metadata.name(),
getProjectRepo(),
shardContainer,
shardSnapshotBlobName.substring(SNAPSHOT_PREFIX.length(), shardSnapshotBlobNameLengthBeforeExt),
namedXContentRegistry
@ -4033,17 +4058,17 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
}
}
logger.error(
"read shard snapshots [{}] due to missing shard generation [{}] for index {} in [{}][{}]",
"read shard snapshots [{}] due to missing shard generation [{}] for index {} in {}[{}]",
messageBuilder,
generation,
indexId,
metadata.name(),
toStringShort(),
shardContainer.path()
);
return new Tuple<>(blobStoreIndexShardSnapshots, generation);
} catch (Exception fallbackException) {
logger.error(
Strings.format("failed while reading all shard snapshots from [%s][%s]", metadata.name(), shardContainer.path()),
Strings.format("failed while reading all shard snapshots from %s[%s]", toStringShort(), shardContainer.path()),
fallbackException
);
noSuchFileException.addSuppressed(fallbackException);
@ -4067,7 +4092,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
long latest = latestGeneration(blobs);
if (latest >= 0) {
final BlobStoreIndexShardSnapshots shardSnapshots = INDEX_SHARD_SNAPSHOTS_FORMAT.read(
metadata.name(),
getProjectRepo(),
shardContainer,
Long.toString(latest),
namedXContentRegistry

View File

@ -30,6 +30,7 @@ import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParserUtils;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.gateway.CorruptStateException;
import org.elasticsearch.repositories.ProjectRepo;
import org.elasticsearch.xcontent.NamedXContentRegistry;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.XContentBuilder;
@ -69,9 +70,9 @@ public final class ChecksumBlobStoreFormat<T> {
private final String blobNameFormat;
private final CheckedBiFunction<String, XContentParser, T, IOException> reader;
private final CheckedBiFunction<ProjectRepo, XContentParser, T, IOException> reader;
private final CheckedBiFunction<String, XContentParser, T, IOException> fallbackReader;
private final CheckedBiFunction<ProjectRepo, XContentParser, T, IOException> fallbackReader;
private final Function<T, ? extends ToXContent> writer;
@ -85,8 +86,8 @@ public final class ChecksumBlobStoreFormat<T> {
public ChecksumBlobStoreFormat(
String codec,
String blobNameFormat,
CheckedBiFunction<String, XContentParser, T, IOException> reader,
@Nullable CheckedBiFunction<String, XContentParser, T, IOException> fallbackReader,
CheckedBiFunction<ProjectRepo, XContentParser, T, IOException> reader,
@Nullable CheckedBiFunction<ProjectRepo, XContentParser, T, IOException> fallbackReader,
Function<T, ? extends ToXContent> writer
) {
this.reader = reader;
@ -105,7 +106,7 @@ public final class ChecksumBlobStoreFormat<T> {
public ChecksumBlobStoreFormat(
String codec,
String blobNameFormat,
CheckedBiFunction<String, XContentParser, T, IOException> reader,
CheckedBiFunction<ProjectRepo, XContentParser, T, IOException> reader,
Function<T, ? extends ToXContent> writer
) {
this(codec, blobNameFormat, reader, null, writer);
@ -118,11 +119,11 @@ public final class ChecksumBlobStoreFormat<T> {
* @param name name to be translated into
* @return parsed blob object
*/
public T read(String repoName, BlobContainer blobContainer, String name, NamedXContentRegistry namedXContentRegistry)
public T read(ProjectRepo projectRepo, BlobContainer blobContainer, String name, NamedXContentRegistry namedXContentRegistry)
throws IOException {
String blobName = blobName(name);
try (InputStream in = blobContainer.readBlob(OperationPurpose.SNAPSHOT_METADATA, blobName)) {
return deserialize(repoName, namedXContentRegistry, in);
return deserialize(projectRepo, namedXContentRegistry, in);
}
}
@ -130,7 +131,7 @@ public final class ChecksumBlobStoreFormat<T> {
return String.format(Locale.ROOT, blobNameFormat, name);
}
public T deserialize(String repoName, NamedXContentRegistry namedXContentRegistry, InputStream input) throws IOException {
public T deserialize(ProjectRepo projectRepo, NamedXContentRegistry namedXContentRegistry, InputStream input) throws IOException {
final DeserializeMetaBlobInputStream deserializeMetaBlobInputStream = new DeserializeMetaBlobInputStream(input);
try {
CodecUtil.checkHeader(new InputStreamDataInput(deserializeMetaBlobInputStream), codec, VERSION, VERSION);
@ -154,7 +155,7 @@ public final class ChecksumBlobStoreFormat<T> {
XContentType.SMILE
)
) {
result = reader.apply(repoName, parser);
result = reader.apply(projectRepo, parser);
XContentParserUtils.ensureExpectedToken(null, parser.nextToken(), parser);
} catch (Exception e) {
try (
@ -165,7 +166,7 @@ public final class ChecksumBlobStoreFormat<T> {
XContentType.SMILE
)
) {
result = fallbackReader.apply(repoName, parser);
result = fallbackReader.apply(projectRepo, parser);
XContentParserUtils.ensureExpectedToken(null, parser.nextToken(), parser);
}
}
@ -174,7 +175,7 @@ public final class ChecksumBlobStoreFormat<T> {
XContentParser parser = XContentType.SMILE.xContent()
.createParser(namedXContentRegistry, LoggingDeprecationHandler.INSTANCE, wrappedStream)
) {
result = reader.apply(repoName, parser);
result = reader.apply(projectRepo, parser);
XContentParserUtils.ensureExpectedToken(null, parser.nextToken(), parser);
}
deserializeMetaBlobInputStream.verifyFooter();

View File

@ -12,7 +12,7 @@ package org.elasticsearch.snapshots;
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.RepositoryOperation.ProjectRepo;
import org.elasticsearch.repositories.ProjectRepo;
import org.elasticsearch.repositories.RepositoryShardId;
import org.elasticsearch.repositories.ShardGeneration;
import org.elasticsearch.repositories.ShardGenerations;

View File

@ -359,7 +359,7 @@ public final class RestoreService implements ClusterStateApplier {
Metadata globalMetadata = null;
final Metadata.Builder metadataBuilder;
if (request.includeGlobalState()) {
globalMetadata = repository.getSnapshotGlobalMetadata(snapshotId);
globalMetadata = repository.getSnapshotGlobalMetadata(snapshotId, false);
metadataBuilder = Metadata.builder(globalMetadata);
} else {
metadataBuilder = Metadata.builder();
@ -652,7 +652,7 @@ public final class RestoreService implements ClusterStateApplier {
dataStreamAliases = Map.of();
} else {
if (globalMetadata == null) {
globalMetadata = repository.getSnapshotGlobalMetadata(snapshotId);
globalMetadata = repository.getSnapshotGlobalMetadata(snapshotId, false);
}
final Map<String, DataStream> dataStreamsInSnapshot = globalMetadata.getProject().dataStreams();
allDataStreams = Maps.newMapWithExpectedSize(requestedDataStreams.size());

View File

@ -12,6 +12,7 @@ import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsRequest;
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
@ -21,6 +22,7 @@ import org.elasticsearch.common.xcontent.XContentParserUtils;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.repositories.ProjectRepo;
import org.elasticsearch.repositories.RepositoryShardId;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.xcontent.ConstructingObjectParser;
@ -355,6 +357,10 @@ public final class SnapshotInfo implements Comparable<SnapshotInfo>, ToXContentF
return snapshot.getSnapshotId();
}
public ProjectId projectId() {
return snapshot.getProjectId();
}
public String repository() {
return snapshot.getRepository();
}
@ -699,7 +705,7 @@ public final class SnapshotInfo implements Comparable<SnapshotInfo>, ToXContentF
* handle x-content written with the external version as external x-content
* is only for display purposes and does not need to be parsed.
*/
public static SnapshotInfo fromXContentInternal(final String repoName, final XContentParser parser) throws IOException {
public static SnapshotInfo fromXContentInternal(final ProjectRepo projectRepo, final XContentParser parser) throws IOException {
String name = null;
String uuid = null;
IndexVersion version = IndexVersion.current();
@ -794,7 +800,7 @@ public final class SnapshotInfo implements Comparable<SnapshotInfo>, ToXContentF
uuid = name;
}
return new SnapshotInfo(
new Snapshot(repoName, new SnapshotId(name, uuid)),
new Snapshot(projectRepo.projectId(), projectRepo.name(), new SnapshotId(name, uuid)),
indices,
dataStreams,
featureStates,

View File

@ -82,7 +82,8 @@ public class TransportSnapshotsStatusActionTests extends ESTestCase {
threadPool,
repositoriesService,
nodeClient,
new ActionFilters(Set.of())
new ActionFilters(Set.of()),
TestProjectResolvers.DEFAULT_PROJECT_ONLY
);
}
@ -198,6 +199,7 @@ public class TransportSnapshotsStatusActionTests extends ESTestCase {
action.buildResponse(
SnapshotsInProgress.EMPTY,
ProjectId.DEFAULT,
new SnapshotsStatusRequest(TEST_REQUEST_TIMEOUT),
currentSnapshotEntries,
nodeSnapshotStatuses,
@ -357,6 +359,7 @@ public class TransportSnapshotsStatusActionTests extends ESTestCase {
action.buildResponse(
SnapshotsInProgress.EMPTY,
ProjectId.DEFAULT,
new SnapshotsStatusRequest(TEST_REQUEST_TIMEOUT),
currentSnapshotEntries,
nodeSnapshotStatuses,

View File

@ -559,7 +559,8 @@ public class MetadataDataStreamsServiceTests extends MapperServiceTestCase {
public void testDeleteSnapshotting() {
String dataStreamName = randomAlphaOfLength(5);
Snapshot snapshot = new Snapshot("doesn't matter", new SnapshotId("snapshot name", "snapshot uuid"));
var projectId = randomProjectIdOrDefault();
Snapshot snapshot = new Snapshot(projectId, "doesn't matter", new SnapshotId("snapshot name", "snapshot uuid"));
SnapshotsInProgress snaps = SnapshotsInProgress.EMPTY.withAddedEntry(
SnapshotsInProgress.Entry.snapshot(
snapshot,
@ -578,7 +579,6 @@ public class MetadataDataStreamsServiceTests extends MapperServiceTestCase {
)
);
final DataStream dataStream = DataStreamTestHelper.randomInstance(dataStreamName);
var projectId = randomProjectIdOrDefault();
ProjectState state = ClusterState.builder(ClusterName.DEFAULT)
.putCustom(SnapshotsInProgress.TYPE, snaps)
.putProjectMetadata(ProjectMetadata.builder(projectId).put(dataStream))

View File

@ -88,7 +88,8 @@ public class MetadataDeleteIndexServiceTests extends ESTestCase {
public void testDeleteSnapshotting() {
String indexName = randomAlphaOfLength(5);
Snapshot snapshot = new Snapshot("doesn't matter", new SnapshotId("snapshot name", "snapshot uuid"));
final ProjectId projectId = randomProjectIdOrDefault();
Snapshot snapshot = new Snapshot(projectId, "doesn't matter", new SnapshotId("snapshot name", "snapshot uuid"));
SnapshotsInProgress snaps = SnapshotsInProgress.EMPTY.withAddedEntry(
SnapshotsInProgress.Entry.snapshot(
snapshot,
@ -107,7 +108,7 @@ public class MetadataDeleteIndexServiceTests extends ESTestCase {
)
);
final Index index = new Index(indexName, randomUUID());
ClusterState state = ClusterState.builder(clusterState(index)).putCustom(SnapshotsInProgress.TYPE, snaps).build();
ClusterState state = ClusterState.builder(clusterState(projectId, index)).putCustom(SnapshotsInProgress.TYPE, snaps).build();
Exception e = expectThrows(
SnapshotInProgressException.class,
() -> MetadataDeleteIndexService.deleteIndices(state, Set.of(index), Settings.EMPTY)
@ -125,9 +126,8 @@ public class MetadataDeleteIndexServiceTests extends ESTestCase {
// Create an unassigned index
String indexName = randomAlphaOfLength(5);
Index index = new Index(indexName, randomUUID());
ClusterState before = clusterState(index);
final var projectId = before.metadata().projectFor(index).id();
final ProjectId projectId = randomProjectIdOrDefault();
ClusterState before = clusterState(projectId, index);
// Mock the built reroute
when(allocationService.reroute(any(ClusterState.class), anyString(), any())).then(i -> i.getArguments()[0]);
@ -433,11 +433,10 @@ public class MetadataDeleteIndexServiceTests extends ESTestCase {
assertThat(after.metadata().projects(), aMapWithSize(numProjects));
}
private ClusterState clusterState(Index index) {
private ClusterState clusterState(ProjectId projectId, Index index) {
final IndexMetadata indexMetadata = IndexMetadata.builder(index.getName())
.settings(indexSettings(IndexVersionUtils.randomVersion(), index.getUUID(), 1, 1))
.build();
final ProjectId projectId = randomProjectIdOrDefault();
final Metadata.Builder metadataBuilder = Metadata.builder().put(ProjectMetadata.builder(projectId).put(indexMetadata, false));
if (randomBoolean()) {
@ -454,7 +453,7 @@ public class MetadataDeleteIndexServiceTests extends ESTestCase {
return ClusterState.builder(ClusterName.DEFAULT)
.metadata(metadata)
.routingTable(GlobalRoutingTableTestHelper.buildRoutingTable(metadata, RoutingTable.Builder::addAsNew))
.blocks(ClusterBlocks.builder().addBlocks(indexMetadata))
.blocks(ClusterBlocks.builder().addBlocks(projectId, indexMetadata))
.build();
}
}

View File

@ -452,7 +452,11 @@ public class MetadataIndexStateServiceTests extends ESTestCase {
);
}
final Snapshot snapshot = new Snapshot(randomAlphaOfLength(10), new SnapshotId(randomAlphaOfLength(5), randomAlphaOfLength(5)));
final Snapshot snapshot = new Snapshot(
projectId,
randomAlphaOfLength(10),
new SnapshotId(randomAlphaOfLength(5), randomAlphaOfLength(5))
);
final SnapshotsInProgress.Entry entry = SnapshotsInProgress.Entry.snapshot(
snapshot,
randomBoolean(),

View File

@ -32,7 +32,7 @@ public class InvalidRepositoryTests extends ESTestCase {
assertThat(repository.getProjectId(), equalTo(projectId));
final var expectedException = expectThrows(
RepositoryException.class,
() -> repository.getSnapshotGlobalMetadata(new SnapshotId("name", "uuid"))
() -> repository.getSnapshotGlobalMetadata(new SnapshotId("name", "uuid"), false)
);
assertThat(expectedException.getMessage(), equalTo("[name] repository type [type] failed to create on current node"));
assertThat(expectedException.getCause(), isA(RepositoryException.class));

View File

@ -670,7 +670,7 @@ public class RepositoriesServiceTests extends ESTestCase {
}
@Override
public Metadata getSnapshotGlobalMetadata(SnapshotId snapshotId) {
public Metadata getSnapshotGlobalMetadata(SnapshotId snapshotId, boolean fromProjectMetadata) {
return null;
}

View File

@ -24,7 +24,7 @@ public class UnknownTypeRepositoryTests extends ESTestCase {
public void testShouldThrowWhenGettingMetadata() {
assertThat(repository.getProjectId(), equalTo(projectId));
expectThrows(RepositoryException.class, () -> repository.getSnapshotGlobalMetadata(new SnapshotId("name", "uuid")));
expectThrows(RepositoryException.class, () -> repository.getSnapshotGlobalMetadata(new SnapshotId("name", "uuid"), false));
}
public void testShouldNotThrowWhenApplyingLifecycleChanges() {

View File

@ -180,6 +180,7 @@ public class BlobStoreRepositoryRestoreTests extends IndexShardTestCase {
final RepositoryData ignoredRepositoryData = safeAwait(
listener -> repository.finalizeSnapshot(
new FinalizeSnapshotContext(
false,
snapshotShardGenerations,
RepositoryData.EMPTY_REPO_GEN,
Metadata.builder().put(shard.indexSettings().getIndexMetadata(), false).build(),

View File

@ -700,7 +700,7 @@ public class BlobStoreRepositoryTests extends ESSingleNodeTestCase {
"new repo uuid message",
BlobStoreRepository.class.getCanonicalName(),
Level.INFO,
Strings.format("Generated new repository UUID [*] for repository [%s] in generation [*]", repoName)
Strings.format("Generated new repository UUID [*] for repository %s in generation [*]", repo.toStringShort())
)
);
@ -729,7 +729,7 @@ public class BlobStoreRepositoryTests extends ESSingleNodeTestCase {
"existing repo uuid message",
RepositoriesService.class.getCanonicalName(),
Level.INFO,
Strings.format("Registering repository [%s] with repository UUID *", repoName)
Strings.format("Registering repository %s with repository UUID *", repo.toStringShort())
)
);
@ -785,7 +785,7 @@ public class BlobStoreRepositoryTests extends ESSingleNodeTestCase {
"existing repo uuid message",
RepositoriesService.class.getCanonicalName(),
Level.INFO,
Strings.format("Registering repository [%s] with repository UUID *", repoName)
Strings.format("Registering repository %s with repository UUID *", repo.toStringShort())
)
);
},

View File

@ -11,6 +11,7 @@ package org.elasticsearch.snapshots;
import org.elasticsearch.ElasticsearchCorruptionException;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
@ -19,6 +20,7 @@ import org.elasticsearch.common.blobstore.support.BlobMetadata;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.xcontent.XContentParserUtils;
import org.elasticsearch.core.Streams;
import org.elasticsearch.repositories.ProjectRepo;
import org.elasticsearch.repositories.blobstore.ChecksumBlobStoreFormat;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xcontent.ToXContent;
@ -95,8 +97,9 @@ public class BlobStoreFormatTests extends ESTestCase {
checksumSMILE.write(new BlobObj(compressedText), blobContainer, "check-smile-comp", true);
// Assert that all checksum blobs can be read
assertEquals(normalText, checksumSMILE.read("repo", blobContainer, "check-smile", xContentRegistry()).getText());
assertEquals(compressedText, checksumSMILE.read("repo", blobContainer, "check-smile-comp", xContentRegistry()).getText());
final var projectRepo = new ProjectRepo(ProjectId.DEFAULT, "repo");
assertEquals(normalText, checksumSMILE.read(projectRepo, blobContainer, "check-smile", xContentRegistry()).getText());
assertEquals(compressedText, checksumSMILE.read(projectRepo, blobContainer, "check-smile-comp", xContentRegistry()).getText());
}
public void testCompressionIsApplied() throws IOException {
@ -133,10 +136,11 @@ public class BlobStoreFormatTests extends ESTestCase {
Function.identity()
);
checksumFormat.write(blobObj, blobContainer, "test-path", randomBoolean());
assertEquals(checksumFormat.read("repo", blobContainer, "test-path", xContentRegistry()).getText(), testString);
final var projectRepo = new ProjectRepo(ProjectId.DEFAULT, "repo");
assertEquals(checksumFormat.read(projectRepo, blobContainer, "test-path", xContentRegistry()).getText(), testString);
randomCorruption(blobContainer, "test-path");
try {
checksumFormat.read("repo", blobContainer, "test-path", xContentRegistry());
checksumFormat.read(projectRepo, blobContainer, "test-path", xContentRegistry());
fail("Should have failed due to corruption");
} catch (ElasticsearchCorruptionException | EOFException ex) {
// expected exceptions from random byte corruption

View File

@ -11,6 +11,7 @@ package org.elasticsearch.snapshots;
import org.elasticsearch.TransportVersion;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.repositories.ProjectRepo;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import org.elasticsearch.test.AbstractWireTestCase;
import org.elasticsearch.xcontent.NamedXContentRegistry;
@ -34,7 +35,7 @@ public class SnapshotInfoBlobSerializationTests extends AbstractWireTestCase<Sna
final BytesStreamOutput out = new BytesStreamOutput();
BlobStoreRepository.SNAPSHOT_FORMAT.serialize(instance, "test", randomBoolean(), out);
return BlobStoreRepository.SNAPSHOT_FORMAT.deserialize(
instance.repository(),
new ProjectRepo(instance.projectId(), instance.repository()),
NamedXContentRegistry.EMPTY,
out.bytes().streamInput()
);

View File

@ -2421,7 +2421,8 @@ public class SnapshotResiliencyTests extends ESTestCase {
repositoriesService,
transportService,
actionFilters,
EmptySystemIndices.INSTANCE
EmptySystemIndices.INSTANCE,
false
);
nodeEnv = new NodeEnvironment(settings, environment);
final NamedXContentRegistry namedXContentRegistry = new NamedXContentRegistry(Collections.emptyList());
@ -2781,19 +2782,47 @@ public class SnapshotResiliencyTests extends ESTestCase {
);
actions.put(
TransportCleanupRepositoryAction.TYPE,
new TransportCleanupRepositoryAction(transportService, clusterService, repositoriesService, threadPool, actionFilters)
new TransportCleanupRepositoryAction(
transportService,
clusterService,
repositoriesService,
threadPool,
actionFilters,
TestProjectResolvers.DEFAULT_PROJECT_ONLY
)
);
actions.put(
TransportCreateSnapshotAction.TYPE,
new TransportCreateSnapshotAction(transportService, clusterService, threadPool, snapshotsService, actionFilters)
new TransportCreateSnapshotAction(
transportService,
clusterService,
threadPool,
snapshotsService,
actionFilters,
TestProjectResolvers.DEFAULT_PROJECT_ONLY
)
);
actions.put(
TransportCloneSnapshotAction.TYPE,
new TransportCloneSnapshotAction(transportService, clusterService, threadPool, snapshotsService, actionFilters)
new TransportCloneSnapshotAction(
transportService,
clusterService,
threadPool,
snapshotsService,
actionFilters,
TestProjectResolvers.DEFAULT_PROJECT_ONLY
)
);
actions.put(
TransportGetSnapshotsAction.TYPE,
new TransportGetSnapshotsAction(transportService, clusterService, threadPool, repositoriesService, actionFilters)
new TransportGetSnapshotsAction(
transportService,
clusterService,
threadPool,
repositoriesService,
actionFilters,
TestProjectResolvers.DEFAULT_PROJECT_ONLY
)
);
actions.put(
TransportClusterRerouteAction.TYPE,
@ -2843,7 +2872,14 @@ public class SnapshotResiliencyTests extends ESTestCase {
);
actions.put(
TransportDeleteSnapshotAction.TYPE,
new TransportDeleteSnapshotAction(transportService, clusterService, threadPool, snapshotsService, actionFilters)
new TransportDeleteSnapshotAction(
transportService,
clusterService,
threadPool,
snapshotsService,
actionFilters,
TestProjectResolvers.DEFAULT_PROJECT_ONLY
)
);
client.initialize(
actions,

View File

@ -35,7 +35,7 @@ import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.RepositoryOperation.ProjectRepo;
import org.elasticsearch.repositories.ProjectRepo;
import org.elasticsearch.repositories.ShardGeneration;
import org.elasticsearch.repositories.ShardSnapshotResult;
import org.elasticsearch.test.AbstractChunkedSerializingTestCase;

View File

@ -82,7 +82,7 @@ public abstract class RestoreOnlyRepository extends AbstractLifecycleComponent i
}
@Override
public Metadata getSnapshotGlobalMetadata(SnapshotId snapshotId) {
public Metadata getSnapshotGlobalMetadata(SnapshotId snapshotId, boolean fromProjectMetadata) {
return null;
}

View File

@ -22,6 +22,7 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.SnapshotDeletionsInProgress;
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.RepositoriesMetadata;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
@ -39,6 +40,7 @@ import org.elasticsearch.index.IndexVersions;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.repositories.FinalizeSnapshotContext;
import org.elasticsearch.repositories.FinalizeSnapshotContext.UpdatedShardGenerations;
import org.elasticsearch.repositories.ProjectRepo;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.RepositoryData;
@ -423,7 +425,7 @@ public abstract class AbstractSnapshotIntegTestCase extends ESIntegTestCase {
.replace(IndexVersion.current().toString(), version.toString())
)
) {
downgradedSnapshotInfo = SnapshotInfo.fromXContentInternal(repoName, parser);
downgradedSnapshotInfo = SnapshotInfo.fromXContentInternal(new ProjectRepo(ProjectId.DEFAULT, repoName), parser);
}
final BlobStoreRepository blobStoreRepository = getRepositoryOnMaster(repoName);
BlobStoreRepository.SNAPSHOT_FORMAT.write(
@ -545,6 +547,7 @@ public abstract class AbstractSnapshotIntegTestCase extends ESIntegTestCase {
safeAwait(
(ActionListener<RepositoryData> listener) -> repo.finalizeSnapshot(
new FinalizeSnapshotContext(
false,
UpdatedShardGenerations.EMPTY,
getRepositoryData(repoName).getGenId(),
state.metadata(),

View File

@ -281,7 +281,7 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
}
@Override
public Metadata getSnapshotGlobalMetadata(SnapshotId snapshotId) {
public Metadata getSnapshotGlobalMetadata(SnapshotId snapshotId, boolean fromProjectMetadata) {
assert SNAPSHOT_ID.equals(snapshotId) : "RemoteClusterRepository only supports " + SNAPSHOT_ID + " as the SnapshotId";
var remoteClient = getRemoteClusterClient();
ClusterStateResponse clusterState = executeRecoveryAction(

View File

@ -100,6 +100,7 @@ public final class SourceOnlySnapshotRepository extends FilterRepository {
// required engine, that the index is read-only and the mapping to a default mapping
super.finalizeSnapshot(
new FinalizeSnapshotContext(
finalizeSnapshotContext.serializeProjectMetadata(),
finalizeSnapshotContext.updatedShardGenerations(),
finalizeSnapshotContext.repositoryStateId(),
metadataToSnapshot(

View File

@ -145,7 +145,7 @@ public abstract class AsyncRetryDuringSnapshotActionStep extends AsyncActionStep
// The index has since been deleted, mission accomplished!
return true;
}
for (List<SnapshotsInProgress.Entry> snapshots : SnapshotsInProgress.get(state).entriesByRepo()) {
for (List<SnapshotsInProgress.Entry> snapshots : SnapshotsInProgress.get(state).entriesByRepo(projectId)) {
for (SnapshotsInProgress.Entry snapshot : snapshots) {
if (snapshot.indices().containsKey(indexName)) {
// There is a snapshot running with this index name

View File

@ -375,6 +375,7 @@ public class SourceOnlySnapshotShardTests extends IndexShardTestCase {
.build();
repository.finalizeSnapshot(
new FinalizeSnapshotContext(
false,
new UpdatedShardGenerations(shardGenerations, ShardGenerations.EMPTY),
ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository).getGenId(),
Metadata.builder().put(shard.indexSettings().getIndexMetadata(), false).build(),

View File

@ -251,6 +251,7 @@ public class SLMStatDisruptionIT extends AbstractSnapshotIntegTestCase {
@Override
public void finalizeSnapshot(FinalizeSnapshotContext fsc) {
var newFinalizeContext = new FinalizeSnapshotContext(
false,
fsc.updatedShardGenerations(),
fsc.repositoryStateId(),
fsc.clusterMetadata(),

View File

@ -20,6 +20,7 @@ import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.scheduler.SchedulerEngine;
import org.elasticsearch.core.FixForMultiProject;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.snapshots.RegisteredPolicySnapshots;
@ -218,7 +219,9 @@ public class SnapshotLifecycleTask implements SchedulerEngine.Listener {
static Set<SnapshotId> currentlyRunningSnapshots(ClusterState clusterState) {
final SnapshotsInProgress snapshots = clusterState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY);
final Set<SnapshotId> currentlyRunning = new HashSet<>();
for (final List<SnapshotsInProgress.Entry> entriesForRepo : snapshots.entriesByRepo()) {
@FixForMultiProject(description = "replace with snapshots.entriesByRepo(ProjectId) when SLM is project aware")
final Iterable<List<SnapshotsInProgress.Entry>> entriesByRepo = snapshots.entriesByRepo();
for (final List<SnapshotsInProgress.Entry> entriesForRepo : entriesByRepo) {
for (SnapshotsInProgress.Entry entry : entriesForRepo) {
currentlyRunning.add(entry.snapshot().getSnapshotId());
}

View File

@ -86,7 +86,7 @@ public class TransportGetSnapshotLifecycleAction extends TransportMasterNodeActi
}
} else {
final Map<String, SnapshotLifecyclePolicyItem.SnapshotInProgress> inProgress = new HashMap<>();
for (List<SnapshotsInProgress.Entry> entriesForRepo : SnapshotsInProgress.get(state).entriesByRepo()) {
for (List<SnapshotsInProgress.Entry> entriesForRepo : SnapshotsInProgress.get(state).entriesByRepo(projectMetadata.id())) {
for (SnapshotsInProgress.Entry entry : entriesForRepo) {
Map<String, Object> meta = entry.userMetadata();
if (meta == null

View File

@ -14,6 +14,7 @@ import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
@ -27,6 +28,7 @@ import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshotsI
import org.elasticsearch.index.snapshots.blobstore.SnapshotFiles;
import org.elasticsearch.index.store.StoreFileMetadata;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.repositories.ProjectRepo;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.RepositoryData;
import org.elasticsearch.repositories.ShardGenerations;
@ -468,7 +470,11 @@ public class RepositoryVerifyIntegrityIT extends AbstractSnapshotIntegTestCase {
final SnapshotInfo snapshotInfo;
try (var inputStream = Files.newInputStream(snapshotInfoBlob)) {
snapshotInfo = SNAPSHOT_FORMAT.deserialize(testContext.repositoryName(), xContentRegistry(), inputStream);
snapshotInfo = SNAPSHOT_FORMAT.deserialize(
new ProjectRepo(ProjectId.DEFAULT, testContext.repositoryName()),
xContentRegistry(),
inputStream
);
}
final var newIndices = new ArrayList<>(snapshotInfo.indices());
@ -727,7 +733,7 @@ public class RepositoryVerifyIntegrityIT extends AbstractSnapshotIntegTestCase {
final BlobStoreIndexShardSnapshots blobStoreIndexShardSnapshots;
try (var inputStream = Files.newInputStream(shardGenerationBlob)) {
blobStoreIndexShardSnapshots = INDEX_SHARD_SNAPSHOTS_FORMAT.deserialize(
testContext.repositoryName(),
new ProjectRepo(ProjectId.DEFAULT, testContext.repositoryName()),
xContentRegistry(),
inputStream
);

View File

@ -315,7 +315,7 @@ class RepositoryIntegrityVerifier {
private void verifySnapshotGlobalMetadata(ActionListener<Void> listener) {
metadataTaskRunner.run(ActionRunnable.wrap(listener, l -> {
try {
blobStoreRepository.getSnapshotGlobalMetadata(snapshotId);
blobStoreRepository.getSnapshotGlobalMetadata(snapshotId, false);
// no checks here, loading it is enough
l.onResponse(null);
} catch (Exception e) {

View File

@ -47,8 +47,6 @@ tasks.named("yamlRestTest").configure {
ArrayList<String> blacklist = [
/* These tests don't work on multi-project yet - we need to go through each of them and make them work */
'^cat.recovery/*/*',
'^cat.repositories/*/*',
'^cat.snapshots/*/*',
'^cluster.desired_balance/10_basic/*',
'^cluster.stats/10_basic/snapshot stats reported in get cluster stats',
'^data_stream/40_supported_apis/Verify shard stores api', // uses _shard_stores API
@ -58,14 +56,7 @@ tasks.named("yamlRestTest").configure {
'^indices.resolve_cluster/*/*/*',
'^indices.shard_stores/*/*',
'^migration/*/*',
'^nodes.stats/70_repository_throttling_stats/Repository throttling stats (some repositories exist)',
'^snapshot.clone/*/*',
'^snapshot.create/*/*',
'^snapshot.delete/*/*',
'^snapshot.get/*/*',
'^snapshot.get_repository/*/*',
'^snapshot.restore/*/*',
'^snapshot.status/*/*',
'^synonyms/*/*',
'^tsdb/30_snapshot/*',
'^update_by_query/80_scripting/Update all docs with one deletion and one noop using a stored script', // scripting is not project aware yet