Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
9761bfe
implment production code
TaiJuWu Mar 20, 2025
108129d
implement prod code
TaiJuWu Mar 22, 2025
c21ec89
fix compiler
TaiJuWu Mar 22, 2025
3f6ca5c
Revert "fix compiler"
TaiJuWu Mar 22, 2025
5e0ff34
Revert "implement prod code"
TaiJuWu Mar 22, 2025
0f5c308
fix test
TaiJuWu Mar 22, 2025
5063d0d
add Utils.closeQuietly
TaiJuWu Mar 22, 2025
da3e54e
add test
TaiJuWu Mar 25, 2025
5979f30
refactor
TaiJuWu Mar 25, 2025
3e7f5b2
merge two variable
TaiJuWu Mar 26, 2025
518f03a
refactor
TaiJuWu Mar 26, 2025
63eaea9
Merge branch 'trunk' into 877_log
TaiJuWu Mar 26, 2025
d4ef41d
improve test
TaiJuWu Mar 26, 2025
4f07357
apply Maison's implementation
TaiJuWu Mar 31, 2025
7a580bf
Merge branch 'trunk' into 877_log
TaiJuWu Apr 1, 2025
a9b90d2
rename and small refactor
TaiJuWu Apr 1, 2025
3d5a149
add doc
TaiJuWu Apr 3, 2025
266deb7
Merge branch 'trunk' into 877_log
TaiJuWu Apr 3, 2025
a00f3f9
address Yunyung comments
TaiJuWu Apr 3, 2025
13e8b73
address Miason comment
TaiJuWu Apr 3, 2025
31d52fe
Merge branch 'trunk' into 877_log
TaiJuWu Apr 10, 2025
aa94317
copy file from Miason PR
TaiJuWu Apr 10, 2025
f28689e
add it
TaiJuWu Apr 10, 2025
e77bee1
address Yunyung comments
TaiJuWu Apr 10, 2025
1541865
Merge branch 'trunk' into 877_log
TaiJuWu Apr 10, 2025
5c0b221
address maison comments
TaiJuWu Apr 15, 2025
cd5d51b
Merge branch 'trunk' into 877_log
TaiJuWu Apr 15, 2025
5c1cf46
Merge branch 'trunk' into 877_log
TaiJuWu Apr 16, 2025
f18e1b2
fix build
TaiJuWu Apr 16, 2025
e3eed4b
fix test
TaiJuWu Apr 17, 2025
fbfa598
Merge branch 'trunk' into 877_log
TaiJuWu Apr 17, 2025
45b6c5d
fix build
TaiJuWu Apr 17, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -902,6 +902,7 @@ project(':server') {
implementation project(':transaction-coordinator')
implementation project(':raft')
implementation project(':share-coordinator')
implementation project(':storage:storage-api')
implementation libs.jacksonDatabind
implementation libs.metrics
implementation libs.slf4jApi
Expand All @@ -913,6 +914,7 @@ project(':server') {
testImplementation testLog4j2Libs
testImplementation project(':test-common:test-common-internal-api')
testImplementation project(':test-common:test-common-runtime')
testImplementation project(':storage:storage-api').sourceSets.test.output

testRuntimeOnly runtimeTestLibs
}
Expand Down
44 changes: 21 additions & 23 deletions core/src/main/scala/kafka/server/BrokerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -220,8 +220,6 @@ class BrokerServer(
brokerTopicStats,
logDirFailureChannel)

remoteLogManagerOpt = createRemoteLogManager()

lifecycleManager = new BrokerLifecycleManager(config,
time,
s"broker-${config.nodeId}-",
Expand Down Expand Up @@ -280,6 +278,8 @@ class BrokerServer(
withWildcardHostnamesResolved().
withEphemeralPortsCorrected(name => socketServer.boundPort(new ListenerName(name)))

remoteLogManagerOpt = createRemoteLogManager(listenerInfo)

alterPartitionManager = AlterPartitionManager(
config,
scheduler = kafkaScheduler,
Expand Down Expand Up @@ -471,23 +471,6 @@ class BrokerServer(
socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, time,
config.numIoThreads, "RequestHandlerAvgIdlePercent")

// Start RemoteLogManager before initializing broker metadata publishers.
remoteLogManagerOpt.foreach { rlm =>
val listenerName = config.remoteLogManagerConfig.remoteLogMetadataManagerListenerName()
if (listenerName != null) {
val endpoint = listenerInfo.listeners().values().stream
.filter(e =>
e.listenerName().isPresent &&
ListenerName.normalised(e.listenerName().get()).equals(ListenerName.normalised(listenerName))
)
.findFirst()
.orElseThrow(() => new ConfigException(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP,
listenerName, "Should be set as a listener name within valid broker listener name list: " + listenerInfo.listeners().values()))
rlm.onEndPointCreated(endpoint)
}
rlm.startup()
}

metadataPublishers.add(new MetadataVersionConfigValidator(config, sharedServer.metadataPublishingFaultHandler))
brokerMetadataPublisher = new BrokerMetadataPublisher(config,
metadataCache,
Expand Down Expand Up @@ -712,16 +695,31 @@ class BrokerServer(
}
}

protected def createRemoteLogManager(): Option[RemoteLogManager] = {
if (config.remoteLogManagerConfig.isRemoteStorageSystemEnabled()) {
Some(new RemoteLogManager(config.remoteLogManagerConfig, config.brokerId, config.logDirs.head, clusterId, time,
protected def createRemoteLogManager(listenerInfo: ListenerInfo): Option[RemoteLogManager] = {
if (config.remoteLogManagerConfig.isRemoteStorageSystemEnabled) {
val listenerName = config.remoteLogManagerConfig.remoteLogMetadataManagerListenerName()
val endpoint = if (listenerName != null) {
Some(listenerInfo.listeners().values().stream
.filter(e =>
e.listenerName().isPresent &&
ListenerName.normalised(e.listenerName().get()).equals(ListenerName.normalised(listenerName))
)
.findFirst()
.orElseThrow(() => new ConfigException(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP,
listenerName, "Should be set as a listener name within valid broker listener name list: " + listenerInfo.listeners().values())))
} else {
None
}

val rlm = new RemoteLogManager(config.remoteLogManagerConfig, config.brokerId, config.logDirs.head, clusterId, time,
(tp: TopicPartition) => logManager.getLog(tp).toJava,
(tp: TopicPartition, remoteLogStartOffset: java.lang.Long) => {
logManager.getLog(tp).foreach { log =>
log.updateLogStartOffsetFromRemoteTier(remoteLogStartOffset)
}
},
brokerTopicStats, metrics))
brokerTopicStats, metrics, endpoint.toJava)
Some(rlm)
} else {
None
}
Expand Down
6 changes: 4 additions & 2 deletions core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2236,7 +2236,8 @@ class UnifiedLogTest {
_ => Optional.empty[UnifiedLog](),
(_, _) => {},
brokerTopicStats,
new Metrics()))
new Metrics(),
Optional.empty))
remoteLogManager.setDelayedOperationPurgatory(purgatory)

val logConfig = LogTestUtils.createLogConfig(segmentBytes = 200, indexIntervalBytes = 1,
Expand Down Expand Up @@ -2333,7 +2334,8 @@ class UnifiedLogTest {
_ => Optional.empty[UnifiedLog](),
(_, _) => {},
brokerTopicStats,
new Metrics()))
new Metrics(),
Optional.empty))
remoteLogManager.setDelayedOperationPurgatory(purgatory)

val logConfig = LogTestUtils.createLogConfig(segmentBytes = 200, indexIntervalBytes = 1,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3792,8 +3792,8 @@ class ReplicaManagerTest {
_ => Optional.of(mockLog),
(TopicPartition, Long) => {},
brokerTopicStats,
metrics)
remoteLogManager.startup()
metrics,
Optional.empty)
val spyRLM = spy(remoteLogManager)

val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokerIds = Seq(0, 1, 2), enableRemoteStorage = true, shouldMockLog = true, remoteLogManager = Some(spyRLM))
Expand Down Expand Up @@ -3903,8 +3903,8 @@ class ReplicaManagerTest {
_ => Optional.of(dummyLog),
(TopicPartition, Long) => {},
brokerTopicStats,
metrics)
remoteLogManager.startup()
metrics,
Optional.empty)
val spyRLM = spy(remoteLogManager)
val timer = new MockTimer(time)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,43 +27,41 @@
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.Type;
import org.apache.kafka.metadata.authorizer.StandardAuthorizer;
import org.apache.kafka.server.log.remote.storage.NoOpRemoteLogMetadataManager;
import org.apache.kafka.server.log.remote.storage.NoOpRemoteStorageManager;

import java.util.LinkedHashMap;
import java.util.Map;

import static org.apache.kafka.server.config.ReplicationConfigs.REPLICA_SELECTOR_CLASS_CONFIG;
import static org.apache.kafka.server.config.ServerConfigs.AUTHORIZER_CLASS_NAME_CONFIG;
import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP;
import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP;
import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP;
import static org.junit.jupiter.api.Assertions.assertEquals;

public class MonitorablePluginsIntegrationTest {

private static int controllerId(Type type) {
return type == Type.KRAFT ? 3000 : 0;
}

private static Map<String, String> expectedTags(String config, String clazz) {
return expectedTags(config, clazz, Map.of());
}

private static Map<String, String> expectedTags(String config, String clazz, Map<String, String> extraTags) {
Map<String, String> tags = new LinkedHashMap<>();
tags.put("config", config);
tags.put("class", clazz);
tags.putAll(extraTags);
return tags;
}

@ClusterTest(
types = {Type.KRAFT, Type.CO_KRAFT},
serverProperties = {
@ClusterConfigProperty(key = StandardAuthorizer.SUPER_USERS_CONFIG, value = "User:ANONYMOUS"),
@ClusterConfigProperty(key = AUTHORIZER_CLASS_NAME_CONFIG, value = "org.apache.kafka.metadata.authorizer.StandardAuthorizer"),
@ClusterConfigProperty(key = REPLICA_SELECTOR_CLASS_CONFIG, value = "org.apache.kafka.server.MonitorablePluginsIntegrationTest$MonitorableReplicaSelector")
@ClusterConfigProperty(key = REPLICA_SELECTOR_CLASS_CONFIG, value = "org.apache.kafka.server.MonitorablePluginsIntegrationTest$MonitorableReplicaSelector"),
@ClusterConfigProperty(key = REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, value = "true"),
@ClusterConfigProperty(key = REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP,
value = "org.apache.kafka.server.MonitorablePluginsIntegrationTest$MonitorableNoOpRemoteLogMetadataManager"),
@ClusterConfigProperty(key = REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP,
value = "org.apache.kafka.server.MonitorablePluginsIntegrationTest$MonitorableNoOpRemoteStorageManager")
}
)
public void testMonitorableServerPlugins(ClusterInstance clusterInstance) {
assertAuthorizerMetrics(clusterInstance);
assertReplicaSelectorMetrics(clusterInstance);
assertRemoteLogManagerMetrics(clusterInstance);
}

private void assertAuthorizerMetrics(ClusterInstance clusterInstance) {
Expand All @@ -78,6 +76,17 @@ private void assertAuthorizerMetrics(ClusterInstance clusterInstance) {
expectedTags(AUTHORIZER_CLASS_NAME_CONFIG, "StandardAuthorizer", Map.of("role", "controller")));
}

private void assertRemoteLogManagerMetrics(ClusterInstance clusterInstance) {
assertMetrics(
clusterInstance.brokers().get(0).metrics(),
MonitorableNoOpRemoteLogMetadataManager.METRICS_COUNT,
expectedTags(REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP, MonitorableNoOpRemoteLogMetadataManager.class.getSimpleName()));
assertMetrics(
clusterInstance.brokers().get(0).metrics(),
MonitorableNoOpRemoteStorageManager.METRICS_COUNT,
expectedTags(REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, MonitorableNoOpRemoteStorageManager.class.getSimpleName()));
}

private void assertReplicaSelectorMetrics(ClusterInstance clusterInstance) {
assertMetrics(
clusterInstance.brokers().get(0).metrics(),
Expand All @@ -98,6 +107,17 @@ private void assertMetrics(Metrics metrics, int expected, Map<String, String> ex
assertEquals(expected, found);
}

public static class MonitorableNoOpRemoteLogMetadataManager extends NoOpRemoteLogMetadataManager implements Monitorable {

private static final int METRICS_COUNT = 1;

@Override
public void withPluginMetrics(PluginMetrics metrics) {
MetricName name = metrics.metricName("name", "description", Map.of());
metrics.addMetric(name, (Measurable) (config, now) -> 123);
}
}

public static class MonitorableReplicaSelector extends RackAwareReplicaSelector implements Monitorable {

private static final int METRICS_COUNT = 1;
Expand All @@ -108,4 +128,27 @@ public void withPluginMetrics(PluginMetrics metrics) {
metrics.addMetric(name, (Measurable) (config, now) -> 123);
}
}

public static class MonitorableNoOpRemoteStorageManager extends NoOpRemoteStorageManager implements Monitorable {

private static final int METRICS_COUNT = 1;

@Override
public void withPluginMetrics(PluginMetrics metrics) {
MetricName name = metrics.metricName("name", "description", Map.of());
metrics.addMetric(name, (Measurable) (config, now) -> 123);
}
}

private static Map<String, String> expectedTags(String config, String clazz) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really need this method?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove it, thanks.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After merging trunk and move private method to the end, this method is back.

return expectedTags(config, clazz, Map.of());
}

private static Map<String, String> expectedTags(String config, String clazz, Map<String, String> extraTags) {
Map<String, String> tags = new LinkedHashMap<>();
tags.put("config", config);
tags.put("class", clazz);
tags.putAll(extraTags);
return tags;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@
* "cluster.id", "broker.id" and all other properties prefixed with the config: "remote.log.metadata.manager.impl.prefix"
* (default value is "rlmm.config.") are passed when {@link #configure(Map)} is invoked on this instance.
* <p>
*
* Implement {@link org.apache.kafka.common.metrics.Monitorable} to enable the manager to register metrics.
* The following tags are automatically added to all metrics registered: <code>config</code> set to
* <code>remote.log.metadata.manager.class.name</code>, and <code>class</code> set to the RemoteLogMetadataManager class name.
*/
public interface RemoteLogMetadataManager extends Configurable, Closeable {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@
* <p>
* All properties prefixed with the config: "remote.log.storage.manager.impl.prefix"
* (default value is "rsm.config.") are passed when {@link #configure(Map)} is invoked on this instance.
*
* Implement {@link org.apache.kafka.common.metrics.Monitorable} to enable the manager to register metrics.
* The following tags are automatically added to all metrics registered: <code>config</code> set to
* <code>remote.log.storage.manager.class.name</code>, and <code>class</code> set to the RemoteStorageManager class name.
*/
public interface RemoteStorageManager extends Configurable, Closeable {

Expand Down
Loading