Stream
PPG007 ... 2021-12-28 About 2 min
# Stream
Stream 支持 RabbitMQ 和 Kafka。
# 依赖
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
1
2
3
4
2
3
4
# 消息生产者
修改配置文件:
Tips
如果使用下面的配置方式配置 RabbitMQ,且如果 RabbitMQ 不在本地,程序将进行两次连接,第一次连接到远程服务器的消息队列,第二次连接本地消息队列,如果本地没有消息队列,则会抛出异常,要解决这个问题请参考消息消费者的配置方法。
server:
port: 8801
spring:
application:
name: stream-provider
cloud:
stream:
binders:
defaultRabbit:
type: rabbit
environment:
spring:
rabbitmq:
host: 192.168.3.14
port: 5672
username: rabbitmq
password: ${spring.cloud.stream.binders.defaultRabbit.environment.spring.rabbitmq.username}
bindings:
output:
destination: exchangeDemo
content-type: application/json
# binder: defaultRabbit 加不加没啥影响
eureka:
client:
service-url:
defaultZone: http://192.168.3.14:7001/eureka/,http://192.168.3.55:7002/eureka/
instance:
prefer-ip-address: true
ip-address: 127.0.0.1
non-secure-port: 8801
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
编写业务实现类:
@EnableBinding({Source.class})
public class MessageProviderImpl implements IMessageProvider {
@Autowired
private MessageChannel output;
@Override
public String send(String message) {
output.send(MessageBuilder.withPayload(message).build());
return null;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
2
3
4
5
6
7
8
9
10
11
12
13
然后编写 controller 进行访问即可。
# 消息消费者
编写配置文件:
Tips
将消息队列连接配置移到 spring 下就不会出现重连两次的问题。
server:
port: 8802
spring:
rabbitmq:
host: 192.168.3.14
port: 5672
username: rabbitmq
password: ${spring.rabbitmq.username}
application:
name: stream-consumer
cloud:
stream:
binders:
defaultRabbit:
type: rabbit
bindings:
input:
destination: exchangeDemo
content-type: application/json
# binder: defaultRabbit
eureka:
client:
service-url:
defaultZone: http://192.168.3.14:7001/eureka/,http://192.168.3.55:7002/eureka/
instance:
prefer-ip-address: true
ip-address: 127.0.0.1
non-secure-port: 8802
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
编写消息消费类:
@EnableBinding(Sink.class)
@Component
public class ReceiveController {
@Value("${server.port}")
private String port;
@StreamListener(Sink.INPUT)
public void input(Message<String> message){
System.out.println("消费者1号收到消息---------->"+message.getPayload()+"\t 端口号:"+port);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
2
3
4
5
6
7
8
9
10
11
12
13
# 解决重复消费问题
通过分组实现解决重复消费问题。
分组
发送一条消息,同一个分组内的所有消费者中只有一个能消费这条消息,不同分组可以重复消费。
修改消费者配置文件,指定分组:
spring:
rabbitmq:
host: 192.168.3.14
port: 5672
username: rabbitmq
password: ${spring.rabbitmq.username}
application:
name: stream-consumer
cloud:
stream:
binders:
defaultRabbit:
type: rabbit
bindings:
input:
destination: exchangeDemo
content-type: application/json
group: ppg
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 消息持久化
如果不显示指定分组,在消费者下线期间产生的消息不会被这个消费者消费,如果指定了分组,那么下线期间产生的消息也会被消费。