vorobuev
1/30/2020 - 11:40 AM

SparkTypes

// Spark has SQL functions for working with numbers
import org.apache.spark.sql.functions.{col, expr, pow}
val fabricatedQuantity = pow(col("Quantity") * col("UnitPrice"), 2) + 5
df.select(expr("CustomerId"), fabricatedQuantity.alias("realQuantity"))

// We can do the same with a SQL expression
// SELECT customerId, (POWER((Quantity * UnitPrice), 2.0) + 5) as realQuantity FROM dfTable
df.selectExpr(
  "CustomerId",
  "(POWER((Quantity * UnitPrice), 2.0) + 5) as realQuantity"
)

// Another numerical task is to compute the correlation of two columns
// SELECT corr(Quantity, UnitPrice) FROM dfTable
import org.apache.spark.sql.functions.corr
df.stat.corr("Quantity", "UnitPrice") // returns a Double directly
df.select(corr("Quantity", "UnitPrice")) // returns a single-row DataFrame

// Another common task is to compute summary statistics for a column or set of columns. 
// We can use the describe method to achieve exactly this. 
// This will take all numeric columns and calculate the count, mean, 
// standard deviation, min, and max.
df.describe().show()
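
// describe is meant for quick console inspection; if you need the same statistics
// as regular columns for further processing, you can compute them with agg.
// A minimal sketch, where you can also pick the exact stddev variant you want
// (stddev_pop vs stddev_samp):
import org.apache.spark.sql.functions.{count, mean, stddev_pop, min, max}
df.agg(
  count("UnitPrice").alias("count"),
  mean("UnitPrice").alias("mean"),
  stddev_pop("UnitPrice").alias("stddev_pop"),
  min("UnitPrice").alias("min"),
  max("UnitPrice").alias("max")
).show()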

// A number of statistical functions are available through df.stat (DataFrameStatFunctions)
df.stat.freqItems(Seq("StockCode", "Quantity")).show()
df.stat.crosstab("StockCode", "Quantity").show()
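
// df.stat also exposes approxQuantile for approximate percentiles. A minimal
// sketch; the probability and relative-error values here are illustrative:
val quantileProbs = Array(0.5)
val relError = 0.05
df.stat.approxQuantile("UnitPrice", quantileProbs, relError) // returns Array[Double]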

// We can use Spark's Boolean type for logical operations, for example:
import org.apache.spark.sql.functions.col 
df
  .where(col("InvoiceNo").equalTo(536365))
  .select("InvoiceNo", "Description")
  .show(5, false)
  
// We can use expressions too:
df.where("InvoiceNo = 536365")
df.where("InvoiceNo <> 536365")

// Several filters
// SELECT * FROM dfTable 
// WHERE StockCode in ("DOT") AND (UnitPrice > 600 OR instr(Description, "POSTAGE") >= 1)
val priceFilter = col("UnitPrice") > 600
val descripFilter = col("Description").contains("POSTAGE") 
df.where(col("StockCode").isin("DOT")).where(priceFilter.or(descripFilter))

// Boolean expressions are not just reserved for filters. To filter a DataFrame,
// you can also just specify a Boolean column:
// SELECT UnitPrice, 
//  (StockCode = 'DOT' AND (UnitPrice > 600 OR instr(Description, "POSTAGE") >= 1)) as isExpensive
// FROM dfTable
// WHERE (StockCode = 'DOT' AND
//  (UnitPrice > 600 OR instr(Description, "POSTAGE") >= 1))
val DOTCodeFilter = col("StockCode") === "DOT"
// priceFilter and descripFilter are reused from the previous example
df.withColumn("isExpensive", DOTCodeFilter.and(priceFilter.or(descripFilter)))
  .where("isExpensive")
  .select("UnitPrice", "isExpensive")
  
// We can use small helper functions such as not, or an equivalent SQL expression:
import org.apache.spark.sql.functions.{expr, not, col} 
df
  .withColumn("isExpensive", not(col("UnitPrice").leq(250)))
  .filter("isExpensive")
  .select("Description", "UnitPrice")
df
  .withColumn("isExpensive", expr("NOT UnitPrice <= 250"))
  .filter("isExpensive")
  .select("Description", "UnitPrice")
// Both variants build the same plan: expr parses the SQL fragment into an equivalent Column
  
// Null-safe equality test (SQL's <=>): unlike ===, it returns false instead of null
// when one side is null, and true when both sides are null
df.where(col("Description").eqNullSafe("hello"))