Forecast Sales - SparkML

Objectives


  • Download the search term data for an e-commerce web server
  • Analyze the data with Spark SQL
  • Use a pretrained ML model to forecast future sales

Setup


# if not installed already
pip install pyspark
pip install findspark

# Import general Spark libraries
import findspark
from pyspark.sql import SparkSession

# Point Python at the local Spark installation before creating a session
findspark.init()

Create Spark Session

# Create a Spark session
spark = SparkSession \
    .builder \
    .appName("Saving and Loading a SparkML Model") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
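
A quick, optional sanity check confirms the session started:

# spark.version is a standard SparkSession attribute
print(spark.version)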

Data


Download

Download the data to a local drive and save it as searchterms.csv.
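
A minimal download sketch, assuming requests is installed (the URL below is a hypothetical placeholder; substitute the real location of the file):

import requests

# Hypothetical placeholder URL: replace with the actual location of searchterms.csv
url = 'https://example.com/searchterms.csv'

response = requests.get(url)
response.raise_for_status()
with open('searchterms.csv', 'wb') as f:
    f.write(response.content)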

Load into Spark DF

# Load data into a Spark DataFrame
searchterms_df = spark.read.csv("d:/Education/R/Projects/Capstone/searchterms.csv", header=True)

View Shape

# View shape of DF
print((searchterms_df.count(), len(searchterms_df.columns)))

(10000, 4)

View First 5 Rows

# View first 5 rows of DF
searchterms_df.show(5)

+---+-----+----+--------------+
|day|month|year|    searchterm|
+---+-----+----+--------------+
| 12|   11|2021| mobile 6 inch|
| 12|   11|2021| mobile latest|
| 12|   11|2021|   tablet wifi|
| 12|   11|2021|laptop 14 inch|
| 12|   11|2021|     mobile 5g|
+---+-----+----+--------------+
only showing top 5 rows

View Schema

# View schema: the data types of all columns
searchterms_df.printSchema()

root
 |-- day: string (nullable = true)
 |-- month: string (nullable = true)
 |-- year: string (nullable = true)
 |-- searchterm: string (nullable = true)
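
Every column loads as a string because inferSchema was not set. The SQL below works fine on strings, but if numeric types are needed, a minimal sketch of two common fixes (same file path as above):

from pyspark.sql.functions import col

# Option 1: let Spark infer column types at read time
searchterms_df = spark.read.csv("d:/Education/R/Projects/Capstone/searchterms.csv",
                                header=True, inferSchema=True)

# Option 2: cast the date columns explicitly after loading
searchterms_df = searchterms_df \
    .withColumn("day", col("day").cast("int")) \
    .withColumn("month", col("month").cast("int")) \
    .withColumn("year", col("year").cast("int"))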

Create View

Before we can query the DF with SQL, we need to create a temporary view

# Create TempView
spark.sql("DROP VIEW IF EXISTS search")
searchterms_df.createTempView("search")
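
Alternatively, createOrReplaceTempView combines the drop and create into one idempotent call:

# Equivalent one-liner: replaces the view if it already exists
searchterms_df.createOrReplaceTempView("search")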

Count Searched Terms

  • How many times was the term “gaming laptop” searched for?
  • We could filter for the term directly, but let’s first see what we have when we group

Let’s Group First

# Group by searchterm
result = spark.sql("SELECT searchterm, count(*) AS NumTimes FROM search GROUP BY searchterm")
result.show()

+-------------------+--------+
|         searchterm|NumTimes|
+-------------------+--------+
|          mobile 5g|    2301|
|ebooks data science|     410|
|      mobile 6 inch|    2312|
|     tablet 10 inch|     715|
|             laptop|     935|
|      mobile latest|    1327|
|      gaming laptop|     499|
|     laptop 14 inch|     461|
|        tablet wifi|     896|
|          pen drive|     144|
+-------------------+--------+
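
The same aggregation is available through the DataFrame API, for anyone avoiding SQL; a minimal equivalent:

# Equivalent DataFrame API aggregation
searchterms_df.groupBy("searchterm").count().show()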

Order DESC

# Let's order the result to find the top searchterm
result = spark.sql("SELECT searchterm, count(*) AS NumTimes FROM search GROUP BY searchterm ORDER BY NumTimes DESC")
result.show()

+-------------------+--------+
|         searchterm|NumTimes|
+-------------------+--------+
|      mobile 6 inch|    2312|
|          mobile 5g|    2301|
|      mobile latest|    1327|
|             laptop|     935|
|        tablet wifi|     896|
|     tablet 10 inch|     715|
|      gaming laptop|     499|
|     laptop 14 inch|     461|
|ebooks data science|     410|
|          pen drive|     144|
+-------------------+--------+

Filter One Searchterm

  • Even though we can read the count off the table above, let’s filter for just the one term
  • Since we have the TempView, let’s use SQL again

# Find out how many times gaming laptop appeared
result = spark.sql("SELECT count(*) AS GamingLaptopAppeared FROM search WHERE searchterm = 'gaming laptop'")
result.show()

+--------------------+
|GamingLaptopAppeared|
+--------------------+
|                 499|
+--------------------+
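
The same count via the DataFrame API, as a quick equivalent:

# Equivalent DataFrame API filter and count
print(searchterms_df.filter(searchterms_df.searchterm == 'gaming laptop').count())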

Pretrained Forecasting Model


Download the Model

  • Download the model archive to a local drive; once extracted, the directory is named sales_prediction.model

import requests

url = 'https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-DB0321EN-SkillsNetwork/Bigdata%20and%20Spark/model.tar.gz'
target_path = 'model.tar.gz'

# Stream the archive to disk in chunks rather than loading it all into memory
response = requests.get(url, stream=True)
response.raise_for_status()
with open(target_path, 'wb') as f:
    for chunk in response.iter_content(chunk_size=8192):
        f.write(chunk)

Unzip tar file

# Open a terminal in the directory where the tar file is located, or cd to it, then run
tar -xvzf model.tar.gz

#for lab jupyter notebook
!tar -xvzf model.tar.gz

sales_prediction.model/
sales_prediction.model/metadata/
sales_prediction.model/metadata/part-00000
sales_prediction.model/metadata/.part-00000.crc
sales_prediction.model/metadata/_SUCCESS
sales_prediction.model/metadata/._SUCCESS.crc
sales_prediction.model/data/
sales_prediction.model/data/part-00000-1db9fe2f-4d93-4b1f-966b-3b09e72d664e-c000.snappy.parquet
sales_prediction.model/data/_SUCCESS
sales_prediction.model/data/.part-00000-1db9fe2f-4d93-4b1f-966b-3b09e72d664e-c000.snappy.parquet.crc
sales_prediction.model/data/._SUCCESS.crc
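
If a shell is not handy, Python's standard-library tarfile module does the same extraction:

import tarfile

# Extract the sales_prediction.model directory into the current working directory
with tarfile.open('model.tar.gz', 'r:gz') as tar:
    tar.extractall()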

Load LR Model

# Load LR model from local drive
from pyspark.ml.regression import LinearRegressionModel
model = LinearRegressionModel.load('sales_prediction.model')  
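
Before predicting, it can help to peek at the fitted line; coefficients and intercept are standard LinearRegressionModel attributes:

# Inspect the fitted line: one coefficient (for year) plus an intercept
print(model.coefficients)
print(model.intercept)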

Predict Sales


Using the sales forecast model, predict the sales for the year 2023.

  • The input feature is the year
  • The output is the predicted sales
  • Now we write a function that converts the input into the feature vector the model expects

Create Function

# Import Spark ML libraries
from pyspark.ml.feature import VectorAssembler

def predict(year):
    # Build a one-row DataFrame holding the input year
    data = [[year]]
    columns = ['year']
    input_df = spark.createDataFrame(data, columns)

    # Assemble the year column into the 'features' vector the model expects
    assembler = VectorAssembler(inputCols=['year'], outputCol='features')
    features_df = assembler.transform(input_df).select('features')

    # Run the loaded model and show the forecast
    predictions = model.transform(features_df)
    predictions.select('prediction').show()

OR

# The only issue here is that it's hard-coded rather than a function accepting arbitrary inputs
assembler = VectorAssembler(
    inputCols=['year'],
    outputCol='features')
data = [[2023, 0]]  # 0 is a placeholder for the unknown sales value
columns = ['year', 'sales']
input_df = spark.createDataFrame(data, columns)
features_df = assembler.transform(input_df).select('features', 'sales')
predictions = model.transform(features_df)
predictions.select('prediction').show()

Predict

predict(2023)

+------------------+
|        prediction|
+------------------+
|175.16564294006457|
+------------------+

predict(2022)

+------------------+
|        prediction|
+------------------+
|168.64307507877493|
+------------------+
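
Note that the year-over-year difference, 175.1656 − 168.6431 ≈ 6.52, is the model's slope: each additional year adds roughly 6.52 units to the forecast, which is exactly the single coefficient of a linear regression on year.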