KAFKA-9555 Added default RLMM implementation based on internal topic storage.
This is the initial version of the default RLMM implementation.
This includes changes containing default RLMM configs, RLMM implementation, producer/consumer managers.
Introduced TopicBasedRemoteLogMetadataManagerHarness which takes care of bringing up a Kafka cluster and create remote log metadata topic and initializes TopicBasedRemoteLogMetadataManager.
Refactored existing RemoteLogMetadataCacheTest to RemoteLogSegmentLifecycleTest to have parameterized tests to run both RemoteLogMetadataCache and also TopicBasedRemoteLogMetadataManager.
Refactored existing InmemoryRemoteLogMetadataManagerTest, RemoteLogMetadataManagerTest to have parameterized tests to run both InmemoryRemoteLogMetadataManager and also TopicBasedRemoteLogMetadataManager.
This is part of tiered storage KIP-405 efforts.
Reviewers: Kowshik Prakasam <kprakasam@confluent.io>, Cong Ding <cong@ccding.com>, Jun Rao <junrao@gmail.com>
Support the KIP-455 reassignment API when in KRaft mode. Reassignments
which merely rearrange partitions complete immediately. Those that only
remove a partition complete immediately if the ISR would be non-empty
after the specified removals. Reassignments that add one or more
partitions follow the KIP-455 pattern of adding all the adding replicas
to the replica set, and then waiting for the ISR to include all the new
partitions before completing. Changes to the partition sets are
accomplished via PartitionChangeRecord.
Reviewers: Jun Rao <junrao@gmail.com>
1) Bring the generation field back to the CooperativeStickyAssignor so we don't need to rely so heavily on the ConsumerCoordinator properly updating its SubscriptionState after eg falling out of the group. The plain StickyAssignor always used the generation since it had to, so we just make sure the CooperativeStickyAssignor has this tool as well
2) In case of unforeseen problems or further bugs that slip past the generation field safety net, the assignor will now explicitly look out for partitions that are being claimed by multiple consumers as owned in the same generation. Such a case should never occur, but if it does, we have to invalidate this partition from the ownedPartitions of both consumers, since we can't tell who, if anyone, has the valid claim to this partition.
3) Fix a subtle bug that I discovered while writing tests for the above two fixes: in the constrained algorithm, we compute the exact number of partitions each consumer should end up with, and keep track of the "unfilled" members who must -- or might -- require more partitions to hit their quota. The problem was that members at the minQuota were being considered as "unfilled" even after we had already hit the maximum number of consumers allowed to go up to the maxQuota, meaning those minQuota members could/should not accept any more partitions beyond that. I believe this was introduced in #10509, so it shouldn't be in any released versions and does not need to be backported.
Reviewers: Guozhang Wang <guozhang@apache.org>, Luke Chen <showuon@gmail.com>
Updated FetchRequest and FetchResponse to use topic IDs rather than topic names.
Some of the complicated code is found in FetchSession and FetchSessionHandler.
We need to be able to store topic IDs and maintain a cache on the broker for IDs that may not have been resolved. On incremental fetch requests, we will try to resolve them or remove them if in toForget.
Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>, Chia-Ping Tsai <chia7712@gmail.com>, Jun Rao <junrao@gmail.com>
This implements the request and response portion of KIP-709. It updates the OffsetFetch request and response to support fetching offsets for multiple consumer groups at a time. If the broker does not support the new OffsetFetch version, clients can revert to the previous behaviour and use a request for each coordinator.
Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>, Konstantine Karantasis <konstantine@confluent.io>
Create the image/ module for storing, reading, and writing broker metadata images.
Metadata images are immutable. New images are produced from existing images
using delta classes. Delta classes are mutable, and represent changes to a base
image.
MetadataImage objects can be converted to lists of KRaft metadata records. This
is essentially writing a KRaft snapshot. The resulting snapshot can be read
back into a MetadataDelta object. In practice, we will typically read the
snapshot, and then read a few more records to get fully up to date. After that,
the MetadataDelta can be converted to a MetadataImage as usual.
Sometimes, we have to load a snapshot even though we already have an existing
non-empty MetadataImage. We would do this if the broker fell too far behind and
needed to receive a snapshot to catch up. This is handled just like the normal
snapshot loading process. Anything that is not in the snapshot will be marked
as deleted in the MetadataDelta once finishSnapshot() is called.
In addition to being used for reading and writing snapshots, MetadataImage also
serves as a cache for broker information in memory. A follow-up PR will replace
MetadataCache, CachedConfigRepository, and the client quotas cache with the
corresponding Image classes. TopicsDelta also replaces the "deferred
partition" state that the RaftReplicaManager currently implements. (That change
is also in a follow-up PR.)
Reviewers: Jason Gustafson <jason@confluent.io>, David Arthur <mumrah@gmail.com>
Implements KIP-745 https://cwiki.apache.org/confluence/display/KAFKA/KIP-745%3A+Connect+API+to+restart+connector+and+tasks to change connector REST API to restart a connector and its tasks as a whole.
Testing strategy
- [x] Unit tests added for all possible combinations of onlyFailed and includeTasks
- [x] Integration tests added for all possible combinations of onlyFailed and includeTasks
- [x] System tests for happy path
Reviewers: Randall Hauch <rhauch@gmail.com>, Diego Erdody <erdody@gmail.com>, Konstantine Karantasis <k.karantasis@gmail.com>
Add header and footer records for raft snapshots. This helps identify when the snapshot
starts and ends. The header also contains a time. The time field is currently set to 0.
KAFKA-12997 will add in the necessary wiring to use the correct timestamp.
Reviewers: Jose Sancio <jsancio@gmail.com>, Colin P. McCabe <cmccabe@apache.org>
We had been using `RecordAccumulator.beginFlush` in order to force the `RecordAccumulator` to flush pending batches when a transaction was being completed. Internally, `RecordAccumulator` has a simple counter for the number of flushes in progress. The count gets incremented in `beginFlush` and it is expected to be decremented by `awaitFlushCompletion`. The second call to decrement the counter never happened in the transactional path, so the counter could get stuck at a positive value, which means that the linger time would effectively be ignored.
This patch fixes the problem by removing the use of `beginFlush` in `Sender`. Instead, we now add an additional condition in `RecordAccumulator` to explicitly check when a transaction is being completed.
Reviewers: Guozhang Wang <wangguoz@gmail.com>
Added tiered storage related configs including remote log manager configs.
Added local log retention configs to LogConfig.
Added tests for the added configs.
Reviewers: Kowshik Prakasam <kprakasam@confluent.io>, Jun Rao <junrao@gmail.com>
They have been deprecated since 0.10.0. Full list of removes configs:
* port
* host.name
* advertised.port
* advertised.host.name
Also adjust tests to take the removals into account. Some tests were
no longer relevant and have been removed.
Finally, took the chance to:
* Clean up unnecessary usage of `KafkaConfig$.MODULE$` in
related files.
* Add missing `Test` annotations to `AdvertiseBrokerTest` and
make necessary changes for the tests to pass.
Reviewers: David Jacot <djacot@confluent.io>, Luke Chen <showuon@gmail.com>
Directly use `RaftClient.Listener`, `SnapshotWriter` and `SnapshotReader` in the quorum controller.
1. Allow `RaftClient` users to create snapshots by specifying the last committed offset and last committed epoch. These values are validated against the log and leader epoch cache.
2. Remove duplicate classes in the metadata module for writing and reading snapshots.
3. Changed the logic for comparing snapshots. The old logic was assuming a certain batch grouping. This didn't match the implementation of the snapshot writer. The snapshot writer is free to merge batches before writing them.
4. Improve `LocalLogManager` to keep track of multiple snapshots.
5. Improve the documentation and API for the snapshot classes to highlight the distinction between the offset of batches in the snapshot vs the offset of batches in the log. These two offsets are independent of one another. `SnapshotWriter` and `SnapshotReader` expose a method called `lastOffsetFromLog` which represents the last inclusive offset from the log that is represented in the snapshot.
Reviewers: dengziming <swzmdeng@163.com>, Jason Gustafson <jason@confluent.io>
This PR includes adding the NamedTopology to the Subscription/AssignmentInfo, and to the StateDirectory so it can place NamedTopology tasks within the hierarchical structure with task directories under the NamedTopology parent dir.
Reviewers: Walker Carlson <wcarlson@confluent.io>, Guozhang Wang <guozhang@confluent.io>
This patch removes the temporary shim layer we added to bridge the interface
differences between MetaLogManager and RaftClient. Instead, we now use the
RaftClient directly from the metadata module. This also means that the
metadata gradle module now depends on raft, rather than the other way around.
Finally, this PR also consolidates the handleResign and handleNewLeader APIs
into a single handleLeaderChange API.
Co-authored-by: Jason Gustafson <jason@confluent.io>
Implement a striped replica placement algorithm for KRaft. This also
means implementing rack awareness. Previously, KRraft just chose
replicas randomly in a non-rack-aware fashion. Also, allow replicas to
be placed on fenced brokers if there are no other choices. This was
specified in KIP-631 but previously not implemented.
Reviewers: Jun Rao <junrao@gmail.com>
The QuorumController should honor the timeout for RPC requests
which feature a timeout. For electLeaders, attempt to trigger a leader
election for all partitions when the request specifies null for the topics
argument.
Reviewers: David Arthur <mumrah@gmail.com>
Introduce List serde for primitive types or custom serdes with a serializer and a deserializer according to KIP-466
Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>, Matthias J. Sax <mjsax@conflunet.io>, John Roesler <roesler@confluent.io>, Michael Noll <michael@confluent.io>
Added server-common module to have server side common classes. Moved ApiMessageAndVersion, RecordSerde, AbstractApiMessageSerde, and BytesApiMessageSerde to server-common module.
Reivewers: Kowshik Prakasam <kprakasam@confluent.io>, Jun Rao <junrao@gmail.com>
KAFKA-12429: Added serdes for the default implementation of RLMM based on an internal topic as storage. This topic will receive events of RemoteLogSegmentMetadata, RemoteLogSegmentUpdate, and RemotePartitionDeleteMetadata. These events are serialized into Kafka protocol message format.
Added tests for all the event types for that topic.
This is part of the tiered storaqe implementation KIP-405.
Reivewers: Kowshik Prakasam <kprakasam@confluent.io>, Jun Rao <junrao@gmail.com>
Implement Raft Snapshot loading API.
1. Adds a new method `handleSnapshot` to `raft.Listener` which is called whenever the `RaftClient` determines that the `Listener` needs to load a new snapshot before reading the log. This happens when the `Listener`'s next offset is less than the log start offset also known as the earliest snapshot.
2. Adds a new type `SnapshotReader<T>` which provides a `Iterator<Batch<T>>` interface and de-serializes records in the `RawSnapshotReader` into `T`s
3. Adds a new type `RecordsIterator<T>` that implements an `Iterator<Batch<T>>` by scanning a `Records` object and deserializes the batches and records into `Batch<T>`. This type is used by both `SnapshotReader<T>` and `RecordsBatchReader<T>` internally to implement the `Iterator` interface that they expose.
4. Changes the `MockLog` implementation to read one or two batches at a time. The previous implementation always read from the given offset to the high-watermark. This made it impossible to test interesting snapshot loading scenarios.
5. Removed `throws IOException` from some methods. Some of types were inconsistently throwing `IOException` in some cases and throwing `RuntimeException(..., new IOException(...))` in others. This PR improves the consistent by wrapping `IOException` in `RuntimeException` in a few more places and replacing `Closeable` with `AutoCloseable`.
6. Updated the Kafka Raft simulation test to take into account snapshot. `ReplicatedCounter` was updated to generate snapshot after 10 records get committed. This means that the `ConsistentCommittedData` validation was extended to take snapshots into account. Also added a new invariant to ensure that the log start offset is consistently set with the earliest snapshot.
Reviewers: dengziming <swzmdeng@163.com>, David Arthur <mumrah@gmail.com>, Jason Gustafson <jason@confluent.io>
Fixes the issue with https://issues.apache.org/jira/browse/KAFKA-10847.
To fix the above problem, the left/outer stream-stream join processor uses a buffer to hold non-joined records for some time until the window closes, so they are not processed if a join is found during the join window time. If the window of a record closes and a join was not found, then this should be emitted and processed by the consequent topology processor.
A new time-ordered window store is used to temporary hold records that do not have a join and keep the records keys ordered by time. The KStreamStreamJoin has a reference to this new store . For every non-joined record seen, the processor writes it to this new state store without processing it. When a joined record is seen, the processor deletes the joined record from the new state store to prevent further processing.
Records that were never joined at the end of the window + grace period are emitted to the next topology processor. I use the stream time to check for the expiry time for determinism results . The KStreamStreamJoin checks for expired records and emit them every time a new record is processed in the join processor.
The new state store is shared with the left and right join nodes. The new store needs to serialize the record keys using a combined key of <joinSide-recordKey>. This key combination helps to delete the records from the other join if a joined record is found. Two new serdes are created for this, KeyAndJoinSideSerde which serializes a boolean value that specifies the side where the key is found, and ValueOrOtherValueSerde that serializes either V1 or V2 based on where the key was found.
Reviewers: Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
Deprecates the following
1. StreamsConfig.EXACTLY_ONCE
2. StreamsConfig.EXACTLY_ONCE_BETA
3. Producer#sendOffsetsToTransaction(Map offsets, String consumerGroupId)
And introduces a new StreamsConfig.EXACTLY_ONCE_V2 config. Additionally, this PR replaces usages of the term "eos-beta" throughout the code with the term "eos-v2"
Reviewers: Matthias J. Sax <mjsax@confluent.io>
Implement the createPartitions RPC which adds more partitions to a topic
in the KIP-500 controller. Factor out some of the logic for validating
manual partition assignments, so that it can be shared between
createTopics and createPartitions. Add a startPartition argument to the
replica placer.
Reviewers: Jason Gustafson <jason@confluent.io>
KAFKA-12368: Added inmemory implementations for RemoteStorageManager and RemoteLogMetadataManager.
Added inmemory implementation for RemoteStorageManager and RemoteLogMetadataManager. A major part of inmemory RLMM will be used in the default RLMM implementation which will be based on topic storage. These will be used in unit tests for tiered storage.
Added tests for both the implementations and their supported classes.
This is part of tiered storage implementation, KIP-405.
Reivewers: Kowshik Prakasam <kprakasam@confluent.io>, Jun Rao <junrao@gmail.com>
Implement controller-side snapshot generation.Implement QuorumController snapshot
generation. Note that this PR does not handle KRaft integration, just the internal
snapshot record generation and consumption logic.
Reading a snapshot is relatively straightforward. When the QuorumController
starts up, it loads the most recent snapshot. This is just a series of records
that we replay, plus a log offset ("snapshot epoch") that we advance to.
Writing a snapshot is more complex. There are several components:
the SnapshotWriter which persists the snapshot, the SnapshotGenerator
which manages writing each batch of records, and the SnapshotGeneratorManager
which interfaces the preceding two classes with the event queue.
Controller snapshots are done incrementally. In order to avoid blocking the
controller thread for a long time, we pull a few record batches at a time from
our record batch iterators. These iterators are implemented by controller
manager classes such as ReplicationControlManager, ClusterControlManager, etc.
Finally, this PR adds ControllerTestUtils#deepSortRecords and
ControllerTestUtils#assertBatchIteratorContains, which make it easier to write
unit tests. Since records are often constructed from unsorted data structures,
it is often useful to sort them before comparing them.
Reviewers: David Arthur <mumrah@gmail.com>
1. Add `canGrantVote` to `EpochState`
2. Move the if-else in `KafkaRaftCllient.handleVoteRequest` to `EpochState`
3. Add unit tests for `canGrantVote`
Reviewers: Jason Gustafson <jason@confluent.io>
Introduce "testkit" package which includes KafkaClusterTestKit class for enabling integration tests of self-managed clusters. Also make use of this new integration test harness in the ClusterTestExtentions JUnit extension.
Adds RaftClusterTest for basic self-managed integration test.
Reviewers: Jason Gustafson <jason@confluent.io>, Colin P. McCabe <cmccabe@apache.org>
Co-authored-by: Colin P. McCabe <cmccabe@apache.org>
Previously we implemented ClusterId validation for the Fetch API in the Raft implementation. This patch adds ClusterId validation to the remaining Raft RPCs.
Reviewers: José Armando García Sancio <jsancio@users.noreply.github.com>, Jason Gustafson <jason@confluent.io>
This patch changes the raft simulation tests to use jqwik, which is a property testing library. This provides two main benefits:
- It simplifies the randomization of test parameters. Currently the tests use a fixed set of `Random` seeds, which means that most builds are doing redundant work. We get a bigger benefit from allowing each build to test different parameterizations.
- It makes it easier to reproduce failures. Whenever a test fails, jqwik will report the random seed that failed. A developer can then modify the `@Property` annotation to use that specific seed in order to reproduce the failure.
This patch also includes an optimization for `MockLog.earliestSnapshotId` which reduces the time to run the simulation tests dramatically.
Reviewers: Ismael Juma <ismael@juma.me.uk>, Chia-Ping Tsai <chia7712@gmail.com>, José Armando García Sancio <jsancio@gmail.com>, David Jacot <djacot@confluent.io>
1. Remove org.apache.log4j from allowed import list of shell, trogdor subpackage; they uses slf4j, not log4.
2. Remove org.slf4j from allowed import list of clients, server subpackage: org.slf4j is allowed globally.
3. Remove org.apache.log4j from streams subpackage's allowed import list
Reviewers: David Jacot <david.jacot@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
This patch enables delete topic support for the new KIP-500 controller. Also fixes the following:
- Fix a bug where feature level records were not correctly replayed.
- Fix a bug in TimelineHashMap#remove where the wrong type was being returned.
Reviewers: Jason Gustafson <jason@confluent.io>, Justine Olshan <jolshan@confluent.io>, Ron Dagostino <rdagostino@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>, Jun Rao <junrao@gmail.com>
Co-authored-by: Jason Gustafson <jason@confluent.io>
Always grab a new thread.id and verify that a thread has fully shut down to DEAD before removing from the `threads` list and making that id available again
Reviewers: Walker Carlson <wcarlson@confluent.io>, Bruno Cadonna <cadonna@confluent.io>
Implements KIP-695
Reverts a previous behavior change to Consumer.poll and replaces
it with a new Consumer.currentLag API, which returns the client's
currently cached lag.
Uses this new API to implement the desired task idling semantics
improvement from KIP-695.
Reverts fdcf8fbf72 / KAFKA-10866: Add metadata to ConsumerRecords (#9836)
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Guozhang Wang <guozhang@apache.org>
The quorum controller stores metadata in the KIP-500 metadata log, not in Apache
ZooKeeper. Each controller node is a voter in the metadata quorum. The leader of the
quorum is the active controller, which processes write requests. The followers are standby
controllers, which replay the operations written to the log. If the active controller goes away,
a standby controller can take its place.
Like the ZooKeeper-based controller, the quorum controller is based on an event queue
backed by a single-threaded executor. However, unlike the ZK-based controller, the quorum
controller can have multiple operations in flight-- it does not need to wait for one operation
to be finished before starting another. Therefore, calls into the QuorumController return
CompleteableFuture objects which are completed with either a result or an error when the
operation is done. The QuorumController will also time out operations that have been
sitting on the queue too long without being processed. In this case, the future is completed
with a TimeoutException.
The controller uses timeline data structures to store multiple "versions" of its in-memory
state simultaneously. "Read operations" read only committed state, which is slightly older
than the most up-to-date in-memory state. "Write operations" read and write the latest
in-memory state. However, we can not return a successful result for a write operation until
its state has been committed to the log. Therefore, if a client receives an RPC response, it
knows that the requested operation has been performed, and can not be undone by a
controller failover.
Reviewers: Jun Rao <junrao@gmail.com>, Ron Dagostino <rdagostino@confluent.io>
The Kafka Metadata shell is a new command which allows users to
interactively examine the metadata stored in a KIP-500 cluster.
It can examine snapshot files that are specified via --snapshot.
The metadata tool works by replaying the log and storing the state into
in-memory nodes. These nodes are presented in a fashion similar to
filesystem directories.
Reviewers: Jason Gustafson <jason@confluent.io>, David Arthur <mumrah@gmail.com>, Igor Soarez <soarez@apple.com>
Previously all APIs were accessible on every listener exposed by the broker, but
with KIP-500, that is no longer true. We now have more complex requirements for
API accessibility.
For example, the KIP-500 controller exposes some APIs which are not exposed by
brokers, such as BrokerHeartbeatRequest, and does not expose most client APIs,
such as JoinGroupRequest, etc. Similarly, the KIP-500 broker does not implement
some APIs that the ZK-based broker does, such as LeaderAndIsrRequest and
UpdateFeaturesRequest.
All of this means that we need more sophistication in how we expose APIs and
keep them consistent with the ApiVersions API. Up until now, we have been
working around this using the controllerOnly flag inside ApiKeys, but this is
not rich enough to support all of the cases listed above. This PR introduces a
new "listeners" field to the request schema definitions. This field is an array
of strings which indicate the listener types in which the API should be exposed.
We currently support "zkBroker", "broker", and "controller". ("broker"
indicates the KIP-500 broker, whereas zkBroker indicates the old broker).
This PR also creates ApiVersionManager to encapsulate the creation of the
ApiVersionsResponse based on the listener type. Additionally, it modifies
SocketServer to check the listener type of received requests before forwarding
them to the request handler.
Finally, this PR also fixes a bug in the handling of the ApiVersionsResponse
prior to authentication. Previously a static response was sent, which means that
changes to features would not get reflected. This also meant that the logic to
ensure that only the intersection of version ranges supported by the controller
would get exposed did not work. I think this is important because some clients
rely on the initial pre-authenticated ApiVersions response rather than doing a
second round after authentication as the Java client does.
One final cleanup note: I have removed the expectation that envelope requests
are only allowed on "privileged" listeners. This made sense initially because
we expected to use forwarding before the KIP-500 controller was available. That
is not the case anymore and we expect the Envelope API to only be exposed on the
controller listener. I have nevertheless preserved the existing workarounds to
allow verification of the forwarding behavior in integration testing.
Reviewers: Colin P. McCabe <cmccabe@apache.org>, Ismael Juma <ismael@juma.me.uk>
This PR adds the KIP-500 BrokerServer and ControllerServer classes and
makes some related changes to get them working. Note that the ControllerServer
does not instantiate a QuorumController object yet, since that will be added in
PR #10070.
* Add BrokerServer and ControllerServer
* Change ApiVersions#computeMaxUsableProduceMagic so that it can handle
endpoints which do not support PRODUCE (such as KIP-500 controller nodes)
* KafkaAdminClientTest: fix some lingering references to decommissionBroker
that should be references to unregisterBroker.
* Make some changes to allow SocketServer to be used by ControllerServer as
we as by the broker.
* We now return a random active Broker ID as the Controller ID in
MetadataResponse for the Raft-based case as per KIP-590.
* Add the RaftControllerNodeProvider
* Add EnvelopeUtils
* Add MetaLogRaftShim
* In ducktape, in config_property.py: use a KIP-500 compatible cluster ID.
Reviewers: Colin P. McCabe <cmccabe@apache.org>, David Arthur <mumrah@gmail.com>
We don't really need it and it causes problems in older Android versions
and GraalVM native image usage (there are workarounds for the latter).
Move the logic to separate classes that are only invoked when the
relevant compression library is actually used. Place such classes
in their own package and enforce via checkstyle that only these
classes refer to compression library packages.
To avoid cyclic dependencies, moved `BufferSupplier` to the `utils`
package.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
Add MetaLogListener, LocalLogManager, and related classes. These
classes are used by the KIP-500 controller and broker to interface with the
Raft log.
Also add the Controller interface. The implementation will be added in a separate PR.
Reviewers: Ron Dagostino <rdagostino@confluent.io>, David Arthur <mumrah@gmail.com>
Add KafkaEventQueue, which is used by the KIP-500 controller to manage its event queue.
Compared to using an Executor, KafkaEventQueue has the following advantages:
* Events can be given "deadlines." If an event lingers in the queue beyond the deadline, it
will be completed with a timeout exception. This is useful for implementing timeouts for
controller RPCs.
* Events can be prepended to the queue as well as appended.
* Events can be given tags to make them easier to manage. This is especially useful for
rescheduling or cancelling events which were previously scheduled to execute in the future.
Reviewers: Jun Rao <junrao@gmail.com>, José Armando García Sancio <jsancio@gmail.com>
This patch adds a `RecordSerde` implementation for the metadata record format expected by KIP-631.
Reviewers: Colin McCabe <cmccabe@apache.org>, Ismael Juma <mlists@juma.me.uk>
Replace BrokerStates.scala with BrokerState.java, to make it easier to use from Java code if needed. This also makes it easier to go from a numeric type to an enum.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>