一区二区三区在线-一区二区三区亚洲视频-一区二区三区亚洲-一区二区三区午夜-一区二区三区四区在线视频-一区二区三区四区在线免费观看

服務器之家:專注于服務器技術及軟件下載分享
分類導航

PHP教程|ASP.NET教程|Java教程|ASP教程|編程技術|正則表達式|C/C++|IOS|C#|Swift|Android|VB|R語言|JavaScript|易語言|vb.net|

服務器之家 - 編程語言 - Java教程 - 關于spring boot整合kafka+注解方式

關于spring boot整合kafka+注解方式

2022-01-21 11:50一擼向北 Java教程

這篇文章主要介紹了關于spring boot整合kafka+注解方式,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教

spring boot自動配置方式整合

spring boot具有許多自動化配置,對于kafka的自動化配置當然也包含在內,基于spring boot自動配置方式整合kafka,需要做以下步驟。

引入kafka的pom依賴包

?
1
2
3
4
5
6
<!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.2.2.RELEASE</version>
</dependency>

在配置文件中配置kafka相關屬性配置,分別配置生產者和消費者的屬性,在程序啟動時,spring boot框架會自動讀取這些配置的屬性,創建相關的生產者、消費者等。下面展示一個簡單的配置。

?
1
2
3
4
5
6
7
8
9
#kafka默認消費者配置
spring.kafka.consumer.bootstrap-servers=192.168.0.15:9092
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.auto-offset-reset=earliest
#kafka默認生產者配置
spring.kafka.producer.bootstrap-servers=192.168.0.15:9092
spring.kafka.producer.acks=-1
spring.kafka.client-id=kafka-producer
spring.kafka.producer.batch-size=5

當然,在實際生產中的配置肯定比上面的配置復雜,需要一些定制化的操作,那么spring boot的自動化配置創建的生產者或者消費者都不能滿足我們時,應該需要自定義化相關配置,這個在后續舉例,這里先分析自動化配置。

在進行了如上配置之后,需要生產者時,使用方式為下代碼所示。

?
1
2
3
4
5
6
7
8
9
10
11
12
@RunWith(SpringRunner.class)
@SpringBootTest(classes = {UserSSOApplication.class})
public class UserSSOApplicationTests {
   @Resource
    //注入kafkatemplete,這個由spring boot自動創建
   KafkaTemplate kafkaTemplate;
   @Test
   public void testKafkaSendMsg() {
       //發送消息
      kafkaTemplate.send("test", 0,12,"1222");
   }
}

消費者的使用注解方式整合,代碼如下。

?
1
2
3
4
5
6
7
8
9
@Component
@Slf4j
public class KafkaMessageReceiver2 {
    //指定監聽的topic,當前消費者組id
    @KafkaListener(topics = {"test"}, groupId = "receiver")
    public void registryReceiver(ConsumerRecord<Integer, String> integerStringConsumerRecords) {
        log.info(integerStringConsumerRecords.value());
    }
}

上面是最簡單的配置,實現的一個簡單例子,如果需要更加定制化的配置,可以參考類

org.springframework.boot.autoconfigure.kafka.KafkaProperties這里面包含了大部分需要的kafka配置。針對配置,在properties文件中添加即可。

spring boot自動配置的不足

上面是依賴spring boot自動化配置完成的整合方式,實際上所有的配置實現都是在org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration中完成。可以看出個是依賴于@Configuration完成bean配置,這種配置方式基本能夠實現大部分情況,只要熟悉org.springframework.boot.autoconfigure.kafka.KafkaProperties中的配置即可。

但是這種方式還有一個問題,就是org.springframework.boot.autoconfigure.kafka.KafkaProperties中并沒有涵蓋所有的org.apache.kafka.clients.producer.ProducerConfig中的配置,這就導致某些特殊配置不能依賴spring boot自動創建,需要我們手動創建Producer和comsumer。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
@Configuration
@ConditionalOnClass(KafkaTemplate.class)
@EnableConfigurationProperties(KafkaProperties.class)
@Import(KafkaAnnotationDrivenConfiguration.class)
public class KafkaAutoConfiguration {
   private final KafkaProperties properties;
   private final RecordMessageConverter messageConverter;
   public KafkaAutoConfiguration(KafkaProperties properties,
         ObjectProvider<RecordMessageConverter> messageConverter) {
      this.properties = properties;
      this.messageConverter = messageConverter.getIfUnique();
   }
   @Bean
   @ConditionalOnMissingBean(KafkaTemplate.class)
   public KafkaTemplate<?, ?> kafkaTemplate(
         ProducerFactory<Object, Object> kafkaProducerFactory,
         ProducerListener<Object, Object> kafkaProducerListener) {
      KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(
            kafkaProducerFactory);
      if (this.messageConverter != null) {
         kafkaTemplate.setMessageConverter(this.messageConverter);
      }
      kafkaTemplate.setProducerListener(kafkaProducerListener);
      kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());
      return kafkaTemplate;
   }
   @Bean
   @ConditionalOnMissingBean(ConsumerFactory.class)
   public ConsumerFactory<?, ?> kafkaConsumerFactory() {
      return new DefaultKafkaConsumerFactory<>(
            this.properties.buildConsumerProperties());
   }
   @Bean
   @ConditionalOnMissingBean(ProducerFactory.class)
   public ProducerFactory<?, ?> kafkaProducerFactory() {
      DefaultKafkaProducerFactory<?, ?> factory = new DefaultKafkaProducerFactory<>(
            this.properties.buildProducerProperties());
      String transactionIdPrefix = this.properties.getProducer()
            .getTransactionIdPrefix();
      if (transactionIdPrefix != null) {
         factory.setTransactionIdPrefix(transactionIdPrefix);
      }
      return factory;
   }
  //略略略
}

spring boot下手動配置kafka

由于需要對某些特殊配置進行配置,我們可能需要手動配置kafka相關的bean,創建一個配置類如下,類似于

org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration,這里創建了對應類型的bean之后,org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration中的對應Bean定義將不起作用。

所有的生產者配置可以參考ProducerConfig類,所有的消費者配置可以參考ConsumerConfig類。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
/**
 * kafka配置,實際上,在KafkaAutoConfiguration中已經有默認的根據配置文件信息創建配置,但是自動配置屬性沒有涵蓋所有
 * 我們可以自定義創建相關bean,進行如下配置
 *
 * @author zhoujy
 * @date 2018年12月17日
 **/
@Configuration
public class KafkaConfig {
    @Value("${spring.kafka.consumer.bootstrap-servers}")
    private String bootstrapServers;
    //構造消費者屬性map,ConsumerConfig中的可配置屬性比spring boot自動配置要多
    private Map<String, Object> consumerProperties(){
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "activity-service");
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        return props;
    }
    /**
     * 不使用spring boot默認方式創建的DefaultKafkaConsumerFactory,重新定義創建方式
     * @return
     */
    @Bean("consumerFactory")
    public DefaultKafkaConsumerFactory consumerFactory(){
        return new DefaultKafkaConsumerFactory(consumerProperties());
    }
    @Bean("listenerContainerFactory")
    //個性化定義消費者
    public ConcurrentKafkaListenerContainerFactory listenerContainerFactory(DefaultKafkaConsumerFactory consumerFactory) {
        //指定使用DefaultKafkaConsumerFactory
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(consumerFactory);
        
 //設置消費者ack模式為手動,看需求設置 
        factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
        //設置可批量拉取消息消費,拉取數量一次3,看需求設置
        factory.setConcurrency(3);
        factory.setBatchListener(true);
        return factory;
    }
   /* @Bean
     //代碼創建方式topic
    public NewTopic batchTopic() {
        return new NewTopic("topic.quick.batch", 8, (short) 1);
    }*/
    //創建生產者配置map,ProducerConfig中的可配置屬性比spring boot自動配置要多
    private Map<String, Object> producerProperties(){
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.ACKS_CONFIG, "-1");
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 5);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 500);
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        return props;
    }
    /**
     * 不使用spring boot的KafkaAutoConfiguration默認方式創建的DefaultKafkaProducerFactory,重新定義
     * @return
     */
    @Bean("produceFactory")
    public DefaultKafkaProducerFactory produceFactory(){
        return new DefaultKafkaProducerFactory(producerProperties());
    }
    /**
     * 不使用spring boot的KafkaAutoConfiguration默認方式創建的KafkaTemplate,重新定義
     * @param produceFactory
     * @return
     */
    @Bean
    public KafkaTemplate kafkaTemplate(DefaultKafkaProducerFactory produceFactory){
        return new KafkaTemplate(produceFactory);
    }
}

生產者的使用方式,跟自動配置一樣,直接注入KafkaTemplate即可。主要是消費者的使用有些不同。

批量消費消息

上面的消費者配置配置了一個bean,@Bean(“listenerContainerFactory”),這個bean可以指定為消費者,注解方式中是如下的使用方式。

containerFactory = "listenerContainerFactory"指定了使用listenerContainerFactory作為消費者。

  • 注意registryReceiver中的參數,ConsumerRecord對比之前的消費者,因為設置listenerContainerFactory是批量消費,因此ConsumerRecord是一個List,如果不是批量消費的話,相對應就是一個對象。
  • 注意第二個參數Acknowledgment,這個參數只有在設置消費者的ack應答模式為AckMode.MANUAL_IMMEDIATE才能注入,意思是需要手動ack。
?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Component
@Slf4j
public class KafkaMessageReceiver {  
    /**
     * listenerContainerFactory設置了批量拉取消息,因此參數是List<ConsumerRecord<Integer, String>>,否則是ConsumerRecord
     * @param integerStringConsumerRecords
     * @param acknowledgment
     */
    @KafkaListener(topics = {"test"}, containerFactory = "listenerContainerFactory")
    public void registryReceiver(List<ConsumerRecord<Integer, String>> integerStringConsumerRecords, Acknowledgment acknowledgment) {
        Iterator<ConsumerRecord<Integer, String>> it = integerStringConsumerRecords.iterator();
        while (it.hasNext()){
            ConsumerRecord<Integer, String> consumerRecords = it.next();
            //dosome
             acknowledgment.acknowledge();
        }
    }
}

如果不想要批量消費消息,那就可以另外定義一個bean類似于@Bean(“listenerContainerFactory”),如下,只要不設置批量消費即可。

?
1
2
3
4
5
6
7
8
9
10
11
@Bean("listenerContainerFactory2")
    //個性化定義消費者
    public ConcurrentKafkaListenerContainerFactory listenerContainerFactory2(DefaultKafkaConsumerFactory consumerFactory) {
        //指定使用DefaultKafkaConsumerFactory
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(consumerFactory);
        
 //設置消費者ack模式為手動,看需求設置 
        factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }

spring boot整合kafka報錯

Timeout expired while fetching topic metadata

這種報錯應檢查 kafka連接問題,服務是否啟動,端口是否正確。

Kafka Producer error Expiring 10 record(s) for TOPIC:XXXXXX: xxx ms has passed since batch creation plus linger time

這種報錯要考慮kafka和spring對應的版本問題,我的springboot 2.1.2在使用kafka_2.12-2.1.1時出現此問題,將kafka版本換為2.11-1.1.1后,問題解決。

以上為個人經驗,希望能給大家一個參考,也希望大家多多支持服務器之家。

原文鏈接:https://blog.csdn.net/qq_20597727/article/details/85085532

延伸 · 閱讀

精彩推薦
  • Java教程20個非常實用的Java程序代碼片段

    20個非常實用的Java程序代碼片段

    這篇文章主要為大家分享了20個非常實用的Java程序片段,對java開發項目有所幫助,感興趣的小伙伴們可以參考一下 ...

    lijiao5352020-04-06
  • Java教程xml與Java對象的轉換詳解

    xml與Java對象的轉換詳解

    這篇文章主要介紹了xml與Java對象的轉換詳解的相關資料,需要的朋友可以參考下...

    Java教程網2942020-09-17
  • Java教程Java8中Stream使用的一個注意事項

    Java8中Stream使用的一個注意事項

    最近在工作中發現了對于集合操作轉換的神器,java8新特性 stream,但在使用中遇到了一個非常重要的注意點,所以這篇文章主要給大家介紹了關于Java8中S...

    阿杜7482021-02-04
  • Java教程小米推送Java代碼

    小米推送Java代碼

    今天小編就為大家分享一篇關于小米推送Java代碼,小編覺得內容挺不錯的,現在分享給大家,具有很好的參考價值,需要的朋友一起跟隨小編來看看吧...

    富貴穩中求8032021-07-12
  • Java教程Java使用SAX解析xml的示例

    Java使用SAX解析xml的示例

    這篇文章主要介紹了Java使用SAX解析xml的示例,幫助大家更好的理解和學習使用Java,感興趣的朋友可以了解下...

    大行者10067412021-08-30
  • Java教程Java BufferWriter寫文件寫不進去或缺失數據的解決

    Java BufferWriter寫文件寫不進去或缺失數據的解決

    這篇文章主要介紹了Java BufferWriter寫文件寫不進去或缺失數據的解決方案,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望...

    spcoder14552021-10-18
  • Java教程Java實現搶紅包功能

    Java實現搶紅包功能

    這篇文章主要為大家詳細介紹了Java實現搶紅包功能,采用多線程模擬多人同時搶紅包,文中示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙...

    littleschemer13532021-05-16
  • Java教程升級IDEA后Lombok不能使用的解決方法

    升級IDEA后Lombok不能使用的解決方法

    最近看到提示IDEA提示升級,尋思已經有好久沒有升過級了。升級完畢重啟之后,突然發現好多錯誤,本文就來介紹一下如何解決,感興趣的可以了解一下...

    程序猿DD9332021-10-08
主站蜘蛛池模板: 5g影院天天影院天天爽影院网站 | 久草在在线免视频在线观看 | 精品视频 九九九 | 欧美日韩一区二区三在线 | 成人免费视频一区二区三区 | 四虎网址大全 | 99精品视频在线观看re | 欧美人人干 | 扒开黑女人p大荫蒂老女人 扒开大腿狠狠挺进视频 | 国产日韩欧美色视频色在线观看 | 成人一区二区免费中文字幕 | www.亚洲天堂 | 国产欧美久久一区二区 | 成年人免费在线看 | 国偷盗摄自产福利一区在线 | 国产成人夜色影视视频 | videos变态极端| 国产精品日韩在线观看 | 热99精品视频 | 日本免费在线播放 | 晚上禁用的十大黄台视频 | 92国产福利视频一区二区 | 国产悠悠视频在线播放 | 男男gaygays国内 | 欧美一级视频在线观看 | 日韩免费在线观看 | 欧美另类videos另类粗暴 | 大肥臀风间由美 中文字幕 大东北chinesexxxx露脸 | 丁香五香天堂 | 国产99re在线观看69热 | 国产精品成人一区二区1 | 国产精品探花一区在线观看 | 日本激情小说 | 欧美一级免费看 | 国产一成人精品福利网站 | 校园全黄h全肉细节文 | 欧美一区欧美二区 | 欧美亚洲一区二区三区在线 | 香蕉免费看一区二区三区 | 毛片在线网址 | 亚洲视频高清 |