消息队列是什么?

使用队列来进行各模块之间异步通信的中间件。

image-20221031150325422

消息对列有什么优点和缺点?为什么使用mq?

  • 优点:(为什么使用mq?)

    • 解耦,便于维护。

    • 削峰、整流,对流量进行控制。

    • 异步处理,提高响应速度。

  • 缺点:

    • 系统可用性降低,多了个容易出现问题的地方,如果mq挂了就麻烦,所以需要mq的高可用。
    • 系统复杂度提高
    • 一致性问题,异步的肯定会有这样的问题

如何保证消息不被重复消费?或者说,如何保证消息消费时的幂等性?

这个并不靠MQ的机制来保证,是靠开发来保证的,要看具体的业务。常见的措施有:

  • 写数据库时添加唯一索引控制重复插入。

  • 让每个消息都带有唯一标识,放到redis中判断是否已经消费过了。

  • … 可以根据具体的MQ的特性来设计方案。

ActiveMQ、RabbitMQ、RocketMQ、Kafka对比

ActiveMQ RabbitMQ RocketMQ Kafka
公司/社区 Apache Rabbit 阿里 Apache
开发语言 Java Erlang Java Scala&Java
协议支持 OpenWire,STOMP,REST,XMPP,AMQP AMQP,XMPP,SMTP,STOMP 自定义协议 自定义协议
可用性 一般
单机吞吐量 一般 非常高
消息延迟 毫秒级 微秒级 毫秒级 毫秒以内
消息可靠性 一般 一般
  • ActiveMQ,出现得比较早,已经不满足如今的业务需求,不推荐用
  • RabbitMQ,迭代稳定,社区活跃,但是是Erlang语言,很难深入研究它或做源码定制。适用技术实力较为一般的中小型公司
  • RocketMQ,阿里出品,经过阿里业务的考验,java写的,可深入研究和做定制。适用基础架构研发实力较强的大型公司
  • Kafka,大数据领域的行业标准。

追求可用性:Kafka、RocketMQ、RabbitMQ

追求可靠性:RabbitMQ、RocketMQ

追求吞吐能力:RocketMQ、Kafka

追求消息低延迟:RabbitMQ、Kafka

RabbitMQ

RabbitMQ是AMQP(高级消息队列协议)的erlang语言实现。

RabbitMQ特点?

  • 可靠性:使用一些机制来保证传输可靠性,如持久化、消费者消息确认及发布者消息确认等。
  • 灵活的路由:在消息进入队列之前,通过交换器来路由消息。对于典型的路由功能,RabbitMQ己经提供了一些内置的交换器来实现。针对更复杂的路由功能,可以将多个交换器绑定在一起,也可以通过插件机制来实现自己的交换器。
  • 扩展性:可搭建集群,可动态扩展集群中的节点。
  • 高可用性:可搭建镜像集群,在部分节点出现问题的情况下队列仍然可用。
  • 多种协议:除了原生支持AMQP协议,还支持STOMP,MQTT等多种消息中间件协议。
  • 多语言客户端:几乎支持所有常用语言,比如Java、Python、Ruby、PHP、C#、JavaScript等。
  • 管理界面:提供了一个易用的用户界面,使得用户可以监控和管理消息、集群中的节点等。
  • 插件机制:RabbitMQ提供了许多插件,以实现从多方面进行扩展,当然也可以编写自己的插件。

AMQP的三大组件?

  • exchange 交换机,负责将消息路由到相应的队列上。需要注意,它只负责消息的转发,不负责存储(这是大概念来说的,像延迟队列DelayExchange就是通过交换机来存储到内存或磁盘中)。
  • queue 队列,存储消息于内存或磁盘中。
  • binding 绑定,告诉交换机该把消息路由到哪个队列的一套规则。

可以发现rabbitmq就实现了amqp的三大组件,binding则是通过routing key来实现

image-20221031150152752

RabbitMQ的五大消息队列工作模式?

Basic 简单模式

简单模式是没有交换机的,只有一个生产者、一个队列和一个消费者。最最最基础,没啥好说的。

image-20221031150934911

Work 工作模式

简单模式下的消费者的处理速度会比不上生产者的生产速度,队列里就会堆积消息。工作模式下,有一个生产者、一个队列,可以有多个消费者监听同一个队列,提高消费速度。

image-20221031151148178

Pub/Sub 发布订阅模式

多了交换机的角色,可通过交换机来决定消息该发给哪些队列,具体如何操作,取决于Exchange的类型。Exchange有以下3种类型:

  • Fanout:广播,将消息交给所有绑定到交换机的队列。

  • Direct:定向,把消息交给符合指定routing key 的队列。每个消息发给交换机的时候都会带上一个routing key,每个队列也会设置自己的routing key。

  • Topic:主题/通配符,把消息交给routing key符合通配符规则的队列:

    Routingkey一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: xbw.abc。通配符规则:

    • #:匹配一个或多个词,如item.#:能够匹配item.spu.insert 或者 item.spu

    • *:匹配不多不少正好1个词,如item.*:只能匹配item.spu

后面的路由模式和主题模式与Fanout、Direct、Topic其实不用区分得太开,其实就是交换机的路由类型有三种而已。所以后面两种模式不讲了。

image-20221031152416072

Routing 路由模式

Topic 主题模式

RabbitMQ中消息有哪几种状态?

  • alpha: 消息内容(包括消息体、属性和 headers) 和消息索引都存储在内存中 。

  • beta: 消息内容保存在磁盘中,消息索引保存在内存中。

  • gamma: 消息内容保存在磁盘中,消息索引在磁盘和内存中都有 。

  • delta: 消息内容和索引都在磁盘中

RabbitMQ的消息分发机制是什么?(队列发送给消费者的机制)

消息分发对应的情景是有两种:

  • Round-robin 轮询(默认),平均下来每个消费者拿到一样多的消息,不顾虑消息者处理消息的能力。
  • Fair 公平分发,通过设置 prefetchCount消息预取数)参数来实现的。每个Consumer在某一时间段内最多处理prefetchcount个Message,在收到Consumer的ack前,队列不会将新的Message分发给它。考虑了消息者处理消息的能力。

RabbitMQ 如何保证消息的有序性?

消息出现乱序的原因是一个队列有多个消费者消费,每个消费者谁先执行完事不确定的。RabbitMQ本身没这种机制,需要自己去设计方案,比如说:

  • 保证消息发送的有序性,就能保证在队列里是有序的(这是大前提)
  • 保证一组需要有序的消息都发送到同一个队列,然后由唯一的一个消费者处理。
  • 保证一个队列只包含一个消费者,然后这个 consumer 内部用内存队列做排队,然后分发给底层不同的 worker 来处理。

RabbitMQ 如何确保消息传输的可靠性?(怎样保证消息不被传丢?)

针对3大阶段,每个阶段都有对应的机制来确保可靠传输:

生产者发送消息到MQ的阶段

  1. 生产者确认机制 Publisher Confirms。消息到达MQ后,MQ会返回一个结果给生产者,告诉生产者你发送是否成功。需要注意,使用这个机制必须要给每个消息指定一个唯一ID,避免冲突。返回结果有两种,都是靠回调函数来返回,是异步模式,会出现消息乱序的问题。(这个机制可以实现幂等生产者,解决消息重复投递到队列的问题)
    • publisher-confirm 生产者确认

      • 消息成功投递到交换机,返回ack

      • 消息未投递到交换机,返回nack

    • publisher-return 生产者回执

      • 消息投递到交换机了,返回ACK,但如果没有路由到队列,就返回路由失败原因。如果没有指定publisher-return的话,消息就会丢弃。(注意不是nack,这个ACK是在消息到达交换机时就返回了,这里的publisher-return是消息从交换机路由到队列的过程出错了才返回)

image-20221031162030086

在回调函数中就能做日志记录,消息重发等操作。

​ spring AMQP对应的代码如下:

spring:
  rabbitmq:
  	# publish-confirm-type,开启publisher-confirm,支持两种类型:
  	# simple:同步等待confirm结果,直到超时 
  	# correlated:异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback
    publisher-confirm-type: correlated 
    publisher-returns: true # 开启publish-return功能,同样是基于callback机制,不过是定义ReturnCallback
    template:
      mandatory: true # 定义消息路由失败时的策略。true,则调用ReturnCallback;false:则直接丢弃消息
// 定义ReturnCallback, publisher-return的回调函数
@Configuration
public class CommonConfig implements ApplicationContextAware {
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        // 获取RabbitTemplate
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        // 设置ReturnCallback
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            // 投递失败,记录日志
            log.info("消息发送失败,应答码{},原因{},交换机{},路由键{},消息{}",
                     replyCode, replyText, exchange, routingKey, message.toString());
            // 如果有业务需要,可以重发消息
        });
    }
}
// 定义ConfirmCallback
public void testSendMessage2SimpleQueue() throws InterruptedException {
    // 1.消息体
    String message = "hello, spring amqp!";
    // 2.全局唯一的消息ID,需要封装到CorrelationData中
    CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
    // 3.添加callback
    correlationData.getFuture().addCallback(
        result -> { // ------> publisher-confirm ack的回调函数
            if(result.isAck()){
                // 3.1.ack,消息成功
                log.debug("消息发送成功, ID:{}", correlationData.getId());
            }else{
                // 3.2.nack,消息失败
                log.error("消息发送失败, ID:{}, 原因{}",correlationData.getId(), result.getReason());
            }
        },
        // ↓↓↓↓ publisher-confirm nack的回调函数
        ex -> log.error("消息发送异常, ID:{}, 原因{}",correlationData.getId(),ex.getMessage())
    );
    // 4.发送消息
    rabbitTemplate.convertAndSend("task.direct", "task", message, correlationData);

    // 休眠一会儿,等待ack回执
    Thread.sleep(2000);
}
  1. rabbitmq事务机制,跟普通的事务类似,主要也是开启事务、提交事务和回滚事务。它属于同步模式,显然性能就要比上面的异步要差很多。一般不要用事务,因为用mq就已经是异步的思想了,不应该再用同步。

    事务和comfirm两个模式只能使用其中一个,否则会报错。

    spring代码如下:

/**
 * 配置rabbitmq事务管理器
 *
 * @param connectionFactory
 * @return
 */
@Bean("rabbitTransactionManager")
public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory connectionFactory) {
    return new RabbitTransactionManager(connectionFactory);
}

@Bean
public RabbitTemplate transactionRabbitTemplate(ConnectionFactory connectionFactory) {
    return new RabbitTemplate(connectionFactory);
}
@Service
public class TransactionPublisher implements RabbitTemplate.ReturnCallback {
    @Autowired
    private RabbitTemplate transactionRabbitTemplate;

    @PostConstruct
    public void init() {
        // 将信道设置为事务模式
        transactionRabbitTemplate.setChannelTransacted(true);
        transactionRabbitTemplate.setReturnCallback(this);
    }

    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        System.out.println("事务 " + message + " 发送失败");
    }

    /**
     * 一般的用法,推送消息
     *
     * @param ans
     * @return
     */
    @Transactional(rollbackFor = Exception.class, transactionManager = "rabbitTransactionManager")
    public String publish(String ans) {
        String msg = "transaction msg = " + ans;
        System.out.println("publish: " + msg);

        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        transactionRabbitTemplate.convertAndSend(MqConstants.exchange, MqConstants.routing, msg, correlationData);
        return msg;
    }
}
  1. 消息补偿。其实就是消息重发,需要配合生产者本地的数据库,需要保存消息在生产者本地,不能直接发给MQ,预防需要消息重发。

消息在MQ内部的阶段

  1. 交换机持久化
  2. 队列持久化
  3. 消息持久化

持久化在磁盘,MQ重启后不会丢失任何信息。rabbitmq默认三者都不持久化,但springAMQP默认三者都持久化,所以开发的时候不用特意指定。

  1. 搭建集群。这个是从可用性出发,后面再说。

MQ传给消费者的阶段

  1. 消费者确认机制。这个机制rabbitmq只是简单的ACK,而springAMQP提供的机制则在框架层面帮我做了,在开发时是有用的。

    rabbitmq是阅后即焚机制,即RabbitMQ确认消息被消费者消费后会立刻删除。这靠的是消费者代码中手动地发送Basic.ack,但要是消费者消息还没处理完就宕机了,MQ很有可能会丢弃这个消息。所以SpringAMQP则允许配置三种确认模式,自动帮我们完成消费者确认

    • manual:手动ack,需要在业务代码结束后,调用api发送ack。也不是很可靠,一般不用。

    • auto:自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack。所以使用默认的auto即可。

    • none:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除。不可靠,不用它。

# 在配置文件中配置一下就可以了
spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: auto # manual auto none,默认是auto
  1. 消费失败重试机制。同样是spring提供的机制,rabbitmq没有。

    当消费者出现异常nack后,消息会不断requeue(重入队)到队列,再重新发送给消费者,然后再次异常,再次requeue,无限循环,导致mq的消息处理飙升,带来不必要的压力。我们可以利用Spring的retry机制,在消费者出现异常时本地重试,而不是无限制的requeue到mq队列。

# 在配置文件中配置一下就可以了
spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true # 开启消费者失败重试
          initial-interval: 1000 # 初识的失败等待时长为1秒
          multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
          max-attempts: 3 # 最大重试次数
          stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

​ 当重试次数到达最大重试次数后,spring会有3中失败策略来继续保证可靠性,他们都由MessageRecovery接口来处理:

  • RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息默认就是这种方式。

  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队。(消息补偿)

  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机

    image-20221031102515922.png

​ 具体代码如下:

@Configuration
public class ErrorMessageConfig {
    @Bean
    public DirectExchange errorMessageExchange(){
        return new DirectExchange("error.direct");
    }
    @Bean
    public Queue errorQueue(){
        return new Queue("error.queue", true);
    }
    @Bean
    public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
        return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
    }

    // 指定失败策略为 RepublishMessageRecoverer
    @Bean
    public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
        return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
    }
}
  1. 死信队列。也就是后面要讲的死信交换机,用于处理死信。

死信交换机?

满足以下其中一个条件的消息就是死信。本质就是不能被消费者完整消费完的消息

  • 消费者返回basic.reject或 basic.nack声明消费失败,并且消息的requeue参数设置为false。
  • 消息过期,超时无人消费。我们可以为消息或者队列设置TTL,只要超过TTL没被消费就会变成死信。
  • 队列满了,无法发送。(如果是有持久化操作的话队列会有page-out操作,不会真的满了不接收消息)

如果一个包含死信的队列配置了dead-letter-exchange属性,指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机称为死信交换机(Dead Letter Exchange,DLX)。

我们只需**为死信交换机绑定一个队列(死信队列)**就能存储这些死信,后续可以交给另一个消费者或者人工介入处理。

image-20221031225716702

延迟队列

只要没有消费者监听正常队列,超时后消息就会交给死信交换机,发送给死信队列,就可以实现延时TTL时间后消费。整条链就变成了一个延迟队列

DelayExchange

不过一般不这么使用,比较麻烦。用rabbitmq的DelayExchange插件,就可以直接创建使用延迟队列了。它的原理是在交换机就持久化到硬盘了。

DelayExchange需要将一个交换机声明为delayed类型。当我们发送消息到delayExchange时,流程如下:

  • 接收消息
  • 判断消息是否具备x-delay属性
  • 如果有x-delay属性,说明是延迟消息,持久化到硬盘,读取x-delay值,作为延迟时间
  • 返回routing not found结果给消息发送者
  • x-delay时间到期后,重新投递消息到指定队列

使用步骤是:

  1. 声明delayed类型的交换机,delayed属性为true
  2. 发送消息时,消息要添加x-delay头,值为超时时间

优先队列?

**优先级高的队列先被消费。**可在创建队列时设置x-max-priority配置优先级。

当消费速度大于生产速度且Broker没有堆积的情况下,优先级显得没有意义。

惰性队列?

队列出现消息堆积问题时,因为内存是有限的,普通的队列间歇性地进行page-out操作:将队列进行阻塞,不接收新消息,将一部分消息刷入磁盘中,用硬盘来对内存“扩容”,其坏处就是该队列的吞吐量很不稳定

惰性队列 lazy-queue则是只基于磁盘存储的,容量要大很多,不会间歇进行page-out,性能稳定。但缺点就是处理性能受限于磁盘IO,比较慢。

RabbitMQ 如何保证高可用的?->RabbitMQ 的集群

RabbitMQ 的集群有两种:

  • 普通集群模式分片集群,将队列分散到集群的各个节点,从而提高整个集群的并发能力(吞吐量)。没提供故障转移等保护机制,不够高可用。

    • 每个 RabbitMQ 实例创建有自己的queue,但是每个实例都共享其他 queue 的元数据( queue 的一些配置信息,通过元数据,可以找到 queue 所在实例,但不包含队列中的消息)。

    • 当访问集群某节点时,如果队列不在该节点,会根据元数据找到真正的queue所在节点,让其传递到当前节点并返回。

    • 若队列所在节点宕机,队列中的消息就会丢失,其他节点就拿不到队列里的消息了。

      image-20221101090127074

  • 镜像集群模式主从集群,普通集群的基础上,添加了主从备份功能,提高集群的数据可用性,才是所谓的高可用。

    • 每个节点创建的 queue,无论元数据还是 queue 里的消息都会完全同步备份到其他节点上,也就是说,每个 RabbitMQ 节点都有这个 queue 的一个完整镜像,包含 queue 的全部数据。

    • 创建队列的节点被称为该队列的主节点,备份到的其它节点叫做该队列的镜像节点。一个队列的主节点可能是另一个队列的镜像节点。

    • 所有操作都是主节点完成,然后同步给镜像节点。

    • 主宕机后,镜像节点会替代成新的主。

    • 优点就是高可用,缺点就是性能开销大。所以rabbitmq3.8出现仲裁队列

      image-20221101090231895

RabbitMQ 消息怎么传输?

由于 TCP 链接的创建和销毁开销较大,且并发数受系统资源限制,会造成性能瓶颈,所以 RabbitMQ 会在TCP连接上建立Channel来传输数据。每条 TCP 链接上的Channel数量没有限制,一个线程使用一个Channel,达到一个 TCP 被多个线程共享的效果,每个Channel在 RabbitMQ 都有唯一的 ID,保证了Channel私有性。

Kafka

主要应用场景

  • 消息队列
  • 大数据/流式数据处理

架构与重要概念

  • Broker(kafka实例):

    • 多个broker组成kafka集群,能实现负载均衡及容错。
    • broker是无状态(Sateless)的,所有broker的状态信息都保存在ZooKeeper集群中,通过ZooKeeper来管理协调所有broker。
  • Producer 生产者:将消息发布到特定的Topic上(推模式)。

  • Topic 主题:

    • topic是一个逻辑概念,Producer 将消息发布到特定的Topic,Consumer 通过订阅特定的 Topic 来消费消息。

    • topic必须要有标识符且唯一,topic没有数量上限。

    • topic中的消息是有统一结构的,一般一个topic包含某一类消息。

    • 消息一旦发送到topic中,就不能被更改。

  • Partition 分区:

    • kafka没有队列的概念,与之对应的是 Partition,把partition理解为存放消息的队列即可。以日志文件来保存数据。
    • 一个 Topic 可以有多个 Partition ,且同一 Topic 下的 Partition 可以分布在不同 Broker 上,即一个 Topic 可横跨多个 Broker。这样能提供比较好的并发能力,将同一topic的流量负载均衡到多台broker上

    image-20230106161729537

  • Offset 偏移量:

    • offset是某个partition中下一条将要发送给Consumer的消息的序号。

    • Kafka默认将offset存储在ZooKeeper中。

  • Replica 副本:每个partition会有多个replica

  • Leader 与 follower:同一个partition的所有replica中会选举一个为leader,其余为follower。生产者和消费者只与 leader 交互,follower只拉取leader的消息进行同步(kafka并不支持读写分离,是主写主读),多副本机制提高了消息存储的安全性。当 leader 发生故障时会从 follower 中选举一个 leader,但 follower 中如果有和 leader 同步程度达不到要求的就不参与 leader 的竞选。

  • Consumer 消费者:从topic拉取若干个partition中的数据(只有拉模式),只要向zk提交offset,就表明消费者消费了某条消息。

  • Consumer Group 消费者组:

    • 可扩展且具有容错性消费者机制

    • 一个消费者组可以包含多个消费者,一个消费者组有一个唯一的ID(group Id)。

    • 组内的消费者一起消费某个topic的所有partition的数据每个partition只能被其中一个固定的消费者消费,除非有消费者故障触发Rebalance机制。所以消费者组对应于topic,消费者对应于partition

05.Kafka架构和工作流程- 李林超博客~ 个人博客

Zookeeper 在 Kafka 中的作用

  • 管理和协调broker,并且存储了Kafka的元数据(例如:有多少topic、partition、consumer)

  • 各个角色的注册和管理。

  • 生产者和消费者的负载均衡。

  • 记录offset。

  • 。。。

img

生产者生产消息流程

  • 生产者先从 zookeeper 的 "/brokers/topics/主题名/partitions/分区名/state"节点找到该 partition 的leader 所在的broker 的broker ID

  • 生产者将消息发给对应broker

  • broker上的leader将消息写入到本地log中

  • follower从leader上拉取消息,写入到本地log,并向leader发送ACK

  • leader接收到所有的ISR中的Replica的ACK后,并向生产者返回ACK。

  • (上述是acks=all的情况)

image-20230107225936120

消费者消费消息流程

  • 采用拉模式拉取数据

  • 每个consumer都可以根据分配策略(默认RangeAssignor),获得要消费的分区

  • 获取到consumer对应的offset(默认从ZK中获取上一次消费的offset)

  • 找到该分区的leader,拉取数据

  • 消费者提交offset

image-20230107230719634

Kafka的数据存储形式

  • 一个topic由多个partition组成
  • 一个partition由多个segment(段,更多是一种逻辑概念并没有相应的实体文件)组成
  • 一个segment由log、index、timeindex 文件组成
    • log:日志数据文件。
    • index:索引文件,是该segment的稀疏索引,根据offset通过索引查找某条数据。
    • timeindex:时间索引文件。

image-20230107231059807

image-20230107231555530

读取日志文件数据流程

  • 根据 offset 首先找到相应的 segment (注意:offset是分区的全局偏移量)

  • 然后根据这个「全局分区offset」找到相对于文件的「segment段的offset」

  • 最后再根据 「segment段的offset」在index文件(稀疏索引)中二分查找数据,读取消息

    image-20230107231803448

​ 如果我们要查找偏移量为 23 的消息,首先通过二分法在偏移量索引文件中找到不大于 23 的最大索引项,即[22, 656],然后从日志分段文件中的物理位置 656 开始顺序查找偏移量为 23 的消息。

image-20240331204732735

幂等生产者如何保证?

解决消息重复投递到队列的问题

  • 在生产者生产消息时,如果出现retry时,有可能会一条消息被发送了多次,如果Kafka不具备幂等性的,就有可能会在partition中保存多条一模一样的消息。image-20230106214924232

  • 为了实现生产者的幂等性,Kafka引入了 Producer ID(PID)和 Sequence Number

    • PID 生产者ID:每个Producer在初始化时,都会分配一个唯一的PID,这个PID对用户来说,是透明的。

    • Sequence Number 序列号:每个生产者(对应PID)发送到指定partition的消息都带有一个从0开始递增的Sequence Number

  • 如果生产的消息的seq 小于等于 partition中对应pid下的消息的seq,不保存信息。

  • 幂等生产者可以选择性开启。

image-20230106221245918

Replica是怎么管理的?

  • AR(Assigned Replicas 已分配的副本):某分区中的所有 Replica
  • ISR(In-Sync Replicas 在同步中的副本):与 Leader 保持一定程度同步的Replica(包括 Leader)
  • OSR(Out-of-Sync Replias 不同步的副本):与 Leader 同步滞后过多的Replica(不包括Leader,一般是故障的副本
  • 都是针对某个partition的概念。
  • AR = ISR + OSR,正常情况下AR=ISR,OSR为空
  • Leader 负责维护和跟踪 ISR 集合中所有 Follower 副本的滞后状态,当 Follower 落后过多时,就会将其放入 OSR 集合,当 Follower 追上了 Leader 的进度时,就会将其放入 ISR 集合。默认只有 ISR 中的replica才有资格晋升为 Leader

image-20230107152824662

如何确定当前能读到哪一条消息?

  • LEO(log end offset):日志文件中下一条待写入消息的offset每个partition都会有一个LEO
  • HW(High Watermark 高水位):ISR 中最小的LEO[0,LEO) 为这个日志文件的可以消费区间,消费者只能消费此区间中的消息。ISR中的所有replica都共享一个HW。HW机制能确保消费者能读到的消息都是有多副本备份过的可靠消息

image-20230107163333363

生产者发送消息有哪些模式?

总共有三种模式

  • 发后即忘(fire-and-forget):只管发送消息,但不关心消息是否正确到达,效率最高,但可靠性最差。
  • 同步(sync):producer.send()返回一个Future对象,调用get()方法同步等待返回结果,就知道消息是否发送成功,需要等上一个消息发送成功后才可以继续发送下一个消息
  • 异步(async):producer.send()传入一个回调函数,即可异步返回,发送成功与否都会调用这个回调函数,在回调函数中选择记录日志还是重试都取决于调用方。

生产者发送消息的分区策略有哪些?

  • 轮询(默认):依次将消息发送该topic下的所有partition,若在生成消息时 key 为 null,则使用轮询策略。
  • 随机(淘汰掉了):也是为了均衡分配,但效果还不如轮询,所以几乎不用。
  • key 哈希到指定分区:
    • 在生产消息时 key 不为null,根据 key 进行 hash映射到指定分区
    • 如果保证同一个 key 的消息的生产是有序的,基本能满足消息的顺序性保障。但如果 partation 数量发生变化就不能保证映射关系了
    • 会出现数据倾斜
  • 指定 Partiton 发送
  • 自定义策略:实现 Partitioner 接口,并在生产消息时指定该自定义的分区器

!!Kafka 的可靠性是怎么保证的?

也是分为三个方面来思考:生产者到kafka,kafka内部,kafka到消费者。

  1. acks机制(生产者到kafka):acks为生产者生产消息时可以配置的参数,该参数表示指定分区中有多少个副本收到这条消息,生产者才认为这条消息是写入成功的,该参数有三个值:

    • acks = 1(默认):只要 leader 成功写入消息,就代表成功。如果 leader 与 follower 还没有来得及同步,leader 就崩溃了,选举新 leader 后会丢失了这条消息。

    • acks = 0:不需要等待响应就代表生产成功。一旦出现任何问题,都会导致消息丢失。

    • acks = -1 或 acks = all:需要等待 ISR 中的所有replica都写入成功的响应才代表成功。该方案可靠性最高,但如果 ISR 只包含leader ,那么与 acks = 1 毫无差别了。

  2. 生产者发送消息的模式(生产者到kafka):通过同步或异步的模式获取响应结果,做失败重试

  3. 消费者手动提交offset(kafka到消费者):消费者消费到消息后默认是自动提交offset,如果消费者消费出错,自动提交offset就代表这条消息丢失了。如果要保证可靠性可以等业务正常处理完后,再手动提交offset,就不会丢失消息,但这就属于是同步消费了。

  4. 通过 LEO 来确定分区的 HW(kafka到消费者):确保消费者能读到的消息都是有多副本备份过的。

  5. (kafka内部)消息都是持久化的,只要从 Page Cache 持久化到磁盘的过程中broker不宕机,消息就不会丢失。

kafka消息堆积了有什么原因?怎么办?

  • 消费者宕机:
    • 需要马上排查并重启消费者。重启后直接消费新数据,积压的旧数据采用离线程序进行"补漏"。
    • 或者新开一个topic启用新的消费者临时消费。
  • partition较少,消费者消费能力不够,吞吐量不行:
    • 同一个topic下增加partition,并增加消费者组里的消费者。
  • 生产者生产的数据分配不合理,数据倾斜:
    • 合理分配。

Kafka 中如何保证顺序消费

  • 全局顺序消息:

    • 一个 Topic 只能有一个 Partition,且producer和consumer都只能有一个,且单线程,不能并发处理(但违背了 Kafka 的设计初衷,牺牲了高并发、高吞吐的特性)。
  • 局部顺序消息:

    • 发送消息的时候指定 key映射到同一个Partition中,保证同一个订单号的业务的顺序性,consumer也是只能有一个,不能并发处理。
    • 也可以有多个partition和多个consumer,但是consumer需要抢分布式锁,锁是已处理业务的id,id需要有顺序性。
  • 。。。

消费者组Rebalance机制

  • 消费者组中的所有消费者会根据分区分配策略来分配partition,当topic、partition、消费者的数量发生变化(增或减),就会触发Rebalance机制,重新分配分区。

  • Rebalance过程会对consumer group产生非常严重的影响所有消费者都要停止工作,直到Rebalance完成。

消费者组分区分配策略

1.Range 范围分配(默认)

  • 按照消费者总数和分区总数进行整除运算来获得一个跨度,然后按照跨度来平均分配partition,每个消费者负责一个范围内的partition,可以确保每个消费者消费的分区数量是尽可能均衡的。
  • 该策略是针对每个Topic的。对于每个 topic,该策略会将消费者组内所有订阅这个topic的消费者按照名称的字典顺序排序,然后分配。

image-20230107221400827

  1. RoundRobin 轮询分配:轮询地将分区逐个分配给每个消费者

    image-20230107222755521

  2. Sticky 粘性分配

    • 为了在rebalance后,分区的分配尽可能与上一次的保持相同,减少系统资源的开销以及其它异常情况的发生,同时还能尽可能均匀地分配。策略最优,但代码最复杂。

    • 没有rebalance时,Sticky与RoundRobin分配策略是一样的:

    image-20230107223236590

    • Rebalance时,将空缺的partition均匀分配给可用的消费者

    image-20230107223342133

  3. 自定义:实现 org.apache.kafka.clients.consumer.internals.PartitionAssignor 接口

kafka的controller角色

  • Kafka启动时,所有broker中其中一个 broker 会被选举为controller,负责创建topic、添加分区、修改副本数量等任务。

  • 前面leader和follower是针对partition,而controller是针对broker的

  • 分区leader的选举,也是由controller决定的

Controller的选举

  • 在Kafka集群启动的时候,每个broker都会尝试去ZooKeeper上注册成为Controller(ZK临时节点)

  • 但只有一个竞争成功,其他的broker会注册该节点的监视器

  • 一但该临时节点状态发生变化,就可以进行相应的处理

  • Controller也是高可用的,一旦某个broker崩溃,其他的broker会重新注册为Controller

Leader 是如何选举的?

  • leader的选举都由controller决定

  • controller会将leader的改变直接通过RPC的方式通知需为此作出响应的Broker

  • controller读取到当前分区的ISR,只要有一个Replica还幸存,就选择其中一个作为leader,否则任意选这个一个Replica作为leader

  • 如果该partition的所有Replica都已经宕机,则新的leader为-1

  • leader重新选举后不保证leader是负载均衡的

leader负载均衡

  • ISR列表中,第一个replica就是preferred-replica(优先副本)

  • 理想情况下优先副本就是 leader,是能达到leader负载均衡的。

  • 当发现leader不能负载均衡时,可以执行脚本命令重新均匀分配leader。

如何增强消费者的消费能力?

  • 增加 topic 的partition数量,同时增加消费组的消费者数量,尽量使消费者数=分区数
  • 若消费者消费不及时,可以采用多线程的方式进行消费,并优化业务方法流程。

分区数越多越好吗?吞吐量就会越高吗?

在一定条件下,分区数与吞吐量和性能成正比,但不能太多。

  • 内存开销大

    • 服务端在很多组件中都维护了分区级别的缓存,分区数越大,缓存成本也就越大。

    • 消费端的消费线程数是和分区数挂钩,分区数越大,消费者线程的开销成本也就越大。

    • 生产者发送消息有缓存的概念,为每个分区缓存消息,当积累到一定程度或者时间时会将消息发送到分区,分区越多,这部分的缓存也就越大。

  • 文件句柄的开销每个 broker 会为每个日志段文件打开一个 index 文件句柄和一个log文件句柄。 partition 越多,所需要保持打开状态的文件句柄数就越多,最终超过底层操作系统配置的文件句柄数量限制

  • 增加端对端的延迟分区越多则副本之间的同步数据量就越多,在默认情况下,每个 broker 从其他 broker 进行数据副本复制时,该 broker 节点只会为此工作分配一个线程来完成该 broker 所有 partition 数据的复制

  • 降低高可用性分区数量越多,故障恢复时间也就越长,而如果发生宕机的 broker 恰好是 controller 节点时:在这种情况下,新 leader 节点的选举过程在 controller 节点恢复到新的 broker 之前不会启动。controller 节点的错误恢复将会自动地进行,但是新的 controller 节点需要从 zookeeper 中读取每一个 partition 的元数据信息用于初始化数据。例如,假设一个Kafka 集群存在 10000个partition,从 zookeeper 中恢复元数据时每个 partition 大约花费 2 ms,则 controller 的恢复将会增加约 20 秒的不可用时间窗口。

kafka为什么这么快,吞吐量这么高?

  • 顺序读写:日志文件是只能追加写(Append-only)的,在末尾写入数据,顺序读写的读写性能比随机读写高很多。

  • Page Cache:Kafka 利用操作系统本身的 Page Cache(内核缓存),而不是JVM内存(用户空间),通过 mmap 直接操作内存映射文件提高 I/O 速度(kafka没有用户空间中的内存buffer,写入数据都是直接持久化到硬盘的,rocketMQ也是一样)。

  • 零拷贝:Kafka直接使用sendfile系统调用将数据从内核空间的page cache直接拷贝到内核空间的 socket 缓冲区,然后再拷贝到 NIC(网卡) 缓冲区,避免了数据在内核态和用户态之间复制。

  • 分区分段+索引:每次文件操作直接操作 segment,Kafka 又默认为分段后的数据文件建立了索引文件,不仅提高数据读取的效率,同时也提高了数据操作的并行度

  • 批量读写:Kafka 数据读写是批量的而不是单条的,这样可以避免在网络上频繁传输单个消息带来的延迟和带宽开销。假设网络带宽为10MB/S,一次性传输10MB的消息比传输1KB的消息10000万次显然要快得多。

  • 批量压缩:Kafka 把所有的消息都变成一个批量的文件,并且进行合理的批量压缩,减少网络 IO 损耗。

RocketMQ

架构与重要概念

rocketmq是基于kafka的很多特性设计的,所以很多地方都很像。

RocketMQ啥都是集群部署的,这是他吞吐量大高可用的原因之一。

  • NameServer

    • 主要负责对于源数据的管理,包括了对于Topic和路由信息的管理。平时主要开销是在维持心跳和提供Topic-Broker的关系数据。
    • 等同于kafka的zk,但比zk更轻量。主要是因为nameserver集群去中心化的,且每个节点之间互相独立,没有任何信息交互
    • NameServer 被设计成几乎无状态的,可以横向扩展,节点之间无通信,通过部署多台机器来标记自己是一个伪集群
    • 每个 Broker 在启动的时候会到 NameServer 注册,Producer 在发送消息前会根据 Topic 到 NameServer 获取到 Broker 的路由信息,Consumer 也会定时获取 Topic 的路由信息。
  • Broker

    • 单个Broker与所有NameServer保持长连接心跳(30s一次),Broker向NameServer发心跳时, 会带上当前自己所负责的所有Topic信息,如果Topic个数太多(万级别),会导致一次心跳中,光Topic的数据就占几十M,很容易传输失败,导致NameServer误认为Broker心跳失败

    • broker与nameserver底层的通信和连接都是基于Netty实现的。

      • 单broker可以支撑上万队列规模,支持消息推拉模型,且具有上亿级消息堆积能力,同时可严格保证消息的有序性

      • Broker集群可以进行主从部署 ,如果 master 宕机,则slave 提供消费服务,但是不能写入消息(目前原生rocketmq的broker集群一个master只配一个slave,所以也就没有选主的机制,需要手动进行master恢复和故障迁移,除非使用第三方的Dledger组件替换掉原生的机制,就能做到一个master配多个slave,复制半数以上slave即成功,还能自动选主和故障迁移)。

      • Producer 只能将消息发送到 Broker master,但 Consumer 可同时与提供 Topic 服务的 Master 和 Slave建立长连接,既可以从 Broker Master 也可以从 Broker Slave 订阅消息

  • Producer(Group):

    • 单个Producer 与 NameServer集群中其中一个节点(随机选择)建立长连接(无心跳),定期从 NameServer 获取 Topic 路由信息,并向提供 Topic 服务的 Broker Master 保持长连接心跳

    • 消息由Producer通过多种负载均衡模式(默认轮询)发送到Broker集群中该topic下的queue中,发送低延时,支持快速失败。

    • 三种发送消息方式:

      • 同步发送:消息发送方发出数据后会在收到接收方发回响应之后才发下一个数据包。
        • 异步发送:指发送方发出数据后,不等接收方发回响应,接着发送下个数据包。

        • 单向发送:是指只负责发送消息而不等待服务器回应没有回调函数触发。

  • Consumer(Group):

    • 单个Consumer与 NameServer集群中其中一个节点(随机选择)建立长连接(无心跳),定期从 NameServer 获取 Topic 路由信息,并向提供 Topic 服务的 Broker Master 保持长连接心跳
    • 支持推拉两种消费模式,支持集群消费广播消费,提供实时的消息订阅机制
    • 推模式封装了消息的拉取、消费进度和其他的内部维护工作,将消息到达时执行的回调接口留给用户应用程序来实现。但从实现上看还是从消息服务器中拉取消息,不同于 Pull 的是, Push 首先要注册消费监听器,当监听器处触发后才开始消费消息
  • Clustering 集群消费(默认):一个消费者组共同消费一个topic的多个queue,一个queue只会被一个consumer消费。如果某个consumer挂掉,组内其它consumer会接替挂掉的consumer继续消费。

  • Broadcasting 广播消费:消息会发给消费者组中的每一个消费者进行消费

  • Message

    • 一条消息必须有一个Topic

    • 一条消息也可以拥有一个可选Tag和额外的键值对,它们可以用于设置一个业务 Key 并在 Broker 上查找此消息以便在开发期间查找问题。

  • Topic

    • Topic 与生产者和消费者的关系非常松散,一个 Topic 可以有任意个生产者向其发送消息,一个生产者也可以同时向不同的 Topic 发送消息,一个 Topic 也可以被任意个消费者订阅。
  • Tag

    • 可以看作子主题,它是消息的第二级类型,为用户提供额外的灵活性。同一业务模块不同目的的消息就可以用相同 Topic 而不同的 Tag 来标识。如交易消息又可以分为:交易创建消息、交易完成消息等。

    • 一条消息可以没有 Tag

    • 有助于保持代码干净和连贯,还可以为 RocketMQ 提供的查询系统提供帮助。

  • Message Queue

    • 相当于KafkaPartition,每个Queue内部是有序的,分为读和写两种队列,一般来说读写队列数量一致,如果不一致就会出现很多问题。
  • Offset

    • 所有队列都是持久化,长度无限的数据结构,使用Offset来访问某个消息,Offset 为 java long 类型,64 位,理论上在 100年内不会溢出,所以认为是长度无限。
  • Message Order

    • Orderly(顺序消费):消息消费顺序与生产者为每个队列发送顺序一致,所以如果要强制保证全局顺序,需要确保该topic只有一个queue
    • Concurrently(并行消费):不再保证消费顺序,消费的最大并行数量受每个消费者客户端指定的线程池限制。

领域模型

img

Broker有哪些集群模式?Broker高可用?

master和slave是针对broker的概念,slave的作用是对master冗余备份分担读负载、保证消费高可用。master之间不互通,互不了解,每个master只做好自己事情就行了,大大降低了broker实现的复杂性。

  • 单Master模式:风险较大,一旦Broker重启或者宕机时,会导致整个服务不可用,这就需要部署多个broker,多个master。

  • 多Master模式:集群中无Slave,全是Master

    • 优:配置简单,单个Master宕机或重启维护对应用无影响,在磁盘配置为RAID10时,即使机器宕机不可恢复情况下,由于RAID10磁盘非常可靠,消息也不会丢(异步刷盘丢失少量消息,同步刷盘一条不丢),性能最高;

    • 劣:单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,可用性会受到影响

  • 多master多slave异步复制模式:每个Master配置1个Slave,有多Master-Slave,HA采用异步复制方式,主从之间有短暂消息延迟(毫秒级)

    • 优:即使磁盘损坏,消息丢失的非常少,且消息实时性不会受影响,同时Master宕机后,消费者仍然可以从Slave消费,而且此过程对应用透明,不需要人工干预,性能同多Master模式几乎一样;

    • 劣:Master宕机,磁盘损坏情况下会丢失少量消息。

  • 多 master多slave同步双写模式:每个Master配置1个Slave,有多对Master-Slave,HA采用同步双写方式,只有主从都写成功,才向返回成功

    • 优:数据与服务都无单点故障,Master宕机情况下,消息无延迟,服务可用性与数据可用性都非常高;

    • 劣:性能比异步复制模式略低(大约低10%左右),发送单个消息的RT会略高,且目前版本在主节点宕机后,备机不能自动切换为主机。

消息消费高可用

在Consumer的配置文件中,并不需要设置是从Master读还是从Slave 读,当Master不可用或者繁忙的时候,Consumer会被自动切换到从Slave 读。当Master宕机后,Consumer仍然可以从Slave读取消息,不影响Consumer程序,达到了消费端的高可用性。

消息刷盘实现?

Broker 存取消息时直接操作物理内存(内存映射文件),这可以提供系统的吞吐量,但是无法避免机器掉电时数据丢失,所以需要持久化到磁盘中。刷盘的最终实现都是使用NIO中的 MappedByteBuffer.force() 将映射区的数据写入到磁盘,刷盘策略有两种:

  • 同步刷盘:broker内存中的消息必须刷到commitLog日志文件中才算成功发送到该broker上(至于返回producer是否成功就要看broker用什么集群模式了)。
  • 异步刷盘:消息达到Broker内存后就算发送成功,会唤醒另一个线程将数据持久化到CommitLog日志文件中,但不保证线程执行时机。

img

RocketMQ的数据存储形式

RocketMQ主要的存储文件包括CommitLog文件、ConsumeQueue文件、Indexfile文件。采用的是混合型的存储结构(多个Topic的消息实体内容都存储于一个CommitLog中),针对Producer和Consumer分别采用了数据和索引相分离的存储结构。

img

image-20230127133643788

  • CommitLog:日志数据文件

    • 消息主体以及元数据的存储主体,存储Producer写入的消息主体内容,消息内容不是定长的。
    • 单个文件大小默认1G, 文件名长度为20位,左边补零,剩余为起始偏移量,比如00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为1G=1073741824;当第一个文件写满了,第二个文件为00000000001073741824,起始偏移量为1073741824,以此类推。消息主要是顺序写入日志文件,当文件满了,写入下一个文件。
    • CommitLog文件保存于${Rocket_Home}/store/commitlog目录中

    CommitLog

  • ConsumeQueue:逻辑消费队列,基于topic与queue索引文件,直接查询某个topic的某个queue中的消息。

    • ConsumeQueue文件夹为topic/queue/file三层组织结构,存储路径为${Rocket_Home}/store/consumequeue/{topic}/{queueId}/{fileName}。
    • ConsumeQueue文件采取定长设计,每一个条目共20个字节,分别为8字节的CommitLog偏移量、4字节的消息长度、8字节的tag与hashcode,单个文件有30W个条目,可以像数组一样随机访问每一个条目,每个ConsumeQueue文件大小约5.72M;

    image-20230127140547379

  • IndexFile全局索引文件,可以通过key时间区间来查询消息。

    • Index文件的存储位置是: {fileName},文件名fileName是以创建时的时间戳命名的,固定的单个IndexFile文件大小约为400M,一个IndexFile可以保存 2000W个索引
    • 底层实现为hash索引

    IndexFile文件示意图

RocketMQ 数据的读写流程

  • Producer发送消息(包含topic和QueueId)至Broker,Broker使用同步或异步对消息刷盘持久化,保存至CommitLog中。然后Broker的后台服务线程ReputMessageService不停地分发请求异步构建ConsumeQueue(逻辑消费队列)和IndexFile(索引文件)。
  • 根据生产者指定的 TopicQueueId 将这条消息在 CommitLog 的offset,消息大小,tag的hashcode存入对应的 ConsumeQueue 索引文件中。
  • 同时每个队列也都保存了 ConsumerOffset 即每个consumer的消费位置,consumer消费消息时只需要根据 ConsumeOffset 来查找索引,就可获取下一个未被消费的消息。

img

RocketMQ如何保证消息的可靠性?

也是从3个阶段考虑:生产阶段、存储阶段、消费阶段。

  • 生产:生产确认机制

    • 同步发送时,要注意处理响应结果和异常。如果返回响应OK,表示消息成功发送到了Broker,如果响应失败,或者发生其它异常,都应该重试。

    • 异步发送时,应该在回调方法里检查,如果发送失败或者异常,都应该进行重试。

    • 如果发生超时的情况,也可以通过查询日志的API,来检查是否在Broker存储成功。

  • 存储:消息持久化,根据业务场景使用不同的刷盘机制

    • 消息只要持久化到CommitLog(日志文件)中,即使Broker宕机,未消费的消息也能重新恢复再消费。

    • Broker的刷盘策略:同步刷盘异步刷盘

    • Broker通过集群主从模式来保证高可用和消息可靠性。

  • 消费:确认时机应该在执行完所有消费业务逻辑之后,再发送消费确认

Producer和Consumer的负载均衡分配策略?

  • Producer负载均衡:

    • 对于非顺序消息(普通消息、定时/延时消息、事务消息)场景,默认且只能(默认的意思是不需要手动设置)会轮询目标Topic下的所有MessageQueue,并采用递增取模的方式向不同的MessageQueue上发送消息,实现发送的负载均衡。
    • 如果要指定某个queue发送,就用MessageQueueSelector接口的那3个实现类。

    img

    • 5.0之前只支持轮询,5.0之后可支持MessageGroupHash模式,以消息组为粒度,按照内置的SipHash算法,将相同消息组的消息分配到同一队列中,对于顺序消息场景,默认且只能使用MessageGroupHash模式的负载均衡策略。需要给消息设置消息组(MessageGroup)。

    hash算法分配

  • consumer负载均衡:仅限集群消费模式,有3种分配算法:

    • AllocateMessageQueueAveragely(默认)

      image-20230124055431144

    • AllocateMessageQueueAveragelyByCircle

      image-20230124055445375

    • AllocateMachineRoomNearby: 将同机房的Consumer和Broker优先分配在一起。

如何保证消息顺序消费?

也是分全局和局部顺序保证。具体的实现与kafka有一点区别。

  • 全局:

    • 一个 Topic 只能有一个 queue,且producer和consumer都只能有一个,且单线程,不能并发处理(但违背了 rocketmq 的设计初衷,牺牲了高并发、高吞吐的特性)。
  • 局部:

    • 同订单ID的消息发送到同一个queue中,consumer也是只能有一个,不能并发处理。

    • rocketmq没有kafka的key的设计,而是可以使用 MessageQueueSelector 接口(如果不用这个队列选择接口默认就是负载均衡轮询)来控制消息要发往哪个 Message Queue,其有3种实现,顺序性的需求就要使用hash取模:

      SelectMessageQueueByHash   // hash
      SelectMessageQueueByMachineRoom // 机器(broker)随机,同一个机房的队列优先选择
      SelectMessageQueueByRandom // 队列随机
          
      public class SelectMessageQueueByHash implements MessageQueueSelector {
      
          @Override
          public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
              int value = arg.hashCode();
              if (value < 0) {
                  value = Math.abs(value);
              }
              value = value % mqs.size();
              return mqs.get(value);
          }
      }
      

消息重复投递到队列怎么办?

也就是幂等生产者和幂等消费者如何保证?

  • rocketmq没有内置解决方案,要靠业务逻辑。因为RocketMQ选择了确保消息一定被投递到队列,所以有可能造成消息重复投递。

  • 解决方法:在consumer端来实现,只要不重复消费,就解决重复投递。保证每条消息都有唯一编号,建立一个消息表,拿到这个消息在db或redis做insert操作。给这个消息做一个唯一主键(primary key)或唯一约束,如果出现重复消费的情况,就会导致主键冲突,那么就不再处理这条消息。(kafka在producer端实现)

消息堆积怎么办?

解决方法:

  • 限流降级,降低生产者生产速度。
  • 提高消费者的消费能力,可以看kafka的。

怎么实现分布式消息事务?

  • 半消息(Half Message):在事务提交之前,消息对consumer不可见,即暂时还不能被Consumer消费的消息。Producer成功发送消息到Broker,但此消息被标记为 “暂不可投递” 状态,只有Producer执行完本地事务后对该消息进行二次确认之后,Consumer才能消费此条消息。

    如何做到写入消息但是对consumer不可见呢?RocketMQ事务消息的做法是:如果消息是半消息,将备份原消息的主题与消息消费队列,然后改变主题为RMQ_SYS_TRANS_HALF_TOPIC。由于消费组未订阅该主题,故consumer无法消费半消息,然后RocketMQ会开启一个定时任务,从Topic为RMQ_SYS_TRANS_HALF_TOPIC中拉取消息进行消费,根据生产者组获取一个服务提供者发送回查事务状态请求,根据事务状态来决定是提交或回滚消息。

  • 消息回查:因为网络闪段、producer宕机等,导致Producer 一直没有对半消息进行二次确认。这时broker会定时扫描长期处于半消息的消息,主动询问Producer该消息的最终状态(Commit或者Rollback)

基于半消息和消息回查,就能实现分布式事务,整个过程是异步的:

  1. producer发送半消息给broker,消息携带consumer即将要+100元的信息。

  2. broker回复producer半消息发送成功,producer就知道半消息发送成功。

  3. producer执行本地事务,结果会有三种情况:1、执行成功。2、执行失败。3、网络等原因导致没有响应

    • 如果本地事务成功,producer向broker发送Commit,消费者可以消费该message,执行他自己的本地事务。

    • 如果本地事务失败,producer向broker发送Rollback,broker直接删除这条半消息(若开启了死信保存功能,则将该消息放入死信topic的队列,3天后自动删除,默认不开启,但一般都是开启的)。

    • 如果因为网络等原因迟迟没有返回失败还是成功,那么会执行RocketMQ的回调接口,进行事务的消息回查

image-20230126133256774

消息回溯?

Consumer已经消费成功的消息,由于业务上的需求需要重新消费,Broker在向Consumer投递成功消息后,消息仍然需要保留。例如由于Consumer系统故障,恢复后需要重新消费1小时前的数据,那么Broker要提供一种机制,可以按照时间维度(一般是这个维度)来回退消费进度。

RocketMQ支持按照时间回溯消费,时间维度精确到毫秒,可以向前回溯,也可以向后回溯。

如何实现消息过滤?

有两种方案:

  • Broker过滤
    • 优:避免无用的消息传到 Consumer
    • 劣:加重了 Broker 的负担,实现起来相对复杂
  • Consumer过滤(常用,如果希望提高吞吐量,可以采用Broker过滤。)
    • 优:实现起来简单,可以完全自定义实现
    • 劣:大量无用的消息到达了 Consumer 端只能丢弃不处理,吞吐量比broker过滤低

有3种方式:

  • Tag过滤:最常用,高效简单
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE");
consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");
  • SQL 表达式过滤:更灵活
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
// 只有订阅的消息有这个属性a, a >=0 and a <= 3
consumer.subscribe("TopicTest", MessageSelector.bySql("a between 0 and 3");
consumer.registerMessageListener(new MessageListenerConcurrently() {
   @Override
   public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
       return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
   }
});
consumer.start();
  • Filter Server 过滤:最灵活,也是最复杂的一种方式,允许用户自定义函数进行过滤

消息长轮询?

Consumer 拉取消息时,如果对应的queue没有数据,Broker 不会立即返回,而是把该PullReuqest 挂起,有一个服务线程会不停地检查,看queue中是否有数据或超时。当 queue 有了消息就能返回,或长轮询阻塞时间到了,再重新处理该 queue 上的所有 PullRequest。

  • PullMessageProcessor#processRequest
//如果没有拉到数据
case ResponseCode.PULL_NOT_FOUND:
// broker 和 consumer 都允许 suspend,默认开启
if (brokerAllowSuspend && hasSuspendFlag) {
    long pollingTimeMills = suspendTimeoutMillisLong;
    if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
        pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();
    }

    String topic = requestHeader.getTopic();
    long offset = requestHeader.getQueueOffset();
    int queueId = requestHeader.getQueueId();
    //封装一个PullRequest
    PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
            this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter);
    //把PullRequest挂起
    this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
    response = null;
    break;
}
  • PullRequestHoldService#run()
@Override
public void run() {
    log.info("{} service started", this.getServiceName());
    while (!this.isStopped()) {
        try {
            if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {
                this.waitForRunning(5 * 1000);
            } else {
                this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills());
            }

            long beginLockTimestamp = this.systemClock.now();
            // 循环检查hold住的请求
            this.checkHoldRequest();
            long costTime = this.systemClock.now() - beginLockTimestamp;
            if (costTime > 5 * 1000) {
                log.info("[NOTIFYME] check hold request cost {} ms.", costTime);
            }
        } catch (Throwable e) {
            log.warn(this.getServiceName() + " service has exception. ", e);
        }
    }

    log.info("{} service end", this.getServiceName());
}

为什么RocketMQ不使用Zookeeper作为注册中心呢?

Kafka我们都知道采用Zookeeper作为注册中心——当然也开始逐渐去Zookeeper,RocketMQ不使用Zookeeper其实主要可能从这几方面来考虑:

  1. 基于可用性的考虑,根据CAP理论,同时最多只能满足两个点,而Zookeeper满足的是CP,也就是说Zookeeper并不能保证服务的可用性,Zookeeper在进行选举的时候,整个选举的时间太长,期间整个集群都处于不可用的状态,而这对于一个注册中心来说肯定是不能接受的,作为服务发现来说就应该是为可用性而设计。
  2. 基于性能的考虑,NameServer本身的实现非常轻量,而且可以通过增加机器的方式水平扩展,增加集群的抗压能力,而Zookeeper的写是不可扩展的,Zookeeper要解决这个问题只能通过划分领域,划分多个Zookeeper集群来解决,首先操作起来太复杂,其次这样还是又违反了CAP中的A的设计,导致服务之间是不连通的。
  3. 持久化的机制来带的问题,ZooKeeper 的 ZAB 协议对每一个写请求,会在每个 ZooKeeper 节点上保持写一个事务日志,同时再加上定期的将内存数据镜像(Snapshot)到磁盘来保证数据的一致性和持久性,而对于一个简单的服务发现的场景来说,这其实没有太大的必要,这个实现方案太重了。而且本身存储的数据应该是高度定制化的。
  4. 消息发送应该弱依赖注册中心,而RocketMQ的设计理念也正是基于此,生产者在第一次发送消息的时候从NameServer获取到Broker地址后缓存到本地,如果NameServer整个集群不可用,短时间内对于生产者和消费者并不会产生太大影响。
上一篇 下一篇