SQOOP Merge & Incremental Extraction from Oracle
Let us check how to perform incremental extraction and merge using Sqoop. The SQOOP Merge utility allows us to combine two datasets, where entries in one dataset overwrite entries of an older dataset. For example, an incremental import run in last-modified mode will generate multiple datasets in HDFS, with successively newer data appearing in each dataset. The merge tool "flattens" two datasets into one, taking the newest available record for each primary key or merge key.
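For reference, the merge tool can also be invoked on its own after two imports have landed in separate HDFS directories. Below is a minimal sketch; the directory names, the EMP.jar record-class jar and its class name are illustrative placeholders, not values produced elsewhere in this article:
sqoop merge \
--new-data /user/root/emp_delta \
--onto /user/root/emp \
--target-dir /user/root/emp_merged \
--jar-file ./EMP.jar \
--class-name EMP \
--merge-key EMPLOYEE_ID
In this walkthrough we will instead pass --merge-key directly to sqoop import, which runs the merge step automatically after the incremental extract.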
In this case we will use the Oracle HR schema. We will create a copy of the existing EMPLOYEES table in the HR schema, named EMP, and import the initial data using Sqoop import. Next we will update some records in the EMP table and rerun Sqoop import with a check column to extract the incremental data, followed by the merge-key option.
We will also add an additional column to the EMP table, LAST_UPD_DATE, to record the date on which a record was last changed. Below are the steps we want to simulate:
- Prepare the source data in Oracle as per our requirement.
- Perform an initial import of the EMP table into Hadoop HDFS using Sqoop import.
- Modify some data in our Oracle table.
- Perform an incremental import of the EMP table into HDFS using a check column, in our case LAST_UPD_DATE, followed by a Sqoop merge using the merge key, in our case EMPLOYEE_ID.
- Make some additional data changes in our source table.
- Perform another incremental import from EMP, like the previous one but this time supplying the last value, i.e. the latest change date captured during the previous incremental Sqoop import, so that only data modified after that date is extracted.
Let us first log in to our database server over SSH, start the SQL*Plus utility, and prepare our source table as discussed above.
[root@oraxe ~]# sqlplus /nolog
SQL> connect hr/hrpass1234
SQL> CREATE TABLE emp AS SELECT * FROM employees;
SQL> ALTER TABLE emp ADD ( last_upd_date DATE );
SQL> UPDATE emp SET last_upd_date=SYSDATE-3;
SQL> COMMIT;
SQL> EXIT;
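As an optional sanity check (an extra verification step, not required for the rest of the flow), the staged table can be verified from the same SQL*Plus session before exiting:
SQL> SELECT COUNT(*), MAX(last_upd_date) FROM emp;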
Initial Load
Next we will log in to our EdgeNode and start the initial Sqoop import process.
root@EdgeNode:~# cd $SQOOP_HOME/sqoop_work
root@EdgeNode:/usr/local/sqoop/sqoop_work# export HADOOP_OPTS=-Djava.security.egd=file:/dev/../dev/urandom
root@EdgeNode:/usr/local/sqoop/sqoop_work# sqoop import -D mapred.child.java.opts="\-Djava.security.egd=file:/dev/../dev/urandom" \
--bindir ./ \
--connect "jdbc:oracle:thin:@10.0.100.4:1521/XE" \
--username "HR" \
--password "hrpass1234" \
--num-mappers 1 \
--as-textfile \
--target-dir /user/root/emp \
--fields-terminated-by '|' \
--lines-terminated-by '\n' \
--table EMP \
--verbose
Let us first check the imported data file in HDFS.
root@EdgeNode:/usr/local/sqoop/sqoop_work# hadoop fs -tail /user/root/emp/part-m-00000
Let us now create a Hive external table on top of this EMP file in HDFS and validate the data.
root@EdgeNode:/usr/local/sqoop/sqoop_work# hive
hive> USE default;
hive> CREATE EXTERNAL TABLE emp (
employee_id INT,
first_name VARCHAR(20),
last_name VARCHAR(25),
email VARCHAR(25),
phone_number VARCHAR(20),
hire_date VARCHAR(30),
job_id VARCHAR(10),
salary DECIMAL(8,2),
commission_pct DECIMAL(2,2),
manager_id INT,
department_id INT,
last_upd_date VARCHAR(30)
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|'
LINES TERMINATED BY '\n'
STORED AS TEXTFILE
LOCATION '/user/root/emp';
hive> SELECT COUNT(1) FROM emp;
hive> SELECT * FROM emp WHERE employee_id in (100,101,102,103);
hive> SELECT MAX( CAST( last_upd_date AS TIMESTAMP ) ) FROM emp;
2016-09-05 06:04:27
Let's now go back to the Oracle database and modify some records in our source table, so that we can test the Sqoop incremental import.
[root@oraxe ~]# sqlplus /nolog
SQL> connect hr/hrpass1234
SQL> UPDATE emp SET salary=25000, last_upd_date=SYSDATE-2 WHERE employee_id=100;
SQL> UPDATE emp SET salary=18000, last_upd_date=SYSDATE-2 WHERE employee_id=101;
SQL> COMMIT;
SQL> EXIT;
Incremental Load
Next, back on the EdgeNode, execute the Sqoop import command as below:
root@EdgeNode:/usr/local/sqoop/sqoop_work# export HADOOP_OPTS=-Djava.security.egd=file:/dev/../dev/urandom
root@EdgeNode:/usr/local/sqoop/sqoop_work# sqoop import -D mapred.child.java.opts="\-Djava.security.egd=file:/dev/../dev/urandom" \
--bindir ./ \
--connect "jdbc:oracle:thin:@10.0.100.4:1521/XE" \
--username "HR" \
--password "hrpass1234" \
--num-mappers 1 \
--as-textfile \
--target-dir /user/root/emp \
--fields-terminated-by '|' \
--lines-terminated-by '\n' \
--table EMP \
--incremental lastmodified \
--merge-key "EMPLOYEE_ID" \
--check-column "LAST_UPD_DATE" \
--last-value "2016-09-05 06:04:27.0" \
--verbose
Once the Sqoop import completes, it prints the information below, which is needed to run future incremental imports of the same table.
Incremental import complete! To run another incremental import of all data following this import, supply the following arguments:
--incremental lastmodified
--check-column LAST_UPD_DATE
--last-value 2016-09-08 05:46:02.0
(Consider saving this with 'sqoop job --create')
SQOOP Job
A Job records the configuration information required to execute a Sqoop command at a later time. The SQOOP Job utility allows us to create and work with saved jobs. Saved jobs remember the parameters used to specify a job, so they can be re-executed by invoking the job by its handle. If a saved job is configured to perform an incremental import, state regarding the most recently imported rows is updated in the saved job to allow the job to continually import only the newest rows.
In order to save a job, Sqoop uses a metastore to store the job parameter metadata. By default the Sqoop metastore configuration is defined in the $SQOOP_HOME/conf/sqoop-site.xml file. Let us host the Sqoop metastore on our EdgeNode. By default, Sqoop does not store passwords in the metastore, so if we create a job that requires a password, we will be prompted for that password each time we execute the job. To enable passwords in the metastore, let us set the configuration property sqoop.metastore.client.record.password to true in the sqoop-site.xml config file.
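A minimal sketch of the corresponding entry in sqoop-site.xml; the property name is the standard one, while the description text here is only illustrative:
<property>
  <name>sqoop.metastore.client.record.password</name>
  <value>true</value>
  <description>If true, allow saved jobs to store passwords in the metastore.</description>
</property>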
Running sqoop-metastore launches a shared HSQLDB database instance on the current machine. By default, job descriptions are saved to a private repository stored in $HOME/.sqoop/.
root@EdgeNode:/usr/local/sqoop/sqoop_work# sqoop-metastore
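Other clients can then point their job commands at this shared metastore with --meta-connect. A sketch, assuming the metastore is reachable on host EdgeNode at the default port 16000:
root@EdgeNode:/usr/local/sqoop/sqoop_work# sqoop job --list --meta-connect "jdbc:hsqldb:hsql://EdgeNode:16000/sqoop"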
Set up the incremental import as a saved job.
root@EdgeNode:/usr/local/sqoop/sqoop_work# sqoop job --list
root@EdgeNode:/usr/local/sqoop/sqoop_work# sqoop job --create job_emp_delta_merge -- import --bindir /usr/local/sqoop/sqoop_work/ --connect "jdbc:oracle:thin:@10.0.100.4:1521/XE" --username "HR" --password "hrpass1234" --num-mappers 1 --as-textfile --target-dir /user/root/emp --fields-terminated-by '|' --lines-terminated-by '\n' --table EMP --incremental lastmodified --merge-key "EMPLOYEE_ID" --check-column "LAST_UPD_DATE" --verbose
root@EdgeNode:/usr/local/sqoop/sqoop_work# sqoop job --show job_emp_delta_merge
Let's again go back to the Oracle database and modify some records in our source table, this time to test the incremental Sqoop import as a saved job.
[root@oraxe ~]# sqlplus /nolog
SQL> connect hr/hrpass1234
SQL> UPDATE emp SET salary=18000, last_upd_date=SYSDATE-1 WHERE employee_id=102;
SQL> UPDATE emp SET salary=20000 WHERE employee_id=103;
SQL> COMMIT;
SQL> EXIT;
Let us now execute the saved sqoop job:
root@EdgeNode:/usr/local/sqoop/sqoop_work# export HADOOP_OPTS=-Djava.security.egd=file:/dev/../dev/urandom
root@EdgeNode:/usr/local/sqoop/sqoop_work# sqoop job --exec job_emp_delta_merge
Let's validate the imported data:
hive> SELECT COUNT(1) FROM emp;
hive> SELECT * FROM emp WHERE employee_id in (100,101,102,103);
hive> SELECT MAX( CAST( last_upd_date AS TIMESTAMP ) ) FROM emp;
Incremental imports are performed by comparing the values in a check column against a reference value from the most recent import. In our case, if the --incremental lastmodified argument is specified along with --check-column LAST_UPD_DATE and --last-value 2016-09-07, all rows with a last_upd_date newer than '2016-09-07' are imported. When an incremental import is run from the command line, the value to be specified as --last-value in a subsequent incremental import is printed to the screen for our reference. When an incremental import is run from a saved job, this value is retained in the saved job, so subsequent runs of sqoop job --exec job_emp_delta_merge will continue to import only rows newer than those previously imported.
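Should we ever need to re-extract from an earlier point in time, arguments supplied after -- on sqoop job --exec are passed to the underlying import tool and can override saved parameters. A sketch, with the caveat that overriding --last-value this way is an assumption and may depend on your Sqoop version:
root@EdgeNode:/usr/local/sqoop/sqoop_work# sqoop job --exec job_emp_delta_merge -- --last-value "2016-09-05 06:04:27.0"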
Once we are done with data provisioning, it's time for data transformation / manipulation. We'll use Pig for this purpose.