enjoyhot
7/17/2016 - 5:45 AM

spark-notes

spark-notes

#基础笔记

标签(空格分隔): spark


基本操作


启动Spark

shell

/usr/local/spark/bin$ ./pyspark --executor-memory 4G --total-executor-cores 80

submit

/usr/local/spark/bin/spark-submit --executor-memory 4G --total-executor-cores 80 --py-files loadDataSetSpark.py mainSpark.py

认识RDD

RDD:弹性分布式数据集,表示分布在多个计算节点上可以并行操作的元素集合。 操作:转化操作(transformation)和行动操作(action),可通过返回值区别,转化操作返回RDD类型,行动操作为其它类型。

如以下lines就是一个RDD:

#sc = SparkContext(conf=conf),这个在启动shell中就作为全局变量存在,不用自行初始化
lines = sc.parallelize(['pandas','i like pandas'])
lines = sc.textFile('readme.md')

RDD.persist() # 能把这个RDD缓存,降低action计算的成本(因为每次新的action操作都要重新计算一次),对应的就有RDD.unpersist(),当内存吃紧时可以用。

RDD操作的基本流程: e.g

lines_data = sc.textFile('readme.md')
lines = lines_data.filter()
lines.persist()
lines.count()
lines.first()

注:

  • rdd.collect()操作是将数据存在单台机器的内存上;
  • rdd.filter(self.xxx)会分发整个self对象,可改为val=self.xxx;

基本函数

  • RDD操作: distinct() union(rdd) intersection(rdd) #交集 substract(rdd) #减去交集

  • 转化操作: 1.map(func) # 能实现list append 2.flatMap(func) # 能实现list expend

  • 行动操作: 1.reduce(fun) 2.top(n),take(n)

  • pair RDD转化操作: mapValues(func): rdd.mapValues(lambda x : x+1) # key不变,value+1 reduceByKey(func) #接收对相同的key的2个value参数 keys() values() combineByKey():该函数用于对key的值进行各种操作,相比其它ByKey更原生,计算(key,mean)例子如下:

sumCount = keyValue.combineByKey((lambda x: (x,1),
                                  (lambda x,value:(x[0] + y,x[1] + 1)),
                                  lambda x,y: (x[0]+y[0],x[1]+y[1])))
sumCount.map(lambda key,xy:(key,xy[0]/xy[1])).collectAsMap()
其它:
rdd.reduceByKey(func) == rdd.groupByKey().mapValues(lambda x:x.reduce(func))
rdd1.join(rdd2) #{(1,(2,3)),(2,(4,5)),...}
  • pair RDD行动操作: rdd.lookup(1) #{(1,2),(1,3),(2,3)} 返回[2,3]

读取与保存文件

1.读取本地数据并保存为一般格式

tow_state = sc.textFile('/user/gujw/town_state.csv')
tow_state.saveAsTextFile('/user/gujw/town_state.csv')

在50070端口可查看上传到分布式文件系统的情况

注:不用手动加hdfs前缀.

2.保存为Parquet格式 一种流行的列式存储格式,相对高效。

#from pyspark.sql import *
#sqlc = SQLContext(sc)
lines_rdd = sc.textFile('/user/gujw/town_state.csv').map(lambda line: line.split(","))
header = lines_rdd.first()
rdd = lines_rdd.filter(lambda x:x!=header)
tow_state_df = rdd.toDF(['id','town','state']) # ['id','town','state']
tow_state_df.show()
tow_state_df.saveAsParquetFile('/user/gujw/test/town_state.parquet')

假如要使用hdfs的parquet文件

from pyspark.sql import *
sqlc = SQLContext(sc)
df = sqlc.read.parquet("/user/gujw/test/town_state.parquet")
df.registerTempTable("dft")
sqlc.cacheTable("dft")
s = "select id,town from dft"
sqlc.sql(s).show()

取元素:

id_town_list = sqlc.sql(s).collect()
for item in id_town_list:
    print item[0]
    break

提高速度的方法


读取文件

RDD = sc.textFile(dir | part-*.txt) # 读取目录或正则匹配
pairRDD = sc.wholeTextFile(dir | part-*.txt)

wholeTextFile如果是读一个文件,一次读取所有行,返回rdd(filepath,contents)

代码逻辑层面

1.对数据分区,如pair RDD采用partitionBy(100)进行哈希分区,避免不必要的混洗,但注意需要persist持久化才能避免重新分区;

对RDD的结果有分区的是:cogroup(结果自动分区吧?,下同),groupWith,join,leftOuterJoin,rightOuterJoin,groupByKey,reduceByKey,combineByKey,partitionBy,sort,mapValues(父RDD需有分区),flatMapValues(父RDD需有分区)

2、自定义分区

import urlparse
def hash_domain(url):
    return hash(urlparse.urlparse(url).netloc)
rdd.partitionBy(100,hash_domain)