ITPub博客

首页 > 大数据 > Hadoop > flume使用入门

flume使用入门

Hadoop 作者:532327593 时间:2014-02-22 16:15:51 0 删除 编辑
一. flume是什么
flume是apache的一个数据收集框架。定义了一个数据流的模型。


二. 设计核心思想
对日志收集的三个核心要素进行了抽象:
1. Source:日志从哪里产生
2. Sink:日志会被推送到什么地方
3. Channel:Source通过什么样的渠道,送到Sink
# 可以把Channel理解为一个queue,Source adds the events and Sink removes it.


三. flume的价值 1
得益于设计上的抽象,使得要素隔离得很清晰。使用上变得非常灵活。我们可以根据实际需要,对于这三个核心要素,选择合适的具体实现,完成我们工作场景下,日志收集的目的!

apache官方已经对这三个具体要素,提供了很多现成的实现:

Source:
     Avro Source:
     Thrift Source:
     #上面这两个很像,都是通过序列化的流,接收外部向它提交的数据,这两个都是apache自己实现的序列化。它们最常见的使用场景,都是用来构建log chain的,下面会进一步介绍。Avro是hadoop自己实现的序列化方案,官方比较推荐这个
     Exec Source:收集外部Linux命令产生的标准输出(标准错误默认被丢弃,但有必要的话,我们都会做重定向)。对外部命令本身有一定要求,例如tail -F这种持续产生输出的命令,就比较适合收集目的。
     NetCat Source:使用简单的nc命令,打开一个监听端口。可以接收从nc发送过来的数据。功能简单,主要用于调试。
     JMS Source:从消息队列获取数据。
    Syslog Source:直接从syslog(RFC 3164 - The BSD syslog Protocol, RFC 5424 - The Syslog Protocol)获取数据

Sink:
    HDFS Sink:推送到hdfs上,这个模块应该算是flume的主打模块了。
    Avro Sink/Thrift Sink:使用在级联的场景下,配合后端agent对应的Avro Source和Thrift Source构成log chain。后面会详细介绍。
    File Roll Sink:存放在外部文件系统中。
    HBase Sink:
    Logger Sink:debug用途  

Channels:
    Memory Channel:优点,高吞吐量。缺点:1. 较容易受到容量限制 2. 无法持久化,在flume进程意外结束后,丢失数据。
    JDBC Channels:使用内置的数据库Derby作为数据暂存区
    File Channels:使用外部的文件系统。可以设置文件、目录加密。不过速度很慢。。


四. flume的配置
为了方便后续的讲解,先打断插入介绍一下flume的配置。
整体而言,flume的配置简单到令人发指的程度

全局设定

# 我们已经知道了,一个flume agent,主要就是由Source,Channel,Sink三部分构成的。所以先定义它们。 .sources = .sinks = .channels = # 指定某个Source收集的数据,送到哪些下游的Channel .sources..channels = ...
# 指定某个Sink,从哪个Channel获取数据 .sinks..channel =

注意:一个Source可以向多个Channels发送数据。但一个Sink只能指定一个Channel。

各个模块设置
# Source的设置
.sources.. = 

# Channel的设置
.channel.. = 

# Sink的设置
.sources.. = 

# Interceptor的设置
.sources.. = 


五. flume的价值2:
   
     得益于flume模块化的设计,使得flume可以互相组合嵌套,从而产生出很多种灵活的使用方式!

方式一:较简单的绑定

配置
frontend:
agent_foo.sources = avro-AppSrv-source
agent_foo.sinks = avro-forward-sink
agent_foo.channels = file-channel

agent_foo.sources.avro-AppSrv-source.channels = file-channel
agent_foo.sinks.avro-forward-sink.channel = file-channel

agent_foo.sinks.avro-forward-sink.type = avro
agent_foo.sinks.avro-forward-sink.hostname = 10.1.1.100
agent_foo.sinks.avro-forward-sink.port = 10000

backend:
agent_foo.sources = avro-collection-source
agent_foo.sinks = hdfs-sink
agent_foo.channels = mem-channel

agent_foo.sources.avro-collection-source.channels = mem-channel
agent_foo.sinks.hdfs-sink.channel = mem-channel

agent_foo.sources.avro-collection-source.type = avro
agent_foo.sources.avro-collection-source.bind = 10.1.1.100
agent_foo.sources.avro-collection-source.port = 10000


方式二:汇流 Consolidation

配置和方式一相似,只有概念上的差别。此处省去配置。


方式三:分流 Fan out
对于分流场景,有两种策略可以设置。缺省是replicating,这种模式比较简单,意为从一个source收集来的数据,无差别地向所有下游绑定的channels进行发送。
a1.sources = r1
a1.channels = c1 c2 c3
a1.source.r1.selector.type = replicating
a1.source.r1.channels = c1 c2 c3

还有一种模式叫multiplexing。可以根据接收到的数据(在flume设计实现中,抽象为一个event)中的head信息,进行选择发送。
a1.sources = r1
a1.channels = c1 c2 c3 c4
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.CZ = c1
a1.sources.r1.selector.mapping.US = c2 c3
a1.sources.r1.selector.default = c4


六. 拦截器(Source Interceptor)

Flume事件抽象——Event
body承载的就是需要被收集的日志数据。
而Interceptor的作用就是对header进行设定

Interceptor配置:

backendAgent.sources.thriftSource.interceptors = hostInterceptor timeStampInterceptor staticInterceptor
backendAgent.sources.thriftSource.interceptors.hostInterceptor.type = host
backendAgent.sources.thriftSource.interceptors.hostInterceptor.hostHeader = ip
backendAgent.sources.thriftSource.interceptors.timeStampInterceptor.type = timestamp
backendAgent.sources.thriftSource.interceptors.staticInterceptor.type = static
backendAgent.sources.thriftSource.interceptors.staticInterceptor.key = source
backendAgent.sources.thriftSource.interceptors.staticInterceptor.value = tc_flume




七. 高级设置

Sink Processor

通过引入sinkgroup的概念,来为多个sinks如何协同工作设置策略
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = default | failover | load_balance
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000

Event Serializer



八. 使用问题杂记:

0. protobuf
如果需要自己编译flume源码,先要自行安装protobuf包。
1. 关于依赖包
对于flume项目的依赖包组织,用一句话简 单总结——就是坨屎!flume自带的脚本(flume-ng),可以通过配置在HADOOP_HOME变量的设定,直接使用hadoop项目中的jar 包(特别是hdfs和hbase的)。这对于使用hdfs的工作场景中是有意义的。因为事实上,flume自己只带了flume-hdfs-sink一层 很薄的接口,真实的HDFS存取访问操作,是需要外部的hadoop项目包提供的。不过,真要这么做的话,会立即会碰到很多版本不一致的冲突。。
所以,我最后为了隔离性和依赖的可控性,还是在flume的启动脚本中(flume-ng)中,重设了hadoop环境变量:
HADOOP_HOME=""
然后手工复制所需要的依赖(以下我列举的依赖包,在使用hdfs sink的场景中是必须的)
hadoop-common
hadoop-hdfs
hadoop-auth
commons-configuration
另外,对于guava包,不同的模块需要版本号也会有差异,非常头痛。只能分离多份flume的拷贝,根据不同使用配置的agent,采用不同的包。

2. 在HDFS Sink下的复制因子
我 们知道,在HDFS配置文件中,可以通过dfs.replication变量,设置hdfs的复制因子。而flume作为客户端,也会对这个复制因子有个 3的初始预设值。如果服务器的真实配置的复制因子小于这个数目,就会引起flume rotate存放在hdfs上的文件。并在标准输出打出一行警告:
Block Under-replication detected. Rotating file.
这会使得hdfs sink的一些设定,比如根据文件大小,根据事件数量,根据时间间隔翻滚文件的设置完全无效化,最后的结果就是在hdfs留下一堆十几kb的小文件。这对于本身就是为大文件设计的hdfs而言,无异于一种扼杀!
所以,如果确实不能配置3个以上datanode的话,一个简单的作法是在flume的配置目录下,复制一份hdfs-site.xml文件,写上:


    
          dfs.replication
          1
    


3. bugs:
[flume-ng-core]
org.apache.flume.source.ThriftSource
278        logger.warn("Thrift source % could not append events to the " +
279          "channel.", getName());
应改成:
278        logger.warn("Thrift source {} could not append events to the " +
279          "channel.", getName());














<!-- 正文结束 -->

来自 “ ITPUB博客 ” ,链接:http://blog.itpub.net/23987042/viewspace-1114331/,如需转载,请注明出处,否则将追究法律责任。

上一篇: 没有了~
下一篇: 没有了~
请登录后发表评论 登录
全部评论

注册时间:2010-05-27