资讯专栏INFORMATION COLUMN

结构化数据流-触发,监控

IT那活儿 / 1750人阅读
结构化数据流-触发,监控

点击上方“IT那活儿”公众号,关注后了解更多内容,不管IT什么活儿,干就完了!!!

01

流式查询的触发器的设置定义了流数据处理的时间,无论查询时作为微批处理还是作为连续查询执行。

02

微批触发类型

例子:

03

连续处理模式(Continuous processing)

Continuous processing连续处理模式是Spark 2.3中引入的一种新的实验性流式执行模式,可实现小于1毫秒的端到端延迟,并至少保证一次容错。与默认的微批处理引擎相比,该引擎可以实现一次保证,但最多可实现约100ms的延迟。

对于某些类型的查询(如下所述),您可以选择在不修改应用程序逻辑(即不更改数据帧/数据集操作)的情况下选择何种模式执行。

如果要在连续处理模式下执行查询,只需要定义一个连续触发器,并将检查点间隔作为参数。

1秒的检查点间隔意味着连续处理引擎将每秒记录查询的进度。生成的检查点采用与微批处理引擎兼容的格式,因此可以使用任何触发器重新启动任何查询。

例如,以微批处理模式启动的受支持查询可以在连续模式下重新启动,反之亦然。请注意,无论何时切换到连续模式,您都将获得至少一次容错保证。

自spark2.4,连续处理模式下仅支持以下查询类型:

  • 操作(operation):在连续处理模式下仅支持类似map类型操作,像select, map, flatMap, mapPartitions,where, filter等,除了聚合函数支持所有SQL函数。

  • 数据源(source):kafka source和rate source(用于测试)。

  • 接收器(sink):Kafka sink,.Memory sink,Console sink。

虽然控制台接收器适合测试,但最好使用Kafka作为源和接收器来观察端到端的低延迟处理,因为这允许引擎在输入主题中的输入数据可用的毫秒内处理数据并在输出主题中提供结果。

警告:

1)Continuous processing engine启动多个长时间运行的任务,这些任务不断地从源读取数据、处理数据并不断地向接收器写入数据。查询所需的任务数量取决于查询可以并行地从源中读取多少分区。

因此,在启动连续处理查询之前,您必须确保集群中有足够的内核来并行执行所有任务。

例如,如果您正在阅读一个有10个分区的Kafka主题,那么集群必须至少有10个核心才能使查询取得进展。

2)停止连续处理流可能会产生虚假的任务终止警告。可以放心地忽略这些。

3)当前没有自动重试失败的任务。任何故障都将导致查询停止,需要从检查点手动重新启动查询。

4)查询运行后,无法修改多个配置。要更改它们,请放弃检查点并启动新查询。这些配置包括:

  • spark.sql.shuffle.partitions

    这是由于状态的物理分区:状态通过对键应用哈希函数进行分区,因此状态的分区数应该保持不变。如果您希望为有状态操作运行更少的任务,那么coalesce将有助于避免不必要的重新分区。合并后,除非发生另一次洗牌,否则(减少的)任务数将保持不变。

  • spark.sql.streaming.stateStore.providerClass,要正确读取查询的上一个状态,状态存储提供程序的类应保持不变

  • spark.sql.streaming.multipleWatermarkPolicy

    当查询包含多个水印时,修改此项会导致水印值不一致,因此策略应保持不变。


04

流查询管理

调用start()方法后将生成StreamingQuery对象,可用于监视可管理流查询。

你可以再单个SparkSession中同时运行任意数量的查询,这些查询将同时运行并共享集群资源,您可以使用sparkSession.streams()获取StreamingQueryManager,其可以管理当前活动的查询。


05

流查询监控

有多种方法可以监控流查询,可以使用spark的Dropwizard Metrics支持,或者通过编程方式进入他们。

您可以使用streamingQuery.lastProgress()和streamingQuery.status()直接获取活动查询的当前状态和指标。

  • lastProgress()返回Scala和Java中的StreamingQueryProgress对象,以及Python中具有相同字段的字典。它包含关于流的最后一个触发器中所取得的进展的所有信息—处理了哪些数据、处理速率、延迟等。

  • streamingQuery.recentProgress,它返回最后几个进展的数组。

您还可以通过附加StreamingQueryListener(Scala/Java文档)异步监视与SparkSession关联的所有查询。使用sparkSession.streams.attachListener()附加自定义StreamingQueryListener对象后,当查询启动和停止以及在活动查询中取得进展时,将收到回调。

举个例子:

也可以使用Dropwizard报告查询指标,需要进行如下设置:

启用此配置后在SparkSession中启动的所有查询将通过Dropwizard向已配置的任何接收器(例如Ganglia、Graphite、JMX等)报告度量。

06

使用检查点从故障中恢复

如果出现故障或故意关闭,可以恢复以前查询的进度和状态,并在停止时继续。这是使用检查点和预写日志完成的。

您可以使用检查点位置配置查询,该查询将所有进度信息(即每个触发器中处理的偏移量范围)和正在运行的聚合(例如,快速示例中的字数)保存到检查点位置。此检查点位置必须是HDFS兼容文件系统中的路径。


END



本文作者:潘宗昊

本文来源:IT那活儿(上海新炬王翦团队)

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/129471.html

相关文章

  • 这么多监控组件,总有一款适合你

    摘要:典型实现不同的监控模块,侧重于不同领域,有着不同的职责。指标收集方面,支持多样化的组件将被优先下使用。以上谈了这么多,仅仅是聊了一下收集方面而已。 更多文章,请移步微信公众号《小姐姐味道》 mp原文 https://mp.weixin.qq.com/s?__...监控是分布式系统的必备组件,能够起到提前预警、问题排查、评估决策等功效,乃行走江湖、居家必备之良品。 监控系统概要 功能划分...

    simon_chen 评论0 收藏0
  • 这么多监控组件,总有一款适合你

    摘要:典型实现不同的监控模块,侧重于不同领域,有着不同的职责。指标收集方面,支持多样化的组件将被优先下使用。以上谈了这么多,仅仅是聊了一下收集方面而已。 更多文章,请移步微信公众号《小姐姐味道》 mp原文 https://mp.weixin.qq.com/s?__...监控是分布式系统的必备组件,能够起到提前预警、问题排查、评估决策等功效,乃行走江湖、居家必备之良品。 监控系统概要 功能划分...

    wpw 评论0 收藏0
  • 前端监控数据收集(请求拦截)

    摘要:的五种就绪状态请求未初始化还没有调用。请求已发送,正在处理中通常现在可以从响应中获取内容头。并且还提供了每个阶段的事件如果请求中止,会触发事件。网络错误如太多重定向会阻止请求完成,会触发事件。当等待服务器的响应时,对象会发生事件。 所谓web,即使你我素未谋面,便知志趣相投;足不出户,亦知世界之大。 01 — 为什么拦截请求 现在的web应用,大都是通过请求(http)去获取资源,拿到...

    IntMain 评论0 收藏0
  • 前端监控数据收集(请求拦截)

    摘要:的五种就绪状态请求未初始化还没有调用。请求已发送,正在处理中通常现在可以从响应中获取内容头。并且还提供了每个阶段的事件如果请求中止,会触发事件。网络错误如太多重定向会阻止请求完成,会触发事件。当等待服务器的响应时,对象会发生事件。 所谓web,即使你我素未谋面,便知志趣相投;足不出户,亦知世界之大。 01 — 为什么拦截请求 现在的web应用,大都是通过请求(http)去获取资源,拿到...

    娣辩孩 评论0 收藏0

发表评论

0条评论

IT那活儿

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<