Spark的RDD基础操作

2016/11/17 posted in  Spark

简单算子

  • 1.构造一些数据
  • 2.用map算子进行一些操作
  • 3.用mapPartitions算子进行操作
  • 4.调用zipwithpartition算子标记分区
  • 5.调用foreachPartiton进行输出
  • 6.调用glom算子收集每个分区的数据
  • 7.调用flodleft函数处理Array
    val conf = new SparkConf().setAppName("SimpleRDD").setMaster("local[*]")
    val sc = new SparkContext(conf)
    /**
      * 1.构造一些数据
      * 2.用map算子进行一些操作
      * 3.用mapPartitions算子进行操作
      * 4.调用zipwithpartition算子标记分区
      * 5.调用foreachPartiton进行输出
      * 6.调用glom算子收集每个分区的数据
      * 7.调用flodleft函数处理Array
      */
    def printRDD(rdd:RDD[_])={
      val str=rdd.collect().mkString(" , ")
      println(str)
    }
    val numberRDD=sc.parallelize(1 until 10,2)
    println("查看原始的RDD中的每一个元素:")
//    numberRDD.foreach(println)
//    printRDD(numberRDD)
    val numberRDD10=numberRDD.map(_*10)
    printRDD(numberRDD10)
    val numberRDDLess=numberRDD10.mapPartitions(iter=> {
      val arr=new ArrayBuffer[Double]
      while (iter.hasNext) {
        val value=iter.next().toDouble/10
          arr+=value
      }
      arr.iterator
    })
    printRDD(numberRDDLess)

    // glom 是把每个分区里面的元素放到一个list里面
    val partitions=numberRDDLess.glom()
    println("构造数据的时候创建了 "+numberRDDLess.partitions.size+" 个分区")
    println("目前经过glom操作后,还有: "+partitions.count()+" 个分区,只是每个分区的数据存到到一个list中去了")
    partitions.foreach(arr=>{
      println("分区数据:"+arr.mkString(" , "))
    })

    val numbers=partitions.zipWithIndex()
    numbers.foreach{
      case (arr,v)=>
        println("第 "+ v+"个分区数据: "+arr.mkString(" , "))
    }

    partitions.foreach(arr=>{
      println("分区内容: "+
      arr.foldLeft("")((x,y)=>x+" "+y))
    })

Spark懒加载实践

通过例子说明,spark的确是当执行了一个action算子,才会生成一个job.

    val conf = new SparkConf().setAppName("Computations").setMaster("local[*]")
    val sc = new SparkContext(conf)

    // 构造一个数据集
    val numbers = sc.parallelize(1 to 10, 4)
    val bigger = numbers.map(n => n * 100)
    val biggerStill = bigger.map(n => n + 1)

    println("调用 toDebugString 算子去查看经过几次转换后,依赖关系是什么样:")
    println(biggerStill.toDebugString)

    //    进行一次reduce操作
    val s = biggerStill.reduce(_ + _)

    println("sum = " + s)

    println("numbersRDD的id = " + numbers.id)
    println("biggerRDD的id = " + bigger.id)
    println("biggerStillRDD的id = " + biggerStill.id)
    println("查看biggerStill RDD 依赖继承关系: ")
    showDependency(biggerStill)

    val moreNumbers = bigger ++ biggerStill
    println("moreNumbers的依赖继承关系: ")
    println(moreNumbers.toDebugString)
    println("moreNumbers: id=" + moreNumbers.id)
    showDependency(moreNumbers)

    moreNumbers.cache()
    // cache操作可能会丢失数据,而且并没有发生依赖的变化
    println("cached moreNumbers的依赖继承关系(并没有变化): ")
    println(moreNumbers.toDebugString)
    println("执行 cache 操作后,moreNumbers的依赖继承关系: ")

    showDependency(moreNumbers)

    println("检查一下moreNumbers有没有设置检查点? : " + moreNumbers.isCheckpointed)
    sc.setCheckpointDir("/tmp/sparkcps")
    moreNumbers.checkpoint()
    println("现在执行了checkpoint 检查一下moreNumbers有没有设置检查点? : " + moreNumbers.isCheckpointed)
    moreNumbers.count()
    println("现在执行了一个count操作 检查一下moreNumbers有没有设置检查点? : " + moreNumbers.isCheckpointed)
    println(moreNumbers.toDebugString)
    println("做了以上操作后,moreNumbers的依赖继承关系: ")
    showDependency(moreNumbers)

    println("这里不应该抛异常...")
    println("因为spark是懒加载,只有遇到action算子的时候,才会开始生成job开始调度计算....")
    val thisWillBlowUp = numbers map {
      case (7) => {
        throw new Exception
      }
      case (n) => n
    }

    println("异常应该在这里抛出来...")
    try {
      println(thisWillBlowUp.count())
    } catch {
      case (e: Exception) => println("Nice,果真在这里抛异常了...")
    }

  }

  // 利用递归函数来输出rdd的依赖继承关系
  def showDependency[T](rdd: RDD[T]): Unit = {
    showDependency(rdd, 0)
  }

  private def showDependency[T](rdd: RDD[T], length: Int): Unit = {
    println("".padTo(length, ' ') + "RDD id= " + rdd.id)
    rdd.dependencies.foreach(dep => {
      showDependency(dep.rdd, length + 1)
    })
  }