Loading... # 引言 JMS也就是Java Message Server,java提供了jms接口,能够实现这个接口的有kafka、rabbitMQ、RocketMQ、ActiveMQ等等,这里我们以ActiveMQ为例。 java的消息以及ActiveMQ的背景就不再赘述了。 # 下载与安装 https://activemq.apache.org/components/classic/download/ 怎么下载和安装也不进行详细的介绍了,因为开发和测试均在Windows平台下,所以本篇博文所演示的均在Windows平台运行的。 # 配置 ## 安全性考虑 1. Web页面的安全性 webUI可能大家只是用来看一些ActiveMQ的状态,但是细心的人可能会发现,Send a JMS Message页面,所以可以设置WebUI的登陆密码 找到conf目录下的jetty-realm.properties 下面的就是用户名密码和角色的规则,该就完了。 ``` 用户名: 密码, 角色 这个顺序,别搞错了 admin: admin123, admin user: user123, user ``` 2. 连接的安全性 如果别有用心的人知道了ActiveMQ的服务器地址和端口,发一些恶意的消息,可能会导致系统异常,甚至影响到安全性,所以可以给连接加上密码。 找到conf目录下的activemq.xml,编辑如下内容 ```xml <broker> <其它标签>...</其它标签> <plugins> <simpleAuthenticationPlugin> <users> <authenticationUser username="alc" password="alc" groups="users,admins"/> </users> </simpleAuthenticationPlugin> </plugins> </broker> ``` 3. 我感觉更安全的方式是内网共享,外网隔离,设置IP白名单。以上只是介绍,开发阶段设置这些反而麻烦了。 # 入门 ## 原始方法 ### 导入依赖 ```xml <dependencies> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-client</artifactId> <version>5.15.4</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> <version>5.3.4</version> </dependency> </dependencies> ``` ### 消息队列(queue) 服务端 ```java import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class Server { public static void main(String[] args) throws JMSException { // 服务端部分 // 如果设置了ActiveMQ的账号和密码 String username = "alc"; String password = "alc"; // 创建连接工厂 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616"); // 通过连接工厂创建连接 Connection connection = connectionFactory.createConnection(username,password); // 启动线程连接 connection.start(); // 通过连接,创建会话 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 创建一个点对点的消息队列 Queue queue = session.createQueue("ALCActiveMQQueueTest"); // 创建一个生产者,消息生产者以上面定义的消息队列为名。 MessageProducer producer = session.createProducer(queue); // 定义一个消息,这里使用的是接口的实现类 TextMessage message = session.createTextMessage("Hello ALC"); // 发送消息 producer.send(message); // 倒序关流 producer.close(); session.close(); connection.close(); } } ``` 客户端 ```java import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; import java.io.IOException; public class Client { public static void main(String[] args) throws JMSException, IOException { // 客户端部分 // 如果设置了ActiveMQ的账号和密码 String username = "alc"; String password = "alc"; // 依然要先创建连接工厂 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616"); // 通过连接工厂创建连接对象 Connection connection = connectionFactory.createConnection(username,password); // 启动线程连接 connection.start(); // 通过连接创建会话 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 创建消息的队列,因为我们服务端发送的是点对点式的,所以使用的是Queue Queue queue = session.createQueue("ALCActiveMQQueueTest"); // 通过会话创建消费者对象 MessageConsumer consumer = session.createConsumer(queue); // 创建消费者监听器 consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { // 因为我们的示例发送的是文本,所以我们可以强制转换为字符串消息类型 TextMessage textMessage = (TextMessage) message; try { // 输出一下得到的文本数据。 System.out.println(textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } }); // 持续监听,也就是让主线程阻塞。 System.in.read(); // 倒序关流 consumer.close(); session.close(); connection.close(); } } ``` ### 发布订阅(topic) 服务端 ```java import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class TServer { public static void main(String[] args) throws JMSException { // 服务端部分 // 如果设置了ActiveMQ的账号和密码 String username = "alc"; String password = "alc"; // 创建连接工厂 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616"); // 通过连接工厂创建连接 Connection connection = connectionFactory.createConnection(username, password); // 启动连接线程 connection.start(); // 通过连接对象创建会话 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 通过会话创建发布源 Topic topic = session.createTopic("ALC_Topic_Demo"); // 创建消息 TextMessage textMessage = session.createTextMessage("Hello ALC"); // 创建生产者 MessageProducer producer = session.createProducer(topic); // 生产者发布消息 producer.send(textMessage); // 倒序关流 producer.close(); session.close(); connection.close(); } } ``` 客户端 ```java import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; import java.io.IOException; public class TClient { public static void main(String[] args) throws JMSException, IOException { // 客户端部分 // 如果设置了ActiveMQ的账号和密码 String username = "alc"; String password = "alc"; // 依然要先创建连接工厂 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616"); // 通过连接工厂创建连接对象 Connection connection = connectionFactory.createConnection(username,password); // 启动线程连接 connection.start(); // 通过连接创建会话 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 创建消息对象,这次我们使用的是发布订阅模式 Topic topic = session.createTopic("ALC_Topic_Demo"); // 创建消费者对象 MessageConsumer consumer = session.createConsumer(topic); // 创建消息监听器 consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { // 因为我们知道这里使用的是文本,所以可以进行强制转换 TextMessage textMessage = (TextMessage) message; try { System.out.println(textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } }); // 持续监听,也就是让主线程阻塞。 System.in.read(); // 倒序关流 consumer.close(); session.close(); connection.close(); } } ``` ## 提取工具类 这里就不进行代码演示了,因为我们发现重复代码有很多,那就可以提取重复代码,减少代码书写量,用起来也方便了。 这些代码都是重复的,所以可以定义一个静态的方法,用来获取Session对象。 ```java // 客户端部分 // 如果设置了ActiveMQ的账号和密码 String username = "alc"; String password = "alc"; // 依然要先创建连接工厂 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616"); // 通过连接工厂创建连接对象 Connection connection = connectionFactory.createConnection(username,password); // 启动线程连接 connection.start(); // 通过连接创建会话 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); ``` 针对不同的模式,再写一个不同的方法,最后统一关流操作。 ## Spring介入 ### 导入依赖文件 ```xml <dependencies> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> <version>4.2.4.RELEASE</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-test</artifactId> <version>4.2.4.RELEASE</version> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-client</artifactId> <version>5.14.5</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> <scope>test</scope> </dependency> </dependencies> ``` ### Spring配置文件 ```xml <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd"> <context:component-scan base-package="com.alc"/> <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供--> <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://127.0.0.1:61616"/> <property name="userName" value="alc"/> <property name="password" value="alc"/> </bean> <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory --> <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"> <!-- 目标ConnectionFactory对应真实的可以产生JMSConnection的ConnectionFactory --> <property name="targetConnectionFactory" ref="targetConnectionFactory"/> </bean> <!--这个是点对点消息 --> <bean id="queueTextDestination" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg value="ALC_Spring_queue"/> </bean> <!--这是发布订阅消息--> <bean id="topicTextDestination" class="org.apache.activemq.command.ActiveMQTopic"> <constructor-arg value="ALC_Spring_topic"/> </bean> <!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 --> <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate"> <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 --> <property name="connectionFactory" ref="connectionFactory"/> </bean> <!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 --> <bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate"> <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 --> <property name="connectionFactory" ref="connectionFactory"/> </bean> <!--这样定义是为了后面的解释--> <bean id="t1" class="com.alc.TopicListener"> <property name="id" value="topic 1"/> </bean> <bean id="t2" class="com.alc.TopicListener"> <property name="id" value="topic 2"/> </bean> <bean id="t3" class="com.alc.TopicListener"> <property name="id" value="topic 3"/> </bean> <!--这样定义是为了后面的解释--> <bean id="q1" class="com.alc.QueueListener"> <property name="id" value="queue 1"/> </bean> <bean id="q2" class="com.alc.QueueListener"> <property name="id" value="queue 2"/> </bean> <bean id="q3" class="com.alc.QueueListener"> <property name="id" value="queue 3"/> </bean> <!-- 消息监听容器--> <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <!-- 连接工厂--> <property name="connectionFactory" ref="connectionFactory"/> <!-- 设置要监听的目标队列--> <property name="destination" ref="queueTextDestination"/> <!-- 设置监听处理类--> <property name="messageListener" ref="q1"/> </bean> <!-- 消息监听容器--> <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <!-- 连接工厂--> <property name="connectionFactory" ref="connectionFactory"/> <!-- 设置要监听的目标队列--> <property name="destination" ref="topicTextDestination"/> <!-- 设置监听处理类--> <property name="messageListener" ref="t1"/> </bean> <!-- 消息监听容器--> <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <!-- 连接工厂--> <property name="connectionFactory" ref="connectionFactory"/> <!-- 设置要监听的目标队列--> <property name="destination" ref="queueTextDestination"/> <!-- 设置监听处理类--> <property name="messageListener" ref="q2"/> </bean> <!-- 消息监听容器--> <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <!-- 连接工厂--> <property name="connectionFactory" ref="connectionFactory"/> <!-- 设置要监听的目标队列--> <property name="destination" ref="topicTextDestination"/> <!-- 设置监听处理类--> <property name="messageListener" ref="t2"/> </bean> <!-- 消息监听容器--> <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <!-- 连接工厂--> <property name="connectionFactory" ref="connectionFactory"/> <!-- 设置要监听的目标队列--> <property name="destination" ref="queueTextDestination"/> <!-- 设置监听处理类--> <property name="messageListener" ref="q3"/> </bean> <!-- 消息监听容器--> <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <!-- 连接工厂--> <property name="connectionFactory" ref="connectionFactory"/> <!-- 设置要监听的目标队列--> <property name="destination" ref="topicTextDestination"/> <!-- 设置监听处理类--> <property name="messageListener" ref="t3"/> </bean> </beans> ``` ### QueueListener ```java package com.alc; import org.springframework.stereotype.Component; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; @Component public class QueueListener implements MessageListener { private String id; public void setId(String id) { this.id = id; } public void onMessage(Message message) { TextMessage textMessage = (TextMessage) message; try { System.out.println(id + " <-->" + textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } } ``` ### TopicListerner ```java package com.alc; import org.springframework.stereotype.Component; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; @Component public class TopicListener implements MessageListener { private String id; public void setId(String id) { this.id = id; } public void onMessage(Message message) { TextMessage textMessage = (TextMessage) message; try { System.out.println(id + " <-->" + textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } } ``` ### MyTest ```java import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; import javax.jms.*; @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations = "classpath:applicationContext-jms.xml") public class MyTest { @Autowired private JmsTemplate jmsQueueTemplate; @Autowired private JmsTemplate jmsTopicTemplate; @Autowired private Destination queueTextDestination; @Autowired private Destination topicTextDestination; @Test public void QueueServer() throws JMSException { for (int i = 0; i < 10; i++) { final int finalI = i; jmsQueueTemplate.send(queueTextDestination, new MessageCreator() { public Message createMessage(Session session) throws JMSException { return session.createTextMessage(" Hello Queue ->" + " " + finalI); } }); } } @Test public void TopicServer() throws JMSException { for (int i = 0; i < 10; i++) { final int finalI = i; jmsTopicTemplate.send(topicTextDestination, new MessageCreator() { public Message createMessage(Session session) throws JMSException { return session.createTextMessage(" Hello Topic ->" + " " + finalI); } }); } } } ``` 看一下输出结果 当执行queue的时候 ``` queue 2 <--> Hello Queue -> 0 queue 1 <--> Hello Queue -> 1 queue 3 <--> Hello Queue -> 2 queue 2 <--> Hello Queue -> 3 queue 1 <--> Hello Queue -> 4 queue 3 <--> Hello Queue -> 5 queue 2 <--> Hello Queue -> 6 queue 1 <--> Hello Queue -> 7 queue 3 <--> Hello Queue -> 8 queue 2 <--> Hello Queue -> 9 ``` 当执行topic的时候 ``` topic 1 <--> Hello Topic -> 0 topic 1 <--> Hello Topic -> 1 topic 1 <--> Hello Topic -> 2 topic 1 <--> Hello Topic -> 3 topic 1 <--> Hello Topic -> 4 topic 1 <--> Hello Topic -> 5 topic 1 <--> Hello Topic -> 6 topic 1 <--> Hello Topic -> 7 topic 1 <--> Hello Topic -> 8 topic 1 <--> Hello Topic -> 9 topic 3 <--> Hello Topic -> 0 topic 3 <--> Hello Topic -> 1 topic 3 <--> Hello Topic -> 2 topic 3 <--> Hello Topic -> 3 topic 3 <--> Hello Topic -> 4 topic 3 <--> Hello Topic -> 5 topic 3 <--> Hello Topic -> 6 topic 3 <--> Hello Topic -> 7 topic 3 <--> Hello Topic -> 8 topic 3 <--> Hello Topic -> 9 topic 2 <--> Hello Topic -> 0 topic 2 <--> Hello Topic -> 1 topic 2 <--> Hello Topic -> 2 topic 2 <--> Hello Topic -> 3 ``` 也就是说,queue只会有一个监听器执行这个方法,而topic所有的都会接收到消息,并且执行。 # 事务模式 在手动创建activeMQ的时候是这样的 ```java Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); ``` 前面的false也就是不开启事务模式,后面的是自动应答,也就是说客户端(消费者)接收到消息后就确认消息送达,不关心成功与否,如果不成功也不会重新发送JMS消息 细节很多,我目前理解就是这些,有时间或者用到了,我会补充上的 [TODO] © 允许规范转载 打赏 赞赏作者 支付宝微信 赞 如果觉得我的文章对你有用,请随意赞赏