Category: Hadoop

Distcp to Copy your HDFS data to GCP Cloud Storage

A while back, I found myself deeply immersed in a Hadoop migration project where our cloud platform of choice was Google Cloud Platform (GCP). Our mission? To seamlessly transition data from on-premises infrastructure to the cloud. Due to various constraints, utilizing hardware wasn’t a viable option. Thus, I embarked on a quest to explore multiple software solutions to tackle this challenge.

For one-off migrations, Spark emerged as a favorable choice. It facilitated direct data migration to BigQuery, bypassing the intermediary step of storing it in cloud storage. However, there was a caveat: Spark lacked the ability to detect changes, necessitating a full refresh each time. This approach proved less than ideal, especially when dealing with substantial datasets.

My gaze then turned to Cloudera BDR, but it didn’t support integration with Google Cloud. Left with no alternative, I turned to DistCp. In this blog post, I’ll walk through the DistCp setup for transferring data from an on-prem HDFS cluster to Google Cloud Storage.

Service Account Setup

To begin, create a GCP service account with read/write permissions on the target Google Cloud Storage bucket, and download the JSON key associated with it. This key needs to be distributed to every node involved in the migration. In my case, I stored it at /tmp/sa-datamigonpremtobigquery.json. Also make sure that the user who will run the distcp command has read access to this path.
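Before running the transfer, it can help to confirm on each node that the key file is readable by the current user and looks like a service account key. The following is only a minimal sketch and not part of the original setup: the function name check_keyfile is hypothetical, and the fields it checks ("type", "client_email", "private_key") are the ones normally present in a downloaded service account JSON key.

```python
import json
import os


def check_keyfile(path):
    """Return a list of problems found with a service account key file."""
    if not os.path.isfile(path):
        return ["%s does not exist" % path]
    if not os.access(path, os.R_OK):
        return ["%s is not readable by the current user" % path]
    try:
        with open(path) as handle:
            key = json.load(handle)
    except ValueError:
        return ["%s is not valid JSON" % path]
    if not isinstance(key, dict):
        return ["%s does not contain a JSON object" % path]

    problems = []
    # Fields normally present in a downloaded service account key (assumption).
    for field in ("type", "client_email", "private_key"):
        if field not in key:
            problems.append("missing field: %s" % field)
    if key.get("type") != "service_account":
        problems.append('"type" is not "service_account"')
    return problems


# An empty list means no problems were found.
print(check_keyfile("/tmp/sa-datamigonpremtobigquery.json"))
```

Run it as the same user that will launch distcp, since the readability check depends on who executes it.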

hdfs.conf

Store the following file on the edge node in your home directory, and replace the value of fs.gs.project.id with your project ID.

<configuration>
  <property>
    <name>fs.AbstractFileSystem.gs.impl</name>
    <value>com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS</value>
    <description>The AbstractFileSystem for 'gs:' URIs.</description>
  </property>
  <property>
    <name>fs.gs.project.id</name>
    <value>raw-bucket</value>
    <description>
      Optional. Google Cloud Project ID with access to GCS buckets.
      Required only for list buckets and create bucket operations.
    </description>
  </property>
  <property>
    <name>google.cloud.auth.type</name>
    <value>SERVICE_ACCOUNT_JSON_KEYFILE</value>
    <description>
      Authentication type to use for GCS access.
    </description>
  </property>
  <property>
    <name>google.cloud.auth.service.account.json.keyfile</name>
    <value>/tmp/sa-datamigonpremtobigquery.json</value>
    <description>
      The JSON keyfile of the service account used for GCS
      access when google.cloud.auth.type is SERVICE_ACCOUNT_JSON_KEYFILE.
    </description>
  </property>

  <property>
    <name>fs.gs.checksum.type</name>
    <value>CRC32C</value>
    <description>
          https://cloud.google.com/architecture/hadoop/validating-data-transfers
  </description>
  </property>

  <property>
    <name>dfs.checksum.combine.mode</name>
    <value>COMPOSITE_CRC</value>
    <description>
          https://cloud.google.com/architecture/hadoop/validating-data-transfers
  </description>
  </property>
</configuration>

Executing Transfer

hadoop --debug distcp --conf hdfs.conf -pc -update -v -log hdfs:///tmp/distcp_log hdfs:///tmp/ gs://raw-bucket/ 

Spark – How to rename multiple columns in DataFrame

In the last post, we showed how to apply a function to multiple columns. If you have done that, you probably have several new columns holding the desired data, and you may want to rename them back to the original names.

Let’s say you have the following DataFrame and want to rename all of its columns.

>>> df.printSchema()
root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- joining_dt: date (nullable = true)

The first thing you need is a map from old names to new names, plus a little functional programming.

How to rename multiple columns in Pyspark

from pyspark.sql.functions import col
col_rename = {"age":"new_age", "name":"new_name", "joining_dt":"new_joining_dt"}
df_with_col_renamed = df.select([col(c).alias(col_rename.get(c,c)) for c in df.columns])
>>> df_with_col_renamed.printSchema()
root
 |-- new_name: string (nullable = true)
 |-- new_age: integer (nullable = true)
 |-- new_joining_dt: date (nullable = true)
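
The key detail in the PySpark snippet is col_rename.get(c, c): a column with no entry in the map keeps its current name. The sketch below shows just that lookup in plain Python, without Spark. The extra column "dept" is hypothetical and added only to show the fallback.

```python
col_rename = {"age": "new_age", "name": "new_name", "joining_dt": "new_joining_dt"}

# "dept" is a hypothetical extra column with no entry in the map.
columns = ["name", "age", "joining_dt", "dept"]

# dict.get(key, default) returns the default when the key is absent,
# so unmapped columns pass through unchanged.
new_columns = [col_rename.get(c, c) for c in columns]
print(new_columns)
```

This means the map only has to list the columns you actually want to rename.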

How to rename multiple columns in spark using Scala

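import org.apache.spark.sql.functions.col

// Map of old column names to new ones; unmapped columns keep their name.
val colToRename = Map("age" -> "new_age",
                      "name" -> "new_name",
                      "joining_dt" -> "new_joining_dt")
val newDf = df.select(
  df.columns.map { oldName =>
    col(oldName).alias(colToRename.getOrElse(oldName, oldName))
  }: _*)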

Spark – How to apply a function to multiple columns on DataFrame?

Let’s say you have a Spark DataFrame and you want to apply a function to multiple columns. One way is to call withColumn multiple times. However, that only works well when you have a few columns and know their names in advance. Otherwise, it’s tedious and error-prone.

So let’s see how to do that

val df=List(("$100", "$90", "$10")).toDF("selling_price", "market_price", "profit")
+-------------+------------+------+
|selling_price|market_price|profit|
+-------------+------------+------+
|         $100|         $90|   $10|
+-------------+------------+------+

Suppose you have a Spark DataFrame like the one above, but with more than 50 such columns, and you want to remove the $ character and convert the data type to decimal. Rather than writing 50 lines of code, you can do it with a fold whose body takes fewer than 5 lines.

First, create a list holding each new column name (yes, you need a new column name) and the expression you want to apply. I just appended _new to the existing column name so it’s easier to rename later.
Next, use the foldLeft method to apply each entry of that list to the DataFrame in turn, with every step building on the result of the previous one.

import scala.collection.mutable.ListBuffer
import org.apache.spark.sql.functions.{col, lit, regexp_replace}
import org.apache.spark.sql.types.DataTypes._

val df = List(("$100", "$90", "$10")).toDF("selling_price", "market_price", "profit")
df.show

// Collect one (new column name, expression) pair per existing column.
val operations = ListBuffer[(String, org.apache.spark.sql.Column)]()
val colNames = df.columns
val decimalType = createDecimalType(10, 4)
colNames.foreach { colName =>
  // The pattern is a regex, so the literal "$" must be escaped as "\\$".
  val operation = (s"${colName}_new", regexp_replace(col(colName), lit("\\$"), lit("")).cast(decimalType))
  operations += operation
}

// Each fold step adds one new column to the DataFrame produced by the previous step.
val dfWithNewColumns = operations.foldLeft(df) { (tempDF, listValue) =>
  tempDF.withColumn(listValue._1, listValue._2)
}

dfWithNewColumns.show

Let’s see if that worked.

 
scala> dfWithNewColumns.printSchema
root
 |-- selling_price: string (nullable = true)
 |-- market_price: string (nullable = true)
 |-- profit: string (nullable = true)
 |-- selling_price_new: decimal(10,4) (nullable = true)
 |-- market_price_new: decimal(10,4) (nullable = true)
 |-- profit_new: decimal(10,4) (nullable = true)


scala> dfWithNewColumns.show
+-------------+------------+------+-----------------+----------------+----------+
|selling_price|market_price|profit|selling_price_new|market_price_new|profit_new|
+-------------+------------+------+-----------------+----------------+----------+
|         $100|         $90|   $10|         100.0000|         90.0000|   10.0000|
+-------------+------------+------+-----------------+----------------+----------+
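
For readers more comfortable with Python, the same fold pattern can be sketched with functools.reduce, which plays the role of foldLeft. This is an illustration on a single plain dictionary rather than a DataFrame, so the names row and add_column are hypothetical and no Spark is involved.

```python
from decimal import Decimal
from functools import reduce

# One record standing in for the DataFrame row shown above.
row = {"selling_price": "$100", "market_price": "$90", "profit": "$10"}

# One (new column name, source column) pair per existing column.
operations = [("%s_new" % name, name) for name in row]


def add_column(acc, operation):
    """Return a copy of acc with one extra cleaned-up decimal column."""
    new_name, source = operation
    value = Decimal(acc[source].replace("$", "")).quantize(Decimal("0.0001"))
    return {**acc, new_name: value}


# reduce threads the accumulated record through every operation,
# just as foldLeft threads the DataFrame through every withColumn call.
row_with_new_columns = reduce(add_column, operations, row)
print(row_with_new_columns)
```

With PySpark, the same shape would apply withColumn inside the reducing function instead of building a dictionary.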


Hive Lateral view explode vs posexplode

Lateral view Explode

Lateral view explode expands array data into multiple rows. For example, let’s say our table looks like this, where phone_numbers and cities are arrays of strings.

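+----+--------------------------------+---------------------------+
|name|phone_numbers                   |cities                     |
+----+--------------------------------+---------------------------+
|AAA |["365-889-1234", "365-887-2232"]|["Hamilton", "Burlington"] |
|BBB |["232-998-3232", "878-998-2232"]|["Toronto", "Stoney Creek"]|
+----+--------------------------------+---------------------------+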

Applying lateral view explode to both phone_numbers and cities expands each array and cross joins the results, so your final table will look like this.

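+----+-------------+------------+
|name|phone_numbers|cities      |
+----+-------------+------------+
|AAA |365-889-1234 |Hamilton    |
|AAA |365-887-2232 |Hamilton    |
|AAA |365-889-1234 |Burlington  |
|AAA |365-887-2232 |Burlington  |
|BBB |232-998-3232 |Toronto     |
|BBB |878-998-2232 |Toronto     |
|BBB |232-998-3232 |Stoney Creek|
|BBB |878-998-2232 |Stoney Creek|
+----+-------------+------------+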

Lateral View POSExplode

However, this is probably not what you want. If you want to map the first phone number to the first city, the second to the second, and so on for every record, you can use posexplode (positional explode).

posexplode gives you an index along with the value when it expands an array, and you can then use these indexes to match values with each other, as shown below.

select 
    name, 
    phone_number, 
    city 
from temp.test_laterla_view_posexplode
lateral view posexplode(phone_numbers) pn as pos_phone, phone_number
lateral view posexplode(cities) ct as pos_city, city 
where 
    pos_phone == pos_city

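With the above query you will get the following results, where each phone number is mapped to the corresponding city.

+----+------------+------------+
|name|phone_number|city        |
+----+------------+------------+
|AAA |365-889-1234|Hamilton    |
|AAA |365-887-2232|Burlington  |
|BBB |232-998-3232|Toronto     |
|BBB |878-998-2232|Stoney Creek|
+----+------------+------------+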
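
The difference between the two approaches can be sketched in plain Python: two explodes behave like a cross product of the arrays, while two posexplodes filtered on equal positions behave like pairing elements by index. This is only an illustration of the semantics, not how Hive executes the query, and the function names are hypothetical.

```python
from itertools import product

rows = [
    {"name": "AAA",
     "phone_numbers": ["365-889-1234", "365-887-2232"],
     "cities": ["Hamilton", "Burlington"]},
    {"name": "BBB",
     "phone_numbers": ["232-998-3232", "878-998-2232"],
     "cities": ["Toronto", "Stoney Creek"]},
]


def explode_both(records):
    """Two lateral view explodes: every phone number paired with every city."""
    return [
        (r["name"], phone, city)
        for r in records
        for phone, city in product(r["phone_numbers"], r["cities"])
    ]


def posexplode_matched(records):
    """Two lateral view posexplodes, keeping only rows with equal positions."""
    return [
        (r["name"], phone, city)
        for r in records
        for pos_phone, phone in enumerate(r["phone_numbers"])
        for pos_city, city in enumerate(r["cities"])
        if pos_phone == pos_city
    ]


print(len(explode_both(rows)))
for record in posexplode_matched(rows):
    print(record)
```

The cross product yields 8 rows for this data, while the position filter keeps only the 4 matched pairs.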

When to use lateral view explode in hive

If you have a table with one or more columns of array data type and you want to expand them into multiple rows, you can use the lateral view explode function.

Let’s say we have the following table, where one employee has multiple phone numbers stored as an array (list).

+--------+------------------------------------------------+
|emp_name|phone_numbers                                   |
+--------+------------------------------------------------+
|user1   |["546-487-3384", "383-767-2238"]                |
|user2   |["373-384-1192", "374-282-1289", "332-453-5566"]|
+--------+------------------------------------------------+

To convert this array (list) into multiple rows in the output, we can use the lateral view explode function as shown below.

select emp_name, phone_number 
from 
    temp.test_laterla_view_explode
lateral view explode(phone_numbers) p as phone_number

This will generate the following output.

+--------+------------+
|emp_name|phone_number|
+--------+------------+
|user2   |373-384-1192|
|user2   |374-282-1289|
|user2   |332-453-5566|
|user1   |546-487-3384|
|user1   |383-767-2238|
+--------+------------+

Hive – Convert JSON to complex Data Type

If you have a small (not complex) JSON file and need to create a corresponding Hive table, it’s easy.

{
	"country":"Switzerland",
	"languages":["German","French","Italian"],
	"religions":
		{
			"catholic":[10,20],
			"protestant":[40,50]
		}
}

However, that’s hardly the case in real life. We often get JSON files with hundreds of nested fields, and manually translating them into a Hive table definition is a tedious task.

To ease the work, you can get help from Spark. Don’t worry, it’s just two lines of code 🙂

First, put your file in an HDFS location.

hdfs dfs -put sample.json /tmp/

Fetch Schema for Hive Table 

>>> df = spark.read.json("/tmp/sample.json")
>>> df
DataFrame[country: string, languages: array<string>, religions: struct<catholic:array<bigint>,protestant:array<bigint>>]

Hive table

Your final Hive table will look like this, with minor modifications to the schema and with the JSON SerDe and other properties added.

CREATE TABLE temp.test_json(
	  country string, 
	  languages array<string>, 
	  religions struct<catholic:array<bigint>,protestant:array<bigint>>)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
STORED AS TEXTFILE
location '/tmp/test_json/table/'

If you’d rather not modify the schema by hand, you can instead save the DataFrame as a Hive table and get the schema from that.

df.write.saveAsTable("temp.test_json")

Then run the following in Hive:

show create table temp.test_json
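
To make the mapping from JSON values to Hive types concrete, here is a rough plain-Python sketch that derives column types from one sample record. It is not Spark's schema inference and is far less thorough: the function name hive_type is hypothetical, arrays are assumed to be non-empty and homogeneous, and integers are mapped to bigint to match the Spark output shown earlier.

```python
import json


def hive_type(value):
    """Map a parsed JSON value to a Hive type string (simplified)."""
    if isinstance(value, bool):
        return "boolean"
    if isinstance(value, int):
        return "bigint"
    if isinstance(value, float):
        return "double"
    if isinstance(value, str):
        return "string"
    if isinstance(value, list):
        # Assumption: the first element is representative of the whole array.
        return "array<%s>" % (hive_type(value[0]) if value else "string")
    if isinstance(value, dict):
        fields = ",".join("%s:%s" % (k, hive_type(v)) for k, v in value.items())
        return "struct<%s>" % fields
    # null and anything unexpected fall back to string.
    return "string"


sample = json.loads("""
{
  "country": "Switzerland",
  "languages": ["German", "French", "Italian"],
  "religions": {"catholic": [10, 20], "protestant": [40, 50]}
}
""")

columns = ",\n".join("  %s %s" % (name, hive_type(value)) for name, value in sample.items())
print(columns)
```

For real files with hundreds of nested fields, letting Spark infer the schema as described above remains the more reliable route, since it looks at every record rather than one sample.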

Hive Data

Either way, this is how the data looks.


How to Access Hive With Python script?

You can read Hive tables from Python using the PyHive library.

Install PyHive library

pip install pyhive

Connect to Hive using LDAP

from pyhive import hive
connection = hive.connect(host='HIVE_HOST',
                          port=10000,
                          database='temp',
                          username='HIVE_USERNAME',
                          password='HIVE_PASSWORD',
                          auth='CUSTOM')	

Connect to Hive using Kerberos

from pyhive import hive
connection = hive.connect(host='HIVE_HOST',
                          port=10000,
                          database='temp',
                          username='HIVE_USERNAME',
                          auth='KERBEROS',
                          kerberos_service_name='hive')	

To connect using Kerberos, you don’t need to supply a password. However, you do need to provide the Kerberos service name.

Execute hive Query using PyHive

query="select * from temp.test_table"
cur = connection.cursor()
cur.execute(query)
res = cur.fetchall()
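
fetchall returns plain tuples, so the column names are lost. Assuming the PyHive cursor exposes description as the Python DB-API specifies, a small helper can pair each row with its column names. The sketch below uses an in-memory sqlite3 connection purely as a stand-in so that it runs without a Hive server; rows_as_dicts is a hypothetical helper name.

```python
import sqlite3


def rows_as_dicts(cursor):
    """Pair each fetched row with the column names from cursor.description."""
    names = [column[0] for column in cursor.description]
    return [dict(zip(names, row)) for row in cursor.fetchall()]


# Stand-in connection; with Hive you would pass the PyHive cursor instead.
demo_connection = sqlite3.connect(":memory:")
demo_cur = demo_connection.cursor()
demo_cur.execute("create table test_table (id integer, name text)")
demo_cur.execute("insert into test_table values (1, 'a')")
demo_cur.execute("insert into test_table values (2, 'b')")
demo_cur.execute("select * from test_table order by id")

records = rows_as_dicts(demo_cur)
print(records)
```

With a Hive cursor, the column names may come back prefixed with the table name, depending on the server configuration.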


Import Data from Netezza to Hive using sqoop

The following is the syntax for importing data from Netezza to Hive.

sqoop import \
--connect jdbc:netezza://<host>:<port>/<database> \
--username=<USERNAME> \
--password=<password> \
--table <netezza_table> \
--hcatalog-database <hive_database> \
--hcatalog-table <hive_table> \
-m 1

Provide the username in all caps; otherwise, it will throw an authentication error.

Create Hive table

If you don’t have the corresponding Hive table, you can use the following option, which creates the Hive table if it doesn’t exist.

--create-hcatalog-table 

Changing the Hive table format and properties during Sqoop import

If you want to change the format of the Hive table, you can do so with the following option.

--hcatalog-storage-stanza \
'stored as orc tblproperties ("orc.compress"="SNAPPY")'

NiFi API to filter Processor Groups

Recently I was working on NiFi and realized that our dev instance was running too slowly because developers had forgotten to clean up their work.

So I used the NiFi API and wrote some Python code to identify processor groups that we could delete. I will walk through the code here.

How to get API token?

To access a secured NiFi endpoint, you need an API token, which you then send in the header of every request.

The following code takes the username and password and, if the request succeeds, returns the access token for the given host.

import sys

import requests


def get_token(username, password, host):
    url = "https://%s:8080/nifi-api/access/token" % host
    header = {"Content-Type": "application/x-www-form-urlencoded;charset=UTF-8"}
    data = {"username": username, "password": password}

    # verify=False skips TLS certificate verification.
    resp = requests.post(url, data=data, headers=header, verify=False)

    if resp.status_code not in (200, 201):
        print(resp.reason)
        print(resp.text)
        sys.exit(1)
    # The response body is the access token itself.
    return resp.text

How to Find Stopped Processor Group?

The following code finds all the processor groups that don’t have any processors running.

import requests

import utils  # the author's own module; ACCESS_TOKEN is assumed to hold the token from get_token


def find_stopped_processor(group_id, host):
    url = "https://%s:8080/nifi-api/process-groups/%s" % (host, group_id)
    header = {"Authorization": "Bearer %s" % utils.ACCESS_TOKEN}
    r = requests.get(url, headers=header, verify=False)
    resp = r.json()
    running_count = resp.get("runningCount")
    # Print the id and name of any group with no running processors.
    if int(running_count) == 0:
        print("%s,%s" % (group_id, resp.get("status").get("name")))


def find_processor_group_stopped_processor(parent_processor, host):
    for processor_group in parent_processor:
        p = processor_group.get("processGroupStatusSnapshot")
        if len(p.get("processGroupStatusSnapshots")) > 0:
            # Recurse into child groups; host must be passed along.
            find_processor_group_stopped_processor(p.get("processGroupStatusSnapshots"), host)
        else:
            find_stopped_processor(p.get("id"), host)
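
The recursion above can be tried without a NiFi instance. The sketch below walks the same nested structure (processGroupStatusSnapshot and processGroupStatusSnapshots, as used in the code above) and collects the ids of groups that have no child groups, which are the ones the original code goes on to check. The sample payload is hypothetical and heavily trimmed, and leaf_group_ids is an illustrative name.

```python
def leaf_group_ids(groups):
    """Collect the ids of process groups that contain no child groups."""
    ids = []
    for group in groups:
        snapshot = group.get("processGroupStatusSnapshot")
        children = snapshot.get("processGroupStatusSnapshots") or []
        if children:
            # Descend into nested groups, as the recursive function does.
            ids.extend(leaf_group_ids(children))
        else:
            ids.append(snapshot.get("id"))
    return ids


# Hypothetical, trimmed-down status payload.
sample = [
    {"processGroupStatusSnapshot": {
        "id": "g1",
        "processGroupStatusSnapshots": [
            {"processGroupStatusSnapshot": {"id": "g1-a", "processGroupStatusSnapshots": []}},
            {"processGroupStatusSnapshot": {"id": "g1-b", "processGroupStatusSnapshots": []}},
        ],
    }},
    {"processGroupStatusSnapshot": {"id": "g2", "processGroupStatusSnapshots": []}},
]

print(leaf_group_ids(sample))
```

Note that, like the original, this only reports groups at the bottom of the hierarchy; a parent group such as "g1" is never checked itself.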


What is Predicate PushDown in Hive?

Predicate pushdown in Hive is a feature that pushes your predicate (the WHERE condition) closer to where the data is read, so that the expression is evaluated as early as possible in the plan.

Let’s try to understand this with an example. Suppose we have two tables, product and sales, and we want to answer the following question.

How many products of brand Washington have been sold so far?

Non-Optimized Query

The following query answers the above question. However, if you are familiar with SQL, you will notice that it is not optimized: it first joins the two tables and only then applies the condition (predicate).

select sum(s.unit_sales) from foodmart.product p 
join 
	foodmart.sales_fact_dec_1998 s 
on 
	p.product_id = s.product_id
where 
	p.brand_name = "Washington"

Optimized Query

We could easily optimize the above query by applying the condition to the product table first and then joining the result to the sales table, as shown below.

SELECT sum(s.unit_sales)
FROM foodmart.sales_fact_dec_1998 s
JOIN (
	SELECT product_id, brand_name
	FROM foodmart.product
	WHERE 
		brand_name = "Washington"
	) p
ON 
	p.product_id = s.product_id

This is what PPD (predicate pushdown) does internally. If you have PPD enabled, your first query is automatically converted into the second, optimized query.
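
The effect can be illustrated outside Hive with a few lines of plain Python: filtering before the join gives the same answer while feeding fewer rows into the join. The data below is made up for illustration and has nothing to do with the foodmart tables beyond the column names.

```python
# Hypothetical miniature versions of the product and sales tables.
products = [
    {"product_id": 1, "brand_name": "Washington"},
    {"product_id": 2, "brand_name": "Other"},
    {"product_id": 3, "brand_name": "Washington"},
]
sales = [
    {"product_id": 1, "unit_sales": 2},
    {"product_id": 2, "unit_sales": 5},
    {"product_id": 3, "unit_sales": 1},
    {"product_id": 1, "unit_sales": 4},
]


def join(product_rows, sales_rows):
    """Naive inner join on product_id."""
    return [
        {**p, **s}
        for p in product_rows
        for s in sales_rows
        if p["product_id"] == s["product_id"]
    ]


# Join first, filter afterwards: all 3 product rows enter the join.
joined = join(products, sales)
late_total = sum(r["unit_sales"] for r in joined if r["brand_name"] == "Washington")

# Filter first (the pushed-down predicate): only 2 product rows enter the join.
washington = [p for p in products if p["brand_name"] == "Washington"]
early_total = sum(r["unit_sales"] for r in join(washington, sales))

print(late_total, early_total, len(products), len(washington))
```

Both totals are identical; the only difference is how many product rows take part in the join, which is exactly what the DAGs in the next sections show.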

Let’s see this in action. The product table has 1560 rows (products) in total, of which only 11 have the brand name Washington.

To make the plans easier to follow, I have disabled vectorization. If you are not sure what vectorization is, please read the following blog post – What is vectorization?

Running Query with PPD Disabled

The following is the DAG of the first query with PPD disabled.
To disable PPD, set the following parameter to false.

set hive.optimize.ppd=false;

If you look at the DAG, it reads all rows from the product table and then passes them to the reducer for the join.

Figure: DAG for the first query when PPD (predicate pushdown) is disabled.

Running Query with PPD Enabled

And the following is the DAG of the same query with PPD enabled.
To enable PPD, set the following parameter to true.

set hive.optimize.ppd=true;

Once we enable PPD, it first applies the condition on the product table and sends only 11 rows to the reducer for the join.

Figure: DAG for the first query when PPD (predicate pushdown) is enabled.
