摘要:输入和接收器输入代表从某种流式数据源流入的数据流。文件数据流可以从任何兼容包括等的文件系统,创建方式如下将监视该目录,并处理该目录下任何新建的文件目前还不支持嵌套目录。会被一个个依次推入队列,而则会依次以数据流形式处理这些的数据。
特点:
Spark Streaming能够实现对实时数据流的流式处理,并具有很好的可扩展性、高吞吐量和容错性。
Spark Streaming支持从多种数据源提取数据,如:Kafka、Flume、Twitter、ZeroMQ、Kinesis以及TCP套接字,并且可以提供一些高级API来表达复杂的处理算法,如:map、reduce、join和window等。
Spark Streaming支持将处理完的数据推送到文件系统、数据库或者实时仪表盘中展示。
可以将Spark的机器学习(machine learning) 和 图计算(graph processing)的算法应用于Spark Streaming的数据流当中。
下图展示了Spark Streaming的内部工作原理。Spark Streaming从实时数据流接入数据,再将其划分为一个个小批量供后续Spark engine处理,所以实际上,Spark Streaming是按一个个小批量来处理数据流的。
Spark Streaming为这种持续的数据流提供了的一个高级抽象,即:discretized stream(离散数据流)或者叫DStream。DStream既可以从输入数据源创建得来,如:Kafka、Flume或者Kinesis,也可以从其他DStream经一些算子操作得到。其实在内部,一个DStream就是包含了一系列RDDs。
入门实例分析SparkConf conf = new SparkConf().setAppName("stream1").setMaster("local[2]"); JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(1)); JavaReceiverInputDStreamlines = jsc.socketTextStream("localhost", 9999); JavaPairDStream pairs= lines.flatMap((str)->Arrays.asList(str.split(" ")).iterator()) .mapToPair((str)->new Tuple2 (str,1L)); JavaPairDStream res=pairs.reduceByKey((v1,v2)->v1+v2); res.print(); jsc.start(); try { jsc.awaitTermination(); } catch (InterruptedException e) { e.printStackTrace(); }
StreamingContext 是Spark Streaming的入口。并将批次间隔设为1秒。
利用这个上下文对象(StreamingContext),我们可以创建一个DStream,该DStream代表从前面的TCP数据源流入的数据流,同时TCP数据源是由主机名(如:hostnam)和端口(如:9999)来描述的。
这里的 lines 就是从数据server接收到的数据流。其中每一条记录都是一行文本。接下来,我们就需要把这些文本行按空格分割成单词。
flatMap 是一种 “一到多”(one-to-many)的映射算子,它可以将源DStream中每一条记录映射成多条记录,从而产生一个新的DStream对象。在本例中,lines中的每一行都会被flatMap映射为多个单词,从而生成新的words DStream对象。然后,我们就能对这些单词进行计数了。
words这个DStream对象经过map算子(一到一的映射)转换为一个包含(word, 1)键值对的DStream对象pairs,再对pairs使用reduce算子,得到每个批次中各个单词的出现频率。
注意,执行以上代码后,Spark Streaming只是将计算逻辑设置好,此时并未真正的开始处理数据。要启动之前的处理逻辑,我们还需要如下调用:
ssc.start() // 启动流式计算 ssc.awaitTermination() // 等待直到计算终止
首先,你需要运行netcat(Unix-like系统都会有这个小工具),将其作为data server
$ nc -lk 9999
然后,执行程序. 现在你尝试可以在运行netcat的终端里敲几个单词,你会发现这些单词以及相应的计数会出现在启动Spark Streaming例子的终端屏幕上。
注意,StreamingContext在内部会创建一个 SparkContext 对象(SparkContext是所有Spark应用的入口,在StreamingContext对象中可以这样访问:ssc.sparkContext)。
StreamingContext还有另一个构造参数,即:批次间隔,这个值的大小需要根据应用的具体需求和可用的集群资源来确定。
需要关注的重点:
一旦streamingContext启动,就不能再对其计算逻辑进行添加或修改。
一旦streamingContext被stop掉,就不能restart。
单个JVM虚机同一时间只能包含一个active的StreamingContext。
StreamingContext.stop() 也会把关联的SparkContext对象stop掉,如果不想把SparkContext对象也stop掉,可以将StreamingContext.stop的可选参数 stopSparkContext 设为false。
一个SparkContext对象可以和多个StreamingContext对象关联,只要先对前一个StreamingContext.stop(sparkContext=false),然后再创建新的StreamingContext对象即可。
离散数据流 (DStreams)离散数据流(DStream)是Spark Streaming最基本的抽象。它代表了一种连续的数据流,要么从某种数据源提取数据,要么从其他数据流映射转换而来。DStream内部是由一系列连续的RDD组成的,每个RDD都包含了特定时间间隔内的一批数据,如下图所示:
任何作用于DStream的算子,其实都会被转化为对其内部RDD的操作。底层的RDD转换仍然是由Spark引擎来计算。DStream的算子将这些细节隐藏了起来,并为开发者提供了更为方便的高级API。
输入DStream和接收器输入DStream代表从某种流式数据源流入的数据流。在之前的例子里,lines 对象就是输入DStream,它代表从netcat server收到的数据流。每个输入DStream(除文件数据流外)都和一个接收器(Receiver)相关联,而接收器则是专门从数据源拉取数据到内存中的对象。
Spark Streaming主要提供两种内建的流式数据源:
基础数据源(Basic sources): 在StreamingContext API 中可直接使用的源,如:文件系统,套接字连接或者Akka actor。
高级数据源(Advanced sources): 需要依赖额外工具类的源,如:Kafka、Flume、Kinesis、Twitter等数据源。这些数据源都需要增加额外的依赖,详见依赖链接(linking)这一节。
注意,如果你需要同时从多个数据源拉取数据,那么你就需要创建多个DStream对象。多个DStream对象其实也就同时创建了多个数据流接收器。但是请注意,Spark的worker/executor 都是长期运行的,因此它们都会各自占用一个分配给Spark Streaming应用的CPU。
因此,本地运行时,一定要将master设为”local[n]”,其中 n > 接收器的个数。
将Spark Streaming应用置于集群中运行时,同样,分配给该应用的CPU core数必须大于接收器的总数。否则,该应用就只会接收数据,而不会处理数据。
使用ssc.socketTextStream(…) 可以从一个TCP连接中接收文本数据。而除了TCP套接字外,StreamingContext API 还支持从文件或者Akka actor中拉取数据。
文件数据流(File Streams): 可以从任何兼容HDFS API(包括:HDFS、S3、NFS等)的文件系统,创建方式如下:
streamingContext.fileStream(dataDirectory);
Spark Streaming将监视该dataDirectory目录,并处理该目录下任何新建的文件(目前还不支持嵌套目录)。注意:
各个文件数据格式必须一致。
dataDirectory中的文件必须通过moving或者renaming来创建。
一旦文件move进dataDirectory之后,就不能再改动。所以如果这个文件后续还有写入,这些新写入的数据不会被读取。
对于简单的文本文件,更简单的方式是调用 streamingContext.textFileStream(dataDirectory)。
另外,文件数据流不是基于接收器的,所以不需要为其多带带分配一个CPU core。
RDD队列数据流(Queue of RDDs as a Stream): 如果需要测试Spark Streaming应用,你可以创建一个基于一批RDD的DStream对象,只需调用 streamingContext.queueStream(queueOfRDDs)。RDD会被一个个依次推入队列,而DStream则会依次以数据流形式处理这些RDD的数据。
自定义数据源
输入DStream也可以用自定义的方式创建。你需要做的只是实现一个自定义的接收器(receiver),以便从自定义的数据源接收数据,然后将数据推入Spark中。 见:http://spark.apache.org/docs/...
接收器可靠性
从可靠性角度来划分,大致有两种数据源。其中,像Kafka、Flume这样的数据源,它们支持对所传输的数据进行确认。系统收到这类可靠数据源过来的数据,然后发出确认信息,这样就能够确保任何失败情况下,都不会丢数据。因此我们可以将接收器也相应地分为两类:
可靠接收器(Reliable Receiver) – 可靠接收器会在成功接收并保存好Spark数据副本后,向可靠数据源发送确认信息。
不可靠接收器(Unreliable Receiver) – 不可靠接收器不会发送任何确认信息。
DStream支持的transformation算子和RDD类似,DStream也支持从输入DStream经过各种transformation算子映射成新的DStream。
map(func) 返回会一个新的DStream,并将源DStream中每个元素通过func映射为新的元素
flatMap(func) 和map类似,不过每个输入元素不再是映射为一个输出,而是映射为0到多个输出
filter(func) 返回一个新的DStream,并包含源DStream中被func选中(func返回true)的元素
repartition(numPartitions) 更改DStream的并行度(增加或减少分区数)
union(otherStream) 返回新的DStream,包含源DStream和otherDStream元素的并集
count() 返回一个包含单元素RDDs的DStream,其中每个元素是源DStream中各个RDD中的元素个数
reduce(func) 返回一个包含单元素RDDs的DStream,其中每个元素是通过源RDD中各个RDD的元素经func(func输入两个参数并返回一个同类型结果数据)聚合得到的结果。func必须满足结合律,以便支持并行计算。
countByValue() 如果源DStream包含的元素类型为K,那么该算子返回新的DStream包含元素为(K, Long)键值对,其中K为源DStream各个元素,而Long为该元素出现的次数。
reduceByKey(func, [numTasks]) 如果源DStream 包含的元素为 (K, V) 键值对,则该算子返回一个新的也包含(K, V)键值对的DStream,其中V是由func聚合得到的。注意:默认情况下,该算子使用Spark的默认并发任务数(本地模式为2,集群模式下由spark.default.parallelism 决定)。你可以通过可选参数numTasks来指定并发任务个数。
join(otherStream, [numTasks]) 如果源DStream包含元素为(K, V),同时otherDStream包含元素为(K, W)键值对,则该算子返回一个新的DStream,其中源DStream和otherDStream中每个K都对应一个 (K, (V, W))键值对元素。
cogroup(otherStream, [numTasks]) 如果源DStream包含元素为(K, V),同时otherDStream包含元素为(K, W)键值对,则该算子返回一个新的DStream,其中每个元素类型为包含(K, Seq[V], Seq[W])的tuple。
transform(func) 返回一个新的DStream,其包含的RDD为源RDD经过func操作后得到的结果。利用该算子可以对DStream施加任意的操作。
updateStateByKey(func) 返回一个包含新”状态”的DStream。源DStream中每个key及其对应的values会作为func的输入,而func可以用于对每个key的“状态”数据作任意的更新操作。
updateStateByKey算子updateStateByKey 算子支持维护一个任意的状态。要实现这一点,只需要两步:
定义状态 – 状态数据可以是任意类型。
定义状态更新函数 – 定义好一个函数,其输入为数据流之前的状态和新的数据流数据,且可其更新步骤1中定义的输入数据流的状态。
在每一个批次数据到达后,Spark都会调用状态更新函数,来更新所有已有key(不管key是否存在于本批次中)的状态。如果状态更新函数返回None,则对应的键值对会被删除。
举例如下。假设你需要维护一个流式应用,统计数据流中每个单词的出现次数。这里将各个单词的出现次数这个整型数定义为状态。我们接下来定义状态更新函数如下:
Function2, Optional
, Optional > updateFunction = new Function2 , Optional
, Optional >() { @Override public Optional call(List values, Optional state) { Integer newSum = ... // add the new values with the previous running count to get the new count return Optional.of(newSum); } };
注意,调用updateStateByKey前需要配置检查点目录. 配置方式见下:
检查点一般来说Streaming 应用都需要7*24小时长期运行,所以必须对一些与业务逻辑无关的故障有很好的容错(如:系统故障、JVM崩溃等)。对于这些可能性,Spark Streaming 必须在检查点保存足够的信息到一些可容错的外部存储系统中,以便能够随时从故障中恢复回来。所以,检查点需要保存以下两种数据:
元数据检查点(Metadata checkpointing) – 保存流式计算逻辑的定义信息到外部可容错存储系统(如:HDFS)。主要用途是用于在故障后回复应用程序本身(后续详谈)。元数包括:
Configuration – 创建Streaming应用程序的配置信息。
DStream operations – 定义流式处理逻辑的DStream操作信息。
Incomplete batches – 已经排队但未处理完的批次信息。
总之,元数据检查点主要是为了恢复驱动器节点上的故障,而数据或RDD检查点是为了支持对有状态转换操作的恢复。
何时启用检查点
使用了有状态的转换算子(Usage of stateful transformations) – 不管是用了 updateStateByKey 还是用了 reduceByKeyAndWindow(有”反归约”函数的那个版本),你都必须配置检查点目录来周期性地保存RDD检查点。
支持驱动器故障中恢复(Recovering from failures of the driver running the application) – 这时候需要元数据检查点以便恢复流式处理的进度信息。
注意,一些简单的流式应用,如果没有用到前面所说的有状态转换算子,则完全可以不开启检查点。不过这样的话,驱动器(driver)故障恢复后,有可能会丢失部分数据(有些已经接收但还未处理的数据可能会丢失)。不过通常这点丢失时可接受的,很多Spark Streaming应用也是这样运行的。
如何配置检查点
检查点的启用,只需要设置好保存检查点信息的检查点目录即可,一般会会将这个目录设为一些可容错的、可靠性较高的文件系统(如:HDFS、S3等)。
第一种:开发者只需要调用 streamingContext.checkpoint(checkpointDirectory)。设置好检查点,你就可以使用前面提到的有状态转换算子了。
第二种:如果你需要你的应用能够支持从驱动器故障中恢复,你可能需要重写部分代码,实现以下行为:
如果程序是首次启动,就需要new一个新的StreamingContext,并定义好所有的数据流处理,然后调用StreamingContext.start()。
如果程序是故障后重启,就需要从检查点目录中的数据中重新构建StreamingContext对象。
// Create a factory object that can create and setup a new JavaStreamingContext JavaStreamingContextFactory contextFactory = new JavaStreamingContextFactory() { @Override public JavaStreamingContext create() { JavaStreamingContext jssc = new JavaStreamingContext(...); // new context JavaDStreamlines = jssc.socketTextStream(...); // create DStreams ... jssc.checkpoint(checkpointDirectory); // set checkpoint directory return jssc; } }; // Get JavaStreamingContext from checkpoint data or create a new one JavaStreamingContext context = JavaStreamingContext.getOrCreate(checkpointDirectory, contextFactory); // Do additional setup on context that needs to be done, // irrespective of whether it is being started or restarted context. ... // Start the context context.start(); context.awaitTermination();
需要注意的是,RDD检查点会增加额外的保存数据的开销。这可能会导致数据流的处理时间变长。
因此,你必须仔细的调整检查点间隔时间。如果批次间隔太小(比如:1秒),那么对每个批次保存检查点数据将大大减小吞吐量。
另一方面,检查点保存过于频繁又会导致血统信息和任务个数的增加,这同样会影响系统性能。
对于需要RDD检查点的有状态转换算子,默认的间隔是批次间隔的整数倍,且最小10秒。开发人员可以这样来自定义这个间隔:dstream.checkpoint(checkpointInterval)。一般推荐设为批次间隔时间的5~10倍。
transform算子transform算子(及其变体transformWith)可以支持任意的RDD到RDD的映射操作。也就是说,你可以用tranform算子来包装任何DStream API所不支持的RDD算子。例如,将DStream每个批次中的RDD和另一个Dataset进行关联(join)操作,这个功能DStream API并没有直接支持。不过你可以用transform来实现这个功能,可见transform其实为DStream提供了非常强大的功能支持。比如说,你可以用事先算好的垃圾信息,对DStream进行实时过滤。
// RDD containing spam information final JavaPairRDDspamInfoRDD = jssc.sparkContext().newAPIHadoopRDD(...); JavaPairDStream cleanedDStream = wordCounts.transform( new Function , JavaPairRDD >() { @Override public JavaPairRDD call(JavaPairRDD rdd) throws Exception { rdd.join(spamInfoRDD).filter(...); // join data stream with spam information to do data cleaning ... } });
注意,这里transform包含的算子,其调用时间间隔和批次间隔是相同的。所以你可以基于时间改变对RDD的操作,如:在不同批次,调用不同的RDD算子,设置不同的RDD分区或者广播变量等。
基于窗口(window)的算子Spark Streaming同样也提供基于时间窗口的计算,也就是说,你可以对某一个滑动时间窗内的数据施加特定tranformation算子。如下图所示:
如上图所示,每次窗口滑动时,源DStream中落入窗口的RDDs就会被合并成新的windowed DStream。在上图的例子中,这个操作会施加于3个RDD单元,而滑动距离是2个RDD单元。由此可以得出任何窗口相关操作都需要指定一下两个参数:
(窗口长度)window length – 窗口覆盖的时间长度(上图中为3)
(滑动距离)sliding interval – 窗口启动的时间间隔(上图中为2)
注意,这两个参数都必须是DStream批次间隔(上图中为1)的整数倍.
下面咱们举个例子。假设,你需要每隔10秒统计一下前30秒内的单词计数。为此,我们需要在包含(word, 1)键值对的DStream上,对最近30秒的数据调用reduceByKey算子。不过这些都可以简单地用一个 reduceByKeyAndWindow搞定。
// Reduce function adding two integers, defined separately for clarity Function2reduceFunc = new Function2 () { @Override public Integer call(Integer i1, Integer i2) { return i1 + i2; } }; // 每隔10秒归约一次最近30秒的数据 JavaPairDStream windowedWordCounts = pairs.reduceByKeyAndWindow(reduceFunc, Durations.seconds(30), Durations.seconds(10));
以下列出了常用的窗口算子。所有这些算子都有前面提到的那两个参数 – 窗口长度 和 滑动距离。
window(windowLength, slideInterval) 将源DStream窗口化,并返回转化后的DStream
countByWindow(windowLength,slideInterval) 返回数据流在一个滑动窗口内的元素个数
reduceByWindow(func, windowLength,slideInterval) 基于数据流在一个滑动窗口内的元素,用func做聚合,返回一个单元素数据流。func必须满足结合律,以便支持并行计算。
reduceByKeyAndWindow(func,windowLength, slideInterval, [numTasks]) 基于(K, V)键值对DStream,将一个滑动窗口内的数据进行聚合,返回一个新的包含(K,V)键值对的DStream,其中每个value都是各个key经过func聚合后的结果。
注意:如果不指定numTasks,其值将使用Spark的默认并行任务数(本地模式下为2,集群模式下由 spark.default.parallelism决定)。当然,你也可以通过numTasks来指定任务个数。
reduceByKeyAndWindow(func, invFunc,windowLength,slideInterval, [numTasks]) 和前面的reduceByKeyAndWindow() 类似,只是这个版本会用之前滑动窗口计算结果,递增地计算每个窗口的归约结果。当新的数据进入窗口时,这些values会被输入func做归约计算,而这些数据离开窗口时,对应的这些values又会被输入 invFunc 做”反归约”计算。举个简单的例子,就是把新进入窗口数据中各个单词个数“增加”到各个单词统计结果上,同时把离开窗口数据中各个单词的统计个数从相应的统计结果中“减掉”。不过,你的自己定义好”反归约”函数,即:该算子不仅有归约函数(见参数func),还得有一个对应的”反归约”函数(见参数中的 invFunc)。和前面的reduceByKeyAndWindow() 类似,该算子也有一个可选参数numTasks来指定并行任务数。注意,这个算子需要配置好检查点(checkpointing)才能用。
countByValueAndWindow(windowLength,slideInterval, [numTasks]) 基于包含(K, V)键值对的DStream,返回新的包含(K, Long)键值对的DStream。其中的Long value都是滑动窗口内key出现次数的计数。
和前面的reduceByKeyAndWindow() 类似,该算子也有一个可选参数numTasks来指定并行任务数。
最后,值得一提的是,你在Spark Streaming中做各种关联(join)操作非常简单。
1、流-流(Stream-stream)关联
一个数据流可以和另一个数据流直接关联。
JavaPairDStreamstream1 = ... JavaPairDStream stream2 = ... JavaPairDStream > joinedStream = stream1.join(stream2);
上面代码中,stream1的每个批次中的RDD会和stream2相应批次中的RDD进行join。同样,你可以类似地使用 leftOuterJoin, rightOuterJoin, fullOuterJoin 等。此外,你还可以基于窗口来join不同的数据流
JavaPairDStreamwindowedStream1 = stream1.window(Durations.seconds(20)); JavaPairDStream windowedStream2 = stream2.window(Durations.minutes(1)); JavaPairDStream > joinedStream = windowedStream1.join(windowedStream2);
2、流-数据集(stream-dataset)关联
这里举个基于滑动窗口的例子。
JavaPairRDDdataset = ... JavaPairDStream windowedStream = stream.window(Durations.seconds(20)); JavaPairDStream joinedStream = windowedStream.transform( new Function >, JavaRDD >>() { @Override public JavaRDD > call(JavaRDD > rdd) { return rdd.join(dataset); } } );
在上面代码里,你可以动态地该表join的数据集(dataset)。传给tranform算子的操作函数会在每个批次重新求值,所以每次该函数都会用最新的dataset值,所以不同批次间你可以改变dataset的值。
DStream输出算子输出算子可以将DStream的数据推送到外部系统,如:数据库或者文件系统。因为输出算子会将最终完成转换的数据输出到外部系统,因此只有输出算子调用时,才会真正触发DStream transformation算子的真正执行(这一点类似于RDD 的action算子)。目前所支持的输出算子如下表:
print() 在驱动器(driver)节点上打印DStream每个批次中的头十个元素。
saveAsTextFiles(prefix, [suffix]) 将DStream的内容保存到文本文件。
每个批次一个文件,各文件命名规则为 “prefix-TIME_IN_MS[.suffix]”
saveAsObjectFiles(prefix, [suffix]) 将DStream内容以序列化Java对象的形式保存到顺序文件中。
每个批次一个文件,各文件命名规则为 “prefix-TIME_IN_MS[.suffix]”Python API 暂不支持Python
saveAsHadoopFiles(prefix, [suffix]) 将DStream内容保存到Hadoop文件中。
每个批次一个文件,各文件命名规则为 “prefix-TIME_IN_MS[.suffix]”Python API 暂不支持Python
foreachRDD(func) 这是最通用的输出算子了,该算子接收一个函数func,func将作用于DStream的每个RDD上。
func应该实现将每个RDD的数据推到外部系统中,比如:保存到文件或者写到数据库中。
注意,func函数是在streaming应用的驱动器进程中执行的,所以如果其中包含RDD的action算子,就会触发对DStream中RDDs的实际计算过程。
使用foreachRDD的设计模式DStream.foreachRDD是一个非常强大的原生工具函数,用户可以基于此算子将DStream数据推送到外部系统中。不过用户需要了解如何正确而高效地使用这个工具。以下列举了一些常见的错误。
通常,对外部系统写入数据需要一些连接对象(如:远程server的TCP连接),以便发送数据给远程系统。因此,开发人员可能会不经意地在Spark驱动器(driver)进程中创建一个连接对象,然后又试图在Spark worker节点上使用这个连接。如下例所示:
dstream.foreachRDD(new VoidFunction>() { @Override public void call(JavaRDD rdd) { final Connection connection = createNewConnection(); // executed at the driver rdd.foreach(new VoidFunction () { @Override public void call(String record) { connection.send(record); // executed at the worker } }); } });
这段代码是错误的,因为它需要把连接对象序列化,再从驱动器节点发送到worker节点。而这些连接对象通常都是不能跨节点(机器)传递的。比如,连接对象通常都不能序列化,或者在另一个进程中反序列化后再次初始化(连接对象通常都需要初始化,因此从驱动节点发到worker节点后可能需要重新初始化)等。解决此类错误的办法就是在worker节点上创建连接对象。
一个比较好的解决方案是使用 rdd.foreachPartition – 为RDD的每个分区创建一个多带带的连接对象,示例如下:
dstream.foreachRDD(new VoidFunction>() { @Override public void call(JavaRDD rdd) { rdd.foreachPartition(new VoidFunction >() { @Override public void call(Iterator partitionOfRecords) { Connection connection = createNewConnection(); while (partitionOfRecords.hasNext()) { connection.send(partitionOfRecords.next()); } connection.close(); } }); } });
最后,还有一个更优化的办法,就是在多个RDD批次之间复用连接对象。开发者可以维护一个静态连接池来保存连接对象,以便在不同批次的多个RDD之间共享同一组连接对象
dstream.foreachRDD(new VoidFunction>() { @Override public void call(JavaRDD rdd) { rdd.foreachPartition(new VoidFunction >() { @Override public void call(Iterator partitionOfRecords) { // ConnectionPool is a static, lazily initialized pool of connections Connection connection = ConnectionPool.getConnection(); while (partitionOfRecords.hasNext()) { connection.send(partitionOfRecords.next()); } ConnectionPool.returnConnection(connection); // return to the pool for future reuse } }); } });
注意,连接池中的连接应该是懒惰创建的,并且有确定的超时时间,超时后自动销毁。这个实现应该是目前发送数据最高效的实现方式。
注意点:
DStream的转化执行也是懒惰的,需要输出算子来触发,这一点和RDD的懒惰执行由action算子触发很类似。特别地,DStream输出算子中包含的RDD action算子会强制触发对所接收数据的处理。因此,如果你的Streaming应用中没有输出算子,或者你用了dstream.foreachRDD(func)却没有在func中调用RDD action算子,那么这个应用只会接收数据,而不会处理数据,接收到的数据最后只是被简单地丢弃掉了。
默认地,输出算子只能一次执行一个,且按照它们在应用程序代码中定义的顺序执行。
累加器和广播变量首先需要注意的是,累加器(Accumulators)和广播变量(Broadcast variables)是无法从Spark Streaming的检查点中恢复回来的。所以如果你开启了检查点功能,并同时在使用累加器和广播变量,那么你最好是使用懒惰实例化的单例模式,因为这样累加器和广播变量才能在驱动器(driver)故障恢复后重新实例化。
DataFrame和SQL相关算子在Streaming应用中可以调用DataFrames and SQL来处理流式数据。开发者可以用通过StreamingContext中的SparkContext对象来创建一个SQLContext,并且,开发者需要确保一旦驱动器(driver)故障恢复后,该SQLContext对象能重新创建出来。同样,你还是可以使用懒惰创建的单例模式来实例化SQLContext,如下面的代码所示,这里我们将最开始的那个小栗子做了一些修改,使用DataFrame和SQL来统计单词计数。其实就是,将每个RDD都转化成一个DataFrame,然后注册成临时表,再用SQL查询这些临时表。
缓存与持久化机制与RDD类似,Spark Streaming也可以让开发人员手动控制,将数据流中的数据持久化到内存中。对DStream调用persist()方法,就可以让Spark Streaming自动将该数据流中的所有产生的RDD,都持久化到内存中。如果要对一个DStream多次执行操作,那么,对DStream持久化是非常有用的。因为多次操作,可以共享使用内存中的一份缓存数据。
对于基于窗口的操作,比如reduceByWindow、reduceByKeyAndWindow,以及基于状态的操作,比如updateStateByKey,默认就隐式开启了持久化机制。即Spark Streaming默认就会将上述操作产生的Dstream中的数据,缓存到内存中,不需要开发人员手动调用persist()方法。
对于通过网络接收数据的输入流,比如socket、Kafka、Flume等,默认的持久化级别,是将数据复制一份,以便于容错。相当于是,用的是类似MEMORY_ONLY_SER_2。
与RDD不同的是,默认的持久化级别,统一都是要序列化的。
应用监控在Spark web UI上看到多出了一个Streaming tab页,上面显示了正在运行的接收器(是否活跃,接收记录的条数,失败信息等)和处理完的批次信息(批次处理时间,查询延时等)。这些信息都可以用来监控streaming应用。
web UI上有两个度量特别重要:
批次处理耗时(Processing Time) – 处理单个批次耗时
批次调度延时(Scheduling Delay) -各批次在队列中等待时间(等待上一个批次处理完)
如果批次处理耗时一直比批次间隔时间大,或者批次调度延时持续上升,就意味着系统处理速度跟不上数据接收速度。这时候你就得考虑一下怎么把批次处理时间降下来(reducing)。
Spark Streaming程序的处理进度可以用StreamingListener接口来监听,这个接口可以监听到接收器的状态和处理时间。
设置合适的批次间隔要想streaming应用在集群上稳定运行,那么系统处理数据的速度必须能跟上其接收数据的速度。换句话说,批次数据的处理速度应该和其生成速度一样快。对于特定的应用来说,可以从其对应的监控(monitoring)页面上观察验证,页面上显示的处理耗时应该要小于批次间隔时间。
根据spark streaming计算的性质,在一定的集群资源限制下,批次间隔的值会极大地影响系统的数据处理能力。例如,在WordCountNetwork示例中,对于特定的数据速率,一个系统可能能够在批次间隔为2秒时跟上数据接收速度,但如果把批次间隔改为500毫秒系统可能就处理不过来了。所以,批次间隔需要谨慎设置,以确保生产系统能够处理得过来。
要找出适合的批次间隔,你可以从一个比较保守的批次间隔值(如5~10秒)开始测试。要验证系统是否能跟上当前的数据接收速率,你可能需要检查一下端到端的批次处理延迟(可以看看Spark驱动器log4j日志中的Total delay,也可以用StreamingListener接口来检测)。如果这个延迟能保持和批次间隔差不多,那么系统基本就是稳定的。否则,如果这个延迟持久在增长,也就是说系统跟不上数据接收速度,那也就意味着系统不稳定。一旦系统文档下来后,你就可以尝试提高数据接收速度,或者减少批次间隔值。不过需要注意,瞬间的延迟增长可以只是暂时的,只要这个延迟后续会自动降下来就没有问题(如:降到小于批次间隔值)
参考:http://ifeve.com/spark-stream...
http://spark.apache.org/docs/...
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/67260.html
摘要:用来管理文件系统的命名空间,其将所有的文件和文件夹的元数据保存在一个文件系统树中。文件系统镜像元数据镜像文件。其周期性的向元数据节点回报其存储的数据块信息。 Hadoop分布式文件系统(hadoopdistributed filesystem,HDFS)。HDFS是一个高度容错性的系统,适合部署在廉价的机器上。HDFS能提供高吞吐量的数据访问,非常适合大规模数据集上的应用。HDFS可以...
摘要:原文链接这些年,你不能错过的学习资源写在前面本系列是综合了自己在学习过程中的理解记录对参考文章中的一些理解个人实践过程中的一些心得而来。 原文链接:『 Spark 』5. 这些年,你不能错过的 spark 学习资源 写在前面 本系列是综合了自己在学习spark过程中的理解记录 + 对参考文章中的一些理解 + 个人实践spark过程中的一些心得而来。写这样一个系列仅仅是为了梳理个人学习s...
摘要:原文链接简介写在前面本系列是综合了自己在学习过程中的理解记录对参考文章中的一些理解个人实践过程中的一些心得而来。其次,本系列是基于目前最新的系列开始的,目前的更新速度很快,记录一下版本好还是必要的。 原文链接:『 Spark 』1. spark 简介 写在前面 本系列是综合了自己在学习spark过程中的理解记录 + 对参考文章中的一些理解 + 个人实践spark过程中的一些心得而来。写...
阅读 3675·2021-09-22 10:57
阅读 1885·2019-08-30 15:55
阅读 2672·2019-08-30 15:44
阅读 1697·2019-08-30 15:44
阅读 1852·2019-08-30 15:44
阅读 2217·2019-08-30 12:49
阅读 1020·2019-08-29 18:47
阅读 3101·2019-08-29 16:15