MQ的基本概念
MQ概述
全称Message Queue (消息队列),是在消息的传输过程中保存消息的容器。多用于分布式系统之间进行通信。
- MQ,消息队列,储存消息的中间件
- 分布式系统通信的两种方式:直接 远程调用 和 借助第三方完成间接通信
- 发送方称为生产者,接收方称为消费者
MQ的优势和劣势
优势
应用解耦
使用MQ使得应用间解耦,提升容错性和可维护性
异步提速
提升用户体验和系统吞吐量(单位时间内处理请求的数目)
削峰填谷
劣势
- 可用性降低
- 复杂度提高
- 一致性
使用条件
- 生产者不需要从消费者处获得反馈。引入消息队列之前的直接调用,其接口的返回值应该为空。
- 容许短暂的不一致性
- 确实起到了效果。收益大于管理。
RabbitMQ简介
AMQP,高级消息队列,是一个网络协议,是应用层协议的一个开放标准,为面向消息的中间键设计。
相关概念
快速入门
生产者
//创建连接工厂
ConnectionFactory factory() = new ConnectionFactory;
//设置参数
factory.setHost("xxx");
factory.setPort(5672);
factory.setVirtualHost("/xxx");//虚拟机默认值"/"
factory.setUsername("xxx");
factory.setPassword("xxx");
//创建连接
Connection connection = factory.newConnection();
//创建Channel
Channel channel = connection.createChannel();
//创建队列
queueDeclare();
/*
参数:
1.queue 队列名称
2.durable 是否持久化
3.exclusive:
是否独占.只有一个消费者能够监听
当connection关闭时,是否删除队列
4.autoDelete 是否自动删除,当没有Consumer时自动删除
5.arguments 删除参数
*/
//如果有相同名字的则不创建
channel.queueDeclare("Hello_world",true,false,false,null);
//发送消息
/*
basicPublish
参数:
1.exchange 交换机名称 简单默认自动
2.routingKey 路由名称
3.props 配置信息
4.body 发送消息数据
*/
String body = "hello~";
channel.basicPublish("","Hello_world",null,body.getBytes());
//释放资源
channel.close();
connection.close();
消费者
//创建连接工厂
ConnectionFactory factory() = new ConnectionFactory;
//设置参数
factory.setHost("xxx");
factory.setPort(5672);
factory.setVirtualHost("/xxx");//虚拟机默认值"/"
factory.setUsername("xxx");
factory.setPassword("xxx");
//创建连接
Connection connection = factory.newConnection();
//创建Channel
Channel channel = connection.createChannel();
//创建队列
queueDeclare();
/*
参数:
1.queue 队列名称
2.durable 是否持久化
3.exclusive:
是否独占.只有一个消费者能够监听
当connection关闭时,是否删除队列
4.autoDelete 是否自动删除,当没有Consumer时自动删除
5.arguments 删除参数
*/
//如果有相同名字的则不创建
channel.queueDeclare("Hello_world",true,false,false,null);
//接收消息
/*
basicComsume
参数:
1.queue 队列名称
2.autoAck 是否自动确认
3.callback 回调对象
*/
Consumer consumer = new DefaultConsumer(channel){
回调方法,收到消息后,自动执行
...
}
//不要关闭
RabbitMQ工作模式
Work queues工作队列模式
模式说明
- 在一个队列中如果有多个消费者,那么消费者之间对于同一个消息的关系是竞争关系
- Workqueues对于任务过重或任务较多情况使用工作队列可以提高任务处理速度
Pub/Sub订阅模式
模式说明
生产者
//创建连接工厂
ConnectionFactory factory() = new ConnectionFactory;
//设置参数
factory.setHost("xxx");
factory.setPort(5672);
factory.setVirtualHost("/xxx");//虚拟机默认值"/"
factory.setUsername("xxx");
factory.setPassword("xxx");
//创建连接
Connection connection = factory.newConnection();
//创建Channel
Channel channel = connection.createChannel();
/*
参数
1.exchange 交换机名称
2.type 交换机类型
DIRECT 定向
FANOUT 扇形
TOPIC 通配符
HEADERS 参数匹配
3.durable 是否持久化
4.autoDelete 自动删除
5.internal 内部使用
6.argumrnts 参数
*/
//创建交换机
String exchangesName = "test_fanout";
channel.exchangeDeclare(exchangesName...);
//创建队列
channel.queueDeclare(...);
//绑定队列和交换机
channel.queueBind("队列名称1","交换机名称","路由键");
channel.queueBind("队列名称2","交换机名称","路由键");
//发送消息
channel.basicPublish(exchangeName,"",null,body.getbytes());
//释放资源
channel.close();
connection.close();
消费者
//创建连接工厂
ConnectionFactory factory() = new ConnectionFactory;
//设置参数
factory.setHost("xxx");
factory.setPort(5672);
factory.setVirtualHost("/xxx");//虚拟机默认值"/"
factory.setUsername("xxx");
factory.setPassword("xxx");
//创建连接
Connection connection = factory.newConnection();
//创建Channel
Channel channel = connection.createChannel();
/*
参数
1.exchange 交换机名称
2.type 交换机类型
DIRECT 定向
FANOUT 扇形
TOPIC 通配符
HEADERS 参数匹配
3.durable 是否持久化
4.autoDelete 自动删除
5.internal 内部使用
6.argumrnts 参数
*/
//创建交换机
String exchangesName = "test_fanout";
channel.exchangeDeclare(exchangesName...);
//创建队列
channel.queueDeclare(...);
//绑定队列和交换机
channel.queueBind("队列名称1","交换机名称","路由键");
channel.queueBind("队列名称2","交换机名称","路由键");
//发送消息
channel.basicPublish(exchangeName,"",null,body.getbytes());
//释放资源
channel.close();
connection.close();
Routing路由模式
模式说明
Routing 模式要求队列在绑定交换机时要指定routing key,消息会自动传发到符合的队列
Topics通配符模式
路径变成通配符
SpringBoot工整合RabbitMQ
生产者
引入依赖
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>xxx</version> </parent> <dependencier> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> </dependencier>
RabbitMQConfig.java
@Configuration public class RabbitMQConfig { public static final String EXCHANGE_NAME = "boot_topic_exchange"; public static final String QUEUE_NAME = "boot_queue"; //交换机 @Bean("bootExchange") public Exchange boot Exchange(){ return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build(); } //队列 @Bean("bootQueue") public Queue bootQueue(){ return QueueBuilder.durable(QUEUE_NAME).build(); } //队列和交换机绑定 @Bean public Binding bindQueueExchange(@Qualifier("bootQueue")Queue queue,@Qualifier(bootExchange)Exchange exchange){ return BindingBuilder.bind(queue).to(exchange).with(boot.#).noargs(); } }
消费者
RabbitMQListener
@Component public class RabbitMQListener { @RabbitListener(queues = "boot_queue") public void ListenerQueue(Message message) { sout(message); } }
RabbitMQ高级
RabbitMQ高级特性
- 消息可靠性投递
- Consumer ACK
- 消费端限流
- TTL
- 死信队列
- 延迟队列
- 日志与监控
- 消息可靠性分析与追踪
- 管理
消息的可靠投递
rabbit mq整个消息投递的路径为
producer -> rabbitmq broker -> exchange -> queue -> consumer
RabbitMQ提供两种方式来控制消息的投递可靠性模式
confirm 确认模式
消息从preducer到exchange则会返回一个confirmCallback
return 返回模式
消息exchange到queue投递失败返回returnCallback
开启确认模式
- 确认模式开启:ConnectionFactory中开启publisher-confirm=“true”
- 在rabbitTemplate定义回调函数
开启回退模式
- 开启回退模式
- 设置ReturncallBack
- 设置Exchange处理消息的模式
Consumer ACK
ack指acknowledge
确认方式
- 自动确认
- 手动确认
- 根据异常情况确认
Consumer ACK机制
- 设置手动签收
- 让监听器实现ChannelAwareMessageListener接口
- 如果消息成功处理,则调用channel的basicAck()签收
- 否则拒绝签收,broker重新发送给Consumer
消费端限流
限流机制
- 确保ack机制为手动确认
- listener-contaioner配置属性
TTL
私信队列
称为死信队列
- 队列消息长度到达限制
- 消费者拒收消费消息
- 原队列存在消息过期设置,到达超时时间未被消费
延迟队列
在RabbitMQ中没有延迟队列功能
但可以用TTL+死信队列组合实现延迟队列效果
日志与监控
默认存放路径: /var/log/rabbitmq/rabbit@xx.log
rabbitmqctl管理和监控
消息追踪
另一种方式:rabbitmq_tracing
RabbitMQ应用问题
- 消息可靠性保障
- 消息幂等型处理
消息可靠性保障-消息补偿
息幂等型处理
消息可靠性保障-消息补偿
RabbitMQ集群搭建
- 高可用集群搭建
集群搭建原理
使用Haproxy
评论区
欢迎你留下宝贵的意见,昵称输入QQ号会显示QQ头像哦~