Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-17642: PreVote response handling and ProspectiveState #18240

Open
wants to merge 22 commits into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
<suppress id="dontUseSystemExit"
files="Exit.java"/>
<suppress checks="ClassFanOutComplexity"
files="(AbstractFetch|Sender|SenderTest|ConsumerCoordinator|KafkaConsumer|KafkaProducer|Utils|TransactionManager|TransactionManagerTest|KafkaAdminClient|NetworkClient|Admin|KafkaRaftClient|KafkaRaftClientTest|RaftClientTestContext|TestingMetricsInterceptingAdminClient).java"/>
files="(AbstractFetch|Sender|SenderTest|ConsumerCoordinator|KafkaConsumer|KafkaProducer|Utils|TransactionManager|TransactionManagerTest|KafkaAdminClient|NetworkClient|Admin|RaftClientTestContext|TestingMetricsInterceptingAdminClient).java"/>
<suppress checks="ClassFanOutComplexity"
files="(SaslServerAuthenticator|SaslAuthenticatorTest).java"/>
<suppress checks="NPath"
Expand All @@ -76,7 +76,7 @@
files="(KerberosLogin|RequestResponseTest|ConnectMetricsRegistry|KafkaConsumer|AbstractStickyAssignor|AbstractRequest|AbstractResponse).java"/>

<suppress checks="ParameterNumber"
files="(NetworkClient|FieldSpec|KafkaRaftClient|KafkaProducer).java"/>
files="(NetworkClient|FieldSpec|KafkaProducer).java"/>
<suppress checks="ParameterNumber"
files="(KafkaConsumer|ConsumerCoordinator).java"/>
<suppress checks="ParameterNumber"
Expand All @@ -91,7 +91,7 @@
files="ClientUtils.java"/>

<suppress checks="ClassDataAbstractionCoupling"
files="(KafkaConsumer|ConsumerCoordinator|AbstractFetch|KafkaProducer|AbstractRequest|AbstractResponse|TransactionManager|Admin|KafkaAdminClient|MockAdminClient|KafkaRaftClient|KafkaRaftClientTest|KafkaNetworkChannelTest).java"/>
files="(KafkaConsumer|ConsumerCoordinator|AbstractFetch|KafkaProducer|AbstractRequest|AbstractResponse|TransactionManager|Admin|KafkaAdminClient|MockAdminClient|KafkaNetworkChannelTest).java"/>
<suppress checks="ClassDataAbstractionCoupling"
files="(Errors|SaslAuthenticatorTest|AgentTest|CoordinatorTest|NetworkClientTest).java"/>

Expand All @@ -102,10 +102,10 @@
files="(AbstractFetch|ClientTelemetryReporter|ConsumerCoordinator|CommitRequestManager|FetchCollector|OffsetFetcherUtils|KafkaProducer|Sender|ConfigDef|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslFactory|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler|SaslServerAuthenticator|AbstractCoordinator|TransactionManager|AbstractStickyAssignor|DefaultSslEngineFactory|Authorizer|RecordAccumulator|MemoryRecords|FetchSessionHandler|MockAdminClient).java"/>

<suppress checks="JavaNCSS"
files="(AbstractRequest|AbstractResponse|KerberosLogin|WorkerSinkTaskTest|TransactionManagerTest|SenderTest|KafkaAdminClient|ConsumerCoordinatorTest|KafkaAdminClientTest|KafkaRaftClientTest).java"/>
files="(AbstractRequest|AbstractResponse|KerberosLogin|WorkerSinkTaskTest|TransactionManagerTest|SenderTest|KafkaAdminClient|ConsumerCoordinatorTest|KafkaAdminClientTest).java"/>

<suppress checks="NPathComplexity"
files="(ConsumerCoordinator|BufferPool|MetricName|Node|ConfigDef|RecordBatch|SslFactory|SslTransportLayer|MetadataResponse|KerberosLogin|Selector|Sender|Serdes|TokenInformation|Agent|PluginUtils|MiniTrogdorCluster|TasksRequest|KafkaProducer|AbstractStickyAssignor|KafkaRaftClient|Authorizer|FetchSessionHandler|RecordAccumulator|Shell).java"/>
files="(ConsumerCoordinator|BufferPool|MetricName|Node|ConfigDef|RecordBatch|SslFactory|SslTransportLayer|MetadataResponse|KerberosLogin|Selector|Sender|Serdes|TokenInformation|Agent|PluginUtils|MiniTrogdorCluster|TasksRequest|KafkaProducer|AbstractStickyAssignor|Authorizer|FetchSessionHandler|RecordAccumulator|Shell).java"/>

<suppress checks="(JavaNCSS|CyclomaticComplexity|MethodLength)"
files="CoordinatorClient.java"/>
Expand Down Expand Up @@ -187,6 +187,9 @@
<suppress checks="NPathComplexity"
files="(DynamicVoter|RecordsIterator).java"/>

<suppress checks="JavaNCSS"
files="(KafkaRaftClientTest).java"/>

<!-- Streams -->
<suppress checks="ClassFanOutComplexity"
files="(KafkaStreams|KStreamImpl|KTableImpl|InternalTopologyBuilder|StreamsPartitionAssignor|StreamThread|IQv2StoreIntegrationTest|KStreamImplTest|RocksDBStore|StreamTask|TaskManager).java"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ public static VoteRequestData singletonRequest(TopicPartition topicPartition,
int replicaEpoch,
int replicaId,
int lastEpoch,
long lastEpochEndOffset) {
long lastEpochEndOffset,
boolean preVote) {
return new VoteRequestData()
.setClusterId(clusterId)
.setTopics(Collections.singletonList(
Expand All @@ -86,7 +87,8 @@ public static VoteRequestData singletonRequest(TopicPartition topicPartition,
.setReplicaEpoch(replicaEpoch)
.setReplicaId(replicaId)
.setLastOffsetEpoch(lastEpoch)
.setLastOffset(lastEpochEndOffset))
.setLastOffset(lastEpochEndOffset)
.setPreVote(preVote))
)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -624,7 +624,7 @@ class RequestQuotaTest extends BaseRequestTest {
new AlterUserScramCredentialsRequest.Builder(new AlterUserScramCredentialsRequestData())

case ApiKeys.VOTE =>
new VoteRequest.Builder(VoteRequest.singletonRequest(tp, null, 1, 2, 0, 10))
new VoteRequest.Builder(VoteRequest.singletonRequest(tp, null, 1, 2, 0, 10, true))

case ApiKeys.BEGIN_QUORUM_EPOCH =>
new BeginQuorumEpochRequest.Builder(BeginQuorumEpochRequest.singletonRequest(tp, null, 2, 5))
Expand Down
173 changes: 24 additions & 149 deletions raft/src/main/java/org/apache/kafka/raft/CandidateState.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,38 +20,33 @@
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
import org.apache.kafka.raft.internals.EpochElection;

import org.slf4j.Logger;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class CandidateState implements EpochState {
public class CandidateState implements NomineeState {
private final int localId;
private final Uuid localDirectoryId;
private final int epoch;
private final int retries;
private final Map<Integer, VoterState> voteStates = new HashMap<>();
private final EpochElection epochElection;
private final Optional<LogOffsetMetadata> highWatermark;
private final int electionTimeoutMs;
private final Timer electionTimer;
private final Timer backoffTimer;
private final Logger log;

private boolean isBackingOff;
/**
* The lifetime of a candidate state is the following.
*
* 1. Once started, it would keep record of the received votes.
* 2. If majority votes granted, it can then end its life and will be replaced by a leader state;
* 3. If majority votes rejected or election timed out, it would transit into a backing off phase;
* after the backoff phase completes, it would end its left and be replaced by a new candidate state with bumped retry.
* 1. Once started, it will send vote requests and keep record of the received vote responses.
* 2. If majority votes granted, it will transition to leader state.
* 3. If majority votes rejected, it will transition to prospective after a backoff phase.
* 4. If election times out, it will transition immediately to prospective.
*/
private boolean isBackingOff;

protected CandidateState(
Time time,
int localId,
Expand Down Expand Up @@ -85,26 +80,8 @@ protected CandidateState(
this.backoffTimer = time.timer(0);
this.log = logContext.logger(CandidateState.class);

for (ReplicaKey voter : voters.voterKeys()) {
voteStates.put(voter.id(), new VoterState(voter));
}
voteStates.get(localId).setState(State.GRANTED);
}

public int localId() {
return localId;
}

public int majoritySize() {
return voteStates.size() / 2 + 1;
}

private long numGranted() {
return votersInState(State.GRANTED).count();
}

private long numUnrecorded() {
return votersInState(State.UNRECORDED).count();
this.epochElection = new EpochElection(voters.voterKeys());
epochElection.recordVote(localId, true);
}

/**
Expand All @@ -118,69 +95,27 @@ public int retries() {
return retries;
}

/**
* Check whether we have received enough votes to conclude the election and become leader.
*
* @return true if at least a majority of nodes have granted the vote
*/
public boolean isVoteGranted() {
return numGranted() >= majoritySize();
}

/**
* Check if we have received enough rejections that it is no longer possible to reach a
* majority of grants.
*
* @return true if the vote is rejected, false if the vote is already or can still be granted
*/
public boolean isVoteRejected() {
return numGranted() + numUnrecorded() < majoritySize();
@Override
public EpochElection epochElection() {
return epochElection;
}

/**
* Record a granted vote from one of the voters.
*
* @param remoteNodeId The id of the voter
* @return true if the voter had not been previously recorded
* @throws IllegalArgumentException if the remote node is not a voter or if the vote had already been
* rejected by this node
*/
@Override
public boolean recordGrantedVote(int remoteNodeId) {
VoterState voterState = voteStates.get(remoteNodeId);
if (voterState == null) {
throw new IllegalArgumentException("Attempt to grant vote to non-voter " + remoteNodeId);
} else if (voterState.state().equals(State.REJECTED)) {
if (epochElection().isRejectedVoter(remoteNodeId)) {
throw new IllegalArgumentException("Attempt to grant vote from node " + remoteNodeId +
" which previously rejected our request");
}

boolean recorded = voterState.state().equals(State.UNRECORDED);
voterState.setState(State.GRANTED);

return recorded;
return epochElection().recordVote(remoteNodeId, true);
}

/**
* Record a rejected vote from one of the voters.
*
* @param remoteNodeId The id of the voter
* @return true if the rejected vote had not been previously recorded
* @throws IllegalArgumentException if the remote node is not a voter or if the vote had already been
* granted by this node
*/
@Override
public boolean recordRejectedVote(int remoteNodeId) {
VoterState voterState = voteStates.get(remoteNodeId);
if (voterState == null) {
throw new IllegalArgumentException("Attempt to reject vote to non-voter " + remoteNodeId);
} else if (voterState.state().equals(State.GRANTED)) {
if (epochElection().isGrantedVoter(remoteNodeId)) {
throw new IllegalArgumentException("Attempt to reject vote from node " + remoteNodeId +
" which previously granted our request");
}

boolean recorded = voterState.state().equals(State.UNRECORDED);
voterState.setState(State.REJECTED);

return recorded;
return epochElection().recordVote(remoteNodeId, false);
}

/**
Expand All @@ -192,41 +127,7 @@ public void startBackingOff(long currentTimeMs, long backoffDurationMs) {
this.isBackingOff = true;
}

/**
* Get the set of voters which have not been counted as granted or rejected yet.
*
* @return The set of unrecorded voters
*/
public Set<ReplicaKey> unrecordedVoters() {
return votersInState(State.UNRECORDED).collect(Collectors.toSet());
}

/**
* Get the set of voters that have granted our vote requests.
*
* @return The set of granting voters, which should always contain the ID of the candidate
*/
public Set<Integer> grantingVoters() {
return votersInState(State.GRANTED).map(ReplicaKey::id).collect(Collectors.toSet());
}

/**
* Get the set of voters that have rejected our candidacy.
*
* @return The set of rejecting voters
*/
public Set<Integer> rejectingVoters() {
return votersInState(State.REJECTED).map(ReplicaKey::id).collect(Collectors.toSet());
}

private Stream<ReplicaKey> votersInState(State state) {
return voteStates
.values()
.stream()
.filter(voterState -> voterState.state().equals(state))
.map(VoterState::replicaKey);
}

@Override
public boolean hasElectionTimeoutExpired(long currentTimeMs) {
electionTimer.update(currentTimeMs);
return electionTimer.isExpired();
Expand All @@ -245,6 +146,7 @@ public long remainingBackoffMs(long currentTimeMs) {
return backoffTimer.remainingMs();
}

@Override
public long remainingElectionTimeMs(long currentTimeMs) {
electionTimer.update(currentTimeMs);
return electionTimer.remainingMs();
Expand All @@ -255,7 +157,7 @@ public ElectionState election() {
return ElectionState.withVotedCandidate(
epoch,
ReplicaKey.of(localId, localDirectoryId),
voteStates.keySet()
epochElection.voterIds()
);
}

Expand Down Expand Up @@ -299,13 +201,13 @@ public boolean canGrantVote(
@Override
public String toString() {
return String.format(
"CandidateState(localId=%d, localDirectoryId=%s,epoch=%d, retries=%d, voteStates=%s, " +
"CandidateState(localId=%d, localDirectoryId=%s, epoch=%d, retries=%d, epochElection=%s, " +
"highWatermark=%s, electionTimeoutMs=%d)",
localId,
localDirectoryId,
epoch,
retries,
voteStates,
epochElection(),
highWatermark,
electionTimeoutMs
);
Expand All @@ -318,31 +220,4 @@ public String name() {

@Override
public void close() {}

private static final class VoterState {
private final ReplicaKey replicaKey;
private State state = State.UNRECORDED;

private VoterState(ReplicaKey replicaKey) {
this.replicaKey = replicaKey;
}

public State state() {
return state;
}

public void setState(State state) {
this.state = state;
}

public ReplicaKey replicaKey() {
return replicaKey;
}
}

private enum State {
UNRECORDED,
GRANTED,
REJECTED
}
}
9 changes: 7 additions & 2 deletions raft/src/main/java/org/apache/kafka/raft/ElectionState.java
Original file line number Diff line number Diff line change
Expand Up @@ -185,12 +185,17 @@ public static ElectionState withVotedCandidate(int epoch, ReplicaKey votedKey, S
return new ElectionState(epoch, OptionalInt.empty(), Optional.of(votedKey), voters);
}

public static ElectionState withElectedLeader(int epoch, int leaderId, Set<Integer> voters) {
public static ElectionState withElectedLeader(
int epoch,
int leaderId,
Optional<ReplicaKey> votedKey,
Set<Integer> voters
) {
if (leaderId < 0) {
throw new IllegalArgumentException("Illegal leader Id " + leaderId + ": must be non-negative");
}

return new ElectionState(epoch, OptionalInt.of(leaderId), Optional.empty(), voters);
return new ElectionState(epoch, OptionalInt.of(leaderId), votedKey, voters);
}

public static ElectionState withUnknownLeader(int epoch, Set<Integer> voters) {
Expand Down
2 changes: 1 addition & 1 deletion raft/src/main/java/org/apache/kafka/raft/EpochState.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ default Optional<LogOffsetMetadata> highWatermark() {
* Decide whether to grant a vote to a replica.
*
* It is the responsibility of the caller to invoke
* {@link QuorumState#transitionToUnattachedVotedState(int, ReplicaKey)} if a standard vote is granted.
* {@link QuorumState#unattachedAddVotedState(int, ReplicaKey)} if a standard vote is granted.
*
* @param replicaKey the id and directory of the replica requesting the vote
* @param isLogUpToDate whether the replica's log is at least as up-to-date as receiver’s log
Expand Down
Loading
Loading