mirror of https://github.com/apache/kafka.git
KAFKA-12989 MockClient should respect the request matcher passed to prepareUnsupportedVersionResponse (#16849)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
parent
b63b20eeee
commit
2f0ae82d4a
|
@ -250,15 +250,16 @@ public class MockClient implements KafkaClient {
|
|||
short version = nodeApiVersions.latestUsableVersion(request.apiKey(), builder.oldestAllowedVersion(),
|
||||
builder.latestAllowedVersion());
|
||||
|
||||
|
||||
AbstractRequest abstractRequest = request.requestBuilder().build(version);
|
||||
if (!futureResp.requestMatcher.matches(abstractRequest))
|
||||
throw new IllegalStateException("Request matcher did not match next-in-line request "
|
||||
+ abstractRequest + " with prepared response " + futureResp.responseBody);
|
||||
|
||||
UnsupportedVersionException unsupportedVersionException = null;
|
||||
if (futureResp.isUnsupportedRequest) {
|
||||
unsupportedVersionException = new UnsupportedVersionException(
|
||||
"Api " + request.apiKey() + " with version " + version);
|
||||
} else {
|
||||
AbstractRequest abstractRequest = request.requestBuilder().build(version);
|
||||
if (!futureResp.requestMatcher.matches(abstractRequest))
|
||||
throw new IllegalStateException("Request matcher did not match next-in-line request "
|
||||
+ abstractRequest + " with prepared response " + futureResp.responseBody);
|
||||
}
|
||||
|
||||
ClientResponse resp = new ClientResponse(request.makeHeader(version), request.callback(), request.destination(),
|
||||
|
|
|
@ -876,7 +876,9 @@ public class TransactionManagerTest {
|
|||
client.prepareUnsupportedVersionResponse(body -> {
|
||||
FindCoordinatorRequest findCoordinatorRequest = (FindCoordinatorRequest) body;
|
||||
assertEquals(CoordinatorType.forId(findCoordinatorRequest.data().keyType()), CoordinatorType.TRANSACTION);
|
||||
assertEquals(findCoordinatorRequest.data().key(), transactionalId);
|
||||
assertTrue(findCoordinatorRequest.data().key().isEmpty());
|
||||
assertEquals(1, findCoordinatorRequest.data().coordinatorKeys().size());
|
||||
assertTrue(findCoordinatorRequest.data().coordinatorKeys().contains(transactionalId));
|
||||
return true;
|
||||
});
|
||||
|
||||
|
|
|
@ -171,7 +171,8 @@ class ForwardingManagerTest {
|
|||
|
||||
Mockito.when(controllerNodeProvider.getControllerInfo()).thenReturn(controllerInfo)
|
||||
|
||||
client.prepareUnsupportedVersionResponse(req => req.apiKey == requestHeader.apiKey)
|
||||
val isEnvelopeRequest: RequestMatcher = request => request.isInstanceOf[EnvelopeRequest]
|
||||
client.prepareUnsupportedVersionResponse(isEnvelopeRequest)
|
||||
|
||||
val response = new AtomicReference[AbstractResponse]()
|
||||
forwardingManager.forwardRequest(request, res => res.foreach(response.set))
|
||||
|
|
Loading…
Reference in New Issue