It has been a long time since my last post about using Service Broker for data change notifications, so let's continue where we left off. The first thing we need is a way to receive a message from Service Broker. SQL Server has a dedicated command for retrieving a message from a queue:
Declare @FetchSize int = 1 -- Number of messages to get in a single fetch process
Declare @TimeToWait int = 5000 -- Time to wait for messages to be received (for example, 5 Seconds)
WAITFOR ( RECEIVE TOP(@FetchSize) * FROM NotificationQueue), TIMEOUT @TimeToWait
As the query above shows, you can control both the number of messages to fetch and how long the call blocks while waiting for a message. A single RECEIVE returns messages from only one conversation group, so you can receive a batch only when several messages were sent on the same conversation handle; if each message is sent on its own conversation, you receive one message per call.
Now that we have our message from Service Broker, we want to prepare it for auditing. We need to cast the message body to XML and use SQL Server's XML capabilities to turn the XML into relational data.
I will create an audit trace table with the following structure:
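As a minimal sketch of a batch receive, the query below pulls up to 10 messages in one call into a table variable. The variable name `@Batch`, the batch size, and the cast to XML are assumptions for illustration; the cast only works if the message bodies are well-formed XML.

```sql
-- Sketch: receive up to 10 messages from one conversation group in a single call.
-- @Batch is a hypothetical holding table for the received rows.
DECLARE @Batch TABLE
(
    conversation_handle uniqueidentifier,
    message_type_name   sysname,
    message_body        varbinary(max)
);

WAITFOR
(
    RECEIVE TOP (10)
        conversation_handle,
        message_type_name,
        message_body
    FROM NotificationQueue
    INTO @Batch
), TIMEOUT 5000;

-- Inspect what was received (assumes the bodies are XML).
SELECT conversation_handle,
       message_type_name,
       CAST(message_body AS xml) AS payload
FROM @Batch;
```

If the timeout expires with nothing in the queue, `@Batch` simply stays empty.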
create table AuditTrace
(
AuditTraceID bigint primary key identity(1,1),
[User] nvarchar(200),
[Process] int,
[HostName] nvarchar(200),
[Time] datetime,
[TargetTable] nvarchar(200),
[Action] varchar(10),
[OldData] xml,
[NewData] xml
)
Here are the steps to fill the table above from a message received from SQL Server Service Broker:
- First, we receive the message body into an XML variable:
DECLARE @PayLoad xml;
WAITFOR (RECEIVE TOP(1) @PayLoad = message_body FROM NotificationQueue), TIMEOUT 5000;
- Then we let SQL Server parse the payload into an in-memory XML document that is ready for consumption:
DECLARE @idoc int;
EXEC sp_xml_preparedocument @idoc OUTPUT, @PayLoad;
- Now I will use OPENXML to read the message header as a rowset and insert it into our audit trace table. The columns are matched by position, so the MessageType value from the header ends up in the [Action] column:
insert into AuditTrace ([User],[Process],[HostName],[Time],[TargetTable],[Action]) SELECT * FROM OPENXML(@idoc, N'//NotificationMessage/Header')
with ([User] nvarchar(200),Process int,HostName nvarchar(200),Time datetime,TargetTable nvarchar(200),MessageType varchar(10))
- Using XQuery, I will update the row we just inserted into the audit trace to add the old and the new data:
Declare @Key bigint = SCOPE_IDENTITY()
Update audittrace set NewData = @PayLoad.query(N'//NotificationMessage/New'), olddata = @PayLoad.query(N'//NotificationMessage/Old')
where AuditTraceid = @Key
- The last thing to do here is to free the memory by removing the XML document we opened earlier:
EXEC sp_xml_removedocument @idoc
After completing these steps, the AuditTrace table holds one row per received message, with the header fields in the relational columns and the old and new data in the XML columns.
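The same header fields can also be read with the XML data type methods alone, without sp_xml_preparedocument. The sketch below is only an illustration: the sample payload is hypothetical and assumes the header values are stored as attributes, which is what OPENXML expects by default when no flags are passed.

```sql
-- Hypothetical sample payload; the real message layout may differ.
DECLARE @PayLoad xml = N'
<NotificationMessage>
  <Header User="dbo" Process="52" HostName="DEV01"
          Time="2011-01-01T10:00:00" TargetTable="Customers"
          MessageType="UPDATE" />
  <Old CustomerID="1" Name="Old name" />
  <New CustomerID="1" Name="New name" />
</NotificationMessage>';

-- nodes() shreds the header into a rowset, value() extracts typed attributes,
-- and query() returns the Old/New fragments as XML.
SELECT
    H.value('@User',        'nvarchar(200)') AS [User],
    H.value('@Process',     'int')           AS [Process],
    H.value('@HostName',    'nvarchar(200)') AS [HostName],
    H.value('@Time',        'datetime')      AS [Time],
    H.value('@TargetTable', 'nvarchar(200)') AS [TargetTable],
    H.value('@MessageType', 'varchar(10)')   AS [Action],
    @PayLoad.query('/NotificationMessage/Old') AS [OldData],
    @PayLoad.query('/NotificationMessage/New') AS [NewData]
FROM @PayLoad.nodes('/NotificationMessage/Header') AS T(H);
```

Because this form produces all eight columns in one rowset, it could feed a single INSERT into AuditTrace instead of an INSERT followed by an UPDATE.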
Now consider another situation: we want to write the changed data to another table in our database that has the same schema as the production table, with a name suffixed by Audit. I could do this with complex XML handling, some SQL Server temp tables, and some loops; instead I will use SQL CLR. The common language runtime (CLR) integration feature is off by default and must be enabled before objects implemented with CLR integration can be used. To enable CLR integration, use the clr enabled option of the sp_configure stored procedure:
sp_configure 'show advanced options', 1;
GO
RECONFIGURE;
GO
sp_configure 'clr enabled', 1;
GO
RECONFIGURE;
GO
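To confirm that the setting took effect, the current value can be read from the `sys.configurations` catalog view. This is just a quick check, not part of the setup itself:

```sql
-- value is the configured setting; value_in_use is what the server is running with.
-- Both should be 1 after the RECONFIGURE above.
SELECT name, value, value_in_use
FROM sys.configurations
WHERE name = N'clr enabled';
```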
To create our CLR stored procedure, open Visual Studio 2010, create a new database project, and choose your preferred language (I will use C#).
After creating the project, choose the database to deploy to, then add a new stored procedure that will contain our parsing and auditing code as follows:
// Namespaces needed at the top of the file:
using System.Data;
using System.Data.SqlClient;
using System.IO;
using System.Text;

// The method below goes inside the stored procedure class of the project.
[Microsoft.SqlServer.Server.SqlProcedure]
public static void ParseAuditMessage(string XMLBody)
{
    string MESSAGE_HEADER_NAME = "Header";
    string MESSAGE_NEW_NAME = "New";

    // Load the XML message into a DataSet through an in-memory stream.
    MemoryStream st = new MemoryStream();
    StreamWriter wr = new StreamWriter(st);
    wr.Write(XMLBody);
    wr.Flush();
    st.Seek(0, SeekOrigin.Begin);
    DataSet ds = new DataSet();
    ds.ReadXml(st);

    // Nothing to insert when the message has no header or no new data
    // (this may be the case for a message that carries only old data).
    if (ds.Tables[MESSAGE_HEADER_NAME] == null || ds.Tables[MESSAGE_NEW_NAME] == null)
        return;

    string TargetTable = ds.Tables[MESSAGE_HEADER_NAME].Rows[0]["TargetTable"].ToString();

    foreach (DataRow DataRowTmp in ds.Tables[MESSAGE_NEW_NAME].Rows)
    {
        // Use the connection of the caller instead of opening a new one.
        using (SqlConnection connection = new SqlConnection("context connection=true"))
        {
            StringBuilder CommandString = new StringBuilder();
            StringBuilder TargetsString = new StringBuilder();
            StringBuilder ParametersString = new StringBuilder();
            SqlCommand tmpCommand = new SqlCommand("", connection);

            // The audit table is the target table name followed by "audit".
            CommandString.AppendFormat("INSERT INTO {0}audit ", TargetTable);

            // Build the column list and a matching parameter for every column.
            foreach (DataColumn col in DataRowTmp.Table.Columns)
            {
                TargetsString.AppendFormat("{0},", col.ColumnName);
                ParametersString.AppendFormat("@{0},", col.ColumnName);
                tmpCommand.Parameters.AddWithValue(string.Format("@{0}", col.ColumnName), DataRowTmp[col]);
            }

            // Remove the trailing commas.
            if (TargetsString.Length > 0)
                TargetsString = TargetsString.Remove(TargetsString.Length - 1, 1);
            if (ParametersString.Length > 0)
                ParametersString = ParametersString.Remove(ParametersString.Length - 1, 1);

            CommandString.AppendFormat(" ({0}) VALUES ({1}) ", TargetsString.ToString(), ParametersString.ToString());
            tmpCommand.CommandText = CommandString.ToString();
            connection.Open();
            tmpCommand.ExecuteNonQuery();
        }
    }
}
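The procedure above inserts into a table named after the target table with the Audit suffix, so that table has to exist before any message arrives. As a sketch, with a hypothetical production table `Customers` (the table and its columns are assumptions for illustration only), the pair could look like this:

```sql
-- Hypothetical production table.
CREATE TABLE Customers
(
    CustomerID int IDENTITY(1,1) PRIMARY KEY,
    Name       nvarchar(100),
    City       nvarchar(100)
);

-- Audit copy with the same columns. It has no IDENTITY and no primary key,
-- so the procedure can insert explicit CustomerID values and store
-- several versions of the same row.
CREATE TABLE CustomersAudit
(
    CustomerID int NOT NULL,
    Name       nvarchar(100),
    City       nvarchar(100)
);
```

Dropping the IDENTITY property on the audit copy is a design choice in this sketch: the CLR procedure supplies a value for every column it finds in the message, and an identity column would reject an explicit value.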
Build your project, then click Deploy, which deploys your assembly and creates a new stored procedure in the target database you chose earlier. Now let's move to SQL Server. First we need to create a new stored procedure that receives the notification message from Service Broker and passes it to our CLR procedure for auditing:
CREATE PROCEDURE [dbo].[NotificationQueueActiviation]
AS
BEGIN
    DECLARE @FetchSize int = 1;
    DECLARE @TimeToWait int = 5000;
    DECLARE @MsgBody nvarchar(max);

    BEGIN TRANSACTION;

    WAITFOR ( RECEIVE TOP(@FetchSize) @MsgBody = message_body FROM NotificationQueue ), TIMEOUT @TimeToWait;

    -- Nothing arrived before the timeout: undo the transaction and stop here,
    -- otherwise the COMMIT below would run without an open transaction.
    IF (@@ROWCOUNT = 0)
    BEGIN
        ROLLBACK TRANSACTION;
        RETURN;
    END

    EXECUTE ParseAuditMessage @MsgBody;
    COMMIT TRANSACTION;
END
The last thing we need to do is alter our Service Broker queue to use internal activation as follows:
ALTER QUEUE NotificationQueue
WITH ACTIVATION
( STATUS = ON,
PROCEDURE_NAME = NotificationQueueActiviation,
MAX_QUEUE_READERS = 20,
EXECUTE AS SELF
);
GO
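If nothing shows up in the audit tables after this change, a reasonable first check is whether the queue is still enabled and whether the activation procedure is actually running. The queries below are a diagnostic sketch using standard catalog and dynamic management views:

```sql
-- Is the queue enabled, and is activation configured as expected?
SELECT name,
       is_receive_enabled,
       is_activation_enabled,
       activation_procedure,
       max_readers
FROM sys.service_queues
WHERE name = N'NotificationQueue';

-- Which activation procedures are running right now?
SELECT spid, queue_id, procedure_name
FROM sys.dm_broker_activated_tasks;

-- Service Broker disables a queue after five consecutive rollbacks
-- of a RECEIVE (poison message handling). Once the cause is fixed,
-- the queue can be turned back on:
-- ALTER QUEUE NotificationQueue WITH STATUS = ON;
```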
Now if we run any CRUD operation against our database, the change is logged to the audit tables.
You can find the SQL server CLR project which contains a simple parsing and auditing mechanism here. AuditProc.zip (25.18 kb)