本文共 2012 字,大约阅读时间需要 6 分钟。
本文隶属于专栏《1000个问题搞定大数据技术体系》,该专栏为笔者原创,引用请注明来源,不足和错误之处请在评论区帮忙指出,谢谢!
本专栏目录结构和参考文献请见
当一个flume挂掉之后重启的时候还是可以接着上一次的数据继续收集
flume在 1.7 版本之前使用的监控一个文件(source exec)、监控一个目录(source spool dir)都无法直接实现
flume在 1.7 版本之后已经集成了该功能
其本质就是记录下每一次消费的位置,把消费信息的位置保存到文件中,后续程序挂掉了再重启的时候,可以接着上一次消费的数据位置继续拉取。
# source 类型---->taildirvim taildir.conf
a1.channels = ch1a1.sources = s1a1.sinks = hdfs-sink1#channela1.channels.ch1.type = memorya1.channels.ch1.capacity=10000a1.channels.ch1.transactionCapacity=500#sourcea1.sources.s1.channels = ch1#监控一个目录下的多个文件新增的内容a1.sources.s1.type = taildir#通过 json 格式存下每个文件消费的偏移量,避免从头消费a1.sources.s1.positionFile = /opt/bigdata/flume/index/taildir_position.jsona1.sources.s1.filegroups = f1 f2 f3 a1.sources.s1.filegroups.f1 = /home/hadoop/taillogs/access.loga1.sources.s1.filegroups.f2 = /home/hadoop/taillogs/nginx.loga1.sources.s1.filegroups.f3 = /home/hadoop/taillogs/web.loga1.sources.s1.headers.f1.headerKey = accessa1.sources.s1.headers.f2.headerKey = nginxa1.sources.s1.headers.f3.headerKey = weba1.sources.s1.fileHeader = true##sinka1.sinks.hdfs-sink1.channel = ch1a1.sinks.hdfs-sink1.type = hdfsa1.sinks.hdfs-sink1.hdfs.path =hdfs://node1:9000/demo/data/%{headerKey}a1.sinks.hdfs-sink1.hdfs.filePrefix = event_dataa1.sinks.hdfs-sink1.hdfs.fileSuffix = .loga1.sinks.hdfs-sink1.hdfs.rollSize = 1048576a1.sinks.hdfs-sink1.hdfs.rollInterval =20a1.sinks.hdfs-sink1.hdfs.rollCount = 10a1.sinks.hdfs-sink1.hdfs.batchSize = 1500a1.sinks.hdfs-sink1.hdfs.round = truea1.sinks.hdfs-sink1.hdfs.roundUnit = minutea1.sinks.hdfs-sink1.hdfs.threadsPoolSize = 25a1.sinks.hdfs-sink1.hdfs.fileType =DataStreama1.sinks.hdfs-sink1.hdfs.writeFormat = Texta1.sinks.hdfs-sink1.hdfs.callTimeout = 60000
运行后生成的 taildir_position.json 文件信息如下:
[{ "inode":102626782,"pos":123,"file":"/home/hadoop/taillogs/access.log"},{ "inode":102626785,"pos":123,"file":"/home/hadoop/taillogs/web.log"},{ "inode":102626786,"pos":123,"file":"/home/hadoop/taillogs/nginx.log"}]
这里inode就是标记文件的,文件名称改变,这个inode不会变,pos记录偏移量,file就是绝对路径
转载地址:http://nkgji.baihongyu.com/