Description
Discussed in #2250
Originally posted by Walnussbaer May 3, 2022
Hi everyone,
I'm really struggling with some basic functionality that I would like to achieve using KafkaTemplate, and I hope someone can help me out.
I already created a Stack Overflow entry but didn't get much help there yet (https://stackoverflow.com/questions/72055135/spring-apache-kafka-onfailure-callback-of-kafkatemplate-not-fired-on-connection/72059673?noredirect=1#comment127371838_72059673). The Spring for Apache Kafka docs weren't much help either, nor was a very extensive Google search.
Consider the following basic scenario:
I have a simple KafkaTemplate that shall send data to a Kafka cluster. Now consider that the Kafka cluster goes down (not just temporarily). How can I configure the KafkaTemplate so that the wrapped producer stops trying to connect to the cluster to fetch the metadata for the given topic when the cluster is not reachable?
Why do I want to achieve that? Well, in a production environment it can always happen that the Kafka cluster goes down for some reason. I want to be able to detect that during the sending process. The worst problem is that, as of now, my producer thread will starve to death, because it goes into an infinite loop trying to connect to the broker.
This is my producer config:
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class KafkaProducerConfig {

    private String bootstrapServers = "[::1]:9091"; // wrong port to simulate unavailable connection

    @Bean
    public Map<String, Object> producerConfig() {
        // config settings for creating producers
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 10000);
        configProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 4000);
        configProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 6000);
        configProps.put(ProducerConfig.RETRIES_CONFIG, 0);
        return configProps;
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        // creates a kafka producer
        return new DefaultKafkaProducerFactory<>(producerConfig());
    }

    @Bean("kafkaTemplate")
    public KafkaTemplate<String, String> kafkaTemplate() {
        // template which abstracts sending data to kafka
        return new KafkaTemplate<>(producerFactory());
    }
}
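Side note on this config: the timeouts above bound how long a single send() may block on the metadata fetch (max.block.ms) and how long a produce request may take overall (delivery.timeout.ms), but the frequency of the connection attempts themselves is governed by the client's reconnect backoff settings. A minimal sketch of two extra properties for producerConfig() (the values are just examples) that at least bound how aggressively the client reconnects; note they slow the loop down rather than stop it:

    // assumed additions to producerConfig(): cap the reconnect frequency of
    // the underlying NetworkClient; this bounds the backoff between
    // connection attempts but does not terminate the reconnect loop
    configProps.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, 1000);
    configProps.put(ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, 10000);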
This is my service. My first approach, sendMessageWithCallback(), does not work, because the onFailure() method won't get invoked if the KafkaProducer cannot establish a connection to the Kafka cluster. Using my second service method, sendMessageWithProperErrorHandling(), I can at least catch the TimeoutException which is thrown by the KafkaProducer when the metadata for the topic could not be fetched within MAX_BLOCK_MS_CONFIG, but I still can't stop the producer from going into an infinite loop after that first timeout. Below you also find a picture of the infinite loop: the KafkaProducer will essentially try to connect to the Kafka cluster for the rest of its life, starving the thread to death. It also looks like it completely ignores my RETRIES_CONFIG, which is set to zero retries ...
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

@Service
public class KafkaSenderService {

    private final Logger logger = LoggerFactory.getLogger(KafkaSenderService.class);

    private final KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    public KafkaSenderService(@Qualifier("kafkaTemplate") KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendMessage(String message, String topicName) {
        kafkaTemplate.send(topicName, message);
    }

    public void sendMessageWithCallback(String message, String topicName) {
        // possibility to add callbacks to define what shall happen in success/error case
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topicName, message);
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {

            @Override
            // DOES NOT WORK IF THE BROKER IS NOT AVAILABLE
            public void onFailure(Throwable ex) {
                logger.warn("Message could not be delivered. " + ex.getMessage());
            }

            @Override
            public void onSuccess(SendResult<String, String> result) {
                logger.info("Your message was delivered with following offset: " + result.getRecordMetadata().offset());
            }
        });
    }

    public void sendMessageWithProperErrorHandling(String message, String topicName) {
        // TODO use AdminClient to check connectivity --> senseless, what if the cluster goes down after the check was made?!
        try {
            SendResult<String, String> sendResult = kafkaTemplate.send(topicName, message).get(5000, TimeUnit.MILLISECONDS);
        } catch (Exception te) {
            System.out.println("Could not connect: " + te.getMessage());
            te.printStackTrace();
        }
    }
}
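For what it's worth, the metadata fetch happens on the calling thread inside send() (bounded by MAX_BLOCK_MS_CONFIG), so depending on the client version the TimeoutException can surface synchronously from send() itself rather than through the returned future, in which case a callback never sees it. A defensive sketch that handles both paths (the method name trySend is mine):

    // hypothetical helper: catch the synchronous failure path around send()
    // and still register a callback for asynchronous delivery failures
    public boolean trySend(String message, String topicName) {
        try {
            kafkaTemplate.send(topicName, message).addCallback(
                    result -> logger.info("Delivered with offset "
                            + result.getRecordMetadata().offset()),
                    ex -> logger.warn("Async delivery failure: " + ex.getMessage()));
            return true;
        }
        catch (org.apache.kafka.common.KafkaException ex) {
            // covers org.apache.kafka.common.errors.TimeoutException thrown
            // while blocking on the metadata fetch
            logger.warn("Send failed before dispatch: " + ex.getMessage());
            return false;
        }
    }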
Now my simple question: what is the best practice to detect connection errors during a send process and stop the sending process when that error occurs?
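One idea I have been toying with (just a sketch, not a verified best practice, and the field wiring is assumed): once the timeout is detected, close the cached producer through DefaultKafkaProducerFactory.reset(). Closing the producer should also stop its background reconnection attempts; the next send() would then lazily create a fresh producer.

    @Autowired
    private DefaultKafkaProducerFactory<String, String> producerFactory; // assumed injection

    public void sendOrGiveUp(String message, String topicName) {
        try {
            kafkaTemplate.send(topicName, message);
        }
        catch (org.apache.kafka.common.KafkaException ex) {
            logger.warn("Cluster unreachable, closing producer: " + ex.getMessage());
            producerFactory.reset(); // closes the producer(s) and clears the cache
        }
    }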