STLdoc
Concurrency::unbounded_buffer< _Type > Class Template Reference

An unbounded_buffer messaging block is a multi-target, multi-source, ordered propagator_block capable of storing an unbounded number of messages.

#include <agents.h>

Inheritance diagram for Concurrency::unbounded_buffer< _Type >:
Concurrency::propagator_block< multi_link_registry< ITarget< _Type > >, multi_link_registry< ISource< _Type > > >
Concurrency::source_block< multi_link_registry< ITarget< _Type > >, _MessageProcessorType >
Concurrency::ITarget< multi_link_registry< ISource< _Type > >::type::source_type >
Concurrency::ISource< multi_link_registry< ITarget< _Type > >::type::type >

Public Member Functions

 unbounded_buffer ()
 Constructs an unbounded_buffer messaging block. More...
 
 unbounded_buffer (filter_method const &_Filter)
 Constructs an unbounded_buffer messaging block. More...
 
 ~unbounded_buffer ()
 Destroys the unbounded_buffer messaging block. More...
 
bool enqueue (_Type const &_Item)
 Adds an item to the unbounded_buffer messaging block. More...
 
_Type dequeue ()
 Removes an item from the unbounded_buffer messaging block. More...
 
- Public Member Functions inherited from Concurrency::propagator_block< multi_link_registry< ITarget< _Type > >, multi_link_registry< ISource< _Type > > >
 propagator_block ()
 Constructs a propagator_block object. More...
 
virtual ~propagator_block ()
 Destroys a propagator_block object. More...
 
virtual message_status propagate (_Inout_opt_ message< _Source_type > *_PMessage, _Inout_opt_ ISource< _Source_type > *_PSource)
 Asynchronously passes a message from a source block to this target block. More...
 
virtual message_status send (_Inout_ message< _Source_type > *_PMessage, _Inout_ ISource< _Source_type > *_PSource)
 Synchronously initiates a message to this block. Called by an ISource block. When this function completes, the message will already have propagated into the block. More...
 
- Public Member Functions inherited from Concurrency::source_block< multi_link_registry< ITarget< _Type > >, _MessageProcessorType >
 source_block ()
 Constructs a source_block object. More...
 
virtual ~source_block ()
 Destroys the source_block object. More...
 
virtual void link_target (_Inout_ ITarget< _Target_type > *_PTarget)
 Links a target block to this source_block object. More...
 
virtual void unlink_target (_Inout_ ITarget< _Target_type > *_PTarget)
 Unlinks a target block from this source_block object. More...
 
virtual void unlink_targets ()
 Unlinks all target blocks from this source_block object. More...
 
virtual message< _Target_type > * accept (runtime_object_identity _MsgId, _Inout_ ITarget< _Target_type > *_PTarget)
 Accepts a message that was offered by this source_block object, transferring ownership to the caller. More...
 
virtual bool reserve (runtime_object_identity _MsgId, _Inout_ ITarget< _Target_type > *_PTarget)
 Reserves a message previously offered by this source_block object. More...
 
virtual message< _Target_type > * consume (runtime_object_identity _MsgId, _Inout_ ITarget< _Target_type > *_PTarget)
 Consumes a message previously offered by this source_block object and successfully reserved by the target, transferring ownership to the caller. More...
 
virtual void release (runtime_object_identity _MsgId, _Inout_ ITarget< _Target_type > *_PTarget)
 Releases a previous successful message reservation. More...
 
virtual void acquire_ref (_Inout_ ITarget< _Target_type > *)
 Acquires a reference count on this source_block object, to prevent deletion. More...
 
virtual void release_ref (_Inout_ ITarget< _Target_type > *_PTarget)
 Releases a reference count on this source_block object. More...
 
- Public Member Functions inherited from Concurrency::ISource< multi_link_registry< ITarget< _Type > >::type::type >
virtual ~ISource ()
 Destroys the ISource object. More...
 
virtual void link_target (_Inout_ ITarget< multi_link_registry< ITarget< _Type > >::type::type > *_PTarget)=0
 When overridden in a derived class, links a target block to this ISource block. More...
 
virtual void unlink_target (_Inout_ ITarget< multi_link_registry< ITarget< _Type > >::type::type > *_PTarget)=0
 When overridden in a derived class, unlinks a target block from this ISource block, if found to be previously linked. More...
 
virtual message< multi_link_registry< ITarget< _Type > >::type::type > * accept (runtime_object_identity _MsgId, _Inout_ ITarget< multi_link_registry< ITarget< _Type > >::type::type > *_PTarget)=0
 When overridden in a derived class, accepts a message that was offered by this ISource block, transferring ownership to the caller. More...
 
virtual bool reserve (runtime_object_identity _MsgId, _Inout_ ITarget< multi_link_registry< ITarget< _Type > >::type::type > *_PTarget)=0
 When overridden in a derived class, reserves a message previously offered by this ISource block. More...
 
virtual message< multi_link_registry< ITarget< _Type > >::type::type > * consume (runtime_object_identity _MsgId, _Inout_ ITarget< multi_link_registry< ITarget< _Type > >::type::type > *_PTarget)=0
 When overridden in a derived class, consumes a message previously offered by this ISource block and successfully reserved by the target, transferring ownership to the caller. More...
 
virtual void release (runtime_object_identity _MsgId, _Inout_ ITarget< multi_link_registry< ITarget< _Type > >::type::type > *_PTarget)=0
 When overridden in a derived class, releases a previous successful message reservation. More...
 
virtual void acquire_ref (_Inout_ ITarget< multi_link_registry< ITarget< _Type > >::type::type > *_PTarget)=0
 When overridden in a derived class, acquires a reference count on this ISource block, to prevent deletion. More...
 
virtual void release_ref (_Inout_ ITarget< multi_link_registry< ITarget< _Type > >::type::type > *_PTarget)=0
 When overridden in a derived class, releases a reference count on this ISource block. More...
 
- Public Member Functions inherited from Concurrency::ITarget< multi_link_registry< ISource< _Type > >::type::source_type >
virtual ~ITarget ()
 Destroys the ITarget object. More...
 
virtual message_status propagate (_Inout_opt_ message< multi_link_registry< ISource< _Type > >::type::source_type > *_PMessage, _Inout_opt_ ISource< multi_link_registry< ISource< _Type > >::type::source_type > *_PSource)=0
 When overridden in a derived class, asynchronously passes a message from a source block to this target block. More...
 
virtual message_status send (_Inout_ message< multi_link_registry< ISource< _Type > >::type::source_type > *_PMessage, _Inout_ ISource< multi_link_registry< ISource< _Type > >::type::source_type > *_PSource)=0
 When overridden in a derived class, synchronously passes a message to the target block. More...
 

Protected Member Functions

virtual message_status propagate_message (_Inout_ message< _Type > *_PMessage, _Inout_ ISource< _Type > *_PSource)
 Asynchronously passes a message from an ISource block to this unbounded_buffer messaging block. It is invoked by the propagate method, when called by a source block. More...
 
virtual message_status send_message (_Inout_ message< _Type > *_PMessage, _Inout_ ISource< _Type > *_PSource)
 Synchronously passes a message from an ISource block to this unbounded_buffer messaging block. It is invoked by the send method, when called by a source block. More...
 
virtual bool supports_anonymous_source ()
 Overrides the supports_anonymous_source method to indicate that this block can accept messages offered to it by a source that is not linked. More...
 
virtual message< _Type > * accept_message (runtime_object_identity _MsgId)
 Accepts a message that was offered by this unbounded_buffer messaging block, transferring ownership to the caller. More...
 
virtual bool reserve_message (runtime_object_identity _MsgId)
 Reserves a message previously offered by this unbounded_buffer messaging block. More...
 
virtual message< _Type > * consume_message (runtime_object_identity _MsgId)
 Consumes a message previously offered by the unbounded_buffer messaging block and reserved by the target, transferring ownership to the caller. More...
 
virtual void release_message (runtime_object_identity _MsgId)
 Releases a previous message reservation. More...
 
virtual void resume_propagation ()
 Resumes propagation after a reservation has been released. More...
 
virtual void link_target_notification (_Inout_ ITarget< _Type > *_PTarget)
 A callback that notifies that a new target has been linked to this unbounded_buffer messaging block. More...
 
virtual void process_input_messages (_Inout_ message< _Type > *_PMessage)
 Places the message _PMessage in this unbounded_buffer messaging block and tries to offer it to all of the linked targets. More...
 
virtual void propagate_output_messages ()
 Moves the messages queued by process_input_messages into the internal message buffer and tries to offer them to the linked targets. More...
 
- Protected Member Functions inherited from Concurrency::propagator_block< multi_link_registry< ITarget< _Type > >, multi_link_registry< ISource< _Type > > >
virtual message_status propagate_message (_Inout_ message< _Source_type > *_PMessage, _Inout_ ISource< _Source_type > *_PSource)=0
 When overridden in a derived class, this method asynchronously passes a message from an ISource block to this propagator_block object. It is invoked by the propagate method, when called by a source block. More...
 
virtual message_status send_message (_Inout_ message< _Source_type > *, _Inout_ ISource< _Source_type > *)
 When overridden in a derived class, this method synchronously passes a message from an ISource block to this propagator_block object. It is invoked by the send method, when called by a source block. More...
 
virtual void link_source (_Inout_ ISource< _Source_type > *_PSource)
 Links a specified source block to this propagator_block object. More...
 
virtual void unlink_source (_Inout_ ISource< _Source_type > *_PSource)
 Unlinks a specified source block from this propagator_block object. More...
 
virtual void unlink_sources ()
 Unlinks all source blocks from this propagator_block object. More...
 
virtual void process_input_messages (_Inout_ message< _Target_type > *_PMessage)
 Process input messages. This is only useful for propagator blocks, which derive from source_block More...
 
void initialize_source_and_target (_Inout_opt_ Scheduler *_PScheduler=NULL, _Inout_opt_ ScheduleGroup *_PScheduleGroup=NULL)
 Initializes the base object. Specifically, the message_processor object needs to be initialized. More...
 
void register_filter (filter_method const &_Filter)
 Registers a filter method that will be invoked on every received message. More...
 
void decline_incoming_messages ()
 Indicates to the block that new messages should be declined. More...
 
void remove_network_links ()
 Removes all the source and target network links from this propagator_block object. More...
 
- Protected Member Functions inherited from Concurrency::source_block< multi_link_registry< ITarget< _Type > >, _MessageProcessorType >
virtual void link_target_notification (_Inout_ ITarget< _Target_type > *)
 A callback that notifies that a new target has been linked to this source_block object. More...
 
virtual void unlink_target_notification (_Inout_ ITarget< _Target_type > *_PTarget)
 A callback that notifies that a target has been unlinked from this source_block object. More...
 
virtual void propagate_to_any_targets (_Inout_opt_ message< _Target_type > *_PMessage)
 When overridden in a derived class, propagates the given message to any or all of the linked targets. This is the main propagation routine for message blocks. More...
 
void initialize_source (_Inout_opt_ Scheduler *_PScheduler=NULL, _Inout_opt_ ScheduleGroup *_PScheduleGroup=NULL)
 Initializes the message_propagator within this source_block. More...
 
void enable_batched_processing ()
 Enables batched processing for this block. More...
 
virtual void sync_send (_Inout_opt_ message< _Target_type > *_Msg)
 Synchronously queues up messages and starts a propagation task, if this has not been done already. More...
 
virtual void async_send (_Inout_opt_ message< _Target_type > *_Msg)
 Asynchronously queues up messages and starts a propagation task, if this has not been done already More...
 
void wait_for_outstanding_async_sends ()
 Waits for all asynchronous propagations to complete. This propagator-specific spin wait is used in destructors of message blocks to make sure that all asynchronous propagations have time to finish before destroying the block. More...
 
void remove_targets ()
 Removes all target links for this source block. This should be called from the destructor. More...
 
- Protected Member Functions inherited from Concurrency::ISource< multi_link_registry< ITarget< _Type > >::type::type >
void _Invoke_link_source (ITarget< multi_link_registry< ITarget< _Type > >::type::type > *_PLinkFrom)
 Links this source to a target. More...
 
void _Invoke_unlink_source (ITarget< multi_link_registry< ITarget< _Type > >::type::type > *_PUnlinkFrom)
 Unlinks this source from a target. More...
 
- Protected Member Functions inherited from Concurrency::ITarget< multi_link_registry< ISource< _Type > >::type::source_type >
virtual void link_source (_Inout_ ISource< multi_link_registry< ISource< _Type > >::type::source_type > *_PSource)=0
 When overridden in a derived class, links a specified source block to this ITarget block. More...
 
virtual void unlink_source (_Inout_ ISource< multi_link_registry< ISource< _Type > >::type::source_type > *_PSource)=0
 When overridden in a derived class, unlinks a specified source block from this ITarget block. More...
 

Private Member Functions

void _Propagate_priority_order (::Concurrency::details::_Queue< message< _Target_type >> &_MessageBuffer)
 Propagates messages in priority order. More...
 
void _Delete_stored_messages ()
 Deletes all messages currently stored in this message block. Should be called by the destructor to ensure any messages propagated in are cleaned up. More...
 
unbounded_buffer const & operator= (unbounded_buffer const &)
 
 unbounded_buffer (unbounded_buffer const &)
 

Private Attributes

::Concurrency::details::_Queue< message< _Type > > _M_processedMessages
 Message queue used to store processed messages More...
 
::Concurrency::details::_Queue< message< _Type > > _M_messageBuffer
 Message queue used to store messages More...
 
bool _M_fForceRepropagation
 A bool to signal to the processor to force a repropagation to occur More...
 

Additional Inherited Members

- Public Types inherited from Concurrency::propagator_block< multi_link_registry< ITarget< _Type > >, multi_link_registry< ISource< _Type > > >
typedef multi_link_registry< ISource< _Type > >::type::source_type _Source_type
 The type of the payload for the incoming message to this propagator_block. More...
 
typedef source_link_manager< multi_link_registry< ISource< _Type > > > _SourceLinkManager
 The type of the source_link_manager for this propagator_block. More...
 
typedef _SourceLinkManager::iterator source_iterator
 The type of the iterator for the source_link_manager for this propagator_block. More...
 
- Public Types inherited from Concurrency::source_block< multi_link_registry< ITarget< _Type > >, _MessageProcessorType >
typedef multi_link_registry< ITarget< _Type > >::type::type _Target_type
 The payload type of messages handled by this source_block. More...
 
typedef multi_link_registry< ITarget< _Type > >::iterator target_iterator
 The iterator to walk the connected targets. More...
 
- Public Types inherited from Concurrency::ISource< multi_link_registry< ITarget< _Type > >::type::type >
typedef multi_link_registry< ITarget< _Type > >::type::type source_type
 A type alias for _Type . More...
 
- Public Types inherited from Concurrency::ITarget< multi_link_registry< ISource< _Type > >::type::source_type >
typedef multi_link_registry< ISource< _Type > >::type::source_type type
 A type alias for _Type . More...
 
typedef std::tr1::function< bool(multi_link_registry< ISource< _Type > >::type::source_type const &)> filter_method
 The signature of any method used by the block that returns a bool value to determine whether an offered message should be accepted. More...
 
- Protected Attributes inherited from Concurrency::propagator_block< multi_link_registry< ITarget< _Type > >, multi_link_registry< ISource< _Type > > >
_SourceLinkManager _M_connectedSources
 The container for all the sources connected to this block. More...
 
filter_method _M_pFilter
 The filter function which determines whether offered messages should be accepted. More...
 
volatile bool _M_fDeclineMessages
 A bool that is set to indicate that all messages should be declined in preparation for deleting the block More...
 
- Protected Attributes inherited from Concurrency::source_block< multi_link_registry< ITarget< _Type > >, _MessageProcessorType >
ITarget< _Target_type > * _M_pReservedFor
 Connected target that is holding a reservation More...
 
runtime_object_identity _M_reservedId
 Reserved message ID More...
 
multi_link_registry< ITarget< _Type > > _M_connectedTargets
 Connected targets More...
 
_MessageProcessorType _M_messageProcessor
 Processor used for asynchronous message handling More...
 

Detailed Description

template<class _Type>
class Concurrency::unbounded_buffer< _Type >

An unbounded_buffer messaging block is a multi-target, multi-source, ordered propagator_block capable of storing an unbounded number of messages.

Template Parameters
_Type: The payload type of the messages stored and propagated by the buffer.

For more information, see Asynchronous Message Blocks.

See also
overwrite_buffer Class, single_assignment Class

Constructor & Destructor Documentation

template<class _Type >
Concurrency::unbounded_buffer< _Type >::unbounded_buffer ( )
inline

Constructs an unbounded_buffer messaging block.

The runtime uses the default scheduler if you do not specify the _PScheduler or _PScheduleGroup parameters.

The type filter_method is a functor with signature bool (_Type const &) which is invoked by this unbounded_buffer messaging block to determine whether or not it should accept an offered message.

See also
Scheduler Class, ScheduleGroup Class
unbounded_buffer() :
    _M_fForceRepropagation(false)
{
    initialize_source_and_target();
    enable_batched_processing();
}
template<class _Type >
Concurrency::unbounded_buffer< _Type >::unbounded_buffer ( filter_method const &  _Filter)
inline

Constructs an unbounded_buffer messaging block.

Parameters
_Filter: A filter function which determines whether offered messages should be accepted.

The runtime uses the default scheduler if you do not specify the _PScheduler or _PScheduleGroup parameters.

The type filter_method is a functor with signature bool (_Type const &) which is invoked by this unbounded_buffer messaging block to determine whether or not it should accept an offered message.
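
As a rough illustration of the filter contract described above, the following sketch models it with the standard library only; it does not use the Concurrency Runtime. The alias filter_method mirrors the documented signature bool (_Type const &). The helper offer_all and its vector-based storage are hypothetical stand-ins for the block's internal accept path, which is not reproduced here.

```cpp
#include <cassert>
#include <functional>
#include <vector>

// Mirrors the documented filter signature: bool (_Type const &).
template <class T>
using filter_method = std::function<bool(T const&)>;

// Hypothetical helper (not part of agents.h): returns the offered items
// that the filter accepts, in their original order. An empty filter
// accepts everything, which models a block constructed without one.
template <class T>
std::vector<T> offer_all(std::vector<T> const& offered,
                         filter_method<T> const& filter)
{
    std::vector<T> accepted;
    for (T const& item : offered)
    {
        if (!filter || filter(item))
        {
            accepted.push_back(item);
        }
    }
    return accepted;
}
```

Calling offer_all<int> with a predicate such as [](int const& n) { return n % 2 == 0; } keeps only the even values. With the real class, a predicate of the same shape would be passed to the unbounded_buffer (filter_method const &) constructor.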

See also
Scheduler Class, ScheduleGroup Class
unbounded_buffer(filter_method const& _Filter) :
    _M_fForceRepropagation(false)
{
    initialize_source_and_target();
    enable_batched_processing();
    register_filter(_Filter);
}
template<class _Type >
Concurrency::unbounded_buffer< _Type >::~unbounded_buffer ( )
inline

Destroys the unbounded_buffer messaging block.

{
    // Remove all links
    remove_network_links();

    // Clean up any messages left in this message block
    _Delete_stored_messages();
}
template<class _Type >
Concurrency::unbounded_buffer< _Type >::unbounded_buffer ( unbounded_buffer< _Type > const &  )
private

Member Function Documentation

template<class _Type >
void Concurrency::unbounded_buffer< _Type >::_Delete_stored_messages ( )
inline private

Deletes all messages currently stored in this message block. Should be called by the destructor to ensure any messages propagated in are cleaned up.

{
    // Input messages for this message block are in the base-class input buffer
    // All messages in that buffer are guaranteed to have moved to the output
    // buffer because the destructor first waits for all async sends to finish
    // before reaching this point

    // Delete any messages remaining in the output queue
    for (;;)
    {
        message<_Type> * _Msg = _M_messageBuffer._Dequeue();
        if (_Msg == NULL)
        {
            break;
        }
        delete _Msg;
    }
}
template<class _Type >
void Concurrency::unbounded_buffer< _Type >::_Propagate_priority_order ( ::Concurrency::details::_Queue< message< _Target_type >> &  _MessageBuffer)
inline private

Propagates messages in priority order.

Parameters
_MessageBuffer: Reference to a message queue with messages to be propagated.
{
    message<_Target_type> * _Msg = _MessageBuffer._Peek();

    // If someone has reserved the _Head message, don't propagate anymore
    if (_M_pReservedFor != NULL)
    {
        return;
    }

    while (_Msg != NULL)
    {
        message_status _Status = declined;

        // Always start from the first target that linked
        for (target_iterator _Iter = _M_connectedTargets.begin(); *_Iter != NULL; ++_Iter)
        {
            ITarget<_Target_type> * _PTarget = *_Iter;
            _Status = _PTarget->propagate(_Msg, this);

            // Ownership of message changed. Do not propagate this
            // message to any other target.
            if (_Status == accepted)
            {
                break;
            }

            // If the target just propagated to reserved this message, stop
            // propagating it to others
            if (_M_pReservedFor != NULL)
            {
                break;
            }
        }

        // If status is anything other than accepted, then the head message
        // was not propagated out. Thus, nothing after it in the queue can
        // be propagated out. Cease propagation.
        if (_Status != accepted)
        {
            break;
        }

        // Get the next message
        _Msg = _MessageBuffer._Peek();
    }
}
template<class _Type >
virtual message<_Type>* Concurrency::unbounded_buffer< _Type >::accept_message ( runtime_object_identity  _MsgId)
inline protected virtual

Accepts a message that was offered by this unbounded_buffer messaging block, transferring ownership to the caller.

Parameters
_MsgId: The runtime_object_identity of the offered message object.
Returns
A pointer to the message object that the caller now has ownership of.

Implements Concurrency::source_block< multi_link_registry< ITarget< _Type > >, _MessageProcessorType >.

{
    //
    // Peek at the head message in the message buffer. If the IDs match
    // dequeue and transfer ownership
    //
    message<_Type> * _Msg = NULL;

    if (_M_messageBuffer._Is_head(_MsgId))
    {
        _Msg = _M_messageBuffer._Dequeue();
    }

    return _Msg;
}
template<class _Type >
virtual message<_Type>* Concurrency::unbounded_buffer< _Type >::consume_message ( runtime_object_identity  _MsgId)
inline protected virtual

Consumes a message previously offered by the unbounded_buffer messaging block and reserved by the target, transferring ownership to the caller.

Parameters
_MsgId: The runtime_object_identity of the message object being consumed.
Returns
A pointer to the message object that the caller now has ownership of.

Similar to accept, but is always preceded by a call to reserve.
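
The reserve, consume, and release sequence can be sketched with a small single-threaded model. This is an illustration under stated assumptions, not the agents.h implementation: the types Message and HeadOnlySource are hypothetical, integer IDs stand in for runtime_object_identity, and std::unique_ptr stands in for the raw-pointer ownership transfer. Unlike release_message in the listing further down, the model's release ignores an unknown ID instead of throwing message_not_found.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <memory>
#include <utility>

// Hypothetical simplified message: an identifier plus a payload.
struct Message
{
    int id;
    int payload;
};

// Hypothetical model of a source that only ever exposes its head message.
class HeadOnlySource
{
public:
    void push(int id, int payload)
    {
        queue_.push_back(std::unique_ptr<Message>(new Message{id, payload}));
    }

    // Succeeds only for the head message, and only if nothing is reserved.
    bool reserve(int id)
    {
        if (reserved_ || queue_.empty() || queue_.front()->id != id)
        {
            return false;
        }
        reserved_ = true;
        return true;
    }

    // Must follow a successful reserve; ownership moves to the caller.
    std::unique_ptr<Message> consume(int id)
    {
        if (!reserved_ || queue_.empty() || queue_.front()->id != id)
        {
            return nullptr;
        }
        reserved_ = false;
        std::unique_ptr<Message> msg = std::move(queue_.front());
        queue_.pop_front();
        return msg;
    }

    // Gives up the reservation and leaves the message at the head.
    void release(int id)
    {
        if (reserved_ && !queue_.empty() && queue_.front()->id == id)
        {
            reserved_ = false;
        }
    }

    std::size_t size() const { return queue_.size(); }

private:
    std::deque<std::unique_ptr<Message>> queue_;
    bool reserved_ = false;
};
```

In this model a target that reserves the head and then calls release leaves the queue unchanged, while reserve followed by consume removes the head and hands it to the caller.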

Implements Concurrency::source_block< multi_link_registry< ITarget< _Type > >, _MessageProcessorType >.

{
    // By default, accept the message
    return accept_message(_MsgId);
}
template<class _Type >
_Type Concurrency::unbounded_buffer< _Type >::dequeue ( )
inline

Removes an item from the unbounded_buffer messaging block.

Returns
The payload of the message removed from the unbounded_buffer.
{
    return receive<_Type>(this);
}
template<class _Type >
bool Concurrency::unbounded_buffer< _Type >::enqueue ( _Type const &  _Item)
inline

Adds an item to the unbounded_buffer messaging block.

Parameters
_Item: The item to add.
Returns
true if the item was accepted, false otherwise.
{
    return Concurrency::send<_Type>(this, _Item);
}
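
The calling pattern of enqueue and dequeue can be approximated with a standard-library model. This is only a sketch: BlockingFifo is a hypothetical class, it has no filter and no linked targets, and it uses a mutex and condition variable where the real block relies on Concurrency::send and receive. It assumes that dequeue waits until an item is available, as receive does.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>

// Hypothetical first-in, first-out model of the enqueue/dequeue pair.
template <class T>
class BlockingFifo
{
public:
    // Adds an item. This model has no filter, so it never declines.
    bool enqueue(T const& item)
    {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            items_.push(item);
        }
        ready_.notify_one();
        return true;
    }

    // Removes the oldest item, waiting until one is available.
    T dequeue()
    {
        std::unique_lock<std::mutex> lock(mutex_);
        ready_.wait(lock, [this] { return !items_.empty(); });
        T item = items_.front();
        items_.pop();
        return item;
    }

private:
    std::mutex mutex_;
    std::condition_variable ready_;
    std::queue<T> items_;
};
```

Items come out in the order they went in. With the real class, the equivalent calls are enqueue and dequeue on an unbounded_buffer< _Type > object.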
template<class _Type >
virtual void Concurrency::unbounded_buffer< _Type >::link_target_notification ( _Inout_ ITarget< _Type > *  _PTarget)
inline protected virtual

A callback that notifies that a new target has been linked to this unbounded_buffer messaging block.

Parameters
_PTarget: A pointer to the newly linked target.
{
    // If the message queue is blocked due to reservation
    // there is no need to do any message propagation
    if (_M_pReservedFor != NULL)
    {
        return;
    }

    message<_Type> * _Msg = _M_messageBuffer._Peek();

    if (_Msg != NULL)
    {
        // Propagate the head message to the new target
        message_status _Status = _PTarget->propagate(_Msg, this);

        if (_Status == accepted)
        {
            // The target accepted the message, restart propagation.
            _Propagate_priority_order(_M_messageBuffer);
        }

        // If the status is anything other than accepted, then leave
        // the message queue blocked.
    }
}
template<class _Type >
unbounded_buffer const& Concurrency::unbounded_buffer< _Type >::operator= ( unbounded_buffer< _Type > const &  )
private
template<class _Type >
virtual void Concurrency::unbounded_buffer< _Type >::process_input_messages ( _Inout_ message< _Type > *  _PMessage)
inline protected virtual

Places the message _PMessage in this unbounded_buffer messaging block and tries to offer it to all of the linked targets.

{
    if (_PMessage != NULL)
    {
        _M_processedMessages._Enqueue(_PMessage);
    }
}
template<class _Type >
virtual message_status Concurrency::unbounded_buffer< _Type >::propagate_message ( _Inout_ message< _Type > *  _PMessage, _Inout_ ISource< _Type > *  _PSource )
inline protected virtual

Asynchronously passes a message from an ISource block to this unbounded_buffer messaging block. It is invoked by the propagate method, when called by a source block.

Parameters
_PMessage: A pointer to the message object.
_PSource: A pointer to the source block offering the message.
Returns
A message_status indication of what the target decided to do with the message.
{
    // It is important that calls to propagate do *not* take the same lock on the
    // internal structure that is used by consume and the LWT. Doing so could
    // result in a deadlock.

    message_status _Result = accepted;

    // Accept the message being propagated
    _PMessage = _PSource->accept(_PMessage->msg_id(), this);

    if (_PMessage != NULL)
    {
        async_send(_PMessage);
    }
    else
    {
        _Result = missed;
    }

    return _Result;
}
template<class _Type >
virtual void Concurrency::unbounded_buffer< _Type >::propagate_output_messages ( )
inline protected virtual

Moves the messages that this unbounded_buffer has taken ownership of from the processed-message queue into its internal storage and tries to offer them to the linked targets.

If an earlier message is still at the head of the unbounded_buffer, propagation of later messages to linked targets does not occur until the earlier messages have been accepted or consumed. The first linked target to successfully accept or consume a message takes ownership, and no other target can then get that message.
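
The ordering rule in the paragraph above can be sketched with a small standard-library model. The names Target and propagate_in_order are hypothetical, plain int values stand in for message objects, and reservations are left out. The sketch only shows that targets are tried in link order, that the first acceptor takes the value, and that a declined head value blocks everything queued behind it.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <vector>

// Hypothetical stand-in for a linked target: returns true if it accepts.
using Target = std::function<bool(int)>;

// Offers queued values in order and returns how many were handed off.
std::size_t propagate_in_order(std::deque<int>& queue,
                               std::vector<Target> const& targets)
{
    std::size_t delivered = 0;
    while (!queue.empty())
    {
        bool accepted = false;

        // Always start from the first target that linked.
        for (Target const& target : targets)
        {
            if (target(queue.front()))
            {
                accepted = true;   // ownership moved; skip remaining targets
                break;
            }
        }

        // The head was not taken, so nothing behind it may go out either.
        if (!accepted)
        {
            break;
        }

        queue.pop_front();
        ++delivered;
    }
    return delivered;
}
```

For example, with one target that accepts only even values and a second that accepts only values below 10, a queue holding 2, 3, 11, 4 delivers 2 and 3 and then stops at 11, leaving 4 queued even though the first target would accept it.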

Reimplemented from Concurrency::source_block< multi_link_registry< ITarget< _Type > >, _MessageProcessorType >.

{
    // Move the messages from the processedMessages queue to the internal storage
    // to make them ready for propagating out

    // If there are messages in the message queue, the queue is blocked and a
    // propagation should not happen unless it has been forced using resume_propagation
    bool _FIsBlocked = (_M_messageBuffer._Count() > 0);

    for(;;)
    {
        message<_Type> * _PInputMessage = _M_processedMessages._Dequeue();
        if(_PInputMessage == NULL)
        {
            break;
        }
        _M_messageBuffer._Enqueue(_PInputMessage);
    }

    if (_M_fForceRepropagation == false && _FIsBlocked == true)
    {
        return;
    }

    // Reset the repropagation flag because a propagation has started.
    _M_fForceRepropagation = false;

    // Attempt to propagate messages to all the targets
    _Propagate_priority_order(_M_messageBuffer);
}
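
The gating logic in this listing can be illustrated with a small standard C++ model. This is only a sketch under stated assumptions, not the agents.h implementation: BufferModel, its members, and demo_propagation_gate are hypothetical names, messages are plain ints, and the modeled targets never accept a message, so the head message stays in the buffer.

```cpp
#include <cassert>
#include <deque>
#include <vector>

// Simplified, hypothetical model of the gate in propagate_output_messages.
// It is not the agents.h implementation: messages are plain ints and the
// "targets" never accept, so the head message stays in the buffer.
struct BufferModel {
    std::deque<int> processed;         // stands in for _M_processedMessages
    std::deque<int> buffer;            // stands in for _M_messageBuffer
    bool force_repropagation = false;  // stands in for _M_fForceRepropagation
    std::vector<int> offered;          // head messages offered so far

    // Returns true if a propagation pass ran, false if it was skipped.
    bool propagate_output_messages() {
        // Sample the blocked state before draining the processed queue.
        const bool was_blocked = !buffer.empty();

        while (!processed.empty()) {
            buffer.push_back(processed.front());
            processed.pop_front();
        }

        if (!force_repropagation && was_blocked) {
            return false;  // an earlier message is still waiting at the head
        }

        force_repropagation = false;
        if (!buffer.empty()) {
            offered.push_back(buffer.front());  // offer only the head message
        }
        return true;
    }
};

// Walks through the three cases: first pass, blocked pass, forced pass.
inline void demo_propagation_gate() {
    BufferModel m;
    m.processed = {1, 2};
    assert(m.propagate_output_messages());   // buffer was empty, pass runs
    m.processed.push_back(3);
    assert(!m.propagate_output_messages());  // blocked, but 3 is still stored
    assert(m.buffer.size() == 3);
    m.force_repropagation = true;
    assert(m.propagate_output_messages());   // forced pass re-offers the head
    assert(m.offered.size() == 2 && m.offered.back() == 1);
}
```

Note that the blocked state is sampled before the processed queue is drained, so newly arrived messages are always stored even when the propagation pass itself is skipped.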
template<class _Type >
virtual void Concurrency::unbounded_buffer< _Type >::release_message ( runtime_object_identity  _MsgId)
inline protected virtual

Releases a previous message reservation.

Parameters
    _MsgId    The runtime_object_identity of the message object being released.

Implements Concurrency::source_block< multi_link_registry< ITarget< _Type > >, _MessageProcessorType >.

{
    // The head message is the one reserved.
    if (!_M_messageBuffer._Is_head(_MsgId))
    {
        throw message_not_found();
    }
}
template<class _Type >
virtual bool Concurrency::unbounded_buffer< _Type >::reserve_message ( runtime_object_identity  _MsgId)
inline protected virtual

Reserves a message previously offered by this unbounded_buffer messaging block.

Parameters
    _MsgId    The runtime_object_identity of the message object being reserved.
Returns
true if the message was successfully reserved, false otherwise.

If reserve returns true, the caller must then call either consume, to take ownership of the message, or release, to give up the reservation.

Implements Concurrency::source_block< multi_link_registry< ITarget< _Type > >, _MessageProcessorType >.

{
    // Allow reservation if this is the head message
    return _M_messageBuffer._Is_head(_MsgId);
}
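
The head-only rule used by reserve_message and release_message can be sketched in standard C++ as follows. This is a hedged model, not the library's code: HeadOnlyQueue and demo_reservation are hypothetical names, message ids are plain ints, std::runtime_error stands in for message_not_found, and the consume step is an assumption that mirrors the head check shown for release_message.

```cpp
#include <cassert>
#include <deque>
#include <stdexcept>

// Hypothetical model of a queue in which only the head message can be
// reserved, released, or consumed. Not the agents.h implementation.
struct HeadOnlyQueue {
    std::deque<int> ids;  // message ids in arrival order

    bool is_head(int id) const { return !ids.empty() && ids.front() == id; }

    // Reservation succeeds only for the head message.
    bool reserve(int id) const { return is_head(id); }

    // Releasing anything other than the head is an error.
    void release(int id) const {
        if (!is_head(id)) throw std::runtime_error("message not found");
    }

    // Assumed counterpart of release: takes the head message out of the queue.
    int consume(int id) {
        if (!is_head(id)) throw std::runtime_error("message not found");
        const int taken = ids.front();
        ids.pop_front();
        return taken;
    }
};

// Reserve the head, then either release it or consume it.
inline void demo_reservation() {
    HeadOnlyQueue q;
    q.ids = {10, 11};
    assert(!q.reserve(11));       // 11 is behind 10, so it cannot be reserved
    assert(q.reserve(10));        // the head can be reserved
    q.release(10);                // giving up the reservation keeps 10 queued
    assert(q.consume(10) == 10);  // consuming removes the head
    assert(q.reserve(11));        // 11 is now the head
}
```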
template<class _Type >
virtual void Concurrency::unbounded_buffer< _Type >::resume_propagation ( )
inline protected virtual

Resumes propagation after a reservation has been released.

Implements Concurrency::source_block< multi_link_registry< ITarget< _Type > >, _MessageProcessorType >.

{
    // If there are any messages in the buffer, propagate them out
    if (_M_messageBuffer._Count() > 0)
    {
        // Set the flag to force a repropagation. This flag is cleared when a propagation happens
        // The only functions that call this are release, consume, and link_target, all of which
        // hold the internal lock, so the flag is guaranteed to be read by propagation, which also
        // holds the same lock.
        _M_fForceRepropagation = true;

        // async send a NULL value to initiate the repropagation
        async_send(NULL);
    }
}
template<class _Type >
virtual message_status Concurrency::unbounded_buffer< _Type >::send_message ( _Inout_ message< _Type > * _PMessage, _Inout_ ISource< _Type > * _PSource )
inline protected virtual

Synchronously passes a message from an ISource block to this unbounded_buffer messaging block. It is invoked by the send method, when called by a source block.

Parameters
    _PMessage    A pointer to the message object.
    _PSource     A pointer to the source block offering the message.
Returns
A message_status indication of what the target decided to do with the message.
{
    _PMessage = _PSource->accept(_PMessage->msg_id(), this);

    if (_PMessage != NULL)
    {
        sync_send(_PMessage);
    }
    else
    {
        return missed;
    }

    return accepted;
}
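
The two outcomes of this listing, accepted and missed, can be modeled in standard C++ as below. This is a minimal sketch under stated assumptions rather than the library's behavior in full: SourceModel, TargetModel, Status, and demo_send_message are hypothetical names, a message is a single int owned by a std::unique_ptr, and storing the value stands in for sync_send.

```cpp
#include <cassert>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical stand-in for the two message_status values used above.
enum class Status { accepted, missed };

// Hypothetical source that holds at most one pending message.
struct SourceModel {
    std::unique_ptr<int> pending;

    // Hands over the pending message, or returns null if it is already gone.
    std::unique_ptr<int> accept() { return std::move(pending); }
};

// Hypothetical target that mirrors the accept-then-store flow of send_message.
struct TargetModel {
    std::vector<int> stored;

    Status send_message(SourceModel& source) {
        std::unique_ptr<int> message = source.accept();
        if (message == nullptr) {
            return Status::missed;  // the message was no longer available
        }
        stored.push_back(*message);  // stands in for sync_send
        return Status::accepted;
    }
};

// The first send takes the message; the second finds nothing left to accept.
inline void demo_send_message() {
    SourceModel source;
    source.pending.reset(new int(42));
    TargetModel target;
    assert(target.send_message(source) == Status::accepted);
    assert(target.send_message(source) == Status::missed);
    assert(target.stored.size() == 1 && target.stored[0] == 42);
}
```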
template<class _Type >
virtual bool Concurrency::unbounded_buffer< _Type >::supports_anonymous_source ( )
inline protected virtual

Overrides the supports_anonymous_source method to indicate that this block can accept messages offered to it by a source that is not linked.

Returns
true because the block does not postpone offered messages.

Reimplemented from Concurrency::ITarget< multi_link_registry< ISource< _Type > >::type::source_type >.

{
    return true;
}

Member Data Documentation

template<class _Type >
bool Concurrency::unbounded_buffer< _Type >::_M_fForceRepropagation
private

A bool that signals the message processor to force a repropagation.

template<class _Type >
::Concurrency::details::_Queue<message<_Type> > Concurrency::unbounded_buffer< _Type >::_M_messageBuffer
private

Message queue used to store messages

template<class _Type >
::Concurrency::details::_Queue<message<_Type> > Concurrency::unbounded_buffer< _Type >::_M_processedMessages
private

Message queue used to store processed messages


The documentation for this class was generated from the following file: