SSIS – Use timestamp to detect changes

Last week, I needed to build an SSIS solution that picks up all changes from a database using the version number (timestamp/rowversion) column and syncs them to an external database. This is quite easy to achieve:

1. Step 1, retrieve the previous timestamp. Storing the previous timestamp can easily be done by writing a file using the File.WriteAllText method.

Dim runLastFile = System.IO.Path.Combine(currentPath, "LastRun.txt")
System.IO.File.WriteAllText(runLastFile, lastRun)

I know it's in VB, but you can easily convert it to C#; since it's an old solution I didn't bother converting it. If the file does not exist yet, you need logic to get the lowest timestamp, like this (a C# sketch that reads the stored value back follows the query):

declare
@minVersion binary(10),
@minVersion_str nvarchar(42)

SET @minVersion = ( SELECT MIN({timestamp fieldName}) FROM {sourceTable} )
SET @minVersion_str = upper(sys.fn_varbintohexstr(@minVersion))
SELECT @minVersion_str as LowestVersion
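
On the package/C# side, reading the previous timestamp back is the mirror of writing it. Here is a minimal C# sketch, assuming the same LastRun.txt file and a hypothetical GetLowestVersionFromSource helper that runs the query above when the file does not exist yet:

//Sketch only: currentPath and GetLowestVersionFromSource are assumed names
string runLastFile = System.IO.Path.Combine(currentPath, "LastRun.txt");
string lastRun;
if (System.IO.File.Exists(runLastFile))
{
    //Previous run found: reuse the stored hex string (e.g. "0x00000000000007D1")
    lastRun = System.IO.File.ReadAllText(runLastFile).Trim();
}
else
{
    //First run: fall back to the lowest timestamp of the source table (query above)
    lastRun = GetLowestVersionFromSource();
}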

2. Step 2, get the current timestamp from the source database using MIN_ACTIVE_ROWVERSION(). The challenge, however, is to store it as a string so the SQL query can be constructed later in the process. After 15 minutes of googling I found the answer on a forum (credit to its author; I lost the link).

declare
@currVersion binary(10),
@currVersion_str nvarchar(42)

SET @currVersion = MIN_ACTIVE_ROWVERSION()
SET @currVersion_str = upper(sys.fn_varbintohexstr(@currVersion))
SELECT @currVersion_str as CurrentVersion

3. Step 3, construct the SQL query using the previous timestamp. Something like this:

SELECT {sourceColumn}
FROM {sourceTable}
WHERE {timestamp fieldName} >= @previousTimeStamp

4. Step 4, if the process is successful, don't forget to store the current timestamp (see the sketch after this list).

5. Step 5, repeat from step 1.
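
Putting steps 3 to 5 together, a minimal C# sketch of one sync cycle could look like the following. The helpers GetCurrentVersionFromSource, RunQuery and SyncRows are hypothetical; the hex strings are the ones produced by the queries above, and a 0x... literal can be compared directly against a rowversion column:

//Hypothetical sketch of one sync cycle
string previousVersion = lastRun;                      //hex string from step 1, e.g. "0x00000000000007D1"
string currentVersion = GetCurrentVersionFromSource(); //hex string from step 2 (MIN_ACTIVE_ROWVERSION)

//Step 3: construct the query with the stored hex value
string query = "SELECT {sourceColumn} FROM {sourceTable} " +
               "WHERE {timestamp fieldName} >= " + previousVersion;

//Sync the changed rows to the external database
bool success = SyncRows(RunQuery(query));

//Step 4: only move the watermark forward after a successful run
if (success)
{
    System.IO.File.WriteAllText(runLastFile, currentVersion);
}
//Step 5: the next run starts again at step 1 with the new LastRun.txt value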

Pure C# ETL (Extract Transform Load)

With regards to ETL (Extract Transform Load), the default tool for the job is Microsoft SQL Server Integration Services (SSIS), hands down. The way SSIS works is that it stores information about the source and target in its metadata: all the column sizes, types, and lengths are kept there. The problem arises when a data type or size changes and the metadata gets out of sync. To fix it, you need to open the package and refresh either the source or destination shape so it can pick up the updated metadata.

We had a project where we needed to build an ETL solution with the following requirements: process a complex XML file (multi-level, self-referencing structure), support multiple versions of the data, be configurable at runtime, and not be too sensitive to data types. I ended up building a flexible ETL solution in C# with the help of the Unity Framework, AutoMapper, LINQ to SQL, and stored procedures for bulk inserts.

Library information:

1. Unity – dependency injection framework.

2. AutoMapper – object auto-mapping

The processing is broken down into 6 parts:

1. When the XML comes in, it is deserialized into an object.

2. Using reflection, loop through the object (every property).

3. Get the type of the object and create a data processor using Unity if there is no instance yet; if there is, invoke the existing one.

4. Call the data processor to convert the deserialized object into the data entities using AutoMapper and add the converted instance to its internal collection (a collection of objects).

5. After looping, instruct every data processor to commit the data.

6. Get the processing and error information.

Implementation:

Step 1. Deserialization

1.1 First you need a schema for the XML; then use the XSD.exe tool to generate the classes that will be used in the deserialization process.

1.2 To deserialize an XML file into an object, use the following code:

//Requires: using System.IO; using System.Xml.Serialization;
/// <summary>
/// Deserialize Xml
/// </summary>
/// <typeparam name="T">Type to deserialize the XML into</typeparam>
/// <param name="path">Path of the XML file</param>
/// <returns>The deserialized instance</returns>
protected T DeserializeFile<T>(string path)
{
    T item;
    using (StreamReader sr = new StreamReader(path))
    {
        XmlSerializer xs = new XmlSerializer(typeof(T));
        item = (T)xs.Deserialize(sr);
    }
    return item;
}
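
If, for example, XSD.exe generated a class named Order from the schema (xsd.exe Order.xsd /classes), usage would look like this; the class name and file path are just illustrative:

Order order = DeserializeFile<Order>(@"C:\Inbound\Order.xml");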

Steps 2, 3 & 4. Loop through every property using reflection; for every object, create a data processor using Unity (if there is no instance yet) and instruct it to add the object to its internal collection. Each data processor holds a collection matching its type:

To loop through every property of the object using reflection, use the following code:


//Requires: using System; using System.Reflection;
private DataProcessorManager mDataProcessorManager;

/// <summary>
/// Walks the deserialized object graph recursively and hands every segment to its data processor.
/// </summary>
/// <param name="item">Object (or array of objects) to process</param>
/// <param name="parentID">GUID of the parent object, used as the reference number</param>
protected virtual void Parse<T>(T item, string parentID)
{
    if (item != null)
    {
        Type obj = item.GetType();
        //Check if the object is an array
        if (obj.IsArray)
        {
            Array array = item as Array;
            //Loop through every object and process it one by one
            foreach (var o in array)
            {
                //Process the object and retrieve the reference GUID
                ProcessSegment(o, parentID);

                //GetPropertyValue is a reflection helper/extension method (sketched further below)
                string refNo = o.GetPropertyValue("GUID");
                //Check whether the object contains other properties that are objects or arrays of objects,
                //aside from primitive types or types to be skipped.
                foreach (PropertyInfo p in o.GetType().GetProperties())
                {
                    if (IsPropertyToBeProcessed(p))
                    {
                        //Get the property value
                        var prop = p.GetValue(o, null);
                        //Recursive call
                        if (prop != null)
                        {
                            Parse(prop, refNo);
                        }
                    }
                }
            }
        }
        else
        {
            //Process the object and retrieve the GUID
            ProcessSegment(item, parentID);
            string refNo = item.GetPropertyValue("GUID");
            //Check whether the object contains other properties that are objects or arrays of objects,
            //aside from primitive types or types to be skipped.
            foreach (PropertyInfo p in obj.GetProperties())
            {
                if (IsPropertyToBeProcessed(p))
                {
                    //Get the property value
                    var prop = p.GetValue(item, null);
                    //Recursive call
                    if (prop != null)
                    {
                        Parse(prop, refNo);
                    }
                }
            }
        }
    }
}

/// <summary>
/// Converts a single segment (object) and adds it to the matching data processor.
/// </summary>
/// <param name="item">Object to be processed</param>
/// <param name="refNo">Reference GUID of the parent object</param>
protected virtual void ProcessSegment(object item, string refNo)
{
    //Get the type of the object to be processed
    Type itemType = item.GetType();

    //Top-level types are skipped
    if (!mToBeSkippedTypeList.Exists(i => i.Equals(itemType)))
    {
        //Check whether there's a data processor for that object and let it convert and add the item
        mDataProcessorManager.ConvertAndAdd(item, refNo);
    }
}

/// <summary>
/// Check whether this property is one we need to process; exclude
/// primitive types like Int32, String, etc.
/// </summary>
/// <param name="propertyInfo">Property to inspect</param>
/// <returns>True if the property should be processed</returns>
protected bool IsPropertyToBeProcessed(PropertyInfo propertyInfo)
{
    bool toBeProcessed = true;
    if (propertyInfo == null)
    {
        toBeProcessed = false;
        return toBeProcessed;
    }
    //Check whether the type is not a primitive (int, byte, etc.) and is not on the list of types
    //to be skipped.
    if (mToBeSkippedTypeList.Exists(i => i.Equals(propertyInfo.PropertyType)))
    {
        toBeProcessed = false;
    }
    return toBeProcessed;
}
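
The listing above refers to two helpers that aren't shown in the post: the GetPropertyValue extension method and the mToBeSkippedTypeList field. Minimal sketches of both, under the assumption that the identifying property is returned as a string and that the skip list simply enumerates primitive/leaf types:

//Assumed initialization: primitives and other leaf types the recursive parser should not descend into
private readonly List<Type> mToBeSkippedTypeList = new List<Type>
{
    typeof(string), typeof(int), typeof(long), typeof(decimal),
    typeof(DateTime), typeof(bool), typeof(byte[])
};

//Sketch of a reflection helper; the original extension method is not shown in the post
public static class ReflectionExtensions
{
    //Reads a named property from an object via reflection and returns it as a string (null if missing)
    public static string GetPropertyValue(this object source, string propertyName)
    {
        if (source == null) return null;
        PropertyInfo property = source.GetType().GetProperty(propertyName);
        object value = property != null ? property.GetValue(source, null) : null;
        return value != null ? value.ToString() : null;
    }
}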

To create an instance of a data processor using Unity:


//Resolve the processor using Unity
try
{
    //Try to create a data processor using Unity; every data processor inherits from DataProcessorFactory
    //and is registered under the name of the type it handles
    DataProcessorFactory f = mUnityContainer.Resolve<DataProcessorFactory>(itemType.Name);
    //SetMapper configures AutoMapper with the explicit mappings for property names that don't match
    f.SetMapper();
    mProcessorCollection.Add(itemType, f);
}
catch (ResolutionFailedException)
{
    //Record the itemType as an unsupported object
    if (!mListOfUnsupportedTypes.Contains(itemType))
    {
        mListOfUnsupportedTypes.Add(itemType);
    }
    isSuccess = false;
}
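
For Resolve(itemType.Name) to find anything, each data processor has to be registered in the container under the name of the deserialized type it handles. A sketch with hypothetical processor classes (using the older Microsoft.Practices.Unity API):

//Hypothetical registrations: the key must match the name of the deserialized type (itemType.Name)
IUnityContainer mUnityContainer = new UnityContainer();
mUnityContainer.RegisterType<DataProcessorFactory, CustomerDataProcessor>("Customer");
mUnityContainer.RegisterType<DataProcessorFactory, OrderDataProcessor>("Order");
mUnityContainer.RegisterType<DataProcessorFactory, OrderLineDataProcessor>("OrderLine");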

The DataProcessorFactory abstract methods that each data processor should implement.

It means one data processor = one object type.


/// <summary>
/// This method is intended for converting a source object to a destination object.
/// It should be implemented by the derived class since converting may differ per object.
/// In this method, AutoMapper is called for the conversion and the converted object
/// is added to the list/collection that will be processed when the Execute method is called.
/// </summary>
/// <typeparam name="TSource">Type of object to be converted and added</typeparam>
/// <param name="item">Object to be converted and added</param>
/// <param name="refNo">Reference number of the parent object in case of hierarchical processing</param>
/// <returns>Returns true if successful</returns>
public abstract bool ConvertAndAdd(TSource item, string refNo);

/// <summary>
/// Performs processing on the objects that are in the collection.
/// This is where data persisting occurs.
/// </summary>
/// <returns>Returns true if successful</returns>
public abstract bool Execute();

/// <summary>
/// This is where the AutoMapper configuration is set.
/// Since AutoMapper only maps automatically between matching property names, and the identity of
/// the source object is always in the GUID element, I have to map the ID of the destination to the GUID of the source object.
/// </summary>
public virtual void SetMapper()
{
}
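
To make this concrete, here is a hedged sketch of what a single data processor might look like. The Customer/CustomerEntity types, the ParentID column and the BulkInsert helper are assumptions, the AutoMapper calls use the older static API of that era, and I assume a generic DataProcessorFactory<TSource> variant beneath the non-generic base used by Unity; the actual implementation will differ per object:

//Hypothetical example: one data processor = one object type ("Customer" here)
public class CustomerDataProcessor : DataProcessorFactory<Customer>
{
    //Internal collection of converted entities, persisted when Execute is called
    private readonly List<CustomerEntity> mItems = new List<CustomerEntity>();

    public override void SetMapper()
    {
        //Property names that don't match are mapped explicitly: destination ID comes from the source GUID
        Mapper.CreateMap<Customer, CustomerEntity>()
              .ForMember(d => d.ID, opt => opt.MapFrom(s => s.GUID));
    }

    public override bool ConvertAndAdd(Customer item, string refNo)
    {
        CustomerEntity entity = Mapper.Map<CustomerEntity>(item);
        entity.ParentID = refNo;   //keep the hierarchical reference (assumed column)
        mItems.Add(entity);
        return true;
    }

    public override bool Execute()
    {
        //Persist the accumulated collection, e.g. via LINQ to SQL or a bulk-insert stored procedure
        return BulkInsert(mItems); //BulkInsert is an assumed helper
    }
}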

To commit the data, you just need to call the Execute method on every data processor instance.
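
Inside the data processor manager, that commit step is essentially a loop over all resolved processors. A small sketch (the mErrors collection is an assumed field):

//Ask every data processor to persist its internal collection and gather failures
public bool CommitAll()
{
    bool allSucceeded = true;
    foreach (DataProcessorFactory processor in mProcessorCollection.Values)
    {
        if (!processor.Execute())
        {
            allSucceeded = false;
            mErrors.Add(processor.GetType().Name + " failed to persist its collection.");
        }
    }
    return allSucceeded;
}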