Airflow SMS Notification for Failed Jobs

Airflow only provides email notifications for job failures. However, this is not enough for production jobs, as not everyone has access to email on their phone. At least, that's true for our team. And so, I had to figure out a way to send SMS notifications for job failures so that swift action can be taken.

Airflow SMS Notification

Send Logs to AWS S3 Bucket

Modify the following settings in the airflow.cfg file to send job logs to an S3 bucket.
In my case, the EC2 instance has access to the S3 bucket through an AWS role, so I don't need to provide a connection ID. In your case, you might need to create a connection ID and provide it as remote_log_conn_id.

remote_log_conn_id =
remote_base_log_folder = s3://<your bucket name>/airflow-logs/
encrypt_s3_logs = False

Create AWS Lambda to Parse Logs

AWS Lambda will parse the Airflow job logs to check whether any task has failed and, based on that, publish a message to the SNS topic. Parsing the log file is not complicated; we just need to check for the "Marking task as FAILED" line, which tells us whether any task in that log file failed.
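
In the task log, that marker looks roughly like this (the dag_id, task_id, and dates below are illustrative):

[2019-05-01 10:20:31,123] {models.py:1788} INFO - Marking task as FAILED. dag_id=my_dag, task_id=my_task, execution_date=2019-05-01T10:00:00, start_date=2019-05-01T10:20:00, end_date=2019-05-01T10:20:31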

Make sure the S3 bucket where you are storing the Airflow job logs is sending put events to the AWS Lambda function.
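
One way to wire that up is sketched below with boto3; the bucket name, Lambda ARN, and key prefix are placeholders, and the Lambda also needs a resource-based policy that allows S3 to invoke it. Note that this call replaces any existing notification configuration on the bucket.

import boto3

s3 = boto3.client('s3')

# invoke the log-parsing Lambda whenever a new object lands under the remote log prefix;
# the bucket name, Lambda ARN, and prefix are placeholders
s3.put_bucket_notification_configuration(
    Bucket='<your bucket name>',
    NotificationConfiguration={
        'LambdaFunctionConfigurations': [{
            'LambdaFunctionArn': 'arn:aws:lambda:us-east-1:123456789012:function:airflow-log-parser',
            'Events': ['s3:ObjectCreated:Put'],
            'Filter': {'Key': {'FilterRules': [{'Name': 'prefix', 'Value': 'airflow-logs/'}]}}
        }]
    }
)

With the trigger in place, the Lambda function itself is below.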

import json
import boto3
import re
import os 
from urllib.parse import unquote_plus

# Publish a simple message to the specified SNS topic
def send_sns_notification(message):
    topicArn = os.environ.get("TopicArn")
    sns = boto3.client('sns')
    response = sns.publish(
        TopicArn=topicArn,    
        Message=message, 
        MessageAttributes={
            'AWS.SNS.SMS.SenderID': {
              'DataType': 'String',
              'StringValue': 'Airflow'   
            },
            'AWS.SNS.SMS.SMSType': {
              'DataType': 'String',
              'StringValue': 'Transactional'   
            }
        }   
    )
    
    print(response)

def lambda_handler(event, context):
    s3 = boto3.client('s3')
    error_regex = re.compile(r'(\[\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}\]) ({.*\.py:\d+}) (INFO) - Marking task as FAILED.\s*dag_id=(.*), task_id=(.*), execution_date=(.*), start_date=(.*), end_date=(.*)')
    for record in event['Records']:
        bucket = record['s3']['bucket']['name']
        key = unquote_plus(record['s3']['object']['key'])
        filename="{}/{}".format(bucket, key)
        print("processing {} file".format(filename))
        # download the log file from S3 and scan it for the failure marker
        data = s3.get_object(Bucket=bucket, Key=key)
        logfile_content = data['Body'].read().decode('utf-8')
        errors = error_regex.findall(logfile_content)
        if(len(errors) > 0):
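            # remote log keys are expected to look like airflow-logs/<dag_id>/<task_id>/<execution_date>/<try_number>.log,
            # so the DAG and task names sit four and three path segments from the end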
            dag_name = key.split("/")[-4]
            task_name = key.split("/")[-3]
            message="job {} with task {} failed".format(dag_name, task_name)
            print(message)
            send_sns_notification(message)
            print("notification sent")
        else:
            print("file {} does not have any error".format(filename))

Computing Total Storage Size of a Folder in Azure Data Lake Storage

A few days back we needed to calculate how much data we had ingested into our data lake by each project, and that's when I realized there is no direct way to get the size of a directory in Azure Data Lake Storage. Storage Explorer lets you get statistics for a folder, which include its size; however, imagine doing that for 100 folders. And so, I decided to write a script.

The following PowerShell script will give you the size of every folder under the given path.

$path="/infomart"
$account="azueus2dev"
# list the immediate child folders of the given path
$ChildPaths=(Get-AzureRmDataLakeStoreChildItem -Account $account -Path $path).Name
foreach($ChildPath in $ChildPaths){
	# recursively sum the size (in bytes) of everything under this child folder
	$length=(Get-AzureRmDataLakeStoreChildItemSummary -Account $account -Path "$path/$ChildPath" -Concurrency 128).Length
	# append "folder, size-in-bytes" to a text file named after the parent path
	"$path/$ChildPath, $length" | Out-File -FilePath "$($path.TrimStart('/')).txt" -Append
}

You will need to install the AzureRM.DataLakeStore module to run the above script.

Install-Module -Name AzureRM.DataLakeStore
