Machine Learning with PySpark MLlib, Part 1: Regression

shorya sharma
7 min read · Jun 20, 2023


MLlib is Spark’s machine learning (ML) library. Its goal is to make practical machine learning scalable and easy. At a high level, it provides tools such as:

  • ML Algorithms: common learning algorithms such as classification, regression, clustering, and collaborative filtering
  • Featurization: feature extraction, transformation, dimensionality reduction, and selection
  • Pipelines: tools for constructing, evaluating, and tuning ML Pipelines
  • Persistence: saving and loading algorithms, models, and Pipelines
  • Utilities: linear algebra, statistics, data handling, etc.
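
These pieces are designed to be composed. As a minimal sketch of how they fit together (the column names x1, x2, label, the toy rows, and the save path are placeholders, not taken from the datasets used below), a typical workflow assembles features, fits an algorithm inside a Pipeline, and persists the fitted model:

from pyspark.sql import SparkSession
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

spark = SparkSession.builder.getOrCreate()

# Toy DataFrame with placeholder predictor columns x1, x2 and a numeric label
df = spark.createDataFrame(
    [(1.0, 2.0, 3.5), (2.0, 0.5, 4.0), (3.0, 1.5, 6.1)],
    ["x1", "x2", "label"])

# Featurization: assemble the raw columns into a single feature vector
assembler = VectorAssembler(inputCols=["x1", "x2"], outputCol="features")
# ML algorithm: linear regression on the assembled features
lr = LinearRegression(featuresCol="features", labelCol="label")

# Pipeline: chain the two stages and fit them in one call
model = Pipeline(stages=[assembler, lr]).fit(df)

# Persistence: save the fitted pipeline and load it back later
model.write().overwrite().save("/tmp/lr_pipeline_model")
reloaded = PipelineModel.load("/tmp/lr_pipeline_model")
reloaded.transform(df).show()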

Prerequisites:

1. PySpark knowledge

2. Any environment with PySpark installed will work; I am using the Databricks Community Edition for demo purposes.

Regression models (both linear and non-linear) are used for predicting a real value, such as a salary. If your independent variable is time, then you are forecasting future values; otherwise your model is predicting present but unknown values.

Regression analysis is a form of predictive modelling technique which investigates the relationship between a dependent (target) variable and independent (predictor) variable(s). For example, the relationship between rash driving and the number of road accidents by a driver is best studied through regression.

In this part, you will learn how the following machine learning regression models work and how to implement them:

Simple Linear Regression

It is used to estimate real values (cost of houses, number of calls, total sales, etc.) based on continuous variable(s). Here, we establish a relationship between the independent and dependent variables by fitting a best-fit line.

This best-fit line is known as the regression line and is represented by the linear equation Y = a*X + b.
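
For intuition, the coefficients can be computed directly with the ordinary least-squares formulas: a = Σ(X − X̄)(Y − Ȳ) / Σ(X − X̄)² and b = Ȳ − a*X̄. A quick plain-Python sketch (the numbers are made up purely for illustration):

# Toy data: Y grows roughly linearly with X
X = [1.0, 2.0, 3.0, 4.0, 5.0]
Y = [3.1, 4.9, 7.2, 8.8, 11.1]

x_mean = sum(X) / len(X)
y_mean = sum(Y) / len(Y)

# Slope: covariance of X and Y divided by the variance of X
a = sum((x - x_mean) * (y - y_mean) for x, y in zip(X, Y)) / sum((x - x_mean) ** 2 for x in X)
# Intercept: forces the line through the point of means
b = y_mean - a * x_mean

print(f"Y = {a:.3f} * X + {b:.3f}")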

Code:

The dataset is available at:

ML_Sklearn/Simple linear regression at master · shorya1996/ML_Sklearn · GitHub

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, FloatType
import pyspark.sql.functions as f
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

spark = SparkSession.builder.getOrCreate()

# Explicit schema for the two numeric columns in Salary_Data.csv
schema = StructType([
    StructField('YearsExperience', FloatType(), True),
    StructField('Salary', FloatType(), True)
])
df = spark.read.csv("/FileStore/tables/Salary_Data.csv", header=True, schema=schema)

# Assemble the single predictor into the 'features' vector column that MLlib expects
assembler = VectorAssembler(inputCols=['YearsExperience'], outputCol='features')
data_set = assembler.transform(df).select(['features', 'Salary'])

# 80/20 train/test split
train_data, test_data = data_set.randomSplit([0.8, 0.2])

# Fit a simple linear regression and evaluate it on the held-out data
lr = LinearRegression(featuresCol="features", labelCol='Salary')
lrModel = lr.fit(train_data)
test_stats = lrModel.evaluate(test_data)
print(f"RMSE: {test_stats.rootMeanSquaredError}")
print(f"R2: {test_stats.r2}")
print(f"MSE: {test_stats.meanSquaredError}")
test_stats.predictions.show()
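
After fitting, the slope and intercept of the line are available on the model, and the model can score new experience values (11.5 years below is just an example input). A short sketch continuing from the code above:

# Slope (a) and intercept (b) of the fitted line Y = a*X + b
print(f"Coefficient: {lrModel.coefficients[0]}")
print(f"Intercept: {lrModel.intercept}")

# Score a new data point: predicted salary for 11.5 years of experience
new_df = spark.createDataFrame([(11.5,)], ["YearsExperience"])
lrModel.transform(assembler.transform(new_df)).show()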

Multiple Linear Regression

Linear Regression is mainly of two types: Simple Linear Regression and Multiple Linear Regression. Simple Linear Regression is characterized by one independent variable, while Multiple Linear Regression (as the name suggests) is characterized by multiple (more than one) independent variables.

Let us say that you ask a doctor to find your BMI (body mass index). They will take your height and weight into consideration to compute your BMI. So here, BMI is your dependent variable, and weight and height are your independent variables.

Code:

The dataset is available as 50_Startups.csv at:

ML_Sklearn/Multiple Linear Regression/50_Startups.csv at master · shorya1996/ML_Sklearn · GitHub

I created a table in Databricks from the CSV file; you can also read the CSV file directly.

# The startups table was created from 50_Startups.csv
df = spark.sql("select * from startups")
df.show()

# Convert the categorical 'State' column to numeric indices
from pyspark.ml.feature import StringIndexer
indexer = StringIndexer(inputCol='State', outputCol='State_numeric')
indexer_fitted = indexer.fit(df)
df_indexed = indexer_fitted.transform(df)

# One-hot encode the indexed column
from pyspark.ml.feature import OneHotEncoder
encoder = OneHotEncoder(inputCols=['State_numeric'], outputCols=['State_onehot'])
df_onehot = encoder.fit(df_indexed).transform(df_indexed)
df_onehot.printSchema()

# Expand the one-hot vector into one column per state label
from pyspark.ml.functions import vector_to_array
df_col_onehot = df_onehot.select('*', vector_to_array('State_onehot').alias('col_onehot'))
df_col_onehot.show()
num_categories = len(df_col_onehot.first()['col_onehot'])
cols_expanded = [(f.col('col_onehot')[i].alias(f'{indexer_fitted.labels[i]}')) for i in range(num_categories)]
df_cols_onehot = df_col_onehot.select('*', *cols_expanded)
df_cols_onehot.show()

# Keep the numeric predictors, the dummy state columns, and the target
df_final = df_cols_onehot.select("R&D Spend", "Administration", "Marketing Spend", "California", "New York", "Profit")

# Assemble all predictors into a single feature vector
assembler = VectorAssembler(inputCols=df_final.columns[:-1], outputCol='features')
data_set = assembler.transform(df_final).select(['features', 'Profit'])
data_set.show()

# 80/20 train/test split, then fit and evaluate a multiple linear regression
train_data, test_data = data_set.randomSplit([0.8, 0.2])
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(featuresCol="features", labelCol='Profit')
lrModel = lr.fit(train_data)
test_stats = lrModel.evaluate(test_data)
print(f"RMSE: {test_stats.rootMeanSquaredError}")
print(f"R2: {test_stats.r2}")
print(f"MSE: {test_stats.meanSquaredError}")
test_stats.predictions.show()
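
To interpret the multiple regression, each coefficient can be paired with the feature it belongs to (the order of the coefficients matches the assembler's inputCols). A short sketch continuing from the code above:

# Pair each coefficient with the input column it corresponds to
for name, coef in zip(df_final.columns[:-1], lrModel.coefficients.toArray()):
    print(f"{name}: {coef:.3f}")
print(f"Intercept: {lrModel.intercept:.3f}")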

Decision Tree Regression

A decision tree builds regression or classification models in the form of a tree structure. It breaks a dataset down into smaller and smaller subsets while, at the same time, an associated decision tree is incrementally developed. The result is a tree with decision nodes and leaf nodes. A decision node has two or more branches, each representing a value of the attribute tested. A leaf node represents a decision on the numerical target. The topmost decision node in a tree, which corresponds to the best predictor, is called the root node. Decision trees can handle both categorical and numerical data.

Decision Tree Algorithm

The core algorithm for building decision trees, called ID3 and developed by J. R. Quinlan, employs a top-down, greedy search through the space of possible branches with no backtracking. The ID3 algorithm can be used to construct a decision tree for regression by replacing Information Gain with Standard Deviation Reduction.

Standard Deviation

A decision tree is built top-down from a root node and involves partitioning the data into subsets that contain instances with similar values (homogenous). We use standard deviation to calculate the homogeneity of a numerical sample. If the numerical sample is completely homogeneous its standard deviation is zero.

a) Standard deviation for one attribute: S = sqrt( Σ(x − x̄)² / n ), where x̄ is the mean of the target values and n is the number of instances.

Standard Deviation Reduction

The standard deviation reduction is based on the decrease in standard deviation after a dataset is split on an attribute. Constructing a decision tree is all about finding the attribute that returns the highest standard deviation reduction (i.e., the most homogeneous branches).
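
To make this concrete, here is a small plain-Python sketch that computes the standard deviation reduction for one candidate split (the target values and the two branches are made up for illustration):

import statistics

def std_dev(values):
    # Population standard deviation; zero means the subset is perfectly homogeneous
    return statistics.pstdev(values)

def std_dev_reduction(target, branches):
    # SDR = S(target) - sum over branches of (|branch| / |target|) * S(branch)
    weighted = sum(len(b) / len(target) * std_dev(b) for b in branches)
    return std_dev(target) - weighted

# Toy example: salaries split by a categorical attribute into two branches
target = [40, 42, 60, 62, 80, 82]
branches = [[40, 42, 60], [62, 80, 82]]
print(std_dev_reduction(target, branches))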

Code:

The dataset is available at:

ML_Sklearn/Decision Tree Regression/Position_Salaries.csv at master · shorya1996/ML_Sklearn · GitHub

# The position_salaries table was created from Position_Salaries.csv
df = spark.sql("select * from position_salaries")
df.show()

# Index the categorical 'Position' column
from pyspark.ml.feature import StringIndexer
indexer = StringIndexer(inputCol='Position', outputCol='Position_numeric')
indexer_fitted = indexer.fit(df)
df_indexed = indexer_fitted.transform(df)

# Assemble the predictors into a feature vector
assembler = VectorAssembler(inputCols=['Level', 'Position_numeric'], outputCol='features')
data_set = assembler.transform(df_indexed).select(['features', 'Salary'])
data_set.show()

# Fit a decision tree regressor on the full dataset (it is small, so no train/test split is made)
from pyspark.ml.regression import DecisionTreeRegressor
dr = DecisionTreeRegressor(featuresCol="features", labelCol='Salary')
drModel = dr.fit(data_set)
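
The fitted tree can then score the data and print the splits it learned, which is a quick sanity check on such a small dataset. A short sketch continuing from the code above:

# Predict on the same data the tree was trained on
predictions = drModel.transform(data_set)
predictions.show()

# Print the learned structure of the tree
print(drModel.toDebugString)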

Random Forest Regression

The random forest model is a type of additive model that makes predictions by combining decisions from a sequence of base models. More formally we can write this class of models as:

g(x) = f0(x) + f1(x) + f2(x) + …

where the final model g is the sum of simple base models fi. Here, each base model fi is a simple decision tree.

STEPS IN RANDOM FOREST REGRESSION

1. Draw a bootstrap sample (a random sample with replacement) of the training data for each tree.
2. Grow a decision tree on each sample, considering only a random subset of the features at each split.
3. Repeat until the desired number of trees has been built.
4. To predict, run the input through every tree and average the individual predictions.
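
In Spark's RandomForestRegressor, the regression prediction is the average of the individual tree predictions, which matches the additive form above with each fi equal to a tree's output divided by the number of trees. A tiny plain-Python sketch of the combination step in point 4 (the tree outputs are made-up numbers):

# Made-up predictions from three base trees for a single input row
tree_predictions = [37.2, 41.5, 39.0]

# The forest's prediction is the average of the base trees' predictions
forest_prediction = sum(tree_predictions) / len(tree_predictions)
print(forest_prediction)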

Code:

The dataset used is the real estate valuation dataset, available at:

ML_Sklearn/Decision Tree Regression/Real estate valuation data set.csv at master · shorya1996/ML_Sklearn · GitHub

# The inputs are as follows
# X1=the transaction date (for example, 2013.250=2013 March, 2013.500=2013 June, etc.)
# X2=the house age (unit: year)
# X3=the distance to the nearest MRT station (unit: meter)
# X4=the number of convenience stores in the living circle on foot (integer)
# X5=the geographic coordinate, latitude. (unit: degree)
# X6=the geographic coordinate, longitude. (unit: degree)

# The output is as follows
# Y = house price of unit area (10,000 New Taiwan Dollar / Ping, where Ping is a local unit, 1 Ping = 3.3 square meters)


# The real_estate table was created from the real estate valuation CSV
df = spark.sql("select * from real_estate")
df.show()

# Use columns X1-X6 as predictors, skipping the first (index) column and the target
assembler = VectorAssembler(inputCols=df.columns[1:-1], outputCol='features')
data_set = assembler.transform(df).select(['features', 'Y house price of unit area'])
data_set.show()

# 80/20 train/test split
train_data, test_data = data_set.randomSplit([0.8, 0.2])

# Fit a random forest regressor and predict on the test set
from pyspark.ml.regression import RandomForestRegressor
rf = RandomForestRegressor(featuresCol="features", labelCol='Y house price of unit area')
rfModel = rf.fit(train_data)
test_stats = rfModel.transform(test_data)
test_stats.show()

# Evaluate the predictions with RMSE
from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator(
    labelCol="Y house price of unit area", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(test_stats)
rmse

7.484506897505033
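
The same evaluator can report other metrics such as R2, and the fitted forest exposes how much each input contributed to the splits. A short sketch continuing from the code above:

# R-squared on the test set, using the same evaluator with a different metric
evaluator_r2 = RegressionEvaluator(
    labelCol="Y house price of unit area", predictionCol="prediction", metricName="r2")
print(f"R2: {evaluator_r2.evaluate(test_stats)}")

# Relative importance of each input feature, paired with its column name
for name, importance in zip(df.columns[1:-1], rfModel.featureImportances.toArray()):
    print(f"{name}: {importance:.3f}")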

This brings us to the end of Part 1. The complete code is available at:

PySpark/regression_mllib.ipynb at main · shorya1996/PySpark (github.com)

Follow me on LinkedIn:

LinkedIn: https://www.linkedin.com/in/shorya-sharma-b94161121


Written by shorya sharma

Assistant Manager at Bank Of America | Ex-Data Engineer at IBM | Ex - Software Engineer at Compunnel inc. | Python | Data Science
