文章目录
1 Flume 概述1.1 Flume 定义1.2 Flume 基础架构1.2.1 Agent1.2.2 Source1.2.3 Sink1.2.4 Channel1.2.5 Event 2 Flume 快速入门2.1 Flume 安装部署2.1.1 安装地址2.1.2 安装部署 2.2 Flume 入门案例2.2.1 监控端口数据官方案例2.2.2 实时监控单个追加文件2.3.3 实时监控目录下多个新文件2.2.4 实时监控目录下的多个追加文件1 Flume 概述
1.1 Flume 定义
Flume 是Cloudera 提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统。Flume 基于流式架构,灵活简单。
Flume 最主要的作用就是,实时读取服务器本地磁盘的数据,将数据写入到 HDFS。
1.2 Flume 基础架构
1.2.1 Agent
Agent 是一个 JVM 进程,它以事件的形式将数据从源头送至目的。
Agent
主要由 3 个部分组成,Source
、Channel
、Sink
。
1.2.2 Source
Source 是负责接收数据到 Flume Agent 的组件。 Source 组件可以处理各种类型、各种格式的日志数据,包括avro
、thrift
、exec
、jms
、spooling directory
、netcat
、sequence generator
、syslog
、http
、legacy
。
1.2.3 Sink
Sink 不断地轮询 Channel 中的事件且批量地移除它们,并将这些事件批量写入到存储或索引系统、或者被发送到另一个 Flume Agent 。
Sink组件目的地包括hdfs
、logger
、avro
、thrift
、ipc
、file
、HBase
、solr
、自定义。
1.2.4 Channel
Channel 是位于 Source 和 Sink 之间的缓冲区。因此, Channel 允许 Source 和 Sink 运作在不同的速率上。 Channel 是线程安全的,可以同时处理几个 Source 的写入操作和几个Sink 的读取操作。
Flume 自带三种 Channel:Memory Channel
和File Channel
以及Kafka Channel
。
Memory Channel是内存中的队列。 Memory Channel 在不需要关心数据丢失的情景下适用。如果需要关心数据丢失,那么 Memory Channel 就不应该使用,因为程序死亡、机器宕
机或者重启都会导致数据丢失。
File Channel 将所有事件写到磁盘。因此在程序关闭或机器宕机的情况下不会丢失数据。
1.2.5 Event
传输单元,Flume 数据传输的基本单元,以 Event 的形式将数据从源头送至目的地。Event 由Header
和Body
两部分组成, Header 用来存放该 event 的一些属性,为 K-V 结构,Body 用来存放该条数据,形式为字节数组。
2 Flume 快速入门
2.1 Flume 安装部署
2.1.1 安装地址
(1)Flume 官网地址
/
(2)文档查看地址
/FlumeUserGuide.html
(3)下载地址
/dist/flume/
2.1.2 安装部署
(1)将 apache-flume-1.7.0-bin.tar.gz 上传到 linux 的/opt/software
目录下
(2)解压 apache-flume-1.7.0 bin.tar.gz 到/opt/module/
目录下
[Tom@hadoop102 software]$ tar -zxf apache-flume-1.7.0-bin.tar.gz -C /opt/module/
(3)修改 apache-flume-1.7.0-bin 的名称为 flume-1.7.0
[Tom@hadoop102 module]$ mv apache-flume-1.7.0-bin flume-1.7.0
(4)将 flume/conf 下的 flume-env.sh.template 文件修改为 flume-env.sh,并配置flume-env.sh
文件
[Tom@hadoop102 conf]$ mv flume-env.sh.template flume-env.sh[Tom@hadoop102 conf]$ vim flume-env.shexport JAVA_HOME=/opt/module/jdk1.8.0_212
2.2 Flume 入门案例
2.2.1 监控端口数据官方案例
1. 案例需求
使用 Flume 监听一个端口, 收集该端口数据 ,并打印到控制台。
2. 需求分析
3. 实现步骤
(1)安装 netcat 工具
[Tom@hadoop102 software]$ sudo yum install -y nc
(2)判断44444 端口是否被占用
[Tom@hadoop102 flume-1.7.0]$ sudo netstat -tunlp | grep 44444
(3)创建Flume Agent 配置文件flume-netcat-logger.conf
在 flume-1.7.0 目录下创建 job 文件夹并进入 job 文件夹。
[Tom@hadoop102 flume]$ mkdir job[Tom@hadoop102 flume]$ cd job/
在 job 文件夹下创建 Flume Agent 配置文件netcat-flume-logger.conf
。
[Tom@hadoop102 job]$ vim netcat-flume-logger.conf
在netcat-flume-logger.conf
文件中添加如下内容:
# Name the components on this agent a1:表示agent的名称a1.sources = r1 # r1:表示a1的Source的名称a1.sinks = k1 # k1:表示a1的Sink的名称a1.channels = c1 # c1: 表示a1的Channel的名称# Describe/configure the sourcea1.sources.r1.type = netcat # 表示a1的输入源类型为netcat端口类型a1.sources.r1.bind = localhost # 表示a1的监听的主机a1.sources.r1.port = 44444 # 表示a1的监听的端口号# Describe the sinka1.sinks.k1.type = logger # 表示a1的输出目的地是控制台logger类型# Use a channel which buffers events in memorya1.channels.c1.type = memory # 表示a1的channel类型是memory内存型a1.channels.c1.capacity = 1000 # 表示a1的channel总容量为1000个eventa1.channels.c1.transactionCapacity = 100 # 表示a1的channel传输时收集到了100条event以后再去提交事务# Bind the source and sink to the channela1.sources.r1.channels = c1 # 表示将r1和c1连接起来a1.sinks.k1.channel = c1 # 表示将k1和c1连接起来
注:配置文件来源于官方手册 /FlumeUserGuide.html
(4)先开启 flume 监听端口
第一种写法:
[Tom@hadoop102 flume-1.7.0]$ bin/flume-ng agent --conf conf/ --name a1 --conf-file job/netcat-flume-logger.conf -Dflume.root.logger=INFO,console
第二种写法
[Tom@hadoop102 flume-1.7.0]$ bin/flume-ng agent -c conf/ -n a1 -f job/netcat-flume-logger.conf -Dflume.root.logger=INFO,console
参数说明:
--conf/-c
:表示配置文件存储在conf/目录
--name/-n
:表示给agent 起名为a1
--conf-file/-f
:flume 本次启动读取的配置文件是在job 文件夹下的flume-telnet.conf文件。
-Dflume.root.logger=INFO,console
:-D 表示 flume 运行时动态修改flume.root.logger 参数属性值,并将控制台日志打印级别设置为INFO 级别。日志级别包括:log、info、warn、error。
(5)使用 netcat 工具向本机的 44444 端口发送内容
[Tom@hadoop102 job]$ nc localhost 44444helloOKHUSTOK
(6)在Flume 监听页面观察接收数据情况
2.2.2 实时监控单个追加文件
1. 案例需求:实时监控 Hive 日志,并上传到 HDFS 中
2. 需求分析
3. 实现步骤
(1)Flume 要想将数据输出到HDFS,须持有Hadoop 相关jar 包。将以下 jar 包
拷贝到opt/module/flume-1.7.0/lib
文件夹下。
(2)创建file-flume-hdfs.conf
文件
[Tom@hadoop102 job]$ vim file-flume-hdfs.conf
注:要想读取 Linux 系统中的文件,就得按照 Linux 命令的规则执行命令。由于Hive 日志在 Linux 系统中,所以读取文件的类型选择:exec 即 execute 执行的意思。表示执行Linux 命令来读取文件。
添加如下内容
# Name the components on this agenta2.sources = r2a2.sinks = k2a2.channels = c2# Describe/configure the sourcea2.sources.r2.type = execa2.mand = tail -F /opt/module/hive-3.1.2/logs/hive.log# Describe the sinka2.sinks.k2.type = hdfsa2.sinks.k2.hdfs.path = hdfs://hadoop102:8020/flume/%Y%m%d/%H#上传文件的前缀a2.sinks.k2.hdfs.filePrefix = logs-#是否按照时间滚动文件夹a2.sinks.k2.hdfs.round = true#多少时间单位创建一个新的文件夹a2.sinks.k2.hdfs.roundValue = 1#重新定义时间单位a2.sinks.k2.hdfs.roundUnit = hour#是否使用本地时间戳a2.sinks.k2.hdfs.useLocalTimeStamp = true#积攒多少个Event才flush到HDFS一次a2.sinks.k2.hdfs.batchSize = 1000#设置文件类型,可支持压缩a2.sinks.k2.hdfs.fileType = DataStream#多久生成一个新的文件a2.sinks.k2.hdfs.rollInterval = 30#设置每个文件的滚动大小a2.sinks.k2.hdfs.rollSize = 134217700#文件的滚动与Event数量无关a2.sinks.k2.hdfs.rollCount = 0# Use a channel which buffers events in memorya2.channels.c2.type = memorya2.channels.c2.capacity = 1000a2.channels.c2.transactionCapacity = 100# Bind the source and sink to the channela2.sources.r2.channels = c2a2.sinks.k2.channel = c2
注意:对于所有与时间相关的转义序列,Event Header 中必须存在以’ timestamp’的 key (除非hdfs.useLocalTimeStamp 设置为 true ,此方法会使用 TimestampInterceptor 自动添加 timestamp)。a3.sinks.k3.hdfs.useLocalTimeStamp = true
。
(3)运行 Flume
[Tom@hadoop102 flume-1.7.0]$ bin/flume-ng agent -c conf/ -n a2 -f job/file-flume-hdfs.conf
(4)开启 Hadoop 和 Hive 并操作 Hive 产生日志
[Tom@hadoop102 hadoop-3.1.3]$ sbin/start-dfs.sh[Tom@hadoop102 hadoop-3.1.3]$ sbin/start-yarn.sh[Tom@hadoop102 hive-3.1.2]$ bin/hive
(5)在HDFS上查看文件
2.3.3 实时监控目录下多个新文件
1. 案例需求:使用 Flume 监听整个目录的文件,并上传至HDFS
2. 需求分析
3. 实现步骤
(1)创建配置文件dir-flume-hdfs.conf
创建一个文件
[Tom@hadoop102 job]$ vim dir-flume-hdfs.conf
添加如下内容
# Name the components on this agenta2.sources = r2a2.sinks = k2a2.channels = c2# Describe/configure the sourcea2.sources.r2.type = spooldira2.sources.r2.spoolDir = /opt/module/flume-1.7.0/upload#忽略所有以.tmp结尾的文件,不上传a2.sources.r2.ignorePattern = ([^ ]*\.tmp)# Describe the sinka2.sinks.k2.type = hdfsa2.sinks.k2.hdfs.path = hdfs://hadoop102:8020/flume/%Y%m%d/%H#上传文件的前缀a2.sinks.k2.hdfs.filePrefix = upload-#是否按照时间滚动文件夹a2.sinks.k2.hdfs.round = true#多少时间单位创建一个新的文件夹a2.sinks.k2.hdfs.roundValue = 1#重新定义时间单位a2.sinks.k2.hdfs.roundUnit = hour#是否使用本地时间戳a2.sinks.k2.hdfs.useLocalTimeStamp = true#积攒多少个Event才flush到HDFS一次a2.sinks.k2.hdfs.batchSize = 1000#设置文件类型,可支持压缩a2.sinks.k2.hdfs.fileType = DataStream#多久生成一个新的文件a2.sinks.k2.hdfs.rollInterval = 30#设置每个文件的滚动大小a2.sinks.k2.hdfs.rollSize = 134217700#文件的滚动与Event数量无关a2.sinks.k2.hdfs.rollCount = 0# Use a channel which buffers events in memorya2.channels.c2.type = memorya2.channels.c2.capacity = 1000a2.channels.c2.transactionCapacity = 100# Bind the source and sink to the channela2.sources.r2.channels = c2a2.sinks.k2.channel = c2
(2)启动监控文件夹命令
[Tom@hadoop102 flume-1.7.0]$ bin/flume-ng agent -c conf/ -n a2 -f job/dir-flume-hdfs.conf
说明:在使用 Spooling Directory Source 时不要在监控目录中创建并持续修改文件,上传完成的文件会以.COMPLETED
结尾,被监控文件夹每 500 毫秒扫描一次文件变动。
(3)向upload 文件夹中添加文件
在/opt/module/flume-1.7.0
目录下创建 upload 文件夹
[Tom@hadoop102 flume]$ mkdir upload
向 upload 文件夹中添加文件
[huxili@hadoop102 upload]$ touch hust.txt[huxili@hadoop102 upload]$ touch hust.tmp[huxili@hadoop102 upload]$ touch hust.log
(4) 查看 HDFS 上的数据
(5)等待 1s,再次查询 upload 文件夹
[Tom@hadoop102 upload]$ ll-rw-rw-r--. 1 Tom Tom 0 9月 11 20:38 PLETED-rw-rw-r--. 1 Tom Tom 0 9月 11 20:38 hust.tmp-rw-rw-r--. 1 Tom Tom 0 9月 11 20:38 PLETED
2.2.4 实时监控目录下的多个追加文件
Exec source
适用于监控一个实时追加的文件,但不能保证数据不丢失;Spooldir Source
能够保证数据不丢失,且能够实现断点续传,但延迟较高,不能实时监控;而Taildir Source
既能够实现断点续传,又可以保证数据不丢失,还能够进行实时监控。
1. 案例需求:使用 Flume 监听整个目录的实时追加文件,并上传至HDFS。(在实际操作中我们直接打印到控制台,这样更直观)
2. 需求分析:
3. 实现步骤
(1)创建配置文件flume-taildir-hdfs.conf
a1.sources = r1a1.sinks = k1a1.channels = c1# Describe/configure the sourcea1.sources.r1.type = TAILDIRa1.sources.r1.filegroups = f1 f2a1.sources.r1.filegroups.f1 = /opt/module/flume-1.7.0/files/file1.txta1.sources.r1.filegroups.f2 = /opt/module/flume-1.7.0/files/file2.txta1.sources.r1.positionFile = /opt/module/flume-1.7.0/position/position.json# Describe the sinka1.sinks.k1.type = logger# Use a channel which buffers events in memorya1.channels.c1.type = memorya1.channels.c1.capacity = 1000a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channela1.sources.r1.channels = c1a1.sinks.k1.channel = c1
(2)启动监控文件夹命令
[Tom@hadoop102 flume-1.7.0]$ bin/flume-ng agent -c conf/ -n a1 -f job/flume-taildir-hdfs.conf
(3)向 files 文件夹中追加内容
[Tom@hadoop102 flume]$ mkdir files
向 upload 文件夹中添加文件
[Tom@hadoop102 files]$ echo hello >> file1.txt [Tom@hadoop102 files]$ echo hust >> file2.txt
(4)查看数据
Taildir 说明:
Taildir Source 维护了一个json 格式的 position File,其会定期的往 position File 中更新每个文件读取到的最新的位置,因此能够实现断点续传。Position File 的格式如下:
{"inode":2496272,"pos":12,"file":"/opt/module/flume/files/file1.txt"}{"inode":2496275,"pos":12,"file":"/opt/module/flume/files/file2.txt"}
注:Linux 中储存文件元数据的区域就叫做 inode,每个 inode 都有一个号码,操作系统用 inode 号码来识别不同的文件,Unix/Linux 系统内部不使用文件名,而使用 inode 号码来识别文件。
参考:
/video/BV184411B7kU?p=15