This article shows how to use a flatfile to implement Change Data Capture (CDC). Suppose we want to maintain the last extraction date in a flatfile, and based on that value we want to capture the changed data of our business table.

First we will discuss what we are going to do, followed by how we are going to do it.

  • Suppose we have an employee table in the source system. We want to load the delta, or changed, employee data to our target data warehouse.
  • We want to maintain the last extraction date in a flatfile instead of maintaining it in a DB table.
  • So the Business ETL session will extract the changed data based on the last extraction date as defined in the flatfile.

How we are going to do it

  • We will create three sessions to accomplish our task.
  • First we will create a mapping which reads the flatfile maintaining the last extraction date and in turn generates a parameter file.
  • Next is the Business session mapping, which will extract the delta data based on the $$start_date and $$end_date parameter values as defined in the parameter file generated by the previous session.
  • Last we will create a mapping to update the flatfile maintaining the last extraction date, which runs only when the Business session completes successfully.
  • We keep all three sessions under one workflow.
  • We have one workflow level global parameter file having the connection details.
  • We have another runtime parameter file for the Business session containing the extraction parameter values only.

The flatfile which maintains the extraction date is a comma-separated file having three fields: START_DATE, END_DATE and RUN_DATE.

Content of the CDC FlatFile:-
C:\Informatica\PowerCenter8.6.0\server\infa_shared\SrcFiles\FF_CDC_DT.txt

2010-10-10,2010-10-11,2010-10-11

NOTE: Date format is YYYY-MM-DD

Source Definition of CDC Flatfile
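A minimal sketch of the CDC flatfile source definition (assuming all three fields are imported as strings of precision 10, matching the YYYY-MM-DD format) would look like:

Field Name   Datatype   Precision
START_DATE   string     10
END_DATE     string     10
RUN_DATE     string     10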

Now we define a target definition for the Parameter file as below:

Target Definition of Parameter Flatfile
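Since the generated parameter file is just lines of text, the simplest target definition is a flatfile target with a single wide string port. A minimal sketch (the precision of 1000 is an assumption; anything long enough to hold the generated text will do):

Field Name   Datatype   Precision
ParamText    string     1000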

Now find the mapping which reads the extraction dates file and generates the runtime Parameter file.

Mapping to generate Parameter file
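The mapping itself is a simple straight-through pipeline. A sketch of the flow (the transformation names here are illustrative):

FF_CDC_DT (Flatfile Source)
  -> SQ_FF_CDC_DT (Source Qualifier)
  -> EXP_Generate_Param (Expression)
  -> FF_PARAM_RUNTIME (Flatfile Target)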

In the expression transformation add the following ports:

  • V_START_DATE as Variable Port
  • V_END_DATE as Variable Port
  • V_RUN_DATE as Variable Port
  • ParamText as Output Port

Expression Transformation- Param generation

Now find the expressions for all the above ports of the expression transformation.

V_START_DATE:-
IIF(
TO_DATE(END_DATE,'YYYY-MM-DD')= TRUNC(SYSDATE),
TO_DATE(START_DATE,'YYYY-MM-DD'), 
TO_DATE(END_DATE,'YYYY-MM-DD')
)

V_END_DATE:-
TRUNC(SYSDATE)

V_RUN_DATE:-
TRUNC(SYSDATE)

ParamText:-
'[WorkFolder.WF:wf_runtime_param.ST:s_m_emp_cdc]' 
|| chr(10) ||
'$$start_date=' || TO_CHAR(V_START_DATE,'YYYY-MM-DD')
|| chr(10) ||
'$$end_date=' || TO_CHAR(V_END_DATE,'YYYY-MM-DD') 
|| chr(10)

NOTE:

  • Informatica Folder Name is WorkFolder.
  • Workflow Name is wf_runtime_param
  • The business session name which will use this parameter file is s_m_emp_cdc.
  • $$start_date and $$end_date are the Mapping Parameters that will be used by the Business session for CDC extraction and load purpose.
  • chr(10) is the ASCII equivalent of a newline character.

Next link the Output port namely ParamText of the expression transformation to the port ParamText of the Target Instance.

Now find the output generated by the mapping, i.e. the runtime Parameter file ff_param_runtime.txt:

[WorkFolder.WF:wf_runtime_param.ST:s_m_emp_cdc]
$$start_date=2010-10-11
$$end_date=2011-10-12

Next let us see the business mapping which extracts the delta employee information based on the extraction Mapping Parameters $$start_date and $$end_date.

Mapping to extract and load source changed data

Define two Mapping Parameters, namely:

  • $$start_date string(10)
  • $$end_date string(10)
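These can be declared in the Mapping Designer under Mappings > Parameters and Variables. A minimal sketch of the declarations (the initial values are placeholders for the very first run, before any parameter file exists):

Name          Type        Datatype   Precision   Initial Value
$$start_date  Parameter   string     10          1900-01-01
$$end_date    Parameter   string     10          1900-01-01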

Find the Source Qualifier SQL Query.

Source Qualifier- SQL Query

SELECT 
EMP_SRC.EMPNO, EMP_SRC.ENAME, 
EMP_SRC.JOB, EMP_SRC.MGR, 
EMP_SRC.HIREDATE, EMP_SRC.SAL, 
EMP_SRC.COMM, EMP_SRC.DEPTNO 
FROM
EMP_SRC 
WHERE EMP_SRC.HIREDATE >= TO_DATE('$$start_date','YYYY-MM-DD')
AND EMP_SRC.HIREDATE < TO_DATE('$$end_date','YYYY-MM-DD')
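At run time the Integration Service substitutes the mapping parameter values from the runtime parameter file before issuing the query. With the sample parameter file values shown earlier, the query sent to the source database would be equivalent to:

SELECT 
EMP_SRC.EMPNO, EMP_SRC.ENAME, 
EMP_SRC.JOB, EMP_SRC.MGR, 
EMP_SRC.HIREDATE, EMP_SRC.SAL, 
EMP_SRC.COMM, EMP_SRC.DEPTNO 
FROM
EMP_SRC 
WHERE EMP_SRC.HIREDATE >= TO_DATE('2010-10-11','YYYY-MM-DD')
AND EMP_SRC.HIREDATE < TO_DATE('2011-10-12','YYYY-MM-DD')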

Next let us see the mapping which resets the extraction dates file; it runs only after successful execution of the Business session.

Mapping to update Extraction Date flatfile

In the expression transformation add the following ports:

  • V_START_DATE as Variable Port
  • V_END_DATE as Variable Port
  • V_RUN_DATE as Variable Port
  • OUT_START_DATE as Output Port
  • OUT_END_DATE as Output Port
  • OUT_RUN_DATE as Output Port

Expression Transformation- CDC Flatfile

Now find the expressions for all the above ports of the expression transformation.

V_START_DATE:-
IIF(
TO_DATE(END_DATE,'YYYY-MM-DD') = TRUNC(SYSDATE),
TO_DATE(START_DATE,'YYYY-MM-DD'),
TO_DATE(END_DATE,'YYYY-MM-DD')
)

V_END_DATE:-
TRUNC(SYSDATE)

V_RUN_DATE:-
TRUNC(SYSDATE)

OUT_START_DATE:-
TO_CHAR(V_START_DATE,'YYYY-MM-DD')

OUT_END_DATE:-
TO_CHAR(V_END_DATE,'YYYY-MM-DD')

OUT_RUN_DATE:-
TO_CHAR(V_RUN_DATE,'YYYY-MM-DD')

Next link the Output ports of the expression transformation to the corresponding ports of the Target Instance.
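As a hypothetical example of the reset logic: if FF_CDC_DT.txt contained 2010-10-10,2010-10-11,2010-10-11 and this session ran successfully on 2010-10-12, the file written by this mapping would contain:

2010-10-11,2010-10-12,2010-10-12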

Now let us look at the Workflow and sessions:

Workflow
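The three sessions are linked serially inside the workflow. A sketch of the task flow, with one possible link condition that makes the set session run only when the Business session succeeds:

Start
  --> s_m_get_runtime_param
  --> s_m_emp_cdc
  --> s_m_set_runtime_param   (link condition: $s_m_emp_cdc.Status = SUCCEEDED)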

In the Workflow Properties tab, set the Parameter Filename to global_param.txt, and in the Variables tab create a Workflow Variable named $$var_param_file of datatype nstring.

Next for the business session s_m_emp_cdc set the Parameter Filename in the Properties tab to $$var_param_file.

Business Session Parameter File

Content of the Global Parameter FlatFile:-
C:\Informatica\PowerCenter8.6.0\server\infa_shared\BWParam\global_param.txt

[WorkFolder.WF:wf_runtime_param]
$DBConnection_SRC=Info_Src_Conn
$DBConnection_TGT=Info_Tgt_Conn
$PMMergeSessParamFile=TRUE
$$var_param_file=C:\Informatica\PowerCenter8.6.0\server\infa_shared\TgtFiles\ff_param_runtime.txt

[WorkFolder.WF:wf_runtime_param.ST:s_m_get_runtime_param]
$InputFile_CDC=$PMSourceFileDir\FF_CDC_DT.txt
$OutputFile_Param=$PMTargetFileDir\ff_param_runtime.txt


[WorkFolder.WF:wf_runtime_param.ST:s_m_set_runtime_param]
$InputFile_CDC=$PMSourceFileDir\FF_CDC_DT.txt
$OutputFile_CDC=$PMSourceFileDir\FF_CDC_DT1.txt

Now there are many important points to discuss regarding the Workflow level global parameter file.

NOTE:

  • $$var_param_file is set to the path of the runtime output parameter file generated by session s_m_get_runtime_param.
  • Now the Business session needs to read the Parameter file ff_param_runtime.txt
  • We know that Parameter file can be declared at Workflow level and also at Session level.
  • When a parameter file is specified at both levels, the Integration Service by default uses the workflow level parameter file and ignores the session level parameter file.
  • Now we want the business session within the workflow wf_runtime_param having Workflow Parameter file global_param.txt to use the Session Parameter file ff_param_runtime.txt.
  • $PMMergeSessParamFile=TRUE property causes the Integration Service to read both the session level and workflow level parameter files. Hence our problem is solved.
  • Observe the parameters of the set session s_m_set_runtime_param that updates the extraction date flatfile for the next day's load. Here we are reading from FF_CDC_DT.txt and loading to a different file name, FF_CDC_DT1.txt. So we need to rename the new file to FF_CDC_DT.txt and delete the file FF_CDC_DT1.txt once the session completes successfully.

Find the Post-Session Success Commands:

Copy_file:
copy $OutputFile_CDC $InputFile_CDC;

Delete_file:
del $OutputFile_CDC;
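When the session runs, the session parameters expand to full file paths. Assuming $PMSourceFileDir points to the SrcFiles directory shown earlier, the commands executed on the Integration Service host are equivalent to:

copy C:\Informatica\PowerCenter8.6.0\server\infa_shared\SrcFiles\FF_CDC_DT1.txt C:\Informatica\PowerCenter8.6.0\server\infa_shared\SrcFiles\FF_CDC_DT.txt
del C:\Informatica\PowerCenter8.6.0\server\infa_shared\SrcFiles\FF_CDC_DT1.txt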

In this way we can Capture Changed Data using a Flatfile.

