大数据Flume技术解析

(一)Flume概述、Flume快速入门

1 Flume 概述

1.1 Flume 定义

Flume 是Cloudera 提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统。Flume 基于流式架构,灵活简单。


Flume 最主要的作用就是,实时读取服务器本地磁盘的数据,将数据写入到 HDFS。

1.2 Flume 基础架构

1.2.1 Agent

Agent 是一个 JVM 进程,它以事件的形式将数据从源头送至目的。
Agent 主要由 3 个部分组成, SourceChannelSink

1.2.2 Source

Source 是负责接收数据到 Flume Agent 的组件。 Source 组件可以处理各种类型、各种格式的日志数据,包括avrothriftexecjmsspooling directorynetcatsequence generatorsysloghttplegacy

1.2.3 Sink

Sink 不断地轮询 Channel 中的事件且批量地移除它们,并将这些事件批量写入到存储或索引系统、或者被发送到另一个 Flume Agent 。
Sink组件目的地包括
hdfsloggeravrothriftipcfileHBasesolr 、自定义。

1.2.4 Channel

Channel 是位于 Source 和 Sink 之间的缓冲区。因此, Channel 允许 Source 和 Sink 运作在不同的速率上。 Channel 是线程安全的,可以同时处理几个 Source 的写入操作和几个Sink 的读取操作。

Flume 自带三种 Channel:Memory ChannelFile Channel 以及 Kafka Channel

Memory Channel是内存中的队列。 Memory Channel 在不需要关心数据丢失的情景下适用。如果需要关心数据丢失,那么 Memory Channel 就不应该使用,因为程序死亡、机器宕
机或者重启都会导致数据丢失。
File Channel 将所有事件写到磁盘。因此在程序关闭或机器宕机的情况下不会丢失数据。

1.2.5 Event

传输单元,Flume 数据传输的基本单元,以 Event 的形式将数据从源头送至目的地。Event 由HeaderBody 两部分组成, Header 用来存放该 event 的一些属性,为 K-V 结构,Body 用来存放该条数据,形式为字节数组。

2 Flume 快速入门

2.1 Flume 安装部署

2.1.1 安装地址

(1)Flume 官网地址
http://flume.apache.org/

(2)文档查看地址
http://flume.apache.org/FlumeUserGuide.html

(3)下载地址
http://archive.apache.org/dist/flume/

2.1.2 安装部署

(1)将 apache-flume-1.7.0-bin.tar.gz 上传到 linux 的/opt/software 目录下

(2)解压 apache-flume-1.7.0 bin.tar.gz 到/opt/module/目录下

[Tom@hadoop102 software]$ tar -zxf apache-flume-1.7.0-bin.tar.gz -C /opt/module/
1

(3)修改 apache-flume-1.7.0-bin 的名称为 flume-1.7.0

[Tom@hadoop102 module]$ mv apache-flume-1.7.0-bin flume-1.7.0
1

(4)将 flume/conf 下的 flume-env.sh.template 文件修改为 flume-env.sh,并配置 flume-env.sh 文件

[Tom@hadoop102 conf]$ mv flume-env.sh.template flume-env.sh
[Tom@hadoop102 conf]$ vim flume-env.sh
export JAVA_HOME=/opt/module/jdk1.8.0_212
123

2.2 Flume 入门案例

2.2.1 监控端口数据官方案例

1. 案例需求
使用 Flume 监听一个端口, 收集该端口数据 ,并打印到控制台。

2. 需求分析


3. 实现步骤
(1)安装 netcat 工具

[Tom@hadoop102 software]$ sudo yum install -y nc
1

(2)判断44444 端口是否被占用

[Tom@hadoop102 flume-1.7.0]$ sudo netstat -tunlp | grep 44444
1

(3)创建Flume Agent 配置文件flume-netcat-logger.conf
在 flume-1.7.0 目录下创建 job 文件夹并进入 job 文件夹。

[Tom@hadoop102 flume]$ mkdir job
[Tom@hadoop102 flume]$ cd job/
12

在 job 文件夹下创建 Flume Agent 配置文件netcat-flume-logger.conf

[Tom@hadoop102 job]$ vim netcat-flume-logger.conf
1

netcat-flume-logger.conf 文件中添加如下内容:

# Name the components on this agent    a1:表示agent的名称
a1.sources = r1    # r1:表示a1的Source的名称
a1.sinks = k1    # k1:表示a1的Sink的名称
a1.channels = c1    # c1: 表示a1的Channel的名称

# Describe/configure the source
a1.sources.r1.type = netcat    # 表示a1的输入源类型为netcat端口类型
a1.sources.r1.bind = localhost    # 表示a1的监听的主机
a1.sources.r1.port = 44444    # 表示a1的监听的端口号

# Describe the sink
a1.sinks.k1.type = logger    # 表示a1的输出目的地是控制台logger类型

# Use a channel which buffers events in memory
a1.channels.c1.type = memory    # 表示a1的channel类型是memory内存型
a1.channels.c1.capacity = 1000    # 表示a1的channel总容量为1000个event
a1.channels.c1.transactionCapacity = 100    # 表示a1的channel传输时收集到了100条event以后再去提交事务

# Bind the source and sink to the channel
a1.sources.r1.channels = c1    # 表示将r1和c1连接起来
a1.sinks.k1.channel = c1    # 表示将k1和c1连接起来
123456789101112131415161718192021

注:配置文件来源于官方手册 http://flume.apache.org/FlumeUserGuide.html

(4)先开启 flume 监听端口
第一种写法:

[Tom@hadoop102 flume-1.7.0]$ bin/flume-ng agent --conf conf/ --name a1 --conf-file job/netcat-flume-logger.conf  -Dflume.root.logger=INFO,console
1

第二种写法

[Tom@hadoop102 flume-1.7.0]$ bin/flume-ng agent -c conf/ -n a1 -f job/netcat-flume-logger.conf -Dflume.root.logger=INFO,console
1

参数说明:
--conf/-c:表示配置文件存储在conf/目录
--name/-n:表示给agent 起名为a1
--conf-file/-f:flume 本次启动读取的配置文件是在job 文件夹下的flume-telnet.conf文件。
-Dflume.root.logger=INFO,console :-D 表示 flume 运行时动态修改flume.root.logger 参数属性值,并将控制台日志打印级别设置为INFO 级别。日志级别包括:log、info、warn、error。

(5)使用 netcat 工具向本机的 44444 端口发送内容

[Tom@hadoop102 job]$ nc localhost 44444
hello
OK
HUST
OK
12345

(6)在Flume 监听页面观察接收数据情况

2.2.2 实时监控单个追加文件

1. 案例需求:实时监控 Hive 日志,并上传到 HDFS 中

2. 需求分析


3. 实现步骤
(1)Flume 要想将数据输出到HDFS,须持有Hadoop 相关jar 包。将以下 jar 包
拷贝到
opt/module/flume-1.7.0/lib文件夹下。

(2)创建file-flume-hdfs.conf文件

[Tom@hadoop102 job]$ vim file-flume-hdfs.conf
1

注:要想读取 Linux 系统中的文件,就得按照 Linux 命令的规则执行命令。由于Hive 日志在 Linux 系统中,所以读取文件的类型选择:exec 即 execute 执行的意思。表示执行Linux 命令来读取文件。
添加如下内容

# Name the components on this agent
a2.sources = r2
a2.sinks = k2
a2.channels = c2

# Describe/configure the source
a2.sources.r2.type = exec
a2.sources.r2.command = tail -F /opt/module/hive-3.1.2/logs/hive.log

# Describe the sink
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://hadoop102:8020/flume/%Y%m%d/%H
#上传文件的前缀
a2.sinks.k2.hdfs.filePrefix = logs-
#是否按照时间滚动文件夹
a2.sinks.k2.hdfs.round = true
#多少时间单位创建一个新的文件夹
a2.sinks.k2.hdfs.roundValue = 1
#重新定义时间单位
a2.sinks.k2.hdfs.roundUnit = hour
#是否使用本地时间戳
a2.sinks.k2.hdfs.useLocalTimeStamp = true
#积攒多少个Event才flush到HDFS一次
a2.sinks.k2.hdfs.batchSize = 1000
#设置文件类型,可支持压缩
a2.sinks.k2.hdfs.fileType = DataStream
#多久生成一个新的文件
a2.sinks.k2.hdfs.rollInterval = 30
#设置每个文件的滚动大小
a2.sinks.k2.hdfs.rollSize = 134217700
#文件的滚动与Event数量无关
a2.sinks.k2.hdfs.rollCount = 0

# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2
1234567891011121314151617181920212223242526272829303132333435363738394041

注意:对于所有与时间相关的转义序列,Event Header 中必须存在以’ timestamp’的 key (除非hdfs.useLocalTimeStamp 设置为 true ,此方法会使用 TimestampInterceptor 自动添加 timestamp)。a3.sinks.k3.hdfs.useLocalTimeStamp = true

(3)运行 Flume

[Tom@hadoop102 flume-1.7.0]$ bin/flume-ng agent -c conf/ -n a2 -f job/file-flume-hdfs.conf
1

(4)开启 Hadoop 和 Hive 并操作 Hive 产生日志

[Tom@hadoop102 hadoop-3.1.3]$ sbin/start-dfs.sh
[Tom@hadoop102 hadoop-3.1.3]$ sbin/start-yarn.sh
[Tom@hadoop102 hive-3.1.2]$ bin/hive
123

(5)在HDFS上查看文件

2.3.3 实时监控目录下多个新文件

1. 案例需求:使用 Flume 监听整个目录的文件,并上传至HDFS

2. 需求分析


3. 实现步骤

(1)创建配置文件dir-flume-hdfs.conf
创建一个文件

[Tom@hadoop102 job]$ vim dir-flume-hdfs.conf
1

添加如下内容

# Name the components on this agent
a2.sources = r2
a2.sinks = k2
a2.channels = c2

# Describe/configure the source
a2.sources.r2.type = spooldir
a2.sources.r2.spoolDir = /opt/module/flume-1.7.0/upload
#忽略所有以.tmp结尾的文件,不上传
a2.sources.r2.ignorePattern = ([^ ]*.tmp)

# Describe the sink
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://hadoop102:8020/flume/%Y%m%d/%H
#上传文件的前缀
a2.sinks.k2.hdfs.filePrefix = upload-
#是否按照时间滚动文件夹
a2.sinks.k2.hdfs.round = true
#多少时间单位创建一个新的文件夹
a2.sinks.k2.hdfs.roundValue = 1
#重新定义时间单位
a2.sinks.k2.hdfs.roundUnit = hour
#是否使用本地时间戳
a2.sinks.k2.hdfs.useLocalTimeStamp = true
#积攒多少个Event才flush到HDFS一次
a2.sinks.k2.hdfs.batchSize = 1000
#设置文件类型,可支持压缩
a2.sinks.k2.hdfs.fileType = DataStream
#多久生成一个新的文件
a2.sinks.k2.hdfs.rollInterval = 30
#设置每个文件的滚动大小
a2.sinks.k2.hdfs.rollSize = 134217700
#文件的滚动与Event数量无关
a2.sinks.k2.hdfs.rollCount = 0

# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2
12345678910111213141516171819202122232425262728293031323334353637383940414243

(2)启动监控文件夹命令

[Tom@hadoop102 flume-1.7.0]$ bin/flume-ng agent -c conf/ -n a2 -f job/dir-flume-hdfs.conf
1

说明:在使用 Spooling Directory Source 时不要在监控目录中创建并持续修改文件,上传完成的文件会以.COMPLETED结尾,被监控文件夹每 500 毫秒扫描一次文件变动。

(3)向upload 文件夹中添加文件
/opt/module/flume-1.7.0目录下创建 upload 文件夹

[Tom@hadoop102 flume]$ mkdir upload
1

向 upload 文件夹中添加文件

[huxili@hadoop102 upload]$ touch hust.txt
[huxili@hadoop102 upload]$ touch hust.tmp
[huxili@hadoop102 upload]$ touch hust.log
123

(4) 查看 HDFS 上的数据


(5)等待 1s,再次查询 upload 文件夹

[Tom@hadoop102 upload]$ ll
-rw-rw-r--. 1 Tom Tom  0 9月  11 20:38 hust.log.COMPLETED
-rw-rw-r--. 1 Tom Tom  0 9月  11 20:38 hust.tmp
-rw-rw-r--. 1 Tom Tom  0 9月  11 20:38 hust.txt.COMPLETED
1234

2.2.4 实时监控目录下的多个追加文件

Exec source适用于监控一个实时追加的文件,但不能保证数据不丢失;Spooldir Source能够保证数据不丢失,且能够实现断点续传,但延迟较高,不能实时监控;而Taildir Source既能够实现断点续传,又可以保证数据不丢失,还能够进行实时监控。
1. 案例需求:使用 Flume 监听整个目录的实时追加文件,并上传至HDFS。(在实际操作中我们直接打印到控制台,这样更直观)

2. 需求分析:


3. 实现步骤
(1)创建配置文件flume-taildir-hdfs.conf

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /opt/module/flume-1.7.0/files/file1.txt
a1.sources.r1.filegroups.f2 = /opt/module/flume-1.7.0/files/file2.txt
a1.sources.r1.positionFile = /opt/module/flume-1.7.0/position/position.json

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
12345678910111213141516171819202122

(2)启动监控文件夹命令

[Tom@hadoop102 flume-1.7.0]$ bin/flume-ng agent -c conf/ -n a1 -f job/flume-taildir-hdfs.conf
1

(3)向 files 文件夹中追加内容

[Tom@hadoop102 flume]$ mkdir files
1

向 upload 文件夹中添加文件

[Tom@hadoop102 files]$ echo hello >> file1.txt 
[Tom@hadoop102 files]$ echo hust >> file2.txt 
12

(4)查看数据


Taildir 说明:
Taildir Source 维护了一个json 格式的 position File,其会定期的往 position File 中更新每个文件读取到的最新的位置,因此能够实现断点续传。Position File 的格式如下:

{"inode":2496272,"pos":12,"file":"/opt/module/flume/files/file1.txt"}
{"inode":2496275,"pos":12,"file":"/opt/module/flume/files/file2.txt"}
12

注:Linux 中储存文件元数据的区域就叫做 inode,每个 inode 都有一个号码,操作系统用 inode 号码来识别不同的文件,Unix/Linux 系统内部不使用文件名,而使用 inode 号码来识别文件。

(二)Flume进阶、常见问题

1Flume进阶

1.1 Flume 事务

1.2 Flume Agent 内部原理


重要组件
(1)ChannelSelector
ChannelSelector的作用就是选出 Event 将要被发往哪个 Channel。其共有两种类型,分别是
Replicating(复制)和Multiplexing(多路复用)。
ReplicatingSelector 会将同一个 Event 发往所有的 Channel,Multiplexing 会根据相应的原则,将不同的 Event 发往不同的 Channel 。

(2)SinkProcessor
SinkProcessor 共有三种类型,分别是
DefaultSinkProcessorLoadBalancingSinkProcessorFailoverSinkProcessor
DefaultSinkProcessor 对应的是单个的 Sink,LoadBalancingSinkProcessor 和 FailoverSinkProcessor 对应的是 Sink Group,LoadBalancingSinkProcessor 可以实现负载均衡的功能,FailoverSinkProcessor 可以实现故障转移的功能。

1.3 Flume 拓扑结构

1.3.1 简单串联


这种模式是将多个 flume 顺序连接起来了,从最初的 source 开始到最终 sink 传送的目的存储系统。此模式不建议桥接过多的 flume 数量,flume 数量过多不仅会影响传输速率,而且一旦传输过程中某个节点 flume 宕机,会影响整个传输系统。

1.3.2 复制和多路复用


Flume 支持将事件流向一个或者多个目的地。这种模式可以将相同数据复制到多个 channel 中,或者将不同数据分发到不同的 channel 中, sink 可以选择传送到不同的目的地。

1.3.3 负载均衡和故障转移


Flume 支持使用将多个 sink 逻辑上分到一个 sink 组, sink 组配合不同的 SinkProcessor 可以实现负载均衡和错误恢复的功能。

1.3.4 聚合


这种模式是我们最常见的,也非常实用,日常 web 应用通常分布在上百个服务器,大者甚至上千个、上万个服务器。产生的日志,处理起来也非常麻烦。用flume 的这种组合方式能很好的解决这一问题,每台服务器部署一个flume 采集日志,传送到一个集中收集日志的 flume,再由此flume 上传到hdfs、hive、hbase 等,进行日志分析。

1.4 Flume 企业开发案例

1.4.1 复制和多路复用

1. 案例需求
使用 Flume-1 监控文件变动,Flume-1 将变动内容传递给Flume-2,Flume-2 负责存储到 HDFS。同时 Flume-1 将变动内容传递给Flume-3,Flume-3 负责输出到 Local FileSystem。

2. 需求分析


3. 实现步骤
(1)准备工作
/opt/module/flume-1.7.0/job目录下创建 group1 文件夹

[Tom@hadoop102 job]$ cd group1/
1

/opt/module/flume-1.7.0/datas/目录下创建 flume3 文件夹

[Tom@hadoop102 datas]$ mkdir flume3
1

(2)创建flume-file-flume.conf
配置1 个接收日志文件的source 和两个channel、两个sink,分别输送给 flume-flume-hdfs 和 flume-flume-dir 。
编辑配置文件

[Tom@hadoop102 group1]$ vim flume-file-flume.conf
1

添加如下内容

#name
a1.sources = r1
a1.channels = c1 c2
a1.sinks = k1 k2

#Source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/flume-1.7.0/data/hive.log
a1.sources.r1.positionFile = /opt/module/flume-1.7.0/position/position1.json

# 将数据流复制给所有channel
a1.sources.r1.selector.type = replicating

#Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

# Sink
# sink 端的 avro 是一个数据发送者
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4141

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4142

#Bind
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
12345678910111213141516171819202122232425262728293031323334353637

(3)创建 flume-flume-hdfs .conf
配置上级 Flume 输出的 Source,输出是到 HDFS 的 Sink 。
编辑配置文件

[Tom@hadoop102 group1]$ vim flume-flume-hdfs.conf
1

添加如下内容

#name
a2.sources = r1
a2.channels = c1
a2.sinks = k1

#Source
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 4141

#Channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

#Sink
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs://hadoop102:8020/group1/%Y%m%d/%H
#上传文件的前缀
a2.sinks.k1.hdfs.filePrefix = logs-
#是否按照时间滚动文件夹
a2.sinks.k1.hdfs.round = true
#多少时间单位创建一个新的文件夹
a2.sinks.k1.hdfs.roundValue = 1
#重新定义时间单位
a2.sinks.k1.hdfs.roundUnit = hour
#是否使用本地时间戳
a2.sinks.k1.hdfs.useLocalTimeStamp = true
#积攒多少个Event才flush到HDFS一次
a2.sinks.k1.hdfs.batchSize = 1000
#设置文件类型,可支持压缩
a2.sinks.k1.hdfs.fileType = DataStream
#多久生成一个新的文件
a2.sinks.k1.hdfs.rollInterval = 30
#设置每个文件的滚动大小
a2.sinks.k1.hdfs.rollSize = 134217700
#文件的滚动与Event数量无关
a2.sinks.k1.hdfs.rollCount = 0

#Bind
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
123456789101112131415161718192021222324252627282930313233343536373839404142

(4)创建 flume-flume-dir .conf
配置上级 Flume 输出的 Source,输出是到本地目录的 Sink 。
编辑配置文件

[Tom@hadoop102 group1]$ vim flume-flume-dir.conf
1

添加如下内容

#name
a3.sources = r1
a3.channels = c1
a3.sinks = k1

#Source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop102
a3.sources.r1.port = 4142

#Channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100

#Sink
a3.sinks.k1.type = file_roll
a3.sinks.k1.sink.directory = /opt/module/flume-1.7.0/data/group1

#Bind
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
12345678910111213141516171819202122

提示:输出的本地目录必须是已经存在的目录,如果该目录不存在,并不会创建新的目录。

(5)执行配置文件
分别启动对应的 flume 进程:flume-flume-dir,flume-flume-hdfs,flume-file-flume 。

[Tom@hadoop102 flume-1.7.0]$ bin/flume-ng agent -c conf/ -n a3 -f job/group1/flume-flume-dir.conf

[Tom@hadoop102 flume-1.7.0]$ bin/flume-ng agent -c conf/ -n a2 -f job/group1/flume-flume-hdfs.conf

[Tom@hadoop102 flume-1.7.0]$ bin/flume-ng agent -c conf/ -n a1 -f job/group1/flume-file-flume.conf
12345

(6)启动 Hadoop 并向 hive.log 添加数据

[Tom@hadoop102 hadoop-3.1.3]$ sbin/start-dfs.sh
[Tom@hadoop102 hadoop-3.1.3]$ sbin/start-yarn.sh

[Tom@hadoop102 data]$ echo hello >> hive.log 
[Tom@hadoop102 data]$ echo hust >> hive.log 
12345

(7)检查 HDFS 上数据

(8)检查 /opt/module/flume-1.7.0/datas/flume3目录中数据

总用量 16
-rw-rw-r--. 1 Tom Tom  6 9月  12 22:02 1631453983368-46
-rw-rw-r--. 1 Tom Tom  5 9月  12 22:02 1631453983368-47
-rw-rw-r--. 1 Tom Tom  0 9月  12 22:03 1631453983368-48
1234

1.4.2 负载均衡和故障转移

1. 案例需求
使用 Flume1 监控一个端口,其 sink 组中的 sink 分别对接 Flume2 和 Flume3 ,采用 FailoverSinkProcessor ,实现故障转移的功能。

2. 需求分析


3. 实现步骤
(1)准备工作
/opt/module/flume-1.7.0/job目录下创建 group2 文件夹

[Tom@hadoop102 job]$ cd group2/
1

(2)创建 flume-netcat-flume.conf
配置1 个 netcat source 和1 个channel、1 个sink group(2 个sink),分别输送给 flume-flume-console1 和 flume-flume-console2。
编辑配置文件

[Tom@hadoop102 group2]$ vim flume-netcat-flume.conf
1

添加如下内容

#name
a1.sources = r1
a1.channels = c1
a1.sinks = k1 k2
a1.sinkgroups = g1

#Source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

#Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

#Sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4141

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4142

#SinkGroup
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000

#Bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
123456789101112131415161718192021222324252627282930313233343536

(3)创建 flume-flume-console1 .conf
配置上级 Flume 输出的 Source,输出是到本地控制台。
编辑配置文件

[Tom@hadoop102 group2]$ vim flume-flume-console1.conf
1

添加如下内容

#name
a2.sources = r1
a2.channels = c1
a2.sinks = k1

#Source
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 4141

#Channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

#Sink
a2.sinks.k1.type = logger

#Bind
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
123456789101112131415161718192021

(4)创建 flume-flume-console2 .conf
配置上级 Flume 输出的 Source,输出是到本地控制台。
编辑配置文件

[Tom@hadoop102 group2]$ vim flume-flume-console2.conf
1

添加如下内容

#name
a3.sources = r1
a3.channels = c1
a3.sinks = k1

#Source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop102
a3.sources.r1.port = 4142

#Channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100

#Sink
a3.sinks.k1.type = logger

#Bind
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
123456789101112131415161718192021

(5)执行配置文件
分别开启对应配置文件:flume-flume-console2,flume-flume-console1,flume-netcat-flume 。

[Tom@hadoop102 flume-1.7.0]$ bin/flume-ng agent -c conf/ -n a3 -f job/group2/flume-flume-console2.conf -Dflume.root.logger=INFO,console

[huxili@hadoop102 flume-1.7.0]$ bin/flume-ng agent -c conf/ -n a2 -f job/group2/flume-flume-consosole1.conf -Dflume.root.logger=INFO,console

[huxili@hadoop102 flume-1.7.0]$ bin/flume-ng agent -c conf/ -n a1 -f job/group2/flume-netcat-flume.conf
12345

(6)使用 netcat 工具向本机的 44444 端口发送内容

[Tom@hadoop102 flume-1.7.0]$ nc localhost 44444
1

(7)查看flume-flume-console2 及 flume-flume-console1 的控制台打印日志

(8)将 flume-flume-console2 kill ,观察 flume-flume-console1 的控制台打印情况。

使用jps -ml查看 Flume 进程

[Tom@hadoop102 job]$ jps -ml
5696 org.apache.flume.node.Application -n a3 -f job/group2/flume-flume-console2.conf
5430 org.apache.flume.node.Application -n a1 -f job/group2/flume-netcat-flume.conf
5275 org.apache.flume.node.Application -n a2 -f job/group2/flume-flume-console1.conf
3581 org.apache.hadoop.hdfs.server.datanode.DataNode
5821 sun.tools.jps.Jps -ml
3438 org.apache.hadoop.hdfs.server.namenode.NameNode
1234567

1.4.3 聚合

1. 案例需求
hadoop102 上的 Flume1 监控文件
opt/module/data/group.log
hadoop103 上的 Flume2 监控某一个端口的数据流,
Flume1 与 Flume2 将数据发送给 hadoop104 上的 Flume3,Flume3 将最终数据打印到控制台。

2. 需求分析


3. 实现步骤
(1)准备工作
分发 Flume

[Tom@hadoop102 module]$ xsync flume
1

在hadoop102、hadoop103 以及hadoop104 的/opt/module/flume-1.7.0/job目录下创建一个group3 文件夹。

[Tom@hadoop102 job]$ mkdir group3
[Tom@hadoop103 job]$ mkdir group3
[Tom@hadoop104 job]$ mkdir group3
123

(2)创建 flume1-logger-flume.conf
配置 Source 用于监控 hive.log 文件,配置 Sink 输出数据到下一级 Flume。
在 hadoop102 上编辑配置文件

[Tom@hadoop102 group3]$ vim flume1-logger-flume.conf
1

添加如下内容

#name
a2.sources = r1
a2.channels = c1
a2.sinks = k1

#Source
a2.sources.r1.type = TAILDIR
a2.sources.r1.filegroups = f1
a2.sources.r1.filegroups.f1 = /opt/module/flume-1.7.0/data/flume.log
a2.sources.r1.positionFile = /opt/module/flume-1.7.0/position/position2.json

#Channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

#Sink
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = hadoop104
a2.sinks.k1.port = 4141

#Bind
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
123456789101112131415161718192021222324

(3)创建 flume2-netcat-flume.conf
配置 Source 监控端口 44444 数据流,配置 Sink 数据到下一级 Flume:在 hadoop103 上编辑配置文件

[Tom@hadoop103 group3 ]$ vim flume2-netcat-flume.conf
1

添加如下内容

#name
a3.sources = r1
a3.channels = c1
a3.sinks = k1

#Source
a3.sources.r1.type = netcat
a3.sources.r1.bind = localhost
a3.sources.r1.port = 44444

#Channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100

#Sink
a3.sinks.k1.type = avro
a3.sinks.k1.hostname = hadoop104
a3.sinks.k1.port = 4142

#Bind
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
1234567891011121314151617181920212223

(4)创建 flume3-flume-logger.conf
配置 source 用于接收 flume1 与 flume2 发送过来的数据流,最终合并后 sink 到控制台。
在 hadoop104 上编辑配置文件

[Tom@hadoop104 group3 ]$ touch flume3-flume-logger.conf
[Tom@hadoop104 group3 ]$ vim flume3-flume-logger.conf
12

添加如下内容

#name
a4.sources = r1 r2
a4.channels = c1
a4.sinks = k1

#Source
a4.sources.r1.type = avro
a4.sources.r1.bind = hadoop104
a4.sources.r1.port = 4141

a4.sources.r2.type = avro
a4.sources.r2.bind = hadoop104
a4.sources.r2.port = 4142

#Channel
a4.channels.c1.type = memory
a4.channels.c1.capacity = 1000
a4.channels.c1.transactionCapacity = 100

#Sink
a4.sinks.k1.type = logger

#Bind
a4.sources.r1.channels = c1
a4.sources.r2.channels = c1
a4.sinks.k1.channel = c1
1234567891011121314151617181920212223242526

(5) 执行配置文件
分别开启对应配置文件:flume3-flume-logger.conf,flume2-netcat-flume.conf,flume1-logger-flume.conf 。

[Tom@hadoop104 flume-1.7.0]$ bin/flume-ng agent -c conf/ -n a4 -f job/group4/flume3-flume-logger.conf -Dflume.root.logger=INFO,console

[Tom@hadoop103 flume-1.7.0]$ bin/flume-ng agent -c conf/ -n a3 -f job/group4/flume2-netcat-flume.conf

[huxili@hadoop102 flume-1.7.0]$ bin/flume-ng agent -c conf/ -n a2 -f job/group4/flume1-logger-flume.conf  
12345

(6)在 hadoop102 上向 /opt/module/flume-1.7.0/data/目录下的 group .log 追加内容

[Tom@hadoop102 data]$ echo "hello" >> flume.log 
1

(7)在 hadoop103 上向 44444 端口发送数据

[Tom@hadoop103 flume-1.7.0]$ nc localhost 44444
hust
OK
123

(8)检查 hadoop104 上数据

1.5 自定义 Interceptor

1. 案例需求
使用 Flume 采集服务器本地日志,需要按照日志类型的不同,将不同种类的日志发往不同的分析系统。

2. 需求分析
在实际的开发中,一台服务器产生的日志类型可能有很多种,不同类型的日志可能需要发送到不同的分析系统。此时会用到 Flume 拓扑结构中的 Multiplexing 结构, Multiplexing 的原理是,根据 event 中 Header 的某个 key 的值,将不同的 event 发送到不同的 Channel 中,所以我们需要自定义一个Interceptor,为不同类型的event 的Header 中的key 赋予不同的值。
在该案例中,我们以端口数据模拟日志,以数字(单个)和字母(单个)模拟不同类型的日志,我们需要自定义 interceptor 区分数字和字母,将其分别发往不同的分析系统(Channel)(实际测试时,我们测试字符串是否包含’‘hello’’)。


3. 实现步骤
(1)创建一个 maven 项目,并引入以下依赖。


        
            org.apache.flume
            flume-ng-core
            1.7.0
        

1234567

(2)定义 CustomInterceptor 类并实现 Interceptor 接口。

package com.Tom.interceptor;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class TypeInterceptor implements Interceptor {
    // 声明一个存放事件的集合
    private List addHeaderEvents;

    @Override
    public void initialize() {
        // 初始化
        addHeaderEvents = new ArrayList();
    }

    @Override
    // 单个事件拦截
    public Event intercept(Event event) {
        // 1. 获取事件的头信息
        Map headers = event.getHeaders();

        // 2. 获取事件中的body信息
        String body = new String(event.getBody());

        // 3. 根据body中是否有"hello"来决定添加怎样的头信息
        if(body.contains("hello")){
            // 4. 添加头信息
            headers.put("topic", "first");
        } else {
            headers.put("topic", "second");
        }
        return event;
    }

    @Override
    // 批量事件拦截
    public List intercept(List events) {
        // 1. 清空集合
        addHeaderEvents.clear();
        // 2. 遍历events
        for (Event event : events){
            // 3. 给每一个事件添加头信息
            addHeaderEvents.add(intercept(event));
        }
        // 4. 返回结果
        return addHeaderEvents;
    }

    @Override
    public void close() {

    }

    public static class Builder implements Interceptor.Builder{
        @Override
        public Interceptor build() {
            return new TypeInterceptor();
        }

        @Override
        public void configure(Context context) {

        }
    }
}
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970

(3)编辑 flume 配置文件
为 hadoop102 上的 Flume1 配置 1 个 netcat source,1 个 sink group(2 个 avro sink),并配置相应的 ChannelSelector 和 interceptor 。

#Name
a2.sources = r1
a2.channels = c1 c2
a2.sinks = k1 k2

#Source
a2.sources.r1.type = netcat
a2.sources.r1.bind = localhost
a2.sources.r1.port = 44444

#Interceptor
a2.sources.r1.interceptors = i1
a2.sources.r1.interceptors.i1.type = com.Tom.interceptor.TypeInterceptor$Builder

#Channel Selector
a2.sources.r1.selector.type = multiplexing
a2.sources.r1.selector.header = topic
a2.sources.r1.selector.mapping.first = c1
a2.sources.r1.selector.mapping.second = c2

#Channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100

#Sink
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = hadoop103
a2.sinks.k1.port = 4141

a2.sinks.k2.type = avro
a2.sinks.k2.hostname = hadoop104
a2.sinks.k2.port = 4142

#Bind
a2.sources.r1.channels = c1 c2
a2.sinks.k1.channel = c1
a2.sinks.k2.channel = c2
123456789101112131415161718192021222324252627282930313233343536373839404142

为 hadoop103 上的 Flume 2 配置一个 avro source 和一个 logger sink 。

#Name
a3.sources = r1
a3.sinks = k1
a3.channels = c1

#Source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop103
a3.sources.r1.port = 4141

#Sink
a3.sinks.k1.type = logger

#Channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100

#Bind
a3.sinks.k1.channel = c1
a3.sources.r1.channels = c1
123456789101112131415161718192021

为hadoop104 上的 Flume3 配置一个 avro source 和一个 logger sink 。

#Name
a4.sources = r1
a4.sinks = k1
a4.channels = c1

#Source
a4.sources.r1.type = avro
a4.sources.r1.bind = hadoop104
a4.sources.r1.port = 4142

#Sink
a4.sinks.k1.type = logger
a4.channels.c1.type = memory
a4.channels.c1.capacity = 1000
a4.channels.c1.transactionCapacity = 100

# Channel
a4.sinks.k1.channel = c1
a4.sources.r1.channels = c1
12345678910111213141516171819

(4)分别在 hadoop103,hadoop104,hadoop102 上启动 flume 进程 (注意启动顺序)。

[Tom@hadoop103 flume-1.7.0]$ bin/flume-ng agent -c conf/ -n a3 -f job/interceptor/flume3.conf -Dflume.root.logger=INFO,console

[Tom@hadoop104 flume-1.7.0]$ bin/flume-ng agent -c conf/ -n a4 -f job/interceptor/flume4.conf -Dflume.root.logger=INFO,console

[Tom@hadoop102 flume-1.7.0]$ bin/flume-ng agent -c conf/ -n a2 -f job/interceptor/flume2.conf 
12345

(5)在 hadoop102 使用 netcat 向 localhost:44444 发送字符串。

[Tom@hadoop102 flume-1.7.0]$ nc localhost 44444
helloworld
OK
world   
OK
thanks
OK
hello hust
OK
123456789

(6)观察 hadoop103 和 hadoop104 打印的日志


1.6 自定义 Source

1. 介绍
Source 是负责接收数据到 Flume Agent 的 组件。 Source 组件可以处理各种类型、各种格式的日志数据 包括 avro 、 thrift 、 exec 、 jms 、 spooling directory 、 netcat 、 sequence、generator 、 syslog 、 http 、 legacy 。官方提供的 source 类型已经很多,但是有时候并不能满足实际开发当中的需求,此时我们就需要根据实际需求自定义某些 source 。
官方也提供了自定义 source 的接口:
https://flume.apache.org/FlumeDeveloperGuide.html#source
根据官方说明自定义 MySource 需要继承 AbstractSource 类并实现 Configurable 和 PollableSource 接口。
实现相应方法:
getBackOffSleepIncrement() //暂不用
getMaxBackOffSleepInterval() //暂不用
configure(Context context) //初始化 context (读取配置文件内容)
process() //获取数据封装成 event 并写入 channel ,这个方法将被循环调用。
使用场景:读取 MySQL 数据或者其他文件系统。

2. 需求
使用 flume 接收数据,并给每条数据添加前缀,输出到控制台。前缀可从 flume 配置文件中配置。


3. 分析


4. 编码
导入 pom 依赖


        
            org.apache.flume
            flume-ng-core
            1.7.0
        
    
1234567

编写代码

package source;

import org.apache.flume.Context;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;

public class MySource extends AbstractSource implements Configurable, PollableSource {
    // 定义全局的前缀和后缀
    private String prefix;
    private String subfix;

    /**
     * 1. 接受数据(for循环造数据)
     * 2. 封装为事件
     * 3. 将事件传给channel
     */
    @Override
    public Status process() throws EventDeliveryException {
        Status status = null;
        // 1. 接收数据
        try {
            for (int i = 0; i < 5; ++i){
                // 2. 构建事件对象
                SimpleEvent evevt = new SimpleEvent();

                //3. 给事件设置值
                evevt.setBody((prefix + "--" + i + "--" + subfix).getBytes());

                //4. 将事件传给channel
                getChannelProcessor().processEvent(evevt);

                status = Status.READY;
            }
        } catch (Exception e) {
            e.printStackTrace();
            status = Status.BACKOFF;
        }

        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // 返回结果
        return status;
    }

    @Override
    public long getBackOffSleepIncrement() {
        return 0;
    }

    @Override
    public long getMaxBackOffSleepInterval() {
        return 0;
    }

    @Override
    public void configure(Context context) {
        // 读取配置文件, 给前后缀赋值
        prefix = context.getString("prefix");
        subfix = context.getString("subfix", "Tom");

    }
}
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869

5. 测试
(1)打包。将写好的代码打包,并放到 flume 的 lib 目录(opt/module/flume)下。

(2)配置文件

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = source.MySource
a1.sources.r1.prefix = online
a1.sources.r1.subfix = offline

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
123456789101112131415161718192021

(3)开启任务

[Tom@hadoop102 flume-1.7.0]$ bin/flume-ng agent -c conf/ -f job/mysource.conf -n a1 -Dflume.root.logger=INFO,console
1

(4)结果展示

1.7 自定义 Sink

1. 介绍
Sink 不断地轮询 Channel 中的事件且批量地移除它们,并将这些事件批量写入到存储或索引系统、或者被发送到另一个 Flume Agent 。
Sink 是完全事务性的。在从 Channel 批量删除数据之前,每个 Sink 用 Channel 启动一个事务。批量事件一旦成功写出到存储系统或下一个 Flume Agent,Sink 就利用 Channel 提交事务。
事务一旦被提交,该 Channel 从自己的内部缓冲区删除事件。
Sink 组件目的地包括hdfs、logger、avro、thrift、ipc、file、null、HBase、solr、自定义。官方提供的Sink 类型已经很多,但是有时候并不能满足实际开发当中的需求,此时我们就需要根据实际需求自定义某些Sink。
官方也提供了自定义sink 的接口:
https://flume.apache.org/FlumeDeveloperGuide.html#sink
根据官方说明自定义 MySink 需要继承 AbstractSink 类并实现 Configurable 接口。
实现相应方法:
configure(Context context) //初始化context(读取配置文件内容)
process() //从 Channel 读取获取数据(event),这个方法将被循环调用。
使用场景:读取 Channel 数据写入 MySQL 或者其他文件系统。

2. 需求
使用 flume 接收数据,并在 Sink 端给每条数据添加前缀和后缀,输出到控制台。前后缀可在 flume 任务配置文件中配置。
流程分析:


3. 编码

package sink;

import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MySink extends AbstractSink implements Configurable {
    // 获取Logger对象
    private Logger logger = LoggerFactory.getLogger(MySink.class);

    // 定义两个属性,前后缀
    private String prefix;
    private String subfix;

    /**
     * 1 获取Channel
     * 2 从Channel获取事务及数据
     * 3 发送数据
     */
    @Override
    public Status process() throws EventDeliveryException {
        // 1 定义返回值
        Status status = null;

        // 2 获取Channel
        Channel channel = getChannel();

        // 3 从Channel获取事务
        Transaction transaction = channel.getTransaction();

        // 4 开启事务
        transaction.begin();

        try {
            // 5 从Channel获取数据
            Event event = channel.take();

            // 6 处理事件
            if (event != null){
                String body = new String(event.getBody());
                logger.info(prefix + body  + subfix);
                // logger.error(prefix + body  + subfix);
            }

            // 7 提交事务
            transaction.commit();

            // 8 成功提交, 修改状态信息
            status = Status.READY;
        } catch (ChannelException e) {
            e.printStackTrace();

            // 9 提交事务失败
            transaction.rollback();

            // 10 修改状态
            status = Status.BACKOFF;

        } finally {
            // 11 最终, 关闭事务
            transaction.close();
        }

        // 12 返回状态信息
        return status;
    }

    @Override
    public void configure(Context context) {

        // 读取配置文件, 为前后缀赋值
        prefix = context.getString("prefix");
        subfix = context.getString("subfix", "Tom");
    }
}
1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677

4. 测试
(1)打包。将写好的代码打包,并放到 flume 的 lib 目录(/opt/module/flume)下。
(2)配置文件

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = sink.MySink
a1.sinks.k1.prefix = online--
a1.sinks.k1.subfix = --offline

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
1234567891011121314151617181920212223

(3)开启任务

[Tom@hadoop102 flume-1.7.0]$ bin/flume-ng agent -c conf/ -f job/mysink.conf -n a1 -Dflume.root.logger=INFO,console

[Tom@hadoop102 flume-1.7.0]$ nc localhost 44444
hello
OK
HUST
OK
1234567

(4)结果展示

2 常见问题

2.1 你是如何实现 Flume 数据传输的监控的?

使用第三方框架 Ganglia 实时监控Flume。

2.2 Flume 的 Source,Sink,Channel 的作用?你们 Source 是什么类型?

1. 作用
(1)Source 组件是专门用来收集数据的,可以处理各种类型、各种格式的日志数据,包括avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy
(2)Channel 组件对采集到的数据进行缓存,可以存放在 Memory 或File 中。
(3)Sink 组件是用于把数据发送到目的地的组件,目的地包括 HDFS、Logger、avro、thrift、ipc、file、Hbase、solr、自定义。

2. 我公司采用的Source 类型为
(1)监控后台日志:exec
(2)监控后台产生日志的端口:netcat、exec、spooldir

2.3 Flume 的 Channel Selectors


Channel Selectors,可以让不同的项目日志通过不同的 Channel 到不同的 Sink 中去。
官方文档上 Channel Selectors 有两种类型:Replicating Channel Selector (default)和 Multiplexing Channel Selector
这两种Selector的区别是:Replicating 会将source过来的events发往所有channel,而Multiplexing可以选择该发往哪些Channel。

2.4 Flume 参数调优

(1)Source
增加 Source 个数(使用 Tair Dir Source 时可增加 FileGroups 个数)可以增大 Source 的读取数据的能力。例如:当某一个目录产生的文件过多时需要将这个文件目录拆分成多个文件目录,同时配置好多个 Source 以保证 Source 有足够的能力获取到新产生的数据。
batchSize 参数决定 Source 一次批量运输到 Channel 的 event 条数,适当调大这个参数可以提高Source 搬运 Event 到 Channel 时的性能。
(2)Channel
type 选择 memory 时 Channel 的性能最好,但是如果 Flume 进程意外挂掉可能会丢失数据。 type 选择 file 时 Channel 的容错性更好,但是性能上会比 memory channel 差。
使用 file Channel 时 dataDirs 配置多个不同盘下的目录可以提高性能。
Capacity 参数决定 Channel 可容纳最大的 event 条数。 transactionCapacity 参数决定每
次 Source 往 channel 里面写的最大 event 条数和每次 Sink 从 channel 里面读的最大 event
条数。
transactionCapacity 需要大于 Source 和 Sink 的 batchSize 参数。
(3)Sink
增加 Sink 的个数可以增加 Sink 消费 event 的能力。 Sink 也不是越多越好够用就行,过
多的 Sink 会占用系统资源,造成系统资源不必要的浪费。
batchSize 参数决定 Sink 一次批量从 Channel 读取的 event 条数,适当调大这个参数可
以提高 Sink 从 Channel 搬出 event 的性能。

2.5 Flume 的事务机制

Flume的事务机制(类似数据库的事务机制): Flume 使用两个独立的事务分别负责从 Soucrce 到 Channel ,以及从 Channel 到 Sink 的事件传递。比如 spooling directory source 为文件的每一行创建一个事件,一旦事务中所有的事件全部传递到 Channel 且提交成功,那么 Soucrce 就将该文件标记为完成。同理,事务以类似的方式处理从 Channel 到 Sink 的传递过程,如果因为某种原因使得事件无法记录,那么事务将会回滚。且所有的事件都会保持到 Channel 中,等待重新传递。

2.6 Flume 采集数据会丢失吗?

根据 Flume 的架构原理, Flume 是不可能丢失数据的,其内部有完善的事务机制,Source 到 Channel 是事务性的, Channel 到 Sink 是事务性的,因此这两个环节不会出现数据的丢失,唯一可能丢失数据的情况是 Channel 采用 memory Channel agent 宕机导致数据丢失,或者 Channel 存储数据已满,导致 Source 不再写入,未写入的数据丢失。
Flume 不会丢失数据,但是有可能造成数据的重复,例如数据已经成功由 Sink 发出,但是没有接收到响应, Sink 会再次发送数据,此时可能会导致数据的重复 。

页面更新:2024-04-29

标签:数据   控制台   端口   文件夹   需求   事务   事件   文件   内容   技术   日志

1 2 3 4 5

上滑加载更多 ↓
更多:

本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828  

© CopyRight 2008-2024 All Rights Reserved. Powered By bs178.com 闽ICP备11008920号-3
闽公网安备35020302034844号

Top