JMS
# JMS
# 什么是 JMS
JMS(JAVA Message Service,java消息服务)API 是一个消息服务的标准或者说是规范,允许应用程序组件基于 JavaEE 平台创建、发送、接收和读取消息。它使分布式通信耦合度更低,消息服务更加可靠以及异步性。
Tips
JMS是java的消息服务,JMS的客户端之间可以通过JMS服务进行异步的消息传输。
# JMS 四大元素
# JMS provider
实现 JMS 接口和规范的消息中间件,即 MQ 服务器。
# JMS producer
消息生产者,创建、发送 JMS 消息的客户端应用。
# JMS consumer
消息消费者,接收和处理 JMS 消息的客户端应用。
# JMS message
# 消息头
- JMSDestination 目的地。
- JMSDeliveryMode 持久和非持久,持久模式中如果 JMS provider 出现故障,消息不会丢失,服务器恢复后再次传递。
- JMSExpiration 过期时间。
- JMSPriority 优先级,加急消息优先于普通消息。
- JMSMessageID。
TextMessage textMessage = session.createTextMessage("第" + i + "次蚌埠住了");
textMessage.setJMSExpiration(1000);
textMessage.setJMSDeliveryMode(Message.DEFAULT_DELIVERY_MODE);
textMessage.setJMSPriority(Message.DEFAULT_PRIORITY);
producer.send(textMessage);
2
3
4
5
# 消息体
- 封装具体的消息数据。
- 5种消息体格式:
- TextMessage 普通字符串消息,包含一个String。
- MapMessage 一个 Map 类型消息 key 为 String,value 为 Java 基本类型。
- BytesMessage 二进制数组,包含一个 byte[]。
- StreamMessage Java 数据流消息,用标准流操作来顺序的填充和读取。
- ObjectMessage 对象消息,包含一个可序列化的 Java 对象。
- 发送和接收的消息体类型必须一致。
# 发送方
for (int i = 0; i < 5; i++) {
TextMessage textMessage = session.createTextMessage("第" + i + "次蚌埠住了");
producer.send(textMessage);
MapMessage mapMessage = session.createMapMessage();
mapMessage.setString("string","字符串消息");
mapMessage.setInt("int",1);
mapMessage.setDouble("double",2.3);
mapMessage.setBoolean("boolean",false);
producer.send(mapMessage);
BytesMessage bytesMessage = session.createBytesMessage();
bytesMessage.writeBytes("随着经济发展,蚌埠住的人越来越多了".getBytes(StandardCharsets.UTF_8));
producer.send(bytesMessage);
StreamMessage streamMessage = session.createStreamMessage();
streamMessage.writeString("天怎么突然晴了,原来是你把我给整无雨了");
producer.send(streamMessage);
ObjectMessage objectMessage = session.createObjectMessage();
objectMessage.setObject(new User(1,"PPG"));
producer.send(objectMessage);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# 接收方
consumer.setMessageListener(message -> {
try {
if (message instanceof TextMessage){
TextMessage textMessage = (TextMessage) message;
System.out.println("消费者收到字符串消息:\n"+textMessage.getText());
}else if (message instanceof MapMessage){
MapMessage mapMessage = (MapMessage) message;
System.out.println("消费者收到map消息:");
System.out.println("String"+mapMessage.getString("string"));
System.out.println("int"+mapMessage.getInt("int"));
System.out.println("double"+mapMessage.getDouble("double"));
System.out.println("boolean"+mapMessage.getBoolean("boolean"));
}else if (message instanceof BytesMessage){
BytesMessage bytesMessage = (BytesMessage) message;
System.out.println("消费者收到字节消息:");
byte[] buffer = new byte[10];
int len;
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
while ((len=bytesMessage.readBytes(buffer))!=-1){
byteArrayOutputStream.write(buffer,0,len);
}
System.out.println(new String(byteArrayOutputStream.toByteArray(), 0, byteArrayOutputStream.size(), StandardCharsets.UTF_8));
}else if (message instanceof StreamMessage){
StreamMessage streamMessage = (StreamMessage) message;
System.out.println("消费者收到Stream流消息:");
System.out.println(streamMessage.readString());
}else if (message instanceof ObjectMessage){
System.out.println("消费者收到对象消息:");
ObjectMessage objectMessage = (ObjectMessage) message;
User user = (User) objectMessage.getObject();
System.out.println(user);
}
System.out.println("===================================================");
}catch (Exception e){
e.printStackTrace();
}
});
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# 消息属性
- 如果需要除消息头字段以外的值,那么可以使用消息属性。
- 识别/去重/重点标注等操作非常有用的方法。
- 是属性名、属性值的键值对形式。
# JMS 的可靠性
# PRESISTENT 持久性
# 队列持久性
Tips
队列的情况下默认持久化。
持久化:
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
非持久化:
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
# 主题持久性
发送端:
先设置持久化后再调用connection.start()
方法。
private static final String URL="tcp://150.158.153.216:61616";
public static void main(String[] args) throws JMSException {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(URL);
Connection connection = activeMQConnectionFactory.createConnection();
Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic("persist");
MessageProducer producer = session.createProducer(topic);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
connection.start();
for (int i = 0; i < 3; i++) {
TextMessage textMessage = session.createTextMessage("随着经济发展,蚌埠住的人越来越多了");
producer.send(textMessage);
}
System.out.println("send complete");
producer.close();
session.close();
connection.close();
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
消费者:
首先设置一个客户端 ID,然后不再使用 consumer 而是 subscriber 订阅者,就像是订阅了某个公众号,这样持久化的消息会在客户端可用时推送。
注意
要先运行过一次订阅成功才能实现接收过去的持久化消息
private static final String URL="tcp://150.158.153.216:61616";
public static void main(String[] args) throws JMSException, IOException {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(URL);
Connection connection = activeMQConnectionFactory.createConnection();
connection.setClientID("ppg");//ID需要唯一
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic("persist");
TopicSubscriber subscriber = session.createDurableSubscriber(topic, "demo");
connection.start();
subscriber.setMessageListener((message -> {
if (message instanceof TextMessage){
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("消费者收到:"+textMessage.getText());
}catch (Exception e){
e.printStackTrace();
}
}
}));
System.in.read();
subscriber.close();
session.close();
connection.close();
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# 事务
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
对于生产者来说,如果第一个设置为false即不开启事务,只要执行send,消息就进入队列;如果设置为true即开启了事务,先执行send再执行commit才能将消息提交到队列中
session.commit();
session.rollback();
2
对于消费者来说,如果第一个设置为 false 即不开启事务,只要执行接收,消息就出队列;如果设置为 true 即开启了事务,如果接收完毕不执行 commit,被消费的消息不会出队列,导致一条消息多次消费。
# Acknowledge 签收
# 非事务模式
- 自动签收(默认):
Session.AUTO_ACKNOWLEDGE
- 手动签收:
Session.CLIENT_ACKNOWLEDGE
消费者要主动签收,否则同样存在多次消费的问题:
textMessage.acknowledge();
- 允许重复消息:
Session.DUPS_OK_ACKNOWLEDGE
不必必须签收,消息可能会重复发送。在第二次重新传递消息的时候,消息头的 JmsDelivered 会被置为 true 标示当前消息已经传送过一次,客户端需要进行消息的重复处理控制。类似 auto ack, 自动批量确认消息,具有延迟发送 ack 的特点,ActiveMq 内部实现积累一定数量自动确认。
# 事务模式
开启事务后,是否签收取决于事务是否 commit。
# 事务与签收的关系
- 在事务性会话中,当一个事务被成功提交则消息被自动签收。如果事务回滚,则消息会被再次传送。
- 非事务性会话中,消息何时被确认取决于创建会话时的应答模式