Merge remote-tracking branch 'apache-github/trunk' into minor-alter-isr-scheduling

Conflicts:
	core/src/main/scala/kafka/server/AlterIsrManager.scala
This commit is contained in:
David Arthur 2021-01-05 15:02:33 -05:00
commit 11c4eea501
465 changed files with 14156 additions and 5182 deletions

View File

@ -199,6 +199,11 @@ You can run spotbugs using:
The spotbugs warnings will be found in `reports/spotbugs/main.html` and `reports/spotbugs/test.html` files in the subproject build
directories. Use -PxmlSpotBugsReport=true to generate an XML report instead of an HTML one.
### JMH microbenchmarks ###
We use [JMH](https://openjdk.java.net/projects/code-tools/jmh/) to write microbenchmarks that produce reliable results in the JVM.
See [jmh-benchmarks/README.md](https://github.com/apache/kafka/blob/trunk/jmh-benchmarks/README.md) for details on how to run the microbenchmarks.
### Common build options ###
The following options should be set with a `-P` switch, for example `./gradlew -PmaxParallelForks=1 test`.

View File

@ -1427,6 +1427,7 @@ project(':streams') {
':streams:upgrade-system-tests-24:test',
':streams:upgrade-system-tests-25:test',
':streams:upgrade-system-tests-26:test',
':streams:upgrade-system-tests-27:test',
':streams:examples:test'
]
)
@ -1702,6 +1703,18 @@ project(':streams:upgrade-system-tests-26') {
}
}
project(':streams:upgrade-system-tests-27') {
archivesBaseName = "kafka-streams-upgrade-system-tests-27"
dependencies {
testCompile libs.kafkaStreams_27
}
systemTestLibs {
dependsOn testJar
}
}
project(':jmh-benchmarks') {
apply plugin: 'com.github.johnrengelman.shadow'
@ -1713,7 +1726,10 @@ project(':jmh-benchmarks') {
}
dependencies {
compile project(':core')
compile(project(':core')) {
// jmh requires jopt 4.x while `core` depends on 5.0, they are not binary compatible
exclude group: 'net.sf.jopt-simple', module: 'jopt-simple'
}
compile project(':clients')
compile project(':streams')
compile project(':core')
@ -1726,6 +1742,11 @@ project(':jmh-benchmarks') {
compile libs.slf4jlog4j
}
tasks.withType(JavaCompile) {
// Suppress warning caused by code generated by jmh: `warning: [cast] redundant cast to long`
options.compilerArgs << "-Xlint:-cast"
}
jar {
manifest {
attributes "Main-Class": "org.openjdk.jmh.Main"

View File

@ -28,7 +28,7 @@
<suppress id="dontUseSystemExit"
files="Exit.java"/>
<suppress checks="ClassFanOutComplexity"
files="(Fetcher|Sender|SenderTest|ConsumerCoordinator|KafkaConsumer|KafkaProducer|Utils|TransactionManager|TransactionManagerTest|KafkaAdminClient|NetworkClient|Admin|KafkaRaftClient|KafkaRaftClientTest).java"/>
files="(Fetcher|Sender|SenderTest|ConsumerCoordinator|KafkaConsumer|KafkaProducer|Utils|TransactionManager|TransactionManagerTest|KafkaAdminClient|NetworkClient|Admin|KafkaRaftClient|KafkaRaftClientTest|RaftClientTestContext).java"/>
<suppress checks="ClassFanOutComplexity"
files="(SaslServerAuthenticator|SaslAuthenticatorTest).java"/>
<suppress checks="ClassFanOutComplexity"
@ -67,13 +67,13 @@
files="(Utils|Topic|KafkaLZ4BlockOutputStream|AclData|JoinGroupRequest).java"/>
<suppress checks="CyclomaticComplexity"
files="(ConsumerCoordinator|Fetcher|Sender|KafkaProducer|BufferPool|ConfigDef|RecordAccumulator|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslFactory|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler|SaslServerAuthenticator|AbstractCoordinator|TransactionManager|AbstractStickyAssignor|DefaultSslEngineFactory).java"/>
files="(ConsumerCoordinator|Fetcher|Sender|KafkaProducer|BufferPool|ConfigDef|RecordAccumulator|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslFactory|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler|SaslServerAuthenticator|AbstractCoordinator|TransactionManager|AbstractStickyAssignor|DefaultSslEngineFactory|Authorizer).java"/>
<suppress checks="JavaNCSS"
files="(AbstractRequest|AbstractResponse|KerberosLogin|WorkerSinkTaskTest|TransactionManagerTest|SenderTest|KafkaAdminClient|ConsumerCoordinatorTest|KafkaAdminClientTest).java"/>
<suppress checks="NPathComplexity"
files="(ConsumerCoordinator|BufferPool|Fetcher|MetricName|Node|ConfigDef|RecordBatch|SslFactory|SslTransportLayer|MetadataResponse|KerberosLogin|Selector|Sender|Serdes|TokenInformation|Agent|Values|PluginUtils|MiniTrogdorCluster|TasksRequest|KafkaProducer|AbstractStickyAssignor|KafkaRaftClient).java"/>
files="(ConsumerCoordinator|BufferPool|Fetcher|MetricName|Node|ConfigDef|RecordBatch|SslFactory|SslTransportLayer|MetadataResponse|KerberosLogin|Selector|Sender|Serdes|TokenInformation|Agent|Values|PluginUtils|MiniTrogdorCluster|TasksRequest|KafkaProducer|AbstractStickyAssignor|KafkaRaftClient|Authorizer).java"/>
<suppress checks="(JavaNCSS|CyclomaticComplexity|MethodLength)"
files="CoordinatorClient.java"/>
@ -100,7 +100,7 @@
files="RequestResponseTest.java|FetcherTest.java|KafkaAdminClientTest.java"/>
<suppress checks="NPathComplexity"
files="MemoryRecordsTest|MetricsTest|TestSslUtils"/>
files="MemoryRecordsTest|MetricsTest|RequestResponseTest|TestSslUtils|AclAuthorizerBenchmark"/>
<suppress checks="(WhitespaceAround|LocalVariableName|ImportControl|AvoidStarImport)"
files="Murmur3Test.java"/>
@ -240,6 +240,8 @@
files="SignalLogger.java"/>
<suppress checks="ParameterNumber"
files="ProduceBenchSpec.java"/>
<suppress checks="ParameterNumber"
files="ConsumeBenchSpec.java"/>
<suppress checks="ParameterNumber"
files="SustainedConnectionSpec.java"/>
<suppress id="dontUseSystemExit"

View File

@ -26,6 +26,7 @@ import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionsResponseKey;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.network.ChannelState;
import org.apache.kafka.common.network.NetworkSend;
import org.apache.kafka.common.network.NetworkReceive;
import org.apache.kafka.common.network.Selectable;
import org.apache.kafka.common.network.Send;
@ -519,7 +520,7 @@ public class NetworkClient implements KafkaClient {
log.debug("Sending {} request with header {} and timeout {} to node {}: {}",
clientRequest.apiKey(), header, clientRequest.requestTimeoutMs(), destination, request);
}
Send send = request.toSend(destination, header);
Send send = request.toSend(header);
InFlightRequest inFlightRequest = new InFlightRequest(
clientRequest,
header,
@ -528,7 +529,7 @@ public class NetworkClient implements KafkaClient {
send,
now);
this.inFlightRequests.add(inFlightRequest);
selector.send(send);
selector.send(new NetworkSend(clientRequest.destination(), send));
}
/**
@ -831,10 +832,10 @@ public class NetworkClient implements KafkaClient {
*/
private void handleCompletedSends(List<ClientResponse> responses, long now) {
// if no response is expected then when the send is completed, return it
for (Send send : this.selector.completedSends()) {
InFlightRequest request = this.inFlightRequests.lastSent(send.destination());
for (NetworkSend send : this.selector.completedSends()) {
InFlightRequest request = this.inFlightRequests.lastSent(send.destinationId());
if (!request.expectResponse) {
this.inFlightRequests.completeLastSent(send.destination());
this.inFlightRequests.completeLastSent(send.destinationId());
responses.add(request.completed(null, now));
}
}
@ -892,10 +893,10 @@ public class NetworkClient implements KafkaClient {
private void handleApiVersionsResponse(List<ClientResponse> responses,
InFlightRequest req, long now, ApiVersionsResponse apiVersionsResponse) {
final String node = req.destination;
if (apiVersionsResponse.data.errorCode() != Errors.NONE.code()) {
if (req.request.version() == 0 || apiVersionsResponse.data.errorCode() != Errors.UNSUPPORTED_VERSION.code()) {
if (apiVersionsResponse.data().errorCode() != Errors.NONE.code()) {
if (req.request.version() == 0 || apiVersionsResponse.data().errorCode() != Errors.UNSUPPORTED_VERSION.code()) {
log.warn("Received error {} from node {} when making an ApiVersionsRequest with correlation id {}. Disconnecting.",
Errors.forCode(apiVersionsResponse.data.errorCode()), node, req.header.correlationId());
Errors.forCode(apiVersionsResponse.data().errorCode()), node, req.header.correlationId());
this.selector.close(node);
processDisconnection(responses, node, now, ChannelState.LOCAL_CLOSE);
} else {
@ -903,8 +904,8 @@ public class NetworkClient implements KafkaClient {
// the ApiVersionsRequest when an UNSUPPORTED_VERSION error is returned.
// If not provided, the client falls back to version 0.
short maxApiVersion = 0;
if (apiVersionsResponse.data.apiKeys().size() > 0) {
ApiVersionsResponseKey apiVersion = apiVersionsResponse.data.apiKeys().find(ApiKeys.API_VERSIONS.id);
if (apiVersionsResponse.data().apiKeys().size() > 0) {
ApiVersionsResponseKey apiVersion = apiVersionsResponse.data().apiKeys().find(ApiKeys.API_VERSIONS.id);
if (apiVersion != null) {
maxApiVersion = apiVersion.maxVersion();
}
@ -913,10 +914,12 @@ public class NetworkClient implements KafkaClient {
}
return;
}
NodeApiVersions nodeVersionInfo = new NodeApiVersions(apiVersionsResponse.data.apiKeys());
NodeApiVersions nodeVersionInfo = new NodeApiVersions(apiVersionsResponse.data().apiKeys());
apiVersions.update(node, nodeVersionInfo);
this.connectionStates.ready(node);
log.debug("Recorded API versions for node {}: {}", node, nodeVersionInfo);
log.debug("Node {} has finalized features epoch: {}, finalized features: {}, supported features: {}, API versions: {}.",
node, apiVersionsResponse.data().finalizedFeaturesEpoch(), apiVersionsResponse.data().finalizedFeatures(),
apiVersionsResponse.data().supportedFeatures(), nodeVersionInfo);
}
/**

View File

@ -126,10 +126,10 @@ import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity;
import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse;
import org.apache.kafka.common.message.ListGroupsRequestData;
import org.apache.kafka.common.message.ListGroupsResponseData;
import org.apache.kafka.common.message.ListOffsetRequestData.ListOffsetPartition;
import org.apache.kafka.common.message.ListOffsetRequestData.ListOffsetTopic;
import org.apache.kafka.common.message.ListOffsetResponseData.ListOffsetPartitionResponse;
import org.apache.kafka.common.message.ListOffsetResponseData.ListOffsetTopicResponse;
import org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsPartition;
import org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsTopic;
import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsPartitionResponse;
import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsTopicResponse;
import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData;
import org.apache.kafka.common.message.MetadataRequestData;
import org.apache.kafka.common.message.OffsetCommitRequestData;
@ -215,8 +215,8 @@ import org.apache.kafka.common.requests.LeaveGroupRequest;
import org.apache.kafka.common.requests.LeaveGroupResponse;
import org.apache.kafka.common.requests.ListGroupsRequest;
import org.apache.kafka.common.requests.ListGroupsResponse;
import org.apache.kafka.common.requests.ListOffsetRequest;
import org.apache.kafka.common.requests.ListOffsetResponse;
import org.apache.kafka.common.requests.ListOffsetsRequest;
import org.apache.kafka.common.requests.ListOffsetsResponse;
import org.apache.kafka.common.requests.ListPartitionReassignmentsRequest;
import org.apache.kafka.common.requests.ListPartitionReassignmentsResponse;
import org.apache.kafka.common.requests.MetadataRequest;
@ -1788,7 +1788,7 @@ public class KafkaAdminClient extends AdminClient {
}
partitions.sort(Comparator.comparingInt(TopicPartitionInfo::partition));
TopicDescription topicDescription = new TopicDescription(topicName, isInternal, partitions,
validAclOperations(response.topicAuthorizedOperations(topicName).get()));
validAclOperations(response.topicAuthorizedOperations(topicName).get()), cluster.topicId(topicName));
future.complete(topicDescription);
}
}
@ -3451,12 +3451,12 @@ public class KafkaAdminClient extends AdminClient {
}
// If the error is an error at the group level, the future is failed with it
final Errors groupError = Errors.forCode(response.data.errorCode());
final Errors groupError = Errors.forCode(response.data().errorCode());
if (handleGroupRequestError(groupError, context.future()))
return;
final Map<TopicPartition, Errors> partitions = new HashMap<>();
response.data.topics().forEach(topic -> topic.partitions().forEach(partition -> partitions.put(
response.data().topics().forEach(topic -> topic.partitions().forEach(partition -> partitions.put(
new TopicPartition(topic.name(), partition.partitionIndex()),
Errors.forCode(partition.errorCode())))
);
@ -3975,7 +3975,7 @@ public class KafkaAdminClient extends AdminClient {
MetadataResponse mr = context.response().orElseThrow(() -> new IllegalStateException("No Metadata response"));
List<Call> calls = new ArrayList<>();
// grouping topic partitions per leader
Map<Node, Map<String, ListOffsetTopic>> leaders = new HashMap<>();
Map<Node, Map<String, ListOffsetsTopic>> leaders = new HashMap<>();
for (Map.Entry<TopicPartition, OffsetSpec> entry: topicPartitionOffsets.entrySet()) {
@ -3985,15 +3985,15 @@ public class KafkaAdminClient extends AdminClient {
long offsetQuery = (offsetSpec instanceof TimestampSpec)
? ((TimestampSpec) offsetSpec).timestamp()
: (offsetSpec instanceof OffsetSpec.EarliestSpec)
? ListOffsetRequest.EARLIEST_TIMESTAMP
: ListOffsetRequest.LATEST_TIMESTAMP;
? ListOffsetsRequest.EARLIEST_TIMESTAMP
: ListOffsetsRequest.LATEST_TIMESTAMP;
// avoid sending listOffsets request for topics with errors
if (!mr.errors().containsKey(tp.topic())) {
Node node = mr.cluster().leaderFor(tp);
if (node != null) {
Map<String, ListOffsetTopic> leadersOnNode = leaders.computeIfAbsent(node, k -> new HashMap<String, ListOffsetTopic>());
ListOffsetTopic topic = leadersOnNode.computeIfAbsent(tp.topic(), k -> new ListOffsetTopic().setName(tp.topic()));
topic.partitions().add(new ListOffsetPartition().setPartitionIndex(tp.partition()).setTimestamp(offsetQuery));
Map<String, ListOffsetsTopic> leadersOnNode = leaders.computeIfAbsent(node, k -> new HashMap<>());
ListOffsetsTopic topic = leadersOnNode.computeIfAbsent(tp.topic(), k -> new ListOffsetsTopic().setName(tp.topic()));
topic.partitions().add(new ListOffsetsPartition().setPartitionIndex(tp.partition()).setTimestamp(offsetQuery));
} else {
future.completeExceptionally(Errors.LEADER_NOT_AVAILABLE.exception());
}
@ -4002,27 +4002,27 @@ public class KafkaAdminClient extends AdminClient {
}
}
for (final Map.Entry<Node, Map<String, ListOffsetTopic>> entry : leaders.entrySet()) {
for (final Map.Entry<Node, Map<String, ListOffsetsTopic>> entry : leaders.entrySet()) {
final int brokerId = entry.getKey().id();
calls.add(new Call("listOffsets on broker " + brokerId, context.deadline(), new ConstantNodeIdProvider(brokerId)) {
final List<ListOffsetTopic> partitionsToQuery = new ArrayList<>(entry.getValue().values());
final List<ListOffsetsTopic> partitionsToQuery = new ArrayList<>(entry.getValue().values());
@Override
ListOffsetRequest.Builder createRequest(int timeoutMs) {
return ListOffsetRequest.Builder
ListOffsetsRequest.Builder createRequest(int timeoutMs) {
return ListOffsetsRequest.Builder
.forConsumer(true, context.options().isolationLevel())
.setTargetTimes(partitionsToQuery);
}
@Override
void handleResponse(AbstractResponse abstractResponse) {
ListOffsetResponse response = (ListOffsetResponse) abstractResponse;
ListOffsetsResponse response = (ListOffsetsResponse) abstractResponse;
Map<TopicPartition, OffsetSpec> retryTopicPartitionOffsets = new HashMap<>();
for (ListOffsetTopicResponse topic : response.topics()) {
for (ListOffsetPartitionResponse partition : topic.partitions()) {
for (ListOffsetsTopicResponse topic : response.topics()) {
for (ListOffsetsPartitionResponse partition : topic.partitions()) {
TopicPartition tp = new TopicPartition(topic.name(), partition.partitionIndex());
KafkaFutureImpl<ListOffsetsResultInfo> future = futures.get(tp);
Errors error = Errors.forCode(partition.errorCode());
@ -4032,7 +4032,7 @@ public class KafkaAdminClient extends AdminClient {
} else if (MetadataOperationContext.shouldRefreshMetadata(error)) {
retryTopicPartitionOffsets.put(tp, offsetRequestSpec);
} else if (error == Errors.NONE) {
Optional<Integer> leaderEpoch = (partition.leaderEpoch() == ListOffsetResponse.UNKNOWN_EPOCH)
Optional<Integer> leaderEpoch = (partition.leaderEpoch() == ListOffsetsResponse.UNKNOWN_EPOCH)
? Optional.empty()
: Optional.of(partition.leaderEpoch());
future.complete(new ListOffsetsResultInfo(partition.offset(), partition.timestamp(), leaderEpoch));
@ -4044,8 +4044,8 @@ public class KafkaAdminClient extends AdminClient {
if (retryTopicPartitionOffsets.isEmpty()) {
// The server should send back a response for every topic partition. But do a sanity check anyway.
for (ListOffsetTopic topic : partitionsToQuery) {
for (ListOffsetPartition partition : topic.partitions()) {
for (ListOffsetsTopic topic : partitionsToQuery) {
for (ListOffsetsPartition partition : topic.partitions()) {
TopicPartition tp = new TopicPartition(topic.name(), partition.partitionIndex());
ApiException error = new ApiException("The response from broker " + brokerId +
" did not contain a result for topic partition " + tp);
@ -4063,8 +4063,8 @@ public class KafkaAdminClient extends AdminClient {
@Override
void handleFailure(Throwable throwable) {
for (ListOffsetTopic topic : entry.getValue().values()) {
for (ListOffsetPartition partition : topic.partitions()) {
for (ListOffsetsTopic topic : entry.getValue().values()) {
for (ListOffsetsPartition partition : topic.partitions()) {
TopicPartition tp = new TopicPartition(topic.name(), partition.partitionIndex());
KafkaFutureImpl<ListOffsetsResultInfo> future = futures.get(tp);
future.completeExceptionally(throwable);
@ -4359,10 +4359,10 @@ public class KafkaAdminClient extends AdminClient {
@Override
void handleResponse(AbstractResponse response) {
final ApiVersionsResponse apiVersionsResponse = (ApiVersionsResponse) response;
if (apiVersionsResponse.data.errorCode() == Errors.NONE.code()) {
if (apiVersionsResponse.data().errorCode() == Errors.NONE.code()) {
future.complete(createFeatureMetadata(apiVersionsResponse));
} else {
future.completeExceptionally(Errors.forCode(apiVersionsResponse.data.errorCode()).exception());
future.completeExceptionally(Errors.forCode(apiVersionsResponse.data().errorCode()).exception());
}
}
@ -4421,8 +4421,8 @@ public class KafkaAdminClient extends AdminClient {
final UpdateFeaturesResponse response =
(UpdateFeaturesResponse) abstractResponse;
Errors topLevelError = Errors.forCode(response.data().errorCode());
switch (topLevelError) {
ApiError topLevelError = response.topLevelError();
switch (topLevelError.error()) {
case NONE:
for (final UpdatableFeatureResult result : response.data().results()) {
final KafkaFutureImpl<Void> future = updateFutures.get(result.feature());
@ -4442,12 +4442,11 @@ public class KafkaAdminClient extends AdminClient {
feature -> "The controller response did not contain a result for feature " + feature);
break;
case NOT_CONTROLLER:
handleNotControllerError(topLevelError);
handleNotControllerError(topLevelError.error());
break;
default:
for (final Map.Entry<String, KafkaFutureImpl<Void>> entry : updateFutures.entrySet()) {
final String errorMsg = response.data().errorMessage();
entry.getValue().completeExceptionally(topLevelError.exception(errorMsg));
entry.getValue().completeExceptionally(topLevelError.exception());
}
break;
}

View File

@ -18,6 +18,7 @@
package org.apache.kafka.clients.admin;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.utils.Utils;
@ -34,6 +35,7 @@ public class TopicDescription {
private final boolean internal;
private final List<TopicPartitionInfo> partitions;
private final Set<AclOperation> authorizedOperations;
private final Uuid topicId;
@Override
public boolean equals(final Object o) {
@ -74,10 +76,16 @@ public class TopicDescription {
*/
public TopicDescription(String name, boolean internal, List<TopicPartitionInfo> partitions,
Set<AclOperation> authorizedOperations) {
this(name, internal, partitions, authorizedOperations, Uuid.ZERO_UUID);
}
public TopicDescription(String name, boolean internal, List<TopicPartitionInfo> partitions,
Set<AclOperation> authorizedOperations, Uuid topicId) {
this.name = name;
this.internal = internal;
this.partitions = partitions;
this.authorizedOperations = authorizedOperations;
this.topicId = topicId;
}
/**
@ -95,6 +103,10 @@ public class TopicDescription {
return internal;
}
public Uuid topicId() {
return topicId;
}
/**
* A list of partitions where the index represents the partition id and the element contains leadership and replica
* information for that partition.

View File

@ -257,7 +257,7 @@ public abstract class AbstractCoordinator implements Closeable {
} else if (coordinator != null && client.isUnavailable(coordinator)) {
// we found the coordinator, but the connection has failed, so mark
// it dead and backoff before retrying discovery
markCoordinatorUnknown();
markCoordinatorUnknown("coordinator unavailable");
timer.sleep(rebalanceConfig.retryBackoffMs);
}
} while (coordinatorUnknown() && timer.notExpired());
@ -619,7 +619,7 @@ public abstract class AbstractCoordinator implements Closeable {
} else if (error == Errors.COORDINATOR_NOT_AVAILABLE
|| error == Errors.NOT_COORDINATOR) {
// re-discover the coordinator and retry with backoff
markCoordinatorUnknown();
markCoordinatorUnknown(error);
log.info("JoinGroup failed: {} Marking coordinator unknown. Sent generation was {}",
error.message(), sentGeneration);
future.raise(error);
@ -658,6 +658,10 @@ public abstract class AbstractCoordinator implements Closeable {
AbstractCoordinator.this.generation = new Generation(OffsetCommitRequest.DEFAULT_GENERATION_ID, memberId, null);
}
future.raise(error);
} else if (error == Errors.REBALANCE_IN_PROGRESS) {
log.info("JoinGroup failed due to non-fatal error: REBALANCE_IN_PROGRESS, " +
"which could indicate a replication timeout on the broker. Will retry.");
future.raise(error);
} else {
// unexpected error, throw the exception
log.error("JoinGroup failed due to unexpected error: {}", error.message());
@ -732,9 +736,9 @@ public abstract class AbstractCoordinator implements Closeable {
RequestFuture<ByteBuffer> future) {
Errors error = syncResponse.error();
if (error == Errors.NONE) {
if (isProtocolTypeInconsistent(syncResponse.data.protocolType())) {
if (isProtocolTypeInconsistent(syncResponse.data().protocolType())) {
log.error("SyncGroup failed due to inconsistent Protocol Type, received {} but expected {}",
syncResponse.data.protocolType(), protocolType());
syncResponse.data().protocolType(), protocolType());
future.raise(Errors.INCONSISTENT_GROUP_PROTOCOL);
} else {
log.debug("Received successful SyncGroup response: {}", syncResponse);
@ -743,7 +747,7 @@ public abstract class AbstractCoordinator implements Closeable {
synchronized (AbstractCoordinator.this) {
if (!generation.equals(Generation.NO_GENERATION) && state == MemberState.COMPLETING_REBALANCE) {
// check protocol name only if the generation is not reset
final String protocolName = syncResponse.data.protocolName();
final String protocolName = syncResponse.data().protocolName();
final boolean protocolNameInconsistent = protocolName != null &&
!protocolName.equals(generation.protocolName);
@ -761,7 +765,7 @@ public abstract class AbstractCoordinator implements Closeable {
sensors.successfulRebalanceSensor.record(lastRebalanceEndMs - lastRebalanceStartMs);
lastRebalanceStartMs = -1L;
future.complete(ByteBuffer.wrap(syncResponse.data.assignment()));
future.complete(ByteBuffer.wrap(syncResponse.data().assignment()));
}
} else {
log.info("Generation data was cleared by heartbeat thread to {} and state is now {} before " +
@ -799,7 +803,7 @@ public abstract class AbstractCoordinator implements Closeable {
|| error == Errors.NOT_COORDINATOR) {
log.info("SyncGroup failed: {} Marking coordinator unknown. Sent generation was {}",
error.message(), sentGeneration);
markCoordinatorUnknown();
markCoordinatorUnknown(error);
future.raise(error);
} else {
future.raise(new KafkaException("Unexpected error from SyncGroup: " + error.message()));
@ -880,7 +884,7 @@ public abstract class AbstractCoordinator implements Closeable {
*/
protected synchronized Node checkAndGetCoordinator() {
if (coordinator != null && client.isUnavailable(coordinator)) {
markCoordinatorUnknown(true);
markCoordinatorUnknown(true, "coordinator unavailable");
return null;
}
return this.coordinator;
@ -890,13 +894,20 @@ public abstract class AbstractCoordinator implements Closeable {
return this.coordinator;
}
protected synchronized void markCoordinatorUnknown() {
markCoordinatorUnknown(false);
protected synchronized void markCoordinatorUnknown(Errors error) {
markCoordinatorUnknown(false, "error response " + error.name());
}
protected synchronized void markCoordinatorUnknown(boolean isDisconnected) {
protected synchronized void markCoordinatorUnknown(String cause) {
markCoordinatorUnknown(false, cause);
}
protected synchronized void markCoordinatorUnknown(boolean isDisconnected, String cause) {
if (this.coordinator != null) {
log.info("Group coordinator {} is unavailable or invalid, will attempt rediscovery", this.coordinator);
log.info("Group coordinator {} is unavailable or invalid due to cause: {}."
+ "isDisconnected: {}. Rediscovery will be attempted.", this.coordinator,
cause, isDisconnected);
Node oldCoordinator = this.coordinator;
// Mark the coordinator dead before disconnecting requests since the callbacks for any pending
@ -1094,7 +1105,7 @@ public abstract class AbstractCoordinator implements Closeable {
|| error == Errors.NOT_COORDINATOR) {
log.info("Attempt to heartbeat failed since coordinator {} is either not started or not valid",
coordinator());
markCoordinatorUnknown();
markCoordinatorUnknown(error);
future.raise(error);
} else if (error == Errors.REBALANCE_IN_PROGRESS) {
// since we may be sending the request during rebalance, we should check
@ -1143,7 +1154,7 @@ public abstract class AbstractCoordinator implements Closeable {
public void onFailure(RuntimeException e, RequestFuture<T> future) {
// mark the coordinator as dead
if (e instanceof DisconnectException) {
markCoordinatorUnknown(true);
markCoordinatorUnknown(true, e.getMessage());
}
future.raise(e);
}
@ -1355,7 +1366,8 @@ public abstract class AbstractCoordinator implements Closeable {
} else if (heartbeat.sessionTimeoutExpired(now)) {
// the session timeout has expired without seeing a successful heartbeat, so we should
// probably make sure the coordinator is still healthy.
markCoordinatorUnknown();
markCoordinatorUnknown("session timed out without receiving a "
+ "heartbeat response");
} else if (heartbeat.pollTimeoutExpired(now)) {
// the poll timeout has expired, which means that the foreground thread has stalled
// in between calls to poll().

View File

@ -1208,7 +1208,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
} else if (error == Errors.COORDINATOR_NOT_AVAILABLE
|| error == Errors.NOT_COORDINATOR
|| error == Errors.REQUEST_TIMED_OUT) {
markCoordinatorUnknown();
markCoordinatorUnknown(error);
future.raise(error);
return;
} else if (error == Errors.FENCED_INSTANCE_ID) {
@ -1311,7 +1311,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
future.raise(error);
} else if (error == Errors.NOT_COORDINATOR) {
// re-discover the coordinator and retry
markCoordinatorUnknown();
markCoordinatorUnknown(error);
future.raise(error);
} else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
future.raise(GroupAuthorizationException.forGroupId(rebalanceConfig.groupId));

View File

@ -81,7 +81,7 @@ public class ConsumerProtocol {
.setPartitions(topicEntry.getValue()));
}
return MessageUtil.toByteBuffer(version, data);
return MessageUtil.toVersionPrefixedByteBuffer(version, data);
}
public static Subscription deserializeSubscription(final ByteBuffer buffer, short version) {
@ -127,7 +127,7 @@ public class ConsumerProtocol {
.setPartitions(topicEntry.getValue()));
}
return MessageUtil.toByteBuffer(version, data);
return MessageUtil.toVersionPrefixedByteBuffer(version, data);
}
public static Assignment deserializeAssignment(final ByteBuffer buffer, short version) {

View File

@ -48,9 +48,9 @@ import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.message.ListOffsetRequestData.ListOffsetPartition;
import org.apache.kafka.common.message.ListOffsetResponseData.ListOffsetPartitionResponse;
import org.apache.kafka.common.message.ListOffsetResponseData.ListOffsetTopicResponse;
import org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsPartition;
import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsPartitionResponse;
import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsTopicResponse;
import org.apache.kafka.common.metrics.Gauge;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
@ -70,8 +70,8 @@ import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.requests.ListOffsetRequest;
import org.apache.kafka.common.requests.ListOffsetResponse;
import org.apache.kafka.common.requests.ListOffsetsRequest;
import org.apache.kafka.common.requests.ListOffsetsResponse;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.OffsetsForLeaderEpochRequest;
@ -440,17 +440,17 @@ public class Fetcher<K, V> implements Closeable {
private Long offsetResetStrategyTimestamp(final TopicPartition partition) {
OffsetResetStrategy strategy = subscriptions.resetStrategy(partition);
if (strategy == OffsetResetStrategy.EARLIEST)
return ListOffsetRequest.EARLIEST_TIMESTAMP;
return ListOffsetsRequest.EARLIEST_TIMESTAMP;
else if (strategy == OffsetResetStrategy.LATEST)
return ListOffsetRequest.LATEST_TIMESTAMP;
return ListOffsetsRequest.LATEST_TIMESTAMP;
else
return null;
}
private OffsetResetStrategy timestampToOffsetResetStrategy(long timestamp) {
if (timestamp == ListOffsetRequest.EARLIEST_TIMESTAMP)
if (timestamp == ListOffsetsRequest.EARLIEST_TIMESTAMP)
return OffsetResetStrategy.EARLIEST;
else if (timestamp == ListOffsetRequest.LATEST_TIMESTAMP)
else if (timestamp == ListOffsetsRequest.LATEST_TIMESTAMP)
return OffsetResetStrategy.LATEST;
else
return null;
@ -563,11 +563,11 @@ public class Fetcher<K, V> implements Closeable {
}
public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions, Timer timer) {
return beginningOrEndOffset(partitions, ListOffsetRequest.EARLIEST_TIMESTAMP, timer);
return beginningOrEndOffset(partitions, ListOffsetsRequest.EARLIEST_TIMESTAMP, timer);
}
public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions, Timer timer) {
return beginningOrEndOffset(partitions, ListOffsetRequest.LATEST_TIMESTAMP, timer);
return beginningOrEndOffset(partitions, ListOffsetsRequest.LATEST_TIMESTAMP, timer);
}
private Map<TopicPartition, Long> beginningOrEndOffset(Collection<TopicPartition> partitions,
@ -733,11 +733,11 @@ public class Fetcher<K, V> implements Closeable {
}
private void resetOffsetsAsync(Map<TopicPartition, Long> partitionResetTimestamps) {
Map<Node, Map<TopicPartition, ListOffsetPartition>> timestampsToSearchByNode =
Map<Node, Map<TopicPartition, ListOffsetsPartition>> timestampsToSearchByNode =
groupListOffsetRequests(partitionResetTimestamps, new HashSet<>());
for (Map.Entry<Node, Map<TopicPartition, ListOffsetPartition>> entry : timestampsToSearchByNode.entrySet()) {
for (Map.Entry<Node, Map<TopicPartition, ListOffsetsPartition>> entry : timestampsToSearchByNode.entrySet()) {
Node node = entry.getKey();
final Map<TopicPartition, ListOffsetPartition> resetTimestamps = entry.getValue();
final Map<TopicPartition, ListOffsetsPartition> resetTimestamps = entry.getValue();
subscriptions.setNextAllowedRetry(resetTimestamps.keySet(), time.milliseconds() + requestTimeoutMs);
RequestFuture<ListOffsetResult> future = sendListOffsetRequest(node, resetTimestamps, false);
@ -752,7 +752,7 @@ public class Fetcher<K, V> implements Closeable {
for (Map.Entry<TopicPartition, ListOffsetData> fetchedOffset : result.fetchedOffsets.entrySet()) {
TopicPartition partition = fetchedOffset.getKey();
ListOffsetData offsetData = fetchedOffset.getValue();
ListOffsetPartition requestedReset = resetTimestamps.get(partition);
ListOffsetsPartition requestedReset = resetTimestamps.get(partition);
resetOffsetIfNeeded(partition, timestampToOffsetResetStrategy(requestedReset.timestamp()), offsetData);
}
}
@ -884,7 +884,7 @@ public class Fetcher<K, V> implements Closeable {
private RequestFuture<ListOffsetResult> sendListOffsetsRequests(final Map<TopicPartition, Long> timestampsToSearch,
final boolean requireTimestamps) {
final Set<TopicPartition> partitionsToRetry = new HashSet<>();
Map<Node, Map<TopicPartition, ListOffsetPartition>> timestampsToSearchByNode =
Map<Node, Map<TopicPartition, ListOffsetsPartition>> timestampsToSearchByNode =
groupListOffsetRequests(timestampsToSearch, partitionsToRetry);
if (timestampsToSearchByNode.isEmpty())
return RequestFuture.failure(new StaleMetadataException());
@ -893,7 +893,7 @@ public class Fetcher<K, V> implements Closeable {
final Map<TopicPartition, ListOffsetData> fetchedTimestampOffsets = new HashMap<>();
final AtomicInteger remainingResponses = new AtomicInteger(timestampsToSearchByNode.size());
for (Map.Entry<Node, Map<TopicPartition, ListOffsetPartition>> entry : timestampsToSearchByNode.entrySet()) {
for (Map.Entry<Node, Map<TopicPartition, ListOffsetsPartition>> entry : timestampsToSearchByNode.entrySet()) {
RequestFuture<ListOffsetResult> future =
sendListOffsetRequest(entry.getKey(), entry.getValue(), requireTimestamps);
future.addListener(new RequestFutureListener<ListOffsetResult>() {
@ -930,10 +930,10 @@ public class Fetcher<K, V> implements Closeable {
* @param partitionsToRetry A set of topic partitions that will be extended with partitions
* that need metadata update or re-connect to the leader.
*/
private Map<Node, Map<TopicPartition, ListOffsetPartition>> groupListOffsetRequests(
private Map<Node, Map<TopicPartition, ListOffsetsPartition>> groupListOffsetRequests(
Map<TopicPartition, Long> timestampsToSearch,
Set<TopicPartition> partitionsToRetry) {
final Map<TopicPartition, ListOffsetPartition> partitionDataMap = new HashMap<>();
final Map<TopicPartition, ListOffsetsPartition> partitionDataMap = new HashMap<>();
for (Map.Entry<TopicPartition, Long> entry: timestampsToSearch.entrySet()) {
TopicPartition tp = entry.getKey();
Long offset = entry.getValue();
@ -955,8 +955,8 @@ public class Fetcher<K, V> implements Closeable {
leader, tp);
partitionsToRetry.add(tp);
} else {
int currentLeaderEpoch = leaderAndEpoch.epoch.orElse(ListOffsetResponse.UNKNOWN_EPOCH);
partitionDataMap.put(tp, new ListOffsetPartition()
int currentLeaderEpoch = leaderAndEpoch.epoch.orElse(ListOffsetsResponse.UNKNOWN_EPOCH);
partitionDataMap.put(tp, new ListOffsetsPartition()
.setPartitionIndex(tp.partition())
.setTimestamp(offset)
.setCurrentLeaderEpoch(currentLeaderEpoch));
@ -975,18 +975,18 @@ public class Fetcher<K, V> implements Closeable {
* @return A response which can be polled to obtain the corresponding timestamps and offsets.
*/
private RequestFuture<ListOffsetResult> sendListOffsetRequest(final Node node,
final Map<TopicPartition, ListOffsetPartition> timestampsToSearch,
final Map<TopicPartition, ListOffsetsPartition> timestampsToSearch,
boolean requireTimestamp) {
ListOffsetRequest.Builder builder = ListOffsetRequest.Builder
ListOffsetsRequest.Builder builder = ListOffsetsRequest.Builder
.forConsumer(requireTimestamp, isolationLevel)
.setTargetTimes(ListOffsetRequest.toListOffsetTopics(timestampsToSearch));
.setTargetTimes(ListOffsetsRequest.toListOffsetsTopics(timestampsToSearch));
log.debug("Sending ListOffsetRequest {} to broker {}", builder, node);
return client.send(node, builder)
.compose(new RequestFutureAdapter<ClientResponse, ListOffsetResult>() {
@Override
public void onSuccess(ClientResponse response, RequestFuture<ListOffsetResult> future) {
ListOffsetResponse lor = (ListOffsetResponse) response.responseBody();
ListOffsetsResponse lor = (ListOffsetsResponse) response.responseBody();
log.trace("Received ListOffsetResponse {} from broker {}", lor, node);
handleListOffsetResponse(lor, future);
}
@ -995,7 +995,7 @@ public class Fetcher<K, V> implements Closeable {
/**
* Callback for the response of the list offset call above.
* @param listOffsetResponse The response from the server.
* @param listOffsetsResponse The response from the server.
* @param future The future to be completed when the response returns. Note that any partition-level errors will
* generally fail the entire future result. The one exception is UNSUPPORTED_FOR_MESSAGE_FORMAT,
* which indicates that the broker does not support the v1 message format. Partitions with this
@ -1003,14 +1003,14 @@ public class Fetcher<K, V> implements Closeable {
* value of each partition may be null only for v0. In v1 and later the ListOffset API would not
* return a null timestamp (-1 is returned instead when necessary).
*/
private void handleListOffsetResponse(ListOffsetResponse listOffsetResponse,
private void handleListOffsetResponse(ListOffsetsResponse listOffsetsResponse,
RequestFuture<ListOffsetResult> future) {
Map<TopicPartition, ListOffsetData> fetchedOffsets = new HashMap<>();
Set<TopicPartition> partitionsToRetry = new HashSet<>();
Set<String> unauthorizedTopics = new HashSet<>();
for (ListOffsetTopicResponse topic : listOffsetResponse.topics()) {
for (ListOffsetPartitionResponse partition : topic.partitions()) {
for (ListOffsetsTopicResponse topic : listOffsetsResponse.topics()) {
for (ListOffsetsPartitionResponse partition : topic.partitions()) {
TopicPartition topicPartition = new TopicPartition(topic.name(), partition.partitionIndex());
Errors error = Errors.forCode(partition.errorCode());
switch (error) {
@ -1027,7 +1027,7 @@ public class Fetcher<K, V> implements Closeable {
}
log.debug("Handling v0 ListOffsetResponse response for {}. Fetched offset {}",
topicPartition, offset);
if (offset != ListOffsetResponse.UNKNOWN_OFFSET) {
if (offset != ListOffsetsResponse.UNKNOWN_OFFSET) {
ListOffsetData offsetData = new ListOffsetData(offset, null, Optional.empty());
fetchedOffsets.put(topicPartition, offsetData);
}
@ -1035,8 +1035,8 @@ public class Fetcher<K, V> implements Closeable {
// Handle v1 and later response or v0 without offsets
log.debug("Handling ListOffsetResponse response for {}. Fetched offset {}, timestamp {}",
topicPartition, partition.offset(), partition.timestamp());
if (partition.offset() != ListOffsetResponse.UNKNOWN_OFFSET) {
Optional<Integer> leaderEpoch = (partition.leaderEpoch() == ListOffsetResponse.UNKNOWN_EPOCH)
if (partition.offset() != ListOffsetsResponse.UNKNOWN_OFFSET) {
Optional<Integer> leaderEpoch = (partition.leaderEpoch() == ListOffsetsResponse.UNKNOWN_EPOCH)
? Optional.empty()
: Optional.of(partition.leaderEpoch());
ListOffsetData offsetData = new ListOffsetData(partition.offset(), partition.timestamp(),

View File

@ -605,6 +605,8 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
* @throws IllegalStateException if no transactional.id has been configured or if {@link #initTransactions()}
* has not yet been invoked
* @throws ProducerFencedException if another producer with the same transactional.id is active
* @throws org.apache.kafka.common.errors.InvalidProducerEpochException if the producer has attempted to produce with an old epoch
* to the partition leader. See the exception for more details
* @throws org.apache.kafka.common.errors.UnsupportedVersionException fatal error indicating the broker
* does not support transactions (i.e. if its version is lower than 0.11.0.0)
* @throws org.apache.kafka.common.errors.AuthorizationException fatal error indicating that the configured
@ -743,6 +745,8 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
*
* @throws IllegalStateException if no transactional.id has been configured or no transaction has been started
* @throws ProducerFencedException fatal error indicating another producer with the same transactional.id is active
* @throws org.apache.kafka.common.errors.InvalidProducerEpochException if the producer has attempted to produce with an old epoch
* to the partition leader. See the exception for more details
* @throws org.apache.kafka.common.errors.UnsupportedVersionException fatal error indicating the broker
* does not support transactions (i.e. if its version is lower than 0.11.0.0)
* @throws org.apache.kafka.common.errors.AuthorizationException fatal error indicating that the configured

View File

@ -1348,8 +1348,8 @@ public class TransactionManager {
Errors error = initProducerIdResponse.error();
if (error == Errors.NONE) {
ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(initProducerIdResponse.data.producerId(),
initProducerIdResponse.data.producerEpoch());
ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(initProducerIdResponse.data().producerId(),
initProducerIdResponse.data().producerEpoch());
setProducerIdAndEpoch(producerIdAndEpoch);
transitionTo(State.READY);
lastError = null;
@ -1623,7 +1623,7 @@ public class TransactionManager {
@Override
public void handleResponse(AbstractResponse response) {
AddOffsetsToTxnResponse addOffsetsToTxnResponse = (AddOffsetsToTxnResponse) response;
Errors error = Errors.forCode(addOffsetsToTxnResponse.data.errorCode());
Errors error = Errors.forCode(addOffsetsToTxnResponse.data().errorCode());
if (error == Errors.NONE) {
log.debug("Successfully added partition for consumer group {} to transaction", builder.data.groupId());

View File

@ -46,6 +46,7 @@ public final class Cluster {
private final Map<Integer, List<PartitionInfo>> partitionsByNode;
private final Map<Integer, Node> nodesById;
private final ClusterResource clusterResource;
private final Map<String, Uuid> topicIds;
/**
* Create a new cluster with the given id, nodes and partitions
@ -57,7 +58,7 @@ public final class Cluster {
Collection<PartitionInfo> partitions,
Set<String> unauthorizedTopics,
Set<String> internalTopics) {
this(clusterId, false, nodes, partitions, unauthorizedTopics, Collections.emptySet(), internalTopics, null);
this(clusterId, false, nodes, partitions, unauthorizedTopics, Collections.emptySet(), internalTopics, null, Collections.emptyMap());
}
/**
@ -71,7 +72,7 @@ public final class Cluster {
Set<String> unauthorizedTopics,
Set<String> internalTopics,
Node controller) {
this(clusterId, false, nodes, partitions, unauthorizedTopics, Collections.emptySet(), internalTopics, controller);
this(clusterId, false, nodes, partitions, unauthorizedTopics, Collections.emptySet(), internalTopics, controller, Collections.emptyMap());
}
/**
@ -86,7 +87,23 @@ public final class Cluster {
Set<String> invalidTopics,
Set<String> internalTopics,
Node controller) {
this(clusterId, false, nodes, partitions, unauthorizedTopics, invalidTopics, internalTopics, controller);
this(clusterId, false, nodes, partitions, unauthorizedTopics, invalidTopics, internalTopics, controller, Collections.emptyMap());
}
/**
* Create a new cluster with the given id, nodes, partitions and topicIds
* @param nodes The nodes in the cluster
* @param partitions Information about a subset of the topic-partitions this cluster hosts
*/
public Cluster(String clusterId,
Collection<Node> nodes,
Collection<PartitionInfo> partitions,
Set<String> unauthorizedTopics,
Set<String> invalidTopics,
Set<String> internalTopics,
Node controller,
Map<String, Uuid> topicIds) {
this(clusterId, false, nodes, partitions, unauthorizedTopics, invalidTopics, internalTopics, controller, topicIds);
}
private Cluster(String clusterId,
@ -96,7 +113,8 @@ public final class Cluster {
Set<String> unauthorizedTopics,
Set<String> invalidTopics,
Set<String> internalTopics,
Node controller) {
Node controller,
Map<String, Uuid> topicIds) {
this.isBootstrapConfigured = isBootstrapConfigured;
this.clusterResource = new ClusterResource(clusterId);
// make a randomized, unmodifiable copy of the nodes
@ -165,6 +183,7 @@ public final class Cluster {
this.partitionsByTopic = Collections.unmodifiableMap(tmpPartitionsByTopic);
this.availablePartitionsByTopic = Collections.unmodifiableMap(tmpAvailablePartitionsByTopic);
this.partitionsByNode = Collections.unmodifiableMap(tmpPartitionsByNode);
this.topicIds = Collections.unmodifiableMap(topicIds);
this.unauthorizedTopics = Collections.unmodifiableSet(unauthorizedTopics);
this.invalidTopics = Collections.unmodifiableSet(invalidTopics);
@ -191,7 +210,7 @@ public final class Cluster {
for (InetSocketAddress address : addresses)
nodes.add(new Node(nodeId--, address.getHostString(), address.getPort()));
return new Cluster(null, true, nodes, new ArrayList<>(0),
Collections.emptySet(), Collections.emptySet(), Collections.emptySet(), null);
Collections.emptySet(), Collections.emptySet(), Collections.emptySet(), null, Collections.emptyMap());
}
/**
@ -327,6 +346,14 @@ public final class Cluster {
return controller;
}
public Collection<Uuid> topicIds() {
return topicIds.values();
}
public Uuid topicId(String topic) {
return topicIds.getOrDefault(topic, Uuid.ZERO_UUID);
}
@Override
public String toString() {
return "Cluster(id = " + clusterResource.clusterId() + ", nodes = " + this.nodes +

View File

@ -14,18 +14,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.errors;
package org.apache.kafka.common.protocol;
public class PositionOutOfRangeException extends ApiException {
import java.nio.ByteBuffer;
private static final long serialVersionUID = 1;
public final class MessageTestUtil {
public static ByteBuffer messageToByteBuffer(Message message, short version) {
ObjectSerializationCache cache = new ObjectSerializationCache();
int size = message.size(cache, version);
ByteBuffer bytes = ByteBuffer.allocate(size);
message.write(new ByteBufferAccessor(bytes), cache, version);
bytes.rewind();
return bytes;
public PositionOutOfRangeException(String s) {
super(s);
}
public PositionOutOfRangeException(String message, Throwable cause) {
super(message, cause);
}
}

View File

@ -14,20 +14,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.network;
package org.apache.kafka.common.errors;
import java.nio.channels.GatheringByteChannel;
public class SnapshotNotFoundException extends ApiException {
public final class TransportLayers {
private static final long serialVersionUID = 1;
private TransportLayers() {}
// This is temporary workaround as Send and Receive interfaces are used by BlockingChannel.
// Once BlockingChannel is removed we can make Send and Receive work with TransportLayer rather than
// GatheringByteChannel or ScatteringByteChannel.
public static boolean hasPendingWrites(GatheringByteChannel channel) {
if (channel instanceof TransportLayer)
return ((TransportLayer) channel).hasPendingWrites();
return false;
public SnapshotNotFoundException(String s) {
super(s);
}
public SnapshotNotFoundException(String message, Throwable cause) {
super(message, cause);
}
}

View File

@ -109,7 +109,7 @@ public class RecordHeaders implements Headers {
}
public Header[] toArray() {
return headers.isEmpty() ? Record.EMPTY_HEADERS : headers.toArray(new Header[headers.size()]);
return headers.isEmpty() ? Record.EMPTY_HEADERS : headers.toArray(new Header[0]);
}
private void checkKey(String key) {

View File

@ -63,7 +63,7 @@ public class Frequencies extends SampledStat implements CompoundStat {
if (frequencies.isEmpty()) {
throw new IllegalArgumentException("Must specify at least one metric name");
}
Frequency[] frequencyArray = frequencies.toArray(new Frequency[frequencies.size()]);
Frequency[] frequencyArray = frequencies.toArray(new Frequency[0]);
return new Frequencies(2, 0.0, 1.0, frequencyArray);
}

View File

@ -19,39 +19,30 @@ package org.apache.kafka.common.network;
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.GatheringByteChannel;
/**
* A send backed by an array of byte buffers
*/
public class ByteBufferSend implements Send {
private final String destination;
private final long size;
protected final ByteBuffer[] buffers;
private long remaining;
private boolean pending = false;
public ByteBufferSend(String destination, ByteBuffer... buffers) {
this.destination = destination;
public ByteBufferSend(ByteBuffer... buffers) {
this.buffers = buffers;
for (ByteBuffer buffer : buffers)
remaining += buffer.remaining();
this.size = remaining;
}
public ByteBufferSend(String destination, ByteBuffer[] buffers, long size) {
this.destination = destination;
public ByteBufferSend(ByteBuffer[] buffers, long size) {
this.buffers = buffers;
this.size = size;
this.remaining = size;
}
@Override
public String destination() {
return destination;
}
@Override
public boolean completed() {
return remaining <= 0 && !pending;
@ -63,12 +54,12 @@ public class ByteBufferSend implements Send {
}
@Override
public long writeTo(GatheringByteChannel channel) throws IOException {
public long writeTo(TransferableChannel channel) throws IOException {
long written = channel.write(buffers);
if (written < 0)
throw new EOFException("Wrote negative bytes to channel. This shouldn't happen.");
remaining -= written;
pending = TransportLayers.hasPendingWrites(channel);
pending = channel.hasPendingWrites();
return written;
}
@ -79,10 +70,15 @@ public class ByteBufferSend implements Send {
@Override
public String toString() {
return "ByteBufferSend(" +
"destination='" + destination + "'" +
", size=" + size +
", remaining=" + remaining +
", pending=" + pending +
')';
}
public static ByteBufferSend sizePrefixed(ByteBuffer buffer) {
ByteBuffer sizeBuffer = ByteBuffer.allocate(4);
sizeBuffer.putInt(0, buffer.remaining());
return new ByteBufferSend(sizeBuffer, buffer);
}
}

View File

@ -124,7 +124,7 @@ public class KafkaChannel implements AutoCloseable {
private final MemoryPool memoryPool;
private final ChannelMetadataRegistry metadataRegistry;
private NetworkReceive receive;
private Send send;
private NetworkSend send;
// Track connection and mute state of channels to enable outstanding requests on channels to be
// processed after the channel is disconnected.
private boolean disconnected;
@ -376,18 +376,18 @@ public class KafkaChannel implements AutoCloseable {
return socket.getInetAddress().toString();
}
public void setSend(Send send) {
public void setSend(NetworkSend send) {
if (this.send != null)
throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress, connection id is " + id);
this.send = send;
this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
}
public Send maybeCompleteSend() {
public NetworkSend maybeCompleteSend() {
if (send != null && send.completed()) {
midWrite = false;
transportLayer.removeInterestOps(SelectionKey.OP_WRITE);
Send result = send;
NetworkSend result = send;
send = null;
return result;
}

View File

@ -16,22 +16,34 @@
*/
package org.apache.kafka.common.network;
import java.nio.ByteBuffer;
import java.io.IOException;
/**
* A size delimited Send that consists of a 4 byte network-ordered size N followed by N bytes of content
*/
public class NetworkSend extends ByteBufferSend {
public class NetworkSend implements Send {
private final String destinationId;
private final Send send;
public NetworkSend(String destination, ByteBuffer buffer) {
super(destination, sizeBuffer(buffer.remaining()), buffer);
public NetworkSend(String destinationId, Send send) {
this.destinationId = destinationId;
this.send = send;
}
private static ByteBuffer sizeBuffer(int size) {
ByteBuffer sizeBuffer = ByteBuffer.allocate(4);
sizeBuffer.putInt(size);
sizeBuffer.rewind();
return sizeBuffer;
public String destinationId() {
return destinationId;
}
@Override
public boolean completed() {
return send.completed();
}
@Override
public long writeTo(TransferableChannel channel) throws IOException {
return send.writeTo(channel);
}
@Override
public long size() {
return send.size();
}
}

View File

@ -62,7 +62,7 @@ public interface Selectable {
* Queue the given request for sending in the subsequent {@link #poll(long) poll()} calls
* @param send The request to send
*/
void send(Send send);
void send(NetworkSend send);
/**
* Do I/O. Reads, writes, connection establishment, etc.
@ -74,7 +74,7 @@ public interface Selectable {
/**
* The list of sends that completed on the last {@link #poll(long) poll()} call.
*/
List<Send> completedSends();
List<NetworkSend> completedSends();
/**
* The collection of receives that completed on the last {@link #poll(long) poll()} call.

View File

@ -106,7 +106,7 @@ public class Selector implements Selectable, AutoCloseable {
private final Map<String, KafkaChannel> channels;
private final Set<KafkaChannel> explicitlyMutedChannels;
private boolean outOfMemory;
private final List<Send> completedSends;
private final List<NetworkSend> completedSends;
private final LinkedHashMap<String, NetworkReceive> completedReceives;
private final Set<SelectionKey> immediatelyConnectedKeys;
private final Map<String, KafkaChannel> closingChannels;
@ -383,8 +383,8 @@ public class Selector implements Selectable, AutoCloseable {
* Queue the given request for sending in the subsequent {@link #poll(long)} calls
* @param send The request to send
*/
public void send(Send send) {
String connectionId = send.destination();
public void send(NetworkSend send) {
String connectionId = send.destinationId();
KafkaChannel channel = openOrClosingChannelOrFail(connectionId);
if (closingChannels.containsKey(connectionId)) {
// ensure notification via `disconnected`, leave channel in the state in which closing was triggered
@ -642,7 +642,7 @@ public class Selector implements Selectable, AutoCloseable {
void write(KafkaChannel channel) throws IOException {
String nodeId = channel.id();
long bytesSent = channel.write();
Send send = channel.maybeCompleteSend();
NetworkSend send = channel.maybeCompleteSend();
// We may complete the send with bytesSent < 1 if `TransportLayer.hasPendingWrites` was true and `channel.write()`
// caused the pending writes to be written to the socket channel buffer
if (bytesSent > 0 || send != null) {
@ -714,7 +714,7 @@ public class Selector implements Selectable, AutoCloseable {
}
@Override
public List<Send> completedSends() {
public List<NetworkSend> completedSends() {
return this.completedSends;
}

View File

@ -17,18 +17,12 @@
package org.apache.kafka.common.network;
import java.io.IOException;
import java.nio.channels.GatheringByteChannel;
/**
* This interface models the in-progress sending of data to a specific destination
* This interface models the in-progress sending of data.
*/
public interface Send {
/**
* The id for the destination of this send
*/
String destination();
/**
* Is this send complete?
*/
@ -41,7 +35,7 @@ public interface Send {
* @return The number of bytes written
* @throws IOException If the write fails
*/
long writeTo(GatheringByteChannel channel) throws IOException;
long writeTo(TransferableChannel channel) throws IOException;
/**
* Size of the send

View File

@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.network;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.GatheringByteChannel;
/**
* Extends GatheringByteChannel with the minimal set of methods required by the Send interface. Supporting TLS and
* efficient zero copy transfers are the main reasons for the additional methods.
*
* @see SslTransportLayer
*/
public interface TransferableChannel extends GatheringByteChannel {
/**
* @return true if there are any pending writes. false if the implementation directly write all data to output.
*/
boolean hasPendingWrites();
/**
* Transfers bytes from `fileChannel` to this `TransferableChannel`.
*
* This method will delegate to {@link FileChannel#transferTo(long, long, java.nio.channels.WritableByteChannel)},
* but it will unwrap the destination channel, if possible, in order to benefit from zero copy. This is required
* because the fast path of `transferTo` is only executed if the destination buffer inherits from an internal JDK
* class.
*
* @param fileChannel The source channel
* @param position The position within the file at which the transfer is to begin; must be non-negative
* @param count The maximum number of bytes to be transferred; must be non-negative
* @return The number of bytes, possibly zero, that were actually transferred
* @see FileChannel#transferTo(long, long, java.nio.channels.WritableByteChannel)
*/
long transferFrom(FileChannel fileChannel, long position, long count) throws IOException;
}

View File

@ -23,18 +23,16 @@ package org.apache.kafka.common.network;
* As NetworkClient replaces BlockingChannel and other implementations we will be using KafkaChannel as
* a network I/O channel.
*/
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.nio.channels.ScatteringByteChannel;
import java.nio.channels.GatheringByteChannel;
import java.security.Principal;
import org.apache.kafka.common.errors.AuthenticationException;
public interface TransportLayer extends ScatteringByteChannel, GatheringByteChannel {
import java.io.IOException;
import java.nio.channels.ScatteringByteChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.security.Principal;
public interface TransportLayer extends ScatteringByteChannel, TransferableChannel {
/**
* Returns true if the channel has handshake and authentication done.
@ -75,11 +73,6 @@ public interface TransportLayer extends ScatteringByteChannel, GatheringByteChan
*/
void handshake() throws AuthenticationException, IOException;
/**
* Returns true if there are any pending writes
*/
boolean hasPendingWrites();
/**
* Returns `SSLSession.getPeerPrincipal()` if this is an SslTransportLayer and there is an authenticated peer,
* `KafkaPrincipal.ANONYMOUS` is returned otherwise.
@ -97,20 +90,4 @@ public interface TransportLayer extends ScatteringByteChannel, GatheringByteChan
* which may be processed without reading additional data from the network.
*/
boolean hasBytesBuffered();
/**
* Transfers bytes from `fileChannel` to this `TransportLayer`.
*
* This method will delegate to {@link FileChannel#transferTo(long, long, java.nio.channels.WritableByteChannel)},
* but it will unwrap the destination channel, if possible, in order to benefit from zero copy. This is required
* because the fast path of `transferTo` is only executed if the destination buffer inherits from an internal JDK
* class.
*
* @param fileChannel The source channel
* @param position The position within the file at which the transfer is to begin; must be non-negative
* @param count The maximum number of bytes to be transferred; must be non-negative
* @return The number of bytes, possibly zero, that were actually transferred
* @see FileChannel#transferTo(long, long, java.nio.channels.WritableByteChannel)
*/
long transferFrom(FileChannel fileChannel, long position, long count) throws IOException;
}

View File

@ -16,135 +16,16 @@
*/
package org.apache.kafka.common.protocol;
import org.apache.kafka.common.message.AddOffsetsToTxnRequestData;
import org.apache.kafka.common.message.AddOffsetsToTxnResponseData;
import org.apache.kafka.common.message.AddPartitionsToTxnRequestData;
import org.apache.kafka.common.message.AddPartitionsToTxnResponseData;
import org.apache.kafka.common.message.AlterClientQuotasRequestData;
import org.apache.kafka.common.message.AlterClientQuotasResponseData;
import org.apache.kafka.common.message.AlterConfigsRequestData;
import org.apache.kafka.common.message.AlterConfigsResponseData;
import org.apache.kafka.common.message.AlterIsrRequestData;
import org.apache.kafka.common.message.AlterIsrResponseData;
import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData;
import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData;
import org.apache.kafka.common.message.AlterReplicaLogDirsRequestData;
import org.apache.kafka.common.message.AlterReplicaLogDirsResponseData;
import org.apache.kafka.common.message.AlterUserScramCredentialsRequestData;
import org.apache.kafka.common.message.AlterUserScramCredentialsResponseData;
import org.apache.kafka.common.message.ApiMessageType;
import org.apache.kafka.common.message.ApiVersionsRequestData;
import org.apache.kafka.common.message.ApiVersionsResponseData;
import org.apache.kafka.common.message.BeginQuorumEpochRequestData;
import org.apache.kafka.common.message.BeginQuorumEpochResponseData;
import org.apache.kafka.common.message.ControlledShutdownRequestData;
import org.apache.kafka.common.message.ControlledShutdownResponseData;
import org.apache.kafka.common.message.CreateAclsRequestData;
import org.apache.kafka.common.message.CreateAclsResponseData;
import org.apache.kafka.common.message.CreateDelegationTokenRequestData;
import org.apache.kafka.common.message.CreateDelegationTokenResponseData;
import org.apache.kafka.common.message.CreatePartitionsRequestData;
import org.apache.kafka.common.message.CreatePartitionsResponseData;
import org.apache.kafka.common.message.CreateTopicsRequestData;
import org.apache.kafka.common.message.CreateTopicsResponseData;
import org.apache.kafka.common.message.DeleteAclsRequestData;
import org.apache.kafka.common.message.DeleteAclsResponseData;
import org.apache.kafka.common.message.DeleteGroupsRequestData;
import org.apache.kafka.common.message.DeleteGroupsResponseData;
import org.apache.kafka.common.message.DeleteRecordsRequestData;
import org.apache.kafka.common.message.DeleteRecordsResponseData;
import org.apache.kafka.common.message.DeleteTopicsRequestData;
import org.apache.kafka.common.message.DeleteTopicsResponseData;
import org.apache.kafka.common.message.DescribeAclsRequestData;
import org.apache.kafka.common.message.DescribeAclsResponseData;
import org.apache.kafka.common.message.DescribeClientQuotasRequestData;
import org.apache.kafka.common.message.DescribeClientQuotasResponseData;
import org.apache.kafka.common.message.DescribeConfigsRequestData;
import org.apache.kafka.common.message.DescribeConfigsResponseData;
import org.apache.kafka.common.message.DescribeDelegationTokenRequestData;
import org.apache.kafka.common.message.DescribeDelegationTokenResponseData;
import org.apache.kafka.common.message.DescribeGroupsRequestData;
import org.apache.kafka.common.message.DescribeGroupsResponseData;
import org.apache.kafka.common.message.DescribeLogDirsRequestData;
import org.apache.kafka.common.message.DescribeLogDirsResponseData;
import org.apache.kafka.common.message.DescribeQuorumRequestData;
import org.apache.kafka.common.message.DescribeQuorumResponseData;
import org.apache.kafka.common.message.DescribeUserScramCredentialsRequestData;
import org.apache.kafka.common.message.DescribeUserScramCredentialsResponseData;
import org.apache.kafka.common.message.ElectLeadersRequestData;
import org.apache.kafka.common.message.ElectLeadersResponseData;
import org.apache.kafka.common.message.EndQuorumEpochRequestData;
import org.apache.kafka.common.message.EndQuorumEpochResponseData;
import org.apache.kafka.common.message.EndTxnRequestData;
import org.apache.kafka.common.message.EndTxnResponseData;
import org.apache.kafka.common.message.EnvelopeRequestData;
import org.apache.kafka.common.message.EnvelopeResponseData;
import org.apache.kafka.common.message.ExpireDelegationTokenRequestData;
import org.apache.kafka.common.message.ExpireDelegationTokenResponseData;
import org.apache.kafka.common.message.FetchRequestData;
import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.message.FindCoordinatorRequestData;
import org.apache.kafka.common.message.FindCoordinatorResponseData;
import org.apache.kafka.common.message.HeartbeatRequestData;
import org.apache.kafka.common.message.HeartbeatResponseData;
import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData;
import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData;
import org.apache.kafka.common.message.InitProducerIdRequestData;
import org.apache.kafka.common.message.InitProducerIdResponseData;
import org.apache.kafka.common.message.JoinGroupRequestData;
import org.apache.kafka.common.message.JoinGroupResponseData;
import org.apache.kafka.common.message.LeaderAndIsrRequestData;
import org.apache.kafka.common.message.LeaderAndIsrResponseData;
import org.apache.kafka.common.message.LeaveGroupRequestData;
import org.apache.kafka.common.message.LeaveGroupResponseData;
import org.apache.kafka.common.message.ListGroupsRequestData;
import org.apache.kafka.common.message.ListGroupsResponseData;
import org.apache.kafka.common.message.ListOffsetRequestData;
import org.apache.kafka.common.message.ListOffsetResponseData;
import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData;
import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData;
import org.apache.kafka.common.message.MetadataRequestData;
import org.apache.kafka.common.message.MetadataResponseData;
import org.apache.kafka.common.message.OffsetCommitRequestData;
import org.apache.kafka.common.message.OffsetCommitResponseData;
import org.apache.kafka.common.message.OffsetDeleteRequestData;
import org.apache.kafka.common.message.OffsetDeleteResponseData;
import org.apache.kafka.common.message.OffsetFetchRequestData;
import org.apache.kafka.common.message.OffsetFetchResponseData;
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData;
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData;
import org.apache.kafka.common.message.ProduceRequestData;
import org.apache.kafka.common.message.ProduceResponseData;
import org.apache.kafka.common.message.RenewDelegationTokenRequestData;
import org.apache.kafka.common.message.RenewDelegationTokenResponseData;
import org.apache.kafka.common.message.SaslAuthenticateRequestData;
import org.apache.kafka.common.message.SaslAuthenticateResponseData;
import org.apache.kafka.common.message.SaslHandshakeRequestData;
import org.apache.kafka.common.message.SaslHandshakeResponseData;
import org.apache.kafka.common.message.StopReplicaRequestData;
import org.apache.kafka.common.message.StopReplicaResponseData;
import org.apache.kafka.common.message.SyncGroupRequestData;
import org.apache.kafka.common.message.SyncGroupResponseData;
import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
import org.apache.kafka.common.message.TxnOffsetCommitResponseData;
import org.apache.kafka.common.message.UpdateFeaturesRequestData;
import org.apache.kafka.common.message.UpdateFeaturesResponseData;
import org.apache.kafka.common.message.UpdateMetadataRequestData;
import org.apache.kafka.common.message.UpdateMetadataResponseData;
import org.apache.kafka.common.message.VoteRequestData;
import org.apache.kafka.common.message.VoteResponseData;
import org.apache.kafka.common.message.WriteTxnMarkersRequestData;
import org.apache.kafka.common.message.WriteTxnMarkersResponseData;
import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.SchemaException;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.protocol.types.Type;
import org.apache.kafka.common.record.RecordBatch;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.Collectors;
import static org.apache.kafka.common.protocol.types.Type.BYTES;
@ -157,124 +38,75 @@ import static org.apache.kafka.common.protocol.types.Type.RECORDS;
* Identifiers for all the Kafka APIs
*/
public enum ApiKeys {
PRODUCE(0, "Produce", ProduceRequestData.SCHEMAS, ProduceResponseData.SCHEMAS),
FETCH(1, "Fetch", FetchRequestData.SCHEMAS, FetchResponseData.SCHEMAS),
LIST_OFFSETS(2, "ListOffsets", ListOffsetRequestData.SCHEMAS, ListOffsetResponseData.SCHEMAS),
METADATA(3, "Metadata", MetadataRequestData.SCHEMAS, MetadataResponseData.SCHEMAS),
LEADER_AND_ISR(4, "LeaderAndIsr", true, LeaderAndIsrRequestData.SCHEMAS, LeaderAndIsrResponseData.SCHEMAS),
STOP_REPLICA(5, "StopReplica", true, StopReplicaRequestData.SCHEMAS, StopReplicaResponseData.SCHEMAS),
UPDATE_METADATA(6, "UpdateMetadata", true, UpdateMetadataRequestData.SCHEMAS, UpdateMetadataResponseData.SCHEMAS),
CONTROLLED_SHUTDOWN(7, "ControlledShutdown", true, ControlledShutdownRequestData.SCHEMAS,
ControlledShutdownResponseData.SCHEMAS),
OFFSET_COMMIT(8, "OffsetCommit", OffsetCommitRequestData.SCHEMAS, OffsetCommitResponseData.SCHEMAS),
OFFSET_FETCH(9, "OffsetFetch", OffsetFetchRequestData.SCHEMAS, OffsetFetchResponseData.SCHEMAS),
FIND_COORDINATOR(10, "FindCoordinator", FindCoordinatorRequestData.SCHEMAS,
FindCoordinatorResponseData.SCHEMAS),
JOIN_GROUP(11, "JoinGroup", JoinGroupRequestData.SCHEMAS, JoinGroupResponseData.SCHEMAS),
HEARTBEAT(12, "Heartbeat", HeartbeatRequestData.SCHEMAS, HeartbeatResponseData.SCHEMAS),
LEAVE_GROUP(13, "LeaveGroup", LeaveGroupRequestData.SCHEMAS, LeaveGroupResponseData.SCHEMAS),
SYNC_GROUP(14, "SyncGroup", SyncGroupRequestData.SCHEMAS, SyncGroupResponseData.SCHEMAS),
DESCRIBE_GROUPS(15, "DescribeGroups", DescribeGroupsRequestData.SCHEMAS,
DescribeGroupsResponseData.SCHEMAS),
LIST_GROUPS(16, "ListGroups", ListGroupsRequestData.SCHEMAS, ListGroupsResponseData.SCHEMAS),
SASL_HANDSHAKE(17, "SaslHandshake", SaslHandshakeRequestData.SCHEMAS, SaslHandshakeResponseData.SCHEMAS),
API_VERSIONS(18, "ApiVersions", ApiVersionsRequestData.SCHEMAS, ApiVersionsResponseData.SCHEMAS) {
@Override
public Struct parseResponse(short version, ByteBuffer buffer) {
// Fallback to version 0 for ApiVersions response. If a client sends an ApiVersionsRequest
// using a version higher than that supported by the broker, a version 0 response is sent
// to the client indicating UNSUPPORTED_VERSION.
return parseResponse(version, buffer, (short) 0);
}
},
CREATE_TOPICS(19, "CreateTopics", CreateTopicsRequestData.SCHEMAS, CreateTopicsResponseData.SCHEMAS, true),
DELETE_TOPICS(20, "DeleteTopics", DeleteTopicsRequestData.SCHEMAS, DeleteTopicsResponseData.SCHEMAS, true),
DELETE_RECORDS(21, "DeleteRecords", DeleteRecordsRequestData.SCHEMAS, DeleteRecordsResponseData.SCHEMAS),
INIT_PRODUCER_ID(22, "InitProducerId", InitProducerIdRequestData.SCHEMAS, InitProducerIdResponseData.SCHEMAS),
OFFSET_FOR_LEADER_EPOCH(23, "OffsetForLeaderEpoch", false, OffsetForLeaderEpochRequestData.SCHEMAS,
OffsetForLeaderEpochResponseData.SCHEMAS),
ADD_PARTITIONS_TO_TXN(24, "AddPartitionsToTxn", false, RecordBatch.MAGIC_VALUE_V2,
AddPartitionsToTxnRequestData.SCHEMAS, AddPartitionsToTxnResponseData.SCHEMAS),
ADD_OFFSETS_TO_TXN(25, "AddOffsetsToTxn", false, RecordBatch.MAGIC_VALUE_V2, AddOffsetsToTxnRequestData.SCHEMAS,
AddOffsetsToTxnResponseData.SCHEMAS),
END_TXN(26, "EndTxn", false, RecordBatch.MAGIC_VALUE_V2, EndTxnRequestData.SCHEMAS, EndTxnResponseData.SCHEMAS),
WRITE_TXN_MARKERS(27, "WriteTxnMarkers", true, RecordBatch.MAGIC_VALUE_V2, WriteTxnMarkersRequestData.SCHEMAS,
WriteTxnMarkersResponseData.SCHEMAS),
TXN_OFFSET_COMMIT(28, "TxnOffsetCommit", false, RecordBatch.MAGIC_VALUE_V2, TxnOffsetCommitRequestData.SCHEMAS,
TxnOffsetCommitResponseData.SCHEMAS),
DESCRIBE_ACLS(29, "DescribeAcls", DescribeAclsRequestData.SCHEMAS, DescribeAclsResponseData.SCHEMAS),
CREATE_ACLS(30, "CreateAcls", CreateAclsRequestData.SCHEMAS, CreateAclsResponseData.SCHEMAS, true),
DELETE_ACLS(31, "DeleteAcls", DeleteAclsRequestData.SCHEMAS, DeleteAclsResponseData.SCHEMAS, true),
DESCRIBE_CONFIGS(32, "DescribeConfigs", DescribeConfigsRequestData.SCHEMAS,
DescribeConfigsResponseData.SCHEMAS),
ALTER_CONFIGS(33, "AlterConfigs", AlterConfigsRequestData.SCHEMAS,
AlterConfigsResponseData.SCHEMAS, true),
ALTER_REPLICA_LOG_DIRS(34, "AlterReplicaLogDirs", AlterReplicaLogDirsRequestData.SCHEMAS,
AlterReplicaLogDirsResponseData.SCHEMAS),
DESCRIBE_LOG_DIRS(35, "DescribeLogDirs", DescribeLogDirsRequestData.SCHEMAS,
DescribeLogDirsResponseData.SCHEMAS),
SASL_AUTHENTICATE(36, "SaslAuthenticate", SaslAuthenticateRequestData.SCHEMAS,
SaslAuthenticateResponseData.SCHEMAS),
CREATE_PARTITIONS(37, "CreatePartitions", CreatePartitionsRequestData.SCHEMAS,
CreatePartitionsResponseData.SCHEMAS, true),
CREATE_DELEGATION_TOKEN(38, "CreateDelegationToken", CreateDelegationTokenRequestData.SCHEMAS,
CreateDelegationTokenResponseData.SCHEMAS, true),
RENEW_DELEGATION_TOKEN(39, "RenewDelegationToken", RenewDelegationTokenRequestData.SCHEMAS,
RenewDelegationTokenResponseData.SCHEMAS, true),
EXPIRE_DELEGATION_TOKEN(40, "ExpireDelegationToken", ExpireDelegationTokenRequestData.SCHEMAS,
ExpireDelegationTokenResponseData.SCHEMAS, true),
DESCRIBE_DELEGATION_TOKEN(41, "DescribeDelegationToken", DescribeDelegationTokenRequestData.SCHEMAS,
DescribeDelegationTokenResponseData.SCHEMAS),
DELETE_GROUPS(42, "DeleteGroups", DeleteGroupsRequestData.SCHEMAS, DeleteGroupsResponseData.SCHEMAS),
ELECT_LEADERS(43, "ElectLeaders", ElectLeadersRequestData.SCHEMAS,
ElectLeadersResponseData.SCHEMAS),
INCREMENTAL_ALTER_CONFIGS(44, "IncrementalAlterConfigs", IncrementalAlterConfigsRequestData.SCHEMAS,
IncrementalAlterConfigsResponseData.SCHEMAS, true),
ALTER_PARTITION_REASSIGNMENTS(45, "AlterPartitionReassignments", AlterPartitionReassignmentsRequestData.SCHEMAS,
AlterPartitionReassignmentsResponseData.SCHEMAS, true),
LIST_PARTITION_REASSIGNMENTS(46, "ListPartitionReassignments", ListPartitionReassignmentsRequestData.SCHEMAS,
ListPartitionReassignmentsResponseData.SCHEMAS),
OFFSET_DELETE(47, "OffsetDelete", OffsetDeleteRequestData.SCHEMAS, OffsetDeleteResponseData.SCHEMAS),
DESCRIBE_CLIENT_QUOTAS(48, "DescribeClientQuotas", DescribeClientQuotasRequestData.SCHEMAS,
DescribeClientQuotasResponseData.SCHEMAS),
ALTER_CLIENT_QUOTAS(49, "AlterClientQuotas", AlterClientQuotasRequestData.SCHEMAS,
AlterClientQuotasResponseData.SCHEMAS, true),
DESCRIBE_USER_SCRAM_CREDENTIALS(50, "DescribeUserScramCredentials", DescribeUserScramCredentialsRequestData.SCHEMAS,
DescribeUserScramCredentialsResponseData.SCHEMAS),
ALTER_USER_SCRAM_CREDENTIALS(51, "AlterUserScramCredentials", AlterUserScramCredentialsRequestData.SCHEMAS,
AlterUserScramCredentialsResponseData.SCHEMAS, true),
VOTE(52, "Vote", true, false,
VoteRequestData.SCHEMAS, VoteResponseData.SCHEMAS),
BEGIN_QUORUM_EPOCH(53, "BeginQuorumEpoch", true, false,
BeginQuorumEpochRequestData.SCHEMAS, BeginQuorumEpochResponseData.SCHEMAS),
END_QUORUM_EPOCH(54, "EndQuorumEpoch", true, false,
EndQuorumEpochRequestData.SCHEMAS, EndQuorumEpochResponseData.SCHEMAS),
DESCRIBE_QUORUM(55, "DescribeQuorum", true, false,
DescribeQuorumRequestData.SCHEMAS, DescribeQuorumResponseData.SCHEMAS),
ALTER_ISR(56, "AlterIsr", AlterIsrRequestData.SCHEMAS, AlterIsrResponseData.SCHEMAS),
UPDATE_FEATURES(57, "UpdateFeatures",
UpdateFeaturesRequestData.SCHEMAS, UpdateFeaturesResponseData.SCHEMAS, true),
ENVELOPE(58, "Envelope", true, false, EnvelopeRequestData.SCHEMAS, EnvelopeResponseData.SCHEMAS);
PRODUCE(ApiMessageType.PRODUCE),
FETCH(ApiMessageType.FETCH),
LIST_OFFSETS(ApiMessageType.LIST_OFFSETS),
METADATA(ApiMessageType.METADATA),
LEADER_AND_ISR(ApiMessageType.LEADER_AND_ISR, true),
STOP_REPLICA(ApiMessageType.STOP_REPLICA, true),
UPDATE_METADATA(ApiMessageType.UPDATE_METADATA, true),
CONTROLLED_SHUTDOWN(ApiMessageType.CONTROLLED_SHUTDOWN, true),
OFFSET_COMMIT(ApiMessageType.OFFSET_COMMIT),
OFFSET_FETCH(ApiMessageType.OFFSET_FETCH),
FIND_COORDINATOR(ApiMessageType.FIND_COORDINATOR),
JOIN_GROUP(ApiMessageType.JOIN_GROUP),
HEARTBEAT(ApiMessageType.HEARTBEAT),
LEAVE_GROUP(ApiMessageType.LEAVE_GROUP),
SYNC_GROUP(ApiMessageType.SYNC_GROUP),
DESCRIBE_GROUPS(ApiMessageType.DESCRIBE_GROUPS),
LIST_GROUPS(ApiMessageType.LIST_GROUPS),
SASL_HANDSHAKE(ApiMessageType.SASL_HANDSHAKE),
API_VERSIONS(ApiMessageType.API_VERSIONS),
CREATE_TOPICS(ApiMessageType.CREATE_TOPICS, false, true),
DELETE_TOPICS(ApiMessageType.DELETE_TOPICS, false, true),
DELETE_RECORDS(ApiMessageType.DELETE_RECORDS),
INIT_PRODUCER_ID(ApiMessageType.INIT_PRODUCER_ID),
OFFSET_FOR_LEADER_EPOCH(ApiMessageType.OFFSET_FOR_LEADER_EPOCH),
ADD_PARTITIONS_TO_TXN(ApiMessageType.ADD_PARTITIONS_TO_TXN, false, RecordBatch.MAGIC_VALUE_V2, false),
ADD_OFFSETS_TO_TXN(ApiMessageType.ADD_OFFSETS_TO_TXN, false, RecordBatch.MAGIC_VALUE_V2, false),
END_TXN(ApiMessageType.END_TXN, false, RecordBatch.MAGIC_VALUE_V2, false),
WRITE_TXN_MARKERS(ApiMessageType.WRITE_TXN_MARKERS, true, RecordBatch.MAGIC_VALUE_V2, false),
TXN_OFFSET_COMMIT(ApiMessageType.TXN_OFFSET_COMMIT, false, RecordBatch.MAGIC_VALUE_V2, false),
DESCRIBE_ACLS(ApiMessageType.DESCRIBE_ACLS),
CREATE_ACLS(ApiMessageType.CREATE_ACLS, false, true),
DELETE_ACLS(ApiMessageType.DELETE_ACLS, false, true),
DESCRIBE_CONFIGS(ApiMessageType.DESCRIBE_CONFIGS),
ALTER_CONFIGS(ApiMessageType.ALTER_CONFIGS, false, true),
ALTER_REPLICA_LOG_DIRS(ApiMessageType.ALTER_REPLICA_LOG_DIRS),
DESCRIBE_LOG_DIRS(ApiMessageType.DESCRIBE_LOG_DIRS),
SASL_AUTHENTICATE(ApiMessageType.SASL_AUTHENTICATE),
CREATE_PARTITIONS(ApiMessageType.CREATE_PARTITIONS, false, true),
CREATE_DELEGATION_TOKEN(ApiMessageType.CREATE_DELEGATION_TOKEN, false, true),
RENEW_DELEGATION_TOKEN(ApiMessageType.RENEW_DELEGATION_TOKEN, false, true),
EXPIRE_DELEGATION_TOKEN(ApiMessageType.EXPIRE_DELEGATION_TOKEN, false, true),
DESCRIBE_DELEGATION_TOKEN(ApiMessageType.DESCRIBE_DELEGATION_TOKEN),
DELETE_GROUPS(ApiMessageType.DELETE_GROUPS),
ELECT_LEADERS(ApiMessageType.ELECT_LEADERS),
INCREMENTAL_ALTER_CONFIGS(ApiMessageType.INCREMENTAL_ALTER_CONFIGS, false, true),
ALTER_PARTITION_REASSIGNMENTS(ApiMessageType.ALTER_PARTITION_REASSIGNMENTS, false, true),
LIST_PARTITION_REASSIGNMENTS(ApiMessageType.LIST_PARTITION_REASSIGNMENTS),
OFFSET_DELETE(ApiMessageType.OFFSET_DELETE),
DESCRIBE_CLIENT_QUOTAS(ApiMessageType.DESCRIBE_CLIENT_QUOTAS),
ALTER_CLIENT_QUOTAS(ApiMessageType.ALTER_CLIENT_QUOTAS, false, true),
DESCRIBE_USER_SCRAM_CREDENTIALS(ApiMessageType.DESCRIBE_USER_SCRAM_CREDENTIALS),
ALTER_USER_SCRAM_CREDENTIALS(ApiMessageType.ALTER_USER_SCRAM_CREDENTIALS, false, true),
VOTE(ApiMessageType.VOTE, true, RecordBatch.MAGIC_VALUE_V0, false, false),
BEGIN_QUORUM_EPOCH(ApiMessageType.BEGIN_QUORUM_EPOCH, true, RecordBatch.MAGIC_VALUE_V0, false, false),
END_QUORUM_EPOCH(ApiMessageType.END_QUORUM_EPOCH, true, RecordBatch.MAGIC_VALUE_V0, false, false),
DESCRIBE_QUORUM(ApiMessageType.DESCRIBE_QUORUM, true, RecordBatch.MAGIC_VALUE_V0, false, false),
ALTER_ISR(ApiMessageType.ALTER_ISR, true),
UPDATE_FEATURES(ApiMessageType.UPDATE_FEATURES, false, true),
ENVELOPE(ApiMessageType.ENVELOPE, true, RecordBatch.MAGIC_VALUE_V0, false, false),
FETCH_SNAPSHOT(ApiMessageType.FETCH_SNAPSHOT, false, RecordBatch.MAGIC_VALUE_V0, false, false);
private static final ApiKeys[] ID_TO_TYPE;
private static final int MIN_API_KEY = 0;
public static final int MAX_API_KEY;
// The generator ensures every `ApiMessageType` has a unique id
private static final Map<Integer, ApiKeys> ID_TO_TYPE = Arrays.stream(ApiKeys.values())
.collect(Collectors.toMap(key -> (int) key.id, Function.identity()));
static {
int maxKey = -1;
for (ApiKeys key : ApiKeys.values())
maxKey = Math.max(maxKey, key.id);
ApiKeys[] idToType = new ApiKeys[maxKey + 1];
for (ApiKeys key : ApiKeys.values())
idToType[key.id] = key;
ID_TO_TYPE = idToType;
MAX_API_KEY = maxKey;
}
/** the permanent and immutable id of an API--this can't change ever */
/** the permanent and immutable id of an API - this can't change ever */
public final short id;
/** an english description of the api--this is for debugging and can change */
/** An english description of the api - used for debugging and metric names, it can potentially be changed via a KIP */
public final String name;
/** indicates if this is a ClusterAction request used only by brokers */
@ -289,63 +121,41 @@ public enum ApiKeys {
/** indicates whether the API is enabled for forwarding **/
public final boolean forwardable;
public final Schema[] requestSchemas;
public final Schema[] responseSchemas;
public final boolean requiresDelayedAllocation;
ApiKeys(int id, String name, Schema[] requestSchemas, Schema[] responseSchemas) {
this(id, name, false, requestSchemas, responseSchemas);
public final ApiMessageType messageType;
ApiKeys(ApiMessageType messageType) {
this(messageType, false);
}
ApiKeys(int id, String name, boolean clusterAction, Schema[] requestSchemas, Schema[] responseSchemas) {
this(id, name, clusterAction, RecordBatch.MAGIC_VALUE_V0, requestSchemas, responseSchemas);
ApiKeys(ApiMessageType messageType, boolean clusterAction) {
this(messageType, clusterAction, RecordBatch.MAGIC_VALUE_V0, false);
}
ApiKeys(int id, String name, Schema[] requestSchemas, Schema[] responseSchemas, boolean forwardable) {
this(id, name, false, RecordBatch.MAGIC_VALUE_V0, true, requestSchemas, responseSchemas, forwardable);
ApiKeys(ApiMessageType messageType, boolean clusterAction, boolean forwardable) {
this(messageType, clusterAction, RecordBatch.MAGIC_VALUE_V0, forwardable);
}
ApiKeys(int id, String name, boolean clusterAction, boolean isEnabled, Schema[] requestSchemas, Schema[] responseSchemas) {
this(id, name, clusterAction, RecordBatch.MAGIC_VALUE_V0, isEnabled, requestSchemas, responseSchemas, false);
}
ApiKeys(int id, String name, boolean clusterAction, byte minRequiredInterBrokerMagic,
Schema[] requestSchemas, Schema[] responseSchemas) {
this(id, name, clusterAction, minRequiredInterBrokerMagic, true, requestSchemas, responseSchemas, false);
ApiKeys(ApiMessageType messageType, boolean clusterAction, byte minRequiredInterBrokerMagic, boolean forwardable) {
this(messageType, clusterAction, minRequiredInterBrokerMagic, forwardable, true);
}
ApiKeys(
int id,
String name,
ApiMessageType messageType,
boolean clusterAction,
byte minRequiredInterBrokerMagic,
boolean isEnabled,
Schema[] requestSchemas,
Schema[] responseSchemas,
boolean forwardable
boolean forwardable,
boolean isEnabled
) {
if (id < 0)
throw new IllegalArgumentException("id must not be negative, id: " + id);
this.id = (short) id;
this.name = name;
this.messageType = messageType;
this.id = messageType.apiKey();
this.name = messageType.name;
this.clusterAction = clusterAction;
this.minRequiredInterBrokerMagic = minRequiredInterBrokerMagic;
this.isEnabled = isEnabled;
if (requestSchemas.length != responseSchemas.length)
throw new IllegalStateException(requestSchemas.length + " request versions for api " + name
+ " but " + responseSchemas.length + " response versions.");
for (int i = 0; i < requestSchemas.length; ++i) {
if (requestSchemas[i] == null)
throw new IllegalStateException("Request schema for api " + name + " for version " + i + " is null");
if (responseSchemas[i] == null)
throw new IllegalStateException("Response schema for api " + name + " for version " + i + " is null");
}
this.requiresDelayedAllocation = forwardable || shouldRetainsBufferReference(requestSchemas);
this.requestSchemas = requestSchemas;
this.responseSchemas = responseSchemas;
this.requiresDelayedAllocation = forwardable || shouldRetainsBufferReference(messageType.requestSchemas());
this.forwardable = forwardable;
}
@ -361,57 +171,23 @@ public enum ApiKeys {
}
public static ApiKeys forId(int id) {
if (!hasId(id))
throw new IllegalArgumentException(String.format("Unexpected ApiKeys id `%s`, it should be between `%s` " +
"and `%s` (inclusive)", id, MIN_API_KEY, MAX_API_KEY));
return ID_TO_TYPE[id];
ApiKeys apiKey = ID_TO_TYPE.get(id);
if (apiKey == null) {
throw new IllegalArgumentException("Unexpected api key: " + id);
}
return apiKey;
}
public static boolean hasId(int id) {
return id >= MIN_API_KEY && id <= MAX_API_KEY;
return ID_TO_TYPE.containsKey(id);
}
public short latestVersion() {
return (short) (requestSchemas.length - 1);
return messageType.highestSupportedVersion();
}
public short oldestVersion() {
return 0;
}
public Schema requestSchema(short version) {
return schemaFor(requestSchemas, version);
}
public Schema responseSchema(short version) {
return schemaFor(responseSchemas, version);
}
public Struct parseRequest(short version, ByteBuffer buffer) {
return requestSchema(version).read(buffer);
}
public Struct parseResponse(short version, ByteBuffer buffer) {
return responseSchema(version).read(buffer);
}
protected Struct parseResponse(short version, ByteBuffer buffer, short fallbackVersion) {
int bufferPosition = buffer.position();
try {
return responseSchema(version).read(buffer);
} catch (SchemaException e) {
if (version != fallbackVersion) {
buffer.position(bufferPosition);
return responseSchema(fallbackVersion).read(buffer);
} else
throw e;
}
}
private Schema schemaFor(Schema[] versions, short version) {
if (!isVersionSupported(version))
throw new IllegalArgumentException("Invalid version for API key " + this + ": " + version);
return versions[version];
return messageType.lowestSupportedVersion();
}
public boolean isVersionSupported(short apiVersion) {
@ -419,11 +195,11 @@ public enum ApiKeys {
}
public short requestHeaderVersion(short apiVersion) {
return ApiMessageType.fromApiKey(id).requestHeaderVersion(apiVersion);
return messageType.requestHeaderVersion(apiVersion);
}
public short responseHeaderVersion(short apiVersion) {
return ApiMessageType.fromApiKey(id).responseHeaderVersion(apiVersion);
return messageType.responseHeaderVersion(apiVersion);
}
private static String toHtml() {

View File

@ -133,4 +133,11 @@ public class ByteBufferAccessor implements Readable, Writable {
return ByteUtils.readVarlong(buf);
}
public void flip() {
buf.flip();
}
public ByteBuffer buffer() {
return buf;
}
}

View File

@ -1,35 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.protocol;
import org.apache.kafka.common.protocol.types.Field;
public class CommonFields {
public static final Field.Int32 THROTTLE_TIME_MS = new Field.Int32("throttle_time_ms",
"Duration in milliseconds for which the request was throttled due to quota violation (Zero if the " +
"request did not violate any quota)", 0);
public static final Field.Str TOPIC_NAME = new Field.Str("topic", "Name of topic");
public static final Field.Int32 PARTITION_ID = new Field.Int32("partition", "Topic partition id");
public static final Field.Int16 ERROR_CODE = new Field.Int16("error_code", "Response error code");
public static final Field.NullableStr ERROR_MESSAGE = new Field.NullableStr("error_message", "Response error message");
public static final Field.Int32 LEADER_EPOCH = new Field.Int32("leader_epoch", "The leader epoch");
public static final Field.Int32 CURRENT_LEADER_EPOCH = new Field.Int32("current_leader_epoch",
"The current leader epoch, if provided, is used to fence consumers/replicas with old metadata. " +
"If the epoch provided by the client is larger than the current epoch known to the broker, then " +
"the UNKNOWN_LEADER_EPOCH error code will be returned. If the provided epoch is smaller, then " +
"the FENCED_LEADER_EPOCH error code will be returned.");
}

View File

@ -34,10 +34,9 @@ import org.apache.kafka.common.errors.DuplicateResourceException;
import org.apache.kafka.common.errors.DuplicateSequenceException;
import org.apache.kafka.common.errors.ElectionNotNeededException;
import org.apache.kafka.common.errors.EligibleLeadersNotAvailableException;
import org.apache.kafka.common.errors.FencedInstanceIdException;
import org.apache.kafka.common.errors.FeatureUpdateFailedException;
import org.apache.kafka.common.errors.FencedInstanceIdException;
import org.apache.kafka.common.errors.FencedLeaderEpochException;
import org.apache.kafka.common.errors.InvalidUpdateVersionException;
import org.apache.kafka.common.errors.FetchSessionIdNotFoundException;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.GroupIdNotFoundException;
@ -56,6 +55,7 @@ import org.apache.kafka.common.errors.InvalidGroupIdException;
import org.apache.kafka.common.errors.InvalidPartitionsException;
import org.apache.kafka.common.errors.InvalidPidMappingException;
import org.apache.kafka.common.errors.InvalidPrincipalTypeException;
import org.apache.kafka.common.errors.InvalidProducerEpochException;
import org.apache.kafka.common.errors.InvalidReplicaAssignmentException;
import org.apache.kafka.common.errors.InvalidReplicationFactorException;
import org.apache.kafka.common.errors.InvalidRequestException;
@ -65,6 +65,7 @@ import org.apache.kafka.common.errors.InvalidTimestampException;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.InvalidTxnStateException;
import org.apache.kafka.common.errors.InvalidTxnTimeoutException;
import org.apache.kafka.common.errors.InvalidUpdateVersionException;
import org.apache.kafka.common.errors.KafkaStorageException;
import org.apache.kafka.common.errors.LeaderNotAvailableException;
import org.apache.kafka.common.errors.ListenerNotFoundException;
@ -83,6 +84,7 @@ import org.apache.kafka.common.errors.OffsetOutOfRangeException;
import org.apache.kafka.common.errors.OperationNotAttemptedException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.PolicyViolationException;
import org.apache.kafka.common.errors.PositionOutOfRangeException;
import org.apache.kafka.common.errors.PreferredLeaderNotAvailableException;
import org.apache.kafka.common.errors.PrincipalDeserializationException;
import org.apache.kafka.common.errors.ProducerFencedException;
@ -95,6 +97,7 @@ import org.apache.kafka.common.errors.ResourceNotFoundException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.SaslAuthenticationException;
import org.apache.kafka.common.errors.SecurityDisabledException;
import org.apache.kafka.common.errors.SnapshotNotFoundException;
import org.apache.kafka.common.errors.StaleBrokerEpochException;
import org.apache.kafka.common.errors.ThrottlingQuotaExceededException;
import org.apache.kafka.common.errors.TimeoutException;
@ -115,7 +118,6 @@ import org.apache.kafka.common.errors.UnsupportedCompressionTypeException;
import org.apache.kafka.common.errors.UnsupportedForMessageFormatException;
import org.apache.kafka.common.errors.UnsupportedSaslMechanismException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.errors.InvalidProducerEpochException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -133,6 +135,8 @@ import java.util.function.Function;
* the client if the request version suggests that the client may not recognize the new error code.
*
* Do not add exceptions that occur only on the client or only on the server here.
*
* @see org.apache.kafka.common.network.SslTransportLayer
*/
public enum Errors {
UNKNOWN_SERVER_ERROR(-1, "The server experienced an unexpected error when processing the request.",
@ -341,7 +345,12 @@ public enum Errors {
INVALID_UPDATE_VERSION(95, "The given update version was invalid.", InvalidUpdateVersionException::new),
FEATURE_UPDATE_FAILED(96, "Unable to update finalized features due to an unexpected server error.", FeatureUpdateFailedException::new),
PRINCIPAL_DESERIALIZATION_FAILURE(97, "Request principal deserialization failed during forwarding. " +
"This indicates an internal error on the broker cluster security setup.", PrincipalDeserializationException::new);
"This indicates an internal error on the broker cluster security setup.", PrincipalDeserializationException::new),
SNAPSHOT_NOT_FOUND(98, "Requested snapshot was not found", SnapshotNotFoundException::new),
POSITION_OUT_OF_RANGE(
99,
"Requested position is not greater than or equal to zero, and less than the size of the snapshot.",
PositionOutOfRangeException::new);
private static final Logger log = LoggerFactory.getLogger(Errors.class);

View File

@ -32,6 +32,14 @@ public class MessageSizeAccumulator {
return totalSize;
}
/**
* Size excluding zero copy fields as specified by {@link #zeroCopySize}. This is typically the size of the byte
* buffer used to serialize messages.
*/
public int sizeExcludingZeroCopy() {
return totalSize - zeroCopySize;
}
/**
* Get the total "zero-copy" size of the message. This is the summed
* total of all fields which have either have a type of 'bytes' with

View File

@ -23,6 +23,7 @@ import org.apache.kafka.common.utils.Utils;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
@ -159,12 +160,9 @@ public final class MessageUtil {
}
public static byte[] duplicate(byte[] array) {
if (array == null) {
if (array == null)
return null;
}
byte[] newArray = new byte[array.length];
System.arraycopy(array, 0, newArray, 0, array.length);
return newArray;
return Arrays.copyOf(array, array.length);
}
/**
@ -182,19 +180,27 @@ public final class MessageUtil {
}
}
public static ByteBuffer toByteBuffer(final short version, final Message message) {
public static ByteBuffer toByteBuffer(final Message message, final short version) {
ObjectSerializationCache cache = new ObjectSerializationCache();
int size = message.size(cache, version);
ByteBuffer bytes = ByteBuffer.allocate(2 + size);
ByteBufferAccessor accessor = new ByteBufferAccessor(bytes);
accessor.writeShort(version);
message.write(accessor, cache, version);
int messageSize = message.size(cache, version);
ByteBufferAccessor bytes = new ByteBufferAccessor(ByteBuffer.allocate(messageSize));
message.write(bytes, cache, version);
bytes.flip();
return bytes;
return bytes.buffer();
}
public static byte[] toBytes(final short version, final Message message) {
ByteBuffer buffer = toByteBuffer(version, message);
public static ByteBuffer toVersionPrefixedByteBuffer(final short version, final Message message) {
ObjectSerializationCache cache = new ObjectSerializationCache();
int messageSize = message.size(cache, version);
ByteBufferAccessor bytes = new ByteBufferAccessor(ByteBuffer.allocate(messageSize + 2));
bytes.writeShort(version);
message.write(bytes, cache, version);
bytes.flip();
return bytes.buffer();
}
public static byte[] toVersionPrefixedBytes(final short version, final Message message) {
ByteBuffer buffer = toVersionPrefixedByteBuffer(version, message);
// take the inner array directly if it is full with data
if (buffer.hasArray() &&
buffer.arrayOffset() == 0 &&

View File

@ -143,7 +143,7 @@ public class Protocol {
b.append("):</a></h5>\n\n");
// Requests
b.append("<b>Requests:</b><br>\n");
Schema[] requests = key.requestSchemas;
Schema[] requests = key.messageType.requestSchemas();
for (int i = 0; i < requests.length; i++) {
Schema schema = requests[i];
// Schema
@ -164,7 +164,7 @@ public class Protocol {
// Responses
b.append("<b>Responses:</b><br>\n");
Schema[] responses = key.responseSchemas;
Schema[] responses = key.messageType.responseSchemas();
for (int i = 0; i < responses.length; i++) {
Schema schema = responses[i];
// Schema

View File

@ -22,7 +22,6 @@ import org.apache.kafka.common.record.BaseRecords;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MultiRecordsSend;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.requests.RequestUtils;
import org.apache.kafka.common.requests.ResponseHeader;
import org.apache.kafka.common.utils.ByteUtils;
@ -38,12 +37,11 @@ import java.util.Queue;
* allocating new space for "zero-copy" fields (see {@link #writeByteBuffer(ByteBuffer)}
* and {@link #writeRecords(BaseRecords)}).
*
* See {@link org.apache.kafka.common.requests.EnvelopeRequest#toSend(String, RequestHeader)}
* See {@link org.apache.kafka.common.requests.EnvelopeRequest#toSend(RequestHeader)}
* for example usage.
*/
public class SendBuilder implements Writable {
private final ByteBuffer buffer;
private final String destinationId;
private final Queue<Send> sends = new ArrayDeque<>(1);
private long sizeOfSends = 0;
@ -51,8 +49,7 @@ public class SendBuilder implements Writable {
private final List<ByteBuffer> buffers = new ArrayList<>();
private long sizeOfBuffers = 0;
SendBuilder(String destinationId, int size) {
this.destinationId = destinationId;
SendBuilder(int size) {
this.buffer = ByteBuffer.allocate(size);
this.buffer.mark();
}
@ -131,7 +128,7 @@ public class SendBuilder implements Writable {
/**
* Write a record set. The underlying record data will be retained
* in the result of {@link #build()}. See {@link BaseRecords#toSend(String)}.
* in the result of {@link #build()}. See {@link BaseRecords#toSend()}.
*
* @param records the records to write
*/
@ -142,7 +139,7 @@ public class SendBuilder implements Writable {
addBuffer(((MemoryRecords) records).buffer());
} else {
flushPendingSend();
addSend(records.toSend(destinationId));
addSend(records.toSend());
}
}
@ -150,7 +147,7 @@ public class SendBuilder implements Writable {
flushPendingBuffer();
if (!buffers.isEmpty()) {
ByteBuffer[] byteBufferArray = buffers.toArray(new ByteBuffer[0]);
addSend(new ByteBufferSend(destinationId, byteBufferArray, sizeOfBuffers));
addSend(new ByteBufferSend(byteBufferArray, sizeOfBuffers));
clearBuffers();
}
}
@ -175,17 +172,15 @@ public class SendBuilder implements Writable {
if (sends.size() == 1) {
return sends.poll();
} else {
return new MultiRecordsSend(destinationId, sends, sizeOfSends);
return new MultiRecordsSend(sends, sizeOfSends);
}
}
public static Send buildRequestSend(
String destination,
RequestHeader header,
Message apiRequest
) {
return buildSend(
destination,
header.data(),
header.headerVersion(),
apiRequest,
@ -194,13 +189,11 @@ public class SendBuilder implements Writable {
}
public static Send buildResponseSend(
String destination,
ResponseHeader header,
Message apiResponse,
short apiVersion
) {
return buildSend(
destination,
header.data(),
header.headerVersion(),
apiResponse,
@ -209,22 +202,22 @@ public class SendBuilder implements Writable {
}
private static Send buildSend(
String destination,
Message header,
short headerVersion,
Message apiMessage,
short apiVersion
) {
ObjectSerializationCache serializationCache = new ObjectSerializationCache();
MessageSizeAccumulator messageSize = RequestUtils.size(serializationCache, header, headerVersion, apiMessage, apiVersion);
int totalSize = messageSize.totalSize();
int sizeExcludingZeroCopyFields = totalSize - messageSize.zeroCopySize();
MessageSizeAccumulator messageSize = new MessageSizeAccumulator();
header.addSize(messageSize, serializationCache, headerVersion);
apiMessage.addSize(messageSize, serializationCache, apiVersion);
SendBuilder builder = new SendBuilder(destination, sizeExcludingZeroCopyFields + 4);
builder.writeInt(totalSize);
SendBuilder builder = new SendBuilder(messageSize.sizeExcludingZeroCopy() + 4);
builder.writeInt(messageSize.totalSize());
header.write(builder, serializationCache, headerVersion);
apiMessage.write(builder, serializationCache, apiVersion);
return builder.build();
}

View File

@ -32,6 +32,7 @@ public class ClientQuotaEntity {
*/
public static final String USER = "user";
public static final String CLIENT_ID = "client-id";
public static final String IP = "ip";
/**
* Constructs a quota entity for the given types and names. If a name is null,

View File

@ -62,8 +62,8 @@ public abstract class AbstractRecords implements Records {
}
@Override
public DefaultRecordsSend toSend(String destination) {
return new DefaultRecordsSend(destination, this);
public DefaultRecordsSend toSend() {
return new DefaultRecordsSend(this);
}
private Iterator<Record> recordsIterator() {

View File

@ -30,5 +30,5 @@ public interface BaseRecords {
* Encapsulate this {@link BaseRecords} object into {@link RecordsSend}
* @return Initialized {@link RecordsSend} object
*/
RecordsSend<? extends BaseRecords> toSend(String destination);
RecordsSend<? extends BaseRecords> toSend();
}

View File

@ -16,20 +16,21 @@
*/
package org.apache.kafka.common.record;
import org.apache.kafka.common.network.TransferableChannel;
import java.io.IOException;
import java.nio.channels.GatheringByteChannel;
public class DefaultRecordsSend extends RecordsSend<Records> {
public DefaultRecordsSend(String destination, Records records) {
this(destination, records, records.sizeInBytes());
public DefaultRecordsSend(Records records) {
this(records, records.sizeInBytes());
}
public DefaultRecordsSend(String destination, Records records, int maxBytesToWrite) {
super(destination, records, maxBytesToWrite);
public DefaultRecordsSend(Records records, int maxBytesToWrite) {
super(records, maxBytesToWrite);
}
@Override
protected long writeTo(GatheringByteChannel channel, long previouslyWritten, int remaining) throws IOException {
protected long writeTo(TransferableChannel channel, long previouslyWritten, int remaining) throws IOException {
return records().writeTo(channel, previouslyWritten, remaining);
}
}

View File

@ -17,7 +17,7 @@
package org.apache.kafka.common.record;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.network.TransportLayer;
import org.apache.kafka.common.network.TransferableChannel;
import org.apache.kafka.common.record.FileLogInputStream.FileChannelRecordBatch;
import org.apache.kafka.common.utils.AbstractIterator;
import org.apache.kafka.common.utils.Time;
@ -29,7 +29,6 @@ import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.GatheringByteChannel;
import java.nio.file.Files;
import java.nio.file.StandardOpenOption;
import java.util.Objects;
@ -270,7 +269,7 @@ public class FileRecords extends AbstractRecords implements Closeable {
}
@Override
public long writeTo(GatheringByteChannel destChannel, long offset, int length) throws IOException {
public long writeTo(TransferableChannel destChannel, long offset, int length) throws IOException {
long newSize = Math.min(channel.size(), end) - start;
int oldSize = sizeInBytes();
if (newSize < oldSize)
@ -280,14 +279,7 @@ public class FileRecords extends AbstractRecords implements Closeable {
long position = start + offset;
int count = Math.min(length, oldSize);
final long bytesTransferred;
if (destChannel instanceof TransportLayer) {
TransportLayer tl = (TransportLayer) destChannel;
bytesTransferred = tl.transferFrom(channel, position, count);
} else {
bytesTransferred = channel.transferTo(position, count, destChannel);
}
return bytesTransferred;
return destChannel.transferFrom(channel, position, count);
}
/**

View File

@ -77,8 +77,8 @@ public class LazyDownConversionRecords implements BaseRecords {
}
@Override
public LazyDownConversionRecordsSend toSend(String destination) {
return new LazyDownConversionRecordsSend(destination, this);
public LazyDownConversionRecordsSend toSend() {
return new LazyDownConversionRecordsSend(this);
}
public TopicPartition topicPartition() {

View File

@ -18,12 +18,13 @@ package org.apache.kafka.common.record;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.UnsupportedCompressionTypeException;
import org.apache.kafka.common.network.TransferableChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.GatheringByteChannel;
import java.util.Iterator;
/**
@ -39,8 +40,8 @@ public final class LazyDownConversionRecordsSend extends RecordsSend<LazyDownCon
private RecordsSend convertedRecordsWriter;
private Iterator<ConvertedRecords<?>> convertedRecordsIterator;
public LazyDownConversionRecordsSend(String destination, LazyDownConversionRecords records) {
super(destination, records, records.sizeInBytes());
public LazyDownConversionRecordsSend(LazyDownConversionRecords records) {
super(records, records.sizeInBytes());
convertedRecordsWriter = null;
recordConversionStats = new RecordConversionStats();
convertedRecordsIterator = records().iterator(MAX_READ_SIZE);
@ -66,7 +67,7 @@ public final class LazyDownConversionRecordsSend extends RecordsSend<LazyDownCon
}
@Override
public long writeTo(GatheringByteChannel channel, long previouslyWritten, int remaining) throws IOException {
public long writeTo(TransferableChannel channel, long previouslyWritten, int remaining) throws IOException {
if (convertedRecordsWriter == null || convertedRecordsWriter.completed()) {
MemoryRecords convertedRecords;
@ -90,7 +91,7 @@ public final class LazyDownConversionRecordsSend extends RecordsSend<LazyDownCon
convertedRecords = buildOverflowBatch(remaining);
}
convertedRecordsWriter = new DefaultRecordsSend(destination(), convertedRecords, Math.min(convertedRecords.sizeInBytes(), remaining));
convertedRecordsWriter = new DefaultRecordsSend(convertedRecords, Math.min(convertedRecords.sizeInBytes(), remaining));
}
return convertedRecordsWriter.writeTo(channel);
}

View File

@ -19,11 +19,13 @@ package org.apache.kafka.common.record;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.CorruptRecordException;
import org.apache.kafka.common.message.LeaderChangeMessage;
import org.apache.kafka.common.network.TransferableChannel;
import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetention;
import org.apache.kafka.common.utils.AbstractIterator;
import org.apache.kafka.common.utils.ByteBufferOutputStream;
import org.apache.kafka.common.utils.CloseableIterator;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -62,7 +64,7 @@ public class MemoryRecords extends AbstractRecords {
}
@Override
public long writeTo(GatheringByteChannel channel, long position, int length) throws IOException {
public long writeTo(TransferableChannel channel, long position, int length) throws IOException {
if (position > Integer.MAX_VALUE)
throw new IllegalArgumentException("position should not be greater than Integer.MAX_VALUE: " + position);
if (position + length > buffer.limit())

View File

@ -19,11 +19,12 @@ package org.apache.kafka.common.record;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.network.Send;
import org.apache.kafka.common.network.TransferableChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.channels.GatheringByteChannel;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
@ -34,7 +35,6 @@ import java.util.Queue;
public class MultiRecordsSend implements Send {
private static final Logger log = LoggerFactory.getLogger(MultiRecordsSend.class);
private final String dest;
private final Queue<Send> sendQueue;
private final long size;
private Map<TopicPartition, RecordConversionStats> recordConversionStats;
@ -43,11 +43,10 @@ public class MultiRecordsSend implements Send {
private Send current;
/**
* Construct a MultiRecordsSend for the given destination from a queue of Send objects. The queue will be
* consumed as the MultiRecordsSend progresses (on completion, it will be empty).
* Construct a MultiRecordsSend from a queue of Send objects. The queue will be consumed as the MultiRecordsSend
* progresses (on completion, it will be empty).
*/
public MultiRecordsSend(String dest, Queue<Send> sends) {
this.dest = dest;
public MultiRecordsSend(Queue<Send> sends) {
this.sendQueue = sends;
long size = 0;
@ -58,8 +57,7 @@ public class MultiRecordsSend implements Send {
this.current = sendQueue.poll();
}
public MultiRecordsSend(String dest, Queue<Send> sends, long size) {
this.dest = dest;
public MultiRecordsSend(Queue<Send> sends, long size) {
this.sendQueue = sends;
this.size = size;
this.current = sendQueue.poll();
@ -70,11 +68,6 @@ public class MultiRecordsSend implements Send {
return size;
}
@Override
public String destination() {
return dest;
}
@Override
public boolean completed() {
return current == null;
@ -90,7 +83,7 @@ public class MultiRecordsSend implements Send {
}
@Override
public long writeTo(GatheringByteChannel channel) throws IOException {
public long writeTo(TransferableChannel channel) throws IOException {
if (completed())
throw new KafkaException("This operation cannot be invoked on a complete request.");
@ -128,8 +121,7 @@ public class MultiRecordsSend implements Send {
@Override
public String toString() {
return "MultiRecordsSend(" +
"dest='" + dest + "'" +
", size=" + size +
"size=" + size +
", totalWritten=" + totalWritten +
')';
}

View File

@ -16,11 +16,11 @@
*/
package org.apache.kafka.common.record;
import org.apache.kafka.common.network.TransferableChannel;
import org.apache.kafka.common.utils.AbstractIterator;
import org.apache.kafka.common.utils.Time;
import java.io.IOException;
import java.nio.channels.GatheringByteChannel;
import java.util.Iterator;
@ -64,7 +64,7 @@ public interface Records extends BaseRecords {
* @return The number of bytes actually written
* @throws IOException For any IO errors
*/
long writeTo(GatheringByteChannel channel, long position, int length) throws IOException;
long writeTo(TransferableChannel channel, long position, int length) throws IOException;
/**
* Get the record batches. Note that the signature allows subclasses

View File

@ -17,41 +17,33 @@
package org.apache.kafka.common.record;
import org.apache.kafka.common.network.Send;
import org.apache.kafka.common.network.TransportLayers;
import org.apache.kafka.common.network.TransferableChannel;
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.GatheringByteChannel;
public abstract class RecordsSend<T extends BaseRecords> implements Send {
private static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.allocate(0);
private final String destination;
private final T records;
private final int maxBytesToWrite;
private int remaining;
private boolean pending = false;
protected RecordsSend(String destination, T records, int maxBytesToWrite) {
this.destination = destination;
protected RecordsSend(T records, int maxBytesToWrite) {
this.records = records;
this.maxBytesToWrite = maxBytesToWrite;
this.remaining = maxBytesToWrite;
}
@Override
public String destination() {
return destination;
}
@Override
public boolean completed() {
return remaining <= 0 && !pending;
}
@Override
public final long writeTo(GatheringByteChannel channel) throws IOException {
public final long writeTo(TransferableChannel channel) throws IOException {
long written = 0;
if (remaining > 0) {
@ -61,7 +53,7 @@ public abstract class RecordsSend<T extends BaseRecords> implements Send {
remaining -= written;
}
pending = TransportLayers.hasPendingWrites(channel);
pending = channel.hasPendingWrites();
if (remaining <= 0 && pending)
channel.write(EMPTY_BYTE_BUFFER);
@ -83,10 +75,10 @@ public abstract class RecordsSend<T extends BaseRecords> implements Send {
* the to maximum bytes we want to write the to `channel`. `previouslyWritten` and `remaining` will be adjusted
* appropriately for every subsequent invocation. See {@link #writeTo} for example expected usage.
* @param channel The channel to write to
* @param previouslyWritten Bytes written in previous calls to {@link #writeTo(GatheringByteChannel, long, int)}; 0 if being called for the first time
* @param previouslyWritten Bytes written in previous calls to {@link #writeTo(TransferableChannel, long, int)}; 0 if being called for the first time
* @param remaining Number of bytes remaining to be written
* @return The number of bytes actually written
* @throws IOException For any IO errors
*/
protected abstract long writeTo(GatheringByteChannel channel, long previouslyWritten, int remaining) throws IOException;
protected abstract long writeTo(TransferableChannel channel, long previouslyWritten, int remaining) throws IOException;
}

View File

@ -20,7 +20,7 @@ import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.network.Send;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.Message;
import org.apache.kafka.common.protocol.MessageUtil;
import org.apache.kafka.common.protocol.ObjectSerializationCache;
import org.apache.kafka.common.protocol.SendBuilder;
@ -97,20 +97,13 @@ public abstract class AbstractRequest implements AbstractRequestResponse {
return apiKey;
}
public final Send toSend(String destination, RequestHeader header) {
return SendBuilder.buildRequestSend(destination, header, data());
public final Send toSend(RequestHeader header) {
return SendBuilder.buildRequestSend(header, data());
}
// Visible for testing
public final ByteBuffer serializeWithHeader(RequestHeader header) {
return RequestUtils.serialize(header.data(), header.headerVersion(), data(), version);
}
protected abstract Message data();
// Visible for testing
public final ByteBuffer serializeBody() {
return RequestUtils.serialize(null, (short) 0, data(), version);
public final ByteBuffer serialize() {
return MessageUtil.toByteBuffer(data(), version);
}
// Visible for testing
@ -166,7 +159,7 @@ public abstract class AbstractRequest implements AbstractRequestResponse {
case FETCH:
return FetchRequest.parse(buffer, apiVersion);
case LIST_OFFSETS:
return ListOffsetRequest.parse(buffer, apiVersion);
return ListOffsetsRequest.parse(buffer, apiVersion);
case METADATA:
return MetadataRequest.parse(buffer, apiVersion);
case OFFSET_COMMIT:
@ -279,6 +272,8 @@ public abstract class AbstractRequest implements AbstractRequestResponse {
return UpdateFeaturesRequest.parse(buffer, apiVersion);
case ENVELOPE:
return EnvelopeRequest.parse(buffer, apiVersion);
case FETCH_SNAPSHOT:
return FetchSnapshotRequest.parse(buffer, apiVersion);
default:
throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseRequest`, the " +
"code should be updated to do so.", apiKey));

View File

@ -16,4 +16,9 @@
*/
package org.apache.kafka.common.requests;
public interface AbstractRequestResponse { }
import org.apache.kafka.common.protocol.ApiMessage;
public interface AbstractRequestResponse {
ApiMessage data();
}

View File

@ -19,7 +19,7 @@ package org.apache.kafka.common.requests;
import org.apache.kafka.common.network.Send;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.Message;
import org.apache.kafka.common.protocol.MessageUtil;
import org.apache.kafka.common.protocol.SendBuilder;
import java.nio.ByteBuffer;
@ -27,7 +27,6 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@ -40,25 +39,20 @@ public abstract class AbstractResponse implements AbstractRequestResponse {
this.apiKey = apiKey;
}
public final Send toSend(String destination, ResponseHeader header, short version) {
return SendBuilder.buildResponseSend(destination, header, data(), version);
public final Send toSend(ResponseHeader header, short version) {
return SendBuilder.buildResponseSend(header, data(), version);
}
/**
* Visible for testing, typically {@link #toSend(String, ResponseHeader, short)} should be used instead.
* Serializes header and body without prefixing with size (unlike `toSend`, which does include a size prefix).
*/
public final ByteBuffer serializeWithHeader(short version, int correlationId) {
return serializeWithHeader(new ResponseHeader(correlationId, apiKey.responseHeaderVersion(version)), version);
}
final ByteBuffer serializeWithHeader(ResponseHeader header, short version) {
Objects.requireNonNull(header, "header should not be null");
return RequestUtils.serialize(header.data(), header.headerVersion(), data(), version);
}
// Visible for testing
final ByteBuffer serializeBody(short version) {
return RequestUtils.serialize(null, (short) 0, data(), version);
final ByteBuffer serialize(short version) {
return MessageUtil.toByteBuffer(data(), version);
}
/**
@ -95,8 +89,6 @@ public abstract class AbstractResponse implements AbstractRequestResponse {
errorCounts.put(error, count + 1);
}
protected abstract Message data();
/**
* Parse a response from the provided buffer. The buffer is expected to hold both
* the {@link ResponseHeader} as well as the response payload.
@ -124,7 +116,7 @@ public abstract class AbstractResponse implements AbstractRequestResponse {
case FETCH:
return FetchResponse.parse(responseBuffer, version);
case LIST_OFFSETS:
return ListOffsetResponse.parse(responseBuffer, version);
return ListOffsetsResponse.parse(responseBuffer, version);
case METADATA:
return MetadataResponse.parse(responseBuffer, version);
case OFFSET_COMMIT:
@ -237,6 +229,8 @@ public abstract class AbstractResponse implements AbstractRequestResponse {
return UpdateFeaturesResponse.parse(responseBuffer, version);
case ENVELOPE:
return EnvelopeResponse.parse(responseBuffer, version);
case FETCH_SNAPSHOT:
return FetchSnapshotResponse.parse(responseBuffer, version);
default:
throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseResponse`, the " +
"code should be updated to do so.", apiKey));

View File

@ -26,7 +26,7 @@ import java.nio.ByteBuffer;
public class AddOffsetsToTxnRequest extends AbstractRequest {
public AddOffsetsToTxnRequestData data;
private final AddOffsetsToTxnRequestData data;
public static class Builder extends AbstractRequest.Builder<AddOffsetsToTxnRequest> {
public AddOffsetsToTxnRequestData data;
@ -53,7 +53,7 @@ public class AddOffsetsToTxnRequest extends AbstractRequest {
}
@Override
protected AddOffsetsToTxnRequestData data() {
public AddOffsetsToTxnRequestData data() {
return data;
}

View File

@ -39,7 +39,7 @@ import java.util.Map;
*/
public class AddOffsetsToTxnResponse extends AbstractResponse {
public AddOffsetsToTxnResponseData data;
private final AddOffsetsToTxnResponseData data;
public AddOffsetsToTxnResponse(AddOffsetsToTxnResponseData data) {
super(ApiKeys.ADD_OFFSETS_TO_TXN);
@ -57,7 +57,7 @@ public class AddOffsetsToTxnResponse extends AbstractResponse {
}
@Override
protected AddOffsetsToTxnResponseData data() {
public AddOffsetsToTxnResponseData data() {
return data;
}

View File

@ -32,7 +32,7 @@ import java.util.Map;
public class AddPartitionsToTxnRequest extends AbstractRequest {
public final AddPartitionsToTxnRequestData data;
private final AddPartitionsToTxnRequestData data;
private List<TopicPartition> cachedPartitions = null;
@ -112,7 +112,7 @@ public class AddPartitionsToTxnRequest extends AbstractRequest {
}
@Override
protected AddPartitionsToTxnRequestData data() {
public AddPartitionsToTxnRequestData data() {
return data;
}

View File

@ -46,7 +46,7 @@ import java.util.Map;
*/
public class AddPartitionsToTxnResponse extends AbstractResponse {
public final AddPartitionsToTxnResponseData data;
private final AddPartitionsToTxnResponseData data;
private Map<TopicPartition, Errors> cachedErrorsMap = null;
@ -117,7 +117,7 @@ public class AddPartitionsToTxnResponse extends AbstractResponse {
}
@Override
protected AddPartitionsToTxnResponseData data() {
public AddPartitionsToTxnResponseData data() {
return data;
}

View File

@ -111,7 +111,7 @@ public class AlterClientQuotasRequest extends AbstractRequest {
}
@Override
protected AlterClientQuotasRequestData data() {
public AlterClientQuotasRequestData data() {
return data;
}

View File

@ -77,7 +77,7 @@ public class AlterClientQuotasResponse extends AbstractResponse {
}
@Override
protected AlterClientQuotasResponseData data() {
public AlterClientQuotasResponseData data() {
return data;
}

View File

@ -112,7 +112,7 @@ public class AlterConfigsRequest extends AbstractRequest {
}
@Override
protected AlterConfigsRequestData data() {
public AlterConfigsRequestData data() {
return data;
}

View File

@ -34,6 +34,7 @@ public class AlterIsrRequest extends AbstractRequest {
this.data = data;
}
@Override
public AlterIsrRequestData data() {
return data;
}

View File

@ -35,6 +35,7 @@ public class AlterIsrResponse extends AbstractResponse {
this.data = data;
}
@Override
public AlterIsrResponseData data() {
return data;
}

View File

@ -40,6 +40,7 @@ public class AlterPartitionReassignmentsResponse extends AbstractResponse {
new AlterPartitionReassignmentsResponseData(new ByteBufferAccessor(buffer), version));
}
@Override
public AlterPartitionReassignmentsResponseData data() {
return data;
}

View File

@ -61,7 +61,7 @@ public class AlterReplicaLogDirsRequest extends AbstractRequest {
}
@Override
protected AlterReplicaLogDirsRequestData data() {
public AlterReplicaLogDirsRequestData data() {
return data;
}

View File

@ -59,6 +59,7 @@ public class AlterUserScramCredentialsRequest extends AbstractRequest {
return new AlterUserScramCredentialsRequest(new AlterUserScramCredentialsRequestData(new ByteBufferAccessor(buffer), version), version);
}
@Override
public AlterUserScramCredentialsRequestData data() {
return data;
}

View File

@ -33,6 +33,7 @@ public class AlterUserScramCredentialsResponse extends AbstractResponse {
this.data = responseData;
}
@Override
public AlterUserScramCredentialsResponseData data() {
return data;
}

View File

@ -46,7 +46,7 @@ public class ApiVersionsResponse extends AbstractResponse {
public static final ApiVersionsResponse DEFAULT_API_VERSIONS_RESPONSE = createApiVersionsResponse(
DEFAULT_THROTTLE_TIME, RecordBatch.CURRENT_MAGIC_VALUE);
public final ApiVersionsResponseData data;
private final ApiVersionsResponseData data;
public ApiVersionsResponse(ApiVersionsResponseData data) {
super(ApiKeys.API_VERSIONS);

View File

@ -46,7 +46,7 @@ public class BeginQuorumEpochRequest extends AbstractRequest {
}
}
public final BeginQuorumEpochRequestData data;
private final BeginQuorumEpochRequestData data;
private BeginQuorumEpochRequest(BeginQuorumEpochRequestData data, short version) {
super(ApiKeys.BEGIN_QUORUM_EPOCH, version);
@ -54,7 +54,7 @@ public class BeginQuorumEpochRequest extends AbstractRequest {
}
@Override
protected BeginQuorumEpochRequestData data() {
public BeginQuorumEpochRequestData data() {
return data;
}

View File

@ -42,7 +42,7 @@ import java.util.Map;
* - {@link Errors#UNKNOWN_TOPIC_OR_PARTITION}
*/
public class BeginQuorumEpochResponse extends AbstractResponse {
public final BeginQuorumEpochResponseData data;
private final BeginQuorumEpochResponseData data;
public BeginQuorumEpochResponse(BeginQuorumEpochResponseData data) {
super(ApiKeys.BEGIN_QUORUM_EPOCH);
@ -86,7 +86,7 @@ public class BeginQuorumEpochResponse extends AbstractResponse {
}
@Override
protected BeginQuorumEpochResponseData data() {
public BeginQuorumEpochResponseData data() {
return data;
}

View File

@ -65,6 +65,7 @@ public class ControlledShutdownRequest extends AbstractRequest {
version);
}
@Override
public ControlledShutdownRequestData data() {
return data;
}

View File

@ -62,6 +62,7 @@ public class ControlledShutdownResponse extends AbstractResponse {
return new ControlledShutdownResponse(new ControlledShutdownResponseData(new ByteBufferAccessor(buffer), version));
}
@Override
public ControlledShutdownResponseData data() {
return data;
}

View File

@ -70,7 +70,7 @@ public class CreateAclsRequest extends AbstractRequest {
}
@Override
protected CreateAclsRequestData data() {
public CreateAclsRequestData data() {
return data;
}

View File

@ -34,7 +34,7 @@ public class CreateAclsResponse extends AbstractResponse {
}
@Override
protected CreateAclsResponseData data() {
public CreateAclsResponseData data() {
return data;
}

View File

@ -38,6 +38,7 @@ public class CreateDelegationTokenRequest extends AbstractRequest {
version);
}
@Override
public CreateDelegationTokenRequestData data() {
return data;
}

View File

@ -64,6 +64,7 @@ public class CreateDelegationTokenResponse extends AbstractResponse {
return prepareResponse(throttleTimeMs, error, owner, -1, -1, -1, "", ByteBuffer.wrap(new byte[] {}));
}
@Override
public CreateDelegationTokenResponseData data() {
return data;
}

View File

@ -55,6 +55,7 @@ public class CreatePartitionsRequest extends AbstractRequest {
this.data = data;
}
@Override
public CreatePartitionsRequestData data() {
return data;
}

View File

@ -35,6 +35,7 @@ public class CreatePartitionsResponse extends AbstractResponse {
this.data = data;
}
@Override
public CreatePartitionsResponseData data() {
return data;
}

View File

@ -77,6 +77,7 @@ public class CreateTopicsRequest extends AbstractRequest {
this.data = data;
}
@Override
public CreateTopicsRequestData data() {
return data;
}

View File

@ -50,6 +50,7 @@ public class CreateTopicsResponse extends AbstractResponse {
this.data = data;
}
@Override
public CreateTopicsResponseData data() {
return data;
}

View File

@ -100,7 +100,7 @@ public class DeleteAclsRequest extends AbstractRequest {
}
@Override
protected DeleteAclsRequestData data() {
public DeleteAclsRequestData data() {
return data;
}

View File

@ -51,7 +51,7 @@ public class DeleteAclsResponse extends AbstractResponse {
}
@Override
protected DeleteAclsResponseData data() {
public DeleteAclsResponseData data() {
return data;
}

View File

@ -46,7 +46,7 @@ public class DeleteGroupsRequest extends AbstractRequest {
}
}
public final DeleteGroupsRequestData data;
private final DeleteGroupsRequestData data;
public DeleteGroupsRequest(DeleteGroupsRequestData data, short version) {
super(ApiKeys.DELETE_GROUPS, version);
@ -75,7 +75,7 @@ public class DeleteGroupsRequest extends AbstractRequest {
}
@Override
protected DeleteGroupsRequestData data() {
public DeleteGroupsRequestData data() {
return data;
}
}

View File

@ -39,7 +39,7 @@ import java.util.Map;
*/
public class DeleteGroupsResponse extends AbstractResponse {
public final DeleteGroupsResponseData data;
private final DeleteGroupsResponseData data;
public DeleteGroupsResponse(DeleteGroupsResponseData data) {
super(ApiKeys.DELETE_GROUPS);
@ -47,7 +47,7 @@ public class DeleteGroupsResponse extends AbstractResponse {
}
@Override
protected DeleteGroupsResponseData data() {
public DeleteGroupsResponseData data() {
return data;
}

View File

@ -46,6 +46,7 @@ public class DeleteRecordsResponse extends AbstractResponse {
this.data = data;
}
@Override
public DeleteRecordsResponseData data() {
return data;
}

View File

@ -52,6 +52,7 @@ public class DeleteTopicsRequest extends AbstractRequest {
this.data = data;
}
@Override
public DeleteTopicsRequestData data() {
return data;
}

View File

@ -38,7 +38,7 @@ public class DeleteTopicsResponse extends AbstractResponse {
* INVALID_REQUEST(42)
* TOPIC_DELETION_DISABLED(73)
*/
private DeleteTopicsResponseData data;
private final DeleteTopicsResponseData data;
public DeleteTopicsResponse(DeleteTopicsResponseData data) {
super(ApiKeys.DELETE_TOPICS);
@ -50,6 +50,7 @@ public class DeleteTopicsResponse extends AbstractResponse {
return data.throttleTimeMs();
}
@Override
public DeleteTopicsResponseData data() {
return data;
}

View File

@ -89,6 +89,7 @@ public class DescribeAclsRequest extends AbstractRequest {
}
}
@Override
public DescribeAclsRequestData data() {
return data;
}

View File

@ -62,7 +62,7 @@ public class DescribeAclsResponse extends AbstractResponse {
}
@Override
protected DescribeAclsResponseData data() {
public DescribeAclsResponseData data() {
return data;
}

View File

@ -106,7 +106,7 @@ public class DescribeClientQuotasRequest extends AbstractRequest {
}
@Override
protected DescribeClientQuotasRequestData data() {
public DescribeClientQuotasRequestData data() {
return data;
}

View File

@ -71,7 +71,7 @@ public class DescribeClientQuotasResponse extends AbstractResponse {
}
@Override
protected DescribeClientQuotasResponseData data() {
public DescribeClientQuotasResponseData data() {
return data;
}

View File

@ -60,6 +60,7 @@ public class DescribeDelegationTokenRequest extends AbstractRequest {
this.data = data;
}
@Override
public DescribeDelegationTokenRequestData data() {
return data;
}

View File

@ -80,7 +80,7 @@ public class DescribeDelegationTokenResponse extends AbstractResponse {
}
@Override
protected DescribeDelegationTokenResponseData data() {
public DescribeDelegationTokenResponseData data() {
return data;
}

View File

@ -52,6 +52,7 @@ public class DescribeGroupsRequest extends AbstractRequest {
this.data = data;
}
@Override
public DescribeGroupsRequestData data() {
return data;
}

View File

@ -44,7 +44,7 @@ public class DescribeGroupsResponse extends AbstractResponse {
* AUTHORIZATION_FAILED (29)
*/
private DescribeGroupsResponseData data;
private final DescribeGroupsResponseData data;
public DescribeGroupsResponse(DescribeGroupsResponseData data) {
super(ApiKeys.DESCRIBE_GROUPS);
@ -105,6 +105,7 @@ public class DescribeGroupsResponse extends AbstractResponse {
return groupMetadata;
}
@Override
public DescribeGroupsResponseData data() {
return data;
}

View File

@ -39,6 +39,7 @@ public class DescribeLogDirsResponse extends AbstractResponse {
this.data = data;
}
@Override
public DescribeLogDirsResponseData data() {
return data;
}

View File

@ -49,7 +49,7 @@ public class DescribeQuorumRequest extends AbstractRequest {
}
}
public final DescribeQuorumRequestData data;
private final DescribeQuorumRequestData data;
private DescribeQuorumRequest(DescribeQuorumRequestData data, short version) {
super(ApiKeys.DESCRIBE_QUORUM, version);
@ -72,7 +72,7 @@ public class DescribeQuorumRequest extends AbstractRequest {
}
@Override
protected DescribeQuorumRequestData data() {
public DescribeQuorumRequestData data() {
return data;
}

View File

@ -41,7 +41,7 @@ import java.util.Map;
* - {@link Errors#UNKNOWN_TOPIC_OR_PARTITION}
*/
public class DescribeQuorumResponse extends AbstractResponse {
public final DescribeQuorumResponseData data;
private final DescribeQuorumResponseData data;
public DescribeQuorumResponse(DescribeQuorumResponseData data) {
super(ApiKeys.DESCRIBE_QUORUM);
@ -63,7 +63,7 @@ public class DescribeQuorumResponse extends AbstractResponse {
}
@Override
protected DescribeQuorumResponseData data() {
public DescribeQuorumResponseData data() {
return data;
}

View File

@ -44,7 +44,7 @@ public class DescribeUserScramCredentialsRequest extends AbstractRequest {
}
}
private DescribeUserScramCredentialsRequestData data;
private final DescribeUserScramCredentialsRequestData data;
private final short version;
private DescribeUserScramCredentialsRequest(DescribeUserScramCredentialsRequestData data, short version) {
@ -58,6 +58,7 @@ public class DescribeUserScramCredentialsRequest extends AbstractRequest {
new ByteBufferAccessor(buffer), version), version);
}
@Override
public DescribeUserScramCredentialsRequestData data() {
return data;
}

View File

@ -33,6 +33,7 @@ public class DescribeUserScramCredentialsResponse extends AbstractResponse {
this.data = responseData;
}
@Override
public DescribeUserScramCredentialsResponseData data() {
return data;
}

View File

@ -90,6 +90,7 @@ public class ElectLeadersRequest extends AbstractRequest {
this.data = data;
}
@Override
public ElectLeadersRequestData data() {
return data;
}

View File

@ -52,6 +52,7 @@ public class ElectLeadersResponse extends AbstractResponse {
data.setReplicaElectionResults(electionResults);
}
@Override
public ElectLeadersResponseData data() {
return data;
}

View File

@ -47,7 +47,7 @@ public class EndQuorumEpochRequest extends AbstractRequest {
}
}
public final EndQuorumEpochRequestData data;
private final EndQuorumEpochRequestData data;
private EndQuorumEpochRequest(EndQuorumEpochRequestData data, short version) {
super(ApiKeys.END_QUORUM_EPOCH, version);
@ -55,7 +55,7 @@ public class EndQuorumEpochRequest extends AbstractRequest {
}
@Override
protected EndQuorumEpochRequestData data() {
public EndQuorumEpochRequestData data() {
return data;
}

View File

@ -42,7 +42,7 @@ import java.util.Map;
* - {@link Errors#UNKNOWN_TOPIC_OR_PARTITION}
*/
public class EndQuorumEpochResponse extends AbstractResponse {
public final EndQuorumEpochResponseData data;
private final EndQuorumEpochResponseData data;
public EndQuorumEpochResponse(EndQuorumEpochResponseData data) {
super(ApiKeys.END_QUORUM_EPOCH);
@ -64,7 +64,7 @@ public class EndQuorumEpochResponse extends AbstractResponse {
}
@Override
protected EndQuorumEpochResponseData data() {
public EndQuorumEpochResponseData data() {
return data;
}

View File

@ -26,7 +26,7 @@ import java.nio.ByteBuffer;
public class EndTxnRequest extends AbstractRequest {
public final EndTxnRequestData data;
private final EndTxnRequestData data;
public static class Builder extends AbstractRequest.Builder<EndTxnRequest> {
public final EndTxnRequestData data;
@ -60,7 +60,7 @@ public class EndTxnRequest extends AbstractRequest {
}
@Override
protected EndTxnRequestData data() {
public EndTxnRequestData data() {
return data;
}

View File

@ -38,7 +38,7 @@ import java.util.Map;
*/
public class EndTxnResponse extends AbstractResponse {
public final EndTxnResponseData data;
private final EndTxnResponseData data;
public EndTxnResponse(EndTxnResponseData data) {
super(ApiKeys.END_TXN);
@ -61,7 +61,7 @@ public class EndTxnResponse extends AbstractResponse {
}
@Override
protected EndTxnResponseData data() {
public EndTxnResponseData data() {
return data;
}

View File

@ -80,6 +80,7 @@ public class EnvelopeRequest extends AbstractRequest {
return new EnvelopeRequest(new EnvelopeRequestData(new ByteBufferAccessor(buffer), version), version);
}
@Override
public EnvelopeRequestData data() {
return data;
}

Some files were not shown because too many files have changed in this diff Show More