Mastering PySpark Window Functions: 5 Key Examples
Introduction to Window Operations
Window operations are indispensable tools in data analysis. Tools differ in how much flexibility and functionality they offer, but the ability to perform calculations across a defined window of rows is critical in almost all of them.
Understanding Windows in Data Analysis
A window refers to a collection of rows that share a certain relationship, which could be based on common group membership or consecutive days. Once we define a window with the necessary constraints, we can execute calculations or aggregations on that subset of data.
In this article, we will explore five detailed examples to deepen our comprehension of window operations using PySpark. We will learn how to create windows with partitions, tailor these windows, and perform calculations on them.
PySpark serves as a Python API for Spark, an analytics engine designed for large-scale data processing. For our examples, I have prepared a sample dataset containing mock data, which you can download from my datasets repository. The dataset we will analyze is named "sample_sales_pyspark.csv".
Let's initiate a Spark session and create a DataFrame from this dataset.
from pyspark.sql import SparkSession
from pyspark.sql import Window, functions as F
spark = SparkSession.builder.getOrCreate()
# inferSchema=True reads numeric columns (e.g. sales_qty, price) as numbers rather than strings
data = spark.read.csv("sample_sales_pyspark.csv", header=True, inferSchema=True)
data.show(15)
Example 1: Cumulative Sales Calculation
To start, we can create a window by partitioning and ordering columns. Our DataFrame contains information regarding store, product, and sales, including quantity, price, revenue, and date. If we wish to compute the cumulative sales of each product per store, we can define our window like this:
window = Window.partitionBy("store_code", "product_code").orderBy("sales_date")
Next, we can calculate the cumulative sales by applying the sum function over this window:
data = data.withColumn("total_sales", F.sum("sales_qty").over(window))
This command generates a new column called "total_sales," which reflects the cumulative sum of sales quantities computed over the defined window. Let's check the first 30 entries for store "B1" to verify the accuracy of our cumulative calculations:
(data
    .filter(F.col("store_code") == "B1")
    .select("store_code", "product_code", "sales_date", "sales_qty", "total_sales")
    .show(30))
The output shows the running total_sales for each product in the specified store, which lets us verify the cumulative calculation.
Example 2: Maximum Price Calculation
Once we have established a window, we can perform various other aggregations. For instance, if we apply the max function over the previously defined window, we obtain the running (cumulative) maximum price of each product in each store.
# define the window
window = Window.partitionBy("store_code", "product_code").orderBy("sales_date")
# cumulative max price
data = data.withColumn("max_price", F.max("price").over(window))
# check the output
(data
    .filter(F.col("store_code") == "B1")
    .select("store_code", "product_code", "sales_date", "price", "max_price")
    .show(15))
The values in the "max_price" column will either increase or remain unchanged. Even if the current price drops, the maximum price will hold steady, reflecting the highest value encountered so far.
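As a quick sanity check, we can compare each max_price with the previous row's value and confirm it never decreases. This is a sketch that reuses the window defined above; check and prev_max are names of my own:
# compare each row's running maximum with the previous row's value in the same window
check = data.withColumn("prev_max", F.lag("max_price", 1).over(window))
# rows where the running maximum decreased; this count should be 0
check.filter(F.col("max_price") < F.col("prev_max")).count()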
Example 3: Lag and Lead Functions
Lag and lead functions are frequently employed in time series analysis. They return values offset from the current row:
- lag("sales_qty", 1): 1 row before
- lag("sales_qty", 2): 2 rows before
- lead("sales_qty", 1): 1 row after
- lead("sales_qty", 2): 2 rows after
We can specify the offset using negative values, so lag("sales_qty", 1) is equivalent to lead("sales_qty", -1). Let’s see how this works:
# define the window
window = Window.partitionBy("store_code", "product_code").orderBy("sales_date")
# previous day sales qty
data = data.withColumn("prev_day_sales_lag", F.lag("sales_qty", 1).over(window))
data = data.withColumn("prev_day_sales_lead", F.lead("sales_qty", -1).over(window))
# check the output for a different product-store pair
(data
    .filter((F.col("store_code") == "A1") & (F.col("product_code") == "95955"))
    .select("sales_date", "sales_qty", "prev_day_sales_lag", "prev_day_sales_lead")
    .show(15))
The first row will have a null value for the previous day's sales because there is no prior record.
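If that null is not desirable, both lag and lead accept a default value as a third argument. A minimal sketch (the prev_day_sales_filled column name is just for illustration):
# fill the missing previous-day value with 0 instead of null
data = data.withColumn("prev_day_sales_filled", F.lag("sales_qty", 1, 0).over(window))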
Example 4: Custom Window with Rows Between
Having established a partitioned window, we can refine it using the rowsBetween method. For instance, if we want to calculate the average sales quantity over the last three days within the window, we can define it as follows:
window = (Window
    .partitionBy("store_code", "product_code")
    .orderBy("sales_date")
    .rowsBetween(-3, -1))
The first and second parameters indicate the start and end of the frame relative to the current row: (-3, -1) covers the three rows immediately before the current row, excluding the current row itself. With one row per day, this corresponds to the previous three days. We then apply the mean function over this window:
data = data.withColumn("last_3_day_avg", F.mean("sales_qty").over(window))
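We can inspect the result with the same filter pattern used earlier; the exact values depend on the dataset. Note that the first row of each group has an empty frame, so last_3_day_avg is null there:
# check the rolling average for one store-product pair
(data
    .filter((F.col("store_code") == "A1") & (F.col("product_code") == "95955"))
    .select("sales_date", "sales_qty", "last_3_day_avg")
    .show(15))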
Example 5: Cumulative Average Calculation
Lastly, we might need to calculate the cumulative average of a column within the defined windows, using a frame that starts at the first row of the window and grows with each new row.
We can achieve this by setting the starting point using unboundedPreceding. Here’s how:
window = (Window
    .partitionBy("store_code", "product_code")
    .orderBy("sales_date")
    .rowsBetween(Window.unboundedPreceding, -1))
# calculate mean
data = data.withColumn("cumulative_mean", F.mean("sales_qty").over(window))
The cumulative mean column will provide the average sales quantity of all rows leading up to the current row (excluding the current row). If we want to include the current row in the calculation, we can adjust the window as follows:
window = (Window
    .partitionBy("store_code", "product_code")
    .orderBy("sales_date")
    .rowsBetween(Window.unboundedPreceding, Window.currentRow))
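As a side note, when a window has an orderBy clause but no explicit frame, Spark defaults to a growing frame of rangeBetween(Window.unboundedPreceding, Window.currentRow), so the plain window from Example 1 already produces this kind of running aggregate as long as sales_date values are unique within each partition. A minimal sketch (window_default and cumulative_mean_incl are illustrative names):
# running average using the default frame of an ordered window
window_default = Window.partitionBy("store_code", "product_code").orderBy("sales_date")
data = data.withColumn("cumulative_mean_incl", F.mean("sales_qty").over(window_default))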
Conclusion
Window operations are vital for data analysis, particularly when working with time series data or developing machine learning models for predictive analytics. They allow us to create various features necessary for analysis.
Most data manipulation tools offer functionalities that simplify window operations. In this article, we explored how to leverage PySpark for these operations effectively.
Thank you for your attention! I welcome any feedback or questions you may have.