Count unique ids between two consecutive dates that are values of a column in PySpark


I have a PySpark DataFrame with ID and Date columns, looking like this:

ID Date
1 2021-10-01
2 2021-10-01
1 2021-10-02
3 2021-10-02

I want to count the number of unique IDs that did not exist on the previous day. Here the result would be 1, as there is only one new unique ID on 2021-10-02.

ID Date Count
1 2021-10-01 -
2 2021-10-01 -
1 2021-10-02 1
3 2021-10-02 1

I tried following this solution, but it does not work on date-type values. Any help would be highly appreciated. Thank you!


If you want to avoid a self-join (e.g. for performance reasons), you could work with Window functions:

from pyspark.sql import Row, Window
import pyspark.sql.functions as F
import datetime

# assumes an existing SparkSession named `spark`
df = spark.createDataFrame([
    Row(ID=1, date=datetime.date(2021,10,1)),
    Row(ID=2, date=datetime.date(2021,10,1)),
    Row(ID=1, date=datetime.date(2021,10,2)),
    Row(ID=2, date=datetime.date(2021,10,2)),
    Row(ID=1, date=datetime.date(2021,10,3)),
    Row(ID=3, date=datetime.date(2021,10,3)),
])

First, add the number of days since an ID was last seen (this will be null if the ID never appeared before):

df = df.withColumn('days_since_last_occurrence', F.datediff('date', F.lag('date').over(Window.partitionBy('ID').orderBy('date'))))

Second, we add a column marking the rows where this number of days is not 1 (including rows where it is null). We put a 1 into this column so that we can later sum over it to count those rows:

df = df.withColumn('is_new', F.when(F.col('days_since_last_occurrence') == 1, None).otherwise(1))

Now we sum over all rows with the same date and then remove the columns we no longer need:

# sum over all rows with the same date, then drop the helper columns
df = (df.withColumn('count', F.sum('is_new').over(Window.partitionBy('date')))
        .drop('is_new', 'days_since_last_occurrence')
        .sort('date', 'ID'))
df.show()
# Output:
+---+----------+-----+
| ID|      date|count|
+---+----------+-----+
|  1|2021-10-01|    2|
|  2|2021-10-01|    2|
|  1|2021-10-02| null|
|  2|2021-10-02| null|
|  1|2021-10-03|    1|
|  3|2021-10-03|    1|
+---+----------+-----+
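
For anyone who wants to check the logic without a Spark session, the same rule can be sketched in plain Python. This is only an illustration, assuming each (ID, date) pair appears at most once; the helper name `count_new_ids_per_date` is made up for this sketch and is not part of PySpark. Dates with no new IDs are simply absent from the result, which corresponds to the null rows in the Spark output.

```python
import datetime
from collections import Counter, defaultdict


def count_new_ids_per_date(rows):
    """rows: iterable of (id, date) pairs.

    Returns {date: number of IDs on that date that were not seen exactly
    one day earlier}. Hypothetical helper, for illustration only.
    """
    # Group the dates per ID (the equivalent of Window.partitionBy('ID')).
    dates_by_id = defaultdict(list)
    for id_, date in rows:
        dates_by_id[id_].append(date)

    new_per_date = Counter()
    for dates in dates_by_id.values():
        dates.sort()  # equivalent of orderBy('date')
        previous = None  # plays the role of F.lag('date')
        for date in dates:
            gap = (date - previous).days if previous is not None else None
            if gap != 1:  # never seen before, or last seen more than a day ago
                new_per_date[date] += 1
            previous = date
    return dict(new_per_date)


rows = [
    (1, datetime.date(2021, 10, 1)),
    (2, datetime.date(2021, 10, 1)),
    (1, datetime.date(2021, 10, 2)),
    (2, datetime.date(2021, 10, 2)),
    (1, datetime.date(2021, 10, 3)),
    (3, datetime.date(2021, 10, 3)),
]
result = count_new_ids_per_date(rows)
```

With the sample rows, `result` contains 2 for 2021-10-01 and 1 for 2021-10-03, in line with the table above.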


Collect the set of IDs for the current day and for the previous day, then take the size of the difference between the two to get the final result.

Updated to a solution that eliminates the join:

import pyspark.sql.functions as F

df = df.select('date', F.expr('collect_set(id) over (partition by date) as id_arr')).dropDuplicates() \
    .select('*', F.expr('size(array_except(id_arr, lag(id_arr,1,id_arr) over (order by date))) as count')) \
    .select(F.explode('id_arr').alias('id'), 'date', 'count')
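
The set arithmetic behind this can be sketched in plain Python, which may help when checking results by hand. The function name `new_ids_vs_previous_date` is hypothetical and only mirrors the expression above. Two details carry over from the Spark version: `lag(id_arr, 1, id_arr)` falls back to the current set on the first date, so that date gets a count of 0, and because the window is ordered by date, each date is compared with the previous date present in the data, which is the previous calendar day only if there are no gaps.

```python
import datetime


def new_ids_vs_previous_date(rows):
    """rows: iterable of (id, date) pairs.

    Returns {date: number of IDs not present on the preceding date in the data}.
    Hypothetical helper, for illustration only.
    """
    # Equivalent of collect_set(id) over (partition by date).
    ids_by_date = {}
    for id_, date in rows:
        ids_by_date.setdefault(date, set()).add(id_)

    counts = {}
    previous_ids = None
    for date in sorted(ids_by_date):  # equivalent of "order by date"
        current_ids = ids_by_date[date]
        # lag(id_arr, 1, id_arr): use the current set when there is no earlier date.
        baseline = current_ids if previous_ids is None else previous_ids
        # size(array_except(current, baseline))
        counts[date] = len(current_ids - baseline)
        previous_ids = current_ids
    return counts


# Sample data from the question.
rows = [
    (1, datetime.date(2021, 10, 1)),
    (2, datetime.date(2021, 10, 1)),
    (1, datetime.date(2021, 10, 2)),
    (3, datetime.date(2021, 10, 2)),
]
counts = new_ids_vs_previous_date(rows)
```

For the data in the question this gives 0 for 2021-10-01 and 1 for 2021-10-02.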

