一、異步通信
之前接觸到的rmi,hessian等技術都是同步通信機制。當客戶端調用遠程方法時,客戶端必須等到遠程方法完成后,才能繼續執行。這段時間客戶端一直會被阻塞(這樣造成的用戶體驗很不好)。
(同步通信)
同步通信有并不是程序之間交互的唯一方式,異步通信機制中,客戶端不需要等待服務處理消息,可以繼續執行,并且最終能夠收到并處理消息。
(異步通信)
異步通信的優勢
無需等待。客戶端只需要將消息發送給消息代理,不需要等待就可以繼續執行別的任務,且確信消息會被投遞給相應的目的地。
面向消息和解耦。 客戶端不需要擔心遠程服務的接口規范,只需要把消息放入消息隊列然后獲取結果即可。
二、jms
1. 簡介
在jms出現之前,每個消息代理都是有不同的實現,這就使得不同代理之間的消息代碼很難通用。jms(java message service,java消息服務)是一個標準,定義了使用消息代理的通用api。即所有遵從規范的實現都使用通用的接口,類似于jdbc為數據庫操作提供通用接口。
jms幾個重要的要素:
destination:消息從發送端發出后要走的通道。
connectionfactory:連接工廠,用于創建連接的對象。
connection:連接接口,用于創建session。
session:會話接口,用于創建消息的發送者,接受者以及消息對象本身。
messageconsumer:消息的消費者。
messageproducer:消息的生產者。
xxxmessage:各種類型的消息對象,包括bytemessage、mapmessage、objectmessage、streammessage和textmessage 5種。
2. jms消息模型
不同的消息系統有不同的消息模型。jms提供了兩種模型:queue(點對點)和topic(發布/訂閱)。
jms queue(點對點)模型
在點對點模型中,消息生產者生產消息發送到queue中,然后消息消費者從queue中取出并且消費消息,但不可重復消費。
如圖:
發送者1,發送者2,發送者3各發送一條消息到服務器;
消息1,2,3就會按照順序形成一個隊列,隊列中的消息不知道自己會被哪個接收者消費;
接收者1,2,3分別從隊列中取出一條消息進行消費,每取出一條消息,隊列就會將該消息刪除,這樣即保證了消息不會被重復消費。
jms queue模型也成為p2p(point to point)模型。
jms topic(發布/訂閱)模型
jms topic模型與jms queue模型的最大差別在于消息接收的部分。topic模型類似于微信公眾號,訂閱了該公眾號的接收者都可以接收到公眾號推送的消息。
如圖:
發布者1,2,3分別發布3個主題1,2,3;
這樣訂閱了主題1的用戶群:訂閱者1,2,3即能接收到主題1消息;同理訂閱者4,5,6即能接收到主題2消息,訂閱者7,8,9即能接收到主題3消息。
jms topic模型也成為pus/sub模型。
兩種模式下各要素的對比:
3. 傳統jms編程模型
producer:
(1)創建連接工廠connectionfactory;
(2) 使用連接工廠創建連接;
(3)啟動連接;
(4)創建會話;
(5) 創建消息發送的目的地;
(6)創建生產者;
(7)創建消息類型和消息內容;
(8)發送消息;
consumer:
(1)創建連接工廠connectionfactory;
(2) 使用連接工廠創建連接;
(3)啟動連接;
(4)創建會話;
(5) 創建消息發送的目的地;
(6)創建消費者
(7)創建消息類型;
(8)接收消息;
三、 activemq簡介
activemq 是apache出品,最流行的,能力強勁的開源消息總線。activemq 是一個完全支持jms1.1和j2ee 1.4規范的 jms provider實現,盡管jms規范出臺已經是很久的事情了,但是jms在當今的j2ee應用中間仍然扮演著特殊的地位。
activemq 主要特性:
多種語言和協議編寫客戶端。語言: java,c,c++,c#,ruby,perl,python,php。應用協議:
openwire,stomp rest,ws notification,xmpp,amqp
完全支持jms1.1和j2ee 1.4規范 (持久化,xa消息,事務)
對spring的支持,activemq可以很容易內嵌到使用spring的系統里面去,而且也支持spring2.0的特性
通過了常見j2ee服務器(如 geronimo,jboss 4,glassfish,weblogic)的測試,其中通過jca 1.5 resource adaptors的配置,可以讓activemq可以自動的部署到任何兼容j2ee 1.4 商業服務器上
支持多種傳送協議:in-vm,tcp,ssl,nio,udp,jgroups,jxta
支持通過jdbc和journal提供高速的消息持久化
從設計上保證了高性能的集群,客戶端-服務器,點對點
支持ajax
支持與axis的整合
可以很容易得調用內嵌jms provider,進行測試
四、 activemq實戰
下面看看如何activemq實現一個簡單的消息隊列。
傳統的jms編程模型
1. jms queue模型代碼實現:
producer:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
|
package com.wgs.mq.queue; import org.apache.activemq.activemqconnectionfactory; import javax.jms.*; /** * created by genshenwang.nomico on 2017/10/19. */ public class activemqproducer { private static final string url = "tcp://localhost:61616" ; private static final string queue_name = "queue-name" ; public static void main(string[] args) throws jmsexception { //1 創建連接工廠connectionfactory connectionfactory connectionfactory = new activemqconnectionfactory(url); //2 使用連接工廠創建連接 connection connection = connectionfactory.createconnection(); //3 啟動連接 connection.start(); //4 創建會話 session session = connection.createsession( false , session.auto_acknowledge); //5 創建消息發送的目的地 destination destination = session.createqueue(queue_name); //6 創建生產者 messageproducer messageproducer = session.createproducer(destination); //7 創建消息 textmessage textmessage = session.createtextmessage(); for ( int i = 1 ; i <= 100 ; i++) { //8 創建消息內容 textmessage.settext( "發送者- 1 -發送消息:" + i); //9 發送消息 messageproducer.send(textmessage); } system.out.println( "消息發送成功" ); session.close(); connection.close(); } } |
conusmer:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
|
package com.wgs.mq.queue; import org.apache.activemq.activemqconnectionfactory; import javax.jms.*; /** * created by genshenwang.nomico on 2017/10/19. */ public class activemqconsumer { private static final string url = "tcp://localhost:61616" ; private static final string queue_name = "queue-name" ; public static void main(string[] args) throws jmsexception { //1 創建連接工廠connectionfactory connectionfactory connectionfactory = new activemqconnectionfactory(url); //2 使用連接工廠創建連接 connection connection = connectionfactory.createconnection(); //3 啟動連接 connection.start(); //4 創建會話 session session = connection.createsession( false , session.auto_acknowledge); //5 創建消息發送的目的地 destination destination = session.createqueue(queue_name); //6 創建消費者 messageconsumer messageconsumer = session.createconsumer(destination); messageconsumer.setmessagelistener( new messagelistener() { public void onmessage(message message) { //7 創建消息 textmessage textmessage = (textmessage)message; try { //7 接收消息 system.out.println( "消費者- 1 -接收消息:【" + textmessage.gettext() + "】" ); } catch (jmsexception e) { e.printstacktrace(); } } } ); } } |
2. jms topic模型代碼實現:
producer:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
package com.wgs.mq.topic; import org.apache.activemq.activemqconnectionfactory; import javax.jms.*; /** * 發布訂閱模式 * created by genshenwang.nomico on 2017/10/19. */ public class activemqproducer { private static final string url = "tcp://localhost:61616" ; private static final string topic_name = "topic-name" ; public static void main(string[] args) throws jmsexception { //1 創建連接工廠connectionfactory connectionfactory connectionfactory = new activemqconnectionfactory(url); //2 使用連接工廠創建連接 connection connection = connectionfactory.createconnection(); //3 啟動連接 connection.start(); //4 創建會話 session session = connection.createsession( false , session.auto_acknowledge); //5 創建帶有主題的消息發送的目的地 destination destination = session.createtopic(topic_name); //6 創建生產者 messageproducer messageproducer = session.createproducer(destination); //7 創建消息 textmessage textmessage = session.createtextmessage(); for ( int i = 1 ; i <= 100 ; i++) { //8 創建消息內容 textmessage.settext( "發送者- 1 -發送消息:" + i); //9 發送消息 messageproducer.send(textmessage); } system.out.println( "消息發送成功" ); session.close(); connection.close(); } } |
consumer:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
|
package com.wgs.mq.topic; import org.apache.activemq.activemqconnectionfactory; import javax.jms.*; /** * 發布訂閱模式 * created by genshenwang.nomico on 2017/10/19. */ public class activemqconsumer { private static final string url = "tcp://localhost:61616" ; private static final string topic_name = "topic-name" ; public static void main(string[] args) throws jmsexception { //1 創建連接工廠connectionfactory connectionfactory connectionfactory = new activemqconnectionfactory(url); //2 使用連接工廠創建連接 connection connection = connectionfactory.createconnection(); //3 啟動連接 connection.start(); //4 創建會話 session session = connection.createsession( false , session.auto_acknowledge); //5 創建消息發送的目的地 destination destination = session.createtopic(topic_name); //6 創建消費者 messageconsumer messageconsumer = session.createconsumer(destination); messageconsumer.setmessagelistener( new messagelistener() { public void onmessage(message message) { //7 創建消息 textmessage textmessage = (textmessage)message; try { //7 接收消息 system.out.println( "消費者- 1 -接收消息:【" + textmessage.gettext() + "】" ); } catch (jmsexception e) { e.printstacktrace(); } } } ); } } |
使用spring的jms模板
雖然jms為所有的消息代理提供了統一的接口,但如同jdbc一樣,在處理連接,語句,結果集和異常時會顯得很繁雜。不過,spring為我們提供了jmstemplate來消除冗余和重復的jms代碼。
下面看看如何使用jmstemplate來實現消息隊列。
1. jms queue模型代碼實現:
配置文件:
producer.xml:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
<?xml version= "1.0" encoding= "utf-8" ?> <beans xmlns= "http://www.springframework.org/schema/beans" xmlns:xsi= "http://www.w3.org/2001/xmlschema-instance" xmlns:context= "http://www.springframework.org/schema/context" xsi:schemalocation= "http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd" > <context:annotation-config/> <!-- activemq提供的connectionfactory--> <bean id= "targetconnectionfactory" class = "org.apache.activemq.activemqconnectionfactory" > <property name= "brokerurl" value= "tcp://localhost:61616" /> </bean> <!-- 在spring 中配置jms連接工廠,連接到activemq提供的connectionfactory--> <bean id= "connectionfactory" class = "org.springframework.jms.connection.singleconnectionfactory" > <property name= "targetconnectionfactory" ref = "targetconnectionfactory" /> </bean> <!-- 配置jmstemplate,用于發送消息 --> <bean id= "jmstemplate" class = "org.springframework.jms.core.jmstemplate" > <property name= "connectionfactory" ref= "connectionfactory" /> </bean> <!-- 配置隊列目的地的名稱--> <bean id= "queuedestination" class = "org.apache.activemq.command.activemqqueue" > <constructor-arg value= "queue-spring-name" /> </bean> <!-- 配置隊列目的地的名稱--> <bean id= "topicdestination" class = "org.apache.activemq.command.activemqtopic" > <constructor-arg value= "topic-spring-name" /> </bean> <bean id= "producerserviceimpl" class = "com.wgs.jms.producer.activemqproducerserviceimpl" /> </beans> |
consumer.xml:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
|
<?xml version= "1.0" encoding= "utf-8" ?> <beans xmlns= "http://www.springframework.org/schema/beans" xmlns:xsi= "http://www.w3.org/2001/xmlschema-instance" xmlns:context= "http://www.springframework.org/schema/context" xsi:schemalocation= "http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd" > <context:annotation-config/> <!-- activemq提供的connectionfactory--> <bean id= "targetconnectionfactory" class = "org.apache.activemq.activemqconnectionfactory" > <property name= "brokerurl" value= "tcp://localhost:61616" /> </bean> <!-- 在spring 中配置jms連接工廠,連接到activemq提供的connectionfactory--> <bean id= "connectionfactory" class = "org.springframework.jms.connection.singleconnectionfactory" > <property name= "targetconnectionfactory" ref = "targetconnectionfactory" /> </bean> <!-- 配置隊列目的地的名稱--> <bean id= "queuedestination" class = "org.apache.activemq.command.activemqqueue" > <constructor-arg value= "queue-spring-name" /> </bean> <!-- 配置消息監聽器--> <bean id= "consumermessagelistener" class = "com.wgs.jms.consumer.consumermessagelistener" /> <!-- 配置隊列目的地的名稱--> <bean id= "jmscontainer" class = "org.springframework.jms.listener.defaultmessagelistenercontainer" > <property name= "destination" ref= "queuedestination" /> <property name= "connectionfactory" ref= "connectionfactory" /> <property name= "messagelistener" ref= "consumermessagelistener" /> </bean> <!-- 配置隊列目的地的名稱--> <bean id= "topicdestination" class = "org.apache.activemq.command.activemqtopic" > <constructor-arg value= "topic-spring-name" /> </bean> </beans> |
生產者producer:
(1)先寫一個接口:
1
2
3
4
5
6
7
|
package com.wgs.jms.producer; /** * created by genshenwang.nomico on 2017/10/20. */ public interface activemqproducerservice { void sendmessage( final string message); } |
(2)接口的實現:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
package com.wgs.jms.producer; import org.springframework.beans.factory.annotation.autowired; import org.springframework.jms.core.jmstemplate; import org.springframework.jms.core.messagecreator; import javax.annotation.resource; import javax.jms.*; /** * created by genshenwang.nomico on 2017/10/20. */ public class activemqproducerserviceimpl implements activemqproducerservice { @autowired jmstemplate jmstemplate; @resource (name = "queuedestination" ) destination destination; public void sendmessage( final string message) { jmstemplate.send(destination, new messagecreator() { public message createmessage(session session) throws jmsexception { textmessage textmessage = session.createtextmessage(message); return textmessage; } } ); system.out.println( "生產者- 1 -發送消息成功:" + message); } } |
(3)測試:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
package com.wgs.jms.producer; import org.springframework.context.support.classpathxmlapplicationcontext; /** * created by genshenwang.nomico on 2017/10/20. */ public class activemqproducermain { public static void main(string[] args) { classpathxmlapplicationcontext context = new classpathxmlapplicationcontext( "producer.xml" ); activemqproducerservice service = context.getbean(activemqproducerservice. class ); for ( int i = 0 ; i < 100 ; i++) { service.sendmessage( "test" + i); } context.close(); } } |
消費者:
(1)創建消息監聽器:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
package com.wgs.jms.consumer; import javax.jms.jmsexception; import javax.jms.message; import javax.jms.messagelistener; import javax.jms.textmessage; /** * created by genshenwang.nomico on 2017/10/20. */ public class consumermessagelistener implements messagelistener { public void onmessage(message message) { try { textmessage textmessage = (textmessage) message; system.out.println( "消費者- 1 -接收消息:" + textmessage.gettext()); } catch (jmsexception e) { e.printstacktrace(); } } } |
(2)測試:
1
2
3
4
5
6
7
8
9
10
|
package com.wgs.jms.consumer; import org.springframework.context.support.classpathxmlapplicationcontext; /** * created by genshenwang.nomico on 2017/10/20. */ public class activemqconsumermain { public static void main(string[] args) { classpathxmlapplicationcontext context = new classpathxmlapplicationcontext( "consumer.xml" ); } } |
2. jms topic模型代碼實現:
將上述代碼中出現的queuedestination改為topicdestination即可。
總結
以上就是本文關于jms簡介與activemq實戰代碼分享的全部內容,希望對大家有所幫助。感興趣的朋友可以繼續參閱本站其他相關專題,如有不足之處,歡迎留言指出。感謝朋友們對本站的支持!
原文鏈接:http://blog.csdn.net/noaman_wgs/article/details/78392718