Developing a DataLake using Dataproc

shorya sharma
7 min readApr 29, 2023

--

This is part 3 and final part of our series on basics of data engineering on google cloud and hence will be a long one by the end of this article you will be able to perform the following tasks.

  1. Accessing HDFS from hadoop node shell.
  2. Loading data from GCS to HDFS.
  3. Creating Hive table on top of HDFS.
  4. Accessing an HDFS file from pyspark.
  5. Accessing a GCS file from Pyspark.
  6. Developing Spark ETL from HDFS to HDFS.
  7. Developing Spark ETL from GCS to GCS.
  8. Developing Spark ETL from GCS to BigQuery.

A data lake is a centralized repository that ingests and stores large volumes of data in its original form.

Pre-requisites

The below blog is adviced to go through for setting up your project as this will be a follow up tutorial.

https://sharmashorya1996.medium.com/data-warehouse-using-bigquery-basic-elt-1385c7662f4e

What is Dataproc ?

Dataproc is a managed Spark and Hadoop service that lets you take advantage of open source data tools for batch processing, querying, streaming, and machine learning. Dataproc automation helps you create clusters quickly, manage them easily, and save money by turning clusters off when you don’t need them.

Create a dataproc cluster

  1. search for the dataproc in google cloud console.
  2. Enable the API.
  3. Click on create cluster

4. Setup the cluster like the below Screenshots and hit on create.

Using cloud storage as a Data proc File System

By the end of this section we will see how we can use GCS inplace of HDFS as our file system.

Once your cluster is up and running , go inside it and click on VM instances tab.

Inside VM instances tab click on the SSH icon , it will take you to the master node shell environment.

Note: This is not your cloud shell , this is Hadoop master node linux. We can access HDFS from here.

Hdfs dfs -ls ../

Loading data from GCS to HDFS

upload the file simple_file.csv from below link to GCS bucket , once done we can upload the it to our master node by using the below command.

gsutil cp gs://[Bucket-name]/<PATH>/simple_file.csv ./

Now, we have data in our master node, and any data in Master node can be loaded to HDFS using below commands.

 hdfs dfs -mkdir ../../data
hdfs dfs -mkdir ../../data/simple_file
hdfs dfs -put simple_file.csv ../../data/simple_file/

The commands uploaded the data from linux file system to HDFS.

Create a Hive Tabe on top of HDFS

We can access the file using Hive. Hive is a tool to give a schema to your file. use the below command to start hive.

hive 

lets create our first table in hive using our data in HDFS.

create external table simple_table(
col_1 STRING,
col_2 STRING,
col3 STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE
location '/data/simple_file'
TBLPROPERTIES("skip.header.line.count"="1")
;
exit;

Accessing an HDFS file from Pyspark

Start the spark shell using the below command.

pyspark
simple_file = sc.textFile('hdfs://cluster-3539-m/data/simple_file/simple_file.csv')
simple_file.collect()

Accessing GCS files from Pyspark

We can use pyspark to access GCS directly inplace of HDFS , in the spark shell just use teh below code.

simple_file = sc.textFile('gs://shorya-gcp-data-end-data-bucket/chapter-5/dataset/simple_file.csv')
simple_file.collect()
quit()

Creating and running jobs on Dataproc Cluster

Preparing Log Data in HDFS and GCS

  1. Create a folder in your cloud storage with the name logs_example and copy the log files from the location below to folder logs_example in cloud storage.

https://github.com/shorya1996/datawarehousing

2. Now from Hadoop master node shell, copy the logs_example folder there, and use the HDFS copyFromLocal command to load the files into HDFS.

gsutil cp -r gs://shorya-gcp-data-end-data-bucket/chapter-5/dataset/logs_example ./
hdfs dfs -copyFromLocal logs_example ../../data
hdfs dfs -ls ../../data/logs_example/

Developing Spark ETL from HDFS to HDFS

The goal of the exrecise is to calculate how many user requests each article has from the Apache log data.

  1. open the Cloud shell editor and create a new file pyspark_job.py
  2. write the below code in the file
from pyspark.sql import SparkSession

spark = SparkSession.builder \
.appName('spark_hdfs_to_hdfs') \
.getOrCreate()

sc = spark.sparkContext
sc.setLogLevel("WARN")

MASTER_NODE_INSTANCE_NAME="cluster-3539-m"
log_files_rdd = sc.textFile('hdfs://{}/data/logs_example/*'.format(MASTER_NODE_INSTANCE_NAME))

splitted_rdd = log_files_rdd.map(lambda x: x.split(" "))
selected_col_rdd = splitted_rdd.map(lambda x: (x[0], x[3], x[5], x[6]))

columns = ["ip","date","method","url"]
logs_df = selected_col_rdd.toDF(columns)
logs_df.createOrReplaceTempView('logs_df')

sql = """
SELECT
url,
count(*) as count
FROM logs_df
WHERE url LIKE '%/article%'
GROUP BY url
"""
article_count_df = spark.sql(sql)
print(" ### Get only articles and blogs records ### ")
article_count_df.show(5)

article_count_df.write.save('hdfs://{}/data/article_count_df'.format(MASTER_NODE_INSTANCE_NAME), format='csv', mode='overwrite')

To run the above code in your dataproc cluster , you need to upload the code into GCS using the below command from shell terminal.

gsutil cp pyspark_job.py gs://shorya-gcp-data-end-data-bucket/chapter-5/code

Finally we can submit the job from our cloud shell terminal using the below command.

gcloud dataproc jobs submit pyspark --cluster=cluster-3539 --region=us-central1 gs://shorya-gcp-data-end-data-bucket/chapter-5/code/pyspark_job.py

Developing Spark ETL from GCS to GCS

We can use GCS instead of HDFS seemlessly from pyspark. Let’s modify our previous code a little bit.

Create a file pyspark_from_gcs_to_gcs.py in cloud shell editor and paste the below code into that.

from pyspark.sql import SparkSession

spark = SparkSession.builder \
.appName('spark_hdfs_to_hdfs') \
.getOrCreate()

sc = spark.sparkContext
sc.setLogLevel("WARN")

BUCKET_NAME="shorya-gcp-data-end-data-bucket"
MASTER_NODE_INSTANCE_NAME="cluster-3539-m"
log_files_rdd = sc.textFile('gs://{}/chapter-5/dataset/logs_example/*'.format(BUCKET_NAME))

splitted_rdd = log_files_rdd.map(lambda x: x.split(" "))
selected_col_rdd = splitted_rdd.map(lambda x: (x[0], x[3], x[5], x[6]))

columns = ["ip","date","method","url"]
logs_df = selected_col_rdd.toDF(columns)
logs_df.createOrReplaceTempView('logs_df')

sql = """
SELECT
url,
count(*) as count
FROM logs_df
WHERE url LIKE '%/article%'
GROUP BY url
"""
article_count_df = spark.sql(sql)
print(" ### Get only articles and blogs records ### ")
article_count_df.show(5)

article_count_df.write.save('gs://{}/chapter-5/job-result/article_count_df'.format(BUCKET_NAME), format='csv', mode='overwrite')

Now, similar to steps above, copy the code to cloud storage and submit the pyspark job.

gsutil cp pyspark_from_gcs_to_gcs.py gs://shorya-gcp-data-end-data-bucket/chapter-5/code

gcloud dataproc jobs submit pyspark --cluster=cluster-3539 --region=us-central1 gs://shorya-gcp-data-end-data-bucket/chapter-5/code/pyspark_from_gcs_to_gcs.py

Developing Spark ETL from GCS to BigQuery

In the previos section we saved our output in the GCS , alternatively we can directly store our output to BigQuery.

Create a file pyspark_from_gcs_to_bq.py using cloud shell editor and write the below code in it.

from pyspark.sql import SparkSession

spark = SparkSession.builder \
.appName('spark_hdfs_to_hdfs') \
.getOrCreate()

sc = spark.sparkContext
sc.setLogLevel("WARN")

BUCKET_NAME="shorya-gcp-data-end-data-bucket"
log_files_rdd = sc.textFile('gs://{}/chapter-5/dataset/logs_example/*'.format(BUCKET_NAME))

splitted_rdd = log_files_rdd.map(lambda x: x.split(" "))
selected_col_rdd = splitted_rdd.map(lambda x: (x[0], x[3], x[5], x[6]))

columns = ["ip","date","method","url"]
logs_df = selected_col_rdd.toDF(columns)
logs_df.createOrReplaceTempView('logs_df')

sql = """
SELECT
url,
count(*) as count
FROM logs_df
WHERE url LIKE '%/article%'
GROUP BY url
"""
article_count_df = spark.sql(sql)
print(" ### Get only articles and blogs records ### ")
article_count_df.show(5)

article_count_df.write.format('bigquery') \
.option('temporaryGcsBucket', BUCKET_NAME) \
.option('table', 'dwh_bikesharing.article_count_df') \
.mode('overwrite') \
.save()

copy the code again to GCS , but while running the submit command we need BigQuery jars connector as well.

gsutil cp pyspark_from_gcs_to_bq.py gs://shorya-gcp-data-end-data-bucket/chapter-5/code

gcloud dataproc jobs submit pyspark --cluster=cluster-3539 --region=us-central1 gs://shorya-gcp-data-end-data-bucket/chapter-5/code/pyspark_from_gcs_to_bq.py --jars gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar

Wohoo !!! We have completed a long series on data engineering on GCP.

follow me on Linkedin

LinkedIn: https://www.linkedin.com/in/shorya-sharma-b94161121

--

--

shorya sharma
shorya sharma

Written by shorya sharma

Assistant Manager at Bank Of America | Ex-Data Engineer at IBM | Ex - Software Engineer at Compunnel inc. | Python | Data Science

No responses yet