KAFKA-17642: PreVote response handling and ProspectiveState #18240
base: trunk
Thanks @ahuang98. Here is a partial review.
Thanks for the changes @ahuang98. Quick review of the changes to **/src/main.
// if (quorum.isVoter()) {
// canElectNewLeaderAfterOldLeaderPartitioned fails if we do not bump epoch since it is possible
// that the replica ends up as follower in the same epoch.
// resigned(leaderId=local) -> prospective(leaderId=local) -> follower(leaderId=local) which is illegal
// transitionToProspective(quorum.epoch() + 1, currentTimeMs);
// transitionToCandidate(currentTimeMs);
// } else {
the existing raft event simulation tests picked up on a new bug in pollResigned - if we simply replace the transitionToCandidate(currentTimeMs) with transitionToProspective(currentTimeMs), a cordoned leader in epoch 5 could resign in epoch 5, transition to prospective in epoch 5 (with leaderId=localId), fail election and then attempt to become follower of itself in epoch 5.
so far, these are the alternatives which seem reasonable to me:
- resigned voter in epoch X should transition to prospective in epoch X+1
  - cons: need to create a special code path just for this case to allow becoming prospective in epoch+1 (would also add trivial complexity for determining if votedKey or leaderId should be kept from prior transition). transitioning to prospective in epoch + 1 is almost as disruptive as transitioning directly to candidate since it involves an epoch bump
  - pro: probably the option which follows intentions of past logic most closely
- resigned voter in epoch X should simply transition to unattached in epoch X+1 (current version)
  - con: resigned replica has to wait two election timeouts after resignation to become prospective
  - pro: simplified logic. unless this is the only replica eligible for leadership in the quorum (e.g. due to network partitioning), the impact of waiting two election timeouts after resignation is small - all other replicas should be starting their own elections within a single fetch timeout/election timeout
- resigned voter in epoch X instead waits a smaller backoffTimeMs before transitioning to unattached in epoch X+1
  - con: scope creep - what should this backoff be? additional changes to resignedState
  - pro: resigned voter waits less time before becoming eligible to start a new election.
> the existing raft event simulation tests picked up on a new bug in pollResigned
What is the exact error? Let's add a unit test to one of the KafkaRaftClient*Test suites that shows the bug.
> attempt to become follower of itself in epoch 5.
Let's add a check to transitionToFollower that checks that leaderId is not equal to localId.
It makes sense to me that after the resign state the replica should always increase its epoch. The replica resigned from leadership at epoch X so eventually the epoch will be at least X + 1. Did you consider transitioning to candidate and relaxing the transition functions to allow both resigned and prospective to transition to candidate?
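The self-follower check suggested above could look roughly like the following. This is a minimal sketch under stated assumptions, not Kafka's actual QuorumState: the class FollowerTransitionSketch, its fields, and the epoch check are hypothetical stand-ins, and only the "leaderId must not equal localId" rule comes from the review comment.

```java
import java.util.OptionalInt;

// Hypothetical, simplified stand-in for the quorum state; not Kafka's QuorumState.
final class FollowerTransitionSketch {
    private final int localId;
    private int epoch;
    private OptionalInt leaderId = OptionalInt.empty();

    FollowerTransitionSketch(int localId, int epoch) {
        this.localId = localId;
        this.epoch = epoch;
    }

    void transitionToFollower(int newEpoch, int newLeaderId) {
        // Reject the illegal "follower of itself" transition up front.
        if (newLeaderId == localId) {
            throw new IllegalStateException(
                "Cannot transition to follower with leader " + newLeaderId +
                " in epoch " + newEpoch + " since it matches the local id " + localId
            );
        }
        // Assumed extra check for the sketch: epochs never move backwards.
        if (newEpoch < epoch) {
            throw new IllegalStateException(
                "Cannot transition to follower in epoch " + newEpoch +
                " from current epoch " + epoch
            );
        }
        this.epoch = newEpoch;
        this.leaderId = OptionalInt.of(newLeaderId);
    }

    int epoch() {
        return epoch;
    }

    OptionalInt leaderId() {
        return leaderId;
    }
}
```

With a check like this, the resigned(leaderId=local) -> prospective(leaderId=local) -> follower(leaderId=local) sequence would fail fast at the last step instead of leaving the replica following itself.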
discussed offline, transitionToUnattached has existing logic for assigning election timeouts which we can borrow - we can just add an additional if clause that if we came from resignedState, assign electionTimeout to resignedState.electionTimeout which is effectively 0
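The extra if clause described above might be sketched as follows. Everything here is an assumption for illustration: the Previous enum, the electionTimeoutMs helper, and the idea that an unattached replica carries over its remaining timer are hypothetical and do not reproduce the real transitionToUnattached code.

```java
// Hypothetical sketch; names and structure are assumptions, not Kafka's QuorumState.
final class UnattachedTimeoutSketch {
    enum Previous { RESIGNED, UNATTACHED, OTHER }

    /**
     * Picks the election timeout to use when entering the unattached state.
     *
     * @param previous        the state the replica is coming from
     * @param remainingMs     time left on the previous state's election timer
     * @param randomTimeoutMs a freshly drawn randomized election timeout
     */
    static long electionTimeoutMs(Previous previous, long remainingMs, long randomTimeoutMs) {
        if (previous == Previous.UNATTACHED) {
            // Assumed existing behavior: keep whatever is left of the current timer.
            return remainingMs;
        } else if (previous == Previous.RESIGNED) {
            // The additional clause: reuse the resigned state's timer, which has
            // already expired by the time the replica leaves resigned (effectively 0).
            return remainingMs;
        }
        return randomTimeoutMs;
    }
}
```

Under these assumptions a resigned replica entering unattached gets a timeout of 0, so it does not have to sit through a second full election timeout before becoming prospective.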
@ahuang98 review to reply to your comments.
> It makes sense to me that after the resign state the replica should always increase its epoch. The replica resigned from leadership at epoch X so eventually the epoch will be at least X + 1. Did you consider transitioning to candidate and relaxing the transition functions to allow both resigned and prospective to transition to candidate?
yes, I decided not to list that as an option because I felt it was equal to, if not worse than, the option of having resigned transition to prospective in epoch X + 1. personally I felt it was nicer to have fewer edge cases to the invariant that only prospective should transition to candidate
Reviewed the changes to src/main.
prospective.epochElection().rejectingVoters()
);
prospectiveTransitionAfterElectionLoss(prospective, currentTimeMs);
}
}
Let's add an else case and throw an illegal state exception.
* On election loss, if replica is prospective it will transition to unattached or follower state.
* If replica is candidate, it will start backing off.
*/
private void maybeHandleElectionLoss(long currentTimeMs) {
How about passing the NomineeState object, checking the subtype of that object and casting to the appropriate subtype?
Like the following?
private void maybeHandleElectionLoss(NomineeState state, long currentTimeMs) {
    if (state instanceof CandidateState) {
        CandidateState candidate = (CandidateState) state;
        ...
    } else if (state instanceof ProspectiveState) {
        ProspectiveState prospective = (ProspectiveState) state;
        ...
    }
}
Is the intention of the additional parameter to make it clear this method should be called on NomineeState? This seems a bit redundant with the existing QuorumState helpers (e.g. isCandidate() and candidateStateOrThrow()).
Yes. We discussed this offline.
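Combining this with the earlier request for an else case that throws, the dispatch could be shaped like the sketch below. The nested NomineeState, CandidateState, and ProspectiveState types are empty hypothetical stand-ins for the real classes, and the returned strings only label which branch ran. They paraphrase the Javadoc quoted above rather than implement it.

```java
// Hypothetical stand-ins for the real state classes; only the dispatch shape matters.
final class ElectionLossSketch {
    interface NomineeState { }

    static final class CandidateState implements NomineeState { }

    static final class ProspectiveState implements NomineeState { }

    static String handleElectionLoss(NomineeState state) {
        if (state instanceof CandidateState) {
            CandidateState candidate = (CandidateState) state;
            // A candidate that lost the election starts backing off.
            return "candidate: back off before retrying";
        } else if (state instanceof ProspectiveState) {
            ProspectiveState prospective = (ProspectiveState) state;
            // A prospective that lost transitions to unattached or follower.
            return "prospective: transition to unattached or follower";
        } else {
            // Any other subtype is a programming error, so fail loudly.
            throw new IllegalStateException(
                "Expected to be a candidate or prospective, but state is " + state
            );
        }
    }
}
```

Taking the state as a parameter makes the caller prove it holds a NomineeState, while the final else branch catches any new subtype that is added later without a matching case.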
if (prospective.epochElection().isVoteRejected()) {
logger.info(
"Insufficient remaining votes to become candidate (rejected by {}). ",
prospective.epochElection().rejectingVoters()
How about just printing the entire epochElection? It may be useful to know the state of the entire voter set not just the rejecting voters.
logger.info(
"Insufficient remaining votes to become leader (rejected by {}). " +
"We will backoff before retrying election again",
candidate.epochElection().rejectingVoters()
How about just printing the entire epochElection? It may be useful to know the state of the entire voter set not just the rejecting voters.
@Override
public String toString() {
return String.format(
"VoterState(%s, state=%s)",
"VoterState(replicaKey=%s, state=%s)",
ReplicaKey's toString method contains the class name so I didn't want to be redundant - String.format("ReplicaKey(id=%d, directoryId=%s)", id, directoryId);
return String.format(
"EpochElection(%s)",
voterStates.values().stream()
.map(VoterState::toString)
.collect(
Collectors.joining(", "))
);
Why not just print the map?
return String.format(
"EpochElection(voterStates=%s)",
voterStates
);
it felt redundant to print the keys given that the replica ids are also contained in the values. since this would only be used for debugging though, I'll take your suggestion and just print the entire map
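To make the trade-off concrete, here is a small sketch of what the map-based toString prints. The classes are simplified, hypothetical stand-ins: voters are keyed by a plain integer id in a TreeMap, and the ReplicaKey text is inlined using the format quoted earlier in this thread. The real EpochElection may key and order its map differently.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified stand-in for EpochElection; for illustration only.
final class EpochElectionToStringSketch {
    enum State { GRANTED, REJECTED, UNRECORDED }

    static final class VoterState {
        private final int id;
        private final String directoryId;
        private final State state;

        VoterState(int id, String directoryId, State state) {
            this.id = id;
            this.directoryId = directoryId;
            this.state = state;
        }

        @Override
        public String toString() {
            // Inlines the ReplicaKey(id=..., directoryId=...) format quoted in the thread.
            return String.format(
                "VoterState(ReplicaKey(id=%d, directoryId=%s), state=%s)",
                id, directoryId, state
            );
        }
    }

    // TreeMap keeps the output order deterministic for this sketch.
    private final Map<Integer, VoterState> voterStates = new TreeMap<>();

    void recordVote(int id, String directoryId, State state) {
        voterStates.put(id, new VoterState(id, directoryId, state));
    }

    @Override
    public String toString() {
        // Printing the map directly, as suggested in the review.
        return String.format("EpochElection(voterStates=%s)", voterStates);
    }
}
```

For two voters, 1 (GRANTED) and 2 (UNRECORDED), this prints `EpochElection(voterStates={1=VoterState(ReplicaKey(id=1, directoryId=dirA), state=GRANTED), 2=VoterState(ReplicaKey(id=2, directoryId=dirB), state=UNRECORDED)})`. The ids do appear twice, which is the redundancy mentioned above, but the code is shorter and the output is only meant for debugging.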
Quick feedback.
@@ -667,7 +668,8 @@ private boolean maybeTransitionToLeader(CandidateState state, long currentTimeMs
     }

     private boolean maybeTransitionToCandidate(ProspectiveState state, long currentTimeMs) {
-        if (state.epochElection().isVoteGranted()) {
+        // If replica is the only voter, it should transition to candidate immediately
+        if (state.epochElection().isVoteGranted() || quorum.isOnlyVoter()) {
If the quorum has a size of one and since the replica votes for itself when transitioning to prospective, isVoteGranted() should always return true. If so, the replica doesn't need to check if it is the only voter.
Let's confirm we have a test for this in KafkaRaftClientTest. If not, let's add a test.
Let's also confirm that we have a test for this in ProspectiveStateTest and CandidateStateTest. If not, let's add tests for these cases.
Added four tests for this, starting at testInitializeAsOnlyVoterWithEmptyElectionState.
> confirm that we have a test for this in ProspectiveStateTest and CandidateStateTest
Confirmed!
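The reasoning behind dropping the isOnlyVoter() check can be shown with a simple majority count. The sketch below is not the real EpochElection: the class name, the integer voter ids, and the majority formula (size / 2 + 1) are assumptions chosen for illustration.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical majority tracker; a simplified illustration, not Kafka's EpochElection.
final class SingleVoterElectionSketch {
    private final Set<Integer> voters;
    private final Set<Integer> granted = new HashSet<>();

    SingleVoterElectionSketch(Set<Integer> voters) {
        this.voters = new HashSet<>(voters);
    }

    void recordGrantedVote(int voterId) {
        if (!voters.contains(voterId)) {
            throw new IllegalArgumentException("Unknown voter " + voterId);
        }
        granted.add(voterId);
    }

    int majoritySize() {
        return voters.size() / 2 + 1;
    }

    boolean isVoteGranted() {
        return granted.size() >= majoritySize();
    }
}
```

With a single voter the majority size is 1, so the self-vote recorded on the transition to prospective is already enough for isVoteGranted() to return true. With three voters the same self-vote is not enough, and a second granted vote is needed.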
transitionToCandidate(currentTimeMs);
// When there is only a single voter, become prospective immediately.
// transitionToProspective will handle short-circuiting voter to candidate state
if (quorum.isOnlyVoter() && !quorum.isNomineeState() && !quorum.isLeader()) {
Why do you check that it is not the leader? In KRaft a replica should never start as a leader. KRaft throws an illegal state exception if it starts as leader. See line 545 above.
if (quorum.isLeader()) {
    throw new IllegalStateException("Voter cannot initialize as a Leader");
}
Discussed offline, technically the replica can transition to leader due to the above conditional.
We can improve this conditional by directly checking if the replica is Unattached or Follower, and merge this conditional into the above conditional
@@ -154,6 +155,163 @@ private ReplicaKey replicaKey(int id, boolean withDirectoryId) {
return ReplicaKey.of(id, directoryId);
}

@ParameterizedTest
I've organized QuorumStateTest in the following way - misc tests were pulled to the front. All other tests are organized under banners (e.g. Initialization tests, Tests of transitions from state X)
@@ -302,7 +303,7 @@ public void testGrantVotesWhenShuttingDown(boolean withKip853Rpc) throws Excepti

@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testInitializeAsResignedAndBecomeCandidate(boolean withKip853Rpc) throws Exception {
The diff is misleading here. This test was just removed because I found it was a duplicate of testInitializeAsResignedLeaderFromStateStore
@@ -162,16 +218,18 @@ public void shouldRecordVoterQuorumState(KRaftVersion kraftVersion) {
 getMetric(metrics, "current-vote-directory-id").metricValue()
 );
 assertEquals((double) 1, getMetric(metrics, "current-epoch").metricValue());
-assertEquals((double) -1L, getMetric(metrics, "high-watermark").metricValue());
+assertEquals((double) -1L, getMetric(metrics, "high-watermark").metricValue()); // todo, bug fix
@jsancio the HW drops to -1L after candidate transitions to leader - if you agree this is a bug I'll file a Jira for this
KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-996%3A+Pre-Vote
Jira: https://issues.apache.org/jira/browse/KAFKA-16164
Implements items 2-4 which cover response handling and new ProspectiveState: