一区二区三区在线-一区二区三区亚洲视频-一区二区三区亚洲-一区二区三区午夜-一区二区三区四区在线视频-一区二区三区四区在线免费观看

服務(wù)器之家:專注于服務(wù)器技術(shù)及軟件下載分享
分類導航

PHP教程|ASP.NET教程|Java教程|ASP教程|編程技術(shù)|正則表達式|C/C++|IOS|C#|Swift|Android|VB|R語言|JavaScript|易語言|vb.net|

服務(wù)器之家 - 編程語言 - Java教程 - 在springboot中對kafka進行讀寫的示例代碼

在springboot中對kafka進行讀寫的示例代碼

2020-12-29 14:48冬天里的懶喵 Java教程

本篇文章主要介紹了在springboot中對kafka進行讀寫的示例代碼,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧

springbootkafka的client很好的實現(xiàn)了集成,使用非常方便,本文也實現(xiàn)了一個在springboot中實現(xiàn)操作kafka的demo。

1.pom配置

只需要在dependencies中增加 spring-kafka的配置即可。完整效果如下:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
<parent>
    <groupid>org.springframework.boot</groupid>
    <artifactid>spring-boot-starter-parent</artifactid>
    <version>1.5.4.release</version>
  </parent>
 
  <properties>
    <java.version>1.8</java.version>
     <spring-kafka.version>1.2.2.release</spring-kafka.version>
    <project.build.sourceencoding>utf-8</project.build.sourceencoding>
  </properties>
 
  <dependencies>
    <dependency>
      <groupid>org.springframework.boot</groupid>
      <artifactid>spring-boot-starter-web</artifactid>
    </dependency>
    <dependency>
      <groupid>org.springframework.boot</groupid>
      <artifactid>spring-boot-starter</artifactid>
    </dependency>
    <dependency>
      <groupid>org.springframework.boot</groupid>
      <artifactid>spring-boot-starter-test</artifactid>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupid>org.springframework.boot</groupid>
      <artifactid>spring-boot-starter-aop</artifactid>
    </dependency>
   <!-- spring-kafka -->
      <dependency>
      <groupid>org.springframework.kafka</groupid>
      <artifactid>spring-kafka</artifactid>
      <version>${spring-kafka.version}</version>
      </dependency>
      <dependency>
      <groupid>org.springframework.kafka</groupid>
      <artifactid>spring-kafka-test</artifactid>
      <version>${spring-kafka.version}</version>
      <scope>test</scope>
      </dependency>
   </dependencies>

2.生產(chǎn)者

參數(shù)配置類,其參數(shù)卸載yml文件中,通過@value注入

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package com.dhb.kafka.producer;
 
import org.apache.kafka.clients.producer.producerconfig;
import org.apache.kafka.common.serialization.stringserializer;
import org.springframework.beans.factory.annotation.value;
import org.springframework.context.annotation.bean;
import org.springframework.context.annotation.configuration;
import org.springframework.kafka.core.defaultkafkaproducerfactory;
import org.springframework.kafka.core.kafkatemplate;
import org.springframework.kafka.core.producerfactory;
 
import java.util.hashmap;
import java.util.map;
 
@configuration
public class senderconfig {
 
  @value("${kafka.bootstrap-servers}")
  private string bootstrapservers;
 
  @bean
  public map<string,object> producerconfigs() {
    map<string,object> props = new hashmap<>();
    props.put(producerconfig.bootstrap_servers_config,this.bootstrapservers);
    props.put(producerconfig.key_serializer_class_config, stringserializer.class);
    props.put(producerconfig.value_serializer_class_config,stringserializer.class);
    props.put(producerconfig.acks_config,"0");
    return props;
  }
 
  @bean
  public producerfactory<string,string> producerfactory() {
    return new defaultkafkaproducerfactory<>(producerconfigs());
  }
 
  @bean
  public kafkatemplate<string,string> kafkatemplate() {
    return new kafkatemplate<string, string>(producerfactory());
  }
 
  @bean
  public sender sender() {
    return new sender();
  }
}

消息發(fā)送類

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
package com.dhb.kafka.producer;
 
import lombok.extern.slf4j.slf4j;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.kafka.core.kafkatemplate;
 
@slf4j
public class sender {
 
  @autowired
  private kafkatemplate<string,string> kafkatemplate;
 
  public void send(string topic,string payload) {
    log.info("sending payload='{}' to topic='{}'",payload,topic);
    this.kafkatemplate.send(topic,payload);
  }
}

3.消費者

參數(shù)配置類

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
package com.dhb.kafka.consumer;
 
import org.apache.kafka.clients.consumer.consumerconfig;
import org.apache.kafka.common.serialization.stringdeserializer;
import org.springframework.beans.factory.annotation.value;
import org.springframework.context.annotation.bean;
import org.springframework.context.annotation.configuration;
import org.springframework.kafka.annotation.enablekafka;
import org.springframework.kafka.config.concurrentkafkalistenercontainerfactory;
import org.springframework.kafka.core.consumerfactory;
import org.springframework.kafka.core.defaultkafkaconsumerfactory;
 
import java.util.hashmap;
import java.util.map;
 
@configuration
@enablekafka
public class receiverconfig {
 
  @value("${kafka.bootstrap-servers}")
  private string bootstrapservers;
 
  public map<string,object> consumerconfigs() {
    map<string,object> props = new hashmap<>();
    props.put(consumerconfig.bootstrap_servers_config,bootstrapservers);
    props.put(consumerconfig.key_deserializer_class_config, stringdeserializer.class);
    props.put(consumerconfig.value_deserializer_class_config,stringdeserializer.class);
    props.put(consumerconfig.group_id_config,"helloword");
    return props;
  }
 
  @bean
  public consumerfactory<string,string> consumerfactory() {
    return new defaultkafkaconsumerfactory<>(consumerconfigs());
  }
 
  @bean
  public concurrentkafkalistenercontainerfactory<string,string> kafkalistenercontainerfactory() {
    concurrentkafkalistenercontainerfactory<string,string> factory =
        new concurrentkafkalistenercontainerfactory<>();
    factory.setconsumerfactory(consumerfactory());
    return factory;
  }
 
  @bean
  public receiver receiver() {
    return new receiver();
  }
 
}

消息接受類

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package com.dhb.kafka.consumer;
 
import lombok.extern.slf4j.slf4j;
import org.springframework.kafka.annotation.kafkalistener;
 
import java.util.concurrent.countdownlatch;
 
@slf4j
public class receiver {
 
  private countdownlatch latch = new countdownlatch(1);
 
  public countdownlatch getlatch() {
    return latch;
  }
 
  @kafkalistener(topics = "${kafka.topic.helloworld}")
  public void receive(string payload) {
    log.info("received payload='{}'",payload);
    latch.countdown();
  }
}

3.web測試類

定義了一個基于http的web測試接口

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package com.dhb.kafka.web;
 
import com.dhb.kafka.producer.sender;
import lombok.extern.slf4j.slf4j;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.web.bind.annotation.requestmapping;
import org.springframework.web.bind.annotation.requestmethod;
import org.springframework.web.bind.annotation.restcontroller;
 
import javax.servlet.http.httpservletrequest;
import javax.servlet.http.httpservletresponse;
import java.io.ioexception;
 
@restcontroller
@slf4j
public class kafkaproducer {
 
  @autowired
  sender sender;
 
  @requestmapping(value = "/sender.action", method = requestmethod.post)
  public void exec(httpservletrequest request, httpservletresponse response,string data) throws ioexception{
    this.sender.send("testtopic",data);
    response.setcharacterencoding("utf-8");
    response.setcontenttype("text/json");
    response.getwriter().write("success");
    response.getwriter().flush();
    response.getwriter().close();
  }
 
}

4.啟動類及配置

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
package com.dhb.kafka;
 
import org.springframework.boot.springapplication;
import org.springframework.boot.autoconfigure.springbootapplication;
 
@springbootapplication
public class kafkaapplication {
 
 
  public static void main(string[] args) {
    springapplication.run(kafkaapplication.class,args);
 
  }
}

application.yml

?
1
2
3
4
kafka:
 bootstrap-servers: 192.168.162.239:9092
 topic:
  helloworld: testtopic

程序結(jié)構(gòu):

在springboot中對kafka進行讀寫的示例代碼

包結(jié)構(gòu)

5.讀寫測試

通過執(zhí)行kafkaapplication的main方法啟動程序。然后打開postman進行測試:

在springboot中對kafka進行讀寫的示例代碼

運行后返回success

在springboot中對kafka進行讀寫的示例代碼

生產(chǎn)者日志:

在springboot中對kafka進行讀寫的示例代碼

消費者日志:

在springboot中對kafka進行讀寫的示例代碼

以上就是本文的全部內(nèi)容,希望對大家的學習有所幫助,也希望大家多多支持服務(wù)器之家。

原文鏈接:http://www.jianshu.com/p/3dcb64e49ac5?utm_source=tuicool&utm_medium=referral

延伸 · 閱讀

精彩推薦
主站蜘蛛池模板: 500av导航大全精品 | 美女用手扒自己下部 | 亚飞与亚基国语1080p在线观看 | gay男男白袜chinese | 娇小性色 | 俄罗斯极品h在线 | aaaa大片| 国产伦精品一区二区三区女 | 我们日本在线观看免费动漫下载 | 欧美性一级交视频 | 女子监狱第二季在线观看免费完整版 | 91精品国产麻豆国产自产在线 | 精品久久综合一区二区 | 亚洲成综合人影院在院播放 | 高清在线看 | 日本大片免aaa费观看视频 | 日本一区二区视频免费播放 | 潘金莲西门庆一级淫片aaaaaa | 狠狠色狠狠色综合婷婷tag | 久久亚洲精品AV成人无 | 亚洲AV无码乱码在线观看浪潮 | 色屁屁www | 色哟哟国产成人精品 | 侮辱丰满美丽的人妻 | 美女扒开腿让男生桶爽漫画 | 骚虎网站在线观看 | 国产精品亚洲精品日韩已方 | 草草在线视频 | 美女奶口隐私免费视频网站 | 女同色图 | 国色天香社区视频在线观看免费完整版 | 国产男人天堂 | 免费观看小视频 | 国内亚州视频在线观看 | 欧美性4khd720 | 国产一区二区三区四区波多野结衣 | 日韩a一级欧美一级 | 暖暖高清日本在线 | 久久久影院亚洲精品 | 日本aa大片在线播放免费看 | 色依依视频视频在线观看 |