Reading Data from Spark or Hive Metastore and MySQL

shorya sharma
5 min read · Nov 28, 2021


In this article, we’ll learn how to use Hive in a PySpark project and how to connect to a MySQL database from PySpark using Spark over JDBC.

Hello again! Whether a keen interest in PySpark brought you here, a requirement at your workplace, or some other reason entirely, I am glad you are here.

We will also look at some very useful functionalities as a bonus.

If you have not read my article on setting up PySpark on Windows, I highly recommend doing that first as a prerequisite.

Let’s get started!

Import Libraries and Create a Spark Session

import os
import sys
import glob

# Point Spark, Java, and Hadoop (winutils) at their install locations
os.environ['SPARK_HOME'] = r'C:\spark-3.1.2-bin-hadoop3.2'
os.environ['JAVA_HOME'] = r'C:\Program Files\Java\jdk1.8.0_201'
os.environ['HADOOP_HOME'] = r'C:\spark-3.1.2-bin-hadoop3.2'

# Make the PySpark and Py4J libraries shipped with Spark importable
spark_python = os.path.join(os.environ.get('SPARK_HOME', None), 'python')
py4j = glob.glob(os.path.join(spark_python, 'lib', 'py4j-*.zip'))[0]
sys.path[:0] = [spark_python, py4j]
os.environ['PYTHONPATH'] = py4j

import findspark

findspark.init()
findspark.find()

from pyspark.sql import SparkSession

spark = SparkSession. \
    builder. \
    appName("Spark Examples"). \
    enableHiveSupport(). \
    getOrCreate()

Notice in the code above that we have enabled Hive support with enableHiveSupport(), which lets Spark talk to the Hive metastore and use Hive functionality.
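
As a quick sanity check (a minimal sketch, not part of the original post), you can ask the session catalog what it sees through the metastore once Hive support is enabled:

# List databases and tables visible through the Hive metastore
spark.sql("show databases").show()
print(spark.catalog.listDatabases())
print(spark.catalog.listTables("default"))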

Create Two Data Frames

employees = [
    (1, "Smith", -1, "2018", "10", "M", 3000),
    (2, "Rose", 1, "2010", "20", "M", 4000),
    (3, "Williams", 1, "2010", "10", "M", 1000),
    (4, "Jones", 2, "2005", "10", "F", 2000),
    (5, "Brown", 2, "2010", "40", "", -1),
    (6, "Brown", 2, "2010", "50", "", -1)
]
empColumns = [
    "emp_id", "name", "superior_emp_id", "year_joined",
    "emp_dept_id", "gender", "salary"
]

empDF = spark.createDataFrame(data=employees, schema=empColumns)
empDF.printSchema()
empDF.show(truncate=False)

departments = [
    ("Finance", 10),
    ("Marketing", 20),
    ("Sales", 30),
    ("IT", 40)
]

deptColumns = ["dept_name", "dept_id"]

deptDF = spark. \
    createDataFrame(data=departments, schema=deptColumns)

deptDF.printSchema()
deptDF.show(truncate=False)

Set Up the Required Hive Metastore Database and Tables

Create a Database and Tables to Store these Data Frames in Hive.

spark.sql("create database if not exists employee_db")
spark.sql("use employee_db")
Output of Creating Database
empDF.createOrReplaceTempView("empDF")
deptDF.createOrReplaceTempView("deptDF")
spark.sql("create table emp_tbl as select * from empDF")
spark.sql("create table dept_tbl as select * from deptDF")
Output of Creating Tables
Validation of Created Database in Spark Warehouse
Validation of Hive Tables Created in Database
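
As a side note (a minimal sketch, assuming the same employee_db database), the DataFrame writer can create equivalent managed Hive tables directly, without going through temp views:

# Alternative: write the data frames as managed tables via the writer API
empDF.write.mode("overwrite").saveAsTable("employee_db.emp_tbl")
deptDF.write.mode("overwrite").saveAsTable("employee_db.dept_tbl")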

Note: if you get an error while running the code above, follow these steps.

Step 1: Download the Hadoop zip file (the Windows winutils binaries), extract the files, and paste them into the ‘C:\spark-3.1.2-bin-hadoop3.2\bin’ folder.

Step 2: Restart the kernel and open the Jupyter notebook from the Anaconda Prompt as an administrator.
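
A quick check that the binaries landed where Spark expects them (a minimal sketch; winutils.exe is the file Spark needs on Windows):

import os

# Spark on Windows looks for winutils.exe under %HADOOP_HOME%\bin
hadoop_bin = os.path.join(os.environ['HADOOP_HOME'], 'bin')
print(os.path.exists(os.path.join(hadoop_bin, 'winutils.exe')))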

Transformations

Let us perform some useful transformations: JOIN, PIVOT, a custom UNION, and removal of duplicate columns after joining.

Join Operation

empDF_deptDF_join = empDF. \
    join(deptDF, empDF.emp_dept_id == deptDF.dept_id, "inner")
empDF_deptDF_join.show()

Output of Inner Joining Tables (empDF and deptDF)
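
The third argument to join() accepts other join types as well. For example (a minimal sketch), a left outer join keeps employees whose emp_dept_id has no match in deptDF, filling the department columns with nulls:

# Employee 6 (dept 50) has no matching department and survives with nulls
empDF. \
    join(deptDF, empDF.emp_dept_id == deptDF.dept_id, "left"). \
    show()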

Pivot Operation or Transpose

df_pivot = empDF_deptDF_join. \
    groupBy("dept_id", "dept_name"). \
    count()
df_pivot.createOrReplaceTempView("df_pivot")

df_trans = spark.sql("""
SELECT dept_id,
       STACK(1, 'dept_name', dept_name) AS (name, col)
FROM df_pivot
""")
df_trans.show()

Output of Transposing

# Register the stacked result so the next query can read it as df_stacked
df_trans.createOrReplaceTempView("df_stacked")

df_trans = spark.sql("""
WITH Cte AS (
    SELECT *,
           ROW_NUMBER() OVER (PARTITION BY dept_id ORDER BY dept_id) AS Rn
    FROM df_stacked
)
SELECT Cte.dept_id,
       MAX(CASE WHEN Cte.Rn = 1 THEN Cte.col END) AS COL1,
       MAX(CASE WHEN Cte.Rn = 2 THEN Cte.col END) AS COL2,
       MAX(CASE WHEN Cte.Rn = 3 THEN Cte.col END) AS COL3
FROM Cte
GROUP BY Cte.name, Cte.dept_id
""")
df_trans.show()
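
For a more conventional pivot (a minimal sketch, separate from the stacking approach above), grouped data also has a built-in pivot() that turns the values of a column into columns:

# Count employees per department, pivoted by joining year into columns
empDF_deptDF_join. \
    groupBy("dept_name"). \
    pivot("year_joined"). \
    count(). \
    show()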

Union of Data Frames with Different Columns

from pyspark.sql.functions import lit


def customUnion(df1, df2):
    cols1 = [x.lower() for x in df1.columns]
    cols2 = [x.lower() for x in df2.columns]
    total_cols = sorted(cols1 + list(set(cols2) - set(cols1)))

    def expr(mycols, allcols):
        def processCols(colname):
            # Keep the column if this data frame has it, otherwise fill it with nulls
            if colname in mycols:
                return colname
            else:
                return lit(None).alias(colname)

        cols = map(processCols, allcols)
        return list(cols)

    appended = df1. \
        select(expr(cols1, total_cols)). \
        union(df2.select(expr(cols2, total_cols)))
    return appended


df_union = customUnion(empDF, deptDF)
df_union.show()
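
Since this article runs on Spark 3.1.2, the built-in unionByName with allowMissingColumns=True (added in Spark 3.1) gives the same result as customUnion; a minimal sketch:

# Columns missing on either side are filled with nulls before the union
df_union2 = empDF.unionByName(deptDF, allowMissingColumns=True)
df_union2.show()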

Removing Duplicate Column Names After Join

dept1 = [
    ("Finance", 10),
    ("Marketing", 20),
    ("Sales", 30),
    ("IT", 40)
]
deptColumns1 = ["dept_name", "emp_dept_id"]
df3 = spark.createDataFrame(data=dept1, schema=deptColumns1)
df3 = empDF.join(df3, empDF.emp_dept_id == df3.emp_dept_id, 'inner')
df3.show()

Output of Inner Join Operation
cols_new = []
seen = set()

# Suffix the second occurrence of any column name with _dup
for c in df3.columns:
    c = c.lower()
    cols_new.append('{}_dup'.format(c) if c in seen else c)
    seen.add(c)

df3 = df3. \
    toDF(*cols_new). \
    select(*[c for c in cols_new if not c.endswith('_dup')])

df3.show()

Output for Removing Duplicate Column Names After Join
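
An alternative worth knowing (a minimal sketch; dept_df is just an illustrative name): when the join key has the same name on both sides, passing the column name as a string to join() keeps only a single copy of that key column, so no deduplication step is needed for it.

# Joining on a column name instead of an expression avoids the duplicate key column
dept_df = spark.createDataFrame(data=dept1, schema=deptColumns1)
empDF.join(dept_df, "emp_dept_id", "inner").show()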

I know you have been waiting for me to show you how to connect PySpark to MySQL, so here it is.

Download the JDBC driver: https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-5.1.45.tar.gz

Extract “mysql-connector-java-5.1.45.tar.gz” and place the connector JAR inside ‘C:\spark-3.1.2-bin-hadoop3.2\jars’.
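
If you prefer not to copy the JAR into the Spark installation, it can also be registered when the session is first created, before any other session exists (a minimal sketch; the path below is a placeholder):

# Alternative: point spark.jars at the MySQL connector when building the session
spark = SparkSession. \
    builder. \
    appName("Spark Examples"). \
    config("spark.jars", r"C:\drivers\mysql-connector-java-5.1.45-bin.jar"). \
    enableHiveSupport(). \
    getOrCreate()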

And finally.

Spark Over JDBC using MySQL

jdbcHostname = "use your hostname"
jdbcDatabase = "use your database name"
jdbcPort = 3306
username = "use your username"
password = "use your password"

jdbcUrl = "jdbc:mysql://{0}:{1}/{2}?user={3}&password={4}".format(
    jdbcHostname,
    jdbcPort,
    jdbcDatabase,
    username,
    password
)

# Wrap the query in parentheses with an alias so it runs as a pushdown subquery
pushdown_query = "(select * from tablename) emp_alias"
df1 = spark.read.jdbc(url=jdbcUrl, table=pushdown_query)
df1.show()
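
Equivalently (a minimal sketch; the table name and credentials are placeholders), the generic JDBC reader options can be used, which keeps the credentials out of the URL:

# Same read expressed through the generic jdbc data source options
df1 = spark.read. \
    format("jdbc"). \
    option("url", "jdbc:mysql://{0}:{1}/{2}".format(jdbcHostname, jdbcPort, jdbcDatabase)). \
    option("driver", "com.mysql.jdbc.Driver"). \
    option("dbtable", "tablename"). \
    option("user", username). \
    option("password", password). \
    load()
df1.show()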

Voilà! We accomplished so much today. Congratulations!
