
Airflow SMS Notification for Failed Jobs

Airflow only provides email notification for job failure. However, this is not enough for production jobs, as not everyone has access to email on their phone. At least, that's true for our team. So I had to figure out a way to send an SMS notification on job failure so that swift action can be taken.

Airflow SMS Notification

Send Logs to AWS S3 Bucket

Modify the following settings in the airflow.cfg file to send job logs to an S3 bucket.
In my case, the EC2 instance has access to the S3 bucket through an AWS role, so I don't need to provide a connection ID. In your case, you might need to create a connection ID and provide it here (see the sketch after the settings below).

remote_log_conn_id =
remote_base_log_folder = s3://<your bucket name>/airflow-logs/
encrypt_s3_logs = False
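
If you do need a connection ID, one way to create it is through Airflow's Connection model, as in the sketch below (the conn_id name and the credential placeholders are mine; you would then set remote_log_conn_id to that name). You can also create the connection in the Airflow UI under Admin > Connections.

from airflow import settings
from airflow.models import Connection

# Sketch: create an S3 connection for remote logging
# (conn_id is a placeholder; point remote_log_conn_id at it)
conn = Connection(
    conn_id='my_s3_logs',
    conn_type='s3',
    extra='{"aws_access_key_id": "...", "aws_secret_access_key": "..."}',
)
session = settings.Session()
session.add(conn)
session.commit()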

Create an AWS Lambda to Parse Logs

The AWS Lambda function will parse the Airflow job logs to check whether any task has failed and, if so, publish a message to an SNS topic. Parsing the log file is not complicated; we just need to check for the line Airflow writes when it marks a task as failed.
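For reference, that failure line looks roughly like this (a representative example; the exact timestamp, file name, and IDs will vary by run and Airflow version):

[2019-08-01 12:00:05,123] {taskinstance.py:1088} INFO - Marking task as FAILED. dag_id=my_dag, task_id=my_task, execution_date=2019-08-01T11:00:00+00:00, start_date=2019-08-01T12:00:00+00:00, end_date=2019-08-01T12:00:05+00:00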

Make sure the S3 bucket where you are storing the Airflow job logs is configured to send PUT events to the AWS Lambda function.
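Here is a minimal sketch of wiring that up with boto3, assuming placeholder names for the bucket and the Lambda function ARN; S3 also needs permission to invoke the function:

import boto3

BUCKET = 'your-bucket-name'  # placeholder
FUNCTION_ARN = 'arn:aws:lambda:us-east-1:123456789012:function:airflow-log-parser'  # placeholder

lambda_client = boto3.client('lambda')
s3 = boto3.client('s3')

# Allow the S3 bucket to invoke the Lambda function
lambda_client.add_permission(
    FunctionName=FUNCTION_ARN,
    StatementId='s3-invoke-airflow-log-parser',
    Action='lambda:InvokeFunction',
    Principal='s3.amazonaws.com',
    SourceArn='arn:aws:s3:::{}'.format(BUCKET),
)

# Send object-created (PUT) events under the log prefix to the Lambda
s3.put_bucket_notification_configuration(
    Bucket=BUCKET,
    NotificationConfiguration={
        'LambdaFunctionConfigurations': [{
            'LambdaFunctionArn': FUNCTION_ARN,
            'Events': ['s3:ObjectCreated:Put'],
            'Filter': {'Key': {'FilterRules': [
                {'Name': 'prefix', 'Value': 'airflow-logs/'}
            ]}}
        }]
    },
)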

import boto3
import re
import os
from urllib.parse import unquote_plus

# Publish a simple message to the specified SNS topic
def send_sns_notification(message):
    topic_arn = os.environ.get("TopicArn")
    sns = boto3.client('sns')
    response = sns.publish(
        TopicArn=topic_arn,
        Message=message,
        MessageAttributes={
            # Sender ID shown on the recipient's phone, where supported
            'AWS.SNS.SMS.SenderID': {
                'DataType': 'String',
                'StringValue': 'Airflow'
            },
            # Transactional messages are prioritized for delivery
            'AWS.SNS.SMS.SMSType': {
                'DataType': 'String',
                'StringValue': 'Transactional'
            }
        }
    )
    print(response)

def lambda_handler(event, context):
    s3 = boto3.client('s3')
    # Matches the line Airflow writes when it marks a task as FAILED
    error_regex = re.compile(
        r'(\[\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}\]) ({.*\.py:\d+}) (INFO) - '
        r'Marking task as FAILED.\s*dag_id=(.*), task_id=(.*), '
        r'execution_date=(.*), start_date=(.*), end_date=(.*)'
    )
    for record in event['Records']:
        bucket = record['s3']['bucket']['name']
        key = unquote_plus(record['s3']['object']['key'])
        filename = "{}/{}".format(bucket, key)
        print("processing {} file".format(filename))
        data = s3.get_object(Bucket=bucket, Key=key)
        logfile_content = data['Body'].read().decode('utf-8')
        errors = error_regex.findall(logfile_content)
        if errors:
            # Log keys look like .../<dag_id>/<task_id>/<execution_date>/<try>.log
            dag_name = key.split("/")[-4]
            task_name = key.split("/")[-3]
            message = "job {} with task {} failed".format(dag_name, task_name)
            print(message)
            send_sns_notification(message)
            print("notification sent")
        else:
            print("file {} does not have any error".format(filename))