STLdoc
STLdocumentation
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
concurrent_queue.h
Go to the documentation of this file.
1 /***
2 * ==++==
3 *
4 * Copyright (c) Microsoft Corporation. All rights reserved.
5 * Microsoft would like to acknowledge that this concurrency data structure implementation
6 * is based on Intel's implementation in its Threading Building Blocks ("Intel Material").
7 *
8 * ==--==
9 * =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
10 *
11 * concurrent_queue.h
12 *
13 * =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
14 ****/
15 
16 /*
17  Intel Material Copyright 2005-2008 Intel Corporation. All Rights Reserved.
18 */
19 
20 #pragma once
21 
22 
23 #include <crtdefs.h>
24 #include <memory>
25 #include <cstddef>
26 #include <crtdbg.h>
27 #include <concrt.h>
28 #include <utility>
29 
30 #define _PPL_CONTAINER
31 
32 #if !(defined (_M_X64) || defined (_M_IX86) || defined (_M_ARM))
33  #error ERROR: Concurrency Runtime is supported only on X64, X86, and ARM architectures.
34 #endif /* !(defined (_M_X64) || defined (_M_IX86) || defined (_M_ARM)) */
35 
36 #if defined (_M_CEE)
37  #error ERROR: Concurrency Runtime is not supported when compiling /clr.
38 #endif /* defined (_M_CEE) */
39 
40 #pragma pack(push,_CRT_PACKING)
41 #pragma warning(push)
42 #pragma warning (disable: 4510 4512 4610) // disable warnings for compiler unable to generate constructor
43 
44 
49 
50 namespace Concurrency
51 {
52 
53 template<typename _Ty, class _Ax = std::allocator<_Ty> >
55 
56 namespace details
57 {
58 
60 
61  typedef size_t _Ticket;
62 
63  class _Concurrent_queue_iterator_rep;
65  template<typename _Container, typename _Value> class _Concurrent_queue_iterator;
66 
67  // Type-independent portion of concurrent_queue.
69  {
70  // Internal representation
72 
73  friend class _Concurrent_queue_rep;
74  friend struct _Micro_queue;
78  protected:
79  // Prefix on a page
80  struct _Page
81  {
83  size_t _Mask;
84  };
85 
86  // Always a power of 2
88 
89  // Size of an item
90  size_t _Item_size;
91 
92  private:
93  virtual void _Move_item( _Page& _Dst, size_t _Index, void* _Src ) = 0;
94  virtual void _Copy_item( _Page& _Dst, size_t _Index, const void* _Src ) = 0;
95  virtual void _Assign_and_destroy_item( void* _Dst, _Page& _Src, size_t _Index ) = 0;
96  protected:
97  _CRTIMP2 _Concurrent_queue_base_v4( size_t _Item_size );
99 
100  // Enqueue item at tail of queue
101  _CRTIMP2 void _Internal_push( const void* _Src );
102 
103  // Enqueue item at tail of queue by move
104  _CRTIMP2 void _Internal_move_push( void* _Src );
105 
106  // swap the internal representation
108 
109  // Attempt to dequeue item from queue.
111  _CRTIMP2 bool _Internal_pop_if_present( void* _Dst );
112 
113  // Get size of queue
114  _CRTIMP2 size_t _Internal_size() const;
115 
116  // Test instantaneous queue empty
117  _CRTIMP2 bool _Internal_empty() const;
118 
119  // custom allocator
120  virtual _Page *_Allocate_page() = 0;
121 
122  // custom de-allocator
123  virtual void _Deallocate_page( _Page *p ) = 0;
124 
125  // free any remaining pages
127 
128  // throw an exception
129  _CRTIMP2 void _Internal_throw_exception() const;
130 
131  private:
132  // Deny copy construction
134 
135  // Deny assignment
136  void operator=( const _Concurrent_queue_base_v4& );
137  };
138 
140 
141 
142  // A queue using simple locking.
146  {
147  class _Pop_finalizer;
148  class _Push_finalizer;
149 
152 
155 
156  volatile long _Page_mutex_flag;
157 
158  void _Push( void* _Item, _Ticket _K, _Concurrent_queue_base& _Base, void (_Concurrent_queue_base::*moveOp)(_Concurrent_queue_base_v4::_Page&, size_t, void*));
159 
160  bool _Pop( void* _Dest, _Ticket _K, _Concurrent_queue_base& _Base );
161  };
162 
163  // Disable warning C4324: structure was padded due to __declspec(align())
164  // This padding is expected and necessary.
165  #pragma warning(push)
166  #pragma warning(disable: 4324)
167 
168 
169  // Internal representation of a concurrent_queue.
173  {
174  private:
175  friend struct _Micro_queue;
176 
177  // Approximately n_queue/golden ratio
178  static const size_t _Phi = 3;
179 
180  public:
181  // Must be power of 2
182  static const size_t _N_queue = 8;
183 
184  // Map ticket to an array index
185  static size_t _Index( _Ticket _K )
186  {
187  return _K*_Phi%_N_queue;
188  }
189 
190  __declspec(align(64))
191  _Subatomic<_Ticket> _Head_counter;
192 
193  __declspec(align(64))
194  _Subatomic<_Ticket> _Tail_counter;
195 
196  __declspec(align(64))
197  _Micro_queue _Array[_N_queue];
198 
199  _Micro_queue& _Choose( _Ticket _K )
200  {
201  // The formula here approximates LRU in a cache-oblivious way.
202  return _Array[_Index(_K)];
203  }
204  };
205 
206  #pragma warning(pop)
207 
208 
209  // Type-independent portion of _Concurrent_queue_iterator.
211  // concurrent_queue over which we are iterating.
213  _Concurrent_queue_iterator_rep* _My_rep;
214 
215  template<typename _C, typename _Ty, typename _U>
217 
218  template<typename _C, typename _Ty, typename _U>
220  protected:
221  // Pointer to current item
222  mutable void* _My_item;
223 
224  // Default constructor
226  : _My_rep(NULL), _My_item(NULL)
227  {
228  }
229 
230  // Copy constructor
232  : _My_rep(NULL), _My_item(NULL)
233  {
234  _Assign(_I);
235  }
236 
237  // Construct iterator pointing to head of queue.
238  _CRTIMP2 _Concurrent_queue_iterator_base_v4( const _Concurrent_queue_base& );
239 
240  // Assignment
242 
243  // Advance iterator one step towards tail of queue.
244  _CRTIMP2 void _Advance();
245 
246  // Destructor
248  };
249 
251 
252  // Meets requirements of a forward iterator for STL.
254  template<typename _Container, typename _Value>
255  class _Concurrent_queue_iterator: public _Concurrent_queue_iterator_base_v4, public std::iterator<std::forward_iterator_tag, _Value>
256  {
257  template<typename _Ty, class _Ax> friend class ::Concurrency::concurrent_queue;
258 
259  // Construct iterator pointing to head of queue.
260  _Concurrent_queue_iterator( const _Concurrent_queue_base& _Queue )
262  {
263  }
264  public:
266  {
267  }
268 
273  {
274  }
275 
276  // Iterator assignment
278  {
279  _Assign(_Other);
280  return *this;
281  }
282 
283  // Reference to current item
284  _Value& operator*() const
285  {
286  return *static_cast<_Value*>(_My_item);
287  }
288 
290  {
291  return &operator*();
292  }
293 
294  // Advance to next item in queue
296  {
297  _Advance();
298  return *this;
299  }
300 
301  // Post increment
303  {
304  _Concurrent_queue_iterator _Result = *this;
305  _Advance();
306  return _Result;
307  }
308  }; // _Concurrent_queue_iterator
309 
310  template<typename _Container, typename _Value>
311  struct std::_Is_checked_helper<_Concurrent_queue_iterator<_Container, _Value> >
312  : public true_type
313  { // Marks _Concurrent_queue_iterator as checked by deriving from true_type. This suppresses warning C4996.
314  };
315 
316  template<typename _C, typename _Ty, typename _U>
318  {
319  return _I._My_item==_J._My_item;
320  }
321 
322  template<typename _C, typename _Ty, typename _U>
324  {
325  return _I._My_item!=_J._My_item;
326  }
327 
328 } // namespace details;
329 
330 
347 
348 template<typename _Ty, class _Ax>
349 class concurrent_queue: public ::Concurrency::details::_Concurrent_queue_base_v4
350 {
351  template<typename _Container, typename _Value> friend class ::Concurrency::details::_Concurrent_queue_iterator;
352 
353  // allocator type
354  typedef typename _Ax::template rebind<char>::other _Page_allocator_type;
355  _Page_allocator_type _My_allocator;
356 
357  // Class used to ensure exception-safety of method "pop"
359  {
360  private:
361  _Ty& _My_value;
362 
363  void operator=(const _Destroyer&); // prevent warning: assign operator can't be generated
364  public:
366  : _My_value(_Value)
367  {
368  }
369 
371  {
372  _My_value.~_Ty();
373  }
374  };
375 
376  _Ty& _Get_ref( _Page& _Pg, size_t _Index )
377  {
379  return static_cast<_Ty*>(static_cast<void*>(&_Pg+1))[_Index];
380  }
381 
382  /*override*/ virtual void _Copy_item( _Page& _Dst, size_t _Index, const void* _Src )
383  {
384  new( &_Get_ref(_Dst,_Index) ) _Ty(*static_cast<const _Ty*>(_Src));
385  }
386 
387  /*override*/ virtual void _Move_item( _Page& _Dst, size_t _Index, void* _Src )
388  {
389  new( &_Get_ref(_Dst,_Index) ) _Ty(std::move(*static_cast<_Ty*>(_Src)));
390  }
391 
392  /*override*/ virtual void _Assign_and_destroy_item( void* _Dst, _Page& _Src, size_t _Index )
393  {
394  _Ty& _From = _Get_ref(_Src,_Index);
395  _Destroyer _D(_From);
396  if (_Dst != NULL)
397  {
398  *static_cast<_Ty*>(_Dst) = std::move(_From);
399  }
400  }
401 
402  /*overide*/ virtual _Page *_Allocate_page()
403  {
404  size_t _N = sizeof(_Page) + _Items_per_page*_Item_size;
405  _Page *_Pg = reinterpret_cast<_Page*>(_My_allocator.allocate( _N ));
406  if( !_Pg )
408  return _Pg;
409  }
410 
411  /*override*/ virtual void _Deallocate_page( _Page *_Pg )
412  {
413  size_t _N = sizeof(_Page) + _Items_per_page*_Item_size;
414  _My_allocator.deallocate( reinterpret_cast<char*>(_Pg), _N );
415  }
416 
417 public:
421 
422  typedef _Ty value_type;
423 
427 
428  typedef _Ax allocator_type;
429 
433 
434  typedef _Ty& reference;
435 
440 
441  typedef const _Ty& const_reference;
442 
446 
448 
452 
454 
470 
471  explicit concurrent_queue(const allocator_type &_Al = allocator_type())
472  : _Concurrent_queue_base_v4( sizeof(_Ty) ), _My_allocator( _Al )
473  {
474  }
475 
494 
495  concurrent_queue(const concurrent_queue& _OtherQ, const allocator_type &_Al = allocator_type());
496 
515 
516  concurrent_queue(concurrent_queue&& _OtherQ, const allocator_type &_Al = allocator_type());
517 
539 
540  template<typename _InputIterator>
541  concurrent_queue(_InputIterator _Begin, _InputIterator _End)
542  : _Concurrent_queue_base_v4( sizeof(_Ty) ), _My_allocator( allocator_type() )
543  {
544  while (_Begin != _End)
545  {
546  this->push(*_Begin);
547  ++_Begin;
548  }
549  }
550 
554 
556 
566 
567  void push( const _Ty& _Src )
568  {
569  _Internal_push( &_Src );
570  }
571 
581 
582  void push( _Ty&& _Src )
583  {
585  }
586 
604 
605  bool try_pop( _Ty& _Dest )
606  {
607  return _Internal_pop_if_present( &_Dest );
608  }
609 
620 
621  size_type unsafe_size() const
622  {
623  return _Internal_size();
624  }
625 
636 
637  bool empty() const
638  {
639  return _Internal_empty();
640  }
641 
648 
649  allocator_type get_allocator() const
650  {
651  return this->_My_allocator;
652  }
653 
657 
658  void clear();
659 
663 
665 
669 
671 
684 
685  iterator unsafe_begin()
686  {
687  return iterator(*this);
688  }
689 
702 
703  iterator unsafe_end()
704  {
705  return iterator();
706  }
707 
720 
721  const_iterator unsafe_begin() const
722  {
723  return const_iterator(*this);
724  }
725 
738 
739  const_iterator unsafe_end() const
740  {
741  return const_iterator();
742  }
743 };
744 
745 
764 
765 template<typename _Ty, class _Ax>
766 concurrent_queue<_Ty,_Ax>::concurrent_queue(const concurrent_queue& _Queue, const allocator_type& _Al = allocator_type())
767  : _Concurrent_queue_base_v4( sizeof(_Ty) ), _My_allocator(_Al)
768 {
769  concurrent_queue::const_iterator _QEnd = _Queue.unsafe_end();
770  for (concurrent_queue::const_iterator _It = _Queue.unsafe_begin(); _It != _QEnd; ++_It)
771  this->push(*_It);
772 }
773 
789 
790 template<typename _Ty, class _Ax>
791 concurrent_queue<_Ty,_Ax>::concurrent_queue(concurrent_queue&& _Queue, const allocator_type& _Al = allocator_type())
792  : _Concurrent_queue_base_v4( sizeof(_Ty) ), _My_allocator(_Al)
793 {
794  _Internal_swap(_Queue);
795 }
796 
800 
801 template<typename _Ty, class _Ax>
803 {
804  clear();
805  _Internal_finish_clear();
806 }
807 
811 
812 template<typename _Ty, class _Ax>
814 {
815  while( !empty() )
816  {
817  if (!_Internal_pop_if_present(NULL))
818  {
820  break;
821  }
822  }
823 }
824 
825 } // namespace Concurrency
826 
827 namespace concurrency = Concurrency;
828 
829 #pragma warning(pop)
830 #pragma pack(pop)
Definition: xtr1common:34
friend class _Concurrent_queue_iterator_rep
Definition: concurrent_queue.h:76
_Value * operator->() const
Definition: concurrent_queue.h:289
_Ax allocator_type
A type that represents the allocator class for the concurrent queue.
Definition: concurrent_queue.h:428
static const size_t _N_queue
Definition: concurrent_queue.h:182
_CRTIMP2 void _Concurrent_queue_base_v4::_Internal_swap(_Concurrent_queue_base_v4 &other)
~_Destroyer()
Definition: concurrent_queue.h:370
_Ty & _My_value
Definition: concurrent_queue.h:361
size_t _Items_per_page
Definition: concurrent_queue.h:87
_CRTIMP _In_ int _Value
Definition: setjmp.h:190
Definition: concurrent_queue.h:65
void operator=(const _Concurrent_queue_base_v4 &)
_Concurrent_queue_iterator_base_v4(const _Concurrent_queue_iterator_base_v4 &_I)
Definition: concurrent_queue.h:231
concurrent_queue(const allocator_type &_Al=allocator_type())
Constructs a concurrent queue.
Definition: concurrent_queue.h:471
#define _CONCRT_ASSERT(x)
Definition: concrt.h:137
bool operator==(const _Concurrent_queue_iterator< _C, _Ty > &_I, const _Concurrent_queue_iterator< _C, _U > &_J)
Definition: concurrent_queue.h:317
_Ty value_type
A type that represents the data type stored in a concurrent queue.
Definition: concurrent_queue.h:422
const _Ty & const_reference
A type that provides a reference to a const element stored in a concurrent queue for reading and perf...
Definition: concurrent_queue.h:441
virtual void _Copy_item(_Page &_Dst, size_t _Index, const void *_Src)
Definition: concurrent_queue.h:382
_W64 unsigned int size_t
Definition: crtdefs.h:496
virtual void _Move_item(_Page &_Dst, size_t _Index, void *_Src)=0
size_type unsafe_size() const
Returns the number of items in the queue. This method is not concurrency-safe.
Definition: concurrent_queue.h:621
std::ptrdiff_t difference_type
A type that provides the signed distance between two elements in a concurrent queue.
Definition: concurrent_queue.h:453
static size_t _Index(_Ticket _K)
Definition: concurrent_queue.h:185
void * align(size_t _Bound, size_t _Size, void *&_Ptr, size_t &_Space) _NOEXCEPT
Definition: memory:1951
_N
Definition: wchar.h:1269
__declspec(align(64)) _Subatomic< _Ticket > _Head_counter
_Ty & _Get_ref(_Page &_Pg, size_t _Index)
Definition: concurrent_queue.h:376
_OutIt move(_InIt _First, _InIt _Last, _OutIt _Dest)
Definition: xutility:2447
size_t _Mask
Definition: concurrent_queue.h:83
_Concurrent_queue_iterator operator++(int)
Definition: concurrent_queue.h:302
static const size_t _Phi
Definition: concurrent_queue.h:178
Definition: agents.h:105
The Concurrency namespace provides classes and functions that provide access to the Concurrency Runti...
Definition: agents.h:42
std::size_t size_type
A type that counts the number of elements in a concurrent queue.
Definition: concurrent_queue.h:447
static void empty(void)
Definition: ivec.h:829
_Concurrent_queue_iterator(const _Concurrent_queue_iterator< _Container, typename _Container::value_type > &_Other)
Definition: concurrent_queue.h:271
_CRTIMP2 void _Internal_push(const void *_Src)
Definition: concurrent_queue.h:145
#define NULL
Definition: crtdbg.h:30
_Destroyer(_Ty &_Value)
Definition: concurrent_queue.h:365
_Concurrent_queue_iterator()
Definition: concurrent_queue.h:265
_Page_allocator_type _My_allocator
Definition: concurrent_queue.h:355
bool try_pop(_Ty &_Dest)
Dequeues an item from the queue if one is available. This method is concurrency-safe.
Definition: concurrent_queue.h:605
_Concurrent_queue_rep * _My_rep
Definition: concurrent_queue.h:71
Definition: concrt.h:497
void _Push(void *_Item, _Ticket _K, _Concurrent_queue_base &_Base, void(_Concurrent_queue_base::*moveOp)(_Concurrent_queue_base_v4::_Page &, size_t, void *))
void clear()
Clears the concurrent queue, destroying any currently enqueued elements. This method is not concurren...
Definition: concurrent_queue.h:813
details::_Concurrent_queue_iterator< concurrent_queue, _Ty > iterator
A type that represents a non-thread-safe iterator over the elements in a concurrent queue...
Definition: concurrent_queue.h:664
_Ax::template rebind< char >::other _Page_allocator_type
Definition: concurrent_queue.h:354
virtual void _Deallocate_page(_Page *_Pg)
Definition: concurrent_queue.h:411
allocator_type get_allocator() const
Returns a copy of the allocator used to construct the concurrent queue. This method is concurrency-sa...
Definition: concurrent_queue.h:649
details::_Concurrent_queue_iterator< concurrent_queue, const _Ty > const_iterator
A type that represents a non-thread-safe const iterator over elements in a concurrent queue...
Definition: concurrent_queue.h:670
size_t _Item_size
Definition: concurrent_queue.h:90
_In_ size_t _In_z_ const unsigned char * _Src
Definition: mbstring.h:95
size_t _Ticket
Definition: concurrent_queue.h:59
iterator unsafe_end()
Returns an iterator of type iterator or const_iterator to the end of the concurrent queue...
Definition: concurrent_queue.h:703
_W64 int ptrdiff_t
Definition: crtdefs.h:530
_Subatomic< _Concurrent_queue_base::_Page * > _Tail_page
Definition: concurrent_queue.h:153
bool _Pop(void *_Dest, _Ticket _K, _Concurrent_queue_base &_Base)
virtual void _Move_item(_Page &_Dst, size_t _Index, void *_Src)
Definition: concurrent_queue.h:387
volatile long _Page_mutex_flag
Definition: concurrent_queue.h:156
const_iterator unsafe_end() const
Returns an iterator of type iterator or const_iterator to the end of the concurrent queue...
Definition: concurrent_queue.h:739
concurrent_queue(_InputIterator _Begin, _InputIterator _End)
Constructs a concurrent queue.
Definition: concurrent_queue.h:541
~concurrent_queue()
Destroys the concurrent queue.
Definition: concurrent_queue.h:802
_Micro_queue & _Choose(_Ticket _K)
Definition: concurrent_queue.h:199
_Concurrent_queue_iterator & operator=(const _Concurrent_queue_iterator &_Other)
Definition: concurrent_queue.h:277
iterator unsafe_begin()
Returns an iterator of type iterator or const_iterator to the beginning of the concurrent queue...
Definition: concurrent_queue.h:685
virtual void _Assign_and_destroy_item(void *_Dst, _Page &_Src, size_t _Index)=0
_Subatomic< _Ticket > _Head_counter
Definition: concurrent_queue.h:151
_Concurrent_queue_iterator(const _Concurrent_queue_base &_Queue)
Definition: concurrent_queue.h:260
const_iterator unsafe_begin() const
Returns an iterator of type iterator or const_iterator to the beginning of the concurrent queue...
Definition: concurrent_queue.h:721
_Value & operator*() const
Definition: concurrent_queue.h:284
friend bool operator==(const _Concurrent_queue_iterator< _C, _Ty > &, const _Concurrent_queue_iterator< _C, _U > &)
Definition: concurrent_queue.h:317
_Concurrent_queue_iterator & operator++()
Definition: concurrent_queue.h:295
Definition: concurrent_queue.h:68
_Concurrent_queue_iterator_rep * _My_rep
Definition: concurrent_queue.h:213
_CRTIMP2 _Concurrent_queue_base_v4(size_t _Item_size)
_Concurrent_queue_iterator_base_v4()
Definition: concurrent_queue.h:225
#define _CRTIMP2
Definition: crtdefs.h:126
Definition: concurrent_queue.h:172
virtual _Page * _Allocate_page()
Definition: concurrent_queue.h:402
bool empty() const
Tests if the concurrent queue is empty at the moment this method is called. This method is concurrenc...
Definition: concurrent_queue.h:637
_Subatomic< _Concurrent_queue_base::_Page * > _Head_page
Definition: concurrent_queue.h:148
void push(const _Ty &_Src)
Enqueues an item at tail end of the concurrent queue. This method is concurrency-safe.
Definition: concurrent_queue.h:567
virtual void _Copy_item(_Page &_Dst, size_t _Index, const void *_Src)=0
friend bool operator!=(const _Concurrent_queue_iterator< _C, _Ty > &, const _Concurrent_queue_iterator< _C, _U > &)
Definition: concurrent_queue.h:323
_Subatomic< _Ticket > _Tail_counter
Definition: concurrent_queue.h:154
virtual void _Assign_and_destroy_item(void *_Dst, _Page &_Src, size_t _Index)
Definition: concurrent_queue.h:392
bool operator!=(const _Concurrent_queue_iterator< _C, _Ty > &_I, const _Concurrent_queue_iterator< _C, _U > &_J)
Definition: concurrent_queue.h:323
_Concurrent_queue_base_v4 _Concurrent_queue_base
Definition: concurrent_queue.h:139
_Ty & reference
A type that provides a reference to an element stored in a concurrent queue.
Definition: concurrent_queue.h:434
_CRTIMP2 void _Assign(const _Concurrent_queue_iterator_base_v4 &)
void push(_Ty &&_Src)
Enqueues an item at tail end of the concurrent queue. This method is concurrency-safe.
Definition: concurrent_queue.h:582
_CRTIMP2 bool _Internal_pop_if_present(void *_Dst)
friend class _Micro_queue_pop_finalizer
Definition: concurrent_queue.h:75
Definition: concurrent_queue.h:358
_Concurrent_queue_iterator_base_v4 concurrent_queue_iterator_base
Definition: concurrent_queue.h:250
The concurrent_queue class is a sequence container class that allows first-in, first-out access to it...
Definition: concurrent_queue.h:54
_Page * _Next
Definition: concurrent_queue.h:82
void * _My_item
Definition: concurrent_queue.h:222