STLdoc
STLdocumentation
Concurrency::transformer< _Input, _Output > Class Template Reference

A transformer messaging block is a single-target, multi-source, ordered propagator_block which can accept messages of one type and is capable of storing an unbounded number of messages of a different type.

#include <agents.h>

Inheritance diagram for Concurrency::transformer< _Input, _Output >:
Concurrency::propagator_block< single_link_registry< ITarget< _Output > >, multi_link_registry< ISource< _Input > > >
Concurrency::source_block< single_link_registry< ITarget< _Output > >, _MessageProcessorType >
Concurrency::ITarget< multi_link_registry< ISource< _Input > >::type::source_type >
Concurrency::ISource< single_link_registry< ITarget< _Output > >::type::type >

Public Member Functions

 transformer (_Transform_method const &_Func, _Inout_opt_ ITarget< _Output > *_PTarget=NULL)
 Constructs a transformer messaging block.

 transformer (_Transform_method const &_Func, _Inout_opt_ ITarget< _Output > *_PTarget, filter_method const &_Filter)
 Constructs a transformer messaging block.

 ~transformer ()
 Destroys the transformer messaging block.
 
- Public Member Functions inherited from Concurrency::propagator_block< single_link_registry< ITarget< _Output > >, multi_link_registry< ISource< _Input > > >
 propagator_block ()
 Constructs a propagator_block object.

virtual ~propagator_block ()
 Destroys a propagator_block object.

virtual message_status propagate (_Inout_opt_ message< _Source_type > *_PMessage, _Inout_opt_ ISource< _Source_type > *_PSource)
 Asynchronously passes a message from a source block to this target block.

virtual message_status send (_Inout_ message< _Source_type > *_PMessage, _Inout_ ISource< _Source_type > *_PSource)
 Synchronously initiates a message to this block. Called by an ISource block. When this function completes, the message will already have propagated into the block.
 
- Public Member Functions inherited from Concurrency::source_block< single_link_registry< ITarget< _Output > >, _MessageProcessorType >
 source_block ()
 Constructs a source_block object.

virtual ~source_block ()
 Destroys the source_block object.

virtual void link_target (_Inout_ ITarget< _Target_type > *_PTarget)
 Links a target block to this source_block object.

virtual void unlink_target (_Inout_ ITarget< _Target_type > *_PTarget)
 Unlinks a target block from this source_block object.

virtual void unlink_targets ()
 Unlinks all target blocks from this source_block object.

virtual message< _Target_type > * accept (runtime_object_identity _MsgId, _Inout_ ITarget< _Target_type > *_PTarget)
 Accepts a message that was offered by this source_block object, transferring ownership to the caller.

virtual bool reserve (runtime_object_identity _MsgId, _Inout_ ITarget< _Target_type > *_PTarget)
 Reserves a message previously offered by this source_block object.

virtual message< _Target_type > * consume (runtime_object_identity _MsgId, _Inout_ ITarget< _Target_type > *_PTarget)
 Consumes a message previously offered by this source_block object and successfully reserved by the target, transferring ownership to the caller.

virtual void release (runtime_object_identity _MsgId, _Inout_ ITarget< _Target_type > *_PTarget)
 Releases a previous successful message reservation.

virtual void acquire_ref (_Inout_ ITarget< _Target_type > *)
 Acquires a reference count on this source_block object, to prevent deletion.

virtual void release_ref (_Inout_ ITarget< _Target_type > *_PTarget)
 Releases a reference count on this source_block object.
 
- Public Member Functions inherited from Concurrency::ISource< single_link_registry< ITarget< _Output > >::type::type >
virtual ~ISource ()
 Destroys the ISource object.

virtual void link_target (_Inout_ ITarget< single_link_registry< ITarget< _Output > >::type::type > *_PTarget)=0
 When overridden in a derived class, links a target block to this ISource block.

virtual void unlink_target (_Inout_ ITarget< single_link_registry< ITarget< _Output > >::type::type > *_PTarget)=0
 When overridden in a derived class, unlinks a target block from this ISource block, if found to be previously linked.

virtual message< single_link_registry< ITarget< _Output > >::type::type > * accept (runtime_object_identity _MsgId, _Inout_ ITarget< single_link_registry< ITarget< _Output > >::type::type > *_PTarget)=0
 When overridden in a derived class, accepts a message that was offered by this ISource block, transferring ownership to the caller.

virtual bool reserve (runtime_object_identity _MsgId, _Inout_ ITarget< single_link_registry< ITarget< _Output > >::type::type > *_PTarget)=0
 When overridden in a derived class, reserves a message previously offered by this ISource block.

virtual message< single_link_registry< ITarget< _Output > >::type::type > * consume (runtime_object_identity _MsgId, _Inout_ ITarget< single_link_registry< ITarget< _Output > >::type::type > *_PTarget)=0
 When overridden in a derived class, consumes a message previously offered by this ISource block and successfully reserved by the target, transferring ownership to the caller.

virtual void release (runtime_object_identity _MsgId, _Inout_ ITarget< single_link_registry< ITarget< _Output > >::type::type > *_PTarget)=0
 When overridden in a derived class, releases a previous successful message reservation.

virtual void acquire_ref (_Inout_ ITarget< single_link_registry< ITarget< _Output > >::type::type > *_PTarget)=0
 When overridden in a derived class, acquires a reference count on this ISource block, to prevent deletion.

virtual void release_ref (_Inout_ ITarget< single_link_registry< ITarget< _Output > >::type::type > *_PTarget)=0
 When overridden in a derived class, releases a reference count on this ISource block.
 
- Public Member Functions inherited from Concurrency::ITarget< multi_link_registry< ISource< _Input > >::type::source_type >
virtual ~ITarget ()
 Destroys the ITarget object.

virtual message_status propagate (_Inout_opt_ message< multi_link_registry< ISource< _Input > >::type::source_type > *_PMessage, _Inout_opt_ ISource< multi_link_registry< ISource< _Input > >::type::source_type > *_PSource)=0
 When overridden in a derived class, asynchronously passes a message from a source block to this target block.

virtual message_status send (_Inout_ message< multi_link_registry< ISource< _Input > >::type::source_type > *_PMessage, _Inout_ ISource< multi_link_registry< ISource< _Input > >::type::source_type > *_PSource)=0
 When overridden in a derived class, synchronously passes a message to the target block.
 

Protected Member Functions

virtual message_status propagate_message (_Inout_ message< _Input > *_PMessage, _Inout_ ISource< _Input > *_PSource)
 Asynchronously passes a message from an ISource block to this transformer messaging block. It is invoked by the propagate method, when called by a source block.

virtual message_status send_message (_Inout_ message< _Input > *_PMessage, _Inout_ ISource< _Input > *_PSource)
 Synchronously passes a message from an ISource block to this transformer messaging block. It is invoked by the send method, when called by a source block.

virtual bool supports_anonymous_source ()
 Overrides the supports_anonymous_source method to indicate that this block can accept messages offered to it by a source that is not linked.

virtual message< _Output > * accept_message (runtime_object_identity _MsgId)
 Accepts a message that was offered by this transformer messaging block, transferring ownership to the caller.

virtual bool reserve_message (runtime_object_identity _MsgId)
 Reserves a message previously offered by this transformer messaging block.

virtual message< _Output > * consume_message (runtime_object_identity _MsgId)
 Consumes a message previously offered by the transformer and reserved by the target, transferring ownership to the caller.

virtual void release_message (runtime_object_identity _MsgId)
 Releases a previous message reservation.

virtual void resume_propagation ()
 Resumes propagation after a reservation has been released.

virtual void link_target_notification (_Inout_ ITarget< _Output > *)
 A callback that notifies that a new target has been linked to this transformer messaging block.

virtual void propagate_to_any_targets (_Inout_opt_ message< _Output > *)
 Executes the transformer function on the input messages.
 
- Protected Member Functions inherited from Concurrency::propagator_block< single_link_registry< ITarget< _Output > >, multi_link_registry< ISource< _Input > > >
virtual message_status propagate_message (_Inout_ message< _Source_type > *_PMessage, _Inout_ ISource< _Source_type > *_PSource)=0
 When overridden in a derived class, this method asynchronously passes a message from an ISource block to this propagator_block object. It is invoked by the propagate method, when called by a source block.

virtual message_status send_message (_Inout_ message< _Source_type > *, _Inout_ ISource< _Source_type > *)
 When overridden in a derived class, this method synchronously passes a message from an ISource block to this propagator_block object. It is invoked by the send method, when called by a source block.

virtual void link_source (_Inout_ ISource< _Source_type > *_PSource)
 Links a specified source block to this propagator_block object.

virtual void unlink_source (_Inout_ ISource< _Source_type > *_PSource)
 Unlinks a specified source block from this propagator_block object.

virtual void unlink_sources ()
 Unlinks all source blocks from this propagator_block object.

virtual void process_input_messages (_Inout_ message< _Target_type > *)
 Processes input messages. This is only useful for propagator blocks, which derive from source_block.

void initialize_source_and_target (_Inout_opt_ Scheduler *_PScheduler=NULL, _Inout_opt_ ScheduleGroup *_PScheduleGroup=NULL)
 Initializes the base object. Specifically, the message_processor object needs to be initialized.

void register_filter (filter_method const &_Filter)
 Registers a filter method that will be invoked on every received message.

void decline_incoming_messages ()
 Indicates to the block that new messages should be declined.

void remove_network_links ()
 Removes all the source and target network links from this propagator_block object.
 
- Protected Member Functions inherited from Concurrency::source_block< single_link_registry< ITarget< _Output > >, _MessageProcessorType >
virtual void link_target_notification (_Inout_ ITarget< _Target_type > *)
 A callback that notifies that a new target has been linked to this source_block object.

virtual void unlink_target_notification (_Inout_ ITarget< _Target_type > *_PTarget)
 A callback that notifies that a target has been unlinked from this source_block object.

virtual void propagate_output_messages ()
 Propagates messages to targets.

virtual void propagate_to_any_targets (_Inout_opt_ message< _Target_type > *_PMessage)
 When overridden in a derived class, propagates the given message to any or all of the linked targets. This is the main propagation routine for message blocks.

void initialize_source (_Inout_opt_ Scheduler *_PScheduler=NULL, _Inout_opt_ ScheduleGroup *_PScheduleGroup=NULL)
 Initializes the message_propagator within this source_block.

void enable_batched_processing ()
 Enables batched processing for this block.

virtual void sync_send (_Inout_opt_ message< _Target_type > *_Msg)
 Synchronously queues up messages and starts a propagation task, if this has not been done already.

virtual void async_send (_Inout_opt_ message< _Target_type > *_Msg)
 Asynchronously queues up messages and starts a propagation task, if this has not been done already.

void wait_for_outstanding_async_sends ()
 Waits for all asynchronous propagations to complete. This propagator-specific spin wait is used in destructors of message blocks to make sure that all asynchronous propagations have time to finish before destroying the block.

void remove_targets ()
 Removes all target links for this source block. This should be called from the destructor.
 
- Protected Member Functions inherited from Concurrency::ISource< single_link_registry< ITarget< _Output > >::type::type >
void _Invoke_link_source (ITarget< single_link_registry< ITarget< _Output > >::type::type > *_PLinkFrom)
 Links this source to a target.

void _Invoke_unlink_source (ITarget< single_link_registry< ITarget< _Output > >::type::type > *_PUnlinkFrom)
 Unlinks this source from a target.

- Protected Member Functions inherited from Concurrency::ITarget< multi_link_registry< ISource< _Input > >::type::source_type >
virtual void link_source (_Inout_ ISource< multi_link_registry< ISource< _Input > >::type::source_type > *_PSource)=0
 When overridden in a derived class, links a specified source block to this ITarget block.

virtual void unlink_source (_Inout_ ISource< multi_link_registry< ISource< _Input > >::type::source_type > *_PSource)=0
 When overridden in a derived class, unlinks a specified source block from this ITarget block.
 

Private Types

typedef std::function< _Output(_Input const &)> _Transform_method
 
typedef single_link_registry< ITarget< _Output > > _TargetLinkRegistry
 
typedef multi_link_registry< ISource< _Input > > _SourceLinkRegistry
 

Private Member Functions

void _Propagate_priority_order (::Concurrency::details::_Queue< message< _Target_type >> &_MessageBuffer)
 Propagates messages in priority order.

void _Delete_stored_messages ()
 Deletes all messages currently stored in this message block. Should be called by the destructor to ensure any messages propagated in are cleaned up.
 
transformer const & operator= (transformer const &)
 
 transformer (transformer const &)
 

Private Attributes

_Transform_method _M_pFunc
 
concurrent_queue< message< _Input > * > _M_inputMessages
 
::Concurrency::details::_Queue< message< _Output > > _M_messageBuffer
 Message queue used to store outbound messages.
 

Additional Inherited Members

- Public Types inherited from Concurrency::propagator_block< single_link_registry< ITarget< _Output > >, multi_link_registry< ISource< _Input > > >
typedef multi_link_registry< ISource< _Input > >::type::source_type _Source_type
 The type of the payload for the incoming message to this propagator_block.

typedef source_link_manager< multi_link_registry< ISource< _Input > > > _SourceLinkManager
 The type of the source_link_manager for this propagator_block.

typedef _SourceLinkManager::iterator source_iterator
 The type of the iterator for the source_link_manager for this propagator_block.
 
- Public Types inherited from Concurrency::source_block< single_link_registry< ITarget< _Output > >, _MessageProcessorType >
typedef single_link_registry< ITarget< _Output > >::type::type _Target_type
 The payload type of messages handled by this source_block.

typedef single_link_registry< ITarget< _Output > >::iterator target_iterator
 The iterator to walk the connected targets.
 
- Public Types inherited from Concurrency::ISource< single_link_registry< ITarget< _Output > >::type::type >
typedef single_link_registry< ITarget< _Output > >::type::type source_type
 A type alias for _Type.
 
- Public Types inherited from Concurrency::ITarget< multi_link_registry< ISource< _Input > >::type::source_type >
typedef multi_link_registry< ISource< _Input > >::type::source_type type
 A type alias for _Type.

typedef std::function< bool(multi_link_registry< ISource< _Input > >::type::source_type const &)> filter_method
 The signature of any method used by the block that returns a bool value to determine whether an offered message should be accepted.
 
- Protected Attributes inherited from Concurrency::propagator_block< single_link_registry< ITarget< _Output > >, multi_link_registry< ISource< _Input > > >
_SourceLinkManager _M_connectedSources
 The container for all the sources connected to this block.

filter_method _M_pFilter
 The filter function which determines whether offered messages should be accepted.

volatile bool _M_fDeclineMessages
 A bool that is set to indicate that all messages should be declined in preparation for deleting the block.
 
- Protected Attributes inherited from Concurrency::source_block< single_link_registry< ITarget< _Output > >, _MessageProcessorType >
ITarget< _Target_type > * _M_pReservedFor
 Connected target that is holding a reservation.

runtime_object_identity _M_reservedId
 Reserved message ID.

single_link_registry< ITarget< _Output > > _M_connectedTargets
 Connected targets.

_MessageProcessorType _M_messageProcessor
 Processor used for asynchronous message handling.
 

Detailed Description

template<class _Input, class _Output>
class Concurrency::transformer< _Input, _Output >

A transformer messaging block is a single-target, multi-source, ordered propagator_block which can accept messages of one type and is capable of storing an unbounded number of messages of a different type.

Template Parameters
_Input  The payload type of the messages accepted by the buffer.
_Output  The payload type of the messages stored and propagated out by the buffer.

For more information, see Asynchronous Message Blocks.
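
The description above can be illustrated with a small portable model. This is only a sketch under stated assumptions, not the Concurrency Runtime implementation: it does not include agents.h, it is single-threaded, and the names ToyTransformer, post, take and demo_first_output are hypothetical. It shows a block that accepts payloads of one type, applies a transform function, and stores an unbounded, ordered queue of payloads of another type.

```cpp
#include <cassert>
#include <functional>
#include <optional>
#include <queue>
#include <string>
#include <utility>

// Hypothetical stand-in for a transformer block (not the agents.h class).
template <class Input, class Output>
class ToyTransformer {
public:
    using TransformMethod = std::function<Output(Input const&)>;

    explicit ToyTransformer(TransformMethod func) : func_(std::move(func)) {
        assert(func_ && "a transform function is required");
    }

    // Accept one input payload, transform it, and store the result in FIFO order.
    void post(Input const& in) { outputs_.push(func_(in)); }

    // Remove and return the oldest stored output, if there is one.
    std::optional<Output> take() {
        if (outputs_.empty()) return std::nullopt;
        Output out = std::move(outputs_.front());
        outputs_.pop();
        return out;
    }

private:
    TransformMethod func_;
    std::queue<Output> outputs_;  // unbounded, ordered output store
};

// Example: int payloads in, std::string payloads out.
inline std::string demo_first_output() {
    ToyTransformer<int, std::string> t(
        [](int const& n) { return std::to_string(n * 2); });
    t.post(21);
    t.post(5);
    return *t.take();  // the oldest output is handed out first
}
```

In the real class the transform runs on a runtime task and the outputs are offered to a linked target; the model only keeps the type relationship and the ordering.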

See also
call Class

Member Typedef Documentation

template<class _Input , class _Output >
typedef multi_link_registry<ISource<_Input> > Concurrency::transformer< _Input, _Output >::_SourceLinkRegistry
private
template<class _Input , class _Output >
typedef single_link_registry<ITarget<_Output> > Concurrency::transformer< _Input, _Output >::_TargetLinkRegistry
private
template<class _Input , class _Output >
typedef std::function<_Output(_Input const&)> Concurrency::transformer< _Input, _Output >::_Transform_method
private

Constructor & Destructor Documentation

template<class _Input , class _Output >
Concurrency::transformer< _Input, _Output >::transformer ( _Transform_method const &  _Func,
_Inout_opt_ ITarget< _Output > *  _PTarget = NULL 
)
inline

Constructs a transformer messaging block.

Parameters
_Func  A function that will be invoked for each accepted message.
_PTarget  A pointer to a target block to link with the transformer.

The runtime uses the default scheduler, because this overload takes neither a _PScheduler nor a _PScheduleGroup parameter.

The type _Transform_method is a functor with signature _Output (_Input const &) which is invoked by this transformer messaging block to process a message.

The type filter_method is a functor with signature bool (_Input const &) which is invoked by this transformer messaging block to determine whether or not it should accept an offered message.

See also
Scheduler Class, ScheduleGroup Class
:
    _M_pFunc(_Func)
{
    this->initialize_source_and_target();

    if (_PTarget != NULL)
    {
        this->link_target(_PTarget);
    }
}
template<class _Input , class _Output >
Concurrency::transformer< _Input, _Output >::transformer ( _Transform_method const &  _Func,
_Inout_opt_ ITarget< _Output > *  _PTarget,
filter_method const &  _Filter 
)
inline

Constructs a transformer messaging block.

Parameters
_Func  A function that will be invoked for each accepted message.
_PTarget  A pointer to a target block to link with the transformer.
_Filter  A filter function which determines whether offered messages should be accepted.

The runtime uses the default scheduler, because this overload takes neither a _PScheduler nor a _PScheduleGroup parameter.

The type _Transform_method is a functor with signature _Output (_Input const &) which is invoked by this transformer messaging block to process a message.

The type filter_method is a functor with signature bool (_Input const &) which is invoked by this transformer messaging block to determine whether or not it should accept an offered message.
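
How the two functor signatures fit together can be sketched in portable C++. This is an illustrative model under stated assumptions, not the agents.h code: FilteredDoubler, Status, offer and demo_filtered are hypothetical names, the payload type is fixed to int, and treating an empty filter as "accept everything" is an assumption of the sketch.

```cpp
#include <cassert>
#include <functional>
#include <utility>
#include <vector>

// Hypothetical status values, loosely mirroring accepted / declined.
enum class Status { accepted, declined };

// Simplified model: the filter decides acceptance, then the transform runs.
class FilteredDoubler {
public:
    using FilterMethod = std::function<bool(int const&)>;    // bool (_Input const &)
    using TransformMethod = std::function<int(int const&)>;  // _Output (_Input const &)

    FilteredDoubler(TransformMethod func, FilterMethod filter)
        : func_(std::move(func)), filter_(std::move(filter)) {
        assert(func_ && "a transform function is required");
    }

    Status offer(int const& value) {
        // Assumption of this sketch: an empty filter accepts every message.
        if (filter_ && !filter_(value)) return Status::declined;
        results_.push_back(func_(value));
        return Status::accepted;
    }

    std::vector<int> const& results() const { return results_; }

private:
    TransformMethod func_;
    FilterMethod filter_;
    std::vector<int> results_;
};

// Offers 1..4 to a block that only accepts even values and doubles them.
inline std::vector<int> demo_filtered() {
    FilteredDoubler block([](int const& v) { return v * 2; },
                          [](int const& v) { return v % 2 == 0; });
    for (int v = 1; v <= 4; ++v) block.offer(v);
    return block.results();
}
```

Only the values that pass the filter ever reach the transform function, which is the point of supplying _Filter to this constructor.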

See also
Scheduler Class, ScheduleGroup Class
:
    _M_pFunc(_Func)
{
    this->initialize_source_and_target();
    this->register_filter(_Filter);

    if (_PTarget != NULL)
    {
        this->link_target(_PTarget);
    }
}
template<class _Input , class _Output >
Concurrency::transformer< _Input, _Output >::~transformer ( )
inline

Destroys the transformer messaging block.

{
    // Remove all links
    this->remove_network_links();

    // Clean up any messages left in this message block
    _Delete_stored_messages();
}
template<class _Input , class _Output >
Concurrency::transformer< _Input, _Output >::transformer ( transformer< _Input, _Output > const &  )
private

Member Function Documentation

template<class _Input , class _Output >
void Concurrency::transformer< _Input, _Output >::_Delete_stored_messages ( )
inline private

Deletes all messages currently stored in this message block. Should be called by the destructor to ensure any messages propagated in are cleaned up.

{
    // Delete input messages
    // Because the transformer uses its own input queue, it's possible there are messages
    // in this queue and no LWT will be executed to handle them.
    message<_Input> * _PInputQueueMessage = NULL;

    while (_M_inputMessages.try_pop(_PInputQueueMessage))
    {
        // Message cleanup
        delete _PInputQueueMessage;
    }

    // Delete any messages remaining in the output queue
    for (;;)
    {
        message<_Output> * _Msg = _M_messageBuffer._Dequeue();
        if (_Msg == NULL)
        {
            break;
        }
        delete _Msg;
    }
}
template<class _Input , class _Output >
void Concurrency::transformer< _Input, _Output >::_Propagate_priority_order ( ::Concurrency::details::_Queue< message< _Target_type >> &  _MessageBuffer)
inline private

Propagates messages in priority order.

Parameters
_MessageBuffer  Reference to a message queue with messages to be propagated.

{
    message<_Target_type> * _Msg = _MessageBuffer._Peek();

    // If someone has reserved the _Head message, don't propagate anymore
    if (this->_M_pReservedFor != NULL)
    {
        return;
    }

    while (_Msg != NULL)
    {
        message_status _Status = declined;

        // Always start from the first target that linked
        for (target_iterator _Iter = this->_M_connectedTargets.begin(); *_Iter != NULL; ++_Iter)
        {
            ITarget<_Target_type> * _PTarget = *_Iter;
            _Status = _PTarget->propagate(_Msg, this);

            // Ownership of message changed. Do not propagate this
            // message to any other target.
            if (_Status == accepted)
            {
                break;
            }

            // If the target just propagated to reserved this message, stop
            // propagating it to others
            if (this->_M_pReservedFor != NULL)
            {
                break;
            }
        }

        // If status is anything other than accepted, then the head message
        // was not propagated out. Thus, nothing after it in the queue can
        // be propagated out. Cease propagation.
        if (_Status != accepted)
        {
            break;
        }

        // Get the next message
        _Msg = _MessageBuffer._Peek();
    }
}
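
The head-of-queue rule in the listing above can be modelled in a few lines of portable C++. This is a sketch under stated assumptions, not the agents.h code: ToyTarget, propagate_in_order and demo_blocked_head are hypothetical names, a target is reduced to a callable that returns true when it accepts a value, and the reservation checks are left out. In the model the queue is popped directly on acceptance, whereas in the listing the message leaves the buffer when the target calls back to accept it.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <vector>

// Hypothetical target: returns true if it accepts the offered value.
using ToyTarget = std::function<bool(int)>;

// Offers the head of `buffer` to each target in link order. Stops as soon as
// the head is not accepted by any target, so later messages never overtake it.
// Returns the number of messages propagated out.
inline int propagate_in_order(std::deque<int>& buffer,
                              std::vector<ToyTarget> const& targets) {
    int sent = 0;
    while (!buffer.empty()) {
        bool accepted = false;
        for (auto const& target : targets) {
            if (target(buffer.front())) {
                accepted = true;  // ownership moved; do not offer it again
                break;
            }
        }
        if (!accepted) break;  // head is stuck, so everything behind it waits
        buffer.pop_front();
        ++sent;
    }
    return sent;
}

// A target that only accepts even values lets 2 and 4 through; 5 blocks 6.
inline int demo_blocked_head() {
    std::deque<int> buffer{2, 4, 5, 6};
    std::vector<ToyTarget> targets{[](int v) { return v % 2 == 0; }};
    int sent = propagate_in_order(buffer, targets);
    assert(buffer.size() == 2);  // 5 and 6 are still queued
    return sent;
}
```

The model keeps a vector of targets only to mirror the loop in the listing; a transformer uses a single_link_registry for its targets.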
template<class _Input , class _Output >
virtual message<_Output>* Concurrency::transformer< _Input, _Output >::accept_message ( runtime_object_identity  _MsgId)
inline protected virtual

Accepts a message that was offered by this transformer messaging block, transferring ownership to the caller.

Parameters
_MsgId  The runtime_object_identity of the offered message object.
Returns
A pointer to the message object that the caller now has ownership of.

Implements Concurrency::source_block< single_link_registry< ITarget< _Output > >, _MessageProcessorType >.

{
    //
    // Peek at the head message in the message buffer. If the IDs match
    // dequeue and transfer ownership
    //
    message<_Output> * _Msg = NULL;

    if (_M_messageBuffer._Is_head(_MsgId))
    {
        _Msg = _M_messageBuffer._Dequeue();
    }

    return _Msg;
}
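
The "only the head may be accepted" check above can be sketched with standard containers. This is an illustrative model, not the agents.h code: ToyMessage, accept_head and demo_accept are hypothetical names, and std::optional stands in for the nullable message pointer returned by accept_message.

```cpp
#include <cassert>
#include <deque>
#include <optional>

// Hypothetical message: an ID plus a payload.
struct ToyMessage {
    int id;
    int payload;
};

// Hands out the head message only if its ID matches the request. Otherwise
// nothing is returned and the queue is left untouched.
inline std::optional<ToyMessage> accept_head(std::deque<ToyMessage>& buffer, int id) {
    if (buffer.empty() || buffer.front().id != id) return std::nullopt;
    ToyMessage msg = buffer.front();
    buffer.pop_front();  // ownership moves to the caller
    return msg;
}

inline bool demo_accept() {
    std::deque<ToyMessage> buffer{{1, 10}, {2, 20}};
    bool second_refused = !accept_head(buffer, 2).has_value();  // 2 is not the head
    std::optional<ToyMessage> first = accept_head(buffer, 1);   // 1 is the head
    assert(first.has_value());
    return second_refused && first->payload == 10 && buffer.size() == 1;
}
```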
template<class _Input , class _Output >
virtual message<_Output>* Concurrency::transformer< _Input, _Output >::consume_message ( runtime_object_identity  _MsgId)
inline protected virtual

Consumes a message previously offered by the transformer and reserved by the target, transferring ownership to the caller.

Parameters
_MsgId  The runtime_object_identity of the message object being consumed.
Returns
A pointer to the message object that the caller now has ownership of.

Similar to accept, but is always preceded by a call to reserve.

Implements Concurrency::source_block< single_link_registry< ITarget< _Output > >, _MessageProcessorType >.

{
    // By default, accept the message
    return accept_message(_MsgId);
}
template<class _Input , class _Output >
virtual void Concurrency::transformer< _Input, _Output >::link_target_notification ( _Inout_ ITarget< _Output > *  )
inline protected virtual

A callback that notifies that a new target has been linked to this transformer messaging block.

Parameters
_PTarget  A pointer to the newly linked target.

{
    // If the message queue is blocked due to reservation
    // there is no need to do any message propagation
    if (this->_M_pReservedFor != NULL)
    {
        return;
    }

    _Propagate_priority_order(_M_messageBuffer);
}
template<class _Input , class _Output >
transformer const& Concurrency::transformer< _Input, _Output >::operator= ( transformer< _Input, _Output > const &  )
private
template<class _Input , class _Output >
virtual message_status Concurrency::transformer< _Input, _Output >::propagate_message ( _Inout_ message< _Input > *  _PMessage,
_Inout_ ISource< _Input > *  _PSource 
)
inline protected virtual

Asynchronously passes a message from an ISource block to this transformer messaging block. It is invoked by the propagate method, when called by a source block.

Parameters
_PMessage  A pointer to the message object.
_PSource  A pointer to the source block offering the message.
Returns
A message_status indication of what the target decided to do with the message.
{
    // It is important that calls to propagate do *not* take the same lock on the
    // internal structure that is used by Consume and the LWT. Doing so could
    // result in a deadlock with the Consume call.

    message_status _Result = accepted;

    //
    // Accept the message being propagated
    // Note: depending on the source block propagating the message
    // this may not necessarily be the same message (pMessage) first
    // passed into the function.
    //
    _PMessage = _PSource->accept(_PMessage->msg_id(), this);

    if (_PMessage != NULL)
    {
        // Enqueue the input message
        _M_inputMessages.push(_PMessage);
        this->async_send(NULL);
    }
    else
    {
        _Result = missed;
    }

    return _Result;
}
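
The accept-then-enqueue shape of the listing above, including the missed outcome, can be sketched with standard C++. This is a model under stated assumptions, not the agents.h code: ToySource, ToyStatus, the free function propagate_message and demo_propagate are hypothetical, the source holds at most one offered message, and the asynchronous processing step started by async_send is omitted.

```cpp
#include <cassert>
#include <memory>
#include <queue>
#include <utility>

// Hypothetical status values mirroring accepted / missed.
enum class ToyStatus { accepted, missed };

// Hypothetical source holding at most one offered message, identified by ID.
struct ToySource {
    int offered_id = 0;
    std::unique_ptr<int> offered;  // null once some target has taken it

    std::unique_ptr<int> accept(int id) {
        if (!offered || id != offered_id) return nullptr;
        return std::move(offered);  // ownership moves to the caller
    }
};

// Takes ownership from the source and queues the input for later processing,
// or reports `missed` if the message is no longer available.
inline ToyStatus propagate_message(int id, ToySource& source,
                                   std::queue<std::unique_ptr<int>>& inputs) {
    std::unique_ptr<int> msg = source.accept(id);
    if (!msg) return ToyStatus::missed;
    inputs.push(std::move(msg));
    return ToyStatus::accepted;
}

inline bool demo_propagate() {
    ToySource source;
    source.offered_id = 7;
    source.offered = std::make_unique<int>(42);
    std::queue<std::unique_ptr<int>> inputs;
    ToyStatus first = propagate_message(7, source, inputs);
    ToyStatus second = propagate_message(7, source, inputs);  // already taken
    assert(inputs.size() == 1);
    return first == ToyStatus::accepted && second == ToyStatus::missed;
}
```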
template<class _Input , class _Output >
virtual void Concurrency::transformer< _Input, _Output >::propagate_to_any_targets ( _Inout_opt_ message< _Output > *  )
inline protected virtual

Executes the transformer function on the input messages.

{
    message<_Output> * _Msg = NULL;

    // Process input message.
    message<_Input> * _PInputMessage = NULL;
    _M_inputMessages.try_pop(_PInputMessage);

    if (_PInputMessage != NULL)
    {
        // Invoke the TransformMethod on the data
        // Let exceptions flow
        _Output _Out = _M_pFunc(_PInputMessage->payload);

        // Reuse the input message ID
        _Msg = new message<_Output>(_Out, _PInputMessage->msg_id());
        _M_messageBuffer._Enqueue(_Msg);

        // Message cleanup
        delete _PInputMessage;

        if (!_M_messageBuffer._Is_head(_Msg->msg_id()))
        {
            return;
        }
    }

    _Propagate_priority_order(_M_messageBuffer);
}
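The transform step above (pop one input, apply the function, reuse the input ID for the output, and continue only if the new output is at the head of the outbound queue) can be sketched in portable C++. This is a simplified model under stated assumptions: ToyTransformer and Msg are hypothetical names, std::deque replaces the internal queues, and actual delivery to a target is left out. Unlike the real method, step() simply returns false when there is no input.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <string>
#include <utility>

// Hypothetical simplified message: an ID plus a payload.
template <class T>
struct Msg {
    int id;
    T payload;
};

// Toy transformer stage: input queue -> transform function -> output queue.
template <class In, class Out>
class ToyTransformer {
public:
    explicit ToyTransformer(std::function<Out(const In&)> fn) : fn_(std::move(fn)) {
        assert(fn_ && "a transform function is required");
    }

    void push_input(int id, In value) {
        inputs_.push_back(Msg<In>{id, std::move(value)});
    }

    // Processes at most one input. Returns true when the newly produced output
    // is at the head of the output queue, that is, when it could be offered
    // downstream immediately. Returns false if there was no input or if older
    // outputs are still waiting ahead of it.
    bool step() {
        if (inputs_.empty()) return false;
        Msg<In> in = std::move(inputs_.front());
        inputs_.pop_front();
        outputs_.push_back(Msg<Out>{in.id, fn_(in.payload)});  // reuse the input ID
        return outputs_.front().id == in.id;
    }

    const std::deque<Msg<Out>>& outputs() const { return outputs_; }

private:
    std::function<Out(const In&)> fn_;
    std::deque<Msg<In>> inputs_;
    std::deque<Msg<Out>> outputs_;
};
```

Because outputs are appended in input order and only the head is eligible for delivery, the model preserves the ordering guarantee described for this block.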
template<class _Input , class _Output >
virtual void Concurrency::transformer< _Input, _Output >::release_message ( runtime_object_identity  _MsgId)
inline protected virtual

Releases a previous message reservation.

Parameters
_MsgId    The runtime_object_identity of the message object being released.

Implements Concurrency::source_block< single_link_registry< ITarget< _Output > >, _MessageProcessorType >.

{
    // The head message is the one reserved.
    if (!_M_messageBuffer._Is_head(_MsgId))
    {
        throw message_not_found();
    }
}
template<class _Input , class _Output >
virtual bool Concurrency::transformer< _Input, _Output >::reserve_message ( runtime_object_identity  _MsgId)
inline protected virtual

Reserves a message previously offered by this transformer messaging block.

Parameters
_MsgId    The runtime_object_identity of the message object being reserved.
Returns
true if the message was successfully reserved, false otherwise.

If reserve returns true, the caller must then call either consume, to take ownership of the message, or release, to give up the reservation.

Implements Concurrency::source_block< single_link_registry< ITarget< _Output > >, _MessageProcessorType >.

{
    // Allow reservation if this is the head message
    return _M_messageBuffer._Is_head(_MsgId);
}
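The head-only rule used by reserve_message and release_message can be modelled as a small standalone class. This is a sketch under assumptions, not the library code: ToyOutbox is a hypothetical name, the buffer stores bare IDs, std::out_of_range stands in for message_not_found, and the reserved_ flag is bookkeeping added here for illustration (the listings on this page only show the head check).

```cpp
#include <cassert>
#include <deque>
#include <stdexcept>

// Hypothetical outbound buffer holding message IDs only.
class ToyOutbox {
public:
    void enqueue(int id) { ids_.push_back(id); }

    bool is_head(int id) const { return !ids_.empty() && ids_.front() == id; }

    // Reservation succeeds only for the head message, and only once.
    bool reserve(int id) {
        if (!is_head(id) || reserved_) return false;
        reserved_ = true;
        return true;
    }

    // Releasing anything other than the head is an error, mirroring the
    // exception thrown by release_message.
    void release(int id) {
        if (!is_head(id)) throw std::out_of_range("message not found");
        reserved_ = false;
    }

    // Consuming removes the reserved head from the buffer.
    int consume(int id) {
        assert(reserved_ && "consume requires a prior successful reserve");
        if (!is_head(id)) throw std::out_of_range("message not found");
        ids_.pop_front();
        reserved_ = false;
        return id;
    }

private:
    std::deque<int> ids_;
    bool reserved_ = false;
};
```

A caller that gets true from reserve is expected to follow up with exactly one of consume or release; after release the same head message can be reserved again.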
template<class _Input , class _Output >
virtual void Concurrency::transformer< _Input, _Output >::resume_propagation ( )
inline protected virtual

Resumes propagation after a reservation has been released.

Implements Concurrency::source_block< single_link_registry< ITarget< _Output > >, _MessageProcessorType >.

{
    // If there are any messages in the buffer, propagate them out
    if (_M_messageBuffer._Count() > 0)
    {
        // async send a NULL value to initiate the repropagation
        this->async_send(NULL);
    }
}
template<class _Input , class _Output >
virtual message_status Concurrency::transformer< _Input, _Output >::send_message ( _Inout_ message< _Input > *  _PMessage,
_Inout_ ISource< _Input > *  _PSource 
)
inline protected virtual

Synchronously passes a message from an ISource block to this transformer messaging block. It is invoked by the send method when that method is called by a source block.

Parameters
_PMessage    A pointer to the message object.
_PSource     A pointer to the source block offering the message.
Returns
A message_status indication of what the target decided to do with the message.
{
    _PMessage = _PSource->accept(_PMessage->msg_id(), this);

    if (_PMessage != NULL)
    {
        // Enqueue the input message
        _M_inputMessages.push(_PMessage);
        this->sync_send(NULL);
    }
    else
    {
        return missed;
    }

    return accepted;
}
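The only difference between this listing and the asynchronous propagate path is the call to sync_send instead of async_send. As a rough illustration of that distinction, the following single-threaded sketch contrasts processing an input before the call returns with deferring it to a queued task. ToyScheduler and ToyBlock are hypothetical names, and the scheduler is an assumption standing in for the runtime's task machinery; it does not reproduce how ConcRT schedules work.

```cpp
#include <cassert>
#include <functional>
#include <queue>
#include <utility>
#include <vector>

// Hypothetical single-threaded scheduler standing in for a task queue.
class ToyScheduler {
public:
    void post(std::function<void()> task) {
        assert(task && "cannot post an empty task");
        tasks_.push(std::move(task));
    }

    // Runs every queued task in FIFO order.
    void run_all() {
        while (!tasks_.empty()) {
            std::function<void()> task = std::move(tasks_.front());
            tasks_.pop();
            task();
        }
    }

private:
    std::queue<std::function<void()>> tasks_;
};

class ToyBlock {
public:
    explicit ToyBlock(ToyScheduler& scheduler) : scheduler_(scheduler) {}

    // Synchronous path: the input is processed before the call returns.
    void send(int value) {
        inputs_.push(value);
        process_one();
    }

    // Asynchronous path: the input is queued and processed later by a task.
    void propagate(int value) {
        inputs_.push(value);
        scheduler_.post([this] { process_one(); });
    }

    const std::vector<int>& processed() const { return processed_; }

private:
    void process_one() {
        if (inputs_.empty()) return;
        processed_.push_back(inputs_.front());
        inputs_.pop();
    }

    ToyScheduler& scheduler_;
    std::queue<int> inputs_;
    std::vector<int> processed_;
};
```

In this model a value passed to send is visible in processed() as soon as the call returns, while a value passed to propagate appears only after the scheduler has run.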
template<class _Input , class _Output >
virtual bool Concurrency::transformer< _Input, _Output >::supports_anonymous_source ( )
inline protected virtual

Overrides the supports_anonymous_source method to indicate that this block can accept messages offered to it by a source that is not linked.

Returns
true because the block does not postpone offered messages.

Reimplemented from Concurrency::ITarget< multi_link_registry< ISource< _Input > >::type::source_type >.

{
    return true;
}

Member Data Documentation

template<class _Input , class _Output >
concurrent_queue<message<_Input> *> Concurrency::transformer< _Input, _Output >::_M_inputMessages
private
template<class _Input , class _Output >
::Concurrency::details::_Queue<message<_Output> > Concurrency::transformer< _Input, _Output >::_M_messageBuffer
private

Message queue used to store outbound messages

template<class _Input , class _Output >
_Transform_method Concurrency::transformer< _Input, _Output >::_M_pFunc
private

The documentation for this class was generated from the following file: