网站建设资讯

NEWS

网站建设资讯

Spark的transformation和action的操作学习笔记-创新互联

一、spark的transformation 和 action区别

创新互联IDC提供业务:绵阳主机托管,成都服务器租用,绵阳主机托管,重庆服务器租用等四川省内主机托管与主机租用业务;数据中心含:双线机房,BGP机房,电信机房,移动机房,联通机房。

Spark有一些基本的transformation 和 action的操作,其中transformation形成各类型的RDD,action不形成RDD,而是对RDD进行累加、合并、保存操作。

Spark的transformation 和 action的操作学习笔记

二、transformation 有哪些

transformation有map、filter、flatMap(与map不一样)、Sample、groupByKey、ReduceByKey、Union、Join、cogroup、crossProduct、mapValues、sort、partitionBy,共13种。还有sortByKey呢?

1、map:

val rdd = sc.parallelize(List(1,2,3,4,5,6))

val mapRdd = rdd.map(_*2)  //这是典型的函数式编程

mapRdd.collect()  //上面的map是transformation,到了这里的collect才开始执行,是action,返回一个Array   Array(2,4,6,8,10,12)

map(x=>(x,1)),将map(x)这样的,映射成map(x,1)这样的,一般用于对Key进行计数

2、filter

过滤,选择函数,

val filterRdd = mapRdd.filter(_ > 5)

filterRdd.collect() //返回所有大于5的数据的一个Array, Array(6,8,10,12)

3、flatmap加上reduceBykey

val wordcount = rdd.flatMap(_.split(' ')).map((_, 1)).reduceByKey(_+_)  //把每一行进行根据空格分割,然后flatMap会把多个list合并成一个list,最后把每个元素变成一个元组

//然后把具有相同key的元素的value进行相加操作,参考上面图片中的函数定义,针对reduceByKey,传入的函数是对value进行操作的。

wordcount.saveAsTextFile("/xxx/ss/aa")  //把结果存入文件系统

wordcount.collect //可以得到一个数组

4、groupByKey

对文件按照空格进行分割后,按照单词进行groupByKey分组

val wordcount=rdd.flatMap(_.split(' ')).map(_.1)).groupByKey

使用collect查看一下结果

wordcount.collect

5、Union

2个合并成1个

val rdd1 = sc.parallelize(List(('a',1),(‘a’, 2)))

val rdd2 = sc.parallelize(List(('b',1),(‘b’, 2)))

val result_union = rdd1 unionrdd2 //结果是把两个list合并成一个,List(('a',1),(‘a’, 2),('b',1),(‘b’, 2))

6、Join

笛卡尔积的干活,小组循环赛

val rdd1 = sc.parallelize(List(('a',1),(‘a’, 2), ('b', 3)))

val rdd2 = sc.parallelize(List(('a',4),(‘b’, 5)))

val result_union = rdd1 joinrdd2 //结果是把两个list做笛卡尔积,Array(('a', (1,4), ('a', (2,4), ('b', (3, 5)))

7、sortByKey

排序,非常好用的哈

val wordcount = rdd.flatMap(_split(' ')).map(_,1).reduceByKey(_+_).map(x => (x._2, x._1)).sortByKey(false).map(x => (x._2, x._1))

//其实完成了一个sort by value的过程, sortByKey(false),表示倒序排列

三、action有哪些

action有count、collect、reduce、lookup、save5种。

1、count

计算rdd的个数

val rdd = sc.textFile("/xxx/sss/ee")

rdd.count //计算行数

rdd.cache  //可以把rdd保留在内存里面

rdd.count //计算行数,但是因为上面进行了cache,这里速度会很快

2、collect

collect函数可以提取出所有rdd里的数据项

val rdd1=sc.parallelize(List(('a',1),('b',1)))

val rdd2=sc.parallelize(List(('c',1),('d',1)))

val result=rdd1 union rdd2

使用collect操作查看一下执行结果

3、reduce

map、reduce是hadoop的2个核心,map是映射,reduce是精简

val rdd = sc.parallelize(List(1,2,3,4))

rdd.reduce(_+_) //reduce是一个action,这里的结果是10

4、lookup

查找的干活

val rdd = sc.parallelize(List(('a',1),(‘a’, 2),('b',1),(‘b’, 2))

rdd.lookup("a") //返回一个seq, (1, 2) 是把a对应的所有元素的value提出来组成一个seq

5、save

查询搜索结果排名第 1 点击次序排在第 2 的数据

val rdd1 = sc.textFile("hdfs://192.168.0.10:9000/input/SogouQ2.txt").map(_.split("\t"))  //长度为6错误,好像日志不标准,有的为6,有的不是  .filter(_.length==6)

rdd1.count()

val rdd2=rdd1.filter(_(3).toInt==1).filter(_(4).toInt==2).count()

rdd2.saveAsTextFile("hdfs://192.168.0.10:9000/output/sogou1111/")

另外有需要云服务器可以了解下创新互联scvps.cn,海内外云服务器15元起步,三天无理由+7*72小时售后在线,公司持有idc许可证,提供“云服务器、裸金属服务器、高防服务器、香港服务器、美国服务器、虚拟主机、免备案服务器”等云主机租用服务以及企业上云的综合解决方案,具有“安全稳定、简单易用、服务可用性高、性价比高”等特点与优势,专为企业上云打造定制,能够满足用户丰富、多元化的应用场景需求。


当前文章:Spark的transformation和action的操作学习笔记-创新互联
本文来源:http://cdweb.net/article/giddi.html