Delete unowned documents during split (#130240)

This commit is contained in:
Oleksandr Kolomiiets 2025-07-02 16:20:10 -07:00 committed by GitHub
parent 9e6464eeae
commit f3c5eb7815
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 25 additions and 3 deletions

View File

@ -207,6 +207,10 @@ public class IndexReshardingMetadata implements ToXContentFragment, Writeable {
return new IndexReshardingMetadata(IndexReshardingState.Split.newSplitByMultiple(shardCount, multiple));
}
public static boolean isSplitSource(ShardId shardId, @Nullable IndexReshardingMetadata reshardingMetadata) {
return reshardingMetadata != null && reshardingMetadata.isSplit() && reshardingMetadata.getSplit().isSourceShard(shardId.id());
}
public static boolean isSplitTarget(ShardId shardId, @Nullable IndexReshardingMetadata reshardingMetadata) {
return reshardingMetadata != null && reshardingMetadata.isSplit() && reshardingMetadata.getSplit().isTargetShard(shardId.id());
}
@ -221,6 +225,16 @@ public class IndexReshardingMetadata implements ToXContentFragment, Writeable {
return new IndexReshardingMetadata(builder.build());
}
public IndexReshardingMetadata transitionSplitSourceToNewState(
ShardId shardId,
IndexReshardingState.Split.SourceShardState newSourceState
) {
assert state instanceof IndexReshardingState.Split;
IndexReshardingState.Split.Builder builder = new IndexReshardingState.Split.Builder((IndexReshardingState.Split) state);
builder.setSourceShardState(shardId.getId(), newSourceState);
return new IndexReshardingMetadata(builder.build());
}
/**
* @return the split state of this metadata block, or throw IllegalArgumentException if this metadata doesn't represent a split
*/

View File

@ -350,8 +350,12 @@ public abstract sealed class IndexReshardingState implements Writeable, ToXConte
return sourceShards[shardNum];
}
public boolean isSourceShard(int shardId) {
return shardId < shardCountBefore();
}
public boolean isTargetShard(int shardId) {
return shardId >= shardCountBefore();
return isSourceShard(shardId) == false;
}
/**
@ -389,6 +393,10 @@ public abstract sealed class IndexReshardingState implements Writeable, ToXConte
return Arrays.stream(targetShards);
}
public Stream<SourceShardState> sourceStates() {
return Arrays.stream(sourceShards);
}
/**
* Check whether all target shards for the given source shard are done.
* @param shardNum a source shard index greater than or equal to 0 and less than the original shard count

View File

@ -1004,7 +1004,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
ShardRouting sourceShardRouting = routingTable.shardRoutingTable(sourceShardId).primaryShard();
if (sourceShardRouting.active() == false) {
assert false : sourceShardRouting;
assert false : sourceShardRouting.shortSummary();
logger.trace("can't find reshard split source node because source shard {} is not active.", sourceShardRouting);
return null;
}
@ -1014,7 +1014,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
assert false : "Source node for reshard does not exist: " + sourceShardRouting.currentNodeId();
logger.trace(
"can't find reshard split source node because source shard {} is assigned to an unknown node.",
sourceShardRouting
sourceShardRouting.shortSummary()
);
return null;
}