摘要:由于标题长度限制,原题是这样某系统万,每十分钟统计一下请求次数最多的个。请求写到日志的话,其实就是超大文件中统计问题。
由于标题长度限制,原题是这样:某系统QPS100万,每十分钟统计一下请求次数最多的100个IP。ip请求写到日志的话,其实就是超大文件中统计top k问题。10分钟6亿条记录,大约是10G级别,所以对于一般单机处理来讲不能一次性加载到内存计算。所以分治算法是处理这类问题的基本思想。
思路前面说了分治思想。那么具体如何分解问题呢。
思路就是把大文件分割成多个可以内存处理的小文件,对每个小文件统计top k问题,最后再对所有统计结果合并得到最终的top k。
注意,这里的分割并不是随意分割的,那样最终结果显然是不对的,必须保证相同的ip记录都分割到同一个文件。那么hash算法最合适不过了,可以把相同的ip哈希到同一文件。
关于top k问题,效率高的解法是使用构造最小堆或者借助快速排序的思想,复杂度为O(nlogk)。这里更适合用最小堆,具体来说,就是先利用前k个数据构建一个固定大小k的最小堆,对之后的数据,小于堆顶不做处理,大于则替换堆顶并调整。这样,对每个文件顺序处理完之后就得到最终结果,而不需要保留每个文件的top k再归并。
实现博主偷懒,借助TreeSet代替最小堆来维护top k数据,TreeSet的话底层是借助红黑树排序,比最小堆复杂些,实际上对每个小文件用红黑树全排序再截取前k个。复杂度O(nlogm),这里m是每个小文件中的数量, m>>k。再有时间的话再用最小堆优化一下,复杂度应为O(nlogk)。
ps:已实现最小堆版本,见实现2,并做了对比实验
定时任务使用quartz实现。
下面是代码。
IP类,封装ip计数,使用TreeSet存放须实现comparable接口。注意这里重写compare方法不要return 0,否则会被TreeSet视为相同对象而放不进去。这个可以看一下TreeSet的实现,它实际上内部还是一个TreeMap,只是把对象作为key,而value没有使用。add添加元素时,会调用TreeMap的put方法,put内部又会调用compare方法,如果compare返回结果为0,只是重新setValue,对TreeSet相当于什么也没做。
package com.hellolvs; import org.apache.commons.lang3.builder.ToStringBuilder; /** * IP计数POJO * * @author lvs * @date 2017/12/08. */ public class IP implements Comparable{ private String ip; private int count; public IP() { } public IP(String ip, int count) { this.ip = ip; this.count = count; } public String getIp() { return ip; } public void setIp(String ip) { this.ip = ip; } public int getCount() { return count; } public void setCount(int count) { this.count = count; } @Override public int compareTo(IP o) { return o.count < this.count ? -1 : 1; } @Override public String toString() { return ToStringBuilder.reflectionToString(this); } }
IPCountJob类,定时统计日志文件中top k个ip。
注意其中的分割文件,这里的分割需要对文件边读边写,不能一次性读入内存再分割。guava io的readLines是直接装入内存的,所以不能用。可以使用java原生的io类,或使用commons io的LineIterator更优雅一些。
package com.hellolvs; import com.google.common.base.Charsets; import com.google.common.base.Objects; import com.google.common.base.StandardSystemProperty; import com.google.common.collect.Maps; import com.google.common.collect.Sets; import com.google.common.io.Files; import com.google.common.io.LineProcessor; import org.apache.commons.io.FileUtils; import org.apache.commons.io.LineIterator; import org.quartz.Job; import org.quartz.JobExecutionContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.BufferedWriter; import java.io.File; import java.io.FileWriter; import java.io.IOException; import java.nio.charset.Charset; import java.security.SecureRandom; import java.util.HashMap; import java.util.Map; import java.util.TreeSet; import java.util.concurrent.atomic.AtomicInteger; /** * 定时Job,每十分钟统计请求次数前k的ip * * @author lvs * @date 2017/12/08. */ public class IPCountJob implements Job { private static final Logger LOG = LoggerFactory.getLogger(IPCountJob.class); private static final String LINE_SEPARATOR = StandardSystemProperty.LINE_SEPARATOR.value(); private static final Charset UTF_8 = Charsets.UTF_8; private static final String INPUT_PATH = "/home/lvs/logs/ip.log"; private static final String OUTPUT_PATH = "/home/lvs/logs/split/"; private static final int SPLIT_NUM = 1024; private static final int TOP_K = 100; /** * 利用TreeSet存储请求次数前k的IP */ private TreeSetresultSet = Sets.newTreeSet(); /** * 分割文件用,保存每个文件的写入流对象 */ private final Map bufferMap = Maps.newHashMapWithExpectedSize(SPLIT_NUM); /** * 定时任务,每十分钟统计请求次数前k的IP */ @Override public void execute(JobExecutionContext jobExecutionContext) { // 捕获异常,防止定时任务中断 try { execute(); } catch (Exception e) { LOG.error("定时任务出错:{}", e.getMessage(), e); } } /** * 统计大文件中请求次数前k的IP * * @throws IOException I/O error */ public void execute() throws IOException { // 这里应该每10分钟获取当前轮替日志文件路径,此处用常量路径模拟 File ipLogFile = new File(INPUT_PATH); splitLog(ipLogFile, SPLIT_NUM); File logSplits = new File(OUTPUT_PATH); for (File logSplit : logSplits.listFiles()) { countTopK(logSplit, TOP_K); } LOG.info("结果集:{}", resultSet.size()); for (IP ip : resultSet) { LOG.info("{}", ip); } } /** * 生成模拟日志文件 * * @param logNum 生成日志条数 * @throws IOException I/O error */ public static void generateLog(long logNum) throws IOException { /* 创建文件 */ File log = new File(INPUT_PATH); File parentDir = log.getParentFile(); if (!parentDir.exists()) { parentDir.mkdirs(); } log.createNewFile(); /* 生成随机ip写入文件 */ SecureRandom random = new SecureRandom(); try (BufferedWriter bw = new BufferedWriter(new FileWriter(log))) { for (int i = 0; i < logNum; i++) { StringBuilder sb = new StringBuilder(); sb.append("192.").append(random.nextInt(255)).append(".").append(random.nextInt(255)).append(".") .append(random.nextInt(255)).append(LINE_SEPARATOR); bw.write(sb.toString()); } bw.flush(); } } /** * 分割日志文件 * * @param logFile 待分割文件 * @param fileNum 分割文件数量 * @throws IOException I/O error */ private void splitLog(File logFile, int fileNum) throws IOException { /* 为每个分割文件创建写入流对象 */ for (int i = 0; i < fileNum; i++) { File file = new File(OUTPUT_PATH + i); File parentDir = file.getParentFile(); if (!parentDir.exists()) { parentDir.mkdirs(); } bufferMap.put(i, new BufferedWriter(new FileWriter(file))); } /* 根据ip的hashcode将数据分割到不同文件中 */ LineIterator it = null; try { it = FileUtils.lineIterator(logFile, "UTF-8"); while (it.hasNext()) { String ip = it.nextLine(); int hashCode = Objects.hashCode(ip); hashCode = hashCode < 0 ? -hashCode : hashCode; BufferedWriter writer = bufferMap.get(hashCode % fileNum); writer.write(ip + LINE_SEPARATOR); } } finally { /* 释放资源 */ LineIterator.closeQuietly(it); for (Map.Entry buffer : bufferMap.entrySet()) { BufferedWriter writer = buffer.getValue(); writer.flush(); writer.close(); } bufferMap.clear(); } } /** * 统计请求次数前k的IP * * @param logSplit 当前分割文件 * @param k top k * @throws IOException I/O error */ private void countTopK(File logSplit, int k) throws IOException { /* 读取文件对ip计数 */ HashMap ipCountMap = Files.readLines(logSplit, UTF_8, new LineProcessor >() { private HashMap ipCountMap = Maps.newHashMap(); @Override public boolean processLine(String line) throws IOException { AtomicInteger ipCount = ipCountMap.get(line.trim()); if (ipCount != null) { ipCount.getAndIncrement(); } else { ipCountMap.put(line.trim(), new AtomicInteger(1)); } return true; } @Override public HashMap getResult() { return ipCountMap; } }); /* 统计结果添加到TreeSet */ for (Map.Entry entry : ipCountMap.entrySet()) { resultSet.add(new IP(entry.getKey(), entry.getValue().get())); } /* TreeSet只保留前k个ip */ TreeSet temp = Sets.newTreeSet(); int i = 0; for (IP o : resultSet) { temp.add(o); i++; if (i >= k) { break; } } resultSet = temp; } /** * 返回统计结果 * * @return 结果集合 */ public TreeSet getResult() { return resultSet; } }
Main,定时任务启动
package com.hellolvs; import org.quartz.JobBuilder; import org.quartz.JobDetail; import org.quartz.Scheduler; import org.quartz.SimpleScheduleBuilder; import org.quartz.Trigger; import org.quartz.TriggerBuilder; import org.quartz.impl.StdSchedulerFactory; /** * 定时任务启动器 * * @author lvs * @date 2017/12/11. */ public class Main { public static void main(String[] args) throws Exception { // 生成模拟日志文件 IPCountJob.generateLog(600000000); JobDetail job = JobBuilder.newJob(IPCountJob.class) .withIdentity("ipCountJob", "group1").build(); Trigger trigger = TriggerBuilder .newTrigger() .withIdentity("ipCountTrigger", "group1") .withSchedule( SimpleScheduleBuilder.simpleSchedule() .withIntervalInMinutes(10).repeatForever()) .build(); Scheduler scheduler = new StdSchedulerFactory().getScheduler(); scheduler.start(); scheduler.scheduleJob(job, trigger); } }实现2
IP类
package com.hellolvs; import org.apache.commons.lang3.builder.ToStringBuilder; /** * IP计数POJO * * @author lvs * @date 2017/12/08. */ public class IP implements Comparable{ private String ip; private int count; public IP() { } public IP(String ip, int count) { this.ip = ip; this.count = count; } public String getIp() { return ip; } public void setIp(String ip) { this.ip = ip; } public int getCount() { return count; } public void setCount(int count) { this.count = count; } @Override public int compareTo(IP o) { return Integer.compare(this.count, o.count); } @Override public String toString() { return ToStringBuilder.reflectionToString(this); } }
IPCountJob类,最小堆版本统计top k
package com.hellolvs; import com.google.common.base.Charsets; import com.google.common.base.Objects; import com.google.common.base.StandardSystemProperty; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.io.Files; import com.google.common.io.LineProcessor; import org.apache.commons.io.FileUtils; import org.apache.commons.io.LineIterator; import org.quartz.Job; import org.quartz.JobExecutionContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.BufferedWriter; import java.io.File; import java.io.FileWriter; import java.io.IOException; import java.nio.charset.Charset; import java.security.SecureRandom; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; /** * 定时Job,每十分钟统计请求次数前k的ip * * @author lvs * @date 2017/12/08. */ public class IPCountJob implements Job { private static final Logger LOG = LoggerFactory.getLogger(IPCountJob.class); private static final String LINE_SEPARATOR = StandardSystemProperty.LINE_SEPARATOR.value(); private static final Charset UTF_8 = Charsets.UTF_8; private static final String INPUT_PATH = "/home/lvs/logs/ip.log"; private static final String OUTPUT_PATH = "/home/lvs/logs/split/"; private static final int SPLIT_NUM = 1024; private static final int TOP_K = 100; /** * 利用最小堆结构存储请求次数前k的IP */ private Listresult = Lists.newArrayListWithExpectedSize(TOP_K); /** * 分割文件用,保存每个文件的写入流对象 */ private final Map bufferMap = Maps.newHashMapWithExpectedSize(SPLIT_NUM); /** * 定时任务,每十分钟统计请求次数前k的IP */ @Override public void execute(JobExecutionContext jobExecutionContext) { // 捕获异常,防止定时任务中断 try { execute(); } catch (Exception e) { LOG.error("定时任务出错:{}", e.getMessage(), e); } } /** * 统计大文件中请求次数前k的IP * * @throws IOException I/O error */ public void execute() throws IOException { // 这里应该每10分钟获取当前轮替日志文件路径,此处用常量路径模拟 File ipLogFile = new File(INPUT_PATH); splitLog(ipLogFile, SPLIT_NUM); File logSplits = new File(OUTPUT_PATH); for (File logSplit : logSplits.listFiles()) { countTopK(logSplit, TOP_K); } MinHeap.sort(result); LOG.info("结果集:{}", result.size()); for (int i = result.size() - 1; i >= 0; i--) { LOG.info("{}", result.get(i)); } } /** * 生成模拟日志文件 * * @param logNum 生成日志条数 * @throws IOException I/O error */ public static void generateLog(long logNum) throws IOException { /* 创建文件 */ File log = new File(INPUT_PATH); File parentDir = log.getParentFile(); if (!parentDir.exists()) { parentDir.mkdirs(); } log.createNewFile(); /* 生成随机ip写入文件 */ SecureRandom random = new SecureRandom(); try (BufferedWriter bw = new BufferedWriter(new FileWriter(log))) { for (int i = 0; i < logNum; i++) { StringBuilder sb = new StringBuilder(); sb.append("192.").append(random.nextInt(255)).append(".").append(random.nextInt(255)).append(".") .append(random.nextInt(255)).append(LINE_SEPARATOR); bw.write(sb.toString()); } bw.flush(); } } /** * 分割日志文件 * * @param logFile 待分割文件 * @param fileNum 分割文件数量 * @throws IOException I/O error */ private void splitLog(File logFile, int fileNum) throws IOException { /* 为每个分割文件创建写入流对象 */ for (int i = 0; i < fileNum; i++) { File file = new File(OUTPUT_PATH + i); File parentDir = file.getParentFile(); if (!parentDir.exists()) { parentDir.mkdirs(); } bufferMap.put(i, new BufferedWriter(new FileWriter(file))); } /* 根据ip的hashcode将数据分割到不同文件中 */ LineIterator it = null; try { it = FileUtils.lineIterator(logFile, "UTF-8"); while (it.hasNext()) { String ip = it.nextLine(); int hashCode = Objects.hashCode(ip); hashCode = hashCode < 0 ? -hashCode : hashCode; BufferedWriter writer = bufferMap.get(hashCode % fileNum); writer.write(ip + LINE_SEPARATOR); } } finally { /* 释放资源 */ LineIterator.closeQuietly(it); for (Map.Entry buffer : bufferMap.entrySet()) { BufferedWriter writer = buffer.getValue(); writer.flush(); writer.close(); } bufferMap.clear(); } } /** * 统计请求次数前k的IP * * @param logSplit 当前分割文件 * @param k top k * @throws IOException I/O error */ private void countTopK(File logSplit, int k) throws IOException { /* 读取文件对ip计数 */ HashMap ipCountMap = Files.readLines(logSplit, UTF_8, new LineProcessor >() { private HashMap ipCountMap = Maps.newHashMap(); @Override public boolean processLine(String line) throws IOException { AtomicInteger ipCount = ipCountMap.get(line.trim()); if (ipCount != null) { ipCount.getAndIncrement(); } else { ipCountMap.put(line.trim(), new AtomicInteger(1)); } return true; } @Override public HashMap getResult() { return ipCountMap; } }); /* 前k条数据用来构建初始最小堆,之后的数据比堆顶大则替换堆顶并调堆 */ for (Map.Entry entry : ipCountMap.entrySet()) { IP ip = new IP(entry.getKey(), entry.getValue().get()); if (result.size() != k) { result.add(ip); if (result.size() == k) { MinHeap.initMinHeap(result); } } else { if (ip.compareTo(result.get(0)) > 0) { result.set(0, ip); MinHeap.adjust(result, 0, k); } } } } /** * 返回统计结果 * * @return 结果集合 */ public List getResult() { return result; } }
MinHeap类,最小堆工具
package com.hellolvs; import java.util.List; /** * 最小堆 * * @author lvs * @date 2017-12-12 */ public class MinHeap { /** * 对最小堆排序 * * @param list 已经为最小堆结构的列表 * @param元素须实现Comparable接口 */ public static > void sort(List list) { for (int i = list.size() - 1; i > 0; i--) { swap(list, 0, i); adjust(list, 0, i); } } /** * 初始化最小堆 * * @param list 待初始化为最小堆的列表 * @param 元素须实现Comparable接口 */ public static > void initMinHeap(List list) { /* 从最后一个非叶节点开始至根节点依次调整 */ for (int i = list.size() / 2 - 1; i >= 0; i--) { adjust(list, i, list.size()); } } /** * 调堆 * * @param list 当前堆 * @param 元素须实现Comparable接口 * @param cur 待调整位置 * @param length 当前堆大小 */ public static > void adjust(List list, int cur, int length) { T tmp = list.get(cur); for (int i = 2 * cur + 1; i < length; i = 2 * i + 1) { if (i + 1 < length && list.get(i).compareTo(list.get(i + 1)) > 0) { i++; // i指向孩子节点中最小的节点 } if (tmp.compareTo(list.get(i)) > 0) { list.set(cur, list.get(i)); // 最小孩子节点调整到其父节点 cur = i; // 当前节点置为最小孩子节点,继续调整 } else { break; // 没有调整时退出循环 } } list.set(cur, tmp); // 被调整节点最终存放位置 } /** * 交换List中的元素 * * @param list 待交换列表 * @param i 第一个元素位置 * @param j 第二个元素位置 */ private static > void swap(List list, int i, int j) { T tmp = list.get(i); list.set(i, list.get(j)); list.set(j, tmp); } }
Main类,无改动
package com.hellolvs; import org.quartz.JobBuilder; import org.quartz.JobDetail; import org.quartz.Scheduler; import org.quartz.SimpleScheduleBuilder; import org.quartz.Trigger; import org.quartz.TriggerBuilder; import org.quartz.impl.StdSchedulerFactory; /** * 定时任务启动器 * * @author lvs * @date 2017/12/11. */ public class Main { public static void main(String[] args) throws Exception { // 生成模拟日志文件 IPCountJob.generateLog(600000000); JobDetail job = JobBuilder.newJob(IPCountJob.class) .withIdentity("ipCountJob", "group1").build(); Trigger trigger = TriggerBuilder .newTrigger() .withIdentity("ipCountTrigger", "group1") .withSchedule( SimpleScheduleBuilder.simpleSchedule() .withIntervalInMinutes(10).repeatForever()) .build(); Scheduler scheduler = new StdSchedulerFactory().getScheduler(); scheduler.start(); scheduler.scheduleJob(job, trigger); } }附
附一下pom文件:
对比实验4.0.0 com.hellolvs ipCount 1.0.0 jar 20.0 3.1 2.4 2.6 2.1.7 1.7.5 1.0.13 4.10 1.8 UTF-8 com.google.guava guava ${guava.version} org.apache.commons commons-lang3 ${commons-lang3.version} commons-io commons-io ${commons-io.version} joda-time joda-time ${joda-time.version} org.quartz-scheduler quartz ${org.quartz-scheduler.version} org.slf4j slf4j-api ${org.slf4j.version} ch.qos.logback logback-classic ${logback.version} runtime ch.qos.logback logback-core ${logback.version} runtime junit junit-dep ${junit.version} test com.google.guava guava org.apache.commons commons-lang3 commons-io commons-io joda-time joda-time org.quartz-scheduler quartz org.slf4j slf4j-api ch.qos.logback logback-classic ch.qos.logback logback-core junit junit-dep ROOT org.apache.maven.plugins maven-compiler-plugin ${java.version} ${project.build.sourceEncoding}
生成了6亿条数据的日志。
TreeSet版本:生成6亿条日志时间:521582 分割文件时间:173219 分割后统计top k时间:195037 定时任务执行时间:368294
注:定时任务执行时间指的是对大文件的总统计时间,主要是分割文件+分割后统计top k。
cpu和堆使用情况:
可以看到堆变化明显分为三阶段:对应了生成日志、分割日志、分割后统计top k。
最小堆版本:生成6亿条日志时间:513840 分割文件时间:148861 分割后统计top k时间:190966 定时任务执行时间:339870
cpu和堆使用情况:
总结:
生成日志和分割文件是没有改动的,运行时间不一样,可能有一定误差。
倒是两个版本统计top k时间没有明显的变化,按上面分析O(nlogm)和O(nlogk)应该有比较明显的差距才对,这里n=600000000,m约600000,k=100,各位可以帮忙分析一下效率差距不大的原因。
不过可以看到堆内存使用明显降低了约100MB,因为TreeSet需要添加m个元素再截取k个,而MinHeap只需要添加k个元素。
个人博客:www.hellolvs.com
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/70785.html
摘要:自己的小网站跑在阿里云的上面偶尔也去分析分析自己网站服务器日志看看网站的访问量。然后统计最终返回的数字就是当前所有端口的已建立连接的总数。 自己的小网站跑在阿里云的ECS上面,偶尔也去分析分析自己网站服务器日志,看看网站的访问量。看看有没有黑阔搞破坏!于是收集,整理一些服务器日志分析命令,大家可以试试! 1、查看有多少个IP访问: awk {print $1} log_file|sor...
摘要:今年的无论是常态全链路压测或者是双十一当天,面临的主要问题是如何保障自身系统在海量数据冲击下的稳定性,以及如何更快的展现各个系统的状态及更好的帮助开发同学发现及定位问题。在整个双十一备战过程中,遇到并解决了很多疑难杂症。 摘要: EagleEye作为阿里集团老牌的链路跟踪系统,其自身业务虽不在交易链路上,但却监控着全集团的链路状态,特别是在中间件的远程调用上,覆盖了集团绝大部分的场景,...
摘要:自己的小网站跑在阿里云的上面偶尔也去分析分析自己网站服务器日志看看网站的访问量。表示能够处理个并发请求,这个值可根据负载情况自动调整。最终返回的数字就是当前所有端口的请求总数。 自己的小网站跑在阿里云的ECS上面,偶尔也去分析分析自己网站服务器日志,看看网站的访问量。看看有没有黑阔搞破坏!于是收集,整理一些服务器日志分析命令,大家可以试试! 1、查看有多少个IP访问: awk {pr...
阅读 903·2021-11-22 12:09
阅读 3687·2021-09-27 13:36
阅读 1376·2021-08-20 09:37
阅读 3920·2019-12-27 12:22
阅读 2330·2019-08-30 15:55
阅读 2296·2019-08-30 13:16
阅读 2799·2019-08-26 17:06
阅读 3418·2019-08-23 18:32