1.1 大数据平台优势
横向扩展:大数据技术出现之初所要解决的问题就是数据存储与计算,近年来随着数据量产生速度越来越快,传统平台存储与计算能力遇到瓶颈,而大数据平台是分布式架构,理论上是可以无限扩展的,所以其能更好的适应时代的发展。
资源共享:企业通过使用单一集群,可以化零为整,整合所有可用服务器资源,并统一对外提供所有的能力,可以实现细粒度的资源调度机制。并且只需维护一个集群,降低运维成本。
数据共享:使用单一存储架构,可以将企业内部所有数据集中在一个集群中,方便进行各种业务数据的整合使用,从而充分利用大数据技术全量数据分析的优势。
服务共享:通过统一服务架构,可将一套统一服务设计规则应用到所有的服务实现上,例如一张表数据可以以文件形式共享也能以接口形式接口进行共享,我们进行统一之后各个部门可以以相同方法进行调用使用,避免烟囱式架构,间接减少重复开发成本。
安全保障:通过统一安全架构,在单一集群架构基础上实现细粒度的资源隔离,对不同人员进行不同程度的授权。
1.2 大数据平台需要具备的能力
集群监控与管理:毫无疑问集群是需要能够进行统筹的管理及监控的能力,否则运维团队在做运维时将无从下手。
资源接入:数据是一个企业的核心资源,我们对业务模型的建立,分析,挖掘都需要建立在原始数据之上,而这些数据来源多(日志,关系数据库,爬虫等),类型杂(结构化,半结构化,非结构化数据),体量大,所以大数据平台需要能够对接各种来源及各种类型的海量数据。
数据存储及查询:数据接入之后,就需要根据不同的应用场景进行存储,例如关系型数据模型,非关系型数据模型,文档数据模型,大数据平台需要能够提供不同的存储模型及不同的查询手段。
数据计算:根据不同的应用场景会有不同的计算要求,简单的可以分为离线计算和实时计算,机器学习,多维分析。在数据对时效性要求不高且数据量大的情况下可以选择离线计算。例如报表之类的需求。但对于时效性要求比较高的场景,例如银行的风险控制,就需要选择实时计算模型。机器学习可以使用大数据平台的全量数据进行模型训练,常用于预测,预警,推荐等应用场景。例如今日头条。由于大数据平台数据的互通性使得可以从多个维度对某一事件进行分析,例如从商品,客户,价格,商品折扣,商品促销等多个维度进行分析,从而得出某一商品近期销售额是增加了还是减少了,商品的主要消费群体是什么年龄段的。
大数据平台安全管理:需要具备用户管理与访问控制能力。
任务管理及调度:我们开发的数据抽取,离线计算还是实时计算等都需要以任务形式提交到调度系统,可以进行任务追踪,日志查询key,执行周期性要求等。
2.1 hadoop生态圈
2.2 大数据平台-HDFS
HDFS是一款分布式文件系统,能够存储在廉价的机器上,能够存储海量的文件数据,同时拥有完善的错误恢复机制,其是GFS(谷歌分布式文件系统)的开源实现,可以说HDFS是整个平台架构里的基石。
HDFS是Master/Slave架构,一个HDFS集群式由一个NameNode和一定数目的DataNode组成,NameNode是一个中心服务器,负责管理文件系统的命名空间以及客户端对文件的访问,例如打开,关闭,关闭,重命名文件或目录,负责确定数据块存储到具体哪个DataNode节点,DataNode负责处理文件系统客户端的读写请求,在NameNode的统一调度下进行数据块的创建,删除,复制等。架构如下:
2.3大数据平台-Zookeeper
Zookeeper是一款分布式协同管理框架,如果您在学习hadoop集群时,肯定会问既然多台服务器协同工作,那么如何实现配置同步及Master与Slave之间的通信呢,也就是Slave如何向Master“汇报工作”和Master如何向Slave“分配工作”的呢,如果Master不可用时,通常需要从Slave中在选举出一个节点作为Master,那么这些问题都是如何解决的呢?答案就是Zookeeper,Zookeeper自身拥有高度可靠性,可扩展性和容错性,能够提供统一命名服务,分布式锁,分布式队列,选举,配置同步,心跳检查等功能,一言以概之就是Zookeeper帮助我们管理集群内部琐碎的事情。逻辑架架构图如下(也是主从模式):
2.4大数据平台-Hbase
Hbase是构建在HDFS之上的,分布式的非关系型数据库,基于谷歌BigTable论文的开源实现。一张表能支撑数十亿行和数百万列,还能体现其快速查询的能力。从设计上来说其是由三类服务构成的Master/Slave架构。Master进程负责Region(按照RowKey将表分割成若干个块)的分配,DDL这类操作,Region-Server进程负责数据的读写,底层数据存储和集群协同管理由HDFS、Zookeeper管理,表数据底层也是一个个的存储在HDFS上的Hfile文件。
数据模型:RowKey相当于关系型数据库中的主键,唯一的。ColumnFamily相当于子表的概念,每个Column都必须属于某个ColumnFamily;Column真正定义数据的属性字段。Version概念,每次修改数据就会产生一个新的版本,Hbase默认存储数据的三个版本(是可以更改的),查询默认是最新版本。数据模型如下:
WAL:预写日志是HDFS上的一个文件,是一个容灾策略,Hbase为了提高写性能,在写入数据时并不直接写入磁盘中,而是将数据直接保存在内存中,但内存大小毕竟是有限的,所以当数据存储达到某个阈值时就将数据写入磁盘并清空内存,但是数据存放在内存(MemStore)中并不安全,所以Hbase采用了预写日志方式,当数据丢失时可以根据日志恢复数据,数据写入日志就算写入成功,并且写入日志是对磁盘的顺序写入,所以写入速度是非常快的,正是这种模式既保证了写入速度,也保证了可靠性。
BlockCache:是一种读缓存,客户端读取数据会先从该缓存块中查找数据。Hbase会将一次文件查找的数据块缓存到内存中,以便后续同一个查找请求。
MemStore:是一种写缓存,如上所述,数据写入并不是直接写入磁盘,而是先写入到内存中。
Hfile:是最终数据的存储载体,本质上就是HDFS文件。
2.5 大数据平台-YARN
YARN是一款集群资源调度框架,其是从Mapreduce中独立出来的,Hadoop1.X时mapreduce不仅充当计算框架角色也担当资源管理角色,从Hadoop2.X后就将资源调度功能独立出来,也就是我们要说的YARN。试想一下我们集群中部署了Hbase,Hive,Spark等多个大数据组件,每个组件设计之初都有自己的一套资源调度系统来管理资源的分配,他们都认为自己应该使用100%的服务器资源,但是资源总量就那么多;或者资源分配不合理等问题。所以我们就需要一个统一的资源调度框架来进行统一管理。而YARN就是这样一种框架。
资源模型:YARN利用Container对象作为资源的基本单位,包括资源名称,内存和CPU。Container将资源进行了隔离,每个应用都可以通过ApplicationMaster向ResourceManager申请资源,例如某个Spark计算任务申请到了6个Container资源。
ResourceManager是全局资源管理器,负责整个集群的资源分配。
ApplicationMaster负责跟ResourceManager进行通信,以生申请所需要的资源,例如Spark的Driver进程。
NodeManager是每个节点上的资源管理器,负责自己所在服务器的资源利用的整个生命周期。
YARN工作过程:
1.用户向YARN提交应用程序。
2.ResourceManager为该应用找到一个可用的NodeManager并分配第一个Container,然后再Container中启动这个ApplicationMaster。
3.ApplicationMaster向ResourceManager进行注册,并且采用轮询方式向ResourceManager申请资源,申请到资源后与对应的NodeManager进行通信要求他设置运行环境。
4.任务开始运行并向ApplicationMaster汇报自己的状态和进度。
5.任务执行完毕后,ApplicationMaster向ResourceManager注销并关闭自己。
2.6 大数据平台-Spark
Spark是一款分布式内存计算模型,其计算是基于内存的,所以计算速度较其他计算引擎是很快的,Spark基于一套统一的数据模型(RDD)和编程模型(Trans/Action)之上,构建出SparkSQL,SparkStreaming,Mlib,GraphX等分支。如图:
如上图可以看出SparkCore是Spark的核心,Spark部署可以支持StandAlone模式,Yarn模式,Mesos模式,StandAlone模式就是利用Spark自己的资源管理器,Yarn之前说了是一种通用的资源管理框架,官方推荐的是Mesos模式,但是工作中一般是YARN模式,个人理解可能是YARN模式比较通用吧。
Spark生态圈如下:
从上图可以看出Spark支持Scala,Java,R、Python语言,每种语言都有相应的库进行支持,但是推荐使用Scala语言,兼容性比较好,因为Spark就是用Scala语言编写的。SparkSQL主要用于批处理作业,实际工作中90%以上工作都是SparkSQL完成的,SparkStreaming子模块主要用于对时效性要求比较高的场景,但其是一种近实时,本质上也是一种批处理,只不过是根据时间分成足够小的批。Spark是一种计算引擎,可以看出其数据源可以来自Flume,Kafka,HDFS,文件等,经过计算后可以存储到Hbase,关系型数据库,HDFS等中,一般流处理都会涉及到Kafka,Spark。
Spark架构:
SparkContext是Spark编程的入口,一个JVM中只能有一个SparkContext,如上图的DriverApplication就相当于程序中的Main函数,其向资源管理器申请资源,资源申请后会反馈给Driver进程,之后Driver就可以直接与Executor交流了,Driver分配任务给Executor,实际执行任务的是Executor,Task是最小的执行单元,其数量由分区数量决定。这里只是简单的说明了一下,如果有兴趣,可以理解一下DAG,Stage的划分,Stage又是一组可以并行执行的Task集合。
Spark是一个很重要的组件,其东西也很多,不是一下就能概括完的,像宽窄依赖,存储级别,算子分类,Stage划分,广播变量,累加器等,如果有兴趣还需要更深入的学习。
除了这些介绍的大数据组件,还有很多,向Kafka,Flume,Hive,Sqoop等,也都是比较常用的。
Ambari是一款用于部署、管理和监控Hadoop集群服务的开源系统,实现了以下功能:1.安装hadoop集群,实现了界面化的安装过程。2.管理hadoop集群,提供了启动,停止等功能。3.监控hadoop集群,监控hadoop集群的健康状态,提供了一套健康指标体系收集监控数据,和一套预警矿浆,可以实现预定指标的预警功能。Amari也是Master/Slave架构,由一个Ambari-Server和多个Ambari-Agent组成,但是Ambari-Server安装成功之后也可以通过界面方式来安装Ambari-Agent,支持以下操作系统:Redhat6/7,CentOS6/7,Ubuntu12/14等。接下来我们以在Redhat7上利用Ambari安装HDP为例(有兴趣的也可以了解下在阿里云上利用ClouderaManager安装CDH)。
一、准备工作:
1.需要安装JDK1.7或者1.8,;2.安装Python2.6以上版本,因为会通过python调用一些命令脚本。
二、安装包下载:
wgethttp://public-repo-1.hortonworks.com/ambari/centos6/2.x/updates/2.4.0.1/AMBARI-2.4.0.1-centos6.tar.gz
wgethttp://public-repo-1.hortonworks.com/HDP/centos6/2.x/updates/2.5.0.0/HDP-2.5.0.0-centos6-rpm.tar.gz
wgethttp://public-repo-1.hortonworks.com/HDP-UTILS-1.1.0.21/repos/centos6/HDP-UTILS-1.1.0.21-centos6.tar.gz
三、安装Apache服务器并启动:
yum install httpd
/etc/init.d/httpd start
四、解压:
tar -zxvf HDP-2.5.0.0-centos6-rpm.tar.gz -C /var/www/html/hdp
tar -zxvf HDP-UTILS-1.1.0.21-centos6.tar.gz -C/var/www/html/hdp/HDP-UTILS-1.1.0.21
tar -zxvf AMBARI-2.4.0.1-centos6.tar.gz -C /var/www/html/ambari
五、搭建本地yum源仓库:
创建ambari.repo文件
[Ambari-2.4.0.1]
name=Ambari-2.4.0.1
baseurl=http://server/AMBARI-2.4.0.1/centos6/2.4.0.1-1
gpgcheck=1
gpgkey=http://server/AMBARI-2.4.0.1/centos6/2.4.0.1-1/RPM-GPG-KEY/RPM-GPG-KEY-Jenkins
enabled=1
priority=1
创建hdp.repo文件
HDP-2.5.0.0]
name=HDP-2.5.0.0
baseurl=http://server/home/www/HDP/centos6
path=/
enabled=1
gpgcheck=0
[HDP-UTILS-1.1.0.21]
name=Hortonworks Data Platform Version - HDP-UTILS-1.1.0.21
baseurl=http://public-repo-1.hortonworks.com/HDP-UTILS-1.1.0.21/repos/centos6
gpgcheck=1
enabled=1
priority=1
最后将这两个文件复制到所有准备安装hadoop组件的服务器上,yumrepolist查看,如果出现Ambari-2.4.0.1和HDP-2.5.0.0这说明配置成功。
六、关闭防火墙和SELinux
七、配置主机表
在每台服务器上vim/etc/hosts:添加如下信息:
IP1 hostname1
IP2 hostname2
..............
八、安装postgresql-server,因为Ambari需要使用他存储元数据。
yum install postgresql-server
九、执行ambari-serversetup进行相关配置
首先会先检查是否已经禁止了SELinux,如果之前配置好就没什么问题,
还有一个JDK的配置,我安装时候需要使用标准oraclejdk1.7,没有选择自己安装的JDK。这个可能不同系统不一样,需要注意一下
十、安装Ambari-server
yum install ambari-server
十一、启动
默认端口为8080,可以通过vim/etc/ambari-server/conf/ambari.properties
client.api.port=XXX进行修改。
执行ambari-serverstart,打开浏览器输入http://ip:port/可以进入到Ambari界面,接下来hadoop组件都可以通过界面方式安装,就很简单了。
4.1 数据抽取
数据开发过程首先就是数据的抽取,确定使用何种组件或工具来进行抽取,常用抽取方式有Sqoop,flume,kettle等,如果不考虑成本也可以自己写程序实现。
sqoop是一款开源的工具,主要用于关系型数据库和HDFS之间数据的传输,虽然简单但是挺常用的。
flume架构如下:
agent相当于数据包,包括数据来源source,数据缓冲channel,数据输出目标三个部分,agent可以任意组合,如图:
flume支持的数据源有kafka,netcat,file,httpSource等,支持的sink有HDFS,Hive,Kafka等,具体可以参考中文文档:https://flume.liyifeng.org/,如图
脚本配置案例(将文件中的数据抽取到kafka中):
# 初始化
test.sources = testSource
test.channels = testChannel
test.sinks = testSink
# 配置channel
test.channels.testChannel.type = file
test.channels.testChannel.checkpointDir = /var/flume/checkpoint/test
test.channels.testChannel.dataDirs = /var/flume/data/test
#配置source
test.sources.testSource.type = spooldir
test.sources.testSource.deserializer = LINE
test.sources.testSource.deserializer.maxLineLength = 6400
test.sources.testSource.spoolDir = /events/input/intra/test
test.sources.testSource.includePattern =test_[0-9]{4]-[0-9]{2]-[0-9]{2].csv
test.sources.testSource.channels = testChannel
# 定义sink
test.sinks.testSink.type = org.apache.flume.sink.kafka.KafkaSink
test.sinks.testSink.batchSize = 640
test.sinks.testSink.brokerList = sandbox-hdp.hortonworks.com:6667
test.sinks.testSink.topic = test
test.sinks.testSink.channel = testChannel
Kettle是一个开源的ETL工具,不仅包括数据抽取,还包括数据清洗,任务调度等功能,有兴趣可以看“使用PDI构建开源ETL解决方案”这本书。kettle增量抽取数据案例:
4.2 数据建模、清洗
数据抽取过来之后就需要根据应用场景进行建模,数仓的建设,常用维度建模,维度建模关键步骤:1.确定业务过程,比如资产到货。2.声明业务过程的粒度,例如到货清单每一行。3.确定维度,例如到货日期,供应商,资产,部门等。4.确定事实,比如资产价格,数量,运费,折扣等可度量的事实。建模过程一般需要业务部门的参与,首先确定业务活动价值链,比如:
之后创建业务总线矩阵,比如:
之后是创建高层模型,例如:
最后创建维度模型,例如
关于数仓建设,可以看”数据仓库工具箱维度建模权威指南“这本书,里面有大量案例来解释维度建模方法,缓慢变化维,事务事实表,周期事实表,累加事实表等比较重要的概念。
数据模型创建完成就需要对数据进行清洗转换,对于结构化数据一般使用SQL脚本,KETTLE也是支持SQL脚本的,公司的数据平台产品也是集成KETTLE的,使用公司的产品不仅可以利用kettle进行数据开发,还可以进行元数据管理,数据共享等,这是单纯使用KETTLE做不到的。
但是对于非结构化数据,使用SQL就很困难了,这时候还需要使用代码实现了,例如日志数据,爬虫抓取的数据,其结构都是很乱的,使用SQL一般不能很好的进行处理。例如像下面这样:
defprice_clean(extendedData:String,promotion_price:String,priceText:String,coupon:String,shop_coupon:String):String={
var price = ""
//shop_coupon字段
var shop_coupon_price:Double = 0.0
if (!extendedData.equals("-1")) {
val nObject: JSONObject = JSON.parseObject(extendedData)
val string1: String =nObject.getOrDefault("firePhoenixExtending", "-1").toString
if (string1 != "-1") {
val nObject2: JSONObject = JSON.parseObject(string1)
val money = nObject2.getOrDefault("money","-1").toString
if (money != (-1)) {
price = money
}
}
}
else {
price = pricedeal(check_price(promotion_price,priceText))
if (!shop_coupon.equals("-1")){
val array1: JSONArray = JSON.parseArray(shop_coupon)
for (i <- 0 to array1.size()-1){
val str: String = array1.get(i).toString
if (str.contains("满")&&str.contains("元")){
val p1 = str.split("满")(1).split("元")(0)
val p2 = str.split("省")(1).split("元")(0)
if(isIntByRegex(p1)&&isIntByRegex(p2)){
//判断是否满足条件
if (pricedeal(price).toDouble>=p1.toDouble){
shop_coupon_price = p2.toDouble
}
}
}
}
price = (price.toDouble - shop_coupon_price).toString
}
if (!coupon.equals("-1")){
val p1 = coupon.split("满")(1).split("减")(0)
val p2 = coupon.split("满")(1).split("减")(1)
if (pricedeal(price).toDouble>=p1.toDouble){
price = (pricedeal(price).toDouble -(pricedeal(price).toDouble/p1.toDouble).toInt*p2.toDouble).toString
}
}
}
price
//math.round(2.3)
}
4.3 案例分享
Lambda架构是最常用的大数据架构,它是流批分离的,如下:
下面是一个案例分享,数据来源使用Kafkaproducer生产数据,经过SparkStreaming之后入到Mysql数据库。用于模拟上述流处理部分:
项目结构如下:
pom.xml依赖如下:
注意一下scala版本,需与本地scala版本保持一致,否则会报错application.properties配置文件如下:
mysql.url=jdbc:mysql://127.0.0.1:3306/sakila
mysql.username=root
mysql.password=XXX
kafka.bootstrap.servers=10.5.65.83:9092
kafka.input.topic=actor
input.file=E:shareactor.csv
配置文件中定义了mysql信息及Kafka信息,input.file是生产数据的来源。
PropertiesUtil.java工具类:
package com.shsnc.utils;
public class PropertiesUtil {
public static java.util.Properties get_properties(Stringfilename) throws IOException {
InputStream in =PropertiesUtil.class.getResourceAsStream(filename);
java.util.Properties props = new java.util.Properties();
try{
InputStreamReader inputStreamReader = newInputStreamReader(in, "UTF-8");
props.load(inputStreamReader);
}catch (IOException e){
e.printStackTrace();
}
return props;
}
}
工具类用于加载配置文件
MyProduce.java类用于生产数据到kafka:
package com.shsnc.kafka;
import com.shsnc.utils.PropertiesUtil;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.util.Properties;
public class MyProduce {
public void produce() throws Exception {
Properties pro =PropertiesUtil.get_properties("/application.properties");
//assign broker url
String brokerUrl =pro.getProperty("kafka.bootstrap.servers");
String topicName = pro.getProperty("kafka.input.topic");
String fileName = pro.getProperty("input.file");
//create an instance for properties to access theproducer configs
Properties props = new Properties();
props.put("bootstrap.servers", brokerUrl);
props.put("acks", "1");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
//create producer
Producer
try {
//file
BufferedReader br = new BufferedReader( newFileReader(new File(fileName)) );
try {
long pos = 0, count = 0;
String text = br.readLine();
while ( text != null ) {
pos += text.length() + 2;
producer.send( new ProducerRecord
count++;
text = br.readLine();
}
System.out.println( Long.toString(count) + "messages sent." );
}
finally {
br.close();
}
}
catch ( java.lang.Exception e ) {
e.printStackTrace();
}
finally {
producer.close();
}
}
}
ToMysql.java用于将topic数据存入到mysql的actor表中。
package com.shsnc.utils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.DriverManager;
import java.util.Properties;
public class ToMysql {
public static int write( ConsumerRecord
Properties pro =PropertiesUtil.get_properties("/application.properties");
//the jdbc url
String jdbcUrl = pro.getProperty("mysql.url");
//the user name
String user = pro.getProperty("mysql.username");
//the password
String password = pro.getProperty("mysql.password");
//the # of records puts
int numInserts = 0;
//check
if ( jdbcUrl == null || jdbcUrl.isEmpty() ) {
//error out
throw new Exception("The jdbc-url is notinitialized.");
}
Class.forName("com.mysql.jdbc.Driver");
//the connection object
Connection conn = DriverManager.getConnection(jdbcUrl, user,password);
try {
//flags
long passHead = 0;
//parse event record
String[] elements = records.value().split(",",-1 );
String sql = "insert intoactor1(actor_id,first_name,last_name,last_update)values("+elements[0]+","+elements[1]+","+elements[2]+","+elements[3]+")";
System.out.println(sql);
//String sql = "insert intoactor1(actor_id,first_name,last_name,last_update)values(1,b,c,2006-02-15 04:34:33)";
CallableStatement stmt = conn.prepareCall(sql);
try {
stmt.execute();
System.out.println("插入成功!");
numInserts++;
}
finally {
//close
stmt.close();
}
}
finally {
//close the connection
conn.close();
}
return numInserts;
}
}
KafkaSparkStream.java是通过spark实时读取kafka数据并调用toMysql将数据存入到Mysql中。
package com.shsnc.stream;
import com.shsnc.utils.PropertiesUtil;
import com.shsnc.utils.ToMysql;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import java.io.IOException;
import java.util.*;
public class KafkaSparkStream {
public static void getStream(JavaStreamingContext jsc) throwsIOException, InterruptedException {
Properties pro =PropertiesUtil.get_properties("/application.properties");
Map
kafkaParams.put("bootstrap.servers",pro.getProperty("kafka.bootstrap.servers"));
kafkaParams.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
kafkaParams.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
kafkaParams.put("group.id", "test");
kafkaParams.put("auto.offset.reset", "latest");
//kafkaParams.put("auto.offset.reset", "earliest");
kafkaParams.put("enable.auto.commit", "false");
Collection
JavaInputDStream
KafkaUtils.createDirectStream(
jsc,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.
);
System.out.println("stream");
JavaDStream jds = stream.map(consumerRecord ->ToMysql.write(consumerRecord));
jds.print();
jds.count();
}
}
Main.java是主函数。
package com.shsnc;
import com.shsnc.stream.KafkaSparkStream;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
public class Main {
public static void main(String[] args) throws Exception {
SparkConf conf = new SparkConf().
setAppName("SparkStreamingOnKafkaDirected").setMaster("local[*]");
JavaStreamingContext jsc = new JavaStreamingContext(conf,Durations.seconds(10));
KafkaSparkStream.getStream(jsc);
jsc.start();
jsc.awaitTermination();
}
}
4.4 测试结果
Mysql目标表为actor1,有actor_id,first_name,last_name,last_update个字段
测试如下:
起初目标表为空:
kafka中topic中也无新数据产生:
现在启动主函数:
可见并没有数据插入到mysql,现在生产10条数据到kafka:
可见kakfa中已经新产生了10条数据,那么这新增的10条数据是否被spark读取并存入到了mysql呢?如图:
这是控制台日志,可见已经数据已经被spark读取到并插入到mysql中,再验证mysql的actor1表:
可见mysql中确实插入成功了,如果之后kafka中有新数据产生,最终都会被sparkstreaming处理并存储到mysql中,不过例子中没有偏移量的管理,如果程序终止(宕机)就可能导致数据的丢失。
使用kettle的批处理案例如下:
批处理中也包含了数据抽取,数据清洗转换,数据装载的过程。Kappa架构去除了批处理,只保留了流处理,而Flink可以实现批流一体,感觉Flink可以很好的实现Kappa架构。
数据的最终目的还是应用,数据主要用于数据分析(报表,应用系统等)、数据共享、业务创新(反哺业务)、机器学习(预测、推荐系统等)。
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/130062.html
摘要:年月日日,由高可用架构技术社区联合麦思博有限公司共同主办的全球互联网架构大会在上海光大会展中心成功举行。至此,全球互联网架构大会完美落幕。 showImg(https://segmentfault.com/img/bV1mnC?w=800&h=533); 2017年12月22日-23日,由高可用架构技术社区联合麦思博(msup)有限公司共同主办的 GIAC全球互联网架构大会在上海光大会...
摘要:年月日日,由高可用架构技术社区联合麦思博有限公司共同主办的全球互联网架构大会在上海光大会展中心成功举行。至此,全球互联网架构大会完美落幕。 showImg(https://segmentfault.com/img/bV1mnC?w=800&h=533); 2017年12月22日-23日,由高可用架构技术社区联合麦思博(msup)有限公司共同主办的 GIAC全球互联网架构大会在上海光大会...
摘要:月日北京站在天使汇咖啡如期举行。董伟以非常接地气的方式介绍什么是云计算以及云计算平台任务调度系统,包括调度系统中有哪些任务和需求点以及框架设计。通过几个维度介绍云计算平台资源调度是如何进化等。 8 月 29 日 SegmentFault D-Day 北京站在天使汇 DotGeek 咖啡如期举行。本次沙龙邀请到 API Cloud CTO 邹达、QingCloud 高级工程师 Ray、灵...
摘要:北京时间月日月日,由和中国国际人才交流基金会联合主办的第七届全球软件案例研究峰会简称在北京国家会议中心圆满落幕。本届峰会,来自阿里美团百度平安银行等企业的讲师分别从企业转型及研发效能方面分享敏捷和的实践细节和操作经验。 北京时间11月30日-12月3日,由msup和中国国际人才交流基金会联合主办的第七届全球软件案例研究峰会(简称:TOP100summit)在北京国家会议中心圆满落幕。T...
阅读 1251·2023-01-11 13:20
阅读 1566·2023-01-11 13:20
阅读 1019·2023-01-11 13:20
阅读 1702·2023-01-11 13:20
阅读 3973·2023-01-11 13:20
阅读 2546·2023-01-11 13:20
阅读 1356·2023-01-11 13:20
阅读 3494·2023-01-11 13:20