In this article we will load our first fact table, sales transactions, into the Hive warehouse.

Load Sales Fact Table

Using Sqoop we will load the sales data, both the initial/base and the incremental datasets, from the source MySQL database to HDFS.

ssh edw_user@sandbox-hdp.hortonworks.com -p 2222
sqoop job --meta-connect "jdbc:hsqldb:hsql://sandbox-hdp.hortonworks.com:16001/sqoop" \
--create jb_stg_sales \
-- import \
--bindir ./ \
--driver com.mysql.jdbc.Driver \
--connect jdbc:mysql://sandbox-hdp.hortonworks.com:3306/sales \
--username root \
--password-file /user/edw_user/sales/.password \
--table sales \
--fetch-size 1000 \
--as-textfile \
--fields-terminated-by '|' \
--target-dir /user/edw_user/sales/staging/sales \
--incremental append \
--check-column id \
--split-by id \
--num-mappers 2
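Note that the --password-file option expects the MySQL password to be available in a file on HDFS. This file should already be in place from the earlier parts of this series; if it is not, it can be created along the following lines (the password value shown is a placeholder for your actual MySQL password):

echo -n "<mysql_password>" | hdfs dfs -put - /user/edw_user/sales/.password
hdfs dfs -chmod 400 /user/edw_user/sales/.password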

List and verify the Sqoop job, then execute it to perform the initial extraction from the source database to HDFS.

sqoop job --meta-connect "jdbc:hsqldb:hsql://sandbox-hdp.hortonworks.com:16001/sqoop" --list
sqoop job --meta-connect "jdbc:hsqldb:hsql://sandbox-hdp.hortonworks.com:16001/sqoop" --show jb_stg_sales
sqoop job --meta-connect "jdbc:hsqldb:hsql://sandbox-hdp.hortonworks.com:16001/sqoop" --exec jb_stg_sales
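Optionally, verify that the extracted data files have landed in the HDFS staging directory (the part file name below is indicative and may differ in your environment):

hdfs dfs -ls /user/edw_user/sales/staging/sales
hdfs dfs -cat /user/edw_user/sales/staging/sales/part-m-00000 | head -5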

Now we will define a Hive external table for the sales staging data as well as the final Hive managed ORC fact table. Connect to the Beeline CLI using edw_user as the username and hadoop as the password. We will connect to the Hive schema 'sales_analytics'.

One-time setup

beeline
!connect jdbc:hive2://sandbox-hdp.hortonworks.com:10000/sales_analytics edw_user

CREATE EXTERNAL TABLE IF NOT EXISTS ext_sales (
id INT,
order_number VARCHAR(50),
customer_id INT,
showroom_id INT,
product_id INT,
quantity VARCHAR(50),
discount INT,
amount INT,
delivered VARCHAR(50),
card_type VARCHAR(50),
card_number VARCHAR(50),
txn_date DATE,
update_date TIMESTAMP,
create_date TIMESTAMP
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|'
STORED AS TEXTFILE
LOCATION '/user/edw_user/sales/staging/sales';

SELECT * FROM ext_sales LIMIT 10;

Now we need to create an intermediate table to hold the transformed data from staging, in which the product natural keys are replaced with the product surrogate keys.

CREATE TABLE IF NOT EXISTS stg_sales (
id INT,
order_number VARCHAR(50),
customer_id INT,
showroom_id INT,
product_key INT,
quantity VARCHAR(50),
discount INT,
amount INT,
net_amount INT,
delivered VARCHAR(50),
card_type VARCHAR(50),
card_number VARCHAR(50),
update_date TIMESTAMP,
create_date TIMESTAMP,
txn_date DATE
)
STORED AS ORC 
TBLPROPERTIES ("orc.compress"="SNAPPY");

Next we will define the Hive managed ORC fact table, partitioned by transaction date.

CREATE TABLE IF NOT EXISTS fact_sales (
id INT,
order_number VARCHAR(50),
customer_id INT,
showroom_id INT,
product_key INT,
quantity VARCHAR(50),
discount INT,
amount INT,
net_amount INT,
delivered VARCHAR(50),
card_type VARCHAR(50),
card_number VARCHAR(50),
update_date TIMESTAMP,
create_date TIMESTAMP
)
PARTITIONED BY (txn_date DATE)
STORED AS ORC 
TBLPROPERTIES ("orc.compress"="SNAPPY");

!quit

Now we will define a Pig script to replace the product natural key with the surrogate key and load the transformed sales data into the intermediate staging table. This script will later be used in the Oozie workflow manager to schedule the load.

Initial/Delta Setup

vi /home/edw_user/sampledata/transform_sales.pig

ext_sales = LOAD 'sales_analytics.ext_sales' USING org.apache.hive.hcatalog.pig.HCatLoader();
dim_product = LOAD 'sales_analytics.dim_product' USING org.apache.hive.hcatalog.pig.HCatLoader();
active_product = FILTER dim_product BY (active_flag == 'Y');
sales_product = JOIN ext_sales BY (product_id), active_product BY (id) PARALLEL 2;
stg_sales = FOREACH sales_product GENERATE
    ext_sales::id AS id, order_number AS order_number, customer_id AS customer_id, showroom_id AS showroom_id,
    prod_key AS product_key, quantity AS quantity, discount AS discount, amount AS amount, amount - discount AS net_amount,
    delivered AS delivered, card_type AS card_type, card_number AS card_number,
    ext_sales::update_date AS update_date, ext_sales::create_date AS create_date, txn_date AS txn_date;
STORE stg_sales INTO 'sales_analytics.stg_sales' USING org.apache.hive.hcatalog.pig.HCatStorer();
quit;

:wq


hdfs dfs -put /home/edw_user/sampledata/transform_sales.pig /user/edw_user/sales/scripts

Execute the Pig script to trigger the initial data transformation and loading.

pig -x tez -useHCatalog -f "/home/edw_user/sampledata/transform_sales.pig"
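As an optional sanity check, the row count of the intermediate table can be verified from the command line:

beeline -u jdbc:hive2://sandbox-hdp.hortonworks.com:10000/sales_analytics -n edw_user -p hadoop -e "SELECT COUNT(*) FROM stg_sales;"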

Finally we load the data from the staging sales table into the final fact table. This script will also be used later by Oozie.

vi /home/edw_user/sampledata/load_sales.hql

set hive.execution.engine=tez;
set hive.optimize.sort.dynamic.partition=true;
set hive.exec.reducers.max=31;
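-- Depending on the Hive defaults in your environment, a dynamic partition insert may also
-- require the following settings; they are left here only as a precaution:
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;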

SELECT * FROM stg_sales LIMIT 10;

INSERT INTO TABLE fact_sales PARTITION(txn_date)
SELECT
id, order_number, customer_id, showroom_id, product_key, quantity, discount, amount,
net_amount, delivered, card_type, card_number, update_date, create_date, txn_date
FROM stg_sales DISTRIBUTE BY txn_date;

ANALYZE TABLE fact_sales PARTITION(txn_date) COMPUTE STATISTICS FOR COLUMNS;

:wq


hdfs dfs -put /home/edw_user/sampledata/load_sales.hql /user/edw_user/sales/scripts

Execute the script to trigger the initial data load.

beeline -u jdbc:hive2://sandbox-hdp.hortonworks.com:10000/sales_analytics -n edw_user -p hadoop -d org.apache.hive.jdbc.HiveDriver -f "/home/edw_user/sampledata/load_sales.hql"
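Optionally, confirm that one partition per transaction date has been created in the fact table:

beeline -u jdbc:hive2://sandbox-hdp.hortonworks.com:10000/sales_analytics -n edw_user -p hadoop -e "SHOW PARTITIONS fact_sales;"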

Let us quickly check the data loaded.

beeline
!connect jdbc:hive2://sandbox-hdp.hortonworks.com:10000/sales_analytics edw_user
SELECT * FROM stg_sales LIMIT 10;
SELECT * FROM fact_sales LIMIT 10;
SELECT fact_sales.* FROM dim_date, fact_sales WHERE day_date=txn_date AND day_of_week_number=7;

SELECT p.make, sum(s.net_amount) as sales
FROM fact_sales s, dim_product p
WHERE s.product_key = p.prod_key
GROUP BY p.make;

!quit

Finally, archive the sales data files by moving them from the staging directory to a dated archive directory.

vi /home/edw_user/sampledata/archive_sales.sh

hdfs dfs -mkdir /user/edw_user/sales/archive/sales/`date +%Y%m%d`
hdfs dfs -mv /user/edw_user/sales/staging/sales/* /user/edw_user/sales/archive/sales/`date +%Y%m%d`

:wq


hdfs dfs -put /home/edw_user/sampledata/archive_sales.sh /user/edw_user/sales/scripts
sh /home/edw_user/sampledata/archive_sales.sh
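Optionally, verify that the data files have been moved to the dated archive directory:

hdfs dfs -ls /user/edw_user/sales/archive/sales/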

exit

In the next article we will load the stock fact table, followed by some analytics in Excel using the Hive sales mart tables.

