Pure C# ETL (Extract Transform Load)

With regard to ETL (Extract, Transform, Load), the default tool for the job is, hands down, Microsoft SQL Server Integration Services (SSIS). SSIS stores information about the source and the target in its package metadata, including every column's data type, size and length. The problem arises when a data type or size changes and the metadata gets out of sync. To fix it, you need to open the package and refresh either the source or the destination component so that it picks up the updated metadata.

We had a project that required an ETL solution with the following requirements: process a complex XML file (multi-level, self-referencing structure), support multiple versions of the data, be configurable at runtime, and not be too sensitive to data types. I ended up building a flexible ETL solution in C# with the help of the Unity framework, AutoMapper, LINQ to SQL and stored procedures for bulk inserts.

Library information:

1. Unity – dependency injection framework.

2. AutoMapper – object-to-object mapping

The processing is broken down into six steps:

1. When an XML file comes in, it is deserialized into an object.

2. Using reflection, loop through the object (every property).

3. Get the type of each object. If there is no data processor instance for that type yet, create one using Unity; otherwise, reuse the existing one.

4. Call the data processor to convert the object from its deserialized format into the data entity using AutoMapper, and add the converted instance to its internal collection (a collection of objects).

5. After the loop, instruct every data processor to commit its data.

6. Collect the processing and error information.

Implementation:

Step 1. Deserialization

1.1 First, you need a schema for the XML. Then use the XSD.exe tool (with the /classes option) to generate the classes used in the deserialization process.

1.2 To deserialize the XML into an object, use the following code:

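// Requires: using System.IO; using System.Xml.Serialization;

/// <summary>
/// Deserialize Xml
/// </summary>
/// <typeparam name="T">Class generated by XSD.exe for the root element</typeparam>
/// <param name="path">Path of the XML file to deserialize</param>
/// <returns>The deserialized object graph</returns>
protected T DeserializeFile<T>(string path)
{
    T item;
    using (StreamReader sr = new StreamReader(path))
    {
        XmlSerializer xs = new XmlSerializer(typeof(T));
        item = (T)xs.Deserialize(sr);
    }
    return item;
}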
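The minimal sketch below shows what the deserialized object graph looks like for a multi-level, self-referencing document. The `Node` class is a hypothetical hand-written stand-in for an XSD.exe-generated class, and real generated code carries more attributes. `XmlLoader.Deserialize` (also a hypothetical name) uses the same XmlSerializer technique as DeserializeFile. It reads from a TextReader instead of a file path, so it runs without a file on disk.

```csharp
using System.IO;
using System.Xml.Serialization;

// Hypothetical stand-in for an XSD.exe-generated class describing a
// self-referencing <Node> element (real generated classes differ).
[XmlRoot("Node")]
public class Node
{
    [XmlAttribute("GUID")]
    public string GUID { get; set; }

    [XmlAttribute("Name")]
    public string Name { get; set; }

    // Self-reference: repeated child <Node> elements deserialize into this array
    [XmlElement("Node")]
    public Node[] Children { get; set; }
}

public static class XmlLoader
{
    // Same XmlSerializer technique as DeserializeFile, but reading from any
    // TextReader (e.g. a StringReader) instead of a file path.
    public static T Deserialize<T>(TextReader reader)
    {
        XmlSerializer xs = new XmlSerializer(typeof(T));
        return (T)xs.Deserialize(reader);
    }
}
```

Nested `Node` elements come back as a `Node[]` property, so a multi-level document becomes a tree of objects and arrays. That is the shape the reflection-based parser in the next step walks.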

Steps 2, 3 and 4. Loop through every property using reflection. For every object, get or create a data processor using Unity, then instruct it to add the converted object to its internal collection. Each data processor holds a collection of objects of its own type.

To loop through every property of the object using reflection, use the following code:


// Requires: using System; using System.Reflection;
private DataProcessorManager mDataProcessorManager;

/// <summary>
/// Recursively walks the deserialized object graph and passes every object
/// that is not on the skip list to its data processor.
/// </summary>
/// <param name="item">Object, or array of objects, to be parsed</param>
/// <param name="parentID">Reference number (GUID) of the parent object</param>
protected virtual void Parse<T>(T item, string parentID)
{
    if (item != null)
    {
        Type obj = item.GetType();
        // Check whether the object is an array
        if (obj.IsArray)
        {
            Array array = item as Array;
            // Loop through every element and process it one by one
            foreach (var o in array)
            {
                if (o == null)
                {
                    continue;
                }

                // Process the object and retrieve its reference GUID.
                // GetPropertyValue is a custom extension method (not shown in this post).
                ProcessSegment(o, parentID);
                string refNo = o.GetPropertyValue("GUID");

                // Check whether the object has other properties that are objects or arrays
                // of objects, as opposed to primitive types or types to be skipped.
                foreach (PropertyInfo p in o.GetType().GetProperties())
                {
                    if (IsPropertyToBeProcessed(p))
                    {
                        // Get the property value
                        var prop = p.GetValue(o, null);
                        // Recursive call: children are linked to this object's GUID
                        if (prop != null)
                        {
                            Parse(prop, refNo);
                        }
                    }
                }
            }
        }
        else
        {
            // Process the object and retrieve its GUID
            ProcessSegment(item, parentID);
            string refNo = item.GetPropertyValue("GUID");
            // Check whether the object has other properties that are objects or arrays
            // of objects, as opposed to primitive types or types to be skipped.
            foreach (PropertyInfo p in obj.GetProperties())
            {
                if (IsPropertyToBeProcessed(p))
                {
                    // Get the property value
                    var prop = p.GetValue(item, null);
                    // Recursive call
                    if (prop != null)
                    {
                        Parse(prop, refNo);
                    }
                }
            }
        }
    }
}

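/// <summary>
/// Hands a single object to the data processor manager unless its type is on the skip list.
/// </summary>
/// <param name="item">Object to be processed</param>
/// <param name="refNo">Reference number (GUID) of the parent object</param>
protected virtual void ProcessSegment(object item, string refNo)
{
    // Get the type of the object to be processed
    Type itemType = item.GetType();

    // Types on the skip list (e.g. the top-level element) are not processed
    if (!mToBeSkippedTypeList.Exists(i => i.Equals(itemType)))
    {
        // The manager gets or creates the data processor for this type,
        // converts the object and adds it to that processor's collection
        mDataProcessorManager.ConvertAndAdd(item, refNo);
    }
}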

/// <summary>
/// Check whether this property is something we need to process: exclude
/// primitive types like Int32, strings, and types on the skip list.
/// </summary>
/// <param name="propertyInfo">Property to check</param>
/// <returns>True if the property's value should be parsed recursively</returns>
protected bool IsPropertyToBeProcessed(PropertyInfo propertyInfo)
{
    if (propertyInfo == null)
    {
        return false;
    }

    Type propertyType = propertyInfo.PropertyType;

    // Skip primitive types (Int32, Byte, Boolean, ...) and strings...
    if (propertyType.IsPrimitive || propertyType == typeof(string))
    {
        return false;
    }

    // ...and any type registered in the list of types to be skipped.
    return !mToBeSkippedTypeList.Exists(i => i.Equals(propertyType));
}
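The parsing code calls `GetPropertyValue("GUID")`, an extension method the post does not show. Here is a minimal sketch of what it might look like. It assumes the method returns the named public property's value as a string, or null when the property is missing or its value is null. `ReflectionExtensions` is a hypothetical name.

```csharp
using System;
using System.Reflection;

// Hypothetical reconstruction of the GetPropertyValue extension used by Parse
public static class ReflectionExtensions
{
    public static string GetPropertyValue(this object source, string propertyName)
    {
        if (source == null)
        {
            throw new ArgumentNullException("source");
        }

        // Look up a public instance property with the given name
        PropertyInfo property = source.GetType().GetProperty(
            propertyName, BindingFlags.Public | BindingFlags.Instance);
        if (property == null)
        {
            return null;
        }

        // Read the value and convert it to a string (null stays null)
        object value = property.GetValue(source, null);
        return value == null ? null : value.ToString();
    }
}
```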

To create an instance of a data processor using Unity:


// Resolve the processor using Unity
try
{
    // Resolve the data processor registered under the type's name;
    // all data processors inherit from DataProcessorFactory
    DataProcessorFactory f = mUnityContainer.Resolve<DataProcessorFactory>(itemType.Name);
    // SetMapper configures AutoMapper's explicit mappings for property names that don't match
    f.SetMapper();
    mProcessorCollection.Add(itemType, f);
}
catch (ResolutionFailedException)
{
    // No processor is registered for this type: record it as unsupported
    if (!mListOfUnsupportedTypes.Contains(itemType))
    {
        mListOfUnsupportedTypes.Add(itemType);
    }
    isSuccess = false;
}

Below are the DataProcessorFactory abstract methods that each data processor must implement.

In other words, 1 data processor = 1 object type.


/// <summary>
/// Converts a source object to its destination entity.
/// This must be implemented by the derived class, since the conversion may differ per object.
/// In this method, AutoMapper is called for the conversion, and the converted object
/// is added to the list/collection that will be processed when Execute is called.
/// </summary>
/// <param name="item">Object to be converted and added</param>
/// <param name="refNo">Reference number of the parent object, in case of hierarchical processing</param>
/// <returns>Returns true if successful</returns>
public abstract bool ConvertAndAdd(TSource item, string refNo);

/// <summary>
/// Performs processing on the objects in the collection.
/// This is where the data is persisted.
/// </summary>
/// <returns>Returns true if successful</returns>
public abstract bool Execute();

/// <summary>
/// This is where the AutoMapper configuration is set.
/// AutoMapper maps properties with matching names by convention. Since the identity of
/// each source object is always in its GUID element, the ID of the destination has to be
/// mapped explicitly from the GUID of the source object.
/// </summary>
public virtual void SetMapper()
{
    // Default: no explicit mappings; derived processors override this
}
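SetMapper exists because convention-based mapping only pairs properties with identical names, so the source GUID never reaches the destination ID on its own. The sketch below illustrates that idea with a tiny reflection-based mapper. It is NOT AutoMapper and does not use AutoMapper's API. `SimpleMapper`, `MapMember`, `CustomerXml` and `CustomerEntity` are all hypothetical names.

```csharp
using System.Collections.Generic;
using System.Reflection;

// Hypothetical source (deserialized) and destination (entity) types
public class CustomerXml
{
    public string GUID { get; set; }
    public string Name { get; set; }
}

public class CustomerEntity
{
    public string ID { get; set; }
    public string Name { get; set; }
}

// Hypothetical convention mapper (NOT AutoMapper): copies properties whose
// names match, plus explicit destination-to-source mappings.
public class SimpleMapper<TSource, TDest> where TDest : new()
{
    private readonly Dictionary<string, string> _explicit = new Dictionary<string, string>();

    // Map the destination property destName from the source property sourceName
    public SimpleMapper<TSource, TDest> MapMember(string destName, string sourceName)
    {
        _explicit[destName] = sourceName;
        return this;
    }

    public TDest Map(TSource source)
    {
        TDest dest = new TDest();
        foreach (PropertyInfo destProp in typeof(TDest).GetProperties())
        {
            if (!destProp.CanWrite)
            {
                continue;
            }

            // Use the explicit mapping if one exists, otherwise the same name
            string sourceName;
            if (!_explicit.TryGetValue(destProp.Name, out sourceName))
            {
                sourceName = destProp.Name;
            }

            PropertyInfo srcProp = typeof(TSource).GetProperty(sourceName);
            if (srcProp != null && destProp.PropertyType.IsAssignableFrom(srcProp.PropertyType))
            {
                destProp.SetValue(dest, srcProp.GetValue(source, null), null);
            }
        }
        return dest;
    }
}
```

Without the explicit entry, ID stays null because no source property is called ID. With `MapMember("ID", "GUID")`, it receives the GUID. In the post, each processor's SetMapper override sets up this kind of configuration with AutoMapper itself.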

To commit the data you just need to call the Execute method of every data processor instance.
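Steps 3 to 6 can be sketched without Unity as a manager that keeps one processor per source type. This is a simplified, hypothetical stand-in. `SegmentProcessor` plays the role of DataProcessorFactory, and a dictionary of factory delegates replaces the Unity container. That is a swapped-in technique, not the Unity API. `StringRowProcessor` and `ProcessorManager` are hypothetical names too.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, simplified stand-in for DataProcessorFactory
public abstract class SegmentProcessor
{
    public abstract bool ConvertAndAdd(object item, string refNo);
    public abstract bool Execute();
}

// Hypothetical processor: buffers one row per item; a real one would map
// with AutoMapper and bulk-insert in Execute
public class StringRowProcessor : SegmentProcessor
{
    public readonly List<string> Pending = new List<string>();
    public int Committed { get; private set; }

    public override bool ConvertAndAdd(object item, string refNo)
    {
        Pending.Add(refNo + ":" + item);
        return true;
    }

    public override bool Execute()
    {
        // Persist the buffered rows here, then clear the buffer
        Committed += Pending.Count;
        Pending.Clear();
        return true;
    }
}

// One processor per source type; a dictionary of factories replaces Unity
public class ProcessorManager
{
    private readonly Dictionary<Type, Func<SegmentProcessor>> _factories =
        new Dictionary<Type, Func<SegmentProcessor>>();
    private readonly Dictionary<Type, SegmentProcessor> _processors =
        new Dictionary<Type, SegmentProcessor>();
    private readonly HashSet<Type> _unsupported = new HashSet<Type>();

    public IEnumerable<Type> UnsupportedTypes
    {
        get { return _unsupported; }
    }

    public void Register(Type sourceType, Func<SegmentProcessor> factory)
    {
        _factories[sourceType] = factory;
    }

    // Steps 3 and 4: reuse this type's processor, or create it on first use
    public bool ConvertAndAdd(object item, string refNo)
    {
        Type itemType = item.GetType();
        SegmentProcessor processor;
        if (!_processors.TryGetValue(itemType, out processor))
        {
            Func<SegmentProcessor> factory;
            if (!_factories.TryGetValue(itemType, out factory))
            {
                _unsupported.Add(itemType); // Step 6: reported after the run
                return false;
            }
            processor = factory();
            _processors.Add(itemType, processor);
        }
        return processor.ConvertAndAdd(item, refNo);
    }

    // Step 5: tell every processor to commit its buffered data
    public bool CommitAll()
    {
        bool allSucceeded = true;
        foreach (SegmentProcessor processor in _processors.Values)
        {
            // Non-short-circuiting &= so every processor still runs Execute
            allSucceeded &= processor.Execute();
        }
        return allSucceeded;
    }
}
```

Registering processors by type avoids using exceptions for control flow, unlike resolving and then catching ResolutionFailedException. Unsupported types are simply recorded and can be reported after the run.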

C# Enterprise Library Asynchronous Logging

Logging can be slow, especially when one of the trace listeners writes to a database. With .NET 4.0 and its Task Parallel Library (System.Threading.Tasks), we can easily turn our logger into a fire-and-forget asynchronous operation.

The example below uses Enterprise Library 5.0.

Suppose this is your existing LogMessage method:

Just add another method, LogMessageAsync, with the following code:

Now, to log asynchronously, just call the LogMessageAsync() method.
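Here is a minimal sketch of the pattern, assuming a synchronous LogMessage already exists. `AppLogger` and its in-memory `Written` queue are hypothetical stand-ins for a logger that would call Enterprise Library (for example `Logger.Write`). The point is the wrapper, which queues the slow call on the thread pool with `Task.Factory.StartNew` and returns immediately.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical logger; a real LogMessage would call Enterprise Library.
// The in-memory queue keeps the sketch self-contained.
public class AppLogger
{
    public readonly ConcurrentQueue<string> Written = new ConcurrentQueue<string>();

    // Existing synchronous method: slow when a listener writes to a database
    public void LogMessage(string message)
    {
        Written.Enqueue(message);
    }

    // Fire-and-forget wrapper: runs LogMessage on a thread-pool thread and
    // returns immediately; callers may ignore the returned Task
    public Task LogMessageAsync(string message)
    {
        Task task = Task.Factory.StartNew(() => LogMessage(message));

        // Observe any failure so an unobserved task exception cannot
        // terminate the process when the task is finalized (.NET 4.0 behavior)
        task.ContinueWith(
            t => Console.Error.WriteLine("Logging failed: " + t.Exception.GetBaseException().Message),
            TaskContinuationOptions.OnlyOnFaulted);

        return task;
    }
}
```

Fire-and-forget logging has two caveats. Entries may be written out of order, and anything still queued when the process exits can be lost, because thread-pool threads are background threads.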

C# Parallel – Call methods with input and return values

.NET 4.0 makes it easy to run tasks concurrently. The code below shows how to execute two methods that take input and return values concurrently, through the task factory (Task.Factory) in the System.Threading.Tasks namespace.

Note: Add System.Threading.Tasks and System.Diagnostics to your using directives.

Code:
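Here is a minimal sketch, assuming two hypothetical methods, `SumTo(int)` and `CountWords(string)`. Each one is started with `Task.Factory.StartNew`, which returns a `Task<TResult>`, and the lambdas capture the inputs. `Task.WaitAll` blocks until both finish, and each return value is read from `Result`. A Stopwatch from System.Diagnostics times the run. `ParallelExample` and `RunBoth` are hypothetical names.

```csharp
using System;
using System.Diagnostics;
using System.Threading.Tasks;

public static class ParallelExample
{
    // Hypothetical method 1: input n, returns 1 + 2 + ... + n
    public static long SumTo(int n)
    {
        long sum = 0;
        for (int i = 1; i <= n; i++)
        {
            sum += i;
        }
        return sum;
    }

    // Hypothetical method 2: input text, returns its number of words
    public static int CountWords(string text)
    {
        return text.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries).Length;
    }

    // Runs both methods concurrently and returns both results
    public static Tuple<long, int> RunBoth(int n, string text)
    {
        Stopwatch watch = Stopwatch.StartNew();

        // StartNew<TResult> returns Task<TResult>; the lambdas capture the inputs
        Task<long> sumTask = Task.Factory.StartNew(() => SumTo(n));
        Task<int> wordTask = Task.Factory.StartNew(() => CountWords(text));

        // Block until both complete; failures surface as one AggregateException
        Task.WaitAll(sumTask, wordTask);
        watch.Stop();
        Console.WriteLine("Sum: {0}, words: {1}, elapsed: {2} ms",
            sumTask.Result, wordTask.Result, watch.ElapsedMilliseconds);

        return Tuple.Create(sumTask.Result, wordTask.Result);
    }
}
```

Calling `RunBoth(100, "the quick brown fox")` returns (5050, 4) and prints the elapsed time.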


C# using ThreadPool for multi-threaded application

I'm currently designing an application that handles multiple processes at the same time. I've read many articles about threading: whether to use a BackgroundWorker, custom thread management, or the thread pool. What fascinates me is the thread pool: just queue a work item and voilà, you have a working multi-threaded application. See the simple example below:


using System;
using System.Threading;

public class MyProcess
{
    private readonly ManualResetEvent _doneEvent;

    public MyProcess(ManualResetEvent doneEvent)
    {
        _doneEvent = doneEvent;
    }

    // Runs on a thread-pool thread; threadContext carries the work-item index
    public void MyProcessThreadPoolCallback(Object threadContext)
    {
        int threadIndex = (int)threadContext;
        try
        {
            Console.WriteLine("thread {0} started...", threadIndex);
            StartProcess();
            Console.WriteLine("thread {0} ended...", threadIndex);
        }
        finally
        {
            // Signal that this work item has completed
            _doneEvent.Set();
        }
    }

    public void StartProcess()
    {
        // TODO: Add code for processing here
    }
}

public class ThreadPoolExample
{
    static void Main()
    {
        // WaitHandle.WaitAll accepts at most 64 handles, so keep this <= 64
        const int totalCountToProcess = 10;

        ManualResetEvent[] doneEvents = new ManualResetEvent[totalCountToProcess];
        MyProcess[] myProcessArray = new MyProcess[totalCountToProcess];

        // Configure and launch work items using the ThreadPool:
        Console.WriteLine("launching tasks...");
        for (int i = 0; i < totalCountToProcess; i++)
        {
            doneEvents[i] = new ManualResetEvent(false);
            MyProcess p = new MyProcess(doneEvents[i]);
            myProcessArray[i] = p;
            ThreadPool.QueueUserWorkItem(p.MyProcessThreadPoolCallback, i);
        }

        // Wait for all work items in the pool to finish processing
        WaitHandle.WaitAll(doneEvents);
        Console.WriteLine("All processes are complete.");
    }
}
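The example above has one limitation: `WaitHandle.WaitAll` accepts at most 64 handles, so it breaks once `totalCountToProcess` exceeds 64. The sketch below shows an alternative based on .NET 4.0's `CountdownEvent`, a single counter that every work item signals when it is done. `CountdownExample.RunAll` is a hypothetical name, and the `Interlocked` counter stands in for real work.

```csharp
using System;
using System.Threading;

public static class CountdownExample
{
    // Queues `count` work items on the ThreadPool and blocks until all finish.
    // One CountdownEvent replaces the per-item ManualResetEvent array.
    public static int RunAll(int count)
    {
        int processed = 0;
        using (CountdownEvent done = new CountdownEvent(count))
        {
            for (int i = 0; i < count; i++)
            {
                ThreadPool.QueueUserWorkItem(state =>
                {
                    try
                    {
                        // TODO: real processing for work item (int)state goes here
                        Interlocked.Increment(ref processed);
                    }
                    finally
                    {
                        done.Signal(); // one signal per completed work item
                    }
                }, i);
            }

            // Returns once the count reaches zero
            done.Wait();
        }
        return processed;
    }
}
```

`RunAll(10)` returns 10 after all ten work items have signalled. The count can be any size, because no per-item handle is allocated.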

 

System error code 53 while connecting to an existing network share drive

Recently, I encountered the famous error 53 (network path not found) while connecting to a network share folder from C# through the Windows API. The typical first step is to check whether the path really exists and the credentials are valid. The path was there, and the credentials were valid. After an hour of checking and rechecking, I discovered that the path (folder) was in a domain-based DFS (Distributed File System) namespace.

Resolution: Get the actual folder path from the referral list on the DFS tab of the shared folder's properties, and connect to that path instead.
