Concurrent programs can be executed sequentially on a single processor by interleaving the execution steps of each computational process, or executed in parallel by assigning each computational process to one of a set of processors that may be close or distributed across a network. The main challenges in designing concurrent programs are ensuring the correct sequencing of the interactions or communications between different computational processes, and coordinating access to resources that are shared among processes.
High-performance concurrent programs have existed for decades, but they were used mainly for scientific applications. Multithreading has long been regarded as a powerful tool that is hard to handle. While there may be some truth to this, newer tools have made the job of the software developer much easier when creating parallel implementations of algorithms. At the same time, the need to use multiple threads to build performant applications has become increasingly clear. Technologies such as Hyper-Threading and multi-core processors pay off only when the processor has multiple concurrently running processes or threads to schedule.
Moore's law states that the number of transistors that can be placed inexpensively on an integrated circuit has doubled approximately every two years as can be observed below:
Taken at face value, this suggests that your application will simply run much faster on future hardware. Unfortunately, there are physical reasons why that free speed-up will not continue over the coming decades: the heat generated by ICs keeps rising, so manufacturers have turned to multi-core and multi-CPU designs instead. As a result, developers and architects must design applications that make good use of these cores in order to get better performance.
Every developer knows that building a multi-threaded application is not an easy task, and sometimes it can be a nightmare. In my opinion, developers should concentrate on their domain model and business logic rather than spend a lot of time and effort investigating and debugging multi-tasking code.
Microsoft introduced the Concurrency and Coordination Runtime (CCR), a programming model on top of the .NET Framework that helps us build data-driven applications for distributed and concurrent systems. CCR is a message-oriented programming model optimized for:
- Asynchronous Operations [Queuing].
- Concurrent Tasks [Scheduling].
- Operations Coordination [Arbitration].
- Resiliency [failure handling].
The following diagram outlines the main components of CCR:
As can be observed, the CCR implementation has three main categories of functionality:
- The Port primitive, which is a FIFO queue of messages (any CLR type).
- The coordination primitives, also called Arbiters, which execute user code (often a delegate to some method) when certain conditions are met.
- The Dispatcher, DispatcherQueue and Task primitives. The CCR isolates the scheduling and load balancing logic from the rest of the implementation by using these classes to abstract how user tasks execute, what they contain, and on what resources they will run.
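To make the division of labour between these three pieces concrete, here is a small sketch that imitates it using standard .NET types only. It is not how CCR is implemented: MiniPort, MiniPortDemo and RoundTrip are hypothetical names invented for this illustration, the thread pool stands in for the Dispatcher, and a single handler stands in for an arbiter.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for a port: a FIFO queue plus one attached handler.
public sealed class MiniPort<T>
{
    private readonly ConcurrentQueue<T> _items = new ConcurrentQueue<T>();
    private Action<T> _handler;   // plays the role of a persistent receiver
    private int _draining;        // 0 = idle, 1 = a drain task is scheduled or running

    // Post never waits for the handler: it enqueues and asks the scheduler to deliver.
    public void Post(T item)
    {
        _items.Enqueue(item);
        ScheduleDrain();
    }

    // Attach user code; items that were posted earlier are delivered as well.
    public void Receive(Action<T> handler)
    {
        Volatile.Write(ref _handler, handler);
        ScheduleDrain();
    }

    private void ScheduleDrain()
    {
        if (Volatile.Read(ref _handler) == null) return;                     // nobody is listening yet
        if (Interlocked.CompareExchange(ref _draining, 1, 0) != 0) return;   // a drain is already active
        Task.Run(() => Drain());                                             // the thread pool acts as the dispatcher
    }

    private void Drain()
    {
        T item;
        while (_items.TryDequeue(out item)) _handler(item);
        Volatile.Write(ref _draining, 0);
        if (!_items.IsEmpty) ScheduleDrain();   // an item arrived while this drain was finishing
    }
}

public static class MiniPortDemo
{
    // Posts the given messages and returns them in the order the handler saw them.
    public static string[] RoundTrip(params string[] messages)
    {
        var port = new MiniPort<string>();
        var received = new ConcurrentQueue<string>();
        var done = new CountdownEvent(messages.Length);

        port.Receive(m => { received.Enqueue(m); done.Signal(); });
        foreach (var m in messages) port.Post(m);

        done.Wait();   // only the demo blocks; Post itself never does
        return received.ToArray();
    }

    public static void Main()
    {
        foreach (var m in RoundTrip("Hello", "CCR"))
            Console.WriteLine("Received item : {0}", m);
    }
}
```

Only one drain task is active at a time, so the handler sees messages in the order they were posted, while the caller of Post never waits for the handler to run.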
The CCR port is the most common primitive and is used as the point of interaction between any two components. Adding an item to a port is a fast, non-blocking, asynchronous operation, so the Post method returns control to the caller as soon as possible, as the following code snippet shows:
// A port that carries a single message type.
Port<string> port = new Port<string>();
port.Post("Hello CCR");
Console.WriteLine(port.Test());
// A port set that carries several message types.
PortSet<int, string, decimal, Type> portset = new PortSet<int, string, decimal, Type>();
portset.Post(5.5M);
portset.Post(1);
portset.Post("Hello CCR");
portset.Post(typeof(System.Activator));
Console.WriteLine("{0}\t{1}\t{2}\t{3}", portset.Test<int>(), portset.Test<decimal>(), portset.Test<Type>(), portset.Test<string>());
Now let's use a CCR arbiter to attach a simple receiver to the port. The receiver is associated with a user delegate, in this case an anonymous method defined inline:
Dispatcher dispatcher = new Dispatcher();
DispatcherQueue taskQueue = new DispatcherQueue("sample queue", dispatcher);
Arbiter.Activate(taskQueue, port.Receive(delegate(string item)
{
    Console.WriteLine("Received item : {0}", item);
}));
A very powerful feature of the CCR arbiter is the Choice mechanism. If you have a port that accepts several message types, you can use the arbiter to run the handler that matches the type of the message that arrives, as follows:
Arbiter.Activate(taskQueue,
    Arbiter.Choice(EventPorts.port,
        (StockData stockdata) => DumpStockData(stockdata),
        (OrderData orderdata) => DumpOrderData(orderdata)));
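The essential property of a choice is that only one of its branches runs. The sketch below imitates that behaviour with an atomic flag, using nothing but the standard library. OneShotChoice and ChoiceDemo are hypothetical names made up for this illustration, and the sketch simply discards the losing message, which is a simplification and not a description of what CCR does internally.

```csharp
using System;
using System.Threading;

// Hypothetical helper: runs at most one of two handlers, for whichever message arrives first.
public sealed class OneShotChoice<T0, T1>
{
    private readonly Action<T0> _first;
    private readonly Action<T1> _second;
    private int _taken;   // 0 = still open, 1 = a branch has already run

    public OneShotChoice(Action<T0> first, Action<T1> second)
    {
        _first = first;
        _second = second;
    }

    // Returns true if this message won the choice and its handler ran.
    public bool Post(T0 message)
    {
        if (Interlocked.Exchange(ref _taken, 1) != 0) return false;
        _first(message);
        return true;
    }

    public bool Post(T1 message)
    {
        if (Interlocked.Exchange(ref _taken, 1) != 0) return false;
        _second(message);
        return true;
    }
}

public static class ChoiceDemo
{
    // Posts a string first and an int second; only the string handler should run.
    public static string Run()
    {
        string log = "";
        var choice = new OneShotChoice<int, string>(
            n => log += "int:" + n,
            s => log += "string:" + s);

        choice.Post("order");   // first message wins the choice
        choice.Post(42);        // arrives too late, so its handler is skipped
        return log;
    }

    public static void Main()
    {
        Console.WriteLine(Run());
    }
}
```

Because the flag is flipped with an atomic exchange, the guarantee holds even if the two posts race on different threads.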
Now that we have covered the basic functionality of CCR, let's apply the framework to some practical examples. I was working on a high-performance data streaming system built on Windows sockets. In most cases developers bind one thread to each socket for continuous send and receive (blocking mode), but thread-per-socket limits the scalability of the server: most of the time each thread is blocked waiting for I/O to complete, and as the number of threads grows, context switching becomes an overhead that degrades overall performance.
A better approach is to use asynchronous calls to reduce the number of threads the server needs, and CCR makes this easy. First, I will create a wrapper that takes the success and failure ports together with a delegate for the End method, and returns the completion callback, as follows:
internal sealed class ApmResultToCcrResultFactory
{
    // Builds an AsyncCallback that forwards the End result, or the exception, to the given ports.
    public static AsyncCallback Create<T>(Port<T> portResult,
        Port<Exception> portException,
        ApmEndMethod<T> endMethod)
    {
        ApmResultToCcrResult<T> ar2cr =
            new ApmResultToCcrResult<T>(portResult, portException, endMethod);
        return ar2cr.ApmToCcrResult;
    }

    // Matches the shape of an End method such as Socket.EndAccept.
    public delegate T ApmEndMethod<T>(IAsyncResult ar);

    private sealed class ApmResultToCcrResult<T>
    {
        private ApmEndMethod<T> m_ApmEndMethod;
        private Port<T> m_portResult;
        private Port<Exception> m_portException;

        internal ApmResultToCcrResult(Port<T> portResult,
            Port<Exception> portException, ApmEndMethod<T> endMethod)
        {
            m_portResult = portResult;
            m_portException = portException;
            m_ApmEndMethod = endMethod;
        }

        // The completion callback: call End and post the outcome to the matching port.
        internal void ApmToCcrResult(IAsyncResult ar)
        {
            try
            {
                m_portResult.Post(m_ApmEndMethod(ar));
            }
            catch (Exception e)
            {
                m_portException.Post(e);
            }
        }
    }
}
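For reference, this is roughly what the Begin/End pattern looks like when it is used directly on a socket, without CCR. It is a minimal sketch that accepts a single loopback connection; ApmAcceptDemo and AcceptOnce are hypothetical names, and the event is there only so that the demo can wait for the callback.

```csharp
using System;
using System.Net;
using System.Net.Sockets;
using System.Threading;

public static class ApmAcceptDemo
{
    // Accepts one loopback connection with BeginAccept/EndAccept and
    // returns true if the callback produced a connected socket.
    public static bool AcceptOnce()
    {
        using (var listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp))
        using (var client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp))
        using (var done = new ManualResetEvent(false))
        {
            listener.Bind(new IPEndPoint(IPAddress.Loopback, 0));   // port 0 lets the OS pick a free port
            listener.Listen(1);

            Socket accepted = null;
            Exception failure = null;

            // BeginAccept returns immediately; the callback runs once a client connects.
            listener.BeginAccept(ar =>
            {
                try { accepted = listener.EndAccept(ar); }   // EndAccept yields the result or throws
                catch (Exception e) { failure = e; }
                finally { done.Set(); }
            }, null);

            client.Connect(listener.LocalEndPoint);
            done.WaitOne();   // the demo waits here; a server would carry on with other work

            bool ok = failure == null && accepted != null && accepted.Connected;
            if (accepted != null) accepted.Close();
            return ok;
        }
    }

    public static void Main()
    {
        Console.WriteLine("Accepted: {0}", AcceptOnce());
    }
}
```

The try/catch around EndAccept is the part that the factory generalizes: the result goes to one port and the exception to another, instead of into local variables.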
Then I will create an extension method for each asynchronous method of the Socket class that I need. Taking the accept method as an example:
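// Extension methods must live in a static class; the class name here is only illustrative.
public static class SocketExtensions
{
    public static PortSet<Socket, Exception> AcceptSocket(this Socket listener)
    {
        // P0 receives the accepted socket, P1 receives any exception.
        var result = new PortSet<Socket, Exception>();
        listener.BeginAccept(ApmResultToCcrResultFactory.Create(result.P0, result.P1,
            delegate(IAsyncResult ar) { return listener.EndAccept(ar); }), null);
        return result;
    }
}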
Now we can use the Choice mechanism of the CCR arbiter described earlier to ensure that exactly one handler runs for the accept, either the success handler or the failure handler:
// dq is a DispatcherQueue, created like taskQueue in the earlier snippet.
PortSet<Socket, Exception> result = listener.AcceptSocket();
Arbiter.Activate(dq, Arbiter.Choice(result,
    (Socket acceptedSocket) => Console.WriteLine("Socket Accepted : {0}", acceptedSocket.RemoteEndPoint),
    (Exception exp) => Console.WriteLine(exp.Message)));
As you can see, CCR handles the Begin/End asynchronous pattern easily, and you can apply the same idea to reading streams, web responses and any other asynchronous operation. For high-performance socket applications, however, the preferred technique is I/O completion ports. When I was working on that system, my manager asked me to bind our server to more than one IP address in order to guarantee the scalability, reliability and availability of our service. With the Begin/End pattern I would have faced the tough job of synchronizing multiple threads, for which a semaphore would have been the best option. Instead, I decided to use CCR with I/O completion ports, as follows:
internal static IEnumerator<ITask> AcceptSocket(Socket s)
{
    // Use this port to signal the completion of an async operation.
    var completionPort = new Port<EmptyValue>();
    using (var sae = new SocketAsyncEventArgs())
    {
        // Set up the event handler.
        sae.Completed += (sender, args) => completionPort.Post(EmptyValue.SharedInstance);
        // Each activation handles a single accept; re-posting the listener below schedules the next one.
        // AcceptAsync returns true while the operation is still pending; in that case
        // yield until the Completed handler posts to the completion port.
        if (s.AcceptAsync(sae))
            yield return Arbiter.Receive(false, completionPort, ignored => { });
        // The accept has completed; the actual processing goes here.
        Console.WriteLine("Accepted Socket from IP {0} with Address {1}",
            s.LocalEndPoint, sae.AcceptSocket.RemoteEndPoint);
        // Put this listener back into accept mode.
        ListenerSockets.Post(s);
    }
}
I wrote an iterator method so that the handler resumes only once the I/O has completed, as the snippet above shows. Next, I use the CCR iterator arbiter to register it for asynchronous socket accepts:
Arbiter.Activate(dq, Arbiter.ReceiveWithIterator<Socket>(true, ListenerSockets,
    new IteratorHandler<Socket>(AcceptSocket)));
In order to activate our server we will just post our listener sockets as follows:
ListenerSockets.Post(listener1);
ListenerSockets.Post(listener2);
As you can see, no threads or synchronization objects appear in the code. We could of course use configuration to add more listeners at runtime and extend the scalability, reliability and availability of our server. CCR made multi-threaded programming much easier for me, because I no longer have to think about thread synchronization and its complexities. The source code for the above scenarios is attached: SocketServer.zip (77.86 kb).
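For readers who want to try the accept call in isolation, here is a small sketch of the SocketAsyncEventArgs pattern using only the standard library. SaeaAcceptDemo and AcceptMany are hypothetical names, and the demo blocks on an event purely to stay sequential, which a real server would not do. The points it illustrates are that AcceptAsync returns false when the operation finished synchronously (in which case Completed is not raised), and that AcceptSocket has to be cleared before the same event args object is reused.

```csharp
using System;
using System.Net;
using System.Net.Sockets;
using System.Threading;

public static class SaeaAcceptDemo
{
    // Accepts `count` loopback connections, reusing one SocketAsyncEventArgs,
    // and returns how many accepts succeeded.
    public static int AcceptMany(int count)
    {
        int accepted = 0;
        using (var listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp))
        using (var sae = new SocketAsyncEventArgs())
        using (var completed = new AutoResetEvent(false))
        {
            listener.Bind(new IPEndPoint(IPAddress.Loopback, 0));   // port 0 lets the OS pick a free port
            listener.Listen(10);
            sae.Completed += (sender, args) => completed.Set();

            for (int i = 0; i < count; i++)
            {
                using (var client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp))
                {
                    sae.AcceptSocket = null;                    // clear the previous result before reuse
                    bool pending = listener.AcceptAsync(sae);   // true: Completed will be raised later
                    client.Connect(listener.LocalEndPoint);
                    if (pending) completed.WaitOne();           // false: already done, no event to wait for

                    if (sae.SocketError == SocketError.Success)
                    {
                        accepted++;
                        sae.AcceptSocket.Close();
                    }
                }
            }
        }
        return accepted;
    }

    public static void Main()
    {
        Console.WriteLine("Accepted {0} connections", AcceptMany(3));
    }
}
```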
In conclusion, the CCR lets you express your application in terms of its fundamental data dependencies, describing schedules of tasks that the runtime then distributes seamlessly over the available cores. This simultaneously frees you from explicitly dealing with threads and locks whilst enabling your application to take full advantage of the increasingly powerful multi-core machines that software will run on in the future.