KAFKA-16950: Define Persister interfaces and RPCs (#16335)

Define the interfaces and RPCs for share-group persistence. (KIP-932). This PR is just RPCs and interfaces to allow building of the broker components which depend upon them. The implementation will follow in subsequent PRs.

Reviewers:  Manikumar Reddy <manikumar.reddy@gmail.com>, Apoorv Mittal <apoorvmittal10@gmail.com>
This commit is contained in:
Andrew Schofield 2024-06-15 16:22:49 +01:00 committed by GitHub
parent adee6bae45
commit fecbfb8133
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
55 changed files with 3081 additions and 2 deletions

View File

@ -125,7 +125,12 @@ public enum ApiKeys {
SHARE_ACKNOWLEDGE(ApiMessageType.SHARE_ACKNOWLEDGE),
ADD_RAFT_VOTER(ApiMessageType.ADD_RAFT_VOTER),
REMOVE_RAFT_VOTER(ApiMessageType.REMOVE_RAFT_VOTER),
UPDATE_RAFT_VOTER(ApiMessageType.UPDATE_RAFT_VOTER);
UPDATE_RAFT_VOTER(ApiMessageType.UPDATE_RAFT_VOTER),
INITIALIZE_SHARE_GROUP_STATE(ApiMessageType.INITIALIZE_SHARE_GROUP_STATE, true),
READ_SHARE_GROUP_STATE(ApiMessageType.READ_SHARE_GROUP_STATE, true),
WRITE_SHARE_GROUP_STATE(ApiMessageType.WRITE_SHARE_GROUP_STATE, true),
DELETE_SHARE_GROUP_STATE(ApiMessageType.DELETE_SHARE_GROUP_STATE, true),
READ_SHARE_GROUP_STATE_SUMMARY(ApiMessageType.READ_SHARE_GROUP_STATE_SUMMARY, true);
private static final Map<ApiMessageType.ListenerType, EnumSet<ApiKeys>> APIS_BY_LISTENER =
new EnumMap<>(ApiMessageType.ListenerType.class);

View File

@ -340,6 +340,16 @@ public abstract class AbstractRequest implements AbstractRequestResponse {
return RemoveRaftVoterRequest.parse(buffer, apiVersion);
case UPDATE_RAFT_VOTER:
return UpdateRaftVoterRequest.parse(buffer, apiVersion);
case INITIALIZE_SHARE_GROUP_STATE:
return InitializeShareGroupStateRequest.parse(buffer, apiVersion);
case READ_SHARE_GROUP_STATE:
return ReadShareGroupStateRequest.parse(buffer, apiVersion);
case WRITE_SHARE_GROUP_STATE:
return WriteShareGroupStateRequest.parse(buffer, apiVersion);
case DELETE_SHARE_GROUP_STATE:
return DeleteShareGroupStateRequest.parse(buffer, apiVersion);
case READ_SHARE_GROUP_STATE_SUMMARY:
return ReadShareGroupStateSummaryRequest.parse(buffer, apiVersion);
default:
throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseRequest`, the " +
"code should be updated to do so.", apiKey));

View File

@ -277,6 +277,16 @@ public abstract class AbstractResponse implements AbstractRequestResponse {
return RemoveRaftVoterResponse.parse(responseBuffer, version);
case UPDATE_RAFT_VOTER:
return UpdateRaftVoterResponse.parse(responseBuffer, version);
case INITIALIZE_SHARE_GROUP_STATE:
return InitializeShareGroupStateResponse.parse(responseBuffer, version);
case READ_SHARE_GROUP_STATE:
return ReadShareGroupStateResponse.parse(responseBuffer, version);
case WRITE_SHARE_GROUP_STATE:
return WriteShareGroupStateResponse.parse(responseBuffer, version);
case DELETE_SHARE_GROUP_STATE:
return DeleteShareGroupStateResponse.parse(responseBuffer, version);
case READ_SHARE_GROUP_STATE_SUMMARY:
return ReadShareGroupStateSummaryResponse.parse(responseBuffer, version);
default:
throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseResponse`, the " +
"code should be updated to do so.", apiKey));

View File

@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.requests;
import org.apache.kafka.common.message.DeleteShareGroupStateRequestData;
import org.apache.kafka.common.message.DeleteShareGroupStateResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
public class DeleteShareGroupStateRequest extends AbstractRequest {
public static class Builder extends AbstractRequest.Builder<DeleteShareGroupStateRequest> {
private final DeleteShareGroupStateRequestData data;
public Builder(DeleteShareGroupStateRequestData data) {
this(data, false);
}
public Builder(DeleteShareGroupStateRequestData data, boolean enableUnstableLastVersion) {
super(ApiKeys.DELETE_SHARE_GROUP_STATE, enableUnstableLastVersion);
this.data = data;
}
@Override
public DeleteShareGroupStateRequest build(short version) {
return new DeleteShareGroupStateRequest(data, version);
}
@Override
public String toString() {
return data.toString();
}
}
private final DeleteShareGroupStateRequestData data;
public DeleteShareGroupStateRequest(DeleteShareGroupStateRequestData data, short version) {
super(ApiKeys.DELETE_SHARE_GROUP_STATE, version);
this.data = data;
}
@Override
public DeleteShareGroupStateResponse getErrorResponse(int throttleTimeMs, Throwable e) {
List<DeleteShareGroupStateResponseData.DeleteStateResult> results = new ArrayList<>();
data.topics().forEach(
topicResult -> results.add(new DeleteShareGroupStateResponseData.DeleteStateResult()
.setTopicId(topicResult.topicId())
.setPartitions(topicResult.partitions().stream()
.map(partitionData -> new DeleteShareGroupStateResponseData.PartitionResult()
.setPartition(partitionData.partition())
.setErrorCode(Errors.forException(e).code()))
.collect(Collectors.toList()))));
return new DeleteShareGroupStateResponse(new DeleteShareGroupStateResponseData()
.setResults(results));
}
@Override
public DeleteShareGroupStateRequestData data() {
return data;
}
public static DeleteShareGroupStateRequest parse(ByteBuffer buffer, short version) {
return new DeleteShareGroupStateRequest(
new DeleteShareGroupStateRequestData(new ByteBufferAccessor(buffer), version),
version
);
}
}

View File

@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.requests;
import org.apache.kafka.common.message.DeleteShareGroupStateResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
public class DeleteShareGroupStateResponse extends AbstractResponse {
private final DeleteShareGroupStateResponseData data;
public DeleteShareGroupStateResponse(DeleteShareGroupStateResponseData data) {
super(ApiKeys.DELETE_SHARE_GROUP_STATE);
this.data = data;
}
@Override
public DeleteShareGroupStateResponseData data() {
return data;
}
@Override
public Map<Errors, Integer> errorCounts() {
Map<Errors, Integer> counts = new HashMap<>();
data.results().forEach(
result -> result.partitions().forEach(
partitionResult -> updateErrorCounts(counts, Errors.forCode(partitionResult.errorCode()))
)
);
return counts;
}
@Override
public int throttleTimeMs() {
return DEFAULT_THROTTLE_TIME;
}
@Override
public void maybeSetThrottleTimeMs(int throttleTimeMs) {
// No op
}
public static DeleteShareGroupStateResponse parse(ByteBuffer buffer, short version) {
return new DeleteShareGroupStateResponse(
new DeleteShareGroupStateResponseData(new ByteBufferAccessor(buffer), version)
);
}
}

View File

@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.requests;
import org.apache.kafka.common.message.InitializeShareGroupStateRequestData;
import org.apache.kafka.common.message.InitializeShareGroupStateResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
public class InitializeShareGroupStateRequest extends AbstractRequest {
public static class Builder extends AbstractRequest.Builder<InitializeShareGroupStateRequest> {
private final InitializeShareGroupStateRequestData data;
public Builder(InitializeShareGroupStateRequestData data) {
this(data, false);
}
public Builder(InitializeShareGroupStateRequestData data, boolean enableUnstableLastVersion) {
super(ApiKeys.INITIALIZE_SHARE_GROUP_STATE, enableUnstableLastVersion);
this.data = data;
}
@Override
public InitializeShareGroupStateRequest build(short version) {
return new InitializeShareGroupStateRequest(data, version);
}
@Override
public String toString() {
return data.toString();
}
}
private final InitializeShareGroupStateRequestData data;
public InitializeShareGroupStateRequest(InitializeShareGroupStateRequestData data, short version) {
super(ApiKeys.INITIALIZE_SHARE_GROUP_STATE, version);
this.data = data;
}
@Override
public InitializeShareGroupStateResponse getErrorResponse(int throttleTimeMs, Throwable e) {
List<InitializeShareGroupStateResponseData.InitializeStateResult> results = new ArrayList<>();
data.topics().forEach(
topicResult -> results.add(new InitializeShareGroupStateResponseData.InitializeStateResult()
.setTopicId(topicResult.topicId())
.setPartitions(topicResult.partitions().stream()
.map(partitionData -> new InitializeShareGroupStateResponseData.PartitionResult()
.setPartition(partitionData.partition())
.setErrorCode(Errors.forException(e).code()))
.collect(Collectors.toList()))));
return new InitializeShareGroupStateResponse(new InitializeShareGroupStateResponseData()
.setResults(results));
}
@Override
public InitializeShareGroupStateRequestData data() {
return data;
}
public static InitializeShareGroupStateRequest parse(ByteBuffer buffer, short version) {
return new InitializeShareGroupStateRequest(
new InitializeShareGroupStateRequestData(new ByteBufferAccessor(buffer), version),
version
);
}
}

View File

@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.requests;
import org.apache.kafka.common.message.InitializeShareGroupStateResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
public class InitializeShareGroupStateResponse extends AbstractResponse {
private final InitializeShareGroupStateResponseData data;
public InitializeShareGroupStateResponse(InitializeShareGroupStateResponseData data) {
super(ApiKeys.INITIALIZE_SHARE_GROUP_STATE);
this.data = data;
}
@Override
public InitializeShareGroupStateResponseData data() {
return data;
}
@Override
public Map<Errors, Integer> errorCounts() {
Map<Errors, Integer> counts = new HashMap<>();
data.results().forEach(
result -> result.partitions().forEach(
partitionResult -> updateErrorCounts(counts, Errors.forCode(partitionResult.errorCode()))
)
);
return counts;
}
@Override
public int throttleTimeMs() {
return DEFAULT_THROTTLE_TIME;
}
@Override
public void maybeSetThrottleTimeMs(int throttleTimeMs) {
// No op
}
public static InitializeShareGroupStateResponse parse(ByteBuffer buffer, short version) {
return new InitializeShareGroupStateResponse(
new InitializeShareGroupStateResponseData(new ByteBufferAccessor(buffer), version)
);
}
}

View File

@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.requests;
import org.apache.kafka.common.message.ReadShareGroupStateRequestData;
import org.apache.kafka.common.message.ReadShareGroupStateResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
public class ReadShareGroupStateRequest extends AbstractRequest {
public static class Builder extends AbstractRequest.Builder<ReadShareGroupStateRequest> {
private final ReadShareGroupStateRequestData data;
public Builder(ReadShareGroupStateRequestData data) {
this(data, true);
}
public Builder(ReadShareGroupStateRequestData data, boolean enableUnstableLastVersion) {
super(ApiKeys.READ_SHARE_GROUP_STATE, enableUnstableLastVersion);
this.data = data;
}
@Override
public ReadShareGroupStateRequest build(short version) {
return new ReadShareGroupStateRequest(data, version);
}
@Override
public String toString() {
return data.toString();
}
}
private final ReadShareGroupStateRequestData data;
public ReadShareGroupStateRequest(ReadShareGroupStateRequestData data, short version) {
super(ApiKeys.READ_SHARE_GROUP_STATE, version);
this.data = data;
}
@Override
public ReadShareGroupStateResponse getErrorResponse(int throttleTimeMs, Throwable e) {
List<ReadShareGroupStateResponseData.ReadStateResult> results = new ArrayList<>();
data.topics().forEach(
topicResult -> results.add(new ReadShareGroupStateResponseData.ReadStateResult()
.setTopicId(topicResult.topicId())
.setPartitions(topicResult.partitions().stream()
.map(partitionData -> new ReadShareGroupStateResponseData.PartitionResult()
.setPartition(partitionData.partition())
.setErrorCode(Errors.forException(e).code())
.setErrorMessage(Errors.forException(e).message()))
.collect(Collectors.toList()))));
return new ReadShareGroupStateResponse(new ReadShareGroupStateResponseData()
.setResults(results));
}
@Override
public ReadShareGroupStateRequestData data() {
return data;
}
public static ReadShareGroupStateRequest parse(ByteBuffer buffer, short version) {
return new ReadShareGroupStateRequest(
new ReadShareGroupStateRequestData(new ByteBufferAccessor(buffer), version),
version
);
}
}

View File

@ -0,0 +1,115 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.requests;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.ReadShareGroupStateResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class ReadShareGroupStateResponse extends AbstractResponse {
private final ReadShareGroupStateResponseData data;
public ReadShareGroupStateResponse(ReadShareGroupStateResponseData data) {
super(ApiKeys.READ_SHARE_GROUP_STATE);
this.data = data;
}
@Override
public ReadShareGroupStateResponseData data() {
return data;
}
@Override
public Map<Errors, Integer> errorCounts() {
Map<Errors, Integer> counts = new HashMap<>();
data.results().forEach(
result -> result.partitions().forEach(
partitionResult -> updateErrorCounts(counts, Errors.forCode(partitionResult.errorCode()))
)
);
return counts;
}
@Override
public int throttleTimeMs() {
return DEFAULT_THROTTLE_TIME;
}
@Override
public void maybeSetThrottleTimeMs(int throttleTimeMs) {
// No op
}
public static ReadShareGroupStateResponse parse(ByteBuffer buffer, short version) {
return new ReadShareGroupStateResponse(
new ReadShareGroupStateResponseData(new ByteBufferAccessor(buffer), version)
);
}
public static ReadShareGroupStateResponseData toResponseData(
Uuid topicId,
int partition,
long startOffset,
int stateEpoch,
List<ReadShareGroupStateResponseData.StateBatch> stateBatches
) {
return new ReadShareGroupStateResponseData()
.setResults(Collections.singletonList(
new ReadShareGroupStateResponseData.ReadStateResult()
.setTopicId(topicId)
.setPartitions(Collections.singletonList(
new ReadShareGroupStateResponseData.PartitionResult()
.setPartition(partition)
.setStartOffset(startOffset)
.setStateEpoch(stateEpoch)
.setStateBatches(stateBatches)
))
));
}
public static ReadShareGroupStateResponseData toErrorResponseData(Uuid topicId, int partitionId, Errors error, String errorMessage) {
return new ReadShareGroupStateResponseData().setResults(
Collections.singletonList(new ReadShareGroupStateResponseData.ReadStateResult()
.setTopicId(topicId)
.setPartitions(Collections.singletonList(new ReadShareGroupStateResponseData.PartitionResult()
.setPartition(partitionId)
.setErrorCode(error.code())
.setErrorMessage(errorMessage)))));
}
public static ReadShareGroupStateResponseData.PartitionResult toErrorResponsePartitionResult(int partitionId, Errors error, String errorMessage) {
return new ReadShareGroupStateResponseData.PartitionResult()
.setPartition(partitionId)
.setErrorCode(error.code())
.setErrorMessage(errorMessage);
}
public static ReadShareGroupStateResponseData.ReadStateResult toResponseReadStateResult(Uuid topicId, List<ReadShareGroupStateResponseData.PartitionResult> partitionResults) {
return new ReadShareGroupStateResponseData.ReadStateResult()
.setTopicId(topicId)
.setPartitions(partitionResults);
}
}

View File

@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.requests;
import org.apache.kafka.common.message.ReadShareGroupStateSummaryRequestData;
import org.apache.kafka.common.message.ReadShareGroupStateSummaryResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
public class ReadShareGroupStateSummaryRequest extends AbstractRequest {
public static class Builder extends AbstractRequest.Builder<ReadShareGroupStateSummaryRequest> {
private final ReadShareGroupStateSummaryRequestData data;
public Builder(ReadShareGroupStateSummaryRequestData data) {
this(data, false);
}
public Builder(ReadShareGroupStateSummaryRequestData data, boolean enableUnstableLastVersion) {
super(ApiKeys.READ_SHARE_GROUP_STATE_SUMMARY, enableUnstableLastVersion);
this.data = data;
}
@Override
public ReadShareGroupStateSummaryRequest build(short version) {
return new ReadShareGroupStateSummaryRequest(data, version);
}
@Override
public String toString() {
return data.toString();
}
}
private final ReadShareGroupStateSummaryRequestData data;
public ReadShareGroupStateSummaryRequest(ReadShareGroupStateSummaryRequestData data, short version) {
super(ApiKeys.READ_SHARE_GROUP_STATE_SUMMARY, version);
this.data = data;
}
@Override
public ReadShareGroupStateSummaryResponse getErrorResponse(int throttleTimeMs, Throwable e) {
List<ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult> results = new ArrayList<>();
data.topics().forEach(
topicResult -> results.add(new ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult()
.setTopicId(topicResult.topicId())
.setPartitions(topicResult.partitions().stream()
.map(partitionData -> new ReadShareGroupStateSummaryResponseData.PartitionResult()
.setPartition(partitionData.partition())
.setErrorCode(Errors.forException(e).code())
.setErrorMessage(Errors.forException(e).message()))
.collect(Collectors.toList()))));
return new ReadShareGroupStateSummaryResponse(new ReadShareGroupStateSummaryResponseData()
.setResults(results));
}
@Override
public ReadShareGroupStateSummaryRequestData data() {
return data;
}
public static ReadShareGroupStateSummaryRequest parse(ByteBuffer buffer, short version) {
return new ReadShareGroupStateSummaryRequest(
new ReadShareGroupStateSummaryRequestData(new ByteBufferAccessor(buffer), version),
version
);
}
}

View File

@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.requests;
import org.apache.kafka.common.message.ReadShareGroupStateSummaryResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
public class ReadShareGroupStateSummaryResponse extends AbstractResponse {
private final ReadShareGroupStateSummaryResponseData data;
public ReadShareGroupStateSummaryResponse(ReadShareGroupStateSummaryResponseData data) {
super(ApiKeys.READ_SHARE_GROUP_STATE_SUMMARY);
this.data = data;
}
@Override
public ReadShareGroupStateSummaryResponseData data() {
return data;
}
@Override
public Map<Errors, Integer> errorCounts() {
Map<Errors, Integer> counts = new HashMap<>();
data.results().forEach(
result -> result.partitions().forEach(
partitionResult -> updateErrorCounts(counts, Errors.forCode(partitionResult.errorCode()))
)
);
return counts;
}
@Override
public int throttleTimeMs() {
return DEFAULT_THROTTLE_TIME;
}
@Override
public void maybeSetThrottleTimeMs(int throttleTimeMs) {
// No op
}
public static ReadShareGroupStateSummaryResponse parse(ByteBuffer buffer, short version) {
return new ReadShareGroupStateSummaryResponse(
new ReadShareGroupStateSummaryResponseData(new ByteBufferAccessor(buffer), version)
);
}
}

View File

@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.requests;
import org.apache.kafka.common.message.WriteShareGroupStateRequestData;
import org.apache.kafka.common.message.WriteShareGroupStateResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
public class WriteShareGroupStateRequest extends AbstractRequest {
public static class Builder extends AbstractRequest.Builder<WriteShareGroupStateRequest> {
private final WriteShareGroupStateRequestData data;
public Builder(WriteShareGroupStateRequestData data) {
this(data, true);
}
public Builder(WriteShareGroupStateRequestData data, boolean enableUnstableLastVersion) {
super(ApiKeys.WRITE_SHARE_GROUP_STATE, enableUnstableLastVersion);
this.data = data;
}
@Override
public WriteShareGroupStateRequest build(short version) {
return new WriteShareGroupStateRequest(data, version);
}
@Override
public String toString() {
return data.toString();
}
}
private final WriteShareGroupStateRequestData data;
public WriteShareGroupStateRequest(WriteShareGroupStateRequestData data, short version) {
super(ApiKeys.WRITE_SHARE_GROUP_STATE, version);
this.data = data;
}
@Override
public WriteShareGroupStateResponse getErrorResponse(int throttleTimeMs, Throwable e) {
List<WriteShareGroupStateResponseData.WriteStateResult> results = new ArrayList<>();
data.topics().forEach(
topicResult -> results.add(new WriteShareGroupStateResponseData.WriteStateResult()
.setTopicId(topicResult.topicId())
.setPartitions(topicResult.partitions().stream()
.map(partitionData -> new WriteShareGroupStateResponseData.PartitionResult()
.setPartition(partitionData.partition())
.setErrorCode(Errors.forException(e).code())
.setErrorMessage(Errors.forException(e).message()))
.collect(Collectors.toList()))));
return new WriteShareGroupStateResponse(new WriteShareGroupStateResponseData()
.setResults(results));
}
@Override
public WriteShareGroupStateRequestData data() {
return data;
}
public static WriteShareGroupStateRequest parse(ByteBuffer buffer, short version) {
return new WriteShareGroupStateRequest(
new WriteShareGroupStateRequestData(new ByteBufferAccessor(buffer), version),
version
);
}
}

View File

@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.requests;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.WriteShareGroupStateResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class WriteShareGroupStateResponse extends AbstractResponse {
private final WriteShareGroupStateResponseData data;
public WriteShareGroupStateResponse(WriteShareGroupStateResponseData data) {
super(ApiKeys.WRITE_SHARE_GROUP_STATE);
this.data = data;
}
@Override
public WriteShareGroupStateResponseData data() {
return data;
}
@Override
public Map<Errors, Integer> errorCounts() {
Map<Errors, Integer> counts = new HashMap<>();
data.results().forEach(
result -> result.partitions().forEach(
partitionResult -> updateErrorCounts(counts, Errors.forCode(partitionResult.errorCode()))
)
);
return counts;
}
@Override
public int throttleTimeMs() {
return DEFAULT_THROTTLE_TIME;
}
@Override
public void maybeSetThrottleTimeMs(int throttleTimeMs) {
// No op
}
public static WriteShareGroupStateResponse parse(ByteBuffer buffer, short version) {
return new WriteShareGroupStateResponse(
new WriteShareGroupStateResponseData(new ByteBufferAccessor(buffer), version)
);
}
public static WriteShareGroupStateResponseData toErrorResponseData(Uuid topicId, int partitionId, Errors error, String errorMessage) {
WriteShareGroupStateResponseData responseData = new WriteShareGroupStateResponseData();
responseData.setResults(Collections.singletonList(new WriteShareGroupStateResponseData.WriteStateResult()
.setTopicId(topicId)
.setPartitions(Collections.singletonList(new WriteShareGroupStateResponseData.PartitionResult()
.setPartition(partitionId)
.setErrorCode(error.code())
.setErrorMessage(errorMessage)))));
return responseData;
}
public static WriteShareGroupStateResponseData.PartitionResult toErrorResponsePartitionResult(int partitionId, Errors error, String errorMessage) {
return new WriteShareGroupStateResponseData.PartitionResult()
.setPartition(partitionId)
.setErrorCode(error.code())
.setErrorMessage(errorMessage);
}
public static WriteShareGroupStateResponseData.WriteStateResult toResponseWriteStateResult(Uuid topicId, List<WriteShareGroupStateResponseData.PartitionResult> partitionResults) {
return new WriteShareGroupStateResponseData.WriteStateResult()
.setTopicId(topicId)
.setPartitions(partitionResults);
}
}

View File

@ -0,0 +1,38 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
{
"apiKey": 86,
"type": "request",
"listeners": ["broker"],
"name": "DeleteShareGroupStateRequest",
"validVersions": "0",
"flexibleVersions": "0+",
"latestVersionUnstable": true,
"fields": [
{ "name": "GroupId", "type": "string", "versions": "0+",
"about":"The group identifier." },
{ "name": "Topics", "type": "[]DeleteStateData", "versions": "0+",
"about": "The data for the topics.", "fields": [
{ "name": "TopicId", "type": "uuid", "versions": "0+",
"about": "The topic identifier." },
{ "name": "Partitions", "type": "[]PartitionData", "versions": "0+",
"about": "The data for the partitions.", "fields": [
{ "name": "Partition", "type": "int32", "versions": "0+",
"about": "The partition index." }
]}
]}
]
}

View File

@ -0,0 +1,45 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
{
"apiKey": 86,
"type": "response",
"name": "DeleteShareGroupStateResponse",
"validVersions": "0",
"flexibleVersions": "0+",
// - NOT_COORDINATOR (version 0+)
// - COORDINATOR_NOT_AVAILABLE (version 0+)
// - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
// - GROUP_ID_NOT_FOUND (version 0+)
// - UNKNOWN_TOPIC_OR_PARTITION (version 0+)
// - FENCED_STATE_EPOCH (version 0+)
// - INVALID_REQUEST (version 0+)
"fields": [
{ "name": "Results", "type": "[]DeleteStateResult", "versions": "0+",
"about": "The delete results", "fields": [
{ "name": "TopicId", "type": "uuid", "versions": "0+",
"about": "The topic identifier" },
{ "name": "Partitions", "type": "[]PartitionResult", "versions": "0+",
"about" : "The results for the partitions.", "fields": [
{ "name": "Partition", "type": "int32", "versions": "0+",
"about": "The partition index." },
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The error code, or 0 if there was no error." },
{ "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
"about": "The error message, or null if there was no error." }
]}
]}
]
}

View File

@ -0,0 +1,42 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
{
"apiKey": 83,
"type": "request",
"listeners": ["broker"],
"name": "InitializeShareGroupStateRequest",
"validVersions": "0",
"flexibleVersions": "0+",
"latestVersionUnstable": true,
"fields": [
{ "name": "GroupId", "type": "string", "versions": "0+",
"about": "The group identifier." },
{ "name": "Topics", "type": "[]InitializeStateData", "versions": "0+",
"about": "The data for the topics.", "fields": [
{ "name": "TopicId", "type": "uuid", "versions": "0+",
"about": "The topic identifier." },
{ "name": "Partitions", "type": "[]PartitionData", "versions": "0+",
"about": "The data for the partitions.", "fields": [
{ "name": "Partition", "type": "int32", "versions": "0+",
"about": "The partition index." },
{ "name": "StateEpoch", "type": "int32", "versions": "0+",
"about": "The state epoch for this share-partition." },
{ "name": "StartOffset", "type": "int64", "versions": "0+",
"about": "The share-partition start offset, or -1 if the start offset is not being initialized." }
]}
]}
]
}

View File

@ -0,0 +1,43 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
{
"apiKey": 83,
"type": "response",
"name": "InitializeShareGroupStateResponse",
"validVersions": "0",
"flexibleVersions": "0+",
// - NOT_COORDINATOR (version 0+)
// - COORDINATOR_NOT_AVAILABLE (version 0+)
// - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
// - FENCED_STATE_EPOCH (version 0+)
// - INVALID_REQUEST (version 0+)
"fields": [
{ "name": "Results", "type": "[]InitializeStateResult", "versions": "0+",
"about": "The initialization results", "fields": [
{ "name": "TopicId", "type": "uuid", "versions": "0+",
"about": "The topic identifier" },
{ "name": "Partitions", "type": "[]PartitionResult", "versions": "0+",
"about" : "The results for the partitions.", "fields": [
{ "name": "Partition", "type": "int32", "versions": "0+",
"about": "The partition index." },
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The error code, or 0 if there was no error." },
{ "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
"about": "The error message, or null if there was no error." }
]}
]}
]
}

View File

@ -0,0 +1,40 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
{
"apiKey": 84,
"type": "request",
"listeners": ["broker"],
"name": "ReadShareGroupStateRequest",
"validVersions": "0",
"flexibleVersions": "0+",
"latestVersionUnstable": true,
"fields": [
{ "name": "GroupId", "type": "string", "versions": "0+",
"about":"The group identifier." },
{ "name": "Topics", "type": "[]ReadStateData", "versions": "0+",
"about": "The data for the topics.", "fields": [
{ "name": "TopicId", "type": "uuid", "versions": "0+",
"about": "The topic identifier." },
{ "name": "Partitions", "type": "[]PartitionData", "versions": "0+",
"about": "The data for the partitions.", "fields": [
{ "name": "Partition", "type": "int32", "versions": "0+",
"about": "The partition index." },
{ "name": "LeaderEpoch", "type": "int32", "versions": "0+",
"about": "The leader epoch of the share-partition." }
]}
]}
]
}

View File

@ -0,0 +1,58 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
{
"apiKey": 84,
"type": "response",
"name": "ReadShareGroupStateResponse",
"validVersions": "0",
"flexibleVersions": "0+",
// - NOT_COORDINATOR (version 0+)
// - COORDINATOR_NOT_AVAILABLE (version 0+)
// - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
// - GROUP_ID_NOT_FOUND (version 0+)
// - UNKNOWN_TOPIC_OR_PARTITION (version 0+)
// - INVALID_REQUEST (version 0+)
"fields": [
{ "name": "Results", "type": "[]ReadStateResult", "versions": "0+",
"about": "The read results", "fields": [
{ "name": "TopicId", "type": "uuid", "versions": "0+",
"about": "The topic identifier" },
{ "name": "Partitions", "type": "[]PartitionResult", "versions": "0+",
"about" : "The results for the partitions.", "fields": [
{ "name": "Partition", "type": "int32", "versions": "0+",
"about": "The partition index." },
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The error code, or 0 if there was no error." },
{ "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
"about": "The error message, or null if there was no error." },
{ "name": "StateEpoch", "type": "int32", "versions": "0+",
"about": "The state epoch for this share-partition." },
{ "name": "StartOffset", "type": "int64", "versions": "0+",
"about": "The share-partition start offset, which can be -1 if it is not yet initialized." },
{ "name": "StateBatches", "type": "[]StateBatch", "versions": "0+", "fields":[
{ "name": "FirstOffset", "type": "int64", "versions": "0+",
"about": "The base offset of this state batch." },
{ "name": "LastOffset", "type": "int64", "versions": "0+",
"about": "The last offset of this state batch." },
{ "name": "DeliveryState", "type": "int8", "versions": "0+",
"about": "The state - 0:Available,2:Acked,4:Archived." },
{ "name": "DeliveryCount", "type": "int16", "versions": "0+",
"about": "The delivery count." }
]}
]}
]}
]
}

View File

@ -0,0 +1,40 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
{
"apiKey": 87,
"type": "request",
"listeners": ["broker"],
"name": "ReadShareGroupStateSummaryRequest",
"validVersions": "0",
"flexibleVersions": "0+",
"latestVersionUnstable": true,
"fields": [
{ "name": "GroupId", "type": "string", "versions": "0+",
"about":"The group identifier." },
{ "name": "Topics", "type": "[]ReadStateSummaryData", "versions": "0+",
"about": "The data for the topics.", "fields": [
{ "name": "TopicId", "type": "uuid", "versions": "0+",
"about": "The topic identifier." },
{ "name": "Partitions", "type": "[]PartitionData", "versions": "0+",
"about": "The data for the partitions.", "fields": [
{ "name": "Partition", "type": "int32", "versions": "0+",
"about": "The partition index." },
{ "name": "LeaderEpoch", "type": "int32", "versions": "0+",
"about": "The leader epoch of the share-partition." }
]}
]}
]
}

View File

@ -0,0 +1,49 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
{
"apiKey": 87,
"type": "response",
"name": "ReadShareGroupStateSummaryResponse",
"validVersions": "0",
"flexibleVersions": "0+",
// - NOT_COORDINATOR (version 0+)
// - COORDINATOR_NOT_AVAILABLE (version 0+)
// - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
// - GROUP_ID_NOT_FOUND (version 0+)
// - UNKNOWN_TOPIC_OR_PARTITION (version 0+)
// - FENCED_LEADER_EPOCH (version 0+)
// - INVALID_REQUEST (version 0+)
"fields": [
{ "name": "Results", "type": "[]ReadStateSummaryResult", "versions": "0+",
"about": "The read results", "fields": [
{ "name": "TopicId", "type": "uuid", "versions": "0+",
"about": "The topic identifier" },
{ "name": "Partitions", "type": "[]PartitionResult", "versions": "0+",
"about" : "The results for the partitions.", "fields": [
{ "name": "Partition", "type": "int32", "versions": "0+",
"about": "The partition index." },
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The error code, or 0 if there was no error." },
{ "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
"about": "The error message, or null if there was no error." },
{ "name": "StateEpoch", "type": "int32", "versions": "0+",
"about": "The state epoch of the share-partition." },
{ "name": "StartOffset", "type": "int64", "versions": "0+",
"about": "The share-partition start offset." }
]}
]}
]
}

View File

@ -0,0 +1,54 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
{
"apiKey": 85,
"type": "request",
"listeners": ["broker"],
"name": "WriteShareGroupStateRequest",
"validVersions": "0",
"flexibleVersions": "0+",
"latestVersionUnstable": true,
"fields": [
{ "name": "GroupId", "type": "string", "versions": "0+",
"about":"The group identifier." },
{ "name": "Topics", "type": "[]WriteStateData", "versions": "0+",
"about": "The data for the topics.", "fields": [
{ "name": "TopicId", "type": "uuid", "versions": "0+",
"about": "The topic identifier." },
{ "name": "Partitions", "type": "[]PartitionData", "versions": "0+",
"about": "The data for the partitions.", "fields": [
{ "name": "Partition", "type": "int32", "versions": "0+",
"about": "The partition index." },
{ "name": "StateEpoch", "type": "int32", "versions": "0+",
"about": "The state epoch for this share-partition." },
{ "name": "LeaderEpoch", "type": "int32", "versions": "0+",
"about": "The leader epoch of the share-partition." },
{ "name": "StartOffset", "type": "int64", "versions": "0+",
"about": "The share-partition start offset, or -1 if the start offset is not being written." },
{ "name": "StateBatches", "type": "[]StateBatch", "versions": "0+", "fields": [
{ "name": "FirstOffset", "type": "int64", "versions": "0+",
"about": "The base offset of this state batch." },
{ "name": "LastOffset", "type": "int64", "versions": "0+",
"about": "The last offset of this state batch." },
{ "name": "DeliveryState", "type": "int8", "versions": "0+",
"about": "The state - 0:Available,2:Acked,4:Archived" },
{ "name": "DeliveryCount", "type": "int16", "versions": "0+",
"about": "The delivery count." }
]}
]}
]}
]
}

View File

@ -0,0 +1,45 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
{
"apiKey": 85,
"type": "response",
"name": "WriteShareGroupStateResponse",
"validVersions": "0",
"flexibleVersions": "0+",
// - NOT_COORDINATOR (version 0+)
// - COORDINATOR_NOT_AVAILABLE (version 0+)
// - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
// - GROUP_ID_NOT_FOUND (version 0+)
// - UNKNOWN_TOPIC_OR_PARTITION (version 0+)
// - FENCED_STATE_EPOCH (version 0+)
// - INVALID_REQUEST (version 0+)
"fields": [
{ "name": "Results", "type": "[]WriteStateResult", "versions": "0+",
"about": "The write results", "fields": [
{ "name": "TopicId", "type": "uuid", "versions": "0+",
"about": "The topic identifier" },
{ "name": "Partitions", "type": "[]PartitionResult", "versions": "0+",
"about" : "The results for the partitions.", "fields": [
{ "name": "Partition", "type": "int32", "versions": "0+",
"about": "The partition index." },
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The error code, or 0 if there was no error." },
{ "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
"about": "The error message, or null if there was no error." }
]}
]}
]
}

View File

@ -110,6 +110,8 @@ import org.apache.kafka.common.message.DeleteGroupsResponseData.DeletableGroupRe
import org.apache.kafka.common.message.DeleteGroupsResponseData.DeletableGroupResultCollection;
import org.apache.kafka.common.message.DeleteRecordsRequestData;
import org.apache.kafka.common.message.DeleteRecordsResponseData;
import org.apache.kafka.common.message.DeleteShareGroupStateRequestData;
import org.apache.kafka.common.message.DeleteShareGroupStateResponseData;
import org.apache.kafka.common.message.DeleteTopicsRequestData;
import org.apache.kafka.common.message.DeleteTopicsResponseData;
import org.apache.kafka.common.message.DeleteTopicsResponseData.DeletableTopicResult;
@ -163,6 +165,8 @@ import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterC
import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterableConfig;
import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData;
import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.AlterConfigsResourceResponse;
import org.apache.kafka.common.message.InitializeShareGroupStateRequestData;
import org.apache.kafka.common.message.InitializeShareGroupStateResponseData;
import org.apache.kafka.common.message.InitProducerIdRequestData;
import org.apache.kafka.common.message.InitProducerIdResponseData;
import org.apache.kafka.common.message.JoinGroupRequestData;
@ -207,6 +211,10 @@ import org.apache.kafka.common.message.ProduceRequestData;
import org.apache.kafka.common.message.ProduceResponseData;
import org.apache.kafka.common.message.PushTelemetryRequestData;
import org.apache.kafka.common.message.PushTelemetryResponseData;
import org.apache.kafka.common.message.ReadShareGroupStateRequestData;
import org.apache.kafka.common.message.ReadShareGroupStateResponseData;
import org.apache.kafka.common.message.ReadShareGroupStateSummaryRequestData;
import org.apache.kafka.common.message.ReadShareGroupStateSummaryResponseData;
import org.apache.kafka.common.message.RemoveRaftVoterRequestData;
import org.apache.kafka.common.message.RemoveRaftVoterResponseData;
import org.apache.kafka.common.message.RenewDelegationTokenRequestData;
@ -241,6 +249,8 @@ import org.apache.kafka.common.message.UpdateRaftVoterRequestData;
import org.apache.kafka.common.message.UpdateRaftVoterResponseData;
import org.apache.kafka.common.message.VoteRequestData;
import org.apache.kafka.common.message.VoteResponseData;
import org.apache.kafka.common.message.WriteShareGroupStateRequestData;
import org.apache.kafka.common.message.WriteShareGroupStateResponseData;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.network.Send;
import org.apache.kafka.common.protocol.ApiKeys;
@ -1107,6 +1117,11 @@ public class RequestResponseTest {
case ADD_RAFT_VOTER: return createAddRaftVoterRequest(version);
case REMOVE_RAFT_VOTER: return createRemoveRaftVoterRequest(version);
case UPDATE_RAFT_VOTER: return createUpdateRaftVoterRequest(version);
case INITIALIZE_SHARE_GROUP_STATE: return createInitializeShareGroupStateRequest(version);
case READ_SHARE_GROUP_STATE: return createReadShareGroupStateRequest(version);
case WRITE_SHARE_GROUP_STATE: return createWriteShareGroupStateRequest(version);
case DELETE_SHARE_GROUP_STATE: return createDeleteShareGroupStateRequest(version);
case READ_SHARE_GROUP_STATE_SUMMARY: return createReadShareGroupStateSummaryRequest(version);
default: throw new IllegalArgumentException("Unknown API key " + apikey);
}
}
@ -1196,6 +1211,11 @@ public class RequestResponseTest {
case ADD_RAFT_VOTER: return createAddRaftVoterResponse();
case REMOVE_RAFT_VOTER: return createRemoveRaftVoterResponse();
case UPDATE_RAFT_VOTER: return createUpdateRaftVoterResponse();
case INITIALIZE_SHARE_GROUP_STATE: return createInitializeShareGroupStateResponse();
case READ_SHARE_GROUP_STATE: return createReadShareGroupStateResponse();
case WRITE_SHARE_GROUP_STATE: return createWriteShareGroupStateResponse();
case DELETE_SHARE_GROUP_STATE: return createDeleteShareGroupStateResponse();
case READ_SHARE_GROUP_STATE_SUMMARY: return createReadShareGroupStateSummaryResponse();
default: throw new IllegalArgumentException("Unknown API key " + apikey);
}
}
@ -3866,6 +3886,124 @@ public class RequestResponseTest {
return new ListClientMetricsResourcesResponse(response);
}
private InitializeShareGroupStateRequest createInitializeShareGroupStateRequest(short version) {
InitializeShareGroupStateRequestData data = new InitializeShareGroupStateRequestData()
.setGroupId("group")
.setTopics(Collections.singletonList(new InitializeShareGroupStateRequestData.InitializeStateData()
.setTopicId(Uuid.randomUuid())
.setPartitions(Collections.singletonList(new InitializeShareGroupStateRequestData.PartitionData()
.setPartition(0)
.setStateEpoch(0)
.setStartOffset(0)))));
return new InitializeShareGroupStateRequest.Builder(data).build(version);
}
private InitializeShareGroupStateResponse createInitializeShareGroupStateResponse() {
InitializeShareGroupStateResponseData data = new InitializeShareGroupStateResponseData();
data.setResults(Collections.singletonList(new InitializeShareGroupStateResponseData.InitializeStateResult()
.setTopicId(Uuid.randomUuid())
.setPartitions(Collections.singletonList(new InitializeShareGroupStateResponseData.PartitionResult()
.setPartition(0)
.setErrorCode(Errors.NONE.code())))));
return new InitializeShareGroupStateResponse(data);
}
private ReadShareGroupStateRequest createReadShareGroupStateRequest(short version) {
ReadShareGroupStateRequestData data = new ReadShareGroupStateRequestData()
.setGroupId("group")
.setTopics(Collections.singletonList(new ReadShareGroupStateRequestData.ReadStateData()
.setTopicId(Uuid.randomUuid())
.setPartitions(Collections.singletonList(new ReadShareGroupStateRequestData.PartitionData()
.setPartition(0)))));
return new ReadShareGroupStateRequest.Builder(data).build(version);
}
private ReadShareGroupStateResponse createReadShareGroupStateResponse() {
ReadShareGroupStateResponseData data = new ReadShareGroupStateResponseData()
.setResults(Collections.singletonList(new ReadShareGroupStateResponseData.ReadStateResult()
.setTopicId(Uuid.randomUuid())
.setPartitions(Collections.singletonList(new ReadShareGroupStateResponseData.PartitionResult()
.setPartition(0)
.setErrorCode(Errors.NONE.code())
.setStateEpoch(0)
.setStartOffset(0)
.setStateBatches(Collections.singletonList(new ReadShareGroupStateResponseData.StateBatch()
.setFirstOffset(0)
.setLastOffset(0)
.setDeliveryState((byte) 0x0)
.setDeliveryCount((short) 0)))))));
return new ReadShareGroupStateResponse(data);
}
private WriteShareGroupStateRequest createWriteShareGroupStateRequest(short version) {
WriteShareGroupStateRequestData data = new WriteShareGroupStateRequestData()
.setGroupId("group")
.setTopics(Collections.singletonList(new WriteShareGroupStateRequestData.WriteStateData()
.setTopicId(Uuid.randomUuid())
.setPartitions(Collections.singletonList(new WriteShareGroupStateRequestData.PartitionData()
.setPartition(0)
.setStateEpoch(0)
.setStartOffset(0)
.setStateBatches(singletonList(new WriteShareGroupStateRequestData.StateBatch()
.setFirstOffset(0)
.setLastOffset(0)
.setDeliveryState((byte) 0x0)
.setDeliveryCount((short) 0)))))));
return new WriteShareGroupStateRequest.Builder(data).build(version);
}
private WriteShareGroupStateResponse createWriteShareGroupStateResponse() {
WriteShareGroupStateResponseData data = new WriteShareGroupStateResponseData()
.setResults(Collections.singletonList(new WriteShareGroupStateResponseData.WriteStateResult()
.setTopicId(Uuid.randomUuid())
.setPartitions(Collections.singletonList(new WriteShareGroupStateResponseData.PartitionResult()
.setPartition(0)
.setErrorCode(Errors.NONE.code())))));
return new WriteShareGroupStateResponse(data);
}
private DeleteShareGroupStateRequest createDeleteShareGroupStateRequest(short version) {
DeleteShareGroupStateRequestData data = new DeleteShareGroupStateRequestData()
.setGroupId("group")
.setTopics(Collections.singletonList(new DeleteShareGroupStateRequestData.DeleteStateData()
.setTopicId(Uuid.randomUuid())
.setPartitions(Collections.singletonList(new DeleteShareGroupStateRequestData.PartitionData()
.setPartition(0)))));
return new DeleteShareGroupStateRequest.Builder(data).build(version);
}
private DeleteShareGroupStateResponse createDeleteShareGroupStateResponse() {
DeleteShareGroupStateResponseData data = new DeleteShareGroupStateResponseData()
.setResults(Collections.singletonList(new DeleteShareGroupStateResponseData.DeleteStateResult()
.setTopicId(Uuid.randomUuid())
.setPartitions(Collections.singletonList(new DeleteShareGroupStateResponseData.PartitionResult()
.setPartition(0)
.setErrorCode(Errors.NONE.code())))));
return new DeleteShareGroupStateResponse(data);
}
private ReadShareGroupStateSummaryRequest createReadShareGroupStateSummaryRequest(short version) {
ReadShareGroupStateSummaryRequestData data = new ReadShareGroupStateSummaryRequestData()
.setGroupId("group")
.setTopics(Collections.singletonList(new ReadShareGroupStateSummaryRequestData.ReadStateSummaryData()
.setTopicId(Uuid.randomUuid())
.setPartitions(Collections.singletonList(new ReadShareGroupStateSummaryRequestData.PartitionData()
.setPartition(0)))));
return new ReadShareGroupStateSummaryRequest.Builder(data).build(version);
}
private ReadShareGroupStateSummaryResponse createReadShareGroupStateSummaryResponse() {
ReadShareGroupStateSummaryResponseData data = new ReadShareGroupStateSummaryResponseData()
.setResults(Collections.singletonList(new ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult()
.setTopicId(Uuid.randomUuid())
.setPartitions(Collections.singletonList(new ReadShareGroupStateSummaryResponseData.PartitionResult()
.setPartition(0)
.setErrorCode(Errors.NONE.code())
.setStartOffset(0)
.setStateEpoch(0)))));
return new ReadShareGroupStateSummaryResponse(data);
}
@Test
public void testInvalidSaslHandShakeRequest() {
AbstractRequest request = new SaslHandshakeRequest.Builder(

View File

@ -52,6 +52,7 @@ object RequestConvertToJson {
case req: DeleteAclsRequest => DeleteAclsRequestDataJsonConverter.write(req.data, request.version)
case req: DeleteGroupsRequest => DeleteGroupsRequestDataJsonConverter.write(req.data, request.version)
case req: DeleteRecordsRequest => DeleteRecordsRequestDataJsonConverter.write(req.data, request.version)
case req: DeleteShareGroupStateRequest => DeleteShareGroupStateRequestDataJsonConverter.write(req.data, request.version)
case req: DeleteTopicsRequest => DeleteTopicsRequestDataJsonConverter.write(req.data, request.version)
case req: DescribeAclsRequest => DescribeAclsRequestDataJsonConverter.write(req.data, request.version)
case req: DescribeClientQuotasRequest => DescribeClientQuotasRequestDataJsonConverter.write(req.data, request.version)
@ -76,6 +77,7 @@ object RequestConvertToJson {
case req: GetTelemetrySubscriptionsRequest => GetTelemetrySubscriptionsRequestDataJsonConverter.write(req.data, request.version)
case req: HeartbeatRequest => HeartbeatRequestDataJsonConverter.write(req.data, request.version)
case req: IncrementalAlterConfigsRequest => IncrementalAlterConfigsRequestDataJsonConverter.write(req.data, request.version)
case req: InitializeShareGroupStateRequest => InitializeShareGroupStateRequestDataJsonConverter.write(req.data, request.version)
case req: InitProducerIdRequest => InitProducerIdRequestDataJsonConverter.write(req.data, request.version)
case req: JoinGroupRequest => JoinGroupRequestDataJsonConverter.write(req.data, request.version)
case req: LeaderAndIsrRequest => LeaderAndIsrRequestDataJsonConverter.write(req.data, request.version)
@ -92,6 +94,8 @@ object RequestConvertToJson {
case req: OffsetsForLeaderEpochRequest => OffsetForLeaderEpochRequestDataJsonConverter.write(req.data, request.version)
case req: ProduceRequest => ProduceRequestDataJsonConverter.write(req.data, request.version, false)
case req: PushTelemetryRequest => PushTelemetryRequestDataJsonConverter.write(req.data, request.version)
case req: ReadShareGroupStateRequest => ReadShareGroupStateRequestDataJsonConverter.write(req.data, request.version)
case req: ReadShareGroupStateSummaryRequest => ReadShareGroupStateSummaryRequestDataJsonConverter.write(req.data, request.version)
case req: RenewDelegationTokenRequest => RenewDelegationTokenRequestDataJsonConverter.write(req.data, request.version)
case req: SaslAuthenticateRequest => SaslAuthenticateRequestDataJsonConverter.write(req.data, request.version)
case req: SaslHandshakeRequest => SaslHandshakeRequestDataJsonConverter.write(req.data, request.version)
@ -106,6 +110,7 @@ object RequestConvertToJson {
case req: UpdateFeaturesRequest => UpdateFeaturesRequestDataJsonConverter.write(req.data, request.version)
case req: UpdateMetadataRequest => UpdateMetadataRequestDataJsonConverter.write(req.data, request.version)
case req: VoteRequest => VoteRequestDataJsonConverter.write(req.data, request.version)
case req: WriteShareGroupStateRequest => WriteShareGroupStateRequestDataJsonConverter.write(req.data, request.version)
case req: WriteTxnMarkersRequest => WriteTxnMarkersRequestDataJsonConverter.write(req.data, request.version)
case req: AddRaftVoterRequest => AddRaftVoterRequestDataJsonConverter.write(req.data, request.version)
case req: RemoveRaftVoterRequest => RemoveRaftVoterRequestDataJsonConverter.write(req.data, request.version)
@ -142,6 +147,7 @@ object RequestConvertToJson {
case res: DeleteAclsResponse => DeleteAclsResponseDataJsonConverter.write(res.data, version)
case res: DeleteGroupsResponse => DeleteGroupsResponseDataJsonConverter.write(res.data, version)
case res: DeleteRecordsResponse => DeleteRecordsResponseDataJsonConverter.write(res.data, version)
case res: DeleteShareGroupStateResponse => DeleteShareGroupStateResponseDataJsonConverter.write(res.data, version)
case res: DeleteTopicsResponse => DeleteTopicsResponseDataJsonConverter.write(res.data, version)
case res: DescribeAclsResponse => DescribeAclsResponseDataJsonConverter.write(res.data, version)
case res: DescribeClientQuotasResponse => DescribeClientQuotasResponseDataJsonConverter.write(res.data, version)
@ -166,6 +172,7 @@ object RequestConvertToJson {
case res: GetTelemetrySubscriptionsResponse => GetTelemetrySubscriptionsResponseDataJsonConverter.write(res.data, version)
case res: HeartbeatResponse => HeartbeatResponseDataJsonConverter.write(res.data, version)
case res: IncrementalAlterConfigsResponse => IncrementalAlterConfigsResponseDataJsonConverter.write(res.data, version)
case res: InitializeShareGroupStateResponse => InitializeShareGroupStateResponseDataJsonConverter.write(res.data, version)
case res: InitProducerIdResponse => InitProducerIdResponseDataJsonConverter.write(res.data, version)
case res: JoinGroupResponse => JoinGroupResponseDataJsonConverter.write(res.data, version)
case res: LeaderAndIsrResponse => LeaderAndIsrResponseDataJsonConverter.write(res.data, version)
@ -182,6 +189,8 @@ object RequestConvertToJson {
case res: OffsetsForLeaderEpochResponse => OffsetForLeaderEpochResponseDataJsonConverter.write(res.data, version)
case res: ProduceResponse => ProduceResponseDataJsonConverter.write(res.data, version)
case res: PushTelemetryResponse => PushTelemetryResponseDataJsonConverter.write(res.data, version)
case res: ReadShareGroupStateResponse => ReadShareGroupStateResponseDataJsonConverter.write(res.data, version)
case res: ReadShareGroupStateSummaryResponse => ReadShareGroupStateSummaryResponseDataJsonConverter.write(res.data, version)
case res: RenewDelegationTokenResponse => RenewDelegationTokenResponseDataJsonConverter.write(res.data, version)
case res: SaslAuthenticateResponse => SaslAuthenticateResponseDataJsonConverter.write(res.data, version)
case res: SaslHandshakeResponse => SaslHandshakeResponseDataJsonConverter.write(res.data, version)
@ -196,6 +205,7 @@ object RequestConvertToJson {
case res: UpdateFeaturesResponse => UpdateFeaturesResponseDataJsonConverter.write(res.data, version)
case res: UpdateMetadataResponse => UpdateMetadataResponseDataJsonConverter.write(res.data, version)
case res: VoteResponse => VoteResponseDataJsonConverter.write(res.data, version)
case res: WriteShareGroupStateResponse => WriteShareGroupStateResponseDataJsonConverter.write(res.data, version)
case res: WriteTxnMarkersResponse => WriteTxnMarkersResponseDataJsonConverter.write(res.data, version)
case res: AddRaftVoterResponse => AddRaftVoterResponseDataJsonConverter.write(res.data, version)
case res: RemoveRaftVoterResponse => RemoveRaftVoterResponseDataJsonConverter.write(res.data, version)

View File

@ -260,6 +260,11 @@ class KafkaApis(val requestChannel: RequestChannel,
case ApiKeys.REMOVE_RAFT_VOTER => forwardToControllerOrFail(request)
case ApiKeys.SHARE_FETCH => handleShareFetchRequest(request)
case ApiKeys.SHARE_ACKNOWLEDGE => handleShareAcknowledgeRequest(request)
case ApiKeys.INITIALIZE_SHARE_GROUP_STATE => handleInitializeShareGroupStateRequest(request)
case ApiKeys.READ_SHARE_GROUP_STATE => handleReadShareGroupStateRequest(request)
case ApiKeys.WRITE_SHARE_GROUP_STATE => handleWriteShareGroupStateRequest(request)
case ApiKeys.DELETE_SHARE_GROUP_STATE => handleDeleteShareGroupStateRequest(request)
case ApiKeys.READ_SHARE_GROUP_STATE_SUMMARY => handleReadShareGroupStateSummaryRequest(request)
case _ => throw new IllegalStateException(s"No handler for request api key ${request.header.apiKey}")
}
} catch {
@ -3957,6 +3962,41 @@ class KafkaApis(val requestChannel: RequestChannel,
CompletableFuture.completedFuture[Unit](())
}
def handleInitializeShareGroupStateRequest(request: RequestChannel.Request): Unit = {
val initializeShareGroupStateRequest = request.body[InitializeShareGroupStateRequest]
// TODO: Implement the InitializeShareGroupStateRequest handling
requestHelper.sendMaybeThrottle(request, initializeShareGroupStateRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
CompletableFuture.completedFuture[Unit](())
}
def handleReadShareGroupStateRequest(request: RequestChannel.Request): Unit = {
val readShareGroupStateRequest = request.body[ReadShareGroupStateRequest]
// TODO: Implement the ReadShareGroupStateRequest handling
requestHelper.sendMaybeThrottle(request, readShareGroupStateRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
CompletableFuture.completedFuture[Unit](())
}
def handleWriteShareGroupStateRequest(request: RequestChannel.Request): Unit = {
val writeShareGroupStateRequest = request.body[WriteShareGroupStateRequest]
// TODO: Implement the WriteShareGroupStateRequest handling
requestHelper.sendMaybeThrottle(request, writeShareGroupStateRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
CompletableFuture.completedFuture[Unit](())
}
def handleDeleteShareGroupStateRequest(request: RequestChannel.Request): Unit = {
val deleteShareGroupStateRequest = request.body[DeleteShareGroupStateRequest]
// TODO: Implement the DeleteShareGroupStateRequest handling
requestHelper.sendMaybeThrottle(request, deleteShareGroupStateRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
CompletableFuture.completedFuture[Unit](())
}
def handleReadShareGroupStateSummaryRequest(request: RequestChannel.Request): Unit = {
val readShareGroupStateSummaryRequest = request.body[ReadShareGroupStateSummaryRequest]
// TODO: Implement the ReadShareGroupStateSummaryRequest handling
requestHelper.sendMaybeThrottle(request, readShareGroupStateSummaryRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
CompletableFuture.completedFuture[Unit](())
}
private def updateRecordConversionStats(request: RequestChannel.Request,
tp: TopicPartition,
conversionStats: RecordValidationStats): Unit = {

View File

@ -180,7 +180,7 @@ class RequestQuotaTest extends BaseRequestTest {
@ValueSource(strings = Array("zk", "kraft"))
def testExemptRequestTime(quorum: String): Unit = {
// Exclude `DESCRIBE_QUORUM`, maybe it shouldn't be a cluster action
val actions = clusterActions -- clusterActionsWithThrottleForBroker -- RequestQuotaTest.Envelope - ApiKeys.DESCRIBE_QUORUM
val actions = clusterActions -- clusterActionsWithThrottleForBroker -- RequestQuotaTest.Envelope -- RequestQuotaTest.ShareGroupState - ApiKeys.DESCRIBE_QUORUM
for (apiKey <- actions) {
submitTest(apiKey, () => checkExemptRequestMetric(apiKey))
}
@ -739,6 +739,21 @@ class RequestQuotaTest extends BaseRequestTest {
case ApiKeys.UPDATE_RAFT_VOTER =>
new UpdateRaftVoterRequest.Builder(new UpdateRaftVoterRequestData())
case ApiKeys.INITIALIZE_SHARE_GROUP_STATE =>
new InitializeShareGroupStateRequest.Builder(new InitializeShareGroupStateRequestData(), true)
case ApiKeys.READ_SHARE_GROUP_STATE =>
new ReadShareGroupStateRequest.Builder(new ReadShareGroupStateRequestData(), true)
case ApiKeys.WRITE_SHARE_GROUP_STATE =>
new WriteShareGroupStateRequest.Builder(new WriteShareGroupStateRequestData(), true)
case ApiKeys.DELETE_SHARE_GROUP_STATE =>
new DeleteShareGroupStateRequest.Builder(new DeleteShareGroupStateRequestData(), true)
case ApiKeys.READ_SHARE_GROUP_STATE_SUMMARY =>
new ReadShareGroupStateSummaryRequest.Builder(new ReadShareGroupStateSummaryRequestData(), true)
case _ =>
throw new IllegalArgumentException("Unsupported API key " + apiKey)
}
@ -860,6 +875,8 @@ class RequestQuotaTest extends BaseRequestTest {
object RequestQuotaTest {
val SaslActions = Set(ApiKeys.SASL_HANDSHAKE, ApiKeys.SASL_AUTHENTICATE)
val Envelope = Set(ApiKeys.ENVELOPE)
val ShareGroupState = Set(ApiKeys.INITIALIZE_SHARE_GROUP_STATE, ApiKeys.READ_SHARE_GROUP_STATE, ApiKeys.WRITE_SHARE_GROUP_STATE,
ApiKeys.DELETE_SHARE_GROUP_STATE, ApiKeys.READ_SHARE_GROUP_STATE_SUMMARY)
val UnauthorizedPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "Unauthorized")
// Principal used for all client connections. This is modified by tests which

View File

@ -2303,6 +2303,36 @@ bin/kafka-acls.sh --bootstrap-server localhost:9092 --command-config /tmp/adminc
<td>Topic</td>
<td></td>
</tr>
<tr>
<td>INITIALIZE_SHARE_GROUP_STATE (83)</td>
<td>ClusterAction</td>
<td>Cluster</td>
<td></td>
</tr>
<tr>
<td>READ_SHARE_GROUP_STATE (84)</td>
<td>ClusterAction</td>
<td>Cluster</td>
<td></td>
</tr>
<tr>
<td>WRITE_SHARE_GROUP_STATE (85)</td>
<td>ClusterAction</td>
<td>Cluster</td>
<td></td>
</tr>
<tr>
<td>DELETE_SHARE_GROUP_STATE (86)</td>
<td>ClusterAction</td>
<td>Cluster</td>
<td></td>
</tr>
<tr>
<td>READ_SHARE_GROUP_STATE_SUMMARY (87)</td>
<td>ClusterAction</td>
<td>Cluster</td>
<td></td>
</tr>
</tbody>
</table>

View File

@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.group.share;
import org.apache.kafka.common.message.DeleteShareGroupStateRequestData;
import java.util.stream.Collectors;
/**
* This class contains the parameters for {@link Persister#deleteState(DeleteShareGroupStateParameters)}.
*/
public class DeleteShareGroupStateParameters implements PersisterParameters {
private final GroupTopicPartitionData<PartitionIdData> groupTopicPartitionData;
private DeleteShareGroupStateParameters(GroupTopicPartitionData<PartitionIdData> groupTopicPartitionData) {
this.groupTopicPartitionData = groupTopicPartitionData;
}
public static DeleteShareGroupStateParameters from(DeleteShareGroupStateRequestData data) {
return new Builder()
.setGroupTopicPartitionData(new GroupTopicPartitionData<>(data.groupId(), data.topics().stream()
.map(deleteStateData -> new TopicData<>(deleteStateData.topicId(), deleteStateData.partitions().stream()
.map(partitionData -> PartitionFactory.newPartitionIdData(partitionData.partition()))
.collect(Collectors.toList())))
.collect(Collectors.toList())))
.build();
}
public GroupTopicPartitionData<PartitionIdData> groupTopicPartitionData() {
return groupTopicPartitionData;
}
public static class Builder {
private GroupTopicPartitionData<PartitionIdData> groupTopicPartitionData;
public Builder setGroupTopicPartitionData(GroupTopicPartitionData<PartitionIdData> groupTopicPartitionData) {
this.groupTopicPartitionData = groupTopicPartitionData;
return this;
}
public DeleteShareGroupStateParameters build() {
return new DeleteShareGroupStateParameters(groupTopicPartitionData);
}
}
}

View File

@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.group.share;
import org.apache.kafka.common.message.DeleteShareGroupStateResponseData;
import java.util.List;
import java.util.stream.Collectors;
/**
* This class contains the result from {@link Persister#deleteState(DeleteShareGroupStateParameters)}.
*/
public class DeleteShareGroupStateResult implements PersisterResult {
private final List<TopicData<PartitionErrorData>> topicsData;
private DeleteShareGroupStateResult(List<TopicData<PartitionErrorData>> topicsData) {
this.topicsData = topicsData;
}
public List<TopicData<PartitionErrorData>> topicsData() {
return topicsData;
}
public static DeleteShareGroupStateResult from(DeleteShareGroupStateResponseData data) {
return new Builder()
.setTopicsData(data.results().stream()
.map(deleteStateResult -> new TopicData<>(deleteStateResult.topicId(), deleteStateResult.partitions().stream()
.map(partitionResult -> PartitionFactory.newPartitionErrorData(partitionResult.partition(), partitionResult.errorCode(), partitionResult.errorMessage()))
.collect(Collectors.toList())))
.collect(Collectors.toList()))
.build();
}
public static class Builder {
private List<TopicData<PartitionErrorData>> topicsData;
public Builder setTopicsData(List<TopicData<PartitionErrorData>> topicsData) {
this.topicsData = topicsData;
return this;
}
public DeleteShareGroupStateResult build() {
return new DeleteShareGroupStateResult(topicsData);
}
}
}

View File

@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.group.share;
import java.util.List;
import java.util.Objects;
/**
* This class is used to contain the data for a group, a topic and its partitions in the interface to {@link Persister}.
*
* @param <P> The type of {@link PartitionInfoData}
*/
public class GroupTopicPartitionData<P extends PartitionInfoData> {
private final String groupId;
private final List<TopicData<P>> topicsData;
public GroupTopicPartitionData(String groupId, List<TopicData<P>> topicsData) {
this.groupId = groupId;
this.topicsData = topicsData;
}
public String groupId() {
return groupId;
}
public List<TopicData<P>> topicsData() {
return topicsData;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
GroupTopicPartitionData<?> that = (GroupTopicPartitionData<?>) o;
return Objects.equals(groupId, that.groupId) && Objects.equals(topicsData, that.topicsData);
}
@Override
public int hashCode() {
return Objects.hash(groupId, topicsData);
}
public static class Builder<P extends PartitionInfoData> {
private String groupId;
private List<TopicData<P>> topicsData;
public Builder<P> setGroupId(String groupId) {
this.groupId = groupId;
return this;
}
public Builder<P> setTopicsData(List<TopicData<P>> topicsData) {
this.topicsData = topicsData;
return this;
}
public Builder<P> setGroupTopicPartition(GroupTopicPartitionData<P> groupTopicPartitionData) {
this.groupId = groupTopicPartitionData.groupId();
this.topicsData = groupTopicPartitionData.topicsData();
return this;
}
public GroupTopicPartitionData<P> build() {
return new GroupTopicPartitionData<P>(this.groupId, this.topicsData);
}
}
@Override
public String toString() {
return "GroupTopicPartitionData(" +
"groupId=" + groupId + ", " +
"topicsData=" + topicsData +
")";
}
}

View File

@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.group.share;
import org.apache.kafka.common.message.InitializeShareGroupStateRequestData;
import java.util.stream.Collectors;
/**
* This class contains the parameters for {@link Persister#initializeState(InitializeShareGroupStateParameters)}.
*/
public class InitializeShareGroupStateParameters implements PersisterParameters {
private final GroupTopicPartitionData<PartitionStateData> groupTopicPartitionData;
private InitializeShareGroupStateParameters(GroupTopicPartitionData<PartitionStateData> groupTopicPartitionData) {
this.groupTopicPartitionData = groupTopicPartitionData;
}
public GroupTopicPartitionData<PartitionStateData> groupTopicPartitionData() {
return groupTopicPartitionData;
}
public static InitializeShareGroupStateParameters from(InitializeShareGroupStateRequestData data) {
return new Builder()
.setGroupTopicPartitionData(new GroupTopicPartitionData<>(data.groupId(), data.topics().stream()
.map(readStateData -> new TopicData<>(readStateData.topicId(),
readStateData.partitions().stream()
.map(partitionData -> PartitionFactory.newPartitionStateData(partitionData.partition(), partitionData.stateEpoch(), partitionData.startOffset()))
.collect(Collectors.toList())))
.collect(Collectors.toList())))
.build();
}
public static class Builder {
private GroupTopicPartitionData<PartitionStateData> groupTopicPartitionData;
public Builder setGroupTopicPartitionData(GroupTopicPartitionData<PartitionStateData> groupTopicPartitionData) {
this.groupTopicPartitionData = groupTopicPartitionData;
return this;
}
public InitializeShareGroupStateParameters build() {
return new InitializeShareGroupStateParameters(this.groupTopicPartitionData);
}
}
}

View File

@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.group.share;
import org.apache.kafka.common.message.InitializeShareGroupStateResponseData;
import java.util.List;
import java.util.stream.Collectors;
/**
* This class contains the result from {@link Persister#initializeState(InitializeShareGroupStateParameters)}.
*/
public class InitializeShareGroupStateResult implements PersisterResult {
private final List<TopicData<PartitionErrorData>> topicsData;
private InitializeShareGroupStateResult(List<TopicData<PartitionErrorData>> topicsData) {
this.topicsData = topicsData;
}
public List<TopicData<PartitionErrorData>> topicsData() {
return topicsData;
}
public static InitializeShareGroupStateResult from(InitializeShareGroupStateResponseData data) {
return new Builder()
.setTopicsData(data.results().stream()
.map(initializeStateResult -> new TopicData<>(initializeStateResult.topicId(),
initializeStateResult.partitions().stream()
.map(partitionResult -> PartitionFactory.newPartitionErrorData(partitionResult.partition(), partitionResult.errorCode(), partitionResult.errorMessage()))
.collect(Collectors.toList())))
.collect(Collectors.toList()))
.build();
}
public static class Builder {
private List<TopicData<PartitionErrorData>> topicsData;
public Builder setTopicsData(List<TopicData<PartitionErrorData>> topicsData) {
this.topicsData = topicsData;
return this;
}
public InitializeShareGroupStateResult build() {
return new InitializeShareGroupStateResult(topicsData);
}
}
}

View File

@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.group.share;
import java.util.List;
/**
* This interface is implemented by classes used to contain the data for a partition with all of its data
* in the interface to {@link Persister}.
*/
public interface PartitionAllData extends PartitionInfoData, PartitionIdData {
int stateEpoch();
long startOffset();
short errorCode();
String errorMessage();
List<PersisterStateBatch> stateBatches();
}

View File

@ -0,0 +1,157 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.group.share;
import java.util.List;
import java.util.Objects;
/**
* This class contains the data for a partition in the interface to {@link Persister}. The various interfaces
* reflect the ways in which a subset of the data can be accessed for different purposes.
*/
public class PartitionData implements
PartitionIdData, PartitionStateData, PartitionErrorData, PartitionStateErrorData,
PartitionStateBatchData, PartitionIdLeaderEpochData, PartitionAllData {
private final int partition;
private final int stateEpoch;
private final long startOffset;
private final short errorCode;
private final String errorMessage;
private final int leaderEpoch;
private final List<PersisterStateBatch> stateBatches;
public PartitionData(int partition, int stateEpoch, long startOffset, short errorCode,
String errorMessage, int leaderEpoch, List<PersisterStateBatch> stateBatches) {
this.partition = partition;
this.stateEpoch = stateEpoch;
this.startOffset = startOffset;
this.errorCode = errorCode;
this.leaderEpoch = leaderEpoch;
this.errorMessage = errorMessage;
this.stateBatches = stateBatches;
}
public int partition() {
return partition;
}
public int stateEpoch() {
return stateEpoch;
}
public long startOffset() {
return startOffset;
}
public short errorCode() {
return errorCode;
}
public String errorMessage() {
return errorMessage;
}
public int leaderEpoch() {
return leaderEpoch;
}
public List<PersisterStateBatch> stateBatches() {
return stateBatches;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
PartitionData that = (PartitionData) o;
return Objects.equals(partition, that.partition) &&
Objects.equals(stateEpoch, that.stateEpoch) &&
Objects.equals(startOffset, that.startOffset) &&
Objects.equals(errorCode, that.errorCode) &&
Objects.equals(errorMessage, that.errorMessage) &&
Objects.equals(leaderEpoch, that.leaderEpoch) &&
Objects.equals(stateBatches, that.stateBatches);
}
@Override
public int hashCode() {
return Objects.hash(partition, stateEpoch, startOffset, errorCode, leaderEpoch, errorMessage, stateBatches);
}
public static class Builder {
private int partition;
private int stateEpoch;
private long startOffset;
private short errorCode;
private String errorMessage;
private int leaderEpoch;
private List<PersisterStateBatch> stateBatches;
public Builder setPartition(int partition) {
this.partition = partition;
return this;
}
public Builder setStateEpoch(int stateEpoch) {
this.stateEpoch = stateEpoch;
return this;
}
public Builder setStartOffset(long startOffset) {
this.startOffset = startOffset;
return this;
}
public Builder setErrorCode(short errorCode) {
this.errorCode = errorCode;
return this;
}
public Builder setErrorMessage(String errorMessage) {
this.errorMessage = errorMessage;
return this;
}
public Builder setLeaderEpoch(int leaderEpoch) {
this.leaderEpoch = leaderEpoch;
return this;
}
public Builder setStateBatches(List<PersisterStateBatch> stateBatches) {
this.stateBatches = stateBatches;
return this;
}
public PartitionData build() {
return new PartitionData(partition, stateEpoch, startOffset, errorCode, errorMessage, leaderEpoch, stateBatches);
}
}
@Override
public String toString() {
return "PartitionData(" +
"partition=" + partition + "," +
"stateEpoch=" + stateEpoch + "," +
"startOffset=" + startOffset + "," +
"errorCode=" + errorCode + "," +
"errorMessage=" + errorMessage + "," +
"leaderEpoch=" + leaderEpoch + "," +
"stateBatches=" + stateBatches +
")";
}
}

View File

@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.group.share;
/**
* This interface is implemented by classes used to contain the data for a partition with error data
* in the interface to {@link Persister}.
*/
public interface PartitionErrorData extends PartitionInfoData, PartitionIdData {
short errorCode();
String errorMessage();
}

View File

@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.group.share;
import org.apache.kafka.common.protocol.Errors;
import java.util.List;
/**
* This is a factory class for creating instances of {@link PartitionData}.
*/
public class PartitionFactory {
public static final int DEFAULT_STATE_EPOCH = 0;
public static final int DEFAULT_START_OFFSET = 0;
public static final short DEFAULT_ERROR_CODE = Errors.NONE.code();
public static final int DEFAULT_LEADER_EPOCH = 0;
public static final String DEFAULT_ERR_MESSAGE = Errors.NONE.message();
public static PartitionIdData newPartitionIdData(int partition) {
return new PartitionData(partition, DEFAULT_STATE_EPOCH, DEFAULT_START_OFFSET, DEFAULT_ERROR_CODE, DEFAULT_ERR_MESSAGE, DEFAULT_LEADER_EPOCH, null);
}
public static PartitionIdLeaderEpochData newPartitionIdLeaderEpochData(int partition, int leaderEpoch) {
return new PartitionData(partition, DEFAULT_STATE_EPOCH, DEFAULT_START_OFFSET, DEFAULT_ERROR_CODE, DEFAULT_ERR_MESSAGE, leaderEpoch, null);
}
public static PartitionStateData newPartitionStateData(int partition, int stateEpoch, long startOffset) {
return new PartitionData(partition, stateEpoch, startOffset, DEFAULT_ERROR_CODE, DEFAULT_ERR_MESSAGE, DEFAULT_LEADER_EPOCH, null);
}
public static PartitionErrorData newPartitionErrorData(int partition, short errorCode, String errorMessage) {
return new PartitionData(partition, DEFAULT_STATE_EPOCH, DEFAULT_START_OFFSET, errorCode, errorMessage, DEFAULT_LEADER_EPOCH, null);
}
public static PartitionStateErrorData newPartitionStateErrorData(int partition, int stateEpoch, long startOffset, short errorCode, String errorMessage) {
return new PartitionData(partition, stateEpoch, startOffset, errorCode, errorMessage, DEFAULT_LEADER_EPOCH, null);
}
public static PartitionStateBatchData newPartitionStateBatchData(int partition, int stateEpoch, long startOffset, int leaderEpoch, List<PersisterStateBatch> stateBatches) {
return new PartitionData(partition, stateEpoch, startOffset, DEFAULT_ERROR_CODE, DEFAULT_ERR_MESSAGE, leaderEpoch, stateBatches);
}
public static PartitionAllData newPartitionAllData(int partition, int stateEpoch, long startOffset, short errorCode, String errorMessage, List<PersisterStateBatch> stateBatches) {
return new PartitionData(partition, stateEpoch, startOffset, errorCode, errorMessage, DEFAULT_LEADER_EPOCH, stateBatches);
}
}

View File

@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.group.share;
/**
* This interface is implemented by classes used to contain the data for a partition with its partition index
* in the interface to {@link Persister}.
*/
public interface PartitionIdData extends PartitionInfoData {
int partition();
}

View File

@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.group.share;
/**
* This interface is implemented by classes used to contain the data for a partition with its leader epoch
* in the interface to {@link Persister}.
*/
public interface PartitionIdLeaderEpochData extends PartitionInfoData, PartitionIdData {
int leaderEpoch();
}

View File

@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.group.share;
/**
* This interface is implemented by classes used to contain the data for a partition in the
* interface to {@link Persister}.
*/
public interface PartitionInfoData {
}

View File

@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.group.share;
import java.util.List;
/**
* This interface is implemented by classes used to contain the data for a partition with state batch data
* in the interface to {@link Persister}.
*/
public interface PartitionStateBatchData extends PartitionInfoData, PartitionIdData {
int stateEpoch();
long startOffset();
int leaderEpoch();
List<PersisterStateBatch> stateBatches();
}

View File

@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.group.share;
/**
* This interface is implemented by classes used to contain the data for a partition with the state epoch and
* start offset in the interface to {@link Persister}.
*/
public interface PartitionStateData extends PartitionInfoData, PartitionIdData {
int stateEpoch();
long startOffset();
}

View File

@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.group.share;
/**
* This interface is implemented by classes used to contain the data for a partition with state and error data
* in the interface to {@link Persister}.
*/
public interface PartitionStateErrorData extends PartitionInfoData, PartitionIdData {
int stateEpoch();
long startOffset();
short errorCode();
String errorMessage();
}

View File

@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.group.share;
import org.apache.kafka.common.annotation.InterfaceStability;
import java.util.concurrent.CompletableFuture;
/**
* This interface introduces methods which can be used by callers to interact with the
* persistence implementation responsible for storing share group/partition states.
* For KIP-932, the default {@link Persister} use a share coordinator which stores information in
* an internal topic, but this interface allows for other variations as well.
*/
@InterfaceStability.Evolving
public interface Persister {
/**
* Initialize the share partition state.
*
* @param request Request parameters
* @return A {@link CompletableFuture} that completes with the result.
*/
CompletableFuture<InitializeShareGroupStateResult> initializeState(InitializeShareGroupStateParameters request) throws IllegalArgumentException;
/**
* Read share-partition state.
*
* @param request Request parameters
* @return A {@link CompletableFuture} that completes with the result.
*/
CompletableFuture<ReadShareGroupStateResult> readState(ReadShareGroupStateParameters request) throws IllegalArgumentException;
/**
* Write share-partition state.
*
* @param request Request parameters
* @return A {@link CompletableFuture} that completes with the result.
*/
CompletableFuture<WriteShareGroupStateResult> writeState(WriteShareGroupStateParameters request) throws IllegalArgumentException;
/**
* Delete share-partition state.
*
* @param request Request parameters
* @return A {@link CompletableFuture} that completes with the result.
*/
CompletableFuture<DeleteShareGroupStateResult> deleteState(DeleteShareGroupStateParameters request) throws IllegalArgumentException;
/**
* Read the offset information from share-partition state.
*
* @param request Request parameters
* @return A {@link CompletableFuture} that completes with the result.
*/
CompletableFuture<ReadShareGroupStateSummaryResult> readSummary(ReadShareGroupStateSummaryParameters request) throws IllegalArgumentException;
/**
* Perform cleanup and interrupt any threads
*/
void stop();
}

View File

@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.group.share;
/**
* Marker interface for parameter classes related to the {@link Persister} result classes.
*/
public interface PersisterParameters {
}

View File

@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.group.share;
/**
* Marker interface for result classes related to the {@link Persister} result classes.
*/
public interface PersisterResult {
}

View File

@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.group.share;
import org.apache.kafka.common.message.ReadShareGroupStateResponseData;
import org.apache.kafka.common.message.WriteShareGroupStateRequestData;
import java.util.Objects;
/**
* This class contains the information for a single batch of state information for use by the {@link Persister}.
*/
public class PersisterStateBatch {
private final long firstOffset;
private final long lastOffset;
private final byte deliveryState;
private final short deliveryCount;
public PersisterStateBatch(long firstOffset, long lastOffset, byte deliveryState, short deliveryCount) {
this.firstOffset = firstOffset;
this.lastOffset = lastOffset;
this.deliveryState = deliveryState;
this.deliveryCount = deliveryCount;
}
public long firstOffset() {
return firstOffset;
}
public long lastOffset() {
return lastOffset;
}
public byte deliveryState() {
return deliveryState;
}
public short deliveryCount() {
return deliveryCount;
}
public static PersisterStateBatch from(ReadShareGroupStateResponseData.StateBatch batch) {
return new PersisterStateBatch(
batch.firstOffset(),
batch.lastOffset(),
batch.deliveryState(),
batch.deliveryCount());
}
public static PersisterStateBatch from(WriteShareGroupStateRequestData.StateBatch batch) {
return new PersisterStateBatch(
batch.firstOffset(),
batch.lastOffset(),
batch.deliveryState(),
batch.deliveryCount());
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
PersisterStateBatch that = (PersisterStateBatch) o;
return firstOffset == that.firstOffset && lastOffset == that.lastOffset && deliveryState == that.deliveryState && deliveryCount == that.deliveryCount;
}
@Override
public int hashCode() {
return Objects.hash(firstOffset, lastOffset, deliveryState, deliveryCount);
}
@Override
public String toString() {
return "PersisterStateBatch(" +
"firstOffset=" + firstOffset + "," +
"lastOffset=" + lastOffset + "," +
"deliveryState=" + deliveryState + "," +
"deliveryCount=" + deliveryCount +
")";
}
}

View File

@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.group.share;
import org.apache.kafka.common.message.ReadShareGroupStateRequestData;
import java.util.stream.Collectors;
/**
* This class contains the parameters for {@link Persister#readState(ReadShareGroupStateParameters)}.
*/
public class ReadShareGroupStateParameters implements PersisterParameters {
private final GroupTopicPartitionData<PartitionIdLeaderEpochData> groupTopicPartitionData;
private ReadShareGroupStateParameters(GroupTopicPartitionData<PartitionIdLeaderEpochData> groupTopicPartitionData) {
this.groupTopicPartitionData = groupTopicPartitionData;
}
public GroupTopicPartitionData<PartitionIdLeaderEpochData> groupTopicPartitionData() {
return groupTopicPartitionData;
}
public static ReadShareGroupStateParameters from(ReadShareGroupStateRequestData data) {
return new Builder()
.setGroupTopicPartitionData(new GroupTopicPartitionData<>(data.groupId(), data.topics().stream()
.map(readStateData -> new TopicData<>(readStateData.topicId(),
readStateData.partitions().stream()
.map(partitionData -> PartitionFactory.newPartitionIdLeaderEpochData(partitionData.partition(), partitionData.leaderEpoch()))
.collect(Collectors.toList())))
.collect(Collectors.toList())))
.build();
}
public static class Builder {
private GroupTopicPartitionData<PartitionIdLeaderEpochData> groupTopicPartitionData;
public Builder setGroupTopicPartitionData(GroupTopicPartitionData<PartitionIdLeaderEpochData> groupTopicPartitionData) {
this.groupTopicPartitionData = groupTopicPartitionData;
return this;
}
public ReadShareGroupStateParameters build() {
return new ReadShareGroupStateParameters(groupTopicPartitionData);
}
}
@Override
public String toString() {
return "ReadShareGroupStateParameters(" + groupTopicPartitionData + ")";
}
}

View File

@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.group.share;
import org.apache.kafka.common.message.ReadShareGroupStateResponseData;
import java.util.List;
import java.util.stream.Collectors;
/**
* This class contains the result from {@link Persister#readState(ReadShareGroupStateParameters)}.
*/
public class ReadShareGroupStateResult implements PersisterResult {
private final List<TopicData<PartitionAllData>> topicsData;
private ReadShareGroupStateResult(List<TopicData<PartitionAllData>> topicsData) {
this.topicsData = topicsData;
}
public List<TopicData<PartitionAllData>> topicsData() {
return topicsData;
}
public static ReadShareGroupStateResult from(ReadShareGroupStateResponseData data) {
return new Builder()
.setTopicsData(data.results().stream()
.map(topicData -> new TopicData<>(topicData.topicId(),
topicData.partitions().stream()
.map(partitionResult -> PartitionFactory.newPartitionAllData(
partitionResult.partition(),
partitionResult.stateEpoch(),
partitionResult.startOffset(),
partitionResult.errorCode(),
partitionResult.errorMessage(),
partitionResult.stateBatches().stream()
.map(PersisterStateBatch::from)
.collect(Collectors.toList())
))
.collect(Collectors.toList())))
.collect(Collectors.toList()))
.build();
}
public static class Builder {
private List<TopicData<PartitionAllData>> topicsData;
public Builder setTopicsData(List<TopicData<PartitionAllData>> topicsData) {
this.topicsData = topicsData;
return this;
}
public ReadShareGroupStateResult build() {
return new ReadShareGroupStateResult(topicsData);
}
}
}

View File

@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.group.share;
import org.apache.kafka.common.message.ReadShareGroupStateSummaryRequestData;
import java.util.stream.Collectors;
/**
* This class contains the parameters for {@link Persister#readSummary(ReadShareGroupStateSummaryParameters)}.
*/
public class ReadShareGroupStateSummaryParameters implements PersisterParameters {
private final GroupTopicPartitionData<PartitionIdLeaderEpochData> groupTopicPartitionData;
private ReadShareGroupStateSummaryParameters(GroupTopicPartitionData<PartitionIdLeaderEpochData> groupTopicPartitionData) {
this.groupTopicPartitionData = groupTopicPartitionData;
}
public GroupTopicPartitionData<PartitionIdLeaderEpochData> groupTopicPartitionData() {
return groupTopicPartitionData;
}
public static ReadShareGroupStateSummaryParameters from(ReadShareGroupStateSummaryRequestData data) {
return new Builder()
.setGroupTopicPartitionData(new GroupTopicPartitionData<>(data.groupId(), data.topics().stream()
.map(topicData -> new TopicData<>(topicData.topicId(),
topicData.partitions().stream()
.map(partitionData -> PartitionFactory.newPartitionIdLeaderEpochData(partitionData.partition(), partitionData.leaderEpoch()))
.collect(Collectors.toList())))
.collect(Collectors.toList())))
.build();
}
public static class Builder {
private GroupTopicPartitionData<PartitionIdLeaderEpochData> groupTopicPartitionData;
public Builder setGroupTopicPartitionData(GroupTopicPartitionData<PartitionIdLeaderEpochData> groupTopicPartitionData) {
this.groupTopicPartitionData = groupTopicPartitionData;
return this;
}
public ReadShareGroupStateSummaryParameters build() {
return new ReadShareGroupStateSummaryParameters(groupTopicPartitionData);
}
}
}

View File

@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.group.share;
import org.apache.kafka.common.message.ReadShareGroupStateSummaryResponseData;
import java.util.List;
import java.util.stream.Collectors;
/**
* This class contains the result from {@link Persister#readSummary(ReadShareGroupStateSummaryParameters)}.
*/
public class ReadShareGroupStateSummaryResult implements PersisterResult {
private final List<TopicData<PartitionStateErrorData>> topicsData;
private ReadShareGroupStateSummaryResult(List<TopicData<PartitionStateErrorData>> topicsData) {
this.topicsData = topicsData;
}
public static ReadShareGroupStateSummaryResult from(ReadShareGroupStateSummaryResponseData data) {
return new Builder()
.setTopicsData(data.results().stream()
.map(readStateSummaryResult -> new TopicData<>(readStateSummaryResult.topicId(),
readStateSummaryResult.partitions().stream()
.map(partitionResult -> PartitionFactory.newPartitionStateErrorData(
partitionResult.partition(), partitionResult.stateEpoch(), partitionResult.startOffset(), partitionResult.errorCode(), partitionResult.errorMessage()))
.collect(Collectors.toList())))
.collect(Collectors.toList()))
.build();
}
public static class Builder {
private List<TopicData<PartitionStateErrorData>> topicsData;
public Builder setTopicsData(List<TopicData<PartitionStateErrorData>> topicsData) {
this.topicsData = topicsData;
return this;
}
public ReadShareGroupStateSummaryResult build() {
return new ReadShareGroupStateSummaryResult(topicsData);
}
}
}

View File

@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.group.share;
import org.apache.kafka.common.Uuid;
import java.util.Objects;
public class ShareGroupHelper {
/**
* Calculates the coordinator key for finding a share coordinator.
*
* @param groupId Group ID
* @param topicId Topic ID
* @param partition Partition index
*
* @return The coordinator key
*/
public static String coordinatorKey(String groupId, Uuid topicId, int partition) {
Objects.requireNonNull(groupId);
Objects.requireNonNull(topicId);
return String.format("%s:%s:%d", groupId, topicId, partition);
}
}

View File

@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.group.share;
import org.apache.kafka.common.Uuid;
import java.util.List;
import java.util.Objects;
/**
* This class is used to contain the data for a topic and its partitions in the interface to {@link Persister}.
*
* @param <P> The type of {@link PartitionInfoData}
*/
public class TopicData<P extends PartitionInfoData> {
private final Uuid topicId;
private final List<P> partitions;
public TopicData(Uuid topicId, List<P> partitions) {
this.topicId = topicId;
this.partitions = partitions;
}
public Uuid topicId() {
return topicId;
}
public List<P> partitions() {
return partitions;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
TopicData<?> topicData = (TopicData<?>) o;
return Objects.equals(topicId, topicData.topicId) && Objects.equals(partitions, topicData.partitions);
}
@Override
public int hashCode() {
return Objects.hash(topicId, partitions);
}
@Override
public String toString() {
return "TopicData(" +
"topicId=" + topicId + "," +
"partitions=" + partitions +
")";
}
}

View File

@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.group.share;
import org.apache.kafka.common.message.WriteShareGroupStateRequestData;
import java.util.stream.Collectors;
/**
* This class contains the parameters for {@link Persister#writeState(WriteShareGroupStateParameters)}.
*/
public class WriteShareGroupStateParameters implements PersisterParameters {
private final GroupTopicPartitionData<PartitionStateBatchData> groupTopicPartitionData;
private WriteShareGroupStateParameters(GroupTopicPartitionData<PartitionStateBatchData> groupTopicPartitionData) {
this.groupTopicPartitionData = groupTopicPartitionData;
}
public GroupTopicPartitionData<PartitionStateBatchData> groupTopicPartitionData() {
return groupTopicPartitionData;
}
public static WriteShareGroupStateParameters from(WriteShareGroupStateRequestData data) {
return new Builder()
.setGroupTopicPartitionData(new GroupTopicPartitionData<>(data.groupId(), data.topics().stream()
.map(writeStateData -> new TopicData<>(writeStateData.topicId(),
writeStateData.partitions().stream()
.map(partitionData -> PartitionFactory.newPartitionStateBatchData(partitionData.partition(), partitionData.stateEpoch(), partitionData.startOffset(),
partitionData.leaderEpoch(),
partitionData.stateBatches().stream()
.map(PersisterStateBatch::from)
.collect(Collectors.toList())))
.collect(Collectors.toList())))
.collect(Collectors.toList())))
.build();
}
public static class Builder {
private GroupTopicPartitionData<PartitionStateBatchData> groupTopicPartitionData;
public Builder setGroupTopicPartitionData(GroupTopicPartitionData<PartitionStateBatchData> groupTopicPartitionData) {
this.groupTopicPartitionData = groupTopicPartitionData;
return this;
}
public WriteShareGroupStateParameters build() {
return new WriteShareGroupStateParameters(groupTopicPartitionData);
}
}
@Override
public String toString() {
return "WriteShareGroupStateParameters(" + groupTopicPartitionData + ")";
}
}

View File

@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.group.share;
import org.apache.kafka.common.message.WriteShareGroupStateResponseData;
import java.util.List;
import java.util.stream.Collectors;
/**
* This class contains the result from {@link Persister#writeState(WriteShareGroupStateParameters)}.
*/
public class WriteShareGroupStateResult implements PersisterResult {
private final List<TopicData<PartitionErrorData>> topicsData;
private WriteShareGroupStateResult(List<TopicData<PartitionErrorData>> topicsData) {
this.topicsData = topicsData;
}
public List<TopicData<PartitionErrorData>> topicsData() {
return topicsData;
}
public static WriteShareGroupStateResult from(WriteShareGroupStateResponseData data) {
return new Builder()
.setTopicsData(data.results().stream()
.map(writeStateResult -> new TopicData<>(writeStateResult.topicId(),
writeStateResult.partitions().stream()
.map(partitionResult -> PartitionFactory.newPartitionErrorData(partitionResult.partition(), partitionResult.errorCode(), partitionResult.errorMessage()))
.collect(Collectors.toList())))
.collect(Collectors.toList()))
.build();
}
public static class Builder {
private List<TopicData<PartitionErrorData>> topicsData;
public Builder setTopicsData(List<TopicData<PartitionErrorData>> topicsData) {
this.topicsData = topicsData;
return this;
}
public WriteShareGroupStateResult build() {
return new WriteShareGroupStateResult(topicsData);
}
}
}