How to group by time interval in Spark SQL

Answer #1

Spark >= 2.0

You can use window (not to be confused with window functions). Depending on the variant, it assigns each timestamp to one or more potentially overlapping buckets:

import org.apache.spark.sql.functions.window
df.groupBy($"KEY", window($"Time", "5 minutes")).sum("metric")

// +---+---------------------------------------------+-----------+
// |KEY|window                                       |sum(metric)|
// +---+---------------------------------------------+-----------+
// |001|[2016-05-01 10:50:00.0,2016-05-01 10:55:00.0]|45         |
// |001|[2016-05-01 10:55:00.0,2016-05-01 11:00:00.0]|12         |
// |003|[2016-05-01 10:55:00.0,2016-05-01 11:00:00.0]|13         |
// |001|[2016-05-01 11:00:00.0,2016-05-01 11:05:00.0]|11         |
// |002|[2016-05-01 10:50:00.0,2016-05-01 10:55:00.0]|100        |
// +---+---------------------------------------------+-----------+
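The overlapping buckets come from the sliding variant of window: pass a slide duration shorter than the window duration and each timestamp lands in several buckets. A minimal sketch against the example data defined in the next section (the explicit cast is only needed because Time is stored there as a string):

import org.apache.spark.sql.functions.window

// 5-minute windows starting every minute, so each timestamp
// falls into up to five overlapping buckets
df.groupBy($"KEY", window($"Time".cast("timestamp"), "5 minutes", "1 minute"))
  .sum("metric")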

Spark < 2.0

Let's start with example data:

import spark.implicits._  // import sqlContext.implicits._ in Spark < 2.0

val df = Seq(
  ("001", "event1", 10, "2016-05-01 10:50:51"),
  ("002", "event2", 100, "2016-05-01 10:50:53"),
  ("001", "event3", 20, "2016-05-01 10:50:55"),
  ("001", "event1", 15, "2016-05-01 10:51:50"),
  ("003", "event1", 13, "2016-05-01 10:55:30"),
  ("001", "event2", 12, "2016-05-01 10:57:00"),
  ("001", "event3", 11, "2016-05-01 11:00:01")
).toDF("KEY", "Event_Type", "metric", "Time")

I assume that an event is identified by KEY. If this is not the case, you can adjust the GROUP BY / PARTITION BY clauses to your requirements.
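For example, if the events are not keyed at all, a rough Spark >= 2.0 variant is to drop KEY and group by the time bucket alone (reusing the window import shown above):

// Bucket all events together, ignoring KEY
df.groupBy(window($"Time".cast("timestamp"), "5 minutes")).sum("metric")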

If you're interested in an aggregation with a static window independent of the data, convert the timestamps to a numeric type and round:

import org.apache.spark.sql.functions.{round, sum}

// Cast string to timestamp, then to Unix time in seconds
val ts = $"Time".cast("timestamp").cast("long")

// Round to a 300-second (5-minute) interval
// In Spark >= 3.1 the cast back to "timestamp" can be replaced with timestamp_seconds
val interval = (round(ts / 300L) * 300.0).cast("timestamp").alias("interval")

df.groupBy($"KEY", interval).sum("metric")

// +---+---------------------+-----------+
// |KEY|interval             |sum(metric)|
// +---+---------------------+-----------+
// |001|2016-05-01 11:00:00.0|11         |
// |001|2016-05-01 10:55:00.0|12         |
// |001|2016-05-01 10:50:00.0|45         |
// |003|2016-05-01 10:55:00.0|13         |
// |002|2016-05-01 10:50:00.0|100        |
// +---+---------------------+-----------+
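For completeness, a minimal sketch of the same 300-second bucketing in Spark >= 3.1 style, using unix_timestamp and timestamp_seconds instead of the double cast (the names ts31 and interval31 are just illustrative):

import org.apache.spark.sql.functions.{round, timestamp_seconds, unix_timestamp}

// Parse the string to Unix seconds, round to the nearest 300-second mark,
// and convert back to a timestamp
val ts31 = unix_timestamp($"Time")
val interval31 = timestamp_seconds(round(ts31 / 300L) * 300L).alias("interval")

df.groupBy($"KEY", interval31).sum("metric")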

If you're interested in a window relative to the current row, use window functions:

import org.apache.spark.sql.expressions.Window

// Partition by KEY
// Order by the numeric timestamp (ts)
// Consider a window of -150 to +150 seconds relative to the current row
val w = Window.partitionBy($"KEY").orderBy("ts").rangeBetween(-150, 150)
df.withColumn("ts", ts).withColumn("window_sum", sum($"metric").over(w))

// +---+----------+------+-------------------+----------+----------+
// |KEY|Event_Type|metric|Time               |ts        |window_sum|
// +---+----------+------+-------------------+----------+----------+
// |003|event1    |13    |2016-05-01 10:55:30|1462092930|13        |
// |001|event1    |10    |2016-05-01 10:50:51|1462092651|45        |
// |001|event3    |20    |2016-05-01 10:50:55|1462092655|45        |
// |001|event1    |15    |2016-05-01 10:51:50|1462092710|45        |
// |001|event2    |12    |2016-05-01 10:57:00|1462093020|12        |
// |001|event3    |11    |2016-05-01 11:00:01|1462093201|11        |
// |002|event2    |100   |2016-05-01 10:50:53|1462092653|100       |
// +---+----------+------+-------------------+----------+----------+

For performance reasons this approach is useful only if the data can be partitioned into multiple separate groups. In Spark < 2.0.0 you'll also need HiveContext to make it work.
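
A minimal sketch of that Spark < 2.0 setup, assuming an existing SparkContext named sc (window functions are not available on a plain SQLContext):

// Spark < 2.0: window functions require a HiveContext
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
import sqlContext.implicits._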
