STLdoc
STLdocumentation
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
agents.h
Go to the documentation of this file.
1 /***
2 * ==++==
3 *
4 * Copyright (c) Microsoft Corporation. All rights reserved.
5 *
6 * ==--==
7 * =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
8 *
9 * agents.h
10 *
11 * Main public header file for ConcRT's asynchronous agents layer. This is the only header file a
12 * C++ program must include to use asynchronous agents.
13 *
14 * The core runtime, Parallel Patterns Library (PPL), and resource manager are defined in separate header files.
15 * =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
16 ****/
17 
18 #pragma once
19 
20 #include <crtdefs.h>
21 #include <concrt.h>
22 #include <stdexcept>
23 #include <functional>
24 #include <tuple>
25 #include <type_traits>
26 #include <vector>
27 #include <concurrent_queue.h>
28 
29 #define _AGENTS_H
30 
31 #pragma pack(push,_CRT_PACKING)
32 #pragma warning(push)
33 #pragma warning(disable: 4100) // Unreferenced formal parameter - needed for document generation
34 #pragma warning(disable: 4702) // Unreachable code - needed for retail version code path
35 #pragma warning(disable: 4297) // Function expected not to throw but does
36 // Forward declarations
37 
42 
43 namespace Concurrency
44 {
50 
51 typedef __int32 runtime_object_identity;
52 
57 
58 typedef ::Concurrency::details::_NonReentrantPPLLock::_Scoped_lock _NR_lock;
59 
64 
65 typedef ::Concurrency::details::_ReentrantPPLLock::_Scoped_lock _R_lock;
66 
67 
68 //***************************************************************************
69 // Internal namespace:
70 //
71 // ::Concurrency::details contains definitions to support routines in the public namespaces and macros.
72 // Clients should not directly interact with this namespace.
73 //***************************************************************************
74 
75 namespace details
76 {
77  //**************************************************************************
78  // Core Messaging Support:
79  //**************************************************************************
80 
81  //
82  // A base class to derive from that keeps unique IDs on its derived classes
83  //
84  class _Runtime_object : public _AllocBase
85  {
86  public:
87  // Creates a new runtime object.
89 
90  // Creates a runtime object from an identity.
92 
93  // Gets the runtime object identity.
95  {
96  return _M_id;
97  }
98 
99  protected:
100  // The runtime object identity.
102  };
103 
104  // A queue used to hold the messages for the messaging blocks
105  template<class _Message>
106  class _Queue : public _AllocBase
107  {
108  protected:
109  // A pointer to the head of the queue.
110  _Message * _M_pHead;
111 
112  // A pointer to a pointer to the tail of the queue.
113  _Message ** _M_ppTail;
114 
115  // The number of elements presently stored in the queue.
116  size_t _M_count;
117 
118  public:
119  typedef _Message type;
120 
121  // Create a Queue
122  _Queue() : _M_pHead(NULL), _M_ppTail(&_M_pHead), _M_count(0)
123  {
124  }
125 
126  // Destroy the queue
128  {
129  }
130 
131  // Returns the count of items in the queue
132  size_t _Count() const
133  {
134  return _M_count;
135  }
136 
137  // Add an item to the tail of the queue
138  //
139  // Returns a Boolean indicating whether the operation succeeded.
140  bool _Enqueue(_Message *_Element)
141  {
142  _CONCRT_ASSERT(_Element->_M_pNext == NULL);
143  _CONCRT_ASSERT(*_M_ppTail == NULL);
144 
145  *_M_ppTail = _Element;
146  _Element->_M_pNext = NULL;
147  _M_ppTail = &(_Element->_M_pNext);
148  _M_count++;
149 
150  return true;
151  }
152 
153  // Remove the specified element from the queue
154  //
155  // Returns a Boolean indicating whether the operation succeeded, that is, the message was found in the queue.
156  bool _Remove(_Message * _OldElement)
157  {
158  bool _Result = false;
159 
160  _CONCRT_ASSERT(_OldElement != NULL);
161 
162  if (_M_pHead == _OldElement)
163  {
164  _M_pHead = _OldElement->_M_pNext;
165  if (_M_pHead == NULL)
166  {
167  _M_ppTail = &_M_pHead;
168  }
169 
170  _OldElement->_M_pNext = NULL;
171  _M_count--;
172  _Result = true;
173  }
174  else
175  {
176  _Message * _Next = NULL;
177  for (_Message * _Node = _M_pHead; _Node != NULL; _Node = _Next)
178  {
179  _Next = _Node->_M_pNext;
180 
181  if (_Node->_M_pNext == _OldElement)
182  {
183  _Node->_M_pNext = _OldElement->_M_pNext;
184  // if this is the last element of the _Queue
185  if (_Node->_M_pNext == NULL && _M_count == 1)
186  {
187  _M_ppTail = &_M_pHead;
188  }
189 
190  _OldElement->_M_pNext = NULL;
191  _M_count--;
192  _Result = true;
193  break;
194  }
195  }
196  }
197 
198  return _Result;
199  }
200 
201  // Dequeue an item from the head of queue
202  //
203  // Returns a pointer to the message found at the head of the queue.
204  _Message * _Dequeue()
205  {
206  if (_M_pHead == NULL)
207  {
208  return NULL;
209  }
210 
211  _Message * _Result = _M_pHead;
212 
213  _M_pHead = _Result->_M_pNext;
214  if (_M_pHead == NULL)
215  {
216  _M_ppTail = &_M_pHead;
217  }
218 
219  _Result->_M_pNext = NULL;
220  _M_count--;
221  return _Result;
222  }
223 
224  // Return the item at the head of the queue, without dequeuing
225  //
226  // Returns a pointer to the message found at the head of the queue.
227  _Message * _Peek()
228  {
229  return _M_pHead;
230  }
231 
232  // Return true if the ID matches the message at the head of the queue
233  bool _Is_head(runtime_object_identity _MsgId)
234  {
235  // Peek at the next message in the message buffer. Use it to
236  // check if the IDs match
237  _Message * _Msg = _M_pHead;
238 
239  if (_Msg == NULL || _Msg->msg_id() != _MsgId)
240  {
241  return false;
242  }
243 
244  return true;
245  }
246  };
247 
248  //
249  // _Dynamic_array implements a container very similar to std::vector.
250  // However, it exposes a reduced subset of functionality that is
251  // geared towards use in network_link_registry. The array access is not
252  // thread-safe.
253  //
254  template<class _Type>
256  {
257  public:
258 
260 
261  typedef _Type& reference;
262  typedef _Type const& const_reference;
263 
264  //
265  // Construct a dynamic array
266  //
268  {
269  _Init();
270  }
271 
272  //
273  // Release any resources used by dynamic array
274  //
276  {
277  _Clear();
278  }
279 
280  //
281  // Assignment operator. Copy the contents of _Right
282  //
283  _Myt& operator=(const _Myt& _Right)
284  {
285  if (this != &_Right)
286  {
287  // Remove all the elements
288  _Clear();
289 
290  // Allocate space for the new elements
291  size_t _Size = _Right._Size();
292  _Grow(_Size);
293 
294  // Copy over the new elements
295  for (size_t _I=0; _I < _Size; _I++)
296  {
297  _Push_back(_Right[_I]);
298  }
299  }
300 
301  return *this;
302  }
303 
304  //
305  // Clear all the elements in the array
306  //
307  void _Clear()
308  {
309  if (_M_array != NULL)
310  {
311  delete [] _M_array;
312  _Init();
313  }
314  }
315 
316  //
317  // Add an element to the end of the array
318  //
319  void _Push_back(_Type const& _Element)
320  {
321  if (_M_index >= _M_size)
322  {
323  // Not enough space. Grow the array
324  size_t _NewSize = (_M_index + 1) * _S_growthFactor;
325  _Grow(_NewSize);
326  }
327 
329  _M_array[_M_index] = _Element;
330  _M_index++;
331  }
332 
333  //
334  // Index operation. Retrieve an element at the specified index. No bounds check is done.
335  //
336  reference operator[](size_t _Pos)
337  {
338  _CONCRT_ASSERT(_Pos < _M_size);
339  return _M_array[_Pos];
340  }
341 
342  //
343  // Index operation. Retrieve an element at the specified index. No bounds check is done.
344  //
345  const_reference operator[](size_t _Pos) const
346  {
347  _CONCRT_ASSERT(_Pos < _M_size);
348  return _M_array[_Pos];
349  }
350 
351  //
352  // Returns the count of elements in the array
353  //
354  size_t _Size() const
355  {
356  return _M_index;
357  }
358 
359  //
360  // Swap the contents of this array with _Right
361  //
362  void _Swap(_Myt& _Right)
363  {
364  if (this != &_Right)
365  {
366  // Swap the details.
367  _Type * _Array = _M_array;
368  size_t _Index = _M_index;
369  size_t _Size = _M_size;
370 
371  _M_array = _Right._M_array;
372  _M_index = _Right._M_index;
373  _M_size = _Right._M_size;
374 
375  _Right._M_array = _Array;
376  _Right._M_index = _Index;
377  _Right._M_size = _Size;
378  }
379  }
380 
381  private:
382  //
383  // Initialize the array
384  //
385  void _Init()
386  {
387  _M_array = NULL;
388  _M_index = 0;
389  _M_size = 0;
390  }
391 
392  //
393  // Grow the array to the given size. The old elements are copied over.
394  //
395  void _Grow(size_t _NewSize)
396  {
397  _CONCRT_ASSERT( _NewSize > _M_size );
398 
399  _Type * _Array = new _Type[_NewSize];
400 
401  if (_M_array != NULL)
402  {
403  // Copy over the elements
404  for (size_t _I = 0; _I < _M_size; _I++)
405  {
406  _Array[_I] = _M_array[_I];
407  }
408 
409  delete [] _M_array;
410  }
411 
412  _M_array = _Array;
413  _M_size = _NewSize;
414  }
415 
416  // Private data members
417 
418  // Array of elements
420 
421  // Index where the next element should be inserted
422  size_t _M_index;
423 
424  // Capacity of the array.
425  size_t _M_size;
426 
427  static const int _S_growthFactor = 2;
428  };
429 
430  //
431  // Returns an identifier for the given object that could be used
432  // in an ETW trace (call to _Trace_agents)
433  //
434  template <class _Type>
435  __int64 _Trace_agents_get_id(_Type * _PObject)
436  {
437  return reinterpret_cast<__int64>(_PObject);
438  }
439 
440 } // namespace details
441 
442 //**************************************************************************
443 // Public Namespace:
444 //
445 // Anything in the Concurrency namespace is intended for direct client consumption.
446 //
447 //**************************************************************************
448 
449 //
450 // Forward declarations:
451 //
452 template<class _Type> class ISource;
453 template<class _Type> class ITarget;
454 
455 //**************************************************************************
456 // Network link registry
457 //**************************************************************************
458 
459 // Forward declaration for use in the iterator
460 template<class _Block> class network_link_registry;
461 
469 
470 template<class _Block>
472 {
473 public:
474 
477 
478  // Element type
479  typedef _Block* _EType;
480 
481  // Const iterator - iterator shall not be used to modify the links
482  typedef _EType const& const_reference;
483  typedef _EType const* const_pointer;
484 
488 
489  _Network_link_iterator(_MyContainer * _PNetwork_link, size_t _Index) : _M_pNetwork_link(_PNetwork_link), _M_index(_Index), _M_value(NULL)
490  {
492  }
493 
497 
499  {
501  _M_index = _Right._M_index;
502  }
503 
507 
508  _Myt const& operator=(_Myt const& _Right)
509  {
511  _M_index = _Right._M_index;
512  return *this;
513  }
514 
521 
522  const_reference operator*()
523  {
525  return _M_value;
526  }
527 
534 
535  const_pointer operator->() const
536  {
537  return (&**this);
538  }
539 
547 
548  _Myt& operator++()
549  {
550  ++_M_index;
552  return (*this);
553  }
554 
562 
563  _Myt operator++(int)
564  {
565  _Myt _Tmp = *this;
566  ++*this;
567  return (_Tmp);
568  }
569 
570 private:
571 
572  // Pointer to the underlying container (network link registry)
573  _MyContainer * _M_pNetwork_link;
574 
575  // Current index
576  size_t _M_index;
577 
578  // Current value
579  _EType _M_value;
580 };
581 
594 
595 template<class _Block>
597 {
598 public:
599 
603 
604  typedef _Block type;
605 
609 
610  typedef _Block * _EType;
611 
616 
617  typedef _EType const& const_reference;
618 
623 
624  typedef _EType const* const_pointer;
625 
626  // Make the iterators friends so that they can access some of the
627  // private routines such as _Get_element.
628 
629  friend class _Network_link_iterator<_Block>;
630 
635 
637 
645 
646  virtual void add(_EType _Link) = 0;
647 
658 
659  virtual bool remove(_EType _Link) = 0;
660 
672 
673  virtual bool contains(_EType _Link) = 0;
674 
682 
683  virtual size_t count() = 0;
684 
695 
696  virtual iterator begin() = 0;
697 
698 protected:
699 
707 
708  virtual void _Next_index(size_t& _Index) = 0;
709 
720 
721  virtual _EType _Get_element(size_t _Index) const = 0;
722 };
723 
732 
733 template<class _Block>
735 {
736 public:
737 
741 
743  {
744  }
745 
753 
755  {
756  // It is an error to delete link registry with links
757  // still present
758  if (count() != 0)
759  {
760  throw invalid_operation("Deleting link registry before removing all the links");
761  }
762  }
763 
774 
776  {
777  if (_Link == NULL)
778  {
779  return;
780  }
781 
782  // Only one link can be added.
783  if (_M_connectedLink != NULL)
784  {
785  throw invalid_link_target("_Link");
786  }
787 
789  }
790 
800 
801  virtual bool remove(typename network_link_registry<_Block>::_EType _Link)
802  {
803  if ((_Link != NULL) && (_M_connectedLink == _Link))
804  {
806  return true;
807  }
808 
809  return false;
810  }
811 
821 
823  {
824  return ((_Link != NULL) && (_M_connectedLink == _Link));
825  }
826 
833 
834  virtual size_t count()
835  {
836  return (_M_connectedLink == NULL) ? 0 : 1;
837  }
838 
848 
850  {
851  return (typename network_link_registry<_Block>::iterator(this, 0));
852  }
853 
854 protected:
855 
863 
864  virtual void _Next_index(size_t& _Index)
865  {
866  if (_M_connectedLink == NULL)
867  {
868  _Index++;
869  }
870  }
871 
882 
884  {
885  if (_Index == 0)
886  {
887  return _M_connectedLink;
888  }
889 
890  return NULL;
891  }
892 
893 private:
894 
895  // A single pointer is used to hold the link
897 };
898 
907 
908 template<class _Block>
910 {
911 public:
912 
916 
918  {
919  }
920 
928 
930  {
931  // It is an error to delete link registry with links
932  // still present
933  if (count() != 0)
934  {
935  throw invalid_operation("Deleting link registry before removing all the links");
936  }
937  }
938 
951 
952  void set_bound(size_t _MaxLinks)
953  {
954  _CONCRT_ASSERT(count() == 0);
955  _M_maxLinks = _MaxLinks;
956  }
957 
969 
971  {
972  if (_Link == NULL)
973  {
974  return;
975  }
976 
977  _Add(_Link);
978  }
979 
989 
990  virtual bool remove(typename network_link_registry<_Block>::_EType _Link)
991  {
992  if (_Link == NULL)
993  {
994  return false;
995  }
996 
997  return (_Remove(_Link));
998  }
999 
1009 
1011  {
1012  if (_Link == NULL)
1013  {
1014  return false;
1015  }
1016 
1017  return (_Find(_Link) < _M_vector._Size());
1018  }
1019 
1026 
1027  virtual size_t count()
1028  {
1029  return _Count();
1030  }
1031 
1041 
1043  {
1044  return (typename network_link_registry<_Block>::iterator(this, 0));
1045  }
1046 
1047 protected:
1048 
1056 
1057  virtual void _Next_index(size_t& _Index)
1058  {
1059  size_t _Size = _M_vector._Size();
1060  while (_Index < _Size)
1061  {
1062  if (_M_vector[_Index] != NULL)
1063  {
1064  break;
1065  }
1066 
1067  ++_Index;
1068  }
1069  }
1070 
1081 
1083  {
1084  if (_Index < _M_vector._Size())
1085  {
1086  return _M_vector[_Index];
1087  }
1088 
1089  return NULL;
1090  }
1091 
1092 private:
1093 
1100 
1102  {
1103  size_t _Size = _M_vector._Size();
1104  size_t _Insert_pos = 0;
1105 
1106  _CONCRT_ASSERT(_Link != NULL);
1107 
1108  // If max links is set, ensure that inserting the new
1109  // link will not exceed the bound.
1110  if ((_M_maxLinks != _NOT_SET) && ((_Size+1) > (size_t) _M_maxLinks))
1111  {
1112  throw invalid_link_target("_Link");
1113  }
1114 
1115  for (size_t _Index = 0; _Index < _Size; _Index++)
1116  {
1117  if (_M_vector[_Index] != NULL)
1118  {
1119  // We want to find the first NULL entry after all the
1120  // non-NULL entries.
1121  _Insert_pos = _Index + 1;
1122 
1123  // Throw if duplicate entry is found
1124  if (_M_vector[_Index] == _Link)
1125  {
1126  throw invalid_link_target("_Link");
1127  }
1128  }
1129  }
1130 
1131  if (_Insert_pos < _Size)
1132  {
1133  _M_vector[_Insert_pos] = _Link;
1134  }
1135  else
1136  {
1137  _M_vector._Push_back(_Link);
1138  }
1139  }
1140 
1150 
1152  {
1153  _CONCRT_ASSERT(_Link != NULL);
1154 
1155  for (size_t _Index = 0; _Index < _M_vector._Size(); _Index++)
1156  {
1157  if (_M_vector[_Index] == _Link)
1158  {
1159  _M_vector[_Index] = NULL;
1160 
1161  // If max links is set, prevent new additions to the registry
1162  if (_M_maxLinks != _NOT_SET && _M_maxLinks > 0)
1163  {
1164  // Setting the bound to 0. This causes add to always throw.
1165  _M_maxLinks = 0;
1166  }
1167 
1168  return true;
1169  }
1170  }
1171 
1172  return false;
1173  }
1174 
1175 
1185 
1187  {
1188  size_t _Index = 0;
1189  for (_Index = 0; _Index < _M_vector._Size(); _Index++)
1190  {
1191  if (_M_vector[_Index] == _Link)
1192  {
1193  break;
1194  }
1195  }
1196 
1197  return _Index;
1198  }
1199 
1206 
1207  size_t _Count() const
1208  {
1209  size_t _Count = 0;
1210 
1211  for (size_t _Index = 0; _Index < _M_vector._Size(); _Index++)
1212  {
1213  if (_M_vector[_Index] != NULL)
1214  {
1215  _Count++;
1216  }
1217  }
1218 
1219  return _Count;
1220  }
1221 
1222  static const size_t _NOT_SET = SIZE_MAX;
1223 
1224  // Maximum number of links allowed.
1225  size_t _M_maxLinks;
1226 
1227  // ::Concurrency::details::_Dynamic_array is used to hold the links
1229 };
1230 
1231 // Forward declaration for the iterator
1232 template<class _LinkRegistry> class source_link_manager;
1233 
1240 
1241 template<class _LinkRegistry>
1243 {
1244 public:
1245 
1246  typedef typename _LinkRegistry::type _Block;
1247 
1250 
1251  // Element type
1252  typedef _Block* _EType;
1253 
1254  // Const iterator - iterator shall not be used to modify the links
1255  typedef _EType const& const_reference;
1256  typedef _EType const* const_pointer;
1257 
1261 
1262  _Source_link_iterator(_MyContainer * _PNetwork_link, size_t _Index) : _M_pNetwork_link(_PNetwork_link), _M_index(_Index), _M_sentinel(NULL)
1263  {
1264  // Take a snapshot of the link registry. This will reference the registry.
1266  }
1267 
1271 
1273  {
1274  if (_M_pNetwork_link != NULL)
1275  {
1277  }
1278  }
1282 
1284  {
1286  _M_index = _Right._M_index;
1287  _M_array = _Right._M_array;
1288 
1290  }
1291 
1295 
1296  _Myt const& operator=(_Myt const& _Right)
1297  {
1298  _MyContainer * _OldContainer = _M_pNetwork_link;
1299  _CONCRT_ASSERT(_OldContainer != NULL);
1300 
1302  _M_index = _Right._M_index;
1303  _M_array = _Right._M_array;
1304 
1305  if (_OldContainer != _M_pNetwork_link)
1306  {
1307  _OldContainer->release();
1309  }
1310 
1311  return *this;
1312  }
1313 
1320 
1321  const_reference operator*()
1322  {
1323  return _Get(0);
1324  }
1325 
1332 
1333  const_pointer operator->() const
1334  {
1335  return (&**this);
1336  }
1337 
1341 
1342  const_reference operator[](size_t _Pos) const
1343  {
1344  return _Get(_Pos);
1345  }
1346 
1353 
1354  _Myt& operator++()
1355  {
1356  ++_M_index;
1357  return (*this);
1358  }
1359 
1366 
1367  _Myt operator++(int)
1368  {
1369  _Myt _Tmp = *this;
1370  ++*this;
1371  return (_Tmp);
1372  }
1373 
1374 private:
1375 
1376  // Get the element at the given offset.
1377  const_reference _Get(size_t _Pos) const
1378  {
1379  size_t _Index = _M_index + _Pos;
1380  if (_Index >= _M_array._Size())
1381  {
1382  return _M_sentinel;
1383  }
1384 
1385  return _M_array[_Index];
1386  }
1387 
1388  // Array to hold the snapshot of the link registry
1390 
1391  // Pointer to the underlying container (network link registry)
1392  _MyContainer * _M_pNetwork_link;
1393 
1394  // Current index
1395  size_t _M_index;
1396 
1397  // Sentinel value to return on bounds overflow
1398  _EType _M_sentinel;
1399 };
1400 
1417 
1418 template<class _LinkRegistry>
1419 class source_link_manager
1420 {
1421 public:
1422 
1426 
1427  typedef _LinkRegistry type;
1428 
1432 
1433  typedef typename _LinkRegistry::type _Block;
1434 
1438 
1439  typedef std::function<void(_Block *, bool)> _Callback_method;
1440 
1444 
1445  typedef _Block * _EType;
1446 
1451 
1452  typedef _EType const& const_reference;
1453 
1457 
1458  typedef _EType const* const_pointer;
1459 
1460  // Iterator
1461  friend class _Source_link_iterator<_LinkRegistry>;
1462 
1467 
1469 
1473 
1474  typedef ::Concurrency::details::_ReentrantPPLLock _LockType;
1475 
1479 
1481 
1485 
1487  {
1488  }
1489 
1493 
1495  {
1497  }
1498 
1505 
1507  {
1508  _M_pLinkedTarget = _PTarget;
1509  }
1510 
1518 
1519  void set_bound(size_t _MaxLinks)
1520  {
1521  _M_links.set_bound(_MaxLinks);
1522  }
1523 
1530 
1531  void add(_EType _Link)
1532  {
1533  if (_Link == NULL)
1534  {
1535  return;
1536  }
1537 
1538  {
1539  _LockHolder _Lock(_M_lock);
1540  _M_links.add(_Link);
1541 
1542  // We need to add the _Link first and then invoke the
1543  // callback because _Add could throw.
1544 
1545  // As soon as the above lock is released, remove would
1546  // find the link that was added and could unlink it before
1547  // we are able to invoke the notification below. Keeping an
1548  // active iterator would prevent that from happening.
1549  _M_iteratorCount++;
1550  }
1551 
1552  // Acquire a reference on this link by the target
1553  _Link->acquire_ref(_M_pLinkedTarget);
1554 
1555  // Release the active iterator
1556  release();
1557  }
1558 
1568 
1569  bool remove(_EType _Link)
1570  {
1571  bool _Removed = false;
1572  _EType _RemovedLink = NULL;
1574 
1575  if (_Link == NULL)
1576  {
1577  return false;
1578  }
1579 
1580  {
1581  _LockHolder _Lock(_M_lock);
1582  _Removed = _M_links.remove(_Link);
1583 
1584  if (!_Removed)
1585  {
1586  // No change was made
1587  return _Removed;
1588  }
1589 
1590  if (_M_iteratorCount == 0)
1591  {
1592  // Set the removed link to indicate that
1593  // notification callback needs to be invoked.
1594  _RemovedLink = _Link;
1595  }
1596  else
1597  {
1598  // The iterator will complete the pending operation
1600  }
1601  }
1602 
1603  // NOTE: touching "this" pointer is dangerous as soon as the above lock is released
1604 
1605  // Release the reference for this link
1606  if (_RemovedLink != NULL)
1607  {
1608  _RemovedLink->release_ref(_LinkedTarget);
1609  }
1610 
1611  return _Removed;
1612  }
1613 
1617 
1618  void reference()
1619  {
1620  _LockHolder _Lock(_M_lock);
1621  _M_iteratorCount++;
1622  }
1623 
1627 
1628  void release()
1629  {
1632 
1633  {
1634  _LockHolder _Lock(_M_lock);
1636  _M_iteratorCount--;
1637 
1638  if (_M_iteratorCount == 0)
1639  {
1640  if (_M_pendingRemove._Size() > 0)
1641  {
1642  // Snap the pending remove list with the lock held
1643  _M_pendingRemove._Swap(_LinksToRemove);
1644  }
1645  }
1646  }
1647 
1648  // NOTE: touching "this" pointer is dangerous as soon as the above lock is released
1649 
1650  // Release the references
1651  size_t _Size = _LinksToRemove._Size();
1652 
1653  for (size_t _I=0; _I < _Size; _I++)
1654  {
1655  _LinksToRemove[_I]->release_ref(_LinkedTarget);
1656  }
1657  }
1658 
1669 
1670  bool contains(_EType _Link)
1671  {
1672  _LockHolder _Lock(_M_lock);
1673  return _M_links.contains(_Link);
1674  }
1675 
1682 
1683  size_t count()
1684  {
1685  _LockHolder _Lock(_M_lock);
1686  return _M_links.count();
1687  }
1688 
1689 
1699 
1700  iterator begin()
1701  {
1702  return (iterator(this, 0));
1703  }
1704 
1705 private:
1706 
1707  // Called by the iterator. This routine takes a snapshot of the links
1708  // in the registry and copies it to the array provided.
1710  {
1711  _LockHolder _Lock(_M_lock);
1712  _M_iteratorCount++;
1713 
1714  for(auto _Link = _M_links.begin(); *_Link != NULL; ++_Link)
1715  {
1716  _Array._Push_back(*_Link);
1717  }
1718  }
1719 
1720  // Internal lock used for synchronization
1721  _LockType _M_lock;
1722 
1723  // Count to indicate that an iterator is active
1724  volatile long _M_iteratorCount;
1725 
1726  // A vector of all pending link remove operations
1728 
1729  // Underlying link registry
1730  _LinkRegistry _M_links;
1731 
1732  // Target block holding this source link manager
1734 };
1735 
1739 
1741 {
1745 
1750 
1755 
1760 
1762 };
1763 
1774 
1775 template<class _Type>
1777 {
1778  friend class ::Concurrency::details::_Queue<message<_Type>>;
1779 
1780 public:
1792 
1793  message(_Type const &_P) : payload(_P), _M_pNext(NULL), _M_refCount(0) { }
1794 
1809 
1810  message(_Type const &_P, runtime_object_identity _Id)
1811  : ::Concurrency::details::_Runtime_object(_Id), payload(_P), _M_pNext(NULL), _M_refCount(0)
1812  {
1813  }
1814 
1826 
1827  message(message const & _Msg) : payload(_Msg.payload), _M_pNext(NULL), _M_refCount(0) { }
1828 
1839 
1840  message(_In_ message const * _Msg) : payload((_Msg == NULL) ? NULL : _Msg->payload), _M_pNext(NULL), _M_refCount(0)
1841  {
1842  if (_Msg == NULL)
1843  {
1844  throw std::invalid_argument("_Msg");
1845  }
1846  }
1847 
1851 
1852  virtual ~message() { }
1853 
1860 
1861  runtime_object_identity msg_id() const
1862  {
1863  return _M_id;
1864  }
1865 
1869 
1871 
1879 
1880  long add_ref()
1881  {
1883  }
1884 
1892 
1893  long remove_ref()
1894  {
1896  }
1897 
1901 
1902  typedef _Type type;
1903 
1904 private:
1905  // The intrusive next pointer used by blocks that need
1906  // to chain messages it's holding together
1908 
1909  // Avoid warnings about not generating assignment operators.
1910  message<_Type> const &operator =(message<_Type> const &);
1911 
1912  // A reference count for the message
1913  volatile long _M_refCount;
1914 };
1915 
1916 //**************************************************************************
1917 // Message processor:
1918 //**************************************************************************
1919 
1928 
1929 template<class _Type>
1931 {
1932 public:
1936 
1937  typedef _Type type;
1938 
1948 
1949  virtual void async_send(_Inout_opt_ message<_Type> * _Msg) = 0;
1950 
1960 
1961  virtual void sync_send(_Inout_opt_ message<_Type> * _Msg) = 0;
1962 
1969 
1970  virtual void wait() = 0;
1971 
1972 protected:
1973 
1982 
1983  virtual void process_incoming_message() = 0;
1984 
1992 
1993  static void __cdecl _Process_incoming_message_wrapper(void * _Data)
1994  {
1995  message_processor<_Type> * _PMessageProcessor = (message_processor<_Type> *) _Data;
1996  _PMessageProcessor->process_incoming_message();
1997  }
1998 };
1999 
2007 
2008 template<class _Type>
2010 {
2011 public:
2015 
2016  typedef std::function<void(message<_Type> *)> _Handler_method;
2017 
2021 
2022  typedef std::function<void(void)> _Propagator_method;
2023 
2027 
2028  typedef _Type type;
2029 
2037 
2039  _M_queuedDataCount(0),
2040  _M_stopProcessing(1),
2041  _M_lwtCount(0),
2044  _M_handler(nullptr),
2045  _M_processor(nullptr),
2046  _M_propagator(nullptr)
2047  {
2048  }
2049 
2056 
2058  {
2059  wait();
2060  }
2061 
2077 
2078  void initialize(_Inout_opt_ Scheduler * _PScheduler, _Inout_opt_ ScheduleGroup * _PScheduleGroup, _Handler_method const& _Handler)
2079  {
2080  _M_pScheduler = _PScheduler;
2081  _M_pScheduleGroup = _PScheduleGroup;
2082  _M_handler = _Handler;
2083  _M_stopProcessing = 0;
2084  }
2085 
2095  virtual void initialize_batched_processing(_Handler_method const& _Processor, _Propagator_method const& _Propagator)
2096  {
2097  _M_processor = _Processor;
2098  _M_propagator = _Propagator;
2099  }
2100 
2108 
2109  virtual void sync_send(_Inout_opt_ message<_Type> * _Msg)
2110  {
2111  if (_M_handler == NULL)
2112  {
2113  throw invalid_operation("sync_send called without registering a callback");
2114  }
2115 
2116  _Sync_send_helper(_Msg);
2117  }
2118 
2126 
2128  {
2129  if (_M_handler == NULL)
2130  {
2131  throw invalid_operation("async_send called without registering a callback");
2132  }
2133 
2134  //
2135  // If there is a message to send, enqueue it in the processing queue.
2136  // async_send can be sent a NULL message if the block wishes to reprocess
2137  // the messages that are in its queue. For example, an unbounded_buffer
2138  // that has its head node released after reservation.
2139  //
2140  if (_Msg != NULL)
2141  {
2142  _M_queuedMessages.push(_Msg);
2143  }
2144 
2146  {
2147  // Indicate that an LWT is in progress. This will cause the
2148  // destructor to block.
2150 
2151  if (_M_stopProcessing == 0)
2152  {
2154 
2156 
2158 #ifdef _CRT_USE_WINAPI_FAMILY_DESKTOP_APP
2159  if (_M_pScheduleGroup != NULL)
2160  {
2161  _M_pScheduleGroup->ScheduleTask(_Proc, this);
2162  }
2163  else if (_M_pScheduler != NULL)
2164  {
2165  _M_pScheduler->ScheduleTask(_Proc, this);
2166  }
2167  else
2168  {
2169 #endif /* _CRT_USE_WINAPI_FAMILY_DESKTOP_APP */
2171 #ifdef _CRT_USE_WINAPI_FAMILY_DESKTOP_APP
2172  }
2173 #endif /* _CRT_USE_WINAPI_FAMILY_DESKTOP_APP */
2174 
2175  // The LWT will decrement _M_lwtCount.
2176  return;
2177  }
2178 
2179  // If we get here then no task was scheduled. Decrement LWT count to reflect this fact
2181  }
2182  }
2183 
2188 
2189  virtual void wait()
2190  {
2191  // Cease processing of any new messages
2193 
2194  // This spin makes sure all previously initiated message processings
2195  // will still process correctly. As soon as this count reaches zero, we can
2196  // proceed with the message block destructor.
2198  while(_M_lwtCount != 0)
2199  {
2200  spinWait._SpinOnce();
2201  }
2202 
2203  // Synchronize with sync_send
2204  {
2205  _NR_lock _Lock(_M_asyncSendLock);
2207  }
2208 
2209  }
2210 
2211 protected:
2212 
2217 
2219  {
2223 
2224  // Indicate that an LWT completed
2226 
2227  // Do not access any members here. If the count goes to
2228  // 0 as a result of the above decrement, the object
2229  // could be immediately deleted.
2230  }
2231 
2232  private:
2233 
2235  {
2236  message<_Type> * _Msg = NULL;
2237  while (_M_queuedMessages.try_pop(_Msg))
2238  {
2239  delete _Msg;
2240  }
2241  }
2242 
2244  {
2245  _NR_lock _Lock(_M_asyncSendLock);
2246 
2247  // Message block destructors sets the _M_stopProcessing flag to stop
2248  // processing any more messages. This is required to guarantee
2249  // that the destructor's wait_for_async_sends will complete
2250  if (_M_stopProcessing == 0)
2251  {
2252  if (_M_queuedDataCount > 0)
2253  {
2254  long _Count = _InterlockedExchange((volatile long *) &_M_queuedDataCount, 0);
2255  _Invoke_handler(_Count);
2256  }
2257 
2258  _Invoke_handler(_Msg);
2259  }
2260  else
2261  {
2262  // Destructor is running. Do not process the message
2263  // Delete the msg, if any.
2264  if (_Msg != NULL)
2265  {
2266  delete _Msg;
2267  }
2268  }
2269 
2270  }
2271 
2272  // Helper function to dequeue and process messages to any targets.
// Works in batches under _M_asyncSendLock and returns the total number of
// messages handed to _Invoke_handler by this call.
2274  {
2275  _NR_lock _Lock(_M_asyncSendLock);
2276 
2277  long _Messages_processed = 0;
2278 
2279  // Do batched processing of messages
2280  // Read off the number of messages to process in this iteration by snapping a count
2281  volatile long _Count = _M_queuedDataCount;
2282  bool _StopProcessing = false;
2283 
2284  // This count could be 0 if there was both a synchronous and asynchronous
2285  // send occurring. One of them could have sent all of the messages for the other
2286  while (_Count > 0)
2287  {
2288  // Process _Count number of messages
2289  _Invoke_handler(_Count);
2290  _Messages_processed += _Count;
2291 
2292  // Subtract the count and see if there are new things to process.
// _InterlockedExchangeAdd yields the value from before the subtraction, so
// _Orig > _Count means more messages were queued while this batch ran.
2293  volatile long _Orig = _InterlockedExchangeAdd((volatile long *) &_M_queuedDataCount, -_Count);
2294  _CONCRT_ASSERT(_Orig >= _Count);
2295  if (_Orig == _Count)
2296  {
2297  // Because _Count did not change, we processed everything there is to process
2298  break;
2299  }
2300 
// The stop flag was observed on the previous iteration and the batch that was
// snapped right after it has now been processed, so leave the loop.
2301  if (_StopProcessing)
2302  {
2303  break;
2304  }
2305 
2306  // After reading the flag, process the currently queued messages.
2307  // Any messages received after we observe this flag (to be set) will not
2308  // be processed.
2309  _StopProcessing = (_M_stopProcessing == 0) ? false : true;
2310 
2311  // Snap the count and try to process more
2312  _Count = _M_queuedDataCount;
2313  }
2314 
2315  return _Messages_processed;
2316  }
2317 
2318  // Invoke the handler in the message block for the given
2319  // count: pops _Count messages from _M_queuedMessages, hands each one to the
// handler or the batched processor, then runs the propagator once (if set).
2321  {
2322  // Process _Count number of messages
2323  for(int _I = 0; _I < _Count; _I++)
2324  {
2325  message<_Type> * _Msg = NULL;
// NOTE(review): the result of try_pop is not checked; presumably _Msg stays
// NULL when the queue is empty - confirm the handlers tolerate that.
2326  _M_queuedMessages.try_pop(_Msg);
2327  if (_M_processor == NULL)
2328  {
2329  // If a processor function does not exist, the message processor is using single
2330  // message processing rather than batched processing. There should also be no
2331  // propagator function defined in this case.
2333  _M_handler(_Msg);
2334  }
2335  else
2336  {
2337  // Use the batched message processing function
2338  _M_processor(_Msg);
2339  }
2340  }
2341 
2342  // Call the handler which propagates the message(s). This happens once per
// batch, after every message of the batch has been handled.
2343  if (_M_propagator != NULL)
2344  {
2345  _M_propagator();
2346  }
2347  }
2348 
2349  // Invoke the message block handler for the given message.
// Unlike the count-based form above, nothing is popped from the queue here:
// _Msg is passed straight to the handler or the batched processor.
2351  {
2352  if (_M_processor == NULL)
2353  {
2354  // If a processor function does not exist, the message processor is using single
2355  // message processing rather than batched processing. There should also be no
2356  // propagator function defined in this case.
2358  _M_handler(_Msg);
2359  }
2360  else
2361  {
2362  // Use the batched message processing function
2363  _M_processor(_Msg);
2364 
2365  // Call the handler which propagates the message(s). The propagator is
// only consulted on this batched path.
2366  if (_M_propagator != NULL)
2367  {
2368  _M_propagator();
2369  }
2370  }
2371  }
2372 
2373  private:
2377 
2379 
2383 
2385 
2390 
2391  volatile long _M_queuedDataCount;
2392 
2396 
2397  Scheduler * _M_pScheduler;
2398 
2402 
2403  ScheduleGroup * _M_pScheduleGroup;
2404 
2409 
2410  volatile long _M_stopProcessing;
2411 
2415 
2416  volatile long _M_lwtCount;
2417 
2421 
2423 
2427 
2429 
2433 
2434  _Propagator_method _M_propagator;
2435 };
2436 
2448 
// Interface implemented by blocks that can be linked to an ISource<_Type>.
// NOTE(review): several member declarations of this class are not present in
// this listing (only their line gaps remain); the comments below cover the
// members that are visible.
2449 template<class _Type>
2450 class ITarget
2451 {
2452  //
2453  // ISource<T> is a friend class because calls to Source->link_target()
2454  // and Source->unlink_target() need to call their respective
2455  // Target->link_source() and Target->unlink_source() on the block they are
2456  // linking/unlinking. Those functions are private here because we don't
2457  // want users calling link_source() or unlink_source() directly. link_source/
2458  // unlink_source don't call respective link_target/unlink_target because an
2459  // infinite loop would occur.
2460  //
2461  friend class ISource<_Type>;
2462 
2463 public:
2467 
// Virtual destructor so that implementations can be destroyed through an
// ITarget pointer.
2468  virtual ~ITarget() {}
2469 
2470  // It is important that calls to propagate do *not* take the same lock on an
2471  // internal message structure that is used by Consume and the LWT. Doing so could
2472  // result in a deadlock with the Consume call.
2473 
2492 
2494 
2516 
2518 
2529 
// NOTE(review): the declaration that owns this body is missing from the
// listing; the visible body unconditionally returns false.
2531  {
2532  return false;
2533  }
2534 
2538 
// Alias for the payload type handled by this target.
2539  typedef _Type type;
2540 
2545 
// Signature of a filter: a callable that takes the payload by const reference
// and returns bool.
2546  typedef std::function<bool(_Type const&)> filter_method;
2547 
2548 protected:
2549 
2561 
// Invoked on the target when _PSource is being linked to it (see the friend
// comment at the top of the class).
2562  virtual void link_source(_Inout_ ISource<_Type> * _PSource) = 0;
2563 
2575 
// Invoked on the target when _PSource is being unlinked from it.
2576  virtual void unlink_source(_Inout_ ISource<_Type> * _PSource) = 0;
2577 
2581 
// Detaches this target from every source it is linked to.
2582  virtual void unlink_sources() = 0;
2583 };
2584 
2596 
// Interface implemented by blocks that offer messages to ITarget<_Type> blocks.
// NOTE(review): a few lines of this class (including the signatures of the two
// protected helpers at the bottom) are not present in this listing.
2597 template<class _Type>
2598 class ISource
2599 {
2600 public:
2604 
// Virtual destructor so that implementations can be destroyed through an
// ISource pointer.
2605  virtual ~ISource() {}
2606 
2613 
// Links _PTarget to this source.
2614  virtual void link_target(_Inout_ ITarget<_Type> * _PTarget) = 0;
2615 
2623 
// Unlinks _PTarget from this source.
2624  virtual void unlink_target(_Inout_ ITarget<_Type> * _PTarget) = 0;
2625 
2630 
// Unlinks every target from this source.
2631  virtual void unlink_targets() = 0;
2632 
2651 
// Called by _PTarget to take ownership of the offered message identified by
// _MsgId. Implementations in this file return NULL when the message is not
// available to that target.
2652  virtual message<_Type> * accept(runtime_object_identity _MsgId, _Inout_ ITarget<_Type> * _PTarget) = 0;
2653 
2672 
// Called by _PTarget to reserve the message identified by _MsgId; returns
// whether the reservation succeeded.
2673  virtual bool reserve(runtime_object_identity _MsgId, _Inout_ ITarget<_Type> * _PTarget) = 0;
2674 
2692 
// Called by _PTarget to take ownership of a message it previously reserved.
2693  virtual message<_Type> * consume(runtime_object_identity _MsgId, _Inout_ ITarget<_Type> * _PTarget) = 0;
2694 
2704 
// Called by _PTarget to give up a previous reservation of message _MsgId.
2705  virtual void release(runtime_object_identity _MsgId, _Inout_ ITarget<_Type> * _PTarget) = 0;
2706 
2717 
// Reference-count hooks: the recipient blocks in this file call acquire_ref
// from link_source and release_ref when they drop the link.
2718  virtual void acquire_ref(_Inout_ ITarget<_Type> * _PTarget) = 0;
2719 
2730 
2731  virtual void release_ref(_Inout_ ITarget<_Type> * _PTarget) = 0;
2732 
2736 
2738 
2739 protected:
2753 
// Helper body that forwards to the target's link_source, passing this source.
// ISource can call it because it is a friend of ITarget.
2755  {
2756  _PLinkFrom->link_source(this);
2757  }
2758 
2772 
// Helper body that forwards to the target's unlink_source, passing this source.
2774  {
2775  _PUnlinkFrom->unlink_source(this);
2776  }
2777 };
2778 
2779 //**************************************************************************
2780 // Direct Messaging APIs:
2781 //**************************************************************************
2782 
2806 
2807 template <class _Type>
2808 _Type _Receive_impl(ISource<_Type> * _Src, unsigned int _Timeout, typename ITarget<_Type>::filter_method const* _Filter_proc)
2809 {
2810  // The Blocking Recipient messaging block class is internal to the receive function
2811  class _Blocking_recipient : public ITarget<_Type>
2812  {
2813  public:
2814  // Create an Blocking Recipient
2815  _Blocking_recipient(ISource<_Type> * _PSource,
2816  unsigned int _Timeout = COOPERATIVE_TIMEOUT_INFINITE) :
2817  _M_pFilter(NULL), _M_pConnectedTo(NULL), _M_pMessage(NULL), _M_fState(_NotInitialized), _M_timeout(_Timeout)
2818  {
2819  _Connect(_PSource);
2820  }
2821 
2822  // Create an Blocking Recipient
2823  _Blocking_recipient(ISource<_Type> * _PSource,
2824  typename ITarget<_Type>::filter_method const& _Filter,
2825  unsigned int _Timeout = COOPERATIVE_TIMEOUT_INFINITE) :
2826  _M_pFilter(NULL), _M_pConnectedTo(NULL), _M_pMessage(NULL), _M_fState(_NotInitialized), _M_timeout(_Timeout)
2827  {
2828  if (_Filter != NULL)
2829  {
2830  _M_pFilter = new typename ITarget<_Type>::filter_method(_Filter);
2831  }
2832 
2833  _Connect(_PSource);
2834  }
2835 
2836  // Cleans up any resources that may have been created by the BlockingRecipient.
2837  ~_Blocking_recipient()
2838  {
2839  _Disconnect();
2840 
2841  delete _M_pFilter;
2842  delete _M_pMessage;
2843  }
2844 
2845  // Gets the value of the message sent to this BlockingRecipient. Blocks by
2846  // spinning until a message has arrived.
2847  _Type _Value()
2848  {
2849  _Wait_for_message();
2850 
2851  return _M_pMessage->payload;
2852  }
2853 
2854  // The main propagation function for ITarget blocks. Called by a source
2855  // block, generally within an asynchronous task to send messages to its targets.
2856  virtual message_status propagate(message<_Type> * _PMessage, ISource<_Type> * _PSource)
2857  {
2858  // Throw exception if the message being propagated to this block is NULL
2859  if (_PMessage == NULL)
2860  {
2861  throw std::invalid_argument("_PMessage");
2862  }
2863 
2864  if (_PSource == NULL)
2865  {
2866  throw std::invalid_argument("_PSource");
2867  }
2868 
2869  // Reject if the recipient has already received a message
2870  if (_M_fState == _Initialized)
2871  {
2872  return declined;
2873  }
2874 
2875  // Reject if the message does not meet the filter requirements
2876  if (_M_pFilter != NULL && !(*_M_pFilter)(_PMessage->payload))
2877  {
2878  return declined;
2879  }
2880 
2881  // Accept the message
2882  _CONCRT_ASSERT(_PSource != NULL);
2883  _M_pMessage = _PSource->accept(_PMessage->msg_id(), this);
2884 
2885  if (_M_pMessage != NULL)
2886  {
2887  // Set the initialized flag on this block
2888  if (_InterlockedExchange(&_M_fState, _Initialized) == _Blocked)
2889  {
2890  _M_ev.set();
2891  }
2892 
2893  return accepted;
2894  }
2895 
2896  return missed;
2897  }
2898 
2899  // Synchronously sends a message to this block. When this function completes the message will
2900  // already have propagated into the block.
2901  virtual message_status send(message<_Type> * _PMessage, ISource<_Type> * _PSource)
2902  {
2903  if (_PMessage == NULL)
2904  {
2905  throw std::invalid_argument("_PMessage");
2906  }
2907 
2908  if (_PSource == NULL)
2909  {
2910  throw std::invalid_argument("_PSource");
2911  }
2912 
2913  // Only the connected source is allowed to send messages
2914  // to the blocking recipient. Decline messages without
2915  // a source.
2916 
2917  return declined;
2918  }
2919 
2920  private:
2921 
2922  // Link a source block
2923  virtual void link_source(ISource<_Type> * _PSrc)
2924  {
2925  _M_pConnectedTo = _PSrc;
2926  _PSrc->acquire_ref(this);
2927  }
2928 
2929  // Remove a source messaging block for this BlockingRecipient
2930  virtual void unlink_source(ISource<_Type> * _PSource)
2931  {
2932  if (_InterlockedCompareExchangePointer(reinterpret_cast<void *volatile *>(&_M_pConnectedTo), (void *)NULL, _PSource) == _PSource)
2933  {
2934  _PSource->release_ref(this);
2935  }
2936  }
2937 
2938  // Remove the source messaging block for this BlockingRecipient
2939  virtual void unlink_sources()
2940  {
2941  ISource<_Type> * _PSource = reinterpret_cast<ISource<_Type> *>(_InterlockedExchangePointer(reinterpret_cast<void *volatile *>(&_M_pConnectedTo), (void *)NULL));
2942  if (_PSource != NULL)
2943  {
2944  _PSource->unlink_target(this);
2945  _PSource->release_ref(this);
2946  }
2947  }
2948 
2949 
2950  // Connect the blocking recipient to the source
2951  void _Connect(ISource<_Type> * _PSource)
2952  {
2953  if (_PSource == NULL)
2954  {
2955  throw std::invalid_argument("_PSource");
2956  }
2957 
2958  _PSource->link_target(this);
2959  }
2960 
2961  // Cleanup the connection to the blocking recipient's source. There is no need
2962  // to do anything about the associated context.
2963  void _Disconnect()
2964  {
2965  unlink_sources();
2966  }
2967 
2968  // Internal function used to block while waiting for a message to arrive
2969  // at this BlockingRecipient
2970  void _Wait_for_message()
2971  {
2972  bool _Timeout = false;
2973 
2974  // If we haven't received a message yet, cooperatively block.
2975  if (_InterlockedCompareExchange(&_M_fState, _Blocked, _NotInitialized) == _NotInitialized)
2976  {
2977  if (_M_ev.wait(_M_timeout) == COOPERATIVE_WAIT_TIMEOUT)
2978  {
2979  _Timeout = true;
2980  }
2981  }
2982 
2983  // Unlinking from our source guarantees that there are no threads in propagate
2984  _Disconnect();
2985 
2986  if (_M_fState != _Initialized)
2987  {
2988  // We had to have timed out if we came out of the wait
2989  // without being initialized.
2990  _CONCRT_ASSERT(_Timeout);
2991 
2992  throw operation_timed_out();
2993  }
2994  }
2995 
2996  // States for this block
2997  enum
2998  {
2999  _NotInitialized,
3000  _Blocked,
3001  _Initialized
3002  };
3003 
3004  volatile long _M_fState;
3005 
3006  // The source messaging block connected to this Recipient
3007  ISource<_Type> * _M_pConnectedTo;
3008 
3009  // The message that was received
3010  message<_Type> * volatile _M_pMessage;
3011 
3012  // The timeout.
3013  unsigned int _M_timeout;
3014 
3015  // The event we wait upon
3016  event _M_ev;
3017 
3018  // The filter that is called on this block before accepting a message
3019  typename ITarget<_Type>::filter_method * _M_pFilter;
3020  };
3021 
3022  if (_Filter_proc != NULL)
3023  {
3024  _Blocking_recipient _Recipient(_Src, *_Filter_proc, _Timeout);
3025  return _Recipient._Value();
3026  }
3027  else
3028  {
3029  _Blocking_recipient _Recipient(_Src, _Timeout);
3030  return _Recipient._Value();
3031  }
3032 }
3033 
3061 
// receive overload without a filter: forwards _Src and _Timeout to
// _Receive_impl and passes NULL as the filter.
// NOTE(review): the signature line of this overload is missing from the listing.
3062 template <class _Type>
3064 {
3065  return _Receive_impl(_Src, _Timeout, NULL);
3066 }
3067 
3098 
3099 template <class _Type>
3100 _Type receive(_Inout_ ISource<_Type> * _Src, typename ITarget<_Type>::filter_method const& _Filter_proc, unsigned int _Timeout = COOPERATIVE_TIMEOUT_INFINITE)
3101 {
3102  return _Receive_impl(_Src, _Timeout, &_Filter_proc);
3103 }
3104 
3132 
// receive overload without a filter that passes the address of _Src to
// _Receive_impl, together with _Timeout and a NULL filter.
// NOTE(review): the signature line of this overload is missing from the listing.
3133 template <class _Type>
3135 {
3136  return _Receive_impl(&_Src, _Timeout, NULL);
3137 }
3138 
3169 
3170 template <class _Type>
3171 _Type receive(ISource<_Type> &_Src, typename ITarget<_Type>::filter_method const& _Filter_proc, unsigned int _Timeout = COOPERATIVE_TIMEOUT_INFINITE)
3172 {
3173  return _Receive_impl(&_Src, _Timeout, &_Filter_proc);
3174 }
3175 
3197 
3198 template <class _Type>
3199 bool _Try_receive_impl(ISource<_Type> * _Src, _Type & _value, typename ITarget<_Type>::filter_method const * _Filter_proc)
3200 {
3201  // The Immediate Recipient messaging block class is internal to the receive function
3202  class _Immediate_recipient : public ITarget<_Type>
3203  {
3204  public:
3205  // Create an Immediate Recipient
3206  _Immediate_recipient(ISource<_Type> * _PSource) :
3207  _M_pFilter(NULL), _M_pConnectedTo(NULL), _M_pMessage(NULL), _M_isInitialized(0)
3208  {
3209  _Connect(_PSource);
3210  }
3211 
3212  // Create an Immediate Recipient
3213  _Immediate_recipient(ISource<_Type> * _PSource,
3214  typename ITarget<_Type>::filter_method const& _Filter) :
3215  _M_pFilter(NULL), _M_pConnectedTo(NULL), _M_pMessage(NULL), _M_isInitialized(0)
3216  {
3217  if (_Filter != NULL)
3218  {
3219  _M_pFilter = new typename ITarget<_Type>::filter_method(_Filter);
3220  }
3221 
3222  _Connect(_PSource);
3223  }
3224 
3225  // Cleans up any resources that may have been created by the ImmediateRecipient.
3226  ~_Immediate_recipient()
3227  {
3228  _Disconnect();
3229 
3230  delete _M_pFilter;
3231  delete _M_pMessage;
3232  }
3233 
3234  // Gets the value of the message sent to this ImmediateRecipient.
3235  bool _Value(_Type & _value)
3236  {
3237  // Unlinking from our source guarantees that there are no threads in propagate
3238  _Disconnect();
3239 
3240  if (_M_pMessage != NULL)
3241  {
3242  _value = _M_pMessage->payload;
3243  return true;
3244  }
3245 
3246  return false;
3247  }
3248 
3249  // The main propagation function for ITarget blocks. Called by a source
3250  // block, generally within an asynchronous task to send messages to its targets.
3251  virtual message_status propagate(message<_Type> * _PMessage, ISource<_Type> * _PSource)
3252  {
3254 
3255  // Throw exception if the message being propagated to this block is NULL
3256  if (_PMessage == NULL)
3257  {
3258  throw std::invalid_argument("_PMessage");
3259  }
3260 
3261  if (_PSource == NULL)
3262  {
3263  throw std::invalid_argument("_PSource");
3264  }
3265 
3266  // Reject if the recipient has already received a message
3267  if (_M_isInitialized == 1)
3268  {
3269  return declined;
3270  }
3271 
3272  // Reject if the message does not meet the filter requirements
3273  if (_M_pFilter != NULL && !(*_M_pFilter)(_PMessage->payload))
3274  {
3275  return declined;
3276  }
3277 
3278  // Accept the message
3279  _CONCRT_ASSERT(_PSource != NULL);
3280  _M_pMessage = _PSource->accept(_PMessage->msg_id(), this);
3281 
3282  // Set the initialized flag on this block
3283 
3284  if (_M_pMessage != NULL)
3285  {
3286  // Fence to ensure that the above update to _M_pMessage is visible
3287  _InterlockedExchange(&_M_isInitialized, 1);
3288  _Result = accepted;
3289  }
3290  else
3291  {
3292  _Result = missed;
3293  }
3294 
3295  return _Result;
3296  }
3297 
3298 
3299  // Synchronously sends a message to this block. When this function completes the message will
3300  // already have propagated into the block.
3301  virtual message_status send(message<_Type> * _PMessage, ISource<_Type> * _PSource)
3302  {
3303  if (_PMessage == NULL)
3304  {
3305  throw std::invalid_argument("_PMessage");
3306  }
3307 
3308  if (_PSource == NULL)
3309  {
3310  throw std::invalid_argument("_PSource");
3311  }
3312 
3313  // Only the connected source is allowed to send messages
3314  // to the blocking recipient. Decline messages without
3315  // a source.
3316 
3317  return declined;
3318  }
3319 
3320  private:
3321 
3322  // Add a source messaging block
3323  virtual void link_source(ISource<_Type> * _PSrc)
3324  {
3325  _M_pConnectedTo = _PSrc;
3326  _PSrc->acquire_ref(this);
3327  }
3328 
3329  // Remove a source messaging block for this BlockingRecipient
3330  virtual void unlink_source(ISource<_Type> * _PSource)
3331  {
3332  if (_InterlockedCompareExchangePointer(reinterpret_cast<void *volatile *>(&_M_pConnectedTo), (void *)NULL, _PSource) == _PSource)
3333  {
3334  _PSource->release_ref(this);
3335  }
3336  }
3337 
3338  // Remove the source messaging block for this BlockingRecipient
3339  virtual void unlink_sources()
3340  {
3341  ISource<_Type> * _PSource = reinterpret_cast<ISource<_Type> *>(_InterlockedExchangePointer(reinterpret_cast<void *volatile *>(&_M_pConnectedTo), (void *)NULL));
3342  if (_PSource != NULL)
3343  {
3344  _PSource->unlink_target(this);
3345  _PSource->release_ref(this);
3346  }
3347  }
3348 
3349  // Connect to a source block
3350  void _Connect(ISource<_Type> * _PSource)
3351  {
3352  if (_PSource == NULL)
3353  {
3354  throw std::invalid_argument("_PSource");
3355  }
3356 
3357  _CONCRT_ASSERT(_M_isInitialized == 0);
3358 
3359  _PSource->link_target(this);
3360  }
3361 
3362  //
3363  // Cleanup the connection to the trigger's source. There is no need
3364  // to do anything about the associated context.
3365  //
3366  void _Disconnect()
3367  {
3368  unlink_sources();
3369  }
3370 
3371  // The source messaging block connected to this Recipient
3372  ISource<_Type> * _M_pConnectedTo;
3373 
3374  // The message that was received
3375  message<_Type> * volatile _M_pMessage;
3376 
3377  // A flag for whether or not this block has been initialized with a value
3378  volatile long _M_isInitialized;
3379 
3380  // The filter that is called on this block before accepting a message
3381  typename ITarget<_Type>::filter_method * _M_pFilter;
3382  };
3383 
3384  if (_Filter_proc != NULL)
3385  {
3386  _Immediate_recipient _Recipient(_Src, *_Filter_proc);
3387  return _Recipient._Value(_value);
3388  }
3389  else
3390  {
3391  _Immediate_recipient _Recipient(_Src);
3392  return _Recipient._Value(_value);
3393  }
3394 }
3395 
3419 
// try_receive overload without a filter: forwards _Src and _value to
// _Try_receive_impl and passes NULL as the filter.
// NOTE(review): the signature line of this overload is missing from the listing.
3420 template <class _Type>
3422 {
3423  return _Try_receive_impl(_Src, _value, NULL);
3424 }
3425 
3452 
3453 template <class _Type>
3454 bool try_receive(_Inout_ ISource<_Type> * _Src, _Type & _value, typename ITarget<_Type>::filter_method const& _Filter_proc)
3455 {
3456  return _Try_receive_impl(_Src, _value, &_Filter_proc);
3457 }
3458 
3482 
// try_receive overload without a filter that passes the address of _Src to
// _Try_receive_impl, together with _value and a NULL filter.
// NOTE(review): the signature line of this overload is missing from the listing.
3483 template <class _Type>
3485 {
3486  return _Try_receive_impl(&_Src, _value, NULL);
3487 }
3488 
3515 
3516 template <class _Type>
3517 bool try_receive(ISource<_Type> & _Src, _Type & _value, typename ITarget<_Type>::filter_method const& _Filter_proc)
3518 {
3519  return _Try_receive_impl(&_Src, _value, &_Filter_proc);
3520 }
3521 
3522 namespace details
3523 {
3524  //**************************************************************************
3525  // Supporting blocks for send and asend
3526  //**************************************************************************
3527 
3528  // Originator block that pushes messages to a target
// Source block that offers exactly one message to exactly one target and does
// not take part in linking: every link/unlink/reserve/consume/release/ref
// operation visible below throws invalid_operation.
// NOTE(review): several lines of this class (constructor/destructor
// signatures, a few signatures, the data member declarations) are missing
// from this listing.
3529  template <class _Type>
3530  class _AnonymousOriginator : public ISource<_Type>
3531  {
3532  public:
3533 
3535 
3536  // Create an Originator
3538  {
3539  }
3540 
3541  // Cleans up any resources that may have been created by the Originator:
// deletes the message if it is still owned by this block (accept() resets
// _M_pMessage to NULL once ownership has been transferred).
3543  {
3544  delete _M_pMessage;
3545  }
3546 
3547  // Unlinking a single target is not supported; always throws invalid_operation.
3549  {
3550  throw invalid_operation("unlink_target is not supported on _AnonymousOriginator");
3551  }
3552 
3553  // Unlinking all targets is not supported; always throws invalid_operation.
3554  virtual void unlink_targets()
3555  {
3556  throw invalid_operation("unlink_targets is not supported on _AnonymousOriginator");
3557  }
3558 
3559  // Accept on this Originator is called by a target to take ownership of a
3560  // propagated message. Returns NULL if the caller is not the target this
// block is sending to, or if the message is gone or has a different id.
3561  virtual message<_Type> * accept(runtime_object_identity _MsgId, ITarget<_Type> * _PTarget)
3562  {
3563  if (_PTarget != _M_pTarget)
3564  {
3565  return NULL;
3566  }
3567 
3568  if (_M_pMessage == NULL || _M_pMessage->msg_id() != _MsgId)
3569  {
3570  return NULL;
3571  }
3572 
3573  // The IDs match. Actually transfer ownership of the message and
3574  // unlink away from the target
// NOTE(review): the line that declares _Result is missing from this listing.
3576 
3577  // The ownership of this message has changed. Set the internal pointer to NULL
3578  // so it won't be deleted in the destructor
3579  _M_pMessage = NULL;
3580 
3581  return _Result;
3582  }
3583 
3584  // Reserve shall not be called by blocks that support push; always throws
// invalid_operation.
3585  virtual bool reserve(runtime_object_identity, ITarget<_Type> *)
3586  {
3587  throw invalid_operation("reserve is not supported on _AnonymousOriginator");
3588  }
3589 
3590  // Consume shall not be called by blocks that support push; always throws
// invalid_operation.
3591  virtual message<_Type> * consume(runtime_object_identity, ITarget<_Type> *)
3592  {
3593  throw invalid_operation("consume is not supported on _AnonymousOriginator");
3594  }
3595 
3596  // Release needs to be defined for ISource blocks, but reservations are not
3597  // supported by this Originator (see reserve), so releasing one always
3598  // throws invalid_operation.
3599  virtual void release(runtime_object_identity, ITarget<_Type> *)
3600  {
3601  throw invalid_operation("release is not supported on _AnonymousOriginator");
3602  }
3603 
// Reference acquisition is not supported; always throws invalid_operation.
3605  {
3606  throw invalid_operation("acquire_ref is not supported on _AnonymousOriginator");
3607  }
3608 
// Reference release is not supported; always throws invalid_operation.
3610  {
3611  throw invalid_operation("release_ref is not supported on _AnonymousOriginator");
3612  }
3613 
3614  private:
3615  friend class _Originator;
3616 
3617  // Send the given value to the target synchronously (via ITarget::send).
// Returns true only if the target reports accepted.
3618  bool _internal_send(ITarget<_Type> * _PTarget, _Type const & _Value)
3619  {
3620  _M_pTarget = _PTarget;
3621 
3623  _CONCRT_ASSERT(_M_pTarget->supports_anonymous_source());
3624 
3625  // Create the message
3626  message_status _Status = declined;
3627  message<_Type> * _Msg = new message<_Type>(_Value);
3628 
3630  _M_pMessage = _Msg;
3631 
3632  // Send the message
3633  _Status = _M_pTarget->send(_M_pMessage, this);
3634 
3635  // If the message is declined, the destructor will
3636  // delete the message
3637 
3638  // status should not be postponed. Note that this path only asserts,
// whereas _internal_asend throws invalid_operation in the same situation.
3639  _CONCRT_ASSERT(_Status != postponed);
3640  return (_Status == accepted);
3641  }
3642 
// Send the given value to the target via ITarget::propagate.
// Returns true only if the target reports accepted; throws invalid_operation
// if the target postpones the message.
3643  bool _internal_asend(ITarget<_Type> * _PTarget, _Type const & _Value)
3644  {
3645  _M_pTarget = _PTarget;
3646 
3648  _CONCRT_ASSERT(_M_pTarget->supports_anonymous_source());
3649 
3650  // Create the message
3651  message_status _Status = declined;
3652  message<_Type> * _Msg = new message<_Type>(_Value);
3653 
3655  _M_pMessage = _Msg;
3656 
3657  // Send the message
3658  _Status = _M_pTarget->propagate(_M_pMessage, this);
3659 
3660  // If the message is declined, the destructor will
3661  // delete the message
3662 
3663  // status should not be postponed.
3664  if (_Status == postponed)
3665  {
3666  throw invalid_operation("Messages offered by _AnonymousOriginator shall not be postponed");
3667  }
3668 
3669  return (_Status == accepted);
3670  }
3671 
3672  // Linking a target is not supported; always throws invalid_operation.
3673  virtual void link_target(ITarget<_Type> *)
3674  {
3675  throw invalid_operation("link_target is not supported on _AnonymousOriginator");
3676  }
3677 
3678  // The message that will be propagated by the Originator
3680 
3681  // The single target for this block
3683  };
3684 
3685  // The Originator messaging block class is internal to the send function.
3686  template <class _Type>
3687  class _SyncOriginator : public ISource<_Type>
3688  {
3689  public:
3690 
3692 
3693  // Create an Originator
3695  _M_pMessage(NULL),
3698  {
3699  }
3700 
3701  // Cleans up any resources that may have been created by the Originator.
3703  {
3704  unlink_targets();
3705 
3706  _Wait_on_ref();
3707 
3708  delete _M_pMessage;
3709  }
3710 
3711  // Removes a target messaging block for this Originator
3712  virtual void unlink_target(ITarget<_Type> * _PTarget)
3713  {
3714  if (_PTarget == NULL)
3715  {
3716  throw std::invalid_argument("_PTarget");
3717  }
3718  {
3719  // Hold the lock to ensure that the target doesn't unlink while
3720  // propagation is in progress.
3721  _R_lock _Lock(_M_internalLock);
3722  if (_M_connectedTargets.remove(_PTarget))
3723  {
3724  this->_Invoke_unlink_source(_PTarget);
3725 
3726  // Indicate that the send is complete
3727  _Done(declined);
3728  }
3729  }
3730  }
3731 
3732  // Removes every target messaging block from this Originator and then
// completes the send with status declined.
3733  virtual void unlink_targets()
3734  {
3735  // Hold the lock to ensure that the target doesn't unlink while
3736  // propagation is in progress.
3737  _R_lock _Lock(_M_internalLock);
3738 
// The iteration stops at the first NULL entry yielded by the registry iterator.
3739  for (typename _Target_registry::iterator _Iter = _M_connectedTargets.begin(); *_Iter != NULL; ++_Iter)
3740  {
3741  ITarget<_Type> * _PTarget = *_Iter;
// Only targets that were actually removed get their source link dropped.
3742  if (_M_connectedTargets.remove(_PTarget))
3743  {
3744  this->_Invoke_unlink_source(_PTarget);
3745  }
3746  }
3747 
3748  // All targets should be unlinked
3750 
3751  // Indicate that the send is complete
3752  _Done(declined);
3753  }
3754 
3755  // Accept on this Originator is called by a target to take ownership of a
3756  // propagated message. Returns NULL when _PTarget is NULL or not connected,
// or when the message is gone or has a different id; otherwise hands the
// message over and completes the send with status accepted.
3757  virtual message<_Type> * accept(runtime_object_identity _MsgId, ITarget<_Type> * _PTarget)
3758  {
3759  if (_PTarget == NULL)
3760  {
3761  return NULL;
3762  }
3763 
3764  if (!_M_connectedTargets.contains(_PTarget))
3765  {
3766  return NULL;
3767  }
3768 
3769  if (_M_pMessage == NULL || _M_pMessage->msg_id() != _MsgId)
3770  {
3771  return NULL;
3772  }
3773 
3774  // The IDs match. Actually transfer ownership of the message and
3775  // unlink away from the target
// NOTE(review): the line that declares _Result is missing from this listing.
3777 
3778  // The ownership of this message has changed. Set the internal pointer to NULL
3779  // so it won't be deleted in the destructor
3780  _M_pMessage = NULL;
3781 
3782  // The message has been accepted/consumed, propagate indication that it has succeeded
3783  _Done(accepted);
3784 
3785  return _Result;
3786  }
3787 
3788  // Reserve needs to be defined for ISource blocks, but Originator doesn't need to
3789  // do anything for reservation because there can only be one target block to read
3790  // the data at a later time.
3791  virtual bool reserve(runtime_object_identity _MsgId, ITarget<_Type> * _PTarget)
3792  {
3793  if (_PTarget == NULL)
3794  {
3795  throw std::invalid_argument("_PTarget");
3796  }
3797 
3798  if (!_M_connectedTargets.contains(_PTarget))
3799  {
3800  return false;
3801  }
3802 
3803  if (_M_pMessage->msg_id() != _MsgId)
3804  {
3805  return false;
3806  }
3807 
3808  return true;
3809  }
3810 
3811  // Consume is called by a target messaging block to take ownership of a
3812  // previously reserved message.
3813  virtual message<_Type> * consume(runtime_object_identity _MsgId, ITarget<_Type> * _PTarget)
3814  {
3815  if (_PTarget == NULL)
3816  {
3817  throw std::invalid_argument("_PTarget");
3818  }
3819 
3820  if (!_M_connectedTargets.contains(_PTarget))
3821  {
3822  throw bad_target();
3823  }
3824 
3825  return accept(_MsgId, _PTarget);
3826  }
3827 
3828  // Release needs to be defined for ISource blocks, but Originator doesn't need to
3829  // do anything for reservation release because there can only be one target block to read
3830  // the data at a later time.
3831  virtual void release(runtime_object_identity _MsgId, ITarget<_Type> * _PTarget)
3832  {
3833  if (_PTarget == NULL)
3834  {
3835  throw std::invalid_argument("_PTarget");
3836  }
3837 
3838  if (!_M_connectedTargets.contains(_PTarget))
3839  {
3840  throw bad_target();
3841  }
3842 
3843  if ((_M_pMessage == NULL) || (_M_pMessage->msg_id() != _MsgId))
3844  {
3845  throw message_not_found();
3846  }
3847 
3848  // If the previously reserved message is released, then propagate
3849  // declined to indicate that the message was not accepted.
3850  _Done(declined);
3851  }
3852 
3854  {
3856  }
3857 
3859  {
3861  }
3862 
3863  private:
3864 
3865  friend class _Originator;
3866 
3867  // Send the given value to the target synchronously (via ITarget::send).
// Returns true only if the final status is accepted.
// Throws std::invalid_argument if _PTarget is NULL.
3868  bool _internal_send(ITarget<_Type> * _PTarget, _Type const & _Value)
3869  {
3870  // _send should only be called once.
3871  if (_PTarget == NULL)
3872  {
3873  throw std::invalid_argument("_PTarget");
3874  }
3875 
3876  message_status _Status = declined;
3877  message<_Type> * _Msg = new message<_Type>(_Value);
3878 
3879  {
3880  // Hold the lock to ensure that the target doesn't unlink while
3881  // propagation is in progress.
3882  _R_lock _Lock(_M_internalLock);
3883 
3884  // link to the target, create a message and send it
3885  link_target(_PTarget);
3886 
3888  _M_pMessage = _Msg;
3889 
3890  // Send the message synchronously to the target
3891  _Status = _PTarget->send(_M_pMessage, this);
3892  }
3893 
3894  if (_Status == postponed)
3895  {
3896  // If the target postponed the message, wait for it to
3897  // be accepted/declined.
// NOTE(review): no wait is visible here before _M_fStatus is read; the line
// at this position is missing from the listing - confirm that it blocks until
// _Done() has run.
3899 
3900  // Procure the final status
3901  _Status = _M_fStatus;
3902  }
3903 
3904  // status should not be postponed.
3905  _CONCRT_ASSERT(_Status != postponed);
3906 
3907  return (_Status == accepted);
3908  }
3909 
3910  // Add a target messaging block for this Originator: registers _PTarget and
// then links this source on the target side.
// Throws std::invalid_argument if _PTarget is NULL.
3911  virtual void link_target(ITarget<_Type> * _PTarget)
3912  {
3913  if (_PTarget == NULL)
3914  {
3915  throw std::invalid_argument("_PTarget");
3916  }
3917 
3918  _M_connectedTargets.add(_PTarget);
3919  this->_Invoke_link_source(_PTarget);
3920 
3921  // There should be no pending messages to propagate at this time.
3923  }
3924 
3925  // Wait for the status to reach one of the terminal
3926  // states (!= postponed)
3928  {
3929  // Wait for the event to be signalled
3932 
3933  }
3934 
3936  {
3938  while(_M_referenceCount != 0)
3939  {
3940  spinWait._SpinOnce();
3941  }
3942  }
3943 
3944  // Indicate that the send operation has completed
3945  void _Done(message_status _Status)
3946  {
3947  // postponed is not a done state
3948  _CONCRT_ASSERT(_Status != postponed);
3949 
3950  _M_fStatus = _Status;
3951  _M_ev.set();
3952  }
3953 
3954  // The message that will be propagated by the Originator
3956 
3957  // Event to indicate completion
3958  event _M_ev;
3959 
3960  // Final status of the send
3962 
3963  // A lock for modifying the buffer or the connected blocks
3965 
3966  // Connected targets
3967  _Target_registry _M_connectedTargets;
3968 
3969  volatile long _M_referenceCount;
3970  };
3971 
3972  // The Originator messaging block class is internal to the send function.
3973  template <class _Type>
3974  class _AsyncOriginator : public ISource<_Type>
3975  {
3976  public:
3977 
3979 
3980  // Cleans up any resources that may have been created by the AsyncOriginator.
3982  {
3983  unlink_targets();
3984 
3985  delete _M_pMessage;
3986  }
3987 
3988  // Removes a target messaging block for this AsyncOriginator
3989  virtual void unlink_target(ITarget<_Type> * _PTarget)
3990  {
3991  if (_PTarget == NULL)
3992  {
3993  throw std::invalid_argument("_PTarget");
3994  }
3995 
3996  bool _Unlinked = false;
3997  {
3998  // Hold the lock to ensure that the target doesn't unlink while
3999  // propagation is in progress.
4000  _R_lock _Lock(_M_internalLock);
4001 
4002  if (_M_connectedTargets.remove(_PTarget))
4003  {
4004  this->_Invoke_unlink_source(_PTarget);
4005  _Unlinked = true;
4006  }
4007  }
4008 
4009  // Release the lock before decrementing the refcount. Otherwise, the
4010  // lock release could corrupt memory.
4011  if (_Unlinked)
4012  {
4013  _Release_ref();
4014  }
4015  }
4016 
4017  // Removes every target messaging block from this AsyncOriginator. If at
// least one target was removed, one reference on this block is released
// after the lock has been dropped.
4018  virtual void unlink_targets()
4019  {
4020  bool _Unlinked = false;
4021  {
4022  // Hold the lock to ensure that the target doesn't unlink while
4023  // propagation is in progress.
4024  _R_lock _Lock(_M_internalLock);
4025 
// The iteration stops at the first NULL entry yielded by the registry iterator.
4026  for (typename _Target_registry::iterator _Iter = _M_connectedTargets.begin();
4027  *_Iter != NULL;
4028  ++_Iter)
4029  {
4030  ITarget<_Type> * _PTarget = *_Iter;
4031  if (_M_connectedTargets.remove(_PTarget))
4032  {
4033  this->_Invoke_unlink_source(_PTarget);
4034  _Unlinked = true;
4035  }
4036 
4037  }
4038 
4039  // All targets should be unlinked
4041  }
4042 
4043  // Release the lock before decrementing the refcount. Otherwise, the
4044  // lock release could corrupt memory.
4045  if (_Unlinked)
4046  {
4047  _Release_ref();
4048  }
4049  }
4050 
4051  // Accept on this AsyncOriginator is called by a target to take ownership of a
4052  // propagated message. This can only be called from propagate.
4053  virtual message<_Type> * accept(runtime_object_identity _MsgId, ITarget<_Type> * _PTarget)
4054  {
4055  if (_PTarget == NULL)
4056  {
4057  return NULL;
4058  }
4059 
4060  if (!_M_connectedTargets.contains(_PTarget))
4061  {
4062  return NULL;
4063  }
4064 
4065  if (_M_pMessage == NULL || _M_pMessage->msg_id() != _MsgId)
4066  {
4067  return NULL;
4068  }
4069 
4070  //
4071  // If the IDs match, actually transfer ownership of the message.
4072  //
4074  _M_pMessage = NULL;
4075 
4076  return _Result;
4077  }
4078 
4079  // Reserve needs to be defined for ISource blocks, but AsyncOriginator doesn't need to
4080  // do anything for reservation because there can only be one target block to read
4081  // the data at a later time.
4082  virtual bool reserve(runtime_object_identity _MsgId, ITarget<_Type> * _PTarget)
4083  {
      // Pure validation: no state is modified by this function.
      // Throws std::invalid_argument when _PTarget is NULL.
      // Returns false when _PTarget is not in _M_connectedTargets, when there is no
      // pending message, or when the pending message's id differs from _MsgId.
      // Returns true otherwise.
4084  if (_PTarget == NULL)
4085  {
4086  throw std::invalid_argument("_PTarget");
4087  }
4088 
4089  if (!_M_connectedTargets.contains(_PTarget))
4090  {
4091  return false;
4092  }
4093 
4094  if (_M_pMessage == NULL || _M_pMessage->msg_id() != _MsgId)
4095  {
4096  return false;
4097  }
4098 
4099  return true;
4100  }
4101 
4102  // Consume is called by a target messaging block to take ownership of a
4103  // previously reserved message.
4104  virtual message<_Type> * consume(runtime_object_identity _MsgId, ITarget<_Type> * _PTarget)
4105  {
4106  if (_PTarget == NULL)
4107  {
4108  throw std::invalid_argument("_PTarget");
4109  }
4110 
4111  if (!_M_connectedTargets.contains(_PTarget))
4112  {
4113  throw bad_target();
4114  }
4115 
4116  if (_M_pMessage == NULL || _M_pMessage->msg_id() != _MsgId)
4117  {
4118  return NULL;
4119  }
4120 
4121  // The ownership of this message has changed. Set the internal pointer to NULL
4122  // so it won't be deleted in the destructor
4123 
4125  _M_pMessage = NULL;
4126 
4127  // We are done. Unlink from the target. DO NOT TOUCH "this" pointer after unlink
4128  unlink_target(_PTarget);
4129 
4130  return _Result;
4131  }
4132 
4133  // Release needs to be defined for ISource blocks, but AsyncOriginator doesn't need to
4134  // do anything for reservation release because there can only be one target block to read
4135  // the data at a later time.
4136  virtual void release(runtime_object_identity _MsgId, ITarget<_Type> * _PTarget)
4137  {
      // Validates the caller and the message id, then unlinks from the target.
      // Throws std::invalid_argument when _PTarget is NULL, bad_target when
      // _PTarget is not in _M_connectedTargets, and message_not_found when there
      // is no pending message or its id differs from _MsgId.
      // _M_pMessage itself is not modified here.
4138  if (_PTarget == NULL)
4139  {
4140  throw std::invalid_argument("_PTarget");
4141  }
4142 
4143  if (!_M_connectedTargets.contains(_PTarget))
4144  {
4145  throw bad_target();
4146  }
4147 
4148  if ((_M_pMessage == NULL) || (_M_pMessage->msg_id() != _MsgId))
4149  {
4150  throw message_not_found();
4151  }
4152 
4153  // We can be connected to only 1 target. Unlink from the target.
4154  // DO NOT TOUCH "this" pointer after unlink
      // (unlink_target may release a reference on this object, which can delete it.)
4155  unlink_target(_PTarget);
4156  }
4157 
4159  {
4160  _Acquire_ref();
4161  }
4162 
4164  {
4165  _Release_ref();
4166  }
4167 
4168  private:
4169 
4170  friend class _Originator;
4171 
4172  // Create an AsyncOriginator (constructor is private to ensure that
4173  // it is allocated on the heap).
4175  _M_pMessage(NULL),
4176  _M_refcount(0)
4177  {
4178  }
4179 
4180  // Send the given value to the target
4181  bool _internal_send(ITarget<_Type> * _PTarget, _Type const & _Value)
4182  {
4183  // Keep a refcount so that this object doesn't get deleted if
4184  // the target decides to unlink before we release our lock
4185  _Acquire_ref();
4186 
4187  message_status _Status = declined;
4188  message<_Type> * _Msg = new message<_Type>(_Value);
4189 
4190  {
4191  // Hold the lock to ensure that the target doesn't unlink while
4192  // propagation is in progress.
4193  _R_lock _Lock(_M_internalLock);
4194 
4195  // link to the target, create a message and send it
4196  link_target(_PTarget);
4197 
4199  _M_pMessage = _Msg;
4200 
4201  _Status = _PTarget->propagate(_M_pMessage, this);
4202  }
4203 
4204  // If the status is anything other than postponed, unlink away
4205  // from the target and delete the AsyncOriginator.
4206  if (_Status != postponed)
4207  {
4208  unlink_target(_PTarget);
4209  }
4210 
4211  // Release the reference acquired above
4212  _Release_ref();
4213 
4214  return (_Status == accepted);
4215  }
4216 
4217  // Add a target messaging block for this AsyncOriginator
4218  virtual void link_target(ITarget<_Type> * _PTarget)
4219  {
4220  if (_PTarget == NULL)
4221  {
4222  throw std::invalid_argument("_PTarget");
4223  }
4224 
4225  // Acquire a reference that will be released by unlink_target
4226  _Acquire_ref();
4227  _M_connectedTargets.add(_PTarget);
4228  this->_Invoke_link_source(_PTarget);
4229 
4230  // There should be no pending messages to propagate at this time.
4232 
4233  }
4234 
4235  // Acquire a reference on the async originator object
4237  {
4239  }
4240 
4241  // Release the reference on the async originator object. The object
4242  // will be deleted when the reference count goes to 0.
4244  {
4247  {
4248  delete this;
4249  }
4250  }
4251 
4252  // The message that will be propagated by the AsyncOriginator
4254 
4255  // Reference count to manage object lifetime
4256  volatile long _M_refcount;
4257 
4258  // The internal lock for this block for its message
4260 
4261  // connected targets
4262  _Target_registry _M_connectedTargets;
4263  };
4264 
4265  // static class that exposes methods to initiate messages into
4266  // a dataflow network
4268  {
4269  public:
4270 
4271  // Synchronous initiation of messages
4272  template <class _Type>
4273  static bool _send(ITarget<_Type> * _Trg, const _Type& _Data)
4274  {
      // Chooses the originator used to deliver _Data to _Trg and returns the
      // bool produced by that originator's _internal_send.
      // A non-NULL target whose supports_anonymous_source() returns true is
      // served by a stack-allocated _AnonymousOriginator; every other case,
      // including a NULL _Trg, goes through a stack-allocated _SyncOriginator.
      // NOTE(review): how _SyncOriginator::_internal_send treats a NULL target is
      // not visible here - confirm it rejects it.
4275  if (_Trg != NULL && _Trg->supports_anonymous_source())
4276  {
4277  // _send will block until the message is accepted/rejected.
4278  // Note that this invokes the send method on the target which
4279  // would synchronously process the message.
4280  _AnonymousOriginator<_Type> _Send_block;
4281  return _Send_block._internal_send(_Trg, _Data);
4282  }
4283  else
4284  {
4285  // Create a blocking originator on the stack. _send will block until the
4286  // message is accepted/rejected.
4287  _SyncOriginator<_Type> _Orig;
4288  return _Orig._internal_send(_Trg, _Data);
4289  }
4290  }
4291 
4292  // Asynchronous initiation of messages
4293  template <class _Type>
4294  static bool _asend(ITarget<_Type> * _Trg, const _Type& _Data)
4295  {
4296  // If the block can participate in posting messages without requiring a call back, use that
4297  // method of initiating the message rather for efficiency purposes.
4298  if (_Trg != NULL && _Trg->supports_anonymous_source())
4299  {
4300  _AnonymousOriginator<_Type> _Asend_block;
4301  return _Asend_block._internal_asend(_Trg, _Data);
4302  }
4303  else
4304  {
4305  // Needs to be allocated on the heap
4307  return _AsyncOrig->_internal_send(_Trg, _Data);
4308  }
4309  }
4310  };
4311 
4312 } // namespace details
4313 
4335 
4336 template <class _Type>
4337 bool send(_Inout_ ITarget<_Type> * _Trg, const _Type& _Data)
4338 {
     // Pointer overload: forwards the target and payload unchanged to
     // details::_Originator::_send and returns its result.
4339  return details::_Originator::_send(_Trg, _Data);
4340 }
4341 
4342 
4364 
4365 template <class _Type>
4366 bool send(ITarget<_Type> &_Trg, const _Type &_Data)
4367 {
     // Reference overload: takes the address of _Trg and delegates to the
     // pointer overload of ::Concurrency::send, returning its result.
4368  return ::Concurrency::send(&_Trg, _Data);
4369 }
4370 
4392 
4393 template <class _Type>
4394 bool asend(_Inout_ ITarget<_Type> * _Trg, const _Type& _Data)
4395 {
     // Pointer overload: forwards the target and payload unchanged to
     // details::_Originator::_asend and returns its result.
4396  return details::_Originator::_asend(_Trg, _Data);
4397 }
4398 
4399 
4421 
4422 template <class _Type>
4423 bool asend(ITarget<_Type> &_Trg, const _Type &_Data)
4424 {
     // Reference overload: takes the address of _Trg and delegates to the
     // pointer overload of ::Concurrency::asend, returning its result.
4425  return ::Concurrency::asend(&_Trg, _Data);
4426 }
4427 
4428 //**************************************************************************
4429 // Target Block:
4430 //**************************************************************************
4431 
4443 
4444 template<class _SourceLinkRegistry,
4445  class _MessageProcessorType = ordered_message_processor<typename _SourceLinkRegistry::type::source_type>>
4446 class target_block : public ITarget<typename _SourceLinkRegistry::type::source_type>
4447 {
4448 public:
4452 
4453  typedef typename _SourceLinkRegistry::type::source_type _Source_type;
4454 
4458 
4460 
4464 
4466 
4467  using typename ITarget<_Source_type>::filter_method;
4468 
4472 
4474  {
4478  }
4479 
4483 
4484  virtual ~target_block()
4485  {
4486  // All sources should have been unlinked
4488  delete _M_pFilter;
4489 
4491  }
4492 
4510 
4512  {
4513  // It is important that calls to propagate do *not* take the same lock on the
4514  // internal structure that is used by <c>consume</c> and the LWT. Doing so could
4515  // result in a deadlock.
4516 
4517  if (_PMessage == NULL)
4518  {
4519  throw std::invalid_argument("_PMessage");
4520  }
4521 
4522  if (_PSource == NULL)
4523  {
4524  throw std::invalid_argument("_PSource");
4525  }
4526 
4527  if (_M_fDeclineMessages)
4528  {
4529  return declined;
4530  }
4531 
4532  if (_M_pFilter != NULL && !(*_M_pFilter)(_PMessage->payload))
4533  {
4534  return declined;
4535  }
4536 
4537  return propagate_message(_PMessage, _PSource);
4538  }
4539 
4561 
4563  {
4564  if (_PMessage == NULL)
4565  {
4566  throw std::invalid_argument("_PMessage");
4567  }
4568 
4569  if (_PSource == NULL)
4570  {
4571  throw std::invalid_argument("_PSource");
4572  }
4573 
4574  if (_M_fDeclineMessages)
4575  {
4576  return declined;
4577  }
4578 
4579  if (_M_pFilter != NULL && !(*_M_pFilter)(_PMessage->payload))
4580  {
4581  return declined;
4582  }
4583 
4584  return send_message(_PMessage, _PSource);
4585  }
4586 
4587 protected:
4588 
4603 
4605 
4623 
4625  {
4626  // By default we do not allow send()
4627  return declined;
4628  }
4629 
4641 
4643  {
4644  _M_connectedSources.add(_PSource);
4648  }
4649 
4659 
4661  {
4665 
4666  _M_connectedSources.remove(_PSource);
4667  }
4668 
4672 
4673  virtual void unlink_sources()
4674  {
      // Asks every source in _M_connectedSources to unlink this block as its target.
      // Iteration ends when the iterator dereferences to NULL.
      // This function does not remove entries from _M_connectedSources itself.
      // NOTE(review): presumably each source calls back into this block to remove
      // itself from the registry - confirm against the source implementations.
4675  for (source_iterator _Iter = _M_connectedSources.begin(); *_Iter != NULL; ++_Iter)
4676  {
4677  ISource<_Source_type> * _PSource = *_Iter;
4678  _PSource->unlink_target(this);
4679  }
4680  }
4681 
4688 
4690  {
4691  }
4692 
4693  //
4694  // Utility routines
4695  //
4696 
4704 
4705  void register_filter(filter_method const& _Filter)
4706  {
      // Stores a heap-allocated copy of _Filter in _M_pFilter when _Filter compares
      // unequal to NULL; otherwise leaves _M_pFilter untouched.
      // The copy is released by the destructor's `delete _M_pFilter`.
      // NOTE(review): a previously stored filter is overwritten without being
      // deleted, so a second call with a non-null filter would leak the first
      // copy - confirm this is only ever called once per block.
4707  if (_Filter != NULL)
4708  {
4709  _M_pFilter = new filter_method(_Filter);
4710  }
4711  }
4712 
4719 
4721  {
4722  _M_fDeclineMessages = true;
4723  }
4724 
4737 
4738  void initialize_target(_Inout_opt_ Scheduler * _PScheduler = NULL, _Inout_opt_ ScheduleGroup * _PScheduleGroup = NULL)
4739  {
4740  // Register a callback with the processor
4741  _M_messageProcessor.initialize(_PScheduler, _PScheduleGroup,
4742  // Processing and Propagating function used by ordered_message_processors
4743  [this](message<_Source_type> * _PMessage)
4744  {
4745  // Handle message by calling process_message to maintain CRT100 compatibility
4746  this->process_message(_PMessage);
4747  });
4748 
4749  // Register this target block as the owner of the connected sources
4751  }
4752 
4756 
4758  {
4759  _M_messageProcessor.initialize_batched_processing(
4760  // Processing function used by CRT110
4761  [this](message<_Source_type> * _PMessage)
4762  {
4763  // Handle message through new process_input_message to use CRT110 batch processing
4764  this->process_input_messages(_PMessage);
4765  }, nullptr);
4766  }
4767 
4774 
4776  {
4777  _M_messageProcessor.async_send(_PMessage);
4778  }
4779 
4786 
4788  {
4789  _M_messageProcessor.sync_send(_PMessage);
4790  }
4791 
4799 
4801  {
4802  // Decline new messages to ensure that messages are not dropped during the wait
4804 
4805  _M_messageProcessor.wait();
4806  }
4807 
4814 
4816  {
4818 
4819  unlink_sources();
4820  }
4821 
4825 
4827  {
4828  throw invalid_operation("To use batched processing, you must override process_input_messages in the message block.");
4829  }
4830 
4834 
4835  _SourceLinkManager _M_connectedSources;
4836 
4840 
4842 
4847 
4849 
4853 
4854  _MessageProcessorType _M_messageProcessor;
4855 };
4856 
4857 //**************************************************************************
4858 // Source Block:
4859 //**************************************************************************
4860 
4876 
4877 template<class _TargetLinkRegistry,
4879 class source_block : public ISource<typename _TargetLinkRegistry::type::type>
4880 {
4881 public:
4882 
4886 
4887  typedef typename _TargetLinkRegistry::type::type _Target_type;
4888 
4892 
4893  typedef typename _TargetLinkRegistry::iterator target_iterator;
4894 
4898 
4901  _M_reservedId(-1),
4903  {
4907  }
4908 
4912 
4913  virtual ~source_block()
4914  {
4915  // All targets should have been unlinked
4916  _CONCRT_ASSERT(_M_connectedTargets.count() == 0);
4917 
4919  }
4920 
4931 
4932  virtual void link_target(_Inout_ ITarget<_Target_type> * _PTarget)
4933  {
      // Connects _PTarget to this source while holding _M_internalLock.
      // Throws std::invalid_argument when _PTarget is NULL (the check happens
      // after the lock has been taken). Otherwise the target is added to
      // _M_connectedTargets, _Invoke_link_source(_PTarget) is called, and the
      // overridable link_target_notification hook runs, all under the lock.
4934  _R_lock _Lock(_M_internalLock);
4935 
4936  if (_PTarget == NULL)
4937  {
4938  throw std::invalid_argument("_PTarget");
4939  }
4940 
4941  _M_connectedTargets.add(_PTarget);
4942  this->_Invoke_link_source(_PTarget);
4943  link_target_notification(_PTarget);
4944  }
4945 
4956 
4958  {
4959  _R_lock _Lock(_M_internalLock);
4960 
4961  if (_PTarget == NULL)
4962  {
4963  throw std::invalid_argument("_PTarget");
4964  }
4965 
4966  if (_M_connectedTargets.remove(_PTarget))
4967  {
4968  // We were able to remove the target from our list.
4969  // Inform the target to unlink from us
4970  this->_Invoke_unlink_source(_PTarget);
4971  }
4972  }
4973 
4977 
4978  virtual void unlink_targets()
4979  {
      // Unlinks every target in _M_connectedTargets by calling unlink_target on
      // each one while holding _M_internalLock.
      // _R_lock is the scoped lock of the reentrant PPL lock, so a nested
      // acquisition of the same lock by unlink_target on this thread is
      // presumably permitted - NOTE(review): confirm.
4980  _R_lock _Lock(_M_internalLock);
4981 
4982  for (target_iterator _Iter = _M_connectedTargets.begin(); *_Iter != NULL; ++_Iter)
4983  {
4984  ITarget<_Target_type> * _PTarget = *_Iter;
      // Always true given the loop condition; kept as a debug sanity check.
4985  _CONCRT_ASSERT(_PTarget != NULL);
4986 
4987  unlink_target(_PTarget);
4988  }
4989 
4990  // All the targets should be unlinked.
4991  _CONCRT_ASSERT(_M_connectedTargets.count() == 0);
4992  }
4993 
5015 
5016  virtual message<_Target_type> * accept(runtime_object_identity _MsgId, _Inout_ ITarget<_Target_type> * _PTarget)
5017  {
      // Validates the caller and delegates to the derived class's accept_message.
      // Throws std::invalid_argument when _PTarget is NULL.
      // An unconnected target is only caught by the assertion below; it is not
      // rejected by any runtime check in this function.
      // _M_internalLock is not acquired here.
      // Returns whatever accept_message(_MsgId) returns.
5018  if (_PTarget == NULL)
5019  {
5020  throw std::invalid_argument("_PTarget");
5021  }
5022 
5023  // Assert if the target is not connected
5024  _CONCRT_ASSERT(_M_connectedTargets.contains(_PTarget));
5025 
5026  return accept_message(_MsgId);
5027  }
5028 
5051 
5052  virtual bool reserve(runtime_object_identity _MsgId, _Inout_ ITarget<_Target_type> * _PTarget)
5053  {
      // Attempts to reserve message _MsgId for _PTarget while holding _M_internalLock.
      // Throws std::invalid_argument when _PTarget is NULL.
      // Returns false when a reservation is already recorded or when the derived
      // class's reserve_message(_MsgId) refuses. Returns true after recording the
      // reserving target in _M_pReservedFor and the id in _M_reservedId.
5054  _R_lock _Lock(_M_internalLock);
5055 
5056  if (_PTarget == NULL)
5057  {
5058  throw std::invalid_argument("_PTarget");
5059  }
5060 
5061  if ( _M_pReservedFor != NULL)
5062  {
5063  // Someone else is holding the reservation
      // (The check does not compare against _PTarget, so a repeated reserve by
      // the current holder is refused as well.)
5064  return false;
5065  }
5066 
5067  if (!reserve_message(_MsgId))
5068  {
5069  // Failed to reserve the msg ID
5070  return false;
5071  }
5072 
5073  // Save the reserving target and the msg ID
5074  _M_pReservedFor = _PTarget;
5075  _M_reservedId = _MsgId;
5076 
5077  return true;
5078  }
5079 
5107 
5108  virtual message<_Target_type> * consume(runtime_object_identity _MsgId, _Inout_ ITarget<_Target_type> * _PTarget)
5109  {
5110  _R_lock _Lock(_M_internalLock);
5111 
5112  if (_PTarget == NULL)
5113  {
5114  throw std::invalid_argument("_PTarget");
5115  }
5116 
5117  if (_M_pReservedFor == NULL || _PTarget != _M_pReservedFor)
5118  {
5119  throw bad_target();
5120  }
5121 
5122  message<_Target_type> * _Msg = consume_message(_MsgId);
5123 
5124  if (_Msg != NULL)
5125  {
5126  // Clear the reservation
5127  // _M_pReservedId is intentionally not reset so that it can assist in debugging
5129 
5130  // Reservation is assumed to block propagation. Notify that propagation can now be resumed
5132  }
5133 
5134  return _Msg;
5135  }
5136 
5156 
5157  virtual void release(runtime_object_identity _MsgId, _Inout_ ITarget<_Target_type> * _PTarget)
5158  {
5159  _R_lock _Lock(_M_internalLock);
5160 
5161  if (_PTarget == NULL)
5162  {
5163  throw std::invalid_argument("_PTarget");
5164  }
5165 
5166  if (_PTarget != _M_pReservedFor)
5167  {
5168  throw bad_target();
5169  }
5170 
5171  release_message(_MsgId);
5172 
5173  // Clear the reservation
5174  // _M_pReservedId is intentionally not reset so that it can assist in debugging
5176 
5177  // Reservation is assumed to block propagation. Notify that propagation can now be resumed
5179  }
5180 
5191 
5193  {
5195  }
5196 
5207 
5208  virtual void release_ref(_Inout_ ITarget<_Target_type> * _PTarget)
5209  {
5210  if (_PTarget != NULL)
5211  {
5212  _R_lock _Lock(_M_internalLock);
5213 
5214  // We assume that each target would keep a single reference on its source, so
5215  // we call unlink target notification on every release. Otherwise, we would be
5216  // required to keep a reference count per target.
5217  // Note: unlink_target_notification can check the value of this _PTarget pointer, but
5218  // must not dereference it, as it may have already been deleted.
5219  unlink_target_notification(_PTarget);
5220  }
5221 
5223 
5224  // It is *unsafe* to touch the "this" pointer after decrementing the reference count
5225  }
5226 
5227 protected:
5228 
5229  //
5230  // Protected methods that a derived class can override to customize
5231  // the functionality
5232  //
5233 
5240 
5242  {
5243  // By default, we restart propagation if there is no pending reservation
5244  if (_M_pReservedFor == NULL)
5245  {
5247  }
5248  }
5249 
5256 
5258  {
5259  // At this point, the target has already been disconnected from the
5260  // source. It is safe to check the value of this pointer, but not
5261  // safe to dereference it, as it may have already been deleted.
5262 
5263  // If the target being unlinked is the one holding the reservation,
5264  // release the reservation
5265  if (_M_pReservedFor == _PTarget)
5266  {
5267  release(_M_reservedId, _PTarget);
5268  }
5269  }
5270 
5286 
5287  virtual message<_Target_type> * accept_message(runtime_object_identity _MsgId) = 0;
5288 
5303 
5304  virtual bool reserve_message(runtime_object_identity _MsgId) = 0;
5305 
5318 
5319  virtual message<_Target_type> * consume_message(runtime_object_identity _MsgId) = 0;
5320 
5327 
5328  virtual void release_message(runtime_object_identity _MsgId) = 0;
5329 
5333 
5334  virtual void resume_propagation() = 0;
5335 
5339 
5341  {
5342  // source_blocks do not need to process anything
5343  }
5344 
5348 
5350  {
5351  throw invalid_operation("To use batched processing, you must override propagate_output_messages in the message block.");
5352  }
5353 
5361 
5363  {
5364  (void) _PMessage;
5365  throw invalid_operation("To use ordered message processing, you must override propagate_to_any_targets in the message block.");
5366  }
5367 
5368  //
5369  // Utility routines
5370  //
5382 
5383  void initialize_source(_Inout_opt_ Scheduler * _PScheduler = NULL, _Inout_opt_ ScheduleGroup * _PScheduleGroup = NULL)
5384  {
      // Initializes _M_messageProcessor with the given scheduler and schedule
      // group (both default to NULL) and a callback that forwards each message
      // to this block's _Handle_message.
      // The lambda captures `this`, so the callback refers back to this block.
5385  // Register a callback
5386  _M_messageProcessor.initialize(_PScheduler, _PScheduleGroup,
5387  [this](message<_Target_type> * _PMessage)
5388  {
5389  this->_Handle_message(_PMessage);
5390  });
5391  }
5392 
5396 
5398  {
5399  // Register callbacks for CRT110 batched processing
5400  _M_messageProcessor.initialize_batched_processing(
5401  // Processing function used by CRT110
5402  [this](message<_Target_type> * _PMessage)
5403  {
5404  // Handle message through new process_input_message to use CRT110 batch processing
5405  this->process_input_messages(_PMessage);
5406  },
5407  [this](void)
5408  {
5409  this->_Propagate_message();
5410  });
5411  }
5412 
5420 
5422  {
5423  // Caller shall not be holding any locks when calling this routine
5424  _M_messageProcessor.sync_send(_Msg);
5425  }
5426 
5434 
5436  {
5437  _M_messageProcessor.async_send(_Msg);
5438  }
5439 
5445 
5447  {
5448  _M_messageProcessor.wait();
5449  }
5450 
5454 
5456  {
5457  // Wait for outstanding propagation to complete.
5459 
5460  unlink_targets();
5461 
5462  _Wait_on_ref();
5463  }
5464 
5465  //
5466  // Protected members
5467  //
5468 
5472 
5474 
5478 
5479  runtime_object_identity _M_reservedId;
5480 
5484 
5485  _TargetLinkRegistry _M_connectedTargets;
5486 
5490 
5491  _MessageProcessorType _M_messageProcessor;
5492 
5493 private:
5494 
5496 
5497 
5498  // Message handler callback for the propagator. Invokes propagate_to_any_targets
5499  // which derived classes should implement.
5500 
5502  {
5503  // Hold a lock to synchronize with unlink targets
5504  _R_lock _Lock(_M_internalLock);
5505  propagate_to_any_targets(_PMessage);
5506  }
5507 
5508  // Message handler callback for the processor. Invokes process_input_messages
5509  // which derived classes should implement.
5510 
5512  {
5513  // Don't need a lock to process the message
5514  process_input_messages(_PMessage);
5515  }
5516 
5517  // Message handler callback for the propagator. Invokes propagate_output_messages
5518  // which derived classes should implement.
5519 
5521  {
5522  // Hold a lock to synchronize with unlink targets
5523  _R_lock _Lock(_M_internalLock);
5525  }
5526 
5527  // Wait for the reference on this block to drop to zero
5528 
5529  void _Wait_on_ref(long _RefCount = 0)
5530  {
5532  while(_M_referenceCount != _RefCount)
5533  {
5534  spinWait._SpinOnce();
5535  }
5536  }
5537 
5538  // Private Data members
5539 
5546 
5548 
5549  volatile long _M_referenceCount;
5550 
5551 };
5552 
5553 //**************************************************************************
5554 // Propagator (source and target) Block:
5555 //**************************************************************************
5575 
5576 template<class _TargetLinkRegistry, class _SourceLinkRegistry,
5578 class propagator_block : public source_block<_TargetLinkRegistry, _MessageProcessorType>, public ITarget<typename _SourceLinkRegistry::type::source_type>
5579 {
5580 public:
5584 
5585  typedef typename _SourceLinkRegistry::type::source_type _Source_type;
5586 
5590 
5592 
5596 
5598 
5600  using typename ITarget<_Source_type>::filter_method;
5601 
5605 
5607  {
5608  }
5609 
5613 
5615  {
5617 
5618  delete _M_pFilter;
5619  }
5620 
5640 
5642  {
5643  // It is important that calls to propagate do *not* take the same lock on the
5644  // internal structure that is used by <c>consume</c> and the LWT. Doing so could
5645  // result in a deadlock.
5646 
5647  if (_PMessage == NULL)
5648  {
5649  throw std::invalid_argument("_PMessage");
5650  }
5651 
5652  if (_PSource == NULL)
5653  {
5654  throw std::invalid_argument("_PSource");
5655  }
5656 
5657  if (_M_fDeclineMessages)
5658  {
5659  return declined;
5660  }
5661 
5662  if (_M_pFilter != NULL && !(*_M_pFilter)(_PMessage->payload))
5663  {
5664  return declined;
5665  }
5666 
5667  return propagate_message(_PMessage, _PSource);
5668  }
5669 
5688 
5690  {
5691  if (_PMessage == NULL)
5692  {
5693  throw std::invalid_argument("_PMessage");
5694  }
5695 
5696  if (_PSource == NULL)
5697  {
5698  throw std::invalid_argument("_PSource");
5699  }
5700 
5701  if (_M_fDeclineMessages)
5702  {
5703  return declined;
5704  }
5705 
5706  if (_M_pFilter != NULL && !(*_M_pFilter)(_PMessage->payload))
5707  {
5708  return declined;
5709  }
5710 
5711  return send_message(_PMessage, _PSource);
5712  }
5713 
5714 protected:
5715 
5730 
5732 
5750 
5752  {
5753  // By default we do not allow send()
5754  return declined;
5755  }
5756 
5763 
5765  {
5766  _M_connectedSources.add(_PSource);
5770  }
5771 
5778 
5780  {
5784 
5785  _M_connectedSources.remove(_PSource);
5786  }
5787 
5791 
5792  virtual void unlink_sources()
5793  {
      // Asks every source in _M_connectedSources to unlink this block as its target.
      // Iteration ends when the iterator dereferences to NULL.
      // This function does not remove entries from _M_connectedSources itself.
      // NOTE(review): presumably each source calls back into this block to remove
      // itself from the registry - confirm against the source implementations.
5794  for (source_iterator _Iter = _M_connectedSources.begin(); *_Iter != NULL; ++_Iter)
5795  {
5796  ISource<_Source_type> * _PSource = *_Iter;
5797  _PSource->unlink_target(this);
5798  }
5799  }
5800 
5801  //
5802  // Utility routines
5803  //
5804 
5808 
5810  {
5811  throw invalid_operation("To use batched processing, you must override process_input_messages in the message block.");
5812  }
5813 
5826 
5827  void initialize_source_and_target(_Inout_opt_ Scheduler * _PScheduler = NULL, _Inout_opt_ ScheduleGroup * _PScheduleGroup = NULL)
5828  {
5829  this->initialize_source(_PScheduler, _PScheduleGroup);
5830 
5831  // Register this propagator block as the owner of the connected sources
5833  }
5834 
5841 
5842  void register_filter(filter_method const& _Filter)
5843  {
      // Stores a heap-allocated copy of _Filter in _M_pFilter when _Filter compares
      // unequal to NULL; otherwise leaves _M_pFilter untouched.
      // The copy is released by the destructor's `delete _M_pFilter`.
      // NOTE(review): a previously stored filter is overwritten without being
      // deleted, so a second call with a non-null filter would leak the first
      // copy - confirm this is only ever called once per block.
5844  if (_Filter != NULL)
5845  {
5846  _M_pFilter = new filter_method(_Filter);
5847  }
5848  }
5849 
5856 
5858  {
5859  _M_fDeclineMessages = true;
5860  }
5861 
5865 
5867  {
5868  // Decline messages while the links are being removed
5870 
5871  // Remove all the target links. This waits for
5872  // all outstanding async propagation operations.
5873  this->remove_targets();
5874 
5875  // unlink all sources. The above steps guarantee that
5876  // they can be removed safely.
5877  unlink_sources();
5878  }
5879 
5883 
5884  _SourceLinkManager _M_connectedSources;
5885 
5889 
5891 
5896 
5897  volatile bool _M_fDeclineMessages;
5898 };
5899 
5900 //**************************************************************************
5901 // Unbounded Buffers:
5902 //**************************************************************************
5903 
5916 
5917 template<class _Type>
5918 class unbounded_buffer : public propagator_block<multi_link_registry<ITarget<_Type>>, multi_link_registry<ISource<_Type>>>
5919 {
5920 private:
5923 
5924 public:
5928 
5941 
5944  {
5946  this->enable_batched_processing();
5947  }
5948 
5964 
5967  {
5969  this->enable_batched_processing();
5970  this->register_filter(_Filter);
5971  }
5972 
5973 #ifdef _CRT_USE_WINAPI_FAMILY_DESKTOP_APP
5974 
5990  unbounded_buffer(Scheduler& _PScheduler) :
5992  {
5993  this->initialize_source_and_target(&_PScheduler);
5994  this->enable_batched_processing();
5995  }
5996 
6015 
6016  unbounded_buffer(Scheduler& _PScheduler, filter_method const& _Filter) :
6018  {
6019  this->initialize_source_and_target(&_PScheduler);
6020  this->enable_batched_processing();
6021  this->register_filter(_Filter);
6022  }
6023 
6040 
6041  unbounded_buffer(ScheduleGroup& _PScheduleGroup) :
6043  {
6044  this->initialize_source_and_target(NULL, &_PScheduleGroup);
6045  this->enable_batched_processing();
6046  }
6047 
6067 
6068  unbounded_buffer(ScheduleGroup& _PScheduleGroup, filter_method const& _Filter) :
6070  {
6071  this->initialize_source_and_target(NULL, &_PScheduleGroup);
6072  this->enable_batched_processing();
6073  this->register_filter(_Filter);
6074  }
6075 #endif /* _CRT_USE_WINAPI_FAMILY_DESKTOP_APP */
6076 
6080 
6082  {
6083  // Remove all links
6084  this->remove_network_links();
6085 
6086  // Clean up any messages left in this message block
6088  }
6089 
6099 
6100  bool enqueue(_Type const& _Item)
6101  {
      // Sends _Item to this buffer through ::Concurrency::send, with this block
      // as the target, and returns that call's result.
6102  return ::Concurrency::send<_Type>(this, _Item);
6103  }
6104 
6111 
6113  {
6114  return ::Concurrency::receive<_Type>(this);
6115  }
6116 
6117 
6118 protected:
6119 
6120  //
6121  // propagator_block protected function implementations
6122  //
6123 
6138 
6140  {
6141  // It is important that calls to propagate do *not* take the same lock on the
6142  // internal structure that is used by <c>consume</c> and the LWT. Doing so could
6143  // result in a deadlock.
6144 
6146 
6147  // Accept the message being propagated
6148  _PMessage = _PSource->accept(_PMessage->msg_id(), this);
6149 
6150  if (_PMessage != NULL)
6151  {
6152  this->async_send(_PMessage);
6153  }
6154  else
6155  {
6156  _Result = missed;
6157  }
6158 
6159  return _Result;
6160  }
6161 
6176 
6178  {
6179  _PMessage = _PSource->accept(_PMessage->msg_id(), this);
6180 
6181  if (_PMessage != NULL)
6182  {
6183  this->sync_send(_PMessage);
6184  }
6185  else
6186  {
6187  return missed;
6188  }
6189 
6190  return accepted;
6191  }
6192 
6200 
6202  {
6203  return true;
6204  }
6205 
6216 
6217  virtual message<_Type> * accept_message(runtime_object_identity _MsgId)
6218  {
      // Returns the head message of _M_messageBuffer, removing it from the
      // buffer, when its id matches _MsgId; returns NULL and leaves the buffer
      // unchanged otherwise.
6219  //
6220  // Peek at the head message in the message buffer. If the IDs match
6221  // dequeue and transfer ownership
6222  //
6223  message<_Type> * _Msg = NULL;
6224 
6225  if (_M_messageBuffer._Is_head(_MsgId))
6226  {
6227  _Msg = _M_messageBuffer._Dequeue();
6228  }
6229 
6230  return _Msg;
6231  }
6232 
6246 
6247  virtual bool reserve_message(runtime_object_identity _MsgId)
6248  {
      // Returns the result of _M_messageBuffer._Is_head(_MsgId); the buffer is
      // not modified, so only the head message can be reserved.
6249  // Allow reservation if this is the head message
6250  return _M_messageBuffer._Is_head(_MsgId);
6251  }
6252 
6266 
6267  virtual message<_Type> * consume_message(runtime_object_identity _MsgId)
6268  {
      // Consuming is implemented as accepting: delegates to accept_message and
      // returns its result (the dequeued head message, or NULL on id mismatch).
6269  // By default, accept the message
6270  return accept_message(_MsgId);
6271  }
6272 
6279 
6280  virtual void release_message(runtime_object_identity _MsgId)
6281  {
      // Validation only: throws message_not_found when _MsgId is not the id of
      // the head message in _M_messageBuffer. The buffer is not modified.
6282  // The head message is the one reserved.
6283  if (!_M_messageBuffer._Is_head(_MsgId))
6284  {
6285  throw message_not_found();
6286  }
6287  }
6288 
6292 
6293  virtual void resume_propagation()
6294  {
      // Requests a fresh propagation pass when _M_messageBuffer is non-empty:
      // sets _M_fForceRepropagation and queues a NULL message via async_send.
      // Does nothing when the buffer is empty.
6295  // If there are any messages in the buffer, propagate them out
6296  if (_M_messageBuffer._Count() > 0)
6297  {
6298  // Set the flag to force a repropagation. This flag is cleared when a propagation happens
6299  // The only functions that call this are release, consume, and link_target, all of which
6300  // hold the internal lock, so the flag is guaranteed to be read by propagation, which also
6301  // holds the same lock.
6302  _M_fForceRepropagation = true;
6303 
6304  // async send a NULL value to initiate the repropagation
6305  this->async_send(NULL);
6306  }
6307  }
6308 
6315 
6317  {
6318  // If the message queue is blocked due to reservation
6319  // there is no need to do any message propagation
6320  if (this->_M_pReservedFor != NULL)
6321  {
6322  return;
6323  }
6324 
6325  message<_Type> * _Msg = _M_messageBuffer._Peek();
6326 
6327  if (_Msg != NULL)
6328  {
6329  // Propagate the head message to the new target
6330  message_status _Status = _PTarget->propagate(_Msg, this);
6331 
6332  if (_Status == accepted)
6333  {
6334  // The target accepted the message, restart propagation.
6336  }
6337 
6338  // If the status is anything other than accepted, then leave
6339  // the message queue blocked.
6340  }
6341  }
6342 
6348  {
6349  if (_PMessage != NULL)
6350  {
6351  _M_processedMessages._Enqueue(_PMessage);
6352  }
6353  }
6354 
6368 
6370  {
6371  // Move the messages from the processedMessages queue to the internal storage
6372  // to make them ready for propagating out
6373 
6374  // If there are messages in the message queue, the queue is blocked and a
6375  // propagation should not happen unless it has been forced using resume_propagation
6376  bool _FIsBlocked = (_M_messageBuffer._Count() > 0);
6377 
6378  for(;;)
6379  {
6380  message<_Type> * _PInputMessage = _M_processedMessages._Dequeue();
6381  if(_PInputMessage == NULL)
6382  {
6383  break;
6384  }
6385  _M_messageBuffer._Enqueue(_PInputMessage);
6386  }
6387 
6388  if (_M_fForceRepropagation == false && _FIsBlocked == true)
6389  {
6390  return;
6391  }
6392 
6393  // Reset the repropagation flag because a propagation has started.
6394  _M_fForceRepropagation = false;
6395 
6396  // Attempt to propagate messages to all the targets
6398  }
6399 
6400 private:
6401 
6408 
// NOTE(review): signature elided from the listing (6408 -> 6410); presumably
// _Propagate_priority_order taking a queue reference named _MessageBuffer -- confirm.
// Offers the head message of _MessageBuffer to the connected targets in iteration order
// and keeps going with the next head for as long as some target accepts.
6410  {
6411  message<_Target_type> * _Msg = _MessageBuffer._Peek();
6412 
6413  // If someone has reserved the _Head message, don't propagate anymore
6414  if (this->_M_pReservedFor != NULL)
6415  {
6416  return;
6417  }
6418 
6419  while (_Msg != NULL)
6420  {
6421  message_status _Status = declined;
6422 
6423  // Always start from the first target that linked
      // The iteration stops at the first NULL entry yielded by the iterator.
6424  for (target_iterator _Iter = this->_M_connectedTargets.begin(); *_Iter != NULL; ++_Iter)
6425  {
6426  ITarget<_Target_type> * _PTarget = *_Iter;
6427  _Status = _PTarget->propagate(_Msg, this);
6428 
6429  // Ownership of message changed. Do not propagate this
6430  // message to any other target.
6431  if (_Status == accepted)
6432  {
6433  break;
6434  }
6435 
6436  // If the target just propagated to reserved this message, stop
6437  // propagating it to others
6438  if (this->_M_pReservedFor != NULL)
6439  {
6440  break;
6441  }
6442  }
6443 
6444  // If status is anything other than accepted, then the head message
6445  // was not propagated out. Thus, nothing after it in the queue can
6446  // be propagated out. Cease propagation.
6447  if (_Status != accepted)
6448  {
6449  break;
6450  }
6451 
6452  // Get the next message
      // NOTE(review): this function never dequeues; presumably the accepting target
      // removed the head through accept(), otherwise _Peek would return it again.
6453  _Msg = _MessageBuffer._Peek();
6454  }
6455  }
6456 
6461 
// NOTE(review): signature elided from the listing (6461 -> 6463); presumably
// unbounded_buffer::_Delete_stored_messages -- confirm.
// Dequeues and deletes every message still held in _M_messageBuffer.
6463  {
6464  // Input messages for this message block are in the base-class input buffer
6465  // All messages in that buffer are guaranteed to have moved to the output
6466  // buffer because the destructor first waits for all async sends to finish
6467  // before reaching this point
6468 
6469  // Delete any messages remaining in the output queue
6470  for (;;)
6471  {
6472  message<_Type> * _Msg = _M_messageBuffer._Dequeue();
      // A NULL result from _Dequeue ends the loop.
6473  if (_Msg == NULL)
6474  {
6475  break;
6476  }
6477  delete _Msg;
6478  }
6479  }
6480 
6484 
6486 
6490 
6492 
6496 
6498 
6499 private:
6500  //
6501  // Hide assignment operator and copy constructor
6502  //
6503  unbounded_buffer const &operator =(unbounded_buffer const&); // no assignment operator
6504  unbounded_buffer(unbounded_buffer const &); // no copy constructor
6505 };
6506 
6507 //**************************************************************************
6508 // Overwrite Buffers:
6509 //**************************************************************************
6510 
6525 
// overwrite_buffer<_Type>: a propagator_block linked through multi_link_registry on both
// the target and the source side. The visible methods keep a single stored message in
// _M_pMessage, replace it whenever a new message arrives, and hand out copies of its
// payload to consumers.
// NOTE(review): this text is a rendered source listing. Wherever the embedded line
// numbers jump (for example 6529 -> 6532 or 6552 -> 6554) the original lines -- doc
// comments, several signatures, and a few single statements -- are not shown.
6526 template<class _Type>
6527 class overwrite_buffer : public propagator_block<multi_link_registry<ITarget<_Type>>, multi_link_registry<ISource<_Type>>>
6528 {
6529 private:
6532 
6533 public:
6536 
6549 
      // Constructor body; signature and the statement at 6553 are elided from the listing.
6552  {
6554  }
6555 
6571 
      // Constructor body (signature elided); registers _Filter as this block's filter.
6574  {
6576  this->register_filter(_Filter);
6577  }
6578 
6579 #ifdef _CRT_USE_WINAPI_FAMILY_DESKTOP_APP
6580 
      // Scheduler overload. The member-initializer line (6597) is elided from the listing.
6596  overwrite_buffer(Scheduler& _PScheduler) :
6598  {
6599  this->initialize_source_and_target(&_PScheduler);
6600  }
6601 
6620 
      // Scheduler + filter overload. The member-initializer line (6623) is elided.
6621  overwrite_buffer(Scheduler& _PScheduler,
6622  filter_method const& _Filter) :
6624  {
6625  this->initialize_source_and_target(&_PScheduler);
6626  this->register_filter(_Filter);
6627  }
6628 
6645 
      // ScheduleGroup overload; passes NULL for the scheduler argument.
6646  overwrite_buffer(ScheduleGroup& _PScheduleGroup) :
6648  {
6649  this->initialize_source_and_target(NULL, &_PScheduleGroup);
6650  }
6651 
6671 
      // ScheduleGroup + filter overload; passes NULL for the scheduler argument.
6672  overwrite_buffer(ScheduleGroup& _PScheduleGroup,
6673  filter_method const& _Filter) :
6675  {
6676  this->initialize_source_and_target(NULL, &_PScheduleGroup);
6677  this->register_filter(_Filter);
6678  }
6679 #endif /* _CRT_USE_WINAPI_FAMILY_DESKTOP_APP */
6680 
6684 
      // Presumably the destructor (signature elided). Removes the network links; the
      // cleanup statement announced by the last comment (6691) is elided from the listing.
6686  {
6687  // Remove all links that are targets of this overwrite_buffer
6688  this->remove_network_links();
6689 
6690  // Clean up any messages left in this message block
6692  }
6693 
6700 
      // Returns true once _M_fIsInitialized has been set (see the body at 7003-7035).
6701  bool has_value() const
6702  {
6703  return _M_fIsInitialized != 0;
6704  }
6705 
6716 
      // Signature elided; presumably value(). Returns the result of receive<_Type> on
      // this block.
6718  {
6719  return ::Concurrency::receive<_Type>(this);
6720  }
6721 
6722 protected:
6723 
6724  //
6725  // propagator_block protected function implementation
6726  //
6727 
6742 
      // Signature elided; presumably propagate_message(_PMessage, _PSource). Accepts the
      // offered message from _PSource and forwards it with async_send.
      // NOTE(review): the declaration of _Result (listing line 6749) is elided.
6744  {
6745  // It is important that calls to propagate do *not* take the same lock on the
6746  // internal structure that is used by Consume and the LWT. Doing so could
6747  // result in a deadlock with the Consume call.
6748 
6750 
6751  _PMessage = _PSource->accept(_PMessage->msg_id(), this);
6752 
6753  //
6754  // If message was accepted, set the member variables for
6755  // this block and start the asynchronous propagation task
6756  //
6757  if (_PMessage != NULL)
6758  {
6759  // Add a reference for the async_send holding the message
6760  _PMessage->add_ref();
6761 
6762  this->async_send(_PMessage);
6763  }
6764  else
6765  {
6766  _Result = missed;
6767  }
6768 
6769  return _Result;
6770  }
6771 
6786 
      // Signature elided; presumably send_message(_PMessage, _PSource). Same as the body
      // above but forwards with sync_send and returns missed/accepted directly.
6788  {
6789  _PMessage = _PSource->accept(_PMessage->msg_id(), this);
6790 
6791  //
6792  // If message was accepted, set the member variables for
6793  // this block and hand the message to sync_send
6794  //
6795  if (_PMessage != NULL)
6796  {
6797  // Add a reference for the sync_send holding the message
6798  _PMessage->add_ref();
6799 
6800  this->sync_send(_PMessage);
6801  }
6802  else
6803  {
6804  return missed;
6805  }
6806 
6807  return accepted;
6808  }
6809 
6817 
      // Signature elided; presumably supports_anonymous_source(). Always returns true.
6819  {
6820  return true;
6821  }
6822 
6838 
      // Returns a newly allocated copy of the stored message when _MsgId matches its id;
      // returns NULL when nothing is stored or the id differs. The stored message itself
      // is left in place.
6839  virtual message<_Type> * accept_message(runtime_object_identity _MsgId)
6840  {
6841  //
6842  // If the internal message has not been initialized yet, return NULL
6843  //
6844  if (_M_pMessage == NULL)
6845  {
6846  return NULL;
6847  }
6848 
6849  //
6850  // Instead of returning the internal message, we return a copy of the
6851  // message stored.
6852  //
6853  // Because we are returning a copy, the accept routine for an overwrite_buffer
6854  // does not need to grab the internalLock
6855  //
6856  message<_Type> * _Msg = NULL;
6857  if (_M_pMessage->msg_id() == _MsgId)
6858  {
6859  _Msg = new message<_Type>(_M_pMessage->payload);
6860  }
6861 
6862  return _Msg;
6863  }
6864 
6878 
      // Returns false when no message is stored or _MsgId is not the stored message's id;
      // otherwise takes a reference on _M_pReservedMessage and returns true.
      // NOTE(review): the statements at listing lines 6889 and 6892 are elided; presumably
      // they reject a second reservation and assign _M_pReservedMessage -- confirm.
6879  virtual bool reserve_message(runtime_object_identity _MsgId)
6880  {
6881  // Ensure that this message currently exists in the overwrite buffer
6882  if (_M_pMessage == NULL || _M_pMessage->msg_id() != _MsgId)
6883  {
6884  return false;
6885  }
6886 
6887  // Can only reserve one message, any other blocks trying to reserve
6888  // will return false
6890 
6891  // Save this message away
6893 
6894  // Add a reference for this message to prevent deletion
6895  _M_pReservedMessage->add_ref();
6896 
6897  return true;
6898  }
6899 
6913 
      // Returns a newly allocated message carrying a copy of the reserved message's
      // payload and drops the reservation's reference, deleting the reserved message when
      // its count reaches zero. Returns NULL when a reserved message exists but its id
      // differs from _MsgId.
      // NOTE(review): the assert mentioned below (6924) and the statement at 6935 are
      // elided from the listing; as shown, a NULL _M_pReservedMessage would be dereferenced
      // and the pointer is not reset after the delete -- confirm against the real header.
6914  virtual message<_Type> * consume_message(runtime_object_identity _MsgId)
6915  {
6916  // Leave and return NULL if this msgId doesn't match the reserved message
6917  // Otherwise this is a pull of a later overwritten message, and messages
6918  // could then appear out of order.
6919  if (_M_pReservedMessage != NULL && _M_pReservedMessage->msg_id() != _MsgId)
6920  {
6921  return NULL;
6922  }
6923  // This redundant assert is specifically to make the /analyze switch happy, which cannot recognize the same assertion above in if stmnt.
6925 
6926  _Type _Payload = _M_pReservedMessage->payload;
6927 
6928  // Take the reserved message
6929  message<_Type> * _Result = new message<_Type>(_Payload);
6930 
6931  if (_M_pReservedMessage->remove_ref() == 0)
6932  {
6933  delete _M_pReservedMessage;
6934  }
6936 
6937  return _Result;
6938  }
6939 
6946 
      // Drops the reservation's reference on the reserved message, deleting it when the
      // count reaches zero. Throws message_not_found when _MsgId is not the reserved id.
      // NOTE(review): listing lines 6949-6950 and 6961 are elided; as shown there is no
      // NULL check before the dereference and no reset of the pointer -- confirm.
6947  virtual void release_message(runtime_object_identity _MsgId)
6948  {
6951 
6952  if (_MsgId != _M_pReservedMessage->msg_id())
6953  {
6954  throw message_not_found();
6955  }
6956 
6957  if (_M_pReservedMessage->remove_ref() == 0)
6958  {
6959  delete _M_pReservedMessage;
6960  }
6962  }
6963 
6967 
      // Intentionally empty.
6968  virtual void resume_propagation()
6969  {
6970  // On reservation we do not stop propagation. So there
6971  // is nothing to be done to resume propagation.
6972  }
6973 
6980 
      // Signature elided; presumably link_target_notification(_PTarget). Offers the
      // stored message, if any, to _PTarget. The propagate result is ignored.
6982  {
6983  // If there is a message available already, propagate it
6984  if (_M_pMessage != NULL)
6985  {
6986  _PTarget->propagate(_M_pMessage, this);
6987  }
6988  }
6989 
7001 
      // Signature elided; presumably propagate_to_any_targets(_PMessage). Makes _PMessage
      // the stored message (releasing the previous one), marks the block initialized, and
      // offers the message to every connected target.
7003  {
7004  // Move the message from the queuedMessages Buffer to the internal storage
7005 
7006  // Add a reference for the overwrite_buffer holding the message
7007  _PMessage->add_ref();
7008 
      // Release the previously stored message; it is deleted when its count hits zero.
7009  if (_M_pMessage != NULL)
7010  {
7011  if (_M_pMessage->remove_ref() == 0)
7012  {
7013  delete _M_pMessage;
7014  }
7015  }
7016 
7017  _M_pMessage = _PMessage;
7018 
7019  // Now that message has been received, set this block as initialized
7020  _M_fIsInitialized = true;
7021 
7022  for (target_iterator _Iter = this->_M_connectedTargets.begin(); *_Iter != NULL; ++_Iter)
7023  {
7024  // Overwrite buffers can propagate its message out
7025  // to any number of Targets
7026 
7027  ITarget<_Type> * _PTarget = *_Iter;
7028  _PTarget->propagate(_PMessage, this);
7029  }
7030 
      // Presumably balances the reference added by the propagate/send path before the
      // message was handed to async_send/sync_send -- TODO confirm.
7031  if (_PMessage->remove_ref() == 0)
7032  {
7033  delete _PMessage;
7034  }
7035  }
7036 
7037 private:
7038 
7043 
      // Signature elided; presumably _Delete_stored_messages(). Deletes the stored message
      // and, under a condition that is elided from the listing (7063), the reserved one.
7045  {
7046  // Input messages for this message block are in the base-class input buffer
7047  // All messages in that buffer are guaranteed to have moved to the output
7048  // buffer because the destructor first waits for all async sends to finish
7049  // before reaching this point
7050 
7051  // The messages for an overwrite buffer are deleted when overwritten
7052  // through reference counting. This final check is put in place in
7053  // case any message still exists in the buffer when the overwrite_buffer
7054  // is deleted. The reference count of this message has not yet reached
7055  // zero because it hasn't been overwritten yet. It is safe because
7056  // we have finished all propagation.
7057  if (_M_pMessage != NULL)
7058  {
7059  // A block can only have a reserved message after receiving a message
7060  // at some point, so it must be within the above if-clause.
7061  // Now delete the reserved message if it is non-NULL and different from
7062  // the saved internal message
7064  {
7065  delete _M_pReservedMessage;
7066  }
7067  delete _M_pMessage;
7068  }
7069  }
7070 
7071  //
7072  // Private Data Members
7073  //
7074 
      // NOTE(review): the declarations that belong to the next two comments (7076, 7079)
      // are elided from the listing; presumably _M_pMessage and _M_pReservedMessage.
7075  // The message being stored
7077 
7078  // The message being reserved
7080 
7081  // The marker for whether the overwrite buffer has already been initialized
7082  volatile bool _M_fIsInitialized;
7083 
7084 private:
7085  //
7086  // Hide assignment operator and copy constructor
7087  //
7088  overwrite_buffer const &operator =(overwrite_buffer const&); // no assignment operator
7089  overwrite_buffer(overwrite_buffer const &); // no copy constructor
7090 };
7091 
7092 //**************************************************************************
7093 // Call:
7094 //**************************************************************************
7095 
7110 
// call<_Type, _FunctorType>: a target_block (sources linked through multi_link_registry)
// that stores a functor in _M_pFunc and invokes it with the payload of each message it
// receives. _FunctorType defaults to std::function<void(_Type const&)>.
// NOTE(review): this text is a rendered source listing. Wherever the embedded line
// numbers jump, the original lines (doc comments, several signatures and a few single
// statements) are not shown.
7111 template<class _Type, class _FunctorType = std::function<void(_Type const&)>>
7112 class call : public target_block<multi_link_registry<ISource<_Type>>>
7113 {
7114 private:
7118 
7119  typedef _FunctorType _Call_method;
7120 
7122 
7123 public:
7125 
7143 
      // Copies _Func into _M_pFunc, initializes the target side and turns on batched
      // processing.
7144  call(_Call_method const& _Func) :
7145  _M_pFunc(_Func)
7146  {
7147  this->initialize_target();
7148  this->enable_batched_processing();
7149  }
7150 
7168 
      // As above, and additionally registers _Filter.
7169  call(_Call_method const& _Func,
7170  filter_method const& _Filter) :
7171  _M_pFunc(_Func)
7172  {
7173  this->initialize_target();
7174  this->enable_batched_processing();
7175  this->register_filter(_Filter);
7176  }
7177 
7178 #ifdef _CRT_USE_WINAPI_FAMILY_DESKTOP_APP
7179 
      // Scheduler overload: passes &_PScheduler to initialize_target.
7200  call(Scheduler& _PScheduler,
7201  _Call_method const& _Func) :
7202  _M_pFunc(_Func)
7203  {
7204  this->initialize_target(&_PScheduler);
7205  this->enable_batched_processing();
7206  }
7207 
7231 
      // Scheduler + filter overload.
7232  call(Scheduler& _PScheduler,
7233  _Call_method const& _Func,
7234  filter_method const& _Filter) :
7235  _M_pFunc(_Func)
7236  {
7237  this->initialize_target(&_PScheduler);
7238  this->enable_batched_processing();
7239  this->register_filter(_Filter);
7240  }
7241 
7263 
      // ScheduleGroup overload: passes NULL for the scheduler argument.
7264  call(ScheduleGroup& _PScheduleGroup,
7265  _Call_method const& _Func) :
7266  _M_pFunc(_Func)
7267  {
7268  this->initialize_target(NULL, &_PScheduleGroup);
7269  this->enable_batched_processing();
7270  }
7271 
7296 
      // ScheduleGroup + filter overload.
7297  call(ScheduleGroup& _PScheduleGroup,
7298  _Call_method const& _Func,
7299  filter_method const& _Filter) :
7300  _M_pFunc(_Func)
7301  {
7302  this->initialize_target(NULL, &_PScheduleGroup);
7303  this->enable_batched_processing();
7304  this->register_filter(_Filter);
7305  }
7306 #endif /* _CRT_USE_WINAPI_FAMILY_DESKTOP_APP */
7307 
7311 
      // Presumably the destructor (signature elided); removes all linked sources.
7313  {
7314  this->remove_sources();
7315  }
7316 
7317 protected:
7318 
7319  //
7320  // target_block protected function implementations
7321  //
7322 
7337 
      // Signature elided; presumably propagate_message(_PMessage, _PSource). Accepts the
      // offered message from _PSource and hands it to async_send.
      // NOTE(review): the declaration of _Result (listing line 7344) is elided.
7339  {
7340  // It is important that calls to propagate do *not* take the same lock on the
7341  // internal structure that is used by Consume and the LWT. Doing so could
7342  // result in a deadlock with the Consume call.
7343 
7345 
7346  //
7347  // Accept the message being propagated
7348  // Note: depending on the source block propagating the message
7349  // this may not necessarily be the same message (pMessage) first
7350  // passed into the function.
7351  //
7352  _PMessage = _PSource->accept(_PMessage->msg_id(), this);
7353 
7354  if (_PMessage != NULL)
7355  {
7356  this->async_send(_PMessage);
7357  }
7358  else
7359  {
7360  _Result = missed;
7361  }
7362 
7363  return _Result;
7364  }
7365 
7380 
      // Signature elided; presumably send_message(_PMessage, _PSource). Same flow as the
      // body above but hands the accepted message to sync_send.
      // NOTE(review): the declaration of _Result (listing line 7383) is elided.
7382  {
7384 
7385  //
7386  // Accept the message being propagated
7387  // Note: depending on the source block propagating the message
7388  // this may not necessarily be the same message (pMessage) first
7389  // passed into the function.
7390  //
7391  _PMessage = _PSource->accept(_PMessage->msg_id(), this);
7392 
7393  if (_PMessage != NULL)
7394  {
7395  this->sync_send(_PMessage);
7396  }
7397  else
7398  {
7399  _Result = missed;
7400  }
7401 
7402  return _Result;
7403  }
7404 
7412 
      // Signature elided; presumably supports_anonymous_source(). Always returns true.
7414  {
7415  return true;
7416  }
7417 
7424 
      // Deliberately a no-op: the parameter is only referenced to silence the
      // unused-parameter warning.
7425  virtual void process_message(_Inout_ message<_Type> * _PMessage)
7426  {
7427  (void) _PMessage;
7428  // No longer necessary with CRT110 change
7429  }
7430 
7434 
      // Signature elided; presumably process_input_messages(_PMessage). Calls the stored
      // functor with the message payload, then deletes the message.
      // NOTE(review): if _M_pFunc throws, the delete below is skipped.
7436  {
7437  // Invoke the function provided by the user
7438  _CONCRT_ASSERT(_PMessage != NULL);
7439  _M_pFunc(_PMessage->payload);
7440  delete _PMessage;
7441  }
7442 
7443 private:
7444 
7445  //
7446  // Private Data Members
7447  //
7448 
7449  // The call method called by this block
7450  _Call_method _M_pFunc;
7451 
7452 private:
7453  //
7454  // Hide assignment operator and copy constructor
7455  //
7456  call const &operator =(call const&); // no assignment operator
7457  call(call const &); // no copy constructor
7458 };
7459 
7460 //**************************************************************************
7461 // Transformer:
7462 //**************************************************************************
7463 
7479 
// transformer<_Input, _Output>: a propagator_block with a single-link target registry
// and a multi-link source registry. Accepted input messages are queued in
// _M_inputMessages, converted with the stored functor _M_pFunc, and the results are
// queued in _M_messageBuffer for propagation to the target.
// NOTE(review): this text is a rendered source listing. Wherever the embedded line
// numbers jump, the original lines (doc comments, several signatures and a few single
// statements) are not shown.
7480 template<class _Input, class _Output>
7481 class transformer : public propagator_block<single_link_registry<ITarget<_Output>>, multi_link_registry<ISource<_Input>>>
7482 {
7483 private:
7484  typedef std::function<_Output(_Input const&)> _Transform_method;
7487 
7488 public:
7492 
7513 
      // Stores _Func and links _PTarget when one is supplied.
      // NOTE(review): the statement at listing line 7518 is elided; presumably the
      // initialize_source_and_target call seen in the other overloads -- confirm.
7514  transformer(_Transform_method const& _Func,
7515  _Inout_opt_ ITarget<_Output> * _PTarget = NULL) :
7516  _M_pFunc(_Func)
7517  {
7519 
7520  if (_PTarget != NULL)
7521  {
7522  this->link_target(_PTarget);
7523  }
7524  }
7525 
7549 
      // As above, and additionally registers _Filter (statement at 7555 elided).
7550  transformer(_Transform_method const& _Func,
7551  _Inout_opt_ ITarget<_Output> * _PTarget,
7552  filter_method const& _Filter) :
7553  _M_pFunc(_Func)
7554  {
7556  this->register_filter(_Filter);
7557 
7558  if (_PTarget != NULL)
7559  {
7560  this->link_target(_PTarget);
7561  }
7562  }
7563 
7564 #ifdef _CRT_USE_WINAPI_FAMILY_DESKTOP_APP
7565 
      // Scheduler overload.
7589  transformer(Scheduler& _PScheduler,
7590  _Transform_method const& _Func,
7591  _Inout_opt_ ITarget<_Output> * _PTarget = NULL) :
7592  _M_pFunc(_Func)
7593  {
7594  this->initialize_source_and_target(&_PScheduler);
7595 
7596  if (_PTarget != NULL)
7597  {
7598  this->link_target(_PTarget);
7599  }
7600  }
7601 
7628 
      // Scheduler + filter overload.
7629  transformer(Scheduler& _PScheduler,
7630  _Transform_method const& _Func,
7631  _Inout_opt_ ITarget<_Output> * _PTarget,
7632  filter_method const& _Filter) :
7633  _M_pFunc(_Func)
7634  {
7635  this->initialize_source_and_target(&_PScheduler);
7636  this->register_filter(_Filter);
7637 
7638  if (_PTarget != NULL)
7639  {
7640  this->link_target(_PTarget);
7641  }
7642  }
7643 
7668 
      // ScheduleGroup overload; passes NULL for the scheduler argument.
7669  transformer(ScheduleGroup& _PScheduleGroup,
7670  _Transform_method const& _Func,
7671  _Inout_opt_ ITarget<_Output> * _PTarget = NULL) :
7672  _M_pFunc(_Func)
7673  {
7674  this->initialize_source_and_target(NULL, &_PScheduleGroup);
7675 
7676  if (_PTarget != NULL)
7677  {
7678  this->link_target(_PTarget);
7679  }
7680  }
7681 
7709 
      // ScheduleGroup + filter overload.
7710  transformer(ScheduleGroup& _PScheduleGroup,
7711  _Transform_method const& _Func,
7712  _Inout_opt_ ITarget<_Output> * _PTarget,
7713  filter_method const& _Filter) :
7714  _M_pFunc(_Func)
7715  {
7716  this->initialize_source_and_target(NULL, &_PScheduleGroup);
7717  this->register_filter(_Filter);
7718 
7719  if (_PTarget != NULL)
7720  {
7721  this->link_target(_PTarget);
7722  }
7723  }
7724 #endif /* _CRT_USE_WINAPI_FAMILY_DESKTOP_APP */
7725 
7729 
      // Presumably the destructor (signature elided). Removes the network links; the
      // cleanup statement announced by the last comment (7736) is elided from the listing.
7731  {
7732  // Remove all links
7733  this->remove_network_links();
7734 
7735  // Clean up any messages left in this message block
7737  }
7738 
7739 protected:
7740 
7741  // Propagator_block protected function implementations
7742 
7757 
      // Signature elided; presumably propagate_message(_PMessage, _PSource). Accepts the
      // offered message, pushes it onto _M_inputMessages and calls async_send(NULL).
      // NOTE(review): the declaration of _Result (listing line 7764) is elided.
7759  {
7760  // It is important that calls to propagate do *not* take the same lock on the
7761  // internal structure that is used by Consume and the LWT. Doing so could
7762  // result in a deadlock with the Consume call.
7763 
7765 
7766  //
7767  // Accept the message being propagated
7768  // Note: depending on the source block propagating the message
7769  // this may not necessarily be the same message (pMessage) first
7770  // passed into the function.
7771  //
7772  _PMessage = _PSource->accept(_PMessage->msg_id(), this);
7773 
7774  if (_PMessage != NULL)
7775  {
7776  // Enqueue the input message
7777  _M_inputMessages.push(_PMessage);
7778  this->async_send(NULL);
7779  }
7780  else
7781  {
7782  _Result = missed;
7783  }
7784 
7785  return _Result;
7786  }
7787 
7802 
      // Signature elided; presumably send_message(_PMessage, _PSource). Same flow as the
      // body above but calls sync_send(NULL) and returns missed/accepted directly.
7804  {
7805  _PMessage = _PSource->accept(_PMessage->msg_id(), this);
7806 
7807  if (_PMessage != NULL)
7808  {
7809  // Enqueue the input message
7810  _M_inputMessages.push(_PMessage);
7811  this->sync_send(NULL);
7812  }
7813  else
7814  {
7815  return missed;
7816  }
7817 
7818  return accepted;
7819  }
7820 
7828 
      // Signature elided; presumably supports_anonymous_source(). Always returns true.
7830  {
7831  return true;
7832  }
7833 
7844 
      // Dequeues and returns the head of _M_messageBuffer when _MsgId identifies it;
      // returns NULL otherwise.
7845  virtual message<_Output> * accept_message(runtime_object_identity _MsgId)
7846  {
7847  //
7848  // Peek at the head message in the message buffer. If the IDs match
7849  // dequeue and transfer ownership
7850  //
7851  message<_Output> * _Msg = NULL;
7852 
7853  if (_M_messageBuffer._Is_head(_MsgId))
7854  {
7855  _Msg = _M_messageBuffer._Dequeue();
7856  }
7857 
7858  return _Msg;
7859  }
7860 
7874 
      // Returns whether _MsgId identifies the head of _M_messageBuffer; no state changes.
7875  virtual bool reserve_message(runtime_object_identity _MsgId)
7876  {
7877  // Allow reservation if this is the head message
7878  return _M_messageBuffer._Is_head(_MsgId);
7879  }
7880 
7894 
      // Delegates to accept_message.
7895  virtual message<_Output> * consume_message(runtime_object_identity _MsgId)
7896  {
7897  // By default, accept the message
7898  return accept_message(_MsgId);
7899  }
7900 
7907 
      // Throws message_not_found unless _MsgId identifies the head of _M_messageBuffer;
      // otherwise does nothing.
7908  virtual void release_message(runtime_object_identity _MsgId)
7909  {
7910  // The head message is the one reserved.
7911  if (!_M_messageBuffer._Is_head(_MsgId))
7912  {
7913  throw message_not_found();
7914  }
7915  }
7916 
7920 
      // Calls async_send(NULL) when _M_messageBuffer is non-empty.
7921  virtual void resume_propagation()
7922  {
7923  // If there are any messages in the buffer, propagate them out
7924  if (_M_messageBuffer._Count() > 0)
7925  {
7926  // async send a NULL value to initiate the repropagation
7927  this->async_send(NULL);
7928  }
7929  }
7930 
7937 
      // Signature elided; presumably link_target_notification. Returns early while a
      // reservation is outstanding.
      // NOTE(review): the statement at listing line 7947 is elided; presumably the call
      // that starts propagation -- confirm.
7939  {
7940  // If the message queue is blocked due to reservation
7941  // there is no need to do any message propagation
7942  if (this->_M_pReservedFor != NULL)
7943  {
7944  return;
7945  }
7946 
7948  }
7949 
7953 
      // Signature elided; presumably propagate_to_any_targets. Pops at most one input
      // message, runs _M_pFunc on its payload, and enqueues the result (carrying the input
      // message's id) on _M_messageBuffer.
      // NOTE(review): the final statement (listing line 7981) is elided; presumably the
      // call that propagates _M_messageBuffer -- confirm.
7955  {
7956  message<_Output> * _Msg = NULL;
7957 
7958  // Process input message.
7959  message<_Input> * _PInputMessage = NULL;
7960  _M_inputMessages.try_pop(_PInputMessage);
7961 
7962  if (_PInputMessage != NULL)
7963  {
7964  // Invoke the TransformMethod on the data
7965  // Let exceptions flow
      // NOTE(review): if _M_pFunc throws, _PInputMessage has already been popped and is
      // not deleted on this path.
7966  _Output _Out = _M_pFunc(_PInputMessage->payload);
7967 
7968  // Reuse the input message ID
7969  _Msg = new message<_Output>(_Out, _PInputMessage->msg_id());
7970  _M_messageBuffer._Enqueue(_Msg);
7971 
7972  // Message cleanup
7973  delete _PInputMessage;
7974 
      // Nothing more to do here when the new message is queued behind an older one.
7975  if (!_M_messageBuffer._Is_head(_Msg->msg_id()))
7976  {
7977  return;
7978  }
7979  }
7980 
7982  }
7983 
7984 private:
7985 
7992 
      // Signature elided; presumably _Propagate_priority_order taking a queue reference
      // named _MessageBuffer. Offers the head message to the connected targets in
      // iteration order and continues with the next head while some target accepts.
7994  {
7995  message<_Target_type> * _Msg = _MessageBuffer._Peek();
7996 
7997  // If someone has reserved the _Head message, don't propagate anymore
7998  if (this->_M_pReservedFor != NULL)
7999  {
8000  return;
8001  }
8002 
8003  while (_Msg != NULL)
8004  {
8005  message_status _Status = declined;
8006 
8007  // Always start from the first target that linked
8008  for (target_iterator _Iter = this->_M_connectedTargets.begin(); *_Iter != NULL; ++_Iter)
8009  {
8010  ITarget<_Target_type> * _PTarget = *_Iter;
8011  _Status = _PTarget->propagate(_Msg, this);
8012 
8013  // Ownership of message changed. Do not propagate this
8014  // message to any other target.
8015  if (_Status == accepted)
8016  {
8017  break;
8018  }
8019 
8020  // If the target just propagated to reserved this message, stop
8021  // propagating it to others
8022  if (this->_M_pReservedFor != NULL)
8023  {
8024  break;
8025  }
8026  }
8027 
8028  // If status is anything other than accepted, then the head message
8029  // was not propagated out. Thus, nothing after it in the queue can
8030  // be propagated out. Cease propagation.
8031  if (_Status != accepted)
8032  {
8033  break;
8034  }
8035 
8036  // Get the next message
8037  _Msg = _MessageBuffer._Peek();
8038  }
8039  }
8040 
8045 
      // Signature elided; presumably _Delete_stored_messages(). Deletes everything left
      // in both the input queue and the output buffer.
8047  {
8048  // Delete input messages
8049  // Because the transformer uses its own input queue, it's possible there are messages
8050  // in this queue and no LWT will be executed to handle them.
8051  message<_Input> * _PInputQueueMessage = NULL;
8052 
8053  while (_M_inputMessages.try_pop(_PInputQueueMessage))
8054  {
8055  // Message cleanup
8056  delete _PInputQueueMessage;
8057  }
8058 
8059  // Delete any messages remaining in the output queue
8060  for (;;)
8061  {
8062  message<_Output> * _Msg = _M_messageBuffer._Dequeue();
8063  if (_Msg == NULL)
8064  {
8065  break;
8066  }
8067  delete _Msg;
8068  }
8069  }
8070 
8071  //
8072  // Private Data Members
8073  //
8074 
8075  // The transformer method called by this block
8076  _Transform_method _M_pFunc;
8077 
      // NOTE(review): the declarations at listing lines 8079 and 8085 are elided;
      // presumably _M_inputMessages and _M_messageBuffer.
8078  // The queue of input messages for this Transformer block
8080 
8084 
8086 
8087 private:
8088  //
8089  // Hide assignment operator and copy constructor
8090  //
8091  transformer const &operator =(transformer const &); // no assignment operator
8092  transformer(transformer const &); // no copy constructor
8093 };
8094 
8095 //**************************************************************************
8096 // Timer:
8097 //**************************************************************************
8109 
// timer<_Type>: derives from ::Concurrency::details::_Timer and from a source_block with
// a single-link target registry. The visible code tracks a state (Initialized, Started,
// Paused, Stopped), holds at most one pending message in _M_pMessage, and offers it to
// the linked target.
// NOTE(review): this text is a rendered source listing. Wherever the embedded line
// numbers jump, the original lines (doc comments, several signatures, the State
// enumerators, most data-member declarations and a few statements) are not shown.
8110 template<class _Type>
8111 class timer : public ::Concurrency::details::_Timer, public source_block<single_link_registry<ITarget<_Type>>>
8112 {
8113 private:
8115 public:
8117 
8118 private:
8122 
      // The enumerators are elided from the listing; the methods below use Initialized,
      // Started, Paused and Stopped.
8123  enum State
8124  {
8128 
8133 
8138 
8143 
8145  };
8146 
8147 public:
8148 
8171 
      // Forwards _Ms and _Repeating to the _Timer base and the rest to _Initialize.
8172  timer(unsigned int _Ms, _Type const& _Value, ITarget<_Type> *_PTarget = NULL, bool _Repeating = false) :
8173  _Timer(_Ms, _Repeating)
8174  {
8175  _Initialize(_Value, _PTarget, _Repeating);
8176  }
8177 
8178 #ifdef _CRT_USE_WINAPI_FAMILY_DESKTOP_APP
8179 
      // Scheduler overload: additionally passes &_Scheduler to _Initialize.
8205  timer(Scheduler& _Scheduler, unsigned int _Ms, _Type const& _Value, _Inout_opt_ ITarget<_Type> *_PTarget = NULL, bool _Repeating = false) :
8206  _Timer(_Ms, _Repeating)
8207  {
8208  _Initialize(_Value, _PTarget, _Repeating, &_Scheduler);
8209  }
8210 
8237 
      // ScheduleGroup overload: passes NULL for the scheduler and &_ScheduleGroup.
8238  timer(ScheduleGroup& _ScheduleGroup, unsigned int _Ms, _Type const& _Value, _Inout_opt_ ITarget<_Type> *_PTarget = NULL, bool _Repeating = false) :
8239  _Timer(_Ms, _Repeating)
8240  {
8241  _Initialize(_Value, _PTarget, _Repeating, NULL, &_ScheduleGroup);
8242  }
8243 #endif /* _CRT_USE_WINAPI_FAMILY_DESKTOP_APP */
8244 
8248 
      // Presumably the destructor (signature elided). Stops a started timer, then removes
      // the targets.
      // NOTE(review): listing lines 8266, 8268 and 8270 are elided; presumably message
      // cleanup and the release of the scheduler reference -- confirm.
8250  {
8251  //
8252  // Make sure there are no more outstanding timer fires. Note that this does not mean that the LWT that was queued is finished, it only
8253  // means that no more timers will fire after the return from _Stop. We still *MUST* wait on any outstanding LWTs.
8254  //
8255  if (_M_state == Started)
8256  _Stop();
8257 
8258  // Remove all the targets. This will wait for any outstanding LWTs
8259  this->remove_targets();
8260 
8261  //
8262  // No more asynchronous operations can happen as of this point.
8263  //
8264 
8265  // Clean up any messages left in this message block
8267 
8269  {
8271  }
8272  }
8273 
8278 
      // Moves to Started and calls _Start, but only from Initialized or Paused; in any
      // other state the call does nothing.
8279  void start()
8280  {
8281  if (_M_state == Initialized || _M_state == Paused)
8282  {
8283  _M_state = Started;
8284  _Start();
8285  }
8286  }
8287 
8291 
      // Calls _Stop if the timer is Started, then sets the state to Stopped regardless of
      // the previous state. start() does not leave Stopped.
8292  void stop()
8293  {
8294  if (_M_state == Started)
8295  _Stop();
8296 
8297  _M_state = Stopped;
8298  }
8299 
8304 
      // For a repeating timer in the Started state: calls _Stop and moves to Paused.
      // For a non-repeating timer: behaves like stop().
8305  void pause()
8306  {
8307  //
8308  // Non repeating timers cannot pause. They go to a final stopped state on pause.
8309  //
8310  if (!_M_fRepeating)
8311  {
8312  stop();
8313  }
8314  else
8315  {
8316  // Pause only a started timer.
8317 
8318  if (_M_state == Started)
8319  {
8320  _Stop();
8321  _M_state = Paused;
8322  }
8323  }
8324  }
8325 
8326 protected:
8327 
8338 
      // Hands the pending message to the caller and clears _M_pMessage when _MsgId matches;
      // returns NULL when nothing is pending or the id differs.
8339  virtual message<_Type> * accept_message(runtime_object_identity _MsgId)
8340  {
8341  if (_M_pMessage == NULL || _MsgId != _M_pMessage->msg_id())
8342  {
8343  return NULL;
8344  }
8345 
8346  message<_Type> *_PMessage = _M_pMessage;
8347  _M_pMessage = NULL;
8348 
8349  return _PMessage;
8350  }
8351 
8365 
      // Returns true only when a message is pending and its id equals _MsgId. No
      // reservation state is recorded.
8366  virtual bool reserve_message(runtime_object_identity _MsgId)
8367  {
8368  //
8369  // Semantically, every timer tick is the same value -- it doesn't matter the message ID. Because we can only
8370  // have one target as well, we do not need to track anything here.
8371  //
8372  if (_M_pMessage == NULL || _M_pMessage->msg_id() != _MsgId)
8373  {
8374  return false;
8375  }
8376 
8377  return true;
8378  }
8379 
8393 
      // Delegates to accept_message.
8394  virtual message<_Type> * consume_message(runtime_object_identity _MsgId)
8395  {
8396  return accept_message(_MsgId);
8397  }
8398 
8405 
      // Deletes the pending message and clears _M_pMessage. Throws message_not_found when
      // nothing is pending or the id differs from _MsgId.
8406  virtual void release_message(runtime_object_identity _MsgId)
8407  {
8408  if (_M_pMessage == NULL || _M_pMessage->msg_id() != _MsgId)
8409  {
8410  throw message_not_found();
8411  }
8412 
8413  delete _M_pMessage;
8414  _M_pMessage = NULL;
8415  }
8416 
8420 
      // Intentionally empty.
8421  virtual void resume_propagation()
8422  {
8423  // Because reservation doesn't prevent propagation there is
8424  // no need to resume on consume/release.
8425  }
8426 
8433 
      // Signature elided; presumably link_target_notification(_PTarget). Offers the
      // pending message, if any, to _PTarget; the propagate result is ignored.
8435  {
8436  // If there is a timer message sitting around, it must be propagated to the target now.
8437 
8438  if (_M_pMessage != NULL)
8439  {
8440  _PTarget->propagate(_M_pMessage, this);
8441  }
8442  }
8443 
8447 
      // Signature elided; presumably propagate_to_any_targets. Only acts when no message
      // is pending.
      // NOTE(review): the statement at listing line 8452 is elided; presumably it creates
      // the new message and stores it in _M_pMessage before the loop -- confirm.
8449  {
8450  if (_M_pMessage == NULL)
8451  {
8453  for (target_iterator _Iter = this->_M_connectedTargets.begin(); *_Iter != NULL; ++_Iter)
8454  {
8455  ITarget<_Type> * _PTarget = *_Iter;
8456  _PTarget->propagate(_M_pMessage, this);
8457  }
8458  }
8459  }
8460 
8461 private:
8462 
      // NOTE(review): the declarations belonging to the next five comments (8464, 8467,
      // 8470, 8473, 8476) are elided from the listing; presumably _M_pMessage, _M_state,
      // _M_value, _M_fRepeating and _M_fReferencedScheduler.
8463  // The timer message we contain
8465 
8466  // Current state of the timer.
8468 
8469  // The value to send on elapse of the timer.
8471 
8472  // An indication of whether the timer is repeating.
8474 
8475  // A flag for whether we need to release a reference on the scheduler.
8477 
8478  // Scheduler used for the timer
8479  Scheduler * _M_pScheduler;
8480 
8484 
      // Signature elided. Returns a newly allocated message carrying a copy of _M_value.
8486  {
8487  return new message<_Type>(_M_value);
8488  }
8489 
8493 
      // Requests processing by calling async_send with a NULL message.
8494  virtual void _Fire()
8495  {
8496  this->async_send(NULL);
8497  }
8498 
8511 
      // Shared constructor logic: resets the members, resolves the scheduler to use,
      // initializes the source side and links _PTarget when it is non-NULL.
      // NOTE(review): _PTarget is annotated _Inout_ but is NULL-checked below and the
      // constructors default it to NULL; _Inout_opt_ looks like the intended annotation.
8512  void _Initialize(const _Type& _Value, _Inout_ ITarget<_Type> *_PTarget, bool _Repeating, _Inout_opt_ Scheduler * _PScheduler = NULL, _Inout_opt_ ScheduleGroup * _PScheduleGroup = NULL)
8513  {
8514  _M_pMessage = NULL;
8515  _M_value = _Value;
8516  _M_fRepeating = _Repeating;
8517  _M_state = Initialized;
8518  _M_fReferencedScheduler = false;
8519 
8520  //
8521  // If we are going to utilize the current scheduler for timer firing, we need to capture it now. Otherwise,
8522  // the timer threads fired from Windows (what _Fire executes within) will wind up with a default scheduler
8523  // attached -- probably not the semantic we want.
8524  //
8525  if (_PScheduleGroup == NULL && _PScheduler == NULL)
8526  {
      // NOTE(review): the declaration of _sched (listing line 8527) is elided.
8528  _PScheduler = _sched._GetScheduler();
8529  _sched._Reference();
8530  _M_fReferencedScheduler = true;
8531  }
8532 
8533  _M_pScheduler = _PScheduler;
8534  this->initialize_source(_PScheduler, _PScheduleGroup);
8535 
8536  if (_PTarget != NULL)
8537  {
      // NOTE(review): unqualified call to a member of a dependent base class; the other
      // blocks in this file write this->link_target(...). Standard two-phase lookup does
      // not find the unqualified name -- consider qualifying with this->.
8538  link_target(_PTarget);
8539  }
8540  }
8541 
8546 
      // Signature elided; presumably _Delete_stored_messages(). Deletes the pending
      // message if there is one; _M_pMessage is not reset afterwards.
8548  {
8549  // Input messages for this message block are in the base-class input buffer
8550  // All messages in that buffer are guaranteed to have moved to the output
8551  // buffer because the destructor first waits for all async sends to finish
8552  // before reaching this point
8553 
8554  // Delete the message remaining in the output queue
8555  if (_M_pMessage != NULL)
8556  {
8557  delete _M_pMessage;
8558  }
8559  }
8560 
8561 private:
8562  //
8563  // Hide assignment operator and copy constructor
8564  //
8565  timer const &operator =(timer const &); // no assignment operator
8566  timer(timer const &); // no copy constructor
8567 };
8568 
8569 //**************************************************************************
8570 // Single assignment:
8571 //**************************************************************************
8572 
8587 
8588 template<class _Type>
8589 class single_assignment : public propagator_block<multi_link_registry<ITarget<_Type>>, multi_link_registry<ISource<_Type>>>
8590 {
8591 private:
8594 
8595 public:
8599 
8612 
// NOTE(review): constructor body of single_assignment, presumably the default
// constructor; its signature, initializer list and the statement at listing line 8616
// are elided from this listing -- confirm against the real header.
8615  {
8617  }
8618 
8634 
// NOTE(review): constructor body of single_assignment whose signature and the statement
// at listing line 8638 are elided; presumably the overload taking only a filter.
// Registers _Filter as this block's filter.
8637  {
8639  this->register_filter(_Filter);
8640  }
8641 
8642 #ifdef _CRT_USE_WINAPI_FAMILY_DESKTOP_APP
8643 
// Scheduler overload: initializes the source and target sides with &_PScheduler.
// NOTE(review): the member-initializer line (listing line 8660) is elided.
8659  single_assignment(Scheduler& _PScheduler) :
8661  {
8662  this->initialize_source_and_target(&_PScheduler);
8663  }
8664 
8683 
// Scheduler + filter overload: initializes with &_PScheduler, then registers _Filter.
// NOTE(review): the member-initializer line (listing line 8685) is elided.
8684  single_assignment(Scheduler& _PScheduler, filter_method const& _Filter) :
8686  {
8687  this->initialize_source_and_target(&_PScheduler);
8688  this->register_filter(_Filter);
8689  }
8690 
8707 
// ScheduleGroup overload: passes NULL for the scheduler and &_PScheduleGroup.
// NOTE(review): the member-initializer line (listing line 8709) is elided.
8708  single_assignment(ScheduleGroup& _PScheduleGroup) :
8710  {
8711  this->initialize_source_and_target(NULL, &_PScheduleGroup);
8712  }
8713 
8733 
// ScheduleGroup + filter overload: passes NULL for the scheduler and &_PScheduleGroup,
// then registers _Filter.
// NOTE(review): the member-initializer line (listing line 8735) is elided.
8734  single_assignment(ScheduleGroup& _PScheduleGroup, filter_method const& _Filter) :
8736  {
8737  this->initialize_source_and_target(NULL, &_PScheduleGroup);
8738  this->register_filter(_Filter);
8739  }
8740 #endif /* _CRT_USE_WINAPI_FAMILY_DESKTOP_APP */
8741 
8745 
// NOTE(review): signature elided from the listing (8745 -> 8747); presumably the
// single_assignment destructor -- confirm.
// Removes the network links; the cleanup statement announced by the last comment
// (listing line 8752) is elided.
8747  {
8748  // Remove all links
8749  this->remove_network_links();
8750 
8751  // Clean up any messages left in this message block
8753  }
8754 
8761 
// Returns true when a message is currently stored in _M_pMessage.
8762  bool has_value() const
8763  {
8764  return (_M_pMessage != NULL);
8765  }
8766 
8767 
8777 
// Returns a reference to the payload of the stored message. When no message is stored
// yet, first calls receive<_Type> on this block and discards its result.
// NOTE(review): the statement at listing line 8784 is elided; presumably an assertion
// that _M_pMessage is non-NULL after the receive -- confirm.
8778  _Type const & value()
8779  {
8780  if (_M_pMessage == NULL)
8781  {
8782  ::Concurrency::receive<_Type>(this);
8783  }
8785 
8786  return _M_pMessage->payload;
8787  }
8788 
8789 
8790 protected:
8791 
8806 
// NOTE(review): signature elided from the listing (8806 -> 8808); presumably
// single_assignment::propagate_message(_PMessage, _PSource) -- confirm. The declaration
// of _Result (listing line 8813) is elided as well.
// Declines once _M_fIsInitialized is set. Otherwise, under _M_propagationLock, accepts
// the offered message from _PSource and sets _M_fIsInitialized; an accepted message is
// then handed to async_send after the lock has been released.
8808  {
8809  // It is important that calls to propagate do *not* take the same lock on the
8810  // internal structure that is used by Consume and the LWT. Doing so could
8811  // result in a deadlock with the Consume call.
8812 
8814 
8815  // single_assignment messaging block can be initialized only once
      // First check is made without holding the lock.
8816  if (_M_fIsInitialized)
8817  {
8818  return declined;
8819  }
8820 
      // The inner scope bounds the lifetime of _Lock.
8821  {
8822  _NR_lock _Lock(_M_propagationLock);
8823 
      // Check again now that the lock is held.
8824  if (_M_fIsInitialized)
8825  {
8826  _Result = declined;
8827  }
8828  else
8829  {
8830  _PMessage = _PSource->accept(_PMessage->msg_id(), this);
8831 
8832  // Set initialized flag only if we have a message
8833  if (_PMessage != NULL)
8834  {
8835  _M_fIsInitialized = true;
8836  }
8837  else
8838  {
8839  _Result = missed;
8840  }
8841  }
8842  }
8843 
8844  //
8845  // If message was accepted, set the member variables for
8846  // this block and start the asynchronous propagation task
8847  //
8848  if (_Result == accepted)
8849  {
8850  this->async_send(_PMessage);
8851  }
8852 
8853  return _Result;
8854  }
8855 
8870 
8872  {
8874 
8875  // single_assignment messaging block can be initialized only once
8876  if (_M_fIsInitialized)
8877  {
8878  return declined;
8879  }
8880 
8881  {
8882  _NR_lock _Lock(_M_propagationLock);
8883 
8884  if (_M_fIsInitialized)
8885  {
8886  _Result = declined;
8887  }
8888  else
8889  {
8890  _PMessage = _PSource->accept(_PMessage->msg_id(), this);
8891 
8892  // Set initialized flag only if we have a message
8893  if (_PMessage != NULL)
8894  {
8895  _M_fIsInitialized = true;
8896  }
8897  else
8898  {
8899  _Result = missed;
8900  }
8901  }
8902  }
8903 
8904  //
8905  // If message was accepted, set the member variables for
8906  // this block and start the asynchronous propagation task
8907  //
8908  if (_Result == accepted)
8909  {
8910  this->sync_send(_PMessage);
8911  }
8912 
8913  return _Result;
8914  }
8915 
8931 
8932  virtual message<_Type> * accept_message(runtime_object_identity _MsgId)
8933  {
8934  // This check is to prevent spoofing and verify that the propagated message is
8935  // the one that is accepted at the end.
8936  if (_M_pMessage == NULL || _MsgId != _M_pMessage->msg_id())
8937  {
8938  return NULL;
8939  }
8940 
8941  //
8942  // Instead of returning the internal message, we return a copy of the
8943  // message passed in.
8944  //
8945  // Because we are returning a copy, the accept routine for a single_assignment
8946  // does not need to grab the internal lock.
8947  //
8948  return (new message<_Type>(_M_pMessage->payload));
8949  }
8950 
8964 
8965  virtual bool reserve_message(runtime_object_identity _MsgId)
8966  {
8967  if (_M_pMessage == NULL)
8968  {
8969  return false;
8970  }
8971 
8972  if (_M_pMessage->msg_id() != _MsgId)
8973  {
8974  throw message_not_found();
8975  }
8976 
8977  return true;
8978  }
8979 
8993 
// Consumes the message identified by _MsgId by delegating to accept_message,
// so the caller gets whatever accept_message returns for that ID.
8994  virtual message<_Type> * consume_message(runtime_object_identity _MsgId)
8995  {
// NOTE(review): listing line 8996 is absent from this extract (possibly an
// assertion); confirm against the header.
8997 
8998  return accept_message(_MsgId);
8999  }
9000 
9007 
// Releases a reservation on the message identified by _MsgId.
// No state is changed in the visible code; the call only validates the ID and
// throws message_not_found when no message is stored or the ID does not match.
9008  virtual void release_message(runtime_object_identity _MsgId)
9009  {
// NOTE(review): listing line 9010 is absent from this extract (possibly an
// assertion); confirm against the header.
9011 
9012  if (_M_pMessage == NULL || _M_pMessage->msg_id() != _MsgId)
9013  {
9014  throw message_not_found();
9015  }
9016  }
9017 
9021 
9022  virtual void resume_propagation()
9023  {
9024  // Because reservation doesn't stop propagation, we don't
9025  // need to do anything on resume after consume/release.
9026  }
9027 
9034 
9036  {
9037  // If there is a message available already, propagate it.
9038 
9039  if (_M_pMessage != NULL)
9040  {
9041  _PTarget->propagate(_M_pMessage, this);
9042  }
9043  }
9051 
9053  {
9054  // Initialized flag should have been set by the propagate function using interlocked operation.
9056 
9057  // Move the message to the internal storage
9058 
9060  _M_pMessage = _PMessage;
9061 
9062  for (target_iterator _Iter = this->_M_connectedTargets.begin(); *_Iter != NULL; ++_Iter)
9063  {
9064  // Single assignment can propagate its message out
9065  // to any number of Targets
9066 
9067  ITarget<_Type> * _PTarget = *_Iter;
9068  _PTarget->propagate(_PMessage, this);
9069  }
9070  }
9071 
9072 private:
9073 
9078 
9080  {
9081  // Input messages for this message block are in the base-class input buffer
9082  // All messages in that buffer are guaranteed to have moved to the output
9083  // buffer because the destructor first waits for all async sends to finish
9084  // before reaching this point
9085 
9086  // The messages for a single_assignment are deleted at the end when
9087  // single_assignment is deleted.
9088  delete _M_pMessage;
9089  }
9090 
9091  //
9092  // Private Data Members
9093  //
9094 
9095  // The message being stored
9097 
9098  // The lock used to protect propagation
9100 
9101  // The marker for whether the single_assignment has already been initialized
9102  volatile bool _M_fIsInitialized;
9103 
9104 private:
9105  //
9106  // Hide assignment operator and copy constructor
9107  //
9108  single_assignment const & operator=(single_assignment const &); // no assignment operator
9109  single_assignment(single_assignment const &); // no copy constructor
9110 };
9111 
9112 //**************************************************************************
9113 // Join (single-type)
9114 //**************************************************************************
9115 
9119 
9125 
9126  greedy = 0,
9131 
9133 };
9134 
9152 
9153 template<class _Type, join_type _Jtype = non_greedy>
9154 class join : public propagator_block<single_link_registry<ITarget<std::vector<_Type>>>, multi_link_registry<ISource<_Type>>>
9155 {
9156 private:
9159 
9160 public:
9165 
9166  typedef typename std::vector<_Type> _OutputType;
9167 
9183 
9184  join(size_t _NumInputs)
9185  : _M_messageArray(_NumInputs),
9186  _M_savedMessageIdArray(_NumInputs)
9187  {
9188  _Initialize(_NumInputs);
9189  }
9190 
9209 
9210  join(size_t _NumInputs, filter_method const& _Filter)
9211  : _M_messageArray(_NumInputs),
9212  _M_savedMessageIdArray(_NumInputs)
9213  {
9214  _Initialize(_NumInputs);
9215  register_filter(_Filter);
9216  }
9217 
9218 #ifdef _CRT_USE_WINAPI_FAMILY_DESKTOP_APP
9219 
9238  join(Scheduler& _PScheduler, size_t _NumInputs)
9239  : _M_messageArray(_NumInputs),
9240  _M_savedMessageIdArray(_NumInputs)
9241  {
9242  _Initialize(_NumInputs, &_PScheduler);
9243  }
9244 
9266 
9267  join(Scheduler& _PScheduler, size_t _NumInputs, filter_method const& _Filter)
9268  : _M_messageArray(_NumInputs),
9269  _M_savedMessageIdArray(_NumInputs)
9270  {
9271  _Initialize(_NumInputs, &_PScheduler);
9272  register_filter(_Filter);
9273  }
9274 
9294 
9295  join(ScheduleGroup& _PScheduleGroup, size_t _NumInputs)
9296  : _M_messageArray(_NumInputs),
9297  _M_savedMessageIdArray(_NumInputs)
9298  {
9299  _Initialize(_NumInputs, NULL, &_PScheduleGroup);
9300  }
9301 
9324 
9325  join(ScheduleGroup& _PScheduleGroup, size_t _NumInputs, filter_method const& _Filter)
9326  : _M_messageArray(_NumInputs),
9327  _M_savedMessageIdArray(_NumInputs)
9328  {
9329  _Initialize(_NumInputs, NULL, &_PScheduleGroup);
9330  register_filter(_Filter);
9331  }
9332 #endif /* _CRT_USE_WINAPI_FAMILY_DESKTOP_APP */
9333 
9337 
9339  {
9340  // Remove all links that are targets of this join
9341  this->remove_network_links();
9342 
9343  // Clean up any messages left in this message block
9345 
9346  delete [] _M_savedIdBuffer;
9347  }
9348 
9349 protected:
9350  //
9351  // propagator_block protected function implementations
9352  //
9353 
9368 
9370  {
9371  // It is important that calls to propagate do *not* take the same lock on the
9372  // internal structure that is used by Consume and the LWT. Doing so could
9373  // result in a deadlock with the Consume call.
9374 
9375  message_status _Ret_val = accepted;
9376 
9377  //
9378  // Find the slot index of this source
9379  //
9380  size_t _Slot = 0;
9381  bool _Found = false;
9382  for (source_iterator _Iter = this->_M_connectedSources.begin(); *_Iter != NULL; ++_Iter)
9383  {
9384  if (*_Iter == _PSource)
9385  {
9386  _Found = true;
9387  break;
9388  }
9389 
9390  _Slot++;
9391  }
9392 
9393  if (!_Found)
9394  {
9395  // If this source was not found in the array, this is not a connected source
9396  // decline the message
9397  return declined;
9398  }
9399 
9401 
9402  bool fIsGreedy = (_Jtype == greedy);
9403 
9404  if (fIsGreedy)
9405  {
9406  //
9407  // Greedy type joins immediately accept the message.
9408  //
9409  {
9410  _NR_lock lockHolder(_M_propagationLock);
9411  if (_M_messageArray._M_messages[_Slot] != NULL)
9412  {
9413  _M_savedMessageIdArray._M_savedIds[_Slot] = _PMessage->msg_id();
9414  _Ret_val = postponed;
9415  }
9416  }
9417 
9418  if (_Ret_val != postponed)
9419  {
9420  _M_messageArray._M_messages[_Slot] = _PSource->accept(_PMessage->msg_id(), this);
9421 
9422  if (_M_messageArray._M_messages[_Slot] != NULL)
9423  {
9425  {
9426  // If messages have arrived on all links, start a propagation
9427  // of the current message
9428  this->async_send(NULL);
9429  }
9430  }
9431  else
9432  {
9433  _Ret_val = missed;
9434  }
9435  }
9436  }
9437  else
9438  {
9439  //
9440  // Non-greedy type joins save the message IDs until they have all arrived
9441  //
9442 
9443  if (_InterlockedExchange((volatile long *) &_M_savedMessageIdArray._M_savedIds[_Slot], _PMessage->msg_id()) == -1)
9444  {
9445  // Decrement the message remaining count if this thread is switching
9446  // the saved ID from -1 to a valid value.
9448  {
9449  this->async_send(NULL);
9450  }
9451  }
9452 
9453  // Always return postponed. This message will be consumed
9454  // in the LWT
9455  _Ret_val = postponed;
9456  }
9457 
9458  return _Ret_val;
9459  }
9460 
9471 
9472  virtual message<_OutputType> * accept_message(runtime_object_identity _MsgId)
9473  {
9474  //
9475  // Peek at the head message in the message buffer. If the IDs match
9476  // dequeue and transfer ownership
9477  //
9478  message<_OutputType> * _Msg = NULL;
9479 
9480  if (_M_messageBuffer._Is_head(_MsgId))
9481  {
9482  _Msg = _M_messageBuffer._Dequeue();
9483  }
9484 
9485  return _Msg;
9486  }
9487 
9501 
9502  virtual bool reserve_message(runtime_object_identity _MsgId)
9503  {
9504  // Allow reservation if this is the head message
9505  return _M_messageBuffer._Is_head(_MsgId);
9506  }
9507 
9521 
9522  virtual message<_OutputType> * consume_message(runtime_object_identity _MsgId)
9523  {
9524  // By default, accept the message
9525  return accept_message(_MsgId);
9526  }
9527 
9534 
9535  virtual void release_message(runtime_object_identity _MsgId)
9536  {
9537  // The head message is the one reserved.
9538  if (!_M_messageBuffer._Is_head(_MsgId))
9539  {
9540  throw message_not_found();
9541  }
9542  }
9543 
9547 
9548  virtual void resume_propagation()
9549  {
9550  // If there are any messages in the buffer, propagate them out
9551  if (_M_messageBuffer._Count() > 0)
9552  {
9553  this->async_send(NULL);
9554  }
9555  }
9556 
9563 
// Notification hook invoked with the newly linked target (parameter unnamed and
// unused in the visible code).
// Returns early, doing nothing, while a reservation is outstanding
// (_M_pReservedFor is non-NULL).
9564  virtual void link_target_notification(_Inout_ ITarget<std::vector<_Type>> *)
9565  {
9566  // If the message queue is blocked due to reservation
9567  // there is no need to do any message propagation
9568  if (this->_M_pReservedFor != NULL)
9569  {
9570  return;
9571  }
9572 
// NOTE(review): listing line 9573 is absent from this extract - presumably the
// call that propagates buffered messages to targets; confirm against the header.
9574  }
9575 
9581 
9583  {
9584  message<_OutputType> * _Msg = NULL;
9585  // Create a new message from the input sources
9586  // If messagesRemaining == 0, we have a new message to create. Otherwise, this is coming from
9587  // a consume or release from the target. In that case we don't want to create a new message.
9588  if (_M_messagesRemaining == 0)
9589  {
9590  // A greedy join can immediately create the message, a non-greedy
9591  // join must try and consume all the messages it has postponed
9592  _Msg = _Create_new_message();
9593  }
9594 
9595  if (_Msg == NULL)
9596  {
9597  // Create message failed. This happens in non_greedy joins when the
9598  // reserve/consumption of a postponed message failed.
9600  return;
9601  }
9602 
9603  bool fIsGreedy = (_Jtype == greedy);
9604 
9605  // For a greedy join, reset the number of messages remaining
9606  // Check to see if multiple messages have been passed in on any of the links,
9607  // and postponed. If so, try and reserve/consume them now
9608  if (fIsGreedy)
9609  {
9610  // Look at the saved IDs and reserve/consume any that have passed in while
9611  // this join was waiting to complete
9613 
9614  for (size_t i = 0; i < _M_messageArray._M_count; i++)
9615  {
9616  for(;;)
9617  {
9618  runtime_object_identity _Saved_id;
9619  // Grab the current saved ID value. This value could be changing based on any
9620  // calls of source->propagate(this). If the message ID is different than what is snapped
9621  // here, that means, the reserve below must fail. This is because reserve is trying
9622  // to get the same source lock the propagate(this) call must be holding.
9623  {
9624  _NR_lock lockHolder(_M_propagationLock);
9625 
9627 
9628  _Saved_id = _M_savedMessageIdArray._M_savedIds[i];
9629 
9630  if (_Saved_id == -1)
9631  {
9633  break;
9634  }
9635  else
9636  {
9638  }
9639  }
9640 
9641  if (_Saved_id != -1)
9642  {
9643  source_iterator _Iter = this->_M_connectedSources.begin();
9644 
9645  ISource<_Type> * _PSource = _Iter[i];
9646  if ((_PSource != NULL) && _PSource->reserve(_Saved_id, this))
9647  {
9648  _M_messageArray._M_messages[i] = _PSource->consume(_Saved_id, this);
9650  break;
9651  }
9652  }
9653  }
9654  }
9655 
9656  // If messages have all been received, async_send again, this will start the
9657  // LWT up to create a new message
9658  if (_M_messagesRemaining == 0)
9659  {
9660  this->async_send(NULL);
9661  }
9662  }
9663 
9664  // Add the new message to the outbound queue
9665  _M_messageBuffer._Enqueue(_Msg);
9666 
9667  if (!_M_messageBuffer._Is_head(_Msg->msg_id()))
9668  {
9669  // another message is at the head of the outbound message queue and blocked
9670  // simply return
9671  return;
9672  }
9673 
9675  }
9676 
9677 private:
9678 
9679  //
9680  // Private Methods
9681  //
9682 
9689 
9691  {
9692  message<_Target_type> * _Msg = _MessageBuffer._Peek();
9693 
9694  // If someone has reserved the _Head message, don't propagate anymore
9695  if (this->_M_pReservedFor != NULL)
9696  {
9697  return;
9698  }
9699 
9700  while (_Msg != NULL)
9701  {
9702  message_status _Status = declined;
9703 
9704  // Always start from the first target that linked
9705  for (target_iterator _Iter = this->_M_connectedTargets.begin(); *_Iter != NULL; ++_Iter)
9706  {
9707  ITarget<_Target_type> * _PTarget = *_Iter;
9708  _Status = _PTarget->propagate(_Msg, this);
9709 
9710  // Ownership of message changed. Do not propagate this
9711  // message to any other target.
9712  if (_Status == accepted)
9713  {
9714  break;
9715  }
9716 
9717  // If the target that was just propagated to reserved this message, stop
9718  // propagating it to others
9719  if (this->_M_pReservedFor != NULL)
9720  {
9721  break;
9722  }
9723  }
9724 
9725  // If status is anything other than accepted, then the head message
9726  // was not propagated out. Thus, nothing after it in the queue can
9727  // be propagated out. Cease propagation.
9728  if (_Status != accepted)
9729  {
9730  break;
9731  }
9732 
9733  // Get the next message
9734  _Msg = _MessageBuffer._Peek();
9735  }
9736  }
9737 
9744 
9746  {
9747  bool fIsNonGreedy = (_Jtype == non_greedy);
9748 
9749  // If this is a non-greedy join, check each source and try to consume their message
9750  if (fIsNonGreedy)
9751  {
9752 
9753  // The iterator _Iter below will ensure that it is safe to touch
9754  // non-NULL source pointers. Take a snapshot.
9755  std::vector<ISource<_Type> *> _Sources;
9756  source_iterator _Iter = this->_M_connectedSources.begin();
9757 
9758  while (*_Iter != NULL)
9759  {
9760  ISource<_Type> * _PSource = *_Iter;
9761 
9762  if (_PSource == NULL)
9763  {
9764  break;
9765  }
9766 
9767  _Sources.push_back(_PSource);
9768  ++_Iter;
9769  }
9770 
9771  if (_Sources.size() != _M_messageArray._M_count)
9772  {
9773  // Some of the sources were unlinked. The join is broken
9774  return NULL;
9775  }
9776 
9777  // First, try and reserve all the messages. If a reservation fails,
9778  // then release any reservations that had been made.
9779  for (size_t i = 0; i < _M_savedMessageIdArray._M_count; i++)
9780  {
9781  // Snap the current saved ID into a buffer. This value can be changing behind the scenes from
9782  // other source->propagate(msg, this) calls, but if so, that just means the reserve below will
9783  // fail.
9785  _M_savedIdBuffer[i] = _InterlockedExchange((volatile long *) &_M_savedMessageIdArray._M_savedIds[i], -1);
9786 
9788 
9789  if (!_Sources[i]->reserve(_M_savedIdBuffer[i], this))
9790  {
9791  // A reservation failed, release all reservations made up until
9792  // this block, and wait for another message to arrive on this link
9793  for (size_t j = 0; j < i; j++)
9794  {
9795  _Sources[j]->release(_M_savedIdBuffer[j], this);
9796  if (_InterlockedCompareExchange((volatile long *) &_M_savedMessageIdArray._M_savedIds[j], _M_savedIdBuffer[j], -1) == -1)
9797  {
9799  {
9800  this->async_send(NULL);
9801  }
9802  }
9803  }
9804 
9805  // Return NULL to indicate that the create failed
9806  return NULL;
9807  }
9808  }
9809 
9810  // Because everything has been reserved, consume all the messages.
9811  // This is guaranteed to return true.
9812  for (size_t i = 0; i < _M_messageArray._M_count; i++)
9813  {
9814  _M_messageArray._M_messages[i] = _Sources[i]->consume(_M_savedIdBuffer[i], this);
9815  _M_savedIdBuffer[i] = -1;
9816  }
9817  }
9818 
9819  if (!fIsNonGreedy)
9820  {
9821  // Reinitialize how many messages are being waited for.
9822  // This is safe because all messages have been received, thus no new async_sends for
9823  // greedy joins can be called.
9825  }
9826 
9827  std::vector<_Type> _OutputVector;
9828  for (size_t i = 0; i < _M_messageArray._M_count; i++)
9829  {
9831  _OutputVector.push_back(_M_messageArray._M_messages[i]->payload);
9832 
9833  delete _M_messageArray._M_messages[i];
9834  if (fIsNonGreedy)
9835  {
9837  }
9838  }
9839  return (new message<std::vector<_Type>>(_OutputVector));
9840  }
9841 
9857 
// Shared constructor helper for join.
//   _NumInputs      - number of input sources; used as the bound on connected
//                     sources and as the initial count of messages remaining.
//   _PScheduler     - forwarded to initialize_source_and_target (default NULL).
//   _PScheduleGroup - forwarded to initialize_source_and_target (default NULL).
9858  void _Initialize(size_t _NumInputs, Scheduler * _PScheduler = NULL, ScheduleGroup * _PScheduleGroup = NULL)
9859  {
9860  this->initialize_source_and_target(_PScheduler, _PScheduleGroup);
9861 
9862  this->_M_connectedSources.set_bound(_NumInputs);
9863  _M_messagesRemaining = _NumInputs;
9864 
9865  bool fIsNonGreedy = (_Jtype == non_greedy);
9866 
9867  if (fIsNonGreedy)
9868  {
9869  // Non greedy joins need a buffer to snap off saved message IDs to.
9870  _M_savedIdBuffer = new runtime_object_identity[_NumInputs];
// Filling every byte with 0xFF makes each element read back as -1, the value
// the rest of this class compares against for "no saved ID".
9871  memset(_M_savedIdBuffer, -1, sizeof(runtime_object_identity) * _NumInputs);
9872  }
9873  else
9874  {
// NOTE(review): listing line 9875 is absent from this extract. The destructor
// runs delete [] on _M_savedIdBuffer unconditionally, so this branch presumably
// sets the pointer to NULL; confirm against the header.
9876  }
9877  }
9878 
9883 
9885  {
9886  // Input messages for this message block are in the base-class input buffer
9887  // All messages in that buffer are guaranteed to have moved to the output
9888  // buffer because the destructor first waits for all async sends to finish
9889  // before reaching this point
9890 
9891  // Delete any messages remaining in the output queue
9892  for (;;)
9893  {
9894  message<std::vector<_Type>> * _Msg = _M_messageBuffer._Dequeue();
9895  if (_Msg == NULL)
9896  {
9897  break;
9898  }
9899  delete _Msg;
9900  }
9901  }
9902 
9903  // The current number of messages remaining
9904  volatile size_t _M_messagesRemaining;
9905 
9906  // An array containing the accepted messages of this join.
9907  // Wrapped in a struct to enable debugger visualization.
9909  {
9910  size_t _M_count;
9912 
9913  _MessageArray(size_t _NumInputs)
9914  : _M_count(_NumInputs),
9915  _M_messages(new message<_Type>*[_NumInputs])
9916  {
9917  memset(_M_messages, 0, sizeof(message<_Type> *) * _NumInputs);
9918  }
9919 
9921  {
9922  for (size_t i = 0; i < _M_count; i++)
9923  delete _M_messages[i];
9924  delete [] _M_messages;
9925  }
9926  };
9928 
9929  // An array containing the msg IDs of messages propagated to the array
9930  // For greedy joins, this contains a log of other messages passed to this
9931  // join after the first has been accepted
9932  // For non-greedy joins, this contains the message ID of any message
9933  // passed to it.
9934  // Wrapped in a struct to enable debugger visualization.
9936  {
9937  size_t _M_count;
9938  runtime_object_identity * _M_savedIds;
9939 
9940  _SavedMessageIdArray(size_t _NumInputs)
9941  : _M_count(_NumInputs),
9942  _M_savedIds(new runtime_object_identity[_NumInputs])
9943  {
9944  memset(_M_savedIds, -1, sizeof(runtime_object_identity) * _NumInputs);
9945  }
9946 
9948  {
9949  delete [] _M_savedIds;
9950  }
9951  };
9953 
9954  // Buffer for snapping saved IDs in non-greedy joins
9955  runtime_object_identity * _M_savedIdBuffer;
9956 
9957  // A lock for modifying the buffer or the connected blocks
9959 
9960  // Queue to hold output messages
9962 };
9963 
9964 
9965 //**************************************************************************
9966 // Multi-Type Choice and Join helper node:
9967 //**************************************************************************
9968 
9978 
9979 template<class _Type>
9980 class _Order_node_base: public propagator_block<single_link_registry<ITarget<size_t>>, multi_link_registry<ISource<_Type>>>
9981 {
9982 public:
9987 
9989  _M_index(0),
9992  {
9993  }
9994 
9998 
10000  {
10001  // The messages for an _Order_node_base are deleted at the end when
10002  // _Order_node_base is deleted.
10003  delete _M_pReceiveMessage;
10004  delete _M_pSendMessage;
10005  }
10006 
10013 
10014  bool has_value() const
10015  {
10016  return (_M_pReceiveMessage != NULL);
10017  }
10018 
10025 
// Returns a const reference to the payload of the received message.
// The visible code dereferences _M_pReceiveMessage without a NULL check, so the
// caller must only use this once has_value() is true.
10026  _Type const & value()
10027  {
// NOTE(review): listing line 10028 is absent from this extract (possibly an
// assertion that a message has been received); confirm against the header.
10029 
10030  return _M_pReceiveMessage->payload;
10031  }
10032 
10041 
10042  virtual void _Reset() = 0;
10043 
10056 
10057  virtual bool reserve_message(runtime_object_identity)
10058  {
10059  // reserve should never be called for this block.
10060  _CONCRT_ASSERT(false);
10061 
10062  return false;
10063  }
10064 
10078 
10079  virtual message<size_t> * consume_message(runtime_object_identity)
10080  {
10081  // consume should never be called for this block.
10082  _CONCRT_ASSERT(false);
10083 
10084  return NULL;
10085  }
10086 
10093 
10094  virtual void release_message(runtime_object_identity)
10095  {
10096  // release should never be called for this block.
10097  _CONCRT_ASSERT(false);
10098  }
10099 
10100 protected:
10101 
10102 
10106 
10107  virtual void resume_propagation()
10108  {
10109  // Because there is only a single target, nothing needs
10110  // to be done on resume
10111  }
10112 
10119 
10121  {
10122  if (_M_pSendMessage != NULL)
10123  {
10125  }
10126  }
10127 
10131 
10133  {
10135  }
10136 
10140 
10141  void _Initialize_order_node(ISource<_Type> * _PSource, size_t _Index, ITarget<size_t> * _PTarget, Scheduler * _PScheduler = NULL, ScheduleGroup * _PScheduleGroup = NULL)
10142  {
10143  if (_PSource == NULL)
10144  {
10145  throw std::invalid_argument("_PSource");
10146  }
10147 
10148  _M_index = _Index;
10149 
10150  this->initialize_source_and_target(_PScheduler, _PScheduleGroup);
10151 
10152  // Allow only a single source and ensure that they
10153  // cannot be unlinked and relinked.
10154  this->_M_connectedSources.set_bound(1);
10155 
10156  if (_PTarget != NULL)
10157  {
10158  this->link_target(_PTarget);
10159  }
10160 
10161  _PSource->link_target(this);
10162  }
10163 
10164  //
10165  // Private Data Members
10166  //
10167 
10168  // The message to be received from the source
10170 
10171  // The message to be sent to all targets
10173 
10174  // The index of the _Order_node_base in the user's construct
10175  size_t _M_index;
10176 
10177 private:
10178  //
10179  // Hide assignment operator and copy constructor
10180  //
10181  _Order_node_base const & operator=(_Order_node_base const &); // no assignment operator
10182  _Order_node_base(_Order_node_base const &); // no copy constructor
10183 };
10184 
10185 
10194 
10195 template<class _Type>
10196 class _Reserving_node: public _Order_node_base<_Type>
10197 {
10198 private:
10201 
10202 public:
10207 
10221 
10224  _M_savedId(-1),
10226  {
10227  _Initialize_order_node(_PSource, _Index, _PTarget);
10228  }
10229 
10246 
10249  _M_savedId(-1),
10251  {
10252  register_filter(_Filter);
10253  _Initialize_order_node(_PSource, _Index, _PTarget);
10254  }
10255 
10272 
// Constructs a _Reserving_node on the given Scheduler, linked to _PSource and,
// when non-NULL, to _PTarget. _M_savedId starts at -1.
// NOTE(review): listing lines 10274 and 10276 of the initializer list are absent
// from this extract; confirm the remaining initializers against the header.
10273  _Reserving_node(Scheduler& _PScheduler, ISource<_Type> * _PSource, size_t _Index, ITarget<size_t> * _PTarget = NULL) :
10275  _M_savedId(-1),
10277  {
// NOTE(review): unqualified call to a member of the dependent base
// _Order_node_base<_Type>; consider this-> for two-phase lookup.
10278  _Initialize_order_node(_PSource, _Index, _PTarget, &_PScheduler);
10279  }
10280 
10300 
// Constructs a filtered _Reserving_node on the given Scheduler, linked to
// _PSource and, when non-NULL, to _PTarget. _M_savedId starts at -1.
// NOTE(review): listing lines 10302 and 10304 of the initializer list are absent
// from this extract; confirm the remaining initializers against the header.
10301  _Reserving_node(Scheduler& _PScheduler, ISource<_Type> * _PSource, size_t _Index, ITarget<size_t> * _PTarget, filter_method const& _Filter) :
10303  _M_savedId(-1),
10305  {
// The filter is registered before _Initialize_order_node links the source.
// NOTE(review): both calls below are unqualified; consider this-> for
// two-phase lookup.
10306  register_filter(_Filter);
10307  _Initialize_order_node(_PSource, _Index, _PTarget, &_PScheduler);
10308  }
10309 
10326 
// Constructs a _Reserving_node in the given ScheduleGroup (scheduler argument
// is NULL), linked to _PSource and, when non-NULL, to _PTarget.
// _M_savedId starts at -1.
// NOTE(review): listing lines 10328 and 10330 of the initializer list are absent
// from this extract; confirm the remaining initializers against the header.
10327  _Reserving_node(ScheduleGroup& _PScheduleGroup, ISource<_Type> * _PSource, size_t _Index, ITarget<size_t> * _PTarget = NULL) :
10329  _M_savedId(-1),
10331  {
// NOTE(review): unqualified call to a member of the dependent base
// _Order_node_base<_Type>; consider this-> for two-phase lookup.
10332  _Initialize_order_node(_PSource, _Index, _PTarget, NULL, &_PScheduleGroup);
10333  }
10334 
10354 
// Constructs a filtered _Reserving_node in the given ScheduleGroup (scheduler
// argument is NULL), linked to _PSource and, when non-NULL, to _PTarget.
// _M_savedId starts at -1.
// NOTE(review): listing lines 10356 and 10358 of the initializer list are absent
// from this extract; confirm the remaining initializers against the header.
10355  _Reserving_node(ScheduleGroup& _PScheduleGroup, ISource<_Type> * _PSource, size_t _Index, ITarget<size_t> * _PTarget, filter_method const& _Filter) :
10357  _M_savedId(-1),
10359  {
// The filter is registered before _Initialize_order_node links the source.
// NOTE(review): both calls below are unqualified; consider this-> for
// two-phase lookup.
10360  register_filter(_Filter);
10361  _Initialize_order_node(_PSource, _Index, _PTarget, NULL, &_PScheduleGroup);
10362  }
10363 
10367 
10369  {
10370  if (_M_pReservedSource != NULL)
10371  {
10373  this->_M_connectedSources.release();
10374  }
10375 
10376  // Remove all links
10377  this->remove_network_links();
10378  }
10379 
10380 
10387 
10388  virtual void _Reset()
10389  {
10390  }
10391 
10392 protected:
10393 
10394  //
10395  // propagator_block protected function implementation
10396  //
10397 
10417 
10419  {
10421 
10422  // _Order_node messaging block can be initialized only once, just like single_assignment.
10423  if (_M_fIsInitialized)
10424  {
10425  return declined;
10426  }
10427 
10428  // Reserve a message on the source until this _Order_node gets the feedback from
10429  // the single_assignment on whether it has been selected.
10430  _M_fIsInitialized = _PSource->reserve(_PMessage->msg_id(), this);
10431 
10432  //
10433  // If message was successfully reserved, set the member variables for
10434  // this messaging block and start the asynchronous propagation task.
10435  //
10436  if (_M_fIsInitialized)
10437  {
10438  _M_savedId = _PMessage->msg_id();
10439  this->async_send(NULL);
10440  }
10441  else
10442  {
10443  _Result = missed;
10444  }
10445 
10446  return _Result;
10447  }
10448 
10458 
// Accepts the send message identified by _MsgId.
// Returns NULL when no send message exists, when _MsgId does not match it, or
// when the single source is no longer connected. Otherwise the reserved source
// message is consumed into _M_pReceiveMessage and a newly allocated
// message<size_t> carrying a copy of the send message's payload is returned.
10459  virtual message<size_t> * accept_message(runtime_object_identity _MsgId)
10460  {
10461  // This check is to prevent spoofing and verify that the propagated message is
10462  // the one that is accepted at the end.
10463  if (this->_M_pSendMessage == NULL || _MsgId != this->_M_pSendMessage->msg_id())
10464  {
10465  return NULL;
10466  }
10467 
10468  // If the source has disconnected then we can't allow for accept to succeed.
10469  source_iterator _Iter = this->_M_connectedSources.begin();
10470  ISource<_Type>* _PSource = *_Iter;
10471 
10472  if (_PSource == NULL)
10473  {
10474  // source was disconnected. Fail accept.
10475  return NULL;
10476  }
10477 
// Consume the message previously reserved under _M_savedId on the source.
10478  this->_M_pReceiveMessage = _PSource->consume(_M_savedId, this);
10479 
// NOTE(review): listing line 10480 is absent from this extract (possibly an
// assertion on the consumed message); confirm against the header.
10481 
10482  //
10483  // Instead of returning the internal message, we return a copy of the
10484  // message passed in.
10485  //
10486  // Because we are returning a copy, the accept routine for a _Order_node
10487  // does not need to grab the internal lock.
10488  //
10489  return (new message<size_t>(this->_M_pSendMessage->payload));
10490  }
10491 
10501 
10503  {
10504  if (this->_M_pSendMessage == NULL)
10505  {
10506  this->_Create_send_message();
10507  }
10508 
10509  for (target_iterator _Iter = this->_M_connectedTargets.begin(); *_Iter != NULL; ++_Iter)
10510  {
10511  ITarget<size_t> * _PTarget = *_Iter;
10512  _Propagate_to_target(_PTarget);
10513  }
10514  }
10515 
10516 private:
10517 
10521 
10523  {
10524  message_status _Status = _PTarget->propagate(this->_M_pSendMessage, this);
10525 
10526  // If the message got rejected we have to release the hold on the source message.
10527  if (_Status != accepted)
10528  {
10529  if (_M_savedId != -1)
10530  {
10531  // Release the reservation
10532  source_iterator _Iter = this->_M_connectedSources.begin();
10533  ISource<_Type> * _PSource = *_Iter;
10534 
10535  if (_PSource != NULL)
10536  {
10537  _PSource->release(_M_savedId, this);
10538  }
10539 
10540  // If the source was disconnected, then it would
10541  // automatically release any reservation. So we
10542  // should reset our savedId regardless.
10543  _M_savedId = -1;
10544  }
10545 
10546  }
10547 
10548  return _Status;
10549  }
10550 
10551  //
10552  // Private Data Members
10553  //
10554 
10555  // The source where we have reserved a message
10557 
10558  // For greedy order-nodes, the message ID of subsequent messages sent to this node
10559  // For non-greedy order nodes, the message ID of the message to reserve/consume
10560  runtime_object_identity _M_savedId;
10561 
10562  // The marker that indicates that _Reserving_node has reserved a message
10563  volatile bool _M_fIsInitialized;
10564 
10565 private:
10566  //
10567  // Hide assignment operator and copy constructor
10568  //
10569  _Reserving_node const & operator=(_Reserving_node const &); // no assignment operator
10570  _Reserving_node(_Reserving_node const &); // no copy constructor
10571 };
10572 
10573 
10582 
10583 template<class _Type>
10584 class _Greedy_node: public _Order_node_base<_Type>
10585 {
10586 private:
10589 
10590 public:
10595 
10609 
10611  _M_savedId(-1),
10613  {
10614  _Initialize_order_node(_PSource, _Index, _PTarget);
10615  }
10616 
10633 
// Constructs a filtered _Greedy_node linked to _PSource and, when non-NULL, to
// _PTarget, with no explicit scheduler or schedule group. _M_savedId starts at -1.
// NOTE(review): listing line 10636 of the initializer list is absent from this
// extract; confirm the remaining initializers against the header.
10634  _Greedy_node(ISource<_Type> * _PSource, size_t _Index, ITarget<size_t> * _PTarget, filter_method const& _Filter) :
10635  _M_savedId(-1),
10637  {
// The filter is registered before _Initialize_order_node links the source.
// NOTE(review): both calls below are unqualified; consider this-> for
// two-phase lookup.
10638  register_filter(_Filter);
10639  _Initialize_order_node(_PSource, _Index, _PTarget);
10640  }
10641 
10658 
// Constructs a _Greedy_node on the given Scheduler, linked to _PSource and,
// when non-NULL, to _PTarget. _M_savedId starts at -1.
// NOTE(review): listing line 10661 of the initializer list is absent from this
// extract; confirm the remaining initializers against the header.
10659  _Greedy_node(Scheduler& _PScheduler, ISource<_Type> * _PSource, size_t _Index, ITarget<size_t> * _PTarget = NULL) :
10660  _M_savedId(-1),
10662  {
// NOTE(review): unqualified call to a member of the dependent base
// _Order_node_base<_Type>; consider this-> for two-phase lookup.
10663  _Initialize_order_node(_PSource, _Index, _PTarget, &_PScheduler);
10664  }
10665 
10685 
// Constructs a filtered _Greedy_node on the given Scheduler, linked to _PSource
// and, when non-NULL, to _PTarget. _M_savedId starts at -1.
// NOTE(review): listing line 10688 of the initializer list is absent from this
// extract; confirm the remaining initializers against the header.
10686  _Greedy_node(Scheduler& _PScheduler, ISource<_Type> * _PSource, size_t _Index, ITarget<size_t> * _PTarget, filter_method const& _Filter) :
10687  _M_savedId(-1),
10689  {
// The filter is registered before _Initialize_order_node links the source.
// NOTE(review): both calls below are unqualified; consider this-> for
// two-phase lookup.
10690  register_filter(_Filter);
10691  _Initialize_order_node(_PSource, _Index, _PTarget, &_PScheduler);
10692  }
10693 
10710 
// Constructs a _Greedy_node in the given ScheduleGroup (scheduler argument is
// NULL), linked to _PSource and, when non-NULL, to _PTarget.
// _M_savedId starts at -1.
// NOTE(review): listing line 10713 of the initializer list is absent from this
// extract; confirm the remaining initializers against the header.
10711  _Greedy_node(ScheduleGroup& _PScheduleGroup, ISource<_Type> * _PSource, size_t _Index, ITarget<size_t> * _PTarget = NULL) :
10712  _M_savedId(-1),
10714  {
// NOTE(review): unqualified call to a member of the dependent base
// _Order_node_base<_Type>; consider this-> for two-phase lookup.
10715  _Initialize_order_node(_PSource, _Index, _PTarget, NULL, &_PScheduleGroup);
10716  }
10717 
10737 
// Constructs a filtered _Greedy_node in the given ScheduleGroup (scheduler
// argument is NULL), linked to _PSource and, when non-NULL, to _PTarget.
// _M_savedId starts at -1.
// NOTE(review): listing line 10740 of the initializer list is absent from this
// extract; confirm the remaining initializers against the header.
10738  _Greedy_node(ScheduleGroup& _PScheduleGroup, ISource<_Type> * _PSource, size_t _Index, ITarget<size_t> * _PTarget, filter_method const& _Filter) :
10739  _M_savedId(-1),
10741  {
// The filter is registered before _Initialize_order_node links the source.
// NOTE(review): both calls below are unqualified; consider this-> for
// two-phase lookup.
10742  register_filter(_Filter);
10743  _Initialize_order_node(_PSource, _Index, _PTarget, NULL, &_PScheduleGroup);
10744  }
10745 
10749 
10751  {
10752  // Remove all links
10753  this->remove_network_links();
10754 
10756  {
10757  delete _M_pGreedyMessage;
10758  }
10759  }
10760 
10768 
// Resets this node so it can take part in another round.
// Deletes and clears the receive and send messages, then checks whether a
// message ID was saved while the node was occupied. If so, it tries to
// reserve and consume that message from the single connected source and, on
// success, schedules an asynchronous send.
10769  void _Reset()
10770  {
// _R_lock is the scoped lock of the reentrant PPL lock, so taking
// _M_resetLock here is safe even when the caller already holds it.
10771  _R_lock _Lock(_M_resetLock);
10772 
10773  delete this->_M_pReceiveMessage;
10774  this->_M_pReceiveMessage = NULL;
10775 
10776  delete this->_M_pSendMessage;
10777  this->_M_pSendMessage = NULL;
10778 
10779  //
10780  // For greedy type joins, look to see if any other messages have been
10781  // passed to this _Greedy_node while the join was waiting for other
10782  // messages to arrive. This function is already called with _M_resetLock
10783  // held through propagate_to_any_targets().
10784  //
// The loop ends when no ID is saved, or when a saved ID was successfully
// reserved and consumed. A failed reserve loops again to pick up any ID
// saved in the meantime.
10785  for(;;)
10786  {
10787  // Set the current saved ID as -1. Check to see if something was ready for consumption
10788  // (if _Saved_id != -1) and consume it if possible.
10789  runtime_object_identity _Saved_id;
10790 
10791  {
// The saved ID is read and cleared under the propagation lock only; the
// reserve/consume calls below run after this scope has released it.
10792  _NR_lock lockHolder(_M_propagationLock);
10793 
10794  _Saved_id = _M_savedId;
10795 
10796  if (_Saved_id == -1)
10797  {
// NOTE(review): listing line 10798 is absent from this extract - presumably it
// clears _M_pGreedyMessage before leaving the loop; confirm against the header.
10799  break;
10800  }
10801  else
10802  {
10803  _M_savedId = -1;
10804  }
10805  }
10806 
10807  if (_Saved_id != -1)
10808  {
10809  source_iterator _Iter = this->_M_connectedSources.begin();
10810 
10811  ISource<_Type> * _PSource = *_Iter;
10812  if ((_PSource != NULL) && _PSource->reserve(_Saved_id, this))
10813  {
10814  _M_pGreedyMessage = _PSource->consume(_Saved_id, this);
10815  this->async_send(NULL);
10816  break;
10817  }
10818  }
10819  }
10820  }
10821 
10822 protected:
10823 
10824  //
10825  // propagator_block protected function implementation
10826  //
10827 
10847 
10849  {
10851 
10852  bool _FDone = false;
10853 
10854  {
10855  _NR_lock lockHolder(_M_propagationLock);
10856  if (_M_pGreedyMessage != NULL)
10857  {
10858  _M_savedId = _PMessage->msg_id();
10859  _Result = postponed;
10860  _FDone = true;
10861  }
10862  }
10863 
10864  if (!_FDone)
10865  {
10866  _M_pGreedyMessage = _PSource->accept(_PMessage->msg_id(), this);
10867 
10868  if (_M_pGreedyMessage != NULL)
10869  {
10870  _Result = accepted;
10871  this->async_send(NULL);
10872  }
10873  else
10874  {
10875  _Result = missed;
10876  }
10877  }
10878 
10879  return _Result;
10880  }
10881 
10891 
// Hands out a copy of the pending send message when the requested ID matches it.
//   _MsgId  - ID of the message the caller wants to accept.
//   returns - NULL when there is no send message or its ID differs from _MsgId;
//             otherwise a newly allocated message<size_t> carrying the same payload.
//             (Presumably the caller owns the returned message -- confirm.)
10892  virtual message<size_t> * accept_message(runtime_object_identity _MsgId)
10893  {
10894  // This check is to prevent spoofing and verify that the propagated message is
10895  // the one that is accepted at the end.
10896  if (this->_M_pSendMessage == NULL || _MsgId != this->_M_pSendMessage->msg_id())
10897  {
10898  return NULL;
10899  }
10900 
10901  //
10902  // Instead of returning the internal message, we return a copy of the
10903  // message passed in.
10904  //
10905  // Because we are returning a copy, the accept routine for a _Greedy_node
10906  // does not need to grab the internal lock.
10907  //
10908  return (new message<size_t>(this->_M_pSendMessage->payload));
10909  }
10910 
10911 
10921 
10923  {
10924  _R_lock _Lock(_M_resetLock);
10925 
10926  if (this->_M_pSendMessage == NULL)
10927  {
10928  // Save the incoming message so that it can be consumed in the accept function
10930  this->_Create_send_message();
10931  }
10932 
10933  for (target_iterator _Iter = this->_M_connectedTargets.begin(); *_Iter != NULL; ++_Iter)
10934  {
10935  ITarget<size_t> * _PTarget = *_Iter;
10936  _PTarget->propagate(this->_M_pSendMessage, this);
10937  }
10938  }
10939 
10940 private:
10941 
10942  //
10943  // Private Data Members
10944  //
10945 
10946  // The message to be saved by a greedy order node
10948 
10949  // The lock used to protect propagation
10951 
10952  // The lock used to protect modification during a reset
10954 
10955  // For greedy order-nodes, the message ID of subsequent messages sent to this node
10956  // For non-greedy order nodes, the message ID of the message to reserve/consume
10957  runtime_object_identity _M_savedId;
10958 
10959 private:
10960  //
10961  // Hide assignment operator and copy constructor
10962  //
10963  _Greedy_node const & operator=(_Greedy_node const &); // no assignment operator
10964  _Greedy_node(_Greedy_node const &); // no copy constructor
10965 };
10966 
10967 
10976 
10977 template<class _Type>
10979 {
10980 private:
10983 
10984 public:
10989 
11003 
11005  _M_savedId(-1),
11006  _M_reservedId(-1),
11008  {
11009  _Initialize_order_node(_PSource, _Index, _PTarget);
11010  }
11011 
11028 
11030  _M_savedId(-1),
11031  _M_reservedId(-1),
11033  {
11034  register_filter(_Filter);
11035  _Initialize_order_node(_PSource, _Index, _PTarget);
11036  }
11037 
11054 
// Constructs a _Non_greedy_node that is initialized against a specific Scheduler.
//   _PScheduler - its address is passed as the fourth argument of _Initialize_order_node.
//   _PSource, _Index, _PTarget - forwarded unchanged; _PTarget defaults to NULL.
// Both _M_savedId and _M_reservedId start at -1.
// NOTE(review): the initializer list ends on a trailing comma and listing line 11058
// is absent, so one member initializer appears to have been dropped from this
// listing -- restore it from the original header.
11055  _Non_greedy_node(Scheduler& _PScheduler, ISource<_Type> * _PSource, size_t _Index, ITarget<size_t> * _PTarget = NULL) :
11056  _M_savedId(-1),
11057  _M_reservedId(-1),
11059  {
11060  _Initialize_order_node(_PSource, _Index, _PTarget, &_PScheduler);
11061  }
11062 
11082 
// Constructs a filtered _Non_greedy_node that is initialized against a specific Scheduler.
//   _PScheduler - its address is passed as the fourth argument of _Initialize_order_node.
//   _PSource, _Index, _PTarget - forwarded unchanged to _Initialize_order_node.
//   _Filter     - handed to register_filter() before the node is initialized.
// Both _M_savedId and _M_reservedId start at -1.
// NOTE(review): the initializer list ends on a trailing comma and listing line 11086
// is absent, so one member initializer appears to have been dropped from this
// listing -- restore it from the original header.
11083  _Non_greedy_node(Scheduler& _PScheduler, ISource<_Type> * _PSource, size_t _Index, ITarget<size_t> * _PTarget, filter_method const& _Filter) :
11084  _M_savedId(-1),
11085  _M_reservedId(-1),
11087  {
11088  register_filter(_Filter);
11089  _Initialize_order_node(_PSource, _Index, _PTarget, &_PScheduler);
11090  }
11091 
11108 
// Constructs a _Non_greedy_node that is initialized against a specific ScheduleGroup.
//   _PScheduleGroup - its address is passed as the fifth argument of
//                     _Initialize_order_node; the fourth argument (where the
//                     Scheduler overloads pass their scheduler) is NULL.
//   _PSource, _Index, _PTarget - forwarded unchanged; _PTarget defaults to NULL.
// Both _M_savedId and _M_reservedId start at -1.
// NOTE(review): the initializer list ends on a trailing comma and listing line 11112
// is absent, so one member initializer appears to have been dropped from this
// listing -- restore it from the original header.
11109  _Non_greedy_node(ScheduleGroup& _PScheduleGroup, ISource<_Type> * _PSource, size_t _Index, ITarget<size_t> * _PTarget = NULL) :
11110  _M_savedId(-1),
11111  _M_reservedId(-1),
11113  {
11114  _Initialize_order_node(_PSource, _Index, _PTarget, NULL, &_PScheduleGroup);
11115  }
11116 
11136 
// Constructs a filtered _Non_greedy_node that is initialized against a specific ScheduleGroup.
//   _PScheduleGroup - its address is passed as the fifth argument of
//                     _Initialize_order_node; the fourth argument (where the
//                     Scheduler overloads pass their scheduler) is NULL.
//   _PSource, _Index, _PTarget - forwarded unchanged to _Initialize_order_node.
//   _Filter         - handed to register_filter() before the node is initialized.
// Both _M_savedId and _M_reservedId start at -1.
// NOTE(review): the initializer list ends on a trailing comma and listing line 11140
// is absent, so one member initializer appears to have been dropped from this
// listing -- restore it from the original header.
11137  _Non_greedy_node(ScheduleGroup& _PScheduleGroup, ISource<_Type> * _PSource, size_t _Index, ITarget<size_t> * _PTarget, filter_method const& _Filter) :
11138  _M_savedId(-1),
11139  _M_reservedId(-1),
11141  {
11142  register_filter(_Filter);
11143  _Initialize_order_node(_PSource, _Index, _PTarget, NULL, &_PScheduleGroup);
11144  }
11145 
11149 
11151  {
11152  if (_M_pReservedSource != NULL)
11153  {
11155  this->_M_connectedSources.release();
11156  }
11157 
11158  // Remove all links
11159  this->remove_network_links();
11160  }
11161 
11169 
// Resets this non-greedy node: while holding _M_resetLock, deletes the receive and
// send messages and sets both pointers back to NULL. Unlike the greedy node's
// _Reset(), no saved message ID is examined here.
11170  void _Reset()
11171  {
11172  _R_lock _Lock(_M_resetLock);
11173 
11174  delete this->_M_pReceiveMessage;
11175  this->_M_pReceiveMessage = NULL;
11176 
11177  delete this->_M_pSendMessage;
11178  this->_M_pSendMessage = NULL;
11179  }
11180 
11188 
11190  {
11191  bool _Ret_val = false;
11192 
11193  // Order node has only a single source.
11194  // Obtain an iterator to the first source. It will guarantee that the reference
11195  // count on the source is maintained
11196  source_iterator _Iter = this->_M_connectedSources.begin();
11197  ISource<_Type> * _PSource = *_Iter;
11198 
11199  if (_PSource != NULL)
11200  {
11201  // CAS out the current saved ID, in order to try and reserve it
11202  runtime_object_identity _SavedId = _InterlockedExchange((volatile long *) &_M_savedId, -1);
11203 
11204  _Ret_val = _PSource->reserve(_SavedId, this);
11205  //
11206  // If this reserve failed, that means we need to wait for another message
11207  // to come in on this link. _M_savedId was set to -1 to indicate to the _Order_node
11208  // that it needs to async_send when that next message comes through
11209  //
11210  // If the reserve succeeds, save away the reserved ID. This will be used later in
11211  // consume
11212  //
11213  if (_Ret_val)
11214  {
11215  _M_reservedId = _SavedId;
11216 
11217  // Acquire a reference on the source
11220  }
11221  }
11222 
11223  return _Ret_val;
11224  }
11225 
11230 
11232  {
11233  if (_M_pReservedSource != NULL)
11234  {
11235  runtime_object_identity _SavedId = _M_reservedId;
11236  this->_M_pReceiveMessage = _M_pReservedSource->consume(_SavedId, this);
11237 
11238  runtime_object_identity _OldId = NULL;
11239  _OldId = _InterlockedExchange((volatile long *) &_M_reservedId, -1);
11240 
11241  _CONCRT_ASSERT(_OldId == _SavedId);
11242 
11243  // Release the reference on the source
11245  this->_M_connectedSources.release();
11246  }
11247  }
11248 
11252 
11254  {
11255  bool retVal = false;
11256 
11257  if (_M_pReservedSource != NULL)
11258  {
11259  runtime_object_identity _SavedId = _M_reservedId;
11260  // If the _M_savedId is still -1, then swap the succeeded one back
11261  _M_pReservedSource->release(_SavedId, this);
11262 
11263  if (_InterlockedCompareExchange((volatile long *) &_M_savedId, _SavedId, -1) == -1)
11264  {
11265  retVal = true;
11266  }
11267 
11268  // Release the reference on the source
11270  this->_M_connectedSources.release();
11271  }
11272 
11273  return retVal;
11274  }
11275 
11276 protected:
11277 
11278  //
11279  // propagator_block protected function implementation
11280  //
11281 
11301 
11303  {
11304  // Change the message ID. If it was -1, that means an async-send needs to occur
11305  if (_InterlockedExchange((volatile long *) &_M_savedId, _PMessage->msg_id()) == -1)
11306  {
11307  this->async_send(NULL);
11308  }
11309 
11310  // Always return postponed. This message will be consumed
11311  // in the LWT
11312 
11313  return postponed;
11314  }
11315 
11325 
// Hands out a copy of the pending send message when the requested ID matches it.
//   _MsgId  - ID of the message the caller wants to accept.
//   returns - NULL when there is no send message or its ID differs from _MsgId;
//             otherwise a newly allocated message<size_t> carrying the same payload.
//             (Presumably the caller owns the returned message -- confirm.)
11326  virtual message<size_t> * accept_message(runtime_object_identity _MsgId)
11327  {
11328  // This check is to prevent spoofing and verify that the propagated message is
11329  // the one that is accepted at the end.
11330  if (this->_M_pSendMessage == NULL || _MsgId != this->_M_pSendMessage->msg_id())
11331  {
11332  return NULL;
11333  }
11334 
11335  //
11336  // Instead of returning the internal message, we return a copy of the
11337  // message passed in.
11338  //
11339  // Because we are returning a copy, the accept routine for a _Non_greedy_node
11340  // does not need to grab the internal lock.
11341  //
11342  return (new message<size_t>(this->_M_pSendMessage->payload));
11343  }
11344 
11354 
11356  {
11357  _R_lock _Lock(_M_resetLock);
11358 
11359  if (this->_M_pSendMessage == NULL)
11360  {
11361  this->_Create_send_message();
11362  }
11363 
11364  for (target_iterator _Iter = this->_M_connectedTargets.begin(); *_Iter != NULL; ++_Iter)
11365  {
11366  ITarget<size_t> * _PTarget = *_Iter;
11367  _PTarget->propagate(this->_M_pSendMessage, this);
11368  }
11369  }
11370 
11371 private:
11372 
11373  //
11374  // Private Data Members
11375  //
11376 
11377  // The source where we have reserved a message
11379 
11380  // The lock used to protect modification during a reset
11382 
11383  // For non-greedy order nodes, the message ID of the message to reserve/consume
11384  runtime_object_identity _M_savedId;
11385 
11386  // For non-greedy order nodes, the reserved ID of the message that was reserved
11387  runtime_object_identity _M_reservedId;
11388 
11389  // The marker that indicates that _Non_greedy_node has reserved a message
11390  volatile bool _M_fIsInitialized;
11391 
11392 private:
11393  //
11394  // Hide assignment operator and copy constructor
11395  //
11396  _Non_greedy_node const & operator=(_Non_greedy_node const &); // no assignment operator
11397  _Non_greedy_node(_Non_greedy_node const &); // no copy constructor
11398 };
11399 
11400 //**************************************************************************
11401 // Choice:
11402 //**************************************************************************
11403 
11420 
11421 template<class _Type>
11422 class choice: public ISource<size_t>
11423 {
11424 public:
11425 
11445 
11447  {
11449  _Initialize_choices<0>();
11450  }
11451 
11452 #ifdef _CRT_USE_WINAPI_FAMILY_DESKTOP_APP
11453 
// Constructs a choice whose work is associated with a specific Scheduler.
//   _PScheduler - stored by address in _M_pScheduler; _M_pScheduleGroup is set to NULL.
//   _Tuple      - copied into _M_sourceTuple.
// _Initialize_choices<0>() then sets up the per-source nodes starting at tuple index 0.
// NOTE(review): listing line 11478 is absent. The ScheduleGroup overload allocates
// _M_pSingleAssignment at this position, so the matching allocation was presumably
// dropped from this listing -- restore it from the original header.
11476  choice(Scheduler& _PScheduler, _Type _Tuple) : _M_sourceTuple(_Tuple), _M_pScheduler(&_PScheduler), _M_pScheduleGroup(NULL)
11477  {
11479  _Initialize_choices<0>();
11480  }
11481 
11505 
// Constructs a choice whose work is associated with a specific ScheduleGroup.
//   _PScheduleGroup - stored by address in _M_pScheduleGroup; _M_pScheduler is set to NULL.
//   _Tuple          - copied into _M_sourceTuple.
// The internal single_assignment<size_t> is heap-allocated on the same schedule group
// before _Initialize_choices<0>() sets up the per-source nodes starting at index 0.
11506  choice(ScheduleGroup& _PScheduleGroup, _Type _Tuple) : _M_sourceTuple(_Tuple), _M_pScheduler(NULL), _M_pScheduleGroup(&_PScheduleGroup)
11507  {
11508  _M_pSingleAssignment = new single_assignment<size_t>(_PScheduleGroup);
11509  _Initialize_choices<0>();
11510  }
11511 #endif /* _CRT_USE_WINAPI_FAMILY_DESKTOP_APP */
11512 
11533 
// Move constructor: takes over the internals of _Choice.
//   _Choice - the source object. Afterwards its _M_pSingleAssignment is NULL and its
//             _M_pSourceChoices array is zero-filled; its scheduler pointers and source
//             tuple are copied and left unchanged in the source.
// NOTE(review): members such as has_value() dereference _M_pSingleAssignment without a
// NULL check, so they presumably must not be called on the moved-from object -- confirm.
11534  choice(choice && _Choice)
11535  {
11536  // Copy scheduler group or scheduler to the new object.
11537  _M_pScheduleGroup = _Choice._M_pScheduleGroup;
11538  _M_pScheduler = _Choice._M_pScheduler;
11539 
11540  // Single assignment is heap allocated, so simply copy the pointer. If it already has
11541  // a value, it will be preserved.
11542  _M_pSingleAssignment = _Choice._M_pSingleAssignment;
11543  _Choice._M_pSingleAssignment = NULL;
11544 
11545  // Invoke copy assignment for tuple to copy pointers to message blocks.
11546  _M_sourceTuple = _Choice._M_sourceTuple;
11547 
11548  // Copy the pointers to order nodes to a new object and zero out in the old object.
11549  memcpy(_M_pSourceChoices, _Choice._M_pSourceChoices, sizeof(_M_pSourceChoices));
11550  memset(_Choice._M_pSourceChoices, 0, sizeof(_M_pSourceChoices));
11551  }
11552 
11556 
11558  {
11559  delete _M_pSingleAssignment;
11560  _Delete_choices<0>();
11561  }
11562 
11566 
11567  typedef _Type type;
11568 
11575 
// Reports whether the internal single-assignment block holds a value yet.
//   returns - whatever _M_pSingleAssignment->has_value() returns.
11576  bool has_value() const
11577  {
11578  return _M_pSingleAssignment->has_value();
11579  }
11580 
11591 
// Returns the size_t stored in the internal single-assignment block, obtained through
// _M_pSingleAssignment->value(). The per-source nodes are created with their tuple
// index, so this is presumably the index of the selected source -- confirm.
11592  size_t index()
11593  {
11594  return _M_pSingleAssignment->value();
11595  }
11596 
11611 
// Returns a const reference to a payload of type _Payload_type.
// NOTE(review): listing line 11615 is absent, so the body (including its return
// statement) is missing from this listing -- restore it from the original header
// before relying on this definition.
11612  template <typename _Payload_type>
11613  _Payload_type const & value()
11614  {
11616  }
11617 
11618  //
11619  // ISource public function implementations
11620  //
11621 
11628 
// ISource implementation: links _PTarget by forwarding the call to the internal
// single-assignment block.
//   _PTarget - the target to link; passed through unchanged.
11629  virtual void link_target(_Inout_ ITarget<size_t> * _PTarget)
11630  {
11631  _M_pSingleAssignment->link_target(_PTarget);
11632  }
11633 
11640 
// ISource implementation: unlinks a single target.
//   _PTarget - the target to unlink.
// NOTE(review): listing line 11643 is absent, so the body is missing here. The
// sibling ISource members forward to _M_pSingleAssignment; presumably this one
// does too -- restore it from the original header.
11641  virtual void unlink_target(_Inout_ ITarget<size_t> * _PTarget)
11642  {
11644  }
11645 
11653 
// ISource implementation: unlinks all targets.
// NOTE(review): listing line 11656 is absent, so the body is missing here. The
// sibling ISource members forward to _M_pSingleAssignment; presumably this one
// does too -- restore it from the original header.
11654  virtual void unlink_targets()
11655  {
11657  }
11658 
11671 
// ISource implementation: forwards an accept request to the internal
// single-assignment block.
//   _MsgId   - ID of the message being accepted.
//   _PTarget - the target making the request.
//   returns  - the pointer returned by _M_pSingleAssignment->accept().
11672  virtual message<size_t> * accept(runtime_object_identity _MsgId, _Inout_ ITarget<size_t> * _PTarget)
11673  {
11674  return _M_pSingleAssignment->accept(_MsgId, _PTarget);
11675  }
11676 
11695 
// ISource implementation: forwards a reserve request to the internal
// single-assignment block.
//   _MsgId   - ID of the message being reserved.
//   _PTarget - the target making the request.
//   returns  - the result of _M_pSingleAssignment->reserve().
11696  virtual bool reserve(runtime_object_identity _MsgId, _Inout_ ITarget<size_t> * _PTarget)
11697  {
11698  return _M_pSingleAssignment->reserve(_MsgId, _PTarget);
11699  }
11700 
11718 
// ISource implementation: forwards a consume request to the internal
// single-assignment block.
//   _MsgId   - ID of the message being consumed.
//   _PTarget - the target making the request.
//   returns  - the pointer returned by _M_pSingleAssignment->consume().
11719  virtual message<size_t> * consume(runtime_object_identity _MsgId, _Inout_ ITarget<size_t> * _PTarget)
11720  {
11721  return _M_pSingleAssignment->consume(_MsgId, _PTarget);
11722  }
11723 
11733 
// ISource implementation: forwards a release request to the internal
// single-assignment block.
//   _MsgId   - ID of the message being released.
//   _PTarget - the target making the request.
11734  virtual void release(runtime_object_identity _MsgId, _Inout_ ITarget<size_t> * _PTarget)
11735  {
11736  _M_pSingleAssignment->release(_MsgId, _PTarget);
11737  }
11738 
11749 
// ISource implementation: forwards acquire_ref to the internal single-assignment block.
//   _PTarget - the target on whose behalf the call is made.
11750  virtual void acquire_ref(_Inout_ ITarget<size_t> * _PTarget)
11751  {
11752  _M_pSingleAssignment->acquire_ref(_PTarget);
11753  }
11754 
11765 
// ISource implementation: forwards release_ref to the internal single-assignment block.
//   _PTarget - the target on whose behalf the call is made.
11766  virtual void release_ref(_Inout_ ITarget<size_t> * _PTarget)
11767  {
11768  _M_pSingleAssignment->release_ref(_PTarget);
11769  }
11770 
11771 private:
11772  template<int _Index>
11774 
11779 
11780  template<int _Index>
11782  {
11783  std::tuple_element_t<_Index, _Type> _Item = std::get<_Index>(_M_sourceTuple);
11784  _Reserving_node_source_type<_Index> * _Order_node_element = NULL;
11785 
11786  if (_M_pScheduleGroup != NULL)
11787  {
11788  _Order_node_element = new _Reserving_node_source_type<_Index>(*_M_pScheduleGroup, _Item, _Index);
11789  }
11790  else if (_M_pScheduler != NULL)
11791  {
11792  _Order_node_element = new _Reserving_node_source_type<_Index>(*_M_pScheduler, _Item, _Index);
11793  }
11794  else
11795  {
11796  _Order_node_element = new _Reserving_node_source_type<_Index>(_Item, _Index);
11797  }
11798 
11799  _M_pSourceChoices[_Index] = _Order_node_element;
11800  _Order_node_element->link_target(_M_pSingleAssignment);
11801  _Initialize_choices<_Index + 1>();
11802  }
11803 
11808 
// Terminating case of the _Initialize_choices<_Index> recursion: when the index
// equals the number of tuple elements there is nothing left to initialize.
11809  template<> void _Initialize_choices<std::tuple_size<_Type>::value>()
11810  {
11811  }
11812 
11817 
11818  template<int _Index>
11820  {
11823  _Delete_choices<_Index + 1>();
11824  }
11825 
11830 
// Terminating case of the _Delete_choices<_Index> recursion: when the index
// equals the number of tuple elements there is nothing left to delete.
11831  template<> void _Delete_choices<std::tuple_size<_Type>::value>()
11832  {
11833  }
11834 
11835  // Array of pointers to _Reserving_node elements representing each source
11836  void * _M_pSourceChoices[std::tuple_size<_Type>::value];
11837 
11838  // Single assignment which chooses between source messaging blocks
11840 
11841  // Tuple of messaging blocks that are sources to this choice
11843 
11844  // The scheduler to propagate messages on
11845  Scheduler * _M_pScheduler;
11846 
11847  // The schedule group to propagate messages on
11848  ScheduleGroup * _M_pScheduleGroup;
11849 
11850 private:
11851  //
11852  // Hide assignment operator
11853  //
11854  choice const &operator =(choice const &); // no assignment operator
11855  choice(choice const &); // no copy constructor
11856 };
11857 
11858 // Templated factory functions that create a choice, three flavors
11859 
11860 #ifdef _CRT_USE_WINAPI_FAMILY_DESKTOP_APP
11861 
// Factory: builds a choice over two or more sources, associated with a Scheduler.
//   _PScheduler      - passed to the choice constructor.
//   _Item1, _Item2   - the first two sources (at least two are required by the signature).
//   _Items           - any further sources.
//   returns          - a choice over std::tuple<_Type1, _Type2, _Types...>, constructed
//                      from std::make_tuple of the arguments and returned by value.
11892 template<typename _Type1, typename _Type2, typename... _Types>
11893 choice<std::tuple<_Type1, _Type2, _Types...>>
11894 make_choice(Scheduler& _PScheduler, _Type1 _Item1, _Type2 _Item2, _Types... _Items)
11895 {
11896  return choice<std::tuple<_Type1, _Type2, _Types...>>(_PScheduler, std::make_tuple(_Item1, _Item2, _Items...));
11897 }
11898 
11930 
// Factory: builds a choice over two or more sources, associated with a ScheduleGroup.
//   _PScheduleGroup  - passed to the choice constructor.
//   _Item1, _Item2   - the first two sources (at least two are required by the signature).
//   _Items           - any further sources.
//   returns          - a choice over std::tuple<_Type1, _Type2, _Types...>, constructed
//                      from std::make_tuple of the arguments and returned by value.
11931 template<typename _Type1, typename _Type2, typename... _Types>
11932 choice<std::tuple<_Type1, _Type2, _Types...>>
11933 make_choice(ScheduleGroup& _PScheduleGroup, _Type1 _Item1, _Type2 _Item2, _Types... _Items)
11934 {
11935  return choice<std::tuple<_Type1, _Type2, _Types...>>(_PScheduleGroup, std::make_tuple(_Item1, _Item2, _Items...));
11936 }
11937 
11938 #endif /* _CRT_USE_WINAPI_FAMILY_DESKTOP_APP */
11939 
11966 
// Factory: builds a choice over two or more sources without naming a scheduler or
// schedule group.
//   _Item1, _Item2   - the first two sources (at least two are required by the signature).
//   _Items           - any further sources.
//   returns          - a choice over std::tuple<_Type1, _Type2, _Types...>, constructed
//                      from std::make_tuple of the arguments and returned by value.
11967 template<typename _Type1, typename _Type2, typename... _Types>
11968 choice<std::tuple<_Type1, _Type2, _Types...>>
11969 make_choice(_Type1 _Item1, _Type2 _Item2, _Types... _Items)
11970 {
11971  return choice<std::tuple<_Type1, _Type2, _Types...>>(std::make_tuple(_Item1, _Item2, _Items...));
11972 }
11973 
11974 //**************************************************************************
11975 // Join:
11976 //**************************************************************************
11977 
11978 // Template specialization used to unwrap the types from within a tuple.
11979 
11980 
// Primary template, declared but not defined: only the std::tuple partial
// specialization that follows provides a ::type.
11981 template <typename _Tuple> struct _Unwrap;
11982 
11989 
// Partial specialization for std::tuple<_Types...>. For every element type it
// strips one pointer level (std::remove_pointer_t) and takes the nested
// ::source_type; ::type is the std::tuple of those source types.
11990 template <typename... _Types>
11991 struct _Unwrap<std::tuple<_Types...>>
11992 {
11993  typedef std::tuple<typename std::remove_pointer_t<_Types>::source_type...> type;
11994 };
11995 
12006 
12007 template<typename _Type, typename _Destination_type, join_type _Jtype>
12008 class _Join_node: public propagator_block<single_link_registry<ITarget<_Destination_type>>, multi_link_registry<ISource<size_t>>>
12009 {
12010 private:
12013 
12014 public:
12018 
12023 
// Default constructor. _M_counter starts at the number of elements in
// _Destination_type, i.e. the number of inputs still outstanding.
// NOTE(review): listing line 12026 is absent, so the body is missing here. The
// Scheduler and ScheduleGroup overloads call initialize_source_and_target();
// presumably this one does too -- restore it from the original header.
12024  _Join_node() : _M_counter(std::tuple_size<_Destination_type>::value)
12025  {
12027  }
12028 
12036 
// Constructs a join node associated with a specific Scheduler.
//   _PScheduler - its address is passed to initialize_source_and_target().
// _M_counter starts at the number of elements in _Destination_type, i.e. the
// number of inputs still outstanding.
12037  _Join_node(Scheduler& _PScheduler) : _M_counter(std::tuple_size<_Destination_type>::value)
12038  {
12039  this->initialize_source_and_target(&_PScheduler);
12040  }
12041 
12049 
// Constructs a join node associated with a specific ScheduleGroup.
//   _PScheduleGroup - its address is passed as the second argument of
//                     initialize_source_and_target(); the first argument is NULL.
// _M_counter starts at the number of elements in _Destination_type, i.e. the
// number of inputs still outstanding.
12050  _Join_node(ScheduleGroup& _PScheduleGroup) : _M_counter(std::tuple_size<_Destination_type>::value)
12051  {
12052  this->initialize_source_and_target(NULL, &_PScheduleGroup);
12053  }
12054 
12058 
12060  {
12061  // Remove all links
12062  this->remove_network_links();
12063 
12064  // Clean up any messages left in this message block
12066  }
12067 
12068 protected:
12069 
12084 
12086  {
12087  // This join block is connected to the _Order_node sources, which know not to send
12088  // any more messages until join propagates them further. That is why join can
12089  // always accept the incoming messages.
12090 
12091  _PMessage = _PSource->accept(_PMessage->msg_id(), this);
12092 
12093  //
12094  // Source block created an int message only to notify join that the real
12095  // payload is available. There is no need to keep this message around.
12096  //
12097  _CONCRT_ASSERT(_PMessage != NULL);
12098  delete _PMessage;
12099 
12100  long _Ret_val = _InterlockedDecrement(&_M_counter);
12101 
12102  _CONCRT_ASSERT(_Ret_val >= 0);
12103 
12104  if (_Ret_val == 0)
12105  {
12106  //
12107  // All source messages are now received so join can propagate them further
12108  //
12109  this->async_send(NULL);
12110  }
12111 
12112  return accepted;
12113  }
12114 
12124 
// Removes the head message from the outbound buffer when its ID matches.
//   _MsgId  - ID of the message the caller wants to accept.
//   returns - the dequeued head message when _MsgId identifies the head of
//             _M_messageBuffer; otherwise the initial value of _Msg.
// NOTE(review): listing line 12131 is absent, so the declaration of _Msg is missing
// here. It is presumably initialized to NULL so that a non-matching ID returns
// NULL -- restore it from the original header.
12125  virtual message<_Destination_type> * accept_message(runtime_object_identity _MsgId)
12126  {
12127  //
12128  // Peek at the head message in the message buffer. If the IDs match
12129  // dequeue and transfer ownership
12130  //
12132 
12133  if (_M_messageBuffer._Is_head(_MsgId))
12134  {
12135  _Msg = _M_messageBuffer._Dequeue();
12136  }
12137 
12138  return _Msg;
12139  }
12140 
12154 
// Decides whether a message may be reserved.
//   _MsgId  - ID of the message to reserve.
//   returns - true only when _MsgId identifies the head of _M_messageBuffer.
12155  virtual bool reserve_message(runtime_object_identity _MsgId)
12156  {
12157  // Allow reservation if this is the head message
12158  return _M_messageBuffer._Is_head(_MsgId);
12159  }
12160 
12174 
// Consumes a message by delegating to accept_message().
//   _MsgId  - ID of the message to consume.
//   returns - whatever accept_message(_MsgId) returns.
12175  virtual message<_Destination_type> * consume_message(runtime_object_identity _MsgId)
12176  {
12177  // By default, accept the message
12178  return accept_message(_MsgId);
12179  }
12180 
12187 
// Releases a reservation. Only validates the ID; the buffer itself is not modified.
//   _MsgId - ID of the message whose reservation is released.
//   throws - message_not_found when _MsgId is not the head of _M_messageBuffer.
12188  virtual void release_message(runtime_object_identity _MsgId)
12189  {
12190  // The head message is the one reserved.
12191  if (!_M_messageBuffer._Is_head(_MsgId))
12192  {
12193  throw message_not_found();
12194  }
12195  }
12196 
12200 
// Restarts propagation: schedules an async_send(NULL) when the outbound buffer is
// not empty, and does nothing otherwise.
12201  virtual void resume_propagation()
12202  {
12203  // If there are any messages in the buffer, propagate them out
12204  if (_M_messageBuffer._Count() > 0)
12205  {
12206  this->async_send(NULL);
12207  }
12208  }
12209 
12216 
12218  {
12219  // There is only a single target.
12221  }
12222 
12232 
12234  {
12236 
12237  if (_M_counter == 0)
12238  {
12239  bool fIsNonGreedy = (_Jtype == non_greedy);
12240 
12241  if (fIsNonGreedy)
12242  {
12244  {
12245  return;
12246  }
12247  }
12248 
12249  if (!fIsNonGreedy)
12250  {
12251  // Because a greedy join has captured all input, we can reset
12252  // the counter to the total number of inputs
12253  _InterlockedExchange(&_M_counter, std::tuple_size<_Destination_type>::value);
12254  }
12255 
12256  _Msg = _Create_send_message();
12257  }
12258 
12259  if (_Msg != NULL)
12260  {
12261  _M_messageBuffer._Enqueue(_Msg);
12262 
12263  if (!_M_messageBuffer._Is_head(_Msg->msg_id()))
12264  {
12265  // another message is at the head of the outbound message queue and blocked
12266  // simply return
12267  return;
12268  }
12269  }
12270 
12272  }
12273 
12274 private:
12275 
12285 
// Recursively reserves one message from each source node, starting at _Index, and
// consumes them only if every reservation from _Index onwards succeeds.
//   _Destination_tuple - only passed along to the recursive call in the visible code.
//   _Sources           - array of source pointers; element _Index is cast to the
//                        non-greedy node type.
//   returns            - true when this node and all later nodes were reserved (and
//                        this node's message was consumed); false otherwise.
// On failure further down the chain, this node's reservation is released; if the
// release restored the saved ID, _M_counter is decremented and an async_send(NULL)
// is scheduled when it reaches zero.
// NOTE(review): listing lines 12289 and 12293 are absent. The first presumably
// declares _Non_greedy_node_source_type; the second is the counter increment that the
// "Increment the counter" comment announces -- restore both from the original header.
12286  template<int _Index>
12287  bool _Try_consume_source_messages(_Destination_type & _Destination_tuple, ISource<size_t> ** _Sources)
12288  {
12290  _Non_greedy_node_source_type * _Node = static_cast<_Non_greedy_node_source_type *>(_Sources[_Index]);
12291 
12292  // Increment the counter once for each reservation
12294 
12295  if (_Node->_Reserve_received_message())
12296  {
       // This node is reserved; try the remaining nodes before committing.
12297  bool _Ret_val = _Try_consume_source_messages<_Index + 1>(_Destination_tuple, _Sources);
12298 
12299  if (_Ret_val)
12300  {
12301  _Node->_Consume_received_message();
12302  }
12303  else
12304  {
12305  if (_Node->_Release_received_message())
12306  {
12307  // If _Release_received_message() restored the ID, decrement the count for that
12308  // restoration
12309  if (_InterlockedDecrement(&_M_counter) == 0)
12310  {
12311  this->async_send(NULL);
12312  }
12313  }
12314  }
12315 
12316  return _Ret_val;
12317  }
12318 
       // This node's own reservation failed.
12319  return false;
12320  }
12321 
12329 
// Terminating case of the _Try_consume_source_messages<_Index> recursion: once the
// index equals the number of tuple elements every node has been reserved, so
// report success.
12330  template<> bool _Try_consume_source_messages<std::tuple_size<_Type>::value>(_Destination_type &, ISource<size_t> **)
12331  {
12332  return true;
12333  }
12334 
12343 
12345  {
12346  _Destination_type _Destination_tuple;
12347 
12348  // Populate the sources buffer
12349  ISource<size_t> * _Sources[std::tuple_size<_Type>::value];
12350  size_t _Index = 0;
12351 
12352  // Get an iterator which will keep a reference on the connected sources
12353  source_iterator _Iter = this->_M_connectedSources.begin();
12354 
12355  while (*_Iter != NULL)
12356  {
12357  ISource<size_t> * _PSource = *_Iter;
12358 
12359  if (_PSource == NULL)
12360  {
12361  // One of the sources disconnected
12362  break;
12363  }
12364 
12365  if (_Index >= std::tuple_size<_Type>::value)
12366  {
12367  // More sources than we expect
12368  break;
12369  }
12370 
12371  _Sources[_Index] = _PSource;
12372  _Index++;
12373  ++_Iter;
12374  }
12375 
12376  // The order nodes should not have unlinked while the join node is
12377  // active.
12378 
12379  if (_Index != std::tuple_size<_Type>::value)
12380  {
12381  // On debug build assert to help debugging
12382  _CONCRT_ASSERT(_Index == std::tuple_size<_Type>::value);
12383  return false;
12384  }
12385 
12386  bool _IsAcquireSuccessful = _Try_consume_source_messages<0>(_Destination_tuple, _Sources);
12387 
12388  return _IsAcquireSuccessful;
12389  }
12390 
12397 
12399  {
12400  message<_Target_type> * _Msg = _MessageBuffer._Peek();
12401 
12402  // If someone has reserved the _Head message, don't propagate anymore
12403  if (this->_M_pReservedFor != NULL)
12404  {
12405  return;
12406  }
12407 
12408  while (_Msg != NULL)
12409  {
12410  message_status _Status = declined;
12411 
12412  // Always start from the first target that linked
12413  for (target_iterator _Iter = this->_M_connectedTargets.begin(); *_Iter != NULL; ++_Iter)
12414  {
12415  ITarget<_Target_type> * _PTarget = *_Iter;
12416  _Status = _PTarget->propagate(_Msg, this);
12417 
12418  // Ownership of message changed. Do not propagate this
12419  // message to any other target.
12420  if (_Status == accepted)
12421  {
12422  break;
12423  }
12424 
12425  // If the target just propagated to reserved this message, stop
12426  // propagating it to others
12427  if (this->_M_pReservedFor != NULL)
12428  {
12429  break;
12430  }
12431  }
12432 
12433  // If status is anything other than accepted, then the head message
12434  // was not propagated out. Thus, nothing after it in the queue can
12435  // be propagated out. Cease propagation.
12436  if (_Status != accepted)
12437  {
12438  break;
12439  }
12440 
12441  // Get the next message
12442  _Msg = _MessageBuffer._Peek();
12443  }
12444  }
12445 
12450 
12452  {
12453  _Destination_type _Destination_tuple;
12454 
12455  // Populate the sources buffer
12456  ISource<size_t> * _Sources[std::tuple_size<_Type>::value];
12457  size_t _Index = 0;
12458 
12459  // Get an iterator which will keep a reference on the connected sources
12460  source_iterator _Iter = this->_M_connectedSources.begin();
12461 
12462  while (*_Iter != NULL)
12463  {
12464  ISource<size_t> * _PSource = *_Iter;
12465 
12466  if (_PSource == NULL)
12467  {
12468  // One of the sources disconnected
12469  break;
12470  }
12471 
12472  // Avoid buffer overrun
12473  if (_Index >= std::tuple_size<_Type>::value)
12474  {
12475  // More sources than we expect
12476  break;
12477  }
12478 
12479  _Sources[_Index] = *_Iter;
12480  _Index++;
12481  ++_Iter;
12482  }
12483 
12484  // The order nodes should not have unlinked while the join node is
12485  // active.
12486  if (_Index != std::tuple_size<_Type>::value)
12487  {
12488  // On debug build assert to help debugging
12489  _CONCRT_ASSERT(_Index == std::tuple_size<_Type>::value);
12490  return NULL;
12491  }
12492 
12493  _Populate_destination_tuple<0>(_Destination_tuple, _Sources);
12494 
12495  return new message<_Destination_type>(_Destination_tuple);
12496  }
12497 
12502 
12504  {
12505  // Delete any messages remaining in the output queue
12506  for (;;)
12507  {
12508  message<_Destination_type> * _Msg = _M_messageBuffer._Dequeue();
12509  if (_Msg == NULL)
12510  {
12511  break;
12512  }
12513  delete _Msg;
12514  }
12515  }
12516 
12520 
// Recursively copies each source node's value into the destination tuple, starting
// at _Index, and resets each node after its value has been read.
//   _Destination_tuple - element _Index receives _Node->value().
//   _Sources           - array of source pointers; element _Index is cast to the
//                        order-node base type.
// NOTE(review): listing line 12524 is absent; it presumably declares
// _Order_node_base_source_type -- restore it from the original header.
12521  template<int _Index>
12522  void _Populate_destination_tuple(_Destination_type & _Destination_tuple, ISource<size_t> ** _Sources)
12523  {
12525  _Order_node_base_source_type * _Node = static_cast<_Order_node_base_source_type *>(_Sources[_Index]);
12526 
12527  std::get<_Index>(_Destination_tuple) = _Node->value();
12528  _Node->_Reset();
12529 
12530  _Populate_destination_tuple<_Index + 1>(_Destination_tuple, _Sources);
12531  }
12532 
12537 
// Terminating case of the _Populate_destination_tuple<_Index> recursion: when the
// index equals the number of tuple elements there is nothing left to copy.
12538  template<> void _Populate_destination_tuple<std::tuple_size<_Type>::value>(_Destination_type &, ISource<size_t> **)
12539  {
12540  }
12541 
12542  // A tuple containing a collection of source messaging blocks
12544 
12545  // Counts messages received by sources of this node and is used to trigger propagation to targets
12546  // This value starts at the total number of inputs and counts down to zero. When it reaches zero,
12547  // a join of the inputs is started.
12548  volatile long _M_counter;
12549 
12550  // Buffer to hold outgoing messages
12552 
12553 private:
12554  //
12555  // Hide assignment operator and copy constructor
12556  //
12557  _Join_node(const _Join_node & _Join); // no copy constructor
12558  _Join_node const &operator =(_Join_node const &); // no assignment operator
12559 };
12560 
12581 
12582 template<typename _Type, join_type _Jtype = non_greedy>
12583 class multitype_join: public ISource<typename _Unwrap<_Type>::type>
12584 {
12585 public:
12586 
12588 
12608 
12610  {
12612  _Initialize_joins<0>();
12613  }
12614 
12615 #ifdef _CRT_USE_WINAPI_FAMILY_DESKTOP_APP
12616 
// Constructs a multitype_join whose work is associated with a specific Scheduler.
//   _PScheduler - stored by address in _M_pScheduler; _M_pScheduleGroup is set to NULL.
//   _Tuple      - copied into _M_sourceTuple.
// _Initialize_joins<0>() then sets up the per-source order nodes starting at index 0.
// NOTE(review): listing line 12641 is absent. The ScheduleGroup overload allocates
// _M_pJoinNode at this position, so the matching allocation was presumably dropped
// from this listing -- restore it from the original header.
12639  multitype_join(Scheduler& _PScheduler, _Type _Tuple) : _M_sourceTuple(_Tuple), _M_pScheduler(&_PScheduler), _M_pScheduleGroup(NULL)
12640  {
12642  _Initialize_joins<0>();
12643  }
12644 
12668 
// Constructs a multitype_join whose work is associated with a specific ScheduleGroup.
//   _PScheduleGroup - stored by address in _M_pScheduleGroup; _M_pScheduler is set to NULL.
//   _Tuple          - copied into _M_sourceTuple.
// The internal _Join_node is heap-allocated on the same schedule group before
// _Initialize_joins<0>() sets up the per-source order nodes starting at index 0.
12669  multitype_join(ScheduleGroup& _PScheduleGroup, _Type _Tuple) : _M_sourceTuple(_Tuple), _M_pScheduler(NULL), _M_pScheduleGroup(&_PScheduleGroup)
12670  {
12671  _M_pJoinNode = new _Join_node<_Type, _Destination_type, _Jtype>(_PScheduleGroup);
12672  _Initialize_joins<0>();
12673  }
12674 #endif /* _CRT_USE_WINAPI_FAMILY_DESKTOP_APP */
12675 
12696 
12698  {
12699  // Copy scheduler group or scheduler to the new object.
12700  _M_pScheduleGroup = _Join._M_pScheduleGroup;
12701  _M_pScheduler = _Join._M_pScheduler;
12702 
12703  // The join node is heap allocated, so simply take over the pointer. Any state it
12704  // already holds is preserved.
12705  _M_pJoinNode = _Join._M_pJoinNode;
12706  _Join._M_pJoinNode = NULL;
12707 
12708  // Invoke copy assignment for tuple to copy pointers to message blocks.
12709  _M_sourceTuple = _Join._M_sourceTuple;
12710 
12711  // Copy the pointers to order nodes to a new object and zero out in the old object.
12712  memcpy(_M_pSourceJoins, _Join._M_pSourceJoins, sizeof(_M_pSourceJoins));
12713  memset(_Join._M_pSourceJoins, 0, sizeof(_M_pSourceJoins));
12714  }
12715 
12719 
12721  {
12722  delete _M_pJoinNode;
12723  _Delete_joins<0>();
12724  }
12725 
12729 
12730  typedef _Type type;
12731 
12732  //
12733  // ISource public function implementations
12734  //
12735 
12742 
12744  {
12745  _M_pJoinNode->link_target(_PTarget);
12746  }
12747 
12754 
12756  {
12757  _M_pJoinNode->unlink_target(_PTarget);
12758  }
12759 
12763 
// ISource implementation: unlinks all targets by forwarding to the internal join node.
12764  virtual void unlink_targets()
12765  {
12766  _M_pJoinNode->unlink_targets();
12767  }
12768 
12781 
// ISource implementation: forwards an accept request to the internal join node.
//   _MsgId   - ID of the message being accepted.
//   _PTarget - the target making the request.
//   returns  - the pointer returned by _M_pJoinNode->accept().
12782  virtual message<_Destination_type> * accept(runtime_object_identity _MsgId, _Inout_ ITarget<_Destination_type> * _PTarget)
12783  {
12784  return _M_pJoinNode->accept(_MsgId, _PTarget);
12785  }
12786 
12805 
// ISource implementation: forwards a reserve request to the internal join node.
//   _MsgId   - ID of the message being reserved.
//   _PTarget - the target making the request.
//   returns  - the result of _M_pJoinNode->reserve().
12806  virtual bool reserve(runtime_object_identity _MsgId, _Inout_ ITarget<_Destination_type> * _PTarget)
12807  {
12808  return _M_pJoinNode->reserve(_MsgId, _PTarget);
12809  }
12810 
12828 
// ISource implementation: forwards a consume request to the internal join node.
//   _MsgId   - ID of the message being consumed.
//   _PTarget - the target making the request.
//   returns  - the pointer returned by _M_pJoinNode->consume().
12829  virtual message<_Destination_type> * consume(runtime_object_identity _MsgId, _Inout_ ITarget<_Destination_type> * _PTarget)
12830  {
12831  return _M_pJoinNode->consume(_MsgId, _PTarget);
12832  }
12833 
12843 
// ISource implementation: forwards a release request to the internal join node.
//   _MsgId   - ID of the message being released.
//   _PTarget - the target making the request.
12844  virtual void release(runtime_object_identity _MsgId, _Inout_ ITarget<_Destination_type> * _PTarget)
12845  {
12846  _M_pJoinNode->release(_MsgId, _PTarget);
12847  }
12848 
12859 
12861  {
12862  _M_pJoinNode->acquire_ref(_PTarget);
12863  }
12864 
12875 
12877  {
12878  _M_pJoinNode->release_ref(_PTarget);
12879  }
12880 
12881 private:
// Alias for the payload type of the _Index-th source: takes the _Index-th element
// type of _Type, strips one pointer level, and names its nested ::source_type.
12882  template<int _Index>
12883  using _Source_type = typename std::remove_pointer_t<std::tuple_element_t<_Index, _Type>>::source_type;
12884 
12885  template<int _Index>
12887 
12894 
// Creates the order node for the source at tuple position _Index, links it to the
// join node, then recurses to position _Index + 1. The explicit specialization for
// std::tuple_size<_Type>::value ends the recursion.
// The declarator line (listing line 12896) was absent from this extract and is
// restored to match that specialization and the recursive call below.
12895  template<int _Index>
12896  void _Initialize_joins()
12897  {
12898  std::tuple_element_t<_Index, _Type> _Item = std::get<_Index>(_M_sourceTuple);
12899  _Order_node_base_source_type<_Index> * _Order_node_element = NULL;
12900 
// The join type selects the node class; within each branch the schedule group is
// preferred over the scheduler, and the plain constructor is used when neither is set.
12901  if (_Jtype == non_greedy)
12902  {
12903  typedef _Non_greedy_node<_Source_type<_Index>> _Non_greedy_node_source_type;
12904 
12905  if (_M_pScheduleGroup != NULL)
12906  {
12907  _Order_node_element = new _Non_greedy_node_source_type(*_M_pScheduleGroup, _Item, _Index);
12908  }
12909  else if (_M_pScheduler != NULL)
12910  {
12911  _Order_node_element = new _Non_greedy_node_source_type(*_M_pScheduler, _Item, _Index);
12912  }
12913  else
12914  {
12915  _Order_node_element = new _Non_greedy_node_source_type(_Item, _Index);
12916  }
12917  }
12918  else
12919  {
12920  typedef _Greedy_node<_Source_type<_Index>> _Greedy_node_source_type;
12921 
12922  if (_M_pScheduleGroup != NULL)
12923  {
12924  _Order_node_element = new _Greedy_node_source_type(*_M_pScheduleGroup, _Item, _Index);
12925  }
12926  else if (_M_pScheduler != NULL)
12927  {
12928  _Order_node_element = new _Greedy_node_source_type(*_M_pScheduler, _Item, _Index);
12929  }
12930  else
12931  {
12932  _Order_node_element = new _Greedy_node_source_type(_Item, _Index);
12933  }
12934  }
// Keep the node (stored as void *) so it can be found again by index, and route its
// output into the join node.
12935  _M_pSourceJoins[_Index] = _Order_node_element;
12936  _Order_node_element->link_target(_M_pJoinNode);
12937  _Initialize_joins<_Index + 1>();
12938  }
12939 
12944 
// Recursion terminator for _Initialize_joins: the index equal to the number of
// tuple elements has no source, so there is nothing to create.
12945  template<> void _Initialize_joins<std::tuple_size<_Type>::value>()
12946  {
12947  }
12948 
12955 
// Deletes all _Order_node elements that were created in _Initialize_joins: handles
// the node at position _Index, then recurses to position _Index + 1. The explicit
// specialization for std::tuple_size<_Type>::value ends the recursion.
// The declarator line (listing line 12957) was absent from this extract and is
// restored from the cross-reference entry for _Delete_joins.
12956  template<int _Index>
12957  void _Delete_joins()
12958  {
// NOTE(review): listing lines 12959-12960 were absent from this extract, which left
// the function deleting nothing. They are reconstructed from the pointer type that
// _Initialize_joins stores in _M_pSourceJoins - confirm against the original header.
12959  delete reinterpret_cast<_Order_node_base_source_type<_Index> *>(_M_pSourceJoins[_Index]);
12960  _M_pSourceJoins[_Index] = NULL;
12961  _Delete_joins<_Index + 1>();
12962  }
12963 
12968 
// Recursion terminator for _Delete_joins: the index equal to the number of tuple
// elements has no node, so there is nothing to delete.
12969  template<> void _Delete_joins<std::tuple_size<_Type>::value>()
12970  {
12971  }
12972 
12973  // Array of pointers to _Order_node elements representing each source
// Stored as void * because each element has a different node type; one slot per
// tuple element.
12974  void * _M_pSourceJoins[std::tuple_size<_Type>::value];
12975 
12976  // Join node that collects source messaging block messages
// NOTE(review): this declaration (listing line 12977) was absent from this extract
// although every ISource method forwards to _M_pJoinNode. The template arguments
// are reconstructed - confirm against the original header.
12977  _Join_node<_Type, _Destination_type, _Jtype> * _M_pJoinNode;
12978 
12979  // Tuple of messaging blocks that are sources to this multitype_join
// Declaration (listing line 12980) restored from the cross-reference entry for
// _M_sourceTuple.
12980  _Type _M_sourceTuple;
12981 
12982  // The scheduler to propagate messages on
12983  Scheduler * _M_pScheduler;
12984 
12985  // The schedule group to propagate messages on
12986  ScheduleGroup * _M_pScheduleGroup;
12987 
12988 private:
12989  //
12990  // Hide assignment operator and copy constructor: both are declared private so that
// a multitype_join cannot be copied from outside the class.
12991  //
12992  multitype_join const &operator =(multitype_join const &); // no assignment operator
12993  multitype_join(multitype_join const &); // no copy constructor
12994 };
12995 
12996 // Templated factory functions that create a join, three flavors
12997 
12998 #ifdef _CRT_USE_WINAPI_FAMILY_DESKTOP_APP
12999 
// Factory: constructs a non_greedy multitype_join messaging block from a Scheduler
// and two or more input sources.
//   _PScheduler     - passed to the multitype_join constructor.
//   _Item1, _Item2  - the first two sources.
//   _Items          - any additional sources.
// The items are packed with std::make_tuple; the join's tuple type is
// std::tuple<_Type1, _Type2, _Types...>. Returns the join by value.
13030 template<typename _Type1, typename _Type2, typename... _Types>
13031 multitype_join<std::tuple<_Type1, _Type2, _Types...>>
13032 make_join(Scheduler& _PScheduler, _Type1 _Item1, _Type2 _Item2, _Types... _Items)
13033 {
13034  return multitype_join<std::tuple<_Type1, _Type2, _Types...>>(_PScheduler, std::make_tuple(_Item1, _Item2, _Items...));
13035 }
13036 
13068 
// Factory: constructs a non_greedy multitype_join messaging block from a
// ScheduleGroup and two or more input sources.
//   _PScheduleGroup - passed to the multitype_join constructor.
//   _Item1, _Item2  - the first two sources.
//   _Items          - any additional sources.
// The items are packed with std::make_tuple; the join's tuple type is
// std::tuple<_Type1, _Type2, _Types...>. Returns the join by value.
13069 template<typename _Type1, typename _Type2, typename... _Types>
13070 multitype_join<std::tuple<_Type1, _Type2, _Types...>>
13071 make_join(ScheduleGroup& _PScheduleGroup, _Type1 _Item1, _Type2 _Item2, _Types... _Items)
13072 {
13073  return multitype_join<std::tuple<_Type1, _Type2, _Types...>>(_PScheduleGroup, std::make_tuple(_Item1, _Item2, _Items...));
13074 }
13075 
13076 #endif /* _CRT_USE_WINAPI_FAMILY_DESKTOP_APP */
13077 
13104 
// Factory: constructs a non_greedy multitype_join messaging block from two or more
// input sources, without naming a Scheduler or ScheduleGroup.
//   _Item1, _Item2 - the first two sources.
//   _Items         - any additional sources.
// The items are packed with std::make_tuple; the join's tuple type is
// std::tuple<_Type1, _Type2, _Types...>. Returns the join by value.
13105 template<typename _Type1, typename _Type2, typename... _Types>
13106 multitype_join<std::tuple<_Type1, _Type2, _Types...>>
13107 make_join(_Type1 _Item1, _Type2 _Item2, _Types... _Items)
13108 {
13109  return multitype_join<std::tuple<_Type1, _Type2, _Types...>>(std::make_tuple(_Item1, _Item2, _Items...));
13110 }
13111 
13112 // Templated factory functions that create a *greedy* join, three flavors
13113 #ifdef _CRT_USE_WINAPI_FAMILY_DESKTOP_APP
13114 
// Factory: constructs a greedy multitype_join messaging block from a Scheduler and
// two or more input sources.
//   _PScheduler     - passed to the multitype_join constructor.
//   _Item1, _Item2  - the first two sources.
//   _Items          - any additional sources.
// Same as make_join except that the join type argument is greedy. Returns the join
// by value.
13146 template<typename _Type1, typename _Type2, typename... _Types>
13147 multitype_join<std::tuple<_Type1, _Type2, _Types...>, greedy>
13148 make_greedy_join(Scheduler& _PScheduler, _Type1 _Item1, _Type2 _Item2, _Types... _Items)
13149 {
13150  return multitype_join<std::tuple<_Type1, _Type2, _Types...>, greedy>(_PScheduler, std::make_tuple(_Item1, _Item2, _Items...));
13151 }
13152 
13184 
// Factory: constructs a greedy multitype_join messaging block from a ScheduleGroup
// and two or more input sources.
//   _PScheduleGroup - passed to the multitype_join constructor.
//   _Item1, _Item2  - the first two sources.
//   _Items          - any additional sources.
// Same as make_join except that the join type argument is greedy. Returns the join
// by value.
13185 template<typename _Type1, typename _Type2, typename... _Types>
13186 multitype_join<std::tuple<_Type1, _Type2, _Types...>, greedy>
13187 make_greedy_join(ScheduleGroup& _PScheduleGroup, _Type1 _Item1, _Type2 _Item2, _Types... _Items)
13188 {
13189  return multitype_join<std::tuple<_Type1, _Type2, _Types...>, greedy>(_PScheduleGroup, std::make_tuple(_Item1, _Item2, _Items...));
13190 }
13191 
13192 #endif /* _CRT_USE_WINAPI_FAMILY_DESKTOP_APP */
13193 
13220 
// Factory: constructs a greedy multitype_join messaging block from two or more
// input sources, without naming a Scheduler or ScheduleGroup.
//   _Item1, _Item2 - the first two sources.
//   _Items         - any additional sources.
// Same as make_join except that the join type argument is greedy. Returns the join
// by value.
13221 template<typename _Type1, typename _Type2, typename... _Types>
13222 multitype_join<std::tuple<_Type1, _Type2, _Types...>, greedy>
13223 make_greedy_join(_Type1 _Item1, _Type2 _Item2, _Types... _Items)
13224 {
13225  return multitype_join<std::tuple<_Type1, _Type2, _Types...>, greedy>(std::make_tuple(_Item1, _Item2, _Items...));
13226 }
13227 
13228 //**************************************************************************
13229 // Agents:
13230 //**************************************************************************
13231 
13238 
// The states an agent can be in; this is the type returned by agent::wait and
// written by agent::wait_for_all / agent::wait_for_one.
// NOTE(review): only the closing brace of this enum survived in this extract. The
// head and the five enumerators are restored at the listing numbers missing from
// the sequence, with names taken from the public agent_status documentation -
// confirm names and order against the original header.
13239 enum agent_status {
13243 
13244  agent_created,
13248 
// The agent has been started, but not entered its run method.
13249  agent_runnable,
13253 
13254  agent_started,
13258 
13259  agent_done,
13263 
13264  agent_canceled
13265 };
13266 
13274 
13275 class agent
13276 {
13277 public:
13287 
13288  _CONCRTIMP agent();
13289 
13290 #ifdef _CRT_USE_WINAPI_FAMILY_DESKTOP_APP
13291 
13304  _CONCRTIMP agent(Scheduler& _PScheduler);
13305 
13319 
13320  _CONCRTIMP agent(ScheduleGroup& _PGroup);
13321 #endif /* _CRT_USE_WINAPI_FAMILY_DESKTOP_APP */
13322 
13331 
13332  _CONCRTIMP virtual ~agent();
13333 
13340 
13342 
13351 
13353 
13361 
13362  _CONCRTIMP bool start();
13363 
13372 
13373  _CONCRTIMP bool cancel();
13374 
13397 
13398  _CONCRTIMP static agent_status __cdecl wait(_Inout_ agent * _PAgent, unsigned int _Timeout = COOPERATIVE_TIMEOUT_INFINITE);
13399 
13425 
13426  _CONCRTIMP static void __cdecl wait_for_all(size_t _Count, _In_reads_(_Count) agent ** _PAgents,
13427  _Out_writes_opt_(_Count) agent_status * _PStatus = NULL, unsigned int _Timeout = COOPERATIVE_TIMEOUT_INFINITE);
13428 
13456 
13457  _CONCRTIMP static void __cdecl wait_for_one(size_t _Count, _In_reads_(_Count) agent ** _PAgents, agent_status& _Status,
13458  size_t& _Index, unsigned int _Timeout = COOPERATIVE_TIMEOUT_INFINITE);
13459 
13460 protected:
13470 
13471  virtual void run() = 0;
13472 
13485 
13486  _CONCRTIMP bool done();
13487 
13491 
13493 
13494 private:
13495 
13496  // A flag to check of whether the agent can be started
13497  // This is initialized to TRUE and there is a race between Start() and Cancel() to set it
13498  // to FALSE. Once Started or Canceled, further calls to Start() or Cancel() will return false.
13499 
13500  volatile long _M_fStartable;
13501 
13502  // A flag to check of whether the agent can be canceled
13503  // This is initialized to TRUE and there is a race between Cancel() and the LWT executing
13504  // a task that has been started to set it to FALSE. If Cancel() wins, the task will not be
13505  // executed. If the LWT wins, Cancel() will return false.
13506 
13507  volatile long _M_fCancelable;
13508 
13509  // A static wrapper function that calls the Run() method. Used for scheduling of the task
13510 
13511  static void __cdecl _Agent_task_wrapper(void * data);
13512 
13513  Scheduler * _M_pScheduler;
13514  ScheduleGroup * _M_pScheduleGroup;
13515 
13516  //
13517  // Hide assignment operator and copy constructor
13518  //
13519  agent const &operator =(agent const&); // no assignment operator
13520  agent(agent const &); // no copy constructor
13521 };
13522 
// Registers _Name as the trace name of _PObject by emitting a name event through
// _Trace_agents.
//   _PObject - the object being named; its trace id comes from _Trace_agents_get_id.
//   _Name    - null-terminated wide-character name for the object.
13535 template <class _Type>
13536 void Trace_agents_register_name(_Inout_ _Type * _PObject, _In_z_ const wchar_t * _Name)
13537 {
// NOTE(review): listing line 13538, the only statement of the body, was absent from
// this extract, which left the function a no-op. It is reconstructed from the
// _Trace_agents and _Trace_agents_get_id entries in the cross-reference section;
// confirm the enumerator name against concrt.h.
13538  _Trace_agents(AGENTS_EVENT_NAME, ::Concurrency::details::_Trace_agents_get_id(_PObject), _Name);
13539 }
13540 
13541 } // namespace Concurrency
13542 
13543 namespace concurrency = ::Concurrency;
13544 
13545 #pragma warning(pop)
13546 #pragma pack(pop)
virtual message< size_t > * accept_message(runtime_object_identity _MsgId)
Accept the message by making a copy of the payload.
Definition: agents.h:11326
void decline_incoming_messages()
Indicates to the block that new messages should be declined.
Definition: agents.h:5857
_Message * _Peek()
Definition: agents.h:227
call(_Call_method const &_Func, filter_method const &_Filter)
Constructs a call messaging block.
Definition: agents.h:7169
virtual void unlink_source(_Inout_ ISource< _Source_type > *_PSource)
Unlinks a specified source block from this target_block object.
Definition: agents.h:4660
single_link_registry< ITarget< _Output > > _TargetLinkRegistry
Definition: agents.h:7485
volatile bool _M_fIsInitialized
Definition: agents.h:9102
multitype_join const & operator=(multitype_join const &)
volatile long _M_lwtCount
A counter to indicate the number of outstanding LWTs
Definition: agents.h:2416
void _Propagate_priority_order(::Concurrency::details::_Queue< message< _Target_type >> &_MessageBuffer)
Propagate messages in priority order.
Definition: agents.h:9690
void _Delete_choices()
Deletes all _Reserving_node elements that were created in _Initialize_choices.
Definition: agents.h:11819
virtual ~_AsyncOriginator()
Definition: agents.h:3981
single_assignment()
Constructs a single_assignment messaging block.
Definition: agents.h:8613
virtual bool supports_anonymous_source()
Overrides the supports_anonymous_source method to indicate that this block can accept messages offere...
Definition: agents.h:6201
void _Swap(_Myt &_Right)
Definition: agents.h:362
volatile long _M_referenceCount
Definition: agents.h:5549
_MessageProcessorType _M_messageProcessor
Processor used for asynchronous message handling
Definition: agents.h:5491
virtual message< _Target_type > * consume(runtime_object_identity _MsgId, _Inout_ ITarget< _Target_type > *_PTarget)
Consumes a message previously offered by this source_block object and successfully reserved by the ta...
Definition: agents.h:5108
void stop()
Stops the timer messaging block.
Definition: agents.h:8292
virtual void unlink_sources()
Unlinks all source blocks from this target_block object.
Definition: agents.h:4673
~single_assignment()
Destroys the single_assignment messaging block.
Definition: agents.h:8746
ISource< _Type > * _M_pReservedSource
Definition: agents.h:10556
::Concurrency::details::_Queue< message< _Output > > _M_messageBuffer
Message queue used to store outbound messages
Definition: agents.h:8085
~multitype_join()
Destroys the multitype_join messaging block.
Definition: agents.h:12720
virtual ~ISource()
Destroys the ISource object.
Definition: agents.h:2605
multitype_join(multitype_join &&_Join)
Constructs a multitype_join messaging block.
Definition: agents.h:12697
void _Invoke_handler(message< _Type > *_Msg)
Definition: agents.h:2350
_TargetLinkRegistry::iterator target_iterator
The iterator to walk the connected targets.
Definition: agents.h:4893
size_t _M_index
Definition: agents.h:422
_Reserving_node const & operator=(_Reserving_node const &)
virtual message_status propagate_message(_Inout_ message< _Source_type > *_PMessage, _Inout_ ISource< _Source_type > *_PSource)=0
When overridden in a derived class, this method asynchronously passes a message from an ISource block...
virtual message< size_t > * consume(runtime_object_identity _MsgId, _Inout_ ITarget< size_t > *_PTarget)
Consumes a message previously offered by this choice messaging block and successfully reserved by the...
Definition: agents.h:11719
virtual void propagate_to_any_targets(_Inout_opt_ message< _Output > *)
Executes the transformer function on the input messages.
Definition: agents.h:7954
volatile long _M_referenceCount
Definition: agents.h:3969
propagator_block()
Constructs a propagator_block object.
Definition: agents.h:5606
unbounded_buffer()
Constructs an unbounded_buffer messaging block.
Definition: agents.h:5942
static _CONCRTIMP void __cdecl _Yield()
This class describes an exception thrown when an invalid operation is performed that is not more accu...
Definition: pplinterface.h:132
void _Initialize(const _Type &_Value, _Inout_ ITarget< _Type > *_PTarget, bool _Repeating, _Inout_opt_ Scheduler *_PScheduler=NULL, _Inout_opt_ ScheduleGroup *_PScheduleGroup=NULL)
Common initialization.
Definition: agents.h:8512
virtual message< size_t > * accept(runtime_object_identity _MsgId, _Inout_ ITarget< size_t > *_PTarget)
Accepts a message that was offered by this choice block, transferring ownership to the caller...
Definition: agents.h:11672
reference operator[](size_t _Pos)
Definition: agents.h:336
Definition: agents.h:255
virtual message< _Type > * consume_message(runtime_object_identity _MsgId)
Consumes a message previously offered by the single_assignment and reserved by the target...
Definition: agents.h:8994
void _Initialize_order_node(ISource< _Type > *_PSource, size_t _Index, ITarget< size_t > *_PTarget, Scheduler *_PScheduler=NULL, ScheduleGroup *_PScheduleGroup=NULL)
Validate constructor arguments and fully connect this _Order_node_base.
Definition: agents.h:10141
void _Propagate_message()
Definition: agents.h:5520
ScheduleGroup * _M_pScheduleGroup
Definition: agents.h:11848
_Non_greedy_node(Scheduler &_PScheduler, ISource< _Type > *_PSource, size_t _Index, ITarget< size_t > *_PTarget, filter_method const &_Filter)
Constructs a _Non_greedy_node within the specified scheduler, and places it on any schedule group of ...
Definition: agents.h:11083
ISource< _Type > * _M_pReservedSource
Definition: agents.h:11378
_Message ** _M_ppTail
Definition: agents.h:113
constexpr tuple< typename _Unrefwrap< _Types >::type...> make_tuple(_Types &&..._Args)
Definition: tuple:890
_Reserving_node(Scheduler &_PScheduler, ISource< _Type > *_PSource, size_t _Index, ITarget< size_t > *_PTarget=NULL)
Constructs a _Reserving_node within the specified scheduler, and places it on any schedule group of t...
Definition: agents.h:10273
choice(choice &&_Choice)
Constructs a choice messaging block.
Definition: agents.h:11534
~_MessageArray()
Definition: agents.h:9920
_MessageArray _M_messageArray
Definition: agents.h:9927
virtual void unlink_target(_Inout_ ITarget< _Type > *_PTarget)=0
When overridden in a derived class, unlinks a target block from this ISource block, if found to be previously linked.
basic_ostream< _Elem, _Traits > & _Out(basic_ostream< _Elem, _Traits > &_Os, _Ty _Dx)
Definition: random:174
virtual bool reserve_message(runtime_object_identity _MsgId)
Reserves a message previously offered by this join messaging block.
Definition: agents.h:9502
virtual void _Reset()=0
Resets the _Order_node_base and prepares it for the next propagation
_Type _M_sourceTuple
Definition: agents.h:12543
Scheduler * _M_pScheduler
Definition: agents.h:11845
virtual void release_message(runtime_object_identity _MsgId)
Releases a previous message reservation.
Definition: agents.h:6947
multi_link_registry< ISource< _Type > > _SourceLinkRegistry
Definition: agents.h:5922
_Out_writes_opt_(_MaxCount)
void _Process_message(message< _Target_type > *_PMessage)
Definition: agents.h:5511
agent const & operator=(agent const &)
void _Consume_received_message()
Called for a non_greedy type join block in order to consume the message in this join block that has b...
Definition: agents.h:11231
_CONCRTIMP agent_status status()
A synchronous source of status information from the agent.
virtual message_status send_message(_Inout_ message< _Type > *_PMessage, _Inout_ ISource< _Type > *_PSource)
Synchronously passes a message from an ISource block to this single_assignment messaging block...
Definition: agents.h:8871
bool _Is_head(runtime_object_identity _MsgId)
Definition: agents.h:233
bool _Non_greedy_acquire_messages()
Tries to acquire all of the messages from the _Non_greedy_nodes. Each node has already indicated that...
Definition: agents.h:12344
virtual message< _Target_type > * consume_message(runtime_object_identity _MsgId)=0
When overridden in a derived class, consumes a message that was previously reserved.
void _Wait_for_completion()
Definition: agents.h:3927
virtual bool reserve_message(runtime_object_identity _MsgId)
Reserves a message previously offered by this overwrite_buffer messaging block.
Definition: agents.h:6879
event _M_ev
Definition: agents.h:3958
::Concurrency::runtime_object_identity _M_id
Definition: agents.h:101
static bool _send(ITarget< _Type > *_Trg, const _Type &_Data)
Definition: agents.h:4273
~_Non_greedy_node()
Cleans up any resources that may have been created by the _Order_node.
Definition: agents.h:11150
State _M_state
Definition: agents.h:8467
source_link_manager< _SourceLinkRegistry > _SourceLinkManager
The type of the source_link_manager this target_block object.
Definition: agents.h:4459
An event type that represents the linking of message blocks
Definition: concrt.h:5679
Definition: concrt.h:376
Non-greedy join messaging blocks postpone messages and try to consume them after all have arrived...
Definition: agents.h:9132
_Propagator_method _M_propagator
A message propagating object which exposes the callback to be invoked
Definition: agents.h:2434
virtual void propagate_to_any_targets(_Inout_opt_ message< size_t > *)
Takes the message and propagates it to all the targets of this _Order_node
Definition: agents.h:11355
join(size_t _NumInputs)
Constructs a join messaging block.
Definition: agents.h:9184
An event type that represents the unlinking of message blocks
Definition: concrt.h:5685
virtual void release(runtime_object_identity _MsgId, _Inout_ ITarget< _Destination_type > *_PTarget)
Releases a previous successful message reservation.
Definition: agents.h:12844
virtual void link_source(_Inout_ ISource< _Source_type > *_PSource)
Links a specified source block to this propagator_block object.
Definition: agents.h:5764
static _CONCRTIMP void __cdecl _ScheduleTask(TaskProc _Proc, void *_Data)
virtual bool reserve(runtime_object_identity _MsgId, _Inout_ ITarget< _Type > *_PTarget)=0
When overridden in a derived class, reserves a message previously offered by this ISource block...
virtual void sync_send(_Inout_opt_ message< _Target_type > *_Msg)
Synchronously queues up messages and starts a propagation task, if this has not been done already...
Definition: agents.h:5421
_SourceLinkRegistry::type::source_type _Source_type
The type of the payload for the incoming message to this propagator_block.
Definition: agents.h:5585
The basic message envelope containing the data payload being passed between messaging blocks...
Definition: agents.h:1776
Implements busy wait with no backoff
Definition: concrt.h:578
#define _CONCRT_ASSERT(x)
Definition: concrt.h:123
virtual message< _Type > * accept_message(runtime_object_identity _MsgId)
Accepts a message that was offered by this overwrite_buffer messaging block, returning a copy of the ...
Definition: agents.h:6839
multitype_join< std::tuple< _Type1, _Type2, _Types...>, greedy > make_greedy_join(_Type1 _Item1, _Type2 _Item2, _Types..._Items)
Constructs a greedy multitype_join messaging block from an optional Scheduler or ScheduleGroup and tw...
Definition: agents.h:13223
virtual void unlink_target(_Inout_ ITarget< size_t > *_PTarget)
Unlinks a target block from this choice messaging block.
Definition: agents.h:11641
void async_send(_Inout_opt_ message< _Source_type > *_PMessage)
Asynchronously sends a message for processing.
Definition: agents.h:4775
runtime_object_identity _M_savedId
Definition: agents.h:10560
size_t _M_count
Definition: agents.h:116
_CONCRTIMP unsigned int _Release()
virtual ~target_block()
Destroys the target_block object.
Definition: agents.h:4484
void _Delete_joins()
Deletes all _Order_node elements that were created in _Initialize_joins.
Definition: agents.h:12957
virtual bool supports_anonymous_source()
Overrides the supports_anonymous_source method to indicate that this block can accept messages offere...
Definition: agents.h:7829
virtual message< _Type > * accept_message(runtime_object_identity _MsgId)
Accepts a message that was offered by this single_assignment messaging block, returning a copy of the...
Definition: agents.h:8932
multi_link_registry< ISource< _Type > > _SourceLinkRegistry
Definition: agents.h:7121
~_Reserving_node()
Cleans up any resources that may have been created by the _Reserving_node.
Definition: agents.h:10368
Definition: agents.h:3687
void _Clear_queued_messages()
Definition: agents.h:2234
virtual void release_ref(_Inout_ ITarget< _Target_type > *_PTarget)
Releases a reference count on this source_block object.
Definition: agents.h:5208
virtual void process_incoming_message()=0
When overridden in a derived class, performs the forward processing of messages into the block...
bool asend(_Inout_ ITarget< _Type > *_Trg, const _Type &_Data)
An asynchronous send operation, which schedules a task to propagate the data to the target block...
Definition: agents.h:4394
The ITarget class is the interface for all target blocks. Target blocks consume messages offered to t...
Definition: agents.h:453
virtual message< _Output > * accept_message(runtime_object_identity _MsgId)
Accepts a message that was offered by this transformer messaging block, transferring ownership to the...
Definition: agents.h:7845
void _Handle_message(message< _Target_type > *_PMessage)
Private methods.
Definition: agents.h:5501
::Concurrency::details::_Queue< message< _Destination_type > > _M_messageBuffer
Definition: agents.h:12551
The agent has been started, but not entered its run method.
Definition: agents.h:13249
virtual bool supports_anonymous_source()
Overrides the supports_anonymous_source method to indicate that this block can accept messages offere...
Definition: agents.h:6818
This class describes an exception thrown when a messaging block is given a pointer to a target which ...
Definition: concrt.h:1520
void(__cdecl * TaskProc)(void *)
Concurrency::details contains definitions of support routines in the public namespaces and one or mor...
Definition: concrt.h:251
_FS_DLL int __CLRCALL_PURE_OR_CDECL _Link(const wchar_t *, const wchar_t *)
virtual message_status propagate_message(_Inout_ message< _Type > *_PMessage, _Inout_ ISource< _Type > *_PSource)
Asynchronously passes a message from an ISource block to this single_assignment messaging block...
Definition: agents.h:8807
void decline_incoming_messages()
Indicates to the block that new messages should be declined.
Definition: agents.h:4720
virtual message< _Type > * consume_message(runtime_object_identity _MsgId)
Consumes a message previously offered by the timer and reserved by the target, transferring ownership...
Definition: agents.h:8394
_Type type
A type alias for _Type .
Definition: agents.h:1937
bool _internal_send(ITarget< _Type > *_PTarget, _Type const &_Value)
Definition: agents.h:3868
concurrent_queue< message< _Type > * > _M_queuedMessages
A queue of the messages
Definition: agents.h:2378
virtual void unlink_target(ITarget< _Type > *)
Definition: agents.h:3548
An event type that represents the creation of an object
Definition: concrt.h:5649
virtual void resume_propagation()=0
When overridden in a derived class, resumes propagation after a reservation has been released...
_Type source_type
A type alias for _Type .
Definition: agents.h:2737
__int64 _Trace_agents_get_id(_Type *_PObject)
Definition: agents.h:435
_Type type
A type alias for _Type .
Definition: agents.h:1902
virtual bool supports_anonymous_source()
When overridden in a derived class, returns true or false depending on whether the message block acce...
Definition: agents.h:2530
~join()
Destroys the join block.
Definition: agents.h:9338
virtual void link_target(_Inout_ ITarget< _Destination_type > *_PTarget)
Links a target block to this multitype_join messaging block.
Definition: agents.h:12743
void _Delete_stored_messages()
Deletes all messages currently stored in this message block. Should be called by the destructor to en...
Definition: agents.h:8547
_Join_node const & operator=(_Join_node const &)
std::function< void(void)> _Propagator_method
The signature of the callback method invoked while propagating messages.
Definition: agents.h:2022
_Greedy_node const & operator=(_Greedy_node const &)
static _CONCRTIMP agent_status __cdecl wait(_Inout_ agent *_PAgent, unsigned int _Timeout=COOPERATIVE_TIMEOUT_INFINITE)
Waits for an agent to complete its task.
bool _M_fForceRepropagation
A bool to signal to the processor to force a repropagation to occur
Definition: agents.h:6497
Scheduler * _M_pScheduler
Definition: agents.h:13513
_TargetLinkRegistry _M_connectedTargets
Connected targets
Definition: agents.h:5485
~_Order_node_base()
Cleans up any resources that may have been created by the _Order_node.
Definition: agents.h:9999
void _Initialize_choices()
Constructs and initializes a _Reserving_node for each tuple messaging block passed in...
Definition: agents.h:11781
runtime_object_identity _M_savedId
Definition: agents.h:11384
The source_block class is an abstract base class for source-only blocks. The class provides basic lin...
Definition: agents.h:4879
volatile long _M_stopProcessing
A flag set in the destructor of a block to cease processing of new messages. This is required to guar...
Definition: agents.h:2410
virtual message< _Target_type > * accept(runtime_object_identity _MsgId, _Inout_ ITarget< _Target_type > *_PTarget)
Accepts a message that was offered by this source_block object, transferring ownership to the caller...
Definition: agents.h:5016
_TargetLinkRegistry::type::type _Target_type
The payload type of messages handled by this source_block.
Definition: agents.h:4887
_Dynamic_array()
Definition: agents.h:267
void _Done(message_status _Status)
Definition: agents.h:3945
virtual bool reserve(runtime_object_identity _MsgId, ITarget< _Type > *_PTarget)
Definition: agents.h:3791
single_link_registry< ITarget< _Type > > _Target_registry
Definition: agents.h:3534
single_assignment(filter_method const &_Filter)
Constructs a single_assignment messaging block.
Definition: agents.h:8635
void _Push_back(_Type const &_Element)
Definition: agents.h:319
STL namespace.
message(_Type const &_P, runtime_object_identity _Id)
Constructs a message object.
Definition: agents.h:1810
_Type * _M_array
Definition: agents.h:419
virtual void release_message(runtime_object_identity _MsgId)=0
When overridden in a derived class, releases a previous message reservation.
The target did not accept the message.
Definition: agents.h:1751
message< _Type > * _M_pMessage
Definition: agents.h:7076
message(_Type const &_P)
Constructs a message object.
Definition: agents.h:1793
_Non_greedy_node(Scheduler &_PScheduler, ISource< _Type > *_PSource, size_t _Index, ITarget< size_t > *_PTarget=NULL)
Constructs a _Non_greedy_node within the specified scheduler, and places it on any schedule group of ...
Definition: agents.h:11055
Definition: agents.h:106
choice(_Type _Tuple)
Constructs a choice messaging block.
Definition: agents.h:11446
overwrite_buffer< agent_status > _M_status
Holds the current status of the agent.
Definition: agents.h:13492
virtual void link_target(ITarget< _Type > *)
Definition: agents.h:3673
virtual message< _Destination_type > * consume(runtime_object_identity _MsgId, _Inout_ ITarget< _Destination_type > *_PTarget)
Consumes a message previously offered by the multitype_join messaging block and successfully reserved...
Definition: agents.h:12829
virtual void unlink_source(_Inout_ ISource< _Source_type > *_PSource)
Unlinks a specified source block from this propagator_block object.
Definition: agents.h:5779
The target postponed the message.
Definition: agents.h:1756
The Concurrency namespace provides classes and functions that provide access to the Concurrency Runti...
Definition: agents.h:43
virtual message< _OutputType > * accept_message(runtime_object_identity _MsgId)
Accepts a message that was offered by this join messaging block, transferring ownership to the caller...
Definition: agents.h:9472
virtual void release(runtime_object_identity _MsgId, _Inout_ ITarget< size_t > *_PTarget)
Releases a previous successful message reservation.
Definition: agents.h:11734
void _Reset()
Resets the _Greedy_node and prepares it for the next propagation
Definition: agents.h:10769
bool _Reserve_received_message()
Called for a non_greedy type join block in order to reserve the message in this join block ...
Definition: agents.h:11189
Helper class used in multi-type greedy join blocks. Ordered node is a single-target, single-source ordered propagator block.
Definition: agents.h:10584
virtual void link_target(_Inout_ ITarget< _Type > *_PTarget)=0
When overridden in a derived class, links a target block to this ISource block.
_FunctorType _Call_method
The function type that this block executes upon receiving a message.
Definition: agents.h:7119
virtual ~source_block()
Destroys the source_block object.
Definition: agents.h:4913
_CONCRTIMP void _Start()
_CRT_BEGIN_C_HEADER _Check_return_ _Ret_maybenull_ _In_ size_t _Size
Definition: corecrt_malloc.h:58
::Concurrency::details::_ReentrantPPLLock _M_resetLock
Definition: agents.h:10953
virtual bool supports_anonymous_source()
Overrides the supports_anonymous_source method to indicate that this block can accept messages offere...
Definition: agents.h:7413
_SourceLinkManager::iterator source_iterator
The type of the iterator for the source_link_manager for this target_block object.
Definition: agents.h:4465
volatile long _M_counter
Definition: agents.h:12548
Helper class used in multi-type choice blocks Ordered node is a single-target, single-source ordered ...
Definition: agents.h:10196
join_type
The type of a join messaging block.
Definition: agents.h:9120
virtual void release(runtime_object_identity _MsgId, _Inout_ ITarget< _Type > *_PTarget)=0
When overridden in a derived class, releases a previous successful message reservation.
Scheduler * _M_pScheduler
Definition: agents.h:12983
virtual void release_ref(_Inout_ ITarget< size_t > *_PTarget)
Releases a reference count on this choice messaging block.
Definition: agents.h:11766
_Type _M_sourceTuple
Definition: agents.h:11842
void propagate_to_any_targets(_Inout_opt_ message< _OutputType > *)
Constructs an output message containing an input message from each source when they have all propagat...
Definition: agents.h:9582
Definition: agents.h:11981
virtual void link_source(_Inout_ ISource< _Source_type > *_PSource)
Links a specified source block to this target_block object.
Definition: agents.h:4642
The timer has been initialized, but not yet started.
Definition: agents.h:8129
virtual void acquire_ref(_Inout_ ITarget< _Destination_type > *_PTarget)
Acquires a reference count on this multitype_join messaging block, to prevent deletion.
Definition: agents.h:12860
An ordered_message_processor is a message_processor that allows message blocks to process messages in...
Definition: agents.h:2009
_Reserving_node(ScheduleGroup &_PScheduleGroup, ISource< _Type > *_PSource, size_t _Index, ITarget< size_t > *_PTarget, filter_method const &_Filter)
Constructs a _Order_node within the specified schedule group. The scheduler is implied by the schedul...
Definition: agents.h:10355
_Payload_type const & value()
Gets the message whose index was selected by the choice messaging block.
Definition: agents.h:11613
multitype_join< std::tuple< _Type1, _Type2, _Types...> > make_join(_Type1 _Item1, _Type2 _Item2, _Types..._Items)
Constructs a non_greedy multitype_join messaging block from an optional Scheduler or ScheduleGroup an...
Definition: agents.h:13107
void enable_batched_processing()
Enables batched processing for this block.
Definition: agents.h:4757
_Type _M_sourceTuple
Definition: agents.h:12980
virtual void unlink_source(_Inout_ ISource< _Type > *_PSource)=0
When overridden in a derived class, unlinks a specified source block from this ITarget block...
__int32 runtime_object_identity
Each message instance has an identity that follows it as it is cloned and passed between messaging co...
Definition: agents.h:51
Helper class used in multi-type non-greedy join blocks. Ordered node is a single-target, single-source ordered propagator block.
Definition: agents.h:10978
_In_ size_t _In_ int _Index
Definition: time.h:102
void _Invoke_link_source(ITarget< _Type > *_PLinkFrom)
Links this source to a target.
Definition: agents.h:2754
multi_link_registry< ISource< _Type > > _SourceLinkRegistry
Definition: agents.h:10200
void pause()
Stops the timer messaging block. If it is a repeating timer messaging block, it can be restarted with...
Definition: agents.h:8305
virtual void process_input_messages(_Inout_ message< _Source_type > *)
Processes messages that are received as inputs.
Definition: agents.h:4826
size_t _Size() const
Definition: agents.h:354
virtual void release(runtime_object_identity _MsgId, _Inout_ ITarget< _Target_type > *_PTarget)
Releases a previous successful message reservation.
Definition: agents.h:5157
virtual message< _Type > * accept(runtime_object_identity _MsgId, ITarget< _Type > *_PTarget)
Definition: agents.h:3561
~_Join_node()
Cleans up any resources that may have been created by the join.
Definition: agents.h:12059
_Greedy_node(Scheduler &_PScheduler, ISource< _Type > *_PSource, size_t _Index, ITarget< size_t > *_PTarget, filter_method const &_Filter)
Constructs a _Greedy_node within the specified scheduler, and places it on any schedule group of the ...
Definition: agents.h:10686
An event type that represents the name for an object
Definition: concrt.h:5691
message< _Type > * _M_pReceiveMessage
Definition: agents.h:10169
virtual void release_ref(_Inout_ ITarget< _Type > *)
When overridden in a derived class, releases a reference count on this ISource block.
Definition: agents.h:3609
volatile bool _M_fIsInitialized
Definition: agents.h:11390
A timer messaging block is a single-target source_block capable of sending a message to its target af...
Definition: agents.h:8111
virtual message< _Type > * accept(runtime_object_identity _MsgId, _Inout_ ITarget< _Type > *_PTarget)=0
When overridden in a derived class, accepts a message that was offered by this ISource block...
virtual message_status send_message(_Inout_ message< _Type > *_PMessage, _Inout_ ISource< _Type > *_PSource)
Synchronously passes a message from an ISource block to this unbounded_buffer messaging block...
Definition: agents.h:6177
_CONCRTIMP void __cdecl _Trace_agents(Agents_EventType _Type, __int64 _AgentId,...)
size_t _M_size
Definition: agents.h:425
The agent finished without being canceled.
Definition: agents.h:13259
_Reserving_node(ISource< _Type > *_PSource, size_t _Index, ITarget< size_t > *_PTarget, filter_method const &_Filter)
Constructs a _Reserving_node within the default scheduler, and places it on any schedule group of the...
Definition: agents.h:10247
void _Wait_on_ref()
Definition: agents.h:3935
size_t index()
Returns an index into the tuple representing the element selected by the choice messaging block...
Definition: agents.h:11592
virtual void link_target_notification(_Inout_ ITarget< _Output > *)
A callback that notifies that a new target has been linked to this transformer messaging block...
Definition: agents.h:7938
_Type & reference
Definition: agents.h:261
size_t _Count() const
Definition: agents.h:132
message< std::vector< _Type > > *__cdecl _Create_new_message()
Constructs a new message from the data output.
Definition: agents.h:9745
bool send(ITarget< _Type > &_Trg, const _Type &_Data)
A synchronous send operation, which waits until the target either accepts or declines the message...
Definition: agents.h:4366
virtual message_status send_message(_Inout_ message< _Input > *_PMessage, _Inout_ ISource< _Input > *_PSource)
Synchronously passes a message from an ISource block to this transformer messaging block...
Definition: agents.h:7803
bool has_value() const
Checks whether this single_assignment messaging block has been initialized with a value yet...
Definition: agents.h:8762
virtual message< _OutputType > * consume_message(runtime_object_identity _MsgId)
Consumes a message previously offered by the join messaging block and reserved by the target...
Definition: agents.h:9522
choice< std::tuple< _Type1, _Type2, _Types...> > make_choice(_Type1 _Item1, _Type2 _Item2, _Types..._Items)
Constructs a choice messaging block from an optional Scheduler or ScheduleGroup and two or more input...
Definition: agents.h:11969
multi_link_registry< ISource< _Type > > _SourceLinkRegistry
Definition: agents.h:10982
The timer has started and been paused.
Definition: agents.h:8139
A multitype_join messaging block is a multi-source, single-target messaging block that combines toget...
Definition: agents.h:12583
void _Delete_stored_messages()
Deletes all messages currently stored in this message block. Should be called by the destructor to en...
Definition: agents.h:9079
runtime_object_identity * _M_savedIds
Definition: agents.h:9938
virtual bool reserve_message(runtime_object_identity _MsgId)
Reserves a message previously offered by this single_assignment messaging block.
Definition: agents.h:8965
void _Populate_destination_tuple(_Destination_type &_Destination_tuple, ISource< size_t > **_Sources)
Copies payloads from all sources to destination tuple.
Definition: agents.h:12522
choice const & operator=(choice const &)
An unbounded_buffer messaging block is a multi-target, multi-source, ordered propagator_block capable...
Definition: agents.h:5918
void _Clear()
Definition: agents.h:307
bool _SpinOnce()
Spins for one time quantum, until a maximum spin is reached.
Definition: concrt.h:626
The message_processor class is the abstract base class for processing of message objects. There is no guarantee on the ordering of the messages.
Definition: agents.h:1930
bool _Release_received_message()
Called for a non_greedy type join block to release a reservation on this block.
Definition: agents.h:11253
overwrite_buffer const & operator=(overwrite_buffer const &)
unbounded_buffer(filter_method const &_Filter)
Constructs an unbounded_buffer messaging block.
Definition: agents.h:5965
multitype_join(_Type _Tuple)
Constructs a multitype_join messaging block.
Definition: agents.h:12609
_Queue()
Definition: agents.h:122
size_t _M_count
Definition: agents.h:9937
timer const & operator=(timer const &)
#define _InterlockedDecrementSizeT(_Target)
Definition: concrt.h:97
virtual ~ordered_message_processor()
Destroys the ordered_message_processor object.
Definition: agents.h:2057
std::tuple< typename std::remove_pointer_t< _Types >::source_type...> type
Definition: agents.h:11993
overwrite_buffer(filter_method const &_Filter)
Constructs an overwrite_buffer messaging block.
Definition: agents.h:6572
bool _internal_asend(ITarget< _Type > *_PTarget, _Type const &_Value)
Definition: agents.h:3643
virtual void resume_propagation()
Resumes propagation after a reservation has been released
Definition: agents.h:12201
virtual void link_target_notification(_Inout_ ITarget< std::vector< _Type >> *)
A callback that notifies that a new target has been linked to this join messaging block...
Definition: agents.h:9564
message< _Type > * _M_pMessage
Definition: agents.h:3679
virtual message< _Type > * accept(runtime_object_identity _MsgId, ITarget< _Type > *_PTarget)
Definition: agents.h:3757
virtual message< _Type > * accept(runtime_object_identity _MsgId, ITarget< _Type > *_PTarget)
Definition: agents.h:4053
The timer has been started.
Definition: agents.h:8134
_Target_registry _M_connectedTargets
Definition: agents.h:4262
message< _Type > const & operator=(message< _Type > const &)
virtual void propagate_to_any_targets(_Inout_ message< _Type > *_PMessage)
Places the message _PMessage in this overwrite_buffer messaging block and offers it to all of the li...
Definition: agents.h:7002
static void __cdecl _Agent_task_wrapper(void *data)
volatile bool _M_fDeclineMessages
A bool that is set to indicate that all messages should be declined in preparation for deleting the b...
Definition: agents.h:5897
virtual bool reserve_message(runtime_object_identity _MsgId)
Reserves a message previously offered by the source.
Definition: agents.h:12155
int i[4]
Definition: dvec.h:68
_Join_node(Scheduler &_PScheduler)
Constructs a join within the specified scheduler, and places it on any schedule group of the schedule...
Definition: agents.h:12037
::Concurrency::details::_NonReentrantPPLLock _M_asyncSendLock
A lock to use for queueing incoming messages.
Definition: agents.h:2384
single_link_registry< ITarget< size_t > > _TargetLinkRegistry
Definition: agents.h:10199
ScheduleGroup * _M_pScheduleGroup
Definition: agents.h:13514
virtual void link_source(_Inout_ ISource< _Type > *_PSource)=0
When overridden in a derived class, links a specified source block to this ITarget block...
virtual bool reserve(runtime_object_identity, ITarget< _Type > *)
Definition: agents.h:3585
virtual message_status send_message(_Inout_ message< _Type > *_PMessage, _Inout_ ISource< _Type > *_PSource)
Synchronously passes a message from an ISource block to this call messaging block. It is invoked by the send method, when called by a source block.
Definition: agents.h:7381
_Type dequeue()
Removes an item from the unbounded_buffer messaging block.
Definition: agents.h:6112
virtual void resume_propagation()
Resumes propagation after a reservation has been released.
Definition: agents.h:6968
virtual void process_input_messages(_Inout_ message< _Target_type > *)
Process input messages. This is only useful for propagator blocks, which derive from source_block ...
Definition: agents.h:5809
virtual void sync_send(_Inout_opt_ message< _Type > *_Msg)=0
When overridden in a derived class, places messages into the block synchronously. ...
virtual message_status propagate(_Inout_opt_ message< _Source_type > *_PMessage, _Inout_opt_ ISource< _Source_type > *_PSource)
Asynchronously passes a message from a source block to this target block.
Definition: agents.h:5641
virtual void propagate_to_any_targets(_Inout_opt_ message< _Destination_type > *)
Takes the message and propagates it to all the targets of this join block.
Definition: agents.h:12233
volatile bool _M_fIsInitialized
Definition: agents.h:10563
_Type const & const_reference
Definition: agents.h:262
A class intended to be used as a base class for all independent agents. It is used to hide state from...
Definition: agents.h:13275
An event type that represents the conclusion of some processing
Definition: concrt.h:5661
_MessageArray(size_t _NumInputs)
Definition: agents.h:9913
#define _In_z_
Definition: sal.h:310
message_status
The valid responses for an offer of a message object to a block.
Definition: agents.h:1740
virtual void link_target_notification(_Inout_ ITarget< _Type > *_PTarget)
A callback that notifies that a new target has been linked to this timer messaging block...
Definition: agents.h:8434
bool send(_Inout_ ITarget< _Type > *_Trg, const _Type &_Data)
A synchronous send operation, which waits until the target either accepts or declines the message...
Definition: agents.h:4337
#define _In_
Definition: sal.h:305
runtime_object_identity msg_id() const
Returns the ID of the message object.
Definition: agents.h:1861
virtual void _Reset()
Resets the _Reserving_node and prepares it for the next propagation
Definition: agents.h:10388
_In_ wctype_t _Type
Definition: corecrt_wctype.h:111
Definition: utility:384
volatile long _M_refCount
Definition: agents.h:1913
void _Invoke_unlink_source(ITarget< _Type > *_PUnlinkFrom)
Unlinks this source from a target.
Definition: agents.h:2773
_Call_method _M_pFunc
Definition: agents.h:7450
virtual void link_target(_Inout_ ITarget< size_t > *_PTarget)
Links a target block to this choice messaging block.
Definition: agents.h:11629
#define _Inout_opt_
Definition: sal.h:376
virtual message< _Target_type > * accept_message(runtime_object_identity _MsgId)=0
When overridden in a derived class, accepts an offered message by the source. Message blocks should o...
_Message * _Dequeue()
Definition: agents.h:204
message< _Type > * _M_pMessage
Definition: agents.h:9096
This class describes an exception thrown when an operation has timed out.
Definition: concrt.h:1712
message(_In_ message const *_Msg)
Constructs a message object.
Definition: agents.h:1840
_SourceLinkRegistry::type::source_type _Source_type
The type of the payload for the incoming messages to this target_block object.
Definition: agents.h:4453
_Join_node(ScheduleGroup &_PScheduleGroup)
Constructs a join within the specified schedule group. The scheduler is implied by the schedule group...
Definition: agents.h:12050
virtual message_status propagate_message(message< _Type > *_PMessage, ISource< _Type > *_PSource)
Asynchronously passes a message from an ISource block to this ITarget block. It is invoked by the pro...
Definition: agents.h:10418
single_link_registry< ITarget< size_t > > _TargetLinkRegistry
Definition: agents.h:10587
::Concurrency::details::_Queue< message< std::vector< _Type > > > _M_messageBuffer
Definition: agents.h:9961
runtime_object_identity _M_reservedId
Definition: agents.h:11387
single_link_registry< ITarget< size_t > > _TargetLinkRegistry
Definition: agents.h:10981
virtual void propagate_to_any_targets(_Inout_opt_ message< _Type > *_PMessage)
Places the message _PMessage in this single_assignment messaging block and offers it to all of the l...
Definition: agents.h:9052
virtual void unlink_targets()
When overridden in a derived class, unlinks all target blocks from this ISource block.
Definition: agents.h:4018
Definition: agents.h:4267
overwrite_buffer()
Constructs an overwrite_buffer messaging block.
Definition: agents.h:6550
_CONCRTIMP bool start()
Moves an agent from the agent_created state to the agent_runnable state, and schedules it for executi...
An overwrite_buffer messaging block is a multi-target, multi-source, ordered propagator_block capable...
Definition: agents.h:6527
Definition: agents.h:9908
multi_link_registry< ISource< _Type > > _SourceLinkRegistry
Definition: agents.h:6531
The target_block class is an abstract base class that provides basic link management functionality an...
Definition: agents.h:4446
void start()
Starts the timer messaging block. The specified number of milliseconds after this is called...
Definition: agents.h:8279
virtual void link_target_notification(_Inout_ ITarget< _Target_type > *)
A callback that notifies that a new target has been linked to this source_block object.
Definition: agents.h:5241
virtual void acquire_ref(_Inout_ ITarget< _Type > *)
When overridden in a derived class, acquires a reference count on this ISource block, to prevent deletion.
Definition: agents.h:4158
virtual bool reserve_message(runtime_object_identity _MsgId)
Reserves a message previously offered by this transformer messaging block.
Definition: agents.h:7875
transformer(_Transform_method const &_Func, _Inout_opt_ ITarget< _Output > *_PTarget=NULL)
Constructs a transformer messaging block.
Definition: agents.h:7514
virtual void resume_propagation()
Resumes propagation after a reservation has been released.
Definition: agents.h:9022
virtual void acquire_ref(_Inout_ ITarget< _Type > *)
When overridden in a derived class, acquires a reference count on this ISource block, to prevent deletion.
Definition: agents.h:3604
virtual message< _Type > * consume(runtime_object_identity _MsgId, ITarget< _Type > *_PTarget)
Definition: agents.h:3813
virtual void unlink_target(_Inout_ ITarget< _Destination_type > *_PTarget)
Unlinks a target block from this multitype_join messaging block.
Definition: agents.h:12755
virtual void resume_propagation()
Resumes propagation after a reservation has been released.
Definition: agents.h:6293
_Unwrap< _Type >::type _Destination_type
Definition: agents.h:12587
transformer const & operator=(transformer const &)
virtual void link_target_notification(_Inout_ ITarget< _Destination_type > *)
Notification that a target was linked to this source.
Definition: agents.h:12217
~choice()
Destroys the choice messaging block.
Definition: agents.h:11557
_Type value()
Gets a reference to the current payload of the message being stored in the overwrite_buffer messaging...
Definition: agents.h:6717
bool _M_fRepeating
Definition: agents.h:8473
bool _M_fDeclineMessages
A bool that is set to indicate that all messages should be declined in preparation for deleting the b...
Definition: agents.h:4848
virtual bool reserve_message(runtime_object_identity _MsgId)=0
When overridden in a derived class, reserves a message previously offered by this source_block object...
volatile long _M_queuedDataCount
A count of the current number of messages to process. Used as a flag to see if a new process message ...
Definition: agents.h:2391
::Concurrency::details::_ReentrantPPLLock _M_internalLock
Definition: agents.h:3964
virtual void _Fire()
Called when the timer fires.
Definition: agents.h:8494
virtual message< _Type > * consume(runtime_object_identity _MsgId, ITarget< _Type > *_PTarget)
Definition: agents.h:4104
void wait_for_async_sends()
Waits for all asynchronous propagations to complete.
Definition: agents.h:4800
::Concurrency::details::_ReentrantPPLLock::_Scoped_lock _R_lock
A lock holder that acquires a reentrant lock on instantiation and releases it on destruction ...
Definition: agents.h:65
static _CONCRTIMP void __cdecl wait_for_all(size_t _Count, _In_reads_(_Count) agent **_PAgents, _Out_writes_opt_(_Count) agent_status *_PStatus=NULL, unsigned int _Timeout=COOPERATIVE_TIMEOUT_INFINITE)
Waits for all of the specified agents to complete their tasks.
_SavedMessageIdArray(size_t _NumInputs)
Definition: agents.h:9940
std::vector< _Type > _OutputType
Definition: agents.h:9166
_Type type
A type alias for _Type .
Definition: agents.h:2539
This class describes an exception thrown when a messaging block is unable to find a requested message...
Definition: concrt.h:1544
virtual void acquire_ref(_Inout_ ITarget< size_t > *_PTarget)
Acquires a reference count on this choice messaging block, to prevent deletion.
Definition: agents.h:11750
virtual void unlink_targets()
Unlinks all targets from this choice messaging block.
Definition: agents.h:11654
long remove_ref()
Subtracts from the reference count for the message object. Used for message blocks that need referenc...
Definition: agents.h:1893
Definition: concrt.h:5341
multi_link_registry< ISource< size_t > > _SourceLinkRegistry
Definition: agents.h:12012
~call()
Destroys the call messaging block.
Definition: agents.h:7312
virtual void unlink_target_notification(_Inout_ ITarget< _Target_type > *_PTarget)
A callback that notifies that a target has been unlinked from this source_block object.
Definition: agents.h:5257
_Type type
A type alias for _Type .
Definition: agents.h:12730
void remove_sources()
Unlinks all sources after waiting for outstanding asynchronous send operations to complete...
Definition: agents.h:4815
_Type const payload
The payload of the message object.
Definition: agents.h:1870
The target tried to accept the message, but it was no longer available.
Definition: agents.h:1761
void _Initialize_joins()
Constructs and initializes an _Order_node for each tuple messaging block passed in.
Definition: agents.h:12896
multi_link_registry< ISource< _Type > > _SourceLinkRegistry
Definition: agents.h:10588
::Concurrency::details::_ReentrantPPLLock _M_internalLock
Definition: agents.h:4259
message< _Type > ** _M_messages
Definition: agents.h:9911
The target accepted the message.
Definition: agents.h:1746
multi_link_registry< ISource< _Input > > _SourceLinkRegistry
Definition: agents.h:7486
_CRT_BEGIN_C_HEADER typedef void(__CRTDECL *terminate_handler)()
message< _Type > * _M_pMessage
Definition: agents.h:3955
_In_ size_t _Out_opt_ int _In_z_ unsigned char const * _Src
Definition: mbstring.h:1039
single_link_registry< ITarget< _Destination_type > > _TargetLinkRegistry
Definition: agents.h:12011
multi_link_registry< ITarget< _Type > > _TargetLinkRegistry
Definition: agents.h:5921
virtual _CONCRTIMP ~agent()
Destroys the agent.
void enable_batched_processing()
Enables batched processing for this block.
Definition: agents.h:5397
_SyncOriginator()
Definition: agents.h:3694
volatile long _M_fCancelable
Definition: agents.h:13507
_CONCRTIMP ISource< agent_status > * status_port()
An asynchronous source of status information from the agent.
An event type that represents the scheduling of a process
Definition: concrt.h:5673
_Type const & value()
Gets a reference to the current payload of the message being stored.
Definition: agents.h:10026
::Concurrency::details::_ReentrantPPLLock _M_internalLock
Internal lock used for the following synchronization:
Definition: agents.h:5547
::Concurrency::details::_NonReentrantPPLLock::_Scoped_lock _NR_lock
A lock holder that acquires a non-reentrant lock on instantiation and releases it on destruction...
Definition: agents.h:58
volatile long _M_refcount
Definition: agents.h:4256
ScheduleGroup * _M_pScheduleGroup
Definition: agents.h:12986
virtual void acquire_ref(_Inout_ ITarget< _Target_type > *)
Acquires a reference count on this source_block object, to prevent deletion.
Definition: agents.h:5192
A join messaging block is a single-target, multi-source, ordered propagator_block which combines toge...
Definition: agents.h:9154
void _Sync_send_helper(message< _Type > *_Msg)
Definition: agents.h:2243
virtual ~_SyncOriginator()
Definition: agents.h:3702
virtual void link_target(ITarget< _Type > *_PTarget)
Definition: agents.h:3911
message< _Type > * _M_pReservedMessage
Definition: agents.h:7079
void _Delete_stored_messages()
Deletes all messages currently stored in this message block. Should be called by the destructor to en...
Definition: agents.h:12503
The ISource class is the interface for all source blocks. Source blocks propagate messages to ITarget...
Definition: agents.h:452
_MessageProcessorType _M_messageProcessor
The message_processor for this target_block.
Definition: agents.h:4854
virtual void resume_propagation()
Resumes propagation after a reservation has been released.
Definition: agents.h:8421
_Type type
A type alias for _Type .
Definition: agents.h:2028
_SourceLinkManager _M_connectedSources
The container for all the sources connected to this block.
Definition: agents.h:5884
bool has_value() const
Checks whether this choice messaging block has been initialized with a value yet. ...
Definition: agents.h:11576
single_link_registry< ITarget< _Type > > _Target_registry
Definition: agents.h:3691
virtual message_status propagate_message(_Inout_ message< _Type > *_PMessage, _Inout_ ISource< _Type > *_PSource)
Asynchronously passes a message from an ISource block to this unbounded_buffer messaging block...
Definition: agents.h:6139
virtual message< _Type > * consume(runtime_object_identity, ITarget< _Type > *)
Definition: agents.h:3591
virtual message< size_t > * consume_message(runtime_object_identity)
Consumes a message previously offered by the source and reserved by the target, transferring ownershi...
Definition: agents.h:10079
virtual void propagate_to_any_targets(_Inout_opt_ message< size_t > *)
Takes the message and propagates it to all the targets of this _Order_node
Definition: agents.h:10502
void initialize_target(_Inout_opt_ Scheduler *_PScheduler=NULL, _Inout_opt_ ScheduleGroup *_PScheduleGroup=NULL)
Initializes the base object. Specifically, the message_processor object needs to be initialized...
Definition: agents.h:4738
#define false
Definition: stdbool.h:16
_Greedy_node(ScheduleGroup &_PScheduleGroup, ISource< _Type > *_PSource, size_t _Index, ITarget< size_t > *_PTarget=NULL)
Constructs a _Greedy_node within the specified schedule group. The scheduler is implied by the schedu...
Definition: agents.h:10711
_Non_greedy_node const & operator=(_Non_greedy_node const &)
virtual void unlink_sources()
Unlinks all source blocks from this propagator_block object.
Definition: agents.h:5792
bool enqueue(_Type const &_Item)
Adds an item to the unbounded_buffer messaging block.
Definition: agents.h:6100
virtual void unlink_targets()
Unlinks all target blocks from this source_block object.
Definition: agents.h:4978
void * _InterlockedCompareExchangePointer(void *volatile *, void *, void *)
const unsigned int COOPERATIVE_TIMEOUT_INFINITE
Value indicating that a wait should never time out.
Definition: concrt.h:3478
virtual bool reserve_message(runtime_object_identity _MsgId)
Reserves a message previously offered by this unbounded_buffer messaging block.
Definition: agents.h:6247
virtual message< _Destination_type > * accept_message(runtime_object_identity _MsgId)
Accepts an offered message by the source, transferring ownership to the caller.
Definition: agents.h:12125
ScheduleGroup * _M_pScheduleGroup
The schedule group to process messages on
Definition: agents.h:2403
virtual void link_target(_Inout_ ITarget< _Target_type > *_PTarget)
Links a target block to this source_block object.
Definition: agents.h:4932
bool _M_fReferencedScheduler
Definition: agents.h:8476
_Greedy_node(Scheduler &_PScheduler, ISource< _Type > *_PSource, size_t _Index, ITarget< size_t > *_PTarget=NULL)
Constructs a _Greedy_node within the specified scheduler, and places it on any schedule group of the ...
Definition: agents.h:10659
virtual void acquire_ref(_Inout_ ITarget< _Type > *)
When overridden in a derived class, acquires a reference count on this ISource block, to prevent deletion.
Definition: agents.h:3853
_CONCRTIMP _Timer(unsigned int _Ms, bool _FRepeating)
message_status _Propagate_to_target(ITarget< size_t > *_PTarget)
Propagate messages to the given target
Definition: agents.h:10522
long _Process_message_helper()
Definition: agents.h:2273
void initialize_source(_Inout_opt_ Scheduler *_PScheduler=NULL, _Inout_opt_ ScheduleGroup *_PScheduleGroup=NULL)
Initializes the message_propagator within this source_block.
Definition: agents.h:5383
_CONCRTIMP bool cancel()
Moves an agent from either the agent_created or agent_runnable states to the agent_canceled state...
bool has_value() const
Checks whether this block has been initialized yet.
Definition: agents.h:10014
_Type const & value()
Gets a reference to the current payload of the message being stored in the single_assignment messagin...
Definition: agents.h:8778
An event type that represents the initiation of some processing
Definition: concrt.h:5655
_In_reads_(_N) wchar_t const *_S2
An event type that represents the deletion of an object
Definition: concrt.h:5667
message< size_t > * _M_pSendMessage
Definition: agents.h:10172
void Trace_agents_register_name(_Inout_ _Type *_PObject, _In_z_ const wchar_t *_Name)
Associates the given name to the message block or agent in the ETW trace.
Definition: agents.h:13536
virtual void link_target_notification(_Inout_ ITarget< _Type > *_PTarget)
A callback that notifies that a new target has been linked to this unbounded_buffer messaging block...
Definition: agents.h:6316
~unbounded_buffer()
Destroys the unbounded_buffer messaging block.
Definition: agents.h:6081
virtual void async_send(_Inout_opt_ message< _Type > *_Msg)
Asynchronously queues up messages and starts a processing task, if this has not been done already...
Definition: agents.h:2127
_CONCRTIMP agent()
Constructs an agent.
virtual void release_ref(_Inout_ ITarget< _Type > *)
When overridden in a derived class, releases a reference count on this ISource block.
Definition: agents.h:3858
::Concurrency::details::_NonReentrantPPLLock _M_propagationLock
Definition: agents.h:10950
agent_status
The valid states for an agent.
Definition: agents.h:13239
multi_link_registry< ITarget< _Type > > _TargetLinkRegistry
Definition: agents.h:6530
bool _internal_send(ITarget< _Type > *_PTarget, _Type const &_Value)
Definition: agents.h:4181
virtual void release_message(runtime_object_identity _MsgId)
Releases a previous message reservation.
Definition: agents.h:7908
_Join_node< _Type, _Destination_type, _Jtype > * _M_pJoinNode
Definition: agents.h:12977
unbounded_buffer const & operator=(unbounded_buffer const &)
virtual void unlink_targets()
Unlinks all targets from this multitype_join messaging block.
Definition: agents.h:12764
_Non_greedy_node(ScheduleGroup &_PScheduleGroup, ISource< _Type > *_PSource, size_t _Index, ITarget< size_t > *_PTarget, filter_method const &_Filter)
Constructs a _Non_greedy_node within the specified schedule group. The scheduler is implied by the sc...
Definition: agents.h:11137
volatile size_t _M_messagesRemaining
Definition: agents.h:9904
_Type _M_value
Definition: agents.h:8470
~_Dynamic_array()
Definition: agents.h:275
_Non_greedy_node(ISource< _Type > *_PSource, size_t _Index, ITarget< size_t > *_PTarget, filter_method const &_Filter)
Constructs a _Non_greedy_node within the default scheduler, and places it on any schedule group of th...
Definition: agents.h:11029
virtual message< _Output > * consume_message(runtime_object_identity _MsgId)
Consumes a message previously offered by the transformer and reserved by the target, transferring ownership to the caller.
Definition: agents.h:7895
bool _Remove(_Message *_OldElement)
Definition: agents.h:156
void _Propagate_priority_order(::Concurrency::details::_Queue< message< _Target_type >> &_MessageBuffer)
Propagates messages in priority order.
Definition: agents.h:6409
virtual void link_target_notification(_Inout_ ITarget< size_t > *)
Notification that a target was linked to this source.
Definition: agents.h:10120
Definition: tuple:220
void _Acquire_ref()
Definition: agents.h:4236
typename std::remove_pointer_t< std::tuple_element_t< _Index, _Type >>::source_type _Source_type
Definition: agents.h:12883
_CONCRTIMP bool done()
Moves an agent into the agent_done state, indicating that the agent has completed.
void _Create_send_message()
Create a message that contains an index used to determine the source message
Definition: agents.h:10132
virtual void initialize_batched_processing(_Handler_method const &_Processor, _Propagator_method const &_Propagator)
Initialize batched message processing
Definition: agents.h:2095
virtual ::Concurrency::runtime_object_identity _GetId() const
Definition: agents.h:94
virtual message_status propagate_message(message< _Type > *_PMessage, ISource< _Type > *_PSource)
Asynchronously passes a message from an ISource block to this ITarget block. It is invoked by the pro...
Definition: agents.h:10848
_Target_registry _M_connectedTargets
Definition: agents.h:3967
message * _M_pNext
Definition: agents.h:1907
void register_filter(filter_method const &_Filter)
Registers a filter method that will be invoked on every message received.
Definition: agents.h:4705
constexpr auto data(_Container &_Cont) -> decltype(_Cont.data())
Definition: xutility:1512
A single_assignment messaging block is a multi-target, multi-source, ordered propagator_block capable...
Definition: agents.h:8589
virtual void unlink_target(ITarget< _Type > *_PTarget)
Definition: agents.h:3712
_CONCRTIMP size_t wait(unsigned int _Timeout=COOPERATIVE_TIMEOUT_INFINITE)
Waits for the event to become signaled.
memcpy(_Destination, _Source, _SourceSize)
bool _internal_send(ITarget< _Type > *_PTarget, _Type const &_Value)
Definition: agents.h:3618
_Handler_method _M_handler
A message handler object which exposes the callback to be invoked
Definition: agents.h:2422
_SourceLinkManager _M_connectedSources
The container for all the sources connected to this block.
Definition: agents.h:4835
virtual void release(runtime_object_identity _MsgId, ITarget< _Type > *_PTarget)
Definition: agents.h:4136
State
Tracks the state machine of the timer.
Definition: agents.h:8123
Base class for helper node used in multi-type join and choice blocks. Order node is a single-target...
Definition: agents.h:9980
_Non_greedy_node(ISource< _Type > *_PSource, size_t _Index, ITarget< size_t > *_PTarget=NULL)
Constructs a _Non_greedy_node within the default scheduler, and places it on any schedule group of th...
Definition: agents.h:11004
void wait_for_outstanding_async_sends()
Waits for all asynchronous propagations to complete. This propagator-specific spin wait is used in de...
Definition: agents.h:5446
size_t _M_index
Definition: agents.h:10175
void _Invoke_handler(long _Count)
Definition: agents.h:2320
_In_ _Value
Definition: corecrt_wstdlib.h:65
transformer(_Transform_method const &_Func, _Inout_opt_ ITarget< _Output > *_PTarget, filter_method const &_Filter)
Constructs a transformer messaging block.
Definition: agents.h:7550
_Type type
A type alias for _Type .
Definition: agents.h:11567
virtual message< _Destination_type > * accept(runtime_object_identity _MsgId, _Inout_ ITarget< _Destination_type > *_PTarget)
Accepts a message that was offered by this multitype_join block, transferring ownership to the caller...
Definition: agents.h:12782
void _Delete_stored_messages()
Deletes all messages currently stored in this message block. Should be called by the destructor to en...
Definition: agents.h:8046
virtual void unlink_targets()=0
When overridden in a derived class, unlinks all target blocks from this ISource block.
Scheduler * _M_pScheduler
Definition: agents.h:8479
virtual void unlink_targets()
When overridden in a derived class, unlinks all target blocks from this ISource block.
Definition: agents.h:3733
_SourceLinkManager::iterator source_iterator
The type of the iterator for the source_link_manager for this propagator_block.
Definition: agents.h:5597
runtime_object_identity * _M_savedIdBuffer
Definition: agents.h:9955
runtime_object_identity _M_reservedId
Reserved message ID
Definition: agents.h:5479
_Type receive(_Inout_ ISource< _Type > *_Src, unsigned int _Timeout=COOPERATIVE_TIMEOUT_INFINITE)
A general receive implementation, allowing a context to wait for data from exactly one source and fil...
Definition: agents.h:3063
virtual message< _Type > * consume_message(runtime_object_identity _MsgId)
Consumes a message previously offered by the unbounded_buffer messaging block and reserved by the tar...
Definition: agents.h:6267
_Pre_maybenull_ _Inout_ _Deref_prepost_z_ wchar_t const _PSource
Definition: wchar.h:148
ITarget< _Type > * _M_pTarget
Definition: agents.h:3682
std::function< void(message< _Type > *)> _Handler_method
The signature of the callback method invoked while processing messages.
Definition: agents.h:2016
size_t _M_count
Definition: agents.h:9910
message< _Destination_type > * _Create_send_message()
Called when all the source messaging blocks have received their messages. The payloads are copied int...
Definition: agents.h:12451
virtual void resume_propagation()
Resumes propagation after a reservation has been released.
Definition: agents.h:7921
virtual void release(runtime_object_identity, ITarget< _Type > *)
Definition: agents.h:3599
virtual message_status propagate_message(_Inout_ message< _Type > *_PMessage, _Inout_ ISource< _Type > *_PSource)
Asynchronously passes a message from an ISource block to this call messaging block. It is invoked by the propagate method, when called by a source block.
Definition: agents.h:7338
message< _Type > * _NewMessage() const
Allocates a new message.
Definition: agents.h:8485
virtual void release_message(runtime_object_identity _MsgId)
Releases a previous message reservation.
Definition: agents.h:9535
_CONCRTIMP void set()
Signals the event.
virtual void link_target(ITarget< _Type > *_PTarget)
Definition: agents.h:4218
void _Initialize(size_t _NumInputs, Scheduler *_PScheduler=NULL, ScheduleGroup *_PScheduleGroup=NULL)
Initializes the join messaging block.
Definition: agents.h:9858
void initialize(_Inout_opt_ Scheduler *_PScheduler, _Inout_opt_ ScheduleGroup *_PScheduleGroup, _Handler_method const &_Handler)
Initializes the ordered_message_processor object with the appropriate callback function, scheduler and schedule group.
Definition: agents.h:2078
virtual void release_ref(_Inout_ ITarget< _Type > *)
When overridden in a derived class, releases a reference count on this ISource block.
Definition: agents.h:4163
virtual void propagate_to_any_targets(_Inout_opt_ message< _Type > *)
Tries to offer the message produced by the timer block to all of the linked targets.
Definition: agents.h:8448
_Reserving_node(ScheduleGroup &_PScheduleGroup, ISource< _Type > *_PSource, size_t _Index, ITarget< size_t > *_PTarget=NULL)
Constructs a _Reserving_node within the specified schedule group. The scheduler is implied by the schedul...
Definition: agents.h:10327
volatile message_status _M_fStatus
Definition: agents.h:3961
virtual void release(runtime_object_identity _MsgId, ITarget< _Type > *_PTarget)
Definition: agents.h:3831
void initialize_source_and_target(_Inout_opt_ Scheduler *_PScheduler=NULL, _Inout_opt_ ScheduleGroup *_PScheduleGroup=NULL)
Initializes the base object. Specifically, the message_processor object needs to be initialized...
Definition: agents.h:5827
long __cdecl _InterlockedDecrement(long volatile *)
A choice messaging block is a multi-source, single-target block that represents a control-flow intera...
Definition: agents.h:11422
virtual message_status propagate_message(message< _Type > *_PMessage, ISource< _Type > *)
Asynchronously passes a message from an ISource block to this ITarget block. It is invoked by the pro...
Definition: agents.h:11302
virtual message< _Type > * accept_message(runtime_object_identity _MsgId)
Accepts a message that was offered by this timer messaging block, transferring ownership to the calle...
Definition: agents.h:8339
void _Wait_on_ref(long _RefCount=0)
Definition: agents.h:5529
virtual message_status propagate(_Inout_opt_ message< _Type > *_PMessage, _Inout_opt_ ISource< _Type > *_PSource)=0
When overridden in a derived class, asynchronously passes a message from a source block to this targe...
void _Delete_stored_messages()
Deletes all messages currently stored in this message block. Should be called by the destructor to en...
Definition: agents.h:9884
filter_method * _M_pFilter
The filter function which determines whether offered messages should be accepted. ...
Definition: agents.h:5890
~overwrite_buffer()
Destroys the overwrite_buffer messaging block.
Definition: agents.h:6685
virtual void process_input_messages(_Inout_ message< _Type > *_PMessage)
Executes the call function on the input messages.
Definition: agents.h:7435
~_SavedMessageIdArray()
Definition: agents.h:9947
static bool _asend(ITarget< _Type > *_Trg, const _Type &_Data)
Definition: agents.h:4294
virtual bool reserve(runtime_object_identity _MsgId, ITarget< _Type > *_PTarget)
Definition: agents.h:4082
#define _CONCRTIMP
Definition: crtdefs.h:48
virtual void propagate_to_any_targets(_Inout_opt_ message< size_t > *)
Takes the message and propagates it to all the targets of this _Greedy_node
Definition: agents.h:10922
virtual void release_message(runtime_object_identity)
Releases a previous message reservation.
Definition: agents.h:10094
single_assignment const & operator=(single_assignment const &)
void * _M_pSourceJoins[std::tuple_size< _Type >::value]
Definition: agents.h:12974
std::function< bool(_Type const &)> filter_method
The signature of any method used by the block that returns a bool value to determine whether an offer...
Definition: agents.h:2546
virtual void unlink_targets()
When overridden in a derived class, unlinks all target blocks from this ISource block.
Definition: agents.h:3554
_AsyncOriginator()
Definition: agents.h:4174
virtual message_status send(_Inout_ message< _Type > *_PMessage, _Inout_ ISource< _Type > *_PSource)=0
When overridden in a derived class, synchronously passes a message to the target block.
virtual message< size_t > * accept_message(runtime_object_identity _MsgId)
Accept the message by making a copy of the payload.
Definition: agents.h:10892
void _Release_ref()
Definition: agents.h:4243
_Diff _Count
Definition: algorithm:1941
_Greedy_node(ISource< _Type > *_PSource, size_t _Index, ITarget< size_t > *_PTarget, filter_method const &_Filter)
Constructs a _Greedy_node within the default scheduler, and places it on any schedule group of the sc...
Definition: agents.h:10634
Definition: concrt.h:279
_Myt & operator=(const _Myt &_Right)
Definition: agents.h:283
single_link_registry< ITarget< std::vector< _Type > > > _TargetLinkRegistry
Definition: agents.h:9157
call(_Call_method const &_Func)
Constructs a call messaging block.
Definition: agents.h:7144
#define _Inout_
Definition: sal.h:375
virtual bool reserve(runtime_object_identity _MsgId, _Inout_ ITarget< size_t > *_PTarget)
Reserves a message previously offered by this choice messaging block.
Definition: agents.h:11696
virtual bool reserve_message(runtime_object_identity)
Reserves a message previously offered by the source.
Definition: agents.h:10057
_Type _Receive_impl(ISource< _Type > *_Src, unsigned int _Timeout, typename ITarget< _Type >::filter_method const *_Filter_proc)
A general receive implementation, allowing a context to wait for data from exactly one source and fil...
Definition: agents.h:2808
void register_filter(filter_method const &_Filter)
Registers a filter method that will be invoked on every received message.
Definition: agents.h:5842
void _Propagate_priority_order(::Concurrency::details::_Queue< message< _Target_type >> &_MessageBuffer)
Propagates messages in priority order.
Definition: agents.h:12398
virtual message_status send(_Inout_ message< _Source_type > *_PMessage, _Inout_ ISource< _Source_type > *_PSource)
Synchronously initiates a message to this block. Called by an ISource block. When this function compl...
Definition: agents.h:5689
~_Queue()
Definition: agents.h:127
_CONCRTIMP unsigned int _Reference()
virtual void link_target_notification(_Inout_ ITarget< _Type > *_PTarget)
A callback that notifies that a new target has been linked to this single_assignment messaging block...
Definition: agents.h:9035
virtual void propagate_output_messages()
Places the message _PMessage in this unbounded_buffer messaging block and tries to offer it to all o...
Definition: agents.h:6369
volatile bool _M_fIsInitialized
Definition: agents.h:7082
virtual message_status propagate_message(message< size_t > *_PMessage, ISource< size_t > *_PSource)
Asynchronously passes a message from an ISource block to this ITarget block. It is invoked by the pro...
Definition: agents.h:12085
virtual void process_incoming_message()
The processing function that is called asynchronously. It dequeues messages and begins processing the...
Definition: agents.h:2218
virtual message_status propagate_message(_Inout_ message< _Type > *_PMessage, _Inout_ ISource< _Type > *_PSource)
Asynchronously passes a message from an ISource block to this overwrite_buffer messaging block...
Definition: agents.h:6743
_Non_greedy_node(ScheduleGroup &_PScheduleGroup, ISource< _Type > *_PSource, size_t _Index, ITarget< size_t > *_PTarget=NULL)
Constructs a _Non_greedy_node within the specified schedule group. The scheduler is implied by the sc...
Definition: agents.h:11109
virtual message_status propagate_message(_Inout_ message< _Input > *_PMessage, _Inout_ ISource< _Input > *_PSource)
Asynchronously passes a message from an ISource block to this transformer messaging block...
Definition: agents.h:7758
void _Delete_stored_messages()
Deletes all messages currently stored in this message block. Should be called by the destructor to en...
Definition: agents.h:7044
runtime_object_identity _M_savedId
Definition: agents.h:10957
_Message * _M_pHead
Definition: agents.h:110
Defines a block allowing sources of distinct types to be joined. Join node is a single-target, multi-source ordered propagator block
Definition: agents.h:12008
concurrent_queue< message< _Input > * > _M_inputMessages
Definition: agents.h:8079
virtual void link_target_notification(_Inout_ ITarget< _Type > *_PTarget)
A callback that notifies that a new target has been linked to this overwrite_buffer messaging block...
Definition: agents.h:6981
_Greedy_node(ScheduleGroup &_PScheduleGroup, ISource< _Type > *_PSource, size_t _Index, ITarget< size_t > *_PTarget, filter_method const &_Filter)
Constructs a _Greedy_node within the specified schedule group. The scheduler is implied by the schedu...
Definition: agents.h:10738
const_reference operator[](size_t _Pos) const
Definition: agents.h:345
~_Greedy_node()
Cleans up any resources that may have been created by the _Greedy_node.
Definition: agents.h:10750
source_link_manager< _SourceLinkRegistry > _SourceLinkManager
The type of the source_link_manager for this propagator_block.
Definition: agents.h:5591
void _Init()
Definition: agents.h:385
void remove_network_links()
Removes all the source and target network links from this propagator_block object.
Definition: agents.h:5866
void _Delete_stored_messages()
Deletes all messages currently stored in this message block. Should be called by the destructor to en...
Definition: agents.h:6462
single_assignment< size_t > * _M_pSingleAssignment
Definition: agents.h:11839
_Message type
Definition: agents.h:119
void _Reset()
Resets the _Order_node and prepares it for the next propagation
Definition: agents.h:11170
_Join_node()
Constructs a join within the default scheduler, and places it on any schedule group of the scheduler'...
Definition: agents.h:12024
_SavedMessageIdArray _M_savedMessageIdArray
Definition: agents.h:9952
virtual message< _Type > * accept_message(runtime_object_identity _MsgId)
Accepts a message that was offered by this unbounded_buffer messaging block, transferring ownership t...
Definition: agents.h:6217
virtual message_status send_message(_Inout_ message< _Source_type > *, _Inout_ ISource< _Source_type > *)
When overridden in a derived class, this method synchronously passes a message from an ISource block ...
Definition: agents.h:4624
A call messaging block is a multi-source, ordered target_block that invokes a specified function when...
Definition: agents.h:7112
ITarget< _Target_type > * _M_pReservedFor
Connected target that is holding a reservation
Definition: agents.h:5473
~timer()
Destroys a timer messaging block.
Definition: agents.h:8249
virtual ~propagator_block()
Destroys a propagator_block object.
Definition: agents.h:5614
long __cdecl _InterlockedIncrement(long volatile *)
join(size_t _NumInputs, filter_method const &_Filter)
Constructs a join messaging block.
Definition: agents.h:9210
bool asend(ITarget< _Type > &_Trg, const _Type &_Data)
An asynchronous send operation, which schedules a task to propagate the value to the target block...
Definition: agents.h:4423
virtual void propagate_to_any_targets(_Inout_opt_ message< _Target_type > *_PMessage)
When overridden in a derived class, propagates the given message to any or all of the linked targets...
Definition: agents.h:5362
virtual ~message()
Destroys the message object.
Definition: agents.h:1852
call const & operator=(call const &)
virtual void resume_propagation()
Resumes propagation after a reservation has been released.
Definition: agents.h:9548
virtual message_status send_message(_Inout_ message< _Type > *_PMessage, _Inout_ ISource< _Type > *_PSource)
Synchronously passes a message from an ISource block to this overwrite_buffer messaging block...
Definition: agents.h:6787
virtual message_status send_message(_Inout_ message< _Source_type > *, _Inout_ ISource< _Source_type > *)
When overridden in a derived class, this method synchronously passes a message from an ISource block ...
Definition: agents.h:5751
source_block()
Constructs a source_block object.
Definition: agents.h:4899
_Transform_method _M_pFunc
Definition: agents.h:8076
const size_t COOPERATIVE_WAIT_TIMEOUT
Value indicating that a wait timed out.
Definition: concrt.h:3469
single_link_registry< ITarget< _Type > > _TargetLinkRegistry
Definition: agents.h:8114
_Dynamic_array< _Type > _Myt
Definition: agents.h:259
_CONCRTIMP::Concurrency::Scheduler * _GetScheduler()
Definition: concrt.h:382
message< _Type > * _M_pMessage
Definition: agents.h:4253
long add_ref()
Adds to the reference count for the message object. Used for message blocks that need reference count...
Definition: agents.h:1880
virtual void unlink_sources()=0
When overridden in a derived class, unlinks all source blocks from this ITarget block.
virtual void release_message(runtime_object_identity _MsgId)
Releases a previous message reservation.
Definition: agents.h:6280
virtual void wait()=0
When overridden in a derived class, waits for all asynchronous operations to complete.
#define SIZE_MAX
Definition: limits.h:76
_Order_node_base()
Constructs a _Order_node_base within the default scheduler, and places it on any schedule group of th...
Definition: agents.h:9988
::Concurrency::details::_ReentrantPPLLock _M_resetLock
Definition: agents.h:11381
virtual bool reserve(runtime_object_identity _MsgId, _Inout_ ITarget< _Target_type > *_PTarget)
Reserves a message previously offered by this source_block object.
Definition: agents.h:5052
bool _Try_receive_impl(ISource< _Type > *_Src, _Type &_value, typename ITarget< _Type >::filter_method const *_Filter_proc)
Helper function that implements try_receive. A general try-receive implementation, allowing a context ...
Definition: agents.h:3199
_Handler_method _M_processor
A message processing object which exposes the callback to be invoked
Definition: agents.h:2428
_Result
Definition: corecrt_wconio.h:362
virtual message< _Type > * consume(runtime_object_identity _MsgId, _Inout_ ITarget< _Type > *_PTarget)=0
When overridden in a derived class, consumes a message previously offered by this ISource block and s...
virtual bool reserve(runtime_object_identity _MsgId, _Inout_ ITarget< _Destination_type > *_PTarget)
Reserves a message previously offered by this multitype_join messaging block.
Definition: agents.h:12806
::Concurrency::details::_Queue< message< _Type > > _M_messageBuffer
Message queue used to store messages
Definition: agents.h:6491
virtual message_status send(_Inout_ message< _Source_type > *_PMessage, _Inout_ ISource< _Source_type > *_PSource)
Synchronously passes a message from a source block to this target block.
Definition: agents.h:4562
void sync_send(_Inout_opt_ message< _Source_type > *_PMessage)
Synchronously send a message for processing.
Definition: agents.h:4787
virtual void process_input_messages(_Inout_ message< _Target_type > *)
Process input messages. This is only useful for propagator blocks, which derive from source_block ...
Definition: agents.h:5340
bool has_value() const
Checks whether this overwrite_buffer messaging block has a value yet.
Definition: agents.h:6701
ordered_message_processor()
Constructs an ordered_message_processor object.
Definition: agents.h:2038
virtual void release_ref(_Inout_ ITarget< _Type > *_PTarget)=0
When overridden in a derived class, releases a reference count on this ISource block.
virtual ~ITarget()
Destroys the ITarget object.
Definition: agents.h:2468
virtual message< _Destination_type > * consume_message(runtime_object_identity _MsgId)
Consumes a message previously offered by the source and reserved by the target, transferring ownershi...
Definition: agents.h:12175
virtual void process_message(message< _Source_type > *)
When overridden in a derived class, processes a message that was accepted by this target_block object...
Definition: agents.h:4689
static void __cdecl _Process_incoming_message_wrapper(void *_Data)
Wrapper for process_incoming_message suitable for use as an argument to CreateThread and other similar...
Definition: agents.h:1993
virtual void process_input_messages(_Inout_ message< _Type > *_PMessage)
Places the message _PMessage in this unbounded_buffer messaging block and tries to offer it to all o...
Definition: agents.h:6347
::Concurrency::details::_NonReentrantPPLLock _M_propagationLock
Definition: agents.h:9958
long __cdecl _InterlockedCompareExchange(long volatile *, long, long)
single_link_registry< ITarget< _Type > > _Target_registry
Definition: agents.h:3978
virtual bool reserve_message(runtime_object_identity _MsgId)
Reserves a message previously offered by this timer messaging block.
Definition: agents.h:8366
multi_link_registry< ISource< _Type > > _SourceLinkRegistry
Definition: agents.h:9158
static const int _S_growthFactor
Definition: agents.h:427
virtual ~_AnonymousOriginator()
Definition: agents.h:3542
virtual void release_message(runtime_object_identity _MsgId)
Releases a previous message reservation.
Definition: agents.h:12188
::Concurrency::details::_NonReentrantPPLLock _M_propagationLock
Definition: agents.h:9099
A transformer messaging block is a single-target, multi-source, ordered propagator_block which can ac...
Definition: agents.h:7481
virtual void sync_send(_Inout_opt_ message< _Type > *_Msg)
Synchronously queues up messages and starts a processing task, if this has not been done already...
Definition: agents.h:2109
message< _Type > * _M_pMessage
Definition: agents.h:8464
void * _M_pSourceChoices[std::tuple_size< _Type >::value]
Definition: agents.h:11836
virtual void resume_propagation()
Resumes propagation after a reservation has been released.
Definition: agents.h:10107
virtual void async_send(_Inout_opt_ message< _Type > *_Msg)=0
When overridden in a derived class, places messages into the block asynchronously.
virtual message< _Type > * consume_message(runtime_object_identity _MsgId)
Consumes a message previously offered by the overwrite_buffer messaging block and reserved by the tar...
Definition: agents.h:6914
void _Propagate_priority_order(::Concurrency::details::_Queue< message< _Target_type >> &_MessageBuffer)
Propagates messages in priority order.
Definition: agents.h:7993
filter_method * _M_pFilter
The filter function which determines whether offered messages should be accepted. ...
Definition: agents.h:4841
virtual message_status propagate(_Inout_opt_ message< _Source_type > *_PMessage, _Inout_opt_ ISource< _Source_type > *_PSource)
Asynchronously passes a message from a source block to this target block.
Definition: agents.h:4511
::Concurrency::details::_Queue< message< _Type > > _M_processedMessages
Message queue used to store processed messages
Definition: agents.h:6485
The concurrent_queue class is a sequence container class that allows first-in, first-out access to it...
Definition: concurrent_queue.h:55
virtual void unlink_target(ITarget< _Type > *_PTarget)
Definition: agents.h:3989
message(message const &_Msg)
Constructs a message object.
Definition: agents.h:1827
void _Grow(size_t _NewSize)
Definition: agents.h:395
multi_link_registry< ISource< _Type > > _SourceLinkRegistry
Definition: agents.h:8593
The propagator_block class is an abstract base class for message blocks that are both a source and ta...
Definition: agents.h:5578
_Reserving_node(Scheduler &_PScheduler, ISource< _Type > *_PSource, size_t _Index, ITarget< size_t > *_PTarget, filter_method const &_Filter)
Constructs a _Reserving_node within the specified scheduler, and places it on any schedule group of t...
Definition: agents.h:10301
Greedy join messaging blocks immediately accept a message upon propagation. This is more efficient...
Definition: agents.h:9126
bool try_receive(_Inout_ ISource< _Type > *_Src, _Type &_value)
A general try-receive implementation, allowing a context to look for data from exactly one source and...
Definition: agents.h:3421
virtual void process_message(_Inout_ message< _Type > *_PMessage)
Processes a message that was accepted by this call messaging block.
Definition: agents.h:7425
constexpr const _Ty &() _Right
Definition: algorithm:3723
virtual void release_message(runtime_object_identity _MsgId)
Releases a previous message reservation.
Definition: agents.h:9008
virtual void async_send(_Inout_opt_ message< _Target_type > *_Msg)
Asynchronously queues up messages and starts a propagation task, if this has not been done already ...
Definition: agents.h:5435
The agent has been created but not started.
Definition: agents.h:13244
virtual void propagate_output_messages()
Propagate messages to targets.
Definition: agents.h:5349
void remove_targets()
Removes all target links for this source block. This should be called from the destructor.
Definition: agents.h:5455
multi_link_registry< ITarget< _Type > > _TargetLinkRegistry
Definition: agents.h:8592
std::function< _Output(_Input const &)> _Transform_method
Definition: agents.h:7484
bool _Try_consume_source_messages(_Destination_type &_Destination_tuple, ISource< size_t > **_Sources)
Tries to reserve from all sources. If successful, it will consume all the messages ...
Definition: agents.h:12287
virtual void release_ref(_Inout_ ITarget< _Destination_type > *_PTarget)
Releases a reference count on this multitype_join messaging block.
Definition: agents.h:12876
_Greedy_node(ISource< _Type > *_PSource, size_t _Index, ITarget< size_t > *_PTarget=NULL)
Constructs a _Greedy_node within the default scheduler, and places it on any schedule group of the sc...
Definition: agents.h:10610
_Reserving_node(ISource< _Type > *_PSource, size_t _Index, ITarget< size_t > *_PTarget=NULL)
Constructs a _Reserving_node within the default scheduler, and places it on any schedule group of the...
Definition: agents.h:10222
#define NULL
Definition: corecrt.h:158
The agent has started.
Definition: agents.h:13254
message< _Type > * _M_pGreedyMessage
Definition: agents.h:10947
virtual void acquire_ref(_Inout_ ITarget< _Type > *_PTarget)=0
When overridden in a derived class, acquires a reference count on this ISource block, to prevent deletion.
The agent was canceled.
Definition: agents.h:13264
Scheduler * _M_pScheduler
The scheduler to process messages on
Definition: agents.h:2397
virtual void wait()
A processor-specific spin wait used in destructors of message blocks to make sure that all asynchrono...
Definition: agents.h:2189
~transformer()
Destroys the transformer messaging block.
Definition: agents.h:7730
_AnonymousOriginator()
Definition: agents.h:3537
bool _Enqueue(_Message *_Element)
Definition: agents.h:140
timer(unsigned int _Ms, _Type const &_Value, ITarget< _Type > *_PTarget=NULL, bool _Repeating=false)
Constructs a timer messaging block that will fire a given message after a specified interval...
Definition: agents.h:8172
target_block()
Constructs a target_block object.
Definition: agents.h:4473
virtual message< size_t > * accept_message(runtime_object_identity _MsgId)
Accept the message by making a copy of the payload.
Definition: agents.h:10459
virtual void release_message(runtime_object_identity _MsgId)
Releases a previous message reservation.
Definition: agents.h:8406
The timer has been stopped.
Definition: agents.h:8144
#define _InterlockedIncrementSizeT(_Target)
Definition: concrt.h:96
_In_ size_t _Deref_pre_opt_z_ char const ** _PSrc
Definition: wchar.h:78
volatile long _M_fStartable
Definition: agents.h:13500
_Order_node_base const & operator=(_Order_node_base const &)
static _CONCRTIMP _Scheduler __cdecl _Get()
static _CONCRTIMP void __cdecl wait_for_one(size_t _Count, _In_reads_(_Count) agent **_PAgents, agent_status &_Status, size_t &_Index, unsigned int _Timeout=COOPERATIVE_TIMEOUT_INFINITE)
Waits for any one of the specified agents to complete its task.
virtual void run()=0
Represents the main task of an agent. run should be overridden in a derived class, and specifies what the agent should do after it has been started.
virtual void unlink_target(_Inout_ ITarget< _Target_type > *_PTarget)
Unlinks a target block from this source_block object.
Definition: agents.h:4957
virtual message_status propagate_message(_Inout_ message< _Source_type > *_PMessage, _Inout_ ISource< _Source_type > *_PSource)=0
When overridden in a derived class, this method asynchronously passes a message from an ISource block...
message_status propagate_message(_Inout_ message< _Type > *_PMessage, _Inout_ ISource< _Type > *_PSource)
Asynchronously passes a message from an ISource block to this join messaging block. It is invoked by the propagate method, when called by a source block.
Definition: agents.h:9369