STLdoc
STLdocumentation
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
Classes | Public Types | Public Member Functions | Protected Member Functions | Private Types | Private Member Functions | Private Attributes | List of all members
Concurrency::join< _Type, _Jtype > Class Template Reference

A join messaging block is a single-target, multi-source, ordered propagator_block which combines together messages of type _Type from each of its sources. More...

#include <agents.h>

Inheritance diagram for Concurrency::join< _Type, _Jtype >:
Concurrency::propagator_block< single_link_registry< ITarget< std::vector< _Type > > >, multi_link_registry< ISource< _Type > > > Concurrency::source_block< single_link_registry< ITarget< std::vector< _Type > > >, _MessageProcessorType > Concurrency::ITarget< multi_link_registry< ISource< _Type > >::type::source_type > Concurrency::ISource< single_link_registry< ITarget< std::vector< _Type > > >::type::type >

Classes

struct  _MessageArray
 
struct  _SavedMessageIdArray
 

Public Types

typedef std::vector< _Type_OutputType
 
- Public Types inherited from Concurrency::propagator_block< single_link_registry< ITarget< std::vector< _Type > > >, multi_link_registry< ISource< _Type > > >
typedef multi_link_registry< ISource< _Type > >::type::source_type _Source_type
 The type of the payload for the incoming message to this propagator_block. More...
 
typedef source_link_manager< multi_link_registry< ISource< _Type > > > _SourceLinkManager
 The type of the source_link_manager this propagator_block. More...
 
typedef _SourceLinkManager::iterator source_iterator
 The type of the iterator for the source_link_manager for this propagator_block. More...
 
- Public Types inherited from Concurrency::source_block< single_link_registry< ITarget< std::vector< _Type > > >, _MessageProcessorType >
typedef single_link_registry< ITarget< std::vector< _Type > > >::type::type _Target_type
 The payload type of messages handled by this source_block. More...
 
typedef single_link_registry< ITarget< std::vector< _Type > > >::iterator target_iterator
 The iterator to walk the connected targets. More...
 
- Public Types inherited from Concurrency::ISource< single_link_registry< ITarget< std::vector< _Type > > >::type::type >
typedef single_link_registry< ITarget< std::vector< _Type > > >::type::type source_type
 A type alias for _Type . More...
 
- Public Types inherited from Concurrency::ITarget< multi_link_registry< ISource< _Type > >::type::source_type >
typedef multi_link_registry< ISource< _Type > >::type::source_type type
 A type alias for _Type . More...
 
typedef std::function< bool(multi_link_registry< ISource< _Type > >::type::source_typeconst &)> filter_method
 The signature of any method used by the block that returns a bool value to determine whether an offered message should be accepted. More...
 

Public Member Functions

 join (size_t _NumInputs)
 Constructs a join messaging block. More...
 
 join (size_t _NumInputs, filter_method const &_Filter)
 Constructs a join messaging block. More...
 
 ~join ()
 Destroys the join block. More...
 
- Public Member Functions inherited from Concurrency::propagator_block< single_link_registry< ITarget< std::vector< _Type > > >, multi_link_registry< ISource< _Type > > >
 propagator_block ()
 Constructs a propagator_block object. More...
 
virtual ~propagator_block ()
 Destroys a propagator_block object. More...
 
virtual message_status propagate (_Inout_opt_ message< _Source_type > *_PMessage, _Inout_opt_ ISource< _Source_type > *_PSource)
 Asynchronously passes a message from a source block to this target block. More...
 
virtual message_status send (_Inout_ message< _Source_type > *_PMessage, _Inout_ ISource< _Source_type > *_PSource)
 Synchronously initiates a message to this block. Called by an ISource block. When this function completes, the message will already have propagated into the block. More...
 
- Public Member Functions inherited from Concurrency::source_block< single_link_registry< ITarget< std::vector< _Type > > >, _MessageProcessorType >
 source_block ()
 Constructs a source_block object. More...
 
virtual ~source_block ()
 Destroys the source_block object. More...
 
virtual void link_target (_Inout_ ITarget< _Target_type > *_PTarget)
 Links a target block to this source_block object. More...
 
virtual void unlink_target (_Inout_ ITarget< _Target_type > *_PTarget)
 Unlinks a target block from this source_block object. More...
 
virtual void unlink_targets ()
 Unlinks all target blocks from this source_block object. More...
 
virtual message< _Target_type > * accept (runtime_object_identity _MsgId, _Inout_ ITarget< _Target_type > *_PTarget)
 Accepts a message that was offered by this source_block object, transferring ownership to the caller. More...
 
virtual bool reserve (runtime_object_identity _MsgId, _Inout_ ITarget< _Target_type > *_PTarget)
 Reserves a message previously offered by this source_block object. More...
 
virtual message< _Target_type > * consume (runtime_object_identity _MsgId, _Inout_ ITarget< _Target_type > *_PTarget)
 Consumes a message previously offered by this source_block object and successfully reserved by the target, transferring ownership to the caller. More...
 
virtual void release (runtime_object_identity _MsgId, _Inout_ ITarget< _Target_type > *_PTarget)
 Releases a previous successful message reservation. More...
 
virtual void acquire_ref (_Inout_ ITarget< _Target_type > *)
 Acquires a reference count on this source_block object, to prevent deletion. More...
 
virtual void release_ref (_Inout_ ITarget< _Target_type > *_PTarget)
 Releases a reference count on this source_block object. More...
 
- Public Member Functions inherited from Concurrency::ISource< single_link_registry< ITarget< std::vector< _Type > > >::type::type >
virtual ~ISource ()
 Destroys the ISource object. More...
 
virtual void link_target (_Inout_ ITarget< single_link_registry< ITarget< std::vector< _Type > > >::type::type > *_PTarget)=0
 When overridden in a derived class, links a target block to this ISource block. More...
 
virtual void unlink_target (_Inout_ ITarget< single_link_registry< ITarget< std::vector< _Type > > >::type::type > *_PTarget)=0
 When overridden in a derived class, unlinks a target block from this ISource block, if found to be previously linked. More...
 
virtual message< single_link_registry< ITarget< std::vector< _Type > > >::type::type > * accept (runtime_object_identity _MsgId, _Inout_ ITarget< single_link_registry< ITarget< std::vector< _Type > > >::type::type > *_PTarget)=0
 When overridden in a derived class, accepts a message that was offered by this ISource block, transferring ownership to the caller. More...
 
virtual bool reserve (runtime_object_identity _MsgId, _Inout_ ITarget< single_link_registry< ITarget< std::vector< _Type > > >::type::type > *_PTarget)=0
 When overridden in a derived class, reserves a message previously offered by this ISource block. More...
 
virtual message< single_link_registry< ITarget< std::vector< _Type > > >::type::type > * consume (runtime_object_identity _MsgId, _Inout_ ITarget< single_link_registry< ITarget< std::vector< _Type > > >::type::type > *_PTarget)=0
 When overridden in a derived class, consumes a message previously offered by this ISource block and successfully reserved by the target, transferring ownership to the caller. More...
 
virtual void release (runtime_object_identity _MsgId, _Inout_ ITarget< single_link_registry< ITarget< std::vector< _Type > > >::type::type > *_PTarget)=0
 When overridden in a derived class, releases a previous successful message reservation. More...
 
virtual void acquire_ref (_Inout_ ITarget< single_link_registry< ITarget< std::vector< _Type > > >::type::type > *_PTarget)=0
 When overridden in a derived class, acquires a reference count on this ISource block, to prevent deletion. More...
 
virtual void release_ref (_Inout_ ITarget< single_link_registry< ITarget< std::vector< _Type > > >::type::type > *_PTarget)=0
 When overridden in a derived class, releases a reference count on this ISource block. More...
 
- Public Member Functions inherited from Concurrency::ITarget< multi_link_registry< ISource< _Type > >::type::source_type >
virtual ~ITarget ()
 Destroys the ITarget object. More...
 
virtual message_status propagate (_Inout_opt_ message< multi_link_registry< ISource< _Type > >::type::source_type > *_PMessage, _Inout_opt_ ISource< multi_link_registry< ISource< _Type > >::type::source_type > *_PSource)=0
 When overridden in a derived class, asynchronously passes a message from a source block to this target block. More...
 
virtual message_status send (_Inout_ message< multi_link_registry< ISource< _Type > >::type::source_type > *_PMessage, _Inout_ ISource< multi_link_registry< ISource< _Type > >::type::source_type > *_PSource)=0
 When overridden in a derived class, synchronously passes a message to the target block. More...
 
virtual bool supports_anonymous_source ()
 When overridden in a derived class, returns true or false depending on whether the message block accepts messages offered by a source that is not linked to it. If the overridden method returns true, the target cannot postpone an offered message, as consumption of a postponed message at a later time requires the source to be identified in its source link registry. More...
 

Protected Member Functions

message_status propagate_message (_Inout_ message< _Type > *_PMessage, _Inout_ ISource< _Type > *_PSource)
 Asynchronously passes a message from an ISource block to this join messaging block. It is invoked by the propagate method, when called by a source block. More...
 
virtual message< _OutputType > * accept_message (runtime_object_identity _MsgId)
 Accepts a message that was offered by this join messaging block, transferring ownership to the caller. More...
 
virtual bool reserve_message (runtime_object_identity _MsgId)
 Reserves a message previously offered by this join messaging block. More...
 
virtual message< _OutputType > * consume_message (runtime_object_identity _MsgId)
 Consumes a message previously offered by the join messaging block and reserved by the target, transferring ownership to the caller. More...
 
virtual void release_message (runtime_object_identity _MsgId)
 Releases a previous message reservation. More...
 
virtual void resume_propagation ()
 Resumes propagation after a reservation has been released. More...
 
virtual void link_target_notification (_Inout_ ITarget< std::vector< _Type >> *)
 A callback that notifies that a new target has been linked to this join messaging block. More...
 
void propagate_to_any_targets (_Inout_opt_ message< _OutputType > *)
 Constructs an output message containing an input message from each source when they have all propagated a message. Sends this output message out to each of its targets. More...
 
- Protected Member Functions inherited from Concurrency::propagator_block< single_link_registry< ITarget< std::vector< _Type > > >, multi_link_registry< ISource< _Type > > >
virtual message_status propagate_message (_Inout_ message< _Source_type > *_PMessage, _Inout_ ISource< _Source_type > *_PSource)=0
 When overridden in a derived class, this method asynchronously passes a message from an ISource block to this propagator_block object. It is invoked by the propagate method, when called by a source block. More...
 
virtual message_status send_message (_Inout_ message< _Source_type > *, _Inout_ ISource< _Source_type > *)
 When overridden in a derived class, this method synchronously passes a message from an ISource block to this propagator_block object. It is invoked by the send method, when called by a source block. More...
 
virtual void link_source (_Inout_ ISource< _Source_type > *_PSource)
 Links a specified source block to this propagator_block object. More...
 
virtual void unlink_source (_Inout_ ISource< _Source_type > *_PSource)
 Unlinks a specified source block from this propagator_block object. More...
 
virtual void unlink_sources ()
 Unlinks all source blocks from this propagator_block object. More...
 
virtual void process_input_messages (_Inout_ message< _Target_type > *)
 Process input messages. This is only useful for propagator blocks, which derive from source_block More...
 
void initialize_source_and_target (_Inout_opt_ Scheduler *_PScheduler=NULL, _Inout_opt_ ScheduleGroup *_PScheduleGroup=NULL)
 Initializes the base object. Specifically, the message_processor object needs to be initialized. More...
 
void register_filter (filter_method const &_Filter)
 Registers a filter method that will be invoked on every received message. More...
 
void decline_incoming_messages ()
 Indicates to the block that new messages should be declined. More...
 
void remove_network_links ()
 Removes all the source and target network links from this propagator_block object. More...
 
- Protected Member Functions inherited from Concurrency::source_block< single_link_registry< ITarget< std::vector< _Type > > >, _MessageProcessorType >
virtual void link_target_notification (_Inout_ ITarget< _Target_type > *)
 A callback that notifies that a new target has been linked to this source_block object. More...
 
virtual void unlink_target_notification (_Inout_ ITarget< _Target_type > *_PTarget)
 A callback that notifies that a target has been unlinked from this source_block object. More...
 
virtual void propagate_output_messages ()
 Propagate messages to targets. More...
 
virtual void propagate_to_any_targets (_Inout_opt_ message< _Target_type > *_PMessage)
 When overridden in a derived class, propagates the given message to any or all of the linked targets. This is the main propagation routine for message blocks. More...
 
void initialize_source (_Inout_opt_ Scheduler *_PScheduler=NULL, _Inout_opt_ ScheduleGroup *_PScheduleGroup=NULL)
 Initializes the message_propagator within this source_block. More...
 
void enable_batched_processing ()
 Enables batched processing for this block. More...
 
virtual void sync_send (_Inout_opt_ message< _Target_type > *_Msg)
 Synchronously queues up messages and starts a propagation task, if this has not been done already. More...
 
virtual void async_send (_Inout_opt_ message< _Target_type > *_Msg)
 Asynchronously queues up messages and starts a propagation task, if this has not been done already More...
 
void wait_for_outstanding_async_sends ()
 Waits for all asynchronous propagations to complete. This propagator-specific spin wait is used in destructors of message blocks to make sure that all asynchronous propagations have time to finish before destroying the block. More...
 
void remove_targets ()
 Removes all target links for this source block. This should be called from the destructor. More...
 
- Protected Member Functions inherited from Concurrency::ISource< single_link_registry< ITarget< std::vector< _Type > > >::type::type >
void _Invoke_link_source (ITarget< single_link_registry< ITarget< std::vector< _Type > > >::type::type > *_PLinkFrom)
 Links this source to a target. More...
 
void _Invoke_unlink_source (ITarget< single_link_registry< ITarget< std::vector< _Type > > >::type::type > *_PUnlinkFrom)
 Unlinks this source from a target. More...
 
- Protected Member Functions inherited from Concurrency::ITarget< multi_link_registry< ISource< _Type > >::type::source_type >
virtual void link_source (_Inout_ ISource< multi_link_registry< ISource< _Type > >::type::source_type > *_PSource)=0
 When overridden in a derived class, links a specified source block to this ITarget block. More...
 
virtual void unlink_source (_Inout_ ISource< multi_link_registry< ISource< _Type > >::type::source_type > *_PSource)=0
 When overridden in a derived class, unlinks a specified source block from this ITarget block. More...
 

Private Types

typedef single_link_registry< ITarget< std::vector< _Type > > > _TargetLinkRegistry
 
typedef multi_link_registry< ISource< _Type > > _SourceLinkRegistry
 

Private Member Functions

void _Propagate_priority_order (::Concurrency::details::_Queue< message< _Target_type >> &_MessageBuffer)
 Propagate messages in priority order. More...
 
message< std::vector< _Type > > *__cdecl _Create_new_message ()
 Constructs a new message from the data output. More...
 
void _Initialize (size_t _NumInputs, Scheduler *_PScheduler=NULL, ScheduleGroup *_PScheduleGroup=NULL)
 Initializes the join messaging block. More...
 
void _Delete_stored_messages ()
 Deletes all messages currently stored in this message block. Should be called by the destructor to ensure any messages propagated in are cleaned up. More...
 

Private Attributes

volatile size_t _M_messagesRemaining
 
_MessageArray _M_messageArray
 
_SavedMessageIdArray _M_savedMessageIdArray
 
runtime_object_identity_M_savedIdBuffer
 
::Concurrency::details::_NonReentrantPPLLock _M_propagationLock
 
::Concurrency::details::_Queue< message< std::vector< _Type > > > _M_messageBuffer
 

Additional Inherited Members

- Protected Attributes inherited from Concurrency::propagator_block< single_link_registry< ITarget< std::vector< _Type > > >, multi_link_registry< ISource< _Type > > >
_SourceLinkManager _M_connectedSources
 The container for all the sources connected to this block. More...
 
filter_method_M_pFilter
 The filter function which determines whether offered messages should be accepted. More...
 
volatile bool _M_fDeclineMessages
 A bool that is set to indicate that all messages should be declined in preparation for deleting the block More...
 
- Protected Attributes inherited from Concurrency::source_block< single_link_registry< ITarget< std::vector< _Type > > >, _MessageProcessorType >
ITarget< _Target_type > * _M_pReservedFor
 Connected target that is holding a reservation More...
 
runtime_object_identity _M_reservedId
 Reserved message ID More...
 
single_link_registry< ITarget< std::vector< _Type > > > _M_connectedTargets
 Connected targets More...
 
_MessageProcessorType _M_messageProcessor
 Processor used for asynchronous message handling More...
 

Detailed Description

template<class _Type, join_type _Jtype = non_greedy>
class Concurrency::join< _Type, _Jtype >

A join messaging block is a single-target, multi-source, ordered propagator_block which combines together messages of type _Type from each of its sources.

Template Parameters
_TypeThe payload type of the messages joined and propagated by the block.
_JtypeThe kind of join block this is, either greedy or non_greedy

For more information, see Asynchronous Message Blocks.

See also
choice Class, multitype_join Class, join_type Enumeration

Member Typedef Documentation

template<class _Type , join_type _Jtype = non_greedy>
typedef std::vector<_Type> Concurrency::join< _Type, _Jtype >::_OutputType
template<class _Type , join_type _Jtype = non_greedy>
typedef multi_link_registry<ISource<_Type> > Concurrency::join< _Type, _Jtype >::_SourceLinkRegistry
private
template<class _Type , join_type _Jtype = non_greedy>
typedef single_link_registry<ITarget<std::vector<_Type> > > Concurrency::join< _Type, _Jtype >::_TargetLinkRegistry
private

Constructor & Destructor Documentation

template<class _Type , join_type _Jtype = non_greedy>
Concurrency::join< _Type, _Jtype >::join ( size_t  _NumInputs)
inline

Constructs a join messaging block.

Parameters
_NumInputsThe number of inputs this join block will be allowed.

The runtime uses the default scheduler if you do not specify the _PScheduler or _PScheduleGroup parameters.

The type filter_method is a functor with signature bool (_Type const &) which is invoked by this join messaging block to determine whether or not it should accept an offered message.

See also
Scheduler Class, ScheduleGroup Class
9185  : _M_messageArray(_NumInputs),
9186  _M_savedMessageIdArray(_NumInputs)
9187  {
9188  _Initialize(_NumInputs);
9189  }
_MessageArray _M_messageArray
Definition: agents.h:9927
void _Initialize(size_t _NumInputs, Scheduler *_PScheduler=NULL, ScheduleGroup *_PScheduleGroup=NULL)
Initializes the join messaging block.
Definition: agents.h:9858
_SavedMessageIdArray _M_savedMessageIdArray
Definition: agents.h:9952
template<class _Type , join_type _Jtype = non_greedy>
Concurrency::join< _Type, _Jtype >::join ( size_t  _NumInputs,
filter_method const &  _Filter 
)
inline

Constructs a join messaging block.

Parameters
_NumInputsThe number of inputs this join block will be allowed.
_FilterA filter function which determines whether offered messages should be accepted.

The runtime uses the default scheduler if you do not specify the _PScheduler or _PScheduleGroup parameters.

The type filter_method is a functor with signature bool (_Type const &) which is invoked by this join messaging block to determine whether or not it should accept an offered message.

See also
Scheduler Class, ScheduleGroup Class
9211  : _M_messageArray(_NumInputs),
9212  _M_savedMessageIdArray(_NumInputs)
9213  {
9214  _Initialize(_NumInputs);
9215  register_filter(_Filter);
9216  }
_MessageArray _M_messageArray
Definition: agents.h:9927
void _Initialize(size_t _NumInputs, Scheduler *_PScheduler=NULL, ScheduleGroup *_PScheduleGroup=NULL)
Initializes the join messaging block.
Definition: agents.h:9858
void register_filter(filter_method const &_Filter)
Registers a filter method that will be invoked on every received message.
Definition: agents.h:5842
_SavedMessageIdArray _M_savedMessageIdArray
Definition: agents.h:9952
template<class _Type , join_type _Jtype = non_greedy>
Concurrency::join< _Type, _Jtype >::~join ( )
inline

Destroys the join block.

9339  {
9340  // Remove all links that are targets of this join
9341  this->remove_network_links();
9342 
9343  // Clean up any messages left in this message block
9345 
9346  delete [] _M_savedIdBuffer;
9347  }
runtime_object_identity * _M_savedIdBuffer
Definition: agents.h:9955
void _Delete_stored_messages()
Deletes all messages currently stored in this message block. Should be called by the destructor to en...
Definition: agents.h:9884
void remove_network_links()
Removes all the source and target network links from this propagator_block object.
Definition: agents.h:5866

Member Function Documentation

template<class _Type , join_type _Jtype = non_greedy>
message<std::vector<_Type> >* __cdecl Concurrency::join< _Type, _Jtype >::_Create_new_message ( )
inlineprivate

Constructs a new message from the data output.

Returns
The created message (NULL if creation failed)
9746  {
9747  bool fIsNonGreedy = (_Jtype == non_greedy);
9748 
9749  // If this is a non-greedy join, check each source and try to consume their message
9750  if (fIsNonGreedy)
9751  {
9752 
9753  // The iterator _Iter below will ensure that it is safe to touch
9754  // non-NULL source pointers. Take a snapshot.
9755  std::vector<ISource<_Type> *> _Sources;
9756  source_iterator _Iter = this->_M_connectedSources.begin();
9757 
9758  while (*_Iter != NULL)
9759  {
9760  ISource<_Type> * _PSource = *_Iter;
9761 
9762  if (_PSource == NULL)
9763  {
9764  break;
9765  }
9766 
9767  _Sources.push_back(_PSource);
9768  ++_Iter;
9769  }
9770 
9771  if (_Sources.size() != _M_messageArray._M_count)
9772  {
9773  // Some of the sources were unlinked. The join is broken
9774  return NULL;
9775  }
9776 
9777  // First, try and reserve all the messages. If a reservation fails,
9778  // then release any reservations that had been made.
9779  for (size_t i = 0; i < _M_savedMessageIdArray._M_count; i++)
9780  {
9781  // Snap the current saved ID into a buffer. This value can be changing behind the scenes from
9782  // other source->propagate(msg, this) calls, but if so, that just means the reserve below will
9783  // fail.
9785  _M_savedIdBuffer[i] = _InterlockedExchange((volatile long *) &_M_savedMessageIdArray._M_savedIds[i], -1);
9786 
9788 
9789  if (!_Sources[i]->reserve(_M_savedIdBuffer[i], this))
9790  {
9791  // A reservation failed, release all reservations made up until
9792  // this block, and wait for another message to arrive on this link
9793  for (size_t j = 0; j < i; j++)
9794  {
9795  _Sources[j]->release(_M_savedIdBuffer[j], this);
9796  if (_InterlockedCompareExchange((volatile long *) &_M_savedMessageIdArray._M_savedIds[j], _M_savedIdBuffer[j], -1) == -1)
9797  {
9799  {
9800  this->async_send(NULL);
9801  }
9802  }
9803  }
9804 
9805  // Return NULL to indicate that the create failed
9806  return NULL;
9807  }
9808  }
9809 
9810  // Because everything has been reserved, consume all the messages.
9811  // This is guaranteed to return true.
9812  for (size_t i = 0; i < _M_messageArray._M_count; i++)
9813  {
9814  _M_messageArray._M_messages[i] = _Sources[i]->consume(_M_savedIdBuffer[i], this);
9815  _M_savedIdBuffer[i] = -1;
9816  }
9817  }
9818 
9819  if (!fIsNonGreedy)
9820  {
9821  // Reinitialize how many messages are being waited for.
9822  // This is safe because all messages have been received, thus no new async_sends for
9823  // greedy joins can be called.
9825  }
9826 
9827  std::vector<_Type> _OutputVector;
9828  for (size_t i = 0; i < _M_messageArray._M_count; i++)
9829  {
9831  _OutputVector.push_back(_M_messageArray._M_messages[i]->payload);
9832 
9833  delete _M_messageArray._M_messages[i];
9834  if (fIsNonGreedy)
9835  {
9837  }
9838  }
9839  return (new message<std::vector<_Type>>(_OutputVector));
9840  }
_MessageArray _M_messageArray
Definition: agents.h:9927
Non-greedy join messaging blocks postpone messages and try and consume them after all have arrived...
Definition: agents.h:9132
#define _CONCRT_ASSERT(x)
Definition: concrt.h:123
runtime_object_identity * _M_savedIds
Definition: agents.h:9938
size_t _M_count
Definition: agents.h:9937
#define _InterlockedDecrementSizeT(_Target)
Definition: concrt.h:97
int i[4]
Definition: dvec.h:68
message< _Type > ** _M_messages
Definition: agents.h:9911
_SourceLinkManager _M_connectedSources
The container for all the sources connected to this block.
Definition: agents.h:5884
volatile size_t _M_messagesRemaining
Definition: agents.h:9904
_SourceLinkManager::iterator source_iterator
The type of the iterator for the source_link_manager for this propagator_block.
Definition: agents.h:5597
runtime_object_identity * _M_savedIdBuffer
Definition: agents.h:9955
_Pre_maybenull_ _Inout_ _Deref_prepost_z_ wchar_t const _PSource
Definition: wchar.h:148
size_t _M_count
Definition: agents.h:9910
_SavedMessageIdArray _M_savedMessageIdArray
Definition: agents.h:9952
virtual bool reserve(runtime_object_identity _MsgId, _Inout_ ITarget< _Target_type > *_PTarget)
Reserves a message previously offered by this source_block object.
Definition: agents.h:5052
long __cdecl _InterlockedCompareExchange(long volatile *, long, long)
virtual void async_send(_Inout_opt_ message< _Target_type > *_Msg)
Asynchronously queues up messages and starts a propagation task, if this has not been done already ...
Definition: agents.h:5435
#define NULL
Definition: corecrt.h:158
#define _InterlockedIncrementSizeT(_Target)
Definition: concrt.h:96
template<class _Type , join_type _Jtype = non_greedy>
void Concurrency::join< _Type, _Jtype >::_Delete_stored_messages ( )
inlineprivate

Deletes all messages currently stored in this message block. Should be called by the destructor to ensure any messages propagated in are cleaned up.

9885  {
9886  // Input messages for this message block are in the base-class input buffer
9887  // All messages in that buffer are guaranteed to have moved to the output
9888  // buffer because the destructor first waits for all async sends to finish
9889  // before reaching this point
9890 
9891  // Delete any messages remaining in the output queue
9892  for (;;)
9893  {
9894  message<std::vector<_Type>> * _Msg = _M_messageBuffer._Dequeue();
9895  if (_Msg == NULL)
9896  {
9897  break;
9898  }
9899  delete _Msg;
9900  }
9901  }
::Concurrency::details::_Queue< message< std::vector< _Type > > > _M_messageBuffer
Definition: agents.h:9961
#define NULL
Definition: corecrt.h:158
template<class _Type , join_type _Jtype = non_greedy>
void Concurrency::join< _Type, _Jtype >::_Initialize ( size_t  _NumInputs,
Scheduler *  _PScheduler = NULL,
ScheduleGroup *  _PScheduleGroup = NULL 
)
inlineprivate

Initializes the join messaging block.

Parameters
_NumInputsThe number of inputs.
_PSchedulerThe scheduler onto which the task to propagate the join block's message will be scheduled. If unspecified, the join messaging block uses the default scheduler.
_PScheduleGroupThe schedule group into which the task to propagate the join block's message will be scheduled. The scheduler used is implied by the schedule group. If unspecified, the join uses a schedule group of the scheduler's choosing.
9859  {
9860  this->initialize_source_and_target(_PScheduler, _PScheduleGroup);
9861 
9862  this->_M_connectedSources.set_bound(_NumInputs);
9863  _M_messagesRemaining = _NumInputs;
9864 
9865  bool fIsNonGreedy = (_Jtype == non_greedy);
9866 
9867  if (fIsNonGreedy)
9868  {
9869  // Non greedy joins need a buffer to snap off saved message IDs to.
9870  _M_savedIdBuffer = new runtime_object_identity[_NumInputs];
9871  memset(_M_savedIdBuffer, -1, sizeof(runtime_object_identity) * _NumInputs);
9872  }
9873  else
9874  {
9876  }
9877  }
Non-greedy join messaging blocks postpone messages and try and consume them after all have arrived...
Definition: agents.h:9132
__int32 runtime_object_identity
Each message instance has an identity that follows it as it is cloned and passed between messaging co...
Definition: agents.h:51
_SourceLinkManager _M_connectedSources
The container for all the sources connected to this block.
Definition: agents.h:5884
volatile size_t _M_messagesRemaining
Definition: agents.h:9904
runtime_object_identity * _M_savedIdBuffer
Definition: agents.h:9955
void initialize_source_and_target(_Inout_opt_ Scheduler *_PScheduler=NULL, _Inout_opt_ ScheduleGroup *_PScheduleGroup=NULL)
Initializes the base object. Specifically, the message_processor object needs to be initialized...
Definition: agents.h:5827
#define NULL
Definition: corecrt.h:158
template<class _Type , join_type _Jtype = non_greedy>
void Concurrency::join< _Type, _Jtype >::_Propagate_priority_order ( ::Concurrency::details::_Queue< message< _Target_type >> &  _MessageBuffer)
inlineprivate

Propagate messages in priority order.

Parameters
_MessageBufferReference to a message queue with messages to be propagated
9691  {
9692  message<_Target_type> * _Msg = _MessageBuffer._Peek();
9693 
9694  // If someone has reserved the _Head message, don't propagate anymore
9695  if (this->_M_pReservedFor != NULL)
9696  {
9697  return;
9698  }
9699 
9700  while (_Msg != NULL)
9701  {
9702  message_status _Status = declined;
9703 
9704  // Always start from the first target that linked
9705  for (target_iterator _Iter = this->_M_connectedTargets.begin(); *_Iter != NULL; ++_Iter)
9706  {
9707  ITarget<_Target_type> * _PTarget = *_Iter;
9708  _Status = _PTarget->propagate(_Msg, this);
9709 
9710  // Ownership of message changed. Do not propagate this
9711  // message to any other target.
9712  if (_Status == accepted)
9713  {
9714  break;
9715  }
9716 
9717  // If the target just propagated to reserved this message, stop
9718  // propagating it to others
9719  if (this->_M_pReservedFor != NULL)
9720  {
9721  break;
9722  }
9723  }
9724 
9725  // If status is anything other than accepted, then the head message
9726  // was not propagated out. Thus, nothing after it in the queue can
9727  // be propagated out. Cease propagation.
9728  if (_Status != accepted)
9729  {
9730  break;
9731  }
9732 
9733  // Get the next message
9734  _Msg = _MessageBuffer._Peek();
9735  }
9736  }
_Message * _Peek()
Definition: agents.h:227
single_link_registry< ITarget< std::vector< _Type > > >::iterator target_iterator
The iterator to walk the connected targets.
Definition: agents.h:4893
single_link_registry< ITarget< std::vector< _Type > > > _M_connectedTargets
Connected targets
Definition: agents.h:5485
The target did not accept the message.
Definition: agents.h:1751
message_status
The valid responses for an offer of a message object to a block.
Definition: agents.h:1740
The target accepted the message.
Definition: agents.h:1746
ITarget< _Target_type > * _M_pReservedFor
Connected target that is holding a reservation
Definition: agents.h:5473
#define NULL
Definition: corecrt.h:158
template<class _Type , join_type _Jtype = non_greedy>
virtual message<_OutputType>* Concurrency::join< _Type, _Jtype >::accept_message ( runtime_object_identity  _MsgId)
inlineprotectedvirtual

Accepts a message that was offered by this join messaging block, transferring ownership to the caller.

Parameters
_MsgIdThe runtime_object_identity of the offered message object.
Returns
A pointer to the message object that the caller now has ownership of.

Implements Concurrency::source_block< single_link_registry< ITarget< std::vector< _Type > > >, _MessageProcessorType >.

9473  {
9474  //
9475  // Peek at the head message in the message buffer. If the IDs match
9476  // dequeue and transfer ownership
9477  //
9478  message<_OutputType> * _Msg = NULL;
9479 
9480  if (_M_messageBuffer._Is_head(_MsgId))
9481  {
9482  _Msg = _M_messageBuffer._Dequeue();
9483  }
9484 
9485  return _Msg;
9486  }
::Concurrency::details::_Queue< message< std::vector< _Type > > > _M_messageBuffer
Definition: agents.h:9961
#define NULL
Definition: corecrt.h:158
template<class _Type , join_type _Jtype = non_greedy>
virtual message<_OutputType>* Concurrency::join< _Type, _Jtype >::consume_message ( runtime_object_identity  _MsgId)
inlineprotectedvirtual

Consumes a message previously offered by the join messaging block and reserved by the target, transferring ownership to the caller.

Parameters
_MsgIdThe runtime_object_identity of the message object being consumed.
Returns
A pointer to the message object that the caller now has ownership of.

Similar to accept, but is always preceded by a call to reserve.

Implements Concurrency::source_block< single_link_registry< ITarget< std::vector< _Type > > >, _MessageProcessorType >.

9523  {
9524  // By default, accept the message
9525  return accept_message(_MsgId);
9526  }
virtual message< _OutputType > * accept_message(runtime_object_identity _MsgId)
Accepts a message that was offered by this join messaging block, transferring ownership to the caller...
Definition: agents.h:9472
template<class _Type , join_type _Jtype = non_greedy>
virtual void Concurrency::join< _Type, _Jtype >::link_target_notification ( _Inout_ ITarget< std::vector< _Type >> *  )
inlineprotectedvirtual

A callback that notifies that a new target has been linked to this join messaging block.

Parameters
_PTargetA pointer to the newly linked target.
9565  {
9566  // If the message queue is blocked due to reservation
9567  // there is no need to do any message propagation
9568  if (this->_M_pReservedFor != NULL)
9569  {
9570  return;
9571  }
9572 
9574  }
void _Propagate_priority_order(::Concurrency::details::_Queue< message< _Target_type >> &_MessageBuffer)
Propagate messages in priority order.
Definition: agents.h:9690
::Concurrency::details::_Queue< message< std::vector< _Type > > > _M_messageBuffer
Definition: agents.h:9961
ITarget< _Target_type > * _M_pReservedFor
Connected target that is holding a reservation
Definition: agents.h:5473
#define NULL
Definition: corecrt.h:158
template<class _Type , join_type _Jtype = non_greedy>
message_status Concurrency::join< _Type, _Jtype >::propagate_message ( _Inout_ message< _Type > *  _PMessage,
_Inout_ ISource< _Type > *  _PSource 
)
inlineprotected

Asynchronously passes a message from an ISource block to this join messaging block. It is invoked by the propagate method, when called by a source block.

Parameters
_PMessageA pointer to the message object.
_PSourceA pointer to the source block offering the message.
Returns
A message_status indication of what the target decided to do with the message.
9370  {
9371  // It is important that calls to propagate do *not* take the same lock on the
9372  // internal structure that is used by Consume and the LWT. Doing so could
9373  // result in a deadlock with the Consume call.
9374 
9375  message_status _Ret_val = accepted;
9376 
9377  //
9378  // Find the slot index of this source
9379  //
9380  size_t _Slot = 0;
9381  bool _Found = false;
9382  for (source_iterator _Iter = this->_M_connectedSources.begin(); *_Iter != NULL; ++_Iter)
9383  {
9384  if (*_Iter == _PSource)
9385  {
9386  _Found = true;
9387  break;
9388  }
9389 
9390  _Slot++;
9391  }
9392 
9393  if (!_Found)
9394  {
9395  // If this source was not found in the array, this is not a connected source
9396  // decline the message
9397  return declined;
9398  }
9399 
9401 
9402  bool fIsGreedy = (_Jtype == greedy);
9403 
9404  if (fIsGreedy)
9405  {
9406  //
9407  // Greedy type joins immediately accept the message.
9408  //
9409  {
9410  _NR_lock lockHolder(_M_propagationLock);
9411  if (_M_messageArray._M_messages[_Slot] != NULL)
9412  {
9413  _M_savedMessageIdArray._M_savedIds[_Slot] = _PMessage->msg_id();
9414  _Ret_val = postponed;
9415  }
9416  }
9417 
9418  if (_Ret_val != postponed)
9419  {
9420  _M_messageArray._M_messages[_Slot] = _PSource->accept(_PMessage->msg_id(), this);
9421 
9422  if (_M_messageArray._M_messages[_Slot] != NULL)
9423  {
9425  {
9426  // If messages have arrived on all links, start a propagation
9427  // of the current message
9428  this->async_send(NULL);
9429  }
9430  }
9431  else
9432  {
9433  _Ret_val = missed;
9434  }
9435  }
9436  }
9437  else
9438  {
9439  //
9440  // Non-greedy type joins save the message IDs until they have all arrived
9441  //
9442 
9443  if (_InterlockedExchange((volatile long *) &_M_savedMessageIdArray._M_savedIds[_Slot], _PMessage->msg_id()) == -1)
9444  {
9445  // Decrement the message remaining count if this thread is switching
9446  // the saved ID from -1 to a valid value.
9448  {
9449  this->async_send(NULL);
9450  }
9451  }
9452 
9453  // Always return postponed. This message will be consumed
9454  // in the LWT
9455  _Ret_val = postponed;
9456  }
9457 
9458  return _Ret_val;
9459  }
_MessageArray _M_messageArray
Definition: agents.h:9927
#define _CONCRT_ASSERT(x)
Definition: concrt.h:123
The target did not accept the message.
Definition: agents.h:1751
The target postponed the message.
Definition: agents.h:1756
runtime_object_identity * _M_savedIds
Definition: agents.h:9938
#define _InterlockedDecrementSizeT(_Target)
Definition: concrt.h:97
message_status
The valid responses for an offer of a message object to a block.
Definition: agents.h:1740
The target tried to accept the message, but it was no longer available.
Definition: agents.h:1761
message< _Type > ** _M_messages
Definition: agents.h:9911
The target accepted the message.
Definition: agents.h:1746
::Concurrency::details::_NonReentrantPPLLock::_Scoped_lock _NR_lock
A lock holder that acquires a non-reentrant lock on instantiation and releases it on destruction...
Definition: agents.h:58
_SourceLinkManager _M_connectedSources
The container for all the sources connected to this block.
Definition: agents.h:5884
volatile size_t _M_messagesRemaining
Definition: agents.h:9904
_SourceLinkManager::iterator source_iterator
The type of the iterator for the source_link_manager for this propagator_block.
Definition: agents.h:5597
_Pre_maybenull_ _Inout_ _Deref_prepost_z_ wchar_t const _PSource
Definition: wchar.h:148
size_t _M_count
Definition: agents.h:9910
_SavedMessageIdArray _M_savedMessageIdArray
Definition: agents.h:9952
::Concurrency::details::_NonReentrantPPLLock _M_propagationLock
Definition: agents.h:9958
Greedy join messaging blocks immediately accept a message upon propagation. This is more efficient...
Definition: agents.h:9126
virtual void async_send(_Inout_opt_ message< _Target_type > *_Msg)
Asynchronously queues up messages and starts a propagation task, if this has not been done already ...
Definition: agents.h:5435
#define NULL
Definition: corecrt.h:158
template<class _Type , join_type _Jtype = non_greedy>
void Concurrency::join< _Type, _Jtype >::propagate_to_any_targets ( _Inout_opt_ message< _OutputType > *  )
inlineprotected

Constructs an output message containing an input message from each source when they have all propagated a message. Sends this output message out to each of its targets.

9583  {
9584  message<_OutputType> * _Msg = NULL;
9585  // Create a new message from the input sources
9586  // If messagesRemaining == 0, we have a new message to create. Otherwise, this is coming from
9587  // a consume or release from the target. In that case we don't want to create a new message.
9588  if (_M_messagesRemaining == 0)
9589  {
9590  // A greedy join can immediately create the message, a non-greedy
9591  // join must try and consume all the messages it has postponed
9592  _Msg = _Create_new_message();
9593  }
9594 
9595  if (_Msg == NULL)
9596  {
9597  // Create message failed. This happens in non_greedy joins when the
9598  // reserve/consumption of a postponed message failed.
9600  return;
9601  }
9602 
9603  bool fIsGreedy = (_Jtype == greedy);
9604 
9605  // For a greedy join, reset the number of messages remaining
9606  // Check to see if multiple messages have been passed in on any of the links,
9607  // and postponed. If so, try and reserve/consume them now
9608  if (fIsGreedy)
9609  {
9610  // Look at the saved IDs and reserve/consume any that have passed in while
9611  // this join was waiting to complete
9613 
9614  for (size_t i = 0; i < _M_messageArray._M_count; i++)
9615  {
9616  for(;;)
9617  {
9618  runtime_object_identity _Saved_id;
9619  // Grab the current saved ID value. This value could be changing from based on any
9620  // calls of source->propagate(this). If the message ID is different than what is snapped
9621  // here, that means, the reserve below must fail. This is because reserve is trying
9622  // to get the same source lock the propagate(this) call must be holding.
9623  {
9624  _NR_lock lockHolder(_M_propagationLock);
9625 
9627 
9628  _Saved_id = _M_savedMessageIdArray._M_savedIds[i];
9629 
9630  if (_Saved_id == -1)
9631  {
9633  break;
9634  }
9635  else
9636  {
9638  }
9639  }
9640 
9641  if (_Saved_id != -1)
9642  {
9643  source_iterator _Iter = this->_M_connectedSources.begin();
9644 
9645  ISource<_Type> * _PSource = _Iter[i];
9646  if ((_PSource != NULL) && _PSource->reserve(_Saved_id, this))
9647  {
9648  _M_messageArray._M_messages[i] = _PSource->consume(_Saved_id, this);
9650  break;
9651  }
9652  }
9653  }
9654  }
9655 
9656  // If messages have all been received, async_send again, this will start the
9657  // LWT up to create a new message
9658  if (_M_messagesRemaining == 0)
9659  {
9660  this->async_send(NULL);
9661  }
9662  }
9663 
9664  // Add the new message to the outbound queue
9665  _M_messageBuffer._Enqueue(_Msg);
9666 
9667  if (!_M_messageBuffer._Is_head(_Msg->msg_id()))
9668  {
9669  // another message is at the head of the outbound message queue and blocked
9670  // simply return
9671  return;
9672  }
9673 
9675  }
void _Propagate_priority_order(::Concurrency::details::_Queue< message< _Target_type >> &_MessageBuffer)
Propagate messages in priority order.
Definition: agents.h:9690
_MessageArray _M_messageArray
Definition: agents.h:9927
#define _CONCRT_ASSERT(x)
Definition: concrt.h:123
__int32 runtime_object_identity
Each message instance has an identity that follows it as it is cloned and passed between messaging co...
Definition: agents.h:51
message< std::vector< _Type > > *__cdecl _Create_new_message()
Constructs a new message from the data output.
Definition: agents.h:9745
runtime_object_identity * _M_savedIds
Definition: agents.h:9938
size_t _M_count
Definition: agents.h:9937
#define _InterlockedDecrementSizeT(_Target)
Definition: concrt.h:97
int i[4]
Definition: dvec.h:68
::Concurrency::details::_Queue< message< std::vector< _Type > > > _M_messageBuffer
Definition: agents.h:9961
message< _Type > ** _M_messages
Definition: agents.h:9911
::Concurrency::details::_NonReentrantPPLLock::_Scoped_lock _NR_lock
A lock holder that acquires a non-reentrant lock on instantiation and releases it on destruction...
Definition: agents.h:58
_SourceLinkManager _M_connectedSources
The container for all the sources connected to this block.
Definition: agents.h:5884
volatile size_t _M_messagesRemaining
Definition: agents.h:9904
_SourceLinkManager::iterator source_iterator
The type of the iterator for the source_link_manager for this propagator_block.
Definition: agents.h:5597
_Pre_maybenull_ _Inout_ _Deref_prepost_z_ wchar_t const _PSource
Definition: wchar.h:148
size_t _M_count
Definition: agents.h:9910
_SavedMessageIdArray _M_savedMessageIdArray
Definition: agents.h:9952
::Concurrency::details::_NonReentrantPPLLock _M_propagationLock
Definition: agents.h:9958
Greedy join messaging blocks immediately accept a message upon propagation. This is more efficient...
Definition: agents.h:9126
virtual void async_send(_Inout_opt_ message< _Target_type > *_Msg)
Asynchronously queues up messages and starts a propagation task, if this has not been done already ...
Definition: agents.h:5435
#define NULL
Definition: corecrt.h:158
template<class _Type , join_type _Jtype = non_greedy>
virtual void Concurrency::join< _Type, _Jtype >::release_message ( runtime_object_identity  _MsgId)
inlineprotectedvirtual

Releases a previous message reservation.

Parameters
_MsgIdThe runtime_object_identity of the message object being released.

Implements Concurrency::source_block< single_link_registry< ITarget< std::vector< _Type > > >, _MessageProcessorType >.

9536  {
9537  // The head message is the one reserved.
9538  if (!_M_messageBuffer._Is_head(_MsgId))
9539  {
9540  throw message_not_found();
9541  }
9542  }
::Concurrency::details::_Queue< message< std::vector< _Type > > > _M_messageBuffer
Definition: agents.h:9961
template<class _Type , join_type _Jtype = non_greedy>
virtual bool Concurrency::join< _Type, _Jtype >::reserve_message ( runtime_object_identity  _MsgId)
inlineprotectedvirtual

Reserves a message previously offered by this join messaging block.

Parameters
_MsgIdThe runtime_object_identity of the offered message object.
Returns
true if the message was successfully reserved, false otherwise.

After reserve is called, if it returns true, either consume or release must be called to either take or release ownership of the message.

Implements Concurrency::source_block< single_link_registry< ITarget< std::vector< _Type > > >, _MessageProcessorType >.

9503  {
9504  // Allow reservation if this is the head message
9505  return _M_messageBuffer._Is_head(_MsgId);
9506  }
::Concurrency::details::_Queue< message< std::vector< _Type > > > _M_messageBuffer
Definition: agents.h:9961
template<class _Type , join_type _Jtype = non_greedy>
virtual void Concurrency::join< _Type, _Jtype >::resume_propagation ( )
inlineprotectedvirtual

Resumes propagation after a reservation has been released.

Implements Concurrency::source_block< single_link_registry< ITarget< std::vector< _Type > > >, _MessageProcessorType >.

9549  {
9550  // If there are any messages in the buffer, propagate them out
9551  if (_M_messageBuffer._Count() > 0)
9552  {
9553  this->async_send(NULL);
9554  }
9555  }
::Concurrency::details::_Queue< message< std::vector< _Type > > > _M_messageBuffer
Definition: agents.h:9961
virtual void async_send(_Inout_opt_ message< _Target_type > *_Msg)
Asynchronously queues up messages and starts a propagation task, if this has not been done already ...
Definition: agents.h:5435
#define NULL
Definition: corecrt.h:158

Member Data Documentation

template<class _Type , join_type _Jtype = non_greedy>
_MessageArray Concurrency::join< _Type, _Jtype >::_M_messageArray
private
template<class _Type , join_type _Jtype = non_greedy>
::Concurrency::details::_Queue<message<std::vector<_Type> > > Concurrency::join< _Type, _Jtype >::_M_messageBuffer
private
template<class _Type , join_type _Jtype = non_greedy>
volatile size_t Concurrency::join< _Type, _Jtype >::_M_messagesRemaining
private
template<class _Type , join_type _Jtype = non_greedy>
::Concurrency::details::_NonReentrantPPLLock Concurrency::join< _Type, _Jtype >::_M_propagationLock
private
template<class _Type , join_type _Jtype = non_greedy>
runtime_object_identity* Concurrency::join< _Type, _Jtype >::_M_savedIdBuffer
private
template<class _Type , join_type _Jtype = non_greedy>
_SavedMessageIdArray Concurrency::join< _Type, _Jtype >::_M_savedMessageIdArray
private

The documentation for this class was generated from the following file: