Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ class ActiveTaskCreator {
private final StreamsConfig applicationConfig;
private final StreamsMetricsImpl streamsMetrics;
private final StateDirectory stateDirectory;
private final ChangelogReader storeChangelogReader;
private final ThreadCache cache;
private final Time time;
private final KafkaClientSupplier clientSupplier;
Expand All @@ -62,37 +61,32 @@ class ActiveTaskCreator {
private final Logger log;
private final Sensor createTaskSensor;
private final StreamsProducer streamsProducer;
private final boolean stateUpdaterEnabled;
private final boolean processingThreadsEnabled;
private boolean isClosed = false;

ActiveTaskCreator(final TopologyMetadata topologyMetadata,
final StreamsConfig applicationConfig,
final StreamsMetricsImpl streamsMetrics,
final StateDirectory stateDirectory,
final ChangelogReader storeChangelogReader,
final ThreadCache cache,
final Time time,
final KafkaClientSupplier clientSupplier,
final String threadId,
final int threadIdx,
final UUID processId,
final LogContext logContext,
final boolean stateUpdaterEnabled,
final boolean processingThreadsEnabled) {
this.topologyMetadata = topologyMetadata;
this.applicationConfig = applicationConfig;
this.streamsMetrics = streamsMetrics;
this.stateDirectory = stateDirectory;
this.storeChangelogReader = storeChangelogReader;
this.cache = cache;
this.time = time;
this.clientSupplier = clientSupplier;
this.threadId = threadId;
this.threadIdx = threadIdx;
this.processId = processId;
this.log = logContext.logger(getClass());
this.stateUpdaterEnabled = stateUpdaterEnabled;
this.processingThreadsEnabled = processingThreadsEnabled;

createTaskSensor = ThreadMetrics.createTaskSensor(threadId, streamsMetrics);
Expand Down Expand Up @@ -154,10 +148,8 @@ public Collection<Task> createTasks(final Consumer<byte[], byte[]> consumer,
eosEnabled(applicationConfig),
logContext,
stateDirectory,
storeChangelogReader,
topology.storeToChangelogTopic(),
partitions,
stateUpdaterEnabled);
partitions);

final InternalProcessorContext<Object, Object> context = new ProcessorContextImpl(
taskId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -170,7 +169,6 @@ public String toString() {

private final TaskId taskId;
private final boolean eosEnabled;
private ChangelogRegister changelogReader;
private final Collection<TopicPartition> sourcePartitions;
private final Map<String, String> storeToChangelogTopic;

Expand All @@ -180,7 +178,6 @@ public String toString() {

private final File baseDir;
private final OffsetCheckpoint checkpointFile;
private final boolean stateUpdaterEnabled;

private TaskType taskType;
private Logger log;
Expand All @@ -202,19 +199,15 @@ public ProcessorStateManager(final TaskId taskId,
final boolean eosEnabled,
final LogContext logContext,
final StateDirectory stateDirectory,
final ChangelogRegister changelogReader,
final Map<String, String> storeToChangelogTopic,
final Collection<TopicPartition> sourcePartitions,
final boolean stateUpdaterEnabled) throws ProcessorStateException {
final Collection<TopicPartition> sourcePartitions) throws ProcessorStateException {
this.storeToChangelogTopic = storeToChangelogTopic;
this.log = logContext.logger(ProcessorStateManager.class);
this.logPrefix = logContext.logPrefix();
this.taskId = taskId;
this.taskType = taskType;
this.eosEnabled = eosEnabled;
this.changelogReader = changelogReader;
this.sourcePartitions = sourcePartitions;
this.stateUpdaterEnabled = stateUpdaterEnabled;

this.baseDir = stateDirectory.getOrCreateDirectoryForTask(taskId);
this.checkpointFile = new OffsetCheckpoint(stateDirectory.checkpointFileFor(taskId));
Expand All @@ -225,16 +218,15 @@ public ProcessorStateManager(final TaskId taskId,
/**
* Special constructor used by {@link StateDirectory} to partially initialize startup tasks for local state, before
* they're assigned to a thread. When the task is assigned to a thread, the initialization of this StateManager is
* completed in {@link #assignToStreamThread(LogContext, ChangelogRegister, Collection)}.
* completed in {@link #assignToStreamThread(LogContext, Collection)}.
*/
static ProcessorStateManager createStartupTaskStateManager(final TaskId taskId,
final boolean eosEnabled,
final LogContext logContext,
final StateDirectory stateDirectory,
final Map<String, String> storeToChangelogTopic,
final Set<TopicPartition> sourcePartitions,
final boolean stateUpdaterEnabled) {
return new ProcessorStateManager(taskId, TaskType.STANDBY, eosEnabled, logContext, stateDirectory, null, storeToChangelogTopic, sourcePartitions, stateUpdaterEnabled);
final Set<TopicPartition> sourcePartitions) {
return new ProcessorStateManager(taskId, TaskType.STANDBY, eosEnabled, logContext, stateDirectory, storeToChangelogTopic, sourcePartitions);
}

/**
Expand All @@ -243,26 +235,17 @@ static ProcessorStateManager createStartupTaskStateManager(final TaskId taskId,
* assigned StreamThread's context.
*/
void assignToStreamThread(final LogContext logContext,
final ChangelogRegister changelogReader,
final Collection<TopicPartition> sourcePartitions) {
if (this.changelogReader != null) {
throw new IllegalStateException("Attempted to replace an existing changelogReader on a StateManager without closing it.");
}
this.sourcePartitions.clear();
this.log = logContext.logger(ProcessorStateManager.class);
this.logPrefix = logContext.logPrefix();
this.changelogReader = changelogReader;
this.sourcePartitions.addAll(sourcePartitions);
}

void registerStateStores(final List<StateStore> allStores, final InternalProcessorContext<?, ?> processorContext) {
processorContext.uninitialize();
for (final StateStore store : allStores) {
if (stores.containsKey(store.name())) {
if (!stateUpdaterEnabled) {
maybeRegisterStoreWithChangelogReader(store.name());
}
} else {
if (!stores.containsKey(store.name())) {
store.init(processorContext, store);
}
log.trace("Registered state store {}", store.name());
Expand Down Expand Up @@ -346,22 +329,6 @@ void initializeStoreOffsetsFromCheckpoint(final boolean storeDirIsEmpty) {
}
}

private void maybeRegisterStoreWithChangelogReader(final String storeName) {
if (isLoggingEnabled(storeName) && changelogReader != null) {
changelogReader.register(getStorePartition(storeName), this);
}
}

private List<TopicPartition> getAllChangelogTopicPartitions() {
final List<TopicPartition> allChangelogPartitions = new ArrayList<>();
for (final StateStoreMetadata storeMetadata : stores.values()) {
if (storeMetadata.changelogPartition != null) {
allChangelogPartitions.add(storeMetadata.changelogPartition);
}
}
return allChangelogPartitions;
}

@Override
public File baseDir() {
return baseDir;
Expand Down Expand Up @@ -404,10 +371,6 @@ public void registerStore(final StateStore store,
// on the state manager this state store would be closed as well
stores.put(storeName, storeMetadata);

if (!stateUpdaterEnabled) {
maybeRegisterStoreWithChangelogReader(storeName);
}

log.debug("Registered state store {} to its state manager", storeName);
}

Expand Down Expand Up @@ -616,10 +579,6 @@ public void flushCache() {
public void close() throws ProcessorStateException {
log.debug("Closing its state manager and all the registered state stores: {}", stores);

if (!stateUpdaterEnabled && changelogReader != null) {
changelogReader.unregister(getAllChangelogTopicPartitions());
}

RuntimeException firstException = null;
// attempting to close the stores, just in case they
// are not closed by a ProcessorNode yet
Expand Down Expand Up @@ -664,11 +623,6 @@ else if (exception instanceof StreamsException)
void recycle() {
log.debug("Recycling state for {} task {}.", taskType, taskId);

if (!stateUpdaterEnabled && changelogReader != null) {
final List<TopicPartition> allChangelogs = getAllChangelogTopicPartitions();
changelogReader.unregister(allChangelogs);
}

// when the state manager is recycled to be used, future writes may bypass its store's caching
// layer if they are from restoration, hence we need to clear the state store's caches just in case
// See KAFKA-14172 for details
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,27 +40,21 @@ class StandbyTaskCreator {
private final StreamsConfig applicationConfig;
private final StreamsMetricsImpl streamsMetrics;
private final StateDirectory stateDirectory;
private final ChangelogReader storeChangelogReader;
private final ThreadCache dummyCache;
private final Logger log;
private final Sensor createTaskSensor;
private final boolean stateUpdaterEnabled;

StandbyTaskCreator(final TopologyMetadata topologyMetadata,
final StreamsConfig applicationConfig,
final StreamsMetricsImpl streamsMetrics,
final StateDirectory stateDirectory,
final ChangelogReader storeChangelogReader,
final String threadId,
final LogContext logContext,
final boolean stateUpdaterEnabled) {
final LogContext logContext) {
this.topologyMetadata = topologyMetadata;
this.applicationConfig = applicationConfig;
this.streamsMetrics = streamsMetrics;
this.stateDirectory = stateDirectory;
this.storeChangelogReader = storeChangelogReader;
this.log = logContext.logger(getClass());
this.stateUpdaterEnabled = stateUpdaterEnabled;

createTaskSensor = ThreadMetrics.createTaskSensor(threadId, streamsMetrics);

Expand All @@ -87,10 +81,8 @@ Collection<Task> createTasks(final Map<TaskId, Set<TopicPartition>> tasksToBeCre
eosEnabled(applicationConfig),
getLogContext(taskId),
stateDirectory,
storeChangelogReader,
topology.storeToChangelogTopic(),
partitions,
stateUpdaterEnabled);
partitions);

final InternalProcessorContext<?, ?> context = new ProcessorContextImpl(
taskId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,6 @@ public void initializeStartupTasks(final TopologyMetadata topologyMetadata,
if (hasPersistentStores && !nonEmptyTaskDirectories.isEmpty()) {
final ThreadCache dummyCache = new ThreadCache(logContext, 0, streamsMetrics);
final boolean eosEnabled = StreamsConfigUtils.eosEnabled(config);
final boolean stateUpdaterEnabled = StreamsConfig.InternalConfig.stateUpdaterEnabled(config.originals());

// discover all non-empty task directories in StateDirectory
for (final TaskDirectory taskDirectory : nonEmptyTaskDirectories) {
Expand All @@ -226,8 +225,7 @@ public void initializeStartupTasks(final TopologyMetadata topologyMetadata,
logContext,
this,
subTopology.storeToChangelogTopic(),
inputPartitions,
stateUpdaterEnabled
inputPartitions
);

final InternalProcessorContext<Object, Object> context = new ProcessorContextImpl(
Expand Down
Loading