|
28 | 28 | import org.apache.kafka.common.errors.ReplicaNotAvailableException;
|
29 | 29 | import org.apache.kafka.common.metrics.KafkaMetric;
|
30 | 30 | import org.apache.kafka.common.metrics.Metrics;
|
| 31 | +import org.apache.kafka.common.metrics.Monitorable; |
| 32 | +import org.apache.kafka.common.metrics.PluginMetrics; |
31 | 33 | import org.apache.kafka.common.record.FileRecords;
|
32 | 34 | import org.apache.kafka.common.record.MemoryRecords;
|
33 | 35 | import org.apache.kafka.common.record.RecordBatch;
|
@@ -3695,9 +3697,9 @@ public void testRLMOpsWhenMetadataIsNotReady() throws InterruptedException, IOEx
|
3695 | 3697 | tp -> Optional.of(mockLog),
|
3696 | 3698 | (topicPartition, offset) -> currentLogStartOffset.set(offset),
|
3697 | 3699 | brokerTopicStats, metrics) {
|
3698 |
| - public RemoteStorageManager createRemoteStorageManager() { |
3699 |
| - return remoteStorageManager; |
3700 |
| - } |
| 3700 | +// public RemoteStorageManager createRemoteStorageManager() { |
| 3701 | +// return remoteStorageManager; |
| 3702 | +// } |
3701 | 3703 | public RemoteLogMetadataManager createRemoteLogMetadataManager() {
|
3702 | 3704 | return remoteLogMetadataManager;
|
3703 | 3705 | }
|
@@ -3741,6 +3743,39 @@ long findLogStartOffset(TopicIdPartition topicIdPartition, UnifiedLog log) {
|
3741 | 3743 | verifyNoMoreInteractions(remoteStorageManager);
|
3742 | 3744 | }
|
3743 | 3745 |
|
| 3746 | + @Test |
| 3747 | + public void testMonitorableRemoteLogStorageManager() { |
| 3748 | + Properties props = new Properties(); |
| 3749 | + props.putAll(brokerConfig); |
| 3750 | + // override common security.protocol by adding "RLMM prefix" and "remote log metadata common client prefix" |
| 3751 | + props.put(DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX + REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX + "security.protocol", "SSL"); |
| 3752 | + appendRLMConfig(props); |
| 3753 | + props.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, MonitorableNoOpRemoteStorageManager.class.getName()); |
| 3754 | + System.err.println("testMonitorableRemoteLogStorageManager " + props.get(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP)); |
| 3755 | + KafkaConfig config = KafkaConfig.fromProps(props); |
| 3756 | + try (RemoteLogManager remoteLogManager = new RemoteLogManager( |
| 3757 | + config.remoteLogManagerConfig(), |
| 3758 | + brokerId, |
| 3759 | + logDir, |
| 3760 | + clusterId, |
| 3761 | + time, |
| 3762 | + tp -> Optional.of(mockLog), |
| 3763 | + (topicPartition, offset) -> { }, |
| 3764 | + brokerTopicStats, |
| 3765 | + metrics) { |
| 3766 | +// public RemoteStorageManager createRemoteStorageManager() { |
| 3767 | +// return remoteStorageManager; |
| 3768 | +// } |
| 3769 | + public RemoteLogMetadataManager createRemoteLogMetadataManager() { |
| 3770 | + return remoteLogMetadataManager; |
| 3771 | + } |
| 3772 | + }) { |
| 3773 | + System.err.println("ZZZ " + ((MonitorableNoOpRemoteStorageManager) remoteStorageManager).pluginMetrics); |
| 3774 | + } catch (IOException e) { |
| 3775 | + throw new RuntimeException(e); |
| 3776 | + } |
| 3777 | + } |
| 3778 | + |
3744 | 3779 | private void appendRecordsToFile(File file, int nRecords, int nRecordsPerBatch) throws IOException {
|
3745 | 3780 | byte magic = RecordBatch.CURRENT_MAGIC_VALUE;
|
3746 | 3781 | Compression compression = Compression.NONE;
|
@@ -3785,4 +3820,22 @@ private void appendRLMConfig(Properties props) {
|
3785 | 3820 | props.put(DEFAULT_REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX + remoteLogMetadataProducerTestProp, remoteLogMetadataProducerTestVal);
|
3786 | 3821 | }
|
3787 | 3822 |
|
| 3823 | + public static class MonitorableNoOpRemoteStorageManager extends NoOpRemoteStorageManager implements Monitorable { |
| 3824 | + public boolean pluginMetrics = false; |
| 3825 | + |
| 3826 | + public MonitorableNoOpRemoteStorageManager() { } |
| 3827 | + |
| 3828 | + @Override |
| 3829 | + public void withPluginMetrics(PluginMetrics metrics) { |
| 3830 | + pluginMetrics = true; |
| 3831 | + } |
| 3832 | + } |
| 3833 | + |
| 3834 | + public static class MonitorableNoOpRemoteLogMetadataManager extends NoOpRemoteLogMetadataManager implements Monitorable { |
| 3835 | + public boolean pluginMetrics = false; |
| 3836 | + @Override |
| 3837 | + public void withPluginMetrics(PluginMetrics metrics) { |
| 3838 | + pluginMetrics = true; |
| 3839 | + } |
| 3840 | + } |
3788 | 3841 | }
|
0 commit comments