| | |
| | | package com.java110.core.kafka; |
| | | |
| | | |
| | | import com.java110.core.factory.GenerateCodeFactory; |
| | | import org.apache.kafka.clients.consumer.ConsumerConfig; |
| | | import org.apache.kafka.common.serialization.StringDeserializer; |
| | | import org.springframework.beans.factory.annotation.Value; |
| | |
| | | @EnableKafka |
| | | public class KafkaConsumerConfig { |
| | | |
| | | public static final String GROUP_ID_GENERATE = "-1"; |
| | | |
| | | @Value("${kafka.consumer.servers}") |
| | | private String servers; |
| | | @Value("${kafka.consumer.enable.auto.commit}") |
| | |
| | | private int concurrency; |
| | | @Value("${kafka.consumer.topic}") |
| | | public String topic; |
| | | |
| | | @Bean |
| | | public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() { |
| | | ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<String, String>(); |
| | |
| | | propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout); |
| | | propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); |
| | | propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); |
| | | propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); |
| | | if (GROUP_ID_GENERATE.equals(groupId)) { |
| | | propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, GenerateCodeFactory.getUUID()); |
| | | } else { |
| | | propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); |
| | | } |
| | | propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset); |
| | | return propsMap; |
| | | } |