这个agent实现读取nginx访问日志,并将日志sink到服务器端的3个agent(Avro Source)。我们发现Flume Avro Sink的配置只能配置一个hostname和port,就是说我们只能通过简单配置将日志数据发送到服务器端的一个flume agent。那么event数据流在传输时就存在一个单点故障风险,即服务器端的flume agent无法联通的情况下,这个数据流通路就断了。
接下来我们来看一下,在使用Avro sink-Avro source时,如何实现容错?
nginx-log.sources=tailCmd nginx-log.sinks=sinkAvro1 sinkAvro2 sinkAvro3 nginx-log.channels=ch1 nginx-log.sources.tailCmd.type=exec nginx-log.sources.tailCmd.command=tail -F /usr/local/nginx/logs/host.access.log #下一次重启前,等待的时间 nginx-log.sources.tailCmd.restartThrottle=1000 #如果执行命令死掉了,是否需要重启命令 nginx-log.sources.tailCmd.restart=true #标准输出是否被记录 nginx-log.sources.tailCmd.logStdErr=false #一次性向channel发送20行 nginx-log.sources.tailCmd.batchSize=20 #没有达到buffer size时,等待3秒后,将数据推送走 nginx-log.sources.tailCmd.batchTimeout=3000 nginx-log.sources.tailCmd.channels=ch1 nginx-log.sources.tailCmd.interceptors = i1 nginx-log.sources.tailCmd.interceptors.i1.type = com.wanda.flume.interceptor.MyAgentInterceptor$Builder nginx-log.sources.tailCmd.interceptors.i1.s
我们使用一个Avro Source,通过3个replicating channel,分别sink到Kafka、HDFS和HBase。Agent配置如下:
tier1.sources = avroSrc tier1.channels = ch1 ch2 ch3 tier1.sinks = sKafka sHbase sHdfs tier1.sources.avroSrc.type = avro tier1.sources.avroSrc.bind = 0.0.0.0 tier1.sources.avroSrc.port = 44444 tier1.sources.avroSrc.channels = ch1 ch2 ch3 #产生的最大工作线程数 #tier1.sources.avroSrc.threads #压缩类型,none或者deflate #tier1.sources.avroSrc.compression-type #设置成true表示使用ssl协议加密。同时必须指定“keystore” 和 “keystore-password” #tier1.sources.avroSrc.ssl=false #Java keystore文件的路径 #tier1.sources.avroSrc.keystore #Java keystore的密码 #tier1.sources.avroSrc.keystore-password #Java keystore的类型。JKS或者PKCS12 #tier1.sources.avroSrc.keystore-type=JKS #不包含的协议,用空格分隔 #tier1.sources.avroSrc.exclude-protocols=SSLv3 #设置成true,表示对netty开启IP地址过滤 #tier1.sources.avroSrc.ipFilter=false #定义IP地址过滤规则 #tier1.sources.avroSrc.ipFilterRules #给source增加一个自定义拦截器 tier1.sources.avroSrc.interceptors = myinterceptor #自定义拦截器,在flume-ext-1.0-SNAPSHOT.jar中 tier1.