RocketMQ源码:consumer 提交消费偏移量

1.什么是消费偏移量offset?

我们先看一幅图

消费偏移量offset就是记录消费者的消费进度的。也是rocketmq保证消息不会重复消费的核心(当然,极端情况下还是可能会导致重复消费)。

consumequeue中一个消息的索引单元就是一个offset值。

在分析rocketmq的消费者是如何利用这个offset完成消息消费的之前,我们先看下broker端是如何管理这些offset值的。

2. 服务端管理offset

这里的服务端就是broker

  1. broker在初始化(initialize())时会通过消费者offset管理类ConsumerOffsetManager来加载配置文件中的offset值,然后设置到offsetTable属性中。
public class ConsumerOffsetManager extends ConfigManager {


    //TODO: key = topic@group
    //TODO: value = Map[key = queueId, value = offset]
    private ConcurrentMap> offsetTable =
        new ConcurrentHashMap>(512);
        
        //TOOD:...other
}        
复制代码

offset文件默认路径:$user.home/store/config/consumerOffset.json 文件内容例子:

{
        "offsetTable":{
                //topic@consumerGroupName
                "batchTopic@test_cg_batch":{0:0,1:0,2:0,3:1
                },
                //key是queueid,value是offset值,就是我们今天讨论的主角
                "ordered_topic@CG":{0:0,1:15,2:0,3:35
                },
                "qiuguan_topic@qiuguan_group_1":{0:2533,1:2534,2:2531,3:2531
                },
                "hacker_topic@fuyuanhui_group_2":{0:64035,1:64034,2:64034,3:64034
                },
                "qiuguan_topic_2@qiuguan_group":{0:2,1:1,2:7,3:6
                },
                "qiuguan_topic@qiuguan_group":{0:2533,1:2534,2:2531,3:2531
                }
        }
}        
复制代码
  1. 消费者消费后,会将offset发送到broker,这里会先写入到上面的消费者offset管理类ConsumerOffsetManager的offsetTable中,然后通过定时任务将其刷盘到文件中。属性中。然后通过3将数据持久化到文件中

稍后我们分析消费者时还会看到

  1. broker在初始化(initialize())时,会启动定时任务,每隔5秒执行一次初始化,将ConsumerOffsetManager的offsetTable属性中的值持久化到文件中。
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            BrokerController.this.consumerOffsetManager.persist();
        } catch (Throwable e) {
            log.error("schedule persist consumerOffset error.", e);
        }
    }
}, 1000 * 10, 1000 * 5, TimeUnit.MILLISECONDS);
复制代码

那么服务端对于offset值的管理大致就这些,那么我们来看下消费者是如何利用offset来进行消息消费的。

总的来说就是,消费者定时的将消费过的offset值上传到broker的内存offsetTable中,然后通过定时任务将其刷盘到文件中。

那么接下来就看看消费者是如何使用这个offset值的。

3.消费者使用offset

3.1 消费者初始化offset

它会启动一个消息拉取服务PullMessageService对象,还有一个是在拉取消息之前要完成的重平衡RebalanceService对象。offset初始化就和重平衡息息相关,那么我们就看下重平衡是如何完成offset初始化的。

我们这里还是只讨论集群消费模式。它和广播模式的区别就是,广播模式每个消费者都要消费topic下的所有队列,集群模式通过分配算法(默认是平均)来将topic下的所有队列分配给消费者。既然这里我们主要讨论的是offset,那么就以集群模式进行分析即可。

这里我们就只看和offset初始化相关的部分

RebalanceService#run()一步一步来到初始化offset的地方

private boolean updateProcessQueueTableInRebalance(final String topic, final Set mqSet,
    final boolean isOrder) {
    boolean changed = false;

    ...

    /**
     * 遍历本次负载分配到的队列集合,如果
     * processQueueTable中没有包含该消息队列,表明这是本次新增加的消
     * 息队列,首先从内存中移除该消息队列的消费进度,然后从磁盘中读
     * 取该消息队列的消费进度,创建PullRequest对象。这里有一个关键,
     * 如果读取到的消费进度小于0,则需要校对消费进度。RocketMQ提供了
     * CONSUME_FROM_LAST_OFFSET、CONSUME_FROM_FIRST_OFFSET、
     * CONSUME_FROM_TIMESTAMP方式,在创建消费者时可以通过调用
     * DefaultMQPushConsumer#setConsumeFromWhere方法进行设置
     */
    List pullRequestList = new ArrayList();
    for (MessageQueue mq : mqSet) {
        if (!this.processQueueTable.containsKey(mq)) {

            /**
             * 经过消息队列重新负载(分配)后,分配到新的消息队列时,首
             * 先需要尝试向Broker发起锁定该消息队列的请求,如果返回加锁成
             * 功,则创建该消息队列的拉取任务,否则跳过,等待其他消费者释放
             * 该消息队列的锁,然后在下一次队列重新负载时再尝试加锁
             */
            // 顺序消息
            if (isOrder && !this.lock(mq)) {
                log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
                continue;
            }

            this.removeDirtyOffset(mq);
            ProcessQueue pq = new ProcessQueue();
            // todo PullRequest的nextOffset计算逻辑位于RebalancePushImpl#computePullFromWhere
            long nextOffset = this.computePullFromWhere(mq);
            if (nextOffset >= 0) {
                ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
                if (pre != null) {
                    log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
                } else {
                    log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
                    PullRequest pullRequest = new PullRequest();
                    pullRequest.setConsumerGroup(consumerGroup);
                    pullRequest.setNextOffset(nextOffset);
                    pullRequest.setMessageQueue(mq);
                    pullRequest.setProcessQueue(pq);
                    pullRequestList.add(pullRequest);
                    changed = true;
                }
            } else {
                log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
            }
        }
    }

    // todo 将PullRequest加入PullMessageService,以便唤醒PullMessageService线程
    this.dispatchPullRequest(pullRequestList);

    return changed;
}
复制代码

初始化 offset值的地方就是 computePullFromWhere(mq)方法,那么点进去看下它的内部逻辑: org.apache.rocketmq.client.impl.consumer.RebalancePushImpl#computePullFromWhere:

public long computePullFromWhere(MessageQueue mq) {
    long result = -1;
    final ConsumeFromWhere consumeFromWhere = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeFromWhere();
    // 获取 offsetStore
    final OffsetStore offsetStore = this.defaultMQPushConsumerImpl.getOffsetStore();
    switch (consumeFromWhere) {
        case CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST:
        case CONSUME_FROM_MIN_OFFSET:
        case CONSUME_FROM_MAX_OFFSET:
        // 从队列最新偏移量开始消费
        case CONSUME_FROM_LAST_OFFSET: {
            /**
             * 从远程中读取消息队列的消费进度,如果大于0则直接返回,如果等
             * 于-1,在CONSUME_FROM_LAST_OFFSET模式下获取该消息队列当前最大
             * 的偏移量,如果小于-1,表示该消息进度文件中存储了错误的偏移
             * 量,则返回-1
             */
            // todo 读取操作
            long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
            if (lastOffset >= 0) {
                result = lastOffset;
            }
            // First start,no offset
            else if (-1 == lastOffset) {
                if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    result = 0L;
                } else {
                    try {
                        result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
                    } catch (MQClientException e) {
                        result = -1;
                    }
                }
            } else {
                result = -1;
            }
            break;
        }
        // 从头开始消费
        case CONSUME_FROM_FIRST_OFFSET: {
            /**
             * 从磁盘中读取消息队列的消费进度,如果大于0则直接返回,如果
             * 等于-1,在CONSUME_FROM_FIRST_OFFSET模式下直接返回0,从头开始
             * 消费,如果小于-1,表示该消息进度文件中存储了错误的偏移量,则
             * 返回-1
             */
            long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
            if (lastOffset >= 0) {
                result = lastOffset;
            } else if (-1 == lastOffset) {
                result = 0L;
            } else {
                result = -1;
            }
            break;
        }
        // 从消费者启动时间戳对应消费进度开始消费
        case CONSUME_FROM_TIMESTAMP: {
            /**
             * 从磁盘中读取消息队列的消费进度,如果大于0则直接返回。如果
             * 等于-1,在CONSUME_FROM_TIMESTAMP模式下会尝试将消息存储时间戳
             * 更新为消费者启动的时间戳,如果能找到则返回找到的偏移量,否则
             * 返回0。如果小于-1,表示该消息进度文件中存储了错误的偏移量,则
             * 返回-1
             */
            long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
            if (lastOffset >= 0) {
                result = lastOffset;
            } else if (-1 == lastOffset) {
                if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    try {
                        result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
                    } catch (MQClientException e) {
                        result = -1;
                    }
                } else {
                    try {
                        long timestamp = UtilAll.parseDate(this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeTimestamp(),
                            UtilAll.YYYYMMDDHHMMSS).getTime();
                        result = this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp);
                    } catch (MQClientException e) {
                        result = -1;
                    }
                }
            } else {
                result = -1;
            }
            break;
        }

        default:
            break;
    }

    return result;
}
复制代码

总之就是无论哪种配置策略都要先从broker读取offset,如果“没有”,则按照客户端的规则进行offset的初始化。

所以这里就是确定好初始offset. 确定好offset后,就可以构建PullRequest了,我们继续看前面的代码:

            // todo PullRequest的nextOffset计算逻辑位于RebalancePushImpl#computePullFromWhere
            long nextOffset = this.computePullFromWhere(mq);
            if (nextOffset >= 0) {
                ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
                if (pre != null) {
                    log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
                } else {
                    log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
                    PullRequest pullRequest = new PullRequest();
                    pullRequest.setConsumerGroup(consumerGroup);
                    pullRequest.setNextOffset(nextOffset);
                    pullRequest.setMessageQueue(mq);
                    pullRequest.setProcessQueue(pq);
                    pullRequestList.add(pullRequest);
                    changed = true;
                }
            } 
复制代码

这里就比较熟悉了,前面分析消费流程的时候我们也看到过,接下来就是将填充好MessageQueue(主要就是queueid), ProcessQueue(消费者本地缓存消息的队列),nextOffset(消费偏移量)的 PullRequest 对象放入拉取消息的类PullMessageService 中的pullRequestQueue队列属性中,这样PullMessageService 就可以去broker拉取消息了。

拉取消息的过程我们还只是关注消费偏移量(nextOffset)的变化

3.3 利用offset拉取消息

@Override
public void run() {
    log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        try {
            //TODO:从队列中取出 PullRequest
            PullRequest pullRequest = this.pullRequestQueue.take();
            //TODO:拉取消息
            this.pullMessage(pullRequest);
        } catch (InterruptedException ignored) {
        } catch (Exception e) {
            log.error("Pull Message Service Run Method exception", e);
        }
    }

    log.info(this.getServiceName() + " service end");
}
复制代码

那么在拉取消息的过程中,offset是如何工作的呢?我们用一幅图来展示:

那么我们在从源码中简单看下拉取过程:

DefaultMQPushConsumerImpl#pullMessage(PullRequest pullRequest)

public void pullMessage(final PullRequest pullRequest) {
    
    //TODO:....省略诸多流控代码.......
    
    
    //TODO:消息从broker拉取成功后的本地回调处理
    //TODO:PullResult 包含了从broker读取的消息,以及新的offset
    PullCallback pullCallback = new PullCallback() {
        @Override
        public void onSuccess(PullResult pullResult) {
            if (pullResult != null) {
                pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
                    subscriptionData);

                switch (pullResult.getPullStatus()) {
                    case FOUND:
                        long prevRequestOffset = pullRequest.getNextOffset();
                       
                        //TODO:重点关注
                        //TODO:将broker返回的新的offset值重新设置给PullRequest的 nextOffset
                        //TODO:下次就用新的offset去broker读取消息
                        pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                        long pullRT = System.currentTimeMillis() - beginTimestamp;
                        DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
                            pullRequest.getMessageQueue().getTopic(), pullRT);

                        long firstMsgOffset = Long.MAX_VALUE;
                        if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
                            DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                        } else {
                            firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();

                            DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
                                pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());

                            boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
                            DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
                                pullResult.getMsgFoundList(),
                                processQueue,
                                pullRequest.getMessageQueue(),
                                dispatchToConsume);

                            if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
                                DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
                                    DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
                            } else {
                              
                                //TODO: 重点关注
                                //TODO:再次将PullRequest放入队列中
                                //TODO:还是原来的PullRequest对象,只不过offset更新了
                                DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                            }
                        }
                         
                        //TODO:...省略部分代码......
                       
                        break;
                    case NO_NEW_MSG:
                    case NO_MATCHED_MSG:
                        pullRequest.setNextOffset(pullResult.getNextBeginOffset());

                        DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);

                        DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                        break;
                     
                    default:
                        break;
                }
            }
        }

        @Override
        public void onException(Throwable e) {
            if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                log.warn("execute the pull request exception", e);
            }

            DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
        }
    };
    
    //TODO:...省略部分代码。。。。。
    
    //TODO:去broker拉取消息
    try {
            this.pullAPIWrapper.pullKernelImpl(
                pullRequest.getMessageQueue(),
                subExpression,
                subscriptionData.getExpressionType(),
                subscriptionData.getSubVersion(),
                //TODO:从PullReqeust中获取offset值给broker
                pullRequest.getNextOffset(),
                this.defaultMQPushConsumer.getPullBatchSize(),
                sysFlag,
                commitOffsetValue,
                BROKER_SUSPEND_MAX_TIME_MILLIS,
                CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
                CommunicationMode.ASYNC,
                //TODO:上面构建的回调对象,当从broker拉取到消息后交给回调处理
                pullCallback
            );
        } catch (Exception e) {
            log.error("pullKernelImpl exception", e);
            this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
        }
}
复制代码
复制代码

通过分析源码,我们也大概知道了在拉取消息过程中offset是如何变化的。那么接下来我们在看下offset是如何完成持久化的。

3.4 offset的更新和持久化

在消费者消费完消息后,就需要更新offset并完成持久化,我们我们就看下消费完成后处理消费结果的代码:

ConsumeMessageConcurrentlyService#processConsumeResult(...)

public void processConsumeResult(
    final ConsumeConcurrentlyStatus status,
    final ConsumeConcurrentlyContext context,
    final ConsumeRequest consumeRequest
) {
    //获取ackIndex的值,Integer.MAX_VALUE
    int ackIndex = context.getAckIndex();

    if (consumeRequest.getMsgs().isEmpty())
        return;

    /**
     * 根据消息监听器返回的结果计算ackIndex,如果返回
     * CONSUME_SUCCESS,则将ackIndex设置为msgs.size()-1,如果返回
     * RECONSUME_LATER,则将ackIndex设置为-1,这是为下文发送msg
     * back(ACK)消息做的准备
     */
    switch (status) {
        case CONSUME_SUCCESS:
            if (ackIndex >= consumeRequest.getMsgs().size()) {
                ackIndex = consumeRequest.getMsgs().size() - 1;
            }
            int ok = ackIndex + 1;
            int failed = consumeRequest.getMsgs().size() - ok;
            this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
            this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
            break;
        case RECONSUME_LATER:
            //如果消费失败,则将ackIndex置为-1
            ackIndex = -1;
            this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
                consumeRequest.getMsgs().size());
            break;
        default:
            break;
    }

    /**
     * 如果是广播模式,业务方会返回RECONSUME_LATER,消息
     * 并不会被重新消费,而是以警告级别输出到日志文件中。如果是集群
     * 模式,消息消费成功,因为ackIndex=
     * consumeRequest.getMsgs().size()-1,所以i=ackIndex+1等于
     * consumeRequest.getMsgs().size(),并不会执行sendMessageBack。
     * 只有在业务方返回RECONSUME_LATER时,该批消息都需要发送ACK消
     * 息,如果消息发送失败,则直接将本批ACK消费发送失败的消息再次封
     * 装为ConsumeRequest,然后延迟5s重新消费。如果ACK消息发送成功,
     * 则该消息会延迟消费
     */
    switch (this.defaultMQPushConsumer.getMessageModel()) {
        case BROADCASTING:
            for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                MessageExt msg = consumeRequest.getMsgs().get(i);
                log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
            }
            break;
        case CLUSTERING:
            List msgBackFailed = new ArrayList(consumeRequest.getMsgs().size());
            for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                MessageExt msg = consumeRequest.getMsgs().get(i);
                // todo 发回到broker
                boolean result = this.sendMessageBack(msg, context);
                if (!result) {
                    msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
                    msgBackFailed.add(msg);
                }
            }

            if (!msgBackFailed.isEmpty()) {
                consumeRequest.getMsgs().removeAll(msgBackFailed);

                this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
            }
            break;
        default:
            break;
    }

    /**
     * 从ProcessQueue中移除这批消息,这里返回的偏移量
     * 是移除该批消息后最小的偏移量。然后用该偏移量更新消息消费进
     * 度,以便消费者重启后能从上一次的消费进度开始消费,避免消息重
     * 复消费。值得注意的是,当消息监听器返回RECONSUME_LATER时,消息
     * 消费进度也会向前推进,并用ProcessQueue中最小的队列偏移量调用
     * 消息消费进度存储器OffsetStore更新消费进度。这是因为当返回
     * RECONSUME_LATER时,RocketMQ会创建一条与原消息属性相同的消息,
     * 拥有一个唯一的新msgId,并存储原消息ID,该消息会存入CommitLog
     * 文件,与原消息没有任何关联,所以该消息也会进入ConsuemeQueue,
     * 并拥有一个全新的队列偏移量
     */
    long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
    if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
        // todo 更新偏移量
        this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
    }
}
复制代码

我们继续看下updateOffset的逻辑:

这里有两个实现类

那么我们就看下RemoteBrokerOffsetStore的updateOffset的逻辑:

public class RemoteBrokerOffsetStore implements OffsetStore {
    
    //TOOD:....省略部分代码.......

    @Override
    public void updateOffset(MessageQueue mq, long offset, boolean increaseOnly) {
        if (mq != null) {
            AtomicLong offsetOld = this.offsetTable.get(mq);
            if (null == offsetOld) {
                offsetOld = this.offsetTable.putIfAbsent(mq, new AtomicLong(offset));
            }

            if (null != offsetOld) {
                if (increaseOnly) {
                    MixAll.compareAndIncreaseOnly(offsetOld, offset);
                } else {
                    offsetOld.set(offset);
                }
            }
        }
    }    
}
复制代码

其实很简单,就是将消费过的消息的offset(递增)更新到消费者本地offset变量表中。然后通过定时任务持久化到broker中。

接下来在简单看下消费者客户端持久化offset到broker. 在消费者启动时,会启动一个定时任务:

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

    @Override
    public void run() {
        try {
            MQClientInstance.this.persistAllConsumerOffset();
        } catch (Exception e) {
            log.error("ScheduledTask persistAllConsumerOffset exception", e);
        }
    }
}, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
复制代码

每隔5s将offset持久化到broker

3.5 broker 处理offset

我们在简单看下broker端如何处理offset

3.5.1 首先要接收客户端发送的offset

public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
    
    //TODO:.....省略其他code.....

    @Override
    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
        throws RemotingCommandException {
        switch (request.getCode()) {
            case RequestCode.GET_CONSUMER_LIST_BY_GROUP:
                return this.getConsumerListByGroup(ctx, request);
            //TODO:更新offset    
            case RequestCode.UPDATE_CONSUMER_OFFSET:
                return this.updateConsumerOffset(ctx, request);
            case RequestCode.QUERY_CONSUMER_OFFSET:
                return this.queryConsumerOffset(ctx, request);
            default:
                break;
        }
        return null;
    }
    
    
    //TODO:....省略其他代码
}    
复制代码

最终代码:

public class ConsumerOffsetManager extends ConfigManager {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
    private static final String TOPIC_GROUP_SEPARATOR = "@";


    //TODO: key = topic@group
    //TODO: value = Map[key = queueId, value = offset]
    private ConcurrentMap> offsetTable =
        new ConcurrentHashMap>(512);

    //TODO:....省略诸多code.....

    private void commitOffset(final String clientHost, final String key, final int queueId, final long offset) {
        ConcurrentMap map = this.offsetTable.get(key);
        if (null == map) {
            map = new ConcurrentHashMap(32);
            map.put(queueId, offset);
            this.offsetTable.put(key, map);
        } else {
            Long storeOffset = map.put(queueId, offset);
            if (storeOffset != null && offset < storeOffset) {
                log.warn("[NOTIFYME]update consumer offset less than store. clientHost={}, key={}, queueId={}, requestOffset={}, storeOffset={}", clientHost, key, queueId, offset, storeOffset);
            }
        }
     }
     
     //TODO:......
}
复制代码

其实也很简单,就是将客户端发送的offset更新到broker的offset表中,然后再通过broker的定时任务持久化到文件中。那么我们在简单看下broker的持久化:

3.5.2 broker的持久化

broker启动时,也会启动持久化的定时任务

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            BrokerController.this.consumerOffsetManager.persist();
        } catch (Throwable e) {
            log.error("schedule persist consumerOffset error.", e);
        }
    }
}, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
复制代码

通过定时任务将上面ConsumerOffsetManager类中的offset表offsetTable写到文件中。至此,就完成了offset的更新和持久化。

4.总结

本文从源码的角度分析了RocketMQ的消费偏移量offset是如何被使用的,简单总结下持久化过程:

  1. 消费者客户端本地维护一个offset表,消费者消费完成后先更新到本地offset表中
  2. 消费者客户端启动时会开启一个定时任务,将本地的offset表发送到broker
  3. broker会接收消费者客户端发送的offset数据,并保存到broker的本地offset表中
  4. broker启动时也会开启定时任务,用于将broker的本地offset表中的数据持久化到文件中

5. 最小位点提交机制

所谓的消息消费不丢失消息,就是存储在Broker中的消息,至少要能被成功消费一次

RocketMQ在消息消费时采用了ACK机制,即消息客户端从Broker拉取消息到消费端,只有消息消费端成功将消息消费,才会发送ACK到Broker,broker才会认为该消息消费成功,保证消息不丢失。而且消息在消费时,是采取最小位点提交机制,说明如下:

举例说明:拉取线程从broker拉取了8条消息,到线程池中消费,其中 thread-1线程在消费msg1,thread-2在消费消息msg2,thread3消费消息msg3,此时如何thred3先消费完msg3,但thread1,thread2还未处理完msg1、msg2,那thread-1是向Broker反馈msg3的偏移量?

这个时候为了保证消息不丢失,尽管thread3先将msg3消费完成,但处理队列中的msg1还没有消费成功,此时thread3向broker提交位点时是将msg1的偏移量汇报,当msg1,msg2消费完成时,上报的位点就是msg4了,这样保证消息不丢失。

对应的代码如下: org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService#processConsumeResult:

接下来org.apache.rocketmq.client.impl.consumer.ProcessQueue#removeMessage:

但上述机制会带来消息重复消费,例如当thred3将msg3消费完成,然后向服务端汇报进度为msg1,然后客户端重启,将重新从msg1开始消费,msg3就被重复拉取,重复处理,所谓消费端需要实现幂等。

展开阅读全文

页面更新:2024-05-01

标签:队列   初始化   持久   进度   源码   客户端   消费者   消息   模式   代码   文件

1 2 3 4 5

上滑加载更多 ↓
推荐阅读:
友情链接:
更多:

本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828  

© CopyRight 2008-2024 All Rights Reserved. Powered By bs178.com 闽ICP备11008920号-3
闽公网安备35020302034844号

Top