# [Simple timestamp aggregation in Spark] #spark
import datetime as dt
from pyspark.sql.types import *
from context import initialize
# Schema for the input DataFrame: a single nullable timestamp column 'ts'.
MY_SCHEMA = StructType([
    StructField(name='ts', dataType=TimestampType(), nullable=True),
])
if __name__ == '__main__':
    # `initialize` is a project helper; presumably it returns a SparkContext
    # and an SQLContext -- confirm against the `context` module.
    sc, sqlContext = initialize()

    # One single-field row per minute of 2017-09-01 (24 hours x 60 minutes).
    data = [
        (dt.datetime(2017, 9, 1, h, m),)
        for h in range(24)
        for m in range(60)
    ]

    rdd = sc.parallelize(data)
    df = sqlContext.createDataFrame(rdd, schema=MY_SCHEMA)
    # Register the DataFrame under the name 'df' so the SQL below can query it.
    df.createOrReplaceTempView('df')

    # Group the timestamps into 5-minute windows and keep each window's start.
    # The SQL text is kept flush-left so the query strings stay unchanged.
    agg5m = sqlContext.sql("""
select window(ts, '5 minutes').start as ts_5min
from df
group by window(ts, '5 minutes')
""")

    # Same aggregation with 1-hour windows.
    agg1h = sqlContext.sql("""
select window(ts, '1 hour').start as ts_1hour
from df
group by window(ts, '1 hour')
""")

    # Pull the window starts back to the driver in ascending order.
    rows5m = agg5m.orderBy('ts_5min').collect()
    rows1h = agg1h.orderBy('ts_1hour').collect()