Flume容错Avro sink示例
? flume ?    2016-05-24 22:04:31    1030    0    0
holynull   ? flume ?

    这个agent实现读取nginx访问日志,并将日志sink到服务器端的3个agent(Avro Source)。我们发现Flume Avro Sink的配置只能配置一个hostname和port,就是说我们只能通过简单配置将日志数据发送到服务器端的一个flume agent。那么event数据流在传输时就存在一个单点故障风险,即服务器端的flume agent无法联通的情况下,这个数据流通路就断了。

    接下来我们来看一下,在使用Avro sink-Avro source时,如何实现容错?

nginx-log.sources=tailCmd
nginx-log.sinks=sinkAvro1 sinkAvro2 sinkAvro3
nginx-log.channels=ch1
 
nginx-log.sources.tailCmd.type=exec
nginx-log.sources.tailCmd.command=tail -F /usr/local/nginx/logs/host.access.log
#下一次重启前,等待的时间
nginx-log.sources.tailCmd.restartThrottle=1000
#如果执行命令死掉了,是否需要重启命令
nginx-log.sources.tailCmd.restart=true
#标准输出是否被记录
nginx-log.sources.tailCmd.logStdErr=false
#一次性向channel发送20行
nginx-log.sources.tailCmd.batchSize=20
#没有达到buffer size时,等待3秒后,将数据推送走
nginx-log.sources.tailCmd.batchTimeout=3000
nginx-log.sources.tailCmd.channels=ch1
nginx-log.sources.tailCmd.interceptors = i1
nginx-log.sources.tailCmd.interceptors.i1.type = com.wanda.flume.interceptor.MyAgentInterceptor$Builder
nginx-log.sources.tailCmd.interceptors.i1.sysName = wdmap-nginx
#nginx-log.sources.tailCmd.interceptors.i2.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
 
nginx-log.channels.ch1.type = memory
 
#channel中保存的event的最大数量
nginx-log.channels.ch1.capacity = 1000
 
#每次从source取得event的最大数量和每次发送给sink的最大数量
nginx-log.channels.ch1.transactionCapacity = 100
 
#增加和移除一个event的超时时间,单位秒
nginx-log.channels.ch1.keep-alive=3
 
#定义event header占用的缓存百分比。缓存的大小介于byteCapacity和预估channel中所有event的size之间
nginx-log.channels.ch1.byteCapacityBufferPercentage=20
 
#在这个channel中允许存储event body的总字节数。这正是设置byteCapacityBufferPercentage的原因。默认值是JVM最大可用内存的80%(即JVM
#参数-Xmx的80%)。注意,如果存在多个channel在单独的一个JVM中,并且他们保存了相同的event(即,一个source使用两个replicating chann
#el),那么event占用了event size双倍大小的字节容量(byteCapacity )。这个参数设置成0,表示这个值被设置成内部硬性上限200GB左右。
#nginx-log.channels.ch1.byteCapacity
 
nginx-log.sinks.sinkAvro1.type=avro
 
nginx-log.sinks.sinkAvro1.hostname=10.199.203.195
 
nginx-log.sinks.sinkAvro1.port=44444
#批量发送event的数量
nginx-log.sinks.sinkAvro1.batch-size=100
#第一次请求的超时时间,即握手(handshake)超时时间,单位毫秒
nginx-log.sinks.sinkAvro1.connect-timeout=20000
 
#next hop连接重置之前的超时时间,单位秒。这个设置可以使sink连接到目标集群中的新增主机,而不用重新启动代理。
#nginx-log.sinks.sinkAvro1.request-timeout
 
#可以设置成none和deflate,这个值必须与AvroSource的compression-type匹配
#nginx-log.sinks.sinkAvro1.compression-type=none
 
#压缩event的压缩级别。0表示没有压缩,1-9为压缩级别。压缩级别越大压缩比就越大
#nginx-log.sinks.sinkAvro1.compression-level=0
 
#设置sink是否使用ssl协议传输数据。如果设置为true。你可以选择性的设置“truststore”, “truststore-password”, “truststore-type”,并
#且指定是否“trust-all-certs”
#nginx-log.sinks.sinkAvro1.ssl=false
 
#如果设置成true,远程服务器的SSL server certificates将不会被检查。生产环境最好不要设置成true,因为这样容易受到攻击
#nginx-log.sinks.sinkAvro1.trust-all-certs=false
 
#Java truststore的路径。flume使用这个文件中的certificate authority信息,来决定远端的Avro Source的SSL authentication credenti
#als是否被信任。如果这个参数没有被指定,那么这个值将默认为Java JSSE certificate authority files (typically “jssecacerts” or “
#cacerts” in the Oracle JRE)
#nginx-log.sinks.sinkAvro1.truststore
 
#制定truststore的密码
#nginx-log.sinks.sinkAvro1.truststore-password
 
#Java truststore的类型
#nginx-log.sinks.sinkAvro1.truststore-type=JKS
 
#不包含协议的列表,用空格分隔
#nginx-log.sinks.sinkAvro1.exclude-protocols=SSLv3
 
#最大I/O工作线程数。这个配置在NettyAvroRpcClient和NioClientSocketChannelFactory上起作用。默认值为服务器可用处理器个数的两倍。
#nginx-log.sinks.sinkAvro1.maxIoWorkers
 
nginx-log.sinks.sinkAvro1.channel=ch1
 
nginx-log.sinks.sinkAvro2.type=avro
nginx-log.sinks.sinkAvro2.hostname=10.199.203.196
nginx-log.sinks.sinkAvro2.port=44444
nginx-log.sinks.sinkAvro2.batch-size=100
nginx-log.sinks.sinkAvro2.connect-timeout=20000
#nginx-log.sinks.sinkAvro2.request-timeout
#nginx-log.sinks.sinkAvro2.compression-type=none
#nginx-log.sinks.sinkAvro2.compression-level=0
#nginx-log.sinks.sinkAvro2.ssl=false
#nginx-log.sinks.sinkAvro2.trust-all-certs=false
#nginx-log.sinks.sinkAvro2.truststore
#nginx-log.sinks.sinkAvro2.truststore-password
#nginx-log.sinks.sinkAvro2.truststore-type=JKS
#nginx-log.sinks.sinkAvro2.exclude-protocols=SSLv3
#nginx-log.sinks.sinkAvro2.maxIoWorkers
nginx-log.sinks.sinkAvro2.channel=ch1
 
nginx-log.sinks.sinkAvro3.type=avro
nginx-log.sinks.sinkAvro3.hostname=10.199.203.197
nginx-log.sinks.sinkAvro3.port=44444
nginx-log.sinks.sinkAvro3.batch-size=100
nginx-log.sinks.sinkAvro3.connect-timeout=20000
#nginx-log.sinks.sinkAvro3.request-timeout
#nginx-log.sinks.sinkAvro3.compression-type=none
#nginx-log.sinks.sinkAvro3.compression-level=0
#nginx-log.sinks.sinkAvro3.ssl=false
#nginx-log.sinks.sinkAvro3.trust-all-certs=false
#nginx-log.sinks.sinkAvro3.truststore
#nginx-log.sinks.sinkAvro3.truststore-password
#nginx-log.sinks.sinkAvro3.truststore-type=JKS
#nginx-log.sinks.sinkAvro3.exclude-protocols=SSLv3
#nginx-log.sinks.sinkAvro3.maxIoWorkers
nginx-log.sinks.sinkAvro3.channel=ch1
 
nginx-log.sinkgroups = g1
nginx-log.sinkgroups.g1.sinks = sinkAvro1 sinkAvro2 sinkAvro3
nginx-log.sinkgroups.g1.processor.type = failover
nginx-log.sinkgroups.g1.processor.priority.sinkAvro1 = 5
nginx-log.sinkgroups.g1.processor.priority.sinkAvro2 = 10
nginx-log.sinkgroups.g1.processor.priority.sinkAvro3 = 15
nginx-log.sinkgroups.g1.processor.maxpenalty = 30000 

 

上一篇: Flume sink 到 Kafka、HDFS和HBase的例子

下一篇: Flume容错和负载均衡 Flume Sink Processors

1030 人读过
立即登录, 发表评论.
没有帐号? 立即注册
0 条评论
文档导航