Reduce data amplification in IVFVectorsWriter (#129698)
With this change we first create the tmp file and the posting lists, and once the tmp file is deleted we merge the vectors into the .vec file. Therefore we only have two copies of the vectors on disk at the same time.
parent 3c1137688e
commit 5bec44ad58
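
For illustration, a minimal sketch of the reordering this commit makes. The helper names below are simplified stand-ins for the writer internals, not the actual methods; source segments hold copy #1 of the vectors until the merge commits, and the temp file and the merged .vec file are the extra copies.

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;

    class MergeOrderSketch {

        // Old order: raw vectors are merged into the new .vec file first, so while
        // the posting lists are built the temp file coexists with it: 3 copies at peak.
        void mergeOneFieldOld() throws IOException {
            mergeRawVectorsIntoVecFile();           // copy #2 (.vec)
            Path tmp = dumpVectorsToTempFile();     // copy #3 (peak)
            buildCentroidsAndPostingLists(tmp);
            Files.deleteIfExists(tmp);
        }

        // New order (this commit): temp file and posting lists first, delete the
        // temp file, then merge the raw vectors: never more than 2 copies.
        void mergeOneFieldNew() throws IOException {
            Path tmp = dumpVectorsToTempFile();     // copy #2 (tmp)
            buildCentroidsAndPostingLists(tmp);
            Files.deleteIfExists(tmp);              // back to 1 copy
            mergeRawVectorsIntoVecFile();           // copy #2 (.vec), never a #3
        }

        // No-op stubs so the sketch compiles; the real work lives in IVFVectorsWriter.
        private void mergeRawVectorsIntoVecFile() {}

        private Path dumpVectorsToTempFile() throws IOException {
            return Files.createTempFile("ivf_", ".tmp");
        }

        private void buildCentroidsAndPostingLists(Path tmp) {}
    }

The peak is what matters here: in the new order the temp file and the new .vec copy never exist at the same time, so a merge needs head-room for two copies of the vectors instead of three.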
@@ -225,103 +225,108 @@ public abstract class IVFVectorsWriter extends KnnVectorsWriter {
     }
 
     @Override
-    @SuppressForbidden(reason = "require usage of Lucene's IOUtils#deleteFilesIgnoringExceptions(...)")
     public final void mergeOneField(FieldInfo fieldInfo, MergeState mergeState) throws IOException {
-        rawVectorDelegate.mergeOneField(fieldInfo, mergeState);
         if (fieldInfo.getVectorEncoding().equals(VectorEncoding.FLOAT32)) {
-            final int numVectors;
-            String tempRawVectorsFileName = null;
-            boolean success = false;
-            // build a float vector values with random access. In order to do that we dump the vectors to
-            // a temporary file and write the docID followed by the vector
-            try (IndexOutput out = mergeState.segmentInfo.dir.createTempOutput(mergeState.segmentInfo.name, "ivf_", IOContext.DEFAULT)) {
-                tempRawVectorsFileName = out.getName();
-                // TODO do this better, we shouldn't have to write to a temp file, we should be able to
-                // work just from the merged vector values; the tricky part is the random access.
-                numVectors = writeFloatVectorValues(fieldInfo, out, MergedVectorValues.mergeFloatVectorValues(fieldInfo, mergeState));
-                CodecUtil.writeFooter(out);
-                success = true;
-            } finally {
-                if (success == false && tempRawVectorsFileName != null) {
-                    org.apache.lucene.util.IOUtils.deleteFilesIgnoringExceptions(mergeState.segmentInfo.dir, tempRawVectorsFileName);
-                }
-            }
-            try (IndexInput in = mergeState.segmentInfo.dir.openInput(tempRawVectorsFileName, IOContext.DEFAULT)) {
-                float[] calculatedGlobalCentroid = new float[fieldInfo.getVectorDimension()];
-                final FloatVectorValues floatVectorValues = getFloatVectorValues(fieldInfo, in, numVectors);
-                success = false;
-                long centroidOffset;
-                long centroidLength;
-                String centroidTempName = null;
-                int numCentroids;
-                IndexOutput centroidTemp = null;
-                CentroidAssignments centroidAssignments;
-                try {
-                    centroidTemp = mergeState.segmentInfo.dir.createTempOutput(mergeState.segmentInfo.name, "civf_", IOContext.DEFAULT);
-                    centroidTempName = centroidTemp.getName();
-
-                    centroidAssignments = calculateAndWriteCentroids(
-                        fieldInfo,
-                        floatVectorValues,
-                        centroidTemp,
-                        mergeState,
-                        calculatedGlobalCentroid
-                    );
-                    numCentroids = centroidAssignments.numCentroids();
-
-                    success = true;
-                } finally {
-                    if (success == false && centroidTempName != null) {
-                        IOUtils.closeWhileHandlingException(centroidTemp);
-                        org.apache.lucene.util.IOUtils.deleteFilesIgnoringExceptions(mergeState.segmentInfo.dir, centroidTempName);
-                    }
-                }
-                try {
-                    if (numCentroids == 0) {
-                        centroidOffset = ivfCentroids.getFilePointer();
-                        writeMeta(fieldInfo, centroidOffset, 0, new long[0], null);
-                        CodecUtil.writeFooter(centroidTemp);
-                        IOUtils.close(centroidTemp);
-                        return;
-                    }
-                    CodecUtil.writeFooter(centroidTemp);
-                    IOUtils.close(centroidTemp);
-                    centroidOffset = ivfCentroids.alignFilePointer(Float.BYTES);
-                    try (IndexInput centroidsInput = mergeState.segmentInfo.dir.openInput(centroidTempName, IOContext.DEFAULT)) {
-                        ivfCentroids.copyBytes(centroidsInput, centroidsInput.length() - CodecUtil.footerLength());
-                        centroidLength = ivfCentroids.getFilePointer() - centroidOffset;
-
-                        CentroidSupplier centroidSupplier = createCentroidSupplier(
-                            centroidsInput,
-                            numCentroids,
-                            fieldInfo,
-                            calculatedGlobalCentroid
-                        );
-
-                        // build and write the posting lists for the centroids
-                        final long[] offsets = buildAndWritePostingsLists(
-                            fieldInfo,
-                            centroidSupplier,
-                            floatVectorValues,
-                            ivfClusters,
-                            centroidAssignments.assignmentsByCluster()
-                        );
-                        assert offsets.length == centroidSupplier.size();
-                        writeMeta(fieldInfo, centroidOffset, centroidLength, offsets, calculatedGlobalCentroid);
-                    }
-                } finally {
-                    org.apache.lucene.util.IOUtils.deleteFilesIgnoringExceptions(
-                        mergeState.segmentInfo.dir,
-                        tempRawVectorsFileName,
-                        centroidTempName
-                    );
-                }
-            } finally {
-                org.apache.lucene.util.IOUtils.deleteFilesIgnoringExceptions(mergeState.segmentInfo.dir, tempRawVectorsFileName);
-            }
+            mergeOneFieldIVF(fieldInfo, mergeState);
         }
+        // we merge the vectors at the end so we only have two copies of the vectors on disk at the same time.
+        rawVectorDelegate.mergeOneField(fieldInfo, mergeState);
     }
 
+    @SuppressForbidden(reason = "require usage of Lucene's IOUtils#deleteFilesIgnoringExceptions(...)")
+    private void mergeOneFieldIVF(FieldInfo fieldInfo, MergeState mergeState) throws IOException {
+        final int numVectors;
+        String tempRawVectorsFileName = null;
+        boolean success = false;
+        // build a float vector values with random access. In order to do that we dump the vectors to
+        // a temporary file and write the docID followed by the vector
+        try (IndexOutput out = mergeState.segmentInfo.dir.createTempOutput(mergeState.segmentInfo.name, "ivf_", IOContext.DEFAULT)) {
+            tempRawVectorsFileName = out.getName();
+            // TODO do this better, we shouldn't have to write to a temp file, we should be able to
+            // work just from the merged vector values; the tricky part is the random access.
+            numVectors = writeFloatVectorValues(fieldInfo, out, MergedVectorValues.mergeFloatVectorValues(fieldInfo, mergeState));
+            CodecUtil.writeFooter(out);
+            success = true;
+        } finally {
+            if (success == false && tempRawVectorsFileName != null) {
+                org.apache.lucene.util.IOUtils.deleteFilesIgnoringExceptions(mergeState.segmentInfo.dir, tempRawVectorsFileName);
+            }
+        }
+        try (IndexInput in = mergeState.segmentInfo.dir.openInput(tempRawVectorsFileName, IOContext.DEFAULT)) {
+            float[] calculatedGlobalCentroid = new float[fieldInfo.getVectorDimension()];
+            final FloatVectorValues floatVectorValues = getFloatVectorValues(fieldInfo, in, numVectors);
+            success = false;
+            long centroidOffset;
+            long centroidLength;
+            String centroidTempName = null;
+            int numCentroids;
+            IndexOutput centroidTemp = null;
+            CentroidAssignments centroidAssignments;
+            try {
+                centroidTemp = mergeState.segmentInfo.dir.createTempOutput(mergeState.segmentInfo.name, "civf_", IOContext.DEFAULT);
+                centroidTempName = centroidTemp.getName();
+
+                centroidAssignments = calculateAndWriteCentroids(
+                    fieldInfo,
+                    floatVectorValues,
+                    centroidTemp,
+                    mergeState,
+                    calculatedGlobalCentroid
+                );
+                numCentroids = centroidAssignments.numCentroids();
+
+                success = true;
+            } finally {
+                if (success == false && centroidTempName != null) {
+                    IOUtils.closeWhileHandlingException(centroidTemp);
+                    org.apache.lucene.util.IOUtils.deleteFilesIgnoringExceptions(mergeState.segmentInfo.dir, centroidTempName);
+                }
+            }
+            try {
+                if (numCentroids == 0) {
+                    centroidOffset = ivfCentroids.getFilePointer();
+                    writeMeta(fieldInfo, centroidOffset, 0, new long[0], null);
+                    CodecUtil.writeFooter(centroidTemp);
+                    IOUtils.close(centroidTemp);
+                    return;
+                }
+                CodecUtil.writeFooter(centroidTemp);
+                IOUtils.close(centroidTemp);
+                centroidOffset = ivfCentroids.alignFilePointer(Float.BYTES);
+                try (IndexInput centroidsInput = mergeState.segmentInfo.dir.openInput(centroidTempName, IOContext.DEFAULT)) {
+                    ivfCentroids.copyBytes(centroidsInput, centroidsInput.length() - CodecUtil.footerLength());
+                    centroidLength = ivfCentroids.getFilePointer() - centroidOffset;
+
+                    CentroidSupplier centroidSupplier = createCentroidSupplier(
+                        centroidsInput,
+                        numCentroids,
+                        fieldInfo,
+                        calculatedGlobalCentroid
+                    );
+
+                    // build and write the posting lists for the centroids
+                    final long[] offsets = buildAndWritePostingsLists(
+                        fieldInfo,
+                        centroidSupplier,
+                        floatVectorValues,
+                        ivfClusters,
+                        centroidAssignments.assignmentsByCluster()
+                    );
+                    assert offsets.length == centroidSupplier.size();
+                    writeMeta(fieldInfo, centroidOffset, centroidLength, offsets, calculatedGlobalCentroid);
+                }
+            } finally {
+                org.apache.lucene.util.IOUtils.deleteFilesIgnoringExceptions(
+                    mergeState.segmentInfo.dir,
+                    tempRawVectorsFileName,
+                    centroidTempName
+                );
+            }
+        } finally {
+            org.apache.lucene.util.IOUtils.deleteFilesIgnoringExceptions(mergeState.segmentInfo.dir, tempRawVectorsFileName);
+        }
+    }
 
     private static FloatVectorValues getFloatVectorValues(FieldInfo fieldInfo, IndexInput randomAccessInput, int numVectors) {
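
Per the comments in the diff, the temp file stores the docID followed by the vector for each record, which is what lets getFloatVectorValues hand back random access: fixed-width records turn an ordinal into a file offset with one multiplication. Below is a minimal stand-alone sketch of that idea; the record layout, the little-endian byte order, and the single-read I/O are assumptions for illustration, not the actual Lucene encoding used by writeFloatVectorValues.

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.ByteOrder;
    import java.nio.channels.FileChannel;
    import java.nio.file.Path;
    import java.nio.file.StandardOpenOption;

    // Random-access reader over a flat file of fixed-width records:
    // [int32 docID][dimension * float32 vector] per vector.
    class TempRawVectorsReader implements AutoCloseable {
        private final FileChannel channel;
        private final int dimension;
        private final int recordBytes;

        TempRawVectorsReader(Path file, int dimension) throws IOException {
            this.channel = FileChannel.open(file, StandardOpenOption.READ);
            this.dimension = dimension;
            this.recordBytes = Integer.BYTES + dimension * Float.BYTES;
        }

        // Ordinal -> offset is a constant-time multiplication; one positioned
        // read per record (partial reads ignored for brevity).
        float[] vectorValue(int ord) throws IOException {
            ByteBuffer buf = ByteBuffer.allocate(recordBytes).order(ByteOrder.LITTLE_ENDIAN);
            channel.read(buf, (long) ord * recordBytes);
            buf.flip();
            buf.getInt(); // skip the docID stored ahead of the vector
            float[] vector = new float[dimension];
            buf.asFloatBuffer().get(vector);
            return vector;
        }

        int docID(int ord) throws IOException {
            ByteBuffer buf = ByteBuffer.allocate(Integer.BYTES).order(ByteOrder.LITTLE_ENDIAN);
            channel.read(buf, (long) ord * recordBytes);
            buf.flip();
            return buf.getInt();
        }

        @Override
        public void close() throws IOException {
            channel.close();
        }
    }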
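Both temp files in the diff follow the same crash-safety pattern: flip a success flag only after the final write, delete in a finally block whenever the flag never flipped, and delete unconditionally once the data has been consumed. Distilled below with hypothetical names; the real code uses Lucene's IOUtils#deleteFilesIgnoringExceptions against the segment Directory rather than Files#deleteIfExists.

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;

    class TempFileCleanupSketch {
        void writeTempThenConsume(Path dir) throws IOException {
            Path tmp = null;
            boolean success = false;
            try {
                tmp = Files.createTempFile(dir, "ivf_", ".tmp");
                Files.write(tmp, new byte[] { 1, 2, 3 }); // stand-in for vector dump + footer
                success = true;
            } finally {
                if (success == false && tmp != null) {
                    Files.deleteIfExists(tmp); // failed mid-write: don't leak a copy
                }
            }
            try {
                // ... consume tmp to build centroids and posting lists ...
            } finally {
                Files.deleteIfExists(tmp); // always gone before the raw-vector merge runs
            }
        }
    }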