Mastering PySpark Window Functions: 5 Key Examples

Introduction to Window Operations

Window operations are indispensable tools in data analysis. Tools differ in how much flexibility and functionality they offer, but the ability to perform calculations over a defined window of rows is critical in almost all of them.

Understanding Windows in Data Analysis

A window refers to a collection of rows that share a certain relationship, which could be based on common group membership or consecutive days. Once we define a window with the necessary constraints, we can execute calculations or aggregations on that subset of data.

In this article, we will explore five detailed examples to deepen our comprehension of window operations using PySpark. We will learn how to create windows with partitions, tailor these windows, and perform calculations on them.

PySpark serves as a Python API for Spark, an analytics engine designed for large-scale data processing. For our examples, I have prepared a sample dataset containing mock data, which you can download from my datasets repository. The dataset we will analyze is named "sample_sales_pyspark.csv".

Let's initiate a Spark session and create a DataFrame from this dataset.

from pyspark.sql import SparkSession
from pyspark.sql import Window, functions as F

spark = SparkSession.builder.getOrCreate()

data = spark.read.csv("sample_sales_pyspark.csv", header=True)

data.show(15)
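One thing to keep in mind: read.csv loads every column as a string unless a schema is supplied or inferSchema=True is passed, while the examples below aggregate sales_qty and price numerically. Here is a minimal sketch of casting those columns explicitly (the column names follow the dataset used in this article; adjust them if yours differ):

from pyspark.sql.types import IntegerType, DoubleType

# cast the numeric columns so that sum/max/mean treat them as numbers rather
# than strings; other numeric columns (e.g. a revenue column) can be cast the same way
data = (
    data
    .withColumn("sales_qty", F.col("sales_qty").cast(IntegerType()))
    .withColumn("price", F.col("price").cast(DoubleType()))
)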

Example 1: Cumulative Sales Calculation

To start, we can create a window by partitioning and ordering columns. Our DataFrame contains information regarding store, product, and sales, including quantity, price, revenue, and date. If we wish to compute the cumulative sales of each product per store, we can define our window like this:

window = Window.partitionBy("store_code", "product_code").orderBy("sales_date")

Next, we can calculate the cumulative sales by applying the sum function over this window:

data = data.withColumn("total_sales", F.sum("sales_qty").over(window))

This command generates a new column called "total_sales," which reflects the cumulative sum of sales quantities computed over the defined window. Let's check the first 30 entries for store "B1" to verify the accuracy of our cumulative calculations:

(data
    .filter(F.col("store_code") == "B1")
    .select("store_code", "product_code", "sales_date", "sales_qty", "total_sales")
    .show(30))

The output should show the cumulative sales figures for each product in store B1.

Example 2: Maximum Price Calculation

Once we have established a window, we can perform various other aggregations over it. For instance, applying the max function within the previously defined window gives us the cumulative maximum price of each product in that store.

# define the window
window = Window.partitionBy("store_code", "product_code").orderBy("sales_date")

# cumulative max price
data = data.withColumn("max_price", F.max("price").over(window))

# check the output
(data
    .filter(F.col("store_code") == "B1")
    .select("store_code", "product_code", "sales_date", "price", "max_price")
    .show(15))

The values in the "max_price" column will either increase or remain unchanged. Even if the current price drops, the maximum price will hold steady, reflecting the highest value encountered so far.

Example 3: Lag and Lead Functions

Lag and lead functions are frequently employed in time series analysis. They return values offset from the current row:

  • lag("sales_qty", 1): 1 row before
  • lag("sales_qty", 2): 2 rows before
  • lead("sales_qty", 1): 1 row after
  • lead("sales_qty", 2): 2 rows after

We can specify the offset using negative values, so lag("sales_qty", 1) is equivalent to lead("sales_qty", -1). Let’s see how this works:

# define the window
window = Window.partitionBy("store_code", "product_code").orderBy("sales_date")

# previous day sales qty
data = data.withColumn("prev_day_sales_lag", F.lag("sales_qty", 1).over(window))
data = data.withColumn("prev_day_sales_lead", F.lead("sales_qty", -1).over(window))

# check the output for a different product-store pair
(data
    .filter((F.col("store_code") == "A1") & (F.col("product_code") == "95955"))
    .select("sales_date", "sales_qty", "prev_day_sales_lag", "prev_day_sales_lead")
    .show(15))

The first row will have a null value for the previous day's sales because there is no prior record.
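If that leading null is inconvenient for downstream calculations, both lag and lead accept a default value as their third argument. A minimal sketch, assuming 0 is an acceptable fill (the output column name is illustrative):

# fill the boundary null with 0 instead of leaving it empty
# (the default of 0 and the column name are assumptions for illustration)
data = data.withColumn("prev_day_sales_filled", F.lag("sales_qty", 1, 0).over(window))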

Example 4: Custom Window with Rows Between

Having established a partitioned window, we can refine it using the rowsBetween method. For instance, if we want to calculate the average sales quantity over the last three days within the window, we can define it as follows:

window = (
    Window
    .partitionBy("store_code", "product_code")
    .orderBy("sales_date")
    .rowsBetween(-3, -1)
)

The first and second parameters set the start and end of the row range relative to the current row; here, -3 and -1 cover the three rows immediately preceding the current row, excluding the current row itself. To compute the average sales quantity over the last three days, we apply the mean function over this window:

data = data.withColumn("last_3_day_avg", F.mean("sales_qty").over(window))

Example 5: Cumulative Average Calculation

Lastly, we might need to calculate the cumulative average of a column within the defined windows, covering all rows from the first row of the window up to the current one (we will see versions both excluding and including the current row).

We can achieve this by setting the starting point using unboundedPreceding. Here’s how:

window = (
    Window
    .partitionBy("store_code", "product_code")
    .orderBy("sales_date")
    .rowsBetween(Window.unboundedPreceding, -1)
)

# calculate mean
data = data.withColumn("cumulative_mean", F.mean("sales_qty").over(window))

The cumulative mean column will provide the average sales quantity of all rows leading up to the current row (excluding the current row). If we want to include the current row in the calculation, we can adjust the window as follows:

window = (
    Window
    .partitionBy("store_code", "product_code")
    .orderBy("sales_date")
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)
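Applying the same mean over this inclusive window lets us compare the two variants side by side; a short sketch (the column name cumulative_mean_incl is illustrative):

# cumulative mean including the current row
data = data.withColumn("cumulative_mean_incl", F.mean("sales_qty").over(window))

(data
    .filter((F.col("store_code") == "A1") & (F.col("product_code") == "95955"))
    .select("sales_date", "sales_qty", "cumulative_mean", "cumulative_mean_incl")
    .show(15))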

Conclusion

Window operations are vital for data analysis, particularly when working with time series data or developing machine learning models for predictive analytics. They allow us to create various features necessary for analysis.

Most data manipulation tools offer functionalities that simplify window operations. In this article, we explored how to leverage PySpark for these operations effectively.

Thank you for your attention! I welcome any feedback or questions you may have.
