| KafkaConsumerConfig :
|
| package com.highradius.kafka.configuration;
|
|
|
| import com.highradius.common.util.HRCLog;
|
| import com.highradius.common.util.HRCLogFactory;
|
| import java.util.HashMap;
|
| import java.util.Map;
|
| import org.apache.kafka.clients.consumer.ConsumerConfig;
|
| import org.apache.kafka.common.serialization.StringDeserializer;
|
| import org.springframework.beans.factory.annotation.Value;
|
| import org.springframework.context.annotation.Bean;
|
| import org.springframework.context.annotation.Configuration;
|
| import org.springframework.kafka.annotation.EnableKafka;
|
| import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
|
| import org.springframework.kafka.core.ConsumerFactory;
|
| import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
|
| import org.springframework.kafka.listener.ContainerProperties.AckMode;
|
|
|
| @EnableKafka
|
| @Configuration
|
| public class KafkaConsumerConfig {
|
|
|
| private static final HRCLog LOGGER = HRCLogFactory.getLog(KafkaConsumerConfig.class);
|
|
|
| String brokerAddress;
|
|
|
| public String getBrokerAddress() {
|
| return brokerAddress;
|
| }
|
|
|
| public void setBrokerAddress(String brokerAddress) {
|
| this.brokerAddress = brokerAddress;
|
| }
|
| // @Value("${cpa.kafka.sasl.jaas.config}")
|
| // String jaasConfig;
|
| //
|
| // @Value("${cpa.kafka.sasl.mechanism}")
|
| // String saslMechanism;
|
| //
|
| // @Value("${cpa.kafka.ssl.endpoint.identification.algorithm}")
|
| // String sslEndpointIdentificationAlgorithm;
|
| //
|
| // @Value("${cpa.kafka.security.protocol}")
|
| // String securityProtocol;
|
|
|
| @Bean
|
| public ConsumerFactory<String, String> consumerFactory() {
|
| LOGGER.info("Creating Kafka Consumer Factory with broker address: " + brokerAddress);
|
| Map<String, Object> configProps = new HashMap<>();
|
| configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerAddress);
|
| configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
|
| configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, CustomKafkaDeserializer.class);
|
| configProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
|
| configProps.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600000);
|
| return new DefaultKafkaConsumerFactory<>(configProps);
|
| }
|
|
|
| public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
|
| LOGGER.info("Creating Kafka Listener Container Factory with broker address: " + brokerAddress);
|
| ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
|
| factory.setConsumerFactory(consumerFactory());
|
| factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
|
| return factory;
|
| }
|
| }
|
|
|
| IDPResultConsumer :
|
| package com.highradius.kafka.dto;
|
|
|
| import com.highradius.g4.core.common.G4Log;
|
| import com.highradius.g4.core.common.G4LogManager;
|
| import com.highradius.g4.intelliparse.dto.IDPResultDto;
|
| import org.apache.kafka.clients.consumer.ConsumerRecord;
|
| import org.springframework.kafka.listener.AcknowledgingMessageListener;
|
| import org.springframework.kafka.support.Acknowledgment;
|
|
|
| public class IDPResultConsumer implements AcknowledgingMessageListener<String, IDPResultDto> {
|
|
|
| private static final G4Log LOGGER = G4LogManager.getLog(IDPResultConsumer.class);
|
|
|
| @Override
|
| public void onMessage(ConsumerRecord<String, IDPResultDto> record, Acknowledgment acknowledgment) {
|
| LOGGER.info("Received IDPResultDto from Kafka: " + record.value());
|
| acknowledgment.acknowledge();
|
| }
|
| }
|
|
|
| kafka-config,xml :
|
| <beans xmlns="http://www.springframework.org/schema/beans"
|
| xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
| xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.3.xsd">
|
|
|
| <bean id="kafkaProducerConfig" class="com.highradius.kafka.configuration.KafkaProducerConfig">
|
| <property name="brokerAddress" value="${cpa.kafka.broker}"/>
|
| <property name="jaasConfig" value="${cpa.kafka.sasl.jaas.config}"/>
|
| <property name="saslMechanism" value="${cpa.kafka.sasl.mechanism}"/>
|
| <property name="sslEndpointIdentificationAlgorithm" value="${cpa.kafka.ssl.endpoint.identification.algorithm}"/>
|
| <property name="securityProtocol" value="${cpa.kafka.security.protocol}"/>
|
| </bean>
|
|
|
| <bean id="kafkaTemplate" factory-bean="kafkaProducerConfig" factory-method="kafkaTemplate"/>
|
|
|
| <bean id="kafkaProducers" class="com.highradius.kafka.producers.KafkaProducers">
|
| <property name="kafkaTemplate" ref="kafkaTemplate"/>
|
| </bean>
|
|
|
| <bean id="kafkaConsumerConfig" class="com.highradius.kafka.configuration.KafkaConsumerConfig">
|
| <property name="brokerAddress" value="${cpa.idp.kafka.broker}"/>
|
| </bean>
|
|
|
| <bean id="kafkaListenerContainerFactory" factory-bean="kafkaConsumerConfig" factory-method="kafkaListenerContainerFactory"/>
|
|
|
| <bean id="containerProperties" class="org.springframework.kafka.listener.ContainerProperties">
|
| <constructor-arg value="${spring.kafka.consumer.topic.idp-result}"/>
|
| <property name="groupId" value="${spring.kafka.consumer.group-id}"/>
|
| <property name="ackMode" value="MANUAL_IMMEDIATE"/>
|
| <property name="messageListener">
|
| <bean class="com.highradius.kafka.dto.IDPResultConsumer"/>
|
| </property>
|
| </bean>
|
|
|
| <bean id="idpResultListenerContainer" class="org.springframework.kafka.listener.ConcurrentMessageListenerContainer">
|
| <constructor-arg>
|
| <bean factory-bean="kafkaConsumerConfig" factory-method="consumerFactory"/>
|
| </constructor-arg>
|
| <constructor-arg ref="containerProperties"/>
|
| </bean>
|
|
|
| <bean id="kafkaContainerStarter" class="com.highradius.kafka.configuration.KafkaContainerStarter">
|
| <constructor-arg ref="idpResultListenerContainer"/>
|
| </bean>
|
| </beans>
|