def toRedshift(time, rdd):
    """Write one micro-batch of activity-log records to Redshift.

    Intended as a DStream ``foreachRDD`` callback: ``time`` is the batch
    timestamp supplied by Spark Streaming (unused here), ``rdd`` is the
    batch's RDD of rows matching the schema below.

    Raises:
        Exception: any failure from DataFrame creation or the Redshift
            write is re-raised so the streaming job surfaces the error
            instead of silently dropping the batch.
    """
    try:
        sqlContext = getSqlContextInstance(rdd.context)
        schema = StructType([
            StructField('user_id', StringType(), True),
            StructField('device_id', StringType(), True),
            StructField('steps', IntegerType(), True),
            StructField('battery_level', IntegerType(), True),
            StructField('calories_spent', IntegerType(), True),
            StructField('distance', FloatType(), True),
            # presumably an epoch timestamp in seconds — TODO confirm;
            # IntegerType overflows after 2038 if it is.
            StructField('current_time', IntegerType(), True),
        ])
        df = sqlContext.createDataFrame(rdd, schema)
        # Expose the batch for ad-hoc SQL elsewhere in the job.
        df.registerTempTable("activity_log")
        # SECURITY: credentials are hardcoded in the JDBC URL — move
        # USERNAME/PASSWORD to configuration or IAM-based auth.
        df.write \
            .format("com.databricks.spark.redshift") \
            .option("url", "jdbc:redshiftURL.com:5439/database?user=USERNAME&password=PASSWORD") \
            .option("dbtable", "activity_log") \
            .option("tempdir", "s3n://spark-temp-data/") \
            .mode("append") \
            .save()
    except Exception:
        # Re-raise with the original traceback intact; `raise(e)` in the
        # old code added nothing and (on Python 2) truncated the traceback.
        raise
# Register the per-batch sink on the DStream.
# NOTE(review): `process` is not defined in this chunk — the handler defined
# above is `toRedshift`. Confirm `process` exists elsewhere in the file, or
# this is a stale callback name and should be `toRedshift`.
py_rdd.foreachRDD(process)