Mikkel84
10/14/2018 - 8:31 AM

[Submit Dataproc jobs for unstructured data] #GCP #Google #Cloud #Dataproc #unstructured #data

Unstructured data includes data without a schema, as well as data that has a structure but one that is not useful for the intended purpose.

In this lab you will learn about Spark, the Resilient Distributed Dataset (RDD) framework, and the operations used to work with big data and unstructured data.

Preparation

  • SSH into the master node of the Dataproc cluster
  • set up secure web access to the cluster's web interfaces (firewall rule with a network tag, your IP, and the protocol/port); see the sketch below
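
A minimal sketch of these two steps with gcloud; the cluster name, zone, firewall-rule name, network tag, and source IP below are placeholders, and the UI ports depend on the Dataproc image version:

    # SSH into the master node (Dataproc names it <cluster-name>-m)
    gcloud compute ssh my-cluster-m --zone=us-central1-a

    # allow web access to the Hadoop/YARN UIs from your own IP only
    gcloud compute firewall-rules create allow-hadoop-ui \
        --network=default \
        --allow=tcp:8088,tcp:9870 \
        --source-ranges=203.0.113.4/32 \
        --target-tags=my-cluster-tag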

Prepare Data

  • copy files to master node home dir

    cp -r /training .
    hadoop fs -ls /
    
  • import files into HDFS

    hadoop fs -mkdir /sampledata
    hadoop fs -copyFromLocal road-not-taken.txt /sampledata/.
    hadoop fs -copyFromLocal sherlock-holmes.txt /sampledata/.
    hadoop fs -ls /sampledata
    
  • open the Hadoop web interface in a browser (Compute Engine > external IP of the master node + port; one way to look up the IP is sketched below)
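
One hedged way to look up that external IP from your local shell, assuming the placeholder instance name and zone used in the Preparation sketch:

    gcloud compute instances describe my-cluster-m --zone=us-central1-a \
        --format='get(networkInterfaces[0].accessConfigs[0].natIP)'

Then open http://<external-ip>:<port> once the firewall rule from the Preparation section is in place.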

Use interactive PySpark (REPL) to learn about RDDs & lambda functions

  • start pySpark in shell of master node:

    pyspark
    
  • create an RDD by reading a text file from HDFS; use the Python type() function to identify the object type, and use built-in RDD methods to count the number of lines and preview a few of them:

    lines = sc.textFile("/sampledata/sherlock-holmes.txt")
    type(lines)
    lines.count()
    lines.take(15)
    
  • split sentences into words using flatMap():

    words = lines.flatMap(lambda x: x.split(' '))
    type(words)
    words.count()
    words.take(15)
    
  • Use a spark map() transformation to create pairs. The first element in the pair is the word. The second element of the pair is the number of characters (length) of the word:

    pairs = words.map(lambda x: (x,len(x)))
    type(pairs)
    pairs.count()
    pairs.take(5)
    
  • The objective is to count the number of words of various lengths -- for example, how many 10-character words are used in the book. Modify the map() so that it creates pairs of (word length, 1) for each instance. This is a common "column-oriented" method for counting instances in a parallelizable way.

    pairs = words.map(lambda x: (len(x),1))
    pairs.take(5)
    
  • Use a parallelizable method (add) to accumulate the instances. The add function will be called inside the Spark reduceByKey() transformation.

    from operator import add
    wordsize = pairs.reduceByKey(add)
    type(wordsize)
    wordsize.count()
    wordsize.take(5)
    
  • Convert the RDD into a Python object for easy output.

    output = wordsize.collect()
    type(output)
    for (size,count) in output: print(size, count)
    
  • What happened? Why is the list out of order? It is because the work was parallelized across partitions, and collect() simply assembles the partition results without sorting them. Use the Spark sortByKey() transformation to sort the pairs by key before collecting them into a list.

    output = wordsize.sortByKey().collect()
    for (size,count) in output: print(size, count)
    
  • using the dot operator, you can chain the steps above in a more concise way:

    from operator import add
    output2 = sc.textFile("/sampledata/sherlock-holmes.txt") \
                .flatMap(lambda x: x.split(' ')) \
                .map(lambda x: (len(x),1)) \
                .reduceByKey(add) \
                .sortByKey() \
                .collect()
    for (size, count) in output2: print(size, count)
    
  • leave pySpark

    exit()
    

Submit the pySpark job from the master node

  1. create script file

     nano wordcount.py
    
  2. Paste the code below, then save (Ctrl-O, Enter) and exit (Ctrl-X)

     from pyspark.sql import SparkSession
     from operator import add
     import re

     print("Okay Google.")

     # create (or reuse) a SparkSession for this application
     spark = SparkSession.builder.appName("CountUniqueWords").getOrCreate()

     # read the poem from HDFS and keep just the text of each line
     lines = spark.read.text("/sampledata/road-not-taken.txt").rdd.map(lambda x: x[0])

     # split each line into words, keep only tokens that contain letters
     # and are longer than one character, upper-case them, then count
     # the occurrences of each word and sort alphabetically
     counts = lines.flatMap(lambda x: x.split(' ')) \
                   .filter(lambda x: re.sub('[^a-zA-Z]+', '', x)) \
                   .filter(lambda x: len(x) > 1) \
                   .map(lambda x: x.upper()) \
                   .map(lambda x: (x, 1)) \
                   .reduceByKey(add) \
                   .sortByKey()

     # bring the results back to the driver and print them
     output = counts.collect()
     for (word, count) in output:
       print("%s = %i" % (word, count))

     spark.stop()
    
  3. Run the job

     spark-submit wordcount.py
    
  • Hint: The applications that you ran previously, directly on the master node (the PySpark REPL and spark-submit), do not show up on the Dataproc > Jobs page in the Console. Only jobs submitted through the Dataproc jobs API (for example from the Console, or with gcloud as sketched below) are tracked there.
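
A minimal sketch of submitting the same script through the Dataproc jobs API with gcloud so that it does appear under Dataproc > Jobs; the cluster name and region are placeholders:

    gcloud dataproc jobs submit pyspark wordcount.py \
        --cluster=my-cluster \
        --region=us-central1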