Concurrency::_Join_node< _Type, _Destination_type, _Jtype > Class Template Reference

Defines a block allowing sources of distinct types to be joined. A join node is a single-target, multi-source ordered propagator block.

#include <agents.h>

Inheritance diagram for Concurrency::_Join_node< _Type, _Destination_type, _Jtype >:
Concurrency::propagator_block< single_link_registry< ITarget< _Destination_type > >, multi_link_registry< ISource< size_t > > >
Concurrency::source_block< single_link_registry< ITarget< _Destination_type > >, _MessageProcessorType >
Concurrency::ITarget< multi_link_registry< ISource< size_t > >::type::source_type >
Concurrency::ISource< single_link_registry< ITarget< _Destination_type > >::type::type >

Public Member Functions

 _Join_node ()
 Constructs a join within the default scheduler, and places it on any schedule group of the scheduler's choosing.
 
 _Join_node (Scheduler &_PScheduler)
 Constructs a join within the specified scheduler, and places it on any schedule group of the scheduler's choosing.
 
 _Join_node (ScheduleGroup &_PScheduleGroup)
 Constructs a join within the specified schedule group. The scheduler is implied by the schedule group.
 
 ~_Join_node ()
 Cleans up any resources that may have been created by the join.
 
- Public Member Functions inherited from Concurrency::propagator_block< single_link_registry< ITarget< _Destination_type > >, multi_link_registry< ISource< size_t > > >
 propagator_block ()
 Constructs a propagator_block object.
 
virtual ~propagator_block ()
 Destroys a propagator_block object.
 
virtual message_status propagate (_Inout_opt_ message< _Source_type > *_PMessage, _Inout_opt_ ISource< _Source_type > *_PSource)
 Asynchronously passes a message from a source block to this target block.
 
virtual message_status send (_Inout_ message< _Source_type > *_PMessage, _Inout_ ISource< _Source_type > *_PSource)
 Synchronously initiates a message to this block. Called by an ISource block. When this function completes, the message will already have propagated into the block.
 
- Public Member Functions inherited from Concurrency::source_block< single_link_registry< ITarget< _Destination_type > >, _MessageProcessorType >
 source_block ()
 Constructs a source_block object.
 
virtual ~source_block ()
 Destroys the source_block object.
 
virtual void link_target (_Inout_ ITarget< _Target_type > *_PTarget)
 Links a target block to this source_block object.
 
virtual void unlink_target (_Inout_ ITarget< _Target_type > *_PTarget)
 Unlinks a target block from this source_block object.
 
virtual void unlink_targets ()
 Unlinks all target blocks from this source_block object.
 
virtual message< _Target_type > * accept (runtime_object_identity _MsgId, _Inout_ ITarget< _Target_type > *_PTarget)
 Accepts a message that was offered by this source_block object, transferring ownership to the caller.
 
virtual bool reserve (runtime_object_identity _MsgId, _Inout_ ITarget< _Target_type > *_PTarget)
 Reserves a message previously offered by this source_block object.
 
virtual message< _Target_type > * consume (runtime_object_identity _MsgId, _Inout_ ITarget< _Target_type > *_PTarget)
 Consumes a message previously offered by this source_block object and successfully reserved by the target, transferring ownership to the caller.
 
virtual void release (runtime_object_identity _MsgId, _Inout_ ITarget< _Target_type > *_PTarget)
 Releases a previous successful message reservation.
 
virtual void acquire_ref (_Inout_ ITarget< _Target_type > *)
 Acquires a reference count on this source_block object, to prevent deletion.
 
virtual void release_ref (_Inout_ ITarget< _Target_type > *_PTarget)
 Releases a reference count on this source_block object.
 
- Public Member Functions inherited from Concurrency::ISource< single_link_registry< ITarget< _Destination_type > >::type::type >
virtual ~ISource ()
 Destroys the ISource object.
 
virtual void link_target (_Inout_ ITarget< single_link_registry< ITarget< _Destination_type > >::type::type > *_PTarget)=0
 When overridden in a derived class, links a target block to this ISource block.
 
virtual void unlink_target (_Inout_ ITarget< single_link_registry< ITarget< _Destination_type > >::type::type > *_PTarget)=0
 When overridden in a derived class, unlinks a target block from this ISource block, if found to be previously linked.
 
virtual message< single_link_registry< ITarget< _Destination_type > >::type::type > * accept (runtime_object_identity _MsgId, _Inout_ ITarget< single_link_registry< ITarget< _Destination_type > >::type::type > *_PTarget)=0
 When overridden in a derived class, accepts a message that was offered by this ISource block, transferring ownership to the caller.
 
virtual bool reserve (runtime_object_identity _MsgId, _Inout_ ITarget< single_link_registry< ITarget< _Destination_type > >::type::type > *_PTarget)=0
 When overridden in a derived class, reserves a message previously offered by this ISource block.
 
virtual message< single_link_registry< ITarget< _Destination_type > >::type::type > * consume (runtime_object_identity _MsgId, _Inout_ ITarget< single_link_registry< ITarget< _Destination_type > >::type::type > *_PTarget)=0
 When overridden in a derived class, consumes a message previously offered by this ISource block and successfully reserved by the target, transferring ownership to the caller.
 
virtual void release (runtime_object_identity _MsgId, _Inout_ ITarget< single_link_registry< ITarget< _Destination_type > >::type::type > *_PTarget)=0
 When overridden in a derived class, releases a previous successful message reservation.
 
virtual void acquire_ref (_Inout_ ITarget< single_link_registry< ITarget< _Destination_type > >::type::type > *_PTarget)=0
 When overridden in a derived class, acquires a reference count on this ISource block, to prevent deletion.
 
virtual void release_ref (_Inout_ ITarget< single_link_registry< ITarget< _Destination_type > >::type::type > *_PTarget)=0
 When overridden in a derived class, releases a reference count on this ISource block.
 
- Public Member Functions inherited from Concurrency::ITarget< multi_link_registry< ISource< size_t > >::type::source_type >
virtual ~ITarget ()
 Destroys the ITarget object.
 
virtual message_status propagate (_Inout_opt_ message< multi_link_registry< ISource< size_t > >::type::source_type > *_PMessage, _Inout_opt_ ISource< multi_link_registry< ISource< size_t > >::type::source_type > *_PSource)=0
 When overridden in a derived class, asynchronously passes a message from a source block to this target block.
 
virtual message_status send (_Inout_ message< multi_link_registry< ISource< size_t > >::type::source_type > *_PMessage, _Inout_ ISource< multi_link_registry< ISource< size_t > >::type::source_type > *_PSource)=0
 When overridden in a derived class, synchronously passes a message to the target block.
 
virtual bool supports_anonymous_source ()
 When overridden in a derived class, returns true or false depending on whether the message block accepts messages offered by a source that is not linked to it. If the overridden method returns true, the target cannot postpone an offered message, as consumption of a postponed message at a later time requires the source to be identified in its source link registry.
 

Protected Member Functions

virtual message_status propagate_message (message< size_t > *_PMessage, ISource< size_t > *_PSource)
 Asynchronously passes a message from an ISource block to this ITarget block. It is invoked by the propagate method, when called by a source block.
 
virtual message< _Destination_type > * accept_message (runtime_object_identity _MsgId)
 Accepts a message offered by the source, transferring ownership to the caller.
 
virtual bool reserve_message (runtime_object_identity _MsgId)
 Reserves a message previously offered by the source.
 
virtual message< _Destination_type > * consume_message (runtime_object_identity _MsgId)
 Consumes a message previously offered by the source and reserved by the target, transferring ownership to the caller.
 
virtual void release_message (runtime_object_identity _MsgId)
 Releases a previous message reservation.
 
virtual void resume_propagation ()
 Resumes propagation after a reservation has been released.
 
virtual void link_target_notification (_Inout_ ITarget< _Destination_type > *)
 Notification that a target was linked to this source.
 
virtual void propagate_to_any_targets (_Inout_opt_ message< _Destination_type > *)
 Takes the message and propagates it to all the targets of this join block.
 
- Protected Member Functions inherited from Concurrency::propagator_block< single_link_registry< ITarget< _Destination_type > >, multi_link_registry< ISource< size_t > > >
virtual message_status propagate_message (_Inout_ message< _Source_type > *_PMessage, _Inout_ ISource< _Source_type > *_PSource)=0
 When overridden in a derived class, this method asynchronously passes a message from an ISource block to this propagator_block object. It is invoked by the propagate method, when called by a source block.
 
virtual message_status send_message (_Inout_ message< _Source_type > *, _Inout_ ISource< _Source_type > *)
 When overridden in a derived class, this method synchronously passes a message from an ISource block to this propagator_block object. It is invoked by the send method, when called by a source block.
 
virtual void link_source (_Inout_ ISource< _Source_type > *_PSource)
 Links a specified source block to this propagator_block object.
 
virtual void unlink_source (_Inout_ ISource< _Source_type > *_PSource)
 Unlinks a specified source block from this propagator_block object.
 
virtual void unlink_sources ()
 Unlinks all source blocks from this propagator_block object.
 
virtual void process_input_messages (_Inout_ message< _Target_type > *)
 Processes input messages. This is only useful for propagator blocks, which derive from source_block.
 
void initialize_source_and_target (_Inout_opt_ Scheduler *_PScheduler=NULL, _Inout_opt_ ScheduleGroup *_PScheduleGroup=NULL)
 Initializes the base object. Specifically, the message_processor object needs to be initialized.
 
void register_filter (filter_method const &_Filter)
 Registers a filter method that will be invoked on every received message.
 
void decline_incoming_messages ()
 Indicates to the block that new messages should be declined.
 
void remove_network_links ()
 Removes all the source and target network links from this propagator_block object.
 
- Protected Member Functions inherited from Concurrency::source_block< single_link_registry< ITarget< _Destination_type > >, _MessageProcessorType >
virtual void link_target_notification (_Inout_ ITarget< _Target_type > *)
 A callback that notifies that a new target has been linked to this source_block object.
 
virtual void unlink_target_notification (_Inout_ ITarget< _Target_type > *_PTarget)
 A callback that notifies that a target has been unlinked from this source_block object.
 
virtual void propagate_output_messages ()
 Propagates messages to targets.
 
virtual void propagate_to_any_targets (_Inout_opt_ message< _Target_type > *_PMessage)
 When overridden in a derived class, propagates the given message to any or all of the linked targets. This is the main propagation routine for message blocks.
 
void initialize_source (_Inout_opt_ Scheduler *_PScheduler=NULL, _Inout_opt_ ScheduleGroup *_PScheduleGroup=NULL)
 Initializes the message_propagator within this source_block.
 
void enable_batched_processing ()
 Enables batched processing for this block.
 
virtual void sync_send (_Inout_opt_ message< _Target_type > *_Msg)
 Synchronously queues up messages and starts a propagation task, if this has not been done already.
 
virtual void async_send (_Inout_opt_ message< _Target_type > *_Msg)
 Asynchronously queues up messages and starts a propagation task, if this has not been done already.
 
void wait_for_outstanding_async_sends ()
 Waits for all asynchronous propagations to complete. This propagator-specific spin wait is used in destructors of message blocks to make sure that all asynchronous propagations have time to finish before destroying the block.
 
void remove_targets ()
 Removes all target links for this source block. This should be called from the destructor.
 
- Protected Member Functions inherited from Concurrency::ISource< single_link_registry< ITarget< _Destination_type > >::type::type >
void _Invoke_link_source (ITarget< single_link_registry< ITarget< _Destination_type > >::type::type > *_PLinkFrom)
 Links this source to a target.
 
void _Invoke_unlink_source (ITarget< single_link_registry< ITarget< _Destination_type > >::type::type > *_PUnlinkFrom)
 Unlinks this source from a target.
 
- Protected Member Functions inherited from Concurrency::ITarget< multi_link_registry< ISource< size_t > >::type::source_type >
virtual void link_source (_Inout_ ISource< multi_link_registry< ISource< size_t > >::type::source_type > *_PSource)=0
 When overridden in a derived class, links a specified source block to this ITarget block.
 
virtual void unlink_source (_Inout_ ISource< multi_link_registry< ISource< size_t > >::type::source_type > *_PSource)=0
 When overridden in a derived class, unlinks a specified source block from this ITarget block.
 

Private Types

typedef single_link_registry< ITarget< _Destination_type > > _TargetLinkRegistry
 
typedef multi_link_registry< ISource< size_t > > _SourceLinkRegistry
 

Private Member Functions

template<int _Index>
bool _Try_consume_source_messages (_Destination_type &_Destination_tuple, ISource< size_t > **_Sources)
 Tries to reserve from all sources. If successful, it will consume all the messages.
 
template<>
bool _Try_consume_source_messages (_Destination_type &, ISource< size_t > **)
 Provides a sentinel template specialization for _Try_consume_source_messages recursive template expansion.
 
bool _Non_greedy_acquire_messages ()
 Tries to acquire all of the messages from the _Non_greedy_nodes. Each node has already indicated that it has received a message that it can try to reserve. This function starts the reservation and consume process.
 
void _Propagate_priority_order (::Concurrency::details::_Queue< message< _Target_type >> &_MessageBuffer)
 Propagates messages in priority order.
 
message< _Destination_type > * _Create_send_message ()
 Called when all the source messaging blocks have received their messages. The payloads are copied into a local tuple and then packaged into a message to be propagated: _M_pSendMessage.
 
void _Delete_stored_messages ()
 Deletes all messages currently stored in this message block. Should be called by the destructor to ensure any messages propagated in are cleaned up.
 
template<int _Index>
void _Populate_destination_tuple (_Destination_type &_Destination_tuple, ISource< size_t > **_Sources)
 Copies payloads from all sources to destination tuple.
 
template<>
void _Populate_destination_tuple (_Destination_type &, ISource< size_t > **)
 Provides a sentinel template specialization for _Populate_destination_tuple recursive template expansion.
 
 _Join_node (const _Join_node &_Join)
 
_Join_node const & operator= (_Join_node const &)
 

Private Attributes

_Type _M_sourceTuple
 
volatile long _M_counter
 
::Concurrency::details::_Queue< message< _Destination_type > > _M_messageBuffer
 

Additional Inherited Members

- Public Types inherited from Concurrency::propagator_block< single_link_registry< ITarget< _Destination_type > >, multi_link_registry< ISource< size_t > > >
typedef multi_link_registry< ISource< size_t > >::type::source_type _Source_type
 The type of the payload for the incoming message to this propagator_block.
 
typedef source_link_manager< multi_link_registry< ISource< size_t > > > _SourceLinkManager
 The type of the source_link_manager for this propagator_block.
 
typedef _SourceLinkManager::iterator source_iterator
 The type of the iterator for the source_link_manager for this propagator_block.
 
- Public Types inherited from Concurrency::source_block< single_link_registry< ITarget< _Destination_type > >, _MessageProcessorType >
typedef single_link_registry< ITarget< _Destination_type > >::type::type _Target_type
 The payload type of messages handled by this source_block.
 
typedef single_link_registry< ITarget< _Destination_type > >::iterator target_iterator
 The iterator to walk the connected targets.
 
- Public Types inherited from Concurrency::ISource< single_link_registry< ITarget< _Destination_type > >::type::type >
typedef single_link_registry< ITarget< _Destination_type > >::type::type source_type
 A type alias for _Type.
 
- Public Types inherited from Concurrency::ITarget< multi_link_registry< ISource< size_t > >::type::source_type >
typedef multi_link_registry< ISource< size_t > >::type::source_type type
 A type alias for _Type.
 
typedef std::function< bool(multi_link_registry< ISource< size_t > >::type::source_type const &)> filter_method
 The signature of any method used by the block that returns a bool value to determine whether an offered message should be accepted.
 
- Protected Attributes inherited from Concurrency::propagator_block< single_link_registry< ITarget< _Destination_type > >, multi_link_registry< ISource< size_t > > >
_SourceLinkManager _M_connectedSources
 The container for all the sources connected to this block.
 
filter_method _M_pFilter
 The filter function which determines whether offered messages should be accepted.
 
volatile bool _M_fDeclineMessages
 A bool that is set to indicate that all messages should be declined in preparation for deleting the block.
 
- Protected Attributes inherited from Concurrency::source_block< single_link_registry< ITarget< _Destination_type > >, _MessageProcessorType >
ITarget< _Target_type > * _M_pReservedFor
 Connected target that is holding a reservation.
 
runtime_object_identity _M_reservedId
 Reserved message ID.
 
single_link_registry< ITarget< _Destination_type > > _M_connectedTargets
 Connected targets.
 
_MessageProcessorType _M_messageProcessor
 Processor used for asynchronous message handling.
 

Detailed Description

template<typename _Type, typename _Destination_type, join_type _Jtype>
class Concurrency::_Join_node< _Type, _Destination_type, _Jtype >

Defines a block allowing sources of distinct types to be joined. A join node is a single-target, multi-source ordered propagator block.

Template Parameters
_Type: The payload tuple type.
_Jtype: The kind of join this is, either 'greedy' or 'non-greedy'.
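
To make the idea of joining sources of distinct types concrete, here is a minimal sketch that uses only the C++17 standard library. It is not the agents.h implementation and involves no scheduler or message blocks; `simple_join`, `offer`, and `try_take` are hypothetical names chosen for illustration. Each source gets one typed slot, and a tuple is produced in source order only once every slot is filled.

```cpp
#include <cassert>
#include <cstddef>
#include <optional>
#include <tuple>
#include <utility>

// Hypothetical illustration only: one slot per source, each with its own type.
template <typename... Ts>
class simple_join {
public:
    // Store a value arriving from source number I. Returns false if that
    // slot still holds an earlier value that has not been joined yet.
    template <std::size_t I, typename T>
    bool offer(T&& value) {
        auto& slot = std::get<I>(slots_);
        if (slot.has_value()) return false;
        slot.emplace(std::forward<T>(value));
        return true;
    }

    // If every slot holds a value, move them out as one tuple (in source
    // order) and clear the slots; otherwise return std::nullopt.
    std::optional<std::tuple<Ts...>> try_take() {
        return take(std::index_sequence_for<Ts...>{});
    }

private:
    template <std::size_t... Is>
    std::optional<std::tuple<Ts...>> take(std::index_sequence<Is...>) {
        if (!(std::get<Is>(slots_).has_value() && ...)) return std::nullopt;
        std::tuple<Ts...> joined{std::move(*std::get<Is>(slots_))...};
        (std::get<Is>(slots_).reset(), ...);
        // Postcondition: every slot is empty again.
        assert(!(std::get<Is>(slots_).has_value() || ...));
        return std::make_optional(std::move(joined));
    }

    std::tuple<std::optional<Ts>...> slots_;
};
```

Values may arrive in any order; the resulting tuple always follows the order of the template arguments, which is the sense in which this sketch is "ordered".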

Member Typedef Documentation

template<typename _Type, typename _Destination_type, join_type _Jtype>
typedef multi_link_registry<ISource<size_t> > Concurrency::_Join_node< _Type, _Destination_type, _Jtype >::_SourceLinkRegistry
private
template<typename _Type, typename _Destination_type, join_type _Jtype>
typedef single_link_registry<ITarget<_Destination_type> > Concurrency::_Join_node< _Type, _Destination_type, _Jtype >::_TargetLinkRegistry
private

Constructor & Destructor Documentation

template<typename _Type, typename _Destination_type, join_type _Jtype>
Concurrency::_Join_node< _Type, _Destination_type, _Jtype >::_Join_node ( )
inline

Constructs a join within the default scheduler, and places it on any schedule group of the scheduler's choosing.

    : _M_counter(std::tuple_size<_Destination_type>::value)
{
    initialize_source_and_target();
}
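
The constructor seeds `_M_counter` with the tuple arity, and a listing elsewhere on this page sends once `_InterlockedDecrement` brings the counter to zero. The sketch below shows that countdown pattern in portable C++ with `std::atomic`. It assumes the counter tracks how many sources still owe a message, which is an interpretation rather than something this page states; `arrival_counter`, `arrive`, and `retract` are hypothetical names.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <tuple>

// Hypothetical countdown: starts at the tuple arity and reaches zero when
// every source has reported once.
template <typename Tuple>
class arrival_counter {
public:
    arrival_counter()
        : remaining_(static_cast<long>(std::tuple_size<Tuple>::value)) {}

    // Record one arrival. Returns true only for the call that brings the
    // count to zero, so exactly one caller triggers the send.
    bool arrive() {
        const long previous = remaining_.fetch_sub(1);
        assert(previous > 0);  // more arrivals than sources is a logic error
        return previous == 1;
    }

    // Undo one arrival, for example when a reservation was handed back.
    void retract() { remaining_.fetch_add(1); }

    long remaining() const { return remaining_.load(); }

private:
    std::atomic<long> remaining_;
};
```

Because `fetch_sub` returns the previous value atomically, two threads that arrive at the same time cannot both observe the transition to zero.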
template<typename _Type, typename _Destination_type, join_type _Jtype>
Concurrency::_Join_node< _Type, _Destination_type, _Jtype >::_Join_node ( Scheduler &  _PScheduler)
inline

Constructs a join within the specified scheduler, and places it on any schedule group of the scheduler's choosing.

Parameters
_PScheduler: A reference to a scheduler instance.

    : _M_counter(std::tuple_size<_Destination_type>::value)
{
    this->initialize_source_and_target(&_PScheduler);
}
template<typename _Type, typename _Destination_type, join_type _Jtype>
Concurrency::_Join_node< _Type, _Destination_type, _Jtype >::_Join_node ( ScheduleGroup &  _PScheduleGroup)
inline

Constructs a join within the specified schedule group. The scheduler is implied by the schedule group.

Parameters
_PScheduleGroup: A reference to a schedule group.

    : _M_counter(std::tuple_size<_Destination_type>::value)
{
    this->initialize_source_and_target(NULL, &_PScheduleGroup);
}
template<typename _Type, typename _Destination_type, join_type _Jtype>
Concurrency::_Join_node< _Type, _Destination_type, _Jtype >::~_Join_node ( )
inline

Cleans up any resources that may have been created by the join.

{
    // Remove all links
    this->remove_network_links();

    // Clean up any messages left in this message block
    _Delete_stored_messages();
}
template<typename _Type, typename _Destination_type, join_type _Jtype>
Concurrency::_Join_node< _Type, _Destination_type, _Jtype >::_Join_node ( const _Join_node< _Type, _Destination_type, _Jtype > &  _Join)
private

Member Function Documentation

template<typename _Type, typename _Destination_type, join_type _Jtype>
message<_Destination_type>* Concurrency::_Join_node< _Type, _Destination_type, _Jtype >::_Create_send_message ( )
inline private

Called when all the source messaging blocks have received their messages. The payloads are copied into a local tuple and then packaged into a message to be propagated: _M_pSendMessage.

{
    _Destination_type _Destination_tuple;

    // Populate the sources buffer
    ISource<size_t> * _Sources[std::tuple_size<_Type>::value];
    size_t _Index = 0;

    // Get an iterator which will keep a reference on the connected sources
    source_iterator _Iter = this->_M_connectedSources.begin();

    while (*_Iter != NULL)
    {
        ISource<size_t> * _PSource = *_Iter;

        if (_PSource == NULL)
        {
            // One of the sources disconnected
            break;
        }

        // Avoid buffer overrun
        if (_Index >= std::tuple_size<_Type>::value)
        {
            // More sources than we expect
            break;
        }

        _Sources[_Index] = *_Iter;
        _Index++;
        ++_Iter;
    }

    // The order nodes should not have unlinked while the join node is
    // active.
    if (_Index != std::tuple_size<_Type>::value)
    {
        // On debug build assert to help debugging
        _CONCRT_ASSERT(_Index == std::tuple_size<_Type>::value);
        return NULL;
    }

    _Populate_destination_tuple<0>(_Destination_tuple, _Sources);

    return new message<_Destination_type>(_Destination_tuple);
}
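
The source-gathering loop in `_Create_send_message` (and again in `_Non_greedy_acquire_messages`) copies pointers into a fixed-size buffer, guards against overrun, and then checks that the expected number was collected. A portable sketch of that pattern follows. `gather_exactly` is a hypothetical helper, and a null-terminated pointer array stands in for the source iterator, which is an assumption made to keep the example self-contained.

```cpp
#include <array>
#include <cassert>
#include <cstddef>

// Copy pointers from a null-terminated list into a fixed-size array.
// Returns true only if N entries were collected before the terminator.
template <typename T, std::size_t N>
bool gather_exactly(T* const* first, std::array<T*, N>& out) {
    assert(first != nullptr);
    std::size_t count = 0;
    while (*first != nullptr) {
        if (count >= N) {
            break;  // guard against overrunning `out`; extras are ignored
        }
        out[count++] = *first++;
    }
    // Fewer than N means a source went missing while gathering.
    return count == N;
}
```

As in the listing, surplus entries stop the loop without failing the final check; only a shortfall is reported as an error.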
template<typename _Type, typename _Destination_type, join_type _Jtype>
void Concurrency::_Join_node< _Type, _Destination_type, _Jtype >::_Delete_stored_messages ( )
inline private

Deletes all messages currently stored in this message block. Should be called by the destructor to ensure any messages propagated in are cleaned up.

{
    // Delete any messages remaining in the output queue
    for (;;)
    {
        message<_Destination_type> * _Msg = _M_messageBuffer._Dequeue();
        if (_Msg == NULL)
        {
            break;
        }
        delete _Msg;
    }
}
template<typename _Type, typename _Destination_type, join_type _Jtype>
bool Concurrency::_Join_node< _Type, _Destination_type, _Jtype >::_Non_greedy_acquire_messages ( )
inline private

Tries to acquire all of the messages from the _Non_greedy_nodes. Each node has already indicated that it has received a message that it can try to reserve. This function starts the reservation and consume process.

Returns
A bool indicating whether the reserve/consume of all messages succeeded.

{
    _Destination_type _Destination_tuple;

    // Populate the sources buffer
    ISource<size_t> * _Sources[std::tuple_size<_Type>::value];
    size_t _Index = 0;

    // Get an iterator which will keep a reference on the connected sources
    source_iterator _Iter = this->_M_connectedSources.begin();

    while (*_Iter != NULL)
    {
        ISource<size_t> * _PSource = *_Iter;

        if (_PSource == NULL)
        {
            // One of the sources disconnected
            break;
        }

        if (_Index >= std::tuple_size<_Type>::value)
        {
            // More sources than we expect
            break;
        }

        _Sources[_Index] = _PSource;
        _Index++;
        ++_Iter;
    }

    // The order nodes should not have unlinked while the join node is
    // active.

    if (_Index != std::tuple_size<_Type>::value)
    {
        // On debug build assert to help debugging
        _CONCRT_ASSERT(_Index == std::tuple_size<_Type>::value);
        return false;
    }

    bool _IsAcquireSuccessful = _Try_consume_source_messages<0>(_Destination_tuple, _Sources);

    return _IsAcquireSuccessful;
}
template<typename _Type, typename _Destination_type, join_type _Jtype>
template<int _Index>
void Concurrency::_Join_node< _Type, _Destination_type, _Jtype >::_Populate_destination_tuple ( _Destination_type &  _Destination_tuple,
ISource< size_t > **  _Sources 
)
inline private

Copies payloads from all sources to destination tuple.

{
    typedef _Order_node_base<typename std::remove_pointer_t<std::tuple_element_t<_Index, _Type>>::source_type> _Order_node_base_source_type;
    _Order_node_base_source_type * _Node = static_cast<_Order_node_base_source_type *>(_Sources[_Index]);

    std::get<_Index>(_Destination_tuple) = _Node->value();
    _Node->_Reset();

    _Populate_destination_tuple<_Index + 1>(_Destination_tuple, _Sources);
}
template<typename _Type, typename _Destination_type, join_type _Jtype>
template<>
void Concurrency::_Join_node< _Type, _Destination_type, _Jtype >::_Populate_destination_tuple ( _Destination_type &  ,
ISource< size_t > **   
)
inline private

Provides a sentinel template specialization for _Populate_destination_tuple recursive template expansion.

{
}
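
The recursive expansion and its empty sentinel can be sketched in portable C++17 as follows. This is an illustration under stated assumptions, not the agents.h code: `source_base`, `value_holder`, and `populate` are hypothetical stand-ins for `ISource<size_t>`, the typed order node, and `_Populate_destination_tuple`, and an `if constexpr` guard takes the place of the explicit sentinel specialization.

```cpp
#include <cassert>
#include <cstddef>
#include <tuple>
#include <utility>

// Hypothetical type-erased source; value_holder<T> plays the role of the
// typed node that the join downcasts to.
struct source_base {
    virtual ~source_base() = default;
};

template <typename T>
struct value_holder : source_base {
    explicit value_holder(T v) : value(std::move(v)) {}
    T value;
};

// Copy element Index from sources[Index], then recurse on Index + 1.
// When Index reaches the tuple size the body is discarded, which ends
// the recursion the way the empty sentinel specialization does.
template <std::size_t Index = 0, typename... Ts>
void populate(std::tuple<Ts...>& dest, source_base* const* sources) {
    if constexpr (Index < sizeof...(Ts)) {
        using element = std::tuple_element_t<Index, std::tuple<Ts...>>;
        assert(sources[Index] != nullptr);
        // The caller guarantees sources[Index] really is a value_holder<element>.
        auto* node = static_cast<value_holder<element>*>(sources[Index]);
        std::get<Index>(dest) = node->value;
        populate<Index + 1>(dest, sources);
    }
}
```

The `static_cast` is only safe because the position of each source fixes its payload type, which mirrors the cast in the listing above.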
template<typename _Type, typename _Destination_type, join_type _Jtype>
void Concurrency::_Join_node< _Type, _Destination_type, _Jtype >::_Propagate_priority_order ( ::Concurrency::details::_Queue< message< _Target_type >> &  _MessageBuffer)
inline private

Propagates messages in priority order.

Parameters
_MessageBuffer: Reference to a message queue with messages to be propagated.

{
    message<_Target_type> * _Msg = _MessageBuffer._Peek();

    // If someone has reserved the _Head message, don't propagate anymore
    if (this->_M_pReservedFor != NULL)
    {
        return;
    }

    while (_Msg != NULL)
    {
        message_status _Status = declined;

        // Always start from the first target that linked
        for (target_iterator _Iter = this->_M_connectedTargets.begin(); *_Iter != NULL; ++_Iter)
        {
            ITarget<_Target_type> * _PTarget = *_Iter;
            _Status = _PTarget->propagate(_Msg, this);

            // Ownership of message changed. Do not propagate this
            // message to any other target.
            if (_Status == accepted)
            {
                break;
            }

            // If the target just propagated to reserved this message, stop
            // propagating it to others
            if (this->_M_pReservedFor != NULL)
            {
                break;
            }
        }

        // If status is anything other than accepted, then the head message
        // was not propagated out. Thus, nothing after it in the queue can
        // be propagated out. Cease propagation.
        if (_Status != accepted)
        {
            break;
        }

        // Get the next message
        _Msg = _MessageBuffer._Peek();
    }
}
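
The head-of-queue rule in `_Propagate_priority_order` can be sketched without ConcRT as below. The sketch makes several simplifying assumptions: messages are plain `int` values, a target is a callable, the queue is popped explicitly on acceptance (in the listing the queue is simply peeked again after an accepted offer), and reservations are left out. `status`, `target`, and `propagate_in_order` are hypothetical names.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <vector>

enum class status { accepted, declined };

// A target is modelled as a callable that accepts or declines a message.
using target = std::function<status(int)>;

// Offer the head message to the targets in link order. Pop it only when a
// target accepts; if none does, stop so that later messages cannot
// overtake it. Returns the number of messages delivered.
inline int propagate_in_order(std::deque<int>& queue,
                              const std::vector<target>& targets) {
    int delivered = 0;
    while (!queue.empty()) {
        status result = status::declined;
        for (const target& t : targets) {
            assert(t != nullptr);
            result = t(queue.front());
            if (result == status::accepted) {
                break;  // ownership moved; do not offer to anyone else
            }
        }
        if (result != status::accepted) {
            break;  // the head is stuck, so cease propagation
        }
        queue.pop_front();
        ++delivered;
    }
    return delivered;
}
```

Stopping at the first undelivered message is what preserves ordering: a message further back is never offered while the head is still waiting.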
template<typename _Type, typename _Destination_type, join_type _Jtype>
template<int _Index>
bool Concurrency::_Join_node< _Type, _Destination_type, _Jtype >::_Try_consume_source_messages ( _Destination_type & _Destination_tuple, ISource< size_t > ** _Sources )
inline private

Tries to reserve a message from every source. If every reservation succeeds, it consumes all of the reserved messages; otherwise it releases the reservations it already holds.

Returns
A bool indicating whether the consumption attempt worked.
Template Parameters
_Index  The highest-number index of the join's sources.
{
    typedef _Non_greedy_node<typename std::remove_pointer_t<std::tuple_element_t<_Index, _Type>>::source_type> _Non_greedy_node_source_type;
    _Non_greedy_node_source_type * _Node = static_cast<_Non_greedy_node_source_type *>(_Sources[_Index]);

    // Increment the counter once for each reservation
    _InterlockedIncrement(&_M_counter);

    if (_Node->_Reserve_received_message())
    {
        bool _Ret_val = _Try_consume_source_messages<_Index + 1>(_Destination_tuple, _Sources);

        if (_Ret_val)
        {
            _Node->_Consume_received_message();
        }
        else
        {
            if (_Node->_Release_received_message())
            {
                // If _Release_received_message() restored the ID, decrement the count for that
                // restoration
                if (_InterlockedDecrement(&_M_counter) == 0)
                {
                    this->async_send(NULL);
                }
            }
        }

        return _Ret_val;
    }

    return false;
}
template<typename _Type, typename _Destination_type, join_type _Jtype>
template<>
bool Concurrency::_Join_node< _Type, _Destination_type, _Jtype >::_Try_consume_source_messages ( _Destination_type &, ISource< size_t > ** )
inline private

Provides a sentinel template specialization that terminates the recursive template expansion of _Try_consume_source_messages.

Returns
A bool indicating whether the consumption attempt worked. The sentinel always returns true.
{
    return true;
}
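The recursion and its sentinel form an all-or-nothing reservation: each level reserves its own source, recurses to the next index, and consumes only if every later level succeeded. The following is a minimal sketch of that pattern in standard C++, under stated assumptions. `Source` and `TryConsume` are hypothetical names, a class template specialization replaces the in-class member specialization, and the `_M_counter` bookkeeping shown in the listing is left out.

```cpp
#include <array>
#include <cassert>
#include <cstddef>

// Hypothetical source with a one-slot reservation; not a Concurrency Runtime type.
struct Source {
    bool has_message = false;
    bool reserved = false;
    bool consumed = false;

    bool reserve() {
        if (!has_message || reserved) return false;
        reserved = true;
        return true;
    }
    void consume() { reserved = false; has_message = false; consumed = true; }
    void release() { reserved = false; }
};

// Reserves sources Index..N-1 in order. Each level consumes only after every
// later level has succeeded; on failure it releases its own reservation.
template <std::size_t Index, std::size_t N>
struct TryConsume {
    static bool run(std::array<Source, N>& sources) {
        Source& node = sources[Index];
        if (!node.reserve()) {
            return false;
        }
        const bool ok = TryConsume<Index + 1, N>::run(sources);
        if (ok) {
            node.consume();
        } else {
            node.release();
        }
        return ok;
    }
};

// Sentinel specialization: one past the last source, nothing left to reserve.
template <std::size_t N>
struct TryConsume<N, N> {
    static bool run(std::array<Source, N>&) { return true; }
};

// Usage: the middle source has no message, so nothing is consumed and the
// first source's reservation is rolled back.
bool demo_rollback()
{
    std::array<Source, 3> sources{};
    sources[0].has_message = true;
    sources[2].has_message = true;
    const bool ok = TryConsume<0, 3>::run(sources);
    return !ok && !sources[0].reserved && sources[0].has_message;
}
```

Because consumption happens while the recursion unwinds, no source gives up its message unless every source had one available.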
template<typename _Type, typename _Destination_type, join_type _Jtype>
virtual message<_Destination_type>* Concurrency::_Join_node< _Type, _Destination_type, _Jtype >::accept_message ( runtime_object_identity _MsgId )
inline protected virtual

Accepts a message that was offered by the source, transferring ownership to the caller.

Parameters
_MsgId  The runtime object identity of the message.
Returns
A pointer to the message that the caller now has ownership of, or NULL if _MsgId does not identify the head of the message buffer.

Implements Concurrency::source_block< single_link_registry< ITarget< _Destination_type > >, _MessageProcessorType >.

{
    //
    // Peek at the head message in the message buffer. If the IDs match
    // dequeue and transfer ownership
    //
    message<_Destination_type> * _Msg = NULL;

    if (_M_messageBuffer._Is_head(_MsgId))
    {
        _Msg = _M_messageBuffer._Dequeue();
    }

    return _Msg;
}
template<typename _Type, typename _Destination_type, join_type _Jtype>
virtual message<_Destination_type>* Concurrency::_Join_node< _Type, _Destination_type, _Jtype >::consume_message ( runtime_object_identity _MsgId )
inline protected virtual

Consumes a message previously offered by the source and reserved by the target, transferring ownership to the caller.

Parameters
_MsgId  The runtime object identity of the message.
Returns
A pointer to the message that the caller now has ownership of.

consume_message is similar to accept, but is always preceded by a call to reserve.

Implements Concurrency::source_block< single_link_registry< ITarget< _Destination_type > >, _MessageProcessorType >.

{
    // By default, accept the message
    return accept_message(_MsgId);
}
template<typename _Type, typename _Destination_type, join_type _Jtype>
virtual void Concurrency::_Join_node< _Type, _Destination_type, _Jtype >::link_target_notification ( _Inout_ ITarget< _Destination_type > * )
inline protected virtual

Notification that a target was linked to this source.

Parameters
_PTarget  A pointer to the newly linked target.
{
    // There is only a single target.
    _Propagate_priority_order(_M_messageBuffer);
}
template<typename _Type, typename _Destination_type, join_type _Jtype>
_Join_node const& Concurrency::_Join_node< _Type, _Destination_type, _Jtype >::operator= ( _Join_node< _Type, _Destination_type, _Jtype > const &  )
private
template<typename _Type, typename _Destination_type, join_type _Jtype>
virtual message_status Concurrency::_Join_node< _Type, _Destination_type, _Jtype >::propagate_message ( message< size_t > * _PMessage, ISource< size_t > * _PSource )
inline protected virtual

Asynchronously passes a message from an ISource block to this ITarget block. It is invoked by the propagate method when called by a source block.

Parameters
_PMessage  A pointer to the message object.
_PSource  A pointer to the source block offering the message.
Returns
A message_status indication of what the target decided to do with the message.
{
    // This join block is connected to the _Order_node sources, which know not to send
    // any more messages until join propagates them further. That is why join can
    // always accept the incoming messages.

    _PMessage = _PSource->accept(_PMessage->msg_id(), this);

    //
    // Source block created an int message only to notify join that the real
    // payload is available. There is no need to keep this message around.
    //
    _CONCRT_ASSERT(_PMessage != NULL);
    delete _PMessage;

    long _Ret_val = _InterlockedDecrement(&_M_counter);

    _CONCRT_ASSERT(_Ret_val >= 0);

    if (_Ret_val == 0)
    {
        //
        // All source messages are now received so join can propagate them further
        //
        this->async_send(NULL);
    }

    return accepted;
}
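The counter logic above is a countdown: each source notification decrements the count, and the caller that observes zero starts the send. A minimal sketch using `std::atomic` in place of the `_Interlocked` intrinsics is shown below. `ArrivalCounter` and its members are hypothetical names chosen for illustration, not part of agents.h.

```cpp
#include <atomic>
#include <cassert>

// Hypothetical countdown for a join with `inputs` sources.
class ArrivalCounter {
public:
    explicit ArrivalCounter(long inputs) : inputs_(inputs), remaining_(inputs) {}

    // Called once per source notification. Returns true only for the caller
    // that observes the count reaching zero, which is the one that should
    // start the send.
    bool arrive() {
        const long left = remaining_.fetch_sub(1) - 1;
        assert(left >= 0);
        return left == 0;
    }

    // Re-arms the counter to the total number of inputs.
    void reset() { remaining_.store(inputs_); }

private:
    const long inputs_;
    std::atomic<long> remaining_;
};

// Usage: with three inputs, only the third arrival triggers the send.
int demo_arrivals()
{
    ArrivalCounter counter(3);
    int triggers = 0;
    for (int i = 0; i < 3; ++i) {
        if (counter.arrive()) {
            ++triggers;
        }
    }
    return triggers;
}
```

Since the decrement is atomic and returns the new value, exactly one caller sees zero even when notifications arrive concurrently.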
template<typename _Type, typename _Destination_type, join_type _Jtype>
virtual void Concurrency::_Join_node< _Type, _Destination_type, _Jtype >::propagate_to_any_targets ( _Inout_opt_ message< _Destination_type > * )
inline protected virtual

Takes the message and propagates it to all the targets of this join block.

Parameters
_PMessage  A pointer to a new message.

This function packages source payloads into a tuple message and immediately sends it to the targets.

{
    message<_Destination_type> * _Msg = NULL;

    if (_M_counter == 0)
    {
        bool fIsNonGreedy = (_Jtype == non_greedy);

        if (fIsNonGreedy)
        {
            if (!_Non_greedy_acquire_messages())
            {
                return;
            }
        }

        if (!fIsNonGreedy)
        {
            // Because a greedy join has captured all input, we can reset
            // the counter to the total number of inputs
            _InterlockedExchange(&_M_counter, std::tuple_size<_Destination_type>::value);
        }

        _Msg = _Create_send_message();
    }

    if (_Msg != NULL)
    {
        _M_messageBuffer._Enqueue(_Msg);

        if (!_M_messageBuffer._Is_head(_Msg->msg_id()))
        {
            // another message is at the head of the outbound message queue and blocked
            // simply return
            return;
        }
    }

    _Propagate_priority_order(_M_messageBuffer);
}
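For the greedy case, the flow above amounts to: wait until every input has arrived, package the payloads into one tuple, queue it for the target, and re-arm for the next round. A minimal two-input sketch of that behavior follows. `GreedyJoin2` and its members are hypothetical, and the sketch simply overwrites a slot if the same input is offered twice, which the real join avoids because its source nodes hold back further messages.

```cpp
#include <cassert>
#include <deque>
#include <string>
#include <tuple>
#include <utility>

// Hypothetical two-input greedy join; not the Concurrency Runtime class.
class GreedyJoin2 {
public:
    using Output = std::tuple<int, std::string>;

    void offer_first(int v) {
        first_ = v;
        have_first_ = true;
        try_emit();
    }

    void offer_second(std::string v) {
        second_ = std::move(v);
        have_second_ = true;
        try_emit();
    }

    // Outbound queue of packaged tuples, oldest first.
    std::deque<Output>& outbound() { return outbound_; }

private:
    // Once every slot is filled, package the payloads into one tuple,
    // queue it for the target, and re-arm the slots for the next round.
    void try_emit() {
        if (!(have_first_ && have_second_)) {
            return;
        }
        outbound_.emplace_back(first_, std::move(second_));
        have_first_ = false;
        have_second_ = false;
    }

    int first_ = 0;
    std::string second_;
    bool have_first_ = false;
    bool have_second_ = false;
    std::deque<Output> outbound_;
};
```

Offering only one input leaves `outbound()` empty; the tuple appears once the second input arrives, in either order.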
template<typename _Type, typename _Destination_type, join_type _Jtype>
virtual void Concurrency::_Join_node< _Type, _Destination_type, _Jtype >::release_message ( runtime_object_identity _MsgId )
inline protected virtual

Releases a previous message reservation.

Parameters
_MsgId  The runtime object identity of the message.

Implements Concurrency::source_block< single_link_registry< ITarget< _Destination_type > >, _MessageProcessorType >.

{
    // The head message is the one reserved.
    if (!_M_messageBuffer._Is_head(_MsgId))
    {
        throw message_not_found();
    }
}
template<typename _Type, typename _Destination_type, join_type _Jtype>
virtual bool Concurrency::_Join_node< _Type, _Destination_type, _Jtype >::reserve_message ( runtime_object_identity _MsgId )
inline protected virtual

Reserves a message previously offered by the source.

Parameters
_MsgId  The runtime object identity of the message.
Returns
A bool indicating whether the reservation worked or not.

After reserve is called, if it returns true, either consume or release must be called to either take or release ownership of the message.

Implements Concurrency::source_block< single_link_registry< ITarget< _Destination_type > >, _MessageProcessorType >.

{
    // Allow reservation if this is the head message
    return _M_messageBuffer._Is_head(_MsgId);
}
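Reservation, consumption, and release all act only on the head of the outbound buffer, and a successful reserve must be followed by exactly one of consume or release. A minimal sketch of that contract is given below, assuming integer IDs in place of message objects. `HeadOnlyBuffer` and `take_if` are hypothetical names, and `std::runtime_error` stands in for `message_not_found`.

```cpp
#include <cassert>
#include <deque>
#include <stdexcept>

// Hypothetical outbound buffer; message IDs stand in for message objects.
class HeadOnlyBuffer {
public:
    void enqueue(int id) { ids_.push_back(id); }

    bool is_head(int id) const { return !ids_.empty() && ids_.front() == id; }

    // Reservation succeeds only for the head message.
    bool reserve(int id) const { return is_head(id); }

    // Consume mirrors accept: dequeue the head if the ID matches.
    // Returns -1 when the ID is not at the head.
    int consume(int id) {
        if (!is_head(id)) return -1;
        ids_.pop_front();
        return id;
    }

    // Releasing anything other than the head is an error.
    void release(int id) const {
        if (!is_head(id)) throw std::runtime_error("message not found");
    }

private:
    std::deque<int> ids_;
};

// Target-side pattern: a successful reserve is always followed by exactly
// one of consume or release.
bool take_if(HeadOnlyBuffer& buffer, int id, bool want_it)
{
    if (!buffer.reserve(id)) {
        return false;
    }
    if (want_it) {
        return buffer.consume(id) == id;
    }
    buffer.release(id);
    return false;
}
```

A released message stays at the head of the buffer, so a later attempt can still reserve and consume it.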
template<typename _Type, typename _Destination_type, join_type _Jtype>
virtual void Concurrency::_Join_node< _Type, _Destination_type, _Jtype >::resume_propagation ( )
inline protected virtual

Resumes propagation after a reservation has been released.

Implements Concurrency::source_block< single_link_registry< ITarget< _Destination_type > >, _MessageProcessorType >.

{
    // If there are any messages in the buffer, propagate them out
    if (_M_messageBuffer._Count() > 0)
    {
        this->async_send(NULL);
    }
}

Member Data Documentation

template<typename _Type, typename _Destination_type, join_type _Jtype>
volatile long Concurrency::_Join_node< _Type, _Destination_type, _Jtype >::_M_counter
private
template<typename _Type, typename _Destination_type, join_type _Jtype>
::Concurrency::details::_Queue<message<_Destination_type> > Concurrency::_Join_node< _Type, _Destination_type, _Jtype >::_M_messageBuffer
private
template<typename _Type, typename _Destination_type, join_type _Jtype>
_Type Concurrency::_Join_node< _Type, _Destination_type, _Jtype >::_M_sourceTuple
private

The documentation for this class was generated from the following file: