Skip to content

Commit 29ce961

Browse files
authored
MINOR; Revert "KAFKA-18681: Created GetReplicaLogInfo RPCs (#19664)" (#20371)
This reverts commit d86ba7f. Reverting since we are planning to change how KIP-966 is implemented. We should revert this RPC until we have more clarity on how this KIP will be executed. Reviewers: José Armando García Sancio <[email protected]>
1 parent f922ff6 commit 29ce961

File tree

14 files changed

+6
-760
lines changed

14 files changed

+6
-760
lines changed

clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -134,8 +134,8 @@ public enum ApiKeys {
134134
STREAMS_GROUP_DESCRIBE(ApiMessageType.STREAMS_GROUP_DESCRIBE),
135135
DESCRIBE_SHARE_GROUP_OFFSETS(ApiMessageType.DESCRIBE_SHARE_GROUP_OFFSETS),
136136
ALTER_SHARE_GROUP_OFFSETS(ApiMessageType.ALTER_SHARE_GROUP_OFFSETS),
137-
DELETE_SHARE_GROUP_OFFSETS(ApiMessageType.DELETE_SHARE_GROUP_OFFSETS),
138-
GET_REPLICA_LOG_INFO(ApiMessageType.GET_REPLICA_LOG_INFO);
137+
DELETE_SHARE_GROUP_OFFSETS(ApiMessageType.DELETE_SHARE_GROUP_OFFSETS);
138+
139139

140140
private static final Map<ApiMessageType.ListenerType, EnumSet<ApiKeys>> APIS_BY_LISTENER =
141141
new EnumMap<>(ApiMessageType.ListenerType.class);

clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -354,8 +354,6 @@ private static AbstractRequest doParseRequest(ApiKeys apiKey, short apiVersion,
354354
return AlterShareGroupOffsetsRequest.parse(readable, apiVersion);
355355
case DELETE_SHARE_GROUP_OFFSETS:
356356
return DeleteShareGroupOffsetsRequest.parse(readable, apiVersion);
357-
case GET_REPLICA_LOG_INFO:
358-
return GetReplicaLogInfoRequest.parse(readable, apiVersion);
359357
default:
360358
throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseRequest`, the " +
361359
"code should be updated to do so.", apiKey));

clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -291,8 +291,6 @@ public static AbstractResponse parseResponse(ApiKeys apiKey, Readable readable,
291291
return AlterShareGroupOffsetsResponse.parse(readable, version);
292292
case DELETE_SHARE_GROUP_OFFSETS:
293293
return DeleteShareGroupOffsetsResponse.parse(readable, version);
294-
case GET_REPLICA_LOG_INFO:
295-
return GetReplicaLogInfoResponse.parse(readable, version);
296294
default:
297295
throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseResponse`, the " +
298296
"code should be updated to do so.", apiKey));

clients/src/main/java/org/apache/kafka/common/requests/GetReplicaLogInfoRequest.java

Lines changed: 0 additions & 98 deletions
This file was deleted.

clients/src/main/java/org/apache/kafka/common/requests/GetReplicaLogInfoResponse.java

Lines changed: 0 additions & 75 deletions
This file was deleted.

clients/src/main/resources/common/message/GetReplicaLogInfoRequest.json

Lines changed: 0 additions & 32 deletions
This file was deleted.

clients/src/main/resources/common/message/GetReplicaLogInfoResponse.json

Lines changed: 0 additions & 43 deletions
This file was deleted.

clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -157,8 +157,6 @@
157157
import org.apache.kafka.common.message.FetchSnapshotRequestData;
158158
import org.apache.kafka.common.message.FetchSnapshotResponseData;
159159
import org.apache.kafka.common.message.FindCoordinatorRequestData;
160-
import org.apache.kafka.common.message.GetReplicaLogInfoRequestData;
161-
import org.apache.kafka.common.message.GetReplicaLogInfoResponseData;
162160
import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData;
163161
import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData;
164162
import org.apache.kafka.common.message.HeartbeatRequestData;
@@ -1077,7 +1075,6 @@ private AbstractRequest getRequest(ApiKeys apikey, short version) {
10771075
case DESCRIBE_SHARE_GROUP_OFFSETS: return createDescribeShareGroupOffsetsRequest(version);
10781076
case ALTER_SHARE_GROUP_OFFSETS: return createAlterShareGroupOffsetsRequest(version);
10791077
case DELETE_SHARE_GROUP_OFFSETS: return createDeleteShareGroupOffsetsRequest(version);
1080-
case GET_REPLICA_LOG_INFO: return createGetReplicaLogInfoRequest(version);
10811078
default: throw new IllegalArgumentException("Unknown API key " + apikey);
10821079
}
10831080
}
@@ -1173,25 +1170,10 @@ private AbstractResponse getResponse(ApiKeys apikey, short version) {
11731170
case DESCRIBE_SHARE_GROUP_OFFSETS: return createDescribeShareGroupOffsetsResponse();
11741171
case ALTER_SHARE_GROUP_OFFSETS: return createAlterShareGroupOffsetsResponse();
11751172
case DELETE_SHARE_GROUP_OFFSETS: return createDeleteShareGroupOffsetsResponse();
1176-
case GET_REPLICA_LOG_INFO: return createGetReplicaLogInfoResponse();
11771173
default: throw new IllegalArgumentException("Unknown API key " + apikey);
11781174
}
11791175
}
11801176

1181-
private GetReplicaLogInfoRequest createGetReplicaLogInfoRequest(short version) {
1182-
GetReplicaLogInfoRequestData data = new GetReplicaLogInfoRequestData()
1183-
.setTopicPartitions(singletonList(new GetReplicaLogInfoRequestData.TopicPartitions()
1184-
.setPartitions(singletonList(0))));
1185-
return new GetReplicaLogInfoRequest.Builder(data).build(version);
1186-
}
1187-
1188-
private GetReplicaLogInfoResponse createGetReplicaLogInfoResponse() {
1189-
GetReplicaLogInfoResponseData data = new GetReplicaLogInfoResponseData();
1190-
data.setBrokerEpoch(0);
1191-
data.setTopicPartitionLogInfoList(singletonList(new GetReplicaLogInfoResponseData.TopicPartitionLogInfo()));
1192-
return new GetReplicaLogInfoResponse(data);
1193-
}
1194-
11951177
private ConsumerGroupDescribeRequest createConsumerGroupDescribeRequest(short version) {
11961178
ConsumerGroupDescribeRequestData data = new ConsumerGroupDescribeRequestData()
11971179
.setGroupIds(Collections.singletonList("group"))

core/src/main/java/kafka/server/builders/KafkaApisBuilder.java

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@
4444

4545
import java.util.Map;
4646
import java.util.Optional;
47-
import java.util.function.Supplier;
4847

4948
import scala.jdk.javaapi.OptionConverters;
5049

@@ -72,7 +71,6 @@ public class KafkaApisBuilder {
7271
private ClientMetricsManager clientMetricsManager = null;
7372
private ShareCoordinator shareCoordinator = null;
7473
private GroupConfigManager groupConfigManager = null;
75-
private Supplier<Long> brokerEpochSupplier = () -> -1L;
7674

7775
public KafkaApisBuilder setRequestChannel(RequestChannel requestChannel) {
7876
this.requestChannel = requestChannel;
@@ -189,11 +187,6 @@ public KafkaApisBuilder setGroupConfigManager(GroupConfigManager groupConfigMana
189187
return this;
190188
}
191189

192-
public KafkaApisBuilder setBrokerEpochSupplier(Supplier<Long> brokerEpochSupplier) {
193-
this.brokerEpochSupplier = brokerEpochSupplier;
194-
return this;
195-
}
196-
197190
@SuppressWarnings({"CyclomaticComplexity"})
198191
public KafkaApis build() {
199192
if (requestChannel == null) throw new RuntimeException("you must set requestChannel");
@@ -237,7 +230,6 @@ public KafkaApis build() {
237230
tokenManager,
238231
apiVersionManager,
239232
clientMetricsManager,
240-
groupConfigManager,
241-
brokerEpochSupplier);
233+
groupConfigManager);
242234
}
243235
}

core/src/main/scala/kafka/server/BrokerServer.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -463,8 +463,7 @@ class BrokerServer(
463463
tokenManager = tokenManager,
464464
apiVersionManager = apiVersionManager,
465465
clientMetricsManager = clientMetricsManager,
466-
groupConfigManager = groupConfigManager,
467-
brokerEpochSupplier = () => lifecycleManager.brokerEpoch)
466+
groupConfigManager = groupConfigManager)
468467

469468
dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.nodeId,
470469
socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, time,

0 commit comments

Comments
 (0)