~../project$ mkdir /home/project/airflow/dags/capstone
~../project$ wget https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-DB0321EN-SkillsNetwork/ETL/accesslog.txt -P /home/project/airflow/dags/capstone
Connecting to cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud (cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud)|169.63.118.104|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 2370789 (2.3M) [text/plain]
Saving to: '/home/project/airflow/dags/capstone/accesslog.txt'

accesslog.txt        100%[=====================================>]   2.26M  --.-KB/s    in 0.01s

2024-11-22 13:45:55 (152 MB/s) - '/home/project/airflow/dags/capstone/accesslog.txt' saved [2370789/2370789]
ETL Pipeline with Apache Airflow DAG
In this section we’ll write a pipeline that analyzes the web server log file, extracts the required lines (ending with html) and fields (timestamp, size), transforms them (bytes to MB), and loads the result (appends to an existing file).
Objectives
In this section we will author an Apache Airflow DAG that will:
- Extract data from a web server log file
- Transform the data
- Load the transformed data into a tar file
Setup
- Start Apache Airflow.
- I’ll be using an instance of Apache Airflow on the cloud (a local-install alternative is sketched below)
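If you are not using the cloud instance, a minimal way to bring up a local development environment is Airflow's standalone mode. This is a sketch, assuming a pip-installed Apache Airflow 2.x rather than the lab's managed instance:

~$ export AIRFLOW_HOME=/home/project/airflow
~$ airflow standalone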
Data
- Download the dataset from the source to the destination: /home/project/airflow/dags/capstone
- The -P flag tells wget to save the file under the capstone directory, creating it if needed
- Source: https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-DB0321EN-SkillsNetwork/ETL/accesslog.txt
- It appears that each line has 7 fields, and we only need the first field, which is the IP address (a quick split sketch follows)
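To illustrate what the extract step will do, here is a minimal sketch; the sample line below is made up for illustration and is not copied from accesslog.txt:

# Sketch only: split a space-delimited log line and keep the first field (the IP address)
sample = '198.46.149.143 - - [29/Nov/2017:06:58:55] "GET /login.php HTTP/1.1" 200 1420'
fields = sample.split(' ')
print(fields[0])   # -> 198.46.149.143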
Create a DAG
Create Script File
- At the command prompt we need to create a file that will hold our Python script for the pipeline.
- Since we are using a cloud instance, we can go to File -> New and save the file as process_web_log.py (or create it from the terminal, as shown below).
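An equivalent way to create the empty script from the terminal (it is copied into $AIRFLOW_HOME/dags later on):

~$ touch process_web_log.py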
Imports
- Let’s first import the libraries we will need
# ___ IMPORT libraries
from datetime import timedelta
# The DAG object; we'll need this to instantiate a DAG
from airflow.models import DAG
# Operators; you need this to write tasks!
from airflow.operators.python import PythonOperator
# This makes scheduling easy
from airflow.utils.dates import days_ago
# This is used to save txt file to tar file
import tarfile
Set Variables
- Let’s set some variables to make our code more readable
input_file = '/home/project/airflow/dags/capstone/accesslog.txt'
extracted_file = '/home/project/airflow/dags/capstone/extracted_data.txt'
transformed_file = '/home/project/airflow/dags/capstone/transformed_data.txt'
load_file = '/home/project/airflow/dags/capstone/weblog.tar'
Task 1 - Define DAG Arguments
Create a DAG with these arguments.
- owner
- start_date
- You may define any suitable additional arguments.
# defining DAG arguments, days_ago(0) means runs today
default_args = {
    'owner': 'Me',
    'start_date': days_ago(0),
    'email': ['notyourbusiness@heehaa.com'],
    'retries': 1,
    'retry_delay': timedelta(minutes=2),
}
Task 2 - Define the DAG
- Create a DAG named process_web_log that runs daily.
# define the DAG, days=1 means it runs daily
dag = DAG(
    dag_id='etl_dag',
    default_args=default_args,
    description='This is what Santa does',
    schedule_interval=timedelta(days=1),
)
Task 3 - Extract
- Create a task named extract_data.
- This task should extract the IP address field from the web server log file and save it into a file named extracted_data.txt
Create Function for Task 1
# Create Function to Extract IP address from the file which happens to be the first field out of 7
def extractip():
    global input_file
    print("Inside IP address Extract Function")
    with open(input_file, 'r') as infile, \
            open(extracted_file, 'w') as outfile:
        for line in infile:
            fields = line.split(' ')
            if len(fields) >= 7:
                field_1 = fields[0]
                outfile.write(field_1 + "\n")
# Alternative: using bash grep instead of the Python function
# (this also needs: from airflow.operators.bash import BashOperator)
extract = BashOperator(
    task_id='extract',
    bash_command="""grep -oE '^[0-9]{1,3}\\.[0-9]{1,3}\\.[0-9]{1,3}\\.[0-9]{1,3}' \\
        /home/project/airflow/dags/accesslog.txt \\
        > /home/project/airflow/dags/extracted-data.txt""",
    dag=dag,
)
Define Task 1
# ___ TASK DEFINITION : EXTRACT to call the `extractip` function
extract_data = PythonOperator(
    task_id='extract_data',
    python_callable=extractip,
    dag=dag,
)
Task 4 - Transform
- Create a task named transform_data.
- This task should filter out all the occurrences of IP address “198.46.149.143” from extracted_data.txt and save the output to a file named transformed_data.txt.
Create Function for Task 2
# ___ CREATE FUNCTION 2 FOR TASK 2 to filter out the occurrences of ip="198.46.149.143"
def transform():
    global extracted_file, transformed_file
    print("Inside Transform")
    with open(extracted_file, 'r') as infile, \
            open(transformed_file, 'w') as outfile:
        extracted_lines = []
        for line in infile:
            if '198.46.149.143' in line:
                extracted_lines.append(line.strip())
        outfile.write('\n'.join(extracted_lines))
Define Task 2
# ___ TASK DEFINITION : Transform_data to call the 'transform' function
transform_data = PythonOperator(
    task_id='transform_data',
    python_callable=transform,
    dag=dag,
)
Task 5 - Create a task to Load
- Create a task named load_data.
- This task should archive the file transformed_data.txt into a tar file named weblog.tar.
Create Function for Task 3
# ___ CREATE FUNCTION to load data from transformed_data.txt to a tar file weblog.tar
def load():
    global transformed_file, load_file
    print("Inside Load_data Function")
    # Open the output tar file in write mode and write the txt file to it
    with tarfile.open(load_file, 'w') as tar:
        # Add the txt file to the tar file
        tar.add(transformed_file)
Define Task 3
# ___ TASK DEFINITION: Load transformed txt file to tar file
load_data = PythonOperator(
    task_id='load_data',
    python_callable=load,
    dag=dag,
)
Task 6 - Define the task pipeline
Define the task pipeline as per the details given below:
| Task | Functionality |
|---|---|
| First task | extract_data |
| Second task | transform_data |
| Third task | load_data |
# ___ PIPELINE
extract_data >> transform_data >> load_data
Get DAG Operational
- Save the DAG you defined into a file named process_web_log.py.
Set Airflow Home
~$ export AIRFLOW_HOME=/home/project/airflow
~$ echo $AIRFLOW_HOME
/home/project/airflow
Task 7 - Submit the DAG
- Submit the process_web_log.py file by copying it into the $AIRFLOW_HOME/dags directory
- Now you'll see the Python file inside the airflow/dags directory
~$ cp process_web_log.py $AIRFLOW_HOME/dags
Verify DAG Submission
~$ airflow dags list | grep "process_web_log"
etl_dag | /home/project/airflow/dags/process_web_log.py | Me | True
List All Tasks
~$ airflow tasks list etl_dag
extract_data
load_data
transform_data
Check for Errors
- Corrected the error below by giving permissions (see the next subsection)
~$ airflow dags list-import-errors
No data found
Permission Denied Reading File
I need to give permissions on the capstone directory so the DAG can read it:
~$ sudo chown -R 100999 /home/project/airflow/dags/capstone
~$ sudo chmod -R g+rw /home/project/airflow/dags/capstone
Task 8 - Unpause the DAG
theia@theiadocker-emhrcf:/home/project$ airflow dags unpause etl_dag
[2024-11-22T22:23:15.424+0000] {dagbag.py:545} INFO - Filling up the DagBag from /home/project/airflow/dags
dag_id  | is_paused
========+==========
etl_dag | True
theia@theiadocker-emhrcf:/home/project$ airflow dags unpause etl_dag
[2024-11-22T22:23:39.648+0000] {dagbag.py:545} INFO - Filling up the DagBag from /home/project/airflow/dags
No paused DAGs were found
Task 9 - Monitor the DAG
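The DAG can be monitored from the Airflow web UI; as a CLI alternative (a sketch, not necessarily how it was done here), the runs of the DAG can be listed with:

~$ airflow dags list-runs -d etl_dag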
Complete Script
# ___ IMPORT libraries
from datetime import timedelta
# The DAG object; we'll need this to instantiate a DAG
from airflow.models import DAG
# Operators; you need this to write tasks!
from airflow.operators.python import PythonOperator
# This makes scheduling easy
from airflow.utils.dates import days_ago
# This is used to save txt file to tar file
import tarfile
# ____ SET VARIABLES
input_file = '/home/project/airflow/dags/capstone/accesslog.txt'
extracted_file = '/home/project/airflow/dags/capstone/extracted_data.txt'
transformed_file = '/home/project/airflow/dags/capstone/transformed_data.txt'
load_file = '/home/project/airflow/dags/capstone/weblog.tar'
# ___ ARGUMENTS
# defining DAG arguments, days_ago(0) means runs today
default_args = {
    'owner': 'Me',
    'start_date': days_ago(0),
    'email': ['notyourbusiness@heehaa.com'],
    'retries': 1,
    'retry_delay': timedelta(minutes=2),
}
# ___ DAG DEFINITION
# define the DAG, days=1 means it runs daily
dag = DAG(
    dag_id='etl_dag',
    default_args=default_args,
    description='This is what Santa does',
    schedule_interval=timedelta(days=1),
)
# ___ FUNCTION DEFINITIONS
# Create Function 1 for Task 1 to Extract IP address from the file which happens to be the first field out of 7
def extractip():
    global input_file
    print("Inside IP address Extract Function")
    with open(input_file, 'r') as infile, \
            open(extracted_file, 'w') as outfile:
        for line in infile:
            fields = line.split(' ')
            if len(fields) >= 7:
                field_1 = fields[0]
                outfile.write(field_1 + "\n")
# ___ CREATE FUNCTION 2 FOR TASK 2 to filter out the occurrences of ip="198.46.149.143"
def transform():
    global extracted_file, transformed_file
    print("Inside Transform")
    with open(extracted_file, 'r') as infile, \
            open(transformed_file, 'w') as outfile:
        extracted_lines = []
        for line in infile:
            if '198.46.149.143' in line:
                extracted_lines.append(line.strip())
        outfile.write('\n'.join(extracted_lines))
# ___ CREATE FUNCTION 3 to load data from transformed_data.txt to a tar file weblog.tar
def load():
    global transformed_file, load_file
    print("Inside Load_data Function")
    # Open the output tar file in write mode and write the txt file to it
    with tarfile.open(load_file, 'w') as tar:
        # Add the txt file to the tar file
        tar.add(transformed_file)
# ___ TASK DEFINITIONS
# ___ TASK DEFINITION : EXTRACT to call the `extractip` function
extract_data = PythonOperator(
    task_id='extract_data',
    python_callable=extractip,
    dag=dag,
)

# ___ TASK DEFINITION : Transform_data to call the 'transform' function
transform_data = PythonOperator(
    task_id='transform_data',
    python_callable=transform,
    dag=dag,
)

# ___ TASK DEFINITION: Load transformed txt file to tar file
load_data = PythonOperator(
    task_id='load_data',
    python_callable=load,
    dag=dag,
)
# ___ PIPELINE
extract_data >> transform_data >> load_data
Scripts to test DAGs
Extract Test
# ____ SET VARIABLES
input_file = '/home/project/airflow/dags/capstone/accesslog.txt'
extracted_file = '/home/project/airflow/dags/capstone/extracted_data.txt'
transformed_file = '/home/project/airflow/dags/capstone/transformed_data.txt'
load_file = '/home/project/airflow/dags/capstone/weblog.tar'

print("Inside IP address Extract Function")
with open(input_file, 'r') as infile, \
        open(extracted_file, 'w') as outfile:
    for line in infile:
        fields = line.split(' ')
        if len(fields) >= 7:
            field_1 = fields[0]
            outfile.write(field_1 + "\n")
Transform Test
input_file = '/home/project/airflow/dags/capstone/accesslog.txt'
extracted_file = '/home/project/airflow/dags/capstone/extracted_data.txt'
transformed_file = '/home/project/airflow/dags/capstone/transformed_data.txt'
load_file = '/home/project/airflow/dags/capstone/weblog.tar'
print("Inside Transform")
with open(extracted_file, 'r') as infile, \
open(transformed_file, 'w') as outfile:
extracted_lines =[]
for line in infile:
if '198.46.149.143' in line:
extracted_lines.append(line.strip())
outfile.write('\n'.join(extracted_lines))
Load Test
# ____ SET VARIABLES
import tarfile

input_file = '/home/project/airflow/dags/capstone/accesslog.txt'
extracted_file = '/home/project/airflow/dags/capstone/extracted_data.txt'
transformed_file = '/home/project/airflow/dags/capstone/transformed_data.txt'
load_file = '/home/project/airflow/dags/capstone/weblog.tar'

print("Inside Load_data Function")
# Open the output tar file in write mode and write the txt file to it
with tarfile.open(load_file, 'w') as tar:
    # Add the txt file to the tar file
    tar.add(transformed_file)

# Same load with hard-coded paths, writing the archive to /home/project
with tarfile.open('/home/project/weblog.tar', 'w') as tar:
    tar.add('/home/project/airflow/dags/capstone/transformed_data.txt')
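A quick check that the archive was written correctly (a sketch; the path assumes the capstone weblog.tar produced above):

import tarfile

# List the archive members to confirm transformed_data.txt was added
with tarfile.open('/home/project/airflow/dags/capstone/weblog.tar', 'r') as tar:
    print(tar.getnames())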