Skip to content

Category: hive

Distcp to Copy your HDFS data to GCP Cloud Storage

A while back, I found myself deeply immersed in a Hadoop migration project where our cloud platform of choice was Google Cloud Platform (GCP). Our mission? To seamlessly transition data from on-premises infrastructure to the cloud. Due to various constraints, utilizing hardware wasn’t a viable option. Thus, I embarked on a quest to explore multiple software solutions to tackle this challenge.

For one-off migrations, Spark emerged as a favorable choice. It facilitated direct data migration to BigQuery, bypassing the intermediary step of storing it in cloud storage. However, there was a caveat: Spark lacked the ability to detect changes, necessitating a full refresh each time. This approach proved less than ideal, especially when dealing with substantial datasets.

My gaze then turned to Cloudera BDR, but alas, it didn’t support integration with Google Cloud. Left with no alternative, I delved into Distcp. In this blog post, I’ll guide you through the setup process for Distcp, enabling seamless data transfer from an on-prem HDFS cluster to Google Cloud Storage.

Service Account Setup

To begin, create a GCP service account with read/write permissions for the designated Google Cloud Storage bucket. Obtain the JSON key associated with this service account. This key will need to be distributed across all nodes involved in the migration process. For instance, I’ve opted to store it at /tmp/sa-datamigonpremtobigquery.json. Also make sure, the user with which you are going to run distcp command have access to this path.

HDFS.conf

Please store following file on edge node in your home directory. please replace the value of fs.gs.project.id with your project id.

<configuration>
  <property>
    <name>fs.AbstractFileSystem.gs.impl</name>
    <value>com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS</value>
    <description>The AbstractFileSystem for 'gs:' URIs.</description>
  </property>
  <property>
    <name>fs.gs.project.id</name>
    <value>raw-bucket</value>
    <description>
      Optional. Google Cloud Project ID with access to GCS buckets.
      Required only for list buckets and create bucket operations.
    </description>
  </property>
  <property>
    <name>google.cloud.auth.type</name>
    <value>SERVICE_ACCOUNT_JSON_KEYFILE</value>
    <description>
      Authentication type to use for GCS access.
    </description>
  </property>
  <property>
    <name>google.cloud.auth.service.account.json.keyfile</name>
    <value>/tmp/sa-datamigonpremtobigquery.json</value>
    <description>
      The JSON keyfile of the service account used for GCS
      access when google.cloud.auth.type is SERVICE_ACCOUNT_JSON_KEYFILE.
    </description>
  </property>

  <property>
    <name>fs.gs.checksum.type</name>
    <value>CRC32C</value>
    <description>
          https://cloud.google.com/architecture/hadoop/validating-data-transfers
  </description>
  </property>

  <property>
    <name>dfs.checksum.combine.mode</name>
    <value>COMPOSITE_CRC</value>
    <description>
          https://cloud.google.com/architecture/hadoop/validating-data-transfers
  </description>
  </property>
</configuration>

Executing Transfer

hadoop --debug distcp --conf hdfs.conf -pc -update -v -log hdfs:///tmp/distcp_log hdfs:///tmp/ gs://raw-bucket/ 
Leave a Comment

Hive Lateral view explode vs posexplode

Lateral view Explode

Lateral view explode, explodes the array data into multiple rows. for example, let’s say our table look like this, where Telephone is an array of string.

namephone_numberscities
AAA[“365-889-1234”, “365-887-2232”][“Hamilton”][“Burlington”]
BBB[“232-998-3232”, “878-998-2232”][“Toronto”, “Stoney Creek”]

Applying a lateral view explode on the above table will expand the both Telephone and Cities and do a cross join, your final table will look like this.

namephone_numberscities
AAA365-889-1234Hamilton
AAA365-887-2232Hamilton
AAA365-889-1234Burlington
AAA365-887-2232Burlington
BBB232-998-3232Toronto
BBB878-998-2232Toronto
BBB232-998-3232Stoney Creek
BBB878-998-2232Stoney Creek

Lateral View POSExplode

However, this is not what you probably want, if you want to map first telephone number to first city and second with second one, and that for all the records. Then you can use posexplode (positional explode)

posexplode gives you an index along with value when you expand any error, and then you can use this indexes to map values with each other as mentioned below.

select 
    name, 
    phone_number, 
    city 
from temp.test_laterla_view_posexplode
lateral view posexplode(phone_numbers) pn as pos_phone, phone_number
lateral view posexplode(cities) pn as pos_city, city 
where 
    pos_phone == pos_city

With above query you will get following results, where phone number is mapped with corresponding city.

namephone_numbercity
AAA365-889-1234Hamilton
AAA365-887-2232Burlington
BBB232-998-3232Toronto
BBB878-998-2232Stoney Creek
3 Comments

When to use lateral view explode in hive

if you have a table with one or more column with array datatype  and if you want it to expand into multiple rows, you can use lateral view explode function. 

Let’s consider we have following table, where one employee has multiple phone numbers which are stores as part of array (list). 

emp_namephone_numbers
user1[“546-487-3384″,”383-767-2238”]
user2[“373-384-1192″,”374-282-1289″,”332-453-5566”]

However as a output if we want to convert this Array (list) into multiple rows, we can use lateral view explode function a mentioned below 

select emp_name, phone_number 
from 
    temp.test_laterla_view_explode
lateral view explode(phone_numbers) p as phone_number

This will generate the output as mentioned below

emp_namephone_number
user2373-384-1192
user2374-282-1289
user2332-453-5566
user1546-487-3384
user1383-767-2238
1 Comment

Hive – Convert JSON to complex Data Type

if you have a small (not complex) json file and need to create a corresponding hive table, it’s easy. 

{
	"country":"Switzerland",
	"languages":["German","French","Italian"],
	"religions":
		{
			"catholic":[10,20],
			"protestant":[40,50]
		}
}

However that’s hardly the case in real life. we get JSON file with 100s of nested fields.  Manually parsing that into Hive table is a tedious task. 

To ease the work you can take the help of spark.  don’t worry, it’s just two lines of code 🙂 

first put your file in hdfs location 

hdfs dfs -put sample.json /tmp/

Fetch Schema for Hive Table 

>>> df = spark.read.json("/tmp/sample.json")
>>> df
DataFrame[country: string, languages: array, religions: struct,protestant:array>]

Hive table

your final hive table will look like this, with minor modification in schema and adding json serde and other properties. 

CREATE TABLE temp.test_json(
	  country string, 
	  languages array, 
	  religions struct,protestant:array>)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
STORED AS TEXTFILE
location '/tmp/test_json/table/'

if you don’t like making modification to schema,  alternatively you can save you table to hive and get schema using that. 

df.write.saveAsTable("temp.test_json")
And then run following in hive
show create table temp.test_json

Hive Data

either way,  this is how data looks

Leave a Comment

How to Access Hive With Python script?

You can read hive tables using pyhive python library. 

Install PyHive library

pip install pyhive

Connect to Hive using LDAP

from pyhive import hive
connection = hive.connect(host='HIVE_HOST',
                          port=10000,
                          database='temp',
                          username='HIVE_USERNAME',
                          password='HIVE_PASSWORD',
                          auth='CUSTOM')	

Connect to Hive using Kerberos

from pyhive import hive
connection = hive.connect(host='HIVE_HOST',
                          port=10000,
                          database='temp',
                          username='HIVE_USERNAME',
                          auth='KERBEROS',
                          kerberos_service_name='hive')	

To connect using kerberos, you don’t need to supply password. However you need to provide kerberos service name.  

Execute hive Query using PyHive

query="select * from temp.test_table"
cur = connection.cursor()
cur.execute(query)
res = cur.fetchall()

2 Comments

What is Predicate PushDown in Hive?

Predicate Pushdown in hive is a feature to Push your predicate ( where condition) further up in the query. It tries to execute the expression as early as possible in plan. 

Let’s try to understand this by example. let’s consider we have two tables, product and sales and we want to answer following question. 

How many products of brand Washington has been sold so far?

Non-Optimized Query

Following query will answer the above question. However, if you are familiar with sql you will realize that above query is not optimized.  It applies first joins the two table and then  applies the condition (predicate).

select sum(s.unit_sales) from foodmart.product p 
join 
	foodmart.sales_fact_dec_1998 s 
on 
	p.product_id = s.product_id
where 
	p.brand_name = "Washington"

Optimized Query

We could easily optimize this above query by applying condition first on product table and then joining it to sales table as mentioned below.

SELECT sum(s.unit_sales)
FROM foodmart.sales_fact_dec_1998 s
JOIN (
	SELECT product_id, brand_name
	FROM foodmart.product
	WHERE 
		brand_name = "Washington"
	) p
ON 
	p.product_id = s.product_id

This is what PPD (predicate pushdown) does internally.  if you have ppd enabled your first query will automatically be converted to a second optimized query.

Let’s see this in action.  The product table has total 1560 rows (product) with only 11 products with the brand name Washington.

For better understanding, I have disabled the vectorization.  If you are not sure what vectorization is, please read the following blog post – What is vectorization? 

Running Query with PPD Disabled

Following is the DAG of the first query with PPD disabled. 
Please set the following parameter to false, to disable the PPD.

set hive.optimize.ppd=false;

if you notice, it’s reading all rows from the product table and then passing it to the reducer for join. 

DAG when PPD (predicate pushdown) is disabled.
DAG for first query when PPD is disabled

Running Query with PPD Enabled.

And Following is the DAG of the same query with PPD Enabled.
Please set the following parameter to true, to enable the PPD.

set hive.optimize.ppd=true;

Once, we enable the PPD, it first applies the condition on product table and sends only 11 rows to the reducer for join.

DAG when PPD (predicate pushdown) is enabled.
DAG for first query when PPD is enabled

2 Comments

What is vectorization in hive?

Vectorization in hive is a feature (available from Hive 0.13.0) which when enabled rather than reading one row at a time it reads a block on 1024 rows .  This Improves the CPU Usage for operation like,  Scan, Filter, join and aggregations. 

Note that, Vectorization is only available if data is stored in ORC format

How to Enable Vectorized Execution?

To Enable Vectorized

set hive.vectorized.execution.enabled = true;

To Disable Vectorized

set hive.vectorized.execution.enabled = false;

Difference Vectorized vs Non-Vectorized Queries. 

I have a Product Table with 1560 rows and I want to know how many products has name with Washington in it. 

Non-Vectorized Query. 

set hive.vectorized.execution.enabled = false;
select count(*) from foodmart.product 
where 
	product.product_name like "%Washington%"

In the below image you will notice that INPUT_RECORDS_PROCESSED is 1560.  

Non-Vectorized Hive Query shows all 1560 records being read.
Non-Vectorized Query DAG

Vectorized Query

set hive.vectorized.execution.enabled = true;
select count(*) from foodmart.product 
where 
	product.product_name like "%Washington%"

In Below image you will see that INPUT_RECORDS_PROCESSED is only 2. This is because we have enabled the vectorized which rather than processing one row, it processed 1024 rows in a block. if you will divided 1560 by 1024, you will get two blocks. 1560/1024 = 2 (block has to be int value) 

Vectorized Hive Query shows only two records being read.
Vectorized Query DAG.
2 Comments

How MapJoin works in hive?

There are two types of join common join also knows as distributed join and a mapjoin or also knows as mapside join. 

Before we jump to mapjoin let me give you an overview of common join. How common join works is, it distributes all the rows based on your join key on all the nodes.  After this all the keys with same value ends on the same node and the in the final reducer step the join happens.

Common joins are good when I have both the tables really huge. However, what if I have one table with 1 TB of data and other table with just 10MB if data. Common join would take more time to distribute the row.

In such case, mapjoin help a lot. How it works is rather the distributing the rows from both the tables, it keeps the small table into memory and for every mapper of big table it join it reads the small table from memory. This process doesn’t required any reducer to join and so the name map-join or map side join. 

Let’s see this with an example. 

Let’s consider we have two tables employee and employee_location.  employee is a huge table and employee_location is a small enough to fit in memory

By Default MapJoins are enabled and so if you you will join above two tables, mapjoin is going to happen. 

select * from emp join emp_location on emp.id == emp_location.id 

This is how DAG looks for above mapjoin. 

mapjoin
DAG for mapjoin (mapside join)

Now let’s disable the map join and see what happens when we try to join same two tables. 

set hive.auto.convert.join = false;
select * from emp join emp_location on emp.id == emp_location.id;

And this is how DAG looks like for common join or distributed join. 

Distributed join DAG.
DAG for distributed join

Parameters Affecting Mapjoin

Following are four parameters which affects the join.

hive.auto.convert.join

Whether Hive enables the optimization about converting common join into mapjoin based on the input file size.
Default Value: true

hive.mapjoin.smalltable.filesize

Applicable only if above parameter is set to  true. The threshold (in bytes) for the input file size of the small tables; if the file size is smaller than this threshold, it will try to convert the common join into map join.
Default Value: 25000000 (25 MB)

hive.auto.convert.join.noconditionaltask

Whether Hive enables the optimization about converting common join into mapjoin based on the input file size. If this parameter is on, and the sum of size for n-1 of the tables/partitions for an n-way join is smaller than the size specified by hive.auto.convert.join.noconditionaltask.size, the join is directly converted to a mapjoin (there is no conditional task).
Default Value: True

hive.auto.convert.join.noconditionaltask.size

If hive.auto.convert.join.noconditionaltask is off, this parameter does not take effect. However, if it is on, and the sum of size for n-1 of the tables/partitions for an n-way join is smaller than this size, the join is directly converted to a mapjoin (there is no conditional task).
Default Value: 10 MB

Leave a Comment

Update Hive Table

Hive is a append only database and so update and delete is not supported on hive external and managed table. 

From hive version 0.14 the have started a new feature called transactional.  Which allows to have ACID properties for a particular hive table and allows to delete and update. but let’s keep the transactional table for any other posts. 

Here let’s discuss how to update hive table which is not transaction, either external or managed ( External table couldn’t be transactional).

Chances are if you have tried to update the hive table, external or managed (non transactional), you might have got below errors, depends on your hive version.

 select * from temp.test_udpate;
+-----------------+-------------------+--+
| test_update.id  | test_update.name  |
+-----------------+-------------------+--+
| 1               | test user 1       |
| 2               | test user 2       |
| 2               | test user 3       |
+-----------------+-------------------+--+
delete from temp.test1 where id=1;
Error: Error while compiling statement: FAILED:
SemanticException [Error 10297]: Attempt to do update or
delete on table temp.test1 that does not use an
AcidOutputFormat or is not bucketed (state=42000,code=10297)

Then the question is how to update or delete a record in hive table?

Deleting Records in Hive Table

Deleting rerecords is easy,  you can use insert overwrite Syntax for this. Let’s says we want to delete a record from above hive table which has name as “test user 3”.  Then we need to select all the records which does not have name as “test user 3” and overwrite into same table. 

 
insert overwrite table temp.test_update \
select * from temp.test_update \ 
where name = "test user 3";

Update Records in Hive Table

updating the record consist of three steps as mentioned below. Let’s say in your test.update table we want to update the id to for all the records which has name as “test user 3”

Create a temporary table which has updated record

 
create temporary table temp.test \
as select 3 as id, name from temp.test_update \
where name="test user 3";

Delete the records which you want to update from original table

  insert overwrite table temp.test_update \
select * from temp.test_update \
where name="test user 3";

Insert the Updated Record(s)

 insert into table temp.test_update select * from temp.test;

3 Comments