STLdoc
STLdocumentation
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
concurrent_queue.h
Go to the documentation of this file.
1 /***
2 * ==++==
3 *
4 * Copyright (c) Microsoft Corporation. All rights reserved.
5 * Microsoft would like to acknowledge that this concurrency data structure implementation
6 * is based on Intel's implementation in its Threading Building Blocks ("Intel Material").
7 *
8 * ==--==
9 * =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
10 *
11 * concurrent_queue.h
12 *
13 * =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
14 ****/
15 
16 /*
17  Intel Material Copyright 2005-2008 Intel Corporation. All Rights Reserved.
18 */
19 
20 #pragma once
21 
22 
23 #include <crtdefs.h>
24 #include <memory>
25 #include <cstddef>
26 #include <crtdbg.h>
27 #include <concrt.h>
28 #include <utility>
29 
30 #define _PPL_CONTAINER
31 
32 #if !(defined (_M_X64) || defined (_M_IX86) || defined (_M_ARM) || defined (_M_ARM64))
33  #error ERROR: Concurrency Runtime is supported only on X64, X86, ARM, and ARM64 architectures.
34 #endif /* !(defined (_M_X64) || defined (_M_IX86) || defined (_M_ARM) || defined (_M_ARM64)) */
35 
36 #if defined (_M_CEE)
37  #error ERROR: Concurrency Runtime is not supported when compiling /clr.
38 #endif /* defined (_M_CEE) */
39 
40 #pragma pack(push,_CRT_PACKING)
41 #pragma warning(push)
42 #pragma warning (disable: 4510 4512 4610) // disable warnings for compiler unable to generate constructor
43 #pragma push_macro("new")
44 #undef new
45 
50 
51 namespace Concurrency
52 {
53 
54 template<typename _Ty, class _Ax = std::allocator<_Ty> >
56 
57 namespace details
58 {
59 
61 
62  typedef size_t _Ticket;
63 
64  class _Concurrent_queue_iterator_rep;
66  template<typename _Container, typename _Value> class _Concurrent_queue_iterator;
67 
68  // Type-independent portion of concurrent_queue.
70  {
71  // Internal representation
73 
74  friend class _Concurrent_queue_rep;
75  friend struct _Micro_queue;
79  protected:
80  // Prefix on a page
81  struct _Page
82  {
84  size_t _Mask;
85  };
86 
87  // Always a power of 2
89 
90  // Size of an item
91  size_t _Item_size;
92 
93  private:
94  virtual void _Move_item( _Page& _Dst, size_t _Index, void* _Src ) = 0;
95  virtual void _Copy_item( _Page& _Dst, size_t _Index, const void* _Src ) = 0;
96  virtual void _Assign_and_destroy_item( void* _Dst, _Page& _Src, size_t _Index ) = 0;
97  protected:
98  _CONCRTIMP _Concurrent_queue_base_v4( size_t _Item_size );
100 
101  // Enqueue item at tail of queue
102  _CONCRTIMP void _Internal_push( const void* _Src );
103 
104  // Enqueue item at tail of queue by move
105  _CONCRTIMP void _Internal_move_push( void* _Src );
106 
107  // swap the internal representation
109 
110  // Attempt to dequeue item from queue.
112  _CONCRTIMP bool _Internal_pop_if_present( void* _Dst );
113 
114  // Get size of queue
115  _CONCRTIMP size_t _Internal_size() const;
116 
117  // Test instantaneous queue empty
118  _CONCRTIMP bool _Internal_empty() const;
119 
120  // custom allocator
121  virtual _Page *_Allocate_page() = 0;
122 
123  // custom de-allocator
124  virtual void _Deallocate_page( _Page *p ) = 0;
125 
126  // free any remaining pages
128 
129  // throw an exception
131 
132  private:
133  // Deny copy construction
135 
136  // Deny assignment
137  void operator=( const _Concurrent_queue_base_v4& );
138  };
139 
141 
142 
143  // A queue using simple locking.
147  {
148  class _Pop_finalizer;
149  class _Push_finalizer;
150 
153 
156 
157  volatile long _Page_mutex_flag;
158 
159  void _Push( void* _Item, _Ticket _K, _Concurrent_queue_base& _Base, void (_Concurrent_queue_base::*moveOp)(_Concurrent_queue_base_v4::_Page&, size_t, void*));
160 
161  bool _Pop( void* _Dest, _Ticket _K, _Concurrent_queue_base& _Base );
162  };
163 
164  // Disable warning C4324: structure was padded due to __declspec(align())
165  // This padding is expected and necessary.
166  #pragma warning(push)
167  #pragma warning(disable: 4324)
168 
169 
170  // Internal representation of a ConcurrentQueue.
174  {
175  private:
176  friend struct _Micro_queue;
177 
178  // Approximately n_queue/golden ratio
179  static const size_t _Phi = 3;
180 
181  public:
182  // Must be power of 2
183  static const size_t _N_queue = 8;
184 
185  // Map a ticket to an index into _Array: (_K * _Phi) modulo _N_queue.
186  static size_t _Index( _Ticket _K )
187  {
188  return _K*_Phi%_N_queue; // * and % have equal precedence and group left-to-right: (_K*_Phi)%_N_queue
189  }
190 
191  __declspec(align(64))
192  _Subatomic<_Ticket> _Head_counter;
193 
194  __declspec(align(64))
195  _Subatomic<_Ticket> _Tail_counter;
196 
197  __declspec(align(64))
198  _Micro_queue _Array[_N_queue];
199 
200  _Micro_queue& _Choose( _Ticket _K ) // select the element of _Array that corresponds to ticket _K
201  {
202  // The formula here approximates LRU in a cache-oblivious way.
203  return _Array[_Index(_K)]; // _Index reduces modulo _N_queue, which is also the array length
204  }
205  };
206 
207  #pragma warning(pop)
208 
209 
210  // Type-independent portion of _Concurrent_queue_iterator.
212  // The concurrent_queue over which we are iterating.
214  _Concurrent_queue_iterator_rep* _My_rep;
215 
216  template<typename _C, typename _Ty, typename _U>
218 
219  template<typename _C, typename _Ty, typename _U>
221  protected:
222  // Pointer to current item
223  mutable void* _My_item;
224 
225  // Default constructor
227  : _My_rep(NULL), _My_item(NULL)
228  {
229  }
230 
231  // Copy constructor
233  : _My_rep(NULL), _My_item(NULL)
234  {
235  _Assign(_I);
236  }
237 
238  // Construct iterator pointing to head of queue.
239  _CONCRTIMP _Concurrent_queue_iterator_base_v4( const _Concurrent_queue_base& );
240 
241  // Assignment
243 
244  // Advance iterator one step towards tail of queue.
245  _CONCRTIMP void _Advance();
246 
247  // Destructor
249  };
250 
252 
253  // Meets requirements of a forward iterator for STL.
255  template<typename _Container, typename _Value>
256  class _Concurrent_queue_iterator: public _Concurrent_queue_iterator_base_v4, public std::iterator<std::forward_iterator_tag, _Value>
257  {
258  template<typename _Ty, class _Ax> friend class ::Concurrency::concurrent_queue;
259 
260  // Construct iterator pointing to head of queue.
261  _Concurrent_queue_iterator( const _Concurrent_queue_base& _Queue )
263  {
264  }
265  public:
267  {
268  }
269 
274  {
275  }
276 
277  // Iterator assignment
279  {
280  _Assign(_Other);
281  return *this;
282  }
283 
284  // Reference to the current item: the untyped _My_item pointer viewed as _Value.
285  _Value& operator*() const
286  {
287  return *static_cast<_Value*>(_My_item); // no null check here; _My_item is NULL for a default-constructed iterator
288  }
289 
291  {
292  return &operator*();
293  }
294 
295  // Advance to next item in queue
297  {
298  _Advance();
299  return *this;
300  }
301 
302  // Post increment
304  {
305  _Concurrent_queue_iterator _Result = *this;
306  _Advance();
307  return _Result;
308  }
309  }; // _Concurrent_queue_iterator
310 
311  template<typename _Container, typename _Value>
312  struct std::_Is_checked_helper<_Concurrent_queue_iterator<_Container, _Value> > // NOTE(review): a std template is specialized from inside namespace details; this relies on compiler leniency - confirm
313  : public true_type
314  { // Mark _Concurrent_queue_iterator as a checked iterator. This suppresses warning C4996.
315  };
316 
317  template<typename _C, typename _Ty, typename _U>
319  {
320  return _I._My_item==_J._My_item;
321  }
322 
323  template<typename _C, typename _Ty, typename _U>
325  {
326  return _I._My_item!=_J._My_item;
327  }
328 
329 } // namespace details;
330 
331 
348 
349 template<typename _Ty, class _Ax>
350 class concurrent_queue: public ::Concurrency::details::_Concurrent_queue_base_v4
351 {
352  template<typename _Container, typename _Value> friend class ::Concurrency::details::_Concurrent_queue_iterator;
353 
354  // allocator type
355  typedef typename _Ax::template rebind<char>::other _Page_allocator_type;
356  _Page_allocator_type _My_allocator;
357 
358  // Class used to ensure exception-safety of method "pop"
360  {
361  private:
362  _Ty& _My_value;
363 
364  void operator=(const _Destroyer&); // prevent warning: assign operator can't be generated
365  public:
367  : _My_value(_Value)
368  {
369  }
370 
372  {
373  _My_value.~_Ty();
374  }
375  };
376 
377  _Ty& _Get_ref( _Page& _Pg, size_t _Index ) // reference to item slot _Index of page _Pg
378  { // NOTE(review): listing line 379 is absent from this extract - confirm its content against the original header
380  return static_cast<_Ty*>(static_cast<void*>(&_Pg+1))[_Index]; // &_Pg+1: the item array starts immediately after the _Page prefix
381  }
382 
383  /*override*/ virtual void _Copy_item( _Page& _Dst, size_t _Index, const void* _Src ) // copy-construct slot _Index of page _Dst from *_Src
384  {
385  new( &_Get_ref(_Dst,_Index) ) _Ty(*static_cast<const _Ty*>(_Src)); // placement new into the slot; _Src is read as a const _Ty
386  }
387 
388  /*override*/ virtual void _Move_item( _Page& _Dst, size_t _Index, void* _Src ) // move-construct slot _Index of page _Dst from *_Src
389  {
390  new( &_Get_ref(_Dst,_Index) ) _Ty(std::move(*static_cast<_Ty*>(_Src))); // placement new into the slot; *_Src is cast to an rvalue for construction
391  }
392 
393  /*override*/ virtual void _Assign_and_destroy_item( void* _Dst, _Page& _Src, size_t _Index ) // move slot _Index of _Src into *_Dst when _Dst is non-null, then destroy the slot
394  {
395  _Ty& _From = _Get_ref(_Src,_Index);
396  _Destroyer _D(_From); // _Destroyer's destructor runs _From.~_Ty() on scope exit, so the slot is destroyed even if the assignment throws
397  if (_Dst != NULL) // a null _Dst skips the assignment; presumably how clear() discards items via _Internal_pop_if_present(NULL) - confirm
398  {
399  *static_cast<_Ty*>(_Dst) = std::move(_From); // move-assign into the caller's existing object
400  }
401  }
402 
403  /*override*/ virtual _Page *_Allocate_page() // allocate raw storage for one page: the _Page prefix plus _Items_per_page items
404  {
405  size_t _N = sizeof(_Page) + _Items_per_page*_Item_size; // size in chars; _My_allocator is the allocator rebound to char
406  _Page *_Pg = reinterpret_cast<_Page*>(_My_allocator.allocate( _N ));
407  if( !_Pg ) // NOTE(review): listing line 408 is absent from this extract, so the statement guarded here is not visible - confirm against the original header
409  return _Pg;
410  }
411 
412  /*override*/ virtual void _Deallocate_page( _Page *_Pg ) // return the storage of page _Pg to _My_allocator
413  {
414  size_t _N = sizeof(_Page) + _Items_per_page*_Item_size; // same size expression as _Allocate_page uses
415  _My_allocator.deallocate( reinterpret_cast<char*>(_Pg), _N ); // cast back to char* to match the rebound allocator's pointer type
416  }
417 
418 public:
422 
423  typedef _Ty value_type;
424 
428 
429  typedef _Ax allocator_type;
430 
434 
435  typedef _Ty& reference;
436 
441 
442  typedef const _Ty& const_reference;
443 
447 
449 
453 
455 
471 
472  explicit concurrent_queue(const allocator_type &_Al = allocator_type()) // constructs a concurrent queue whose page allocator is built from _Al
473  : _Concurrent_queue_base_v4( sizeof(_Ty) ), _My_allocator( _Al ) // the base is told the per-item size
474  {
475  }
476 
495 
496  concurrent_queue(const concurrent_queue& _OtherQ, const allocator_type &_Al = allocator_type());
497 
516 
517  concurrent_queue(concurrent_queue&& _OtherQ, const allocator_type &_Al = allocator_type());
518 
540 
541  template<typename _InputIterator>
542  concurrent_queue(_InputIterator _Begin, _InputIterator _End) // constructs a queue and pushes each element of [_Begin, _End) in order
543  : _Concurrent_queue_base_v4( sizeof(_Ty) ), _My_allocator( allocator_type() ) // this overload takes no allocator argument; a default-constructed one is used
544  {
545  while (_Begin != _End)
546  {
547  this->push(*_Begin);
548  ++_Begin;
549  }
550  }
551 
555 
557 
567 
568  void push( const _Ty& _Src ) // enqueues a copy of _Src at the tail of the queue
569  {
570  _Internal_push( &_Src ); // the type-independent base receives the address as const void*
571  }
572 
582 
583  void push( _Ty&& _Src ) // enqueues _Src at the tail of the queue by move
584  {
585  _Internal_move_push( &_Src ); // the type-independent base receives the address as void*
586  }
587 
605 
606  bool try_pop( _Ty& _Dest ) // dequeues an item into _Dest if one is available
607  {
608  return _Internal_pop_if_present( &_Dest ); // the result of _Internal_pop_if_present is returned unchanged
609  }
610 
621 
622  size_type unsafe_size() const // returns the number of items in the queue; documented as not concurrency-safe
623  {
624  return _Internal_size();
625  }
626 
637 
638  bool empty() const // tests whether the queue is empty at the moment of the call
639  {
640  return _Internal_empty();
641  }
642 
649 
650  allocator_type get_allocator() const // returns a copy of the queue's allocator as allocator_type
651  {
652  return this->_My_allocator; // _My_allocator is the char-rebound allocator; it is converted to allocator_type on return
653  }
654 
658 
659  void clear();
660 
664 
666 
670 
672 
685 
686  iterator unsafe_begin() // returns an iterator to the beginning (head) of the queue
687  {
688  return iterator(*this); // uses the iterator constructor that takes the queue
689  }
690 
703 
704  iterator unsafe_end() // returns the end iterator of the queue
705  {
706  return iterator(); // a default-constructed iterator serves as the end marker
707  }
708 
721 
722  const_iterator unsafe_begin() const // const overload: returns a const_iterator to the beginning (head) of the queue
723  {
724  return const_iterator(*this); // uses the iterator constructor that takes the queue
725  }
726 
739 
740  const_iterator unsafe_end() const // const overload: returns the end const_iterator of the queue
741  {
742  return const_iterator(); // a default-constructed iterator serves as the end marker
743  }
744 };
745 
746 
765 
766 template<typename _Ty, class _Ax> // Copy constructor: pushes every element of _Queue, in iteration order, into the new queue
767 concurrent_queue<_Ty,_Ax>::concurrent_queue(const concurrent_queue& _Queue, const allocator_type& _Al) // _Al's default lives on the in-class declaration only; repeating it here would redefine the default argument
768  : _Concurrent_queue_base_v4( sizeof(_Ty) ), _My_allocator(_Al)
769 {
770  concurrent_queue::const_iterator _QEnd = _Queue.unsafe_end(); // end marker computed once, before the loop
771  for (concurrent_queue::const_iterator _It = _Queue.unsafe_begin(); _It != _QEnd; ++_It)
772  this->push(*_It); // *_It is a const _Ty&, so the copying push overload is selected
773 }
774 
790 
791 template<typename _Ty, class _Ax> // Move constructor: takes over the contents of _Queue by swapping internal representations
792 concurrent_queue<_Ty,_Ax>::concurrent_queue(concurrent_queue&& _Queue, const allocator_type& _Al) // _Al's default lives on the in-class declaration only; repeating it here would redefine the default argument
793  : _Concurrent_queue_base_v4( sizeof(_Ty) ), _My_allocator(_Al) // NOTE(review): _My_allocator is built from _Al and is not exchanged with _Queue's allocator - confirm page ownership when the two differ
794 {
795  _Internal_swap(_Queue); // this queue was just constructed by the base initializer; _Queue receives that representation in exchange
796 }
797 
801 
802 template<typename _Ty, class _Ax>
804 {
805  clear();
806  _Internal_finish_clear();
807 }
808 
812 
813 template<typename _Ty, class _Ax>
815 {
816  while( !empty() )
817  {
818  if (!_Internal_pop_if_present(NULL))
819  {
821  break;
822  }
823  }
824 }
825 
826 } // namespace Concurrency
827 
828 namespace concurrency = ::Concurrency;
829 
830 #pragma pop_macro("new")
831 #pragma warning(pop)
832 #pragma pack(pop)
Definition: xtr1common:22
friend class _Concurrent_queue_iterator_rep
Definition: concurrent_queue.h:77
_Value * operator->() const
Definition: concurrent_queue.h:290
_Ax allocator_type
A type that represents the allocator class for the concurrent queue.
Definition: concurrent_queue.h:429
static const size_t _N_queue
Definition: concurrent_queue.h:183
~_Destroyer()
Definition: concurrent_queue.h:371
#define NULL
Definition: vcruntime.h:236
_Ty & _My_value
Definition: concurrent_queue.h:362
size_t _Items_per_page
Definition: concurrent_queue.h:88
Definition: concurrent_queue.h:66
void operator=(const _Concurrent_queue_base_v4 &)
_Concurrent_queue_iterator_base_v4(const _Concurrent_queue_iterator_base_v4 &_I)
Definition: concurrent_queue.h:232
concurrent_queue(const allocator_type &_Al=allocator_type())
Constructs a concurrent queue.
Definition: concurrent_queue.h:472
#define _CONCRT_ASSERT(x)
Definition: concrt.h:123
bool operator==(const _Concurrent_queue_iterator< _C, _Ty > &_I, const _Concurrent_queue_iterator< _C, _U > &_J)
Definition: concurrent_queue.h:318
_Ty value_type
A type that represents the data type stored in a concurrent queue.
Definition: concurrent_queue.h:423
const _Ty & const_reference
A type that provides a reference to a const element stored in a concurrent queue for reading and perf...
Definition: concurrent_queue.h:442
virtual void _Copy_item(_Page &_Dst, size_t _Index, const void *_Src)
Definition: concurrent_queue.h:383
virtual void _Move_item(_Page &_Dst, size_t _Index, void *_Src)=0
size_type unsafe_size() const
Returns the number of items in the queue. This method is not concurrency-safe.
Definition: concurrent_queue.h:622
std::ptrdiff_t difference_type
A type that provides the signed distance between two elements in a concurrent queue.
Definition: concurrent_queue.h:454
static size_t _Index(_Ticket _K)
Definition: concurrent_queue.h:186
_CONCRTIMP void _Internal_throw_exception() const
void * align(size_t _Bound, size_t _Size, void *&_Ptr, size_t &_Space) _NOEXCEPT
Definition: memory:1985
_CONCRTIMP void _Internal_push(const void *_Src)
__declspec(align(64)) _Subatomic< _Ticket > _Head_counter
_Ty & _Get_ref(_Page &_Pg, size_t _Index)
Definition: concurrent_queue.h:377
size_t _Mask
Definition: concurrent_queue.h:84
_Concurrent_queue_iterator operator++(int)
Definition: concurrent_queue.h:303
static const size_t _Phi
Definition: concurrent_queue.h:179
Definition: agents.h:106
The Concurrency namespace provides classes and functions that provide access to the Concurrency Runti...
Definition: agents.h:43
std::size_t size_type
A type that counts the number of elements in a concurrent queue.
Definition: concurrent_queue.h:448
int ptrdiff_t
Definition: vcruntime.h:199
_Concurrent_queue_iterator(const _Concurrent_queue_iterator< _Container, typename _Container::value_type > &_Other)
Definition: concurrent_queue.h:272
Definition: concurrent_queue.h:146
_Destroyer(_Ty &_Value)
Definition: concurrent_queue.h:366
_Concurrent_queue_iterator()
Definition: concurrent_queue.h:266
_Page_allocator_type _My_allocator
Definition: concurrent_queue.h:356
bool try_pop(_Ty &_Dest)
Dequeues an item from the queue if one is available. This method is concurrency-safe.
Definition: concurrent_queue.h:606
_Concurrent_queue_rep * _My_rep
Definition: concurrent_queue.h:72
Definition: concrt.h:483
void _Push(void *_Item, _Ticket _K, _Concurrent_queue_base &_Base, void(_Concurrent_queue_base::*moveOp)(_Concurrent_queue_base_v4::_Page &, size_t, void *))
void clear()
Clears the concurrent queue, destroying any currently enqueued elements. This method is not concurren...
Definition: concurrent_queue.h:814
details::_Concurrent_queue_iterator< concurrent_queue, _Ty > iterator
A type that represents a non-thread-safe iterator over the elements in a concurrent queue...
Definition: concurrent_queue.h:665
_CONCRTIMP void _Assign(const _Concurrent_queue_iterator_base_v4 &)
_Ax::template rebind< char >::other _Page_allocator_type
Definition: concurrent_queue.h:355
virtual void _Deallocate_page(_Page *_Pg)
Definition: concurrent_queue.h:412
allocator_type get_allocator() const
Returns a copy of the allocator used to construct the concurrent queue. This method is concurrency-sa...
Definition: concurrent_queue.h:650
_CONCRTIMP void _Internal_move_push(void *_Src)
_CONCRTIMP _Concurrent_queue_base_v4(size_t _Item_size)
details::_Concurrent_queue_iterator< concurrent_queue, const _Ty > const_iterator
A type that represents a non-thread-safe const iterator over elements in a concurrent queue...
Definition: concurrent_queue.h:671
size_t _Item_size
Definition: concurrent_queue.h:91
size_t _Ticket
Definition: concurrent_queue.h:60
iterator unsafe_end()
Returns an iterator of type iterator or const_iterator to the end of the concurrent queue...
Definition: concurrent_queue.h:704
unsigned int size_t
Definition: sourceannotations.h:19
_Subatomic< _Concurrent_queue_base::_Page * > _Tail_page
Definition: concurrent_queue.h:154
bool _Pop(void *_Dest, _Ticket _K, _Concurrent_queue_base &_Base)
virtual void _Move_item(_Page &_Dst, size_t _Index, void *_Src)
Definition: concurrent_queue.h:388
volatile long _Page_mutex_flag
Definition: concurrent_queue.h:157
const_iterator unsafe_end() const
Returns an iterator of type iterator or const_iterator to the end of the concurrent queue...
Definition: concurrent_queue.h:740
concurrent_queue(_InputIterator _Begin, _InputIterator _End)
Constructs a concurrent queue.
Definition: concurrent_queue.h:542
~concurrent_queue()
Destroys the concurrent queue.
Definition: concurrent_queue.h:803
_Micro_queue & _Choose(_Ticket _K)
Definition: concurrent_queue.h:200
_Concurrent_queue_iterator & operator=(const _Concurrent_queue_iterator &_Other)
Definition: concurrent_queue.h:278
iterator unsafe_begin()
Returns an iterator of type iterator or const_iterator to the beginning of the concurrent queue...
Definition: concurrent_queue.h:686
_CONCRTIMP void _Internal_swap(_Concurrent_queue_base_v4 &other)
virtual void _Assign_and_destroy_item(void *_Dst, _Page &_Src, size_t _Index)=0
_Subatomic< _Ticket > _Head_counter
Definition: concurrent_queue.h:152
_Concurrent_queue_iterator(const _Concurrent_queue_base &_Queue)
Definition: concurrent_queue.h:261
const_iterator unsafe_begin() const
Returns an iterator of type iterator or const_iterator to the beginning of the concurrent queue...
Definition: concurrent_queue.h:722
_Value & operator*() const
Definition: concurrent_queue.h:285
friend bool operator==(const _Concurrent_queue_iterator< _C, _Ty > &, const _Concurrent_queue_iterator< _C, _U > &)
Definition: concurrent_queue.h:318
_Concurrent_queue_iterator & operator++()
Definition: concurrent_queue.h:296
Definition: concurrent_queue.h:69
_Concurrent_queue_iterator_rep * _My_rep
Definition: concurrent_queue.h:214
constexpr auto empty(const _Container &_Cont) -> decltype(_Cont.empty())
Definition: xutility:1492
_Concurrent_queue_iterator_base_v4()
Definition: concurrent_queue.h:226
Definition: concurrent_queue.h:173
virtual _Page * _Allocate_page()
Definition: concurrent_queue.h:403
bool empty() const
Tests if the concurrent queue is empty at the moment this method is called. This method is concurrenc...
Definition: concurrent_queue.h:638
_Subatomic< _Concurrent_queue_base::_Page * > _Head_page
Definition: concurrent_queue.h:149
#define _CONCRTIMP
Definition: crtdefs.h:48
constexpr remove_reference< _Ty >::type && move(_Ty &&_Arg) _NOEXCEPT
Definition: type_traits:1290
void push(const _Ty &_Src)
Enqueues an item at tail end of the concurrent queue. This method is concurrency-safe.
Definition: concurrent_queue.h:568
virtual void _Copy_item(_Page &_Dst, size_t _Index, const void *_Src)=0
friend bool operator!=(const _Concurrent_queue_iterator< _C, _Ty > &, const _Concurrent_queue_iterator< _C, _U > &)
Definition: concurrent_queue.h:324
_Subatomic< _Ticket > _Tail_counter
Definition: concurrent_queue.h:155
virtual void _Assign_and_destroy_item(void *_Dst, _Page &_Src, size_t _Index)
Definition: concurrent_queue.h:393
bool operator!=(const _Concurrent_queue_iterator< _C, _Ty > &_I, const _Concurrent_queue_iterator< _C, _U > &_J)
Definition: concurrent_queue.h:324
_Concurrent_queue_base_v4 _Concurrent_queue_base
Definition: concurrent_queue.h:140
_Ty & reference
A type that provides a reference to an element stored in a concurrent queue.
Definition: concurrent_queue.h:435
void push(_Ty &&_Src)
Enqueues an item at tail end of the concurrent queue. This method is concurrency-safe.
Definition: concurrent_queue.h:583
_In_ int _Value
Definition: setjmp.h:173
friend class _Micro_queue_pop_finalizer
Definition: concurrent_queue.h:76
Definition: concurrent_queue.h:359
_Concurrent_queue_iterator_base_v4 concurrent_queue_iterator_base
Definition: concurrent_queue.h:251
_CONCRTIMP bool _Internal_pop_if_present(void *_Dst)
The concurrent_queue class is a sequence container class that allows first-in, first-out access to it...
Definition: concurrent_queue.h:55
_Page * _Next
Definition: concurrent_queue.h:83
void * _My_item
Definition: concurrent_queue.h:223