
Big Data Processing Posts

Airflow Operational Dashboard Using BigQuery and Looker Studio

In our daily operations, we rely on Airflow (Cloud Composer) to run hundreds of DAGs. While we have integrated it with ServiceNow and SMTP for Airflow notifications, we found that these measures were insufficient in providing us with valuable insights. We needed a way to track the number of failed DAGs over a specific period, identify which DAGs were failing more frequently, and gain a comprehensive understanding of our workflow performance.

To address these challenges, we decided to create a Looker Studio dashboard by leveraging the power of BigQuery and redirecting Airflow (Cloud Composer) logs through a log sink. By storing our logs in BigQuery, we gained access to a wealth of data that allowed us to generate informative charts and visualizations. In this blog post, I will guide you through the step-by-step process of setting up this invaluable solution.

Create a Log Sink

To begin the process, navigate to the log router and create a sink using the following query. When configuring the sink, select the desired target as a BigQuery dataset where you want all the logs to be redirected. Additionally, it is recommended to choose a partitioned table, as this will optimize query performance, facilitate the cleanup of older logs, and reduce costs. By partitioning the table, BigQuery will scan less data when querying specific time ranges, resulting in faster results and more efficient resource usage.

"Marking task as"
resource.type="cloud_composer_environment"
log_name: "airflow-worker"
labels.workflow!="airflow_monitoring"
Screenshot: log sink configuration redirecting Airflow logs to BigQuery for the Looker Studio dashboard.
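
If you prefer the command line over the console, the same sink can be created with gcloud. This is a minimal sketch: the sink name is a placeholder, and the project and dataset match the ones used in the query later in this post.

# Destination dataset for the sink (project:dataset).
bq mk --dataset ss-org-logging-project:airflow_dags_status

# Create the sink; --use-partitioned-tables writes everything into a single
# partitioned table instead of one table per day.
gcloud logging sinks create airflow-worker-sink \
  bigquery.googleapis.com/projects/ss-org-logging-project/datasets/airflow_dags_status \
  --use-partitioned-tables \
  --log-filter='"Marking task as"
resource.type="cloud_composer_environment"
log_name:"airflow-worker"
labels.workflow!="airflow_monitoring"'

gcloud prints the sink's writer service account; remember to grant it write access (for example, BigQuery Data Editor) on the dataset.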

Upon successful creation of the log sink, you will be able to see an airflow_worker table created in the dataset you specified during the log sink configuration.

Write a query to get insight

The following query retrieves the data from the airflow_worker table and does the following:

  • Adjusting timestamps to “America/Toronto” timezone: Airflow (Cloud Composer) logs are stored in UTC timestamps by default. However, our scheduler is set to the “America/Toronto” timezone. To ensure consistency, I’m converting the timestamps to the “America/Toronto” timezone. Keep in mind that you may need to modify this part based on your own timezone settings.
  • Retrieving status information: The status of each Airflow DAG is captured in the textPayload column. I’m using regular expressions to extract one of three possible statuses: “Success,” “Fail,” or “Skip.” This allows us to easily identify the execution outcome of each DAG run.
    Since status information is only available at the task level, I consider a DAG failed if any task within it has failed. If you choose to show information at the task level, you might need to modify this query.
SELECT
  *
FROM (
  SELECT
    DATE(timestamp, "America/Toronto") AS execution_date, -- log timestamp as an America/Toronto date
    DATETIME(timestamp, "America/Toronto") AS execution_timestamp,
    labels.workflow,
    DATE(CAST(labels.execution_date AS TIMESTAMP), "America/Toronto") AS schedule_date, -- scheduled date in America/Toronto
    DATETIME(CAST(labels.execution_date AS TIMESTAMP), "America/Toronto") AS schedule_timestamp,
    labels.try_number,
    CASE
      WHEN CONTAINS_SUBSTR(textPayload, "Success") THEN "success"
      WHEN CONTAINS_SUBSTR(textPayload, "SKIPPED") THEN "skip"
    ELSE
    "fail"
  END
    AS status,
    ROW_NUMBER() OVER(PARTITION BY labels.workflow, CAST(labels.execution_date AS timestamp)
    ORDER BY
      CASE
        WHEN CONTAINS_SUBSTR(textPayload, "Success") THEN 1
        WHEN CONTAINS_SUBSTR(textPayload, "SKIPPED") THEN 2
      ELSE
      0
    END
      ) AS rnk
  FROM
    `ss-org-logging-project.airflow_dags_status.airflow_worker` )
WHERE
  rnk = 1  
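
Looker Studio can run a custom query directly, but one option is to save the query above as a view and point Looker Studio at the view instead. A minimal sketch, assuming the query is saved locally as airflow_status_query.sql and using a placeholder view name:

# Create a view over the log table; Looker Studio then connects to this view.
bq mk \
  --use_legacy_sql=false \
  --view="$(cat airflow_status_query.sql)" \
  ss-org-logging-project:airflow_dags_status.airflow_dag_status_v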

Dashboard

Unfortunately, due to my organization's policy, I can't share my Looker Studio dashboard outside the organization, so you will have to create your own dashboard. I am including screenshots of my dashboard for your reference.

Screenshots for reference: Airflow daily status, Airflow prod weekly status, Airflow prod weekly failed DAG list, and Airflow prod DAG failure list.


Send Resource Changes Notification in GCP to Microsoft Teams

Currently I am helping an organization adopt Google Cloud Platform. Although we have decided to use Terraform for infrastructure as code, some people are creating or modifying resources directly through the console. So we wanted to add some auditability and notification around resource creation and modification.

With GCP, all the audit logs are available in Cloud Logging. However, Cloud Logging is costly, and it's not a good idea to store logs there for a long period of time. The other reason is that the query format for Logging is a bit different and not everyone is familiar with it.

Audit

For audit purposes, I am configuring the GCP Log Router to redirect the logs to BigQuery.

Query

The query only looks for modifications made by a user rather than a service account.

protoPayload.@type="type.googleapis.com/google.cloud.audit.AuditLog"
protoPayload.authenticationInfo.principalSubject:"user:"
protoPayload.methodName: ("delete" OR "create" OR "update")

Make sure to choose Use partitioned tables, otherwise it will create a table per day.

Once you configure the sink, you will be able to see all the logs for any resource modification.

Notification

GCP does not have any direct integration with Microsoft Teams (maybe because it's a Microsoft product :)). So I needed to write my own Cloud Function that can send messages to a Teams channel.

For this as well we need to create a sink with a similar query. However, rather than sending the logs to BigQuery, we send them to a Pub/Sub topic.

Once the sink is created, we need to create a Cloud Function that is triggered when an event appears on this topic.
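
For reference, this is roughly what the topic and the Pub/Sub sink look like with gcloud; the topic, sink, and project names below are placeholders.

# Placeholder topic, sink, and project names.
gcloud pubsub topics create resource-change-events

gcloud logging sinks create resource-change-sink \
  pubsub.googleapis.com/projects/my-audit-project/topics/resource-change-events \
  --log-filter='protoPayload.@type="type.googleapis.com/google.cloud.audit.AuditLog"
protoPayload.authenticationInfo.principalSubject:"user:"
protoPayload.methodName: ("delete" OR "create" OR "update")'

As with the BigQuery sink, grant the sink's writer identity the Pub/Sub Publisher role on the topic.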

Code

import base64
import json
import os
import requests
from pprint import pprint


def func_handler(event, context):
    """Triggered from a message on a Cloud Pub/Sub topic.
    Args:
         event (dict): Event payload.
         context (google.cloud.functions.Context): Metadata for the event.
    """
    message = json.loads(base64.b64decode(event['data']).decode('utf-8'))
    pprint(message)
    print(type(message))
    resource_type = message.get("resource").get("type")
    project_id = message.get("resource").get("labels").get("project_id")
    region = message.get("resource").get("labels").get("region")
    method = message.get("protoPayload").get("methodName").lower()
    user = message.get("protoPayload").get("authenticationInfo").get("principalEmail")
    resource_name = message.get("protoPayload").get("resourceName").split("/")[-1]

    if "create" in method:
        action = "created"
    elif "update" in method:
        action = "updated"
    elif "delete" in method:
        action = "deleted"
    else:
        action = method

    title = "{} got {}".format(resource_type, action)
    content="""
    Resource Name: {} 
    Project ID: {} 
    Region: {} 
    Action: {}
    User: {}
    """.format(resource_name, project_id, region, action, user)
    webhook = os.environ.get("webhook")
    send_teams(webhook, content, title)


def send_teams(webhook_url:str, content:str, title:str, color:str="FF0000") -> int:
    """
      - Send a teams notification to the desired webhook_url
      - Returns the status code of the HTTP request
        - webhook_url : the url you got from the teams webhook configuration
        - content : your formatted notification content
        - title : the message that'll be displayed as title, and on phone notifications
        - color (optional) : hexadecimal code of the notification's top line color; the default FF0000 corresponds to red
    """
    response = requests.post(
        url=webhook_url,
        headers={"Content-Type": "application/json"},
        json={
            "themeColor": color,
            "summary": title,
            "sections": [{
                "activityTitle": title,
                "activitySubtitle": content
            }],
        },
    )
    print(response.status_code)  # should be 200
    return response.status_code
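
To deploy the function with a Pub/Sub trigger, something like the following should work; the function name, runtime, and region are placeholders, the topic matches the placeholder used for the sink above, and the Teams webhook URL is passed through the webhook environment variable the code reads.

# Deploy the Cloud Function; func_handler is the entry point defined in the code above.
gcloud functions deploy notify-teams \
  --runtime=python39 \
  --region=us-east1 \
  --entry-point=func_handler \
  --trigger-topic=resource-change-events \
  --set-env-vars=webhook=<your-teams-webhook-url>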

GCP – Bring CDC data using Debezium

A few months back our organization decided to go with GCP for our data platform, and so we started evaluating multiple tools to bring data from different RDBMS sources. Our goal was to find a tool which helps us identify CDC from the multiple sources we have (MySQL, Oracle, SQL Server, DB2 on mainframe) and bring it to either Cloud Storage or BigQuery.

As of the time I am writing this blog, GCP doesn't have any tool which satisfies our requirement. It has Datastream, but it only supports Oracle and MySQL. While searching outside GCP I stumbled upon Debezium. Debezium is an open source tool which helps identify CDC from multiple RDBMS sources and puts the data on Kafka or Pub/Sub topics in real time. Even better, we were looking for a batch solution and we found streaming.

In this blog post, I will explain in detail how to deploy Debezium on a GCP Kubernetes cluster and connect it to GCP Cloud SQL and GCP Pub/Sub topics.

Deploying Debezium on GCP Kubernetes

Create Service Accounts

To deploy Debezium on Kubernetes, we first need to create an IAM service account. Create a service account with the following roles (a gcloud sketch follows the list).

  • Cloud SQL Client
  • Pub/Sub Publisher
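
A minimal gcloud sketch for this, using the dataframework project and the service account name that appears in the cluster-creation command later in this post:

# Create the IAM service account used by the Debezium cluster.
gcloud iam service-accounts create sa-debzium-k8 \
  --project=dataframework \
  --display-name="Debezium service account"

# Grant the two roles listed above.
gcloud projects add-iam-policy-binding dataframework \
  --member="serviceAccount:sa-debzium-k8@dataframework.iam.gserviceaccount.com" \
  --role="roles/cloudsql.client"

gcloud projects add-iam-policy-binding dataframework \
  --member="serviceAccount:sa-debzium-k8@dataframework.iam.gserviceaccount.com" \
  --role="roles/pubsub.publisher"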

You will also need to create a Kubernetes service account for the pods. Use the following command to create it.

kubectl apply -f - <<EOF
apiVersion: v1
kind: ServiceAccount
metadata:
  name: debezium-sa
EOF

Create MySQL User

Considering you already have a Cloud SQL (MySQL) instance running, let's create a user with the proper access to read the transaction logs.

CREATE USER 'replication_user'@'%' IDENTIFIED BY 'secret';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'replication_user';

Create GCP Kubernetes cluster for Debezium

gcloud beta container clusters create "debezium-poc" --scopes=sql-admin,pubsub --region "us-east1" --service-account=sa-debzium-k8@dataframework.iam.gserviceaccount.com

Now, once the cluster is created, we need to deploy the pods and the configuration for the pods.

Deploying config-map

mysql_config_map.yaml

apiVersion: v1
kind: ConfigMap
metadata:
  name: debezium-mysql
  labels:
    app: debezium-mysql
data:
  application.properties: |-
      debezium.sink.type=pubsub
      debezium.source.connector.class=io.debezium.connector.mysql.MySqlConnector
      debezium.source.offset.storage.file.filename=data/offsets.dat
      debezium.source.offset.flush.interval.ms=0
      debezium.source.database.hostname=localhost
      debezium.source.database.port=3306
      debezium.source.database.user=replication_user
      debezium.source.database.password=secret
      debezium.source.database.server.id=184054
      debezium.source.database.server.name=dpmysql
      debezium.source.database.history = io.debezium.relational.history.FileDatabaseHistory
      debezium.source.database.history.file.filename = history_file.txt

In the above configuration, please change the user and password to match the user you created. server.name can be anything that makes sense for you for the source. server.id needs to be a unique number, so you can provide any random number.

To deploy the config map run the following command.

kubectl apply -f mysql_config_map.yaml

Deploying StatefulSet

The StatefulSet consists of two containers:

  • debezium server – While writing this blog, 1.7.0.Final is the latest version available, so I am using it. However, you can use whatever version is the latest.
  • cloud-sql-proxy – This is required to connect to the Cloud SQL instance from Kubernetes.

mysql_statefulset.yaml

apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: debezium-mysql
  labels:
    app: debezium-mysql
spec:
  replicas: 1
  serviceName: debezium-mysql
  selector:
    matchLabels:
      app: debezium-mysql
  template:
    metadata:
      labels:
        app: debezium-mysql
        version: v1
    spec:
      serviceAccountName: debezium-sa
      securityContext:
        fsGroup: 185 # the Debezium container runs as the jboss user, whose UID is 185
      containers:
        - name: debezium-server
          image: debezium/server:1.7.0.Final
          volumeMounts:
            - name: debezium-config-volume
              mountPath: /debezium/conf
            - name: debezium-data-volume
              mountPath: /debezium/data
        - name: cloud-sql-proxy
          image: gcr.io/cloudsql-docker/gce-proxy:1.27.1
          command: 
            - /cloud_sql_proxy
            - -instances=dataframework:us-east1:dpmysql-public=tcp:3306
          securityContext:
            runAsNonRoot: true
      volumes:
        - name: debezium-config-volume
          configMap:
            name: debezium-mysql
  volumeClaimTemplates:
    - metadata:
        name: debezium-data-volume
      spec:
        accessModes: [ "ReadWriteOnce" ]
        resources:
          requests:
            storage: 10Mi

To deploy the StatefulSet, run the following command.

 kubectl apply -f mysql_statefulset.yaml

Deploying Service

The last thing we need to deploy is a service for our pods.

mysql_cdc_service.yaml

apiVersion: v1
kind: Service
metadata:
  name: debezium-mysql
  labels:
    app: debezium-mysql
spec:
  type: ClusterIP
  ports:
    - port: 8080
      targetPort: 8080
      protocol: TCP
      name: http
  clusterIP: None
  selector:
    app: debezium-mysql

To deploy the service, run the following command.

 kubectl apply -f mysql_cdc_service.yaml
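
At this point it is worth checking that the pod is up and that Debezium connected to MySQL. The StatefulSet has one replica, so the pod name follows the usual <statefulset-name>-0 convention.

# List the Debezium pod and tail the server logs to confirm the MySQL connector started.
kubectl get pods -l app=debezium-mysql
kubectl logs debezium-mysql-0 -c debezium-server --follow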

Create PubSub Topic

So now we have deployed Debezium on Kubernetes; all we need is a Pub/Sub topic to capture all the changes.

The topic name should be in the format <server_name>.<database_name>.<table_name> (see the gcloud example after the list):

  • server_name – this should be from your config map debezium.source.database.server.name property
  • database_name – mysql database name
  • table_name – mysql table name.
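
For example, with the server name from the ConfigMap above (dpmysql) and a hypothetical inventory database with a customers table, the topic would be created like this:

# Topic name = <server_name>.<database_name>.<table_name>
gcloud pubsub topics create dpmysql.inventory.customers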


GCP – Execute Jar on Databricks from Airflow

We have a framework written using the Spark Scala API for file ingestion. We use Cloud Composer, also known as Airflow, for our job orchestration, and we wanted to perform the following tasks with Airflow (Composer):

  1. First, it creates a cluster with the provided configuration
  2. It installs the jar while creating the cluster
  3. It creates a job and executes it with the given parameters

The good thing is that Airflow has an operator to execute a jar file. However, the example available on the Airflow website is very specific to the AWS environment, so it took me some time to figure out how to create a DAG for Databricks on GCP.

Let’s understand how to do this.

Setup Databricks Connection

To set up the connection you need two things: a Databricks API token and the Databricks workspace URL.

Generate API Token

To generate a Databricks API token, log in to your workspace and then go to Settings –> User Settings. Then click on Generate New Token. Save this token, as you won't be able to retrieve it again.

The following tasks need to be executed on Airflow (Cloud Composer). You will need admin access for this.

Install Databricks Library

  • From your Google Cloud console, navigate to your Cloud Composer instance and click on it.
  • Click on PYPI PACKAGES and then click on EDIT.
  • Add the apache-airflow-providers-databricks package.
  • It will take some time to apply this change, so wait a while and then visit the page again to check whether the package installed properly. (Internet connectivity from the cloud could create an issue for this installation.)

Create Connection

Log in to Airflow and navigate to Admin –> Connections. We are just modifying the default connection, so if you installed the Databricks package successfully, you should see the databricks_default connection. Click on edit. You just need to fill in the following fields (a CLI alternative is sketched after the list).

  • Host: your host should be the Databricks workspace URL; it should look something like this.
    https://xxx.gcp.databricks.com/?o=xxx
  • Password: In the password field you need to paste the API token we created in the first step.
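
If you prefer to script this instead of using the UI, the same connection can be set up with the Airflow CLI. This is a sketch for a plain Airflow 2 installation (on Cloud Composer you would run it through gcloud composer environments run); the host and token are placeholders, and if databricks_default already exists you may need to delete it first or use a different connection id.

# Create the Databricks connection from the command line (placeholder host and token).
airflow connections add databricks_default \
  --conn-type databricks \
  --conn-host 'https://xxx.gcp.databricks.com/?o=xxx' \
  --conn-password '<your-databricks-api-token>'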

Now let’s write DAG to create cluster and execute the jar file.

DAG to Execute JAR on Databricks

import os
from datetime import datetime

from airflow import DAG
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator
from airflow.models import Variable

with DAG(
    dag_id='ingest_csv_file_to_bigqury',
    schedule_interval='@daily',
    start_date=datetime(2021, 1, 1),
    tags=['Gaurang'],
    catchup=False,
) as dag:
    new_cluster = {
        "spark_version": "7.3.x-scala2.12",
        "node_type_id": "n1-standard-4",
        "autoscale": {
            "min_workers": 2,
            "max_workers": 8
        },
        "gcp_attributes": {
            "use_preemptible_executors": False,
            "google_service_account": "sa-databricks@testproject.iam.gserviceaccount.com"

        },
    }

    spark_jar_task = DatabricksSubmitRunOperator(
        task_id='run_ingestion_jar',
        new_cluster=new_cluster,
        spark_jar_task={'main_class_name': 'com.test.Main', 'parameters': ['xxx', 'yyy']},
        libraries=[{'jar': 'gs://deploymet/test-1.0.jar'}],
    )

    spark_jar_task

DatabricksSubmitRunOperator takes three parameters:

  • new_cluster: You need to provide a JSON for creating new cluster.
  • spark_jar_task: you need to provide your main class and the parameters your jar is expecting
  • libraries: location of your jar file.

GCP Cloud Functions Logging With Python

Cloud Functions does a good job of redirecting all your print statements to logging. This is really good for basic work. However, for production-ready code we need proper logging levels in our application logs. There are two ways to achieve this: one is to use the Cloud Logging module provided by Google, and the other is to write a custom formatter for Python logging. I am going to share both ways here.

Using Cloud Logging Module

For this you need to have the google-cloud-logging Python library in your requirements.txt.

import logging

from google.cloud import logging as cloudlogging

def hello_gcs(event, context):
    logger = get_logger(__name__)
    logger.info("info")
    logger.warn("warn")
    logger.error("error")
    logger.debug("debug")
	
	
def get_logger(name):
  lg_client = cloudlogging.Client()
  
  lg_handler = lg_client.get_default_handler()
  cloud_logger = logging.getLogger("cloudLogger")
  cloud_logger.setLevel(logging.DEBUG)
  cloud_logger.addHandler(lg_handler)
  return cloud_logger

This is how it would look if you use the above code for Cloud Logging.

Using Custom Formatter

import json
import logging
import sys


def hello_gcs(event, context):
    logger = get_custom_logger(__name__)
    logger.debug("debug")
    logger.info("info")
    logger.warn("warn")
    logger.error("error")
    

class CloudLoggingFormatter(logging.Formatter):

    """Produces messages compatible with google cloud logging"""
    def format(self, record: logging.LogRecord) -> str:
        s = super().format(record)
        return json.dumps(
            {
                "message": s,
                "severity": record.levelname
            }
        )

def get_custom_logger(name):
  cloud_logger = logging.getLogger(name)
  handler = logging.StreamHandler(sys.stdout)
  formatter = CloudLoggingFormatter(fmt="[%(asctime)s] {%(filename)s:%(lineno)d} - %(message)s")
  handler.setFormatter(formatter)
  cloud_logger.addHandler(handler)
  cloud_logger.setLevel(logging.DEBUG)
  return cloud_logger

And this is how it looks with custom formatter.

And this is the comparison of both.


Google Cloud Scheduler to Auto Shutdown Cloud SQL

Recently I started working on GCP, and for one of my POCs I wanted Cloud SQL. As this was a POC, I did not want it running 24×7 and was trying to find a way to shut it down automatically at a particular time to save cost. I don't think there is any built-in option for that. However, GCP Cloud Scheduler has an option to call an HTTP request at a scheduled interval. This works well, as Cloud SQL allows some management operations to be done through REST calls.

New IAM Service Account

For Scheduler to successfully run the shutdown operation, it needs to be associated with a service account that has the proper roles. So let's create a new service account.

  • Navigate to IAM & Admin and then click on Service Accounts. Your screen might look like this.
  • Now let's put in a name and description. I normally put service at the start of the name, but it's not necessary.
  • After this, we need to assign the proper role to this account. For this we are going to choose Cloud SQL Admin.
  • For this purpose we don't need to grant any users access to this service account.

New Scheduler

  • Navigate to Cloud Scheduler and click on Create Job. Fill in the name, description, frequency, and timezone.
  • For this, choose Target Type as HTTP. Your request should be as mentioned below (a gcloud equivalent is sketched after the list).
    URL: https://sqladmin.googleapis.com/v1/projects/project-id/instances/instance-id
    Method: PATCH
    Auth Header: Add OAuth Token
    Service Account: service-mange-cloud-sql (the one we created in the step above; if your name is different, choose that)
    Body:
{
  "settings": {
    "activationPolicy": "NEVER"
  }
}
  • In the next step you can configure retries; I am keeping the defaults for simplicity.
  • Once successfully created, you should be able to see your job on the Scheduler home page. Here you can test your job as well.
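
The same job can also be created from the command line; a minimal sketch, where the schedule, time zone, project, instance, and service account email are placeholders to adjust (you may also need --location depending on your setup):

# PATCH the Cloud SQL instance every evening to set activationPolicy to NEVER.
gcloud scheduler jobs create http stop-cloud-sql \
  --schedule="0 19 * * *" \
  --time-zone="America/Toronto" \
  --uri="https://sqladmin.googleapis.com/v1/projects/project-id/instances/instance-id" \
  --http-method=PATCH \
  --oauth-service-account-email=service-mange-cloud-sql@project-id.iam.gserviceaccount.com \
  --headers=Content-Type=application/json \
  --message-body='{"settings": {"activationPolicy": "NEVER"}}'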

Monkey patching to decorate your library function

We are using Airflow as our scheduler/orchestrator for all AWS data-related jobs. We use SSHOperator to run our jobs. Our support team complained that sometimes a job fails while making the connection, and re-running it just works fine. This was creating a lot of noise, and as there is no retry mechanism in the operator, we can't configure it to auto-retry.

I realized that the SSHOperator class calls a function from the SSHHook class which actually makes the SSH connection with the given details.

Problem

There are lots of DAGs (classes) which refer to this function. How do we add a retry mechanism without modifying each and every class?

Solution

Monkey patching was the first thing that came to my mind. However, when I applied the monkey patch it created a recursion, as I had to call the function again for the retry. It took a while to fix this issue, but finally I was able to fix it by creating another reference to the original function on the same class I am patching.

plugins/hooks/ssh_hook_with_retry.py

# -*- coding: utf-8 -*-

# Author: Gaurang Shah
#
from airflow.contrib.hooks.ssh_hook import SSHHook
import time 

max_retry = 3
retry = 0


def get_conn_with_retry(self):
    try:
        # call the original, un-patched function
        return SSHHook.original_func(self)
    except Exception as e:
        global retry
        if retry < max_retry:
            retry += 1
            print("tried %s times, failed to connect, retrying again" % retry)
            time.sleep(60)
            # call the patched function, which in turn calls get_conn_with_retry again
            return self.get_conn()
        else:
            raise e

plugins/hooks/__init__.py

from airflow.contrib.hooks import ssh_hook

# add new reference to original function to avoid recursion 
ssh_hook.SSHHook.original_func = ssh_hook.SSHHook.get_conn
from hooks.ssh_hook_with_retry import get_conn_with_retry

# patch get_conn with our function
ssh_hook.SSHHook.get_conn = get_conn_with_retry

# when someone calls SSHHook.get_conn, ssh_hook_with_retry.get_conn_with_retry will be called
# and to call the original get_conn we have to call SSHHook.original_func

Airflow SMS Notification for Failed Jobs

Airflow only provides email notifications for job failures. However, this is not enough for production jobs, as not everyone has access to email on their phone. At least, that's true for our team. And so, I had to figure out a way to send SMS notifications for job failures so that swift action can be taken.

Airflow SMS Notification

Send Logs to AWS S3 Bucket

Modify the following settings in the airflow.cfg file to send job logs to an S3 bucket.
In my case, EC2 has access to the S3 bucket through an AWS role, so I don't need to provide a connection ID. In your case, you might need to create a connection ID and provide it here. Depending on your Airflow version, you may also need to set remote_logging = True under [core].

remote_log_conn_id =
remote_base_log_folder = s3://<your bucket name>/airflow-logs/
encrypt_s3_logs = False

Create AWS Lambda to Parse Logs.

The AWS Lambda will parse the Airflow job logs to check whether any task has failed and, based on that, send a message to an SNS topic. Parsing the log file is not complicated; we just need to check for the "Marking task as FAILED" line, which indicates that a task in that log file failed.

Make sure the S3 bucket where you are storing the Airflow job logs is sending put events to the AWS Lambda.

import json
import boto3
import re
import os 
from urllib.parse import unquote_plus

# Publish a simple message to the specified SNS topic
def send_sns_notification(message):
    topicArn = os.environ.get("TopicArn")
    sns = boto3.client('sns')
    response = sns.publish(
        TopicArn=topicArn,    
        Message=message, 
        MessageAttributes={
            'AWS.SNS.SMS.SenderID': {
              'DataType': 'String',
              'StringValue': 'Airflow'   
            },
            'AWS.SNS.SMS.SMSType': {
              'DataType': 'String',
              'StringValue': 'Transactional'   
            }
        }   
    )
    
    print(response)

def lambda_handler(event, context):
    s3 = boto3.client('s3')
    error_regex=re.compile('(\[\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}\]) ({.*\.py:\d+}) (INFO) - Marking task as FAILED.\s*dag_id=(.*), task_id=(.*), execution_date=(.*), start_date=(.*), end_date=(.*)')
    for record in event['Records']:
        bucket = record['s3']['bucket']['name']
        key = unquote_plus(record['s3']['object']['key'])
        filename="{}/{}".format(bucket, key)
        print("processing {} file".format(filename))
        # Publish a simple message to the specified SNS topic
        data = s3.get_object(Bucket=bucket, Key=key)
        logfile_content = data['Body'].read().decode('utf-8')
        errors = error_regex.findall(logfile_content)
        if(len(errors) > 0):
            dag_name = key.split("/")[-4]
            task_name = key.split("/")[-3]
            message="job {} with task {} failed".format(dag_name, task_name)
            print(message)
            send_sns_notification(message)
            print("notification sent")
        else:
            print("file {} does not have any error".format(filename))

Airflow: Custom Failure Email Notification Template

Airflow allows you to set a custom email notification template in case you think the default template is not enough. However, this is only for the failure notification and not for the retry notification (at least in version 1.10; things might change in version 2).

The template is divided into two parts, one for email subject and another for email body.

Default Failure Notification

Default Airflow Failure Notification Email

Custom Failure Email Notification

Custom Airflow Failure Notification Email

Airflow Failure Notification Subject Template

Airflow Job Failed: {{ ti.dag_id }}

Airflow Failure Notification Content Template

<!DOCTYPE html>
<html>
<body>
 <div style=" position: relative;  min-height: 100vh;">
   <div style="padding-bottom: 2.5rem;">
      <b>Task ID:</b> {{ ti.task_id }} <br>
      <b>Task execution date:</b> {{ ti.execution_date }} <br>
      <b>Log:</b> <a href="{{ti.log_url}}">Link</a><br>
      <b>Try:</b> {{try_number}} out of {{max_tries + 1}}<br>
      <b>Exception:</b><p style="background-color: #eee; border: 1px solid #999;display: block;padding: 20px;">{{exception_html}}</p>
      <b>Host:</b> {{ti.hostname}}<br>
      <b>Log file:</b> {{ti.log_filepath}}<br>
      <b>Mark success:</b> <a href="{{ti.mark_success_url}}">Link</a><br>
   </div>
  <br><br><br><br><br><br><br><br><br><br><br><br><br><br><br><br>
   <p style="background-color: #eee; border: 1px solid #999;display: block;padding: 15px;"><small>Powered by Airflow, created by gaurang</small></p>
 </div>
</body>
</html>

Changing Config

Finally, you need to change the airflow.cfg file to use the new templates we just created. Put the above two files in any location and update the email section of the config to add the html_content_template and subject_template properties.

[email]
email_backend = airflow.utils.email.send_email_smtp
html_content_template=/root/airflow/email/templates/content_template.j2
subject_template=/root/airflow/email/templates/subject_template.j2

AWS S3: How to recover deleted files?

A few days back, while fixing a production issue, my team deleted a big database. It had more than 100 tables and around 100 GB of data, so re-processing and loading the tables before business people queried them was a next-to-impossible task. The good thing is that we had versioning enabled on the bucket.

If you have versioning enabled on your S3 bucket, every time you make changes to a file it creates a new version, similar to Git. And if you delete a file, rather than deleting the file physically it just marks the file as deleted. So, if you want to recover a file, all you need to do is remove the delete marker from the file.

Now let’s see how to do it step-by-step.

Let's take the following bucket and prefix as an example.

[gshah@aws-dev restore]$ aws s3 ls s3://aws-dev01-sample-bucket/SIT/USER/gshah/                                                                              
2020-07-20 13:50:12          0 1.txt                                                                                                                                       
2020-07-20 13:50:18          0 2.txt                                                                                                                                       
2020-07-20 13:50:25          0 3.txt                                                                                                                                       
2020-07-20 13:42:28          0 abc.txt                                                                                                                                     
2020-07-20 13:43:01          0 xyz.txt

Now let's delete a couple of files.

[gshah@aws-dev restore]$ aws s3 rm s3://aws-dev01-sample-bucket/SIT/USER/gshah/abc.txt                                                                       
delete: s3://aws-dev01-sample-bucket/SIT/USER/gshah/abc.txt                                                                                                           
[gshah@aws-dev restore]$ aws s3 rm s3://aws-dev01-sample-bucket/SIT/USER/gshah/xyz.txt                                                                       
delete: s3://aws-dev01-sample-bucket/SIT/USER/gshah/xyz.txt                                                                                                           
[gshah@aws-dev restore]$ aws s3 ls s3://aws-dev01-sample-bucket/SIT/USER/gshah/                                                                              
2020-07-20 13:50:12          0 1.txt                                                                                                                                       
2020-07-20 13:50:18          0 2.txt                                                                                                                                       
2020-07-20 13:50:25          0 3.txt

Now let’s see those deleted files

[gshah@aws-dev restore]$ aws s3api list-object-versions \                                                                                                         
> --bucket aws-dev01-sample-bucket \                                                                                                                                    
> --prefix SIT/USER/gshah/  \                                                                                                                                            
> --output json \                                                                                                                                                          
> --query 'DeleteMarkers[?IsLatest==true]'                                                                                                                               
[                                                                                                                                                                          
    {                                                                                                                                                                      
        "Owner": {                                                                                                                                                         
            "ID": "514384a9e158b47a163ad2a0b3e7d767dfbc46b167f6899de54b0eb7d4413cd9"                                                                                       
        },                                                                                                                                                                 
        "IsLatest": true,                                                                                                                                                  
        "VersionId": "oGhCi9bGRS_xYNstvGEgjGv24Dv94VzW",                                                                                                                   
        "Key": "SIT/USER/gshah/4.txt",                                                                                                                                   
        "LastModified": "2020-07-20T13:51:33.000Z"                                                                                                                         
    },                                                                                                                                                                     
    {                                                                                                                                                                      
        "Owner": {                                                                                                                                                         
            "ID": "514384a9e158b47a163ad2a0b3e7d767dfbc46b167f6899de54b0eb7d4413cd9"                                                                                       
        },                                                                                                                                                                 
        "IsLatest": true,                                                                                                                                                  
        "VersionId": "_Air1UwjRGOqln65oSp5xiCO06ZPwocP",                                                                                                                   
        "Key": "SIT/USER/gshah/abc.txt",                                                                                                                                 
        "LastModified": "2020-07-20T13:59:52.000Z"                                                                                                                         
    },                                                                                                                                                                     
    {                                                                                                                                                                      
        "Owner": {                                                                                                                                                         
            "ID": "514384a9e158b47a163ad2a0b3e7d767dfbc46b167f6899de54b0eb7d4413cd9"                                                                                       
        },                                                                                                                                                                 
        "IsLatest": true,                                                                                                                                                  
        "VersionId": "e1LYLX92jBtLgbXp92.nZdf0yFKZ8m3I",                                                                                                                   
        "Key": "SIT/USER/gshah/xyz.txt",                                                                                                                                 
        "LastModified": "2020-07-20T13:59:58.000Z"                                                                                                                         
    }                                                                                                                                                                      
]

This will show all the deleted files, not just recently deleted ones. For recently deleted files, put a time condition inside the query argument, i.e. 'DeleteMarkers[?IsLatest==true && LastModified >= 2020-07-20T13:59:52.000Z]'. You can see that it shows 4.txt as well, which I deleted some time in the past.

Now let's recover those files.
For this I need to remove the delete marker for these files, which means calling delete-object with the version ID of each delete marker (shown in the list-object-versions output above).

[gshah@aws-dev restore]$ aws s3api delete-object --bucket aws-dev01-sample-bucket --key SIT/USER/gshah/xyz.txt --version-id e1LYLX92jBtLgbXp92.nZdf0yFKZ8m3I
{
    "VersionId": "e1LYLX92jBtLgbXp92.nZdf0yFKZ8m3I",
    "DeleteMarker": true
}
[gshah@aws-dev restore]$ aws s3api delete-object --bucket aws-dev01-sample-bucket --key SIT/USER/gshah/abc.txt --version-id _Air1UwjRGOqln65oSp5xiCO06ZPwocP
{
    "VersionId": "_Air1UwjRGOqln65oSp5xiCO06ZPwocP",
    "DeleteMarker": true
}

Let's see if we got those files back.

[gshah@aws-dev restore]$ aws s3 ls s3://aws-dev01-sample-bucket/SIT/USER/gshah/                                                                              
2020-07-20 13:50:12          0 1.txt
2020-07-20 13:50:18          0 2.txt
2020-07-20 13:50:25          0 3.txt
2020-07-20 13:42:28          0 abc.txt
2020-07-20 13:43:01          0 xyz.txt
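
Removing markers one by one is fine for a handful of objects, but for something like the 100+ tables mentioned earlier you would script it. A rough sketch using the AWS CLI and jq (assumed to be installed), reusing the same bucket and prefix:

# Remove every current delete marker under the prefix, restoring the latest versions.
aws s3api list-object-versions \
  --bucket aws-dev01-sample-bucket \
  --prefix SIT/USER/gshah/ \
  --output json |
jq -r '.DeleteMarkers[]? | select(.IsLatest) | [.Key, .VersionId] | @tsv' |
while IFS=$'\t' read -r key version_id; do
  aws s3api delete-object \
    --bucket aws-dev01-sample-bucket \
    --key "$key" \
    --version-id "$version_id"
done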
