Skip to content

Commit 52dfe1e

Browse files
authored
MINOR: Cleanup Raft Module (#20348)
This PR aims at cleaning up the `raft` module further by getting rid of some extra code which can be replaced by `record` Reviewers: Chia-Ping Tsai <[email protected]>
1 parent 5e2f54e commit 52dfe1e

File tree

13 files changed

+131
-433
lines changed

13 files changed

+131
-433
lines changed

raft/src/main/java/org/apache/kafka/raft/DynamicVoter.java

Lines changed: 14 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -23,28 +23,25 @@
2323

2424
import java.net.InetSocketAddress;
2525
import java.util.Map;
26-
import java.util.Objects;
2726

2827
/**
2928
* The textual representation of a KIP-853 voter.
30-
*
29+
* <p>
3130
* Since this is used in command-line tools, format changes to the parsing logic require a KIP,
3231
* and should be backwards compatible.
32+
*
33+
* @param directoryId The directory ID.
34+
* @param nodeId The voter ID.
35+
* @param host The voter hostname or IP address.
36+
* @param port The voter port.
3337
*/
34-
public final class DynamicVoter {
35-
private final Uuid directoryId;
36-
private final int nodeId;
37-
private final String host;
38-
private final int port;
39-
38+
public record DynamicVoter(Uuid directoryId, int nodeId, String host, int port) {
4039
/**
4140
* Create a DynamicVoter object by parsing an input string.
4241
*
43-
* @param input The input string.
44-
*
45-
* @return The DynamicVoter object.
46-
*
47-
* @throws IllegalArgumentException If parsing fails.
42+
* @param input The input string.
43+
* @return The DynamicVoter object.
44+
* @throws IllegalArgumentException If parsing fails.
4845
*/
4946
public static DynamicVoter parse(String input) {
5047
input = input.trim();
@@ -75,7 +72,7 @@ public static DynamicVoter parse(String input) {
7572
int endBracketIndex = input.indexOf("]");
7673
if (endBracketIndex < 0) {
7774
throw new IllegalArgumentException("Hostname began with left bracket, but no right " +
78-
"bracket was found.");
75+
"bracket was found.");
7976
}
8077
host = input.substring(1, endBracketIndex);
8178
input = input.substring(endBracketIndex + 1);
@@ -115,70 +112,16 @@ public static DynamicVoter parse(String input) {
115112
return new DynamicVoter(directoryId, nodeId, host, port);
116113
}
117114

118-
/**
119-
* Create a new KIP-853 voter.
120-
*
121-
* @param directoryId The directory ID.
122-
* @param nodeId The voter ID.
123-
* @param host The voter hostname or IP address.
124-
* @param port The voter port.
125-
*/
126-
public DynamicVoter(
127-
Uuid directoryId,
128-
int nodeId,
129-
String host,
130-
int port
131-
) {
132-
this.directoryId = directoryId;
133-
this.nodeId = nodeId;
134-
this.host = host;
135-
this.port = port;
136-
}
137-
138-
public Uuid directoryId() {
139-
return directoryId;
140-
}
141-
142-
public int nodeId() {
143-
return nodeId;
144-
}
145-
146-
public String host() {
147-
return host;
148-
}
149-
150-
public int port() {
151-
return port;
152-
}
153-
154115
public VoterSet.VoterNode toVoterNode(String controllerListenerName) {
155116
ReplicaKey voterKey = ReplicaKey.of(nodeId, directoryId);
156117
Endpoints listeners = Endpoints.fromInetSocketAddresses(Map.of(
157-
ListenerName.normalised(controllerListenerName),
158-
new InetSocketAddress(host, port)));
118+
ListenerName.normalised(controllerListenerName),
119+
new InetSocketAddress(host, port)));
159120
SupportedVersionRange supportedKRaftVersion =
160-
new SupportedVersionRange((short) 0, (short) 1);
121+
new SupportedVersionRange((short) 0, (short) 1);
161122
return VoterSet.VoterNode.of(voterKey, listeners, supportedKRaftVersion);
162123
}
163124

164-
@Override
165-
public boolean equals(Object o) {
166-
if (o == null || (!(o.getClass().equals(DynamicVoter.class)))) return false;
167-
DynamicVoter other = (DynamicVoter) o;
168-
return directoryId.equals(other.directoryId) &&
169-
nodeId == other.nodeId &&
170-
host.equals(other.host) &&
171-
port == other.port;
172-
}
173-
174-
@Override
175-
public int hashCode() {
176-
return Objects.hash(directoryId,
177-
nodeId,
178-
host,
179-
port);
180-
}
181-
182125
@Override
183126
public String toString() {
184127
if (host.contains(":")) {

raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java

Lines changed: 24 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -2636,48 +2636,18 @@ private boolean handleUnexpectedError(Errors error, RaftResponse.Inbound respons
26362636
private void handleResponse(RaftResponse.Inbound response, long currentTimeMs) {
26372637
// The response epoch matches the local epoch, so we can handle the response
26382638
ApiKeys apiKey = ApiKeys.forId(response.data().apiKey());
2639-
final boolean handledSuccessfully;
2640-
2641-
switch (apiKey) {
2642-
case FETCH:
2643-
handledSuccessfully = handleFetchResponse(response, currentTimeMs);
2644-
break;
2645-
2646-
case VOTE:
2647-
handledSuccessfully = handleVoteResponse(response, currentTimeMs);
2648-
break;
2649-
2650-
case BEGIN_QUORUM_EPOCH:
2651-
handledSuccessfully = handleBeginQuorumEpochResponse(response, currentTimeMs);
2652-
break;
2653-
2654-
case END_QUORUM_EPOCH:
2655-
handledSuccessfully = handleEndQuorumEpochResponse(response, currentTimeMs);
2656-
break;
2657-
2658-
case FETCH_SNAPSHOT:
2659-
handledSuccessfully = handleFetchSnapshotResponse(response, currentTimeMs);
2660-
break;
2661-
2662-
case API_VERSIONS:
2663-
handledSuccessfully = handleApiVersionsResponse(response, currentTimeMs);
2664-
break;
2665-
2666-
case UPDATE_RAFT_VOTER:
2667-
handledSuccessfully = handleUpdateVoterResponse(response, currentTimeMs);
2668-
break;
2669-
2670-
case ADD_RAFT_VOTER:
2671-
handledSuccessfully = handleAddVoterResponse(response, currentTimeMs);
2672-
break;
2673-
2674-
case REMOVE_RAFT_VOTER:
2675-
handledSuccessfully = handleRemoveVoterResponse(response, currentTimeMs);
2676-
break;
2677-
2678-
default:
2679-
throw new IllegalArgumentException("Received unexpected response type: " + apiKey);
2680-
}
2639+
final boolean handledSuccessfully = switch (apiKey) {
2640+
case FETCH -> handleFetchResponse(response, currentTimeMs);
2641+
case VOTE -> handleVoteResponse(response, currentTimeMs);
2642+
case BEGIN_QUORUM_EPOCH -> handleBeginQuorumEpochResponse(response, currentTimeMs);
2643+
case END_QUORUM_EPOCH -> handleEndQuorumEpochResponse(response, currentTimeMs);
2644+
case FETCH_SNAPSHOT -> handleFetchSnapshotResponse(response, currentTimeMs);
2645+
case API_VERSIONS -> handleApiVersionsResponse(response, currentTimeMs);
2646+
case UPDATE_RAFT_VOTER -> handleUpdateVoterResponse(response, currentTimeMs);
2647+
case ADD_RAFT_VOTER -> handleAddVoterResponse(response, currentTimeMs);
2648+
case REMOVE_RAFT_VOTER -> handleRemoveVoterResponse(response, currentTimeMs);
2649+
default -> throw new IllegalArgumentException("Received unexpected response type: " + apiKey);
2650+
};
26812651

26822652
requestManager.onResponseResult(
26832653
response.source(),
@@ -2740,48 +2710,18 @@ private Optional<Errors> validateLeaderOnlyRequest(int requestEpoch) {
27402710

27412711
private void handleRequest(RaftRequest.Inbound request, long currentTimeMs) {
27422712
ApiKeys apiKey = ApiKeys.forId(request.data().apiKey());
2743-
final CompletableFuture<? extends ApiMessage> responseFuture;
2744-
2745-
switch (apiKey) {
2746-
case FETCH:
2747-
responseFuture = handleFetchRequest(request, currentTimeMs);
2748-
break;
2749-
2750-
case VOTE:
2751-
responseFuture = completedFuture(handleVoteRequest(request));
2752-
break;
2753-
2754-
case BEGIN_QUORUM_EPOCH:
2755-
responseFuture = completedFuture(handleBeginQuorumEpochRequest(request, currentTimeMs));
2756-
break;
2757-
2758-
case END_QUORUM_EPOCH:
2759-
responseFuture = completedFuture(handleEndQuorumEpochRequest(request, currentTimeMs));
2760-
break;
2761-
2762-
case DESCRIBE_QUORUM:
2763-
responseFuture = completedFuture(handleDescribeQuorumRequest(request, currentTimeMs));
2764-
break;
2765-
2766-
case FETCH_SNAPSHOT:
2767-
responseFuture = completedFuture(handleFetchSnapshotRequest(request, currentTimeMs));
2768-
break;
2769-
2770-
case ADD_RAFT_VOTER:
2771-
responseFuture = handleAddVoterRequest(request, currentTimeMs);
2772-
break;
2773-
2774-
case REMOVE_RAFT_VOTER:
2775-
responseFuture = handleRemoveVoterRequest(request, currentTimeMs);
2776-
break;
2777-
2778-
case UPDATE_RAFT_VOTER:
2779-
responseFuture = handleUpdateVoterRequest(request, currentTimeMs);
2780-
break;
2781-
2782-
default:
2783-
throw new IllegalArgumentException("Unexpected request type " + apiKey);
2784-
}
2713+
final CompletableFuture<? extends ApiMessage> responseFuture = switch (apiKey) {
2714+
case FETCH -> handleFetchRequest(request, currentTimeMs);
2715+
case VOTE -> completedFuture(handleVoteRequest(request));
2716+
case BEGIN_QUORUM_EPOCH -> completedFuture(handleBeginQuorumEpochRequest(request, currentTimeMs));
2717+
case END_QUORUM_EPOCH -> completedFuture(handleEndQuorumEpochRequest(request, currentTimeMs));
2718+
case DESCRIBE_QUORUM -> completedFuture(handleDescribeQuorumRequest(request, currentTimeMs));
2719+
case FETCH_SNAPSHOT -> completedFuture(handleFetchSnapshotRequest(request, currentTimeMs));
2720+
case ADD_RAFT_VOTER -> handleAddVoterRequest(request, currentTimeMs);
2721+
case REMOVE_RAFT_VOTER -> handleRemoveVoterRequest(request, currentTimeMs);
2722+
case UPDATE_RAFT_VOTER -> handleUpdateVoterRequest(request, currentTimeMs);
2723+
default -> throw new IllegalArgumentException("Unexpected request type " + apiKey);
2724+
};
27852725

27862726
responseFuture.whenComplete((response, exception) -> {
27872727
ApiMessage message = response;

raft/src/main/java/org/apache/kafka/raft/LeaderAndEpoch.java

Lines changed: 3 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -19,47 +19,14 @@
1919
import java.util.Objects;
2020
import java.util.OptionalInt;
2121

22-
public class LeaderAndEpoch {
23-
private final OptionalInt leaderId;
24-
private final int epoch;
22+
public record LeaderAndEpoch(OptionalInt leaderId, int epoch) {
2523
public static final LeaderAndEpoch UNKNOWN = new LeaderAndEpoch(OptionalInt.empty(), 0);
2624

27-
public LeaderAndEpoch(OptionalInt leaderId, int epoch) {
28-
this.leaderId = Objects.requireNonNull(leaderId);
29-
this.epoch = epoch;
30-
}
31-
32-
public OptionalInt leaderId() {
33-
return leaderId;
34-
}
35-
36-
public int epoch() {
37-
return epoch;
25+
public LeaderAndEpoch {
26+
Objects.requireNonNull(leaderId);
3827
}
3928

4029
public boolean isLeader(int nodeId) {
4130
return leaderId.isPresent() && leaderId.getAsInt() == nodeId;
4231
}
43-
44-
@Override
45-
public boolean equals(Object o) {
46-
if (this == o) return true;
47-
if (o == null || getClass() != o.getClass()) return false;
48-
LeaderAndEpoch that = (LeaderAndEpoch) o;
49-
return epoch == that.epoch &&
50-
leaderId.equals(that.leaderId);
51-
}
52-
53-
@Override
54-
public int hashCode() {
55-
return Objects.hash(leaderId, epoch);
56-
}
57-
58-
@Override
59-
public String toString() {
60-
return "LeaderAndEpoch(" +
61-
"leaderId=" + leaderId +
62-
", epoch=" + epoch +
63-
')';
64-
}
6532
}

raft/src/main/java/org/apache/kafka/raft/internals/AddVoterHandler.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -332,16 +332,16 @@ public boolean handleApiVersionsResponse(
332332
}
333333

334334
public void highWatermarkUpdated(LeaderState<?> leaderState) {
335-
leaderState.addVoterHandlerState().ifPresent(current -> {
336-
leaderState.highWatermark().ifPresent(highWatermark -> {
335+
leaderState.addVoterHandlerState().ifPresent(current ->
336+
leaderState.highWatermark().ifPresent(highWatermark ->
337337
current.lastOffset().ifPresent(lastOffset -> {
338338
if (highWatermark.offset() > lastOffset) {
339339
// VotersRecord with the added voter was committed; complete the RPC
340340
leaderState.resetAddVoterHandlerState(Errors.NONE, null, Optional.empty());
341341
}
342-
});
343-
});
344-
});
342+
})
343+
)
344+
);
345345
}
346346

347347
private ApiVersionsRequestData buildApiVersionsRequest() {

raft/src/main/java/org/apache/kafka/raft/internals/EpochElection.java

Lines changed: 4 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -27,17 +27,11 @@
2727
/**
2828
* Tracks the votes cast by voters in an election held by a Nominee.
2929
*/
30-
public class EpochElection {
31-
private Map<Integer, VoterState> voterStates;
32-
30+
public record EpochElection(Map<Integer, VoterState> voterStates) {
3331
public EpochElection(Set<ReplicaKey> voters) {
34-
this.voterStates = voters.stream()
35-
.collect(
36-
Collectors.toMap(
37-
ReplicaKey::id,
38-
VoterState::new
39-
)
40-
);
32+
this(voters.stream()
33+
.collect(Collectors.toMap(ReplicaKey::id, VoterState::new))
34+
);
4135
}
4236

4337
/**
@@ -157,14 +151,6 @@ private int majoritySize() {
157151
return voterStates.size() / 2 + 1;
158152
}
159153

160-
@Override
161-
public String toString() {
162-
return String.format(
163-
"EpochElection(voterStates=%s)",
164-
voterStates
165-
);
166-
}
167-
168154
private static final class VoterState {
169155
private final ReplicaKey replicaKey;
170156
private State state = State.UNRECORDED;

raft/src/main/java/org/apache/kafka/raft/internals/RemoveVoterHandler.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ public CompletableFuture<RemoveRaftVoterResponseData> handleRemoveVoterRequest(
156156
}
157157

158158
public void highWatermarkUpdated(LeaderState<?> leaderState) {
159-
leaderState.removeVoterHandlerState().ifPresent(current -> {
159+
leaderState.removeVoterHandlerState().ifPresent(current ->
160160
leaderState.highWatermark().ifPresent(highWatermark -> {
161161
if (highWatermark.offset() > current.lastOffset()) {
162162
// VotersRecord with the removed voter was committed; complete the RPC
@@ -182,7 +182,7 @@ public void highWatermarkUpdated(LeaderState<?> leaderState) {
182182
leaderState.requestResign();
183183
}
184184
}
185-
});
186-
});
185+
})
186+
);
187187
}
188188
}

raft/src/main/java/org/apache/kafka/raft/internals/ThresholdPurgatory.java

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -71,14 +71,7 @@ public int numWaiting() {
7171
return thresholdMap.size();
7272
}
7373

74-
private static class ThresholdKey<T extends Comparable<T>> implements Comparable<ThresholdKey<T>> {
75-
private final long id;
76-
private final T threshold;
77-
78-
private ThresholdKey(long id, T threshold) {
79-
this.id = id;
80-
this.threshold = threshold;
81-
}
74+
private record ThresholdKey<T extends Comparable<T>>(long id, T threshold) implements Comparable<ThresholdKey<T>> {
8275

8376
@Override
8477
public int compareTo(ThresholdKey<T> o) {

0 commit comments

Comments
 (0)