Commit 1672a4b

KAFKA-18476: KafkaStreams should swallow TransactionAbortedException (#18487)
TransactionAbortedException is a follow-up error to a previous error, and that previous error is already handled when `producer.abortTransaction()` is called. A TransactionAbortedException can therefore be silently swallowed.

Reviewers: Bill Bejeck <[email protected]>
1 parent faef80a commit 1672a4b
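
To make the "follow-up error" reasoning concrete, here is a minimal sketch, not the committed code, of the pattern the commit applies inside a producer send callback: drop TransactionAbortedException, but record any other exception so a later flush() can rethrow it. The class name, the pendingError field, and the maybeThrow() helper are hypothetical names introduced purely for illustration.

import java.util.concurrent.atomic.AtomicReference;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.errors.TransactionAbortedException;

public class SwallowAbortedCallback implements Callback {

    // Holds the first "real" send error so a later flush()/close() can rethrow it.
    private final AtomicReference<Exception> pendingError = new AtomicReference<>();

    @Override
    public void onCompletion(final RecordMetadata metadata, final Exception exception) {
        if (exception == null || exception instanceof TransactionAbortedException) {
            // A TransactionAbortedException is only a follow-up to an earlier error
            // that already triggered abortTransaction(), so there is nothing new to handle.
            return;
        }
        pendingError.compareAndSet(null, exception);
    }

    public void maybeThrow() {
        final Exception error = pendingError.get();
        if (error != null) {
            throw new RuntimeException("Error sending record", error);
        }
    }
}

In the actual RecordCollectorImpl, the role of pendingError is played by the sendException reference that flush() checks, as the diff below shows.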

File tree

2 files changed: 32 additions & 0 deletions

streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java

Lines changed: 6 additions & 0 deletions

@@ -33,6 +33,7 @@
 import org.apache.kafka.common.errors.SecurityDisabledException;
 import org.apache.kafka.common.errors.SerializationException;
 import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.TransactionAbortedException;
 import org.apache.kafka.common.errors.UnknownServerException;
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.header.internals.RecordHeaders;

@@ -441,6 +442,11 @@ private void recordSendError(final String topic,
             errorMessage += "\nWritten offsets would not be recorded and no more records would be sent since the producer is fenced, " +
                 "indicating the task may be migrated out";
             sendException.set(new TaskMigratedException(errorMessage, productionException));
+        } else if (productionException instanceof TransactionAbortedException) {
+            // swallow silently
+            //
+            // TransactionAbortedException is only thrown after `abortTransaction()` was called,
+            // so it's only a followup error, and Kafka Streams is already handling the original error
         } else {
             final ProductionExceptionHandlerResponse response;
             try {
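
For context on why the new comment calls this "only a followup error": once a transactional producer aborts, batches that were still undrained fail with TransactionAbortedException. A hedged standalone sketch of that sequence, assuming a local broker at localhost:9092 and placeholder topic and transactional ids:

import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.errors.TransactionAbortedException;
import org.apache.kafka.common.serialization.StringSerializer;

public class TransactionAbortFollowUpDemo {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "demo-txn-id");     // placeholder id
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();
            producer.beginTransaction();

            final Future<RecordMetadata> pending =
                producer.send(new ProducerRecord<>("demo-topic", "key", "value"));

            // Pretend an earlier, "real" error was already handled by aborting:
            producer.abortTransaction();

            try {
                pending.get();
            } catch (final ExecutionException | InterruptedException e) {
                // If the batch was still undrained when the abort happened, the
                // future fails with TransactionAbortedException: a pure follow-up
                // to the abort, carrying no new information worth handling.
                System.out.println("follow-up error: "
                    + (e.getCause() instanceof TransactionAbortedException));
            }
        }
    }
}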

streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java

Lines changed: 26 additions & 0 deletions

@@ -33,6 +33,7 @@
 import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.kafka.common.errors.SerializationException;
 import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.TransactionAbortedException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.Headers;

@@ -1806,6 +1807,31 @@ public void shouldNotCallProductionExceptionHandlerOnClassCastException() {
         }
     }

+    @Test
+    public void shouldSwallowTransactionAbortedExceptionAndNotCallProductionExceptionHandler() {
+        final MockProducer<byte[], byte[]> mockProducer = new MockProducer<>(
+            cluster,
+            false,
+            new org.apache.kafka.clients.producer.RoundRobinPartitioner(),
+            new ByteArraySerializer(),
+            new ByteArraySerializer()
+        );
+        streamsProducer = new StreamsProducer(
+            mockProducer,
+            EXACTLY_ONCE_V2,
+            Time.SYSTEM,
+            logContext
+        );
+
+        final RecordCollector collector = newRecordCollector(new ProductionExceptionHandlerMock());
+        collector.initialize();
+
+        collector.send(topic, "key", "val", null, 0, null, stringSerializer, stringSerializer, sinkNodeName, context);
+        mockProducer.errorNext(new TransactionAbortedException()); // error out the send() call
+
+        collector.flush(); // need to call flush() to check for internal exceptions
+    }
+
     @Test
     public void shouldNotSendIfSendOfOtherTaskFailedInCallback() {
         final TaskId taskId1 = new TaskId(0, 0);
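
The test drives this path with MockProducer in non-auto-complete mode. As a hedged, self-contained illustration of that mechanism, detached from the test fixture: with autoComplete set to false each send() stays pending, and errorNext(...) fails the oldest pending send with the supplied exception. The topic name and byte-array types below are illustrative, not taken from the test.

import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.TransactionAbortedException;
import org.apache.kafka.common.serialization.ByteArraySerializer;

import java.util.concurrent.atomic.AtomicReference;

public class MockProducerErrorDemo {
    public static void main(final String[] args) {
        final MockProducer<byte[], byte[]> producer = new MockProducer<>(
            false,                      // autoComplete=false: sends stay pending
            new ByteArraySerializer(),
            new ByteArraySerializer()
        );

        final AtomicReference<Exception> seen = new AtomicReference<>();

        producer.send(
            new ProducerRecord<>("demo-topic", "key".getBytes(), "value".getBytes()),
            (metadata, exception) -> seen.set(exception)
        );

        // Fail the pending send, exactly as the test does via errorNext(...).
        producer.errorNext(new TransactionAbortedException());

        System.out.println("callback saw: " + seen.get()); // TransactionAbortedException
    }
}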
