RDD操作详解
启动spark-shell
spark-shell –master spark://hdp-node-01:7077
基本转换
1) map
map是对RDD中的每个元素都执行一个指定的函数来产生一个新的RDD。 任何原RDD中的元素在新RDD中都有且只有一个元素与之对应。
举例:
1 | scala> val a =sc.parallelize(1 to 9, 3) |
上述例子中把原RDD中每个元素都乘以2来产生一个新的RDD。
2) filter
filter 是对RDD中的每个元素都执行一个指定的函数来过滤产生一个新的RDD。 任何原RDD中的元素在新RDD中都有且只有一个元素与之对应。
1 | val rdd =sc.parallelize(List(1,2,3,4,5,6)) |
3) flatMap
与map类似,区别是原RDD中的元素经map处理后只能生成一个元素,而原RDD中的元素经flatmap处理后可生成多个元素来构建新RDD。 举例:对原RDD中的每个元素x产生y个元素(从1到y,y为元素x的值)
1 | scala> val a = sc.parallelize(1to 4, 2) |
4) mapPartitions
mapPartitions是map的一个变种。map的输入函数是应用于RDD中每个元素,而mapPartitions的输入函数是应用于每个分区,也就是把每个分区中的内容作为整体来处理的。 它的函数定义为:
def mapPartitionsU:ClassTag: RDD[U]
f即为输入函数,它处理每个分区里面的内容。每个分区中的内容将以Iterator[T]传递给输入函数f,f的输出结果是Iterator[U]。最终的RDD由所有分区经过输入函数处理后的结果合并起来的。
举例:
1 | scala> val a =sc.parallelize(1 to 9, 3) |
上述例子中的函数myfunc是把分区中一个元素和它的下一个元素组成一个Tuple。因为分区中最后一个元素没有下一个元素了,所以(3,4)和(6,7)不在结果中。 mapPartitions还有些变种,比如mapPartitionsWithContext,它能把处理过程中的一些状态信息传递给用户指定的输入函数。还有mapPartitionsWithIndex,它能把分区的index传递给用户指定的输入函数。
5) mapPartitionsWithIndex
defmapPartitionsWithIndexU => Iterator[U],preservesPartitioning: Boolean = false)(implicit arg0: ClassTag[U]): RDD[U]
函数作用同mapPartitions,不过提供了两个参数,第一个参数为分区的索引。
1 | var rdd1 =sc.makeRDD(1 to 5,2) |
7) flatMapWith
flatMapWith与mapWith很类似,都是接收两个函数,一个函数把partitionIndex作为输入,输出是一个新类型A;另外一个函数是以二元组(T,A)作为输入,输出为一个序列,这些序列里面的元素组成了新的RDD。它的定义如下:
def flatMapWithA:ClassTag, U: ClassTag(f: (T, A) => Seq[U]): RDD[U]
举例:
1 | scala> val a =sc.parallelize(List(1,2,3,4,5,6,7,8,9), 3) |
8) coalesce
defcoalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord:Ordering[T] = null): RDD[T]
该函数用于将RDD进行重分区,使用HashPartitioner。
第一个参数为重分区的数目,第二个为是否进行shuffle,默认为false;
以下面的例子来看:
1 | scala> var data= sc.parallelize(1 to 12, 3) |
9) repartition
defrepartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]
该函数其实就是coalesce函数第二个参数为true的实现
1 | scala> var data= sc.parallelize(1 to 12, 3) |
10) randomSplit
defrandomSplit(weights: Array[Double], seed: Long = Utils.random.nextLong):Array[RDD[T]]
该函数根据weights权重,将一个RDD切分成多个RDD。
该权重参数为一个Double数组
第二个参数为random的种子,基本可忽略。
1 | scala> var rdd= sc.makeRDD(1 to 12,12) |
11) glom
def glom():RDD[Array[T]]
该函数是将RDD中每一个分区中类型为T的元素转换成Array[T],这样每一个分区就只有一个数组元素。
1 | scala> var rdd= sc.makeRDD(1 to 10,3) |
12) union 并集
1 | val rdd1 =sc.parallelize(List(5, 6, 4, 3)) |
13) distinct
去重
val rdd1 =sc.parallelize(List(5, 6, 4, 3))
val rdd2 =sc.parallelize(List(1, 2, 3, 4))
//求并集
val rdd3 =rdd1.union(rdd2)
//去重输出
rdd3.distinct.collect
14) intersection 交集
val rdd1 =sc.parallelize(List(5, 6, 4, 3))
val rdd2 =sc.parallelize(List(1, 2, 3, 4))
//求交集
val rdd4 =rdd1.intersection(rdd2)
rdd4.collect
15) subtract
def subtract(other:RDD[T]): RDD[T]
defsubtract(other: RDD[T], numPartitions: Int): RDD[T]
defsubtract(other: RDD[T], partitioner: Partitioner)(implicit ord: Ordering[T] =null): RDD[T]
该函数返回在RDD中出现,并且不在otherRDD中出现的元素,不去重。
1 | val rdd1 =sc.parallelize(List(5, 6, 6, 4, 3)) |
16) subtractByKey
defsubtractByKeyW])(implicit arg0: ClassTag[W]): RDD[(K, V)]
defsubtractByKeyW], numPartitions: Int)(implicit arg0:ClassTag[W]): RDD[(K, V)]
defsubtractByKeyW], p: Partitioner)(implicit arg0:ClassTag[W]): RDD[(K, V)]
subtractByKey和基本转换操作中的subtract类似,只不过这里是针对K的,返回在主RDD中出现,并且不在otherRDD中出现的元素。
参数numPartitions用于指定结果的分区数
参数partitioner用于指定分区函数
1 | var rdd1 =sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2) |
17) groupbyKey
val rdd1 =sc.parallelize(List((“tom”, 1), (“jerry”, 3),(“kitty”, 2)))
val rdd2 =sc.parallelize(List((“jerry”, 2), (“tom”, 1),(“shuke”, 2)))
//求并集
val rdd4 = rdd1 union rdd2
//按key进行分组
val rdd5 = rdd4.groupByKey
rdd5.collect
18) reduceByKey
顾名思义,reduceByKey就是对元素为KV对的RDD中Key相同的元素的Value进行reduce,因此,Key相同的多个元素的值被reduce为一个值,然后与原RDD中的Key组成一个新的KV对。
举例:
1 | val rdd1 =sc.parallelize(List(("tom", 1), ("jerry", 3),("kitty", 2))) |
19) sortByKey
将List((“tom”, 1), (“jerry”, 3),(“kitty”, 2), (“shuke”, 1))和List((“jerry”, 2),(“tom”, 3), (“shuke”, 2), (“kitty”, 5))做wordcount,并按名称排序
val rdd1 =sc.parallelize(List((“tom”, 1), (“jerry”, 3),(“kitty”, 2), (“shuke”, 1)))
val rdd2 =sc.parallelize(List((“jerry”, 2), (“tom”, 3),(“shuke”, 2), (“kitty”, 5)))
val rdd3 = rdd1.union(rdd2)
//按key进行聚合
val rdd4 = rdd3.reduceByKey( + )
//false降序
val rdd5 = rdd4.sortByKey(false)
rdd5.collect
20) sortBy
将List((“tom”, 1), (“jerry”, 3),(“kitty”, 2), (“shuke”, 1))和List((“jerry”, 2),(“tom”, 3), (“shuke”, 2), (“kitty”, 5))做wordcount,并按数值排序
1 | val rdd1 =sc.parallelize(List(("tom", 1), ("jerry", 3),("kitty", 2), ("shuke", 1))) |
21) zip
def zipU(implicit arg0: ClassTag[U]): RDD[(T, U)]
zip函数用于将两个RDD组合成Key/Value形式的RDD,这里默认两个RDD的partition数量以及元素数量都相同,否则会抛出异常。
1 | scala> var rdd1= sc.makeRDD(1 to 5,2) |
22) zipPartitions
zipPartitions函数将多个RDD按照partition组合成为新的RDD,该函数需要组合的RDD具有相同的分区数,但对于每个分区内的元素数量没有要求。
该函数有好几种实现,可分为三类:
参数是一个**RDD**
def zipPartitionsB, V(f:(Iterator[T], Iterator[B]) => Iterator[V])(implicit arg0: ClassTag[B], arg1:ClassTag[V]): RDD[V]
def zipPartitionsB, V(f: (Iterator[T], Iterator[B]) =>Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[V]): RDD[V]
这两个区别就是参数preservesPartitioning,是否保留父RDD的partitioner分区信息
映射方法f参数为两个RDD的迭代器。
1 | scala> var rdd1 = sc.makeRDD(1 to 5,2) |
//rdd1两个分区中元素分布:
1 | scala> rdd1.mapPartitionsWithIndex{ |
//rdd2两个分区中元素分布
1 | scala> rdd2.mapPartitionsWithIndex{ |
//rdd1和rdd2做zipPartition
1 | scala> rdd1.zipPartitions(rdd2){ |
参数是两个**RDD**
def zipPartitionsB, C, V(f: (Iterator[T], Iterator[B], Iterator[C]) =>Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[C], arg2: ClassTag[V]):RDD[V]
def zipPartitionsB, C, V(f: (Iterator[T], Iterator[B],Iterator[C]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[C],arg2: ClassTag[V]): RDD[V]
用法同上面,只不过该函数参数为两个RDD,映射方法f输入参数为两个RDD的迭代器。
1 | var rdd1 = sc.makeRDD(1 to 5,2) |
//rdd3中个分区元素分布
1 | scala> rdd3.mapPartitionsWithIndex{ |
//三个RDD做zipPartitions
1 | scala> var rdd4 =rdd1.zipPartitions(rdd2,rdd3){ |
参数是三个**RDD**
def zipPartitionsB, C, D, V(f: (Iterator[T], Iterator[B], Iterator[C],Iterator[D]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[C],arg2: ClassTag[D], arg3: ClassTag[V]): RDD[V]
def zipPartitionsB, C, D, V(f: (Iterator[T],Iterator[B], Iterator[C], Iterator[D]) => Iterator[V])(implicit arg0:ClassTag[B], arg1: ClassTag[C], arg2: ClassTag[D], arg3: ClassTag[V]): RDD[V]
用法同上面,只不过这里又多了个一个RDD而已。
23) zipWithIndex
defzipWithIndex(): RDD[(T, Long)]
该函数将RDD中的元素和这个元素在RDD中的ID(索引号)组合成键/值对。
1 | scala> var rdd2=sc.makeRDD(Seq("A","B","R","D","F"),2) |
24) zipWithUniqueId
defzipWithUniqueId(): RDD[(T, Long)]
该函数将RDD中元素和一个唯一ID组合成键/值对,该唯一ID生成算法如下:
每个分区中第一个元素的唯一ID值为:该分区索引号,
每个分区中第N个元素的唯一ID值为:(前一个元素的唯一ID值) + (该RDD总的分区数)
看下面的例子:
1 | scala> var rdd1= sc.makeRDD(Seq("A","B","C","D","E","F"),2) |
键值转换
25) partitionBy
def partitionBy(partitioner: Partitioner):RDD[(K, V)]
该函数根据partitioner函数生成新的ShuffleRDD,将原RDD重新分区。
1 | scala> var rdd1 =sc.makeRDD(Array((1,"A"),(2,"B"),(3,"C"),(4,"D")),2) |
//查看rdd1中每个分区的元素
1 | scala> rdd1.mapPartitionsWithIndex{ |
//使用partitionBy重分区
1 | scala> var rdd2 = rdd1.partitionBy(neworg.apache.spark.HashPartitioner(2)) |
//查看rdd2中每个分区的元素
1 | scala> rdd2.mapPartitionsWithIndex{ |
26) mapValues
mapValues顾名思义就是输入函数应用于RDD中Kev-Value的Value,原RDD中的Key保持不变,与新的Value一起组成新的RDD中的元素。因此,该函数只适用于元素为KV对的RDD。
举例:
1 | val a =sc.parallelize(List("dog", "tiger", "lion","cat", "panther", " eagle"), 2) |
27) flatMapValues
flatMapValues类似于mapValues,不同的在于flatMapValues应用于元素为KV对的RDD中Value。每个一元素的Value被输入函数映射为一系列的值,然后这些值再与原RDD中的Key组成一系列新的KV对。
举例
1 | val a = sc.parallelize(List((1, 2), (3, 4),(5, 6))) |
28) combineByKey
def combineByKeyC=> C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K,C)]
def combineByKeyC=> C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C,numPartitions: Int): RDD[(K, C)]
def combineByKeyC=> C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C,partitioner: Partitioner, mapSideCombine: Boolean = true, serializer:Serializer = null): RDD[(K, C)]
该函数用于将RDD[K,V]转换成RDD[K,C],这里的V类型和C类型可以相同也可以不同。
其中的参数:
createCombiner:组合器函数,用于将V类型转换成C类型,输入参数为RDD[K,V]中的V,输出为C ,分区内相同的key做一次
mergeValue:合并值函数,将一个C类型和一个V类型值合并成一个C类型,输入参数为(C,V),输出为C,分区内相同的key循环做
mergeCombiners:分区合并组合器函数,用于将两个C类型值合并成一个C类型,输入参数为(C,C),输出为C,分区之间循环做
numPartitions:结果RDD分区数,默认保持原有的分区数
partitioner:分区函数,默认为HashPartitioner
mapSideCombine:是否需要在Map端进行combine操作,类似于MapReduce中的combine,默认为true
看下面例子:
1 | scala> var rdd1 = sc.makeRDD(Array(("A",1),("A",2),("B",1),("B",2),("C",1))) |
其中三个映射函数分别为:
createCombiner: (V) => C
(v : Int) => v + “” //在每一个V值后面加上字符,返回C类型(String)
mergeValue: (C, V) => C
(c : String, v : Int) => c + “@” + v //合并C类型和V类型,中间加字符@,返回C(String)
mergeCombiners: (C, C) => C
(c1 : String, c2 : String) => c1 + “$” + c2 //合并C类型和C类型,中间加$,返回C(String)
其他参数为默认值。
最终,将RDD[String,Int]转换为RDD[String,String]。
再看例子:
1 | rdd1.combineByKey( |
最终将RDD[String,Int]转换为RDD[String,List[Int]]。
29) foldByKey
def foldByKey(zeroValue: V)(func: (V, V)=> V): RDD[(K, V)]
def foldByKey(zeroValue: V, numPartitions:Int)(func: (V, V) => V): RDD[(K, V)]
def foldByKey(zeroValue: V, partitioner:Partitioner)(func: (V, V) => V): RDD[(K, V)]
该函数用于RDD[K,V]根据K将V做折叠、合并处理,其中的参数zeroValue表示先根据映射函数将zeroValue应用于V,进行初始化V,再将映射函数应用于初始化后的V.
例子:
1 | scala> var rdd1 =sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("C",1))) |
再看:
1 | scala> rdd1.foldByKey(2)(+).collect |
再看乘法操作:
1 | scala> rdd1.foldByKey(0)(*).collect |
30) reduceByKeyLocally
def reduceByKeyLocally(func: (V, V) =>V): Map[K, V]
该函数将RDD[K,V]中每个K对应的V值根据映射函数来运算,运算结果映射到一个Map[K,V]中,而不是RDD[K,V]。
1 | scala> var rdd1 = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("C",1))) |
31) cogroup 和 groupByKey 的区别
1 | valrdd1 = sc.parallelize(List(("tom", 1), ("tom", 2),("jerry", 3), ("kitty", 2))) |
32) join
1 | val rdd1 =sc.parallelize(List(("tom", 1), ("jerry", 3),("kitty", 2))) |
33) leftOuterJoin
def leftOuterJoinW]):RDD[(K, (V, Option[W]))]
def leftOuterJoinW],numPartitions: Int): RDD[(K, (V, Option[W]))]
def leftOuterJoinW],partitioner: Partitioner): RDD[(K, (V, Option[W]))]
leftOuterJoin类似于SQL中的左外关联left outer join,返回结果以前面的RDD为主,关联不上的记录为空。只能用于两个RDD之间的关联,如果要多个RDD关联,多关联几次即可。
参数numPartitions用于指定结果的分区数
参数partitioner用于指定分区函数
1 | var rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2) |
34) rightOuterJoin
def rightOuterJoinW]):RDD[(K, (Option[V], W))]
def rightOuterJoinW],numPartitions: Int): RDD[(K, (Option[V], W))]
def rightOuterJoinW],partitioner: Partitioner): RDD[(K, (Option[V], W))]
rightOuterJoin类似于SQL中的有外关联right outer join,返回结果以参数中的RDD为主,关联不上的记录为空。只能用于两个RDD之间的关联,如果要多个RDD关联,多关联几次即可。
参数numPartitions用于指定结果的分区数
参数partitioner用于指定分区函数
1 | var rdd1 =sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2) |
Action 操作
35) first
def first(): T
first返回RDD中的第一个元素,不排序。
1 | scala> var rdd1 =sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2) |
36) count
def count(): Long
count返回RDD中的元素数量。
1 | scala> var rdd1 =sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2) |
37) reduce
def reduce(f: (T, T) ⇒ T): T
根据映射函数f,对RDD中的元素进行二元计算,返回计算结果。
1 | scala> var rdd1 = sc.makeRDD(1 to 10,2) |
collect
def collect(): Array[T]
collect用于将一个RDD转换成数组。
1 | scala> var rdd1 = sc.makeRDD(1 to 10,2) |
38) take
def take(num: Int): Array[T]
take用于获取RDD中从0到num-1下标的元素,不排序。
1 | scala> var rdd1 = sc.makeRDD(Seq(10, 4,2, 12, 3)) |
39) top
def top(num: Int)(implicit ord:Ordering[T]): Array[T]
top函数用于从RDD中,按照默认(降序)或者指定的排序规则,返回前num个元素。
1 | scala> var rdd1 = sc.makeRDD(Seq(10, 4,2, 12, 3)) |
40) takeOrdered
def takeOrdered(num: Int)(implicit ord:Ordering[T]): Array[T]
takeOrdered和top类似,只不过以和top相反的顺序返回元素。
1 | scala> var rdd1 = sc.makeRDD(Seq(10, 4,2, 12, 3)) |
41) aggregate
def aggregateU(seqOp: (U,T) ⇒ U, combOp: (U, U) ⇒ U)(implicit arg0: ClassTag[U]): U
aggregate用户聚合RDD中的元素,先使用seqOp将RDD中每个分区中的T类型元素聚合成U类型,再使用combOp将之前每个分区聚合后的U类型聚合成U类型,特别注意seqOp和combOp都会使用zeroValue的值,zeroValue的类型为U。
1 | var rdd1 = sc.makeRDD(1 to 10,2) |
##第一个分区中包含5,4,3,2,1
##第二个分区中包含10,9,8,7,6
1 | scala> rdd1.aggregate(1)( |
结果为什么是58,看下面的计算过程:
##先在每个分区中迭代执行 (x : Int,y : Int) => x + y 并且使用zeroValue的值1
##即:part_0中 zeroValue+5+4+3+2+1 =1+5+4+3+2+1 = 16
## part_1中 zeroValue+10+9+8+7+6 = 1+10+9+8+7+6 = 41
##再将两个分区的结果合并(a : Int,b : Int) => a + b ,并且使用zeroValue的值1
##即:zeroValue+part_0+part_1 = 1 + 16 + 41 = 58
再比如:
1 | scala> rdd1.aggregate(2)( |
##这次zeroValue=2
##part_0中 zeroValue+5+4+3+2+1 = 2+5+4+3+2+1 = 17
##part_1中 zeroValue+10+9+8+7+6 = 2+10+9+8+7+6 = 42
##最后:zeroValuepart_0part_1 = 2 17 42 = 1428
因此,zeroValue即确定了U的类型,也会对结果产生至关重要的影响,使用时候要特别注意。
42) fold
def fold(zeroValue: T)(op: (T, T) ⇒ T): T
fold是aggregate的简化,将aggregate中的seqOp和combOp使用同一个函数op。
1 | var rdd1 = sc.makeRDD(1 to 10, 2) |
##结果同上面使用aggregate的第一个例子一样,即:
1 | scala> rdd1.aggregate(1)( |
43) lookup
def lookup(key: K): Seq[V]
lookup用于(K,V)类型的RDD,指定K值,返回RDD中该K对应的所有V值。
1 | scala> var rdd1 =sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("C",1))) |
44) countByKey
def countByKey(): Map[K, Long]
countByKey用于统计RDD[K,V]中每个K的数量。
1 | scala> var rdd1 =sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("B",3))) |
45) foreach
def foreach(f: (T) ⇒ Unit): Unit
foreach用于遍历RDD,将函数f应用于每一个元素。
但要注意,如果对RDD执行foreach,只会在Executor端有效,而并不是Driver端。
比如:rdd.foreach(println),只会在Executor的stdout中打印出来,Driver端是看不到的。
我在Spark1.4中是这样,不知道是否真如此。
这时候,使用accumulator共享变量与foreach结合,倒是个不错的选择。
1 | scala> var cnt = sc.accumulator(0) |
46) foreachPartition
def foreachPartition(f: (Iterator[T]) ⇒ Unit): Unit
foreachPartition和foreach类似,只不过是对每一个分区使用f。
1 | scala> var rdd1 = sc.makeRDD(1 to 10,2) |
47) sortBy
def sortByK ⇒ K, ascending: Boolean = true, numPartitions: Int =this.partitions.length)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]
sortBy根据给定的排序k函数将RDD中的元素进行排序。
1 | scala> var rdd1 =sc.makeRDD(Seq(3,6,7,1,2,0),2) |
48) saveAsTextFile
def saveAsTextFile(path: String): Unit
def saveAsTextFile(path: String, codec:Class[_ <: CompressionCodec]): Unit
saveAsTextFile用于将RDD以文本文件的格式存储到文件系统中。
codec参数可以指定压缩的类名。
var rdd1 = sc.makeRDD(1 to 10,2)
scala>rdd1.saveAsTextFile(“hdfs://cdh5/tmp/lxw1234.com/“) //保存到HDFS
hadoop fs -ls /tmp/lxw1234.com
Found 2 items
-rw-r–r– 2 lxw1234 supergroup 02015-07-10 09:15 /tmp/lxw1234.com/_SUCCESS
-rw-r–r– 2 lxw1234 supergroup 21 2015-07-1009:15 /tmp/lxw1234.com/part-00000
hadoop fs -cat /tmp/lxw1234.com/part-00000
注意:如果使用rdd1.saveAsTextFile(“file:///tmp/lxw1234.com”)将文件保存到本地文件系统,那么只会保存在Executor所在机器的本地目录。
//指定压缩格式保存
rdd1.saveAsTextFile(“hdfs://cdh5/tmp/lxw1234.com/“,classOf[com.hadoop.compression.lzo.LzopCodec])
hadoop fs -ls /tmp/lxw1234.com
-rw-r–r– 2 lxw1234 supergroup 02015-07-10 09:20 /tmp/lxw1234.com/_SUCCESS
-rw-r–r– 2 lxw1234 supergroup 712015-07-10 09:20 /tmp/lxw1234.com/part-00000.lzo
hadoop fs -text/tmp/lxw1234.com/part-00000.lzo
49) saveAsSequenceFile
saveAsSequenceFile用于将RDD以SequenceFile的文件格式保存到HDFS上。
用法同saveAsTextFile。
50) saveAsObjectFile
def saveAsObjectFile(path: String): Unit
saveAsObjectFile用于将RDD中的元素序列化成对象,存储到文件中。
对于HDFS,默认采用SequenceFile保存。
var rdd1 = sc.makeRDD(1 to 10,2)
rdd1.saveAsObjectFile(“hdfs://cdh5/tmp/lxw1234.com/“)
hadoop fs -cat /tmp/lxw1234.com/part-00000
SEQ!org.apache.hadoop.io.NullWritable”org.apache.hadoop.io.BytesWritableT
51) saveAsHadoopFile
def saveAsHadoopFile(path: String,keyClass: Class[], valueClass: Class[], outputFormatClass: Class[ <:OutputFormat[, ]], codec: Class[ <: CompressionCodec]): Unit
def saveAsHadoopFile(path: String,keyClass: Class[], valueClass: Class[], outputFormatClass: Class[ <:OutputFormat[, ]], conf: JobConf = …, codec: Option[Class[ <:CompressionCodec]] = None): Unit
saveAsHadoopFile是将RDD存储在HDFS上的文件中,支持老版本Hadoop API。
可以指定outputKeyClass、outputValueClass以及压缩格式。
每个分区输出一个文件。
var rdd1 = sc.makeRDD(Array((“A”,2),(“A”,1),(“B”,6),(“B”,3),(“B”,7)))
importorg.apache.hadoop.mapred.TextOutputFormat
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.IntWritable
rdd1.saveAsHadoopFile(“/tmp/lxw1234.com/“,classOf[Text],classOf[IntWritable],classOf[TextOutputFormat[Text,IntWritable]])
rdd1.saveAsHadoopFile(“/tmp/lxw1234.com/“,classOf[Text],classOf[IntWritable],classOf[TextOutputFormat[Text,IntWritable]],
classOf[com.hadoop.compression.lzo.LzopCodec])
52) saveAsHadoopDataset
def saveAsHadoopDataset(conf: JobConf):Unit
saveAsHadoopDataset用于将RDD保存到除了HDFS的其他存储中,比如HBase。
在JobConf中,通常需要关注或者设置五个参数:
文件的保存路径、key值的class类型、value值的class类型、RDD的输出格式(OutputFormat)、以及压缩相关的参数。
##使用saveAsHadoopDataset将RDD保存到HDFS中
1 | import org.apache.spark.SparkConf |
##保存数据到HBASE
HBase建表:
create ‘lxw1234′,{NAME => ‘f1′,VERSIONS =>1},{NAME => ‘f2′,VERSIONS => 1},{NAME => ‘f3′,VERSIONS => 1}
1 | import org.apache.spark.SparkConf |
##结果:
hbase(main):005:0> scan ‘lxw1234’
ROW COLUMN+CELL
A column=f1:c1, timestamp=1436504941187, value=\x00\x00\x00\x02
B column=f1:c1, timestamp=1436504941187, value=\x00\x00\x00\x06
C column=f1:c1, timestamp=1436504941187, value=\x00\x00\x00\x07
3 row(s) in 0.0550 seconds
注意:保存到HBase,运行时候需要在SPARK_CLASSPATH中加入HBase相关的jar包。
53) saveAsNewAPIHadoopFile
def saveAsNewAPIHadoopFileF <:OutputFormat[K, V](implicit fm: ClassTag[F]): Unit
def saveAsNewAPIHadoopFile(path: String,keyClass: Class[], valueClass: Class[], outputFormatClass: Class[ <:OutputFormat[, _]], conf: Configuration = self.context.hadoopConfiguration):Unit
saveAsNewAPIHadoopFile用于将RDD数据保存到HDFS上,使用新版本Hadoop API。
用法基本同saveAsHadoopFile。
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import SparkContext._
importorg.apache.hadoop.mapreduce.lib.output.TextOutputFormat
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.IntWritable
var rdd1 =sc.makeRDD(Array((“A”,2),(“A”,1),(“B”,6),(“B”,3),(“B”,7)))
rdd1.saveAsNewAPIHadoopFile(“/tmp/lxw1234/“,classOf[Text],classOf[IntWritable],classOf[TextOutputFormat[Text,IntWritable]])
54) saveAsNewAPIHadoopDataset
def saveAsNewAPIHadoopDataset(conf:Configuration): Unit
作用同saveAsHadoopDataset,只不过采用新版本Hadoop API。
以写入HBase为例:
HBase建表:
1 | create ‘lxw1234′,{NAME => ‘f1′,VERSIONS =>1},{NAME => ‘f2′,VERSIONS => 1},{NAME => ‘f3′,VERSIONS => 1} |
完整的Spark应用程序:
1 | import org.apache.spark.SparkConf |
注意:保存到HBase,运行时候需要在SPARK_CLASSPATH中加入HBase相关的jar包。