Springboot整合RabbitMQ
参考demo:G:\学习资料\java\java学习文档\java技术文档\消息队列文档\rabbimitmq\cd-xufei-rabbiitmq-parent\cd-xufei-sprinboot-rabbitmq
# 1、导包
<!--这个是rabbitMQ的这个使用的包-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
1
2
3
4
5
2
3
4
5
# 2、编写配置文件
# rabbitmq
spring:
rabbitmq:
virtual-host: /
host: 169.254.182.30
port: 5672
username: xff
password: xff
# return机制
publisher-returns: true
# confirm机制
publisher-confirms: true
listener:
direct:
# 设置手动应答
acknowledge-mode: manual
template:
# 搭配 publisher-returns 使用 属于return机制
# 当消息投递过程中不可达目的地时将消息返回给生产者的功能
mandatory: true
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# 3、编写RabbitMQConfig文件
查看示例
@SpringBootConfiguration
public class RabbitMQConfig {
/**
* 工作模型
*
* @return
*/
@Bean
public Queue queueWork() {
return new Queue("sb.queueWork");
}
/**
* 发布订阅模型
*/
// 首先声明两个队列
@Bean
public Queue queueFanout1() {
return new Queue("queueFanout1");
}
@Bean
public Queue queueFanout2() {
return new Queue("queueFanout2");
}
// 其次声明发布订阅模型的交换机
@Bean
public FanoutExchange fanoutExchange1() {
return new FanoutExchange("fanoutExchange1");
}
// 然后绑定队列到交换机
@Bean
public Binding fanoutExchangeBind1(Queue queueFanout1, FanoutExchange fanoutExchange1) {
return BindingBuilder.bind(queueFanout1).to(fanoutExchange1);
}
@Bean
public Binding fanoutExchangeBind2(Queue queueFanout2, FanoutExchange fanoutExchange1) {
return BindingBuilder.bind(queueFanout2).to(fanoutExchange1);
}
/**
* 玩下主题模型
*/
// 声明主题模式队列1
@Bean
public Queue queuetopic1() {
return new Queue("queuetopic1");
}
// 声明主题模式队列2
@Bean
public Queue queuetopic2() {
return new Queue("queuetopic2");
}
// 声明主题模型的交换机
@Bean
public TopicExchange topicExchange1() {
return new TopicExchange("topicExchange1");
}
// 将队列绑定到交换机
@Bean
public Binding bindingTopicExchange1(Queue queuetopic1, TopicExchange topicExchange1) {
// 模糊匹配是指 * 模糊匹配生产者的 key 比如生产者的key是 xf.c 那么消费者也能接收到消息
return BindingBuilder.bind(queuetopic1).to(topicExchange1).with("xiaofeifei.*");
}
@Bean
public Binding bindingTopicExchange2(Queue queuetopic2, TopicExchange topicExchange1) {
return BindingBuilder.bind(queuetopic2).to(topicExchange1).with("xiaofeifei.*");
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
# 4、编写 Manager
查看示例
@Component
public class RabbitMQManager {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 向队列模型中 发送数据
*
* @param queueName : 队列的名字
* @param msg :发送的数据
*/
public void sendWork(String queueName, String msg) {
rabbitTemplate.convertAndSend(queueName, msg);
}
/**
* 向发布订阅模型发布消息
*
* @param exchangeName
* @param msg
*/
public void sendFanout(String exchangeName, String msg) {
// 如果要实现 confirm 机制
// rabbitTemplate.setMandatory(true);
// rabbitTemplate.setConfirmCallback(new MyConfirmListener());//设置发送数据的监听
// 设置的是 没有地方去的数据的监听 return 机制
// rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
// @Override
// public void returnedMessage(Message message, int i, String s, String s1, String s2) {
//
// }
//});
rabbitTemplate.convertSendAndReceive(exchangeName, "", msg);
}
/**
* 向主题模型发布消息
*
* @param exchangeName
* @param msg
*/
public void sendTopic(String exchangeName, String msg, String routingKey) {
rabbitTemplate.convertSendAndReceive(exchangeName, routingKey, msg);
}
class MyConfirmListener implements RabbitTemplate.ConfirmCallback {
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# 5、编写Service
查看示例
public interface IRabbitMQService {
/**
* 向队列模型中 发送数据
*
* @param queueName : 队列的名字
* @param msg :发送的数据
*/
void sendWork(String queueName, String msg);
/**
* 向发布订阅模型发布消息
*
* @param exchangeName
* @param msg
*/
void sendFanout(String exchangeName, String msg);
/**
* 向主题模型发布消息
*
* @param exchangeName
* @param msg
*/
void sendTopic(String exchangeName, String msg, String routingKey);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# 6、编写Controller
查看示例
@RestController
public class RabbitMQController {
@Autowired
private IRabbitMQService rabbitMQService;
/**
* 玩下工作模型
*
* @return
*/
@RequestMapping("/sendwork")
public Object sendwork() {
for (int i = 0; i < 10; i++) {
rabbitMQService.sendWork("sb.queueWork", "这个是工作模型整合的测试的地方" + i);
}
return "数据发送成功";
}
/**
* 玩下发布订阅模型
*
* @return
*/
@RequestMapping("/sendFanout")
public Object sendFanout() {
for (int i = 0; i < 5; i++) {
rabbitMQService.sendFanout("fanoutExchange1", "这个是发布订阅模型测试的地方" + i);
}
return "数据发送成功";
}
/**
* 玩下主题模型
*
* @return
*/
@RequestMapping("/sendTopic")
public Object sendTopic() {
for (int i = 0; i < 5; i++) {
rabbitMQService.sendTopic("topicExchange1", "这个是主题模型测试的地方" + i, "xiaofeifei.aa");
}
return "数据发送成功";
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
最近更新: 2025/01/07, 09:25:39