j-bennet
9/15/2017 - 10:25 PM

[Simple timestamp aggregation in Spark] #spark

import datetime as dt
from pyspark.sql.types import StructField, StructType, TimestampType

# `initialize` is a local helper (not part of PySpark) that creates and
# returns the SparkContext and SQLContext used below.
from context import initialize

# Single-column schema: one nullable timestamp per row.
MY_SCHEMA = StructType([
    StructField('ts', TimestampType(), True),
])


if __name__ == '__main__':
    # One row per minute of 2017-09-01, i.e. 24 * 60 = 1440 rows.
    data = [
        (dt.datetime(2017, 9, 1, hour, minute),)
        for hour in range(24)
        for minute in range(60)
    ]

    # Build a DataFrame from the generated rows and register it as a
    # temp view so it can be queried with SQL.
    sc, sqlContext = initialize()
    rdd = sc.parallelize(data)
    df = sqlContext.createDataFrame(rdd, schema=MY_SCHEMA)
    df.createOrReplaceTempView('df')

    # Group rows into 5-minute windows; window(...).start is the
    # beginning of each bucket.
    agg5m = sqlContext.sql("""
      select window(ts, '5 minutes').start as ts_5min
      from df
      group by window(ts, '5 minutes')
    """)

    # Same aggregation at 1-hour granularity.
    agg1h = sqlContext.sql("""
      select window(ts, '1 hour').start as ts_1hour
      from df
      group by window(ts, '1 hour')
    """)

    # Sort by bucket start and pull the results back to the driver.
    rows5m = agg5m.sort(agg5m.ts_5min).collect()
    rows1h = agg1h.sort(agg1h.ts_1hour).collect()
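
    # Sanity check (a sketch reusing the variables above): 1440 one-minute
    # rows should fall into 288 five-minute buckets and 24 one-hour buckets.
    print(len(rows5m))  # 288
    print(len(rows1h))  # 24

    # For reference, the same 5-minute bucketing can be expressed with the
    # DataFrame API instead of SQL (a sketch assuming Spark >= 2.0, where
    # pyspark.sql.functions.window is available; `agg5m_df` is a name
    # introduced here for illustration):
    import pyspark.sql.functions as F

    agg5m_df = (
        df.groupBy(F.window(df.ts, '5 minutes'))
          .count()  # GroupedData needs an aggregate; also keeps rows-per-bucket
          .select(F.col('window.start').alias('ts_5min'))
    )
    print(agg5m_df.count())  # 288, matching the SQL version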