Hello World
参考demo项目:G:\学习资料\java\java学习文档\java技术文档\消息队列文档\rabbimitmq\cd-xufei-rabbiitmq-parent\cd-xufei-rabbitmq-01 => helloworld
生产者 发送数据到 队列中去 队列将数据发送给消费者
# 生产者写法
提示:若队列不存在,将会自动创建
查看示例
public class Producer {
/**
* 申明队列的名字
*/
private static final String QUEUE_NAME = "helloWorld-Que1";
public static void main(String[] args) throws Exception {
// 1.获取连接
Connection connection = ConnectionUtils.getConnection();
// 2.建立通道
Channel channel = connection.createChannel();
// 3.申明一个队列,如果队列不存在会创建队列
/**
*第一个参数:队列的名字
*第二个参数:是否持久化 :这个数据 发送到队列去的时候 这个数据 是否要持久化到数据库(内存中)中去
*第三个参数:是否排外
* 第一个意思:是否允许这个通道以外的 其他消费者来消费这个数据
* 第二个意思:连接关闭之后 这个队列是否自动删除
*第四个参数:是否自动删除
* 表示的是 当最后一个连接退出的时候是否要删除这个队列
*第五个参数:创建队列 附带的头的信息(TTL队列 死信队列)
*
*/
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 通道和队列都声明成功之后生产者就可以向这个队列里面发送消息了
/**
* 第一个参数:交换机的名字 (没有交换机的话就直接填空)
* 第二个参数:路由的key 因为这里是将数据 直接发送到队列中 所以这里的路由key直接就是 队列的名字
* 第三个参数:发送消息的时候 附带的一些属性信息
* 第四个参数:发送的这个消息体的内容
*/
for (int i = 0; i < 100; i++) {
channel.basicPublish("", QUEUE_NAME, null, ("我是第一个helloword程序" + i).getBytes());
}
// 这里可以关闭掉这个通道 以及这个连接
// channel.close();
// connection.close();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# 消费者写法
查看示例
public class Consumer {
/**
* 申明队列的名字
*/
private static final String QUEUE_NAME = "helloWorld-Que1";
public static void main(String[] args) throws Exception {
// 1、获取连接
Connection connection = ConnectionUtils.getConnection();
// 2、建立通道
Channel channel = connection.createChannel();
// 3、申明一个队列
/**
*第一个参数:队列的名字
*第二个参数:是否持久化 :这个数据 发送到队列去的时候 这个数据 是否要持久化到数据库中去
*第三个参数:是否排外
* 第一个意思:是否允许这个通道以外的 其他消费者来消费这个数据
* 第二个意思:连接关闭之后 这个队列是否自动删除
*第四个参数:是否自动删除
* 表示的是 当最后一个连接退出的时候是否要删除这个队列
*第五个参数:创建队列 付带的头的信息(TTL 死信队列)
*
*/
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 4、申明一个消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
/**
*
* @param consumerTag : 表示的是 消费者 唯一的标记
* @param envelope:信封 : 消息的一个封装
* @param properties:前面生产者发送消息的时候 写入的键值对
* @param body :获取到的消息的内容
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接受到的消息是" + new String(body));
// 消费消息的时候如果设置为手动应答需要下面的设置,这样才会保证业务处理和消息消费同时成功
/**
* 第一个参数 当前这条消息的标记
* 第二个参数 是否自动应答
*/
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
// 5、将消费者和队列进行绑定
/**
* 第一个参数:队列的名字
* 第二个参数:是否自动应答设置为
* true 属于自动应答,消费者消费了消息之后会自动告诉队列我已经消费了,
* 队列就会将数据删除了,这样可能会导致业务处理失败了但是消息队列上面的数据也被消费了
* false 属于不自动应答,意思消费者消费了消息之后不会主动告诉队列我消费了,需要我们手动去通知消息队列删除数据,
* 第三个参数是 声明的消费者
*/
//channel.basicConsume(QUEUE_NAME,true,consumer);
channel.basicConsume(QUEUE_NAME, false, consumer);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# 连接帮助类
查看示例
public class ConnectionUtils {
/**
* 这个就是获取 连接的帮助包
*
* @return
*/
public static Connection getConnection() throws Exception {
// 第一步:获取连接的工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 设置端口 默认就是5672
connectionFactory.setPort(5672);
// 设置用户名
connectionFactory.setUsername("xff");
// 设置密码
connectionFactory.setPassword("xff");
// 这里这个虚拟主机 默认设置成/就可以了
connectionFactory.setVirtualHost("/");
// 设置我们的主机
connectionFactory.setHost("169.254.182.30");
// 设置连接的超时时间
connectionFactory.setConnectionTimeout(30000);
// 获取咋们的连接
return connectionFactory.newConnection();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
最近更新: 2025/07/30, 15:37:56