Airflow allows multiple secrets backends to be configured for storing Airflow connections and variables. For a long time we were using the default Airflow backend; recently, however, I migrated all the connections to HashiCorp Vault and started using Vault as our secrets backend. In this post I will show you a step-by-step guide on how to do this.
Configure HashiCorp Vault
Create a mount point
We have multiple Composer (Airflow) environments, so the strategy we have used is to create a single mount point named airflow and then use a different path for each Airflow instance. You could choose a different strategy as per your organization's standards and requirements. Run the following command to create the secrets mount point.
vault secrets enable -path=airflow -version=2 kv
Create role
Vault provides multiple ways to authenticate; what we are going to use is AppRole, so let's create a role. Please copy the secret ID from the output and store it somewhere safe; it will be needed when we configure the Vault connection in Airflow.
A role needs to be associated with a policy. A policy is nothing but a grant (access). Run the following code to create a policy that gives read and list permissions on the airflow path we created earlier.
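As a minimal sketch (assuming the policy and role names airflow-policy and airflow-role; pick names that fit your standards), the policy and AppRole could be created like this:

vault policy write airflow-policy - <<EOF
path "airflow/*" {
  capabilities = ["read", "list"]
}
EOF

vault auth enable approle
vault write auth/approle/role/airflow-role token_policies="airflow-policy"
vault read auth/approle/role/airflow-role/role-id
vault write -f auth/approle/role/airflow-role/secret-id

The last two commands print the role_id and secret_id you will need when configuring the Airflow secrets backend below.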
connections_path: the path where you would like to store your Airflow connections. For me it is the Composer environment name followed by connections. If you have a single Airflow instance you could store everything directly under connections.
variables_path: I have specified null as I am storing variables in Airflow. If you want to store variables in Vault as well, just provide a path.
config_path: same as variables, I am keeping config in Airflow.
url: replace with your Vault URL.
auth_type: we are using approle to authenticate with Vault, as discussed above.
role_id: the role we created above. If you used a different name, replace it here.
secret_id: the secret_id we generated for the role.
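Putting it together, the Airflow (Composer) configuration override for the secrets section would look something like the sketch below; the paths, URL, role name, and secret ID are placeholders for your own values, and variables_path/config_path are set to null because we keep those in Airflow:

[secrets]
backend = airflow.providers.hashicorp.secrets.vault.VaultBackend
backend_kwargs = {"mount_point": "airflow", "connections_path": "dev-composer/connections", "variables_path": null, "config_path": null, "url": "https://vault.example.com", "auth_type": "approle", "role_id": "airflow-role", "secret_id": "<secret-id>"}

In Composer you apply this as two Airflow configuration overrides: secrets / backend and secrets / backend_kwargs.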
How to store connections
For a connection, create a path with the connection name and store the JSON with the proper keys and values for that connection. For example, the default BigQuery connection would look like this:
Mount point: airflow
Path: dev-composer/connections/bigquery_default
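The secret value itself is a JSON document with the connection fields. A minimal sketch (the project ID is a placeholder, and the exact fields depend on your connection type and provider version; older provider versions expect a single conn_uri instead):

{
  "conn_type": "google_cloud_platform",
  "description": "Default BigQuery connection",
  "extra": "{\"project\": \"my-gcp-project\"}"
}

You can write it with the Vault CLI, for example:

vault kv put airflow/dev-composer/connections/bigquery_default conn_type=google_cloud_platform extra='{"project": "my-gcp-project"}'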
When planning to use Google Cloud Composer (Airflow in GCP), there are a few essential considerations to address before setup. While these can be configured post-setup, it would be a tedious and time-consuming task.
TimeZone for scheduler
The default time zone for the scheduler is UTC. This means if you schedule a DAG to run at 5 PM, it will run at 5 PM UTC, not your local time. Calculating this for each DAG deployment is impractical, so it's advisable to change the default scheduling time zone. To change this:
Navigate to your Airflow instance.
Go to Airflow configuration overrides.
Click on edit and choose the time zone you want for core.default_timezone.
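If you prefer the command line, the same override can be applied with gcloud; the environment name, location, and time zone below are placeholders:

gcloud composer environments update my-composer-env \
  --location us-central1 \
  --update-airflow-configs=core-default_timezone=America/Toronto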
Where to store Airflow connections and variables
By default, Airflow (Google Cloud Composer) stores connections and variables within Airflow itself. However, it supports multiple external backends, including GCP, AWS, and HashiCorp Vault. Airflow does not version these connections or variables, nor does it provide granular access control, making it prudent to store them externally. Organizational standards often require storing all secrets and certificates in a single system.
In our setup, we chose to store connections in HashiCorp Vault due to their sensitive nature, while non-sensitive variables remained in Airflow.
One key point to note: Airflow adds new backends as extra backends. If it cannot find a variable or connection in the external backend (e.g., Vault), it will search within Airflow itself.
Default Role assignment for All Airflow Users
Airflow has built-in RBAC with five main roles: public, viewer, user, op, admin. The default role assigned to all users in GCP is ‘op’.
If this role doesn’t fit your organizational needs, create a custom role and change the default role assignment.
In our scenario, the ‘op’ role includes permissions to create and maintain connections and variables. Since we maintain all connections in HashiCorp Vault, we didn’t want duplicates created within Airflow. Therefore, we created a custom role without these permissions and set it as the default role for all users. To change the default role, override webserver.rbac_user_registration_role to the custom role.
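As a sketch, assuming a custom role named Custom_Op, the override can be applied through the console or with gcloud:

gcloud composer environments update my-composer-env \
  --location us-central1 \
  --update-airflow-configs=webserver-rbac_user_registration_role=Custom_Op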
By addressing these configurations early on, you can streamline your use of Google Cloud Composer and Airflow in GCP, ensuring efficient and secure operations.
In our previous blog posts, we explored leveraging Google Recommender for cost optimization. Now, let’s dive into identifying security issues within your Google Cloud Platform (GCP) environment using Google Recommender. If you missed the previous post on redirecting recommendations to BigQuery, I highly recommend giving it a read, as it lays out the groundwork for our current discussion.
Adhering to the principles of a zero trust policy, it’s crucial to ensure that individuals or service accounts only possess the permissions they truly require. Google Recommender plays a pivotal role in this aspect. By examining policy insights, if it’s flagged that a principal holds unnecessary permissions within their role, the IAM Recommender steps in to evaluate whether these permissions can be revoked or if there’s a more suitable role available. If revocation is possible, the IAM Recommender generates a recommendation to revoke the role. Alternatively, if there’s a better-suited role, it suggests replacing the existing one with the suggested role. This replacement could entail a new custom role, an existing custom role, or predefined roles.
If you’ve already redirected all recommendations to BigQuery, you can run the following query to gain insights into any surplus permissions held by individuals or service accounts. Furthermore, it will provide recommendations regarding roles that may need to be removed or replaced with more stringent alternatives.
SQL to find GCP IAM recommendation
SELECT
  cloud_entity_type,
  cloud_entity_id,
  recommendation_details,
  recommender_subtype,
  JSON_VALUE(recommendation_details, "$.overview.member") AS user,
  JSON_VALUE(recommendation_details, "$.overview.removedRole") AS existing_role,
  JSON_QUERY_ARRAY(recommendation_details, "$.overview.addedRoles") AS new_role,
  priority,
  JSON_VALUE(recommendation_details, "$.overview.minimumObservationPeriodInDays") AS minimumObservationPeriodInDays
FROM
  `your_project.recommendations.recommendations_export`
WHERE
  recommender = "google.iam.policy.Recommender"
  AND state = "ACTIVE"
  AND TIMESTAMP_TRUNC(_PARTITIONTIME, DAY) = (
    SELECT
      TIMESTAMP_TRUNC(MAX(_PARTITIONTIME), DAY)
    FROM
      `your_project.recommendations.recommendations_export`)
minimumObservationPeriodInDays: the IAM Recommender only begins generating role recommendations once it has gathered a certain amount of permission-usage data. By default, the minimum observation period is 90 days. For project-level role recommendations, however, you can manually lower it to 30 or 60 days. If you wish to modify this setting, you can do so by visiting the following link: Configure Role Recommendations.
cloud_entity_type: shows whether the issue is at the org, folder, or project level.
cloud_entity_id: shows the ID of the org, folder, or project. You can use this ID in the GCP console to search for the particular entity.
recommender_subtype: shows whether to remove the role, replace it with another role, or whether a service account is using a default role.
user: the principal (user or service account) for which the recommendation was generated.
existing_role: shows the existing role.
new_role: the role you should replace the existing role with; when recommender_subtype is remove_role this will be empty.
priority: the priority of a particular recommendation.
In our ongoing exploration of cloud cost optimization, we’re constantly seeking ways to maximize efficiency and minimize expenditure. In our previous blog post, we discussed the importance of centralizing recommendations within BigQuery to streamline cost analysis. If you haven’t had the chance to read that article, I highly recommend doing so, as it lays the groundwork for the strategy we’re about to delve into.
Now, let’s dive into a powerful BigQuery query designed to uncover Compute Engine resources prime for scale down, further enhancing your cost optimization efforts.
Unveiling the Query
SELECT
  r,
  SPLIT(r, "/")[4] AS project_name,
  ARRAY_REVERSE(SPLIT(r, "/"))[0] AS resource_name,
  recommender_subtype AS action,
  description,
  primary_impact.cost_projection.cost_in_local_currency.units AS cost_savings_per_month
FROM
  `<your_project>.recommendations.recommendations_export`,
  UNNEST(target_resources) r
WHERE
  recommender_subtype = "CHANGE_MACHINE_TYPE"
  AND TIMESTAMP_TRUNC(_PARTITIONTIME, DAY) = (
    SELECT
      TIMESTAMP_TRUNC(MAX(_PARTITIONTIME), DAY)
    FROM
      `<your_project>.recommendations.recommendations_export`)
Breaking Down the Query
SELECT: The query selects essential fields like the resource and project names, recommendation action, description, and projected cost savings per month.
FROM: It sources data from recommendations.recommendations_export, extracting target resources using UNNEST.
WHERE: Filters recommendations to focus solely on those advocating for changing machine types. Additionally, it ensures we’re working with the latest data partition.
What It Means for You
By running this query, you gain insights into Compute Engine resources where adjusting machine types could lead to substantial cost savings. Each recommendation is accompanied by a description and projected monthly savings, empowering you to make informed decisions about scaling down resources without sacrificing performance.
Conclusion
Cost optimization in the cloud isn’t just about cutting corners; it’s about strategic resource allocation. With the provided BigQuery query, identifying Compute Engine resources ripe for scale down becomes a streamlined process. Embrace data-driven decision-making to optimize your cloud costs effectively.
Stay tuned for more insights and tips on maximizing the value of your cloud investments!
Remember, when it comes to cloud cost optimization, every adjustment counts. Start uncovering opportunities for savings today with our BigQuery-powered approach. Your bottom line will thank you.
Would you like to dive deeper into any specific aspect or have further queries? Feel free to reach out!
In recent days, I was working on cost optimization on the Google Cloud Platform (GCP). Google offers recommendations for cost savings and security at the project level. However, managing these recommendations across numerous projects can be arduous, particularly in scenarios like mine where we oversee approximately 100 projects, with limited access to many. Fortunately, redirecting all recommendations to BigQuery and leveraging SQL's analytical capabilities proved to be a game-changer. Additionally, configuring Looker Studio facilitated streamlined visualization.
In this blog post, I’ll illustrate the process of redirecting GCP recommendations to Google BigQuery and uncovering cost-saving recommendations specifically for idle resources.
Redirecting GCP Recommendations to BigQuery
You will need a service account created at the org level with the following roles (example gcloud commands are shown after the list):
roles/bigquery.dataEditor
roles/recommender.exporter
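As a rough sketch with gcloud (the organization ID, project, and account name are placeholders; here the BigQuery role is granted on the destination project rather than the whole organization):

gcloud iam service-accounts create recommender-export --project=my-billing-project

gcloud organizations add-iam-policy-binding ORG_ID \
  --member="serviceAccount:recommender-export@my-billing-project.iam.gserviceaccount.com" \
  --role="roles/recommender.exporter"

gcloud projects add-iam-policy-binding my-billing-project \
  --member="serviceAccount:recommender-export@my-billing-project.iam.gserviceaccount.com" \
  --role="roles/bigquery.dataEditor"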
Choose a project to which you want to send GCP recommendations. In our case we have created a separate project for billing and recommendations; a separate project makes access control easier.
Now navigate to Google BigQuery, open Data transfers from the left-side menu, and click on CREATE TRANSFER.
Choose Recommender V1 from the options as shown in the screenshot below and fill out the other information.
Once the data transfer is executed successfully you will be able to see the following two tables in the dataset:
insights_export
recommendations_export
Analyse the Data for GCP Cost Savings
The following query will give you a list of all the idle resources that could be either deleted or shut down to save cost. The query will show you the project name, resource name, action to be taken, description, and how much cost you will be saving in your local currency.
SELECT
  r,
  SPLIT(r, "/")[4] AS project_name,
  ARRAY_REVERSE(SPLIT(r, "/"))[0] AS resource_name,
  recommender_subtype AS action,
  description,
  primary_impact.cost_projection.cost_in_local_currency.units AS cost_savings_per_month
FROM
  `your_project.your_dataset.recommendations_export`,
  UNNEST(target_resources) AS r
WHERE
  TIMESTAMP_TRUNC(_PARTITIONTIME, DAY) = (
    SELECT TIMESTAMP_TRUNC(MAX(_PARTITIONTIME), DAY)
    FROM `your_project.your_dataset.recommendations_export`)
  AND primary_impact.category = "COST"
  AND state = "ACTIVE"
  AND recommender LIKE "%IdleResourceRecommender"
Visualizing Data
In order to track whether we are implementing these suggestions or not, I further created a dashboard using Looker Studio.
Query for the Looker Studio dashboard
The following query will give you the cost optimization recommendations of the last 30 days.
SELECT
  r,
  SPLIT(r, "/")[4] AS project_name,
  ARRAY_REVERSE(SPLIT(r, "/"))[0] AS resource_name,
  recommender_subtype AS action,
  description,
  primary_impact.cost_projection.cost_in_local_currency.units AS cost_savings_per_month,
  state,
  DATE(last_refresh_time) AS date
FROM
  `your_project.your_dataset.recommendations_export`,
  UNNEST(target_resources) AS r
WHERE
  TIMESTAMP_TRUNC(_PARTITIONTIME, DAY) > TIMESTAMP(CURRENT_DATE() - 30)
  AND primary_impact.category = "COST"
  AND state = "ACTIVE"
  AND recommender LIKE "%IdleResourceRecommender"
Dashboard
This dashboard provides valuable insights indicating that our efforts towards cost optimization are bearing fruit. The noticeable decrease in overall recommendations signifies successful implementation of our strategies, affirming that we are indeed on the right track.
A while back, I found myself deeply immersed in a Hadoop migration project where our cloud platform of choice was Google Cloud Platform (GCP). Our mission? To seamlessly transition data from on-premises infrastructure to the cloud. Due to various constraints, utilizing hardware wasn’t a viable option. Thus, I embarked on a quest to explore multiple software solutions to tackle this challenge.
For one-off migrations, Spark emerged as a favorable choice. It facilitated direct data migration to BigQuery, bypassing the intermediary step of storing it in cloud storage. However, there was a caveat: Spark lacked the ability to detect changes, necessitating a full refresh each time. This approach proved less than ideal, especially when dealing with substantial datasets.
My gaze then turned to Cloudera BDR, but alas, it didn’t support integration with Google Cloud. Left with no alternative, I delved into Distcp. In this blog post, I’ll guide you through the setup process for Distcp, enabling seamless data transfer from an on-prem HDFS cluster to Google Cloud Storage.
Service Account Setup
To begin, create a GCP service account with read/write permissions on the designated Google Cloud Storage bucket. Obtain the JSON key associated with this service account. This key needs to be distributed across all nodes involved in the migration process. For instance, I've opted to store it at /tmp/sa-datamigonpremtobigquery.json. Also make sure the user you are going to run the distcp command as has access to this path.
HDFS.conf
Store the following file on the edge node in your home directory, and replace the value of fs.gs.project.id with your project ID.
<configuration>
<property>
<name>fs.AbstractFileSystem.gs.impl</name>
<value>com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS</value>
<description>The AbstractFileSystem for 'gs:' URIs.</description>
</property>
<property>
<name>fs.gs.project.id</name>
<value>raw-bucket</value>
<description>
Optional. Google Cloud Project ID with access to GCS buckets.
Required only for list buckets and create bucket operations.
</description>
</property>
<property>
<name>google.cloud.auth.type</name>
<value>SERVICE_ACCOUNT_JSON_KEYFILE</value>
<description>
Authentication type to use for GCS access.
</description>
</property>
<property>
<name>google.cloud.auth.service.account.json.keyfile</name>
<value>/tmp/sa-datamigonpremtobigquery.json</value>
<description>
The JSON keyfile of the service account used for GCS
access when google.cloud.auth.type is SERVICE_ACCOUNT_JSON_KEYFILE.
</description>
</property>
<property>
<name>fs.gs.checksum.type</name>
<value>CRC32C</value>
<description>
https://cloud.google.com/architecture/hadoop/validating-data-transfers
</description>
</property>
<property>
<name>dfs.checksum.combine.mode</name>
<value>COMPOSITE_CRC</value>
<description>
https://cloud.google.com/architecture/hadoop/validating-data-transfers
</description>
</property>
</configuration>
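With the JSON key and hdfs.conf in place, a typical run looks something like the sketch below; the source path, bucket, and mapper count are placeholders, and it assumes the GCS connector jar is already available on the cluster:

hadoop distcp -conf ~/hdfs.conf \
  -update -m 50 \
  hdfs:///data/warehouse/sales \
  gs://raw-bucket/data/warehouse/sales

-update copies only files that are missing or different on the target, and -m controls how many mappers (parallel copies) are used.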
In today’s data-driven landscape, organizations often have multiple teams comprising domain experts who create their own unique products. However, enabling other teams to leverage these products efficiently requires a structured approach. Enter data mesh—a paradigm that decentralizes data ownership and processing. In this guide, we’ll explore how to implement data mesh using Google Cloud Platform (GCP) BigQuery, empowering teams to manage their data products seamlessly.
Setting up Domain-Specific GCP Projects
Begin by assigning a dedicated GCP project to each domain or business team. This ensures that teams have the autonomy to develop and manage their data products within their respective environments. By segregating projects based on domains, teams can focus on their specific requirements without interfering with others’ workflows.
Development and Promotion Workflow
Within their assigned GCP projects, domain teams develop their data products tailored to their expertise. These products undergo rigorous testing and refinement in the development environment. However, it’s crucial to avoid publishing directly from the development environment to prevent potential disruptions for subscribers. Frequent changes in the development phase can lead to compatibility issues and operational challenges for downstream users.
Promotion to Higher Environments
Once a data product is deemed ready for consumption, it’s promoted to higher environments, typically housed in different GCP projects. This transition ensures that only validated and stable versions of products are made available to subscribers. By segregating development and production environments, organizations can maintain data integrity and stability while minimizing disruptions to subscriber workflows.
Publishing Data Products
When promoting a data product to a higher environment, a team lead assumes the responsibility of publishing it. This involves orchestrating a seamless transition and ensuring that subscribers can access the updated version without interruptions.
Make sure Analytics Hub API is enabled for this.
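If it is not enabled yet, a quick way to do so (project ID is a placeholder):

gcloud services enable analyticshub.googleapis.com --project=my-project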
Follow these steps to publish your product:
Navigate to GCP BigQuery and access Analytics Hub. Click on “Create Exchange.”
Depending on the nature of your product, provide the necessary details and proceed by clicking “Create Exchange.”
At this stage, you’ll have the option to configure permissions for administration, publishing, subscription, and viewing of the listing. You can either set these permissions now or configure them later.
Once the exchange is created, proceed to create a listing for it. A listing involves selecting the dataset you wish to publish. Currently, Analytics Hub only lets you choose datasets, and depending on the location you chose for your exchange, you will only be able to select datasets from that location.
Provide the required Listing Details and Listing Contact Information corresponding to your data product. Once completed, you’ll be able to publish the dataset through the listing.
Subscribing to the Data Products (listings)
Once the data product is published, other users can search for and subscribe to the listing. Users need access to create a new dataset in the project where they want to subscribe to the new data product.
In BigQuery, tables can be partitioned based on a particular column. Partitioning involves dividing a large table into smaller, more manageable pieces, or partitions. Each partition contains a subset of the data and is stored separately. The key idea behind partitioning is to allow BigQuery to scan and process only the partitions that are relevant to a query, rather than scanning the entire table. This can lead to dramatic improvements in query performance and cost savings.
Why Are Partition Columns Important?
1. Query Performance
Partitioning a table based on a meaningful column, such as a timestamp or date, can significantly speed up query execution. When you run a query that includes a filter condition on the partitioning column, BigQuery can prune irrelevant partitions, scanning only the data that meets the criteria. This reduces query execution time, especially when dealing with large datasets.
For example, if you have a daily time-series dataset and partition it by date, querying data for a specific date range becomes much faster because BigQuery only needs to scan the partitions corresponding to that range.
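For illustration, a hedged example against a hypothetical events table partitioned on event_date; BigQuery only scans the seven matching daily partitions:

SELECT user_id, event_name
FROM `my-project.analytics.events`
WHERE event_date BETWEEN "2024-01-01" AND "2024-01-07"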
2. Cost Efficiency
Improved query performance isn’t the only benefit of partition columns. It also translates into cost savings. BigQuery charges you based on the amount of data processed during a query. By scanning fewer partitions, you reduce the amount of data processed, leading to lower query costs. This cost reduction can be substantial for organizations with large datasets and frequent queries.
Query to Find the Partition Column for All GCP BigQuery Tables
I was looking for a way to find the partition column for each table in our project, so we can improve our query performance and cost.
The following query will show you the partition column for each BigQuery table in your GCP project.
SELECT
  table_catalog AS project_id,
  table_schema AS dataset,
  table_name,
  partition_column
FROM (
  SELECT
    table_catalog,
    table_schema,
    table_name,
    CASE
      WHEN is_partitioning_column = "YES" THEN column_name
      ELSE NULL
    END AS partition_column,
    ROW_NUMBER() OVER(PARTITION BY table_catalog, table_schema, table_name ORDER BY is_partitioning_column DESC) AS rnk
  FROM
    <your_project>.<your_region>.INFORMATION_SCHEMA.COLUMNS
) a
WHERE
  a.rnk = 1
In our GCP (Google Cloud Platform) data warehousing workflow, we rely on GCP BigQuery for storing and analyzing data. However, the data ingestion process involves a different service that does not automatically show lineage in BigQuery. To address this limitation, I developed a Python utility that enables the creation of custom lineage for ingestion jobs using the Dataplex Custom Lineage Python Client.
Custom lineage creation involves three key tasks, each serving an essential purpose:
Create a Lineage Process: This step allows us to define a name for the lineage process. Leveraging GCP Cloud Composer, I often use the DAG name as the process name, facilitating seamless linking of the ingestion tables to their respective processes.
Create the Run: For every execution of the above process, we should create a new run. I assign the task ID as the run name, ensuring a unique identifier for each run.
Create a Lineage Event: In the final task, I specify the source and target mapping along with associated details, effectively establishing the lineage relationship between the datasets.
Exploring the data lineage process using GCP BigQuery and the Dataplex custom lineage Python client.
Once you have the lineage run created, you need to attach an event to it. An event is nothing but a source-to-target mapping. For both source and target you need to use fully qualified names with the proper protocols; please visit the following page to see all the supported protocols for source and target FQDNs.
For us, it is the same process that ingests new files into the table, so rather than creating a new process every time, I just update the existing process to add a new run and lineage event.
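To make the three tasks concrete, here is a minimal sketch using the google-cloud-datacatalog-lineage Python client; the project, location, DAG/task names, and fully qualified names are illustrative placeholders, so check the supported protocols for your own sources and targets:

import datetime

from google.cloud import datacatalog_lineage_v1

client = datacatalog_lineage_v1.LineageClient()
parent = "projects/my-project/locations/us"

# 1. Create the lineage process, named after the ingestion DAG.
process = client.create_process(
    parent=parent,
    process=datacatalog_lineage_v1.Process(display_name="my_ingestion_dag"),
)

# 2. Create a run for this execution, named after the task ID.
now = datetime.datetime.now(datetime.timezone.utc)
run = client.create_run(
    parent=process.name,
    run=datacatalog_lineage_v1.Run(
        display_name="load_sales_task",
        start_time=now,
        state=datacatalog_lineage_v1.Run.State.STARTED,
    ),
)

# 3. Attach the lineage event: a source-to-target mapping using fully
#    qualified names with the proper protocol prefixes.
client.create_lineage_event(
    parent=run.name,
    lineage_event=datacatalog_lineage_v1.LineageEvent(
        links=[
            datacatalog_lineage_v1.EventLink(
                source=datacatalog_lineage_v1.EntityReference(
                    fully_qualified_name="path:gs://my-landing-bucket/sales/2024-01-01.csv"
                ),
                target=datacatalog_lineage_v1.EntityReference(
                    fully_qualified_name="bigquery:my-project.sales_ds.sales"
                ),
            )
        ],
        start_time=now,
    ),
)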
The organization I am currently consulting has recently migrated to the Google Cloud Platform (GCP) to leverage its powerful services, including Google BigQuery for efficient big data analysis. However, we have observed a significant increase in query execution costs and deemed it necessary to investigate the users or teams responsible for these expenses. By identifying the high spenders, we can provide them with valuable insights on optimizing query execution to minimize costs. It's important to note that these users are transitioning from an on-premises environment where a CAPEX model was implemented, and they may not be fully aware of the cost implications associated with every query on GCP's BigQuery. We aim to educate them on optimizing their queries to achieve the desired output while minimizing expenses effectively.
In this blog post, we will explore effective strategies to identify which teams or users are driving up costs.
Use BigQuery Labels for Cost Attribution
To track query costs accurately, one option is to employ BigQuery labels. Although this method requires users to set labels manually before executing queries, it provides granular cost attribution. However, relying solely on users’ compliance may not always yield optimal results.
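For instance, a small sketch of attaching a label to a query job with the BigQuery Python client (the label key and value are placeholders your teams would agree on):

from google.cloud import bigquery

client = bigquery.Client()
job_config = bigquery.QueryJobConfig(labels={"team": "marketing-analytics"})
client.query("SELECT 1", job_config=job_config).result()

The label is then recorded on the job (for example in the JOBS INFORMATION_SCHEMA views), which is what makes per-team cost attribution possible.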
Leverage BigQuery Job Information Schema
BigQuery maintains detailed information for each job execution, including user details, slot utilization, and data processed. By querying the job information schema, you can calculate the query analysis cost per user accurately.
Ensure that the following permissions are granted to run this query:
bigquery.resourceViewer
bigquery.metadataViewer
SELECT
  user_email,
  SUM(total_cost) AS total_cost_per_user
FROM (
  SELECT
    reservation_id,
    user_email,
    CASE
      WHEN reservation_id IS NULL THEN (SUM(total_bytes_processed)/1024/1024/1024/1024)*5 -- 5 USD per TB processed
      WHEN reservation_id IS NOT NULL AND reservation_id <> "default-pipeline" THEN (SUM(jbo.total_slot_ms)/(1000*60*60))*0.069 -- 0.069 USD per slot hour for northamerica-northeast1
    END AS total_cost
  FROM
    `region-northamerica-northeast1`.INFORMATION_SCHEMA.JOBS_BY_ORGANIZATION jbo
  WHERE
    DATE(creation_time) >= "2023-05-01" -- change the filter
  GROUP BY
    reservation_id,
    user_email )
GROUP BY
  user_email
ORDER BY
  total_cost_per_user DESC
Understand the Limitations of Cost Calculation using Information Schema
If your organization is utilizing on-demand pricing in BigQuery, the cost calculated through the information schema will closely align with the cost report.
However, if your organization is using auto-scaling slots, cost calculation through the information schema may not provide accurate results. While the information schema captures slot utilization during query execution, it doesn't account for slots used during scale-up, scale-down, or the cooldown period. As a result, there may be discrepancies between the cost reported in the information schema and the actual cost shown in the cost report. This difference becomes more prominent for queries with shorter execution times (within 1 minute).
Looker Studio Reports for Quick Analysis and Visualization
To streamline the process of extracting query cost information, consider creating Looker Studio reports. These reports offer date filters, enabling quick access to the desired information. Additionally, Looker Studio reports provide a visual representation of query costs, facilitating a better understanding of cost trends and patterns.