Install PIG In Client Node of Hadoop Cluster
Apache Pig is a platform for analyzing large data sets. Pig Latin is the high-level programming language that lets us specify a sequence of data transformations, such as merging data sets, filtering them, grouping them, and applying functions to records or groups of records.
Pig comes with many built-in functions, and we can also create our own user-defined functions for special-purpose processing. Pig Latin programs are compiled into Map/Reduce jobs and executed in a distributed fashion on a Hadoop cluster. Let us first install Pig on our EdgeNode; then we will validate the execution of a simple Pig script.
Pig scripts can be executed in local mode or in Hadoop mode. We are going to install PIG in Hadoop mode, i.e. we will execute Pig scripts in Hadoop (MapReduce) mode. For that we need access to a Hadoop cluster and an HDFS installation, which we already have.
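For reference, once Pig is installed the execution mode is selected with the -x option when invoking Pig; mapreduce is the default:
pig -x local       # run scripts against the local file system, useful for quick tests
pig -x mapreduce   # run scripts on the Hadoop cluster (the default mode)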
Install PIG
First we will log in to our EdgeNode over SSH.
We already have a release of Hadoop installed and configured. We will install PIG under the same base directory, /usr/local, where we keep our Hadoop binaries, configuration files, and all other client tools.
root@EdgeNode:~# cd /usr/local
Next, we will download a recent stable release of PIG from the Apache site below:
http://www-us.apache.org/dist/pig/pig-0.16.0/
Next, follow the installation steps below:
root@EdgeNode:/usr/local# wget http://www-us.apache.org/dist/pig/pig-0.16.0/pig-0.16.0.tar.gz
root@EdgeNode:/usr/local# tar -xzf pig-0.16.0.tar.gz
root@EdgeNode:/usr/local# mv pig-0.16.0 /usr/local/pig
root@EdgeNode:/usr/local# rm pig-0.16.0.tar.gz
Next we will set the PIG environment variables in the .bashrc file. Append the lines below, then save and quit.
root@EdgeNode:/usr/local# vi ~/.bashrc
export PIG_HOME=/usr/local/pig
export PATH=$PATH:$PIG_HOME/bin
export CLASSPATH=$CLASSPATH:/usr/local/pig/lib/*:.
export PIG_CLASSPATH=$HADOOP_HOME/etc/hadoop
Source the environment file.
root@EdgeNode:/usr/local# source ~/.bashrc
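To confirm the new environment is in effect, check that the pig executable now resolves from the updated PATH:
root@EdgeNode:/usr/local# which pig
/usr/local/pig/bin/pig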
Now let us validate that PIG is installed properly:
root@EdgeNode:/usr/local# cd $PIG_HOME/bin
root@EdgeNode:/usr/local/pig/bin# pig -version
Apache Pig version 0.16.0 (r1746530)
compiled Jun 01 2016, 23:10:49
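We can also open the interactive grunt shell and confirm that Pig can reach HDFS; grunt accepts fs commands that mirror hadoop fs:
root@EdgeNode:/usr/local/pig/bin# pig
grunt> fs -ls /
grunt> quit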
We have now installed and configured PIG. Next, let us write a simple Pig Latin script and validate its execution.
Data Processing Task
We are going to leverage Pig as an ETL tool: extract data from a source, transform it according to our requirements, and finally load it into a data store. We will perform the following data manipulation operations using Pig Latin:
- LOAD two sample datasets from the file system
- FILTER tuples from one dataset
- JOIN two datasets
- Perform transformations on each row with FOREACH
- STORE the results to the file system
- GROUP the data into another output dataset
- DUMP or display the cumulative results to screen
Data Preparation
First let us prepare two dummy datasets, shown below. We will load both into Pig: one contains the Q1 profit of a company, and the other contains currency exchange rates. We will filter out the newly operating company AUSTRALIA from the profit dataset. Next we will JOIN the two datasets, calculate the profit amount for all countries in USD, and store the result, adding the load date (the current date) as a metadata column. Finally we will generate a summary dataset per country, showing the date range along with the total and average Q1 profit, and display it on screen.
Profit_Q1.txt
Country|Date|Profit|Currency
INDIA|2016-01-31|4500000|INR
US|2016-01-31|9000000|USD
SINGAPORE|2016-01-31|7000000|SGD
AUSTRALIA|2016-01-31||AUD
INDIA|2016-02-29|4900000|INR
US|2016-02-29|8900000|USD
SINGAPORE|2016-02-29|7100000|SGD
AUSTRALIA|2016-02-29||AUD
INDIA|2016-03-31|5000000|INR
US|2016-03-31|9100000|USD
SINGAPORE|2016-03-31|7200000|SGD
AUSTRALIA|2016-03-31||AUD
INDIA|2016-04-30|4800000|INR
US|2016-04-30|8900000|USD
SINGAPORE|2016-04-30|7300000|SGD
AUSTRALIA|2016-04-30||AUD
Exchange_Rate.csv
Exc_Date,From,To,Rate
2016-01-31,INR,USD,0.0147
2016-01-31,SGD,USD,0.702
2016-02-29,INR,USD,0.0146
2016-02-29,SGD,USD,0.711
2016-03-31,INR,USD,0.015
2016-03-31,SGD,USD,0.742
2016-04-30,INR,USD,0.015
2016-04-30,SGD,USD,0.7439
Let's first create these sample data files on the local file system and then put them into HDFS. Follow the commands below:
root@EdgeNode:/usr/local/pig/bin# cd ~
root@EdgeNode:~# echo "Country|Date|Profit|Currency
INDIA|2016-01-31|4500000|INR
US|2016-01-31|9000000|USD
SINGAPORE|2016-01-31|7000000|SGD
AUSTRALIA|2016-01-31||AUD
INDIA|2016-02-29|4900000|INR
US|2016-02-29|8900000|USD
SINGAPORE|2016-02-29|7100000|SGD
AUSTRALIA|2016-02-29||AUD
INDIA|2016-03-31|5000000|INR
US|2016-03-31|9100000|USD
SINGAPORE|2016-03-31|7200000|SGD
AUSTRALIA|2016-03-31||AUD
INDIA|2016-04-30|4800000|INR
US|2016-04-30|8900000|USD
SINGAPORE|2016-04-30|7300000|SGD
AUSTRALIA|2016-04-30||AUD" >> Profit_Q1.txt
root@EdgeNode:~# echo "Exc_Date,From,To,Rate
2016-01-31,INR,USD,0.0147
2016-01-31,SGD,USD,0.702
2016-02-29,INR,USD,0.0146
2016-02-29,SGD,USD,0.711
2016-03-31,INR,USD,0.015
2016-03-31,SGD,USD,0.742
2016-04-30,INR,USD,0.015
2016-04-30,SGD,USD,0.7439" >> Exchange_Rate.csv
root@EdgeNode:~# hadoop fs -mkdir /pig_analytics
root@EdgeNode:~# hadoop fs -copyFromLocal /root/Profit_Q1.txt /pig_analytics
root@EdgeNode:~# hadoop fs -copyFromLocal /root/Exchange_Rate.csv /pig_analytics
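Let us verify that both files landed in HDFS before writing the script:
root@EdgeNode:~# hadoop fs -ls /pig_analytics
root@EdgeNode:~# hadoop fs -cat /pig_analytics/Profit_Q1.txt | head -3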
Pig Latin Script
root@EdgeNode:~# vi profit_analysis.pig
/* Script to Process Profit Analysis */
-- Extract Source Profit Data
in_profit = LOAD '/pig_analytics/Profit_Q1.txt' USING PigStorage('|') AS (country:chararray, date:chararray, profit:float, currency:chararray);
-- Filter out the header row and the AUSTRALIA records
profit = FILTER in_profit BY (country != 'Country') AND (country != 'AUSTRALIA');
-- Extract Currency Exchange Data
in_xrate = LOAD '/pig_analytics/Exchange_Rate.csv' USING PigStorage(',') AS (exc_date:chararray, from_cur:chararray, to_cur:chararray, rate:float);
-- Filter out the header row
xrate = FILTER in_xrate BY (exc_date != 'Exc_Date');
-- Left outer join on date & source currency (USD rows have no match in the exchange-rate data)
profit_xrate = JOIN profit BY (date, currency) LEFT OUTER, xrate BY (exc_date, from_cur);
-- Calculate the USD amount; when the rate is NULL (USD rows) default it to 1
profit_rate = FOREACH profit_xrate GENERATE $0 AS country, $3 AS curr, $1 AS date, $2 AS profit_base, $2 * ($7 IS NULL ? 1: $7) AS profit_usd, ToString(CurrentTime(),'yyyy-MM-dd') AS load_date;
-- Load final detail data into HDFS
STORE profit_rate INTO '/pig_analytics/out_profit_Q1_dtl' USING PigStorage(',');
-- Group dataset by Country
profit_by_country = GROUP profit_rate BY country;
-- Perform Aggregate operations based on Groups
profit_country = FOREACH profit_by_country GENERATE group AS country, MIN(profit_rate.date) AS st_date, MAX(profit_rate.date) AS end_date, SUM(profit_rate.profit_usd) AS total_profit, AVG(profit_rate.profit_usd) AS avg_profit, ToString(CurrentTime(),'yyyy-MM-dd') AS load_date;
-- Load final summary data into HDFS
STORE profit_country INTO '/pig_analytics/out_profit_Q1' USING PigStorage(',');
-- Display the summary results on screen
DUMP profit_country;
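While developing a script like this, the grunt shell is handy for inspecting intermediate aliases; DESCRIBE prints the schema Pig has inferred. Assuming the LOAD and FILTER statements above have already been entered in the same grunt session, the JOIN schema would look roughly like this (fields from the two inputs are disambiguated with the :: prefix, which is why the FOREACH above addresses them positionally as $0 to $7):
grunt> profit_xrate = JOIN profit BY (date, currency) LEFT OUTER, xrate BY (exc_date, from_cur);
grunt> DESCRIBE profit_xrate;
profit_xrate: {profit::country: chararray, profit::date: chararray, profit::profit: float, profit::currency: chararray, xrate::exc_date: chararray, xrate::from_cur: chararray, xrate::to_cur: chararray, xrate::rate: float}
Now let us execute the full script through the grunt shell: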
root@EdgeNode:~# pig
grunt> exec /root/profit_analysis.pig
grunt> quit
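Alternatively, the same script can be run in batch mode straight from the shell, without opening grunt:
root@EdgeNode:~# pig /root/profit_analysis.pig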
Let us validate the data loaded in HDFS.
root@EdgeNode:~# hadoop fs -ls -R /pig_analytics
root@EdgeNode:~# hadoop fs -cat /pig_analytics/out_profit_Q1_dtl/part-r-00000
root@EdgeNode:~# hadoop fs -cat /pig_analytics/out_profit_Q1/part-r-00000
Next we will learn how to ingest streaming data using Apache Flume. Let's get started.