Getting Started with PySpark With Examples
This article is aimed mainly at people who have just started in the field of big data and Spark with Python, but even if you are a professional you may want to stick around and brush up on the fundamentals: setting up the environment for application development with PySpark, and then building an end-to-end pipeline that reads data from files, processes it, and writes the results back to files.
This article also covers UDFs (User Defined Functions) in Spark.
What is Spark?
Apache Spark is a unified analytics engine primarily used for big data, i.e. large-scale data processing. It is convenient for developers because it provides high-level APIs in Java, Scala, Python, and R. That does not mean a SQL developer cannot use Spark; in fact, it also ships higher-level tools, including Spark SQL for SQL and structured data processing.
What is PySpark?
PySpark is the Python interface for Apache Spark. It not only lets you write Spark applications using Python APIs, but also provides the PySpark shell for interactively analyzing your data in a distributed environment.
PySpark supports most Spark features, such as Spark SQL, DataFrames, Streaming, MLlib, and Spark Core.
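As a tiny taste of the API (a minimal sketch, assuming a working installation like the one described below), a PySpark program revolves around a SparkSession and DataFrames; the data here is made up for illustration:
from pyspark.sql import SparkSession

# Start (or reuse) a local Spark session
spark = SparkSession.builder.appName("hello-pyspark").getOrCreate()

# Build a toy DataFrame and run a simple transformation
df = spark.createDataFrame([("Games", 3.99), ("Music", 0.0)], ["prime_genre", "price"])
df.filter(df.price > 0).show()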
Installation
Let’s see how we can install PySpark on Ubuntu and Windows.
Ubuntu
- Install Python and pip
- Install Jupyter Notebook
- Install Java using the command sudo apt-get install default-jre
- Install Scala using the command sudo apt-get install scala
- Install Py4J, which connects Python with Java and Scala, using the command pip3 install py4j
- Download Apache Spark, then extract it and set up the environment:
sudo tar -zxvf spark-2.1.0-bin-hadoop2.7.tgz
export SPARK_HOME='/home/ubuntu/spark-2.1.0-bin-hadoop2.7'
export PATH=$SPARK_HOME/bin:$PATH
export PYTHONPATH=$SPARK_HOME/python:$PYTHONPATH
export PYSPARK_DRIVER_PYTHON='jupyter'
export PYSPARK_DRIVER_PYTHON_OPTS='notebook'
export PYSPARK_PYTHON=python3
To access PySpark you can move into the spark/python directory, but if you want to access PySpark from anywhere without changing directories, install findspark:
pip3 install findspark
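A minimal sketch of how findspark is then used from any directory (assuming SPARK_HOME is exported as above):
import findspark
findspark.init()   # locates Spark via SPARK_HOME; a path can also be passed explicitly

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("ubuntu-check").getOrCreate()
print(spark.version)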
Windows
1. Download and Install Anaconda
2. Download and Install Java
3. Download Apache Spark
https://dlcdn.apache.org/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz
After downloading Apache Spark, extract it and place it on local disk C.
4. Download Winutils
Once you have downloaded winutils, paste winutils.exe inside ‘C:\spark-3.1.2-bin-hadoop3.2\bin’.
5. SET Environment Variables
In user variables, add SPARK_HOME and HADOOP_HOME pointing to C:\spark-3.1.2-bin-hadoop3.2, and JAVA_HOME pointing to your JDK installation folder.
In system variables, add a new entry to Path: C:\spark-3.1.2-bin-hadoop3.2\bin
Install the findspark package: in the Anaconda prompt, run
conda install -c conda-forge findspark
Woohoo!! PySpark is up and running on your laptop.
6. Run random examples
Time to run some examples, which can be useful for you in the real world.
I am using Windows as my operating system.
Open a Jupyter notebook and follow along; you can get the code from the GitHub link along with the CSV file used.
Import Libraries, Set Environment and Find Spark
import os
import sys
import glob

# Point the session at the local Spark, Java, and Hadoop (winutils) installations
os.environ['SPARK_HOME'] = r'C:\spark-3.1.2-bin-hadoop3.2'
os.environ['JAVA_HOME'] = r'C:\Program Files\Java\jdk1.8.0_201'
os.environ['HADOOP_HOME'] = r'C:\spark-3.1.2-bin-hadoop3.2'

# Make PySpark and the bundled Py4J library importable
spark_python = os.path.join(os.environ.get('SPARK_HOME', None), 'python')
py4j = glob.glob(os.path.join(spark_python, 'lib', 'py4j-*.zip'))[0]
sys.path[:0] = [spark_python, py4j]
os.environ['PYTHONPATH'] = py4j

import findspark
findspark.init()
findspark.find()
Initialize Spark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Python Basic Examples").getOrCreate()
Read the CSV File into the Data Frame
df = spark.read.csv('F:/main course/Artificial_Neural_Networks/AppleStore.csv',inferSchema=True,header=True)
df.printSchema()
df.show(truncate=False)
Let’s select a few columns from the DataFrame:
df = df.select(['size_bytes','price','prime_genre','user_rating'])
Because I am running this on my local computer, which does not have much memory, I will reduce the DataFrame to a smaller subset to demonstrate the functionality. If you have enough memory or you are on a cluster, feel free to skip the next step.
df = df.limit(20)
Cast Operation
from pyspark.sql.functions import col
from pyspark.sql.types import StringType
df_transformed = df.withColumn("price_new", col("price").cast(StringType()))
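A quick check on the same DataFrame (just a sketch) confirms the new column’s type; casting by type name is an equivalent shorthand:
df_transformed.printSchema()   # price_new should now show up as string

# Equivalent shorthand without importing StringType
# df_transformed = df.withColumn("price_new", col("price").cast("string"))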
Creating a UDF
Let us create a UDF that pads values with 0s towards the left.
from pyspark.sql.functions import udf

def leftPad(string):
    string = str(string).rjust(10, '0')
    return string

convertUDF = udf(lambda z: leftPad(z), StringType())
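As a side note (a sketch, not part of the flow below), the same left-padding is available as Spark’s built-in lpad function, which avoids Python UDF overhead; df_padded is a hypothetical name:
from pyspark.sql.functions import lpad

# Built-in alternative to the UDF above: pad price_new to 10 characters with '0'
df_padded = df_transformed.withColumn("price_new", lpad(col("price_new"), 10, "0"))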
Creating a UDF to change the Values to Upper Case
convertUDF2 = udf(lambda z: str(z).upper(),StringType())
Using the UDFs
df_transformed = df_transformed. \
withColumn(
"price_new",
convertUDF(col("price_new")).alias("price_new")
)
df_transformed = df_transformed. \
withColumn(
"prime_genre",
convertUDF2(col("prime_genre")).alias("prime_genre")
)
Using Substring and Trim
from pyspark.sql.functions import substring, trim
df_transformed = df_transformed.withColumn("price_new",substring('price_new',6,4))
df_transformed = df_transformed. \
withColumn('prime_genre', trim('prime_genre'))
Let’s see what our transformed DataFrame looks like:
df_transformed.show()
Analysis and Operations
Count Distinct
from pyspark.sql.functions import countDistinct
df_distinct_count = df_transformed.select(countDistinct("prime_genre").alias("prime_genre_count"))
df_distinct_count.show()
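For reference, an equivalent sketch without an aggregate function gives the same number as a plain Python int:
print(df_transformed.select("prime_genre").distinct().count())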
Analyze Data using analytical or windowing functions
Let us go ahead and understand how to get the frequency, cumulative frequency, and cumulative percentage using the analytical (windowing) functions of PySpark.
from pyspark.sql.functions import count
frequencies = df_transformed. \
groupBy('prime_genre'). \
agg(count('prime_genre').alias('frequency')). \
selectExpr(
'*',
'100*Frequency / sum(Frequency) over() Percent'
). \
selectExpr(
'*',
'sum(Frequency) over(order by Frequency desc) cumulative_frequency',
'sum(Percent) over(order by Frequency desc) cumulative_Percent'
)
frequencies.show()
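If you prefer the DataFrame API over SQL expressions, here is an equivalent sketch using window specifications (same logic, slightly different syntax; frequencies_api is a hypothetical name):
from pyspark.sql.window import Window
from pyspark.sql.functions import count, col, sum as sum_

w_all = Window.partitionBy()                      # window over the whole result set
w_cum = Window.orderBy(col("frequency").desc())   # running totals, highest frequency first

frequencies_api = df_transformed. \
    groupBy("prime_genre"). \
    agg(count("prime_genre").alias("frequency")). \
    withColumn("percent", 100 * col("frequency") / sum_("frequency").over(w_all)). \
    withColumn("cumulative_frequency", sum_("frequency").over(w_cum)). \
    withColumn("cumulative_percent", sum_("percent").over(w_cum))
frequencies_api.show()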
Dealing with Nulls
Create a DataFrame showing the count of null and not-null values for a given column.
from pyspark.sql.functions import isnan
null_values = df_transformed.filter(
df_transformed.price.contains('None') | \
df_transformed.price.contains('NULL') | \
(col("price") == '') | \
isnan(df_transformed.price) | \
df_transformed.price.isNull()
).count()
Not_null_values = df_transformed.count() - null_values
data = [(null_values, Not_null_values)]
df_new = spark.createDataFrame(data, ["Null_value", "Not_null_values"])
df_new.show()
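If you want a similar check across every column at once, here is a sketch using when together with count (null_counts is a hypothetical name):
from pyspark.sql.functions import when, count, col

# One count per column: rows where that column is null
null_counts = df_transformed.select([
    count(when(col(c).isNull(), c)).alias(c + "_nulls")
    for c in df_transformed.columns
])
null_counts.show()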
Create DataFrames for unique and duplicate values
Create two DataFrames: one will have all the unique values (the top-rated row per genre) and the other will have all the rows that are dropped from the main DataFrame.
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
w2 = Window. \
partitionBy('prime_genre'). \
orderBy(col("user_rating").desc())
df_duplicate = df_transformed. \
withColumn("row", row_number().over(w2)). \
filter(col("row") > 1). \
drop("row")
df_unique = df_transformed. \
withColumn("row", row_number().over(w2)). \
filter(col("row") == 1). \
drop("row")
df_duplicate.show()
df_unique.show()
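As a related shortcut (a sketch; note that it keeps an arbitrary row per genre rather than the top-rated one), Spark’s built-in dropDuplicates achieves a similar deduplication:
# Keep one row per prime_genre (no ordering guarantee, unlike the window approach above)
df_unique_alt = df_transformed.dropDuplicates(["prime_genre"])
df_unique_alt.show()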
I bet it was fun to do some hands-on work with PySpark, and I hope the blog was useful for you.
Link for the code and CSV:
Also, if you are interested in Hive, check out this tutorial of mine: