分类 - Flume

? flume ?    2016-05-24 22:04:31    1027    0    0

    这个agent实现读取nginx访问日志,并将日志sink到服务器端的3个agent(Avro Source)。我们发现Flume Avro Sink的配置只能配置一个hostname和port,就是说我们只能通过简单配置将日志数据发送到服务器端的一个flume agent。那么event数据流在传输时就存在一个单点故障风险,即服务器端的flume agent无法联通的情况下,这个数据流通路就断了。

    接下来我们来看一下,在使用Avro sink-Avro source时,如何实现容错?

nginx-log.sources=tailCmd
nginx-log.sinks=sinkAvro1 sinkAvro2 sinkAvro3
nginx-log.channels=ch1
 
nginx-log.sources.tailCmd.type=exec
nginx-log.sources.tailCmd.command=tail -F /usr/local/nginx/logs/host.access.log
#下一次重启前,等待的时间
nginx-log.sources.tailCmd.restartThrottle=1000
#如果执行命令死掉了,是否需要重启命令
nginx-log.sources.tailCmd.restart=true
#标准输出是否被记录
nginx-log.sources.tailCmd.logStdErr=false
#一次性向channel发送20行
nginx-log.sources.tailCmd.batchSize=20
#没有达到buffer size时,等待3秒后,将数据推送走
nginx-log.sources.tailCmd.batchTimeout=3000
nginx-log.sources.tailCmd.channels=ch1
nginx-log.sources.tailCmd.interceptors = i1
nginx-log.sources.tailCmd.interceptors.i1.type = com.wanda.flume.interceptor.MyAgentInterceptor$Builder
nginx-log.sources.tailCmd.interceptors.i1.s
? flume ?    2016-05-24 22:04:31    1548    0    0

我们使用一个Avro Source,通过3个replicating channel,分别sink到Kafka、HDFS和HBase。Agent配置如下:

tier1.sources = avroSrc
tier1.channels = ch1 ch2 ch3
tier1.sinks = sKafka sHbase sHdfs
 
tier1.sources.avroSrc.type = avro
tier1.sources.avroSrc.bind = 0.0.0.0
tier1.sources.avroSrc.port = 44444
tier1.sources.avroSrc.channels = ch1 ch2 ch3
 
#产生的最大工作线程数
#tier1.sources.avroSrc.threads
 
#压缩类型,none或者deflate
#tier1.sources.avroSrc.compression-type
 
#设置成true表示使用ssl协议加密。同时必须指定“keystore” 和 “keystore-password”
#tier1.sources.avroSrc.ssl=false
 
#Java keystore文件的路径
#tier1.sources.avroSrc.keystore
 
#Java keystore的密码
#tier1.sources.avroSrc.keystore-password
 
#Java keystore的类型。JKS或者PKCS12
#tier1.sources.avroSrc.keystore-type=JKS
 
#不包含的协议,用空格分隔
#tier1.sources.avroSrc.exclude-protocols=SSLv3
 
#设置成true,表示对netty开启IP地址过滤
#tier1.sources.avroSrc.ipFilter=false
 
#定义IP地址过滤规则
#tier1.sources.avroSrc.ipFilterRules
 
#给source增加一个自定义拦截器
tier1.sources.avroSrc.interceptors = myinterceptor
#自定义拦截器,在flume-ext-1.0-SNAPSHOT.jar中
tier1.
? Flume ?    2016-05-24 22:04:31    2221    0    0

Flume Sink Processors

    Sink groups允许用户在一个代理中对多个sink进行分组。Sink processor能够实现分组内的sink负载均衡。以及组内sink容错,实现当组内一个sink失败时,切换至其他的sink。

Property NameDefaultDescription
sinksSpace-separated list of sinks that are participating in the group
processor.typedefaultThe component type name, needs to bedefaultfailover or load_balance

示例:

a1.sinkgroups=g1
a1.sinkgroups.g1.sinks=k1 k2
a1.sinkgroups.g1.processor.type=load_balance
a1.sinkgroups.g1.processor.backoff=true
a1.sinkgroups.g1.processor.selector=random

Default Sink Processor

    默认的sink processor仅接受单独一个sink。不必对单个sink使用processor。对单个sink可以使用source-channel-sink的方式。

Failorver Sink Processor

    Failover Sink Processor(容错处理器)拥有一个sink的优先级列表,用来保证只有一个sink可用。

    容错机制将失败的sink放入一个冷却池中,并给他设置一个冷却时间,如果重试中不断失败,冷却时间将不断增加。一旦sink成功的发送event,sink将被重新保存到一个可用sink池中。在这个可用sink池中,每一个sink都有一个关联优先级值,值越大优先级越高。当一个sink发送event失败时,剩下的sink中优先级最高的sink将试着发送event。例如:在选择发送event的sink时,优先级100的sink将优先于优先级80的sink。如果没有设置sink的优先级,那么优先级将按照设置的顺序从左至右,由高到低来决定。

    设置sink组的processor为failover,并且为每个独立的