Recently I was working on NiFi and realized that our dev instance was running too slowly because developers had forgotten to clean up their work.
So I used the NiFi API and wrote some Python code to identify processor groups that we could delete. I will walk through the code here.
How to get an API token?
To access a secured NiFi endpoint you need an API token, which you then send in the header of every request.
The following code takes the username and password and, based on them, requests a token from NiFi.
    import sys

    import requests


    def get_token(username, password, host):
        # Exchange form-encoded credentials for an access token.
        url = "https://%s:8080/nifi-api/access/token" % host
        header = {"Content-Type": "application/x-www-form-urlencoded;charset=UTF-8"}
        data = {"username": username, "password": password}
        # verify=False skips TLS certificate verification.
        resp = requests.post(url, data=data, headers=header, verify=False)
        if resp.status_code not in (200, 201):
            print(resp.reason)
            print(resp.text)
            sys.exit(1)
        # The response body is the token text.
        return resp.text
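Since the token has to go into the header of every request, it can help to build that header in one place. The following is a minimal sketch and not part of the original script: auth_header is a hypothetical helper name, and stripping whitespace around the token is an assumption about how the token text might arrive.

```python
def auth_header(token):
    """Build the Authorization header for a NiFi request (hypothetical helper)."""
    # Assumption: remove stray whitespace or newlines around the token text.
    return {"Authorization": "Bearer %s" % token.strip()}


# Example with a made-up token value; a real one would come from get_token().
header = auth_header("abc123\n")
```

The resulting dictionary can be passed as headers= to requests.get or requests.post, the same way the header is passed in the code in this post.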
How to Find Stopped Processor Groups?
The following code finds all the processor groups that do not have any processors running. It walks the nested groups recursively and checks each group that has no child groups.
    # Requires the requests package and a utils module exposing ACCESS_TOKEN.
    def find_stopped_processor(group_id, host):
        # Fetch one group and print "id,name" if nothing inside it is running.
        url = "https://%s:8080/nifi-api/process-groups/%s" % (host, group_id)
        header = {"Authorization": "Bearer %s" % utils.ACCESS_TOKEN}
        r = requests.get(url, headers=header, verify=False)
        resp = r.json()
        running_count = resp.get("runningCount")
        if int(running_count) == 0:
            print("%s,%s" % (group_id, resp.get("status").get("name")))


    def find_processor_group_stopped_processor(parent_processor, host):
        # Walk the nested status snapshots; only groups without child groups are checked.
        for processor_group in parent_processor:
            p = processor_group.get("processGroupStatusSnapshot")
            if len(p.get("processGroupStatusSnapshots")) > 0:
                # The recursive call needs host as well as the child snapshots.
                find_processor_group_stopped_processor(p.get("processGroupStatusSnapshots"), host)
            else:
                find_stopped_processor(p.get("id"), host)
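To see which groups the recursion above actually reaches, here is a small offline sketch of the same traversal that makes no HTTP calls. The function name iter_leaf_group_ids and the sample data are hypothetical; the nested shape (processGroupStatusSnapshot containing processGroupStatusSnapshots) is taken from the keys used in the code above, and the ids are made up.

```python
def iter_leaf_group_ids(snapshot_entities):
    """Yield the id of every group that has no child groups (hypothetical helper)."""
    for entity in snapshot_entities:
        snapshot = entity.get("processGroupStatusSnapshot", {})
        children = snapshot.get("processGroupStatusSnapshots") or []
        if children:
            # Descend into child groups instead of reporting the parent.
            for group_id in iter_leaf_group_ids(children):
                yield group_id
        else:
            yield snapshot.get("id")


# Made-up sample: group "a" has two children, group "b" has none.
sample = [
    {"processGroupStatusSnapshot": {"id": "a", "processGroupStatusSnapshots": [
        {"processGroupStatusSnapshot": {"id": "a1", "processGroupStatusSnapshots": []}},
        {"processGroupStatusSnapshot": {"id": "a2", "processGroupStatusSnapshots": []}},
    ]}},
    {"processGroupStatusSnapshot": {"id": "b", "processGroupStatusSnapshots": []}},
]
leaf_ids = list(iter_leaf_group_ids(sample))
```

Note that the parent group "a" is never reported itself, only its children. That mirrors the traversal in the post, where a group with child groups is descended into rather than checked.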