Skip to content

Commit 5bbc421

Browse files
MINOR: update TransactionLog#readTxnRecordValue to initialize TransactionMetadata with non-empty topic partitions (#20370)
This is followup PR for #19699. * Update TransactionLog#readTxnRecordValue to initialize TransactionMetadata with non-empty topic partitions * Update `TxnTransitMetadata` comment, because it's not immutable. Reviewers: TengYao Chi <[email protected]>, Justine Olshan <[email protected]>, Kuan-Po Tseng <[email protected]>, Chia-Ping Tsai <[email protected]>
1 parent b2c1a0f commit 5bbc421

File tree

3 files changed

+16
-14
lines changed

3 files changed

+16
-14
lines changed

core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -115,28 +115,25 @@ object TransactionLog {
115115
val version = buffer.getShort
116116
if (version >= TransactionLogValue.LOWEST_SUPPORTED_VERSION && version <= TransactionLogValue.HIGHEST_SUPPORTED_VERSION) {
117117
val value = new TransactionLogValue(new ByteBufferAccessor(buffer), version)
118-
val transactionMetadata = new TransactionMetadata(
118+
val state = TransactionState.fromId(value.transactionStatus)
119+
val tps: util.Set[TopicPartition] = new util.HashSet[TopicPartition]()
120+
if (!state.equals(TransactionState.EMPTY))
121+
value.transactionPartitions.forEach(partitionsSchema => {
122+
partitionsSchema.partitionIds.forEach(partitionId => tps.add(new TopicPartition(partitionsSchema.topic, partitionId.intValue())))
123+
})
124+
Some(new TransactionMetadata(
119125
transactionalId,
120126
value.producerId,
121127
value.previousProducerId,
122128
value.nextProducerId,
123129
value.producerEpoch,
124130
RecordBatch.NO_PRODUCER_EPOCH,
125131
value.transactionTimeoutMs,
126-
TransactionState.fromId(value.transactionStatus),
127-
util.Set.of(),
132+
state,
133+
tps,
128134
value.transactionStartTimestampMs,
129135
value.transactionLastUpdateTimestampMs,
130-
TransactionVersion.fromFeatureLevel(value.clientTransactionVersion))
131-
132-
if (!transactionMetadata.state.equals(TransactionState.EMPTY))
133-
value.transactionPartitions.forEach(partitionsSchema => {
134-
transactionMetadata.addPartitions(partitionsSchema.partitionIds
135-
.stream
136-
.map(partitionId => new TopicPartition(partitionsSchema.topic, partitionId.intValue()))
137-
.toList)
138-
})
139-
Some(transactionMetadata)
136+
TransactionVersion.fromFeatureLevel(value.clientTransactionVersion)))
140137
} else throw new IllegalStateException(s"Unknown version $version from the transaction log message value")
141138
}
142139
}

transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionMetadata.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ public <T> T inLock(Supplier<T> function) {
117117
}
118118
}
119119

120+
// VisibleForTesting
120121
public void addPartitions(Collection<TopicPartition> partitions) {
121122
topicPartitions.addAll(partitions);
122123
}
@@ -500,13 +501,15 @@ public String transactionalId() {
500501
return transactionalId;
501502
}
502503

504+
// VisibleForTesting
503505
public void setProducerId(long producerId) {
504506
this.producerId = producerId;
505507
}
506508
public long producerId() {
507509
return producerId;
508510
}
509511

512+
// VisibleForTesting
510513
public void setPrevProducerId(long prevProducerId) {
511514
this.prevProducerId = prevProducerId;
512515
}
@@ -534,6 +537,7 @@ public int txnTimeoutMs() {
534537
return txnTimeoutMs;
535538
}
536539

540+
// VisibleForTesting
537541
public void state(TransactionState state) {
538542
this.state = state;
539543
}
@@ -550,6 +554,7 @@ public long txnStartTimestamp() {
550554
return txnStartTimestamp;
551555
}
552556

557+
// VisibleForTesting
553558
public void txnLastUpdateTimestamp(long txnLastUpdateTimestamp) {
554559
this.txnLastUpdateTimestamp = txnLastUpdateTimestamp;
555560
}

transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TxnTransitMetadata.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
import java.util.HashSet;
2323

2424
/**
25-
* Immutable object representing the target transition of the transaction metadata
25+
* Represent the target transition of the transaction metadata. The topicPartitions field is mutable.
2626
*/
2727
public record TxnTransitMetadata(
2828
long producerId,

0 commit comments

Comments
 (0)