From 6fd7862920439f4bf9c0e650834faab81e7cecf6 Mon Sep 17 00:00:00 2001 From: m1a2st Date: Sat, 6 Sep 2025 09:56:37 +0800 Subject: [PATCH 1/6] followup Jun's comments --- docs/upgrade.html | 4 +++- .../org/apache/kafka/storage/internals/log/UnifiedLog.java | 4 ++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/docs/upgrade.html b/docs/upgrade.html index c7af07e0411ab..bdf5c83530598 100644 --- a/docs/upgrade.html +++ b/docs/upgrade.html @@ -136,9 +136,11 @@
Notable changes in 4 settings.
  • - The cleanup.policy is empty and remote.storage.enable is set to true, the + If cleanup.policy is empty and remote.storage.enable is set to true, the local log segments will be cleaned based on the values of log.local.retention.bytes and log.local.retention.ms. +
    + Note that cleanup.policy supports empty values, which means infinite retention.
  • diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java index 61b4b9d0edb2f..b6f0a685b4c45 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java @@ -1925,8 +1925,8 @@ public int deleteOldSegments() throws IOException { deleteRetentionSizeBreachedSegments() + deleteRetentionMsBreachedSegments(); } else { - // If cleanup.policy is empty and remote storage is disabled, we should not delete any local - // log segments + // If cleanup.policy is empty and remote storage is disabled, we should not delete any local log segments + // unless the log start offset advances through deleteRecords return deleteLogStartOffsetBreachedSegments(); } } From 8a8a0d6f1256f0c2b3695042719017df7bfaa4d5 Mon Sep 17 00:00:00 2001 From: m1a2st Date: Sun, 7 Sep 2025 00:05:43 +0800 Subject: [PATCH 2/6] addressed by comments --- docs/upgrade.html | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/upgrade.html b/docs/upgrade.html index bdf5c83530598..6a7f86afebd00 100644 --- a/docs/upgrade.html +++ b/docs/upgrade.html @@ -141,6 +141,7 @@
    Notable changes in 4 log.local.retention.ms.
    Note that cleanup.policy supports empty values, which means infinite retention. + This use case is equivalent to setting retention.ms=-1 and retention.bytes=-1. From 795f1cdf78b7dac336304b8d3fa623e57447b90c Mon Sep 17 00:00:00 2001 From: m1a2st Date: Sun, 7 Sep 2025 00:07:22 +0800 Subject: [PATCH 3/6] update the grammar --- docs/upgrade.html | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/upgrade.html b/docs/upgrade.html index 6a7f86afebd00..0b596a711bc33 100644 --- a/docs/upgrade.html +++ b/docs/upgrade.html @@ -141,7 +141,7 @@
    Notable changes in 4 log.local.retention.ms.
    Note that cleanup.policy supports empty values, which means infinite retention. - This use case is equivalent to setting retention.ms=-1 and retention.bytes=-1. + This is equivalent to setting retention.ms=-1 and retention.bytes=-1. From 10a89bf30d3cfdced89eb1c295eae3e28d4820cb Mon Sep 17 00:00:00 2001 From: m1a2st Date: Mon, 8 Sep 2025 21:25:25 +0800 Subject: [PATCH 4/6] update the logic to allow empty cleanup.policy --- core/src/main/scala/kafka/cluster/Partition.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 3b45a08b0673d..44b28a1f07e4a 100755 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -1658,7 +1658,7 @@ class Partition(val topicPartition: TopicPartition, def deleteRecordsOnLeader(offset: Long): LogDeleteRecordsResult = inReadLock(leaderIsrUpdateLock) { leaderLogIfLocal match { case Some(leaderLog) => - if (!leaderLog.config.delete) + if (!leaderLog.config.delete && leaderLog.config.compact) throw new PolicyViolationException(s"Records of partition $topicPartition can not be deleted due to the configured policy") val convertedOffset = if (offset == DeleteRecordsRequest.HIGH_WATERMARK) From 9a76f9fd3def82f8d1bfe9d5b9d1e9d39e927aff Mon Sep 17 00:00:00 2001 From: m1a2st Date: Mon, 8 Sep 2025 23:43:23 +0800 Subject: [PATCH 5/6] add unit test for new logic --- .../unit/kafka/cluster/PartitionTest.scala | 47 ++++++++++++++++++- 1 file changed, 45 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala index 8e512ad4d0128..95dfc4c006891 100644 --- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala +++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala @@ -21,7 +21,7 @@ import com.yammer.metrics.core.Metric import kafka.log.LogManager import kafka.server._ import kafka.utils._ -import org.apache.kafka.common.errors.{ApiException, FencedLeaderEpochException, InconsistentTopicIdException, InvalidTxnStateException, NotLeaderOrFollowerException, OffsetNotAvailableException, OffsetOutOfRangeException, UnknownLeaderEpochException} +import org.apache.kafka.common.errors.{ApiException, FencedLeaderEpochException, InconsistentTopicIdException, InvalidTxnStateException, NotLeaderOrFollowerException, OffsetNotAvailableException, OffsetOutOfRangeException, PolicyViolationException, UnknownLeaderEpochException} import org.apache.kafka.common.message.{AlterPartitionResponseData, FetchResponseData} import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.record.FileRecords.TimestampAndOffset @@ -61,7 +61,7 @@ import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams, Unexpec import org.apache.kafka.server.util.{KafkaScheduler, MockTime} import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpoints import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache -import org.apache.kafka.storage.internals.log.{AppendOrigin, CleanerConfig, EpochEntry, LocalLog, LogAppendInfo, LogDirFailureChannel, LogLoader, LogOffsetMetadata, LogOffsetsListener, LogReadInfo, LogSegments, LogStartOffsetIncrementReason, ProducerStateManager, ProducerStateManagerConfig, UnifiedLog, VerificationGuard} +import org.apache.kafka.storage.internals.log.{AppendOrigin, CleanerConfig, EpochEntry, LocalLog, LogAppendInfo, LogConfig, LogDirFailureChannel, LogLoader, LogOffsetMetadata, LogOffsetsListener, LogReadInfo, LogSegments, LogStartOffsetIncrementReason, ProducerStateManager, ProducerStateManagerConfig, UnifiedLog, VerificationGuard} import org.apache.kafka.storage.log.metrics.BrokerTopicStats import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.ValueSource @@ -4030,4 +4030,47 @@ class PartitionTest extends AbstractPartitionTest { alterPartitionManager) partition.tryCompleteDelayedRequests() } + + @Test + def testDeleteRecordsOnLeaderWithEmptyPolicy(): Unit = { + val leaderEpoch = 5 + val partition = setupPartitionWithMocks(leaderEpoch, isLeader = true) + + val emptyPolicyConfig = new LogConfig(util.Map.of( + TopicConfig.CLEANUP_POLICY_CONFIG, "" + )) + + val mockLog = mock(classOf[UnifiedLog]) + when(mockLog.config).thenReturn(emptyPolicyConfig) + when(mockLog.logEndOffset).thenReturn(2L) + when(mockLog.logStartOffset).thenReturn(0L) + when(mockLog.highWatermark).thenReturn(2L) + when(mockLog.maybeIncrementLogStartOffset(any(), any())).thenReturn(true) + + partition.setLog(mockLog, false) + + val result = partition.deleteRecordsOnLeader(1L) + assertEquals(1L, result.requestedOffset) + } + + @Test + def testDeleteRecordsOnLeaderWithCompactPolicy(): Unit = { + val leaderEpoch = 5 + val partition = setupPartitionWithMocks(leaderEpoch, isLeader = true) + + val emptyPolicyConfig = new LogConfig(util.Map.of( + TopicConfig.CLEANUP_POLICY_CONFIG, "compact" + )) + + val mockLog = mock(classOf[UnifiedLog]) + when(mockLog.config).thenReturn(emptyPolicyConfig) + when(mockLog.logEndOffset).thenReturn(2L) + when(mockLog.logStartOffset).thenReturn(0L) + when(mockLog.highWatermark).thenReturn(2L) + when(mockLog.maybeIncrementLogStartOffset(any(), any())).thenReturn(true) + + partition.setLog(mockLog, false) + + assertThrows(classOf[PolicyViolationException], () => partition.deleteRecordsOnLeader(1L)) + } } From 2e92ab5cfafe7e63b8625fbb5480d02c31c6ef5f Mon Sep 17 00:00:00 2001 From: m1a2st Date: Mon, 8 Sep 2025 23:44:22 +0800 Subject: [PATCH 6/6] add unit test for new logic --- core/src/test/scala/unit/kafka/cluster/PartitionTest.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala index 95dfc4c006891..5662c2d227636 100644 --- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala +++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala @@ -4070,7 +4070,6 @@ class PartitionTest extends AbstractPartitionTest { when(mockLog.maybeIncrementLogStartOffset(any(), any())).thenReturn(true) partition.setLog(mockLog, false) - assertThrows(classOf[PolicyViolationException], () => partition.deleteRecordsOnLeader(1L)) } }