Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-17642: PreVote response handling and ProspectiveState #18240

Open
wants to merge 22 commits into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
<suppress id="dontUseSystemExit"
files="Exit.java"/>
<suppress checks="ClassFanOutComplexity"
files="(AbstractFetch|Sender|SenderTest|ConsumerCoordinator|KafkaConsumer|KafkaProducer|Utils|TransactionManager|TransactionManagerTest|KafkaAdminClient|NetworkClient|Admin|KafkaRaftClient|KafkaRaftClientTest|RaftClientTestContext|TestingMetricsInterceptingAdminClient).java"/>
files="(AbstractFetch|Sender|SenderTest|ConsumerCoordinator|KafkaConsumer|KafkaProducer|Utils|TransactionManager|TransactionManagerTest|KafkaAdminClient|NetworkClient|Admin|RaftClientTestContext|TestingMetricsInterceptingAdminClient).java"/>
<suppress checks="ClassFanOutComplexity"
files="(SaslServerAuthenticator|SaslAuthenticatorTest).java"/>
<suppress checks="NPath"
Expand All @@ -76,7 +76,7 @@
files="(KerberosLogin|RequestResponseTest|ConnectMetricsRegistry|KafkaConsumer|AbstractStickyAssignor|AbstractRequest|AbstractResponse).java"/>

<suppress checks="ParameterNumber"
files="(NetworkClient|FieldSpec|KafkaRaftClient|KafkaProducer).java"/>
files="(NetworkClient|FieldSpec|KafkaProducer).java"/>
<suppress checks="ParameterNumber"
files="(KafkaConsumer|ConsumerCoordinator).java"/>
<suppress checks="ParameterNumber"
Expand All @@ -91,7 +91,7 @@
files="ClientUtils.java"/>

<suppress checks="ClassDataAbstractionCoupling"
files="(KafkaConsumer|ConsumerCoordinator|AbstractFetch|KafkaProducer|AbstractRequest|AbstractResponse|TransactionManager|Admin|KafkaAdminClient|MockAdminClient|KafkaRaftClient|KafkaRaftClientTest|KafkaNetworkChannelTest).java"/>
files="(KafkaConsumer|ConsumerCoordinator|AbstractFetch|KafkaProducer|AbstractRequest|AbstractResponse|TransactionManager|Admin|KafkaAdminClient|MockAdminClient|KafkaNetworkChannelTest).java"/>
<suppress checks="ClassDataAbstractionCoupling"
files="(Errors|SaslAuthenticatorTest|AgentTest|CoordinatorTest|NetworkClientTest).java"/>

Expand All @@ -102,10 +102,10 @@
files="(AbstractFetch|ClientTelemetryReporter|ConsumerCoordinator|CommitRequestManager|FetchCollector|OffsetFetcherUtils|KafkaProducer|Sender|ConfigDef|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslFactory|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler|SaslServerAuthenticator|AbstractCoordinator|TransactionManager|AbstractStickyAssignor|DefaultSslEngineFactory|Authorizer|RecordAccumulator|MemoryRecords|FetchSessionHandler|MockAdminClient).java"/>

<suppress checks="JavaNCSS"
files="(AbstractRequest|AbstractResponse|KerberosLogin|WorkerSinkTaskTest|TransactionManagerTest|SenderTest|KafkaAdminClient|ConsumerCoordinatorTest|KafkaAdminClientTest|KafkaRaftClientTest).java"/>
files="(AbstractRequest|AbstractResponse|KerberosLogin|WorkerSinkTaskTest|TransactionManagerTest|SenderTest|KafkaAdminClient|ConsumerCoordinatorTest|KafkaAdminClientTest).java"/>

<suppress checks="NPathComplexity"
files="(ConsumerCoordinator|BufferPool|MetricName|Node|ConfigDef|RecordBatch|SslFactory|SslTransportLayer|MetadataResponse|KerberosLogin|Selector|Sender|Serdes|TokenInformation|Agent|PluginUtils|MiniTrogdorCluster|TasksRequest|KafkaProducer|AbstractStickyAssignor|KafkaRaftClient|Authorizer|FetchSessionHandler|RecordAccumulator|Shell).java"/>
files="(ConsumerCoordinator|BufferPool|MetricName|Node|ConfigDef|RecordBatch|SslFactory|SslTransportLayer|MetadataResponse|KerberosLogin|Selector|Sender|Serdes|TokenInformation|Agent|PluginUtils|MiniTrogdorCluster|TasksRequest|KafkaProducer|AbstractStickyAssignor|Authorizer|FetchSessionHandler|RecordAccumulator|Shell).java"/>

<suppress checks="(JavaNCSS|CyclomaticComplexity|MethodLength)"
files="CoordinatorClient.java"/>
Expand Down Expand Up @@ -187,6 +187,9 @@
<suppress checks="NPathComplexity"
files="(DynamicVoter|RecordsIterator).java"/>

<suppress checks="JavaNCSS"
files="(KafkaRaftClientTest).java"/>

<!-- Streams -->
<suppress checks="ClassFanOutComplexity"
files="(KafkaStreams|KStreamImpl|KTableImpl|InternalTopologyBuilder|StreamsPartitionAssignor|StreamThread|IQv2StoreIntegrationTest|KStreamImplTest|RocksDBStore|StreamTask|TaskManager).java"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ public static VoteRequestData singletonRequest(TopicPartition topicPartition,
int replicaEpoch,
int replicaId,
int lastEpoch,
long lastEpochEndOffset) {
long lastEpochEndOffset,
boolean preVote) {
return new VoteRequestData()
.setClusterId(clusterId)
.setTopics(Collections.singletonList(
Expand All @@ -86,7 +87,8 @@ public static VoteRequestData singletonRequest(TopicPartition topicPartition,
.setReplicaEpoch(replicaEpoch)
.setReplicaId(replicaId)
.setLastOffsetEpoch(lastEpoch)
.setLastOffset(lastEpochEndOffset))
.setLastOffset(lastEpochEndOffset)
.setPreVote(preVote))
)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -624,7 +624,7 @@ class RequestQuotaTest extends BaseRequestTest {
new AlterUserScramCredentialsRequest.Builder(new AlterUserScramCredentialsRequestData())

case ApiKeys.VOTE =>
new VoteRequest.Builder(VoteRequest.singletonRequest(tp, null, 1, 2, 0, 10))
new VoteRequest.Builder(VoteRequest.singletonRequest(tp, null, 1, 2, 0, 10, true))

case ApiKeys.BEGIN_QUORUM_EPOCH =>
new BeginQuorumEpochRequest.Builder(BeginQuorumEpochRequest.singletonRequest(tp, null, 2, 5))
Expand Down
207 changes: 22 additions & 185 deletions raft/src/main/java/org/apache/kafka/raft/CandidateState.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,46 +20,36 @@
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
import org.apache.kafka.raft.internals.EpochElection;

import org.slf4j.Logger;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class CandidateState implements EpochState {
public class CandidateState implements NomineeState {
private final int localId;
private final Uuid localDirectoryId;
private final int epoch;
private final int retries;
private final Map<Integer, VoterState> voteStates = new HashMap<>();
private final EpochElection epochElection;
private final Optional<LogOffsetMetadata> highWatermark;
private final int electionTimeoutMs;
private final Timer electionTimer;
private final Timer backoffTimer;
private final Logger log;

/**
* The lifetime of a candidate state is the following.
*
* 1. Once started, it would keep record of the received votes.
* 2. If majority votes granted, it can then end its life and will be replaced by a leader state;
* 3. If majority votes rejected or election timed out, it would transit into a backing off phase;
* after the backoff phase completes, it would end its left and be replaced by a new candidate state with bumped retry.
* 1. Once started, it will keep record of the received votes.
* 2. If majority votes granted, it will transition to leader state.
* 3. If majority votes rejected or election timed out, it will transition to prospective.
*/
private boolean isBackingOff;

protected CandidateState(
Time time,
int localId,
Uuid localDirectoryId,
int epoch,
VoterSet voters,
Optional<LogOffsetMetadata> highWatermark,
int retries,
int electionTimeoutMs,
LogContext logContext
) {
Expand All @@ -78,173 +68,48 @@ protected CandidateState(
this.localDirectoryId = localDirectoryId;
this.epoch = epoch;
this.highWatermark = highWatermark;
this.retries = retries;
this.isBackingOff = false;
this.electionTimeoutMs = electionTimeoutMs;
this.electionTimer = time.timer(electionTimeoutMs);
this.backoffTimer = time.timer(0);
this.log = logContext.logger(CandidateState.class);

for (ReplicaKey voter : voters.voterKeys()) {
voteStates.put(voter.id(), new VoterState(voter));
}
voteStates.get(localId).setState(State.GRANTED);
this.epochElection = new EpochElection(voters.voterKeys());
epochElection.recordVote(localId, true);
}

public int localId() {
return localId;
}
ahuang98 marked this conversation as resolved.
Show resolved Hide resolved

public int majoritySize() {
return voteStates.size() / 2 + 1;
}

private long numGranted() {
return votersInState(State.GRANTED).count();
}

private long numUnrecorded() {
return votersInState(State.UNRECORDED).count();
}

/**
* Check if the candidate is backing off for the next election
*/
public boolean isBackingOff() {
return isBackingOff;
}

public int retries() {
return retries;
}

/**
* Check whether we have received enough votes to conclude the election and become leader.
*
* @return true if at least a majority of nodes have granted the vote
*/
public boolean isVoteGranted() {
return numGranted() >= majoritySize();
}

/**
* Check if we have received enough rejections that it is no longer possible to reach a
* majority of grants.
*
* @return true if the vote is rejected, false if the vote is already or can still be granted
*/
public boolean isVoteRejected() {
return numGranted() + numUnrecorded() < majoritySize();
@Override
public EpochElection epochElection() {
return epochElection;
}

/**
* Record a granted vote from one of the voters.
*
* @param remoteNodeId The id of the voter
* @return true if the voter had not been previously recorded
* @throws IllegalArgumentException if the remote node is not a voter or if the vote had already been
* rejected by this node
*/
@Override
public boolean recordGrantedVote(int remoteNodeId) {
VoterState voterState = voteStates.get(remoteNodeId);
if (voterState == null) {
throw new IllegalArgumentException("Attempt to grant vote to non-voter " + remoteNodeId);
} else if (voterState.state().equals(State.REJECTED)) {
if (epochElection().isRejectedVoter(remoteNodeId)) {
throw new IllegalArgumentException("Attempt to grant vote from node " + remoteNodeId +
" which previously rejected our request");
}

boolean recorded = voterState.state().equals(State.UNRECORDED);
voterState.setState(State.GRANTED);

return recorded;
return epochElection().recordVote(remoteNodeId, true);
}

/**
* Record a rejected vote from one of the voters.
*
* @param remoteNodeId The id of the voter
* @return true if the rejected vote had not been previously recorded
* @throws IllegalArgumentException if the remote node is not a voter or if the vote had already been
* granted by this node
*/
@Override
public boolean recordRejectedVote(int remoteNodeId) {
VoterState voterState = voteStates.get(remoteNodeId);
if (voterState == null) {
throw new IllegalArgumentException("Attempt to reject vote to non-voter " + remoteNodeId);
} else if (voterState.state().equals(State.GRANTED)) {
if (epochElection().isGrantedVoter(remoteNodeId)) {
throw new IllegalArgumentException("Attempt to reject vote from node " + remoteNodeId +
" which previously granted our request");
}

boolean recorded = voterState.state().equals(State.UNRECORDED);
voterState.setState(State.REJECTED);

return recorded;
}

/**
* Record the current election has failed since we've either received sufficient rejecting voters or election timed out
*/
public void startBackingOff(long currentTimeMs, long backoffDurationMs) {
this.backoffTimer.update(currentTimeMs);
this.backoffTimer.reset(backoffDurationMs);
this.isBackingOff = true;
}

/**
* Get the set of voters which have not been counted as granted or rejected yet.
*
* @return The set of unrecorded voters
*/
public Set<ReplicaKey> unrecordedVoters() {
return votersInState(State.UNRECORDED).collect(Collectors.toSet());
}

/**
* Get the set of voters that have granted our vote requests.
*
* @return The set of granting voters, which should always contain the ID of the candidate
*/
public Set<Integer> grantingVoters() {
return votersInState(State.GRANTED).map(ReplicaKey::id).collect(Collectors.toSet());
}

/**
* Get the set of voters that have rejected our candidacy.
*
* @return The set of rejecting voters
*/
public Set<Integer> rejectingVoters() {
return votersInState(State.REJECTED).map(ReplicaKey::id).collect(Collectors.toSet());
}

private Stream<ReplicaKey> votersInState(State state) {
return voteStates
.values()
.stream()
.filter(voterState -> voterState.state().equals(state))
.map(VoterState::replicaKey);
return epochElection().recordVote(remoteNodeId, false);
}

@Override
public boolean hasElectionTimeoutExpired(long currentTimeMs) {
electionTimer.update(currentTimeMs);
return electionTimer.isExpired();
}

public boolean isBackoffComplete(long currentTimeMs) {
backoffTimer.update(currentTimeMs);
return backoffTimer.isExpired();
}

public long remainingBackoffMs(long currentTimeMs) {
if (!isBackingOff) {
throw new IllegalStateException("Candidate is not currently backing off");
}
backoffTimer.update(currentTimeMs);
return backoffTimer.remainingMs();
}

@Override
public long remainingElectionTimeMs(long currentTimeMs) {
electionTimer.update(currentTimeMs);
return electionTimer.remainingMs();
Expand All @@ -255,7 +120,7 @@ public ElectionState election() {
return ElectionState.withVotedCandidate(
epoch,
ReplicaKey.of(localId, localDirectoryId),
voteStates.keySet()
epochElection.voterIds()
);
}

Expand Down Expand Up @@ -299,13 +164,12 @@ public boolean canGrantVote(
@Override
public String toString() {
return String.format(
"CandidateState(localId=%d, localDirectoryId=%s,epoch=%d, retries=%d, voteStates=%s, " +
"CandidateState(localId=%d, localDirectoryId=%s,epoch=%d, voteStates=%s, " +
"highWatermark=%s, electionTimeoutMs=%d)",
localId,
localDirectoryId,
epoch,
retries,
voteStates,
epochElection().voterStates(),
highWatermark,
electionTimeoutMs
);
Expand All @@ -318,31 +182,4 @@ public String name() {

@Override
public void close() {}

private static final class VoterState {
private final ReplicaKey replicaKey;
private State state = State.UNRECORDED;

private VoterState(ReplicaKey replicaKey) {
this.replicaKey = replicaKey;
}

public State state() {
return state;
}

public void setState(State state) {
this.state = state;
}

public ReplicaKey replicaKey() {
return replicaKey;
}
}

private enum State {
UNRECORDED,
GRANTED,
REJECTED
}
}
2 changes: 1 addition & 1 deletion raft/src/main/java/org/apache/kafka/raft/EpochState.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ default Optional<LogOffsetMetadata> highWatermark() {
* Decide whether to grant a vote to a replica.
*
* It is the responsibility of the caller to invoke
* {@link QuorumState#transitionToUnattachedVotedState(int, ReplicaKey)} if a standard vote is granted.
* {@link QuorumState#unattachedAddVotedState(int, ReplicaKey)} if a standard vote is granted.
*
* @param replicaKey the id and directory of the replica requesting the vote
* @param isLogUpToDate whether the replica's log is at least as up-to-date as receiver’s log
Expand Down
Loading
Loading