ETL Pipeline with Apache Airflow DAG

In this section we'll write a pipeline that analyzes a web server log file: it extracts the IP address field from each line, transforms the data by filtering out a specific IP address, and loads the result into a tar archive.

Objectives


In this section we will author an Apache Airflow DAG that will:

  • Extract data from a web server log file
  • Transform the data
  • Load the transformed data into a tar file

Setup


  • Start Apache Airflow.
  • I'll be using an instance of Apache Airflow on the cloud

Data


  • Download the dataset from the source URL below to the destination directory /home/project/airflow/dags/capstone
  • The -P flag tells wget to save the file into the capstone directory, creating it if it doesn't exist
  • Source: https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-DB0321EN-SkillsNetwork/ETL/accesslog.txt
  • Each line appears to have 7 fields, and we only need the first field, which is the IP address
~/project$ mkdir /home/project/airflow/dags/capstone

~/project$ wget https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-DB0321EN-SkillsNetwork/ETL/accesslog.txt -P /home/project/airflow/dags/capstone

Connecting to cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud (cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud)|169.63.118.104|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 2370789 (2.3M) [text/plain]
Saving to: '/home/project/airflow/dags/capstone/accesslog.txt'

accesslog.txt              100%[=====================================>]   2.26M  --.-KB/s    in 0.01s   

2024-11-22 13:45:55 (152 MB/s) - '/home/project/airflow/dags/capstone/accesslog.txt' saved [2370789/2370789]
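
Before writing any code, it's worth peeking at the first couple of lines to confirm the field layout (output not shown here; the point is just to check that the IP address is the first space-separated field):

~$ head -n 2 /home/project/airflow/dags/capstone/accesslog.txt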

Create a DAG


Create Script File

  • At the command prompt we need to create a file that will hold the Python script for the pipeline.
  • Since we are using a cloud IDE, we can go to File -> New and save the file as process_web_log.py

Imports

  • Let’s first import the libraries we will need
# ___  IMPORT libraries
from datetime import timedelta

# The DAG object; we'll need this to instantiate a DAG
from airflow.models import DAG

# Operators; you need this to write tasks!
from airflow.operators.python import PythonOperator

# This makes scheduling easy
from airflow.utils.dates import days_ago

# This is used to save txt file to tar file
import tarfile

Set Variables

  • Let’s set some variables to make our code more readable
input_file = '/home/project/airflow/dags/capstone/accesslog.txt'
extracted_file = '/home/project/airflow/dags/capstone/extracted_data.txt'
transformed_file = '/home/project/airflow/dags/capstone/transformed_data.txt'
load_file = '/home/project/airflow/dags/capstone/weblog.tar'

Task 1 - Define DAG Arguments

Create a DAG with these arguments.

  • owner
  • start_date
  • email
  • You may define any suitable additional arguments.
# defining DAG arguments; days_ago(0) sets the start date to today (midnight UTC)
default_args = {
    'owner': 'Me',
    'start_date': days_ago(0),
    'email': ['notyourbusiness@heehaa.com'],
    'retries': 1,
    'retry_delay': timedelta(minutes=2),
}

Task 2 - Define the DAG

  • Create a DAG named process_web_log that runs daily.
# define the DAG, days=1 means it runs daily
dag = DAG(
    dag_id = 'etl_dag',
    default_args=default_args,
    description='This is what Santa does',
    schedule_interval=timedelta(days=1),
)

Task 3 - Extract

  • Create a task named extract_data.
  • This task should extract the IP address field from the web server log file and save it into a file named extracted_data.txt

Create Function for Task 1

# Function for Task 1: extract the IP address, which is the first of the 7 fields on each line
def extractip():
    print("Inside IP address Extract Function")
    with open(input_file, 'r') as infile, \
         open(extracted_file, 'w') as outfile:
        for line in infile:
            fields = line.split(' ')
            if len(fields) >= 7:
                outfile.write(fields[0] + "\n")
# Alternative: do the extraction with bash grep instead
# (requires: from airflow.operators.bash import BashOperator)
extract = BashOperator(
    task_id='extract',
    bash_command="""grep -oE '^[0-9]{1,3}\\.[0-9]{1,3}\\.[0-9]{1,3}\\.[0-9]{1,3}' \\
        /home/project/airflow/dags/capstone/accesslog.txt \\
        > /home/project/airflow/dags/capstone/extracted_data.txt""",
    dag=dag,
)

Define Task 1

# ___ TASK DEFINITION : EXTRACT to call the `extractip` function
extract_data = PythonOperator(
    task_id='extract_data',
    python_callable=extractip,
    dag=dag,
)
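
To smoke-test the task in isolation before wiring up the whole pipeline, Airflow's tasks test subcommand runs a single task without recording any state (the date below is just an example):

~$ airflow tasks test etl_dag extract_data 2024-11-22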

Task 4 - Transform

  • Create a task named transform_data.
  • This task should filter out (i.e., remove) all the occurrences of the IP address “198.46.149.143” from extracted_data.txt and save the output to a file named transformed_data.txt.

Create Function for Task 2

# ___  CREATE FUNCTION 2 FOR TASK 2 to filter out (drop) the occurrences of ip="198.46.149.143"
def transform():
    print("Inside Transform")
    with open(extracted_file, 'r') as infile, \
         open(transformed_file, 'w') as outfile:
        kept_lines = []
        for line in infile:
            # keep only the lines that do NOT contain the filtered IP address
            if '198.46.149.143' not in line:
                kept_lines.append(line.strip())
        outfile.write('\n'.join(kept_lines))

Define Task 2

# ___ TASK DEFINITION : Transform_data to call the 'transform' function
transform_data = PythonOperator(
    task_id='transform_data',
    python_callable=transform,
    dag=dag,
)

Task 5 - Create a task to Load

  • Create a task named load_data.
  • This task should archive the file transformed_data.txt into a tar file named weblog.tar.

Create Function for Task 3

# ___ CREATE FUNCTION to load data from transformed_data.txt to a tar file weblog.tar
def load():
    print("Inside Load_data Function")
    # Open the output tar file in write mode and add the txt file to it
    with tarfile.open(load_file, 'w') as tar:
        tar.add(transformed_file)
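
One optional tweak, not required by the lab: tar.add() stores the member under the path you pass it, so the archive entry will carry the full capstone directory path. If you would rather store just the file name, tarfile's arcname parameter does that. A minimal sketch:

import tarfile

# Store only the base name inside the archive instead of the full path
# (assumes transformed_data.txt already exists at this location)
with tarfile.open('/home/project/airflow/dags/capstone/weblog.tar', 'w') as tar:
    tar.add('/home/project/airflow/dags/capstone/transformed_data.txt',
            arcname='transformed_data.txt')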

Define Task 3

# ___ TASK DEFINITION: Load transformed txt file to tar file
load_data = PythonOperator(
    task_id='load_data',
    python_callable=load,
    dag=dag,
)

Task 6 - Define the task pipeline

Define the task pipeline as per the details given below:

  • First task: extract_data
  • Second task: transform_data
  • Third task: load_data
# ___  PIPELINE
extract_data >> transform_data >> load_data
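
The >> operator is just shorthand for set_downstream, so the same ordering can be written explicitly:

# Equivalent to the bitshift form above
extract_data.set_downstream(transform_data)
transform_data.set_downstream(load_data)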

Get DAG Operational


  • Save the DAG you defined into a file named process_web_log.py.
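
A quick way to catch syntax errors before the scheduler does is to run the file through the interpreter; if it parses cleanly it exits silently (this assumes the airflow package is importable from your shell):

~$ python3 process_web_log.py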

Set Airflow Home

~$ export AIRFLOW_HOME=/home/project/airflow
~$ echo $AIRFLOW_HOME
/home/project/airflow

Task 7 - Submit the DAG

  • Copy the process_web_log.py file into the $AIRFLOW_HOME/dags directory
  • You should now see the Python file inside the airflow/dags directory
~$ cp process_web_log.py $AIRFLOW_HOME/dags
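
A quick listing confirms the file landed where Airflow will look for it:

~$ ls $AIRFLOW_HOME/dags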

Verify DAG Submission

~$ airflow dags list | grep "process_web_log"           
etl_dag   | /home/project/airflow/dags/process_web_log.py                                                                            | Me      | True  

List All Tasks

$ airflow tasks list etl_dag

extract_data
load_data
transform_data

Check for Errors

  • An earlier error was corrected by granting permissions (see below); the check now comes back clean
$ airflow dags list-import-errors

No data found

Permission Denied Reading File

I needed to grant permissions on the capstone directory so the DAG tasks could read and write it:

~$ sudo chown -R 100999 /home/project/airflow/dags/capstone
~$ sudo chmod -R g+rw /home/project/airflow/dags/capstone
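
You can confirm the new ownership and permissions took effect:

~$ ls -ld /home/project/airflow/dags/capstone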

Task 8 - Unpause the DAG

theia@theiadocker-emhrcf:/home/project$ airflow dags unpause etl_dag
[2024-11-22T22:23:15.424+0000] {dagbag.py:545} INFO - Filling up the DagBag from /home/project/airflow/dags
dag_id  | is_paused
========+==========
etl_dag | True

Running the command a second time confirms the DAG is now unpaused:

theia@theiadocker-emhrcf:/home/project$ airflow dags unpause etl_dag
[2024-11-22T22:23:39.648+0000] {dagbag.py:545} INFO - Filling up the DagBag from /home/project/airflow/dags
No paused DAGs were found

Task 9 - Monitor the DAG
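
The simplest way to watch the run is the DAGs view in the Airflow web UI, but the CLI can also list a DAG's runs and their states:

~$ airflow dags list-runs -d etl_dag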

Complete Script


# ___  IMPORT libraries
from datetime import timedelta

# The DAG object; we'll need this to instantiate a DAG
from airflow.models import DAG

# Operators; you need this to write tasks!
from airflow.operators.python import PythonOperator

# This makes scheduling easy
from airflow.utils.dates import days_ago

# This is used to save txt file to tar file
import tarfile


# ____  SET VARIABLES
input_file = '/home/project/airflow/dags/capstone/accesslog.txt'
extracted_file = '/home/project/airflow/dags/capstone/extracted_data.txt'
transformed_file = '/home/project/airflow/dags/capstone/transformed_data.txt'
load_file = '/home/project/airflow/dags/capstone/weblog.tar'


# ___  ARGUMENTS
# defining DAG arguments; days_ago(0) sets the start date to today (midnight UTC)
default_args = {
    'owner': 'Me',
    'start_date': days_ago(0),
    'email': ['notyourbusiness@heehaa.com'],
    'retries': 1,
    'retry_delay': timedelta(minutes=2),
}

# ___  DAG DEFINITION
# define the DAG, days=1 means it runs daily
dag = DAG(
    dag_id = 'etl_dag',
    default_args=default_args,
    description='This is what Santa does',
    schedule_interval=timedelta(days=1),
)

# ___  FUNCTION DEFINITIONS
# Function 1 for Task 1: extract the IP address, which is the first of the 7 fields on each line
def extractip():
    print("Inside IP address Extract Function")
    with open(input_file, 'r') as infile, \
         open(extracted_file, 'w') as outfile:
        for line in infile:
            fields = line.split(' ')
            if len(fields) >= 7:
                outfile.write(fields[0] + "\n")
          
# ___  CREATE FUNCTION 2 FOR TASK 2 to filter out (drop) the occurrences of ip="198.46.149.143"
def transform():
    print("Inside Transform")
    with open(extracted_file, 'r') as infile, \
         open(transformed_file, 'w') as outfile:
        kept_lines = []
        for line in infile:
            # keep only the lines that do NOT contain the filtered IP address
            if '198.46.149.143' not in line:
                kept_lines.append(line.strip())
        outfile.write('\n'.join(kept_lines))
          
          
# ___ CREATE FUNCTION 3 to load data from transformed_data.txt to a tar file weblog.tar
def load():
    print("Inside Load_data Function")
    # Open the output tar file in write mode and add the txt file to it
    with tarfile.open(load_file, 'w') as tar:
        tar.add(transformed_file)



# ___  TASK DEFINITIONS
# ___ TASK DEFINITION : EXTRACT to call the `extractip` function
extract_data = PythonOperator(
    task_id='extract_data',
    python_callable=extractip,
    dag=dag,
)
# ___ TASK DEFINITION : Transform_data to call the 'transform' function
transform_data = PythonOperator(
    task_id='transform_data',
    python_callable=transform,
    dag=dag,
)

# ___ TASK DEFINITION: Load transformed txt file to tar file
load_data = PythonOperator(
    task_id='load_data',
    python_callable=load,
    dag=dag,
)



# ___  PIPELINE
extract_data >> transform_data >> load_data

Scripts to test DAGs


Extract Test

# ____  SET VARIABLES
input_file = '/home/project/airflow/dags/capstone/accesslog.txt'
extracted_file = '/home/project/airflow/dags/capstone/extracted_data.txt'
transformed_file = '/home/project/airflow/dags/capstone/transformed_data.txt'
load_file = '/home/project/airflow/dags/capstone/weblog.tar'


print("Inside IP address Extract Function")
with open(input_file, 'r') as infile, \
    open(extracted_file, 'w') as outfile:
      for line in infile:
        fields = line.split(' ')
        if len(fields) >= 7:
          field_1 = fields[0]
          outfile.write(field_1 + "\n")

Transform Test

input_file = '/home/project/airflow/dags/capstone/accesslog.txt'
extracted_file = '/home/project/airflow/dags/capstone/extracted_data.txt'
transformed_file = '/home/project/airflow/dags/capstone/transformed_data.txt'
load_file = '/home/project/airflow/dags/capstone/weblog.tar'

print("Inside Transform")
with open(extracted_file, 'r') as infile, \
    open(transformed_file, 'w') as outfile:
        extracted_lines =[]
        for line in infile:
            if '198.46.149.143' in line:
                extracted_lines.append(line.strip())
        outfile.write('\n'.join(extracted_lines))

Load Test

# ____  SET VARIABLES
import tarfile

input_file = '/home/project/airflow/dags/capstone/accesslog.txt'
extracted_file = '/home/project/airflow/dags/capstone/extracted_data.txt'
transformed_file = '/home/project/airflow/dags/capstone/transformed_data.txt'
load_file = '/home/project/airflow/dags/capstone/weblog.tar'


print("Inside Load_data Function")
# Open the output tar file in write mode and add the txt file to it
with tarfile.open(load_file, 'w') as tar:
    tar.add(transformed_file)

# Second copy of the archive, written to /home/project
with tarfile.open('/home/project/weblog.tar', 'w') as tar:
    tar.add('/home/project/airflow/dags/capstone/transformed_data.txt')
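
After the DAG (or the test script) has run, listing the archive contents verifies the load step worked; tarfile can do it from Python:

import tarfile

# Print the member names stored in the archive
with tarfile.open('/home/project/airflow/dags/capstone/weblog.tar', 'r') as tar:
    print(tar.getnames())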