资讯专栏INFORMATION COLUMN

结构化数据流-foreachBath操作

IT那活儿 / 431人阅读
结构化数据流-foreachBath操作

点击上方“IT那活儿”公众号,关注后了解更多内容,不管IT什么活儿,干就完了!!!

01

小案例

应用程序中只有调用了start()才能真正开始执行查询,然后返回StreamingQuery对象,您可以使用该对象来管理查询。先举一些例子:

02

foreach和foreachBatch操作

foreach和foreachBatch操作允许您对流式查询的输出应用任意操作和写入逻辑。它们的用例稍有不同——foreach允许在每一行上自定义写入逻辑,而foreachBatch允许在每个微批的输出上执行任意操作和自定义逻辑。让我们更详细地了解它们的用法。

2.1 foreachBatch

foreachBach允许你对流式查询的每个微批的输出数据指定执行的函数,自spark2.4,scala,java,python都支持这一点,他有两个参数,微批数据集和微批的唯一ID。

使用foreachBatch可以做以下工作:

  • 重用批处理数据源-对于许多存储系统,可能还没有可用的流式接收器,但可能已经存在批处理查询的接收器,所以使用foreachBatch可以再每个微批次输出中使用批接收器。

  • 写入多个存储器:可能你需要将流式查询的输出写入到多个位置,则只需多次写入即可,但是这样可能导致重新计算(包括可能重新读取数据)。为了避免重新计算,应该缓存数据集,将其写入多个存储器后再取消缓存,例如:

批处理中的许多操作在流式查询中不支持,因为spark不支持在这些情况下生成增量计划,使用foreachBatch可以将流式查询转换成了一个个微批来处理,但是你必须自己考虑执行这些操作端到端语义。

注意:

  • 默认情况下foreachBatch只提供至少一次写入保证,但是你可以使用batchID作为消除重复的方法,并获得一次写入保证。

  • foreachBatch不适用于连续处理模式,因为他基本上依赖于微批处理,如果以连续模式写入数据,可以使用foreach。

2.2 foreach

如果不存在相应的批处理数据接收器,或者不存在连续处理模式,则可以使用foreach来自定义编写器逻辑,你可以将数据写入逻辑分为三个方法,open,process和close。

自spark2.4 scala,java,python都支持这一点。

流查询启动后,spark按以下方式调用函数或对象的方法:

  • 此对象的一个副本负责查询中单个任务生成的所有数据,也就是说,一个实例负责处理以分布式方式生成的数据的一个分区。

  • 此对象必须是可序列化的,因为每个任务将获取此对象的副本,需要进行反序列化,强烈建议任何初始化一定在调用open()方法之后完成,意味着已经准备好了数据。

  • 这些方法的生命周期如下:对于每个分区(包含partition_id),每个微批(包含epoch_id)。

open(partitionId, epochId)方法被调用,如果open()方法返回true,则对于分区和微批中的每一行将调用process(),然后调用close(error),在处理时抛出错误(如果有)。

  • close()方法被调用(如果有)如果open()方法被调用并返回成功(不管返回true还是false),除非JVM或python进程崩溃。

  • spark不保证输出相同,因此无法使用(partitionId, epochId)实现重复数据消除,如果需要对输出执行重复数据消除,请尝试使用foreachBatch。


03

DataStreamReader和DataStreamWriter操作

自spark3.1,你也可以使用DataStreamReader.table()读取表数据作为streaming DataFrame,使用DataStreamWriter.toTable()写入为表。


END



本文作者:潘宗昊

本文来源:IT那活儿(上海新炬王翦团队)

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/129473.html

相关文章

  • 智能合约的一种设计结构

    摘要:所以,在设计时,要了解区块链相关知识,这些是出于安全考虑。通过一个智能合约能够管理所有模块,这个是不变的,相当于一个不变的点,用来链接各个模块,保证稳定,相当于在区块链上一直会有一个稳定的地址长期进行服务。 智能合约的设计和传统的应用设计有点不同。传统应用一般为了快速迭代是在产品之后考虑安全,但是 DApp 则需要在产品出来之前就考虑安全问题,它将会关系到账户资产、用户数据等问题,而且...

    evin2016 评论0 收藏0
  • 算法学习之数据结构线性表、堆、栈

    摘要:栈底是固定的,而栈顶浮动的如果栈中元素个数为零则被称为空栈。入栈将数据保存到栈顶。链栈链栈是指栈的链式存储结构,是没有附加头节点的运算受限的单链表,栈顶指针是链表的头指针。 一、喜欢单挑线性表 1.线性表的特性 线性表是一个线性结构,它是一个含有n≥0个节点的有限序列。在节点中,有且仅有一个开始节点没有前驱并有一个后继节点,有且仅有一个终端节点没有后继并有一个前驱节点。其他的节点都有且...

    huaixiaoz 评论0 收藏0
  • 沪江前端由H5页面引起的一场前端数据结构讨论

    摘要:发送请求,处理数据。在上面这个场景中,这类数据的结构可能是最常碰到的。整个过程可以简化成数据的变化引起视图的变化,和现在很多前端框架数据驱动思想有几分相似。至此一个对于页面的抽象出来的数据结构雏形基本完成了。 作者:周周(沪江资深Web前端开发工程师)本文为原创文章,转载请注明作者及出处 前言 近期在小D十周年活动之际,又看到了一个自家H5专题梦工厂生成的页面。 showImg(htt...

    xialong 评论0 收藏0

发表评论

0条评论

IT那活儿

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<