# KRaft (Kafka Raft)

KRaft (Kafka Raft) is a protocol, based on the Raft consensus protocol, tailored for Apache Kafka. It is used by Apache Kafka in KRaft (Kafka Raft Metadata) mode. This module also includes a standalone test server that can be used for performance testing; setup details are described below.
## Run Single Quorum

```bash
bin/test-kraft-server-start.sh --config config/kraft.properties
```
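The contents of `config/kraft.properties` ship with the repository and are not reproduced here; by analogy with the multi-node files below, a minimal single-node configuration might look like:

```properties
# Hypothetical minimal single-node config, modeled on the multi-node
# examples below; the repository's config/kraft.properties may differ.
node.id=1
listeners=PLAINTEXT://localhost:9092
controller.listener.names=PLAINTEXT
controller.quorum.voters=1@localhost:9092
log.dirs=/tmp/kraft-logs
```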
## Run Multi-Node Quorum

Create three separate KRaft quorum properties files as follows:
```bash
cat << EOF >> config/kraft-quorum-1.properties
node.id=1
listeners=PLAINTEXT://localhost:9092
controller.listener.names=PLAINTEXT
controller.quorum.voters=1@localhost:9092,2@localhost:9093,3@localhost:9094
log.dirs=/tmp/kraft-logs-1
EOF
```

```bash
cat << EOF >> config/kraft-quorum-2.properties
node.id=2
listeners=PLAINTEXT://localhost:9093
controller.listener.names=PLAINTEXT
controller.quorum.voters=1@localhost:9092,2@localhost:9093,3@localhost:9094
log.dirs=/tmp/kraft-logs-2
EOF
```

```bash
cat << EOF >> config/kraft-quorum-3.properties
node.id=3
listeners=PLAINTEXT://localhost:9094
controller.listener.names=PLAINTEXT
controller.quorum.voters=1@localhost:9092,2@localhost:9093,3@localhost:9094
log.dirs=/tmp/kraft-logs-3
EOF
```
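Note that `controller.quorum.voters` is identical in all three files; each entry has the form `{node.id}@{host}:{port}`, so every node knows the full voter set. Only `node.id`, `listeners`, and `log.dirs` differ per node.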
Open three separate terminals and run one command in each:

```bash
bin/test-kraft-server-start.sh --config config/kraft-quorum-1.properties
bin/test-kraft-server-start.sh --config config/kraft-quorum-2.properties
bin/test-kraft-server-start.sh --config config/kraft-quorum-3.properties
```
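If you prefer a single shell, a background loop along these lines should also work (a sketch; it assumes the three properties files created above exist):

```bash
# Launch all three quorum nodes in the background from one shell,
# then wait on them instead of using three separate terminals.
for i in 1 2 3; do
  bin/test-kraft-server-start.sh --config config/kraft-quorum-$i.properties &
done
wait
```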
Once a leader is elected, it will begin writing to an internal `__raft_performance_test` topic with a steady workload of random data. You can control the workload using the `--throughput` and `--record-size` arguments passed to `test-kraft-server-start.sh`.
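For example (the values below are illustrative, and assume `--throughput` is measured in records per second and `--record-size` in bytes):

```bash
bin/test-kraft-server-start.sh --config config/kraft.properties \
  --throughput 5000 --record-size 256
```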