65
65
import org .junit .jupiter .api .BeforeAll ;
66
66
import org .junit .jupiter .api .BeforeEach ;
67
67
import org .junit .jupiter .api .Tag ;
68
+ import org .junit .jupiter .api .Test ;
68
69
import org .junit .jupiter .api .TestInfo ;
69
70
import org .junit .jupiter .api .Timeout ;
70
71
import org .junit .jupiter .params .ParameterizedTest ;
@@ -174,7 +175,7 @@ public void tearDown() throws Exception {
174
175
@ ParameterizedTest
175
176
@ MethodSource ("recordingLevelParameters" )
176
177
public void shouldPushGlobalThreadMetricsToBroker (final String recordingLevel , final String groupProtocol ) throws Exception {
177
- streamsApplicationProperties = props (true , groupProtocol );
178
+ streamsApplicationProperties = props (groupProtocol );
178
179
streamsApplicationProperties .put (StreamsConfig .METRICS_RECORDING_LEVEL_CONFIG , recordingLevel );
179
180
final Topology topology = simpleTopology (true );
180
181
subscribeForStreamsMetrics ();
@@ -213,7 +214,7 @@ public void shouldPushGlobalThreadMetricsToBroker(final String recordingLevel, f
213
214
@ MethodSource ("recordingLevelParameters" )
214
215
public void shouldPushMetricsToBroker (final String recordingLevel , final String groupProtocol ) throws Exception {
215
216
// End-to-end test validating metrics pushed to broker
216
- streamsApplicationProperties = props (true , groupProtocol );
217
+ streamsApplicationProperties = props (groupProtocol );
217
218
streamsApplicationProperties .put (StreamsConfig .METRICS_RECORDING_LEVEL_CONFIG , recordingLevel );
218
219
final Topology topology = simpleTopology (false );
219
220
subscribeForStreamsMetrics ();
@@ -274,10 +275,10 @@ public void shouldPushMetricsToBroker(final String recordingLevel, final String
274
275
}
275
276
276
277
@ ParameterizedTest
277
- @ MethodSource ("singleAndMultiTaskParameters " )
278
- public void shouldPassMetrics (final String topologyType , final boolean stateUpdaterEnabled , final String groupProtocol ) throws Exception {
278
+ @ MethodSource ("topologyComplexityAndRebalanceProtocol " )
279
+ public void shouldPassMetrics (final String topologyType , final String groupProtocol ) throws Exception {
279
280
// Streams metrics should get passed to Admin and Consumer
280
- streamsApplicationProperties = props (stateUpdaterEnabled , groupProtocol );
281
+ streamsApplicationProperties = props (groupProtocol );
281
282
final Topology topology = topologyType .equals ("simple" ) ? simpleTopology (false ) : complexTopology ();
282
283
283
284
try (final KafkaStreams streams = new KafkaStreams (topology , streamsApplicationProperties )) {
@@ -303,16 +304,15 @@ public void shouldPassMetrics(final String topologyType, final boolean stateUpda
303
304
}
304
305
}
305
306
306
- @ ParameterizedTest
307
- @ MethodSource ("multiTaskParameters" )
308
- public void shouldPassCorrectMetricsDynamicInstances (final boolean stateUpdaterEnabled , final String groupProtocol ) throws Exception {
307
+ @ Test
308
+ public void shouldPassCorrectMetricsDynamicInstances () throws Exception {
309
309
// Correct streams metrics should get passed with dynamic membership
310
- streamsApplicationProperties = props (stateUpdaterEnabled , groupProtocol );
310
+ streamsApplicationProperties = props ("classic" );
311
311
streamsApplicationProperties .put (StreamsConfig .STATE_DIR_CONFIG , TestUtils .tempDirectory (appId ).getPath () + "-ks1" );
312
312
streamsApplicationProperties .put (StreamsConfig .CLIENT_ID_CONFIG , appId + "-ks1" );
313
313
314
314
315
- streamsSecondApplicationProperties = props (stateUpdaterEnabled , groupProtocol );
315
+ streamsSecondApplicationProperties = props ("classic" );
316
316
streamsSecondApplicationProperties .put (StreamsConfig .STATE_DIR_CONFIG , TestUtils .tempDirectory (appId ).getPath () + "-ks2" );
317
317
streamsSecondApplicationProperties .put (StreamsConfig .CLIENT_ID_CONFIG , appId + "-ks2" );
318
318
@@ -407,7 +407,7 @@ public void shouldPassCorrectMetricsDynamicInstances(final boolean stateUpdaterE
407
407
@ ValueSource (strings = {"classic" , "streams" })
408
408
public void passedMetricsShouldNotLeakIntoClientMetrics (final String groupProtocol ) throws Exception {
409
409
// Streams metrics should not be visible in client metrics
410
- streamsApplicationProperties = props (true , groupProtocol );
410
+ streamsApplicationProperties = props (groupProtocol );
411
411
final Topology topology = complexTopology ();
412
412
413
413
try (final KafkaStreams streams = new KafkaStreams (topology , streamsApplicationProperties )) {
@@ -444,27 +444,16 @@ private List<String> getTaskIdsAsStrings(final KafkaStreams streams) {
444
444
.toList ();
445
445
}
446
446
447
- private static Stream <Arguments > singleAndMultiTaskParameters () {
448
- return Stream .of (
449
- Arguments .of ("simple" , true , "classic" ),
450
- Arguments .of ("simple" , false , "classic" ),
451
- Arguments .of ("complex" , true , "classic" ),
452
- Arguments .of ("complex" , false , "classic" ),
453
- Arguments .of ("simple" , true , "streams" ),
454
- Arguments .of ("simple" , false , "streams" )
455
- );
456
- }
457
-
458
- private static Stream <Arguments > multiTaskParameters () {
447
+ private static Stream <Arguments > topologyComplexityAndRebalanceProtocol () {
459
448
return Stream .of (
460
- Arguments .of (true , "classic" ),
461
- Arguments .of (false , "classic" )
449
+ Arguments .of ("simple" , "classic" ),
450
+ Arguments .of ("complex" , "classic" ),
451
+ Arguments .of ("simple" , "streams" )
462
452
);
463
453
}
464
454
465
- private Properties props (final boolean stateUpdaterEnabled , final String groupProtocol ) {
455
+ private Properties props (final String groupProtocol ) {
466
456
return props (mkObjectProperties (mkMap (
467
- mkEntry (StreamsConfig .InternalConfig .STATE_UPDATER_ENABLED , stateUpdaterEnabled ),
468
457
mkEntry (StreamsConfig .GROUP_PROTOCOL_CONFIG , groupProtocol )
469
458
)));
470
459
}
0 commit comments