还在担心消息积压问题难解决?思路代码优化细节全公开

前言

之前就遇到的一个问题吧,发生过一两次了,每次都是重启等着慢慢消费,但这样肯定是不行的啦,还是得上点魔法。按照既往惯例,我都会在文章选题开始时,看看网上大家都是怎么写的,看了不少文章吧,感觉大家写的还是比较抽象。( ﹏ ),不过可能也是因为消息积压这一块吧,这个解决方案属实是太过简单,确实也没啥好说的,导致大家这个不怎么贴代码,老是讲思路,我也能够理解。但是作为好家伙棒小伙,我肯定得给大家端上一碗热腾腾的鸡汤,最近心情略差,压力过大,所以皮不动了。那么直接开始吧,鸡汤来咯!

事件背景

之前我不是自己维护了一套日志收集系统嘛,是Filebeat+Kafka+数据处理服务+Elasticsearch+Kibana+Skywalking这样的架构,其中数据处理服务是一个spring boot项目部署在公司的DevOps平台上。因为是我自己弄的,我就自己建了一个项目空间,专门放我写的组件的源码以及这个数据处理服务,平时我也是在上面自己部署的,因为稳定运行一年多嘛,我平时也不关注。

几个周前的时候,呃,不知道咋回事生产的那个节点挂了。周四晚上挂的,发的邮件通知我没看到(邮件太多就不看了),周五没人看日志,就这么到了周一下午,有同事过来问我为啥没有这几天的日志。我先登录Kibana看了下,确实少了这几天的日志,然后我就登录DevOps发现节点没了,上CMAK(Kafka监控)发现积压了八百多万数据,嚯嚯,小一千万,大的要来了。

宕机后的处理

首先告诉发现的小伙伴都别慌,小BUG小场面,先用DevOps上面自带的K8S日志输出看实时日志或者写命令看日志文件。

点击容器日志可查看实时输出的日志,如下图。我个人觉得这个其实也蛮方便的,但是要定位历史问题,或者要进行搜索、链路追踪啥的,就不如一套日志收集系统了。

因为我这个自己做的日志收集系统不是公司级的项目(架构组也在做,但是有技术问题,技术问题场景太多了,所以还没有完全推广),大概就我们大组几十个开发在使用,应用到了二十多个子系统上,目前日志量已经增长到了月度亿级。

因为我会在代码规范中明令禁止测试或者正式环境打印Sql,所以日志量总体还算是可控。再加上我之前在数据处理服务中写了定时,定期删除半年以上日志,因此磁盘表示它状态良好。万幸的是,暂时没有遇到太大问题,但是后续可能要考虑压缩历史日志归档的问题。

扯远了,说了一些正确的废话,成功让大家摸鱼十秒钟,哈哈。回到正题,我是马上重启了数据处理服务进行消费,由于原Topic只有3个分区,所以并发度是3。可能有的人会说为啥消费的时候不开多线程呢,开多线程批量插入ES就不受分区并发度限制了?对的,这是一个手段,但是我开启了手动提交offset,为了确保日志不丢失,所以这并发度没法提升了。

正式的环境不能乱动,但是测试的可以随便玩,那么这个消息积压的问题场景就由我收下了!解决思路如下,非常简单,但问题就在于细节非常多,网上的实践内容也相对偏少,所以我就想着通过文章的方式来复盘下我全部的操作。

  1. 修复现有consumer的问题,并将其停掉,再不停就真G了。
  2. 重新创建一个容量更大的Topic,比如patition是原来的N倍,大大大。
  3. 编写一个临时consumer程序,消费原来积压的队列(注意该consumer不做任何耗时的操作,仅作为中转将消息快速写入新创建的Topic里)。
  4. 将修复好的consumer部署到原来N倍的机器上消费新Topic。
  5. 消息积压解决后,恢复原有架构。

真实的操作场景复盘

新建一个中转Topic

这里无论是通过命令还是在可视化工具上创建,都可以啦,不管黑猫白猫,抓到耗子就是好猫。因为原来的Topic分区是3,这里可以选择提升10倍,改成30。但是我是选择了相对保守的12作为分区数,因为我这资源有限啦。

临时消费者,中转消息

这里我们就在原数据处理服务直接写一个监听加中转就OK了。

@Autowired
@Qualifier("performanceKafkaTemplate")
private KafkaTemplate kafkaTemplate;

@KafkaListener(topics = {"log-zero-#{systemProperties['env']}"}, containerFactory = "testFactory")
public void forwardTopic(List> records, Acknowledgment ack) {
    List logMsgList = new ArrayList<>(1024);
    records.forEach(record -> {
        ListenableFuture> sendListener =
                kafkaTemplate.send("log-zero-test", record.value());
        sendListener.addCallback(success -> {
        }, err -> {
            log.error("消息发送失败", err);
        });

    });
    ack.acknowledge();
}
复制代码

这里针对Kafka做了一个封装,用的是自己封装的高性能版生产者和测试版消费者。

消费新Topic

首先改写一下原有的消费逻辑,因为之前是按月动态索引,所以消费消息时会先从redis中获取是否存在当月索引,现在我可以直接写死这个索引,新增即可,消费的代码如下。

@KafkaListener(topics = {"log-zero-test"}, containerFactory = "testFactory")
public void testLogListen(List> records, Acknowledgment ack) {
    List logMsgList = new ArrayList<>(1024);
    records.forEach(record -> {
        String value = record.value();
        String[] split = value.split("|");
        String system = split[2];
        if (split.length > 13) {
            String env = split[3];
            LogInfoDTO info = new LogInfoDTO();
            info.setCreateTime(split[0]);
            info.setLogType(split[1]);
            info.setSystem(system);
            info.setEnv(env);
            info.setLocalIp(split[4]);
            info.setRequestIp(split[5]);
            info.setTriceId(split[6]);
            info.setRequestUrl(split[7]);
            info.setRequestType(split[8]);
            info.setRequestMethod(split[9]);
            info.setUserId("null".equalsIgnoreCase(split[10]) ? "" : split[10]);
            info.setUserName(split[11]);
            info.setThreadName(split[12]);
            info.setMsg(split[13]);
            logMsgList.add(new LogMsgDTO("test-log-zero-uat-re", JSON.toJSONString(info)));
        }
    });
    if (!CollectionUtils.isEmpty(logMsgList)) {
        esService.bulkAddRequest(logMsgList);
    }
    ack.acknowledge();
}
/**
 * 批量新增日志
 */
public void bulkAddRequest(List msgList) {
    BulkRequest request = new BulkRequest();
    //等待批量请求作为执行的超时设置为2分钟
    request.timeout(TimeValue.timeValueMinutes(3));
    msgList.forEach(msg -> {
        request.add(new IndexRequest(msg.getTopic()).source(msg.getMsg(), XContentType.JSON));
    });
    zeroClient.bulkAsync(request, RequestOptions.DEFAULT, new ActionListener() {
        @Override
        public void onResponse(BulkResponse bulkResponse) {
            for (BulkItemResponse bulkItemResponse : bulkResponse) {
                boolean failed = bulkItemResponse.isFailed();
                if (failed) {
                    log.error("批量插入消息失败,详细信息={}", bulkItemResponse.getFailureMessage());
                }
                DocWriteResponse itemResponse = bulkItemResponse.getResponse();
                switch (bulkItemResponse.getOpType()) {
                    case INDEX:
                    case CREATE:
                        IndexResponse indexResponse = (IndexResponse) itemResponse;
                        handleAddDocSuccess(indexResponse);
                        break;
                    case UPDATE:
                        UpdateResponse updateResponse = (UpdateResponse) itemResponse;
                        break;
                    case DELETE:
                        DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
                        break;
                    default:
                        break;
                }
            }
        }
        @Override
        public void onFailure(Exception e) {
            log.error("批量插入失败", e);
        }
    });
}
复制代码

消费消息这个场景做的很简单啊,就是转换日志信息,然后拼接ES批量插入的请求体,几乎没什么费事的动作。这里要注意一下,如果消息早已发送到Topic,并且选择了一个全新的消费者组,那么将auto.offset.reset改成earliest从头读起。

积压消息消费之后,切回原来的架构

因为我这里使用了DevOps平台,所以切换很简单,选之前的容器变更部署即可。

年轻人,听说你也懂优化

我是真的栓Q,大乌鱼事件发生,现在不懂优化是真的不行的辣!

生产者优化

众所周知,Kafka不懂优化也没有太大关系,因为通用版已经够好用了,但是作为卷王怎么能够接受通用,必然是狠狠地定制,定在墙上那种。这里简单说一下高性能版生产者参数的调整,顺道解释下调整意义。

batch.size和linger.ms是决定吞吐量和延时的重要参数,两个条件任一满足即可发送消息。batch.size过大会导致发送消息难以攒够batch.size大小导致消息发不出去,因此需要linger.ms保底,linger.ms时间一到也能发送消息。

以上便是高性能通用版生产者的重要参数调整,接下来针对生产速度做一个对比。在本地笔记本8核16G环境下(远低于服务器性能,仅作对比参考),并发消费3分区向12分区(副本数为1)新Topic生产消息,另一对比者reliableHighKafkaTemplate(仅开启ACK为-1,调大请求消息体),结果峰值如下。高性能版的生产者是要比普通的版本要高出不少的,如果放在服务器上这个速度还会暴增,因此生产消息的速度我们并不需要太在意,肯定是比消费更快的。

高性能版

高可靠版

6045.41 条/s

3925.80 条/s

消费者优化

既然使用了Spring-Kafka就要老老实实的接受人家的设定(这里是妄图通过Kafka-client写出消费者,但是被生命周期管理代码劝退,从而老老实实用Spring真香桶的败犬菜恐龙),根据Spring官方给的两条路,选择并发消费。相比写配置文件调参,我更喜欢通过bean加载,除了可以更自由的选择之外,还可以塞到组件里即插即用。接下来重点介绍下高性能模式下,如何调整消费者参数,以及如何定制Spring-Kafka的消费者工厂。

/**
 * @author WangZY
 * @date 2023/3/6 18:52
 * @description 测试用手动
 */
@Bean
public ConcurrentKafkaListenerContainerFactory testFactory() {
    ConcurrentKafkaListenerContainerFactory container =
            new ConcurrentKafkaListenerContainerFactory<>();
    Map props = new HashMap<>(16);
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, prop.getKafkaServers());
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, StringUtils.isEmpty(prop.getConsumerGroupId()) ?
            "testConsumerGroup" : prop.getConsumerGroupId());
    props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1048576);
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000);
    props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 60000);

    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    container.setConsumerFactory(new DefaultKafkaConsumerFactory<>(props));
    container.setConcurrency(3);
    container.setBatchListener(true);
    container.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
    return container;
}
复制代码

这里Spring-Kafka的消费者工厂定义,除了调参之外,重要还有开启并发监听setBatchListener(true)。只有开启了这个参数,才能以public void testLogListen(List> records)这种入参为List的方式监听消费。同理这里getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL)则是为了让入参中的Acknowledgment ack生效,改为手动提交offset后,虽然降低了消费速度,但是避免了因消费报错导致的消息丢失问题。

ES配置优化

前面ElasticSearch不停机重建索引引申来的优化与思考 - 掘金 (juejin.cn)也讲过这个问题,这一次当然还是类似的配置副本数可以先设置为0,刷新间隔禁用,刷盘设置为异步即可。别的一些歪门邪道经测试没大用,除了前面三项最好的提升就是加内存,这个堆硬件最好使。插一个官方推荐的优化插入速度的链接Tune for indexing speed | Elasticsearch Guide [8.6] | Elastic,顺道总结下ES官方推荐的提高插入速度的方法

  1. 使用Bulk批量插入
  2. 多线程请求插入ES
  3. 取消或者增加索引刷新间隔
  4. 插入时禁用索引初始副本
  5. 服务器禁用swap
  6. 索引文档使用自动生成的ID
  7. 使用更快的硬件,比如SSD
  8. 加内存(这一点是我加的,因为ES很多配置是内存百分比的,提高内存一般是性能提升的最优解。懂吗朋友,加钱啊)

不同架构的消费速度对比

目前测试节点是三节点集群,都是mixed节点,服务器都是8核16G+1T机械磁盘(用SSD会更快)。ES除了一个节点因机器上服务过多(Kibana,SkywalkingUI,redis-cluster等),设置为4G(-Xms4G -Xmx4G),另两台设置为6G。接下来我会以表格的方式记录不同配置下,在Kibana上观测到的极限插入速度。

贴一张Kibana的图,其他就不截图了,不然成灌水了。这个实测结果其实是符合我的预期的,不过让我意外的是1master(4G)+2data(6+6)的速度高于3mixed(4+6+6)。不知道是不是我的机器配置原因导致了这一点,但是我手里也没有空闲的服务器供我测试了,就留待读者们自己实验了,有测试过的大佬可以在评论区留言。简单来说,是符合上述ES官方推荐的优化配置的说法的,多线程的提升是巨量的,禁用副本和刷新间隔也会产生不小的提升。

消费的单次拉取值是需要根据实际情况调整的,我第一次调整是单次3000,后来调成2000,都发生了同一个问题,速度没上去,服务器负载倒是飙升,如下图

刺激的BUG战场

发送消息体过大

新建了Topic后,写好中转生产者后,刚调用就G了,快如闪电。原因很简单,消息体超过message.max.bytes默认值1MB了,调大即可。

ES插入获取请求连接超时

java.util.concurrent.TimeoutException: Connection lease request time out
at org.apache.http.nio.pool.AbstractNIOConnPool.processPendingRequest(AbstractNIOConnPool.java:411) ~[httpcore-nio-4.4.13.jar:4.4.13]
at org.apache.http.nio.pool.AbstractNIOConnPool.processNextPendingRequest(AbstractNIOConnPool.java:391) ~[httpcore-nio-4.4.13.jar:4.4.13]
at org.apache.http.nio.pool.AbstractNIOConnPool.release(AbstractNIOConnPool.java:355) ~[httpcore-nio-4.4.13.jar:4.4.13]
at org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager.releaseConnection(PoolingNHttpClientConnectionManager.java:391) ~[httpasyncclient-4.1.4.jar:4.1.4]
复制代码

我是在spring boot数据处理服务中使用了elasticsearch-rest-high-level-client客户端,其中封装了HttpClient来向ES发送请求。这里的问题来源于使用了bulkAsync(后根据场景换成了同步请求bulk),这是个异步请求,底层是AbstractNIOConnPool提供了请求连接池。发生这个报错的原因是新的异步请求在获取连接时超过了获取连接超时时间,解决方案最直接的就是调大连接池大小和请求连接超时时间。

httpClientBuilder.setMaxConnTotal(zeroProperties.getMaxConnectTotal());
httpClientBuilder.setMaxConnPerRoute(zeroProperties.getMaxConnectPerRoute());
复制代码

ES配置超时时间无效

RestClientBuilder builder = RestClient.builder(httpHostZeroList.toArray(new HttpHost[0]));
builder.setHttpClientConfigCallback(httpClientBuilder -> {

        httpClientBuilder.setMaxConnTotal(zeroProperties.getMaxConnectTotal());
        httpClientBuilder.setMaxConnPerRoute(zeroProperties.getMaxConnectPerRoute());
        return httpClientBuilder;
    });
if (!StringUtils.isEmpty(username) && !StringUtils.isEmpty(password)) {
    CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
    credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
    builder.setHttpClientConfigCallback(httpClientBuilder -> {
        httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
       
        return httpClientBuilder;
    });
}
复制代码

在解决上面问题时Debug发现一个很神奇的现象,就是这里的配置超时时间的setMaxConnTotal和setMaxConnPerRoute,如果后面有相同的setHttpClientConfigCallback方法时,就会走最后一个set方法,前面的不会生效。在没有debug之前,我设置的参数一直不能生效,始终是默认值,我百思不得其解,最后通过debug,发现了这个神奇的现象。最后解决方法如下,就是放在一起进行配置,希望有大佬能解答我这个问题。

RestClientBuilder builder = RestClient.builder(httpHostZeroList.toArray(new HttpHost[0]));
if (!StringUtils.isEmpty(username) && !StringUtils.isEmpty(password)) {
    CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
    credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
    builder.setHttpClientConfigCallback(httpClientBuilder -> {
        httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
        httpClientBuilder.setMaxConnTotal(zeroProperties.getMaxConnectTotal());
        httpClientBuilder.setMaxConnPerRoute(zeroProperties.getMaxConnectPerRoute());
        return httpClientBuilder;
    });
}else {
    builder.setHttpClientConfigCallback(httpClientBuilder -> {
        httpClientBuilder.setMaxConnTotal(zeroProperties.getMaxConnectTotal());
        httpClientBuilder.setMaxConnPerRoute(zeroProperties.getMaxConnectPerRoute());
        return httpClientBuilder;
    });
}
复制代码

ES过量插入导致OOM

org.elasticsearch.client.ResponseException: method [POST], host [http://192.168.158.115:9200], URI [/_bulk?timeout=3m], status line [HTTP/1.1 429 Too Many Requests]

{"error":{"root_cause":[{"type":"circuit_breaking_exception","reason":"[parent] Data too large, data for [] would be [2077061194/1.9gb], which is larger than the limit of [2040109465/1.8gb], real usage: [2074284352/1.9gb], new bytes reserved: [2776842/2.6mb], usages [request=0/0b, fielddata=37121/36.2kb, in_flight_requests=10749974/10.2mb, model_inference=0/0b, accounting=8100600/7.7mb]","bytes_wanted":2077061194,"bytes_limit":2040109465,"durability":"TRANSIENT"}],"type":"circuit_breaking_exception","reason":"[parent] Data too large, data for [] would be [2077061194/1.9gb], which is larger than the limit of [2040109465/1.8gb], real usage: [2074284352/1.9gb], new bytes reserved: [2776842/2.6mb], usages [request=0/0b, fielddata=37121/36.2kb, in_flight_requests=10749974/10.2mb, model_inference=0/0b, accounting=8100600/7.7mb]","bytes_wanted":2077061194,"bytes_limit":2040109465,"durability":"TRANSIENT"},"status":429}

ES默认是1G,我这里调整到2G,上面这段话的意思就是说,ES的堆内存满了,放不下新的数据,也就是标准OOM了。重点观察报错日志的几个参数

此时ES此节点挂掉,查看日志可以得到下面这段。

[2023-03-09T17:45:22,338][INFO ][o.e.i.b.HierarchyCircuitBreakerService] [node-2] attempting to trigger G1GC due to high heap usage [2049729624]

[2023-03-09T17:45:22,343][INFO ][o.e.i.b.HierarchyCircuitBreakerService] [node-2] GC did bring memory usage down, before [2049729624], after [2029398072], allocations [1], duration [5]

[2023-03-09T17:45:22,332][WARN ][o.e.h.AbstractHttpServerTransport] [node-2] caught exception while handling client http traffic, closing connection Netty4HttpChannel{localAddress=/192.168.158.114:9200, remoteAddress=/172.27.136.31:6096}

java.lang.Exception: java.lang.OutOfMemoryError: Java heap space

这个解决办法其实很简单,因为调任何参数都不治本了,所以唯一能做的就是加内存,我在测试阶段是加到4G就没有问题了。

微醺码头

代码场景设计实践 - 云雨雪的专栏 - 掘金 (juejin.cn),突然发现我这个专栏已经有26人订阅了,系列文章均3K阅读量40赞60收藏。本期也将收录至该专栏,依照传统,必然要登上微醺码头,为大家带来一些散碎的知识点,坐稳了,开船啦!

更换节点

一般分为四种节点类型,分别是主节点、数据节点、客户端节点和混合节点。通过elasticsearch.yml中的node.master: true和node.data: true(默认值)来配置,默认节点是混合节点。

当集群达到一定规模之后,不建议使用mixed,而是应该对各节点进行角色划分,可以按照mixed--->master+data--->master+data+client逐步划分。

接下来介绍下如何从mixed进行转换,当然我的建议是一开始就要划分出来明确的角色,别到一半再转。以下是从mixed转换到master的步骤,其他同理了。

  1. 修改elasticsearch.yml中的node.master: true和node.data:false
  2. 下线该节点,在es安装目录下的bin文件夹下执行./elasticsearch-node repurpose清除分区数据
  3. 最后重启该节点就成功

注意这里有几个问题,首先是在当前节点拥有分片数据的时候是不允许直接设置node.data:false的,会提示以下异常

我是使用的ES7.9版本,因此官方在报错信息后贴心的提示了可以使用工具www.elastic.co/guide/en/el…进行分片信息的删除。

这里bin目录下执行命令后,会有个确认环节,再次让你判断是否要清除分片数据。这里我的建议是三个,一是手动通过ES的API迁移分片(一次只能迁移一个索引,折磨),二是一开始就弄好节点角色定位避免二次变更,三是所有索引都加上副本。

从上面这个图中可以看到,当选择清除主节点分片数据后,有副本的索引的数据不会丢,而是将副本升级为分片,体现在未分配分片中。因为我的索引大部分设置为3分片,所以这时候索引大部分状态都是Yellow,小部分没有副本的则是变成了Red。(上面这个图截的不好,应该截索引那个界面的)

当我重新将master节点变成mixed节点后,未分配的分片会慢慢填充到该节点,节点和集群状态也会转Green,不要害怕,让子弹飞一会儿。

Kafka各类型通用生产者配置

按照功能区分为三种特化型生产者和一种普适生产者。三种特化型分别是高性能(吞吐量)、高可靠性、顺序性。高性能上面说过了,就不复述了。

import com.xxx.transfer.properties.TransferProperties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;

import java.util.HashMap;
import java.util.Map;

/**
 * @Author WangZY
 * @Date 2022/3/15 14:51
 * @Description 多种生产者配置
 **/
@EnableConfigurationProperties(value = {TransferProperties.class})
@Configuration
public class KafkaProducerConfig {
    @Autowired
    private TransferProperties prop;

    /**
     * @author WangZY
     * @date 2022/7/14 11:38
     * @description 高性能
     */
    @Bean
    public KafkaTemplate performanceKafkaTemplate() {
        Map props = new HashMap<>(16);
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, prop.getKafkaServers());
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 163840);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 20);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 67108864);
        props.put(ProducerConfig.ACKS_CONFIG, "1");
        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 10485760);
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 60000);
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
        //config.getInt,自动强转这里不用在意是字符串还是数字
        props.put(ProducerConfig.RETRIES_CONFIG, 1);
        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 1000);
        return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(props));
    }

    /**
     * @author WangZY
     * @date 2022/7/14 11:40
     * @description 高可靠性
     */
    @Bean
    public KafkaTemplate reliableHighKafkaTemplate() {
        Map props = new HashMap<>(16);
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, prop.getKafkaServers());
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 10485760);
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 60000);
        props.put(ProducerConfig.RETRIES_CONFIG, "3");
        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "1000");
        props.put(ProducerConfig.ACKS_CONFIG, "-1");
        return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(props));
    }

    /**
     * @author WangZY
     * @date 2022/7/14 11:40
     * @description 顺序性
     */
    @Bean
    public KafkaTemplate timeKafkaTemplate() {
        Map props = new HashMap<>(16);
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, prop.getKafkaServers());
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 10485760);
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 60000);
        props.put(ProducerConfig.RETRIES_CONFIG, "1");
        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "1000");
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
        return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(props));
    }

    /**
     * @author WangZY
     * @date 2022/7/14 11:41
     * @description 普通
     */
    @Primary
    @Bean
    public KafkaTemplate normalKafkaTemplate() {
        Map props = new HashMap<>(16);
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, prop.getKafkaServers());
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 10485760);
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 60000);
        props.put(ProducerConfig.RETRIES_CONFIG, "1");
        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "1000");
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
        props.put(ProducerConfig.LINGER_MS_CONFIG, 5);
        return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(props));
    }
}
复制代码

Kafka各类型通用消费者配置

这个就相对朴实了,区分比较少,主要是是否自动提交offset的对比,高性能部分也在前面提到过,直接贴代码吧。

import com.xxx.transfer.properties.TransferProperties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.util.StringUtils;

import java.util.HashMap;
import java.util.Map;

/**
 * @Author WangZY
 * @Date 2022/3/15 14:51
 * @Description 多种消费者配置
 **/
@EnableConfigurationProperties(value = {TransferProperties.class})
@Configuration
public class KafkaConsumerConfig {
    @Autowired
    private TransferProperties prop;

    /**
     * @author WangZY
     * @date 2022/7/14 11:35
     * @description 高性能
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory performanceFactory() {
        ConcurrentKafkaListenerContainerFactory container =
                new ConcurrentKafkaListenerContainerFactory<>();
        Map props = new HashMap<>(16);
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, prop.getKafkaServers());
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, StringUtils.isEmpty(prop.getConsumerGroupId()) ?
                "performanceConsumerGroup" : prop.getConsumerGroupId());
        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1048576);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000);
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 60000);
        container.setConsumerFactory(new DefaultKafkaConsumerFactory<>(props));
        container.setConcurrency(3);
        container.setBatchListener(true);
        return container;
    }


    /**
     * @author WangZY
     * @date 2022/7/14 11:36
     * @description 高可靠
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory reliableHighFactory() {
        ConcurrentKafkaListenerContainerFactory container =
                new ConcurrentKafkaListenerContainerFactory<>();
        Map props = new HashMap<>(16);
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, prop.getKafkaServers());
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, StringUtils.isEmpty(prop.getConsumerGroupId()) ?
                "reliableHighConsumerGroup" : prop.getConsumerGroupId());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 60000);
        container.setConsumerFactory(new DefaultKafkaConsumerFactory<>(props));
        container.setConcurrency(3);
        container.setBatchListener(true);
        container.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        return container;
    }

    /**
     * @author WangZY
     * @date 2022/7/14 11:38
     * @description 普通
     */
    @Primary
    @Bean
    public ConcurrentKafkaListenerContainerFactory normalFactory() {
        ConcurrentKafkaListenerContainerFactory container =
                new ConcurrentKafkaListenerContainerFactory<>();
        Map props = new HashMap<>(16);
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, prop.getKafkaServers());
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, StringUtils.isEmpty(prop.getConsumerGroupId()) ?
                "normalConsumerGroup" : prop.getConsumerGroupId());
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 60000);
        container.setConsumerFactory(new DefaultKafkaConsumerFactory<>(props));
        container.setConcurrency(3);
        container.setBatchListener(true);
        return container;
    }

}
复制代码

写在最后

在我看来消息积压这个问题场景的解决思路是很简单的,确实没啥好讲的,但同时细节又十分丰富,因此我补充了一些优化知识和实际遇到的问题。解决问题的过程还是蛮有意思的。

展开阅读全文

页面更新:2024-02-06

标签:消息   代码   生产者   难解   副本   节点   索引   思路   细节   担心   内存   参数   数据   日志

1 2 3 4 5

上滑加载更多 ↓
推荐阅读:
友情链接:
更多:

本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828  

© CopyRight 2008-2024 All Rights Reserved. Powered By bs178.com 闽ICP备11008920号-3
闽公网安备35020302034844号

Top