[Submit Dataproc jobs for unstructured data] #GCP #Google #Cloud #Dataproc #unstructured #data
Unstructured data includes data that has no schema, as well as data that has a structure which is not useful for the intended purpose.
In this lab you will learn about Spark, the Resilient Distributed Dataset (RDD) framework, and the operations it provides for working with big and unstructured data.
copy files to master node home dir
cp -r /training .
hadoop fs -ls /
import files into HDFS
hadoop fs -mkdir /sampledata
hadoop fs -copyFromLocal road-not-taken.txt /sampledata/.
hadoop fs -copyFromLocal sherlock-holmes.txt /sampledata/.
hadoop fs -ls /sampledata
open the Hadoop web interface in a browser (Compute Engine > IP of the master node + port)
start pySpark in shell of master node:
pyspark
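in the pyspark shell a SparkContext is already created and bound to the name sc (and a SparkSession to spark), which is where the sc used below comes from; a quick check, assuming a default Dataproc image:
sc            # pre-created SparkContext
sc.version    # Spark version running on the cluster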
create an RDD by reading a text file from HDFS; use the Python type() function to identify the object type, and a built-in method on the object to count the number of lines:
lines = sc.textFile("/sampledata/sherlock-holmes.txt")
type(lines)
lines.count()
lines.take(15)
split sentences into words using flatMap():
words = lines.flatMap(lambda x: x.split(' '))
type(words)
words.count()
words.take(15)
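for comparison, map() with the same lambda would produce one list of words per line instead of individual words; a small sketch (not part of the lab) showing why flatMap() is the right choice here:
nested = lines.map(lambda x: x.split(' '))
nested.take(2)    # each element is a list: the words of one line
words.take(2)     # flatMap() flattened those lists into single words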
Use a Spark map() transformation to create pairs. The first element of the pair is the word; the second element is the number of characters (length) of the word:
pairs = words.map(lambda x: (x,len(x)))
type(pairs)
pairs.count()
pairs.take(5)
The objective is to count the number of words of each length -- for example, how many 10-character words are used in the book. Modify the map() so that it creates pairs of (word length, 1) for each instance. This is a common "column-oriented" method for counting instances in a parallelizable way.
pairs = words.map(lambda x: (len(x),1))
pairs.take(5)
Use a parallelizable method (add) to accumulate the instances. The add function will be called inside the Spark reduceByKey() transformation.
from operator import add
wordsize = pairs.reduceByKey(add)
type(wordsize)
wordsize.count()
wordsize.take(5)
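the imported add is just a named form of addition; an equivalent version with an explicit lambda, shown only for illustration (the name wordsize_alt is not part of the lab):
wordsize_alt = pairs.reduceByKey(lambda a, b: a + b)   # same result as reduceByKey(add)
wordsize_alt.take(5)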
Convert the RDD into a Python object for easy output.
output = wordsize.collect()
type(output)
for (size,count) in output: print(size, count)
What happened? Why is the list out of order? The reduceByKey() step ran in parallel across partitions, and collect() simply assembled the partition results, so the keys come back unordered. Use the Spark sortByKey() transformation to sort the pairs before collecting them into a list.
output = wordsize.sortByKey().collect()
for (size,count) in output: print(size, count)
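if you instead wanted the most common word lengths first, you could sort by count rather than by key (an optional variation, not part of the lab):
by_count = wordsize.sortBy(lambda pair: pair[1], ascending=False).collect()
by_count[:5]    # the five most frequent word lengths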
using the dot operator, you can chain the steps above in a more concise way:
from operator import add
output2 = sc.textFile("/sampledata/sherlock-holmes.txt").flatMap(lambda x: x.split(' ')).map(lambda x: (len(x),1)).reduceByKey(add).sortByKey().collect()
for (size, count) in output2: print(size, count)
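the same chain is easier to read when split across lines with backslash continuations, the style also used in the script below; this is only a reformatting of the one-liner above:
output2 = sc.textFile("/sampledata/sherlock-holmes.txt") \
    .flatMap(lambda x: x.split(' ')) \
    .map(lambda x: (len(x), 1)) \
    .reduceByKey(add) \
    .sortByKey() \
    .collect()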
leave pySpark
exit()
create script file
nano wordcount.py
Paste code > Ctrl-O > Ctrl-X
from pyspark.sql import SparkSession
from operator import add
import re

print("Okay Google.")

spark = SparkSession.builder.appName("CountUniqueWords").getOrCreate()

# read the text file as a DataFrame, then pull the plain string out of each Row
lines = spark.read.text("/sampledata/road-not-taken.txt").rdd.map(lambda x: x[0])

# split into words, keep tokens that contain at least one letter and are longer
# than one character, uppercase them, then count and sort each unique word
counts = lines.flatMap(lambda x: x.split(' ')) \
              .filter(lambda x: re.sub('[^a-zA-Z]+', '', x)) \
              .filter(lambda x: len(x) > 1) \
              .map(lambda x: x.upper()) \
              .map(lambda x: (x, 1)) \
              .reduceByKey(add) \
              .sortByKey()

output = counts.collect()
for (word, count) in output:
    print("%s = %i" % (word, count))

spark.stop()
Run the job
spark-submit wordcount.py
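alternatively, the same script can be submitted as a Dataproc job without SSH-ing to the master node; the cluster name and region below are placeholders for your own values:
gcloud dataproc jobs submit pyspark wordcount.py \
    --cluster=my-cluster \
    --region=us-central1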