Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/cluster/Partition.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1658,7 +1658,7 @@ class Partition(val topicPartition: TopicPartition,
def deleteRecordsOnLeader(offset: Long): LogDeleteRecordsResult = inReadLock(leaderIsrUpdateLock) {
leaderLogIfLocal match {
case Some(leaderLog) =>
if (!leaderLog.config.delete)
if (!leaderLog.config.delete && leaderLog.config.compact)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@junrao WDYT? We have not discussed the support of DeleteRecordsRequest in the case where cleanup.policy is empty.

throw new PolicyViolationException(s"Records of partition $topicPartition can not be deleted due to the configured policy")

val convertedOffset = if (offset == DeleteRecordsRequest.HIGH_WATERMARK)
Expand Down
46 changes: 44 additions & 2 deletions core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import com.yammer.metrics.core.Metric
import kafka.log.LogManager
import kafka.server._
import kafka.utils._
import org.apache.kafka.common.errors.{ApiException, FencedLeaderEpochException, InconsistentTopicIdException, InvalidTxnStateException, NotLeaderOrFollowerException, OffsetNotAvailableException, OffsetOutOfRangeException, UnknownLeaderEpochException}
import org.apache.kafka.common.errors.{ApiException, FencedLeaderEpochException, InconsistentTopicIdException, InvalidTxnStateException, NotLeaderOrFollowerException, OffsetNotAvailableException, OffsetOutOfRangeException, PolicyViolationException, UnknownLeaderEpochException}
import org.apache.kafka.common.message.{AlterPartitionResponseData, FetchResponseData}
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
Expand Down Expand Up @@ -61,7 +61,7 @@ import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams, Unexpec
import org.apache.kafka.server.util.{KafkaScheduler, MockTime}
import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpoints
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
import org.apache.kafka.storage.internals.log.{AppendOrigin, CleanerConfig, EpochEntry, LocalLog, LogAppendInfo, LogDirFailureChannel, LogLoader, LogOffsetMetadata, LogOffsetsListener, LogReadInfo, LogSegments, LogStartOffsetIncrementReason, ProducerStateManager, ProducerStateManagerConfig, UnifiedLog, VerificationGuard}
import org.apache.kafka.storage.internals.log.{AppendOrigin, CleanerConfig, EpochEntry, LocalLog, LogAppendInfo, LogConfig, LogDirFailureChannel, LogLoader, LogOffsetMetadata, LogOffsetsListener, LogReadInfo, LogSegments, LogStartOffsetIncrementReason, ProducerStateManager, ProducerStateManagerConfig, UnifiedLog, VerificationGuard}
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
Expand Down Expand Up @@ -4030,4 +4030,46 @@ class PartitionTest extends AbstractPartitionTest {
alterPartitionManager)
partition.tryCompleteDelayedRequests()
}

@Test
def testDeleteRecordsOnLeaderWithEmptyPolicy(): Unit = {
val leaderEpoch = 5
val partition = setupPartitionWithMocks(leaderEpoch, isLeader = true)

val emptyPolicyConfig = new LogConfig(util.Map.of(
TopicConfig.CLEANUP_POLICY_CONFIG, ""
))

val mockLog = mock(classOf[UnifiedLog])
when(mockLog.config).thenReturn(emptyPolicyConfig)
when(mockLog.logEndOffset).thenReturn(2L)
when(mockLog.logStartOffset).thenReturn(0L)
when(mockLog.highWatermark).thenReturn(2L)
when(mockLog.maybeIncrementLogStartOffset(any(), any())).thenReturn(true)

partition.setLog(mockLog, false)

val result = partition.deleteRecordsOnLeader(1L)
assertEquals(1L, result.requestedOffset)
}

@Test
def testDeleteRecordsOnLeaderWithCompactPolicy(): Unit = {
val leaderEpoch = 5
val partition = setupPartitionWithMocks(leaderEpoch, isLeader = true)

val emptyPolicyConfig = new LogConfig(util.Map.of(
TopicConfig.CLEANUP_POLICY_CONFIG, "compact"
))

val mockLog = mock(classOf[UnifiedLog])
when(mockLog.config).thenReturn(emptyPolicyConfig)
when(mockLog.logEndOffset).thenReturn(2L)
when(mockLog.logStartOffset).thenReturn(0L)
when(mockLog.highWatermark).thenReturn(2L)
when(mockLog.maybeIncrementLogStartOffset(any(), any())).thenReturn(true)

partition.setLog(mockLog, false)
assertThrows(classOf[PolicyViolationException], () => partition.deleteRecordsOnLeader(1L))
}
}
5 changes: 4 additions & 1 deletion docs/upgrade.html
Original file line number Diff line number Diff line change
Expand Up @@ -136,9 +136,12 @@ <h5><a id="upgrade_420_notable" href="#upgrade_420_notable">Notable changes in 4
settings.
</li>
<li>
The <code>cleanup.policy</code> is empty and <code>remote.storage.enable</code> is set to true, the
If <code>cleanup.policy</code> is empty and <code>remote.storage.enable</code> is set to true, the
local log segments will be cleaned based on the values of <code>log.local.retention.bytes</code> and
<code>log.local.retention.ms</code>.
<br>
Note that <code>cleanup.policy</code> supports empty values, which means infinite retention.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could remind users that this use case is equivalent to setting retention.ms=-1 and retention.bytes=-1

This is equivalent to setting <code>retention.ms=-1</code> and <code>retention.bytes=-1</code>.
</li>
</ul>
</li>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1925,8 +1925,8 @@ public int deleteOldSegments() throws IOException {
deleteRetentionSizeBreachedSegments() +
deleteRetentionMsBreachedSegments();
} else {
// If cleanup.policy is empty and remote storage is disabled, we should not delete any local
// log segments
// If cleanup.policy is empty and remote storage is disabled, we should not delete any local log segments
// unless the log start offset advances through deleteRecords
return deleteLogStartOffsetBreachedSegments();
}
}
Expand Down