资讯专栏INFORMATION COLUMN

在pyspark中调用scala代码

alanoddsoff / 848人阅读

摘要:由于使用的是天河二号,版本是,同样,所以获取主题时还不能使用在中才开放对的接口,只能使用的方法。本来做并行化就是希望效率更高,却在调用代码,同时进行了很多数据转换。

在pyspark中调用scala代码 情境说明 问题

我们这边是要使用Spark去并行一个自然语言处理的算法,其中使用到了LDA主题模型。由于使用的是天河二号,Spark版本是1.5.1,pyspark同样,所以获取主题时还不能使用describeTopics(在spark1.6中才开放对python的接口),只能使用topicsMatrix的方法。

本来凑合用topicsMatrix也行,但我们发现,这一个用来获取主题模型的函数,居然比Lda的训练还要慢!无论在我们自己的集群还是在天河二号的分区上,都是这一个情况。观察topicsMatrix的源代码,好像也没有什么复杂操作,只是把数据汇总collect而已:

@Since("1.3.0")
override lazy val topicsMatrix: Matrix = {
  // Collect row-major topics
  val termTopicCounts: Array[(Int, TopicCounts)] =
    graph.vertices.filter(_._1 < 0).map { case (termIndex, cnts) =>
    (index2term(termIndex), cnts)}.collect()
  // Convert to Matrix
  val brzTopics = BDM.zeros[Double](vocabSize, k)
  termTopicCounts.foreach { case (term, cnts) =>
    var j = 0
    while (j < k) {
      brzTopics(term, j) = cnts(j)
      j += 1
    }
  }
  Matrices.fromBreeze(brzTopics)
}

由于并不是算法中有一些复杂运算导致较慢,我们自然不希望在程序中有这样的情况。发现到在Spark1.5.1中,mllib中LdaModel已经实现了describeTopics,只是未在Python中开放,我们自然希望尝试使用describeTopics看看效果。

describeTopics的源代码探索

已知LDA.train()返回的是LdaModel的实例,于是乎,参考上篇博客,用以下方式去调用:

model = LDA.train(rdd_data, k=num_topics, maxIterations=20)
topics = model.call("describeTopics", _py2java(sc, 10))

执行速度特别快,然而返回的结果却不尽如人意,仅返回了一个长度k的列表,每个元素是一个key为"class",value为"scala.Tuple2"的单元素字典。从结果来看,scala的代码应该是被成功执行了,然而返回结果却出了问题。查看callJavaFunc的内容,可以判断出,是describeTopics的返回结果没有被_java2py函数正常的转换。

比对Spark1.5和Spark1.6的代码,LdaModel.describeTopics函数的内容是一致的,那么问题在哪儿呢?再去查看pyspark的LDA.train()调用的PythonMLLibAPI.trainLdaModel,发现在1.6中返回的不再是LdaModel而是它的子类LdaModelWrapper。查看这个类的方法,发现它重载了describeTopics来方便_java2py进行数据转换:

private[python] class LDAModelWrapper(model: LDAModel) {

  def topicsMatrix(): Matrix = model.topicsMatrix

  def vocabSize(): Int = model.vocabSize

  def describeTopics(): Array[Byte] = describeTopics(this.model.vocabSize)

  def describeTopics(maxTermsPerTopic: Int): Array[Byte] = {
    val topics = model.describeTopics(maxTermsPerTopic).map { case (terms, termWeights) =>
      val jTerms = JavaConverters.seqAsJavaListConverter(terms).asJava
      val jTermWeights = JavaConverters.seqAsJavaListConverter(termWeights).asJava
      Array[Any](jTerms, jTermWeights)
    }
    SerDe.dumps(JavaConverters.seqAsJavaListConverter(topics).asJava)
  }

  def save(sc: SparkContext, path: String): Unit = model.save(sc, path)
}

找到这里,解决方法就油然而生了。只要我们把这一段scala代码在python中调用,并将describeTopics的Java对象传入,不就万事大吉了吗?

在pyspark中调用scala代码

也许还有别的方法,不过这里使用的方法也足够简单。将.scala文件打包成jar后,启动spark时加入参数--driver-class-path /path/to/xxx.jar,便可以将你的scala代码放入Spark运行的虚拟机JVM中,从而让python代码在运行中通过反射机制在SparkContext._jvm里动态获取到你的类与方法:

func = sc._jvm.com.example.YourObject.func
打包scala代码

那么,现在的问题就是如何把scala代码打包成jar了。scala虽然也是基于JVM运行的语言,与java非常相似,但是其编译选项中并没有提供将其打包成jar的参数。这里我们用sbt打包它,sbt的下载与安装请自行查阅其他教程,这里就不提供了,官方网站。

首先编写好你的scala代码,确认没有bug,并在文件开头用package关键字将其封装至包中。接着,请手动建立你的项目目录,并创建如下结构:

在build.sbt中,请至少进行以下设置

//项目名
name := "Project"

//项目版本
version := "0.1"

//scala版本
scalaVersion := "2.10.5"

//jdk版本
javacOptions ++= Seq("-source", "1.7", "-target", "1.7")

//主函数
mainClass in Compile := Some("YourClass.func")

在plugins.sbt中,请加上这一句话,告诉sbt需要这个第三方插件,这是用来打包的

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.5")

这些都准备完成后,在terminal里进入你的项目根目录下,输入

sbt package

等待打包完成,会有相应提示。
更多的打包选项,以及sbt的更多用法,感兴趣可以自行查阅。

解决我们的问题

回到我们这里的问题,我们希望能在python中对describeTopics的返回值进行转换,那么我么只需要打包那一个重载的describeTopics就好了,这样可以避免打包Spark的第三方包。更改一下函数的返回值,并注释掉调用Spark的SerDe进行序列化的语句,最终的代码如下:

package com.sysu.sparkhelper

import java.util.List
import scala.collection.JavaConverters

object LdaHelper {
    def convert(topics: Array[(Array[Int], Array[Double])]): List[Array[Any]] = {
        val result = topics.map { case (terms, termWeights) =>
          val jTerms = JavaConverters.seqAsJavaListConverter(terms).asJava
          val jTermWeights = JavaConverters.seqAsJavaListConverter(termWeights).asJava
          Array[Any](jTerms, jTermWeights)
        }
        return JavaConverters.seqAsJavaListConverter(result).asJava
        // SerDe.dumps(JavaConverters.seqAsJavaListConverter(result).asJava)
    }
}

用sbt打包完成后,使用--driver-class-path添加jar包,在python中相应代码为:

lda_java_model = model._java_model
func = getattr(model._java_model, "describeTopics")
result = func(_py2java(sc, 10))
topics = _java2py(sc, sc._jvm.com.sysu.sparkhelper.LdaHelper.convert(result))
总结

这算是阅读源码的一次应用,可以说还是解决了遇到的问题,同时也加深了对Spark的了解。
本来做并行化就是希望效率更高,pyspark却在调用scala代码,同时进行了很多数据转换。想要更好的使用Spark的话,使用scala去编程应该才是最好的。

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/40955.html

相关文章

  • pyspark底层浅析

    摘要:底层浅析简介是官方提供的接口,同时也是中的一个程序。这里一提,对于大部分机器学习算法,你都会看到模块与模块都提供了接口,它们的区别在于模块接受格式的数据而模块接受格式的数据。 pyspark底层浅析 pyspark简介 pyspark是Spark官方提供的API接口,同时pyspark也是Spark中的一个程序。 在terminal中输入pyspark指令,可以打开python的she...

    FrozenMap 评论0 收藏0
  • CentOS7 install spark+ipython-nodebook

    摘要:使用浏览器作为界面,向后台的服务器发送请求,并显示结果。本文主要介绍在上安装流程该文件是用户登录时,操作系统定制用户环境时使用的第一个文件,应用于登录到系统的每一个用户。 ipython-nodebook IPython notebook 目前已经成为用 Python 做教学、计算、科研的一个重要工具。 IPython Notebook 使用浏览器作为界面,向后台的 IPython ...

    soasme 评论0 收藏0
  • Spark的安装及配置

    摘要:本文作者本文链接安装说明在安装之前,需要安装集群环境,如果没有可以查看分布式集群的搭建用到的软件软件版本下载地址节点安排名称主节点子节点子节点安装解压到安装目录修改配置文件配置文件位于目录下。 本文作者:foochane 本文链接:https://foochane.cn/article/2019051904.html 1 安装说明 在安装spark之前,需要安装hadoop集群环境,...

    lunaticf 评论0 收藏0
  • Spark入门阶段一之扫盲笔记

    摘要:同时集成了机器学习类库。基于计算框架,将的分布式计算应用到机器学习领域。提供了一个简单的声明方法指定机器学习任务,并且动态地选择最优的学习算法。宣称其性能是的多倍。 介绍 spark是分布式并行数据处理框架 与mapreduce的区别: mapreduce通常将中间结果放在hdfs上,spark是基于内存并行大数据框架,中间结果放在内存,对于迭代数据spark效率更高,mapred...

    starsfun 评论0 收藏0

发表评论

0条评论

alanoddsoff

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<