在写下这个标题的时候已经下午了,看来只有半天时间了(汗
初衷以及Spark简介
最近找工作,发现很多公司都要求Hadoop、Spark,我平时也没有啥这方面的需求所以没学,而且这个东西真正用起来需要分布式集群。
于是买了本书《Spark快速大数据分析》,挺薄的,而且是三种语言写的(java,scala,python),如果只看Python,目测一天就能学完核心概念。
配置环境
学习一门新技术,环境配置估计就能把一堆人卡死在门外,不过这也意味着环境配好你也就离成功不远了,想想就有点小激动。
下载Spark
下载地址
https://spark.apache.org/downloads.html
书上是spark 1.2,包类型为hadoop 2.4
不过最新版是spark 2.2,包类型为hadoop 2.7
于是就下了最新版,一般来说,不按照教程的版本会出现各种问题
仔细看了书上的配置教程,好像也没做啥事,也就解压运行bin\pyspark
,然后正常用。
我照着做果然报错了,这和书上说的不一样。
原来还需要Hadoop
这就尴尬了,书上也没说咋配,于是参考了这篇,还是很简单的过程。
新建环境变量SPARK_HOME值为spark根目录
新建环境变量HADOOP_HOME值为hadoop根目录
spark下的bin目录和hadoop下的bin目录添加到系统变量path里
然后运行pyspark
果然还是报错了
参考了这篇解决了
原来是缺少winutils.exe
这个文件,应该是hadoop\bin
目录下的,需要到https://github.com/steveloughran/winutils 下载
然后执行命令来修改权限1
winutils chmod 777 /tmp/hive
终于解决了
直接执行命令pyspark
再顺带跑个书上的例子1
2
3lines = sc.textFile('README.md') #创建一个名为lines的RDD
lines.count() #统计RDD中的元素个数
lines.first() #这个RDD的第一个元素,也就是这个文件的第一行
顺带说下,可以直接在正常的python ide里面运行1
2
3
4
5from pyspark import SparkContext
sc = SparkContext('local')
lines = sc.textFile('README.md') #创建一个名为lines的RDD
lines.count() #统计RDD中的元素个数
lines.first() #这个RDD的第一个元素,也就是这个文件的第一行
RDD是个啥
弹性分布式数据集(Resilient Distributed Datasets)就是一种对数据的抽象,你不用去管底层数据是咋存放的,也不用管那些操作放到底层咋实现的,Spark会自动帮你做好一切,RDD就是Spark的核心。
RDD三种常见操作分为创建、转化以及行动。
创建操作
有两种创建方法,一种是textFile,也就是从本地文件读取,还有一种是parallelize,将传入的集合并行化。
textFile上面已经有了,下面是parallelize的代码。1
2
3from pyspark import SparkContext
sc = SparkContext('local')
lines = sc.parallelize(['hello','world','!']) #传入了一个list
这就是创建好了,暂时不知道啥用,接着往下看。
转化操作
转化操作,一般有map,filter,union等操作,也就是会返回一个RDD,不会产生实际计算,也就是惰性求值。
接着上面的parallelize继续写一个转化操作的例子1
Olines = lines.filter(lambda x:'o' in x) # 根据传入的函数来筛选
如果传入函数里包含对象成员,spark会把整个对象传过去,所以最好先单独放到一个局部变量里。
行动操作
行动操作需要返回计算机结果,比如count,take,collect等,每一次都是从头计算。
继续接上面的例子1
2
3
4
5
6print('lines has ',lines.count(),' elements.')
for line in lines.collect():
print(line)
print('Olines has ',Olines.count(),' elements.')
for Oline in Olines.collect():
print(Oline)
全部执行结果如下
由于转化操作不会计算,所以出错了也未必知道,这里可以简单调用count()方法,通过计算元素个数来简单测试
转化操作和行动操作的区别
- 转化操作返回值是RDD,行动操作的返回值则是其他类型
- 转化操作并不会真正计算,甚至读取数据也没执行,Spark内部是使用谱系图(lineage graph)来记录他们之间的关系的,而行动操作会从读取数据开始执行,也就是每次都是从头计算
持久化
上面提到的转化操作惰性求值,会导致每次行动操作都要重新计算一遍,这样会大大浪费计算资源。
这里Spark提供一种名为持久化的方法,持久化可以让RDD的数据存储于内存中,方便重复使用。
RDD对象直接调用persist或者cache,两者的区别在于cache只有一个默认的缓存级别,而persist可以设置其它缓存级别。
Spark的MapReduce
通过求平均值这个小例子简单介绍一下几个转化操作和行动操作的用法
这是第一种,用flatMap和reduce结合,flatMap把一个list里面的数字提取出来并且结构化为(sum,count)的形式,方便reduce计算1
2
3
4
5
6from pyspark import SparkContext
sc = SparkContext('local')
nums = sc.parallelize([[1,2,3],[4,5,6],[7,8,9],[10,11,12,13]])
nums = nums.flatMap(lambda x:[(a,1) for a in x])
result = nums.reduce(lambda x,y:(x[0]+y[0],x[1]+y[1]))
result = 1.0*result[0]/result[1]
这是第二种,用flatMap和aggregate结合,flatMap只是把list中的数字提取出来,aggregate一共有三个参数,第一个参数是初始化的累加器,每个节点一个累加器,第二个参数是一个函数,进行的是累加器与每个值之间的操作,第三个参数是也是一个函数,当执行完第二个参数后,进行累加器之间的操作。1
2
3
4
5
6
7nums = sc.parallelize([[1,2,3],[4,5,6],[7,8,9],[10,11,12,13]])
nums = nums.flatMap(lambda x:[a for a in x]) #把每个值提取出来
result = nums.aggregate((0,0), #设置累加器初始值
(lambda acc,value: (acc[0]+value,acc[1]+1)),
(lambda acc1,acc2: (acc1[0]+acc2[0],acc1[1]+acc2[1]))
)
result = 1.0*result[0]/result[1]
小结
Spark功能强大,编程思路与正常的单机数据处理略有不同,虽然接触了半天,但也算是弄懂了一些核心概念,收获不错。
参考
配置方面
https://spark.apache.org/downloads.html
http://blog.csdn.net/yiyouxian/article/details/51020334
http://www.jianshu.com/p/7b325155edab
http://blog.csdn.net/ydq1206/article/details/51922148
持久化
http://www.ccblog.cn/102.htm
http://blog.csdn.net/houmou/article/details/52491419