摘要:已更新至,归管了,因此也相应统一。本文不再适用及以上版本。字段类型会非常非常奇葩。。。。但是如果体积过于庞大,很容易导致特别是我们一般不会给配置过高的内存。第二个,是函数的返回值。对于而言,我们可以直接使用,来得到这个什么都没有的东西。
Spark已更新至2.x,DataFrame归DataSet管了,因此API也相应统一。本文不再适用2.0.0及以上版本。
DataFrame原生支持直接输出到JDBC,但如果目标表有自增字段(比如id),那么DataFrame就不能直接进行写入了。因为DataFrame.write().jdbc()要求DataFrame的schema与目标表的表结构必须完全一致(甚至字段顺序都要一致),否则会抛异常,当然,如果你SaveMode选择了Overwrite,那么Spark删除你原有的表,然后根据DataFrame的Schema生成一个。。。。字段类型会非常非常奇葩。。。。
于是我们只能通过DataFrame.collect(),把整个DataFrame转成List
翻看Spark的JDBC源码,发现实际上是通过foreachPartition方法,在DataFrame每一个分区中,对每个Row的数据进行JDBC插入,那么为什么我们就不能直接用呢?
Spark JdbcUtils.scala部分源码:
def saveTable(df: DataFrame,url: String,table: String,properties: Properties = new Properties()) { val dialect = JdbcDialects.get(url) val nullTypes: Array[Int] = df.schema.fields.map { field => dialect.getJDBCType(field.dataType).map(_.jdbcNullType).getOrElse( field.dataType match { case IntegerType => java.sql.Types.INTEGER case LongType => java.sql.Types.BIGINT case DoubleType => java.sql.Types.DOUBLE case FloatType => java.sql.Types.REAL case ShortType => java.sql.Types.INTEGER case ByteType => java.sql.Types.INTEGER case BooleanType => java.sql.Types.BIT case StringType => java.sql.Types.CLOB case BinaryType => java.sql.Types.BLOB case TimestampType => java.sql.Types.TIMESTAMP case DateType => java.sql.Types.DATE case t: DecimalType => java.sql.Types.DECIMAL case _ => throw new IllegalArgumentException( s"Can"t translate null value for field $field") }) } val rddSchema = df.schema val driver: String = DriverRegistry.getDriverClassName(url) val getConnection: () => Connection = JDBCRDD.getConnector(driver, url, properties) // ****************** here ****************** df.foreachPartition { iterator => savePartition(getConnection, table, iterator, rddSchema, nullTypes) } }
嗯。。。既然Scala能实现,那么作为他的爸爸,Java也应该能玩!
我们看看foreachPartition的方法原型:
def foreachPartition(f: Iterator[Row] => Unit)
又是函数式语言最爱的匿名函数。。。非常讨厌写lambda,所以我们还是实现个匿名类吧。要实现的抽象类为:
scala.runtime.AbstractFunction1
来玩耍一下吧!
df.foreachPartition(new AbstractFunction1, BoxedUnit>() { @Override public BoxedUnit apply(Iterator it) { while (it.hasNext()){ System.out.println(it.next().toString()); } return BoxedUnit.UNIT; } });
嗯,maven complete一下,spark-submit看看~
好勒~抛异常了
org.apache.spark.SparkException: Task not serializable
Task不能被序列化
嗯哼,想想之前实现UDF的时候,UDF1/2/3/4...各接口,都extends Serializable,也就是说,在Spark运行期间,Driver会把UDF接口实现类序列化,并在Executor中反序列化,执行call方法。。。这就不难理解了,我们foreachPartition丢进去的类,也应该implements Serializable。这样,我们就得自己搞一个继承AbstractFunction1
import org.apache.spark.sql.Row; import scala.runtime.AbstractFunction1; import scala.runtime.BoxedUnit; import java.io.Serializable; public abstract class JavaForeachPartitionFunc extends AbstractFunction1, BoxedUnit> implements Serializable { }
可是每次都要return BoxedUnit.UNIT 搞得太别扭了,没一点Java的风格。
import org.apache.spark.sql.Row; import scala.collection.Iterator; import scala.runtime.AbstractFunction1; import scala.runtime.BoxedUnit; import java.io.Serializable; public abstract class JavaForeachPartitionFunc extends AbstractFunction1, BoxedUnit> implements Serializable { @Override public BoxedUnit apply(Iterator it) { call(it); return BoxedUnit.UNIT; } public abstract void call(Iterator
it); }
于是我们可以直接Override call方法,就可以用满满Java Style的代码去玩耍了!
df.foreachPartition(new JavaForeachPartitionFunc() { @Override public void call(Iteratorit) { while (it.hasNext()){ System.out.println(it.next().toString()); } } });
注意!我们实现的匿名类的方法,实际上是在executor上执行的,所以println是输出到executor机器的stdout上。这个我们可以通过Spark的web ui,点击具体Application的Executor页面去查看(调试用的虚拟机集群,手扶拖拉机一样的配置,别吐槽了~)
至于foreach方法同理。只不过把Iterator
have fun~
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/65979.html
摘要:是中处理结构化数据的模块。可以从很多数据源加载数据并构造得到,如结构化数据文件,中的表,外部数据库,或者已有的。使用反射机制,推导包含指定类型对象的。这一功能应该优先于使用。随后,将会扫描必要的列,并自动调整压缩比例,以减少内存占用和压力。 Spark SQL是Spark中处理结构化数据的模块。与基础的Spark RDD API不同,Spark SQL的接口提供了更多关于数据的结构信息...
摘要:本文发于我的个人博客知识点大全与实战我正在大数据技术派和朋友们讨论有趣的话题,你也来加入吧概述什么是是用于结构化数据处理的模块。是最新的查询起始点,实质上是和的组合,所以在和上可用的在上同样是可以使用的。 关注公众号:大数据技术派,回复资料,领取1000G资料。本文发于我的个人博客:Spark SQL知识点大全...
摘要:是最新的查询起始点,实质上是和的组合,所以在和上可用的在上同样是可以使用的。转换为转换为其实就是对的封装,所以可以直接获取内部的注意此时得到的存储类型为是具有强类型的数据集合,需要提供对应的类型信息。Spark SQL概述1、什么是Spark SQLSpark SQL是Spark用于结构化数据(structured data)处理的Spark模块。与基本的Spark RDD API不同,Sp...
阅读 1138·2021-11-24 09:38
阅读 3584·2021-11-22 15:32
阅读 3435·2019-08-30 15:54
阅读 2548·2019-08-30 15:53
阅读 1470·2019-08-30 15:52
阅读 2419·2019-08-30 13:15
阅读 1814·2019-08-29 12:21
阅读 1330·2019-08-26 18:36