Working with dates and times in Spark

Spark supports DateType and TimestampType columns and defines a rich API of functions to make working with dates and times easy. This blog post demonstrates how to make DataFrames with DateType / TimestampType columns and how to leverage Spark’s functions for working with these columns.

Complex Spark Column types

Spark supports ArrayType, MapType, and StructType columns in addition to the DateType / TimestampType columns covered in this post.

Check out Writing Beautiful Spark Code for a detailed overview of the different complex column types and how they should be used when architecting Spark applications.

Creating DateType columns

Import the java.sql.Date class to create a DataFrame with a DateType column. The createDF() method used throughout this post comes from spark-daria.

import java.sql.Date
import org.apache.spark.sql.types.{DateType, IntegerType}

val sourceDF = spark.createDF(
  List(
    (1, Date.valueOf("2016-09-30")),
    (2, Date.valueOf("2016-12-14"))
  ), List(
    ("person_id", IntegerType, true),
    ("birth_date", DateType, true)
  )
)
sourceDF.show()

+---------+----------+
|person_id|birth_date|
+---------+----------+
|        1|2016-09-30|
|        2|2016-12-14|
+---------+----------+

sourceDF.printSchema()

root
 |-- person_id: integer (nullable = true)
 |-- birth_date: date (nullable = true)

The cast() method can create a DateType column by converting a StringType column into a date.

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.StringType

val sourceDF = spark.createDF(
  List(
    (1, "2013-01-30"),
    (2, "2012-01-01")
  ), List(
    ("person_id", IntegerType, true),
    ("birth_date", StringType, true)
  )
).withColumn(
  "birth_date",
  col("birth_date").cast("date")
)
sourceDF.show()

+---------+----------+
|person_id|birth_date|
+---------+----------+
|        1|2013-01-30|
|        2|2012-01-01|
+---------+----------+

sourceDF.printSchema()

root
 |-- person_id: integer (nullable = true)
 |-- birth_date: date (nullable = true)
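
cast() only understands strings in the default yyyy-MM-dd pattern. When the strings use a different layout, to_date() with an explicit format string does the parsing. Here’s a minimal sketch assuming US-style MM/dd/yyyy strings (the two-argument to_date() requires Spark 2.2+):

import org.apache.spark.sql.functions.to_date

val usDatesDF = spark.createDF(
  List(
    (1, "01/30/2013"),
    (2, "01/01/2012")
  ), List(
    ("person_id", IntegerType, true),
    ("birth_date", StringType, true)
  )
).withColumn(
  "birth_date",
  to_date(col("birth_date"), "MM/dd/yyyy") // parse with an explicit pattern
)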

year(), month(), dayofmonth()

Let’s create a DataFrame with a DateType column and use built-in Spark functions to extract the year, month, and day from the date.

val sourceDF = spark.createDF(
  List(
    (1, Date.valueOf("2016-09-30")),
    (2, Date.valueOf("2016-12-14"))
  ), List(
    ("person_id", IntegerType, true),
    ("birth_date", DateType, true)
  )
)

import org.apache.spark.sql.functions.{col, dayofmonth, month, year}

sourceDF.withColumn(
  "birth_year",
  year(col("birth_date"))
).withColumn(
  "birth_month",
  month(col("birth_date"))
).withColumn(
  "birth_day",
  dayofmonth(col("birth_date"))
).show()
+---------+----------+----------+-----------+---------+
|person_id|birth_date|birth_year|birth_month|birth_day|
+---------+----------+----------+-----------+---------+
|        1|2016-09-30|      2016|          9|       30|
|        2|2016-12-14|      2016|         12|       14|
+---------+----------+----------+-----------+---------+
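
dayofyear() and weekofyear() follow the same pattern if you need those extractions too; a quick sketch reusing sourceDF:

import org.apache.spark.sql.functions.{dayofyear, weekofyear}

sourceDF.withColumn(
  "birth_day_of_year",
  dayofyear(col("birth_date")) // day number within the year (1-366)
).withColumn(
  "birth_week_of_year",
  weekofyear(col("birth_date")) // ISO week number
).show()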

minute(), second()

Let’s create a DataFrame with a TimestampType column and use built-in Spark functions to extract the minute and second from the timestamp.

import java.sql.Timestamp
import org.apache.spark.sql.types.TimestampType

val sourceDF = spark.createDF(
  List(
    (1, Timestamp.valueOf("2017-12-02 03:04:00")),
    (2, Timestamp.valueOf("1999-01-01 01:45:20"))
  ), List(
    ("person_id", IntegerType, true),
    ("fun_time", TimestampType, true)
  )
)

import org.apache.spark.sql.functions.{minute, second}

sourceDF.withColumn(
  "fun_minute",
  minute(col("fun_time"))
).withColumn(
  "fun_second",
  second(col("fun_time"))
).show()
+---------+-------------------+----------+----------+
|person_id|           fun_time|fun_minute|fun_second|
+---------+-------------------+----------+----------+
|        1|2017-12-02 03:04:00|         4|         0|
|        2|1999-01-01 01:45:20|        45|        20|
+---------+-------------------+----------+----------+
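
hour() extracts the hour component the same way; a quick sketch with the same sourceDF:

import org.apache.spark.sql.functions.hour

sourceDF.withColumn(
  "fun_hour",
  hour(col("fun_time")) // hour of day (0-23)
).show()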

datediff()

The datediff() and current_date() functions can be used to calculate the number of days between today and a date in a DateType column. Let’s use these functions to calculate someone’s age in days.

val sourceDF = spark.createDF(
  List(
    (1, Date.valueOf("1990-09-30")),
    (2, Date.valueOf("2001-12-14"))
  ), List(
    ("person_id", IntegerType, true),
    ("birth_date", DateType, true)
  )
)

import org.apache.spark.sql.functions.{current_date, datediff}

sourceDF.withColumn(
  "age_in_days",
  datediff(current_date(), col("birth_date"))
).show()
+---------+----------+-----------+
|person_id|birth_date|age_in_days|
+---------+----------+-----------+
|        1|1990-09-30|       9946|
|        2|2001-12-14|       5853|
+---------+----------+-----------+
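
If months are a more useful unit than days, months_between() follows the same pattern; a minimal sketch (note it returns a fractional number of months):

import org.apache.spark.sql.functions.months_between

sourceDF.withColumn(
  "age_in_months",
  months_between(current_date(), col("birth_date"))
).show()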

date_add()

The date_add() function can be used to add days to a date. Let’s add 15 days to a date column.

val sourceDF = spark.createDF(
  List(
    (1, Date.valueOf("1990-09-30")),
    (2, Date.valueOf("2001-12-14"))
  ), List(
    ("person_id", IntegerType, true),
    ("birth_date", DateType, true)
  )
)

import org.apache.spark.sql.functions.date_add

sourceDF.withColumn(
  "15_days_old",
  date_add(col("birth_date"), 15)
).show()
+---------+----------+-----------+
|person_id|birth_date|15_days_old|
+---------+----------+-----------+
|        1|1990-09-30| 1990-10-15|
|        2|2001-12-14| 2001-12-29|
+---------+----------+-----------+
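
The companion functions date_sub() and add_months() shift dates backwards by days and forwards by whole months; a quick sketch:

import org.apache.spark.sql.functions.{add_months, date_sub}

sourceDF.withColumn(
  "15_days_early",
  date_sub(col("birth_date"), 15) // subtract days
).withColumn(
  "next_month",
  add_months(col("birth_date"), 1) // shift by whole months
).show()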

Next steps

See the Spark SQL functions documentation for the full list of methods available for working with dates and times in Spark.

The Spark date functions aren’t comprehensive and Java / Scala datetime libraries are notoriously difficult to work with. We should think about filling in the gaps in the native Spark datetime libraries by adding functions to spark-daria.
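
For example, Spark has no built-in beginningOfWeek() function. Here’s a minimal sketch of how such a helper could be written (spark-daria ships something along these lines); it assumes weeks end on Saturday and handles the edge case where next_day() would skip a date that already falls on the last day of the week:

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{col, date_format, date_sub, next_day, when}

// next_day() returns the first matching day strictly after the input date,
// so dates already on the last day of the week need a special case.
def endOfWeek(c: Column, lastDay: String = "Sat"): Column =
  when(date_format(c, "EEE") === lastDay, c)
    .otherwise(next_day(c, lastDay))

def beginningOfWeek(c: Column, lastDay: String = "Sat"): Column =
  date_sub(endOfWeek(c, lastDay), 6)

sourceDF.withColumn(
  "week_start",
  beginningOfWeek(col("birth_date"))
).show()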
