介绍: MQ全称为Message Queue ,即消息队列, RabbitMQ是 由erlang语言开发,基于AMQP ( Advanced
Message Queue高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在分布
式系统开发中应用非常广泛。
深入RabbitMQ
(1)应用场景
- 任务异步处理: 将不需要同步处理的并且耗时长的操作由消息队列通知消息接收方进行异步处理。提高了应用程序的响应时间。
- 程序解耦合: MQ相当于一个中介,生产方通过MQ与消费方交互,它将应用程序进行解耦合。
(2)基本构成
- Broker : 消息队列服务进程,此进程包括两个部分: Exchange和Queue。
- Exchange : 消息队列交换机,按一定的规则将消息路由转发到某个队列,并对消息进行过虑。
- Queue : 消息队列,存储消息的队列,消息到达队列并转发给指定的消费方。
- Producer : 消息生产者, 即生产方客户端,产方客户端将消费发送到MQ。
- Consumer : 消息消费者,即消费方客户端,接收MQ转发在队列Queue中的消息。
发送消息:
1、生产者和Broker建立TCP连接。
2、生产者和Broker建立通道。
3、生产者通过通道消息发送Broker ,由Exchange将消息进行转发。
4、Exchange将消息转发到指定的Queue (队列)。
接收消息:
1、消费者和Broker建立TCP连接。
2、消费者和Broker建立通道。
3、消费者监听指定的Queue(队列)。
4、当有消息到达Queue时Broker默认将消息推送给消费者。
5、消费者接收到消息。
(3)docker安装Rabbit
docker pull rabbitmq:management //拉取rabbit镜像,management是自带管理界面的
容器需要有以下映射端口:
- 4369 :erlang发现口
- 5671,5672 :client端通信口
- 15672:管理界面UI端口
- 15671:UI监听端口
- 25672 :server间内部通信口
docker run -di --name=tensquare_rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 15671:15671 -p 15672:15672 -p 25672:25672 rabbitmq:management //生成容器并启动
访问端口UI管理界面:http://192.168.0.104:15672/
我的ip地址192.168.0.104,默认登陆账号:guest,密码:guest
2、基础操作rabbitMq
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("60.205.231.39");
factory.setPort(5672);
connection = factory.newConnection();
获取连接
(1)生产者消费
生产者生产信息
Channel channel = connection.createChannel(); //生成管道
channel.queueDeclare("rabbittext",true,false,false,null);
生成一个队列
1、QUEUE: 队列的名称。
2、odurable: 消息是否持久化,true持久化会保存到我们的磁盘,保证服务器重启没有被消费的信息不会丢失。
3、exclusive: 如果一个队列声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除。
4、autoDelete: 是否自动删除。
5、args: 相关参数。
channel.basicPublish("", "rabbittext", null, ("我爱你"+i).getBytes()); //往队列中添加消息
1、exchange: 交换机名称
2、routingKey: 消息队列的名称
3、BasicProperties: 配置信息
4、body: 有发送的信息的字节数组
此时我们就可以实现往队列中添加数据了。但是消息是否添加到了队列中,我们不能得到任何信息。
生产者消息确认机制
对信管(channel)设置事务,三步走:
1、channel.txSelect()声明事务
2、channel.txComment()提交事务
3、channel.txRollback()回滚事务
我们可以进行测试这种事非常消耗时间的,所以我们推荐使用==confirm==来实现消息的确定。
- 事务保证消息的确定和confirm来保证我们只能同时使用一个。
我们使用管道监听来实现。
channel.confirmSelect(); //开启confirm去顶消息机制
for(int i=1;i<=10;i++) {
channel.basicPublish("", "rabbittext", null, ("我爱你"+i).getBytes()); //生成一个生产者
}
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long l, boolean b) throws IOException {
System.out.println(l+"消息处理成功!");
}
@Override
public void handleNack(long l, boolean b) throws IOException {
System.out.println(l+"消息处理失败!");
}
});
监听器是异步的,并且是批量返回结果。
4消息处理成功!
3消息处理成功!
10消息处理成功!
(2)消费者
消费者接收信息
channel.basicConsume("rabbittext",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println("接收到一个消息: " + message);
}
});
channel.basicConsume的参数介绍:
**1、queue:**队列的名称
**2、autoAck:**是否自动消息确认
**3、callback:**消息者消费信息
多劳多得
我们设置两个消费者,我们会发现两个生产者是平均的消费这些信息。在真实的开发中我们并不想这样,我们期望的模型是多劳多得。
1、设置消费者消费确定后过后才进行推送
channel.basicQos(1);
2、消息我们关闭自动我确认,false
channel.basicConsume("rabbittext",false,new DefaultConsumer(channel)...)
3、消息接收后我们手动确认,第二个参数表示是一次确定一个
channel.basicAck(envelope.getDeliveryTag(),false);
削峰限流
我们设置队列我们处理不完的信息,进行抛弃。防止过多的请求对系统造成压力。
//设置队列的初始化参数
Map<String, Object> map = new HashMap<String, Object>();
//指定消息队列的长度
map.put("x-max-length",5);
//当队列满时,多余的消息直接拒绝接收,多余的消息被丢弃
map.put("x-overflow", "reject-publish");
channel.queueDeclare("rabbit",true,false,false,map);//· queue :队列名称
此时我们就能实现削峰限流。处理不了的请求就行对其进行抛弃。
(4)springboot整合Rabbit
1.创建消息队列:
2.案例前配置:
//子工程引入springboot和rabbit整合包
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
application.yml配置:
spring:
rabbitmq:
host: 192.168.0.104
3.Rabbit直接模式(Direct):
我们需要将消息发给唯一一个节点时使用这种模式,这是最简单的一种形式。
1.使用的rabbit默认的交换机Exchange(为空字符串)。
2.不需要进行任何的交换机和队列绑定(binding)操作 。
3.消息传递时需要一个“RouteKey”,可以简单的理解为要发送到的队列名字。
1)配置生产者
//生产者controller层
@RestController
@RequestMapping("/send")
public class productController {
@Autowired
private RabbitTemplate rabbitTemplate;
@RequestMapping("/message/{msg}")
public String sendmessge(@PathVariable String msg){
rabbitTemplate.convertAndSend("dl",msg); //发送至dl队列
return "发送成功";
}
}
浏览器命中http://localhost:8080/send/message/123
发送消息***123***到***dl***队列中,返回发送成功,我们可以看到rabbitUI界面显示:
2)配置消费者
生产者和消费者是两个不同的项目
@Component
@RabbitListener(queues = "dl") //监听dl队列
public class oneConsumer {
@RabbitHandler
public void getMessage(String message){
System.out.println(message);
}
}
我们两个项目都启动起来,命中http://localhost:8080/send/message/666
看消费者的命令窗口:
4.rabbit分裂模式(Fanout)
当我们需要将消息一次发给多个队列时,需要使用这种模式。任何发送到Exchange的消息都会被转发到与该Exchange绑定(Binding)的所有Queue上。
1.这种模式不需要RouteKey
2.这种模式需要提前将Exchange与Queue进行绑定,一个Exchange可以绑定多个Queue,一个Queue可以同多个Exchange进行绑定。
3.如果接受到消息的Exchange没有与任何Queue绑定,则消息会被抛弃。
添加交换机:
点击交换机exchange,进行绑定队列:
代码实现:
生产者修改:
@RequestMapping("/fanoutmessage/{msg}")
public String sendmessges(@PathVariable String msg){
rabbitTemplate.convertAndSend("dlexchange","",msg);
return "发送成功";
}
命中http://localhost:8080/send/fanoutmessage/666我们得到结果:
5.rabbit主题模式(Topic)
任何发送到TopicExchange的消息都会被转发到所有关心RouteKey中指定话题的Queue 上
1.这种模式较为复杂,简单来说,就是每个队列都有其关心的主题,所有的消息都带有一个“标题”(RouteKey),Exchange会将消息转发到所有关注主题能与RouteKey模糊匹配的队列。
2.这种模式需要RouteKey,也许要提前绑定Exchange与Queue。
3.在进行绑定时,要提供一个该队列关心的主题,如“#.log.#”表示该队列关心所有涉及log的消息(一个RouteKey为”MQ.log.error”的消息会被转发到该队列)。
4.“#”表示0个或若干个关键字,""表示一个关键字。如“log.”能与“log.warn”匹配,无法与“log.warn.timeout”匹配;但是“log.#”能与上述两者匹配。
5.同样,如果Exchange没有发现能够与RouteKey匹配的Queue,则会抛弃此消息
我们绑定了3个队列:
@RequestMapping("/topicmessage/{msg}")
public String sendmessgess(@PathVariable String msg){
System.out.println(msg);
rabbitTemplate.convertAndSend("topicexchange","dl",msg);//主题模式
return "主题模式发送成功";
}
命中http://localhost:8080/send/topicmessage/666
可以不断地更改convertAndSend()方法的参数,让符合表达式的消费者接收到信息。
生产者发来了信息,我是一号:666
生产者发来了信息,我是二号:666
案例代码地址:案例代码地址