Orchestrate batch data loading using Cloud Composer — ELT
Cloud Composer is a fully managed workflow orchestration service, enabling you to create, schedule, monitor, and manage workflows that span across clouds and on-premises data centers.
Cloud Composer is built on the popular Apache Airflow open source project and operates using the Python programming language.
In this tutorial, we will learn how to orchestrate the loading of our data warehouse tables and build a data warehouse in BigQuery, automating table creation with the Cloud Composer service in GCP.
Prerequisites
Since this is a follow-up tutorial, it is advised that you go through the blog below first to set up your project.
Creating a cloud composer environment
1. In the GCP console search bar, look for Composer.
2. Click on Enable API if you haven't enabled it already.
3. On the Cloud Composer console page, click on Create Environment and choose Composer 1.
4. Fill in the details and click on Create.
5. Click on Airflow in the Airflow webserver column.
This page lists all our DAGs.
Why Airflow?
Airflow is a batch workflow orchestration platform. The Airflow framework contains operators to connect with many technologies and is easily extensible to connect with a new technology. If your workflows have a clear start and end, and run at regular intervals, they can be programmed as an Airflow DAG.
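For intuition, here is a minimal sketch of an Airflow DAG. The dag_id, schedule, and the two dummy tasks are illustrative only and are not part of this tutorial's pipeline.
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago

# A DAG is just Python code: tasks plus the dependencies between them,
# executed on the schedule_interval you define.
with DAG(
    dag_id='minimal_example_dag',   # illustrative name
    schedule_interval='@daily',     # a clear, regular interval
    start_date=days_ago(1),
) as dag:
    start = DummyOperator(task_id='start')
    end = DummyOperator(task_id='end')
    start >> end   # a clear start and end to the workflow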
Cloud Composer Bucket Directories
Cloud Composer uses specific folders in its GCS bucket for Airflow management: DAG files go in the dags/ folder, while plugins/, data/, and logs/ hold plugins, workflow data, and task logs respectively.
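As a quick illustration, a minimal sketch that lists the DAG files in the environment's bucket with the google-cloud-storage client (the bucket name is a placeholder; replace it with the bucket Composer created for your environment):
from google.cloud import storage

client = storage.Client()

# DAG files live under the dags/ prefix of the environment's bucket;
# uploading a .py file there is what deploys a DAG.
for blob in client.list_blobs('<your-composer-bucket>', prefix='dags/'):
    print(blob.name)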
Scheduling a pipeline from Cloud SQL to GCS and BigQuery
1. Follow the steps under the heading 'Create a MySQL database in Cloud SQL' in the link below to create a MySQL instance in Cloud SQL, and then assign the Cloud SQL service account the Storage Object Admin role.
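If you prefer to grant the role programmatically instead of through the console, a minimal sketch with the google-cloud-storage client could look like this. The bucket name matches the one used later in this tutorial; the Cloud SQL service account email is a placeholder you must copy from the instance's overview page.
from google.cloud import storage

# Placeholder: copy the actual Cloud SQL service account email
# from the instance details page in the console.
CLOUD_SQL_SA = "serviceAccount:<cloud-sql-service-account-email>"

client = storage.Client(project='shorya-gcp-data-end')
bucket = client.bucket('shorya-gcp-data-end-data-bucket')

# Grant Storage Object Admin on the export bucket so the
# Cloud SQL export can write the CSV file into it.
policy = bucket.get_iam_policy(requested_policy_version=3)
policy.bindings.append({
    "role": "roles/storage.objectAdmin",
    "members": {CLOUD_SQL_SA},
})
bucket.set_iam_policy(policy)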
2. Write the following code to create a DAG that extracts data from Cloud SQL to our GCS bucket, and then loads it from the GCS bucket into BigQuery tables.
vi sql_gcs_bq.py
from airflow import DAG
from airflow.contrib.operators.gcp_sql_operator import CloudSqlInstanceExportOperator
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.utils.dates import days_ago

args = {
    'owner': 'shorya'
}

GCP_PROJECT_ID = 'shorya-gcp-data-end'
INSTANCE_NAME = 'mysql-instance-source'
EXPORT_URI = 'gs://shorya-gcp-data-end-data-bucket/mysql_export/from_composer/stations/stations.csv'
SQL_QUERY = "SELECT * FROM apps_db.stations"

# Export configuration passed to the Cloud SQL export API
export_body = {
    "exportContext": {
        "fileType": "csv",
        "uri": EXPORT_URI,
        "csvExportOptions": {
            "selectQuery": SQL_QUERY
        }
    }
}

with DAG(
    dag_id='dag_load_bigquery',
    default_args=args,
    schedule_interval='0 5 * * *',
    start_date=days_ago(1),
) as dag:

    # Export the stations table from Cloud SQL to a CSV file in GCS
    sql_export_task = CloudSqlInstanceExportOperator(
        project_id=GCP_PROJECT_ID,
        body=export_body,
        instance=INSTANCE_NAME,
        task_id='sql_export_task'
    )

    # Load the exported CSV from GCS into the raw_bikesharing.stations table
    gcs_to_bq_example = GoogleCloudStorageToBigQueryOperator(
        task_id="gcs_to_bq_example",
        bucket='shorya-gcp-data-end-data-bucket',
        source_objects=['mysql_export/from_composer/stations/stations.csv'],
        destination_project_dataset_table='raw_bikesharing.stations',
        schema_fields=[
            {'name': 'station_id', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name': 'name', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name': 'region_id', 'type': 'STRING', 'mode': 'NULLABLE'},
            {'name': 'capacity', 'type': 'INTEGER', 'mode': 'NULLABLE'}
        ],
        write_disposition='WRITE_TRUNCATE'
    )

    # Query the raw table and write the result to the dwh dataset
    bq_to_bq = BigQueryOperator(
        task_id="bq_to_bq",
        sql="SELECT count(*) as count FROM `raw_bikesharing.stations`",
        destination_dataset_table='dwh_bikesharing.temporary_stations_count',
        write_disposition='WRITE_TRUNCATE',
        create_disposition='CREATE_IF_NEEDED',
        use_legacy_sql=False,
        priority='BATCH'
    )

    # Task dependencies: export from Cloud SQL, load into BigQuery, then transform
    sql_export_task >> gcs_to_bq_example >> bq_to_bq

if __name__ == "__main__":
    dag.cli()
3. Create a new dataset named dwh_bikesharing in BigQuery.
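If you prefer to create the dataset programmatically rather than through the BigQuery UI, a minimal sketch with the google-cloud-bigquery client could be (the dataset location here is an assumption; match it to your raw_bikesharing dataset):
from google.cloud import bigquery

client = bigquery.Client(project='shorya-gcp-data-end')

# Create the dwh_bikesharing dataset used by the bq_to_bq task
dataset = bigquery.Dataset('shorya-gcp-data-end.dwh_bikesharing')
dataset.location = 'US'  # assumption: use the same location as raw_bikesharing
client.create_dataset(dataset, exists_ok=True)  # no error if it already exists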
4. To deploy the DAG to the Cloud Composer environment, use the command below.
gcloud composer environments storage dags import --environment shorya-gcp-cloud-composer --location us-central1 --source sql_gcs_bq.py
5. Check the data in BigQuery to confirm that the tables were loaded.
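As a quick check, here is a minimal sketch that queries the loaded tables with the google-cloud-bigquery client (table names as used in the DAG above):
from google.cloud import bigquery

client = bigquery.Client(project='shorya-gcp-data-end')

# Row count of the raw table loaded from GCS
for row in client.query(
    "SELECT COUNT(*) AS station_count FROM `raw_bikesharing.stations`"
).result():
    print("stations rows:", row.station_count)

# The table produced by the bq_to_bq task
for row in client.query(
    "SELECT * FROM `dwh_bikesharing.temporary_stations_count`"
).result():
    print(dict(row))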
6. Do not forget to delete the services used.
Woohoo!!! We have completed the orchestration of batch data loading on Google Cloud.
Follow me on LinkedIn: https://www.linkedin.com/in/shorya-sharma-b94161121