Pipelines are a component of Microsoft BizTalk Server that provides an implementation of the Pipes and Filters integration pattern. During the receiving and sending of messages, there are business reasons to perform transformations on messages to prepare them to enter or leave BizTalk Server. Pipelines enable the developer to define a series of transformations that will be performed on a message as it is being received or sent.

A common example is that you may need to compress your data before sending them to target subscriber in order to save server resources such as disk space or bandwidth, through this talk I'm going to discuss how to develop a general custom pipeline component to compress/decompress data. There are two types of pipelines, send and receive, and these match the ports in which they execute. Send pipelines are executed in send ports and in the response portion of a request/response receive port, while receive pipelines are executed in receive locations, and in the response portion of a solicit/response send port. Essentially, receive pipelines are intended to be used to transform messages that are being published to the MessageBox database, while send pipelines are intended to be used on messages which have been subscribed to and are being sent out of BizTalk Server.

 

Receive Pipeline Stages Send Pipeline Stages
   
  1. Decode: Decrypts or decodes the message data.
  2. Disassemble: Disassembles an interchange into smaller messages and parses message contents.
  3. Validate: Validates the message data, generally against a schema.
  4. Resolve Party: Identifies the BizTalk Server party associated with some security token in the message or message context.
  1. Pre-assemble: Performs any message processing necessary before assembling the message.
  2. Assemble: Assembles the message and prepares it to be transmitted by taking steps such as adding envelopes, converting XML to flat files, or other tasks complementary to the disassemble stage in a receive pipeline.
  3. Encode: Encodes or encrypts the message before delivery.

 

A general pipeline component gets one message from the BizTalk Messaging Engine, processes it, and returns it to the BizTalk Server engine, it implements the following interfaces:

  • IBaseComponent Interface: All pipeline components need to implement this interface to provide basic information about the component.
  • IComponent Interface: All pipeline components except assemblers and disassemblers implement this interface to get messages from the BizTalk Server engine for processing and to pass processed messages back to the engine.
  • IComponentUI Interface: Defines methods that enable pipeline components to be used within the Pipeline Designer environment.
  • IPersistPropertyBag Interface: Defines the methods to prepare for, load, and save the properties of pipeline components.


Now let's start developing our compression/decompression custom pipeline component:

  1. Open visual studio 2010 and create a new class library project then add a reference of BizTalk pipeline " Microsoft.BizTalk.Pipeline.dll" normally found under the "c:\Program Files\Microsoft BizTalk Server 2010\Microsoft.BizTalk.Pipeline.dll".
  2. Add new class to the project, name it "CompressionPipeline" that implements all the basic interfaces mentioned earlier, additional three class attributes will be added as follows:


                 [ComponentCategory(CategoryTypes.CATID_PipelineComponent)]
         
           [System.Runtime.InteropServices.Guid("AAD4E8D2-3900-4542-88A7-43B6D6FE9080")]
                
    [ComponentCategory(CategoryTypes.CATID_Encoder)]
             
        public class CompressionPipeline : Microsoft.BizTalk.Component.Interop.IComponent,
              
            IBaseComponent, IPersistPropertyBag, IComponentUI

    •  The "ComponentCategory" associates our component with a specific pipeline design category in the pipeline designer which tells the designer that this class represents a pipeline component.
    • The "Guid" attribute supplies an explicit unique identifier to our class for COM registery and uniquely identify our component. Use visual studio tools to generate GUID by choosing "Tools" à "Create GUID".
    • The third attribute is used to tell the pipeline designer that this component is encoding component as we are developing a send pipeline.

  3. Before implementing each interface we will add a resource file that separates resources "Strings, Pictures, Icons, …" from our actual implementation code, add a new resource file, I will add some strings to the resource file that help us implementing our interfaces as can be observed from the below screenshot:

     



     
  4. First, I will implement IBaseComponent interface, which allows other developers and biztalk administrators to get information about our compression pipeline component, I will use the resources created in the previous step as follows:

    #region IBaseComponent Members
            
    public string Description
    {
          get { return CompressionPipelineResources.CompressionPipelineDescription; }
    }
            
    public string Name
    {
         get { return CompressionPipelineResources.CompressionPipelineName; }
    }
            
    public string Version
    {
         get { return CompressionPipelineResources.CompressionPipelineVersion; }
    }
            
    #endregion

  5. We will need an icon for our component so open the resource file again and choose "Images" or "Icons" and copy the desired image from your local disk drive to the resource file. This will drive us to implement IComponentUI as observed from the following code snippet:

    #region IComponentUI Members        
    public IntPtr Icon
           
    {
               
         get { return CompressionPipelineResources.ZIP.GetHicon(); }
           
    }
            
    public System.Collections.IEnumerator Validate(object projectSystem)
           
    {
               
         
    return null;
           
    }
             
    #endregion

    As you can see from the above code that I omitted the "validate" method which verifies that all of the configuration properties are set correctly. So I configured validation to null.

  6. .Net introduced compression capabilities since version 2.0 as a part of the data streaming framework, there are several frameworks available that give better performance than the one provided by .Net but I prefer to use compression algorithms provided by .Net in order to eliminate dependencies and to simplify deployments. .Net provides two algorithms GZip and Deflate so we want our BizTalk administrators to be able to select the compression algorithm, our custom pipeline contains a string property that holds the definition of the compression algorithm. BizTalk supports the persistence of the user configuration by implementing IPersistPropertyBag interface as follows:


     #region -- Properties --       
    [Browsable(true)]
           
    [Description("Controls the compression algorithm used to encode the output stream.")]
           
    [DefaultValue("GZip")]
           
    public string CompressionType
           
    {
               
          
    get;
           set;       
    }
           
    #endregion
     
    #region IPersistPropertyBag Members              
    public
    void GetClassID(out Guid classID)
                 
    {
                      
          
    classID = new System.Guid("AAD4E8D2-3900-4542-88A7-43B6D6FE9080");
                 
    }
                  
    public void InitNew()
                 
    {
                             
    }
                  
    public
    void Load(IPropertyBag propertyBag, int errorLog)
                 
    {
                      
          
    object val=null;
                      
          
    propertyBag.Read("CompressionAlgorithm", out val,0);
                      
          
    if
    (val != null) this.CompressionType = val.ToString();
                 
    }
                  
    public void Save(IPropertyBag propertyBag, bool clearDirty, bool saveAllProperties)
                 
    {
                     
          
    object val = this.CompressionType;
                     
          
    propertyBag.Write("CompressionAlgorithm",ref val);
                 
    }
                  
    #endregion

    As you can see that "Load" and "Save" methods are used to persist our compression algorithm type, using string is not the best practice for this functionality as the BizTalk administrator or our users could mistype or misspell the algorithm type. The best way is to use enumeration and limit the user to choose from a dropdown list to avoid any mistyping, doing this is out of my scope though this talk but you can refer to Saravana Kumar's talk " Understanding Design-Time Properties for Custom Pipeline Components.doc (1.22 mb)" also you can download the full resource from here.

  7. The last step for our pipeline is to  implement IComponent interface that does the actual stream compression work:

    #region IComponent Members               
    public Microsoft.BizTalk.Message.Interop.IBaseMessage Execute(
              Microsoft.BizTalk.Component.Interop.IPipelineContext pc, Microsoft.BizTalk.Message.Interop.IBaseMessage inmsg)
           
    {
               
          if (inmsg.BodyPart != null && inmsg.BodyPart.Data != null)
                
          
    {
                   
               
    try
                   
               
    {
                       
                     
    MemoryStream ms = new MemoryStream();
                      Stream inStr = inmsg.BodyPart.GetOriginalDataStream();
                      switch (this.CompressionType)
                      {
                            case "GZip":
                                using (GZipStream zipped = new GZipStream(ms, CompressionMode.Compress, true))
                                {
                                    inStr.Seek(0, SeekOrigin.Begin);
                                    inStr.CopyTo(zipped);
                                }
                                break;
                            case "Deflate":
                                using (DeflateStream zipped = new DeflateStream(ms, CompressionMode.Compress, true))
                                {
                                    inStr.Seek(0, SeekOrigin.Begin);
                                    inStr.CopyTo(zipped);
                                }
                                break;
                        }
                        inmsg.BodyPart.Data = ms;
                        inmsg.BodyPart.Data.Seek(0, SeekOrigin.Begin);
                        return inmsg;
                    }
                    catch (Exception ex)
                    {
                        throw new BTSException(string.Format(CompressionPipelineResources.CompressionExcpetion, ex.Message));
                    }
                }
                return inmsg;
            }        
    #endregion


    One important thing here is never to close the message input stream that's why we set the value of "LeaveOpen" of compression stream to "true" in order to leave the stream open after compression.

  8. The last step is to sign our custom class library "dll" then copy it to BizTalk pipeline components folder which is normally found under the path " C:\Program Files\Microsoft BizTalk Server 2010\Pipeline Components \".

Now our custom compression pipeline component is ready to be used by pipeline designer. The next step is to create a new BizTalk project to use our custom pipeline:

  1. Create a new BizTalk project then add a new send pipeline item, the pipeline designer will be opened. From the toolbox right click then click "Choose Item", from the tab control choose "BizTalk Pipeline Components", you will find our compression custom pipeline component, choose it and drag the component from the toolbox to the "Encode" area as below:
  2.  





  3. The second step is to deploy our custom compression pipeline to our BizTalk server to be ready for use, right click on the BizTalk project and click properties then choose "Deployment" from property page and setup your target server as well as your target BizTalk application as can be seen from the below screenshot:





  4. You will have to sign the project, click "Signing" and create a new key then save your settings and close the properties, then right click on the project and choose "Deploy".
  5. Now our pipeline component can be used from BizTalk server, open BizTalk administration console then explore "BizTalk Application 1" you will find our custom compression component exists in pipelines artifacts. Create a new send port and configure it to use our compression pipeline component as below:



We can test our pipeline component by using visual studio in case of BizTalk unavailability by following the below steps:

  1. Load the custom pipeline project solution into Visual Studio.
  2. Change the output path for your solution to <Installation Folder>\Pipeline Components. In Solution Explorer, right-click your project, click the Build tab, and then change the Output Path by clicking the Browse button and selecting the <Installation Folder>\Pipeline Components directory.
  3. Change the start action for your solution. In Solution Explorer, right-click your project, click the Debug tab, click Start external program, then click … and navigate to <Installation Folder>\SDK\Utilities\PipelineTools and choose Pipeline.exe. Under Start Options, enter the command line arguments appropriate for your component for example [<Path>\YourPipeline.btp -d <Path>\YourTestFile.txt -c]
  4. Set your breakpoints if you want to debug the pipeline component.
  5. Press Ctrl + F5 to start testing our just F5 to begin debugging.

Now we have finished our compression pipeline component, you can follow the same steps to create decompression for a receiving pipeline also there is a great tool for creating custom pipeline component which adds a new pipeline project template for Visual Studio, you can find it on codeplex "BizTalk Server Pipeline Component Wizard" .
I have attached the full solution that contains both compression/decompression pipelines here [Intellecting.CustomPipelines.zip (161.46 kb)].

Currently rated 5.0 by 1 people

  • Currently 5/5 Stars.
  • 1
  • 2
  • 3
  • 4
  • 5

It has been a long time since my last talk about using service broker for notification of data changes, let's continue what we have started. The first thing we need to do here is how to receive a message from service broker. SQL server has a special command for retrieving a message from service broker:

Declare @FetchSize int = 1          -- Number of messages to get in a single fetch process
Declare @TimeToWait int = 5000      -- Time to wait for messages to be received (for example, 5 Seconds)

WAITFOR ( RECEIVE TOP(@FetchSize) * FROM NotificationQueue), TIMEOUT @TimeToWait

As you observe from the above query that you can control the number of messages to fetch and the blocking timeout for a message to be received, if you have multiple messages submitted through one conversation handle then you can do batch receive but if one message is submitted then you can only receive one message at a time.
Now after we got our message from service broker, we want to prepare it to be audited. We supposed to transform the message body to XML format and use SQL server XML capabilities to transform the XML into SQL data.
I will create audit trace table with the following structure:

create table AuditTrace
(
   AuditTraceID bigint primary key identity(1,1),
   [User] nvarchar(200),
   [Process] int,
   [HostName] nvarchar(200),
   [Time] datetime,
   [TargetTable] nvarchar(200),
   [Action] varchar(10),
   [OldData] xml,
   [NewData] xml
)

 Here are the steps to fill the above table from the parsed message received from SQL server service broker:

  1. First of all we will we cast the message body to XML format as follows:

    DECLARE @PayLoad xmlWAITFOR (RECEIVE TOP(1) @PayLoad = message_body FROM NotificationQueue), TIMEOUT 5000

  2. Then we prepare the message payload by letting SQL server parse the message and convert it to parsed XML document ready for consumption:

    DECLARE @idoc intEXEC sp_xml_preparedocument @idoc OUTPUT, @PayLoad

  3. Now I will use OPENXML to open the document and start processing it by parsing the message header into SQL data format and insert the data to our audit trace table as follows:

    insert into  AuditTrace ([User],[Process],[HostName],[Time],[TargetTable],[Action]) SELECT * FROM OPENXML(@idoc, N'//NotificationMessage/Header') 
    with ([User] nvarchar(200),Process int,HostName nvarchar(200),Time datetime,TargetTable nvarchar(200),MessageType varchar(10))

  4. By using XQuery I will update our previously inserted row in the audit trace to amend the old and the new changed data:

    Declare @Key bigint = SCOPE_IDENTITY()
    Update audittrace set NewData = @PayLoad.query(N'//NotificationMessage/New'), olddata = @PayLoad.query(N'//NotificationMessage/Old')
    where AuditTraceid = @Key

  5. The last thing to do here is to clean up the memory to close the previously opened XML document:

    EXEC sp_xml_removedocument @idoc


 By completing these tasks we will have a audit data in our audit trace data table like screen-shot below :

 

I want to consider another situation where we want to audit the changed data to another table in our database that has the same schema as the production table but is suffixed with (Audit), I could do this by using complex XML with some SQL server temp tables and some loops, instead I will use SQL CLR to achieve my needs, The common language runtime (CLR) integration feature is off by default, and must be enabled in order to use objects that are implemented using CLR integration. To enable CLR integration, use the clr enabled option of the sp_configure stored procedure:

sp_configure 'show advanced options', 1;
GO
RECONFIGURE;
GO
sp_configure 'clr enabled', 1;
GO
RECONFIGURE;
GO

To create our CLR stored procedure, open Visual Studio 2010 and create new database project, choose your preferred language (I will take C#) as you can see below:

 

After creating the project you will need to choose your database in which deployment will take place, then add a new stored procedure which will contain our parsing and auditing code as follows:

[Microsoft.SqlServer.Server.SqlProcedure]
public static void ParseAuditMessage(string XMLBody)
{
        string MESSAGE_HEADER_NAME= "Header";
        string MESSAGE_NEW_NAME = "New";
        MemoryStream st = new MemoryStream();StreamWriter wr = new StreamWriter(st);
        wr.Write(XMLBody);
        wr.Flush();
        st.Seek(0, SeekOrigin.Begin);
        DataSet ds = new DataSet();
        ds.ReadXml(st);
        string TargetTable = ds.Tables[MESSAGE_HEADER_NAME].Rows[0]["TargetTable"].ToString();
        foreach (DataRow DataRowTmp in ds.Tables[MESSAGE_NEW_NAME].Rows)
        {
            
using (SqlConnection connection = new SqlConnection("context connection=true"))
            {
                StringBuilder CommandString = new StringBuilder();
                StringBuilder TargetsString = new StringBuilder();
                StringBuilder ParametersString = new StringBuilder();
                SqlCommand tmpCommand = new SqlCommand("",connection);
               
CommandString.AppendFormat("INSERT INTO {0}audit ", TargetTable);
                foreach (DataColumn col in DataRowTmp.Table.Columns)
                {
                    TargetsString.AppendFormat("{0},", col.ColumnName);
                    ParametersString.AppendFormat("@{0},", col.ColumnName);
                    tmpCommand.Parameters.AddWithValue(string.Format("@{0}", col.ColumnName), DataRowTmp[col]);
                }
                if (TargetsString.Length > 0)
                    TargetsString = TargetsString.Remove(TargetsString.Length - 1, 1);
                if (ParametersString.Length > 0)
                    ParametersString = ParametersString.Remove(ParametersString.Length - 1, 1);
                CommandString.AppendFormat(" ({0}) VALUES ({1}) ", TargetsString.ToString(), ParametersString.ToString());
                tmpCommand.CommandText = CommandString.ToString();
                connection.Open();
                tmpCommand.ExecuteNonQuery();
            }
        }
    } 

 

Build your project then click deploy which will deploy you assembly and creates a new stored procedure to your target database chosen earlier. Now let's move to SQL server, first we will need to create a new stored procedure that will receive notification message from server broker and passes it to our CLR procedure for auditing:

Create PROCEDURE [dbo].[NotificationQueueActiviation]
AS
Declare @FetchSize int = 1
Declare @TimeToWait int = 5000
Declare @MsgBody nvarchar(max)
BEGIN
BEGIN TRANSACTION;
    WAITFOR ( RECEIVE TOP(@FetchSize) @MsgBody = message_body FROM NotificationQueue), TIMEOUT @TimeToWait
    
IF (@@ROWCOUNT = 0)
    BEGIN
      ROLLBACK TRANSACTION;
    
END
    EXECUTE ParseAuditMessage @MsgBody 
COMMIT TRANSACTION;
END

The last thing we will need to do is to alter our service broker queue to use internal activation as follows:

ALTER QUEUE NotificationQueue
    WITH ACTIVATION
    ( STATUS = ON,
      PROCEDURE_NAME = NotificationQueueActiviation,
      MAX_QUEUE_READERS = 20,
      EXECUTE AS SELF
    );
GO

 

Now if we run any CRUD operation against our database then we will have our log as you can see below:

You can find the SQL server CLR project which contains a simple parsing and auditing mechanism here. AuditProc.zip (25.18 kb) 

Currently rated 5.0 by 3 people

  • Currently 5/5 Stars.
  • 1
  • 2
  • 3
  • 4
  • 5

Service Broker helps developers build asynchronous, loosely coupled applications in which independent components work together to accomplish a task. These application components exchange messages that contain the information that is required to complete the task. Service Broker's features provide a number of significant benefits to database applications. These features and benefits include:

  • Conversations: Service broker is all about messaging, two entities can exchanges messages without caring about the low level communication infrastructure; conversation is a reliable, persistent communication channel that guarantees message delivery.
  • Message ordering and coordination: Service Broker queues are integrated into the database which means regular database maintenance and administration also included and handles the most difficult tasks involved in writing messaging applications. These difficult tasks include message coordination, reliable message delivery, locking, and starting queue readers.
  • Transactional asynchronous programming: Message delivery between applications is transactional and asynchronous; if your application transaction rolls back, all Service Broker operations in the transaction roll back. In asynchronous delivery, the Database Engine handles delivery while the application continues to run.
  • Support for loosely coupled applications: SOA is about messaging and service broker is built using messaging infrastructure so it supports loosely coupled applications. Your applications do not need to run at the same time and do not have to know the physical location or the implementation of the other participants in the conversation.

The following illustration presents a high-level view of Service Broker network communication between two SQL Server instances:

Notice that the conversation is a persistent, logical connection. The conversation can occur over any period of time, and during that period of time, the conversation can use any number of network connections.

Network connections occur between two Service Broker endpoints. These connections use TCP/IP. If the connection is inactive for a short time, SQL Server closes the network connection. To deliver a message, Service Broker holds the message in the transmission queue for the database that sent the message. The recipient delivers the message directly to the queue for the destination service. The queue for the sending service is not involved in the operation.

Let's now but service broker into practical example which is  a generic database CRUD auditing. The first thing you will need to do is to enable service broker for your database and this could be done by using the following SQL command:

alter database [DataBaseName] set enable_broker

This option activates Service Broker message delivery, preserving the existing Service Broker identifier for the database. if you have an old database that uses service broker then you will need to generate a new identifier for your service broker:

alter database [DataBaseName] set new_broker

Now it's time to create our service broker objects, primary I will create a template message type to hold the actual data payload for exchange during a certain conversation. A message type defines the name of a message and the validation that Service Broker performs on messages that have that name:

CREATE MESSAGE TYPE [//intellecting.net/Notification] VALIDATION = NONE ;

Note that the message type specifies a validation type of NONE because the message will contain data that is not well-formed XML or it might binary data.

Then I will create service broker contract which defines the name of a specific business task and list the message types used in that task. Service Broker contracts define two different service roles: the initiator and the target. The initiator of a conversation begins the conversation by sending a message to the target. The contract that the conversation uses defines which service role can send messages of a given message type:

CREATE CONTRACT [//intellecting.net/INotification] ([//intellecting.net/Notification] SENT BY INITIATOR);

The above SQL command indicates that only the initiator of the conversation can send messages of the specified message type which means that we defined a one way contract.

Now I will create a service broker Queue which stores messages, when Service Broker receives a message for a service, Service Broker inserts the message into the queue for that service. Service Broker manages queues and presents a view of a queue that is similar to a table:

CREATE QUEUE NotificationQueue WITH STATUS = ON;

A queue may be associated with a stored procedure. In this case, SQL Server activates the stored procedure when there are messages in the queue to be processed. SQL Server can start more than one instance of the stored procedure up to a configured maximum as shown below:

CREATE QUEUE AuditQueue WITH STATUS = ON,
      ACTIVATION (
            PROCEDURE_NAME = AuditMessage,
            MAX_QUEUE_READERS = 20,
            EXECUTE AS SELF )

As you can notice from the above SQL code that the activation stored procedure is defined with a maximum of 20 instance and will execute as the current user.

If your auditing database will lay on the same server with the OLTP database then it is preferable to use  internal activation with stored procedures as it is a common way to design Service Broker applications and of course your stored procedure will contain the routine needed to audit your data changes.

Finally we will need to create a service which represents a business process as a distinct set of tasks, each contract within the service represents a specific task. Each service uses a queue to store messages, messages sent to the service are delivered to the queue:

CREATE SERVICE NotificationService
       ON QUEUE NotificationQueue
      ([//intellecting.net/INotification])

If your auditing database will be on another SQL server then you will need to configure your service broker instance to use message routing, initially we will create service broker endpoint that exposes our service capabilities to the outside world:

CREATE ENDPOINT AuditBrokerEndpoint
    STATE = STARTED
    AS TCP ( LISTENER_PORT = 5000 )
    FOR SERVICE_BROKER ( AUTHENTICATION = WINDOWS ) ;

After configuring the endpoint, you must set up security between the distributed Service Broker services. By default, Service Broker doesn’t allow two Service Broker services to communicate without configured security. For transport security I will use Windows Authentication especially if the two service broker instances are registered in the same domain and  It would even work if you have a trusted relationship between two Windows domains.

You need to run your SQL server instance with a certain domain account, for example intellecting\ahmedamir, you can do this easily through Services MMC snap-in. you now have to create SQL Server logins for the SQL Server that communicates with you. Note that this login represents the service account of the remote SQL Server machine, not the service account of the instance where you’re creating the Service Broker endpoint:

CREATE LOGIN [intellecting\AuditAccount] FROM WINDOWS;

You will need to grant connect and send permission for your service as follows:

GRANT CONNECT ON ENDPOINT::AuditBrokerEndpoint TO [intellecting\AuditAccount];
GRANT SEND ON SERVICE::[NotificationService] TO PUBLIC;
 

After configuring the endpoint and service security, we need to instruct Service Broker to route our sender messages to a certain SQL server instance, so on the sender service broker instance we will configure our routing as follows:

CREATE ROUTE AuditBrokerService WITH
      SERVICE_NAME = 'AuditBrokerService',
     
ADDRESS = 'TCP://targetserver:5000'

By creating all service broker objects; our messaging infrastructure is ready for message exchange through a single SQL server instance our by using multiple SQL server instances. Now let us try to send message to our service broker, I will create the service initiator; the following stored procedure opens a conversation to our service and submit the XML message to the service queue:

CREATE PROCEDURE [dbo].[AduitMessage] @Message xml
AS
BEGIN
      BEGIN TRY
            BEGIN TRANSACTION
                  DECLARE @ch UNIQUEIDENTIFIER
              
    BEGIN DIALOG CONVERSATION @ch
                  FROM SERVICE NotificationService
                  TO SERVICE 'NotificationService'
                  ON CONTRACT [//intellecting.net/INotification] WITH ENCRYPTION = OFF;
               
   SEND ON CONVERSATION @ch MESSAGE TYPE [//intellecting.net/Notification] (@Message);
            COMMIT;
      END TRY
      BEGIN CATCH
            ROLLBACK TRANSACTION

      END CATCH
END

As can be observed from the above code that no message encryption is used and the procedure sends a message to a local service queue, if you want to send a message to another SQL server instance then you will just use the name of the service in the Service Broker Route defined earlier.

The question now is how to format your audit message with the hooked data and how to use the above stored procedure to send your audit messages to service broker. In order to hook any data changes you will need to create SQL AFTER trigger to capture data changes, you can create three trigger; for insert, Update and delete but I will consolidate them to one trigger as follows:

CREATE TRIGGER [dbo].[tr_Orders_Audit] ON [dbo].[Orders]
FOR INSERT,UPDATE,DELETE
AS
BEGIN
  DECLARE @Message XML
  
DECLARE @TableName varchar(MAX) = 'Orders'
  
DECLARE @ColumnsUpdated varbinary(MAX) = COLUMNS_UPDATED()
  
DECLARE @InsertedCount int = (SELECT COUNT(1) FROM INSERTED)
  
DECLARE @DeletedCount int = (SELECT COUNT(1) FROM DELETED)
  
DECLARE @ChangedCloumns XML = (SELECT COLUMN_NAME AS Name,DATA_TYPE as Type,
                                
sys.fn_IsBitSetInBitmask(@ColumnsUpdated,COLUMNPROPERTY(
                                
OBJECT_ID(TABLE_SCHEMA + '.' + TABLE_NAME),COLUMN_NAME, 'ColumnID')) as IsUpdate
                                 FROM INFORMATION_SCHEMA.COLUMNS Field WHERE TABLE_NAME = @TableName FOR XML RAW('Column'))

 
 IF @InsertedCount > 0 AND  @DeletedCount <= 0 -- SQL Insert
 
BEGIN
      DECLARE @MessageType varchar(MAX) = 'INSERT'
      DECLARE @NewData XML = (Select * from INSERTED FOR XML RAW('New'))
 
END

 
 IF @InsertedCount > 0 AND  @DeletedCount > 0 -- SQL Update
 
BEGIN
      SET @MessageType = 'UPDATE'     
     
SET @NewData =  (Select * from INSERTED FOR XML RAW('New'))
      DECLARE @OldData XML  = (SELECT * from DELETED FOR XML Raw('Old'))
 
END
 
 
 IF @InsertedCount <= 0 AND  @DeletedCount > 0 -- SQL Delete
 
BEGIN
      SET @MessageType = 'DELETE'
      SET @ChangedCloumns = (select Column_Name AS Name,Data_Type as Type,0  as  IsUpdate 
                             from
 INFORMATION_SCHEMA.COLUMNS where table_name=@TableName FOR XML RAW('Column'))
      SET @OldData = (SELECT * from DELETED FOR XML RAW('Old'))
 
END

 
 DECLARE @Header XML = (Select SUSER_SNAME() AS [User],@@SPID as Process, HOST_NAME() AS [HostName],GETDATE() AS [Time],
                        
@TableName AS TargetTable,@MessageType as MessageType FOR XML RAW('Header'))
  
SET @Message = (Select @Header,@ChangedCloumns,@NewData,@OldData for XML RAW('NotificationMessage'))
 
exec AduitMessage @Message
END


As can be noticed from the above code that our trigger is divided into multiple parts:

  1. In the first part, we dynamically detect the changed columns of target table by calling SQL server trigger function 'COLUMNS_UPDATED()' which in turn returns a bitmask of the changed columns according to column order, then we pass this bit mask to system function to get the names of the changed columns and finally we format the result by using XML.
  2. In the second part, we detect the trigger action [Insert, Update or Delete] by selecting the number of rows in the INSERTED & DELETED tables and according to the trigger type we format the data changes by using XML.
  3. In the third part, we collect information about the user and actions to be our message header and then we combine all the formatted data generated by the first two steps to produce the final message to be audited.
  4. In the final part, we just submit the formatted XML message to our audit procedure which in turn sends the message to service broker infrastructure.


Now I can say that we finished the first part of creating our auditing system, the remaining is how to get your XML messages from service broker (in case of distributed brokers) and how to parse them to produce our audit data. I will introduce message receiving and parsing in my next talk soon.

I attached the SQL server database that contains all the code discussed here. DATA.zip (264.13 kb)

Currently rated 5.0 by 2 people

  • Currently 5/5 Stars.
  • 1
  • 2
  • 3
  • 4
  • 5

CCR

Posted on June 17, 2010 21:06 by Ahmed Al Amir

Concurrent programs can be executed sequentially on a single processor by interleaving the execution steps of each computational process, or executed in parallel by assigning each computational process to one of a set of processors that may be close or distributed across a network. The main challenges in designing concurrent programs are ensuring the correct sequencing of the interactions or communications between different computational processes, and coordinating access to resources that are shared among processes.

We had high performance concurrent programs since decades but they mainly be used for scientific applications, In the past multithreading has been considered a powerful tool that is hard to handle .While there may be some truth to this, newer tools have made the job of the software developer much easier when creating parallel implementations of algorithms. At the same time, the necessity to use multiple threads to create performant applications has become more and more clear. Technologies like Hyperthreading and multiple core processors can only be used if processors have to schedule processing power to multiple, concurrently running processes or threads.

Moore's law states that the number of transistors that can be placed inexpensively on an integrated circuit has doubled approximately every two years as can be observed below:

 


According to this law your application is expected to operate much more faster in the future. Unfortunately there are physical reason for Moore's law to be true for the upcoming decades as ICs generated heat is increasing unexpectedly so manufactures tend to use Multi-core Multi-CPU and by doing this; developers and architects must develop and design applications that make a good use of these multi-core in order to get better performance for their applications.

Every developer knows well that developing multi-threaded application is not an easy task and sometimes it might be a nightmare. In my opinion, developers should concatenate on their domain model and the business logic; not wasting a lot of time and effort to investigate and debug multi-tasking applications.

Microsoft presented a new programming model on top of .net framework that helps us to build data-driven application for distributed and concurrent systems. CCR is a message-oriented programming model optimized for:

  • Asynchronous  Operations [Queuing].
  • Concurrent Tasks [Scheduling].
  • Operations Coordination [Arbitration].
  • Resiliency [failure handling].

The following diagram outlines the main components of CCR:


As can be observed, CCR implementation has three main categories of functionality: 

  1. The Port primitives which is FIFO queue of messages (any CLR type).
  2. The coordination primitives also called Arbiters which execute user code, often a delegate to some method, when certain conditions are met.
  3. The Dispatcher, DispatcherQueue and Task primitives. The CCR isolates the scheduling and load balancing logic from the rest of the implementation by using these classes to abstract how user tasks execute, what they contain, and on what resources they will run.

The CCR port is the most common primitive and is used as the point of interaction between any two components. Adding an item to the port is an asynchronous operation which is a fast, non-blocking operation so the Post method returns control to the caller as soon as possible, as can be noticed in the following code snippet:

// using type per port.
Port<string> port = new Port<string>();
port.Post("Hello CCR");
Console.WriteLine(port.Test());
// using multi-type per port.
PortSet<int, string, decimal, Type> portset = new PortSet<int, string, decimal, Type>();
portset.Post(5.5M);
portset.Post(1);
portset.Post("Hello CCR");
portset.Post(typeof(System.Activator));
Console.WriteLine("{0}\t{1}\t{2}\t{3}", portset.Test<int>(), portset.Test<decimal>(), portset.Test<Type>(), portset.Test<string>());

Now let's use CCR arbiter to register a simple receiver arbiter to the port. The receiver is associated with a user delegate, in this case an anonymous method defined inline:

Dispatcher dispatcher = new Dispatcher();
DispatcherQueue taskQueue = new DispatcherQueue("sample queue", dispatcher);           
Arbiter.Activate(taskQueue,port.Receive(delegate(string item)
{     
      Console.WriteLine("Received item : {0}", item);

}
));

The CCR arbiter has a very powerful feature which is the Choice mechanism, if you have a port that can accept multi-type messages then you can use the arbiter to select the appropriate handler for each message as follows:

Arbiter.Activate(taskQueue,
     Arbiter.Choice(EventPorts.port,
        (StockData stockdata) => DumpStockData(stockdata),
        (OrderData orderdata) => DumpOrderData(orderdata)
));

Now after investigating the basic functionalities of CCR, let's put this framework in some practical examples. I was working in a high performance data streaming system by using windows sockets, in most cases developers tend to bind thread per socket for continuous send and receive mechanisms (Blocking Mode) but using thread per socket will limit the scalability of your server side as most of the time your thread is blocked waiting for I/O to complete and when the number of threads increases then the thread context switching will be an overhead and this will degrade the overall performance of your server. According to that the best way is to use asynchronous calls to decrease the number of threads used by the server, we can do this easily by using CCR. First of all I will create wrapper template referring to the success/failure ports and completion callback method delegate as follows:

internal sealed class ApmResultToCcrResultFactory
{
     public static AsyncCallback Create<T>(Port<T> portResult,
                                          
Port<Exception> portException,
                                           ApmEndMethod<T> endMethod)
     {
            ApmResultToCcrResult<T> ar2cr = 
                 
new ApmResultToCcrResult<T>(portResult,portException, endMethod);
            return ar2cr.ApmToCcrResult;
     }
     public delegate T ApmEndMethod<T>(IAsyncResult ar);
     private sealed class ApmResultToCcrResult<T>
     {           
          
private ApmEndMethod<T> m_ApmEndMethod;
           private Port<T> m_portResult;
           private Port<Exception> m_portException;
           internal ApmResultToCcrResult(Port<T> portResult,
                   Port<Exception> portException, ApmEndMethod<T> endMethod)
           {
                  m_portResult = portResult;
                  m_portException = portException;
                  m_ApmEndMethod = endMethod;
           }
           internal void ApmToCcrResult(IAsyncResult ar)
           {
                  try
                  {
                       m_portResult.Post(m_ApmEndMethod(ar));
                  }
                  catch (Exception e)
                  {
                        m_portException.Post(e);
                  }
           }
        }
    }
}

 

Then I will create an extension method to handle any asynchronous method of the socket class, for example I will take the accept method:

public static void AcceptSocket(this Socket listener,
                                ref Port<Socket> portSocket,
                                ref Port<Exception> portException)
{
    
listener.BeginAccept(ApmResultToCcrResultFactory.Create(portSocket,portException,
            delegate(IAsyncResult ar) { return listener.EndAccept(ar); }), null);
}


Now we can use the simple choice mechanism of the CCR arbiter mentioned before to ensure that 1 and only 1 accept is called:

PortSet<Socket, Exception> result = listener.AcceptSocket();
Arbiter.Activate(dq, Arbiter.Choice(result,
         
(Socket AcceptedSocket) => Console.WriteLine("Socket Accpeted : {0}",AcceptedSocket.RemoteEndPoint),
          (Exception exp) => Console.WriteLine(exp.Message))
);


As you can see that CCR handles the Begin\End Asynchronous pattern easily, you can use the same concept for reading streams, web responses and any asynchronous operation but for high performance socket applications the most preferable technique is I/O completion port. When I was working on that system my manager asked me to bind our server to more than one IP address in order to guarantee our service scalability, reliability and availability, and by using  Begin\End Asynchronous pattern I will have a tough work of synchronizing multiple threads, at this time Semaphore was the best option but instead of doing that I decided to go for CCR with I/O completion port as follows :

internal static IEnumerator<ITask> AcceptSocket(Socket s)
{
     // Use this port to signal the completion of an async operation.
     var completionPort = new Port<EmptyValue>();

     using(var sae = new SocketAsyncEventArgs())
     {               
          // Set up the event handler.
          sae.Completed += (sender, args) => completionPort.Post(EmptyValue.SharedInstance);
         
while (true)
          {
              // If the operation does not complete synchronously, then yield to the
              // completion port.
              if (s.AcceptAsync(sae))
                 yield return Arbiter.Receive(false, completionPort, ignored => { });
             // IO completion port is done and actual processing goes here
             Console.WriteLine("Accpeted Socket from IP {0} with Address {1}",
                               s.LocalEndPoint,sae.AcceptSocket.RemoteEndPoint);
             // To added this socket in the accept mode again.
             ListenerSockets.Post(s);
          }
     }
}

I created iterator method to ensure the completion of the I/O as can be seen from the above code snippet then I will use CCR iterator arbiter to register for asynchronous I/O socket accept:

Arbiter.Activate(dq,Arbiter.ReceiveWithIterator<Socket>(true, ListenerSockets,
         (new IteratorHandler<Socket>(AcceptSocket))));

In order to activate our server we will just post our listener sockets as follows:

ListenerSockets.Post(listener1);
ListenerSockets.Post(listener2);


As you can see there is no threads included or any synchronization objects, of course we can use configuration to add more listeners at runtime and extend our server scalability, reliability and availability. CCR made my multi-threaded programming much more easier as I will never think about thread synchronization and complexities. I attached the source code for the above scenarios here
SocketServer.zip (77.86 kb).

In conclusion, the CCR provides some extensive prospects to express your application in terms of its fundamental data-dependencies, describing schedules of tasks that the runtime then distributes seamlessly over the available cores. This simultaneously frees you from explicitly dealing with threads and locks whilst enabling your application to take full advantage of the increasingly powerful multi-core machines that software will run on in the future.

Currently rated 5.0 by 5 people

  • Currently 5/5 Stars.
  • 1
  • 2
  • 3
  • 4
  • 5

The Upcoming C#

Posted on May 10, 2010 19:20 by Ahmed Al Amir

C# is intended to be simple, modern, general-purpose, type safe, object-oriented programming language. It is almost 10 years now since the start of C#, during this long period the language has evolved from stage to stage but before talking about the future of C#, it is always better to mention the evolution of C# during the past 10 years.

If you take a look to the below figure, you will notice the trends that shape the development of C#, in each version you will find a new powerful feature that differentiate each stage from the other:

The first version of C# [V 1.0] was concentrating on building CLR (Common Language Runtime) which is kind of process virtual machine that abstracts the developer from OS management level like memory management, Threading and Exception handling. Java was the leading programming language to handle VM since 1997. C# learned a lot from Java and Microsoft was trying to avoid all the pitfalls of Java.

Through the second version of C# [V 2.0], C# team was trying to finish all the work that could not be done in the first version, so they take the principle of STL (standard template library) from C++ and evaluated STL to be C# generics. By providing Generics, C# team completed their main target of having complete programming language then Microsoft seeks how to give C# a competitive edge among the other programming language.

In the third version of C# [V 3.0] Microsoft starts to enter the era of declarative programming and the dream of all developers to embed SQL like queries into programming language came true by introducing LINQ. LINQ is based on recursive programming using lambda calculus, Microsoft is the leading company in introducing such idea into programming languages and by doing that C# was moving from OOP (Object Oriented Programming) to AOP (Aspect Oriented Programming). I have a nice talk about LINQ foundation that you can find here.

The Upcoming version of C# [V 4.0] the team is trying to follow the trend of dynamic programming, In this version Microsoft introduces the DLR (Dynamic language runtime) that allows a dynamic type system to be shared by all languages utilizing the DLR services and Dynamic method dispatch in addition to Dynamic code generation.

In my opinion, C# is following a mix of the following programming trends:

  • Declarative Programming: The idea behind declarative programming is to concentrate on what your code should do, rather than how your code goanna accomplish it. C# 3.0 introduced declarative programming through LINQ, in LINQ you describe at a very high level what kind of query you want (Projection, Filtering, Grouping, …) but you don’t say anything about how to do it, behind the scene a different things might happen depending on your query, for example if you run the query against in-memory collection then your query eventually will be transformed to some loops and recursive functions but if you run the same query against SQL Server then your query will be translated to SQL and will be executed not even on .Net but in another environment which is SQL Server; when you wrote this query you didn’t say how you want it to be executed. C# 3.0 applies declarative programming style and gives you the tools to write more declarative APIs like extension methods and lambda expressions.

 

  • Dynamic Programming: This technique describes a class of high-level programming languages that execute at runtime many common behaviors that other languages might perform during compilation. LISP was the first dynamic programming language created in 1958, it was originally created as a practical mathematical notation for computer program, in contrast C# 3.0 is a static programming language in which everything is defined at compile time. For several decades there were wars between the two camps and C# Team thought that it would be fine to assemble some features from dynamic programming languages in order to narrow the gap between the two camps. C# 4.0 introduces the dynamic language runtime (DLR) on top of CLR to provide necessary tools for dynamic programming.

 

  • Concurrent Programming: The concurrent code can be executed sequentially on a single processor by interleaving the execution steps of each computational process, or executed in parallel by assigning each computational process to one of a set of processors that may be close or distributed across a network. The main challenges in designing concurrent code are ensuring the correct sequencing of the interactions or communications between different computational processes, and coordinating access to resources that are shared among processes. PLINQ (Parallel LINQ) is  one of the LINQ providers that allows LINQ queries to be executed on multiple cores but in my point of view C# team has much to do with concurrency as they are still at the beginning. One of the most effective frameworks to handle concurrent programming is  the CCR framework (Concurrency Coordination Runtime); it is a part of Microsoft robotics studio which can be found here.

 

After discussing the evolution of C# during the past 10 years let’s now explore the most important feature of C# 4.0. The new version of C# focuses on dynamic programming, Microsoft presented the DLR (Dynamic Language Runtime) on top of CLR:

The DLR adds a set of services to the CLR for better supporting dynamic languages. These services include the following:

  1. Expression trees: The DLR uses expression trees to represent language semantics. For this purpose, the DLR has extended LINQ expression trees to include control flow, assignment, and other language-modeling nodes.
  2. Call site caching: dynamic call site is a place in the code where you perform an operation like a + b or a.b() on dynamic objects. The DLR caches the characteristics of a and b (usually the types of these objects) and information about the operation. If such an operation has been performed previously, the DLR retrieves all the necessary information from the cache for fast dispatch.
  3. Dynamic object interoperability: The DLR provides a set of classes and interfaces that represent dynamic objects and operations and can be used by language implementers and authors of dynamic libraries.

 

C# 4.0 introduces Dynamic Lookup that allows you a unified approach to invoking things dynamically. With dynamic lookup, when you have an object in your hand you do not need to worry about whether it comes from COM, IronPython, the HTML DOM or reflection; you just apply operations to it and leave it to the runtime to figure out what exactly those operations mean for that particular object.

This affords you enormous flexibility, and can greatly simplify your code, but it does come with a significant drawback: Static typing is not maintained for these operations. A dynamic object is assumed at compile time to support any operation, and only at runtime will you get an error if it wasn’t so. Oftentimes this will be no loss, because the object wouldn’t have a static type anyway, in other cases it is a tradeoff between brevity and safety.

C# 4.0 presents the following gear for dynamic programming:

  • New Dynamic Data Type: C# 4.0 bring in a new static type called dynamic. Objects of type dynamic can do anything that will be resolved only at runtime. The following example shows the old style of dynamic method call:

        

        public static IClientChannel Use(Type endpointType, string config, string address)

        {

           Type channelType = typeof(ChannelFactory<>).MakeGenericType(endpointType);

           object factory = Activator.CreateInstance(channelType,config);

           ((ChannelFactory)factory).Endpoint.Address =new EndpointAddress(address);

           return (IClientChannel)channelType.GetMethod("CreateChannel", new Type[] { }).Invoke(factory,new object[] {});

        } 

The above code snippet shows part of my enterprise service bus (ESB) by using WCF. It creates WCF client channel at runtime depending on the WCF contract type, as you can see that it uses reflection in order to create the channel. By using C# 4.0 dynamics the above code can be simplified like the following:

        public static IClientChannel Use(Type endpointType, string config, string address)

        {

           dynamic factory = Activator.CreateInstance(typeof(ChannelFactory<>).MakeGenericType(endpointType),config);                                

           return factory.CreateChannel(new EndpointAddress(address))as IClientChannel;

        }   

As you can see that the C# compiler allows you to call a method with any name and any arguments on factory because it is of type dynamic. At runtime the actual object that factory refers to will be examined to determine what to do. For high performance system like ESB; reflection will degrade overall performance but by using dynamic you will not hurt performance as the DLR will cache your dynamic calls and the next time you will do dynamic calls it will not use reflection but it will use the cache.

  • Dynamic Operations: Dynamic objects allows you to ship dynamically method calls, field and property accesses, indexer and operator calls and delegate invocations:

 

     dynamic d = new SomeDynamicObject();
     d.ID = new Random().Next(1, 10000);
     d.ClientName = "Hana Ahmed";
     d.Balance = dummy * new Random().NextDouble();
     d.Tax = (Func<double, double>)((double Precent) => { return d.Balance * Precent/100; });

 

The C# compiler will snatch the necessary information about your dynamic object so that the DLR can pick it up and determine what the actions to be done but you must know that the result of any dynamic operation is itself of type dynamic.

  • Dynamic objects: These types of objects enable you to define which operations can be performed on dynamic objects and how to perform those operations. For example, you can define what happens when you try to get or set an object property, call a method, or perform standard mathematical operations such as addition and multiplication.

In order to define your own dynamic objects you will need to derive from a helper class called DynamicObject. If your class defines properties and also overrides the TrySetMember method, the dynamic language runtime (DLR) first uses the language binder to look for a static definition of a property in the class. If there is no such property, the DLR calls the TrySetMember method. Here is an example of dynamic object:

    public class Order : DynamicObject
    {
        Dictionary<string, object> dictionary = new Dictionary<string, object>();


        public override bool TryGetMember(GetMemberBinder binder, out object result)
        {           

            string name = binder.Name.ToLower();
            return dictionary.TryGetValue(name, out result);
        }              

        public override bool TrySetMember(SetMemberBinder binder, object value)
        {
            dictionary[binder.Name.ToLower()] = value;
           
return true;
        }

     }

 

You can find the full source code that demonstrates all the dynamic code that we discussed here. CSharp4.zip (33.73 kb)

  • DLR limitations: 

There are a few and things that might work differently than you would expect:

  • Dynamic data objects cannot be used in inter-process communication which means that they only reside at your process level. Infrastructure communication code will not be able to serialize these objects between domains.
  • The DLR allows objects to be created from objects that represent classes. However, the current implementation of C# doesn’t have syntax to support this.
  • Dynamic lookup will not be able to find extension methods. Whether extension methods apply or not depends on the static context of the call (i.e. which using clauses occur), and this context information is not currently kept as part of the payload.
  • Anonymous functions (i.e. lambda expressions) cannot appear as arguments to a dynamic method call. The compiler cannot bind (i.e. “understand”) an anonymous function without knowing what type it is converted to.

Currently rated 4.4 by 7 people

  • Currently 4.428571/5 Stars.
  • 1
  • 2
  • 3
  • 4
  • 5