STLdoc
STLdocumentation
Concurrency::ordered_message_processor< _Type > Class Template Reference

An ordered_message_processor is a message_processor that allows message blocks to process messages in the order they were received.

#include <agents.h>

Inheritance diagram for Concurrency::ordered_message_processor< _Type >:
Concurrency::message_processor< _Type >

Public Types

typedef std::tr1::function< void(message< _Type > *)> _Handler_method
 The signature of the callback method invoked while processing messages.

typedef std::tr1::function< void(void)> _Propagator_method
 The signature of the callback method invoked while propagating messages.

typedef _Type type
 A type alias for _Type.

- Public Types inherited from Concurrency::message_processor< _Type >
typedef _Type type
 A type alias for _Type.


Public Member Functions

 ordered_message_processor ()
 Constructs an ordered_message_processor object.

virtual ~ordered_message_processor ()
 Destroys the ordered_message_processor object.

void initialize (_Inout_opt_ Scheduler *_PScheduler, _Inout_opt_ ScheduleGroup *_PScheduleGroup, _Handler_method const &_Handler)
 Initializes the ordered_message_processor object with the appropriate callback function, scheduler and schedule group.

virtual void initialize_batched_processing (_Handler_method const &_Processor, _Propagator_method const &_Propagator)
 Initializes batched message processing.

virtual void sync_send (_Inout_opt_ message< _Type > *_Msg)
 Synchronously queues up messages and starts a processing task, if this has not been done already.

virtual void async_send (_Inout_opt_ message< _Type > *_Msg)
 Asynchronously queues up messages and starts a processing task, if this has not been done already.

virtual void wait ()
 A processor-specific spin wait used in destructors of message blocks to make sure that all asynchronous processing tasks have time to finish before destroying the block.


Protected Member Functions

virtual void process_incoming_message ()
 The processing function that is called asynchronously. It dequeues messages and begins processing them.


Private Member Functions

void _Clear_queued_messages ()
 
void _Sync_send_helper (message< _Type > *_Msg)
 
long _Process_message_helper ()
 
void _Invoke_handler (long _Count)
 
void _Invoke_handler (message< _Type > *_Msg)
 

Private Attributes

concurrent_queue< message< _Type > * > _M_queuedMessages
 A queue of the messages

::Concurrency::details::_NonReentrantPPLLock _M_asyncSendLock
 A lock to use for queueing incoming messages.

volatile long _M_queuedDataCount
 A count of the current number of messages to process. Used as a flag to see if a new process message task needs to be created.

Scheduler * _M_pScheduler
 The scheduler to process messages on

ScheduleGroup * _M_pScheduleGroup
 The schedule group to process messages on

volatile long _M_stopProcessing
 A flag set in the destructor of a block to cease processing of new messages. This is required to guarantee that _M_queuedDataCount will get to 0 eventually.

volatile long _M_lwtCount
 A counter to indicate the number of outstanding LWTs

_Handler_method _M_handler
 A message handler object which exposes the callback to be invoked

_Handler_method _M_processor
 A message processing object which exposes the callback to be invoked

_Propagator_method _M_propagator
 A message propagating object which exposes the callback to be invoked


Additional Inherited Members

- Static Protected Member Functions inherited from Concurrency::message_processor< _Type >
static void __cdecl _Process_incoming_message_wrapper (void *_Data)
 Wrapper for process_incoming_message suitable for use as an argument to CreateThread and other similar methods.
 

Detailed Description

template<class _Type>
class Concurrency::ordered_message_processor< _Type >

An ordered_message_processor is a message_processor that allows message blocks to process messages in the order they were received.

Template Parameters
_Type    The payload type of messages handled by the processor.

Member Typedef Documentation

template<class _Type >
typedef std::tr1::function<void(message<_Type> *)> Concurrency::ordered_message_processor< _Type >::_Handler_method

The signature of the callback method invoked while processing messages.

template<class _Type >
typedef std::tr1::function<void(void)> Concurrency::ordered_message_processor< _Type >::_Propagator_method

The signature of the callback method invoked while propagating messages.

template<class _Type >
typedef _Type Concurrency::ordered_message_processor< _Type >::type

A type alias for _Type.
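
As a rough illustration of the two callback shapes documented above, the following portable sketch uses std::function (the standardized successor of std::tr1::function). It does not include agents.h: message_model, handler_method, propagator_method and run_callbacks_demo are hypothetical names introduced only for this example, and message_model stands in for message< _Type > by carrying just a payload.

```cpp
#include <cassert>
#include <functional>
#include <vector>

// Hypothetical stand-in for Concurrency::message<T>; only the payload is modeled.
template <class T>
struct message_model {
    T payload;
};

// Same call shapes as _Handler_method and _Propagator_method.
template <class T>
using handler_method = std::function<void(message_model<T>*)>;
using propagator_method = std::function<void()>;

// Passes two messages to a handler, then signals the propagator once.
inline std::vector<int> run_callbacks_demo() {
    std::vector<int> seen;
    int propagations = 0;

    handler_method<int> handler = [&](message_model<int>* m) {
        seen.push_back(m->payload);
    };
    propagator_method propagator = [&] { ++propagations; };

    message_model<int> a{1}, b{2};
    handler(&a);
    handler(&b);
    propagator();

    assert(propagations == 1);
    return seen;
}
```

The handler receives a pointer to one message at a time, while the propagator takes no arguments and is expected to find its work in state shared with the handler.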

Constructor & Destructor Documentation

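template<class _Type >
Concurrency::ordered_message_processor< _Type >::ordered_message_processor ( )
inline

Constructs an ordered_message_processor object.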

This ordered_message_processor will not schedule asynchronous or synchronous handlers until the initialize function is called.

    :
        _M_queuedDataCount(0),
        _M_stopProcessing(1),
        _M_lwtCount(0),
        _M_pScheduler(NULL),
        _M_pScheduleGroup(NULL),
        _M_handler(nullptr),
        _M_processor(nullptr),
        _M_propagator(nullptr)
    {
    }

template<class _Type >
virtual Concurrency::ordered_message_processor< _Type >::~ordered_message_processor ( )
inline, virtual

Destroys the ordered_message_processor object.

Waits for all outstanding asynchronous operations before destroying the processor.

    {
        wait();
    }

Member Function Documentation

template<class _Type >
void Concurrency::ordered_message_processor< _Type >::_Clear_queued_messages ( )
inline, private

    {
        message<_Type> * _Msg = NULL;
        while (_M_queuedMessages.try_pop(_Msg))
        {
            delete _Msg;
        }
    }

template<class _Type >
void Concurrency::ordered_message_processor< _Type >::_Invoke_handler ( long  _Count)
inline, private

    {
        // Process _Count number of messages
        for(int _I = 0; _I < _Count; _I++)
        {
            message<_Type> * _Msg = NULL;
            _M_queuedMessages.try_pop(_Msg);
            if (_M_processor == NULL)
            {
                // If a processor function does not exist, the message processor is using single
                // message processing rather than batched processing. There should also be no
                // propagator function defined in this case.
                _CONCRT_ASSERT(_M_propagator == NULL);
                _M_handler(_Msg);
            }
            else
            {
                // Use the batched message processing function
                _M_processor(_Msg);
            }
        }

        // Call the handler which propagates the message(s)
        if (_M_propagator != NULL)
        {
            _M_propagator();
        }
    }

template<class _Type >
void Concurrency::ordered_message_processor< _Type >::_Invoke_handler ( message< _Type > *  _Msg)
inline, private

    {
        if (_M_processor == NULL)
        {
            // If a processor function does not exist, the message processor is using single
            // message processing rather than batched processing. There should also be no
            // propagator function defined in this case.
            _CONCRT_ASSERT(_M_propagator == NULL);
            _M_handler(_Msg);
        }
        else
        {
            // Use the batched message processing function
            _M_processor(_Msg);

            // Call the handler which propagates the message(s)
            if (_M_propagator != NULL)
            {
                _M_propagator();
            }
        }
    }

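
The two _Invoke_handler overloads choose between single-message and batched dispatch. The sketch below models that choice in portable C++ under simplifying assumptions: dispatch_model and dispatch_demo are hypothetical names, payloads are plain ints rather than message pointers, the queue is a std::deque, and (unlike the listing) the loop stops early if the queue runs dry.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <string>

// Hypothetical model of the dispatch decision; not the agents.h implementation.
struct dispatch_model {
    std::deque<int> queue;
    std::function<void(int)> handler;    // used when no processor is registered
    std::function<void(int)> processor;  // batched path, optional
    std::function<void()> propagator;    // called once per batch, optional

    // Pops up to `count` payloads in FIFO order and routes each one.
    void invoke(long count) {
        for (long i = 0; i < count && !queue.empty(); ++i) {
            int msg = queue.front();
            queue.pop_front();
            if (!processor) {
                handler(msg);      // single-message processing
            } else {
                processor(msg);    // batched processing
            }
        }
        if (propagator) {
            propagator();          // propagate once after the whole batch
        }
    }
};

inline std::string dispatch_demo() {
    std::string log;
    dispatch_model m;
    m.handler = [&](int v) { log += "h" + std::to_string(v) + " "; };

    m.queue = {1, 2};
    m.invoke(2);                   // no processor yet: handler sees 1 then 2

    m.processor = [&](int v) { log += "p" + std::to_string(v) + " "; };
    m.propagator = [&] { log += "P "; };
    m.queue = {3, 4};
    m.invoke(2);                   // processor sees 3 then 4, then one propagation

    return log;
}
```

In this model the propagator runs once per batch rather than once per message, which mirrors the placement of the _M_propagator call after the loop in the counted overload.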
template<class _Type >
long Concurrency::ordered_message_processor< _Type >::_Process_message_helper ( )
inline, private

    {
        _NR_lock _Lock(_M_asyncSendLock);

        long _Messages_processed = 0;

        // Do batched processing of messages
        // Read off the number of messages to process in this iteration by snapping a count
        volatile long _Count = _M_queuedDataCount;
        bool _StopProcessing = false;

        // This count could be 0 if there was both a synchronous and asynchronous
        // send occurring. One of them could have sent all of the messages for the other
        while (_Count > 0)
        {
            // Process _Count number of messages
            _Invoke_handler(_Count);
            _Messages_processed += _Count;

            // Subtract the count and see if there are new things to process
            volatile long _Orig = _InterlockedExchangeAdd((volatile long *) &_M_queuedDataCount, -_Count);
            _CONCRT_ASSERT(_Orig >= _Count);
            if (_Orig == _Count)
            {
                // Because _Count did not change, we processed everything there is to process
                break;
            }

            if (_StopProcessing)
            {
                break;
            }

            // After reading the flag process the currently queued messages
            // Any messages received after we observe this flag (to be set) will not
            // be processed.
            _StopProcessing = (_M_stopProcessing == 0) ? false : true;

            // Snap the count and try to process more
            _Count = _M_queuedDataCount;
        }

        return _Messages_processed;
    }

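
The "snap a count, process that many, subtract, check for new arrivals" loop in the listing above can be sketched with std::atomic in place of the interlocked intrinsics. This is a simplified model, not the library code: drain_batches and drain_demo are hypothetical names, the stop flag and the lock are left out, and the on_batch callback stands in for _Invoke_handler(_Count).

```cpp
#include <atomic>
#include <cassert>
#include <functional>

// Drains `pending` in batches. `on_batch(n)` stands in for _Invoke_handler(n)
// and may add more pending work to emulate senders that arrive mid-batch.
inline long drain_batches(std::atomic<long>& pending,
                          const std::function<void(long)>& on_batch) {
    long processed = 0;
    long count = pending.load();               // snap the current count
    while (count > 0) {
        on_batch(count);
        processed += count;

        // fetch_sub returns the value held *before* the subtraction.
        long orig = pending.fetch_sub(count);
        assert(orig >= count);
        if (orig == count) {
            break;                             // nothing new arrived meanwhile
        }
        count = pending.load();                // snap again and keep draining
    }
    return processed;
}

inline long drain_demo() {
    std::atomic<long> pending{3};
    int batches = 0;
    long total = drain_batches(pending, [&](long) {
        if (++batches == 1) {
            pending.fetch_add(2);              // two more arrive during batch 1
        }
    });
    assert(batches == 2);
    assert(pending.load() == 0);
    return total;
}
```

Comparing the pre-subtraction value with the snapped count is what lets the loop detect arrivals without taking a second lock: if the two differ, the counter grew while the batch was being handled.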
template<class _Type >
void Concurrency::ordered_message_processor< _Type >::_Sync_send_helper ( message< _Type > *  _Msg)
inline, private

    {
        _NR_lock _Lock(_M_asyncSendLock);

        // Message block destructors set the _M_stopProcessing flag to stop
        // processing any more messages. This is required to guarantee
        // that the destructor's wait_for_async_sends will complete
        if (_M_stopProcessing == 0)
        {
            if (_M_queuedDataCount > 0)
            {
                long _Count = _InterlockedExchange((volatile long *) &_M_queuedDataCount, 0);
                _Invoke_handler(_Count);
            }

            _Invoke_handler(_Msg);
        }
        else
        {
            // Destructor is running. Do not process the message
            // Delete the msg, if any.
            if (_Msg != NULL)
            {
                delete _Msg;
            }
        }

    }

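
To show why the synchronous path preserves ordering, here is a minimal portable model of the helper above. It is a sketch under stated assumptions: sync_send_model and sync_send_demo are hypothetical names, payloads are ints, std::mutex replaces the non-reentrant PPL lock, and a plain std::deque replaces concurrent_queue (so enqueue here is not safe to call from several threads).

```cpp
#include <atomic>
#include <cassert>
#include <deque>
#include <mutex>
#include <vector>

// Hypothetical model of the synchronous send path; not the agents.h code.
struct sync_send_model {
    std::mutex lock;               // plays the role of _M_asyncSendLock
    std::atomic<long> queued{0};   // plays the role of _M_queuedDataCount
    std::atomic<long> stop{0};     // plays the role of _M_stopProcessing
    std::deque<int> backlog;       // plays the role of _M_queuedMessages
    std::vector<int> handled;      // order in which payloads were handled

    // Queues a payload without processing it (the asynchronous producer side).
    void enqueue(int v) {
        backlog.push_back(v);
        queued.fetch_add(1);
    }

    // Handles everything queued so far, then the new payload, under the lock.
    void sync_send(int v) {
        std::lock_guard<std::mutex> guard(lock);
        if (stop.load() != 0) {
            return;                          // shutting down: drop the payload
        }
        long count = queued.exchange(0);     // claim the whole backlog
        for (long i = 0; i < count; ++i) {
            handled.push_back(backlog.front());
            backlog.pop_front();
        }
        handled.push_back(v);
    }
};

inline std::vector<int> sync_send_demo() {
    sync_send_model m;
    m.enqueue(1);
    m.enqueue(2);
    m.sync_send(3);      // backlog 1, 2 is handled before 3
    m.stop.store(1);
    m.sync_send(4);      // ignored once the stop flag is set
    assert(m.queued.load() == 0);
    return m.handled;
}
```

Exchanging the counter to zero before draining means a concurrently running drain task would observe an empty count and find nothing left to do, rather than handling the same messages twice.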
template<class _Type >
virtual void Concurrency::ordered_message_processor< _Type >::async_send ( _Inout_opt_ message< _Type > *  _Msg)
inline, virtual

Asynchronously queues up messages and starts a processing task, if this has not been done already.

Parameters
_Msg    A pointer to a message.

Implements Concurrency::message_processor< _Type >.

    {
        if (_M_handler == NULL)
        {
            throw invalid_operation("async_send called without registering a callback");
        }

        //
        // If there is a message to send, enqueue it in the processing queue.
        // async_send can be sent a NULL message if the block wishes to reprocess
        // the messages that are in its queue. For example, an unbounded_buffer
        // that has its head node released after reservation.
        //
        if (_Msg != NULL)
        {
            _M_queuedMessages.push(_Msg);
        }

        // ...
        {
            // Indicate that an LWT is in progress. This will cause the
            // destructor to block.
            // ...

            if (_M_stopProcessing == 0)
            {
                // ...

#ifdef _CRT_USE_WINAPI_FAMILY_DESKTOP_APP
                if (_M_pScheduleGroup != NULL)
                {
                    _M_pScheduleGroup->ScheduleTask(_Proc, this);
                }
                else if (_M_pScheduler != NULL)
                {
                    _M_pScheduler->ScheduleTask(_Proc, this);
                }
                else
                {
#endif /* _CRT_USE_WINAPI_FAMILY_DESKTOP_APP */
                    // ...
#ifdef _CRT_USE_WINAPI_FAMILY_DESKTOP_APP
                }
#endif /* _CRT_USE_WINAPI_FAMILY_DESKTOP_APP */

                // The LWT will decrement _M_lwtCount.
                return;
            }

            // If we get here then no task was scheduled. Decrement LWT count to reflect this fact
            // ...
        }
    }

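
The member documentation describes _M_queuedDataCount as a flag for deciding whether a new processing task must be created. One common way to realize that idea is sketched below, assuming that only the sender which moves the counter from zero to one schedules a task; the exact condition is not visible in the listing above, so treat this as an illustration of the pattern rather than the library's logic. schedule_model and schedule_demo are hypothetical names.

```cpp
#include <atomic>
#include <cassert>

// Hypothetical model: the counter doubles as an "is a drain task pending" flag.
struct schedule_model {
    std::atomic<long> queued{0};
    int tasks_scheduled = 0;   // how many drain tasks would have been created

    void async_send() {
        // fetch_add returns the previous value; 0 means no task is pending,
        // so this sender is responsible for scheduling one.
        if (queued.fetch_add(1) == 0) {
            ++tasks_scheduled;
        }
    }

    // Stands in for a drain task that has finished all queued work.
    void drain_all() { queued.store(0); }
};

inline int schedule_demo() {
    schedule_model m;
    m.async_send();
    m.async_send();
    m.async_send();
    assert(m.tasks_scheduled == 1);   // three sends, one task
    m.drain_all();
    m.async_send();                   // queue was empty again: new task
    return m.tasks_scheduled;
}
```

With this pattern any number of senders can enqueue while a task is pending, and at most one drain task exists per burst of messages.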
template<class _Type >
void Concurrency::ordered_message_processor< _Type >::initialize ( _Inout_opt_ Scheduler *  _PScheduler,
_Inout_opt_ ScheduleGroup *  _PScheduleGroup,
_Handler_method const &  _Handler 
)
inline

Initializes the ordered_message_processor object with the appropriate callback function, scheduler and schedule group.

Parameters
_PScheduler        A pointer to the scheduler to be used for scheduling light-weight tasks.
_PScheduleGroup    A pointer to the schedule group to be used for scheduling light-weight tasks.
_Handler           The handler functor invoked during callback.

See also
Scheduler Class, ScheduleGroup Class

    {
        _M_pScheduler = _PScheduler;
        _M_pScheduleGroup = _PScheduleGroup;
        _M_handler = _Handler;
        _M_stopProcessing = 0;
    }

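
The constructor leaves the processor stopped (_M_stopProcessing is 1) with no handler, and initialize installs the handler and clears the flag. The sketch below models that gating in portable C++. gate_model and gate_demo are hypothetical names, payloads are ints, and std::logic_error stands in for the invalid_operation exception used by sync_send and async_send.

```cpp
#include <cassert>
#include <functional>
#include <stdexcept>
#include <utility>
#include <vector>

// Hypothetical model of the initialize gate; not the agents.h implementation.
struct gate_model {
    long stop = 1;                        // starts stopped, like _M_stopProcessing(1)
    std::function<void(int)> handler;     // empty until initialize() is called

    void initialize(std::function<void(int)> h) {
        handler = std::move(h);
        stop = 0;                         // processing is now allowed
    }

    void sync_send(int v) {
        if (!handler) {
            throw std::logic_error("sync_send called without registering a callback");
        }
        if (stop == 0) {
            handler(v);
        }
    }
};

inline std::vector<int> gate_demo() {
    std::vector<int> seen;
    gate_model m;

    bool threw = false;
    try {
        m.sync_send(1);                   // no handler registered yet
    } catch (const std::logic_error&) {
        threw = true;
    }
    assert(threw);

    m.initialize([&](int v) { seen.push_back(v); });
    m.sync_send(2);                       // delivered after initialization
    return seen;
}
```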
template<class _Type >
virtual void Concurrency::ordered_message_processor< _Type >::initialize_batched_processing ( _Handler_method const &  _Processor,
_Propagator_method const &  _Propagator 
)
inline, virtual

Initializes batched message processing.

Parameters
_Processor     The processor functor invoked during callback.
_Propagator    The propagator functor invoked during callback.

    {
        _M_processor = _Processor;
        _M_propagator = _Propagator;
    }

template<class _Type >
virtual void Concurrency::ordered_message_processor< _Type >::process_incoming_message ( )
inline, protected, virtual

The processing function that is called asynchronously. It dequeues messages and begins processing them.

Implements Concurrency::message_processor< _Type >.

    {
        // ...

        // Indicate that an LWT completed
        // ...

        // Do not access any members here. If the count goes to
        // 0 as a result of the above decrement, the object
        // could be immediately deleted.
    }

template<class _Type >
virtual void Concurrency::ordered_message_processor< _Type >::sync_send ( _Inout_opt_ message< _Type > *  _Msg)
inline, virtual

Synchronously queues up messages and starts a processing task, if this has not been done already.

Parameters
_Msg    A pointer to a message.

Implements Concurrency::message_processor< _Type >.

    {
        if (_M_handler == NULL)
        {
            throw invalid_operation("sync_send called without registering a callback");
        }

        _Sync_send_helper(_Msg);
    }

template<class _Type >
virtual void Concurrency::ordered_message_processor< _Type >::wait ( )
inline, virtual

A processor-specific spin wait used in destructors of message blocks to make sure that all asynchronous processing tasks have time to finish before destroying the block.

Implements Concurrency::message_processor< _Type >.

    {
        // Cease processing of any new messages
        // ...

        // This spin makes sure all previously initiated message processings
        // will still process correctly. As soon as this count reaches zero, we can
        // proceed with the message block destructor.
        // ...
        while(_M_lwtCount != 0)
        {
            spinWait._SpinOnce();
        }

        // Synchronize with sync_send
        {
            _NR_lock _Lock(_M_asyncSendLock);
            // ...
        }

    }

Member Data Documentation

template<class _Type >
::Concurrency::details::_NonReentrantPPLLock Concurrency::ordered_message_processor< _Type >::_M_asyncSendLock
private

A lock to use for queueing incoming messages.

template<class _Type >
_Handler_method Concurrency::ordered_message_processor< _Type >::_M_handler
private

A message handler object which exposes the callback to be invoked

template<class _Type >
volatile long Concurrency::ordered_message_processor< _Type >::_M_lwtCount
private

A counter to indicate the number of outstanding LWTs

template<class _Type >
_Handler_method Concurrency::ordered_message_processor< _Type >::_M_processor
private

A message processing object which exposes the callback to be invoked

template<class _Type >
_Propagator_method Concurrency::ordered_message_processor< _Type >::_M_propagator
private

A message propagating object which exposes the callback to be invoked

template<class _Type >
ScheduleGroup* Concurrency::ordered_message_processor< _Type >::_M_pScheduleGroup
private

The schedule group to process messages on

template<class _Type >
Scheduler* Concurrency::ordered_message_processor< _Type >::_M_pScheduler
private

The scheduler to process messages on

template<class _Type >
volatile long Concurrency::ordered_message_processor< _Type >::_M_queuedDataCount
private

A count of the current number of messages to process. Used as a flag to see if a new process message task needs to be created.

template<class _Type >
concurrent_queue<message<_Type> *> Concurrency::ordered_message_processor< _Type >::_M_queuedMessages
private

A queue of the messages

template<class _Type >
volatile long Concurrency::ordered_message_processor< _Type >::_M_stopProcessing
private

A flag set in the destructor of a block to cease processing of new messages. This is required to guarantee that _M_queuedDataCount will get to 0 eventually.


The documentation for this class was generated from the following file:

agents.h