Refactor AbstractNestedMatcher to use per-instance buffer

- Replace ThreadLocal buffer with a per-instance reusable buffer
- Improves memory locality and reduces ThreadLocal overhead
- Update Javadoc for clarity, performance notes, and subclassing guidance

Closes gh-34651

Signed-off-by: Nabil Fawwaz Elqayyim <master@nabilfawwaz.com>
This commit is contained in:
Nabil Fawwaz Elqayyim 2025-08-22 21:06:31 +07:00
parent bcfae821d4
commit 1bf232d1ad
No known key found for this signature in database
GPG Key ID: A270A6876336275F
1 changed files with 26 additions and 37 deletions

View File

@ -61,6 +61,7 @@ import org.springframework.util.CollectionUtils;
* *
* @author Arjen Poutsma * @author Arjen Poutsma
* @author Brian Clozel * @author Brian Clozel
* @author Nabil Fawwaz Elqayyim
* @since 5.0 * @since 5.0
*/ */
public abstract class DataBufferUtils { public abstract class DataBufferUtils {
@ -859,23 +860,17 @@ public abstract class DataBufferUtils {
/** /**
* An abstract base implementation of {@link NestedMatcher} that looks for * Base {@link NestedMatcher} implementation that scans a {@link DataBuffer}
* a specific delimiter in a {@link DataBuffer}. * for a specific delimiter.
* <p>
* Uses a thread-local buffer to scan data in chunks, reducing memory
* allocations and improving performance when processing large buffers.
* </p>
* *
* <p> * <p>Relies on a per-instance reusable buffer to scan data in chunks,
* Each matcher keeps its own match state, so it is intended for * minimizing allocations and improving performance for large or streaming data.</p>
* single-threaded use. The thread-local buffer ensures that multiple
* threads can run their own matchers independently without interfering.
* </p>
* *
* <p> * <p>Each matcher maintains its own state and buffer, ensuring safe use
* Subclasses can extend this class to add custom matching behavior while * in reactive pipelines where execution may shift across threads.</p>
* reusing the built-in delimiter tracking and scanning logic. *
* </p> * <p>Subclasses may extend this class to customize matching strategies
* while reusing the built-in delimiter tracking and scanning logic.</p>
* *
* @see NestedMatcher * @see NestedMatcher
* @see DataBuffer * @see DataBuffer
@ -886,8 +881,7 @@ public abstract class DataBufferUtils {
private int matches = 0; private int matches = 0;
// Thread-local chunk buffer to avoid per-call allocations private final byte[] localBuffer = new byte[8 * 1024]; // Reusable buffer per matcher instance
private static final ThreadLocal<byte[]> LOCAL_BUFFER = ThreadLocal.withInitial(() -> new byte[8 * 1024]);
protected AbstractNestedMatcher(byte[] delimiter) { protected AbstractNestedMatcher(byte[] delimiter) {
this.delimiter = delimiter; this.delimiter = delimiter;
@ -901,25 +895,21 @@ public abstract class DataBufferUtils {
return this.matches; return this.matches;
} }
protected static void releaseLocalBuffer() {
LOCAL_BUFFER.remove();
}
@Override @Override
public int match(DataBuffer dataBuffer) { public int match(DataBuffer dataBuffer) {
final int readPos = dataBuffer.readPosition(); final int readPos = dataBuffer.readPosition();
final int writePos = dataBuffer.writePosition(); final int writePos = dataBuffer.writePosition();
final int length = writePos - readPos; final int length = writePos - readPos;
final byte[] delimiter0 = this.delimiter; final byte[] delimiterBytes = this.delimiter;
final int delimiterLen = delimiter0.length; final int delimiterLength = delimiterBytes.length;
final byte delimiter1 = delimiter0[0]; final byte delimiterFirstByte = delimiterBytes[0];
final byte[] chunk = localBuffer;
final int chunkSize = Math.min(chunk.length, length);
int matchIndex = this.matches; int matchIndex = this.matches;
final byte[] chunk = LOCAL_BUFFER.get();
final int chunkSize = Math.min(chunk.length, length);
try { try {
for (int offset = 0; offset < length; offset += chunkSize) { for (int offset = 0; offset < length; offset += chunkSize) {
int currentChunkSize = Math.min(chunkSize, length - offset); int currentChunkSize = Math.min(chunkSize, length - offset);
@ -927,7 +917,7 @@ public abstract class DataBufferUtils {
dataBuffer.readPosition(readPos + offset); dataBuffer.readPosition(readPos + offset);
dataBuffer.read(chunk, 0, currentChunkSize); dataBuffer.read(chunk, 0, currentChunkSize);
matchIndex = processChunk(chunk, currentChunkSize, delimiter0, delimiterLen, delimiter1, matchIndex, readPos, offset); matchIndex = processChunk(chunk, currentChunkSize, delimiterBytes, delimiterLength, delimiterFirstByte, matchIndex, readPos, offset);
if (matchIndex < 0) { if (matchIndex < 0) {
return -(matchIndex + 1); // found, returning actual position return -(matchIndex + 1); // found, returning actual position
} }
@ -938,21 +928,20 @@ public abstract class DataBufferUtils {
} }
finally { finally {
dataBuffer.readPosition(readPos); // restore original position dataBuffer.readPosition(readPos); // restore original position
releaseLocalBuffer();
} }
} }
private int processChunk(byte[] chunk, int currentChunkSize, byte[] delimiter0, int delimiterLen, byte delimiter1, int matchIndex, int readPos, int offset) { private int processChunk(byte[] chunk, int currentChunkSize, byte[] delimiterBytes, int delimiterLen, byte delimiterFirstByte, int matchIndex, int readPos, int offset) {
int i = 0; int i = 0;
while (i < currentChunkSize) { while (i < currentChunkSize) {
if (matchIndex == 0) { if (matchIndex == 0) {
i = findNextCandidate(chunk, i, currentChunkSize, delimiter1); i = findNextCandidate(chunk, i, currentChunkSize, delimiterFirstByte);
if (i >= currentChunkSize) { if (i >= currentChunkSize) {
return matchIndex; // no candidate in this chunk return matchIndex; // no candidate in this chunk
} }
} }
matchIndex = updateMatchIndex(chunk[i], delimiter0, delimiterLen, delimiter1, matchIndex); matchIndex = updateMatchIndex(chunk[i], delimiterBytes, delimiterLen, delimiterFirstByte, matchIndex);
if (matchIndex == -1) { if (matchIndex == -1) {
return -(readPos + offset + i + 1); // return found delimiter position (encoded as negative) return -(readPos + offset + i + 1); // return found delimiter position (encoded as negative)
} }
@ -961,16 +950,16 @@ public abstract class DataBufferUtils {
return matchIndex; return matchIndex;
} }
private int findNextCandidate(byte[] chunk, int start, int limit, byte delimiter1) { private int findNextCandidate(byte[] chunk, int start, int limit, byte delimiterFirstByte) {
int j = start; int j = start;
while (j < limit && chunk[j] != delimiter1) { while (j < limit && chunk[j] != delimiterFirstByte) {
j++; j++;
} }
return j; return j;
} }
private int updateMatchIndex(byte b, byte[] delimiter0, int delimiterLen, byte delimiter1, int matchIndex) { private int updateMatchIndex(byte b, byte[] delimiterBytes, int delimiterLen, byte delimiterFirstByte, int matchIndex) {
if (b == delimiter0[matchIndex]) { if (b == delimiterBytes[matchIndex]) {
matchIndex++; matchIndex++;
if (matchIndex == delimiterLen) { if (matchIndex == delimiterLen) {
reset(); reset();
@ -978,7 +967,7 @@ public abstract class DataBufferUtils {
} }
} }
else { else {
matchIndex = (b == delimiter1) ? 1 : 0; matchIndex = (b == delimiterFirstByte) ? 1 : 0;
} }
return matchIndex; return matchIndex;
} }