Java 编码实现 MQ 通讯(Queue)

PPG007 ... 2021-12-27 About 2 min

# Java 编码实现 MQ 通讯(Queue)

一对一模型中目的地称为队列 Queue。

# 依赖

<!-- https://mvnrepository.com/artifact/org.apache.activemq/activemq-all -->
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
    <version>5.16.3</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.xbean/xbean-spring -->
<dependency>
    <groupId>org.apache.xbean</groupId>
    <artifactId>xbean-spring</artifactId>
    <version>4.20</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>1.7.32</version>
</dependency>
<!-- https://mvnrepository.com/artifact/ch.qos.logback/logback-classic -->
<dependency>
    <groupId>ch.qos.logback</groupId>
    <artifactId>logback-classic</artifactId>
    <version>1.2.5</version>
    <scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/junit/junit -->
<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.13.2</version>
    <scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.20</version>
    <scope>provided</scope>
</dependency>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39

# 发送端

private static final String URL="tcp://150.158.153.216:61616";

public static void main(String[] args) throws JMSException {
    //        创建连接工厂并传入URL
    ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(URL);
    //        建立连接
    Connection connection = activeMQConnectionFactory.createConnection();
    //        开启连接
    connection.start();
    //        创建会话
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    //        点对点通信创建queue
    Queue queue = session.createQueue("ppg");
    //        创建这个queue对应的消息发送对象
    MessageProducer producer = session.createProducer(queue);
    //        循环发送三次消息
    for (int i = 0; i < 3; i++) {
        //            创建一个消息对象
        TextMessage textMessage = session.createTextMessage("msg--->" + i);
        //            发送消息
        producer.send(textMessage);
    }
    //        先开后关
    producer.close();
    session.close();
    connection.close();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

# 接收端

private static final String URL="tcp://150.158.153.216:61616";

public static void main(String[] args) throws JMSException {
    //        创建连接工厂并传入URL
    ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(URL);
    //        建立连接
    Connection connection = activeMQConnectionFactory.createConnection();
    //        开启连接
    connection.start();
    //        创建会话
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    //        点对点通信创建queue
    Queue queue = session.createQueue("ppg");
    //        创建消费者
    MessageConsumer consumer = session.createConsumer(queue);
    while (true){
        //            接收与发送的类型要一致
        TextMessage receive = (TextMessage) consumer.receive();
        if (receive!=null){
            System.out.println("收到消息-->"+receive.getText());
        }else {
            break;
        }
    }
    //        先开后关
    consumer.close();
    session.close();
    connection.close();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

查看 ActiveMQ 控制台:

image-20210819143010106

点击上方 Queues 进入 queue 队列的监控界面,name 表示 queue 的名字,此后紧跟着待消费的消息数量、已连接的消费者数量、消息入队数量、消息出队数量。

# 接收端设置接收延迟,短阻塞

TextMessage receive = (TextMessage) consumer.receive(1000);
1

# MessageListener 消息监听器

consumer.setMessageListener(message -> {

    try {
        TextMessage textMessage = (TextMessage) message;
        System.out.println("收到"+textMessage.getText());
    } catch (JMSException e) {
        e.printStackTrace();
    }
});
System.in.read();
1
2
3
4
5
6
7
8
9
10
Last update: August 15, 2022 09:32
Contributors: Koston Zhuang , PPG007