ETL Process Using Snowflake and AWS S3

shorya sharma
6 min read · Nov 11, 2022


I am glad you took an interest in understanding the ETL process with Snowflake and AWS S3.

Why Snowflake Cloud?

  1. Performance and speed
  2. Storage and support for structured and semi-structured data
  3. Concurrency and accessibility
  4. Seamless data sharing
  5. Availability and security

Prerequisites for this tutorial

  1. Familiarity with AWS Cloud
  2. Familiarity with Snowflake cloud

What will we learn in this course?

How to create a storage integration object and authenticate Snowflake to read data from S3.

How to create a stage object that refers to the integration object (one stage per table).

How to execute COPY commands manually and automatically using Snowpipe.

How to extract records from Snowflake and load them to S3.

Architecture

Let us begin,

Manual Ingestion from AWS S3 to Snowflake

  1. Log in to your AWS Console.
  2. Search for IAM and then click on Roles.
  3. Click on Create role, select 'AWS account' as the trusted entity type, and choose 'Another AWS account' (enter your own account ID). Check 'Require external ID', give an external ID like 000000, and click on Next.

4. On the permissions screen, search for S3, attach the AmazonS3FullAccess policy, and click on Next.

5. Give the role a meaningful name like 'snowFlake-aws-role' and in the description write 'Access for snowflake to read from s3'. Click on Create role.

6. Search for the S3 service and create a new bucket with default settings. I have given my bucket the name 'shorya-snowflake-data'.

7. Go to the root of the bucket and create a folder named 'Banking-Dataset-Marketing-target'; inside this folder, create another folder 'Banking-Dataset' where we will upload our datasets.

8. Let's create the database, schema, table, and the integration object. In your Snowflake worksheet, run the following commands.

use role sysadmin;

create database if not exists banking_db;

create schema if not exists banking_liv;

use schema banking_db.banking_liv;

-- use your warehouse
use warehouse PRODUCTION_WH;

create or replace table banking_liv.banking (
age int,
job string,
marital string,
education string,
default string,
balance float,
housing string,
loan string,
contact string,
dayofmonth int,
mnth string,
duration int,
campaign int,
pdays int,
previous int,
poutcome string,
y string
);


show databases;
use role accountadmin;

-- Create an integration object ------
CREATE STORAGE INTEGRATION aws_sf_data
TYPE = EXTERNAL_STAGE
STORAGE_PROVIDER = S3
ENABLED = TRUE
STORAGE_AWS_ROLE_ARN = '{copy the ARN of the IAM role we created}'
STORAGE_ALLOWED_LOCATIONS = ('s3://shorya-snowflake-data/');

After this, run the command below:

desc INTEGRATION aws_sf_data;
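
The two values you need from this output are STORAGE_AWS_IAM_USER_ARN and STORAGE_AWS_EXTERNAL_ID. If you prefer not to scan the full result by eye, a small sketch like the one below (run immediately after the DESC statement, since it relies on RESULT_SCAN of the last query) filters the output down to just those two rows:

-- Keep only the two properties needed for the IAM trust policy
select "property", "property_value"
from table(result_scan(last_query_id()))
where "property" in ('STORAGE_AWS_IAM_USER_ARN', 'STORAGE_AWS_EXTERNAL_ID');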

Now open the IAM console, go to the role's 'Trust relationships' tab, and edit the trust policy:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "AWS": "{copy the value of STORAGE_AWS_IAM_USER_ARN from the results of the above query}"
      },
      "Action": "sts:AssumeRole",
      "Condition": {
        "StringEquals": {
          "sts:ExternalId": "{copy the value of STORAGE_AWS_EXTERNAL_ID from the results of the above query}"
        }
      }
    }
  ]
}

Now type the following under the previous query in the worksheet.

-- Grant usage access to Sysadmin Role ---
grant usage on integration aws_sf_data to role sysadmin;

-- Grant create stage access to Sysadmin Role ---
grant create stage on schema banking_db.banking_liv to role sysadmin;

use role sysadmin;

create schema banking_dev;

use schema banking_db.banking_dev;

-- Create an empty table ---
create table banking as select * from banking_db.banking_liv.banking;
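
Since banking_liv.banking is still empty at this point, the CTAS above effectively gives us an empty dev copy. If you only ever want the structure, regardless of whether the source table has data, CREATE TABLE ... LIKE is an alternative worth knowing (a sketch using the same names):

-- Alternative: copy only the column definitions, never the data
create table if not exists banking like banking_db.banking_liv.banking;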

9. Let's create a file format and a stage area.



-- Create a file format ---
CREATE FILE FORMAT csv_load_format
TYPE = 'CSV'
COMPRESSION = 'AUTO'
FIELD_DELIMITER = ','
RECORD_DELIMITER = '\n'
SKIP_HEADER =1
FIELD_OPTIONALLY_ENCLOSED_BY = '\042'
TRIM_SPACE = FALSE
ERROR_ON_COLUMN_COUNT_MISMATCH = TRUE
ESCAPE = 'NONE'
ESCAPE_UNENCLOSED_FIELD = '\134'
DATE_FORMAT = 'AUTO'
TIMESTAMP_FORMAT = 'AUTO';


-- Create a stage for banking table ---
create stage stg_banking_csv_dev
storage_integration = aws_sf_data
url = 's3://shorya-snowflake-data/Banking-Dataset-Marketing-target/Banking-Dataset/'
file_format = csv_load_format;

10. Let's put the data in our S3 bucket (the data is available at the GitHub link provided at the end of the tutorial) and list the contents of this stage area.

-- List the contents of stage  ---
list @stg_banking_csv_dev;
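
Optionally, before running the real load, you can do a dry run of the COPY with VALIDATION_MODE, which reports parsing errors without loading any rows (a sketch relying on the file format already attached to the stage):

-- Dry run: report errors without actually loading anything
copy into banking
from @stg_banking_csv_dev
VALIDATION_MODE = 'RETURN_ERRORS';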

11. Now let's ingest the data using the command below.

-- Execute the copy command ----
copy into banking
from @stg_banking_csv_dev
file_format = csv_load_format
ON_ERROR = ABORT_STATEMENT;

12. Check the ingested data.

select * from banking limit 10;
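
As a hedged aside: if a load is ever partially rejected (for example when running with ON_ERROR = CONTINUE), Snowflake's VALIDATE table function lets you inspect the rows that failed in the most recent COPY into the table:

-- Show rejected rows from the most recent COPY into BANKING
select * from table(validate(banking, job_id => '_last'));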

Auto Ingestion from AWS S3 to Snowflake

  1. Create a new folder 'Banking-snowpipe' inside the folder 'Banking-Dataset-Marketing-target'; inside 'Banking-snowpipe', create another folder 'csv' and make sure it is empty for now.
  2. Create a new stage area for the new path.
use role sysadmin;

use schema banking_db.banking_dev;

-- Create a stage for banking table ---
create or replace stage stg_banking_csv_dev
storage_integration = aws_sf_data
url = 's3://shorya-snowflake-data/Banking-Dataset-Marketing-target/Banking-snowpipe/csv/'
file_format = csv_load_format;

3. Now let's create a Snowpipe.

create or replace pipe banking_pipe auto_ingest=true as
copy into banking from @stg_banking_csv_dev ON_ERROR = continue;
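
One caveat: files that were already sitting under the monitored path before the event notification is configured will not be picked up automatically. If that happens, ALTER PIPE ... REFRESH can backfill files staged within roughly the last seven days:

-- Backfill files already present in the stage location
alter pipe banking_pipe refresh;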

4. Let's see the pipe that got created.

show pipes;

The important thing to notice in the above results is the notification_channel column.

We will use an AWS service called SQS (Simple Queue Service): as soon as data arrives in the bucket, S3 sends an event notification to the SQS queue, which in turn triggers our Snowpipe. That is how Snowpipe knows when to run the COPY command automatically.
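
If you need to grab the notification channel ARN again later, or just confirm the pipe is healthy, these two statements are handy (exact output columns may vary by Snowflake version):

-- Shows the pipe definition, including the notification_channel ARN
desc pipe banking_pipe;

-- Reports the pipe's execution state and pending file count
select system$pipe_status('banking_pipe');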

5. Move to the root directory of your bucket (for me it is shorya-snowflake-data), click on Properties and scroll down; under the Event notifications section, click on Create event notification.

6. Give it any name like 'banking_trigger_event' and under the Prefix section enter the path of the folder where we want the trigger to fire, in this case Banking-Dataset-Marketing-target/Banking-snowpipe/csv/.

7. Under Event types, select 'All object create events'.

8. Under Destination, choose SQS queue and 'Enter SQS queue ARN', then paste the ARN we got from the notification_channel column of the show pipes output.

9. Let's truncate the previously loaded data from the banking table and make sure it is empty.

truncate banking;

10. Now upload the data into the 'csv/' folder (the test1.csv file is available in the link given below) and, after waiting a few seconds, check whether the data got ingested.

select count(*) from banking;

11. Check the load history using the command below.

select * from information_schema.load_history where table_name='BANKING' order by last_load_time desc limit 10;
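
A caveat worth flagging: per the Snowflake documentation, the INFORMATION_SCHEMA.LOAD_HISTORY view does not include files loaded through Snowpipe, so the query above can come back empty for pipe loads. The COPY_HISTORY table function covers both bulk COPY and Snowpipe; a sketch for the last hour:

-- Load history for BANKING, including Snowpipe loads, over the last hour
select *
from table(information_schema.copy_history(
table_name => 'BANKING',
start_time => dateadd(hours, -1, current_timestamp())
));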

12. Let's upload a few more files inside the 'csv/' folder (test2.csv is available in the link below) and wait a few seconds.

select count(*) from banking;

We can see that the new data got appended to the existing records; the previously loaded files were not re-ingested.

Extracting Data from Snowflake to AWS S3

  1. Create a folder named 'unload_data' inside 'Banking-Dataset-Marketing-target' in your bucket.

2. Execute the command below after changing the URI to point to your own bucket.

use role accountadmin;

show integrations;

use schema BANKING_DB.BANKING_DEV;

-- Extract/Unload data ---
copy into s3://shorya-snowflake-data/Banking-Dataset-Marketing-target/unload_data/
from
(
select * from BANKING_DB.BANKING_DEV.BANKING where age > 40
)
storage_integration=aws_sf_data
single=false
file_format = csv_load_format;
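
By default the unload writes multiple compressed CSV files with generated names and no header row. If you want column headers and a recognizable file prefix, copy options like the ones below are one way to do it (a sketch; 'banking_over_40_' is just an illustrative prefix, and the URL is quoted per the documented syntax):

-- Unload with column headers and an illustrative file name prefix
copy into 's3://shorya-snowflake-data/Banking-Dataset-Marketing-target/unload_data/banking_over_40_'
from
(
select * from BANKING_DB.BANKING_DEV.BANKING where age > 40
)
storage_integration = aws_sf_data
file_format = (format_name = 'csv_load_format')
header = true;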

In the above query we filtered the data (age > 40) while unloading and wrote it to our S3 bucket in CSV format.

The data used in this tutorial is available here https://github.com/shorya1996/snowflake

Follow me on LinkedIn

LinkedIn: https://www.linkedin.com/in/shorya-sharma-b94161121/
