第一章-Rabbit概述

cnds:https://blog.csdn.net/wangbing25307/article/details/80845641

1什么是Rabbit

消息队列(Message Queue,简称MQ),从字面意思上看,本质是个队列,FIFO先入先出,只不过队列中存放的内容是message而已。 其主要用途:不同进程Process/线程Thread之间通信。RabbitMQ是一个开源的AMQP实现的可复用的企业消息系统。

AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。

AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。

作用:

1.服务间异步通信:把消息发送给消息中间件,消息中间件并不立即处理它,后续在慢慢处理。 2.请求削峰:采用队列的形式可以减少突发访问压力,不会因为突发的超时负荷要求而崩溃。 3.解耦:当一个主要应用完成操作后,需要远程调用其它服务进行操作,若其它服务已经崩溃,那么消息就将丢失,使用MQ可以把消息发到MQ上的队列中,让其它服务进行调用,这样其它服务就算崩溃,消息依然存在,当前服务也不会受到影响。

2安装环境

1 rabbit是Erlang编写的,所以要先安装Erlang环境https://www.erlang.org/downloads

2 安装rabbitmq-server-3.8.0服务器

rabbitmq_server-3.8.0\sbin中执行rabbitmq-plugins enable rabbitmq_management

门户管理访问http://localhost:15672/#/ 账号密码都是guest

第二章-单模式应用

1导入jar

1
2
3
4
5
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.8.0</version>
    </dependency>

1简单模式

image-20200806221918005

1ConnectionUtil

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package com.lx.util;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class ConnectionUtil {

    public static Connection getBaseConnection() throws IOException, TimeoutException {
        return getBaseConnection("localhost",5672,"/","guest","guest");
    }

    private static Connection getBaseConnection(
            String host,int port,String vHost,
            String userName,String password
    ) throws IOException, TimeoutException {
        //定义连接工厂
        ConnectionFactory factory=new ConnectionFactory();
        //设置服务器地址
        factory.setHost(host);

        //设置端口号
        factory.setPort(port);

        //虚拟主机,用户密码
        factory.setVirtualHost(vHost);
        factory.setUsername(userName);
        factory.setPassword(password);

        //获取连接
        Connection connection=factory.newConnection();
        return connection;
    }
}

2provider

若消息即没有自动应答,也没有手动应答,那么消息还会回到队列中。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package com.lx.simple;

import com.lx.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Provider {
    public static void main(String[] args) throws IOException, TimeoutException {
       // provider();
        //1通过工具类获得连接
        Connection baseConnection = ConnectionUtil.getBaseConnection();

        String queryName="simpleQuery1";

        //2获得通道
        Channel channel=baseConnection.createChannel();

        //3创建队列   5个参数详解
        //name:队列名字
        // durable 是否持久化(类似快照)
        //exclusive 是否排他 作用:1connection.close()该队列是否会自动删除 2如果是排外的,会对当前队列加锁,其他通道channel是不能访问的
        //autoDelete自动删除队列 当最后一个消费者断开后,删除
        //其它相关参数
        channel.queueDeclare(queryName,false,false,false,null);

        //4发布消息
        for (int i = 0; i <1 ; i++) {
            String message="I am a programmer"+i;
            channel.basicPublish("",queryName,null,message.getBytes());
        }
        System.out.println("发送消息");
        //5关闭通道和连接
        channel.close();
        baseConnection.close();
    }
}

3consumer

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package com.lx.simple;

import com.lx.util.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1通过工具类获得连接
        Connection baseConnection = ConnectionUtil.getBaseConnection();

        String queryName="simpleQuery1";

        //2获得通道
        Channel channel=baseConnection.createChannel();

        //3创建队列   5个参数详解
        //name:队列名字
        // durable 是否持久化(类似快照)
        //exclusive 是否排他 作用:1connection.close()该队列是否会自动删除 2如果是排外的,会对当前队列加锁,其他通道channel是不能访问的
        //autoDelete自动删除队列 当最后一个消费者断开后,删除
        //其它相关参数
        channel.queueDeclare(queryName,false,false,false,null);

        //4接收消息
        //参数:1队列名称 2是否自动确认
        //
        channel.basicConsume(queryName,true,new DefaultConsumer(channel){
            //定义队列消费者
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("body:"+new String(body));
            }
        });
    }
}

2work模式

image-20200807111803229

1 provider

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package com.lx.simple;

import com.lx.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Provider {
    public static void main(String[] args) throws IOException, TimeoutException {
       // provider();
        //1通过工具类获得连接
        Connection baseConnection = ConnectionUtil.getBaseConnection();

        String queryName="simpleQuery1";

        //2获得通道
        Channel channel=baseConnection.createChannel();

        //3创建队列   5个参数详解
        //name:队列名字
        // durable 是否持久化(类似快照)
        //exclusive 是否排他 作用:1connection.close()该队列是否会自动删除 2如果是排外的,会对当前队列加锁,其他通道channel是不能访问的
        //autoDelete自动删除队列 当最后一个消费者断开后,删除
        //其它相关参数
        channel.queueDeclare(queryName,false,false,false,null);

        //4发布消息
        for (int i = 0; i <10 ; i++) {
            String message="I am a programmer"+i;
            channel.basicPublish("",queryName,null,message.getBytes());
        }
        System.out.println("发送消息");
        //5关闭通道和连接
        channel.close();
        baseConnection.close();
    }

}

2 consumer

加上//一次只能接收一条 channel.basicQos(1);就是一次只能接收处理一条,能者多劳。

image-20200807112142127

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public static void main(String[] args) throws IOException, TimeoutException {
    //1通过工具类获得连接
    Connection baseConnection = ConnectionUtil.getBaseConnection();

    String queryName="simpleQuery1";

    //2获得通道
   final Channel channel=baseConnection.createChannel();

    //3创建队列   5个参数详解
    //name:队列名字
    // durable 是否持久化(类似快照)
    //exclusive 是否排他 作用:1connection.close()该队列是否会自动删除 2如果是排外的,会对当前队列加锁,其他通道channel是不能访问的
    //autoDelete自动删除队列 当最后一个消费者断开后,删除
    //其它相关参数
    channel.queueDeclare(queryName,false,false,false,null);

    //一次只能接收一条
    channel.basicQos(1);
    //4接收消息
    //参数:1队列名称 2是否自动确认
    //自动确认指的是,只要接收到消息就可以了。 不自动确认需要手动应答
    channel.basicConsume(queryName,false,new DefaultConsumer(channel){
        //定义队列消费者
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            System.out.println("body:"+new String(body));
            try {
                TimeUnit.MILLISECONDS.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            //手动应答
            channel.basicAck(envelope.getDeliveryTag(),false);
        }

    });

}

3发布/订阅-fanout

image-20200807113443933

一个消费者将消息首先发送到交换器,交换器绑定到多个队列,然后被监听该队列的消费者所接收并消费

在RabbitMQ中,交换器主要有四种类型:direct、fanout、topic、headers(不使用)

1producer生产者 把消息发送到交换器

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package com.lx.exchange_fanout;

import com.lx.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class FanoutProvider {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1通过工具类获得连接
        Connection baseConnection = ConnectionUtil.getBaseConnection();

        String exchangeName="fanoutProviderQueue";

        //2获得通道
        Channel channel=baseConnection.createChannel();
				//3声明交互器 类型为fanout
        channel.exchangeDeclare(exchangeName,"fanout");

        //4发布消息
        for (int i = 0; i <10 ; i++) {
            String message="I am a programmer"+i;
            //第一个参数是交换器名,第二个是路由键名
            channel.basicPublish(exchangeName,"",null,message.getBytes());
        }
        System.out.println("发送消息");
        //5关闭通道和连接
        channel.close();
        baseConnection.close();
    }
}

2consumer消费者

消费者绑定交换器,消费者创建一个队列。多个队列绑定同一交换器,那么多个队列都能接收来自交换器的消息。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
package com.lx.exchange_fanout;

import com.lx.util.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class FanoutConsumer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1通过工具类获得连接
        Connection baseConnection = ConnectionUtil.getBaseConnection();

        String queryName = "query1";
        String exchangeName = "fanoutProviderQueue";
        //2获得通道
        final Channel channel = baseConnection.createChannel();

        //3创建队列   5个参数详解
        //name:队列名字
        // durable 是否持久化(类似快照)
        //exclusive 是否排他 作用:1connection.close()该队列是否会自动删除 2如果是排外的,会对当前队列加锁,其他通道channel是不能访问的
        //autoDelete自动删除队列 当最后一个消费者断开后,删除
        //其它相关参数
        channel.queueDeclare(queryName, false, false, false, null);

        //绑定到交换机
        channel.queueBind(queryName, exchangeName, "");
        //一次只能接收一条
        channel.basicQos(1);
        //4接收消息
        //参数:1队列名称 2是否自动确认
        //自动确认值得是,只要接收到消息就可以了。 不自动确认需要手动应答
        channel.basicConsume(queryName, false, new DefaultConsumer(channel) {
            //定义队列消费者
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("body:" + new String(body));
                try {
                    TimeUnit.MILLISECONDS.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                //手动应答
                channel.basicAck(envelope.getDeliveryTag(), false);
            }

        });
    }
}

4路由模式direct

image-20200807132047005

1生产者

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package com.lx.exchange_direct;

import com.lx.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class DirectProvider {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1通过工具类获得连接
        Connection baseConnection = ConnectionUtil.getBaseConnection();

        String exchangeName = "directExchange";

        //2获得通道
        Channel channel = baseConnection.createChannel();
				//3声明交换器 类型为direct
        channel.exchangeDeclare(exchangeName, "direct");

        //4发布消息
        for (int i = 0; i < 10; i++) {
            String message = "I am a programmer" + i;
            String routerKey;
            if(i%2==0){
                routerKey="red";
            }else {
                routerKey="grey";
            }
            //第一个参数是交换器名,第二个是路由键
            channel.basicPublish(exchangeName, routerKey, null, message.getBytes());
        }
        System.out.println("发送消息");
        //5关闭通道和连接
        channel.close();
        baseConnection.close();
    }
}

2消费者

另起一个消费者 修改队列名,绑定不同的路由键,就会根据路由键的名称去寻找队列。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package com.lx.exchange_direct;

import com.lx.util.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class DirectConsumer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1通过工具类获得连接
        Connection baseConnection = ConnectionUtil.getBaseConnection();

        String queryName = "query1";
        String exchangeName = "directExchange";
        //2获得通道
        final Channel channel = baseConnection.createChannel();

        //3创建队列
        channel.queueDeclare(queryName, false, false, false, null);

        //绑定到交换机 参数1:队列名 参数2:交换器名 参数3:路由键
        channel.queueBind(queryName, exchangeName, "red");
        channel.queueBind(queryName, exchangeName, "grey");
        //一次只能接收一条
        channel.basicQos(1);
        //4接收消息
        //参数:1队列名称 2是否自动确认
        //自动确认值得是,只要接收到消息就可以了。 不自动确认需要手动应答
        channel.basicConsume(queryName, false, new DefaultConsumer(channel) {
            //定义队列消费者
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("body:" + new String(body));
                try {
                    TimeUnit.MILLISECONDS.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                //手动应答  第二个参数:确认所有消息
                channel.basicAck(envelope.getDeliveryTag(), false);
            }

        });
    }
}

5主题模式-topic

image-20200807152043138

1
2
3
4
5
6
7
8
路由key进行模糊匹配 
符号“#”表示匹配一个或多个词
符号*表示匹配一个词
  
//举个例子
  provider提供者发送 路由键为xiangyang.name.lusenlin
  接收1 xiangyang.* 无法接收 因为它只匹配一个
  接收2 xiangyang.# 可以接收 因为它匹配多个

image-20200807161727985

1提供方provider

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package com.lx.exchange_topic;

import com.lx.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class TopicProvider {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1通过工具类获得连接
        Connection baseConnection = ConnectionUtil.getBaseConnection();

        String exchangeName = "topicExchange";

        //2获得通道
        Channel channel = baseConnection.createChannel();
//3声明交互器 类型为topic
        channel.exchangeDeclare(exchangeName, "topic");

        //4发布消息
        for (int i = 0; i < 10; i++) {
            String message = "I am a programmer" + i;
            String routerKey;
            if(i%2==0){
                routerKey="name.da.a1.a2";
                message="red"+i;
            }else {
                routerKey="name.grey.red";
                message="grey.red"+i;
            }
            //第一个参数是交换器名,第二个是路由键
            channel.basicPublish(exchangeName, routerKey, null, message.getBytes());
        }
        System.out.println("发送消息");
        //5关闭通道和连接
        channel.close();
        baseConnection.close();
    }
}

2消费方consumer

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
package com.lx.exchange_topic;

import com.lx.util.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TopicConsumer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1通过工具类获得连接
        Connection baseConnection = ConnectionUtil.getBaseConnection();

        String queryName = "query1";
        String exchangeName = "topicExchange";
        //2获得通道
        final Channel channel = baseConnection.createChannel();

        //3创建队列   5个参数详解
        //name:队列名字
        // durable 是否持久化(类似快照)
        //exclusive 是否排他 作用:1connection.close()该队列是否会自动删除 2如果是排外的,会对当前队列加锁,其他通道channel是不能访问的
        //autoDelete自动删除队列 当最后一个消费者断开后,删除
        //其它相关参数
        channel.queueDeclare(queryName, false, false, false, null);

        //绑定到交换机 参数1:队列名 参数2:交换器名 参数3:路由键
        channel.queueBind(queryName, exchangeName, "name.*");
        //channel.queueBind(queryName, exchangeName, "*.add");
        //一次只能接收一条
        channel.basicQos(1);
        //4接收消息
        //参数:1队列名称 2是否自动确认
        //自动确认值得是,只要接收到消息就可以了。 不自动确认需要手动应答
        channel.basicConsume(queryName, false, new DefaultConsumer(channel) {
            //定义队列消费者
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("body:" + new String(body));
                try {
                    TimeUnit.MILLISECONDS.sleep(300);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                //手动应答
                channel.basicAck(envelope.getDeliveryTag(), false);
            }

        });
    }
}

第三章-整合springboot

1简单模式

1加入jar

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <artifactId>spring-boot-parent</artifactId>
        <groupId>org.springframework.boot</groupId>
        <version>2.2.4.RELEASE</version>
    </parent>

    <groupId>org.example</groupId>
    <artifactId>rabbit_springboot</artifactId>
    <version>1.0-SNAPSHOT</version>
<dependencies>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
</dependencies>

</project>

2yml配置文件

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
server:
  port: 8420

spring:
  application:
    name: springboot_rabbit

  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest

3业务类–生产者:

启动类,不需要过多配置。


 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
package com.lsl;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class Producer {
    @Autowired
    private AmqpTemplate rabbitTemplate;

    @RequestMapping("send")
    public String send() {
        String context = "producer hello ";
        this.rabbitTemplate.convertAndSend("simpleQueue", context);
        return "success";
    }

}

4业务–消费者,另外创建一个项目,加jar、yml

在消费者中需要注意的是,要在启动类进行配置。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package com.lx;

import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class RabbitSpringBootConsumerApp {
    @Bean
    public Queue queue() {
        return new Queue("lanxin_queue");
    }

    @Bean
    public Queue queue1() {
        return new Queue("simpleQueue");
    }

    @Bean
    public RabbitListenerContainerFactory<?> simpleListenerFactory(ConnectionFactory connectionFactory){
        //创建rabbit监听容器工厂 对 rabbit 进行配置
        SimpleRabbitListenerContainerFactory containerFactory=
                new SimpleRabbitListenerContainerFactory();

        containerFactory.setConnectionFactory(connectionFactory);
        //手动ack 手动应答
        containerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        return containerFactory;
    }
    public static void main(String[] args) {
        SpringApplication.run(RabbitSpringBootConsumerApp.class,args);
    }

}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
package com.lx.simpler1;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class Consumer {

    @RabbitListener(queues = "simpleQueue")
    public void simpleConsumer(String msg){
        System.out.println("consumer1接收到:"+msg);
    }
}

2工作模式

1启动类

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package com.lx;

import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class RabbitSpringBootConsumerApp {
    @Bean
    public Queue queue() {
        return new Queue("lanxin_queue");
    }

    @Bean
    public Queue queue1() {
        return new Queue("simpleQueue");
    }

    @Bean
    public RabbitListenerContainerFactory<?> simpleListenerFactory(ConnectionFactory connectionFactory){
        //创建rabbit监听容器工厂 对 rabbit 进行配置
        SimpleRabbitListenerContainerFactory containerFactory=
                new SimpleRabbitListenerContainerFactory();

        containerFactory.setConnectionFactory(connectionFactory);
        //手动ack 手动应答
        containerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);

        //设置能者多劳(一次只能处理一个),覆盖轮询模式
        containerFactory.setPrefetchCount(1);

        return containerFactory;
    }
    public static void main(String[] args) {
        SpringApplication.run(RabbitSpringBootConsumerApp.class,args);
    }

}

2业务类

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package com.lx.simpler1;


import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.TimeUnit;

@Component
public class Consumer {

    @RabbitListener(queues = "simpleQueue",containerFactory = "simpleListenerFactory")
    public void simpleConsumer(String msg,
                               @Headers Map<String, Object> headers, Channel channel) throws IOException {

        try {
            System.out.println("consumer1接收到:"+msg);
            //消息确认,(deliveryTag,multiple是否确认所有消息)
            channel.basicAck(Long.parseLong(String.valueOf(headers.get(AmqpHeaders.DELIVERY_TAG))),false);
        } catch (Exception e) {
            //消息拒绝(deliveryTag,multiple,requeue拒绝后是否重新回到队列)
            channel.basicNack(Long.parseLong(String.valueOf(headers.get(AmqpHeaders.DELIVERY_TAG))), false, false);
        }
    }

    @RabbitListener(queues = "simpleQueue",containerFactory = "simpleListenerFactory")
    public void simpleConsumer1(String msg,
                               @Headers Map<String, Object> headers, Channel channel) throws IOException {

        try {
            System.out.println("consumer2接收到:"+msg);
            TimeUnit.MILLISECONDS.sleep(2000);

            //消息确认,(deliveryTag,multiple是否确认所有消息)
            channel.basicAck(Long.parseLong(String.valueOf(headers.get(AmqpHeaders.DELIVERY_TAG))),false);
        } catch (Exception e) {
            //消息拒绝(deliveryTag,multiple,requeue拒绝后是否重新回到队列)
            channel.basicNack(Long.parseLong(String.valueOf(headers.get(AmqpHeaders.DELIVERY_TAG))), false, false);
        }
    }

}

3fanout模式

image-20200807113443933

生产者

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package com.lsl;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class Producer {
    @Autowired
    private AmqpTemplate rabbitTemplate;

    @RequestMapping("fanoutSend1")
    public String send1() {
        String context = "producer  fanoutSend1 ";
        String exchangeName="fanoutExchange";
        //两个参数为 队列名 消息
        //三个参数为 交换器名,路由键名,消息
        this.rabbitTemplate.convertAndSend(exchangeName,"", context);
        return "success";
    }

}

消费者

1启动类

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
package com.lx;

import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class FanoutConsumerApp {
    @Bean
    public Queue queue1() {
        return new Queue("fanoutQueue1");
    }

    @Bean
    public Queue queue2() {
        return new Queue("fanoutQueue2");
    }

    //声明一个fanout的交换机
    @Bean
    public FanoutExchange exchange(){
        return new FanoutExchange("fanoutExchange");
    }

    //把队列绑定到交换机
    @Bean
    public Binding bindingQueue1(){
        return BindingBuilder.bind(queue1()).to(exchange());
    }
    @Bean
    public Binding bindingQueue2(){
        return BindingBuilder.bind(queue2()).to(exchange());
    }

    @Bean
    public RabbitListenerContainerFactory<?> simpleListenerFactory(ConnectionFactory connectionFactory){
        //创建rabbit监听容器工厂 对 rabbit 进行配置
        SimpleRabbitListenerContainerFactory containerFactory=
                new SimpleRabbitListenerContainerFactory();

        containerFactory.setConnectionFactory(connectionFactory);
        //手动ack 手动应答
        containerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);

        //设置能者多劳(一次只能处理一个),覆盖轮询模式
        containerFactory.setPrefetchCount(1);

        return containerFactory;
    }
    public static void main(String[] args) {
        SpringApplication.run(FanoutConsumerApp.class,args);
    }

}

2业务类:消费

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package com.lx.fanout;


import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.Map;

@Component
public class Consumer {

    @RabbitListener(queues = "fanoutQueue1",containerFactory = "simpleListenerFactory")
    public void simpleConsumer(String msg,
                               @Headers Map<String, Object> headers, Channel channel) throws IOException {

        try {
            System.out.println("fanout consumer1接收到:"+msg);
            //消息确认,(deliveryTag,multiple是否确认所有消息)
            channel.basicAck(Long.parseLong(String.valueOf(headers.get(AmqpHeaders.DELIVERY_TAG))),false);
        } catch (Exception e) {
            //消息拒绝(deliveryTag,multiple,requeue拒绝后是否重新回到队列)
            channel.basicNack(Long.parseLong(String.valueOf(headers.get(AmqpHeaders.DELIVERY_TAG))), false, false);
        }
    }

    @RabbitListener(queues = "fanoutQueue2",containerFactory = "simpleListenerFactory")
    public void simpleConsumer1(String msg,
                               @Headers Map<String, Object> headers, Channel channel) throws IOException {

        try {
            System.out.println("fanout consumer2接收到:"+msg);

            //消息确认,(deliveryTag,multiple是否确认所有消息)
            channel.basicAck(Long.parseLong(String.valueOf(headers.get(AmqpHeaders.DELIVERY_TAG))),false);
        } catch (Exception e) {
            System.out.println("出现了错误 consumer2");
            //消息拒绝(deliveryTag,multiple,requeue拒绝后是否重新回到队列)
            channel.basicNack(Long.parseLong(String.valueOf(headers.get(AmqpHeaders.DELIVERY_TAG))), false, false);
        }
    }

}

4direct模式

这里使用的新方法,三种交换器模式都适用。

image-20200807132047005

生产者

路由键不同,xiangtan、moyang是一个队列 xiangyang是一个队列

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package com.lsl;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class DirectProducer {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    @RequestMapping("directSend1")
    public String send3() {
        String context = "producer  directSend1 ";
        String exchangeName="directExchange";
        //两个参数为 队列名 消息
        //三个参数为 交换器名,路由键名,消息
        this.rabbitTemplate.convertAndSend(exchangeName,"xiangyang", context);
        return "success";
    }
    @RequestMapping("directSend2")
    public String send4() {
        String context = "producer  directSend2 ";
        String exchangeName="directExchange";
        //两个参数为 队列名 消息
        //三个参数为 交换器名,路由键名,消息
        this.rabbitTemplate.convertAndSend(exchangeName,"moyang", context);
        return "success";
    }
    @RequestMapping("directSend3")
    public String send5() {
        String context = "producer  directSend3 ";
        String exchangeName="directExchange";
        //两个参数为 队列名 消息
        //三个参数为 交换器名,路由键名,消息
        this.rabbitTemplate.convertAndSend(exchangeName,"xiangtan", context);
        return "success";
    }
}

消费者

1业务

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
package com.lx.direct;


import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.Map;

@Component
public class Consumer {

    @RabbitListener(containerFactory = "simpleListenerFactory"
    ,bindings = {
            @QueueBinding(value = @Queue(value = "hubei"),
                    exchange = @Exchange(value = "directExchange", type = "direct"),
                    key = {"xiangyang"}
            )
    }
    )
    public void simpleConsumer(String msg,
                               @Headers Map<String, Object> headers, Channel channel) throws IOException {

        try {
            System.out.println("direct consumer1 湖北 接收到:"+msg);
            //消息确认,(deliveryTag,multiple是否确认所有消息)
            channel.basicAck(Long.parseLong(String.valueOf(headers.get(AmqpHeaders.DELIVERY_TAG))),false);
        } catch (Exception e) {
            //消息拒绝(deliveryTag,multiple,requeue拒绝后是否重新回到队列)
            channel.basicNack(Long.parseLong(String.valueOf(headers.get(AmqpHeaders.DELIVERY_TAG))), false, false);
        }
    }
    

    @RabbitListener(containerFactory = "simpleListenerFactory"
            ,bindings = {
            @QueueBinding(value = @Queue(value = "hunan"),
                    exchange = @Exchange(value = "directExchange", type = "direct"),
                    key = {"xiangtan","moyang"}
            )
    }
    )
    public void simpleConsumer1(String msg,
                               @Headers Map<String, Object> headers,
                                Channel channel) throws IOException {

        try {
            System.out.println("direct consumer1 湖南 接收到:"+msg);
            //消息确认,(deliveryTag,multiple是否确认所有消息)
            channel.basicAck(Long.parseLong(String.valueOf(headers.get(AmqpHeaders.DELIVERY_TAG))),false);
        } catch (Exception e) {
            //消息拒绝(deliveryTag,multiple,requeue拒绝后是否重新回到队列)
            channel.basicNack(Long.parseLong(String.valueOf(headers.get(AmqpHeaders.DELIVERY_TAG))), false, false);
        }
    }


}

2启动类

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package com.lx;

import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class DirectConsumerApp {

    @Bean
    public RabbitListenerContainerFactory<?> simpleListenerFactory(ConnectionFactory connectionFactory){
        //创建rabbit监听容器工厂 对 rabbit 进行配置
        SimpleRabbitListenerContainerFactory containerFactory=
                new SimpleRabbitListenerContainerFactory();

        containerFactory.setConnectionFactory(connectionFactory);
        //手动ack 手动应答
        containerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);

        //设置能者多劳(一次只能处理一个),覆盖轮询模式
        containerFactory.setPrefetchCount(1);

        return containerFactory;
    }
    public static void main(String[] args) {
        SpringApplication.run(DirectConsumerApp.class,args);
    }

}

5topic模式

image-20200807152043138

1
2
3
4
5
6
7
8
路由key进行模糊匹配 
符号“#”表示匹配一个或多个词
符号*表示匹配一个词
  
//举个例子
  provider提供者发送 路由键为xiangyang.name.lusenlin
  接收1 xiangyang.* 无法接收 因为它只匹配一个
  接收2 xiangyang.# 可以接收 因为它匹配多个

只需要在direct模式中修改发送方,和消费方的type、key就好。

生产者

1
2
3
4
5
6
7
8
9
@RequestMapping("topicSend1")
public String send3() {
    String context = "producer  topicSend1 ";
    String exchangeName="directExchange";
    //两个参数为 队列名 消息
    //三个参数为 交换器名,路由键名,消息
    this.rabbitTemplate.convertAndSend(exchangeName,"hunan.moyang.a", context);
    return "success";
}

消费者

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
@RabbitListener(containerFactory = "simpleListenerFactory"
        ,bindings = {
        @QueueBinding(value = @Queue(value = "hunan"),
                exchange = @Exchange(value = "directExchange", type = "topic"),
                key = {"hunan.#"}
        )
}
)
public void simpleConsumer1(String msg,
                           @Headers Map<String, Object> headers,
                            Channel channel) throws IOException {

    try {
        System.out.println("direct consumer1 湖南 接收到:"+msg);
        //消息确认,(deliveryTag,multiple是否确认所有消息)
        channel.basicAck(Long.parseLong(String.valueOf(headers.get(AmqpHeaders.DELIVERY_TAG))),false);
    } catch (Exception e) {
        //消息拒绝(deliveryTag,multiple,requeue拒绝后是否重新回到队列)
        channel.basicNack(Long.parseLong(String.valueOf(headers.get(AmqpHeaders.DELIVERY_TAG))), false, false);
    }
}

注意

启动类需要定义队列,设置等等。

当队列信息在生产者中不存在时候,队列还不存在,生产的消息就会丢失。 若队列信息在消费者存在,生产者此时即使没有队列信息,但队列会在消费者创建时创建。

第四章-高级应用

消息确认

生产者确认

简单模式:Channel信道的confirmSelect方法将当前信道设置成了confirm模式

springboot的消息确认

1yml设置

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
server:
  port: 8420

spring:
  application:
    name: springboot_rabbit

  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    #confirm确认类型 correlated相关联的
    publisher-confirm-type: correlated
    #return确认
    publisher-returns: true

    #设置为 true 后 消费者在消息没有被路由到合适队列情况下会被return监听,false会自动删除
    template:
      mandatory: true

如果消息没有到exchange,则confirm回调,ack=false

如果消息到达exchange,则confirm回调,ack=true

exchange到queue成功,则不回调return

exchange到queue失败,则回调return(需设置mandatory=true,否则不回回调,消息就丢了)

2业务消费

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
package com.lsl;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class ConfirmProducer  {

    //@Autowired
    private RabbitTemplate rabbitTemplate1;
    @Autowired
    public ConfirmProducer(RabbitTemplate rabbitTemplate1){
        this.rabbitTemplate1=rabbitTemplate1;
        this.rabbitTemplate1.setReturnCallback(returnCallback);
        this.rabbitTemplate1.setConfirmCallback(confirmCallback);

        this.rabbitTemplate1.setMandatory(true); //设置为 true 后 消费者在消息没有被路由到合适队列情况下会被return监听,而不会自动删除
    }

    final private RabbitTemplate.ConfirmCallback confirmCallback=new RabbitTemplate.ConfirmCallback() {
        public void confirm(CorrelationData correlationData, boolean ack, String s) {
            System.out.println("消息确认confirm: " +(correlationData==null?"回调数据为空":correlationData.getId())  + ",消息为=" + s + ",是否确认:" + ack);
        }
    };

    final private RabbitTemplate.ReturnCallback returnCallback=new RabbitTemplate.ReturnCallback() {
        public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
            System.err.println("return exchange: " + exchange + ", routingKey: "
                    + routingKey + ", replyCode: " + replyCode + ", replyText: " + replyText);
        }
    };

    @RequestMapping("confirmSend")
    public String send() {
        String context = "producer hello ";

        String exchangeName="directExchange";
        //this.rabbitTemplate1.setMandatory(true); //设置为 true 后 消费者在消息没有被路由到合适队列情况下会被return监听,而不会自动删除
        //设置 确认的回调函数对象
        this.rabbitTemplate1.setConfirmCallback(confirmCallback);

        //设置 return的回调函数
        this.rabbitTemplate1.setReturnCallback(returnCallback);

        for (int i = 1; i <= 1; i++) {

            //this.rabbitTemplate1.convertAndSend("simpleQueue", (Object) context,new CorrelationData(UUID.randomUUID().toString()));
            this.rabbitTemplate1.convertAndSend(exchangeName,"hubei.a",context);
        }
        return "confirmSend success";
    }

    @RequestMapping("confirmSend1")
    public String send1() {
        String context = "producer hello ";
        String exchangeName="directExchangesad";


        for (int i = 1; i <= 1; i++) {
				  this.rabbitTemplate1.convertAndSend(exchangeName,"hubei.a",context);
        }
        return "交换器错误confirmSend1 success";
    }

    @RequestMapping("confirmSend2")
    public String send2() {
        String context = "producer hello ";
        String exchangeName="directExchange";
        for (int i = 1; i <= 1; i++) {

            //this.rabbitTemplate1.convertAndSend("simpleQueue", (Object) context,new CorrelationData(UUID.randomUUID().toString()));
            this.rabbitTemplate1.convertAndSend(exchangeName,"adasdhubei.a",context);
        }
        return "队列错误 confirmSend2 success";
    }

}