本篇内容介绍了“如何通过map操作看RDD的Map过程”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!
成都创新互联服务项目包括永济网站建设、永济网站制作、永济网页制作以及永济网络营销策划等。多年来,我们专注于互联网行业,利用自身积累的技术优势、行业经验、深度合作伙伴关系等,向广大中小型企业、政府机构等提供互联网行业的解决方案,永济网站推广取得了明显的社会效益与经济效益。目前,我们服务的客户以成都为中心已经辐射到永济省份的部分城市,未来相信会继续扩大服务区域并继续获得客户的支持与信任!
RDD中的map,flatMap等操作是怎么串在一起形成DAG图的呢?这是个很重要的问题,理解了这一点才能更好的理解Spark的内核实现。本文通过map过程来试图解释这一点。
先看看RDD的一个子类:MapPartitionsRDD,它会用在map函数场景下。
它的定义:
private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag]( var prev: RDD[T], f: (TaskContext, Int, Iterator[T]) => Iterator[U], // (TaskContext, partition index, iterator) preservesPartitioning: Boolean = false, isFromBarrier: Boolean = false, isOrderSensitive: Boolean = false) extends RDD[U](prev)
prev是父RDD,就是父类RDD的入参,在后面的代码里就是firstParent。
F代表了map函数的定义,其中第二个Int参数是分区索引号。我们先不管这个f入参怎么传进来的,先看看MapPartitionsRDD需要做哪些事。
前面说过,对于RDD来说,最重要的函数就是compute,MapPartitionsRDD的compute方法定义:
override def compute(split: Partition, context: TaskContext): Iterator[U] = f(context, split.index, firstParent[T].iterator(split, context))
很明确,就是用当前的solit分区来执行入参的f函数!
那么,这个MapPartitionsRDD是怎么产生的呢?原来是在RDD类中的map函数产生的:
def map[U: ClassTag](f: T => U): RDD[U] = withScope { val cleanF = sc.clean(f) new MapPartitionsRDD[U, T](this, (_, _, iter) => iter.map(cleanF)) }
这几行代码什么意思?这里还是需要好好分析一下的。
对照MapPartitionsRDD的定义,我们知道:
(_, _, iter) => iter.map(cleanF)
里面的_,_代表TaskContext和分区索引,因为在MapPartitionsRDD的compute方法中已经有了split入参和context入参,所以在RDD中就不需要传这两个参数了。
iter代表要处理的数据集,在MapPartitionsRDD中的compute方法中定义为:
firstParent[T].iterator(split, context)
函数就是第一个父类RDD的split分区的数据集。这里就很清楚了,对这个数据集做cleanF操作(也就是sc.clean之后的map函数,sc.clean是去掉不能序列号的字节码的意思,保证可以序列化后分发到其他节点执行)。
“如何通过map操作看RDD的Map过程”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注创新互联网站,小编将为大家输出更多高质量的实用文章!