我们使用一个Avro Source,通过3个replicating channel,分别sink到Kafka、HDFS和HBase。Agent配置如下:
tier1.sources = avroSrc tier1.channels = ch1 ch2 ch3 tier1.sinks = sKafka sHbase sHdfs tier1.sources.avroSrc.type = avro tier1.sources.avroSrc.bind = 0.0.0.0 tier1.sources.avroSrc.port = 44444 tier1.sources.avroSrc.channels = ch1 ch2 ch3 #产生的最大工作线程数 #tier1.sources.avroSrc.threads #压缩类型,none或者deflate #tier1.sources.avroSrc.compression-type #设置成true表示使用ssl协议加密。同时必须指定“keystore” 和 “keystore-password” #tier1.sources.avroSrc.ssl=false #Java keystore文件的路径 #tier1.sources.avroSrc.keystore #Java keystore的密码 #tier1.sources.avroSrc.keystore-password #Java keystore的类型。JKS或者PKCS12 #tier1.sources.avroSrc.keystore-type=JKS #不包含的协议,用空格分隔 #tier1.sources.avroSrc.exclude-protocols=SSLv3 #设置成true,表示对netty开启IP地址过滤 #tier1.sources.avroSrc.ipFilter=false #定义IP地址过滤规则 #tier1.sources.avroSrc.ipFilterRules #给source增加一个自定义拦截器 tier1.sources.avroSrc.interceptors = myinterceptor #自定义拦截器,在flume-ext-1.0-SNAPSHOT.jar中 tier1.sources.avroSrc.interceptors.myinterceptor.type = com.wanda.flume.interceptor.MyInterceptor$Builder tier1.channels.ch1.type = memory #channel中保存的event的最大数量 tier1.channels.ch1.capacity = 1000 #每次从source取得event的最大数量和每次发送给sink的最大数量 tier1.channels.ch1.transactionCapacity = 100 #增加和移除一个event的超时时间,单位秒 tier1.channels.ch1.keep-alive=3 #定义event header占用的缓存百分比。缓存的大小介于byteCapacity和预估channel中所有event的size之间 #tier1.channels.ch1.byteCapacityBufferPercentage=20 #在这个channel中允许存储event body的总字节数。这正是设置byteCapacityBufferPercentage的原因。默认值是JVM最大可用内存的80%(即JVM #参数-Xmx的80%)。注意,如果存在多个channel在单独的一个JVM中,并且他们保存了相同的event(即,一个source使用两个replicating chann #el),那么event占用了event size双倍大小的字节容量(byteCapacity )。这个参数设置成0,表示这个值被设置成内部硬性上限200GB左右。 #tier1.channels.ch1.byteCapacity tier1.channels.ch2.type = memory tier1.channels.ch2.capacity = 1000 tier1.channels.ch2.transactionCapacity = 100 tier1.channels.ch2.keep-alive=3 #tier1.channels.ch2.byteCapacityBufferPercentage=20 #tier1.channels.ch2.byteCapacity tier1.channels.ch3.type = memory tier1.channels.ch3.capacity = 1000 tier1.channels.ch3.transactionCapacity = 100 tier1.channels.ch3.keep-alive=3 #tier1.channels.ch3.byteCapacityBufferPercentage=20 #tier1.channels.ch3.byteCapacity #####################################################Kafka Sink######################################################### #使用flume1.6的kafka sink #不用设置topic,topic在event header中 tier1.sinks.sKafka.type = org.apache.flume.sink.kafka.KafkaSink #sinke连接的broker列表,如果获取topic的分区,只设置一部分broker即可,但是建议设置至少设置2个broker保证HA.格式用户逗号分隔 #hostname:port tier1.sinks.sKafka.brokerList=10.199.203.196:9092,10.199.203.197:9092 #消息发布到kafka的topic。如果设置了这个参数,消息将被发布到指定的topic。如果event header中包含topic域,这个设置将会被event heade #-r中的topic域覆盖。 #tier1.sinks.sKafka.topic=default-flume-topic #一次批量处理的message数量。越大批处理量,可以增加吞度量,同时也会使延迟变长。 tier1.sinks.sKafka.batchSize=100 #需要多少个副本确认,才认定为成功写入。0:不等待任何确认消息;1:仅等待leader的确认消息;-1:等待所有副本都确认。设置成-1可以避免leade #r失败造成的数据丢失 tier1.sinks.sKafka.requiredAcks=1 tier1.sinks.sKafka.channel = ch1 #####################################################HBse Sink########################################################## tier1.sinks.sHbase.type = asynchbase #hbase-site.xml中hbase.zookeeper.quorum的值;如果在没有设置这个值,flume会用classpath下找到的第一个hbase-site.xml中的值代替; #tier1.sinks.sHbase.zookeeperQuorum = 10.199.203.193:2181,10.199.203.194:2181,10.199.203.195:2181 #hbase-site.xml中zookeeper.znode.parent的值 #tier1.sinks.sHbase.znodeParent = /hbase tier1.sinks.sHbase.table = wdmap_log tier1.sinks.sHbase.columnFamily = logcf #每次批量写入的event数量 tier1.sinks.sHbase.batchSize = 5 #是否在通一个处理批次中,合并对同一个hbase cell多个操作 tier1.sinks.sHbase.coalesceIncrements=false #等待hbase对一个事务中所有event的acks(正确应答)的时长,单位ms tier1.sinks.sHbase.timeout=60000 #tier1.sinks.sHbase.serializer = org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer tier1.sinks.sHbase.serializer = com.wanda.flume.sink.hbase.AsyncHbaseLogEventSerializer tier1.sinks.sHbase.channel = ch2 #####################################################HDFS Sink########################################################## tier1.sinks.sHdfs.channel = ch3 tier1.sinks.sHdfs.type = hdfs tier1.sinks.sHdfs.hdfs.path = hdfs://10.199.203.193:9000/wdmap-log #hdfs目录下文件名前缀 tier1.sinks.sHdfs.hdfs.filePrefix = FlumeData #hdfs上文件后缀 #tier1.sinks.sHdfs.hdfs.fileSuffix #临时文件前缀 #tier1.sinks.sHdfs.hdfs.inUsePrefix #临时文件后缀 tier1.sinks.sHdfs.hdfs.inUseSuffix=.tmp #文件滚动时间间隔,设置0表示不按照时间滚动 tier1.sinks.sHdfs.hdfs.rollInterval=0 #触发文件滚动的byte数,0表示不按照文件size滚动 tier1.sinks.sHdfs.hdfs.rollSize = 1000000 #文件滚动钱写入的event个数,0表示不按照event个数滚动文件 tier1.sinks.sHdfs.hdfs.rollCount = 0 #发现非活动文件后多久,关闭该文件;设置0表示,不启用自动关闭闲置文件 tier1.sinks.sHdfs.hdfs.idleTimeout=0 #数据即将写入HDFS之时,一次性批量写入的event的个数 tier1.sinks.sHdfs.hdfs.batchSize = 1000 #压缩解码器,gzip,bzip2,lzo,lzop,snappy #tier1.sinks.sHdfs.hdfs.codeC #文件格式。现在支持SequenceFile, DataStream, CompressedStream. DataStream没有压缩,所以不要设置codeC;CompressedStream则需要 #配置一个可用codeC tier1.sinks.sHdfs.hdfs.fileType=DataStream #最大允许打开的文件数量。打开文件数量超过这个数量,最先打开的文件将被关闭 tier1.sinks.sHdfs.hdfs.maxOpenFiles=5000 #指定block的最小副本数。如果没有设置,默认取classpath下Hadoop的默认设置 #tier1.sinks.sHdfs.hdfs.minBlockReplicas #Format for sequence file records. One of “Text” or “Writable” (the default). tier1.sinks.sHdfs.hdfs.writeFormat = Text #HDFS操作(open,write,flush,close)timeout时间,单位ms。如果很多操作都超时,那么可以增加这个数值。 tier1.sinks.sHdfs.hdfs.callTimeout = 10000 #执行HDFS IO操作(opent,write)的线程数 tier1.sinks.sHdfs.hdfs.threadsPoolSize = 10 #定时滚动文件线程的数量 tier1.sinks.sHdfs.hdfs.rollTimerPoolSize=1 #Kerberos user principal for accessing secure HDFS #tier1.sinks.sHdfs.hdfs.kerberosPrincipal #Kerberos keytab for accessing secure HDFS #tier1.sinks.sHdfs.hdfs.kerberosKeytab #tier1.sinks.sHdfs.hdfs.proxyUser #时间戳是否线下舍入;将影响除了%t以外的所有时间转意符 tier1.sinks.sHdfs.hdfs.round=false #Rounded down to the highest multiple of this (in the unit configured using hdfs.roundUnit), less than current time. tier1.sinks.sHdfs.hdfs.roundValue=1 #The unit of the round down value - second, minute or hour. tier1.sinks.sHdfs.hdfs.roundUnit=second #Name of the timezone that should be used for resolving the directory path, e.g. America/Los_Angeles. #tier1.sinks.sHdfs.hdfs.timeZone=Local Time #Use the local time (instead of the timestamp from the event header) while replacing the escape sequences. #tier1.sinks.sHdfs.hdfs.useLocalTimeStamp=false #初始化关闭操作后,sink必须尝试修改文件名称的次数。如果设置成1,sink不会重试修改文件名称,临时文件将会保持在打开状态。如果设置成0, #sink会一直不断的尝试,直到成功(没有上限次数)。如果close操作失败文件仍然可能一直保持打开状态,但是数据完好无损,并且在flume重启后 #文件将被关闭。 tier1.sinks.sHdfs.hdfs.closeTries=0 #两个连续close操作间隔时间,单位s。关闭操作消耗很多RPC通信,所以这个值设置过低将增加NameNode的负载。如果设置成0或者更低,那么在sink #第一次close失败后,临时文件将一直保持打开状态。 tier1.sinks.sHdfs.hdfs.retryInterval=180 #Other possible options include avro_event or the fully-qualified class name of an implementation of the EventSerializer #.Builder interface. #tier1.sinks.sHdfs.serializer
没有帐号? 立即注册