Commit Graph

25 Commits

Author SHA1 Message Date
Lucas Bradstreet 37990f9099
MINOR: fix inaccurate RecordBatchIterationBenchmark.measureValidation benchmark (#8428)
KAFKA-9820 (https://github.com/apache/kafka/pull/8422) added a benchmark of LogValidator.validateMessagesAndAssignOffsetsCompressed. Unfortunately, it instantiated BrokerTopicStats within the benchmark method itself, which is expensive. The fixed benchmark does not change the outcome of the improvement in KAFKA-9820; in fact, it increases the magnitude of the improvement in percentage terms.
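As a hedged sketch of the fix pattern (the real benchmark lives in jmh-benchmarks and carries more state; `ExpensiveStats` below is a stand-in for BrokerTopicStats), JMH expects expensive object construction to happen in a `@Setup` method on the benchmark `@State`, so the cost is paid once per trial instead of inside the measured method:

```java
import org.openjdk.jmh.annotations.*;

// Sketch only: ExpensiveStats stands in for BrokerTopicStats, which registers
// many metrics and is therefore costly to construct.
@State(Scope.Benchmark)
public class ValidationBenchmarkSketch {
    static final class ExpensiveStats {
        ExpensiveStats() { /* imagine heavy metric registration here */ }
    }

    private ExpensiveStats stats;

    @Setup(Level.Trial)
    public void setUp() {
        stats = new ExpensiveStats(); // constructed once, outside the measurement
    }

    @Benchmark
    public Object measureValidation() {
        // Before the fix, the equivalent of `new ExpensiveStats()` sat here
        // and dominated the score.
        return stats; // the real benchmark validates records using the pre-built stats
    }
}
```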

```
Updated benchmark before KAFKA-9820:
Benchmark                                                                     (bufferSupplierStr)  (bytes)  (compressionType)  (maxBatchSize)  (messageSize)  (messageVersion)   Mode  Cnt       Score      Error   Units
RecordBatchIterationBenchmark.measureValidation                                        NO_CACHING   RANDOM                LZ4               1           1000                 2  thrpt   15  164173.236 ± 2927.701   ops/s
RecordBatchIterationBenchmark.measureValidation:·gc.alloc.rate                         NO_CACHING   RANDOM                LZ4               1           1000                 2  thrpt   15   20440.980 ±  364.411  MB/sec
RecordBatchIterationBenchmark.measureValidation:·gc.alloc.rate.norm                    NO_CACHING   RANDOM                LZ4               1           1000                 2  thrpt   15  137120.002 ±    0.002    B/op
RecordBatchIterationBenchmark.measureValidation:·gc.churn.G1_Eden_Space                NO_CACHING   RANDOM                LZ4               1           1000                 2  thrpt   15   20708.378 ±  372.041  MB/sec
RecordBatchIterationBenchmark.measureValidation:·gc.churn.G1_Eden_Space.norm           NO_CACHING   RANDOM                LZ4               1           1000                 2  thrpt   15  138913.935 ±  398.960    B/op
RecordBatchIterationBenchmark.measureValidation:·gc.churn.G1_Old_Gen                   NO_CACHING   RANDOM                LZ4               1           1000                 2  thrpt   15       0.547 ±    0.107  MB/sec
RecordBatchIterationBenchmark.measureValidation:·gc.churn.G1_Old_Gen.norm              NO_CACHING   RANDOM                LZ4               1           1000                 2  thrpt   15       3.664 ±    0.689    B/op
RecordBatchIterationBenchmark.measureValidation:·gc.count                              NO_CACHING   RANDOM                LZ4               1           1000                 2  thrpt   15    2713.000             counts
RecordBatchIterationBenchmark.measureValidation:·gc.time                               NO_CACHING   RANDOM                LZ4               1           1000                 2  thrpt   15    1398.000                 ms
RecordBatchIterationBenchmark.measureValidation                                        NO_CACHING   RANDOM                LZ4               2           1000                 2  thrpt   15  164305.533 ± 5143.457   ops/s
RecordBatchIterationBenchmark.measureValidation:·gc.alloc.rate                         NO_CACHING   RANDOM                LZ4               2           1000                 2  thrpt   15   20490.828 ±  641.408  MB/sec
RecordBatchIterationBenchmark.measureValidation:·gc.alloc.rate.norm                    NO_CACHING   RANDOM                LZ4               2           1000                 2  thrpt   15  137328.002 ±    0.002    B/op
RecordBatchIterationBenchmark.measureValidation:·gc.churn.G1_Eden_Space                NO_CACHING   RANDOM                LZ4               2           1000                 2  thrpt   15   20767.922 ±  648.843  MB/sec
RecordBatchIterationBenchmark.measureValidation:·gc.churn.G1_Eden_Space.norm           NO_CACHING   RANDOM                LZ4               2           1000                 2  thrpt   15  139185.616 ±  325.790    B/op
RecordBatchIterationBenchmark.measureValidation:·gc.churn.G1_Old_Gen                   NO_CACHING   RANDOM                LZ4               2           1000                 2  thrpt   15       0.681 ±    0.053  MB/sec
RecordBatchIterationBenchmark.measureValidation:·gc.churn.G1_Old_Gen.norm              NO_CACHING   RANDOM                LZ4               2           1000                 2  thrpt   15       4.560 ±    0.292    B/op
RecordBatchIterationBenchmark.measureValidation:·gc.count                              NO_CACHING   RANDOM                LZ4               2           1000                 2  thrpt   15    3101.000             counts
RecordBatchIterationBenchmark.measureValidation:·gc.time                               NO_CACHING   RANDOM                LZ4               2           1000                 2  thrpt   15    1538.000                 ms
RecordBatchIterationBenchmark.measureValidation                                        NO_CACHING   RANDOM                LZ4              10           1000                 2  thrpt   15  169572.635 ±  595.613   ops/s
RecordBatchIterationBenchmark.measureValidation:·gc.alloc.rate                         NO_CACHING   RANDOM                LZ4              10           1000                 2  thrpt   15   21129.934 ±   74.618  MB/sec
RecordBatchIterationBenchmark.measureValidation:·gc.alloc.rate.norm                    NO_CACHING   RANDOM                LZ4              10           1000                 2  thrpt   15  137216.002 ±    0.002    B/op
RecordBatchIterationBenchmark.measureValidation:·gc.churn.G1_Eden_Space                NO_CACHING   RANDOM                LZ4              10           1000                 2  thrpt   15   21410.416 ±   70.458  MB/sec
RecordBatchIterationBenchmark.measureValidation:·gc.churn.G1_Eden_Space.norm           NO_CACHING   RANDOM                LZ4              10           1000                 2  thrpt   15  139037.806 ±  309.278    B/op
RecordBatchIterationBenchmark.measureValidation:·gc.churn.G1_Old_Gen                   NO_CACHING   RANDOM                LZ4              10           1000                 2  thrpt   15       0.312 ±    0.420  MB/sec
RecordBatchIterationBenchmark.measureValidation:·gc.churn.G1_Old_Gen.norm              NO_CACHING   RANDOM                LZ4              10           1000                 2  thrpt   15       2.026 ±    2.725    B/op
RecordBatchIterationBenchmark.measureValidation:·gc.count                              NO_CACHING   RANDOM                LZ4              10           1000                 2  thrpt   15    3398.000             counts
RecordBatchIterationBenchmark.measureValidation:·gc.time                               NO_CACHING   RANDOM                LZ4              10           1000                 2  thrpt   15    1701.000                 ms
JMH benchmarks done


Updated benchmark after KAFKA-9820:
Benchmark                                                                     (bufferSupplierStr)  (bytes)  (compressionType)  (maxBatchSize)  (messageSize)  (messageVersion)   Mode  Cnt       Score     Error   Units
RecordBatchIterationBenchmark.measureValidation                                        NO_CACHING   RANDOM                LZ4               1           1000                 2  thrpt   15  322678.586 ± 254.126   ops/s
RecordBatchIterationBenchmark.measureValidation:·gc.alloc.rate                         NO_CACHING   RANDOM                LZ4               1           1000                 2  thrpt   15   20376.474 ±  15.326  MB/sec
RecordBatchIterationBenchmark.measureValidation:·gc.alloc.rate.norm                    NO_CACHING   RANDOM                LZ4               1           1000                 2  thrpt   15   69544.001 ±   0.001    B/op
RecordBatchIterationBenchmark.measureValidation:·gc.churn.G1_Eden_Space                NO_CACHING   RANDOM                LZ4               1           1000                 2  thrpt   15   20485.394 ±  44.087  MB/sec
RecordBatchIterationBenchmark.measureValidation:·gc.churn.G1_Eden_Space.norm           NO_CACHING   RANDOM                LZ4               1           1000                 2  thrpt   15   69915.744 ± 143.372    B/op
RecordBatchIterationBenchmark.measureValidation:·gc.churn.G1_Old_Gen                   NO_CACHING   RANDOM                LZ4               1           1000                 2  thrpt   15       0.027 ±   0.002  MB/sec
RecordBatchIterationBenchmark.measureValidation:·gc.churn.G1_Old_Gen.norm              NO_CACHING   RANDOM                LZ4               1           1000                 2  thrpt   15       0.091 ±   0.008    B/op
RecordBatchIterationBenchmark.measureValidation:·gc.count                              NO_CACHING   RANDOM                LZ4               1           1000                 2  thrpt   15    3652.000            counts
RecordBatchIterationBenchmark.measureValidation:·gc.time                               NO_CACHING   RANDOM                LZ4               1           1000                 2  thrpt   15    1773.000                ms
RecordBatchIterationBenchmark.measureValidation                                        NO_CACHING   RANDOM                LZ4               2           1000                 2  thrpt   15  321332.070 ± 869.841   ops/s
RecordBatchIterationBenchmark.measureValidation:·gc.alloc.rate                         NO_CACHING   RANDOM                LZ4               2           1000                 2  thrpt   15   20303.259 ±  55.609  MB/sec
RecordBatchIterationBenchmark.measureValidation:·gc.alloc.rate.norm                    NO_CACHING   RANDOM                LZ4               2           1000                 2  thrpt   15   69600.001 ±   0.001    B/op
RecordBatchIterationBenchmark.measureValidation:·gc.churn.G1_Eden_Space                NO_CACHING   RANDOM                LZ4               2           1000                 2  thrpt   15   20394.052 ±  72.842  MB/sec
RecordBatchIterationBenchmark.measureValidation:·gc.churn.G1_Eden_Space.norm           NO_CACHING   RANDOM                LZ4               2           1000                 2  thrpt   15   69911.238 ± 160.177    B/op
RecordBatchIterationBenchmark.measureValidation:·gc.churn.G1_Old_Gen                   NO_CACHING   RANDOM                LZ4               2           1000                 2  thrpt   15       0.028 ±   0.003  MB/sec
RecordBatchIterationBenchmark.measureValidation:·gc.churn.G1_Old_Gen.norm              NO_CACHING   RANDOM                LZ4               2           1000                 2  thrpt   15       0.096 ±   0.010    B/op
RecordBatchIterationBenchmark.measureValidation:·gc.count                              NO_CACHING   RANDOM                LZ4               2           1000                 2  thrpt   15    3637.000            counts
RecordBatchIterationBenchmark.measureValidation:·gc.time                               NO_CACHING   RANDOM                LZ4               2           1000                 2  thrpt   15    1790.000                ms
RecordBatchIterationBenchmark.measureValidation                                        NO_CACHING   RANDOM                LZ4              10           1000                 2  thrpt   15  315490.355 ± 271.921   ops/s
RecordBatchIterationBenchmark.measureValidation:·gc.alloc.rate                         NO_CACHING   RANDOM                LZ4              10           1000                 2  thrpt   15   19943.166 ±  21.235  MB/sec
RecordBatchIterationBenchmark.measureValidation:·gc.alloc.rate.norm                    NO_CACHING   RANDOM                LZ4              10           1000                 2  thrpt   15   69640.001 ±   0.001    B/op
RecordBatchIterationBenchmark.measureValidation:·gc.churn.G1_Eden_Space                NO_CACHING   RANDOM                LZ4              10           1000                 2  thrpt   15   20020.263 ±  43.144  MB/sec
RecordBatchIterationBenchmark.measureValidation:·gc.churn.G1_Eden_Space.norm           NO_CACHING   RANDOM                LZ4              10           1000                 2  thrpt   15   69909.228 ± 136.413    B/op
RecordBatchIterationBenchmark.measureValidation:·gc.churn.G1_Old_Gen                   NO_CACHING   RANDOM                LZ4              10           1000                 2  thrpt   15       0.026 ±   0.002  MB/sec
RecordBatchIterationBenchmark.measureValidation:·gc.churn.G1_Old_Gen.norm              NO_CACHING   RANDOM                LZ4              10           1000                 2  thrpt   15       0.090 ±   0.008    B/op
RecordBatchIterationBenchmark.measureValidation:·gc.count                              NO_CACHING   RANDOM                LZ4              10           1000                 2  thrpt   15    3571.000            counts
RecordBatchIterationBenchmark.measureValidation:·gc.time                               NO_CACHING   RANDOM                LZ4              10           1000                 2  thrpt   15    1764.000                ms
```

Reviewers: Ismael Juma <ismael@juma.me.uk>
2020-04-05 14:24:54 -07:00
Lucas Bradstreet 46540eb5e0
KAFKA-9820: validateMessagesAndAssignOffsetsCompressed allocates unused iterator (#8422)
3e9d1c1411 introduced skipKeyValueIterator(s), which were intended to be used; in this case, however, the iterator was created but never used in offset validation.
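A minimal sketch of the pattern (hypothetical `validate` helper and generic types; the real code uses `batch.skipKeyValueIterator(bufferSupplier)` from Kafka's record classes): create the iterator only on the path that actually consumes it, instead of allocating it up front:

```java
import java.util.Iterator;
import java.util.function.Supplier;

// Sketch only: the iterator is created lazily, on the branch that reads it,
// rather than eagerly at the top of validation where it may go unused.
final class OffsetValidationSketch<R> {
    void validateBatch(Supplier<Iterator<R>> skipKeyValueIterator,
                       boolean needsRecordLevelChecks) {
        // Before: Iterator<R> records = skipKeyValueIterator.get(); // allocated, never used
        if (needsRecordLevelChecks) {
            Iterator<R> records = skipKeyValueIterator.get(); // allocated only when consumed
            while (records.hasNext())
                validate(records.next());
        }
        // ... offset assignment continues without touching the records ...
    }

    private void validate(R record) { /* per-record checks */ }
}
```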

A subset of the benchmark results follows. It shows roughly a 20% improvement in validation performance and a 40% reduction in garbage allocation for batch sizes of 1-2.

**# Parameters: (bufferSupplierStr = NO_CACHING, bytes = RANDOM, compressionType = LZ4, maxBatchSize = 1, messageSize = 1000, messageVersion = 2)**

Before:
```
Result "org.apache.kafka.jmh.record.RecordBatchIterationBenchmark.measureValidation":
  64851.837 ±(99.9%) 944.248 ops/s [Average]
  (min, avg, max) = (64505.317, 64851.837, 65114.359), stdev = 245.218
  CI (99.9%): [63907.589, 65796.084] (assumes normal distribution)

"org.apache.kafka.jmh.record.RecordBatchIterationBenchmark.measureValidation:·gc.alloc.rate.norm":
  164088.003 ±(99.9%) 0.004 B/op [Average]
  (min, avg, max) = (164088.001, 164088.003, 164088.004), stdev = 0.001
  CI (99.9%): [164087.998, 164088.007] (assumes normal distribution)
```

After:
```
Result "org.apache.kafka.jmh.record.RecordBatchIterationBenchmark.measureValidation":
  78910.273 ±(99.9%) 707.024 ops/s [Average]
  (min, avg, max) = (78785.486, 78910.273, 79234.007), stdev = 183.612
  CI (99.9%): [78203.249, 79617.297] (assumes normal distribution)

"org.apache.kafka.jmh.record.RecordBatchIterationBenchmark.measureValidation:·gc.alloc.rate.norm":
  96440.002 ±(99.9%) 0.001 B/op [Average]
  (min, avg, max) = (96440.002, 96440.002, 96440.002), stdev = 0.001
  CI (99.9%): [96440.002, 96440.003] (assumes normal distribution)
```

**# Parameters: (bufferSupplierStr = NO_CACHING, bytes = RANDOM, compressionType = LZ4, maxBatchSize = 2, messageSize = 1000, messageVersion = 2)**

Before:
```
Result "org.apache.kafka.jmh.record.RecordBatchIterationBenchmark.measureValidation":
  64815.364 ±(99.9%) 639.309 ops/s [Average]
  (min, avg, max) = (64594.545, 64815.364, 64983.305), stdev = 166.026
  CI (99.9%): [64176.056, 65454.673] (assumes normal distribution)

"org.apache.kafka.jmh.record.RecordBatchIterationBenchmark.measureValidation:·gc.alloc.rate.norm":
  163944.003 ±(99.9%) 0.001 B/op [Average]
  (min, avg, max) = (163944.002, 163944.003, 163944.003), stdev = 0.001
  CI (99.9%): [163944.002, 163944.004] (assumes normal distribution)
```

After:
```
Result "org.apache.kafka.jmh.record.RecordBatchIterationBenchmark.measureValidation":
  77075.096 ±(99.9%) 201.092 ops/s [Average]
  (min, avg, max) = (77021.537, 77075.096, 77129.693), stdev = 52.223
  CI (99.9%): [76874.003, 77276.188] (assumes normal distribution)

"org.apache.kafka.jmh.record.RecordBatchIterationBenchmark.measureValidation:·gc.alloc.rate.norm":
  96504.002 ±(99.9%) 0.003 B/op [Average]
  (min, avg, max) = (96504.001, 96504.002, 96504.003), stdev = 0.001
  CI (99.9%): [96503.999, 96504.005] (assumes normal distribution)
```

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Ismael Juma <ismael@juma.me.uk>
2020-04-04 10:05:51 -07:00
Ismael Juma 222726d6f9
KAFKA-9373: Reduce shutdown time by avoiding unnecessary loading of indexes (#8346)
KAFKA-7283 enabled lazy mmap on index files by initializing indices
on-demand rather than performing costly disk/memory operations when
creating all indices on broker startup. This helped reduce broker
startup time. However, segment indices were still created when
segments were closed, regardless of whether they had ever been loaded.

This is a cleaned up version of #7900, which was submitted by @efeg. It
eliminates unnecessary disk accesses and memory map operations while
deleting, renaming or closing offset and time indexes.
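
A hedged sketch of the underlying idea (the real Scala lazy-index wrapper is more involved): hold the index behind a lazy holder so that close, rename, and delete can skip the disk and mmap work entirely when the index was never materialized:

```java
import java.util.function.Consumer;
import java.util.function.Supplier;

// Sketch only: a lazy holder that materializes the index on first use and
// lets lifecycle operations skip indices that were never loaded.
final class LazyIndexSketch<T> {
    private volatile T index; // null until first access

    T getOrLoad(Supplier<T> loader) {
        T i = index;
        if (i == null) {
            synchronized (this) {
                if (index == null)
                    index = loader.get(); // mmap and sanity checks happen here
                i = index;
            }
        }
        return i;
    }

    void closeIfLoaded(Consumer<T> closer) {
        T i = index;
        if (i != null)
            closer.accept(i); // never loaded => nothing to flush or unmap
    }
}
```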

In a cluster with 31 brokers, where each broker has 13K to 20K segments,
@efeg and team observed up to 2 orders of magnitude faster LogManager
shutdown times - i.e. dropping the LogManager shutdown time of each
broker from 10s of seconds to 100s of milliseconds.

To avoid confusion between `renameTo` and `setFile`, I replaced the
latter with the more restricted `updateParentDir` (it turns out that's
all we need).

Reviewers: Jun Rao <junrao@gmail.com>, Andrew Choi <a24choi@edu.uwaterloo.ca>

Co-authored-by: Adem Efe Gencer <agencer@linkedin.com>
Co-authored-by: Ismael Juma <ismael@juma.me.uk>
2020-03-26 04:26:51 -07:00
Gardner Vickers 8cf781ef01
MINOR: Improve performance of checkpointHighWatermarks, patch 1/2 (#6741)
This PR works to improve high watermark checkpointing performance.

`ReplicaManager.checkpointHighWatermarks()` was found to be a major contributor to GC pressure, especially on Kafka clusters with high partition counts and low throughput.

Added a JMH benchmark for `checkpointHighWatermarks` which establishes a
performance baseline. The parameterized benchmark was run with 100, 1000 and
2000 topics. 

Modified `ReplicaManager.checkpointHighWatermarks()` to avoid extra copies, and cached
the Log parent directory String to avoid frequent allocations when calculating
`File.getParent()`.
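
A hedged sketch of the caching change (simplified names): `File.getParent()` allocates a new String on each call, so the parent directory is computed once per log and reused by every checkpoint:

```java
import java.io.File;

// Sketch only: cache the parent directory string instead of re-deriving it
// from the File on every high-watermark checkpoint.
final class LogSketch {
    private final File dir;
    private final String parentDir; // computed once at construction

    LogSketch(File dir) {
        this.dir = dir;
        this.parentDir = dir.getParent();
    }

    File dir() {
        return dir;
    }

    String parentDir() {
        return parentDir; // no per-call allocation, unlike dir.getParent()
    }
}
```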

A few clean-ups:
* Changed all usages of Log.dir.getParent to Log.parentDir and Log.dir.getParentFile to
Log.parentDirFile.
* Only expose public accessor for `Log.dir` (consistent with `Log.parentDir`)
* Removed unused parameters in `Partition.makeLeader`, `Partition.makeFollower` and `Partition.createLogIfNotExists`.

Benchmark results:

| Topic Count | Ops/ms | MB/sec allocated |
|-------------|--------|------------------|
| 100         | +51%   | -91%             |
| 1000        | +143%  | -49%             |
| 2000        | +149%  | -50%             |

Reviewers: Lucas Bradstreet <lucas@confluent.io>, Ismael Juma <ismael@juma.me.uk>

Co-authored-by: Gardner Vickers <gardner@vickers.me>
Co-authored-by: Ismael Juma <ismael@juma.me.uk>
2020-03-25 20:53:42 -07:00
Manikumar Reddy a0e1407820
KAFKA-9670; Reduce allocations in Metadata Response preparation (#8236)
This PR removes intermediate conversions between `MetadataResponse.TopicMetadata` => `MetadataResponseTopic` and `MetadataResponse.PartitionMetadata` => `MetadataResponsePartition` objects.

There is a 15-20% reduction in object allocations and a 5-10% improvement in metadata request performance.

Reviewers: Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>
2020-03-16 09:30:48 -07:00
jiao e3ccf20794 KAFKA-9685: Solve Set concatenation perf issue in AclAuthorizer
This PR avoids the use of the `++` operator on `Set`, which is slow when a set has many entries. It introduces a new class, `AclSets`, which takes multiple sets as parameters and performs `find` against them one by one. For more details on performance and benchmarks, refer to [KAFKA-9685](https://issues.apache.org/jira/browse/KAFKA-9685).
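
A minimal sketch of the idea (generic types; the real class lives alongside AclAuthorizer and works on ACL entries): wrap the underlying sets in a view and search them sequentially instead of concatenating them into a new set:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.Predicate;

// Sketch only: a read-only view over several sets that searches them in turn,
// avoiding the O(n) copy that `set1 ++ set2` performs.
final class AclSetsSketch<T> {
    private final List<Set<T>> sets;

    @SafeVarargs
    AclSetsSketch(Set<T>... sets) {
        this.sets = Arrays.asList(sets);
    }

    Optional<T> find(Predicate<T> predicate) {
        for (Set<T> set : sets)
            for (T item : set)
                if (predicate.test(item))
                    return Optional.of(item);
        return Optional.empty();
    }
}
```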

Author: jiao <jiao.zhang@linecorp.com>

Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>

Closes #8261 from jiao-zhangS/jira-9685
2020-03-13 20:53:29 +05:30
Manikumar Reddy 8dff0b168a KAFKA-9626: Improve AclAuthorizer.acls() performance
This PR avoids creation of unnecessary sets in AclAuthorizer.acls() method implementation.

Perf results:
**Old**
```
Benchmark                                (aclCount)  (resourceCount)  Mode  Cnt    Score   Error  Units
AclAuthorizerBenchmark.testAclsIterator           5             5000  avgt   15    5.821 ± 0.309  ms/op
AclAuthorizerBenchmark.testAclsIterator           5            10000  avgt   15   15.303 ± 0.107  ms/op
AclAuthorizerBenchmark.testAclsIterator           5            50000  avgt   15   74.976 ± 0.543  ms/op
AclAuthorizerBenchmark.testAclsIterator          10             5000  avgt   15   15.366 ± 0.184  ms/op
AclAuthorizerBenchmark.testAclsIterator          10            10000  avgt   15   29.899 ± 0.129  ms/op
AclAuthorizerBenchmark.testAclsIterator          10            50000  avgt   15  167.301 ± 1.723  ms/op
AclAuthorizerBenchmark.testAclsIterator          15             5000  avgt   15   21.980 ± 0.114  ms/op
AclAuthorizerBenchmark.testAclsIterator          15            10000  avgt   15   44.385 ± 0.255  ms/op
AclAuthorizerBenchmark.testAclsIterator          15            50000  avgt   15  241.919 ± 3.955  ms/op
```
**New**

```
Benchmark                                (aclCount)  (resourceCount)  Mode  Cnt   Score   Error  Units
AclAuthorizerBenchmark.testAclsIterator           5             5000  avgt   15   0.666 ± 0.004  ms/op
AclAuthorizerBenchmark.testAclsIterator           5            10000  avgt   15   1.427 ± 0.015  ms/op
AclAuthorizerBenchmark.testAclsIterator           5            50000  avgt   15  21.410 ± 0.225  ms/op
AclAuthorizerBenchmark.testAclsIterator          10             5000  avgt   15   1.230 ± 0.018  ms/op
AclAuthorizerBenchmark.testAclsIterator          10            10000  avgt   15   4.303 ± 0.744  ms/op
AclAuthorizerBenchmark.testAclsIterator          10            50000  avgt   15  36.724 ± 0.409  ms/op
AclAuthorizerBenchmark.testAclsIterator          15             5000  avgt   15   2.433 ± 0.379  ms/op
AclAuthorizerBenchmark.testAclsIterator          15            10000  avgt   15   9.818 ± 0.214  ms/op
AclAuthorizerBenchmark.testAclsIterator          15            50000  avgt   15  52.886 ± 0.525  ms/op
```

Author: Manikumar Reddy <manikumar.reddy@gmail.com>
Author: Lucas Bradstreet <lucas@confluent.io>

Reviewers: Ismael Juma <ismael@juma.me.uk>, Rajini Sivaram <rajinisivaram@googlemail.com>, Lucas Bradstreet <lucas@confluent.io>

Closes #8199 from omkreddy/KAFKA-9626
2020-03-03 01:51:09 +05:30
Manikumar Reddy 922a95a18d KAFKA-9594: Add a separate lock to pause the follower log append while checking if the log dir could be replaced.
This PR adds a new lock that is used to prevent the follower replica from being updated while ReplicaAlterDirThread is executing maybeReplaceCurrentWithFutureReplica() to replace the follower replica with the future replica.

Now doAppendRecordsToFollowerOrFutureReplica() doesn't need to hold leaderIsrUpdateLock for local replica updates, and ongoing log appends on the follower will not delay the makeFollower() call.
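
A hedged sketch of the locking split (simplified; the real code is Scala inside Partition): follower appends and the future-replica swap synchronize on a narrower lock, leaving leaderIsrUpdateLock free for makeFollower():

```java
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Sketch only: appends contend on futureLogLock rather than the partition-wide
// leaderIsrUpdateLock, so makeFollower() is no longer delayed by long appends.
final class PartitionLockingSketch {
    private final ReentrantReadWriteLock leaderIsrUpdateLock = new ReentrantReadWriteLock();
    private final ReentrantLock futureLogLock = new ReentrantLock(); // the new, narrower lock

    void appendRecordsToFollowerOrFutureReplica() {
        futureLogLock.lock();
        try {
            // ... append to the local (or future) log ...
        } finally {
            futureLogLock.unlock();
        }
    }

    void maybeReplaceCurrentWithFutureReplica() {
        futureLogLock.lock(); // blocks appends only while the swap is checked
        try {
            // ... swap in the future replica if it has caught up ...
        } finally {
            futureLogLock.unlock();
        }
    }

    void makeFollower() {
        leaderIsrUpdateLock.writeLock().lock(); // no longer waits on appends
        try {
            // ... leader/ISR state transition ...
        } finally {
            leaderIsrUpdateLock.writeLock().unlock();
        }
    }
}
```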

**Benchmark results for Partition.makeFollower():**
Old:
```
Benchmark                                        Mode  Cnt     Score    Error  Units
PartitionMakeFollowerBenchmark.testMakeFollower  avgt    15  2046.967 ± 22.842  ns/op
```

New:
```
Benchmark                                        Mode  Cnt     Score   Error  Units
PartitionMakeFollowerBenchmark.testMakeFollower  avgt    15  1278.525 ± 5.354  ns/op
```

Author: Manikumar Reddy <manikumar.reddy@gmail.com>

Reviewers: Jun Rao <junrao@gmail.com>

Closes #8153 from omkreddy/KAFKA-9594-LAISR
2020-02-26 10:55:48 +05:30
Lucas Bradstreet 1675115ec1 MINOR: refactor replica last sent HW updates due to performance regression (#7671)
This change fixes a performance regression due to follower last seen high watermark
handling introduced in 23beeea. maybeUpdateHwAndSendResponse is expensive for
brokers with high partition counts, as it requires a partition and a replica lookup for every
partition being fetched. This refactor moves the last seen watermark update into the follower
fetch state update, where we have already looked up the partition and replica.

I've seen cases where maybeUpdateHwAndSendResponse is responsible for 8% of CPU usage, not including the responseCallback call that is part of it.

I have benchmarked this change with `UpdateFollowerFetchStateBenchmark` and it adds 5ns
of overhead to Partition.updateFollowerFetchState, which is a rounding error compared to the
current overhead of maybeUpdateHwAndSendResponse.

Reviewers: David Arthur <mumrah@gmail.com>, Jason Gustafson <jason@confluent.io>, Ismael Juma <ismael@juma.me.uk>
2019-11-12 21:21:18 -08:00
Guozhang Wang 59a75f4422
KAFKA-9048 Pt1: Remove Unnecessary lookup in Fetch Building (#7576)
Get rid of `partitionStates`, which creates a new PartitionState for each state, since none of the callers require a Seq.

Modify ReplicaFetcherThread constructor to fix the broken benchmark path.

This PR:

```
Benchmark                                  (partitionCount)  Mode  Cnt        Score       Error  Units
ReplicaFetcherThreadBenchmark.testFetcher               100  avgt   15     9280.953 ±    55.967  ns/op
ReplicaFetcherThreadBenchmark.testFetcher               500  avgt   15    61533.546 ±  1213.559  ns/op
ReplicaFetcherThreadBenchmark.testFetcher              1000  avgt   15   151306.146 ±  1820.222  ns/op
ReplicaFetcherThreadBenchmark.testFetcher              5000  avgt   15  1138547.929 ± 45301.938  ns/op
```

Trunk:

```
Benchmark                                  (partitionCount)  Mode  Cnt        Score       Error  Units
ReplicaFetcherThreadBenchmark.testFetcher               100  avgt   15     9305.588 ±    51.886  ns/op
ReplicaFetcherThreadBenchmark.testFetcher               500  avgt   15    65216.933 ±   939.827  ns/op
ReplicaFetcherThreadBenchmark.testFetcher              1000  avgt   15   151715.514 ±  1361.009  ns/op
ReplicaFetcherThreadBenchmark.testFetcher              5000  avgt   15  1231958.103 ± 94
```


Reviewers: Jason Gustafson <jason@confluent.io>, Lucas Bradstreet <lucasbradstreet@gmail.com>
2019-10-28 07:57:50 -07:00
Lucas Bradstreet 8966d066bd KAFKA-9039: Optimize ReplicaFetcher fetch path (#7443)
Improves the performance of the replica fetcher for high-partition-count fetch requests, where a majority of the partitions did not update between fetch requests. All benchmarks were run on an r5.xlarge.

Vanilla:

```
Benchmark                                  (partitionCount)  Mode  Cnt        Score       Error  Units
ReplicaFetcherThreadBenchmark.testFetcher               100  avgt   15    26491.825 ±   438.463  ns/op
ReplicaFetcherThreadBenchmark.testFetcher               500  avgt   15   153941.952 ±  4337.073  ns/op
ReplicaFetcherThreadBenchmark.testFetcher              1000  avgt   15   339868.602 ±  4201.462  ns/op
ReplicaFetcherThreadBenchmark.testFetcher              5000  avgt   15  2588878.448 ± 22172.482  ns/op
```

From 100 to 5000 partitions, the latency increase is 2588878.448 / 26491.825 = 97X.

Avoid gettimeofday calls in steady state fetch states (8545888):

```
Benchmark                                  (partitionCount)  Mode  Cnt        Score       Error  Units
ReplicaFetcherThreadBenchmark.testFetcher               100  avgt   15    22685.381 ±   267.727  ns/op
ReplicaFetcherThreadBenchmark.testFetcher               500  avgt   15   113622.521 ±  1854.254  ns/op
ReplicaFetcherThreadBenchmark.testFetcher              1000  avgt   15   273698.740 ±  9269.554  ns/op
ReplicaFetcherThreadBenchmark.testFetcher              5000  avgt   15  2189223.207 ±  1706.945  ns/op
```

From 100 to 5000 partitions, the latency increase is 2189223.207 / 22685.381 = 97X.

Avoid copying partition states to maintain fetch offsets (29fdd60):

```
Benchmark                                  (partitionCount)  Mode  Cnt        Score       Error  Units
ReplicaFetcherThreadBenchmark.testFetcher               100  avgt   15    17039.989 ±   609.355  ns/op
ReplicaFetcherThreadBenchmark.testFetcher               500  avgt   15    99371.086 ±  1833.256  ns/op
ReplicaFetcherThreadBenchmark.testFetcher              1000  avgt   15   216071.333 ±  3714.147  ns/op
ReplicaFetcherThreadBenchmark.testFetcher              5000  avgt   15  2035678.223 ±  5195.232  ns/op
```

From 100 to 5000 partitions, the latency increase is 2035678.223 / 17039.989 = 119X.

Keep lag alongside PartitionFetchState to avoid expensive isReplicaInSync check (0e57e3e):

```
Benchmark                                  (partitionCount)  Mode  Cnt        Score       Error  Units
ReplicaFetcherThreadBenchmark.testFetcher               100  avgt   15    15131.684 ±   382.088  ns/op
ReplicaFetcherThreadBenchmark.testFetcher               500  avgt   15    86813.843 ±  3346.385  ns/op
ReplicaFetcherThreadBenchmark.testFetcher              1000  avgt   15   193050.381 ±  3281.833  ns/op
ReplicaFetcherThreadBenchmark.testFetcher              5000  avgt   15  1801488.513 ±  2756.355  ns/op
```

From 100 to 5000 partitions, the latency increase is 1801488.513 / 15131.684 = 119X.

Fetch session optimizations (mostly presizing the next hashmap, and avoiding making a copy of sessionPartitions, as a deep copy is not required for the ReplicaFetcher) (2614b24); a sketch of the presizing idea follows the results:

```
Benchmark                                  (partitionCount)  Mode  Cnt        Score       Error  Units
ReplicaFetcherThreadBenchmark.testFetcher               100  avgt   15    11386.203 ±   416.701  ns/op
ReplicaFetcherThreadBenchmark.testFetcher               500  avgt   15    60820.292 ±  3163.001  ns/op
ReplicaFetcherThreadBenchmark.testFetcher              1000  avgt   15   146242.158 ±  1937.254  ns/op
ReplicaFetcherThreadBenchmark.testFetcher              5000  avgt   15  1366768.926 ±  3305.712  ns/op
```

From 100 to 5000 partitions, the latency increase is 1366768.926 / 11386.203 = 120X.
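
A hedged sketch of the presizing optimization mentioned above (the real code builds the session partition map in the fetch session handler): when the final entry count is known, size the HashMap so it never rehashes while being filled:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch only: a HashMap sized for the expected entry count up front avoids
// the intermediate resize-and-rehash passes a default-sized map would do.
final class PresizedMaps {
    static <K, V> Map<K, V> presized(int expectedEntries) {
        // Capacity chosen so expectedEntries fit under the default 0.75 load factor.
        return new HashMap<>((int) (expectedEntries / 0.75f) + 1);
    }
}
```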

Reviewers: Jun Rao <junrao@gmail.com>, Guozhang Wang <wangguoz@gmail.com>
2019-10-16 09:49:53 -07:00
Ismael Juma 66183f730f
KAFKA-8471: Replace control requests/responses with automated protocol (#7353)
Replaced UpdateMetadata{Request, Response}, LeaderAndIsr{Request, Response}
and StopReplica{Request, Response} with the automated protocol classes.

Updated the JSON schema for the 3 request types to be more consistent and
less strict (if needed to avoid duplication).

The general approach is to avoid generating new collections in the request
classes. Normalization happens in the constructor to make this possible. Builders
still have to group by topic to maintain the external ungrouped view.

Introduced new tests for LeaderAndIsrRequest and UpdateMetadataRequest to
verify that the new logic is correct.

A few other clean-ups/fixes in code that was touched due to these changes:
* KAFKA-8956: Refactor DelayedCreatePartitions#updateWaiting to avoid modifying
collection in foreach.
* Avoid unnecessary allocation for state change trace logging if trace logging is not enabled
* Use `toBuffer` instead of `toList`, `toIndexedSeq` or `toSeq` as it generally performs
better and it matches the performance characteristics of `java.util.ArrayList`. This is
particularly important when passing such instances to Java code.
* Minor refactoring for clarity and readability.
* Removed usage of deprecated `/:`, unused imports and unnecessary `var`s.
* Include exception in `AdminClientIntegrationTest` failure message.
* Move StopReplicaRequest verification in `AuthorizerIntegrationTest` to the end
to match the comment.

Reviewers: Colin Patrick McCabe <cmccabe@apache.org>
2019-09-28 19:39:45 -07:00
Lucas Bradstreet f3ded39a05 KAFKA-8841; Reduce overhead of ReplicaManager.updateFollowerFetchState (#7324)
This PR makes two changes to code in the ReplicaManager.updateFollowerFetchState path, which is in the hot path for follower fetches. Although calling ReplicaManager.updateFollowerFetchState is inexpensive on its own, it is called once for each partition every time a follower fetch occurs.

1. updateFollowerFetchState no longer calls maybeExpandIsr when the follower is already in the ISR. This avoids repeated expansion checks (see the sketch below).
2. Partition.maybeIncrementLeaderHW is also in the hot path for ReplicaManager.updateFollowerFetchState. Partition.maybeIncrementLeaderHW calls Partition.remoteReplicas four times per iteration and performs a toSet conversion. maybeIncrementLeaderHW now avoids generating any intermediate collections when updating the HWM.
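
A hedged sketch of change (1), with simplified names (`inSyncReplicaIds` here is illustrative):

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Sketch only: run the ISR-expansion check just when the follower is not
// already in sync, instead of on every fetch from every follower.
final class FollowerFetchStateSketch {
    private final Set<Integer> inSyncReplicaIds = ConcurrentHashMap.newKeySet();

    void updateFollowerFetchState(int replicaId, long fetchOffset) {
        // ... record the follower's new fetch position ...
        if (!inSyncReplicaIds.contains(replicaId)) {
            maybeExpandIsr(replicaId, fetchOffset); // previously called unconditionally
        }
    }

    private void maybeExpandIsr(int replicaId, long fetchOffset) {
        // add the replica to the ISR if it has caught up to the high watermark ...
    }
}
```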

**Benchmark results for Partition.updateFollowerFetchState on a r5.xlarge:**
Old:
```
  1288.633 ±(99.9%) 1.170 ns/op [Average]
  (min, avg, max) = (1287.343, 1288.633, 1290.398), stdev = 1.037
  CI (99.9%): [1287.463, 1289.802] (assumes normal distribution)
```

New (when follower fetch offset is updated):
```
  261.727 ±(99.9%) 0.122 ns/op [Average]
  (min, avg, max) = (261.565, 261.727, 261.937), stdev = 0.114
  CI (99.9%): [261.605, 261.848] (assumes normal distribution)
```

New (when follower fetch offset is the same):
```
  68.484 ±(99.9%) 0.025 ns/op [Average]
  (min, avg, max) = (68.446, 68.484, 68.520), stdev = 0.023
  CI (99.9%): [68.460, 68.509] (assumes normal distribution)
```

Reviewers: Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>
2019-09-18 09:11:39 -07:00
Lucas Bradstreet fb381cb6c7 MINOR: Fix integer overflow in LRUCacheBenchmark (#7270)
The JMH LRUCacheBenchmark will exhibit an int overflow when run on a fast machine (a sketch of the fix follows the stack trace):

```
java.lang.ArrayIndexOutOfBoundsException: Index -3648 out of bounds for length 10000
	at org.apache.kafka.jmh.cache.LRUCacheBenchmark.testCachePerformance(LRUCacheBenchmark.java:70)
	at org.apache.kafka.jmh.cache.generated.LRUCacheBenchmark_testCachePerformance_jmhTest.testCachePerformance_thrpt_jmhStub(LRUCacheBenchmark_testCachePerformance_jmhTest.java:119)
	at org.apache.kafka.jmh.cache.generated.LRUCacheBenchmark_testCachePerformance_jmhTest.testCachePerformance_Throughput(LRUCacheBenchmark_testCachePerformance_jmhTest.java:83)
```
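
A hedged sketch of the failure mode and a fix (field names are illustrative): a throughput benchmark calls the measured method billions of times, so an int counter indexed with `%` eventually wraps negative:

```java
// Sketch only: once `counter` overflows Integer.MAX_VALUE, `counter % length`
// turns negative and indexing throws ArrayIndexOutOfBoundsException.
final class CounterIndexingSketch {
    private final String[] keys = new String[10_000];
    private int counter;

    String nextKeyBroken() {
        return keys[counter++ % keys.length]; // negative index after overflow
    }

    String nextKeyFixed() {
        return keys[Math.floorMod(counter++, keys.length)]; // always in [0, length)
    }
}
```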

Reviewers: Jason Gustafson <jason@confluent.io>
2019-08-29 18:02:27 -07:00
Guozhang Wang 3e9d1c1411
KAFKA-8106: Skipping ByteBuffer allocation of key / value / headers in logValidator (#6785)
* KAFKA-8106: Reducing the allocation and copying of ByteBuffer when logValidator does validation.

* github comments

* use batch.skipKeyValueIterator

* cleanups

* no need to skip kv for uncompressed iterator

* checkstyle fixes

* fix findbugs

* adding unit tests

* reuse decompression buffer; and using streaming iterator

* checkstyle

* add unit tests

* remove reusing buffer supplier

* fix unit tests

* add unit tests

* use streaming iterator

* minor refactoring

* rename

* github comments

* github comments

* reuse buffer at DefaultRecord caller

* some further optimization

* major refactoring

* further refactoring

* update comment

* github comments

* minor fix

* add jmh benchmarks

* update jmh

* github comments

* minor fix

* github comments
2019-06-21 12:44:45 -07:00
Lifei Chen 3322439d98 MINOR: Document improvement (#6682)
Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>
2019-05-06 16:52:23 +05:30
Ismael Juma 7d9e93ac6d
MINOR: Use https instead of http in links (#6477)
Verified that the https links work.

I didn't update the license header in this PR since that touches
so many files. Will file a separate one for that.

Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>
2019-04-22 11:58:25 -07:00
Ismael Juma 12f310d50e
KAFKA-7612: Fix javac warnings and enable warnings as errors (#5900)
- Use Xlint:all with 3 exclusions (filed KAFKA-7613 to remove the exclusions)
- Use the same javac options when compiling tests (seems accidental that
we didn't do this before)
- Replaced several deprecated method calls with non-deprecated ones:
  - `KafkaConsumer.poll(long)` and `KafkaConsumer.close(long)`
  - `Class.newInstance` and `new Integer/Long` (deprecated since Java 9)
  - `scala.Console` (deprecated in Scala 2.11)
  - `PartitionData` taking a timestamp (one of them seemingly a bug)
  - `JsonMappingException` single parameter constructor
- Fix unnecessary usage of raw types in several places.
- Add @SuppressWarnings for deprecations, unchecked and switch fallthrough in
several places.
- Scala clean-ups (var -> val, ETA expansion warnings, avoid reflective calls)
- Use lambdas to simplify code in a few places
- Add @SafeVarargs, fix varargs usage and remove unnecessary `Utils.mkList` method

Reviewers: Matthias J. Sax <mjsax@apache.org>, Manikumar Reddy <manikumar.reddy@gmail.com>, Randall Hauch <rhauch@gmail.com>, Bill Bejeck <bill@confluent.io>, Stanislav Kozlovski <stanislav_kozlovski@outlook.com>
2018-11-12 22:18:59 -08:00
Armin Braun 346d0ca538 MINOR: Fix needless GC + Result time unit in JMH
Fixes two issues with the JMH benchmark example:
* Trivial: The output should be in `ops/ms` for readability
reasons (it's in the millions of operations per second)
* Important: The benchmark is not actually measuring the
LRU Cache performance, as most of the time in each run is
wasted on concatenating `key + counter` as well as
`value + counter`. Fixed by pre-generating 10k K-V pairs
(100x the cache capacity) and iterating over them (see the
sketch below). This brings the performance up by a factor of
more than 5 on a standard 4-core i7 (from `~6k/ms` to `~35k/ms`).
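
A hedged sketch of the fix (names illustrative; the real benchmark exercises an LRU cache): build the key/value pairs once in `@Setup` so the measured method only does cache operations:

```java
import org.openjdk.jmh.annotations.*;

// Sketch only: pre-generate the K-V pairs so string concatenation no longer
// dominates the measured loop.
@State(Scope.Benchmark)
public class LruCacheBenchmarkSketch {
    private static final int ENTRIES = 10_000; // 100x the cache capacity
    private String[] keys;
    private String[] values;
    private int counter;

    @Setup(Level.Trial)
    public void setUp() {
        keys = new String[ENTRIES];
        values = new String[ENTRIES];
        for (int i = 0; i < ENTRIES; i++) {
            keys[i] = "key" + i;     // built once, outside the measurement
            values[i] = "value" + i;
        }
    }

    @Benchmark
    public String measureCachePut() {
        int i = Math.floorMod(counter++, ENTRIES);
        // cache.put(keys[i], values[i]) in the real benchmark; returning the
        // key keeps the result live so the JIT cannot drop the work.
        return keys[i];
    }
}
```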

Author: Armin Braun <me@obrown.io>

Reviewers: Bill Bejeck <bbejeck@gmail.com>, Guozhang Wang <wangguoz@gmail.com>, Ismael Juma <ismael@juma.me.uk>

Closes #2903 from original-brownbear/fix-jmh-example
2017-09-18 10:52:54 +01:00
Bill Bejeck b6adb2dc89 KAFKA-3989; MINOR: follow-up: update script to run from kafka root
…h-benchmarks/jmh.sh

Author: bbejeck <bbejeck@gmail.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>, Guozhang Wang <wangguoz@gmail.com>

Closes #2654 from bbejeck/KAFKA-3989_follow_up
2017-08-26 16:15:40 -07:00
Jason Gustafson f49697a279 KAFKA-5456; Ensure producer handles old format large compressed messages
More specifically, fix the case where a compressed V0 or V1 message is
larger than the producer batch size.

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Apurva Mehta <apurva@confluent.io>, Ismael Juma <ismael@juma.me.uk>

Closes #3356 from hachikuji/KAFKA-5456
2017-06-16 20:36:28 +01:00
Ismael Juma 39eb31feae MINOR: Optimise performance of `Topic.validate()`
I included a JMH benchmark and the results follow. The
implementation in this PR takes no more than 1/10th
of the time when compared to trunk. I also included
results for an alternative implementation that is a little
slower than the one in the PR.

Trunk:
```text
TopicBenchmark.testValidate                                topic  avgt   15  134.107 ±  3.956  ns/op
TopicBenchmark.testValidate                    longer-topic-name  avgt   15  316.241 ± 13.379  ns/op
TopicBenchmark.testValidate  very-long-topic-name_with_more_text  avgt   15  636.026 ± 30.272  ns/op
```

Implementation in the PR:
```text
TopicBenchmark.testValidate                                topic  avgt   15  13.153 ± 0.383  ns/op
TopicBenchmark.testValidate                    longer-topic-name  avgt   15  26.139 ± 0.896  ns/op
TopicBenchmark.testValidate  very-long-topic-name.with_more_text  avgt   15  44.829 ± 1.390  ns/op
```

Alternative implementation where `boolean validChar = Character.isLetterOrDigit(c) || c == '.' || c == '_' || c == '-';`
```text
TopicBenchmark.testValidate                                topic  avgt   15  18.883 ± 1.044  ns/op
TopicBenchmark.testValidate                    longer-topic-name  avgt   15  36.696 ± 1.220  ns/op
TopicBenchmark.testValidate  very-long-topic-name_with_more_text  avgt   15  65.956 ± 0.669  ns/op
```
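
A hedged sketch of the faster approach taken in the PR (the actual method lives in Kafka's Topic internals; this reproduces the hand-rolled character loop that replaces the regex):

```java
// Sketch only: validate topic characters with explicit range checks instead
// of a regex, avoiding the matcher machinery on every call.
final class TopicValidationSketch {
    static boolean containsValidChars(String topic) {
        for (int i = 0; i < topic.length(); i++) {
            char c = topic.charAt(i);
            boolean validChar = (c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z')
                    || (c >= '0' && c <= '9') || c == '.' || c == '_' || c == '-';
            if (!validChar)
                return false;
        }
        return true;
    }
}
```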

Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Guozhang Wang <wangguoz@gmail.com>

Closes #3234 from ijuma/optimise-topic-is-valid
2017-06-06 03:08:40 +01:00
Ismael Juma c7bc8f7d8c MINOR: Remove redundant volatile write in RecordHeaders
The included JMH benchmark shows that the redundant
volatile write causes the constructor of `ProducerRecord`
to take more than 50% longer:

```
ProducerRecordBenchmark.constructorBenchmark  avgt   15  24.136 ± 1.458  ns/op (before)
ProducerRecordBenchmark.constructorBenchmark  avgt   15  14.904 ± 0.231  ns/op (after)
```
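
A hedged sketch of the issue (field name from RecordHeaders; surrounding class simplified): explicitly initializing a volatile field to its default value emits a volatile store on every construction, which the JVM's zero-initialization already provides for free:

```java
// Sketch only: rely on the JVM zero-default instead of writing `= false`
// to a volatile field in every constructor call.
final class RecordHeadersSketch {
    // Before: private volatile boolean isReadOnly = false; // redundant volatile write
    private volatile boolean isReadOnly;                    // defaults to false anyway

    void setReadOnly() {
        isReadOnly = true; // the volatile write now happens only on a real change
    }

    boolean isReadOnly() {
        return isReadOnly;
    }
}
```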

Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Jason Gustafson <jason@confluent.io>

Closes #3233 from ijuma/remove-volatile-write-in-records-header-constructor
2017-06-04 10:48:34 -07:00
Xavier Léauté c060c48285 KAFKA-5150; Reduce LZ4 decompression overhead
- reuse decompression buffers in consumer Fetcher
- switch lz4 input stream to operate directly on ByteBuffers
- avoids performance impact of catching exceptions when reaching the end of legacy record batches
- more tests with both compressible / incompressible data, multiple
  blocks, and various other combinations to increase code coverage
- fixes bug that would cause exception instead of invalid block size
  for invalid incompressible blocks
- fixes bug if incompressible flag is set on end frame block size

Overall this improves LZ4 decompression performance by up to 40x for small batches.
Most improvements are seen for batches of size 1 with messages on the order of ~100B.
We see at least 2x improvements for batch sizes of < 10 messages containing messages < 10kB.

This patch also yields 2-4x improvements on v1 small single-message batches for other compression types.
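
A hedged sketch of the buffer-reuse idea in the first bullet (Kafka's real helper is a buffer supplier; this simplified pool keeps one free list): return decompression buffers to a pool instead of allocating a fresh one per batch:

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;

// Sketch only: a tiny pool that hands back previously released buffers when
// they are large enough, so steady-state decompression stops allocating.
final class DecompressionBufferPool {
    private final ArrayDeque<ByteBuffer> free = new ArrayDeque<>();

    ByteBuffer get(int minCapacity) {
        ByteBuffer b = free.poll();
        if (b == null || b.capacity() < minCapacity)
            return ByteBuffer.allocate(minCapacity); // pool miss: allocate
        b.clear();
        return b; // pool hit: reuse
    }

    void release(ByteBuffer buffer) {
        free.push(buffer);
    }
}
```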

Full benchmark results can be found here
https://gist.github.com/xvrl/05132e0643513df4adf842288be86efd

Author: Xavier Léauté <xavier@confluent.io>
Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Jason Gustafson <jason@confluent.io>, Ismael Juma <ismael@juma.me.uk>

Closes #2967 from xvrl/kafka-5150
2017-05-31 02:22:07 +01:00
bbejeck 79f85039d7 KAFKA-3989; Initial support for adding a JMH benchmarking module
Author: bbejeck <bbejeck@gmail.com>

Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>, Ismael Juma <ismael@juma.me.uk>

Closes #1712 from bbejeck/KAFKA-3989_create_jmh_benchmarking_module
2017-03-06 11:56:14 +00:00