KAFKA-10525: Emit JSONs with new auto-generated schema (KIP-673) (#9526)

This patch updates the request logger to output request and response payloads in JSON. Payloads are converted to JSON based on their auto-generated schema.

Reviewers:  Lucas Bradstreet <lucas@confluent.io>, David Mao <dmao@confluent.io>, David Jacot <djacot@confluent.io>
This commit is contained in:
Anastasia Vela 2020-12-15 05:33:36 -08:00 committed by GitHub
parent 5e5daf47ef
commit 1a10c3445e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
133 changed files with 913 additions and 259 deletions

View File

@ -893,10 +893,10 @@ public class NetworkClient implements KafkaClient {
private void handleApiVersionsResponse(List<ClientResponse> responses,
InFlightRequest req, long now, ApiVersionsResponse apiVersionsResponse) {
final String node = req.destination;
if (apiVersionsResponse.data.errorCode() != Errors.NONE.code()) {
if (req.request.version() == 0 || apiVersionsResponse.data.errorCode() != Errors.UNSUPPORTED_VERSION.code()) {
if (apiVersionsResponse.data().errorCode() != Errors.NONE.code()) {
if (req.request.version() == 0 || apiVersionsResponse.data().errorCode() != Errors.UNSUPPORTED_VERSION.code()) {
log.warn("Received error {} from node {} when making an ApiVersionsRequest with correlation id {}. Disconnecting.",
Errors.forCode(apiVersionsResponse.data.errorCode()), node, req.header.correlationId());
Errors.forCode(apiVersionsResponse.data().errorCode()), node, req.header.correlationId());
this.selector.close(node);
processDisconnection(responses, node, now, ChannelState.LOCAL_CLOSE);
} else {
@ -904,8 +904,8 @@ public class NetworkClient implements KafkaClient {
// the ApiVersionsRequest when an UNSUPPORTED_VERSION error is returned.
// If not provided, the client falls back to version 0.
short maxApiVersion = 0;
if (apiVersionsResponse.data.apiKeys().size() > 0) {
ApiVersionsResponseKey apiVersion = apiVersionsResponse.data.apiKeys().find(ApiKeys.API_VERSIONS.id);
if (apiVersionsResponse.data().apiKeys().size() > 0) {
ApiVersionsResponseKey apiVersion = apiVersionsResponse.data().apiKeys().find(ApiKeys.API_VERSIONS.id);
if (apiVersion != null) {
maxApiVersion = apiVersion.maxVersion();
}
@ -914,7 +914,7 @@ public class NetworkClient implements KafkaClient {
}
return;
}
NodeApiVersions nodeVersionInfo = new NodeApiVersions(apiVersionsResponse.data.apiKeys());
NodeApiVersions nodeVersionInfo = new NodeApiVersions(apiVersionsResponse.data().apiKeys());
apiVersions.update(node, nodeVersionInfo);
this.connectionStates.ready(node);
log.debug("Recorded API versions for node {}: {}", node, nodeVersionInfo);

View File

@ -3451,12 +3451,12 @@ public class KafkaAdminClient extends AdminClient {
}
// If the error is an error at the group level, the future is failed with it
final Errors groupError = Errors.forCode(response.data.errorCode());
final Errors groupError = Errors.forCode(response.data().errorCode());
if (handleGroupRequestError(groupError, context.future()))
return;
final Map<TopicPartition, Errors> partitions = new HashMap<>();
response.data.topics().forEach(topic -> topic.partitions().forEach(partition -> partitions.put(
response.data().topics().forEach(topic -> topic.partitions().forEach(partition -> partitions.put(
new TopicPartition(topic.name(), partition.partitionIndex()),
Errors.forCode(partition.errorCode())))
);
@ -4359,10 +4359,10 @@ public class KafkaAdminClient extends AdminClient {
@Override
void handleResponse(AbstractResponse response) {
final ApiVersionsResponse apiVersionsResponse = (ApiVersionsResponse) response;
if (apiVersionsResponse.data.errorCode() == Errors.NONE.code()) {
if (apiVersionsResponse.data().errorCode() == Errors.NONE.code()) {
future.complete(createFeatureMetadata(apiVersionsResponse));
} else {
future.completeExceptionally(Errors.forCode(apiVersionsResponse.data.errorCode()).exception());
future.completeExceptionally(Errors.forCode(apiVersionsResponse.data().errorCode()).exception());
}
}

View File

@ -732,9 +732,9 @@ public abstract class AbstractCoordinator implements Closeable {
RequestFuture<ByteBuffer> future) {
Errors error = syncResponse.error();
if (error == Errors.NONE) {
if (isProtocolTypeInconsistent(syncResponse.data.protocolType())) {
if (isProtocolTypeInconsistent(syncResponse.data().protocolType())) {
log.error("SyncGroup failed due to inconsistent Protocol Type, received {} but expected {}",
syncResponse.data.protocolType(), protocolType());
syncResponse.data().protocolType(), protocolType());
future.raise(Errors.INCONSISTENT_GROUP_PROTOCOL);
} else {
log.debug("Received successful SyncGroup response: {}", syncResponse);
@ -743,7 +743,7 @@ public abstract class AbstractCoordinator implements Closeable {
synchronized (AbstractCoordinator.this) {
if (!generation.equals(Generation.NO_GENERATION) && state == MemberState.COMPLETING_REBALANCE) {
// check protocol name only if the generation is not reset
final String protocolName = syncResponse.data.protocolName();
final String protocolName = syncResponse.data().protocolName();
final boolean protocolNameInconsistent = protocolName != null &&
!protocolName.equals(generation.protocolName);
@ -761,7 +761,7 @@ public abstract class AbstractCoordinator implements Closeable {
sensors.successfulRebalanceSensor.record(lastRebalanceEndMs - lastRebalanceStartMs);
lastRebalanceStartMs = -1L;
future.complete(ByteBuffer.wrap(syncResponse.data.assignment()));
future.complete(ByteBuffer.wrap(syncResponse.data().assignment()));
}
} else {
log.info("Generation data was cleared by heartbeat thread to {} and state is now {} before " +

View File

@ -1348,8 +1348,8 @@ public class TransactionManager {
Errors error = initProducerIdResponse.error();
if (error == Errors.NONE) {
ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(initProducerIdResponse.data.producerId(),
initProducerIdResponse.data.producerEpoch());
ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(initProducerIdResponse.data().producerId(),
initProducerIdResponse.data().producerEpoch());
setProducerIdAndEpoch(producerIdAndEpoch);
transitionTo(State.READY);
lastError = null;
@ -1623,7 +1623,7 @@ public class TransactionManager {
@Override
public void handleResponse(AbstractResponse response) {
AddOffsetsToTxnResponse addOffsetsToTxnResponse = (AddOffsetsToTxnResponse) response;
Errors error = Errors.forCode(addOffsetsToTxnResponse.data.errorCode());
Errors error = Errors.forCode(addOffsetsToTxnResponse.data().errorCode());
if (error == Errors.NONE) {
log.debug("Successfully added partition for consumer group {} to transaction", builder.data.groupId());

View File

@ -102,7 +102,7 @@ public abstract class AbstractRequest implements AbstractRequestResponse {
return SendBuilder.buildRequestSend(header, data());
}
protected abstract Message data();
public abstract Message data();
// Visible for testing
public final ByteBuffer serialize() {

View File

@ -26,7 +26,7 @@ import java.nio.ByteBuffer;
public class AddOffsetsToTxnRequest extends AbstractRequest {
public AddOffsetsToTxnRequestData data;
private final AddOffsetsToTxnRequestData data;
public static class Builder extends AbstractRequest.Builder<AddOffsetsToTxnRequest> {
public AddOffsetsToTxnRequestData data;
@ -53,7 +53,7 @@ public class AddOffsetsToTxnRequest extends AbstractRequest {
}
@Override
protected AddOffsetsToTxnRequestData data() {
public AddOffsetsToTxnRequestData data() {
return data;
}

View File

@ -39,7 +39,7 @@ import java.util.Map;
*/
public class AddOffsetsToTxnResponse extends AbstractResponse {
public AddOffsetsToTxnResponseData data;
private final AddOffsetsToTxnResponseData data;
public AddOffsetsToTxnResponse(AddOffsetsToTxnResponseData data) {
super(ApiKeys.ADD_OFFSETS_TO_TXN);
@ -57,7 +57,7 @@ public class AddOffsetsToTxnResponse extends AbstractResponse {
}
@Override
protected AddOffsetsToTxnResponseData data() {
public AddOffsetsToTxnResponseData data() {
return data;
}

View File

@ -32,7 +32,7 @@ import java.util.Map;
public class AddPartitionsToTxnRequest extends AbstractRequest {
public final AddPartitionsToTxnRequestData data;
private final AddPartitionsToTxnRequestData data;
private List<TopicPartition> cachedPartitions = null;
@ -112,7 +112,7 @@ public class AddPartitionsToTxnRequest extends AbstractRequest {
}
@Override
protected AddPartitionsToTxnRequestData data() {
public AddPartitionsToTxnRequestData data() {
return data;
}

View File

@ -46,7 +46,7 @@ import java.util.Map;
*/
public class AddPartitionsToTxnResponse extends AbstractResponse {
public final AddPartitionsToTxnResponseData data;
private final AddPartitionsToTxnResponseData data;
private Map<TopicPartition, Errors> cachedErrorsMap = null;
@ -117,7 +117,7 @@ public class AddPartitionsToTxnResponse extends AbstractResponse {
}
@Override
protected AddPartitionsToTxnResponseData data() {
public AddPartitionsToTxnResponseData data() {
return data;
}

View File

@ -111,7 +111,7 @@ public class AlterClientQuotasRequest extends AbstractRequest {
}
@Override
protected AlterClientQuotasRequestData data() {
public AlterClientQuotasRequestData data() {
return data;
}

View File

@ -77,7 +77,7 @@ public class AlterClientQuotasResponse extends AbstractResponse {
}
@Override
protected AlterClientQuotasResponseData data() {
public AlterClientQuotasResponseData data() {
return data;
}

View File

@ -112,7 +112,7 @@ public class AlterConfigsRequest extends AbstractRequest {
}
@Override
protected AlterConfigsRequestData data() {
public AlterConfigsRequestData data() {
return data;
}

View File

@ -34,6 +34,7 @@ public class AlterIsrRequest extends AbstractRequest {
this.data = data;
}
@Override
public AlterIsrRequestData data() {
return data;
}

View File

@ -35,6 +35,7 @@ public class AlterIsrResponse extends AbstractResponse {
this.data = data;
}
@Override
public AlterIsrResponseData data() {
return data;
}

View File

@ -40,6 +40,7 @@ public class AlterPartitionReassignmentsResponse extends AbstractResponse {
new AlterPartitionReassignmentsResponseData(new ByteBufferAccessor(buffer), version));
}
@Override
public AlterPartitionReassignmentsResponseData data() {
return data;
}

View File

@ -61,7 +61,7 @@ public class AlterReplicaLogDirsRequest extends AbstractRequest {
}
@Override
protected AlterReplicaLogDirsRequestData data() {
public AlterReplicaLogDirsRequestData data() {
return data;
}

View File

@ -59,6 +59,7 @@ public class AlterUserScramCredentialsRequest extends AbstractRequest {
return new AlterUserScramCredentialsRequest(new AlterUserScramCredentialsRequestData(new ByteBufferAccessor(buffer), version), version);
}
@Override
public AlterUserScramCredentialsRequestData data() {
return data;
}

View File

@ -33,6 +33,7 @@ public class AlterUserScramCredentialsResponse extends AbstractResponse {
this.data = responseData;
}
@Override
public AlterUserScramCredentialsResponseData data() {
return data;
}

View File

@ -46,7 +46,7 @@ public class ApiVersionsResponse extends AbstractResponse {
public static final ApiVersionsResponse DEFAULT_API_VERSIONS_RESPONSE = createApiVersionsResponse(
DEFAULT_THROTTLE_TIME, RecordBatch.CURRENT_MAGIC_VALUE);
public final ApiVersionsResponseData data;
private final ApiVersionsResponseData data;
public ApiVersionsResponse(ApiVersionsResponseData data) {
super(ApiKeys.API_VERSIONS);

View File

@ -46,7 +46,7 @@ public class BeginQuorumEpochRequest extends AbstractRequest {
}
}
public final BeginQuorumEpochRequestData data;
private final BeginQuorumEpochRequestData data;
private BeginQuorumEpochRequest(BeginQuorumEpochRequestData data, short version) {
super(ApiKeys.BEGIN_QUORUM_EPOCH, version);
@ -54,7 +54,7 @@ public class BeginQuorumEpochRequest extends AbstractRequest {
}
@Override
protected BeginQuorumEpochRequestData data() {
public BeginQuorumEpochRequestData data() {
return data;
}

View File

@ -42,7 +42,7 @@ import java.util.Map;
* - {@link Errors#UNKNOWN_TOPIC_OR_PARTITION}
*/
public class BeginQuorumEpochResponse extends AbstractResponse {
public final BeginQuorumEpochResponseData data;
private final BeginQuorumEpochResponseData data;
public BeginQuorumEpochResponse(BeginQuorumEpochResponseData data) {
super(ApiKeys.BEGIN_QUORUM_EPOCH);
@ -86,7 +86,7 @@ public class BeginQuorumEpochResponse extends AbstractResponse {
}
@Override
protected BeginQuorumEpochResponseData data() {
public BeginQuorumEpochResponseData data() {
return data;
}

View File

@ -65,6 +65,7 @@ public class ControlledShutdownRequest extends AbstractRequest {
version);
}
@Override
public ControlledShutdownRequestData data() {
return data;
}

View File

@ -62,6 +62,7 @@ public class ControlledShutdownResponse extends AbstractResponse {
return new ControlledShutdownResponse(new ControlledShutdownResponseData(new ByteBufferAccessor(buffer), version));
}
@Override
public ControlledShutdownResponseData data() {
return data;
}

View File

@ -70,7 +70,7 @@ public class CreateAclsRequest extends AbstractRequest {
}
@Override
protected CreateAclsRequestData data() {
public CreateAclsRequestData data() {
return data;
}

View File

@ -34,7 +34,7 @@ public class CreateAclsResponse extends AbstractResponse {
}
@Override
protected CreateAclsResponseData data() {
public CreateAclsResponseData data() {
return data;
}

View File

@ -38,6 +38,7 @@ public class CreateDelegationTokenRequest extends AbstractRequest {
version);
}
@Override
public CreateDelegationTokenRequestData data() {
return data;
}

View File

@ -64,6 +64,7 @@ public class CreateDelegationTokenResponse extends AbstractResponse {
return prepareResponse(throttleTimeMs, error, owner, -1, -1, -1, "", ByteBuffer.wrap(new byte[] {}));
}
@Override
public CreateDelegationTokenResponseData data() {
return data;
}

View File

@ -55,6 +55,7 @@ public class CreatePartitionsRequest extends AbstractRequest {
this.data = data;
}
@Override
public CreatePartitionsRequestData data() {
return data;
}

View File

@ -35,6 +35,7 @@ public class CreatePartitionsResponse extends AbstractResponse {
this.data = data;
}
@Override
public CreatePartitionsResponseData data() {
return data;
}

View File

@ -77,6 +77,7 @@ public class CreateTopicsRequest extends AbstractRequest {
this.data = data;
}
@Override
public CreateTopicsRequestData data() {
return data;
}

View File

@ -50,6 +50,7 @@ public class CreateTopicsResponse extends AbstractResponse {
this.data = data;
}
@Override
public CreateTopicsResponseData data() {
return data;
}

View File

@ -100,7 +100,7 @@ public class DeleteAclsRequest extends AbstractRequest {
}
@Override
protected DeleteAclsRequestData data() {
public DeleteAclsRequestData data() {
return data;
}

View File

@ -51,7 +51,7 @@ public class DeleteAclsResponse extends AbstractResponse {
}
@Override
protected DeleteAclsResponseData data() {
public DeleteAclsResponseData data() {
return data;
}

View File

@ -46,7 +46,7 @@ public class DeleteGroupsRequest extends AbstractRequest {
}
}
public final DeleteGroupsRequestData data;
private final DeleteGroupsRequestData data;
public DeleteGroupsRequest(DeleteGroupsRequestData data, short version) {
super(ApiKeys.DELETE_GROUPS, version);
@ -75,7 +75,7 @@ public class DeleteGroupsRequest extends AbstractRequest {
}
@Override
protected DeleteGroupsRequestData data() {
public DeleteGroupsRequestData data() {
return data;
}
}

View File

@ -39,7 +39,7 @@ import java.util.Map;
*/
public class DeleteGroupsResponse extends AbstractResponse {
public final DeleteGroupsResponseData data;
private final DeleteGroupsResponseData data;
public DeleteGroupsResponse(DeleteGroupsResponseData data) {
super(ApiKeys.DELETE_GROUPS);
@ -47,7 +47,7 @@ public class DeleteGroupsResponse extends AbstractResponse {
}
@Override
protected DeleteGroupsResponseData data() {
public DeleteGroupsResponseData data() {
return data;
}

View File

@ -46,6 +46,7 @@ public class DeleteRecordsResponse extends AbstractResponse {
this.data = data;
}
@Override
public DeleteRecordsResponseData data() {
return data;
}

View File

@ -52,6 +52,7 @@ public class DeleteTopicsRequest extends AbstractRequest {
this.data = data;
}
@Override
public DeleteTopicsRequestData data() {
return data;
}

View File

@ -38,7 +38,7 @@ public class DeleteTopicsResponse extends AbstractResponse {
* INVALID_REQUEST(42)
* TOPIC_DELETION_DISABLED(73)
*/
private DeleteTopicsResponseData data;
private final DeleteTopicsResponseData data;
public DeleteTopicsResponse(DeleteTopicsResponseData data) {
super(ApiKeys.DELETE_TOPICS);
@ -50,6 +50,7 @@ public class DeleteTopicsResponse extends AbstractResponse {
return data.throttleTimeMs();
}
@Override
public DeleteTopicsResponseData data() {
return data;
}

View File

@ -89,6 +89,7 @@ public class DescribeAclsRequest extends AbstractRequest {
}
}
@Override
public DescribeAclsRequestData data() {
return data;
}

View File

@ -62,7 +62,7 @@ public class DescribeAclsResponse extends AbstractResponse {
}
@Override
protected DescribeAclsResponseData data() {
public DescribeAclsResponseData data() {
return data;
}

View File

@ -106,7 +106,7 @@ public class DescribeClientQuotasRequest extends AbstractRequest {
}
@Override
protected DescribeClientQuotasRequestData data() {
public DescribeClientQuotasRequestData data() {
return data;
}

View File

@ -71,7 +71,7 @@ public class DescribeClientQuotasResponse extends AbstractResponse {
}
@Override
protected DescribeClientQuotasResponseData data() {
public DescribeClientQuotasResponseData data() {
return data;
}

View File

@ -60,6 +60,7 @@ public class DescribeDelegationTokenRequest extends AbstractRequest {
this.data = data;
}
@Override
public DescribeDelegationTokenRequestData data() {
return data;
}

View File

@ -80,7 +80,7 @@ public class DescribeDelegationTokenResponse extends AbstractResponse {
}
@Override
protected DescribeDelegationTokenResponseData data() {
public DescribeDelegationTokenResponseData data() {
return data;
}

View File

@ -52,6 +52,7 @@ public class DescribeGroupsRequest extends AbstractRequest {
this.data = data;
}
@Override
public DescribeGroupsRequestData data() {
return data;
}

View File

@ -44,7 +44,7 @@ public class DescribeGroupsResponse extends AbstractResponse {
* AUTHORIZATION_FAILED (29)
*/
private DescribeGroupsResponseData data;
private final DescribeGroupsResponseData data;
public DescribeGroupsResponse(DescribeGroupsResponseData data) {
super(ApiKeys.DESCRIBE_GROUPS);
@ -105,6 +105,7 @@ public class DescribeGroupsResponse extends AbstractResponse {
return groupMetadata;
}
@Override
public DescribeGroupsResponseData data() {
return data;
}

View File

@ -39,6 +39,7 @@ public class DescribeLogDirsResponse extends AbstractResponse {
this.data = data;
}
@Override
public DescribeLogDirsResponseData data() {
return data;
}

View File

@ -49,7 +49,7 @@ public class DescribeQuorumRequest extends AbstractRequest {
}
}
public final DescribeQuorumRequestData data;
private final DescribeQuorumRequestData data;
private DescribeQuorumRequest(DescribeQuorumRequestData data, short version) {
super(ApiKeys.DESCRIBE_QUORUM, version);
@ -72,7 +72,7 @@ public class DescribeQuorumRequest extends AbstractRequest {
}
@Override
protected DescribeQuorumRequestData data() {
public DescribeQuorumRequestData data() {
return data;
}

View File

@ -41,7 +41,7 @@ import java.util.Map;
* - {@link Errors#UNKNOWN_TOPIC_OR_PARTITION}
*/
public class DescribeQuorumResponse extends AbstractResponse {
public final DescribeQuorumResponseData data;
private final DescribeQuorumResponseData data;
public DescribeQuorumResponse(DescribeQuorumResponseData data) {
super(ApiKeys.DESCRIBE_QUORUM);
@ -63,7 +63,7 @@ public class DescribeQuorumResponse extends AbstractResponse {
}
@Override
protected DescribeQuorumResponseData data() {
public DescribeQuorumResponseData data() {
return data;
}

View File

@ -44,7 +44,7 @@ public class DescribeUserScramCredentialsRequest extends AbstractRequest {
}
}
private DescribeUserScramCredentialsRequestData data;
private final DescribeUserScramCredentialsRequestData data;
private final short version;
private DescribeUserScramCredentialsRequest(DescribeUserScramCredentialsRequestData data, short version) {
@ -58,6 +58,7 @@ public class DescribeUserScramCredentialsRequest extends AbstractRequest {
new ByteBufferAccessor(buffer), version), version);
}
@Override
public DescribeUserScramCredentialsRequestData data() {
return data;
}

View File

@ -33,6 +33,7 @@ public class DescribeUserScramCredentialsResponse extends AbstractResponse {
this.data = responseData;
}
@Override
public DescribeUserScramCredentialsResponseData data() {
return data;
}

View File

@ -90,6 +90,7 @@ public class ElectLeadersRequest extends AbstractRequest {
this.data = data;
}
@Override
public ElectLeadersRequestData data() {
return data;
}

View File

@ -52,6 +52,7 @@ public class ElectLeadersResponse extends AbstractResponse {
data.setReplicaElectionResults(electionResults);
}
@Override
public ElectLeadersResponseData data() {
return data;
}

View File

@ -47,7 +47,7 @@ public class EndQuorumEpochRequest extends AbstractRequest {
}
}
public final EndQuorumEpochRequestData data;
private final EndQuorumEpochRequestData data;
private EndQuorumEpochRequest(EndQuorumEpochRequestData data, short version) {
super(ApiKeys.END_QUORUM_EPOCH, version);
@ -55,7 +55,7 @@ public class EndQuorumEpochRequest extends AbstractRequest {
}
@Override
protected EndQuorumEpochRequestData data() {
public EndQuorumEpochRequestData data() {
return data;
}

View File

@ -42,7 +42,7 @@ import java.util.Map;
* - {@link Errors#UNKNOWN_TOPIC_OR_PARTITION}
*/
public class EndQuorumEpochResponse extends AbstractResponse {
public final EndQuorumEpochResponseData data;
private final EndQuorumEpochResponseData data;
public EndQuorumEpochResponse(EndQuorumEpochResponseData data) {
super(ApiKeys.END_QUORUM_EPOCH);
@ -64,7 +64,7 @@ public class EndQuorumEpochResponse extends AbstractResponse {
}
@Override
protected EndQuorumEpochResponseData data() {
public EndQuorumEpochResponseData data() {
return data;
}

View File

@ -26,7 +26,7 @@ import java.nio.ByteBuffer;
public class EndTxnRequest extends AbstractRequest {
public final EndTxnRequestData data;
private final EndTxnRequestData data;
public static class Builder extends AbstractRequest.Builder<EndTxnRequest> {
public final EndTxnRequestData data;
@ -60,7 +60,7 @@ public class EndTxnRequest extends AbstractRequest {
}
@Override
protected EndTxnRequestData data() {
public EndTxnRequestData data() {
return data;
}

View File

@ -38,7 +38,7 @@ import java.util.Map;
*/
public class EndTxnResponse extends AbstractResponse {
public final EndTxnResponseData data;
private final EndTxnResponseData data;
public EndTxnResponse(EndTxnResponseData data) {
super(ApiKeys.END_TXN);
@ -61,7 +61,7 @@ public class EndTxnResponse extends AbstractResponse {
}
@Override
protected EndTxnResponseData data() {
public EndTxnResponseData data() {
return data;
}

View File

@ -80,6 +80,7 @@ public class EnvelopeRequest extends AbstractRequest {
return new EnvelopeRequest(new EnvelopeRequestData(new ByteBufferAccessor(buffer), version), version);
}
@Override
public EnvelopeRequestData data() {
return data;
}

View File

@ -57,6 +57,7 @@ public class EnvelopeResponse extends AbstractResponse {
return Errors.forCode(data.errorCode());
}
@Override
public EnvelopeResponseData data() {
return data;
}

View File

@ -39,7 +39,7 @@ public class ExpireDelegationTokenRequest extends AbstractRequest {
}
@Override
protected ExpireDelegationTokenRequestData data() {
public ExpireDelegationTokenRequestData data() {
return data;
}

View File

@ -52,7 +52,7 @@ public class ExpireDelegationTokenResponse extends AbstractResponse {
}
@Override
protected ExpireDelegationTokenResponseData data() {
public ExpireDelegationTokenResponseData data() {
return data;
}

View File

@ -67,6 +67,7 @@ public class FetchResponse<T extends BaseRecords> extends AbstractResponse {
private final FetchResponseData data;
private final LinkedHashMap<TopicPartition, PartitionData<T>> responseDataMap;
@Override
public FetchResponseData data() {
return data;
}

View File

@ -77,6 +77,7 @@ public class FindCoordinatorRequest extends AbstractRequest {
version);
}
@Override
public FindCoordinatorRequestData data() {
return data;
}

View File

@ -45,6 +45,7 @@ public class FindCoordinatorResponse extends AbstractResponse {
this.data = data;
}
@Override
public FindCoordinatorResponseData data() {
return data;
}

View File

@ -50,7 +50,7 @@ public class HeartbeatRequest extends AbstractRequest {
}
}
public final HeartbeatRequestData data;
private final HeartbeatRequestData data;
private HeartbeatRequest(HeartbeatRequestData data, short version) {
super(ApiKeys.HEARTBEAT, version);
@ -72,7 +72,7 @@ public class HeartbeatRequest extends AbstractRequest {
}
@Override
protected HeartbeatRequestData data() {
public HeartbeatRequestData data() {
return data;
}
}

View File

@ -58,7 +58,7 @@ public class HeartbeatResponse extends AbstractResponse {
}
@Override
protected HeartbeatResponseData data() {
public HeartbeatResponseData data() {
return data;
}

View File

@ -91,6 +91,7 @@ public class IncrementalAlterConfigsRequest extends AbstractRequest {
new ByteBufferAccessor(buffer), version), version);
}
@Override
public IncrementalAlterConfigsRequestData data() {
return data;
}

View File

@ -66,6 +66,7 @@ public class IncrementalAlterConfigsResponse extends AbstractResponse {
this.data = data;
}
@Override
public IncrementalAlterConfigsResponseData data() {
return data;
}

View File

@ -51,7 +51,7 @@ public class InitProducerIdRequest extends AbstractRequest {
}
}
public final InitProducerIdRequestData data;
private final InitProducerIdRequestData data;
private InitProducerIdRequest(InitProducerIdRequestData data, short version) {
super(ApiKeys.INIT_PRODUCER_ID, version);
@ -73,7 +73,8 @@ public class InitProducerIdRequest extends AbstractRequest {
}
@Override
protected InitProducerIdRequestData data() {
public InitProducerIdRequestData data() {
return data;
}
}

View File

@ -36,7 +36,7 @@ import java.util.Map;
* - {@link Errors#PRODUCER_FENCED}
*/
public class InitProducerIdResponse extends AbstractResponse {
public final InitProducerIdResponseData data;
private final InitProducerIdResponseData data;
public InitProducerIdResponse(InitProducerIdResponseData data) {
super(ApiKeys.INIT_PRODUCER_ID);
@ -54,7 +54,7 @@ public class InitProducerIdResponse extends AbstractResponse {
}
@Override
protected InitProducerIdResponseData data() {
public InitProducerIdResponseData data() {
return data;
}

View File

@ -107,6 +107,7 @@ public class JoinGroupRequest extends AbstractRequest {
}
}
@Override
public JoinGroupRequestData data() {
return data;
}

View File

@ -33,6 +33,7 @@ public class JoinGroupResponse extends AbstractResponse {
this.data = data;
}
@Override
public JoinGroupResponseData data() {
return data;
}

View File

@ -166,7 +166,8 @@ public class LeaderAndIsrRequest extends AbstractControlRequest {
return Collections.unmodifiableList(data.liveLeaders());
}
protected LeaderAndIsrRequestData data() {
@Override
public LeaderAndIsrRequestData data() {
return data;
}

View File

@ -72,7 +72,7 @@ public class LeaderAndIsrResponse extends AbstractResponse {
}
@Override
protected LeaderAndIsrResponseData data() {
public LeaderAndIsrResponseData data() {
return data;
}

View File

@ -87,6 +87,7 @@ public class LeaveGroupRequest extends AbstractRequest {
this.data = data;
}
@Override
public LeaveGroupRequestData data() {
return data;
}

View File

@ -48,7 +48,7 @@ import java.util.Objects;
*/
public class LeaveGroupResponse extends AbstractResponse {
public final LeaveGroupResponseData data;
private final LeaveGroupResponseData data;
public LeaveGroupResponse(LeaveGroupResponseData data) {
super(ApiKeys.LEAVE_GROUP);
@ -122,7 +122,7 @@ public class LeaveGroupResponse extends AbstractResponse {
}
@Override
protected LeaveGroupResponseData data() {
public LeaveGroupResponseData data() {
return data;
}

View File

@ -33,6 +33,7 @@ public class ListGroupsResponse extends AbstractResponse {
this.data = data;
}
@Override
public ListGroupsResponseData data() {
return data;
}

View File

@ -63,6 +63,7 @@ public class ListPartitionReassignmentsRequest extends AbstractRequest {
new ByteBufferAccessor(buffer), version), version);
}
@Override
public ListPartitionReassignmentsRequestData data() {
return data;
}

View File

@ -38,6 +38,7 @@ public class ListPartitionReassignmentsResponse extends AbstractResponse {
new ByteBufferAccessor(buffer), version));
}
@Override
public ListPartitionReassignmentsResponseData data() {
return data;
}

View File

@ -108,6 +108,7 @@ public class MetadataRequest extends AbstractRequest {
this.data = data;
}
@Override
public MetadataRequestData data() {
return data;
}

View File

@ -74,7 +74,7 @@ public class MetadataResponse extends AbstractResponse {
}
@Override
protected MetadataResponseData data() {
public MetadataResponseData data() {
return data;
}

View File

@ -73,6 +73,7 @@ public class OffsetCommitRequest extends AbstractRequest {
this.data = data;
}
@Override
public OffsetCommitRequestData data() {
return data;
}

View File

@ -81,6 +81,7 @@ public class OffsetCommitResponse extends AbstractResponse {
this(DEFAULT_THROTTLE_TIME, responseData);
}
@Override
public OffsetCommitResponseData data() {
return data;
}

View File

@ -46,7 +46,7 @@ public class OffsetDeleteRequest extends AbstractRequest {
}
}
public final OffsetDeleteRequestData data;
private final OffsetDeleteRequestData data;
public OffsetDeleteRequest(OffsetDeleteRequestData data, short version) {
super(ApiKeys.OFFSET_DELETE, version);
@ -71,7 +71,7 @@ public class OffsetDeleteRequest extends AbstractRequest {
}
@Override
protected OffsetDeleteRequestData data() {
public OffsetDeleteRequestData data() {
return data;
}
}

View File

@ -44,7 +44,7 @@ import java.util.Map;
*/
public class OffsetDeleteResponse extends AbstractResponse {
public final OffsetDeleteResponseData data;
private final OffsetDeleteResponseData data;
public OffsetDeleteResponse(OffsetDeleteResponseData data) {
super(ApiKeys.OFFSET_DELETE);
@ -52,7 +52,7 @@ public class OffsetDeleteResponse extends AbstractResponse {
}
@Override
protected OffsetDeleteResponseData data() {
public OffsetDeleteResponseData data() {
return data;
}

View File

@ -38,7 +38,7 @@ public class OffsetFetchRequest extends AbstractRequest {
private static final Logger log = LoggerFactory.getLogger(OffsetFetchRequest.class);
private static final List<OffsetFetchRequestTopic> ALL_TOPIC_PARTITIONS = null;
public final OffsetFetchRequestData data;
private final OffsetFetchRequestData data;
public static class Builder extends AbstractRequest.Builder<OffsetFetchRequest> {
@ -175,7 +175,7 @@ public class OffsetFetchRequest extends AbstractRequest {
}
@Override
protected OffsetFetchRequestData data() {
public OffsetFetchRequestData data() {
return data;
}
}

View File

@ -63,7 +63,7 @@ public class OffsetFetchResponse extends AbstractResponse {
private static final List<Errors> PARTITION_ERRORS = Arrays.asList(
Errors.UNKNOWN_TOPIC_OR_PARTITION, Errors.TOPIC_AUTHORIZATION_FAILED);
public final OffsetFetchResponseData data;
private final OffsetFetchResponseData data;
private final Errors error;
public static final class PartitionData {
@ -219,7 +219,7 @@ public class OffsetFetchResponse extends AbstractResponse {
}
@Override
protected OffsetFetchResponseData data() {
public OffsetFetchResponseData data() {
return data;
}

View File

@ -107,6 +107,7 @@ public class ProduceResponse extends AbstractResponse {
return data;
}
@Override
public ProduceResponseData data() {
return this.data;
}

View File

@ -38,6 +38,7 @@ public class RenewDelegationTokenRequest extends AbstractRequest {
new ByteBufferAccessor(buffer), version), version);
}
@Override
public RenewDelegationTokenRequestData data() {
return data;
}

View File

@ -44,7 +44,7 @@ public class RenewDelegationTokenResponse extends AbstractResponse {
}
@Override
protected RenewDelegationTokenResponseData data() {
public RenewDelegationTokenResponseData data() {
return data;
}

View File

@ -60,6 +60,7 @@ public class SaslAuthenticateRequest extends AbstractRequest {
this.data = data;
}
@Override
public SaslAuthenticateRequestData data() {
return data;
}

View File

@ -68,7 +68,7 @@ public class SaslAuthenticateResponse extends AbstractResponse {
}
@Override
protected SaslAuthenticateResponseData data() {
public SaslAuthenticateResponseData data() {
return data;
}

View File

@ -61,6 +61,7 @@ public class SaslHandshakeRequest extends AbstractRequest {
this.data = data;
}
@Override
public SaslHandshakeRequestData data() {
return data;
}

View File

@ -58,7 +58,7 @@ public class SaslHandshakeResponse extends AbstractResponse {
}
@Override
protected SaslHandshakeResponseData data() {
public SaslHandshakeResponseData data() {
return data;
}

View File

@ -211,7 +211,7 @@ public class StopReplicaRequest extends AbstractControlRequest {
}
@Override
protected StopReplicaRequestData data() {
public StopReplicaRequestData data() {
return data;
}
}

View File

@ -71,7 +71,7 @@ public class StopReplicaResponse extends AbstractResponse {
}
@Override
protected StopReplicaResponseData data() {
public StopReplicaResponseData data() {
return data;
}

View File

@ -53,7 +53,7 @@ public class SyncGroupRequest extends AbstractRequest {
}
}
public final SyncGroupRequestData data;
private final SyncGroupRequestData data;
public SyncGroupRequest(SyncGroupRequestData data, short version) {
super(ApiKeys.SYNC_GROUP, version);
@ -92,7 +92,7 @@ public class SyncGroupRequest extends AbstractRequest {
}
@Override
protected SyncGroupRequestData data() {
public SyncGroupRequestData data() {
return data;
}
}

View File

@ -26,7 +26,7 @@ import java.util.Map;
public class SyncGroupResponse extends AbstractResponse {
public final SyncGroupResponseData data;
private final SyncGroupResponseData data;
public SyncGroupResponse(SyncGroupResponseData data) {
super(ApiKeys.SYNC_GROUP);
@ -48,7 +48,7 @@ public class SyncGroupResponse extends AbstractResponse {
}
@Override
protected SyncGroupResponseData data() {
public SyncGroupResponseData data() {
return data;
}

View File

@ -44,7 +44,7 @@ public class TxnOffsetCommitRequest extends AbstractRequest {
private static final Logger log = LoggerFactory.getLogger(TxnOffsetCommitRequest.class);
public final TxnOffsetCommitRequestData data;
private final TxnOffsetCommitRequestData data;
public static class Builder extends AbstractRequest.Builder<TxnOffsetCommitRequest> {
@ -168,7 +168,7 @@ public class TxnOffsetCommitRequest extends AbstractRequest {
}
@Override
protected TxnOffsetCommitRequestData data() {
public TxnOffsetCommitRequestData data() {
return data;
}

View File

@ -47,7 +47,7 @@ import java.util.Map;
*/
public class TxnOffsetCommitResponse extends AbstractResponse {
public final TxnOffsetCommitResponseData data;
private final TxnOffsetCommitResponseData data;
public TxnOffsetCommitResponse(TxnOffsetCommitResponseData data) {
super(ApiKeys.TXN_OFFSET_COMMIT);
@ -78,7 +78,7 @@ public class TxnOffsetCommitResponse extends AbstractResponse {
}
@Override
protected TxnOffsetCommitResponseData data() {
public TxnOffsetCommitResponseData data() {
return data;
}

Some files were not shown because too many files have changed in this diff Show More