spring boot自動配置方式整合
spring boot具有許多自動化配置,對于kafka的自動化配置當然也包含在內,基于spring boot自動配置方式整合kafka,需要做以下步驟。
引入kafka的pom依賴包
1
2
3
4
5
6
|
<!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka --> < dependency > < groupId >org.springframework.kafka</ groupId > < artifactId >spring-kafka</ artifactId > < version >2.2.2.RELEASE</ version > </ dependency > |
在配置文件中配置kafka相關屬性配置,分別配置生產者和消費者的屬性,在程序啟動時,spring boot框架會自動讀取這些配置的屬性,創建相關的生產者、消費者等。下面展示一個簡單的配置。
1
2
3
4
5
6
7
8
9
|
#kafka默認消費者配置 spring.kafka.consumer.bootstrap-servers= 192.168 . 0.15 : 9092 spring.kafka.consumer.enable-auto-commit= false spring.kafka.consumer.auto-offset-reset=earliest #kafka默認生產者配置 spring.kafka.producer.bootstrap-servers= 192.168 . 0.15 : 9092 spring.kafka.producer.acks=- 1 spring.kafka.client-id=kafka-producer spring.kafka.producer.batch-size= 5 |
當然,在實際生產中的配置肯定比上面的配置復雜,需要一些定制化的操作,那么spring boot的自動化配置創建的生產者或者消費者都不能滿足我們時,應該需要自定義化相關配置,這個在后續舉例,這里先分析自動化配置。
在進行了如上配置之后,需要生產者時,使用方式為下代碼所示。
1
2
3
4
5
6
7
8
9
10
11
12
|
@RunWith (SpringRunner. class ) @SpringBootTest (classes = {UserSSOApplication. class }) public class UserSSOApplicationTests { @Resource //注入kafkatemplete,這個由spring boot自動創建 KafkaTemplate kafkaTemplate; @Test public void testKafkaSendMsg() { //發送消息 kafkaTemplate.send( "test" , 0 , 12 , "1222" ); } } |
消費者的使用注解方式整合,代碼如下。
1
2
3
4
5
6
7
8
9
|
@Component @Slf4j public class KafkaMessageReceiver2 { //指定監聽的topic,當前消費者組id @KafkaListener (topics = { "test" }, groupId = "receiver" ) public void registryReceiver(ConsumerRecord<Integer, String> integerStringConsumerRecords) { log.info(integerStringConsumerRecords.value()); } } |
上面是最簡單的配置,實現的一個簡單例子,如果需要更加定制化的配置,可以參考類
org.springframework.boot.autoconfigure.kafka.KafkaProperties這里面包含了大部分需要的kafka配置。針對配置,在properties文件中添加即可。
spring boot自動配置的不足
上面是依賴spring boot自動化配置完成的整合方式,實際上所有的配置實現都是在org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration中完成。可以看出個是依賴于@Configuration完成bean配置,這種配置方式基本能夠實現大部分情況,只要熟悉org.springframework.boot.autoconfigure.kafka.KafkaProperties中的配置即可。
但是這種方式還有一個問題,就是org.springframework.boot.autoconfigure.kafka.KafkaProperties中并沒有涵蓋所有的org.apache.kafka.clients.producer.ProducerConfig中的配置,這就導致某些特殊配置不能依賴spring boot自動創建,需要我們手動創建Producer和comsumer。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
|
@Configuration @ConditionalOnClass (KafkaTemplate. class ) @EnableConfigurationProperties (KafkaProperties. class ) @Import (KafkaAnnotationDrivenConfiguration. class ) public class KafkaAutoConfiguration { private final KafkaProperties properties; private final RecordMessageConverter messageConverter; public KafkaAutoConfiguration(KafkaProperties properties, ObjectProvider<RecordMessageConverter> messageConverter) { this .properties = properties; this .messageConverter = messageConverter.getIfUnique(); } @Bean @ConditionalOnMissingBean (KafkaTemplate. class ) public KafkaTemplate<?, ?> kafkaTemplate( ProducerFactory<Object, Object> kafkaProducerFactory, ProducerListener<Object, Object> kafkaProducerListener) { KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>( kafkaProducerFactory); if ( this .messageConverter != null ) { kafkaTemplate.setMessageConverter( this .messageConverter); } kafkaTemplate.setProducerListener(kafkaProducerListener); kafkaTemplate.setDefaultTopic( this .properties.getTemplate().getDefaultTopic()); return kafkaTemplate; } @Bean @ConditionalOnMissingBean (ConsumerFactory. class ) public ConsumerFactory<?, ?> kafkaConsumerFactory() { return new DefaultKafkaConsumerFactory<>( this .properties.buildConsumerProperties()); } @Bean @ConditionalOnMissingBean (ProducerFactory. class ) public ProducerFactory<?, ?> kafkaProducerFactory() { DefaultKafkaProducerFactory<?, ?> factory = new DefaultKafkaProducerFactory<>( this .properties.buildProducerProperties()); String transactionIdPrefix = this .properties.getProducer() .getTransactionIdPrefix(); if (transactionIdPrefix != null ) { factory.setTransactionIdPrefix(transactionIdPrefix); } return factory; } //略略略 } |
spring boot下手動配置kafka
由于需要對某些特殊配置進行配置,我們可能需要手動配置kafka相關的bean,創建一個配置類如下,類似于
org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration,這里創建了對應類型的bean之后,org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration中的對應Bean定義將不起作用。
所有的生產者配置可以參考ProducerConfig類,所有的消費者配置可以參考ConsumerConfig類。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
|
/** * kafka配置,實際上,在KafkaAutoConfiguration中已經有默認的根據配置文件信息創建配置,但是自動配置屬性沒有涵蓋所有 * 我們可以自定義創建相關bean,進行如下配置 * * @author zhoujy * @date 2018年12月17日 **/ @Configuration public class KafkaConfig { @Value ( "${spring.kafka.consumer.bootstrap-servers}" ) private String bootstrapServers; //構造消費者屬性map,ConsumerConfig中的可配置屬性比spring boot自動配置要多 private Map<String, Object> consumerProperties(){ Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false ); props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000" ); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer. class ); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer. class ); props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5 ); props.put(ConsumerConfig.GROUP_ID_CONFIG, "activity-service" ); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); return props; } /** * 不使用spring boot默認方式創建的DefaultKafkaConsumerFactory,重新定義創建方式 * @return */ @Bean ( "consumerFactory" ) public DefaultKafkaConsumerFactory consumerFactory(){ return new DefaultKafkaConsumerFactory(consumerProperties()); } @Bean ( "listenerContainerFactory" ) //個性化定義消費者 public ConcurrentKafkaListenerContainerFactory listenerContainerFactory(DefaultKafkaConsumerFactory consumerFactory) { //指定使用DefaultKafkaConsumerFactory ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory(); factory.setConsumerFactory(consumerFactory); //設置消費者ack模式為手動,看需求設置 factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE); //設置可批量拉取消息消費,拉取數量一次3,看需求設置 factory.setConcurrency( 3 ); factory.setBatchListener( true ); return factory; } /* @Bean //代碼創建方式topic public NewTopic batchTopic() { return new NewTopic("topic.quick.batch", 8, (short) 1); }*/ //創建生產者配置map,ProducerConfig中的可配置屬性比spring boot自動配置要多 private Map<String, Object> producerProperties(){ Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer. class ); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer. class ); props.put(ProducerConfig.ACKS_CONFIG, "-1" ); props.put(ProducerConfig.BATCH_SIZE_CONFIG, 5 ); props.put(ProducerConfig.LINGER_MS_CONFIG, 500 ); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); return props; } /** * 不使用spring boot的KafkaAutoConfiguration默認方式創建的DefaultKafkaProducerFactory,重新定義 * @return */ @Bean ( "produceFactory" ) public DefaultKafkaProducerFactory produceFactory(){ return new DefaultKafkaProducerFactory(producerProperties()); } /** * 不使用spring boot的KafkaAutoConfiguration默認方式創建的KafkaTemplate,重新定義 * @param produceFactory * @return */ @Bean public KafkaTemplate kafkaTemplate(DefaultKafkaProducerFactory produceFactory){ return new KafkaTemplate(produceFactory); } } |
生產者的使用方式,跟自動配置一樣,直接注入KafkaTemplate即可。主要是消費者的使用有些不同。
批量消費消息
上面的消費者配置配置了一個bean,@Bean(“listenerContainerFactory”),這個bean可以指定為消費者,注解方式中是如下的使用方式。
containerFactory = "listenerContainerFactory"指定了使用listenerContainerFactory作為消費者。
- 注意registryReceiver中的參數,ConsumerRecord對比之前的消費者,因為設置listenerContainerFactory是批量消費,因此ConsumerRecord是一個List,如果不是批量消費的話,相對應就是一個對象。
- 注意第二個參數Acknowledgment,這個參數只有在設置消費者的ack應答模式為AckMode.MANUAL_IMMEDIATE才能注入,意思是需要手動ack。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
@Component @Slf4j public class KafkaMessageReceiver { /** * listenerContainerFactory設置了批量拉取消息,因此參數是List<ConsumerRecord<Integer, String>>,否則是ConsumerRecord * @param integerStringConsumerRecords * @param acknowledgment */ @KafkaListener (topics = { "test" }, containerFactory = "listenerContainerFactory" ) public void registryReceiver(List<ConsumerRecord<Integer, String>> integerStringConsumerRecords, Acknowledgment acknowledgment) { Iterator<ConsumerRecord<Integer, String>> it = integerStringConsumerRecords.iterator(); while (it.hasNext()){ ConsumerRecord<Integer, String> consumerRecords = it.next(); //dosome acknowledgment.acknowledge(); } } } |
如果不想要批量消費消息,那就可以另外定義一個bean類似于@Bean(“listenerContainerFactory”),如下,只要不設置批量消費即可。
1
2
3
4
5
6
7
8
9
10
11
|
@Bean ( "listenerContainerFactory2" ) //個性化定義消費者 public ConcurrentKafkaListenerContainerFactory listenerContainerFactory2(DefaultKafkaConsumerFactory consumerFactory) { //指定使用DefaultKafkaConsumerFactory ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory(); factory.setConsumerFactory(consumerFactory); //設置消費者ack模式為手動,看需求設置 factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE); return factory; } |
spring boot整合kafka報錯
Timeout expired while fetching topic metadata
這種報錯應檢查 kafka連接問題,服務是否啟動,端口是否正確。
Kafka Producer error Expiring 10 record(s) for TOPIC:XXXXXX: xxx ms has passed since batch creation plus linger time
這種報錯要考慮kafka和spring對應的版本問題,我的springboot 2.1.2在使用kafka_2.12-2.1.1時出現此問題,將kafka版本換為2.11-1.1.1后,問題解決。
以上為個人經驗,希望能給大家一個參考,也希望大家多多支持服務器之家。
原文鏈接:https://blog.csdn.net/qq_20597727/article/details/85085532