我们先看一幅图
消费偏移量offset就是记录消费者的消费进度的。也是rocketmq保证消息不会重复消费的核心(当然,极端情况下还是可能会导致重复消费)。
consumequeue中一个消息的索引单元就是一个offset值。
在分析rocketmq的消费者是如何利用这个offset完成消息消费的之前,我们先看下broker端是如何管理这些offset值的。
这里的服务端就是broker
public class ConsumerOffsetManager extends ConfigManager {
//TODO: key = topic@group
//TODO: value = Map[key = queueId, value = offset]
private ConcurrentMap> offsetTable =
new ConcurrentHashMap>(512);
//TOOD:...other
}
复制代码
offset文件默认路径:$user.home/store/config/consumerOffset.json 文件内容例子:
{
"offsetTable":{
//topic@consumerGroupName
"batchTopic@test_cg_batch":{0:0,1:0,2:0,3:1
},
//key是queueid,value是offset值,就是我们今天讨论的主角
"ordered_topic@CG":{0:0,1:15,2:0,3:35
},
"qiuguan_topic@qiuguan_group_1":{0:2533,1:2534,2:2531,3:2531
},
"hacker_topic@fuyuanhui_group_2":{0:64035,1:64034,2:64034,3:64034
},
"qiuguan_topic_2@qiuguan_group":{0:2,1:1,2:7,3:6
},
"qiuguan_topic@qiuguan_group":{0:2533,1:2534,2:2531,3:2531
}
}
}
复制代码
稍后我们分析消费者时还会看到
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.consumerOffsetManager.persist();
} catch (Throwable e) {
log.error("schedule persist consumerOffset error.", e);
}
}
}, 1000 * 10, 1000 * 5, TimeUnit.MILLISECONDS);
复制代码
那么服务端对于offset值的管理大致就这些,那么我们来看下消费者是如何利用offset来进行消息消费的。
总的来说就是,消费者定时的将消费过的offset值上传到broker的内存offsetTable中,然后通过定时任务将其刷盘到文件中。
那么接下来就看看消费者是如何使用这个offset值的。
它会启动一个消息拉取服务PullMessageService对象,还有一个是在拉取消息之前要完成的重平衡RebalanceService对象。offset初始化就和重平衡息息相关,那么我们就看下重平衡是如何完成offset初始化的。
我们这里还是只讨论集群消费模式。它和广播模式的区别就是,广播模式每个消费者都要消费topic下的所有队列,集群模式通过分配算法(默认是平均)来将topic下的所有队列分配给消费者。既然这里我们主要讨论的是offset,那么就以集群模式进行分析即可。
这里我们就只看和offset初始化相关的部分
RebalanceService#run()一步一步来到初始化offset的地方
private boolean updateProcessQueueTableInRebalance(final String topic, final Set mqSet,
final boolean isOrder) {
boolean changed = false;
...
/**
* 遍历本次负载分配到的队列集合,如果
* processQueueTable中没有包含该消息队列,表明这是本次新增加的消
* 息队列,首先从内存中移除该消息队列的消费进度,然后从磁盘中读
* 取该消息队列的消费进度,创建PullRequest对象。这里有一个关键,
* 如果读取到的消费进度小于0,则需要校对消费进度。RocketMQ提供了
* CONSUME_FROM_LAST_OFFSET、CONSUME_FROM_FIRST_OFFSET、
* CONSUME_FROM_TIMESTAMP方式,在创建消费者时可以通过调用
* DefaultMQPushConsumer#setConsumeFromWhere方法进行设置
*/
List pullRequestList = new ArrayList();
for (MessageQueue mq : mqSet) {
if (!this.processQueueTable.containsKey(mq)) {
/**
* 经过消息队列重新负载(分配)后,分配到新的消息队列时,首
* 先需要尝试向Broker发起锁定该消息队列的请求,如果返回加锁成
* 功,则创建该消息队列的拉取任务,否则跳过,等待其他消费者释放
* 该消息队列的锁,然后在下一次队列重新负载时再尝试加锁
*/
// 顺序消息
if (isOrder && !this.lock(mq)) {
log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
continue;
}
this.removeDirtyOffset(mq);
ProcessQueue pq = new ProcessQueue();
// todo PullRequest的nextOffset计算逻辑位于RebalancePushImpl#computePullFromWhere
long nextOffset = this.computePullFromWhere(mq);
if (nextOffset >= 0) {
ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
if (pre != null) {
log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
} else {
log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
PullRequest pullRequest = new PullRequest();
pullRequest.setConsumerGroup(consumerGroup);
pullRequest.setNextOffset(nextOffset);
pullRequest.setMessageQueue(mq);
pullRequest.setProcessQueue(pq);
pullRequestList.add(pullRequest);
changed = true;
}
} else {
log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
}
}
}
// todo 将PullRequest加入PullMessageService,以便唤醒PullMessageService线程
this.dispatchPullRequest(pullRequestList);
return changed;
}
复制代码
初始化 offset值的地方就是 computePullFromWhere(mq)方法,那么点进去看下它的内部逻辑: org.apache.rocketmq.client.impl.consumer.RebalancePushImpl#computePullFromWhere:
public long computePullFromWhere(MessageQueue mq) {
long result = -1;
final ConsumeFromWhere consumeFromWhere = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeFromWhere();
// 获取 offsetStore
final OffsetStore offsetStore = this.defaultMQPushConsumerImpl.getOffsetStore();
switch (consumeFromWhere) {
case CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST:
case CONSUME_FROM_MIN_OFFSET:
case CONSUME_FROM_MAX_OFFSET:
// 从队列最新偏移量开始消费
case CONSUME_FROM_LAST_OFFSET: {
/**
* 从远程中读取消息队列的消费进度,如果大于0则直接返回,如果等
* 于-1,在CONSUME_FROM_LAST_OFFSET模式下获取该消息队列当前最大
* 的偏移量,如果小于-1,表示该消息进度文件中存储了错误的偏移
* 量,则返回-1
*/
// todo 读取操作
long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
if (lastOffset >= 0) {
result = lastOffset;
}
// First start,no offset
else if (-1 == lastOffset) {
if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
result = 0L;
} else {
try {
result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
} catch (MQClientException e) {
result = -1;
}
}
} else {
result = -1;
}
break;
}
// 从头开始消费
case CONSUME_FROM_FIRST_OFFSET: {
/**
* 从磁盘中读取消息队列的消费进度,如果大于0则直接返回,如果
* 等于-1,在CONSUME_FROM_FIRST_OFFSET模式下直接返回0,从头开始
* 消费,如果小于-1,表示该消息进度文件中存储了错误的偏移量,则
* 返回-1
*/
long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
if (lastOffset >= 0) {
result = lastOffset;
} else if (-1 == lastOffset) {
result = 0L;
} else {
result = -1;
}
break;
}
// 从消费者启动时间戳对应消费进度开始消费
case CONSUME_FROM_TIMESTAMP: {
/**
* 从磁盘中读取消息队列的消费进度,如果大于0则直接返回。如果
* 等于-1,在CONSUME_FROM_TIMESTAMP模式下会尝试将消息存储时间戳
* 更新为消费者启动的时间戳,如果能找到则返回找到的偏移量,否则
* 返回0。如果小于-1,表示该消息进度文件中存储了错误的偏移量,则
* 返回-1
*/
long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
if (lastOffset >= 0) {
result = lastOffset;
} else if (-1 == lastOffset) {
if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
try {
result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
} catch (MQClientException e) {
result = -1;
}
} else {
try {
long timestamp = UtilAll.parseDate(this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeTimestamp(),
UtilAll.YYYYMMDDHHMMSS).getTime();
result = this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp);
} catch (MQClientException e) {
result = -1;
}
}
} else {
result = -1;
}
break;
}
default:
break;
}
return result;
}
复制代码
总之就是无论哪种配置策略都要先从broker读取offset,如果“没有”,则按照客户端的规则进行offset的初始化。
所以这里就是确定好初始offset. 确定好offset后,就可以构建PullRequest了,我们继续看前面的代码:
// todo PullRequest的nextOffset计算逻辑位于RebalancePushImpl#computePullFromWhere
long nextOffset = this.computePullFromWhere(mq);
if (nextOffset >= 0) {
ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
if (pre != null) {
log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
} else {
log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
PullRequest pullRequest = new PullRequest();
pullRequest.setConsumerGroup(consumerGroup);
pullRequest.setNextOffset(nextOffset);
pullRequest.setMessageQueue(mq);
pullRequest.setProcessQueue(pq);
pullRequestList.add(pullRequest);
changed = true;
}
}
复制代码
这里就比较熟悉了,前面分析消费流程的时候我们也看到过,接下来就是将填充好MessageQueue(主要就是queueid), ProcessQueue(消费者本地缓存消息的队列),nextOffset(消费偏移量)的 PullRequest 对象放入拉取消息的类PullMessageService 中的pullRequestQueue队列属性中,这样PullMessageService 就可以去broker拉取消息了。
拉取消息的过程我们还只是关注消费偏移量(nextOffset)的变化
@Override
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
//TODO:从队列中取出 PullRequest
PullRequest pullRequest = this.pullRequestQueue.take();
//TODO:拉取消息
this.pullMessage(pullRequest);
} catch (InterruptedException ignored) {
} catch (Exception e) {
log.error("Pull Message Service Run Method exception", e);
}
}
log.info(this.getServiceName() + " service end");
}
复制代码
那么在拉取消息的过程中,offset是如何工作的呢?我们用一幅图来展示:
那么我们在从源码中简单看下拉取过程:
DefaultMQPushConsumerImpl#pullMessage(PullRequest pullRequest)
public void pullMessage(final PullRequest pullRequest) {
//TODO:....省略诸多流控代码.......
//TODO:消息从broker拉取成功后的本地回调处理
//TODO:PullResult 包含了从broker读取的消息,以及新的offset
PullCallback pullCallback = new PullCallback() {
@Override
public void onSuccess(PullResult pullResult) {
if (pullResult != null) {
pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
subscriptionData);
switch (pullResult.getPullStatus()) {
case FOUND:
long prevRequestOffset = pullRequest.getNextOffset();
//TODO:重点关注
//TODO:将broker返回的新的offset值重新设置给PullRequest的 nextOffset
//TODO:下次就用新的offset去broker读取消息
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
long pullRT = System.currentTimeMillis() - beginTimestamp;
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullRT);
long firstMsgOffset = Long.MAX_VALUE;
if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
} else {
firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
pullResult.getMsgFoundList(),
processQueue,
pullRequest.getMessageQueue(),
dispatchToConsume);
if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
} else {
//TODO: 重点关注
//TODO:再次将PullRequest放入队列中
//TODO:还是原来的PullRequest对象,只不过offset更新了
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
}
}
//TODO:...省略部分代码......
break;
case NO_NEW_MSG:
case NO_MATCHED_MSG:
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
break;
default:
break;
}
}
}
@Override
public void onException(Throwable e) {
if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("execute the pull request exception", e);
}
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
}
};
//TODO:...省略部分代码。。。。。
//TODO:去broker拉取消息
try {
this.pullAPIWrapper.pullKernelImpl(
pullRequest.getMessageQueue(),
subExpression,
subscriptionData.getExpressionType(),
subscriptionData.getSubVersion(),
//TODO:从PullReqeust中获取offset值给broker
pullRequest.getNextOffset(),
this.defaultMQPushConsumer.getPullBatchSize(),
sysFlag,
commitOffsetValue,
BROKER_SUSPEND_MAX_TIME_MILLIS,
CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
CommunicationMode.ASYNC,
//TODO:上面构建的回调对象,当从broker拉取到消息后交给回调处理
pullCallback
);
} catch (Exception e) {
log.error("pullKernelImpl exception", e);
this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
}
}
复制代码
复制代码
通过分析源码,我们也大概知道了在拉取消息过程中offset是如何变化的。那么接下来我们在看下offset是如何完成持久化的。
在消费者消费完消息后,就需要更新offset并完成持久化,我们我们就看下消费完成后处理消费结果的代码:
ConsumeMessageConcurrentlyService#processConsumeResult(...)
public void processConsumeResult(
final ConsumeConcurrentlyStatus status,
final ConsumeConcurrentlyContext context,
final ConsumeRequest consumeRequest
) {
//获取ackIndex的值,Integer.MAX_VALUE
int ackIndex = context.getAckIndex();
if (consumeRequest.getMsgs().isEmpty())
return;
/**
* 根据消息监听器返回的结果计算ackIndex,如果返回
* CONSUME_SUCCESS,则将ackIndex设置为msgs.size()-1,如果返回
* RECONSUME_LATER,则将ackIndex设置为-1,这是为下文发送msg
* back(ACK)消息做的准备
*/
switch (status) {
case CONSUME_SUCCESS:
if (ackIndex >= consumeRequest.getMsgs().size()) {
ackIndex = consumeRequest.getMsgs().size() - 1;
}
int ok = ackIndex + 1;
int failed = consumeRequest.getMsgs().size() - ok;
this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
break;
case RECONSUME_LATER:
//如果消费失败,则将ackIndex置为-1
ackIndex = -1;
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
consumeRequest.getMsgs().size());
break;
default:
break;
}
/**
* 如果是广播模式,业务方会返回RECONSUME_LATER,消息
* 并不会被重新消费,而是以警告级别输出到日志文件中。如果是集群
* 模式,消息消费成功,因为ackIndex=
* consumeRequest.getMsgs().size()-1,所以i=ackIndex+1等于
* consumeRequest.getMsgs().size(),并不会执行sendMessageBack。
* 只有在业务方返回RECONSUME_LATER时,该批消息都需要发送ACK消
* 息,如果消息发送失败,则直接将本批ACK消费发送失败的消息再次封
* 装为ConsumeRequest,然后延迟5s重新消费。如果ACK消息发送成功,
* 则该消息会延迟消费
*/
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
}
break;
case CLUSTERING:
List msgBackFailed = new ArrayList(consumeRequest.getMsgs().size());
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
// todo 发回到broker
boolean result = this.sendMessageBack(msg, context);
if (!result) {
msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
msgBackFailed.add(msg);
}
}
if (!msgBackFailed.isEmpty()) {
consumeRequest.getMsgs().removeAll(msgBackFailed);
this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
}
break;
default:
break;
}
/**
* 从ProcessQueue中移除这批消息,这里返回的偏移量
* 是移除该批消息后最小的偏移量。然后用该偏移量更新消息消费进
* 度,以便消费者重启后能从上一次的消费进度开始消费,避免消息重
* 复消费。值得注意的是,当消息监听器返回RECONSUME_LATER时,消息
* 消费进度也会向前推进,并用ProcessQueue中最小的队列偏移量调用
* 消息消费进度存储器OffsetStore更新消费进度。这是因为当返回
* RECONSUME_LATER时,RocketMQ会创建一条与原消息属性相同的消息,
* 拥有一个唯一的新msgId,并存储原消息ID,该消息会存入CommitLog
* 文件,与原消息没有任何关联,所以该消息也会进入ConsuemeQueue,
* 并拥有一个全新的队列偏移量
*/
long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
// todo 更新偏移量
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
}
}
复制代码
我们继续看下updateOffset的逻辑:
这里有两个实现类
那么我们就看下RemoteBrokerOffsetStore的updateOffset的逻辑:
public class RemoteBrokerOffsetStore implements OffsetStore {
//TOOD:....省略部分代码.......
@Override
public void updateOffset(MessageQueue mq, long offset, boolean increaseOnly) {
if (mq != null) {
AtomicLong offsetOld = this.offsetTable.get(mq);
if (null == offsetOld) {
offsetOld = this.offsetTable.putIfAbsent(mq, new AtomicLong(offset));
}
if (null != offsetOld) {
if (increaseOnly) {
MixAll.compareAndIncreaseOnly(offsetOld, offset);
} else {
offsetOld.set(offset);
}
}
}
}
}
复制代码
其实很简单,就是将消费过的消息的offset(递增)更新到消费者本地offset变量表中。然后通过定时任务持久化到broker中。
接下来在简单看下消费者客户端持久化offset到broker. 在消费者启动时,会启动一个定时任务:
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.persistAllConsumerOffset();
} catch (Exception e) {
log.error("ScheduledTask persistAllConsumerOffset exception", e);
}
}
}, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
复制代码
每隔5s将offset持久化到broker
我们在简单看下broker端如何处理offset
public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
//TODO:.....省略其他code.....
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
throws RemotingCommandException {
switch (request.getCode()) {
case RequestCode.GET_CONSUMER_LIST_BY_GROUP:
return this.getConsumerListByGroup(ctx, request);
//TODO:更新offset
case RequestCode.UPDATE_CONSUMER_OFFSET:
return this.updateConsumerOffset(ctx, request);
case RequestCode.QUERY_CONSUMER_OFFSET:
return this.queryConsumerOffset(ctx, request);
default:
break;
}
return null;
}
//TODO:....省略其他代码
}
复制代码
最终代码:
public class ConsumerOffsetManager extends ConfigManager {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private static final String TOPIC_GROUP_SEPARATOR = "@";
//TODO: key = topic@group
//TODO: value = Map[key = queueId, value = offset]
private ConcurrentMap> offsetTable =
new ConcurrentHashMap>(512);
//TODO:....省略诸多code.....
private void commitOffset(final String clientHost, final String key, final int queueId, final long offset) {
ConcurrentMap map = this.offsetTable.get(key);
if (null == map) {
map = new ConcurrentHashMap(32);
map.put(queueId, offset);
this.offsetTable.put(key, map);
} else {
Long storeOffset = map.put(queueId, offset);
if (storeOffset != null && offset < storeOffset) {
log.warn("[NOTIFYME]update consumer offset less than store. clientHost={}, key={}, queueId={}, requestOffset={}, storeOffset={}", clientHost, key, queueId, offset, storeOffset);
}
}
}
//TODO:......
}
复制代码
其实也很简单,就是将客户端发送的offset更新到broker的offset表中,然后再通过broker的定时任务持久化到文件中。那么我们在简单看下broker的持久化:
broker启动时,也会启动持久化的定时任务
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.consumerOffsetManager.persist();
} catch (Throwable e) {
log.error("schedule persist consumerOffset error.", e);
}
}
}, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
复制代码
通过定时任务将上面ConsumerOffsetManager类中的offset表offsetTable写到文件中。至此,就完成了offset的更新和持久化。
本文从源码的角度分析了RocketMQ的消费偏移量offset是如何被使用的,简单总结下持久化过程:
所谓的消息消费不丢失消息,就是存储在Broker中的消息,至少要能被成功消费一次。
RocketMQ在消息消费时采用了ACK机制,即消息客户端从Broker拉取消息到消费端,只有消息消费端成功将消息消费,才会发送ACK到Broker,broker才会认为该消息消费成功,保证消息不丢失。而且消息在消费时,是采取最小位点提交机制,说明如下:
举例说明:拉取线程从broker拉取了8条消息,到线程池中消费,其中 thread-1线程在消费msg1,thread-2在消费消息msg2,thread3消费消息msg3,此时如何thred3先消费完msg3,但thread1,thread2还未处理完msg1、msg2,那thread-1是向Broker反馈msg3的偏移量?
这个时候为了保证消息不丢失,尽管thread3先将msg3消费完成,但处理队列中的msg1还没有消费成功,此时thread3向broker提交位点时是将msg1的偏移量汇报,当msg1,msg2消费完成时,上报的位点就是msg4了,这样保证消息不丢失。
对应的代码如下: org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService#processConsumeResult:
接下来org.apache.rocketmq.client.impl.consumer.ProcessQueue#removeMessage:
但上述机制会带来消息重复消费,例如当thred3将msg3消费完成,然后向服务端汇报进度为msg1,然后客户端重启,将重新从msg1开始消费,msg3就被重复拉取,重复处理,所谓消费端需要实现幂等。
页面更新:2024-05-01
本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828
© CopyRight 2008-2024 All Rights Reserved. Powered By bs178.com 闽ICP备11008920号-3
闽公网安备35020302034844号