摘要:所有不相关的数据会立即丢弃,由于查询都是在一个无限的数据流中,这样的优势显而易见。基于这些监控事件数据流,我们想要检测出可能要过热的机架,从而调整负载和降温。
原文链接
正文随着传感网络的普及,智能设备持续收集着越来越多的数据,分析近乎实时,不断增长的数据流是一个巨大的挑战。快速应对变化趋势、交付最新的 BI 应用会成为一个公司成败的关键因素。其中关键问题就是数据流的事件模型检测。
Complex event processing (CEP) 要处理的就是在持续事件中匹配模式的问题。匹配结果通常就是:从输入事件中提取的复杂事件。传统 DBMSs 在固定数据上执行查询,而 CEP 在存储的 query 上执行(译者注:某个范围)。所有不相关的数据会立即丢弃,由于 CEP 查询都是在一个无限的数据流中,这样的优势显而易见。更重要的是,输入实时被处理,系统一旦收到某一个序列的所有数据,结果就会被输出。CEP 因此有着非常高效的实时分析能力。
由此,CEP 的处理范式吸引了很多技术人员兴趣,有着广泛的应用场景。值得注意的是,CEP 现在用在了金融应用,例如:股票市场趋势、信用卡欺诈检测。还有基于 RFID 的追踪和监控,例如:库房小偷检测。CEP 还可以被用于基于用户可疑行为的网络入侵检测。
Apache Flink 有着天生的真正的流处理能力,具有低延迟、高吞吐量的特性,和 CEP 简直绝配。因此,Flink 社区在 Flink 1.0 引入了第一个版本的 CEP library。接下来我们会使用一个数据中心监控的案例介绍其使用。
假设这样一个场景:数据中心有很多机架,每一个机架都有功率和温度监控。监控设备会不断产生功率和温度事件。基于这些监控事件数据流,我们想要检测出可能要过热的机架,从而调整负载和降温。
针对这种场景,我们采取两阶段处理方法。首先,监控温度事件,当检测到连续两个超过阈值的温度事件,即生成一个当前平均温度的警告(warning),温度报警不一定意味着过热。但是如果看到两个连续的升温警告事件,则生成机架过热报警(alert)。此时,需要采取措施冷却机架。
首先,定义来源的监控事件流,每一个 message 都包含来源 rack ID(机架 ID)。温度事件包含当前温度,功率事件包含当前电压。我们把事件模型定义为 POJOs.
public abstract class MonitoringEvent { private int rackID; ... } public class TemperatureEvent extends MonitoringEvent { private double temperature; ... } public class PowerEvent extends MonitoringEvent { private double voltage; ... }
我们可以使用 Flink 的 connector(比如:Kafka, RabbitMQ 等),生成 DataStream
每个模式都包含了一个可以定义过滤 (filter) 条件的事件序列。模式 (pattern) 的第一个事件通常都命名为"First Event"。
Pattern.begin("First Event");
这句话会匹配每一个输入的监控事件(monitoring event),而我们只需要温度大于一定阈值的温度事件(TemperatureEvents),所以我们需要添加 subtype 和 where 语句限制。
Pattern.begin("First Event") .subtype(TemperatureEvent.class) .where(evt -> evt.getTemperature() >= TEMPERATURE_THRESHOLD);
之前说:对于同一个机架,当看到两个连续的高温事件(超过阈值)就产生一个温度报警(TemperatureWarning),Pattern API 提供了 next 调用方法,来添加事件到模式定义中。next 添加的事件发生时间必须紧跟着第一个匹配事件之后,才能触发整个模式的匹配。
PatternwarningPattern = Pattern. begin("First Event") .subtype(TemperatureEvent.class) .where(evt -> evt.getTemperature() >= TEMPERATURE_THRESHOLD) .next("Second Event") .subtype(TemperatureEvent.class) .where(evt -> evt.getTemperature() >= TEMPERATURE_THRESHOLD) .within(Time.seconds(10));
最后模式的定义包含有一个 within 的 API 调用,用来定义两个连续 TemperatureEvents 必须在 10s 内发生才能匹配。时间基于 time characteristic 设置,可以是:处理时间、输入时间或者事件时间。(译者注 Event Time / Processing Time / Ingestion Time 解释)
定义好事件模型之后,可以将其应用到输入数据流中。
PatternStreamtempPatternStream = CEP.pattern( inputEventStream.keyBy("rackID"), warningPattern);
由于告警是针对单个机架的告警,必须使用 keyBy 通过 rackID 字段对输入事件流分流。即匹配出的事件都是同一个机架的。
PatternStream
public class TemperatureWarning { private int rackID; private double averageTemperature; ... } DataStreamwarnings = tempPatternStream.select( (Map pattern) -> { TemperatureEvent first = (TemperatureEvent) pattern.get("First Event"); TemperatureEvent second = (TemperatureEvent) pattern.get("Second Event"); return new TemperatureWarning( first.getRackID(), (first.getTemperature() + second.getTemperature()) / 2); } );
现在我们从原始监控事件流(monitoring event stream)生成了一个复杂事件流 DataStream
public class TemperatureAlert { private int rackID; ... }
首先定义报警事件
PatternalertPattern = Pattern. begin("First Event") .next("Second Event") .within(Time.seconds(20));
定义描述了在 20s 内有两个 TemperatureWarnings 事件,并且第一个事件名称为 "First Event",紧接着的第二个为 “Second Event”。这来了个事件都没有 where 语句,因为需要访问两个事件才能判断温度时候增长。因此,下面我们需要在 select 语句中使用 filter 条件来提取。这里我们只是生成了 PatternStream。
PatternStreamalertPatternStream = CEP.pattern( warnings.keyBy("rackID"), alertPattern);
同样,我们需要 keyBy 对输入的告警数据流针对同一个机架进行分流。然后使用 flatSelect 方法访问匹配的事件序列,当判断温度上升时生成 TemperatureAlert 告警。
DataStreamalerts = alertPatternStream.flatSelect( (Map pattern, Collector out) -> { TemperatureWarning first = pattern.get("First Event"); TemperatureWarning second = pattern.get("Second Event"); if (first.getAverageTemperature() < second.getAverageTemperature()) { out.collect(new TemperatureAlert(first.getRackID())); } });
DataStream
本文描述了使用 Flink CEP library 可以很容易处理事件流。我们通过数据中心的监控和报警案例,完成了服务器机架过热报警的小程序。
未来 Flink 社区会持续扩展 CEP library 的功能和表述能力。接下来的 road map 是支持类正则表达式的模式实现,包括 *, 上下限制和否定。此外,还计划允许 where 语句访问之前匹配的事件字段。这个特性可以让我们提前删除不需要的事件序列。
本内容为译者添加
官网:Apache Flink
概念:Event Time / Processing Time / Ingestion Time
案例:Apache Flink example CEP program to monitor data center temperatures
API 介绍:FlinkCEP - Complex event processing for Flink
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/35812.html
摘要:它的设计使得即使是大型团队也能以高度隔离的方式应对功能变更。获取数据数据变更性能,都是让人头痛的问题。通过维护组件与数据间的依赖在依赖的数据就绪前组件不会被渲染为开发者提供更加可预测的开发环境。这杜绝了隐式的数据依赖导致的潜在。 关于Relay与GraphQL的介绍 原文:Introducing Relay and GraphQL 视频地址(强烈建议观看):https://www.y...
摘要:在前面时序列数据库武斗大会之名录我们已经介绍了一些常见的,这里我们再对剩余的一些做些简单介绍。是一个多租户的时间序列和资源数据库。是基于的时序列数据库。 【编者按】刘斌,OneAPM后端研发工程师,拥有10多年编程经验,参与过大型金融、通信以及Android手机操作系的开发,熟悉Linux及后台开发技术。曾参与翻译过《第一本Docker书》、《GitHub入门与实践》、《Web应用安全...
阅读 1707·2023-04-26 02:30
阅读 1033·2021-11-10 11:36
阅读 1380·2021-10-08 10:14
阅读 3496·2021-09-28 09:35
阅读 1552·2021-08-23 09:47
阅读 2544·2019-08-30 15:56
阅读 1469·2019-08-30 15:44
阅读 1749·2019-08-30 13:59