# ---
# Databricks training
# ---
%run "./Includes/Classroom-Setup"
# The Databricks File System (DBFS) is the built-in, Azure-blob-backed, alternative to the Hadoop Distributed File System (HDFS).
# A Parquet "file" is actually a collection of files stored in a single directory. The Parquet format offers features that make it the ideal choice for storing "big data" on distributed file systems.
ipGeocodeDF = spark.read.parquet("/mnt/training/ip-geocode.parquet")
ipGeocodeDF.printSchema()
# csv
%fs head /mnt/training/bikeSharing/data-001/day.csv --maxBytes=492
bikeSharingDayDF = (spark
.read # Call the read method returning a DataFrame
.option("inferSchema","true") # Option to tell Spark to infer the schema
.option("header","true") # Option telling Spark that the file has a header
.csv("/mnt/training/bikeSharing/data-001/day.csv")) # Option telling Spark where the file is
bikeSharingDayDF.show(10)
display(bikeSharingDayDF)
# local file
# You can also create DataFrames by uploading files. The files are nominally stored as tables, from which you create DataFrames.
# Download the following file to your local machine: state-income.csv
# Access the file via the path /FileStore/tables/state_income-9f7c5.csv
stateIncomeDF = (spark
.read # Call the read method returning a DataFrame
.option("inferSchema","true") # Option to tell Spark to infer the schema
.option("header","true") # Option telling Spark that the file has a header
.csv("/FileStore/tables/state_income-9f7c5.csv")) # Option telling Spark where the file is
stateIncomeDF.show(10)
# mount Azure blob to dbfs
""" Microsoft Azure provides cloud file storage in the form of the Blob Store. Files are stored in "blobs." If you have an Azure account, create a blob, store data files in that blob, and mount the blob as a DBFS directory.
Once the blob is mounted as a DBFS directory, access it without exposing your Azure Blob Store keys.
"""
%fs mounts
"""
Mount a Databricks Azure blob (using read-only access and secret key pair), access one of the files in the blob as a DBFS path, then unmount the blob.
Caution The mount point must start with /mnt/.
Create the mount point with dbutils.fs.mount(source = .., mountPoint = .., extraConfigs = ..)
"""
SasURL = "https://dbtraineastus2.blob.core.windows.net/?sv=2017-07-29&ss=b&srt=sco&sp=rl&se=2023-04-19T06:32:30Z&st=2018-04-18T22:32:30Z&spr=https&sig=BB%2FQzc0XHAH%2FarDQhKcpu49feb7llv3ZjnfViuI9IWo%3D"
indQuestionMark = SasURL.index('?')
SasKey = SasURL[indQuestionMark:len(SasURL)]
StorageAccount = "dbtraineastus2"
ContainerName = "training"
MountPoint = "/mnt/temp-training"
dbutils.fs.mount(
source = "wasbs://%s@%s.blob.core.windows.net/" % (ContainerName, StorageAccount),
mount_point = MountPoint,
extra_configs = {"fs.azure.sas.%s.%s.blob.core.windows.net" % (ContainerName, StorageAccount) : "%s" % SasKey}
)
# another example of blob mount from BDX project
# <https://forums.databricks.com/questions/14151/mount-blob-path-to-get-files-underlying-in-all-the.html>
# <https://docs.databricks.com/spark/latest/data-sources/azure/azure-storage.html>
%scala
dbutils.fs.mount(
source = "wasbs://spark-repo@bgpovstorage.blob.core.windows.net/",
mountPoint = "/mnt/sparkRepo",
extraConfigs = Map("fs.azure.account.key.bgpovstorage.blob.core.windows.net" -> "<key>"))
# python
dbutils.fs.mount(
source = "wasbs://spark-repo@bgpovstorage.blob.core.windows.net/",
mount_point = "/mnt/spark-repo-python",
extra_configs = {"fs.azure.account.key.bgpovstorage.blob.core.windows.net":"<key>"})
# List the contents of a subdirectory in directory you just mounted:
%fs ls /mnt/temp-training
# path / name / size
dbfs:/mnt/temp-training/301/ 301/ 0
dbfs:/mnt/temp-training/Chicago-Crimes-2018.csv Chicago-Crimes-2018.csv 5201668
# Take a peek at the head of the file auto-mpg.csv:
%fs head /mnt/temp-training/auto-mpg.csv
# Take a peek at the head of the file auto-mpg.csv:
%fs head /mnt/temp-training/auto-mpg.csv
# Now you are done, unmount the directory.
%fs unmount /mnt/temp-training
# ---
# Databricks Denver Workshop 8.21.19
# ---
# create a data frame using the pyspark function spark.read.csv()
# add an option to specifiy the first row contains a header
# spark.read.csv
flightsDF = (spark
.read
.option("header", True)
.option("inferSchema", True)
.csv("/databricks-datasets/asa/small/small.csv"))
# escaping quotes
# A reference to our csv file
inPath = "/mnt/training-sources/initech/productsCsv/"
# sample of data .csv
# product_id,category,brand,model,price,processor,size,display
# 1,Laptops ,HP,"Spectre x360 2-in-1 13.3"" 4K Ultra HD Touch-Screen Laptop",1499.989990234375,null,null,null
# 2,Laptops ,Microsoft,"Surface Pro – 12.3""",1299.989990234375,null,null,null
productDF = (
spark.read # The DataFrameReader
.option("header", True)# remove this line and replace with correct options
.option("inferSchema", True)# hint: this data is QUOTEd and ESCAPEd with double quotation marks
.option("quote", "\"")
.option("escape", "\"")
.csv(inPath) # Creates a DataFrame from CSV after reading in the file
)
# correctly read data
# Spectre x360 2-in-1 13.3" 4K Ultra HD Touch-Screen Laptop
# Surface Pro – 12.3"
# ---
# create temporary view
# ---
"""
When working on a new analytics project, or just when looking a new data, it's a helpful and often necessary exercise to examine and profile the data. SQL is a great language for this task.
Dataframes can be examined using SQL by creating a temporary view with a function named createOrReplaceTempView()
createOrReplaceTempView accepts a string as its only argumeent that specifies the name of the temporary view. That view will be scope to only this execution of the notebook. Whwen you browse away from the notebook, you can no longer access the temporary view, unless you come back and access it from the notebook it was created in.
Note:
There is an adjacent function named createOrReplaceGlobalView() that will create a view that is accessible from any notebook in the workspace.
"""
#this function creates a temporary view that can enables SQL access to the data
sampleDF.createOrReplaceTempView("dbx_example")
%sql SHOW TABLES
tmpViewName = "userAPL_products" #replace "XX" in this variable name with your user number
productDF.createOrReplaceTempView(tmpViewName) # Edit this line to create the temporary view
%sql SELECT * FROM userAPL_products # replace "XX" with your user number
%sql
# -- How many products are found in the data file?
SELECT COUNT(1) AS NumProducts FROM userXX_products;
%sql
# -- How much is the least expensive product? How much is the most expensive? What is the average product price?
SELECT MIN(price) AS minPrice, MAX(price) AS maxPrice, AVG(price) AS avgPrice FROM userXX.products;
%sql
# -- How many Microsoft models are in the data and what is their average price?
SELECT
COUNT (DISTINCT model) AS modelCount,
AVG(price) as avgPrice
FROM
userXX_products
WHERE
brand = 'Microsoft'
GROUP BY
brand
"""
Writing Data
For this exercise, we will continue using the data frame created above. In the following examples and exercises, you will write the data in the data frame to a new Spark table which can be used in later queries.
There are many options to consider when writing data. For data that will be used in future analysis, it's helpful to write the data directly to a Spark table. Spark will store the data in Parquet format by default, which is optimized for distributed processing, and a good choice most of the time.
Write data created in previous data frames to a Spark table. Tables are stored in databases. A best practice is to store tables used for shared purposes in the same database.
Databases can be created using a SQL statement
"""
%sql
CREATE DATABASE IF NOT EXISTS airlines;
#tables can also be created in a python cell
spark.sql("CREATE DATABASE IF NOT EXISTS airlines")
"""
With the database created for the output table, the following statement can be run to write the data to a table.
The saveAsTable() function is used to write a dataframe to a table. By default the saveAsTable() function writes to a folder found at /user/hive/warehouse/[dbanme]/[tablename]>. You can override this default behavior by supplying an optional "path"
Data written using saveAsTable() is stored in Parquet format unless otherwise specified
NOTE: mode("overwrite") will remove the table before writing. mode("append") will add new data to an existing table
"""
#the following code will write the data in Parquet format to the path specified
(
sampleDF.write
.mode("overwrite")
.option("path", "/data/airlines/flights")
.saveAsTable("airlines.flights")
)
%sql DESCRIBE EXTENDED airlines.flights
# When the writing is complete, the table is ready to use. Run the following SQL cell to ensure that your table is working correctly. Remember to edit the database name to match your user number
%sql
USE userAPL; # -- edit this line to contain your database name
SELECT * FROM userapl_products;