KafkaConsumerConfig : package com.highradius.kafka.configuration; import com.highradius.common.util.HRCLog; import com.highradius.common.util.HRCLogFactory; import java.util.HashMap; import java.util.Map; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.listener.ContainerProperties.AckMode; @EnableKafka @Configuration public class KafkaConsumerConfig { private static final HRCLog LOGGER = HRCLogFactory.getLog(KafkaConsumerConfig.class); String brokerAddress; public String getBrokerAddress() { return brokerAddress; } public void setBrokerAddress(String brokerAddress) { this.brokerAddress = brokerAddress; } // @Value("${cpa.kafka.sasl.jaas.config}") // String jaasConfig; // // @Value("${cpa.kafka.sasl.mechanism}") // String saslMechanism; // // @Value("${cpa.kafka.ssl.endpoint.identification.algorithm}") // String sslEndpointIdentificationAlgorithm; // // @Value("${cpa.kafka.security.protocol}") // String securityProtocol; @Bean public ConsumerFactory consumerFactory() { LOGGER.info("Creating Kafka Consumer Factory with broker address: " + brokerAddress); Map configProps = new HashMap<>(); configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerAddress); configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, CustomKafkaDeserializer.class); configProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100); configProps.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600000); return new DefaultKafkaConsumerFactory<>(configProps); } public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() { LOGGER.info("Creating Kafka Listener Container Factory with broker address: " + brokerAddress); ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE); return factory; } } IDPResultConsumer : package com.highradius.kafka.dto; import com.highradius.g4.core.common.G4Log; import com.highradius.g4.core.common.G4LogManager; import com.highradius.g4.intelliparse.dto.IDPResultDto; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.listener.AcknowledgingMessageListener; import org.springframework.kafka.support.Acknowledgment; public class IDPResultConsumer implements AcknowledgingMessageListener { private static final G4Log LOGGER = G4LogManager.getLog(IDPResultConsumer.class); @Override public void onMessage(ConsumerRecord record, Acknowledgment acknowledgment) { LOGGER.info("Received IDPResultDto from Kafka: " + record.value()); acknowledgment.acknowledge(); } } kafka-config,xml :