Reading Data from Spark or Hive Metastore and MySQL
In this article, we’ll learn to use Hive in a PySpark project and connect to a MySQL database through PySpark using Spark over JDBC.
Hello again! Whether a keen interest in PySpark brought you here, a requirement at your workplace, or something else entirely, I am glad you are here.
We will also look at some very useful functionalities as a bonus.
If you have not seen my article on setting up PySpark on Windows, I highly recommend reading it first as a prerequisite.
Let’s get started!
Import Libraries and Create a Spark Session
import os
import sys
import glob
# Adjust these to your local Spark and Java installations; raw strings avoid
# backslash-escape issues in Windows paths.
os.environ['SPARK_HOME'] = r'C:\spark-3.1.2-bin-hadoop3.2'
os.environ['JAVA_HOME'] = r'C:\Program Files\Java\jdk1.8.0_201'
os.environ['HADOOP_HOME'] = r'C:\spark-3.1.2-bin-hadoop3.2'
spark_python = os.path.join(os.environ.get('SPARK_HOME', None), 'python')
py4j = glob.glob(os.path.join(spark_python, 'lib', 'py4j-*.zip'))[0]
sys.path[:0] = [spark_python, py4j]
os.environ['PYTHONPATH'] = py4j
import findspark
findspark.init()
findspark.find()
from pyspark.sql import SparkSession
spark = SparkSession. \
    builder. \
    appName("Spark Examples"). \
    enableHiveSupport(). \
    getOrCreate()
Notice that the code above calls enableHiveSupport(), which lets this Spark session use Hive functionality such as persistent tables in a Hive metastore.
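As a quick sanity check that Hive support is active (assuming the session started without errors), you can ask Spark to list the databases visible in the metastore:
# List the databases Spark can see in the metastore (sanity check)
spark.sql("show databases").show()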
Create Two Data Frames
employees = [
    (1, "Smith", -1, "2018", "10", "M", 3000),
    (2, "Rose", 1, "2010", "20", "M", 4000),
    (3, "Williams", 1, "2010", "10", "M", 1000),
    (4, "Jones", 2, "2005", "10", "F", 2000),
    (5, "Brown", 2, "2010", "40", "", -1),
    (6, "Brown", 2, "2010", "50", "", -1)
]
empColumns = [
    "emp_id", "name", "superior_emp_id", "year_joined",
    "emp_dept_id", "gender", "salary"
]
empDF = spark.createDataFrame(data=employees, schema=empColumns)
empDF.printSchema()
empDF.show(truncate=False)
departments = [
    ("Finance", 10),
    ("Marketing", 20),
    ("Sales", 30),
    ("IT", 40)
]
deptColumns = ["dept_name", "dept_id"]
deptDF = spark.createDataFrame(data=departments, schema=deptColumns)
deptDF.printSchema()
deptDF.show(truncate=False)
Set Up the Required Hive Metastore Database and Tables
Create a database and tables to store these DataFrames in Hive.
spark.sql("create database if not exists employee_db")
spark.sql("use employee_db")
empDF.createOrReplaceTempView("empDF")
deptDF.createOrReplaceTempView("deptDF")
spark.sql("create table emp_tbl as select * from empDF")
spark.sql("create table dept_tbl as select * from deptDF")
Note: if you get an error while running the above code, follow the steps below.
Step 1: Download the Hadoop zip file from the URL below, extract the files, and paste them into the 'C:\spark-3.1.2-bin-hadoop3.2\bin' folder.
Step 2: Restart the kernel and open the Jupyter Notebook from the Anaconda Prompt as an administrator.
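Once the tables are created, a quick way to verify that they landed in the metastore is to list them and read one back. A minimal check, assuming the statements above ran successfully:
# Confirm the tables exist in the employee_db database and read one back
spark.sql("show tables in employee_db").show()
spark.table("employee_db.emp_tbl").show(truncate=False)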
Transformations
Let us perform some useful transformations like JOIN, PIVOT, CUSTOM UNION, and removal of duplicate columns after joining.
Join Operation
empDF_deptDF_join = empDF. \
    join(deptDF, empDF.emp_dept_id == deptDF.dept_id, "inner")
empDF_deptDF_join.show()
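As an aside, other join types follow the same pattern; for example, a left join keeps every employee row even when its department id has no match. A minimal sketch using the same two DataFrames:
# Left join: keeps employees whose emp_dept_id has no matching dept_id
empDF. \
    join(deptDF, empDF.emp_dept_id == deptDF.dept_id, "left"). \
    show(truncate=False)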
Pivot Operation or Transpose
df_pivot = empDF_deptDF_join. \
    groupBy("dept_id", "dept_name"). \
    count()
df_pivot.createOrReplaceTempView("df_pivot")

# Stack (unpivot) the columns into name/value pairs and register the result
# as a view so the next query can read from it.
df_stacked = spark.sql("""
    SELECT dept_id,
           STACK(1, 'dept_name', dept_name) AS (name, col)
    FROM df_pivot
""")
df_stacked.createOrReplaceTempView("df_stacked")
df_stacked.show()

df_trans = spark.sql("""
    WITH Cte AS (
        SELECT *,
               ROW_NUMBER() OVER (PARTITION BY dept_id ORDER BY dept_id) AS Rn
        FROM df_stacked
    )
    SELECT Cte.dept_id,
           MAX(CASE WHEN Cte.Rn = 1 THEN Cte.col END) AS COL1,
           MAX(CASE WHEN Cte.Rn = 2 THEN Cte.col END) AS COL2,
           MAX(CASE WHEN Cte.Rn = 3 THEN Cte.col END) AS COL3
    FROM Cte
    GROUP BY Cte.name, Cte.dept_id
""")
df_trans.show()
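If you prefer the DataFrame API over SQL, Spark also offers pivot() on grouped data. As a side note, here is a minimal sketch (not part of the flow above) that counts employees per department and gender using the joined DataFrame:
# DataFrame-API pivot: one column per gender value, counts per department
df_api_pivot = empDF_deptDF_join. \
    groupBy("dept_name"). \
    pivot("gender"). \
    count()
df_api_pivot.show(truncate=False)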
Union of Data Frames with Different Columns
from pyspark.sql.functions import lit

def customUnion(df1, df2):
    cols1 = [x.lower() for x in df1.columns]
    cols2 = [x.lower() for x in df2.columns]
    total_cols = sorted(cols1 + list(set(cols2) - set(cols1)))

    def expr(mycols, allcols):
        def processCols(colname):
            if colname in mycols:
                return colname
            else:
                return lit(None).alias(colname)
        cols = map(processCols, allcols)
        return list(cols)

    appended = df1. \
        select(expr(cols1, total_cols)). \
        union(df2.select(expr(cols2, total_cols)))
    return appended
df_union = customUnion(empDF, deptDF)
df_union.show()
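On Spark 3.1 and later you can often skip the custom helper entirely: unionByName() accepts allowMissingColumns=True and fills the columns missing on either side with nulls. A minimal equivalent sketch:
# unionByName fills columns missing from either DataFrame with nulls
df_union2 = empDF.unionByName(deptDF, allowMissingColumns=True)
df_union2.show(truncate=False)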
Removing Duplicate Column Names After Join
dept1 = [
    ("Finance", 10),
    ("Marketing", 20),
    ("Sales", 30),
    ("IT", 40)
]
deptColumns1 = ["dept_name", "emp_dept_id"]
df3 = spark.createDataFrame(data=dept1, schema=deptColumns1)
df3 = empDF.join(df3, empDF.emp_dept_id == df3.emp_dept_id, 'inner')
df3.show()
cols_new = []
seen = set()
for c in df3.columns:
    c = c.lower()
    cols_new.append('{}_dup'.format(c) if c in seen else c)
    seen.add(c)
df3 = df3. \
    toDF(*cols_new). \
    select(*[c for c in cols_new if not c.endswith('_dup')])
df3.show()
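Alternatively, when the join key has the same name on both sides, passing the key as a list of column names to join() makes Spark keep a single copy of that column, so there is nothing to clean up afterwards. A small sketch (dept_df is a fresh copy of the same department data, since df3 was overwritten by the join above):
# Joining on a list of column names keeps a single emp_dept_id column
dept_df = spark.createDataFrame(data=dept1, schema=deptColumns1)
df_no_dup = empDF.join(dept_df, ["emp_dept_id"], "inner")
df_no_dup.show(truncate=False)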
I know you are waiting for me to show you how to connect PySpark with MySQL, so here it is.
Download the JDBC Driver: https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-5.1.45.tar.gz
Extract “mysql-connector-java-5.1.45.tar.gz” and place the extracted JAR file inside ‘C:\spark-3.1.2-bin-hadoop3.2\jars’.
And finally:
Spark Over JDBC using MySQL
jdbcHostname = "use your hostname"
jdbcDatabase = "use your database name"
jdbcPort = 3306
jdbcUsername = "use your username"
jdbcPassword = "use your password"
jdbcUrl = "jdbc:mysql://{0}:{1}/{2}?user={3}&password={4}".format(
    jdbcHostname,
    jdbcPort,
    jdbcDatabase,
    jdbcUsername,
    jdbcPassword
)
pushdown_query = "(select * from tablename) emp_alias"
df1 = spark.read.jdbc(url=jdbcUrl, table=pushdown_query)
df1.show()
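The same connection works in the other direction too; for example, you can write a DataFrame back to MySQL. A minimal sketch, where the table name "emp_backup" is just a placeholder and mode="append" is one of several write modes:
# "emp_backup" is a placeholder table name; adjust the table and mode to your setup
empDF.write.jdbc(url=jdbcUrl, table="emp_backup", mode="append")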
Voila! We accomplished so much today, congratulations!