Apache Pig is a platform for analyzing large data sets. Pig Latin is the high-level programming language that lets us specify a sequence of data transformations such as merging data sets, filtering them, grouping them, and applying functions to records or groups of records.

Pig comes with many built-in functions, and we can also create our own user-defined functions for special-purpose processing. Pig Latin programs are compiled into MapReduce jobs and executed in a distributed fashion on a Hadoop cluster. Let us first install Pig in our EdgeNode, and then check a simple Pig script execution.

Pig scripts can be executed in local mode or in Hadoop mode. We are going to install Pig in Hadoop mode, i.e. we will execute the Pig scripts as Hadoop (MapReduce) jobs, and for that we need access to a Hadoop cluster and an HDFS installation, which we already have.
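
For reference, the execution mode is selected with the -x flag when invoking Pig; the script name below is just a placeholder:

pig -x local myscript.pig        # run against the local file system
pig -x mapreduce myscript.pig    # run as MapReduce jobs on the Hadoop cluster (default)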

Install PIG

First we will log in to our EdgeNode via SSH. Before installing Pig, we already have a release of Hadoop installed and configured. We will install Pig under the same base directory, /usr/local, where our Hadoop binaries, configuration files and all other client tools already reside.

root@EdgeNode:~# cd /usr/local

Next, we will download a recent stable release of Pig from the Apache site below:
http://www-us.apache.org/dist/pig/pig-0.16.0/
Then follow the installation steps below:

root@EdgeNode:/usr/local# wget http://www-us.apache.org/dist/pig/pig-0.16.0/pig-0.16.0.tar.gz
root@EdgeNode:/usr/local# tar -xzvf pig-0.16.0.tar.gz >> /dev/null
root@EdgeNode:/usr/local# mv pig-0.16.0 /usr/local/pig
root@EdgeNode:/usr/local# rm pig-0.16.0.tar.gz

Next we will set the Pig environment variables in the .bashrc file. Append the lines below, then save and quit.

root@EdgeNode:/usr/local# vi ~/.bashrc


export PIG_HOME=/usr/local/pig
export PATH=$PATH:$PIG_HOME/bin

export CLASSPATH=$CLASSPATH:/usr/local/pig/lib/*:.
export PIG_CLASSPATH=$HADOOP_HOME/etc/hadoop

Source the environment file.

root@EdgeNode:/usr/local# source ~/.bashrc

Now let us validate that Pig is installed properly:

root@EdgeNode:/usr/local# cd $PIG_HOME/bin
root@EdgeNode:/usr/local/pig/bin# pig -version
Apache Pig version 0.16.0 (r1746530)
compiled Jun 01 2016, 23:10:49

Finally, we have installed and configured Pig. Now let us write a simple Pig Latin script and validate its execution.

Data Processing Task

We are going to use Pig as an ETL tool: extract data from a source, transform it according to our requirements, and finally load it into a datastore. We will perform the following data manipulation operations using Pig Latin (a generic sketch of these operators follows the list):

  • LOAD two sample datasets from the file system
  • FILTER tuples from one dataset
  • JOIN two datasets
  • FOREACH row, perform transformations on the data
  • STORE the results to the file system
  • GROUP the data into another output dataset
  • DUMP or display the cumulative results to screen
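
The full script for our data set appears later in this article; as a quick orientation, the generic shape of these operators is sketched below. The relation names, file paths and field names here are placeholders, not part of the actual example.

-- minimal sketch of the Pig Latin operators used in this article (placeholder names)
raw     = LOAD '/some/hdfs/input' USING PigStorage('|') AS (key:chararray, val:float);
lookup  = LOAD '/some/hdfs/lookup' USING PigStorage(',') AS (key:chararray, factor:float);
subset  = FILTER raw BY key != 'HEADER';
joined  = JOIN subset BY key, lookup BY key;
calc    = FOREACH joined GENERATE subset::key AS key, val * factor AS adjusted;
STORE calc INTO '/some/hdfs/out_detail' USING PigStorage(',');
grouped = GROUP calc BY key;
summary = FOREACH grouped GENERATE group AS key, SUM(calc.adjusted) AS total;
DUMP summary;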

Data Preparation

First let us prepare two dummy datasets as shown below. We will load two datasets into Pig: one is the Q1 profit of a company and the other is the currency exchange rate. We will perform a filter operation to exclude the newly operating company AUSTRALIA from the profit dataset. Next we will perform a JOIN, calculate the profit amount for all countries in terms of USD, and store the result. Additionally, we will add the load date (the current date) as a metadata column. Finally, we will generate a summary dataset per country showing the period start and end dates along with the total and average Q1 profit in USD.

Profit_Q1.txt

Country|Date|Profit|Currency
INDIA|2016-01-31|4500000|INR
US|2016-01-31|9000000|USD
SINGAPORE|2016-01-31|7000000|SGD
AUSTRALIA|2016-01-31||AUD
INDIA|2016-02-29|4900000|INR
US|2016-02-29|8900000|USD
SINGAPORE|2016-02-29|7100000|SGD
AUSTRALIA|2016-02-29||AUD
INDIA|2016-03-31|5000000|INR
US|2016-03-31|9100000|USD
SINGAPORE|2016-03-31|7200000|SGD
AUSTRALIA|2016-03-31||AUD
INDIA|2016-04-30|4800000|INR
US|2016-04-30|8900000|USD
SINGAPORE|2016-04-30|7300000|SGD
AUSTRALIA|2016-04-30||AUD
Exchange_Rate.csv

Exc_Date,From,To,Rate
2016-01-31,INR,USD,0.0147
2016-01-31,SGD,USD,0.702
2016-02-29,INR,USD,0.0146
2016-02-29,SGD,USD,0.711
2016-03-31,INR,USD,0.015
2016-03-31,SGD,USD,0.742
2016-04-30,INR,USD,0.015
2016-04-30,SGD,USD,0.7439

Let us first create this sample data on the local file system and then put the files into HDFS. Follow the commands below:

root@EdgeNode:/usr/local/pig/bin# cd ~

root@EdgeNode:~# echo "Country|Date|Profit|Currency
INDIA|2016-01-31|4500000|INR
US|2016-01-31|9000000|USD
SINGAPORE|2016-01-31|7000000|SGD
AUSTRALIA|2016-01-31||AUD
INDIA|2016-02-29|4900000|INR
US|2016-02-29|8900000|USD
SINGAPORE|2016-02-29|7100000|SGD
AUSTRALIA|2016-02-29||AUD
INDIA|2016-03-31|5000000|INR
US|2016-03-31|9100000|USD
SINGAPORE|2016-03-31|7200000|SGD
AUSTRALIA|2016-03-31||AUD
INDIA|2016-04-30|4800000|INR
US|2016-04-30|8900000|USD
SINGAPORE|2016-04-30|7300000|SGD
AUSTRALIA|2016-04-30||AUD" >> Profit_Q1.txt


root@EdgeNode:~# echo "Exc_Date,From,To,Rate
2016-01-31,INR,USD,0.0147
2016-01-31,SGD,USD,0.702
2016-02-29,INR,USD,0.0146
2016-02-29,SGD,USD,0.711
2016-03-31,INR,USD,0.015
2016-03-31,SGD,USD,0.742
2016-04-30,INR,USD,0.015
2016-04-30,SGD,USD,0.7439" >> Exchange_Rate.csv
root@EdgeNode:~# hadoop fs -mkdir /pig_analytics
root@EdgeNode:~# hadoop fs -copyFromLocal /root/Profit_Q1.txt /pig_analytics
root@EdgeNode:~# hadoop fs -copyFromLocal /root/Exchange_Rate.csv /pig_analytics

Pig Latin Script

Below is our Pig Latin Script to process the data.

root@EdgeNode:~# vi profit_analysis.pig
profit_analysis.pig

/* Script to Process Profit Analysis */

-- Extract Source Profit Data
in_profit = LOAD '/pig_analytics/Profit_Q1.txt' USING PigStorage('|') AS (country:chararray, date:chararray, profit:float, currency:chararray);

-- Filter Header & country not Australia
profit = FILTER in_profit BY (country != 'Country') AND (country != 'AUSTRALIA');

-- Extract Currency Exchange Data
in_xrate = LOAD '/pig_analytics/Exchange_Rate.csv' USING PigStorage(',') AS (exc_date:chararray, from_cur:chararray, to_cur:chararray, rate:float);

-- Filter Header
xrate = FILTER in_xrate BY (exc_date != 'Exc_Date');

-- Join dataset based on date & source currency
profit_xrate = JOIN profit BY (date, currency) LEFT OUTER, xrate BY (exc_date, from_cur);

-- Calculate conversion amount
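-- (positional references: $0-$3 come from 'profit' and $4-$7 from 'xrate';
--  the bincond ($7 IS NULL ? 1 : $7) defaults the rate to 1 when no matching
--  exchange rate is found, e.g. for profit already reported in USD)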
profit_rate = FOREACH profit_xrate GENERATE $0 AS country, $3 AS curr, $1 AS date, $2 AS profit_base, $2 * ($7 IS NULL ? 1: $7) AS profit_usd, ToString(CurrentTime(),'yyyy-MM-dd') AS load_date;

-- Load final detail data into HDFS
STORE profit_rate INTO '/pig_analytics/out_profit_Q1_dtl' USING PigStorage (',');

-- Group dataset by Country
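-- (GROUP produces one tuple per country: the key is named 'group' and the
--  matching rows of profit_rate are collected into a bag named 'profit_rate')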
profit_by_country = GROUP profit_rate BY country;

-- Perform Aggregate operations based on Groups
profit_country = FOREACH profit_by_country GENERATE group as country, MIN(profit_rate.date) AS st_date, MAX(profit_rate.date) AS end_date, SUM(profit_rate.profit_usd) AS total_profit, AVG(profit_rate.profit_usd) AS avg_profit, ToString(CurrentTime(),'yyyy-MM-dd') AS load_date;

-- Load final summary data into HDFS
STORE profit_country INTO '/pig_analytics/out_profit_Q1' USING PigStorage (',');
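
If we also want to display the summary on screen, as listed in the processing steps earlier, a DUMP statement can be appended at the end of the script to print the profit_country relation to the console:

DUMP profit_country;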

root@EdgeNode:~# pig
grunt> exec /root/profit_analysis.pig
grunt> quit
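
Alternatively, the script can be run in batch mode directly from the shell, without entering the Grunt shell:

root@EdgeNode:~# pig /root/profit_analysis.pig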

Let us validate the data loaded in HDFS.

root@EdgeNode:~# hadoop fs -ls -R /pig_analytics
root@EdgeNode:~# hadoop fs -cat /pig_analytics/out_profit_Q1_dtl/part-r-00000
root@EdgeNode:~# hadoop fs -cat /pig_analytics/out_profit_Q1/part-r-00000
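
Note that the exact part file name depends on how the final stage of each job ran (a map-only job writes part-m-00000 files, a reduce stage writes part-r-00000), and a large output may be split across several part files. If needed, all part files of an output directory can be merged into a single local file, for example:

root@EdgeNode:~# hadoop fs -getmerge /pig_analytics/out_profit_Q1 /root/out_profit_Q1.csv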

Next we will learn how to provision streaming data using Apache Flume. Let's get started.

