资讯专栏INFORMATION COLUMN

pyspark底层浅析

FrozenMap / 2575人阅读

摘要:底层浅析简介是官方提供的接口,同时也是中的一个程序。这里一提,对于大部分机器学习算法,你都会看到模块与模块都提供了接口,它们的区别在于模块接受格式的数据而模块接受格式的数据。

pyspark底层浅析 pyspark简介

pyspark是Spark官方提供的API接口,同时pyspark也是Spark中的一个程序。

在terminal中输入pyspark指令,可以打开python的shell,同时其中默认初始化了SparkConf和SparkContext.

在编写Spark应用的.py文件时,可以通过import pyspark引入该模块,并通过SparkConf对Spark的启动参数进行设置。不过,如果你仅完成了Spark的安装,直接用python指令运行py文件并不能检索到pyspark模块。你可以通过pip等包管理工具安装该模块,也可以直接使用pyspark(新版本已不支持)或spark-submit直接提交.py文件的作业。

pyspark program

这里指的是spark中的bin/pyspark,github地址 。

实际上pyspark只不过解析了命令行中的参数,并进行了python方面的设置,然后调用spark-submit

exec "${SPARK_HOME}"/bin/spark-submit pyspark-shell-main --name "PySparkShell" "$@"

在较新一些的版本如Spark2.2中,已经不支持用pyspark运行py脚本文件,一切spark作业都应该使用spark-submit提交。

pyspark module

Spark是用scala编写的框架,不过考虑到主要是机器学习的应用场景,Spark官方提供了可以用python的API。但是,一方面,python的API是不全的,即不是所有的scala的函数都可以用pyspark调用到,虽然新的API也在随着版本迭代不断开放;另一方面,pyspark模块,对于很多复杂算法,是通过反射机制调用的Spark中JVM里正在运行的scala编写的类、方法。所以,如果你将频繁应用spark于业务或研究,建议学习直接使用scala语言编写程序,而不是python。

这篇博客并不会讲述如何去使用pyspark来编写python的spark应用。各类API以及模块如何使用,你完全可以前往官方文档查看。这里的链接是最新版pyspark的文档,如果你的机器上的spark不是最新版,请去找对应版本的pyspark文档。因为正如我上面所说,不同版本的pyspark逐步开放了新的API并有对旧API进行改进,你在最新版本看到的类、函数,不一定能在旧版本使用。这里一提,对于大部分机器学习算法,你都会看到ml模块与mllib模块都提供了接口,它们的区别在于ml模块接受DataFrame格式的数据而mllib模块接受RDD格式的数据。

关于pyspark底层,这里主要探索两个地方。一个是其初始化时的工作,一个是其对JVM中scala代码的调用

SparkContext

SparkContext类在pyspark/context.py中,在python代码里通过初试化该类的实例来完成Spark的启动与初始化。这个类的__init__方法中执行了下面几行代码

        self._callsite = first_spark_call() or CallSite(None, None, None)
        SparkContext._ensure_initialized(self, gateway=gateway)
        try:
            self._do_init(master, appName, sparkHome, pyFiles, environment, batchSize, serializer,
                          conf, jsc, profiler_cls)
        except:
            # If an error occurs, clean up in order to allow future SparkContext creation:
            self.stop()
            raise

first_spark_call和CallSite方法都是用来获取JAVA虚拟机中的堆栈,它们在pyspark/traceback_util.py中。

之后调用了类函数_ensure_initialized函数,对Spark的Java的gate_way和jvm进行设置。
最后调用了类中的_do_init_函数,从函数就可以看出是对内部类成员SparkConf的实例_conf函数进行设置,判断各参数值是否为None,非空的话就进行设置,并读取一些本地的python环境参数,启动Spark。

调用JVM类与方法

以mllib库为例,主要逻辑都在pyspark/mllib/common.py中。你去查看mllib模块中机器学习算法的类与函数,你会发现基本都是使用self.call或者callMLlibFunc,将函数名与参数传入。

各类模型的Model类都继承自common.JavaModelWrapper,这个类代码很短:

class JavaModelWrapper(object):
    """
    Wrapper for the model in JVM
    """
    def __init__(self, java_model):
        self._sc = SparkContext._active_spark_context
        self._java_model = java_model

    def __del__(self):
        self._sc._gateway.detach(self._java_model)

    def call(self, name, *a):
        """Call method of java_model"""
        return callJavaFunc(self._sc, getattr(self._java_model, name), *a)

_java_model是来自Java或Scala的类的实例,在调用对应的训练算法时由对应的scala代码在末尾将这些类初始化并返回,其关键的类方法call,同callMLLibFunc方法一样,都是调用了callJavaFunc的方法。对于调用某一类的方法,是运用python的getattr函数,将类实例与方法名传入,使用反射机制获取函数;而对于调用一些不属于类的方法,即使用callMLLibFunc时,是传入的PythonMLLibAPI类的实例以及方法名,来获取函数:

def callMLlibFunc(name, *args):
    """ Call API in PythonMLLibAPI """
    sc = SparkContext.getOrCreate()
    api = getattr(sc._jvm.PythonMLLibAPI(), name)
    return callJavaFunc(sc, api, *args)

最终callJavaFunc做的也很简单,将python的参数*a,使用_py2java方法转换为java的数据类型,并执行函数,再将结果使用_java2py方法转换为python的数据类型返回:

def callJavaFunc(sc, func, *args):
    """ Call Java Function """
    args = [_py2java(sc, a) for a in args]
    return _java2py(sc, func(*args))

这里的_java2py,对很多数据格式的支持不是很好,所以当你尝试用底层的call方法调用一些pyspark尚未支持但scala中已经有的函数时,可能在scala部分可以执行,但是python的返回结果却不尽如人意。

ml模块的调用机制与mllib的机制有些许的不同,但本质上都还是去调用在Spark的JVM中scala代码的class。

总结

本篇博客其实说的非常简单,pyspark即使是不涉及具体算法的部分,也还有很多内容尚未讨论。这里仅是对pyspark产生一个初步的认识,同时简单分析了一下底层对scala的调用过程。
你兴许会有这样的疑问--“去看这些源代码有什么用呢?好像就算知道这些,实际使用时不还是用一下API就好了吗?”。
实际上,看源代码首先的就是满足一下好奇心,对Spark有一个更充分的了解;其次关于具体用途,我举个例子,很多情况你使用的集群可能不是最新版本的,因为复杂的配置导致一般而言也不可能有一个新版本就更新一次,这时你想用新版本的API怎么办?看了这篇博客想必你也会有一些“大胆的想法”。后一篇博客会举例说明我在实际工作中相关的一个问题,以及如何利用这些源码去解决的。

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/40956.html

相关文章

  • PySpark SQL 相关知识介绍

    摘要:大数据除了体积和速度外,数据的多样性和准确性也是大数据的一大特点。这些也被称为大数据的特征。介绍是一个解决大数据问题的分布式可伸缩的框架。介绍计算的模型最早出现在谷歌的一篇研究论文中。相关链接介绍是一个通用的分布式编程框架。 本文作者:foochane 本文链接:https://foochane.cn/article/2019060601.html 1 大数据简介 大数据是这个时代最...

    CoderStudy 评论0 收藏0
  • 【技术性】OO语言知识

    摘要:篇分布计算提高效率的库及库函数,比如的库就有一大堆函数,本质上和的分布式计算的底层思想是一致的。篇特别适用于搭,比如的用于和在用的,其实根本上都是用了的脚本特性,串联起来。的种常见操作增删找值相当于执行了这个命令然后可以用函数来, 持续更新。--------------------C++篇------------------------ 分布计算提高效率的库及库函数,比如FB的foll...

    Cobub 评论0 收藏0
  • 【技术性】OO语言知识

    摘要:篇分布计算提高效率的库及库函数,比如的库就有一大堆函数,本质上和的分布式计算的底层思想是一致的。篇特别适用于搭,比如的用于和在用的,其实根本上都是用了的脚本特性,串联起来。的种常见操作增删找值相当于执行了这个命令然后可以用函数来, 持续更新。--------------------C++篇------------------------ 分布计算提高效率的库及库函数,比如FB的foll...

    Genng 评论0 收藏0
  • pyspark中调用scala代码

    摘要:由于使用的是天河二号,版本是,同样,所以获取主题时还不能使用在中才开放对的接口,只能使用的方法。本来做并行化就是希望效率更高,却在调用代码,同时进行了很多数据转换。 在pyspark中调用scala代码 情境说明 问题 我们这边是要使用Spark去并行一个自然语言处理的算法,其中使用到了LDA主题模型。由于使用的是天河二号,Spark版本是1.5.1,pyspark同样,所以获取主题时...

    alanoddsoff 评论0 收藏0
  • AI如何改变智能城市物联网?

    摘要:如何改变智能城市物联网来源愿码内容编辑愿码连接每个程序员的故事网站愿码愿景打造全学科系统免费课程,助力小白用户初级工程师成本免费系统学习低成本进阶,帮助一线资深工程师成长并利用自身优势创造睡后收入。 AI如何改变智能城市物联网? showImg(https://segmentfault.com/img/remote/1460000018768732); 来源 | 愿码(ChainDe...

    csRyan 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<