MINOR: Reject requests using unsupported features in KIP-1071 (#20031)
CI / build (push) Waiting to run Details

KIP-1071 does not currently support all features planned in the KIP. We
should reject any requests that are using features that are currently
not implemented.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Matthias J. Sax
 <matthias@confluent.io>, Bill Bejeck <bill@confluent.io>
This commit is contained in:
Lucas Brutschy 2025-06-25 14:48:56 +02:00 committed by GitHub
parent 5e23df0c8d
commit 23ddb1d8ac
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 124 additions and 19 deletions

View File

@ -141,6 +141,7 @@ import static org.apache.kafka.coordinator.common.runtime.CoordinatorOperationEx
import static org.apache.kafka.coordinator.group.Utils.throwIfEmptyString;
import static org.apache.kafka.coordinator.group.Utils.throwIfNotEmptyCollection;
import static org.apache.kafka.coordinator.group.Utils.throwIfNotNull;
import static org.apache.kafka.coordinator.group.Utils.throwIfNotNullOrEmpty;
import static org.apache.kafka.coordinator.group.Utils.throwIfNull;
/**
@ -540,6 +541,26 @@ public class GroupCoordinatorService implements GroupCoordinator {
}
}
/**
* Validates the request. Specifically, throws if any not-yet-supported features are used.
*
* @param request The request to validate.
* @throws InvalidRequestException if the request is not valid.
*/
private static void throwIfStreamsGroupHeartbeatRequestIsUsingUnsupportedFeatures(
StreamsGroupHeartbeatRequestData request
) throws InvalidRequestException {
throwIfNotNull(request.instanceId(), "Static membership is not yet supported.");
throwIfNotNull(request.taskOffsets(), "TaskOffsets are not supported yet.");
throwIfNotNull(request.taskEndOffsets(), "TaskEndOffsets are not supported yet.");
throwIfNotNullOrEmpty(request.warmupTasks(), "WarmupTasks are not supported yet.");
if (request.topology() != null) {
for (StreamsGroupHeartbeatRequestData.Subtopology subtopology : request.topology().subtopologies()) {
throwIfNotEmptyCollection(subtopology.sourceTopicRegex(), "Regular expressions for source topics are not supported yet.");
}
}
}
/**
* See
* {@link GroupCoordinator#streamsGroupHeartbeat(AuthorizableRequestContext, StreamsGroupHeartbeatRequestData)}.
@ -559,6 +580,7 @@ public class GroupCoordinatorService implements GroupCoordinator {
}
try {
throwIfStreamsGroupHeartbeatRequestIsUsingUnsupportedFeatures(request);
throwIfStreamsGroupHeartbeatRequestIsInvalid(request);
} catch (Throwable ex) {
ApiError apiError = ApiError.fromThrowable(ex);

View File

@ -280,6 +280,22 @@ public class Utils {
}
}
/**
* Throws an InvalidRequestException if the value is not null and non-empty.
*
* @param value The value.
* @param error The error message.
* @throws InvalidRequestException
*/
static void throwIfNotNullOrEmpty(
Collection<?> value,
String error
) throws InvalidRequestException {
if (value != null && !value.isEmpty()) {
throw new InvalidRequestException(error);
}
}
/**
* Throws an InvalidRequestException if the value is non-null.
*

View File

@ -574,6 +574,91 @@ public class GroupCoordinatorServiceTest {
future.get(5, TimeUnit.SECONDS)
);
}
@Test
public void testStreamsGroupHeartbeatFailsForUnsupportedFeatures() throws Exception {
GroupCoordinatorService service = new GroupCoordinatorServiceBuilder()
.setConfig(createConfig())
.setRuntime(mockRuntime())
.build(true);
AuthorizableRequestContext context = mock(AuthorizableRequestContext.class);
when(context.requestVersion()).thenReturn((int) ApiKeys.STREAMS_GROUP_HEARTBEAT.latestVersion());
assertEquals(
new StreamsGroupHeartbeatResult(
new StreamsGroupHeartbeatResponseData()
.setErrorCode(Errors.INVALID_REQUEST.code())
.setErrorMessage("Static membership is not yet supported."),
Map.of()
),
service.streamsGroupHeartbeat(
context,
new StreamsGroupHeartbeatRequestData()
.setInstanceId(Uuid.randomUuid().toString())
).get(5, TimeUnit.SECONDS)
);
assertEquals(
new StreamsGroupHeartbeatResult(
new StreamsGroupHeartbeatResponseData()
.setErrorCode(Errors.INVALID_REQUEST.code())
.setErrorMessage("TaskOffsets are not supported yet."),
Map.of()
),
service.streamsGroupHeartbeat(
context,
new StreamsGroupHeartbeatRequestData()
.setTaskOffsets(List.of(new StreamsGroupHeartbeatRequestData.TaskOffset()))
).get(5, TimeUnit.SECONDS)
);
assertEquals(
new StreamsGroupHeartbeatResult(
new StreamsGroupHeartbeatResponseData()
.setErrorCode(Errors.INVALID_REQUEST.code())
.setErrorMessage("TaskEndOffsets are not supported yet."),
Map.of()
),
service.streamsGroupHeartbeat(
context,
new StreamsGroupHeartbeatRequestData()
.setTaskEndOffsets(List.of(new StreamsGroupHeartbeatRequestData.TaskOffset()))
).get(5, TimeUnit.SECONDS)
);
assertEquals(
new StreamsGroupHeartbeatResult(
new StreamsGroupHeartbeatResponseData()
.setErrorCode(Errors.INVALID_REQUEST.code())
.setErrorMessage("WarmupTasks are not supported yet."),
Map.of()
),
service.streamsGroupHeartbeat(
context,
new StreamsGroupHeartbeatRequestData()
.setWarmupTasks(List.of(new StreamsGroupHeartbeatRequestData.TaskIds()))
).get(5, TimeUnit.SECONDS)
);
assertEquals(
new StreamsGroupHeartbeatResult(
new StreamsGroupHeartbeatResponseData()
.setErrorCode(Errors.INVALID_REQUEST.code())
.setErrorMessage("Regular expressions for source topics are not supported yet."),
Map.of()
),
service.streamsGroupHeartbeat(
context,
new StreamsGroupHeartbeatRequestData()
.setTopology(new StreamsGroupHeartbeatRequestData.Topology()
.setSubtopologies(List.of(new StreamsGroupHeartbeatRequestData.Subtopology()
.setSourceTopicRegex(List.of("foo.*"))
))
)
).get(5, TimeUnit.SECONDS)
);
}
@SuppressWarnings("MethodLength")
@Test
@ -584,7 +669,7 @@ public class GroupCoordinatorServiceTest {
.build(true);
AuthorizableRequestContext context = mock(AuthorizableRequestContext.class);
when(context.requestVersion()).thenReturn((int) ApiKeys.SHARE_GROUP_HEARTBEAT.latestVersion());
when(context.requestVersion()).thenReturn((int) ApiKeys.STREAMS_GROUP_HEARTBEAT.latestVersion());
String memberId = Uuid.randomUuid().toString();
@ -743,24 +828,6 @@ public class GroupCoordinatorServiceTest {
).get(5, TimeUnit.SECONDS)
);
// InstanceId must be non-empty if provided in all requests.
assertEquals(
new StreamsGroupHeartbeatResult(
new StreamsGroupHeartbeatResponseData()
.setErrorCode(Errors.INVALID_REQUEST.code())
.setErrorMessage("InstanceId can't be empty."),
Map.of()
),
service.streamsGroupHeartbeat(
context,
new StreamsGroupHeartbeatRequestData()
.setGroupId("foo")
.setMemberId(memberId)
.setMemberEpoch(1)
.setInstanceId("")
).get(5, TimeUnit.SECONDS)
);
// RackId must be non-empty if provided in all requests.
assertEquals(
new StreamsGroupHeartbeatResult(