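RocketMQ has long published a set of best practices, but engineers working on the front line rarely know about them. Ad hoc code scattered across projects often causes messages to get mixed up, which seriously undermines business correctness. To make sure the best practices are actually applied, to lower the cost of use for front-line engineers, and to unify MQ usage conventions, the usage needs to be abstracted and encapsulated…
The RocketMQ best practices recommend that an application use a single Topic wherever possible and identify message subtypes with tags, which the application is free to define.
When sending a message with rocketMQTemplate, the destination parameter of the send method specifies where the message goes. Its format is topicName:tagName: the part before the : is the topic name and the part after it is the tag name. A simple example: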
// Build the destination: "topic:tag", or just "topic" when no tag is given
protected String createDestination(String topic, String tag) {
    if (org.apache.commons.lang3.StringUtils.isNotEmpty(tag)) {
        return topic + ":" + tag;
    } else {
        return topic;
    }
}
// Send the message
String destination = createDestination(topic, tag);
SendResult sendResult = this.rocketMQTemplate.syncSendOrderly(destination, msg, shardingKey, 2000);
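The name tags looks plural, but when sending a message the destination can name only one tag under one topic, not several.
On the consuming side, however, things become less convenient. A simple example: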
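To make the topicName:tagName convention concrete, here is a small self-contained sketch in plain Java with no RocketMQ dependency. The class name `Destinations` and the `parse` helper are illustrative assumptions, not part of rocketmq-spring; the sketch only shows that a destination carries one topic and at most one tag.

```java
public final class Destinations {
    private Destinations() {}

    /** Builds "topic:tag", or just "topic" when no tag is given. */
    public static String create(String topic, String tag) {
        if (tag == null || tag.isEmpty()) {
            return topic;
        }
        return topic + ":" + tag;
    }

    /** Splits a destination into {topic, tag}; the tag is "" when absent. */
    public static String[] parse(String destination) {
        // Limit 2: everything after the first ':' is treated as the single tag.
        String[] parts = destination.split(":", 2);
        return new String[] { parts[0], parts.length > 1 ? parts[1] : "" };
    }

    public static void main(String[] args) {
        System.out.println(create("consumer-test-topic", "UserCreatedEvent"));
        System.out.println(create("consumer-test-topic", null));
    }
}
```

Because the destination is split at the first colon only, there is no room in this format for a second tag, which matches the one-tag-per-message behavior described above.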
@Service
@RocketMQMessageListener(
        topic = "consumer-test-topic-1",
        consumerGroup = "user-message-consumer-1",
        selectorExpression = "*",
        consumeMode = ConsumeMode.ORDERLY
)
@Slf4j
public class RocketBasedUserMessageConsumer extends UserMessageConsumer
        implements RocketMQListener<MessageExt> {
    @Override
    public void onMessage(MessageExt message) {
        String tag = message.getTags();
        byte[] body = message.getBody();
        log.info("handle msg body {}", new String(body));
        // Every new event type needs another case here
        switch (tag) {
            case "UserCreatedEvent":
                UserEvents.UserCreatedEvent createdEvent = JSON.parseObject(body, UserEvents.UserCreatedEvent.class);
                handle(createdEvent);
                return;
            case "UserEnableEvent":
                UserEvents.UserEnableEvent enableEvent = JSON.parseObject(body, UserEvents.UserEnableEvent.class);
                handle(enableEvent);
                return;
            case "UserDisableEvent":
                UserEvents.UserDisableEvent disableEvent = JSON.parseObject(body, UserEvents.UserDisableEvent.class);
                handle(disableEvent);
                return;
            case "UserDeletedEvent":
                UserEvents.UserDeletedEvent deletedEvent = JSON.parseObject(body, UserEvents.UserDeletedEvent.class);
                handle(deletedEvent);
                return;
        }
    }
}
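This approach has several problems:
The goal is to provide a business-oriented pattern that can be extended flexibly, with the following characteristics:
The framework relies on rocketmq-spring-boot-starter to send and receive messages.
First, add the rocketmq dependency.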
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.1</version>
</dependency>
Then, add the lego starter.
<dependency>
    <groupId>com.geekhalo.lego</groupId>
    <artifactId>lego-starter</artifactId>
    <version>0.1.13-tag_based_dispatcher_message_consumer-SNAPSHOT</version>
</dependency>
Add the rocketmq configuration to application.yml.
rocketmq:
  name-server: http://127.0.0.1:9876
  producer:
    group: rocket-demo
To define a consumer, you only need to:
For example:
@TagBasedDispatcherMessageConsumer(
        topic = "consumer-test-topic",
        consumer = "user-message-consumer"
)
public class UserMessageConsumer {
    // Events received so far, grouped by user id
    private final Map<Long, List<UserEvents.UserEvent>> events = Maps.newHashMap();

    public void clean() {
        this.events.clear();
    }

    public List<UserEvents.UserEvent> getUserEvents(Long userId) {
        return this.events.get(userId);
    }

    @HandleTag("UserCreatedEvent")
    public void handle(UserEvents.UserCreatedEvent userCreatedEvent) {
        List<UserEvents.UserEvent> userEvents = this.events.computeIfAbsent(userCreatedEvent.getUserId(), userId -> new ArrayList<>());
        userEvents.add(userCreatedEvent);
    }

    @HandleTag("UserEnableEvent")
    public void handle(UserEvents.UserEnableEvent userEnableEvent) {
        List<UserEvents.UserEvent> userEvents = this.events.computeIfAbsent(userEnableEvent.getUserId(), userId -> new ArrayList<>());
        userEvents.add(userEnableEvent);
    }

    @HandleTag("UserDisableEvent")
    public void handle(UserEvents.UserDisableEvent userDisableEvent) {
        List<UserEvents.UserEvent> userEvents = this.events.computeIfAbsent(userDisableEvent.getUserId(), userId -> new ArrayList<>());
        userEvents.add(userDisableEvent);
    }

    @HandleTag("UserDeletedEvent")
    public void handle(UserEvents.UserDeletedEvent userDeletedEvent) {
        List<UserEvents.UserEvent> userEvents = this.events.computeIfAbsent(userDeletedEvent.getUserId(), userId -> new ArrayList<>());
        userEvents.add(userDeletedEvent);
    }
}
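To give a feel for how tag-based dispatch of this kind can work, here is a minimal, self-contained sketch using plain reflection. It is not the lego implementation: the `OnTag` annotation, the `Dispatcher` class, and the `UserHandlers` bean are all hypothetical stand-ins, and the handlers receive the raw message body as a String so the sketch needs no JSON library (a real framework would presumably deserialize the body into the handler's parameter type).

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TagDispatchSketch {

    /** Hypothetical stand-in for @HandleTag; not the lego annotation itself. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    public @interface OnTag {
        String value();
    }

    /** Scans a bean once, then routes each (tag, body) pair to the matching method. */
    public static class Dispatcher {
        private final Object bean;
        private final Map<String, Method> handlers = new HashMap<>();

        public Dispatcher(Object bean) {
            this.bean = bean;
            for (Method method : bean.getClass().getMethods()) {
                OnTag onTag = method.getAnnotation(OnTag.class);
                if (onTag == null) {
                    continue; // not a handler method
                }
                if (handlers.put(onTag.value(), method) != null) {
                    throw new IllegalStateException("Duplicate handler for tag " + onTag.value());
                }
            }
        }

        /** Returns true when a handler was found and invoked. */
        public boolean dispatch(String tag, String body) throws Exception {
            Method method = handlers.get(tag);
            if (method == null) {
                return false; // unknown tag: the caller decides whether to log or fail
            }
            method.invoke(bean, body);
            return true;
        }
    }

    /** Example bean: one method per tag, no switch statement. */
    public static class UserHandlers {
        public final List<String> seen = new ArrayList<>();

        @OnTag("UserCreatedEvent")
        public void onCreated(String body) {
            seen.add("created:" + body);
        }

        @OnTag("UserDeletedEvent")
        public void onDeleted(String body) {
            seen.add("deleted:" + body);
        }
    }

    public static void main(String[] args) throws Exception {
        UserHandlers bean = new UserHandlers();
        Dispatcher dispatcher = new Dispatcher(bean);
        dispatcher.dispatch("UserCreatedEvent", "{\"userId\":10000}");
        dispatcher.dispatch("UserDeletedEvent", "{\"userId\":10000}");
        System.out.println(bean.seen);
    }
}
```

The tag-to-method map is built once at construction time, so adding a new event type means adding one annotated method rather than editing a central switch.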
The test case is written as follows:
@SpringBootTest(classes = DemoApplication.class)
@Slf4j
class UserMessageConsumerTest {
    @Autowired
    private UserMessageConsumer userMessageConsumer;
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    private List<Long> userIds;

    @BeforeEach
    void setUp() throws InterruptedException {
        this.userMessageConsumer.clean();
        this.userIds = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            userIds.add(10000L + i);
        }
        this.userIds.forEach(userId -> sendMessage(userId));
        // Give the consumer time to receive the messages
        TimeUnit.SECONDS.sleep(3);
    }

    // Sends the four lifecycle events for one user, in order
    private void sendMessage(Long userId) {
        String topic = "consumer-test-topic";
        {
            String tag = "UserCreatedEvent";
            UserEvents.UserCreatedEvent userCreatedEvent = new UserEvents.UserCreatedEvent();
            userCreatedEvent.setUserId(userId);
            userCreatedEvent.setUserName("Name-" + userId);
            sendOrderlyMessage(topic, tag, userCreatedEvent);
        }
        {
            String tag = "UserEnableEvent";
            UserEvents.UserEnableEvent userEnableEvent = new UserEvents.UserEnableEvent();
            userEnableEvent.setUserId(userId);
            userEnableEvent.setUserName("Name-" + userId);
            sendOrderlyMessage(topic, tag, userEnableEvent);
        }
        {
            String tag = "UserDisableEvent";
            UserEvents.UserDisableEvent userDisableEvent = new UserEvents.UserDisableEvent();
            userDisableEvent.setUserId(userId);
            userDisableEvent.setUserName("Name-" + userId);
            sendOrderlyMessage(topic, tag, userDisableEvent);
        }
        {
            String tag = "UserDeletedEvent";
            UserEvents.UserDeletedEvent userDeletedEvent = new UserEvents.UserDeletedEvent();
            userDeletedEvent.setUserId(userId);
            userDeletedEvent.setUserName("Name-" + userId);
            sendOrderlyMessage(topic, tag, userDeletedEvent);
        }
    }

    private void sendOrderlyMessage(String topic, String tag, UserEvents.UserEvent event) {
        // Use the user id as the sharding key so one user's events stay in order
        String shardingKey = String.valueOf(event.getUserId());
        String json = JSON.toJSONString(event);
        Message<String> msg = MessageBuilder
                .withPayload(json)
                .build();
        String destination = createDestination(topic, tag);
        SendResult sendResult = this.rocketMQTemplate.syncSendOrderly(destination, msg, shardingKey, 2000);
        log.info("Send result is {} for msg", sendResult);
    }

    protected String createDestination(String topic, String tag) {
        if (org.apache.commons.lang3.StringUtils.isNotEmpty(tag)) {
            return topic + ":" + tag;
        } else {
            return topic;
        }
    }

    @AfterEach
    void tearDown() {
    }

    @Test
    void getUserEvents() {
        this.userIds.forEach(userId -> {
            List<UserEvents.UserEvent> userEvents = this.userMessageConsumer.getUserEvents(userId);
            Assertions.assertEquals(4, userEvents.size());
            Assertions.assertTrue(userEvents.get(0) instanceof UserEvents.UserCreatedEvent);
            Assertions.assertTrue(userEvents.get(1) instanceof UserEvents.UserEnableEvent);
            Assertions.assertTrue(userEvents.get(2) instanceof UserEvents.UserDisableEvent);
            Assertions.assertTrue(userEvents.get(3) instanceof UserEvents.UserDeletedEvent);
        });
    }
}
At startup, the following log appears:
TagBasedDispatcherConsumerContainer : success to subscribe http://127.0.0.1:9876, topic consumer-test-topic, tag UserCreatedEvent||UserEnableEvent||UserDeletedEvent||UserDisableEvent, group user-message-consumer
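The log shows that the framework creates a Consumer under the group user-message-consumer and subscribes to the tags UserCreatedEvent||UserEnableEvent||UserDeletedEvent||UserDisableEvent on consumer-test-topic, so initialization behaves as expected.
The test logic is fairly simple and works as follows: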
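The subscription expression in that log is a list of tags joined by ||, which is the separator RocketMQ tag filters use. As a rough illustration of how such an expression could be derived from a set of handler tags, consider the sketch below. The class and method names are hypothetical, and falling back to * (all tags) for an empty list is a choice made for this sketch, not something the log confirms about the framework.

```java
import java.util.Arrays;
import java.util.List;

public class SubscriptionExpressionSketch {

    /** Joins tags with "||"; falls back to "*" (all tags) when none are given. */
    public static String toExpression(List<String> tags) {
        if (tags == null || tags.isEmpty()) {
            return "*";
        }
        return String.join("||", tags);
    }

    public static void main(String[] args) {
        List<String> tags = Arrays.asList("UserCreatedEvent", "UserEnableEvent");
        System.out.println(toExpression(tags));
    }
}
```

Subscribing only to the tags that have handlers lets the broker filter out messages the consumer could not process anyway.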
UserMessageConsumerTest : Send result is SendResult [sendStatus=SEND_OK, msgId=2408820718EADE005827F0B9E9D4D6D9B98158644D467D38DE4900FD, offsetMsgId=C0A8010A00002A9F00000000056077FB, messageQueue=MessageQueue [topic=consumer-test-topic, brokerName=bogon, queueId=2], queueOffset=1121] for msg
TagBasedDispatcherConsumerContainer : consume 2408820718EADE005827F0B9E9D4D6D9B98158644D467D38DE4700FC cost: 0 ms
The test case passes and the results match expectations.
The framework initialization flow is as follows:
[Figure: framework initialization flow]
The runtime flow is as follows:
Project repository: https://gitee.com/litao851025/lego
Project documentation: https://gitee.com/litao851025/lego/wikis/support/TagBasedDispatcherMessageConsumer
Page updated: 2024-05-01