一区二区三区在线-一区二区三区亚洲视频-一区二区三区亚洲-一区二区三区午夜-一区二区三区四区在线视频-一区二区三区四区在线免费观看

服務器之家:專注于服務器技術及軟件下載分享
分類導航

PHP教程|ASP.NET教程|JAVA教程|ASP教程|編程技術|正則表達式|C/C++|IOS|C#|Swift|Android|JavaScript|易語言|

服務器之家 - 編程語言 - JAVA教程 - JMS簡介與ActiveMQ實戰代碼分享

JMS簡介與ActiveMQ實戰代碼分享

2021-03-13 11:27nomico271 JAVA教程

這篇文章主要介紹了JMS簡介與ActiveMQ實戰代碼分享,具有一定借鑒價值,需要的朋友可以參考下

一、異步通信

之前接觸到的rmi,hessian等技術都是同步通信機制。當客戶端調用遠程方法時,客戶端必須等到遠程方法完成后,才能繼續執行。這段時間客戶端一直會被阻塞(這樣造成的用戶體驗很不好)。

JMS簡介與ActiveMQ實戰代碼分享

(同步通信)

同步通信有并不是程序之間交互的唯一方式,異步通信機制中,客戶端不需要等待服務處理消息,可以繼續執行,并且最終能夠收到并處理消息。

JMS簡介與ActiveMQ實戰代碼分享

(異步通信)

異步通信的優勢

無需等待。客戶端只需要將消息發送給消息代理,不需要等待就可以繼續執行別的任務,且確信消息會被投遞給相應的目的地。
面向消息和解耦。 客戶端不需要擔心遠程服務的接口規范,只需要把消息放入消息隊列然后獲取結果即可。

二、jms

1. 簡介

在jms出現之前,每個消息代理都是有不同的實現,這就使得不同代理之間的消息代碼很難通用。jms(java message service,java消息服務)是一個標準,定義了使用消息代理的通用api。即所有遵從規范的實現都使用通用的接口,類似于jdbc為數據庫操作提供通用接口。

jms幾個重要的要素:

destination:消息從發送端發出后要走的通道。
connectionfactory:連接工廠,用于創建連接的對象。
connection:連接接口,用于創建session。
session:會話接口,用于創建消息的發送者,接受者以及消息對象本身。
messageconsumer:消息的消費者。
messageproducer:消息的生產者。
xxxmessage:各種類型的消息對象,包括bytemessage、mapmessage、objectmessage、streammessage和textmessage 5種。

2. jms消息模型

不同的消息系統有不同的消息模型。jms提供了兩種模型:queue(點對點)和topic(發布/訂閱)。

jms queue(點對點)模型

在點對點模型中,消息生產者生產消息發送到queue中,然后消息消費者從queue中取出并且消費消息,但不可重復消費。

如圖:

JMS簡介與ActiveMQ實戰代碼分享

發送者1,發送者2,發送者3各發送一條消息到服務器;

消息1,2,3就會按照順序形成一個隊列,隊列中的消息不知道自己會被哪個接收者消費;

接收者1,2,3分別從隊列中取出一條消息進行消費,每取出一條消息,隊列就會將該消息刪除,這樣即保證了消息不會被重復消費。

jms queue模型也成為p2p(point to point)模型。

jms topic(發布/訂閱)模型

jms topic模型與jms queue模型的最大差別在于消息接收的部分。topic模型類似于微信公眾號,訂閱了該公眾號的接收者都可以接收到公眾號推送的消息。

如圖:

JMS簡介與ActiveMQ實戰代碼分享

發布者1,2,3分別發布3個主題1,2,3;
這樣訂閱了主題1的用戶群:訂閱者1,2,3即能接收到主題1消息;同理訂閱者4,5,6即能接收到主題2消息,訂閱者7,8,9即能接收到主題3消息。

jms topic模型也成為pus/sub模型。

兩種模式下各要素的對比:

JMS簡介與ActiveMQ實戰代碼分享

3. 傳統jms編程模型

producer:

(1)創建連接工廠connectionfactory;
(2) 使用連接工廠創建連接;
(3)啟動連接;
(4)創建會話;
(5) 創建消息發送的目的地;
(6)創建生產者;
(7)創建消息類型和消息內容;
(8)發送消息;

consumer:

(1)創建連接工廠connectionfactory;
(2) 使用連接工廠創建連接;
(3)啟動連接;
(4)創建會話;
(5) 創建消息發送的目的地;
(6)創建消費者
(7)創建消息類型;
(8)接收消息;

三、 activemq簡介

activemq 是apache出品,最流行的,能力強勁的開源消息總線。activemq 是一個完全支持jms1.1和j2ee 1.4規范的 jms provider實現,盡管jms規范出臺已經是很久的事情了,但是jms在當今的j2ee應用中間仍然扮演著特殊的地位。

activemq 主要特性:

多種語言和協議編寫客戶端。語言: java,c,c++,c#,ruby,perl,python,php。應用協議:
openwire,stomp rest,ws notification,xmpp,amqp
完全支持jms1.1和j2ee 1.4規范 (持久化,xa消息,事務)
對spring的支持,activemq可以很容易內嵌到使用spring的系統里面去,而且也支持spring2.0的特性
通過了常見j2ee服務器(如 geronimo,jboss 4,glassfish,weblogic)的測試,其中通過jca 1.5 resource adaptors的配置,可以讓activemq可以自動的部署到任何兼容j2ee 1.4 商業服務器上
支持多種傳送協議:in-vm,tcp,ssl,nio,udp,jgroups,jxta
支持通過jdbc和journal提供高速的消息持久化
從設計上保證了高性能的集群,客戶端-服務器,點對點
支持ajax
支持與axis的整合
可以很容易得調用內嵌jms provider,進行測試

四、 activemq實戰

下面看看如何activemq實現一個簡單的消息隊列。

傳統的jms編程模型

1. jms queue模型代碼實現:

producer:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package com.wgs.mq.queue;
import org.apache.activemq.activemqconnectionfactory;
import javax.jms.*;
/**
 * created by genshenwang.nomico on 2017/10/19.
 */
public class activemqproducer {
    private static final string url = "tcp://localhost:61616";
    private static final string queue_name = "queue-name";
    public static void main(string[] args) throws jmsexception {
        //1 創建連接工廠connectionfactory
        connectionfactory connectionfactory = new activemqconnectionfactory(url);
        //2 使用連接工廠創建連接
        connection connection = connectionfactory.createconnection();
        //3 啟動連接
        connection.start();
        //4 創建會話
        session session = connection.createsession(false, session.auto_acknowledge);
        //5 創建消息發送的目的地
        destination destination = session.createqueue(queue_name);
        //6 創建生產者
        messageproducer messageproducer = session.createproducer(destination);
        //7 創建消息
        textmessage textmessage = session.createtextmessage();
        for (int i = 1; i <= 100; i++) {
            //8 創建消息內容
            textmessage.settext("發送者- 1 -發送消息:" + i);
            //9 發送消息
            messageproducer.send(textmessage);
        }
        system.out.println("消息發送成功");
        session.close();
        connection.close();
    }
}

conusmer:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package com.wgs.mq.queue;
import org.apache.activemq.activemqconnectionfactory;
import javax.jms.*;
/**
 * created by genshenwang.nomico on 2017/10/19.
 */
public class activemqconsumer {
    private static final string url = "tcp://localhost:61616";
    private static final string queue_name = "queue-name";
    public static void main(string[] args) throws jmsexception {
        //1 創建連接工廠connectionfactory
        connectionfactory connectionfactory = new activemqconnectionfactory(url);
        //2 使用連接工廠創建連接
        connection connection = connectionfactory.createconnection();
        //3 啟動連接
        connection.start();
        //4 創建會話
        session session = connection.createsession(false, session.auto_acknowledge);
        //5 創建消息發送的目的地
        destination destination = session.createqueue(queue_name);
        //6 創建消費者
        messageconsumer messageconsumer = session.createconsumer(destination);
        messageconsumer.setmessagelistener(new messagelistener() {
            public void onmessage(message message) {
                //7 創建消息
                textmessage textmessage = (textmessage)message;
                try {
                    //7 接收消息
                    system.out.println("消費者- 1 -接收消息:【" + textmessage.gettext() + "】");
                }
                catch (jmsexception e) {
                    e.printstacktrace();
                }
            }
        }
        );
    }
}

2. jms topic模型代碼實現:

producer:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package com.wgs.mq.topic;
import org.apache.activemq.activemqconnectionfactory;
import javax.jms.*;
/**
 * 發布訂閱模式
 * created by genshenwang.nomico on 2017/10/19.
 */
public class activemqproducer {
    private static final string url = "tcp://localhost:61616";
    private static final string topic_name = "topic-name";
    public static void main(string[] args) throws jmsexception {
        //1 創建連接工廠connectionfactory
        connectionfactory connectionfactory = new activemqconnectionfactory(url);
        //2 使用連接工廠創建連接
        connection connection = connectionfactory.createconnection();
        //3 啟動連接
        connection.start();
        //4 創建會話
        session session = connection.createsession(false, session.auto_acknowledge);
        //5 創建帶有主題的消息發送的目的地
        destination destination = session.createtopic(topic_name);
        //6 創建生產者
        messageproducer messageproducer = session.createproducer(destination);
        //7 創建消息
        textmessage textmessage = session.createtextmessage();
        for (int i = 1; i <= 100; i++) {
            //8 創建消息內容
            textmessage.settext("發送者- 1 -發送消息:" + i);
            //9 發送消息
            messageproducer.send(textmessage);
        }
        system.out.println("消息發送成功");
        session.close();
        connection.close();
    }
}

consumer:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package com.wgs.mq.topic;
import org.apache.activemq.activemqconnectionfactory;
import javax.jms.*;
/**
 * 發布訂閱模式
 * created by genshenwang.nomico on 2017/10/19.
 */
public class activemqconsumer {
    private static final string url = "tcp://localhost:61616";
    private static final string topic_name = "topic-name";
    public static void main(string[] args) throws jmsexception {
        //1 創建連接工廠connectionfactory
        connectionfactory connectionfactory = new activemqconnectionfactory(url);
        //2 使用連接工廠創建連接
        connection connection = connectionfactory.createconnection();
        //3 啟動連接
        connection.start();
        //4 創建會話
        session session = connection.createsession(false, session.auto_acknowledge);
        //5 創建消息發送的目的地
        destination destination = session.createtopic(topic_name);
        //6 創建消費者
        messageconsumer messageconsumer = session.createconsumer(destination);
        messageconsumer.setmessagelistener(new messagelistener() {
            public void onmessage(message message) {
                //7 創建消息
                textmessage textmessage = (textmessage)message;
                try {
                    //7 接收消息
                    system.out.println("消費者- 1 -接收消息:【" + textmessage.gettext() + "】");
                }
                catch (jmsexception e) {
                    e.printstacktrace();
                }
            }
        }
        );
    }
}

使用spring的jms模板

雖然jms為所有的消息代理提供了統一的接口,但如同jdbc一樣,在處理連接,語句,結果集和異常時會顯得很繁雜。不過,spring為我們提供了jmstemplate來消除冗余和重復的jms代碼。
下面看看如何使用jmstemplate來實現消息隊列。

1. jms queue模型代碼實現:

配置文件:
producer.xml:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
<?xml version="1.0" encoding="utf-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/xmlschema-instance"
    xmlns:context="http://www.springframework.org/schema/context"
    xsi:schemalocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
 
  <context:annotation-config/>
 
  <!-- activemq提供的connectionfactory-->
  <bean id="targetconnectionfactory" class="org.apache.activemq.activemqconnectionfactory">
    <property name="brokerurl" value="tcp://localhost:61616"/>
  </bean>
 
  <!-- 在spring 中配置jms連接工廠,連接到activemq提供的connectionfactory-->
  <bean id="connectionfactory" class="org.springframework.jms.connection.singleconnectionfactory">
    <property name="targetconnectionfactory" ref = "targetconnectionfactory"/>
  </bean>
 
  <!-- 配置jmstemplate,用于發送消息 -->
  <bean id="jmstemplate" class="org.springframework.jms.core.jmstemplate">
    <property name="connectionfactory" ref="connectionfactory"/>
  </bean>
 
  <!-- 配置隊列目的地的名稱-->
  <bean id="queuedestination" class="org.apache.activemq.command.activemqqueue">
    <constructor-arg value="queue-spring-name"/>
  </bean>
  <!-- 配置隊列目的地的名稱-->
  <bean id="topicdestination" class="org.apache.activemq.command.activemqtopic">
    <constructor-arg value="topic-spring-name"/>
  </bean>
 
  <bean id="producerserviceimpl" class="com.wgs.jms.producer.activemqproducerserviceimpl"/>
 
 
</beans>

consumer.xml:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
<?xml version="1.0" encoding="utf-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/xmlschema-instance"
    xmlns:context="http://www.springframework.org/schema/context"
    xsi:schemalocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
 
  <context:annotation-config/>
 
  <!-- activemq提供的connectionfactory-->
  <bean id="targetconnectionfactory" class="org.apache.activemq.activemqconnectionfactory">
    <property name="brokerurl" value="tcp://localhost:61616"/>
  </bean>
 
  <!-- 在spring 中配置jms連接工廠,連接到activemq提供的connectionfactory-->
  <bean id="connectionfactory" class="org.springframework.jms.connection.singleconnectionfactory">
    <property name="targetconnectionfactory" ref = "targetconnectionfactory"/>
  </bean>
 
  <!-- 配置隊列目的地的名稱-->
  <bean id="queuedestination" class="org.apache.activemq.command.activemqqueue">
    <constructor-arg value="queue-spring-name"/>
  </bean>
  <!-- 配置消息監聽器-->
  <bean id="consumermessagelistener" class="com.wgs.jms.consumer.consumermessagelistener"/>
  <!-- 配置隊列目的地的名稱-->
  <bean id="jmscontainer" class="org.springframework.jms.listener.defaultmessagelistenercontainer">
    <property name="destination" ref="queuedestination"/>
    <property name="connectionfactory" ref="connectionfactory"/>
    <property name="messagelistener" ref="consumermessagelistener"/>
  </bean>
  <!-- 配置隊列目的地的名稱-->
  <bean id="topicdestination" class="org.apache.activemq.command.activemqtopic">
    <constructor-arg value="topic-spring-name"/>
  </bean>
</beans>

生產者producer:

(1)先寫一個接口:

?
1
2
3
4
5
6
7
package com.wgs.jms.producer;
/**
 * created by genshenwang.nomico on 2017/10/20.
 */
public interface activemqproducerservice {
  void sendmessage(final string message);
}

(2)接口的實現:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package com.wgs.jms.producer;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.jms.core.jmstemplate;
import org.springframework.jms.core.messagecreator;
import javax.annotation.resource;
import javax.jms.*;
/**
 * created by genshenwang.nomico on 2017/10/20.
 */
public class activemqproducerserviceimpl implements activemqproducerservice {
    @autowired
      jmstemplate jmstemplate;
    @resource(name = "queuedestination")
      destination destination;
    public void sendmessage(final string message) {
        jmstemplate.send(destination, new messagecreator() {
            public message createmessage(session session) throws jmsexception {
                textmessage textmessage = session.createtextmessage(message);
                return textmessage;
            }
        }
        );
        system.out.println("生產者- 1 -發送消息成功:" + message);
    }
}

(3)測試:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
package com.wgs.jms.producer;
import org.springframework.context.support.classpathxmlapplicationcontext;
/**
 * created by genshenwang.nomico on 2017/10/20.
 */
public class activemqproducermain {
    public static void main(string[] args) {
        classpathxmlapplicationcontext context = new classpathxmlapplicationcontext("producer.xml");
        activemqproducerservice service = context.getbean(activemqproducerservice.class);
        for (int i = 0; i < 100; i++) {
            service.sendmessage("test" + i);
        }
        context.close();
    }
}

消費者:

(1)創建消息監聽器:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
package com.wgs.jms.consumer;
import javax.jms.jmsexception;
import javax.jms.message;
import javax.jms.messagelistener;
import javax.jms.textmessage;
/**
 * created by genshenwang.nomico on 2017/10/20.
 */
public class consumermessagelistener implements messagelistener {
    public void onmessage(message message) {
        try {
            textmessage textmessage = (textmessage) message;
            system.out.println("消費者- 1 -接收消息:" + textmessage.gettext());
        }
        catch (jmsexception e) {
            e.printstacktrace();
        }
    }
}

(2)測試:

?
1
2
3
4
5
6
7
8
9
10
package com.wgs.jms.consumer;
import org.springframework.context.support.classpathxmlapplicationcontext;
/**
 * created by genshenwang.nomico on 2017/10/20.
 */
public class activemqconsumermain {
    public static void main(string[] args) {
        classpathxmlapplicationcontext context = new classpathxmlapplicationcontext("consumer.xml");
    }
}

2. jms topic模型代碼實現:

將上述代碼中出現的queuedestination改為topicdestination即可。

總結

以上就是本文關于jms簡介與activemq實戰代碼分享的全部內容,希望對大家有所幫助。感興趣的朋友可以繼續參閱本站其他相關專題,如有不足之處,歡迎留言指出。感謝朋友們對本站的支持!

原文鏈接:http://blog.csdn.net/noaman_wgs/article/details/78392718

延伸 · 閱讀

精彩推薦
主站蜘蛛池模板: 5g影院天天5g爽天天看 | 日韩精品在线一区二区 | 69日本xxxx| 精品国产原创在线观看视频 | 草莓香蕉绿巨人丝瓜榴莲污在线观看 | 四虎永久网址在线观看 | 99久久er这里只有精品17 | 亚洲 日韩经典 中文字幕 | 扒开腿开嫩苞 | 久久学生精品国产自在拍 | 波多野结衣作品在线观看 | 国产91无毒不卡在线观看 | 成人欧美一区二区三区黑人 | 饭冈加奈子乳喷cead144 | 国产成人福利美女观看视频 | 色播导航| 青青久久精品国产免费看 | 国语视频高清在线观看 | 麻生希在线 | 午夜久久久久久网站 | 女人叉开腿让男人桶 | 亚洲精品无码久久不卡 | 99久久久无码国产精品 | 荡娃艳妇有声小说 | 国产福利资源网在线观看 | 午夜在线播放免费人成无 | 天天色天天综合网 | 久久视频在线视频观看精品15 | 亚洲系列第一页 | 青春草视频免费观看 | 免费看成人毛片日本久久 | 国产精品第一 | 兽操人 | 国自产拍在线天天更新91 | 欧美精品日韩一区二区三区 | 91免费播放人人爽人人快乐 | 女人爽到喷水的视频免费 | 国产成人福利色视频 | 深夜在线影院 | 草草在线影院 | aaa毛片手机在线现看 |