RocketMQ源码分析:延迟消息

这一节我们来看下RocketMQ的延迟消息

1.发送延迟消息

public class Producer { public static void main(String[] args) throws Exception { // 实例化一个生产者来产生延时消息 DefaultMQProducer producer = new DefaultMQProducer("DELAY_P_G");

      producer.setNamesrvAddr("127.0.0.1:9876"); // 启动生产者 producer.start(); for (int i = 0; i < 1; i++) {
          Message message = new Message(MQConstant.DELAY_TOPIC, ("Hello scheduled message " + i).getBytes()); /**
           * MessageStoreConfig
           * messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
           *
           * 共18个等级,依次是从1-18
           * 比如,level=3, 表示延迟10s 消费
           */ message.setDelayTimeLevel(4); // 发送消息 SendResult send = producer.send(message);
          System.out.println("send = " + send);
      } // 关闭生产者 producer.shutdown();
  }
}

延迟消息的标志就是在发送时,通过消息对象Message的setDelayTimeLevel(int level)方法设置一个延迟等级,这样该条消息就是一个延迟消息了。那么延迟等级与延迟时间是如何对应的呢?


2.存储延迟消息

其实延迟消息和普通消息并没有多大的差异,只不过broker在存储消息时,会判断消息的延迟属性是否为空,如果不为空,则判断是延迟消息,进而会做一些额外的处理,那么我们就看下broker存储时判断是否为延迟消息的逻辑:

CommitLog#asyncPutMessage(..)

public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) { // Set the storage time msg.setStoreTimestamp(System.currentTimeMillis()); // Set the message body BODY CRC (consider the most appropriate setting // on the client) msg.setBodyCRC(UtilAll.crc32(msg.getBody())); // Back to Results AppendMessageResult result = null;

    StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();

    String topic = msg.getTopic(); int queueId = msg.getQueueId(); final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag()); if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
            || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) { //TODO:延迟消息的判定 if (msg.getDelayTimeLevel() > 0) { if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
            } //TODO:将延迟消息的topic替换为broker固定的topic: SCHEDULE_TOPIC_XXXX topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC; //TODO: 将queueid替换为(延迟级别-1) queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel()); //TODO:备份原始的topic/queueid, 留着后面解析 MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
            msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties())); //TODO:将消息topic设置为延迟topic,这样订阅该topic的消费者不能及时去消费了 //TODO:等到延迟时间到了,将延迟topic在替换成原始topic,消费者就可以消费了 msg.setTopic(topic);
            msg.setQueueId(queueId);
        }
    } //TODO:....省略后续存储逻辑,和普通消息一样 }

其实它要做的事情很简单,简单总结下:

  1. 将原始topic替换为延迟消息固定的topic:SCHEDULE_TOPIC_XXXX

所有的延时消息共用这一个topic

  1. 将原始queueid替换为(延迟级别-1)
相同延迟级别的消息会在同一个队列中

  1. 备份原始topic/queueid, 保存到原始消息的properties属性中

这样就处理完了一条延迟消息,然后就是存储消息,和普通一样,这里就不展示了。

  1. 不过在消息分发(构建消息索引)时,将索引单元的的tag hashcode 替换为消息的投递时间

3. 延迟消息的投递

上面broker将延迟消息写到了commitlog中,由于broker替换了我们的原始topic,所以订阅该topic的消费者此时还无法消费该消息,只有当时间到了消费者才可以消费,那么我们就看下broker是如何处理的。 首先处理延迟消息的是ScheduleMessageService类,我们简单看下它的类结构:

broker启动时,会启动该类

public class ScheduleMessageService extends ConfigManager { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); private static final long FIRST_DELAY_TIME = 1000L; private static final long DELAY_FOR_A_WHILE = 100L; private static final long DELAY_FOR_A_PERIOD = 10000L; //TODO:broker启动时会初始化这个Map,key是延迟等级,共计18个,value就是延迟等级对应的时间 private final ConcurrentMap<Integer /* level */, Long/* delay timeMillis */> delayLevelTable = new ConcurrentHashMap<Integer, Long>(32); private final ConcurrentMap<Integer /* level */, Long/* offset */> offsetTable = new ConcurrentHashMap<Integer, Long>(32); //TODO:省略其他属性和方法 //TODO:broker启动时,会调用该方法 public void start() { if (started.compareAndSet(false, true)) { super.load(); this.timer = new Timer("ScheduleMessageTimerThread", true); for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
                Integer level = entry.getKey();
                Long timeDelay = entry.getValue();
                Long offset = this.offsetTable.get(level); if (null == offset) {
                    offset = 0L;
                } if (timeDelay != null) { //TODO:处理延迟消息 this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
                }
            } this.timer.scheduleAtFixedRate(new TimerTask() { @Override public void run() { try { //TODO:持久化 if (started.get()) ScheduleMessageService.this.persist();
                    } catch (Throwable e) {
                        log.error("scheduleAtFixedRate flush exception", e);
                    }
                }
            }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
        }
    } //TODO:省略其他方法 }

关注的地方主要就是2个,一个是处理延迟消息,一个是持久化,那么我们就分别看下:

3.1 处理延迟消息

Broker中同一等级的所有延时消息会被写入到consumequeue 目录中SCHEDULE_TOPIC_XXXX目录下相同Queue中。即一个Queue中消息投递时间的延迟等级时间是相同的。那么投递时间就取决于于 消息存储时间了。即按照消息被发送到Broker的时间进行排序的。

//TODO:遍历延迟等级列表 for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
    Integer level = entry.getKey();
    Long timeDelay = entry.getValue();
    Long offset = this.offsetTable.get(level); if (null == offset) {
        offset = 0L;
    } if (timeDelay != null) { this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
    }
}

delayLevelTable存放的就是如下图:


key是延迟等级,value是对应的延迟时间。共计18个

每一个延迟级别对应一个DeliverDelayedMessageTimerTask,所以相同延迟级别的消息共用同一个线程。

接下来我们就看下DeliverDelayedMessageTimerTask的逻辑:

class DeliverDelayedMessageTimerTask extends TimerTask { private final int delayLevel; private final long offset; public DeliverDelayedMessageTimerTask(int delayLevel, long offset) { this.delayLevel = delayLevel; this.offset = offset;
    } @Override public void run() { try { if (isStarted()) { //TODO:核心逻辑 this.executeOnTimeup();
            }
        } catch (Exception e) { // XXX: warn and notify me log.error("ScheduleMessageService, executeOnTimeup exception", e);
            ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask( this.delayLevel, this.offset), DELAY_FOR_A_PERIOD);
        }
    } //TODO:....省略其他方法....... }

继续看下executeOnTimeup()方法的逻辑,内容比较多,不过还是很容易理解

public void executeOnTimeup() { //TODO:根据延迟topic和延迟queueid 去获取Consumequeue ConsumeQueue cq =
        ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
            delayLevel2QueueId(delayLevel)); long failScheduleOffset = offset; if (cq != null) {
        SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset); if (bufferCQ != null) { try { //TODO:offset用来标记队列读取到哪里了 long nextOffset = offset; int i = 0;
                ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit(); for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) { long offsetPy = bufferCQ.getByteBuffer().getLong(); int sizePy = bufferCQ.getByteBuffer().getInt(); long tagsCode = bufferCQ.getByteBuffer().getLong(); //TODO:....省略部分代码..... long now = System.currentTimeMillis(); //TODO:计算投递时间,时间存储在了tag hashcode 中了 long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);

                    nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE); long countdown = deliverTimestamp - now; //TODO:投递时间到了 if (countdown <= 0) { //TODO:去broker中将消息读取出来 MessageExt msgExt =
                            ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
                                offsetPy, sizePy); if (msgExt != null) { try { //TODO:构建新的消息体,将原来的消息信息设置到这里,并将topic和queueid设置为原始的topic和queueid(前面备份过) MessageExtBrokerInner msgInner = this.messageTimeup(msgExt); if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) {
                                    log.error("[BUG] the real topic of schedule msg is {}, discard the msg. msg={}",
                                            msgInner.getTopic(), msgInner); continue;
                                } //TODO:将消息再次写入commitlog中,topic是原始topic,这样消费者就可以去消费了 PutMessageResult putMessageResult =
                                    ScheduleMessageService.this.writeMessageStore
                                        .putMessage(msgInner); //TODO:....省略部分代码......  } catch (Exception e) { /*
                                 * XXX: warn and notify me
                                 */ log.error( "ScheduleMessageService, messageTimeup execute error, drop it. msgExt=" + msgExt + ", nextOffset=" + nextOffset + ",offsetPy=" + offsetPy + ",sizePy=" + sizePy, e);
                            }
                        }
                    } else {
                        ScheduleMessageService.this.timer.schedule( new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),
                            countdown);
                        ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset); return;
                    }
                } // end of for nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
                ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask( this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
                ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset); return;
            } finally {

                bufferCQ.release();
            }
        } // end of if (bufferCQ != null) else { long cqMinOffset = cq.getMinOffsetInQueue(); if (offset < cqMinOffset) {
                failScheduleOffset = cqMinOffset;
                log.error("schedule CQ offset invalid. offset=" + offset + ", cqMinOffset=" + cqMinOffset + ", queueId=" + cq.getQueueId());
            }
        }
    } // end of if (cq != null) ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
        failScheduleOffset), DELAY_FOR_A_WHILE);
}

我们简单对代码总结下:

  1. 根据延迟topic和延迟queueid获取consumequeue,并从队列中读取索引单元

  1. 计算消息的投递时间。从索引单元中取出消息的保存时间(延迟消息的索引单元会将tag hashcode 替换为消息的存储时间),然后根据延迟等级获取出延迟时间,然后二者相加就是消息的投递时间。
  2. 如果投递时间到了

3.1 则根据索引单元中的commitlog offset 和 msg size 将该条消息A从commitlog中读取出 来.
3.2 将读取出来的消息属性复制到一个新的消息对象体B中,将A中备份的原始topic, queueid 读取 出来重新设置到B中,并清除延迟属性,使其成为一条普通消息.
3.3 调用CommitLog#putMessage(msg)方法,再次将消息B写入到commitlog中。这样消费者就可以消费到订阅了该topic的消息。

  1. 如果投递时间没到

4.1 计算剩余投递时间countdown(投递时间-当前时间), 然后开启一个JDK的Timer延迟任务,延迟时间就是countdown,继续执行DeliverDelayedMessageTimerTask的逻辑。

  1. 更新延迟消息队列的消费进度(后面持久化也就是指的它)

这里简单说下:同一个Queue(delayLevel - 1)中消息投递时间的延迟等级是相同的。那么投递时间就取决于消息存储时间了。即按照消息被发送到Broker的时间进行排序的。


3.2 持久化

持久化其实也非常的简单,就是通过定时任务,每隔10s将延迟队列的消费进度offset写到文件中。

文件默认路径:$user.home/store/config/delayOffset.json


key 就是延迟等级,value 就是对应的消费进度offset。

4.总结

本文从源码的角度分析了RocketMq是如何发送延迟消息的,那么我们就简单总结下:

  1. 发送消息时,通过setDelayTimeLevel(int level) 来设置延迟等级,RocketMQ默认支持18种延迟等级,每个延迟等级对应不同的延迟时间
  2. 所有延迟消息共用一个topic: SCHEDULE_TOPIC_XXXX
  3. 相同延迟等级的消息会放到同一个队列中(queueid=delayLevel - 1)
  4. 相同等级的消息会被同一个线程去处理
#Java##程序员##计算机##编程#
全部评论
有点不好理解
点赞 回复 分享
发布于 2022-08-08 16:20

相关推荐

代码飞升_不回私信人...:别这样贬低自己,降低预期,放平心态,跟昨天的自己比。做好自己,反而会效率更高心态更好,加油兄弟
点赞 评论 收藏
分享
评论
2
4
分享

创作者周榜

更多
牛客网
牛客网在线编程
牛客网题解
牛客企业服务