Beinsearch
12/1/2017 - 7:38 AM

spark

conf = SparkConf().setAppName("extractLogRepair") \
    .set("spark.executor.instances", "4") \
    .set("spark.executor.cores", "8")\
    .set("spark.executor.memory", "2g")
    .setMaster(...)

The master URL can be one of:
local              Run locally with a single worker thread.
local[K]           Run locally with K worker threads (K cores).
local[*]           Run locally with as many worker threads as available cores.
spark://HOST:PORT  Connect to the given Spark standalone cluster master; the port must be specified.
mesos://HOST:PORT  Connect to the given Mesos cluster; the port must be specified.
yarn-client        Client mode: connect to a YARN cluster; HADOOP_CONF_DIR must be configured.
yarn-cluster       Cluster mode: connect to a YARN cluster; HADOOP_CONF_DIR must be configured.
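For reference, a minimal sketch of picking one of these master URLs in code (the host names and ports below are placeholders, not values from these notes):

from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName("masterUrlDemo")
# conf.setMaster("local")               # single local thread
# conf.setMaster("local[4]")            # 4 local threads
# conf.setMaster("spark://host:7077")   # standalone cluster master
# conf.setMaster("mesos://host:5050")   # Mesos master
conf.setMaster("local[*]")              # all available local cores
sc = SparkContext(conf=conf)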

/usr/local/spark/bin/spark-submit \
--class cn.spark.sparktest.core.WordCountCluster \
--num-executors 3 \
--executor-memory 100m \
--executor-cores 3 \
--driver-memory 100m \
/usr/local/SparkTest-0.0.1-SNAPSHOT-jar-with-dependencies.jar
# --num-executors    number of executors
# --executor-memory  memory per executor
# --executor-cores   number of CPU cores per executor
# --driver-memory    driver memory (has a big impact)

# Spark performance tuning: resource allocation
https://www.cnblogs.com/haozhengfei/p/e570f24c43fa15f23ebb97929a1b7fe6.html
# Basic RDD operations
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local").setAppName("My App")
sc = SparkContext(conf=conf)
lines = sc.textFile("/var/log/syslog")
errorLines = lines.filter(lambda line: "error" in line)
print(lines.count())
sc.stop()

# Build a DataFrame from a txt file
file = sc.textFile('datingTestSet.txt')
data = file.map(lambda x: x.split('\t'))
#data.take(2)
df = sqlContext.createDataFrame(data, ['Mileage', 'Gametime', 'Icecream', 'label'])
#df.show(5, False)
#df.printSchema()

# Read and save CSV
df = sqlContext.read.format('com.databricks.spark.csv').options(header='false', inferschema='true').load(TrainData)
df.select("pcaFeatures", "prediction").toPandas().to_csv("df.csv")
#df.select("year", "model").save("df.csv", "com.databricks.spark.csv")

# Convert between RDD and DataFrame
from pyspark.sql import Row  
lines = sc.textFile("employee.txt")  
parts = lines.map(lambda l: l.split(","))  
employee = parts.map(lambda p: Row(name=p[0], salary=int(p[1])))  

# RDD to DataFrame
employee_temp = spark.createDataFrame(employee)  
# Create a temporary view
employee_temp.createOrReplaceTempView("employee")  
# Filter the data with SQL
employee_result = spark.sql("SELECT name,salary FROM employee WHERE salary >= 14000 AND salary <= 20000")  

# DataFrame back to RDD
result = employee_result.rdd.map(lambda p: "name: " + p.name + "  salary: " + str(p.salary)).collect()  

# Convert between pyspark.sql.DataFrame and pandas.DataFrame
import pandas as pd  
from pyspark.sql import SQLContext  
sqlContext = SQLContext(sc)  

df = pd.DataFrame([[1, 2, 3], [4, 5, 6]], index=['row1', 'row2'], columns=['c1', 'c2', 'c3'])  
sentenceData = spark.createDataFrame([  
    (0.0, "I like Spark"),  
    (1.0, "Pandas is useful"),  
], ["label", "sentence"])  

# pandas.DataFrame to Spark DataFrame
spark_df = sqlContext.createDataFrame(df)  
# Spark DataFrame to pandas.DataFrame
pandas_df = sentenceData.toPandas()  

# Use VectorAssembler while ignoring certain feature columns
from pyspark.ml.feature import VectorAssembler
ignore = ['id', 'label']
assembler = VectorAssembler(
    inputCols=[x for x in df.columns if x not in ignore],
    outputCol='features')
output = assembler.transform(df)

# StandardScaler with withMean=True does not support SparseVector; see:
https://stackoverflow.com/questions/41259885/sparsevector-vs-densevector-when-using-standardscaler
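A minimal sketch of the usual workaround, assuming a Spark 2.x session named spark and toy data made up for illustration: keep withMean=False so SparseVector input is accepted (mean-centering would destroy sparsity), or densify the column first if centering is required.

from pyspark.ml.feature import StandardScaler
from pyspark.ml.linalg import Vectors

# toy sparse vectors, hypothetical data for illustration only
sparse_df = spark.createDataFrame([
    (0, Vectors.sparse(3, [(0, 1.0), (2, 3.0)])),
    (1, Vectors.sparse(3, [(1, 2.0)])),
], ["id", "features"])

# withStd=True, withMean=False works on SparseVector;
# withMean=True requires DenseVector input
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures",
                        withStd=True, withMean=False)
scaler.fit(sparse_df).transform(sparse_df).show(truncate=False)
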
// Create a DataFrame
val training = spark.createDataFrame(Seq(
  (0L, "a b c d e spark", 1.0),
  (1L, "b d", 0.0),
  (2L, "spark f g h", 1.0),
  (3L, "hadoop mapreduce", 0.0)
)).toDF("id", "text", "label")

// Create a DataFrame with an explicit schema
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{DoubleType, StructField, StructType}
val list = List("-0.014,0.002,45.002,0.001")
val rowRDD = spark.sparkContext.parallelize(list).map(_.split(","))
    .map(p => {p.map(_.toDouble)})
    .map(p => {Row(p(0),p(1),p(2),p(3))})
// Define the schema for the conversion
val schemaString = "_c0,_c1,_c2,_c3"
val schema = StructType(schemaString.split(",").map(fieldName => StructField(fieldName, DoubleType, true)))
// Apply the schema to turn the RDD into a DataFrame
val df = spark.createDataFrame(rowRDD, schema)

// StandardScaler with withMean enabled cannot transform a SparseVector:
//https://stackoverflow.com/questions/41319904/spark-python-standard-scaler-error-do-not-support-sparsevector

// Read and save CSV
import org.apache.spark.sql.SQLContext
val sqlContext = new SQLContext(sc)
val df = sqlContext.read.format("com.databricks.spark.csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("D:\\Hadoop\\hadoop-2.6.0\\datatest\\world.csv")
df.select("year", "name").save("new world.csv", "com.databricks.spark.csv")