STLdoc
STLdocumentation
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
agents.h
Go to the documentation of this file.
1 /***
2 * ==++==
3 *
4 * Copyright (c) Microsoft Corporation. All rights reserved.
5 *
6 * ==--==
7 * =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
8 *
9 * agents.h
10 *
11 * Main public header file for ConcRT's asynchronous agents layer. This is the only header file a
12 * C++ program must include to use asynchronous agents.
13 *
14 * The core runtime, Parallel Patterns Library (PPL), and resource manager are defined in separate header files.
15 * =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
16 ****/
17 
18 #pragma once
19 
20 #include <crtdefs.h>
21 #include <concrt.h>
22 #include <stdexcept>
23 #include <functional>
24 #include <tuple>
25 #include <type_traits>
26 #include <vector>
27 #include <concurrent_queue.h>
28 
29 #define _AGENTS_H
30 
31 #pragma pack(push,_CRT_PACKING)
32 #pragma warning(push)
33 #pragma warning(disable: 4100) // Unreferenced formal parameter - needed for document generation
34 #pragma warning(disable: 4702) // Unreachable code - needed for retail version code path
35 // Forward declarations
36 
41 
42 namespace Concurrency
43 {
49 
50 typedef __int32 runtime_object_identity;
51 
56 
57 typedef ::Concurrency::details::_NonReentrantPPLLock::_Scoped_lock _NR_lock;
58 
63 
64 typedef ::Concurrency::details::_ReentrantPPLLock::_Scoped_lock _R_lock;
65 
66 
67 //***************************************************************************
68 // Internal namespace:
69 //
70 // Concurrency::details contains definitions to support routines in the public namespaces and macros.
71 // Clients should not directly interact with this namespace.
72 //***************************************************************************
73 
74 namespace details
75 {
76  //**************************************************************************
77  // Core Messaging Support:
78  //**************************************************************************
79 
80  //
81  // A base class to derive from that keeps unique IDs on its derived classes
82  //
83  class _Runtime_object : public _AllocBase
84  {
85  public:
86  // Creates a new runtime object.
88 
89  // Creates a runtime object from an identity.
91 
92  // Gets the runtime object identity.
94  {
95  return _M_id;
96  }
97 
98  protected:
99  // The runtime object identity.
101  };
102 
103  // A queue used to hold the messages for the messaging blocks
104  template<class _Message>
105  class _Queue : public _AllocBase
106  {
107  protected:
108  // A pointer to the head of the queue.
109  _Message * _M_pHead;
110 
111  // A pointer to a pointer to the tail of the queue.
112  _Message ** _M_ppTail;
113 
114  // The number of elements presently stored in the queue.
115  size_t _M_count;
116 
117  public:
118  typedef typename _Message type;
119 
120  // Create a Queue
121  _Queue() : _M_pHead(NULL), _M_ppTail(&_M_pHead), _M_count(0)
122  {
123  }
124 
125  // Destroy the queue
127  {
128  }
129 
130  // Returns the count of items in the queue
131  size_t _Count() const
132  {
133  return _M_count;
134  }
135 
136  // Add an item to the tail of the queue
137  //
138  // Returns a Boolean indicating whether the operation succeeded.
139  bool _Enqueue(_Message *_Element)
140  {
141  _CONCRT_ASSERT(_Element->_M_pNext == NULL);
142  _CONCRT_ASSERT(*_M_ppTail == NULL);
143 
144  *_M_ppTail = _Element;
145  _Element->_M_pNext = NULL;
146  _M_ppTail = &(_Element->_M_pNext);
147  _M_count++;
148 
149  return true;
150  }
151 
152  // Remove the specified element from the queue
153  //
154  // Returns a Boolean indicating whether the operation succeeded, that is, the message was found in the queue.
155  bool _Remove(_Message * _OldElement)
156  {
157  bool _Result = false;
158 
159  _CONCRT_ASSERT(_OldElement != NULL);
160 
161  if (_M_pHead == _OldElement)
162  {
163  _M_pHead = _OldElement->_M_pNext;
164  if (_M_pHead == NULL)
165  {
166  _M_ppTail = &_M_pHead;
167  }
168 
169  _OldElement->_M_pNext = NULL;
170  _M_count--;
171  _Result = true;
172  }
173  else
174  {
175  _Message * _Next = NULL;
176  for (_Message * _Node = _M_pHead; _Node != NULL; _Node = _Next)
177  {
178  _Next = _Node->_M_pNext;
179 
180  if (_Node->_M_pNext == _OldElement)
181  {
182  _Node->_M_pNext = _OldElement->_M_pNext;
183  // if this is the last element of the _Queue
184  if (_Node->_M_pNext == NULL && _M_count == 1)
185  {
186  _M_ppTail = &_M_pHead;
187  }
188 
189  _OldElement->_M_pNext = NULL;
190  _M_count--;
191  _Result = true;
192  break;
193  }
194  }
195  }
196 
197  return _Result;
198  }
199 
200  // Dequeue an item from the head of queue
201  //
202  // Returns a pointer to the message found at the head of the queue.
203  _Message * _Dequeue()
204  {
205  if (_M_pHead == NULL)
206  {
207  return NULL;
208  }
209 
210  _Message * _Result = _M_pHead;
211 
212  _M_pHead = _Result->_M_pNext;
213  if (_M_pHead == NULL)
214  {
215  _M_ppTail = &_M_pHead;
216  }
217 
218  _Result->_M_pNext = NULL;
219  _M_count--;
220  return _Result;
221  }
222 
223  // Return the item at the head of the queue, without dequeuing
224  //
225  // Returns a pointer to the message found at the head of the queue.
226  _Message * _Peek()
227  {
228  return _M_pHead;
229  }
230 
231  // Return true if the ID matches the message at the head of the queue
232  bool _Is_head(runtime_object_identity _MsgId)
233  {
234  // Peek at the next message in the message buffer. Use it to
235  // check if the IDs match
236  _Message * _Msg = _M_pHead;
237 
238  if (_Msg == NULL || _Msg->msg_id() != _MsgId)
239  {
240  return false;
241  }
242 
243  return true;
244  }
245  };
246 
247  //
248  // _Dynamic_array implements a container very similar to std::vector.
249  // However, it exposes a reduced subset of functionality that is
250  // geared towards use in network_link_registry. The array access is not
251  // thread-safe.
252  //
253  template<class _Type>
255  {
256  public:
257 
259 
260  typedef _Type& reference;
261  typedef _Type const& const_reference;
262 
263  //
264  // Construct a dynamic array
265  //
267  {
268  _Init();
269  }
270 
271  //
272  // Release any resources used by dynamic array
273  //
275  {
276  _Clear();
277  }
278 
279  //
280  // Assignment operator. Copy the contents of _Right
281  //
282  _Myt& operator=(const _Myt& _Right)
283  {
284  if (this != &_Right)
285  {
286  // Remove all the elements
287  _Clear();
288 
289  // Allocate space for the new elements
290  size_t _Size = _Right._Size();
291  _Grow(_Size);
292 
293  // Copy over the new elements
294  for (size_t _I=0; _I < _Size; _I++)
295  {
296  _Push_back(_Right[_I]);
297  }
298  }
299 
300  return *this;
301  }
302 
303  //
304  // Clear all the elements in the array
305  //
306  void _Clear()
307  {
308  if (_M_array != NULL)
309  {
310  delete [] _M_array;
311  _Init();
312  }
313  }
314 
315  //
316  // Add an element to the end of the array
317  //
318  void _Push_back(_Type const& _Element)
319  {
320  if (_M_index >= _M_size)
321  {
322  // Not enough space. Grow the array
323  size_t _NewSize = (_M_index + 1) * _S_growthFactor;
324  _Grow(_NewSize);
325  }
326 
328  _M_array[_M_index] = _Element;
329  _M_index++;
330  }
331 
332  //
333  // Index operation. Retrieve an element at the specified index. No bounds check is done.
334  //
335  reference operator[](size_t _Pos)
336  {
337  _CONCRT_ASSERT(_Pos < _M_size);
338  return _M_array[_Pos];
339  }
340 
341  //
342  // Index operation. Retrieve an element at the specified index. No bounds check is done.
343  //
344  const_reference operator[](size_t _Pos) const
345  {
346  _CONCRT_ASSERT(_Pos < _M_size);
347  return _M_array[_Pos];
348  }
349 
350  //
351  // Returns the count of elements in the array
352  //
353  size_t _Size() const
354  {
355  return _M_index;
356  }
357 
358  //
359  // Swap the contents of this array with _Right
360  //
361  void _Swap(_Myt& _Right)
362  {
363  if (this != &_Right)
364  {
365  // Swap the details.
366  _Type * _Array = _M_array;
367  size_t _Index = _M_index;
368  size_t _Size = _M_size;
369 
370  _M_array = _Right._M_array;
371  _M_index = _Right._M_index;
372  _M_size = _Right._M_size;
373 
374  _Right._M_array = _Array;
375  _Right._M_index = _Index;
376  _Right._M_size = _Size;
377  }
378  }
379 
380  private:
381  //
382  // Initialize the array
383  //
384  void _Init()
385  {
386  _M_array = NULL;
387  _M_index = 0;
388  _M_size = 0;
389  }
390 
391  //
392  // Grow the array to the given size. The old elements are copied over.
393  //
394  void _Grow(size_t _NewSize)
395  {
396  _CONCRT_ASSERT( _NewSize > _M_size );
397 
398  _Type * _Array = new _Type[_NewSize];
399 
400  if (_M_array != NULL)
401  {
402  // Copy over the elememts
403  for (size_t _I = 0; _I < _M_size; _I++)
404  {
405  _Array[_I] = _M_array[_I];
406  }
407 
408  delete [] _M_array;
409  }
410 
411  _M_array = _Array;
412  _M_size = _NewSize;
413  }
414 
415  // Private data members
416 
417  // Array of elements
419 
420  // Index where the next element should be inserted
421  size_t _M_index;
422 
423  // Capacity of the array.
424  size_t _M_size;
425 
426  static const int _S_growthFactor = 2;
427  };
428 
429  //
430  // Returns an identifier for the given object that could be used
431  // in an ETW trace (call to _Trace_agents)
432  //
433  template <class _Type>
434  __int64 _Trace_agents_get_id(_Type * _PObject)
435  {
436  return reinterpret_cast<__int64>(_PObject);
437  }
438 
439 } // namespace details
440 
441 //**************************************************************************
442 // Public Namespace:
443 //
444 // Anything in the Concurrency namespace is intended for direct client consumption.
445 //
446 //**************************************************************************
447 
448 //
449 // Forward declarations:
450 //
451 template<class _Type> class ISource;
452 template<class _Type> class ITarget;
453 
454 //**************************************************************************
455 // Network link registry
456 //**************************************************************************
457 
458 // Forward declaration for use in the iterator
459 template<class _Block> class network_link_registry;
460 
468 
469 template<class _Block>
471 {
472 public:
473 
476 
477  // Element type
478  typedef _Block* _EType;
479 
480  // Const iterator - iterator shall not be used to modify the links
481  typedef _EType const& const_reference;
482  typedef _EType const* const_pointer;
483 
487 
488  _Network_link_iterator(_MyContainer * _PNetwork_link, size_t _Index) : _M_pNetwork_link(_PNetwork_link), _M_index(_Index), _M_value(NULL)
489  {
491  }
492 
496 
498  {
500  _M_index = _Right._M_index;
501  }
502 
506 
507  _Myt const& operator=(_Myt const& _Right)
508  {
510  _M_index = _Right._M_index;
511  return *this;
512  }
513 
520 
521  const_reference operator*()
522  {
524  return _M_value;
525  }
526 
533 
534  const_pointer operator->() const
535  {
536  return (&**this);
537  }
538 
546 
547  _Myt& operator++()
548  {
549  ++_M_index;
551  return (*this);
552  }
553 
561 
562  _Myt operator++(int)
563  {
564  _Myt _Tmp = *this;
565  ++*this;
566  return (_Tmp);
567  }
568 
569 private:
570 
571  // Pointer to the underlying container (network link registry)
572  _MyContainer * _M_pNetwork_link;
573 
574  // Current index
575  size_t _M_index;
576 
577  // Current value
578  _EType _M_value;
579 };
580 
593 
594 template<class _Block>
596 {
597 public:
598 
602 
603  typedef typename _Block type;
604 
608 
609  typedef _Block * _EType;
610 
615 
616  typedef _EType const& const_reference;
617 
622 
623  typedef _EType const* const_pointer;
624 
625  // Make the iterators friends so that they can access some of the
626  // private routines such as _Get_element.
627 
628  friend class _Network_link_iterator<_Block>;
629 
634 
636 
644 
645  virtual void add(_EType _Link) = 0;
646 
657 
658  virtual bool remove(_EType _Link) = 0;
659 
671 
672  virtual bool contains(_EType _Link) = 0;
673 
681 
682  virtual size_t count() = 0;
683 
694 
695  virtual iterator begin() = 0;
696 
697 protected:
698 
706 
707  virtual void _Next_index(size_t& _Index) = 0;
708 
719 
720  virtual _EType _Get_element(size_t _Index) const = 0;
721 };
722 
731 
732 template<class _Block>
734 {
735 public:
736 
740 
742  {
743  }
744 
752 
754  {
755  // It is an error to delete link registry with links
756  // still present
757  if (count() != 0)
758  {
759  throw invalid_operation("Deleting link registry before removing all the links");
760  }
761  }
762 
773 
774  virtual void add(_EType _Link)
775  {
776  if (_Link == NULL)
777  {
778  return;
779  }
780 
781  // Only one link can be added.
782  if (_M_connectedLink != NULL)
783  {
784  throw invalid_link_target("_Link");
785  }
786 
788  }
789 
799 
800  virtual bool remove(_EType _Link)
801  {
802  if ((_Link != NULL) && (_M_connectedLink == _Link))
803  {
805  return true;
806  }
807 
808  return false;
809  }
810 
820 
821  virtual bool contains(_EType _Link)
822  {
823  return ((_Link != NULL) && (_M_connectedLink == _Link));
824  }
825 
832 
833  virtual size_t count()
834  {
835  return (_M_connectedLink == NULL) ? 0 : 1;
836  }
837 
847 
848  virtual iterator begin()
849  {
850  return (iterator(this, 0));
851  }
852 
853 protected:
854 
862 
863  virtual void _Next_index(size_t& _Index)
864  {
865  if (_M_connectedLink == NULL)
866  {
867  _Index++;
868  }
869  }
870 
881 
882  virtual _EType _Get_element(size_t _Index) const
883  {
884  if (_Index == 0)
885  {
886  return _M_connectedLink;
887  }
888 
889  return NULL;
890  }
891 
892 private:
893 
894  // A single pointer is used to hold the link
896 };
897 
906 
907 template<class _Block>
909 {
910 public:
911 
915 
917  {
918  }
919 
927 
929  {
930  // It is an error to delete link registry with links
931  // still present
932  if (count() != 0)
933  {
934  throw invalid_operation("Deleting link registry before removing all the links");
935  }
936  }
937 
950 
951  void set_bound(size_t _MaxLinks)
952  {
953  _CONCRT_ASSERT(count() == 0);
954  _M_maxLinks = _MaxLinks;
955  }
956 
968 
969  virtual void add(_EType _Link)
970  {
971  if (_Link == NULL)
972  {
973  return;
974  }
975 
976  _Add(_Link);
977  }
978 
988 
989  virtual bool remove(_EType _Link)
990  {
991  if (_Link == NULL)
992  {
993  return false;
994  }
995 
996  return (_Remove(_Link));
997  }
998 
1008 
1009  virtual bool contains(_EType _Link)
1010  {
1011  if (_Link == NULL)
1012  {
1013  return false;
1014  }
1015 
1016  return (_Find(_Link) < _M_vector._Size());
1017  }
1018 
1025 
1026  virtual size_t count()
1027  {
1028  return _Count();
1029  }
1030 
1040 
1041  virtual iterator begin()
1042  {
1043  return (iterator(this, 0));
1044  }
1045 
1046 protected:
1047 
1055 
1056  virtual void _Next_index(size_t& _Index)
1057  {
1058  size_t _Size = _M_vector._Size();
1059  while (_Index < _Size)
1060  {
1061  if (_M_vector[_Index] != NULL)
1062  {
1063  break;
1064  }
1065 
1066  ++_Index;
1067  }
1068  }
1069 
1080 
1081  virtual _EType _Get_element(size_t _Index) const
1082  {
1083  if (_Index < _M_vector._Size())
1084  {
1085  return _M_vector[_Index];
1086  }
1087 
1088  return NULL;
1089  }
1090 
1091 private:
1092 
1099 
1101  {
1102  size_t _Size = _M_vector._Size();
1103  size_t _Insert_pos = 0;
1104 
1105  _CONCRT_ASSERT(_Link != NULL);
1106 
1107  // If max links is set, ensure that inserting the new
1108  // link will not exceed the bound.
1109  if ((_M_maxLinks != _NOT_SET) && ((_Size+1) > (size_t) _M_maxLinks))
1110  {
1111  throw invalid_link_target("_Link");
1112  }
1113 
1114  for (size_t _Index = 0; _Index < _Size; _Index++)
1115  {
1116  if (_M_vector[_Index] != NULL)
1117  {
1118  // We want to find the first NULL entry after all the
1119  // non-NULL entries.
1120  _Insert_pos = _Index + 1;
1121 
1122  // Throw if duplicate entry is found
1123  if (_M_vector[_Index] == _Link)
1124  {
1125  throw invalid_link_target("_Link");
1126  }
1127  }
1128  }
1129 
1130  if (_Insert_pos < _Size)
1131  {
1132  _M_vector[_Insert_pos] = _Link;
1133  }
1134  else
1135  {
1136  _M_vector._Push_back(_Link);
1137  }
1138  }
1139 
1149 
1151  {
1152  _CONCRT_ASSERT(_Link != NULL);
1153 
1154  for (size_t _Index = 0; _Index < _M_vector._Size(); _Index++)
1155  {
1156  if (_M_vector[_Index] == _Link)
1157  {
1158  _M_vector[_Index] = NULL;
1159 
1160  // If max links is set, prevent new additions to the registry
1161  if (_M_maxLinks != _NOT_SET && _M_maxLinks > 0)
1162  {
1163  // Setting the bound to 0. This causes add to always throw.
1164  _M_maxLinks = 0;
1165  }
1166 
1167  return true;
1168  }
1169  }
1170 
1171  return false;
1172  }
1173 
1174 
1184 
1185  virtual size_t _Find(_EType _Link)
1186  {
1187  size_t _Index = 0;
1188  for (_Index = 0; _Index < _M_vector._Size(); _Index++)
1189  {
1190  if (_M_vector[_Index] == _Link)
1191  {
1192  break;
1193  }
1194  }
1195 
1196  return _Index;
1197  }
1198 
1205 
1206  size_t _Count() const
1207  {
1208  size_t _Count = 0;
1209 
1210  for (size_t _Index = 0; _Index < _M_vector._Size(); _Index++)
1211  {
1212  if (_M_vector[_Index] != NULL)
1213  {
1214  _Count++;
1215  }
1216  }
1217 
1218  return _Count;
1219  }
1220 
1221  static const size_t _NOT_SET = SIZE_MAX;
1222 
1223  // Maximum number of links allowed.
1224  size_t _M_maxLinks;
1225 
1226  // ::Concurrency::details::_Dynamic_array is used to hold the links
1228 };
1229 
1230 // Forward declaration for the iterator
1231 template<class _LinkRegistry> class source_link_manager;
1232 
1239 
1240 template<class _LinkRegistry>
1242 {
1243 public:
1244 
1245  typedef typename _LinkRegistry::type _Block;
1246 
1249 
1250  // Element type
1251  typedef _Block* _EType;
1252 
1253  // Const iterator - iterator shall not be used to modify the links
1254  typedef _EType const& const_reference;
1255  typedef _EType const* const_pointer;
1256 
1260 
1261  _Source_link_iterator(_MyContainer * _PNetwork_link, size_t _Index) : _M_pNetwork_link(_PNetwork_link), _M_index(_Index), _M_sentinel(NULL)
1262  {
1263  // Take a snapshot of the link registry. This will reference the registry.
1265  }
1266 
1270 
1272  {
1273  if (_M_pNetwork_link != NULL)
1274  {
1276  }
1277  }
1281 
1283  {
1285  _M_index = _Right._M_index;
1286  _M_array = _Right._M_array;
1287 
1289  }
1290 
1294 
1295  _Myt const& operator=(_Myt const& _Right)
1296  {
1297  _MyContainer * _OldContainer = _M_pNetwork_link;
1298  _CONCRT_ASSERT(_OldContainer != NULL);
1299 
1301  _M_index = _Right._M_index;
1302  _M_array = _Right._M_array;
1303 
1304  if (_OldContainer != _M_pNetwork_link)
1305  {
1306  _OldContainer->release();
1308  }
1309 
1310  return *this;
1311  }
1312 
1319 
1320  const_reference operator*()
1321  {
1322  return _Get(0);
1323  }
1324 
1331 
1332  const_pointer operator->() const
1333  {
1334  return (&**this);
1335  }
1336 
1340 
1341  const_reference operator[](size_t _Pos) const
1342  {
1343  return _Get(_Pos);
1344  }
1345 
1352 
1353  _Myt& operator++()
1354  {
1355  ++_M_index;
1356  return (*this);
1357  }
1358 
1365 
1366  _Myt operator++(int)
1367  {
1368  _Myt _Tmp = *this;
1369  ++*this;
1370  return (_Tmp);
1371  }
1372 
1373 private:
1374 
1375  // Get the element at the given offset.
1376  const_reference _Get(size_t _Pos) const
1377  {
1378  size_t _Index = _M_index + _Pos;
1379  if (_Index >= _M_array._Size())
1380  {
1381  return _M_sentinel;
1382  }
1383 
1384  return _M_array[_Index];
1385  }
1386 
1387  // Array to hold the snapshot of the link registry
1389 
1390  // Pointer to the underlying container (network link registry)
1391  _MyContainer * _M_pNetwork_link;
1392 
1393  // Current index
1394  size_t _M_index;
1395 
1396  // Sentinel value to return on bounds overflow
1397  _EType _M_sentinel;
1398 };
1399 
1416 
1417 template<class _LinkRegistry>
1418 class source_link_manager
1419 {
1420 public:
1421 
1425 
1426  typedef _LinkRegistry type;
1427 
1431 
1432  typedef typename _LinkRegistry::type _Block;
1433 
1437 
1438  typedef std::tr1::function<void(_Block *, bool)> _Callback_method;
1439 
1443 
1444  typedef _Block * _EType;
1445 
1450 
1451  typedef _EType const& const_reference;
1452 
1456 
1457  typedef _EType const* const_pointer;
1458 
1459  // Iterator
1460  friend class _Source_link_iterator<_LinkRegistry>;
1461 
1466 
1468 
1472 
1473  typedef ::Concurrency::details::_ReentrantPPLLock _LockType;
1474 
1478 
1480 
1484 
1486  {
1487  }
1488 
1492 
1494  {
1496  }
1497 
1504 
1506  {
1507  _M_pLinkedTarget = _PTarget;
1508  }
1509 
1517 
1518  void set_bound(size_t _MaxLinks)
1519  {
1520  _M_links.set_bound(_MaxLinks);
1521  }
1522 
1529 
1530  void add(_EType _Link)
1531  {
1532  if (_Link == NULL)
1533  {
1534  return;
1535  }
1536 
1537  {
1538  _LockHolder _Lock(_M_lock);
1539  _M_links.add(_Link);
1540 
1541  // We need to add the _Link first and then invoke the
1542  // callback because _Add could throw.
1543 
1544  // As soon as the above lock is released, remove would
1545  // find the link that was added and could unlink it before
1546  // we are able to invoke the notification below. Keeping an
1547  // active iterator would prevent that from happening.
1548  _M_iteratorCount++;
1549  }
1550 
1551  // Acquire a reference on this link by the target
1552  _Link->acquire_ref(_M_pLinkedTarget);
1553 
1554  // Release the active iterator
1555  release();
1556  }
1557 
1567 
1568  bool remove(_EType _Link)
1569  {
1570  bool _Removed = false;
1571  _EType _RemovedLink = NULL;
1573 
1574  if (_Link == NULL)
1575  {
1576  return false;
1577  }
1578 
1579  {
1580  _LockHolder _Lock(_M_lock);
1581  _Removed = _M_links.remove(_Link);
1582 
1583  if (!_Removed)
1584  {
1585  // No change was made
1586  return _Removed;
1587  }
1588 
1589  if (_M_iteratorCount == 0)
1590  {
1591  // Set the removed link to indicate that
1592  // notification callback needs to be invoked.
1593  _RemovedLink = _Link;
1594  }
1595  else
1596  {
1597  // The iterator will complete the pending operation
1599  }
1600  }
1601 
1602  // NOTE: touching "this" pointer is dangerous as soon as the above lock is released
1603 
1604  // Release the reference for this link
1605  if (_RemovedLink != NULL)
1606  {
1607  _RemovedLink->release_ref(_LinkedTarget);
1608  }
1609 
1610  return _Removed;
1611  }
1612 
1616 
1617  void reference()
1618  {
1619  _LockHolder _Lock(_M_lock);
1620  _M_iteratorCount++;
1621  }
1622 
1626 
1627  void release()
1628  {
1631 
1632  {
1633  _LockHolder _Lock(_M_lock);
1635  _M_iteratorCount--;
1636 
1637  if (_M_iteratorCount == 0)
1638  {
1639  if (_M_pendingRemove._Size() > 0)
1640  {
1641  // Snap the pending remove list with the lock held
1642  _M_pendingRemove._Swap(_LinksToRemove);
1643  }
1644  }
1645  }
1646 
1647  // NOTE: touching "this" pointer is dangerous as soon as the above lock is released
1648 
1649  // Release the references
1650  size_t _Size = _LinksToRemove._Size();
1651 
1652  for (size_t _I=0; _I < _Size; _I++)
1653  {
1654  _LinksToRemove[_I]->release_ref(_LinkedTarget);
1655  }
1656  }
1657 
1668 
1669  bool contains(_EType _Link)
1670  {
1671  _LockHolder _Lock(_M_lock);
1672  return _M_links.contains(_Link);
1673  }
1674 
1681 
1682  size_t count()
1683  {
1684  _LockHolder _Lock(_M_lock);
1685  return _M_links.count();
1686  }
1687 
1688 
1698 
1699  iterator begin()
1700  {
1701  return (iterator(this, 0));
1702  }
1703 
1704 private:
1705 
1706  // Called by the iterator. This routine takes a snapshot of the links
1707  // in the registry and copies it to the array provided.
1709  {
1710  _LockHolder _Lock(_M_lock);
1711  _M_iteratorCount++;
1712 
1713  for(_LinkRegistry::iterator _Link = _M_links.begin(); *_Link != NULL; _Link++)
1714  {
1715  _Array._Push_back(*_Link);
1716  }
1717  }
1718 
1719  // Internal lock used for synchronization
1720  _LockType _M_lock;
1721 
1722  // Count to indicate that an iterator is active
1723  volatile long _M_iteratorCount;
1724 
1725  // A vector of all pending link remove operations
1727 
1728  // Underlying link registry
1729  _LinkRegistry _M_links;
1730 
1731  // Target block holding this source link manager
1733 };
1734 
1738 
1740 {
1744 
1749 
1754 
1759 
1761 };
1762 
1773 
1774 template<class _Type>
1776 {
1777  friend class ::Concurrency::details::_Queue<message<_Type>>;
1778 
1779 public:
1791 
1792  message(_Type const &_P) : payload(_P), _M_pNext(NULL), _M_refCount(0) { }
1793 
1808 
1809  message(_Type const &_P, runtime_object_identity _Id)
1810  : ::Concurrency::details::_Runtime_object(_Id), payload(_P), _M_pNext(NULL), _M_refCount(0)
1811  {
1812  }
1813 
1825 
1826  message(message const & _Msg) : payload(_Msg.payload), _M_pNext(NULL), _M_refCount(0) { }
1827 
1838 
1839  message(_In_ message const * _Msg) : payload((_Msg == NULL) ? NULL : _Msg->payload), _M_pNext(NULL), _M_refCount(0)
1840  {
1841  if (_Msg == NULL)
1842  {
1843  throw std::invalid_argument("_Msg");
1844  }
1845  }
1846 
1850 
1851  virtual ~message() { }
1852 
1859 
1860  runtime_object_identity msg_id() const
1861  {
1862  return _M_id;
1863  }
1864 
1868 
1870 
1878 
1879  long add_ref()
1880  {
1882  }
1883 
1891 
1892  long remove_ref()
1893  {
1895  }
1896 
1900 
1901  typedef typename _Type type;
1902 
1903 private:
1904  // The intrusive next pointer used by blocks that need
1905  // to chain messages it's holding together
1907 
1908  // Avoid warnings about not generating assignment operators.
1909  message<_Type> const &operator =(message<_Type> const &);
1910 
1911  // A reference count for the message
1912  volatile long _M_refCount;
1913 };
1914 
1915 //**************************************************************************
1916 // Message processor:
1917 //**************************************************************************
1918 
1927 
1928 template<class _Type>
1930 {
1931 public:
1935 
1936  typedef typename _Type type;
1937 
1947 
1948  virtual void async_send(_Inout_opt_ message<_Type> * _Msg) = 0;
1949 
1959 
1960  virtual void sync_send(_Inout_opt_ message<_Type> * _Msg) = 0;
1961 
1968 
1969  virtual void wait() = 0;
1970 
1971 protected:
1972 
1981 
1982  virtual void process_incoming_message() = 0;
1983 
1991 
1992  static void __cdecl _Process_incoming_message_wrapper(void * _Data)
1993  {
1994  message_processor<_Type> * _PMessageProcessor = (message_processor<_Type> *) _Data;
1995  _PMessageProcessor->process_incoming_message();
1996  }
1997 };
1998 
2006 
2007 template<class _Type>
2009 {
2010 public:
2014 
2015  typedef std::tr1::function<void(message<_Type> *)> _Handler_method;
2016 
2020 
2021  typedef std::tr1::function<void(void)> _Propagator_method;
2022 
2026 
2027  typedef _Type type;
2028 
2036 
2038  _M_queuedDataCount(0),
2039  _M_stopProcessing(1),
2040  _M_lwtCount(0),
2043  _M_handler(nullptr),
2044  _M_processor(nullptr),
2045  _M_propagator(nullptr)
2046  {
2047  }
2048 
2055 
2057  {
2058  wait();
2059  }
2060 
2076 
2077  void initialize(_Inout_opt_ Scheduler * _PScheduler, _Inout_opt_ ScheduleGroup * _PScheduleGroup, _Handler_method const& _Handler)
2078  {
2079  _M_pScheduler = _PScheduler;
2080  _M_pScheduleGroup = _PScheduleGroup;
2081  _M_handler = _Handler;
2082  _M_stopProcessing = 0;
2083  }
2084 
2094  virtual void initialize_batched_processing(_Handler_method const& _Processor, _Propagator_method const& _Propagator)
2095  {
2096  _M_processor = _Processor;
2097  _M_propagator = _Propagator;
2098  }
2099 
2107 
2108  virtual void sync_send(_Inout_opt_ message<_Type> * _Msg)
2109  {
2110  if (_M_handler == NULL)
2111  {
2112  throw invalid_operation("sync_send called without registering a callback");
2113  }
2114 
2115  _Sync_send_helper(_Msg);
2116  }
2117 
2125 
2127  {
2128  if (_M_handler == NULL)
2129  {
2130  throw invalid_operation("async_send called without registering a callback");
2131  }
2132 
2133  //
2134  // If there is a message to send, enqueue it in the processing queue.
2135  // async_send can be sent a NULL message if the block wishes to reprocess
2136  // the messages that are in its queue. For example, an unbounded_buffer
2137  // that has its head node released after reservation.
2138  //
2139  if (_Msg != NULL)
2140  {
2141  _M_queuedMessages.push(_Msg);
2142  }
2143 
2145  {
2146  // Indicate that an LWT is in progress. This will cause the
2147  // destructor to block.
2149 
2150  if (_M_stopProcessing == 0)
2151  {
2153 
2155 
2157 #ifdef _CRT_USE_WINAPI_FAMILY_DESKTOP_APP
2158  if (_M_pScheduleGroup != NULL)
2159  {
2160  _M_pScheduleGroup->ScheduleTask(_Proc, this);
2161  }
2162  else if (_M_pScheduler != NULL)
2163  {
2164  _M_pScheduler->ScheduleTask(_Proc, this);
2165  }
2166  else
2167  {
2168 #endif /* _CRT_USE_WINAPI_FAMILY_DESKTOP_APP */
2170 #ifdef _CRT_USE_WINAPI_FAMILY_DESKTOP_APP
2171  }
2172 #endif /* _CRT_USE_WINAPI_FAMILY_DESKTOP_APP */
2173 
2174  // The LWT will decrement _M_lwtCount.
2175  return;
2176  }
2177 
2178  // If we get here then no task was scheduled. Decrement LWT count to reflect this fact
2180  }
2181  }
2182 
2187 
2188  virtual void wait()
2189  {
2190  // Cease processing of any new messages
2192 
2193  // This spin makes sure all previously initiated message processings
2194  // will still process correctly. As soon as this count reaches zero, we can
2195  // proceed with the message block destructor.
2197  while(_M_lwtCount != 0)
2198  {
2199  spinWait._SpinOnce();
2200  }
2201 
2202  // Synchronize with sync_send
2203  {
2204  _NR_lock _Lock(_M_asyncSendLock);
2206  }
2207 
2208  }
2209 
2210 protected:
2211 
2216 
2218  {
2222 
2223  // Indicate that an LWT completed
2225 
2226  // Do not access any members here. If the count goes to
2227  // 0 as a result of the above decrement, the object
2228  // could be immediately deleted.
2229  }
2230 
2231  private:
2232 
2234  {
2235  message<_Type> * _Msg = NULL;
2236  while (_M_queuedMessages.try_pop(_Msg))
2237  {
2238  delete _Msg;
2239  }
2240  }
2241 
2243  {
2244  _NR_lock _Lock(_M_asyncSendLock);
2245 
2246  // Message block destructors sets the _M_stopProcessing flag to stop
2247  // processing any more messages. This is required to guarantee
2248  // that the destructor's wait_for_async_sends will complete
2249  if (_M_stopProcessing == 0)
2250  {
2251  if (_M_queuedDataCount > 0)
2252  {
2253  long _Count = _InterlockedExchange((volatile long *) &_M_queuedDataCount, 0);
2254  _Invoke_handler(_Count);
2255  }
2256 
2257  _Invoke_handler(_Msg);
2258  }
2259  else
2260  {
2261  // Destructor is running. Do not process the message
2262  // Delete the msg, if any.
2263  if (_Msg != NULL)
2264  {
2265  delete _Msg;
2266  }
2267  }
2268 
2269  }
2270 
2271  // Helper function to dequeue and process messages to any targets
2273  {
2274  _NR_lock _Lock(_M_asyncSendLock);
2275 
2276  long _Messages_processed = 0;
2277 
2278  // Do batched processing of messages
2279  // Read off the number of messages to process in this iteration by snapping a count
2280  volatile long _Count = _M_queuedDataCount;
2281  bool _StopProcessing = false;
2282 
2283  // This count could be 0 if there was both a synchronous and asynchronous
2284  // send occurring. One of them could have sent all of the messages for the other
2285  while (_Count > 0)
2286  {
2287  // Process _Count number of messages
2288  _Invoke_handler(_Count);
2289  _Messages_processed += _Count;
2290 
2291  // Subtract the count and see if there are new things to process
2292  volatile long _Orig = _InterlockedExchangeAdd((volatile long *) &_M_queuedDataCount, -_Count);
2293  _CONCRT_ASSERT(_Orig >= _Count);
2294  if (_Orig == _Count)
2295  {
2296  // Because _Count did not change, we processed everything there is to process
2297  break;
2298  }
2299 
2300  if (_StopProcessing)
2301  {
2302  break;
2303  }
2304 
2305  // After reading the flag process the currently queued messages
2306  // Any messages received after we observe this flag (to be set) will not
2307  // be processed.
2308  _StopProcessing = (_M_stopProcessing == 0) ? false : true;
2309 
2310  // Snap the count and try to process more
2311  _Count = _M_queuedDataCount;
2312  }
2313 
2314  return _Messages_processed;
2315  }
2316 
2317  // Invoke the handler in the message block for the given
2318  // count
      // Pops _Count messages from _M_queuedMessages and passes each one to
      // _M_processor when it is set, otherwise to _M_handler. After the whole
      // batch, _M_propagator is called once if it is set.
2320  {
2321  // Process _Count number of messages
2322  for(int _I = 0; _I < _Count; _I++)
2323  {
2324  message<_Type> * _Msg = NULL;
      // The result of try_pop is not checked: if nothing is popped, _Msg stays
      // NULL and NULL is what the callback receives.
2325  _M_queuedMessages.try_pop(_Msg);
2326  if (_M_processor == NULL)
2327  {
2328  // If a processor function does not exist, the message processor is using single
2329  // message processing rather than batched processing. There should also be no
2330  // propagator function defined in this case.
2332  _M_handler(_Msg);
2333  }
2334  else
2335  {
2336  // Use the batched message processing function
2337  _M_processor(_Msg);
2338  }
2339  }
2340 
2341  // Call the handler which propagates the message(s)
2342  if (_M_propagator != NULL)
2343  {
2344  _M_propagator();
2345  }
2346  }
2347 
2348  // Invoke the message block handler for the given message
      // Without a processor, _Msg goes to _M_handler and nothing else happens.
      // With a processor, _Msg goes to _M_processor and then _M_propagator is
      // called if it is set. _Msg is passed through as-is, including NULL.
2350  {
2351  if (_M_processor == NULL)
2352  {
2353  // If a processor function does not exist, the message processor is using single
2354  // message processing rather than batched processing. There should also be no
2355  // propagator function defined in this case.
2357  _M_handler(_Msg);
2358  }
2359  else
2360  {
2361  // Use the batched message processing function
2362  _M_processor(_Msg);
2363 
2364  // Call the handler which propagates the message(s)
2365  if (_M_propagator != NULL)
2366  {
2367  _M_propagator();
2368  }
2369  }
2370  }
2371 
2372  private:
2376 
2378 
2382 
2384 
2389 
2390  volatile long _M_queuedDataCount;
2391 
2395 
2396  Scheduler * _M_pScheduler;
2397 
2401 
2402  ScheduleGroup * _M_pScheduleGroup;
2403 
2408 
2409  volatile long _M_stopProcessing;
2410 
2414 
2415  volatile long _M_lwtCount;
2416 
2420 
2422 
2426 
2428 
2432 
2433  _Propagator_method _M_propagator;
2434 };
2435 
2447 
      // Abstract interface for a block that receives messages with payload type
      // _Type. propagate and send are the public entry points; the link/unlink
      // hooks are protected and reached by sources through the ISource friendship
      // declared below.
2448 template<class _Type>
2449 class ITarget
2450 {
2451  //
2452  // ISource<T> is a friend class because calls to Source->link_target()
2453  // and Source->unlink_target() need to call their respective
2454  // Target->link_source() and Target->unlink_source() on the block they are
2455  // linking/unlinking. Those functions are private here because we don't
2456  // want users calling link_source() or unlink_source() directly. link_source/
2457  // unlink_source don't call respective link_target/unlink_target because an
2458  // infinite loop would occur.
2459  //
2460  friend class ISource<_Type>;
2461 
2462 public:
2466 
      // Virtual so that implementations can be destroyed through an ITarget pointer.
2467  virtual ~ITarget() {}
2468 
2469  // It is important that calls to propagate do *not* take the same lock on an
2470  // internal message structure that is used by Consume and the LWT. Doing so could
2471  // result in a deadlock with the Consume call.
2472 
2491 
      // Pure virtual. Both arguments are annotated optional (_Inout_opt_); the
      // result is a message_status.
2492  virtual message_status propagate(_Inout_opt_ message<_Type> * _PMessage, _Inout_opt_ ISource<_Type> * _PSource) = 0;
2493 
2515 
      // Pure virtual. Unlike propagate, both arguments are annotated as required
      // (_Inout_); the result is a message_status.
2516  virtual message_status send(_Inout_ message<_Type> * _PMessage, _Inout_ ISource<_Type> * _PSource) = 0;
2517 
2528 
      // Default implementation of a query whose declaration line is not shown
      // here: it answers false.
2530  {
2531  return false;
2532  }
2533 
2537 
      // The payload type handled by this target.
2538  typedef typename _Type type;
2539 
2544 
      // Signature of a filter: receives a payload by const reference and returns bool.
2545  typedef std::tr1::function<bool(_Type const&)> filter_method;
2546 
2547 protected:
2548 
2560 
      // Link bookkeeping hooks, all pure virtual. They are protected so that only
      // the befriended ISource can call them.
2561  virtual void link_source(_Inout_ ISource<_Type> * _PSource) = 0;
2562 
2574 
2575  virtual void unlink_source(_Inout_ ISource<_Type> * _PSource) = 0;
2576 
2580 
2581  virtual void unlink_sources() = 0;
2582 };
2583 
2595 
      // Abstract interface for a block that offers messages with payload type
      // _Type to ITarget<_Type> blocks. Every public operation is pure virtual;
      // the two protected helpers at the end forward to the target's protected
      // link hooks.
2596 template<class _Type>
2597 class ISource
2598 {
2599 public:
2603 
      // Virtual so that implementations can be destroyed through an ISource pointer.
2604  virtual ~ISource() {}
2605 
2612 
      // Target link management.
2613  virtual void link_target(_Inout_ ITarget<_Type> * _PTarget) = 0;
2614 
2622 
2623  virtual void unlink_target(_Inout_ ITarget<_Type> * _PTarget) = 0;
2624 
2629 
2630  virtual void unlink_targets() = 0;
2631 
2650 
      // Message hand-off operations. Each one identifies the message by _MsgId
      // and names the calling target.
2651  virtual message<_Type> * accept(runtime_object_identity _MsgId, _Inout_ ITarget<_Type> * _PTarget) = 0;
2652 
2671 
2672  virtual bool reserve(runtime_object_identity _MsgId, _Inout_ ITarget<_Type> * _PTarget) = 0;
2673 
2691 
2692  virtual message<_Type> * consume(runtime_object_identity _MsgId, _Inout_ ITarget<_Type> * _PTarget) = 0;
2693 
2703 
2704  virtual void release(runtime_object_identity _MsgId, _Inout_ ITarget<_Type> * _PTarget) = 0;
2705 
2716 
      // Reference counting on behalf of a target.
2717  virtual void acquire_ref(_Inout_ ITarget<_Type> * _PTarget) = 0;
2718 
2729 
2730  virtual void release_ref(_Inout_ ITarget<_Type> * _PTarget) = 0;
2731 
2735 
      // The payload type offered by this source.
2736  typedef typename _Type source_type;
2737 
2738 protected:
2752 
      // Calls the protected link_source of _PLinkFrom, passing this source.
2754  {
2755  _PLinkFrom->link_source(this);
2756  }
2757 
2771 
      // Calls the protected unlink_source of _PUnlinkFrom, passing this source.
2773  {
2774  _PUnlinkFrom->unlink_source(this);
2775  }
2776 };
2777 
2778 //**************************************************************************
2779 // Target Block:
2780 //**************************************************************************
2781 
2793 
2794 template<class _SourceLinkRegistry,
2795  class _MessageProcessorType = ordered_message_processor<typename _SourceLinkRegistry::type::source_type>>
2796 class target_block : public ITarget<typename _SourceLinkRegistry::type::source_type>
2797 {
2798 public:
2799 
2803 
2804  typedef typename _SourceLinkRegistry::type::source_type _Source_type;
2805 
2809 
2811 
2815 
2817 
2821 
2823  {
2827  }
2828 
2832 
      // Destructor. Releases the filter object that register_filter allocates
      // with new.
2833  virtual ~target_block()
2834  {
2835  // All sources should have been unlinked
2837  delete _M_pFilter;
2838 
2840  }
2841 
2859 
      // Checks an incoming message and then hands it to propagate_message.
      // Throws std::invalid_argument when _PMessage or _PSource is NULL.
      // Returns declined when _M_fDeclineMessages is set or when a registered
      // filter rejects the payload; otherwise returns the result of
      // propagate_message.
2861  {
2862  // It is important that calls to propagate do *not* take the same lock on the
2863  // internal structure that is used by <c>consume</c> and the LWT. Doing so could
2864  // result in a deadlock.
2865 
2866  if (_PMessage == NULL)
2867  {
2868  throw std::invalid_argument("_PMessage");
2869  }
2870 
2871  if (_PSource == NULL)
2872  {
2873  throw std::invalid_argument("_PSource");
2874  }
2875 
2876  if (_M_fDeclineMessages)
2877  {
2878  return declined;
2879  }
2880 
      // The filter is optional; it is consulted only when one has been registered.
2881  if (_M_pFilter != NULL && !(*_M_pFilter)(_PMessage->payload))
2882  {
2883  return declined;
2884  }
2885 
2886  return propagate_message(_PMessage, _PSource);
2887  }
2888 
2910 
      // Performs the same checks as the propagate path and then hands the message
      // to send_message.
      // Throws std::invalid_argument when _PMessage or _PSource is NULL.
      // Returns declined when _M_fDeclineMessages is set or when a registered
      // filter rejects the payload; otherwise returns the result of send_message.
2912  {
2913  if (_PMessage == NULL)
2914  {
2915  throw std::invalid_argument("_PMessage");
2916  }
2917 
2918  if (_PSource == NULL)
2919  {
2920  throw std::invalid_argument("_PSource");
2921  }
2922 
2923  if (_M_fDeclineMessages)
2924  {
2925  return declined;
2926  }
2927 
      // The filter is optional; it is consulted only when one has been registered.
2928  if (_M_pFilter != NULL && !(*_M_pFilter)(_PMessage->payload))
2929  {
2930  return declined;
2931  }
2932 
2933  return send_message(_PMessage, _PSource);
2934  }
2935 
2936 protected:
2937 
2952 
2954 
2972 
2974  {
2975  // By default we do not allow send()
2976  return declined;
2977  }
2978 
2990 
      // Records _PSource in _M_connectedSources.
2991  virtual void link_source(_Inout_ ISource<_Source_type> * _PSource)
2992  {
2993  _M_connectedSources.add(_PSource);
2997  }
2998 
3008 
3010  {
3014 
3015  _M_connectedSources.remove(_PSource);
3016  }
3017 
3021 
      // Asks every source in _M_connectedSources to unlink this block as one of
      // its targets. The walk stops when the iterator yields NULL.
3022  virtual void unlink_sources()
3023  {
3024  for (source_iterator _Iter = _M_connectedSources.begin(); *_Iter != NULL; ++_Iter)
3025  {
3026  ISource<_Source_type> * _PSource = *_Iter;
3027  _PSource->unlink_target(this);
3028  }
3029  }
3030 
3037 
3039  {
3040  }
3041 
3042  //
3043  // Utility routines
3044  //
3045 
3053 
      // Stores a heap-allocated copy of _Filter in _M_pFilter. A filter that
      // compares equal to NULL (presumably an empty function object) is ignored.
      // NOTE(review): an existing _M_pFilter is overwritten without being deleted,
      // so this looks intended to be called at most once per block -- confirm
      // against the callers.
3054  void register_filter(filter_method const& _Filter)
3055  {
3056  if (_Filter != NULL)
3057  {
3058  _M_pFilter = new filter_method(_Filter);
3059  }
3060  }
3061 
3068 
3070  {
3071  _M_fDeclineMessages = true;
3072  }
3073 
3086 
      // Initializes _M_messageProcessor with the given scheduler and schedule
      // group (both default to NULL) and installs a handler that forwards every
      // message to process_message. The handler captures this.
3087  void initialize_target(_Inout_opt_ Scheduler * _PScheduler = NULL, _Inout_opt_ ScheduleGroup * _PScheduleGroup = NULL)
3088  {
3089  // Register a callback with the processor
3090  _M_messageProcessor.initialize(_PScheduler, _PScheduleGroup,
3091  // Processing and Propagating function used by ordered_message_processors
3092  [this](message<_Source_type> * _PMessage)
3093  {
3094  // Handle message by calling process_message to maintain CRT100 compatibility
3095  this->process_message(_PMessage);
3096  });
3097 
3098  // Register this target block as the owner of the connected sources
3100  }
3101 
3105 
      // Switches _M_messageProcessor to batched processing: every message is
      // forwarded to process_input_messages, and nullptr is passed as the second
      // callback.
3107  {
3108  _M_messageProcessor.initialize_batched_processing(
3109  // Processing function used by CRT110
3110  [this](message<_Source_type> * _PMessage)
3111  {
3112  // Handle message through process_input_messages to use CRT110 batch processing
3113  this->process_input_messages(_PMessage);
3114  }, nullptr);
3115  }
3116 
3123 
3125  {
3126  _M_messageProcessor.async_send(_PMessage);
3127  }
3128 
3135 
3137  {
3138  _M_messageProcessor.sync_send(_PMessage);
3139  }
3140 
3148 
3150  {
3151  // Decline new messages to ensure that messages are not dropped during the wait
3153 
3154  _M_messageProcessor.wait();
3155  }
3156 
3163 
3165  {
3167 
3168  unlink_sources();
3169  }
3170 
3174 
3176  {
3177  throw invalid_operation("To use batched processing, you must override process_input_messages in the message block.");
3178  }
3179 
3183 
3184  _SourceLinkManager _M_connectedSources;
3185 
3189 
3191 
3196 
3198 
3202 
3203  _MessageProcessorType _M_messageProcessor;
3204 };
3205 
3206 //**************************************************************************
3207 // Source Block:
3208 //**************************************************************************
3209 
3225 
3226 template<class _TargetLinkRegistry,
3228 class source_block : public ISource<typename _TargetLinkRegistry::type::type>
3229 {
3230 public:
3231 
3235 
3236  typedef typename _TargetLinkRegistry::type::type _Target_type;
3237 
3241 
3242  typedef typename _TargetLinkRegistry::iterator target_iterator;
3243 
3247 
3250  _M_reservedId(-1),
3252  {
3256  }
3257 
3261 
3262  virtual ~source_block()
3263  {
3264  // All targets should have been unlinked
3265  _CONCRT_ASSERT(_M_connectedTargets.count() == 0);
3266 
3268  }
3269 
3280 
      // Links _PTarget to this source while holding _M_internalLock: adds it to
      // _M_connectedTargets, lets the target record this source through
      // _Invoke_link_source, then calls link_target_notification.
      // Throws std::invalid_argument when _PTarget is NULL.
3281  virtual void link_target(_Inout_ ITarget<_Target_type> * _PTarget)
3282  {
3283  _R_lock _Lock(_M_internalLock);
3284 
3285  if (_PTarget == NULL)
3286  {
3287  throw std::invalid_argument("_PTarget");
3288  }
3289 
3290  _M_connectedTargets.add(_PTarget);
3291  _Invoke_link_source(_PTarget);
3292  link_target_notification(_PTarget);
3293  }
3294 
3305 
      // Removes _PTarget from _M_connectedTargets while holding _M_internalLock.
      // The target is told to unlink from this source only when the removal
      // succeeds, so a target that is not linked is left alone.
      // Throws std::invalid_argument when _PTarget is NULL.
3307  {
3308  _R_lock _Lock(_M_internalLock);
3309 
3310  if (_PTarget == NULL)
3311  {
3312  throw std::invalid_argument("_PTarget");
3313  }
3314 
3315  if (_M_connectedTargets.remove(_PTarget))
3316  {
3317  // We were able to remove the target from our list.
3318  // Inform the target to unlink from us
3319  _Invoke_unlink_source(_PTarget);
3320  }
3321  }
3322 
3326 
      // Unlinks every connected target by calling unlink_target on each entry of
      // _M_connectedTargets. _M_internalLock is held throughout; unlink_target
      // takes the same lock again, and _R_lock is the reentrant scoped lock type.
3327  virtual void unlink_targets()
3328  {
3329  _R_lock _Lock(_M_internalLock);
3330 
3331  for (target_iterator _Iter = _M_connectedTargets.begin(); *_Iter != NULL; ++_Iter)
3332  {
3333  ITarget<_Target_type> * _PTarget = *_Iter;
3334  _CONCRT_ASSERT(_PTarget != NULL);
3335 
3336  unlink_target(_PTarget);
3337  }
3338 
3339  // All the targets should be unlinked.
3340  _CONCRT_ASSERT(_M_connectedTargets.count() == 0);
3341  }
3342 
3364 
      // Returns the result of accept_message(_MsgId). _M_internalLock is not taken
      // here. That _PTarget is a connected target is checked by assertion only.
      // Throws std::invalid_argument when _PTarget is NULL.
3365  virtual message<_Target_type> * accept(runtime_object_identity _MsgId, _Inout_ ITarget<_Target_type> * _PTarget)
3366  {
3367  if (_PTarget == NULL)
3368  {
3369  throw std::invalid_argument("_PTarget");
3370  }
3371 
3372  // Assert if the target is not connected
3373  _CONCRT_ASSERT(_M_connectedTargets.contains(_PTarget));
3374 
3375  return accept_message(_MsgId);
3376  }
3377 
3400 
      // Tries to reserve message _MsgId for _PTarget while holding _M_internalLock.
      // Returns false when a reservation is already held (_M_pReservedFor is
      // non-NULL) or when reserve_message refuses the id. On success records the
      // target and the id in _M_pReservedFor / _M_reservedId and returns true.
      // Throws std::invalid_argument when _PTarget is NULL.
3401  virtual bool reserve(runtime_object_identity _MsgId, _Inout_ ITarget<_Target_type> * _PTarget)
3402  {
3403  _R_lock _Lock(_M_internalLock);
3404 
3405  if (_PTarget == NULL)
3406  {
3407  throw std::invalid_argument("_PTarget");
3408  }
3409 
3410  if ( _M_pReservedFor != NULL)
3411  {
3412  // Someone else is holding the reservation
3413  return false;
3414  }
3415 
3416  if (!reserve_message(_MsgId))
3417  {
3418  // Failed to reserve the msg ID
3419  return false;
3420  }
3421 
3422  // Save the reserving target and the msg ID
3423  _M_pReservedFor = _PTarget;
3424  _M_reservedId = _MsgId;
3425 
3426  return true;
3427  }
3428 
3456 
      // Hands the reserved message to the target that holds the reservation, while
      // holding _M_internalLock. Returns the result of consume_message(_MsgId),
      // which may be NULL.
      // Throws std::invalid_argument when _PTarget is NULL, and bad_target when no
      // reservation exists or it belongs to a different target.
3457  virtual message<_Target_type> * consume(runtime_object_identity _MsgId, _Inout_ ITarget<_Target_type> * _PTarget)
3458  {
3459  _R_lock _Lock(_M_internalLock);
3460 
3461  if (_PTarget == NULL)
3462  {
3463  throw std::invalid_argument("_PTarget");
3464  }
3465 
3466  if (_M_pReservedFor == NULL || _PTarget != _M_pReservedFor)
3467  {
3468  throw bad_target();
3469  }
3470 
3471  message<_Target_type> * _Msg = consume_message(_MsgId);
3472 
3473  if (_Msg != NULL)
3474  {
3475  // Clear the reservation
3476  // _M_reservedId is intentionally not reset so that it can assist in debugging
3478 
3479  // Reservation is assumed to block propagation. Notify that propagation can now be resumed
3481  }
3482 
3483  return _Msg;
3484  }
3485 
3505 
      // Gives up the reservation held by _PTarget, while holding _M_internalLock.
      // release_message(_MsgId) is called before the reservation is cleared.
      // Throws std::invalid_argument when _PTarget is NULL, and bad_target when
      // _PTarget is not the target recorded in _M_pReservedFor.
3506  virtual void release(runtime_object_identity _MsgId, _Inout_ ITarget<_Target_type> * _PTarget)
3507  {
3508  _R_lock _Lock(_M_internalLock);
3509 
3510  if (_PTarget == NULL)
3511  {
3512  throw std::invalid_argument("_PTarget");
3513  }
3514 
3515  if (_PTarget != _M_pReservedFor)
3516  {
3517  throw bad_target();
3518  }
3519 
3520  release_message(_MsgId);
3521 
3522  // Clear the reservation
3523  // _M_reservedId is intentionally not reset so that it can assist in debugging
3525 
3526  // Reservation is assumed to block propagation. Notify that propagation can now be resumed
3528  }
3529 
3540 
3542  {
3544  }
3545 
3556 
      // Called when a target drops its reference on this source. For a non-NULL
      // _PTarget, unlink_target_notification is invoked while holding
      // _M_internalLock; a NULL _PTarget skips the notification.
3557  virtual void release_ref(_Inout_ ITarget<_Target_type> * _PTarget)
3558  {
3559  if (_PTarget != NULL)
3560  {
3561  _R_lock _Lock(_M_internalLock);
3562 
3563  // We assume that each target would keep a single reference on its source, so
3564  // we call unlink target notification on every release. Otherwise, we would be
3565  // required to keep a reference count per target.
3566  // Note: unlink_target_notification can check the value of this _PTarget pointer, but
3567  // must not dereference it, as it may have already been deleted.
3568  unlink_target_notification(_PTarget);
3569  }
3570 
3572 
3573  // It is *unsafe* to touch the "this" pointer after decrementing the reference count
3574  }
3575 
3576 protected:
3577 
3578  //
3579  // Protected methods that a derived class can override to customize
3580  // the functionality
3581  //
3582 
3589 
      // Default notification hook: acts only when no reservation is outstanding
      // (_M_pReservedFor is NULL).
3591  {
3592  // By default, we restart propagation if there is no pending reservation
3593  if (_M_pReservedFor == NULL)
3594  {
3596  }
3597  }
3598 
3605 
      // Default notification hook for a target that has been unlinked: if that
      // target holds the current reservation, the reservation is released using
      // the stored _M_reservedId. _PTarget is only compared, never dereferenced.
3607  {
3608  // At this point, the target has already been disconnected from the
3609  // source. It is safe to check the value of this pointer, but not
3610  // safe to dereference it, as it may have already been deleted.
3611 
3612  // If the target being unlinked is the one holding the reservation,
3613  // release the reservation
3614  if (_M_pReservedFor == _PTarget)
3615  {
3616  release(_M_reservedId, _PTarget);
3617  }
3618  }
3619 
3635 
3636  virtual message<_Target_type> * accept_message(runtime_object_identity _MsgId) = 0;
3637 
3652 
3653  virtual bool reserve_message(runtime_object_identity _MsgId) = 0;
3654 
3667 
3668  virtual message<_Target_type> * consume_message(runtime_object_identity _MsgId) = 0;
3669 
3676 
3677  virtual void release_message(runtime_object_identity _MsgId) = 0;
3678 
3682 
3683  virtual void resume_propagation() = 0;
3684 
3688 
3690  {
3691  // source_blocks do not need to process anything
3692  }
3693 
3697 
3699  {
3700  throw invalid_operation("To use batched processing, you must override propagate_output_messages in the message block.");
3701  }
3702 
3710 
3712  {
3713  throw invalid_operation("To use ordered message processing, you must override propagate_to_any_targets in the message block.");
3714  }
3715 
3716  //
3717  // Utility routines
3718  //
3730 
      // Initializes _M_messageProcessor with the given scheduler and schedule
      // group (both default to NULL) and installs a handler that forwards every
      // message to _Handle_message. The handler captures this.
3731  void initialize_source(_Inout_opt_ Scheduler * _PScheduler = NULL, _Inout_opt_ ScheduleGroup * _PScheduleGroup = NULL)
3732  {
3733  // Register a callback
3734  _M_messageProcessor.initialize(_PScheduler, _PScheduleGroup,
3735  [this](message<_Target_type> * _PMessage)
3736  {
3737  this->_Handle_message(_PMessage);
3738  });
3739  }
3740 
3744 
      // Switches _M_messageProcessor to batched processing with two callbacks:
      // the first forwards each message to process_input_messages, the second
      // takes no arguments and calls _Propagate_message.
3746  {
3747  // Register callbacks for CRT110 batched processing
3748  _M_messageProcessor.initialize_batched_processing(
3749  // Processing function used by CRT110
3750  [this](message<_Target_type> * _PMessage)
3751  {
3752  // Handle message through process_input_messages to use CRT110 batch processing
3753  this->process_input_messages(_PMessage);
3754  },
3755  [this](void)
3756  {
3757  this->_Propagate_message();
3758  });
3759  }
3760 
3768 
3770  {
3771  // Caller shall not be holding any locks when calling this routine
3772  _M_messageProcessor.sync_send(_Msg);
3773  }
3774 
3782 
3784  {
3785  _M_messageProcessor.async_send(_Msg);
3786  }
3787 
3793 
3795  {
3796  _M_messageProcessor.wait();
3797  }
3798 
3802 
      // Tear-down helper: unlinks every target and then blocks in _Wait_on_ref
      // until _M_referenceCount reaches its default target value of 0.
3804  {
3805  // Wait for outstanding propagation to complete.
3807 
3808  unlink_targets();
3809 
3810  _Wait_on_ref();
3811  }
3812 
3813  //
3814  // Protected members
3815  //
3816 
3820 
3822 
3826 
3827  runtime_object_identity _M_reservedId;
3828 
3832 
3833  _TargetLinkRegistry _M_connectedTargets;
3834 
3838 
3839  _MessageProcessorType _M_messageProcessor;
3840 
3841 private:
3842 
3844 
3845 
3846  // Message handler callback for the propagator. Invokes propagate_to_any_targets
3847  // which derived classes should implement.
      // _M_internalLock is held for the whole call into propagate_to_any_targets.
3848 
3850  {
3851  // Hold a lock to synchronize with unlink targets
3852  _R_lock _Lock(_M_internalLock);
3853  propagate_to_any_targets(_PMessage);
3854  }
3855 
3856  // Message handler callback for the processor. Invokes process_input_messages
3857  // which derived classes should implement.
3858 
3860  {
3861  // Don't need a lock to process the message
3862  process_input_messages(_PMessage);
3863  }
3864 
3865  // Message handler callback for the propagator. Invokes propagate_output_messages
3866  // which derived classes should implement.
3867 
3869  {
3870  // Hold a lock to synchronize with unlink targets
3871  _R_lock _Lock(_M_internalLock);
3873  }
3874 
3875  // Wait for the reference on this block to drop to zero
      // More precisely: spins until _M_referenceCount equals _RefCount, which
      // defaults to 0. There is no timeout.
3876 
3877  void _Wait_on_ref(long _RefCount = 0)
3878  {
3880  while(_M_referenceCount != _RefCount)
3881  {
3882  spinWait._SpinOnce();
3883  }
3884  }
3885 
3886  // Private Data members
3887 
3894 
3896 
3897  volatile long _M_referenceCount;
3898 
3899 };
3900 
3901 //**************************************************************************
3902 // Propagator (source and target) Block:
3903 //**************************************************************************
3923 
3924 template<class _TargetLinkRegistry, class _SourceLinkRegistry,
3926 class propagator_block : public source_block<_TargetLinkRegistry, _MessageProcessorType>, public ITarget<typename _SourceLinkRegistry::type::source_type>
3927 {
3928 public:
3929 
3933 
3934  typedef typename _SourceLinkRegistry::type::source_type _Source_type;
3935 
3939 
3941 
3945 
3947 
3951 
3953  {
3954  }
3955 
3959 
3961  {
3963 
3964  delete _M_pFilter;
3965  }
3966 
3986 
      // Checks an incoming message and then hands it to propagate_message.
      // Throws std::invalid_argument when _PMessage or _PSource is NULL.
      // Returns declined when _M_fDeclineMessages is set or when a registered
      // filter rejects the payload; otherwise returns the result of
      // propagate_message.
3988  {
3989  // It is important that calls to propagate do *not* take the same lock on the
3990  // internal structure that is used by <c>consume</c> and the LWT. Doing so could
3991  // result in a deadlock.
3992 
3993  if (_PMessage == NULL)
3994  {
3995  throw std::invalid_argument("_PMessage");
3996  }
3997 
3998  if (_PSource == NULL)
3999  {
4000  throw std::invalid_argument("_PSource");
4001  }
4002 
4003  if (_M_fDeclineMessages)
4004  {
4005  return declined;
4006  }
4007 
      // The filter is optional; it is consulted only when one has been registered.
4008  if (_M_pFilter != NULL && !(*_M_pFilter)(_PMessage->payload))
4009  {
4010  return declined;
4011  }
4012 
4013  return propagate_message(_PMessage, _PSource);
4014  }
4015 
4034 
      // Performs the same checks as the propagate path and then hands the message
      // to send_message.
      // Throws std::invalid_argument when _PMessage or _PSource is NULL.
      // Returns declined when _M_fDeclineMessages is set or when a registered
      // filter rejects the payload; otherwise returns the result of send_message.
4036  {
4037  if (_PMessage == NULL)
4038  {
4039  throw std::invalid_argument("_PMessage");
4040  }
4041 
4042  if (_PSource == NULL)
4043  {
4044  throw std::invalid_argument("_PSource");
4045  }
4046 
4047  if (_M_fDeclineMessages)
4048  {
4049  return declined;
4050  }
4051 
      // The filter is optional; it is consulted only when one has been registered.
4052  if (_M_pFilter != NULL && !(*_M_pFilter)(_PMessage->payload))
4053  {
4054  return declined;
4055  }
4056 
4057  return send_message(_PMessage, _PSource);
4058  }
4059 
4060 protected:
4061 
4076 
4078 
4096 
4098  {
4099  // By default we do not allow send()
4100  return declined;
4101  }
4102 
4109 
4110  virtual void link_source(_Inout_ ISource<_Source_type> * _PSource)
4111  {
4112  _M_connectedSources.add(_PSource);
4116  }
4117 
4124 
4126  {
4130 
4131  _M_connectedSources.remove(_PSource);
4132  }
4133 
4137 
      // Asks every source in _M_connectedSources to unlink this block as one of
      // its targets. The walk stops when the iterator yields NULL.
4138  virtual void unlink_sources()
4139  {
4140  for (source_iterator _Iter = _M_connectedSources.begin(); *_Iter != NULL; ++_Iter)
4141  {
4142  ISource<_Source_type> * _PSource = *_Iter;
4143  _PSource->unlink_target(this);
4144  }
4145  }
4146 
4147  //
4148  // Utility routines
4149  //
4150 
4154 
4156  {
4157  throw invalid_operation("To use batched processing, you must override process_input_messages in the message block.");
4158  }
4159 
4172 
      // Sets up the block by forwarding the scheduler and schedule group (both
      // default to NULL) to initialize_source.
4173  void initialize_source_and_target(_Inout_opt_ Scheduler * _PScheduler = NULL, _Inout_opt_ ScheduleGroup * _PScheduleGroup = NULL)
4174  {
4175  initialize_source(_PScheduler, _PScheduleGroup);
4176 
4177  // Register this propagator block as the owner of the connected sources
4179  }
4180 
4187 
      // Stores a heap-allocated copy of _Filter in _M_pFilter. A filter that
      // compares equal to NULL (presumably an empty function object) is ignored.
      // NOTE(review): an existing _M_pFilter is overwritten without being deleted,
      // so this looks intended to be called at most once per block -- confirm
      // against the callers.
4188  void register_filter(filter_method const& _Filter)
4189  {
4190  if (_Filter != NULL)
4191  {
4192  _M_pFilter = new filter_method(_Filter);
4193  }
4194  }
4195 
4202 
4204  {
4205  _M_fDeclineMessages = true;
4206  }
4207 
4211 
      // Tears down all links of this block. Order matters: targets are removed
      // first, and only then are the sources unlinked.
4213  {
4214  // Decline messages while the links are being removed
4216 
4217  // Remove all the target links. This waits for
4218  // all outstanding async propagation operations.
4219  remove_targets();
4220 
4221  // unlink all sources. The above steps guarantee that
4222  // they can be removed safely.
4223  unlink_sources();
4224  }
4225 
4229 
4230  _SourceLinkManager _M_connectedSources;
4231 
4235 
4237 
4242 
4243  volatile bool _M_fDeclineMessages;
4244 };
4245 
4246 //**************************************************************************
4247 // Unbounded Buffers:
4248 //**************************************************************************
4249 
4262 
4263 template<class _Type>
4264 class unbounded_buffer : public propagator_block<multi_link_registry<ITarget<_Type>>, multi_link_registry<ISource<_Type>>>
4265 {
4266 public:
4279 
4282  {
4285  }
4286 
4302 
4305  {
4308  register_filter(_Filter);
4309  }
4310 
4311 #ifdef _CRT_USE_WINAPI_FAMILY_DESKTOP_APP
4312 
4328  unbounded_buffer(Scheduler& _PScheduler) :
4330  {
4331  initialize_source_and_target(&_PScheduler);
4333  }
4334 
4353 
4354  unbounded_buffer(Scheduler& _PScheduler, filter_method const& _Filter) :
4356  {
4357  initialize_source_and_target(&_PScheduler);
4359  register_filter(_Filter);
4360  }
4361 
4378 
4379  unbounded_buffer(ScheduleGroup& _PScheduleGroup) :
4381  {
4382  initialize_source_and_target(NULL, &_PScheduleGroup);
4384  }
4385 
4405 
4406  unbounded_buffer(ScheduleGroup& _PScheduleGroup, filter_method const& _Filter) :
4408  {
4409  initialize_source_and_target(NULL, &_PScheduleGroup);
4411  register_filter(_Filter);
4412  }
4413 #endif /* _CRT_USE_WINAPI_FAMILY_DESKTOP_APP */
4414 
4418 
4420  {
4421  // Remove all links
4423 
4424  // Clean up any messages left in this message block
4426  }
4427 
4437 
      // Sends _Item to this buffer through Concurrency::send and returns its
      // bool result.
4438  bool enqueue(_Type const& _Item)
4439  {
4440  return Concurrency::send<_Type>(this, _Item);
4441  }
4442 
4449 
4451  {
4452  return receive<_Type>(this);
4453  }
4454 
4455 
4456 protected:
4457 
4458  //
4459  // propagator_block protected function implementations
4460  //
4461 
4476 
      // Asks _PSource to hand over the offered message via accept. If the source
      // returns a message, it is queued with async_send and the result is
      // accepted; if the source returns NULL the result is missed.
4478  {
4479  // It is important that calls to propagate do *not* take the same lock on the
4480  // internal structure that is used by <c>consume</c> and the LWT. Doing so could
4481  // result in a deadlock.
4482 
4483  message_status _Result = accepted;
4484 
4485  // Accept the message being propagated
      // From here on _PMessage refers to whatever accept returned.
4486  _PMessage = _PSource->accept(_PMessage->msg_id(), this);
4487 
4488  if (_PMessage != NULL)
4489  {
4490  async_send(_PMessage);
4491  }
4492  else
4493  {
4494  _Result = missed;
4495  }
4496 
4497  return _Result;
4498  }
4499 
4514 
      // Synchronous counterpart of the propagate path: asks _PSource to hand over
      // the offered message via accept, then passes it to sync_send and returns
      // accepted. Returns missed when the source returns NULL.
4516  {
      // From here on _PMessage refers to whatever accept returned.
4517  _PMessage = _PSource->accept(_PMessage->msg_id(), this);
4518 
4519  if (_PMessage != NULL)
4520  {
4521  sync_send(_PMessage);
4522  }
4523  else
4524  {
4525  return missed;
4526  }
4527 
4528  return accepted;
4529  }
4530 
4538 
4540  {
4541  return true;
4542  }
4543 
4554 
      // Returns the head message of _M_messageBuffer, removing it from the buffer,
      // when its id equals _MsgId. Returns NULL when the head does not match.
4555  virtual message<_Type> * accept_message(runtime_object_identity _MsgId)
4556  {
4557  //
4558  // Peek at the head message in the message buffer. If the IDs match
4559  // dequeue and transfer ownership
4560  //
4561  message<_Type> * _Msg = NULL;
4562 
4563  if (_M_messageBuffer._Is_head(_MsgId))
4564  {
4565  _Msg = _M_messageBuffer._Dequeue();
4566  }
4567 
4568  return _Msg;
4569  }
4570 
4584 
      // Returns true only when _MsgId identifies the head message of
      // _M_messageBuffer. The buffer itself is not modified.
4585  virtual bool reserve_message(runtime_object_identity _MsgId)
4586  {
4587  // Allow reservation if this is the head message
4588  return _M_messageBuffer._Is_head(_MsgId);
4589  }
4590 
4604 
      // Consuming is implemented as accepting: delegates to accept_message and
      // returns its result (the dequeued head message, or NULL on an id mismatch).
4605  virtual message<_Type> * consume_message(runtime_object_identity _MsgId)
4606  {
4607  // By default, accept the message
4608  return accept_message(_MsgId);
4609  }
4610 
4617 
      // Validates a release request: throws message_not_found unless _MsgId
      // identifies the head message of _M_messageBuffer. Nothing else is changed.
4618  virtual void release_message(runtime_object_identity _MsgId)
4619  {
4620  // The head message is the one reserved.
4621  if (!_M_messageBuffer._Is_head(_MsgId))
4622  {
4623  throw message_not_found();
4624  }
4625  }
4626 
4630 
      // Restarts propagation when messages are waiting in _M_messageBuffer: sets
      // _M_fForceRepropagation and queues a NULL message with async_send to
      // trigger a propagation pass. Does nothing when the buffer is empty.
4631  virtual void resume_propagation()
4632  {
4633  // If there are any messages in the buffer, propagate them out
4634  if (_M_messageBuffer._Count() > 0)
4635  {
4636  // Set the flag to force a repropagation. This flag is cleared when a propagation happens
4637  // The only functions that call this are release, consume, and link_target, all of which
4638  // hold the internal lock, so the flag is guaranteed to be read by propagation, which also
4639  // holds the same lock.
4640  _M_fForceRepropagation = true;
4641 
4642  // async send a NULL value to initiate the repropagation
4643  async_send(NULL);
4644  }
4645  }
4646 
4653 
      // Offers the current head message of _M_messageBuffer to _PTarget. Nothing is
      // offered while a reservation is outstanding or when the buffer has no head
      // message.
4655  {
4656  // If the message queue is blocked due to reservation
4657  // there is no need to do any message propagation
4658  if (_M_pReservedFor != NULL)
4659  {
4660  return;
4661  }
4662 
      // _Peek looks at the head message; the dequeue happens in accept_message.
4663  message<_Type> * _Msg = _M_messageBuffer._Peek();
4664 
4665  if (_Msg != NULL)
4666  {
4667  // Propagate the head message to the new target
4668  message_status _Status = _PTarget->propagate(_Msg, this);
4669 
4670  if (_Status == accepted)
4671  {
4672  // The target accepted the message, restart propagation.
4674  }
4675 
4676  // If the status is anything other than accepted, then leave
4677  // the message queue blocked.
4678  }
4679  }
4680 
      // Appends a non-NULL _PMessage to _M_processedMessages. A NULL message (as
      // sent by resume_propagation to trigger a pass) is ignored here.
4686  {
4687  if (_PMessage != NULL)
4688  {
4689  _M_processedMessages._Enqueue(_PMessage);
4690  }
4691  }
4692 
4706 
      // Moves everything from _M_processedMessages into _M_messageBuffer. It then
      // goes on to propagate only if the buffer was empty beforehand or a
      // repropagation has been forced; in that case the force flag is cleared
      // first.
4708  {
4709  // Move the messages from the processedMessages queue to the internal storage
4710  // to make them ready for propagating out
4711 
4712  // If there are messages in the message queue, the queue is blocked and a
4713  // propagation should not happen unless it has been forced using resume_propagation
      // Must be sampled before the transfer loop below adds to _M_messageBuffer.
4714  bool _FIsBlocked = (_M_messageBuffer._Count() > 0);
4715 
4716  for(;;)
4717  {
4718  message<_Type> * _PInputMessage = _M_processedMessages._Dequeue();
4719  if(_PInputMessage == NULL)
4720  {
4721  break;
4722  }
4723  _M_messageBuffer._Enqueue(_PInputMessage);
4724  }
4725 
4726  if (_M_fForceRepropagation == false && _FIsBlocked == true)
4727  {
4728  return;
4729  }
4730 
4731  // Reset the repropagation flag because a propagation has started.
4732  _M_fForceRepropagation = false;
4733 
4734  // Attempt to propagate messages to all the targets
4736  }
4737 
4738 private:
4739 
4746 
      // Offers the head message of _MessageBuffer to the connected targets in
      // iteration order, always starting from the first target. When a target
      // accepts, the next head message is offered, again from the first target.
      // Stops when no target accepts the head message, when a reservation appears,
      // or when _Peek returns NULL.
4748  {
4749  message<_Target_type> * _Msg = _MessageBuffer._Peek();
4750 
4751  // If someone has reserved the _Head message, don't propagate anymore
4752  if (_M_pReservedFor != NULL)
4753  {
4754  return;
4755  }
4756 
4757  while (_Msg != NULL)
4758  {
      // Stays declined if there are no targets at all, which ends the outer loop.
4759  message_status _Status = declined;
4760 
4761  // Always start from the first target that linked
4762  for (target_iterator _Iter = _M_connectedTargets.begin(); *_Iter != NULL; ++_Iter)
4763  {
4764  ITarget<_Target_type> * _PTarget = *_Iter;
4765  _Status = _PTarget->propagate(_Msg, this);
4766 
4767  // Ownership of message changed. Do not propagate this
4768  // message to any other target.
4769  if (_Status == accepted)
4770  {
4771  break;
4772  }
4773 
4774  // If the target we just propagated to reserved this message, stop
4775  // propagating it to others
4776  if (_M_pReservedFor != NULL)
4777  {
4778  break;
4779  }
4780  }
4781 
4782  // If status is anything other than accepted, then the head message
4783  // was not propagated out. Thus, nothing after it in the queue can
4784  // be propagated out. Cease propagation.
4785  if (_Status != accepted)
4786  {
4787  break;
4788  }
4789 
4790  // Get the next message
4791  _Msg = _MessageBuffer._Peek();
4792  }
4793  }
4794 
4799 
      // Deletes every message still held in _M_messageBuffer, dequeuing until
      // _Dequeue returns NULL.
4801  {
4802  // Input messages for this message block are in the base-class input buffer
4803  // All messages in that buffer are guaranteed to have moved to the output
4804  // buffer because the destructor first waits for all async sends to finish
4805  // before reaching this point
4806 
4807  // Delete any messages remaining in the output queue
4808  for (;;)
4809  {
4810  message<_Type> * _Msg = _M_messageBuffer._Dequeue();
4811  if (_Msg == NULL)
4812  {
4813  break;
4814  }
4815  delete _Msg;
4816  }
4817  }
4818 
4822 
4824 
4828 
4830 
4834 
4836 
4837 private:
4838  //
4839  // Hide assignment operator and copy constructor
4840  //
4841  unbounded_buffer const &operator =(unbounded_buffer const&); // no assignment operator
4842  unbounded_buffer(unbounded_buffer const &); // no copy constructor
4843 };
4844 
4845 //**************************************************************************
4846 // Overwrite Buffers:
4847 //**************************************************************************
4848 
4863 
4864 template<class _Type>
4865 class overwrite_buffer : public propagator_block<multi_link_registry<ITarget<_Type>>, multi_link_registry<ISource<_Type>>>
4866 {
4867 public:
4880 
4883  {
4885  }
4886 
4902 
4905  {
4907  register_filter(_Filter);
4908  }
4909 
4910 #ifdef _CRT_USE_WINAPI_FAMILY_DESKTOP_APP
4911 
4927  overwrite_buffer(Scheduler& _PScheduler) :
4929  {
4930  initialize_source_and_target(&_PScheduler);
4931  }
4932 
4951 
4952  overwrite_buffer(Scheduler& _PScheduler,
4953  filter_method const& _Filter) :
4955  {
4956  initialize_source_and_target(&_PScheduler);
4957  register_filter(_Filter);
4958  }
4959 
4976 
4977  overwrite_buffer(ScheduleGroup& _PScheduleGroup) :
4979  {
4980  initialize_source_and_target(NULL, &_PScheduleGroup);
4981  }
4982 
5002 
5003  overwrite_buffer(ScheduleGroup& _PScheduleGroup,
5004  filter_method const& _Filter) :
5006  {
5007  initialize_source_and_target(NULL, &_PScheduleGroup);
5008  register_filter(_Filter);
5009  }
5010 #endif /* _CRT_USE_WINAPI_FAMILY_DESKTOP_APP */
5011 
5015 
5017  {
5018  // Remove all links that are targets of this overwrite_buffer
5020 
5021  // Clean up any messages left in this message block
5023  }
5024 
5031 
      // Returns true when _M_fIsInitialized is non-zero. Does not modify the
      // buffer.
5032  bool has_value() const
5033  {
5034  return _M_fIsInitialized != 0;
5035  }
5036 
5047 
5049  {
5050  return receive<_Type>(this);
5051  }
5052 
5053 protected:
5054 
5055  //
5056  // propagator_block protected function implementation
5057  //
5058 
5073 
5075  {
5076  // It is important that calls to propagate do *not* take the same lock on the
5077  // internal structure that is used by Consume and the LWT. Doing so could
5078  // result in a deadlock with the Consume call.
5079 
5080  message_status _Result = accepted;
5081 
5082  _PMessage = _PSource->accept(_PMessage->msg_id(), this);
5083 
5084  //
5085  // If message was accepted, set the member variables for
5086  // this block and start the asynchronous propagation task
5087  //
5088  if (_PMessage != NULL)
5089  {
5090  // Add a reference for the async_send holding the message
5091  _PMessage->add_ref();
5092 
5093  async_send(_PMessage);
5094  }
5095  else
5096  {
5097  _Result = missed;
5098  }
5099 
5100  return _Result;
5101  }
5102 
5117 
5119  {
5120  _PMessage = _PSource->accept(_PMessage->msg_id(), this);
5121 
5122  //
5123  // If message was accepted, set the member variables for
5124  // this block and propagate the message synchronously
5125  //
5126  if (_PMessage != NULL)
5127  {
5128  // Add a reference for the sync_send holding the message
5129  _PMessage->add_ref();
5130 
5131  sync_send(_PMessage);
5132  }
5133  else
5134  {
5135  return missed;
5136  }
5137 
5138  return accepted;
5139  }
5140 
5148 
5150  {
5151  return true;
5152  }
5153 
5169 
5170  virtual message<_Type> * accept_message(runtime_object_identity _MsgId)
5171  {
      // Returns a newly allocated message<_Type> holding a copy of the stored
      // payload when _MsgId equals the id of the stored message. Returns NULL
      // when nothing is stored yet or when the ids differ. The stored message
      // (_M_pMessage) is left untouched; the caller receives the copy.
5172  //
5173  // If the internal message has not been initialized yet, return NULL
5174  //
5175  if (_M_pMessage == NULL)
5176  {
5177  return NULL;
5178  }
5179 
5180  //
5181  // Instead of returning the internal message, we return a copy of the
5182  // message stored.
5183  //
5184  // Because we are returning a copy, the accept routine for an overwrite_buffer
5185  // does not need to grab the internalLock
5186  //
5187  message<_Type> * _Msg = NULL;
5188  if (_M_pMessage->msg_id() == _MsgId)
5189  {
5190  _Msg = new message<_Type>(_M_pMessage->payload);
5191  }
5192 
5193  return _Msg;
5194  }
5195 
5209 
5210  virtual bool reserve_message(runtime_object_identity _MsgId)
5211  {
      // Attempts to reserve the currently stored message. Returns false when no
      // message is stored or when _MsgId is not the id of the stored message;
      // otherwise takes an extra reference on the reserved message and returns
      // true.
      // NOTE(review): the statements that belonged to source lines 5220 and 5223
      // are absent from this listing. Judging by the comments that precede them,
      // they presumably check for an existing reservation and store the message
      // in _M_pReservedMessage -- confirm against the original header.
5212  // Ensure that this message currently exists in the overwrite buffer
5213  if (_M_pMessage == NULL || _M_pMessage->msg_id() != _MsgId)
5214  {
5215  return false;
5216  }
5217 
5218  // Can only reserve one message, any other blocks trying to reserve
5219  // will return false
5221 
5222  // Save this message away
5224 
5225  // Add a reference for this message to prevent deletion
5226  _M_pReservedMessage->add_ref();
5227 
5228  return true;
5229  }
5230 
5244 
5245  virtual message<_Type> * consume_message(runtime_object_identity _MsgId)
5246  {
      // Returns a newly allocated message carrying a copy of the reserved
      // message's payload, then drops the reference held on the reserved message
      // and deletes it when the count reaches zero. Returns NULL when a
      // reservation exists but its id differs from _MsgId.
      // NOTE(review): when _M_pReservedMessage is NULL the guard below does not
      // return, and the visible code goes on to dereference the pointer. The
      // statement from source line 5255 (presumably the assert the comment
      // mentions) and the one from line 5266 are absent from this listing --
      // confirm against the original header.
5247  // Leave and return NULL if this msgId doesn't match the reserved message
5248  // Otherwise this is a pull of a later overwritten message, and messages
5249  // could then appear out of order.
5250  if (_M_pReservedMessage != NULL && _M_pReservedMessage->msg_id() != _MsgId)
5251  {
5252  return NULL;
5253  }
5254  // This redundant assert is specifically to make the /analyze switch happy, which cannot recognize the same condition in the if statement above.
5256 
      // Copy the payload first so the result does not depend on the reserved
      // message, which may be deleted below.
5257  _Type _Payload = _M_pReservedMessage->payload;
5258 
5259  // Take the reserved message
5260  message<_Type> * _Result = new message<_Type>(_Payload);
5261 
5262  if (_M_pReservedMessage->remove_ref() == 0)
5263  {
5264  delete _M_pReservedMessage;
5265  }
5267 
5268  return _Result;
5269  }
5270 
5277 
5278  virtual void release_message(runtime_object_identity _MsgId)
5279  {
      // Gives up a reservation: throws message_not_found when _MsgId is not the
      // id of the reserved message; otherwise drops the reference held on the
      // reserved message and deletes it when the count reaches zero.
      // NOTE(review): the statements from source lines 5280-5281 and 5292 are
      // absent from this listing. As shown, _M_pReservedMessage is dereferenced
      // without a visible NULL check -- confirm against the original header.
5282 
5283  if (_MsgId != _M_pReservedMessage->msg_id())
5284  {
5285  throw message_not_found();
5286  }
5287 
5288  if (_M_pReservedMessage->remove_ref() == 0)
5289  {
5290  delete _M_pReservedMessage;
5291  }
5293  }
5294 
5298 
5299  virtual void resume_propagation()
5300  {
      // Intentionally empty: this block performs no work when propagation is
      // resumed.
5301  // On reservation we do not stop propagation. So there
5302  // is nothing to be done to resume propagation.
5303  }
5304 
5311 
5313  {
5314  // If there is a message available already, propagate it
5315  if (_M_pMessage != NULL)
5316  {
5317  _PTarget->propagate(_M_pMessage, this);
5318  }
5319  }
5320 
5332 
5334  {
5335  // Move the message from the queuedMessages Buffer to the internal storage
5336 
5337  // Add a reference for the overwrite_buffer holding the message
5338  _PMessage->add_ref();
5339 
5340  if (_M_pMessage != NULL)
5341  {
5342  if (_M_pMessage->remove_ref() == 0)
5343  {
5344  delete _M_pMessage;
5345  }
5346  }
5347 
5348  _M_pMessage = _PMessage;
5349 
5350  // Now that message has been received, set this block as initialized
5351  _M_fIsInitialized = true;
5352 
5353  for (target_iterator _Iter = _M_connectedTargets.begin(); *_Iter != NULL; ++_Iter)
5354  {
5355  // Overwrite buffers can propagate its message out
5356  // to any number of Targets
5357 
5358  ITarget<_Type> * _PTarget = *_Iter;
5359  _PTarget->propagate(_PMessage, this);
5360  }
5361 
5362  if (_PMessage->remove_ref() == 0)
5363  {
5364  delete _PMessage;
5365  }
5366  }
5367 
5368 private:
5369 
5374 
5376  {
5377  // Input messages for this message block are in the base-class input buffer
5378  // All messages in that buffer are guaranteed to have moved to the output
5379  // buffer because the destructor first waits for all async sends to finish
5380  // before reaching this point
5381 
5382  // The messages for an overwrite buffer are deleted when overwritten
5383  // through reference counting. This final check is put in place in
5384  // case any message still exists in the buffer when the overwrite_buffer
5385  // is deleted. The reference count of this message has not yet reached
5386  // zero because it hasn't been overwritten yet. It is safe because
5387  // we have finished all propagation.
5388  if (_M_pMessage != NULL)
5389  {
5390  // A block can only have a reserved message after receiving a message
5391  // at some point, so it must be within the above if-clause.
5392  // Now delete the reserved message if it is non-NULL and different from
5393  // the saved internal message
5395  {
5396  delete _M_pReservedMessage;
5397  }
5398  delete _M_pMessage;
5399  }
5400  }
5401 
5402  //
5403  // Private Data Members
5404  //
5405 
5406  // The message being stored
5408 
5409  // The message being reserved
5411 
5412  // The marker for whether the overwrite buffer has already been initialized
5413  volatile bool _M_fIsInitialized;
5414 
5415 private:
5416  //
5417  // Hide assignment operator and copy constructor
5418  //
5419  overwrite_buffer const &operator =(overwrite_buffer const&); // no assignment operator
5420  overwrite_buffer(overwrite_buffer const &); // no copy constructor
5421 };
5422 
5423 //**************************************************************************
5424 // Call:
5425 //**************************************************************************
5426 
5441 
5442 template<class _Type, class _FunctorType = std::tr1::function<void(_Type const&)>>
5443 class call : public target_block<multi_link_registry<ISource<_Type>>>
5444 {
5448 
5449  typedef _FunctorType _Call_method;
5450 
5451 public:
5469 
5470  call(_Call_method const& _Func) :
5471  _M_pFunc(_Func)
5472  {
      // Constructs the block with a copy of _Func stored in _M_pFunc.
      // NOTE(review): the body statements from source lines 5473-5474 are absent
      // from this listing; presumably they perform the target initialization
      // that the scheduler overloads do via initialize_target -- confirm.
5475  }
5476 
5494 
5495  call(_Call_method const& _Func,
5496  filter_method const& _Filter) :
5497  _M_pFunc(_Func)
5498  {
      // Constructs the block with a copy of _Func stored in _M_pFunc and
      // registers _Filter through register_filter.
      // NOTE(review): the body statements from source lines 5499-5500 are absent
      // from this listing -- confirm against the original header.
5501  register_filter(_Filter);
5502  }
5503 
5504 #ifdef _CRT_USE_WINAPI_FAMILY_DESKTOP_APP
5505 
5526  call(Scheduler& _PScheduler,
5527  _Call_method const& _Func) :
5528  _M_pFunc(_Func)
5529  {
      // Constructs the block with a copy of _Func stored in _M_pFunc and
      // initializes the target side with the address of _PScheduler.
      // NOTE(review): the statement from source line 5531 is absent from this
      // listing -- confirm against the original header.
5530  initialize_target(&_PScheduler);
5532  }
5533 
5557 
5558  call(Scheduler& _PScheduler,
5559  _Call_method const& _Func,
5560  filter_method const& _Filter) :
5561  _M_pFunc(_Func)
5562  {
      // Constructs the block with a copy of _Func stored in _M_pFunc,
      // initializes the target side with the address of _PScheduler, and
      // registers _Filter.
      // NOTE(review): the statement from source line 5564 is absent from this
      // listing -- confirm against the original header.
5563  initialize_target(&_PScheduler);
5565  register_filter(_Filter);
5566  }
5567 
5589 
5590  call(ScheduleGroup& _PScheduleGroup,
5591  _Call_method const& _Func) :
5592  _M_pFunc(_Func)
5593  {
      // Constructs the block with a copy of _Func stored in _M_pFunc and
      // initializes the target side with a NULL scheduler and the address of
      // _PScheduleGroup.
      // NOTE(review): the statement from source line 5595 is absent from this
      // listing -- confirm against the original header.
5594  initialize_target(NULL, &_PScheduleGroup);
5596  }
5597 
5622 
5623  call(ScheduleGroup& _PScheduleGroup,
5624  _Call_method const& _Func,
5625  filter_method const& _Filter) :
5626  _M_pFunc(_Func)
5627  {
      // Constructs the block with a copy of _Func stored in _M_pFunc,
      // initializes the target side with a NULL scheduler and the address of
      // _PScheduleGroup, and registers _Filter.
      // NOTE(review): the statement from source line 5629 is absent from this
      // listing -- confirm against the original header.
5628  initialize_target(NULL, &_PScheduleGroup);
5630  register_filter(_Filter);
5631  }
5632 #endif /* _CRT_USE_WINAPI_FAMILY_DESKTOP_APP */
5633 
5637 
5639  {
5640  remove_sources();
5641  }
5642 
5643 protected:
5644 
5645  //
5646  // target_block protected function implementations
5647  //
5648 
5663 
5665  {
5666  // It is important that calls to propagate do *not* take the same lock on the
5667  // internal structure that is used by Consume and the LWT. Doing so could
5668  // result in a deadlock with the Consume call.
5669 
5670  message_status _Result = accepted;
5671 
5672  //
5673  // Accept the message being propagated
5674  // Note: depending on the source block propagating the message
5675  // this may not necessarily be the same message (pMessage) first
5676  // passed into the function.
5677  //
5678  _PMessage = _PSource->accept(_PMessage->msg_id(), this);
5679 
5680  if (_PMessage != NULL)
5681  {
5682  async_send(_PMessage);
5683  }
5684  else
5685  {
5686  _Result = missed;
5687  }
5688 
5689  return _Result;
5690  }
5691 
5706 
5708  {
5709  message_status _Result = accepted;
5710 
5711  //
5712  // Accept the message being propagated
5713  // Note: depending on the source block propagating the message
5714  // this may not necessarily be the same message (pMessage) first
5715  // passed into the function.
5716  //
5717  _PMessage = _PSource->accept(_PMessage->msg_id(), this);
5718 
5719  if (_PMessage != NULL)
5720  {
5721  sync_send(_PMessage);
5722  }
5723  else
5724  {
5725  _Result = missed;
5726  }
5727 
5728  return _Result;
5729  }
5730 
5738 
5740  {
5741  return true;
5742  }
5743 
5750 
5751  virtual void process_message(_Inout_ message<_Type> * _PMessage)
5752  {
      // Intentionally empty: _PMessage is neither read nor released here. In
      // this class the user functor is invoked, and the message deleted, in a
      // separate member function.
5753  // No longer necessary with CRT110 change
5754  }
5755 
5759 
5761  {
5762  // Invoke the function provided by the user
5763  _CONCRT_ASSERT(_PMessage != NULL);
5764  _M_pFunc(_PMessage->payload);
5765  delete _PMessage;
5766  }
5767 
5768 private:
5769 
5770  //
5771  // Private Data Members
5772  //
5773 
5774  // The call method called by this block
5775  _Call_method _M_pFunc;
5776 
5777 private:
5778  //
5779  // Hide assignment operator and copy constructor
5780  //
5781  call const &operator =(call const&); // no assignment operator
5782  call(call const &); // no copy constructor
5783 };
5784 
5785 //**************************************************************************
5786 // Transformer:
5787 //**************************************************************************
5788 
5804 
5805 template<class _Input, class _Output>
5806 class transformer : public propagator_block<single_link_registry<ITarget<_Output>>, multi_link_registry<ISource<_Input>>>
5807 {
5808  typedef std::tr1::function<_Output(_Input const&)> _Transform_method;
5809 
5810 public:
5831 
5832  transformer(_Transform_method const& _Func,
5833  _Inout_opt_ ITarget<_Output> * _PTarget = NULL) :
5834  _M_pFunc(_Func)
5835  {
      // Constructs the block with a copy of _Func stored in _M_pFunc and, when
      // _PTarget is non-NULL, links it as a target.
      // NOTE(review): the statement from source line 5836 is absent from this
      // listing; presumably it performs the source/target initialization that
      // the scheduler overloads do via initialize_source_and_target -- confirm.
5837 
5838  if (_PTarget != NULL)
5839  {
5840  link_target(_PTarget);
5841  }
5842  }
5843 
5867 
5868  transformer(_Transform_method const& _Func,
5869  _Inout_opt_ ITarget<_Output> * _PTarget,
5870  filter_method const& _Filter) :
5871  _M_pFunc(_Func)
5872  {
      // Constructs the block with a copy of _Func stored in _M_pFunc, registers
      // _Filter, and, when _PTarget is non-NULL, links it as a target.
      // NOTE(review): the statement from source line 5873 is absent from this
      // listing -- confirm against the original header.
5874  register_filter(_Filter);
5875 
5876  if (_PTarget != NULL)
5877  {
5878  link_target(_PTarget);
5879  }
5880  }
5881 
5882 #ifdef _CRT_USE_WINAPI_FAMILY_DESKTOP_APP
5883 
5907  transformer(Scheduler& _PScheduler,
5908  _Transform_method const& _Func,
5909  _Inout_opt_ ITarget<_Output> * _PTarget = NULL) :
5910  _M_pFunc(_Func)
5911  {
      // Constructs the block with a copy of _Func stored in _M_pFunc,
      // initializes the source and target sides with the address of
      // _PScheduler, and links _PTarget as a target when it is non-NULL.
5912  initialize_source_and_target(&_PScheduler);
5913 
5914  if (_PTarget != NULL)
5915  {
5916  link_target(_PTarget);
5917  }
5918  }
5919 
5946 
5947  transformer(Scheduler& _PScheduler,
5948  _Transform_method const& _Func,
5949  _Inout_opt_ ITarget<_Output> * _PTarget,
5950  filter_method const& _Filter) :
5951  _M_pFunc(_Func)
5952  {
      // Constructs the block with a copy of _Func stored in _M_pFunc,
      // initializes the source and target sides with the address of
      // _PScheduler, registers _Filter, and links _PTarget as a target when it
      // is non-NULL.
5953  initialize_source_and_target(&_PScheduler);
5954  register_filter(_Filter);
5955 
5956  if (_PTarget != NULL)
5957  {
5958  link_target(_PTarget);
5959  }
5960  }
5961 
5986 
5987  transformer(ScheduleGroup& _PScheduleGroup,
5988  _Transform_method const& _Func,
5989  _Inout_opt_ ITarget<_Output> * _PTarget = NULL) :
5990  _M_pFunc(_Func)
5991  {
      // Constructs the block with a copy of _Func stored in _M_pFunc,
      // initializes the source and target sides with a NULL scheduler and the
      // address of _PScheduleGroup, and links _PTarget as a target when it is
      // non-NULL.
5992  initialize_source_and_target(NULL, &_PScheduleGroup);
5993 
5994  if (_PTarget != NULL)
5995  {
5996  link_target(_PTarget);
5997  }
5998  }
5999 
6027 
6028  transformer(ScheduleGroup& _PScheduleGroup,
6029  _Transform_method const& _Func,
6030  _Inout_opt_ ITarget<_Output> * _PTarget,
6031  filter_method const& _Filter) :
6032  _M_pFunc(_Func)
6033  {
      // Constructs the block with a copy of _Func stored in _M_pFunc,
      // initializes the source and target sides with a NULL scheduler and the
      // address of _PScheduleGroup, registers _Filter, and links _PTarget as a
      // target when it is non-NULL.
6034  initialize_source_and_target(NULL, &_PScheduleGroup);
6035  register_filter(_Filter);
6036 
6037  if (_PTarget != NULL)
6038  {
6039  link_target(_PTarget);
6040  }
6041  }
6042 #endif /* _CRT_USE_WINAPI_FAMILY_DESKTOP_APP */
6043 
6047 
6049  {
6050  // Remove all links
6052 
6053  // Clean up any messages left in this message block
6055  }
6056 
6057 protected:
6058 
6059  // Propagator_block protected function implementations
6060 
6075 
6077  {
6078  // It is important that calls to propagate do *not* take the same lock on the
6079  // internal structure that is used by Consume and the LWT. Doing so could
6080  // result in a deadlock with the Consume call.
6081 
6082  message_status _Result = accepted;
6083 
6084  //
6085  // Accept the message being propagated
6086  // Note: depending on the source block propagating the message
6087  // this may not necessarily be the same message (pMessage) first
6088  // passed into the function.
6089  //
6090  _PMessage = _PSource->accept(_PMessage->msg_id(), this);
6091 
6092  if (_PMessage != NULL)
6093  {
6094  // Enqueue the input message
6095  _M_inputMessages.push(_PMessage);
6096  async_send(NULL);
6097  }
6098  else
6099  {
6100  _Result = missed;
6101  }
6102 
6103  return _Result;
6104  }
6105 
6120 
6122  {
6123  _PMessage = _PSource->accept(_PMessage->msg_id(), this);
6124 
6125  if (_PMessage != NULL)
6126  {
6127  // Enqueue the input message
6128  _M_inputMessages.push(_PMessage);
6129  sync_send(NULL);
6130  }
6131  else
6132  {
6133  return missed;
6134  }
6135 
6136  return accepted;
6137  }
6138 
6146 
6148  {
6149  return true;
6150  }
6151 
6162 
6163  virtual message<_Output> * accept_message(runtime_object_identity _MsgId)
6164  {
      // Removes and returns the head of the output buffer (_M_messageBuffer)
      // when _MsgId identifies that head message; returns NULL otherwise, in
      // which case the buffer is left unchanged.
6165  //
6166  // Peek at the head message in the message buffer. If the IDs match
6167  // dequeue and transfer ownership
6168  //
6169  message<_Output> * _Msg = NULL;
6170 
6171  if (_M_messageBuffer._Is_head(_MsgId))
6172  {
6173  _Msg = _M_messageBuffer._Dequeue();
6174  }
6175 
6176  return _Msg;
6177  }
6178 
6192 
6193  virtual bool reserve_message(runtime_object_identity _MsgId)
6194  {
      // Returns true exactly when _MsgId identifies the head message of the
      // output buffer. No state is modified here.
6195  // Allow reservation if this is the head message
6196  return _M_messageBuffer._Is_head(_MsgId);
6197  }
6198 
6212 
6213  virtual message<_Output> * consume_message(runtime_object_identity _MsgId)
6214  {
      // Consuming is implemented as accepting: forwards to accept_message and
      // returns its result (the dequeued head message, or NULL when _MsgId is
      // not the head).
6215  // By default, accept the message
6216  return accept_message(_MsgId);
6217  }
6218 
6225 
6226  virtual void release_message(runtime_object_identity _MsgId)
6227  {
      // Validates a release request: throws message_not_found when _MsgId does
      // not identify the head message of the output buffer. When it does, there
      // is nothing to undo here and the message stays in the buffer.
6228  // The head message is the one reserved.
6229  if (!_M_messageBuffer._Is_head(_MsgId))
6230  {
6231  throw message_not_found();
6232  }
6233  }
6234 
6238 
6239  virtual void resume_propagation()
6240  {
      // Restarts propagation when the output buffer is non-empty by issuing an
      // async_send with a NULL message; does nothing when the buffer is empty.
6241  // If there are any messages in the buffer, propagate them out
6242  if (_M_messageBuffer._Count() > 0)
6243  {
6244  // async send a NULL value to initiate the repropagation
6245  async_send(NULL);
6246  }
6247  }
6248 
6255 
6257  {
6258  // If the message queue is blocked due to reservation
6259  // there is no need to do any message propagation
6260  if (_M_pReservedFor != NULL)
6261  {
6262  return;
6263  }
6264 
6266  }
6267 
6271 
6273  {
6274  message<_Output> * _Msg = NULL;
6275 
6276  // Process input message.
6277  message<_Input> * _PInputMessage = NULL;
6278  _M_inputMessages.try_pop(_PInputMessage);
6279 
6280  if (_PInputMessage != NULL)
6281  {
6282  // Invoke the TransformMethod on the data
6283  // Let exceptions flow
6284  _Output _Out = _M_pFunc(_PInputMessage->payload);
6285 
6286  // Reuse the input message ID
6287  _Msg = new message<_Output>(_Out, _PInputMessage->msg_id());
6288  _M_messageBuffer._Enqueue(_Msg);
6289 
6290  // Message cleanup
6291  delete _PInputMessage;
6292 
6293  if (!_M_messageBuffer._Is_head(_Msg->msg_id()))
6294  {
6295  return;
6296  }
6297  }
6298 
6300  }
6301 
6302 private:
6303 
6310 
6312  {
6313  message<_Target_type> * _Msg = _MessageBuffer._Peek();
6314 
6315  // If someone has reserved the _Head message, don't propagate anymore
6316  if (_M_pReservedFor != NULL)
6317  {
6318  return;
6319  }
6320 
6321  while (_Msg != NULL)
6322  {
6323  message_status _Status = declined;
6324 
6325  // Always start from the first target that linked
6326  for (target_iterator _Iter = _M_connectedTargets.begin(); *_Iter != NULL; ++_Iter)
6327  {
6328  ITarget<_Target_type> * _PTarget = *_Iter;
6329  _Status = _PTarget->propagate(_Msg, this);
6330 
6331  // Ownership of message changed. Do not propagate this
6332  // message to any other target.
6333  if (_Status == accepted)
6334  {
6335  break;
6336  }
6337 
6338  // If the target just propagated to reserved this message, stop
6339  // propagating it to others
6340  if (_M_pReservedFor != NULL)
6341  {
6342  break;
6343  }
6344  }
6345 
6346  // If status is anything other than accepted, then the head message
6347  // was not propagated out. Thus, nothing after it in the queue can
6348  // be propagated out. Cease propagation.
6349  if (_Status != accepted)
6350  {
6351  break;
6352  }
6353 
6354  // Get the next message
6355  _Msg = _MessageBuffer._Peek();
6356  }
6357  }
6358 
6363 
6365  {
6366  // Delete input messages
6367  // Because the transformer uses its own input queue, it's possible there are messages
6368  // in this queue and no LWT will be executed to handle them.
6369  message<_Input> * _PInputQueueMessage = NULL;
6370 
6371  while (_M_inputMessages.try_pop(_PInputQueueMessage))
6372  {
6373  // Message cleanup
6374  delete _PInputQueueMessage;
6375  }
6376 
6377  // Delete any messages remaining in the output queue
6378  for (;;)
6379  {
6380  message<_Output> * _Msg = _M_messageBuffer._Dequeue();
6381  if (_Msg == NULL)
6382  {
6383  break;
6384  }
6385  delete _Msg;
6386  }
6387  }
6388 
6389  //
6390  // Private Data Members
6391  //
6392 
6393  // The transformer method called by this block
6394  _Transform_method _M_pFunc;
6395 
6396  // The queue of input messages for this Transformer block
6398 
6402 
6404 
6405 private:
6406  //
6407  // Hide assignment operator and copy constructor
6408  //
6409  transformer const &operator =(transformer const &); // no assignment operator
6410  transformer(transformer const &); // no copy constructor
6411 };
6412 
6413 //**************************************************************************
6414 // Timer:
6415 //**************************************************************************
6427 
6428 template<class _Type>
6429 class timer : public Concurrency::details::_Timer, public source_block<single_link_registry<ITarget<_Type>>>
6430 {
6431 private:
6432 
6436 
6437  enum State
6438  {
6442 
6447 
6452 
6457 
6459  };
6460 
6461 public:
6462 
6485 
6486  timer(unsigned int _Ms, _Type const& _Value, ITarget<_Type> *_PTarget = NULL, bool _Repeating = false) :
6487  _Timer(_Ms, _Repeating)
6488  {
      // Passes _Ms and _Repeating to the _Timer base, then delegates the rest of
      // the setup to _Initialize without an explicit scheduler or schedule
      // group (both default to NULL there).
6489  _Initialize(_Value, _PTarget, _Repeating);
6490  }
6491 
6492 #ifdef _CRT_USE_WINAPI_FAMILY_DESKTOP_APP
6493 
6519  timer(Scheduler& _Scheduler, unsigned int _Ms, _Type const& _Value, _Inout_opt_ ITarget<_Type> *_PTarget = NULL, bool _Repeating = false) :
6520  _Timer(_Ms, _Repeating)
6521  {
      // Passes _Ms and _Repeating to the _Timer base, then delegates the rest of
      // the setup to _Initialize with the address of _Scheduler.
6522  _Initialize(_Value, _PTarget, _Repeating, &_Scheduler);
6523  }
6524 
6551 
6552  timer(ScheduleGroup& _ScheduleGroup, unsigned int _Ms, _Type const& _Value, _Inout_opt_ ITarget<_Type> *_PTarget = NULL, bool _Repeating = false) :
6553  _Timer(_Ms, _Repeating)
6554  {
      // Passes _Ms and _Repeating to the _Timer base, then delegates the rest of
      // the setup to _Initialize with a NULL scheduler and the address of
      // _ScheduleGroup.
6555  _Initialize(_Value, _PTarget, _Repeating, NULL, &_ScheduleGroup);
6556  }
6557 #endif /* _CRT_USE_WINAPI_FAMILY_DESKTOP_APP */
6558 
6562 
6564  {
6565  //
6566  // Make sure there are no more outstanding timer fires. Note that this does not mean that the LWT that was queued is finished, it only
6567  // means that no more timers will fire after the return from _Stop. We still *MUST* wait on any outstanding LWTs.
6568  //
6569  if (_M_state == Started)
6570  _Stop();
6571 
6572  // Remove all the targets. This will wait for any outstanding LWTs
6573  remove_targets();
6574 
6575  //
6576  // No more asynchronous operations can happen as of this point.
6577  //
6578 
6579  // Clean up any messages left in this message block
6581 
6583  {
6585  }
6586  }
6587 
6592 
6593  void start()
6594  {
      // Starts the timer only from the Initialized or Paused state: the state
      // becomes Started and _Start() is called. In any other state (including
      // Started and Stopped) the call has no effect.
6595  if (_M_state == Initialized || _M_state == Paused)
6596  {
6597  _M_state = Started;
6598  _Start();
6599  }
6600  }
6601 
6605 
6606  void stop()
6607  {
      // Calls _Stop() only when the timer is currently Started, then moves the
      // state to Stopped unconditionally. Because start() accepts only
      // Initialized or Paused, a stopped timer is not restarted by start().
6608  if (_M_state == Started)
6609  _Stop();
6610 
6611  _M_state = Stopped;
6612  }
6613 
6618 
6619  void pause()
6620  {
      // For a non-repeating timer this is equivalent to stop(). For a repeating
      // timer that is currently Started, calls _Stop() and moves the state to
      // Paused, from which start() can resume it. A repeating timer in any
      // other state is left unchanged.
6621  //
6622  // Non repeating timers cannot pause. They go to a final stopped state on pause.
6623  //
6624  if (!_M_fRepeating)
6625  {
6626  stop();
6627  }
6628  else
6629  {
6630  // Pause only a started timer.
6631 
6632  if (_M_state == Started)
6633  {
6634  _Stop();
6635  _M_state = Paused;
6636  }
6637  }
6638  }
6639 
6640 protected:
6641 
6652 
6653  virtual message<_Type> * accept_message(runtime_object_identity _MsgId)
6654  {
      // Hands the pending timer message over to the caller: when a message is
      // pending and its id equals _MsgId, returns that message and clears
      // _M_pMessage. Returns NULL when nothing is pending or the ids differ.
6655  if (_M_pMessage == NULL || _MsgId != _M_pMessage->msg_id())
6656  {
6657  return NULL;
6658  }
6659 
6660  message<_Type> *_PMessage = _M_pMessage;
6661  _M_pMessage = NULL;
6662 
6663  return _PMessage;
6664  }
6665 
6679 
6680  virtual bool reserve_message(runtime_object_identity _MsgId)
6681  {
      // Returns true when a timer message is pending and its id equals _MsgId,
      // false otherwise. No reservation state is recorded.
      // NOTE(review): the comment below says the message ID does not matter,
      // yet the check that follows does compare ids -- confirm which is
      // intended.
6682  //
6683  // Semantically, every timer tick is the same value -- it doesn't matter the message ID. Because we can only
6684  // have one target as well, we do not need to track anything here.
6685  //
6686  if (_M_pMessage == NULL || _M_pMessage->msg_id() != _MsgId)
6687  {
6688  return false;
6689  }
6690 
6691  return true;
6692  }
6693 
6707 
6708  virtual message<_Type> * consume_message(runtime_object_identity _MsgId)
6709  {
      // Consuming is implemented as accepting: forwards to accept_message, which
      // returns the pending message (clearing _M_pMessage) when the ids match
      // and NULL otherwise.
6710  return accept_message(_MsgId);
6711  }
6712 
6719 
6720  virtual void release_message(runtime_object_identity _MsgId)
6721  {
      // Throws message_not_found when no message is pending or when _MsgId is
      // not the id of the pending message. Otherwise the pending message is
      // deleted and _M_pMessage is reset to NULL, so a released timer message
      // is discarded rather than kept for a later offer.
6722  if (_M_pMessage == NULL || _M_pMessage->msg_id() != _MsgId)
6723  {
6724  throw message_not_found();
6725  }
6726 
6727  delete _M_pMessage;
6728  _M_pMessage = NULL;
6729  }
6730 
6734 
6735  virtual void resume_propagation()
6736  {
      // Intentionally empty: this block performs no work when propagation is
      // resumed.
6737  // Because reservation doesn't prevent propagation there is
6738  // no need to resume on consume/release.
6739  }
6740 
6747 
6749  {
6750  // If there is a timer message sitting around, it must be propagated to the target now.
6751 
6752  if (_M_pMessage != NULL)
6753  {
6754  _PTarget->propagate(_M_pMessage, this);
6755  }
6756  }
6757 
6761 
6763  {
6764  if (_M_pMessage == NULL)
6765  {
6767  for (target_iterator _Iter = _M_connectedTargets.begin(); *_Iter != NULL; ++_Iter)
6768  {
6769  ITarget<_Type> * _PTarget = *_Iter;
6770  _PTarget->propagate(_M_pMessage, this);
6771  }
6772  }
6773  }
6774 
6775 private:
6776 
6777  // The timer message we contain
6779 
6780  // Current state of the timer.
6782 
6783  // The value to send on elapse of the timer.
6785 
6786  // An indication of whether the timer is repeating.
6788 
6789  // A flag for whether we need to release a reference on the scheduler.
6791 
6792  // Scheduler used for the timer
6793  Scheduler * _M_pScheduler;
6794 
6798 
6800  {
6801  return new message<_Type>(_M_value);
6802  }
6803 
6807 
6808  virtual void _Fire()
6809  {
      // Issues an async_send with a NULL message; no timer message is created
      // in this function itself.
      // NOTE(review): presumably invoked by the _Timer base class when the timer
      // elapses -- the caller is not visible in this listing; confirm.
6810  async_send(NULL);
6811  }
6812 
6825 
6826  void _Initialize(const _Type& _Value, _Inout_ ITarget<_Type> *_PTarget, bool _Repeating, _Inout_opt_ Scheduler * _PScheduler = NULL, _Inout_opt_ ScheduleGroup * _PScheduleGroup = NULL)
6827  {
      // Shared constructor helper. Resets the pending message to NULL, stores
      // _Value and _Repeating, sets the state to Initialized, selects the
      // scheduler, initializes the source side, and links _PTarget when it is
      // non-NULL.
      // NOTE(review): _PTarget is annotated _Inout_ although it is NULL-checked
      // below and the constructors default it to NULL; _Inout_opt_ looks like
      // the matching annotation -- confirm.
6828  _M_pMessage = NULL;
6829  _M_value = _Value;
6830  _M_fRepeating = _Repeating;
6831  _M_state = Initialized;
6832  _M_fReferencedScheduler = false;
6833 
6834  //
6835  // If we are going to utilize the current scheduler for timer firing, we need to capture it now. Otherwise,
6836  // the timer threads fired from Windows (what _Fire executes within) will wind up with a default scheduler
6837  // attached -- probably not the semantic we want.
6838  //
6839  if (_PScheduleGroup == NULL && _PScheduler == NULL)
6840  {
      // NOTE(review): the declaration of _sched (source line 6841) is absent
      // from this listing. The scheduler obtained here is referenced, and
      // _M_fReferencedScheduler records that a reference was taken.
6842  _PScheduler = _sched._GetScheduler();
6843  _sched._Reference();
6844  _M_fReferencedScheduler = true;
6845  }
6846 
6847  _M_pScheduler = _PScheduler;
6848  initialize_source(_PScheduler, _PScheduleGroup);
6849 
6850  if (_PTarget != NULL)
6851  {
6852  link_target(_PTarget);
6853  }
6854  }
6855 
6860 
6862  {
6863  // Input messages for this message block are in the base-class input buffer
6864  // All messages in that buffer are guaranteed to have moved to the output
6865  // buffer because the destructor first waits for all async sends to finish
6866  // before reaching this point
6867 
6868  // Delete the message remaining in the output queue
6869  if (_M_pMessage != NULL)
6870  {
6871  delete _M_pMessage;
6872  }
6873  }
6874 
6875 private:
6876  //
6877  // Hide assignment operator and copy constructor
6878  //
6879  timer const &operator =(timer const &); // no assignment operator
6880  timer(timer const &); // no copy constructor
6881 };
6882 
6883 //**************************************************************************
6884 // Single assignment:
6885 //**************************************************************************
6886 
6901 
6902 template<class _Type>
6903 class single_assignment : public propagator_block<multi_link_registry<ITarget<_Type>>, multi_link_registry<ISource<_Type>>>
6904 {
6905 public:
6906 
6919 
6922  {
6924  }
6925 
6941 
6944  {
6946  register_filter(_Filter);
6947  }
6948 
6949 #ifdef _CRT_USE_WINAPI_FAMILY_DESKTOP_APP
6950 
6966  single_assignment(Scheduler& _PScheduler) :
6968  {
6969  initialize_source_and_target(&_PScheduler);
6970  }
6971 
6990 
6991  single_assignment(Scheduler& _PScheduler, filter_method const& _Filter) :
6993  {
6994  initialize_source_and_target(&_PScheduler);
6995  register_filter(_Filter);
6996  }
6997 
7014 
7015  single_assignment(ScheduleGroup& _PScheduleGroup) :
7017  {
7018  initialize_source_and_target(NULL, &_PScheduleGroup);
7019  }
7020 
7040 
7041  single_assignment(ScheduleGroup& _PScheduleGroup, filter_method const& _Filter) :
7043  {
7044  initialize_source_and_target(NULL, &_PScheduleGroup);
7045  register_filter(_Filter);
7046  }
7047 #endif /* _CRT_USE_WINAPI_FAMILY_DESKTOP_APP */
7048 
7052 
7054  {
7055  // Remove all links
7057 
7058  // Clean up any messages left in this message block
7060  }
7061 
7068 
7069  bool has_value() const
7070  {
      // Reports whether a message has been stored in this block, i.e. whether
      // _M_pMessage is non-NULL. Only reads the pointer.
7071  return (_M_pMessage != NULL);
7072  }
7073 
7074 
7084 
7085  _Type const & value()
7086  {
      // Returns a reference to the payload of the stored message. When no
      // message is stored yet, receive<_Type>(this) is called first and its
      // result is discarded; the payload is then read from _M_pMessage.
      // NOTE(review): presumably receive blocks until this block holds a
      // message. The statement from source line 7091 is absent from this
      // listing -- confirm against the original header.
7087  if (_M_pMessage == NULL)
7088  {
7089  receive<_Type>(this);
7090  }
7092 
7093  return _M_pMessage->payload;
7094  }
7095 
7096 
7097 protected:
7098 
7113 
7115  {
7116  // It is important that calls to propagate do *not* take the same lock on the
7117  // internal structure that is used by Consume and the LWT. Doing so could
7118  // result in a deadlock with the Consume call.
7119 
7120  message_status _Result = accepted;
7121 
7122  // single_assignment messaging block can be initialized only once
7123  if (_M_fIsInitialized)
7124  {
7125  return declined;
7126  }
7127 
7128  {
7129  _NR_lock _Lock(_M_propagationLock);
7130 
7131  if (_M_fIsInitialized)
7132  {
7133  _Result = declined;
7134  }
7135  else
7136  {
7137  _PMessage = _PSource->accept(_PMessage->msg_id(), this);
7138 
7139  // Set initialized flag only if we have a message
7140  if (_PMessage != NULL)
7141  {
7142  _M_fIsInitialized = true;
7143  }
7144  else
7145  {
7146  _Result = missed;
7147  }
7148  }
7149  }
7150 
7151  //
7152  // If message was accepted, set the member variables for
7153  // this block and start the asynchronous propagation task
7154  //
7155  if (_Result == accepted)
7156  {
7157  async_send(_PMessage);
7158  }
7159 
7160  return _Result;
7161  }
7162 
7177 
7179  {
7180  message_status _Result = accepted;
7181 
7182  // single_assignment messaging block can be initialized only once
7183  if (_M_fIsInitialized)
7184  {
7185  return declined;
7186  }
7187 
7188  {
7189  _NR_lock _Lock(_M_propagationLock);
7190 
7191  if (_M_fIsInitialized)
7192  {
7193  _Result = declined;
7194  }
7195  else
7196  {
7197  _PMessage = _PSource->accept(_PMessage->msg_id(), this);
7198 
7199  // Set initialized flag only if we have a message
7200  if (_PMessage != NULL)
7201  {
7202  _M_fIsInitialized = true;
7203  }
7204  else
7205  {
7206  _Result = missed;
7207  }
7208  }
7209  }
7210 
7211  //
7212  // If message was accepted, set the member variables for
7213  // this block and propagate the message synchronously
7214  //
7215  if (_Result == accepted)
7216  {
7217  sync_send(_PMessage);
7218  }
7219 
7220  return _Result;
7221  }
7222 
7238 
7239  virtual message<_Type> * accept_message(runtime_object_identity _MsgId)
7240  {
      // Returns a newly allocated message<_Type> holding a copy of the stored
      // payload when a message is stored and its id equals _MsgId; returns NULL
      // otherwise. The stored message itself stays in the block.
7241  // This check is to prevent spoofing and verify that the propagated message is
7242  // the one that is accepted at the end.
7243  if (_M_pMessage == NULL || _MsgId != _M_pMessage->msg_id())
7244  {
7245  return NULL;
7246  }
7247 
7248  //
7249  // Instead of returning the internal message, we return a copy of the
7250  // message passed in.
7251  //
7252  // Because we are returning a copy, the accept routine for a single_assignment
7253  // does not need to grab the internal lock.
7254  //
7255  return (new message<_Type>(_M_pMessage->payload));
7256  }
7257 
7271 
7272  virtual bool reserve_message(runtime_object_identity _MsgId)
7273  {
      // Returns false when no message is stored. When a message is stored but
      // its id differs from _MsgId, throws message_not_found instead of
      // returning false. Returns true when the ids match. No reservation state
      // is recorded.
7274  if (_M_pMessage == NULL)
7275  {
7276  return false;
7277  }
7278 
7279  if (_M_pMessage->msg_id() != _MsgId)
7280  {
7281  throw message_not_found();
7282  }
7283 
7284  return true;
7285  }
7286 
7300 
7301  virtual message<_Type> * consume_message(runtime_object_identity _MsgId)
7302  {
      // Consuming is implemented as accepting: forwards to accept_message, which
      // returns a copy of the stored message when the ids match and NULL
      // otherwise.
      // NOTE(review): the statement from source line 7303 is absent from this
      // listing -- confirm against the original header.
7304 
7305  return accept_message(_MsgId);
7306  }
7307 
7314 
7315  virtual void release_message(runtime_object_identity _MsgId)
7316  {
      // Validates a release request: throws message_not_found when no message is
      // stored or when _MsgId is not the id of the stored message. Otherwise
      // nothing is changed; the stored message is kept.
      // NOTE(review): the statement from source line 7317 is absent from this
      // listing -- confirm against the original header.
7318 
7319  if (_M_pMessage == NULL || _M_pMessage->msg_id() != _MsgId)
7320  {
7321  throw message_not_found();
7322  }
7323  }
7324 
7328 
7329  virtual void resume_propagation()
7330  {
      // Intentionally empty: this block performs no work when propagation is
      // resumed.
7331  // Because reservation doesn't stop propagation, we don't
7332  // need to do anything on resume after consume/release.
7333  }
7334 
7341 
7343  {
7344  // If there is a message available already, propagate it.
7345 
7346  if (_M_pMessage != NULL)
7347  {
7348  _PTarget->propagate(_M_pMessage, this);
7349  }
7350  }
7358 
7360  {
7361  // Initialized flag should have been set by the propagate function while holding the propagation lock.
7363 
7364  // Move the message to the internal storage
7365 
7367  _M_pMessage = _PMessage;
7368 
7369  for (target_iterator _Iter = _M_connectedTargets.begin(); *_Iter != NULL; ++_Iter)
7370  {
7371  // Single assignment can propagate its message out
7372  // to any number of Targets
7373 
7374  ITarget<_Type> * _PTarget = *_Iter;
7375  _PTarget->propagate(_PMessage, this);
7376  }
7377  }
7378 
7379 private:
7380 
7385 
7387  {
7388  // Input messages for this message block are in the base-class input buffer
7389  // All messages in that buffer are guaranteed to have moved to the output
7390  // buffer because the destructor first waits for all async sends to finish
7391  // before reaching this point
7392 
7393  // The messages for a single_assignment are deleted at the end when
7394  // single_assignment is deleted.
7395  delete _M_pMessage;
7396  }
7397 
7398  //
7399  // Private Data Members
7400  //
7401 
7402  // The message being stored
7404 
7405  // The lock used to protect propagation
7407 
7408  // The marker for whether the single_assignment has already been initialized
7409  volatile bool _M_fIsInitialized;
7410 
7411 private:
7412  //
7413  // Hide assignment operator and copy constructor
7414  //
7415  single_assignment const & operator=(single_assignment const &); // no assignment operator
7416  single_assignment(single_assignment const &); // no copy constructor
7417 };
7418 
7419 //**************************************************************************
7420 // Join (single-type)
7421 //**************************************************************************
7422 
7426 
7432 
7433  greedy = 0,
7438 
7440 };
7441 
7459 
7460 template<class _Type, join_type _Jtype = non_greedy>
7461 class join : public propagator_block<single_link_registry<ITarget<std::vector<_Type>>>, multi_link_registry<ISource<_Type>>>
7462 {
7463 public:
7464  typedef typename std::vector<_Type> _OutputType;
7465 
7481 
// Constructs a join with _NumInputs input slots.
// Both per-slot arrays (_M_messageArray and _M_savedMessageIdArray) are sized to
// _NumInputs, then _Initialize runs with its default (NULL) scheduler and schedule group.
7482  join(size_t _NumInputs)
7483  : _M_messageArray(_NumInputs),
7484  _M_savedMessageIdArray(_NumInputs)
7485  {
7486  _Initialize(_NumInputs);
7487  }
7488 
7507 
// Constructs a join with _NumInputs input slots and a message filter.
// Same setup as the unfiltered overload; _Filter is registered via register_filter
// after _Initialize has run.
7508  join(size_t _NumInputs, filter_method const& _Filter)
7509  : _M_messageArray(_NumInputs),
7510  _M_savedMessageIdArray(_NumInputs)
7511  {
7512  _Initialize(_NumInputs);
7513  register_filter(_Filter);
7514  }
7515 
7516 #ifdef _CRT_USE_WINAPI_FAMILY_DESKTOP_APP
7517 
// Constructs a join with _NumInputs input slots, passing the address of _PScheduler
// to _Initialize as the scheduler argument. Only compiled when
// _CRT_USE_WINAPI_FAMILY_DESKTOP_APP is defined (see the enclosing #ifdef).
7536  join(Scheduler& _PScheduler, size_t _NumInputs)
7537  : _M_messageArray(_NumInputs),
7538  _M_savedMessageIdArray(_NumInputs)
7539  {
7540  _Initialize(_NumInputs, &_PScheduler);
7541  }
7542 
7564 
// Constructs a join with _NumInputs input slots on the given scheduler and with a
// message filter. _Initialize receives &_PScheduler; _Filter is registered afterwards.
// Only compiled when _CRT_USE_WINAPI_FAMILY_DESKTOP_APP is defined.
7565  join(Scheduler& _PScheduler, size_t _NumInputs, filter_method const& _Filter)
7566  : _M_messageArray(_NumInputs),
7567  _M_savedMessageIdArray(_NumInputs)
7568  {
7569  _Initialize(_NumInputs, &_PScheduler);
7570  register_filter(_Filter);
7571  }
7572 
7592 
// Constructs a join with _NumInputs input slots, passing a NULL scheduler and the
// address of _PScheduleGroup to _Initialize.
// Only compiled when _CRT_USE_WINAPI_FAMILY_DESKTOP_APP is defined.
7593  join(ScheduleGroup& _PScheduleGroup, size_t _NumInputs)
7594  : _M_messageArray(_NumInputs),
7595  _M_savedMessageIdArray(_NumInputs)
7596  {
7597  _Initialize(_NumInputs, NULL, &_PScheduleGroup);
7598  }
7599 
7622 
// Constructs a join with _NumInputs input slots in the given schedule group and with
// a message filter. _Initialize receives a NULL scheduler and &_PScheduleGroup;
// _Filter is registered afterwards.
// Only compiled when _CRT_USE_WINAPI_FAMILY_DESKTOP_APP is defined.
7623  join(ScheduleGroup& _PScheduleGroup, size_t _NumInputs, filter_method const& _Filter)
7624  : _M_messageArray(_NumInputs),
7625  _M_savedMessageIdArray(_NumInputs)
7626  {
7627  _Initialize(_NumInputs, NULL, &_PScheduleGroup);
7628  register_filter(_Filter);
7629  }
7630 #endif /* _CRT_USE_WINAPI_FAMILY_DESKTOP_APP */
7631 
7635 
7637  {
7638  // Remove all links that are targets of this join
7640 
7641  // Clean up any messages left in this message block
7643 
7644  delete [] _M_savedIdBuffer;
7645  }
7646 
7647 protected:
7648  //
7649  // propagator_block protected function implementations
7650  //
7651 
7666 
7668  {
7669  // It is important that calls to propagate do *not* take the same lock on the
7670  // internal structure that is used by Consume and the LWT. Doing so could
7671  // result in a deadlock with the Consume call.
7672 
7673  message_status _Ret_val = accepted;
7674 
7675  //
7676  // Find the slot index of this source
7677  //
7678  size_t _Slot = 0;
7679  bool _Found = false;
7680  for (source_iterator _Iter = _M_connectedSources.begin(); *_Iter != NULL; ++_Iter)
7681  {
7682  if (*_Iter == _PSource)
7683  {
7684  _Found = true;
7685  break;
7686  }
7687 
7688  _Slot++;
7689  }
7690 
7691  if (!_Found)
7692  {
7693  // If this source was not found in the array, this is not a connected source
7694  // decline the message
7695  return declined;
7696  }
7697 
7699 
7700  bool fIsGreedy = (_Jtype == greedy);
7701 
7702  if (fIsGreedy)
7703  {
7704  //
7705  // Greedy type joins immediately accept the message.
7706  //
7707  {
7708  _NR_lock lockHolder(_M_propagationLock);
7709  if (_M_messageArray._M_messages[_Slot] != NULL)
7710  {
7711  _M_savedMessageIdArray._M_savedIds[_Slot] = _PMessage->msg_id();
7712  _Ret_val = postponed;
7713  }
7714  }
7715 
7716  if (_Ret_val != postponed)
7717  {
7718  _M_messageArray._M_messages[_Slot] = _PSource->accept(_PMessage->msg_id(), this);
7719 
7720  if (_M_messageArray._M_messages[_Slot] != NULL)
7721  {
7723  {
7724  // If messages have arrived on all links, start a propagation
7725  // of the current message
7726  async_send(NULL);
7727  }
7728  }
7729  else
7730  {
7731  _Ret_val = missed;
7732  }
7733  }
7734  }
7735  else
7736  {
7737  //
7738  // Non-greedy type joins save the message IDs until they have all arrived
7739  //
7740 
7741  if (_InterlockedExchange((volatile long *) &_M_savedMessageIdArray._M_savedIds[_Slot], _PMessage->msg_id()) == -1)
7742  {
7743  // Decrement the message remaining count if this thread is switching
7744  // the saved ID from -1 to a valid value.
7746  {
7747  async_send(NULL);
7748  }
7749  }
7750 
7751  // Always return postponed. This message will be consumed
7752  // in the LWT
7753  _Ret_val = postponed;
7754  }
7755 
7756  return _Ret_val;
7757  }
7758 
7769 
// Hands the head output message to the caller when its ID matches _MsgId.
// Returns the dequeued message (removed from _M_messageBuffer) on a match,
// or NULL when _MsgId is not the ID of the head message.
7770  virtual message<_OutputType> * accept_message(runtime_object_identity _MsgId)
7771  {
7772  //
7773  // Peek at the head message in the message buffer. If the IDs match
7774  // dequeue and transfer ownership
7775  //
7776  message<_OutputType> * _Msg = NULL;
7777 
7778  if (_M_messageBuffer._Is_head(_MsgId))
7779  {
7780  _Msg = _M_messageBuffer._Dequeue();
7781  }
7782 
7783  return _Msg;
7784  }
7785 
7799 
// Returns true only when _MsgId identifies the message at the head of
// _M_messageBuffer; the buffer itself is not modified here.
7800  virtual bool reserve_message(runtime_object_identity _MsgId)
7801  {
7802  // Allow reservation if this is the head message
7803  return _M_messageBuffer._Is_head(_MsgId);
7804  }
7805 
7819 
// Consumes the message identified by _MsgId by forwarding to accept_message,
// so the result is the dequeued head message or NULL on an ID mismatch.
7820  virtual message<_OutputType> * consume_message(runtime_object_identity _MsgId)
7821  {
7822  // By default, accept the message
7823  return accept_message(_MsgId);
7824  }
7825 
7832 
// Validates a release request: throws message_not_found unless _MsgId identifies
// the head message of _M_messageBuffer. No state is changed on success.
7833  virtual void release_message(runtime_object_identity _MsgId)
7834  {
7835  // The head message is the one reserved.
7836  if (!_M_messageBuffer._Is_head(_MsgId))
7837  {
7838  throw message_not_found();
7839  }
7840  }
7841 
7845 
// Restarts output propagation: when _M_messageBuffer holds at least one message,
// calls async_send(NULL); does nothing when the buffer is empty.
7846  virtual void resume_propagation()
7847  {
7848  // If there are any messages in the buffer, propagate them out
7849  if (_M_messageBuffer._Count() > 0)
7850  {
7851  async_send(NULL);
7852  }
7853  }
7854 
7861 
// Callback for a newly linked target (the target pointer parameter is unnamed and unused
// in the visible lines). Returns immediately while a reservation is outstanding
// (_M_pReservedFor != NULL).
// NOTE(review): listing line 7871 is absent from this extract, so the action taken when
// no reservation is outstanding is not visible here; confirm against agents.h.
7862  virtual void link_target_notification(_Inout_ ITarget<std::vector<_Type>> *)
7863  {
7864  // If the message queue is blocked due to reservation
7865  // there is no need to do any message propagation
7866  if (_M_pReservedFor != NULL)
7867  {
7868  return;
7869  }
7870 
7872  }
7873 
7879 
7881  {
7882  message<_OutputType> * _Msg = NULL;
7883  // Create a new message from the input sources
7884  // If messagesRemaining == 0, we have a new message to create. Otherwise, this is coming from
7885  // a consume or release from the target. In that case we don't want to create a new message.
7886  if (_M_messagesRemaining == 0)
7887  {
7888  // A greedy join can immediately create the message, a non-greedy
7889  // join must try and consume all the messages it has postponed
7890  _Msg = _Create_new_message();
7891  }
7892 
7893  if (_Msg == NULL)
7894  {
7895  // Create message failed. This happens in non_greedy joins when the
7896  // reserve/consumption of a postponed message failed.
7898  return;
7899  }
7900 
7901  bool fIsGreedy = (_Jtype == greedy);
7902 
7903  // For a greedy join, reset the number of messages remaining
7904  // Check to see if multiple messages have been passed in on any of the links,
7905  // and postponed. If so, try and reserve/consume them now
7906  if (fIsGreedy)
7907  {
7908  // Look at the saved IDs and reserve/consume any that have passed in while
7909  // this join was waiting to complete
7911 
7912  for (size_t i = 0; i < _M_messageArray._M_count; i++)
7913  {
7914  for(;;)
7915  {
7916  runtime_object_identity _Saved_id;
7917  // Grab the current saved ID value. This value could be changing based on any
7918  // calls of source->propagate(this). If the message ID is different than what is snapped
7919  // here, that means, the reserve below must fail. This is because reserve is trying
7920  // to get the same source lock the propagate(this) call must be holding.
7921  {
7922  _NR_lock lockHolder(_M_propagationLock);
7923 
7925 
7926  _Saved_id = _M_savedMessageIdArray._M_savedIds[i];
7927 
7928  if (_Saved_id == -1)
7929  {
7931  break;
7932  }
7933  else
7934  {
7936  }
7937  }
7938 
7939  if (_Saved_id != -1)
7940  {
7942 
7943  ISource<_Type> * _PSource = _Iter[i];
7944  if ((_PSource != NULL) && _PSource->reserve(_Saved_id, this))
7945  {
7946  _M_messageArray._M_messages[i] = _PSource->consume(_Saved_id, this);
7948  break;
7949  }
7950  }
7951  }
7952  }
7953 
7954  // If messages have all been received, async_send again, this will start the
7955  // LWT up to create a new message
7956  if (_M_messagesRemaining == 0)
7957  {
7958  async_send(NULL);
7959  }
7960  }
7961 
7962  // Add the new message to the outbound queue
7963  _M_messageBuffer._Enqueue(_Msg);
7964 
7965  if (!_M_messageBuffer._Is_head(_Msg->msg_id()))
7966  {
7967  // another message is at the head of the outbound message queue and blocked
7968  // simply return
7969  return;
7970  }
7971 
7973  }
7974 
7975 private:
7976 
7977  //
7978  // Private Methods
7979  //
7980 
7987 
7989  {
7990  message<_Target_type> * _Msg = _MessageBuffer._Peek();
7991 
7992  // If someone has reserved the _Head message, don't propagate anymore
7993  if (_M_pReservedFor != NULL)
7994  {
7995  return;
7996  }
7997 
7998  while (_Msg != NULL)
7999  {
8000  message_status _Status = declined;
8001 
8002  // Always start from the first target that linked
8003  for (target_iterator _Iter = _M_connectedTargets.begin(); *_Iter != NULL; ++_Iter)
8004  {
8005  ITarget<_Target_type> * _PTarget = *_Iter;
8006  _Status = _PTarget->propagate(_Msg, this);
8007 
8008  // Ownership of message changed. Do not propagate this
8009  // message to any other target.
8010  if (_Status == accepted)
8011  {
8012  break;
8013  }
8014 
8015  // If the target just propagated to reserved this message, stop
8016  // propagating it to others
8017  if (_M_pReservedFor != NULL)
8018  {
8019  break;
8020  }
8021  }
8022 
8023  // If status is anything other than accepted, then the head message
8024  // was not propagated out. Thus, nothing after it in the queue can
8025  // be propagated out. Cease propagation.
8026  if (_Status != accepted)
8027  {
8028  break;
8029  }
8030 
8031  // Get the next message
8032  _Msg = _MessageBuffer._Peek();
8033  }
8034  }
8035 
8042 
8044  {
8045  bool fIsNonGreedy = (_Jtype == non_greedy);
8046 
8047  // If this is a non-greedy join, check each source and try to consume their message
8048  if (fIsNonGreedy)
8049  {
8050 
8051  // The iterator _Iter below will ensure that it is safe to touch
8052  // non-NULL source pointers. Take a snapshot.
8053  std::vector<ISource<_Type> *> _Sources;
8055 
8056  while (*_Iter != NULL)
8057  {
8058  ISource<_Type> * _PSource = *_Iter;
8059 
8060  if (_PSource == NULL)
8061  {
8062  break;
8063  }
8064 
8065  _Sources.push_back(_PSource);
8066  ++_Iter;
8067  }
8068 
8069  if (_Sources.size() != _M_messageArray._M_count)
8070  {
8071  // Some of the sources were unlinked. The join is broken
8072  return NULL;
8073  }
8074 
8075  // First, try and reserve all the messages. If a reservation fails,
8076  // then release any reservations that had been made.
8077  for (size_t i = 0; i < _M_savedMessageIdArray._M_count; i++)
8078  {
8079  // Snap the current saved ID into a buffer. This value can be changing behind the scenes from
8080  // other source->propagate(msg, this) calls, but if so, that just means the reserve below will
8081  // fail.
8083  _M_savedIdBuffer[i] = _InterlockedExchange((volatile long *) &_M_savedMessageIdArray._M_savedIds[i], -1);
8084 
8086 
8087  if (!_Sources[i]->reserve(_M_savedIdBuffer[i], this))
8088  {
8089  // A reservation failed, release all reservations made up until
8090  // this block, and wait for another message to arrive on this link
8091  for (size_t j = 0; j < i; j++)
8092  {
8093  _Sources[j]->release(_M_savedIdBuffer[j], this);
8094  if (_InterlockedCompareExchange((volatile long *) &_M_savedMessageIdArray._M_savedIds[j], _M_savedIdBuffer[j], -1) == -1)
8095  {
8097  {
8098  async_send(NULL);
8099  }
8100  }
8101  }
8102 
8103  // Return NULL to indicate that the create failed
8104  return NULL;
8105  }
8106  }
8107 
8108  // Because everything has been reserved, consume all the messages.
8109  // This is guaranteed to return true.
8110  for (size_t i = 0; i < _M_messageArray._M_count; i++)
8111  {
8112  _M_messageArray._M_messages[i] = _Sources[i]->consume(_M_savedIdBuffer[i], this);
8113  _M_savedIdBuffer[i] = -1;
8114  }
8115  }
8116 
8117  if (!fIsNonGreedy)
8118  {
8119  // Reinitialize how many messages are being waited for.
8120  // This is safe because all messages have been received, thus no new async_sends for
8121  // greedy joins can be called.
8123  }
8124 
8125  std::vector<_Type> _OutputVector;
8126  for (size_t i = 0; i < _M_messageArray._M_count; i++)
8127  {
8129  _OutputVector.push_back(_M_messageArray._M_messages[i]->payload);
8130 
8131  delete _M_messageArray._M_messages[i];
8132  if (fIsNonGreedy)
8133  {
8135  }
8136  }
8137  return (new message<std::vector<_Type>>(_OutputVector));
8138  }
8139 
8155 
// Shared constructor helper.
//   _NumInputs      - number of input links; used as the bound on connected sources
//                     and as the initial value of _M_messagesRemaining.
//   _PScheduler     - forwarded to initialize_source_and_target (defaults to NULL).
//   _PScheduleGroup - forwarded to initialize_source_and_target (defaults to NULL).
// For non_greedy joins a scratch buffer of _NumInputs IDs is allocated and filled with
// 0xFF bytes, so every runtime_object_identity in it reads as -1.
// NOTE(review): listing line 8173 (the body of the else branch) is absent from this extract.
// The destructor runs `delete [] _M_savedIdBuffer` unconditionally, so the greedy path must
// leave that pointer in a deletable state - presumably it is set to NULL there; verify
// against agents.h.
8156  void _Initialize(size_t _NumInputs, Scheduler * _PScheduler = NULL, ScheduleGroup * _PScheduleGroup = NULL)
8157  {
8158  initialize_source_and_target(_PScheduler, _PScheduleGroup);
8159 
8160  _M_connectedSources.set_bound(_NumInputs);
8161  _M_messagesRemaining = _NumInputs;
8162 
8163  bool fIsNonGreedy = (_Jtype == non_greedy);
8164 
8165  if (fIsNonGreedy)
8166  {
8167  // Non greedy joins need a buffer to snap off saved message IDs to.
8168  _M_savedIdBuffer = new runtime_object_identity[_NumInputs];
8169  memset(_M_savedIdBuffer, -1, sizeof(runtime_object_identity) * _NumInputs);
8170  }
8171  else
8172  {
8174  }
8175  }
8176 
8181 
8183  {
8184  // Input messages for this message block are in the base-class input buffer
8185  // All messages in that buffer are guaranteed to have moved to the output
8186  // buffer because the destructor first waits for all async sends to finish
8187  // before reaching this point
8188 
8189  // Delete any messages remaining in the output queue
8190  for (;;)
8191  {
8192  message<std::vector<_Type>> * _Msg = _M_messageBuffer._Dequeue();
8193  if (_Msg == NULL)
8194  {
8195  break;
8196  }
8197  delete _Msg;
8198  }
8199  }
8200 
8201  // The current number of messages remaining
8202  volatile size_t _M_messagesRemaining;
8203 
8204  // An array containing the accepted messages of this join.
8205  // Wrapped in a struct to enable debugger visualization.
8207  {
8208  size_t _M_count;
8210 
// Allocates an array of _NumInputs message pointers and zero-fills it, so every
// slot starts out as a null pointer. _M_count records the slot count.
8211  _MessageArray(size_t _NumInputs)
8212  : _M_count(_NumInputs),
8213  _M_messages(new message<_Type>*[_NumInputs])
8214  {
8215  memset(_M_messages, 0, sizeof(message<_Type> *) * _NumInputs);
8216  }
8217 
8219  {
8220  for (size_t i = 0; i < _M_count; i++)
8221  delete _M_messages[i];
8222  delete [] _M_messages;
8223  }
8224  };
8226 
8227  // An array containing the msg IDs of messages propagated to the array
8228  // For greedy joins, this contains a log of other messages passed to this
8229  // join after the first has been accepted
8230  // For non-greedy joins, this contains the message ID of any message
8231  // passed to it.
8232  // Wrapped in a struct to enable debugger visualization.
8234  {
8235  size_t _M_count;
8236  runtime_object_identity * _M_savedIds;
8237 
// Allocates an array of _NumInputs saved message IDs and fills it with 0xFF bytes,
// so every entry reads as -1 (the value the join code compares against to mean
// "no saved ID"). _M_count records the slot count.
8238  _SavedMessageIdArray(size_t _NumInputs)
8239  : _M_count(_NumInputs),
8240  _M_savedIds(new runtime_object_identity[_NumInputs])
8241  {
8242  memset(_M_savedIds, -1, sizeof(runtime_object_identity) * _NumInputs);
8243  }
8244 
8246  {
8247  delete [] _M_savedIds;
8248  }
8249  };
8251 
8252  // Buffer for snapping saved IDs in non-greedy joins
8253  runtime_object_identity * _M_savedIdBuffer;
8254 
8255  // A lock for modifying the buffer or the connected blocks
8257 
8258  // Queue to hold output messages
8260 };
8261 
8262 
8263 //**************************************************************************
8264 // Multi-Type Choice and Join helper node:
8265 //**************************************************************************
8266 
8276 
8277 template<class _Type>
8278 class _Order_node_base: public propagator_block<single_link_registry<ITarget<size_t>>, multi_link_registry<ISource<_Type>>>
8279 {
8280 public:
8281 
8286 
8288  _M_index(0),
8291  {
8292  }
8293 
8297 
8299  {
8300  // The messages for an _Order_node_base are deleted at the end when
8301  // _Order_node_base is deleted.
8302  delete _M_pReceiveMessage;
8303  delete _M_pSendMessage;
8304  }
8305 
8312 
// Returns true once a received message is held (_M_pReceiveMessage is non-NULL).
8313  bool has_value() const
8314  {
8315  return (_M_pReceiveMessage != NULL);
8316  }
8317 
8324 
// Returns a const reference to the payload of the received message.
// The visible lines dereference _M_pReceiveMessage without a null check, so callers
// should only call this when has_value() is true.
// NOTE(review): listing line 8327 is absent from this extract (possibly an assertion);
// confirm against agents.h.
8325  _Type const & value()
8326  {
8328 
8329  return _M_pReceiveMessage->payload;
8330  }
8331 
8340 
// Pure virtual reset hook; each derived order node supplies its own implementation.
8341  virtual void _Reset() = 0;
8342 
8355 
// Not a supported operation for order nodes: asserts via _CONCRT_ASSERT(false)
// and returns false. The message ID argument is ignored.
8356  virtual bool reserve_message(runtime_object_identity)
8357  {
8358  // reserve should never be called for this block.
8359  _CONCRT_ASSERT(false);
8360 
8361  return false;
8362  }
8363 
8377 
// Not a supported operation for order nodes: asserts via _CONCRT_ASSERT(false)
// and returns NULL. The message ID argument is ignored.
8378  virtual message<size_t> * consume_message(runtime_object_identity)
8379  {
8380  // consume should never be called for this block.
8381  _CONCRT_ASSERT(false);
8382 
8383  return NULL;
8384  }
8385 
8392 
// Not a supported operation for order nodes: asserts via _CONCRT_ASSERT(false).
// The message ID argument is ignored.
8393  virtual void release_message(runtime_object_identity)
8394  {
8395  // release should never be called for this block.
8396  _CONCRT_ASSERT(false);
8397  }
8398 
8399 protected:
8400 
8401 
8405 
// Intentionally empty: nothing has to be restarted on resume for this block.
8406  virtual void resume_propagation()
8407  {
8408  // Because there is only a single target, nothing needs
8409  // to be done on resume
8410  }
8411 
8418 
8420  {
8421  if (_M_pSendMessage != NULL)
8422  {
8424  }
8425  }
8426 
8430 
8432  {
8434  }
8435 
8439 
// Shared constructor helper for the order-node classes.
//   _PSource        - source to link from; must not be NULL.
//   _Index          - position of this node in the user's construct; stored in _M_index.
//   _PTarget        - optional target; linked via link_target when non-NULL.
//   _PScheduler     - forwarded to initialize_source_and_target (defaults to NULL).
//   _PScheduleGroup - forwarded to initialize_source_and_target (defaults to NULL).
// Throws std::invalid_argument("_PSource") when _PSource is NULL.
// The target is linked before this node is linked to the source, so the
// _PSource->link_target(this) call is the last step.
// NOTE(review): _Index is a size_t, so the `_Index < 0` test can never be true and the
// std::invalid_argument("_Index") throw is unreachable as written.
// NOTE(review): listing line 8458 is absent from this extract (the statement that follows
// the "Allow only a single source" comment); confirm against agents.h.
8440  void _Initialize_order_node(ISource<_Type> * _PSource, size_t _Index, ITarget<size_t> * _PTarget, Scheduler * _PScheduler = NULL, ScheduleGroup * _PScheduleGroup = NULL)
8441  {
8442  if (_Index < 0)
8443  {
8444  throw std::invalid_argument("_Index");
8445  }
8446 
8447  if (_PSource == NULL)
8448  {
8449  throw std::invalid_argument("_PSource");
8450  }
8451 
8452  _M_index = _Index;
8453 
8454  initialize_source_and_target(_PScheduler, _PScheduleGroup);
8455 
8456  // Allow only a single source and ensure that they
8457  // cannot be unlinked and relinked.
8459 
8460  if (_PTarget != NULL)
8461  {
8462  link_target(_PTarget);
8463  }
8464 
8465  _PSource->link_target(this);
8466  }
8467 
8468  //
8469  // Private Data Members
8470  //
8471 
8472  // The message to be received from the source
8474 
8475  // The message to be sent to all targets
8477 
8478  // The index of the _Order_node_base in the user's construct
8479  size_t _M_index;
8480 
8481 private:
8482  //
8483  // Hide assignment operator and copy constructor
8484  //
8485  _Order_node_base const & operator=(_Order_node_base const &); // no assignment operator
8486  _Order_node_base(_Order_node_base const &); // no copy constructor
8487 };
8488 
8489 
8498 
8499 template<class _Type>
8500 class _Reserving_node: public _Order_node_base<_Type>
8501 {
8502 public:
8516 
8517  _Reserving_node(ISource<_Type> * _PSource, size_t _Index, ITarget<size_t> * _PTarget = NULL) :
8519  _M_savedId(-1),
8521  {
8522  _Initialize_order_node(_PSource, _Index, _PTarget);
8523  }
8524 
8541 
8542  _Reserving_node(ISource<_Type> * _PSource, size_t _Index, ITarget<size_t> * _PTarget, filter_method const& _Filter) :
8544  _M_savedId(-1),
8546  {
8547  register_filter(_Filter);
8548  _Initialize_order_node(_PSource, _Index, _PTarget);
8549  }
8550 
8567 
8568  _Reserving_node(Scheduler& _PScheduler, ISource<_Type> * _PSource, size_t _Index, ITarget<size_t> * _PTarget = NULL) :
8570  _M_savedId(-1),
8572  {
8573  _Initialize_order_node(_PSource, _Index, _PTarget, &_PScheduler);
8574  }
8575 
8595 
8596  _Reserving_node(Scheduler& _PScheduler, ISource<_Type> * _PSource, size_t _Index, ITarget<size_t> * _PTarget, filter_method const& _Filter) :
8598  _M_savedId(-1),
8600  {
8601  register_filter(_Filter);
8602  _Initialize_order_node(_PSource, _Index, _PTarget, &_PScheduler);
8603  }
8604 
8621 
8622  _Reserving_node(ScheduleGroup& _PScheduleGroup, ISource<_Type> * _PSource, size_t _Index, ITarget<size_t> * _PTarget = NULL) :
8624  _M_savedId(-1),
8626  {
8627  _Initialize_order_node(_PSource, _Index, _PTarget, NULL, &_PScheduleGroup);
8628  }
8629 
8649 
8650  _Reserving_node(ScheduleGroup& _PScheduleGroup, ISource<_Type> * _PSource, size_t _Index, ITarget<size_t> * _PTarget, filter_method const& _Filter) :
8652  _M_savedId(-1),
8654  {
8655  register_filter(_Filter);
8656  _Initialize_order_node(_PSource, _Index, _PTarget, NULL, &_PScheduleGroup);
8657  }
8658 
8662 
8664  {
8665  if (_M_pReservedSource != NULL)
8666  {
8669  }
8670 
8671  // Remove all links
8673  }
8674 
8675 
8682 
// Implements the pure virtual _Reset with an empty body: a _Reserving_node has no
// reset work to do.
8683  virtual void _Reset()
8684  {
8685  }
8686 
8687 protected:
8688 
8689  //
8690  // propagator_block protected function implementation
8691  //
8692 
8712 
8714  {
8715  message_status _Result = postponed;
8716 
8717  // _Order_node messaging block can be initialized only once, just like single_assignment.
8718  if (_M_fIsInitialized)
8719  {
8720  return declined;
8721  }
8722 
8723  // Reserve a message on the source until this _Order_node gets the feedback from
8724  // the single_assignment on whether it has been selected.
8725  _M_fIsInitialized = _PSource->reserve(_PMessage->msg_id(), this);
8726 
8727  //
8728  // If message was successfully reserved, set the member variables for
8729  // this messaging block and start the asynchronous propagation task.
8730  //
8731  if (_M_fIsInitialized)
8732  {
8733  _M_savedId = _PMessage->msg_id();
8734  async_send(NULL);
8735  }
8736  else
8737  {
8738  _Result = missed;
8739  }
8740 
8741  return _Result;
8742  }
8743 
8753 
// Accepts the index message this node offered to its target.
// Returns NULL when no send message exists, when _MsgId does not match it, or when the
// source pointer obtained from the iterator is NULL. Otherwise the source message saved
// under _M_savedId is consumed into _M_pReceiveMessage and a newly allocated
// message<size_t> carrying a copy of _M_pSendMessage's payload is returned.
// NOTE(review): listing lines 8764 and 8775 are absent from this extract, so the
// declaration of _Iter is not visible here; confirm against agents.h.
8754  virtual message<size_t> * accept_message(runtime_object_identity _MsgId)
8755  {
8756  // This check is to prevent spoofing and verify that the propagated message is
8757  // the one that is accepted at the end.
8758  if (_M_pSendMessage == NULL || _MsgId != _M_pSendMessage->msg_id())
8759  {
8760  return NULL;
8761  }
8762 
8763  // If the source has disconnected then we can't allow for accept to succeed.
8765  ISource<_Type>* _PSource = *_Iter;
8766 
8767  if (_PSource == NULL)
8768  {
8769  // source was disconnected. Fail accept.
8770  return NULL;
8771  }
8772 
8773  _M_pReceiveMessage = _PSource->consume(_M_savedId, this);
8774 
8776 
8777  //
8778  // Instead of returning the internal message, we return a copy of the
8779  // message passed in.
8780  //
8781  // Because we are returning a copy, the accept routine for a _Order_node
8782  // does not need to grab the internal lock.
8783  //
8784  return (new message<size_t>(_M_pSendMessage->payload));
8785  }
8786 
8796 
8798  {
8799  if (_M_pSendMessage == NULL)
8800  {
8802  }
8803 
8804  for (target_iterator _Iter = _M_connectedTargets.begin(); *_Iter != NULL; ++_Iter)
8805  {
8806  ITarget<size_t> * _PTarget = *_Iter;
8807  _Propagate_to_target(_PTarget);
8808  }
8809  }
8810 
8811 private:
8812 
8816 
8818  {
8819  message_status _Status = _PTarget->propagate(_M_pSendMessage, this);
8820 
8821  // If the message got rejected we have to release the hold on the source message.
8822  if (_Status != accepted)
8823  {
8824  if (_M_savedId != -1)
8825  {
8826  // Release the reservation
8828  ISource<_Type> * _PSource = *_Iter;
8829 
8830  if (_PSource != NULL)
8831  {
8832  _PSource->release(_M_savedId, this);
8833  }
8834 
8835  // If the source was disconnected, then it would
8836  // automatically release any reservation. So we
8837  // should reset our savedId regardless.
8838  _M_savedId = -1;
8839  }
8840 
8841  }
8842 
8843  return _Status;
8844  }
8845 
8846  //
8847  // Private Data Members
8848  //
8849 
8850  // The source where we have reserved a message
8852 
8853  // For greedy order-nodes, the message ID of subsequent messages sent to this node
8854  // For non-greedy order nodes, the message ID of the message to reserve/consume
8855  runtime_object_identity _M_savedId;
8856 
8857  // The marker that indicates that _Reserving_node has reserved a message
8858  volatile bool _M_fIsInitialized;
8859 
8860 private:
8861  //
8862  // Hide assignment operator and copy constructor
8863  //
8864  _Reserving_node const & operator=(_Reserving_node const &); // no assignment operator
8865  _Reserving_node(_Reserving_node const &); // no copy constructor
8866 };
8867 
8868 
8877 
8878 template<class _Type>
8879 class _Greedy_node: public _Order_node_base<_Type>
8880 {
8881 public:
8895 
8896  _Greedy_node(ISource<_Type> * _PSource, size_t _Index, ITarget<size_t> * _PTarget = NULL) :
8897  _M_savedId(-1),
8899  {
8900  _Initialize_order_node(_PSource, _Index, _PTarget);
8901  }
8902 
8919 
8920  _Greedy_node(ISource<_Type> * _PSource, size_t _Index, ITarget<size_t> * _PTarget, filter_method const& _Filter) :
8921  _M_savedId(-1),
8923  {
8924  register_filter(_Filter);
8925  _Initialize_order_node(_PSource, _Index, _PTarget);
8926  }
8927 
8944 
8945  _Greedy_node(Scheduler& _PScheduler, ISource<_Type> * _PSource, size_t _Index, ITarget<size_t> * _PTarget = NULL) :
8946  _M_savedId(-1),
8948  {
8949  _Initialize_order_node(_PSource, _Index, _PTarget, &_PScheduler);
8950  }
8951 
8971 
8972  _Greedy_node(Scheduler& _PScheduler, ISource<_Type> * _PSource, size_t _Index, ITarget<size_t> * _PTarget, filter_method const& _Filter) :
8973  _M_savedId(-1),
8975  {
8976  register_filter(_Filter);
8977  _Initialize_order_node(_PSource, _Index, _PTarget, &_PScheduler);
8978  }
8979 
8996 
8997  _Greedy_node(ScheduleGroup& _PScheduleGroup, ISource<_Type> * _PSource, size_t _Index, ITarget<size_t> * _PTarget = NULL) :
8998  _M_savedId(-1),
9000  {
9001  _Initialize_order_node(_PSource, _Index, _PTarget, NULL, &_PScheduleGroup);
9002  }
9003 
9023 
9024  _Greedy_node(ScheduleGroup& _PScheduleGroup, ISource<_Type> * _PSource, size_t _Index, ITarget<size_t> * _PTarget, filter_method const& _Filter) :
9025  _M_savedId(-1),
9027  {
9028  register_filter(_Filter);
9029  _Initialize_order_node(_PSource, _Index, _PTarget, NULL, &_PScheduleGroup);
9030  }
9031 
9035 
9037  {
9038  // Remove all links
9040 
9042  {
9043  delete _M_pGreedyMessage;
9044  }
9045  }
9046 
9054 
// Resets the node so it can take part in another round.
// Under _M_resetLock the received and sent messages are deleted. The loop then snapshots
// _M_savedId under _M_propagationLock: it stops when the snapshot is -1, otherwise it sets
// _M_savedId back to -1 and tries to reserve that message on the source. A successful reserve
// is followed by consume into _M_pGreedyMessage, async_send(NULL) and loop exit; a failed
// reserve goes round the loop again.
// NOTE(review): listing lines 9060, 9063, 9084 and 9095 are absent from this extract, so the
// statements after each delete, the statement before the first break, and the declaration of
// _Iter are not visible here; confirm against agents.h.
9055  void _Reset()
9056  {
9057  _R_lock _Lock(_M_resetLock);
9058 
9059  delete _M_pReceiveMessage;
9061 
9062  delete _M_pSendMessage;
9064 
9065  //
9066  // For greedy type joins, look to see if any other messages have been
9067  // passed to this _Greedy_node while the join was waiting for other
9068  // messages to arrive. This function is already called with _M_resetLock
9069  // held through propagate_to_any_targets().
9070  //
9071  for(;;)
9072  {
9073  // Set the current saved ID as -1. Check to see if something was ready for consumption
9074  // (if _Saved_id != -1) and consume it if possible.
9075  runtime_object_identity _Saved_id;
9076 
9077  {
9078  _NR_lock lockHolder(_M_propagationLock);
9079 
9080  _Saved_id = _M_savedId;
9081 
9082  if (_Saved_id == -1)
9083  {
9085  break;
9086  }
9087  else
9088  {
9089  _M_savedId = -1;
9090  }
9091  }
9092 
9093  if (_Saved_id != -1)
9094  {
9096 
9097  ISource<_Type> * _PSource = *_Iter;
9098  if ((_PSource != NULL) && _PSource->reserve(_Saved_id, this))
9099  {
9100  _M_pGreedyMessage = _PSource->consume(_Saved_id, this);
9101  async_send(NULL);
9102  break;
9103  }
9104  }
9105  }
9106  }
9107 
9108 protected:
9109 
9110  //
9111  // propagator_block protected function implementation
9112  //
9113 
9133 
9135  {
9136  message_status _Result = postponed;
9137 
9138  bool _FDone = false;
9139 
9140  {
9141  _NR_lock lockHolder(_M_propagationLock);
9142  if (_M_pGreedyMessage != NULL)
9143  {
9144  _M_savedId = _PMessage->msg_id();
9145  _Result = postponed;
9146  _FDone = true;
9147  }
9148  }
9149 
9150  if (!_FDone)
9151  {
9152  _M_pGreedyMessage = _PSource->accept(_PMessage->msg_id(), this);
9153 
9154  if (_M_pGreedyMessage != NULL)
9155  {
9156  _Result = accepted;
9157  async_send(NULL);
9158  }
9159  else
9160  {
9161  _Result = missed;
9162  }
9163  }
9164 
9165  return _Result;
9166  }
9167 
9177 
// Accepts the index message this node offered to its target.
// Returns NULL when no send message exists or when _MsgId does not match its ID;
// otherwise returns a newly allocated message<size_t> carrying a copy of
// _M_pSendMessage's payload. The stored send message itself is left in place.
9178  virtual message<size_t> * accept_message(runtime_object_identity _MsgId)
9179  {
9180  // This check is to prevent spoofing and verify that the propagated message is
9181  // the one that is accepted at the end.
9182  if (_M_pSendMessage == NULL || _MsgId != _M_pSendMessage->msg_id())
9183  {
9184  return NULL;
9185  }
9186 
9187  //
9188  // Instead of returning the internal message, we return a copy of the
9189  // message passed in.
9190  //
9191  // Because we are returning a copy, the accept routine for a _Greedy_node
9192  // does not need to grab the internal lock.
9193  //
9194  return (new message<size_t>(_M_pSendMessage->payload));
9195  }
9196 
9197 
9207 
9209  {
9210  _R_lock _Lock(_M_resetLock);
9211 
9212  if (_M_pSendMessage == NULL)
9213  {
9214  // Save the incoming message so that it can be consumed in the accept function
9217  }
9218 
9219  for (target_iterator _Iter = _M_connectedTargets.begin(); *_Iter != NULL; ++_Iter)
9220  {
9221  ITarget<size_t> * _PTarget = *_Iter;
9222  _PTarget->propagate(_M_pSendMessage, this);
9223  }
9224  }
9225 
9226 private:
9227 
9228  //
9229  // Private Data Members
9230  //
9231 
9232  // The message to be saved by a greedy order node
9234 
9235  // The lock used to protect propagation
9237 
9238  // The lock used to protect modification during a reset
9240 
9241  // For greedy order-nodes, the message ID of subsequent messages sent to this node
9242  // For non-greedy order nodes, the message ID of the message to reserve/consume
9243  runtime_object_identity _M_savedId;
9244 
9245 private:
9246  //
9247  // Hide assignment operator and copy constructor
9248  //
9249  _Greedy_node const & operator=(_Greedy_node const &); // no assignment operator
9250  _Greedy_node(_Greedy_node const &); // no copy constructor
9251 };
9252 
9253 
9262 
9263 template<class _Type>
9265 {
9266 public:
9280 
9281  _Non_greedy_node(ISource<_Type> * _PSource, size_t _Index, ITarget<size_t> * _PTarget = NULL) :
9282  _M_savedId(-1),
9283  _M_reservedId(-1),
9285  {
9286  _Initialize_order_node(_PSource, _Index, _PTarget);
9287  }
9288 
9305 
9306  _Non_greedy_node(ISource<_Type> * _PSource, size_t _Index, ITarget<size_t> * _PTarget, filter_method const& _Filter) :
9307  _M_savedId(-1),
9308  _M_reservedId(-1),
9310  {
9311  register_filter(_Filter);
9312  _Initialize_order_node(_PSource, _Index, _PTarget);
9313  }
9314 
9331 
9332  _Non_greedy_node(Scheduler& _PScheduler, ISource<_Type> * _PSource, size_t _Index, ITarget<size_t> * _PTarget = NULL) :
9333  _M_savedId(-1),
9334  _M_reservedId(-1),
9336  {
9337  _Initialize_order_node(_PSource, _Index, _PTarget, &_PScheduler);
9338  }
9339 
9359 
9360  _Non_greedy_node(Scheduler& _PScheduler, ISource<_Type> * _PSource, size_t _Index, ITarget<size_t> * _PTarget, filter_method const& _Filter) :
9361  _M_savedId(-1),
9362  _M_reservedId(-1),
9364  {
9365  register_filter(_Filter);
9366  _Initialize_order_node(_PSource, _Index, _PTarget, &_PScheduler);
9367  }
9368 
9385 
9386  _Non_greedy_node(ScheduleGroup& _PScheduleGroup, ISource<_Type> * _PSource, size_t _Index, ITarget<size_t> * _PTarget = NULL) :
9387  _M_savedId(-1),
9388  _M_reservedId(-1),
9390  {
9391  _Initialize_order_node(_PSource, _Index, _PTarget, NULL, &_PScheduleGroup);
9392  }
9393 
9413 
9414  _Non_greedy_node(ScheduleGroup& _PScheduleGroup, ISource<_Type> * _PSource, size_t _Index, ITarget<size_t> * _PTarget, filter_method const& _Filter) :
9415  _M_savedId(-1),
9416  _M_reservedId(-1),
9418  {
9419  register_filter(_Filter);
9420  _Initialize_order_node(_PSource, _Index, _PTarget, NULL, &_PScheduleGroup);
9421  }
9422 
9426 
9428  {
9429  if (_M_pReservedSource != NULL)
9430  {
9433  }
9434 
9435  // Remove all links
9437  }
9438 
9446 
9447  void _Reset()
9448  {
9449  _R_lock _Lock(_M_resetLock);
9450 
9451  delete _M_pReceiveMessage;
9453 
9454  delete _M_pSendMessage;
9456  }
9457 
9465 
9467  {
9468  bool _Ret_val = false;
9469 
9470  // Order node has only a single source.
9471  // Obtain an iterator to the first source. It will guarantee that the reference
9472  // count on the source is maintained
9474  ISource<_Type> * _PSource = *_Iter;
9475 
9476  if (_PSource != NULL)
9477  {
9478  // CAS out the current saved ID, in order to try and reserve it
9479  runtime_object_identity _SavedId = _InterlockedExchange((volatile long *) &_M_savedId, -1);
9480 
9481  _Ret_val = _PSource->reserve(_SavedId, this);
9482  //
9483  // If this reserve failed, that means we need to wait for another message
9484  // to come in on this link. _M_savedId was set to -1 to indicate to the _Order_node
9485  // that it needs to async_send when that next message comes through.
9486  //
9487  // If the reserve succeeds, save away the reserved ID. This will be used later in
9488  // consume.
9489  //
9490  if (_Ret_val)
9491  {
9492  _M_reservedId = _SavedId;
9493 
9494  // Acquire a reference on the source
9496  _M_pReservedSource = _PSource;
9497  }
9498  }
9499 
9500  return _Ret_val;
9501  }
9502 
9507 
9509  {
9510  if (_M_pReservedSource != NULL)
9511  {
9512  runtime_object_identity _SavedId = _M_reservedId;
9513  _M_pReceiveMessage = _M_pReservedSource->consume(_SavedId, this);
9514 
9515  runtime_object_identity _OldId = NULL;
9516  _OldId = _InterlockedExchange((volatile long *) &_M_reservedId, -1);
9517 
9518  _CONCRT_ASSERT(_OldId == _SavedId);
9519 
9520  // Release the reference on the source
9523  }
9524  }
9525 
9529 
9531  {
9532  bool retVal = false;
9533 
9534  if (_M_pReservedSource != NULL)
9535  {
9536  runtime_object_identity _SavedId = _M_reservedId;
9537  // If the _M_savedId is still -1, then swap the succeeded one back
9538  _M_pReservedSource->release(_SavedId, this);
9539 
9540  if (_InterlockedCompareExchange((volatile long *) &_M_savedId, _SavedId, -1) == -1)
9541  {
9542  retVal = true;
9543  }
9544 
9545  // Release the reference on the source
9548  }
9549 
9550  return retVal;
9551  }
9552 
9553 protected:
9554 
9555  //
9556  // propagator_block protected function implementation
9557  //
9558 
9578 
9580  {
9581  // Change the message ID. If it was -1, that means an async-send needs to occur
9582  if (_InterlockedExchange((volatile long *) &_M_savedId, _PMessage->msg_id()) == -1)
9583  {
9584  async_send(NULL);
9585  }
9586 
9587  // Always return postponed. This message will be consumed
9588  // in the LWT
9589 
9590  return postponed;
9591  }
9592 
9602 
// Hands out the message this node previously offered to its target.
// Returns NULL when there is no pending send message or when _MsgId does not
// match its ID; otherwise returns a newly allocated message<size_t> built from
// the pending message's payload (the caller receives the new allocation).
9603  virtual message<size_t> * accept_message(runtime_object_identity _MsgId)
9604  {
9605  // This check is to prevent spoofing and verify that the propagated message is
9606  // the one that is accepted at the end.
9607  if (_M_pSendMessage == NULL || _MsgId != _M_pSendMessage->msg_id())
9608  {
9609  return NULL;
9610  }
9611 
9612  //
9613  // Instead of returning the internal message, we return a new message that
9614  // carries a copy of its payload.
9615  //
9616  // Because we are returning a copy, the accept routine for a _Non_greedy_node
9617  // does not need to grab the internal lock.
9618  //
9619  return (new message<size_t>(_M_pSendMessage->payload));
9620  }
9621 
9631 
9633  {
9634  _R_lock _Lock(_M_resetLock);
9635 
9636  if (_M_pSendMessage == NULL)
9637  {
9639  }
9640 
9641  for (target_iterator _Iter = _M_connectedTargets.begin(); *_Iter != NULL; ++_Iter)
9642  {
9643  ITarget<size_t> * _PTarget = *_Iter;
9644  _PTarget->propagate(_M_pSendMessage, this);
9645  }
9646  }
9647 
9648 private:
9649 
9650  //
9651  // Private Data Members
9652  //
9653 
9654  // The source where we have reserved a message
9656 
9657  // The lock used to protect modification during a reset
9659 
9660  // For non-greedy order nodes, the message ID of the message to reserve/consume
9661  runtime_object_identity _M_savedId;
9662 
9663  // For non-greedy order nodes, the reserved ID of the message that was reserved
9664  runtime_object_identity _M_reservedId;
9665 
9666  // The marker that indicates that _Non_greedy_node has reserved a message
9667  volatile bool _M_fIsInitialized;
9668 
9669 private:
9670  //
9671  // Hide assignment operator and copy constructor
9672  //
9673  _Non_greedy_node const & operator=(_Non_greedy_node const &); // no assignment operator
9674  _Non_greedy_node(_Non_greedy_node const &); // no copy constructor
9675 };
9676 
9677 //**************************************************************************
9678 // Choice:
9679 //**************************************************************************
9680 
9697 
9698 template<class _Type>
9699 class choice: public ISource<size_t>
9700 {
9701 public:
9702 
9722 
9724  {
9726  _Initialize_choices<0>();
9727  }
9728 
9729 #ifdef _CRT_USE_WINAPI_FAMILY_DESKTOP_APP
9730 
9753  choice(Scheduler& _PScheduler, _Type _Tuple) : _M_sourceTuple(_Tuple), _M_pScheduler(&_PScheduler), _M_pScheduleGroup(NULL)
9754  {
9756  _Initialize_choices<0>();
9757  }
9758 
9782 
// Constructs a choice bound to a schedule group: stores the tuple of sources,
// records the schedule group (the scheduler pointer is left NULL), creates the
// internal single_assignment<size_t> on that schedule group, and then runs
// _Initialize_choices starting at tuple index 0.
9783  choice(ScheduleGroup& _PScheduleGroup, _Type _Tuple) : _M_sourceTuple(_Tuple), _M_pScheduler(NULL), _M_pScheduleGroup(&_PScheduleGroup)
9784  {
9785  _M_pSingleAssignment = new single_assignment<size_t>(_PScheduleGroup);
9786  _Initialize_choices<0>();
9787  }
9788 #endif /* _CRT_USE_WINAPI_FAMILY_DESKTOP_APP */
9789 
9810 
// Move constructor: takes over the heap-allocated single assignment and the
// per-source node pointers of _Choice, leaving _Choice with NULL pointers so
// that its destructor does not free what this object now owns.
9811  choice(choice && _Choice)
9812  {
9813  // Copy schedule group or scheduler to the new object.
9814  _M_pScheduleGroup = _Choice._M_pScheduleGroup;
9815  _M_pScheduler = _Choice._M_pScheduler;
9816 
9817  // Single assignment is heap allocated, so simply copy the pointer. If it already has
9818  // a value, it will be preserved.
9819  _M_pSingleAssignment = _Choice._M_pSingleAssignment;
9820  _Choice._M_pSingleAssignment = NULL;
9821 
9822  // Invoke copy assignment for tuple to copy pointers to message blocks.
9823  _M_sourceTuple = _Choice._M_sourceTuple;
9824 
9825  // Copy the pointers to order nodes to a new object and zero out in the old object.
9826  memcpy(_M_pSourceChoices, _Choice._M_pSourceChoices, sizeof(_M_pSourceChoices));
9827  memset(_Choice._M_pSourceChoices, 0, sizeof(_M_pSourceChoices));
9828  }
9829 
9833 
9835  {
9836  delete _M_pSingleAssignment;
9837  _Delete_choices<0>();
9838  }
9839 
9843 
9844  typedef typename _Type type;
9845 
9852 
// Reports whether the internal single assignment already holds a value,
// by forwarding to its has_value().
9853  bool has_value() const
9854  {
9855  return _M_pSingleAssignment->has_value();
9856  }
9857 
9868 
// Returns the size_t stored in the internal single assignment, i.e. the
// tuple index of the selected source.
// NOTE(review): presumably single_assignment::value() waits until a value is
// available -- confirm against its definition.
9869  size_t index()
9870  {
9871  return _M_pSingleAssignment->value();
9872  }
9873 
9888 
9889  template <typename _Payload_type>
9890  _Payload_type const & value()
9891  {
9893  }
9894 
9895  //
9896  // ISource public function implementations
9897  //
9898 
9905 
// ISource implementation: links _PTarget to the internal single assignment,
// which is the block that actually offers messages for this choice.
9906  virtual void link_target(_Inout_ ITarget<size_t> * _PTarget)
9907  {
9908  _M_pSingleAssignment->link_target(_PTarget);
9909  }
9910 
9917 
9918  virtual void unlink_target(_Inout_ ITarget<size_t> * _PTarget)
9919  {
9921  }
9922 
9930 
9931  virtual void unlink_targets()
9932  {
9934  }
9935 
9948 
// ISource implementation: forwards the accept request for _MsgId from
// _PTarget to the internal single assignment and returns its result.
9949  virtual message<size_t> * accept(runtime_object_identity _MsgId, _Inout_ ITarget<size_t> * _PTarget)
9950  {
9951  return _M_pSingleAssignment->accept(_MsgId, _PTarget);
9952  }
9953 
9972 
// ISource implementation: forwards the reserve request for _MsgId from
// _PTarget to the internal single assignment and returns its result.
9973  virtual bool reserve(runtime_object_identity _MsgId, _Inout_ ITarget<size_t> * _PTarget)
9974  {
9975  return _M_pSingleAssignment->reserve(_MsgId, _PTarget);
9976  }
9977 
9995 
// ISource implementation: forwards the consume request for _MsgId from
// _PTarget to the internal single assignment and returns its result.
9996  virtual message<size_t> * consume(runtime_object_identity _MsgId, _Inout_ ITarget<size_t> * _PTarget)
9997  {
9998  return _M_pSingleAssignment->consume(_MsgId, _PTarget);
9999  }
10000 
10010 
// ISource implementation: forwards the release of the reservation on _MsgId
// held by _PTarget to the internal single assignment.
10011  virtual void release(runtime_object_identity _MsgId, _Inout_ ITarget<size_t> * _PTarget)
10012  {
10013  _M_pSingleAssignment->release(_MsgId, _PTarget);
10014  }
10015 
10026 
// ISource implementation: forwards the reference acquisition by _PTarget to
// the internal single assignment.
10027  virtual void acquire_ref(_Inout_ ITarget<size_t> * _PTarget)
10028  {
10029  _M_pSingleAssignment->acquire_ref(_PTarget);
10030  }
10031 
10042 
// ISource implementation: forwards the reference release by _PTarget to the
// internal single assignment.
10043  virtual void release_ref(_Inout_ ITarget<size_t> * _PTarget)
10044  {
10045  _M_pSingleAssignment->release_ref(_PTarget);
10046  }
10047 
10048 private:
10049 
10054 
10055  template<int _Index>
10057  {
10058  std::tr1::tuple_element<_Index, _Type>::type _Item = std::tr1::get<_Index>(_M_sourceTuple);
10060 
10061  if (_M_pScheduleGroup != NULL)
10062  {
10063  _Order_node_element = new _Reserving_node<std::tr1::remove_pointer<std::tr1::tuple_element<_Index, _Type>::type>::type::source_type> (*_M_pScheduleGroup, _Item, _Index);
10064  }
10065  else if (_M_pScheduler != NULL)
10066  {
10067  _Order_node_element = new _Reserving_node<std::tr1::remove_pointer<std::tr1::tuple_element<_Index, _Type>::type>::type::source_type> (*_M_pScheduler, _Item, _Index);
10068  }
10069  else
10070  {
10071  _Order_node_element = new _Reserving_node<std::tr1::remove_pointer<std::tr1::tuple_element<_Index, _Type>::type>::type::source_type> (_Item, _Index);
10072  }
10073 
10074  _M_pSourceChoices[_Index] = _Order_node_element;
10075  _Order_node_element->link_target(_M_pSingleAssignment);
10076  _Initialize_choices<_Index + 1>();
10077  }
10078 
10083 
// Terminating specialization for _Index == tuple_size<_Type>: intentionally
// empty, it ends the compile-time recursion of _Initialize_choices<_Index + 1>.
10084  template<> void _Initialize_choices<std::tr1::tuple_size<_Type>::value>()
10085  {
10086  }
10087 
10092 
10093  template<int _Index>
10095  {
10097  _M_pSourceChoices[_Index] = NULL;
10098  _Delete_choices<_Index + 1>();
10099  }
10100 
10105 
// Terminating specialization for _Index == tuple_size<_Type>: intentionally
// empty, it ends the compile-time recursion of _Delete_choices<_Index + 1>.
10106  template<> void _Delete_choices<std::tr1::tuple_size<_Type>::value>()
10107  {
10108  }
10109 
10110  // Array of pointers to _Reserving_node elements representing each source
10111  void * _M_pSourceChoices[std::tr1::tuple_size<_Type>::value];
10112 
10113  // Single assignment which chooses between source messaging blocks
10115 
10116  // Tuple of messaging blocks that are sources to this choice
10118 
10119  // The scheduler to propagate messages on
10120  Scheduler * _M_pScheduler;
10121 
10122  // The schedule group to propagate messages on
10123  ScheduleGroup * _M_pScheduleGroup;
10124 
10125 private:
10126  //
10127  // Hide assignment operator and copy constructor
10128  //
10129  choice const &operator =(choice const &); // no assignment operator
10130  choice(choice const &); // no copy constructor
10131 };
10132 
10133 // Templated factory functions that create a choice, three flavors
10134 
10135 #ifdef _CRT_USE_WINAPI_FAMILY_DESKTOP_APP
10136 
// Factory: builds a choice over two or more sources that uses the given
// scheduler. The items are packed with std::make_tuple and passed, together
// with _PScheduler, to the choice constructor; the choice is returned by value.
10167 template<typename _Type1, typename _Type2, typename... _Types>
10168 choice<std::tuple<_Type1, _Type2, _Types...>>
10169 make_choice(Scheduler& _PScheduler, _Type1 _Item1, _Type2 _Item2, _Types... _Items)
10170 {
10171  return choice<std::tuple<_Type1, _Type2, _Types...>>(_PScheduler, std::make_tuple(_Item1, _Item2, _Items...));
10172 }
10173 
10205 
// Factory: builds a choice over two or more sources that uses the given
// schedule group. The items are packed with std::make_tuple and passed,
// together with _PScheduleGroup, to the choice constructor.
10206 template<typename _Type1, typename _Type2, typename... _Types>
10207 choice<std::tuple<_Type1, _Type2, _Types...>>
10208 make_choice(ScheduleGroup& _PScheduleGroup, _Type1 _Item1, _Type2 _Item2, _Types... _Items)
10209 {
10210  return choice<std::tuple<_Type1, _Type2, _Types...>>(_PScheduleGroup, std::make_tuple(_Item1, _Item2, _Items...));
10211 }
10212 
10213 #endif /* _CRT_USE_WINAPI_FAMILY_DESKTOP_APP */
10214 
10241 
// Factory: builds a choice over two or more sources without naming a
// scheduler or schedule group. The items are packed with std::make_tuple and
// passed to the single-argument choice constructor.
10242 template<typename _Type1, typename _Type2, typename... _Types>
10243 choice<std::tuple<_Type1, _Type2, _Types...>>
10244 make_choice(_Type1 _Item1, _Type2 _Item2, _Types... _Items)
10245 {
10246  return choice<std::tuple<_Type1, _Type2, _Types...>>(std::make_tuple(_Item1, _Item2, _Items...));
10247 }
10248 
10249 //**************************************************************************
10250 // Join:
10251 //**************************************************************************
10252 
10253 // Template specialization used to unwrap the types from within a tuple.
10254 
10255 
10256 template <typename _Tuple> struct _Unwrap;
10257 
10264 
// Partial specialization for std::tuple: for each element type, strips one
// level of pointer and takes the pointee's nested source_type. The results are
// collected into a std::tuple exposed as `type`.
10265 template <typename... _Types>
10266 struct _Unwrap<std::tuple<_Types...>>
10267 {
10268  typedef std::tuple<typename std::remove_pointer<_Types>::type::source_type...> type;
10269 };
10270 
10281 
10282 template<typename _Type, typename _Destination_type, join_type _Jtype>
10283 class _Join_node: public propagator_block<single_link_registry<ITarget<_Destination_type>>, multi_link_registry<ISource<size_t>>>
10284 {
10285 public:
10286 
10291 
10292  _Join_node() : _M_counter(std::tr1::tuple_size<_Destination_type>::value)
10293  {
10295  }
10296 
10304 
// Constructs a join node that uses the given scheduler. The counter starts at
// the number of elements in _Destination_type (one per input), and the
// scheduler is handed to initialize_source_and_target.
10305  _Join_node(Scheduler& _PScheduler) : _M_counter(std::tr1::tuple_size<_Destination_type>::value)
10306  {
10307  initialize_source_and_target(&_PScheduler);
10308  }
10309 
10317 
// Constructs a join node that uses the given schedule group. The counter
// starts at the number of elements in _Destination_type (one per input);
// initialize_source_and_target receives a NULL scheduler and the schedule group.
10318  _Join_node(ScheduleGroup& _PScheduleGroup) : _M_counter(std::tr1::tuple_size<_Destination_type>::value)
10319  {
10320  initialize_source_and_target(NULL, &_PScheduleGroup);
10321  }
10322 
10326 
10328  {
10329  // Remove all links
10331 
10332  // Clean up any messages left in this message block
10334  }
10335 
10336 protected:
10337 
10352 
10354  {
10355  // This join block is connected to the _Order_node sources, which know not to send
10356  // any more messages until join propagates them further. That is why join can
10357  // always accept the incoming messages.
10358 
10359  _PMessage = _PSource->accept(_PMessage->msg_id(), this);
10360 
10361  //
10362  // Source block created an int message only to notify join that the real
10363  // payload is available. There is no need to keep this message around.
10364  //
10365  _CONCRT_ASSERT(_PMessage != NULL);
10366  delete _PMessage;
10367 
10368  long _Ret_val = _InterlockedDecrement(&_M_counter);
10369 
10370  _CONCRT_ASSERT(_Ret_val >= 0);
10371 
10372  if (_Ret_val == 0)
10373  {
10374  //
10375  // All source messages are now received so join can propagate them further
10376  //
10377  async_send(NULL);
10378  }
10379 
10380  return accepted;
10381  }
10382 
10392 
10393  virtual message<_Destination_type> * accept_message(runtime_object_identity _MsgId)
10394  {
10395  //
10396  // Peek at the head message in the message buffer. If the IDs match
10397  // dequeue and transfer ownership
10398  //
10400 
10401  if (_M_messageBuffer._Is_head(_MsgId))
10402  {
10403  _Msg = _M_messageBuffer._Dequeue();
10404  }
10405 
10406  return _Msg;
10407  }
10408 
10422 
// Returns true when _MsgId identifies the message at the head of the
// outgoing message buffer, false otherwise. No state is modified here.
10423  virtual bool reserve_message(runtime_object_identity _MsgId)
10424  {
10425  // Allow reservation if this is the head message
10426  return _M_messageBuffer._Is_head(_MsgId);
10427  }
10428 
10442 
// Consumes a previously reserved message. Consumption is implemented by
// delegating to accept_message, so the return value is whatever
// accept_message(_MsgId) returns.
10443  virtual message<_Destination_type> * consume_message(runtime_object_identity _MsgId)
10444  {
10445  // By default, accept the message
10446  return accept_message(_MsgId);
10447  }
10448 
10455 
// Releases a reservation on _MsgId. The only work done is validation:
// throws message_not_found when _MsgId is not the head of the outgoing
// message buffer.
10456  virtual void release_message(runtime_object_identity _MsgId)
10457  {
10458  // The head message is the one reserved.
10459  if (!_M_messageBuffer._Is_head(_MsgId))
10460  {
10461  throw message_not_found();
10462  }
10463  }
10464 
10468 
// Restarts propagation: when the outgoing buffer is non-empty, schedules
// propagation with async_send(NULL); otherwise does nothing.
10469  virtual void resume_propagation()
10470  {
10471  // If there are any messages in the buffer, propagate them out
10472  if (_M_messageBuffer._Count() > 0)
10473  {
10474  async_send(NULL);
10475  }
10476  }
10477 
10484 
10486  {
10487  // There is only a single target.
10489  }
10490 
10500 
10502  {
10504 
10505  if (_M_counter == 0)
10506  {
10507  bool fIsNonGreedy = (_Jtype == non_greedy);
10508 
10509  if (fIsNonGreedy)
10510  {
10512  {
10513  return;
10514  }
10515  }
10516 
10517  if (!fIsNonGreedy)
10518  {
10519  // Because a greedy join has captured all input, we can reset
10520  // the counter to the total number of inputs
10521  _InterlockedExchange(&_M_counter, std::tr1::tuple_size<_Destination_type>::value);
10522  }
10523 
10524  _Msg = _Create_send_message();
10525  }
10526 
10527  if (_Msg != NULL)
10528  {
10529  _M_messageBuffer._Enqueue(_Msg);
10530 
10531  if (!_M_messageBuffer._Is_head(_Msg->msg_id()))
10532  {
10533  // another message is at the head of the outbound message queue and blocked
10534  // simply return
10535  return;
10536  }
10537  }
10538 
10540  }
10541 
10542 private:
10543 
10553 
10554  template<int _Index>
10555  bool _Try_consume_source_messages(_Destination_type & _Destination_tuple, ISource<size_t> ** _Sources)
10556  {
10558  static_cast<_Non_greedy_node<std::tr1::remove_pointer<std::tr1::tuple_element<_Index, _Type>::type>::type::source_type> *>(_Sources[_Index]);
10559 
10560  // Increment the counter once for each reservation
10562 
10563  if (_Node->_Reserve_received_message())
10564  {
10565  bool _Ret_val = _Try_consume_source_messages<_Index + 1>(_Destination_tuple, _Sources);
10566 
10567  if (_Ret_val)
10568  {
10569  _Node->_Consume_received_message();
10570  }
10571  else
10572  {
10573  if (_Node->_Release_received_message())
10574  {
10575  // If _Release_received_message() restored the ID, decrement the count for that
10576  // restoration
10577  if (_InterlockedDecrement(&_M_counter) == 0)
10578  {
10579  async_send(NULL);
10580  }
10581  }
10582  }
10583 
10584  return _Ret_val;
10585  }
10586 
10587  return false;
10588  }
10589 
10597 
// Terminating specialization for _Index == tuple_size<_Type>: reached only
// when every preceding index reserved its message, so it reports success and
// ends the compile-time recursion.
10598  template<> bool _Try_consume_source_messages<std::tr1::tuple_size<_Type>::value>(_Destination_type &, ISource<size_t> **)
10599  {
10600  return true;
10601  }
10602 
10611 
10613  {
10614  _Destination_type _Destination_tuple;
10615 
10616  // Populate the sources buffer
10617  ISource<size_t> * _Sources[std::tr1::tuple_size<_Type>::value];
10618  size_t _Index = 0;
10619 
10620  // Get an iterator which will keep a reference on the connected sources
10622 
10623  while (*_Iter != NULL)
10624  {
10625  ISource<size_t> * _PSource = *_Iter;
10626 
10627  if (_PSource == NULL)
10628  {
10629  // One of the sources disconnected
10630  break;
10631  }
10632 
10633  if (_Index >= std::tr1::tuple_size<_Type>::value)
10634  {
10635  // More sources than we expect
10636  break;
10637  }
10638 
10639  _Sources[_Index] = _PSource;
10640  _Index++;
10641  ++_Iter;
10642  }
10643 
10644  // The order nodes should not have unlinked while the join node is
10645  // active.
10646 
10647  if (_Index != std::tr1::tuple_size<_Type>::value)
10648  {
10649  // On debug build assert to help debugging
10650  _CONCRT_ASSERT(_Index == std::tr1::tuple_size<_Type>::value);
10651  return false;
10652  }
10653 
10654  bool _IsAcquireSuccessful = _Try_consume_source_messages<0>(_Destination_tuple, _Sources);
10655 
10656  return _IsAcquireSuccessful;
10657  }
10658 
10665 
10667  {
10668  message<_Target_type> * _Msg = _MessageBuffer._Peek();
10669 
10670  // If someone has reserved the _Head message, don't propagate anymore
10671  if (_M_pReservedFor != NULL)
10672  {
10673  return;
10674  }
10675 
10676  while (_Msg != NULL)
10677  {
10678  message_status _Status = declined;
10679 
10680  // Always start from the first target that linked
10681  for (target_iterator _Iter = _M_connectedTargets.begin(); *_Iter != NULL; ++_Iter)
10682  {
10683  ITarget<_Target_type> * _PTarget = *_Iter;
10684  _Status = _PTarget->propagate(_Msg, this);
10685 
10686  // Ownership of message changed. Do not propagate this
10687  // message to any other target.
10688  if (_Status == accepted)
10689  {
10690  break;
10691  }
10692 
10693  // If the target we just propagated to has reserved this message, stop
10694  // propagating it to others
10695  if (_M_pReservedFor != NULL)
10696  {
10697  break;
10698  }
10699  }
10700 
10701  // If status is anything other than accepted, then the head message
10702  // was not propagated out. Thus, nothing after it in the queue can
10703  // be propagated out. Cease propagation.
10704  if (_Status != accepted)
10705  {
10706  break;
10707  }
10708 
10709  // Get the next message
10710  _Msg = _MessageBuffer._Peek();
10711  }
10712  }
10713 
10718 
10720  {
10721  _Destination_type _Destination_tuple;
10722 
10723  // Populate the sources buffer
10724  ISource<size_t> * _Sources[std::tr1::tuple_size<_Type>::value];
10725  size_t _Index = 0;
10726 
10727  // Get an iterator which will keep a reference on the connected sources
10729 
10730  while (*_Iter != NULL)
10731  {
10732  ISource<size_t> * _PSource = *_Iter;
10733 
10734  if (_PSource == NULL)
10735  {
10736  // One of the sources disconnected
10737  break;
10738  }
10739 
10740  // Avoid buffer overrun
10741  if (_Index >= std::tr1::tuple_size<_Type>::value)
10742  {
10743  // More sources than we expect
10744  break;
10745  }
10746 
10747  _Sources[_Index] = *_Iter;
10748  _Index++;
10749  ++_Iter;
10750  }
10751 
10752  // The order nodes should not have unlinked while the join node is
10753  // active.
10754  if (_Index != std::tr1::tuple_size<_Type>::value)
10755  {
10756  // On debug build assert to help debugging
10757  _CONCRT_ASSERT(_Index == std::tr1::tuple_size<_Type>::value);
10758  return NULL;
10759  }
10760 
10761  _Populate_destination_tuple<0>(_Destination_tuple, _Sources);
10762 
10763  return new message<_Destination_type>(_Destination_tuple);
10764  }
10765 
10770 
10772  {
10773  // Delete any messages remaining in the output queue
10774  for (;;)
10775  {
10776  message<_Destination_type> * _Msg = _M_messageBuffer._Dequeue();
10777  if (_Msg == NULL)
10778  {
10779  break;
10780  }
10781  delete _Msg;
10782  }
10783  }
10784 
10788 
10789  template<int _Index>
10790  void _Populate_destination_tuple(_Destination_type & _Destination_tuple, ISource<size_t> ** _Sources)
10791  {
10793  static_cast<_Order_node_base<std::tr1::remove_pointer<std::tr1::tuple_element<_Index, _Type>::type>::type::source_type> *>(_Sources[_Index]);
10794 
10795  std::tr1::get<_Index>(_Destination_tuple) = _Node->value();
10796  _Node->_Reset();
10797 
10798  _Populate_destination_tuple<_Index + 1>(_Destination_tuple, _Sources);
10799  }
10800 
10805 
// Terminating specialization for _Index == tuple_size<_Type>: intentionally
// empty, it ends the compile-time recursion of
// _Populate_destination_tuple<_Index + 1>.
10806  template<> void _Populate_destination_tuple<std::tr1::tuple_size<_Type>::value>(_Destination_type &, ISource<size_t> **)
10807  {
10808  }
10809 
10810  // A tuple containing a collection of source messaging blocks
10812 
10813  // Counts messages received by sources of this node and is used to trigger propagation to targets
10814  // This value starts at the total number of inputs and counts down to zero. When it reaches zero,
10815  // a join of the inputs is started.
10816  volatile long _M_counter;
10817 
10818  // Buffer to hold outgoing messages
10820 
10821 private:
10822  //
10823  // Hide assignment operator and copy constructor
10824  //
10825  _Join_node(const _Join_node & _Join); // no copy constructor
10826  _Join_node const &operator =(_Join_node const &); // no assignment operator
10827 };
10828 
10849 
10850 template<typename _Type, join_type _Jtype = non_greedy>
10851 class multitype_join: public ISource<typename _Unwrap<_Type>::type>
10852 {
10853 public:
10854 
10856 
10876 
10878  {
10880  _Initialize_joins<0>();
10881  }
10882 
10883 #ifdef _CRT_USE_WINAPI_FAMILY_DESKTOP_APP
10884 
10907  multitype_join(Scheduler& _PScheduler, _Type _Tuple) : _M_sourceTuple(_Tuple), _M_pScheduler(&_PScheduler), _M_pScheduleGroup(NULL)
10908  {
10910  _Initialize_joins<0>();
10911  }
10912 
10936 
// Constructs a multitype_join bound to a schedule group: stores the tuple of
// sources, records the schedule group (the scheduler pointer is left NULL),
// creates the internal _Join_node on that schedule group, and then runs
// _Initialize_joins starting at tuple index 0.
10937  multitype_join(ScheduleGroup& _PScheduleGroup, _Type _Tuple) : _M_sourceTuple(_Tuple), _M_pScheduler(NULL), _M_pScheduleGroup(&_PScheduleGroup)
10938  {
10939  _M_pJoinNode = new _Join_node<_Type, _Destination_type, _Jtype>(_PScheduleGroup);
10940  _Initialize_joins<0>();
10941  }
10942 #endif /* _CRT_USE_WINAPI_FAMILY_DESKTOP_APP */
10943 
10964 
10966  {
10967  // Copy schedule group or scheduler to the new object.
10968  _M_pScheduleGroup = _Join._M_pScheduleGroup;
10969  _M_pScheduler = _Join._M_pScheduler;
10970 
10971  // The join node is heap allocated, so simply copy the pointer. Any state it
10972  // already holds is preserved.
10973  _M_pJoinNode = _Join._M_pJoinNode;
10974  _Join._M_pJoinNode = NULL;
10975 
10976  // Invoke copy assignment for tuple to copy pointers to message blocks.
10977  _M_sourceTuple = _Join._M_sourceTuple;
10978 
10979  // Copy the pointers to order nodes to a new object and zero out in the old object.
10980  memcpy(_M_pSourceJoins, _Join._M_pSourceJoins, sizeof(_M_pSourceJoins));
10981  memset(_Join._M_pSourceJoins, 0, sizeof(_M_pSourceJoins));
10982  }
10983 
10987 
10989  {
10990  delete _M_pJoinNode;
10991  _Delete_joins<0>();
10992  }
10993 
10997 
10998  typedef typename _Type type;
10999 
11000  //
11001  // ISource public function implementations
11002  //
11003 
11010 
11012  {
11013  _M_pJoinNode->link_target(_PTarget);
11014  }
11015 
11022 
11024  {
11025  _M_pJoinNode->unlink_target(_PTarget);
11026  }
11027 
11031 
// ISource implementation: forwards the request to unlink all targets to the
// internal join node.
11032  virtual void unlink_targets()
11033  {
11034  _M_pJoinNode->unlink_targets();
11035  }
11036 
11049 
// ISource implementation: forwards the accept request for _MsgId from
// _PTarget to the internal join node and returns its result.
11050  virtual message<_Destination_type> * accept(runtime_object_identity _MsgId, _Inout_ ITarget<_Destination_type> * _PTarget)
11051  {
11052  return _M_pJoinNode->accept(_MsgId, _PTarget);
11053  }
11054 
11073 
// ISource implementation: forwards the reserve request for _MsgId from
// _PTarget to the internal join node and returns its result.
11074  virtual bool reserve(runtime_object_identity _MsgId, _Inout_ ITarget<_Destination_type> * _PTarget)
11075  {
11076  return _M_pJoinNode->reserve(_MsgId, _PTarget);
11077  }
11078 
11096 
// ISource implementation: forwards the consume request for _MsgId from
// _PTarget to the internal join node and returns its result.
11097  virtual message<_Destination_type> * consume(runtime_object_identity _MsgId, _Inout_ ITarget<_Destination_type> * _PTarget)
11098  {
11099  return _M_pJoinNode->consume(_MsgId, _PTarget);
11100  }
11101 
11111 
// ISource implementation: forwards the release of the reservation on _MsgId
// held by _PTarget to the internal join node.
11112  virtual void release(runtime_object_identity _MsgId, _Inout_ ITarget<_Destination_type> * _PTarget)
11113  {
11114  _M_pJoinNode->release(_MsgId, _PTarget);
11115  }
11116 
11127 
11129  {
11130  _M_pJoinNode->acquire_ref(_PTarget);
11131  }
11132 
11143 
11145  {
11146  _M_pJoinNode->release_ref(_PTarget);
11147  }
11148 
11149 private:
11150 
11157 
11158  template<int _Index>
11160  {
11161  std::tr1::tuple_element<_Index, _Type>::type _Item = std::tr1::get<_Index>(_M_sourceTuple);
11163 
11164  bool fIsNonGreedy = (_Jtype == non_greedy);
11165 
11166  if (fIsNonGreedy)
11167  {
11168  if (_M_pScheduleGroup != NULL)
11169  {
11170  _Order_node_element = new _Non_greedy_node<std::tr1::remove_pointer<std::tr1::tuple_element<_Index, _Type>::type>::type::source_type> (*_M_pScheduleGroup, _Item, _Index);
11171  }
11172  else if (_M_pScheduler != NULL)
11173  {
11174  _Order_node_element = new _Non_greedy_node<std::tr1::remove_pointer<std::tr1::tuple_element<_Index, _Type>::type>::type::source_type> (*_M_pScheduler, _Item, _Index);
11175  }
11176  else
11177  {
11178  _Order_node_element = new _Non_greedy_node<std::tr1::remove_pointer<std::tr1::tuple_element<_Index, _Type>::type>::type::source_type> (_Item, _Index);
11179  }
11180  }
11181  else
11182  {
11183  if (_M_pScheduleGroup != NULL)
11184  {
11185  _Order_node_element = new _Greedy_node<std::tr1::remove_pointer<std::tr1::tuple_element<_Index, _Type>::type>::type::source_type> (*_M_pScheduleGroup, _Item, _Index);
11186  }
11187  else if (_M_pScheduler != NULL)
11188  {
11189  _Order_node_element = new _Greedy_node<std::tr1::remove_pointer<std::tr1::tuple_element<_Index, _Type>::type>::type::source_type> (*_M_pScheduler, _Item, _Index);
11190  }
11191  else
11192  {
11193  _Order_node_element = new _Greedy_node<std::tr1::remove_pointer<std::tr1::tuple_element<_Index, _Type>::type>::type::source_type> (_Item, _Index);
11194  }
11195  }
11196  _M_pSourceJoins[_Index] = _Order_node_element;
11197  _Order_node_element->link_target(_M_pJoinNode);
11198  _Initialize_joins<_Index + 1>();
11199  }
11200 
11205 
// Terminating specialization for _Index == tuple_size<_Type>: intentionally
// empty, it ends the compile-time recursion of _Initialize_joins<_Index + 1>.
11206  template<> void _Initialize_joins<std::tr1::tuple_size<_Type>::value>()
11207  {
11208  }
11209 
11216 
11217  template<int _Index>
11219  {
11221  _M_pSourceJoins[_Index] = NULL;
11222  _Delete_joins<_Index + 1>();
11223  }
11224 
11229 
// Terminating specialization for _Index == tuple_size<_Type>: intentionally
// empty, it ends the compile-time recursion of _Delete_joins<_Index + 1>.
11230  template<> void _Delete_joins<std::tr1::tuple_size<_Type>::value>()
11231  {
11232  }
11233 
11234  // Array of pointers to _Order_node elements representing each source
11235  void * _M_pSourceJoins[std::tr1::tuple_size<_Type>::value];
11236 
11237  // Join node that collects source messaging block messages
11239 
11240  // Tuple of messaging blocks that are sources to this multitype_join
11242 
11243  // The scheduler to propagate messages on
11244  Scheduler * _M_pScheduler;
11245 
11246  // The schedule group to propagate messages on
11247  ScheduleGroup * _M_pScheduleGroup;
11248 
11249 private:
11250  //
11251  // Hide assignment operator and copy constructor
11252  //
11253  multitype_join const &operator =(multitype_join const &); // no assignment operator
11254  multitype_join(multitype_join const &); // no copy constructor
11255 };
11256 
11257 // Templated factory functions that create a join, three flavors
11258 
11259 #ifdef _CRT_USE_WINAPI_FAMILY_DESKTOP_APP
11260 
// Factory: builds a multitype_join over two or more sources that uses the
// given scheduler. The join type is left at the class default (non_greedy).
// The items are packed with std::make_tuple and passed, together with
// _PScheduler, to the multitype_join constructor.
11291 template<typename _Type1, typename _Type2, typename... _Types>
11292 multitype_join<std::tuple<_Type1, _Type2, _Types...>>
11293 make_join(Scheduler& _PScheduler, _Type1 _Item1, _Type2 _Item2, _Types... _Items)
11294 {
11295  return multitype_join<std::tuple<_Type1, _Type2, _Types...>>(_PScheduler, std::make_tuple(_Item1, _Item2, _Items...));
11296 }
11297 
11329 
// Factory: builds a multitype_join over two or more sources that uses the
// given schedule group. The join type is left at the class default
// (non_greedy). The items are packed with std::make_tuple.
11330 template<typename _Type1, typename _Type2, typename... _Types>
11331 multitype_join<std::tuple<_Type1, _Type2, _Types...>>
11332 make_join(ScheduleGroup& _PScheduleGroup, _Type1 _Item1, _Type2 _Item2, _Types... _Items)
11333 {
11334  return multitype_join<std::tuple<_Type1, _Type2, _Types...>>(_PScheduleGroup, std::make_tuple(_Item1, _Item2, _Items...));
11335 }
11336 
11337 #endif /* _CRT_USE_WINAPI_FAMILY_DESKTOP_APP */
11338 
11365 
// Factory: builds a multitype_join over two or more sources without naming a
// scheduler or schedule group. The join type is left at the class default
// (non_greedy). The items are packed with std::make_tuple.
11366 template<typename _Type1, typename _Type2, typename... _Types>
11367 multitype_join<std::tuple<_Type1, _Type2, _Types...>>
11368 make_join(_Type1 _Item1, _Type2 _Item2, _Types... _Items)
11369 {
11370  return multitype_join<std::tuple<_Type1, _Type2, _Types...>>(std::make_tuple(_Item1, _Item2, _Items...));
11371 }
11372 
11373 // Templated factory functions that create a *greedy* join, three flavors
11374 #ifdef _CRT_USE_WINAPI_FAMILY_DESKTOP_APP
11375 
// Factory: builds a multitype_join with join type `greedy` over two or more
// sources, using the given scheduler. The items are packed with
// std::make_tuple and passed, together with _PScheduler, to the constructor.
11407 template<typename _Type1, typename _Type2, typename... _Types>
11408 multitype_join<std::tuple<_Type1, _Type2, _Types...>, greedy>
11409 make_greedy_join(Scheduler& _PScheduler, _Type1 _Item1, _Type2 _Item2, _Types... _Items)
11410 {
11411  return multitype_join<std::tuple<_Type1, _Type2, _Types...>, greedy>(_PScheduler, std::make_tuple(_Item1, _Item2, _Items...));
11412 }
11413 
11445 
// Factory: builds a multitype_join with join type `greedy` over two or more
// sources, using the given schedule group. The items are packed with
// std::make_tuple and passed, together with _PScheduleGroup, to the constructor.
11446 template<typename _Type1, typename _Type2, typename... _Types>
11447 multitype_join<std::tuple<_Type1, _Type2, _Types...>, greedy>
11448 make_greedy_join(ScheduleGroup& _PScheduleGroup, _Type1 _Item1, _Type2 _Item2, _Types... _Items)
11449 {
11450  return multitype_join<std::tuple<_Type1, _Type2, _Types...>, greedy>(_PScheduleGroup, std::make_tuple(_Item1, _Item2, _Items...));
11451 }
11452 
11453 #endif /* _CRT_USE_WINAPI_FAMILY_DESKTOP_APP */
11454 
11481 
// Factory: builds a multitype_join with join type `greedy` over two or more
// sources without naming a scheduler or schedule group. The items are packed
// with std::make_tuple and passed to the single-argument constructor.
11482 template<typename _Type1, typename _Type2, typename... _Types>
11483 multitype_join<std::tuple<_Type1, _Type2, _Types...>, greedy>
11484 make_greedy_join(_Type1 _Item1, _Type2 _Item2, _Types... _Items)
11485 {
11486  return multitype_join<std::tuple<_Type1, _Type2, _Types...>, greedy>(std::make_tuple(_Item1, _Item2, _Items...));
11487 }
11488 
11489 //**************************************************************************
11490 // Agents:
11491 //**************************************************************************
11492 
11499 
11504 
11509 
11514 
11519 
11524 
11526 };
11527 
11535 
// Base class for agents. Declares start() and cancel() for controlling the agent, the static
// wait / wait_for_all / wait_for_one helpers (each taking a timeout), and the pure virtual
// run() that derived classes must implement. Copying is disabled: the copy constructor and
// the assignment operator are declared private and left undefined.
11536 class agent
11537 {
11538 public:
11548 
11549  _CRTIMP2 agent();
11550 
11551 #ifdef _CRT_USE_WINAPI_FAMILY_DESKTOP_APP
11552 
11565  _CRTIMP2 agent(Scheduler& _PScheduler);
11566 
11580 
11581  _CRTIMP2 agent(ScheduleGroup& _PGroup);
11582 #endif /* _CRT_USE_WINAPI_FAMILY_DESKTOP_APP */
11583 
11592 
11593  _CRTIMP2 virtual ~agent();
11594 
11601 
11603 
11612 
11614 
11622 
11623  _CRTIMP2 bool start();
11624 
11633 
11634  _CRTIMP2 bool cancel();
11635 
11658 
11659  _CRTIMP2 static agent_status __cdecl wait(_Inout_ agent * _PAgent, unsigned int _Timeout = COOPERATIVE_TIMEOUT_INFINITE);
11660 
11686 
11687  _CRTIMP2 static void __cdecl wait_for_all(size_t _Count, _In_reads_(_Count) agent ** _PAgents,
11688  _Out_writes_opt_(_Count) agent_status * _PStatus = NULL, unsigned int _Timeout = COOPERATIVE_TIMEOUT_INFINITE);
11689 
11717 
11718  _CRTIMP2 static void __cdecl wait_for_one(size_t _Count, _In_reads_(_Count) agent ** _PAgents, agent_status& _Status,
11719  size_t& _Index, unsigned int _Timeout = COOPERATIVE_TIMEOUT_INFINITE);
11720 
11721 protected:
11731 
11732  virtual void run() = 0;
11733 
11746 
11747  _CRTIMP2 bool done();
11748 
11752 
11754 
11755 private:
11756 
11757  // A flag indicating whether the agent can still be started.
11758  // It is initialized to TRUE, and start() and cancel() race to set it to FALSE.
11759  // Once the agent has been started or canceled, further calls to start() or cancel() return false.
11760 
11761  volatile long _M_fStartable;
11762 
11763  // A flag indicating whether the agent can still be canceled.
11764  // It is initialized to TRUE, and cancel() races with the LWT executing a started task
11765  // to set it to FALSE. If cancel() wins, the task will not be
11766  // executed. If the LWT wins, cancel() returns false.
11767 
11768  volatile long _M_fCancelable;
11769 
11770  // A static wrapper function that calls the run() method. Used for scheduling the task.
11771 
11772  static void __cdecl _Agent_task_wrapper(void * data);
11773 
11774  Scheduler * _M_pScheduler;
11775  ScheduleGroup * _M_pScheduleGroup;
11776 
11777  //
11778  // Hide assignment operator and copy constructor
11779  //
11780  agent const &operator =(agent const&); // no assignment operator
11781  agent(agent const &); // no copy constructor
11782 };
11783 
11784 //**************************************************************************
11785 // Direct Messaging APIs:
11786 //**************************************************************************
11787 
11811 
// Implementation shared by the receive() overloads. Links a temporary, function-local target
// (_Blocking_recipient) to _Src, waits up to _Timeout for a message that passes the optional
// filter, and returns a copy of that message's payload.
//   _Src         - source to receive from; std::invalid_argument is thrown if it is NULL.
//   _Timeout     - value passed to the event wait (see _Wait_for_message).
//   _Filter_proc - optional filter; NULL means that no filter is applied.
// Throws operation_timed_out if no message was accepted by the time the wait ends.
11812 template <class _Type>
11813 _Type _Receive_impl(ISource<_Type> * _Src, unsigned int _Timeout, typename ITarget<_Type>::filter_method const* _Filter_proc)
11814 {
11815  // The Blocking Recipient messaging block class is internal to the receive function
11816  class _Blocking_recipient : public ITarget<_Type>
11817  {
11818  public:
11819  // Create a Blocking Recipient without a filter and link it to _PSource
11820  _Blocking_recipient(ISource<_Type> * _PSource,
11821  unsigned int _Timeout = COOPERATIVE_TIMEOUT_INFINITE) :
11822  _M_pFilter(NULL), _M_pConnectedTo(NULL), _M_pMessage(NULL), _M_fState(_NotInitialized), _M_timeout(_Timeout)
11823  {
11824  _Connect(_PSource);
11825  }
11826 
11827  // Create a Blocking Recipient with a filter (copied to the heap when non-NULL) and link it to _PSource
11828  _Blocking_recipient(ISource<_Type> * _PSource,
11829  filter_method const& _Filter,
11830  unsigned int _Timeout = COOPERATIVE_TIMEOUT_INFINITE) :
11831  _M_pFilter(NULL), _M_pConnectedTo(NULL), _M_pMessage(NULL), _M_fState(_NotInitialized), _M_timeout(_Timeout)
11832  {
11833  if (_Filter != NULL)
11834  {
11835  _M_pFilter = new filter_method(_Filter);
11836  }
11837 
11838  _Connect(_PSource);
11839  }
11840 
11841  // Cleans up any resources that may have been created by the BlockingRecipient.
11842  ~_Blocking_recipient()
11843  {
11844  _Disconnect();
11845 
11846  delete _M_pFilter;
11847  delete _M_pMessage;
11848  }
11849 
11850  // Gets the value of the message sent to this BlockingRecipient. Blocks on the internal event
11851  // (see _Wait_for_message) until a message has arrived; throws operation_timed_out otherwise.
11852  _Type _Value()
11853  {
11854  _Wait_for_message();
11855 
11856  return _M_pMessage->payload;
11857  }
11858 
11859  // The main propagation function for ITarget blocks. Called by a source
11860  // block, generally within an asynchronous task to send messages to its targets.
11861  virtual message_status propagate(message<_Type> * _PMessage, ISource<_Type> * _PSource)
11862  {
11863  // Throw exception if the message being propagated to this block is NULL
11864  if (_PMessage == NULL)
11865  {
11866  throw std::invalid_argument("_PMessage");
11867  }
11868 
11869  if (_PSource == NULL)
11870  {
11871  throw std::invalid_argument("_PSource");
11872  }
11873 
11874  // Reject if the recipient has already received a message
11875  if (_M_fState == _Initialized)
11876  {
11877  return declined;
11878  }
11879 
11880  // Reject if the message does not meet the filter requirements
11881  if (_M_pFilter != NULL && !(*_M_pFilter)(_PMessage->payload))
11882  {
11883  return declined;
11884  }
11885 
11886  // Accept the message
11887  _CONCRT_ASSERT(_PSource != NULL);
11888  _M_pMessage = _PSource->accept(_PMessage->msg_id(), this);
11889 
11890  if (_M_pMessage != NULL)
11891  {
11892  // Set the initialized flag on this block; wake the waiter only if it has already blocked
11893  if (_InterlockedExchange(&_M_fState, _Initialized) == _Blocked)
11894  {
11895  _M_ev.set();
11896  }
11897 
11898  return accepted;
11899  }
11900 
11901  return missed;
11902  }
11903 
11904  // Synchronous send entry point required by ITarget. This block never accepts a message
11905  // through send: after validating its arguments it always returns declined.
11906  virtual message_status send(message<_Type> * _PMessage, ISource<_Type> * _PSource)
11907  {
11908  if (_PMessage == NULL)
11909  {
11910  throw std::invalid_argument("_PMessage");
11911  }
11912 
11913  if (_PSource == NULL)
11914  {
11915  throw std::invalid_argument("_PSource");
11916  }
11917 
11918  // Only the connected source is allowed to send messages
11919  // to the blocking recipient. Decline messages without
11920  // a source.
11921 
11922  return declined;
11923  }
11924 
11925  private:
11926 
11927  // Link a source block: remember it and take a reference on it
11928  virtual void link_source(ISource<_Type> * _PSrc)
11929  {
11930  _M_pConnectedTo = _PSrc;
11931  _PSrc->acquire_ref(this);
11932  }
11933 
11934  // Remove a source messaging block for this BlockingRecipient
11935  virtual void unlink_source(ISource<_Type> * _PSource)
11936  {
11937  if (_InterlockedCompareExchangePointer(reinterpret_cast<void *volatile *>(&_M_pConnectedTo), (void *)NULL, _PSource) == _PSource)
11938  {
11939  _PSource->release_ref(this);
11940  }
11941  }
11942 
11943  // Remove the source messaging block for this BlockingRecipient
11944  virtual void unlink_sources()
11945  {
11946  ISource<_Type> * _PSource = reinterpret_cast<ISource<_Type> *>(_InterlockedExchangePointer(reinterpret_cast<void *volatile *>(&_M_pConnectedTo), (void *)NULL));
11947  if (_PSource != NULL)
11948  {
11949  _PSource->unlink_target(this);
11950  _PSource->release_ref(this);
11951  }
11952  }
11953 
11954 
11955  // Connect the blocking recipient to the source
11956  void _Connect(ISource<_Type> * _PSource)
11957  {
11958  if (_PSource == NULL)
11959  {
11960  throw std::invalid_argument("_PSource");
11961  }
11962 
11963  _PSource->link_target(this);
11964  }
11965 
11966  // Cleanup the connection to the blocking recipient's source. There is no need
11967  // to do anything about the associated context.
11968  void _Disconnect()
11969  {
11970  unlink_sources();
11971  }
11972 
11973  // Internal function used to block while waiting for a message to arrive
11974  // at this BlockingRecipient
11975  void _Wait_for_message()
11976  {
11977  bool _Timeout = false;
11978 
11979  // If we haven't received a message yet, cooperatively block.
11980  if (_InterlockedCompareExchange(&_M_fState, _Blocked, _NotInitialized) == _NotInitialized)
11981  {
11982  if (_M_ev.wait(_M_timeout) == COOPERATIVE_WAIT_TIMEOUT)
11983  {
11984  _Timeout = true;
11985  }
11986  }
11987 
11988  // Unlinking from our source guarantees that there are no threads in propagate
11989  _Disconnect();
11990 
11991  if (_M_fState != _Initialized)
11992  {
11993  // We had to have timed out if we came out of the wait
11994  // without being initialized.
11995  _CONCRT_ASSERT(_Timeout);
11996 
11997  throw operation_timed_out();
11998  }
11999  }
12000 
12001  // States for this block
12002  enum
12003  {
12004  _NotInitialized,
12005  _Blocked,
12006  _Initialized
12007  };
12008 
12009  volatile long _M_fState;
12010 
12011  // The source messaging block connected to this Recipient
12012  ISource<_Type> * _M_pConnectedTo;
12013 
12014  // The message that was received
12015  message<_Type> * volatile _M_pMessage;
12016 
12017  // The timeout.
12018  unsigned int _M_timeout;
12019 
12020  // The event we wait upon
12021  event _M_ev;
12022 
12023  // The filter that is called on this block before accepting a message
12024  filter_method * _M_pFilter;
12025  };
12026 
12027  if (_Filter_proc != NULL)
12028  {
12029  _Blocking_recipient _Recipient(_Src, *_Filter_proc, _Timeout);
12030  return _Recipient._Value();
12031  }
12032  else
12033  {
12034  _Blocking_recipient _Recipient(_Src, _Timeout);
12035  return _Recipient._Value();
12036  }
12037 }
12038 
12066 
12067 template <class _Type>
12069 {
12070  return _Receive_impl(_Src, _Timeout, NULL);
12071 }
12072 
12103 
// Filtered receive from a source given by pointer: forwards to _Receive_impl, passing the
// address of _Filter_proc, and returns its result.
12104 template <class _Type>
12105 _Type receive(_Inout_ ISource<_Type> * _Src, typename ITarget<_Type>::filter_method const& _Filter_proc, unsigned int _Timeout = COOPERATIVE_TIMEOUT_INFINITE)
12106 {
12107  return _Receive_impl(_Src, _Timeout, &_Filter_proc);
12108 }
12109 
12137 
12138 template <class _Type>
12140 {
12141  return _Receive_impl(&_Src, _Timeout, NULL);
12142 }
12143 
12174 
// Filtered receive from a source given by reference: forwards to _Receive_impl, passing the
// addresses of _Src and _Filter_proc, and returns its result.
12175 template <class _Type>
12176 _Type receive(ISource<_Type> &_Src, typename ITarget<_Type>::filter_method const& _Filter_proc, unsigned int _Timeout = COOPERATIVE_TIMEOUT_INFINITE)
12177 {
12178  return _Receive_impl(&_Src, _Timeout, &_Filter_proc);
12179 }
12180 
12202 
// Implementation shared by the try_receive() overloads. Links a temporary, function-local
// target (_Immediate_recipient) to _Src, immediately unlinks it again, and reports whether a
// message was accepted while the link existed.
//   _Src         - source to receive from; std::invalid_argument is thrown if it is NULL.
//   _value       - assigned the payload of the accepted message; left untouched otherwise.
//   _Filter_proc - optional filter; NULL means that no filter is applied.
// Returns true if a message was accepted and copied into _value, false otherwise.
12203 template <class _Type>
12204 bool _Try_receive_impl(ISource<_Type> * _Src, _Type & _value, typename ITarget<_Type>::filter_method const * _Filter_proc)
12205 {
12206  // The Immediate Recipient messaging block class is internal to the receive function
12207  class _Immediate_recipient : public ITarget<_Type>
12208  {
12209  public:
12210  // Create an Immediate Recipient without a filter and link it to _PSource
12211  _Immediate_recipient(ISource<_Type> * _PSource) :
12212  _M_pFilter(NULL), _M_pConnectedTo(NULL), _M_pMessage(NULL), _M_isInitialized(0)
12213  {
12214  _Connect(_PSource);
12215  }
12216 
12217  // Create an Immediate Recipient with a filter (copied to the heap when non-NULL) and link it to _PSource
12218  _Immediate_recipient(ISource<_Type> * _PSource,
12219  filter_method const& _Filter) :
12220  _M_pFilter(NULL), _M_pConnectedTo(NULL), _M_pMessage(NULL), _M_isInitialized(0)
12221  {
12222  if (_Filter != NULL)
12223  {
12224  _M_pFilter = new filter_method(_Filter);
12225  }
12226 
12227  _Connect(_PSource);
12228  }
12229 
12230  // Cleans up any resources that may have been created by the ImmediateRecipient.
12231  ~_Immediate_recipient()
12232  {
12233  _Disconnect();
12234 
12235  delete _M_pFilter;
12236  delete _M_pMessage;
12237  }
12238 
12239  // Gets the value of the message sent to this ImmediateRecipient. Returns false if no message was received.
12240  bool _Value(_Type & _value)
12241  {
12242  // Unlinking from our source guarantees that there are no threads in propagate
12243  _Disconnect();
12244 
12245  if (_M_pMessage != NULL)
12246  {
12247  _value = _M_pMessage->payload;
12248  return true;
12249  }
12250 
12251  return false;
12252  }
12253 
12254  // The main propagation function for ITarget blocks. Called by a source
12255  // block, generally within an asynchronous task to send messages to its targets.
12256  virtual message_status propagate(message<_Type> * _PMessage, ISource<_Type> * _PSource)
12257  {
12258  message_status _Result = accepted;
12259 
12260  // Throw exception if the message being propagated to this block is NULL
12261  if (_PMessage == NULL)
12262  {
12263  throw std::invalid_argument("_PMessage");
12264  }
12265 
12266  if (_PSource == NULL)
12267  {
12268  throw std::invalid_argument("_PSource");
12269  }
12270 
12271  // Reject if the recipient has already received a message
12272  if (_M_isInitialized == 1)
12273  {
12274  return declined;
12275  }
12276 
12277  // Reject if the message does not meet the filter requirements
12278  if (_M_pFilter != NULL && !(*_M_pFilter)(_PMessage->payload))
12279  {
12280  return declined;
12281  }
12282 
12283  // Accept the message
12284  _CONCRT_ASSERT(_PSource != NULL);
12285  _M_pMessage = _PSource->accept(_PMessage->msg_id(), this);
12286 
12287  // Set the initialized flag on this block
12288 
12289  if (_M_pMessage != NULL)
12290  {
12291  // Fence to ensure that the above update to _M_pMessage is visible
12292  _InterlockedExchange(&_M_isInitialized, 1);
12293  _Result = accepted;
12294  }
12295  else
12296  {
12297  _Result = missed;
12298  }
12299 
12300  return _Result;
12301  }
12302 
12303 
12304  // Synchronous send entry point required by ITarget. This block never accepts a message
12305  // through send: after validating its arguments it always returns declined.
12306  virtual message_status send(message<_Type> * _PMessage, ISource<_Type> * _PSource)
12307  {
12308  if (_PMessage == NULL)
12309  {
12310  throw std::invalid_argument("_PMessage");
12311  }
12312 
12313  if (_PSource == NULL)
12314  {
12315  throw std::invalid_argument("_PSource");
12316  }
12317 
12318  // Only the connected source is allowed to send messages
12319  // to the immediate recipient. Decline messages without
12320  // a source.
12321 
12322  return declined;
12323  }
12324 
12325  private:
12326 
12327  // Add a source messaging block: remember it and take a reference on it
12328  virtual void link_source(ISource<_Type> * _PSrc)
12329  {
12330  _M_pConnectedTo = _PSrc;
12331  _PSrc->acquire_ref(this);
12332  }
12333 
12334  // Remove a source messaging block for this ImmediateRecipient
12335  virtual void unlink_source(ISource<_Type> * _PSource)
12336  {
12337  if (_InterlockedCompareExchangePointer(reinterpret_cast<void *volatile *>(&_M_pConnectedTo), (void *)NULL, _PSource) == _PSource)
12338  {
12339  _PSource->release_ref(this);
12340  }
12341  }
12342 
12343  // Remove the source messaging block for this ImmediateRecipient
12344  virtual void unlink_sources()
12345  {
12346  ISource<_Type> * _PSource = reinterpret_cast<ISource<_Type> *>(_InterlockedExchangePointer(reinterpret_cast<void *volatile *>(&_M_pConnectedTo), (void *)NULL));
12347  if (_PSource != NULL)
12348  {
12349  _PSource->unlink_target(this);
12350  _PSource->release_ref(this);
12351  }
12352  }
12353 
12354  // Connect to a source block
12355  void _Connect(ISource<_Type> * _PSource)
12356  {
12357  if (_PSource == NULL)
12358  {
12359  throw std::invalid_argument("_PSource");
12360  }
12361 
12362  _CONCRT_ASSERT(_M_isInitialized == 0);
12363 
12364  _PSource->link_target(this);
12365  }
12366 
12367  //
12368  // Cleanup the connection to the immediate recipient's source. There is no need
12369  // to do anything about the associated context.
12370  //
12371  void _Disconnect()
12372  {
12373  unlink_sources();
12374  }
12375 
12376  // The source messaging block connected to this Recipient
12377  ISource<_Type> * _M_pConnectedTo;
12378 
12379  // The message that was received
12380  message<_Type> * volatile _M_pMessage;
12381 
12382  // A flag for whether or not this block has been initialized with a value
12383  volatile long _M_isInitialized;
12384 
12385  // The filter that is called on this block before accepting a message
12386  filter_method * _M_pFilter;
12387  };
12388 
12389  if (_Filter_proc != NULL)
12390  {
12391  _Immediate_recipient _Recipient(_Src, *_Filter_proc);
12392  return _Recipient._Value(_value);
12393  }
12394  else
12395  {
12396  _Immediate_recipient _Recipient(_Src);
12397  return _Recipient._Value(_value);
12398  }
12399 }
12400 
12424 
12425 template <class _Type>
12427 {
12428  return _Try_receive_impl(_Src, _value, NULL);
12429 }
12430 
12457 
// Filtered try_receive from a source given by pointer: forwards to _Try_receive_impl, passing
// the address of _Filter_proc, and returns its result.
12458 template <class _Type>
12459 bool try_receive(_Inout_ ISource<_Type> * _Src, _Type & _value, typename ITarget<_Type>::filter_method const& _Filter_proc)
12460 {
12461  return _Try_receive_impl(_Src, _value, &_Filter_proc);
12462 }
12463 
12487 
12488 template <class _Type>
12490 {
12491  return _Try_receive_impl(&_Src, _value, NULL);
12492 }
12493 
12520 
// Filtered try_receive from a source given by reference: forwards to _Try_receive_impl, passing
// the addresses of _Src and _Filter_proc, and returns its result.
12521 template <class _Type>
12522 bool try_receive(ISource<_Type> & _Src, _Type & _value, typename ITarget<_Type>::filter_method const& _Filter_proc)
12523 {
12524  return _Try_receive_impl(&_Src, _value, &_Filter_proc);
12525 }
12526 
12527 namespace details
12528 {
12529  //**************************************************************************
12530  // Supporting blocks for send and asend
12531  //**************************************************************************
12532 
12533  // Originator block that pushes messages to a target
12534  template <class _Type>
12535  class _AnonymousOriginator : public ISource<_Type>
12536  {
12537  public:
12538 
12540 
12541  // Create an Originator
12543  {
12544  }
12545 
12546  // Cleans up any resources that may have been created by the Originator.
12548  {
12549  delete _M_pMessage;
12550  }
12551 
12552  // Unlinking a target is not supported on _AnonymousOriginator: always throws invalid_operation.
12553  virtual void unlink_target(ITarget<_Type> * _PTarget)
12554  {
12555  throw invalid_operation("unlink_target is not supported on _AnonymousOriginator");
12556  }
12557 
12558  // Unlinking targets is not supported on _AnonymousOriginator: always throws invalid_operation.
12559  virtual void unlink_targets()
12560  {
12561  throw invalid_operation("unlink_targets is not supported on _AnonymousOriginator");
12562  }
12563 
12564  // Accept on this Originator is called by a target to take ownership of a
12565  // propagated message. Returns NULL unless the caller is the target of the current send
 // and _MsgId identifies the pending message.
12566  virtual message<_Type> * accept(runtime_object_identity _MsgId, ITarget<_Type> * _PTarget)
12567  {
12568  if (_PTarget != _M_pTarget)
12569  {
12570  return NULL;
12571  }
12572 
12573  if (_M_pMessage == NULL || _M_pMessage->msg_id() != _MsgId)
12574  {
12575  return NULL;
12576  }
12577 
12578  // The IDs match, actually transfer ownership of the message
12579  // to the calling target
12580  message<_Type> * _Result = _M_pMessage;
12581 
12582  // The ownership of this message has changed. Set the internal pointer to NULL
12583  // so it won't be deleted in the destructor
12584  _M_pMessage = NULL;
12585 
12586  return _Result;
12587  }
12588 
12589  // Reserve is not supported on _AnonymousOriginator: always throws invalid_operation.
12590  virtual bool reserve(runtime_object_identity _MsgId, ITarget<_Type> * _PTarget)
12591  {
12592  throw invalid_operation("reserve is not supported on _AnonymousOriginator");
12593  }
12594 
12595  // Consume is not supported on _AnonymousOriginator: always throws invalid_operation.
12596  virtual message<_Type> * consume(runtime_object_identity _MsgId, ITarget<_Type> * _PTarget)
12597  {
12598  throw invalid_operation("consume is not supported on _AnonymousOriginator");
12599  }
12600 
12601  // Release needs to be defined for ISource blocks, but _AnonymousOriginator never
12602  // hands out reservations (reserve throws as well), so this override
12603  // always throws invalid_operation.
12604  virtual void release(runtime_object_identity _MsgId, ITarget<_Type> * _PTarget)
12605  {
12606  throw invalid_operation("release is not supported on _AnonymousOriginator");
12607  }
12608 
12610  {
12611  throw invalid_operation("acquire_ref is not supported on _AnonymousOriginator");
12612  }
12613 
12615  {
12616  throw invalid_operation("release_ref is not supported on _AnonymousOriginator");
12617  }
12618 
12619  private:
12620  friend class _Originator;
12621 
12622  // Send the given value to the target by calling the target's send(). Returns true if the target accepted it.
12623  bool _internal_send(ITarget<_Type> * _PTarget, _Type const & _Value)
12624  {
12625  _M_pTarget = _PTarget;
12626 
12628  _CONCRT_ASSERT(_M_pTarget->supports_anonymous_source());
12629 
12630  // Create the message
12631  message_status _Status = declined;
12632  message<_Type> * _Msg = new message<_Type>(_Value);
12633 
12635  _M_pMessage = _Msg;
12636 
12637  // Send the message
12638  _Status = _M_pTarget->send(_M_pMessage, this);
12639 
12640  // If the message is declined, the destructor will
12641  // delete the message
12642 
12643  // status should not be postponed.
12644  _CONCRT_ASSERT(_Status != postponed);
12645  return (_Status == accepted);
12646  }
12647 
 // Offer the given value to the target by calling the target's propagate(). Returns true if
 // the target accepted it; throws invalid_operation if the target postponed the message.
12648  bool _internal_asend(ITarget<_Type> * _PTarget, _Type const & _Value)
12649  {
12650  _M_pTarget = _PTarget;
12651 
12653  _CONCRT_ASSERT(_M_pTarget->supports_anonymous_source());
12654 
12655  // Create the message
12656  message_status _Status = declined;
12657  message<_Type> * _Msg = new message<_Type>(_Value);
12658 
12660  _M_pMessage = _Msg;
12661 
12662  // Propagate the message
12663  _Status = _M_pTarget->propagate(_M_pMessage, this);
12664 
12665  // If the message is declined, the destructor will
12666  // delete the message
12667 
12668  // status should not be postponed.
12669  if (_Status == postponed)
12670  {
12671  throw invalid_operation("Messages offered by _AnonymousOriginator shall not be postponed");
12672  }
12673 
12674  return (_Status == accepted);
12675  }
12676 
12677  // Linking a target is not supported on _AnonymousOriginator: always throws invalid_operation.
12678  virtual void link_target(ITarget<_Type> * _PTarget)
12679  {
12680  throw invalid_operation("link_target is not supported on _AnonymousOriginator");
12681  }
12682 
12683  // The message that will be propagated by the Originator
12685 
12686  // The single target for this block
12688  };
12689 
12690  // The Originator messaging block class is internal to the send function.
12691  template <class _Type>
12692  class _SyncOriginator : public ISource<_Type>
12693  {
12694  public:
12695 
12697 
12698  // Create an Originator
12700  _M_pMessage(NULL),
12703  {
12704  }
12705 
12706  // Cleans up any resources that may have been created by the Originator.
12708  {
12709  unlink_targets();
12710 
12711  _Wait_on_ref();
12712 
12713  delete _M_pMessage;
12714  }
12715 
12716  // Removes a target messaging block for this Originator. If the target was linked, the send is marked complete with status declined.
12717  virtual void unlink_target(ITarget<_Type> * _PTarget)
12718  {
12719  if (_PTarget == NULL)
12720  {
12721  throw std::invalid_argument("_PTarget");
12722  }
12723  {
12724  // Hold the lock to ensure that the target doesn't unlink while
12725  // propagation is in progress.
12726  _R_lock _Lock(_M_internalLock);
12727  if (_M_connectedTargets.remove(_PTarget))
12728  {
12729  _Invoke_unlink_source(_PTarget);
12730 
12731  // Indicate that the send is complete
12732  _Done(declined);
12733  }
12734  }
12735  }
12736 
12737  // Removes every target messaging block from this Originator, then marks the send complete with status declined.
12738  virtual void unlink_targets()
12739  {
12740  // Hold the lock to ensure that the target doesn't unlink while
12741  // propagation is in progress.
12742  _R_lock _Lock(_M_internalLock);
12743 
12744  for (_Target_registry::iterator _Iter = _M_connectedTargets.begin(); *_Iter != NULL; ++_Iter)
12745  {
12746  ITarget<_Type> * _PTarget = *_Iter;
12747  if (_M_connectedTargets.remove(_PTarget))
12748  {
12749  _Invoke_unlink_source(_PTarget);
12750  }
12751  }
12752 
12753  // All targets should be unlinked
12755 
12756  // Indicate that the send is complete
12757  _Done(declined);
12758  }
12759 
12760  // Accept on this Originator is called by a target to take ownership of a
12761  // propagated message. Returns NULL unless the caller is a connected target and
 // _MsgId identifies the pending message; on success the send is marked accepted.
12762  virtual message<_Type> * accept(runtime_object_identity _MsgId, ITarget<_Type> * _PTarget)
12763  {
12764  if (_PTarget == NULL)
12765  {
12766  return NULL;
12767  }
12768 
12769  if (!_M_connectedTargets.contains(_PTarget))
12770  {
12771  return NULL;
12772  }
12773 
12774  if (_M_pMessage == NULL || _M_pMessage->msg_id() != _MsgId)
12775  {
12776  return NULL;
12777  }
12778 
12779  // The IDs match, actually transfer ownership of the message
12780  // to the calling target
12781  message<_Type> * _Result = _M_pMessage;
12782 
12783  // The ownership of this message has changed. Set the internal pointer to NULL
12784  // so it won't be deleted in the destructor
12785  _M_pMessage = NULL;
12786 
12787  // The message has been accepted/consumed, propagate indication that it has succeeded
12788  _Done(accepted);
12789 
12790  return _Result;
12791  }
12792 
12793  // Reserve is called by a target to check that a previously offered message is still
12794  // available. The Originator keeps no reservation state; it only confirms that the caller
12795  // is a connected target and that the pending message has the given ID.
 // Returns true if the message can be consumed, false otherwise.
 // Throws std::invalid_argument if _PTarget is NULL.
12796  virtual bool reserve(runtime_object_identity _MsgId, ITarget<_Type> * _PTarget)
12797  {
12798  if (_PTarget == NULL)
12799  {
12800  throw std::invalid_argument("_PTarget");
12801  }
12802 
12803  if (!_M_connectedTargets.contains(_PTarget))
12804  {
12805  return false;
12806  }
12807 
 // _M_pMessage is reset to NULL once accept() has transferred ownership; report
 // "nothing to reserve" in that case instead of dereferencing a null pointer.
12808  if (_M_pMessage == NULL || _M_pMessage->msg_id() != _MsgId)
12809  {
12810  return false;
12811  }
12812 
12813  return true;
12814  }
12815 
12816  // Consume is called by a target messaging block to take ownership of a
12817  // previously reserved message. Validates the target, then delegates to accept().
12818  virtual message<_Type> * consume(runtime_object_identity _MsgId, ITarget<_Type> * _PTarget)
12819  {
12820  if (_PTarget == NULL)
12821  {
12822  throw std::invalid_argument("_PTarget");
12823  }
12824 
12825  if (!_M_connectedTargets.contains(_PTarget))
12826  {
12827  throw bad_target();
12828  }
12829 
12830  return accept(_MsgId, _PTarget);
12831  }
12832 
12833  // Release is called by a target to give up a previously reserved message. The Originator
12834  // keeps no reservation state; after validating the target and the message ID it simply
12835  // marks the send complete with status declined.
12836  virtual void release(runtime_object_identity _MsgId, ITarget<_Type> * _PTarget)
12837  {
12838  if (_PTarget == NULL)
12839  {
12840  throw std::invalid_argument("_PTarget");
12841  }
12842 
12843  if (!_M_connectedTargets.contains(_PTarget))
12844  {
12845  throw bad_target();
12846  }
12847 
12848  if ((_M_pMessage == NULL) || (_M_pMessage->msg_id() != _MsgId))
12849  {
12850  throw message_not_found();
12851  }
12852 
12853  // If the previously reserved message is released, then propagate
12854  // declined to indicate that the message was not accepted.
12855  _Done(declined);
12856  }
12857 
12859  {
12861  }
12862 
12864  {
12866  }
12867 
12868  private:
12869 
12870  friend class _Originator;
12871 
12872  // Send the given value to the target synchronously. Returns true if the final status is accepted.
12873  bool _internal_send(ITarget<_Type> * _PTarget, _Type const & _Value)
12874  {
12875  // _send should only be called once.
12876  if (_PTarget == NULL)
12877  {
12878  throw std::invalid_argument("_PTarget");
12879  }
12880 
12881  message_status _Status = declined;
12882  message<_Type> * _Msg = new message<_Type>(_Value);
12883 
12884  {
12885  // Hold the lock to ensure that the target doesn't unlink while
12886  // propagation is in progress.
12887  _R_lock _Lock(_M_internalLock);
12888 
12889  // link to the target, publish the message and send it
12890  link_target(_PTarget);
12891 
12893  _M_pMessage = _Msg;
12894 
12895  // Send the message synchronously to the target
12896  _Status = _PTarget->send(_M_pMessage, this);
12897  }
12898 
12899  if (_Status == postponed)
12900  {
12901  // If the target postponed the message, wait for it to
12902  // be accepted/declined.
12904 
12905  // Procure the final status (recorded by _Done)
12906  _Status = _M_fStatus;
12907  }
12908 
12909  // status should not be postponed.
12910  _CONCRT_ASSERT(_Status != postponed);
12911 
12912  return (_Status == accepted);
12913  }
12914 
12915  // Add a target messaging block for this Originator and notify it of its new source. Throws std::invalid_argument if _PTarget is NULL.
12916  virtual void link_target(ITarget<_Type> * _PTarget)
12917  {
12918  if (_PTarget == NULL)
12919  {
12920  throw std::invalid_argument("_PTarget");
12921  }
12922 
12923  _M_connectedTargets.add(_PTarget);
12924  _Invoke_link_source(_PTarget);
12925 
12926  // There should be no pending messages to propagate at this time.
12928  }
12929 
12930  // Wait for the status to reach one of the terminal
12931  // states (!= postponed)
12933  {
12934  // Wait for the event to be signalled
12937 
12938  }
12939 
12941  {
12943  while(_M_referenceCount != 0)
12944  {
12945  spinWait._SpinOnce();
12946  }
12947  }
12948 
12949  // Indicate that the send operation has completed: record the final status and signal the completion event.
12950  void _Done(message_status _Status)
12951  {
12952  // postponed is not a done state
12953  _CONCRT_ASSERT(_Status != postponed);
12954 
12955  _M_fStatus = _Status;
12956  _M_ev.set();
12957  }
12958 
12959  // The message that will be propagated by the Originator
12961 
12962  // Event to indicate completion
12963  event _M_ev;
12964 
12965  // Final status of the send
12967 
12968  // A lock for modifying the buffer or the connected blocks
12970 
12971  // Connected targets
12972  _Target_registry _M_connectedTargets;
12973 
12974  volatile long _M_referenceCount;
12975  };
12976 
12977  // The Originator messaging block class is internal to the send function.
12978  template <class _Type>
12979  class _AsyncOriginator : public ISource<_Type>
12980  {
12981  public:
12982 
12984 
12985  // Cleans up any resources that may have been created by the AsyncOriginator.
12987  {
12988  unlink_targets();
12989 
12990  delete _M_pMessage;
12991  }
12992 
12993  // Removes a target messaging block for this AsyncOriginator. If the target was linked, one reference is released afterwards.
12994  virtual void unlink_target(ITarget<_Type> * _PTarget)
12995  {
12996  if (_PTarget == NULL)
12997  {
12998  throw std::invalid_argument("_PTarget");
12999  }
13000 
13001  bool _Unlinked = false;
13002  {
13003  // Hold the lock to ensure that the target doesn't unlink while
13004  // propagation is in progress.
13005  _R_lock _Lock(_M_internalLock);
13006 
13007  if (_M_connectedTargets.remove(_PTarget))
13008  {
13009  _Invoke_unlink_source(_PTarget);
13010  _Unlinked = true;
13011  }
13012  }
13013 
13014  // Release the lock before decrementing the refcount. Otherwise, the
13015  // lock release could corrupt memory.
13016  if (_Unlinked)
13017  {
13018  _Release_ref();
13019  }
13020  }
13021 
13022  // Removes the target messaging block(s) from this AsyncOriginator. If any target was unlinked, one reference is released afterwards.
13023  virtual void unlink_targets()
13024  {
13025  bool _Unlinked = false;
13026  {
13027  // Hold the lock to ensure that the target doesn't unlink while
13028  // propagation is in progress.
13029  _R_lock _Lock(_M_internalLock);
13030 
13032  *_Iter != NULL;
13033  ++_Iter)
13034  {
13035  ITarget<_Type> * _PTarget = *_Iter;
13036  if (_M_connectedTargets.remove(_PTarget))
13037  {
13038  _Invoke_unlink_source(_PTarget);
13039  _Unlinked = true;
13040  }
13041 
13042  }
13043 
13044  // All targets should be unlinked
13046  }
13047 
13048  // Release the lock before decrementing the refcount. Otherwise, the
13049  // lock release could corrupt memory.
13050  if (_Unlinked)
13051  {
13052  _Release_ref();
13053  }
13054  }
13055 
13056  // Accept on this AsyncOriginator is called by a target to take ownership of a
13057  // propagated message. This can only be called from propagate. Returns NULL unless the
 // caller is a connected target and _MsgId identifies the pending message.
13058  virtual message<_Type> * accept(runtime_object_identity _MsgId, ITarget<_Type> * _PTarget)
13059  {
13060  if (_PTarget == NULL)
13061  {
13062  return NULL;
13063  }
13064 
13065  if (!_M_connectedTargets.contains(_PTarget))
13066  {
13067  return NULL;
13068  }
13069 
13070  if (_M_pMessage == NULL || _M_pMessage->msg_id() != _MsgId)
13071  {
13072  return NULL;
13073  }
13074 
13075  //
13076  // If the IDs match, actually transfer ownership of the message.
13077  //
13078  message<_Type> * _Result = _M_pMessage;
13079  _M_pMessage = NULL;
13080 
13081  return _Result;
13082  }
13083 
13084  // Reserve is called by a target to check that a previously offered message is still
13085  // available. The AsyncOriginator keeps no reservation state; it only confirms that the
13086  // caller is a connected target and that a pending message with the given ID exists.
13087  virtual bool reserve(runtime_object_identity _MsgId, ITarget<_Type> * _PTarget)
13088  {
13089  if (_PTarget == NULL)
13090  {
13091  throw std::invalid_argument("_PTarget");
13092  }
13093 
13094  if (!_M_connectedTargets.contains(_PTarget))
13095  {
13096  return false;
13097  }
13098 
13099  if (_M_pMessage == NULL || _M_pMessage->msg_id() != _MsgId)
13100  {
13101  return false;
13102  }
13103 
13104  return true;
13105  }
13106 
13107  // Consume is called by a target messaging block to take ownership of a
13108  // previously reserved message.
       //
       // Throws std::invalid_argument when _PTarget is NULL and bad_target when _PTarget
       // is not in _M_connectedTargets. Returns NULL when there is no held message whose
       // msg_id() equals _MsgId. Otherwise clears _M_pMessage, unlinks _PTarget and
       // returns the message, which the caller then owns.
13109  virtual message<_Type> * consume(runtime_object_identity _MsgId, ITarget<_Type> * _PTarget)
13110  {
13111  if (_PTarget == NULL)
13112  {
13113  throw std::invalid_argument("_PTarget");
13114  }
13115 
13116  if (!_M_connectedTargets.contains(_PTarget))
13117  {
13118  throw bad_target();
13119  }
13120 
13121  if (_M_pMessage == NULL || _M_pMessage->msg_id() != _MsgId)
13122  {
13123  return NULL;
13124  }
13125 
13126  // The ownership of this message has changed. Set the internal pointer to NULL
13127  // so it won't be deleted in the destructor
13128 
       // The result is copied into a local first because nothing on this object may be
       // read once unlink_target has run (see the warning below).
13129  message<_Type> * _Result = _M_pMessage;
13130  _M_pMessage = NULL;
13131 
13132  // We are done. Unlink from the target. DO NOT TOUCH "this" pointer after unlink
13133  unlink_target(_PTarget);
13134 
13135  return _Result;
13136  }
13137 
13138  // Release needs to be defined for ISource blocks, but AsyncOriginator doesn't need to
13139  // do anything for reservation release because there can only be one target block to read
13140  // the data at a later time.
       //
       // After validating the request, the only action taken is to unlink _PTarget.
       // Throws std::invalid_argument when _PTarget is NULL, bad_target when _PTarget is
       // not in _M_connectedTargets, and message_not_found when there is no held message
       // whose msg_id() equals _MsgId.
13141  virtual void release(runtime_object_identity _MsgId, ITarget<_Type> * _PTarget)
13142  {
13143  if (_PTarget == NULL)
13144  {
13145  throw std::invalid_argument("_PTarget");
13146  }
13147 
13148  if (!_M_connectedTargets.contains(_PTarget))
13149  {
13150  throw bad_target();
13151  }
13152 
13153  if ((_M_pMessage == NULL) || (_M_pMessage->msg_id() != _MsgId))
13154  {
13155  throw message_not_found();
13156  }
13157 
13158  // We can be connected to only 1 target. Unlink from the target.
13159  // DO NOT TOUCH "this" pointer after unlink
13160  unlink_target(_PTarget);
13161  }
13162 
13164  {
13165  _Acquire_ref();
13166  }
13167 
13169  {
13170  _Release_ref();
13171  }
13172 
13173  private:
13174 
13175  friend class _Originator;
13176 
13177  // Create an AsyncOriginator (constructor is private to ensure that
13178  // it is allocated on the heap).
13180  _M_pMessage(NULL),
13181  _M_refcount(0)
13182  {
13183  }
13184 
13185  // Send the given value to the target
       //
       // Wraps _Value in a newly allocated message, links to _PTarget, offers the message
       // through _PTarget->propagate and returns true only when the target's status is
       // 'accepted'. When the status is 'postponed' the link is left in place; for any
       // other status the target is unlinked before returning.
       //
       // NOTE(review): link_target throws std::invalid_argument for a NULL _PTarget, and
       // _Originator::_asend routes a NULL target to this function. On that path the
       // message allocated below is never stored or deleted, and the reference taken at
       // the top is never released - confirm whether this leak is intended/acceptable.
13186  bool _internal_send(ITarget<_Type> * _PTarget, _Type const & _Value)
13187  {
13188  // Keep a refcount so that this object doesn't get deleted if
13189  // the target decides to unlink before we release our lock
13190  _Acquire_ref();
13191 
13192  message_status _Status = declined;
13193  message<_Type> * _Msg = new message<_Type>(_Value);
13194 
13195  {
13196  // Hold the lock to ensure that the target doesn't unlink while
13197  // propagation is in progress.
13198  _R_lock _Lock(_M_internalLock);
13199 
13200  // link to the target, create a message and send it
13201  link_target(_PTarget);
13202 
       // NOTE(review): listing line 13203 is absent from this rendering.
13204  _M_pMessage = _Msg;
13205 
13206  _Status = _PTarget->propagate(_M_pMessage, this);
13207  }
13208 
13209  // If the status is anything other than postponed, unlink away
13210  // from the target and delete the AsyncOriginator.
13211  if (_Status != postponed)
13212  {
13213  unlink_target(_PTarget);
13214  }
13215 
13216  // Release the reference acquired above
13217  _Release_ref();
13218 
13219  return (_Status == accepted);
13220  }
13221 
13222  // Add a target messaging block for this AsyncOriginator
       //
       // Throws std::invalid_argument when _PTarget is NULL. Otherwise takes one reference
       // on this object, records _PTarget in _M_connectedTargets and notifies the target
       // through _Invoke_link_source.
13223  virtual void link_target(ITarget<_Type> * _PTarget)
13224  {
13225  if (_PTarget == NULL)
13226  {
13227  throw std::invalid_argument("_PTarget");
13228  }
13229 
13230  // Acquire a reference that will be released by unlink_target
13231  _Acquire_ref();
13232  _M_connectedTargets.add(_PTarget);
13233  _Invoke_link_source(_PTarget);
13234 
13235  // There should be no pending messages to propagate at this time.
       // NOTE(review): listing line 13236 (presumably the check described by the comment
       // above) is absent from this rendering.
13237 
13238  }
13239 
13240  // Acquire a reference on the async originator object
13242  {
13244  }
13245 
13246  // Release the reference on the async originator object. The object
13247  // will be deleted when the reference count goes to 0.
13249  {
13252  {
13253  delete this;
13254  }
13255  }
13256 
13257  // The message that will be propagated by the AsyncOriginator
13259 
13260  // Reference count to manage object lifetime
13261  volatile long _M_refcount;
13262 
13263  // The internal lock for this block for its message
13265 
13266  // connected targets
13267  _Target_registry _M_connectedTargets;
13268  };
13269 
13270  // static class that exposes methods to initiate messages into
13271  // a dataflow network
13273  {
13274  public:
13275 
13276  // Synchronous initiation of messages
       //
       // Picks the originator used to deliver _Data to _Trg and returns that originator's
       // _internal_send result. A non-NULL target whose supports_anonymous_source()
       // returns true is served by an _AnonymousOriginator; every other case, including
       // a NULL _Trg, is served by a _SyncOriginator. Both originators are local objects
       // of this call.
13277  template <class _Type>
13278  static bool _send(ITarget<_Type> * _Trg, const _Type& _Data)
13279  {
13280  if (_Trg != NULL && _Trg->supports_anonymous_source())
13281  {
13282  // _send will block until the message is accepted/rejected.
13283  // Note that this invokes the send method on the target which
13284  // would synchronously process the message.
13285  _AnonymousOriginator<_Type> _Send_block;
13286  return _Send_block._internal_send(_Trg, _Data);
13287  }
13288  else
13289  {
13290  // Create a blocking originator on the stack. _send will block until the
13291  // message is accepted/rejected.
13292  _SyncOriginator<_Type> _Orig;
13293  return _Orig._internal_send(_Trg, _Data);
13294  }
13295  }
13296 
13297  // Asynchronous initiation of messages
       //
       // A non-NULL target whose supports_anonymous_source() returns true is served by a
       // local _AnonymousOriginator via _internal_asend; every other case, including a
       // NULL _Trg, goes through _AsyncOrig->_internal_send. Returns whatever the chosen
       // originator returns.
13298  template <class _Type>
13299  static bool _asend(ITarget<_Type> * _Trg, const _Type& _Data)
13300  {
13301  // If the block can participate in posting messages without requiring a call back, use that
13302  // method of initiating the message for efficiency purposes.
13303  if (_Trg != NULL && _Trg->supports_anonymous_source())
13304  {
13305  _AnonymousOriginator<_Type> _Asend_block;
13306  return _Asend_block._internal_asend(_Trg, _Data);
13307  }
13308  else
13309  {
13310  // Needs to be allocated on the heap
       // NOTE(review): listing line 13311, which declares and allocates _AsyncOrig, is
       // absent from this rendering.
13312  return _AsyncOrig->_internal_send(_Trg, _Data);
13313  }
13314  }
13315  };
13316 
13317 } // namespace details
13318 
13340 
      // Synchronous send, pointer overload: forwards _Trg and _Data unchanged to
      // details::_Originator::_send and returns its result.
      // _Trg  - target block to receive the data (passed through as-is, NULL included).
      // _Data - payload to send.
13341 template <class _Type>
13342 bool send(_Inout_ ITarget<_Type> * _Trg, const _Type& _Data)
13343 {
13344  return details::_Originator::_send(_Trg, _Data);
13345 }
13346 
13347 
13369 
      // Synchronous send, reference overload: takes the address of _Trg and delegates
      // to the pointer overload of send, returning its result.
13370 template <class _Type>
13371 bool send(ITarget<_Type> &_Trg, const _Type &_Data)
13372 {
13373  return send(&_Trg, _Data);
13374 }
13375 
13397 
      // Asynchronous send, pointer overload: forwards _Trg and _Data unchanged to
      // details::_Originator::_asend and returns its result.
      // _Trg  - target block to receive the data (passed through as-is, NULL included).
      // _Data - payload to send.
13398 template <class _Type>
13399 bool asend(_Inout_ ITarget<_Type> * _Trg, const _Type& _Data)
13400 {
13401  return details::_Originator::_asend(_Trg, _Data);
13402 }
13403 
13404 
13426 
      // Asynchronous send, reference overload: takes the address of _Trg and delegates
      // to the pointer overload of asend, returning its result.
13427 template <class _Type>
13428 bool asend(ITarget<_Type> &_Trg, const _Type &_Data)
13429 {
13430  return asend(&_Trg, _Data);
13431 }
13432 
      // NOTE(review): the only statement of this function (listing line 13448) is absent
      // from this rendering, so its behaviour cannot be described from here. Judging by
      // the name and parameters it presumably associates _Name with _PObject for
      // tracing - confirm against the original header.
13445 template <class _Type>
13446 void Trace_agents_register_name(_Inout_ _Type * _PObject, _In_z_ const wchar_t * _Name)
13447 {
13449 }
13450 
13451 } // namespace Concurrency
13452 
13453 namespace concurrency = Concurrency;
13454 
13455 #pragma warning(pop)
13456 #pragma pack(pop)
virtual message< size_t > * accept_message(runtime_object_identity _MsgId)
Accept the message by making a copy of the payload.
Definition: agents.h:9603
void decline_incoming_messages()
Indicates to the block that new messages should be declined.
Definition: agents.h:4203
_Message * _Peek()
Definition: agents.h:226
call(_Call_method const &_Func, filter_method const &_Filter)
Constructs a call messaging block.
Definition: agents.h:5495
virtual void unlink_source(_Inout_ ISource< _Source_type > *_PSource)
Unlinks a specified source block from this target_block object.
Definition: agents.h:3009
volatile bool _M_fIsInitialized
Definition: agents.h:7409
multitype_join const & operator=(multitype_join const &)
volatile long _M_lwtCount
A counter to indicate the number of outstanding LWTs
Definition: agents.h:2415
void _Propagate_priority_order(::Concurrency::details::_Queue< message< _Target_type >> &_MessageBuffer)
Propagate messages in priority order.
Definition: agents.h:7988
void _Delete_choices()
Deletes all _Reserving_node elements that were created in _Initialize_choices.
Definition: agents.h:10094
virtual ~_AsyncOriginator()
Definition: agents.h:12986
single_assignment()
Constructs a single_assignment messaging block.
Definition: agents.h:6920
virtual bool supports_anonymous_source()
Overrides the supports_anonymous_source method to indicate that this block can accept messages offere...
Definition: agents.h:4539
void _Swap(_Myt &_Right)
Definition: agents.h:361
volatile long _M_referenceCount
Definition: agents.h:3897
_MessageProcessorType _M_messageProcessor
Processor used for asynchronous message handling
Definition: agents.h:3839
virtual message< _Target_type > * consume(runtime_object_identity _MsgId, _Inout_ ITarget< _Target_type > *_PTarget)
Consumes a message previously offered by this source_block object and successfully reserved by the ta...
Definition: agents.h:3457
void stop()
Stops the timer messaging block.
Definition: agents.h:6606
virtual void unlink_sources()
Unlinks all source blocks from this target_block object.
Definition: agents.h:3022
~single_assignment()
Destroys the single_assignment messaging block.
Definition: agents.h:7053
ISource< _Type > * _M_pReservedSource
Definition: agents.h:8851
::Concurrency::details::_Queue< message< _Output > > _M_messageBuffer
Message queue used to store outbound messages
Definition: agents.h:6403
~multitype_join()
Destroys the multitype_join messaging block.
Definition: agents.h:10988
virtual ~ISource()
Destroys the ISource object.
Definition: agents.h:2604
multitype_join(multitype_join &&_Join)
Constructs a multitype_join messaging block.
Definition: agents.h:10965
void _Invoke_handler(message< _Type > *_Msg)
Definition: agents.h:2349
_TargetLinkRegistry::iterator target_iterator
The iterator to walk the connected targets.
Definition: agents.h:3242
size_t _M_index
Definition: agents.h:421
_Reserving_node const & operator=(_Reserving_node const &)
virtual message_status propagate_message(_Inout_ message< _Source_type > *_PMessage, _Inout_ ISource< _Source_type > *_PSource)=0
When overridden in a derived class, this method asynchronously passes a message from an ISource block...
virtual message< size_t > * consume(runtime_object_identity _MsgId, _Inout_ ITarget< size_t > *_PTarget)
Consumes a message previously offered by this choice messaging block and successfully reserved by the...
Definition: agents.h:9996
virtual void propagate_to_any_targets(_Inout_opt_ message< _Output > *)
Executes the transformer function on the input messages.
Definition: agents.h:6272
volatile long _M_referenceCount
Definition: agents.h:12974
propagator_block()
Constructs a propagator_block object.
Definition: agents.h:3952
unbounded_buffer()
Constructs an unbounded_buffer messaging block.
Definition: agents.h:4280
This class describes an exception thrown when an invalid operation is performed that is not more accu...
Definition: concrt.h:1705
void _Initialize(const _Type &_Value, _Inout_ ITarget< _Type > *_PTarget, bool _Repeating, _Inout_opt_ Scheduler *_PScheduler=NULL, _Inout_opt_ ScheduleGroup *_PScheduleGroup=NULL)
Common initialization.
Definition: agents.h:6826
virtual message< size_t > * accept(runtime_object_identity _MsgId, _Inout_ ITarget< size_t > *_PTarget)
Accepts a message that was offered by this choice block, transferring ownership to the caller...
Definition: agents.h:9949
reference operator[](size_t _Pos)
Definition: agents.h:335
Definition: agents.h:254
virtual message< _Type > * consume_message(runtime_object_identity _MsgId)
Consumes a message previously offered by the single_assignment and reserved by the target...
Definition: agents.h:7301
void _Initialize_order_node(ISource< _Type > *_PSource, size_t _Index, ITarget< size_t > *_PTarget, Scheduler *_PScheduler=NULL, ScheduleGroup *_PScheduleGroup=NULL)
Validate constructor arguments and fully connect this _Order_node_base.
Definition: agents.h:8440
void _Propagate_message()
Definition: agents.h:3868
ScheduleGroup * _M_pScheduleGroup
Definition: agents.h:10123
std::tr1::function< void(void)> _Propagator_method
The signature of the callback method invoked while propagating messages.
Definition: agents.h:2021
_Non_greedy_node(Scheduler &_PScheduler, ISource< _Type > *_PSource, size_t _Index, ITarget< size_t > *_PTarget, filter_method const &_Filter)
Constructs a _Non_greedy_node within the specified scheduler, and places it on any schedule group of ...
Definition: agents.h:9360
_CRTIMP _In_ int _Value
Definition: setjmp.h:190
ISource< _Type > * _M_pReservedSource
Definition: agents.h:9655
_Message ** _M_ppTail
Definition: agents.h:112
_Reserving_node(Scheduler &_PScheduler, ISource< _Type > *_PSource, size_t _Index, ITarget< size_t > *_PTarget=NULL)
Constructs a _Reserving_node within the specified scheduler, and places it on any schedule group of t...
Definition: agents.h:8568
virtual void process_input_messages(_Inout_ message< _Target_type > *_PMessage)
Process input messages. This is only useful for propagator blocks, which derive from source_block ...
Definition: agents.h:3689
choice(choice &&_Choice)
Constructs a choice messaging block.
Definition: agents.h:9811
~_MessageArray()
Definition: agents.h:8218
_MessageArray _M_messageArray
Definition: agents.h:8225
virtual void unlink_target(_Inout_ ITarget< _Type > *_PTarget)=0
When overridden in a derived class, unlinks a target block from this ISource block, if found to be previously linked.
basic_ostream< _Elem, _Traits > & _Out(basic_ostream< _Elem, _Traits > &_Os, _Ty _Dx)
Definition: random:169
virtual bool reserve_message(runtime_object_identity _MsgId)
Reserves a message previously offered by this join messaging block.
Definition: agents.h:7800
virtual void _Reset()=0
Resets the _Order_node_base and prepares it for the next propagation
_Type _M_sourceTuple
Definition: agents.h:10811
Scheduler * _M_pScheduler
Definition: agents.h:10120
virtual void release_message(runtime_object_identity _MsgId)
Releases a previous message reservation.
Definition: agents.h:5278
void _Process_message(message< _Target_type > *_PMessage)
Definition: agents.h:3859
agent const & operator=(agent const &)
_CRTIMP size_t wait(unsigned int _Timeout=COOPERATIVE_TIMEOUT_INFINITE)
Waits for the event to become signaled.
void _Consume_received_message()
Called for a non_greedy type join block in order to consume the message in this join block that has b...
Definition: agents.h:9508
virtual message_status send_message(_Inout_ message< _Type > *_PMessage, _Inout_ ISource< _Type > *_PSource)
Synchronously passes a message from an ISource block to this single_assignment messaging block...
Definition: agents.h:7178
bool _Is_head(runtime_object_identity _MsgId)
Definition: agents.h:232
bool _Non_greedy_acquire_messages()
Tries to acquire all of the messages from the _Non_greedy_nodes. Each node has already indicated that...
Definition: agents.h:10612
void _Wait_for_completion()
Definition: agents.h:12932
virtual message< _Target_type > * consume_message(runtime_object_identity _MsgId)=0
When overridden in a derived class, consumes a message that was previously reserved.
virtual bool reserve_message(runtime_object_identity _MsgId)
Reserves a message previously offered by this overwrite_buffer messaging block.
Definition: agents.h:5210
event _M_ev
Definition: agents.h:12963
static _CRTIMP void __cdecl _ScheduleTask(TaskProc _Proc, void *_Data)
::Concurrency::runtime_object_identity _M_id
Definition: agents.h:100
static bool _send(ITarget< _Type > *_Trg, const _Type &_Data)
Definition: agents.h:13278
~_Non_greedy_node()
Cleans up any resources that may have been created by the _Order_node.
Definition: agents.h:9427
State _M_state
Definition: agents.h:6781
source_link_manager< _SourceLinkRegistry > _SourceLinkManager
The type of the source_link_manager this target_block object.
Definition: agents.h:2810
An event type that represents the linking of message blocks
Definition: concrt.h:5825
Definition: concrt.h:390
Non-greedy join messaging blocks postpone messages and try and consume them after all have arrived...
Definition: agents.h:7439
_Propagator_method _M_propagator
A message propagating object which exposes the callback to be invoked
Definition: agents.h:2433
virtual void propagate_to_any_targets(_Inout_opt_ message< size_t > *)
Takes the message and propagates it to all the targets of this _Order_node
Definition: agents.h:9632
join(size_t _NumInputs)
Constructs a join messaging block.
Definition: agents.h:7482
An event type that represents the unlinking of message blocks
Definition: concrt.h:5831
virtual void release(runtime_object_identity _MsgId, _Inout_ ITarget< _Destination_type > *_PTarget)
Releases a previous successful message reservation.
Definition: agents.h:11112
virtual void link_source(_Inout_ ISource< _Source_type > *_PSource)
Links a specified source block to this propagator_block object.
Definition: agents.h:4110
virtual void process_input_messages(_Inout_ message< _Source_type > *_PMessage)
Processes messages that are received as inputs.
Definition: agents.h:3175
virtual bool reserve(runtime_object_identity _MsgId, _Inout_ ITarget< _Type > *_PTarget)=0
When overridden in a derived class, reserves a message previously offered by this ISource block...
virtual void sync_send(_Inout_opt_ message< _Target_type > *_Msg)
Synchronously queues up messages and starts a propagation task, if this has not been done already...
Definition: agents.h:3769
_SourceLinkRegistry::type::source_type _Source_type
The type of the payload for the incoming message to this propagator_block.
Definition: agents.h:3934
_CRTIMP unsigned int _Release()
The basic message envelope containing the data payload being passed between messaging blocks...
Definition: agents.h:1775
Implements busy wait with no backoff
Definition: concrt.h:604
#define _CONCRT_ASSERT(x)
Definition: concrt.h:137
virtual message< _Type > * accept_message(runtime_object_identity _MsgId)
Accepts a message that was offered by this overwrite_buffer messaging block, returning a copy of the ...
Definition: agents.h:5170
multitype_join< std::tuple< _Type1, _Type2, _Types...>, greedy > make_greedy_join(_Type1 _Item1, _Type2 _Item2, _Types..._Items)
Constructs a greedy multitype_join messaging block from an optional Scheduler or ScheduleGroup and tw...
Definition: agents.h:11484
virtual void unlink_target(_Inout_ ITarget< size_t > *_PTarget)
Unlinks a target block from this choice messaging block.
Definition: agents.h:9918
void async_send(_Inout_opt_ message< _Source_type > *_PMessage)
Asynchronously sends a message for processing.
Definition: agents.h:3124
runtime_object_identity _M_savedId
Definition: agents.h:8855
size_t _M_count
Definition: agents.h:115
std::tr1::function< _Output(_Input const &)> _Transform_method
Definition: agents.h:5808
virtual ~target_block()
Destroys the target_block object.
Definition: agents.h:2833
void _Delete_joins()
Deletes all _Order_node elements that were created in _Initialize_joins.
Definition: agents.h:11218
virtual bool supports_anonymous_source()
Overrides the supports_anonymous_source method to indicate that this block can accept messages offere...
Definition: agents.h:6147
virtual message< _Type > * accept_message(runtime_object_identity _MsgId)
Accepts a message that was offered by this single_assignment messaging block, returning a copy of the...
Definition: agents.h:7239
_FS_DLL int __CLRCALL_PURE_OR_CDECL _Link(const char *, const char *)
~_Reserving_node()
Cleans up any resources that may have been created by the _Reserving_node.
Definition: agents.h:8663
Definition: agents.h:12692
void _Clear_queued_messages()
Definition: agents.h:2233
virtual void release_ref(_Inout_ ITarget< _Target_type > *_PTarget)
Releases a reference count on this source_block object.
Definition: agents.h:3557
virtual void process_incoming_message()=0
When overridden in a derived class, performs the forward processing of messages into the block...
bool asend(_Inout_ ITarget< _Type > *_Trg, const _Type &_Data)
An asynchronous send operation, which schedules a task to propagate the data to the target block...
Definition: agents.h:13399
The ITarget class is the interface for all target blocks. Target blocks consume messages offered to t...
Definition: agents.h:452
virtual message< _Output > * accept_message(runtime_object_identity _MsgId)
Accepts a message that was offered by this transformer messaging block, transferring ownership to the...
Definition: agents.h:6163
void _Handle_message(message< _Target_type > *_PMessage)
Private methods.
Definition: agents.h:3849
::Concurrency::details::_Queue< message< _Destination_type > > _M_messageBuffer
Definition: agents.h:10819
The agent has been started, but not entered its run method.
Definition: agents.h:11510
virtual bool supports_anonymous_source()
Overrides the supports_anonymous_source method to indicate that this block can accept messages offere...
Definition: agents.h:5149
This class describes an exception thrown when a messaging block is given a pointer to a target which ...
Definition: concrt.h:1543
void(__cdecl * TaskProc)(void *)
Concurrency::details contains definitions of support routines in the public namespaces and one or mor...
Definition: concrt.h:265
virtual message_status propagate_message(_Inout_ message< _Type > *_PMessage, _Inout_ ISource< _Type > *_PSource)
Asynchronously passes a message from an ISource block to this single_assignment messaging block...
Definition: agents.h:7114
void decline_incoming_messages()
Indicates to the block that new messages should be declined.
Definition: agents.h:3069
virtual message< _Type > * consume_message(runtime_object_identity _MsgId)
Consumes a message previously offered by the timer and reserved by the target, transferring ownership...
Definition: agents.h:6708
_Type type
A type alias for _Type .
Definition: agents.h:1936
bool _internal_send(ITarget< _Type > *_PTarget, _Type const &_Value)
Definition: agents.h:12873
concurrent_queue< message< _Type > * > _M_queuedMessages
A queue of the messages
Definition: agents.h:2377
static _CRTIMP2 agent_status __cdecl wait(_Inout_ agent *_PAgent, unsigned int _Timeout=COOPERATIVE_TIMEOUT_INFINITE)
Waits for an agent to complete its task.
An event type that represents the creation of an object
Definition: concrt.h:5795
virtual void resume_propagation()=0
When overridden in a derived class, resumes propagation after a reservation has been released...
_Type source_type
A type alias for _Type .
Definition: agents.h:2736
__int64 _Trace_agents_get_id(_Type *_PObject)
Definition: agents.h:434
_Type type
A type alias for _Type .
Definition: agents.h:1901
virtual bool supports_anonymous_source()
When overridden in a derived class, returns true or false depending on whether the message block acce...
Definition: agents.h:2529
~join()
Destroys the join block.
Definition: agents.h:7636
virtual void link_target(_Inout_ ITarget< _Destination_type > *_PTarget)
Links a target block to this multitype_join messaging block.
Definition: agents.h:11011
void _Delete_stored_messages()
Deletes all messages currently stored in this message block. Should be called by the destructor to en...
Definition: agents.h:6861
_Join_node const & operator=(_Join_node const &)
_Greedy_node const & operator=(_Greedy_node const &)
bool _M_fForceRepropagation
A bool to signal to the processor to force a repropagation to occur
Definition: agents.h:4835
Scheduler * _M_pScheduler
Definition: agents.h:11774
_TargetLinkRegistry _M_connectedTargets
Connected targets
Definition: agents.h:3833
~_Order_node_base()
Cleans up any resources that may have been created by the _Order_node.
Definition: agents.h:8298
void _Initialize_choices()
Constructs and initializes a _Reserving_node for each tuple messaging block passed in...
Definition: agents.h:10056
runtime_object_identity _M_savedId
Definition: agents.h:9661
The source_block class is an abstract base class for source-only blocks. The class provides basic lin...
Definition: agents.h:3228
volatile long _M_stopProcessing
A flag set in the destructor of a block to cease processing of new messages. This is required to guar...
Definition: agents.h:2409
virtual message< _Target_type > * accept(runtime_object_identity _MsgId, _Inout_ ITarget< _Target_type > *_PTarget)
Accepts a message that was offered by this source_block object, transferring ownership to the caller...
Definition: agents.h:3365
_TargetLinkRegistry::type::type _Target_type
The payload type of messages handled by this source_block.
Definition: agents.h:3236
_Dynamic_array()
Definition: agents.h:266
void _Done(message_status _Status)
Definition: agents.h:12950
virtual bool reserve(runtime_object_identity _MsgId, ITarget< _Type > *_PTarget)
Definition: agents.h:12796
single_link_registry< ITarget< _Type > > _Target_registry
Definition: agents.h:12539
_CRTIMP unsigned int _Reference()
single_assignment(filter_method const &_Filter)
Constructs a single_assignment messaging block.
Definition: agents.h:6942
void _Push_back(_Type const &_Element)
Definition: agents.h:318
typedef void(__cdecl *_se_translator_function)(unsigned int
STL namespace.
message(_Type const &_P, runtime_object_identity _Id)
Constructs a message object.
Definition: agents.h:1809
_Type * _M_array
Definition: agents.h:418
virtual void release_message(runtime_object_identity _MsgId)=0
When overridden in a derived class, releases a previous message reservation.
The target did not accept the message.
Definition: agents.h:1750
message< _Type > * _M_pMessage
Definition: agents.h:5407
message(_Type const &_P)
Constructs a message object.
Definition: agents.h:1792
_Non_greedy_node(Scheduler &_PScheduler, ISource< _Type > *_PSource, size_t _Index, ITarget< size_t > *_PTarget=NULL)
Constructs a _Non_greedy_node within the specified scheduler, and places it on any schedule group of ...
Definition: agents.h:9332
Definition: agents.h:105
choice(_Type _Tuple)
Constructs a choice messaging block.
Definition: agents.h:9723
overwrite_buffer< agent_status > _M_status
Holds the current status of the agent.
Definition: agents.h:11753
virtual message< _Destination_type > * consume(runtime_object_identity _MsgId, _Inout_ ITarget< _Destination_type > *_PTarget)
Consumes a message previously offered by the multitype_join messaging block and successfully reserved...
Definition: agents.h:11097
virtual void unlink_source(_Inout_ ISource< _Source_type > *_PSource)
Unlinks a specified source block from this propagator_block object.
Definition: agents.h:4125
The target postponed the message.
Definition: agents.h:1755
The Concurrency namespace provides classes and functions that provide access to the Concurrency Runti...
Definition: agents.h:42
virtual message< _OutputType > * accept_message(runtime_object_identity _MsgId)
Accepts a message that was offered by this join messaging block, transferring ownership to the caller...
Definition: agents.h:7770
virtual void release(runtime_object_identity _MsgId, _Inout_ ITarget< size_t > *_PTarget)
Releases a previous successful message reservation.
Definition: agents.h:10011
void _Reset()
Resets the _Greedy_node and prepares it for the next propagation
Definition: agents.h:9055
bool _Reserve_received_message()
Called for a non_greedy type join block in order to reserve the message in this join block ...
Definition: agents.h:9466
Helper class used in multi-type greedy join blocks. Ordered node is a single-target, single-source ordered propagator block.
Definition: agents.h:8879
virtual void link_target(_Inout_ ITarget< _Type > *_PTarget)=0
When overridden in a derived class, links a target block to this ISource block.
_FunctorType _Call_method
The function type that this block executes upon receiving a message.
Definition: agents.h:5449
virtual ~source_block()
Destroys the source_block object.
Definition: agents.h:3262
::Concurrency::details::_ReentrantPPLLock _M_resetLock
Definition: agents.h:9239
virtual bool supports_anonymous_source()
Overrides the supports_anonymous_source method to indicate that this block can accept messages offere...
Definition: agents.h:5739
_SourceLinkManager::iterator source_iterator
The type of the iterator for the source_link_manager for this target_block object.
Definition: agents.h:2816
volatile long _M_counter
Definition: agents.h:10816
Helper class used in multi-type choice blocks. Ordered node is a single-target, single-source ordered ...
Definition: agents.h:8500
join_type
The type of a join messaging block.
Definition: agents.h:7427
virtual void release(runtime_object_identity _MsgId, _Inout_ ITarget< _Type > *_PTarget)=0
When overridden in a derived class, releases a previous successful message reservation.
Scheduler * _M_pScheduler
Definition: agents.h:11244
virtual void release_ref(_Inout_ ITarget< size_t > *_PTarget)
Releases a reference count on this choice messaging block.
Definition: agents.h:10043
_Type _M_sourceTuple
Definition: agents.h:10117
void propagate_to_any_targets(_Inout_opt_ message< _OutputType > *)
Constructs an output message containing an input message from each source when they have all propagat...
Definition: agents.h:7880
Definition: agents.h:10256
virtual void link_source(_Inout_ ISource< _Source_type > *_PSource)
Links a specified source block to this target_block object.
Definition: agents.h:2991
The timer has been initialized, but not yet started.
Definition: agents.h:6443
virtual void acquire_ref(_Inout_ ITarget< _Destination_type > *_PTarget)
Acquires a reference count on this multitype_join messaging block, to prevent deletion.
Definition: agents.h:11128
An ordered_message_processor is a message_processor that allows message blocks to process messages in...
Definition: agents.h:2008
_Reserving_node(ScheduleGroup &_PScheduleGroup, ISource< _Type > *_PSource, size_t _Index, ITarget< size_t > *_PTarget, filter_method const &_Filter)
Constructs a _Order_node within the specified schedule group. The scheduler is implied by the schedul...
Definition: agents.h:8650
#define _Out_writes_opt_(size)
Definition: sal.h:355
_Payload_type const & value()
Gets the message whose index was selected by the choice messaging block.
Definition: agents.h:9890
multitype_join< std::tuple< _Type1, _Type2, _Types...> > make_join(_Type1 _Item1, _Type2 _Item2, _Types..._Items)
Constructs a non_greedy multitype_join messaging block from an optional Scheduler or ScheduleGroup an...
Definition: agents.h:11368
void enable_batched_processing()
Enables batched processing for this block.
Definition: agents.h:3106
#define NULL
Definition: crtdbg.h:30
_Type _M_sourceTuple
Definition: agents.h:11241
virtual void unlink_source(_Inout_ ISource< _Type > *_PSource)=0
When overridden in a derived class, unlinks a specified source block from this ITarget block...
__int32 runtime_object_identity
Each message instance has an identity that follows it as it is cloned and passed between messaging co...
Definition: agents.h:50
_CRTIMP Concurrency::Scheduler * _GetScheduler()
Definition: concrt.h:396
Helper class used in multi-type non-greedy join blocks. Ordered node is a single-target, single-source ordered propagator block.
Definition: agents.h:9264
void _Invoke_link_source(ITarget< _Type > *_PLinkFrom)
Links this source to a target.
Definition: agents.h:2753
void pause()
Stops the timer messaging block. If it is a repeating timer messaging block, it can be restarted with...
Definition: agents.h:6619
size_t _Size() const
Definition: agents.h:353
virtual void release(runtime_object_identity _MsgId, _Inout_ ITarget< _Target_type > *_PTarget)
Releases a previous successful message reservation.
Definition: agents.h:3506
virtual message< _Type > * accept(runtime_object_identity _MsgId, ITarget< _Type > *_PTarget)
Definition: agents.h:12566
~_Join_node()
Cleans up any resources that may have been created by the join.
Definition: agents.h:10327
_Greedy_node(Scheduler &_PScheduler, ISource< _Type > *_PSource, size_t _Index, ITarget< size_t > *_PTarget, filter_method const &_Filter)
Constructs a _Greedy_node within the specified scheduler, and places it on any schedule group of the ...
Definition: agents.h:8972
An event type that represents the name for an object
Definition: concrt.h:5837
virtual _CRTIMP2 ~agent()
Destroys the agent.
message< _Type > * _M_pReceiveMessage
Definition: agents.h:8473
virtual void release_ref(_Inout_ ITarget< _Type > *)
When overridden in a derived class, releases a reference count on this ISource block.
Definition: agents.h:12614
volatile bool _M_fIsInitialized
Definition: agents.h:9667
A timer messaging block is a single-target source_block capable of sending a message to its target af...
Definition: agents.h:6429
virtual message< _Type > * accept(runtime_object_identity _MsgId, _Inout_ ITarget< _Type > *_PTarget)=0
When overridden in a derived class, accepts a message that was offered by this ISource block...
virtual message_status send_message(_Inout_ message< _Type > *_PMessage, _Inout_ ISource< _Type > *_PSource)
Synchronously passes a message from an ISource block to this unbounded_buffer messaging block...
Definition: agents.h:4515
size_t _M_size
Definition: agents.h:424
The agent finished without being canceled.
Definition: agents.h:11520
_Reserving_node(ISource< _Type > *_PSource, size_t _Index, ITarget< size_t > *_PTarget, filter_method const &_Filter)
Constructs a _Reserving_node within the default scheduler, and places it on any schedule group of the...
Definition: agents.h:8542
void _Wait_on_ref()
Definition: agents.h:12940
size_t index()
Returns an index into the tuple representing the element selected by the choice messaging block...
Definition: agents.h:9869
virtual void link_target_notification(_Inout_ ITarget< _Output > *)
A callback that notifies that a new target has been linked to this transformer messaging block...
Definition: agents.h:6256
_Type & reference
Definition: agents.h:260
virtual void process_input_messages(_Inout_ message< _Target_type > *_PMessage)
Process input messages. This is only useful for propagator blocks, which derive from source_block ...
Definition: agents.h:4155
size_t _Count() const
Definition: agents.h:131
message< std::vector< _Type > > *__cdecl _Create_new_message()
Constructs a new message from the data output.
Definition: agents.h:8043
virtual message_status send_message(_Inout_ message< _Input > *_PMessage, _Inout_ ISource< _Input > *_PSource)
Synchronously passes a message from an ISource block to this transformer messaging block...
Definition: agents.h:6121
virtual bool reserve(runtime_object_identity _MsgId, ITarget< _Type > *_PTarget)
Definition: agents.h:12590
bool has_value() const
Checks whether this single_assignment messaging block has been initialized with a value yet...
Definition: agents.h:7069
virtual message< _OutputType > * consume_message(runtime_object_identity _MsgId)
Consumes a message previously offered by the join messaging block and reserved by the target...
Definition: agents.h:7820
choice< std::tuple< _Type1, _Type2, _Types...> > make_choice(_Type1 _Item1, _Type2 _Item2, _Types..._Items)
Constructs a choice messaging block from an optional Scheduler or ScheduleGroup and two or more input...
Definition: agents.h:10244
The timer has started and been paused.
Definition: agents.h:6453
A multitype_join messaging block is a multi-source, single-target messaging block that combines toget...
Definition: agents.h:10851
void _Delete_stored_messages()
Deletes all messages currently stored in this message block. Should be called by the destructor to en...
Definition: agents.h:7386
runtime_object_identity * _M_savedIds
Definition: agents.h:8236
virtual bool reserve_message(runtime_object_identity _MsgId)
Reserves a message previously offered by this single_assignment messaging block.
Definition: agents.h:7272
void _Populate_destination_tuple(_Destination_type &_Destination_tuple, ISource< size_t > **_Sources)
Copies payloads from all sources to destination tuple.
Definition: agents.h:10790
choice const & operator=(choice const &)
An unbounded_buffer messaging block is a multi-target, multi-source, ordered propagator_block capable...
Definition: agents.h:4264
void _Clear()
Definition: agents.h:306
bool _SpinOnce()
Spins for one time quantum, until a maximum spin is reached.
Definition: concrt.h:652
The message_processor class is the abstract base class for processing of message objects. There is no guarantee on the ordering of the messages.
Definition: agents.h:1929
bool _Release_received_message()
Called for a non_greedy type join block to release a reservation on this block.
Definition: agents.h:9530
overwrite_buffer const & operator=(overwrite_buffer const &)
unbounded_buffer(filter_method const &_Filter)
Constructs an unbounded_buffer messaging block.
Definition: agents.h:4303
multitype_join(_Type _Tuple)
Constructs a multitype_join messaging block.
Definition: agents.h:10877
_Queue()
Definition: agents.h:121
size_t _M_count
Definition: agents.h:8235
timer const & operator=(timer const &)
#define _InterlockedDecrementSizeT(_Target)
Definition: concrt.h:111
virtual ~ordered_message_processor()
Destroys the ordered_message_processor object.
Definition: agents.h:2056
overwrite_buffer(filter_method const &_Filter)
Constructs an overwrite_buffer messaging block.
Definition: agents.h:4903
bool _internal_asend(ITarget< _Type > *_PTarget, _Type const &_Value)
Definition: agents.h:12648
virtual void resume_propagation()
Resumes propagation after a reservation has been released.
Definition: agents.h:10469
virtual void link_target_notification(_Inout_ ITarget< std::vector< _Type >> *)
A callback that notifies that a new target has been linked to this join messaging block...
Definition: agents.h:7862
message< _Type > * _M_pMessage
Definition: agents.h:12684
virtual message< _Type > * accept(runtime_object_identity _MsgId, ITarget< _Type > *_PTarget)
Definition: agents.h:12762
virtual message< _Type > * accept(runtime_object_identity _MsgId, ITarget< _Type > *_PTarget)
Definition: agents.h:13058
The timer has been started.
Definition: agents.h:6448
_Target_registry _M_connectedTargets
Definition: agents.h:13267
message< _Type > const & operator=(message< _Type > const &)
virtual void propagate_to_any_targets(_Inout_ message< _Type > *_PMessage)
Places the message _PMessage in this overwrite_buffer messaging block and offers it to all of the li...
Definition: agents.h:5333
static void __cdecl _Agent_task_wrapper(void *data)
_In_ _CRT_GUARDOVERFLOW size_t _NewSize
Definition: malloc.h:108
volatile bool _M_fDeclineMessages
A bool that is set to indicate that all messages should be declined in preparation for deleting the b...
Definition: agents.h:4243
virtual bool reserve_message(runtime_object_identity _MsgId)
Reserves a message previously offered by the source.
Definition: agents.h:10423
int i[4]
Definition: dvec.h:70
_Join_node(Scheduler &_PScheduler)
Constructs a join within the specified scheduler, and places it on any schedule group of the schedule...
Definition: agents.h:10305
::Concurrency::details::_NonReentrantPPLLock _M_asyncSendLock
A lock to use for queueing incoming messages.
Definition: agents.h:2383
ScheduleGroup * _M_pScheduleGroup
Definition: agents.h:11775
virtual void link_source(_Inout_ ISource< _Type > *_PSource)=0
When overridden in a derived class, links a specified source block to this ITarget block...
_CRTIMP2 agent()
Constructs an agent.
virtual message_status send_message(_Inout_ message< _Type > *_PMessage, _Inout_ ISource< _Type > *_PSource)
Synchronously passes a message from an ISource block to this call messaging block. It is invoked by the send method, when called by a source block.
Definition: agents.h:5707
_Type dequeue()
Removes an item from the unbounded_buffer messaging block.
Definition: agents.h:4450
virtual void resume_propagation()
Resumes propagation after a reservation has been released.
Definition: agents.h:5299
_CRTIMP void set()
Signals the event.
virtual void sync_send(_Inout_opt_ message< _Type > *_Msg)=0
When overridden in a derived class, places messages into the block synchronously. ...
virtual message_status propagate(_Inout_opt_ message< _Source_type > *_PMessage, _Inout_opt_ ISource< _Source_type > *_PSource)
Asynchronously passes a message from a source block to this target block.
Definition: agents.h:3987
virtual void propagate_to_any_targets(_Inout_opt_ message< _Destination_type > *)
Takes the message and propagates it to all the targets of this join block.
Definition: agents.h:10501
volatile bool _M_fIsInitialized
Definition: agents.h:8858
_Type const & const_reference
Definition: agents.h:261
A class intended to be used as a base class for all independent agents. It is used to hide state from...
Definition: agents.h:11536
An event type that represents the conclusion of some processing
Definition: concrt.h:5807
_MessageArray(size_t _NumInputs)
Definition: agents.h:8211
_In_ size_t _In_z_ const unsigned char * _Src
Definition: mbstring.h:95
#define _In_z_
Definition: sal.h:319
message_status
The valid responses for an offer of a message object to a block.
Definition: agents.h:1739
virtual void link_target_notification(_Inout_ ITarget< _Type > *_PTarget)
A callback that notifies that a new target has been linked to this timer messaging block...
Definition: agents.h:6748
bool send(_Inout_ ITarget< _Type > *_Trg, const _Type &_Data)
A synchronous send operation, which waits until the target either accepts or declines the message...
Definition: agents.h:13342
#define _In_
Definition: sal.h:314
runtime_object_identity msg_id() const
Returns the ID of the message object.
Definition: agents.h:1860
virtual void _Reset()
Resets the _Reserving_node and prepares it for the next propagation
Definition: agents.h:8683
_CRTIMP2 ISource< agent_status > * status_port()
An asynchronous source of status information from the agent.
Definition: utility:307
volatile long _M_refCount
Definition: agents.h:1912
void _Invoke_unlink_source(ITarget< _Type > *_PUnlinkFrom)
Unlinks this source from a target.
Definition: agents.h:2772
_CRTIMP2 bool start()
Moves an agent from the agent_created state to the agent_runnable state, and schedules it for executi...
_Call_method _M_pFunc
Definition: agents.h:5775
virtual void link_target(_Inout_ ITarget< size_t > *_PTarget)
Links a target block to this choice messaging block.
Definition: agents.h:9906
#define _Inout_opt_
Definition: sal.h:385
virtual message< _Target_type > * accept_message(runtime_object_identity _MsgId)=0
When overridden in a derived class, accepts an offered message by the source. Message blocks should o...
_Message * _Dequeue()
Definition: agents.h:203
message< _Type > * _M_pMessage
Definition: agents.h:7403
This class describes an exception thrown when an operation has timed out.
Definition: concrt.h:1762
message(_In_ message const *_Msg)
Constructs a message object.
Definition: agents.h:1839
_SourceLinkRegistry::type::source_type _Source_type
The type of the payload for the incoming messages to this target_block object.
Definition: agents.h:2804
_Join_node(ScheduleGroup &_PScheduleGroup)
Constructs a join within the specified schedule group. The scheduler is implied by the schedule group...
Definition: agents.h:10318
virtual message_status propagate_message(message< _Type > *_PMessage, ISource< _Type > *_PSource)
Asynchronously passes a message from an ISource block to this ITarget block. It is invoked by the pro...
Definition: agents.h:8713
::Concurrency::details::_Queue< message< std::vector< _Type > > > _M_messageBuffer
Definition: agents.h:8259
runtime_object_identity _M_reservedId
Definition: agents.h:9664
virtual message< _Type > * consume(runtime_object_identity _MsgId, ITarget< _Type > *_PTarget)
Definition: agents.h:12596
virtual void propagate_to_any_targets(_Inout_opt_ message< _Type > *_PMessage)
Places the message _PMessage in this single_assignment messaging block and offers it to all of the l...
Definition: agents.h:7359
virtual void unlink_targets()
When overridden in a derived class, unlinks all target blocks from this ISource block.
Definition: agents.h:13023
Definition: agents.h:13272
overwrite_buffer()
Constructs an overwrite_buffer messaging block.
Definition: agents.h:4881
An overwrite_buffer messaging block is a multi-target, multi-source, ordered propagator_block capable...
Definition: agents.h:4865
Definition: agents.h:8206
The target_block class is an abstract base class that provides basic link management functionality an...
Definition: agents.h:2796
void start()
Starts the timer messaging block. The specified number of milliseconds after this is called...
Definition: agents.h:6593
virtual void acquire_ref(_Inout_ ITarget< _Type > *)
When overridden in a derived class, acquires a reference count on this ISource block, to prevent deletion.
Definition: agents.h:13163
virtual void link_target_notification(_Inout_ ITarget< _Target_type > *)
A callback that notifies that a new target has been linked to this source_block object.
Definition: agents.h:3590
virtual bool reserve_message(runtime_object_identity _MsgId)
Reserves a message previously offered by this transformer messaging block.
Definition: agents.h:6193
transformer(_Transform_method const &_Func, _Inout_opt_ ITarget< _Output > *_PTarget=NULL)
Constructs a transformer messaging block.
Definition: agents.h:5832
virtual void acquire_ref(_Inout_ ITarget< _Type > *)
When overridden in a derived class, acquires a reference count on this ISource block, to prevent deletion.
Definition: agents.h:12609
virtual void resume_propagation()
Resumes propagation after a reservation has been released.
Definition: agents.h:7329
virtual message< _Type > * consume(runtime_object_identity _MsgId, ITarget< _Type > *_PTarget)
Definition: agents.h:12818
virtual void unlink_target(_Inout_ ITarget< _Destination_type > *_PTarget)
Unlinks a target block from this multitype_join messaging block.
Definition: agents.h:11023
virtual void resume_propagation()
Resumes propagation after a reservation has been released.
Definition: agents.h:4631
_Unwrap< _Type >::type _Destination_type
Definition: agents.h:10855
transformer const & operator=(transformer const &)
virtual void link_target_notification(_Inout_ ITarget< _Destination_type > *)
Notification that a target was linked to this source.
Definition: agents.h:10485
~choice()
Destroys the choice messaging block.
Definition: agents.h:9834
_Type value()
Gets a reference to the current payload of the message being stored in the overwrite_buffer messaging...
Definition: agents.h:5048
bool _M_fRepeating
Definition: agents.h:6787
bool _M_fDeclineMessages
A bool that is set to indicate that all messages should be declined in preparation for deleting the b...
Definition: agents.h:3197
virtual bool reserve_message(runtime_object_identity _MsgId)=0
When overridden in a derived class, reserves a message previously offered by this source_block object...
volatile long _M_queuedDataCount
A count of the current number of messages to process. Used as a flag to see if a new process message ...
Definition: agents.h:2390
::Concurrency::details::_ReentrantPPLLock _M_internalLock
Definition: agents.h:12969
virtual void _Fire()
Called when the timer fires.
Definition: agents.h:6808
virtual message< _Type > * consume(runtime_object_identity _MsgId, ITarget< _Type > *_PTarget)
Definition: agents.h:13109
void wait_for_async_sends()
Waits for all asynchronous propagations to complete.
Definition: agents.h:3149
::Concurrency::details::_ReentrantPPLLock::_Scoped_lock _R_lock
A lock holder that acquires a reentrant lock on instantiation and releases it on destruction ...
Definition: agents.h:64
static _CRTIMP2 void __cdecl wait_for_one(size_t _Count, _In_reads_(_Count) agent **_PAgents, agent_status &_Status, size_t &_Index, unsigned int _Timeout=COOPERATIVE_TIMEOUT_INFINITE)
Waits for any one of the specified agents to complete its task.
_SavedMessageIdArray(size_t _NumInputs)
Definition: agents.h:8238
std::vector< _Type > _OutputType
Definition: agents.h:7464
_Type type
A type alias for _Type .
Definition: agents.h:2538
This class describes an exception thrown when a messaging block is unable to find a requested message...
Definition: concrt.h:1567
static _CRTIMP void __cdecl _Yield()
virtual void acquire_ref(_Inout_ ITarget< size_t > *_PTarget)
Acquires a reference count on this choice messaging block, to prevent deletion.
Definition: agents.h:10027
virtual void unlink_targets()
Unlinks all targets from this choice messaging block.
Definition: agents.h:9931
void * _M_pSourceChoices[std::tr1::tuple_size< _Type >::value]
Definition: agents.h:10111
long remove_ref()
Subtracts from the reference count for the message object. Used for message blocks that need referenc...
Definition: agents.h:1892
Definition: concrt.h:5489
~call()
Destroys the call messaging block.
Definition: agents.h:5638
virtual void unlink_target_notification(_Inout_ ITarget< _Target_type > *_PTarget)
A callback that notifies that a target has been unlinked from this source_block object.
Definition: agents.h:3606
_Type type
A type alias for _Type .
Definition: agents.h:10998
void remove_sources()
Unlinks all sources after waiting for outstanding asynchronous send operations to complete...
Definition: agents.h:3164
_Type const payload
The payload of the message object.
Definition: agents.h:1869
_In_reads_(_N) const wchar_t *_S2
The target tried to accept the message, but it was no longer available.
Definition: agents.h:1760
void _Initialize_joins()
Constructs and initializes a _Order_node for each tuple messaging block passed in.
Definition: agents.h:11159
::Concurrency::details::_ReentrantPPLLock _M_internalLock
Definition: agents.h:13264
message< _Type > ** _M_messages
Definition: agents.h:8209
The target accepted the message.
Definition: agents.h:1745
message< _Type > * _M_pMessage
Definition: agents.h:12960
void enable_batched_processing()
Enables batched processing for this block.
Definition: agents.h:3745
_SyncOriginator()
Definition: agents.h:12699
volatile long _M_fCancelable
Definition: agents.h:11768
An event type that represents the scheduling of a process
Definition: concrt.h:5819
_Type const & value()
Gets a reference to the current payload of the message being stored.
Definition: agents.h:8325
::Concurrency::details::_ReentrantPPLLock _M_internalLock
Internal lock used for the following synchronization:
Definition: agents.h:3895
::Concurrency::details::_NonReentrantPPLLock::_Scoped_lock _NR_lock
A lock holder that acquires a non-reentrant lock on instantiation and releases it on destruction...
Definition: agents.h:57
void * _M_pSourceJoins[std::tr1::tuple_size< _Type >::value]
Definition: agents.h:11235
volatile long _M_refcount
Definition: agents.h:13261
ScheduleGroup * _M_pScheduleGroup
Definition: agents.h:11247
virtual void acquire_ref(_Inout_ ITarget< _Target_type > *)
Acquires a reference count on this source_block object, to prevent deletion.
Definition: agents.h:3541
A join messaging block is a single-target, multi-source, ordered propagator_block which combines toge...
Definition: agents.h:7461
void _Sync_send_helper(message< _Type > *_Msg)
Definition: agents.h:2242
virtual ~_SyncOriginator()
Definition: agents.h:12707
virtual void link_target(ITarget< _Type > *_PTarget)
Definition: agents.h:12916
message< _Type > * _M_pReservedMessage
Definition: agents.h:5410
void _Delete_stored_messages()
Deletes all messages currently stored in this message block. Should be called by the destructor to en...
Definition: agents.h:10771
The ISource class is the interface for all source blocks. Source blocks propagate messages to ITarget...
Definition: agents.h:451
_MessageProcessorType _M_messageProcessor
The message_processor for this target_block.
Definition: agents.h:3203
virtual void resume_propagation()
Resumes propagation after a reservation has been released.
Definition: agents.h:6735
_Type type
A type alias for _Type .
Definition: agents.h:2027
_SourceLinkManager _M_connectedSources
The container for all the sources connected to this block.
Definition: agents.h:4230
bool has_value() const
Checks whether this choice messaging block has been initialized with a value yet. ...
Definition: agents.h:9853
single_link_registry< ITarget< _Type > > _Target_registry
Definition: agents.h:12696
virtual message_status propagate_message(_Inout_ message< _Type > *_PMessage, _Inout_ ISource< _Type > *_PSource)
Asynchronously passes a message from an ISource block to this unbounded_buffer messaging block...
Definition: agents.h:4477
virtual message< size_t > * consume_message(runtime_object_identity)
Consumes a message previously offered by the source and reserved by the target, transferring ownershi...
Definition: agents.h:8378
virtual void propagate_to_any_targets(_Inout_opt_ message< size_t > *)
Takes the message and propagates it to all the targets of this _Order_node.
Definition: agents.h:8797
void initialize_target(_Inout_opt_ Scheduler *_PScheduler=NULL, _Inout_opt_ ScheduleGroup *_PScheduleGroup=NULL)
Initializes the base object. Specifically, the message_processor object needs to be initialized...
Definition: agents.h:3087
#define false
Definition: stdbool.h:11
_Greedy_node(ScheduleGroup &_PScheduleGroup, ISource< _Type > *_PSource, size_t _Index, ITarget< size_t > *_PTarget=NULL)
Constructs a _Greedy_node within the specified schedule group. The scheduler is implied by the schedu...
Definition: agents.h:8997
_Non_greedy_node const & operator=(_Non_greedy_node const &)
virtual void unlink_target(ITarget< _Type > *_PTarget)
Definition: agents.h:12553
static _CRTIMP _Scheduler __cdecl _Get()
virtual void unlink_sources()
Unlinks all source blocks from this propagator_block object.
Definition: agents.h:4138
bool enqueue(_Type const &_Item)
Adds an item to the unbounded_buffer messaging block.
Definition: agents.h:4438
virtual void unlink_targets()
Unlinks all target blocks from this source_block object.
Definition: agents.h:3327
void * _InterlockedCompareExchangePointer(void *volatile *, void *, void *)
const unsigned int COOPERATIVE_TIMEOUT_INFINITE
Value indicating that a wait should never time out.
Definition: concrt.h:3538
virtual bool reserve_message(runtime_object_identity _MsgId)
Reserves a message previously offered by this unbounded_buffer messaging block.
Definition: agents.h:4585
virtual message< _Destination_type > * accept_message(runtime_object_identity _MsgId)
Accepts an offered message by the source, transferring ownership to the caller.
Definition: agents.h:10393
ScheduleGroup * _M_pScheduleGroup
The schedule group to process messages on
Definition: agents.h:2402
virtual void link_target(_Inout_ ITarget< _Target_type > *_PTarget)
Links a target block to this source_block object.
Definition: agents.h:3281
bool _M_fReferencedScheduler
Definition: agents.h:6790
_Greedy_node(Scheduler &_PScheduler, ISource< _Type > *_PSource, size_t _Index, ITarget< size_t > *_PTarget=NULL)
Constructs a _Greedy_node within the specified scheduler, and places it on any schedule group of the ...
Definition: agents.h:8945
virtual void acquire_ref(_Inout_ ITarget< _Type > *)
When overridden in a derived class, acquires a reference count on this ISource block, to prevent deletion.
Definition: agents.h:12858
message_status _Propagate_to_target(ITarget< size_t > *_PTarget)
Propagates messages to the given target.
Definition: agents.h:8817
long _Process_message_helper()
Definition: agents.h:2272
void initialize_source(_Inout_opt_ Scheduler *_PScheduler=NULL, _Inout_opt_ ScheduleGroup *_PScheduleGroup=NULL)
Initializes the message_propagator within this source_block.
Definition: agents.h:3731
Definition: agents.h:12979
bool has_value() const
Checks whether this block has been initialized yet.
Definition: agents.h:8313
_Type const & value()
Gets a reference to the current payload of the message being stored in the single_assignment messagin...
Definition: agents.h:7085
An event type that represents the initiation of some processing
Definition: concrt.h:5801
An event type that represents the deletion of an object
Definition: concrt.h:5813
message< size_t > * _M_pSendMessage
Definition: agents.h:8476
void Trace_agents_register_name(_Inout_ _Type *_PObject, _In_z_ const wchar_t *_Name)
Associates the given name to the message block or agent in the ETW trace.
Definition: agents.h:13446
static _CRTIMP2 void __cdecl wait_for_all(size_t _Count, _In_reads_(_Count) agent **_PAgents, _Out_writes_opt_(_Count) agent_status *_PStatus=NULL, unsigned int _Timeout=COOPERATIVE_TIMEOUT_INFINITE)
Waits for all of the specified agents to complete their tasks.
virtual void link_target_notification(_Inout_ ITarget< _Type > *_PTarget)
A callback that notifies that a new target has been linked to this unbounded_buffer messaging block...
Definition: agents.h:4654
~unbounded_buffer()
Destroys the unbounded_buffer messaging block.
Definition: agents.h:4419
virtual void async_send(_Inout_opt_ message< _Type > *_Msg)
Asynchronously queues up messages and starts a processing task, if this has not been done already...
Definition: agents.h:2126
virtual void release_ref(_Inout_ ITarget< _Type > *)
When overridden in a derived class, releases a reference count on this ISource block.
Definition: agents.h:12863
::Concurrency::details::_NonReentrantPPLLock _M_propagationLock
Definition: agents.h:9236
agent_status
The valid states for an agent.
Definition: agents.h:11500
bool _internal_send(ITarget< _Type > *_PTarget, _Type const &_Value)
Definition: agents.h:13186
virtual void release_message(runtime_object_identity _MsgId)
Releases a previous message reservation.
Definition: agents.h:6226
_Join_node< _Type, _Destination_type, _Jtype > * _M_pJoinNode
Definition: agents.h:11238
unbounded_buffer const & operator=(unbounded_buffer const &)
virtual void unlink_targets()
Unlinks all targets from this multitype_join messaging block.
Definition: agents.h:11032
_Non_greedy_node(ScheduleGroup &_PScheduleGroup, ISource< _Type > *_PSource, size_t _Index, ITarget< size_t > *_PTarget, filter_method const &_Filter)
Constructs a _Non_greedy_node within the specified schedule group. The scheduler is implied by the sc...
Definition: agents.h:9414
volatile size_t _M_messagesRemaining
Definition: agents.h:8202
_Type _M_value
Definition: agents.h:6784
_In_ wctype_t _Type
Definition: ctype.h:205
~_Dynamic_array()
Definition: agents.h:274
_Non_greedy_node(ISource< _Type > *_PSource, size_t _Index, ITarget< size_t > *_PTarget, filter_method const &_Filter)
Constructs a _Non_greedy_node within the default scheduler, and places it on any schedule group of th...
Definition: agents.h:9306
virtual message< _Output > * consume_message(runtime_object_identity _MsgId)
Consumes a message previously offered by the transformer and reserved by the target, transferring ownership to the caller.
Definition: agents.h:6213
bool _Remove(_Message *_OldElement)
Definition: agents.h:155
void _Propagate_priority_order(::Concurrency::details::_Queue< message< _Target_type >> &_MessageBuffer)
Propagates messages in priority order.
Definition: agents.h:4747
virtual void link_target_notification(_Inout_ ITarget< size_t > *)
Notification that a target was linked to this source.
Definition: agents.h:8419
_CRTIMP2 agent_status status()
A synchronous source of status information from the agent.
Definition: tuple:127
void _Acquire_ref()
Definition: agents.h:13241
void _Create_send_message()
Creates a message that contains an index used to determine the source message.
Definition: agents.h:8431
virtual void initialize_batched_processing(_Handler_method const &_Processor, _Propagator_method const &_Propagator)
Initializes batched message processing.
Definition: agents.h:2094
virtual ::Concurrency::runtime_object_identity _GetId() const
Definition: agents.h:93
virtual message_status propagate_message(message< _Type > *_PMessage, ISource< _Type > *_PSource)
Asynchronously passes a message from an ISource block to this ITarget block. It is invoked by the pro...
Definition: agents.h:9134
_Target_registry _M_connectedTargets
Definition: agents.h:12972
message * _M_pNext
Definition: agents.h:1906
void register_filter(filter_method const &_Filter)
Registers a filter method that will be invoked on every message received.
Definition: agents.h:3054
A single_assignment messaging block is a multi-target, multi-source, ordered propagator_block capable...
Definition: agents.h:6903
virtual void unlink_target(ITarget< _Type > *_PTarget)
Definition: agents.h:12717
Definition: array:530
std::tr1::function< bool(_Type const &)> filter_method
The signature of any method used by the block that returns a bool value to determine whether an offer...
Definition: agents.h:2545
bool _internal_send(ITarget< _Type > *_PTarget, _Type const &_Value)
Definition: agents.h:12623
_Handler_method _M_handler
A message handler object which exposes the callback to be invoked
Definition: agents.h:2421
_SourceLinkManager _M_connectedSources
The container for all the sources connected to this block.
Definition: agents.h:3184
virtual void release(runtime_object_identity _MsgId, ITarget< _Type > *_PTarget)
Definition: agents.h:13141
State
Tracks the state machine of the timer.
Definition: agents.h:6437
Base class for the helper node used in multi-type join and choice blocks. Order node is a single-target...
Definition: agents.h:8278
_Non_greedy_node(ISource< _Type > *_PSource, size_t _Index, ITarget< size_t > *_PTarget=NULL)
Constructs a _Non_greedy_node within the default scheduler, and places it on any schedule group of th...
Definition: agents.h:9281
void wait_for_outstanding_async_sends()
Waits for all asynchronous propagations to complete. This propagator-specific spin wait is used in de...
Definition: agents.h:3794
size_t _M_index
Definition: agents.h:8479
void _Invoke_handler(long _Count)
Definition: agents.h:2319
transformer(_Transform_method const &_Func, _Inout_opt_ ITarget< _Output > *_PTarget, filter_method const &_Filter)
Constructs a transformer messaging block.
Definition: agents.h:5868
_Type type
A type alias for _Type .
Definition: agents.h:9844
virtual message< _Destination_type > * accept(runtime_object_identity _MsgId, _Inout_ ITarget< _Destination_type > *_PTarget)
Accepts a message that was offered by this multitype_join block, transferring ownership to the caller...
Definition: agents.h:11050
void _Delete_stored_messages()
Deletes all messages currently stored in this message block. Should be called by the destructor to en...
Definition: agents.h:6364
virtual void unlink_targets()=0
When overridden in a derived class, unlinks all target blocks from this ISource block.
Scheduler * _M_pScheduler
Definition: agents.h:6793
virtual void unlink_targets()
When overridden in a derived class, unlinks all target blocks from this ISource block.
Definition: agents.h:12738
_SourceLinkManager::iterator source_iterator
The type of the iterator for the source_link_manager for this propagator_block.
Definition: agents.h:3946
runtime_object_identity * _M_savedIdBuffer
Definition: agents.h:8253
runtime_object_identity _M_reservedId
Reserved message ID
Definition: agents.h:3827
_Type receive(_Inout_ ISource< _Type > *_Src, unsigned int _Timeout=COOPERATIVE_TIMEOUT_INFINITE)
A general receive implementation, allowing a context to wait for data from exactly one source and fil...
Definition: agents.h:12068
virtual message< _Type > * consume_message(runtime_object_identity _MsgId)
Consumes a message previously offered by the unbounded_buffer messaging block and reserved by the tar...
Definition: agents.h:4605
#define _CRTIMP2
Definition: crtdefs.h:126
virtual void link_target(ITarget< _Type > *_PTarget)
Definition: agents.h:12678
ITarget< _Type > * _M_pTarget
Definition: agents.h:12687
size_t _M_count
Definition: agents.h:8208
message< _Destination_type > * _Create_send_message()
Called when all the source messaging blocks have received their messages. The payloads are copied int...
Definition: agents.h:10719
virtual void resume_propagation()
Resumes propagation after a reservation has been released.
Definition: agents.h:6239
virtual message_status propagate_message(_Inout_ message< _Type > *_PMessage, _Inout_ ISource< _Type > *_PSource)
Asynchronously passes a message from an ISource block to this call messaging block. It is invoked by the propagate method, when called by a source block.
Definition: agents.h:5664
message< _Type > * _NewMessage() const
Allocates a new message.
Definition: agents.h:6799
virtual void release_message(runtime_object_identity _MsgId)
Releases a previous message reservation.
Definition: agents.h:7833
virtual void link_target(ITarget< _Type > *_PTarget)
Definition: agents.h:13223
void _Initialize(size_t _NumInputs, Scheduler *_PScheduler=NULL, ScheduleGroup *_PScheduleGroup=NULL)
Initializes the join messaging block.
Definition: agents.h:8156
void initialize(_Inout_opt_ Scheduler *_PScheduler, _Inout_opt_ ScheduleGroup *_PScheduleGroup, _Handler_method const &_Handler)
Initializes the ordered_message_processor object with the appropriate callback function, scheduler and schedule group.
Definition: agents.h:2077
virtual void release_ref(_Inout_ ITarget< _Type > *)
When overridden in a derived class, releases a reference count on this ISource block.
Definition: agents.h:13168
virtual void propagate_to_any_targets(_Inout_opt_ message< _Type > *)
Tries to offer the message produced by the timer block to all of the linked targets.
Definition: agents.h:6762
_Reserving_node(ScheduleGroup &_PScheduleGroup, ISource< _Type > *_PSource, size_t _Index, ITarget< size_t > *_PTarget=NULL)
Constructs a _Reserving_node within the specified schedule group. The scheduler is implied by the schedul...
Definition: agents.h:8622
volatile message_status _M_fStatus
Definition: agents.h:12966
virtual void release(runtime_object_identity _MsgId, ITarget< _Type > *_PTarget)
Definition: agents.h:12836
void initialize_source_and_target(_Inout_opt_ Scheduler *_PScheduler=NULL, _Inout_opt_ ScheduleGroup *_PScheduleGroup=NULL)
Initializes the base object. Specifically, the message_processor object needs to be initialized...
Definition: agents.h:4173
long __cdecl _InterlockedDecrement(long volatile *)
A choice messaging block is a multi-source, single-target block that represents a control-flow intera...
Definition: agents.h:9699
_CRTIMP2 bool cancel()
Moves an agent from either the agent_created or agent_runnable states to the agent_canceled state...
virtual message_status propagate_message(message< _Type > *_PMessage, ISource< _Type > *)
Asynchronously passes a message from an ISource block to this ITarget block. It is invoked by the pro...
Definition: agents.h:9579
virtual message< _Type > * accept_message(runtime_object_identity _MsgId)
Accepts a message that was offered by this timer messaging block, transferring ownership to the calle...
Definition: agents.h:6653
void _Wait_on_ref(long _RefCount=0)
Definition: agents.h:3877
virtual message_status propagate(_Inout_opt_ message< _Type > *_PMessage, _Inout_opt_ ISource< _Type > *_PSource)=0
When overridden in a derived class, asynchronously passes a message from a source block to this targe...
void _Delete_stored_messages()
Deletes all messages currently stored in this message block. Should be called by the destructor to en...
Definition: agents.h:8182
filter_method * _M_pFilter
The filter function which determines whether offered messages should be accepted. ...
Definition: agents.h:4236
~overwrite_buffer()
Destroys the overwrite_buffer messaging block.
Definition: agents.h:5016
virtual void process_input_messages(_Inout_ message< _Type > *_PMessage)
Executes the call function on the input messages.
Definition: agents.h:5760
~_SavedMessageIdArray()
Definition: agents.h:8245
static bool _asend(ITarget< _Type > *_Trg, const _Type &_Data)
Definition: agents.h:13299
virtual bool reserve(runtime_object_identity _MsgId, ITarget< _Type > *_PTarget)
Definition: agents.h:13087
virtual void propagate_to_any_targets(_Inout_opt_ message< size_t > *)
Takes the message and propagates it to all the targets of this _Greedy_node.
Definition: agents.h:9208
virtual void release_message(runtime_object_identity)
Releases a previous message reservation.
Definition: agents.h:8393
single_assignment const & operator=(single_assignment const &)
virtual void unlink_targets()
When overridden in a derived class, unlinks all target blocks from this ISource block.
Definition: agents.h:12559
_AsyncOriginator()
Definition: agents.h:13179
virtual message_status send(_Inout_ message< _Type > *_PMessage, _Inout_ ISource< _Type > *_PSource)=0
When overridden in a derived class, synchronously passes a message to the target block.
virtual message< size_t > * accept_message(runtime_object_identity _MsgId)
Accept the message by making a copy of the payload.
Definition: agents.h:9178
void _Release_ref()
Definition: agents.h:13248
_Diff _Count
Definition: algorithm:1941
_Greedy_node(ISource< _Type > *_PSource, size_t _Index, ITarget< size_t > *_PTarget, filter_method const &_Filter)
Constructs a _Greedy_node within the default scheduler, and places it on any schedule group of the sc...
Definition: agents.h:8920
Definition: concrt.h:293
_Myt & operator=(const _Myt &_Right)
Definition: agents.h:282
call(_Call_method const &_Func)
Constructs a call messaging block.
Definition: agents.h:5470
#define _Inout_
Definition: sal.h:384
virtual bool reserve(runtime_object_identity _MsgId, _Inout_ ITarget< size_t > *_PTarget)
Reserves a message previously offered by this choice messaging block.
Definition: agents.h:9973
virtual bool reserve_message(runtime_object_identity)
Reserves a message previously offered by the source.
Definition: agents.h:8356
_Type _Receive_impl(ISource< _Type > *_Src, unsigned int _Timeout, typename ITarget< _Type >::filter_method const *_Filter_proc)
A general receive implementation, allowing a context to wait for data from exactly one source and fil...
Definition: agents.h:11813
void register_filter(filter_method const &_Filter)
Registers a filter method that will be invoked on every received message.
Definition: agents.h:4188
void _Propagate_priority_order(::Concurrency::details::_Queue< message< _Target_type >> &_MessageBuffer)
Propagates messages in priority order.
Definition: agents.h:10666
virtual message_status send(_Inout_ message< _Source_type > *_PMessage, _Inout_ ISource< _Source_type > *_PSource)
Synchronously initiates a message to this block. Called by an ISource block. When this function compl...
Definition: agents.h:4035
~_Queue()
Definition: agents.h:126
virtual void link_target_notification(_Inout_ ITarget< _Type > *_PTarget)
A callback that notifies that a new target has been linked to this single_assignment messaging block...
Definition: agents.h:7342
virtual void propagate_output_messages()
Places the message _PMessage in this unbounded_buffer messaging block and tries to offer it to all o...
Definition: agents.h:4707
_CRTIMP _Timer(unsigned int _Ms, bool _FRepeating)
volatile bool _M_fIsInitialized
Definition: agents.h:5413
virtual message_status propagate_message(message< size_t > *_PMessage, ISource< size_t > *_PSource)
Asynchronously passes a message from an ISource block to this ITarget block. It is invoked by the pro...
Definition: agents.h:10353
virtual void process_incoming_message()
The processing function that is called asynchronously. It dequeues messages and begins processing the...
Definition: agents.h:2217
virtual message_status propagate_message(_Inout_ message< _Type > *_PMessage, _Inout_ ISource< _Type > *_PSource)
Asynchronously passes a message from an ISource block to this overwrite_buffer messaging block...
Definition: agents.h:5074
_Non_greedy_node(ScheduleGroup &_PScheduleGroup, ISource< _Type > *_PSource, size_t _Index, ITarget< size_t > *_PTarget=NULL)
Constructs a _Non_greedy_node within the specified schedule group. The scheduler is implied by the sc...
Definition: agents.h:9386
virtual message_status propagate_message(_Inout_ message< _Input > *_PMessage, _Inout_ ISource< _Input > *_PSource)
Asynchronously passes a message from an ISource block to this transformer messaging block...
Definition: agents.h:6076
void _Delete_stored_messages()
Deletes all messages currently stored in this message block. Should be called by the destructor to en...
Definition: agents.h:5375
std::tuple< typename std::remove_pointer< _Types >::type::source_type...> type
Definition: agents.h:10268
runtime_object_identity _M_savedId
Definition: agents.h:9243
_Message * _M_pHead
Definition: agents.h:109
Defines a block allowing sources of distinct types to be joined. Join node is a single-target, multi-source ordered propagator block.
Definition: agents.h:10283
tuple< typename _Unrefwrap< _Types >::type...> make_tuple(_Types &&..._Args)
Definition: tuple:607
concurrent_queue< message< _Input > * > _M_inputMessages
Definition: agents.h:6397
virtual void link_target_notification(_Inout_ ITarget< _Type > *_PTarget)
A callback that notifies that a new target has been linked to this overwrite_buffer messaging block...
Definition: agents.h:5312
_Greedy_node(ScheduleGroup &_PScheduleGroup, ISource< _Type > *_PSource, size_t _Index, ITarget< size_t > *_PTarget, filter_method const &_Filter)
Constructs a _Greedy_node within the specified schedule group. The scheduler is implied by the schedu...
Definition: agents.h:9024
const_reference operator[](size_t _Pos) const
Definition: agents.h:344
~_Greedy_node()
Cleans up any resources that may have been created by the _Greedy_node.
Definition: agents.h:9036
source_link_manager< _SourceLinkRegistry > _SourceLinkManager
The type of the source_link_manager for this propagator_block.
Definition: agents.h:3940
void _Init()
Definition: agents.h:384
void remove_network_links()
Removes all the source and target network links from this propagator_block object.
Definition: agents.h:4212
void _Delete_stored_messages()
Deletes all messages currently stored in this message block. Should be called by the destructor to en...
Definition: agents.h:4800
single_assignment< size_t > * _M_pSingleAssignment
Definition: agents.h:10114
_Message type
Definition: agents.h:118
void _Reset()
Resets the _Order_node and prepares it for the next propagation
Definition: agents.h:9447
_Join_node()
Constructs a join within the default scheduler, and places it on any schedule group of the scheduler'...
Definition: agents.h:10292
_SavedMessageIdArray _M_savedMessageIdArray
Definition: agents.h:8250
virtual message< _Type > * accept_message(runtime_object_identity _MsgId)
Accepts a message that was offered by this unbounded_buffer messaging block, transferring ownership t...
Definition: agents.h:4555
virtual message_status send_message(_Inout_ message< _Source_type > *, _Inout_ ISource< _Source_type > *)
When overridden in a derived class, this method synchronously passes a message from an ISource block ...
Definition: agents.h:2973
A call messaging block is a multi-source, ordered target_block that invokes a specified function when...
Definition: agents.h:5443
ITarget< _Target_type > * _M_pReservedFor
Connected target that is holding a reservation
Definition: agents.h:3821
~timer()
Destroys a timer messaging block.
Definition: agents.h:6563
_CRTIMP2 bool done()
Moves an agent into the agent_done state, indicating that the agent has completed.
virtual ~propagator_block()
Destroys a propagator_block object.
Definition: agents.h:3960
long __cdecl _InterlockedIncrement(long volatile *)
join(size_t _NumInputs, filter_method const &_Filter)
Constructs a join messaging block.
Definition: agents.h:7508
virtual void propagate_to_any_targets(_Inout_opt_ message< _Target_type > *_PMessage)
When overridden in a derived class, propagates the given message to any or all of the linked targets...
Definition: agents.h:3711
virtual void release(runtime_object_identity _MsgId, ITarget< _Type > *_PTarget)
Definition: agents.h:12604
virtual ~message()
Destroys the message object.
Definition: agents.h:1851
call const & operator=(call const &)
virtual void resume_propagation()
Resumes propagation after a reservation has been released.
Definition: agents.h:7846
virtual message_status send_message(_Inout_ message< _Type > *_PMessage, _Inout_ ISource< _Type > *_PSource)
Synchronously passes a message from an ISource block to this overwrite_buffer messaging block...
Definition: agents.h:5118
virtual message_status send_message(_Inout_ message< _Source_type > *, _Inout_ ISource< _Source_type > *)
When overridden in a derived class, this method synchronously passes a message from an ISource block ...
Definition: agents.h:4097
source_block()
Constructs a source_block object.
Definition: agents.h:3248
_Transform_method _M_pFunc
Definition: agents.h:6394
const size_t COOPERATIVE_WAIT_TIMEOUT
Value indicating that a wait timed out.
Definition: concrt.h:3529
_Dynamic_array< _Type > _Myt
Definition: agents.h:258
message< _Type > * _M_pMessage
Definition: agents.h:13258
long add_ref()
Adds to the reference count for the message object. Used for message blocks that need reference count...
Definition: agents.h:1879
virtual void unlink_sources()=0
When overridden in a derived class, unlinks all source blocks from this ITarget block.
virtual void release_message(runtime_object_identity _MsgId)
Releases a previous message reservation.
Definition: agents.h:4618
virtual void wait()=0
When overridden in a derived class, waits for all asynchronous operations to complete.
#define SIZE_MAX
Definition: limits.h:81
_Order_node_base()
Constructs a _Order_node_base within the default scheduler, and places it on any schedule group of th...
Definition: agents.h:8287
::Concurrency::details::_ReentrantPPLLock _M_resetLock
Definition: agents.h:9658
virtual bool reserve(runtime_object_identity _MsgId, _Inout_ ITarget< _Target_type > *_PTarget)
Reserves a message previously offered by this source_block object.
Definition: agents.h:3401
bool _Try_receive_impl(ISource< _Type > *_Src, _Type &_value, typename ITarget< _Type >::filter_method const *_Filter_proc)
Helper function that implements try_receive. A general try-receive implementation, allowing a context ...
Definition: agents.h:12204
_Handler_method _M_processor
A message processing object which exposes the callback to be invoked
Definition: agents.h:2427
virtual message< _Type > * consume(runtime_object_identity _MsgId, _Inout_ ITarget< _Type > *_PTarget)=0
When overridden in a derived class, consumes a message previously offered by this ISource block and s...
virtual bool reserve(runtime_object_identity _MsgId, _Inout_ ITarget< _Destination_type > *_PTarget)
Reserves a message previously offered by this multitype_join messaging block.
Definition: agents.h:11074
::Concurrency::details::_Queue< message< _Type > > _M_messageBuffer
Message queue used to store messages
Definition: agents.h:4829
virtual message_status send(_Inout_ message< _Source_type > *_PMessage, _Inout_ ISource< _Source_type > *_PSource)
Synchronously passes a message from a source block to this target block.
Definition: agents.h:2911
_Check_return_ _In_ long _Size
Definition: io.h:325
_CRTIMP void __cdecl _Trace_agents(Agents_EventType _Type, __int64 agentId,...)
void sync_send(_Inout_opt_ message< _Source_type > *_PMessage)
Synchronously send a message for processing.
Definition: agents.h:3136
bool has_value() const
Checks whether this overwrite_buffer messaging block has a value yet.
Definition: agents.h:5032
ordered_message_processor()
Constructs an ordered_message_processor object.
Definition: agents.h:2037
virtual void release_ref(_Inout_ ITarget< _Type > *_PTarget)=0
When overridden in a derived class, releases a reference count on this ISource block.
virtual ~ITarget()
Destroys the ITarget object.
Definition: agents.h:2467
virtual message< _Destination_type > * consume_message(runtime_object_identity _MsgId)
Consumes a message previously offered by the source and reserved by the target, transferring ownershi...
Definition: agents.h:10443
virtual void process_message(message< _Source_type > *)
When overridden in a derived class, processes a message that was accepted by this target_block object...
Definition: agents.h:3038
static void __cdecl _Process_incoming_message_wrapper(void *_Data)
Wrapper for process_incoming_message suitable for use as an argument to CreateThread and other similar...
Definition: agents.h:1992
virtual void process_input_messages(_Inout_ message< _Type > *_PMessage)
Places the message _PMessage in this unbounded_buffer messaging block and tries to offer it to all o...
Definition: agents.h:4685
::Concurrency::details::_NonReentrantPPLLock _M_propagationLock
Definition: agents.h:8256
long __cdecl _InterlockedCompareExchange(long volatile *, long, long)
single_link_registry< ITarget< _Type > > _Target_registry
Definition: agents.h:12983
virtual bool reserve_message(runtime_object_identity _MsgId)
Reserves a message previously offered by this timer messaging block.
Definition: agents.h:6680
static const int _S_growthFactor
Definition: agents.h:426
virtual ~_AnonymousOriginator()
Definition: agents.h:12547
virtual void release_message(runtime_object_identity _MsgId)
Releases a previous message reservation.
Definition: agents.h:10456
::Concurrency::details::_NonReentrantPPLLock _M_propagationLock
Definition: agents.h:7406
A transformer messaging block is a single-target, multi-source, ordered propagator_block which can ac...
Definition: agents.h:5806
virtual void sync_send(_Inout_opt_ message< _Type > *_Msg)
Synchronously queues up messages and starts a processing task, if this has not been done already...
Definition: agents.h:2108
message< _Type > * _M_pMessage
Definition: agents.h:6778
virtual void resume_propagation()
Resumes propagation after a reservation has been released.
Definition: agents.h:8406
virtual void async_send(_Inout_opt_ message< _Type > *_Msg)=0
When overridden in a derived class, places messages into the block asynchronously.
virtual message< _Type > * consume_message(runtime_object_identity _MsgId)
Consumes a message previously offered by the overwrite_buffer messaging block and reserved by the tar...
Definition: agents.h:5245
void _Propagate_priority_order(::Concurrency::details::_Queue< message< _Target_type >> &_MessageBuffer)
Propagates messages in priority order.
Definition: agents.h:6311
filter_method * _M_pFilter
The filter function which determines whether offered messages should be accepted. ...
Definition: agents.h:3190
virtual message_status propagate(_Inout_opt_ message< _Source_type > *_PMessage, _Inout_opt_ ISource< _Source_type > *_PSource)
Asynchronously passes a message from a source block to this target block.
Definition: agents.h:2860
::Concurrency::details::_Queue< message< _Type > > _M_processedMessages
Message queue used to store processed messages
Definition: agents.h:4823
The concurrent_queue class is a sequence container class that allows first-in, first-out access to it...
Definition: concurrent_queue.h:54
virtual void unlink_target(ITarget< _Type > *_PTarget)
Definition: agents.h:12994
message(message const &_Msg)
Constructs a message object.
Definition: agents.h:1826
void _Grow(size_t _NewSize)
Definition: agents.h:394
The propagator_block class is an abstract base class for message blocks that are both a source and ta...
Definition: agents.h:3926
_Reserving_node(Scheduler &_PScheduler, ISource< _Type > *_PSource, size_t _Index, ITarget< size_t > *_PTarget, filter_method const &_Filter)
Constructs a _Reserving_node within the specified scheduler, and places it on any schedule group of t...
Definition: agents.h:8596
Greedy join messaging blocks immediately accept a message upon propagation. This is more efficient...
Definition: agents.h:7433
bool try_receive(_Inout_ ISource< _Type > *_Src, _Type &_value)
A general try-receive implementation, allowing a context to look for data from exactly one source and...
Definition: agents.h:12426
virtual void process_message(_Inout_ message< _Type > *_PMessage)
Processes a message that was accepted by this call messaging block.
Definition: agents.h:5751
const _Ty & _Right
Definition: algorithm:4087
virtual void release_message(runtime_object_identity _MsgId)
Releases a previous message reservation.
Definition: agents.h:7315
virtual void async_send(_Inout_opt_ message< _Target_type > *_Msg)
Asynchronously queues up messages and starts a propagation task, if this has not been done already ...
Definition: agents.h:3783
The agent has been created but not started.
Definition: agents.h:11505
virtual void propagate_output_messages()
Propagate messages to targets.
Definition: agents.h:3698
void remove_targets()
Removes all target links for this source block. This should be called from the destructor.
Definition: agents.h:3803
bool _Try_consume_source_messages(_Destination_type &_Destination_tuple, ISource< size_t > **_Sources)
Tries to reserve from all sources. If successful, it will consume all the messages ...
Definition: agents.h:10555
virtual void release_ref(_Inout_ ITarget< _Destination_type > *_PTarget)
Releases a reference count on this multitype_join messaging block.
Definition: agents.h:11144
_Greedy_node(ISource< _Type > *_PSource, size_t _Index, ITarget< size_t > *_PTarget=NULL)
Constructs a _Greedy_node within the default scheduler, and places it on any schedule group of the sc...
Definition: agents.h:8896
_Reserving_node(ISource< _Type > *_PSource, size_t _Index, ITarget< size_t > *_PTarget=NULL)
Constructs a _Reserving_node within the default scheduler, and places it on any schedule group of the...
Definition: agents.h:8517
The agent has started.
Definition: agents.h:11515
message< _Type > * _M_pGreedyMessage
Definition: agents.h:9233
virtual void acquire_ref(_Inout_ ITarget< _Type > *_PTarget)=0
When overridden in a derived class, acquires a reference count on this ISource block, to prevent deletion.
The agent was canceled.
Definition: agents.h:11525
Scheduler * _M_pScheduler
The scheduler to process messages on
Definition: agents.h:2396
virtual void wait()
A processor-specific spin wait used in destructors of message blocks to make sure that all asynchrono...
Definition: agents.h:2188
~transformer()
Destroys the transformer messaging block.
Definition: agents.h:6048
_AnonymousOriginator()
Definition: agents.h:12542
bool _Enqueue(_Message *_Element)
Definition: agents.h:139
timer(unsigned int _Ms, _Type const &_Value, ITarget< _Type > *_PTarget=NULL, bool _Repeating=false)
Constructs a timer messaging block that will fire a given message after a specified interval...
Definition: agents.h:6486
target_block()
Constructs a target_block object.
Definition: agents.h:2822
virtual message< size_t > * accept_message(runtime_object_identity _MsgId)
Accept the message by making a copy of the payload.
Definition: agents.h:8754
std::tr1::function< void(message< _Type > *)> _Handler_method
The signature of the callback method invoked while processing messages.
Definition: agents.h:2015
virtual void release_message(runtime_object_identity _MsgId)
Releases a previous message reservation.
Definition: agents.h:6720
The timer has been stopped.
Definition: agents.h:6458
#define _InterlockedIncrementSizeT(_Target)
Definition: concrt.h:110
volatile long _M_fStartable
Definition: agents.h:11761
_Order_node_base const & operator=(_Order_node_base const &)
virtual void run()=0
Represents the main task of an agent. run should be overridden in a derived class, and specifies what the agent should do after it has been started.
virtual void unlink_target(_Inout_ ITarget< _Target_type > *_PTarget)
Unlinks a target block from this source_block object.
Definition: agents.h:3306
virtual message_status propagate_message(_Inout_ message< _Source_type > *_PMessage, _Inout_ ISource< _Source_type > *_PSource)=0
When overridden in a derived class, this method asynchronously passes a message from an ISource block...
message_status propagate_message(_Inout_ message< _Type > *_PMessage, _Inout_ ISource< _Type > *_PSource)
Asynchronously passes a message from an ISource block to this join messaging block. It is invoked by the propagate method, when called by a source block.
Definition: agents.h:7667