Spring 整合 ActiveMQ
PPG007 ... 2021-12-27 About 2 min
# Spring 整合 ActiveMQ
# 导入依赖
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-webmvc</artifactId>
<version>5.3.6</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework/spring-jms -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>5.3.9</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.activemq/activemq-pool -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
<version>5.16.3</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.12.4</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.activemq/activemq-all -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.16.3</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.xbean/xbean-spring -->
<dependency>
<groupId>org.apache.xbean</groupId>
<artifactId>xbean-spring</artifactId>
<version>4.20</version>
</dependency>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# 编写配置
@Configuration
@ComponentScan(basePackages = "service")
public class Config {
@Bean
public PooledConnectionFactory connectionFactory(){
PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory();
pooledConnectionFactory.setConnectionFactory(mqConnectionFactory());
pooledConnectionFactory.setMaxConnections(100);
return pooledConnectionFactory;
}
private ActiveMQConnectionFactory mqConnectionFactory(){
ActiveMQConnectionFactory mqConnectionFactory = new ActiveMQConnectionFactory();
mqConnectionFactory.setBrokerURL("tcp://150.158.153.216:61616");
return mqConnectionFactory;
}
@Bean
public ActiveMQQueue queue(){
// 设置队列名字
return new ActiveMQQueue("springActiveMQQueue-PPG");
}
@Bean
public ActiveMQTopic topic(){
// 设置主题名字
return new ActiveMQTopic("springActiveMQTopic-PPG");
}
@Bean
public JmsTemplate jmsTemplate(){
JmsTemplate jmsTemplate = new JmsTemplate();
// 设置连接工厂
jmsTemplate.setConnectionFactory(connectionFactory());
// 设置默认目的地
jmsTemplate.setDefaultDestination(queue());
// 设置消息转换器
jmsTemplate.setMessageConverter(new SimpleMessageConverter());
return jmsTemplate;
}
@Bean
public DefaultMessageListenerContainer messageListenerContainer(){
DefaultMessageListenerContainer messageListenerContainer = new DefaultMessageListenerContainer();
messageListenerContainer.setConnectionFactory(connectionFactory());
messageListenerContainer.setDestination(queue());
messageListenerContainer.setSessionTransacted(true);
messageListenerContainer.setMessageListener((MessageListener) message -> {
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println(textMessage.getText());
}catch (Exception e){
e.printStackTrace();
}
}throw new RuntimeException();
});
return messageListenerContainer;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
在Spring中使用ActiveMQ步骤:
- 创建 ActiveMQ 的连接工厂,设置 URL。
- 创建池化连接工厂,传入上面的 ActiveMQ 连接工厂,设置最大连接数等参数,注册到容器。
- 创建队列或主题,注册到容器。
- 配置 JmsTemplate 指定连接工厂、默认目的地、消息转换器等其他配置,注册到容器。
- 配置消息监听器,指明监听器使用的连接工厂与监听的目的地,注入容器。
# 使用队列
# 发送端
public static void main(String[] args) {
AnnotationConfigApplicationContext annotationConfigApplicationContext = new AnnotationConfigApplicationContext(Config.class);
JmsTemplate jmsTemplate = annotationConfigApplicationContext.getBean(JmsTemplate.class);
jmsTemplate.send(session -> session.createTextMessage("扎不多得嘞"));
System.out.println("send success");
}
1
2
3
4
5
6
2
3
4
5
6
由于配置类中默认目的地就是队列,所以此处不需要指定。
# 接收端
调用 receive 方法进行读取。
public static void main(String[] args) throws JMSException {
AnnotationConfigApplicationContext annotationConfigApplicationContext = new AnnotationConfigApplicationContext(Config.class);
JmsTemplate jmsTemplate = annotationConfigApplicationContext.getBean(JmsTemplate.class);
TextMessage receive = (TextMessage) jmsTemplate.receive();
System.out.println(receive.getText());
}
1
2
3
4
5
6
2
3
4
5
6
# 使用主题
# 发送端
通过 setDefaultDestination
方法指定目的地,或者使用 convertAndSend
的重载方法指定目的地。
public static void main(String[] args) {
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Config.class);
JmsTemplate jmsTemplate = context.getBean(JmsTemplate.class);
ActiveMQTopic topic = context.getBean(ActiveMQTopic.class);
jmsTemplate.setDefaultDestination(topic);
jmsTemplate.convertAndSend("随着经济的发展,蚌埠住的人越来越多了");
}
1
2
3
4
5
6
7
2
3
4
5
6
7
# 接收端
public static void main(String[] args) {
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Config.class);
JmsTemplate jmsTemplate = context.getBean(JmsTemplate.class);
ActiveMQTopic topic = context.getBean(ActiveMQTopic.class);
jmsTemplate.setDefaultDestination(topic);
System.out.println(jmsTemplate.receiveAndConvert());
}
1
2
3
4
5
6
7
2
3
4
5
6
7
# 持久化
队列默认持久化。
主题模式中,配置消息监听器,进行订阅。
@Bean
public DefaultMessageListenerContainer messageListenerContainer(){
DefaultMessageListenerContainer messageListenerContainer = new DefaultMessageListenerContainer();
messageListenerContainer.setConnectionFactory(connectionFactory());
messageListenerContainer.setDestination(topic());
messageListenerContainer.setSessionTransacted(true);
messageListenerContainer.setSubscriptionDurable(true);
messageListenerContainer.setClientId("test");
messageListenerContainer.setDurableSubscriptionName("123");
messageListenerContainer.setMessageListener((MessageListener) message -> {
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println(textMessage.getText());
}catch (Exception e){
e.printStackTrace();
}
}
});
return messageListenerContainer;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
不论主题的发送方是否设置持久化,消息监听器都会收到离线时的消息,同样要要先运行一次执行订阅操作。
# 事务
Spring 提供了一个 JmsTransactionManager 用于对 JMS ConnectionFactory 做事务管理。这将允许 JMS 应用利用 Spring 的事务管理特性。
使用消息监听器时,配置其 sessionTransacted
属性为 true 即可开启事务,如果 listener 中出现异常,自动回滚。