We use Airflow as the scheduler/orchestrator for all our AWS data-related jobs, and we run those jobs with SSHOperator. Our support team complained that jobs sometimes fail while making the connection, and that re-running them works fine. This was creating a lot of noise. And as there is no retry mechanism in the operator, we couldn't configure it to auto-retry.
I realized that the SSHOperator class calls a function of the SSHHook class (get_conn), which actually makes the SSH connection with the given details.
Problem
There are lots of DAGs (classes) which refer to this function. How do we add a retry mechanism without modifying each and every class?
Solution
Monkey patching was the first thing that came to my mind. However, when I applied the monkey patch it created a recursion, because I had to call the function again for the retry. It took a while to fix this issue, but I was finally able to fix it by creating another reference to the original function in the same class I am patching.
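To make the recursion concrete, here is a minimal sketch that runs without Airflow. FakeHook is a hypothetical stand-in for SSHHook, and naive_patch and patched_get_conn are names made up for this illustration. It only shows why looking the method up by its patched name loops forever, and how a second reference to the original avoids that.

```python
class FakeHook:
    """Hypothetical stand-in for SSHHook (not the real Airflow class)."""

    def get_conn(self):
        return "connected"


def naive_patch(self):
    # Once this function is assigned to FakeHook.get_conn, the lookup below
    # resolves to naive_patch itself, so the original method is never reached.
    return FakeHook.get_conn(self)


def patched_get_conn(self):
    # original_func is a second name for the unpatched method, so this call
    # reaches the real connection logic.
    return FakeHook.original_func(self)


# Save the original BEFORE patching, under a different attribute name.
FakeHook.original_func = FakeHook.get_conn

# Naive patch: calls itself until Python gives up.
FakeHook.get_conn = naive_patch
try:
    FakeHook().get_conn()
    naive_result = "ok"
except RecursionError:
    naive_result = "RecursionError"

# Patch that goes through the saved reference instead.
FakeHook.get_conn = patched_get_conn
fixed_result = FakeHook().get_conn()
```

The important detail is the order: the extra reference has to be created before get_conn is overwritten, otherwise it would point at the patch as well.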
plugins/hooks/ssh_hook_with_retry.py
# -*- coding: utf-8 -*-
# Author: Gaurang Shah
#
from airflow.contrib.hooks.ssh_hook import SSHHook
import time

max_retry = 3
retry = 0


def get_conn_with_retry(self):
    global retry
    try:
        # call the original function
        conn = SSHHook.original_func(self)
        # reset the counter so the next connection gets its full set of retries
        retry = 0
        return conn
    except Exception:
        if retry < max_retry:
            retry += 1
            print("tried %s times, failed to connect, retrying again" % retry)
            time.sleep(60)
            # call the patched function, which in turn calls get_conn_with_retry,
            # and return its result so the caller receives the connection
            return self.get_conn()
        else:
            # out of retries: reset the counter and re-raise the last error
            retry = 0
            raise
plugins/hooks/__init__.py
from airflow.contrib.hooks import ssh_hook

# add a new reference to the original function to avoid recursion
ssh_hook.SSHHook.original_func = ssh_hook.SSHHook.get_conn

from hooks.ssh_hook_with_retry import get_conn_with_retry

# patch the get_conn function with our function
ssh_hook.SSHHook.get_conn = get_conn_with_retry

# when someone calls SSHHook.get_conn, ssh_hook_with_retry.get_conn_with_retry will be called,
# and to call the original get_conn we have to call SSHHook.original_func
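One thing to keep in mind with the patch above is that the retry counter lives at module level, so every hook in the same process shares it. A possible variant, sketched here as an assumption rather than what we run in production, keeps the count local to each call by using a loop and holds the original method in a closure. FlakyHook, make_retrying and the delay parameter are hypothetical names used only for this illustration.

```python
import time


def make_retrying(original, max_retry=3, delay=60):
    """Wrap `original` so it is retried up to `max_retry` times after a failure."""

    def get_conn_with_retry(self):
        for attempt in range(max_retry + 1):
            try:
                return original(self)
            except Exception:
                if attempt == max_retry:
                    raise  # out of retries: surface the last error
                print("tried %s times, failed to connect, retrying again" % (attempt + 1))
                time.sleep(delay)

    return get_conn_with_retry


class FlakyHook:
    """Hypothetical stand-in for SSHHook: fails twice, then connects."""

    def __init__(self):
        self.calls = 0

    def get_conn(self):
        self.calls += 1
        if self.calls < 3:
            raise ConnectionError("simulated connection failure")
        return "connected"


# The closure keeps the original method, so no extra class attribute is needed.
# delay=0 only keeps this demo fast; a real patch would wait between attempts.
FlakyHook.get_conn = make_retrying(FlakyHook.get_conn, max_retry=3, delay=0)

hook = FlakyHook()
result = hook.get_conn()
```

Because the wrapper calls the saved original directly instead of going back through self.get_conn(), there is no recursion to guard against, and each call starts with a fresh attempt count.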