第一章-Rabbit概述
cnds:https://blog.csdn.net/wangbing25307/article/details/80845641
1什么是Rabbit
消息队列(Message Queue,简称MQ),从字面意思上看,本质是个队列,FIFO先入先出,只不过队列中存放的内容是message而已。
其主要用途:不同进程Process/线程Thread之间通信。RabbitMQ是一个开源的AMQP实现的可复用的企业消息系统。
AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。
AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。
作用:
1.服务间异步通信:把消息发送给消息中间件,消息中间件并不立即处理它,后续在慢慢处理。
2.请求削峰:采用队列的形式可以减少突发访问压力,不会因为突发的超时负荷要求而崩溃。
3.解耦:当一个主要应用完成操作后,需要远程调用其它服务进行操作,若其它服务已经崩溃,那么消息就将丢失,使用MQ可以把消息发到MQ上的队列中,让其它服务进行调用,这样其它服务就算崩溃,消息依然存在,当前服务也不会受到影响。
2安装环境
1 rabbit是Erlang编写的,所以要先安装Erlang环境https://www.erlang.org/downloads
2 安装rabbitmq-server-3.8.0服务器
rabbitmq_server-3.8.0\sbin中执行rabbitmq-plugins enable rabbitmq_management
门户管理访问http://localhost:15672/#/ 账号密码都是guest
第二章-单模式应用
1导入jar
1
2
3
4
5
|
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.8.0</version>
</dependency>
|
1简单模式

1ConnectionUtil
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
package com.lx.util;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ConnectionUtil {
public static Connection getBaseConnection() throws IOException, TimeoutException {
return getBaseConnection("localhost",5672,"/","guest","guest");
}
private static Connection getBaseConnection(
String host,int port,String vHost,
String userName,String password
) throws IOException, TimeoutException {
//定义连接工厂
ConnectionFactory factory=new ConnectionFactory();
//设置服务器地址
factory.setHost(host);
//设置端口号
factory.setPort(port);
//虚拟主机,用户密码
factory.setVirtualHost(vHost);
factory.setUsername(userName);
factory.setPassword(password);
//获取连接
Connection connection=factory.newConnection();
return connection;
}
}
|
2provider
若消息即没有自动应答,也没有手动应答,那么消息还会回到队列中。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
|
package com.lx.simple;
import com.lx.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Provider {
public static void main(String[] args) throws IOException, TimeoutException {
// provider();
//1通过工具类获得连接
Connection baseConnection = ConnectionUtil.getBaseConnection();
String queryName="simpleQuery1";
//2获得通道
Channel channel=baseConnection.createChannel();
//3创建队列 5个参数详解
//name:队列名字
// durable 是否持久化(类似快照)
//exclusive 是否排他 作用:1connection.close()该队列是否会自动删除 2如果是排外的,会对当前队列加锁,其他通道channel是不能访问的
//autoDelete自动删除队列 当最后一个消费者断开后,删除
//其它相关参数
channel.queueDeclare(queryName,false,false,false,null);
//4发布消息
for (int i = 0; i <1 ; i++) {
String message="I am a programmer"+i;
channel.basicPublish("",queryName,null,message.getBytes());
}
System.out.println("发送消息");
//5关闭通道和连接
channel.close();
baseConnection.close();
}
}
|
3consumer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
|
package com.lx.simple;
import com.lx.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
//1通过工具类获得连接
Connection baseConnection = ConnectionUtil.getBaseConnection();
String queryName="simpleQuery1";
//2获得通道
Channel channel=baseConnection.createChannel();
//3创建队列 5个参数详解
//name:队列名字
// durable 是否持久化(类似快照)
//exclusive 是否排他 作用:1connection.close()该队列是否会自动删除 2如果是排外的,会对当前队列加锁,其他通道channel是不能访问的
//autoDelete自动删除队列 当最后一个消费者断开后,删除
//其它相关参数
channel.queueDeclare(queryName,false,false,false,null);
//4接收消息
//参数:1队列名称 2是否自动确认
//
channel.basicConsume(queryName,true,new DefaultConsumer(channel){
//定义队列消费者
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body:"+new String(body));
}
});
}
}
|
2work模式

1 provider
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
|
package com.lx.simple;
import com.lx.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Provider {
public static void main(String[] args) throws IOException, TimeoutException {
// provider();
//1通过工具类获得连接
Connection baseConnection = ConnectionUtil.getBaseConnection();
String queryName="simpleQuery1";
//2获得通道
Channel channel=baseConnection.createChannel();
//3创建队列 5个参数详解
//name:队列名字
// durable 是否持久化(类似快照)
//exclusive 是否排他 作用:1connection.close()该队列是否会自动删除 2如果是排外的,会对当前队列加锁,其他通道channel是不能访问的
//autoDelete自动删除队列 当最后一个消费者断开后,删除
//其它相关参数
channel.queueDeclare(queryName,false,false,false,null);
//4发布消息
for (int i = 0; i <10 ; i++) {
String message="I am a programmer"+i;
channel.basicPublish("",queryName,null,message.getBytes());
}
System.out.println("发送消息");
//5关闭通道和连接
channel.close();
baseConnection.close();
}
}
|
2 consumer
加上//一次只能接收一条 channel.basicQos(1);
就是一次只能接收处理一条,能者多劳。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
|
public static void main(String[] args) throws IOException, TimeoutException {
//1通过工具类获得连接
Connection baseConnection = ConnectionUtil.getBaseConnection();
String queryName="simpleQuery1";
//2获得通道
final Channel channel=baseConnection.createChannel();
//3创建队列 5个参数详解
//name:队列名字
// durable 是否持久化(类似快照)
//exclusive 是否排他 作用:1connection.close()该队列是否会自动删除 2如果是排外的,会对当前队列加锁,其他通道channel是不能访问的
//autoDelete自动删除队列 当最后一个消费者断开后,删除
//其它相关参数
channel.queueDeclare(queryName,false,false,false,null);
//一次只能接收一条
channel.basicQos(1);
//4接收消息
//参数:1队列名称 2是否自动确认
//自动确认指的是,只要接收到消息就可以了。 不自动确认需要手动应答
channel.basicConsume(queryName,false,new DefaultConsumer(channel){
//定义队列消费者
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body:"+new String(body));
try {
TimeUnit.MILLISECONDS.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//手动应答
channel.basicAck(envelope.getDeliveryTag(),false);
}
});
}
|
3发布/订阅-fanout

一个消费者将消息首先发送到交换器,交换器绑定到多个队列,然后被监听该队列的消费者所接收并消费
在RabbitMQ中,交换器主要有四种类型:direct、fanout、topic、headers(不使用)
1producer生产者 把消息发送到交换器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
package com.lx.exchange_fanout;
import com.lx.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class FanoutProvider {
public static void main(String[] args) throws IOException, TimeoutException {
//1通过工具类获得连接
Connection baseConnection = ConnectionUtil.getBaseConnection();
String exchangeName="fanoutProviderQueue";
//2获得通道
Channel channel=baseConnection.createChannel();
//3声明交互器 类型为fanout
channel.exchangeDeclare(exchangeName,"fanout");
//4发布消息
for (int i = 0; i <10 ; i++) {
String message="I am a programmer"+i;
//第一个参数是交换器名,第二个是路由键名
channel.basicPublish(exchangeName,"",null,message.getBytes());
}
System.out.println("发送消息");
//5关闭通道和连接
channel.close();
baseConnection.close();
}
}
|
2consumer消费者
消费者绑定交换器,消费者创建一个队列。多个队列绑定同一交换器,那么多个队列都能接收来自交换器的消息。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
|
package com.lx.exchange_fanout;
import com.lx.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class FanoutConsumer1 {
public static void main(String[] args) throws IOException, TimeoutException {
//1通过工具类获得连接
Connection baseConnection = ConnectionUtil.getBaseConnection();
String queryName = "query1";
String exchangeName = "fanoutProviderQueue";
//2获得通道
final Channel channel = baseConnection.createChannel();
//3创建队列 5个参数详解
//name:队列名字
// durable 是否持久化(类似快照)
//exclusive 是否排他 作用:1connection.close()该队列是否会自动删除 2如果是排外的,会对当前队列加锁,其他通道channel是不能访问的
//autoDelete自动删除队列 当最后一个消费者断开后,删除
//其它相关参数
channel.queueDeclare(queryName, false, false, false, null);
//绑定到交换机
channel.queueBind(queryName, exchangeName, "");
//一次只能接收一条
channel.basicQos(1);
//4接收消息
//参数:1队列名称 2是否自动确认
//自动确认值得是,只要接收到消息就可以了。 不自动确认需要手动应答
channel.basicConsume(queryName, false, new DefaultConsumer(channel) {
//定义队列消费者
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body:" + new String(body));
try {
TimeUnit.MILLISECONDS.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//手动应答
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
}
|
4路由模式direct

1生产者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
|
package com.lx.exchange_direct;
import com.lx.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class DirectProvider {
public static void main(String[] args) throws IOException, TimeoutException {
//1通过工具类获得连接
Connection baseConnection = ConnectionUtil.getBaseConnection();
String exchangeName = "directExchange";
//2获得通道
Channel channel = baseConnection.createChannel();
//3声明交换器 类型为direct
channel.exchangeDeclare(exchangeName, "direct");
//4发布消息
for (int i = 0; i < 10; i++) {
String message = "I am a programmer" + i;
String routerKey;
if(i%2==0){
routerKey="red";
}else {
routerKey="grey";
}
//第一个参数是交换器名,第二个是路由键
channel.basicPublish(exchangeName, routerKey, null, message.getBytes());
}
System.out.println("发送消息");
//5关闭通道和连接
channel.close();
baseConnection.close();
}
}
|
2消费者
另起一个消费者 修改队列名,绑定不同的路由键,就会根据路由键的名称去寻找队列。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
|
package com.lx.exchange_direct;
import com.lx.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class DirectConsumer1 {
public static void main(String[] args) throws IOException, TimeoutException {
//1通过工具类获得连接
Connection baseConnection = ConnectionUtil.getBaseConnection();
String queryName = "query1";
String exchangeName = "directExchange";
//2获得通道
final Channel channel = baseConnection.createChannel();
//3创建队列
channel.queueDeclare(queryName, false, false, false, null);
//绑定到交换机 参数1:队列名 参数2:交换器名 参数3:路由键
channel.queueBind(queryName, exchangeName, "red");
channel.queueBind(queryName, exchangeName, "grey");
//一次只能接收一条
channel.basicQos(1);
//4接收消息
//参数:1队列名称 2是否自动确认
//自动确认值得是,只要接收到消息就可以了。 不自动确认需要手动应答
channel.basicConsume(queryName, false, new DefaultConsumer(channel) {
//定义队列消费者
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body:" + new String(body));
try {
TimeUnit.MILLISECONDS.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//手动应答 第二个参数:确认所有消息
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
}
|
5主题模式-topic

1
2
3
4
5
6
7
8
|
路由key进行模糊匹配
符号“#”表示匹配一个或多个词,
符号“*”表示匹配一个词
//举个例子
provider提供者发送 路由键为xiangyang.name.lusenlin
接收1 xiangyang.* 无法接收 因为它只匹配一个
接收2 xiangyang.# 可以接收 因为它匹配多个
|

1提供方provider
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
|
package com.lx.exchange_topic;
import com.lx.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class TopicProvider {
public static void main(String[] args) throws IOException, TimeoutException {
//1通过工具类获得连接
Connection baseConnection = ConnectionUtil.getBaseConnection();
String exchangeName = "topicExchange";
//2获得通道
Channel channel = baseConnection.createChannel();
//3声明交互器 类型为topic
channel.exchangeDeclare(exchangeName, "topic");
//4发布消息
for (int i = 0; i < 10; i++) {
String message = "I am a programmer" + i;
String routerKey;
if(i%2==0){
routerKey="name.da.a1.a2";
message="red"+i;
}else {
routerKey="name.grey.red";
message="grey.red"+i;
}
//第一个参数是交换器名,第二个是路由键
channel.basicPublish(exchangeName, routerKey, null, message.getBytes());
}
System.out.println("发送消息");
//5关闭通道和连接
channel.close();
baseConnection.close();
}
}
|
2消费方consumer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
|
package com.lx.exchange_topic;
import com.lx.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class TopicConsumer1 {
public static void main(String[] args) throws IOException, TimeoutException {
//1通过工具类获得连接
Connection baseConnection = ConnectionUtil.getBaseConnection();
String queryName = "query1";
String exchangeName = "topicExchange";
//2获得通道
final Channel channel = baseConnection.createChannel();
//3创建队列 5个参数详解
//name:队列名字
// durable 是否持久化(类似快照)
//exclusive 是否排他 作用:1connection.close()该队列是否会自动删除 2如果是排外的,会对当前队列加锁,其他通道channel是不能访问的
//autoDelete自动删除队列 当最后一个消费者断开后,删除
//其它相关参数
channel.queueDeclare(queryName, false, false, false, null);
//绑定到交换机 参数1:队列名 参数2:交换器名 参数3:路由键
channel.queueBind(queryName, exchangeName, "name.*");
//channel.queueBind(queryName, exchangeName, "*.add");
//一次只能接收一条
channel.basicQos(1);
//4接收消息
//参数:1队列名称 2是否自动确认
//自动确认值得是,只要接收到消息就可以了。 不自动确认需要手动应答
channel.basicConsume(queryName, false, new DefaultConsumer(channel) {
//定义队列消费者
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body:" + new String(body));
try {
TimeUnit.MILLISECONDS.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
//手动应答
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
}
|
第三章-整合springboot
1简单模式
1加入jar
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>spring-boot-parent</artifactId>
<groupId>org.springframework.boot</groupId>
<version>2.2.4.RELEASE</version>
</parent>
<groupId>org.example</groupId>
<artifactId>rabbit_springboot</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>
</project>
|
2yml配置文件
1
2
3
4
5
6
7
8
9
10
11
12
|
server:
port: 8420
spring:
application:
name: springboot_rabbit
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
|
3业务类–生产者:
启动类,不需要过多配置。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
package com.lsl;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class Producer {
@Autowired
private AmqpTemplate rabbitTemplate;
@RequestMapping("send")
public String send() {
String context = "producer hello ";
this.rabbitTemplate.convertAndSend("simpleQueue", context);
return "success";
}
}
|
4业务–消费者,另外创建一个项目,加jar、yml
在消费者中需要注意的是,要在启动类进行配置。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
|
package com.lx;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
@SpringBootApplication
public class RabbitSpringBootConsumerApp {
@Bean
public Queue queue() {
return new Queue("lanxin_queue");
}
@Bean
public Queue queue1() {
return new Queue("simpleQueue");
}
@Bean
public RabbitListenerContainerFactory<?> simpleListenerFactory(ConnectionFactory connectionFactory){
//创建rabbit监听容器工厂 对 rabbit 进行配置
SimpleRabbitListenerContainerFactory containerFactory=
new SimpleRabbitListenerContainerFactory();
containerFactory.setConnectionFactory(connectionFactory);
//手动ack 手动应答
containerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return containerFactory;
}
public static void main(String[] args) {
SpringApplication.run(RabbitSpringBootConsumerApp.class,args);
}
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
|
package com.lx.simpler1;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class Consumer {
@RabbitListener(queues = "simpleQueue")
public void simpleConsumer(String msg){
System.out.println("consumer1接收到:"+msg);
}
}
|
2工作模式
1启动类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
|
package com.lx;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
@SpringBootApplication
public class RabbitSpringBootConsumerApp {
@Bean
public Queue queue() {
return new Queue("lanxin_queue");
}
@Bean
public Queue queue1() {
return new Queue("simpleQueue");
}
@Bean
public RabbitListenerContainerFactory<?> simpleListenerFactory(ConnectionFactory connectionFactory){
//创建rabbit监听容器工厂 对 rabbit 进行配置
SimpleRabbitListenerContainerFactory containerFactory=
new SimpleRabbitListenerContainerFactory();
containerFactory.setConnectionFactory(connectionFactory);
//手动ack 手动应答
containerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
//设置能者多劳(一次只能处理一个),覆盖轮询模式
containerFactory.setPrefetchCount(1);
return containerFactory;
}
public static void main(String[] args) {
SpringApplication.run(RabbitSpringBootConsumerApp.class,args);
}
}
|
2业务类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
|
package com.lx.simpler1;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@Component
public class Consumer {
@RabbitListener(queues = "simpleQueue",containerFactory = "simpleListenerFactory")
public void simpleConsumer(String msg,
@Headers Map<String, Object> headers, Channel channel) throws IOException {
try {
System.out.println("consumer1接收到:"+msg);
//消息确认,(deliveryTag,multiple是否确认所有消息)
channel.basicAck(Long.parseLong(String.valueOf(headers.get(AmqpHeaders.DELIVERY_TAG))),false);
} catch (Exception e) {
//消息拒绝(deliveryTag,multiple,requeue拒绝后是否重新回到队列)
channel.basicNack(Long.parseLong(String.valueOf(headers.get(AmqpHeaders.DELIVERY_TAG))), false, false);
}
}
@RabbitListener(queues = "simpleQueue",containerFactory = "simpleListenerFactory")
public void simpleConsumer1(String msg,
@Headers Map<String, Object> headers, Channel channel) throws IOException {
try {
System.out.println("consumer2接收到:"+msg);
TimeUnit.MILLISECONDS.sleep(2000);
//消息确认,(deliveryTag,multiple是否确认所有消息)
channel.basicAck(Long.parseLong(String.valueOf(headers.get(AmqpHeaders.DELIVERY_TAG))),false);
} catch (Exception e) {
//消息拒绝(deliveryTag,multiple,requeue拒绝后是否重新回到队列)
channel.basicNack(Long.parseLong(String.valueOf(headers.get(AmqpHeaders.DELIVERY_TAG))), false, false);
}
}
}
|
3fanout模式

生产者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
package com.lsl;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class Producer {
@Autowired
private AmqpTemplate rabbitTemplate;
@RequestMapping("fanoutSend1")
public String send1() {
String context = "producer fanoutSend1 ";
String exchangeName="fanoutExchange";
//两个参数为 队列名 消息
//三个参数为 交换器名,路由键名,消息
this.rabbitTemplate.convertAndSend(exchangeName,"", context);
return "success";
}
}
|
消费者
1启动类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
|
package com.lx;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
@SpringBootApplication
public class FanoutConsumerApp {
@Bean
public Queue queue1() {
return new Queue("fanoutQueue1");
}
@Bean
public Queue queue2() {
return new Queue("fanoutQueue2");
}
//声明一个fanout的交换机
@Bean
public FanoutExchange exchange(){
return new FanoutExchange("fanoutExchange");
}
//把队列绑定到交换机
@Bean
public Binding bindingQueue1(){
return BindingBuilder.bind(queue1()).to(exchange());
}
@Bean
public Binding bindingQueue2(){
return BindingBuilder.bind(queue2()).to(exchange());
}
@Bean
public RabbitListenerContainerFactory<?> simpleListenerFactory(ConnectionFactory connectionFactory){
//创建rabbit监听容器工厂 对 rabbit 进行配置
SimpleRabbitListenerContainerFactory containerFactory=
new SimpleRabbitListenerContainerFactory();
containerFactory.setConnectionFactory(connectionFactory);
//手动ack 手动应答
containerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
//设置能者多劳(一次只能处理一个),覆盖轮询模式
containerFactory.setPrefetchCount(1);
return containerFactory;
}
public static void main(String[] args) {
SpringApplication.run(FanoutConsumerApp.class,args);
}
}
|
2业务类:消费
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
|
package com.lx.fanout;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Map;
@Component
public class Consumer {
@RabbitListener(queues = "fanoutQueue1",containerFactory = "simpleListenerFactory")
public void simpleConsumer(String msg,
@Headers Map<String, Object> headers, Channel channel) throws IOException {
try {
System.out.println("fanout consumer1接收到:"+msg);
//消息确认,(deliveryTag,multiple是否确认所有消息)
channel.basicAck(Long.parseLong(String.valueOf(headers.get(AmqpHeaders.DELIVERY_TAG))),false);
} catch (Exception e) {
//消息拒绝(deliveryTag,multiple,requeue拒绝后是否重新回到队列)
channel.basicNack(Long.parseLong(String.valueOf(headers.get(AmqpHeaders.DELIVERY_TAG))), false, false);
}
}
@RabbitListener(queues = "fanoutQueue2",containerFactory = "simpleListenerFactory")
public void simpleConsumer1(String msg,
@Headers Map<String, Object> headers, Channel channel) throws IOException {
try {
System.out.println("fanout consumer2接收到:"+msg);
//消息确认,(deliveryTag,multiple是否确认所有消息)
channel.basicAck(Long.parseLong(String.valueOf(headers.get(AmqpHeaders.DELIVERY_TAG))),false);
} catch (Exception e) {
System.out.println("出现了错误 consumer2");
//消息拒绝(deliveryTag,multiple,requeue拒绝后是否重新回到队列)
channel.basicNack(Long.parseLong(String.valueOf(headers.get(AmqpHeaders.DELIVERY_TAG))), false, false);
}
}
}
|
4direct模式
这里使用的新方法,三种交换器模式都适用。

生产者
路由键不同,xiangtan、moyang是一个队列 xiangyang是一个队列
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
|
package com.lsl;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class DirectProducer {
@Autowired
private AmqpTemplate rabbitTemplate;
@RequestMapping("directSend1")
public String send3() {
String context = "producer directSend1 ";
String exchangeName="directExchange";
//两个参数为 队列名 消息
//三个参数为 交换器名,路由键名,消息
this.rabbitTemplate.convertAndSend(exchangeName,"xiangyang", context);
return "success";
}
@RequestMapping("directSend2")
public String send4() {
String context = "producer directSend2 ";
String exchangeName="directExchange";
//两个参数为 队列名 消息
//三个参数为 交换器名,路由键名,消息
this.rabbitTemplate.convertAndSend(exchangeName,"moyang", context);
return "success";
}
@RequestMapping("directSend3")
public String send5() {
String context = "producer directSend3 ";
String exchangeName="directExchange";
//两个参数为 队列名 消息
//三个参数为 交换器名,路由键名,消息
this.rabbitTemplate.convertAndSend(exchangeName,"xiangtan", context);
return "success";
}
}
|
消费者
1业务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
|
package com.lx.direct;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Map;
@Component
public class Consumer {
@RabbitListener(containerFactory = "simpleListenerFactory"
,bindings = {
@QueueBinding(value = @Queue(value = "hubei"),
exchange = @Exchange(value = "directExchange", type = "direct"),
key = {"xiangyang"}
)
}
)
public void simpleConsumer(String msg,
@Headers Map<String, Object> headers, Channel channel) throws IOException {
try {
System.out.println("direct consumer1 湖北 接收到:"+msg);
//消息确认,(deliveryTag,multiple是否确认所有消息)
channel.basicAck(Long.parseLong(String.valueOf(headers.get(AmqpHeaders.DELIVERY_TAG))),false);
} catch (Exception e) {
//消息拒绝(deliveryTag,multiple,requeue拒绝后是否重新回到队列)
channel.basicNack(Long.parseLong(String.valueOf(headers.get(AmqpHeaders.DELIVERY_TAG))), false, false);
}
}
@RabbitListener(containerFactory = "simpleListenerFactory"
,bindings = {
@QueueBinding(value = @Queue(value = "hunan"),
exchange = @Exchange(value = "directExchange", type = "direct"),
key = {"xiangtan","moyang"}
)
}
)
public void simpleConsumer1(String msg,
@Headers Map<String, Object> headers,
Channel channel) throws IOException {
try {
System.out.println("direct consumer1 湖南 接收到:"+msg);
//消息确认,(deliveryTag,multiple是否确认所有消息)
channel.basicAck(Long.parseLong(String.valueOf(headers.get(AmqpHeaders.DELIVERY_TAG))),false);
} catch (Exception e) {
//消息拒绝(deliveryTag,multiple,requeue拒绝后是否重新回到队列)
channel.basicNack(Long.parseLong(String.valueOf(headers.get(AmqpHeaders.DELIVERY_TAG))), false, false);
}
}
}
|
2启动类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
package com.lx;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
@SpringBootApplication
public class DirectConsumerApp {
@Bean
public RabbitListenerContainerFactory<?> simpleListenerFactory(ConnectionFactory connectionFactory){
//创建rabbit监听容器工厂 对 rabbit 进行配置
SimpleRabbitListenerContainerFactory containerFactory=
new SimpleRabbitListenerContainerFactory();
containerFactory.setConnectionFactory(connectionFactory);
//手动ack 手动应答
containerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
//设置能者多劳(一次只能处理一个),覆盖轮询模式
containerFactory.setPrefetchCount(1);
return containerFactory;
}
public static void main(String[] args) {
SpringApplication.run(DirectConsumerApp.class,args);
}
}
|
5topic模式

1
2
3
4
5
6
7
8
|
路由key进行模糊匹配
符号“#”表示匹配一个或多个词,
符号“*”表示匹配一个词
//举个例子
provider提供者发送 路由键为xiangyang.name.lusenlin
接收1 xiangyang.* 无法接收 因为它只匹配一个
接收2 xiangyang.# 可以接收 因为它匹配多个
|
只需要在direct模式中修改发送方,和消费方的type、key就好。
生产者
1
2
3
4
5
6
7
8
9
|
@RequestMapping("topicSend1")
public String send3() {
String context = "producer topicSend1 ";
String exchangeName="directExchange";
//两个参数为 队列名 消息
//三个参数为 交换器名,路由键名,消息
this.rabbitTemplate.convertAndSend(exchangeName,"hunan.moyang.a", context);
return "success";
}
|
消费者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
@RabbitListener(containerFactory = "simpleListenerFactory"
,bindings = {
@QueueBinding(value = @Queue(value = "hunan"),
exchange = @Exchange(value = "directExchange", type = "topic"),
key = {"hunan.#"}
)
}
)
public void simpleConsumer1(String msg,
@Headers Map<String, Object> headers,
Channel channel) throws IOException {
try {
System.out.println("direct consumer1 湖南 接收到:"+msg);
//消息确认,(deliveryTag,multiple是否确认所有消息)
channel.basicAck(Long.parseLong(String.valueOf(headers.get(AmqpHeaders.DELIVERY_TAG))),false);
} catch (Exception e) {
//消息拒绝(deliveryTag,multiple,requeue拒绝后是否重新回到队列)
channel.basicNack(Long.parseLong(String.valueOf(headers.get(AmqpHeaders.DELIVERY_TAG))), false, false);
}
}
|
注意
启动类需要定义队列,设置等等。
当队列信息在生产者中不存在时候,队列还不存在,生产的消息就会丢失。
若队列信息在消费者存在,生产者此时即使没有队列信息,但队列会在消费者创建时创建。
第四章-高级应用
消息确认
生产者确认
简单模式:Channel信道的confirmSelect方法将当前信道设置成了confirm模式
springboot的消息确认
1yml设置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
server:
port: 8420
spring:
application:
name: springboot_rabbit
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
#confirm确认类型 correlated相关联的
publisher-confirm-type: correlated
#return确认
publisher-returns: true
#设置为 true 后 消费者在消息没有被路由到合适队列情况下会被return监听,false会自动删除
template:
mandatory: true
|
如果消息没有到exchange,则confirm回调,ack=false
如果消息到达exchange,则confirm回调,ack=true
exchange到queue成功,则不回调return
exchange到queue失败,则回调return(需设置mandatory=true,否则不回回调,消息就丢了)
2业务消费
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
|
package com.lsl;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class ConfirmProducer {
//@Autowired
private RabbitTemplate rabbitTemplate1;
@Autowired
public ConfirmProducer(RabbitTemplate rabbitTemplate1){
this.rabbitTemplate1=rabbitTemplate1;
this.rabbitTemplate1.setReturnCallback(returnCallback);
this.rabbitTemplate1.setConfirmCallback(confirmCallback);
this.rabbitTemplate1.setMandatory(true); //设置为 true 后 消费者在消息没有被路由到合适队列情况下会被return监听,而不会自动删除
}
final private RabbitTemplate.ConfirmCallback confirmCallback=new RabbitTemplate.ConfirmCallback() {
public void confirm(CorrelationData correlationData, boolean ack, String s) {
System.out.println("消息确认confirm: " +(correlationData==null?"回调数据为空":correlationData.getId()) + ",消息为=" + s + ",是否确认:" + ack);
}
};
final private RabbitTemplate.ReturnCallback returnCallback=new RabbitTemplate.ReturnCallback() {
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.err.println("return exchange: " + exchange + ", routingKey: "
+ routingKey + ", replyCode: " + replyCode + ", replyText: " + replyText);
}
};
@RequestMapping("confirmSend")
public String send() {
String context = "producer hello ";
String exchangeName="directExchange";
//this.rabbitTemplate1.setMandatory(true); //设置为 true 后 消费者在消息没有被路由到合适队列情况下会被return监听,而不会自动删除
//设置 确认的回调函数对象
this.rabbitTemplate1.setConfirmCallback(confirmCallback);
//设置 return的回调函数
this.rabbitTemplate1.setReturnCallback(returnCallback);
for (int i = 1; i <= 1; i++) {
//this.rabbitTemplate1.convertAndSend("simpleQueue", (Object) context,new CorrelationData(UUID.randomUUID().toString()));
this.rabbitTemplate1.convertAndSend(exchangeName,"hubei.a",context);
}
return "confirmSend success";
}
@RequestMapping("confirmSend1")
public String send1() {
String context = "producer hello ";
String exchangeName="directExchangesad";
for (int i = 1; i <= 1; i++) {
this.rabbitTemplate1.convertAndSend(exchangeName,"hubei.a",context);
}
return "交换器错误confirmSend1 success";
}
@RequestMapping("confirmSend2")
public String send2() {
String context = "producer hello ";
String exchangeName="directExchange";
for (int i = 1; i <= 1; i++) {
//this.rabbitTemplate1.convertAndSend("simpleQueue", (Object) context,new CorrelationData(UUID.randomUUID().toString()));
this.rabbitTemplate1.convertAndSend(exchangeName,"adasdhubei.a",context);
}
return "队列错误 confirmSend2 success";
}
}
|