ETL data from Staging to DW
In this role, we need to keep data synchronized between different databases and data warehouses as part of our daily routine.
Objectives
One task that is routinely performed is the sync-up of the staging data warehouse and the production data warehouse. Automating this sync-up saves a lot of time and standardizes the process.
We will use a set of Python scripts to perform an incremental data load from a MySQL server, which acts as the staging warehouse, to PostgreSQL, which is the production data warehouse. Data engineers will schedule this script to sync up the data between the staging and production data warehouses.
- Connect to the IBM DB2 (or any cloud service) or PostgreSQL data warehouse and identify the last row in it.
- Connect to the MySQL staging data warehouse and find all rows later than the last row in the production data warehouse.
- Insert the new data from the MySQL staging data warehouse into the IBM DB2 or PostgreSQL production data warehouse (sketched below).
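Here is a minimal sketch of these three steps in Python, assuming open cursors for both warehouses already exist (the staging_cursor, prod_cursor, and prod_conn names below are hypothetical); the real connection code is developed step by step in these notes.

def sync_staging_to_production(staging_cursor, prod_cursor, prod_conn):
    # 1. Find the last row already in the production DW (PostgreSQL)
    prod_cursor.execute("SELECT MAX(rowid) FROM sales_data")
    last_row_id = prod_cursor.fetchone()[0] or 0

    # 2. Fetch every newer row from the staging DW (MySQL)
    staging_cursor.execute("SELECT * FROM sales_data WHERE rowid > %s", (last_row_id,))
    new_rows = staging_cursor.fetchall()

    # 3. Append the new rows to the production DW and commit
    for row in new_rows:
        placeholders = ",".join(["%s"] * len(row))
        prod_cursor.execute(f"INSERT INTO sales_data VALUES ({placeholders})", row)
    prod_conn.commit()
    return len(new_rows)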
Setup Staging W - MySQL
- Start MySQL server instance on the cloud
- Copy Connection information
- Start server by clicking on MySQL CLI
- Your prompt will change to mysql>
- Create a db named: sales

mysql> create database sales;
Query OK, 1 row affected (0.01 sec)
Download Data
- From a new terminal
- Download the data file from this link into local directory: sales.sql
~../project$ wget https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-DB0321EN-SkillsNetwork/ETL/sales.sql
Connecting to cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud (cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud)|169.63.118.104|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 349956 (342K) [application/x-sql]
Saving to: 'sales.sql'

sales.sql 100%[================>] 341.75K --.-KB/s in 0.002s

2024-11-21 11:58:39 (170 MB/s) - 'sales.sql' saved [349956/349956]
Populate DB from SQL
- Connect to db
- Import the data into the sales db using the sales.sql file (via the source command)

mysql> use sales;
Database changed
mysql> source sales.sql;
...
Query OK, 13836 rows affected (0.16 sec)
Records: 13836  Duplicates: 0  Warnings: 0
List Tables
mysql> SHOW FULL TABLES WHERE table_type = 'BASE TABLE';
+-----------------+------------+
| Tables_in_sales | Table_type |
+-----------------+------------+
| sales_data      | BASE TABLE |
+-----------------+------------+
1 row in set (0.01 sec)
MySQL Connection Script
# This program requires the python module mysql-connector-python to be installed.
# Install it using the below command
# python3.8 -m pip install mysql-connector-python

import mysql.connector

# connect to database
# You can get the Hostname and Password from the connection information section of MySQL
connection = mysql.connector.connect(user='root', password='zmjfo3k3aUjgjT8XAsYfaIJO',
                                     host='172.21.221.174', database='sales')

# create cursor
cursor = connection.cursor()

# create table
SQL = """CREATE TABLE IF NOT EXISTS products(
    rowid int(11) NOT NULL AUTO_INCREMENT PRIMARY KEY,
    product varchar(255) NOT NULL,
    category varchar(255) NOT NULL
)"""
cursor.execute(SQL)
print("Table created")

# insert data
SQL = """INSERT INTO products(product,category)
VALUES
("Television","Electronics"),
("Laptop","Electronics"),
("Mobile","Electronics")
"""
cursor.execute(SQL)
connection.commit()

# query data
SQL = "SELECT * FROM products"
cursor.execute(SQL)
for row in cursor.fetchall():
    print(row)

# close connection
connection.close()
Install Package
Before executing the script, we need to install the mysql-connector-python package:
- python3.11 -m pip install mysql-connector-python

~../$ python3.11 -m pip install mysql-connector-python
Defaulting to user installation because normal site-packages is not writeable
Collecting mysql-connector-python
  Downloading mysql_connector_python-9.1.0-cp311-cp311-manylinux_2_28_x86_64.whl.metadata (6.0 kB)
Downloading mysql_connector_python-9.1.0-cp311-cp311-manylinux_2_28_x86_64.whl (34.4 MB)
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 34.4/34.4 MB 29.6 MB/s eta 0:00:00
Installing collected packages: mysql-connector-python
Successfully installed mysql-connector-python-9.1.0

[notice] A new release of pip is available: 24.2 -> 24.3.1
[notice] To update, run: pip3 install --upgrade pip
Connect to MySQL w/Script
Let’s use the Python script to connect to the server:
~..$ python3.11 mysqlconnect.py
Table created
(1, 'Television', 'Electronics')
(2, 'Laptop', 'Electronics')
(3, 'Mobile', 'Electronics')
View Table
- As you can see, we have 13836 rows, with the last rowid = 13939
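A quick sanity check from Python (a sketch reusing the cursor from mysqlconnect.py above):

cursor.execute("SELECT COUNT(*), MAX(rowid) FROM sales_data")
print(cursor.fetchone())   # expected: (13836, 13939)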
Setup Working DW PostgreSQL
- Start PostgreSQL server instance on the cloud
- Copy Connection information
- Start server by clicking on PostgreSQL CLI
- Your prompt will change to postgres=#
List dbs
postgres=# \db
       List of tablespaces
    Name    |  Owner   | Location
------------+----------+----------
 pg_default | postgres |
 pg_global  | postgres |
(2 rows)
List Tables
postgres=# \dt
          List of relations
 Schema |    Name    | Type  |  Owner
--------+------------+-------+----------
 public | products   | table | postgres
 public | sales_data | table | postgres
(2 rows)
Create Py Script to Connect to DW
Download Python Connection Script
~../project$ wget https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-DB0321EN-SkillsNetwork/ETL/postgresqlconnect.py
PostgreSQL Connection Script
# This program requires the python module psycopg2 to be installed.
# Install it using the below command
# python3 -m pip install psycopg2

import psycopg2

# connection details
dsn_hostname = '172.21.224.129'
dsn_user = 'postgres'                   # e.g. "abc12345"
dsn_pwd = 'AS9FvGlUdHjBJlpmHTDd8rIp'    # e.g. "7dBZ3wWt9XN6$o0J"
dsn_port = "5432"                       # e.g. "50000"
dsn_database = "postgres"               # i.e. "BLUDB"

# create connection
conn = psycopg2.connect(
    database=dsn_database,
    user=dsn_user,
    password=dsn_pwd,
    host=dsn_hostname,
    port=dsn_port
)

# Create a cursor object using the cursor() method
cursor = conn.cursor()

# create table
SQL = """CREATE TABLE IF NOT EXISTS products(rowid INTEGER PRIMARY KEY NOT NULL, product varchar(255) NOT NULL, category varchar(255) NOT NULL)"""

# Execute the SQL statement
cursor.execute(SQL)
print("Table created")

# insert data
cursor.execute("INSERT INTO products(rowid,product,category) VALUES(1,'Television','Electronics')")
cursor.execute("INSERT INTO products(rowid,product,category) VALUES(2,'Laptop','Electronics')")
cursor.execute("INSERT INTO products(rowid,product,category) VALUES(3,'Mobile','Electronics')")
conn.commit()

# insert list of Records
list_of_records = [(5,'Mobile','Electronics'), (6,'Mobile','Electronics')]
cursor = conn.cursor()
for row in list_of_records:
    SQL = "INSERT INTO products(rowid,product,category) values(%s,%s,%s)"
    cursor.execute(SQL, row)
conn.commit()

# query data
cursor.execute('SELECT * from products;')
rows = cursor.fetchall()
for row in rows:
    print(row)

# close connection
conn.commit()
conn.close()
Install Package
Before executing the script, we need to install the psycopg2 package:
- python3 -m pip install psycopg2

~../$ python3 -m pip install psycopg2
Defaulting to user installation because normal site-packages is not writeable
Collecting psycopg2
  Downloading psycopg2-2.9.10.tar.gz (385 kB)
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 385.7/385.7 KB 138.9 kB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: psycopg2
  Building wheel for psycopg2 (setup.py) ... done
  Created wheel for psycopg2: filename=psycopg2-2.9.10-cp310-cp310-linux_x86_64.whl size=499308 sha256=0480ddff54df04dff4a9a89bc2f8c72f12f595ba7c6e65987161c9ab11d77591
  Stored in directory: /home/theia/.cache/pip/wheels/51/41/e0/2912ad51b01f454d26dfb26e5cc5923874656749b9e83943a8
Successfully built psycopg2
Installing collected packages: psycopg2
Successfully installed psycopg2-2.9.10
Connect to PostgreSQL w/Script
- To test the connection run this
~..$ python3 postgresqlconnect.py
Table created
(1, 'Television', 'Electronics')
(2, 'Laptop', 'Electronics')
(3, 'Mobile', 'Electronics')
(5, 'Mobile', 'Electronics')
(6, 'Mobile', 'Electronics')
Download Data
- Get data from link below
~..$ wget https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/sales-csv3mo8i5SHvta76u7DzUfhiw.csv
Connecting to cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud (cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud)|169.63.118.104|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 545993 (533K) [text/csv]
Saving to: 'sales-csv3mo8i5SHvta76u7DzUfhiw.csv'

sales-csv3mo8i5SHvta 100%[======================>] 533.20K --.-KB/s in 0.003s

2024-11-21 12:34:29 (150 MB/s) - 'sales-csv3mo8i5SHvta76u7DzUfhiw.csv' saved [545993/545993]
Populate Table w/CSV
- Import the data from the csv file into table: sales_data
- We have total rows = 12186
- Last rowid = 12289
Open pgAdmin4
Create Table
- Name it: sales_data
Columns
- rowid, product_id, customer_id, price, quantity, timestamp
Note: By default, the sales.csv file contains price and timestamp columns, which are not present in sales.sql. Therefore, you can use the lines of code below to include the price and timestamp columns when creating the table in Postgres.
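A sketch of that CREATE TABLE, assuming the psycopg2 conn/cursor from postgresqlconnect.py; the NUMERIC and TIMESTAMP column types are assumptions based on the CSV contents:

SQL = """CREATE TABLE IF NOT EXISTS sales_data(
    rowid       INTEGER PRIMARY KEY NOT NULL,
    product_id  INTEGER NOT NULL,
    customer_id INTEGER NOT NULL,
    price       NUMERIC,       -- assumed type; present in sales.csv only
    quantity    INTEGER,
    timestamp   TIMESTAMP      -- assumed type; present in sales.csv only
)"""
cursor.execute(SQL)
conn.commit()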
Download Automation Script
- For pgAdmin, download the following script, which is saved as automation.py
- Save file to /var/lib/pgadmin/
- Download from: https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-DB0321EN-SkillsNetwork/ETL/automation.py
~..$ wget https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-DB0321EN-SkillsNetwork/ETL/automation.py -P /var/lib/pgadmin
After the wget returned an error, I downloaded the file locally and then uploaded it through pgAdmin as data for the table (though I didn't use it that way); at least it made it into the directory.
Automation.py
- Here is the content of automation.py which will be used to execute the tasks
# Import libraries required for connecting to mysql

# Import libraries required for connecting to DB2 or PostgreSql

# Connect to MySQL

# Connect to DB2 or PostgreSql

# Find out the last rowid from DB2 data warehouse or PostgreSql data warehouse
# The function get_last_rowid must return the last rowid of the table sales_data on the IBM DB2 database or PostgreSql.
def get_last_rowid():
    p_cursor.execute('SELECT max(rowid) FROM public.sales_data')

last_row_id = get_last_rowid()
print("Last row id on production datawarehouse = ", last_row_id)

# List out all records in MySQL database with rowid greater than the one on the Data warehouse
# The function get_latest_records must return a list of all records that have a rowid greater than the last_row_id in the sales_data table in the sales database on the MySQL staging data warehouse.
def get_latest_records(rowid):
    pass

new_records = get_latest_records(last_row_id)
print("New rows on staging datawarehouse = ", len(new_records))

# Insert the additional records from MySQL into DB2 or PostgreSql data warehouse.
# The function insert_records must insert all the records passed to it into the sales_data table in IBM DB2 database or PostgreSql.
def insert_records(records):
    pass

insert_records(new_records)
print("New rows inserted into production datawarehouse = ", len(new_records))

# disconnect from mysql warehouse

# disconnect from DB2 or PostgreSql data warehouse

# End of program
AUTOMATE Data Loading
One of the routine tasks carried out around a data warehouse is the extraction of daily new data from the operational database and loading it into the warehouse. In this section, we will automate the extraction of incremental data and its loading into the data warehouse.
Create Functions
Extract Last rowid from DW
- We will write a function that connects to the working DW (PostgreSQL) and extracts the last rowid from it
- This way we can extract any rows added after this row from the staging DW (MySQL in this case)
- name the function: get_last_rowid()
# In SQL it would be
SELECT rowid FROM public.sales_data ORDER BY rowid DESC LIMIT 1;
# OR
SELECT max(rowid) FROM public.sales_data;
# OUTPUT = 12289

In Python it would be:

rowidSQL = """SELECT max(rowid) FROM public.sales_data"""
cursor.execute(rowidSQL)
Function Script
def get_last_rowid():
    cur.execute('SELECT max(rowid) FROM sales_data')
    result = cur.fetchone()
    return result

last_row_id = get_last_rowid()
print("Last row id on production datawarehouse = ", last_row_id)
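Note that fetchone() returns a one-element tuple, so last_row_id here is (12289,) rather than the bare integer; that happens to be exactly the parameter sequence that cursor.execute(..., last_row_id) expects in the next function. If you prefer returning the bare value, a sketch of the unpacking variant (then re-wrap the value when querying):

def get_last_rowid():
    cur.execute('SELECT max(rowid) FROM sales_data')
    result = cur.fetchone()
    # unpack the (12289,) tuple; fall back to 0 for an empty table
    return result[0] if result and result[0] is not None else 0

# ...and pass a one-element tuple when querying the staging DW:
# cursor.execute('SELECT * FROM sales_data WHERE rowid > %s', (last_row_id,))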
Extract Latest rows from Staging W
- We will write a function that connects to the MySQL staging warehouse and
- returns all records later than the given last_rowid that was collected by the previous function
- name the function: get_latest_records()
- As seen above, the last rowid as of running this script is 12289; we'll verify that after we run the automation
- As shown below, there are 1650 rows/records with a rowid greater than 12289
- So the total number of rows after we run the automation should be 12186 + 1650 = 13836
# In SQL it would be
SELECT * FROM `sales_data` WHERE rowid > 12289;
# The output is in the image below
Function Script
def get_latest_records(last_row_id):
    cursor.execute('SELECT * FROM sales_data WHERE rowid > %s', last_row_id)
    new_records = cursor.fetchall()
    return new_records

new_records = get_latest_records(last_row_id)
print("New rows on staging datawarehouse = ", len(new_records))
Insert Extracted Records into DW
- Now that we’ve fetched all the latest records from the staging DW (MySQL)
- we need to insert them at the end of the working DW (PostgreSQL)
- This function must connect to the working DW (PostgreSQL) and insert all the given records
- name it: insert_records()
Function Script
def insert_records(new_records):
    for row in new_records:
        putin = f"INSERT INTO sales_data VALUES{row};"
        cur.execute(putin)
    conn.commit()  # needed: psycopg2 discards uncommitted inserts when the connection closes

insert_records(new_records)
print("New rows inserted into production datawarehouse = ", len(new_records))
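The f-string interpolation above works for this lab data, but it breaks on values containing quotes and is unsafe in general; a sketch of a parameterized variant using the same cur/conn objects:

def insert_records(new_records):
    for row in new_records:
        # one %s placeholder per column, so the driver handles quoting/escaping
        placeholders = ",".join(["%s"] * len(row))
        cur.execute(f"INSERT INTO sales_data VALUES ({placeholders})", row)
    conn.commit()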
Run Automation Script
Run the automation.py script to see if our functions worked as expected
Implement get_last_rowid
- After implementing this function in automation.py
- execute the python file to check if the value retrieved is accurate
# Import libraries required for connecting to mysql

# Import libraries required for connecting to PostgreSql
import psycopg2

# Connect to PostgreSql
# connection details
dsn_hostname = '172.21.224.129'
dsn_user = 'postgres'                   # e.g. "abc12345"
dsn_pwd = 'AS9FvGlUdHjBJlpmHTDd8rIp'    # e.g. "7dBZ3wWt9XN6$o0J"
dsn_port = "5432"                       # e.g. "50000"
dsn_database = "postgres"               # i.e. "BLUDB"

# create connection
conn = psycopg2.connect(
    database=dsn_database,
    user=dsn_user,
    password=dsn_pwd,
    host=dsn_hostname,
    port=dsn_port
)

# Create a cursor object using the cursor() method
cur = conn.cursor()

# Find out the last rowid from DB2 data warehouse or PostgreSql data warehouse
# The function get_last_rowid must return the last rowid of the table sales_data on the IBM DB2 database or PostgreSql.
def get_last_rowid():
    cur.execute('SELECT max(rowid) FROM sales_data')
    result = cur.fetchone()
    return result

last_row_id = get_last_rowid()
print("Last row id on production datawarehouse = ", last_row_id)

# disconnect from PostgreSql data warehouse
conn.close()

# End of program
Implement get_latest_records
Here is the code for get_latest_records:
# Import libraries required for connecting to mysql
import mysql.connector

# Import libraries required for connecting to PostgreSql
import psycopg2

# connect to MySQL
connection = mysql.connector.connect(user='root', password='zmjfo3k3aUjgjT8XAsYfaIJO',
                                     host='172.21.221.174', database='sales')

# create cursor
cursor = connection.cursor()

# Connect to PostgreSql
# connection details
dsn_hostname = '172.21.224.129'
dsn_user = 'postgres'                   # e.g. "abc12345"
dsn_pwd = 'AS9FvGlUdHjBJlpmHTDd8rIp'    # e.g. "7dBZ3wWt9XN6$o0J"
dsn_port = "5432"                       # e.g. "50000"
dsn_database = "postgres"               # i.e. "BLUDB"

# create connection
conn = psycopg2.connect(
    database=dsn_database,
    user=dsn_user,
    password=dsn_pwd,
    host=dsn_hostname,
    port=dsn_port
)

# Create a cursor object using the cursor() method
cur = conn.cursor()

# Find out the last rowid from DB2 data warehouse or PostgreSql data warehouse
# The function get_last_rowid must return the last rowid of the table sales_data on the IBM DB2 database or PostgreSql.
def get_last_rowid():
    cur.execute('SELECT max(rowid) FROM sales_data')
    result = cur.fetchone()
    return result

last_row_id = get_last_rowid()
print("Last row id on production datawarehouse = ", last_row_id)

# List out all records in MySQL database with rowid greater than the one on the Data warehouse
# The function get_latest_records must return a list of all records that have a rowid greater than the last_row_id in the sales_data table in the sales database on the MySQL staging data warehouse.
def get_latest_records(last_row_id):
    cursor.execute('SELECT * FROM sales_data WHERE rowid > %s', last_row_id)
    new_records = cursor.fetchall()
    return new_records

new_records = get_latest_records(last_row_id)
print("New rows on staging datawarehouse = ", len(new_records))

# Insert the additional records from MySQL into DB2 or PostgreSql data warehouse.
# The function insert_records must insert all the records passed to it into the sales_data table in IBM DB2 database or PostgreSql.
def insert_records(records):
    pass

insert_records(new_records)
print("New rows inserted into production datawarehouse = ", len(new_records))

# disconnect from mysql warehouse
connection.close()

# disconnect from PostgreSql data warehouse
conn.close()

# End of program
Implement All Functions
Let’s finally include insert_records() and run the entire automation process
# Import libraries required for connecting to mysql
import mysql.connector

# Import libraries required for connecting to PostgreSql
import psycopg2

# connect to MySQL
connection = mysql.connector.connect(user='root', password='iBPymI980dmRhEns7J7Q3PRU',
                                     host='172.21.199.33', database='sales')

# create cursor
cursor = connection.cursor()

# Connect to PostgreSql
# connection details
dsn_hostname = '172.21.246.53'
dsn_user = 'postgres'                   # e.g. "abc12345"
dsn_pwd = 'jULCTPGfm4ZZIeKlnrSLduAG'    # e.g. "7dBZ3wWt9XN6$o0J"
dsn_port = "5432"                       # e.g. "50000"
dsn_database = "postgres"               # i.e. "BLUDB"

# create connection
conn = psycopg2.connect(
    database=dsn_database,
    user=dsn_user,
    password=dsn_pwd,
    host=dsn_hostname,
    port=dsn_port
)

# Create a cursor object using the cursor() method
cur = conn.cursor()

# Find out the last rowid from DB2 data warehouse or PostgreSql data warehouse
# The function get_last_rowid must return the last rowid of the table sales_data on the IBM DB2 database or PostgreSql.
def get_last_rowid():
    cur.execute('SELECT max(rowid) FROM sales_data')
    result = cur.fetchone()
    return result

last_row_id = get_last_rowid()
print("Last row id on production datawarehouse = ", last_row_id)

# List out all records in MySQL database with rowid greater than the one on the Data warehouse
# The function get_latest_records must return a list of all records that have a rowid greater than the last_row_id in the sales_data table in the sales database on the MySQL staging data warehouse.
def get_latest_records(last_row_id):
    cursor.execute('SELECT * FROM sales_data WHERE rowid > %s', last_row_id)
    new_records = cursor.fetchall()
    return new_records

new_records = get_latest_records(last_row_id)
print("New rows on staging datawarehouse = ", len(new_records))

# Insert the additional records from MySQL into DB2 or PostgreSql data warehouse.
# The function insert_records must insert all the records passed to it into the sales_data table in IBM DB2 database or PostgreSql.
def insert_records(new_records):
    for row in new_records:
        putin = f"INSERT INTO sales_data VALUES{row};"
        cur.execute(putin)
    conn.commit()  # commit so the inserts persist after the connection closes

insert_records(new_records)
print("New rows inserted into production datawarehouse = ", len(new_records))

# disconnect from mysql warehouse
connection.close()

# disconnect from PostgreSql data warehouse
conn.close()

# End of program
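To confirm the load matched the expectation from earlier (12186 + 1650 = 13836 rows), here is a quick check against the production DW; a sketch reusing the same connection settings as the script above:

import psycopg2

conn = psycopg2.connect(database='postgres', user='postgres',
                        password='jULCTPGfm4ZZIeKlnrSLduAG',
                        host='172.21.246.53', port='5432')
cur = conn.cursor()

# total rows and last rowid should now match the staging warehouse
cur.execute('SELECT COUNT(*), MAX(rowid) FROM sales_data')
total_rows, last_rowid = cur.fetchone()
print("Rows in production datawarehouse =", total_rows)  # expected 13836
print("Last rowid =", last_rowid)                        # expected 13939
conn.close()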