# Author: vsouza
# 10/5/2016 - 2:50 AM
#
# spark_to_redshift.py

def toRedshift(time, rdd):
    """Write one Spark Streaming micro-batch of activity events to Redshift.

    Intended as a ``foreachRDD`` callback: Spark invokes it once per batch
    interval with the batch time and the batch's RDD.

    Args:
        time: Batch time supplied by Spark Streaming (unused here).
        rdd: RDD of activity rows matching the schema below
            (user_id, device_id, steps, battery_level, calories_spent,
            distance, current_time).

    Side effects:
        Registers the batch as temp table ``activity_log`` on the SQLContext
        and appends the rows to the Redshift table ``activity_log``, staging
        data through the S3 temp directory.
    """
    # getSqlContextInstance is defined elsewhere; presumably returns a
    # singleton SQLContext for this SparkContext — TODO confirm.
    sqlContext = getSqlContextInstance(rdd.context)

    # Explicit schema so createDataFrame does not have to infer types
    # from a (possibly empty) micro-batch.
    schema = StructType([
        StructField('user_id', StringType(), True),
        StructField('device_id', StringType(), True),
        StructField('steps', IntegerType(), True),
        StructField('battery_level', IntegerType(), True),
        StructField('calories_spent', IntegerType(), True),
        StructField('distance', FloatType(), True),
        StructField('current_time', IntegerType(), True),
    ])

    df = sqlContext.createDataFrame(rdd, schema)
    # Exposes this batch for ad-hoc SQL; kept for compatibility with any
    # queries elsewhere that reference "activity_log".
    df.registerTempTable("activity_log")

    # SECURITY: credentials are embedded in the JDBC URL — move USERNAME /
    # PASSWORD to configuration or IAM-based auth rather than source control.
    df.write \
      .format("com.databricks.spark.redshift") \
      .option("url", "jdbc:redshiftURL.com:5439/database?user=USERNAME&password=PASSWORD") \
      .option("dbtable", "activity_log") \
      .option("tempdir", "s3n://spark-temp-data/") \
      .mode("append") \
      .save()

# Register the per-batch writer. The original passed `process`, which is not
# defined in this file; `toRedshift` is the handler defined above.
py_rdd.foreachRDD(toRedshift)