GDP Table - ETL - SQLite3 - BS

This is the same scenario as GDP Table - ETL - SQLite3 - Pandas, except we'll use BeautifulSoup to parse the website instead of Pandas, which the other script uses.

Here is a brief recap; for more details on the scenario, visit the link above.

Scenario


  • You are tasked with creating an automated script that extracts the list of all countries ordered by their GDP in billion USD (rounded to 2 decimal places), as logged by the International Monetary Fund (IMF).
  • The required information needs to be made accessible as a CSV file Countries_by_GDP.csv, as well as a table Countries_by_GDP in a database file World_Economies.db, with attributes Country and GDP_USD_billion.
  • Run a query on the database table to display only the entries with more than a 100 billion USD economy.
  • Log the entire process of execution in a file named etl_project_log.txt.

Tasks

  • Write a data extraction function to retrieve the relevant information from the provided URL.
  • Transform the available GDP information from ‘Million USD’ to ‘Billion USD’.
  • Load the transformed information to the required CSV file Countries_by_GDP.csv
  • Save the data in a SQLite3 database World_Economies.db in table: Countries_by_GDP
  • Run the required query on the database
  • Log the progress of the code with appropriate timestamps in etl_project_log.txt

Setup


library(reticulate)  # R chunk: enables the Python chunks below
from bs4 import BeautifulSoup
import pandas as pd
import numpy as np
import sqlite3
import requests
from datetime import datetime 

Global Vars

url = 'https://web.archive.org/web/20230902185326/https://en.wikipedia.org/wiki/List_of_countries_by_GDP_%28nominal%29'
sql_connection = sqlite3.connect('D:/data/GDP_BS/World_Economies.db')
transformed_file = 'D:/data/GDP_BS/Countries_by_GDP.csv'
table_name = 'Countries_by_GDP'
attribute_list = ['Country', 'GDP_USD_millions']
log_file = 'D:/data/GDP_BS/etl_project_log.txt'
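
Note: sqlite3.connect() and df.to_csv() both fail if the target folder is missing. A minimal guard, run before the assignments above (assuming the same D:/data/GDP_BS layout), could look like this:

from pathlib import Path

# Create the output directory up front; connect() and to_csv() won't create it.
Path('D:/data/GDP_BS').mkdir(parents=True, exist_ok=True)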

Extract


Inspect Data

  • If you use the Inspect tab on the page, you'll find that:
    • The table we need is the second <table class="wikitable sortable...."></table> (we'll locate its overall index below)

    • There is a lot of information in the header that we can omit, since the only two columns we'll need are Column 1 “Country/Territory” and Column 3 “IMF Estimate”

    • All the data is found in <tbody>...</tbody>

    • The first row in the body is <tr class="static-row-header"></tr>

    • The data we want starts in row 2, which is a plain <tr>...</tr>, and continues all the way down the table

    • until we reach the closing </tbody>

    • Within each row, the <td>...</td> tags contain the cell content

  • With all that in mind we can now extract the data; the quick probe below confirms the structure
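
  • To sanity-check these observations before writing the full loop, a short probe like the one below prints the cells of one data row (a sketch; the table index 2 is established in the next steps):

data = requests.get(url).text
page = BeautifulSoup(data, 'html5lib')
sample_row = page.find_all('table')[2].find_all('tr')[3]  # an arbitrary data row
print([td.get_text(strip=True) for td in sample_row.find_all('td')])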

Extract All TABLES

  • First, let's extract all the tables from the page (page is fetched and parsed in the next snippet):
tables = page.find_all('table')

List Classes of All Tables

data = requests.get(url).text
page = BeautifulSoup(data, 'html5lib')
tables = page.find_all('table')
for table in tables:
        print(table.get('class'))
  • We could have targeted the table directly by class, but there's no telling whether another table on the page shares the same class, and we'd also risk misspelling the class string:

# Target the table directly using class tag
table = page.find('table', class_='wikitable sortable')
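
If positional indexing ever feels brittle, an alternative sketch is to pick the table by the text in its header instead (assuming the target is the only wikitable whose headers mention the IMF):

# Alternative sketch: select the table whose header text mentions 'IMF',
# instead of relying on position or on the exact class string.
table = None
for t in page.find_all('table', class_='wikitable'):
    header = t.find('tr')
    if header is not None and 'IMF' in header.get_text():
        table = t
        break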

Choose Table

  • As you can see above, the table we want is the 3rd one (index 2), because its ‘class’ tag matches what we found when inspecting the page earlier
table = tables[2]

Set Rows

  • Extract all the rows of the table by targeting <tr>...</tr>
rows = table.find_all('tr')

Create DF

  • Let's create a DataFrame to store the rows in
  • We already have the global variable for the column names
df = pd.DataFrame(columns = attribute_list)

Loop Rows Extract Cols

  • We'll go through the rows one by one and extract the columns of each row using the <td>...</td> tags: col = row.find_all('td')

  • Note: some rows have '—' values, some might be None, and some might be blank []. From the observations below, we need to target col[2], the 3rd <td> tag, for the GDP value.

  • The first column must contain an <a>...</a> tag for the row to hold a country entry.

  • So we use a conditional statement to skip the cells that fail either check.

  • Create a data_dict with

    • key: Country name content
    • value: GDP content
  • From inspection you'll notice that each row looks like:

    <tr> <td style ...><a ...>Country Name</a></td> <td style ...>...</td> <td>GDP figure</td> ... </tr>

  • So we target td[0], skip past the <a></a>, and extract the content for the Country Name: "Country" : col[0].a.contents[0]

  • And we target the third td tag, td[2], for the GDP figure: "GDP_USD_millions": col[2].contents[0]

  • Create a local df1 from the dict we just built, with index=[0]

  • Concatenate df1 with the main df, which already carries the column headings
Test Extraction w Print

# ______ SETUP AND TEST EXTRACTION
for x, row in enumerate(rows):
        col = row.find_all('td')
        if len(col)!=0:
                #print(f"This is row {x}, and value is: {col}")
                print(f"We are at Row #{x}")
                if col[0].find('a') is not None and '—' not in col[2]:
                        data_dict = {"Country" : col[0].a.contents[0],                                        "GDP_USD_millions": col[2].contents[0]}   
                        print(f"Country is: {col[0].a.contents[0]} GDP is: {col[2].contents[0]}")
                        df1 = pd.DataFrame(data_dict, index=[0])
                        df = pd.concat([df,df1], ignore_index = True)

Extract Function

def extract(url, attribute_list):
        data = requests.get(url).text
        page = BeautifulSoup(data, 'html5lib')  # same parser as the exploration above, so the table index matches
        df = pd.DataFrame(columns = attribute_list)
        tables = page.find_all('table')
        table = tables[2]
        rows = table.find_all('tr')
        data_dict ={}
        for row in rows:
                col = row.find_all('td')
                if len(col)!=0:
                        if col[0].find('a') is not None and '—' not in col[2]:
                                data_dict = {"Country" : col[0].a.contents[0],                                                                           "GDP_USD_millions": col[2].contents[0]}   
                                df1 = pd.DataFrame(data_dict, index=[0])
                                df = pd.concat([df,df1], ignore_index = True)
        return df

Transform


  • Note that the first row of the table was dropped in EXTRACT because it represented a Total and didn’t have a specific link to a country
  • Save the dataframe column as a list.
    • Iterate over the contents of the list and use split() and join() functions to convert the currency text into numerical text.
    • Type cast the numerical text to float.
  • Let's test the dtype of the elements in the list before and after conversion to make sure it went well; a short worked example follows this list
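
  • Here is a worked example of that split()/join()/float() chain on a single value (the figure is illustrative):

s = '26,854,599'                        # GDP string in million USD
num = float("".join(s.split(',')))      # '26854599' -> 26854599.0
print(num)                              # 26854599.0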

Function to check dtypes in List

# function to check for dtype in list
def check_list_dtype(a_list):
        for x, element in enumerate(a_list):
                if isinstance(element, int):
                        print(f"Element {x} is INT")
                if isinstance(element, str):
                        print(f"Element {x} is STRING")
                if isinstance(element, float):
                        print(f"Element {x} is FLOAT")

Test Transformation

# _____ SETUP AND TEST TRANSFORMATION, ESPECIALLY FLOAT CONVERSION
col_list = df["GDP_USD_millions"].tolist()
check_list_dtype(col_list)  # output shows all STRING

col_list = [float("".join(x.split(','))) for x in col_list]
check_list_dtype(col_list)  # output shows all FLOAT

Transform Function

  • Now that the important part is tested, let's finish the function:
  • Divide the values by 1000 to convert millions to billions, and round to 2 decimal places
  • Rename the columns to match the requirements
def transform(df):
        col_list = df["GDP_USD_millions"].tolist()
        col_list = [float("".join(x.split(','))) for x in col_list]
        col_list = [(x/1000) for x in col_list]
        col_list = [np.round(x,2) for x in col_list]
        df['GDP_USD_millions'] = col_list
        df.columns = ['Country','GDP_USD_billions']
        return df
df = extract(url, attribute_list)
df = transform(df)
df.head()
Empty DataFrame
Columns: [Country, GDP_USD_billions]
Index: []
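
The empty output above is the failure mode that positional table indexing invites: if the parser or the page layout shifts, extract() returns an empty frame without complaint. A small guard before the load steps makes that explicit (a sketch):

# Fail loudly if extraction silently came back empty.
if df.empty:
        raise ValueError('Extraction returned no rows; check the table index and parser')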

Load to CSV

  • Save the transformed data to transformed_file
def load_to_csv(df, csv_path):
        df.to_csv(csv_path, index=False)  # index=False keeps only the two required columns

Load to SQLite3

def load_to_db(df, sql_connection, table_name):
        df.to_sql(table_name, sql_connection, if_exists = 'replace', index =False)
        print('Table is loaded with data')
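
  • Note that if_exists = 'replace' drops and recreates the table on every run, so re-executing the script stays idempotent; 'append' and 'fail' are the other options pandas accepts here.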

Run Queries

def run_query(query_statement, sql_connection):
    print(query_statement)
    query_output = pd.read_sql(query_statement, sql_connection)
    print(query_output)

Log Progress

def log_progress(message):
        timestamp_format = '%Y-%b-%d-%H:%M:%S' # Year-Monthname-Day-Hour-Minute-Second (%b is the portable month-name code)
        now = datetime.now() # get current timestamp 
        timestamp = now.strftime(timestamp_format) 
        with open(log_file,"a") as f: 
                f.write(timestamp + ',' + message + '\n')
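
With this format, each call appends one comma-separated line to the log, for example (timestamp illustrative):

# 2023-Sep-04-14:05:37,Data extraction complete. Initiating Transformation process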

Query First 15 Rows

# View 15 rows from Table
query_statement = f"SELECT* FROM {table_name} LIMIT 15"
run_query(query_statement, sql_connection)
SELECT * FROM Countries_by_GDP LIMIT 15
Empty DataFrame
Columns: [Country, GDP_USD_billions]
Index: []

Query Count of Rows

# Count the number of Countries in the table
query_statement =  f"SELECT COUNT(*) FROM {table_name}"
run_query(query_statement, sql_connection)
SELECT COUNT(*) FROM Countries_by_GDP
   COUNT(*)
0         0

Function Calls


Here is the sequence of function calls:

Task                           Log message on completion
---------------------------    -------------------------
Declaring known values         Preliminaries complete. Initiating ETL process.
Call extract() function        Data extraction complete. Initiating Transformation process.
Call transform() function      Data transformation complete. Initiating loading process.
Call load_to_csv()             Data saved to CSV file.
Initiate SQLite3 connection    SQL Connection initiated.
Call load_to_db()              Data loaded to Database as table. Running the query.
Call run_query()               Process Complete.
Close SQLite3 connection       -

Execute File


log_progress('Preliminaries complete. Initiating ETL process')

df = extract(url, attribute_list)

log_progress('Data extraction complete. Initiating Transformation process')

df = transform(df)

log_progress('Data transformation complete. Initiating loading process')

load_to_csv(df, transformed_file)

log_progress('Data saved to CSV file')

log_progress('SQL Connection initiated.')

load_to_db(df, sql_connection, table_name)
Table is loaded with data

Run Query

log_progress('Data loaded to Database as table. Running the query')

query_statement = f"SELECT * from {table_name} WHERE GDP_USD_billions >= 100"
run_query(query_statement, sql_connection)
SELECT * from Countries_by_GDP WHERE GDP_USD_billions >= 100
Empty DataFrame
Columns: [Country, GDP_USD_billions]
Index: []
log_progress('Process Complete.')

sql_connection.close()
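
If any step above raises, the connection is never closed. A minimal sketch that guards against this wraps the database work in try/finally:

# Guarantee the connection closes even if a load or query step raises.
try:
        load_to_db(df, sql_connection, table_name)
        run_query(query_statement, sql_connection)
finally:
        sql_connection.close()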

Complete Script


from bs4 import BeautifulSoup
import pandas as pd
import numpy as np
import sqlite3
import requests
from datetime import datetime 

# Global vars
url = 'https://web.archive.org/web/20230902185326/https://en.wikipedia.org/wiki/List_of_countries_by_GDP_%28nominal%29'
sql_connection = sqlite3.connect('D:/data/GDP_BS/World_Economies.db')
transformed_file = 'D:/data/GDP_BS/Countries_by_GDP.csv'
table_name = 'Countries_by_GDP'
attribute_list = ['Country', 'GDP_USD_millions']
log_file = 'D:/data/GDP_BS/etl_project_log.txt'

# Extract Function
def extract(url, attribute_list):
        data = requests.get(url).text
        page = BeautifulSoup(data, 'html5lib')  # same parser as the exploration above, so the table index matches
        df = pd.DataFrame(columns = attribute_list)
        tables = page.find_all('table')
        table = tables[2]
        rows = table.find_all('tr')
        data_dict ={}
        for row in rows:
                col = row.find_all('td')
                if len(col)!=0:
                        if col[0].find('a') is not None and '—' not in col[2]:
                                data_dict = {"Country" : col[0].a.contents[0],                                        "GDP_USD_millions": col[2].contents[0]}   
                                df1 = pd.DataFrame(data_dict, index=[0])
                                df = pd.concat([df,df1], ignore_index = True)
        return df

# Transform Function
def transform(df):
        col_list = df["GDP_USD_millions"].tolist()
        col_list = [float("".join(x.split(','))) for x in col_list]
        col_list = [(x/1000) for x in col_list]
        col_list = [np.round(x,2) for x in col_list]
        df['GDP_USD_millions'] = col_list  # write the converted values back before renaming
        df.columns = ['Country','GDP_USD_billions']
        return df

def load_to_csv(df, csv_path):
        df.to_csv(csv_path, index=False)  # index=False keeps only the two required columns
        
def load_to_db(df, sql_connection, table_name):
        df.to_sql(table_name, sql_connection, if_exists = 'replace', index =False)
        print('Table is loaded with data')
        
def run_query(query_statement, sql_connection):
    print(query_statement)
    query_output = pd.read_sql(query_statement, sql_connection)
    print(query_output)
    
def log_progress(message):
        timestamp_format = '%Y-%b-%d-%H:%M:%S' # Year-Monthname-Day-Hour-Minute-Second (%b is the portable month-name code)
        now = datetime.now() # get current timestamp 
        timestamp = now.strftime(timestamp_format) 
        with open(log_file,"a") as f: 
                f.write(timestamp + ',' + message + '\n')
                
# ETL Process Calls
log_progress('Preliminaries complete. Initiating ETL process')

df = extract(url, attribute_list)

log_progress('Data extraction complete. Initiating Transformation process')

df = transform(df)

log_progress('Data transformation complete. Initiating loading process')

load_to_csv(df, transformed_file)

log_progress('Data saved to CSV file')

log_progress('SQL Connection initiated.')

load_to_db(df, sql_connection, table_name)

# Run Query
log_progress('Data loaded to Database as table. Running the query')

query_statement = f"SELECT * from {table_name} WHERE GDP_USD_billions >= 100"
run_query(query_statement, sql_connection)

log_progress('Process Complete.')

# Close connection
sql_connection.close()