STLdoc
STLdocumentation
Concurrency::source_block< _TargetLinkRegistry, _MessageProcessorType > Class Template Reference (abstract)

The source_block class is an abstract base class for source-only blocks. The class provides basic link management functionality as well as common error checks.

#include <agents.h>

Inheritance diagram for Concurrency::source_block< _TargetLinkRegistry, _MessageProcessorType >:
Base class: Concurrency::ISource< _TargetLinkRegistry::type::type >
Derived class: Concurrency::propagator_block< _TargetLinkRegistry, _SourceLinkRegistry, _MessageProcessorType >

Public Types

typedef _TargetLinkRegistry::type::type _Target_type
 The payload type of messages handled by this source_block.

typedef _TargetLinkRegistry::iterator target_iterator
 The iterator to walk the connected targets.

- Public Types inherited from Concurrency::ISource< _TargetLinkRegistry::type::type >
typedef _TargetLinkRegistry::type::type source_type
 A type alias for _Type.
 

Public Member Functions

 source_block ()
 Constructs a source_block object.

virtual ~source_block ()
 Destroys the source_block object.

virtual void link_target (_Inout_ ITarget< _Target_type > *_PTarget)
 Links a target block to this source_block object.

virtual void unlink_target (_Inout_ ITarget< _Target_type > *_PTarget)
 Unlinks a target block from this source_block object.

virtual void unlink_targets ()
 Unlinks all target blocks from this source_block object.

virtual message< _Target_type > * accept (runtime_object_identity _MsgId, _Inout_ ITarget< _Target_type > *_PTarget)
 Accepts a message that was offered by this source_block object, transferring ownership to the caller.

virtual bool reserve (runtime_object_identity _MsgId, _Inout_ ITarget< _Target_type > *_PTarget)
 Reserves a message previously offered by this source_block object.

virtual message< _Target_type > * consume (runtime_object_identity _MsgId, _Inout_ ITarget< _Target_type > *_PTarget)
 Consumes a message previously offered by this source_block object and successfully reserved by the target, transferring ownership to the caller.

virtual void release (runtime_object_identity _MsgId, _Inout_ ITarget< _Target_type > *_PTarget)
 Releases a previous successful message reservation.

virtual void acquire_ref (_Inout_ ITarget< _Target_type > *)
 Acquires a reference count on this source_block object, to prevent deletion.

virtual void release_ref (_Inout_ ITarget< _Target_type > *_PTarget)
 Releases a reference count on this source_block object.

- Public Member Functions inherited from Concurrency::ISource< _TargetLinkRegistry::type::type >
virtual ~ISource ()
 Destroys the ISource object.
 

Protected Member Functions

virtual void link_target_notification (_Inout_ ITarget< _Target_type > *)
 A callback that notifies that a new target has been linked to this source_block object.

virtual void unlink_target_notification (_Inout_ ITarget< _Target_type > *_PTarget)
 A callback that notifies that a target has been unlinked from this source_block object.

virtual message< _Target_type > * accept_message (runtime_object_identity _MsgId)=0
 When overridden in a derived class, accepts a message offered by the source. Message blocks should override this method to validate the _MsgId and return a message.

virtual bool reserve_message (runtime_object_identity _MsgId)=0
 When overridden in a derived class, reserves a message previously offered by this source_block object.

virtual message< _Target_type > * consume_message (runtime_object_identity _MsgId)=0
 When overridden in a derived class, consumes a message that was previously reserved.

virtual void release_message (runtime_object_identity _MsgId)=0
 When overridden in a derived class, releases a previous message reservation.

virtual void resume_propagation ()=0
 When overridden in a derived class, resumes propagation after a reservation has been released.

virtual void process_input_messages (_Inout_ message< _Target_type > *_PMessage)
 Processes input messages. This is only useful for propagator blocks, which derive from source_block.

virtual void propagate_output_messages ()
 Propagates messages to targets.

virtual void propagate_to_any_targets (_Inout_opt_ message< _Target_type > *_PMessage)
 When overridden in a derived class, propagates the given message to any or all of the linked targets. This is the main propagation routine for message blocks.

void initialize_source (_Inout_opt_ Scheduler *_PScheduler=NULL, _Inout_opt_ ScheduleGroup *_PScheduleGroup=NULL)
 Initializes the message_propagator within this source_block.

void enable_batched_processing ()
 Enables batched processing for this block.

virtual void sync_send (_Inout_opt_ message< _Target_type > *_Msg)
 Synchronously queues up messages and starts a propagation task, if this has not been done already.

virtual void async_send (_Inout_opt_ message< _Target_type > *_Msg)
 Asynchronously queues up messages and starts a propagation task, if this has not been done already.

void wait_for_outstanding_async_sends ()
 Waits for all asynchronous propagations to complete. This propagator-specific spin wait is used in destructors of message blocks to make sure that all asynchronous propagations have time to finish before destroying the block.

void remove_targets ()
 Removes all target links for this source block. This should be called from the destructor.

- Protected Member Functions inherited from Concurrency::ISource< _TargetLinkRegistry::type::type >
void _Invoke_link_source (ITarget< _TargetLinkRegistry::type::type > *_PLinkFrom)
 Links this source to a target.

void _Invoke_unlink_source (ITarget< _TargetLinkRegistry::type::type > *_PUnlinkFrom)
 Unlinks this source from a target.
 

Protected Attributes

ITarget< _Target_type > * _M_pReservedFor
 The connected target that is holding a reservation.

runtime_object_identity _M_reservedId
 The ID of the reserved message.

_TargetLinkRegistry _M_connectedTargets
 The connected targets.

_MessageProcessorType _M_messageProcessor
 The processor used for asynchronous message handling.
 

Private Member Functions

void _Handle_message (message< _Target_type > *_PMessage)
 Private methods.
 
void _Process_message (message< _Target_type > *_PMessage)
 
void _Propagate_message ()
 
void _Wait_on_ref (long _RefCount=0)
 

Private Attributes

::Concurrency::details::_ReentrantPPLLock _M_internalLock
 Internal lock used for synchronization.
 
volatile long _M_referenceCount
 

Detailed Description

template<class _TargetLinkRegistry, class _MessageProcessorType = ordered_message_processor<typename _TargetLinkRegistry::type::type>>
class Concurrency::source_block< _TargetLinkRegistry, _MessageProcessorType >

The source_block class is an abstract base class for source-only blocks. The class provides basic link management functionality as well as common error checks.

Template Parameters
_TargetLinkRegistry      Link registry to be used for holding the target links.
_MessageProcessorType    Processor type for message processing.

Message blocks should derive from this block to take advantage of link management and synchronization provided by this class.

See also
ISource Class

Member Typedef Documentation

template<class _TargetLinkRegistry, class _MessageProcessorType = ordered_message_processor<typename _TargetLinkRegistry::type::type>>
typedef _TargetLinkRegistry::type::type Concurrency::source_block< _TargetLinkRegistry, _MessageProcessorType >::_Target_type

The payload type of messages handled by this source_block.

template<class _TargetLinkRegistry, class _MessageProcessorType = ordered_message_processor<typename _TargetLinkRegistry::type::type>>
typedef _TargetLinkRegistry::iterator Concurrency::source_block< _TargetLinkRegistry, _MessageProcessorType >::target_iterator

The iterator to walk the connected targets.

Constructor & Destructor Documentation

template<class _TargetLinkRegistry, class _MessageProcessorType = ordered_message_processor<typename _TargetLinkRegistry::type::type>>
Concurrency::source_block< _TargetLinkRegistry, _MessageProcessorType >::source_block ( )
inline

Constructs a source_block object.

        :
        _M_pReservedFor(NULL),
        _M_reservedId(-1),
        _M_referenceCount(0)
    {
        _Trace_agents(AGENTS_EVENT_CREATE,
            ::Concurrency::details::_Trace_agents_get_id(this),
            ::Concurrency::details::_Trace_agents_get_id(&_M_messageProcessor));
    }

template<class _TargetLinkRegistry, class _MessageProcessorType = ordered_message_processor<typename _TargetLinkRegistry::type::type>>
virtual Concurrency::source_block< _TargetLinkRegistry, _MessageProcessorType >::~source_block ( )
inline virtual

Destroys the source_block object.

    {
        // All targets should have been unlinked
        _CONCRT_ASSERT(_M_connectedTargets.count() == 0);

        _Trace_agents(AGENTS_EVENT_DESTROY, ::Concurrency::details::_Trace_agents_get_id(this));
    }

Member Function Documentation

template<class _TargetLinkRegistry, class _MessageProcessorType = ordered_message_processor<typename _TargetLinkRegistry::type::type>>
void Concurrency::source_block< _TargetLinkRegistry, _MessageProcessorType >::_Handle_message ( message< _Target_type > *  _PMessage)
inline private

Private methods.

    {
        // Hold a lock to synchronize with unlink targets
        _R_lock _Lock(_M_internalLock);
        propagate_to_any_targets(_PMessage);
    }

template<class _TargetLinkRegistry, class _MessageProcessorType = ordered_message_processor<typename _TargetLinkRegistry::type::type>>
void Concurrency::source_block< _TargetLinkRegistry, _MessageProcessorType >::_Process_message ( message< _Target_type > *  _PMessage)
inline private

    {
        // Don't need a lock to process the message
        process_input_messages(_PMessage);
    }

template<class _TargetLinkRegistry, class _MessageProcessorType = ordered_message_processor<typename _TargetLinkRegistry::type::type>>
void Concurrency::source_block< _TargetLinkRegistry, _MessageProcessorType >::_Propagate_message ( )
inline private

    {
        // Hold a lock to synchronize with unlink targets
        _R_lock _Lock(_M_internalLock);
        propagate_output_messages();
    }

template<class _TargetLinkRegistry, class _MessageProcessorType = ordered_message_processor<typename _TargetLinkRegistry::type::type>>
void Concurrency::source_block< _TargetLinkRegistry, _MessageProcessorType >::_Wait_on_ref ( long  _RefCount = 0)
inline private

    {
        ::Concurrency::details::_SpinWaitBackoffNone spinWait;
        while(_M_referenceCount != _RefCount)
        {
            spinWait._SpinOnce();
        }
    }

template<class _TargetLinkRegistry, class _MessageProcessorType = ordered_message_processor<typename _TargetLinkRegistry::type::type>>
virtual message<_Target_type>* Concurrency::source_block< _TargetLinkRegistry, _MessageProcessorType >::accept ( runtime_object_identity _MsgId, _Inout_ ITarget< _Target_type > * _PTarget )
inline virtual

Accepts a message that was offered by this source_block object, transferring ownership to the caller.

Parameters
_MsgId      The runtime_object_identity of the offered message object.
_PTarget    A pointer to the target block that is calling the accept method.
Returns
A pointer to the message object that the caller now has ownership of.

The method throws an invalid_argument exception if the parameter _PTarget is NULL.

The accept method is called by a target while a message is being offered by this ISource block. The message pointer returned may be different from the one passed into the propagate method of the ITarget block, if this source decides to make a copy of the message.

Implements Concurrency::ISource< _TargetLinkRegistry::type::type >.

    {
        if (_PTarget == NULL)
        {
            throw std::invalid_argument("_PTarget");
        }

        // Assert if the target is not connected
        _CONCRT_ASSERT(_M_connectedTargets.contains(_PTarget));

        return accept_message(_MsgId);
    }

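The listing above follows a common pattern: the public accept method performs the shared error check and then delegates to the protected pure virtual accept_message. The following is a minimal, portable sketch of that pattern. The types Message, Target, SourceModel and OneSlotSource are hypothetical stand-ins written for this illustration and are not part of agents.h; message IDs are modeled as plain int values.

```cpp
#include <cassert>
#include <stdexcept>

// Hypothetical stand-ins for illustration; these are not the ConcRT types.
struct Message { int id; int payload; };
struct Target {};

class SourceModel {
public:
    virtual ~SourceModel() = default;

    // Public entry point: perform the common error check, then delegate.
    Message* accept(int msg_id, Target* target) {
        if (target == nullptr) {
            throw std::invalid_argument("target");
        }
        return accept_message(msg_id);
    }

protected:
    // Derived blocks validate the ID and return a message, or nullptr.
    virtual Message* accept_message(int msg_id) = 0;
};

// A block that offers a single message and gives it away at most once.
class OneSlotSource : public SourceModel {
public:
    explicit OneSlotSource(Message* offered) : offered_(offered) {}

protected:
    Message* accept_message(int msg_id) override {
        if (offered_ == nullptr || offered_->id != msg_id) {
            return nullptr;  // unknown ID, or the message is already gone
        }
        Message* taken = offered_;
        offered_ = nullptr;  // ownership moves to the caller
        return taken;
    }

private:
    Message* offered_;
};

// Exercises a null target, a wrong ID, a successful accept, and a repeat.
bool accept_demo() {
    Message msg{7, 42};
    OneSlotSource source(&msg);
    Target target;

    bool threw = false;
    try {
        source.accept(7, nullptr);
    } catch (const std::invalid_argument&) {
        threw = true;
    }
    assert(threw);

    Message* wrong = source.accept(8, &target);
    Message* right = source.accept(7, &target);
    Message* again = source.accept(7, &target);
    return wrong == nullptr && right == &msg && again == nullptr;
}
```

Keeping the null check in the base class means each derived block only has to decide whether the ID is valid.
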
template<class _TargetLinkRegistry, class _MessageProcessorType = ordered_message_processor<typename _TargetLinkRegistry::type::type>>
virtual message<_Target_type>* Concurrency::source_block< _TargetLinkRegistry, _MessageProcessorType >::accept_message ( runtime_object_identity  _MsgId)
protected pure virtual

When overridden in a derived class, accepts a message offered by the source. Message blocks should override this method to validate the _MsgId and return a message.

Parameters
_MsgId    The runtime object identity of the message object.
Returns
A pointer to the message that the caller now has ownership of.

To transfer ownership, return the original message pointer. To keep ownership, make a copy of the message payload and return the copy.

Implemented in Concurrency::_Join_node< _Type, _Destination_type, _Jtype >, Concurrency::_Non_greedy_node< _Type >, Concurrency::_Greedy_node< _Type >, Concurrency::_Reserving_node< _Type >, Concurrency::join< _Type, _Jtype >, Concurrency::single_assignment< _Type >, Concurrency::single_assignment< size_t >, Concurrency::timer< _Type >, Concurrency::transformer< _Input, _Output >, Concurrency::overwrite_buffer< _Type >, Concurrency::overwrite_buffer< agent_status >, and Concurrency::unbounded_buffer< _Type >.
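The two ownership options described for accept_message can be contrasted in a small standalone sketch. It assumes a simplified Message struct and two hypothetical slot classes, TransferringSlot and CopyingSlot, neither of which exists in agents.h; std::unique_ptr is used here only to make the ownership change visible, whereas the real interface works with raw message pointers.

```cpp
#include <cassert>
#include <memory>
#include <utility>

// Hypothetical stand-in for illustration; not the ConcRT message type.
struct Message { int id; int payload; };

// Transfer policy: the stored message itself is handed to the caller.
class TransferringSlot {
public:
    explicit TransferringSlot(std::unique_ptr<Message> m) : stored_(std::move(m)) {}

    std::unique_ptr<Message> accept_message(int msg_id) {
        if (!stored_ || stored_->id != msg_id) {
            return nullptr;
        }
        return std::move(stored_);  // the slot is empty afterwards
    }

    bool empty() const { return stored_ == nullptr; }

private:
    std::unique_ptr<Message> stored_;
};

// Copy policy: the slot keeps its message and hands out a fresh copy.
class CopyingSlot {
public:
    explicit CopyingSlot(std::unique_ptr<Message> m) : stored_(std::move(m)) {}

    std::unique_ptr<Message> accept_message(int msg_id) const {
        if (!stored_ || stored_->id != msg_id) {
            return nullptr;
        }
        return std::unique_ptr<Message>(new Message(*stored_));  // caller owns the copy
    }

    bool empty() const { return stored_ == nullptr; }

private:
    std::unique_ptr<Message> stored_;
};

// Accepts once from the transferring slot and twice from the copying slot.
bool ownership_demo() {
    TransferringSlot moving(std::unique_ptr<Message>(new Message{1, 10}));
    CopyingSlot copying(std::unique_ptr<Message>(new Message{2, 20}));

    std::unique_ptr<Message> a = moving.accept_message(1);
    std::unique_ptr<Message> b = copying.accept_message(2);
    std::unique_ptr<Message> c = copying.accept_message(2);
    assert(a && b && c);

    return moving.empty() && !copying.empty()
        && a->payload == 10 && b->payload == 20 && c->payload == 20 && b != c;
}
```

A block that hands out copies can serve the same message to several targets, while a transferring block can serve it only once.
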

template<class _TargetLinkRegistry, class _MessageProcessorType = ordered_message_processor<typename _TargetLinkRegistry::type::type>>
virtual void Concurrency::source_block< _TargetLinkRegistry, _MessageProcessorType >::acquire_ref ( _Inout_ ITarget< _Target_type > *  )
inline virtual

Acquires a reference count on this source_block object, to prevent deletion.

Parameters
_PTarget    A pointer to the target block that is calling this method.

This method is called by an ITarget object that is being linked to this source during the link_target method.

Implements Concurrency::ISource< _TargetLinkRegistry::type::type >.

    {
        _InterlockedIncrement(&_M_referenceCount);
    }

template<class _TargetLinkRegistry, class _MessageProcessorType = ordered_message_processor<typename _TargetLinkRegistry::type::type>>
virtual void Concurrency::source_block< _TargetLinkRegistry, _MessageProcessorType >::async_send ( _Inout_opt_ message< _Target_type > *  _Msg)
inline protected virtual

Asynchronously queues up messages and starts a propagation task, if this has not been done already.

Parameters
_Msg    A pointer to a message object to asynchronously send.

    {
        _M_messageProcessor.async_send(_Msg);
    }

template<class _TargetLinkRegistry, class _MessageProcessorType = ordered_message_processor<typename _TargetLinkRegistry::type::type>>
virtual message<_Target_type>* Concurrency::source_block< _TargetLinkRegistry, _MessageProcessorType >::consume ( runtime_object_identity _MsgId, _Inout_ ITarget< _Target_type > * _PTarget )
inline virtual

Consumes a message previously offered by this source_block object and successfully reserved by the target, transferring ownership to the caller.

Parameters
_MsgId      The runtime_object_identity of the reserved message object.
_PTarget    A pointer to the target block that is calling the consume method.
Returns
A pointer to the message object that the caller now has ownership of.

The method throws an invalid_argument exception if the parameter _PTarget is NULL.

The method throws a bad_target exception if the parameter _PTarget does not represent the target that called reserve.

The consume method is similar to accept, but must always be preceded by a call to reserve that returned true.

Implements Concurrency::ISource< _TargetLinkRegistry::type::type >.

    {
        _R_lock _Lock(_M_internalLock);

        if (_PTarget == NULL)
        {
            throw std::invalid_argument("_PTarget");
        }

        if (_M_pReservedFor == NULL || _PTarget != _M_pReservedFor)
        {
            throw bad_target();
        }

        message<_Target_type> * _Msg = consume_message(_MsgId);

        if (_Msg != NULL)
        {
            // Clear the reservation
            // _M_reservedId is intentionally not reset so that it can assist in debugging
            _M_pReservedFor = NULL;

            // Reservation is assumed to block propagation. Notify that propagation can now be resumed
            resume_propagation();
        }

        return _Msg;
    }

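The reserve, consume and release methods together form a small state machine: at most one target holds the reservation, and only that target may consume or release it. The sketch below models this with standard C++ only. ReservingSource, Target and BadTarget are hypothetical names chosen for the illustration (BadTarget stands in for bad_target), message IDs are plain int values, and consume returns a bool instead of a message pointer to keep the example short.

```cpp
#include <cassert>
#include <mutex>
#include <stdexcept>

// Hypothetical stand-ins for illustration; these are not the ConcRT types.
struct Target {};
struct BadTarget : std::runtime_error {
    BadTarget() : std::runtime_error("bad target") {}
};

class ReservingSource {
public:
    explicit ReservingSource(int offered_id) : offered_id_(offered_id) {}

    // Succeeds only if nobody holds a reservation and the ID is on offer.
    bool reserve(int msg_id, Target* target) {
        std::lock_guard<std::recursive_mutex> lock(mutex_);
        if (target == nullptr) throw std::invalid_argument("target");
        if (reserved_for_ != nullptr) return false;  // someone else holds it
        if (!available_ || msg_id != offered_id_) return false;
        reserved_for_ = target;
        reserved_id_ = msg_id;
        return true;
    }

    // Only the reserving target may consume; clears the reservation on success.
    bool consume(int msg_id, Target* target) {
        std::lock_guard<std::recursive_mutex> lock(mutex_);
        if (target == nullptr) throw std::invalid_argument("target");
        if (reserved_for_ == nullptr || target != reserved_for_) throw BadTarget();
        if (msg_id != reserved_id_) return false;
        available_ = false;
        reserved_for_ = nullptr;
        return true;
    }

    // Only the reserving target may release; the message stays on offer.
    void release(int msg_id, Target* target) {
        std::lock_guard<std::recursive_mutex> lock(mutex_);
        if (target == nullptr) throw std::invalid_argument("target");
        if (target != reserved_for_) throw BadTarget();
        (void)msg_id;  // the ID is not needed in this simplified model
        reserved_for_ = nullptr;
    }

    bool available() const { return available_; }

private:
    std::recursive_mutex mutex_;
    Target* reserved_for_ = nullptr;
    int reserved_id_ = -1;
    int offered_id_;
    bool available_ = true;
};

// Two targets compete for one message; the second wins after a release.
bool reservation_demo() {
    ReservingSource source(5);
    Target first, second;

    bool first_got_it = source.reserve(5, &first);
    assert(first_got_it);
    bool second_got_it = source.reserve(5, &second);

    bool threw = false;
    try {
        source.consume(5, &second);  // second never reserved
    } catch (const BadTarget&) {
        threw = true;
    }

    source.release(5, &first);
    bool reserved_again = source.reserve(5, &second);
    bool consumed = source.consume(5, &second);

    return !second_got_it && threw && reserved_again && consumed && !source.available();
}
```
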
template<class _TargetLinkRegistry, class _MessageProcessorType = ordered_message_processor<typename _TargetLinkRegistry::type::type>>
virtual message<_Target_type>* Concurrency::source_block< _TargetLinkRegistry, _MessageProcessorType >::consume_message ( runtime_object_identity  _MsgId)
protected pure virtual

When overridden in a derived class, consumes a message that was previously reserved.

Parameters
_MsgId    The runtime_object_identity of the message object being consumed.
Returns
A pointer to the message that the caller now has ownership of.

Similar to accept, but is always preceded by a call to reserve.

Implemented in Concurrency::_Join_node< _Type, _Destination_type, _Jtype >, Concurrency::_Order_node_base< _Type >, Concurrency::join< _Type, _Jtype >, Concurrency::single_assignment< _Type >, Concurrency::single_assignment< size_t >, Concurrency::timer< _Type >, Concurrency::transformer< _Input, _Output >, Concurrency::overwrite_buffer< _Type >, Concurrency::overwrite_buffer< agent_status >, and Concurrency::unbounded_buffer< _Type >.

template<class _TargetLinkRegistry, class _MessageProcessorType = ordered_message_processor<typename _TargetLinkRegistry::type::type>>
void Concurrency::source_block< _TargetLinkRegistry, _MessageProcessorType >::enable_batched_processing ( )
inline protected

Enables batched processing for this block.

    {
        // Register callbacks for CRT110 batched processing
        _M_messageProcessor.initialize_batched_processing(
            // Processing function used by CRT110
            [this](message<_Target_type> * _PMessage)
            {
                // Handle message through new process_input_messages to use CRT110 batch processing
                this->process_input_messages(_PMessage);
            },
            [this](void)
            {
                this->_Propagate_message();
            });
    }

template<class _TargetLinkRegistry, class _MessageProcessorType = ordered_message_processor<typename _TargetLinkRegistry::type::type>>
void Concurrency::source_block< _TargetLinkRegistry, _MessageProcessorType >::initialize_source ( _Inout_opt_ Scheduler * _PScheduler = NULL, _Inout_opt_ ScheduleGroup * _PScheduleGroup = NULL )
inline protected

Initializes the message_propagator within this source_block.

Parameters
_PScheduler        The scheduler to be used for scheduling tasks.
_PScheduleGroup    The schedule group to be used for scheduling tasks.
See also
Scheduler Class, ScheduleGroup Class

    {
        // Register a callback
        _M_messageProcessor.initialize(_PScheduler, _PScheduleGroup,
            [this](message<_Target_type> * _PMessage)
            {
                this->_Handle_message(_PMessage);
            });
    }

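The initialize_source listing registers a lambda that captures this, so the message processor can call back into the block for each queued message. A rough standalone sketch of that wiring follows. QueueProcessor and BlockModel are hypothetical classes invented for this example; the processor here is single-threaded and drained by hand, which is an assumption made to keep the sketch deterministic and does not reflect how the real processor schedules work.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <utility>
#include <vector>

// Hypothetical single-threaded processor: queues messages, then drains them.
class QueueProcessor {
public:
    using Handler = std::function<void(int)>;

    void initialize(Handler handler) { handler_ = std::move(handler); }
    void async_send(int message) { pending_.push_back(message); }

    // Invokes the registered handler once per queued message, in order.
    void drain() {
        while (!pending_.empty()) {
            int message = pending_.front();
            pending_.pop_front();
            handler_(message);
        }
    }

private:
    Handler handler_;
    std::deque<int> pending_;
};

// Hypothetical block that registers a callback capturing `this`.
class BlockModel {
public:
    BlockModel() {
        processor_.initialize([this](int message) { handle_message(message); });
    }

    // The lambda stores `this`, so the block must not be copied or moved.
    BlockModel(const BlockModel&) = delete;
    BlockModel& operator=(const BlockModel&) = delete;

    void send(int message) { processor_.async_send(message); }
    void run() { processor_.drain(); }
    const std::vector<int>& delivered() const { return delivered_; }

private:
    void handle_message(int message) { delivered_.push_back(message); }

    QueueProcessor processor_;
    std::vector<int> delivered_;
};

// Queues two messages and checks that they arrive in order after draining.
bool callback_demo() {
    BlockModel block;
    block.send(1);
    block.send(2);
    assert(block.delivered().empty());  // nothing runs until the queue is drained
    block.run();
    const std::vector<int> expected{1, 2};
    return block.delivered() == expected;
}
```
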
template<class _TargetLinkRegistry, class _MessageProcessorType = ordered_message_processor<typename _TargetLinkRegistry::type::type>>
virtual void Concurrency::source_block< _TargetLinkRegistry, _MessageProcessorType >::link_target ( _Inout_ ITarget< _Target_type > *  _PTarget)
inline virtual

Links a target block to this source_block object.

Parameters
_PTarget    A pointer to an ITarget block to link to this source_block object.

The method throws an invalid_argument exception if the parameter _PTarget is NULL.

Implements Concurrency::ISource< _TargetLinkRegistry::type::type >.

    {
        _R_lock _Lock(_M_internalLock);

        if (_PTarget == NULL)
        {
            throw std::invalid_argument("_PTarget");
        }

        _M_connectedTargets.add(_PTarget);
        _Invoke_link_source(_PTarget);
        link_target_notification(_PTarget);
    }

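link_target holds a reentrant lock while it updates the registry and then invokes a notification, so code reached from the notification can take the same lock again on the same thread. The sketch below illustrates why a reentrant lock matters, using std::recursive_mutex as a stand-in for _ReentrantPPLLock. LinkingSource and Target are hypothetical, and having the notification re-acquire the lock is an assumption made to demonstrate reentrancy, not a description of the real callback.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <mutex>
#include <stdexcept>
#include <vector>

// Hypothetical stand-in for illustration; not the ConcRT ITarget type.
struct Target { int notifications = 0; };

class LinkingSource {
public:
    void link_target(Target* target) {
        std::lock_guard<std::recursive_mutex> lock(mutex_);
        if (target == nullptr) {
            throw std::invalid_argument("target");
        }
        targets_.push_back(target);
        link_target_notification(target);  // called while the lock is held
    }

    void unlink_target(Target* target) {
        std::lock_guard<std::recursive_mutex> lock(mutex_);
        targets_.erase(std::remove(targets_.begin(), targets_.end(), target),
                       targets_.end());
    }

    std::size_t count() const {
        std::lock_guard<std::recursive_mutex> lock(mutex_);
        return targets_.size();
    }

private:
    void link_target_notification(Target* target) {
        // Re-enters the lock on the same thread; a plain std::mutex would
        // deadlock here, which is why a reentrant lock is used.
        std::lock_guard<std::recursive_mutex> lock(mutex_);
        ++target->notifications;
    }

    mutable std::recursive_mutex mutex_;
    std::vector<Target*> targets_;
};

// Links two targets, rejects a null target, then unlinks one.
bool link_demo() {
    LinkingSource source;
    Target a;
    Target b;
    source.link_target(&a);
    source.link_target(&b);

    bool threw = false;
    try {
        source.link_target(nullptr);
    } catch (const std::invalid_argument&) {
        threw = true;
    }
    assert(threw);

    bool two_linked = source.count() == 2;
    source.unlink_target(&a);
    return two_linked && source.count() == 1
        && a.notifications == 1 && b.notifications == 1;
}
```
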
template<class _TargetLinkRegistry, class _MessageProcessorType = ordered_message_processor<typename _TargetLinkRegistry::type::type>>
virtual void Concurrency::source_block< _TargetLinkRegistry, _MessageProcessorType >::link_target_notification ( _Inout_ ITarget< _Target_type > *  )
inline protected virtual

A callback that notifies that a new target has been linked to this source_block object.

Parameters
_PTarget    The ITarget block that was linked.

    {
        // By default, we restart propagation if there is no pending reservation
        if (_M_pReservedFor == NULL)
        {
            propagate_to_any_targets(NULL);
        }
    }

template<class _TargetLinkRegistry, class _MessageProcessorType = ordered_message_processor<typename _TargetLinkRegistry::type::type>>
virtual void Concurrency::source_block< _TargetLinkRegistry, _MessageProcessorType >::process_input_messages ( _Inout_ message< _Target_type > *  _PMessage)
inline protected virtual
template<class _TargetLinkRegistry, class _MessageProcessorType = ordered_message_processor<typename _TargetLinkRegistry::type::type>>
virtual void Concurrency::source_block< _TargetLinkRegistry, _MessageProcessorType >::propagate_output_messages ( )
inline protected virtual

Propagate messages to targets.

Reimplemented in Concurrency::unbounded_buffer< _Type >.

    {
        throw invalid_operation("To use batched processing, you must override propagate_output_messages in the message block.");
    }

template<class _TargetLinkRegistry, class _MessageProcessorType = ordered_message_processor<typename _TargetLinkRegistry::type::type>>
virtual void Concurrency::source_block< _TargetLinkRegistry, _MessageProcessorType >::propagate_to_any_targets ( _Inout_opt_ message< _Target_type > *  _PMessage)
inline protected virtual

When overridden in a derived class, propagates the given message to any or all of the linked targets. This is the main propagation routine for message blocks.

Parameters
_PMessage    A pointer to the message that is to be propagated.

    {
        throw invalid_operation("To use ordered message processing, you must override propagate_to_any_targets in the message block.");
    }

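Both propagate_output_messages and propagate_to_any_targets have default bodies that throw, so a derived block only needs to override the one that matches the processing mode it uses. A small sketch of this "throwing default" idiom is shown below. BlockBase, OrderedBlock and InvalidOperation are hypothetical names (InvalidOperation stands in for invalid_operation), and messages are modeled as int values.

```cpp
#include <cassert>
#include <stdexcept>

// Hypothetical stand-in for the invalid_operation exception.
struct InvalidOperation : std::logic_error {
    using std::logic_error::logic_error;
};

class BlockBase {
public:
    virtual ~BlockBase() = default;

    void run_ordered(int message) { propagate_to_any_targets(message); }
    void run_batched() { propagate_output_messages(); }

protected:
    // Defaults throw: a block must override the routine for the mode it uses.
    virtual void propagate_to_any_targets(int) {
        throw InvalidOperation("override propagate_to_any_targets for ordered processing");
    }
    virtual void propagate_output_messages() {
        throw InvalidOperation("override propagate_output_messages for batched processing");
    }
};

// Supports ordered processing only; the batched routine keeps its default.
class OrderedBlock : public BlockBase {
public:
    int last = 0;

protected:
    void propagate_to_any_targets(int message) override { last = message; }
};

// The overridden mode works; the other mode reports the missing override.
bool default_override_demo() {
    OrderedBlock block;
    block.run_ordered(5);

    bool threw = false;
    try {
        block.run_batched();
    } catch (const InvalidOperation&) {
        threw = true;
    }
    assert(threw);
    return block.last == 5;
}
```

Compared with making both routines pure virtual, this lets a block implement a single mode and still be instantiable; the cost is that a missing override is detected at run time rather than at compile time.
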
template<class _TargetLinkRegistry, class _MessageProcessorType = ordered_message_processor<typename _TargetLinkRegistry::type::type>>
virtual void Concurrency::source_block< _TargetLinkRegistry, _MessageProcessorType >::release ( runtime_object_identity _MsgId, _Inout_ ITarget< _Target_type > * _PTarget )
inline virtual

Releases a previous successful message reservation.

Parameters
_MsgId      The runtime_object_identity of the reserved message object.
_PTarget    A pointer to the target block that is calling the release method.

The method throws an invalid_argument exception if the parameter _PTarget is NULL.

The method throws a bad_target exception if the parameter _PTarget does not represent the target that called reserve.

Implements Concurrency::ISource< _TargetLinkRegistry::type::type >.

    {
        _R_lock _Lock(_M_internalLock);

        if (_PTarget == NULL)
        {
            throw std::invalid_argument("_PTarget");
        }

        if (_PTarget != _M_pReservedFor)
        {
            throw bad_target();
        }

        release_message(_MsgId);

        // Clear the reservation
        // _M_reservedId is intentionally not reset so that it can assist in debugging
        _M_pReservedFor = NULL;

        // Reservation is assumed to block propagation. Notify that propagation can now be resumed
        resume_propagation();
    }

template<class _TargetLinkRegistry, class _MessageProcessorType = ordered_message_processor<typename _TargetLinkRegistry::type::type>>
virtual void Concurrency::source_block< _TargetLinkRegistry, _MessageProcessorType >::release_message ( runtime_object_identity  _MsgId)
protected pure virtual
template<class _TargetLinkRegistry, class _MessageProcessorType = ordered_message_processor<typename _TargetLinkRegistry::type::type>>
virtual void Concurrency::source_block< _TargetLinkRegistry, _MessageProcessorType >::release_ref ( _Inout_ ITarget< _Target_type > *  _PTarget)
inline virtual

Releases a reference count on this source_block object.

Parameters
_PTarget    A pointer to the target block that is calling this method.

This method is called by an ITarget object that is being unlinked from this source. The source block is allowed to release any resources reserved for the target block.

Implements Concurrency::ISource< _TargetLinkRegistry::type::type >.

    {
        if (_PTarget != NULL)
        {
            _R_lock _Lock(_M_internalLock);

            // We assume that each target would keep a single reference on its source, so
            // we call unlink target notification on every release. Otherwise, we would be
            // required to keep a reference count per target.
            // Note: unlink_target_notification can check the value of this _PTarget pointer, but
            // must not dereference it, as it may have already been deleted.
            unlink_target_notification(_PTarget);
        }

        _InterlockedDecrement(&_M_referenceCount);

        // It is *unsafe* to touch the "this" pointer after decrementing the reference count
    }

template<class _TargetLinkRegistry, class _MessageProcessorType = ordered_message_processor<typename _TargetLinkRegistry::type::type>>
void Concurrency::source_block< _TargetLinkRegistry, _MessageProcessorType >::remove_targets ( )
inline protected

Removes all target links for this source block. This should be called from the destructor.

{
    // Wait for outstanding propagation to complete.
    wait_for_outstanding_async_sends();

    unlink_targets();

    _Wait_on_ref();
}
template<class _TargetLinkRegistry, class _MessageProcessorType = ordered_message_processor<typename _TargetLinkRegistry::type::type>>
virtual bool Concurrency::source_block< _TargetLinkRegistry, _MessageProcessorType >::reserve ( runtime_object_identity _MsgId, _Inout_ ITarget< _Target_type > * _PTarget )
inline virtual

Reserves a message previously offered by this source_block object.

Parameters
_MsgId      The runtime_object_identity of the offered message object.
_PTarget    A pointer to the target block that is calling the reserve method.
Returns
true if the message was successfully reserved, false otherwise. A reservation can fail for several reasons: for example, another target has already reserved or accepted the message, or the source declines the reservation.

The method throws an invalid_argument exception if the parameter _PTarget is NULL.

After you call reserve, if it succeeds, you must call either consume or release in order to take or give up possession of the message, respectively.

Implements Concurrency::ISource< _TargetLinkRegistry::type::type >.

{
    _R_lock _Lock(_M_internalLock);

    if (_PTarget == NULL)
    {
        throw std::invalid_argument("_PTarget");
    }

    if (_M_pReservedFor != NULL)
    {
        // Someone else is holding the reservation
        return false;
    }

    if (!reserve_message(_MsgId))
    {
        // Failed to reserve the msg ID
        return false;
    }

    // Save the reserving target and the msg ID
    _M_pReservedFor = _PTarget;
    _M_reservedId = _MsgId;

    return true;
}
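The reserve / consume / release contract described above can be sketched with a self-contained model that does not depend on agents.h. Everything here is an assumption made for illustration: DemoTarget, DemoSource, the single-message slot, and the use of int for message IDs and payloads are hypothetical, and std::recursive_mutex stands in for the internal reentrant lock.

```cpp
#include <cassert>
#include <mutex>
#include <stdexcept>

// Hypothetical stand-in for ITarget; identified by address only.
struct DemoTarget {};

// Hypothetical single-slot source modelling reserve / consume / release.
class DemoSource {
public:
    // Make one message (ID plus payload) available to targets.
    void offer(int msg_id, int payload) {
        std::lock_guard<std::recursive_mutex> lock(mutex_);
        has_offer_ = true;
        offered_id_ = msg_id;
        payload_ = payload;
    }

    // Succeeds only if nobody holds a reservation and the ID matches.
    bool reserve(int msg_id, DemoTarget* target) {
        std::lock_guard<std::recursive_mutex> lock(mutex_);
        if (target == nullptr) throw std::invalid_argument("target");
        if (reserved_for_ != nullptr) return false;              // already reserved
        if (!has_offer_ || offered_id_ != msg_id) return false;  // nothing to reserve
        reserved_for_ = target;
        reserved_id_ = msg_id;
        return true;
    }

    // Take possession: the message leaves the source.
    int consume(int msg_id, DemoTarget* target) {
        std::lock_guard<std::recursive_mutex> lock(mutex_);
        require_holder(msg_id, target);
        reserved_for_ = nullptr;
        has_offer_ = false;
        return payload_;
    }

    // Give up possession: the message stays available to other targets.
    void release(int msg_id, DemoTarget* target) {
        std::lock_guard<std::recursive_mutex> lock(mutex_);
        require_holder(msg_id, target);
        reserved_for_ = nullptr;
    }

private:
    // Only the target that made the reservation may consume or release it.
    void require_holder(int msg_id, DemoTarget* target) const {
        if (target == nullptr || reserved_for_ != target || reserved_id_ != msg_id)
            throw std::logic_error("caller does not hold this reservation");
    }

    std::recursive_mutex mutex_;
    bool has_offer_ = false;
    int offered_id_ = 0;
    int payload_ = 0;
    DemoTarget* reserved_for_ = nullptr;
    int reserved_id_ = 0;
};
```

In this model a second target's reserve call fails while the first reservation is outstanding, succeeds again after release, and fails permanently once the message has been consumed.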
template<class _TargetLinkRegistry, class _MessageProcessorType = ordered_message_processor<typename _TargetLinkRegistry::type::type>>
virtual bool Concurrency::source_block< _TargetLinkRegistry, _MessageProcessorType >::reserve_message ( runtime_object_identity  _MsgId)
protected pure virtual

When overridden in a derived class, reserves a message previously offered by this source_block object.

Parameters
_MsgId      The runtime_object_identity of the message object being reserved.
Returns
true if the message was successfully reserved, false otherwise.

After reserve is called and returns true, either consume or release must be called to take or give up ownership of the message, respectively.

Implemented in Concurrency::_Join_node< _Type, _Destination_type, _Jtype >, Concurrency::_Order_node_base< _Type >, Concurrency::join< _Type, _Jtype >, Concurrency::single_assignment< _Type >, Concurrency::single_assignment< size_t >, Concurrency::timer< _Type >, Concurrency::transformer< _Input, _Output >, Concurrency::overwrite_buffer< _Type >, Concurrency::overwrite_buffer< agent_status >, and Concurrency::unbounded_buffer< _Type >.

template<class _TargetLinkRegistry, class _MessageProcessorType = ordered_message_processor<typename _TargetLinkRegistry::type::type>>
virtual void Concurrency::source_block< _TargetLinkRegistry, _MessageProcessorType >::resume_propagation ( )
protected pure virtual
template<class _TargetLinkRegistry, class _MessageProcessorType = ordered_message_processor<typename _TargetLinkRegistry::type::type>>
virtual void Concurrency::source_block< _TargetLinkRegistry, _MessageProcessorType >::sync_send ( _Inout_opt_ message< _Target_type > *  _Msg)
inline protected virtual

Synchronously queues up messages and starts a propagation task, if this has not been done already.

Parameters
_Msg        A pointer to a message object to synchronously send.

{
    // Caller shall not be holding any locks when calling this routine
    _M_messageProcessor.sync_send(_Msg);
}
template<class _TargetLinkRegistry, class _MessageProcessorType = ordered_message_processor<typename _TargetLinkRegistry::type::type>>
virtual void Concurrency::source_block< _TargetLinkRegistry, _MessageProcessorType >::unlink_target ( _Inout_ ITarget< _Target_type > *  _PTarget)
inline virtual

Unlinks a target block from this source_block object.

Parameters
_PTarget    A pointer to an ITarget block to unlink from this source_block object.

The method throws an invalid_argument exception if the parameter _PTarget is NULL.

Implements Concurrency::ISource< _TargetLinkRegistry::type::type >.

{
    _R_lock _Lock(_M_internalLock);

    if (_PTarget == NULL)
    {
        throw std::invalid_argument("_PTarget");
    }

    if (_M_connectedTargets.remove(_PTarget))
    {
        // We were able to remove the target from our list.
        // Inform the target to unlink from us
        _Invoke_unlink_source(_PTarget);
    }
}
template<class _TargetLinkRegistry, class _MessageProcessorType = ordered_message_processor<typename _TargetLinkRegistry::type::type>>
virtual void Concurrency::source_block< _TargetLinkRegistry, _MessageProcessorType >::unlink_target_notification ( _Inout_ ITarget< _Target_type > *  _PTarget)
inline protected virtual

A callback that notifies that a target has been unlinked from this source_block object.

Parameters
_PTarget    The ITarget block that was unlinked.

{
    // At this point, the target has already been disconnected from the
    // source. It is safe to check the value of this pointer, but not
    // safe to dereference it, as it may have already been deleted.

    // If the target being unlinked is the one holding the reservation,
    // release the reservation
    if (_M_pReservedFor == _PTarget)
    {
        release(_M_reservedId, _PTarget);
    }
}
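The rule applied by this callback (drop the reservation only when the unlinked target is the one holding it, comparing pointer values without dereferencing) can be shown in isolation. The following is a minimal sketch, assuming hypothetical names PeerTarget and ReservationSlot; it clears the reservation directly instead of calling release.

```cpp
#include <cassert>

// Hypothetical stand-in for ITarget; never dereferenced below.
struct PeerTarget {};

// Hypothetical holder for a single outstanding reservation.
class ReservationSlot {
public:
    // Records the reserving target unless a reservation already exists.
    bool reserve(int msg_id, PeerTarget* target) {
        if (target == nullptr || reserved_for_ != nullptr) return false;
        reserved_for_ = target;
        reserved_id_ = msg_id;
        return true;
    }

    // Called after the target has been disconnected from the source.
    void on_target_unlinked(PeerTarget* target) {
        // Pointer comparison only: the target may already be deleted.
        if (target != nullptr && reserved_for_ == target) {
            reserved_for_ = nullptr;
        }
    }

    bool is_reserved() const { return reserved_for_ != nullptr; }
    int reserved_id() const { return reserved_id_; }

private:
    PeerTarget* reserved_for_ = nullptr;
    int reserved_id_ = 0;
};
```

Unlinking an unrelated target leaves the reservation untouched, while unlinking the holder frees the slot so that another target can reserve.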
template<class _TargetLinkRegistry, class _MessageProcessorType = ordered_message_processor<typename _TargetLinkRegistry::type::type>>
virtual void Concurrency::source_block< _TargetLinkRegistry, _MessageProcessorType >::unlink_targets ( )
inline virtual

Unlinks all target blocks from this source_block object.

Implements Concurrency::ISource< _TargetLinkRegistry::type::type >.

{
    _R_lock _Lock(_M_internalLock);

    for (target_iterator _Iter = _M_connectedTargets.begin(); *_Iter != NULL; ++_Iter)
    {
        ITarget<_Target_type> * _PTarget = *_Iter;
        _CONCRT_ASSERT(_PTarget != NULL);

        unlink_target(_PTarget);
    }

    // All the targets should be unlinked.
    _CONCRT_ASSERT(_M_connectedTargets.count() == 0);
}
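The listing above removes entries from the registry while iterating over it, which depends on how the registry's iterator behaves. With a plain std::vector the same effect needs a different technique, so the sketch below iterates over a snapshot copy instead. LinkTarget and LinkRegistryDemo are hypothetical names, and incrementing unlink_calls merely stands in for informing the target that it was unlinked.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical target that records how often it was told to unlink.
struct LinkTarget {
    int unlink_calls = 0;
};

// Hypothetical registry of connected targets backed by std::vector.
class LinkRegistryDemo {
public:
    void link(LinkTarget* target) {
        if (target != nullptr && !contains(target)) targets_.push_back(target);
    }

    // Removes one target; informs it only if it was actually connected.
    bool unlink(LinkTarget* target) {
        auto it = std::find(targets_.begin(), targets_.end(), target);
        if (it == targets_.end()) return false;
        targets_.erase(it);
        ++target->unlink_calls;
        return true;
    }

    // Iterates over a copy so that erasing cannot invalidate the loop.
    void unlink_all() {
        const std::vector<LinkTarget*> snapshot = targets_;
        for (LinkTarget* target : snapshot) {
            unlink(target);
        }
        assert(targets_.empty());  // all the targets should be unlinked
    }

    std::size_t count() const { return targets_.size(); }

private:
    bool contains(LinkTarget* target) const {
        return std::find(targets_.begin(), targets_.end(), target) != targets_.end();
    }

    std::vector<LinkTarget*> targets_;
};
```

The snapshot costs one extra copy of the pointer list, which seems a reasonable trade for a container whose iterators do not survive erasure.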
template<class _TargetLinkRegistry, class _MessageProcessorType = ordered_message_processor<typename _TargetLinkRegistry::type::type>>
void Concurrency::source_block< _TargetLinkRegistry, _MessageProcessorType >::wait_for_outstanding_async_sends ( )
inline protected

Waits for all asynchronous propagations to complete. This propagator-specific spin wait is used in the destructors of message blocks to make sure that all asynchronous propagations have finished before the block is destroyed.

{
    _M_messageProcessor.wait();
}

Member Data Documentation

template<class _TargetLinkRegistry, class _MessageProcessorType = ordered_message_processor<typename _TargetLinkRegistry::type::type>>
_TargetLinkRegistry Concurrency::source_block< _TargetLinkRegistry, _MessageProcessorType >::_M_connectedTargets
protected

Connected targets

template<class _TargetLinkRegistry, class _MessageProcessorType = ordered_message_processor<typename _TargetLinkRegistry::type::type>>
::Concurrency::details::_ReentrantPPLLock Concurrency::source_block< _TargetLinkRegistry, _MessageProcessorType >::_M_internalLock
private

Internal lock used for the following synchronization:

  1. Synchronize between link and unlink target
  2. Synchronize between propagate_to_any_targets and unlink_target
  3. Synchronize between reserve and consume/release
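One reason the lock is reentrant is visible in the member functions of this class: unlink_targets takes the lock and then calls unlink_target, which takes it again on the same thread. The sketch below illustrates that pattern with std::recursive_mutex as a stand-in for _ReentrantPPLLock; ReentrantDemo and its counter of remaining links are hypothetical.

```cpp
#include <cassert>
#include <mutex>

// Hypothetical class showing nested acquisition of a reentrant lock.
class ReentrantDemo {
public:
    explicit ReentrantDemo(int links) : remaining_(links) {}

    // Removes a single link under the lock; returns how many remain.
    int remove_one() {
        std::lock_guard<std::recursive_mutex> lock(mutex_);
        if (remaining_ > 0) --remaining_;
        return remaining_;
    }

    // Holds the lock and calls remove_one, which locks again on the
    // same thread. A non-recursive mutex would deadlock here.
    int remove_all() {
        std::lock_guard<std::recursive_mutex> lock(mutex_);
        while (remaining_ > 0) {
            remove_one();
        }
        return remaining_;
    }

private:
    std::recursive_mutex mutex_;
    int remaining_;
};
```

Holding the lock across the whole loop keeps other threads from linking or reserving between individual removals.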
template<class _TargetLinkRegistry, class _MessageProcessorType = ordered_message_processor<typename _TargetLinkRegistry::type::type>>
_MessageProcessorType Concurrency::source_block< _TargetLinkRegistry, _MessageProcessorType >::_M_messageProcessor
protected

Processor used for asynchronous message handling

template<class _TargetLinkRegistry, class _MessageProcessorType = ordered_message_processor<typename _TargetLinkRegistry::type::type>>
ITarget<_Target_type>* Concurrency::source_block< _TargetLinkRegistry, _MessageProcessorType >::_M_pReservedFor
protected

Connected target that is holding a reservation

template<class _TargetLinkRegistry, class _MessageProcessorType = ordered_message_processor<typename _TargetLinkRegistry::type::type>>
volatile long Concurrency::source_block< _TargetLinkRegistry, _MessageProcessorType >::_M_referenceCount
private
template<class _TargetLinkRegistry, class _MessageProcessorType = ordered_message_processor<typename _TargetLinkRegistry::type::type>>
runtime_object_identity Concurrency::source_block< _TargetLinkRegistry, _MessageProcessorType >::_M_reservedId
protected

Reserved message ID


The documentation for this class was generated from the following file: