ETL CSV JSON XML

Simple ETL


Import Data - Shell

  • Import the data using the terminal
wget https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMDeveloperSkillsNetwork-PY0221EN-SkillsNetwork/labs/module%206/Lab%20-%20Extract%20Transform%20Load/data/source.zip

Unzip File - Shell

  • Unzip the downloaded file
  • After unzipping, the folder contains:
    • etl_code.py (the script file)
    • source.zip
    • all the files produced by unzipping
    • as you can see, we have multiple files in 3 formats: CSV, JSON, and XML
unzip source.zip
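
To preview what the archive holds before (or instead of) extracting, unzip’s -l flag lists the contents, as an optional check:

unzip -l source.zip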

Import Using Pandas

Unzip File - Pandas

  • Let’s fetch and unzip the file using Python this time (import pandas)
  • we’ll store the zip file in a local destination directory (D:/data here)
  • we’ll use zipfile to unzip it
  • we’ll use requests.get to fetch it (import requests)
  • Since we are using RStudio for this, we first need to load reticulate in R before we can use the Python libraries
library(reticulate)
import pandas as pd
import requests
import zipfile

url = "https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMDeveloperSkillsNetwork-PY0221EN-SkillsNetwork/labs/module%206/Lab%20-%20Extract%20Transform%20Load/data/source.zip"

response = requests.get(url)

if response.status_code == 200:
        print("Code is OK, downloading file to local directory")
        with open("D:/data/local_zip.zip", "wb") as file:
                file.write(response.content)
                print("Zip file is saved locally at: D:/data/local_zip.zip")
# Extract local file
with zipfile.ZipFile("D:/data/local_zip.zip", "r") as local_zip:
        local_zip.extractall("D:/data/local_unzip")
        print("All files have been extracted")

Extract


Most of what follows is Python and pandas.

Import Libraries

  • The xml library can be used to parse the information from an .xml file format.
  • The .csv and .json file formats can be read using the pandas library.
  • You will use the pandas library to create a data frame format that will store the extracted data from any file.
  • To call the correct function for data extraction, you need to access the file format information. For this access, you can use the glob library.
  • To log the information correctly, you need the date and time information at the point of logging. For this information, you require the datetime package.
  • Import the ElementTree module from the xml.etree library because you require it to parse the data from an XML file format.

While glob, xml, and datetime are built into Python's standard library, you need to install the pandas library.
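
Assuming a standard Python setup with pip available on the PATH, that is a one-liner in the terminal:

python3 -m pip install pandas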

Global Variables

You also require two file paths that will be available globally in the code for all functions.

  • These are transformed_data.csv, which stores the final output data that you can load to a database, and
  • log_file.txt, which stores all the logs
  • Add those two statements right after the imports
import pandas as pd
import glob 
import xml.etree.ElementTree as ET 
from datetime import datetime 

# Global vars
log_file = "D:/data/local_unzip/log_file.txt" 
target_file = "D:/data/local_unzip/transformed_data.csv" 
  • Since we have 3 different file formats, we’ll need 3 different functions, one to extract each type
  • Name these three functions extract_from_csv(), extract_from_json(), and extract_from_xml()
  • Pass the data file as an argument, file_to_process, to each function

CSV Function

  • Extract csv data into df
# Extract csv data into df
def extract_from_csv(file_to_process): 
        dataframe = pd.read_csv(file_to_process) 
        return dataframe 

JSON Function

  • Extract json data into df
  • It requires an extra argument, lines=True, to enable the function to read the file line by line, with one JSON object per line, as follows.
# Extract json data into a df
def extract_from_json(file_to_process): 
        dataframe = pd.read_json(file_to_process, lines=True) 
        return dataframe 
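
  • For reference, lines=True expects one JSON object per line (NDJSON). A minimal sketch, with hypothetical sample values and file path:

# Hypothetical line-delimited JSON sample for extract_from_json
sample = '{"name": "alex", "height": 65.78, "weight": 112.99}\n{"name": "ajay", "height": 71.52, "weight": 136.49}\n'

with open("D:/data/sample.json", "w") as f: 
        f.write(sample) 

print(extract_from_json("D:/data/sample.json")) 
#    name  height  weight
# 0  alex   65.78  112.99
# 1  ajay   71.52  136.49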

XML Function

  • To extract from an XML file, you first need to parse the data from the file using ElementTree
  • You can then extract relevant information from this data and append it to a pandas dataframe
  • Note: You must know the headers of the extracted data to write this function.
  • In this data, you extract “name”, “height”, and “weight” headers for different persons.
# Parse XML data first then assign to df
def extract_from_xml(file_to_process): 
        dataframe = pd.DataFrame(columns=["name", "height", "weight"]) 
        tree = ET.parse(file_to_process) 
        root = tree.getroot() 
        for person in root: 
                name = person.find("name").text 
                height = float(person.find("height").text) 
                weight = float(person.find("weight").text) 
                dataframe = pd.concat([dataframe, pd.DataFrame([{"name":name, "height":height, "weight":weight}])], ignore_index=True) 
        return dataframe 
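
  • For reference, the layout this function assumes is a root element whose children each carry name, height, and weight tags. A minimal sketch, with hypothetical sample values and file path:

# Hypothetical XML sample matching what extract_from_xml expects
sample = """<data>
        <person><name>alex</name><height>65.78</height><weight>112.99</weight></person>
        <person><name>ajay</name><height>71.52</height><weight>136.49</weight></person>
</data>"""

with open("D:/data/sample.xml", "w") as f: 
        f.write(sample) 

print(extract_from_xml("D:/data/sample.xml")) 
#    name  height  weight
# 0  alex   65.78  112.99
# 1  ajay   71.52  136.49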

GLOB

  • Since we have data spread across 3 different file formats, we need to loop through all the files in the folder, one format at a time
  • This is where glob comes in handy
  • We’ll extract the data from each file and concatenate it onto the dataframe
  • When we are done, we should have all the data from all the file types together in one dataframe, extracted_data

Extract Function

# Loop through file types and call each function according to each file type
def extract(): 
        extracted_data = pd.DataFrame(columns=['name','height','weight']) 
        # create an empty data frame to hold extracted data 
     
        # process all csv files 
        for csvfile in glob.glob("D:/data/local_unzip/*.csv"): 
                extracted_data = pd.concat([extracted_data, pd.DataFrame(extract_from_csv(csvfile))], ignore_index=True) 
         
        # process all json files 
        for jsonfile in glob.glob("D:/data/local_unzip/*.json"): 
                extracted_data = pd.concat([extracted_data, pd.DataFrame(extract_from_json(jsonfile))], ignore_index=True) 
     
        # process all xml files 
        for xmlfile in glob.glob("D:/data/local_unzip/*.xml"): 
                extracted_data = pd.concat([extracted_data, pd.DataFrame(extract_from_xml(xmlfile))], ignore_index=True) 
         
        return extracted_data 

Transform


  • The height in the extracted data is in inches, and the weight is in pounds. However, for our application, the height is required to be in meters, and the weight is required to be in kilograms, rounded to two decimal places.
  • Therefore, we need to write the function to perform the unit conversion for the two parameters.
  • The name of this function will be transform(), and it will receive the extracted dataframe as the input.
  • Since the dataframe is in the form of a dictionary with three keys, “name”, “height”, and “weight”, each having a list of values, we can apply the conversion to each entire column at once.
  • The output of this function will now be a dataframe where the “height” and “weight” parameters will be modified to the required format.
# Transform the data
def transform(data): 
        '''Convert inches to meters and round off to two decimals 
        1 inch is 0.0254 meters '''
        data['height'] = round(data.height * 0.0254,2) 
 
        '''Convert pounds to kilograms and round off to two decimals 
        1 pound is 0.45359237 kilograms '''
        data['weight'] = round(data.weight * 0.45359237,2) 
    
        return data 
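
  • A quick sanity check of the conversion factors, using assumed sample values of 65.78 inches and 112.99 pounds:

print(round(65.78 * 0.0254, 2))       # 1.67 meters 
print(round(112.99 * 0.45359237, 2))  # 51.25 kilograms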

Load


Save to CSV

  • We need to load the transformed data to a CSV file that can then be loaded into a database as required.
  • To do this, we write a function load_data() that accepts the transformed dataframe and the target_file path.
  • Inside the function we use the dataframe’s to_csv method
# Load data to a csv file
def load_data(target_file, transformed_data): 
        transformed_data.to_csv(target_file) 
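
  • One caveat worth knowing: to_csv writes the dataframe index as an extra column by default, and since target_file lands in the same folder that extract() globs for *.csv, a re-run will sweep transformed_data.csv back in (the likely source of the Unnamed columns in the output further down). A variant that at least drops the index column:

# Same loader, but without writing the row index as a column
def load_data(target_file, transformed_data): 
        transformed_data.to_csv(target_file, index=False) 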

Log


Implement the logging operation to record the progress of the different operations.

  • For each operation, we need to record a message, along with its timestamp, in the log_file
  • To record the message, we need to implement a function log_progress() that accepts the log message as the argument.
  • The function captures the current date and time using datetime.now() from the datetime library.
  • Using it requires the definition of a date-time format, and
  • We need to convert the timestamp to a string using the strftime method
# Log operation progress
def log_progress(message): 
        timestamp_format = '%Y-%h-%d-%H:%M:%S' # Year-Monthname-Day-Hour-Minute-Second 
        now = datetime.now() # get current timestamp 
        timestamp = now.strftime(timestamp_format) 
        with open(log_file,"a") as f: 
                f.write(timestamp + ',' + message + '\n') 
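
  • A quick usage example; the exact timestamp will of course differ:

log_progress("Test message") 
# appends a line like the following to log_file: 
# 2024-Sep-15-10:30:00,Test message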

Test & Log


To test and log the functions and the entire ETL process, run the statements listed next:

# Test and Log the entire ETL process

# Log the initialization of the ETL process 
log_progress("ETL Job Started") 
 
# Log the beginning of the Extraction process 
log_progress("Extract phase Started") 
extracted_data = extract() 
<string>:8: FutureWarning: The behavior of DataFrame concatenation with empty or all-NA entries is deprecated. In a future version, this will no longer exclude empty or all-NA columns when determining the result dtypes. To retain the old behavior, exclude the relevant entries before the concat operation.
<string>:10: FutureWarning: The behavior of DataFrame concatenation with empty or all-NA entries is deprecated. In a future version, this will no longer exclude empty or all-NA columns when determining the result dtypes. To retain the old behavior, exclude the relevant entries before the concat operation.
<string>:10: FutureWarning: The behavior of DataFrame concatenation with empty or all-NA entries is deprecated. In a future version, this will no longer exclude empty or all-NA columns when determining the result dtypes. To retain the old behavior, exclude the relevant entries before the concat operation.
<string>:10: FutureWarning: The behavior of DataFrame concatenation with empty or all-NA entries is deprecated. In a future version, this will no longer exclude empty or all-NA columns when determining the result dtypes. To retain the old behavior, exclude the relevant entries before the concat operation.
 
# Log the completion of the Extraction process 
log_progress("Extract phase Ended") 
 
# Log the beginning of the Transformation process 
log_progress("Transform phase Started") 
transformed_data = transform(extracted_data) 
print("Transformed Data") 
Transformed Data
print(transformed_data) 
      name  height  weight  ...  Unnamed: 0.2  Unnamed: 0.1  Unnamed: 0
0     alex    1.67   51.25  ...           NaN           NaN         NaN
1     ajay    1.82   61.91  ...           NaN           NaN         NaN
2    alice    1.76   69.41  ...           NaN           NaN         NaN
3     ravi    1.73   64.56  ...           NaN           NaN         NaN
4      joe    1.72   65.45  ...           NaN           NaN         NaN
..     ...     ...     ...  ...           ...           ...         ...
931   ivan    1.72   51.77  ...           NaN           NaN         NaN
932  simon    1.72   50.97  ...           NaN           NaN         NaN
933  jacob    1.70   54.73  ...           NaN           NaN         NaN
934  cindy    1.69   57.81  ...           NaN           NaN         NaN
935   ivan    1.72   51.77  ...           NaN           NaN         NaN

[936 rows x 26 columns]
 
# Log the completion of the Transformation process 
log_progress("Transform phase Ended") 
 
# Log the beginning of the Loading process 
log_progress("Load phase Started") 
load_data(target_file,transformed_data) 
 
# Log the completion of the Loading process 
log_progress("Load phase Ended") 
 
# Log the completion of the ETL process 
log_progress("ETL Job Ended") 

Practice

https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMDeveloperSkillsNetwork-PY0221EN-SkillsNetwork/labs/module%206/Lab%20-%20Extract%20Transform%20Load/data/datasource.zip

Complete the following practice exercises:

  1. Create a folder data_source and use the terminal shell to change the current directory to /home/project/data_source. Create a file etl_practice.py in this folder (see the shell sketch after this list).

  2. Download and unzip the data available in the link shared above.

  3. The data available has four headers: ‘car_model’, ‘year_of_manufacture’, ‘price’, ‘fuel’. Implement the extraction process for the CSV, JSON, and XML files.

  4. Transform the values under the ‘price’ header such that they are rounded to 2 decimal places.

  5. Implement the loading function for the transformed data to a target file, transformed_data.csv.

  6. Implement the logging function for the entire process and save it in log_file.txt.

  7. Test the implemented functions and log the events as done in the lab.
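
As a starting point for steps 1 and 2, the shell commands would look roughly like this (using the URL shared above):

mkdir -p /home/project/data_source
cd /home/project/data_source
touch etl_practice.py
wget https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMDeveloperSkillsNetwork-PY0221EN-SkillsNetwork/labs/module%206/Lab%20-%20Extract%20Transform%20Load/data/datasource.zip
unzip datasource.zip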

Please note that the solutions for this practice exercise are not provided.