diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java index 448853c63670b..24ff8fabf0ccf 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java @@ -52,7 +52,6 @@ class ActiveTaskCreator { private final StreamsConfig applicationConfig; private final StreamsMetricsImpl streamsMetrics; private final StateDirectory stateDirectory; - private final ChangelogReader storeChangelogReader; private final ThreadCache cache; private final Time time; private final KafkaClientSupplier clientSupplier; @@ -62,7 +61,6 @@ class ActiveTaskCreator { private final Logger log; private final Sensor createTaskSensor; private final StreamsProducer streamsProducer; - private final boolean stateUpdaterEnabled; private final boolean processingThreadsEnabled; private boolean isClosed = false; @@ -70,7 +68,6 @@ class ActiveTaskCreator { final StreamsConfig applicationConfig, final StreamsMetricsImpl streamsMetrics, final StateDirectory stateDirectory, - final ChangelogReader storeChangelogReader, final ThreadCache cache, final Time time, final KafkaClientSupplier clientSupplier, @@ -78,13 +75,11 @@ class ActiveTaskCreator { final int threadIdx, final UUID processId, final LogContext logContext, - final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { this.topologyMetadata = topologyMetadata; this.applicationConfig = applicationConfig; this.streamsMetrics = streamsMetrics; this.stateDirectory = stateDirectory; - this.storeChangelogReader = storeChangelogReader; this.cache = cache; this.time = time; this.clientSupplier = clientSupplier; @@ -92,7 +87,6 @@ class ActiveTaskCreator { this.threadIdx = threadIdx; this.processId = processId; this.log = logContext.logger(getClass()); - this.stateUpdaterEnabled = stateUpdaterEnabled; this.processingThreadsEnabled = processingThreadsEnabled; createTaskSensor = ThreadMetrics.createTaskSensor(threadId, streamsMetrics); @@ -154,10 +148,8 @@ public Collection createTasks(final Consumer consumer, eosEnabled(applicationConfig), logContext, stateDirectory, - storeChangelogReader, topology.storeToChangelogTopic(), - partitions, - stateUpdaterEnabled); + partitions); final InternalProcessorContext context = new ProcessorContextImpl( taskId, diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java index 3506845d288af..5f68ff4e06741 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java @@ -40,7 +40,6 @@ import java.io.File; import java.io.IOException; -import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -170,7 +169,6 @@ public String toString() { private final TaskId taskId; private final boolean eosEnabled; - private ChangelogRegister changelogReader; private final Collection sourcePartitions; private final Map storeToChangelogTopic; @@ -180,7 +178,6 @@ public String toString() { private final File baseDir; private final OffsetCheckpoint checkpointFile; - private final boolean stateUpdaterEnabled; private TaskType taskType; private Logger log; @@ -202,19 +199,15 @@ public ProcessorStateManager(final TaskId taskId, final boolean eosEnabled, final LogContext logContext, final StateDirectory stateDirectory, - final ChangelogRegister changelogReader, final Map storeToChangelogTopic, - final Collection sourcePartitions, - final boolean stateUpdaterEnabled) throws ProcessorStateException { + final Collection sourcePartitions) throws ProcessorStateException { this.storeToChangelogTopic = storeToChangelogTopic; this.log = logContext.logger(ProcessorStateManager.class); this.logPrefix = logContext.logPrefix(); this.taskId = taskId; this.taskType = taskType; this.eosEnabled = eosEnabled; - this.changelogReader = changelogReader; this.sourcePartitions = sourcePartitions; - this.stateUpdaterEnabled = stateUpdaterEnabled; this.baseDir = stateDirectory.getOrCreateDirectoryForTask(taskId); this.checkpointFile = new OffsetCheckpoint(stateDirectory.checkpointFileFor(taskId)); @@ -225,16 +218,15 @@ public ProcessorStateManager(final TaskId taskId, /** * Special constructor used by {@link StateDirectory} to partially initialize startup tasks for local state, before * they're assigned to a thread. When the task is assigned to a thread, the initialization of this StateManager is - * completed in {@link #assignToStreamThread(LogContext, ChangelogRegister, Collection)}. + * completed in {@link #assignToStreamThread(LogContext, Collection)}. */ static ProcessorStateManager createStartupTaskStateManager(final TaskId taskId, final boolean eosEnabled, final LogContext logContext, final StateDirectory stateDirectory, final Map storeToChangelogTopic, - final Set sourcePartitions, - final boolean stateUpdaterEnabled) { - return new ProcessorStateManager(taskId, TaskType.STANDBY, eosEnabled, logContext, stateDirectory, null, storeToChangelogTopic, sourcePartitions, stateUpdaterEnabled); + final Set sourcePartitions) { + return new ProcessorStateManager(taskId, TaskType.STANDBY, eosEnabled, logContext, stateDirectory, storeToChangelogTopic, sourcePartitions); } /** @@ -243,26 +235,17 @@ static ProcessorStateManager createStartupTaskStateManager(final TaskId taskId, * assigned StreamThread's context. */ void assignToStreamThread(final LogContext logContext, - final ChangelogRegister changelogReader, final Collection sourcePartitions) { - if (this.changelogReader != null) { - throw new IllegalStateException("Attempted to replace an existing changelogReader on a StateManager without closing it."); - } this.sourcePartitions.clear(); this.log = logContext.logger(ProcessorStateManager.class); this.logPrefix = logContext.logPrefix(); - this.changelogReader = changelogReader; this.sourcePartitions.addAll(sourcePartitions); } void registerStateStores(final List allStores, final InternalProcessorContext processorContext) { processorContext.uninitialize(); for (final StateStore store : allStores) { - if (stores.containsKey(store.name())) { - if (!stateUpdaterEnabled) { - maybeRegisterStoreWithChangelogReader(store.name()); - } - } else { + if (!stores.containsKey(store.name())) { store.init(processorContext, store); } log.trace("Registered state store {}", store.name()); @@ -346,22 +329,6 @@ void initializeStoreOffsetsFromCheckpoint(final boolean storeDirIsEmpty) { } } - private void maybeRegisterStoreWithChangelogReader(final String storeName) { - if (isLoggingEnabled(storeName) && changelogReader != null) { - changelogReader.register(getStorePartition(storeName), this); - } - } - - private List getAllChangelogTopicPartitions() { - final List allChangelogPartitions = new ArrayList<>(); - for (final StateStoreMetadata storeMetadata : stores.values()) { - if (storeMetadata.changelogPartition != null) { - allChangelogPartitions.add(storeMetadata.changelogPartition); - } - } - return allChangelogPartitions; - } - @Override public File baseDir() { return baseDir; @@ -404,10 +371,6 @@ public void registerStore(final StateStore store, // on the state manager this state store would be closed as well stores.put(storeName, storeMetadata); - if (!stateUpdaterEnabled) { - maybeRegisterStoreWithChangelogReader(storeName); - } - log.debug("Registered state store {} to its state manager", storeName); } @@ -616,10 +579,6 @@ public void flushCache() { public void close() throws ProcessorStateException { log.debug("Closing its state manager and all the registered state stores: {}", stores); - if (!stateUpdaterEnabled && changelogReader != null) { - changelogReader.unregister(getAllChangelogTopicPartitions()); - } - RuntimeException firstException = null; // attempting to close the stores, just in case they // are not closed by a ProcessorNode yet @@ -664,11 +623,6 @@ else if (exception instanceof StreamsException) void recycle() { log.debug("Recycling state for {} task {}.", taskType, taskId); - if (!stateUpdaterEnabled && changelogReader != null) { - final List allChangelogs = getAllChangelogTopicPartitions(); - changelogReader.unregister(allChangelogs); - } - // when the state manager is recycled to be used, future writes may bypass its store's caching // layer if they are from restoration, hence we need to clear the state store's caches just in case // See KAFKA-14172 for details diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java index eb8bcafea695a..693cb4ed63ad8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java @@ -40,27 +40,21 @@ class StandbyTaskCreator { private final StreamsConfig applicationConfig; private final StreamsMetricsImpl streamsMetrics; private final StateDirectory stateDirectory; - private final ChangelogReader storeChangelogReader; private final ThreadCache dummyCache; private final Logger log; private final Sensor createTaskSensor; - private final boolean stateUpdaterEnabled; StandbyTaskCreator(final TopologyMetadata topologyMetadata, final StreamsConfig applicationConfig, final StreamsMetricsImpl streamsMetrics, final StateDirectory stateDirectory, - final ChangelogReader storeChangelogReader, final String threadId, - final LogContext logContext, - final boolean stateUpdaterEnabled) { + final LogContext logContext) { this.topologyMetadata = topologyMetadata; this.applicationConfig = applicationConfig; this.streamsMetrics = streamsMetrics; this.stateDirectory = stateDirectory; - this.storeChangelogReader = storeChangelogReader; this.log = logContext.logger(getClass()); - this.stateUpdaterEnabled = stateUpdaterEnabled; createTaskSensor = ThreadMetrics.createTaskSensor(threadId, streamsMetrics); @@ -87,10 +81,8 @@ Collection createTasks(final Map> tasksToBeCre eosEnabled(applicationConfig), getLogContext(taskId), stateDirectory, - storeChangelogReader, topology.storeToChangelogTopic(), - partitions, - stateUpdaterEnabled); + partitions); final InternalProcessorContext context = new ProcessorContextImpl( taskId, diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java index a95d20ddae0a1..28c13765ad068 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java @@ -204,7 +204,6 @@ public void initializeStartupTasks(final TopologyMetadata topologyMetadata, if (hasPersistentStores && !nonEmptyTaskDirectories.isEmpty()) { final ThreadCache dummyCache = new ThreadCache(logContext, 0, streamsMetrics); final boolean eosEnabled = StreamsConfigUtils.eosEnabled(config); - final boolean stateUpdaterEnabled = StreamsConfig.InternalConfig.stateUpdaterEnabled(config.originals()); // discover all non-empty task directories in StateDirectory for (final TaskDirectory taskDirectory : nonEmptyTaskDirectories) { @@ -226,8 +225,7 @@ public void initializeStartupTasks(final TopologyMetadata topologyMetadata, logContext, this, subTopology.storeToChangelogTopic(), - inputPartitions, - stateUpdaterEnabled + inputPartitions ); final InternalProcessorContext context = new ProcessorContextImpl( diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index d893220621721..61a6abb00e988 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -366,7 +366,6 @@ public boolean isStartingRunningOrPartitionAssigned() { private final AtomicBoolean leaveGroupRequested = new AtomicBoolean(false); private final AtomicLong lastShutdownWarningTimestamp = new AtomicLong(0L); private final boolean eosEnabled; - private final boolean stateUpdaterEnabled; private final boolean processingThreadsEnabled; private volatile long fetchDeadlineClientInstanceId = -1; @@ -393,15 +392,12 @@ public static StreamThread create(final TopologyMetadata topologyMetadata, final Runnable shutdownErrorHook, final BiConsumer streamsUncaughtExceptionHandler) { - final boolean stateUpdaterEnabled = InternalConfig.stateUpdaterEnabled(config.originals()); - final String threadId = clientId + THREAD_ID_SUBSTRING + threadIdx; final String stateUpdaterId = threadId.replace(THREAD_ID_SUBSTRING, STATE_UPDATER_ID_SUBSTRING); - final String restorationThreadId = stateUpdaterEnabled ? stateUpdaterId : threadId; final String logPrefix = String.format("stream-thread [%s] ", threadId); final LogContext logContext = new LogContext(logPrefix); - final LogContext restorationLogContext = stateUpdaterEnabled ? new LogContext(String.format("state-updater [%s] ", restorationThreadId)) : logContext; + final LogContext restorationLogContext = new LogContext(String.format("state-updater [%s] ", stateUpdaterId)); final Logger log = LoggerFactory.getLogger(StreamThread.class); final ReferenceContainer referenceContainer = new ReferenceContainer(); @@ -411,7 +407,7 @@ public static StreamThread create(final TopologyMetadata topologyMetadata, referenceContainer.clientTags = config.getClientTags(); log.info("Creating restore consumer client for thread {}", threadId); - final Map restoreConsumerConfigs = config.getRestoreConsumerConfigs(restoreConsumerClientId(restorationThreadId)); + final Map restoreConsumerConfigs = config.getRestoreConsumerConfigs(restoreConsumerClientId(stateUpdaterId)); final Consumer restoreConsumer = clientSupplier.getRestoreConsumer(restoreConsumerConfigs); final StoreChangelogReader changelogReader = new StoreChangelogReader( @@ -432,7 +428,6 @@ public static StreamThread create(final TopologyMetadata topologyMetadata, config, streamsMetrics, stateDirectory, - changelogReader, cache, time, clientSupplier, @@ -440,7 +435,6 @@ public static StreamThread create(final TopologyMetadata topologyMetadata, threadIdx, processId, logContext, - stateUpdaterEnabled, proceessingThreadsEnabled ); final StandbyTaskCreator standbyTaskCreator = new StandbyTaskCreator( @@ -448,20 +442,17 @@ public static StreamThread create(final TopologyMetadata topologyMetadata, config, streamsMetrics, stateDirectory, - changelogReader, threadId, - logContext, - stateUpdaterEnabled); + logContext); final Tasks tasks = new Tasks(logContext); final boolean processingThreadsEnabled = InternalConfig.processingThreadsEnabled(config.originals()); final DefaultTaskManager schedulingTaskManager = - maybeCreateSchedulingTaskManager(processingThreadsEnabled, stateUpdaterEnabled, topologyMetadata, time, threadId, tasks); + maybeCreateSchedulingTaskManager(processingThreadsEnabled, topologyMetadata, time, threadId, tasks); final StateUpdater stateUpdater = - maybeCreateStateUpdater( - stateUpdaterEnabled, + createStateUpdater( streamsMetrics, config, restoreConsumer, @@ -616,16 +607,11 @@ public MainConsumerSetup(final Consumer mainConsumer, } private static DefaultTaskManager maybeCreateSchedulingTaskManager(final boolean processingThreadsEnabled, - final boolean stateUpdaterEnabled, final TopologyMetadata topologyMetadata, final Time time, final String threadId, final Tasks tasks) { if (processingThreadsEnabled) { - if (!stateUpdaterEnabled) { - throw new IllegalStateException("Processing threads require the state updater to be enabled"); - } - final DefaultTaskManager defaultTaskManager = new DefaultTaskManager( time, threadId, @@ -640,8 +626,7 @@ private static DefaultTaskManager maybeCreateSchedulingTaskManager(final boolean return null; } - private static StateUpdater maybeCreateStateUpdater(final boolean stateUpdaterEnabled, - final StreamsMetricsImpl streamsMetrics, + private static StateUpdater createStateUpdater(final StreamsMetricsImpl streamsMetrics, final StreamsConfig streamsConfig, final Consumer restoreConsumer, final ChangelogReader changelogReader, @@ -649,20 +634,16 @@ private static StateUpdater maybeCreateStateUpdater(final boolean stateUpdaterEn final Time time, final String clientId, final int threadIdx) { - if (stateUpdaterEnabled) { - final String name = clientId + STATE_UPDATER_ID_SUBSTRING + threadIdx; - return new DefaultStateUpdater( - name, - streamsMetrics, - streamsConfig, - restoreConsumer, - changelogReader, - topologyMetadata, - time - ); - } else { - return null; - } + final String name = clientId + STATE_UPDATER_ID_SUBSTRING + threadIdx; + return new DefaultStateUpdater( + name, + streamsMetrics, + streamsConfig, + restoreConsumer, + changelogReader, + topologyMetadata, + time + ); } private static Optional parseHostInfo(final String endpoint) { @@ -858,7 +839,6 @@ public StreamThread(final Time time, this.numIterations = 1; this.eosEnabled = eosEnabled(config); - this.stateUpdaterEnabled = InternalConfig.stateUpdaterEnabled(config.originals()); this.processingThreadsEnabled = InternalConfig.processingThreadsEnabled(config.originals()); this.logSummaryIntervalMs = config.getLong(StreamsConfig.LOG_SUMMARY_INTERVAL_MS_CONFIG); @@ -888,9 +868,7 @@ public void run() { } boolean cleanRun = false; try { - if (stateUpdaterEnabled) { - taskManager.init(); - } + taskManager.init(); cleanRun = runLoop(); } catch (final Throwable e) { failedStreamThreadSensor.record(); @@ -1007,27 +985,6 @@ void maybeGetClientInstanceIds() { } } - - if (!stateUpdaterEnabled && !restoreConsumerInstanceIdFuture.isDone()) { - if (fetchDeadlineClientInstanceId >= time.milliseconds()) { - try { - restoreConsumerInstanceIdFuture.complete(restoreConsumer.clientInstanceId(Duration.ZERO)); - } catch (final IllegalStateException disabledError) { - // if telemetry is disabled on a client, we swallow the error, - // to allow returning a partial result for all other clients - restoreConsumerInstanceIdFuture.complete(null); - } catch (final TimeoutException swallow) { - // swallow - } catch (final Exception error) { - restoreConsumerInstanceIdFuture.completeExceptionally(error); - } - } else { - restoreConsumerInstanceIdFuture.completeExceptionally( - new TimeoutException("Could not retrieve restore consumer client instance id.") - ); - } - } - if (!producerInstanceIdFuture.isDone()) { if (fetchDeadlineClientInstanceId >= time.milliseconds()) { try { @@ -1049,13 +1006,6 @@ void maybeGetClientInstanceIds() { ); } } - - if (mainConsumerInstanceIdFuture.isDone() - && (!stateUpdaterEnabled && restoreConsumerInstanceIdFuture.isDone()) - && producerInstanceIdFuture.isDone()) { - - fetchDeadlineClientInstanceId = -1L; - } } } @@ -1215,10 +1165,6 @@ void runOnceWithoutProcessingThreads() { return; } - if (!stateUpdaterEnabled) { - initializeAndRestorePhase(); - } - // TODO: we should record the restore latency and its relative time spent ratio after // we figure out how to move this method out of the stream thread advanceNowAndComputeLatency(); @@ -1228,7 +1174,7 @@ void runOnceWithoutProcessingThreads() { long totalProcessLatency = 0L; long totalPunctuateLatency = 0L; if (state == State.RUNNING - || (stateUpdaterEnabled && isStartingRunningOrPartitionAssigned())) { + || isStartingRunningOrPartitionAssigned()) { taskManager.updateLags(); @@ -1243,9 +1189,7 @@ void runOnceWithoutProcessingThreads() { */ do { - if (stateUpdaterEnabled) { - checkStateUpdater(); - } + checkStateUpdater(); log.debug("Processing tasks with {} iterations.", numIterations); final int processed = taskManager.process(numIterations, time); @@ -1464,15 +1408,11 @@ private long pollPhase() { final ConsumerRecords records; log.debug("Invoking poll on main Consumer"); - if (state == State.PARTITIONS_ASSIGNED && !stateUpdaterEnabled) { - // try to fetch some records with zero poll millis - // to unblock the restoration as soon as possible - records = pollRequests(Duration.ZERO); - } else if (state == State.PARTITIONS_REVOKED) { + if (state == State.PARTITIONS_REVOKED) { // try to fetch some records with zero poll millis to unblock // other useful work while waiting for the join response records = pollRequests(Duration.ZERO); - } else if (state == State.RUNNING || state == State.STARTING || (state == State.PARTITIONS_ASSIGNED && stateUpdaterEnabled)) { + } else if (state == State.RUNNING || state == State.STARTING || state == State.PARTITIONS_ASSIGNED) { // try to fetch some records with normal poll time // in order to get long polling records = pollRequests(pollTime); @@ -2074,18 +2014,8 @@ public Map> clientInstanceIds(final Duration timeout) } result.put(getName() + "-consumer", mainConsumerInstanceIdFuture); - if (stateUpdaterEnabled) { - restoreConsumerInstanceIdFuture = stateUpdater.restoreConsumerInstanceId(timeout); - } else { - if (restoreConsumerInstanceIdFuture.isDone()) { - if (restoreConsumerInstanceIdFuture.isCompletedExceptionally()) { - restoreConsumerInstanceIdFuture = new KafkaFutureImpl<>(); - setDeadline = true; - } - } else { - setDeadline = true; - } - } + restoreConsumerInstanceIdFuture = stateUpdater.restoreConsumerInstanceId(timeout); + result.put(getName() + "-restore-consumer", restoreConsumerInstanceIdFuture); if (producerInstanceIdFuture.isDone()) { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index 67d009b037f78..adfc7c1718f64 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -345,7 +345,7 @@ private Map> assignStartupTasks(final Map inputPartitions = entry.getValue(); - task.stateManager().assignToStreamThread(new LogContext(threadLogPrefix), changelogReader, inputPartitions); + task.stateManager().assignToStreamThread(new LogContext(threadLogPrefix), inputPartitions); updateInputPartitionsOfStandbyTaskIfTheyChanged(task, inputPartitions); assignedTasks.put(task, inputPartitions); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java index 362c32592ca8e..d6670e8032154 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java @@ -68,8 +68,6 @@ public class ActiveTaskCreatorTest { private InternalTopologyBuilder builder; @Mock private StateDirectory stateDirectory; - @Mock - private ChangelogReader changeLogReader; private final MockClientSupplier mockClientSupplier = new MockClientSupplier(); private final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(new Metrics(), "clientId", "processId", new MockTime()); @@ -272,7 +270,6 @@ private void createTasks() { config, streamsMetrics, stateDirectory, - changeLogReader, new ThreadCache(new LogContext(), 0L, streamsMetrics), new MockTime(), mockClientSupplier, @@ -280,7 +277,6 @@ private void createTasks() { 0, uuid, new LogContext(), - false, false); assertThat( diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java index 8c28ae6a33dcb..6d8d9903e4f29 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java @@ -120,7 +120,6 @@ public class ProcessorStateManagerTest { private final byte[] valueBytes = value.getBytes(StandardCharsets.UTF_8); private final ConsumerRecord consumerRecord = new ConsumerRecord<>(persistentStoreTopicName, 1, 100L, keyBytes, valueBytes); - private final MockChangelogReader changelogReader = new MockChangelogReader(); private final LogContext logContext = new LogContext("process-state-manager-test "); private final StateRestoreCallback noopStateRestoreCallback = (k, v) -> { }; @@ -202,14 +201,12 @@ public void shouldReportChangelogAsSource() { false, logContext, stateDirectory, - changelogReader, mkMap( mkEntry(persistentStoreName, persistentStoreTopicName), mkEntry(persistentStoreTwoName, persistentStoreTwoTopicName), mkEntry(nonPersistentStoreName, nonPersistentStoreTopicName) ), - Set.of(persistentStorePartition, nonPersistentStorePartition), - false); + Set.of(persistentStorePartition, nonPersistentStorePartition)); assertTrue(stateMgr.changelogAsSource(persistentStorePartition)); assertTrue(stateMgr.changelogAsSource(nonPersistentStorePartition)); @@ -224,12 +221,11 @@ public void shouldFindSingleStoreForChangelog() { false, logContext, stateDirectory, - changelogReader, mkMap( + mkMap( mkEntry(persistentStoreName, persistentStoreTopicName), mkEntry(persistentStoreTwoName, persistentStoreTopicName) ), - Collections.emptySet(), - false); + Collections.emptySet()); stateMgr.registerStore(persistentStore, persistentStore.stateRestoreCallback, null); stateMgr.registerStore(persistentStoreTwo, persistentStore.stateRestoreCallback, null); @@ -306,7 +302,7 @@ public void shouldRestoreTimestampedStoreWithConverter() { } @Test - public void shouldUnregisterChangelogsDuringClose() { + public void shouldCloseStateStoresOnStateManagerClose() { final ProcessorStateManager stateMgr = getStateManager(Task.TaskType.ACTIVE); final StateStore store = mock(StateStore.class); when(store.name()).thenReturn(persistentStoreName); @@ -317,16 +313,13 @@ public void shouldUnregisterChangelogsDuringClose() { verify(store).init(context, store); stateMgr.registerStore(store, noopStateRestoreCallback, null); - assertTrue(changelogReader.isPartitionRegistered(persistentStorePartition)); stateMgr.close(); verify(store).close(); - - assertFalse(changelogReader.isPartitionRegistered(persistentStorePartition)); } @Test - public void shouldRecycleStoreAndReregisterChangelog() { + public void shouldRecycleAndReinitializeStore() { final ProcessorStateManager stateMgr = getStateManager(Task.TaskType.ACTIVE); final StateStore store = mock(StateStore.class); when(store.name()).thenReturn(persistentStoreName); @@ -336,16 +329,14 @@ public void shouldRecycleStoreAndReregisterChangelog() { verify(store).init(context, store); stateMgr.registerStore(store, noopStateRestoreCallback, null); - assertTrue(changelogReader.isPartitionRegistered(persistentStorePartition)); stateMgr.recycle(); - assertFalse(changelogReader.isPartitionRegistered(persistentStorePartition)); assertThat(stateMgr.store(persistentStoreName), equalTo(store)); stateMgr.registerStateStores(singletonList(store), context); + verify(store).init(context, store); verify(context, times(2)).uninitialize(); - assertTrue(changelogReader.isPartitionRegistered(persistentStorePartition)); } @Test @@ -359,10 +350,9 @@ public void shouldClearStoreCache() { verify(store).init(context, store); stateMgr.registerStore(store, noopStateRestoreCallback, null); - assertTrue(changelogReader.isPartitionRegistered(persistentStorePartition)); + assertThat(stateMgr.store(persistentStoreName), equalTo(store)); stateMgr.recycle(); - assertFalse(changelogReader.isPartitionRegistered(persistentStorePartition)); assertThat(stateMgr.store(persistentStoreName), equalTo(store)); verify(store).clearCache(); @@ -374,7 +364,7 @@ public void shouldRegisterPersistentStores() { try { stateMgr.registerStore(persistentStore, persistentStore.stateRestoreCallback, null); - assertTrue(changelogReader.isPartitionRegistered(persistentStorePartition)); + assertEquals(persistentStore, stateMgr.store(persistentStoreName)); } finally { stateMgr.close(); } @@ -386,28 +376,7 @@ public void shouldRegisterNonPersistentStore() { try { stateMgr.registerStore(nonPersistentStore, nonPersistentStore.stateRestoreCallback, null); - assertTrue(changelogReader.isPartitionRegistered(nonPersistentStorePartition)); - } finally { - stateMgr.close(); - } - } - - @Test - public void shouldNotRegisterNonLoggedStore() { - final ProcessorStateManager stateMgr = new ProcessorStateManager( - taskId, - Task.TaskType.STANDBY, - false, - logContext, - stateDirectory, - changelogReader, - emptyMap(), - emptySet(), - false); - - try { - stateMgr.registerStore(persistentStore, persistentStore.stateRestoreCallback, null); - assertFalse(changelogReader.isPartitionRegistered(persistentStorePartition)); + assertEquals(nonPersistentStore, stateMgr.store(nonPersistentStoreName)); } finally { stateMgr.close(); } @@ -673,10 +642,8 @@ public void shouldNotWriteCheckpointForStoresWithoutChangelogTopic() throws IOEx false, logContext, stateDirectory, - changelogReader, emptyMap(), - emptySet(), - false); + emptySet()); try { stateMgr.registerStore(persistentStore, persistentStore.stateRestoreCallback, null); @@ -1230,14 +1197,12 @@ private ProcessorStateManager getStateManager(final Task.TaskType taskType, fina eosEnabled, logContext, stateDirectory, - changelogReader, mkMap( mkEntry(persistentStoreName, persistentStoreTopicName), mkEntry(persistentStoreTwoName, persistentStoreTwoTopicName), mkEntry(nonPersistentStoreName, nonPersistentStoreTopicName) ), - emptySet(), - false); + emptySet()); } private ProcessorStateManager getStateManager(final Task.TaskType taskType) { diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index 213ff8a5b10fb..4817347498b6f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -55,7 +55,6 @@ import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse; import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.LogCaptureAppender; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.MockTime; @@ -75,7 +74,6 @@ import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.internals.ConsumedInternal; import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder; -import org.apache.kafka.streams.kstream.internals.MaterializedInternal; import org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp; import org.apache.kafka.streams.processor.PunctuationType; import org.apache.kafka.streams.processor.Punctuator; @@ -93,7 +91,6 @@ import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.Stores; -import org.apache.kafka.streams.state.internals.OffsetCheckpoint; import org.apache.kafka.test.MockApiProcessor; import org.apache.kafka.test.MockClientSupplier; import org.apache.kafka.test.MockKeyValueStoreBuilder; @@ -109,8 +106,6 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.Arguments; -import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.ValueSource; import org.mockito.InOrder; import org.mockito.Mock; @@ -119,8 +114,6 @@ import org.mockito.junit.jupiter.MockitoSettings; import org.mockito.quality.Strictness; -import java.io.File; -import java.io.IOException; import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; @@ -141,7 +134,6 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiConsumer; import java.util.stream.Collectors; -import java.util.stream.Stream; import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; @@ -151,7 +143,6 @@ import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.common.utils.Utils.mkProperties; import static org.apache.kafka.streams.processor.internals.ClientUtils.adminClientId; -import static org.apache.kafka.streams.processor.internals.StateManagerUtil.CHECKPOINT_FILE_NAME; import static org.apache.kafka.test.StreamsTestUtils.TaskBuilder.statelessTask; import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS; import static org.apache.kafka.test.TestUtils.waitForCondition; @@ -173,7 +164,6 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assumptions.assumeFalse; -import static org.junit.jupiter.api.Assumptions.assumeTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; @@ -226,14 +216,6 @@ public class StreamThreadTest { } }; - static Stream data() { - return Stream.of( - Arguments.of(false, false), - Arguments.of(true, false), - Arguments.of(true, true) - ); - } - @BeforeEach public void setUp() { Thread.currentThread().setName(CLIENT_ID + "-StreamThread-" + threadIdx); @@ -266,9 +248,8 @@ public void tearDown() { // task0 is unused private final TaskId task1 = new TaskId(0, 1); private final TaskId task2 = new TaskId(0, 2); - private final TaskId task3 = new TaskId(1, 1); - private Properties configProps(final boolean enableEoS, final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { + private Properties configProps(final boolean enableEoS, final boolean processingThreadsEnabled) { return mkProperties(mkMap( mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID), mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171"), @@ -278,7 +259,6 @@ private Properties configProps(final boolean enableEoS, final boolean stateUpdat mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, enableEoS ? StreamsConfig.EXACTLY_ONCE_V2 : StreamsConfig.AT_LEAST_ONCE), mkEntry(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.ByteArraySerde.class.getName()), mkEntry(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArraySerde.class.getName()), - mkEntry(InternalConfig.STATE_UPDATER_ENABLED, Boolean.toString(stateUpdaterEnabled)), mkEntry(InternalConfig.PROCESSING_THREADS_ENABLED, Boolean.toString(processingThreadsEnabled)), mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "1") )); @@ -296,13 +276,13 @@ private Cluster createCluster() { ); } - private StreamThread createStreamThread(@SuppressWarnings("SameParameterValue") final String clientId, final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { - return createStreamThread(clientId, mockTime, stateUpdaterEnabled, processingThreadsEnabled); + private StreamThread createStreamThread(@SuppressWarnings("SameParameterValue") final String clientId, final boolean processingThreadsEnabled) { + return createStreamThread(clientId, mockTime, processingThreadsEnabled); } private StreamThread createStreamThread(@SuppressWarnings("SameParameterValue") final String clientId, - final Time time, final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { - final StreamsConfig config = new StreamsConfig(configProps(false, stateUpdaterEnabled, processingThreadsEnabled)); + final Time time, final boolean processingThreadsEnabled) { + final StreamsConfig config = new StreamsConfig(configProps(false, processingThreadsEnabled)); return createStreamThread(clientId, config, time); } @@ -377,9 +357,9 @@ public void onChange(final Thread thread, } @ParameterizedTest - @MethodSource("data") - public void shouldChangeStateInRebalanceListener(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { - thread = createStreamThread(CLIENT_ID, stateUpdaterEnabled, processingThreadsEnabled); + @ValueSource(booleans = {true, false}) + public void shouldChangeStateInRebalanceListener(final boolean processingThreadsEnabled) { + thread = createStreamThread(CLIENT_ID, processingThreadsEnabled); final StateListenerStub stateListener = new StateListenerStub(); thread.setStateListener(stateListener); @@ -414,9 +394,9 @@ public void shouldChangeStateInRebalanceListener(final boolean stateUpdaterEnabl } @ParameterizedTest - @MethodSource("data") - public void shouldChangeStateAtStartClose(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) throws Exception { - thread = createStreamThread(CLIENT_ID, new MockTime(1), stateUpdaterEnabled, processingThreadsEnabled); + @ValueSource(booleans = {true, false}) + public void shouldChangeStateAtStartClose(final boolean processingThreadsEnabled) throws Exception { + thread = createStreamThread(CLIENT_ID, new MockTime(1), processingThreadsEnabled); final StateListenerStub stateListener = new StateListenerStub(); thread.setStateListener(stateListener); @@ -438,9 +418,9 @@ public void shouldChangeStateAtStartClose(final boolean stateUpdaterEnabled, fin } @ParameterizedTest - @MethodSource("data") - public void shouldCreateMetricsAtStartup(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { - thread = createStreamThread(CLIENT_ID, new MockTime(1), stateUpdaterEnabled, processingThreadsEnabled); + @ValueSource(booleans = {true, false}) + public void shouldCreateMetricsAtStartup(final boolean processingThreadsEnabled) { + thread = createStreamThread(CLIENT_ID, new MockTime(1), processingThreadsEnabled); final String defaultGroupName = "stream-thread-metrics"; final Map defaultTags = Collections.singletonMap( "thread-id", @@ -538,10 +518,10 @@ public void shouldCreateMetricsAtStartup(final boolean stateUpdaterEnabled, fina } @ParameterizedTest - @MethodSource("data") - public void shouldNotCommitBeforeTheCommitInterval(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { + @ValueSource(booleans = {true, false}) + public void shouldNotCommitBeforeTheCommitInterval(final boolean processingThreadsEnabled) { final long commitInterval = 1000L; - final Properties props = configProps(false, stateUpdaterEnabled, processingThreadsEnabled); + final Properties props = configProps(false, processingThreadsEnabled); props.setProperty(StreamsConfig.STATE_DIR_CONFIG, stateDir); props.setProperty(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Long.toString(commitInterval)); @@ -565,11 +545,11 @@ public void shouldNotCommitBeforeTheCommitInterval(final boolean stateUpdaterEna } @ParameterizedTest - @MethodSource("data") - public void shouldNotPurgeBeforeThePurgeInterval(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { + @ValueSource(booleans = {true, false}) + public void shouldNotPurgeBeforeThePurgeInterval(final boolean processingThreadsEnabled) { final long commitInterval = 1000L; final long purgeInterval = 2000L; - final Properties props = configProps(false, stateUpdaterEnabled, processingThreadsEnabled); + final Properties props = configProps(false, processingThreadsEnabled); props.setProperty(StreamsConfig.STATE_DIR_CONFIG, stateDir); props.setProperty(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Long.toString(commitInterval)); props.setProperty(StreamsConfig.REPARTITION_PURGE_INTERVAL_MS_CONFIG, Long.toString(purgeInterval)); @@ -593,10 +573,10 @@ public void shouldNotPurgeBeforeThePurgeInterval(final boolean stateUpdaterEnabl } @ParameterizedTest - @MethodSource("data") - public void shouldAlsoPurgeWhenNothingGetsCommitted(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { + @ValueSource(booleans = {true, false}) + public void shouldAlsoPurgeWhenNothingGetsCommitted(final boolean processingThreadsEnabled) { final long purgeInterval = 1000L; - final Properties props = configProps(false, stateUpdaterEnabled, processingThreadsEnabled); + final Properties props = configProps(false, processingThreadsEnabled); props.setProperty(StreamsConfig.STATE_DIR_CONFIG, stateDir); props.setProperty(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Long.toString(purgeInterval)); props.setProperty(StreamsConfig.REPARTITION_PURGE_INTERVAL_MS_CONFIG, Long.toString(purgeInterval)); @@ -625,11 +605,11 @@ public void shouldAlsoPurgeWhenNothingGetsCommitted(final boolean stateUpdaterEn } @ParameterizedTest - @MethodSource("data") - public void shouldNotProcessWhenPartitionRevoked(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { + @ValueSource(booleans = {true, false}) + public void shouldNotProcessWhenPartitionRevoked(final boolean processingThreadsEnabled) { assumeFalse(processingThreadsEnabled); - final Properties props = configProps(false, stateUpdaterEnabled, processingThreadsEnabled); + final Properties props = configProps(false, processingThreadsEnabled); final StreamsConfig config = new StreamsConfig(props); when(mainConsumer.poll(Mockito.any())).thenReturn(ConsumerRecords.empty()); @@ -649,10 +629,10 @@ public void shouldNotProcessWhenPartitionRevoked(final boolean stateUpdaterEnabl } @ParameterizedTest - @MethodSource("data") - public void shouldProcessWhenRunning(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { + @ValueSource(booleans = {true, false}) + public void shouldProcessWhenRunning(final boolean processingThreadsEnabled) { assumeFalse(processingThreadsEnabled); - final Properties props = configProps(false, stateUpdaterEnabled, processingThreadsEnabled); + final Properties props = configProps(false, processingThreadsEnabled); final StreamsConfig config = new StreamsConfig(props); when(mainConsumer.poll(Mockito.any())).thenReturn(ConsumerRecords.empty()); @@ -674,11 +654,10 @@ public void shouldProcessWhenRunning(final boolean stateUpdaterEnabled, final bo } @ParameterizedTest - @MethodSource("data") - public void shouldProcessWhenPartitionAssigned(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { - assumeTrue(stateUpdaterEnabled); + @ValueSource(booleans = {true, false}) + public void shouldProcessWhenPartitionAssigned(final boolean processingThreadsEnabled) { assumeFalse(processingThreadsEnabled); - final Properties props = configProps(false, stateUpdaterEnabled, processingThreadsEnabled); + final Properties props = configProps(false, processingThreadsEnabled); final StreamsConfig config = new StreamsConfig(props); when(mainConsumer.poll(Mockito.any())).thenReturn(ConsumerRecords.empty()); @@ -699,12 +678,10 @@ public void shouldProcessWhenPartitionAssigned(final boolean stateUpdaterEnabled } @ParameterizedTest - @MethodSource("data") - public void shouldProcessWhenStarting(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { - assumeTrue(stateUpdaterEnabled); + @ValueSource(booleans = {true, false}) + public void shouldProcessWhenStarting(final boolean processingThreadsEnabled) { assumeFalse(processingThreadsEnabled); - final Properties props = configProps(false, stateUpdaterEnabled, processingThreadsEnabled); - props.setProperty(InternalConfig.STATE_UPDATER_ENABLED, Boolean.toString(true)); + final Properties props = configProps(false, processingThreadsEnabled); final StreamsConfig config = new StreamsConfig(props); when(mainConsumer.poll(Mockito.any())).thenReturn(ConsumerRecords.empty()); @@ -724,10 +701,10 @@ public void shouldProcessWhenStarting(final boolean stateUpdaterEnabled, final b } @ParameterizedTest - @MethodSource("data") - public void shouldEnforceRebalanceWhenScheduledAndNotCurrentlyRebalancing(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) throws InterruptedException { + @ValueSource(booleans = {true, false}) + public void shouldEnforceRebalanceWhenScheduledAndNotCurrentlyRebalancing(final boolean processingThreadsEnabled) throws InterruptedException { final Time mockTime = new MockTime(1); - final StreamsConfig config = new StreamsConfig(configProps(false, stateUpdaterEnabled, processingThreadsEnabled)); + final StreamsConfig config = new StreamsConfig(configProps(false, processingThreadsEnabled)); final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl( metrics, APPLICATION_ID, @@ -789,10 +766,10 @@ public void shouldEnforceRebalanceWhenScheduledAndNotCurrentlyRebalancing(final } @ParameterizedTest - @MethodSource("data") - public void shouldNotEnforceRebalanceWhenCurrentlyRebalancing(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) throws InterruptedException { + @ValueSource(booleans = {true, false}) + public void shouldNotEnforceRebalanceWhenCurrentlyRebalancing(final boolean processingThreadsEnabled) throws InterruptedException { final Time mockTime = new MockTime(1); - final StreamsConfig config = new StreamsConfig(configProps(false, stateUpdaterEnabled, processingThreadsEnabled)); + final StreamsConfig config = new StreamsConfig(configProps(false, processingThreadsEnabled)); final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl( metrics, APPLICATION_ID, @@ -885,8 +862,8 @@ AtomicLong nextRebalanceMs() { } @ParameterizedTest - @MethodSource("data") - public void shouldRespectNumIterationsInMainLoopWithoutProcessingThreads(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { + @ValueSource(booleans = {true, false}) + public void shouldRespectNumIterationsInMainLoopWithoutProcessingThreads(final boolean processingThreadsEnabled) { // With processing threads, there is no guarantee how many iterations will be performed assumeFalse(processingThreadsEnabled); @@ -908,7 +885,6 @@ public void shouldRespectNumIterationsInMainLoopWithoutProcessingThreads(final b ); final Properties properties = new Properties(); - properties.put(InternalConfig.STATE_UPDATER_ENABLED, stateUpdaterEnabled); properties.put(InternalConfig.PROCESSING_THREADS_ENABLED, processingThreadsEnabled); properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L); final StreamsConfig config = new StreamsConfig(StreamsTestUtils.getStreamsConfig(APPLICATION_ID, @@ -1014,10 +990,10 @@ public void shouldRespectNumIterationsInMainLoopWithoutProcessingThreads(final b } @ParameterizedTest - @MethodSource("data") - public void shouldNotCauseExceptionIfNothingCommitted(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { + @ValueSource(booleans = {true, false}) + public void shouldNotCauseExceptionIfNothingCommitted(final boolean processingThreadsEnabled) { final long commitInterval = 1000L; - final Properties props = configProps(false, stateUpdaterEnabled, processingThreadsEnabled); + final Properties props = configProps(false, processingThreadsEnabled); props.setProperty(StreamsConfig.STATE_DIR_CONFIG, stateDir); props.setProperty(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Long.toString(commitInterval)); @@ -1043,12 +1019,12 @@ public void shouldNotCauseExceptionIfNothingCommitted(final boolean stateUpdater } @ParameterizedTest - @MethodSource("data") - public void shouldCommitAfterCommitInterval(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { + @ValueSource(booleans = {true, false}) + public void shouldCommitAfterCommitInterval(final boolean processingThreadsEnabled) { final long commitInterval = 100L; final long commitLatency = 10L; - final Properties props = configProps(false, stateUpdaterEnabled, processingThreadsEnabled); + final Properties props = configProps(false, processingThreadsEnabled); props.setProperty(StreamsConfig.STATE_DIR_CONFIG, stateDir); props.setProperty(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Long.toString(commitInterval)); @@ -1104,12 +1080,12 @@ int commit(final Collection tasksToCommit) { } @ParameterizedTest - @MethodSource("data") - public void shouldPurgeAfterPurgeInterval(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { + @ValueSource(booleans = {true, false}) + public void shouldPurgeAfterPurgeInterval(final boolean processingThreadsEnabled) { final long commitInterval = 100L; final long purgeInterval = 200L; - final Properties props = configProps(false, stateUpdaterEnabled, processingThreadsEnabled); + final Properties props = configProps(false, processingThreadsEnabled); props.setProperty(StreamsConfig.STATE_DIR_CONFIG, stateDir); props.setProperty(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Long.toString(commitInterval)); props.setProperty(StreamsConfig.REPARTITION_PURGE_INTERVAL_MS_CONFIG, Long.toString(purgeInterval)); @@ -1137,8 +1113,8 @@ public void shouldPurgeAfterPurgeInterval(final boolean stateUpdaterEnabled, fin } @ParameterizedTest - @MethodSource("data") - public void shouldRecordCommitLatency(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { + @ValueSource(booleans = {true, false}) + public void shouldRecordCommitLatency(final boolean processingThreadsEnabled) { final ConsumerGroupMetadata consumerGroupMetadata = mock(ConsumerGroupMetadata.class); when(consumer.groupMetadata()).thenReturn(consumerGroupMetadata); when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty()); @@ -1160,7 +1136,7 @@ public void shouldRecordCommitLatency(final boolean stateUpdaterEnabled, final b schedulingTaskManager = null; } - final StreamsConfig config = new StreamsConfig(configProps(false, stateUpdaterEnabled, processingThreadsEnabled)); + final StreamsConfig config = new StreamsConfig(configProps(false, processingThreadsEnabled)); final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), mockTime); final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config); @@ -1246,12 +1222,12 @@ int commit(final Collection tasksToCommit) { } @ParameterizedTest - @MethodSource("data") - public void shouldInjectSharedProducerForAllTasksUsingClientSupplierOnCreateIfEosDisabled(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { + @ValueSource(booleans = {true, false}) + public void shouldInjectSharedProducerForAllTasksUsingClientSupplierOnCreateIfEosDisabled(final boolean processingThreadsEnabled) { internalTopologyBuilder.addSource(null, "source1", null, null, null, topic1); internalStreamsBuilder.buildAndOptimizeTopology(); - final StreamsConfig config = new StreamsConfig(configProps(false, stateUpdaterEnabled, processingThreadsEnabled)); + final StreamsConfig config = new StreamsConfig(configProps(false, processingThreadsEnabled)); thread = createStreamThread(CLIENT_ID, config); thread.setState(StreamThread.State.STARTING); @@ -1286,11 +1262,11 @@ public void shouldInjectSharedProducerForAllTasksUsingClientSupplierOnCreateIfEo } @ParameterizedTest - @MethodSource("data") - public void shouldInjectProducerPerThreadUsingClientSupplierOnCreateIfEosV2Enabled(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { + @ValueSource(booleans = {true, false}) + public void shouldInjectProducerPerThreadUsingClientSupplierOnCreateIfEosV2Enabled(final boolean processingThreadsEnabled) { internalTopologyBuilder.addSource(null, "source1", null, null, null, topic1); - final Properties props = configProps(true, stateUpdaterEnabled, processingThreadsEnabled); + final Properties props = configProps(true, processingThreadsEnabled); thread = createStreamThread(CLIENT_ID, new StreamsConfig(props)); thread.setState(StreamThread.State.STARTING); @@ -1324,20 +1300,53 @@ public void shouldInjectProducerPerThreadUsingClientSupplierOnCreateIfEosV2Enabl } @ParameterizedTest - @MethodSource("data") - public void shouldOnlyCompleteShutdownAfterRebalanceNotInProgress(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) throws InterruptedException { - // The state updater is disabled for this test because this test relies on the fact the mainConsumer.resume() - // is not called. This is not true when the state updater is enabled which leads to - // java.lang.IllegalStateException: No current assignment for partition topic1-2. - // Since this tests verifies an aspect that is independent from the state updater, it is OK to disable - // the state updater and leave the rewriting of the test to later, when the code path for disabled state updater - // is removed. - assumeFalse(stateUpdaterEnabled); + @ValueSource(booleans = {true, false}) + public void shouldOnlyCompleteShutdownAfterRebalanceNotInProgress(final boolean processingThreadsEnabled) throws InterruptedException { internalTopologyBuilder.addSource(null, "source1", null, null, null, topic1); + final Time mockTime = new MockTime(1); + final StreamsConfig config = new StreamsConfig(configProps(false, processingThreadsEnabled)); + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl( + metrics, + APPLICATION_ID, + PROCESS_ID.toString(), + mockTime + ); - final Properties props = configProps(true, stateUpdaterEnabled, processingThreadsEnabled); - thread = - createStreamThread(CLIENT_ID, new StreamsConfig(props), new MockTime(1)); + lenient().when(consumer.poll(any())).thenReturn(ConsumerRecords.empty()); + final ConsumerGroupMetadata consumerGroupMetadata = mock(ConsumerGroupMetadata.class); + when(consumer.groupMetadata()).thenReturn(consumerGroupMetadata); + when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty()); + final MockConsumerClientSupplier mockClientSupplier = new MockConsumerClientSupplier(consumer); + mockClientSupplier.setCluster(createCluster()); + + final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config); + topologyMetadata.buildAndRewriteTopology(); + stateDirectory = new StateDirectory(config, mockTime, true, false); + final StreamsMetadataState streamsMetadataState = new StreamsMetadataState( + new TopologyMetadata(internalTopologyBuilder, config), + StreamsMetadataState.UNKNOWN_HOST, + new LogContext(String.format("stream-client [%s] ", CLIENT_ID)) + ); + thread = StreamThread.create( + topologyMetadata, + config, + mockClientSupplier, + mockClientSupplier.getAdmin(config.getAdminConfigs(CLIENT_ID)), + PROCESS_ID, + CLIENT_ID, + streamsMetrics, + mockTime, + streamsMetadataState, + 0, + stateDirectory, + new MockStateRestoreListener(), + new MockStandbyUpdateListener(), + threadIdx, + null, + null + ); + + mockClientSupplier.nextRebalanceMs().set(mockTime.milliseconds() - 1L); thread.taskManager().handleRebalanceStart(Collections.singleton(topic1)); @@ -1357,6 +1366,13 @@ public void shouldOnlyCompleteShutdownAfterRebalanceNotInProgress(final boolean 10 * 1000, "Thread never started."); + TestUtils.waitForCondition( + () -> Thread.getAllStackTraces().keySet().stream() + .map(Thread::getName) + .anyMatch(name -> name.contains("StateUpdater")), + 10 * 1000, + "StateUpdater thread not found."); + thread.shutdown(); // even if thread is no longer running, it should still be polling @@ -1378,14 +1394,14 @@ public void shouldOnlyCompleteShutdownAfterRebalanceNotInProgress(final boolean } @ParameterizedTest - @MethodSource("data") - public void shouldShutdownTaskManagerOnClose(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { + @ValueSource(booleans = {true, false}) + public void shouldShutdownTaskManagerOnClose(final boolean processingThreadsEnabled) { final ConsumerGroupMetadata consumerGroupMetadata = mock(ConsumerGroupMetadata.class); when(consumer.groupMetadata()).thenReturn(consumerGroupMetadata); when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty()); final TaskManager taskManager = mock(TaskManager.class); - final StreamsConfig config = new StreamsConfig(configProps(false, stateUpdaterEnabled, processingThreadsEnabled)); + final StreamsConfig config = new StreamsConfig(configProps(false, processingThreadsEnabled)); final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config); topologyMetadata.buildAndRewriteTopology(); thread = buildStreamThread(consumer, taskManager, config, topologyMetadata) @@ -1402,8 +1418,8 @@ public void shouldShutdownTaskManagerOnClose(final boolean stateUpdaterEnabled, } @ParameterizedTest - @MethodSource("data") - public void shouldNotReturnDataAfterTaskMigrated(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { + @ValueSource(booleans = {true, false}) + public void shouldNotReturnDataAfterTaskMigrated(final boolean processingThreadsEnabled) { final TaskManager taskManager = mock(TaskManager.class); final InternalTopologyBuilder internalTopologyBuilder = mock(InternalTopologyBuilder.class); when(internalTopologyBuilder.fullSourceTopicNames()).thenReturn(Collections.singletonList(topic1)); @@ -1418,30 +1434,19 @@ public void shouldNotReturnDataAfterTaskMigrated(final boolean stateUpdaterEnabl final TaskMigratedException taskMigratedException = new TaskMigratedException( "Changelog restore found task migrated", new RuntimeException("restore task migrated")); - ChangelogReader changelogReader = this.changelogReader; - if (stateUpdaterEnabled) { - when(taskManager.checkStateUpdater(anyLong(), any())).thenAnswer(answer -> { - consumer.addRecord(new ConsumerRecord<>(topic1, 1, 11, new byte[0], new byte[0])); - consumer.addRecord(new ConsumerRecord<>(topic1, 1, 12, new byte[1], new byte[0])); + final ChangelogReader changelogReader = this.changelogReader; - throw taskMigratedException; - }); - } else { - changelogReader = new MockChangelogReader() { - @Override - public long restore(final Map tasks) { - consumer.addRecord(new ConsumerRecord<>(topic1, 1, 11, new byte[0], new byte[0])); - consumer.addRecord(new ConsumerRecord<>(topic1, 1, 12, new byte[1], new byte[0])); + when(taskManager.checkStateUpdater(anyLong(), any())).thenAnswer(answer -> { + consumer.addRecord(new ConsumerRecord<>(topic1, 1, 11, new byte[0], new byte[0])); + consumer.addRecord(new ConsumerRecord<>(topic1, 1, 12, new byte[1], new byte[0])); - throw taskMigratedException; - } - }; - } + throw taskMigratedException; + }); final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), mockTime); - final Properties props = configProps(false, stateUpdaterEnabled, processingThreadsEnabled); + final Properties props = configProps(false, processingThreadsEnabled); final StreamsConfig config = new StreamsConfig(props); thread = new StreamThread( new MockTime(1), @@ -1479,14 +1484,14 @@ public long restore(final Map tasks) { } @ParameterizedTest - @MethodSource("data") - public void shouldShutdownTaskManagerOnCloseWithoutStart(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { + @ValueSource(booleans = {true, false}) + public void shouldShutdownTaskManagerOnCloseWithoutStart(final boolean processingThreadsEnabled) { final ConsumerGroupMetadata consumerGroupMetadata = mock(ConsumerGroupMetadata.class); when(consumer.groupMetadata()).thenReturn(consumerGroupMetadata); when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty()); final TaskManager taskManager = mock(TaskManager.class); - final StreamsConfig config = new StreamsConfig(configProps(false, stateUpdaterEnabled, processingThreadsEnabled)); + final StreamsConfig config = new StreamsConfig(configProps(false, processingThreadsEnabled)); final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config); topologyMetadata.buildAndRewriteTopology(); thread = buildStreamThread(consumer, taskManager, config, topologyMetadata) @@ -1497,14 +1502,14 @@ public void shouldShutdownTaskManagerOnCloseWithoutStart(final boolean stateUpda } @ParameterizedTest - @MethodSource("data") - public void shouldOnlyShutdownOnce(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { + @ValueSource(booleans = {true, false}) + public void shouldOnlyShutdownOnce(final boolean processingThreadsEnabled) { final ConsumerGroupMetadata consumerGroupMetadata = mock(ConsumerGroupMetadata.class); when(consumer.groupMetadata()).thenReturn(consumerGroupMetadata); when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty()); final TaskManager taskManager = mock(TaskManager.class); - final StreamsConfig config = new StreamsConfig(configProps(false, stateUpdaterEnabled, processingThreadsEnabled)); + final StreamsConfig config = new StreamsConfig(configProps(false, processingThreadsEnabled)); final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config); topologyMetadata.buildAndRewriteTopology(); thread = buildStreamThread(consumer, taskManager, config, topologyMetadata) @@ -1517,12 +1522,12 @@ public void shouldOnlyShutdownOnce(final boolean stateUpdaterEnabled, final bool } @ParameterizedTest - @MethodSource("data") - public void shouldNotThrowWhenStandbyTasksAssignedAndNoStateStoresForTopology(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { + @ValueSource(booleans = {true, false}) + public void shouldNotThrowWhenStandbyTasksAssignedAndNoStateStoresForTopology(final boolean processingThreadsEnabled) { internalTopologyBuilder.addSource(null, "name", null, null, null, "topic"); internalTopologyBuilder.addSink("out", "output", null, null, null, "name"); - final StreamsConfig config = new StreamsConfig(configProps(false, stateUpdaterEnabled, processingThreadsEnabled)); + final StreamsConfig config = new StreamsConfig(configProps(false, processingThreadsEnabled)); thread = createStreamThread(CLIENT_ID, config); thread.setState(StreamThread.State.STARTING); @@ -1539,12 +1544,12 @@ public void shouldNotThrowWhenStandbyTasksAssignedAndNoStateStoresForTopology(fi } @ParameterizedTest - @MethodSource("data") - public void shouldNotCloseTaskAndRemoveFromTaskManagerIfProducerWasFencedWhileProcessing(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) throws Exception { + @ValueSource(booleans = {true, false}) + public void shouldNotCloseTaskAndRemoveFromTaskManagerIfProducerWasFencedWhileProcessing(final boolean processingThreadsEnabled) throws Exception { internalTopologyBuilder.addSource(null, "source", null, null, null, topic1); internalTopologyBuilder.addSink("sink", "dummyTopic", null, null, null, "source"); - final StreamsConfig config = new StreamsConfig(configProps(true, stateUpdaterEnabled, processingThreadsEnabled)); + final StreamsConfig config = new StreamsConfig(configProps(true, processingThreadsEnabled)); thread = createStreamThread(CLIENT_ID, config); final MockConsumer consumer = clientSupplier.consumer; @@ -1610,8 +1615,8 @@ public void shouldNotCloseTaskAndRemoveFromTaskManagerIfProducerWasFencedWhilePr assertThat(producer.commitCount(), equalTo(1L)); } - private void testThrowingDurringCommitTransactionException(final RuntimeException e, final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) throws InterruptedException { - final StreamsConfig config = new StreamsConfig(configProps(true, stateUpdaterEnabled, processingThreadsEnabled)); + private void testThrowingDurringCommitTransactionException(final RuntimeException e, final boolean processingThreadsEnabled) throws InterruptedException { + final StreamsConfig config = new StreamsConfig(configProps(true, processingThreadsEnabled)); thread = createStreamThread(CLIENT_ID, config); internalTopologyBuilder.addSource(null, "name", null, null, null, topic1); @@ -1655,20 +1660,20 @@ private void testThrowingDurringCommitTransactionException(final RuntimeExceptio } @ParameterizedTest - @MethodSource("data") - public void shouldNotCloseTaskAndRemoveFromTaskManagerIfProducerGotFencedInCommitTransactionWhenSuspendingTasks(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) throws Exception { - testThrowingDurringCommitTransactionException(new ProducerFencedException("Producer is fenced"), stateUpdaterEnabled, processingThreadsEnabled); + @ValueSource(booleans = {true, false}) + public void shouldNotCloseTaskAndRemoveFromTaskManagerIfProducerGotFencedInCommitTransactionWhenSuspendingTasks(final boolean processingThreadsEnabled) throws Exception { + testThrowingDurringCommitTransactionException(new ProducerFencedException("Producer is fenced"), processingThreadsEnabled); } @ParameterizedTest - @MethodSource("data") - public void shouldNotCloseTaskAndRemoveFromTaskManagerIfInvalidPidMappingOccurredInCommitTransactionWhenSuspendingTasks(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) throws Exception { - testThrowingDurringCommitTransactionException(new InvalidPidMappingException("PidMapping is invalid"), stateUpdaterEnabled, processingThreadsEnabled); + @ValueSource(booleans = {true, false}) + public void shouldNotCloseTaskAndRemoveFromTaskManagerIfInvalidPidMappingOccurredInCommitTransactionWhenSuspendingTasks(final boolean processingThreadsEnabled) throws Exception { + testThrowingDurringCommitTransactionException(new InvalidPidMappingException("PidMapping is invalid"), processingThreadsEnabled); } @ParameterizedTest - @MethodSource("data") - public void shouldReinitializeRevivedTasksInAnyState(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) throws Exception { - final StreamsConfig config = new StreamsConfig(configProps(false, stateUpdaterEnabled, processingThreadsEnabled)); + @ValueSource(booleans = {true, false}) + public void shouldReinitializeRevivedTasksInAnyState(final boolean processingThreadsEnabled) throws Exception { + final StreamsConfig config = new StreamsConfig(configProps(false, processingThreadsEnabled)); thread = createStreamThread(CLIENT_ID, config, new MockTime(1)); final String storeName = "store"; @@ -1737,15 +1742,14 @@ public void shouldReinitializeRevivedTasksInAnyState(final boolean stateUpdaterE runOnce(processingThreadsEnabled); // the third actually polls, processes the record, and throws the corruption exception - if (stateUpdaterEnabled) { - TestUtils.waitForCondition( - () -> thread.taskManager().checkStateUpdater( - mockTime.milliseconds(), - topicPartitions -> mockConsumer.seekToBeginning(singleton(t1p1)) - ), - 10 * 1000, - "State updater never returned tasks."); - } + TestUtils.waitForCondition( + () -> thread.taskManager().checkStateUpdater( + mockTime.milliseconds(), + topicPartitions -> mockConsumer.seekToBeginning(singleton(t1p1)) + ), + 10 * 1000, + "State updater never returned tasks."); + addRecord(mockConsumer, 0L); shouldThrow.set(true); final TaskCorruptedException taskCorruptedException; @@ -1757,15 +1761,13 @@ public void shouldReinitializeRevivedTasksInAnyState(final boolean stateUpdaterE // Now, we can handle the corruption thread.taskManager().handleCorruption(taskCorruptedException.corruptedTasks()); - if (stateUpdaterEnabled) { - TestUtils.waitForCondition( - () -> thread.taskManager().checkStateUpdater( - mockTime.milliseconds(), - topicPartitions -> mockConsumer.seekToBeginning(singleton(t1p1)) - ), - 10 * 1000, - "State updater never returned tasks."); - } + TestUtils.waitForCondition( + () -> thread.taskManager().checkStateUpdater( + mockTime.milliseconds(), + topicPartitions -> mockConsumer.seekToBeginning(singleton(t1p1)) + ), + 10 * 1000, + "State updater never returned tasks."); // again, complete the restoration runOnce(processingThreadsEnabled); @@ -1784,11 +1786,11 @@ public void shouldReinitializeRevivedTasksInAnyState(final boolean stateUpdaterE } } - private void testNotCloseTaskAndRemoveFromTaskManagerInCommitTransactionWhenCommitting(final RuntimeException e, final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { + private void testNotCloseTaskAndRemoveFromTaskManagerInCommitTransactionWhenCommitting(final RuntimeException e, final boolean processingThreadsEnabled) { // only have source but no sink so that we would not get fenced in producer.send internalTopologyBuilder.addSource(null, "source", null, null, null, topic1); - final StreamsConfig config = new StreamsConfig(configProps(true, stateUpdaterEnabled, processingThreadsEnabled)); + final StreamsConfig config = new StreamsConfig(configProps(true, processingThreadsEnabled)); thread = createStreamThread(CLIENT_ID, config); final MockConsumer consumer = clientSupplier.consumer; @@ -1840,21 +1842,21 @@ private void testNotCloseTaskAndRemoveFromTaskManagerInCommitTransactionWhenComm } @ParameterizedTest - @MethodSource("data") - public void shouldNotCloseTaskAndRemoveFromTaskManagerIfProducerGotFencedInCommitTransactionWhenCommitting(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { - testNotCloseTaskAndRemoveFromTaskManagerInCommitTransactionWhenCommitting(new ProducerFencedException("Producer is fenced"), stateUpdaterEnabled, processingThreadsEnabled); + @ValueSource(booleans = {true, false}) + public void shouldNotCloseTaskAndRemoveFromTaskManagerIfProducerGotFencedInCommitTransactionWhenCommitting(final boolean processingThreadsEnabled) { + testNotCloseTaskAndRemoveFromTaskManagerInCommitTransactionWhenCommitting(new ProducerFencedException("Producer is fenced"), processingThreadsEnabled); } @ParameterizedTest - @MethodSource("data") - public void shouldNotCloseTaskAndRemoveFromTaskManagerIfPidMappingIsInvalidInCommitTransactionWhenCommitting(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { - testNotCloseTaskAndRemoveFromTaskManagerInCommitTransactionWhenCommitting(new InvalidPidMappingException("PID Mapping is invalid"), stateUpdaterEnabled, processingThreadsEnabled); + @ValueSource(booleans = {true, false}) + public void shouldNotCloseTaskAndRemoveFromTaskManagerIfPidMappingIsInvalidInCommitTransactionWhenCommitting(final boolean processingThreadsEnabled) { + testNotCloseTaskAndRemoveFromTaskManagerInCommitTransactionWhenCommitting(new InvalidPidMappingException("PID Mapping is invalid"), processingThreadsEnabled); } @ParameterizedTest - @MethodSource("data") - public void shouldNotCloseTaskProducerWhenSuspending(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) throws Exception { - final StreamsConfig config = new StreamsConfig(configProps(true, stateUpdaterEnabled, processingThreadsEnabled)); + @ValueSource(booleans = {true, false}) + public void shouldNotCloseTaskProducerWhenSuspending(final boolean processingThreadsEnabled) throws Exception { + final StreamsConfig config = new StreamsConfig(configProps(true, processingThreadsEnabled)); thread = createStreamThread(CLIENT_ID, config); internalTopologyBuilder.addSource(null, "name", null, null, null, topic1); @@ -1900,9 +1902,9 @@ public void shouldNotCloseTaskProducerWhenSuspending(final boolean stateUpdaterE } @ParameterizedTest - @MethodSource("data") - public void shouldReturnActiveTaskMetadataWhileRunningState(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { - final StreamsConfig config = new StreamsConfig(configProps(false, stateUpdaterEnabled, processingThreadsEnabled)); + @ValueSource(booleans = {true, false}) + public void shouldReturnActiveTaskMetadataWhileRunningState(final boolean processingThreadsEnabled) { + final StreamsConfig config = new StreamsConfig(configProps(false, processingThreadsEnabled)); internalTopologyBuilder.addSource(null, "source", null, null, null, topic1); clientSupplier.setCluster(createCluster()); @@ -1978,9 +1980,9 @@ public void shouldReturnActiveTaskMetadataWhileRunningState(final boolean stateU } @ParameterizedTest - @MethodSource("data") - public void shouldReturnStandbyTaskMetadataWhileRunningState(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { - final StreamsConfig config = new StreamsConfig(configProps(false, stateUpdaterEnabled, processingThreadsEnabled)); + @ValueSource(booleans = {true, false}) + public void shouldReturnStandbyTaskMetadataWhileRunningState(final boolean processingThreadsEnabled) { + final StreamsConfig config = new StreamsConfig(configProps(false, processingThreadsEnabled)); internalStreamsBuilder.stream(Collections.singleton(topic1), consumed) .groupByKey().count(Materialized.as("count-one")); @@ -2024,195 +2026,10 @@ public void shouldReturnStandbyTaskMetadataWhileRunningState(final boolean state assertTrue(threadMetadata.activeTasks().isEmpty()); } - @SuppressWarnings("unchecked") @ParameterizedTest - @MethodSource("data") - public void shouldUpdateStandbyTask(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) throws Exception { - // Updating standby tasks on the stream thread only happens when the state updater is disabled - assumeFalse(stateUpdaterEnabled); - - final String storeName1 = "count-one"; - final String storeName2 = "table-two"; - final String changelogName1 = APPLICATION_ID + "-" + storeName1 + "-changelog"; - final String changelogName2 = APPLICATION_ID + "-" + storeName2 + "-changelog"; - final Properties props = configProps(false, stateUpdaterEnabled, processingThreadsEnabled); - final StreamsConfig config = new StreamsConfig(props); - thread = createStreamThread(CLIENT_ID, config); - final MockConsumer restoreConsumer = clientSupplier.restoreConsumer; - - setupThread(storeName1, storeName2, changelogName1, changelogName2, restoreConsumer, false); - - runOnce(processingThreadsEnabled); - - final StandbyTask standbyTask1 = standbyTask(thread.taskManager(), t1p1); - final StandbyTask standbyTask2 = standbyTask(thread.taskManager(), t2p1); - assertEquals(task1, standbyTask1.id()); - assertEquals(task3, standbyTask2.id()); - - final KeyValueStore store1 = (KeyValueStore) standbyTask1.store(storeName1); - final KeyValueStore store2 = (KeyValueStore) standbyTask2.store(storeName2); - - assertEquals(0L, store1.approximateNumEntries()); - assertEquals(0L, store2.approximateNumEntries()); - - addStandbyRecordsToRestoreConsumer(restoreConsumer); - - runOnce(processingThreadsEnabled); - - assertEquals(10L, store1.approximateNumEntries()); - assertEquals(4L, store2.approximateNumEntries()); - } - - private void addActiveRecordsToRestoreConsumer(final MockConsumer restoreConsumer) { - for (long i = 0L; i < 10L; i++) { - restoreConsumer.addRecord(new ConsumerRecord<>( - STREAM_THREAD_TEST_COUNT_ONE_CHANGELOG, - 2, - i, - ("K" + i).getBytes(), - ("V" + i).getBytes())); - } - } - - private void addStandbyRecordsToRestoreConsumer(final MockConsumer restoreConsumer) { - // let the store1 be restored from 0 to 10; store2 be restored from 5 (checkpointed) to 10 - for (long i = 0L; i < 10L; i++) { - restoreConsumer.addRecord(new ConsumerRecord<>( - STREAM_THREAD_TEST_COUNT_ONE_CHANGELOG, - 1, - i, - ("K" + i).getBytes(), - ("V" + i).getBytes())); - restoreConsumer.addRecord(new ConsumerRecord<>( - STREAM_THREAD_TEST_TABLE_TWO_CHANGELOG, - 1, - i, - ("K" + i).getBytes(), - ("V" + i).getBytes())); - } - } - - private void setupThread(final String storeName1, - final String storeName2, - final String changelogName1, - final String changelogName2, - final MockConsumer restoreConsumer, - final boolean addActiveTask) throws IOException { - final TopicPartition activePartition = new TopicPartition(changelogName1, 2); - final TopicPartition partition1 = new TopicPartition(changelogName1, 1); - final TopicPartition partition2 = new TopicPartition(changelogName2, 1); - - internalStreamsBuilder - .stream(Collections.singleton(topic1), consumed) - .groupByKey() - .count(Materialized.as(storeName1)); - final MaterializedInternal> materialized - = new MaterializedInternal<>(Materialized.as(storeName2), internalStreamsBuilder, ""); - internalStreamsBuilder.table(topic2, new ConsumedInternal<>(Consumed.with(null, null)), materialized); - - internalStreamsBuilder.buildAndOptimizeTopology(); - restoreConsumer.updatePartitions(changelogName1, - Collections.singletonList(new PartitionInfo(changelogName1, 1, null, new Node[0], new Node[0])) - ); - - restoreConsumer.updateEndOffsets(Collections.singletonMap(activePartition, 10L)); - restoreConsumer.updateBeginningOffsets(Collections.singletonMap(activePartition, 0L)); - ((MockAdminClient) (thread.adminClient())).updateBeginningOffsets(Collections.singletonMap(activePartition, 0L)); - ((MockAdminClient) (thread.adminClient())).updateEndOffsets(Collections.singletonMap(activePartition, 10L)); - - restoreConsumer.updateEndOffsets(Collections.singletonMap(partition1, 10L)); - restoreConsumer.updateBeginningOffsets(Collections.singletonMap(partition1, 0L)); - restoreConsumer.updateEndOffsets(Collections.singletonMap(partition2, 10L)); - restoreConsumer.updateBeginningOffsets(Collections.singletonMap(partition2, 0L)); - final OffsetCheckpoint checkpoint - = new OffsetCheckpoint(new File(stateDirectory.getOrCreateDirectoryForTask(task3), CHECKPOINT_FILE_NAME)); - checkpoint.write(Collections.singletonMap(partition2, 5L)); - - thread.setState(StreamThread.State.STARTING); - thread.rebalanceListener().onPartitionsRevoked(Collections.emptySet()); - - final Map> activeTasks = new HashMap<>(); - final Map> standbyTasks = new HashMap<>(); - - if (addActiveTask) { - activeTasks.put(task2, Collections.singleton(t1p2)); - } - - // assign single partition - standbyTasks.put(task1, Collections.singleton(t1p1)); - standbyTasks.put(task3, Collections.singleton(t2p1)); - - thread.taskManager().handleAssignment(activeTasks, standbyTasks); - thread.taskManager().tryToCompleteRestoration(mockTime.milliseconds(), null); - - thread.rebalanceListener().onPartitionsAssigned(Collections.emptyList()); - } - - @SuppressWarnings("unchecked") - @ParameterizedTest - @MethodSource("data") - public void shouldNotUpdateStandbyTaskWhenPaused(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) throws Exception { - // Updating standby tasks on the stream thread only happens when the state updater is disabled - assumeFalse(stateUpdaterEnabled); - - final StreamsConfig config = new StreamsConfig(configProps(false, stateUpdaterEnabled, processingThreadsEnabled)); - final String storeName1 = "count-one"; - final String storeName2 = "table-two"; - final String changelogName1 = APPLICATION_ID + "-" + storeName1 + "-changelog"; - final String changelogName2 = APPLICATION_ID + "-" + storeName2 + "-changelog"; - thread = createStreamThread(CLIENT_ID, config); - final MockConsumer restoreConsumer = clientSupplier.restoreConsumer; - - setupThread(storeName1, storeName2, changelogName1, changelogName2, restoreConsumer, true); - - runOnce(processingThreadsEnabled); - - final StreamTask activeTask1 = activeTask(thread.taskManager(), t1p2); - final StandbyTask standbyTask1 = standbyTask(thread.taskManager(), t1p1); - final StandbyTask standbyTask2 = standbyTask(thread.taskManager(), t2p1); - assertEquals(task1, standbyTask1.id()); - assertEquals(task3, standbyTask2.id()); - - final KeyValueStore activeStore = (KeyValueStore) activeTask1.store(storeName1); - - final KeyValueStore store1 = (KeyValueStore) standbyTask1.store(storeName1); - final KeyValueStore store2 = (KeyValueStore) standbyTask2.store(storeName2); - - assertEquals(0L, activeStore.approximateNumEntries()); - assertEquals(0L, store1.approximateNumEntries()); - assertEquals(0L, store2.approximateNumEntries()); - - // Add some records that the active task would handle - addActiveRecordsToRestoreConsumer(restoreConsumer); - // let the store1 be restored from 0 to 10; store2 be restored from 5 (checkpointed) to 10 - addStandbyRecordsToRestoreConsumer(restoreConsumer); - - // Simulate pause - thread.taskManager().topologyMetadata().pauseTopology(TopologyMetadata.UNNAMED_TOPOLOGY); - runOnce(processingThreadsEnabled); - - assertEquals(0L, activeStore.approximateNumEntries()); - assertEquals(0L, store1.approximateNumEntries()); - assertEquals(0L, store2.approximateNumEntries()); - - // Simulate resume - thread.taskManager().topologyMetadata().resumeTopology(TopologyMetadata.UNNAMED_TOPOLOGY); - runOnce(processingThreadsEnabled); - - assertEquals(10L, activeStore.approximateNumEntries()); - assertEquals(0L, store1.approximateNumEntries()); - assertEquals(0L, store2.approximateNumEntries()); - - runOnce(processingThreadsEnabled); - assertEquals(10L, activeStore.approximateNumEntries()); - assertEquals(10L, store1.approximateNumEntries()); - assertEquals(4L, store2.approximateNumEntries()); - } - - @ParameterizedTest - @MethodSource("data") - public void shouldCreateStandbyTask(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { - final StreamsConfig config = new StreamsConfig(configProps(false, stateUpdaterEnabled, processingThreadsEnabled)); + @ValueSource(booleans = {true, false}) + public void shouldCreateStandbyTask(final boolean processingThreadsEnabled) { + final StreamsConfig config = new StreamsConfig(configProps(false, processingThreadsEnabled)); setupInternalTopologyWithoutState(config); internalTopologyBuilder.addStateStore(new MockKeyValueStoreBuilder("myStore", true), "processor1"); @@ -2220,18 +2037,18 @@ public void shouldCreateStandbyTask(final boolean stateUpdaterEnabled, final boo } @ParameterizedTest - @MethodSource("data") - public void shouldNotCreateStandbyTaskWithoutStateStores(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { - final StreamsConfig config = new StreamsConfig(configProps(false, stateUpdaterEnabled, processingThreadsEnabled)); + @ValueSource(booleans = {true, false}) + public void shouldNotCreateStandbyTaskWithoutStateStores(final boolean processingThreadsEnabled) { + final StreamsConfig config = new StreamsConfig(configProps(false, processingThreadsEnabled)); setupInternalTopologyWithoutState(config); assertThat(createStandbyTask(config), empty()); } @ParameterizedTest - @MethodSource("data") - public void shouldNotCreateStandbyTaskIfStateStoresHaveLoggingDisabled(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { - final StreamsConfig config = new StreamsConfig(configProps(false, stateUpdaterEnabled, processingThreadsEnabled)); + @ValueSource(booleans = {true, false}) + public void shouldNotCreateStandbyTaskIfStateStoresHaveLoggingDisabled(final boolean processingThreadsEnabled) { + final StreamsConfig config = new StreamsConfig(configProps(false, processingThreadsEnabled)); setupInternalTopologyWithoutState(config); final StoreBuilder> storeBuilder = new MockKeyValueStoreBuilder("myStore", true); @@ -2242,9 +2059,9 @@ public void shouldNotCreateStandbyTaskIfStateStoresHaveLoggingDisabled(final boo } @ParameterizedTest - @MethodSource("data") + @ValueSource(booleans = {true, false}) @SuppressWarnings("deprecation") - public void shouldPunctuateActiveTask(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { + public void shouldPunctuateActiveTask(final boolean processingThreadsEnabled) { assumeFalse(processingThreadsEnabled); final List punctuatedStreamTime = new ArrayList<>(); @@ -2264,7 +2081,7 @@ public void process(final Record record) {} internalStreamsBuilder.stream(Collections.singleton(topic1), consumed).process(punctuateProcessor); internalStreamsBuilder.buildAndOptimizeTopology(); - final StreamsConfig config = new StreamsConfig(configProps(false, stateUpdaterEnabled, processingThreadsEnabled)); + final StreamsConfig config = new StreamsConfig(configProps(false, processingThreadsEnabled)); thread = createStreamThread(CLIENT_ID, config); thread.setState(StreamThread.State.STARTING); @@ -2318,8 +2135,8 @@ public void process(final Record record) {} } @ParameterizedTest - @MethodSource("data") - public void shouldPunctuateWithTimestampPreservedInProcessorContext(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { + @ValueSource(booleans = {true, false}) + public void shouldPunctuateWithTimestampPreservedInProcessorContext(final boolean processingThreadsEnabled) { assumeFalse(processingThreadsEnabled); final ProcessorSupplier punctuateProcessor = () -> new Processor<>() { @@ -2345,7 +2162,7 @@ public void process(final Record record) {} internalStreamsBuilder.buildAndOptimizeTopology(); final long currTime = mockTime.milliseconds(); - thread = createStreamThread(CLIENT_ID, stateUpdaterEnabled, processingThreadsEnabled); + thread = createStreamThread(CLIENT_ID, processingThreadsEnabled); thread.setState(StreamThread.State.STARTING); thread.taskManager().init(); @@ -2393,9 +2210,9 @@ public void process(final Record record) {} } @ParameterizedTest - @MethodSource("data") - public void shouldAlwaysUpdateTasksMetadataAfterChangingState(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { - final StreamsConfig config = new StreamsConfig(configProps(false, stateUpdaterEnabled, processingThreadsEnabled)); + @ValueSource(booleans = {true, false}) + public void shouldAlwaysUpdateTasksMetadataAfterChangingState(final boolean processingThreadsEnabled) { + final StreamsConfig config = new StreamsConfig(configProps(false, processingThreadsEnabled)); thread = createStreamThread(CLIENT_ID, config); ThreadMetadata metadata = thread.threadMetadata(); assertEquals(StreamThread.State.CREATED.name(), metadata.threadState()); @@ -2409,14 +2226,14 @@ public void shouldAlwaysUpdateTasksMetadataAfterChangingState(final boolean stat } @ParameterizedTest - @MethodSource("data") - public void shouldRecoverFromInvalidOffsetExceptionOnRestoreAndFinishRestore(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) throws Exception { + @ValueSource(booleans = {true, false}) + public void shouldRecoverFromInvalidOffsetExceptionOnRestoreAndFinishRestore(final boolean processingThreadsEnabled) throws Exception { internalStreamsBuilder.stream(Collections.singleton("topic"), consumed) .groupByKey() .count(Materialized.as("count")); internalStreamsBuilder.buildAndOptimizeTopology(); - thread = createStreamThread("clientId", new MockTime(1), stateUpdaterEnabled, processingThreadsEnabled); + thread = createStreamThread("clientId", new MockTime(1), processingThreadsEnabled); final MockConsumer mockConsumer = (MockConsumer) thread.mainConsumer(); final MockConsumer mockRestoreConsumer = (MockConsumer) thread.restoreConsumer(); final MockAdminClient mockAdminClient = (MockAdminClient) thread.adminClient(); @@ -2518,26 +2335,18 @@ public Set partitions() { "K2".getBytes(), "V2".getBytes())); - if (stateUpdaterEnabled) { - TestUtils.waitForCondition( - () -> mockRestoreConsumer.assignment().isEmpty(), - "Never get the assignment"); - } else { - TestUtils.waitForCondition( - () -> { - mockRestoreConsumer.assign(changelogPartitionSet); - return mockRestoreConsumer.position(changelogPartition) == 2L; - }, - "Never finished restore"); - } + TestUtils.waitForCondition( + () -> mockRestoreConsumer.assignment().isEmpty(), + "Never get the assignment"); + } @ParameterizedTest - @MethodSource("data") - public void shouldLogAndRecordSkippedMetricForDeserializationException(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { + @ValueSource(booleans = {true, false}) + public void shouldLogAndRecordSkippedMetricForDeserializationException(final boolean processingThreadsEnabled) { internalTopologyBuilder.addSource(null, "source1", null, null, null, topic1); - final Properties properties = configProps(false, stateUpdaterEnabled, processingThreadsEnabled); + final Properties properties = configProps(false, processingThreadsEnabled); properties.setProperty( StreamsConfig.DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class.getName() @@ -2599,9 +2408,9 @@ public void shouldLogAndRecordSkippedMetricForDeserializationException(final boo } @ParameterizedTest - @MethodSource("data") - public void shouldThrowTaskMigratedExceptionHandlingTaskLost(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { - final StreamsConfig config = new StreamsConfig(configProps(false, stateUpdaterEnabled, processingThreadsEnabled)); + @ValueSource(booleans = {true, false}) + public void shouldThrowTaskMigratedExceptionHandlingTaskLost(final boolean processingThreadsEnabled) { + final StreamsConfig config = new StreamsConfig(configProps(false, processingThreadsEnabled)); final Set assignedPartitions = Collections.singleton(t1p1); final TaskManager taskManager = mock(TaskManager.class); @@ -2627,9 +2436,9 @@ public void shouldThrowTaskMigratedExceptionHandlingTaskLost(final boolean state } @ParameterizedTest - @MethodSource("data") - public void shouldThrowTaskMigratedExceptionHandlingRevocation(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { - final StreamsConfig config = new StreamsConfig(configProps(false, stateUpdaterEnabled, processingThreadsEnabled)); + @ValueSource(booleans = {true, false}) + public void shouldThrowTaskMigratedExceptionHandlingRevocation(final boolean processingThreadsEnabled) { + final StreamsConfig config = new StreamsConfig(configProps(false, processingThreadsEnabled)); final Set assignedPartitions = Collections.singleton(t1p1); final TaskManager taskManager = mock(TaskManager.class); @@ -2655,10 +2464,10 @@ public void shouldThrowTaskMigratedExceptionHandlingRevocation(final boolean sta } @ParameterizedTest - @MethodSource("data") + @ValueSource(booleans = {true, false}) @SuppressWarnings("unchecked") - public void shouldCatchHandleCorruptionOnTaskCorruptedExceptionPath(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { - final StreamsConfig config = new StreamsConfig(configProps(false, stateUpdaterEnabled, processingThreadsEnabled)); + public void shouldCatchHandleCorruptionOnTaskCorruptedExceptionPath(final boolean processingThreadsEnabled) { + final StreamsConfig config = new StreamsConfig(configProps(false, processingThreadsEnabled)); final TaskManager taskManager = mock(TaskManager.class); final Consumer consumer = mock(Consumer.class); final ConsumerGroupMetadata consumerGroupMetadata = mock(ConsumerGroupMetadata.class); @@ -2716,10 +2525,10 @@ void runOnceWithoutProcessingThreads() { } @ParameterizedTest - @MethodSource("data") + @ValueSource(booleans = {true, false}) @SuppressWarnings("unchecked") - public void shouldCatchTimeoutExceptionFromHandleCorruptionAndInvokeExceptionHandler(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { - final StreamsConfig config = new StreamsConfig(configProps(false, stateUpdaterEnabled, processingThreadsEnabled)); + public void shouldCatchTimeoutExceptionFromHandleCorruptionAndInvokeExceptionHandler(final boolean processingThreadsEnabled) { + final StreamsConfig config = new StreamsConfig(configProps(false, processingThreadsEnabled)); final TaskManager taskManager = mock(TaskManager.class); final Consumer consumer = mock(Consumer.class); final ConsumerGroupMetadata consumerGroupMetadata = mock(ConsumerGroupMetadata.class); @@ -2783,10 +2592,10 @@ void runOnceWithoutProcessingThreads() { } @ParameterizedTest - @MethodSource("data") + @ValueSource(booleans = {true, false}) @SuppressWarnings("unchecked") - public void shouldCatchTaskMigratedExceptionOnOnTaskCorruptedExceptionPath(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { - final StreamsConfig config = new StreamsConfig(configProps(false, stateUpdaterEnabled, processingThreadsEnabled)); + public void shouldCatchTaskMigratedExceptionOnOnTaskCorruptedExceptionPath(final boolean processingThreadsEnabled) { + final StreamsConfig config = new StreamsConfig(configProps(false, processingThreadsEnabled)); final TaskManager taskManager = mock(TaskManager.class); final Consumer consumer = mock(Consumer.class); final ConsumerGroupMetadata consumerGroupMetadata = mock(ConsumerGroupMetadata.class); @@ -2849,10 +2658,10 @@ void runOnceWithoutProcessingThreads() { } @ParameterizedTest - @MethodSource("data") + @ValueSource(booleans = {true, false}) @SuppressWarnings("unchecked") - public void shouldEnforceRebalanceWhenTaskCorruptedExceptionIsThrownForAnActiveTask(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { - final StreamsConfig config = new StreamsConfig(configProps(true, stateUpdaterEnabled, processingThreadsEnabled)); + public void shouldEnforceRebalanceWhenTaskCorruptedExceptionIsThrownForAnActiveTask(final boolean processingThreadsEnabled) { + final StreamsConfig config = new StreamsConfig(configProps(true, processingThreadsEnabled)); final TaskManager taskManager = mock(TaskManager.class); final Consumer consumer = mock(Consumer.class); final ConsumerGroupMetadata consumerGroupMetadata = mock(ConsumerGroupMetadata.class); @@ -2914,10 +2723,10 @@ void runOnceWithoutProcessingThreads() { } @ParameterizedTest - @MethodSource("data") + @ValueSource(booleans = {true, false}) @SuppressWarnings("unchecked") - public void shouldNotEnforceRebalanceWhenTaskCorruptedExceptionIsThrownForAnInactiveTask(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { - final StreamsConfig config = new StreamsConfig(configProps(true, stateUpdaterEnabled, processingThreadsEnabled)); + public void shouldNotEnforceRebalanceWhenTaskCorruptedExceptionIsThrownForAnInactiveTask(final boolean processingThreadsEnabled) { + final StreamsConfig config = new StreamsConfig(configProps(true, processingThreadsEnabled)); final TaskManager taskManager = mock(TaskManager.class); final Consumer consumer = mock(Consumer.class); final ConsumerGroupMetadata consumerGroupMetadata = mock(ConsumerGroupMetadata.class); @@ -2977,9 +2786,9 @@ void runOnceWithoutProcessingThreads() { } @ParameterizedTest - @MethodSource("data") - public void shouldNotCommitNonRunningNonRestoringTasks(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { - final StreamsConfig config = new StreamsConfig(configProps(false, stateUpdaterEnabled, processingThreadsEnabled)); + @ValueSource(booleans = {true, false}) + public void shouldNotCommitNonRunningNonRestoringTasks(final boolean processingThreadsEnabled) { + final StreamsConfig config = new StreamsConfig(configProps(false, processingThreadsEnabled)); final TaskManager taskManager = mock(TaskManager.class); final ConsumerGroupMetadata consumerGroupMetadata = mock(ConsumerGroupMetadata.class); when(consumer.groupMetadata()).thenReturn(consumerGroupMetadata); @@ -3016,15 +2825,14 @@ public void shouldNotCommitNonRunningNonRestoringTasks(final boolean stateUpdate } @ParameterizedTest - @MethodSource("data") + @ValueSource(booleans = {true, false}) public void shouldLogAndRecordSkippedRecordsForInvalidTimestamps( - final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled ) throws Exception { internalTopologyBuilder.addSource(null, "source1", null, null, null, topic1); - final Properties properties = configProps(false, stateUpdaterEnabled, processingThreadsEnabled); + final Properties properties = configProps(false, processingThreadsEnabled); properties.setProperty( StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, LogAndSkipOnInvalidTimestamp.class.getName() @@ -3123,9 +2931,9 @@ private void waitForCommit(final MockConsumer mockConsumer, fina } @ParameterizedTest - @MethodSource("data") - public void shouldTransmitTaskManagerMetrics(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { - final StreamsConfig config = new StreamsConfig(configProps(false, stateUpdaterEnabled, processingThreadsEnabled)); + @ValueSource(booleans = {true, false}) + public void shouldTransmitTaskManagerMetrics(final boolean processingThreadsEnabled) { + final StreamsConfig config = new StreamsConfig(configProps(false, processingThreadsEnabled)); final ConsumerGroupMetadata consumerGroupMetadata = mock(ConsumerGroupMetadata.class); when(consumer.groupMetadata()).thenReturn(consumerGroupMetadata); when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty()); @@ -3150,9 +2958,9 @@ public void shouldTransmitTaskManagerMetrics(final boolean stateUpdaterEnabled, } @ParameterizedTest - @MethodSource("data") - public void shouldConstructAdminMetrics(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { - final StreamsConfig config = new StreamsConfig(configProps(false, stateUpdaterEnabled, processingThreadsEnabled)); + @ValueSource(booleans = {true, false}) + public void shouldConstructAdminMetrics(final boolean processingThreadsEnabled) { + final StreamsConfig config = new StreamsConfig(configProps(false, processingThreadsEnabled)); final Node broker1 = new Node(0, "dummyHost-1", 1234); final Node broker2 = new Node(1, "dummyHost-2", 1234); final List cluster = Arrays.asList(broker1, broker2); @@ -3207,19 +3015,19 @@ public void shouldConstructAdminMetrics(final boolean stateUpdaterEnabled, final } @ParameterizedTest - @MethodSource("data") - public void shouldNotRecordFailedStreamThread(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { - runAndVerifyFailedStreamThreadRecording(false, stateUpdaterEnabled, processingThreadsEnabled); + @ValueSource(booleans = {true, false}) + public void shouldNotRecordFailedStreamThread(final boolean processingThreadsEnabled) { + runAndVerifyFailedStreamThreadRecording(false, processingThreadsEnabled); } @ParameterizedTest - @MethodSource("data") - public void shouldRecordFailedStreamThread(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { - runAndVerifyFailedStreamThreadRecording(true, stateUpdaterEnabled, processingThreadsEnabled); + @ValueSource(booleans = {true, false}) + public void shouldRecordFailedStreamThread(final boolean processingThreadsEnabled) { + runAndVerifyFailedStreamThreadRecording(true, processingThreadsEnabled); } - public void runAndVerifyFailedStreamThreadRecording(final boolean shouldFail, final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { - final StreamsConfig config = new StreamsConfig(configProps(false, stateUpdaterEnabled, processingThreadsEnabled)); + public void runAndVerifyFailedStreamThreadRecording(final boolean shouldFail, final boolean processingThreadsEnabled) { + final StreamsConfig config = new StreamsConfig(configProps(false, processingThreadsEnabled)); final ConsumerGroupMetadata consumerGroupMetadata = mock(ConsumerGroupMetadata.class); when(consumer.groupMetadata()).thenReturn(consumerGroupMetadata); when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty()); @@ -3276,10 +3084,9 @@ void runOnceWithoutProcessingThreads() { } @ParameterizedTest - @MethodSource("data") - public void shouldCheckStateUpdater(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { - assumeTrue(stateUpdaterEnabled); - final Properties streamsConfigProps = configProps(false, stateUpdaterEnabled, processingThreadsEnabled); + @ValueSource(booleans = {true, false}) + public void shouldCheckStateUpdater(final boolean processingThreadsEnabled) { + final Properties streamsConfigProps = configProps(false, processingThreadsEnabled); thread = setUpThread(streamsConfigProps); final TaskManager taskManager = thread.taskManager(); thread.setState(State.STARTING); @@ -3294,12 +3101,11 @@ public void shouldCheckStateUpdater(final boolean stateUpdaterEnabled, final boo } @ParameterizedTest - @MethodSource("data") - public void shouldCheckStateUpdaterInBetweenProcessCalls(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { - assumeTrue(stateUpdaterEnabled); + @ValueSource(booleans = {true, false}) + public void shouldCheckStateUpdaterInBetweenProcessCalls(final boolean processingThreadsEnabled) { assumeFalse(processingThreadsEnabled); - final Properties streamsConfigProps = configProps(false, stateUpdaterEnabled, processingThreadsEnabled); + final Properties streamsConfigProps = configProps(false, processingThreadsEnabled); thread = setUpThread(streamsConfigProps); final TaskManager taskManager = thread.taskManager(); thread.setState(State.STARTING); @@ -3312,9 +3118,9 @@ public void shouldCheckStateUpdaterInBetweenProcessCalls(final boolean stateUpda } @ParameterizedTest - @MethodSource("data") - public void shouldUpdateLagsAfterPolling(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { - final Properties streamsConfigProps = configProps(false, stateUpdaterEnabled, processingThreadsEnabled); + @ValueSource(booleans = {true, false}) + public void shouldUpdateLagsAfterPolling(final boolean processingThreadsEnabled) { + final Properties streamsConfigProps = configProps(false, processingThreadsEnabled); thread = setUpThread(streamsConfigProps); thread.setState(State.STARTING); thread.setState(State.PARTITIONS_ASSIGNED); @@ -3330,9 +3136,9 @@ public void shouldUpdateLagsAfterPolling(final boolean stateUpdaterEnabled, fina @ParameterizedTest - @MethodSource("data") - public void shouldResumePollingForPartitionsWithAvailableSpaceBeforePolling(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { - final Properties streamsConfigProps = configProps(false, stateUpdaterEnabled, processingThreadsEnabled); + @ValueSource(booleans = {true, false}) + public void shouldResumePollingForPartitionsWithAvailableSpaceBeforePolling(final boolean processingThreadsEnabled) { + final Properties streamsConfigProps = configProps(false, processingThreadsEnabled); thread = setUpThread(streamsConfigProps); thread.setState(State.STARTING); thread.setState(State.PARTITIONS_ASSIGNED); @@ -3345,10 +3151,9 @@ public void shouldResumePollingForPartitionsWithAvailableSpaceBeforePolling(fina } @ParameterizedTest - @MethodSource("data") - public void shouldRespectPollTimeInPartitionsAssignedStateWithStateUpdater(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { - assumeTrue(stateUpdaterEnabled); - final Properties streamsConfigProps = configProps(false, stateUpdaterEnabled, processingThreadsEnabled); + @ValueSource(booleans = {true, false}) + public void shouldRespectPollTimeInPartitionsAssignedStateWithStateUpdater(final boolean processingThreadsEnabled) { + final Properties streamsConfigProps = configProps(false, processingThreadsEnabled); final StreamsConfig config = new StreamsConfig(streamsConfigProps); final Duration pollTime = Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG)); thread = setUpThread(streamsConfigProps); @@ -3361,32 +3166,18 @@ public void shouldRespectPollTimeInPartitionsAssignedStateWithStateUpdater(final } @ParameterizedTest - @MethodSource("data") - public void shouldNotBlockWhenPollingInPartitionsAssignedStateWithoutStateUpdater(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { - assumeFalse(stateUpdaterEnabled); - final Properties streamsConfigProps = configProps(false, stateUpdaterEnabled, processingThreadsEnabled); - thread = setUpThread(streamsConfigProps); - thread.setState(State.STARTING); - thread.setState(State.PARTITIONS_ASSIGNED); - - runOnce(processingThreadsEnabled); - - Mockito.verify(mainConsumer).poll(Duration.ZERO); - } - - @ParameterizedTest - @MethodSource("data") - public void shouldGetMainAndRestoreConsumerInstanceId(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) throws Exception { - getClientInstanceId(false, stateUpdaterEnabled, processingThreadsEnabled); + @ValueSource(booleans = {true, false}) + public void shouldGetMainAndRestoreConsumerInstanceId(final boolean processingThreadsEnabled) throws Exception { + getClientInstanceId(false, processingThreadsEnabled); } @ParameterizedTest - @MethodSource("data") - public void shouldGetMainAndRestoreConsumerInstanceIdWithInternalTimeout(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) throws Exception { - getClientInstanceId(true, stateUpdaterEnabled, processingThreadsEnabled); + @ValueSource(booleans = {true, false}) + public void shouldGetMainAndRestoreConsumerInstanceIdWithInternalTimeout(final boolean processingThreadsEnabled) throws Exception { + getClientInstanceId(true, processingThreadsEnabled); } - private void getClientInstanceId(final boolean injectTimeException, final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) throws Exception { + private void getClientInstanceId(final boolean injectTimeException, final boolean processingThreadsEnabled) throws Exception { final Uuid consumerInstanceId = Uuid.randomUuid(); clientSupplier.consumer.setClientInstanceId(consumerInstanceId); if (injectTimeException) { @@ -3405,7 +3196,7 @@ private void getClientInstanceId(final boolean injectTimeException, final boolea } clientSupplier.prepareProducer(producer); - thread = createStreamThread("clientId", stateUpdaterEnabled, processingThreadsEnabled); + thread = createStreamThread("clientId", processingThreadsEnabled); thread.setState(State.STARTING); thread.taskManager().init(); @@ -3428,9 +3219,9 @@ private void getClientInstanceId(final boolean injectTimeException, final boolea } @ParameterizedTest - @MethodSource("data") - public void shouldReturnErrorIfMainConsumerInstanceIdNotInitialized(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { - thread = createStreamThread("clientId", stateUpdaterEnabled, processingThreadsEnabled); + @ValueSource(booleans = {true, false}) + public void shouldReturnErrorIfMainConsumerInstanceIdNotInitialized(final boolean processingThreadsEnabled) { + thread = createStreamThread("clientId", processingThreadsEnabled); thread.setState(State.STARTING); thread.taskManager().init(); @@ -3445,9 +3236,9 @@ public void shouldReturnErrorIfMainConsumerInstanceIdNotInitialized(final boolea } @ParameterizedTest - @MethodSource("data") - public void shouldReturnErrorIfRestoreConsumerInstanceIdNotInitialized(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { - thread = createStreamThread("clientId", stateUpdaterEnabled, processingThreadsEnabled); + @ValueSource(booleans = {true, false}) + public void shouldReturnErrorIfRestoreConsumerInstanceIdNotInitialized(final boolean processingThreadsEnabled) { + thread = createStreamThread("clientId", processingThreadsEnabled); thread.setState(State.STARTING); thread.taskManager().init(); @@ -3462,9 +3253,9 @@ public void shouldReturnErrorIfRestoreConsumerInstanceIdNotInitialized(final boo } @ParameterizedTest - @MethodSource("data") - public void shouldReturnErrorIfProducerInstanceIdNotInitialized(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { - thread = createStreamThread("clientId", stateUpdaterEnabled, processingThreadsEnabled); + @ValueSource(booleans = {true, false}) + public void shouldReturnErrorIfProducerInstanceIdNotInitialized(final boolean processingThreadsEnabled) { + thread = createStreamThread("clientId", processingThreadsEnabled); thread.setState(State.STARTING); thread.taskManager().init(); @@ -3479,10 +3270,10 @@ public void shouldReturnErrorIfProducerInstanceIdNotInitialized(final boolean st } @ParameterizedTest - @MethodSource("data") - public void shouldReturnNullIfMainConsumerTelemetryDisabled(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) throws Exception { + @ValueSource(booleans = {true, false}) + public void shouldReturnNullIfMainConsumerTelemetryDisabled(final boolean processingThreadsEnabled) throws Exception { clientSupplier.consumer.disableTelemetry(); - thread = createStreamThread("clientId", stateUpdaterEnabled, processingThreadsEnabled); + thread = createStreamThread("clientId", processingThreadsEnabled); thread.setState(State.STARTING); thread.taskManager().init(); @@ -3496,11 +3287,11 @@ public void shouldReturnNullIfMainConsumerTelemetryDisabled(final boolean stateU } @ParameterizedTest - @MethodSource("data") - public void shouldReturnNullIfRestoreConsumerTelemetryDisabled(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) throws Exception { + @ValueSource(booleans = {true, false}) + public void shouldReturnNullIfRestoreConsumerTelemetryDisabled(final boolean processingThreadsEnabled) throws Exception { clientSupplier.restoreConsumer.disableTelemetry(); - thread = createStreamThread("clientId", stateUpdaterEnabled, processingThreadsEnabled); + thread = createStreamThread("clientId", processingThreadsEnabled); thread.setState(State.STARTING); thread.taskManager().init(); @@ -3514,13 +3305,13 @@ public void shouldReturnNullIfRestoreConsumerTelemetryDisabled(final boolean sta } @ParameterizedTest - @MethodSource("data") - public void shouldReturnNullIfProducerTelemetryDisabled(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) throws Exception { + @ValueSource(booleans = {true, false}) + public void shouldReturnNullIfProducerTelemetryDisabled(final boolean processingThreadsEnabled) throws Exception { final MockProducer producer = new MockProducer<>(); producer.disableTelemetry(); clientSupplier.prepareProducer(producer); - thread = createStreamThread("clientId", stateUpdaterEnabled, processingThreadsEnabled); + thread = createStreamThread("clientId", processingThreadsEnabled); thread.setState(State.STARTING); thread.taskManager().init(); @@ -3534,11 +3325,11 @@ public void shouldReturnNullIfProducerTelemetryDisabled(final boolean stateUpdat } @ParameterizedTest - @MethodSource("data") - public void shouldTimeOutOnMainConsumerInstanceId(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { + @ValueSource(booleans = {true, false}) + public void shouldTimeOutOnMainConsumerInstanceId(final boolean processingThreadsEnabled) { clientSupplier.consumer.setClientInstanceId(Uuid.randomUuid()); clientSupplier.consumer.injectTimeoutException(-1); - thread = createStreamThread("clientId", stateUpdaterEnabled, processingThreadsEnabled); + thread = createStreamThread("clientId", processingThreadsEnabled); thread.setState(State.STARTING); thread.taskManager().init(); @@ -3559,11 +3350,11 @@ public void shouldTimeOutOnMainConsumerInstanceId(final boolean stateUpdaterEnab @ParameterizedTest - @MethodSource("data") - public void shouldTimeOutOnRestoreConsumerInstanceId(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { + @ValueSource(booleans = {true, false}) + public void shouldTimeOutOnRestoreConsumerInstanceId(final boolean processingThreadsEnabled) { clientSupplier.restoreConsumer.setClientInstanceId(Uuid.randomUuid()); clientSupplier.restoreConsumer.injectTimeoutException(-1); - thread = createStreamThread("clientId", stateUpdaterEnabled, processingThreadsEnabled); + thread = createStreamThread("clientId", processingThreadsEnabled); thread.setState(State.STARTING); thread.taskManager().init(); @@ -3584,14 +3375,14 @@ public void shouldTimeOutOnRestoreConsumerInstanceId(final boolean stateUpdaterE } @ParameterizedTest - @MethodSource("data") - public void shouldTimeOutOnProducerInstanceId(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) { + @ValueSource(booleans = {true, false}) + public void shouldTimeOutOnProducerInstanceId(final boolean processingThreadsEnabled) { final MockProducer producer = new MockProducer<>(); producer.setClientInstanceId(Uuid.randomUuid()); producer.injectTimeoutException(-1); clientSupplier.prepareProducer(producer); - thread = createStreamThread("clientId", stateUpdaterEnabled, processingThreadsEnabled); + thread = createStreamThread("clientId", processingThreadsEnabled); thread.setState(State.STARTING); thread.taskManager().init(); @@ -3610,10 +3401,9 @@ public void shouldTimeOutOnProducerInstanceId(final boolean stateUpdaterEnabled, ); } - @ParameterizedTest - @ValueSource(booleans = {true, false}) - public void testNamedTopologyWithStreamsProtocol(final boolean stateUpdaterEnabled) { - final Properties props = configProps(false, stateUpdaterEnabled, false); + @Test + public void testNamedTopologyWithStreamsProtocol() { + final Properties props = configProps(false, false); props.setProperty(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.toString()); final StreamsConfig config = new StreamsConfig(props); final InternalTopologyBuilder topologyBuilder = new InternalTopologyBuilder( @@ -3664,16 +3454,15 @@ public void testNamedTopologyWithStreamsProtocol(final boolean stateUpdaterEnabl @Test public void testStreamsRebalanceDataWithClassicProtocol() { - final Properties props = configProps(false, false, false); + final Properties props = configProps(false, false); props.setProperty(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.toString()); thread = createStreamThread(CLIENT_ID, new StreamsConfig(props)); assertTrue(thread.streamsRebalanceData().isEmpty()); } - @ParameterizedTest - @ValueSource(booleans = {true, false}) - public void testStreamsRebalanceDataWithExtraCopartition(final boolean stateUpdaterEnabled) { - final Properties props = configProps(false, stateUpdaterEnabled, false); + @Test + public void testStreamsRebalanceDataWithExtraCopartition() { + final Properties props = configProps(false, false); props.setProperty(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.toString()); internalTopologyBuilder.addSource(null, "source1", null, null, null, topic1); @@ -3724,7 +3513,7 @@ public void testStreamsRebalanceDataWithExtraCopartition(final boolean stateUpda @Test public void testStreamsRebalanceDataWithStreamsProtocol() { - final Properties props = configProps(false, false, false); + final Properties props = configProps(false, false); props.setProperty(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.toString()); props.setProperty(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:1234"); props.setProperty(StreamsConfig.REPLICATION_FACTOR_CONFIG, "1"); @@ -3832,7 +3621,7 @@ public void testStreamsProtocolRunOnceWithoutProcessingThreads() { ); final Runnable shutdownErrorHook = mock(Runnable.class); - final Properties props = configProps(false, false, false); + final Properties props = configProps(false, false); final StreamsMetadataState streamsMetadataState = new StreamsMetadataState( new TopologyMetadata(internalTopologyBuilder, new StreamsConfig(props)), StreamsMetadataState.UNKNOWN_HOST, @@ -3891,7 +3680,7 @@ public void testStreamsProtocolRunOnceWithoutProcessingThreadsMissingSourceTopic ); final Runnable shutdownErrorHook = mock(Runnable.class); - final Properties props = configProps(false, false, false); + final Properties props = configProps(false, false); final StreamsMetadataState streamsMetadataState = new StreamsMetadataState( new TopologyMetadata(internalTopologyBuilder, new StreamsConfig(props)), StreamsMetadataState.UNKNOWN_HOST, @@ -3958,7 +3747,7 @@ public void testStreamsProtocolIncorrectlyPartitionedTopics() { ); final Runnable shutdownErrorHook = mock(Runnable.class); - final Properties props = configProps(false, false, false); + final Properties props = configProps(false, false); final StreamsMetadataState streamsMetadataState = new StreamsMetadataState( new TopologyMetadata(internalTopologyBuilder, new StreamsConfig(props)), StreamsMetadataState.UNKNOWN_HOST, @@ -4018,7 +3807,7 @@ public void testStreamsProtocolRunOnceWithProcessingThreads() { Map.of() ); - final Properties props = configProps(false, false, false); + final Properties props = configProps(false, false); final Runnable shutdownErrorHook = mock(Runnable.class); final StreamsConfig config = new StreamsConfig(props); final StreamsMetadataState streamsMetadataState = new StreamsMetadataState( @@ -4077,7 +3866,7 @@ public void testStreamsProtocolRunOnceWithProcessingThreadsMissingSourceTopic() Map.of() ); - final Properties props = configProps(false, false, false); + final Properties props = configProps(false, false); final Runnable shutdownErrorHook = mock(Runnable.class); final StreamsConfig config = new StreamsConfig(props); final StreamsMetadataState streamsMetadataState = new StreamsMetadataState( @@ -4144,7 +3933,7 @@ public void testStreamsProtocolMissingSourceTopicRecovery() { Map.of() ); - final Properties props = configProps(false, false, false); + final Properties props = configProps(false, false); final Runnable shutdownErrorHook = mock(Runnable.class); final StreamsConfig config = new StreamsConfig(props); final StreamsMetadataState streamsMetadataState = new StreamsMetadataState( @@ -4315,10 +4104,8 @@ private Collection createStandbyTask(final StreamsConfig config) { config, streamsMetrics, stateDirectory, - new MockChangelogReader(), CLIENT_ID, - logContext, - false); + logContext); return standbyTaskCreator.createTasks(singletonMap(new TaskId(1, 2), emptySet())); } @@ -4344,25 +4131,6 @@ private void addRecord(final MockConsumer mockConsumer, Optional.empty())); } - StreamTask activeTask(final TaskManager taskManager, final TopicPartition partition) { - final Stream standbys = taskManager.allTasks().values().stream().filter(Task::isActive); - for (final Task task : (Iterable) standbys::iterator) { - if (task.inputPartitions().contains(partition)) { - return (StreamTask) task; - } - } - return null; - } - StandbyTask standbyTask(final TaskManager taskManager, final TopicPartition partition) { - final Stream standbys = taskManager.standbyTaskMap().values().stream(); - for (final Task task : (Iterable) standbys::iterator) { - if (task.inputPartitions().contains(partition)) { - return (StandbyTask) task; - } - } - return null; - } - private StreamThread buildStreamThread(final Consumer consumer, final TaskManager taskManager, final StreamsConfig config, diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java index 212a65eacddf9..34d8d27977820 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java @@ -48,7 +48,6 @@ import org.apache.kafka.streams.processor.internals.RecordCollector; import org.apache.kafka.streams.processor.internals.RecordCollectorImpl; import org.apache.kafka.streams.processor.internals.StateDirectory; -import org.apache.kafka.streams.processor.internals.StoreChangelogReader; import org.apache.kafka.streams.processor.internals.StreamTask; import org.apache.kafka.streams.processor.internals.StreamThread; import org.apache.kafka.streams.processor.internals.StreamsProducer; @@ -63,8 +62,6 @@ import org.apache.kafka.streams.state.TimestampedWindowStore; import org.apache.kafka.streams.state.ValueAndTimestamp; import org.apache.kafka.test.MockApiProcessorSupplier; -import org.apache.kafka.test.MockStandbyUpdateListener; -import org.apache.kafka.test.MockStateRestoreListener; import org.apache.kafka.test.TestUtils; import org.junit.jupiter.api.AfterEach; @@ -437,17 +434,8 @@ private StreamTask createStreamsTask(final StreamsConfig streamsConfig, StreamsConfigUtils.eosEnabled(streamsConfig), logContext, stateDirectory, - new StoreChangelogReader( - new MockTime(), - streamsConfig, - logContext, - adminClient, - restoreConsumer, - new MockStateRestoreListener(), - new MockStandbyUpdateListener()), topology.storeToChangelogTopic(), - partitions, - false); + partitions); final RecordCollector recordCollector = new RecordCollectorImpl( logContext, taskId, diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java index 81c90d043cec4..ff539380b5d53 100644 --- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java +++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java @@ -473,10 +473,8 @@ private void setupTask(final StreamsConfig streamsConfig, StreamsConfig.EXACTLY_ONCE_V2.equals(streamsConfig.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG)), logContext, stateDirectory, - new MockChangelogRegister(), processorTopology.storeToChangelogTopic(), - new HashSet<>(partitionsByInputTopic.values()), - false); + new HashSet<>(partitionsByInputTopic.values())); final RecordCollector recordCollector = new RecordCollectorImpl( logContext, TASK_ID,