@@ -788,19 +788,8 @@ public void shouldWriteLatestOffsetsToCheckpointOnShutdown(final boolean process
788
788
789
789
@ ParameterizedTest
790
790
@ ValueSource (booleans = {true , false })
791
- public void shouldCheckpointRestoredOffsetsWhenClosingCleanDuringRestoringStateUpdaterEnabled (
791
+ public void shouldCheckpointRestoredOffsetsWhenClosingCleanDuringRestoring (
792
792
final boolean processingThreadsEnabled ) throws Exception {
793
- shouldCheckpointRestoredOffsetsWhenClosingCleanDuringRestoring (processingThreadsEnabled , true );
794
- }
795
-
796
- @ Test
797
- public void shouldCheckpointRestoredOffsetsWhenClosingCleanDuringRestoringStateUpdaterDisabled () throws Exception {
798
- shouldCheckpointRestoredOffsetsWhenClosingCleanDuringRestoring (false , false );
799
- }
800
-
801
- private void shouldCheckpointRestoredOffsetsWhenClosingCleanDuringRestoring (
802
- final boolean processingThreadsEnabled ,
803
- final boolean stateUpdaterEnabled ) throws Exception {
804
793
805
794
final Properties streamsConfiguration = new Properties ();
806
795
streamsConfiguration .put (StreamsConfig .APPLICATION_ID_CONFIG , applicationId );
@@ -812,7 +801,6 @@ private void shouldCheckpointRestoredOffsetsWhenClosingCleanDuringRestoring(
812
801
streamsConfiguration .put (ConsumerConfig .AUTO_OFFSET_RESET_CONFIG , "earliest" );
813
802
streamsConfiguration .put (StreamsConfig .STATE_DIR_CONFIG , TestUtils .tempDirectory (applicationId ).getPath ());
814
803
streamsConfiguration .put (InternalConfig .PROCESSING_THREADS_ENABLED , processingThreadsEnabled );
815
- streamsConfiguration .put (InternalConfig .STATE_UPDATER_ENABLED , stateUpdaterEnabled );
816
804
streamsConfiguration .put (StreamsConfig .restoreConsumerPrefix (ConsumerConfig .MAX_POLL_RECORDS_CONFIG ), 100 );
817
805
final String stateStoreName = "stateStore" ;
818
806
@@ -1223,7 +1211,6 @@ public void process(final Record<Long, Long> record) {
1223
1211
properties .put (StreamsConfig .STATESTORE_CACHE_MAX_BYTES_CONFIG , 0 );
1224
1212
properties .put (StreamsConfig .STATE_DIR_CONFIG , stateTmpDir + appDir );
1225
1213
properties .put (StreamsConfig .APPLICATION_SERVER_CONFIG , dummyHostName + ":2142" );
1226
- properties .put (InternalConfig .STATE_UPDATER_ENABLED , processingThreadsEnabled );
1227
1214
properties .put (InternalConfig .PROCESSING_THREADS_ENABLED , processingThreadsEnabled );
1228
1215
1229
1216
final Properties config = StreamsTestUtils .getStreamsConfig (
0 commit comments