Data Engineering — Building an ETL Pipeline for Canadian ETFs

Web scraping → Transformation & Data Validation → Loading into Snowflake → EDA in Snowflake (part 2)

Sarthak Girdhar
Mar 10, 2024
Image generated by the author using DALL·E 3 (Microsoft Copilot Designer)

Before I begin, there's probably one question bugging you.

Why didn't Sarthak merge the two dataframes created in part 1? Snowflake has compute costs too, so it doesn't make sense to join these two dataframes/tables in Snowflake rather than in pandas. Whatever happened to Roche's Maxim of Data Transformation that Sarthak quoted in an earlier project? He could simply have used canadian_etf = pd.merge(wiki_etf_df, investing_etf_df, on='Symbol', how='inner') to merge the two dataframes into a new one.

I didn't choose to do that for three reasons:

  1. Sometimes, it is beneficial for downstream users to have access to data that is as raw as possible. In this scenario, the end users will have access to both dataframes/Snowflake tables separately as well as combined.
  2. The Wikipedia table and the Investing.com table won't be updated at the same frequency. ETF prices move daily, so the Investing.com table needs to be refreshed more often than the Wikipedia table. If we had combined the two tables, we would need to reload the same (or at least mostly the same) data over and over again, which would be more expensive in the long run. (See the refresh sketch after this list.)
  3. I wanted to demonstrate joins in Snowflake.
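
To make reason 2 concrete, here is a minimal sketch of how the fast-moving price table alone could be refreshed. It assumes a fresh scrape has been loaded into a hypothetical staging table investing_etf_stg; the target table and column names mirror the ones created later in this post.

-- Hypothetical incremental refresh of the price table only.
-- 'investing_etf_stg' is an assumed staging table holding the latest scrape.
MERGE INTO investing_etf AS tgt
USING investing_etf_stg AS src
  ON tgt."Symbol" = src."Symbol"
WHEN MATCHED THEN UPDATE SET
  tgt."Price" = src."Price",
  tgt."Chg. %" = src."Chg. %",
  tgt."Vol." = src."Vol."
WHEN NOT MATCHED THEN
  INSERT ("Symbol", "Price", "Chg. %", "Vol.")
  VALUES (src."Symbol", src."Price", src."Chg. %", src."Vol.");

The Wikipedia table, meanwhile, could keep its own (much slower) refresh schedule.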

Introduction

This is part 2 of building an ETL pipeline for the Canadian ETFs Screener.

In part 1, we scraped/extracted data from Wikipedia and Investing.com, did some transformations, and finally performed some data quality checks.

In this part, we’ll load both the dataframes into two separate Snowflake tables, join them together to create one table, and finally write some queries in SQL as part of our Exploratory Data Analysis (EDA).

Loading the two dataframes into Snowflake

Let’s start with importing the necessary libraries.

# import libraries (install with: pip install "snowflake-connector-python[pandas]")
import snowflake.connector
from snowflake.connector.pandas_tools import write_pandas

Next, we'll write the function to upload the wiki_etf_df dataframe into a table in Snowflake.

# create a function to upload the 'wiki_etf_df' dataframe into a table in Snowflake
# (note: 'wiki_etf_df' is the dataframe built in part 1, referenced from the enclosing scope)
def wiki_to_snowflake(ACCOUNT, USER, PASSWORD, WAREHOUSE, DATABASE, SCHEMA):

    # connect to Snowflake
    conn = snowflake.connector.connect(
        user=USER,
        password=PASSWORD,
        account=ACCOUNT,
        warehouse=WAREHOUSE,
        database=DATABASE,
        schema=SCHEMA
    )

    # create a cursor
    cur = conn.cursor()

    # create the warehouse (XSMALL, auto-suspends after 300 seconds of inactivity)
    cur.execute(f'CREATE WAREHOUSE IF NOT EXISTS {WAREHOUSE} WAREHOUSE_SIZE = XSMALL AUTO_SUSPEND = 300')

    # use the warehouse
    cur.execute(f'USE WAREHOUSE {WAREHOUSE}')

    # create the database
    cur.execute(f'CREATE DATABASE IF NOT EXISTS {DATABASE}')

    # use the database
    cur.execute(f'USE DATABASE {DATABASE}')

    # create the schema
    cur.execute(f'CREATE SCHEMA IF NOT EXISTS {SCHEMA}')

    # use the schema
    cur.execute(f'USE SCHEMA {SCHEMA}')

    # create the table (quoted identifiers preserve the dataframe's column names exactly)
    cur.execute("""
        CREATE OR REPLACE TABLE wiki_etf (
            "Symbol" STRING,
            "Name" STRING,
            "Issuer" STRING,
            "Asset Class" STRING,
            "Inception Date" DATE,
            "Total Assets (MM)" FLOAT,
            "MER (%)" FLOAT
        )
    """)

    # load the data from the 'wiki_etf_df' dataframe into the 'wiki_etf' Snowflake table
    cur.execute('TRUNCATE TABLE wiki_etf')  # clear existing data if needed
    write_pandas(conn, wiki_etf_df, 'WIKI_ETF')

    # close the cursor and the Snowflake connection
    cur.close()
    conn.close()

# call the function
wiki_to_snowflake('Y*****A-Y*****9', 'sarthakgirdhar', 'XXXXXXXX', 'COMPUTE_WH', 'CanadianETFScreener', 'Wikipedia')

Part of the WIKI_ETF table in Snowflake (Image by author)
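
As a quick sanity check, you could confirm the load from a Snowflake worksheet. The fully qualified name below assumes the database and schema arguments used in the call above:

SELECT COUNT(*) FROM CANADIANETFSCREENER.WIKIPEDIA.WIKI_ETF;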

Finally, we'll write the function to upload the investing_etf_df dataframe into a table in Snowflake.

# create a function to upload the 'investing_etf_df' dataframe into a table in Snowflake
# (note: 'investing_etf_df' is the dataframe built in part 1, referenced from the enclosing scope)
def investing_to_snowflake(ACCOUNT, USER, PASSWORD, WAREHOUSE, DATABASE, SCHEMA):

    # connect to Snowflake
    conn = snowflake.connector.connect(
        user=USER,
        password=PASSWORD,
        account=ACCOUNT,
        warehouse=WAREHOUSE,
        database=DATABASE,
        schema=SCHEMA
    )

    # create a cursor
    cur = conn.cursor()

    # create the warehouse (XSMALL, auto-suspends after 300 seconds of inactivity)
    cur.execute(f'CREATE WAREHOUSE IF NOT EXISTS {WAREHOUSE} WAREHOUSE_SIZE = XSMALL AUTO_SUSPEND = 300')

    # use the warehouse
    cur.execute(f'USE WAREHOUSE {WAREHOUSE}')

    # create the database
    cur.execute(f'CREATE DATABASE IF NOT EXISTS {DATABASE}')

    # use the database
    cur.execute(f'USE DATABASE {DATABASE}')

    # create the schema
    cur.execute(f'CREATE SCHEMA IF NOT EXISTS {SCHEMA}')

    # use the schema
    cur.execute(f'USE SCHEMA {SCHEMA}')

    # create the table (quoted identifiers preserve the dataframe's column names exactly)
    cur.execute("""
        CREATE OR REPLACE TABLE investing_etf (
            "Symbol" STRING,
            "Price" FLOAT,
            "Chg. %" FLOAT,
            "Vol." FLOAT
        )
    """)

    # load the data from the 'investing_etf_df' dataframe into the 'investing_etf' Snowflake table
    cur.execute('TRUNCATE TABLE investing_etf')  # clear existing data if needed
    write_pandas(conn, investing_etf_df, 'INVESTING_ETF')

    # close the cursor and the Snowflake connection
    cur.close()
    conn.close()

# call the function
investing_to_snowflake('Y*****A-Y*****9', 'sarthakgirdhar', 'XXXXXXXX', 'COMPUTE_WH', 'CanadianETFScreener', 'Investing')

Part of the INVESTING_ETF table in Snowflake (Image by author)

Joining the two tables

Now that the two dataframes have been loaded as Snowflake tables, it is time to join them. We'll use a NATURAL JOIN, which joins the tables on their identically named columns; here, "Symbol" is the only column the two tables share.

/* Join the two tables to create a new Snowflake table */

CREATE OR REPLACE TABLE CANADIANETFSCREENER.MERGED.ETF
AS
(SELECT * FROM CANADIANETFSCREENER.WIKIPEDIA.WIKI_ETF
NATURAL JOIN CANADIANETFSCREENER.INVESTING.INVESTING_ETF
ORDER BY "Symbol");
SELECT * FROM CANADIANETFSCREENER.MERGED.ETF;
Part of the CANADIANETFSCREENER.MERGED.ETF table in Snowflake (Image by author)
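
If you prefer to spell the join condition out, here is an equivalent sketch using an explicit inner join on "Symbol" (an alternative formulation, not what was run above):

CREATE OR REPLACE TABLE CANADIANETFSCREENER.MERGED.ETF
AS
(SELECT w.*, i."Price", i."Chg. %", i."Vol."
FROM CANADIANETFSCREENER.WIKIPEDIA.WIKI_ETF w
INNER JOIN CANADIANETFSCREENER.INVESTING.INVESTING_ETF i
ON w."Symbol" = i."Symbol"
ORDER BY w."Symbol");

Unlike NATURAL JOIN, this version keeps working as intended even if another identically named column is later added to both tables.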

That’s it! You see how simple it was 😀

At this stage, there is one more thing you should be thinking about: Data Modeling. Issuer and Asset Class could be split out into their own dimension tables, and if we had prices for these ETFs across multiple dates, Date would be a dimension too. However, this table isn't that big (there aren't many data points), so dimensional modeling doesn't buy us much here.

If you want to see Dimensional modeling in action, check out this project.
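
Purely for illustration, here is a minimal sketch of what splitting out an Issuer dimension could look like. The ISSUER_DIM table and ISSUER_ID column are hypothetical names, not part of this pipeline:

-- Hypothetical Issuer dimension (not built in this project)
CREATE OR REPLACE TABLE CANADIANETFSCREENER.MERGED.ISSUER_DIM AS
SELECT ROW_NUMBER() OVER (ORDER BY "Issuer") AS ISSUER_ID, "Issuer"
FROM (SELECT DISTINCT "Issuer" FROM CANADIANETFSCREENER.MERGED.ETF);

The fact table would then reference ISSUER_ID instead of repeating the full Issuer name on every row.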

Exploratory Data Analysis (EDA) in Snowflake

We're almost at the end of the project! In this section, we will write some SQL queries to get further insights into our dataset.

Which ETFs from ‘Blackrock’ have an MER of less than 0.25%?

SELECT "Symbol", "Name", "MER (%)"
FROM CANADIANETFSCREENER.MERGED.ETF
WHERE "Issuer" = 'Blackrock'
AND "MER (%)" < 0.25
ORDER BY "MER (%)" DESC;
Blackrock ETFs with MER of less than 0.25%

Which ETFs have the most Total Assets in the ‘Fixed Income’ Asset Class?

SELECT "Symbol", "Name", "Total Assets (MM)"
FROM CANADIANETFSCREENER.MERGED.ETF
WHERE "Asset Class" = 'Fixed Income'
ORDER BY "Total Assets (MM)" DESC
LIMIT 15;
ETFs with the most Total Assets in the ‘Fixed Income’ Asset Class

Which Issuer has the most Total Assets, and how many ETF offerings do they have?

SELECT "Issuer", COUNT("Issuer") AS "Number of ETFs", SUM("Total Assets (MM)") AS "Assets (MM)"
FROM CANADIANETFSCREENER.MERGED.ETF
GROUP BY "Issuer"
ORDER BY 3 DESC;

We can see that Blackrock, BMO, and Vanguard are the top Asset Management Companies in Canada.

Let’s up the ante!

Total Assets by Issuer and Asset Class

SELECT "Issuer", "Asset Class", SUM("Total Assets (MM)") AS "Assets (MM)"
FROM CANADIANETFSCREENER.MERGED.ETF
GROUP BY GROUPING SETS (("Issuer", "Asset Class"))
ORDER BY 3 DESC;

In an earlier query, we looked for the Asset Management Companies (AMCs)/Issuers with the most assets (MM = millions).

In this query, we want to find out which AMCs have the most assets and where that money is being invested. This is achieved via GROUPING SETS.
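
With a single grouping set, as above, the result is identical to a plain GROUP BY "Issuer", "Asset Class". The feature pays off when you add more sets; for instance, this variant (a sketch, not part of the original analysis) also returns per-issuer subtotals, which show up as rows where "Asset Class" is NULL:

SELECT "Issuer", "Asset Class", SUM("Total Assets (MM)") AS "Assets (MM)"
FROM CANADIANETFSCREENER.MERGED.ETF
GROUP BY GROUPING SETS (("Issuer", "Asset Class"), ("Issuer"))
ORDER BY "Issuer", 3 DESC;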

Which was the third most traded ETF by Volume?

WITH most_traded_etf AS (
    SELECT *,
        DENSE_RANK() OVER (ORDER BY "Vol." DESC) AS Rnk
    FROM CANADIANETFSCREENER.MERGED.ETF
)
SELECT *
FROM most_traded_etf
WHERE Rnk = 3;
Third most traded ETF by Volume

Give me all the details of the oldest ETF from each Issuer

WITH oldest_etf AS (
    SELECT *,
        DENSE_RANK() OVER (PARTITION BY "Issuer" ORDER BY "Inception Date" ASC) AS Rnk
    FROM CANADIANETFSCREENER.MERGED.ETF
)
SELECT * EXCLUDE "RNK"
FROM oldest_etf
WHERE Rnk = 1;
Oldest ETF from each Issuer

In the results, you will notice that multiple ETFs from RBC Global, AGFiQ, etc. share the same rank. That's because they have the same Inception Date. If that's not to your liking, you can add "Symbol" as a tie-breaker in the window's ORDER BY, as shown below, and every ETF will get a distinct rank.
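
For completeness, the tie-broken version in full:

WITH oldest_etf AS (
    SELECT *,
        DENSE_RANK() OVER (PARTITION BY "Issuer" ORDER BY "Inception Date" ASC, "Symbol") AS Rnk
    FROM CANADIANETFSCREENER.MERGED.ETF
)
SELECT * EXCLUDE "RNK"
FROM oldest_etf
WHERE Rnk = 1;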

Conclusion

This brings us to the end of this project. Even though it was a simple ETL pipeline, we covered a lot of ground in terms of Data Engineering practices.

We learned to think about our downstream users (data analysts, business users, etc.) and to transform and validate data accordingly. We learned how to join two tables in Snowflake. We also learned when and why dimensional modeling is useful and when it might not be that helpful. Finally, we learned to write cleaner SQL queries using Common Table Expressions (CTEs) and Window Functions.
