Skip to content

Conversation

RaidenE1
Copy link
Contributor

@RaidenE1 RaidenE1 commented Sep 3, 2025

  • Background

    Add new metrics for TasksLost, TasksAssigned and TasksRevoked in StreamsRebalanceListener.

  • Features Implemented

    Added three rebalance latency metrics to DefaultStreamsRebalanceListener:

    • tasks-revoked-latency-avg/max - Average/max latency when revoking tasks
    • tasks-assigned-latency-avg/max - Average/max latency when assigning tasks
    • tasks-lost-latency-avg/max - Average/max latency when losing tasks

    Main Changes

    1. Created RebalanceMetrics.java
    • Follows existing Streams ThreadMetrics pattern with static factory methods
    • Provides three methods to create sensors: tasksRevokedSensor(), tasksAssignedSensor(), tasksLostSensor()
    1. Modified DefaultStreamsRebalanceListener.java
    • Added three Sensor member variables
    • Updated constructor to accept StreamsMetricsImpl and threadId parameters
    • Records execution time in onTasksRevoked(), onTasksAssigned(), and onAllTasksLost() methods
    1. Modified StreamThread.java
    • Pass streamsMetrics and getName() when creating DefaultStreamsRebalanceListener
    1. Added Comprehensive Tests
    • DefaultStreamsRebalanceListenerTest - Added 6 tests to verify metrics recording (both normal and exception cases)
    • RebalanceMetricsTest - Verifies sensor creation logic

@github-actions github-actions bot added triage PRs from the community streams labels Sep 3, 2025
Copy link
Contributor

@Copilot Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR adds latency metrics for rebalance listener callbacks in Kafka Streams to provide better observability of task rebalancing operations. The implementation follows the existing Streams ThreadMetrics pattern and introduces three new metrics for measuring the time taken during task revocation, assignment, and loss operations.

Key changes:

  • Created RebalanceMetrics class with factory methods for creating sensors
  • Modified DefaultStreamsRebalanceListener to record latency metrics during rebalance operations
  • Updated StreamThread to pass required metrics dependencies to the rebalance listener

Reviewed Changes

Copilot reviewed 5 out of 5 changed files in this pull request and generated 2 comments.

Show a summary per file
File Description
RebalanceMetrics.java New metrics class providing static factory methods for creating rebalance latency sensors
DefaultStreamsRebalanceListener.java Enhanced to record latency metrics for tasks revoked, assigned, and lost operations
StreamThread.java Updated constructor calls to pass streamsMetrics and threadId to DefaultStreamsRebalanceListener
DefaultStreamsRebalanceListenerTest.java Added comprehensive tests for metrics recording in both success and exception scenarios
RebalanceMetricsTest.java Added unit tests to verify sensor creation logic and metric configuration

Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.

Comment on lines 95 to 97
public Optional<Exception> onTasksAssigned(final StreamsRebalanceData.Assignment assignment) {
final long start = time.milliseconds();
try {
Copy link
Preview

Copilot AI Sep 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The start time is captured outside the try block, but if an exception occurs during the method execution, the sensor will still record the latency in the finally block. This could lead to recording metrics for failed operations that didn't complete normally, potentially skewing the latency measurements.

Copilot uses AI. Check for mistakes.

Comment on lines 117 to 125
public Optional<Exception> onAllTasksLost() {
final long start = time.milliseconds();
try {
taskManager.handleLostAll();
} catch (final Exception exception) {
return Optional.of(exception);
} finally {
tasksLostSensor.record(time.milliseconds() - start);
}
Copy link
Preview

Copilot AI Sep 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar to onTasksAssigned, this method records latency metrics even when exceptions occur. The finally block will execute regardless of whether the operation completed successfully or failed, which may not provide accurate latency measurements for successful operations.

Copilot uses AI. Check for mistakes.

@github-actions github-actions bot removed the triage PRs from the community label Sep 4, 2025
@lucasbru lucasbru self-assigned this Sep 8, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants