Developing a DataLake using Dataproc
This is part 3 and final part of our series on basics of data engineering on google cloud and hence will be a long one by the end of this article you will be able to perform the following tasks.
- Accessing HDFS from hadoop node shell.
- Loading data from GCS to HDFS.
- Creating Hive table on top of HDFS.
- Accessing an HDFS file from pyspark.
- Accessing a GCS file from Pyspark.
- Developing Spark ETL from HDFS to HDFS.
- Developing Spark ETL from GCS to GCS.
- Developing Spark ETL from GCS to BigQuery.
A data lake is a centralized repository that ingests and stores large volumes of data in its original form.
Pre-requisites
The below blog is adviced to go through for setting up your project as this will be a follow up tutorial.
https://sharmashorya1996.medium.com/data-warehouse-using-bigquery-basic-elt-1385c7662f4e
What is Dataproc ?
Dataproc is a managed Spark and Hadoop service that lets you take advantage of open source data tools for batch processing, querying, streaming, and machine learning. Dataproc automation helps you create clusters quickly, manage them easily, and save money by turning clusters off when you don’t need them.
Create a dataproc cluster
- search for the dataproc in google cloud console.
- Enable the API.
- Click on create cluster
4. Setup the cluster like the below Screenshots and hit on create.
Using cloud storage as a Data proc File System
By the end of this section we will see how we can use GCS inplace of HDFS as our file system.
Once your cluster is up and running , go inside it and click on VM instances tab.
Inside VM instances tab click on the SSH icon , it will take you to the master node shell environment.
Note: This is not your cloud shell , this is Hadoop master node linux. We can access HDFS from here.
Hdfs dfs -ls ../
Loading data from GCS to HDFS
upload the file simple_file.csv from below link to GCS bucket , once done we can upload the it to our master node by using the below command.
gsutil cp gs://[Bucket-name]/<PATH>/simple_file.csv ./
Now, we have data in our master node, and any data in Master node can be loaded to HDFS using below commands.
hdfs dfs -mkdir ../../data
hdfs dfs -mkdir ../../data/simple_file
hdfs dfs -put simple_file.csv ../../data/simple_file/
The commands uploaded the data from linux file system to HDFS.
Create a Hive Tabe on top of HDFS
We can access the file using Hive. Hive is a tool to give a schema to your file. use the below command to start hive.
hive
lets create our first table in hive using our data in HDFS.
create external table simple_table(
col_1 STRING,
col_2 STRING,
col3 STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE
location '/data/simple_file'
TBLPROPERTIES("skip.header.line.count"="1")
;
exit;
Accessing an HDFS file from Pyspark
Start the spark shell using the below command.
pyspark
simple_file = sc.textFile('hdfs://cluster-3539-m/data/simple_file/simple_file.csv')
simple_file.collect()
Accessing GCS files from Pyspark
We can use pyspark to access GCS directly inplace of HDFS , in the spark shell just use teh below code.
simple_file = sc.textFile('gs://shorya-gcp-data-end-data-bucket/chapter-5/dataset/simple_file.csv')
simple_file.collect()
quit()
Creating and running jobs on Dataproc Cluster
Preparing Log Data in HDFS and GCS
- Create a folder in your cloud storage with the name logs_example and copy the log files from the location below to folder logs_example in cloud storage.
https://github.com/shorya1996/datawarehousing
2. Now from Hadoop master node shell, copy the logs_example folder there, and use the HDFS copyFromLocal command to load the files into HDFS.
gsutil cp -r gs://shorya-gcp-data-end-data-bucket/chapter-5/dataset/logs_example ./
hdfs dfs -copyFromLocal logs_example ../../data
hdfs dfs -ls ../../data/logs_example/
Developing Spark ETL from HDFS to HDFS
The goal of the exrecise is to calculate how many user requests each article has from the Apache log data.
- open the Cloud shell editor and create a new file pyspark_job.py
- write the below code in the file
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName('spark_hdfs_to_hdfs') \
.getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("WARN")
MASTER_NODE_INSTANCE_NAME="cluster-3539-m"
log_files_rdd = sc.textFile('hdfs://{}/data/logs_example/*'.format(MASTER_NODE_INSTANCE_NAME))
splitted_rdd = log_files_rdd.map(lambda x: x.split(" "))
selected_col_rdd = splitted_rdd.map(lambda x: (x[0], x[3], x[5], x[6]))
columns = ["ip","date","method","url"]
logs_df = selected_col_rdd.toDF(columns)
logs_df.createOrReplaceTempView('logs_df')
sql = """
SELECT
url,
count(*) as count
FROM logs_df
WHERE url LIKE '%/article%'
GROUP BY url
"""
article_count_df = spark.sql(sql)
print(" ### Get only articles and blogs records ### ")
article_count_df.show(5)
article_count_df.write.save('hdfs://{}/data/article_count_df'.format(MASTER_NODE_INSTANCE_NAME), format='csv', mode='overwrite')
To run the above code in your dataproc cluster , you need to upload the code into GCS using the below command from shell terminal.
gsutil cp pyspark_job.py gs://shorya-gcp-data-end-data-bucket/chapter-5/code
Finally we can submit the job from our cloud shell terminal using the below command.
gcloud dataproc jobs submit pyspark --cluster=cluster-3539 --region=us-central1 gs://shorya-gcp-data-end-data-bucket/chapter-5/code/pyspark_job.py
Developing Spark ETL from GCS to GCS
We can use GCS instead of HDFS seemlessly from pyspark. Let’s modify our previous code a little bit.
Create a file pyspark_from_gcs_to_gcs.py in cloud shell editor and paste the below code into that.
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName('spark_hdfs_to_hdfs') \
.getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("WARN")
BUCKET_NAME="shorya-gcp-data-end-data-bucket"
MASTER_NODE_INSTANCE_NAME="cluster-3539-m"
log_files_rdd = sc.textFile('gs://{}/chapter-5/dataset/logs_example/*'.format(BUCKET_NAME))
splitted_rdd = log_files_rdd.map(lambda x: x.split(" "))
selected_col_rdd = splitted_rdd.map(lambda x: (x[0], x[3], x[5], x[6]))
columns = ["ip","date","method","url"]
logs_df = selected_col_rdd.toDF(columns)
logs_df.createOrReplaceTempView('logs_df')
sql = """
SELECT
url,
count(*) as count
FROM logs_df
WHERE url LIKE '%/article%'
GROUP BY url
"""
article_count_df = spark.sql(sql)
print(" ### Get only articles and blogs records ### ")
article_count_df.show(5)
article_count_df.write.save('gs://{}/chapter-5/job-result/article_count_df'.format(BUCKET_NAME), format='csv', mode='overwrite')
Now, similar to steps above, copy the code to cloud storage and submit the pyspark job.
gsutil cp pyspark_from_gcs_to_gcs.py gs://shorya-gcp-data-end-data-bucket/chapter-5/code
gcloud dataproc jobs submit pyspark --cluster=cluster-3539 --region=us-central1 gs://shorya-gcp-data-end-data-bucket/chapter-5/code/pyspark_from_gcs_to_gcs.py
Developing Spark ETL from GCS to BigQuery
In the previos section we saved our output in the GCS , alternatively we can directly store our output to BigQuery.
Create a file pyspark_from_gcs_to_bq.py using cloud shell editor and write the below code in it.
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName('spark_hdfs_to_hdfs') \
.getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("WARN")
BUCKET_NAME="shorya-gcp-data-end-data-bucket"
log_files_rdd = sc.textFile('gs://{}/chapter-5/dataset/logs_example/*'.format(BUCKET_NAME))
splitted_rdd = log_files_rdd.map(lambda x: x.split(" "))
selected_col_rdd = splitted_rdd.map(lambda x: (x[0], x[3], x[5], x[6]))
columns = ["ip","date","method","url"]
logs_df = selected_col_rdd.toDF(columns)
logs_df.createOrReplaceTempView('logs_df')
sql = """
SELECT
url,
count(*) as count
FROM logs_df
WHERE url LIKE '%/article%'
GROUP BY url
"""
article_count_df = spark.sql(sql)
print(" ### Get only articles and blogs records ### ")
article_count_df.show(5)
article_count_df.write.format('bigquery') \
.option('temporaryGcsBucket', BUCKET_NAME) \
.option('table', 'dwh_bikesharing.article_count_df') \
.mode('overwrite') \
.save()
copy the code again to GCS , but while running the submit command we need BigQuery jars connector as well.
gsutil cp pyspark_from_gcs_to_bq.py gs://shorya-gcp-data-end-data-bucket/chapter-5/code
gcloud dataproc jobs submit pyspark --cluster=cluster-3539 --region=us-central1 gs://shorya-gcp-data-end-data-bucket/chapter-5/code/pyspark_from_gcs_to_bq.py --jars gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar
Wohoo !!! We have completed a long series on data engineering on GCP.
follow me on Linkedin
LinkedIn: https://www.linkedin.com/in/shorya-sharma-b94161121