Airflow only provides email notifications for job failures. However, that is not enough for production jobs, because not everyone has access to email on their phone. At least, that's true of our team. So I had to figure out a way to send SMS notifications for job failures so that swift action can be taken.
Send Logs to AWS S3 Bucket
Modify the following settings in the airflow.cfg file to send job logs to an S3 bucket.
In my case, the EC2 instance has access to the S3 bucket through an AWS role, so I don't need to provide a connection ID. In your case, you might need to create a connection and provide its ID.
    remote_log_conn_id =
    remote_base_log_folder = s3://<your bucket name>/airflow-logs/
    encrypt_s3_logs = False
Create AWS Lambda to Parse Logs
The AWS Lambda function parses each Airflow job log to check whether a task has failed and, if so, sends a message to the Airflow SNS topic. Parsing the log file is not complicated: we just need to look for the "Marking task as FAILED" line, which indicates that a task in that log file failed.
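To make the check concrete, here is a minimal sketch of matching the failure line with a regular expression modelled on the one used in the Lambda function. The sample log text, the DAG and task names, and the helper name `find_failed_tasks` are made up for illustration; the exact line format may differ between Airflow versions, so treat the pattern as an assumption to verify against your own logs.

```python
import re

# Pattern modelled on the failure line the Lambda function looks for.
# Groups 4 and 5 capture the dag_id and task_id.
FAILED_TASK_PATTERN = re.compile(
    r"(\[\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}\]) (\{.*\.py:\d+\}) (INFO) - "
    r"Marking task as FAILED\.\s*dag_id=(.*), task_id=(.*), "
    r"execution_date=(.*), start_date=(.*), end_date=(.*)"
)

# Hypothetical log excerpt, written by hand to match the pattern above.
SAMPLE_LOG = (
    "[2020-03-01 10:15:00,001] {taskinstance.py:900} INFO - Executing task\n"
    "[2020-03-01 10:15:30,123] {taskinstance.py:1088} INFO - Marking task as FAILED."
    "dag_id=example_dag, task_id=load_data, execution_date=20200301T000000, "
    "start_date=20200301T101500, end_date=20200301T101530\n"
)


def find_failed_tasks(log_text):
    """Return a (dag_id, task_id) pair for every failure line in log_text."""
    return [(m.group(4), m.group(5)) for m in FAILED_TASK_PATTERN.finditer(log_text)]


if __name__ == "__main__":
    print(find_failed_tasks(SAMPLE_LOG))
```

Running the pattern locally against a real log file from your bucket is a quick way to confirm it matches before deploying the function.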
Make sure the S3 bucket where the Airflow job logs are stored sends put events to the AWS Lambda function.
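The event wiring can be done in the console, but as a rough sketch it can also be set up with boto3's `put_bucket_notification_configuration`. The helper names, the `airflow-logs/` prefix filter, and the example ARN below are assumptions for illustration, not part of the original setup. Note that this call replaces any notification configuration already on the bucket, and the Lambda function must separately grant S3 permission to invoke it.

```python
def build_notification_config(lambda_arn, prefix="airflow-logs/"):
    """Build an S3 notification configuration that forwards put events to a Lambda function."""
    return {
        "LambdaFunctionConfigurations": [
            {
                "LambdaFunctionArn": lambda_arn,
                # Put events, as described above; "s3:ObjectCreated:*" is a broader option.
                "Events": ["s3:ObjectCreated:Put"],
                # Only notify for objects under the Airflow log prefix (assumed layout).
                "Filter": {"Key": {"FilterRules": [{"Name": "prefix", "Value": prefix}]}},
            }
        ]
    }


def enable_log_notifications(bucket, lambda_arn, prefix="airflow-logs/"):
    """Apply the configuration to the bucket (overwrites any existing notification setup)."""
    import boto3  # imported here so the builder above can be used without AWS access

    s3 = boto3.client("s3")
    s3.put_bucket_notification_configuration(
        Bucket=bucket,
        NotificationConfiguration=build_notification_config(lambda_arn, prefix),
    )


if __name__ == "__main__":
    # Hypothetical ARN, shown only to illustrate the resulting structure.
    print(build_notification_config("arn:aws:lambda:us-east-1:123456789012:function:example"))
```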
    import os
    import re
    from urllib.parse import unquote_plus

    import boto3


    def send_sns_notification(message):
        """Publish a simple message to the SNS topic named in the TopicArn environment variable."""
        topic_arn = os.environ.get("TopicArn")
        sns = boto3.client('sns')
        response = sns.publish(
            TopicArn=topic_arn,
            Message=message,
            MessageAttributes={
                'AWS.SNS.SMS.SenderID': {
                    'DataType': 'String',
                    'StringValue': 'Airflow'
                },
                'AWS.SNS.SMS.SMSType': {
                    'DataType': 'String',
                    'StringValue': 'Transactional'
                }
            }
        )
        print(response)


    def lambda_handler(event, context):
        s3 = boto3.client('s3')
        # Raw string so the regex escapes are not treated as Python string escapes.
        error_regex = re.compile(r'(\[\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}\]) ({.*\.py:\d+}) (INFO) - Marking task as FAILED.\s*dag_id=(.*), task_id=(.*), execution_date=(.*), start_date=(.*), end_date=(.*)')
        for record in event['Records']:
            bucket = record['s3']['bucket']['name']
            # Object keys arrive URL-encoded in S3 event notifications.
            key = unquote_plus(record['s3']['object']['key'])
            filename = "{}/{}".format(bucket, key)
            print("processing {} file".format(filename))
            # Download the log file and search it for the failure line.
            data = s3.get_object(Bucket=bucket, Key=key)
            logfile_content = data['Body'].read().decode('utf-8')
            errors = error_regex.findall(logfile_content)
            if errors:
                # The DAG and task names are taken from the log file's path.
                dag_name = key.split("/")[-4]
                task_name = key.split("/")[-3]
                message = "job {} with task {} failed".format(dag_name, task_name)
                print(message)
                # Publish a simple message to the specified SNS topic.
                send_sns_notification(message)
                print("notification sent")
            else:
                print("file {} does not have any error".format(filename))
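The handler derives the DAG and task names from the object key rather than from the log line. Here is a small sketch of that step in isolation, assuming logs are laid out as `<prefix>/<dag_id>/<task_id>/<execution_date>/<try_number>.log`; that layout, the sample key, and the helper name `dag_and_task_from_key` are assumptions for illustration, so check them against the keys in your own bucket.

```python
from urllib.parse import unquote_plus


def dag_and_task_from_key(raw_key):
    """Decode an S3 event object key and return (dag_name, task_name) from its path."""
    # S3 event notifications URL-encode the key, so decode it first.
    key = unquote_plus(raw_key)
    parts = key.split("/")
    if len(parts) < 4:
        raise ValueError("unexpected log key layout: {}".format(key))
    # Same indexing as the handler: fourth and third components from the end.
    return parts[-4], parts[-3]


if __name__ == "__main__":
    # Hypothetical key as it might appear, URL-encoded, in an S3 event record.
    sample_key = "airflow-logs/example_dag/load_data/2020-03-01T00%3A00%3A00%2B00%3A00/1.log"
    dag_name, task_name = dag_and_task_from_key(sample_key)
    print("job {} with task {} failed".format(dag_name, task_name))
```

If your log folder sits deeper or shallower in the bucket, the negative indexes still work as long as the last four path components keep this order.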