Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 14 additions & 71 deletions raft/src/main/java/org/apache/kafka/raft/DynamicVoter.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,28 +23,25 @@

import java.net.InetSocketAddress;
import java.util.Map;
import java.util.Objects;

/**
* The textual representation of a KIP-853 voter.
*
* <p>
* Since this is used in command-line tools, format changes to the parsing logic require a KIP,
* and should be backwards compatible.
*
* @param directoryId The directory ID.
* @param nodeId The voter ID.
* @param host The voter hostname or IP address.
* @param port The voter port.
*/
public final class DynamicVoter {
private final Uuid directoryId;
private final int nodeId;
private final String host;
private final int port;

public record DynamicVoter(Uuid directoryId, int nodeId, String host, int port) {
/**
* Create a DynamicVoter object by parsing an input string.
*
* @param input The input string.
*
* @return The DynamicVoter object.
*
* @throws IllegalArgumentException If parsing fails.
* @param input The input string.
* @return The DynamicVoter object.
* @throws IllegalArgumentException If parsing fails.
*/
public static DynamicVoter parse(String input) {
input = input.trim();
Expand Down Expand Up @@ -75,7 +72,7 @@ public static DynamicVoter parse(String input) {
int endBracketIndex = input.indexOf("]");
if (endBracketIndex < 0) {
throw new IllegalArgumentException("Hostname began with left bracket, but no right " +
"bracket was found.");
"bracket was found.");
}
host = input.substring(1, endBracketIndex);
input = input.substring(endBracketIndex + 1);
Expand Down Expand Up @@ -115,70 +112,16 @@ public static DynamicVoter parse(String input) {
return new DynamicVoter(directoryId, nodeId, host, port);
}

/**
* Create a new KIP-853 voter.
*
* @param directoryId The directory ID.
* @param nodeId The voter ID.
* @param host The voter hostname or IP address.
* @param port The voter port.
*/
public DynamicVoter(
Uuid directoryId,
int nodeId,
String host,
int port
) {
this.directoryId = directoryId;
this.nodeId = nodeId;
this.host = host;
this.port = port;
}

public Uuid directoryId() {
return directoryId;
}

public int nodeId() {
return nodeId;
}

public String host() {
return host;
}

public int port() {
return port;
}

public VoterSet.VoterNode toVoterNode(String controllerListenerName) {
ReplicaKey voterKey = ReplicaKey.of(nodeId, directoryId);
Endpoints listeners = Endpoints.fromInetSocketAddresses(Map.of(
ListenerName.normalised(controllerListenerName),
new InetSocketAddress(host, port)));
ListenerName.normalised(controllerListenerName),
new InetSocketAddress(host, port)));
SupportedVersionRange supportedKRaftVersion =
new SupportedVersionRange((short) 0, (short) 1);
new SupportedVersionRange((short) 0, (short) 1);
return VoterSet.VoterNode.of(voterKey, listeners, supportedKRaftVersion);
}

@Override
public boolean equals(Object o) {
if (o == null || (!(o.getClass().equals(DynamicVoter.class)))) return false;
DynamicVoter other = (DynamicVoter) o;
return directoryId.equals(other.directoryId) &&
nodeId == other.nodeId &&
host.equals(other.host) &&
port == other.port;
}

@Override
public int hashCode() {
return Objects.hash(directoryId,
nodeId,
host,
port);
}

@Override
public String toString() {
if (host.contains(":")) {
Expand Down
108 changes: 24 additions & 84 deletions raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -2636,48 +2636,18 @@ private boolean handleUnexpectedError(Errors error, RaftResponse.Inbound respons
private void handleResponse(RaftResponse.Inbound response, long currentTimeMs) {
// The response epoch matches the local epoch, so we can handle the response
ApiKeys apiKey = ApiKeys.forId(response.data().apiKey());
final boolean handledSuccessfully;

switch (apiKey) {
case FETCH:
handledSuccessfully = handleFetchResponse(response, currentTimeMs);
break;

case VOTE:
handledSuccessfully = handleVoteResponse(response, currentTimeMs);
break;

case BEGIN_QUORUM_EPOCH:
handledSuccessfully = handleBeginQuorumEpochResponse(response, currentTimeMs);
break;

case END_QUORUM_EPOCH:
handledSuccessfully = handleEndQuorumEpochResponse(response, currentTimeMs);
break;

case FETCH_SNAPSHOT:
handledSuccessfully = handleFetchSnapshotResponse(response, currentTimeMs);
break;

case API_VERSIONS:
handledSuccessfully = handleApiVersionsResponse(response, currentTimeMs);
break;

case UPDATE_RAFT_VOTER:
handledSuccessfully = handleUpdateVoterResponse(response, currentTimeMs);
break;

case ADD_RAFT_VOTER:
handledSuccessfully = handleAddVoterResponse(response, currentTimeMs);
break;

case REMOVE_RAFT_VOTER:
handledSuccessfully = handleRemoveVoterResponse(response, currentTimeMs);
break;

default:
throw new IllegalArgumentException("Received unexpected response type: " + apiKey);
}
final boolean handledSuccessfully = switch (apiKey) {
case FETCH -> handleFetchResponse(response, currentTimeMs);
case VOTE -> handleVoteResponse(response, currentTimeMs);
case BEGIN_QUORUM_EPOCH -> handleBeginQuorumEpochResponse(response, currentTimeMs);
case END_QUORUM_EPOCH -> handleEndQuorumEpochResponse(response, currentTimeMs);
case FETCH_SNAPSHOT -> handleFetchSnapshotResponse(response, currentTimeMs);
case API_VERSIONS -> handleApiVersionsResponse(response, currentTimeMs);
case UPDATE_RAFT_VOTER -> handleUpdateVoterResponse(response, currentTimeMs);
case ADD_RAFT_VOTER -> handleAddVoterResponse(response, currentTimeMs);
case REMOVE_RAFT_VOTER -> handleRemoveVoterResponse(response, currentTimeMs);
default -> throw new IllegalArgumentException("Received unexpected response type: " + apiKey);
};

requestManager.onResponseResult(
response.source(),
Expand Down Expand Up @@ -2740,48 +2710,18 @@ private Optional<Errors> validateLeaderOnlyRequest(int requestEpoch) {

private void handleRequest(RaftRequest.Inbound request, long currentTimeMs) {
ApiKeys apiKey = ApiKeys.forId(request.data().apiKey());
final CompletableFuture<? extends ApiMessage> responseFuture;

switch (apiKey) {
case FETCH:
responseFuture = handleFetchRequest(request, currentTimeMs);
break;

case VOTE:
responseFuture = completedFuture(handleVoteRequest(request));
break;

case BEGIN_QUORUM_EPOCH:
responseFuture = completedFuture(handleBeginQuorumEpochRequest(request, currentTimeMs));
break;

case END_QUORUM_EPOCH:
responseFuture = completedFuture(handleEndQuorumEpochRequest(request, currentTimeMs));
break;

case DESCRIBE_QUORUM:
responseFuture = completedFuture(handleDescribeQuorumRequest(request, currentTimeMs));
break;

case FETCH_SNAPSHOT:
responseFuture = completedFuture(handleFetchSnapshotRequest(request, currentTimeMs));
break;

case ADD_RAFT_VOTER:
responseFuture = handleAddVoterRequest(request, currentTimeMs);
break;

case REMOVE_RAFT_VOTER:
responseFuture = handleRemoveVoterRequest(request, currentTimeMs);
break;

case UPDATE_RAFT_VOTER:
responseFuture = handleUpdateVoterRequest(request, currentTimeMs);
break;

default:
throw new IllegalArgumentException("Unexpected request type " + apiKey);
}
final CompletableFuture<? extends ApiMessage> responseFuture = switch (apiKey) {
case FETCH -> handleFetchRequest(request, currentTimeMs);
case VOTE -> completedFuture(handleVoteRequest(request));
case BEGIN_QUORUM_EPOCH -> completedFuture(handleBeginQuorumEpochRequest(request, currentTimeMs));
case END_QUORUM_EPOCH -> completedFuture(handleEndQuorumEpochRequest(request, currentTimeMs));
case DESCRIBE_QUORUM -> completedFuture(handleDescribeQuorumRequest(request, currentTimeMs));
case FETCH_SNAPSHOT -> completedFuture(handleFetchSnapshotRequest(request, currentTimeMs));
case ADD_RAFT_VOTER -> handleAddVoterRequest(request, currentTimeMs);
case REMOVE_RAFT_VOTER -> handleRemoveVoterRequest(request, currentTimeMs);
case UPDATE_RAFT_VOTER -> handleUpdateVoterRequest(request, currentTimeMs);
default -> throw new IllegalArgumentException("Unexpected request type " + apiKey);
};

responseFuture.whenComplete((response, exception) -> {
ApiMessage message = response;
Expand Down
39 changes: 3 additions & 36 deletions raft/src/main/java/org/apache/kafka/raft/LeaderAndEpoch.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,47 +19,14 @@
import java.util.Objects;
import java.util.OptionalInt;

public class LeaderAndEpoch {
private final OptionalInt leaderId;
private final int epoch;
public record LeaderAndEpoch(OptionalInt leaderId, int epoch) {
public static final LeaderAndEpoch UNKNOWN = new LeaderAndEpoch(OptionalInt.empty(), 0);

public LeaderAndEpoch(OptionalInt leaderId, int epoch) {
this.leaderId = Objects.requireNonNull(leaderId);
this.epoch = epoch;
}

public OptionalInt leaderId() {
return leaderId;
}

public int epoch() {
return epoch;
public LeaderAndEpoch {
Objects.requireNonNull(leaderId);
}

public boolean isLeader(int nodeId) {
return leaderId.isPresent() && leaderId.getAsInt() == nodeId;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
LeaderAndEpoch that = (LeaderAndEpoch) o;
return epoch == that.epoch &&
leaderId.equals(that.leaderId);
}

@Override
public int hashCode() {
return Objects.hash(leaderId, epoch);
}

@Override
public String toString() {
return "LeaderAndEpoch(" +
"leaderId=" + leaderId +
", epoch=" + epoch +
')';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -332,16 +332,16 @@ public boolean handleApiVersionsResponse(
}

public void highWatermarkUpdated(LeaderState<?> leaderState) {
leaderState.addVoterHandlerState().ifPresent(current -> {
leaderState.highWatermark().ifPresent(highWatermark -> {
leaderState.addVoterHandlerState().ifPresent(current ->
leaderState.highWatermark().ifPresent(highWatermark ->
current.lastOffset().ifPresent(lastOffset -> {
if (highWatermark.offset() > lastOffset) {
// VotersRecord with the added voter was committed; complete the RPC
leaderState.resetAddVoterHandlerState(Errors.NONE, null, Optional.empty());
}
});
});
});
})
)
);
}

private ApiVersionsRequestData buildApiVersionsRequest() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,11 @@
/**
* Tracks the votes cast by voters in an election held by a Nominee.
*/
public class EpochElection {
private Map<Integer, VoterState> voterStates;

public record EpochElection(Map<Integer, VoterState> voterStates) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please make voterStates immutable?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, the toString could be removed

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the voterIds could return voterStates.keySet() instead if voterStates is immutable.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I removed the toString()
For the other comment on making voterStates immutable, I am not sure if that can be done. The public constructor EpochElection is what is used by multiple classes and it takes Set<ReplicaKey> voters and generates the needed Map with VoterState inner class.

public EpochElection(Set<ReplicaKey> voters) {
this.voterStates = voters.stream()
.collect(
Collectors.toMap(
ReplicaKey::id,
VoterState::new
)
);
this(voters.stream()
.collect(Collectors.toMap(ReplicaKey::id, VoterState::new))
);
}

/**
Expand Down Expand Up @@ -157,14 +151,6 @@ private int majoritySize() {
return voterStates.size() / 2 + 1;
}

@Override
public String toString() {
return String.format(
"EpochElection(voterStates=%s)",
voterStates
);
}

private static final class VoterState {
private final ReplicaKey replicaKey;
private State state = State.UNRECORDED;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ public CompletableFuture<RemoveRaftVoterResponseData> handleRemoveVoterRequest(
}

public void highWatermarkUpdated(LeaderState<?> leaderState) {
leaderState.removeVoterHandlerState().ifPresent(current -> {
leaderState.removeVoterHandlerState().ifPresent(current ->
leaderState.highWatermark().ifPresent(highWatermark -> {
if (highWatermark.offset() > current.lastOffset()) {
// VotersRecord with the removed voter was committed; complete the RPC
Expand All @@ -182,7 +182,7 @@ public void highWatermarkUpdated(LeaderState<?> leaderState) {
leaderState.requestResign();
}
}
});
});
})
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,7 @@ public int numWaiting() {
return thresholdMap.size();
}

private static class ThresholdKey<T extends Comparable<T>> implements Comparable<ThresholdKey<T>> {
private final long id;
private final T threshold;

private ThresholdKey(long id, T threshold) {
this.id = id;
this.threshold = threshold;
}
private record ThresholdKey<T extends Comparable<T>>(long id, T threshold) implements Comparable<ThresholdKey<T>> {

@Override
public int compareTo(ThresholdKey<T> o) {
Expand Down
Loading