Skip to content

Tag: linage

GCP – Create Custom Bigquery Linage using DataCatalog Python API

In our GCP (google cloud platform) data warehousing workflow, we rely on GCP BigQuery for storing and analyzing data. However, the data ingestion process involves a different service that does not automatically show lineage in BigQuery. To address this limitation, I developed a Python utility that enables the creation of custom lineage for ingestion jobs using Dataplex Custom Linage Python Client.

Custom lineage creation involves three key tasks, each serving an essential purpose:

  1. Create a Lineage Process: This step allows us to define a name for the lineage process. Leveraging GCP Cloud Composer, I often use the DAG name as the process name, facilitating seamless linking of the ingestion tables to their respective processes.
  2. Create the Run: For every execution of the above process, we should create a new run. I assign the task ID as the run name, ensuring a unique identifier for each run.
  3. Create a Lineage Event: In the final task, I specify the source and target mapping along with associated details, effectively establishing the lineage relationship between the datasets.

Image depicting the GCP BigQuery Custom Lineage Process.
Exploring the data lineage process using GCP BigQuery and dataplex custom lineage python client.
Bigquery Linage Runs
Create Bigquery Custom Linage Runs using Dataplex Custom Linage Python Client
Bigquery Custom Run Details

Please find the entire code snippet on github

https://gist.github.com/Gaurang033/01ab9d4cedfb1049dd23dd30cd88cdad

Install Dependencies

google-cloud-datacatalog-lineage==0.2.3

Create Custom Linage Process

For process you can also add custom attributes, I have given an example of owner, framework and service.

def create_linage_process(project_id, process_display_name):
    parent = f"projects/{project_id}/locations/northamerica-northeast1"
    process = Process()
    process.display_name = process_display_name
    process.attributes = {
        "owner": "gaurangnshah@gmail.com",
        "framework": "file_ingestion_framework",
        "service": "databricks"
    }

    response = client.create_process(parent=parent, process=process)
    return response.name

Create Custom Linage Run

following code will help you create the custom run for the linage process we created

def create_run(process_id, start_time, end_time, state, run_display_name):
    run = lineage_v1.Run()
    run.start_time = start_time
    run.end_time = end_time
    run.state = state
    run.display_name = run_display_name
    run.attributes = {
        "owner": "gaurang",
        "purpose": "Testing Linage"
    }

    request = lineage_v1.CreateRunRequest(parent=process_id, run=run)
    response = client.create_run(request=request)
    logger.info(f"New run Created {response.name}")
    return response.name

Create Custom Linage Event

once you have linage run created you need to attach an even to that, event is nothing but source to target mapping. for both source and target you need to use fully qualified name with proper protocols. please visit following page to see all the supported protocols for source and target FQDN

https://cloud.google.com//data-catalog/docs/fully-qualified-names

def create_lineage_event(run_id, source_fqdn, target_fqdn, start_time, end_time):
    source = lineage_v1.EntityReference()
    target = lineage_v1.EntityReference()
    source.fully_qualified_name = source_fqdn
    target.fully_qualified_name = target_fqdn
    links = [EventLink(source=source, target=target)]
    lineage_event = LineageEvent(links=links, start_time=start_time, end_time=end_time)

    request = lineage_v1.CreateLineageEventRequest(parent=run_id, lineage_event=lineage_event)
    response = client.create_lineage_event(request=request)
    print("Lineage event created: %s", response.name)

Update Custom Linage Process

For us, it’s a same process which ingest new file into table, rather than creating new process every time, I am just updating the existing process to add new run and linage event.

def create_custom_linage_for_ingestion(project_id, process_display_name, source, target, start_time, end_time, state,
                                       run_display_name):
    process_id = create_linage_process(project_id, process_display_name=process_display_name)
    run_id = create_run(process_id=process_id, start_time=start_time, end_time=end_time, state=state,
                        run_display_name=run_display_name)
    create_lineage_event(run_id=run_id, start_time=start_time, end_time=end_time, source_fqdn=source,
                         target_fqdn=target)


def _get_process_id(project_id, process_display_name):
    parent = f"projects/{project_id}/locations/northamerica-northeast1"
    processes = client.list_processes(parent=parent)
    for process in processes:
        if process.display_name == process_display_name:
            return process.name
    return None


def _convert_to_proto_timestamp(timestamp):
    return timestamp.strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + "Z"

How To Run?

if __name__ == '__main__':
    project_id = "<your_project_id>"
    process_display_name = "INGESTION_DAG_NAME"  ## DAG NAME
    source = "path:gs://<your_bucket_name>/test_schema/test_20230604.csv"
    target = "bigquery:<project_id>.gaurang.test_custom_linage"

    start_time = datetime.now() - timedelta(hours=3)
    process_start_time = _convert_to_proto_timestamp(start_time)  # Start time dag
    process_end_time = _convert_to_proto_timestamp(datetime.now())  # End Time

    state = "COMPLETED"
    run_display_name = "TASK_RUN_ID"
    create_or_update_custom_linage_for_ingestion(project_id, process_display_name, source, target, process_start_time,
                                                 process_end_time, state, run_display_name)
Leave a Comment