KAFKA-19098 Remove `lastOffset` from PartitionResponse (#19398)

The `lastOffset` is not used actually, so it can be removed.

Reviewers: Jhen-Yung Hsu <jhenyunghsu@gmail.com>, Ken Huang
<s7133700@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Nick Guo 2025-04-08 00:06:02 +08:00 committed by GitHub
parent b8c095074d
commit fcf6da0a0d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 2 additions and 16 deletions

View File

@ -71,8 +71,6 @@ import java.util.Set;
import java.util.function.Function; import java.util.function.Function;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static org.apache.kafka.common.requests.ProduceResponse.INVALID_OFFSET;
/** /**
* The background thread that handles the sending of produce requests to the Kafka cluster. This thread makes metadata * The background thread that handles the sending of produce requests to the Kafka cluster. This thread makes metadata
* requests to renew its view of the cluster and then sends produce requests to the appropriate nodes. * requests to renew its view of the cluster and then sends produce requests to the appropriate nodes.
@ -608,7 +606,6 @@ public class Sender implements Runnable {
ProduceResponse.PartitionResponse partResp = new ProduceResponse.PartitionResponse( ProduceResponse.PartitionResponse partResp = new ProduceResponse.PartitionResponse(
Errors.forCode(p.errorCode()), Errors.forCode(p.errorCode()),
p.baseOffset(), p.baseOffset(),
INVALID_OFFSET,
p.logAppendTimeMs(), p.logAppendTimeMs(),
p.logStartOffset(), p.logStartOffset(),
p.recordErrors() p.recordErrors()

View File

@ -159,7 +159,6 @@ public class ProduceResponse extends AbstractResponse {
public static final class PartitionResponse { public static final class PartitionResponse {
public Errors error; public Errors error;
public long baseOffset; public long baseOffset;
public long lastOffset;
public long logAppendTime; public long logAppendTime;
public long logStartOffset; public long logStartOffset;
public List<RecordError> recordErrors; public List<RecordError> recordErrors;
@ -183,17 +182,12 @@ public class ProduceResponse extends AbstractResponse {
} }
public PartitionResponse(Errors error, long baseOffset, long logAppendTime, long logStartOffset, List<RecordError> recordErrors, String errorMessage) { public PartitionResponse(Errors error, long baseOffset, long logAppendTime, long logStartOffset, List<RecordError> recordErrors, String errorMessage) {
this(error, baseOffset, INVALID_OFFSET, logAppendTime, logStartOffset, recordErrors, errorMessage, new ProduceResponseData.LeaderIdAndEpoch()); this(error, baseOffset, logAppendTime, logStartOffset, recordErrors, errorMessage, new ProduceResponseData.LeaderIdAndEpoch());
}
public PartitionResponse(Errors error, long baseOffset, long lastOffset, long logAppendTime, long logStartOffset, List<RecordError> recordErrors, String errorMessage) {
this(error, baseOffset, lastOffset, logAppendTime, logStartOffset, recordErrors, errorMessage, new ProduceResponseData.LeaderIdAndEpoch());
} }
public PartitionResponse( public PartitionResponse(
Errors error, Errors error,
long baseOffset, long baseOffset,
long lastOffset,
long logAppendTime, long logAppendTime,
long logStartOffset, long logStartOffset,
List<RecordError> recordErrors, List<RecordError> recordErrors,
@ -202,7 +196,6 @@ public class ProduceResponse extends AbstractResponse {
) { ) {
this.error = error; this.error = error;
this.baseOffset = baseOffset; this.baseOffset = baseOffset;
this.lastOffset = lastOffset;
this.logAppendTime = logAppendTime; this.logAppendTime = logAppendTime;
this.logStartOffset = logStartOffset; this.logStartOffset = logStartOffset;
this.recordErrors = recordErrors; this.recordErrors = recordErrors;
@ -216,7 +209,6 @@ public class ProduceResponse extends AbstractResponse {
if (o == null || getClass() != o.getClass()) return false; if (o == null || getClass() != o.getClass()) return false;
PartitionResponse that = (PartitionResponse) o; PartitionResponse that = (PartitionResponse) o;
return baseOffset == that.baseOffset && return baseOffset == that.baseOffset &&
lastOffset == that.lastOffset &&
logAppendTime == that.logAppendTime && logAppendTime == that.logAppendTime &&
logStartOffset == that.logStartOffset && logStartOffset == that.logStartOffset &&
error == that.error && error == that.error &&
@ -227,7 +219,7 @@ public class ProduceResponse extends AbstractResponse {
@Override @Override
public int hashCode() { public int hashCode() {
return Objects.hash(error, baseOffset, lastOffset, logAppendTime, logStartOffset, recordErrors, errorMessage, currentLeader); return Objects.hash(error, baseOffset, logAppendTime, logStartOffset, recordErrors, errorMessage, currentLeader);
} }
@Override @Override
@ -238,8 +230,6 @@ public class ProduceResponse extends AbstractResponse {
b.append(error); b.append(error);
b.append(",offset: "); b.append(",offset: ");
b.append(baseOffset); b.append(baseOffset);
b.append(",lastOffset: ");
b.append(lastOffset);
b.append(",logAppendTime: "); b.append(",logAppendTime: ");
b.append(logAppendTime); b.append(logAppendTime);
b.append(", logStartOffset: "); b.append(", logStartOffset: ");

View File

@ -890,7 +890,6 @@ class ReplicaManager(val config: KafkaConfig,
new PartitionResponse( new PartitionResponse(
result.error, result.error,
result.info.firstOffset, result.info.firstOffset,
result.info.lastOffset,
result.info.logAppendTime, result.info.logAppendTime,
result.info.logStartOffset, result.info.logStartOffset,
result.info.recordErrors, result.info.recordErrors,