Merge remote-tracking branch 'origin' into testRemoteLogManagerRemoteMetrics

This commit is contained in:
Luke Chen 2024-05-10 11:17:34 +08:00
commit d23931d5b6
45 changed files with 1628 additions and 603 deletions

View File

@ -1185,5 +1185,4 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
return loggers.setLevel(namespace, level);
}
}

View File

@ -121,6 +121,14 @@ public interface Herder {
void putConnectorConfig(String connName, Map<String, String> config, TargetState targetState, boolean allowReplace,
Callback<Created<ConnectorInfo>> callback);
/**
* Patch the configuration for a connector.
* @param connName name of the connector
* @param configPatch the connector's configuration patch.
* @param callback callback to invoke when the configuration has been written
*/
void patchConnectorConfig(String connName, Map<String, String> configPatch, Callback<Created<ConnectorInfo>> callback);
/**
* Delete a connector and its configuration.
* @param connName name of the connector

View File

@ -1096,54 +1096,82 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
log.trace("Submitting connector config write request {}", connName);
addRequest(
() -> {
validateConnectorConfig(config, callback.chainStaging((error, configInfos) -> {
if (error != null) {
callback.onCompletion(error, null);
return;
}
// Complete the connector config write via another herder request in order to
// perform the write to the backing store (or forward to the leader) during
// the "external request" portion of the tick loop
addRequest(
() -> {
if (maybeAddConfigErrors(configInfos, callback)) {
return null;
}
log.trace("Handling connector config request {}", connName);
if (!isLeader()) {
callback.onCompletion(new NotLeaderException("Only the leader can set connector configs.", leaderUrl()), null);
return null;
}
boolean exists = configState.contains(connName);
if (!allowReplace && exists) {
callback.onCompletion(new AlreadyExistsException("Connector " + connName + " already exists"), null);
return null;
}
log.trace("Submitting connector config {} {} {}", connName, allowReplace, configState.connectors());
writeToConfigTopicAsLeader(
"writing a config for connector " + connName + " to the config topic",
() -> configBackingStore.putConnectorConfig(connName, config, targetState)
);
// Note that we use the updated connector config despite the fact that we don't have an updated
// snapshot yet. The existing task info should still be accurate.
ConnectorInfo info = new ConnectorInfo(connName, config, configState.tasks(connName),
connectorType(config));
callback.onCompletion(null, new Created<>(!exists, info));
return null;
},
forwardErrorAndTickThreadStages(callback)
);
}));
doPutConnectorConfig(connName, config, targetState, allowReplace, callback);
return null;
},
forwardErrorAndTickThreadStages(callback)
);
}
@Override
public void patchConnectorConfig(String connName, Map<String, String> configPatch, Callback<Created<ConnectorInfo>> callback) {
log.trace("Submitting connector config patch request {}", connName);
addRequest(() -> {
// This reduces (but not completely eliminates) the chance for race conditions.
if (!isLeader()) {
callback.onCompletion(new NotLeaderException("Only the leader can set connector configs.", leaderUrl()), null);
return null;
}
ConnectorInfo connectorInfo = connectorInfo(connName);
if (connectorInfo == null) {
callback.onCompletion(new NotFoundException("Connector " + connName + " not found", null), null);
} else {
Map<String, String> patchedConfig = ConnectUtils.patchConfig(connectorInfo.config(), configPatch);
doPutConnectorConfig(connName, patchedConfig, null, true, callback);
}
return null;
}, forwardErrorAndTickThreadStages(callback));
}
private void doPutConnectorConfig(
String connName,
Map<String, String> config,
TargetState targetState, boolean allowReplace,
Callback<Created<ConnectorInfo>> callback) {
validateConnectorConfig(config, callback.chainStaging((error, configInfos) -> {
if (error != null) {
callback.onCompletion(error, null);
return;
}
// Complete the connector config write via another herder request in order to
// perform the write to the backing store (or forward to the leader) during
// the "external request" portion of the tick loop
addRequest(
() -> {
if (maybeAddConfigErrors(configInfos, callback)) {
return null;
}
log.trace("Handling connector config request {}", connName);
if (!isLeader()) {
callback.onCompletion(new NotLeaderException("Only the leader can set connector configs.", leaderUrl()), null);
return null;
}
boolean exists = configState.contains(connName);
if (!allowReplace && exists) {
callback.onCompletion(new AlreadyExistsException("Connector " + connName + " already exists"), null);
return null;
}
log.trace("Submitting connector config {} {} {}", connName, allowReplace, configState.connectors());
writeToConfigTopicAsLeader(
"writing a config for connector " + connName + " to the config topic",
() -> configBackingStore.putConnectorConfig(connName, config, targetState)
);
// Note that we use the updated connector config despite the fact that we don't have an updated
// snapshot yet. The existing task info should still be accurate.
ConnectorInfo info = new ConnectorInfo(connName, config, configState.tasks(connName),
connectorType(config));
callback.onCompletion(null, new Created<>(!exists, info));
return null;
},
forwardErrorAndTickThreadStages(callback)
);
}));
}
@Override
public void stopConnector(final String connName, final Callback<Void> callback) {
log.trace("Submitting request to transition connector {} to STOPPED state", connName);

View File

@ -242,6 +242,19 @@ public class ConnectorsResource {
return response.entity(createdInfo.result()).build();
}
@PATCH
@Path("/{connector}/config")
public Response patchConnectorConfig(final @PathParam("connector") String connector,
final @Context HttpHeaders headers,
final @Parameter(hidden = true) @QueryParam("forward") Boolean forward,
final Map<String, String> connectorConfigPatch) throws Throwable {
FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>();
herder.patchConnectorConfig(connector, connectorConfigPatch, cb);
Herder.Created<ConnectorInfo> createdInfo = requestHandler.completeOrForwardRequest(cb, "/connectors/" + connector + "/config",
"PATCH", headers, connectorConfigPatch, new TypeReference<ConnectorInfo>() { }, new CreatedConnectorInfoTranslator(), forward);
return Response.ok().entity(createdInfo.result()).build();
}
@POST
@Path("/{connector}/restart")
@Operation(summary = "Restart the specified connector")

View File

@ -47,6 +47,7 @@ import org.apache.kafka.connect.storage.MemoryConfigBackingStore;
import org.apache.kafka.connect.storage.MemoryStatusBackingStore;
import org.apache.kafka.connect.storage.StatusBackingStore;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.ConnectUtils;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -246,6 +247,31 @@ public class StandaloneHerder extends AbstractHerder {
}
}
@Override
public synchronized void patchConnectorConfig(String connName, Map<String, String> configPatch, Callback<Created<ConnectorInfo>> callback) {
try {
ConnectorInfo connectorInfo = connectorInfo(connName);
if (connectorInfo == null) {
callback.onCompletion(new NotFoundException("Connector " + connName + " not found", null), null);
return;
}
Map<String, String> patchedConfig = ConnectUtils.patchConfig(connectorInfo.config(), configPatch);
validateConnectorConfig(patchedConfig, (error, configInfos) -> {
if (error != null) {
callback.onCompletion(error, null);
return;
}
requestExecutorService.submit(
() -> putConnectorConfig(connName, patchedConfig, null, true, callback, configInfos)
);
});
} catch (Throwable e) {
callback.onCompletion(e, null);
}
}
@Override
public synchronized void stopConnector(String connName, Callback<Void> callback) {
try {

View File

@ -29,6 +29,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@ -218,4 +219,28 @@ public final class ConnectUtils {
public static String className(Object o) {
return o != null ? o.getClass().getName() : "null";
}
/**
* Apply a patch on a connector config.
*
* <p>In the output, the values from the patch will override the values from the config.
* {@code null} values will cause the corresponding key to be removed completely.
* @param config the config to be patched.
* @param patch the patch.
* @return the output config map.
*/
public static Map<String, String> patchConfig(
Map<String, String> config,
Map<String, String> patch
) {
Map<String, String> result = new HashMap<>(config);
patch.forEach((k, v) -> {
if (v != null) {
result.put(k, v);
} else {
result.remove(k);
}
});
return result;
}
}

View File

@ -773,6 +773,43 @@ public class ConnectWorkerIntegrationTest {
connect.assertions().assertConnectorDoesNotExist(CONNECTOR_NAME, "Connector wasn't deleted in time");
}
@Test
public void testPatchConnectorConfig() throws Exception {
connect = connectBuilder.build();
// start the clusters
connect.start();
connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
"Initial group of workers did not start in time.");
connect.kafka().createTopic(TOPIC_NAME);
Map<String, String> props = defaultSinkConnectorProps(TOPIC_NAME);
props.put("unaffected-key", "unaffected-value");
props.put("to-be-deleted-key", "value");
props.put(TASKS_MAX_CONFIG, "2");
Map<String, String> patch = new HashMap<>();
patch.put(TASKS_MAX_CONFIG, "3"); // this plays as a value to be changed
patch.put("to-be-added-key", "value");
patch.put("to-be-deleted-key", null);
connect.configureConnector(CONNECTOR_NAME, props);
connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME, 2,
"connector and tasks did not start in time");
connect.patchConnectorConfig(CONNECTOR_NAME, patch);
connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME, 3,
"connector and tasks did not reconfigure and restart in time");
Map<String, String> expectedConfig = new HashMap<>(props);
expectedConfig.put("name", CONNECTOR_NAME);
expectedConfig.put("to-be-added-key", "value");
expectedConfig.put(TASKS_MAX_CONFIG, "3");
expectedConfig.remove("to-be-deleted-key");
assertEquals(expectedConfig, connect.connectorInfo(CONNECTOR_NAME).config());
}
private Map<String, String> defaultSinkConnectorProps(String topics) {
// setup props for the sink connector
Map<String, String> props = new HashMap<>();

View File

@ -2336,6 +2336,133 @@ public class DistributedHerderTest {
verifyNoMoreInteractions(worker, member, configBackingStore, statusBackingStore);
}
@Test
public void testPatchConnectorConfigNotFound() {
when(member.memberId()).thenReturn("leader");
expectRebalance(0, Collections.emptyList(), Collections.emptyList(), true);
when(statusBackingStore.connectors()).thenReturn(Collections.emptySet());
ClusterConfigState clusterConfigState = new ClusterConfigState(
0,
null,
Collections.emptyMap(),
Collections.emptyMap(),
Collections.emptyMap(),
Collections.emptyMap(),
Collections.emptyMap(),
Collections.emptyMap(),
Collections.emptySet(),
Collections.emptySet());
expectConfigRefreshAndSnapshot(clusterConfigState);
Map<String, String> connConfigPatch = new HashMap<>();
connConfigPatch.put("foo1", "baz1");
FutureCallback<Herder.Created<ConnectorInfo>> patchCallback = new FutureCallback<>();
herder.patchConnectorConfig(CONN2, connConfigPatch, patchCallback);
herder.tick();
assertTrue(patchCallback.isDone());
ExecutionException exception = assertThrows(ExecutionException.class, patchCallback::get);
assertInstanceOf(NotFoundException.class, exception.getCause());
}
@Test
public void testPatchConnectorConfigNotALeader() {
when(member.memberId()).thenReturn("not-leader");
// The connector is pre-existing due to the mocks.
ClusterConfigState originalSnapshot = new ClusterConfigState(
1,
null,
Collections.singletonMap(CONN1, 0),
Collections.singletonMap(CONN1, CONN1_CONFIG),
Collections.singletonMap(CONN1, TargetState.STARTED),
Collections.emptyMap(),
Collections.emptyMap(),
Collections.emptyMap(),
Collections.emptySet(),
Collections.emptySet());
expectConfigRefreshAndSnapshot(originalSnapshot);
when(member.currentProtocolVersion()).thenReturn(CONNECT_PROTOCOL_V0);
// Patch the connector config.
expectMemberEnsureActive();
expectRebalance(1, Arrays.asList(CONN1), Collections.emptyList(), false);
FutureCallback<Herder.Created<ConnectorInfo>> patchCallback = new FutureCallback<>();
herder.patchConnectorConfig(CONN1, new HashMap<>(), patchCallback);
herder.tick();
assertTrue(patchCallback.isDone());
ExecutionException fencingException = assertThrows(ExecutionException.class, patchCallback::get);
assertInstanceOf(ConnectException.class, fencingException.getCause());
}
@Test
public void testPatchConnectorConfig() throws Exception {
when(member.memberId()).thenReturn("leader");
expectRebalance(1, Arrays.asList(CONN1), Collections.emptyList(), true);
when(statusBackingStore.connectors()).thenReturn(Collections.emptySet());
Map<String, String> originalConnConfig = new HashMap<>(CONN1_CONFIG);
originalConnConfig.put("foo0", "unaffected");
originalConnConfig.put("foo1", "will-be-changed");
originalConnConfig.put("foo2", "will-be-removed");
// The connector is pre-existing due to the mocks.
ClusterConfigState originalSnapshot = new ClusterConfigState(
1,
null,
Collections.singletonMap(CONN1, 0),
Collections.singletonMap(CONN1, originalConnConfig),
Collections.singletonMap(CONN1, TargetState.STARTED),
Collections.emptyMap(),
Collections.emptyMap(),
Collections.emptyMap(),
Collections.emptySet(),
Collections.emptySet());
expectConfigRefreshAndSnapshot(originalSnapshot);
when(member.currentProtocolVersion()).thenReturn(CONNECT_PROTOCOL_V0);
expectMemberPoll();
// Patch the connector config.
Map<String, String> connConfigPatch = new HashMap<>();
connConfigPatch.put("foo1", "changed");
connConfigPatch.put("foo2", null);
connConfigPatch.put("foo3", "added");
Map<String, String> patchedConnConfig = new HashMap<>(originalConnConfig);
patchedConnConfig.put("foo0", "unaffected");
patchedConnConfig.put("foo1", "changed");
patchedConnConfig.remove("foo2");
patchedConnConfig.put("foo3", "added");
expectMemberEnsureActive();
expectRebalance(1, Arrays.asList(CONN1), Collections.emptyList(), true);
ArgumentCaptor<Callback<ConfigInfos>> validateCallback = ArgumentCaptor.forClass(Callback.class);
doAnswer(invocation -> {
validateCallback.getValue().onCompletion(null, CONN1_CONFIG_INFOS);
return null;
}).when(herder).validateConnectorConfig(eq(patchedConnConfig), validateCallback.capture());
FutureCallback<Herder.Created<ConnectorInfo>> patchCallback = new FutureCallback<>();
herder.patchConnectorConfig(CONN1, connConfigPatch, patchCallback);
herder.tick();
assertTrue(patchCallback.isDone());
assertEquals(patchedConnConfig, patchCallback.get().result().config());
// This is effectively the main check of this test:
// we validate that what's written in the config storage is the patched config.
verify(configBackingStore).putConnectorConfig(eq(CONN1), eq(patchedConnConfig), isNull());
verifyNoMoreInteractions(configBackingStore);
// No need to check herder.connectorConfig explicitly:
// all the related parts are mocked and that the config is correct is checked by eq()'s in the mocked called above.
}
@Test
public void testKeyRotationWhenWorkerBecomesLeader() throws Exception {
long rotationTtlDelay = DistributedConfig.INTER_WORKER_KEY_TTL_MS_MS_DEFAULT;

View File

@ -109,6 +109,17 @@ public class ConnectorsResourceTest {
CONNECTOR_CONFIG.put("sample_config", "test_config");
}
private static final Map<String, String> CONNECTOR_CONFIG_PATCH = new HashMap<>();
static {
CONNECTOR_CONFIG_PATCH.put("sample_config", "test_config_new");
CONNECTOR_CONFIG_PATCH.put("sample_config_2", "test_config_2");
}
private static final Map<String, String> CONNECTOR_CONFIG_PATCHED = new HashMap<>(CONNECTOR_CONFIG);
static {
CONNECTOR_CONFIG_PATCHED.putAll(CONNECTOR_CONFIG_PATCH);
}
private static final Map<String, String> CONNECTOR_CONFIG_CONTROL_SEQUENCES = new HashMap<>();
static {
CONNECTOR_CONFIG_CONTROL_SEQUENCES.put("name", CONNECTOR_NAME_CONTROL_SEQUENCES1);
@ -588,6 +599,37 @@ public class ConnectorsResourceTest {
assertThrows(BadRequestException.class, () -> connectorsResource.createConnector(FORWARD, NULL_HEADERS, request));
}
@Test
public void testPatchConnectorConfig() throws Throwable {
final ArgumentCaptor<Callback<Herder.Created<ConnectorInfo>>> cb = ArgumentCaptor.forClass(Callback.class);
expectAndCallbackResult(cb, new Herder.Created<>(true, new ConnectorInfo(CONNECTOR_NAME, CONNECTOR_CONFIG_PATCHED, CONNECTOR_TASK_NAMES,
ConnectorType.SINK))
).when(herder).patchConnectorConfig(eq(CONNECTOR_NAME), eq(CONNECTOR_CONFIG_PATCH), cb.capture());
connectorsResource.patchConnectorConfig(CONNECTOR_NAME, NULL_HEADERS, FORWARD, CONNECTOR_CONFIG_PATCH);
}
@Test
public void testPatchConnectorConfigLeaderRedirect() throws Throwable {
final ArgumentCaptor<Callback<Herder.Created<ConnectorInfo>>> cb = ArgumentCaptor.forClass(Callback.class);
expectAndCallbackNotLeaderException(cb)
.when(herder).patchConnectorConfig(eq(CONNECTOR_NAME), eq(CONNECTOR_CONFIG_PATCH), cb.capture());
when(restClient.httpRequest(eq(LEADER_URL + "connectors/" + CONNECTOR_NAME + "/config?forward=false"), eq("PATCH"), isNull(), eq(CONNECTOR_CONFIG_PATCH), any()))
.thenReturn(new RestClient.HttpResponse<>(200, new HashMap<>(CONNECTOR_CONFIG_PATCHED), null));
connectorsResource.patchConnectorConfig(CONNECTOR_NAME, NULL_HEADERS, FORWARD, CONNECTOR_CONFIG_PATCH);
}
@Test
public void testPatchConnectorConfigNotFound() throws Throwable {
final ArgumentCaptor<Callback<Herder.Created<ConnectorInfo>>> cb = ArgumentCaptor.forClass(Callback.class);
expectAndCallbackException(cb, new NotFoundException("Connector " + CONNECTOR_NAME + " not found"))
.when(herder).patchConnectorConfig(eq(CONNECTOR_NAME), eq(CONNECTOR_CONFIG_PATCH), cb.capture());
assertThrows(NotFoundException.class, () -> connectorsResource.patchConnectorConfig(
CONNECTOR_NAME, NULL_HEADERS, FORWARD, CONNECTOR_CONFIG_PATCH));
}
@Test
public void testGetConnectorTaskConfigs() throws Throwable {
final ArgumentCaptor<Callback<List<TaskInfo>>> cb = ArgumentCaptor.forClass(Callback.class);

View File

@ -77,7 +77,9 @@ import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import static java.util.Collections.emptyList;
import static java.util.Collections.singleton;
import static java.util.Collections.singletonList;
import static java.util.Collections.singletonMap;
@ -751,6 +753,77 @@ public class StandaloneHerderTest {
verifyNoMoreInteractions(connectorConfigCb);
}
@Test
public void testPatchConnectorConfigNotFound() {
Map<String, String> connConfigPatch = new HashMap<>();
connConfigPatch.put("foo1", "baz1");
Callback<Herder.Created<ConnectorInfo>> patchCallback = mock(Callback.class);
herder.patchConnectorConfig(CONNECTOR_NAME, connConfigPatch, patchCallback);
ArgumentCaptor<NotFoundException> exceptionCaptor = ArgumentCaptor.forClass(NotFoundException.class);
verify(patchCallback).onCompletion(exceptionCaptor.capture(), isNull());
assertEquals(exceptionCaptor.getValue().getMessage(), "Connector " + CONNECTOR_NAME + " not found");
}
@Test
public void testPatchConnectorConfig() throws ExecutionException, InterruptedException, TimeoutException {
// Create the connector.
Map<String, String> originalConnConfig = connectorConfig(SourceSink.SOURCE);
originalConnConfig.put("foo0", "unaffected");
originalConnConfig.put("foo1", "will-be-changed");
originalConnConfig.put("foo2", "will-be-removed");
Map<String, String> connConfigPatch = new HashMap<>();
connConfigPatch.put("foo1", "changed");
connConfigPatch.put("foo2", null);
connConfigPatch.put("foo3", "added");
Map<String, String> patchedConnConfig = new HashMap<>(originalConnConfig);
patchedConnConfig.put("foo0", "unaffected");
patchedConnConfig.put("foo1", "changed");
patchedConnConfig.remove("foo2");
patchedConnConfig.put("foo3", "added");
expectAdd(SourceSink.SOURCE);
expectConfigValidation(SourceSink.SOURCE, originalConnConfig, patchedConnConfig);
expectConnectorStartingWithoutTasks(originalConnConfig, SourceSink.SOURCE);
herder.putConnectorConfig(CONNECTOR_NAME, originalConnConfig, false, createCallback);
createCallback.get(1000L, TimeUnit.SECONDS);
expectConnectorStartingWithoutTasks(patchedConnConfig, SourceSink.SOURCE);
FutureCallback<Herder.Created<ConnectorInfo>> patchCallback = new FutureCallback<>();
herder.patchConnectorConfig(CONNECTOR_NAME, connConfigPatch, patchCallback);
Map<String, String> returnedConfig = patchCallback.get(1000L, TimeUnit.SECONDS).result().config();
assertEquals(patchedConnConfig, returnedConfig);
// Also check the returned config when requested.
FutureCallback<Map<String, String>> configCallback = new FutureCallback<>();
herder.connectorConfig(CONNECTOR_NAME, configCallback);
Map<String, String> returnedConfig2 = configCallback.get(1000L, TimeUnit.SECONDS);
assertEquals(patchedConnConfig, returnedConfig2);
}
private void expectConnectorStartingWithoutTasks(Map<String, String> config, SourceSink sourceSink) {
doNothing().when(worker).stopAndAwaitConnector(CONNECTOR_NAME);
final ArgumentCaptor<Callback<TargetState>> onStart = ArgumentCaptor.forClass(Callback.class);
doAnswer(invocation -> {
onStart.getValue().onCompletion(null, TargetState.STARTED);
return true;
}).when(worker).startConnector(eq(CONNECTOR_NAME), any(Map.class), any(),
eq(herder), eq(TargetState.STARTED), onStart.capture());
ConnectorConfig connConfig = sourceSink == SourceSink.SOURCE ?
new SourceConnectorConfig(plugins, config, true) :
new SinkConnectorConfig(plugins, config);
when(worker.connectorTaskConfigs(CONNECTOR_NAME, connConfig))
.thenReturn(emptyList());
}
@Test
public void testPutTaskConfigs() {
Callback<Void> cb = mock(Callback.class);

View File

@ -169,4 +169,24 @@ public class ConnectUtilsTest {
assertEquals(expectedClientIdBase, actualClientIdBase);
}
@Test
public void testPatchConfig() {
HashMap<String, String> config = new HashMap<>();
config.put("unaffected-key", "unaffected-value");
config.put("to-be-changed-key", "to-be-changed-value-old");
config.put("to-be-deleted-key", "to-be-deleted-value");
HashMap<String, String> patch = new HashMap<>();
patch.put("to-be-changed-key", "to-be-changed-value-new");
patch.put("to-be-deleted-key", null);
patch.put("to-be-added-key", "to-be-added-value");
HashMap<String, String> expectedResult = new HashMap<>();
expectedResult.put("unaffected-key", "unaffected-value");
expectedResult.put("to-be-changed-key", "to-be-changed-value-new");
expectedResult.put("to-be-added-key", "to-be-added-value");
Map<String, String> result = ConnectUtils.patchConfig(config, patch);
assertEquals(expectedResult, result);
}
}

View File

@ -271,6 +271,43 @@ abstract class EmbeddedConnect {
"Could not execute PUT request. Error response: " + responseToString(response));
}
/**
* Patch the config of a connector.
*
* @param connName the name of the connector
* @param connConfigPatch the configuration patch
* @throws ConnectRestException if the REST API returns error status
* @throws ConnectException if the configuration fails to be serialized or if the request could not be sent
*/
public String patchConnectorConfig(String connName, Map<String, String> connConfigPatch) {
String url = endpointForResource(String.format("connectors/%s/config", connName));
return doPatchConnectorConfig(url, connConfigPatch);
}
/**
* Execute a PATCH request with the given connector configuration on the given URL endpoint.
*
* @param url the full URL of the endpoint that corresponds to the given REST resource
* @param connConfigPatch the configuration patch
* @throws ConnectRestException if the REST api returns error status
* @throws ConnectException if the configuration fails to be serialized or if the request could not be sent
*/
protected String doPatchConnectorConfig(String url, Map<String, String> connConfigPatch) {
ObjectMapper mapper = new ObjectMapper();
String content;
try {
content = mapper.writeValueAsString(connConfigPatch);
} catch (IOException e) {
throw new ConnectException("Could not serialize connector configuration and execute PUT request");
}
Response response = requestPatch(url, content);
if (response.getStatus() < Response.Status.BAD_REQUEST.getStatusCode()) {
return responseToString(response);
}
throw new ConnectRestException(response.getStatus(),
"Could not execute PATCH request. Error response: " + responseToString(response));
}
/**
* Delete an existing connector.
*

View File

@ -53,14 +53,14 @@ public class ClusterConfig {
private final Map<String, String> adminClientProperties;
private final Map<String, String> saslServerProperties;
private final Map<String, String> saslClientProperties;
private final Map<Integer, Map<String, String>> perBrokerOverrideProperties;
private final Map<Integer, Map<String, String>> perServerProperties;
@SuppressWarnings("checkstyle:ParameterNumber")
private ClusterConfig(Type type, int brokers, int controllers, int disksPerBroker, String name, boolean autoStart,
SecurityProtocol securityProtocol, String listenerName, File trustStoreFile,
MetadataVersion metadataVersion, Map<String, String> serverProperties, Map<String, String> producerProperties,
Map<String, String> consumerProperties, Map<String, String> adminClientProperties, Map<String, String> saslServerProperties,
Map<String, String> saslClientProperties, Map<Integer, Map<String, String>> perBrokerOverrideProperties) {
Map<String, String> saslClientProperties, Map<Integer, Map<String, String>> perServerProperties) {
// do fail fast. the following values are invalid for both zk and kraft modes.
if (brokers < 0) throw new IllegalArgumentException("Number of brokers must be greater or equal to zero.");
if (controllers < 0) throw new IllegalArgumentException("Number of controller must be greater or equal to zero.");
@ -82,7 +82,7 @@ public class ClusterConfig {
this.adminClientProperties = Objects.requireNonNull(adminClientProperties);
this.saslServerProperties = Objects.requireNonNull(saslServerProperties);
this.saslClientProperties = Objects.requireNonNull(saslClientProperties);
this.perBrokerOverrideProperties = Objects.requireNonNull(perBrokerOverrideProperties);
this.perServerProperties = Objects.requireNonNull(perServerProperties);
}
public Type clusterType() {
@ -149,8 +149,8 @@ public class ClusterConfig {
return metadataVersion;
}
public Map<Integer, Map<String, String>> perBrokerOverrideProperties() {
return perBrokerOverrideProperties;
public Map<Integer, Map<String, String>> perServerOverrideProperties() {
return perServerProperties;
}
public Map<String, String> nameTags() {
@ -195,7 +195,7 @@ public class ClusterConfig {
.setAdminClientProperties(clusterConfig.adminClientProperties)
.setSaslServerProperties(clusterConfig.saslServerProperties)
.setSaslClientProperties(clusterConfig.saslClientProperties)
.setPerBrokerProperties(clusterConfig.perBrokerOverrideProperties);
.setPerServerProperties(clusterConfig.perServerProperties);
}
public static class Builder {
@ -215,7 +215,7 @@ public class ClusterConfig {
private Map<String, String> adminClientProperties = Collections.emptyMap();
private Map<String, String> saslServerProperties = Collections.emptyMap();
private Map<String, String> saslClientProperties = Collections.emptyMap();
private Map<Integer, Map<String, String>> perBrokerOverrideProperties = Collections.emptyMap();
private Map<Integer, Map<String, String>> perServerProperties = Collections.emptyMap();
private Builder() {}
@ -299,9 +299,9 @@ public class ClusterConfig {
return this;
}
public Builder setPerBrokerProperties(Map<Integer, Map<String, String>> perBrokerOverrideProperties) {
this.perBrokerOverrideProperties = Collections.unmodifiableMap(
perBrokerOverrideProperties.entrySet().stream()
public Builder setPerServerProperties(Map<Integer, Map<String, String>> perServerProperties) {
this.perServerProperties = Collections.unmodifiableMap(
perServerProperties.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> Collections.unmodifiableMap(new HashMap<>(e.getValue())))));
return this;
}
@ -309,8 +309,7 @@ public class ClusterConfig {
public ClusterConfig build() {
return new ClusterConfig(type, brokers, controllers, disksPerBroker, name, autoStart, securityProtocol, listenerName,
trustStoreFile, metadataVersion, serverProperties, producerProperties, consumerProperties,
adminClientProperties, saslServerProperties, saslClientProperties,
perBrokerOverrideProperties);
adminClientProperties, saslServerProperties, saslClientProperties, perServerProperties);
}
}
}

View File

@ -62,7 +62,7 @@ public class ClusterConfigTest {
.setAdminClientProperties(Collections.singletonMap("admin_client", "admin_client_value"))
.setSaslClientProperties(Collections.singletonMap("sasl_client", "sasl_client_value"))
.setSaslServerProperties(Collections.singletonMap("sasl_server", "sasl_server_value"))
.setPerBrokerProperties(Collections.singletonMap(0, Collections.singletonMap("broker_0", "broker_0_value")))
.setPerServerProperties(Collections.singletonMap(0, Collections.singletonMap("broker_0", "broker_0_value")))
.build();
Map<String, Object> clusterConfigFields = fields(clusterConfig);

View File

@ -26,6 +26,9 @@ import kafka.test.annotation.ClusterTests;
import kafka.test.annotation.Type;
import kafka.test.junit.ClusterTestExtensions;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.clients.admin.DescribeLogDirsResult;
import org.apache.kafka.clients.consumer.GroupProtocol;
import org.apache.kafka.server.common.MetadataVersion;
@ -33,13 +36,13 @@ import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.extension.ExtendWith;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import static org.apache.kafka.clients.consumer.GroupProtocol.CLASSIC;
import static org.apache.kafka.clients.consumer.GroupProtocol.CONSUMER;
import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG;
@ -49,6 +52,7 @@ import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.NEW_GROU
@Disabled
@ClusterTestDefaults(clusterType = Type.ZK, serverProperties = {
@ClusterConfigProperty(key = "default.key", value = "default.value"),
@ClusterConfigProperty(id = 0, key = "queued.max.requests", value = "100"),
}) // Set defaults for a few params in @ClusterTest(s)
@ExtendWith(ClusterTestExtensions.class)
public class ClusterTestExtensionsTest {
@ -91,30 +95,59 @@ public class ClusterTestExtensionsTest {
@ClusterTests({
@ClusterTest(name = "cluster-tests-1", clusterType = Type.ZK, serverProperties = {
@ClusterConfigProperty(key = "foo", value = "bar"),
@ClusterConfigProperty(key = "spam", value = "eggs")
@ClusterConfigProperty(key = "spam", value = "eggs"),
@ClusterConfigProperty(id = 86400, key = "baz", value = "qux"), // this one will be ignored as there is no broker id is 86400
}),
@ClusterTest(name = "cluster-tests-2", clusterType = Type.KRAFT, serverProperties = {
@ClusterConfigProperty(key = "foo", value = "baz"),
@ClusterConfigProperty(key = "spam", value = "eggz"),
@ClusterConfigProperty(key = "default.key", value = "overwrite.value")
@ClusterConfigProperty(key = "default.key", value = "overwrite.value"),
@ClusterConfigProperty(id = 0, key = "queued.max.requests", value = "200"),
@ClusterConfigProperty(id = 3000, key = "queued.max.requests", value = "300")
}),
@ClusterTest(name = "cluster-tests-3", clusterType = Type.CO_KRAFT, serverProperties = {
@ClusterConfigProperty(key = "foo", value = "baz"),
@ClusterConfigProperty(key = "spam", value = "eggz"),
@ClusterConfigProperty(key = "default.key", value = "overwrite.value")
@ClusterConfigProperty(key = "default.key", value = "overwrite.value"),
@ClusterConfigProperty(id = 0, key = "queued.max.requests", value = "200")
})
})
public void testClusterTests() {
if (clusterInstance.clusterType().equals(ClusterInstance.ClusterType.ZK)) {
public void testClusterTests() throws ExecutionException, InterruptedException {
if (!clusterInstance.isKRaftTest()) {
Assertions.assertEquals("bar", clusterInstance.config().serverProperties().get("foo"));
Assertions.assertEquals("eggs", clusterInstance.config().serverProperties().get("spam"));
Assertions.assertEquals("default.value", clusterInstance.config().serverProperties().get("default.key"));
} else if (clusterInstance.clusterType().equals(ClusterInstance.ClusterType.RAFT)) {
// assert broker server 0 contains property queued.max.requests 100 from ClusterTestDefaults
try (Admin admin = clusterInstance.createAdminClient()) {
ConfigResource configResource = new ConfigResource(ConfigResource.Type.BROKER, "0");
Map<ConfigResource, Config> configs = admin.describeConfigs(Collections.singletonList(configResource)).all().get();
Assertions.assertEquals(1, configs.size());
Assertions.assertEquals("100", configs.get(configResource).get("queued.max.requests").value());
}
} else {
Assertions.assertEquals("baz", clusterInstance.config().serverProperties().get("foo"));
Assertions.assertEquals("eggz", clusterInstance.config().serverProperties().get("spam"));
Assertions.assertEquals("overwrite.value", clusterInstance.config().serverProperties().get("default.key"));
} else {
Assertions.fail("Unknown cluster type " + clusterInstance.clusterType());
// assert broker server 0 contains property queued.max.requests 200 from ClusterTest which overrides
// the value 100 in server property in ClusterTestDefaults
try (Admin admin = clusterInstance.createAdminClient()) {
ConfigResource configResource = new ConfigResource(ConfigResource.Type.BROKER, "0");
Map<ConfigResource, Config> configs = admin.describeConfigs(Collections.singletonList(configResource)).all().get();
Assertions.assertEquals(1, configs.size());
Assertions.assertEquals("200", configs.get(configResource).get("queued.max.requests").value());
}
// In KRaft cluster non-combined mode, assert the controller server 3000 contains the property queued.max.requests 300
if (clusterInstance.config().clusterType() == Type.KRAFT) {
try (Admin admin = Admin.create(Collections.singletonMap(
AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG, clusterInstance.bootstrapControllers()))) {
ConfigResource configResource = new ConfigResource(ConfigResource.Type.BROKER, "3000");
Map<ConfigResource, Config> configs = admin.describeConfigs(Collections.singletonList(configResource)).all().get();
Assertions.assertEquals(1, configs.size());
Assertions.assertEquals("300", configs.get(configResource).get("queued.max.requests").value());
}
}
}
}

View File

@ -27,6 +27,27 @@ import java.lang.annotation.Target;
@Target({ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface ClusterConfigProperty {
/**
* The config applies to the controller/broker with specified id. Default is -1, indicating the property applied to
* all controller/broker servers. Note that the "controller" here refers to the KRaft quorum controller.
* The id can vary depending on the different {@link kafka.test.annotation.Type}.
* <ul>
* <li> Under {@link kafka.test.annotation.Type#ZK}, the broker id starts from
* {@link kafka.testkit.TestKitNodes#BROKER_ID_OFFSET 0} and increases by 1
* with each additional broker, and there is no controller server under this mode. </li>
* <li> Under {@link kafka.test.annotation.Type#KRAFT}, the broker id starts from
* {@link kafka.testkit.TestKitNodes#BROKER_ID_OFFSET 0}, the controller id
* starts from {@link kafka.testkit.TestKitNodes#CONTROLLER_ID_OFFSET 3000}
* and increases by 1 with each addition broker/controller.</li>
* <li> Under {@link kafka.test.annotation.Type#CO_KRAFT}, the broker id and controller id both start from
* {@link kafka.testkit.TestKitNodes#BROKER_ID_OFFSET 0}
* and increases by 1 with each additional broker/controller.</li>
* </ul>
*
* If the id doesn't correspond to any broker/controller server, throw IllegalArgumentException
* @return the controller/broker id
*/
int id() default -1;
String key();
String value();
}

View File

@ -33,11 +33,13 @@ import org.junit.platform.commons.util.ReflectionUtils;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
@ -141,14 +143,15 @@ public class ClusterTestExtensions implements TestTemplateInvocationContextProvi
private void processClusterTest(ExtensionContext context, ClusterTest annot, ClusterTestDefaults defaults,
Consumer<TestTemplateInvocationContext> testInvocations) {
Type type = annot.clusterType() == Type.DEFAULT ? defaults.clusterType() : annot.clusterType();
Map<String, String> serverProperties = Stream.concat(Arrays.stream(defaults.serverProperties()), Arrays.stream(annot.serverProperties()))
.filter(e -> e.id() == -1)
.collect(Collectors.toMap(ClusterConfigProperty::key, ClusterConfigProperty::value, (a, b) -> b));
Map<Integer, Map<String, String>> perServerProperties = Stream.concat(Arrays.stream(defaults.serverProperties()), Arrays.stream(annot.serverProperties()))
.filter(e -> e.id() != -1)
.collect(Collectors.groupingBy(ClusterConfigProperty::id, Collectors.mapping(Function.identity(),
Collectors.toMap(ClusterConfigProperty::key, ClusterConfigProperty::value, (a, b) -> b))));
Map<String, String> serverProperties = new HashMap<>();
for (ClusterConfigProperty property : defaults.serverProperties()) {
serverProperties.put(property.key(), property.value());
}
for (ClusterConfigProperty property : annot.serverProperties()) {
serverProperties.put(property.key(), property.value());
}
ClusterConfig config = ClusterConfig.builder()
.setType(type)
.setBrokers(annot.brokers() == 0 ? defaults.brokers() : annot.brokers())
@ -158,6 +161,7 @@ public class ClusterTestExtensions implements TestTemplateInvocationContextProvi
.setName(annot.name().trim().isEmpty() ? null : annot.name())
.setListenerName(annot.listener().trim().isEmpty() ? null : annot.listener())
.setServerProperties(serverProperties)
.setPerServerProperties(perServerProperties)
.setSecurityProtocol(annot.securityProtocol())
.setMetadataVersion(annot.metadataVersion())
.build();

View File

@ -94,8 +94,8 @@ public class RaftClusterInvocationContext implements TestTemplateInvocationConte
setBootstrapMetadataVersion(clusterConfig.metadataVersion()).
setCombined(isCombined).
setNumBrokerNodes(clusterConfig.numBrokers()).
setPerServerProperties(clusterConfig.perServerOverrideProperties()).
setNumDisksPerBroker(clusterConfig.numDisksPerBroker()).
setPerBrokerProperties(clusterConfig.perBrokerOverrideProperties()).
setNumControllerNodes(clusterConfig.numControllers()).build();
KafkaClusterTestKit.Builder builder = new KafkaClusterTestKit.Builder(nodes);
@ -104,7 +104,7 @@ public class RaftClusterInvocationContext implements TestTemplateInvocationConte
builder.setConfigProp("zookeeper.connect", String.format("localhost:%d", zkReference.get().port()));
}
// Copy properties into the TestKit builder
clusterConfig.serverProperties().forEach((key, value) -> builder.setConfigProp(key.toString(), value.toString()));
clusterConfig.serverProperties().forEach(builder::setConfigProp);
// KAFKA-12512 need to pass security protocol and listener name here
KafkaClusterTestKit cluster = builder.build();
clusterReference.set(cluster);

View File

@ -301,7 +301,7 @@ public class ZkClusterInvocationContext implements TestTemplateInvocationContext
public void modifyConfigs(Seq<Properties> props) {
super.modifyConfigs(props);
for (int i = 0; i < props.length(); i++) {
props.apply(i).putAll(clusterConfig.perBrokerOverrideProperties().getOrDefault(i, Collections.emptyMap()));
props.apply(i).putAll(clusterConfig.perServerOverrideProperties().getOrDefault(i, Collections.emptyMap()));
}
}

View File

@ -87,10 +87,10 @@ public class BrokerNode implements TestKitNode {
Objects.requireNonNull(baseDirectory);
Objects.requireNonNull(clusterId);
if (id == -1) {
throw new RuntimeException("You must set the node id.");
throw new IllegalArgumentException("You must set the node id.");
}
if (numLogDirectories < 1) {
throw new RuntimeException("The value of numLogDirectories should be at least 1.");
throw new IllegalArgumentException("The value of numLogDirectories should be at least 1.");
}
List<String> logDataDirectories = IntStream
.range(0, numLogDirectories)

View File

@ -26,13 +26,13 @@ public class BrokerNodeTest {
@Test
public void testInvalidBuilder() {
Assertions.assertEquals("You must set the node id.",
Assertions.assertThrows(RuntimeException.class, () -> BrokerNode.builder()
Assertions.assertThrows(IllegalArgumentException.class, () -> BrokerNode.builder()
.setBaseDirectory("foo")
.setClusterId(Uuid.randomUuid())
.build()).getMessage());
Assertions.assertEquals("The value of numLogDirectories should be at least 1.",
Assertions.assertThrows(RuntimeException.class, () -> BrokerNode.builder()
Assertions.assertThrows(IllegalArgumentException.class, () -> BrokerNode.builder()
.setBaseDirectory("foo")
.setClusterId(Uuid.randomUuid())
.setId(0)

View File

@ -23,6 +23,9 @@ import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble;
import org.apache.kafka.metadata.properties.MetaPropertiesVersion;
import java.io.File;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
@ -36,6 +39,7 @@ public class ControllerNode implements TestKitNode {
private String baseDirectory;
private Uuid clusterId;
private boolean combined;
private Map<String, String> propertyOverrides = Collections.emptyMap();
private Builder() {}
@ -63,12 +67,17 @@ public class ControllerNode implements TestKitNode {
return this;
}
public Builder setPropertyOverrides(Map<String, String> propertyOverrides) {
this.propertyOverrides = Collections.unmodifiableMap(new HashMap<>(propertyOverrides));
return this;
}
public ControllerNode build() {
if (id == -1) {
throw new RuntimeException("You must set the node id.");
throw new IllegalArgumentException("You must set the node id.");
}
if (baseDirectory == null) {
throw new RuntimeException("You must set the base directory.");
throw new IllegalArgumentException("You must set the base directory.");
}
String metadataDirectory = new File(baseDirectory,
combined ? String.format("combined_%d_0", id) : String.format("controller_%d", id)).getAbsolutePath();
@ -81,7 +90,7 @@ public class ControllerNode implements TestKitNode {
setNodeId(id).
setDirectoryId(copier.generateValidDirectoryId()).
build());
return new ControllerNode(copier.copy(), combined);
return new ControllerNode(copier.copy(), combined, propertyOverrides);
}
}
@ -89,12 +98,16 @@ public class ControllerNode implements TestKitNode {
private final boolean combined;
private final Map<String, String> propertyOverrides;
private ControllerNode(
MetaPropertiesEnsemble initialMetaPropertiesEnsemble,
boolean combined
boolean combined,
Map<String, String> propertyOverrides
) {
this.initialMetaPropertiesEnsemble = Objects.requireNonNull(initialMetaPropertiesEnsemble);
this.combined = combined;
this.propertyOverrides = Objects.requireNonNull(propertyOverrides);
}
@Override
@ -106,4 +119,8 @@ public class ControllerNode implements TestKitNode {
public boolean combined() {
return combined;
}
public Map<String, String> propertyOverrides() {
return propertyOverrides;
}
}

View File

@ -209,6 +209,10 @@ public class KafkaClusterTestKit implements AutoCloseable {
if (brokerNode != null) {
props.putAll(brokerNode.propertyOverrides());
}
// Add associated controller node property overrides
if (controllerNode != null) {
props.putAll(controllerNode.propertyOverrides());
}
props.putIfAbsent(KafkaConfig$.MODULE$.UnstableMetadataVersionsEnableProp(), "true");
props.putIfAbsent(KafkaConfig$.MODULE$.UnstableApiVersionsEnableProp(), "true");
return new KafkaConfig(props, false, Option.empty());

View File

@ -23,7 +23,10 @@ import org.junit.jupiter.params.provider.ValueSource;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
@ -35,7 +38,7 @@ public class KafkaClusterTestKitTest {
@ParameterizedTest
@ValueSource(ints = {0, -1})
public void testCreateClusterWithBadNumDisksThrows(int disks) {
RuntimeException e = assertThrowsExactly(RuntimeException.class, () -> new KafkaClusterTestKit.Builder(
IllegalArgumentException e = assertThrowsExactly(IllegalArgumentException.class, () -> new KafkaClusterTestKit.Builder(
new TestKitNodes.Builder()
.setNumBrokerNodes(1)
.setNumDisksPerBroker(disks)
@ -47,7 +50,7 @@ public class KafkaClusterTestKitTest {
@Test
public void testCreateClusterWithBadNumOfControllers() {
RuntimeException e = assertThrowsExactly(RuntimeException.class, () -> new KafkaClusterTestKit.Builder(
IllegalArgumentException e = assertThrowsExactly(IllegalArgumentException.class, () -> new KafkaClusterTestKit.Builder(
new TestKitNodes.Builder()
.setNumBrokerNodes(1)
.setNumControllerNodes(-1)
@ -58,7 +61,7 @@ public class KafkaClusterTestKitTest {
@Test
public void testCreateClusterWithBadNumOfBrokers() {
RuntimeException e = assertThrowsExactly(RuntimeException.class, () -> new KafkaClusterTestKit.Builder(
IllegalArgumentException e = assertThrowsExactly(IllegalArgumentException.class, () -> new KafkaClusterTestKit.Builder(
new TestKitNodes.Builder()
.setNumBrokerNodes(-1)
.setNumControllerNodes(1)
@ -67,6 +70,22 @@ public class KafkaClusterTestKitTest {
assertEquals("Invalid negative value for numBrokerNodes", e.getMessage());
}
@Test
public void testCreateClusterWithBadPerServerProperties() {
Map<Integer, Map<String, String>> perServerProperties = new HashMap<>();
perServerProperties.put(100, Collections.singletonMap("foo", "foo1"));
perServerProperties.put(200, Collections.singletonMap("bar", "bar1"));
IllegalArgumentException e = assertThrowsExactly(IllegalArgumentException.class, () -> new KafkaClusterTestKit.Builder(
new TestKitNodes.Builder()
.setNumBrokerNodes(1)
.setNumControllerNodes(1)
.setPerServerProperties(perServerProperties)
.build())
);
assertEquals("Unknown server id 100, 200 in perServerProperties, the existent server ids are 0, 3000", e.getMessage());
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testCreateClusterAndCloseWithMultipleLogDirs(boolean combined) {

View File

@ -32,15 +32,19 @@ import java.util.SortedMap;
import java.util.TreeMap;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
public class TestKitNodes {
public static final int CONTROLLER_ID_OFFSET = 3000;
public static final int BROKER_ID_OFFSET = 0;
public static class Builder {
private boolean combined;
private Uuid clusterId;
private int numControllerNodes;
private int numBrokerNodes;
private int numDisksPerBroker = 1;
private Map<Integer, Map<String, String>> perBrokerProperties = Collections.emptyMap();
private Map<Integer, Map<String, String>> perServerProperties = Collections.emptyMap();
private BootstrapMetadata bootstrapMetadata = BootstrapMetadata.
fromVersion(MetadataVersion.latestTesting(), "testkit");
@ -79,22 +83,22 @@ public class TestKitNodes {
return this;
}
public Builder setPerBrokerProperties(Map<Integer, Map<String, String>> perBrokerProperties) {
this.perBrokerProperties = Collections.unmodifiableMap(
perBrokerProperties.entrySet().stream()
public Builder setPerServerProperties(Map<Integer, Map<String, String>> perServerProperties) {
this.perServerProperties = Collections.unmodifiableMap(
perServerProperties.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> Collections.unmodifiableMap(new HashMap<>(e.getValue())))));
return this;
}
public TestKitNodes build() {
if (numControllerNodes < 0) {
throw new RuntimeException("Invalid negative value for numControllerNodes");
throw new IllegalArgumentException("Invalid negative value for numControllerNodes");
}
if (numBrokerNodes < 0) {
throw new RuntimeException("Invalid negative value for numBrokerNodes");
throw new IllegalArgumentException("Invalid negative value for numBrokerNodes");
}
if (numDisksPerBroker <= 0) {
throw new RuntimeException("Invalid value for numDisksPerBroker");
throw new IllegalArgumentException("Invalid value for numDisksPerBroker");
}
String baseDirectory = TestUtils.tempDirectory().getAbsolutePath();
@ -102,13 +106,28 @@ public class TestKitNodes {
clusterId = Uuid.randomUuid();
}
List<Integer> controllerNodeIds = IntStream.range(startControllerId(), startControllerId() + numControllerNodes)
int controllerId = combined ? BROKER_ID_OFFSET : BROKER_ID_OFFSET + CONTROLLER_ID_OFFSET;
List<Integer> controllerNodeIds = IntStream.range(controllerId, controllerId + numControllerNodes)
.boxed()
.collect(Collectors.toList());
List<Integer> brokerNodeIds = IntStream.range(startBrokerId(), startBrokerId() + numBrokerNodes)
List<Integer> brokerNodeIds = IntStream.range(BROKER_ID_OFFSET, BROKER_ID_OFFSET + numBrokerNodes)
.boxed()
.collect(Collectors.toList());
String unknownIds = perServerProperties.keySet().stream()
.filter(id -> !controllerNodeIds.contains(id))
.filter(id -> !brokerNodeIds.contains(id))
.map(Object::toString)
.collect(Collectors.joining(", "));
if (!unknownIds.isEmpty()) {
throw new IllegalArgumentException(
String.format("Unknown server id %s in perServerProperties, the existent server ids are %s",
unknownIds,
Stream.concat(brokerNodeIds.stream(), controllerNodeIds.stream())
.map(Object::toString)
.collect(Collectors.joining(", "))));
}
TreeMap<Integer, ControllerNode> controllerNodes = new TreeMap<>();
for (int id : controllerNodeIds) {
ControllerNode controllerNode = ControllerNode.builder()
@ -116,6 +135,7 @@ public class TestKitNodes {
.setBaseDirectory(baseDirectory)
.setClusterId(clusterId)
.setCombined(brokerNodeIds.contains(id))
.setPropertyOverrides(perServerProperties.getOrDefault(id, Collections.emptyMap()))
.build();
controllerNodes.put(id, controllerNode);
}
@ -128,7 +148,7 @@ public class TestKitNodes {
.setBaseDirectory(baseDirectory)
.setClusterId(clusterId)
.setCombined(controllerNodeIds.contains(id))
.setPropertyOverrides(perBrokerProperties.getOrDefault(id, Collections.emptyMap()))
.setPropertyOverrides(perServerProperties.getOrDefault(id, Collections.emptyMap()))
.build();
brokerNodes.put(id, brokerNode);
}
@ -139,17 +159,6 @@ public class TestKitNodes {
controllerNodes,
brokerNodes);
}
private int startBrokerId() {
return 0;
}
private int startControllerId() {
if (combined) {
return startBrokerId();
}
return startBrokerId() + 3000;
}
}
private final String baseDirectory;

View File

@ -49,8 +49,8 @@ object ProducerIdsIntegrationTest {
.setBrokers(3)
.setAutoStart(false)
.setServerProperties(serverProperties)
.setPerBrokerProperties(perBrokerProperties)
.build());
.setPerServerProperties(perBrokerProperties)
.build())
}
}

View File

@ -402,7 +402,7 @@ class KRaftClusterTest {
val nodes = new TestKitNodes.Builder()
.setNumControllerNodes(1)
.setNumBrokerNodes(3)
.setPerBrokerProperties(brokerPropertyOverrides)
.setPerServerProperties(brokerPropertyOverrides)
.build()
doOnStartedKafkaCluster(nodes) { implicit cluster =>
@ -430,7 +430,7 @@ class KRaftClusterTest {
.setNumControllerNodes(1)
.setNumBrokerNodes(3)
.setNumDisksPerBroker(1)
.setPerBrokerProperties(brokerPropertyOverrides)
.setPerServerProperties(brokerPropertyOverrides)
.build()
doOnStartedKafkaCluster(nodes) { implicit cluster =>

View File

@ -291,6 +291,7 @@ predicates.IsBar.pattern=bar</code></pre>
<li><code>GET /connectors/{name}</code> - get information about a specific connector</li>
<li><code>GET /connectors/{name}/config</code> - get the configuration parameters for a specific connector</li>
<li><code>PUT /connectors/{name}/config</code> - update the configuration parameters for a specific connector</li>
<li><code>PATCH /connectors/{name}/config</code> - patch the configuration parameters for a specific connector, where <code>null</code> values in the JSON body indicates removing of the key from the final configuration</li>
<li><code>GET /connectors/{name}/status</code> - get current status of the connector, including if it is running, failed, paused, etc., which worker it is assigned to, error information if it has failed, and the state of all its tasks</li>
<li><code>GET /connectors/{name}/tasks</code> - get a list of tasks currently running for a connector along with their configurations</li>
<li><code>GET /connectors/{name}/tasks-config</code> - get the configuration of all tasks for a specific connector. This endpoint is deprecated and will be removed in the next major release. Please use the <code>GET /connectors/{name}/tasks</code> endpoint instead. Note that the response structures of the two endpoints differ slightly, please refer to the <a href="/{{version}}/generated/connect_rest.yaml">OpenAPI documentation</a> for more details</li>

View File

@ -2219,6 +2219,48 @@ bin/kafka-acls.sh --bootstrap-server localhost:9092 --command-config /tmp/adminc
<td>Group</td>
<td></td>
</tr>
<tr>
<td>CONSUMER_GROUP_DESCRIBE (69)</td>
<td>Read</td>
<td>Group</td>
<td></td>
</tr>
<tr>
<td>CONTROLLER_REGISTRATION (70)</td>
<td>ClusterAction</td>
<td>Cluster</td>
<td></td>
</tr>
<tr>
<td>GET_TELEMETRY_SUBSCRIPTIONS (71)</td>
<td></td>
<td></td>
<td>No authorization check is performed for this request.</td>
</tr>
<tr>
<td>PUSH_TELEMETRY (72)</td>
<td></td>
<td></td>
<td>No authorization check is performed for this request.</td>
</tr>
<tr>
<td>ASSIGN_REPLICAS_TO_DIRS (73)</td>
<td>ClusterAction</td>
<td>Cluster</td>
<td></td>
</tr>
<tr>
<td>LIST_CLIENT_METRICS_RESOURCES (74)</td>
<td>DescribeConfigs</td>
<td>Cluster</td>
<td></td>
</tr>
<tr>
<td>DESCRIBE_TOPIC_PARTITIONS (75)</td>
<td>Describe</td>
<td>Topic</td>
<td></td>
</tr>
</tbody>
</table>

View File

@ -75,8 +75,8 @@ settings.put(... , ...);</code></pre>
<li><a class="reference internal" href="#default-production-exception-handler" id="id24">default.production.exception.handler</a></li>
<li><a class="reference internal" href="#timestamp-extractor" id="id15">default.timestamp.extractor</a></li>
<li><a class="reference internal" href="#default-value-serde" id="id9">default.value.serde</a></li>
<li><a class="reference internal" href="#default-windowed-key-serde-inner" id="id32">default.windowed.key.serde.inner</a></li>
<li><a class="reference internal" href="#default-windowed-value-serde-inner" id="id33">default.windowed.value.serde.inner</a></li>
<li><a class="reference internal" href="#default-windowed-key-serde-inner" id="id32">default.windowed.key.serde.inner (deprecated) </a></li>
<li><a class="reference internal" href="#default-windowed-value-serde-inner" id="id33">default.windowed.value.serde.inner (deprecated) </a></li>
<li><a class="reference internal" href="#max-task-idle-ms" id="id28">max.task.idle.ms</a></li>
<li><a class="reference internal" href="#max-warmup-replicas" id="id29">max.warmup.replicas</a></li>
<li><a class="reference internal" href="#num-standby-replicas" id="id10">num.standby.replicas</a></li>
@ -91,6 +91,7 @@ settings.put(... , ...);</code></pre>
<li><a class="reference internal" href="#rocksdb-config-setter" id="id20">rocksdb.config.setter</a></li>
<li><a class="reference internal" href="#state-dir" id="id14">state.dir</a></li>
<li><a class="reference internal" href="#topology-optimization" id="id31">topology.optimization</a></li>
<li><a class="reference internal" href="#windowed-inner-class-serde" id="id38">windowed.inner.class.serde</a></li>
</ul>
</li>
<li><a class="reference internal" href="#kafka-consumers-and-producer-configuration-parameters" id="id16">Kafka consumers and producer configuration parameters</a>
@ -159,29 +160,29 @@ settings.put(... , ...);</code></pre>
<p>There are several Kafka and Kafka Streams configuration options that need to be configured explicitly for resiliency in face of broker failures:</p>
<table border="1" class="non-scrolling-table docutils">
<thead valign="bottom">
<tr class="row-odd"><th class="head">Parameter Name</th>
<tr class="row-even"><th class="head">Parameter Name</th>
<th class="head">Corresponding Client</th>
<th class="head">Default value</th>
<th class="head">Consider setting to</th>
</tr>
</thead>
<tbody valign="top">
<tr class="row-even"><td>acks</td>
<tr class="row-odd"><td>acks</td>
<td>Producer</td>
<td><code class="docutils literal"><span class="pre">acks=1</span></code></td>
<td><code class="docutils literal"><span class="pre">acks=all</span></code></td>
</tr>
<tr class="row-odd"><td>replication.factor (for broker version 2.3 or older)/td>
<tr class="row-even"><td>replication.factor (for broker version 2.3 or older)/td>
<td>Streams</td>
<td><code class="docutils literal"><span class="pre">-1</span></code></td>
<td><code class="docutils literal"><span class="pre">3</span></code></td>
</tr>
<tr class="row-even"><td>min.insync.replicas</td>
<tr class="row-odd"><td>min.insync.replicas</td>
<td>Broker</td>
<td><code class="docutils literal"><span class="pre">1</span></code></td>
<td><code class="docutils literal"><span class="pre">2</span></code></td>
</tr>
<tr class="row-odd"><td>num.standby.replicas</td>
<tr class="row-even"><td>num.standby.replicas</td>
<td>Streams</td>
<td><code class="docutils literal"><span class="pre">0</span></code></td>
<td><code class="docutils literal"><span class="pre">1</span></code></td>
@ -241,24 +242,29 @@ streamsSettings.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);</code></pre>
</tr>
</thead>
<tbody valign="top">
<tr class="row-odd"><td>acceptable.recovery.lag</td>
<tr class="row-even"><td>acceptable.recovery.lag</td>
<td>Medium</td>
<td colspan="2">The maximum acceptable lag (number of offsets to catch up) for an instance to be considered caught-up and ready for the active task.</td>
<td><code class="docutils literal"><span class="pre">10000</span></code></td>
</tr>
<tr class="row-even"><td>application.server</td>
<tr class="row-odd"><td>application.server</td>
<td>Low</td>
<td colspan="2">A host:port pair pointing to an embedded user defined endpoint that can be used for discovering the locations of
state stores within a single Kafka Streams application. The value of this must be different for each instance
of the application.</td>
<td>the empty string</td>
</tr>
<tr class="row-odd"><td>buffered.records.per.partition</td>
<tr class="row-even"><td>buffered.records.per.partition</td>
<td>Low</td>
<td colspan="2">The maximum number of records to buffer per partition.</td>
<td><code class="docutils literal"><span class="pre">1000</span></code></td>
</tr>
<tr class="row-even"><td>cache.max.bytes.buffering</td>
<tr class="row-odd"><td>statestore.cache.max.bytes</td>
<td>Medium</td>
<td colspan="2">Maximum number of memory bytes to be used for record caches across all threads.</td>
<td>10485760</td>
</tr>
<tr class="row-even"><td>cache.max.bytes.buffering (Deprecated. Use statestore.cache.max.bytes instead.)</td>
<td>Medium</td>
<td colspan="2">Maximum number of memory bytes to be used for record caches across all threads.</td>
<td>10485760 bytes</td>
@ -301,12 +307,12 @@ streamsSettings.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);</code></pre>
set by the user or all serdes must be passed in explicitly (see also default.key.serde).</td>
<td><code class="docutils literal"><span class="pre">null</span></code></td>
</tr>
<tr class="row-even"><td>default.windowed.key.serde.inner</td>
<tr class="row-even"><td>default.windowed.key.serde.inner (Deprecated. Use windowed.inner.class.serde instead.)</td>
<td>Medium</td>
<td colspan="2">Default serializer/deserializer for the inner class of windowed keys, implementing the <code class="docutils literal"><span class="pre">Serde</span></code> interface.</td>
<td><code class="docutils literal"><span class="pre">null</span></code></td>
</tr>
<tr class="row-odd"><td>default.windowed.value.serde.inner</td>
<tr class="row-odd"><td>default.windowed.value.serde.inner (Deprecated. Use windowed.inner.class.serde instead.)</td>
<td>Medium</td>
<td colspan="2">Default serializer/deserializer for the inner class of windowed values, implementing the <code class="docutils literal"><span class="pre">Serde</span></code> interface.</td>
<td><code class="docutils literal"><span class="pre">null</span></code></td>
@ -327,8 +333,9 @@ streamsSettings.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);</code></pre>
the <code>org.apache.kafka.streams.state.DslStoreSuppliers</code> interface.
</td>
<td><code>BuiltInDslStoreSuppliers.RocksDBDslStoreSuppliers</code></td>
<td>null</td>
</tr>
<tr class="row-even"><td>max.task.idle.ms</td>
<tr class="row-odd"><td>max.task.idle.ms</td>
<td>Medium</td>
<td colspan="2">
<p>
@ -347,37 +354,37 @@ streamsSettings.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);</code></pre>
</td>
<td>0 milliseconds</td>
</tr>
<tr class="row-odd"><td>max.warmup.replicas</td>
<tr class="row-even"><td>max.warmup.replicas</td>
<td>Medium</td>
<td colspan="2">The maximum number of warmup replicas (extra standbys beyond the configured num.standbys) that can be assigned at once.</td>
<td><code class="docutils literal"><span class="pre">2</span></code></td>
</tr>
<tr class="row-even"><td>metric.reporters</td>
<tr class="row-odd"><td>metric.reporters</td>
<td>Low</td>
<td colspan="2">A list of classes to use as metrics reporters.</td>
<td>the empty list</td>
</tr>
<tr class="row-odd"><td>metrics.num.samples</td>
<tr class="row-even"><td>metrics.num.samples</td>
<td>Low</td>
<td colspan="2">The number of samples maintained to compute metrics.</td>
<td><code class="docutils literal"><span class="pre">2</span></code></td>
</tr>
<tr class="row-even"><td>metrics.recording.level</td>
<tr class="row-odd"><td>metrics.recording.level</td>
<td>Low</td>
<td colspan="2">The highest recording level for metrics.</td>
<td><code class="docutils literal"><span class="pre">INFO</span></code></td>
</tr>
<tr class="row-odd"><td>metrics.sample.window.ms</td>
<tr class="row-even"><td>metrics.sample.window.ms</td>
<td>Low</td>
<td colspan="2">The window of time in milliseconds a metrics sample is computed over.</td>
<td>30000 milliseconds (30 seconds)</td>
</tr>
<tr class="row-even"><td>num.standby.replicas</td>
<tr class="row-odd"><td>num.standby.replicas</td>
<td>High</td>
<td colspan="2">The number of standby replicas for each task.</td>
<td><code class="docutils literal"><span class="pre">0</span></code></td>
</tr>
<tr class="row-odd"><td>num.stream.threads</td>
<tr class="row-even"><td>num.stream.threads</td>
<td>Medium</td>
<td colspan="2">The number of threads to execute stream processing.</td>
<td><code class="docutils literal"><span class="pre">1</span></code></td>
@ -406,7 +413,7 @@ streamsSettings.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);</code></pre>
clients with different tag values.</td>
<td>the empty list</td>
</tr>
<tr class="row-even"><td>replication.factor</td>
<tr class="row-odd"><td>replication.factor</td>
<td>Medium</td>
<td colspan="2">The replication factor for changelog topics and repartition topics created by the application.
The default of <code>-1</code> (meaning: use broker default replication factor) requires broker version 2.4 or newer.</td>
@ -432,22 +439,22 @@ streamsSettings.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);</code></pre>
<td colspan="2">Directory location for state stores.</td>
<td><code class="docutils literal"><span class="pre">/${java.io.tmpdir}/kafka-streams</span></code></td>
</tr>
<tr class="row-odd"><td>task.timeout.ms</td>
<tr class="row-even"><td>task.timeout.ms</td>
<td>Medium</td>
<td colspan="2">The maximum amount of time in milliseconds a task might stall due to internal errors and retries until an error is raised. For a timeout of <code>0 ms</code>, a task would raise an error for the first internal error. For any timeout larger than <code>0 ms</code>, a task will retry at least once before an error is raised.</td>
<td>300000 milliseconds (5 minutes)</td>
</tr>
<tr class="row-even"><td>topology.optimization</td>
<tr class="row-odd"><td>topology.optimization</td>
<td>Medium</td>
<td colspan="2">A configuration telling Kafka Streams if it should optimize the topology and what optimizations to apply. Acceptable values are: <code>StreamsConfig.NO_OPTIMIZATION</code> (<code>none</code>), <code>StreamsConfig.OPTIMIZE</code> (<code>all</code>) or a comma separated list of specific optimizations: (<code>StreamsConfig.REUSE_KTABLE_SOURCE_TOPICS</code> (<code>reuse.ktable.source.topics</code>), <code>StreamsConfig.MERGE_REPARTITION_TOPICS</code> (<code>merge.repartition.topics</code>)). </td>
<td><code>NO_OPTIMIZATION</code></td>
<td colspan="2">A configuration telling Kafka Streams if it should optimize the topology and what optimizations to apply. Acceptable values are: <code>StreamsConfig.NO_OPTIMIZATION</code> (<code>none</code>), <code>StreamsConfig.OPTIMIZE</code> (<code>all</code>) or a comma separated list of specific optimizations: (<code>StreamsConfig.REUSE_KTABLE_SOURCE_TOPICS</code> (<code>reuse.ktable.source.topics</code>), <code>StreamsConfig.MERGE_REPARTITION_TOPICS</code> (<code>merge.repartition.topics</code>)).</td>
<td><code> NO_OPTIMIZATION</code></td>
</tr>
<tr class="row-odd"><td>upgrade.from</td>
<tr class="row-even"><td>upgrade.from</td>
<td>Medium</td>
<td colspan="2">The version you are upgrading from during a rolling upgrade.</td>
<td>See <a class="reference internal" href="#streams-developer-guide-upgrade-from"><span class="std std-ref">Upgrade From</span></a></td>
</tr>
<tr class="row-even"><td>windowstore.changelog.additional.retention.ms</td>
<tr class="row-odd"><td>windowstore.changelog.additional.retention.ms</td>
<td>Low</td>
<td colspan="2">Added to a windows maintainMs to ensure data is not deleted from the log prematurely. Allows for clock drift.</td>
<td>86400000 milliseconds (1 day)</td>
@ -676,7 +683,7 @@ streamsConfiguration.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
</div></blockquote>
</div>
<div class="section" id="default-windowed-key-serde-inner">
<h4><a class="toc-backref" href="#id32">default.windowed.key.serde.inner</a><a class="headerlink" href="#default-windowed-key-serde-inner" title="Permalink to this headline"></a></h4>
<h4><a class="toc-backref" href="#id32">default.windowed.key.serde.inner</a><a class="headerlink" href="#default-windowed-key-serde-inner" title="Permalink to this headline"></a> (Deprecated.)</h4>
<blockquote>
<div><p>The default Serializer/Deserializer class for the inner class of windowed keys. Serialization and deserialization in Kafka Streams happens
whenever data needs to be materialized, for example:</p>
@ -689,7 +696,7 @@ streamsConfiguration.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
</div></blockquote>
</div>
<div class="section" id="default-windowed-value-serde-inner">
<h4><a class="toc-backref" href="#id33">default.windowed.value.serde.inner</a><a class="headerlink" href="#default-windowed-value-serde-inner" title="Permalink to this headline"></a></h4>
<h4><a class="toc-backref" href="#id33">default.windowed.value.serde.inner</a><a class="headerlink" href="#default-windowed-value-serde-inner" title="Permalink to this headline"></a>(Deprecated.)</h4>
<blockquote>
<div><p>The default Serializer/Deserializer class for the inner class of windowed values. Serialization and deserialization in Kafka Streams happens
happens whenever data needs to be materialized, for example:</p>
@ -1029,6 +1036,18 @@ streamsConfig.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, CustomRocksD
</p>
</div></blockquote>
</div>
<div class="section" id="windowed.inner.class.serde">
<h4><a class="toc-backref" href="#id31">windowed.inner.class.serde</a><a class="headerlink" href="#windowed.inner.class.serde" title="Permalink to this headline"></a></h4>
<blockquote>
<div>
<p>
Serde for the inner class of a windowed record. Must implement the org.apache.kafka.common.serialization.Serde interface.
</p>
<p>
Note that this config is only used by plain consumer/producer clients that set a windowed de/serializer type via configs. For Kafka Streams applications that deal with windowed types, you must pass in the inner serde type when you instantiate the windowed serde object for your topology.
</p>
</div></blockquote>
</div>
<div class="section" id="upgrade-from">
<span id="streams-developer-guide-upgrade-from"></span><h4><a class="toc-backref" href="#id14">upgrade.from</a><a class="headerlink" href="#upgrade-from" title="Permalink to this headline"></a></h4>
<blockquote>
@ -1120,7 +1139,7 @@ streamsSettings.put(StreamsConfig.topicPrefix("PARAMETER_NAME"), "topic-value");
<td>Consumer</td>
<td><code class="docutils literal"><span class="pre">1000</span></code></td>
</tr>
<tr class="row-even">
<tr class="row-odd">
<td>client.id</td>
<td>-</td>
<td><code class="docutils literal"><span class="pre">&lt;application.id&gt;-&lt;random-UUID&gt;</span></code></td>

View File

@ -437,7 +437,7 @@ public class StreamsConfig extends AbstractConfig {
* If you enable this feature Kafka Streams will use more resources (like broker connections)
* compared to {@link #AT_LEAST_ONCE "at_least_once"} and {@link #EXACTLY_ONCE_V2 "exactly_once_v2"}.
*
* @deprecated Since 3.0.0, will be removed in 4.0. Use {@link #EXACTLY_ONCE_V2 "exactly_once_v2"} instead.
* @deprecated since 3.0.0, will be removed in 4.0. Use {@link #EXACTLY_ONCE_V2 "exactly_once_v2"} instead.
*/
@SuppressWarnings("WeakerAccess")
@Deprecated
@ -450,7 +450,7 @@ public class StreamsConfig extends AbstractConfig {
* If you enable this feature Kafka Streams will use fewer resources (like broker connections)
* compared to the {@link #EXACTLY_ONCE} (deprecated) case.
*
* @deprecated Since 3.0.0, will be removed in 4.0. Use {@link #EXACTLY_ONCE_V2 "exactly_once_v2"} instead.
* @deprecated since 3.0.0, will be removed in 4.0. Use {@link #EXACTLY_ONCE_V2 "exactly_once_v2"} instead.
*/
@SuppressWarnings("WeakerAccess")
@Deprecated
@ -499,7 +499,8 @@ public class StreamsConfig extends AbstractConfig {
public static final String BUILT_IN_METRICS_VERSION_CONFIG = "built.in.metrics.version";
private static final String BUILT_IN_METRICS_VERSION_DOC = "Version of the built-in metrics to use.";
/** {@code cache.max.bytes.buffering} */
/** {@code cache.max.bytes.buffering}
* @deprecated since 3.4.0 Use {@link #STATESTORE_CACHE_MAX_BYTES_CONFIG "statestore.cache.max.bytes"} instead. */
@SuppressWarnings("WeakerAccess")
@Deprecated
public static final String CACHE_MAX_BYTES_BUFFERING_CONFIG = "cache.max.bytes.buffering";
@ -571,14 +572,16 @@ public class StreamsConfig extends AbstractConfig {
static final String DSL_STORE_SUPPLIERS_CLASS_DOC = "Defines which store implementations to plug in to DSL operators. Must implement the <code>org.apache.kafka.streams.state.DslStoreSuppliers</code> interface.";
static final Class<?> DSL_STORE_SUPPLIERS_CLASS_DEFAULT = BuiltInDslStoreSuppliers.RocksDBDslStoreSuppliers.class;
/** {@code default.windowed.key.serde.inner} */
/** {@code default.windowed.key.serde.inner
* @deprecated since 3.0.0 Use {@link #WINDOWED_INNER_CLASS_SERDE "windowed.inner.class.serde"} instead.} */
@SuppressWarnings("WeakerAccess")
@Deprecated
public static final String DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS = "default.windowed.key.serde.inner";
private static final String DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS_DOC = "Default serializer / deserializer for the inner class of a windowed key. Must implement the " +
"<code>org.apache.kafka.common.serialization.Serde</code> interface.";
/** {@code default.windowed.value.serde.inner} */
/** {@code default.windowed.value.serde.inner
* @deprecated since 3.0.0 Use {@link #WINDOWED_INNER_CLASS_SERDE "windowed.inner.class.serde"} instead.} */
@SuppressWarnings("WeakerAccess")
@Deprecated
public static final String DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS = "default.windowed.value.serde.inner";
@ -589,7 +592,7 @@ public class StreamsConfig extends AbstractConfig {
private static final String WINDOWED_INNER_CLASS_SERDE_DOC = " Default serializer / deserializer for the inner class of a windowed record. Must implement the " +
"<code>org.apache.kafka.common.serialization.Serde</code> interface. Note that setting this config in KafkaStreams application would result " +
"in an error as it is meant to be used only from Plain consumer client.";
/** {@code default key.serde} */
@SuppressWarnings("WeakerAccess")
public static final String DEFAULT_KEY_SERDE_CLASS_CONFIG = "default.key.serde";
@ -654,7 +657,8 @@ public class StreamsConfig extends AbstractConfig {
@SuppressWarnings("WeakerAccess")
public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG;
/** {@code auto.include.jmx.reporter} */
/** {@code auto.include.jmx.reporter
* @deprecated and will removed in 4.0.0 Use {@link JMX_REPORTER "jmx.reporter"} instead.} */
@Deprecated
public static final String AUTO_INCLUDE_JMX_REPORTER_CONFIG = CommonClientConfigs.AUTO_INCLUDE_JMX_REPORTER_CONFIG;

View File

@ -17,13 +17,14 @@
package org.apache.kafka.streams.processor.api;
import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
import org.apache.kafka.streams.processor.Punctuator;
public interface RecordMetadata {
/**
* Return the topic name of the current input record; could be {@code null} if it is not
* available.
*
* <p> For example, if this method is invoked within a @link Punctuator#punctuate(long)
* <p> For example, if this method is invoked within a {@link Punctuator#punctuate(long)
* punctuation callback}, or while processing a record that was forwarded by a punctuation
* callback, the record won't have an associated topic.
* Another example is
@ -39,7 +40,7 @@ public interface RecordMetadata {
* Return the partition id of the current input record; could be {@code -1} if it is not
* available.
*
* <p> For example, if this method is invoked within a @link Punctuator#punctuate(long)
* <p> For example, if this method is invoked within a {@link Punctuator#punctuate(long)
* punctuation callback}, or while processing a record that was forwarded by a punctuation
* callback, the record won't have an associated partition id.
* Another example is
@ -55,7 +56,7 @@ public interface RecordMetadata {
* Return the offset of the current input record; could be {@code -1} if it is not
* available.
*
* <p> For example, if this method is invoked within a @link Punctuator#punctuate(long)
* <p> For example, if this method is invoked within a {@link Punctuator#punctuate(long)
* punctuation callback}, or while processing a record that was forwarded by a punctuation
* callback, the record won't have an associated offset.
* Another example is

View File

@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.processor.assignment;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
import org.apache.kafka.streams.errors.TaskAssignmentException;
/**
* A read-only metadata class representing the state of the application and the current rebalance.
* This class wraps all the input parameters to the task assignment, including the current state
* of each KafkaStreams client with at least one StreamThread participating in this rebalance, the
* assignment-related configs, and the tasks to be assigned.
*/
public interface ApplicationState {
/**
* @param computeTaskLags whether to include task lag information in the returned metadata. Note that passing
* in "true" will result in a remote call to fetch changelog topic end offsets, and you should pass in "false" unless
* you specifically need the task lag information.
*
* @return a map from the {@code processId} to {@link KafkaStreamsState} for all KafkaStreams clients in this app
*
* @throws TaskAssignmentException if a retriable error occurs while computing KafkaStreamsState metadata. Re-throw
* this exception to have Kafka Streams retry the rebalance by returning the same
* assignment and scheduling an immediate followup rebalance
*/
Map<ProcessId, KafkaStreamsState> kafkaStreamsStates(boolean computeTaskLags);
/**
* @return a simple container class with the Streams configs relevant to assignment
*/
AssignmentConfigs assignmentConfigs();
/**
* @return the set of all tasks in this topology which must be assigned
*/
Set<TaskId> allTasks();
/**
*
* @return the set of stateful and changelogged tasks in this topology
*/
Set<TaskId> statefulTasks();
/**
*
* @return the set of stateless or changelog-less tasks in this topology
*/
Set<TaskId> statelessTasks();
}

View File

@ -0,0 +1,145 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.processor.assignment;
import java.util.List;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.streams.StreamsConfig;
/**
* Assignment related configs for the Kafka Streams {@link TaskAssignor}.
*/
public class AssignmentConfigs {
private final long acceptableRecoveryLag;
private final int maxWarmupReplicas;
private final int numStandbyReplicas;
private final long probingRebalanceIntervalMs;
private final List<String> rackAwareAssignmentTags;
private final int rackAwareTrafficCost;
private final int rackAwareNonOverlapCost;
private final String rackAwareAssignmentStrategy;
public AssignmentConfigs(final StreamsConfig configs) {
this(
configs.getLong(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG),
configs.getInt(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG),
configs.getInt(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG),
configs.getLong(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG),
configs.getList(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG),
configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG),
configs.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG),
configs.getString(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG)
);
}
public AssignmentConfigs(final long acceptableRecoveryLag,
final int maxWarmupReplicas,
final int numStandbyReplicas,
final long probingRebalanceIntervalMs,
final List<String> rackAwareAssignmentTags,
final int rackAwareTrafficCost,
final int rackAwareNonOverlapCost,
final String rackAwareAssignmentStrategy
) {
this.acceptableRecoveryLag = validated(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, acceptableRecoveryLag);
this.maxWarmupReplicas = validated(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, maxWarmupReplicas);
this.numStandbyReplicas = validated(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, numStandbyReplicas);
this.probingRebalanceIntervalMs = validated(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG, probingRebalanceIntervalMs);
this.rackAwareAssignmentTags = validated(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG, rackAwareAssignmentTags);
this.rackAwareTrafficCost = validated(StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG,
rackAwareTrafficCost
);
this.rackAwareNonOverlapCost = validated(StreamsConfig.RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG,
rackAwareNonOverlapCost
);
this.rackAwareAssignmentStrategy = validated(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG,
rackAwareAssignmentStrategy
);
}
/**
* The configured acceptable recovery lag according to
* {@link StreamsConfig#ACCEPTABLE_RECOVERY_LAG_CONFIG}
*/
public long acceptableRecoveryLag() {
return acceptableRecoveryLag;
}
/**
* The maximum warmup replicas as configured via
* {@link StreamsConfig#MAX_WARMUP_REPLICAS_CONFIG}
*/
public int maxWarmupReplicas() {
return maxWarmupReplicas;
}
/**
* The number of standby replicas as configured via
* {@link StreamsConfig#NUM_STANDBY_REPLICAS_CONFIG}
*/
public int numStandbyReplicas() {
return numStandbyReplicas;
}
/**
* The probing rebalance interval in milliseconds as configured via
* {@link StreamsConfig#PROBING_REBALANCE_INTERVAL_MS_CONFIG}
*/
public long probingRebalanceIntervalMs() {
return probingRebalanceIntervalMs;
}
/**
* The rack-aware assignment tags as configured via
* {@link StreamsConfig#RACK_AWARE_ASSIGNMENT_TAGS_CONFIG}
*/
public List<String> rackAwareAssignmentTags() {
return rackAwareAssignmentTags;
}
/**
* The rack-aware assignment traffic cost as configured via
* {@link StreamsConfig#RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG}
*/
public int rackAwareTrafficCost() {
return rackAwareTrafficCost;
}
/**
* The rack-aware assignment non-overlap cost as configured via
* {@link StreamsConfig#RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG}
*/
public int rackAwareNonOverlapCost() {
return rackAwareNonOverlapCost;
}
/**
* The rack-aware assignment strategy as configured via
* {@link StreamsConfig#RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG}
*/
public String rackAwareAssignmentStrategy() {
return rackAwareAssignmentStrategy;
}
private static <T> T validated(final String configKey, final T value) {
final ConfigDef.Validator validator = StreamsConfig.configDef().configKeys().get(configKey).validator;
if (validator != null) {
validator.ensureValid(configKey, value);
}
return value;
}
}

View File

@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.processor.assignment;
import java.time.Instant;
import java.util.Set;
import org.apache.kafka.streams.processor.TaskId;
/**
* A simple interface for the assignor to return the desired placement of active and standby tasks on
* KafkaStreams clients.
*/
public interface KafkaStreamsAssignment {
/**
*
* @return the {@code ProcessID} associated with this {@code KafkaStreamsAssignment}
*/
ProcessId processId();
/**
*
* @return a set of assigned tasks that are part of this {@code KafkaStreamsAssignment}
*/
Set<AssignedTask> assignment();
/**
* @return the followup rebalance deadline in epoch time, after which this KafkaStreams
* client will trigger a new rebalance
*/
Instant followupRebalanceDeadline();
class AssignedTask {
private final TaskId id;
private final Type taskType;
public AssignedTask(final TaskId id, final Type taskType) {
this.id = id;
this.taskType = taskType;
}
public enum Type {
ACTIVE,
STANDBY
}
/**
*
* @return the id of the {@code AssignedTask}
*/
public TaskId id() {
return id;
}
/**
*
* @return the type of the {@code AssignedTask}
*/
public Type type() {
return taskType;
}
}
}

View File

@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.processor.assignment;
import java.util.Map;
import java.util.Optional;
import java.util.SortedSet;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.state.HostInfo;
/**
* A read-only metadata class representing the current state of each KafkaStreams client with at least one StreamThread participating in this rebalance
*/
public interface KafkaStreamsState {
/**
* @return the processId of the application instance running on this KafkaStreams client
*/
ProcessId processId();
/**
* Returns the number of processing threads available to work on tasks for this KafkaStreams client,
* which represents its overall capacity for work relative to other KafkaStreams clients.
*
* @return the number of processing threads on this KafkaStreams client
*/
int numProcessingThreads();
/**
* @return the set of consumer client ids for this KafkaStreams client
*/
SortedSet<String> consumerClientIds();
/**
* @return the set of all active tasks owned by consumers on this KafkaStreams client since the previous rebalance
*/
SortedSet<TaskId> previousActiveTasks();
/**
* @return the set of all standby tasks owned by consumers on this KafkaStreams client since the previous rebalance
*/
SortedSet<TaskId> previousStandbyTasks();
/**
* Returns the total lag across all logged stores in the task. Equal to the end offset sum if this client
* did not have any state for this task on disk.
*
* @return end offset sum - offset sum
* Task.LATEST_OFFSET if this was previously an active running task on this client
*
* @throws UnsupportedOperationException if the user did not request task lags be computed.
*/
long lagFor(final TaskId task);
/**
* @return the previous tasks assigned to this consumer ordered by lag, filtered for any tasks that don't exist in this assignment
*
* @throws UnsupportedOperationException if the user did not request task lags be computed.
*/
SortedSet<TaskId> prevTasksByLag(final String consumerClientId);
/**
* Returns a collection containing all (and only) stateful tasks in the topology by {@link TaskId},
* mapped to its "offset lag sum". This is computed as the difference between the changelog end offset
* and the current offset, summed across all logged state stores in the task.
*
* @return a map from all stateful tasks to their lag sum
*
* @throws UnsupportedOperationException if the user did not request task lags be computed.
*/
Map<TaskId, Long> statefulTasksToLagSums();
/**
* The {@link HostInfo} of this KafkaStreams client, if set via the
* {@link org.apache.kafka.streams.StreamsConfig#APPLICATION_SERVER_CONFIG application.server} config
*
* @return the host info for this KafkaStreams client if configured, else {@code Optional.empty()}
*/
Optional<HostInfo> hostInfo();
/**
* The client tags for this KafkaStreams client, if set any have been via configs using the
* {@link org.apache.kafka.streams.StreamsConfig#clientTagPrefix}
* <p>
* Can be used however you want, or passed in to enable the rack-aware standby task assignor.
*
* @return all the client tags found in this KafkaStreams client's {@link org.apache.kafka.streams.StreamsConfig}
*/
Map<String, String> clientTags();
}

View File

@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.processor.assignment;
import org.apache.kafka.common.protocol.types.Field.UUID;
/** A simple wrapper around UUID that abstracts a Process ID */
public class ProcessId {
private final UUID id;
public ProcessId(final UUID id) {
this.id = id;
}
/**
*
* @return the underlying {@code UUID} that this ProcessID is wrapping.
*/
public UUID id() {
return id;
}
}

View File

@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.processor.assignment;
import java.util.Map;
import java.util.SortedSet;
import org.apache.kafka.streams.processor.TaskId;
/**
* A set of utilities to help implement task assignment via the {@link TaskAssignor}
*/
public final class TaskAssignmentUtils {
/**
* Assign standby tasks to KafkaStreams clients according to the default logic.
* <p>
* If rack-aware client tags are configured, the rack-aware standby task assignor will be used
*
* @param applicationState the metadata and other info describing the current application state
* @param kafkaStreamsAssignments the current assignment of tasks to KafkaStreams clients
*
* @return a new map containing the mappings from KafkaStreamsAssignments updated with the default
* standby assignment
*/
public static Map<ProcessId, KafkaStreamsAssignment> defaultStandbyTaskAssignment(
final ApplicationState applicationState,
final Map<ProcessId, KafkaStreamsAssignment> kafkaStreamsAssignments
) {
throw new UnsupportedOperationException("Not Implemented.");
}
/**
* Optimize the active task assignment for rack-awareness
*
* @param applicationState the metadata and other info describing the current application state
* @param kafkaStreamsAssignments the current assignment of tasks to KafkaStreams clients
* @param tasks the set of tasks to reassign if possible. Must already be assigned
* to a KafkaStreams client
*
* @return a new map containing the mappings from KafkaStreamsAssignments updated with the default
* rack-aware assignment for active tasks
*/
public static Map<ProcessId, KafkaStreamsAssignment> optimizeRackAwareActiveTasks(
final ApplicationState applicationState,
final Map<ProcessId, KafkaStreamsAssignment> kafkaStreamsAssignments,
final SortedSet<TaskId> tasks
) {
throw new UnsupportedOperationException("Not Implemented.");
}
/**
* Optimize the standby task assignment for rack-awareness
*
* @param kafkaStreamsAssignments the current assignment of tasks to KafkaStreams clients
* @param applicationState the metadata and other info describing the current application state
*
* @return a new map containing the mappings from KafkaStreamsAssignments updated with the default
* rack-aware assignment for standby tasks
*/
public static Map<ProcessId, KafkaStreamsAssignment> optimizeRackAwareStandbyTasks(
final ApplicationState applicationState,
final Map<ProcessId, KafkaStreamsAssignment> kafkaStreamsAssignments
) {
throw new UnsupportedOperationException("Not Implemented.");
}
/**
* Return a "no-op" assignment that just copies the previous assignment of tasks to KafkaStreams clients
*
* @param applicationState the metadata and other info describing the current application state
*
* @return a new map containing an assignment that replicates exactly the previous assignment reported
* in the applicationState
*/
public static Map<ProcessId, KafkaStreamsAssignment> identityAssignment(
final ApplicationState applicationState
) {
throw new UnsupportedOperationException("Not Implemented.");
}
}

View File

@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.processor.assignment;
import java.util.Collection;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.GroupAssignment;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.GroupSubscription;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.apache.kafka.streams.errors.TaskAssignmentException;
/**
* A TaskAssignor is responsible for creating a TaskAssignment from a given
* {@code ApplicationState}.
* The implementation may also override the {@code onAssignmentComputed} callback for insight into
* the result of the assignment result.
*/
public interface TaskAssignor extends Configurable {
/**
* NONE: no error detected
* ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES: multiple KafkaStreams clients assigned with the same active task
* ACTIVE_AND_STANDBY_TASK_ASSIGNED_TO_SAME_KAFKASTREAMS: active task and standby task assigned to the same KafkaStreams client
* INVALID_STANDBY_TASK: stateless task assigned as a standby task
* UNKNOWN_PROCESS_ID: unrecognized ProcessId not matching any of the participating consumers
* UNKNOWN_TASK_ID: unrecognized TaskId not matching any of the tasks to be assigned
*/
enum AssignmentError {
NONE,
ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES,
ACTIVE_AND_STANDBY_TASK_ASSIGNED_TO_SAME_KAFKASTREAMS,
INVALID_STANDBY_TASK,
UNKNOWN_PROCESS_ID,
UNKNOWN_TASK_ID
}
/**
* @param applicationState the metadata for this Kafka Streams application
*
* @return the assignment of active and standby tasks to KafkaStreams clients
*
* @throws TaskAssignmentException If an error occurs during assignment, and you wish for the rebalance to be retried,
* you can throw this exception to keep the assignment unchanged and automatically
* schedule an immediate followup rebalance.
*/
TaskAssignment assign(ApplicationState applicationState);
/**
* This callback can be used to observe the final assignment returned to the brokers and check for any errors that
* were detected while processing the returned assignment. If any errors were found, the corresponding
* will be returned and a StreamsException will be thrown after this callback returns. The StreamsException will
* be thrown up to kill the StreamThread and can be handled as any other uncaught exception would if the application
* has registered a {@link StreamsUncaughtExceptionHandler}.
*
* @param assignment: the final assignment returned to the kafka broker
* @param subscription: the original subscription passed into the assignor
* @param error: the corresponding error type if one was detected while processing the returned assignment,
* or AssignmentError.NONE if the returned assignment was valid
*/
default void onAssignmentComputed(GroupAssignment assignment, GroupSubscription subscription, AssignmentError error) {}
/**
* Wrapper class for the final assignment of active and standbys tasks to individual
* KafkaStreams clients.
*/
class TaskAssignment {
private final Collection<KafkaStreamsAssignment> assignment;
public TaskAssignment(final Collection<KafkaStreamsAssignment> assignment) {
this.assignment = assignment;
}
/**
* @return the assignment of tasks to kafka streams clients.
*/
public Collection<KafkaStreamsAssignment> assignment() {
return assignment;
}
}
}

View File

@ -110,6 +110,13 @@ public class RepartitionTopics {
.collect(Collectors.toSet());
}
public Set<String> missingSourceTopics() {
return missingInputTopicsBySubtopology.entrySet().stream()
.map(entry -> entry.getValue())
.flatMap(missingTopicSet -> missingTopicSet.stream())
.collect(Collectors.toSet());
}
public Queue<StreamsException> missingSourceTopicExceptions() {
return missingInputTopicsBySubtopology.entrySet().stream().map(entry -> {
final Set<String> missingSourceTopics = entry.getValue();

View File

@ -523,7 +523,9 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
final boolean isMissingInputTopics = !repartitionTopics.missingSourceTopicExceptions().isEmpty();
if (isMissingInputTopics) {
if (!taskManager.topologyMetadata().hasNamedTopologies()) {
throw new MissingSourceTopicException("Missing source topics.");
final String errorMsg = String.format("Missing source topics. %s", repartitionTopics.missingSourceTopics());
log.error(errorMsg);
throw new MissingSourceTopicException(errorMsg);
} else {
nonFatalExceptionsToHandle.addAll(repartitionTopics.missingSourceTopicExceptions());
}

View File

@ -53,7 +53,9 @@ public class StreamsRebalanceListener implements ConsumerRebalanceListener {
// NB: all task management is already handled by:
// org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.onAssignment
if (assignmentErrorCode.get() == AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.code()) {
log.error("Received error code {}", AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA);
log.error("Received error code {}. {}",
AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.codeName(),
AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.description());
taskManager.handleRebalanceComplete();
throw new MissingSourceTopicException("One or more source topics were missing during rebalance");
} else if (assignmentErrorCode.get() == AssignorError.VERSION_PROBING.code()) {

View File

@ -19,20 +19,31 @@ package org.apache.kafka.streams.processor.internals.assignment;
public enum AssignorError {
// Note: this error code should be reserved for fatal errors, as the receiving clients are future-proofed
// to throw an exception upon an unrecognized error code.
NONE(0),
INCOMPLETE_SOURCE_TOPIC_METADATA(1),
VERSION_PROBING(2), // not actually used anymore, but we may hit it during a rolling upgrade from earlier versions
ASSIGNMENT_ERROR(3),
SHUTDOWN_REQUESTED(4);
NONE(0, "NONE", "NONE"),
INCOMPLETE_SOURCE_TOPIC_METADATA(1, "INCOMPLETE_SOURCE_TOPIC_METADATA", "Missing metadata for source topics. Check the group leader logs for details."),
VERSION_PROBING(2, "VERSION_PROBING", "Could not read internal rebalance metadata due to unknown encoding version."), // not actually used anymore, but we may hit it during a rolling upgrade from earlier versions
ASSIGNMENT_ERROR(3, "ASSIGNMENT_ERROR", "Internal task assignment error. Check the group leader logs for details."),
SHUTDOWN_REQUESTED(4, "SHUTDOWN_REQUESTED", "A KafkaStreams instance encountered a fatal error and requested a shutdown for the entire application.");
private final int code;
private final String codeName;
private final String description;
AssignorError(final int code) {
AssignorError(final int code, final String codeName, final String description) {
this.code = code;
this.codeName = codeName;
this.description = description;
}
public int code() {
return code;
}
public String codeName() {
return codeName;
}
public String description() {
return description;
}
}

View File

@ -17,10 +17,8 @@
package org.apache.kafka.tools.consumer.group;
import kafka.test.ClusterInstance;
import kafka.test.annotation.ClusterConfigProperty;
import kafka.test.annotation.ClusterTest;
import kafka.test.annotation.ClusterTestDefaults;
import kafka.test.annotation.Type;
import kafka.test.ClusterGenerator;
import kafka.test.annotation.ClusterTemplate;
import kafka.test.junit.ClusterTestExtensions;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.Admin;
@ -45,7 +43,6 @@ import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.extension.ExtendWith;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@ -59,28 +56,21 @@ import static org.junit.jupiter.api.Assertions.assertNull;
@Disabled
@Tag("integration")
@ClusterTestDefaults(clusterType = Type.ALL, serverProperties = {
@ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
@ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
@ClusterConfigProperty(key = "group.coordinator.new.enable", value = "true")
})
@ExtendWith(ClusterTestExtensions.class)
public class DeleteOffsetsConsumerGroupCommandIntegrationTest {
public static final String TOPIC_PREFIX = "foo.";
public static final String GROUP_PREFIX = "test.group.";
private final ClusterInstance clusterInstance;
private final Iterable<Map<String, Object>> consumerConfigs;
DeleteOffsetsConsumerGroupCommandIntegrationTest(ClusterInstance clusterInstance) {
this.clusterInstance = clusterInstance;
this.consumerConfigs = clusterInstance.isKRaftTest()
? Arrays.asList(Collections.singletonMap(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name()),
Collections.singletonMap(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name()))
: Collections.singletonList(Collections.emptyMap());
}
@ClusterTest
private static void generator(ClusterGenerator clusterGenerator) {
ConsumerGroupCommandTestUtils.generator(clusterGenerator);
}
@ClusterTemplate("generator")
public void testDeleteOffsetsNonExistingGroup() {
String group = "missing.group";
String topic = "foo:1";
@ -90,91 +80,91 @@ public class DeleteOffsetsConsumerGroupCommandIntegrationTest {
}
}
@ClusterTest
@ClusterTemplate("generator")
public void testDeleteOffsetsOfStableConsumerGroupWithTopicPartition() {
for (Map<String, Object> consumerConfig: consumerConfigs) {
String topic = TOPIC_PREFIX + consumerConfig.getOrDefault(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name());
String group = GROUP_PREFIX + consumerConfig.getOrDefault(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name());
for (GroupProtocol groupProtocol : clusterInstance.supportedGroupProtocols()) {
String topic = TOPIC_PREFIX + groupProtocol.name();
String group = GROUP_PREFIX + groupProtocol.name();
createTopic(topic);
Runnable validateRunnable = getValidateRunnable(topic, group, 0, 0, Errors.GROUP_SUBSCRIBED_TO_TOPIC);
testWithConsumerGroup(topic, group, consumerConfig, true, validateRunnable);
testWithConsumerGroup(topic, group, groupProtocol, true, validateRunnable);
removeTopic(topic);
}
}
@ClusterTest
@ClusterTemplate("generator")
public void testDeleteOffsetsOfStableConsumerGroupWithTopicOnly() {
for (Map<String, Object> consumerConfig: consumerConfigs) {
String topic = TOPIC_PREFIX + consumerConfig.getOrDefault(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name());
String group = GROUP_PREFIX + consumerConfig.getOrDefault(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name());
for (GroupProtocol groupProtocol : clusterInstance.supportedGroupProtocols()) {
String topic = TOPIC_PREFIX + groupProtocol.name();
String group = GROUP_PREFIX + groupProtocol.name();
createTopic(topic);
Runnable validateRunnable = getValidateRunnable(topic, group, -1, 0, Errors.GROUP_SUBSCRIBED_TO_TOPIC);
testWithConsumerGroup(topic, group, consumerConfig, true, validateRunnable);
testWithConsumerGroup(topic, group, groupProtocol, true, validateRunnable);
removeTopic(topic);
}
}
@ClusterTest
@ClusterTemplate("generator")
public void testDeleteOffsetsOfStableConsumerGroupWithUnknownTopicPartition() {
for (Map<String, Object> consumerConfig: consumerConfigs) {
String topic = TOPIC_PREFIX + consumerConfig.getOrDefault(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name());
String group = GROUP_PREFIX + consumerConfig.getOrDefault(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name());
for (GroupProtocol groupProtocol : clusterInstance.supportedGroupProtocols()) {
String topic = TOPIC_PREFIX + groupProtocol.name();
String group = GROUP_PREFIX + groupProtocol.name();
Runnable validateRunnable = getValidateRunnable("foobar", group, 0, 0, Errors.UNKNOWN_TOPIC_OR_PARTITION);
testWithConsumerGroup(topic, group, consumerConfig, true, validateRunnable);
testWithConsumerGroup(topic, group, groupProtocol, true, validateRunnable);
}
}
@ClusterTest
@ClusterTemplate("generator")
public void testDeleteOffsetsOfStableConsumerGroupWithUnknownTopicOnly() {
for (Map<String, Object> consumerConfig: consumerConfigs) {
String topic = TOPIC_PREFIX + consumerConfig.getOrDefault(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name());
String group = GROUP_PREFIX + consumerConfig.getOrDefault(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name());
for (GroupProtocol groupProtocol : clusterInstance.supportedGroupProtocols()) {
String topic = TOPIC_PREFIX + groupProtocol.name();
String group = GROUP_PREFIX + groupProtocol.name();
Runnable validateRunnable = getValidateRunnable("foobar", group, -1, -1, Errors.UNKNOWN_TOPIC_OR_PARTITION);
testWithConsumerGroup(topic, group, consumerConfig, true, validateRunnable);
testWithConsumerGroup(topic, group, groupProtocol, true, validateRunnable);
}
}
@ClusterTest
@ClusterTemplate("generator")
public void testDeleteOffsetsOfEmptyConsumerGroupWithTopicPartition() {
for (Map<String, Object> consumerConfig: consumerConfigs) {
String topic = TOPIC_PREFIX + consumerConfig.getOrDefault(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name());
String group = GROUP_PREFIX + consumerConfig.getOrDefault(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name());
for (GroupProtocol groupProtocol : clusterInstance.supportedGroupProtocols()) {
String topic = TOPIC_PREFIX + groupProtocol.name();
String group = GROUP_PREFIX + groupProtocol.name();
createTopic(topic);
Runnable validateRunnable = getValidateRunnable(topic, group, 0, 0, Errors.NONE);
testWithConsumerGroup(topic, group, consumerConfig, false, validateRunnable);
testWithConsumerGroup(topic, group, groupProtocol, false, validateRunnable);
removeTopic(topic);
}
}
@ClusterTest
@ClusterTemplate("generator")
public void testDeleteOffsetsOfEmptyConsumerGroupWithTopicOnly() {
for (Map<String, Object> consumerConfig: consumerConfigs) {
String topic = TOPIC_PREFIX + consumerConfig.getOrDefault(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name());
String group = GROUP_PREFIX + consumerConfig.getOrDefault(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name());
for (GroupProtocol groupProtocol : clusterInstance.supportedGroupProtocols()) {
String topic = TOPIC_PREFIX + groupProtocol.name();
String group = GROUP_PREFIX + groupProtocol.name();
createTopic(topic);
Runnable validateRunnable = getValidateRunnable(topic, group, -1, 0, Errors.NONE);
testWithConsumerGroup(topic, group, consumerConfig, false, validateRunnable);
testWithConsumerGroup(topic, group, groupProtocol, false, validateRunnable);
removeTopic(topic);
}
}
@ClusterTest
@ClusterTemplate("generator")
public void testDeleteOffsetsOfEmptyConsumerGroupWithUnknownTopicPartition() {
for (Map<String, Object> consumerConfig: consumerConfigs) {
String topic = TOPIC_PREFIX + consumerConfig.getOrDefault(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name());
String group = GROUP_PREFIX + consumerConfig.getOrDefault(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name());
for (GroupProtocol groupProtocol : clusterInstance.supportedGroupProtocols()) {
String topic = TOPIC_PREFIX + groupProtocol.name();
String group = GROUP_PREFIX + groupProtocol.name();
Runnable validateRunnable = getValidateRunnable("foobar", group, 0, 0, Errors.UNKNOWN_TOPIC_OR_PARTITION);
testWithConsumerGroup(topic, group, consumerConfig, false, validateRunnable);
testWithConsumerGroup(topic, group, groupProtocol, false, validateRunnable);
}
}
@ClusterTest
@ClusterTemplate("generator")
public void testDeleteOffsetsOfEmptyConsumerGroupWithUnknownTopicOnly() {
for (Map<String, Object> consumerConfig: consumerConfigs) {
String topic = TOPIC_PREFIX + consumerConfig.getOrDefault(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name());
String group = GROUP_PREFIX + consumerConfig.getOrDefault(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name());
for (GroupProtocol groupProtocol : clusterInstance.supportedGroupProtocols()) {
String topic = TOPIC_PREFIX + groupProtocol.name();
String group = GROUP_PREFIX + groupProtocol.name();
Runnable validateRunnable = getValidateRunnable("foobar", group, -1, -1, Errors.UNKNOWN_TOPIC_OR_PARTITION);
testWithConsumerGroup(topic, group, consumerConfig, false, validateRunnable);
testWithConsumerGroup(topic, group, groupProtocol, false, validateRunnable);
}
}
@ -219,11 +209,11 @@ public class DeleteOffsetsConsumerGroupCommandIntegrationTest {
}
private void testWithConsumerGroup(String inputTopic,
String inputGroup,
Map<String, Object> consumerConfig,
GroupProtocol groupProtocol,
boolean isStable,
Runnable validateRunnable) {
produceRecord(inputTopic);
try (Consumer<byte[], byte[]> consumer = createConsumer(inputGroup, consumerConfig)) {
try (Consumer<byte[], byte[]> consumer = createConsumer(inputGroup, groupProtocol)) {
consumer.subscribe(Collections.singletonList(inputTopic));
ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(DEFAULT_MAX_WAIT_MS));
Assertions.assertNotEquals(0, records.count());
@ -253,9 +243,10 @@ public class DeleteOffsetsConsumerGroupCommandIntegrationTest {
return new KafkaProducer<>(config);
}
private Consumer<byte[], byte[]> createConsumer(String group, Map<String, Object> config) {
Map<String, Object> consumerConfig = new HashMap<>(config);
private Consumer<byte[], byte[]> createConsumer(String group, GroupProtocol groupProtocol) {
Map<String, Object> consumerConfig = new HashMap<>();
consumerConfig.putIfAbsent(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers());
consumerConfig.putIfAbsent(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name());
consumerConfig.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerConfig.putIfAbsent(ConsumerConfig.GROUP_ID_CONFIG, group);
consumerConfig.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());