一丶目标
- 掌握sparksql原理
- 掌握DataFrame和DataSet的数据结构和使用方式
- 使用spark sql进行相应的编程
shark
- 基于spark的大数据规模底层数据仓库
- 依赖于hive底层代码,依赖于spark的版本
- 随着hive和spark版本的升级,对于shark的优化和维护带来了很大的影响
二丶 Spark SQL
1. spark SQL介绍
概述
- Spark SQL 是spark用来处理结构化数据的一个模块,提供了一个编程抽象DataFrame,并且可以做为分布式SQL查询引擎的作用
- 使用: SQL丶 DataFrame API丶 Datasets API
特性
- Intergrated (易整合)
- 可以使用sql,dataFrame,dataSet来操作
- 也可以使用4种语言来编程(java/scala/phthon/R)
- Uniform Data Access (统一的数据访问方式)
- Hive Intergration (兼容 Hive)
- Standard Connectivity (标准的数据库链接)
2. DataFrame
概述
- DataFrame是sparksql的底层抽象.可以理解为关系数据库中的二维表格,具有schema描述信息.spark1.3引入
- 在spark中,DataFrame是一种以RDD为基础的分布式数据集
DataFrame与RDD的优缺点比较
RDD
- 优点
- 编译时类型安全
- 面相对象的编程风格
- 缺点
- 序列化和反序列化的性能开销大 — 无论是集群间的通信或者是IO操作,都会进行数据和结构的序列化和反序列化操作
- GC的性能开销 — 频繁的创建和销毁对象
DataFrame
- 通过引入schema和off-heap(指不在堆的内存,使用操作系统上的内存),解决了RDD的缺点
- Spark通过schema就能读懂数据,在通信和IO时只需要序列化和反序列化,二结构的部分可以省略
- 通过off-heap可以快速操作数据,避免大量的GC
- 但是丢失了RDD的优点,DataFrame是类型不安全的,API也不是面向对象风格
DataFrame创建方式
准备数据
1 | person.txt |
读取文本文件创建DataFrame
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16val rdd1 = sc.textFile("person.txt").map(_.split(" "))
case class People(id:Int,name:String,age:Int)
//rdd1 与样例类关联
val rdd2 = rdd1.map(x=>People(x(0).toInt,x(1),x(2).toInt))
// 将RDD转换成DataFrame
val dataFrame = rdd2.toDF
//DataFrame处理
dataFrame.show
dataFrame.printSchema
//第二种方式,利用sparkSession
val df = spark.read.text("xx.txt").show()读取jaon文件 创建DataFrame
1
spark.read.json("xx.json").show()
读取 parquet文件 创建DataFrame
1
spark.read.parquet("xx.parquet").show()
Spark session创建
1
spark.createDataSpark()
3. DataFrame常用操作
- DSL : 特定领域的语言
- SQL :
- 需要将dataframe注册成一张表 registerTemple
- spark.sql(“sql语句”)
4. Dataset
4.1 概述
- 分布式数据集合,在spark1.6中添加的新的接口
- 集中了RDD的优点(强类型和可以用lambda函数)
4.2 DataFrame丶Dataset丶RDD的区别
Dataset包含了DataFrame的功能,在spark2.0 中两者统一,DataFrame表示为Dataset[Row],即DataSet的子集
- DataSet可以在编译时检查
- 面相对象的编程接口
4.3 DataFrame和Dataset相互转换
DataFrame转为DataSet
df.as[ElementType]
Dataset转为DataFrame
ds.toDF()
4.4 创建Dataset
- 通过spark.createDataset创建
- 通toDS方法生成DataSet
- 通过DataFrame转换
三丶 以编程的方式执行Spark SQL查询
1.
通过反射机制
1 | //todo: 利用sparkSQL来创建DataFrame---反射机制(case class) |
通过StructType直接指定Schema
- 将RDD转换为包含ROW对象的RDD
- 基于structType类型创建schema,与第一步创建的RDD相匹配
- 通过sparkSession的createDataFrame
1 | //todo: 利用sparkSql 将RDD转换为DataFrame --- 通过StructType直接指定Schema |
2. 利用sparkSQL操作HIVE SQL
1 | val warehouseLocation = "D:\\workSpace_IDEA_NEW\\day2017-10-12\\spark-warehouse" |
四丶数据源
1. sparkSQL从mysql中加载数据
1 | //todo:1、创建sparkSession对象 |
2. sparkSQL将数据写入到mysql中
1 | //todo:1、创建sparkSession对象 |