I have been developing an infrastructure framework that integrates the various parts in my system together, so I started building an ESB (Enterprise Service Bus) and found that MSMQ is suitable for that framework as it provides guaranteed message delivery, efficient routing, security, and priority-based messaging and enables sub-systems at different times to communicate across networks even if they are temporarily offline.
After building the core communication code and testing it against my business rules, I noticed that I needed some monitoring over the service bus to ensure that messages placed on the service bus are delivered to the specified bus subscribers. Also I need to be able be able to generate notification in case of exceptions or latency.
By using MSMQ every message sent to remote queue is placed in a temporary outgoing queue which is created on each publishing machine. These outgoing queues are used in case the remote queue is unavailable, this means that I have to check Outgoing Queues in each publisher.After investigating .Net Messaging APIs, I found that it does not provide any queue monitoring or management functionality.
M. Aamir Malik wrote a good article that describes how to access the Outgoing Message Queues. You can find the article here[http://www.codeproject.com/KB/IP/MSMQ.aspx]. Unfortunately, in his article, Malik did not mention how to get the number of unacknowledged messages in the outgoing queues.I followed his technique and tried to extend the code to allow me to implement this functionality then I got held back by the fact that Microsoft does not provide full implementation of the COM Interfaces.
After doing some research I found that there are three ways to solve this issue :
1-Using MSMQ COM Interfaces compatibility will be a problem as some functions are not found in the old versions and some other functions are not implemented from certain operating systems.
2-Using MSMQ performance counter which will not give me all the information I need.
3-Using MSMQ admin APIs [http://support.microsoft.com/kb/242471] which will give me a full control over MSMQ.
I decided to focus on the third option and downloading APIs.Unfortunately, the admin APIs are written in C++ which forces me to write a wrapper for these APIs but this will make my deployment a nightmare.So, I found that the best way is to write a safe code for that wrapper. I searched the internet and found a nice article in Microsoft magazine that bridges the gap between unmanaged code and managed code called [CLR INSIDE OUT] which you can find here [http://msdn.microsoft.com/en-us/magazine/cc164193.aspx]
I used [P/Invoke Interop Assistant] to generate the managed version of the Admin APIs by supplying [mqmgmt.h] C++ header file. After solving problems of the generated code it looked like this :
using System;
using System.Runtime.InteropServices;
namespace Messaging.Extensions
{
public static class MessageQueueExtensions
{
#region -- Constants –
/// NULL VALUE
public const byte VT_NULL = 1;
/// UNSIGNED INTEGER
public const byte VT_UI4 = 19;
/// MQ_ADMIN_ACCESS -> 0x00000080
public const int MQ_ADMIN_ACCESS = 128;
/// MSMQ_CONNECTED -> L"CONNECTED"
public const string MSMQ_CONNECTED = "CONNECTED";
/// MSMQ_DISCONNECTED -> L"DISCONNECTED"
public const string MSMQ_DISCONNECTED = "DISCONNECTED";
/// MGMT_QUEUE_TYPE_PUBLIC -> L"PUBLIC"
public const string MGMT_QUEUE_TYPE_PUBLIC = "PUBLIC";
/// MGMT_QUEUE_TYPE_PRIVATE -> L"PRIVATE"
public const string MGMT_QUEUE_TYPE_PRIVATE = "PRIVATE";
/// MGMT_QUEUE_TYPE_MACHINE -> L"MACHINE"
public const string MGMT_QUEUE_TYPE_MACHINE = "MACHINE";
/// MGMT_QUEUE_TYPE_CONNECTOR -> L"CONNECTOR"
public const string MGMT_QUEUE_TYPE_CONNECTOR = "CONNECTOR";
/// MGMT_QUEUE_STATE_LOCAL -> L"LOCAL CONNECTION"
public const string MGMT_QUEUE_STATE_LOCAL = "LOCAL CONNECTION";
/// MGMT_QUEUE_STATE_NONACTIVE -> L"INACTIVE"
public const string MGMT_QUEUE_STATE_NONACTIVE = "INACTIVE";
/// MGMT_QUEUE_STATE_WAITING -> L"WAITING"
public const string MGMT_QUEUE_STATE_WAITING = "WAITING";
/// MGMT_QUEUE_STATE_NEED_VALIDATE -> L"NEED VALIDATION"
public const string MGMT_QUEUE_STATE_NEED_VALIDATE = "NEED VALIDATION";
/// MGMT_QUEUE_STATE_ONHOLD -> L"ONHOLD"
public const string MGMT_QUEUE_STATE_ONHOLD = "ONHOLD";
/// MGMT_QUEUE_STATE_CONNECTED -> L"CONNECTED"
public const string MGMT_QUEUE_STATE_CONNECTED = "CONNECTED";
/// MGMT_QUEUE_STATE_DISCONNECTING -> L"DISCONNECTING"
public const string MGMT_QUEUE_STATE_DISCONNECTING = "DISCONNECTING";
/// MGMT_QUEUE_STATE_DISCONNECTED -> L"DISCONNECTED"
public const string MGMT_QUEUE_STATE_DISCONNECTED = "DISCONNECTED";
/// MGMT_QUEUE_LOCAL_LOCATION -> L"LOCAL"
public const string MGMT_QUEUE_LOCAL_LOCATION = "LOCAL";
/// MGMT_QUEUE_REMOTE_LOCATION -> L"REMOTE"
public const string MGMT_QUEUE_REMOTE_LOCATION = "REMOTE";
/// MGMT_QUEUE_UNKNOWN_TYPE -> L"UNKNOWN"
public const string MGMT_QUEUE_UNKNOWN_TYPE = "UNKNOWN";
/// MGMT_QUEUE_CORRECT_TYPE -> L"YES"
public const string MGMT_QUEUE_CORRECT_TYPE = "YES";
/// MGMT_QUEUE_INCORRECT_TYPE -> L"NO"
public const string MGMT_QUEUE_INCORRECT_TYPE = "NO";
/// MO_MACHINE_TOKEN -> L"MACHINE"
public const string MO_MACHINE_TOKEN = "MACHINE";
/// MO_QUEUE_TOKEN -> L"QUEUE"
public const string MO_QUEUE_TOKEN = "QUEUE";
/// MACHINE_ACTION_CONNECT -> L"CONNECT"
public const string MACHINE_ACTION_CONNECT = "CONNECT";
/// MACHINE_ACTION_DISCONNECT -> L"DISCONNECT"
public const string MACHINE_ACTION_DISCONNECT = "DISCONNECT";
/// MACHINE_ACTION_TIDY -> L"TIDY"
public const string MACHINE_ACTION_TIDY = "TIDY";
/// QUEUE_ACTION_PAUSE -> L"PAUSE"
public const string QUEUE_ACTION_PAUSE = "PAUSE";
/// QUEUE_ACTION_RESUME -> L"RESUME"
public const string QUEUE_ACTION_RESUME = "RESUME";
/// QUEUE_ACTION_EOD_RESEND -> L"EOD_RESEND"
public const string QUEUE_ACTION_EOD_RESEND = "EOD_RESEND";
#endregion
#region -- Enums --
public enum MQMGMT_MACHINE_PROPERTIES
{
/// PROPID_MGMT_MSMQ_BASE -> 0
PROPID_MGMT_MSMQ_BASE = 0,
PROPID_MGMT_MSMQ_ACTIVEQUEUES,
PROPID_MGMT_MSMQ_PRIVATEQ,
PROPID_MGMT_MSMQ_DSSERVER,
PROPID_MGMT_MSMQ_CONNECTED,
PROPID_MGMT_MSMQ_TYPE,
}
public enum MQMGMT_QUEUE_PROPERTIES
{
/// PROPID_MGMT_QUEUE_BASE -> 0
PROPID_MGMT_QUEUE_BASE = 0,
PROPID_MGMT_QUEUE_PATHNAME,
PROPID_MGMT_QUEUE_FORMATNAME,
PROPID_MGMT_QUEUE_TYPE,
PROPID_MGMT_QUEUE_LOCATION,
PROPID_MGMT_QUEUE_XACT,
PROPID_MGMT_QUEUE_FOREIGN,
PROPID_MGMT_QUEUE_MESSAGE_COUNT,
PROPID_MGMT_QUEUE_USED_QUOTA,
PROPID_MGMT_QUEUE_JOURNAL_MESSAGE_COUNT,
PROPID_MGMT_QUEUE_JOURNAL_USED_QUOTA,
PROPID_MGMT_QUEUE_STATE,
PROPID_MGMT_QUEUE_NEXTHOPS,
PROPID_MGMT_QUEUE_EOD_LAST_ACK,
PROPID_MGMT_QUEUE_EOD_LAST_ACK_TIME,
PROPID_MGMT_QUEUE_EOD_LAST_ACK_COUNT,
PROPID_MGMT_QUEUE_EOD_FIRST_NON_ACK,
PROPID_MGMT_QUEUE_EOD_LAST_NON_ACK,
PROPID_MGMT_QUEUE_EOD_NEXT_SEQ,
PROPID_MGMT_QUEUE_EOD_NO_READ_COUNT,
PROPID_MGMT_QUEUE_EOD_NO_ACK_COUNT,
PROPID_MGMT_QUEUE_EOD_RESEND_TIME,
PROPID_MGMT_QUEUE_EOD_RESEND_INTERVAL,
PROPID_MGMT_QUEUE_EOD_RESEND_COUNT,
PROPID_MGMT_QUEUE_EOD_SOURCE_INFO,
}
#endregion
#region -- Structures --
[StructLayoutAttribute(LayoutKind.Explicit)]
public struct Union
{
/// UCHAR->unsigned char
[FieldOffsetAttribute(0)]
public byte bVal;
/// SHORT->short
[FieldOffsetAttribute(0)]
public short iVal;
/// USHORT->unsigned short
[FieldOffsetAttribute(0)]
public ushort uiVal;
/// VARIANT_BOOL->short
[FieldOffsetAttribute(0)]
public short boolVal;
/// LONG->int
[FieldOffsetAttribute(0)]
public int lVal;
/// ULONG->unsigned int
[FieldOffsetAttribute(0)]
public uint ulVal;
/// ULARGE_INTEGER->_ULARGE_INTEGER
[FieldOffsetAttribute(0)]
public ULARGE_INTEGER uhVal;
/// SCODE->LONG->int
[FieldOffsetAttribute(0)]
public int scode;
/// DATE->double
[FieldOffsetAttribute(0)]
public double date;
/// CLSID*
[FieldOffsetAttribute(0)]
public System.IntPtr puuid;
/// BLOB->tagBLOB
[FieldOffsetAttribute(0)]
public tagBLOB blob;
/// LPOLESTR->OLECHAR*
[FieldOffsetAttribute(0)]
public System.IntPtr bstrVal;
/// LPSTR->CHAR*
[FieldOffsetAttribute(0)]
public System.IntPtr pszVal;
/// LPWSTR->WCHAR*
[FieldOffsetAttribute(0)]
public System.IntPtr pwszVal;
/// CAI->tagCAI
[FieldOffsetAttribute(0)]
public tagCAI cai;
/// CAUI->tagCAUI
[FieldOffsetAttribute(0)]
public tagCAUI caui;
/// CABOOL->tagCABOOL
[FieldOffsetAttribute(0)]
public tagCABOOL cabool;
/// CAL->tagCAL
[FieldOffsetAttribute(0)]
public tagCAL cal;
/// CAUL->tagCAUL
[FieldOffsetAttribute(0)]
public tagCAUL caul;
/// CACLSID->tagCACLSID
[FieldOffsetAttribute(0)]
public tagCACLSID cauuid;
/// CABSTR->tagCABSTR
[FieldOffsetAttribute(0)]
public tagCABSTR cabstr;
/// CALPWSTR->tagCALPWSTR
[FieldOffsetAttribute(0)]
public tagCALPWSTR calpwstr;
/// CAPROPVARIANT->tagCAPROPVARIANT
[FieldOffsetAttribute(0)]
public tagCAPROPVARIANT capropvar;
}
[StructLayoutAttribute(LayoutKind.Sequential)]
public struct MQPROPVARIANT
{
/// VARTYPE->unsigned short
public ushort vt;
/// WORD->unsigned short
public ushort wReserved1;
/// WORD->unsigned short
public ushort wReserved2;
/// WORD->unsigned short
public ushort wReserved3;
/// Anonymous_1506164d_aea5_43ce_9c68_e6f00748bae9
public Union Union1;
}
[StructLayoutAttribute(LayoutKind.Sequential)]
public struct MQMGMTPROPS
{
/// DWORD->unsigned int
public uint cProp;
/// MGMTPROPID*
public System.IntPtr aPropID;
/// MQPROPVARIANT*
public System.IntPtr aPropVar;
/// HRESULT*
public System.IntPtr aStatus;
}
#endregion
#region -- External Methods --
/// Return Type: HRESULT->LONG->int
///pMachineName: LPCWSTR->WCHAR*
///pObjectName: LPCWSTR->WCHAR*
///pMgmtProps: MQMGMTPROPS*
[DllImportAttribute("mqrt.dll", EntryPoint = "MQMgmtGetInfo")]
public static extern int MQMgmtGetInfo([InAttribute()] [MarshalAsAttribute(UnmanagedType.LPWStr)] string pMachineName, [InAttribute()] [MarshalAsAttribute(UnmanagedType.LPWStr)] string pObjectName, ref MQMGMTPROPS pMgmtProps);
/// Return Type: HRESULT->LONG->int
///pMachineName: LPCWSTR->WCHAR*
///pObjectName: LPCWSTR->WCHAR*
///pAction: LPCWSTR->WCHAR*
[DllImportAttribute("mqrt.dll", EntryPoint = "MQMgmtAction")]
public static extern int MQMgmtAction([InAttribute()] [MarshalAsAttribute(UnmanagedType.LPWStr)] string pMachineName, [InAttribute()] [MarshalAsAttribute(UnmanagedType.LPWStr)] string pObjectName, [InAttribute()] [MarshalAsAttribute(UnmanagedType.LPWStr)] string pAction);
/// Return Type: HRESULT->LONG->int
///hQueue: HANDLE->void*
[DllImportAttribute("mqrt.dll", EntryPoint = "MQPurgeQueue")]
public static extern int MQPurgeQueue(System.IntPtr hQueue);
#endregion
}
}
Now I have a managed code version of the MSMQ Admin APIs, so I wrote some extension methods to extend MessageQueue class as follows :
private static uint GetCount(string path, MQMGMT_QUEUE_PROPERTIES Required)
{
if (path.Contains("Formatname:"))
path = path.Replace("Formatname:", "");
MQMGMTPROPS props = new MQMGMTPROPS { cProp = 1 };
try
{
props.aPropID = Marshal.AllocHGlobal(sizeof(int));
Marshal.WriteInt32(props.aPropID, (int)Required);
props.aPropVar = Marshal.AllocHGlobal(Marshal.SizeOf(typeof(MQPROPVARIANT)));
Marshal.StructureToPtr(new MQPROPVARIANT { vt = VT_NULL }, props.aPropVar, false);
props.aStatus = Marshal.AllocHGlobal(sizeof(int));
Marshal.WriteInt32(props.aStatus, 0);
int result = MQMgmtGetInfo(null, "queue=" + path, ref props);
if (result != 0 || Marshal.ReadInt32(props.aStatus) != 0)
{
return 0;
}
MQPROPVARIANT propVar = (MQPROPVARIANT)Marshal.PtrToStructure(props.aPropVar, typeof(MQPROPVARIANT));
if (propVar.vt != VT_UI4)
{
return 0;
}
else
{
return propVar.Union1.ulVal;
}
}
finally
{
Marshal.FreeHGlobal(props.aPropID);
Marshal.FreeHGlobal(props.aPropVar);
Marshal.FreeHGlobal(props.aStatus);
}
}
public static uint GetUnacknowledgedMessages(this MessageQueue queue)
{
return GetCount(queue.Path, MQMGMT_QUEUE_PROPERTIES.PROPID_MGMT_QUEUE_EOD_NO_ACK_COUNT);
}
After investing some time, I thought that it is much better to have combination of MSMQ COM Interfaces and Admin APIs to give me more flexibility, you will find the complete source code here. [MessageQueueExtensions.zip (15.63 kb)]