Commit Graph

70 Commits

Author SHA1 Message Date
Colin Patrick McCabe ec38dcb72f
MINOR: support ImplicitLinkedHashCollection#sort (#10456)
Support sorting the elements in ImplicitLinkedHashCollection.
This is useful sometimes in unit tests for comparing collections.

Reviewers: Ismael Juma <ismael@juma.me.uk>
2021-04-01 18:30:53 -07:00
Justine Olshan 40f001cc53
KAFKA-12426: Missing logic to create partition.metadata files in RaftReplicaManager (#10282)
KIP-516 introduced partition.metadata file to persist the topic ID on the broker. It is created through handling the LeaderAndIsrRequest in ReplicaManager. (See https://github.com/apache/kafka/pull/10143 for the code path.) RaftReplicaManager was missing the analogue code path for Kip-500 code. Like in ReplicaManager, RaftReplicaManager will now check the partition.metadata file when handling metadata records.

However, since we know that all raft topics will have topic IDs, we can simply set the ID in the log upon the log's creation.
Updated the ReplicaManager path to do the same on newly created topics.

There are also some tweaks to the checking logic to better handle the scenario when the log exists but is not yet associated to Partition (for example, upon startup after a shutdown).

Tests added to ensure the file is created and that the correct error is thrown when the id is inconsistent.
Added tests for creating the log with the new topic ID parameter.

Also adds a few methods to get topic ID from MetadataImageBuilder as this is the most convenient way to get topic ID from RaftReplicaManager.

Reviewers: Ron Dagostino <rdagostino@confluent.io>, Jason Gustafson <jason@confluent.io>
2021-04-01 15:42:21 -07:00
Ismael Juma 7a3ebbebbc
KAFKA-12415 Prepare for Gradle 7.0 and restrict transitive scope for non api dependencies (#10203)
Gradle 7.0 is required for Java 16 compatibility and it removes a number of
deprecated APIs. Fix most issues preventing the upgrade to Gradle 7.0.
The remaining ones are more complicated and should be handled
in a separate PR. Details of the changes:

* Release tarball no longer includes includes test, sources, javadoc and test sources jars (these
are still published to the Maven Central repository).
* Replace `compile` with `api` or `implementation` - note that `implementation`
dependencies appear with `runtime` scope in the pom file so this is a (positive)
change in behavior
* Add missing dependencies that were uncovered by the usage of `implementation`
* Replace `testCompile` with `testImplementation`
* Replace `runtime` with `runtimeOnly` and `testRuntime` with `testRuntimeOnly`
* Replace `configurations.runtime` with `configurations.runtimeClasspath`
* Replace `configurations.testRuntime` with `configurations.testRuntimeClasspath` (except for
the usage in the `streams` project as that causes a cyclic dependency error)
* Use `java-library` plugin instead of `java`
* Use `maven-publish` plugin instead of deprecated `maven` plugin - this changes the
commands used to publish and to install locally, but task aliases for `install` and
`uploadArchives` were added for backwards compatibility
* Removed `-x signArchives` line from the readme since it was wrong (it was a
no-op before and it fails now, however)
* Replaces `artifacts` block with an approach that works with the `maven-publish` plugin
* Don't publish `jmh-benchmark` module - the shadow jar is pretty large and not
particularly useful (before this PR, we would publish the non shadow jars)
* Replace `version` with `archiveVersion`, `baseName` with `archiveBaseName` and
`classifier` with `archiveClassifier`
* Update Gradle and plugins to the latest stable version (7.0 is not stable yet)
* Use `plugin` DSL to configure plugins
* Updated notable changes for 3.0

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Randall Hauch <rhauch@gmail.com>
2021-03-04 11:22:22 -08:00
Chia-Ping Tsai 8205051e90
MINOR: remove FetchResponse.AbortedTransaction and redundant construc… (#9758)
1. rename INVALID_HIGHWATERMARK to INVALID_HIGH_WATERMARK
2. replace FetchResponse.AbortedTransaction by FetchResponseData.AbortedTransaction
3. remove redundant constructors from FetchResponse.PartitionData
4. rename recordSet to records
5. add helpers "recordsOrFail" and "recordsSize" to FetchResponse to process record casting

Reviewers: Ismael Juma <ismael@juma.me.uk>
2021-03-04 18:06:50 +08:00
Jason Gustafson 698319b8e2 KAFKA-12278; Ensure exposed api versions are consistent within listener (#10666)
Previously all APIs were accessible on every listener exposed by the broker, but
with KIP-500, that is no longer true.  We now have more complex requirements for
API accessibility.

For example, the KIP-500 controller exposes some APIs which are not exposed by
brokers, such as BrokerHeartbeatRequest, and does not expose most client APIs,
such as JoinGroupRequest, etc.  Similarly, the KIP-500 broker does not implement
some APIs that the ZK-based broker does, such as LeaderAndIsrRequest and
UpdateFeaturesRequest.

All of this means that we need more sophistication in how we expose APIs and
keep them consistent with the ApiVersions API. Up until now, we have been
working around this using the controllerOnly flag inside ApiKeys, but this is
not rich enough to support all of the cases listed above.  This PR introduces a
new "listeners" field to the request schema definitions.  This field is an array
of strings which indicate the listener types in which the API should be exposed.
We currently support "zkBroker", "broker", and "controller".  ("broker"
indicates the KIP-500 broker, whereas zkBroker indicates the old broker).

This PR also creates ApiVersionManager to encapsulate the creation of the
ApiVersionsResponse based on the listener type.  Additionally, it modifies
SocketServer to check the listener type of received requests before forwarding
them to the request handler.

Finally, this PR also fixes a bug in the handling of the ApiVersionsResponse
prior to authentication. Previously a static response was sent, which means that
changes to features would not get reflected. This also meant that the logic to
ensure that only the intersection of version ranges supported by the controller
would get exposed did not work. I think this is important because some clients
rely on the initial pre-authenticated ApiVersions response rather than doing a
second round after authentication as the Java client does.

One final cleanup note: I have removed the expectation that envelope requests
are only allowed on "privileged" listeners.  This made sense initially because
we expected to use forwarding before the KIP-500 controller was available. That
is not the case anymore and we expect the Envelope API to only be exposed on the
controller listener. I have nevertheless preserved the existing workarounds to
allow verification of the forwarding behavior in integration testing.

Reviewers: Colin P. McCabe <cmccabe@apache.org>, Ismael Juma <ismael@juma.me.uk>
2021-02-18 16:25:51 -08:00
Ron Dagostino a30f92bf59
MINOR: Add KIP-500 BrokerServer and ControllerServer (#10113)
This PR adds the KIP-500 BrokerServer and ControllerServer classes and 
makes some related changes to get them working.  Note that the ControllerServer 
does not instantiate a QuorumController object yet, since that will be added in
PR #10070.

* Add BrokerServer and ControllerServer

* Change ApiVersions#computeMaxUsableProduceMagic so that it can handle
endpoints which do not support PRODUCE (such as KIP-500 controller nodes)

* KafkaAdminClientTest: fix some lingering references to decommissionBroker
that should be references to unregisterBroker.

* Make some changes to allow SocketServer to be used by ControllerServer as
we as by the broker.

* We now return a random active Broker ID as the Controller ID in
MetadataResponse for the Raft-based case as per KIP-590.

* Add the RaftControllerNodeProvider

* Add EnvelopeUtils

* Add MetaLogRaftShim

* In ducktape, in config_property.py: use a KIP-500 compatible cluster ID.

Reviewers: Colin P. McCabe <cmccabe@apache.org>, David Arthur <mumrah@gmail.com>
2021-02-17 21:35:13 -08:00
Ismael Juma 744d05b128
KAFKA-12327: Remove MethodHandle usage in CompressionType (#10123)
We don't really need it and it causes problems in older Android versions
and GraalVM native image usage (there are workarounds for the latter).

Move the logic to separate classes that are only invoked when the
relevant compression library is actually used. Place such classes
in their own package and enforce via checkstyle that only these
classes refer to compression library packages.

To avoid cyclic dependencies, moved `BufferSupplier` to the `utils`
package.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2021-02-14 08:12:25 -08:00
Justine Olshan 39dcdeffd7
MINOR: Prevent creating partition.metadata until ID can be written (#10041)
Currently the partition.metadata file is created when the log is created. However, clusters with older inter-broker protocols will never use this file. This PR moves the creation of the file to when we write to the file.

This PR also deletes the partition.metadata file on startup if the IBP version is lower than 2.8.

Reviewers: Jun Rao <junrao@gmail.com>
2021-02-10 15:22:15 -08:00
Ron Dagostino 42a9355e60
MINOR: Defer log recovery until LogManager startup (#10039)
Currently log recovery begins as soon as we instantiate `LogManager`, but when using a
Raft-based metadata quorum we won't have configs until after we catch up on the metadata
log.  We therefore defer log recovery until we actually invoke `startup()` on the `LogManager`
instance.  This timing difference has no effect when using ZooKeeper because we
immediately invoke `startup()` on the instantiated instance, but it gives us the necessary
flexibility for accurate log recovery with updated configs when using a Raft-based metadata
quorum.

The `LogCleaner` is currently instantiated during construction just after log recovery
completes, and then it is started in `startup()`.  As an extra precaution, since we are
no longer performing recovery during construction, we both instantiate and start the
log cleaner in `startup()` after log recovery completes.

We also convert `LogManager` to use a `ConfigRepository` to load topic configs
(which can override the default log configs) instead of having a hard-coded
dependency on ZooKeeper.  We retrieve the topic configs when we invoke `startup()`
-- which again is effectively no different from a timing perspective than what we do
today for the ZooKeeper case.

One subtlety is that currently we create the log configs for every topic at this point
-- if a topic has no config overrides then we associate a copy of the default
configuration with the topic inside a map, and we retrieve the log configs for that
topic's partitions from from that map during recovery.  This PR makes a change to
this series of events as follows.  We do not associate a copy of the the default
configuration with a topic in the map if the topic has no configs set when we query
for them.  This saves some memory -- we don't unnecessarily copy the default
config many times -- but it also means we have use the default log configs for
that topic later on when recovery for each of its partitions begins.

The difference is that the default configs are dynamically reconfigurable, and they
could potentially change between the time when we invoke `startup()` and when
log recovery begins (log recovery can begin quite some time after `startup()` is
invoked if shutdown was unclean).  Prior to this patch such a change would not
be used; with this patch they could be if they happen before recovery begins.
This actually is better -- we are performing log recovery with the most recent
known defaults when a topic had no overrides at all. Also, `Partition.createLog`
has logic to handle missed config updates, so the behavior is eventually the same.

The transition of the broker state from `STARTING` to `RECOVERY` currently
happens within the `LogManager`, and it only occurs if the shutdown was
unclean.  We move this transition into the broker as it avoids passing a
reference to the broker state into the `LogManager`.  We also now always
transition the broker into the `RECOVERY` state as dictated by [the KIP-631 broker state machine](https://cwiki.apache.org/confluence/display/KAFKA/KIP-631%3A+The+Quorumbased+Kafka+Controller#KIP631:TheQuorumbasedKafkaController-TheBrokerStateMachine).

Finally, a few clean-ups were included. One worth highlighting is that `Partition`
no longer requires a `ConfigRepository`.

Reviewers: David Arthur <david.arthur@confluent.io>, Ismael Juma <ismael@juma.me.uk>
2021-02-07 15:46:41 -08:00
Boyang Chen d2cb2dc45d
KAFKA-9751: Forward CreateTopicsRequest for FindCoordinator/Metadata when topic creation is needed (#9579)
Consolidate auto topic creation logic to either forward a CreateTopicRequest or handling the creation directly as AutoTopicCreationManager, when handling FindCoordinator/Metadata request.

Co-authored-by: Jason Gustafson <jason@confluent.io>

Reviewers: Jason Gustafson <jason@confluent.io>
2021-02-06 13:04:30 -08:00
David Arthur 242f65e5ba
Refactor the MetadataCache into two implementations (#10049)
Refactor the MetadataCache into two implementations that both implement a common trait.  This will let us
continue to use the existing implementation when using ZK, but use the new implementation when in kip-500 mode.

Reviewers: Colin McCabe <cmccabe@apache.org>, Justine Olshan <jolshan@confluent.io>, Jason Gustafson <jason@confluent.io>
2021-02-05 16:25:26 -08:00
Ron Dagostino acf39fe94a
MINOR: Allow KafkaApis to be configured for Raft controller quorums (#10045)
`KafkaApis` is configured differently when it is used in a broker with a Raft-based controller quorum vs. a ZooKeeper quorum.  For example, when using Raft, `ForwardingManager` is required rather than optional, and there is no `AdminManager`, `KafkaController`, or `KafkaZkClient`.  This PR introduces `MetadataSupport` to abstract the two possibilities: `ZkSupport` and `RaftSupport`.  This provides a fluent way to decide what to do based on the type of support that `KafkaApis` has been configured with.  Certain types of requests are not supported when using raft (`AlterIsrRequest`, `UpdateMetadataRequest`, etc.), and `MetadataSupport` gives us an intuitive way to identify the constraints and requirements associated with the different configurations and react accordingly.

Existing tests are sufficient to detect bugs and regressions.

Reviewers: José Armando García Sancio <jsancio@gmail.com>, Jason Gustafson <jason@confluent.io>
2021-02-05 12:57:44 -08:00
Ron Dagostino c4ea6fb0a7
MINOR: Add ConfigRepository, use in Partition and KafkaApis (#10005)
`Partition` objects are able to retrieve topic configs when creating their log, and currently they do so with an implementation of `trait TopicConfigFetcher` that uses ZooKeeper.  ZooKeeper is not available when using a Raft-based metadata log, so we need to abstract the retrieval of configs so it can work either with or without ZooKeeper.  This PR introduces `trait ConfigRepository` with `ZkConfigRepository` and `CachedConfigRepository` implementations.  `Partition` objects now use a provided `ConfigRepository` to retrieve topic configs, and we eliminate `TopicConfigFetcher` as it is no longer needed.

`ReplicaManager` now contains an instance of `ConfigRepository` so it can provide it when creating `Partition` instances.

`KafkaApis` needs to be able to handle describe-config requests; it currently delegates that to `ZkAdminManager`, which of course queries ZooKeeper.  To make this work with or without ZooKeeper we move the logic from `ZkAdminManager` into a new `ConfigHelper` class that goes through a `ConfigRepository` instance.  We provide `KafkaApis` with such an instance, and it creates an instance of the helper so it can use that instead of going through `ZkAdminManager`.

Existing tests are sufficient to identify bugs and regressions in `Partition`, `ReplicaManager`, `KafkaApis`, and `ConfigHelper`.  The `ConfigRepository` implementations have their own unit tests.

Reviewers: Jason Gustafson <jason@confluent.io>
2021-02-04 12:58:26 -08:00
Colin Patrick McCabe 772f2cfc82
MINOR: Replace BrokerStates.scala with BrokerState.java (#10028)
Replace BrokerStates.scala with BrokerState.java, to make it easier to use from Java code if needed.  This also makes it easier to go from a numeric type to an enum.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2021-02-03 13:41:38 -08:00
Colin Patrick McCabe 1711cfa4eb
KAFKA-12209: Add the timeline data structures for the KIP-631 controller (#9901)
Reviewers: Jun Rao <junrao@gmail.com>
2021-02-02 11:33:55 -08:00
Jason Gustafson 9689a313f5
MINOR: Drop enable.metadata.quorum config (#9934)
The primary purpose of this patch is to remove the internal `enable.metadata.quorum` configuration. Instead, we rely on `process.roles` to determine if the self-managed quorum has been enabled. As a part of this, I've done the following:

1. Replace the notion of "disabled" APIs with "controller-only" APIs. We previously marked some APIs which were intended only for the KIP-500 as "disabled" so that they would not be unintentionally exposed. For example, the Raft quorum APIs were disabled. Marking them as "controller-only" carries the same effect, but makes the intent that they should be only exposed by the KIP-500 controller clearer.
2. Make `ForwardingManager` optional in `KafkaServer` and `KafkaApis`. Previously we used `null` if forwarding was enabled and relied on the metadata quorum check.
3. Make `zookeeper.connect` an optional configuration if `process.roles` is defined.
4. Update raft README to remove reference to `zookeeper.conntect`

Reviewers: Colin Patrick McCabe <cmccabe@confluent.io>, Boyang Chen <boyang@confluent.io>
2021-01-21 15:16:15 -08:00
Colin Patrick McCabe b5c107363f
KAFKA-12208: Rename AdminManager to ZkAdminManager (#9900)
Rename AdminManager to ZkAdminManager to emphasize the fact that it is not used by the KIP-500 code paths.

Reviewers: Ismael Juma <ismael@juma.me.uk>, Boyang Chen <boyang@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
2021-01-15 12:56:09 -08:00
David Arthur d3f19e4bb0
KAFKA-10825 ZooKeeper ISR manager (#9713)
ISR-related cleanup in ReplicaManager and Partition. Removes ISR change logic from ReplicaManager and adds a new ZkIsrManager class which adheres to a new AlterIsrManager trait. Unifies all of the ISR logic in Partition so we don't have separate code paths for ZK vs AlterIsr. Also removes PartitionStateStore
2020-12-21 14:44:02 -05:00
dengziming 5c921afa4a
KAFKA-10547; Add TopicId in MetadataResponse (#9622)
Includes:
- Bump the version of MetadataRequest and MetadataResponse, add topicId in MetadataResponse
- Alter describeTopic in AdminClientTopicService and ZookeeperTopicService
- TopicMetadata is cached in MetadataCache, so we need to add topicId to MetadataCache
- MetadataCache is updated by UpdateMetadataRequest, bump the version of UpdateMetadataReq and UpdateMetadataResp, add topicId in UpdateMetadataReq.

Reviewers: Justine Olshan <jolshan@confluent.io>, Rajini Sivaram <rajinisivaram@googlemail.com>
2020-12-18 21:30:52 +00:00
Cheng Tan ae3a6ed990
KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic (KIP-679) (#9485)
Includes:
- New API to authorize by resource type
- Default implementation for the method that supports super users and ACLs
- Optimized implementation in AclAuthorizer that supports ACLs, super users and allow.everyone.if.no.acl.found
- Benchmarks and tests
- InitProducerIdRequest authorized for Cluster:IdempotentWrite or WRITE to any topic, ProduceRequest authorized only for topic even if idempotent

Reviewers: Lucas Bradstreet <lucas@confluent.io>, Rajini Sivaram <rajinisivaram@googlemail.com>
2020-12-18 18:08:46 +00:00
David Jacot 02a30a51eb
KAFKA-10740; Replace OffsetsForLeaderEpochRequest.PartitionData with automated protocol (#9689)
This patch follows up https://github.com/apache/kafka/pull/9547. It refactors AbstractFetcherThread and its descendants to use `OffsetForLeaderEpochRequestData.OffsetForLeaderPartition` instead of `OffsetsForLeaderEpochRequest.PartitionData`. The patch relies on existing tests.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Jason Gustafson <jason@confluent.io>
2020-12-17 17:40:37 +01:00
Ismael Juma 782175dfbc
MINOR: Simplify ApiKeys by relying on ApiMessageType (#9748)
* The naming for `ListOffsets` was inconsistent, in some places it was `ListOffset` and in others
it was `ListOffsets`. Picked the latter since it was used in metrics and the protocol documentation
and made it consistent.
* Removed unused methods in ApiKeys.
* Deleted `CommonFields`.
* Added `lowestSupportedVersion` and `highestSupportedVersion` to `ApiMessageType`
* Removed tests in `MessageTest` that are no longer relevant.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2020-12-16 06:33:10 -08:00
Anastasia Vela 1a10c3445e
KAFKA-10525: Emit JSONs with new auto-generated schema (KIP-673) (#9526)
This patch updates the request logger to output request and response payloads in JSON. Payloads are converted to JSON based on their auto-generated schema.

Reviewers:  Lucas Bradstreet <lucas@confluent.io>, David Mao <dmao@confluent.io>, David Jacot <djacot@confluent.io>
2020-12-15 14:33:36 +01:00
Ismael Juma 8cabd57612
MINOR: Update jmh to 1.27 for async profiler support (#9129)
Also updated the jmh readme to make it easier for new people to know
what's possible and best practices.

There were some changes in the generated benchmarking code that
required adjusting `spotbugs-exclude.xml` and for a `javac` warning
to be suppressed for the benchmarking module. I took the chance
to make the spotbugs exclusion mode maintainable via a regex
pattern.

Tested the commands on Linux and macOS with zsh.

JMH highlights:

* async-profiler integration. Can be used with -prof async,
pass -prof async:help to look for the accepted options.
* perf c2c [2] integration. Can be used with -prof perfc2c,
if available.
* JFR profiler integration. Can be used with -prof jfr, pass
-prof jfr:help to look for the accepted options.

Full details:
* 1.24: https://mail.openjdk.java.net/pipermail/jmh-dev/2020-August/002982.html
* 1.25: https://mail.openjdk.java.net/pipermail/jmh-dev/2020-August/002987.html
* 1.26: https://mail.openjdk.java.net/pipermail/jmh-dev/2020-October/003024.html
* 1.27: https://mail.openjdk.java.net/pipermail/jmh-dev/2020-December/003096.html

Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>, Bill Bejeck <bbejeck@gmail.com>, Lucas Bradstreet <lucasbradstreet@gmail.com>
2020-12-10 17:56:52 -08:00
Ismael Juma 1f98112e99
MINOR: Remove connection id from Send and consolidate request/message utils (#9714)
Connection id is now only present in `NetworkSend`, which is now
the class used by `Selector`/`NetworkClient`/`KafkaChannel` (which
works well since `NetworkReceive` is the class used for
received data).

The previous `NetworkSend` was also responsible for adding a size
prefix. This logic is already present in `SendBuilder`, but for the
minority of cases where `SendBuilder` is not used (including
a number of tests), we now have `ByteBufferSend.sizePrefixed()`.

With regards to the request/message utilities:
* Renamed `toByteBuffer`/`toBytes` in `MessageUtil` to
`toVersionPrefixedByteBuffer`/`toVersionPrefixedBytes` for clarity.
* Introduced new `MessageUtil.toByteBuffer` that does not include
the version as the prefix.
* Renamed `serializeBody` in `AbstractRequest/Response` to
`serialize` for symmetry with `parse`.
* Introduced `RequestTestUtils` and moved relevant methods from
`TestUtils`.
* Moved `serializeWithHeader` methods that were only used in
tests to `RequestTestUtils`.
* Deleted `MessageTestUtil`.

Finally, a couple of changes to simplify coding patterns:
* Added `flip()` and `buffer()` to `ByteBufferAccessor`.
* Added `MessageSizeAccumulator.sizeExcludingZeroCopy`.
* Used lambdas instead of `TestCondition`.
* Used `Arrays.copyOf` instead of `System.arraycopy` in `MessageUtil`.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Jason Gustafson <jason@confluent.io>
2020-12-09 11:15:58 -08:00
Ismael Juma 6f27bb02da
KAFKA-10818: Skip conversion to `Struct` when serializing generated requests/responses (#7409)
Generated request/response classes have code to serialize/deserialize directly to
`ByteBuffer` so the intermediate conversion to `Struct` can be skipped for them.
We have recently completed the transition to generated request/response classes,
so we can also remove the `Struct` based fallbacks.

Additional noteworthy changes:
* `AbstractRequest.parseRequest` has a more efficient computation of request size that
relies on the received buffer instead of the parsed `Struct`.
* Use `SendBuilder` for `AbstractRequest/Response` `toSend`, made the superclass
implementation final and removed the overrides that are no longer necessary.
* Removed request/response constructors that assume latest version as they are unsafe
outside of tests.
* Removed redundant version fields in requests/responses.
* Removed unnecessary work in `OffsetFetchResponse`'s constructor when version >= 2.
* Made `AbstractResponse.throttleTimeMs()` abstract.
* Using `toSend` in `SaslClientAuthenticator` instead of `serialize`.
* Various changes in Request/Response classes to make them more consistent and to
rely on the Data classes as much as possible when it comes to their state.
* Remove the version argument from `AbstractResponse.toString`.
* Fix `getErrorResponse` for `ProduceRequest` and `DescribeClientQuotasRequest` to
use `ApiError` which processes the error message sent back to the clients. This was
uncovered by an accidental fix to a `RequestResponseTest` test (it was calling
`AbstractResponse.toString` instead of `AbstractResponse.toString(short)`).

Rely on existing protocol tests to ensure this refactoring does not change 
observed behavior (aside from improved performance).

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
2020-12-07 15:39:57 -08:00
David Arthur 633f7cff19
KAFKA-10799 AlterIsr utilizes ReplicaManager ISR metrics (#9677)
Add small interface to Partition.scala that allows AlterIsr and ZK code paths to update the ISR metrics managed by ReplicaManager. This opens the door for consolidating even more code between the two ISR update code paths.
2020-12-03 16:11:07 -05:00
David Jacot 10364e4b0c
KAFKA-10739; Replace EpochEndOffset with automated protocol (#9630)
This patch follows up https://github.com/apache/kafka/pull/9547. It refactors KafkaApis, ReplicaManager and Partition to use `OffsetForLeaderEpochResponseData.EpochEndOffset` instead of `EpochEndOffset`. In the mean time, it removes `OffsetsForLeaderEpochRequest#epochsByTopicPartition` and `OffsetsForLeaderEpochResponse#responses` and replaces their usages to use the automated protocol directly. Finally, it removes old constructors in `OffsetsForLeaderEpochResponse`. The patch relies on existing tests.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Jason Gustafson <jason@confluent.io>
2020-12-03 18:50:29 +01:00
Rajini Sivaram 7ecc3a579a
KAFKA-10554; Perform follower truncation based on diverging epochs in Fetch response (#9382)
From IBP 2.7 onwards, fetch responses include diverging epoch and offset in fetch responses if lastFetchedEpoch is provided in the fetch request. This PR uses that information for truncation and avoids the additional OffsetForLeaderEpoch requests in followers when lastFetchedEpoch is known.

Co-authored-by: Jason Gustafson <jason@confluent.io>

Reviewers: Jason Gustafson <jason@confluent.io>, Nikhil Bhatia <rite2nikhil@gmail.com>
2020-12-03 10:12:06 +00:00
Jason Gustafson 6054837c0a
MINOR: Reduce sends created by `SendBuilder` (#9619)
This patch changes the grouping of `Send` objects created by `SendBuilder` in order to reduce the number of generated `Send` objects and thereby the number of system writes.

Reviewers: David Jacot <djacot@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
2020-11-19 12:45:23 -08:00
Chia-Ping Tsai 30bc21ca35
KAFKA-9628; Replace Produce request/response with automated protocol (#9401)
This patch rewrites `ProduceRequest` and `ProduceResponse` using the generated protocols. We have also added several new benchmarks to verify no regression in performance. A summary of results is included below:

### Benchmark

1. loop **30** times
1. calculate average

#### kafkatest.benchmarks.core.benchmark_test.Benchmark.test_producer_throughput

> @cluster(num_nodes=5)
> @parametrize(acks=-1, topic=TOPIC_REP_THREE)

- +0.3144915325 %
- 28.08766667 ->  28.1715625 (mb_per_sec)

> @cluster(num_nodes=5)
> @matrix(acks=[1], topic=[TOPIC_REP_THREE], message_size=[100000],compression_type=["none"], security_protocol=['PLAINTEXT'])

- +4.220730323 %
- 157.145 -> 163.7776667 (mb_per_sec)

> @cluster(num_nodes=7)
> @parametrize(acks=1, topic=TOPIC_REP_THREE, num_producers=3)

- +5.996241145%
- 57.64166667 -> 61.098 (mb_per_sec)

> @cluster(num_nodes=5)
> @parametrize(acks=1, topic=TOPIC_REP_THREE)

- +0.3979572536%
- 44.05833333 -> 44.23366667 (mb_per_sec)

> @cluster(num_nodes=5)
> @parametrize(acks=1, topic= TOPIC_REP_ONE)

- +2.228235226%
- 69.23266667 -> 70.77533333 (mb_per_sec)

### JMH results

In short, most ops performance are regression since we have to convert data to protocol data. The cost is inevitable (like other request/response) before we use protocol data directly.

### JMH for ProduceRequest

1. construction regression:
    - 281.474 -> 454.935 ns/op
    - 296.000 -> 1888.000 B/op
1. toErrorResponse regression:
    - 41.942 -> 107.528 ns/op
    - 1216.000 -> 1616.000 B/op
1. toStruct improvement:
    - 255.185 -> 90.728 ns/op
    - 864.000 -> 304.000 B/op

**BEFORE**
```
Benchmark                                                                        Mode  Cnt     Score    Error   Units
ProducerRequestBenchmark.constructorErrorResponse                                avgt   15    41.942 ±  0.036   ns/op
ProducerRequestBenchmark.constructorErrorResponse:·gc.alloc.rate                 avgt   15  6409.263 ±  5.478  MB/sec
ProducerRequestBenchmark.constructorErrorResponse:·gc.alloc.rate.norm            avgt   15   296.000 ±  0.001    B/op
ProducerRequestBenchmark.constructorErrorResponse:·gc.churn.G1_Eden_Space        avgt   15  6416.420 ± 76.071  MB/sec
ProducerRequestBenchmark.constructorErrorResponse:·gc.churn.G1_Eden_Space.norm   avgt   15   296.331 ±  3.539    B/op
ProducerRequestBenchmark.constructorErrorResponse:·gc.churn.G1_Old_Gen           avgt   15     0.002 ±  0.002  MB/sec
ProducerRequestBenchmark.constructorErrorResponse:·gc.churn.G1_Old_Gen.norm      avgt   15    ≈ 10⁻⁴             B/op
ProducerRequestBenchmark.constructorErrorResponse:·gc.count                      avgt   15   698.000           counts
ProducerRequestBenchmark.constructorErrorResponse:·gc.time                       avgt   15   378.000               ms
ProducerRequestBenchmark.constructorProduceRequest                               avgt   15   281.474 ±  3.286   ns/op
ProducerRequestBenchmark.constructorProduceRequest:·gc.alloc.rate                avgt   15  3923.868 ± 46.303  MB/sec
ProducerRequestBenchmark.constructorProduceRequest:·gc.alloc.rate.norm           avgt   15  1216.000 ±  0.001    B/op
ProducerRequestBenchmark.constructorProduceRequest:·gc.churn.G1_Eden_Space       avgt   15  3923.375 ± 59.568  MB/sec
ProducerRequestBenchmark.constructorProduceRequest:·gc.churn.G1_Eden_Space.norm  avgt   15  1215.844 ± 11.184    B/op
ProducerRequestBenchmark.constructorProduceRequest:·gc.churn.G1_Old_Gen          avgt   15     0.004 ±  0.001  MB/sec
ProducerRequestBenchmark.constructorProduceRequest:·gc.churn.G1_Old_Gen.norm     avgt   15     0.001 ±  0.001    B/op
ProducerRequestBenchmark.constructorProduceRequest:·gc.count                     avgt   15   515.000           counts
ProducerRequestBenchmark.constructorProduceRequest:·gc.time                      avgt   15   279.000               ms
ProducerRequestBenchmark.constructorStruct                                       avgt   15   255.185 ±  0.069   ns/op
ProducerRequestBenchmark.constructorStruct:·gc.alloc.rate                        avgt   15  3074.889 ±  0.823  MB/sec
ProducerRequestBenchmark.constructorStruct:·gc.alloc.rate.norm                   avgt   15   864.000 ±  0.001    B/op
ProducerRequestBenchmark.constructorStruct:·gc.churn.G1_Eden_Space               avgt   15  3077.737 ± 31.537  MB/sec
ProducerRequestBenchmark.constructorStruct:·gc.churn.G1_Eden_Space.norm          avgt   15   864.800 ±  8.823    B/op
ProducerRequestBenchmark.constructorStruct:·gc.churn.G1_Old_Gen                  avgt   15     0.003 ±  0.001  MB/sec
ProducerRequestBenchmark.constructorStruct:·gc.churn.G1_Old_Gen.norm             avgt   15     0.001 ±  0.001    B/op
ProducerRequestBenchmark.constructorStruct:·gc.count                             avgt   15   404.000           counts
ProducerRequestBenchmark.constructorStruct:·gc.time                              avgt   15   214.000               ms
```

**AFTER**
```
Benchmark                                                                        Mode  Cnt     Score    Error   Units
ProducerRequestBenchmark.constructorErrorResponse                                avgt   15   107.528 ±  0.270   ns/op
ProducerRequestBenchmark.constructorErrorResponse:·gc.alloc.rate                 avgt   15  4864.899 ± 12.132  MB/sec
ProducerRequestBenchmark.constructorErrorResponse:·gc.alloc.rate.norm            avgt   15   576.000 ±  0.001    B/op
ProducerRequestBenchmark.constructorErrorResponse:·gc.churn.G1_Eden_Space        avgt   15  4868.023 ± 61.943  MB/sec
ProducerRequestBenchmark.constructorErrorResponse:·gc.churn.G1_Eden_Space.norm   avgt   15   576.371 ±  7.331    B/op
ProducerRequestBenchmark.constructorErrorResponse:·gc.churn.G1_Old_Gen           avgt   15     0.005 ±  0.001  MB/sec
ProducerRequestBenchmark.constructorErrorResponse:·gc.churn.G1_Old_Gen.norm      avgt   15     0.001 ±  0.001    B/op
ProducerRequestBenchmark.constructorErrorResponse:·gc.count                      avgt   15   639.000           counts
ProducerRequestBenchmark.constructorErrorResponse:·gc.time                       avgt   15   339.000               ms
ProducerRequestBenchmark.constructorProduceRequest                               avgt   15   454.935 ±  0.332   ns/op
ProducerRequestBenchmark.constructorProduceRequest:·gc.alloc.rate                avgt   15  3769.014 ±  2.767  MB/sec
ProducerRequestBenchmark.constructorProduceRequest:·gc.alloc.rate.norm           avgt   15  1888.000 ±  0.001    B/op
ProducerRequestBenchmark.constructorProduceRequest:·gc.churn.G1_Eden_Space       avgt   15  3763.407 ± 31.530  MB/sec
ProducerRequestBenchmark.constructorProduceRequest:·gc.churn.G1_Eden_Space.norm  avgt   15  1885.190 ± 15.594    B/op
ProducerRequestBenchmark.constructorProduceRequest:·gc.churn.G1_Old_Gen          avgt   15     0.004 ±  0.001  MB/sec
ProducerRequestBenchmark.constructorProduceRequest:·gc.churn.G1_Old_Gen.norm     avgt   15     0.002 ±  0.001    B/op
ProducerRequestBenchmark.constructorProduceRequest:·gc.count                     avgt   15   494.000           counts
ProducerRequestBenchmark.constructorProduceRequest:·gc.time                      avgt   15   264.000               ms
ProducerRequestBenchmark.constructorStruct                                       avgt   15    90.728 ±  0.695   ns/op
ProducerRequestBenchmark.constructorStruct:·gc.alloc.rate                        avgt   15  3043.140 ± 23.246  MB/sec
ProducerRequestBenchmark.constructorStruct:·gc.alloc.rate.norm                   avgt   15   304.000 ±  0.001    B/op
ProducerRequestBenchmark.constructorStruct:·gc.churn.G1_Eden_Space               avgt   15  3047.251 ± 59.638  MB/sec
ProducerRequestBenchmark.constructorStruct:·gc.churn.G1_Eden_Space.norm          avgt   15   304.404 ±  5.034    B/op
ProducerRequestBenchmark.constructorStruct:·gc.churn.G1_Old_Gen                  avgt   15     0.003 ±  0.001  MB/sec
ProducerRequestBenchmark.constructorStruct:·gc.churn.G1_Old_Gen.norm             avgt   15    ≈ 10⁻⁴             B/op
ProducerRequestBenchmark.constructorStruct:·gc.count                             avgt   15   400.000           counts
ProducerRequestBenchmark.constructorStruct:·gc.time                              avgt   15   205.000               ms
```
### JMH for ProduceResponse

1. construction regression:
    - 3.293 -> 303.226 ns/op
    - 24.000 -> 1848.000 B/op
1. toStruct improvement:
    - 825.889 -> 311.725 ns/op
    - 2208.000 -> 896.000 B/op

**BEFORE**

```
Benchmark                                                                          Mode  Cnt     Score    Error   Units
ProducerResponseBenchmark.constructorProduceResponse                               avgt   15     3.293 ±  0.004   ns/op
ProducerResponseBenchmark.constructorProduceResponse:·gc.alloc.rate                avgt   15  6619.731 ±  9.075  MB/sec
ProducerResponseBenchmark.constructorProduceResponse:·gc.alloc.rate.norm           avgt   15    24.000 ±  0.001    B/op
ProducerResponseBenchmark.constructorProduceResponse:·gc.churn.G1_Eden_Space       avgt   15  6618.648 ±  0.153  MB/sec
ProducerResponseBenchmark.constructorProduceResponse:·gc.churn.G1_Eden_Space.norm  avgt   15    23.996 ±  0.033    B/op
ProducerResponseBenchmark.constructorProduceResponse:·gc.churn.G1_Old_Gen          avgt   15     0.003 ±  0.002  MB/sec
ProducerResponseBenchmark.constructorProduceResponse:·gc.churn.G1_Old_Gen.norm     avgt   15    ≈ 10⁻⁵             B/op
ProducerResponseBenchmark.constructorProduceResponse:·gc.count                     avgt   15   720.000           counts
ProducerResponseBenchmark.constructorProduceResponse:·gc.time                      avgt   15   383.000               ms
ProducerResponseBenchmark.constructorStruct                                        avgt   15   825.889 ±  0.638   ns/op
ProducerResponseBenchmark.constructorStruct:·gc.alloc.rate                         avgt   15  2428.000 ±  1.899  MB/sec
ProducerResponseBenchmark.constructorStruct:·gc.alloc.rate.norm                    avgt   15  2208.000 ±  0.001    B/op
ProducerResponseBenchmark.constructorStruct:·gc.churn.G1_Eden_Space                avgt   15  2430.196 ± 55.894  MB/sec
ProducerResponseBenchmark.constructorStruct:·gc.churn.G1_Eden_Space.norm           avgt   15  2210.001 ± 51.009    B/op
ProducerResponseBenchmark.constructorStruct:·gc.churn.G1_Old_Gen                   avgt   15     0.003 ±  0.001  MB/sec
ProducerResponseBenchmark.constructorStruct:·gc.churn.G1_Old_Gen.norm              avgt   15     0.002 ±  0.001    B/op
ProducerResponseBenchmark.constructorStruct:·gc.count                              avgt   15   319.000           counts
ProducerResponseBenchmark.constructorStruct:·gc.time                               avgt   15   166.000               ms
```

**AFTER**

```
Benchmark                                                                          Mode  Cnt     Score    Error   Units
ProducerResponseBenchmark.constructorProduceResponse                               avgt   15   303.226 ±  0.517   ns/op
ProducerResponseBenchmark.constructorProduceResponse:·gc.alloc.rate                avgt   15  5534.940 ±  9.439  MB/sec
ProducerResponseBenchmark.constructorProduceResponse:·gc.alloc.rate.norm           avgt   15  1848.000 ±  0.001    B/op
ProducerResponseBenchmark.constructorProduceResponse:·gc.churn.G1_Eden_Space       avgt   15  5534.046 ± 51.849  MB/sec
ProducerResponseBenchmark.constructorProduceResponse:·gc.churn.G1_Eden_Space.norm  avgt   15  1847.710 ± 18.105    B/op
ProducerResponseBenchmark.constructorProduceResponse:·gc.churn.G1_Old_Gen          avgt   15     0.007 ±  0.001  MB/sec
ProducerResponseBenchmark.constructorProduceResponse:·gc.churn.G1_Old_Gen.norm     avgt   15     0.002 ±  0.001    B/op
ProducerResponseBenchmark.constructorProduceResponse:·gc.count                     avgt   15   602.000           counts
ProducerResponseBenchmark.constructorProduceResponse:·gc.time                      avgt   15   318.000               ms
ProducerResponseBenchmark.constructorStruct                                        avgt   15   311.725 ±  3.132   ns/op
ProducerResponseBenchmark.constructorStruct:·gc.alloc.rate                         avgt   15  2610.602 ± 25.964  MB/sec
ProducerResponseBenchmark.constructorStruct:·gc.alloc.rate.norm                    avgt   15   896.000 ±  0.001    B/op
ProducerResponseBenchmark.constructorStruct:·gc.churn.G1_Eden_Space                avgt   15  2613.021 ± 42.965  MB/sec
ProducerResponseBenchmark.constructorStruct:·gc.churn.G1_Eden_Space.norm           avgt   15   896.824 ± 11.331    B/op
ProducerResponseBenchmark.constructorStruct:·gc.churn.G1_Old_Gen                   avgt   15     0.003 ±  0.001  MB/sec
ProducerResponseBenchmark.constructorStruct:·gc.churn.G1_Old_Gen.norm              avgt   15     0.001 ±  0.001    B/op
ProducerResponseBenchmark.constructorStruct:·gc.count                              avgt   15   343.000           counts
ProducerResponseBenchmark.constructorStruct:·gc.time                               avgt   15   194.000               ms
```

Reviewers: David Jacot <djacot@confluent.io>, Jason Gustafson <jason@confluent.io>
2020-11-18 13:44:21 -08:00
Boyang Chen bb34c5c8cc
KAFKA-10350: add forwarding manager implementation with metrics (#9580)
add forwarding manager implementation with metrics

Reviewers: David Arthur <mumrah@gmail.com>
2020-11-11 23:21:10 -08:00
Boyang Chen 0814e4f645
KAFKA-10181: Use Envelope RPC to do redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics (#9103)
This PR adds support for forwarding of the following RPCs:

AlterConfigs
IncrementalAlterConfigs
AlterClientQuotas
CreateTopics

Co-authored-by: Jason Gustafson <jason@confluent.io>
Reviewers: Jason Gustafson <jason@confluent.io>
2020-11-04 14:21:44 -08:00
Boyang Chen 9f26906fcc
Revert "KAFKA-9705 part 1: add KIP-590 request header fields (#9144)" (#9523)
This reverts commit 21dc5231ce as we decide to use Envelope for redirection instead of initial principal.

Reviewers: Jason Gustafson <jason@confluent.io>
2020-10-28 22:57:10 -07:00
Kowshik Prakasam fb4f297207
KAFKA-10028: Implement write path for feature versioning system (KIP-584) (#9001)
Summary:
In this PR, I have implemented the write path of the feature versioning system (KIP-584). Here is a summary of what's in this PR:

New APIs in org.apache.kafka.clients.admin.Admin interface, and their client and server implementations. These APIs can be used to describe features and update finalized features. These APIs are: Admin#describeFeatures and Admin#updateFeatures.
The write path is provided by the Admin#updateFeatures API. The corresponding server-side implementation is provided in KafkaApis and KafkaController classes. This can be a good place to start the code review.
The write path is supplemented by Admin#describeFeatures client API. This does not translate 1:1 to a server-side API. Instead, under the hood the API makes an explicit ApiVersionsRequest to the Broker to fetch the supported and finalized features.
Implemented a suite of integration tests in UpdateFeaturesTest.scala that thoroughly exercises the various cases in the write path.

Other changes:

The data type of the FinalizedFeaturesEpoch field in ApiVersionsResponse has been modified from int32 to int64. This change is to conform with the latest changes to the KIP explained in the voting thread.
Along the way, the class SupportedFeatures has been renamed to be called BrokerFeatures, and, it now holds both supported features as well as default minimum version levels.
For the purpose of testing, both the BrokerFeatures and FinalizedFeatureCache classes have been changed to be no longer singleton in implementation. Instead, these are now instantiated once and maintained in KafkaServer. The singleton instances are passed around to various classes, as needed.

Reviewers: Boyang Chen <boyang@confluent.io>, Jun Rao <junrao@gmail.com>
2020-10-07 10:23:16 -07:00
David Arthur 57de67db22
KAFKA-8836; Add `AlterISR` RPC and use it for ISR modifications (#9100)
This patch implements [KIP-497](https://cwiki.apache.org/confluence/display/KAFKA/KIP-497%3A+Add+inter-broker+API+to+alter+ISR), which introduces an asynchronous API for partition leaders to update ISR state.

Reviewers: Jason Gustafson <jason@confluent.io>
2020-09-24 16:28:25 -07:00
Chia-Ping Tsai 4b6d8da9fd
KAFKA-10438: Lazy initialization of record header to reduce memory usage (#9223)
There are no checks on the header key so instantiating key (bytes to string) is unnecessary.
One implication is that conversion failures will be detected a bit later, but this is consistent
with how we handle the header value.

**JMH RESULT**

1. ops: +12%
1. The optimization of memory usage is very small as the cost of creating extra ```ByteBuffer``` is
almost same to byte array copy (used to construct ```String```). Using large key results in better
improvement but I don't think large key is common case.

**BEFORE**
```
Benchmark                                                                     (bufferSupplierStr)  (bytes)  (compressionType)  (headerKeySize)  (maxBatchSize)  (maxHeaderSize)  (messageSize)  (messageVersion)   Mode  Cnt        Score      Error   Units
RecordBatchIterationBenchmark.measureValidation                                        NO_CACHING   RANDOM               NONE               10             200                5           1000                 2  thrpt   15  2035938.174 ± 1653.566   ops/s
RecordBatchIterationBenchmark.measureValidation:·gc.alloc.rate.norm                    NO_CACHING   RANDOM               NONE               10             200                5           1000                 2  thrpt   15     2040.000 ±    0.001    B/op
```

```
Benchmark                                                                     (bufferSupplierStr)  (bytes)  (compressionType)  (headerKeySize)  (maxBatchSize)  (maxHeaderSize)  (messageSize)  (messageVersion)   Mode  Cnt        Score      Error   Units
RecordBatchIterationBenchmark.measureValidation                                        NO_CACHING   RANDOM               NONE               30             200                5           1000                 2  thrpt   15  1979193.376 ± 1239.286   ops/s
RecordBatchIterationBenchmark.measureValidation:·gc.alloc.rate.norm                    NO_CACHING   RANDOM               NONE               30             200                5           1000                 2  thrpt   15     2120.000 ±    0.001    B/op
```


**AFTER**

```
Benchmark                                                                     (bufferSupplierStr)  (bytes)  (compressionType)  (headerKeySize)  (maxBatchSize)  (maxHeaderSize)  (messageSize)  (messageVersion)   Mode  Cnt        Score      Error   Units
RecordBatchIterationBenchmark.measureValidation                                        NO_CACHING   RANDOM               NONE               10             200                5           1000                 2  thrpt   15  2289115.973 ± 2661.856   ops/s
RecordBatchIterationBenchmark.measureValidation:·gc.alloc.rate.norm                    NO_CACHING   RANDOM               NONE               10             200                5           1000                 2  thrpt   15     2032.000 ±    0.001    B/op
```

```
Benchmark                                                                     (bufferSupplierStr)  (bytes)  (compressionType)  (headerKeySize)  (maxBatchSize)  (maxHeaderSize)  (messageSize)  (messageVersion)   Mode  Cnt        Score     Error   Units
RecordBatchIterationBenchmark.measureValidation                                        NO_CACHING   RANDOM               NONE               30             200                5           1000                 2  thrpt   15  2222625.706 ± 908.358   ops/s
RecordBatchIterationBenchmark.measureValidation:·gc.alloc.rate.norm                    NO_CACHING   RANDOM               NONE               30             200                5           1000                 2  thrpt   15     2040.000 ±   0.001    B/op
```

Reviewers: Ismael Juma <ismael@juma.me.uk>
2020-09-21 08:03:49 -07:00
David Arthur 1a9697430a
KAFKA-8806 Reduce calls to validateOffsetsIfNeeded (#7222)
Only check if positions need validation if there is new metadata. 

Also fix some inefficient java.util.stream code in the hot path of SubscriptionState.
2020-08-21 10:25:52 -04:00
Boyang Chen 21dc5231ce
KAFKA-9705 part 1: add KIP-590 request header fields (#9144)
Reviewers: Colin P. McCabe <cmccabe@apache.org>, David Jacot <david.jacot@gmail.com>
2020-08-18 13:38:59 -07:00
David Arthur 4cd2396db3
KAFKA-9629 Use generated protocol for Fetch API (#9008)
Refactored FetchRequest and FetchResponse to use the generated message classes for serialization and deserialization. This allows us to bypass unnecessary Struct conversion in a few places. A new "records" type was added to the message protocol which uses BaseRecords as the field type. When sending, we can set a FileRecords instance on the message, and when receiving the message class will use MemoryRecords. 

Also included a few JMH benchmarks which indicate a small performance improvement for requests with high partition counts or small record sizes.

Reviewers: Jason Gustafson <jason@confluent.io>, Boyang Chen <boyang@confluent.io>, David Jacot <djacot@confluent.io>, Lucas Bradstreet <lucas@confluent.io>, Ismael Juma <ismael@juma.me.uk>, Colin P. McCabe <cmccabe@apache.org>
2020-07-30 13:29:39 -04:00
Brian Byrne 99472c54f0
KAFKA-10158: Fix flaky testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress (#9022)
Set `replica.fetch.max.bytes` to `1` and produce multiple record batches to allow
for throttling to take place. This helps avoid a race condition where the
reassignment would complete more quickly than expected causing an
assertion to fail some times.

Reviewers: Lucas Bradstreet <lucas@confluent.io>, Jason Gustafson <jason@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>, Ismael Juma <ismael@juma.me.uk>
2020-07-26 08:28:47 -07:00
David Jacot a5ffd1ca44
KAFKA-10163; Throttle Create Topic, Create Partition and Delete Topic Operations (KIP-599, Part I, Broker Changes) (#8933)
This PR implements the broker side changes of KIP-599, except the changes of the Rate implementation which will be addressed separately. The PR changes/introduces the following:
  - It introduces the protocol changes.
  - It introduces a new quota manager ControllerMutationQuotaManager which is another specialization of the ClientQuotaManager.
  - It enforces the quota in the KafkaApis and in the AdminManager. This part handles new and old clients as described in the KIP.

Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>
2020-07-22 16:38:55 +01:00
Ismael Juma e2e2c628b9
KAFKA-10074: Improve performance of `matchingAcls` (#8769)
This PR reduces allocations by using a plain old `foreach` in
`matchingAcls` and improves `AclSeqs.find` to only search the inner
collections that are required to find a match (instead of searching all
of them).

A recent change (90bbeedf52) in `matchingAcls` to remove `filterKeys` in
favor of filtering inside `flatMap` caused a performance regression in
cases where there are large number of topics, prefix ACLs and
TreeMap.from/to filtering is ineffective. In such cases, we rely on
string comparisons to exclude entries from the ACL cache that are not
relevant.

This issue is not present in any release yet, so we should include the
simple fix in the 2.6 branch.

The original benchmark did not show a performance difference, so I
adjusted the benchmark to stress the relevant code more. More
specifically, `aclCacheSnapshot.from(...).to(...)` returns nearly 20000
entries where each map value contains 1000 AclEntries. Out of the 200k
AclEntries, only 1050 are retained due to the `startsWith` filtering.

This is the case where the implementation in master is least
efficient when compared to the previous version and the version in this
PR.

The adjusted benchmark results for testAuthorizer are 4.532ms for
master, 2.903ms for the previous version and 2.877ms for this PR.
Normalized allocation rate was 593 KB/op for master, 597 KB/op for the
previous version and 101 KB/s for this PR. Full results follow:

master with adjusted benchmark:
```
Benchmark                                                                 (aclCount)  (resourceCount)  Mode  Cnt          Score          Error   Units
AclAuthorizerBenchmark.testAclsIterator                                           50           200000  avgt    5        680.805 ±       44.318   ms/op
AclAuthorizerBenchmark.testAclsIterator:·gc.alloc.rate                            50           200000  avgt    5        549.879 ±       36.259  MB/sec
AclAuthorizerBenchmark.testAclsIterator:·gc.alloc.rate.norm                       50           200000  avgt    5  411457042.000 ±     4805.461    B/op
AclAuthorizerBenchmark.testAclsIterator:·gc.churn.G1_Eden_Space                   50           200000  avgt    5        331.110 ±       95.821  MB/sec
AclAuthorizerBenchmark.testAclsIterator:·gc.churn.G1_Eden_Space.norm              50           200000  avgt    5  247799480.320 ± 72877192.319    B/op
AclAuthorizerBenchmark.testAclsIterator:·gc.churn.G1_Survivor_Space               50           200000  avgt    5          0.891 ±        3.183  MB/sec
AclAuthorizerBenchmark.testAclsIterator:·gc.churn.G1_Survivor_Space.norm          50           200000  avgt    5     667593.387 ±  2369888.357    B/op
AclAuthorizerBenchmark.testAclsIterator:·gc.count                                 50           200000  avgt    5         28.000                 counts
AclAuthorizerBenchmark.testAclsIterator:·gc.time                                  50           200000  avgt    5       3458.000                     ms
AclAuthorizerBenchmark.testAuthorizer                                             50           200000  avgt    5          4.532 ±        0.546   ms/op
AclAuthorizerBenchmark.testAuthorizer:·gc.alloc.rate                              50           200000  avgt    5        119.036 ±       14.261  MB/sec
AclAuthorizerBenchmark.testAuthorizer:·gc.alloc.rate.norm                         50           200000  avgt    5     593524.310 ±       22.452    B/op
AclAuthorizerBenchmark.testAuthorizer:·gc.churn.G1_Eden_Space                     50           200000  avgt    5        117.091 ±     1008.188  MB/sec
AclAuthorizerBenchmark.testAuthorizer:·gc.churn.G1_Eden_Space.norm                50           200000  avgt    5     598574.303 ±  5153905.271    B/op
AclAuthorizerBenchmark.testAuthorizer:·gc.churn.G1_Survivor_Space                 50           200000  avgt    5          0.034 ±        0.291  MB/sec
AclAuthorizerBenchmark.testAuthorizer:·gc.churn.G1_Survivor_Space.norm            50           200000  avgt    5        173.001 ±     1489.593    B/op
AclAuthorizerBenchmark.testAuthorizer:·gc.count                                   50           200000  avgt    5          1.000                 counts
AclAuthorizerBenchmark.testAuthorizer:·gc.time                                    50           200000  avgt    5         13.000                     ms
```

master with filterKeys like 90bbeedf52 and adjusted benchmark:
```
Benchmark                                                                 (aclCount)  (resourceCount)  Mode  Cnt          Score          Error   Units
AclAuthorizerBenchmark.testAclsIterator                                           50           200000  avgt    5        729.163 ±       20.842   ms/op
AclAuthorizerBenchmark.testAclsIterator:·gc.alloc.rate                            50           200000  avgt    5        513.005 ±       13.966  MB/sec
AclAuthorizerBenchmark.testAclsIterator:·gc.alloc.rate.norm                       50           200000  avgt    5  411459778.400 ±     3178.045    B/op
AclAuthorizerBenchmark.testAclsIterator:·gc.churn.G1_Eden_Space                   50           200000  avgt    5        307.041 ±       94.544  MB/sec
AclAuthorizerBenchmark.testAclsIterator:·gc.churn.G1_Eden_Space.norm              50           200000  avgt    5  246385400.686 ± 82294899.881    B/op
AclAuthorizerBenchmark.testAclsIterator:·gc.churn.G1_Survivor_Space               50           200000  avgt    5          1.571 ±        2.590  MB/sec
AclAuthorizerBenchmark.testAclsIterator:·gc.churn.G1_Survivor_Space.norm          50           200000  avgt    5    1258291.200 ±  2063669.849    B/op
AclAuthorizerBenchmark.testAclsIterator:·gc.count                                 50           200000  avgt    5         33.000                 counts
AclAuthorizerBenchmark.testAclsIterator:·gc.time                                  50           200000  avgt    5       3266.000                     ms
AclAuthorizerBenchmark.testAuthorizer                                             50           200000  avgt    5          2.903 ±        0.175   ms/op
AclAuthorizerBenchmark.testAuthorizer:·gc.alloc.rate                              50           200000  avgt    5        187.088 ±       11.301  MB/sec
AclAuthorizerBenchmark.testAuthorizer:·gc.alloc.rate.norm                         50           200000  avgt    5     597962.743 ±       14.237    B/op
AclAuthorizerBenchmark.testAuthorizer:·gc.churn.G1_Eden_Space                     50           200000  avgt    5        118.602 ±     1021.202  MB/sec
AclAuthorizerBenchmark.testAuthorizer:·gc.churn.G1_Eden_Space.norm                50           200000  avgt    5     383359.632 ±  3300842.044    B/op
AclAuthorizerBenchmark.testAuthorizer:·gc.count                                   50           200000  avgt    5          1.000                 counts
AclAuthorizerBenchmark.testAuthorizer:·gc.time                                    50           200000  avgt    5         14.000                     ms
```

This PR with adjusted benchmark:
```
Benchmark                                                                 (aclCount)  (resourceCount)  Mode  Cnt          Score          Error   Units
AclAuthorizerBenchmark.testAclsIterator                                           50           200000  avgt    5        706.774 ±       32.353   ms/op
AclAuthorizerBenchmark.testAclsIterator:·gc.alloc.rate                            50           200000  avgt    5        529.879 ±       25.416  MB/sec
AclAuthorizerBenchmark.testAclsIterator:·gc.alloc.rate.norm                       50           200000  avgt    5  411458751.497 ±     4424.187    B/op
AclAuthorizerBenchmark.testAclsIterator:·gc.churn.G1_Eden_Space                   50           200000  avgt    5        310.559 ±      112.310  MB/sec
AclAuthorizerBenchmark.testAclsIterator:·gc.churn.G1_Eden_Space.norm              50           200000  avgt    5  241364219.611 ± 97317733.967    B/op
AclAuthorizerBenchmark.testAclsIterator:·gc.churn.G1_Old_Gen                      50           200000  avgt    5          0.690 ±        5.937  MB/sec
AclAuthorizerBenchmark.testAclsIterator:·gc.churn.G1_Old_Gen.norm                 50           200000  avgt    5     531278.507 ±  4574468.166    B/op
AclAuthorizerBenchmark.testAclsIterator:·gc.churn.G1_Survivor_Space               50           200000  avgt    5          2.550 ±       17.243  MB/sec
AclAuthorizerBenchmark.testAclsIterator:·gc.churn.G1_Survivor_Space.norm          50           200000  avgt    5    1969325.592 ± 13278191.648    B/op
AclAuthorizerBenchmark.testAclsIterator:·gc.count                                 50           200000  avgt    5         32.000                 counts
AclAuthorizerBenchmark.testAclsIterator:·gc.time                                  50           200000  avgt    5       3489.000                     ms
AclAuthorizerBenchmark.testAuthorizer                                             50           200000  avgt    5          2.877 ±        0.530   ms/op
AclAuthorizerBenchmark.testAuthorizer:·gc.alloc.rate                              50           200000  avgt    5         31.963 ±        5.912  MB/sec
AclAuthorizerBenchmark.testAuthorizer:·gc.alloc.rate.norm                         50           200000  avgt    5     101057.225 ±        9.468    B/op
AclAuthorizerBenchmark.testAuthorizer:·gc.count                                   50           200000  avgt    5            ≈ 0                 counts
```

Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>, Chia-Ping Tsai <chia7712@gmail.com>
2020-06-01 07:01:18 -07:00
Jason Gustafson 2cbf6be54a
KAFKA-9952; Remove immediate fetch completion logic on high watermark updates (#8709)
For KIP-392, we added logic to make sure that high watermark changes are propagated to followers without delay in order to improve end to end latency when fetching from followers. The downside of this change is that it can increase the rate of fetch requests from followers which can have a noticeable impact on performance (see KAFKA-9731). 

To fix that problem, we have previously modified the code so that we only propagate high watermark changes immediately when a replica selector is used (which is not the default). However, leaving this logic around means that it is risky to enable follower fetching since it changes the follower request rate, which can have a big impact on overall broker performance. 

This patch disables immediate propagation of the high watermark more generally. Instead, users can use the max wait time in order to control the worst-case latency. Note that this is typically only a problem anyway for low-throughput clusters since otherwise we will always have a steady rate of high watermark updates.

Reviewers: Ismael Juma <ismael@juma.me.uk>
2020-05-27 14:41:49 -07:00
Lucas Bradstreet cfc34cace5
MINOR: reduce allocations in log start and recovery checkpoints (#8467)
For brokers with replica counts > 4000, allocations from logsByDir become
substantial. logsByDir is called often by LogManager.checkpointLogRecoveryOffsets
and LogManager.checkpointLogStartOffsets. The approach used is similar to the
one from the checkpointHighwatermarks change in
https://github.com/apache/kafka/pull/6741.

Are there better ways to structure out data structure to avoid creating logsByDir on
demand for each checkpoint iteration? This micro-optimization will help as is, but if
we can avoid doing this completely it'd be better.

JMH benchmark results:
```
Before:
Benchmark                                                                      (numPartitions)  (numTopics)   Mode  Cnt        Score        Error   Units
CheckpointBench.measureCheckpointLogStartOffsets                                             3          100  thrpt   15        2.233 ±      0.013  ops/ms
CheckpointBench.measureCheckpointLogStartOffsets:·gc.alloc.rate                              3          100  thrpt   15      477.097 ±     49.731  MB/sec
CheckpointBench.measureCheckpointLogStartOffsets:·gc.alloc.rate.norm                         3          100  thrpt   15   246083.007 ±     33.052    B/op
CheckpointBench.measureCheckpointLogStartOffsets:·gc.churn.G1_Eden_Space                     3          100  thrpt   15      475.683 ±     55.569  MB/sec
CheckpointBench.measureCheckpointLogStartOffsets:·gc.churn.G1_Eden_Space.norm                3          100  thrpt   15   245474.040 ±  14968.328    B/op
CheckpointBench.measureCheckpointLogStartOffsets:·gc.churn.G1_Old_Gen                        3          100  thrpt   15        0.001 ±      0.001  MB/sec
CheckpointBench.measureCheckpointLogStartOffsets:·gc.churn.G1_Old_Gen.norm                   3          100  thrpt   15        0.341 ±      0.268    B/op
CheckpointBench.measureCheckpointLogStartOffsets:·gc.count                                   3          100  thrpt   15      129.000               counts
CheckpointBench.measureCheckpointLogStartOffsets:·gc.time                                    3          100  thrpt   15       52.000                   ms
CheckpointBench.measureCheckpointLogStartOffsets                                             3         1000  thrpt   15        0.572 ±      0.004  ops/ms
CheckpointBench.measureCheckpointLogStartOffsets:·gc.alloc.rate                              3         1000  thrpt   15     1360.240 ±    150.539  MB/sec
CheckpointBench.measureCheckpointLogStartOffsets:·gc.alloc.rate.norm                         3         1000  thrpt   15  2750221.257 ±    891.024    B/op
CheckpointBench.measureCheckpointLogStartOffsets:·gc.churn.G1_Eden_Space                     3         1000  thrpt   15     1362.908 ±    148.799  MB/sec
CheckpointBench.measureCheckpointLogStartOffsets:·gc.churn.G1_Eden_Space.norm                3         1000  thrpt   15  2756395.092 ±  44671.843    B/op
CheckpointBench.measureCheckpointLogStartOffsets:·gc.churn.G1_Old_Gen                        3         1000  thrpt   15        0.017 ±      0.008  MB/sec
CheckpointBench.measureCheckpointLogStartOffsets:·gc.churn.G1_Old_Gen.norm                   3         1000  thrpt   15       33.611 ±     14.401    B/op
CheckpointBench.measureCheckpointLogStartOffsets:·gc.count                                   3         1000  thrpt   15      273.000               counts
CheckpointBench.measureCheckpointLogStartOffsets:·gc.time                                    3         1000  thrpt   15      186.000                   ms
CheckpointBench.measureCheckpointLogStartOffsets                                             3         2000  thrpt   15        0.266 ±      0.002  ops/ms
CheckpointBench.measureCheckpointLogStartOffsets:·gc.alloc.rate                              3         2000  thrpt   15     1342.557 ±    171.260  MB/sec
CheckpointBench.measureCheckpointLogStartOffsets:·gc.alloc.rate.norm                         3         2000  thrpt   15  5877881.729 ±   3695.086    B/op
CheckpointBench.measureCheckpointLogStartOffsets:·gc.churn.G1_Eden_Space                     3         2000  thrpt   15     1343.965 ±    186.069  MB/sec
CheckpointBench.measureCheckpointLogStartOffsets:·gc.churn.G1_Eden_Space.norm                3         2000  thrpt   15  5877788.561 ± 168540.343    B/op
CheckpointBench.measureCheckpointLogStartOffsets:·gc.churn.G1_Old_Gen                        3         2000  thrpt   15        0.081 ±      0.043  MB/sec
CheckpointBench.measureCheckpointLogStartOffsets:·gc.churn.G1_Old_Gen.norm                   3         2000  thrpt   15      351.277 ±    167.006    B/op
CheckpointBench.measureCheckpointLogStartOffsets:·gc.count                                   3         2000  thrpt   15      253.000               counts
CheckpointBench.measureCheckpointLogStartOffsets:·gc.time                                    3         2000  thrpt   15      231.000                   ms
JMH benchmarks done

After:
CheckpointBench.measureCheckpointLogStartOffsets                                             3          100  thrpt   15        2.809 ±     0.129  ops/ms
CheckpointBench.measureCheckpointLogStartOffsets:·gc.alloc.rate                              3          100  thrpt   15      211.248 ±    25.953  MB/sec
CheckpointBench.measureCheckpointLogStartOffsets:·gc.alloc.rate.norm                         3          100  thrpt   15    86533.838 ±  3763.989    B/op
CheckpointBench.measureCheckpointLogStartOffsets:·gc.churn.G1_Eden_Space                     3          100  thrpt   15      211.512 ±    38.669  MB/sec
CheckpointBench.measureCheckpointLogStartOffsets:·gc.churn.G1_Eden_Space.norm                3          100  thrpt   15    86228.552 ±  9590.781    B/op
CheckpointBench.measureCheckpointLogStartOffsets:·gc.churn.G1_Old_Gen                        3          100  thrpt   15       ≈ 10⁻³              MB/sec
CheckpointBench.measureCheckpointLogStartOffsets:·gc.churn.G1_Old_Gen.norm                   3          100  thrpt   15        0.140 ±     0.111    B/op
CheckpointBench.measureCheckpointLogStartOffsets:·gc.count                                   3          100  thrpt   15       57.000              counts
CheckpointBench.measureCheckpointLogStartOffsets:·gc.time                                    3          100  thrpt   15       25.000                  ms
CheckpointBench.measureCheckpointLogStartOffsets                                             3         1000  thrpt   15        1.046 ±     0.030  ops/ms
CheckpointBench.measureCheckpointLogStartOffsets:·gc.alloc.rate                              3         1000  thrpt   15      524.597 ±    74.793  MB/sec
CheckpointBench.measureCheckpointLogStartOffsets:·gc.alloc.rate.norm                         3         1000  thrpt   15   582898.889 ± 37552.262    B/op
CheckpointBench.measureCheckpointLogStartOffsets:·gc.churn.G1_Eden_Space                     3         1000  thrpt   15      519.675 ±    89.754  MB/sec
CheckpointBench.measureCheckpointLogStartOffsets:·gc.churn.G1_Eden_Space.norm                3         1000  thrpt   15   576371.150 ± 55972.955    B/op
CheckpointBench.measureCheckpointLogStartOffsets:·gc.churn.G1_Old_Gen                        3         1000  thrpt   15        0.009 ±     0.005  MB/sec
CheckpointBench.measureCheckpointLogStartOffsets:·gc.churn.G1_Old_Gen.norm                   3         1000  thrpt   15        9.920 ±     5.375    B/op
CheckpointBench.measureCheckpointLogStartOffsets:·gc.count                                   3         1000  thrpt   15      111.000              counts
CheckpointBench.measureCheckpointLogStartOffsets:·gc.time                                    3         1000  thrpt   15       56.000                  ms
CheckpointBench.measureCheckpointLogStartOffsets                                             3         2000  thrpt   15        0.617 ±     0.007  ops/ms
CheckpointBench.measureCheckpointLogStartOffsets:·gc.alloc.rate                              3         2000  thrpt   15      573.061 ±    95.931  MB/sec
CheckpointBench.measureCheckpointLogStartOffsets:·gc.alloc.rate.norm                         3         2000  thrpt   15  1092098.004 ± 75140.633    B/op
CheckpointBench.measureCheckpointLogStartOffsets:·gc.churn.G1_Eden_Space                     3         2000  thrpt   15      572.448 ±    97.960  MB/sec
CheckpointBench.measureCheckpointLogStartOffsets:·gc.churn.G1_Eden_Space.norm                3         2000  thrpt   15  1091290.460 ± 85946.164    B/op
CheckpointBench.measureCheckpointLogStartOffsets:·gc.churn.G1_Old_Gen                        3         2000  thrpt   15        0.010 ±     0.012  MB/sec
CheckpointBench.measureCheckpointLogStartOffsets:·gc.churn.G1_Old_Gen.norm                   3         2000  thrpt   15       19.990 ±    24.407    B/op
CheckpointBench.measureCheckpointLogStartOffsets:·gc.count                                   3         2000  thrpt   15      109.000              counts
CheckpointBench.measureCheckpointLogStartOffsets:·gc.time                                    3         2000  thrpt   15       67.000                  ms
JMH benchmarks done

```

For the 2000 topic, 3 partition case, we see a reduction in normalized allocations from 5877881B/op to 1284190.774B/op, a reduction of 78%.

Some allocation profiles from a mid sized broker follow. I have seen worse, but these
add up to around 3.8% on a broker that saw GC overhead in CPU time of around 30%.
You could argue that this is relatively small, but it seems worthwhile for a low risk change.

![image](https://user-images.githubusercontent.com/252189/79058104-33e91d80-7c1e-11ea-99c9-0cf2e3571e1f.png)
![image](https://user-images.githubusercontent.com/252189/79058105-38add180-7c1e-11ea-8bfd-6e6eafb0c794.png)

Reviewers: Ismael Juma <ismael@juma.me.uk>
2020-04-25 13:54:59 -07:00
Lucas Bradstreet 37990f9099
MINOR: fix inaccurate RecordBatchIterationBenchmark.measureValidation benchmark (#8428)
KAFKA-9820 (https://github.com/apache/kafka/pull/8422) added a benchmark of LogValidator.validateMessagesAndAssignOffsetsCompressed. Unfortunately it instantiated BrokerTopicStats within the benchmark itself, and it is expensive. The fixed benchmark does not change the outcome of the improvement in KAFKA-9820, and actually increases the magnitude of the improvement in percentage terms.

```
Updated benchmark before KAFKA-9820:
Benchmark                                                                     (bufferSupplierStr)  (bytes)  (compressionType)  (maxBatchSize)  (messageSize)  (messageVersion)   Mode  Cnt       Score      Error   Units
RecordBatchIterationBenchmark.measureValidation                                        NO_CACHING   RANDOM                LZ4               1           1000                 2  thrpt   15  164173.236 ± 2927.701   ops/s
RecordBatchIterationBenchmark.measureValidation:·gc.alloc.rate                         NO_CACHING   RANDOM                LZ4               1           1000                 2  thrpt   15   20440.980 ±  364.411  MB/sec
RecordBatchIterationBenchmark.measureValidation:·gc.alloc.rate.norm                    NO_CACHING   RANDOM                LZ4               1           1000                 2  thrpt   15  137120.002 ±    0.002    B/op
RecordBatchIterationBenchmark.measureValidation:·gc.churn.G1_Eden_Space                NO_CACHING   RANDOM                LZ4               1           1000                 2  thrpt   15   20708.378 ±  372.041  MB/sec
RecordBatchIterationBenchmark.measureValidation:·gc.churn.G1_Eden_Space.norm           NO_CACHING   RANDOM                LZ4               1           1000                 2  thrpt   15  138913.935 ±  398.960    B/op
RecordBatchIterationBenchmark.measureValidation:·gc.churn.G1_Old_Gen                   NO_CACHING   RANDOM                LZ4               1           1000                 2  thrpt   15       0.547 ±    0.107  MB/sec
RecordBatchIterationBenchmark.measureValidation:·gc.churn.G1_Old_Gen.norm              NO_CACHING   RANDOM                LZ4               1           1000                 2  thrpt   15       3.664 ±    0.689    B/op
RecordBatchIterationBenchmark.measureValidation:·gc.count                              NO_CACHING   RANDOM                LZ4               1           1000                 2  thrpt   15    2713.000             counts
RecordBatchIterationBenchmark.measureValidation:·gc.time                               NO_CACHING   RANDOM                LZ4               1           1000                 2  thrpt   15    1398.000                 ms
RecordBatchIterationBenchmark.measureValidation                                        NO_CACHING   RANDOM                LZ4               2           1000                 2  thrpt   15  164305.533 ± 5143.457   ops/s
RecordBatchIterationBenchmark.measureValidation:·gc.alloc.rate                         NO_CACHING   RANDOM                LZ4               2           1000                 2  thrpt   15   20490.828 ±  641.408  MB/sec
RecordBatchIterationBenchmark.measureValidation:·gc.alloc.rate.norm                    NO_CACHING   RANDOM                LZ4               2           1000                 2  thrpt   15  137328.002 ±    0.002    B/op
RecordBatchIterationBenchmark.measureValidation:·gc.churn.G1_Eden_Space                NO_CACHING   RANDOM                LZ4               2           1000                 2  thrpt   15   20767.922 ±  648.843  MB/sec
RecordBatchIterationBenchmark.measureValidation:·gc.churn.G1_Eden_Space.norm           NO_CACHING   RANDOM                LZ4               2           1000                 2  thrpt   15  139185.616 ±  325.790    B/op
RecordBatchIterationBenchmark.measureValidation:·gc.churn.G1_Old_Gen                   NO_CACHING   RANDOM                LZ4               2           1000                 2  thrpt   15       0.681 ±    0.053  MB/sec
RecordBatchIterationBenchmark.measureValidation:·gc.churn.G1_Old_Gen.norm              NO_CACHING   RANDOM                LZ4               2           1000                 2  thrpt   15       4.560 ±    0.292    B/op
RecordBatchIterationBenchmark.measureValidation:·gc.count                              NO_CACHING   RANDOM                LZ4               2           1000                 2  thrpt   15    3101.000             counts
RecordBatchIterationBenchmark.measureValidation:·gc.time                               NO_CACHING   RANDOM                LZ4               2           1000                 2  thrpt   15    1538.000                 ms
RecordBatchIterationBenchmark.measureValidation                                        NO_CACHING   RANDOM                LZ4              10           1000                 2  thrpt   15  169572.635 ±  595.613   ops/s
RecordBatchIterationBenchmark.measureValidation:·gc.alloc.rate                         NO_CACHING   RANDOM                LZ4              10           1000                 2  thrpt   15   21129.934 ±   74.618  MB/sec
RecordBatchIterationBenchmark.measureValidation:·gc.alloc.rate.norm                    NO_CACHING   RANDOM                LZ4              10           1000                 2  thrpt   15  137216.002 ±    0.002    B/op
RecordBatchIterationBenchmark.measureValidation:·gc.churn.G1_Eden_Space                NO_CACHING   RANDOM                LZ4              10           1000                 2  thrpt   15   21410.416 ±   70.458  MB/sec
RecordBatchIterationBenchmark.measureValidation:·gc.churn.G1_Eden_Space.norm           NO_CACHING   RANDOM                LZ4              10           1000                 2  thrpt   15  139037.806 ±  309.278    B/op
RecordBatchIterationBenchmark.measureValidation:·gc.churn.G1_Old_Gen                   NO_CACHING   RANDOM                LZ4              10           1000                 2  thrpt   15       0.312 ±    0.420  MB/sec
RecordBatchIterationBenchmark.measureValidation:·gc.churn.G1_Old_Gen.norm              NO_CACHING   RANDOM                LZ4              10           1000                 2  thrpt   15       2.026 ±    2.725    B/op
RecordBatchIterationBenchmark.measureValidation:·gc.count                              NO_CACHING   RANDOM                LZ4              10           1000                 2  thrpt   15    3398.000             counts
RecordBatchIterationBenchmark.measureValidation:·gc.time                               NO_CACHING   RANDOM                LZ4              10           1000                 2  thrpt   15    1701.000                 ms
JMH benchmarks done


Updated benchmark after KAFKA-9820:
Benchmark                                                                     (bufferSupplierStr)  (bytes)  (compressionType)  (maxBatchSize)  (messageSize)  (messageVersion)   Mode  Cnt       Score     Error   Units
RecordBatchIterationBenchmark.measureValidation                                        NO_CACHING   RANDOM                LZ4               1           1000                 2  thrpt   15  322678.586 ± 254.126   ops/s
RecordBatchIterationBenchmark.measureValidation:·gc.alloc.rate                         NO_CACHING   RANDOM                LZ4               1           1000                 2  thrpt   15   20376.474 ±  15.326  MB/sec
RecordBatchIterationBenchmark.measureValidation:·gc.alloc.rate.norm                    NO_CACHING   RANDOM                LZ4               1           1000                 2  thrpt   15   69544.001 ±   0.001    B/op
RecordBatchIterationBenchmark.measureValidation:·gc.churn.G1_Eden_Space                NO_CACHING   RANDOM                LZ4               1           1000                 2  thrpt   15   20485.394 ±  44.087  MB/sec
RecordBatchIterationBenchmark.measureValidation:·gc.churn.G1_Eden_Space.norm           NO_CACHING   RANDOM                LZ4               1           1000                 2  thrpt   15   69915.744 ± 143.372    B/op
RecordBatchIterationBenchmark.measureValidation:·gc.churn.G1_Old_Gen                   NO_CACHING   RANDOM                LZ4               1           1000                 2  thrpt   15       0.027 ±   0.002  MB/sec
RecordBatchIterationBenchmark.measureValidation:·gc.churn.G1_Old_Gen.norm              NO_CACHING   RANDOM                LZ4               1           1000                 2  thrpt   15       0.091 ±   0.008    B/op
RecordBatchIterationBenchmark.measureValidation:·gc.count                              NO_CACHING   RANDOM                LZ4               1           1000                 2  thrpt   15    3652.000            counts
RecordBatchIterationBenchmark.measureValidation:·gc.time                               NO_CACHING   RANDOM                LZ4               1           1000                 2  thrpt   15    1773.000                ms
RecordBatchIterationBenchmark.measureValidation                                        NO_CACHING   RANDOM                LZ4               2           1000                 2  thrpt   15  321332.070 ± 869.841   ops/s
RecordBatchIterationBenchmark.measureValidation:·gc.alloc.rate                         NO_CACHING   RANDOM                LZ4               2           1000                 2  thrpt   15   20303.259 ±  55.609  MB/sec
RecordBatchIterationBenchmark.measureValidation:·gc.alloc.rate.norm                    NO_CACHING   RANDOM                LZ4               2           1000                 2  thrpt   15   69600.001 ±   0.001    B/op
RecordBatchIterationBenchmark.measureValidation:·gc.churn.G1_Eden_Space                NO_CACHING   RANDOM                LZ4               2           1000                 2  thrpt   15   20394.052 ±  72.842  MB/sec
RecordBatchIterationBenchmark.measureValidation:·gc.churn.G1_Eden_Space.norm           NO_CACHING   RANDOM                LZ4               2           1000                 2  thrpt   15   69911.238 ± 160.177    B/op
RecordBatchIterationBenchmark.measureValidation:·gc.churn.G1_Old_Gen                   NO_CACHING   RANDOM                LZ4               2           1000                 2  thrpt   15       0.028 ±   0.003  MB/sec
RecordBatchIterationBenchmark.measureValidation:·gc.churn.G1_Old_Gen.norm              NO_CACHING   RANDOM                LZ4               2           1000                 2  thrpt   15       0.096 ±   0.010    B/op
RecordBatchIterationBenchmark.measureValidation:·gc.count                              NO_CACHING   RANDOM                LZ4               2           1000                 2  thrpt   15    3637.000            counts
RecordBatchIterationBenchmark.measureValidation:·gc.time                               NO_CACHING   RANDOM                LZ4               2           1000                 2  thrpt   15    1790.000                ms
RecordBatchIterationBenchmark.measureValidation                                        NO_CACHING   RANDOM                LZ4              10           1000                 2  thrpt   15  315490.355 ± 271.921   ops/s
RecordBatchIterationBenchmark.measureValidation:·gc.alloc.rate                         NO_CACHING   RANDOM                LZ4              10           1000                 2  thrpt   15   19943.166 ±  21.235  MB/sec
RecordBatchIterationBenchmark.measureValidation:·gc.alloc.rate.norm                    NO_CACHING   RANDOM                LZ4              10           1000                 2  thrpt   15   69640.001 ±   0.001    B/op
RecordBatchIterationBenchmark.measureValidation:·gc.churn.G1_Eden_Space                NO_CACHING   RANDOM                LZ4              10           1000                 2  thrpt   15   20020.263 ±  43.144  MB/sec
RecordBatchIterationBenchmark.measureValidation:·gc.churn.G1_Eden_Space.norm           NO_CACHING   RANDOM                LZ4              10           1000                 2  thrpt   15   69909.228 ± 136.413    B/op
RecordBatchIterationBenchmark.measureValidation:·gc.churn.G1_Old_Gen                   NO_CACHING   RANDOM                LZ4              10           1000                 2  thrpt   15       0.026 ±   0.002  MB/sec
RecordBatchIterationBenchmark.measureValidation:·gc.churn.G1_Old_Gen.norm              NO_CACHING   RANDOM                LZ4              10           1000                 2  thrpt   15       0.090 ±   0.008    B/op
RecordBatchIterationBenchmark.measureValidation:·gc.count                              NO_CACHING   RANDOM                LZ4              10           1000                 2  thrpt   15    3571.000            counts
RecordBatchIterationBenchmark.measureValidation:·gc.time                               NO_CACHING   RANDOM                LZ4              10           1000                 2  thrpt   15    1764.000                ms
```

Reviewers: Ismael Juma <ismael@juma.me.uk>
2020-04-05 14:24:54 -07:00
Lucas Bradstreet 46540eb5e0
KAFKA-9820: validateMessagesAndAssignOffsetsCompressed allocates unused iterator (#8422)
3e9d1c1411 introduced skipKeyValueIterator(s) which were intended to be used, but in this case were created but were not used in offset validation.

A subset of the benchmark results follow. Looks like a 20% improvement in validation performance and a 40% reduction in garbage allocation for 1-2 batch sizes.

**# Parameters: (bufferSupplierStr = NO_CACHING, bytes = RANDOM, compressionType = LZ4, maxBatchSize = 1, messageSize = 1000, messageVersion = 2)**

Before:
Result "org.apache.kafka.jmh.record.RecordBatchIterationBenchmark.measureValidation":
  64851.837 ±(99.9%) 944.248 ops/s [Average]              
  (min, avg, max) = (64505.317, 64851.837, 65114.359), stdev = 245.218
  CI (99.9%): [63907.589, 65796.084] (assumes normal distribution)                                       
                                                             
"org.apache.kafka.jmh.record.RecordBatchIterationBenchmark.measureValidation:·gc.alloc.rate.norm":
  164088.003 ±(99.9%) 0.004 B/op [Average]                                                                                 
  (min, avg, max) = (164088.001, 164088.003, 164088.004), stdev = 0.001
  CI (99.9%): [164087.998, 164088.007] (assumes normal distribution)

After:

Result "org.apache.kafka.jmh.record.RecordBatchIterationBenchmark.measureValidation":                                      
  78910.273 ±(99.9%) 707.024 ops/s [Average]                                                                               
  (min, avg, max) = (78785.486, 78910.273, 79234.007), stdev = 183.612                                                     
  CI (99.9%): [78203.249, 79617.297] (assumes normal distribution)                                       

"org.apache.kafka.jmh.record.RecordBatchIterationBenchmark.measureValidation:·gc.alloc.rate.norm":                                                                                                                                   
  96440.002 ±(99.9%) 0.001 B/op [Average]                                                                                  
  (min, avg, max) = (96440.002, 96440.002, 96440.002), stdev = 0.001                                                       
  CI (99.9%): [96440.002, 96440.003] (assumes normal distribution)   

 **# Parameters: (bufferSupplierStr = NO_CACHING, bytes = RANDOM, compressionType = LZ4, maxBatchSize = 2, messageSize = 1000, messageVersion = 2)**

Before:
Result "org.apache.kafka.jmh.record.RecordBatchIterationBenchmark.measureValidation":                                      
  64815.364 ±(99.9%) 639.309 ops/s [Average]                                                                               
  (min, avg, max) = (64594.545, 64815.364, 64983.305), stdev = 166.026                                                                                                                                                                                
  CI (99.9%): [64176.056, 65454.673] (assumes normal distribution)                                                         
                                                                                                                                                                                        "org.apache.kafka.jmh.record.RecordBatchIterationBenchmark.measureValidation:·gc.alloc.rate.norm":        
  163944.003 ±(99.9%) 0.001 B/op [Average]                                                                                 
  (min, avg, max) = (163944.002, 163944.003, 163944.003), stdev = 0.001                                                    
  CI (99.9%): [163944.002, 163944.004] (assumes normal distribution)                                     

After:
Result "org.apache.kafka.jmh.record.RecordBatchIterationBenchmark.measureValidation":
  77075.096 ±(99.9%) 201.092 ops/s [Average]              
  (min, avg, max) = (77021.537, 77075.096, 77129.693), stdev = 52.223
  CI (99.9%): [76874.003, 77276.188] (assumes normal distribution)                                       
                                                             
"org.apache.kafka.jmh.record.RecordBatchIterationBenchmark.measureValidation:·gc.alloc.rate.norm":
  96504.002 ±(99.9%) 0.003 B/op [Average]                                                                                  
  (min, avg, max) = (96504.001, 96504.002, 96504.003), stdev = 0.001
  CI (99.9%): [96503.999, 96504.005] (assumes normal distribution)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Ismael Juma <ismael@juma.me.uk>
2020-04-04 10:05:51 -07:00
Ismael Juma 222726d6f9
KAFKA-9373: Reduce shutdown time by avoiding unnecessary loading of indexes (#8346)
KAFKA-7283 enabled lazy mmap on index files by initializing indices
on-demand rather than performing costly disk/memory operations when
creating all indices on broker startup. This helped reducing the startup
time of brokers. However, segment indices are still created on closing
segments, regardless of whether they need to be closed or not.

This is a cleaned up version of #7900, which was submitted by @efeg. It
eliminates unnecessary disk accesses and memory map operations while
deleting, renaming or closing offset and time indexes.

In a cluster with 31 brokers, where each broker has 13K to 20K segments,
@efeg and team observed up to 2 orders of magnitude faster LogManager
shutdown times - i.e. dropping the LogManager shutdown time of each
broker from 10s of seconds to 100s of milliseconds.

To avoid confusion between `renameTo` and `setFile`, I replaced the
latter with the more restricted updateParentDir` (it turns out that's
all we need).

Reviewers: Jun Rao <junrao@gmail.com>, Andrew Choi <a24choi@edu.uwaterloo.ca>

Co-authored-by: Adem Efe Gencer <agencer@linkedin.com>
Co-authored-by: Ismael Juma <ismael@juma.me.uk>
2020-03-26 04:26:51 -07:00
Gardner Vickers 8cf781ef01
MINOR: Improve performance of checkpointHighWatermarks, patch 1/2 (#6741)
This PR works to improve high watermark checkpointing performance.

`ReplicaManager.checkpointHighWatermarks()` was found to be a major contributor to GC pressure, especially on Kafka clusters with high partition counts and low throughput.

Added a JMH benchmark for `checkpointHighWatermarks` which establishes a
performance baseline. The parameterized benchmark was run with 100, 1000 and
2000 topics. 

Modified `ReplicaManager.checkpointHighWatermarks()` to avoid extra copies and cached
the Log parent directory Sting to avoid frequent allocations when calculating
`File.getParent()`.

A few clean-ups:
* Changed all usages of Log.dir.getParent to Log.parentDir and Log.dir.getParentFile to
Log.parentDirFile.
* Only expose public accessor for `Log.dir` (consistent with `Log.parentDir`)
* Removed unused parameters in `Partition.makeLeader`, `Partition.makeFollower` and `Partition.createLogIfNotExists`.

Benchmark results:

| Topic Count | Ops/ms | MB/sec allocated |
|-------------|---------|------------------|
| 100               | + 51%    |  - 91% |
| 1000             | + 143% |  - 49% |
| 2000            | + 149% |   - 50% |

Reviewers: Lucas Bradstreet <lucas@confluent.io>. Ismael Juma <ismael@juma.me.uk>

Co-authored-by: Gardner Vickers <gardner@vickers.me>
Co-authored-by: Ismael Juma <ismael@juma.me.uk>
2020-03-25 20:53:42 -07:00
Manikumar Reddy a0e1407820
KAFKA-9670; Reduce allocations in Metadata Response preparation (#8236)
This PR removes  intermediate  conversions between `MetadataResponse.TopicMetadata` => `MetadataResponseTopic` and `MetadataResponse.PartitionMetadata` => `MetadataResponsePartition` objects.

There is 15-20% reduction in object allocations and 5-10% improvement in metadata request performance.

Reviewers: Ismael Juma <ismael@juma.me.uk>, Jason Gustafson<jason@confluent.io>
2020-03-16 09:30:48 -07:00