资讯专栏INFORMATION COLUMN

[译] Introducing Complex Event Processing (CEP) wit

Yu_Huang / 3581人阅读

摘要:所有不相关的数据会立即丢弃,由于查询都是在一个无限的数据流中,这样的优势显而易见。基于这些监控事件数据流,我们想要检测出可能要过热的机架,从而调整负载和降温。

原文链接

正文

随着传感网络的普及,智能设备持续收集着越来越多的数据,分析近乎实时,不断增长的数据流是一个巨大的挑战。快速应对变化趋势、交付最新的 BI 应用会成为一个公司成败的关键因素。其中关键问题就是数据流的事件模型检测。

Complex event processing (CEP) 要处理的就是在持续事件中匹配模式的问题。匹配结果通常就是:从输入事件中提取的复杂事件。传统 DBMSs 在固定数据上执行查询,而 CEP 在存储的 query 上执行(译者注:某个范围)。所有不相关的数据会立即丢弃,由于 CEP 查询都是在一个无限的数据流中,这样的优势显而易见。更重要的是,输入实时被处理,系统一旦收到某一个序列的所有数据,结果就会被输出。CEP 因此有着非常高效的实时分析能力。

由此,CEP 的处理范式吸引了很多技术人员兴趣,有着广泛的应用场景。值得注意的是,CEP 现在用在了金融应用,例如:股票市场趋势、信用卡欺诈检测。还有基于 RFID 的追踪和监控,例如:库房小偷检测。CEP 还可以被用于基于用户可疑行为的网络入侵检测。

Apache Flink 有着天生的真正的流处理能力,具有低延迟、高吞吐量的特性,和 CEP 简直绝配。因此,Flink 社区在 Flink 1.0 引入了第一个版本的 CEP library。接下来我们会使用一个数据中心监控的案例介绍其使用。

假设这样一个场景:数据中心有很多机架,每一个机架都有功率和温度监控。监控设备会不断产生功率和温度事件。基于这些监控事件数据流,我们想要检测出可能要过热的机架,从而调整负载和降温。

针对这种场景,我们采取两阶段处理方法。首先,监控温度事件,当检测到连续两个超过阈值的温度事件,即生成一个当前平均温度的警告(warning),温度报警不一定意味着过热。但是如果看到两个连续的升温警告事件,则生成机架过热报警(alert)。此时,需要采取措施冷却机架。

首先,定义来源的监控事件流,每一个 message 都包含来源 rack ID(机架 ID)。温度事件包含当前温度,功率事件包含当前电压。我们把事件模型定义为 POJOs.

    public abstract class MonitoringEvent {
        private int rackID;
        ...
    }
    
    public class TemperatureEvent extends MonitoringEvent {
        private double temperature;
        ...
    }
    
    public class PowerEvent extends MonitoringEvent {
        private double voltage;
        ...
    }

我们可以使用 Flink 的 connector(比如:Kafka, RabbitMQ 等),生成 DataStream inputEventStream 给 Flink 的 CEP 算子提供输入。首先,我们需要定义检测温度警告的事件模式 (pattern),CEP library 提供了非常直观的 Pattern API 来定义复杂的模式。

每个模式都包含了一个可以定义过滤 (filter) 条件的事件序列。模式 (pattern) 的第一个事件通常都命名为"First Event"。

    Pattern.begin("First Event");

这句话会匹配每一个输入的监控事件(monitoring event),而我们只需要温度大于一定阈值的温度事件(TemperatureEvents),所以我们需要添加 subtype 和 where 语句限制。

    Pattern.begin("First Event")
        .subtype(TemperatureEvent.class)
        .where(evt -> evt.getTemperature() >= TEMPERATURE_THRESHOLD);

之前说:对于同一个机架,当看到两个连续的高温事件(超过阈值)就产生一个温度报警(TemperatureWarning),Pattern API 提供了 next 调用方法,来添加事件到模式定义中。next 添加的事件发生时间必须紧跟着第一个匹配事件之后,才能触发整个模式的匹配。

Pattern warningPattern = Pattern.begin("First Event")
    .subtype(TemperatureEvent.class)
    .where(evt -> evt.getTemperature() >= TEMPERATURE_THRESHOLD)
    .next("Second Event")
    .subtype(TemperatureEvent.class)
    .where(evt -> evt.getTemperature() >= TEMPERATURE_THRESHOLD)
    .within(Time.seconds(10));

最后模式的定义包含有一个 within 的 API 调用,用来定义两个连续 TemperatureEvents 必须在 10s 内发生才能匹配。时间基于 time characteristic 设置,可以是:处理时间、输入时间或者事件时间。(译者注 Event Time / Processing Time / Ingestion Time 解释)

定义好事件模型之后,可以将其应用到输入数据流中。

    PatternStream tempPatternStream = CEP.pattern(
        inputEventStream.keyBy("rackID"),
        warningPattern);

由于告警是针对单个机架的告警,必须使用 keyBy 通过 rackID 字段对输入事件流分流。即匹配出的事件都是同一个机架的。

PatternStream 可以访问匹配的事件序列。通过使用 select API 可以访问其上数据,给 select API 传入 PatternSelectFunction,PatternSelectFunction 会在每一个匹配上的事件序列上执行。事件序列通过 Map 访问,MonitoringEvent 通过之前分配的事件名称来定位。这里我们通过 select function 针对每一个匹配的模式产生一个 TemperatureWarning 事件。

    public class TemperatureWarning {
        private int rackID;
        private double averageTemperature;
        ...
    }
    
    DataStream warnings = tempPatternStream.select(
        (Map pattern) -> {
            TemperatureEvent first = (TemperatureEvent) pattern.get("First Event");
            TemperatureEvent second = (TemperatureEvent) pattern.get("Second Event");
    
            return new TemperatureWarning(
                first.getRackID(), 
                (first.getTemperature() + second.getTemperature()) / 2);
        }
    );

现在我们从原始监控事件流(monitoring event stream)生成了一个复杂事件流 DataStream 警告。这个复杂事件流可以再次被用作其他复杂事件处理的输入。当同一个机架产生两个连续升温警告时,我们使用 TemperatureWarnings 来生成 TemperatureAlerts。TemperatureAlerts 定义如下:

    public class TemperatureAlert {
        private int rackID;
        ...
    }

首先定义报警事件

    Pattern alertPattern = Pattern.begin("First Event")
        .next("Second Event")
        .within(Time.seconds(20));

定义描述了在 20s 内有两个 TemperatureWarnings 事件,并且第一个事件名称为 "First Event",紧接着的第二个为 “Second Event”。这来了个事件都没有 where 语句,因为需要访问两个事件才能判断温度时候增长。因此,下面我们需要在 select 语句中使用 filter 条件来提取。这里我们只是生成了 PatternStream。

    PatternStream alertPatternStream = CEP.pattern(
        warnings.keyBy("rackID"),
        alertPattern);

同样,我们需要 keyBy 对输入的告警数据流针对同一个机架进行分流。然后使用 flatSelect 方法访问匹配的事件序列,当判断温度上升时生成 TemperatureAlert 告警。

    DataStream alerts = alertPatternStream.flatSelect(
        (Map pattern, Collector out) -> {
            TemperatureWarning first = pattern.get("First Event");
            TemperatureWarning second = pattern.get("Second Event");
    
            if (first.getAverageTemperature() < second.getAverageTemperature()) {
                out.collect(new TemperatureAlert(first.getRackID()));
            }
        });

DataStream 告警是针对同一个机架的数据流,基于这个数据我们现在可以调整负载和降温。源代码地址(译者注:注意阅读 readme)

总结:

本文描述了使用 Flink CEP library 可以很容易处理事件流。我们通过数据中心的监控和报警案例,完成了服务器机架过热报警的小程序。
未来 Flink 社区会持续扩展 CEP library 的功能和表述能力。接下来的 road map 是支持类正则表达式的模式实现,包括 *, 上下限制和否定。此外,还计划允许 where 语句访问之前匹配的事件字段。这个特性可以让我们提前删除不需要的事件序列。

阅读材料:

本内容为译者添加

官网:Apache Flink

概念:Event Time / Processing Time / Ingestion Time

案例:Apache Flink example CEP program to monitor data center temperatures

API 介绍:FlinkCEP - Complex event processing for Flink

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/35812.html

相关文章

  • Flux再进化:Introducing Relay and GraphQL

    摘要:它的设计使得即使是大型团队也能以高度隔离的方式应对功能变更。获取数据数据变更性能,都是让人头痛的问题。通过维护组件与数据间的依赖在依赖的数据就绪前组件不会被渲染为开发者提供更加可预测的开发环境。这杜绝了隐式的数据依赖导致的潜在。 关于Relay与GraphQL的介绍 原文:Introducing Relay and GraphQL 视频地址(强烈建议观看):https://www.y...

    cncoder 评论0 收藏0
  • 时序列数据库武斗大会之TSDB名录 Part 2

    摘要:在前面时序列数据库武斗大会之名录我们已经介绍了一些常见的,这里我们再对剩余的一些做些简单介绍。是一个多租户的时间序列和资源数据库。是基于的时序列数据库。 【编者按】刘斌,OneAPM后端研发工程师,拥有10多年编程经验,参与过大型金融、通信以及Android手机操作系的开发,熟悉Linux及后台开发技术。曾参与翻译过《第一本Docker书》、《GitHub入门与实践》、《Web应用安全...

    luodongseu 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<