Spark的RDD基础操作

简单算子

  • 1.构造一些数据
  • 2.用map算子进行一些操作
  • 3.用mapPartitions算子进行操作
  • 4.调用zipwithpartition算子标记分区
  • 5.调用foreachPartiton进行输出
  • 6.调用glom算子收集每个分区的数据
  • 7.调用flodleft函数处理Array
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    val conf = new SparkConf().setAppName("SimpleRDD").setMaster("local[*]")
    val sc = new SparkContext(conf)
    /**
    * 1.构造一些数据
    * 2.用map算子进行一些操作
    * 3.用mapPartitions算子进行操作
    * 4.调用zipwithpartition算子标记分区
    * 5.调用foreachPartiton进行输出
    * 6.调用glom算子收集每个分区的数据
    * 7.调用flodleft函数处理Array
    */
    def printRDD(rdd:RDD[_])={
    val str=rdd.collect().mkString(" , ")
    println(str)
    }
    val numberRDD=sc.parallelize(1 until 10,2)
    println("查看原始的RDD中的每一个元素:")
    // numberRDD.foreach(println)
    // printRDD(numberRDD)
    val numberRDD10=numberRDD.map(_*10)
    printRDD(numberRDD10)
    val numberRDDLess=numberRDD10.mapPartitions(iter=> {
    val arr=new ArrayBuffer[Double]
    while (iter.hasNext) {
    val value=iter.next().toDouble/10
    arr+=value
    }
    arr.iterator
    })
    printRDD(numberRDDLess)
    // glom 是把每个分区里面的元素放到一个list里面
    val partitions=numberRDDLess.glom()
    println("构造数据的时候创建了 "+numberRDDLess.partitions.size+" 个分区")
    println("目前经过glom操作后,还有: "+partitions.count()+" 个分区,只是每个分区的数据存到到一个list中去了")
    partitions.foreach(arr=>{
    println("分区数据:"+arr.mkString(" , "))
    })
    val numbers=partitions.zipWithIndex()
    numbers.foreach{
    case (arr,v)=>
    println("第 "+ v+"个分区数据: "+arr.mkString(" , "))
    }
    partitions.foreach(arr=>{
    println("分区内容: "+
    arr.foldLeft("")((x,y)=>x+" "+y))
    })

Spark懒加载实践

通过例子说明,spark的确是当执行了一个action算子,才会生成一个job.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
val conf = new SparkConf().setAppName("Computations").setMaster("local[*]")
val sc = new SparkContext(conf)
// 构造一个数据集
val numbers = sc.parallelize(1 to 10, 4)
val bigger = numbers.map(n => n * 100)
val biggerStill = bigger.map(n => n + 1)
println("调用 toDebugString 算子去查看经过几次转换后,依赖关系是什么样:")
println(biggerStill.toDebugString)
// 进行一次reduce操作
val s = biggerStill.reduce(_ + _)
println("sum = " + s)
println("numbersRDD的id = " + numbers.id)
println("biggerRDD的id = " + bigger.id)
println("biggerStillRDD的id = " + biggerStill.id)
println("查看biggerStill RDD 依赖继承关系: ")
showDependency(biggerStill)
val moreNumbers = bigger ++ biggerStill
println("moreNumbers的依赖继承关系: ")
println(moreNumbers.toDebugString)
println("moreNumbers: id=" + moreNumbers.id)
showDependency(moreNumbers)
moreNumbers.cache()
// cache操作可能会丢失数据,而且并没有发生依赖的变化
println("cached moreNumbers的依赖继承关系(并没有变化): ")
println(moreNumbers.toDebugString)
println("执行 cache 操作后,moreNumbers的依赖继承关系: ")
showDependency(moreNumbers)
println("检查一下moreNumbers有没有设置检查点? : " + moreNumbers.isCheckpointed)
sc.setCheckpointDir("/tmp/sparkcps")
moreNumbers.checkpoint()
println("现在执行了checkpoint 检查一下moreNumbers有没有设置检查点? : " + moreNumbers.isCheckpointed)
moreNumbers.count()
println("现在执行了一个count操作 检查一下moreNumbers有没有设置检查点? : " + moreNumbers.isCheckpointed)
println(moreNumbers.toDebugString)
println("做了以上操作后,moreNumbers的依赖继承关系: ")
showDependency(moreNumbers)
println("这里不应该抛异常...")
println("因为spark是懒加载,只有遇到action算子的时候,才会开始生成job开始调度计算....")
val thisWillBlowUp = numbers map {
case (7) => {
throw new Exception
}
case (n) => n
}
println("异常应该在这里抛出来...")
try {
println(thisWillBlowUp.count())
} catch {
case (e: Exception) => println("Nice,果真在这里抛异常了...")
}
}
// 利用递归函数来输出rdd的依赖继承关系
def showDependency[T](rdd: RDD[T]): Unit = {
showDependency(rdd, 0)
}
private def showDependency[T](rdd: RDD[T], length: Int): Unit = {
println("".padTo(length, ' ') + "RDD id= " + rdd.id)
rdd.dependencies.foreach(dep => {
showDependency(dep.rdd, length + 1)
})
}

#

Donate comment here