一区二区三区在线-一区二区三区亚洲视频-一区二区三区亚洲-一区二区三区午夜-一区二区三区四区在线视频-一区二区三区四区在线免费观看

服務器之家:專注于服務器技術及軟件下載分享
分類導航

PHP教程|ASP.NET教程|Java教程|ASP教程|編程技術|正則表達式|C/C++|IOS|C#|Swift|Android|VB|R語言|JavaScript|易語言|vb.net|

服務器之家 - 編程語言 - Java教程 - Springboot+Netty+Websocket實現消息推送實例

Springboot+Netty+Websocket實現消息推送實例

2021-08-03 10:33青椒1013 Java教程

這篇文章主要介紹了Springboot+Netty+Websocket實現消息推送實例,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下

前言

websocket 使得客戶端和服務器之間的數據交換變得更加簡單,允許服務端主動向客戶端推送數據。在 websocket api 中,瀏覽器和服務器只需要完成一次握手,兩者之間就直接可以創建持久性的連接,并進行雙向數據傳輸。

netty框架的優勢

 1. api使用簡單,開發門檻低;
 2. 功能強大,預置了多種編解碼功能,支持多種主流協議;
 3. 定制能力強,可以通過channelhandler對通信框架進行靈活地擴展;
 4. 性能高,通過與其他業界主流的nio框架對比,netty的綜合性能最優;
 5. 成熟、穩定,netty修復了已經發現的所有jdk nio bug,業務開發人員不需要再為nio的bug而煩惱

提示:以下是本篇文章正文內容,下面案例可供參考

一、引入netty依賴

?
1
2
3
4
5
<dependency>
   <groupid>io.netty</groupid>
   <artifactid>netty-all</artifactid>
   <version>4.1.48.final</version>
</dependency>

二、使用步驟

1.引入基礎配置類

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package com.test.netty;
 
public enum cmd {
 start("000", "連接成功"),
 wmessage("001", "消息提醒"),
 ;
 private string cmd;
 private string desc;
 
 cmd(string cmd, string desc) {
  this.cmd = cmd;
  this.desc = desc;
 }
 
 public string getcmd() {
  return cmd;
 }
 
 public string getdesc() {
  return desc;
 }
}

2.netty服務啟動監聽器

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
package com.test.netty;
 
import io.netty.bootstrap.serverbootstrap;
import io.netty.channel.channelfuture;
import io.netty.channel.channeloption;
import io.netty.channel.eventloopgroup;
import io.netty.channel.nio.nioeventloopgroup;
import io.netty.channel.socket.nio.nioserversocketchannel;
import lombok.extern.slf4j.slf4j;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.beans.factory.annotation.value;
import org.springframework.boot.applicationrunner;
import org.springframework.context.annotation.bean;
import org.springframework.stereotype.component;
 
/**
 * @author test
 * <p>
 * 服務啟動監聽器
 **/
@slf4j
@component
public class nettyserver {
 
 @value("${server.netty.port}")
 private int port;
 
 @autowired
 private serverchannelinitializer serverchannelinitializer;
 
 @bean
 applicationrunner nettyrunner() {
  return args -> {
   //new 一個主線程組
   eventloopgroup bossgroup = new nioeventloopgroup(1);
   //new 一個工作線程組
   eventloopgroup workgroup = new nioeventloopgroup();
   serverbootstrap bootstrap = new serverbootstrap()
     .group(bossgroup, workgroup)
     .channel(nioserversocketchannel.class)
     .childhandler(serverchannelinitializer)
     //設置隊列大小
     .option(channeloption.so_backlog, 1024)
     // 兩小時內沒有數據的通信時,tcp會自動發送一個活動探測數據報文
     .childoption(channeloption.so_keepalive, true);
   //綁定端口,開始接收進來的連接
   try {
    channelfuture future = bootstrap.bind(port).sync();
    log.info("服務器啟動開始監聽端口: {}", port);
    future.channel().closefuture().sync();
   } catch (interruptedexception e) {
    e.printstacktrace();
   } finally {
    //關閉主線程組
    bossgroup.shutdowngracefully();
    //關閉工作線程組
    workgroup.shutdowngracefully();
   }
  };
 }
}

3.netty服務端處理器

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
package com.test.netty;
 
import com.test.common.util.jsonutil;
import io.netty.channel.channel;
import io.netty.channel.channelhandler;
import io.netty.channel.channelhandlercontext;
import io.netty.channel.simplechannelinboundhandler;
import io.netty.handler.codec.http.websocketx.textwebsocketframe;
import io.netty.handler.codec.http.websocketx.websocketserverprotocolhandler;
import lombok.data;
import lombok.extern.slf4j.slf4j;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.stereotype.component;
 
import java.net.urldecoder;
import java.util.*;
 
/**
 * @author test
 * <p>
 * netty服務端處理器
 **/
@slf4j
@component
@channelhandler.sharable
public class nettyserverhandler extends simplechannelinboundhandler<textwebsocketframe> {
 
 @autowired
 private serverchannelcache cache;
 private static final string datakey = "test=";
 
 @data
 public static class channelcache {
 }
 
 
 /**
  * 客戶端連接會觸發
  */
 @override
 public void channelactive(channelhandlercontext ctx) throws exception {
  channel channel = ctx.channel();
  log.info("通道連接已打開,id->{}......", channel.id().aslongtext());
 }
 
 @override
 public void usereventtriggered(channelhandlercontext ctx, object evt) throws exception {
  if (evt instanceof websocketserverprotocolhandler.handshakecomplete) {
   channel channel = ctx.channel();
   websocketserverprotocolhandler.handshakecomplete handshakecomplete = (websocketserverprotocolhandler.handshakecomplete) evt;
   string requesturi = handshakecomplete.requesturi();
   requesturi = urldecoder.decode(requesturi, "utf-8");
   log.info("handshake_complete,id->{},uri->{}", channel.id().aslongtext(), requesturi);
   string socketkey = requesturi.substring(requesturi.lastindexof(datakey) + datakey.length());
   if (socketkey.length() > 0) {
    cache.add(socketkey, channel);
    this.send(channel, cmd.down_start, null);
   } else {
    channel.disconnect();
    ctx.close();
   }
  }
  super.usereventtriggered(ctx, evt);
 }
 
 @override
 public void channelinactive(channelhandlercontext ctx) throws exception {
  channel channel = ctx.channel();
  log.info("通道連接已斷開,id->{},用戶id->{}......", channel.id().aslongtext(), cache.getcacheid(channel));
  cache.remove(channel);
 }
 
 /**
  * 發生異常觸發
  */
 @override
 public void exceptioncaught(channelhandlercontext ctx, throwable cause) throws exception {
  channel channel = ctx.channel();
  log.error("連接出現異常,id->{},用戶id->{},異常->{}......", channel.id().aslongtext(), cache.getcacheid(channel), cause.getmessage(), cause);
  cache.remove(channel);
  ctx.close();
 }
 
 /**
  * 客戶端發消息會觸發
  */
 @override
 protected void channelread0(channelhandlercontext ctx, textwebsocketframe msg) throws exception {
  try {
   // log.info("接收到客戶端發送的消息:{}", msg.text());
   ctx.channel().writeandflush(new textwebsocketframe(jsonutil.tostring(collections.singletonmap("cmd", "100"))));
  } catch (exception e) {
   log.error("消息處理異常:{}", e.getmessage(), e);
  }
 }
 
 public void send(cmd cmd, string id, object obj) {
  hashmap<string, channel> channels = cache.get(id);
  if (channels == null) {
   return;
  }
  map<string, object> data = new linkedhashmap<>();
  data.put("cmd", cmd.getcmd());
  data.put("data", obj);
  string msg = jsonutil.tostring(data);
  log.info("服務器下發消息: {}", msg);
  channels.values().foreach(channel -> {
   channel.writeandflush(new textwebsocketframe(msg));
  });
 }
 
 public void send(channel channel, cmd cmd, object obj) {
  map<string, object> data = new linkedhashmap<>();
  data.put("cmd", cmd.getcmd());
  data.put("data", obj);
  string msg = jsonutil.tostring(data);
  log.info("服務器下發消息: {}", msg);
  channel.writeandflush(new textwebsocketframe(msg));
 }
 
}

4.netty服務端緩存類

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package com.test.netty;
 
import io.netty.channel.channel;
import io.netty.util.attributekey;
import org.springframework.stereotype.component;
 
import java.util.hashmap;
import java.util.concurrent.concurrenthashmap;
 
@component
public class serverchannelcache {
 private static final concurrenthashmap<string, hashmap<string, channel>> cache_map = new concurrenthashmap<>();
 private static final attributekey<string> channel_attr_key = attributekey.valueof("test");
 
 public string getcacheid(channel channel) {
  return channel.attr(channel_attr_key).get();
 }
 
 public void add(string cacheid, channel channel) {
  channel.attr(channel_attr_key).set(cacheid);
  hashmap<string, channel> hashmap = cache_map.get(cacheid);
  if (hashmap == null) {
   hashmap = new hashmap<>();
  }
  hashmap.put(channel.id().asshorttext(), channel);
  cache_map.put(cacheid, hashmap);
 }
 
 public hashmap<string, channel> get(string cacheid) {
  if (cacheid == null) {
   return null;
  }
  return cache_map.get(cacheid);
 }
 
 public void remove(channel channel) {
  string cacheid = getcacheid(channel);
  if (cacheid == null) {
   return;
  }
  hashmap<string, channel> hashmap = cache_map.get(cacheid);
  if (hashmap == null) {
   hashmap = new hashmap<>();
  }
  hashmap.remove(channel.id().asshorttext());
  cache_map.put(cacheid, hashmap);
 }
}

5.netty服務初始化器

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package com.test.netty;
 
import io.netty.channel.channelinitializer;
import io.netty.channel.channelpipeline;
import io.netty.channel.socket.socketchannel;
import io.netty.handler.codec.http.httpobjectaggregator;
import io.netty.handler.codec.http.httpservercodec;
import io.netty.handler.codec.http.websocketx.websocketserverprotocolhandler;
import io.netty.handler.stream.chunkedwritehandler;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.stereotype.component;
 
/**
 * @author test
 * <p>
 * netty服務初始化器
 **/
@component
public class serverchannelinitializer extends channelinitializer<socketchannel> {
 
 @autowired
 private nettyserverhandler nettyserverhandler;
 
 @override
 protected void initchannel(socketchannel socketchannel) throws exception {
  channelpipeline pipeline = socketchannel.pipeline();
  pipeline.addlast(new httpservercodec());
  pipeline.addlast(new chunkedwritehandler());
  pipeline.addlast(new httpobjectaggregator(8192));
  pipeline.addlast(new websocketserverprotocolhandler("/test.io", true, 5000));
  pipeline.addlast(nettyserverhandler);
 }
}

6.html測試

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
<!doctype html>
<html>
 <head>
 <meta charset="utf-8">
 <title>test</title>
 
  <script type="text/javascript">
   function websockettest()
   {
   if ("websocket" in window)
   {
    alert("您的瀏覽器支持 websocket!");
    
    // 打開一個 web socket
    var ws = new websocket("ws://localhost:port/test.io");
    
    ws.onopen = function()
    {
     // web socket 已連接上,使用 send() 方法發送數據
     ws.send("發送數據");
     alert("數據發送中...");
    };
    
    ws.onmessage = function (evt)
    {
     var received_msg = evt.data;
     alert("數據已接收...");
    };
    
    ws.onclose = function()
    {
     // 關閉 websocket
     alert("連接已關閉...");
    };
   }
   
   else
   {
    // 瀏覽器不支持 websocket
    alert("您的瀏覽器不支持 websocket!");
   }
   }
  </script>
  
 </head>
 <body>
 
  <div id="sse">
   <a href="javascript:websockettest()" rel="external nofollow" >運行 websocket</a>
  </div>
  
 </body>
</html>

7.vue測試

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
mounted() {
   this.initwebsocket();
  },
  methods: {
   initwebsocket() {
    let websocket = new websocket('ws://localhost:port/test.io?test=123456');
    websocket.onmessage = (event) => {
     let msg = json.parse(event.data);
     switch (msg.cmd) {
      case "000":
       this.$message({
        type: 'success',
        message: "建立實時連接成功!",
        duration: 1000
       })
       setinterval(()=>{websocket.send("heartbeat")},60*1000);
       break;
      case "001":
       this.$message.warning("收到一條新的信息,請及時查看!")
       break;
     }
    }
    websocket.onclose = () => {
     settimeout(()=>{
      this.initwebsocket();
     },30*1000);
    }
    websocket.onerror = () => {
     settimeout(()=>{
      this.initwebsocket();
     },30*1000);
    }
   },
  },
![在這里插入圖片描述](https://img-blog.csdnimg.cn/20210107160420568.jpg?x-oss-process=image/watermark,type_zmfuz3pozw5nagvpdgk,shadow_10,text_ahr0chm6ly9ibg9nlmnzzg4ubmv0l3d1x3fpbmdfc29uzw==,size_16,color_ffffff,t_70#pic_center)

8.服務器下發消息

?
1
2
3
@autowired
    private nettyserverhandler nettyserverhandler;
nettyserverhandler.send(cmdweb.wmessage, id, message);

到此這篇關于springboot+netty+websocket實現消息推送實例的文章就介紹到這了,更多相關springboot websocket消息推送內容請搜索服務器之家以前的文章或繼續瀏覽下面的相關文章希望大家以后多多支持服務器之家!

原文鏈接:https://blog.csdn.net/wu_qing_song/article/details/112311860

延伸 · 閱讀

精彩推薦
主站蜘蛛池模板: 日本免费三片在线观看 | 射西西| 亚洲国产午夜看片 | 91在线播 | 国产视频久久久 | 国产一区二区三区久久小说 | 四虎成人永久地址 | 亚洲日本久久一区二区va | 免费xxxxx大片在线观看影视 | 九色PORNY真实丨国产免费 | 国产精品午夜国产小视频 | 亚洲好视频 | www.伊人| 小兰被扒开内裤露出p | 506rr亚洲欧美 | 韩国日本香港毛片免费 | 2022最新a精品视频在线观看 | 日韩一区视频在线 | 99人中文字幕亚洲区 | 4hc44四虎www在线影院男同 | 日本在线一区 | 亚洲欧美日韩中文高清一 | 波多野结衣 在线 | 日本aa大片在线播放免费看 | 肥胖女人一级毛片 | 天天做日日做天天添天天欢公交车 | 女人又色又爽又黄 | 性做久久久久久 | 99精品国产高清一区二区三区香蕉 | 电车痴汉(han) | 日本视频一区在线观看免费 | www日本高清视频 | 免费国产之a视频 | 香蕉久久夜色精品国产小优 | 国产在线视频欧美亚综合 | 91桃花| 亚洲 综合 欧美在线视频 | 极品蜜桃臀美女啪啪 | 国产乱叫456在线 | 嫩草成人影院 | 国产麻豆视频 |