摘要:的选哪个首选断点还原可以记录偏移量可配置文件组,里面使用正则表达式配置多个要监控的文件就凭第一点其他的都被比下去了这么好的有一点不完美,不能支持递归监控文件夹。
Flume的source选哪个?
taildir source首选!
1.断点还原 positionFile可以记录偏移量
2.可配置文件组,里面使用正则表达式配置多个要监控的文件
就凭第一点其他的source都被比下去了!
这么好的taildir source有一点不完美,不能支持递归监控文件夹。
所以就只能修改源代码了……好玩,我喜欢~
Flume的taildir source启动会调用start()方法作初始化,里面创建一个ReliableTaildirEventReader,这里用到了建造者模式
@Override public synchronized void start() { logger.info("{} TaildirSource source starting with directory: {}", getName(), filePaths); try { reader = new ReliableTaildirEventReader.Builder() .filePaths(filePaths) .headerTable(headerTable) .positionFilePath(positionFilePath) .skipToEnd(skipToEnd) .addByteOffset(byteOffsetHeader) .cachePatternMatching(cachePatternMatching) .recursive(isRecursive) .annotateFileName(fileHeader) .fileNameHeader(fileHeaderKey) .build(); } catch (IOException e) { throw new FlumeException("Error instantiating ReliableTaildirEventReader", e); } idleFileChecker = Executors.newSingleThreadScheduledExecutor( new ThreadFactoryBuilder().setNameFormat("idleFileChecker").build()); idleFileChecker.scheduleWithFixedDelay(new idleFileCheckerRunnable(), idleTimeout, checkIdleInterval, TimeUnit.MILLISECONDS); positionWriter = Executors.newSingleThreadScheduledExecutor( new ThreadFactoryBuilder().setNameFormat("positionWriter").build()); positionWriter.scheduleWithFixedDelay(new PositionWriterRunnable(), writePosInitDelay, writePosInterval, TimeUnit.MILLISECONDS); super.start(); logger.debug("TaildirSource started"); sourceCounter.start(); }
taildir source属于PollableSource,
/** * A {@link Source} that requires an external driver to poll to determine * whether there are {@linkplain Event events} that are available to ingest * from the source. * * @see org.apache.flume.source.EventDrivenSourceRunner */ public interface PollableSource extends Source { ...
这段注释的意思是PollableSource是需要一个外部驱动去查看有没有需要消费的事件,从而拉取事件,讲白了就是定时拉取。所以flume也不一定是真正实时的,只是隔一会儿不停地来查看事件而已。(与之相应的是另一种EventDrivenSourceRunner)
那么taildir source在定时拉取事件的时候是调用的process方法
@Override public Status process() { Status status = Status.READY; try { existingInodes.clear(); existingInodes.addAll(reader.updateTailFiles()); for (long inode : existingInodes) { TailFile tf = reader.getTailFiles().get(inode); if (tf.needTail()) { tailFileProcess(tf, true); } } closeTailFiles(); try { TimeUnit.MILLISECONDS.sleep(retryInterval); } catch (InterruptedException e) { logger.info("Interrupted while sleeping"); } } catch (Throwable t) { logger.error("Unable to tail files", t); status = Status.BACKOFF; } return status; }
重点就是下面这几行
existingInodes.addAll(reader.updateTailFiles());
for (long inode : existingInodes) {
TailFile tf = reader.getTailFiles().get(inode);
if (tf.needTail()) {
tailFileProcess(tf, true);
} }
从reader.updateTailFiles()获取需要监控的文件,然后对每一个进行处理,查看最后修改时间,判定是否需要tail,需要tail就tail
那么进入reader.updateTailFiles()
for (TaildirMatcher taildir : taildirCache) { Mapheaders = headerTable.row(taildir.getFileGroup()); for (File f : taildir.getMatchingFiles()) { long inode = getInode(f); TailFile tf = tailFiles.get(inode); if (tf == null || !tf.getPath().equals(f.getAbsolutePath())) { long startPos = skipToEnd ? f.length() : 0; tf = openFile(f, headers, inode, startPos);
遍历每一个正则表达式匹配对应的匹配器,每个匹配器去获取匹配的文件!taildir.getMatchingFiles()
ListgetMatchingFiles() { long now = TimeUnit.SECONDS.toMillis( TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis())); long currentParentDirMTime = parentDir.lastModified(); List result; // calculate matched files if // - we don"t want to use cache (recalculate every time) OR // - directory was clearly updated after the last check OR // - last mtime change wasn"t already checked for sure // (system clock hasn"t passed that second yet) if (!cachePatternMatching || lastSeenParentDirMTime < currentParentDirMTime || !(currentParentDirMTime < lastCheckedTime)) { lastMatchedFiles = sortByLastModifiedTime(getMatchingFilesNoCache(isRecursive)); lastSeenParentDirMTime = currentParentDirMTime; lastCheckedTime = now; } return lastMatchedFiles; }
可以看到getMatchingFilesNoCache(isRecursive)就是获取匹配的文件的方法,也就是需要修改的方法了!
ps:这里的isRecursive是我加的~
点进去:
private ListgetMatchingFilesNoCache() { List result = Lists.newArrayList(); try (DirectoryStream stream = Files.newDirectoryStream(parentDir.toPath(), fileFilter)) { for (Path entry : stream) { result.add(entry.toFile()); } } catch (IOException e) { logger.error("I/O exception occurred while listing parent directory. " + "Files already matched will be returned. " + parentDir.toPath(), e); } return result; }
源码是用了Files.newDirectoryStream(parentDir.toPath(), fileFilter)),将父目录下符合正则表达式的文件都添加到一个迭代器里。(这里还用了try (...)的语法糖)
我在这个getMatchingFilesNoCache()方法下面下了一个重载的方法, 可增加扩展性:
private ListgetMatchingFilesNoCache(boolean recursion) { if (!recursion) { return getMatchingFilesNoCache(); } List result = Lists.newArrayList(); // 使用非递归的方式遍历文件夹 Queue dirs = new ArrayBlockingQueue<>(10); dirs.offer(parentDir); while (dirs.size() > 0) { File dir = dirs.poll(); try { DirectoryStream stream = Files.newDirectoryStream(dir.toPath(), fileFilter); stream.forEach(path -> result.add(path.toFile())); } catch (IOException e) { logger.error("I/O exception occurred while listing parent directory. " + "Files already matched will be returned. (recursion)" + parentDir.toPath(), e); } File[] dirList = dir.listFiles(); assert dirList != null; for (File f : dirList) { if (f.isDirectory()) { dirs.add(f); } } } return result; }
我使用了非递归的方式遍历文件夹,就是树到队列的转换。
到这里,核心部分就改完了。接下来要处理这个recursion的参数
一路改构造方法,添加这个参数,最终参数从哪来呢?
flume的source启动时会调用configure方法,将Context中的内容配置进reader等对象中。
isRecursive = context.getBoolean(RECURSIVE, DEFAULT_RECURSIVE);
context从TaildirSourceConfigurationConstants中获取配置名和默认值
/** * Whether to support recursion. */ public static final String RECURSIVE = "recursive"; public static final boolean DEFAULT_RECURSIVE = false;
这里的recursive也就是flume配置文件里配置项了
# Whether to support recusion a1.sources.r1.recursive = true大功告成,打包试试!
用maven只对这一个module打包。我把这个module的pom改了下artifactId,加上了自己名字作个纪念,哈哈
可惜pom里面不能写中文……
org.apache.flume.flume-ng-sources flume-taildir-source-recursive-by-Wish000 Flume Taildir Source
执行package将其放在flume的lib下,替换原来的flume-taildir-source***.jar
启动,测试,成功!
具体代码见GitHub地址:https://github.com/Wish000/me...
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/36029.html
摘要:的选哪个首选断点还原可以记录偏移量可配置文件组,里面使用正则表达式配置多个要监控的文件就凭第一点其他的都被比下去了这么好的有一点不完美,不能支持递归监控文件夹。 Flume的source选哪个?taildir source首选!1.断点还原 positionFile可以记录偏移量2.可配置文件组,里面使用正则表达式配置多个要监控的文件就凭第一点其他的source都被比下去了!这么好的t...
摘要:于是便诞生了随行付分布式文件系统简称,提供的海量安全低成本高可靠的云存储服务。子系统相关流程图如下核心实现主要为随行付各个业务系统提供文件共享和访问服务,并且可以按应用统计流量命中率空间等指标。 背景 传统Web应用中所有的功能部署在一起,图片、文件也在一台服务器;应用微服务架构后,服务之间的图片共享通过FTP+Nginx静态资源的方式进行访问,文件共享通过nfs磁盘挂载的方式进行访问...
摘要:对于一般的采集需求,通过对的简单配置即可实现。针对特殊场景也具备良好的自定义扩展能力,因此,可以适用于大部分的日常数据采集场景。 文章作者:foochane 原文链接:https://foochane.cn/article/2019062701.html Flume日志采集框架 安装和部署 Flume运行机制 采集静态文件到hdfs 采集动态日志文件到hdfs 两个agent级联 F...
阅读 2395·2021-11-11 16:54
阅读 1204·2021-09-22 15:23
阅读 3644·2021-09-07 09:59
阅读 1990·2021-09-02 15:41
阅读 3283·2021-08-17 10:13
阅读 3037·2019-08-30 15:53
阅读 1235·2019-08-30 13:57
阅读 1210·2019-08-29 15:16