找回密码
 立即注册
  • QQ空间
  • 回复
  • 收藏

分布式架构核心组件之消息队列RabbitMQ

房凌| 2019-1-23 01:18 阅读 1245 评论 0

一、为什么使用消息队列

1、实现解耦

耦合指不同模块/系统之间相互作用、相互依赖的关系。RabbitMQ以异步的方式解耦系统间的关系,使用者将业务请求发送到Rabbit服务器,然后就可以返回了,Rabbit会确保请求被正确处理,即使遇到网络异常、Rabbit服务器崩溃等特殊场景。针对这些特殊场景,Rabbit提供了各种机制确保其可用性。

2、实现异步

对于大批量的用户操作,同步等待需要很长时间,利用MQ的发布订阅模型,异步处理响应结果可以避免用户不必要的等待。

举一个简单MQ使用场景:使用手机APP实时转账,就采用了MQ低延迟特性,将转账消息投递到MQ中,然后由消费者(对方银行)进行处理,具体到账时间以对方银行收到转账消息,处理完转账操作时间为准。

3、流量消峰

对于像电商网站,每逢店庆、双11等节日会出现瞬间访问量特别大的情况。不能因为系统支持的并发量有限而拒绝部分用户请求,通常做法是承接所有用户请求转变为消息存放到MQ中,由后端业务系统按照顺序进行处理。借助MQ低延迟高吞吐量特性,可以创建多消费者实现负载,从而实现流量消峰。

另外可以利用MQ发布订阅模型可以实现消息广播。

二、RabbitMQ工作模型

RabbitMQ中生产者发送消息、消费者接收消息都需要与Broker建立连接Connection(TCP长连接),为了减少Broker性能损耗,RabbitMQ引入了消息通道/信道Channel,Channel是在应用程序与Broker的Connection连接中创建的虚拟连接,在发送接收消息时直接操作Channel即可,不需要频繁与Broker创建连接。另外Channel还是最重要的编程接口,对交换机Exchange、队列Queue操作的很多API都是对Channel方法的封装。

RabbitMQ消息投递引用了交换机Exchange,当一个消息携带了地址(Routing Key)时,交换机会根据携带的路由关键字发送到不同队列Queue。交换机Exchange和队列Queue的对应关系需要进行事先绑定Binding。队列Queue和交换机是多对多的关系,交换机可以绑定多个队列Queue,队列Queue也可以绑定到不同交换机。

同时为了节省服务器资源,RabbitMQ引入了Virtual Host虚拟机机制,我们可以把Virtual Host虚拟机当做一个小型的RabbitMQ服务器。在同一个Broker中可以创建多个Virtual Host虚拟机,每个Virtual Host虚拟机中都能创建自己的交换机Exchange和队列Queue,以及它们的绑定关系。从而实现了硬件服务器的高效利用以及资源的隔离。

RabbitMQ中核心概念:

1、Server:又称Broker,接收客户端的连接,实现AMQP实体服务。

2、Connection:应用程序与Broker的网络连接。

3、Channel:网络信道,几乎所有的操作都在Channel中进行,包括定义Queue、定义Exchange、绑定Queue与Exchange、发布消息等。Channel是进行消息读写的通道。客户端可以建立多个Channel,每个Channel代表一个会话任务。

4、Message:服务器和应用程序之间传送的消息,由Properties和Body组成。Properties可以对消息进行修饰,比如消息的优先级、延迟等高级特性;Body就是消息体内容。

5、Virtual Host:虚拟地址,用于进行逻辑隔离,最上层的消息路由。一个Virtual Host可以有若干个Exchange和Queue,同一个Virtual host里面不能有相同的Exchange和Queue。

6、Exchange:接收消息的交换机,根据路由关键字转发消息到绑定的队列。RabbitMQ中有三种常用的交换机类型:

6.1、直连交换机direct:通过Binding关键字将Queue队列和直连交换机建立连接。队列Queue绑定直连交换机时,必须指定一个关键字。发送消息时会携带一个路由关键字,RabbitMQ会将路由关键字和绑定关键字进行一个全字符匹配。

channel.basicPublish("My_Direct_Exchange","spring","hello world");

channel.basicPublish("My_Direct_Exchange","struts","hi struts");

6.2、主题交换机topic:绑定关键字时,不需要全字符匹配,可以采用通配符。

* 代表一个单词

# 代表零个或多个单词

channel.basicPublish("My_Direct_Exchange","junior.netty","hello world");

6.3、广播交换机fanout:不再需要任何绑定关键字,交换机会将消息分发到所有队列Queue。

7、Binding:Exchange和Queue之间的虚拟连接,binding中可以包含routing key。

8、Routing key:一个路由规则,虚拟机可用它来确定如何路由一个特定消息。

9、Queue:也称为Message Queue消息队列,保存消息并将它们转发给消费者,多个消费者可以订阅同一个Queue,这时Queue中的消息会被平均分摊给多个消费者进行处理,而不是每个消费者都收到所有的消息并处理。

10、Prefetch count:如果有多个消费者同时订阅同一个Queue中的消息,Queue中的消息会被平摊给多个消费者。这时如果每个消息的处理时间不同,就有可能会导致某些消费者一直在忙,而另外一些消费者很快就处理完手头工作并一直空闲的情况。我们可以通过设置prefetchCount来限制Queue每次发送给每个消费者的消息数,比如我们设置prefetchCount=1,则Queue每次给每个消费者发送一条消息;消费者处理完这条消息后Queue会再给该消费者发送一条消息。

三、Spring Boot集成RabbitMQ

RabbitMQ安装成功后登录RabbitMQ控制台http://192.168.1.112:15672/,设置Exchange、Queue、Routing Key。Spring Boot集成RabbitMQ步骤:

1、引入相关依赖

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2、配置application.yml

## producer配置
spring:
rabbitmq:
addresses: 192.168.1.110:5672
username: wangpf
password: 111111
virtual-host: /
server:
port: 8001
servlet:
context-path: /
## consumer配置
spring:
rabbitmq:
addresses: 192.168.1.110:5672
username: guest
password: guest
virtual-host: /
listener:
simple:
concurrency: 5
acknowledge-mode: manual
max-concurrency: 10
prefetch: 1
server:
port: 8002
servlet:
context-path: /

3、实例代码

生产者RabbitSender

@Component
public class RabbitSender {
//自动注入RabbitTemplate模板类
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private BrokerMessageLogMapper brokerMessageLogMapper;
//回调函数: confirm确认
final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.err.println("correlationData: " + correlationData);
String messageId = correlationData.getId();
if(ack){
//如果confirm返回成功 则进行更新
brokerMessageLogMapper.changeBrokerMessageLogStatus(messageId, Constants.ORDER_SEND_SUCCESS, new Date());
} else {
//失败则进行具体的后续操作:重试 或者补偿等手段
System.err.println("异常处理...");
}
}
};
//发送消息方法调用: 构建自定义对象消息
public void sendOrder(Order order) throws Exception {
// 通过实现 ConfirmCallback 接口,消息发送到 Broker 后触发回调,确认消息是否到达 Broker 服务器,也就是只确认是否正确到达 Exchange 中
rabbitTemplate.setConfirmCallback(confirmCallback);
//消息唯一ID
CorrelationData correlationData = new CorrelationData(order.getMessageId());
rabbitTemplate.convertAndSend("order-exchange", "order.msg", order, correlationData);
}
}

消费者RabbitReceiver

@Component
public class RabbitReceiver {
//配置监听的哪一个队列,同时在没有queue和exchange的情况下会去创建并建立绑定关系
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "order-queue",durable = "true"),
exchange = @Exchange(name="order-exchange",durable = "true",type = "topic"),
key = "order.*"
)
)

//如果有消息过来,在消费的时候调用这个方法
@RabbitHandler
public void onOrderMessage(@Payload Order order, @Headers Map<String,Object> headers, Channel channel) throws IOException {
//消费者操作
System.out.println("---------收到消息,开始消费---------");
/**
* Delivery Tag 用来标识信道中投递的消息。RabbitMQ 推送消息给 Consumer 时,会附带一个 Delivery Tag,
* 以便 Consumer 可以在消息确认时告诉 RabbitMQ 到底是哪条消息被确认了。
* RabbitMQ 保证在每个信道中,每条消息的 Delivery Tag 从 1 开始递增。
*/
Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
/**
* multiple 取值为 false 时,表示通知 RabbitMQ 当前消息被确认
* 如果为 true,则额外将比第一个参数指定的 delivery tag 小的消息一并确认
*/
boolean multiple = false;
//ACK,确认一条消息已经被消费
channel.basicAck(deliveryTag,multiple);
}
}
文章点评