Flume sink 到 Kafka、HDFS和HBase的例子
? flume ?    2016-05-24 22:04:31    1548    0    0
holynull   ? flume ?

我们使用一个Avro Source,通过3个replicating channel,分别sink到Kafka、HDFS和HBase。Agent配置如下:

tier1.sources = avroSrc
tier1.channels = ch1 ch2 ch3
tier1.sinks = sKafka sHbase sHdfs
 
tier1.sources.avroSrc.type = avro
tier1.sources.avroSrc.bind = 0.0.0.0
tier1.sources.avroSrc.port = 44444
tier1.sources.avroSrc.channels = ch1 ch2 ch3
 
#产生的最大工作线程数
#tier1.sources.avroSrc.threads
 
#压缩类型,none或者deflate
#tier1.sources.avroSrc.compression-type
 
#设置成true表示使用ssl协议加密。同时必须指定“keystore” 和 “keystore-password”
#tier1.sources.avroSrc.ssl=false
 
#Java keystore文件的路径
#tier1.sources.avroSrc.keystore
 
#Java keystore的密码
#tier1.sources.avroSrc.keystore-password
 
#Java keystore的类型。JKS或者PKCS12
#tier1.sources.avroSrc.keystore-type=JKS
 
#不包含的协议,用空格分隔
#tier1.sources.avroSrc.exclude-protocols=SSLv3
 
#设置成true,表示对netty开启IP地址过滤
#tier1.sources.avroSrc.ipFilter=false
 
#定义IP地址过滤规则
#tier1.sources.avroSrc.ipFilterRules
 
#给source增加一个自定义拦截器
tier1.sources.avroSrc.interceptors = myinterceptor
#自定义拦截器,在flume-ext-1.0-SNAPSHOT.jar中
tier1.sources.avroSrc.interceptors.myinterceptor.type = com.wanda.flume.interceptor.MyInterceptor$Builder
 
tier1.channels.ch1.type = memory
 
#channel中保存的event的最大数量
tier1.channels.ch1.capacity = 1000
 
#每次从source取得event的最大数量和每次发送给sink的最大数量
tier1.channels.ch1.transactionCapacity = 100
 
#增加和移除一个event的超时时间,单位秒
tier1.channels.ch1.keep-alive=3
 
#定义event header占用的缓存百分比。缓存的大小介于byteCapacity和预估channel中所有event的size之间
#tier1.channels.ch1.byteCapacityBufferPercentage=20
 
#在这个channel中允许存储event body的总字节数。这正是设置byteCapacityBufferPercentage的原因。默认值是JVM最大可用内存的80%(即JVM
#参数-Xmx的80%)。注意,如果存在多个channel在单独的一个JVM中,并且他们保存了相同的event(即,一个source使用两个replicating chann
#el),那么event占用了event size双倍大小的字节容量(byteCapacity )。这个参数设置成0,表示这个值被设置成内部硬性上限200GB左右。
#tier1.channels.ch1.byteCapacity
 
tier1.channels.ch2.type = memory
tier1.channels.ch2.capacity = 1000
tier1.channels.ch2.transactionCapacity = 100
tier1.channels.ch2.keep-alive=3
#tier1.channels.ch2.byteCapacityBufferPercentage=20
#tier1.channels.ch2.byteCapacity
 
tier1.channels.ch3.type = memory
tier1.channels.ch3.capacity = 1000
tier1.channels.ch3.transactionCapacity = 100
tier1.channels.ch3.keep-alive=3
#tier1.channels.ch3.byteCapacityBufferPercentage=20
#tier1.channels.ch3.byteCapacity
 
#####################################################Kafka Sink#########################################################
#使用flume1.6的kafka sink
#不用设置topic,topic在event header中
tier1.sinks.sKafka.type = org.apache.flume.sink.kafka.KafkaSink
#sinke连接的broker列表,如果获取topic的分区,只设置一部分broker即可,但是建议设置至少设置2个broker保证HA.格式用户逗号分隔
#hostname:port
tier1.sinks.sKafka.brokerList=10.199.203.196:9092,10.199.203.197:9092
 
#消息发布到kafka的topic。如果设置了这个参数,消息将被发布到指定的topic。如果event header中包含topic域,这个设置将会被event heade
#-r中的topic域覆盖。
#tier1.sinks.sKafka.topic=default-flume-topic
 
#一次批量处理的message数量。越大批处理量,可以增加吞度量,同时也会使延迟变长。
tier1.sinks.sKafka.batchSize=100
 
#需要多少个副本确认,才认定为成功写入。0:不等待任何确认消息;1:仅等待leader的确认消息;-1:等待所有副本都确认。设置成-1可以避免leade
#r失败造成的数据丢失
tier1.sinks.sKafka.requiredAcks=1
 
tier1.sinks.sKafka.channel = ch1
 
#####################################################HBse Sink##########################################################
 
tier1.sinks.sHbase.type = asynchbase
 
#hbase-site.xml中hbase.zookeeper.quorum的值;如果在没有设置这个值,flume会用classpath下找到的第一个hbase-site.xml中的值代替;
#tier1.sinks.sHbase.zookeeperQuorum = 10.199.203.193:2181,10.199.203.194:2181,10.199.203.195:2181
 
#hbase-site.xml中zookeeper.znode.parent的值
#tier1.sinks.sHbase.znodeParent = /hbase
 
tier1.sinks.sHbase.table = wdmap_log
 
tier1.sinks.sHbase.columnFamily = logcf
 
#每次批量写入的event数量
tier1.sinks.sHbase.batchSize = 5
 
#是否在通一个处理批次中,合并对同一个hbase cell多个操作
tier1.sinks.sHbase.coalesceIncrements=false
 
#等待hbase对一个事务中所有event的acks(正确应答)的时长,单位ms
tier1.sinks.sHbase.timeout=60000
 
#tier1.sinks.sHbase.serializer = org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer
 
tier1.sinks.sHbase.serializer = com.wanda.flume.sink.hbase.AsyncHbaseLogEventSerializer
 
tier1.sinks.sHbase.channel = ch2
 
#####################################################HDFS Sink##########################################################
 
tier1.sinks.sHdfs.channel = ch3
 
tier1.sinks.sHdfs.type = hdfs
 
tier1.sinks.sHdfs.hdfs.path = hdfs://10.199.203.193:9000/wdmap-log
 
#hdfs目录下文件名前缀
tier1.sinks.sHdfs.hdfs.filePrefix = FlumeData
 
#hdfs上文件后缀
#tier1.sinks.sHdfs.hdfs.fileSuffix
 
#临时文件前缀
#tier1.sinks.sHdfs.hdfs.inUsePrefix
 
#临时文件后缀
tier1.sinks.sHdfs.hdfs.inUseSuffix=.tmp
 
#文件滚动时间间隔,设置0表示不按照时间滚动
tier1.sinks.sHdfs.hdfs.rollInterval=0
 
#触发文件滚动的byte数,0表示不按照文件size滚动
tier1.sinks.sHdfs.hdfs.rollSize = 1000000
 
#文件滚动钱写入的event个数,0表示不按照event个数滚动文件
tier1.sinks.sHdfs.hdfs.rollCount = 0
 
#发现非活动文件后多久,关闭该文件;设置0表示,不启用自动关闭闲置文件
tier1.sinks.sHdfs.hdfs.idleTimeout=0
 
#数据即将写入HDFS之时,一次性批量写入的event的个数
tier1.sinks.sHdfs.hdfs.batchSize = 1000
 
#压缩解码器,gzip,bzip2,lzo,lzop,snappy
#tier1.sinks.sHdfs.hdfs.codeC
 
#文件格式。现在支持SequenceFile, DataStream, CompressedStream. DataStream没有压缩,所以不要设置codeC;CompressedStream则需要
#配置一个可用codeC
tier1.sinks.sHdfs.hdfs.fileType=DataStream
 
#最大允许打开的文件数量。打开文件数量超过这个数量,最先打开的文件将被关闭
tier1.sinks.sHdfs.hdfs.maxOpenFiles=5000
 
#指定block的最小副本数。如果没有设置,默认取classpath下Hadoop的默认设置
#tier1.sinks.sHdfs.hdfs.minBlockReplicas
 
#Format for sequence file records. One of “Text” or “Writable” (the default).
tier1.sinks.sHdfs.hdfs.writeFormat = Text
 
#HDFS操作(open,write,flush,close)timeout时间,单位ms。如果很多操作都超时,那么可以增加这个数值。
tier1.sinks.sHdfs.hdfs.callTimeout = 10000
 
#执行HDFS IO操作(opent,write)的线程数
tier1.sinks.sHdfs.hdfs.threadsPoolSize = 10
 
#定时滚动文件线程的数量
tier1.sinks.sHdfs.hdfs.rollTimerPoolSize=1
 
#Kerberos user principal for accessing secure HDFS
#tier1.sinks.sHdfs.hdfs.kerberosPrincipal
 
#Kerberos keytab for accessing secure HDFS
#tier1.sinks.sHdfs.hdfs.kerberosKeytab
 
#tier1.sinks.sHdfs.hdfs.proxyUser
 
#时间戳是否线下舍入;将影响除了%t以外的所有时间转意符
tier1.sinks.sHdfs.hdfs.round=false
 
#Rounded down to the highest multiple of this (in the unit configured using hdfs.roundUnit), less than current time.
tier1.sinks.sHdfs.hdfs.roundValue=1
 
#The unit of the round down value - second, minute or hour.
tier1.sinks.sHdfs.hdfs.roundUnit=second
 
#Name of the timezone that should be used for resolving the directory path, e.g. America/Los_Angeles.
#tier1.sinks.sHdfs.hdfs.timeZone=Local Time
 
#Use the local time (instead of the timestamp from the event header) while replacing the escape sequences.
#tier1.sinks.sHdfs.hdfs.useLocalTimeStamp=false
 
#初始化关闭操作后,sink必须尝试修改文件名称的次数。如果设置成1,sink不会重试修改文件名称,临时文件将会保持在打开状态。如果设置成0,
#sink会一直不断的尝试,直到成功(没有上限次数)。如果close操作失败文件仍然可能一直保持打开状态,但是数据完好无损,并且在flume重启后
#文件将被关闭。
tier1.sinks.sHdfs.hdfs.closeTries=0
 
#两个连续close操作间隔时间,单位s。关闭操作消耗很多RPC通信,所以这个值设置过低将增加NameNode的负载。如果设置成0或者更低,那么在sink
#第一次close失败后,临时文件将一直保持打开状态。
tier1.sinks.sHdfs.hdfs.retryInterval=180
 
#Other possible options include avro_event or the fully-qualified class name of an implementation of the EventSerializer
#.Builder interface.
#tier1.sinks.sHdfs.serializer

 

上一篇: Flume容错Avro sink示例

下一篇: Flume容错和负载均衡 Flume Sink Processors

1548 人读过
立即登录, 发表评论.
没有帐号? 立即注册
0 条评论
文档导航