分布式技术入坑指南(四)

分布式技术入坑指南(四)

12.Java消息队列学习与使用

ActiveMQ

官网:http://activemq.apache.org/

安装

  • 解压
  • cd到bin目录
  • ./activemq start
  • 访问,默认端口:8161
  • 默认用户名与密码:admin

使用

两种模式

  • 点对点
    Alt text

  • 发布/订阅
    Alt text

需要包:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
<!-- 消息队列整合包 -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context-support</artifactId>
<version>${spring.version}</version>
</dependency>
<!-- activemq -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>${activemq.version}</version>
</dependency>

点对点模式

目的地实现类 —— Queue
发送消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package xyz.yuzh.activemq.queue;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
* Created with IntelliJ IDEA.
* Author: yu_zh
* DateTime: 2018/07/24 12:31
* 消息发送端
* 点对点模式
*/
public class QueueProducer {
public static void main(String[] args) throws Exception{
// 第一步:创建ConnectionFactory对象,需要指定服务端ip及端口号。
//brokerURL服务器的ip及端口号
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.184.130:61616");
// 第二步:使用ConnectionFactory对象创建一个Connection对象。
Connection connection = connectionFactory.createConnection();
// 第三步:开启连接,调用Connection对象的start方法。
connection.start();
// 第四步:使用Connection对象创建一个Session对象。
//第一个参数:是否开启事务。true:开启事务,第二个参数忽略。
//第二个参数:当第一个参数为false时,才有意义。消息的应答模式。1、自动应答2、手动应答。一般是自动应答。
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 第五步:使用Session对象创建一个Destination对象(topic、queue),此处创建一个Queue对象。
//参数:队列的名称。
Queue queue = session.createQueue("queue-test");
// 第六步:使用Session对象创建一个Producer对象。
MessageProducer producer = session.createProducer(queue);
// 第七步:创建一个Message对象,创建一个TextMessage对象。
/*TextMessage message = new ActiveMQTextMessage();
message.setText("hello activeMq,this is my first test.");*/
TextMessage textMessage = session.createTextMessage("hello activeMq,this is my first test.");
// 第八步:使用Producer对象发送消息。
producer.send(textMessage);
// 第九步:关闭资源。
producer.close();
session.close();
connection.close();
}
}

接收消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
package xyz.yuzh.activemq.queue;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
* Created with IntelliJ IDEA.
* Author: yu_zh
* DateTime: 2018/07/24 12:31
* 消息接收端
*/
public class QueueConsumer {
public static void main(String[] args) throws Exception {
// 1.创建一个连接工厂 (Activemq的连接工厂)参数:指定连接的activemq的服务
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.184.130:61616");
// 2.获取连接
Connection connection = connectionFactory.createConnection();
// 3.开启连接
connection.start();
// 4.根据连接对象创建session
// 第一个参数:表示是否使用分布式事务(JTA)
// 第二个参数:如果第一个参数为false,第二个参数才有意义;表示使用的应答模式 :自动应答,手动应答.这里选择自动应答。
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5.根据session创建Destination(目的地,queue topic,这里使用的是queue)
Queue queue = session.createQueue("queue-test");
// 6.创建消费者
MessageConsumer consumer = session.createConsumer(queue);
//7.接收消息
//第一种
/* while(true){
//接收消息 (参数的值表示的是超过一定时间 以毫秒为单位就断开连接)
Message message = consumer.receive(10000);
//如果message为空,没有接收到消息了就跳出
if(message==null){
break;
}

if(message instanceof TextMessage){
TextMessage messaget = (TextMessage)message;
System.out.println(">>>获取的消息内容:"+messaget.getText());//获取消息内容
}
}*/

//第二种:
//设置监听器,其实开启了一个新的线程。
consumer.setMessageListener((m) -> {
//接收消息,如果有消息才进入,如果没有消息就不会进入此方法
if (m instanceof TextMessage) {
TextMessage message = (TextMessage) m;
try {
//获取消息内容
System.out.println(">>>获取的消息内容:" + message.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
Thread.sleep(10000);//睡眠10秒钟。

// 9.关闭资源
consumer.close();
session.close();
connection.close();
}
}

发布/订阅模式

目的地实现类 —— Topic

发送消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package xyz.yuzh.activemq.topic;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQTextMessage;

import javax.jms.*;

/**
* Created with IntelliJ IDEA.
* Author: yu_zh
* DateTime: 2018/07/24 12:50
* 发布订阅模式
*/
public class TopicProducer {
public static void main(String[] args) throws Exception {
// 1.创建一个连接工厂 (Activemq的连接工厂)参数:指定连接的activemq的服务
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.184.130:61616");
// 2.获取连接
Connection connection = connectionFactory.createConnection();
// 3.开启连接
connection.start();
// 4.根据连接对象创建session
// 第一个参数:表示是否使用分布式事务(JTA)
// 第二个参数:如果第一个参数为false,第二个参数才有意义;表示使用的应答模式 :自动应答,手动应答.这里选择自动应答。
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5.根据session创建Destination(目的地,queue topic,这里使用的是topic)
Topic topic = session.createTopic("topic-test");
// 6.创建生产者
MessageProducer producer = session.createProducer(topic);
// 7.构建消息对象,(构建发送消息的内容) 字符串类型的消息格式(TEXTMessage)
TextMessage textMessage = new ActiveMQTextMessage();
textMessage.setText("发送消息123");// 消息的内容
// 8.发送消息
producer.send(textMessage);
// 9.关闭资源
producer.close();
session.close();
connection.close();
}
}

接收消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
package xyz.yuzh.activemq.topic;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
* Created with IntelliJ IDEA.
* Author: yu_zh
* DateTime: 2018/07/24 12:50
*/
public class TopicConsumer {
public static void main(String[] args) throws Exception {
// 1.创建一个连接工厂 (Activemq的连接工厂)参数:指定连接的activemq的服务
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.184.130:61616");
// 2.获取连接
Connection connection = connectionFactory.createConnection();
// 3.开启连接
connection.start();
// 4.根据连接对象创建session
// 第一个参数:表示是否使用分布式事务(JTA)
// 第二个参数:如果第一个参数为false,第二个参数才有意义;表示使用的应答模式 :自动应答,手动应答.这里选择自动应答。
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5.根据session创建Destination(目的地,queue topic,这里使用的是queue)
Topic topic = session.createTopic("topic-test");
// 6.创建消费者
MessageConsumer consumer = session.createConsumer(topic);
// 7.接收消息
while (true) {
//接收消息 (参数的值表示的是超过一定时间 以毫秒为单位就断开连接)
Message message = consumer.receive(100000);
//如果message为空,没有接收到消息了就跳出
if (message == null) {
break;
}
if (message instanceof TextMessage) {
TextMessage messaget = (TextMessage) message;
System.out.println(">>>获取的消息内容:" + messaget.getText());//获取消息内容
}
}
// 第二种:
// 设置监听器,其实开启了一个新的线程。
/* consumer.setMessageListener((m) -> {
// 接收消息,如果有消息才进入,如果没有消息就不会进入此方法
if (m instanceof TextMessage) {
TextMessage messaget = (TextMessage) m;
try {
// 获取消息内容
System.out.println(">>>获取的消息内容:" + messaget.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
Thread.sleep(10000);// 睡眠10秒钟。*/

// 9.关闭资源
consumer.close();
session.close();
connection.close();
}
}

与Spring整合

这里只做测试,发送与接收均在同一系统并在同一配置文件

mvc配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p"
xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.2.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.2.xsd">

<!-- ActiveMQ的连接工厂 -->
<bean id="targetConnection" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://192.168.184.130:61616"></property>
</bean>
<!-- Spring通用的connectionfacotry 指定真正使用的连接工厂 -->
<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
<property name="targetConnectionFactory" ref="targetConnection"></property>
</bean>
<!-- 接收和发送消息时使用的模板类 -->
<bean class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="connectionFactory"></property>
</bean>
<!-- 目的地实现类:Queue、Topic -->
<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg name="name" value="item-change-queue"></constructor-arg>
</bean>
<!-- <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg name="name" value="item-change-topic"></constructor-arg>
</bean> -->

<!-- 配置消息接收监听器 -->
<bean id="myMessageListener" class="xyz.yuzh.activemq.spring.SpringActiveMqReciveListener"></bean>
<bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"></property>
<property name="destination" ref="queueDestination"></property>
<property name="messageListener" ref="myMessageListener"></property>
</bean>
</beans>

发送消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package xyz.yuzh.activemq.spring;

import org.apache.xbean.spring.context.ClassPathXmlApplicationContext;
import org.junit.Test;
import org.springframework.context.ApplicationContext;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;

import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;


/**
* Created with IntelliJ IDEA.
* Author: yu_zh
* DateTime: 2018/07/24 17:21
* 消息发送端
*/
public class SpringActiveMqSend {
public static void main(String[] args) throws Exception{
//初始化容器
ApplicationContext context = new ClassPathXmlApplicationContext("classpath:spring-activemq.xml");
//获得JMS模板对象
JmsTemplate jmsTemplate = context.getBean(JmsTemplate.class);
//获得消息队列目的地(注意是jms接口的包,不是activemq的包。)
Destination queueDestination = context.getBean(Destination.class);
//发送消息
jmsTemplate.send(queueDestination, (s) -> {
StringBuilder message = new StringBuilder();
for (int i = 1 ; i <= 100;i++){
message.append("测试消息"+i+"\n");
}
return s.createTextMessage(message.toString());
});
Thread.sleep(100000);
}
}

接收消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package xyz.yuzh.activemq.spring;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

/**
* Created with IntelliJ IDEA.
* Author: yu_zh
* DateTime: 2018/07/24 18:06
* 消息接收监听器
*/
public class SpringActiveMqReciveListener implements MessageListener {

@Override
public void onMessage(Message message) {
if(message instanceof TextMessage){
TextMessage textMessage = (TextMessage)message;
String text;
try {
text = textMessage.getText();
System.out.println("SpringActiveMqReciveListener-> "+text);
} catch (JMSException e) {
e.printStackTrace();
}
}
}

}

具体业务场景下使用

manager-service工程更新一个商品(增删改查),search-service工程要在索引库中更新该商品。

manager-service系统(发送消息端)

spring-activemq.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p"
xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:dubbo="http://code.alibabatech.com/schema/dubbo" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.2.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd
http://code.alibabatech.com/schema/dubbo http://code.alibabatech.com/schema/dubbo/dubbo.xsd
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.2.xsd">

<!-- 系统间通信 —— 消息发送方 -->

<bean id="targetConnection" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://192.168.184.130:61616"></property>
</bean>
<!-- 通用的connectionfacotry 指定真正使用的连接工厂 -->
<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
<property name="targetConnectionFactory" ref="targetConnection"></property>
</bean>
<!-- 接收和发送消息时使用的类 -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="connectionFactory"></property>
</bean>
<!-- 订阅通知模式 -->
<bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg name="name" value="item-change-topic"></constructor-arg>
</bean>
</beans>

Service

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Service
public class ItemServiceImpl implements ItemService {
//......
@Resource
private JmsTemplate jmsTemplate;
@Resource
private Destination topicDestination;

//......

@Override
public Object addItem() {
//执行商品添加逻辑
//TODO...
//发送一个消息通知搜索系统更新该商品到索引库
jmsTemplate.send(topicDestination, (s) ->{
System.out.println("发送方-> 添加操作,商品编号是123");
return s.createTextMessage("addItem:123");
});
return null;
}

}

search-service系统(接收消息端)

spring-activemq.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p"
xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:dubbo="http://code.alibabatech.com/schema/dubbo" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.2.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd
http://code.alibabatech.com/schema/dubbo http://code.alibabatech.com/schema/dubbo/dubbo.xsd
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.2.xsd">

<!-- 系统间通信 —— 消息接收方 -->

<bean id="targetConnection" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://192.168.184.130:61616"></property>
</bean>
<!-- 通用的connectionfacotry 指定真正使用的连接工厂 -->
<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
<property name="targetConnectionFactory" ref="targetConnection"></property>
</bean>
<!-- 接收和发送消息时使用的类 -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="connectionFactory"></property>
</bean>
<!-- 订阅通知模式 -->
<bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
<!-- name不和发送端保持一致接收不到消息! -->
<constructor-arg name="name" value="item-change-topic"></constructor-arg>
</bean>

<!-- 接收消息的监听器 -->
<bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"></property>
<property name="destination" ref="topicDestination"></property>
<property name="messageListener" ref="itemChangeListener"></property>
</bean>
</beans>

Service

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package xyz.taotao.search.activemq.recive;

import org.apache.solr.client.solrj.SolrServer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;


/**
* Created with IntelliJ IDEA.
* Author: yu_zh
* DateTime: 2018/07/24 23:22
* 商品改变后通知该方法更新索引库
*/
@Component
public class ItemChangeListener implements MessageListener {
@Autowired
private SolrServer solrserver;

@Override
public void onMessage(Message message) {
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
String text = "";
try {
text = textMessage.getText();
if (text.contains("addItem")) {
String itemId = text.substring(text.lastIndexOf(":")+1);
//通知索引库服务提供者更新商品
//TODO...
System.out.println("接收方-> 索引库更新产品:"+itemId);
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}

}

推荐博客:http://elim.iteye.com/blog/1893038

Comments

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×