Merge 3ace6a407e
into 7e6874ad80
This commit is contained in:
commit
477bbb2f8c
|
@ -60,6 +60,7 @@ import org.springframework.util.CollectionUtils;
|
||||||
*
|
*
|
||||||
* @author Arjen Poutsma
|
* @author Arjen Poutsma
|
||||||
* @author Brian Clozel
|
* @author Brian Clozel
|
||||||
|
* @author Nabil Fawwaz Elqayyim
|
||||||
* @since 5.0
|
* @since 5.0
|
||||||
*/
|
*/
|
||||||
public abstract class DataBufferUtils {
|
public abstract class DataBufferUtils {
|
||||||
|
@ -858,7 +859,20 @@ public abstract class DataBufferUtils {
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Base class for a {@link NestedMatcher}.
|
* Base {@link NestedMatcher} implementation that scans a {@link DataBuffer}
|
||||||
|
* for a specific delimiter.
|
||||||
|
*
|
||||||
|
* <p>Relies on a per-instance reusable buffer to scan data in chunks,
|
||||||
|
* minimizing allocations and improving performance for large or streaming data.</p>
|
||||||
|
*
|
||||||
|
* <p>Each matcher maintains its own state and buffer, ensuring safe use
|
||||||
|
* in reactive pipelines where execution may shift across threads.</p>
|
||||||
|
*
|
||||||
|
* <p>Subclasses may extend this class to customize matching strategies
|
||||||
|
* while reusing the built-in delimiter tracking and scanning logic.</p>
|
||||||
|
*
|
||||||
|
* @see NestedMatcher
|
||||||
|
* @see DataBuffer
|
||||||
*/
|
*/
|
||||||
private abstract static class AbstractNestedMatcher implements NestedMatcher {
|
private abstract static class AbstractNestedMatcher implements NestedMatcher {
|
||||||
|
|
||||||
|
@ -866,6 +880,7 @@ public abstract class DataBufferUtils {
|
||||||
|
|
||||||
private int matches = 0;
|
private int matches = 0;
|
||||||
|
|
||||||
|
private final byte[] localBuffer = new byte[8 * 1024]; // Reusable buffer per matcher instance
|
||||||
|
|
||||||
protected AbstractNestedMatcher(byte[] delimiter) {
|
protected AbstractNestedMatcher(byte[] delimiter) {
|
||||||
this.delimiter = delimiter;
|
this.delimiter = delimiter;
|
||||||
|
@ -881,14 +896,79 @@ public abstract class DataBufferUtils {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int match(DataBuffer dataBuffer) {
|
public int match(DataBuffer dataBuffer) {
|
||||||
for (int pos = dataBuffer.readPosition(); pos < dataBuffer.writePosition(); pos++) {
|
final int readPos = dataBuffer.readPosition();
|
||||||
byte b = dataBuffer.getByte(pos);
|
final int writePos = dataBuffer.writePosition();
|
||||||
if (match(b)) {
|
final int length = writePos - readPos;
|
||||||
|
|
||||||
|
final byte[] delimiterBytes = this.delimiter;
|
||||||
|
final int delimiterLength = delimiterBytes.length;
|
||||||
|
final byte delimiterFirstByte = delimiterBytes[0];
|
||||||
|
|
||||||
|
final byte[] chunk = localBuffer;
|
||||||
|
final int chunkSize = Math.min(chunk.length, length);
|
||||||
|
|
||||||
|
int matchIndex = this.matches;
|
||||||
|
|
||||||
|
try {
|
||||||
|
for (int offset = 0; offset < length; offset += chunkSize) {
|
||||||
|
int currentChunkSize = Math.min(chunkSize, length - offset);
|
||||||
|
|
||||||
|
dataBuffer.readPosition(readPos + offset);
|
||||||
|
dataBuffer.read(chunk, 0, currentChunkSize);
|
||||||
|
|
||||||
|
matchIndex = processChunk(chunk, currentChunkSize, delimiterBytes, delimiterLength, delimiterFirstByte, matchIndex, readPos, offset);
|
||||||
|
if (matchIndex < 0) {
|
||||||
|
return -(matchIndex + 1); // found, returning actual position
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
this.matches = matchIndex;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
finally {
|
||||||
|
dataBuffer.readPosition(readPos); // restore original position
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private int processChunk(byte[] chunk, int currentChunkSize, byte[] delimiterBytes, int delimiterLen, byte delimiterFirstByte, int matchIndex, int readPos, int offset) {
|
||||||
|
int i = 0;
|
||||||
|
while (i < currentChunkSize) {
|
||||||
|
if (matchIndex == 0) {
|
||||||
|
i = findNextCandidate(chunk, i, currentChunkSize, delimiterFirstByte);
|
||||||
|
if (i >= currentChunkSize) {
|
||||||
|
return matchIndex; // no candidate in this chunk
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
matchIndex = updateMatchIndex(chunk[i], delimiterBytes, delimiterLen, delimiterFirstByte, matchIndex);
|
||||||
|
if (matchIndex == -1) {
|
||||||
|
return -(readPos + offset + i + 1); // return found delimiter position (encoded as negative)
|
||||||
|
}
|
||||||
|
i++;
|
||||||
|
}
|
||||||
|
return matchIndex;
|
||||||
|
}
|
||||||
|
|
||||||
|
private int findNextCandidate(byte[] chunk, int start, int limit, byte delimiterFirstByte) {
|
||||||
|
int j = start;
|
||||||
|
while (j < limit && chunk[j] != delimiterFirstByte) {
|
||||||
|
j++;
|
||||||
|
}
|
||||||
|
return j;
|
||||||
|
}
|
||||||
|
|
||||||
|
private int updateMatchIndex(byte b, byte[] delimiterBytes, int delimiterLen, byte delimiterFirstByte, int matchIndex) {
|
||||||
|
if (b == delimiterBytes[matchIndex]) {
|
||||||
|
matchIndex++;
|
||||||
|
if (matchIndex == delimiterLen) {
|
||||||
reset();
|
reset();
|
||||||
return pos;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return -1;
|
else {
|
||||||
|
matchIndex = (b == delimiterFirstByte) ? 1 : 0;
|
||||||
|
}
|
||||||
|
return matchIndex;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -1025,7 +1105,7 @@ public abstract class DataBufferUtils {
|
||||||
private final AtomicReference<State> state = new AtomicReference<>(State.IDLE);
|
private final AtomicReference<State> state = new AtomicReference<>(State.IDLE);
|
||||||
|
|
||||||
public ReadCompletionHandler(AsynchronousFileChannel channel,
|
public ReadCompletionHandler(AsynchronousFileChannel channel,
|
||||||
FluxSink<DataBuffer> sink, long position, DataBufferFactory dataBufferFactory, int bufferSize) {
|
FluxSink<DataBuffer> sink, long position, DataBufferFactory dataBufferFactory, int bufferSize) {
|
||||||
|
|
||||||
this.channel = channel;
|
this.channel = channel;
|
||||||
this.sink = sink;
|
this.sink = sink;
|
||||||
|
|
Loading…
Reference in New Issue