Let us check how to perform incremental extraction and merge using Sqoop. The Sqoop merge utility allows us to combine two datasets, where entries in one dataset overwrite entries of an older dataset. For example, an incremental import run in last-modified mode will generate multiple datasets in HDFS, with successively newer data appearing in each dataset. The merge tool "flattens" two datasets into one, taking the newest available record for each primary key or merge key.
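For orientation, the merge tool can also be invoked on its own against two existing HDFS directories. The sketch below is only illustrative: the directory names, the EMP record class and its jar are hypothetical placeholders that would come from a previous codegen/import run. In the rest of this article we will instead let the import itself drive the merge through the --merge-key option.

sqoop merge \
--new-data /user/root/emp_delta \
--onto /user/root/emp_base \
--target-dir /user/root/emp_merged \
--jar-file ./EMP.jar \
--class-name EMP \
--merge-key EMPLOYEE_ID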

In this case we will use the Oracle HR schema. We will create a copy of the existing EMPLOYEES table in the HR schema, named EMP, and import the initial data using Sqoop import. Next we will update some records in the EMP table and rerun the Sqoop import with a check column to extract the incremental data, followed by the merge-key option to merge the changes onto the existing data.

We will also add an additional column to the EMP table, say LAST_UPD_DATE, to record the date on which a record was last changed. Below are the steps we want to simulate.

  1. Prepare the source data in Oracle as per our requirement.
  2. Perform the initial import of the EMP table into HDFS using Sqoop import.
  3. Modify some data in the Oracle table.
  4. Import the incremental data from the EMP table into HDFS using Sqoop import with a check column (in our case LAST_UPD_DATE), merging it onto the existing data with a merge key (in our case EMPLOYEE_ID).
  5. Make some additional data changes in the source table.
  6. Perform another incremental import from EMP, as in the previous step, this time extracting only data changed after the last extract date by supplying the last value, i.e. the latest change date captured during the previous incremental import.

Let us first log in to our database server over SSH and start the SQL*Plus utility, then prepare our source table as discussed above.

[root@oraxe ~]# sqlplus /nolog
SQL> connect hr/hrpass1234
SQL> CREATE TABLE emp AS SELECT * FROM employees;
SQL> ALTER TABLE emp ADD ( last_upd_date DATE );
SQL> UPDATE emp SET last_upd_date=SYSDATE-3;
SQL> COMMIT;
SQL> EXIT;
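
Optionally, we can sanity-check the staged data before importing. The query below is just an illustrative check (TO_CHAR is used only for readability); it shows the row count and the baseline value of the change-date column.

[root@oraxe ~]# sqlplus /nolog
SQL> connect hr/hrpass1234
SQL> SELECT COUNT(*), TO_CHAR(MAX(last_upd_date), 'YYYY-MM-DD HH24:MI:SS') FROM emp;
SQL> EXIT;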

Initial Load

Next we will log in to our EdgeNode and start the initial Sqoop import process.

root@EdgeNode:~# cd $SQOOP_HOME/sqoop_work
root@EdgeNode:/usr/local/sqoop/sqoop_work# export HADOOP_OPTS=-Djava.security.egd=file:/dev/../dev/urandom
root@EdgeNode:/usr/local/sqoop/sqoop_work# sqoop import -D mapred.child.java.opts="\-Djava.security.egd=file:/dev/../dev/urandom" \
--bindir ./ \
--connect "jdbc:oracle:thin:@10.0.100.4:1521/XE" \
--username "HR" \
--password "hrpass1234" \
--num-mappers 1 \
--as-textfile \
--target-dir /user/root/emp \
--fields-terminated-by '|' \
--lines-terminated-by '\n' \
--table EMP \
--verbose

Once the import completes, let us first check the imported data file in HDFS.

root@EdgeNode:/usr/local/sqoop/sqoop_work# hadoop fs -tail /user/root/emp/part-m-00000
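
If required, we can also list the imported files and take a quick record count as a simple sanity check; since we ran with a single mapper there is only one part file.

root@EdgeNode:/usr/local/sqoop/sqoop_work# hadoop fs -ls /user/root/emp
root@EdgeNode:/usr/local/sqoop/sqoop_work# hadoop fs -cat /user/root/emp/part-m-00000 | wc -l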

Let us now create a Hive external table on top of this EMP file in HDFS and validate the data.

root@EdgeNode:/usr/local/sqoop/sqoop_work# hive
hive> USE default;
hive> CREATE EXTERNAL TABLE emp (
employee_id INT,
first_name VARCHAR(20),
last_name VARCHAR(25),
email VARCHAR(25),
phone_number VARCHAR(20),
hire_date VARCHAR(30), 
job_id VARCHAR(10),
salary DECIMAL(8,2),
commission_pct DECIMAL(2,2),
manager_id INT,
department_id INT,
last_upd_date VARCHAR(30)
) 
ROW FORMAT DELIMITED 
FIELDS TERMINATED BY '|'
LINES TERMINATED BY '\n'
STORED AS TEXTFILE
LOCATION '/user/root/emp';

hive> SELECT COUNT(1) FROM emp;
hive> SELECT *  FROM emp WHERE employee_id in (100,101,102,103);
hive> SELECT MAX( CAST( last_upd_date AS TIMESTAMP ) ) FROM emp;

The last query returns the maximum change timestamp, in our case 2016-09-05 06:04:27. We will use this value as the --last-value for the incremental import.

Now let us go back to the Oracle database and modify some records in our source table so that we can verify the Sqoop import of incremental data.

[root@oraxe ~]# sqlplus /nolog
SQL> connect hr/hrpass1234
SQL> UPDATE emp SET salary=25000, last_upd_date=SYSDATE-2 WHERE employee_id=100;
SQL> UPDATE emp SET salary=18000, last_upd_date=SYSDATE-2 WHERE employee_id=101;
SQL> COMMIT;
SQL> EXIT;

Incremental Load

Next, go back to the EdgeNode and execute the Sqoop import command as below:

root@EdgeNode:/usr/local/sqoop/sqoop_work# export HADOOP_OPTS=-Djava.security.egd=file:/dev/../dev/urandom
root@EdgeNode:/usr/local/sqoop/sqoop_work# sqoop import -D mapred.child.java.opts="\-Djava.security.egd=file:/dev/../dev/urandom" \
--bindir ./ \
--connect "jdbc:oracle:thin:@10.0.100.4:1521/XE" \
--username "HR" \
--password "hrpass1234" \
--num-mappers 1 \
--as-textfile \
--target-dir /user/root/emp \
--fields-terminated-by '|' \
--lines-terminated-by '\n' \
--table EMP \
--incremental lastmodified \
--merge-key "EMPLOYEE_ID" \
--check-column "LAST_UPD_DATE" \
--last-value "2016-09-05 06:04:27.0" \
--verbose

Once the Sqoop import completes, it prints the information below regarding how to run future incremental imports on the same table.

Incremental import complete! To run another incremental import of all data following this import, supply the following arguments:
--incremental lastmodified
--check-column LAST_UPD_DATE
--last-value 2016-09-08 05:46:02.0
(Consider saving this with 'sqoop job --create')
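
Before moving on, we can optionally confirm in HDFS that the merged output now carries the updated salaries for employees 100 and 101. The wildcard is used because the part-file names may change after the merge job runs.

root@EdgeNode:/usr/local/sqoop/sqoop_work# hadoop fs -cat /user/root/emp/part-* | grep -E '^10[01]\|'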

SQOOP Job

A Job records the configuration information required to execute a Sqoop command at a later time. The SQOOP Job utility allows us to create and work with saved jobs. Saved jobs remember the parameters used to specify a job, so they can be re-executed by invoking the job by its handle. If a saved job is configured to perform an incremental import, state regarding the most recently imported rows is updated in the saved job to allow the job to continually import only the newest rows.

To save a job, Sqoop uses a metastore to store the job parameter metadata. By default, the Sqoop metastore configuration is defined in the $SQOOP_HOME/conf/sqoop-site.xml file; we will host the metastore on our EdgeNode. By default Sqoop does not store passwords in the metastore, so if we create a job that requires a password we will be prompted for it each time we execute the job. To enable storing passwords in the metastore, set the configuration property sqoop.metastore.client.record.password to true in the sqoop-site.xml config file. Running sqoop-metastore launches a shared HSQLDB database instance on the current machine. By default, job descriptions are saved to a private repository stored in $HOME/.sqoop/.
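
For reference, the relevant entry in $SQOOP_HOME/conf/sqoop-site.xml would look like the snippet below; this is only a sketch, and in the stock configuration file the property may already be present in commented-out form.

root@EdgeNode:/usr/local/sqoop/sqoop_work# vi $SQOOP_HOME/conf/sqoop-site.xml

<property>
  <name>sqoop.metastore.client.record.password</name>
  <value>true</value>
</property>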

root@EdgeNode:/usr/local/sqoop/sqoop_work# sqoop-metastore

Set up the incremental import as a saved job.

root@EdgeNode:/usr/local/sqoop/sqoop_work# sqoop job --list

root@EdgeNode:/usr/local/sqoop/sqoop_work# sqoop job --create job_emp_delta_merge -- import --bindir /usr/local/sqoop/sqoop_work/ --connect "jdbc:oracle:thin:@10.0.100.4:1521/XE" --username "HR" --password "hrpass1234" --num-mappers 1 --as-textfile --target-dir /user/root/emp --fields-terminated-by '|' --lines-terminated-by '\n' --table EMP --incremental lastmodified --merge-key "EMPLOYEE_ID" --check-column "LAST_UPD_DATE" --verbose

root@EdgeNode:/usr/local/sqoop/sqoop_work# sqoop job --show job_emp_delta_merge

Let us again go back to the Oracle database and modify some records in our source table to verify the incremental Sqoop import as a saved job.

[root@oraxe ~]# sqlplus /nolog
SQL> connect hr/hrpass1234
SQL> UPDATE emp SET salary=18000, last_upd_date=SYSDATE-1 WHERE employee_id=102;
SQL> UPDATE emp SET salary=20000 WHERE employee_id=103;
SQL> COMMIT;
SQL> EXIT;

Note that for employee 103 we update only the salary and not LAST_UPD_DATE; since the check column is unchanged, that modification will not be picked up by the incremental import.

Let us now execute the saved sqoop job:

root@EdgeNode:/usr/local/sqoop/sqoop_work# export HADOOP_OPTS=-Djava.security.egd=file:/dev/../dev/urandom
root@EdgeNode:/usr/local/sqoop/sqoop_work# sqoop job --exec job_emp_delta_merge

Let us validate the imported data:

hive> SELECT COUNT(1) FROM emp;
hive> SELECT *  FROM emp WHERE employee_id in (100,101,102,103);
hive> SELECT MAX( CAST( last_upd_date AS TIMESTAMP ) ) FROM emp;

Incremental imports are performed by comparing the values in a check column against a reference value from the most recent import. In our case, since the --incremental lastmodified argument is specified along with --check-column LAST_UPD_DATE and, say, --last-value 2016-09-07, all rows with last_upd_date > '2016-09-07' are imported. If an incremental import is run from the command line, the value to be specified as --last-value in a subsequent incremental import is printed to the screen for our reference. If it is run from a saved job, this value is retained in the saved job, so subsequent runs of sqoop job --exec job_emp_delta_merge will continue to import only rows newer than those previously imported.
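
We can also confirm that the saved job has recorded the new reference value by inspecting its stored parameters; this is a quick optional check, and the exact property names printed by --show may vary between Sqoop versions.

root@EdgeNode:/usr/local/sqoop/sqoop_work# sqoop job --show job_emp_delta_merge | grep -i incremental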

Once we are done with data provisioning, it's time for data transformation / manipulation. We'll use Pig for this purpose.

