diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStreamsRebalanceListener.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStreamsRebalanceListener.java index dcc4821f2a8c4..f9247bb193242 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStreamsRebalanceListener.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStreamsRebalanceListener.java @@ -19,8 +19,11 @@ import org.apache.kafka.clients.consumer.internals.StreamsRebalanceData; import org.apache.kafka.clients.consumer.internals.StreamsRebalanceListener; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.internals.metrics.RebalanceMetrics; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.slf4j.Logger; @@ -38,17 +41,37 @@ public class DefaultStreamsRebalanceListener implements StreamsRebalanceListener private final StreamsRebalanceData streamsRebalanceData; private final TaskManager taskManager; private final StreamThread streamThread; + private final Sensor tasksRevokedSensor; + private final Sensor tasksAssignedSensor; + private final Sensor tasksLostSensor; + private final Sensor tasksRevokedTotalSensor; + private final Sensor tasksAssignedActiveTotalSensor; + private final Sensor tasksAssignedStandbyTotalSensor; + private final Sensor tasksLostTotalSensor; public DefaultStreamsRebalanceListener(final Logger log, final Time time, final StreamsRebalanceData streamsRebalanceData, final StreamThread streamThread, - final TaskManager taskManager) { + final TaskManager taskManager, + final StreamsMetricsImpl streamsMetrics, + final String threadId) { this.log = log; this.time = time; this.streamsRebalanceData = streamsRebalanceData; this.streamThread = streamThread; this.taskManager = taskManager; + + // Create sensors for rebalance metrics + this.tasksRevokedSensor = RebalanceMetrics.tasksRevokedSensor(threadId, streamsMetrics); + this.tasksAssignedSensor = RebalanceMetrics.tasksAssignedSensor(threadId, streamsMetrics); + this.tasksLostSensor = RebalanceMetrics.tasksLostSensor(threadId, streamsMetrics); + + // Create sensors for task count metrics (DEBUG level) + this.tasksRevokedTotalSensor = RebalanceMetrics.tasksRevokedTotalSensor(threadId, streamsMetrics); + this.tasksAssignedActiveTotalSensor = RebalanceMetrics.tasksAssignedActiveTotalSensor(threadId, streamsMetrics); + this.tasksAssignedStandbyTotalSensor = RebalanceMetrics.tasksAssignedStandbyTotalSensor(threadId, streamsMetrics); + this.tasksLostTotalSensor = RebalanceMetrics.tasksLostTotalSensor(threadId, streamsMetrics); } @Override @@ -65,7 +88,10 @@ public Optional onTasksRevoked(final Set log.info("Revoking active tasks {}.", tasks); taskManager.handleRevocation(partitionsToRevoke); } finally { - log.info("partition revocation took {} ms.", time.milliseconds() - start); + final long latency = time.milliseconds() - start; + tasksRevokedSensor.record(latency); + tasksRevokedTotalSensor.record(tasks.size()); + log.info("partition revocation took {} ms for {} tasks.", latency, tasks.size()); } if (streamThread.state() != StreamThread.State.PENDING_SHUTDOWN) { streamThread.setState(StreamThread.State.PARTITIONS_REVOKED); @@ -78,6 +104,7 @@ public Optional onTasksRevoked(final Set @Override public Optional onTasksAssigned(final StreamsRebalanceData.Assignment assignment) { + final long start = time.milliseconds(); try { final Map> activeTasksWithPartitions = pairWithTopicPartitions(assignment.activeTasks().stream()); @@ -91,16 +118,35 @@ public Optional onTasksAssigned(final StreamsRebalanceData.Assignment taskManager.handleRebalanceComplete(); } catch (final Exception exception) { return Optional.of(exception); + } finally { + final long latency = time.milliseconds() - start; + tasksAssignedSensor.record(latency); + tasksAssignedActiveTotalSensor.record(assignment.activeTasks().size()); + tasksAssignedStandbyTotalSensor.record(assignment.standbyTasks().size() + assignment.warmupTasks().size()); + log.info("task assignment took {} ms for {} active tasks and {} standby tasks.", + latency, assignment.activeTasks().size(), assignment.standbyTasks().size() + assignment.warmupTasks().size()); } return Optional.empty(); } @Override public Optional onAllTasksLost() { + final long start = time.milliseconds(); + // Get current task counts before they are lost + final int activeTaskCount = taskManager.activeTaskIds().size(); + final int standbyTaskCount = taskManager.standbyTaskIds().size(); + final int totalTaskCount = activeTaskCount + standbyTaskCount; + try { taskManager.handleLostAll(); } catch (final Exception exception) { return Optional.of(exception); + } finally { + final long latency = time.milliseconds() - start; + tasksLostSensor.record(latency); + tasksLostTotalSensor.record(totalTaskCount); + log.info("handling lost all tasks took {} ms for {} tasks ({} active, {} standby).", + latency, totalTaskCount, activeTaskCount, standbyTaskCount); } return Optional.empty(); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index d893220621721..7a61f175a68ca 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -1151,7 +1151,9 @@ private void subscribeConsumer() { time, streamsRebalanceData.get(), this, - taskManager + taskManager, + streamsMetrics, + getName() ) ); } else { @@ -1162,7 +1164,9 @@ private void subscribeConsumer() { time, streamsRebalanceData.get(), this, - taskManager + taskManager, + streamsMetrics, + getName() ) ); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/RebalanceMetrics.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/RebalanceMetrics.java new file mode 100644 index 0000000000000..d1f7237bd037e --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/RebalanceMetrics.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.internals.metrics; + +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.metrics.Sensor.RecordingLevel; + +import java.util.Map; + +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.LATENCY_SUFFIX; +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.THREAD_LEVEL_GROUP; +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addAvgAndMaxToSensor; +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addValueMetricToSensor; + +public class RebalanceMetrics { + private RebalanceMetrics() {} + + private static final String TASKS_REVOKED = "tasks-revoked"; + private static final String TASKS_ASSIGNED = "tasks-assigned"; + private static final String TASKS_LOST = "tasks-lost"; + + private static final String TASKS_REVOKED_AVG_LATENCY_DESCRIPTION = "The average time taken for tasks-revoked rebalance listener callback"; + private static final String TASKS_REVOKED_MAX_LATENCY_DESCRIPTION = "The max time taken for tasks-revoked rebalance listener callback"; + private static final String TASKS_ASSIGNED_AVG_LATENCY_DESCRIPTION = "The average time taken for tasks-assigned rebalance listener callback"; + private static final String TASKS_ASSIGNED_MAX_LATENCY_DESCRIPTION = "The max time taken for tasks-assigned rebalance listener callback"; + private static final String TASKS_LOST_AVG_LATENCY_DESCRIPTION = "The average time taken for tasks-lost rebalance listener callback"; + private static final String TASKS_LOST_MAX_LATENCY_DESCRIPTION = "The max time taken for tasks-lost rebalance listener callback"; + + // Task count metric descriptions + private static final String TASKS_REVOKED_TOTAL_DESCRIPTION = "The total number of tasks revoked during rebalance"; + private static final String TASKS_ASSIGNED_ACTIVE_TOTAL_DESCRIPTION = "The total number of active tasks assigned during rebalance"; + private static final String TASKS_ASSIGNED_STANDBY_TOTAL_DESCRIPTION = "The total number of standby tasks assigned during rebalance"; + private static final String TASKS_LOST_TOTAL_DESCRIPTION = "The total number of tasks lost during rebalance"; + + public static Sensor tasksRevokedSensor(final String threadId, + final StreamsMetricsImpl streamsMetrics) { + return rebalanceLatencySensor( + threadId, + TASKS_REVOKED, + TASKS_REVOKED_AVG_LATENCY_DESCRIPTION, + TASKS_REVOKED_MAX_LATENCY_DESCRIPTION, + streamsMetrics + ); + } + + public static Sensor tasksAssignedSensor(final String threadId, + final StreamsMetricsImpl streamsMetrics) { + return rebalanceLatencySensor( + threadId, + TASKS_ASSIGNED, + TASKS_ASSIGNED_AVG_LATENCY_DESCRIPTION, + TASKS_ASSIGNED_MAX_LATENCY_DESCRIPTION, + streamsMetrics + ); + } + + public static Sensor tasksLostSensor(final String threadId, + final StreamsMetricsImpl streamsMetrics) { + return rebalanceLatencySensor( + threadId, + TASKS_LOST, + TASKS_LOST_AVG_LATENCY_DESCRIPTION, + TASKS_LOST_MAX_LATENCY_DESCRIPTION, + streamsMetrics + ); + } + + private static Sensor rebalanceLatencySensor(final String threadId, + final String operation, + final String avgDescription, + final String maxDescription, + final StreamsMetricsImpl streamsMetrics) { + final Sensor sensor = streamsMetrics.threadLevelSensor(threadId, operation + LATENCY_SUFFIX, RecordingLevel.INFO); + final Map tagMap = streamsMetrics.threadLevelTagMap(threadId); + addAvgAndMaxToSensor( + sensor, + THREAD_LEVEL_GROUP, + tagMap, + operation + LATENCY_SUFFIX, + avgDescription, + maxDescription + ); + return sensor; + } + + // Task count metrics (DEBUG level) + public static Sensor tasksRevokedTotalSensor(final String threadId, + final StreamsMetricsImpl streamsMetrics) { + return taskCountSensor( + threadId, + TASKS_REVOKED + "-total", + TASKS_REVOKED_TOTAL_DESCRIPTION, + streamsMetrics + ); + } + + public static Sensor tasksAssignedActiveTotalSensor(final String threadId, + final StreamsMetricsImpl streamsMetrics) { + return taskCountSensor( + threadId, + TASKS_ASSIGNED + "-active-total", + TASKS_ASSIGNED_ACTIVE_TOTAL_DESCRIPTION, + streamsMetrics + ); + } + + public static Sensor tasksAssignedStandbyTotalSensor(final String threadId, + final StreamsMetricsImpl streamsMetrics) { + return taskCountSensor( + threadId, + TASKS_ASSIGNED + "-standby-total", + TASKS_ASSIGNED_STANDBY_TOTAL_DESCRIPTION, + streamsMetrics + ); + } + + public static Sensor tasksLostTotalSensor(final String threadId, + final StreamsMetricsImpl streamsMetrics) { + return taskCountSensor( + threadId, + TASKS_LOST + "-total", + TASKS_LOST_TOTAL_DESCRIPTION, + streamsMetrics + ); + } + + private static Sensor taskCountSensor(final String threadId, + final String metricName, + final String description, + final StreamsMetricsImpl streamsMetrics) { + final Sensor sensor = streamsMetrics.threadLevelSensor(threadId, metricName, RecordingLevel.DEBUG); + final Map tagMap = streamsMetrics.threadLevelTagMap(threadId); + addValueMetricToSensor( + sensor, + THREAD_LEVEL_GROUP, + tagMap, + metricName, + description + ); + return sensor; + } +} \ No newline at end of file diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStreamsRebalanceListenerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStreamsRebalanceListenerTest.java index 66cb8e5185b15..c5318668cc442 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStreamsRebalanceListenerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStreamsRebalanceListenerTest.java @@ -18,13 +18,18 @@ import org.apache.kafka.clients.consumer.internals.StreamsRebalanceData; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.internals.metrics.RebalanceMetrics; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.EnumSource; import org.mockito.InOrder; +import org.mockito.MockedStatic; import org.slf4j.LoggerFactory; import java.util.Map; @@ -35,33 +40,64 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockStatic; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; public class DefaultStreamsRebalanceListenerTest { + private static final String THREAD_ID = "test-thread-id"; private final TaskManager taskManager = mock(TaskManager.class); private final StreamThread streamThread = mock(StreamThread.class); - private DefaultStreamsRebalanceListener defaultStreamsRebalanceListener = new DefaultStreamsRebalanceListener( - LoggerFactory.getLogger(DefaultStreamsRebalanceListener.class), - new MockTime(), - mock(StreamsRebalanceData.class), - streamThread, - taskManager - ); + private final StreamsMetricsImpl streamsMetrics = mock(StreamsMetricsImpl.class); + private final Sensor tasksRevokedSensor = mock(Sensor.class); + private final Sensor tasksAssignedSensor = mock(Sensor.class); + private final Sensor tasksLostSensor = mock(Sensor.class); + private final Sensor tasksRevokedTotalSensor = mock(Sensor.class); + private final Sensor tasksAssignedActiveTotalSensor = mock(Sensor.class); + private final Sensor tasksAssignedStandbyTotalSensor = mock(Sensor.class); + private final Sensor tasksLostTotalSensor = mock(Sensor.class); + private MockTime mockTime; + private DefaultStreamsRebalanceListener defaultStreamsRebalanceListener; + + @BeforeEach + public void setUp() { + mockTime = new MockTime(); + } private void createRebalanceListenerWithRebalanceData(final StreamsRebalanceData streamsRebalanceData) { - defaultStreamsRebalanceListener = new DefaultStreamsRebalanceListener( - LoggerFactory.getLogger(DefaultStreamsRebalanceListener.class), - new MockTime(), - streamsRebalanceData, - streamThread, - taskManager - ); + try (MockedStatic rebalanceMetricsMock = mockStatic(RebalanceMetrics.class)) { + rebalanceMetricsMock.when(() -> RebalanceMetrics.tasksRevokedSensor(anyString(), any(StreamsMetricsImpl.class))) + .thenReturn(tasksRevokedSensor); + rebalanceMetricsMock.when(() -> RebalanceMetrics.tasksAssignedSensor(anyString(), any(StreamsMetricsImpl.class))) + .thenReturn(tasksAssignedSensor); + rebalanceMetricsMock.when(() -> RebalanceMetrics.tasksLostSensor(anyString(), any(StreamsMetricsImpl.class))) + .thenReturn(tasksLostSensor); + rebalanceMetricsMock.when(() -> RebalanceMetrics.tasksRevokedTotalSensor(anyString(), any(StreamsMetricsImpl.class))) + .thenReturn(tasksRevokedTotalSensor); + rebalanceMetricsMock.when(() -> RebalanceMetrics.tasksAssignedActiveTotalSensor(anyString(), any(StreamsMetricsImpl.class))) + .thenReturn(tasksAssignedActiveTotalSensor); + rebalanceMetricsMock.when(() -> RebalanceMetrics.tasksAssignedStandbyTotalSensor(anyString(), any(StreamsMetricsImpl.class))) + .thenReturn(tasksAssignedStandbyTotalSensor); + rebalanceMetricsMock.when(() -> RebalanceMetrics.tasksLostTotalSensor(anyString(), any(StreamsMetricsImpl.class))) + .thenReturn(tasksLostTotalSensor); + + defaultStreamsRebalanceListener = new DefaultStreamsRebalanceListener( + LoggerFactory.getLogger(DefaultStreamsRebalanceListener.class), + mockTime, + streamsRebalanceData, + streamThread, + taskManager, + streamsMetrics, + THREAD_ID + ); + } } @ParameterizedTest @@ -205,4 +241,221 @@ void testOnAllTasksLostWithException() { assertEquals(exception, result.get()); verify(taskManager).handleLostAll(); } + + @Test + void testOnTasksRevokedRecordsMetrics() { + // Mock handleRevocation to simulate time passing + doAnswer(invocation -> { + mockTime.sleep(100); // Simulate task revocation taking 100ms + return null; + }).when(taskManager).handleRevocation(any()); + + createRebalanceListenerWithRebalanceData(new StreamsRebalanceData( + UUID.randomUUID(), + Optional.empty(), + Map.of( + "1", + new StreamsRebalanceData.Subtopology( + Set.of("source1"), + Set.of(), + Map.of("repartition1", new StreamsRebalanceData.TopicInfo(Optional.of(1), Optional.of((short) 1), Map.of())), + Map.of(), + Set.of() + ) + ), + Map.of() + )); + + final Optional result = defaultStreamsRebalanceListener.onTasksRevoked( + Set.of(new StreamsRebalanceData.TaskId("1", 0)) + ); + + assertTrue(result.isEmpty()); + verify(tasksRevokedSensor).record(100L); + verify(tasksRevokedTotalSensor).record(1.0); + verify(taskManager).handleRevocation( + Set.of(new TopicPartition("source1", 0), new TopicPartition("repartition1", 0)) + ); + } + + @Test + void testOnTasksAssignedRecordsMetrics() { + // Mock handleAssignment to simulate time passing + doAnswer(invocation -> { + mockTime.sleep(150); // Simulate task assignment taking 150ms + return null; + }).when(taskManager).handleAssignment(any(), any()); + + createRebalanceListenerWithRebalanceData(new StreamsRebalanceData( + UUID.randomUUID(), + Optional.empty(), + Map.of( + "1", + new StreamsRebalanceData.Subtopology( + Set.of("source1"), + Set.of(), + Map.of("repartition1", new StreamsRebalanceData.TopicInfo(Optional.of(1), Optional.of((short) 1), Map.of())), + Map.of(), + Set.of() + ) + ), + Map.of() + )); + + final Optional result = defaultStreamsRebalanceListener.onTasksAssigned( + new StreamsRebalanceData.Assignment( + Set.of(new StreamsRebalanceData.TaskId("1", 0)), + Set.of(), + Set.of() + ) + ); + + assertTrue(result.isEmpty()); + verify(tasksAssignedSensor).record(150L); + verify(tasksAssignedActiveTotalSensor).record(1.0); + verify(tasksAssignedStandbyTotalSensor).record(0.0); + verify(taskManager).handleAssignment( + Map.of(new TaskId(1, 0), Set.of(new TopicPartition("source1", 0), new TopicPartition("repartition1", 0))), + Map.of() + ); + verify(streamThread).setState(StreamThread.State.PARTITIONS_ASSIGNED); + verify(taskManager).handleRebalanceComplete(); + } + + @Test + void testOnAllTasksLostRecordsMetrics() { + // Mock handleLostAll to simulate time passing + doAnswer(invocation -> { + mockTime.sleep(200); // Simulate task lost handling taking 200ms + return null; + }).when(taskManager).handleLostAll(); + + // Mock task IDs + when(taskManager.activeTaskIds()).thenReturn(Set.of(new TaskId(0, 0), new TaskId(0, 1))); + when(taskManager.standbyTaskIds()).thenReturn(Set.of(new TaskId(1, 0))); + + createRebalanceListenerWithRebalanceData(new StreamsRebalanceData(UUID.randomUUID(), Optional.empty(), Map.of(), Map.of())); + + final Optional result = defaultStreamsRebalanceListener.onAllTasksLost(); + + assertTrue(result.isEmpty()); + verify(tasksLostSensor).record(200L); + verify(tasksLostTotalSensor).record(3.0); // 2 active + 1 standby + verify(taskManager).handleLostAll(); + } + + @Test + void testOnTasksRevokedRecordsMetricsEvenWithException() { + final Exception exception = new RuntimeException("sample exception"); + // Mock handleRevocation to first advance time, then throw exception + doAnswer(invocation -> { + mockTime.sleep(50); // Simulate some work before exception + throw exception; + }).when(taskManager).handleRevocation(any()); + + createRebalanceListenerWithRebalanceData(new StreamsRebalanceData( + UUID.randomUUID(), + Optional.empty(), + Map.of( + "1", + new StreamsRebalanceData.Subtopology( + Set.of("source1"), + Set.of(), + Map.of(), + Map.of(), + Set.of() + ) + ), + Map.of() + )); + + final Optional result = defaultStreamsRebalanceListener.onTasksRevoked( + Set.of(new StreamsRebalanceData.TaskId("1", 0)) + ); + + assertTrue(result.isPresent()); + verify(tasksRevokedSensor).record(50L); + verify(tasksRevokedTotalSensor).record(1.0); + verify(taskManager).handleRevocation(any()); + } + + @Test + void testOnTasksAssignedRecordsMetricsEvenWithException() { + final Exception exception = new RuntimeException("sample exception"); + // Mock handleAssignment to first advance time, then throw exception + doAnswer(invocation -> { + mockTime.sleep(75); // Simulate some work before exception + throw exception; + }).when(taskManager).handleAssignment(any(), any()); + + createRebalanceListenerWithRebalanceData(new StreamsRebalanceData(UUID.randomUUID(), Optional.empty(), Map.of(), Map.of())); + + final Optional result = defaultStreamsRebalanceListener.onTasksAssigned( + new StreamsRebalanceData.Assignment( + Set.of(new StreamsRebalanceData.TaskId("1", 0)), + Set.of(), + Set.of() + ) + ); + + assertTrue(result.isPresent()); + verify(tasksAssignedSensor).record(75L); + verify(tasksAssignedActiveTotalSensor).record(1.0); + verify(tasksAssignedStandbyTotalSensor).record(0.0); + verify(taskManager).handleAssignment(any(), any()); + } + + @Test + void testOnAllTasksLostRecordsMetricsEvenWithException() { + final Exception exception = new RuntimeException("sample exception"); + // Mock handleLostAll to first advance time, then throw exception + doAnswer(invocation -> { + mockTime.sleep(125); // Simulate some work before exception + throw exception; + }).when(taskManager).handleLostAll(); + + // Mock task IDs + when(taskManager.activeTaskIds()).thenReturn(Set.of(new TaskId(0, 0))); + when(taskManager.standbyTaskIds()).thenReturn(Set.of()); + + createRebalanceListenerWithRebalanceData(new StreamsRebalanceData(UUID.randomUUID(), Optional.empty(), Map.of(), Map.of())); + + final Optional result = defaultStreamsRebalanceListener.onAllTasksLost(); + + assertTrue(result.isPresent()); + verify(tasksLostSensor).record(125L); + verify(tasksLostTotalSensor).record(1.0); + verify(taskManager).handleLostAll(); + } + + @Test + void testOnTasksAssignedRecordsMetricsWithStandbyTasks() { + createRebalanceListenerWithRebalanceData(new StreamsRebalanceData( + UUID.randomUUID(), + Optional.empty(), + Map.of( + "1", + new StreamsRebalanceData.Subtopology( + Set.of("source1"), + Set.of(), + Map.of(), + Map.of(), + Set.of() + ) + ), + Map.of() + )); + + final Optional result = defaultStreamsRebalanceListener.onTasksAssigned( + new StreamsRebalanceData.Assignment( + Set.of(new StreamsRebalanceData.TaskId("1", 0), new StreamsRebalanceData.TaskId("1", 1)), + Set.of(new StreamsRebalanceData.TaskId("1", 2)), + Set.of(new StreamsRebalanceData.TaskId("1", 3)) + ) + ); + + assertTrue(result.isEmpty()); + verify(tasksAssignedActiveTotalSensor).record(2.0); + verify(tasksAssignedStandbyTotalSensor).record(2.0); // 1 standby + 1 warmup + } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/RebalanceMetricsTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/RebalanceMetricsTest.java new file mode 100644 index 0000000000000..2fa75db1434e5 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/RebalanceMetricsTest.java @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.internals.metrics; + +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.metrics.Sensor.RecordingLevel; + +import org.junit.jupiter.api.Test; +import org.mockito.MockedStatic; + +import java.util.Map; + +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.LATENCY_SUFFIX; +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.THREAD_LEVEL_GROUP; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class RebalanceMetricsTest { + + private static final String THREAD_ID = "test-thread"; + private final StreamsMetricsImpl streamsMetrics = mock(StreamsMetricsImpl.class); + private final Sensor expectedSensor = mock(Sensor.class); + private final Map tagMap = Map.of("thread-id", THREAD_ID); + + @Test + public void shouldGetTasksRevokedSensor() { + when(streamsMetrics.threadLevelSensor(THREAD_ID, "tasks-revoked" + LATENCY_SUFFIX, RecordingLevel.INFO)) + .thenReturn(expectedSensor); + when(streamsMetrics.threadLevelTagMap(THREAD_ID)).thenReturn(tagMap); + + try (MockedStatic streamsMetricsStatic = mockStatic(StreamsMetricsImpl.class)) { + final Sensor sensor = RebalanceMetrics.tasksRevokedSensor(THREAD_ID, streamsMetrics); + + assertEquals(expectedSensor, sensor); + verify(streamsMetrics).threadLevelSensor(THREAD_ID, "tasks-revoked" + LATENCY_SUFFIX, RecordingLevel.INFO); + verify(streamsMetrics).threadLevelTagMap(THREAD_ID); + + streamsMetricsStatic.verify(() -> StreamsMetricsImpl.addAvgAndMaxToSensor( + expectedSensor, + THREAD_LEVEL_GROUP, + tagMap, + "tasks-revoked" + LATENCY_SUFFIX, + "The average time taken for tasks-revoked rebalance listener callback", + "The max time taken for tasks-revoked rebalance listener callback" + )); + } + } + + @Test + public void shouldGetTasksAssignedSensor() { + when(streamsMetrics.threadLevelSensor(THREAD_ID, "tasks-assigned" + LATENCY_SUFFIX, RecordingLevel.INFO)) + .thenReturn(expectedSensor); + when(streamsMetrics.threadLevelTagMap(THREAD_ID)).thenReturn(tagMap); + + try (MockedStatic streamsMetricsStatic = mockStatic(StreamsMetricsImpl.class)) { + final Sensor sensor = RebalanceMetrics.tasksAssignedSensor(THREAD_ID, streamsMetrics); + + assertEquals(expectedSensor, sensor); + verify(streamsMetrics).threadLevelSensor(THREAD_ID, "tasks-assigned" + LATENCY_SUFFIX, RecordingLevel.INFO); + verify(streamsMetrics).threadLevelTagMap(THREAD_ID); + + streamsMetricsStatic.verify(() -> StreamsMetricsImpl.addAvgAndMaxToSensor( + expectedSensor, + THREAD_LEVEL_GROUP, + tagMap, + "tasks-assigned" + LATENCY_SUFFIX, + "The average time taken for tasks-assigned rebalance listener callback", + "The max time taken for tasks-assigned rebalance listener callback" + )); + } + } + + @Test + public void shouldGetTasksLostSensor() { + when(streamsMetrics.threadLevelSensor(THREAD_ID, "tasks-lost" + LATENCY_SUFFIX, RecordingLevel.INFO)) + .thenReturn(expectedSensor); + when(streamsMetrics.threadLevelTagMap(THREAD_ID)).thenReturn(tagMap); + + try (MockedStatic streamsMetricsStatic = mockStatic(StreamsMetricsImpl.class)) { + final Sensor sensor = RebalanceMetrics.tasksLostSensor(THREAD_ID, streamsMetrics); + + assertEquals(expectedSensor, sensor); + verify(streamsMetrics).threadLevelSensor(THREAD_ID, "tasks-lost" + LATENCY_SUFFIX, RecordingLevel.INFO); + verify(streamsMetrics).threadLevelTagMap(THREAD_ID); + + streamsMetricsStatic.verify(() -> StreamsMetricsImpl.addAvgAndMaxToSensor( + expectedSensor, + THREAD_LEVEL_GROUP, + tagMap, + "tasks-lost" + LATENCY_SUFFIX, + "The average time taken for tasks-lost rebalance listener callback", + "The max time taken for tasks-lost rebalance listener callback" + )); + } + } + + @Test + public void shouldGetTasksRevokedTotalSensor() { + when(streamsMetrics.threadLevelSensor(THREAD_ID, "tasks-revoked-total", RecordingLevel.DEBUG)) + .thenReturn(expectedSensor); + when(streamsMetrics.threadLevelTagMap(THREAD_ID)).thenReturn(tagMap); + + try (MockedStatic streamsMetricsStatic = mockStatic(StreamsMetricsImpl.class)) { + final Sensor sensor = RebalanceMetrics.tasksRevokedTotalSensor(THREAD_ID, streamsMetrics); + + assertEquals(expectedSensor, sensor); + verify(streamsMetrics).threadLevelSensor(THREAD_ID, "tasks-revoked-total", RecordingLevel.DEBUG); + verify(streamsMetrics).threadLevelTagMap(THREAD_ID); + + streamsMetricsStatic.verify(() -> StreamsMetricsImpl.addValueMetricToSensor( + expectedSensor, + THREAD_LEVEL_GROUP, + tagMap, + "tasks-revoked-total", + "The total number of tasks revoked during rebalance" + )); + } + } + + @Test + public void shouldGetTasksAssignedActiveTotalSensor() { + when(streamsMetrics.threadLevelSensor(THREAD_ID, "tasks-assigned-active-total", RecordingLevel.DEBUG)) + .thenReturn(expectedSensor); + when(streamsMetrics.threadLevelTagMap(THREAD_ID)).thenReturn(tagMap); + + try (MockedStatic streamsMetricsStatic = mockStatic(StreamsMetricsImpl.class)) { + final Sensor sensor = RebalanceMetrics.tasksAssignedActiveTotalSensor(THREAD_ID, streamsMetrics); + + assertEquals(expectedSensor, sensor); + verify(streamsMetrics).threadLevelSensor(THREAD_ID, "tasks-assigned-active-total", RecordingLevel.DEBUG); + verify(streamsMetrics).threadLevelTagMap(THREAD_ID); + + streamsMetricsStatic.verify(() -> StreamsMetricsImpl.addValueMetricToSensor( + expectedSensor, + THREAD_LEVEL_GROUP, + tagMap, + "tasks-assigned-active-total", + "The total number of active tasks assigned during rebalance" + )); + } + } + + @Test + public void shouldGetTasksAssignedStandbyTotalSensor() { + when(streamsMetrics.threadLevelSensor(THREAD_ID, "tasks-assigned-standby-total", RecordingLevel.DEBUG)) + .thenReturn(expectedSensor); + when(streamsMetrics.threadLevelTagMap(THREAD_ID)).thenReturn(tagMap); + + try (MockedStatic streamsMetricsStatic = mockStatic(StreamsMetricsImpl.class)) { + final Sensor sensor = RebalanceMetrics.tasksAssignedStandbyTotalSensor(THREAD_ID, streamsMetrics); + + assertEquals(expectedSensor, sensor); + verify(streamsMetrics).threadLevelSensor(THREAD_ID, "tasks-assigned-standby-total", RecordingLevel.DEBUG); + verify(streamsMetrics).threadLevelTagMap(THREAD_ID); + + streamsMetricsStatic.verify(() -> StreamsMetricsImpl.addValueMetricToSensor( + expectedSensor, + THREAD_LEVEL_GROUP, + tagMap, + "tasks-assigned-standby-total", + "The total number of standby tasks assigned during rebalance" + )); + } + } + + @Test + public void shouldGetTasksLostTotalSensor() { + when(streamsMetrics.threadLevelSensor(THREAD_ID, "tasks-lost-total", RecordingLevel.DEBUG)) + .thenReturn(expectedSensor); + when(streamsMetrics.threadLevelTagMap(THREAD_ID)).thenReturn(tagMap); + + try (MockedStatic streamsMetricsStatic = mockStatic(StreamsMetricsImpl.class)) { + final Sensor sensor = RebalanceMetrics.tasksLostTotalSensor(THREAD_ID, streamsMetrics); + + assertEquals(expectedSensor, sensor); + verify(streamsMetrics).threadLevelSensor(THREAD_ID, "tasks-lost-total", RecordingLevel.DEBUG); + verify(streamsMetrics).threadLevelTagMap(THREAD_ID); + + streamsMetricsStatic.verify(() -> StreamsMetricsImpl.addValueMetricToSensor( + expectedSensor, + THREAD_LEVEL_GROUP, + tagMap, + "tasks-lost-total", + "The total number of tasks lost during rebalance" + )); + } + } +} \ No newline at end of file