New paste Repaste Download
KafkaConsumerConfig :
package com.highradius.kafka.configuration;
import com.highradius.common.util.HRCLog;
import com.highradius.common.util.HRCLogFactory;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties.AckMode;
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
  private static final HRCLog LOGGER = HRCLogFactory.getLog(KafkaConsumerConfig.class);
  
  String brokerAddress;
  public String getBrokerAddress() {
      return brokerAddress;
  }
  public void setBrokerAddress(String brokerAddress) {
      this.brokerAddress = brokerAddress;
  }
//  @Value("${cpa.kafka.sasl.jaas.config}")
//  String jaasConfig;
//
//  @Value("${cpa.kafka.sasl.mechanism}")
//  String saslMechanism;
//
//  @Value("${cpa.kafka.ssl.endpoint.identification.algorithm}")
//  String sslEndpointIdentificationAlgorithm;
//
//  @Value("${cpa.kafka.security.protocol}")
//  String securityProtocol;
  @Bean
  public ConsumerFactory<String, String> consumerFactory() {
    LOGGER.info("Creating Kafka Consumer Factory with broker address: " + brokerAddress);
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerAddress);
    configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, CustomKafkaDeserializer.class);
    configProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
    configProps.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600000);
    return new DefaultKafkaConsumerFactory<>(configProps);
  }
  public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    LOGGER.info("Creating Kafka Listener Container Factory with broker address: " + brokerAddress);
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
    return factory;
  }
}
IDPResultConsumer :
package com.highradius.kafka.dto;
import com.highradius.g4.core.common.G4Log;
import com.highradius.g4.core.common.G4LogManager;
import com.highradius.g4.intelliparse.dto.IDPResultDto;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.listener.AcknowledgingMessageListener;
import org.springframework.kafka.support.Acknowledgment;
public class IDPResultConsumer implements AcknowledgingMessageListener<String, IDPResultDto> {
    private static final G4Log LOGGER = G4LogManager.getLog(IDPResultConsumer.class);
    @Override
    public void onMessage(ConsumerRecord<String, IDPResultDto> record, Acknowledgment acknowledgment) {
        LOGGER.info("Received IDPResultDto from Kafka: " + record.value());
        acknowledgment.acknowledge();
    }
}
kafka-config,xml :
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.3.xsd">
    <bean id="kafkaProducerConfig" class="com.highradius.kafka.configuration.KafkaProducerConfig">
        <property name="brokerAddress" value="${cpa.kafka.broker}"/>
        <property name="jaasConfig" value="${cpa.kafka.sasl.jaas.config}"/>
        <property name="saslMechanism" value="${cpa.kafka.sasl.mechanism}"/>
        <property name="sslEndpointIdentificationAlgorithm" value="${cpa.kafka.ssl.endpoint.identification.algorithm}"/>
        <property name="securityProtocol" value="${cpa.kafka.security.protocol}"/>
    </bean>
    <bean id="kafkaTemplate" factory-bean="kafkaProducerConfig" factory-method="kafkaTemplate"/>
    <bean id="kafkaProducers" class="com.highradius.kafka.producers.KafkaProducers">
        <property name="kafkaTemplate" ref="kafkaTemplate"/>
    </bean>
    <bean id="kafkaConsumerConfig" class="com.highradius.kafka.configuration.KafkaConsumerConfig">
        <property name="brokerAddress" value="${cpa.idp.kafka.broker}"/>
    </bean>
    
    <bean id="kafkaListenerContainerFactory" factory-bean="kafkaConsumerConfig" factory-method="kafkaListenerContainerFactory"/>
    <bean id="containerProperties" class="org.springframework.kafka.listener.ContainerProperties">
        <constructor-arg value="${spring.kafka.consumer.topic.idp-result}"/>
        <property name="groupId" value="${spring.kafka.consumer.group-id}"/>
        <property name="ackMode" value="MANUAL_IMMEDIATE"/>
        <property name="messageListener">
            <bean class="com.highradius.kafka.dto.IDPResultConsumer"/>
        </property>
    </bean>
    <bean id="idpResultListenerContainer" class="org.springframework.kafka.listener.ConcurrentMessageListenerContainer">
        <constructor-arg>
            <bean factory-bean="kafkaConsumerConfig" factory-method="consumerFactory"/>
        </constructor-arg>
        <constructor-arg ref="containerProperties"/>
    </bean>
    
<bean id="kafkaContainerStarter" class="com.highradius.kafka.configuration.KafkaContainerStarter">
     <constructor-arg ref="idpResultListenerContainer"/>
</bean>
</beans>
Filename: None. Size: 6kb. View raw, , hex, or download this file.

This paste expires on 2025-07-08 06:30:04.006550. Pasted through web.