详细讲解如何使用Java连接Kafka构建生产者和消费者

前言

学习消息队列的过程中,先补习了RabbitMQ相关知识,接着又重温了Kafka相关的知识,发现,我并没有积累Java原生操作Kafka的文章,只使用SpringBoot集成过Kafka,所以,本次是纯Java的方式操作Kafka,构建生产者和消费者,本地部署Kafka环境,给出测试样例的测试结果,同时,讲解部分通用的参数,及给出通过命令行启动生产者和消费者的测试样例,分享如下,帮助读者学习Kafka基础操作。

2 环境准备

下载kafka

2.1 启动zookeeper

bin/zookeeper-server-start.sh config/zookeeper.properties

2.2 启动kafka

bin/kafka-server-start.sh config/server.properties

2.3 新建topic

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic my-topic

2.4 依赖

    org.apache.kafka    kafka-clients    3.2.0

3 Kafka通用配置

Kafka的生产者和消费者参数比较多,这里仅列出一些测试用的参数,

区分生产者和消费者。

3.1 生产者配置参数

序号

参数

描述

1

bootstrap.servers

Kafka主机

2

acks

生产者:要求leader请求结束前收到的确认次数,来控制发送数据的持久化
消息确认:
0:生产者不等待服务器确认,此时retry参数不生效
1:leader写入记录到log,不会等待follower的确认即向生产者发送通知
all:leader等待所有副本通知,然后向生产者发送通知,保证所有数据落盘到所有副本,功能同设置为-1

3

retries

生产者重试次数

4

batch.size

生产者:向同一分区发送打包发送的数据量,单位:bytes,默认16384bytes=16K

5

linger.ms

生产者:批量发送消息的间隔时间(延迟时间),单位:毫秒

6

buffer.memory

生产者:可以使用的最大缓存空间,单位:bytes,默认33554432bytes=32M

7

key.serializer

生产者:键编码器

8

value.serializer

生产者:值编码器

3.2 消费者配置参数

序号

参数

描述

1

bootstrap.servers

Kafka主机

2

group.id

消费者:消费topic的组ID

3

enable.auto.commit

消费者:后台定期提交offset

4

auto.commit.interval.ms

消费者提交offset的时间间隔:单位:毫秒,当enable.auto.commit为true时生效

5

auto.offset.reset

消费者:重启后配置offset
earliest:消费者恢复到当前topic最早的offset
latest:消费者从最新的offset开始消费
none:如果消费者组没找到之前的offset抛出异常
其他任何值都会抛出异常

6

key.deserializer

消费者:键解码器

7

value.deserializer

消费者:值解码器

3.3 Kafka通用参数封装

由于参数众多,这里封装了一个Kafka通用参数类,给了默认值,

本地测试,直接使用默认参数,

同时给出了有参构造器,自定义参数,

代码样例如下。

package com.monkey.java_study.mq.kafka;import java.util.Collection;import java.util.Collections;/** * Kafka通用配置. * * @author xindaqi * @since 2022-08-03 9:49 */public class KafkaCommonProperties {     /**     * Kafka主机     */    private String kafkaHost = "192.168.211.129:9092";    /**     * 生产者:要求leader请求结束前收到的确认次数,来控制发送数据的持久化     * 消息确认:     * 0:生产者不等待服务器确认,此时retry参数不生效     * 1:leader写入记录到log,不会等待follower的确认即向生产者发送通知     * all:leader等待所有副本通知,然后向生产者发送通知,保证所有数据落盘到所有副本,功能同设置为-1     */    private String ack = "all";    /**     * 生产者重试次数     */    private Integer retryTimes = 1;    /**     * 生产者:向同一分区发送打包发送的数据量,单位:bytes,默认16384bytes=16K     */    private Integer batchSize = 16384;    /**     * 生产者:批量发送消息的间隔时间(延迟时间),单位:毫秒     */    private Integer lingerMs = 1;    /**     * 生产者:可以使用的最大缓存空间,单位:bytes,默认33554432bytes=32M.     */    private Integer bufferMemory = 33554432;    /**     * 生产者:键编码器     */    private String keyEncoder = "org.apache.kafka.common.serialization.StringSerializer";    /**     * 生产者:值编码器     */    private String valueEncoder = "org.apache.kafka.common.serialization.StringSerializer";    /**     * 消费者:消费topic的组ID     */    private String groupId = "my-group-id";    /**     * 消费者:后台定期提交offset     */    private String autoCommit = "true";    /**     * 消费者提交offset的时间间隔:单位:毫秒,当enable.auto.commit为true时生效     */    private String autoCommitIntervalMs = "1000";    /**     * 消费者:键解码器     */    private String keyDecoder = "org.apache.kafka.common.serialization.StringDeserializer";    /**     * 消费者:值解码器     */    private String valueDecoder = "org.apache.kafka.common.serialization.StringDeserializer";    /**     * 消费者:重启后配置offset     * earliest:消费者恢复到当前topic最早的offset     * latest:消费者从最新的offset开始消费     * none:如果消费者组没找到之前的offset抛出异常     * 其他任何值都会抛出异常     */    private String autoOffsetReset = "latest";    /**     * TOPIC     */    private Collection topic = Collections.singleton("my-topic");    public KafkaCommonProperties() {     }    public KafkaCommonProperties(String kafkaHost, String ack, Integer retryTimes, Integer batchSize, Integer lingerMs, Integer bufferMemory, String keyEncoder, String valueEncoder, String groupId, String autoCommit, String autoCommitIntervalMs, String keyDecoder, String valueDecoder, String autoOffsetReset, Collection topic) {         this.kafkaHost = kafkaHost;        this.ack = ack;        this.retryTimes = retryTimes;        this.batchSize = batchSize;        this.lingerMs = lingerMs;        this.bufferMemory = bufferMemory;        this.keyEncoder = keyEncoder;        this.valueEncoder = valueEncoder;        this.groupId = groupId;        this.autoCommit = autoCommit;        this.autoCommitIntervalMs = autoCommitIntervalMs;        this.keyDecoder = keyDecoder;        this.valueDecoder = valueDecoder;        this.autoOffsetReset = autoOffsetReset;        this.topic = topic;    }// 省略setter和getter及toString()  }

4 Code实践

4.1 生产者

构建Kafka数据生产者,

测试样例的配置有:Kafka broker地址,消息确认,重试,批量发送数据,数据键和值的编码器,

重写Callback实现异步生产数据。

4.1.1 生产数据

package com.monkey.java_study.mq.kafka;import org.apache.kafka.clients.producer.*;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.util.Objects;import java.util.Properties;/** * Kafka生产者. * * @author xindaqi * @since 2022-08-02 9:59 */public class KafkaProducerTest {     private static final Logger logger = LoggerFactory.getLogger(KafkaProducerTest.class);    public static KafkaProducer getDefaultKafkaProducer(KafkaCommonProperties kafkaCommonProperties) {         Properties properties = new Properties();        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaCommonProperties.getKafkaHost());        properties.put(ProducerConfig.ACKS_CONFIG, kafkaCommonProperties.getAck());        properties.put(ProducerConfig.RETRIES_CONFIG, kafkaCommonProperties.getRetryTimes());        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, kafkaCommonProperties.getBatchSize());        properties.put(ProducerConfig.LINGER_MS_CONFIG, kafkaCommonProperties.getLingerMs());        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, kafkaCommonProperties.getBufferMemory());        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, kafkaCommonProperties.getKeyEncoder());        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, kafkaCommonProperties.getValueEncoder());        return new KafkaProducer<>(properties);    }    static class MyProducerCallback implements Callback {         @Override        public void onCompletion(RecordMetadata metadata, Exception exception) {             if (Objects.nonNull(exception)) {                 logger.error(">>>>>>>>>>Producer生产消息异常:", exception);            }            if (Objects.nonNull(metadata)) {                 logger.info(">>>>>>>>>>Producer生产消息:metadata:{},partition:{}, offset:{}", metadata, metadata.partition(), metadata.offset());            }        }    }    public static void main(String[] args) {         KafkaCommonProperties kafkaCommonProperties = new KafkaCommonProperties();        KafkaProducer producer = getDefaultKafkaProducer(kafkaCommonProperties);        String message = "hello world ";        try {             for (int i = 0; i < 10; i++) {                 // 异步写入数据                String topic = kafkaCommonProperties.getTopic().toArray()[0].toString();                ProducerRecord producerRecord = new ProducerRecord<>(topic, message + i);                producer.send(producerRecord, new MyProducerCallback());            }        } catch (Exception ex) {             logger.error(">>>>>>>>生产数据异常:", ex);            throw new RuntimeException(ex);        } finally {             producer.close();        }    }}

4.1.2 开启生产者

生产者开启后,控制台输出生产者配置信息,如下图所示,其中,

acks在代码中配置为all,而运行日志中acks为-1,所以,acks的all与-1是同种功能。

生产者生产数据是通过异步的方式,控制台日志如下图所示,

由图可知,生产数据的线程为:kafka-producer-network-thread。

4.2 消费者

Kafka消费者通过groupId消费指定topic的,

以groupId区分不同的消费者,即不同的groupId消费相同的topic,对于topic而言,就是不同的消费者,

同时,消费者需要记录消费到的offset,以便下次启动时定位到具体的位置,消费消息。

这里,配置的offset策略为:latest,即每次重启消费者时,从最新的offset开始消费(上次记录的offset之后的一个,如果上次消费没有记录,则从当前offset之后开始消费)。

offset的重置这样理解:

当前topic写入数据有4条,offset从0到3,

如果,offset重设为earliest,则每次重启消费者,offset都会从0开始消费数据;

如果,offset重设为latest,则,每次消费从上次消费的offset下一个开始消费,如果上次消费的offset为3,则,重启后, 从4开始消费数据。

4.2.1 消费数据

package com.monkey.java_study.mq.kafka;import org.apache.kafka.clients.consumer.ConsumerConfig;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.time.Duration;import java.util.Properties;/** * Kafka消费者. * * @author xindaqi * @since 2022-08-02 9:59 */public class KafkaConsumerTest {     private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerTest.class);    public static KafkaConsumer getDefaultKafkaConsumer(KafkaCommonProperties kafkaCommonProperties) {         Properties properties = new Properties();        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaCommonProperties.getKafkaHost());        properties.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaCommonProperties.getGroupId());        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, kafkaCommonProperties.getAutoCommit());        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, kafkaCommonProperties.getAutoCommitIntervalMs());        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, kafkaCommonProperties.getAutoOffsetReset());        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, kafkaCommonProperties.getKeyDecoder());        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, kafkaCommonProperties.getValueDecoder());        return new KafkaConsumer<>(properties);    }    public static void main(String[] args) {         try {             KafkaCommonProperties kafkaCommonProperties = new KafkaCommonProperties();            KafkaConsumer consumer = getDefaultKafkaConsumer(kafkaCommonProperties);            consumer.subscribe(kafkaCommonProperties.getTopic());            while (Boolean.TRUE) {                 ConsumerRecords records = consumer.poll(Duration.ofMillis(1000));                for (ConsumerRecord record : records) {                     logger.info(">>>>>>>>Consumer offset:{}, value:{}", record.offset(), record.value());                }            }        } catch (Exception ex) {             throw new RuntimeException(ex);        }    }}

4.2.2 开启消费者

开启消费者后,控制台输出消费者的配置参数,如下图所示。

同时输出的还有(如下图所示),下次将要消费的offset:41。

消费信息日志如下图所示,

由图可知,从offset 41开始消费。

5 命令行

上面

5.1 开启消费者

从topic的最开始消费数据,则offset从0开始,

开启消费者命令:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic --from-beginning

topic中已有数据,因此,开始消费者时,从最开始消费数据,结果如下图所示。

命令行启动生产者后,

该消费者会自动消费进入数据,结果如下图所示。

5.2 开启生产者

开启生产者命令如下:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-topic

发送消费如下图所示。

无。

展开阅读全文

页面更新:2024-03-13

标签:生产者   消费者   编码器   解码器   异常   参数   消息   单位   通知   数据   详细

1 2 3 4 5

上滑加载更多 ↓
推荐阅读:
友情链接:
更多:

本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828  

© CopyRight 2008-2024 All Rights Reserved. Powered By bs178.com 闽ICP备11008920号-3
闽公网安备35020302034844号

Top