本章基于rocketmq4.6.0分析nameserver的实现。
nameserver的基础概念不再赘述,可以参考官网。
本章将分析以下内容:
后续分析不会再看nameserver侧,有个概念即可。
nameserver侧比较简单,一共不超过10个类。
NamesrvStartup:启动类,读取配置,构造NamesrvController启动;
NamesrvController:管理nameserver所有组件,比如:通讯server、kv配置管理、路由信息管理等;
KVConfigManager:kv配置管理,kv配置的增删改查;
RouteInfoManager:路由信息管理,路由信息的增删改查;
BrokerHousekeepingService:当通讯层长连接发生变更,调用RouteInfoManager处理;
DefaultRequestProcessor:业务请求处理器,处理所有客户端的请求,包括broker和client;
所有组件的通讯层都会使用rocketmq-remoting模块,无论客户端还是服务端,无论是producer还是consumer还是broker还是nameserver。
这里简单分析一下服务端的实现,即NettyRemotingServer。
绕过底层bytebuffer的读写,业务上都是RemotingCommand模型,无论请求报文还是响应报文。
NettyRemotingServer#start:
Netty线程分为三组
无论客户端还是服务端,当netty的worker线程处理完编解码后,区分请求还是响应,走不同逻辑。
NettyRemotingAbstract#processMessageReceived:
NettyRemotingAbstract#processRequestCommand:
对于请求来说,根据请求业务编码,找对应NettyRequestProcessor处理器,及其业务线程池处理。
NettyRemotingAbstract.processorTable:
NettyRemotingAbstract#defaultRequestProcessor:
对于没有特殊配置的业务请求,使用默认的Processor处理。
NamesrvController#registerProcessor:nameserver就只有一个Processor。
nameserver的业务线程池大小默认也是8,通过serverWorkerThreads配置。
NettyRemotingAbstract#processResponseCommand:
对于响应来说,根据请求id找到future,
rpc都差不多,发送请求的时候把id和future全局存一下,收到响应根据id找future
如果future是注册了callback回调方法的,使用future的callback线程池执行回调,
否则netty的worker线程,将response写入future,唤醒阻塞等待响应的线程。
NettyRemotingServer#start:这里是服务端,但是客户端也类似,开启一个线程扫描超时请求。
NettyRemotingAbstract#responseTable:当前已经发出,但未收到相应的请求。
内存中维护一份Map,存储Namespace-Key-Value。
KVConfigManager#putKVConfig:写kv配置,先拿写锁写内存,后拿读锁写磁盘。
KVConfigManager#getKVConfig:拿读锁读配置。
默认kv配置会存储在user.home下的namesrv/kvConfig.json中。
json复制代码{"configTable":{"namespace":{"key":"value"}}}
目前只有一个地方在用这个kv配置,针对生产者,场景是顺序消息。
假设有两个broker,每个broker有8个queue。
对于生产者发现路由来说,当一个broker下线后,原来是16个队列hash,现在变成8个队列hash,那么有可能乱序。
通过在nameserver侧写死路由表,可以保证不发生上面的情况,问题在于消息投递到下线的broker会报错。
比如updateTopic命令,设置TopicA为顺序topic。
arduino复制代码mqadmin updateTopic -n localhost:9876 -b 169.254.246.163:10911 -t topicA -r 8 -w 8 -o true
则会在namespace=ORDER_TOPIC_CONFIG,新增kv配置,key=topic名,value=broker名1:队列数量1;broker名2:队列数量2。见UpdateTopicSubCommand#execute。
MQClientInstance#topicRouteData2TopicPublishInfo:
生产者获取路由表时,如果topic在nameserver被配置为顺序消息topic,将采用静态的路由表给生产者。
但是,默认情况下nameserver并没有开启这个功能。
RouteInfoManager基于内存存储路由信息,一共5个table。
DefaultRequestProcessor#registerBrokerWithFilterServer:
broker注册请求包含broker配置信息和topic配置信息;
broker响应请求包含master地址和所有topic的静态路由信息(kv配置);
RouteInfoManager#registerBroker:先获取写锁,处理内存5个table
step1,clusterAddrTable,注册cluster到brokerName的映射关系。
step2,brokerAddrTable,注册brokerName到BrokerData的关系。
BrokerData包含同名broker的拓扑关系。
step3,topicQueueTable,注册topic到QueueData(队列)的映射关系,其中QueueData包含brokerName。
Step4,brokerLiveTable,注册broker地址到存活broker信息的映射关系。
Step5,filterServerTable,注册broker地址到filterServer的映射关系。sql92过滤相关。
Step6,如果注册broker非master,从brokerAddrTable获取masterBroker地址,从brokerLiveTable获取masterBroker的ha地址,返回broker。
broker注册分为两大类。
一个类是broker启动或故障恢复之后注册。
举个例子,BrokerController#start:
启动后注册,定时注册,默认30s一次。
另一类是topic变更触发broker注册。
举个例子,TopicConfigManager#createTopicInSendMessageMethod:
broker自动创建topic,触发broker注册,实际是为了刷新topic配置。
broker正常关闭,主动调用nameserver注销。
DefaultRequestProcessor#unregisterBroker:
RouteInfoManager#unregisterBroker:
注销broker也是先拿写锁,然后操作5个table。
需要注意的是,除了两个纯broker纬度的table,其他table的处理是有条件的。
BrokerHousekeepingService:
当通讯层连接关闭、异常、空闲,都会被动触发broker注销。
RouteInfoManager#onChannelDestroy:
对于RouteInfoManager来说,和主动注销的区别在于,
需要先获取读锁,通过通讯层的netty channel,找到存活brokerTable中的broker信息,执行上述5个table的remove操作。
NamesrvController#initialize:
nameserver后台每隔10秒会扫描非活跃broker执行注销。
RouteInfoManager#scanNotActiveBroker:
如果broker2分钟内未发送注册请求,同样会执行注销。
最常用的方法是根据topic查询路由TopicRouteData,无论生产消费都需要。
DefaultRequestProcessor#getRouteInfoByTopic:
TopicRouteData:包含topic下所有queue,以及broker信息。
RouteInfoManager#pickupTopicRouteData:从几个table中根据topic查路由。
结合broker注销,只要有存活的同名broker,都能拿到QueueData。
RocketMQ通讯协议是个私有协议,主要包含几部分:
Server端线程模型如下:
Client端线程模型如下(本文没提,见NettyRemotingClient):
nameserver提供动态kv配置能力,目前只有一个用途,针对顺序消费场景,生产者可以走nameserver提供的静态路由配置。但是这个能力默认nameserver是禁用的,orderMessageEnable=false。
broker注册和心跳,在nameserver侧维护了5张table。
对于producer和consumer来说,最重要的是其中两张table提供的路由数据,topicQueueTable和brokerAddrTable。
BrokerLiveInfo主要存储了broker的通讯层channel、心跳时间、ha地址等信息。
有三种方式导致broker从nameserver注销:broker主动注销、通讯层注销、nameserver定时检测心跳。
无论是通讯层空闲还是心跳检测,超时时间都是120秒。
BrokerLiveInfo移除后,只有所有同名broker下线,才会导致topicQueueTable和brokerAddrTable移除路由数据。
页面更新:2024-02-24
本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828
© CopyRight 2008-2024 All Rights Reserved. Powered By bs178.com 闽ICP备11008920号-3
闽公网安备35020302034844号