Forecast Sales - SparkML
Objectives
- We will download the search term data for an e-commerce web server
- Analyze it
- Use an ML model to predict and forecast future sales
Setup
# If not installed already
pip install pyspark
pip install findspark
# Import general Spark libraries
import findspark
findspark.init()
import wget
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
Create Spark Session
# Create a spark session
spark = SparkSession \
    .builder \
    .appName("Saving and Loading a SparkML Model") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
Data
Download
Download the data from online to a local drive, saved as a file named searchterms.csv.
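No download command was shown here; below is a minimal sketch using the wget package imported above, with a placeholder URL (the real location of the CSV is not given in this write-up):
# Hypothetical URL - replace with the actual location of the search term data
csv_url = 'https://example.com/path/to/searchterms.csv'
wget.download(csv_url, 'searchterms.csv')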
Load into Spark DF
# Load Data into Spark DF
searchterms_df = spark.read.csv("d:/Education/R/Projects/Capstone/searchterms.csv", header=True)
View Shape
# View shape of DF
print((searchterms_df.count(), len(searchterms_df.columns)))
(10000, 4)
View First 5 Rows
# View first 5 rows of DF
searchterms_df.show(5)
+---+-----+----+--------------+
|day|month|year| searchterm|
+---+-----+----+--------------+
| 12| 11|2021| mobile 6 inch|
| 12| 11|2021| mobile latest|
| 12| 11|2021| tablet wifi|
| 12| 11|2021|laptop 14 inch|
| 12| 11|2021| mobile 5g|
+---+-----+----+--------------+
only showing top 5 rows
View Schema
# View Schema = Datatypes of all columns
searchterms_df.printSchema()
root
 |-- day: string (nullable = true)
 |-- month: string (nullable = true)
 |-- year: string (nullable = true)
 |-- searchterm: string (nullable = true)
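Every column loads as a string because inferSchema was not enabled in spark.read.csv. If numeric types are needed later, a minimal sketch of casting (an optional step, not part of the original analysis):
# Cast the date columns to integers (hypothetical follow-up step)
from pyspark.sql.functions import col
searchterms_typed = searchterms_df \
    .withColumn('day', col('day').cast('int')) \
    .withColumn('month', col('month').cast('int')) \
    .withColumn('year', col('year').cast('int'))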
Create View
Before we can query the DF with SQL, we need to create a view.
# Create TempView
"DROP VIEW IF EXISTS search")
spark.sql("search") searchterms_df.createTempView(
Count Searched Terms
- How many times was the term “gaming laptop” searched for?
- We could filter for the term directly, but let's first see what we have when we group
Let’s Group first
# Group by searchterm
= spark.sql("SELECT searchterm, count(*) AS NumTimes FROM search GROUP BY searchterm")
result
result.show()
+-------------------+--------+
| searchterm|NumTimes|
+-------------------+--------+
| mobile 5g| 2301|
|ebooks data science| 410|
| mobile 6 inch| 2312|
| tablet 10 inch| 715|
| laptop| 935|
| mobile latest| 1327|
| gaming laptop| 499|
| laptop 14 inch| 461|
| tablet wifi| 896|
| pen drive| 144|
+-------------------+--------+
Order DESC
# Let's order the result to find the top searchterm
= spark.sql("SELECT searchterm, count(*) AS NumTimes FROM search GROUP BY searchterm ORDER BY NumTimes DESC")
result
result.show()
+-------------------+--------+
| searchterm|NumTimes|
+-------------------+--------+
| mobile 6 inch| 2312|
| mobile 5g| 2301|
| mobile latest| 1327|
| laptop| 935|
| tablet wifi| 896|
| tablet 10 inch| 715|
| gaming laptop| 499|
| laptop 14 inch| 461|
|ebooks data science| 410|
| pen drive| 144|
+-------------------+--------+
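For reference, the same aggregation can be expressed with the DataFrame API instead of SQL; a sketch assuming the same searchterms_df:
# Equivalent grouping and ordering without the TempView
from pyspark.sql import functions as F
searchterms_df.groupBy('searchterm') \
    .count() \
    .withColumnRenamed('count', 'NumTimes') \
    .orderBy(F.desc('NumTimes')) \
    .show()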
Filter One Searchterm
- Even though we can see the results above, let’s just filter for one term
- Since we have the TempView, let's use SQL again
# Find out how many times gaming laptop appeared
= spark.sql("SELECT count(*) AS GamingLaptopAppeared FROM search WHERE searchterm = 'gaming laptop'")
result
result.show()
+--------------------+
|GamingLaptopAppeared|
+--------------------+
| 499|
+--------------------+
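The same count can also be obtained without SQL by filtering the DataFrame directly; a minimal sketch:
# Count rows where the search term is exactly 'gaming laptop'
searchterms_df.filter(searchterms_df.searchterm == 'gaming laptop').count()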
Pretrained Forecasting Model
Download the Model
- Downloaded the model archive to my local drive as model.tar.gz; it extracts to a folder named sales_prediction.model
import requests
url = 'https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-DB0321EN-SkillsNetwork/Bigdata%20and%20Spark/model.tar.gz'
target_path = 'model.tar.gz'

response = requests.get(url, stream=True)
if response.status_code == 200:
    with open(target_path, 'wb') as f:
        f.write(response.raw.read())
Unzip tar file
# Open a terminal in the directory where the tar file is located, or cd over to it, then run:
tar -xvzf model.tar.gz
# For the lab Jupyter notebook
!tar -xvzf model.tar.gz
sales_prediction.model/
sales_prediction.model/metadata/
sales_prediction.model/metadata/part-00000
sales_prediction.model/metadata/.part-00000.crc
sales_prediction.model/metadata/_SUCCESS
sales_prediction.model/metadata/._SUCCESS.crc
sales_prediction.model/data/
sales_prediction.model/data/part-00000-1db9fe2f-4d93-4b1f-966b-3b09e72d664e-c000.snappy.parquet
sales_prediction.model/data/_SUCCESS
sales_prediction.model/data/.part-00000-1db9fe2f-4d93-4b1f-966b-3b09e72d664e-c000.snappy.parquet.crc
sales_prediction.model/data/._SUCCESS.crc
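If a shell is not available, the archive can also be extracted from Python itself with the standard-library tarfile module; a minimal sketch:
# Extract model.tar.gz into the current working directory
import tarfile
with tarfile.open('model.tar.gz', 'r:gz') as tar:
    tar.extractall()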
Load LR Model
# Load LR model from local drive
from pyspark.ml.regression import LinearRegressionModel
model = LinearRegressionModel.load('sales_prediction.model')
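As a quick sanity check (not shown in the original), the loaded LinearRegressionModel exposes its fitted parameters:
# Inspect the fitted slope and intercept of the loaded model
print(model.coefficients)
print(model.intercept)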
Predict Sales
Using the sales forecast model, predict the sales for the year of 2023.
- We need to decide on what to use for input: year
- And for output: sales
- Now we write the function that will convert the dataframe columns into feature vectors
Create Function
# Import Spark ML libraries
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
def predict(sales):
    assembler = VectorAssembler(
        inputCols=['sales'],
        outputCol='features')

    data = [[sales, 0]]
    columns = ['sales', 'year']
    _ = spark.createDataFrame(data, columns)
    __ = assembler.transform(_).select('features', 'year')
    predictions = model.transform(__)
    predictions.select('prediction').show()
Or, a version that takes the year as the input column directly:
def predict(year):
    data = [[year]]
    columns = ['year']
    input_df = spark.createDataFrame(data, columns)

    assembler = VectorAssembler(inputCols=['year'], outputCol='features')
    features_df = assembler.transform(input_df).select('features')

    predictions = model.transform(features_df)
    predictions.select('prediction').show()
OR
# The only issue here is that it's hard-coded, rather than a function that accepts various inputs
assembler = VectorAssembler(
    inputCols=['year'],
    outputCol='features')
data = [[2023, 0]]
columns = ['year', 'sales']
_ = spark.createDataFrame(data, columns)
__ = assembler.transform(_).select('features', 'sales')
predictions = model.transform(__)
predictions.select('prediction').show()
Predict
predict(2023)
+------------------+
| prediction|
+------------------+
|175.16564294006457|
+------------------+
predict(2022)
+------------------+
| prediction|
+------------------+
|168.64307507877493|
+------------------+