标签 - flume

? flume ?    2016-05-24 22:04:31    1030    0    0

    这个agent实现读取nginx访问日志,并将日志sink到服务器端的3个agent(Avro Source)。我们发现Flume Avro Sink的配置只能配置一个hostname和port,就是说我们只能通过简单配置将日志数据发送到服务器端的一个flume agent。那么event数据流在传输时就存在一个单点故障风险,即服务器端的flume agent无法联通的情况下,这个数据流通路就断了。

    接下来我们来看一下,在使用Avro sink-Avro source时,如何实现容错?

nginx-log.sources=tailCmd
nginx-log.sinks=sinkAvro1 sinkAvro2 sinkAvro3
nginx-log.channels=ch1
 
nginx-log.sources.tailCmd.type=exec
nginx-log.sources.tailCmd.command=tail -F /usr/local/nginx/logs/host.access.log
#下一次重启前,等待的时间
nginx-log.sources.tailCmd.restartThrottle=1000
#如果执行命令死掉了,是否需要重启命令
nginx-log.sources.tailCmd.restart=true
#标准输出是否被记录
nginx-log.sources.tailCmd.logStdErr=false
#一次性向channel发送20行
nginx-log.sources.tailCmd.batchSize=20
#没有达到buffer size时,等待3秒后,将数据推送走
nginx-log.sources.tailCmd.batchTimeout=3000
nginx-log.sources.tailCmd.channels=ch1
nginx-log.sources.tailCmd.interceptors = i1
nginx-log.sources.tailCmd.interceptors.i1.type = com.wanda.flume.interceptor.MyAgentInterceptor$Builder
nginx-log.sources.tailCmd.interceptors.i1.s
? flume ?    2016-05-24 22:04:31    1548    0    0

我们使用一个Avro Source,通过3个replicating channel,分别sink到Kafka、HDFS和HBase。Agent配置如下:

tier1.sources = avroSrc
tier1.channels = ch1 ch2 ch3
tier1.sinks = sKafka sHbase sHdfs
 
tier1.sources.avroSrc.type = avro
tier1.sources.avroSrc.bind = 0.0.0.0
tier1.sources.avroSrc.port = 44444
tier1.sources.avroSrc.channels = ch1 ch2 ch3
 
#产生的最大工作线程数
#tier1.sources.avroSrc.threads
 
#压缩类型,none或者deflate
#tier1.sources.avroSrc.compression-type
 
#设置成true表示使用ssl协议加密。同时必须指定“keystore” 和 “keystore-password”
#tier1.sources.avroSrc.ssl=false
 
#Java keystore文件的路径
#tier1.sources.avroSrc.keystore
 
#Java keystore的密码
#tier1.sources.avroSrc.keystore-password
 
#Java keystore的类型。JKS或者PKCS12
#tier1.sources.avroSrc.keystore-type=JKS
 
#不包含的协议,用空格分隔
#tier1.sources.avroSrc.exclude-protocols=SSLv3
 
#设置成true,表示对netty开启IP地址过滤
#tier1.sources.avroSrc.ipFilter=false
 
#定义IP地址过滤规则
#tier1.sources.avroSrc.ipFilterRules
 
#给source增加一个自定义拦截器
tier1.sources.avroSrc.interceptors = myinterceptor
#自定义拦截器,在flume-ext-1.0-SNAPSHOT.jar中
tier1.