spark Dstreams-程序部署
点击上方“IT那活儿”,关注后了解更多内容,不管IT什么活儿,干就完了!!!
为了运行一个spark streaming应用程序,需要满足以下条件 :
1.1 使用集群管理器管理集群:
这是基本的要求。
1.2 打成jar包:
你必须将你的应用程序编译成jar包,使用spark-submit启动程序,然而如果你的程序使用的是高级数据源(例如kafka),你必须将kafka依赖打进jar包。
1.3 为执行节点配置足够的内存:
因为接收到的数据必须保存在内存,所以执行节点必须有足够的内存来存储数据,如果要执行10分钟的窗口操作,系统必须在内存中保留至少10分钟的数据,因此应用程序的内存需求取决于其中使用的操作。
1.4 配置检查点:
如果流应用程序需要,则必须将Hadoop API兼容容错存储(例如HDFS、S3等)中的目录配置为检查点目录,并且流应用程序的写入方式应确保检查点信息可用于故障恢复。
1.5 配置应用驱动程序的的自动重启:
为了从驱动程序故障中自动修复,用于运行流应用程序的部署基础结构必须监视驱动程序进程,并在驱动程序失败时重新启动驱动程序。不同的集群管理器有不同的工具来实现这一点。
spark standalone:可以提交spark程序以以spark standalone方式运行,也就是说应用程序在一个节点运行,而且可以指示standalone集群管理器监督驱动程序,如果驱动程序由于非零退出代码或运行驱动程序的节点故障而失败,则重新启动它。
YARN:YARN支持自动重启应用程序的类似机制。
Mesos:Marathon已经被用来实现这一目标。
1.6 配置预写日志(write-ahead logs):
自spark1.2,我们已经引入了预写日志以实现强大的容错保证,如果启用它,所有从receiver接收到的数据都会写入配置检查点目录中的预写日志。
这可以防止驱动程序恢复时的数据丢失,从而确保零数据丢失,可以通过设置spark.streaming.receiver.writeAheadLog.enable=true来启用它,然而这可能以单个接收器的接收吞吐为代价,但是这可以通过并行运行更多接收器来弥补。
此外,建议在启用预写日志时禁用spark内接收数据的复制,因为该日志已存储在已复制的存储系统中,这可以通过设置存储级别为StorageLevel.MEMORY_AND_DISK_SER来实现,使用S3(或任何不支持刷新的文件系统)进行预写日志时,请记住启用:
spark.streaming.driver.writeAheadLog.closeFileAfterWrite
和
spark.streaming.receiver.writeAheadLog.closeFileAfterWrite。
1.7 设置最大接收速率:
如果集群资源不够大,spark streaming应用程序无法以接收数据的速度处理数据,则可以通过设置记录的最大速率限制来限制接收器的速率,请参阅:
在Spark 1.5中,我们引入了一种称为背压的功能,它消除了设置此速率限制的需要,因为Spark Streaming会自动计算速率限制,并在处理条件发生变化时动态调整速率限制。可通过将配置参数spark.streaming.backpressure.enabled设置为true来启用此背压。
如果你需要升级spark streaming应用程序代码,有两种可能的机制。2.1 升级后的Spark Streaming应用程序将启动,并与现有应用程序并行运行。一旦新的(接收到与旧的相同的数据)被预热并准备好进入黄金时段,旧的就可以被取下。2.2 现有应用程序正常关闭(有关正常关闭选项,请参阅StreamingContext.stop(…)或JavaStreamingContext.stop(…),以确保在关闭之前完全处理已接收的数据。然后可以启动升级后的应用程序,该应用程序将从早期应用程序停止的同一点开始处理。请注意,这只能通过支持源端缓冲(如Kafka)的输入源来实现,因为在上一个应用程序关闭且升级的应用程序尚未启动时,需要缓冲数据。无法从升级前代码的早期检查点信息重新启动。检查点信息实质上包含序列化的Scala/Java/Python对象,尝试使用新的、修改过的类反序列化对象可能会导致错误。在这种情况下,使用不同的检查点目录启动升级的应用程序,或者删除以前的检查点目录。
除了Spark的监控功能外,还有Spark streaming特有的其他功能。使用StreamingContext时,Spark web UI会显示一个附加的流选项卡,其中显示有关正在运行的接收器(接收器是否处于活动状态、接收到的记录数、接收器错误等)和已完成批次(批处理时间、排队延迟等)的统计信息。这可用于监视流应用程序的进度。- processing time:处理每个批次花费的时间
- Scheduling Delay:批在队列里等待前一批处理完成的时间
如果批次处理时间始终大于批次间隔和/或排队延迟持续增加,则表明系统无法以生成批次的速度处理批次,并且正在落后。在这种情况下,考虑减少批处理时间。还可以使用StreamingListener接口监控Spark streaming程序的进度,该接口允许您获取接收器状态和处理时间。请注意,这是一个开发人员API,将来可能会对其进行改进(即报告更多信息)。
要从集群上的Spark流媒体应用程序中获得最佳性能,需要进行一些调整。本节介绍了一些可以调整以提高应用程序性能的参数和配置。在高层次上,你需要考虑两件事:- 设置正确的批大小,以便可以在接收数据时尽快处理数据批(即,数据处理与数据摄取保持同步)。
减少批处理时间可以在Spark中进行许多优化,以最大限度地缩短每个批次的处理时间。通过网络接收数据(如kafka,socket等)需要将数据反序列化并存储到spark中,如果数据接收成为系统中的瓶颈,那么考虑数据接收的并行化。请注意,每个输入数据流都会创建一个接收单个数据流的接收器(在工作机器上运行)。因此,通过创建多个输入数据流并将其配置为从源接收数据流的不同分区,可以实现接收多个数据流。例如,接收两个主题数据的单个kafka输入数据流可以分成两个kafka输入流,每个kafka输入流只接收一个主题。这将运行两个接收器,允许并行接收数据,从而提高了总体吞吐量。这些多个数据流可以联合在一起以创建单个数据流。然后,应用于单个输入数据流的转换可以应用于统一流。可以这样做:应考虑的另一个参数是接收器的块间隔,它由配置参数spark.streaming.blockInterval确定。对于大多数接收器,接收到的数据在存储到Spark的内存中之前会合并成数据块。每个批处理中的块数决定了在类似映射的转换中用于处理接收数据的任务数。每批每个接收器的任务数大约为(批间隔/块间隔)。例如,200 ms的块间隔将每2秒批创建10个任务。如果任务数量太少(即,少于每台机器的核心数量),那么它将是低效的,因为所有可用的核心都不会用于处理数据。要增加给定批处理间隔的任务数,请减少块间隔。但是,建议的最小块间隔值约为50 ms,低于该值,任务启动开销可能会出现问题。使用多个输入流/接收器接收数据的另一种方法是显式地重新划分输入数据流(使用inputStream.repartition())。这将在进一步处理之前在群集中指定数量的计算机上分发接收到的数据批。如果在计算的任何阶段中使用的并行任务的数量不够多,那么集群资源可能会利用不足。例如,对于reduceByKey和ReduceByAndWindow等分布式reduce操作,并行任务的默认数量由spark.default.parallelism配置属性控制。您可以将并行级别作为参数传递(请参阅PairDStreamFunctions文档),或者设置spark.default.parallelism配置属性以更改默认值。通过调整序列化格式,可以减少数据序列化的开销。在流式传输的情况下,有两种类型的数据可以被序列化。- 默认情况下,通过接收器接收的输入数据存储在具有StorageLevel.memory_DISK_SER_2的执行器内存中。也就是说,数据序列化为字节以减少GC开销,并复制以容忍执行器故障。此外,数据首先保存在内存中,只有当内存不足以保存流计算所需的所有输入数据时,才会溢出到磁盘。这种序列化显然有开销——接收器必须对接收到的数据进行反序列化,并使用Spark的序列化格式对其重新序列化。
- 流式计算生成的RDD可以保存在内存中。例如,窗口操作将数据保存在内存中,因为它们将被多次处理。但是,与Spark Core默认的StorageLevel.MEMORY_ONLY不同,流式计算生成的持久化RDD默认使用StorageLevel.MEMORY_ONLY_DISK(即序列化)持久化,以最小化GC开销。
在这两种情况下,使用Kryo序列化可以减少CPU和内存开销。在流应用程序需要保留的数据量不大的特定情况下,可以将数据(两种类型)作为反序列化对象持久化,而不会产生过多的GC开销。例如,如果使用几秒钟的批处理间隔且没有窗口操作,则可以通过显式设置相应的存储级别来尝试禁用持久化数据中的序列化。这将减少由于序列化而产生的CPU开销,从而有可能在没有太多GC开销的情况下提高性能。如果每秒启动的任务数很高(例如,每秒50个或更多),则向执行者发送任务的开销可能很大,并且将很难实现亚秒延迟。执行模式:在standalone模式或粗粒度Mesos模式下运行Spark会导致比细粒度Mesos模式更好的任务启动时间。这些更改可能会将批处理时间减少100毫秒,从而允许使用次秒级的批处理大小。为了使在集群上运行的Spark Streaming应用程序保持稳定,系统应该能够以接收数据的速度处理数据。换句话说,批处理速度应该与接收数据速度一样快,可以通过web UI监控批数据处理时间,其应该小于批处理间隔。根据流计算的性质,所使用的批处理间隔可能会对应用程序在一组固定的群集资源上可以维持的数据速率产生重大影响。例如,让我们考虑较早的WorddCurnNead示例。对于特定的数据速率,系统可能能够每2秒(即2秒的批处理间隔)跟踪报告字数,但不是每500毫秒一次。因此,需要设置批次间隔,以便能够维持生产中的预期数据速率。为应用程序确定正确的批处理大小的一个好方法是使用保守的批处理间隔(例如,5-10秒)和低数据速率对其进行测试。为了验证系统是否能够跟上数据速率,您可以检查每个处理批次所经历的端到端延迟值(在Spark driver log4j日志中查找“总延迟”,或使用StreamingListener接口)。如果延迟保持与批量大小相当,则系统是稳定的。否则,如果延迟持续增加,则意味着系统无法跟上,因此不稳定。一旦你有了一个稳定配置的想法,你可以尝试增加数据速率和/或减少批量大小。请注意,只要延迟降低回较低值(即,小于批量大小),由于临时数据速率增加而导致的延迟瞬时增加就可以了。我们将专门讨论Spark流应用程序上下文中的一些调优参数。Spark流应用程序所需的集群内存量在很大程度上取决于所使用的转换类型。例如,如果要对最后10分钟的数据使用窗口操作,那么集群应该有足够的内存保留10分钟的数据。或者,如果您想使用带有大量键的updateStateByKey,那么所需的内存将很高。相反,如果要执行简单的映射过滤器存储操作,则所需内存将较低。通常,由于通过接收器接收的数据存储在StorageLevel.MEMORY_AND_DISK_SER_2中,因此超过内存的数据将溢出到磁盘。这可能会降低streaming应用程序的性能。因此建议根据streaming应用程序的要求提供足够的内存。最好尝试在小范围内查看内存使用情况,并进行相应的估计。内存调优的另一个方面是垃圾收集。对于需要低延迟的流应用程序,不希望JVM垃圾收集导致暂停。- imput data数据和RDD在默认情况下作为序列化字节持久化。与反序列化持久化相比,这减少了内存使用和GC开销。启用Kryo序列化进一步减少了序列化大小和内存使用。
- 默认情况下,由数据流转换生成的所有输入数据和持久化RDD将自动清除。Spark Streaming根据所使用的转换决定何时清除数据。例如,如果使用10分钟的窗口操作,则Spark Streaming将保留最后10分钟的数据,并主动丢弃较旧的数据。通过设置streamingContext.remember,数据可以保留更长的时间。
- 强烈建议使用并发标记和扫描GC,以保持GC相关暂停始终较低。尽管已知并发GC会降低系统的总体处理吞吐量,但仍建议使用它来实现更一致的批处理时间。确保在驱动程序(使用spark submit中的--driver java选项)和执行器(使用spark配置spark.executor.extraJavaOptions)上设置CMS GC。
接下来讨论在spark streaming应用程序中发生故障时行为。为了理解spark streaming的容错语义,让我们先看下RDD的基本容错语义。- RDD是一个不可变的、确定的和可重新计算的分布式数据集,每个RDD都会记住创建数据集的依赖。
- 如果RDD的任何分区由于工作节点故障而丢失,则可以使用操作依赖关系从原始容错数据集重新计算该分区。
- 假设所有的RDD转换都是确定性的,那么不管Spark集群中是否出现故障,最终转换的RDD中的数据都将始终相同。
Spark对HDFS或S3等容错文件系统中的数据进行操作。因此,从容错数据生成的所有RDD也是容错的。但是,Spark streaming的情况并非如此,因为在大多数情况下,数据是通过网络接收的(使用fileStream时除外)。为了为所有生成的RDD实现相同的容错属性,在群集中工作节点的多个Spark执行器之间复制接收到的数据(默认复制系数为2)。- 接收和复制的数据—此数据在单个工作节点发生故障时仍然有效,因为它的副本存在于另一个节点上。
- 已接收但已缓冲用于复制的数据—由于未复制此数据,因此恢复此数据的唯一方法是再次从源获取它。
- 运行executors的任何工作节点都可能失败,并且这些节点上的所有内存中数据都将丢失。如果任何接收器在发生故障的节点上运行,则其缓冲数据将丢失。
- 如果运行Spark Streaming应用程序的驱动程序节点出现故障,则SparkContext显然会丢失,所有executor及其内存中的数据也会丢失。
在所有可能的操作条件下(尽管出现故障等),系统可以提供三种类型的保证:- 至少一次:每条记录将被处理一次或多次。这比最多一次强,因为它确保不会丢失任何数据。但也可能有重复。
- 只有一次:每条记录将精确处理一次-不会丢失任何数据,也不会多次处理任何数据。这显然是三者中最有力的保证。
在任何流处理系统中,广义地说,处理数据有三个步骤。- 输出数据:最终转换的数据输出到外部系统,如文件系统、数据库、仪表板等。
如果流应用程序必须实现端到端的精确一次保证,那么每个步骤都必须提供精确一次的保证。也就是说,每个记录必须准确接收一次,准确转换一次,并准确推送到下游系统一次。让我们在Spark流的上下文中理解这些步骤的语义。- 接收数据:不同的输入源提供不同的保证,文章后面将详细讨论。
- 转换数据:由于RDD提供的保证,所有接收到的数据都将被精确地处理一次。即使出现故障,只要接收到的输入数据是可访问的,最终转换的RDD将始终具有相同的内容。
- 推出数据:默认情况下,输出操作至少确保一次语义,因为它取决于输出操作的类型(幂等式或非幂等式)和下游系统的语义(是否支持事务)。但用户可以实现自己的事务机制,以实现精确的一次性语义。文章后面将详细讨论这一点。
不同的输入源提供不同的保证,从至少一次到恰好一次不等。如果所有输入数据都已存在于HDFS等容错文件系统中,Spark Streaming始终可以从任何故障中恢复并处理所有数据。这给出了精确的一次语义,这意味着所有数据将被精确地处理一次,而不管什么失败。对于基于接收器的输入源,容错语义取决于故障场景和接收器类型。如前所述,有两种类型的接收器:- 可靠接收器-这些接收器仅在确保已复制接收到的数据后才确认可靠来源。如果这样的接收器出现故障,源将不会收到缓冲(未复制)数据的确认。因此,如果接收器重新启动,源将重新发送数据,并且不会因故障而丢失任何数据。
- 不可靠的接收器-此类接收器不发送确认,因此在由于工作人员或驱动程序故障而失败时可能会丢失数据。
如果工作节点发生故障,则可靠的接收器不会丢失数据。对于不可靠的接收器,接收到但未复制的数据可能会丢失。如果驱动程序节点发生故障,那么除了这些丢失之外,在内存中接收和复制的所有过去的数据都将丢失。这将影响有状态转换的结果。为了避免丢失过去接收到的数据,Spark 1.2引入了预写日志,将接收到的数据保存到容错存储器中。通过启用预写日志和可靠的接收器,可以实现零数据丢失。在语义方面,它提供了至少一次的保证。在Spark 1.3中,我们引入了一个新的Kafka Direct API,它可以确保Spark Streaming只接收一次所有Kafka数据。此外,如果实现一次输出操作,则可以实现端到端的一次输出保证。输出操作(比如foreachRDD)至少有一次语义,也就是说,如果工作程序发生故障,转换后的数据可能会多次写入外部实体。虽然这对于使用saveAs***Files操作保存到文件系统是可以接受的(因为文件将被相同的数据覆盖),但是可能需要额外的努力来实现一次语义。- 幂等更新:多次尝试总是写入相同的数据。例如,saveAs***文件总是将相同的数据写入生成的文件。
- 事务性更新:所有更新都是以事务方式进行的,因此更新只以原子方式进行一次。实现这一点的一种方法是:
使用批处理时间(在foreachRDD中可用)和RDD的分区索引来创建标识符。此标识符唯一标识流应用程序中的数据。使用标识符以事务方式(即原子方式)更新外部系统。也就是说,如果标识符尚未提交,则以原子方式提交分区数据和标识符。否则,如果已提交,则跳过更新。
为了实现读取并行性,需要创建多个接收器,即多个数据流。接收器在执行器中运行。它占据一个核心。确保在预订接收器插槽后有足够的内核进行处理,即spark.cores.max应考虑接收器插槽。接收器以循环方式分配给执行者。每个块间隔生成一个新的数据块。在N个块间隔内创建N个数据块。这些块由当前executor的块管理器分配给其他executor的块管理器。之后,驱动程序上运行的网络输入跟踪器将被告知块位置,以便进一步处理。6.3 在驱动程序(driver)上为批处理间隔期间创建的块创建RDD。批处理间隔期间生成的块是分区RDD。每个分区都是spark中的一项任务。块间隔(blockInterval)==批处理间隔(batchinterval)意味着创建了单个分区,并且可能在本地对其进行处理。6.4 块上的map任务在具有块的executor(一个接收块,另一个复制块)中处理,而不考虑块间隔,除非非本地调度开始。拥有更大的区块间隔意味着更大的区块。较高的spark.locality.wait值会增加在本地节点上处理块的机会。需要在这两个参数之间找到平衡,以确保在本地处理较大的块。6.5 您可以通过调用inputDstream.repartition(n)来定义分区的数量,而不是依赖于batchInterval和blockInterval。这将随机重新排列RDD中的数据,以创建n个分区。是的,为了更大的并行性。虽然是以洗牌为代价的。RDD的处理由驱动程序的jobscheduler作为作业进行调度。在给定的时间点,只有一个作业处于活动状态。因此,如果一个作业正在执行,其他作业将排队。6.6 如果有两个数据流,将形成两个RDD,并将创建两个作业,这两个作业将一个接一个地安排。为了避免这种情况,可以合并两个数据流。这将确保为数据流的两个RDD形成一个unionRDD。然后将此unionRDD视为单个作业。但是,RDD的分区不受影响。6.7 如果批处理时间超过batchinterval,那么很明显,接收器的内存将开始填满,并最终引发异常(很可能是BlockNotFoundException)。目前,无法暂停接收器。使用SparkConf配置spark.streaming.receiver.maxRate,可以限制接收器的速率。
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/129642.html