Kafka原理介绍+安装+基本操作(kafka on k8s)

一、Kafka概述

Kafka是最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop的批处理系统、低延迟的实时系统、storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等,用scala语言编写,Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目。

1)Kafka的特性

2)应用场景

二、Kafka架构简介

Kafka中发布订阅的对象是topic。我们可以为每类数据创建一个topic,把向topic发布消息的客户端称作producer,从topic订阅消息的客户端称作consumer。Producers和consumers可以同时从多个topic读写数据。一个kafka集群由一个或多个broker服务器组成,它负责持久化和备份具体的kafka消息。

Kafka原理介绍+安装+基本操作(kafka on k8s)

写入流程

  1. producer 先从 zookeeper 的 "/brokers/.../state" 节点找到该 partition 的 leader
  2. producer 将消息发送给该 leader
  3. leader 将消息写入本地 log
  4. followers 从 leader pull 消息,写入本地 log 后 leader 发送 ACK
  5. leader 收到所有 ISR 中的 replica 的 ACK 后,增加 HW(high watermark,最后 commit 的 offset) 并向 producer 发送 ACK
Kafka原理介绍+安装+基本操作(kafka on k8s)

三、Kakfa的设计思想

四、Zookeeper在Kafka中的作用

zookeeper 是 kafka 不可分割的一部分。接下来就来讲讲zookeeper在kafka中作用。

1)记录和维护broker状态

2)控制器(leader )选举

3)限额权限

4)记录 ISR(已同步的副本)

5)node 和 topic 注册

6)topic 配置

kafka 在 zookeeper 中的存储结构如下图所示:

Kafka原理介绍+安装+基本操作(kafka on k8s)

五、Leader选举

1)控制器(Broker)选举Leader机制

所谓控制器就是一个Borker,在一个kafka集群中,有多个broker节点,但是它们之间需要选举出一个leader,其他的broker充当follower角色。集群中第一个启动的broker会通过在zookeeper中创建临时节点/controller来让自己成为控制器,其他broker启动时也会在zookeeper中创建临时节点,但是发现节点已经存在,所以它们会收到一个异常,意识到控制器已经存在,那么就会在zookeeper中创建watch对象,便于它们收到控制器变更的通知。

2)分区副本选举Leader机制

在kafka的集群中,会存在着多个主题topic,在每一个topic中,又被划分为多个partition,为了防止数据不丢失,每一个partition又有多个副本,在整个集群中,总共有三种副本角色:

默认的,如果follower与leader之间超过10s内没有发送请求,或者说没有收到请求数据,此时该follower就会被认为“不同步副本”。而持续请求的副本就是“同步副本”,当leader发生故障时,只有“同步副本”才可以被选举为leader。其中的请求超时时间可以通过参数replica.lag.time.max.ms参数来配置。

我们希望每个分区的leader可以分布到不同的broker中,尽可能的达到负载均衡,所以会有一个优先leader,如果我们设置参数auto.leader.rebalance.enable为true,那么它会检查优先leader是否是真正的leader,如果不是,则会触发选举,让优先leader成为leader。

3)消费组(consumer group)选举机制

组协调器会为消费组(consumer group)内的所有消费者选举出一个leader,这个选举的算法也很简单,第一个加入consumer group的consumer即为leader,如果某一时刻leader消费者退出了消费组,那么会重新 随机 选举一个新的leader。

六、kubernetes(k8s) helm3安装zookeeper、kafka

k8s上安装kafka,可以使用helm,将kafka作为一个应用安装。当然这首先要你的k8s支持使用helm安装。helm的介绍和安装见:Kubernetes(k8s)包管理器Helm(Helm3)介绍&Helm3安装Harbor

1)前期准备

1、创建命名空间

$ mkdir -p /opt/bigdata/kafka
$ cd /opt/bigdata/kafka
$ kubectl create namespace bigdata

2、创建持久化存储SC(bigdata-nfs-storage)

cat << EOF > bigdata-sc.yaml
apiVersion: v1
kind: ServiceAccount
metadata:
  name: nfs-client-provisioner
  # replace with namespace where provisioner is deployed
  namespace: bigdata        #根据实际环境设定namespace,下面类同
---
kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: nfs-client-provisioner-runner
  namespace: bigdata
rules:
  - apiGroups: [""]
    resources: ["persistentvolumes"]
    verbs: ["get", "list", "watch", "create", "delete"]
  - apiGroups: [""]
    resources: ["persistentvolumeclaims"]
    verbs: ["get", "list", "watch", "update"]
  - apiGroups: ["storage.k8s.io"]
    resources: ["storageclasses"]
    verbs: ["get", "list", "watch"]
  - apiGroups: [""]
    resources: ["events"]
    verbs: ["create", "update", "patch"]
---
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: run-nfs-client-provisioner
subjects:
  - kind: ServiceAccount
    name: nfs-client-provisioner
    namespace: bigdata
    # replace with namespace where provisioner is deployed
roleRef:
  kind: ClusterRole
  name: nfs-client-provisioner-runner
  apiGroup: rbac.authorization.k8s.io
---
kind: Role
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: leader-locking-nfs-client-provisioner
  namespace: bigdata
    # replace with namespace where provisioner is deployed
rules:
  - apiGroups: [""]
    resources: ["endpoints"]
    verbs: ["get", "list", "watch", "create", "update", "patch"]
---
kind: RoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: leader-locking-nfs-client-provisioner
  namespace: bigdata
subjects:
  - kind: ServiceAccount
    name: nfs-client-provisioner
    # replace with namespace where provisioner is deployed
    namespace: bigdata
roleRef:
  kind: Role
  name: leader-locking-nfs-client-provisioner
  apiGroup: rbac.authorization.k8s.io
---
kind: Deployment
apiVersion: apps/v1
metadata:
  name: nfs-client-provisioner
  namespace: bigdata
spec:
  replicas: 1
  strategy:
    type: Recreate
  selector:
    matchLabels:
      app: nfs-client-provisioner
  template:
    metadata:
      labels:
        app: nfs-client-provisioner
    spec:
      serviceAccountName: nfs-client-provisioner
      containers:
        - name: nfs-client-provisioner
          image: quay.io/external_storage/nfs-client-provisioner:latest
          volumeMounts:
            - name: nfs-client-root
              mountPath: /persistentvolumes #容器内挂载点
          env:
            - name: PROVISIONER_NAME
              value: fuseim.pri/ifs
            - name: NFS_SERVER
              value: 192.168.0.113
            - name: NFS_PATH
              value: /opt/nfsdata
      volumes:
        - name: nfs-client-root #宿主机挂载点
          nfs:
            server: 192.168.0.113
            path: /opt/nfsdata
---
apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
  name: bigdata-nfs-storage
  namespace: bigdata
provisioner: fuseim.pri/ifs  # or choose another name, must match deployment's env PROVISIONER_NAME'
reclaimPolicy: Retain        #回收策略:Retain(保留)、 Recycle(回收)或者Delete(删除)
volumeBindingMode: Immediate    #volumeBindingMode存储卷绑定策略
allowVolumeExpansion: true    #pvc是否允许扩容
EOF

执行

$ kubectl apply -f bigdata-sc.yaml
$ kubectl get sc -n bigdata
$ kubectl describe sc bigdata-nfs-storage -n bigdata
Kafka原理介绍+安装+基本操作(kafka on k8s)

3、helm添加bitnami仓库

$ helm repo add bitnami https://charts.bitnami.com/bitnami

2)部署zookeeper集群

$ helm install zookeeper bitnami/zookeeper 
--namespace bigdata 
--set replicaCount=3 --set auth.enabled=false 
--set allowAnonymousLogin=true 
--set persistence.storageClass=bigdata-nfs-storage 
--set persistence.size=1Gi
Kafka原理介绍+安装+基本操作(kafka on k8s)

查看,一定看到所有pod都是正常运行才ok

$ kubectl get pod,pv,svc -n bigdata -o wide
Kafka原理介绍+安装+基本操作(kafka on k8s)

验证
内部连接测试

$ export POD_NAME=$(kubectl get pods --namespace bigdata -l "app.kubernetes.io/name=zookeeper,app.kubernetes.io/instance=zookeeper,app.kubernetes.io/component=zookeeper" -o jsonpath="{.items[0].metadata.name}")
$ kubectl exec -it $POD_NAME -n bigdata -- zkCli.sh
Kafka原理介绍+安装+基本操作(kafka on k8s)

外部连接测试

# 先删掉本地端口对应的进程,要不然就得换连接端口了
$ netstat -tnlp|grep 127.0.0.1:2181|awk '{print int($NF)}'|xargs kill -9
# 外部连接测试
$ kubectl port-forward --namespace bigdata svc/zookeeper 2181:2181 &
# 需要本机安装zk客户端
$ zkCli.sh 127.0.0.1:21

3)部署kafka集群

1、查看zookeeper集群状态

$ helm status zookeeper -n bigdata
Kafka原理介绍+安装+基本操作(kafka on k8s)

NAME: zookeeper
LAST DEPLOYED: Sat Dec  4 13:38:16 2021
NAMESPACE: bigdata
STATUS: deployed
REVISION: 1
TEST SUITE: None
NOTES:
CHART NAME: zookeeper
CHART VERSION: 7.4.13
APP VERSION: 3.7.0

** Please be patient while the chart is being deployed **

ZooKeeper can be accessed via port 2181 on the following DNS name from within your cluster:

    zookeeper.bigdata.svc.cluster.local

To connect to your ZooKeeper server run the following commands:

    export POD_NAME=$(kubectl get pods --namespace bigdata -l "app.kubernetes.io/name=zookeeper,app.kubernetes.io/instance=zookeeper,app.kubernetes.io/                           component=zookeeper" -o jsonpath="{.items[0].metadata.name}")
    kubectl exec -it $POD_NAME -- zkCli.sh

To connect to your ZooKeeper server from outside the cluster execute the following commands:

    kubectl port-forward --namespace bigdata svc/zookeeper 2181:2181 &
    zkCli.sh 127.0.0.1:2181

ZooKeeper can be accessed via port 2181 on the following DNS name from within your cluster:

zookeeper.bigdata.svc.cluster.local

安装

$ helm install kafka bitnami/kafka 
--namespace bigdata 
--set zookeeper.enabled=false 
--set replicaCount=3 
--set externalZookeeper.servers=zookeeper.bigdata.svc.cluster.local 
--set persistence.storageClass=bigdata-nfs-storage
Kafka原理介绍+安装+基本操作(kafka on k8s)

NAME: kafka
LAST DEPLOYED: Sat Dec  4 15:37:33 2021
NAMESPACE: bigdata
STATUS: deployed
REVISION: 1
TEST SUITE: None
NOTES:
CHART NAME: kafka
CHART VERSION: 14.4.3
APP VERSION: 2.8.1

** Please be patient while the chart is being deployed **

Kafka can be accessed by consumers via port 9092 on the following DNS name from within your cluster:

    kafka.bigdata.svc.cluster.local

Each Kafka broker can be accessed by producers via port 9092 on the following DNS name(s) from within your cluster:

    kafka-0.kafka-headless.bigdata.svc.cluster.local:9092
    kafka-1.kafka-headless.bigdata.svc.cluster.local:9092
    kafka-2.kafka-headless.bigdata.svc.cluster.local:9092

To create a pod that you can use as a Kafka client run the following commands:

    kubectl run kafka-client --restart='Never' --image docker.io/bitnami/kafka:2.8.1-debian-10-r57 --namespace bigdata --command -- sleep infinity
    kubectl exec --tty -i kafka-client --namespace bigdata -- bash

    PRODUCER:
        kafka-console-producer.sh 
            --broker-list kafka-0.kafka-headless.bigdata.svc.cluster.local:9092,kafka-1.kafka-headless.bigdata.svc.cluster.local:9092,kafka-2.kafka-hea                           dless.bigdata.svc.cluster.local:9092 
            --topic test

    CONSUMER:
        kafka-console-consumer.sh 
            --bootstrap-server kafka.bigdata.svc.cluster.local:9092 
            --topic test 
            --from-beginning

查看

$ kubectl get pod,svc -n bigdata
Kafka原理介绍+安装+基本操作(kafka on k8s)

4)简单使用

测试,安装上面提示,先创建一个client

$ kubectl run kafka-client --restart='Always' --image docker.io/bitnami/kafka:2.8.1-debian-10-r57 --namespace bigdata --command -- sleep infinity

打开两个窗口(一个作为生产者:producer,一个作为消费者:consumer),但是两个窗口都得先登录客户端

$ kubectl exec --tty -i kafka-client --namespace bigdata -- bash

producer

$ kafka-console-producer.sh 
--broker-list kafka-0.kafka-headless.bigdata.svc.cluster.local:9092,kafka-1.kafka-headless.bigdata.svc.cluster.local:9092,kafka-2.kafka-headless.bigdata.svc.cluster.local:9092 
--topic test

consumer

$ kafka-console-consumer.sh 
--bootstrap-server kafka.bigdata.svc.cluster.local:9092 
--topic test 
--from-beginning

在producer端输入,consumer会实时打印

Kafka原理介绍+安装+基本操作(kafka on k8s)

1、创建Topic(一个副本一个分区)

--create: 指定创建topic动作

--topic:指定新建topic的名称

--zookeeper: 指定kafka连接zk的连接url,该值和server.properties文件中的配置项{zookeeper.connect}一样

--config:指定当前topic上有效的参数值,参数列表参考文档为: Topic-level configuration

--partitions:指定当前创建的kafka分区数量,默认为1个

--replication-factor:指定每个分区的复制因子个数,默认1个
$ kafka-topics.sh --create --topic mytest --zookeeper zookeeper.bigdata.svc.cluster.local:2181 --partitions 1 --replication-factor 1
# 查看
$ kafka-topics.sh --describe --zookeeper zookeeper.bigdata.svc.cluster.local:2181  --topic mytest
Kafka原理介绍+安装+基本操作(kafka on k8s)

2、删除Topic

# 先查看topic列表
$ kafka-topics.sh --list --zookeeper zookeeper.bigdata.svc.cluster.local:2181
# 删除
$ kafka-topics.sh --delete --topic mytest --zookeeper zookeeper.bigdata.svc.cluster.local:2181
# 再查看,发现topic还在
$ kafka-topics.sh --list --zookeeper zookeeper.bigdata.svc.cluster.local:2181
Kafka原理介绍+安装+基本操作(kafka on k8s)

其实上面没删除,只是标记了(只会删除zookeeper中的元数据,消息文件须手动删除)

Note: This will have no impact if delete.topic.enable is not set to true.## 默认情况下,删除是标记删除,没有实际删除这个Topic;如果运行删除Topic,两种方式:

3、修改Topic信息

kafka默认的只保存7天的数据,时间一到就删除数据,当遇到磁盘过小,存放的数据量过大,可以设置缩短这个时间。

# 先创建一个topic
$ kafka-topics.sh --create --topic test001 --zookeeper zookeeper.bigdata.svc.cluster.local:2181 --partitions 1 --replication-factor 1
# 修改,设置数据过期时间(-1表示不过期)
$ kafka-topics.sh --zookeeper zookeeper.bigdata.svc.cluster.local:2181 -topic test001 --alter --config retention.ms=259200000
# 修改多字段
$ kafka-topics.sh --zookeeper zookeeper.bigdata.svc.cluster.local:2181 -topic test001 --alter --config max.message.bytes=128000 retention.ms=259200000
$ kafka-topics.sh --describe --zookeeper zookeeper.bigdata.svc.cluster.local:2181  --topic test001
Kafka原理介绍+安装+基本操作(kafka on k8s)

4、增加topic分区数

$ kafka-topics.sh --zookeeper zookeeper.bigdata.svc.cluster.local:2181 --alter --topic test --partitions 10
$  kafka-topics.sh --describe --zookeeper zookeeper.bigdata.svc.cluster.local:2181  --topic test
Kafka原理介绍+安装+基本操作(kafka on k8s)

5、查看Topic列表

$ kafka-topics.sh --list --zookeeper zookeeper.bigdata.svc.cluster.local:2181
Kafka原理介绍+安装+基本操作(kafka on k8s)

6、列出所有主题中的所有用户组

$ kafka-consumer-groups.sh --bootstrap-server kafka-0.kafka-headless.bigdata.svc.cluster.local:9092 --list

7、查询消费者组详情(数据积压情况)

# 生产者
$ kafka-console-producer.sh 
--broker-list kafka-0.kafka-headless.bigdata.svc.cluster.local:9092,kafka-1.kafka-headless.bigdata.svc.cluster.local:9092,kafka-2.kafka-headless.bigdata.svc.cluster.local:9092 
--topic test
# 消费者带group.id
$ kafka-console-consumer.sh --bootstrap-server kafka-0.kafka-headless.bigdata.svc.cluster.local:9092 --topic test --consumer-property group.id=mygroup
# 查看消费组情况
$ kafka-consumer-groups.sh --bootstrap-server kafka-0.kafka-headless.bigdata.svc.cluster.local:9092 --describe --group mygroup 
Kafka原理介绍+安装+基本操作(kafka on k8s)

消费积压情况分析

生产和消费的操作上面已经实验过了,这里就不再重复了,更多的操作,可以参考kafka官方文档

展开阅读全文

页面更新:2024-05-17

标签:都会   生产者   副本   节点   集群   控制器   分区   原理   数量   消息   操作   数据

1 2 3 4 5

上滑加载更多 ↓
推荐阅读:
友情链接:
更多:

本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828  

© CopyRight 2008-2024 All Rights Reserved. Powered By bs178.com 闽ICP备11008920号-3
闽公网安备35020302034844号

Top