// Spark provides SQL functions for working with numbers
import org.apache.spark.sql.functions.{col, expr, pow}
val fabricatedQuantity = pow(col("Quantity") * col("UnitPrice"), 2) + 5
df.select(expr("CustomerId"), fabricatedQuantity.alias("realQuantity")).show(2)
// We can express the same computation as a SQL string via selectExpr
// SELECT customerId, (POWER((Quantity * UnitPrice), 2.0) + 5) as realQuantity FROM dfTable
df.selectExpr(
  "CustomerId",
  "(POWER((Quantity * UnitPrice), 2.0) + 5) as realQuantity"
).show(2)
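// Rounding is another common numeric task. As a minimal sketch, round and
// bround are both standard Spark SQL functions (the literal 2.5 below is
// just an illustrative value): round rounds half up, bround rounds down
import org.apache.spark.sql.functions.{bround, lit, round}
df.select(round(lit("2.5")), bround(lit("2.5"))).show(2)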
// Another numerical task is to compute the correlation of two columns
// SELECT corr(Quantity, UnitPrice) FROM dfTable
import org.apache.spark.sql.functions.corr
df.stat.corr("Quantity", "UnitPrice")
df.select(corr("Quantity", "UnitPrice")).show()
// Another common task is to compute summary statistics for a column or set of columns.
// We can use the describe method to achieve exactly this.
// This will take all numeric columns and calculate the count, mean,
// standard deviation, min, and max.
df.describe().show()
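// describe is meant for console exploration; if you need these statistics
// programmatically, you can compute them yourself with the corresponding
// aggregation functions. A minimal sketch:
import org.apache.spark.sql.functions.{count, max, mean, min, stddev_pop}
df.select(count("UnitPrice"), mean("UnitPrice"), stddev_pop("UnitPrice"),
  min("UnitPrice"), max("UnitPrice")).show()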
// A number of statistical functions are available in the StatFunctions
// package, accessible via the stat attribute on a DataFrame
df.stat.freqItems(Seq("StockCode", "Quantity")).show()
df.stat.crosstab("StockCode", "Quantity").show()
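// The stat package also includes approximate quantiles and covariance.
// A minimal sketch (the 0.5 probability and 0.05 relative error below are
// illustrative values, not required settings):
val quantileProbs = Array(0.5)
val relError = 0.05
df.stat.approxQuantile("UnitPrice", quantileProbs, relError) // the median
df.stat.cov("Quantity", "UnitPrice")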
// We can use Spark's Boolean type to build logical filtering operations, for example:
import org.apache.spark.sql.functions.col
df
.where(col("InvoiceNo").equalTo(536365))
.select("InvoiceNo", "Description")
.show(5, false)
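// In Scala, === and =!= are the idiomatic column operators for equality and
// inequality (plain == would compare the Column objects themselves rather
// than build an expression):
df.where(col("InvoiceNo") === 536365)
  .select("InvoiceNo", "Description")
  .show(5, false)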
// We can also specify predicates as expression strings (note that <> means not-equal):
df.where("InvoiceNo = 536365")
df.where("InvoiceNo <> 536365")
// We can also chain several filters together
// SELECT * FROM dfTable
// WHERE StockCode in ("DOT") AND (UnitPrice > 600 OR instr(Description, "POSTAGE") >= 1)
val priceFilter = col("UnitPrice") > 600
val descripFilter = col("Description").contains("POSTAGE")
df.where(col("StockCode").isin("DOT")).where(priceFilter.or(descripFilter))
// Boolean expressions are not just reserved for filters. To filter a DataFrame,
// you can also just specify a Boolean column:
// SELECT UnitPrice,
// (StockCode = 'DOT' AND (UnitPrice > 600 OR instr(Description, "POSTAGE") >= 1)) as isExpensive
// FROM dfTable
// WHERE (StockCode = 'DOT' AND
// (UnitPrice > 600 OR instr(Description, "POSTAGE") >= 1))
// priceFilter and descripFilter were defined above, so we reuse them here
// (redefining the vals would not compile outside the REPL)
val DOTCodeFilter = col("StockCode") === "DOT"
df.withColumn("isExpensive", DOTCodeFilter.and(priceFilter.or(descripFilter)))
  .where("isExpensive")
  .select("UnitPrice", "isExpensive")
  .show(5)
// We can also express the negation with small helper functions such as not,
// or with an equivalent SQL expression
import org.apache.spark.sql.functions.{col, expr, not}
df
  .withColumn("isExpensive", not(col("UnitPrice").leq(250)))
  .filter("isExpensive")
  .select("Description", "UnitPrice")
  .show(5)
df
  .withColumn("isExpensive", expr("NOT UnitPrice <= 250"))
  .filter("isExpensive")
  .select("Description", "UnitPrice")
  .show(5)
// eqNullSafe performs a null-safe equality test, which evaluates to true or
// false even when Description contains nulls:
df.where(col("Description").eqNullSafe("hello")).show()