Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -131,28 +131,6 @@ public void commit() throws InterruptedException {
@Override
public abstract void stop();

/**
* <p>
* Commit an individual {@link SourceRecord} when the callback from the producer client is received. This method is
* also called when a record is filtered by a transformation, and thus will never be ACK'd by a broker.
* <p>
* This is an alias for {@link #commitRecord(SourceRecord, RecordMetadata)} for backwards compatibility. The default
* implementation of {@link #commitRecord(SourceRecord, RecordMetadata)} just calls this method. It is not necessary
* to override both methods.
* <p>
* SourceTasks are not required to implement this functionality; Kafka Connect will record offsets
* automatically. This hook is provided for systems that also need to store offsets internally
* in their own system.
*
* @param record {@link SourceRecord} that was successfully sent via the producer or filtered by a transformation
* @throws InterruptedException
* @deprecated Use {@link #commitRecord(SourceRecord, RecordMetadata)} instead.
*/
@Deprecated
public void commitRecord(SourceRecord record) throws InterruptedException {
// This space intentionally left blank.
}

/**
* <p>
* Commit an individual {@link SourceRecord} when the callback from the producer client is received. This method is
Expand All @@ -164,16 +142,14 @@ public void commitRecord(SourceRecord record) throws InterruptedException {
* automatically. This hook is provided for systems that also need to store offsets internally
* in their own system.
* <p>
* The default implementation just calls {@link #commitRecord(SourceRecord)}, which is a nop by default. It is
* not necessary to implement both methods.
* The default implementation is a nop. It is not necessary to implement the method.
*
* @param record {@link SourceRecord} that was successfully sent via the producer, filtered by a transformation, or dropped on producer exception
* @param metadata {@link RecordMetadata} record metadata returned from the broker, or null if the record was filtered or if producer exceptions are ignored
* @throws InterruptedException
*/
public void commitRecord(SourceRecord record, RecordMetadata metadata)
throws InterruptedException {
// by default, just call other method for backwards compatibility
commitRecord(record);
// by default, just do nothing
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -736,16 +736,13 @@ public void commit() throws InterruptedException {
super.commit();
}

@Override
@SuppressWarnings("deprecation")
public void commitRecord(SourceRecord record) throws InterruptedException {
block.maybeBlockOn(SOURCE_TASK_COMMIT_RECORD);
super.commitRecord(record);
}

@Override
public void commitRecord(SourceRecord record, RecordMetadata metadata) throws InterruptedException {
block.maybeBlockOn(SOURCE_TASK_COMMIT_RECORD_WITH_METADATA);
if (metadata == null) {
block.maybeBlockOn(SOURCE_TASK_COMMIT_RECORD);
} else {
block.maybeBlockOn(SOURCE_TASK_COMMIT_RECORD_WITH_METADATA);
}
super.commitRecord(record, metadata);
}
}
Expand Down
2 changes: 2 additions & 0 deletions docs/upgrade.html
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,8 @@ <h5><a id="upgrade_400_notable" href="#upgrade_400_notable">Notable changes in 4
<li>The <code>onPartitionsRevoked(Collection&lt;TopicPartition&gt;)</code> and <code>onPartitionsAssigned(Collection&lt;TopicPartition&gt;)</code> methods
were removed from <code>SinkTask</code>.
</li>
<li>The <code>commitRecord(SourceRecord)</code> method was removed from <code>SourceTask</code>.
</li>
</ul>
</li>
<li><b>Consumer</b>
Expand Down
Loading