STLdoc
STLdocumentation
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
Public Types | Public Member Functions | Protected Member Functions | Private Member Functions | Private Attributes | List of all members
Concurrency::ordered_message_processor< _Type > Class Template Reference

An ordered_message_processor is a message_processor that allows message blocks to process messages in the order they were received. More...

#include <agents.h>

Inheritance diagram for Concurrency::ordered_message_processor< _Type >:
Concurrency::message_processor< _Type >

Public Types

typedef std::function< void(message< _Type > *)> _Handler_method
 The signature of the callback method invoked while processing messages. More...
 
typedef std::function< void(void)> _Propagator_method
 The signature of the callback method invoked while propagating messages. More...
 
typedef _Type type
 A type alias for _Type . More...
 
- Public Types inherited from Concurrency::message_processor< _Type >
typedef _Type type
 A type alias for _Type . More...
 

Public Member Functions

 ordered_message_processor ()
 Constructs an ordered_message_processor object. More...
 
virtual ~ordered_message_processor ()
 Destroys the ordered_message_processor object. More...
 
void initialize (_Inout_opt_ Scheduler *_PScheduler, _Inout_opt_ ScheduleGroup *_PScheduleGroup, _Handler_method const &_Handler)
 Initializes the ordered_message_processor object with the appropriate callback function, scheduler and schedule group. More...
 
virtual void initialize_batched_processing (_Handler_method const &_Processor, _Propagator_method const &_Propagator)
 Initialize batched message processing More...
 
virtual void sync_send (_Inout_opt_ message< _Type > *_Msg)
 Synchronously queues up messages and starts a processing task, if this has not been done already. More...
 
virtual void async_send (_Inout_opt_ message< _Type > *_Msg)
 Asynchronously queues up messages and starts a processing task, if this has not been done already. More...
 
virtual void wait ()
 A processor-specific spin wait used in destructors of message blocks to make sure that all asynchronous processing tasks have time to finish before destroying the block. More...
 

Protected Member Functions

virtual void process_incoming_message ()
 The processing function that is called asynchronously. It dequeues messages and begins processing them. More...
 

Private Member Functions

void _Clear_queued_messages ()
 
void _Sync_send_helper (message< _Type > *_Msg)
 
long _Process_message_helper ()
 
void _Invoke_handler (long _Count)
 
void _Invoke_handler (message< _Type > *_Msg)
 

Private Attributes

concurrent_queue< message< _Type > * > _M_queuedMessages
 A queue of the messages More...
 
::Concurrency::details::_NonReentrantPPLLock _M_asyncSendLock
 A lock to use for queueing incoming messages. More...
 
volatile long _M_queuedDataCount
 A count of the current number of messages to process. Used as a flag to see if a new process message task needs to be created. More...
 
Scheduler * _M_pScheduler
 The scheduler to process messages on More...
 
ScheduleGroup * _M_pScheduleGroup
 The schedule group to process messages on More...
 
volatile long _M_stopProcessing
 A flag set in the destructor of a block to cease processing of new messages. This is required to guarantee that _M_queuedDataCount will get to 0 eventually. More...
 
volatile long _M_lwtCount
 A counter to indicate the number of outstanding LWTs More...
 
_Handler_method _M_handler
 A message handler object which exposes the callback to be invoked More...
 
_Handler_method _M_processor
 A message processing object which exposes the callback to be invoked More...
 
_Propagator_method _M_propagator
 A message propagating object which exposes the callback to be invoked More...
 

Additional Inherited Members

- Static Protected Member Functions inherited from Concurrency::message_processor< _Type >
static void __cdecl _Process_incoming_message_wrapper (void *_Data)
 Wrapper for process_incoming_message suitable for use as a argument to CreateThread and other similar methods. More...
 

Detailed Description

template<class _Type>
class Concurrency::ordered_message_processor< _Type >

An ordered_message_processor is a message_processor that allows message blocks to process messages in the order they were received.

Template Parameters
_TypeThe payload type of messages handled by the processor.

Member Typedef Documentation

template<class _Type >
typedef std::function<void(message<_Type> *)> Concurrency::ordered_message_processor< _Type >::_Handler_method

The signature of the callback method invoked while processing messages.

template<class _Type >
typedef std::function<void(void)> Concurrency::ordered_message_processor< _Type >::_Propagator_method

The signature of the callback method invoked while propagating messages.

template<class _Type >
typedef _Type Concurrency::ordered_message_processor< _Type >::type

A type alias for _Type .

Constructor & Destructor Documentation

Constructs an ordered_message_processor object.

This ordered_message_processor will not schedule asynchronous or synchronous handlers until the initialize function is called.

2038  :
2039  _M_queuedDataCount(0),
2040  _M_stopProcessing(1),
2041  _M_lwtCount(0),
2044  _M_handler(nullptr),
2045  _M_processor(nullptr),
2046  _M_propagator(nullptr)
2047  {
2048  }
volatile long _M_lwtCount
A counter to indicate the number of outstanding LWTs
Definition: agents.h:2416
_Propagator_method _M_propagator
A message propagating object which exposes the callback to be invoked
Definition: agents.h:2434
volatile long _M_stopProcessing
A flag set in the destructor of a block to cease processing of new messages. This is required to guar...
Definition: agents.h:2410
volatile long _M_queuedDataCount
A count of the current number of messages to process. Used as a flag to see if a new process message ...
Definition: agents.h:2391
ScheduleGroup * _M_pScheduleGroup
The schedule group to process messages on
Definition: agents.h:2403
_Handler_method _M_handler
A message handler object which exposes the callback to be invoked
Definition: agents.h:2422
_Handler_method _M_processor
A message processing object which exposes the callback to be invoked
Definition: agents.h:2428
#define NULL
Definition: corecrt.h:158
Scheduler * _M_pScheduler
The scheduler to process messages on
Definition: agents.h:2397
template<class _Type >
virtual Concurrency::ordered_message_processor< _Type >::~ordered_message_processor ( )
inlinevirtual

Destroys the ordered_message_processor object.

Waits for all outstanding asynchronous operations before destroying the processor.

2058  {
2059  wait();
2060  }
virtual void wait()
A processor-specific spin wait used in destructors of message blocks to make sure that all asynchrono...
Definition: agents.h:2189

Member Function Documentation

template<class _Type >
void Concurrency::ordered_message_processor< _Type >::_Clear_queued_messages ( )
inlineprivate
2235  {
2236  message<_Type> * _Msg = NULL;
2237  while (_M_queuedMessages.try_pop(_Msg))
2238  {
2239  delete _Msg;
2240  }
2241  }
concurrent_queue< message< _Type > * > _M_queuedMessages
A queue of the messages
Definition: agents.h:2378
#define NULL
Definition: corecrt.h:158
template<class _Type >
void Concurrency::ordered_message_processor< _Type >::_Invoke_handler ( long  _Count)
inlineprivate
2321  {
2322  // Process _Count number of messages
2323  for(int _I = 0; _I < _Count; _I++)
2324  {
2325  message<_Type> * _Msg = NULL;
2326  _M_queuedMessages.try_pop(_Msg);
2327  if (_M_processor == NULL)
2328  {
2329  // If a processor function does not exist, the message processor is using single
2330  // message processing rather than batched processing. There should also be no
2331  // propagator function defined in this case.
2333  _M_handler(_Msg);
2334  }
2335  else
2336  {
2337  // Use the batched message processing function
2338  _M_processor(_Msg);
2339  }
2340  }
2341 
2342  // Call the handler which propagates the message(s)
2343  if (_M_propagator != NULL)
2344  {
2345  _M_propagator();
2346  }
2347  }
_Propagator_method _M_propagator
A message propagating object which exposes the callback to be invoked
Definition: agents.h:2434
#define _CONCRT_ASSERT(x)
Definition: concrt.h:123
concurrent_queue< message< _Type > * > _M_queuedMessages
A queue of the messages
Definition: agents.h:2378
_Handler_method _M_handler
A message handler object which exposes the callback to be invoked
Definition: agents.h:2422
_Diff _Count
Definition: algorithm:1941
_Handler_method _M_processor
A message processing object which exposes the callback to be invoked
Definition: agents.h:2428
#define NULL
Definition: corecrt.h:158
template<class _Type >
void Concurrency::ordered_message_processor< _Type >::_Invoke_handler ( message< _Type > *  _Msg)
inlineprivate
2351  {
2352  if (_M_processor == NULL)
2353  {
2354  // If a processor function does not exist, the message processor is using single
2355  // message processing rather than batched processing. There should also be no
2356  // propagator function defined in this case.
2358  _M_handler(_Msg);
2359  }
2360  else
2361  {
2362  // Use the batched message processing function
2363  _M_processor(_Msg);
2364 
2365  // Call the handler which propagates the message(s)
2366  if (_M_propagator != NULL)
2367  {
2368  _M_propagator();
2369  }
2370  }
2371  }
_Propagator_method _M_propagator
A message propagating object which exposes the callback to be invoked
Definition: agents.h:2434
#define _CONCRT_ASSERT(x)
Definition: concrt.h:123
_Handler_method _M_handler
A message handler object which exposes the callback to be invoked
Definition: agents.h:2422
_Handler_method _M_processor
A message processing object which exposes the callback to be invoked
Definition: agents.h:2428
#define NULL
Definition: corecrt.h:158
template<class _Type >
long Concurrency::ordered_message_processor< _Type >::_Process_message_helper ( )
inlineprivate
2274  {
2275  _NR_lock _Lock(_M_asyncSendLock);
2276 
2277  long _Messages_processed = 0;
2278 
2279  // Do batched processing of messages
2280  // Read off the number of messages to process in this iteration by snapping a count
2281  volatile long _Count = _M_queuedDataCount;
2282  bool _StopProcessing = false;
2283 
2284  // This count could be 0 if there was both a synchronous and asynchronous
2285  // send occurring. One of them could have sent all of the messages for the other
2286  while (_Count > 0)
2287  {
2288  // Process _Count number of messages
2289  _Invoke_handler(_Count);
2290  _Messages_processed += _Count;
2291 
2292  // Subtract the count and see if there are new things to process
2293  volatile long _Orig = _InterlockedExchangeAdd((volatile long *) &_M_queuedDataCount, -_Count);
2294  _CONCRT_ASSERT(_Orig >= _Count);
2295  if (_Orig == _Count)
2296  {
2297  // Because _Count did not change, we processed everything there is to process
2298  break;
2299  }
2300 
2301  if (_StopProcessing)
2302  {
2303  break;
2304  }
2305 
2306  // After reading the flag process the currently queued messages
2307  // Any messages received after we observe this flag (to be set) will not
2308  // be processed.
2309  _StopProcessing = (_M_stopProcessing == 0) ? false : true;
2310 
2311  // Snap the count and try to process more
2312  _Count = _M_queuedDataCount;
2313  }
2314 
2315  return _Messages_processed;
2316  }
#define _CONCRT_ASSERT(x)
Definition: concrt.h:123
volatile long _M_stopProcessing
A flag set in the destructor of a block to cease processing of new messages. This is required to guar...
Definition: agents.h:2410
::Concurrency::details::_NonReentrantPPLLock _M_asyncSendLock
A lock to use for queueing incoming messages.
Definition: agents.h:2384
volatile long _M_queuedDataCount
A count of the current number of messages to process. Used as a flag to see if a new process message ...
Definition: agents.h:2391
::Concurrency::details::_NonReentrantPPLLock::_Scoped_lock _NR_lock
A lock holder that acquires a non-reentrant lock on instantiation and releases it on destruction...
Definition: agents.h:58
void _Invoke_handler(long _Count)
Definition: agents.h:2320
_Diff _Count
Definition: algorithm:1941
template<class _Type >
void Concurrency::ordered_message_processor< _Type >::_Sync_send_helper ( message< _Type > *  _Msg)
inlineprivate
2244  {
2245  _NR_lock _Lock(_M_asyncSendLock);
2246 
2247  // Message block destructors sets the _M_stopProcessing flag to stop
2248  // processing any more messages. This is required to guarantee
2249  // that the destructor's wait_for_async_sends will complete
2250  if (_M_stopProcessing == 0)
2251  {
2252  if (_M_queuedDataCount > 0)
2253  {
2254  long _Count = _InterlockedExchange((volatile long *) &_M_queuedDataCount, 0);
2255  _Invoke_handler(_Count);
2256  }
2257 
2258  _Invoke_handler(_Msg);
2259  }
2260  else
2261  {
2262  // Destructor is running. Do not process the message
2263  // Delete the msg, if any.
2264  if (_Msg != NULL)
2265  {
2266  delete _Msg;
2267  }
2268  }
2269 
2270  }
volatile long _M_stopProcessing
A flag set in the destructor of a block to cease processing of new messages. This is required to guar...
Definition: agents.h:2410
::Concurrency::details::_NonReentrantPPLLock _M_asyncSendLock
A lock to use for queueing incoming messages.
Definition: agents.h:2384
volatile long _M_queuedDataCount
A count of the current number of messages to process. Used as a flag to see if a new process message ...
Definition: agents.h:2391
::Concurrency::details::_NonReentrantPPLLock::_Scoped_lock _NR_lock
A lock holder that acquires a non-reentrant lock on instantiation and releases it on destruction...
Definition: agents.h:58
void _Invoke_handler(long _Count)
Definition: agents.h:2320
_Diff _Count
Definition: algorithm:1941
#define NULL
Definition: corecrt.h:158
template<class _Type >
virtual void Concurrency::ordered_message_processor< _Type >::async_send ( _Inout_opt_ message< _Type > *  _Msg)
inlinevirtual

Asynchronously queues up messages and starts a processing task, if this has not been done already.

Parameters
_MsgA pointer to a message.

Implements Concurrency::message_processor< _Type >.

2128  {
2129  if (_M_handler == NULL)
2130  {
2131  throw invalid_operation("async_send called without registering a callback");
2132  }
2133 
2134  //
2135  // If there is a message to send, enqueue it in the processing queue.
2136  // async_send can be sent a NULL message if the block wishes to reprocess
2137  // the messages that are in its queue. For example, an unbounded_buffer
2138  // that has its head node released after reservation.
2139  //
2140  if (_Msg != NULL)
2141  {
2142  _M_queuedMessages.push(_Msg);
2143  }
2144 
2146  {
2147  // Indicate that an LWT is in progress. This will cause the
2148  // destructor to block.
2150 
2151  if (_M_stopProcessing == 0)
2152  {
2154 
2156 
2158 #ifdef _CRT_USE_WINAPI_FAMILY_DESKTOP_APP
2159  if (_M_pScheduleGroup != NULL)
2160  {
2161  _M_pScheduleGroup->ScheduleTask(_Proc, this);
2162  }
2163  else if (_M_pScheduler != NULL)
2164  {
2165  _M_pScheduler->ScheduleTask(_Proc, this);
2166  }
2167  else
2168  {
2169 #endif /* _CRT_USE_WINAPI_FAMILY_DESKTOP_APP */
2171 #ifdef _CRT_USE_WINAPI_FAMILY_DESKTOP_APP
2172  }
2173 #endif /* _CRT_USE_WINAPI_FAMILY_DESKTOP_APP */
2174 
2175  // The LWT will decrement _M_lwtCount.
2176  return;
2177  }
2178 
2179  // If we get here then no task was scheduled. Decrement LWT count to reflect this fact
2181  }
2182  }
volatile long _M_lwtCount
A counter to indicate the number of outstanding LWTs
Definition: agents.h:2416
static _CONCRTIMP void __cdecl _ScheduleTask(TaskProc _Proc, void *_Data)
#define _CONCRT_ASSERT(x)
Definition: concrt.h:123
void(__cdecl * TaskProc)(void *)
Concurrency::details contains definitions of support routines in the public namespaces and one or mor...
Definition: concrt.h:251
concurrent_queue< message< _Type > * > _M_queuedMessages
A queue of the messages
Definition: agents.h:2378
__int64 _Trace_agents_get_id(_Type *_PObject)
Definition: agents.h:435
volatile long _M_stopProcessing
A flag set in the destructor of a block to cease processing of new messages. This is required to guar...
Definition: agents.h:2410
An ordered_message_processor is a message_processor that allows message blocks to process messages in...
Definition: agents.h:2009
_CONCRTIMP void __cdecl _Trace_agents(Agents_EventType _Type, __int64 _AgentId,...)
volatile long _M_queuedDataCount
A count of the current number of messages to process. Used as a flag to see if a new process message ...
Definition: agents.h:2391
An event type that represents the scheduling of a process
Definition: concrt.h:5673
ScheduleGroup * _M_pScheduleGroup
The schedule group to process messages on
Definition: agents.h:2403
_Handler_method _M_handler
A message handler object which exposes the callback to be invoked
Definition: agents.h:2422
long __cdecl _InterlockedDecrement(long volatile *)
long __cdecl _InterlockedIncrement(long volatile *)
#define NULL
Definition: corecrt.h:158
Scheduler * _M_pScheduler
The scheduler to process messages on
Definition: agents.h:2397
template<class _Type >
void Concurrency::ordered_message_processor< _Type >::initialize ( _Inout_opt_ Scheduler *  _PScheduler,
_Inout_opt_ ScheduleGroup *  _PScheduleGroup,
_Handler_method const &  _Handler 
)
inline

Initializes the ordered_message_processor object with the appropriate callback function, scheduler and schedule group.

Parameters
_PSchedulerA pointer to the scheduler to be used for scheduling light-weight tasks.
_PScheduleGroupA pointer to the schedule group to be used for scheduling light-weight tasks.
_HandlerThe handler functor invoked during callback.
See also
Scheduler Class, ScheduleGroup Class
2079  {
2080  _M_pScheduler = _PScheduler;
2081  _M_pScheduleGroup = _PScheduleGroup;
2082  _M_handler = _Handler;
2083  _M_stopProcessing = 0;
2084  }
volatile long _M_stopProcessing
A flag set in the destructor of a block to cease processing of new messages. This is required to guar...
Definition: agents.h:2410
ScheduleGroup * _M_pScheduleGroup
The schedule group to process messages on
Definition: agents.h:2403
_Handler_method _M_handler
A message handler object which exposes the callback to be invoked
Definition: agents.h:2422
Scheduler * _M_pScheduler
The scheduler to process messages on
Definition: agents.h:2397
template<class _Type >
virtual void Concurrency::ordered_message_processor< _Type >::initialize_batched_processing ( _Handler_method const &  _Processor,
_Propagator_method const &  _Propagator 
)
inlinevirtual

Initialize batched message processing

Parameters
_ProcessorThe processor functor invoked during callback.
_PropagatorThe propagator functor invoked during callback.
2096  {
2097  _M_processor = _Processor;
2098  _M_propagator = _Propagator;
2099  }
_Propagator_method _M_propagator
A message propagating object which exposes the callback to be invoked
Definition: agents.h:2434
_Handler_method _M_processor
A message processing object which exposes the callback to be invoked
Definition: agents.h:2428
template<class _Type >
virtual void Concurrency::ordered_message_processor< _Type >::process_incoming_message ( )
inlineprotectedvirtual

The processing function that is called asynchronously. It dequeues messages and begins processing them.

Implements Concurrency::message_processor< _Type >.

2219  {
2223 
2224  // Indicate that an LWT completed
2226 
2227  // Do not access any members here. If the count goes to
2228  // 0 as a result of the above decrement, the object
2229  // could be immediately deleted.
2230  }
volatile long _M_lwtCount
A counter to indicate the number of outstanding LWTs
Definition: agents.h:2416
__int64 _Trace_agents_get_id(_Type *_PObject)
Definition: agents.h:435
_CONCRTIMP void __cdecl _Trace_agents(Agents_EventType _Type, __int64 _AgentId,...)
An event type that represents the conclusion of some processing
Definition: concrt.h:5661
long _Process_message_helper()
Definition: agents.h:2273
An event type that represents the initiation of some processing
Definition: concrt.h:5655
long __cdecl _InterlockedDecrement(long volatile *)
_Diff _Count
Definition: algorithm:1941
template<class _Type >
virtual void Concurrency::ordered_message_processor< _Type >::sync_send ( _Inout_opt_ message< _Type > *  _Msg)
inlinevirtual

Synchronously queues up messages and starts a processing task, if this has not been done already.

Parameters
_MsgA pointer to a message.

Implements Concurrency::message_processor< _Type >.

2110  {
2111  if (_M_handler == NULL)
2112  {
2113  throw invalid_operation("sync_send called without registering a callback");
2114  }
2115 
2116  _Sync_send_helper(_Msg);
2117  }
void _Sync_send_helper(message< _Type > *_Msg)
Definition: agents.h:2243
_Handler_method _M_handler
A message handler object which exposes the callback to be invoked
Definition: agents.h:2422
#define NULL
Definition: corecrt.h:158
template<class _Type >
virtual void Concurrency::ordered_message_processor< _Type >::wait ( )
inlinevirtual

A processor-specific spin wait used in destructors of message blocks to make sure that all asynchronous processing tasks have time to finish before destroying the block.

Implements Concurrency::message_processor< _Type >.

2190  {
2191  // Cease processing of any new messages
2193 
2194  // This spin makes sure all previously initiated message processings
2195  // will still process correctly. As soon as this count reaches zero, we can
2196  // proceed with the message block destructor.
2198  while(_M_lwtCount != 0)
2199  {
2200  spinWait._SpinOnce();
2201  }
2202 
2203  // Synchronize with sync_send
2204  {
2205  _NR_lock _Lock(_M_asyncSendLock);
2207  }
2208 
2209  }
volatile long _M_lwtCount
A counter to indicate the number of outstanding LWTs
Definition: agents.h:2416
static _CONCRTIMP void __cdecl _Yield()
Implements busy wait with no backoff
Definition: concrt.h:578
void _Clear_queued_messages()
Definition: agents.h:2234
volatile long _M_stopProcessing
A flag set in the destructor of a block to cease processing of new messages. This is required to guar...
Definition: agents.h:2410
::Concurrency::details::_NonReentrantPPLLock _M_asyncSendLock
A lock to use for queueing incoming messages.
Definition: agents.h:2384
::Concurrency::details::_NonReentrantPPLLock::_Scoped_lock _NR_lock
A lock holder that acquires a non-reentrant lock on instantiation and releases it on destruction...
Definition: agents.h:58
long __cdecl _InterlockedIncrement(long volatile *)

Member Data Documentation

template<class _Type >
::Concurrency::details::_NonReentrantPPLLock Concurrency::ordered_message_processor< _Type >::_M_asyncSendLock
private

A lock to use for queueing incoming messages.

template<class _Type >
_Handler_method Concurrency::ordered_message_processor< _Type >::_M_handler
private

A message handler object which exposes the callback to be invoked

template<class _Type >
volatile long Concurrency::ordered_message_processor< _Type >::_M_lwtCount
private

A counter to indicate the number of outstanding LWTs

template<class _Type >
_Handler_method Concurrency::ordered_message_processor< _Type >::_M_processor
private

A message processing object which exposes the callback to be invoked

template<class _Type >
_Propagator_method Concurrency::ordered_message_processor< _Type >::_M_propagator
private

A message propagating object which exposes the callback to be invoked

template<class _Type >
ScheduleGroup* Concurrency::ordered_message_processor< _Type >::_M_pScheduleGroup
private

The schedule group to process messages on

template<class _Type >
Scheduler* Concurrency::ordered_message_processor< _Type >::_M_pScheduler
private

The scheduler to process messages on

template<class _Type >
volatile long Concurrency::ordered_message_processor< _Type >::_M_queuedDataCount
private

A count of the current number of messages to process. Used as a flag to see if a new process message task needs to be created.

template<class _Type >
concurrent_queue<message<_Type> *> Concurrency::ordered_message_processor< _Type >::_M_queuedMessages
private

A queue of the messages

template<class _Type >
volatile long Concurrency::ordered_message_processor< _Type >::_M_stopProcessing
private

A flag set in the destructor of a block to cease processing of new messages. This is required to guarantee that _M_queuedDataCount will get to 0 eventually.


The documentation for this class was generated from the following file: