Kafka是最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop的批处理系统、低延迟的实时系统、storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等,用scala语言编写,Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目。
Kafka中发布订阅的对象是topic。我们可以为每类数据创建一个topic,把向topic发布消息的客户端称作producer,从topic订阅消息的客户端称作consumer。Producers和consumers可以同时从多个topic读写数据。一个kafka集群由一个或多个broker服务器组成,它负责持久化和备份具体的kafka消息。
zookeeper 是 kafka 不可分割的一部分。接下来就来讲讲zookeeper在kafka中作用。
kafka 在 zookeeper 中的存储结构如下图所示:
所谓控制器就是一个Borker,在一个kafka集群中,有多个broker节点,但是它们之间需要选举出一个leader,其他的broker充当follower角色。集群中第一个启动的broker会通过在zookeeper中创建临时节点/controller来让自己成为控制器,其他broker启动时也会在zookeeper中创建临时节点,但是发现节点已经存在,所以它们会收到一个异常,意识到控制器已经存在,那么就会在zookeeper中创建watch对象,便于它们收到控制器变更的通知。
在kafka的集群中,会存在着多个主题topic,在每一个topic中,又被划分为多个partition,为了防止数据不丢失,每一个partition又有多个副本,在整个集群中,总共有三种副本角色:
默认的,如果follower与leader之间超过10s内没有发送请求,或者说没有收到请求数据,此时该follower就会被认为“不同步副本”。而持续请求的副本就是“同步副本”,当leader发生故障时,只有“同步副本”才可以被选举为leader。其中的请求超时时间可以通过参数replica.lag.time.max.ms参数来配置。
我们希望每个分区的leader可以分布到不同的broker中,尽可能的达到负载均衡,所以会有一个优先leader,如果我们设置参数auto.leader.rebalance.enable为true,那么它会检查优先leader是否是真正的leader,如果不是,则会触发选举,让优先leader成为leader。
组协调器会为消费组(consumer group)内的所有消费者选举出一个leader,这个选举的算法也很简单,第一个加入consumer group的consumer即为leader,如果某一时刻leader消费者退出了消费组,那么会重新 随机 选举一个新的leader。
k8s上安装kafka,可以使用helm,将kafka作为一个应用安装。当然这首先要你的k8s支持使用helm安装。helm的介绍和安装见:Kubernetes(k8s)包管理器Helm(Helm3)介绍&Helm3安装Harbor
1、创建命名空间
$ mkdir -p /opt/bigdata/kafka
$ cd /opt/bigdata/kafka
$ kubectl create namespace bigdata
2、创建持久化存储SC(bigdata-nfs-storage)
cat << EOF > bigdata-sc.yaml
apiVersion: v1
kind: ServiceAccount
metadata:
name: nfs-client-provisioner
# replace with namespace where provisioner is deployed
namespace: bigdata #根据实际环境设定namespace,下面类同
---
kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: nfs-client-provisioner-runner
namespace: bigdata
rules:
- apiGroups: [""]
resources: ["persistentvolumes"]
verbs: ["get", "list", "watch", "create", "delete"]
- apiGroups: [""]
resources: ["persistentvolumeclaims"]
verbs: ["get", "list", "watch", "update"]
- apiGroups: ["storage.k8s.io"]
resources: ["storageclasses"]
verbs: ["get", "list", "watch"]
- apiGroups: [""]
resources: ["events"]
verbs: ["create", "update", "patch"]
---
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: run-nfs-client-provisioner
subjects:
- kind: ServiceAccount
name: nfs-client-provisioner
namespace: bigdata
# replace with namespace where provisioner is deployed
roleRef:
kind: ClusterRole
name: nfs-client-provisioner-runner
apiGroup: rbac.authorization.k8s.io
---
kind: Role
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: leader-locking-nfs-client-provisioner
namespace: bigdata
# replace with namespace where provisioner is deployed
rules:
- apiGroups: [""]
resources: ["endpoints"]
verbs: ["get", "list", "watch", "create", "update", "patch"]
---
kind: RoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: leader-locking-nfs-client-provisioner
namespace: bigdata
subjects:
- kind: ServiceAccount
name: nfs-client-provisioner
# replace with namespace where provisioner is deployed
namespace: bigdata
roleRef:
kind: Role
name: leader-locking-nfs-client-provisioner
apiGroup: rbac.authorization.k8s.io
---
kind: Deployment
apiVersion: apps/v1
metadata:
name: nfs-client-provisioner
namespace: bigdata
spec:
replicas: 1
strategy:
type: Recreate
selector:
matchLabels:
app: nfs-client-provisioner
template:
metadata:
labels:
app: nfs-client-provisioner
spec:
serviceAccountName: nfs-client-provisioner
containers:
- name: nfs-client-provisioner
image: quay.io/external_storage/nfs-client-provisioner:latest
volumeMounts:
- name: nfs-client-root
mountPath: /persistentvolumes #容器内挂载点
env:
- name: PROVISIONER_NAME
value: fuseim.pri/ifs
- name: NFS_SERVER
value: 192.168.0.113
- name: NFS_PATH
value: /opt/nfsdata
volumes:
- name: nfs-client-root #宿主机挂载点
nfs:
server: 192.168.0.113
path: /opt/nfsdata
---
apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
name: bigdata-nfs-storage
namespace: bigdata
provisioner: fuseim.pri/ifs # or choose another name, must match deployment's env PROVISIONER_NAME'
reclaimPolicy: Retain #回收策略:Retain(保留)、 Recycle(回收)或者Delete(删除)
volumeBindingMode: Immediate #volumeBindingMode存储卷绑定策略
allowVolumeExpansion: true #pvc是否允许扩容
EOF
执行
$ kubectl apply -f bigdata-sc.yaml
$ kubectl get sc -n bigdata
$ kubectl describe sc bigdata-nfs-storage -n bigdata
3、helm添加bitnami仓库
$ helm repo add bitnami https://charts.bitnami.com/bitnami
$ helm install zookeeper bitnami/zookeeper
--namespace bigdata
--set replicaCount=3 --set auth.enabled=false
--set allowAnonymousLogin=true
--set persistence.storageClass=bigdata-nfs-storage
--set persistence.size=1Gi
查看,一定看到所有pod都是正常运行才ok
$ kubectl get pod,pv,svc -n bigdata -o wide
验证
内部连接测试
$ export POD_NAME=$(kubectl get pods --namespace bigdata -l "app.kubernetes.io/name=zookeeper,app.kubernetes.io/instance=zookeeper,app.kubernetes.io/component=zookeeper" -o jsonpath="{.items[0].metadata.name}")
$ kubectl exec -it $POD_NAME -n bigdata -- zkCli.sh
外部连接测试
# 先删掉本地端口对应的进程,要不然就得换连接端口了
$ netstat -tnlp|grep 127.0.0.1:2181|awk '{print int($NF)}'|xargs kill -9
# 外部连接测试
$ kubectl port-forward --namespace bigdata svc/zookeeper 2181:2181 &
# 需要本机安装zk客户端
$ zkCli.sh 127.0.0.1:21
1、查看zookeeper集群状态
$ helm status zookeeper -n bigdata
NAME: zookeeper
LAST DEPLOYED: Sat Dec 4 13:38:16 2021
NAMESPACE: bigdata
STATUS: deployed
REVISION: 1
TEST SUITE: None
NOTES:
CHART NAME: zookeeper
CHART VERSION: 7.4.13
APP VERSION: 3.7.0
** Please be patient while the chart is being deployed **
ZooKeeper can be accessed via port 2181 on the following DNS name from within your cluster:
zookeeper.bigdata.svc.cluster.local
To connect to your ZooKeeper server run the following commands:
export POD_NAME=$(kubectl get pods --namespace bigdata -l "app.kubernetes.io/name=zookeeper,app.kubernetes.io/instance=zookeeper,app.kubernetes.io/ component=zookeeper" -o jsonpath="{.items[0].metadata.name}")
kubectl exec -it $POD_NAME -- zkCli.sh
To connect to your ZooKeeper server from outside the cluster execute the following commands:
kubectl port-forward --namespace bigdata svc/zookeeper 2181:2181 &
zkCli.sh 127.0.0.1:2181
ZooKeeper can be accessed via port 2181 on the following DNS name from within your cluster:
zookeeper.bigdata.svc.cluster.local
安装
$ helm install kafka bitnami/kafka
--namespace bigdata
--set zookeeper.enabled=false
--set replicaCount=3
--set externalZookeeper.servers=zookeeper.bigdata.svc.cluster.local
--set persistence.storageClass=bigdata-nfs-storage
NAME: kafka
LAST DEPLOYED: Sat Dec 4 15:37:33 2021
NAMESPACE: bigdata
STATUS: deployed
REVISION: 1
TEST SUITE: None
NOTES:
CHART NAME: kafka
CHART VERSION: 14.4.3
APP VERSION: 2.8.1
** Please be patient while the chart is being deployed **
Kafka can be accessed by consumers via port 9092 on the following DNS name from within your cluster:
kafka.bigdata.svc.cluster.local
Each Kafka broker can be accessed by producers via port 9092 on the following DNS name(s) from within your cluster:
kafka-0.kafka-headless.bigdata.svc.cluster.local:9092
kafka-1.kafka-headless.bigdata.svc.cluster.local:9092
kafka-2.kafka-headless.bigdata.svc.cluster.local:9092
To create a pod that you can use as a Kafka client run the following commands:
kubectl run kafka-client --restart='Never' --image docker.io/bitnami/kafka:2.8.1-debian-10-r57 --namespace bigdata --command -- sleep infinity
kubectl exec --tty -i kafka-client --namespace bigdata -- bash
PRODUCER:
kafka-console-producer.sh
--broker-list kafka-0.kafka-headless.bigdata.svc.cluster.local:9092,kafka-1.kafka-headless.bigdata.svc.cluster.local:9092,kafka-2.kafka-hea dless.bigdata.svc.cluster.local:9092
--topic test
CONSUMER:
kafka-console-consumer.sh
--bootstrap-server kafka.bigdata.svc.cluster.local:9092
--topic test
--from-beginning
查看
$ kubectl get pod,svc -n bigdata
测试,安装上面提示,先创建一个client
$ kubectl run kafka-client --restart='Always' --image docker.io/bitnami/kafka:2.8.1-debian-10-r57 --namespace bigdata --command -- sleep infinity
打开两个窗口(一个作为生产者:producer,一个作为消费者:consumer),但是两个窗口都得先登录客户端
$ kubectl exec --tty -i kafka-client --namespace bigdata -- bash
producer
$ kafka-console-producer.sh
--broker-list kafka-0.kafka-headless.bigdata.svc.cluster.local:9092,kafka-1.kafka-headless.bigdata.svc.cluster.local:9092,kafka-2.kafka-headless.bigdata.svc.cluster.local:9092
--topic test
consumer
$ kafka-console-consumer.sh
--bootstrap-server kafka.bigdata.svc.cluster.local:9092
--topic test
--from-beginning
在producer端输入,consumer会实时打印
1、创建Topic(一个副本一个分区)
--create: 指定创建topic动作
--topic:指定新建topic的名称
--zookeeper: 指定kafka连接zk的连接url,该值和server.properties文件中的配置项{zookeeper.connect}一样
--config:指定当前topic上有效的参数值,参数列表参考文档为: Topic-level configuration
--partitions:指定当前创建的kafka分区数量,默认为1个
--replication-factor:指定每个分区的复制因子个数,默认1个
$ kafka-topics.sh --create --topic mytest --zookeeper zookeeper.bigdata.svc.cluster.local:2181 --partitions 1 --replication-factor 1
# 查看
$ kafka-topics.sh --describe --zookeeper zookeeper.bigdata.svc.cluster.local:2181 --topic mytest
2、删除Topic
# 先查看topic列表
$ kafka-topics.sh --list --zookeeper zookeeper.bigdata.svc.cluster.local:2181
# 删除
$ kafka-topics.sh --delete --topic mytest --zookeeper zookeeper.bigdata.svc.cluster.local:2181
# 再查看,发现topic还在
$ kafka-topics.sh --list --zookeeper zookeeper.bigdata.svc.cluster.local:2181
其实上面没删除,只是标记了(只会删除zookeeper中的元数据,消息文件须手动删除)
Note: This will have no impact if delete.topic.enable is not set to true.## 默认情况下,删除是标记删除,没有实际删除这个Topic;如果运行删除Topic,两种方式:
3、修改Topic信息
kafka默认的只保存7天的数据,时间一到就删除数据,当遇到磁盘过小,存放的数据量过大,可以设置缩短这个时间。
# 先创建一个topic
$ kafka-topics.sh --create --topic test001 --zookeeper zookeeper.bigdata.svc.cluster.local:2181 --partitions 1 --replication-factor 1
# 修改,设置数据过期时间(-1表示不过期)
$ kafka-topics.sh --zookeeper zookeeper.bigdata.svc.cluster.local:2181 -topic test001 --alter --config retention.ms=259200000
# 修改多字段
$ kafka-topics.sh --zookeeper zookeeper.bigdata.svc.cluster.local:2181 -topic test001 --alter --config max.message.bytes=128000 retention.ms=259200000
$ kafka-topics.sh --describe --zookeeper zookeeper.bigdata.svc.cluster.local:2181 --topic test001
4、增加topic分区数
$ kafka-topics.sh --zookeeper zookeeper.bigdata.svc.cluster.local:2181 --alter --topic test --partitions 10
$ kafka-topics.sh --describe --zookeeper zookeeper.bigdata.svc.cluster.local:2181 --topic test
5、查看Topic列表
$ kafka-topics.sh --list --zookeeper zookeeper.bigdata.svc.cluster.local:2181
6、列出所有主题中的所有用户组
$ kafka-consumer-groups.sh --bootstrap-server kafka-0.kafka-headless.bigdata.svc.cluster.local:9092 --list
7、查询消费者组详情(数据积压情况)
# 生产者
$ kafka-console-producer.sh
--broker-list kafka-0.kafka-headless.bigdata.svc.cluster.local:9092,kafka-1.kafka-headless.bigdata.svc.cluster.local:9092,kafka-2.kafka-headless.bigdata.svc.cluster.local:9092
--topic test
# 消费者带group.id
$ kafka-console-consumer.sh --bootstrap-server kafka-0.kafka-headless.bigdata.svc.cluster.local:9092 --topic test --consumer-property group.id=mygroup
# 查看消费组情况
$ kafka-consumer-groups.sh --bootstrap-server kafka-0.kafka-headless.bigdata.svc.cluster.local:9092 --describe --group mygroup
消费积压情况分析
生产和消费的操作上面已经实验过了,这里就不再重复了,更多的操作,可以参考kafka官方文档
页面更新:2024-05-17
本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828
© CopyRight 2008-2024 All Rights Reserved. Powered By bs178.com 闽ICP备11008920号-3
闽公网安备35020302034844号