CDC Implementation using Flatfile
This article shows how to use a flatfile to implement Change Data Capture (CDC). We maintain the last extraction date in a flatfile and use that value to capture the changed data from our business table.
First we will discuss what we are going to do, followed by how we are going to do it.
- Suppose we have an employee table in the source system. We want to load the Delta or Changed employee data to our target data warehouse.
- We want to maintain the last extraction date in a flatfile instead of in a database table.
- So the Business ETL session will extract the changed data based on the last extraction date as defined in the flatfile.
How we are going to do it
- We will create three sessions to accomplish our task.
- First we will create a mapping that reads the flatfile holding the last extraction date and generates a parameter file.
- Next is the Business session mapping which will extract the delta data based on the $$start_date and $$end_date parameter values as defined in the parameter file generated by the previous session.
- Last, we will create a mapping that updates the flatfile holding the last extraction date; it runs only when the Business session completes successfully.
- We keep all three sessions under one workflow.
- We have one workflow-level global parameter file containing the connection details.
- We have another runtime parameter file for the business session, containing only the extraction parameter values.
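The control flow of the three sessions can be sketched in Python as follows. This is only an illustration of the ordering and the "update only on success" rule; the function names and the in-memory `state` dictionary are hypothetical stand-ins for the sessions and the flatfile, not anything provided by Informatica.

```python
from typing import Callable

def run_workflow(
    get_params: Callable[[], dict],
    business_load: Callable[[dict], bool],
    update_cdc_file: Callable[[dict], None],
) -> bool:
    """Run the three steps in order; step 3 runs only if step 2 succeeds."""
    params = get_params()              # session 1: read flatfile, build parameters
    succeeded = business_load(params)  # session 2: extract the delta
    if succeeded:
        update_cdc_file(params)        # session 3: advance the extraction dates
    return succeeded

# Hypothetical stand-ins for the flatfile and the three sessions.
state = {"last_end": "2010-10-11"}

def get_params() -> dict:
    return {"start_date": state["last_end"], "end_date": "2010-10-12"}

def failing_load(params: dict) -> bool:
    return False

def ok_load(params: dict) -> bool:
    return True

def update_cdc_file(params: dict) -> None:
    state["last_end"] = params["end_date"]

run_workflow(get_params, failing_load, update_cdc_file)
print(state["last_end"])  # unchanged after a failed load
run_workflow(get_params, ok_load, update_cdc_file)
print(state["last_end"])  # advanced after a successful load
```

Because the dates advance only after a successful load, a failed run leaves the flatfile untouched and the next run extracts the same window again.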
The flatfile which maintains the extraction date is a comma separated file having three fields:
START_DATE, END_DATE, RUN_DATE.
Content of the CDC FlatFile:-
C:\Informatica\PowerCenter8.6.0\server\infa_shared\SrcFiles\FF_CDC_DT.txt
2010-10-10,2010-10-11,2010-10-11
NOTE: Date format is YYYY-MM-DD
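To make the file layout concrete, here is a minimal Python sketch that parses one line of the CDC flatfile into three dates. The `CdcRecord` type and the `parse_cdc_line` function are names made up for this illustration; only the field order and the YYYY-MM-DD format come from the file described above.

```python
import csv
import io
from datetime import date, datetime
from typing import NamedTuple

class CdcRecord(NamedTuple):
    start_date: date
    end_date: date
    run_date: date

def parse_cdc_line(text: str) -> CdcRecord:
    """Parse one 'START_DATE,END_DATE,RUN_DATE' line in YYYY-MM-DD format."""
    row = next(csv.reader(io.StringIO(text)))
    if len(row) != 3:
        raise ValueError(f"expected 3 fields, got {len(row)}")
    start, end, run = (
        datetime.strptime(field.strip(), "%Y-%m-%d").date() for field in row
    )
    return CdcRecord(start, end, run)

record = parse_cdc_line("2010-10-10,2010-10-11,2010-10-11")
print(record.end_date)  # 2010-10-11
```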
Now we define a target definition for the Parameter file as below:
Now find the mapping which reads the extraction dates file and generates the runtime Parameter file.
In the expression transformation add the following ports:
- V_START_DATE as Variable Port
- V_END_DATE as Variable Port
- V_RUN_DATE as Variable Port
- ParamText as Output Port
Now find the expressions for all the above ports of the expression transformation.
V_START_DATE:-
IIF(
TO_DATE(END_DATE,'YYYY-MM-DD')= TRUNC(SYSDATE),
TO_DATE(START_DATE,'YYYY-MM-DD'),
TO_DATE(END_DATE,'YYYY-MM-DD')
)
V_END_DATE:-
TRUNC(SYSDATE)
V_RUN_DATE:-
TRUNC(SYSDATE)
ParamText:-
'[WorkFolder.WF:wf_runtime_param.ST:s_m_emp_cdc]'
|| chr(10) ||
'$$start_date=' || TO_CHAR(V_START_DATE,'YYYY-MM-DD')
|| chr(10) ||
'$$end_date=' || TO_CHAR(V_END_DATE,'YYYY-MM-DD')
|| chr(10)
NOTE:
- Informatica Folder Name is WorkFolder.
- Workflow Name is wf_runtime_param
- The business session name which will use this parameter file is s_m_emp_cdc.
- $$start_date and $$end_date are the Mapping Parameters that will be used by the Business session for CDC extraction and load purpose.
- chr(10) returns the ASCII newline (line feed) character.
Next link the Output port namely ParamText of the expression transformation to the port ParamText of the Target Instance.
Now find the output generated by the mapping i.e. the runtime Parameter file ff_param_runtime.txt
[WorkFolder.WF:wf_runtime_param.ST:s_m_emp_cdc]
$$start_date=2010-10-11
$$end_date=2010-10-12
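The expression logic can be emulated outside Informatica to check what the parameter file should contain. The following Python sketch mirrors the V_START_DATE, V_END_DATE and ParamText ports; the function names are hypothetical, and the run date of 2010-10-12 is an assumption chosen for the example.

```python
from datetime import date
from typing import Tuple

def extraction_window(start: date, end: date, today: date) -> Tuple[date, date]:
    """Mirror V_START_DATE / V_END_DATE: keep the old start on a same-day re-run."""
    new_start = start if end == today else end
    return new_start, today

def param_text(start: date, end: date) -> str:
    """Mirror ParamText: a section header followed by the two mapping parameters."""
    return (
        "[WorkFolder.WF:wf_runtime_param.ST:s_m_emp_cdc]\n"
        f"$$start_date={start.isoformat()}\n"
        f"$$end_date={end.isoformat()}\n"
    )

# Assumed run date: 2010-10-12, one day after the END_DATE stored in the flatfile.
win_start, win_end = extraction_window(
    date(2010, 10, 10), date(2010, 10, 11), date(2010, 10, 12)
)
print(param_text(win_start, win_end), end="")
```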
Next let us see the business mapping which extracts the delta employee information based on the extraction Mapping Parameters $$start_date and $$end_date.
Define two Parameters namely:
- $$start_date string(10)
- $$end_date string(10)
Find the Source Qualifier SQL Query.
SELECT
EMP_SRC.EMPNO, EMP_SRC.ENAME,
EMP_SRC.JOB, EMP_SRC.MGR,
EMP_SRC.HIREDATE, EMP_SRC.SAL,
EMP_SRC.COMM, EMP_SRC.DEPTNO
FROM
EMP_SRC
WHERE EMP_SRC.HIREDATE >= TO_DATE('$$start_date','YYYY-MM-DD')
AND EMP_SRC.HIREDATE < TO_DATE('$$end_date','YYYY-MM-DD')
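The filter selects a half-open window: the start date is included and the end date is excluded, so consecutive runs neither overlap nor leave gaps. The sketch below demonstrates this with Python's built-in sqlite3 module. It is not the Source Qualifier query itself: SQLite has no TO_DATE, so ISO-formatted date strings are compared directly, and the table contents are invented for the example.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE EMP_SRC (EMPNO INTEGER, ENAME TEXT, HIREDATE TEXT)")
conn.executemany(
    "INSERT INTO EMP_SRC VALUES (?, ?, ?)",
    [
        (1, "SMITH", "2010-10-10"),  # before the window
        (2, "ALLEN", "2010-10-11"),  # equals the start date: included
        (3, "WARD", "2010-10-12"),   # equals the end date: excluded
    ],
)

def extract_delta(start_date: str, end_date: str) -> list:
    """Return rows with start_date <= HIREDATE < end_date (half-open window)."""
    cur = conn.execute(
        "SELECT EMPNO, ENAME FROM EMP_SRC "
        "WHERE HIREDATE >= ? AND HIREDATE < ? ORDER BY EMPNO",
        (start_date, end_date),
    )
    return cur.fetchall()

print(extract_delta("2010-10-11", "2010-10-12"))  # [(2, 'ALLEN')]
print(extract_delta("2010-10-12", "2010-10-13"))  # [(3, 'WARD')]
```

The row dated 2010-10-12 is skipped by the first window and picked up by the next one, so each row is extracted exactly once.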
Next let us see the mapping which resets the extraction dates file. It runs only after the business session has executed successfully.
In the expression transformation add the following ports:
- V_START_DATE as Variable Port
- V_END_DATE as Variable Port
- V_RUN_DATE as Variable Port
- OUT_START_DATE as Output Port
- OUT_END_DATE as Output Port
- OUT_RUN_DATE as Output Port
Now find the expressions for all the above ports of the expression transformation.
V_START_DATE:-
IIF(
TO_DATE(END_DATE,'YYYY-MM-DD') = TRUNC(SYSDATE),
TO_DATE(START_DATE,'YYYY-MM-DD'),
TO_DATE(END_DATE,'YYYY-MM-DD')
)
V_END_DATE:-
TRUNC(SYSDATE)
V_RUN_DATE:-
TRUNC(SYSDATE)
OUT_START_DATE:-
TO_CHAR(V_START_DATE,'YYYY-MM-DD')
OUT_END_DATE:-
TO_CHAR(V_END_DATE,'YYYY-MM-DD')
OUT_RUN_DATE:-
TO_CHAR(V_RUN_DATE,'YYYY-MM-DD')
Next link the Output ports of the expression transformation to the corresponding ports of the Target Instance.
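To see what this mapping writes back, the three output ports can be emulated in Python. The function name `next_cdc_line` is hypothetical and the run date is an assumed value; the date logic is the same IIF used for V_START_DATE.

```python
from datetime import date

def next_cdc_line(line: str, today: date) -> str:
    """Mirror OUT_START_DATE, OUT_END_DATE and OUT_RUN_DATE for one input line."""
    start_s, end_s, _run_s = line.strip().split(",")
    start, end = date.fromisoformat(start_s), date.fromisoformat(end_s)
    new_start = start if end == today else end  # same IIF as V_START_DATE
    return f"{new_start.isoformat()},{today.isoformat()},{today.isoformat()}"

line = "2010-10-10,2010-10-11,2010-10-11"
first = next_cdc_line(line, date(2010, 10, 12))
print(first)                                     # 2010-10-11,2010-10-12,2010-10-12
print(next_cdc_line(first, date(2010, 10, 12)))  # identical line on a same-day re-run
```

The second call shows why the IIF compares END_DATE with the current date: running the workflow twice on the same day keeps the start date instead of collapsing the window to an empty range.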
Now let us look at the Workflow and sessions:
In the Workflow Properties tab set the Parameter file namely global_param.txt and in the Variables tab create a Workflow Variable namely $$var_param_file of datatype nstring.
Next for the business session s_m_emp_cdc set the Parameter Filename in the Properties tab to $$var_param_file.
Content of the Global Parameter FlatFile:-
C:\Informatica\PowerCenter8.6.0\server\infa_shared\BWParam\global_param.txt
[WorkFolder.WF:wf_runtime_param]
$DBConnection_SRC=Info_Src_Conn
$DBConnection_TGT=Info_Tgt_Conn
$PMMergeSessParamFile=TRUE
$$var_param_file=C:\Informatica\PowerCenter8.6.0\server\infa_shared\TgtFiles\ff_param_runtime.txt
[WorkFolder.WF:wf_runtime_param.ST:s_m_get_runtime_param]
$InputFile_CDC=$PMSourceFileDir\FF_CDC_DT.txt
$OutputFile_Param=$PMTargetFileDir\ff_param_runtime.txt
[WorkFolder.WF:wf_runtime_param.ST:s_m_set_runtime_param]
$InputFile_CDC=$PMSourceFileDir\FF_CDC_DT.txt
$OutputFile_CDC=$PMSourceFileDir\FF_CDC_DT1.txt
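The parameter file is organized as section headings in square brackets, each followed by name=value lines that apply to that workflow or session. As a rough aid for inspecting such files, the following Python sketch groups the values by section. It is a simplified reader written for this article, not a reimplementation of how the Integration Service parses parameter files, and the function name is hypothetical.

```python
from typing import Dict, Optional

def parse_param_file(text: str) -> Dict[str, Dict[str, str]]:
    """Group name=value lines under the most recent [section] heading."""
    sections: Dict[str, Dict[str, str]] = {}
    current: Optional[Dict[str, str]] = None
    for raw in text.splitlines():
        line = raw.strip()
        if not line:
            continue
        if line.startswith("[") and line.endswith("]"):
            current = sections.setdefault(line[1:-1], {})
        elif "=" in line and current is not None:
            name, value = line.split("=", 1)
            current[name.strip()] = value.strip()
    return sections

sample = """\
[WorkFolder.WF:wf_runtime_param]
$DBConnection_SRC=Info_Src_Conn
$PMMergeSessParamFile=TRUE
[WorkFolder.WF:wf_runtime_param.ST:s_m_get_runtime_param]
$InputFile_CDC=$PMSourceFileDir\\FF_CDC_DT.txt
"""
params = parse_param_file(sample)
for section, values in params.items():
    print(section, values)
```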
There are several important points to note about the workflow-level global parameter file.
NOTE:
- $$var_param_file is set to the path of the runtime output parameter file generated by session s_m_get_runtime_param.
- Now the Business session needs to read the Parameter file ff_param_runtime.txt
- A parameter file can be declared at the workflow level and also at the session level.
- By default, when both are declared, the Integration Service uses the workflow-level parameter file and ignores the session-level parameter file.
- We want the business session within the workflow wf_runtime_param, which has the workflow parameter file global_param.txt, to also use the session parameter file ff_param_runtime.txt.
- Setting $PMMergeSessParamFile=TRUE causes the Integration Service to read both the session-level and workflow-level parameter files, which solves our problem.
- Observe the parameters of the set session (s_m_set_runtime_param), which updates the extraction dates in the flatfile for the next day's load. It reads from FF_CDC_DT.txt and writes to a different file, FF_CDC_DT1.txt.
So after the session completes successfully, we need to copy FF_CDC_DT1.txt over FF_CDC_DT.txt and then delete FF_CDC_DT1.txt.
Find the Post-Session Success Commands:
Copy_file:
copy $OutputFile_CDC $InputFile_CDC;
Delete_file:
del $OutputFile_CDC;
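The effect of the two commands can be reproduced in Python to test the file swap outside the workflow. This is a sketch under stated assumptions: the `promote_cdc_file` function is hypothetical, and the demonstration uses a temporary directory with sample contents rather than the real server paths.

```python
import os
import shutil
import tempfile

def promote_cdc_file(output_path: str, input_path: str) -> None:
    """Copy the freshly written file over the original, then delete the copy."""
    shutil.copyfile(output_path, input_path)  # like: copy $OutputFile_CDC $InputFile_CDC
    os.remove(output_path)                    # like: del $OutputFile_CDC

# Demonstration in a temporary directory with sample file contents.
workdir = tempfile.mkdtemp()
src = os.path.join(workdir, "FF_CDC_DT.txt")
new = os.path.join(workdir, "FF_CDC_DT1.txt")
with open(src, "w") as f:
    f.write("2010-10-10,2010-10-11,2010-10-11\n")
with open(new, "w") as f:
    f.write("2010-10-11,2010-10-12,2010-10-12\n")

promote_cdc_file(new, src)
with open(src) as f:
    print(f.read().strip())  # 2010-10-11,2010-10-12,2010-10-12
print(os.path.exists(new))   # False
```

In Python the same result could be had in one step with os.replace(new, src), which moves the new file over the old one.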
In this way we can Capture Changed Data using a Flatfile.