Create DB

ETL data from Staging to DW

In this role, one of our daily routines is keeping data synchronized across different databases and data warehouses.

Objectives


One routinely performed task is syncing up the staging data warehouse with the production data warehouse. Automating this sync-up saves a lot of time and standardizes the process.

We will use a set of Python scripts to perform an incremental data load from a MySQL server, which acts as the staging warehouse, to PostgreSQL, the production data warehouse. Data engineers can then schedule this script to keep the staging and production data warehouses in sync.

  • Connect to the IBM DB2 (or other cloud service) or PostgreSQL data warehouse and identify the last row in it.
  • Connect to the MySQL staging data warehouse and find all rows later than that last row.
  • Insert the new data from the MySQL staging data warehouse into the IBM DB2 or PostgreSQL production data warehouse (see the outline below).
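
In outline, the whole sync boils down to three calls, matching the functions we implement later in automation.py (a sketch only; the connection setup and function bodies come in the sections below):

# Outline of the daily sync; the three functions are implemented later in automation.py
last_row_id = get_last_rowid()                  # 1. last rowid in the production DW
new_records = get_latest_records(last_row_id)   # 2. rows newer than that on staging
insert_records(new_records)                     # 3. load them into the production DW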

Setup Staging DW - MySQL


  • Start MySQL server instance on the cloud
  • Copy Connection information
  • Start server by clicking on MySQL CLI
  • Your prompt will change to
mysql>
  • Create db named: sales
mysql> create database sales;
Query OK, 1 row affected (0.01 sec)

Download Data

  • From a new terminal
  • Download the data file sales.sql from the link below into your local directory
~../project$ wget https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-DB0321EN-SkillsNetwork/ETL/sales.sql

Connecting to cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud (cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud)|169.63.118.104|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 349956 (342K) [application/x-sql]
Saving to: 'sales.sql'

sales.sql           100%[================>] 341.75K  --.-KB/s    in 0.002s  

2024-11-21 11:58:39 (170 MB/s) - 'sales.sql' saved [349956/349956]

Populate DB from SQL

  • Connect to db
  • Import the data into the sales db using the sales.sql file
mysql> use sales;
Database changed
mysql> source sales.sql;

...
Query OK, 13836 rows affected (0.16 sec)
Records: 13836  Duplicates: 0  Warnings: 0

List Tables

mysql> SHOW FULL TABLES WHERE table_type = 'BASE TABLE';
+-----------------+------------+
| Tables_in_sales | Table_type |
+-----------------+------------+
| sales_data      | BASE TABLE |
+-----------------+------------+
1 row in set (0.01 sec)

MySQL Connection Script

# This program requires the python module mysql-connector-python to be installed.
# Install it using the below command
# python3 -m pip install mysql-connector-python

import mysql.connector

# connect to database
# You can get the hostname and password from the connection information section of MySQL
connection = mysql.connector.connect(user='root', password='zmjfo3k3aUjgjT8XAsYfaIJO',host='172.21.221.174',database='sales')

# create cursor

cursor = connection.cursor()

# create table

SQL = """CREATE TABLE IF NOT EXISTS products(

rowid int(11) NOT NULL AUTO_INCREMENT PRIMARY KEY,
product varchar(255) NOT NULL,
category varchar(255) NOT NULL

)"""

cursor.execute(SQL)

print("Table created")

# insert data

SQL = """INSERT INTO products(product,category)
     VALUES
     ("Television","Electronics"),
     ("Laptop","Electronics"),
     ("Mobile","Electronics")
     """

cursor.execute(SQL)
connection.commit()


# query data

SQL = "SELECT * FROM products"

cursor.execute(SQL)

for row in cursor.fetchall():
    print(row)

# close connection
connection.close()
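
As a side note, mysql-connector-python can also bind values through placeholders instead of writing literals into the SQL string. A sketch against the same products table (the row values here are purely illustrative, and these lines would run before connection.close()):

# Parameterized bulk insert; executemany runs the statement once per row
more_rows = [("Tablet", "Electronics"), ("Camera", "Electronics")]  # illustrative values
cursor.executemany("INSERT INTO products(product,category) VALUES (%s,%s)", more_rows)
connection.commit()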

Install Package

Before executing the script we need to install the mysql-connector-python package:

  • python3.11 -m pip install mysql-connector-python
~../$ python3.11 -m pip install mysql-connector-python

Defaulting to user installation because normal site-packages is not writeable
Collecting mysql-connector-python
  Downloading mysql_connector_python-9.1.0-cp311-cp311-manylinux_2_28_x86_64.whl.metadata (6.0 kB)
Downloading mysql_connector_python-9.1.0-cp311-cp311-manylinux_2_28_x86_64.whl (34.4 MB)
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 34.4/34.4 MB 29.6 MB/s eta 0:00:00
Installing collected packages: mysql-connector-python
Successfully installed mysql-connector-python-9.1.0

[notice] A new release of pip is available: 24.2 -> 24.3.1
[notice] To update, run: pip3 install --upgrade pip

Connect to MySQL w/Script

Let’s use the Python script to connect to the server:

~..$ python3.11 mysqlconnect.py

Table created
(1, 'Television', 'Electronics')
(2, 'Laptop', 'Electronics')
(3, 'Mobile', 'Electronics')

View Table

  • As you can see, we have 13836 rows, with the last rowid = 13939
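
A quick sanity check on those numbers, reusing the cursor from mysqlconnect.py (a sketch; it assumes the same open connection):

# Confirm row count and last rowid on the staging table
cursor.execute("SELECT COUNT(*), MAX(rowid) FROM sales_data")
total_rows, last_rowid = cursor.fetchone()
print(total_rows, last_rowid)   # expect 13836 and 13939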

Setup Working DW PostgreSQL


  • Start PostgreSQL server instance on the cloud
  • Copy Connection information
  • Start server by clicking on PostgreSQL CLI
  • Your prompt will change to
postgres=#

List Tablespaces

postgres=# \db

       List of tablespaces
    Name    |  Owner   | Location 
------------+----------+----------
 pg_default | postgres | 
 pg_global  | postgres | 
(2 rows)

List Tables

postgres=# \dt

           List of relations
 Schema |    Name    | Type  |  Owner   
--------+------------+-------+----------
 public | products   | table | postgres
 public | sales_data | table | postgres
(2 rows)

Create Py Script to Connect to DW

Download Python Connection Script

~../project$ wget https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-DB0321EN-SkillsNetwork/ETL/postgresqlconnect.py

PostgreSQL Connection Script

# This program requires the python module psycopg2 to be installed.
# Install it using the below command
# python3 -m pip install psycopg2

import psycopg2

# connection details

dsn_hostname = '172.21.224.129'
dsn_user='postgres'        # e.g. "abc12345"
dsn_pwd ='AS9FvGlUdHjBJlpmHTDd8rIp'      # e.g. "7dBZ3wWt9XN6$o0J"
dsn_port ="5432"                # e.g. "50000" 
dsn_database ="postgres"           # i.e. "BLUDB"


# create connection

conn = psycopg2.connect(
   database=dsn_database, 
   user=dsn_user,
   password=dsn_pwd,
   host=dsn_hostname, 
   port= dsn_port
)

# Create a cursor object using the cursor() method

cursor = conn.cursor()

# create table
SQL = """CREATE TABLE IF NOT EXISTS products(rowid INTEGER PRIMARY KEY NOT NULL,product varchar(255) NOT NULL,category varchar(255) NOT NULL)"""

# Execute the SQL statement
cursor.execute(SQL)

print("Table created")

# insert data

cursor.execute("INSERT INTO  products(rowid,product,category) VALUES(1,'Television','Electronics')");

cursor.execute("INSERT INTO  products(rowid,product,category) VALUES(2,'Laptop','Electronics')");

cursor.execute("INSERT INTO products(rowid,product,category) VALUES(3,'Mobile','Electronics')");

conn.commit()

# insert list of Records

list_of_records = [(5,'Mobile','Electronics'),(6,'Mobile','Electronics')]

cursor = conn.cursor()

for row in list_of_records:
    SQL = "INSERT INTO products(rowid,product,category) VALUES(%s,%s,%s)"
    cursor.execute(SQL, row)
    conn.commit()

# query data

cursor.execute('SELECT * FROM products;')
rows = cursor.fetchall()
for row in rows:
    print(row)

# close connection
conn.close()
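
Equivalently, psycopg2 can insert the whole list in one call with executemany (a sketch assuming the same open connection, placed before conn.close()):

# Bulk insert of the record list; the driver binds each tuple in turn
cursor.executemany(
    "INSERT INTO products(rowid,product,category) VALUES(%s,%s,%s)",
    list_of_records
)
conn.commit()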

Install Package

Before executing the script we need to install the psycopg2 package:

  • python3 -m pip install psycopg2
~../$ python3 -m pip install psycopg2


Defaulting to user installation because normal site-packages is not writeable
Collecting psycopg2
  Downloading psycopg2-2.9.10.tar.gz (385 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 385.7/385.7 KB 138.9 kB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: psycopg2
  Building wheel for psycopg2 (setup.py) ... done
  Created wheel for psycopg2: filename=psycopg2-2.9.10-cp310-cp310-linux_x86_64.whl size=499308 sha256=0480ddff54df04dff4a9a89bc2f8c72f12f595ba7c6e65987161c9ab11d77591
  Stored in directory: /home/theia/.cache/pip/wheels/51/41/e0/2912ad51b01f454d26dfb26e5cc5923874656749b9e83943a8
Successfully built psycopg2
Installing collected packages: psycopg2
Successfully installed psycopg2-2.9.10

Connect to PostgreSQL w/Script

  • To test the connection, run this:
~..$ python3 postgresqlconnect.py

Table created
(1, 'Television', 'Electronics')
(2, 'Laptop', 'Electronics')
(3, 'Mobile', 'Electronics')
(5, 'Mobile', 'Electronics')
(6, 'Mobile', 'Electronics')

Download Data

  • Get the data from the link below
~..$ wget https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/sales-csv3mo8i5SHvta76u7DzUfhiw.csv

Connecting to cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud (cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud)|169.63.118.104|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 545993 (533K) [text/csv]
Saving to: 'sales-csv3mo8i5SHvta76u7DzUfhiw.csv'

sales-csv3mo8i5SHvta 100%[======================>] 533.20K  --.-KB/s    in 0.003s  

2024-11-21 12:34:29 (150 MB/s) - 'sales-csv3mo8i5SHvta76u7DzUfhiw.csv' saved [545993/545993]

Populate Table w/CSV

  • Import the data from the csv file into table: sales_data
  • We have total rows = 12186
  • Last rowid = 12289

Open pgAdmin4

Create Table

  • Name it: sales_data

Columns

  • rowid, product_id, customer_id, price, quantity, timestamp

Note: By default, the sales.csv file contains price and timestamp columns, which are not present in sales.sql. Therefore, you can use the below lines of code in your script to include price and timestamp columns when creating the table in Postgres.
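
A sketch of what that CREATE TABLE could look like on the Postgres side (the column types here are assumptions based on the column list above; adjust them to match the CSV):

# Create sales_data in Postgres including the extra price and timestamp columns
SQL = """CREATE TABLE IF NOT EXISTS sales_data(
    rowid INTEGER PRIMARY KEY NOT NULL,
    product_id INTEGER NOT NULL,
    customer_id INTEGER NOT NULL,
    price NUMERIC,          -- in sales.csv but not in sales.sql
    quantity INTEGER,
    timestamp TIMESTAMP     -- in sales.csv but not in sales.sql
)"""
cur.execute(SQL)
conn.commit()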

Download Automation Script

  • Download the following script, automation.py, to use from pgAdmin
  • Save the file to /var/lib/pgadmin/
  • Download from: https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-DB0321EN-SkillsNetwork/ETL/automation.py
~..$ wget https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-DB0321EN-SkillsNetwork/ETL/automation.py -P /var/lib/pgadmin

The wget into /var/lib/pgadmin failed with a permission error, so I downloaded the file locally and then uploaded it into that directory through pgAdmin instead; I didn’t end up using it there, but at least it made it into the directory.

Automation.py

  • Here is the content of automation.py, which will be used to execute the tasks:
# Import libraries required for connecting to mysql

# Import libraries required for connecting to DB2 or PostgreSql

# Connect to MySQL

# Connect to DB2 or PostgreSql

# Find out the last rowid from DB2 data warehouse or PostgreSql data warehouse
# The function get_last_rowid must return the last rowid of the table sales_data on the IBM DB2 database or PostgreSql.

def get_last_rowid():
    pass


last_row_id = get_last_rowid()
print("Last row id on production datawarehouse = ", last_row_id)

# List out all records in MySQL database with rowid greater than the one on the Data warehouse
# The function get_latest_records must return a list of all records that have a rowid greater than the last_row_id in the sales_data table in the sales database on the MySQL staging data warehouse.

def get_latest_records(rowid):
    pass    

new_records = get_latest_records(last_row_id)

print("New rows on staging datawarehouse = ", len(new_records))

# Insert the additional records from MySQL into DB2 or PostgreSql data warehouse.
# The function insert_records must insert all the records passed to it into the sales_data table in IBM DB2 database or PostgreSql.

def insert_records(records):
    pass

insert_records(new_records)
print("New rows inserted into production datawarehouse = ", len(new_records))

# disconnect from mysql warehouse

# disconnect from DB2 or PostgreSql data warehouse 

# End of program

AUTOMATE Data Loading


One of the routine tasks carried out around a data warehouse is extracting daily new data from the operational database and loading it into the warehouse. In this section, we will automate extracting the incremental data and loading it into the data warehouse.

Create Functions


Extract Last rowid from DW

  • We will write a function that connects to the working DW (PostgreSQL) and
  • extracts the last rowid from it
  • This way we can extract any rows added after this row from the staging DW (MySQL in this case)
  • name the function: get_last_rowid()
# In SQL it would be
SELECT rowid FROM public.sales_data ORDER BY rowid DESC LIMIT 1;
-- or equivalently
SELECT max(rowid) FROM public.sales_data;
-- output: 12289

In Python it would be:

rowidSQL = """SELECT max(rowid) FROM public.sales_data"""
cur.execute(rowidSQL)

Function Script

def get_last_rowid():
    cur.execute('SELECT max(rowid) FROM sales_data')
    result = cur.fetchone()
    return result[0]   # fetchone() returns a one-element tuple; unpack the scalar


last_row_id = get_last_rowid()
print("Last row id on production datawarehouse = ", last_row_id)

Extract Latest Rows from Staging DW

  • We will write a function that connects to the MySQL staging warehouse and
  • returns all records later than the given last_rowid collected by the previous function
  • name the function: get_latest_records()
  • As shown above, the last rowid as of running this script is 12289; we’ll verify that after we run the automation
  • As shown below, there are 1650 rows/records with a rowid greater than 12289
  • So the total rows after we run the automation should be 12186 + 1650 = 13836
# In SQL it would be
SELECT * FROM sales_data WHERE rowid > 12289;
-- returns 1650 rows

Function Script

def get_latest_records(last_row_id):
    cursor.execute('SELECT * FROM sales_data WHERE rowid > %s', (last_row_id,))
    new_records = cursor.fetchall()
    return new_records

new_records = get_latest_records(last_row_id)

print("New rows on staging datawarehouse = ", len(new_records))

Insert Extracted Records into DW

  • Now that we’ve fetched all the latest records from the staging DW (MySQL)
  • we need to append them to the end of the working DW (PostgreSQL)
  • This function must connect to the working DW (PostgreSQL) and insert all the given records
  • name it: insert_records()

Function Script

def insert_records(new_records):
  for row in new_records:
    # one placeholder per column, so the driver binds the values safely
    placeholders = ','.join(['%s'] * len(row))
    cur.execute(f"INSERT INTO sales_data VALUES({placeholders})", row)
  conn.commit()   # without a commit, psycopg2 discards the inserts when the connection closes

insert_records(new_records)
print("New rows inserted into production datawarehouse = ", len(new_records))

Run Automation Script


Run the automation.py script to see if our functions work as expected.

Implement get_last_rowid

  • After implementing this function in automation.py
  • execute the Python file to check if the value retrieved is accurate
# Import libraries required for connecting to mysql

# Import libraries required for connecting to PostgreSql
import psycopg2

# Connect to  PostgreSql
# connection details

dsn_hostname = '172.21.224.129'
dsn_user='postgres'        # e.g. "abc12345"
dsn_pwd ='AS9FvGlUdHjBJlpmHTDd8rIp'      # e.g. "7dBZ3wWt9XN6$o0J"
dsn_port ="5432"                # e.g. "50000" 
dsn_database ="postgres"           # i.e. "BLUDB"


# create connection

conn = psycopg2.connect(
   database=dsn_database, 
   user=dsn_user,
   password=dsn_pwd,
   host=dsn_hostname, 
   port= dsn_port
)

# Create a cursor object using the cursor() method

cur = conn.cursor()

# Find out the last rowid from DB2 data warehouse or PostgreSql data warehouse
# The function get_last_rowid must return the last rowid of the table sales_data on the IBM DB2 database or PostgreSql.

def get_last_rowid():
    cur.execute('SELECT max(rowid) FROM sales_data')
    result = cur.fetchone()
    return result[0]   # unpack the scalar from the one-element tuple


last_row_id = get_last_rowid()
print("Last row id on production datawarehouse = ", last_row_id)



# disconnect from PostgreSql data warehouse 
conn.close()
# End of program
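
If everything is wired up correctly, running this should print Last row id on production datawarehouse = 12289, matching the max(rowid) query from earlier.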

Implement get_latest_records

Here is the code with get_latest_records implemented:

# Import libraries required for connecting to mysql
import mysql.connector

# Import libraries required for connecting to  PostgreSql
import psycopg2

# connect to MySQL
connection = mysql.connector.connect(user='root', password='zmjfo3k3aUjgjT8XAsYfaIJO',host='172.21.221.174',database='sales')

# create cursor
cursor = connection.cursor()



# Connect to  PostgreSql
# connection details

dsn_hostname = '172.21.224.129'
dsn_user='postgres'        # e.g. "abc12345"
dsn_pwd ='AS9FvGlUdHjBJlpmHTDd8rIp'      # e.g. "7dBZ3wWt9XN6$o0J"
dsn_port ="5432"                # e.g. "50000" 
dsn_database ="postgres"           # i.e. "BLUDB"


# create connection

conn = psycopg2.connect(
   database=dsn_database, 
   user=dsn_user,
   password=dsn_pwd,
   host=dsn_hostname, 
   port= dsn_port
)

# Create a cursor object using the cursor() method
cur = conn.cursor()

# Find out the last rowid from DB2 data warehouse or PostgreSql data warehouse
# The function get_last_rowid must return the last rowid of the table sales_data on the IBM DB2 database or PostgreSql.

def get_last_rowid():
    cur.execute('SELECT max(rowid) FROM sales_data')
    result = cur.fetchone()
    return result[0]   # unpack the scalar from the one-element tuple


last_row_id = get_last_rowid()
print("Last row id on production datawarehouse = ", last_row_id)

# List out all records in MySQL database with rowid greater than the one on the Data warehouse
# The function get_latest_records must return a list of all records that have a rowid greater than the last_row_id in the sales_data table in the sales database on the MySQL staging data warehouse.

def get_latest_records(last_row_id):
    cursor.execute('SELECT * FROM sales_data WHERE rowid > %s', (last_row_id,))
    new_records = cursor.fetchall()
    return new_records

new_records = get_latest_records(last_row_id)

print("New rows on staging datawarehouse = ", len(new_records))

# Insert the additional records from MySQL into DB2 or PostgreSql data warehouse.
# The function insert_records must insert all the records passed to it into the sales_data table in IBM DB2 database or PostgreSql.

def insert_records(records):
    pass

insert_records(new_records)
print("New rows inserted into production datawarehouse = ", len(new_records))

# disconnect from mysql warehouse
connection.close()

# disconnect from PostgreSql data warehouse 
conn.close() 

# End of program
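
At this stage the script should report 1650 new rows on the staging warehouse (the rows with rowid greater than 12289); insert_records is still a stub, so nothing is written to production yet.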

Implement All Functions

Let’s finally implement insert_records() and run the entire automation process:

# Import libraries required for connecting to mysql
import mysql.connector

# Import libraries required for connecting to  PostgreSql
import psycopg2

# connect to MySQL
connection = mysql.connector.connect(user='root', password='iBPymI980dmRhEns7J7Q3PRU',host='172.21.199.33',database='sales')

# create cursor
cursor = connection.cursor()



# Connect to  PostgreSql
# connection details

dsn_hostname = '172.21.246.53'
dsn_user='postgres'        # e.g. "abc12345"
dsn_pwd ='jULCTPGfm4ZZIeKlnrSLduAG'      # e.g. "7dBZ3wWt9XN6$o0J"
dsn_port ="5432"                # e.g. "50000" 
dsn_database ="postgres"           # i.e. "BLUDB"


# create connection

conn = psycopg2.connect(
   database=dsn_database, 
   user=dsn_user,
   password=dsn_pwd,
   host=dsn_hostname, 
   port= dsn_port
)

# Create a cursor object using the cursor() method
cur = conn.cursor()

# Find out the last rowid from DB2 data warehouse or PostgreSql data warehouse
# The function get_last_rowid must return the last rowid of the table sales_data on the IBM DB2 database or PostgreSql.

def get_last_rowid():
    cur.execute('SELECT max(rowid) FROM sales_data')   # column is rowid, not row_id
    result = cur.fetchone()
    return result[0]   # unpack the scalar from the one-element tuple


last_row_id = get_last_rowid()
print("Last row id on production datawarehouse = ", last_row_id)

# List out all records in MySQL database with rowid greater than the one on the Data warehouse
# The function get_latest_records must return a list of all records that have a rowid greater than the last_row_id in the sales_data table in the sales database on the MySQL staging data warehouse.

def get_latest_records(last_row_id):
    cursor.execute('SELECT * FROM sales_data WHERE rowid > %s', (last_row_id,))
    new_records = cursor.fetchall()
    return new_records

new_records = get_latest_records(last_row_id)

print("New rows on staging datawarehouse = ", len(new_records))

# Insert the additional records from MySQL into DB2 or PostgreSql data warehouse.
# The function insert_records must insert all the records passed to it into the sales_data table in IBM DB2 database or PostgreSql.

def insert_records(new_records):
  for row in new_records:
    # one placeholder per column, so the driver binds the values safely
    placeholders = ','.join(['%s'] * len(row))
    cur.execute(f"INSERT INTO sales_data VALUES({placeholders})", row)
  conn.commit()   # commit so the inserts persist after the connection closes

insert_records(new_records)
print("New rows inserted into production datawarehouse = ", len(new_records))

# disconnect from mysql warehouse
connection.close()

# disconnect from PostgreSql data warehouse 
conn.close() 

# End of program
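
As a final check that the load worked, we can re-query the production DW (a sketch; it needs a fresh connection and cursor opened as in postgresqlconnect.py, since the script above has already closed its own):

# Post-run sanity check: production should now match staging
cur.execute("SELECT COUNT(*), MAX(rowid) FROM sales_data")
print(cur.fetchone())   # expect (13836, 13939), per the counts above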