kafka/gradle
James Yuzawa eb24ed893a
KAFKA-10470: Zstd upgrade and buffering (#9499)
Zstd-jni 1.4.5-6 allocates large internal buffers inside of ZstdInputStream and ZstdOutputStream. This caused a lot of allocation and GC activity when creating and closing the streams. It also does not buffer the reads or writes. This causes inefficiency when DefaultRecord.writeTo() does a series of small single bytes reads using various ByteUtils methods. The JNI is more efficient if the writes of uncompressed data were flushed in large pieces rather than for each byte. This is due to the the expense of context switching between the Java code and the native code. This is also the case when reading as well. Per luben/zstd-jni#141 the maintainer of zstd-jni and I agreed to not buffer reads and writes in favor of having the caller do that, so here we are updating the caller.

In this patch, I upgraded to the most recent zstd-jni version with the buffer reuse built-in. This was done in luben/zstd-jni#143 and luben/zstd-jni#146 Since we decided not to add additional buffering of input/output with zstd-jni, I added the BufferedInputStream and BufferedOutputStream to CompressionType.ZSTD just like we currently do for CompressionType.GZIP which also is inefficient for single byte reads and writes. I used the same buffer sizes as that existing implementation.

NOTE: if so desired we could pass a wrapped BufferSupplier into the Zstd*Stream classes to have Kafka decide how the buffer recycling occurs. This functionality was added in the latter PR linked above. I am holding off on this since based on jmh benchmarking the performance gains were not clear and personally I don't know if it worth the complexity of trying to hack around the reflection at this point in time. The zstd-jni uses a very similar default recycler as snappy does currently which seems to provide decent efficiency. While this PR fixes the defect, I feel that using BufferSupplier in both zstd-jni and snappy is outside of the scope of this bugfix and should be considered a separate improvement. I would prefer this change get merged in on its own since the performance gains here are very significant relative to the more incremental and minor optimizations which could be achieved by doing that separate work.

There are some noticeable improvements in the JMH benchmarks (excerpt):

BEFORE:
Benchmark                                                                                                                    (bufferSupplierStr)  (bytes)  (compressionType)  (maxBatchSize)  (messageSize)  (messageVersion)   Mode  Cnt       Score     Error   Units
CompressedRecordBatchValidationBenchmark.measureValidateMessagesAndAssignOffsetsCompressed                                                CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15   27743.260 ± 673.869   ops/s
CompressedRecordBatchValidationBenchmark.measureValidateMessagesAndAssignOffsetsCompressed:·gc.alloc.rate                                 CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15    3399.966 ±  82.608  MB/sec
CompressedRecordBatchValidationBenchmark.measureValidateMessagesAndAssignOffsetsCompressed:·gc.alloc.rate.norm                            CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15  134968.010 ±   0.012    B/op
CompressedRecordBatchValidationBenchmark.measureValidateMessagesAndAssignOffsetsCompressed:·gc.churn.G1_Eden_Space                        CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15    3850.985 ±  84.476  MB/sec
CompressedRecordBatchValidationBenchmark.measureValidateMessagesAndAssignOffsetsCompressed:·gc.churn.G1_Eden_Space.norm                   CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15  152881.128 ± 942.189    B/op
CompressedRecordBatchValidationBenchmark.measureValidateMessagesAndAssignOffsetsCompressed:·gc.churn.G1_Survivor_Space                    CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15     174.241 ±   3.486  MB/sec
CompressedRecordBatchValidationBenchmark.measureValidateMessagesAndAssignOffsetsCompressed:·gc.churn.G1_Survivor_Space.norm               CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15    6917.758 ±  82.522    B/op
CompressedRecordBatchValidationBenchmark.measureValidateMessagesAndAssignOffsetsCompressed:·gc.count                                      CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15    1689.000            counts
CompressedRecordBatchValidationBenchmark.measureValidateMessagesAndAssignOffsetsCompressed:·gc.time                                       CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15   82621.000                ms
JMH benchmarks done

Benchmark                                                                                                    (bufferSupplierStr)  (bytes)  (compressionType)  (maxBatchSize)  (messageSize)  (messageVersion)   Mode  Cnt       Score       Error   Units
RecordBatchIterationBenchmark.measureIteratorForBatchWithSingleMessage                                                    CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15   24095.711 ±   895.866   ops/s
RecordBatchIterationBenchmark.measureIteratorForBatchWithSingleMessage:·gc.alloc.rate                                     CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15    2932.289 ±   109.465  MB/sec
RecordBatchIterationBenchmark.measureIteratorForBatchWithSingleMessage:·gc.alloc.rate.norm                                CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15  134032.012 ±     0.013    B/op
RecordBatchIterationBenchmark.measureIteratorForBatchWithSingleMessage:·gc.churn.G1_Eden_Space                            CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15    3282.912 ±   115.042  MB/sec
RecordBatchIterationBenchmark.measureIteratorForBatchWithSingleMessage:·gc.churn.G1_Eden_Space.norm                       CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15  150073.914 ±  1342.235    B/op
RecordBatchIterationBenchmark.measureIteratorForBatchWithSingleMessage:·gc.churn.G1_Survivor_Space                        CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15     149.697 ±     5.786  MB/sec
RecordBatchIterationBenchmark.measureIteratorForBatchWithSingleMessage:·gc.churn.G1_Survivor_Space.norm                   CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15    6842.462 ±    64.515    B/op
RecordBatchIterationBenchmark.measureIteratorForBatchWithSingleMessage:·gc.count                                          CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15    1449.000              counts
RecordBatchIterationBenchmark.measureIteratorForBatchWithSingleMessage:·gc.time                                           CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15   82518.000                  ms
RecordBatchIterationBenchmark.measureSkipIteratorForVariableBatchSize                                                     CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15    1449.060 ±   230.498   ops/s
RecordBatchIterationBenchmark.measureSkipIteratorForVariableBatchSize:·gc.alloc.rate                                      CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15     198.051 ±    31.532  MB/sec
RecordBatchIterationBenchmark.measureSkipIteratorForVariableBatchSize:·gc.alloc.rate.norm                                 CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15  150502.519 ±     0.186    B/op
RecordBatchIterationBenchmark.measureSkipIteratorForVariableBatchSize:·gc.churn.G1_Eden_Space                             CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15     200.064 ±    31.879  MB/sec
RecordBatchIterationBenchmark.measureSkipIteratorForVariableBatchSize:·gc.churn.G1_Eden_Space.norm                        CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15  152569.341 ± 13826.686    B/op
RecordBatchIterationBenchmark.measureSkipIteratorForVariableBatchSize:·gc.count                                           CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15      91.000              counts
RecordBatchIterationBenchmark.measureSkipIteratorForVariableBatchSize:·gc.time                                            CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15   75869.000                  ms
RecordBatchIterationBenchmark.measureStreamingIteratorForVariableBatchSize                                                CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15    2609.660 ±  1145.160   ops/s
RecordBatchIterationBenchmark.measureStreamingIteratorForVariableBatchSize:·gc.alloc.rate                                 CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15     815.441 ±   357.818  MB/sec
RecordBatchIterationBenchmark.measureStreamingIteratorForVariableBatchSize:·gc.alloc.rate.norm                            CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15  344309.097 ±     0.238    B/op
RecordBatchIterationBenchmark.measureStreamingIteratorForVariableBatchSize:·gc.churn.G1_Eden_Space                        CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15     808.952 ±   354.975  MB/sec
RecordBatchIterationBenchmark.measureStreamingIteratorForVariableBatchSize:·gc.churn.G1_Eden_Space.norm                   CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15  345712.061 ± 51434.034    B/op
RecordBatchIterationBenchmark.measureStreamingIteratorForVariableBatchSize:·gc.churn.G1_Old_Gen                           CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15       0.019 ±     0.042  MB/sec
RecordBatchIterationBenchmark.measureStreamingIteratorForVariableBatchSize:·gc.churn.G1_Old_Gen.norm                      CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15      18.615 ±    42.045    B/op
RecordBatchIterationBenchmark.measureStreamingIteratorForVariableBatchSize:·gc.churn.G1_Survivor_Space                    CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15      24.132 ±    12.254  MB/sec
RecordBatchIterationBenchmark.measureStreamingIteratorForVariableBatchSize:·gc.churn.G1_Survivor_Space.norm               CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15   13540.960 ± 14649.192    B/op
RecordBatchIterationBenchmark.measureStreamingIteratorForVariableBatchSize:·gc.count                                      CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15     148.000              counts
RecordBatchIterationBenchmark.measureStreamingIteratorForVariableBatchSize:·gc.time                                       CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15   23848.000                  ms
JMH benchmarks done

AFTER
Benchmark                                                                                                                (bufferSupplierStr)  (bytes)  (compressionType)  (maxBatchSize)  (messageSize)  (messageVersion)   Mode  Cnt       Score      Error   Units
CompressedRecordBatchValidationBenchmark.measureValidateMessagesAndAssignOffsetsCompressed                                            CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15  147792.454 ± 2721.318   ops/s
CompressedRecordBatchValidationBenchmark.measureValidateMessagesAndAssignOffsetsCompressed:·gc.alloc.rate                             CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15    2708.481 ±   50.012  MB/sec
CompressedRecordBatchValidationBenchmark.measureValidateMessagesAndAssignOffsetsCompressed:·gc.alloc.rate.norm                        CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15   20184.002 ±    0.002    B/op
CompressedRecordBatchValidationBenchmark.measureValidateMessagesAndAssignOffsetsCompressed:·gc.churn.G1_Eden_Space                    CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15    2732.667 ±   59.258  MB/sec
CompressedRecordBatchValidationBenchmark.measureValidateMessagesAndAssignOffsetsCompressed:·gc.churn.G1_Eden_Space.norm               CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15   20363.460 ±  120.585    B/op
CompressedRecordBatchValidationBenchmark.measureValidateMessagesAndAssignOffsetsCompressed:·gc.churn.G1_Old_Gen                       CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15       0.042 ±    0.033  MB/sec
CompressedRecordBatchValidationBenchmark.measureValidateMessagesAndAssignOffsetsCompressed:·gc.churn.G1_Old_Gen.norm                  CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15       0.316 ±    0.249    B/op
CompressedRecordBatchValidationBenchmark.measureValidateMessagesAndAssignOffsetsCompressed:·gc.count                                  CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15     833.000             counts
CompressedRecordBatchValidationBenchmark.measureValidateMessagesAndAssignOffsetsCompressed:·gc.time                                   CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15    8390.000                 ms
JMH benchmarks done

Benchmark                                                                                                (bufferSupplierStr)  (bytes)  (compressionType)  (maxBatchSize)  (messageSize)  (messageVersion)   Mode  Cnt       Score      Error   Units
RecordBatchIterationBenchmark.measureIteratorForBatchWithSingleMessage                                                CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15  166786.092 ± 3285.702   ops/s
RecordBatchIterationBenchmark.measureIteratorForBatchWithSingleMessage:·gc.alloc.rate                                 CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15    2926.914 ±   57.464  MB/sec
RecordBatchIterationBenchmark.measureIteratorForBatchWithSingleMessage:·gc.alloc.rate.norm                            CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15   19328.002 ±    0.002    B/op
RecordBatchIterationBenchmark.measureIteratorForBatchWithSingleMessage:·gc.churn.G1_Eden_Space                        CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15    2938.541 ±   66.850  MB/sec
RecordBatchIterationBenchmark.measureIteratorForBatchWithSingleMessage:·gc.churn.G1_Eden_Space.norm                   CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15   19404.357 ±  177.485    B/op
RecordBatchIterationBenchmark.measureIteratorForBatchWithSingleMessage:·gc.churn.G1_Old_Gen                           CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15       0.516 ±    0.100  MB/sec
RecordBatchIterationBenchmark.measureIteratorForBatchWithSingleMessage:·gc.churn.G1_Old_Gen.norm                      CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15       3.409 ±    0.657    B/op
RecordBatchIterationBenchmark.measureIteratorForBatchWithSingleMessage:·gc.churn.G1_Survivor_Space                    CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15       0.032 ±    0.131  MB/sec
RecordBatchIterationBenchmark.measureIteratorForBatchWithSingleMessage:·gc.churn.G1_Survivor_Space.norm               CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15       0.207 ±    0.858    B/op
RecordBatchIterationBenchmark.measureIteratorForBatchWithSingleMessage:·gc.count                                      CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15     834.000             counts
RecordBatchIterationBenchmark.measureIteratorForBatchWithSingleMessage:·gc.time                                       CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15    9370.000                 ms
RecordBatchIterationBenchmark.measureSkipIteratorForVariableBatchSize                                                 CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15   15988.116 ±  137.427   ops/s
RecordBatchIterationBenchmark.measureSkipIteratorForVariableBatchSize:·gc.alloc.rate                                  CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15     448.636 ±    3.851  MB/sec
RecordBatchIterationBenchmark.measureSkipIteratorForVariableBatchSize:·gc.alloc.rate.norm                             CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15   30907.698 ±    0.020    B/op
RecordBatchIterationBenchmark.measureSkipIteratorForVariableBatchSize:·gc.churn.G1_Eden_Space                         CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15     450.905 ±    5.587  MB/sec
RecordBatchIterationBenchmark.measureSkipIteratorForVariableBatchSize:·gc.churn.G1_Eden_Space.norm                    CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15   31064.113 ±  291.190    B/op
RecordBatchIterationBenchmark.measureSkipIteratorForVariableBatchSize:·gc.churn.G1_Old_Gen                            CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15       0.043 ±    0.007  MB/sec
RecordBatchIterationBenchmark.measureSkipIteratorForVariableBatchSize:·gc.churn.G1_Old_Gen.norm                       CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15       2.931 ±    0.493    B/op
RecordBatchIterationBenchmark.measureSkipIteratorForVariableBatchSize:·gc.count                                       CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15     790.000             counts
RecordBatchIterationBenchmark.measureSkipIteratorForVariableBatchSize:·gc.time                                        CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15     999.000                 ms
RecordBatchIterationBenchmark.measureStreamingIteratorForVariableBatchSize                                            CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15   11345.169 ±  206.528   ops/s
RecordBatchIterationBenchmark.measureStreamingIteratorForVariableBatchSize:·gc.alloc.rate                             CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15    2314.800 ±   42.094  MB/sec
RecordBatchIterationBenchmark.measureStreamingIteratorForVariableBatchSize:·gc.alloc.rate.norm                        CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15  224714.266 ±    0.028    B/op
RecordBatchIterationBenchmark.measureStreamingIteratorForVariableBatchSize:·gc.churn.G1_Eden_Space                    CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15    2320.213 ±   45.521  MB/sec
RecordBatchIterationBenchmark.measureStreamingIteratorForVariableBatchSize:·gc.churn.G1_Eden_Space.norm               CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15  225235.965 ±  803.309    B/op
RecordBatchIterationBenchmark.measureStreamingIteratorForVariableBatchSize:·gc.churn.G1_Old_Gen                       CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15       0.026 ±    0.005  MB/sec
RecordBatchIterationBenchmark.measureStreamingIteratorForVariableBatchSize:·gc.churn.G1_Old_Gen.norm                  CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15       2.551 ±    0.455    B/op
RecordBatchIterationBenchmark.measureStreamingIteratorForVariableBatchSize:·gc.count                                  CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15     994.000             counts
RecordBatchIterationBenchmark.measureStreamingIteratorForVariableBatchSize:·gc.time                                   CREATE   RANDOM               ZSTD             200           1000                 2  thrpt   15    1189.000                 ms
JMH benchmarks done

Reviewers: Ismael Juma <ismael@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
2020-11-11 10:11:52 +08:00
..
resources
wrapper MINOR: Upgrade to gradle 6.7 (#9440) 2020-10-15 07:54:43 -07:00
buildscript.gradle MINOR: Use https instead of http in links (#6477) 2019-04-22 11:58:25 -07:00
dependencies.gradle KAFKA-10470: Zstd upgrade and buffering (#9499) 2020-11-11 10:11:52 +08:00
rat.gradle MINOR: Update dependencies for Kafka 2.2 (#6116) 2019-01-10 01:14:30 -08:00
spotbugs-exclude.xml MINOR: Use `Map.forKeyValue` to avoid tuple allocation in Scala 2.13 (#9299) 2020-09-21 16:04:19 -07:00