消息队列

MQ全称为Message Queue,即消息队列。“消息队列”是在消息的传输过程中保存消息的容器。它是典型的:生产者、消费者模型。生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入,这样就实现了生产者和消费者的解耦。

你为什么使用消息队列

解耦

看这么个场景。A 系统发送数据到 BCD 三个系统,通过接口调用发送。如果 E 系统也要这个数据呢?那如果 C 系统现在不需要了呢?A 系统负责人几乎崩溃……

在这个场景中,A 系统跟其它各种乱七八糟的系统严重耦合,A 系统产生一条比较关键的数据,很多系统都需要 A 系统将这个数据发送过来。A 系统要时时刻刻考虑 BCDE 四个系统如果挂了该咋办?要不要重发,要不要把消息存起来?头发都白了啊!

如果使用 MQ,A 系统产生一条数据,发送到 MQ 里面去,哪个系统需要数据自己去 MQ 里面消费。如果新系统需要数据,直接从 MQ 里消费即可;如果某个系统不需要这条数据了,就取消对 MQ 消息的消费即可。这样下来,A 系统压根儿不需要去考虑要给谁发送数据,不需要维护这个代码,也不需要考虑人家是否调用成功、失败超时等情况。

总结:通过一个 MQ,Pub/Sub 发布订阅消息这么一个模型,A 系统就跟其它系统彻底解耦了。

面试技巧:你需要去考虑一下你负责的系统中是否有类似的场景,就是一个系统或者一个模块,调用了多个系统或者模块,互相之间的调用很复杂,维护起来很麻烦。但是其实这个调用是不需要直接同步调用接口的,如果用 MQ 给它异步化解耦,也是可以的,你就需要去考虑在你的项目里,是不是可以运用这个 MQ 去进行系统的解耦。在简历中体现出来这块东西,用 MQ 作解耦。

异步

再来看一个场景,A 系统接收一个请求,需要在自己本地写库,还需要在 BCD 三个系统写库,自己本地写库要 3ms,BCD 三个系统分别写库要 300ms、450ms、200ms。最终请求总延时是 3 + 300 + 450 + 200 = 953ms,接近 1s,用户感觉搞个什么东西,慢死了慢死了。用户通过浏览器发起请求,等待个 1s,这几乎是不可接受的。

一般互联网类的企业,对于用户直接的操作,一般要求是每个请求都必须在 200 ms 以内完成,对用户几乎是无感知的。

如果使用 MQ,那么 A 系统连续发送 3 条消息到 MQ 队列中,假如耗时 5ms,A 系统从接受一个请求到返回响应给用户,总时长是 3 + 5 = 8ms,对于用户而言,其实感觉上就是点个按钮,8ms 以后就直接返回了,爽!网站做得真好,真快!

削峰

每天 0:00 到 12:00,A 系统风平浪静,每秒并发请求数量就 50 个。结果每次一到 12:00 ~ 13:00 ,每秒并发请求数量突然会暴增到 5k+ 条。但是系统是直接基于 MySQL 的,大量的请求涌入 MySQL,每秒钟对 MySQL 执行约 5k 条 SQL。

一般的 MySQL,扛到每秒 2k 个请求就差不多了,如果每秒请求到 5k 的话,可能就直接把 MySQL 给打死了,导致系统崩溃,用户也就没法再使用系统了。

但是高峰期一过,到了下午的时候,就成了低峰期,可能也就 1w 的用户同时在网站上操作,每秒中的请求数量可能也就 50 个请求,对整个系统几乎没有任何的压力。

如果使用 MQ,每秒 5k 个请求写入 MQ,A 系统每秒钟最多处理 2k 个请求,因为 MySQL 每秒钟最多处理 2k 个。A 系统从 MQ 中慢慢拉取请求,每秒钟就拉取 2k 个请求,不要超过自己每秒能处理的最大请求数量就 ok,这样下来,哪怕是高峰期的时候,A 系统也绝对不会挂掉。而 MQ 每秒钟 5k 个请求进来,就 2k 个请求出去,结果就导致在中午高峰期(1 个小时),可能有几十万甚至几百万的请求积压在 MQ 中。

这个短暂的高峰期积压是 ok 的,因为高峰期过了之后,每秒钟就 50 个请求进 MQ,但是 A 系统依然会按照每秒 2k 个请求的速度在处理。所以说,只要高峰期一过,A 系统就会快速将积压的消息给解决掉。

消息队列有什么优缺点

优点上面已经说了,就是在特殊场景下有其对应的好处解耦异步削峰

缺点有以下几个:

  • 系统可用性降低
    系统引入的外部依赖越多,越容易挂掉。本来你就是 A 系统调用 BCD 三个系统的接口就好了,人 ABCD 四个系统好好的,没啥问题,你偏加个 MQ 进来,万一 MQ 挂了咋整,MQ 一挂,整套系统崩溃的,你不就完了?如何保证消息队列的高可用,可以点击这里查看
  • 系统复杂度提高
    硬生生加个 MQ 进来,你怎么保证消息没有重复消费?怎么处理消息丢失的情况?怎么保证消息传递的顺序性?头大头大,问题一大堆,痛苦不已。
  • 一致性问题
    A 系统处理完了直接返回成功了,人都以为你这个请求就成功了;但是问题是,要是 BCD 三个系统那里,BD 两个系统写库成功了,结果 C 系统写库失败了,咋整?你这数据就不一致了。

所以消息队列实际是一种非常复杂的架构,你引入它有很多好处,但是也得针对它带来的坏处做各种额外的技术方案和架构来规避掉,做好之后,你会发现,妈呀,系统复杂度提升了一个数量级,也许是复杂了 10 倍。但是关键时刻,用,还是得用的。

如何保证消息队列的高可用?

RabbitMQ 是比较有代表性的,因为是基于主从(非分布式)做高可用性的,我们就以 RabbitMQ 为例子讲解第一种 MQ 的高可用性怎么实现。

RabbitMQ 有三种模式:单机模式、普通集群模式、镜像集群模式。

单机模式

单机模式,就是 Demo 级别的,一般就是你本地启动了玩玩儿的😄,没人生产用单机模式。

普通集群模式(无高可用性)

普通集群模式,意思就是在多台机器上启动多个 RabbitMQ 实例,每个机器启动一个。你创建的 queue,只会放在一个 RabbitMQ 实例上,但是每个实例都同步 queue 的元数据(元数据可以认为是 queue 的一些配置信息,通过元数据,可以找到 queue 所在实例)。你消费的时候,实际上如果连接到了另外一个实例,那么那个实例会从 queue 所在实例上拉取数据过来。

这种方式确实很麻烦,也不怎么好,没做到所谓的分布式,就是个普通集群。因为这导致你要么消费者每次随机连接一个实例然后拉取数据,要么固定连接那个 queue 所在实例消费数据,前者有数据拉取的开销,后者导致单实例性能瓶颈

而且如果那个放 queue 的实例宕机了,会导致接下来其他实例就无法从那个实例拉取,如果你开启了消息持久化,让 RabbitMQ 落地存储消息的话,消息不一定会丢,得等这个实例恢复了,然后才可以继续从这个 queue 拉取数据。

所以这个事儿就比较尴尬了,这就没有什么所谓的高可用性这方案主要是提高吞吐量的,就是说让集群中多个节点来服务某个 queue 的读写操作。

镜像集群模式(高可用性)

这种模式,才是所谓的 RabbitMQ 的高可用模式。跟普通集群模式不一样的是,在镜像集群模式下,你创建的 queue,无论元数据还是 queue 里的消息都会存在于多个实例上,就是说,每个 RabbitMQ 节点都有这个 queue 的一个完整镜像,包含 queue 的全部数据的意思。然后每次你写消息到 queue 的时候,都会自动把消息同步到多个实例的 queue 上。

那么如何开启这个镜像集群模式呢?其实很简单,RabbitMQ 有很好的管理控制台,就是在后台新增一个策略,这个策略是镜像集群模式的策略,指定的时候是可以要求数据同步到所有节点的,也可以要求同步到指定数量的节点,再次创建 queue 的时候,应用这个策略,就会自动将数据同步到其他的节点上去了。

这样的话,好处在于,你任何一个机器宕机了,没事儿,其它机器(节点)还包含了这个 queue 的完整数据,别的 consumer 都可以到其它节点上去消费数据。坏处在于,第一,这个性能开销也太大了吧,消息需要同步到所有机器上,导致网络带宽压力和消耗很重!第二,这么玩儿,不是分布式的,就没有扩展性可言了,如果某个 queue 负载很重,你加机器,新增的机器也包含了这个 queue 的所有数据,并没有办法线性扩展你的 queue。你想,如果这个 queue 的数据量很大,大到这个机器上的容量无法容纳了,此时该怎么办呢?

RabbitMQ

RabbitMQ是由erlang语言开发,基于AMQP(Advanced Message Queue 高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在分布式系统开发中应用非常广泛。

RabbitMQ是一种流行的消息队列软件,因此在进行RabbitMQ面试时,以下是一些可能涉及的常见问题:

RabbitMQ是什么?

RabbitMQ是一款开源、高性能、轻量级的消息队列软件,使用Erlang编写,它可以使不同应用程序之间实现异步通信。

什么是AMQP?

AMQP代表“高级消息队列协议”,是RabbitMQ使用的网络协议。它定义了消息传递的格式和规则,包括如何创建、发送和接收消息。

RabbitMQ中的exchange是什么?

Exchange是RabbitMQ用来接收消息并将其路由到相应队列的组件。它决定了哪个队列会接收到一个特定的消息,这取决于exchange类型和绑定键。

RabbitMQ中的队列有哪些属性?

队列有几个重要的属性,包括名称、持久性、排他性、自动删除和参数。队列名称是唯一标识符,持久性指当RabbitMQ服务器重新启动时,消息是否仍然存在;排他性指只有一个连接可以使用队列,自动删除指在没有消费者时,队列是否被删除。

RabbitMQ中的ack是什么?

Ack代表“确认”,是指当一个消费者成功地接收和处理一条消息时,向RabbitMQ发送的信号。这告诉RabbitMQ消息已成功处理,可以将其从队列中删除。

RabbitMQ中的消费者是如何工作的?

消费者创建一个连接到RabbitMQ服务器,并订阅一个或多个队列。当有消息可用时,消费者使用基本消费方法接收消息。消费者可以使用ack方法向RabbitMQ发送确认信号,以表明已经成功地接收并处理了消息。

RabbitMQ中的负载均衡是什么?

负载均衡是指在多个消费者之间分配任务的过程。RabbitMQ使用循环分发策略来实现负载均衡,该策略通过轮流将消息发送到不同的消费者来平均分配负载。

RabbitMQ中的流量控制是什么?

流量控制是一种机制,用于确保生产者和消费者之间的速度匹配,以避免过载。RabbitMQ中的流量控制是通过基于内存和磁盘空间的限制来实现的,以确保队列不会被过载。

RabbitMQ的工作原理

组成部分说明:

  • Broker:消息队列服务进程,此进程包括两个部分:Exchange和Queue
  • Exchange:消息队列交换机,按一定的规则将消息路由转发到某个队列,对消息进行过虑。
  • Queue:消息队列,存储消息的队列,消息到达队列并转发给指定的
  • Producer:消息生产者,即生产方客户端,生产方客户端将消息发送
  • Consumer:消息消费者,即消费方客户端,接收MQ转发的消息。

生产者发送消息流程:

1、生产者和Broker建立TCP连接。

2、生产者和Broker建立通道。

3、生产者通过通道消息发送给Broker,由Exchange将消息进行转发。

4、Exchange将消息转发到指定的Queue(队列)

消费者接收消息流程:

1、消费者和Broker建立TCP连接

2、消费者和Broker建立通道

3、消费者监听指定的Queue(队列)

4、当有消息到达Queue时Broker默认将消息推送给消费者。

5、消费者接收到消息。

6、ack回复

消息丢失的三种情况

第一种:生产者弄丢了数据。生产者将数据发送到 RabbitMQ 的时候,可能数据就在半路给搞丢了,因为网络问题啥的,都有可能。

第二种:RabbitMQ 弄丢了数据。MQ还没有持久化自己挂了
第三种:消费端弄丢了数据。刚消费到,还没处理,结果进程挂了,比如重启了。

RabbitMQ消息丢失解决方案

如果需要保证消息在整条链路中不丢失,那就需要生产端、mq自身与消费端共同去保障。

生产端:对生产的消息进行状态标记,开启confirm机制,依据mq的响应来更新消息状态,使用定时任务重新投递超时的消息,多次投递失败进行报警。

mq自身:开启持久化,并在落盘后再进行ack。如果是镜像部署模式,需要在同步到多个副本之后再进行ack。

消费端:开启手动ack模式,在业务处理完成后再进行ack,并且需要保证幂等。

通过以上的处理,理论上不存在消息丢失的情况,但是系统的吞吐量以及性能有所下降。

在实际开发中,需要考虑消息丢失的影响程度,来做出对可靠性以及性能之间的权衡。

消息丢失

消息丢失通常是由于生产者未正确发送消息或消费者未能确认处理消息而导致。为了避免这种情况,可以采取以下措施:

  • 确保生产者将消息标记为“持久性”,这样即使RabbitMQ服务器重新启动,也不会丢失这些消息。在使用Java客户端时,可以通过设置MessageProperties.PERSISTENT_TEXT_PLAIN属性来实现该功能:
1
channel.basicPublish(exchangeName, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
  • 确保消费者正确地处理并确认每个接收到的消息。如果没有确认,RabbitMQ会认为该消息未被成功处理,因此可能会重新发送该消息。可以在消费者代码中使用basicAck方法显式确认消息:
1
2
3
4
channel.basicConsume(queueName, false, (consumerTag, delivery) -> {
// 处理消息
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
});
  • 在适当的情况下,使用可靠性传输(RTP)模式,该模式提供了更强的保证,包括消息不会丢失、不会重复等。在使用Java客户端时,可以通过设置ConnectionFactory.setPublisherConfirms(true)和ConnectionFactor.setPublisherReturns(true)属性来启用该功能。

消息堆积

消息堆积通常是由于队列容量不足、消费者处理速度较慢或负载不均衡等问题导致的。为了避免这种情况,可以采取以下措施:

  • 增加队列容量,以便可以容纳更多的消息。在使用Java客户端时,可以通过设置Queue.DeclareOk构造函数的x-max-length参数来实现该功能:
1
2
3
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-max-length", 10000);
channel.queueDeclare(queueName, true, false, false, arguments);
  • 提高消费者的处理速度,以便更快地处理消息。可以增加消费者的数量,并使用负载均衡机制将任务分配给多个消费者,其中每个消费者都可以并行处理一部分任务。
  • 在消费者端实现流控制,以限制消息的发送速率,避免消息堆积。在使用Java客户端时,可以通过设置basicQos方法的prefetchCount参数来实现该功能:
1
2
int prefetchCount = 10;
channel.basicQos(prefetchCount);

消息重复

消息重复通常是由于生产者生成相同的消息id或消费者未正确记录已处理的消息等原因导致的。为了避免这种情况,可以采取以下措施:

  • 确保生产者生成唯一的消息id,并使用幂等性操作来确保消息只能被处理一次。在使用Java客户端时,可以通过设置MessageProperties.setMessageId方法来实现该功能:
1
2
3
4
String messageId = UUID.randomUUID().toString();
MessageProperties properties = new MessageProperties();
properties.setMessageId(messageId);
channel.basicPublish(exchangeName, routingKey, properties, message.getBytes());
  • 在消费者端记录

    • 使用ACK机制

      在确保消息处理成功后,发送ACK确认消息给RabbitMQ,这样RabbitMQ才会删除该消息。如果处理失败,则可以将消息重新放回队列中,让其重新处理。

    • 设置消息的唯一标识符

      为了防止重复消息,可以为每个消息设置一个唯一的ID标识符,并在消息处理之前检查该ID是否已经存在。如果存在,则说明该消息已经被处理过,可以直接忽略。

    • 增加消息过期时间

      为每个消息设置一个过期时间,如果消息在规定时间内未被处理,则将其视为失败并将其删除。这样可以防止消息被无限期地占用,从而导致其他消息得不到处理。

    • 使用幂等性处理

      幂等性处理是指同一操作可以重复执行多次,且结果相同。在消息处理中,可以使用幂等性处理来保证消息只会被处理一次。例如,给数据库插入数据时,可以先查询该数据是否已经存在,如果不存在则插入数据,如果已经存在则直接返回。

    • 避免重复消费

      在消费者端,可以使用消息的全局唯一标识符(UUID)来判断是否已经消费过该消息,避免重复消费。可以将已经消费过的消息ID存储到数据库或缓存中。

具体操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.TimeoutException;

public class RabbitMQDemo {

private static final String QUEUE_NAME = "test_queue";
private static final String EXCHANGE_NAME = "test_exchange";
private static final String ROUTING_KEY = "test_routing_key";

public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

// 声明队列和交换机
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);

// 消费者处理消息
channel.basicConsume(QUEUE_NAME, false, "test_consumer", new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String messageId = properties.getMessageId();
boolean isProcessed = processMessage(body); // 处理消息

if (isProcessed) {
// 确认消息已经处理成功
channel.basicAck(envelope.getDeliveryTag(), false);
} else {
// 将消息重新放回队列中
channel.basicNack(envelope.getDeliveryTag(), false, true);
}
}
});
}

private static boolean processMessage(byte[] body) {
String messageId = UUID.randomUUID().toString();
// 使用消息的ID作为唯一标识符
// 如果该消息已经被处理过,则直接返回false
if (isMessageProcessed(messageId)) {
return false;
}
// 处理消息
// ...
// 处理成功后将消息ID存储到数据库或缓存中
storeProcessedMessage(messageId);
return true;
}

private static boolean isMessageProcessed(String messageId) {
// 查询数据库或缓存中是否已经存在该消息ID
// 如果存在,则说明该消息已经被处理过,返回true
// 如果不存在,则说明该消息还未被处理过,返回false
return false;
}

private static void storeProcessedMessage(String messageId) {
// 将消息ID存储到数据库或缓存中
}
}

0%