这个agent实现读取nginx访问日志,并将日志sink到服务器端的3个agent(Avro Source)。我们发现Flume Avro Sink的配置只能配置一个hostname和port,就是说我们只能通过简单配置将日志数据发送到服务器端的一个flume agent。那么event数据流在传输时就存在一个单点故障风险,即服务器端的flume agent无法联通的情况下,这个数据流通路就断了。
接下来我们来看一下,在使用Avro sink-Avro source时,如何实现容错?
nginx-log.sources=tailCmd nginx-log.sinks=sinkAvro1 sinkAvro2 sinkAvro3 nginx-log.channels=ch1 nginx-log.sources.tailCmd.type=exec nginx-log.sources.tailCmd.command=tail -F /usr/local/nginx/logs/host.access.log #下一次重启前,等待的时间 nginx-log.sources.tailCmd.restartThrottle=1000 #如果执行命令死掉了,是否需要重启命令 nginx-log.sources.tailCmd.restart=true #标准输出是否被记录 nginx-log.sources.tailCmd.logStdErr=false #一次性向channel发送20行 nginx-log.sources.tailCmd.batchSize=20 #没有达到buffer size时,等待3秒后,将数据推送走 nginx-log.sources.tailCmd.batchTimeout=3000 nginx-log.sources.tailCmd.channels=ch1 nginx-log.sources.tailCmd.interceptors = i1 nginx-log.sources.tailCmd.interceptors.i1.type = com.wanda.flume.interceptor.MyAgentInterceptor$Builder nginx-log.sources.tailCmd.interceptors.i1.s
我们使用一个Avro Source,通过3个replicating channel,分别sink到Kafka、HDFS和HBase。Agent配置如下:
tier1.sources = avroSrc tier1.channels = ch1 ch2 ch3 tier1.sinks = sKafka sHbase sHdfs tier1.sources.avroSrc.type = avro tier1.sources.avroSrc.bind = 0.0.0.0 tier1.sources.avroSrc.port = 44444 tier1.sources.avroSrc.channels = ch1 ch2 ch3 #产生的最大工作线程数 #tier1.sources.avroSrc.threads #压缩类型,none或者deflate #tier1.sources.avroSrc.compression-type #设置成true表示使用ssl协议加密。同时必须指定“keystore” 和 “keystore-password” #tier1.sources.avroSrc.ssl=false #Java keystore文件的路径 #tier1.sources.avroSrc.keystore #Java keystore的密码 #tier1.sources.avroSrc.keystore-password #Java keystore的类型。JKS或者PKCS12 #tier1.sources.avroSrc.keystore-type=JKS #不包含的协议,用空格分隔 #tier1.sources.avroSrc.exclude-protocols=SSLv3 #设置成true,表示对netty开启IP地址过滤 #tier1.sources.avroSrc.ipFilter=false #定义IP地址过滤规则 #tier1.sources.avroSrc.ipFilterRules #给source增加一个自定义拦截器 tier1.sources.avroSrc.interceptors = myinterceptor #自定义拦截器,在flume-ext-1.0-SNAPSHOT.jar中 tier1.
Property Name | Default | Description |
---|---|---|
sinks | – | Space-separated list of sinks that are participating in the group |
processor.type | default | The component type name, needs to bedefault , failover or load_balance |
示例:
a1.sinkgroups=g1 a1.sinkgroups.g1.sinks=k1 k2 a1.sinkgroups.g1.processor.type=load_balance a1.sinkgroups.g1.processor.backoff=true a1.sinkgroups.g1.processor.selector=random
默认的sink processor仅接受单独一个sink。不必对单个sink使用processor。对单个sink可以使用source-channel-sink的方式。
Failover Sink Processor(容错处理器)拥有一个sink的优先级列表,用来保证只有一个sink可用。
容错机制将失败的sink放入一个冷却池中,并给他设置一个冷却时间,如果重试中不断失败,冷却时间将不断增加。一旦sink成功的发送event,sink将被重新保存到一个可用sink池中。在这个可用sink池中,每一个sink都有一个关联优先级值,值越大优先级越高。当一个sink发送event失败时,剩下的sink中优先级最高的sink将试着发送event。例如:在选择发送event的sink时,优先级100的sink将优先于优先级80的sink。如果没有设置sink的优先级,那么优先级将按照设置的顺序从左至右,由高到低来决定。
设置sink组的processor为failover,并且为每个独立的