STLdoc
STLdocumentation
Concurrency::source_block< _TargetLinkRegistry, _MessageProcessorType > Class Template Reference (abstract)

The source_block class is an abstract base class for source-only blocks. The class provides basic link management functionality as well as common error checks.

#include <agents.h>

Inheritance for Concurrency::source_block< _TargetLinkRegistry, _MessageProcessorType >:
Inherits Concurrency::ISource< _TargetLinkRegistry::type::type >.
Inherited by Concurrency::propagator_block< _TargetLinkRegistry, _SourceLinkRegistry, _MessageProcessorType >.

Public Types

typedef _TargetLinkRegistry::type::type _Target_type
 The payload type of messages handled by this source_block.

typedef _TargetLinkRegistry::iterator target_iterator
 The iterator to walk the connected targets.

- Public Types inherited from Concurrency::ISource< _TargetLinkRegistry::type::type >
typedef _TargetLinkRegistry::type::type source_type
 A type alias for _Type.

Public Member Functions

 source_block ()
 Constructs a source_block object.

virtual ~source_block ()
 Destroys the source_block object.

virtual void link_target (_Inout_ ITarget< _Target_type > *_PTarget)
 Links a target block to this source_block object.

virtual void unlink_target (_Inout_ ITarget< _Target_type > *_PTarget)
 Unlinks a target block from this source_block object.

virtual void unlink_targets ()
 Unlinks all target blocks from this source_block object.

virtual message< _Target_type > * accept (runtime_object_identity _MsgId, _Inout_ ITarget< _Target_type > *_PTarget)
 Accepts a message that was offered by this source_block object, transferring ownership to the caller.

virtual bool reserve (runtime_object_identity _MsgId, _Inout_ ITarget< _Target_type > *_PTarget)
 Reserves a message previously offered by this source_block object.

virtual message< _Target_type > * consume (runtime_object_identity _MsgId, _Inout_ ITarget< _Target_type > *_PTarget)
 Consumes a message previously offered by this source_block object and successfully reserved by the target, transferring ownership to the caller.

virtual void release (runtime_object_identity _MsgId, _Inout_ ITarget< _Target_type > *_PTarget)
 Releases a previous successful message reservation.

virtual void acquire_ref (_Inout_ ITarget< _Target_type > *)
 Acquires a reference count on this source_block object, to prevent deletion.

virtual void release_ref (_Inout_ ITarget< _Target_type > *_PTarget)
 Releases a reference count on this source_block object.

- Public Member Functions inherited from Concurrency::ISource< _TargetLinkRegistry::type::type >
virtual ~ISource ()
 Destroys the ISource object.

Protected Member Functions

virtual void link_target_notification (_Inout_ ITarget< _Target_type > *)
 A callback that notifies that a new target has been linked to this source_block object.

virtual void unlink_target_notification (_Inout_ ITarget< _Target_type > *_PTarget)
 A callback that notifies that a target has been unlinked from this source_block object.

virtual message< _Target_type > * accept_message (runtime_object_identity _MsgId)=0
 When overridden in a derived class, accepts an offered message by the source. Message blocks should override this method to validate the _MsgId and return a message.

virtual bool reserve_message (runtime_object_identity _MsgId)=0
 When overridden in a derived class, reserves a message previously offered by this source_block object.

virtual message< _Target_type > * consume_message (runtime_object_identity _MsgId)=0
 When overridden in a derived class, consumes a message that was previously reserved.

virtual void release_message (runtime_object_identity _MsgId)=0
 When overridden in a derived class, releases a previous message reservation.

virtual void resume_propagation ()=0
 When overridden in a derived class, resumes propagation after a reservation has been released.

virtual void process_input_messages (_Inout_ message< _Target_type > *)
 Process input messages. This is only useful for propagator blocks, which derive from source_block.

virtual void propagate_output_messages ()
 Propagate messages to targets.

virtual void propagate_to_any_targets (_Inout_opt_ message< _Target_type > *_PMessage)
 When overridden in a derived class, propagates the given message to any or all of the linked targets. This is the main propagation routine for message blocks.

void initialize_source (_Inout_opt_ Scheduler *_PScheduler=NULL, _Inout_opt_ ScheduleGroup *_PScheduleGroup=NULL)
 Initializes the message_propagator within this source_block.

void enable_batched_processing ()
 Enables batched processing for this block.

virtual void sync_send (_Inout_opt_ message< _Target_type > *_Msg)
 Synchronously queues up messages and starts a propagation task, if this has not been done already.

virtual void async_send (_Inout_opt_ message< _Target_type > *_Msg)
 Asynchronously queues up messages and starts a propagation task, if this has not been done already.

void wait_for_outstanding_async_sends ()
 Waits for all asynchronous propagations to complete. This propagator-specific spin wait is used in destructors of message blocks to make sure that all asynchronous propagations have time to finish before destroying the block.

void remove_targets ()
 Removes all target links for this source block. This should be called from the destructor.

- Protected Member Functions inherited from Concurrency::ISource< _TargetLinkRegistry::type::type >
void _Invoke_link_source (ITarget< _TargetLinkRegistry::type::type > *_PLinkFrom)
 Links this source to a target.

void _Invoke_unlink_source (ITarget< _TargetLinkRegistry::type::type > *_PUnlinkFrom)
 Unlinks this source from a target.

Protected Attributes

ITarget< _Target_type > * _M_pReservedFor
 Connected target that is holding a reservation.

runtime_object_identity _M_reservedId
 Reserved message ID.

_TargetLinkRegistry _M_connectedTargets
 Connected targets.

_MessageProcessorType _M_messageProcessor
 Processor used for asynchronous message handling.

Private Member Functions

void _Handle_message (message< _Target_type > *_PMessage)
 Private methods.

void _Process_message (message< _Target_type > *_PMessage)

void _Propagate_message ()

void _Wait_on_ref (long _RefCount=0)


Private Attributes

::Concurrency::details::_ReentrantPPLLock _M_internalLock
 Internal lock used for the following synchronization:

volatile long _M_referenceCount

Detailed Description

template<class _TargetLinkRegistry, class _MessageProcessorType = ordered_message_processor<typename _TargetLinkRegistry::type::type>>
class Concurrency::source_block< _TargetLinkRegistry, _MessageProcessorType >

The source_block class is an abstract base class for source-only blocks. The class provides basic link management functionality as well as common error checks.

Template Parameters
_TargetLinkRegistry: Link registry to be used for holding the target links.
_MessageProcessorType: Processor type for message processing.

Message blocks should derive from this block to take advantage of link management and synchronization provided by this class.

See also
ISource Class

Member Typedef Documentation

template<class _TargetLinkRegistry, class _MessageProcessorType = ordered_message_processor<typename _TargetLinkRegistry::type::type>>
typedef _TargetLinkRegistry::type::type Concurrency::source_block< _TargetLinkRegistry, _MessageProcessorType >::_Target_type

The payload type of messages handled by this source_block.

template<class _TargetLinkRegistry, class _MessageProcessorType = ordered_message_processor<typename _TargetLinkRegistry::type::type>>
typedef _TargetLinkRegistry::iterator Concurrency::source_block< _TargetLinkRegistry, _MessageProcessorType >::target_iterator

The iterator to walk the connected targets.

Constructor & Destructor Documentation

template<class _TargetLinkRegistry, class _MessageProcessorType = ordered_message_processor<typename _TargetLinkRegistry::type::type>>
Concurrency::source_block< _TargetLinkRegistry, _MessageProcessorType >::source_block ( )
inline

Constructs a source_block object.

    :
        _M_pReservedFor(NULL),
        _M_reservedId(-1),
        _M_referenceCount(0)
    {
        _Trace_agents(AGENTS_EVENT_CREATE,
            ::Concurrency::details::_Trace_agents_get_id(this),
            ::Concurrency::details::_Trace_agents_get_id(&_M_messageProcessor));
    }

template<class _TargetLinkRegistry, class _MessageProcessorType = ordered_message_processor<typename _TargetLinkRegistry::type::type>>
virtual Concurrency::source_block< _TargetLinkRegistry, _MessageProcessorType >::~source_block ( )
inline virtual

Destroys the source_block object.

    {
        // All targets should have been unlinked
        _CONCRT_ASSERT(_M_connectedTargets.count() == 0);

        _Trace_agents(AGENTS_EVENT_DESTROY, ::Concurrency::details::_Trace_agents_get_id(this));
    }

Member Function Documentation

template<class _TargetLinkRegistry, class _MessageProcessorType = ordered_message_processor<typename _TargetLinkRegistry::type::type>>
void Concurrency::source_block< _TargetLinkRegistry, _MessageProcessorType >::_Handle_message ( message< _Target_type > *  _PMessage)
inline private

Private methods.

    {
        // Hold a lock to synchronize with unlink targets
        _R_lock _Lock(_M_internalLock);
        propagate_to_any_targets(_PMessage);
    }

template<class _TargetLinkRegistry, class _MessageProcessorType = ordered_message_processor<typename _TargetLinkRegistry::type::type>>
void Concurrency::source_block< _TargetLinkRegistry, _MessageProcessorType >::_Process_message ( message< _Target_type > *  _PMessage)
inline private

    {
        // Don't need a lock to process the message
        process_input_messages(_PMessage);
    }

template<class _TargetLinkRegistry, class _MessageProcessorType = ordered_message_processor<typename _TargetLinkRegistry::type::type>>
void Concurrency::source_block< _TargetLinkRegistry, _MessageProcessorType >::_Propagate_message ( )
inline private

    {
        // Hold a lock to synchronize with unlink targets
        _R_lock _Lock(_M_internalLock);
        propagate_output_messages();
    }

template<class _TargetLinkRegistry, class _MessageProcessorType = ordered_message_processor<typename _TargetLinkRegistry::type::type>>
void Concurrency::source_block< _TargetLinkRegistry, _MessageProcessorType >::_Wait_on_ref ( long  _RefCount = 0)
inline private

    {
        ::Concurrency::details::_SpinWaitBackoffNone spinWait;
        while(_M_referenceCount != _RefCount)
        {
            spinWait._SpinOnce();
        }
    }

template<class _TargetLinkRegistry, class _MessageProcessorType = ordered_message_processor<typename _TargetLinkRegistry::type::type>>
virtual message<_Target_type>* Concurrency::source_block< _TargetLinkRegistry, _MessageProcessorType >::accept ( runtime_object_identity  _MsgId,
_Inout_ ITarget< _Target_type > *  _PTarget 
)
inline virtual

Accepts a message that was offered by this source_block object, transferring ownership to the caller.

Parameters
_MsgId: The runtime_object_identity of the offered message object.
_PTarget: A pointer to the target block that is calling the accept method.
Returns
A pointer to the message object that the caller now has ownership of.

The method throws an invalid_argument exception if the parameter _PTarget is NULL.

The accept method is called by a target while a message is being offered by this ISource block. The message pointer returned may be different from the one passed into the propagate method of the ITarget block, if this source decides to make a copy of the message.

Implements Concurrency::ISource< _TargetLinkRegistry::type::type >.

    {
        if (_PTarget == NULL)
        {
            throw std::invalid_argument("_PTarget");
        }

        // Assert if the target is not connected
        _CONCRT_ASSERT(_M_connectedTargets.contains(_PTarget));

        return accept_message(_MsgId);
    }

template<class _TargetLinkRegistry, class _MessageProcessorType = ordered_message_processor<typename _TargetLinkRegistry::type::type>>
virtual message<_Target_type>* Concurrency::source_block< _TargetLinkRegistry, _MessageProcessorType >::accept_message ( runtime_object_identity  _MsgId)
protected pure virtual

When overridden in a derived class, accepts an offered message by the source. Message blocks should override this method to validate the _MsgId and return a message.

Parameters
_MsgId: The runtime object identity of the message object.
Returns
A pointer to the message that the caller now has ownership of.

To transfer ownership, the original message pointer should be returned. To maintain ownership, a copy of message payload needs to be made and returned.
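
The two ownership strategies can be illustrated outside the Concurrency Runtime. The following is a minimal sketch under stated assumptions, not code from agents.h: `Message`, `TransferringSource`, and `CopyingSource` are hypothetical stand-ins, and message identities are plain integers. The first class hands back the stored pointer itself (ownership moves to the caller); the second returns a fresh copy and keeps its original.

```cpp
#include <cassert>
#include <memory>
#include <utility>

// Hypothetical stand-in for message<T>: an identity plus a payload.
struct Message {
    int id;
    int payload;
};

// Strategy 1: transfer ownership by returning the original pointer.
class TransferringSource {
public:
    explicit TransferringSource(std::unique_ptr<Message> held)
        : held_(std::move(held)) {}

    // Validates the id, then gives up the stored message.
    Message* accept_message(int msgId) {
        if (!held_ || held_->id != msgId) {
            return nullptr;          // unknown or stale id: nothing to hand over
        }
        return held_.release();      // the caller now owns the original message
    }

    bool has_message() const { return held_ != nullptr; }

private:
    std::unique_ptr<Message> held_;
};

// Strategy 2: keep ownership by returning a copy of the message.
class CopyingSource {
public:
    explicit CopyingSource(Message held) : held_(held) {}

    // Validates the id, then returns a heap-allocated copy.
    Message* accept_message(int msgId) {
        if (held_.id != msgId) {
            return nullptr;
        }
        return new Message(held_);   // the caller owns the copy only
    }

    const Message& held() const { return held_; }

private:
    Message held_;
};
```

In this sketch a second call to `TransferringSource::accept_message` with the same id returns a null pointer, whereas `CopyingSource` can serve any number of callers, each receiving a distinct copy.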

Implemented in Concurrency::_Join_node< _Type, _Destination_type, _Jtype >, Concurrency::_Non_greedy_node< _Type >, Concurrency::_Greedy_node< _Type >, Concurrency::_Reserving_node< _Type >, Concurrency::join< _Type, _Jtype >, Concurrency::single_assignment< _Type >, Concurrency::single_assignment< size_t >, Concurrency::timer< _Type >, Concurrency::transformer< _Input, _Output >, Concurrency::overwrite_buffer< _Type >, Concurrency::overwrite_buffer< agent_status >, and Concurrency::unbounded_buffer< _Type >.

template<class _TargetLinkRegistry, class _MessageProcessorType = ordered_message_processor<typename _TargetLinkRegistry::type::type>>
virtual void Concurrency::source_block< _TargetLinkRegistry, _MessageProcessorType >::acquire_ref ( _Inout_ ITarget< _Target_type > *  )
inline virtual

Acquires a reference count on this source_block object, to prevent deletion.

Parameters
_PTarget: A pointer to the target block that is calling this method.

This method is called by an ITarget object that is being linked to this source during the link_target method.

Implements Concurrency::ISource< _TargetLinkRegistry::type::type >.

    {
        _InterlockedIncrement(&_M_referenceCount);
    }

template<class _TargetLinkRegistry, class _MessageProcessorType = ordered_message_processor<typename _TargetLinkRegistry::type::type>>
virtual void Concurrency::source_block< _TargetLinkRegistry, _MessageProcessorType >::async_send ( _Inout_opt_ message< _Target_type > *  _Msg)
inline protected virtual

Asynchronously queues up messages and starts a propagation task, if this has not been done already.

Parameters
_Msg: A pointer to a message object to asynchronously send.

    {
        _M_messageProcessor.async_send(_Msg);
    }

template<class _TargetLinkRegistry, class _MessageProcessorType = ordered_message_processor<typename _TargetLinkRegistry::type::type>>
virtual message<_Target_type>* Concurrency::source_block< _TargetLinkRegistry, _MessageProcessorType >::consume ( runtime_object_identity  _MsgId,
_Inout_ ITarget< _Target_type > *  _PTarget 
)
inline virtual

Consumes a message previously offered by this source_block object and successfully reserved by the target, transferring ownership to the caller.

Parameters
_MsgId: The runtime_object_identity of the reserved message object.
_PTarget: A pointer to the target block that is calling the consume method.
Returns
A pointer to the message object that the caller now has ownership of.

The method throws an invalid_argument exception if the parameter _PTarget is NULL.

The method throws a bad_target exception if the parameter _PTarget does not represent the target that called reserve.

The consume method is similar to accept, but must always be preceded by a call to reserve that returned true.

Implements Concurrency::ISource< _TargetLinkRegistry::type::type >.

    {
        _R_lock _Lock(_M_internalLock);

        if (_PTarget == NULL)
        {
            throw std::invalid_argument("_PTarget");
        }

        if (_M_pReservedFor == NULL || _PTarget != _M_pReservedFor)
        {
            throw bad_target();
        }

        message<_Target_type> * _Msg = consume_message(_MsgId);

        if (_Msg != NULL)
        {
            // Clear the reservation
            // _M_reservedId is intentionally not reset so that it can assist in debugging
            _M_pReservedFor = NULL;

            // Reservation is assumed to block propagation. Notify that propagation can now be resumed
            resume_propagation();
        }

        return _Msg;
    }

template<class _TargetLinkRegistry, class _MessageProcessorType = ordered_message_processor<typename _TargetLinkRegistry::type::type>>
virtual message<_Target_type>* Concurrency::source_block< _TargetLinkRegistry, _MessageProcessorType >::consume_message ( runtime_object_identity  _MsgId)
protected pure virtual

When overridden in a derived class, consumes a message that was previously reserved.

Parameters
_MsgId: The runtime_object_identity of the message object being consumed.
Returns
A pointer to the message that the caller now has ownership of.

Similar to accept, but is always preceded by a call to reserve.

Implemented in Concurrency::_Join_node< _Type, _Destination_type, _Jtype >, Concurrency::_Order_node_base< _Type >, Concurrency::join< _Type, _Jtype >, Concurrency::single_assignment< _Type >, Concurrency::single_assignment< size_t >, Concurrency::timer< _Type >, Concurrency::transformer< _Input, _Output >, Concurrency::overwrite_buffer< _Type >, Concurrency::overwrite_buffer< agent_status >, and Concurrency::unbounded_buffer< _Type >.

template<class _TargetLinkRegistry, class _MessageProcessorType = ordered_message_processor<typename _TargetLinkRegistry::type::type>>
void Concurrency::source_block< _TargetLinkRegistry, _MessageProcessorType >::enable_batched_processing ( )
inline protected

Enables batched processing for this block.

    {
        // Register callbacks for CRT110 batched processing
        _M_messageProcessor.initialize_batched_processing(
            // Processing function used by CRT110
            [this](message<_Target_type> * _PMessage)
            {
                // Handle message through new process_input_message to use CRT110 batch processing
                this->process_input_messages(_PMessage);
            },
            [this](void)
            {
                this->_Propagate_message();
            });
    }

template<class _TargetLinkRegistry, class _MessageProcessorType = ordered_message_processor<typename _TargetLinkRegistry::type::type>>
void Concurrency::source_block< _TargetLinkRegistry, _MessageProcessorType >::initialize_source ( _Inout_opt_ Scheduler *  _PScheduler = NULL,
_Inout_opt_ ScheduleGroup *  _PScheduleGroup = NULL 
)
inline protected

Initializes the message_propagator within this source_block.

Parameters
_PScheduler: The scheduler to be used for scheduling tasks.
_PScheduleGroup: The schedule group to be used for scheduling tasks.

See also
Scheduler Class, ScheduleGroup Class

    {
        // Register a callback
        _M_messageProcessor.initialize(_PScheduler, _PScheduleGroup,
            [this](message<_Target_type> * _PMessage)
            {
                this->_Handle_message(_PMessage);
            });
    }

template<class _TargetLinkRegistry, class _MessageProcessorType = ordered_message_processor<typename _TargetLinkRegistry::type::type>>
virtual void Concurrency::source_block< _TargetLinkRegistry, _MessageProcessorType >::link_target ( _Inout_ ITarget< _Target_type > *  _PTarget)
inline virtual

Links a target block to this source_block object.

Parameters
_PTarget: A pointer to an ITarget block to link to this source_block object.

The method throws an invalid_argument exception if the parameter _PTarget is NULL.
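
The link sequence (validate the pointer, record the target, then fire the notification callback) can be sketched in portable C++. This is an illustrative model under stated assumptions, not the agents.h implementation: `Target`, `LinkingSource`, and `CountingSource` are hypothetical names, the registry is a plain `std::vector`, and locking is omitted for brevity.

```cpp
#include <cassert>
#include <cstddef>
#include <stdexcept>
#include <vector>

// Hypothetical opaque target; only its address is used.
struct Target {};

class LinkingSource {
public:
    virtual ~LinkingSource() = default;

    // Rejects null targets, records the link, then notifies the derived class.
    void link_target(Target* target) {
        if (target == nullptr) {
            throw std::invalid_argument("target");
        }
        targets_.push_back(target);
        link_target_notification(target);
    }

    std::size_t target_count() const { return targets_.size(); }

protected:
    // Hook for derived blocks; the default does nothing in this sketch.
    virtual void link_target_notification(Target*) {}

private:
    std::vector<Target*> targets_;   // stand-in for the target link registry
};

// A derived block that counts how often it was notified of a new link.
class CountingSource : public LinkingSource {
public:
    int notifications = 0;

protected:
    void link_target_notification(Target*) override { ++notifications; }
};
```

Because the null check runs before the registry is modified, a rejected call leaves the set of linked targets unchanged.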

Implements Concurrency::ISource< _TargetLinkRegistry::type::type >.

    {
        _R_lock _Lock(_M_internalLock);

        if (_PTarget == NULL)
        {
            throw std::invalid_argument("_PTarget");
        }

        _M_connectedTargets.add(_PTarget);
        this->_Invoke_link_source(_PTarget);
        link_target_notification(_PTarget);
    }

template<class _TargetLinkRegistry, class _MessageProcessorType = ordered_message_processor<typename _TargetLinkRegistry::type::type>>
virtual void Concurrency::source_block< _TargetLinkRegistry, _MessageProcessorType >::link_target_notification ( _Inout_ ITarget< _Target_type > *  )
inline protected virtual

A callback that notifies that a new target has been linked to this source_block object.

Parameters
_PTarget: The ITarget block that was linked.

    {
        // By default, we restart propagation if there is no pending reservation
        if (_M_pReservedFor == NULL)
        {
            propagate_to_any_targets(NULL);
        }
    }

template<class _TargetLinkRegistry, class _MessageProcessorType = ordered_message_processor<typename _TargetLinkRegistry::type::type>>
virtual void Concurrency::source_block< _TargetLinkRegistry, _MessageProcessorType >::process_input_messages ( _Inout_ message< _Target_type > *  )
inline protected virtual

template<class _TargetLinkRegistry, class _MessageProcessorType = ordered_message_processor<typename _TargetLinkRegistry::type::type>>
virtual void Concurrency::source_block< _TargetLinkRegistry, _MessageProcessorType >::propagate_output_messages ( )
inline protected virtual

Propagate messages to targets.

Reimplemented in Concurrency::unbounded_buffer< _Type >.

    {
        throw invalid_operation("To use batched processing, you must override propagate_output_messages in the message block.");
    }

template<class _TargetLinkRegistry, class _MessageProcessorType = ordered_message_processor<typename _TargetLinkRegistry::type::type>>
virtual void Concurrency::source_block< _TargetLinkRegistry, _MessageProcessorType >::propagate_to_any_targets ( _Inout_opt_ message< _Target_type > *  _PMessage)
inline protected virtual

When overridden in a derived class, propagates the given message to any or all of the linked targets. This is the main propagation routine for message blocks.

Parameters
_PMessage: A pointer to the message that is to be propagated.

    {
        (void) _PMessage;
        throw invalid_operation("To use ordered message processing, you must override propagate_to_any_targets in the message block.");
    }

template<class _TargetLinkRegistry, class _MessageProcessorType = ordered_message_processor<typename _TargetLinkRegistry::type::type>>
virtual void Concurrency::source_block< _TargetLinkRegistry, _MessageProcessorType >::release ( runtime_object_identity  _MsgId,
_Inout_ ITarget< _Target_type > *  _PTarget 
)
inline virtual

Releases a previous successful message reservation.

Parameters
_MsgId: The runtime_object_identity of the reserved message object.
_PTarget: A pointer to the target block that is calling the release method.

The method throws an invalid_argument exception if the parameter _PTarget is NULL.

The method throws a bad_target exception if the parameter _PTarget does not represent the target that called reserve.

Implements Concurrency::ISource< _TargetLinkRegistry::type::type >.

    {
        _R_lock _Lock(_M_internalLock);

        if (_PTarget == NULL)
        {
            throw std::invalid_argument("_PTarget");
        }

        if (_PTarget != _M_pReservedFor)
        {
            throw bad_target();
        }

        release_message(_MsgId);

        // Clear the reservation
        // _M_reservedId is intentionally not reset so that it can assist in debugging
        _M_pReservedFor = NULL;

        // Reservation is assumed to block propagation. Notify that propagation can now be resumed
        resume_propagation();
    }

template<class _TargetLinkRegistry, class _MessageProcessorType = ordered_message_processor<typename _TargetLinkRegistry::type::type>>
virtual void Concurrency::source_block< _TargetLinkRegistry, _MessageProcessorType >::release_message ( runtime_object_identity  _MsgId)
protected pure virtual

template<class _TargetLinkRegistry, class _MessageProcessorType = ordered_message_processor<typename _TargetLinkRegistry::type::type>>
virtual void Concurrency::source_block< _TargetLinkRegistry, _MessageProcessorType >::release_ref ( _Inout_ ITarget< _Target_type > *  _PTarget)
inline virtual

Releases a reference count on this source_block object.

Parameters
_PTarget: A pointer to the target block that is calling this method.

This method is called by an ITarget object that is being unlinked from this source. The source block is allowed to release any resources reserved for the target block.
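
The pairing of acquire_ref and release_ref with a wait for the count to drain can be modeled in portable C++. The sketch below is an assumption-laden illustration, not the agents.h code: `RefCountedSource` is a hypothetical class, `std::atomic<long>` stands in for the interlocked operations on a `volatile long`, and `std::this_thread::yield` stands in for the runtime's spin-wait helper.

```cpp
#include <atomic>
#include <cassert>
#include <thread>

class RefCountedSource {
public:
    // Called once per linked target; keeps the source alive.
    void acquire_ref() { refCount_.fetch_add(1); }

    // Called once per unlinked target; the source may be torn down
    // as soon as the count reaches zero.
    void release_ref() { refCount_.fetch_sub(1); }

    long ref_count() const { return refCount_.load(); }

    // Polls until the count equals `expected`, yielding between polls.
    void wait_on_ref(long expected = 0) const {
        while (refCount_.load() != expected) {
            std::this_thread::yield();
        }
    }

private:
    std::atomic<long> refCount_{0};
};
```

A destructor built on this model would unlink every target and then call `wait_on_ref()` so that no target still holds a reference when the object's storage is released.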

Implements Concurrency::ISource< _TargetLinkRegistry::type::type >.

    {
        if (_PTarget != NULL)
        {
            _R_lock _Lock(_M_internalLock);

            // We assume that each target would keep a single reference on its source, so
            // we call unlink target notification on every release. Otherwise, we would be
            // required to keep a reference count per target.
            // Note: unlink_target_notification can check the value of this _PTarget pointer, but
            // must not dereference it, as it may have already been deleted.
            unlink_target_notification(_PTarget);
        }

        _InterlockedDecrement(&_M_referenceCount);

        // It is *unsafe* to touch the "this" pointer after decrementing the reference count
    }

template<class _TargetLinkRegistry, class _MessageProcessorType = ordered_message_processor<typename _TargetLinkRegistry::type::type>>
void Concurrency::source_block< _TargetLinkRegistry, _MessageProcessorType >::remove_targets ( )
inline protected

Removes all target links for this source block. This should be called from the destructor.

    {
        // Wait for outstanding propagation to complete.
        wait_for_outstanding_async_sends();

        unlink_targets();

        _Wait_on_ref();
    }
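The description says remove_targets should be called from the destructor but does not say which one. One plausible reading, offered here as an assumption, is the destructor of the most-derived block: while that destructor body runs, virtual overrides supplied by the derived class are still active, whereas a base-class destructor would only see base-class versions. The sketch below demonstrates that C++ rule with hypothetical classes (`ToyBlockBase`, `ToyDerivedBlock`, `on_unlinked`); it does not use agents.h.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical base block: remove_targets triggers a virtual callback.
class ToyBlockBase {
public:
    explicit ToyBlockBase(std::vector<std::string>& log) : log_(log) {}
    virtual ~ToyBlockBase() = default;

protected:
    // Stands in for the teardown that ends up invoking virtual callbacks.
    void remove_targets() { on_unlinked(); }

    virtual void on_unlinked() { log_.push_back("base"); }

    std::vector<std::string>& log_;
};

// Hypothetical derived block that performs the teardown in its own destructor.
class ToyDerivedBlock : public ToyBlockBase {
public:
    using ToyBlockBase::ToyBlockBase;

    // The dynamic type is still ToyDerivedBlock here, so the override runs.
    ~ToyDerivedBlock() override { remove_targets(); }

protected:
    void on_unlinked() override { log_.push_back("derived"); }
};
```

Destroying a `ToyDerivedBlock` records "derived" in the log; had the call been placed in `~ToyBlockBase`, the base version of `on_unlinked` would have run instead.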
template<class _TargetLinkRegistry, class _MessageProcessorType = ordered_message_processor<typename _TargetLinkRegistry::type::type>>
virtual bool Concurrency::source_block< _TargetLinkRegistry, _MessageProcessorType >::reserve ( runtime_object_identity  _MsgId,
_Inout_ ITarget< _Target_type > *  _PTarget 
)
inline virtual

Reserves a message previously offered by this source_block object.

Parameters
    _MsgId      The runtime_object_identity of the offered message object.
    _PTarget    A pointer to the target block that is calling the reserve method.
Returns
    true if the message was successfully reserved, false otherwise. A reservation can fail for several reasons: for example, the message was already reserved or accepted by another target, or the source denied the reservation.

The method throws an invalid_argument exception if the parameter _PTarget is NULL.

After you call reserve, if it succeeds, you must call either consume or release in order to take or give up possession of the message, respectively.

Implements Concurrency::ISource< _TargetLinkRegistry::type::type >.

    {
        _R_lock _Lock(_M_internalLock);

        if (_PTarget == NULL)
        {
            throw std::invalid_argument("_PTarget");
        }

        if ( _M_pReservedFor != NULL)
        {
            // Someone else is holding the reservation
            return false;
        }

        if (!reserve_message(_MsgId))
        {
            // Failed to reserve the msg ID
            return false;
        }

        // Save the reserving target and the msg ID
        _M_pReservedFor = _PTarget;
        _M_reservedId = _MsgId;

        return true;
    }
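The reservation checks above can be modelled in a few lines of standard C++. This is a minimal sketch under stated assumptions rather than the library's code: `ReservingTarget` and `ToyReservingSource` are hypothetical, message IDs are plain `int` values, a simple ID comparison stands in for the derived class's reserve_message, and the toy `release` throws `std::logic_error` where the real block has its own error handling.

```cpp
#include <cassert>
#include <mutex>
#include <stdexcept>

// Hypothetical stand-in for a target block; only its address is used.
struct ReservingTarget {};

// Hypothetical single-message source that models the checks made by reserve.
class ToyReservingSource {
public:
    explicit ToyReservingSource(int offered_id) : offered_id_(offered_id) {}

    // Returns true only if no reservation is held and the ID matches the offer.
    bool reserve(int msg_id, ReservingTarget* target) {
        std::lock_guard<std::recursive_mutex> lock(lock_);
        if (target == nullptr) {
            throw std::invalid_argument("target");
        }
        if (reserved_for_ != nullptr) {
            return false;  // someone else is holding the reservation
        }
        if (msg_id != offered_id_) {
            return false;  // stands in for a failed reserve_message call
        }
        reserved_for_ = target;
        reserved_id_ = msg_id;
        return true;
    }

    // Gives up a reservation so that another target may reserve the message.
    void release(int msg_id, ReservingTarget* target) {
        std::lock_guard<std::recursive_mutex> lock(lock_);
        if (reserved_for_ != target || reserved_id_ != msg_id) {
            throw std::logic_error("no matching reservation");
        }
        reserved_for_ = nullptr;
        reserved_id_ = -1;
    }

private:
    std::recursive_mutex lock_;
    int offered_id_;
    ReservingTarget* reserved_for_ = nullptr;
    int reserved_id_ = -1;
};
```

In this model a second target cannot reserve the message until the first one releases it, which is why a successful reserve must always be followed by consume or release.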
template<class _TargetLinkRegistry, class _MessageProcessorType = ordered_message_processor<typename _TargetLinkRegistry::type::type>>
virtual bool Concurrency::source_block< _TargetLinkRegistry, _MessageProcessorType >::reserve_message ( runtime_object_identity  _MsgId)
protected pure virtual

When overridden in a derived class, reserves a message previously offered by this source_block object.

Parameters
    _MsgId      The runtime_object_identity of the message object being reserved.
Returns
    true if the message was successfully reserved, false otherwise.

After reserve is called, if it returns true, either consume or release must be called to either take or release ownership of the message.

Implemented in Concurrency::_Join_node< _Type, _Destination_type, _Jtype >, Concurrency::_Order_node_base< _Type >, Concurrency::join< _Type, _Jtype >, Concurrency::single_assignment< _Type >, Concurrency::single_assignment< size_t >, Concurrency::timer< _Type >, Concurrency::transformer< _Input, _Output >, Concurrency::overwrite_buffer< _Type >, Concurrency::overwrite_buffer< agent_status >, and Concurrency::unbounded_buffer< _Type >.

template<class _TargetLinkRegistry, class _MessageProcessorType = ordered_message_processor<typename _TargetLinkRegistry::type::type>>
virtual void Concurrency::source_block< _TargetLinkRegistry, _MessageProcessorType >::resume_propagation ( )
protected pure virtual
template<class _TargetLinkRegistry, class _MessageProcessorType = ordered_message_processor<typename _TargetLinkRegistry::type::type>>
virtual void Concurrency::source_block< _TargetLinkRegistry, _MessageProcessorType >::sync_send ( _Inout_opt_ message< _Target_type > *  _Msg)
inline protected virtual

Synchronously queues up messages and starts a propagation task, if this has not been done already.

Parameters
    _Msg        A pointer to a message object to synchronously send.

    {
        // Caller shall not be holding any locks when calling this routine
        _M_messageProcessor.sync_send(_Msg);
    }
template<class _TargetLinkRegistry, class _MessageProcessorType = ordered_message_processor<typename _TargetLinkRegistry::type::type>>
virtual void Concurrency::source_block< _TargetLinkRegistry, _MessageProcessorType >::unlink_target ( _Inout_ ITarget< _Target_type > *  _PTarget)
inline virtual

Unlinks a target block from this source_block object.

Parameters
    _PTarget    A pointer to an ITarget block to unlink from this source_block object.

The method throws an invalid_argument exception if the parameter _PTarget is NULL.

Implements Concurrency::ISource< _TargetLinkRegistry::type::type >.

    {
        _R_lock _Lock(_M_internalLock);

        if (_PTarget == NULL)
        {
            throw std::invalid_argument("_PTarget");
        }

        if (_M_connectedTargets.remove(_PTarget))
        {
            // We were able to remove the target from our list.
            // Inform the target to unlink from us
            this->_Invoke_unlink_source(_PTarget);
        }
    }
template<class _TargetLinkRegistry, class _MessageProcessorType = ordered_message_processor<typename _TargetLinkRegistry::type::type>>
virtual void Concurrency::source_block< _TargetLinkRegistry, _MessageProcessorType >::unlink_target_notification ( _Inout_ ITarget< _Target_type > *  _PTarget)
inline protected virtual

A callback that notifies that a target has been unlinked from this source_block object.

Parameters
    _PTarget    The ITarget block that was unlinked.

    {
        // At this point, the target has already been disconnected from the
        // source. It is safe to check the value of this pointer, but not
        // safe to dereference it, as it may have already been deleted.

        // If the target being unlinked is the one holding the reservation,
        // release the reservation
        if (_M_pReservedFor == _PTarget)
        {
            release(_M_reservedId, _PTarget);
        }
    }
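The rule in the comments above, compare the pointer but never dereference it, can be illustrated with a small standard C++ model. Everything here is hypothetical (`UnlinkedTarget`, `ToyNotifyingSource`, `reserve_for`, `has_reservation`); the sketch only shows how a reservation can be dropped by address comparison when its holder is unlinked.

```cpp
#include <cassert>

// Hypothetical stand-in for a target block; the source never reads its contents.
struct UnlinkedTarget {};

// Hypothetical source that tracks which target holds the current reservation.
class ToyNotifyingSource {
public:
    void reserve_for(UnlinkedTarget* target, int msg_id) {
        reserved_for_ = target;
        reserved_id_ = msg_id;
    }

    // Called after a target has been disconnected. The pointer may refer to
    // an object that no longer exists, so only its value is compared.
    void unlink_target_notification(const UnlinkedTarget* target) {
        if (reserved_for_ == target) {
            // The departing target held the reservation: give it up.
            reserved_for_ = nullptr;
            reserved_id_ = -1;
        }
    }

    bool has_reservation() const { return reserved_for_ != nullptr; }

private:
    const UnlinkedTarget* reserved_for_ = nullptr;
    int reserved_id_ = -1;
};
```

Unlinking a target that does not hold the reservation leaves it in place; unlinking the holder clears it so that other targets can reserve the message.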
template<class _TargetLinkRegistry, class _MessageProcessorType = ordered_message_processor<typename _TargetLinkRegistry::type::type>>
virtual void Concurrency::source_block< _TargetLinkRegistry, _MessageProcessorType >::unlink_targets ( )
inline virtual

Unlinks all target blocks from this source_block object.

Implements Concurrency::ISource< _TargetLinkRegistry::type::type >.

    {
        _R_lock _Lock(_M_internalLock);

        for (target_iterator _Iter = _M_connectedTargets.begin(); *_Iter != NULL; ++_Iter)
        {
            ITarget<_Target_type> * _PTarget = *_Iter;
            _CONCRT_ASSERT(_PTarget != NULL);

            unlink_target(_PTarget);
        }

        // All the targets should be unlinked.
        _CONCRT_ASSERT(_M_connectedTargets.count() == 0);
    }
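Because unlink_targets holds the internal lock while calling unlink_target, which acquires the same lock again, the lock has to be reentrant. The sketch below reproduces that pattern in standard C++ with `std::recursive_mutex`. It is a simplified model: `LinkedTarget` and `ToyLinkingSource` are hypothetical, a `std::vector` stands in for the target link registry, and iterating over a snapshot is a choice made for this toy container, not a claim about how the registry's iterator behaves.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <mutex>
#include <stdexcept>
#include <vector>

// Hypothetical target that records whether it is currently linked.
struct LinkedTarget {
    bool linked = false;
};

// Hypothetical source showing why the internal lock must be reentrant.
class ToyLinkingSource {
public:
    void link_target(LinkedTarget* target) {
        std::lock_guard<std::recursive_mutex> lock(lock_);
        if (target == nullptr) {
            throw std::invalid_argument("target");
        }
        targets_.push_back(target);
        target->linked = true;
    }

    void unlink_target(LinkedTarget* target) {
        std::lock_guard<std::recursive_mutex> lock(lock_);
        if (target == nullptr) {
            throw std::invalid_argument("target");
        }
        auto it = std::find(targets_.begin(), targets_.end(), target);
        if (it != targets_.end()) {
            targets_.erase(it);
            target->linked = false;  // inform the target that it is unlinked
        }
    }

    void unlink_targets() {
        std::lock_guard<std::recursive_mutex> lock(lock_);
        // Walk a snapshot because unlink_target mutates targets_.
        const std::vector<LinkedTarget*> snapshot = targets_;
        for (LinkedTarget* target : snapshot) {
            unlink_target(target);  // re-acquires lock_ on the same thread
        }
        assert(targets_.empty());  // all the targets should be unlinked
    }

    std::size_t target_count() const { return targets_.size(); }

private:
    std::recursive_mutex lock_;
    std::vector<LinkedTarget*> targets_;
};
```

With a non-recursive `std::mutex` the nested acquisition in `unlink_target` would be undefined behavior, which is the situation a reentrant lock avoids.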
template<class _TargetLinkRegistry, class _MessageProcessorType = ordered_message_processor<typename _TargetLinkRegistry::type::type>>
void Concurrency::source_block< _TargetLinkRegistry, _MessageProcessorType >::wait_for_outstanding_async_sends ( )
inline protected

Waits for all asynchronous propagations to complete. This propagator-specific spin wait is used in destructors of message blocks to make sure that all asynchronous propagations have time to finish before destroying the block.

    {
        _M_messageProcessor.wait();
    }

Member Data Documentation

template<class _TargetLinkRegistry, class _MessageProcessorType = ordered_message_processor<typename _TargetLinkRegistry::type::type>>
_TargetLinkRegistry Concurrency::source_block< _TargetLinkRegistry, _MessageProcessorType >::_M_connectedTargets
protected

Connected targets

template<class _TargetLinkRegistry, class _MessageProcessorType = ordered_message_processor<typename _TargetLinkRegistry::type::type>>
::Concurrency::details::_ReentrantPPLLock Concurrency::source_block< _TargetLinkRegistry, _MessageProcessorType >::_M_internalLock
private

Internal lock used for the following synchronization:

  1. Synchronize between link and unlink target
  2. Synchronize between propagate_to_any_targets and unlink_target
  3. Synchronize between reserve and consume/release
template<class _TargetLinkRegistry, class _MessageProcessorType = ordered_message_processor<typename _TargetLinkRegistry::type::type>>
_MessageProcessorType Concurrency::source_block< _TargetLinkRegistry, _MessageProcessorType >::_M_messageProcessor
protected

Processor used for asynchronous message handling

template<class _TargetLinkRegistry, class _MessageProcessorType = ordered_message_processor<typename _TargetLinkRegistry::type::type>>
ITarget<_Target_type>* Concurrency::source_block< _TargetLinkRegistry, _MessageProcessorType >::_M_pReservedFor
protected

Connected target that is holding a reservation

template<class _TargetLinkRegistry, class _MessageProcessorType = ordered_message_processor<typename _TargetLinkRegistry::type::type>>
volatile long Concurrency::source_block< _TargetLinkRegistry, _MessageProcessorType >::_M_referenceCount
private
template<class _TargetLinkRegistry, class _MessageProcessorType = ordered_message_processor<typename _TargetLinkRegistry::type::type>>
runtime_object_identity Concurrency::source_block< _TargetLinkRegistry, _MessageProcessorType >::_M_reservedId
protected

Reserved message ID


The documentation for this class was generated from the following file: