STXXL  1.4-dev
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
parallel_priority_queue.h
Go to the documentation of this file.
1 /***************************************************************************
2  * include/stxxl/bits/containers/parallel_priority_queue.h
3  *
4  * Part of the STXXL. See http://stxxl.sourceforge.net
5  *
6  * Copyright (C) 2014-2015 Thomas Keh <[email protected]>
7  * Copyright (C) 2014-2015 Timo Bingmann <[email protected]>
8  *
9  * Distributed under the Boost Software License, Version 1.0.
10  * (See accompanying file LICENSE_1_0.txt or copy at
11  * http://www.boost.org/LICENSE_1_0.txt)
12  **************************************************************************/
13 
14 #ifndef STXXL_CONTAINERS_PARALLEL_PRIORITY_QUEUE_HEADER
15 #define STXXL_CONTAINERS_PARALLEL_PRIORITY_QUEUE_HEADER
16 
17 #include <algorithm>
18 #include <cassert>
19 #include <cstddef>
20 #include <cstdlib>
21 #include <ctime>
22 #include <list>
23 #include <utility>
24 #include <numeric>
25 #include <vector>
26 
27 #if STXXL_PARALLEL
28  #include <omp.h>
29 #endif
30 
31 #if __cplusplus >= 201103L
32 #define STXXL_MOVE(T) std::move(T)
33 #else
34 #define STXXL_MOVE(T) T
35 #endif
36 
43 #include <stxxl/bits/common/rand.h>
44 #include <stxxl/bits/config.h>
52 #include <stxxl/bits/namespace.h>
53 #include <stxxl/bits/noncopyable.h>
54 #include <stxxl/bits/parallel.h>
55 #include <stxxl/bits/verbose.h>
56 #include <stxxl/types>
57 
59 
60 namespace ppq_local {
61 
62 /*!
63  * A random-access iterator class for block oriented data. The iterator is
64  * intended to be provided by the internal_array and external_array classes
65  * and to be used by the multiway_merge algorithm as input iterators.
66  *
67  * \tparam ValueType the value type
68  */
69 template <class ValueType>
71 {
72 public:
73  typedef ValueType value_type;
75  typedef value_type* pointer;
76  typedef ptrdiff_t difference_type;
77  typedef std::random_access_iterator_tag iterator_category;
78  typedef std::vector<std::pair<pointer, pointer> > block_pointers_type;
79 
80 protected:
82 
83  //! pointer to a vector of begin/end pointer pairs
84  //! They allow access to the data blocks.
86 
87  //! pointer to the current element
89 
90  //! index of the current element
91  size_t m_index;
92 
93  //! index of the current element's block
94  size_t m_block_index;
95 
96  //! size of each data block
97  size_t m_block_items;
98 
99 public:
100  //! default constructor (should not be used directly)
102  : m_block_pointers(NULL)
103  { }
104 
105  //! constructor
106  //!
107  //! \param block_pointers A reference to the properly initialized vector of begin and end pointers.
108  //! One pair for each block. The pointers should be valid for all blocks that
109  //! are expected to be accessed with this iterator.
110  //! \param block_items The size of a single block. If there is only one block (e.g. if the iterator
111  //! belongs to an internal_array), use the total size here.
112  //! \param index The index of the current element (global - index 0 belongs to the first element
113  //! in the first block, no matter if the values are still valid)
114  ppq_iterator(const block_pointers_type* block_pointers, size_t block_items,
115  size_t index)
116  : m_block_pointers(block_pointers),
117  m_index(index),
118  m_block_items(block_items)
119  {
120  update();
121  }
122 
123  //! returns the value's index in the internal or external array
124  size_t get_index() const
125  {
126  return m_index;
127  }
128 
129  reference operator * () const
130  {
131  assert(m_current);
132  return *m_current;
133  }
134 
135  pointer operator -> () const
136  {
137  return &(operator * ());
138  }
139 
140  reference operator [] (difference_type relative_index) const
141  {
142  const difference_type index = m_index + relative_index;
143 
144  const size_t block_index = index / m_block_items;
145  const size_t local_index = index % m_block_items;
146 
147  assert(block_index < m_block_pointers->size());
148  assert((*m_block_pointers)[block_index].first + local_index
149  < (*m_block_pointers)[block_index].second);
150 
151  return *((*m_block_pointers)[block_index].first + local_index);
152  }
153 
154  //! prefix-increment operator
156  {
157  ++m_index;
158  ++m_current;
159 
160  if (UNLIKELY(m_current == (*m_block_pointers)[m_block_index].second)) {
161  if (m_block_index + 1 < m_block_pointers->size()) {
162  m_current = (*m_block_pointers)[++m_block_index].first;
163  }
164  else {
165  // global end
166  assert(m_block_index + 1 == m_block_pointers->size());
167  m_current = (*m_block_pointers)[m_block_index++].second;
168  }
169  }
170 
171  return *this;
172  }
173  //! prefix-decrement operator
175  {
176  assert(m_index > 0);
177  --m_index;
178 
179  if (m_block_index >= m_block_pointers->size()
180  || m_current == (*m_block_pointers)[m_block_index].first) {
181  // begin of current block or global end
182  assert(m_block_index > 0);
183  assert(m_block_index <= m_block_pointers->size());
184  m_current = (*m_block_pointers)[--m_block_index].second - 1;
185  }
186  else {
187  --m_current;
188  }
189 
190  return *this;
191  }
192 
193  self_type operator + (difference_type addend) const
194  {
195  return self_type(m_block_pointers, m_block_items, m_index + addend);
196  }
198  {
199  m_index += addend;
200  update();
201  return *this;
202  }
203  self_type operator - (difference_type subtrahend) const
204  {
205  return self_type(m_block_pointers, m_block_items, m_index - subtrahend);
206  }
207  difference_type operator - (const self_type& o) const
208  {
209  return (m_index - o.m_index);
210  }
211  self_type& operator -= (difference_type subtrahend)
212  {
213  m_index -= subtrahend;
214  update();
215  return *this;
216  }
217  bool operator == (const self_type& o) const
218  {
219  return m_index == o.m_index;
220  }
221  bool operator != (const self_type& o) const
222  {
223  return m_index != o.m_index;
224  }
225  bool operator < (const self_type& o) const
226  {
227  return m_index < o.m_index;
228  }
229  bool operator <= (const self_type& o) const
230  {
231  return m_index <= o.m_index;
232  }
233  bool operator > (const self_type& o) const
234  {
235  return m_index > o.m_index;
236  }
237  bool operator >= (const self_type& o) const
238  {
239  return m_index >= o.m_index;
240  }
241 
242  friend std::ostream& operator << (std::ostream& os, const ppq_iterator& i)
243  {
244  return os << "[" << i.m_index << "]";
245  }
246 
247 private:
248  //! updates m_block_index and m_current based on m_index
249  inline void update()
250  {
251  m_block_index = m_index / m_block_items;
252  const size_t local_index = m_index % m_block_items;
253 
254  if (m_block_index < m_block_pointers->size()) {
255  m_current = (*m_block_pointers)[m_block_index].first + local_index;
256  assert(m_current <= (*m_block_pointers)[m_block_index].second);
257  }
258  else {
259  // global end if end is beyond the last real block
260  assert(m_block_index == m_block_pointers->size());
261  assert(local_index == 0);
262  //-tb old: m_current = (*m_block_pointers)[m_block_index - 1].second;
263  m_current = NULL;
264  }
265  }
266 };
267 
268 /*!
269  * Internal arrays store a sorted sequence of values in RAM, which will be
270  * merged together into the deletion buffer when it needs to be
271  * refilled. Internal arrays are constructed from the insertions heaps when
272  * they overflow.
273  */
274 template <class ValueType>
276 {
277 public:
278  typedef ValueType value_type;
280 
281 protected:
283 
284  //! Contains the items of the sorted sequence.
285  std::vector<value_type> m_values;
286 
287  //! Index of the current head
289 
290  //! Level of internal array (Sanders' PQ: group number)
292 
293  //! Begin and end pointers of the array
294  //! This is used by the iterator
296 
297 public:
298  //! Default constructor. Don't use this directly. Needed for regrowing in
299  //! surrounding vector.
300  internal_array() : m_min_index(0) { }
301 
302  //! Constructor which takes a value vector. The value vector is empty
303  //! afterwards.
304  internal_array(std::vector<value_type>& values,
305  unsigned_type min_index = 0,
306  unsigned_type level = 0)
307  : m_values(), m_min_index(min_index), m_level(level),
308  m_block_pointers(1)
309  {
310  std::swap(m_values, values);
311  STXXL_ASSERT(values.size() > 0);
312  m_block_pointers[0] = std::make_pair(&(*m_values.begin()), &(*m_values.begin()) + m_values.size());
313  }
314 
315  //! Swap internal_array with another one.
317  {
318  using std::swap;
319 
320  swap(m_values, o.m_values);
321  swap(m_min_index, o.m_min_index);
322  swap(m_level, o.m_level);
323  swap(m_block_pointers, o.m_block_pointers);
324  }
325 
326  //! Swap internal_array with another one.
327  friend void swap(internal_array& a, internal_array& b)
328  {
329  a.swap(b);
330  }
331 
332  //! Random access operator
333  inline value_type& operator [] (size_t i)
334  {
335  return m_values[i];
336  }
337 
338  //! Use inc_min(diff) if multiple values have been extracted.
339  inline void inc_min(size_t diff = 1)
340  {
341  m_min_index += diff;
342  }
343 
344  //! The currently smallest element in the array.
345  inline const value_type & get_min() const
346  {
347  return m_values[m_min_index];
348  }
349 
350  //! The index of the currently smallest element in the array.
351  inline size_t get_min_index() const
352  {
353  return m_min_index;
354  }
355 
356  //! The index of the largest element in the array.
357  inline size_t get_max_index() const
358  {
359  return (m_values.size() - 1);
360  }
361 
362  //! Returns if the array has run empty.
363  inline bool empty() const
364  {
365  return (m_min_index >= m_values.size());
366  }
367 
368  //! Make this array empty.
369  inline void make_empty()
370  {
371  m_min_index = m_values.size();
372  }
373 
374  //! Returns the current size of the array.
375  inline size_t size() const
376  {
377  return (m_values.size() - m_min_index);
378  }
379 
380  //! Returns the initial size of the array.
381  inline size_t capacity() const
382  {
383  return m_values.size();
384  }
385 
386  //! Returns the level (group number) of the array.
387  inline unsigned_type level() const
388  {
389  return m_level;
390  }
391 
392  //! Return the amount of internal memory used by an array with the capacity
393  //! in number of items.
394  static size_t int_memory(size_t capacity)
395  {
396  return sizeof(internal_array) + capacity * sizeof(value_type);
397  }
398 
399  //! Return the amount of internal memory used by the array
400  inline size_t int_memory() const
401  {
402  return int_memory(m_values.capacity());
403  }
404 
405  //! Begin iterator
406  inline iterator begin() const
407  {
408  // not const, unfortunately.
409  return iterator(&m_block_pointers, capacity(), m_min_index);
410  }
411 
412  //! End iterator
413  inline iterator end() const
414  {
415  // not const, unfortunately.
416  return iterator(&m_block_pointers, capacity(), capacity());
417  }
418 };
419 
420 template <class ExternalArrayType>
422 
423 /*!
424  * External array stores a sorted sequence of values on the hard disk and
425  * allows access to the first block (containing the smallest values). The
426  * class uses buffering and prefetching in order to improve the performance.
427  *
428  * \tparam ValueType Type of the contained objects (POD with no references to
429  * internal memory).
430  *
431  * \tparam BlockSize External block size. Default =
432  * STXXL_DEFAULT_BLOCK_SIZE(ValueType).
433  *
434  * \tparam AllocStrategy Allocation strategy for the external memory. Default =
435  * STXXL_DEFAULT_ALLOC_STRATEGY.
436  */
437 template <
438  class ValueType,
439  unsigned_type BlockSize = STXXL_DEFAULT_BLOCK_SIZE(ValueType),
440  class AllocStrategy = STXXL_DEFAULT_ALLOC_STRATEGY
441  >
443 {
444 public:
445  typedef ValueType value_type;
447 
451  typedef std::vector<BID<BlockSize> > bid_vector;
452  typedef typename bid_vector::iterator bid_iterator;
453  typedef std::vector<block_type*> block_vector;
454  typedef std::vector<request_ptr> request_vector;
455  typedef std::vector<value_type> minima_vector;
458 
459  //! The number of elements fitting into one block
460  enum {
461  block_size = BlockSize,
462  block_items = BlockSize / sizeof(value_type)
463  };
464 
465  static const bool debug = false;
466 
467 protected:
468  //! The total size of the external array in items. Cannot be changed
469  //! after construction.
471 
472  //! Number of blocks, again: calculated at construction time.
474 
475  //! Level of external array (Sanders' PQ: group number)
477 
478  //! Common prefetch and write buffer pool
480 
481  //! The IDs of each block in external memory.
483 
484  //! A vector of size m_num_blocks with block_type pointers, some of them
485  //! will be filled while writing, but most are NULL.
487 
488  //! Begin and end pointers for each block, used for merging with
489  //! ppq_iterator.
491 
492  //! The read request pointers are used to wait until the block has been
493  //! completely fetched.
495 
496  //! stores the minimum value of each block
498 
499  //! Is array in write phase? True = write phase, false = read phase.
501 
502  //! The total number of elements minus the number of extracted values
504 
505  //! The read position in the array.
507 
508  //! The index behind the last element that is located in RAM (or is at
509  //! least requested to be so)
511 
512  //! The first unhinted block index.
514 
515  //! The first unhinted block index as it was before the
516  //! prepare_rebuilding_hints() call. Used for removal of hints which aren't
517  //! needed anymore.
519 
520  //! allow writer to access to all variables
522 
523 public:
524  /*!
525  * Constructs an external array
526  *
527  * \param size The total number of elements. Cannot be changed after
528  * construction.
529  *
530  * \param num_prefetch_blocks Number of blocks to prefetch from hard disk
531  *
532  * \param num_write_buffer_blocks Size of the write buffer in number of
533  * blocks
534  */
536  : // constants
537  m_capacity(size),
538  m_num_blocks((size_t)div_ceil(m_capacity, block_items)),
539  m_level(level),
540  m_pool(pool),
541 
542  // vectors
543  m_bids(m_num_blocks),
544  m_blocks(m_num_blocks, reinterpret_cast<block_type*>(1)),
545  m_block_pointers(m_num_blocks),
546  m_requests(m_num_blocks, NULL),
547  m_minima(m_num_blocks),
548 
549  // state
550  m_write_phase(true),
551 
552  // indices
553  m_size(0),
554  m_index(0),
555  m_end_index(0),
556  m_unhinted_block(0),
557  m_old_unhinted_block(0)
558  {
559  assert(m_capacity > 0);
560  // allocate blocks in EM.
561  block_manager* bm = block_manager::get_instance();
562  bm->new_blocks(AllocStrategy(), m_bids.begin(), m_bids.end());
563  }
564 
565  //! Default constructor. Don't use this directly. Needed for regrowing in
566  //! surrounding vector.
568  : // constants
569  m_capacity(0),
570  m_num_blocks(0),
571  m_level(0),
572  m_pool(NULL),
573 
574  // vectors
575  m_bids(0),
576  m_blocks(0),
577  m_block_pointers(0),
578  m_requests(0),
579  m_minima(0),
580 
581  // state
582  m_write_phase(false),
583 
584  // indices
585  m_size(0),
586  m_index(0),
587  m_end_index(0),
588  m_unhinted_block(0),
589  m_old_unhinted_block(0)
590  { }
591 
592  //! Swap external_array with another one.
594  {
595  using std::swap;
596 
597  // constants
598  swap(m_capacity, o.m_capacity);
599  swap(m_num_blocks, o.m_num_blocks);
600  swap(m_level, o.m_level);
601  swap(m_pool, o.m_pool);
602 
603  // vectors
604  swap(m_bids, o.m_bids);
605  swap(m_requests, o.m_requests);
606  swap(m_blocks, o.m_blocks);
607  swap(m_block_pointers, o.m_block_pointers);
608  swap(m_minima, o.m_minima);
609 
610  // state
611  swap(m_write_phase, o.m_write_phase);
612 
613  // indices
614  swap(m_size, o.m_size);
615  swap(m_index, o.m_index);
616  swap(m_end_index, o.m_end_index);
617  swap(m_unhinted_block, o.m_unhinted_block);
618  swap(m_old_unhinted_block, o.m_old_unhinted_block);
619  }
620 
621  //! Swap external_array with another one.
622  friend void swap(external_array& a, external_array& b)
623  {
624  a.swap(b);
625  }
626 
627  //! Destructor
629  {
630  if (m_size == 0) return;
631 
632  // not all data has been read! this only happens when the PPQ is
633  // destroyed while containing data.
634 
635  const unsigned_type block_index = m_index / block_items;
636  const unsigned_type end_block_index = get_end_block_index();
637 
638  // release blocks currently held in RAM
639  for (size_t i = block_index; i < end_block_index; ++i) {
640  m_pool->add_prefetch(m_blocks[i]);
641  // cannot report the number of freed blocks to PPQ.
642  }
643 
644  // cancel currently hinted blocks
645  for (size_t i = end_block_index; i < m_unhinted_block; ++i) {
646  STXXL_DEBUG("ea[" << this << "]: discarding prefetch hint on"
647  " block " << i);
648 
649  m_requests[i]->cancel();
650  m_requests[i]->wait();
651  // put block back into pool
652  m_pool->add_prefetch(m_blocks[i]);
653  // invalidate block entry
654  m_blocks[i] = NULL;
655  m_requests[i] = request_ptr();
656  }
657 
658  // figure out first block that is still allocated in EM.
659  bid_iterator i_begin = m_bids.begin() + block_index;
660  block_manager::get_instance()->delete_blocks(i_begin, m_bids.end());
661 
662  // check that all is empty
663  for (size_t i = block_index; i < end_block_index; ++i)
664  assert(m_blocks[i] == NULL);
665  }
666 
667  //! Returns the capacity in items.
668  size_t capacity() const
669  {
670  return m_capacity;
671  }
672 
673  //! Returns the current size in items.
674  size_t size() const
675  {
676  return m_size;
677  }
678 
679  //! Returns true if the array is empty.
680  bool empty() const
681  {
682  return (m_size == 0);
683  }
684 
685  //! Returns the level (group number) of the array.
686  inline unsigned_type level() const
687  {
688  return m_level;
689  }
690 
691  //! Return the number of blocks.
692  size_t num_blocks() const
693  {
694  return m_num_blocks;
695  }
696 
697  //! Returns memory usage of EA with given capacity, excluding blocks loaded
698  //! in RAM. Blocks belong to prefetch pool.
699  static size_t int_memory(size_t capacity)
700  {
701  size_t num_blocks = div_ceil(capacity, block_items);
702 
703  return sizeof(external_array)
704  + num_blocks * sizeof(typename bid_vector::value_type)
705  + num_blocks * sizeof(typename block_vector::value_type)
706  + num_blocks * sizeof(typename block_pointers_type::value_type)
707  + num_blocks * sizeof(typename request_vector::value_type)
708  + num_blocks * sizeof(typename minima_vector::value_type);
709  }
710 
711  //! Return the amount of internal memory used by the EA.
712  inline size_t int_memory() const
713  {
714  return int_memory(m_capacity);
715  }
716 
717  //! Returns the number of elements available in internal memory
718  size_t buffer_size() const
719  {
720  return (m_end_index - m_index);
721  }
722 
723  //! Returns the block beyond the block in which *(m_end_index-1) is located.
725  {
726  unsigned_type end_block_index = m_end_index / block_items;
727 
728  // increase block index if inside the block
729  if (m_end_index % block_items != 0) ++end_block_index;
730  assert(end_block_index <= m_num_blocks);
731 
732  return end_block_index;
733  }
734 
735  //! Returns the block in which m_index is located.
737  {
738  return (m_index / block_items);
739  }
740 
741  //! Returns a random-access iterator to the begin of the data
742  //! in internal memory.
743  iterator begin() const
744  {
745  //-TODO?: assert(block_valid(m_index / block_items) || m_index == m_capacity);
746  return iterator(&m_block_pointers, block_items, m_index);
747  }
748 
749  //! Returns a random-access iterator 1 behind the end of the data
750  //! in internal memory.
751  iterator end() const
752  {
753  //-TODO? assert(!block_valid(m_end_index / block_items) || m_end_index == m_capacity);
754  return iterator(&m_block_pointers, block_items, m_end_index);
755  }
756 
757  //! Returns the smallest element in the array
758  const value_type & get_min()
759  {
760  return *begin();
761  }
762 
763  //! Returns if there is data in EM, that's not randomly accessible.
764  bool has_em_data() const
765  {
766  return (get_end_block_index() < m_num_blocks);
767  }
768 
769  //! Returns the smallest element of the first block NOT in internal memory
770  //! (or at least requested to be in internal memory)
772  {
773  assert(get_end_block_index() < m_num_blocks);
774  return m_minima[get_end_block_index()];
775  }
776 
777  //! Returns if the data requested to be in internal memory is
778  //! completely fetched. True if wait() has been called before.
779  bool valid() const
780  {
781  bool result = true;
782  const unsigned_type block_index = m_index / block_items;
783  const unsigned_type end_block_index = get_end_block_index();
784  for (unsigned_type i = block_index; i < end_block_index; ++i) {
785  result = result && block_valid(i);
786  }
787  return result;
788  }
789 
790  //! Random access operator for data in internal memory
791  //! You should call wait() once after fetching data from EM.
792  value_type& operator [] (size_t i) const
793  {
794  assert(i < m_capacity);
795  const size_t block_index = i / block_items;
796  const size_t local_index = i % block_items;
797  assert(i < m_capacity);
798  assert(block_valid(block_index));
799  return m_blocks[block_index]->elem[local_index];
800  }
801 
802 public:
803  //! prepare the pool for writing external arrays with given number of
804  //! threads
805  static void prepare_write_pool(pool_type& pool, unsigned_type num_threads)
806  {
807  unsigned_type write_blocks = num_threads;
808  // need at least one
809  if (write_blocks == 0) write_blocks = 1;
810  // for holding boundary blocks
811  write_blocks *= 2;
812  // more disks than threads?
813  if (write_blocks < config::get_instance()->disks_number())
814  write_blocks = config::get_instance()->disks_number();
815 #if STXXL_DEBUG_ASSERTIONS
816  // required for re-reading the external array
817  write_blocks = 2 * write_blocks;
818 #endif
819  if (pool.size_write() < write_blocks) {
820  STXXL_ERRMSG("WARNING: enlarging PPQ write pool to " <<
821  write_blocks << " blocks = " <<
822  write_blocks * block_size / 1024 / 1024 << " MiB");
823  pool.resize_write(write_blocks);
824  }
825  }
826 
827 protected:
828  //! prepare the external_array for writing using multiway_merge() with
829  //! num_threads. this method is called by the external_array_writer's
830  //! constructor.
831  void prepare_write(unsigned_type num_threads)
832  {
833  prepare_write_pool(*m_pool, num_threads);
834  }
835 
836  //! finish the writing phase after multiway_merge() filled the vector. this
837  //! method is called by the external_array_writer's destructor.
839  {
840  // check that all blocks were written
841  for (unsigned_type i = 0; i < m_num_blocks; ++i)
842  assert(m_blocks[i] == NULL);
843 
844  // compatibility to the block write interface
845  m_size = m_capacity;
846  m_index = 0;
847  m_end_index = 0;
848  m_unhinted_block = 0;
849 
850  m_write_phase = false;
851  }
852 
853  //! Called by the external_array_writer to read a block from disk into
854  //! m_blocks[]. If the block is marked as uninitialized, then no read is
855  //! performed. This is the usual case, and in theory, no block ever has to be
856  //! re-read from disk, since all can be written fully. However, we do
857  //! support re-reading blocks for debugging purposes inside
858  //! multiway_merge(), in a full performance build re-reading never occurs.
859  void read_block(size_t block_index)
860  {
861  assert(block_index < m_num_blocks);
862  assert(m_blocks[block_index] == NULL ||
863  m_blocks[block_index] == reinterpret_cast<block_type*>(1));
864 
865  if (m_blocks[block_index] == reinterpret_cast<block_type*>(1))
866  {
867  // special marker: this block is uninitialized -> no need to read
868  // from disk.
869  m_blocks[block_index] = m_pool->steal();
870  }
871  else
872  {
873  // block was already written, have to read from EM.
874  STXXL_DEBUG("ea[" << this << "]: "
875  "read_block needs to re-read block index=" << block_index);
876 
877  static bool s_warned = false;
878  if (!s_warned)
879  {
880  s_warned = true;
881  STXXL_ERRMSG("ppq::external_array[" << this << "] "
882  "writer requested to re-read block from EM.");
883  STXXL_ERRMSG("This should never occur in full-performance mode, "
884  "verify that you run in debug mode.");
885  }
886 
887  // this re-reading is not necessary for full performance builds, so
888  // we immediately wait for the I/O to be completed.
889  m_blocks[block_index] = m_pool->steal();
890  request_ptr req = m_pool->read(m_blocks[block_index], m_bids[block_index]);
891  req->wait();
892  assert(req->poll());
893  assert(m_blocks[block_index]);
894  }
895  }
896 
897  //! Called by the external_array_writer to write a block from m_blocks[] to
898  //! disk. Prior to writing and releasing the memory, extra information is
899  //! preserved.
900  void write_block(size_t block_index)
901  {
902  assert(block_index < m_num_blocks);
903  assert(m_blocks[block_index] != NULL &&
904  m_blocks[block_index] != reinterpret_cast<block_type*>(1));
905 
906  // calculate minimum and maximum values
907  const internal_size_type this_block_items =
908  std::min<internal_size_type>(block_items, m_capacity - block_index * (external_size_type)block_items);
909 
910  STXXL_DEBUG("ea[" << this << "]: write_block index=" << block_index <<
911  " this_block_items=" << this_block_items);
912 
913  assert(this_block_items > 0);
914  block_type& this_block = *m_blocks[block_index];
915 
916  m_minima[block_index] = this_block[0];
917 
918  // write out block (in background)
919  m_pool->write(m_blocks[block_index], m_bids[block_index]);
920 
921  m_blocks[block_index] = NULL;
922  }
923 
924 public:
925  //! \name Prefetching Hints
926  //! \{
927 
928  //! Prefetch the next unhinted block, requires one free read block from the
929  //! global pool.
931  {
932  assert(m_unhinted_block < m_num_blocks);
933 
934  // will read (prefetch) block i
935  size_t i = m_unhinted_block++;
936 
937  STXXL_DEBUG("ea[" << this << "]: prefetching block_index=" << i);
938 
939  assert(m_pool->size_write() > 0);
940  assert(m_blocks[i] == NULL);
941 
942  // steal block from pool, but also perform read via pool, since this
943  // checks the associated write_pool.
944  m_blocks[i] = m_pool->steal_prefetch();
945  m_requests[i] = m_pool->read(m_blocks[i], m_bids[i]);
946  }
947 
948  //! Returns if there is data in EM, that's not already hinted
949  //! to the prefetcher.
950  bool has_unhinted_em_data() const
951  {
952  return (m_unhinted_block < m_num_blocks);
953  }
954 
955  //! Returns the smallest element of the next hint candidate (the block
956  //! after the last hinted one).
958  {
959  assert(m_unhinted_block < m_num_blocks);
960  return m_minima[m_unhinted_block];
961  }
962 
963  //! Returns the number of hinted blocks.
964  size_t num_hinted_blocks() const
965  {
966  assert(get_end_block_index() <= m_unhinted_block);
967  return m_unhinted_block - get_end_block_index();
968  }
969 
970  //! This method prepares rebuilding the hints (this is done after creating
971  //! a new EA in order to always have globally the n blocks hinted which
972  //! will be fetched first). Resets m_unhinted_block to the first block not
973  //! in RAM. Thereafter prehint_next_block() is used to advance this index.
974  //! finish_rebuilding_hints() should be called after placing all hints in
975  //! order to clean up the prefetch pool.
977  {
978  m_old_unhinted_block = m_unhinted_block;
979  m_unhinted_block = get_end_block_index();
980  assert(get_end_block_index() <= m_old_unhinted_block);
981  }
982 
983  //! Advance m_unhinted_block index without actually prefetching.
985  {
986  assert(m_unhinted_block < m_num_blocks);
987 
988  // will read (prefetch) block after cancellations.
989 
990  STXXL_DEBUG("ea[" << this << "]: pre-hint of" <<
991  " block_index=" << m_unhinted_block);
992 
993  ++m_unhinted_block;
994  }
995 
996  //! Cancel hints which aren't needed anymore from the prefetcher and fixes
997  //! its size. prepare_rebuilding_hints() must be called before!
999  {
1000  for (size_t i = m_unhinted_block; i < m_old_unhinted_block; ++i) {
1001  STXXL_DEBUG("ea[" << this << "]: discarding prefetch hint on"
1002  " block " << i);
1003  m_requests[i]->cancel();
1004  m_requests[i]->wait();
1005  // put block back into pool
1006  m_pool->add_prefetch(m_blocks[i]);
1007  // invalidate block entry
1008  m_blocks[i] = NULL;
1009  m_requests[i] = request_ptr();
1010  }
1011  }
1012 
1013  //! Perform real-hinting of pre-hinted blocks, since now canceled blocks
1014  //! are available.
1016  {
1017  for (size_t i = m_old_unhinted_block; i < m_unhinted_block; ++i)
1018  {
1019  STXXL_DEBUG("ea[" << this << "]: perform real-hinting of"
1020  " block " << i);
1021 
1022  assert(m_pool->size_write() > 0);
1023  assert(m_blocks[i] == NULL);
1024  m_blocks[i] = m_pool->steal_prefetch();
1025  m_requests[i] = m_pool->read(m_blocks[i], m_bids[i]);
1026  }
1027  }
1028 
1029  //! \}
1030 
1031 public:
1032  //! \name Waiting and Removal
1033  //! \{
1034 
1035  //! Waits until the next prefetched block is read into RAM, then polls for
1036  //! any further blocks that are done as well. Returns how many blocks were
1037  //! successfully read.
1039  {
1040  size_t begin = get_end_block_index(), i = begin;
1041 
1042  STXXL_DEBUG("ea[" << this << "]: waiting for" <<
1043  " block index=" << i <<
1044  " end_index=" << m_end_index);
1045 
1046  assert(has_em_data());
1047 
1048  assert(i < m_unhinted_block);
1049  assert(m_bids[i].valid());
1050  assert(m_requests[i].valid());
1051 
1052  // wait for prefetched request to finish.
1053  m_requests[i]->wait();
1054  assert(m_requests[i]->poll());
1055  assert(m_blocks[i]);
1056 
1057  update_block_pointers(i);
1058  ++i;
1059 
1060  // poll further hinted blocks if already done
1061  while (i < m_unhinted_block && m_requests[i]->poll())
1062  {
1063  STXXL_DEBUG("ea[" << this << "]: poll-ok for" <<
1064  " block index=" << i <<
1065  " end_index=" << m_end_index);
1066  m_requests[i]->wait();
1067  assert(m_requests[i]->poll());
1068  assert(m_blocks[i]);
1069 
1070  update_block_pointers(i);
1071  ++i;
1072  }
1073 
1074  m_end_index = std::min(m_capacity, i * (external_size_type)block_items);
1075 
1076  return i - begin;
1077  }
1078 
1079  //! Waits until all hinted blocks are read into RAM. Returns how many
1080  //! blocks were successfully read.
1082  {
1083  size_t begin = get_end_block_index(), i = begin;
1084  while (i < m_unhinted_block)
1085  {
1086  STXXL_DEBUG("wait_all_hinted_blocks(): ea[" << this << "]: waiting for" <<
1087  " block index=" << i <<
1088  " end_index=" << m_end_index);
1089  m_requests[i]->wait();
1090  assert(m_requests[i]->poll());
1091  assert(m_blocks[i]);
1092  update_block_pointers(i);
1093  ++i;
1094  }
1095  m_end_index = std::min(m_capacity, i * (external_size_type)block_items);
1096  return i - begin;
1097  }
1098 
1099  //! Returns the number of blocks loaded in RAM.
1100  size_t num_used_blocks() const
1101  {
1102  return get_end_block_index() - (m_index / block_items);
1103  }
1104 
1105  //! Removes the first n elements from the array. Returns the number of
1106  //! blocks released into the block pool.
1108  {
// NOTE(review): the signature line of this function is absent from this
// listing; the body uses a count `n` of items to drop from the front.
// Preconditions: the removed range must lie inside the capacity, inside
// the currently accessible range, and not exceed the remaining size.
1109  assert(m_index + n <= m_capacity);
1110  assert(m_index + n <= m_end_index);
1111  assert(m_size >= n);
1112 
1113  STXXL_DEBUG("ea[" << this << "]: remove " << n << " items");
1114 
// nothing to remove, hence no block is released
1115  if (n == 0)
1116  return 0;
1117 
// block holding the current front item
1118  const size_t block_index = m_index / block_items;
1119 
// position of the new front item: absolute, block index, and offset in block
1120  const size_t index_after = m_index + n;
1121  size_t block_index_after = index_after / block_items;
1122  size_t local_index_after = index_after % block_items;
1123 
// if all remaining items are removed and the last block is only partially
// filled, that last block is released as well
1124  if (m_size == n && local_index_after != 0) // end of EA
1125  ++block_index_after;
1126 
1127  assert(block_index_after <= m_num_blocks);
1128 
// delete the BIDs of all blocks in [block_index, block_index_after)
1129  bid_iterator i_begin = m_bids.begin() + block_index;
1130  bid_iterator i_end = m_bids.begin() + block_index_after;
1131  assert(i_begin <= i_end);
1132  block_manager::get_instance()->delete_blocks(i_begin, i_end);
1133 
// hand the in-memory buffers of those blocks to the pool
1134  for (size_t i = block_index; i < block_index_after; ++i) {
1135  assert(block_valid(i));
1136  // return block to pool
1137  m_pool->add_prefetch(m_blocks[i]);
1138  }
1139 
// advance the front position and shrink the remaining size
1140  m_index = index_after;
1141  m_size -= n;
1142 
// number of blocks handed to the pool above; this is the return value
1143  unsigned_type blocks_freed = block_index_after - block_index;
1144 
1145  STXXL_DEBUG("ea[" << this << "]: after remove:" <<
1146  " index_after=" << index_after <<
1147  " block_index_after=" << block_index_after <<
1148  " local_index_after=" << local_index_after <<
1149  " blocks_freed=" << blocks_freed <<
1150  " num_blocks=" << m_num_blocks <<
1151  " capacity=" << m_capacity);
1152 
1153  assert(block_index_after <= m_num_blocks);
1154  // at most one block outside of the currently loaded range
1155  assert(block_index_after <= get_end_block_index());
1156 
1157  return blocks_freed;
1158  }
1159 
1160  //! \}
1161 
1162 protected:
1163  //! Returns if the block with the given index is completely fetched.
1164  bool block_valid(size_t block_index) const
1165  {
1166  if (!m_write_phase) {
1167  if (block_index >= m_num_blocks) return false;
1168  return (m_requests[block_index] && m_requests[block_index]->poll());
1169  }
1170  else {
1171  return (bool)m_blocks[block_index];
1172  }
1173  }
1174 
1175  //! Updates the m_block_pointers vector.
1176  //! Should be called after any steal() or read() operation.
1177  //! This is necessary for the iterators to work properly.
1178  inline void update_block_pointers(size_t block_index)
1179  {
1180  STXXL_DEBUG("ea[" << this << "]: updating block pointers for " << block_index);
1181 
1182  m_block_pointers[block_index].first = m_blocks[block_index]->begin();
1183  if (block_index + 1 != m_num_blocks)
1184  m_block_pointers[block_index].second = m_blocks[block_index]->end();
1185  else
1186  m_block_pointers[block_index].second =
1187  m_block_pointers[block_index].first
1188  + (m_capacity - block_index * block_items);
1189 
1190  assert(m_block_pointers[block_index].first != NULL);
1191  assert(m_block_pointers[block_index].second != NULL);
1192  }
1193 
1194  inline size_t last_block_items()
1195  {
1196  size_t mod = m_capacity % block_items;
1197  return (mod > 0) ? mod : (size_t)block_items;
1198  }
1199 };
1200 
1201 /**
1202  * An external_array can only be written using an external_array_writer
1203  * object. The writer object provides iterators which are designed to be used
1204  * by stxxl::parallel::multiway_merge() to write the external memory blocks in
1205  * parallel. Thus in the writer we coordinate thread-safe access to the blocks
1206  * using reference counting.
1207  *
1208  * An external_array_writer::iterator has two states: normal and "live". In
1209  * normal mode, the iterator only has a valid index into the external array's
1210  * items. In normal mode, only index calculations are possible. Once
1211  * operator*() is called, the iterator goes into "live" mode by requesting
1212  * access to the corresponding block. Using reference counting, the block is
1213  * written once all iterators are finished with the corresponding block. Since
1214  * with operator*() we cannot know if the value is going to be written or read,
1215  * when going to live mode, the block must be read from EM. This read overhead,
1216  * however, is optimized by marking blocks as uninitialized in external_array,
1217  * and skipping reads for them. In a full performance build, no block needs to
1218  * be read from disk. Reads only occur in debug mode, when the results are
1219  * verified.
1220  *
1221  * The iterator's normal/live mode only stays active for the individual
1222  * iterator object. When an iterator is copied, assigned, or used in a
1223  * calculation, the mode is NOT inherited! The exception is prefix operator ++, which is used by
1224  * multiway_merge() to fill an array. Thus the implementation of the iterator
1225  * heavily depends on the behavior of multiway_merge() and is optimized for it.
1226  */
1227 template <class ExternalArrayType>
1228 class external_array_writer : public noncopyable
1229 {
1230 public:
1231  typedef ExternalArrayType ea_type;
1232 
1234 
1235  typedef typename ea_type::value_type value_type;
1236  typedef typename ea_type::block_type block_type;
1237 
1238  //! prototype declaration of nested class.
1239  class iterator;
1240 
1241  //! scope based debug variable
1242  static const bool debug = false;
1243 
1244 protected:
1245  //! reference to the external array to be written
1247 
1248 #ifndef NDEBUG
1249  //! total number of iterators referencing this writer
1250  unsigned int m_ref_total;
1251 #endif
1252 
1253  //! reference counters for the number of live iterators on the
1254  //! corresponding block in external_array.
1255  std::vector<unsigned int> m_ref_count;
1256 
1257  //! mutex for reference counting array (this is actually nicer than
1258  //! openmp's critical)
1260 
1261  //! optimization: hold live iterators for the expected boundary blocks of
1262  //! multiway_merge().
1263  std::vector<iterator> m_live_boundary;
1264 
1265 protected:
1266  //! read block into memory and increase reference count (called when an
1267  //! iterator goes live on the block).
1268  block_type * get_block_ref(size_t block_index)
1269  {
1270  scoped_mutex_lock lock(m_mutex);
1271 
1272  assert(block_index < m_ea.num_blocks());
1273  unsigned int ref = m_ref_count[block_index]++;
1274 #ifndef NDEBUG
1275  ++m_ref_total;
1276 #endif
1277 
1278  if (ref == 0) {
1279  STXXL_DEBUG("get_block_ref block_index=" << block_index <<
1280  " ref=" << ref << " reading.");
1281  m_ea.read_block(block_index);
1282  }
1283  else {
1284  STXXL_DEBUG("get_block_ref block_index=" << block_index <<
1285  " ref=" << ref);
1286  }
1287 
1288  return m_ea.m_blocks[block_index];
1289  }
1290 
1291  //! decrease reference count on the block, and possibly write it to disk
1292  //! (called when an iterator releases live mode).
1293  void free_block_ref(size_t block_index)
1294  {
1295  scoped_mutex_lock lock(m_mutex);
1296 
1297  assert(block_index < m_ea.num_blocks());
1298 #ifndef NDEBUG
1299  assert(m_ref_total > 0);
1300  --m_ref_total;
1301 #endif
1302  unsigned int ref = --m_ref_count[block_index];
1303 
1304  if (ref == 0) {
1305  STXXL_DEBUG("free_block_ref block_index=" << block_index <<
1306  " ref=" << ref << " written.");
1307  m_ea.write_block(block_index);
1308  }
1309  else {
1310  STXXL_DEBUG("free_block_ref block_index=" << block_index <<
1311  " ref=" << ref);
1312  }
1313  }
1314 
1315  //! allow access to the block_ref functions
1316  friend class iterator;
1317 
1318 public:
1319  /**
1320  * An iterator which can be used to write (and read) an external_array via
1321  * an external_array_writer. See the documentation of external_array_writer.
1322  */
1323  class iterator
1324  {
1325  public:
1327  typedef ExternalArrayType ea_type;
1328 
1329  typedef typename ea_type::value_type value_type;
1332  typedef ptrdiff_t difference_type;
1333  typedef std::random_access_iterator_tag iterator_category;
1334 
1336 
1337  static const size_t block_items = ea_type::block_items;
1338 
1339  //! scope based debug variable
1340  static const bool debug = false;
1341 
1342  protected:
1343  //! pointer to the external array containing the elements
1345 
1346  //! when operator* or operator-> are called, then the iterator goes
1347  //! live and allocates a reference to the block's data (possibly
1348  //! reading it from EM).
1349  bool m_live;
1350 
1351  //! index of the current element, absolute in the external array
1353 
1354  //! index of the current element's block in the external array's block
1355  //! list. undefined while m_live is false.
1357 
1358  //! pointer to the referenced block. undefined while m_live is false.
1360 
1361  //! pointer to the current element inside the referenced block.
1362  //! undefined while m_live is false.
1364 
1365  public:
1366  //! default constructor (should not be used directly)
1368  : m_writer(NULL), m_live(false), m_index(0)
1369  { }
1370 
1371  //! construct a new iterator
1373  : m_writer(writer),
1374  m_live(false),
1375  m_index(index)
1376  {
1377  STXXL_DEBUG("Construct iterator for index " << m_index);
1378  }
1379 
1380  //! copy an iterator, the new iterator is _not_ automatically live!
1381  iterator(const iterator& other)
// only the writer pointer and the absolute index are taken over; m_live is
// reset to false, so the copy holds no block reference of its own
1382  : m_writer(other.m_writer),
1383  m_live(false),
1384  m_index(other.m_index)
1385  {
1386  STXXL_DEBUG("Copy-Construct iterator for index " << m_index);
1387  }
1388 
1389  //! assign an iterator, the assigned iterator is not automatically live!
1390  iterator& operator = (const iterator& other)
1391  {
1392  if (&other != this)
1393  {
1394  STXXL_DEBUG("Assign iterator to index " << other.m_index);
1395 
1396  if (m_live)
1397  m_writer->free_block_ref(m_block_index);
1398 
1399  m_writer = other.m_writer;
1400  m_live = false;
1401  m_index = other.m_index;
1402  }
1403 
1404  return *this;
1405  }
1406 
1408  {
1409  if (!m_live) return; // no need for cleanup
1410 
1411  m_writer->free_block_ref(m_block_index);
1412 
1413  STXXL_DEBUG("Destruction of iterator for index " << m_index <<
1414  " in block " << m_index / block_items);
1415  }
1416 
1417  //! return the current absolute index inside the external array.
1419  {
1420  return m_index;
1421  }
1422 
1423  //! allocates a reference to the block's data (possibly reading it from
1424  //! EM).
1425  void make_live()
1426  {
// must only be called on an iterator that is not live yet
1427  assert(!m_live);
1428 
1429  // calculate block and index inside
1430  m_block_index = m_index / block_items;
1431  m_current = m_index % block_items;
1432 
1433  STXXL_DEBUG("operator*() live request for index=" << m_index <<
1434  " block_index=" << m_block_index <<
1435  " m_current=" << m_current);
1436 
1437  // get block reference
// the writer increments the block's reference count and returns its buffer
1438  m_block = m_writer->get_block_ref(m_block_index);
1439  m_live = true;
1440  }
1441 
1442  //! access the current item
1443  reference operator * ()
1444  {
// first dereference switches the iterator into live mode
1445  if (UNLIKELY(!m_live))
1446  make_live();
1447 
// m_current is the offset of the item inside the referenced block
1448  return (*m_block)[m_current];
1449  }
1450 
1451  //! access the current item
1452  pointer operator -> ()
1453  {
// delegates to operator*(), so this also makes the iterator live
1454  return &(operator * ());
1455  }
1456 
1457  //! prefix-increment operator
1459  {
1460  ++m_index;
1461  if (UNLIKELY(!m_live)) return *this;
1462 
1463  // if index stays in the same block, everything is fine
1464  ++m_current;
1465  if (LIKELY(m_current != block_items)) return *this;
1466 
1467  // release current block
1468  m_writer->free_block_ref(m_block_index);
1469  m_live = false;
1470 
1471  return *this;
1472  }
1473 
1474  self_type operator + (difference_type addend) const
1475  {
1476  return self_type(m_writer, m_index + addend);
1477  }
1478  self_type operator - (difference_type subtrahend) const
1479  {
1480  return self_type(m_writer, m_index - subtrahend);
1481  }
1482  difference_type operator - (const self_type& o) const
1483  {
1484  return (m_index - o.m_index);
1485  }
1486 
1487  bool operator == (const self_type& o) const
1488  {
1489  return m_index == o.m_index;
1490  }
1491  bool operator != (const self_type& o) const
1492  {
1493  return m_index != o.m_index;
1494  }
1495  bool operator < (const self_type& o) const
1496  {
1497  return m_index < o.m_index;
1498  }
1499  bool operator <= (const self_type& o) const
1500  {
1501  return m_index <= o.m_index;
1502  }
1503  bool operator > (const self_type& o) const
1504  {
1505  return m_index > o.m_index;
1506  }
1507  bool operator >= (const self_type& o) const
1508  {
1509  return m_index >= o.m_index;
1510  }
1511  };
1512 
1513 public:
1514  external_array_writer(ea_type& ea, unsigned int num_threads = 0)
1515  : m_ea(ea),
1516  m_ref_count(ea.num_blocks(), 0)
1517  {
1518 #ifndef NDEBUG
1519  m_ref_total = 0;
1520 #endif
1521 
1522 #if STXXL_PARALLEL
1523  if (num_threads == 0)
1524  num_threads = omp_get_max_threads();
1525 #else
1526  if (num_threads == 0)
1527  num_threads = 1;
1528 #endif
1529 
1530  m_ea.prepare_write(num_threads);
1531 
1532  // optimization: hold live iterators for the boundary blocks which two
1533  // threads write to. this prohibits the blocks to be written to disk
1534  // and read again.
1535 
1536  double step = (double)m_ea.capacity() / (double)num_threads;
1537  m_live_boundary.resize(num_threads - 1);
1538 
1539  for (unsigned int i = 0; i < num_threads - 1; ++i)
1540  {
1541  external_size_type index = (external_size_type)((i + 1) * step);
1542  STXXL_DEBUG("hold index " << index <<
1543  " in block " << index / ea_type::block_items);
1544  m_live_boundary[i] = iterator(this, index);
1545  m_live_boundary[i].make_live();
1546  }
1547  }
1548 
1550  {
1551  m_live_boundary.clear(); // release block boundaries
1552 #ifndef NDEBUG
1553  STXXL_ASSERT(m_ref_total == 0);
1554 #endif
1555  m_ea.finish_write();
1556  }
1557 
1559  {
1560  return iterator(this, 0);
1561  }
1562 
1564  {
1565  return iterator(this, m_ea.capacity());
1566  }
1567 };
1568 
1569 /*!
1570  * The minima_tree contains minima from all sources inside the PPQ. It contains
1571  * four substructures: winner trees for insertion heaps, internal and external
1572  * arrays, each containing the minima from all currently allocated
1573  * structures. These three sources, plus the deletion buffer, are combined using
1574  * a "head" inner tree containing only up to four items.
1575  */
1576 template <class ParentType>
1578 {
1579 public:
1580  typedef ParentType parent_type;
1582 
1588 
1589  static const unsigned initial_ia_size = 2;
1590  static const unsigned initial_ea_size = 2;
1591 
1592 protected:
1593  //! WinnerTree-Comparator for the head winner tree. It accesses all
1594  //! relevant data structures from the priority queue.
1595  struct head_comp
1596  {
1601 
1603  ias_type& ias, const compare_type& compare)
1604  : m_parent(parent),
1605  m_proc(proc),
1606  m_ias(ias),
1607  m_compare(compare)
1608  { }
1609 
1610  const value_type & get_value(int input) const
1611  {
1612  switch (input) {
1613  case HEAP:
1614  return m_proc[m_parent.m_heaps.top()]->insertion_heap[0];
1615  case IA:
1616  return m_ias[m_parent.m_ia.top()].get_min();
1617  case EB:
1618  return m_parent.m_parent.m_extract_buffer[
1619  m_parent.m_parent.m_extract_buffer_index
1620  ];
1621  default:
1622  abort();
1623  }
1624  }
1625 
1626  bool operator () (const int a, const int b) const
1627  {
1628  return m_compare(get_value(a), get_value(b));
1629  }
1630  };
1631 
1632  //! Comparator for the insertion heaps winner tree.
1633  struct heaps_comp
1634  {
1637 
1639  : m_proc(proc), m_compare(compare)
1640  { }
1641 
1642  const value_type & get_value(int index) const
1643  {
// element 0 of the insertion heap belonging to processor slot `index`
1644  return m_proc[index]->insertion_heap[0];
1645  }
1646 
1647  bool operator () (const int a, const int b) const
1648  {
1649  return m_compare(get_value(a), get_value(b));
1650  }
1651  };
1652 
1653  //! Comparator for the internal arrays winner tree.
1654  struct ia_comp
1655  {
1658 
1659  ia_comp(ias_type& ias, const compare_type& compare)
1660  : m_ias(ias), m_compare(compare)
1661  { }
1662 
1663  bool operator () (const int a, const int b) const
1664  {
1665  return m_compare(m_ias[a].get_min(), m_ias[b].get_min());
1666  }
1667  };
1668 
1669 protected:
1670  //! The priority queue
1672 
1673  //! value_type comparator
1675 
1676  //! Comparator instances
1677  head_comp m_head_comp;
1678  heaps_comp m_heaps_comp;
1679  ia_comp m_ia_comp;
1680 
1681  //! The winner trees
1685 
1686 public:
1687  //! Entries in the head winner tree.
// The values double as player indices of the head winner tree and as the
// first component of the pair returned by top().
1688  enum Types {
// insertion heaps
1689  HEAP = 0,
// internal arrays
1690  IA = 1,
// extract buffer
1691  EB = 2,
// returned by top() when the head tree yields none of the above
1692  TYPE_ERROR = 3
1693  };
1694 
1695  //! Construct the tree of minima sources.
1697  : m_parent(parent),
1698  m_compare(parent.m_inv_compare),
1699  // construct comparators
1700  m_head_comp(*this, parent.m_proc,
1701  parent.m_internal_arrays, m_compare),
1702  m_heaps_comp(parent.m_proc, m_compare),
1703  m_ia_comp(parent.m_internal_arrays, m_compare),
1704  // construct header winner tree
1705  m_head(3, m_head_comp),
1706  m_heaps(m_parent.m_num_insertion_heaps, m_heaps_comp),
1707  m_ia(initial_ia_size, m_ia_comp)
1708  { }
1709 
1710  //! Return smallest items of head winner tree.
1711  std::pair<unsigned, unsigned> top()
1712  {
1713  unsigned type = m_head.top();
1714  switch (type)
1715  {
1716  case HEAP:
1717  return std::make_pair(HEAP, m_heaps.top());
1718  case IA:
1719  return std::make_pair(IA, m_ia.top());
1720  case EB:
1721  return std::make_pair(EB, 0);
1722  default:
1723  return std::make_pair(TYPE_ERROR, 0);
1724  }
1725  }
1726 
1727  //! Update minima tree after an item from the heap index was removed.
1728  void update_heap(int_type index)
1729  {
// replay the changed heap in the heaps tree first, then refresh the HEAP
// entry of the head tree, whose comparator reads m_heaps.top()
1730  m_heaps.notify_change(index);
1731  m_head.notify_change(HEAP);
1732  }
1733 
1734  //! Update minima tree after an item of the extract buffer was removed.
1736  {
1737  m_head.notify_change(EB);
1738  }
1739 
1740  //! Update minima tree after an item from an internal array was removed.
1741  void update_internal_array(unsigned index)
1742  {
// replay the changed array in the IA tree first, then refresh the IA entry
// of the head tree, whose comparator reads m_ia.top()
1743  m_ia.notify_change(index);
1744  m_head.notify_change(IA);
1745  }
1746 
1747  //! Add a newly created internal array to the minima tree.
1748  void add_internal_array(unsigned index)
1749  {
// activate the new array as a player of the IA tree, then refresh the IA
// entry of the head tree
1750  m_ia.activate_player(index);
1751  m_head.notify_change(IA);
1752  }
1753 
1754  //! Remove an insertion heap from the minima tree.
1755  void deactivate_heap(unsigned index)
1756  {
1757  m_heaps.deactivate_player(index);
1758  if (!m_heaps.empty())
1759  m_head.notify_change(HEAP);
1760  else
1761  m_head.deactivate_player(HEAP);
1762  }
1763 
1764  //! Remove the extract buffer from the minima tree.
1766  {
1767  m_head.deactivate_player(EB);
1768  }
1769 
1770  //! Remove an internal array from the minima tree.
1771  void deactivate_internal_array(unsigned index)
1772  {
1773  m_ia.deactivate_player(index);
1774  if (!m_ia.empty())
1775  m_head.notify_change(IA);
1776  else
1777  m_head.deactivate_player(IA);
1778  }
1779 
1780  //! Remove all insertion heaps from the minima tree.
1782  {
1783  m_heaps.clear();
1784  m_head.deactivate_player(HEAP);
1785  }
1786 
1787  //! Remove all internal arrays from the minima tree.
1789  {
1790  m_ia.resize_and_clear(initial_ia_size);
1791  m_head.deactivate_player(IA);
1792  }
1793 
1795  {
1796  if (!m_parent.m_internal_arrays.empty())
1797  {
1798  m_ia.resize_and_rebuild(m_parent.m_internal_arrays.size());
1799  m_head.notify_change(IA);
1800  }
1801  else
1802  {
1803  m_head.deactivate_player(IA);
1804  }
1805  }
1806 
1807  //! Return size of internal arrays minima tree
1808  size_t ia_slots() const
1809  {
// forwards to num_slots() of the internal arrays winner tree
1810  return m_ia.num_slots();
1811  }
1812 
1813  //! Returns a readable representation of the winner tree as string.
1814  std::string to_string() const
1815  {
1816  std::ostringstream ss;
1817  ss << "Head:" << std::endl << m_head.to_string() << std::endl;
1818  ss << "Heaps:" << std::endl << m_heaps.to_string() << std::endl;
1819  ss << "IA:" << std::endl << m_ia.to_string() << std::endl;
1820  return ss.str();
1821  }
1822 
1823  //! Prints statistical data.
1824  void print_stats() const
1825  {
// each winner tree prints its own statistics below a heading line
1826  STXXL_MSG("Head winner tree stats:");
1827  m_head.print_stats();
1828  STXXL_MSG("Heaps winner tree stats:");
1829  m_heaps.print_stats();
1830  STXXL_MSG("IA winner tree stats:");
1831  m_ia.print_stats();
1832  }
1833 };
1834 
1835 } // namespace ppq_local
1836 
1837 /*!
1838  * Parallelized External Memory Priority Queue.
1839  *
1840  * \tparam ValueType Type of the contained objects (POD with no references to
1841  * internal memory).
1842  *
1843  * \tparam CompareType The comparator type used to determine whether one
1844  * element is smaller than another element.
1845  *
1846  * \tparam DefaultMemSize Maximum memory consumption by the queue. Can be
1847  * overwritten by the constructor. Default = 1 GiB.
1848  *
1849  * \tparam MaxItems Maximum number of elements the queue contains at one
1850  * time. Default = 0 = unlimited. This is no hard limit and only used for
1851  * optimization. Can be overwritten by the constructor.
1852  *
1853  * \tparam BlockSize External block size. Default =
1854  * STXXL_DEFAULT_BLOCK_SIZE(ValueType).
1855  *
1856  * \tparam AllocStrategy Allocation strategy for the external memory. Default =
1857  * STXXL_DEFAULT_ALLOC_STRATEGY.
1858  */
1859 template <
1860  class ValueType,
1861  class CompareType = std::less<ValueType>,
1862  class AllocStrategy = STXXL_DEFAULT_ALLOC_STRATEGY,
1863  uint64 BlockSize = STXXL_DEFAULT_BLOCK_SIZE(ValueType),
1864  uint64 DefaultMemSize = 1* 1024L* 1024L* 1024L,
1865  uint64 MaxItems = 0
1866  >
1868 {
1869  //! \name Types
1870  //! \{
1871 
1872 public:
1873  typedef ValueType value_type;
1874  typedef CompareType compare_type;
1875  typedef AllocStrategy alloc_strategy;
1876  static const uint64 block_size = BlockSize;
1878 
1880  typedef std::vector<BID<block_size> > bid_vector;
1886  typedef typename std::vector<value_type>::iterator value_iterator;
1888  typedef std::pair<iterator, iterator> iterator_pair_type;
1889 
1890  static const bool debug = false;
1891 
1892  //! currently global public tuning parameter:
1894 
1895  //! currently global public tuning parameter:
1897 
1898 protected:
1899  //! type of insertion heap itself
1900  typedef std::vector<value_type> heap_type;
1901 
1902  //! type of internal arrays vector
1904  //! type of external arrays vector
1906  //! type of minima tree combining the structures
1907  typedef ppq_local::minima_tree<
1909  block_size, DefaultMemSize, MaxItems> > minima_type;
1910  //! allow minima tree access to internal data structures
1911  friend class ppq_local::minima_tree<
1913  block_size, DefaultMemSize, MaxItems> >;
1914 
1915  //! Inverse comparison functor
1917  {
1919 
1921  : compare(c)
1922  { }
1923 
1924  bool operator () (const value_type& x, const value_type& y) const
1925  {
// arguments are swapped on purpose: this inverts the wrapped comparator
1926  return compare(y, x);
1927  }
1928  };
1929 
1930  //! <-Comparator for value_type
1932 
1933  //! >-Comparator for value_type
1935 
1936  //! Defines if statistics are gathered: dummy_custom_stats_counter or
1937  //! custom_stats_counter
1939 
1940  //! Defines if statistics are gathered: fake_timer or timer
1942 
1943  //! \}
1944 
1945  //! \name Compile-Time Parameters
1946  //! \{
1947 
1948  //! Merge sorted heaps when flushing into an internal array.
1949  //! Pro: Reduces the risk of a large winner tree
1950  //! Con: Flush insertion heaps becomes slower.
1951  static const bool c_merge_sorted_heaps = true;
1952 
1953  //! Default number of write buffer block for a new external array being
1954  //! filled.
1955  static const unsigned c_num_write_buffer_blocks = 14;
1956 
1957  //! Defines for how much external arrays memory should be reserved in the
1958  //! constructor.
1959  static const unsigned c_num_reserved_external_arrays = 10;
1960 
1961  //! Size of a single insertion heap in Byte, if not defined otherwise in
1962  //! the constructor. Default: 1 MiB
1963  static const size_type c_default_single_heap_ram = 1L * 1024L * 1024L;
1964 
1965  //! Default limit of the extract buffer ram consumption as share of total
1966  //! ram
1967  // C++11: constexpr static double c_default_extract_buffer_ram_part = 0.05;
1968  // C++98 does not allow static const double initialization here.
1969  // It's located in global scope instead.
1971 
1972  /*!
1973  * Limit the size of the extract buffer to an absolute value.
1974  *
1975  * The actual size can be set using the extract_buffer_ram parameter of the
1976  * constructor. If this parameter is not set, the value is calculated by
1977  * (total_ram*c_default_extract_buffer_ram_part)
1978  *
1979  * If c_limit_extract_buffer==false, the memory consumption of the extract
1980  * buffer is only limited by the number of external and internal
1981  * arrays. This is considered in memory management using the
1982  * ram_per_external_array and ram_per_internal_array values. Attention:
1983  * Each internal array reserves space for the extract buffer in the size of
1984  * all heaps together.
1985  */
1986  static const bool c_limit_extract_buffer = true;
1987 
1988  //! For bulks of size up to c_single_insert_limit sequential single insert
1989  //! is faster than bulk_push.
1990  static const unsigned c_single_insert_limit = 100;
1991 
1992  //! \}
1993 
1994  //! \name Parameters and Sizes for Memory Allocation Policy
1995 
1996  //! Number of insertion heaps. Usually equal to the number of CPUs.
1998 
1999  //! Capacity of one insertion heap
2001 
2002  //! Return size of insertion heap reservation in bytes
2004  {
2005  return m_insertion_heap_capacity * sizeof(value_type);
2006  }
2007 
2008  //! Total amount of internal memory
2010 
2011  //! Maximum size of extract buffer in number of elements
2012  //! Only relevant if c_limit_extract_buffer==true
2014 
2015  //! Size of all insertion heaps together in bytes
2017 
2018  //! Number of read/prefetch blocks per external array.
2020 
2021  //! Total number of read/prefetch buffer blocks
2023  //! number of currently hinted prefetch blocks
2025  //! number of currently loaded blocks
2027 
2028  //! Free memory in bytes
2030 
2031  //! \}
2032 
2033  //! Flag if inside a bulk_push sequence.
2035 
2036  //! If the bulk currently being inserted is very large, this boolean is set
2037  //! and bulk_push just accumulate the elements for eventual sorting.
2039 
2040  //! First index in m_external_arrays that was not re-hinted during a
2041  //! bulk_push sequence.
2043 
2044  //! Index of the currently smallest element in the extract buffer
2046 
2047  //! \name Number of elements currently in the data structures
2048  //! \{
2049 
2050  //! Number of elements in the insertion heaps
2052 
2053  //! Number of elements in the extract buffer
2055 
2056  //! Number of elements in the internal arrays
2058 
2059  //! Number of elements in the external arrays
2061 
2062  //! \}
2063 
2064  //! \name Data Holding Structures
2065  //! \{
2066 
2067  //! A struct containing the local insertion heap and other information
2068  //! _local_ to a processor.
2070  {
2071  //! The heaps where new elements are usually inserted into
2073 
2074  //! The number of items inserted into the insheap during bulk parallel
2075  //! access.
2077  };
2078 
2079  typedef std::vector<ProcessorData*> proc_vector_type;
2080 
2081  //! Array of processor local data structures, including the insertion heaps.
2083 
2084  //! Prefetch and write buffer pool for external arrays (has to be in front
2085  //! of m_external_arrays)
2087 
2088  //! The extract buffer where external (and internal) arrays are merged into
2089  //! for extracting
2090  std::vector<value_type> m_extract_buffer;
2091 
2092  //! The sorted arrays in internal memory
2094 
2095  //! The sorted arrays in external memory
2097 
2098  //! The aggregated pushes. They cannot be extracted yet.
2099  std::vector<value_type> m_aggregated_pushes;
2100 
2101  //! The maximum number of internal array levels.
2102  static const unsigned_type c_max_internal_levels = 8;
2103 
2104  //! The number of internal arrays on each level, we use plain array.
2105  unsigned_type m_internal_levels[c_max_internal_levels];
2106 
2107  //! The maximum number of external array levels.
2108  static const unsigned_type c_max_external_levels = 8;
2109 
2110  //! The number of external arrays on each level, we use plain array.
2111  unsigned_type m_external_levels[c_max_external_levels];
2112 
2113  //! The winner tree containing the smallest values of all sources
2114  //! where the globally smallest element could come from.
2116 
2117  //! Compares the largest accessible value of two external arrays.
2121 
2123  const inv_compare_type& compare)
2124  : m_eas(eas), m_compare(compare) { }
2125 
2126  bool operator () (const size_t& a, const size_t& b) const
2127  {
2128  return m_compare(m_eas[a].get_next_block_min(),
2129  m_eas[b].get_next_block_min());
2130  }
2131  } m_external_min_comparator;
2132 
2133  //! Tracks the largest accessible values of the external arrays if there
2134  //! is unaccessible data in EM. The winning array is the first one that
2135  //! needs to fetch further data from EM. Used in calculate_merge_sequences.
2137 
2138  //! Compares the largest value of the block hinted the latest of two
2139  //! external arrays.
2143 
2145  const inv_compare_type& compare)
2146  : m_eas(eas), m_compare(compare) { }
2147 
2148  bool operator () (const size_t& a, const size_t& b) const
2149  {
2150  return m_compare(m_eas[a].get_next_hintable_min(),
2151  m_eas[b].get_next_hintable_min());
2152  }
2153  } m_hint_comparator;
2154 
2155  //! Tracks the largest values of the block hinted the latest of the
2156  //! external arrays if there is unaccessible data in EM. The winning
2157  //! array is the first one that needs to fetch further data from EM.
2158  //! Used for prefetch hints.
2160 
2161  //! Random number generator for randomly selecting a heap in sequential
2162  //! push()
2164 
2165  //! \}
2166 
2167  /*
2168  * Helper function to remove empty internal/external arrays.
2169  */
2170 
2171  //! Unary operator which returns true if the external array has run empty.
2173  bool operator () (external_array_type& a) const
2174  { return a.empty(); }
2175  };
2176 
2177  //! Unary operator which returns true if the internal array has run empty.
2179  bool operator () (internal_array_type& a) const
2180  { return a.empty(); }
2181  };
2182 
2183  //! Clean up empty internal arrays, free their memory and capacity
2185  {
2186  typename internal_arrays_type::iterator swap_end =
2187  stxxl::swap_remove_if(m_internal_arrays.begin(),
2188  m_internal_arrays.end(),
2190 
2191  for (typename internal_arrays_type::iterator ia = swap_end;
2192  ia != m_internal_arrays.end(); ++ia)
2193  {
2194  m_mem_left += ia->int_memory();
2195  --m_internal_levels[ia->level()];
2196  }
2197 
2198  if (swap_end != m_internal_arrays.end())
2199  STXXL_DEBUG0("cleanup_internal_arrays" <<
2200  " cleaned=" << m_internal_arrays.end() - swap_end);
2201 
2202  m_internal_arrays.erase(swap_end, m_internal_arrays.end());
2203  m_minima.rebuild_internal_arrays();
2204  }
2205 
2206  //! Clean up empty external arrays, free their memory and capacity
2208  {
// NOTE(review): the signature line and the declaration of `pred` are absent
// from this listing; `pred` is applied to each external array below and
// arrays for which it returns true are removed.
2209  typedef typename external_arrays_type::iterator ea_iterator;
2211 
2212  // The following is a modified implementation of swap_remove_if().
2213  // Updates m_external_min_tree accordingly.
2214 
2215  ea_iterator first = m_external_arrays.begin();
2216  ea_iterator last = m_external_arrays.end();
2217  ea_iterator swap_end = first;
2218  size_t size = m_external_arrays.end() - m_external_arrays.begin();
// index of the first array selected for removal; `size` means "none so far"
2219  size_t first_removed = size;
// compaction pass: kept arrays are swapped to the front, so afterwards
// [begin, swap_end) holds the kept arrays and [swap_end, last) the rest
2220  while (first != last)
2221  {
2222  if (!pred(*first))
2223  {
2224  using std::swap;
2225  swap(*first, *swap_end);
2226  ++swap_end;
2227  }
2228  else if (first_removed >= size)
2229  {
2230  first_removed = first - m_external_arrays.begin();
2231  }
2232  ++first;
2233  }
2234 
2235  // subtract memory of EAs, which will be freed
// the memory is credited back to m_mem_left and the per-level array
// counter is decremented for every array about to be erased
2236  for (ea_iterator ea = swap_end; ea != last; ++ea) {
2237  m_mem_left += ea->int_memory();
2238  --m_external_levels[ea->level()];
2239  }
2240 
2241  size_t swap_end_index = swap_end - m_external_arrays.begin();
2242 
2243  // Deactivating all affected players first.
2244  // Otherwise there might be outdated comparisons.
// walks backwards from the last slot down to first_removed; slots before
// first_removed were not moved and are left untouched
2245  for (size_t i = size; i != first_removed; ) {
2246  --i;
2247  m_external_min_tree.deactivate_player_step(i);
2248  // TODO delay if (m_in_bulk_push)?
2249  m_hint_tree.deactivate_player_step(i);
2250  }
2251 
2252  // Replay moved arrays.
// slots [first_removed, swap_end_index) now hold kept arrays that changed
// position, so both trees are updated for them
2253  for (size_t i = first_removed; i < swap_end_index; ++i) {
2254  update_external_min_tree(i);
2255  // TODO delay if (m_in_bulk_push)?
2256  update_hint_tree(i);
2257  }
2258 
2259  STXXL_DEBUG("Removed " << m_external_arrays.end() - swap_end <<
2260  " empty external arrays.");
2261 
// finally drop the removed arrays from the vector
2262  m_external_arrays.erase(swap_end, m_external_arrays.end());
2263 
2264  resize_read_pool(); // shrinks read/prefetch pool
2265  }
2266 
2267  /*!
2268  * SiftUp a new element from the last position in the heap, reestablishing
2269  * the heap invariant. This is identical to std::push_heap, except that it
2270  * returns the last element modified by siftUp. Thus we can identify if the
2271  * minimum may have changed.
2272  */
2273  template <typename RandomAccessIterator, typename HeapCompareType>
2274  static inline unsigned_type
2275  push_heap(RandomAccessIterator first, RandomAccessIterator last,
2276  HeapCompareType comp)
2277  {
2278  typedef typename std::iterator_traits<RandomAccessIterator>::value_type
2279  value_type;
2280 
2281  value_type value = STXXL_MOVE(*(last - 1));
2282 
2283  unsigned_type index = (last - first) - 1;
2284  unsigned_type parent = (index - 1) / 2;
2285 
2286  while (index > 0 && comp(*(first + parent), value))
2287  {
2288  *(first + index) = STXXL_MOVE(*(first + parent));
2289  index = parent;
2290  parent = (index - 1) / 2;
2291  }
2292  *(first + index) = STXXL_MOVE(value);
2293 
2294  return index;
2295  }
2296 
2297 public:
2298  //! \name Initialization
2299  //! \{
2300 
2301  /*!
2302  * Constructor.
2303  *
2304  * \param compare Comparator for priority queue, which is a Max-PQ.
2305  *
2306  * \param total_ram Maximum RAM usage. 0 = Default = Use the template
2307  * value DefaultMemSize.
2308  *
2309  * \param num_read_blocks_per_ea Number of read blocks per external
2310  * array. Default = 1.5f
2311  *
2312  * \param num_write_buffer_blocks Number of write buffer blocks for a new
2313  * external array being filled. 0 = Default = c_num_write_buffer_blocks
2314  *
2315  * \param num_insertion_heaps Number of insertion heaps. 0 = Default =
2316  * Determine by omp_get_max_threads().
2317  *
2318  * \param single_heap_ram Memory usage for a single insertion heap.
2319  * Default = c_single_heap_ram.
2320  *
2321  * \param extract_buffer_ram Memory usage for the extract buffer. Only
2322  * relevant if c_limit_extract_buffer==true. 0 = Default = total_ram *
2323  * c_default_extract_buffer_ram_part.
2324  */
2326  const compare_type& compare = compare_type(),
2327  size_type total_ram = DefaultMemSize,
2328  float num_read_blocks_per_ea = 1.5f,
2329  unsigned_type num_write_buffer_blocks = c_num_write_buffer_blocks,
2330  unsigned_type num_insertion_heaps = 0,
2331  size_type single_heap_ram = c_default_single_heap_ram,
2332  size_type extract_buffer_ram = 0)
2333  : c_max_internal_level_size(64),
2334  c_max_external_level_size(64),
2335  m_compare(compare),
2336  m_inv_compare(m_compare),
2337  // Parameters and Sizes for Memory Allocation Policy
2338 #if STXXL_PARALLEL
2339  m_num_insertion_heaps(num_insertion_heaps > 0 ? num_insertion_heaps : omp_get_max_threads()),
2340 #else
2341  m_num_insertion_heaps(num_insertion_heaps > 0 ? num_insertion_heaps : 1),
2342 #endif
2343  m_insertion_heap_capacity(single_heap_ram / sizeof(value_type)),
2344  m_mem_total(total_ram),
2345  m_mem_for_heaps(m_num_insertion_heaps * single_heap_ram),
2346  m_num_read_blocks_per_ea(num_read_blocks_per_ea),
2347  m_num_read_blocks(0),
2348  m_num_hinted_blocks(0),
2349  m_num_used_read_blocks(0),
2350  // (unnamed)
2351  m_in_bulk_push(false),
2352  m_is_very_large_bulk(false),
2353  m_extract_buffer_index(0),
2354  // Number of elements currently in the data structures
2355  m_heaps_size(0),
2356  m_extract_buffer_size(0),
2357  m_internal_size(0),
2358  m_external_size(0),
2359  // Data Holding Structures
2360  m_proc(m_num_insertion_heaps),
2361  m_pool(0, num_write_buffer_blocks),
2362  m_external_arrays(),
2363  m_minima(*this),
2364  m_external_min_comparator(m_external_arrays, m_inv_compare),
2365  m_external_min_tree(4, m_external_min_comparator),
2366  m_hint_comparator(m_external_arrays, m_inv_compare),
2367  m_hint_tree(4, m_hint_comparator),
2368  // flags
2369  m_limit_extract(false)
2370  {
2371 #if STXXL_PARALLEL
2372  if (!omp_get_nested()) {
2373  omp_set_nested(1);
2374  if (!omp_get_nested()) {
2375  STXXL_ERRMSG("Could not enable OpenMP's nested parallelism, "
2376  "however, the PPQ requires this OpenMP feature.");
2377  abort();
2378  }
2379  }
2380 #else
2381  STXXL_ERRMSG("You are using stxxl::parallel_priority_queue without "
2382  "support for OpenMP parallelism.");
2383  STXXL_ERRMSG("This is probably not what you want, so check the "
2384  "compilation settings.");
2385 #endif
2386 
2387  if (c_limit_extract_buffer) {
2388  m_extract_buffer_limit = (extract_buffer_ram > 0)
2389  ? extract_buffer_ram / sizeof(value_type)
2390  : static_cast<size_type>(((double)(m_mem_total) * c_default_extract_buffer_ram_part / sizeof(value_type)));
2391  }
2392 
2393  for (unsigned_type i = 0; i < c_max_internal_levels; ++i)
2394  m_internal_levels[i] = 0;
2395 
2396  for (unsigned_type i = 0; i < c_max_external_levels; ++i)
2397  m_external_levels[i] = 0;
2398 
2399  // TODO: Do we still need this line? Insertion heap memory is
2400  // registered below. And merge buffer is equal to the new IA...
2401  // total_ram - ram for the heaps - ram for the heap merger
2402  m_mem_left = m_mem_total - 2 * m_mem_for_heaps;
2403 
2404  // reverse insertion heap memory on processor-local memory
2405 #if STXXL_PARALLEL
2406 #pragma omp parallel for
2407 #endif
2408  for (long p = 0; p < m_num_insertion_heaps; ++p)
2409  {
2410  m_proc[p] = new ProcessorData;
2411  m_proc[p]->insertion_heap.reserve(m_insertion_heap_capacity);
2412  assert(m_proc[p]->insertion_heap.capacity() * sizeof(value_type)
2413  == insertion_heap_int_memory());
2414  }
2415 
2416  m_mem_left -= m_num_insertion_heaps * insertion_heap_int_memory();
2417 
2418  // prepare prefetch buffer pool (already done in initializer),
2419  // initially zero.
2420 
2421  // prepare write buffer pool: calculate size and subtract from mem_left
2422  external_array_type::prepare_write_pool(m_pool, m_num_insertion_heaps);
2423  m_mem_left -= m_pool.size_write() * block_size;
2424 
2425  // prepare internal arrays
2426  if (c_merge_sorted_heaps) {
2427  m_internal_arrays.reserve(m_mem_total / m_mem_for_heaps);
2428  }
2429  else {
2430  m_internal_arrays.reserve(m_mem_total * m_num_insertion_heaps / m_mem_for_heaps);
2431  }
2432 
2433  // prepare external arrays
2434  m_external_arrays.reserve(c_num_reserved_external_arrays);
2435 
2436  if (m_mem_total < m_mem_left) // checks if unsigned type wrapped.
2437  {
2438  STXXL_ERRMSG("Minimum memory requirement insufficient, "
2439  "increase PPQ's memory limit or decrease buffers.");
2440  abort();
2441  }
2442 
2443  check_invariants();
2444  }
2445 
2446  //! Destructor.
2448  {
2449  // clean up data structures
2450 
2451  for (size_t p = 0; p < m_num_insertion_heaps; ++p)
2452  {
2453  delete m_proc[p];
2454  }
2455  }
2456 
2457 protected:
2458  //! Assert many invariants of the data structures.
2459  void check_invariants() const
2460  {
2461 #ifdef NDEBUG
2462  // disable in Release builds
2463  return;
2464 #endif
2465 
2466  size_type mem_used = 0;
2467 
2468  mem_used += 2 * m_mem_for_heaps
2469  + m_pool.size_write() * block_size
2470  + m_pool.free_size_prefetch() * block_size
2471  + m_num_hinted_blocks * block_size
2472  + m_num_used_read_blocks * block_size;
2473 
2474  // count number of blocks hinted in prefetcher
2475 
2476  size_t num_hinted = 0, num_used_read = 0;
2477  for (size_t i = 0; i < m_external_arrays.size(); ++i) {
2478  num_hinted += m_external_arrays[i].num_hinted_blocks();
2479  num_used_read += m_external_arrays[i].num_used_blocks();
2480  }
2481 
2482  STXXL_CHECK(num_hinted == m_num_hinted_blocks);
2483  STXXL_CHECK(num_used_read == m_num_used_read_blocks);
2484 
2485  STXXL_CHECK_EQUAL(m_num_used_read_blocks,
2486  m_num_read_blocks
2487  - m_pool.free_size_prefetch()
2488  - m_num_hinted_blocks);
2489 
2490  // test the processor local data structures
2491 
2492  size_type heaps_size = 0;
2493 
2494  for (int_type p = 0; p < m_num_insertion_heaps; ++p)
2495  {
2496  // check that each insertion heap is a heap
2497 
2498  // TODO: remove soon, because this is very expensive
2499  STXXL_CHECK(1 || stxxl::is_heap(m_proc[p]->insertion_heap.begin(),
2500  m_proc[p]->insertion_heap.end(),
2501  m_compare));
2502 
2503  STXXL_CHECK(m_proc[p]->insertion_heap.capacity() <= m_insertion_heap_capacity);
2504 
2505  heaps_size += m_proc[p]->insertion_heap.size();
2506  mem_used += m_proc[p]->insertion_heap.capacity() * sizeof(value_type);
2507  }
2508 
2509  if (!m_in_bulk_push)
2510  STXXL_CHECK_EQUAL(m_heaps_size, heaps_size);
2511 
2512  // count number of items and memory size of internal arrays
2513 
2514  size_type ia_size = 0;
2515  size_type ia_memory = 0;
2516  std::vector<unsigned_type> ia_levels(c_max_internal_levels, 0);
2517 
2518  for (typename internal_arrays_type::const_iterator ia =
2519  m_internal_arrays.begin(); ia != m_internal_arrays.end(); ++ia)
2520  {
2521  ia_size += ia->size();
2522  ia_memory += ia->int_memory();
2523  ++ia_levels[ia->level()];
2524  }
2525 
2526  STXXL_CHECK_EQUAL(m_internal_size, ia_size);
2527  mem_used += ia_memory;
2528 
2529  for (unsigned_type i = 0; i < c_max_internal_levels; ++i)
2530  STXXL_CHECK_EQUAL(m_internal_levels[i], ia_levels[i]);
2531 
2532  // count number of items in external arrays
2533 
2534  size_type ea_size = 0;
2535  size_type ea_memory = 0;
2536  std::vector<unsigned_type> ea_levels(c_max_external_levels, 0);
2537 
2538  for (typename external_arrays_type::const_iterator ea =
2539  m_external_arrays.begin(); ea != m_external_arrays.end(); ++ea)
2540  {
2541  ea_size += ea->size();
2542  ea_memory += ea->int_memory();
2543  ++ea_levels[ea->level()];
2544  }
2545 
2546  STXXL_CHECK_EQUAL(m_external_size, ea_size);
2547  mem_used += ea_memory;
2548 
2549  for (unsigned_type i = 0; i < c_max_external_levels; ++i)
2550  STXXL_CHECK_EQUAL(m_external_levels[i], ea_levels[i]);
2551 
2552  // calculate mem_used so that == mem_total - mem_left
2553 
2554  STXXL_CHECK_EQUAL(memory_consumption(), mem_used);
2555  }
2556 
2557  //! \}
2558 
2559  //! \name Properties
2560  //! \{
2561 
2562 public:
2563  //! The number of elements in the queue.
2564  inline size_type size() const
2565  {
2566  return m_heaps_size + m_internal_size + m_external_size + m_extract_buffer_size;
2567  }
2568 
2569  //! Returns if the queue is empty.
2570  inline bool empty() const
2571  {
2572  return (size() == 0);
2573  }
2574 
2575  //! The memory consumption in Bytes.
2577  {
2578  assert(m_mem_total >= m_mem_left);
2579  return (m_mem_total - m_mem_left);
2580  }
2581 
2582 protected:
2583  //! Returns if the extract buffer is empty.
2584  inline bool extract_buffer_empty() const
2585  {
2586  return (m_extract_buffer_size == 0);
2587  }
2588 
2589  //! \}
2590 
2591 public:
2592  //! \name Bulk Operations
2593  //! \{
2594 
2595  /*!
2596  * Start a sequence of push operations.
2597  * \param bulk_size Exact number of elements to push before the next pop.
2598  */
2599  void bulk_push_begin(size_type bulk_size)
2600  {
2601  assert(!m_in_bulk_push);
2602  m_in_bulk_push = true;
2603  m_bulk_first_delayed_external_array = m_external_arrays.size();
2604 
2605  size_type heap_capacity = m_num_insertion_heaps * m_insertion_heap_capacity;
2606 
2607  // if bulk_size is large: use simple aggregation instead of keeping the
2608  // heap property and sort everything afterwards.
2609  if (bulk_size > heap_capacity && 0) {
2610  m_is_very_large_bulk = true;
2611  }
2612  else {
2613  m_is_very_large_bulk = false;
2614 
2615  if (bulk_size + m_heaps_size > heap_capacity) {
2616  if (m_heaps_size > 0) {
2617  //flush_insertion_heaps();
2618  }
2619  }
2620  }
2621 
2622  // zero bulk insertion counters
2623  for (int_type p = 0; p < m_num_insertion_heaps; ++p)
2624  m_proc[p]->heap_add_size = 0;
2625  }
2626 
2627  /*!
2628  * Push an element inside a sequence of pushes.
2629  * Run bulk_push_begin() before using this method.
2630  *
2631  * \param element The element to push.
2632  * \param p The id of the insertion heap to use (usually the thread id).
2633  */
2634  void bulk_push(const value_type& element, const unsigned_type p)
2635  {
2636  assert(m_in_bulk_push);
2637 
2638  heap_type& insheap = m_proc[p]->insertion_heap;
2639 
2640  if (!m_is_very_large_bulk && 0)
2641  {
2642  // if small bulk: if heap is full -> sort locally and put into
2643  // internal array list. insert items and keep heap invariant.
2644  if (UNLIKELY(insheap.size() >= m_insertion_heap_capacity)) {
2645 #if STXXL_PARALLEL
2646 #pragma omp atomic
2647 #endif
2648  m_heaps_size += m_proc[p]->heap_add_size;
2649 
2650  m_proc[p]->heap_add_size = 0;
2651  flush_insertion_heap(p);
2652  }
2653 
2654  assert(insheap.size() < insheap.capacity());
2655 
2656  // put item onto heap and siftUp
2657  insheap.push_back(element);
2658  std::push_heap(insheap.begin(), insheap.end(), m_compare);
2659  }
2660  else if (!m_is_very_large_bulk && 1)
2661  {
2662  // if small bulk: if heap is full -> sort locally and put into
2663  // internal array list. insert items but DO NOT keep heap
2664  // invariant.
2665  if (UNLIKELY(insheap.size() >= m_insertion_heap_capacity)) {
2666 #if STXXL_PARALLEL
2667 #pragma omp atomic
2668 #endif
2669  m_heaps_size += m_proc[p]->heap_add_size;
2670 
2671  m_proc[p]->heap_add_size = 0;
2672  flush_insertion_heap(p);
2673  }
2674 
2675  assert(insheap.size() < insheap.capacity());
2676 
2677  // put item onto heap and DO NOT siftUp
2678  insheap.push_back(element);
2679  }
2680  else // m_is_very_large_bulk
2681  {
2682  if (UNLIKELY(insheap.size() >= 2 * 1024 * 1024)) {
2683 #if STXXL_PARALLEL
2684 #pragma omp atomic
2685 #endif
2686  m_heaps_size += m_proc[p]->heap_add_size;
2687 
2688  m_proc[p]->heap_add_size = 0;
2689  flush_insertion_heap(p);
2690  }
2691 
2692  assert(insheap.size() < insheap.capacity());
2693 
2694  // put onto insertion heap but do not keep heap property
2695  insheap.push_back(element);
2696  }
2697 
2698  m_proc[p]->heap_add_size++;
2699  }
2700 
2701  /*!
2702  * Push an element inside a bulk sequence of pushes. Run bulk_push_begin()
2703  * before using this method. This function uses the insertion heap id =
2704  * omp_get_thread_num().
2705  *
2706  * \param element The element to push.
2707  */
2708  void bulk_push(const value_type& element)
2709  {
2710 #if STXXL_PARALLEL
2711  return bulk_push(element, (unsigned_type)omp_get_thread_num());
2712 #else
2713  unsigned_type id = m_rng() % m_num_insertion_heaps;
2714  return bulk_push(element, id);
2715 #endif
2716  }
2717 
2718  /*!
2719  * Ends a sequence of push operations. Run bulk_push_begin() and some
2720  * bulk_push() before this.
2721  */
2723  {
2724  assert(m_in_bulk_push);
2725  m_in_bulk_push = false;
2726 
2727  if (!m_is_very_large_bulk && 0)
2728  {
2729  for (int_type p = 0; p < m_num_insertion_heaps; ++p)
2730  {
2731  m_heaps_size += m_proc[p]->heap_add_size;
2732 
2733  if (!m_proc[p]->insertion_heap.empty())
2734  m_minima.update_heap(p);
2735  }
2736  }
2737  else if (!m_is_very_large_bulk && 1)
2738  {
2739 #if STXXL_PARALLEL
2740 #pragma omp parallel for
2741 #endif
2742  for (int_type p = 0; p < m_num_insertion_heaps; ++p)
2743  {
2744  // reestablish heap property: siftUp only those items pushed
2745  for (unsigned_type index = m_proc[p]->heap_add_size; index != 0; ) {
2746  std::push_heap(m_proc[p]->insertion_heap.begin(),
2747  m_proc[p]->insertion_heap.end() - (--index),
2748  m_compare);
2749  }
2750 
2751 #if STXXL_PARALLEL
2752 #pragma omp atomic
2753 #endif
2754  m_heaps_size += m_proc[p]->heap_add_size;
2755  }
2756 
2757  for (int_type p = 0; p < m_num_insertion_heaps; ++p)
2758  {
2759  if (!m_proc[p]->insertion_heap.empty())
2760  m_minima.update_heap(p);
2761  }
2762  }
2763  else // m_is_very_large_bulk
2764  {
2765 #if STXXL_PARALLEL
2766 #pragma omp parallel for
2767 #endif
2768  for (int_type p = 0; p < m_num_insertion_heaps; ++p)
2769  {
2770  if (m_proc[p]->insertion_heap.size() >= m_insertion_heap_capacity) {
2771  // flush out overfull insertion heap arrays
2772 #if STXXL_PARALLEL
2773 #pragma omp atomic
2774 #endif
2775  m_heaps_size += m_proc[p]->heap_add_size;
2776 
2777  m_proc[p]->heap_add_size = 0;
2778  flush_insertion_heap(p);
2779  }
2780  else {
2781  // reestablish heap property: siftUp only those items pushed
2782  for (unsigned_type index = m_proc[p]->heap_add_size; index != 0; ) {
2783  std::push_heap(m_proc[p]->insertion_heap.begin(),
2784  m_proc[p]->insertion_heap.end() - (--index),
2785  m_compare);
2786  }
2787 
2788 #if STXXL_PARALLEL
2789 #pragma omp atomic
2790 #endif
2791  m_heaps_size += m_proc[p]->heap_add_size;
2792  m_proc[p]->heap_add_size = 0;
2793  }
2794  }
2795 
2796  for (int_type p = 0; p < m_num_insertion_heaps; ++p)
2797  {
2798  if (!m_proc[p]->insertion_heap.empty())
2799  m_minima.update_heap(p);
2800  }
2801  }
2802 
2803  if (m_bulk_first_delayed_external_array != m_external_arrays.size()) {
2804  STXXL_DEBUG("bulk_push_end: run delayed re-hinting of EAs");
2805  rebuild_hint_tree();
2806  }
2807 
2808  check_invariants();
2809  }
2810 
2811  //! Extract up to max_size values at once.
2812  void bulk_pop(std::vector<value_type>& out, size_t max_size)
2813  {
2814  STXXL_DEBUG("bulk_pop_size with max_size=" << max_size);
2815 
2816  const size_t n_elements = std::min<size_t>(max_size, size());
2817  assert(n_elements < m_extract_buffer_limit);
2818 
2819  if (m_heaps_size > 0)
2820  flush_insertion_heaps();
2821 
2822  convert_eb_into_ia();
2823 
2824  refill_extract_buffer(n_elements, n_elements);
2825 
2826  out.resize(0);
2827  using std::swap;
2828  swap(m_extract_buffer, out);
2829  m_extract_buffer_index = 0;
2830  m_extract_buffer_size = 0;
2831  m_minima.deactivate_extract_buffer();
2832 
2833  check_invariants();
2834  }
2835 
2836  //! Extracts all elements which are greater or equal to a given limit.
2837  //! \param out result vector
2838  //! \param limit limit value
2839  //! \param max_size maximum number of items to extract
2840  //! \return true if the buffer contains all items < limit, false it was too
2841  //! small.
2842  bool bulk_pop_limit(std::vector<value_type>& out, const value_type& limit,
2843  size_t max_size = std::numeric_limits<size_t>::max())
2844  {
2845  STXXL_DEBUG("bulk_pop_limit with limit=" << limit);
2846 
2847  convert_eb_into_ia();
2848 
2849  if (m_heaps_size > 0) {
2850  if (0)
2851  flush_insertion_heaps();
2852  else if (1)
2853  flush_insertion_heaps_with_limit(limit);
2854  }
2855 
2856  size_type ias = m_internal_arrays.size();
2857  size_type eas = m_external_arrays.size();
2858  std::vector<size_type> sizes(eas + ias);
2859  std::vector<iterator_pair_type> sequences(eas + ias);
2860  size_type output_size = 0;
2861 
2862  int limiting_ea_index = m_external_min_tree.top();
2863 
2864  // pop limit may have to change due to memory limit
2865  value_type this_limit = limit;
2866  bool has_full_range = true;
2867 
2868  // get all relevant blocks
2869  while (limiting_ea_index > -1)
2870  {
2871  const value_type& ea_limit =
2872  m_external_arrays[limiting_ea_index].get_next_block_min();
2873 
2874  if (m_compare(ea_limit, this_limit)) {
2875  // No more EM data smaller or equal to limit
2876  break;
2877  }
2878 
2879  if (m_external_arrays[limiting_ea_index].num_hinted_blocks() == 0) {
2880  // No more read/prefetch blocks available for EA
2881  this_limit = ea_limit;
2882  has_full_range = false;
2883  break;
2884  }
2885 
2886  wait_next_ea_blocks(limiting_ea_index);
2887  // consider next limiting EA
2888  limiting_ea_index = m_external_min_tree.top();
2889  STXXL_ASSERT(limiting_ea_index < (int)eas);
2890  }
2891 
2892  // build sequences
2893  for (size_type i = 0; i < eas + ias; ++i) {
2894  iterator begin, end;
2895 
2896  if (i < eas) {
2897  assert(!m_external_arrays[i].empty());
2898  assert(m_external_arrays[i].valid());
2899  begin = m_external_arrays[i].begin();
2900  end = m_external_arrays[i].end();
2901  }
2902  else {
2903  size_type j = i - eas;
2904  assert(!(m_internal_arrays[j].empty()));
2905  begin = m_internal_arrays[j].begin();
2906  end = m_internal_arrays[j].end();
2907  }
2908 
2909  end = std::lower_bound(begin, end, this_limit, m_inv_compare);
2910 
2911  sizes[i] = std::distance(begin, end);
2912  sequences[i] = std::make_pair(begin, end);
2913  }
2914 
2915  output_size = std::accumulate(sizes.begin(), sizes.end(), 0);
2916  if (output_size > max_size) {
2917  output_size = max_size;
2918  has_full_range = false;
2919  }
2920  out.resize(output_size);
2921 
2922  STXXL_DEBUG("bulk_pop_limit with" <<
2923  " sequences=" << sequences.size() <<
2924  " output_size=" << output_size <<
2925  " has_full_range=" << has_full_range);
2926 
2928  sequences.begin(), sequences.end(),
2929  out.begin(), output_size, m_inv_compare);
2930 
2931  advance_arrays(sequences, sizes, eas, ias);
2932 
2933  check_invariants();
2934 
2935  return has_full_range;
2936  }
2937 
2938 #if TODO_MAYBE_FIXUP_LATER
2939  /*!
2940  * Insert a vector of elements at one time.
2941  * \param elements Vector containing the elements to push.
2942  * Attention: elements vector may be owned by the PQ afterwards.
2943  */
2944  void bulk_push_vector(std::vector<value_type>& elements)
2945  {
2946  size_type heap_capacity = m_num_insertion_heaps * m_insertion_heap_capacity;
2947  if (elements.size() > heap_capacity / 2) {
2948  flush_array(elements);
2949  return;
2950  }
2951 
2952  bulk_push_begin(elements.size());
2953 #if STXXL_PARALLEL
2954  #pragma omp parallel
2955  {
2956  const unsigned thread_num = omp_get_thread_num();
2957  #pragma omp parallel for
2958  for (size_type i = 0; i < elements.size(); ++i) {
2959  bulk_push(elements[i], thread_num);
2960  }
2961  }
2962 #else
2963  const unsigned thread_num = m_rng() % m_num_insertion_heaps;
2964  for (size_type i = 0; i < elements.size(); ++i) {
2965  bulk_push(elements[i], thread_num);
2966  }
2967 #endif
2968  bulk_push_end();
2969  }
2970 #endif
2971 
2972  //! \}
2973 
2974  //! \name Aggregation Operations
2975  //! \{
2976 
2977  /*!
2978  * Aggregate pushes. Use flush_aggregated_pushes() to finally push
2979  * them. extract_min is allowed is allowed in between the aggregation of
2980  * pushes if you can assure, that the extracted value is smaller than all
2981  * of the aggregated values.
2982  * \param element The element to push.
2983  */
2984  void aggregate_push(const value_type& element)
2985  {
2986  m_aggregated_pushes.push_back(element);
2987  }
2988 
2989 #if TODO_MAYBE_FIXUP_LATER
2990  /*!
2991  * Insert the aggregated values into the queue using push(), bulk insert,
2992  * or sorting, depending on the number of aggregated values.
2993  */
2994  void flush_aggregated_pushes()
2995  {
2996  size_type size = m_aggregated_pushes.size();
2997  size_type ram_internal = 2 * size * sizeof(value_type); // ram for the sorted array + part of the ram for the merge buffer
2998  size_type heap_capacity = m_num_insertion_heaps * m_insertion_heap_capacity;
2999 
3000  if (ram_internal > m_mem_for_heaps / 2) {
3001  flush_array(m_aggregated_pushes);
3002  }
3003  else if ((m_aggregated_pushes.size() > c_single_insert_limit) && (m_aggregated_pushes.size() < heap_capacity)) {
3004  bulk_push_vector(m_aggregated_pushes);
3005  }
3006  else {
3007  for (value_iterator i = m_aggregated_pushes.begin(); i != m_aggregated_pushes.end(); ++i) {
3008  push(*i);
3009  }
3010  }
3011 
3012  m_aggregated_pushes.clear();
3013  }
3014 #endif
3015  //! \}
3016 
3017  //! \name std::priority_queue compliant operations
3018  //! \{
3019 
3020  /*!
3021  * Insert new element
3022  * \param element the element to insert.
3023  * \param p number of insertion heap to insert item into
3024  */
3025  void push(const value_type& element, unsigned_type p = 0)
3026  {
3027  assert(!m_in_bulk_push && !m_limit_extract);
3028 
3029  heap_type& insheap = m_proc[p]->insertion_heap;
3030 
3031  if (insheap.size() >= m_insertion_heap_capacity) {
3032  flush_insertion_heap(p);
3033  }
3034 
3035  // push item to end of heap and siftUp
3036  insheap.push_back(element);
3037  unsigned_type index = push_heap(insheap.begin(), insheap.end(),
3038  m_compare);
3039  ++m_heaps_size;
3040 
3041  if (insheap.size() == 1 || index == 0)
3042  m_minima.update_heap(p);
3043  }
3044 
3045  //! Access the minimum element.
3046  const value_type & top()
3047  {
3048  assert(!m_in_bulk_push && !m_limit_extract);
3049  assert(!empty());
3050 
3051  if (extract_buffer_empty()) {
3052  refill_extract_buffer(std::min(m_extract_buffer_limit,
3053  m_internal_size + m_external_size));
3054  }
3055 
3056  static const bool debug = false;
3057 
3058  std::pair<unsigned, unsigned> type_and_index = m_minima.top();
3059  const unsigned& type = type_and_index.first;
3060  const unsigned& index = type_and_index.second;
3061 
3062  assert(type < 4);
3063 
3064  switch (type) {
3065  case minima_type::HEAP:
3066  STXXL_DEBUG("heap " << index <<
3067  ": " << m_proc[index]->insertion_heap[0]);
3068  return m_proc[index]->insertion_heap[0];
3069  case minima_type::IA:
3070  STXXL_DEBUG("ia " << index <<
3071  ": " << m_internal_arrays[index].get_min());
3072  return m_internal_arrays[index].get_min();
3073  case minima_type::EB:
3074  STXXL_DEBUG("eb " << m_extract_buffer_index <<
3075  ": " << m_extract_buffer[m_extract_buffer_index]);
3076  return m_extract_buffer[m_extract_buffer_index];
3077  default:
3078  STXXL_ERRMSG("Unknown extract type: " << type);
3079  abort();
3080  }
3081  }
3082 
3083  //! Remove the minimum element.
3084  void pop()
3085  {
3086  assert(!m_in_bulk_push && !m_limit_extract);
3087 
3088  m_stats.num_extracts++;
3089 
3090  if (extract_buffer_empty()) {
3091  refill_extract_buffer(std::min(m_extract_buffer_limit,
3092  m_internal_size + m_external_size));
3093  }
3094 
3095  m_stats.extract_min_time.start();
3096 
3097  std::pair<unsigned, unsigned> type_and_index = m_minima.top();
3098  unsigned type = type_and_index.first;
3099  unsigned index = type_and_index.second;
3100 
3101  assert(type < 4);
3102 
3103  switch (type) {
3104  case minima_type::HEAP:
3105  {
3106  heap_type& insheap = m_proc[index]->insertion_heap;
3107 
3108  m_stats.pop_heap_time.start();
3109  std::pop_heap(insheap.begin(), insheap.end(), m_compare);
3110  insheap.pop_back();
3111  m_stats.pop_heap_time.stop();
3112 
3113  m_heaps_size--;
3114 
3115  if (!insheap.empty())
3116  m_minima.update_heap(index);
3117  else
3118  m_minima.deactivate_heap(index);
3119 
3120  break;
3121  }
3122  case minima_type::IA:
3123  {
3124  m_internal_arrays[index].inc_min();
3125  m_internal_size--;
3126 
3127  if (!(m_internal_arrays[index].empty()))
3128  m_minima.update_internal_array(index);
3129  else
3130  // internal array has run empty
3131  m_minima.deactivate_internal_array(index);
3132 
3133  break;
3134  }
3135  case minima_type::EB:
3136  {
3137  ++m_extract_buffer_index;
3138  assert(m_extract_buffer_size > 0);
3139  --m_extract_buffer_size;
3140 
3141  if (!extract_buffer_empty())
3142  m_minima.update_extract_buffer();
3143  else
3144  m_minima.deactivate_extract_buffer();
3145 
3146  break;
3147  }
3148  default:
3149  STXXL_ERRMSG("Unknown extract type: " << type);
3150  abort();
3151  }
3152 
3153  m_stats.extract_min_time.stop();
3154 
3155  check_invariants();
3156  }
3157 
3158  //! \}
3159 
3160  //! \name Bulk-Limit Operations
3161  //! \{
3162 
3163 protected:
3164  //! current limit element
3166 
3167  //! flag if inside a bulk limit extract session
3169 
3170  //! flag if the extract buffer contains the full limit range
3172 
3173 public:
3174  //! Begin bulk-limit extraction session with limit element.
3175  void limit_begin(const value_type& limit, size_type bulk_size)
3176  {
3177  m_limit_extract = true;
3178  m_limit_element = limit;
3179 
3180  std::vector<value_type> new_extract_buffer;
3181  m_limit_has_full_range =
3182  bulk_pop_limit(new_extract_buffer, limit, m_extract_buffer_limit);
3183  std::swap(new_extract_buffer, m_extract_buffer);
3184 
3185  m_extract_buffer_index = 0;
3186  m_extract_buffer_size = m_extract_buffer.size();
3187  if (m_extract_buffer_size)
3188  m_minima.update_extract_buffer();
3189  else
3190  m_minima.deactivate_extract_buffer();
3191 
3192  bulk_push_begin(bulk_size);
3193  }
3194 
3195  //! Push new item >= bulk-limit element into insertion heap p.
3196  void limit_push(const value_type& element, const unsigned_type p = 0)
3197  {
3198  assert(m_limit_extract);
3199  assert(!m_compare(m_limit_element, element));
3200 
3201  return bulk_push(element, p);
3202  }
3203 
3204  //! Access the minimum element, which can only be in the extract buffer.
3206  {
3207  assert(m_limit_extract);
3208 
3209  // if buffer is empty and we extracted the full range last time, return
3210  // limit items as sentinel.
3211  if (m_extract_buffer_size == 0 && m_limit_has_full_range)
3212  return m_limit_element;
3213 
3214  if (extract_buffer_empty())
3215  {
3216  // extract more items
3217  std::vector<value_type> new_extract_buffer;
3218  m_limit_has_full_range =
3219  bulk_pop_limit(new_extract_buffer, m_limit_element,
3220  m_extract_buffer_limit);
3221 
3222  std::swap(new_extract_buffer, m_extract_buffer);
3223 
3224  m_extract_buffer_index = 0;
3225  m_extract_buffer_size = m_extract_buffer.size();
3226  if (m_extract_buffer_size)
3227  m_minima.update_extract_buffer();
3228  else
3229  m_minima.deactivate_extract_buffer();
3230  }
3231 
3232  return m_extract_buffer[m_extract_buffer_index];
3233  }
3234 
3235  //! Remove the minimum element, only works correctly while elements < L.
3236  void limit_pop()
3237  {
3238  assert(m_limit_extract);
3239 
3240  ++m_extract_buffer_index;
3241  assert(m_extract_buffer_size > 0);
3242  --m_extract_buffer_size;
3243 
3244  if (extract_buffer_empty() && !m_limit_has_full_range)
3245  {
3246  // extract more items
3247  std::vector<value_type> new_extract_buffer;
3248  m_limit_has_full_range =
3249  bulk_pop_limit(new_extract_buffer, m_limit_element,
3250  m_extract_buffer_limit);
3251 
3252  std::swap(new_extract_buffer, m_extract_buffer);
3253 
3254  m_extract_buffer_index = 0;
3255  m_extract_buffer_size = m_extract_buffer.size();
3256  if (m_extract_buffer_size)
3257  m_minima.update_extract_buffer();
3258  else
3259  m_minima.deactivate_extract_buffer();
3260  }
3261  }
3262 
3263  //! Finish bulk-limit extraction session.
3264  void limit_end()
3265  {
3266  assert(m_limit_extract);
3267 
3268  bulk_push_end();
3269 
3270  m_limit_extract = false;
3271  }
3272 
3273  //! \}
3274 
3275 protected:
3276  //! Flushes all elements of the insertion heaps which are greater
3277  //! or equal to a given limit. NOTE(review): the comments and asserts in the body say the items moved into the new internal array are those with m_compare(limit, x) == true ("< L") -- confirm which wording is intended.
3278  //! \param limit limit value; each heap is partitioned by m_compare(limit, x)
3280  {
3281  // perform extract for all items < L into back of insertion_heap
3282  std::vector<unsigned_type> back_size(m_num_insertion_heaps); // per heap: number of items moved to its back
3283 
3284 //#if STXXL_PARALLEL
3285 //#pragma omp parallel for
3286 //#endif
3287  for (size_t p = 0; p < m_num_insertion_heaps; ++p)
3288  {
3289  heap_type& insheap = m_proc[p]->insertion_heap;
3290 
3291  typename heap_type::iterator back = insheap.end(); // [insheap.begin(), back) is the remaining heap
3292 
3293  while (back != insheap.begin() &&
3294  m_compare(limit, insheap[0]))
3295  {
3296  // while top < L, perform pop_heap: put top to back and
3297  // siftDown new items (shortens heap by one)
3298  std::pop_heap(insheap.begin(), back, m_compare);
3299  --back;
3300  }
3301 
3302  // range [back, insheap.end()) is < L, rest [insheap.begin(), back) is >= L.
3303  // the loop below only verifies this partition via assert()
3304  for (typename heap_type::const_iterator it = insheap.begin();
3305  it != insheap.end(); ++it)
3306  {
3307  if (it < back)
3308  assert(!m_compare(limit, *it));
3309  else
3310  assert(m_compare(limit, *it));
3311  }
3312 
3313  back_size[p] = insheap.end() - back;
3314  }
3315 
3316  // put items from insertion heaps into an internal array
3317  unsigned_type back_sum = std::accumulate(
3318  back_size.begin(), back_size.end(), unsigned_type(0));
3319 
3320  STXXL_DEBUG("flush_insertion_heaps_with_limit(): back_sum = " << back_sum);
3321 
3322  if (back_sum)
3323  {
3324  // test that enough RAM is available for the back_sum items collected below
3325  flush_ia_ea_until_memory_free(back_sum * sizeof(value_type));
3326 
3327  std::vector<value_type> values(back_sum);
3328 
3329  // copy items into values vector and cut them off the heaps
3330  typename std::vector<value_type>::iterator vi = values.begin();
3331  for (size_t p = 0; p < m_num_insertion_heaps; ++p)
3332  {
3333  heap_type& insheap = m_proc[p]->insertion_heap;
3334 
3335  std::copy(insheap.end() - back_size[p], insheap.end(), vi);
3336  vi += back_size[p];
3337  insheap.resize(insheap.size() - back_size[p]);
3338 
3339  if (insheap.empty()) // heap content changed: refresh its entry in m_minima
3340  m_minima.deactivate_heap(p);
3341  else
3342  m_minima.update_heap(p);
3343  }
3344 
3345  potentially_parallel::sort(values.begin(), values.end(), m_inv_compare);
3346 
3347  add_as_internal_array(values);
3348  m_heaps_size -= back_sum; // the moved items no longer count as heap items
3349  }
3350  }
3351 
3352 public:
3353  /*!
3354  * Merges all external arrays and all internal arrays into one external array (NOTE(review): the body only calls check_external_level(0, true); whether internal arrays take part is not visible here -- confirm).
3355  * Public for benchmark purposes. Emits an error message on every call, since this path is not expected in normal operation.
3356  */
3358  {
3359  STXXL_ERRMSG("Merging external arrays. This should not happen."
3360  << " You should adjust memory assignment and/or external array level size.");
3361  check_external_level(0, true); // second argument is force_merge_all
3362  STXXL_DEBUG("Merging all external arrays done.");
3363 
3364  resize_read_pool();
3365 
3366  // Rebuild hint tree completely as the hint sequence may have changed.
3367  if (!m_in_bulk_push)
3368  rebuild_hint_tree();
3369  else // inside a bulk push no rebuild happens here; only a consistency check on the delayed-array index
3370  assert(m_external_arrays.size() - 1 >= m_bulk_first_delayed_external_array);
3371 
3372  check_invariants();
3373  }
3374 
3375  //! Free up memory by flushing internal arrays and combining external
3376  //! arrays until enough bytes are free. Performs at most one step (flush_internal_arrays() if m_internal_size > 0, otherwise merge_external_arrays()) and then asserts that m_mem_left >= mem_free; mem_free is compared against m_mem_left, i.e. a byte count.
3378  {
3379  if (m_mem_left >= mem_free) return; // already enough memory available
3380 
3381  if (m_internal_size > 0) {
3382  flush_internal_arrays();
3383  }
3384  else {
3385  merge_external_arrays();
3386  }
3387 
3388  assert(m_mem_left >= mem_free); // no retry loop: a single step is expected to suffice
3389  }
3390 
3391  //! Automatically resize the read/prefetch buffer pool depending on number
3392  //! of external arrays. Target size is m_num_read_blocks_per_ea * m_external_arrays.size(); m_mem_left is charged/credited block_size per block added/removed.
3394  {
3395  unsigned_type new_num_read_blocks =
3396  m_num_read_blocks_per_ea * m_external_arrays.size();
3397 
3398  STXXL_DEBUG("resize_read_pool:" <<
3399  " m_num_read_blocks=" << m_num_read_blocks <<
3400  " ea_size=" << m_external_arrays.size() <<
3401  " m_num_read_blocks_per_ea=" << m_num_read_blocks_per_ea <<
3402  " new_num_read_blocks=" << new_num_read_blocks <<
3403  " free_size_prefetch=" << m_pool.free_size_prefetch() <<
3404  " m_num_hinted_blocks=" << m_num_hinted_blocks <<
3405  " m_num_used_read_blocks=" << m_num_used_read_blocks);
3406 
3407  // add new blocks
3408  if (new_num_read_blocks > m_num_read_blocks)
3409  {
3410  unsigned_type mem_needed =
3411  (new_num_read_blocks - m_num_read_blocks) * block_size;
3412 
3413  // -tb: this may recursively call this function! Hence only an assertion that the memory is already available.
3414  //flush_ia_ea_until_memory_free(mem_needed);
3415  STXXL_ASSERT(m_mem_left >= mem_needed);
3416 
3417  while (new_num_read_blocks > m_num_read_blocks) {
3418  block_type* new_block = new block_type(); // handed to the pool via add_prefetch()
3419  m_pool.add_prefetch(new_block);
3420  ++m_num_read_blocks;
3421  }
3422 
3423  m_mem_left -= mem_needed;
3424  }
3425 
3426  // steal extra blocks (as many as possible)
3427  if (new_num_read_blocks < m_num_read_blocks)
3428  {
3429  while (new_num_read_blocks < m_num_read_blocks &&
3430  m_pool.free_size_prefetch() > 0)
3431  {
3432  block_type* del_block = m_pool.steal_prefetch();
3433  delete del_block;
3434  --m_num_read_blocks;
3435  m_mem_left += block_size;
3436  }
3437 
3438  if (new_num_read_blocks < m_num_read_blocks) // pool had no more free prefetch blocks to give back
3439  STXXL_ERRMSG("WARNING: could not immediately reduce read/prefetch pool!");
3440  }
3441  }
3442 
3443  //! Rebuild hint tree completely as the hint sequence may have changed, and
3444  //! re-hint the correct block sequence. The time spent is accounted in m_stats.hint_time.
3446  {
3447  m_stats.hint_time.start();
3448 
3449  // prepare rehinting sequence: reset hint begin pointer
3450  for (size_t i = 0; i < m_external_arrays.size(); ++i)
3451  m_external_arrays[i].rebuild_hints_prepare();
3452 
3453  // rebuild hint tree with first elements: a player is active iff its array has unhinted EM data
3454  for (size_t i = 0; i < m_external_arrays.size(); ++i)
3455  {
3456  if (m_external_arrays[i].has_unhinted_em_data()) {
3457  m_hint_tree.activate_without_replay(i);
3458  }
3459  else {
3460  m_hint_tree.deactivate_without_replay(i);
3461  }
3462  }
3463  m_hint_tree.rebuild();
3464 
3465  // virtually release all hints: count currently hinted blocks as free again
3466  unsigned_type free_prefetch_blocks =
3467  m_pool.free_size_prefetch() + m_num_hinted_blocks;
3468  m_num_hinted_blocks = 0;
3469 
3470  int gmin_index; // index of the array on top of the hint tree; negative ends the loop
3471  while (free_prefetch_blocks > 0 &&
3472  (gmin_index = m_hint_tree.top()) >= 0)
3473  {
3474  assert((size_t)gmin_index < m_external_arrays.size());
3475 
3476  STXXL_DEBUG("Give pre-hint in EA[" << gmin_index << "] min " <<
3477  m_external_arrays[gmin_index].get_next_hintable_min());
3478 
3479  m_external_arrays[gmin_index].rebuild_hints_prehint_next_block();
3480  --free_prefetch_blocks;
3481  ++m_num_hinted_blocks;
3482 
3483  if (m_external_arrays[gmin_index].has_unhinted_em_data()) {
3484  m_hint_tree.replay_on_change(gmin_index);
3485  }
3486  else {
3487  m_hint_tree.deactivate_player(gmin_index);
3488  }
3489  }
3490 
3491  // invalidate all hinted blocks no longer needed
3492  for (size_t i = 0; i < m_external_arrays.size(); ++i)
3493  m_external_arrays[i].rebuild_hints_cancel();
3494 
3495  // perform real hinting on pre-hinted blocks
3496  for (size_t i = 0; i < m_external_arrays.size(); ++i)
3497  m_external_arrays[i].rebuild_hints_finish();
3498 
3499  assert(free_prefetch_blocks == m_pool.free_size_prefetch()); // virtual bookkeeping must match the pool afterwards
3500 
3501  m_stats.hint_time.stop();
3502  }
3503 
3504  //! Updates the prefetch prediction tree after a remove_items(), which frees
3505  //! up blocks. Replays the array's player if it still has unhinted EM data, otherwise deactivates it; timed in m_stats.hint_time.
3506  //! \param ea_index index of the external array in question
3507  inline void update_hint_tree(size_t ea_index)
3508  {
3509  m_stats.hint_time.start();
3510  if (m_external_arrays[ea_index].has_unhinted_em_data()) {
3511  m_hint_tree.replay_on_change(ea_index);
3512  }
3513  else {
3514  m_hint_tree.deactivate_player(ea_index);
3515  }
3516  m_stats.hint_time.stop();
3517  }
3518 
3519  //! Updates the external min tree after a remove() or a
3520  //! wait_next_blocks() call. Replays the array's player if it still has EM data, otherwise deactivates it.
3521  //! \param ea_index index of the external array in question
3522  inline void update_external_min_tree(size_t ea_index)
3523  {
3524  if (m_external_arrays[ea_index].has_em_data()) {
3525  m_external_min_tree.replay_on_change(ea_index);
3526  }
3527  else {
3528  m_external_min_tree.deactivate_player(ea_index);
3529  }
3530  }
3531 
3532  //! Hints EA blocks which will be needed soon. Hints at most
3533  //! m_num_prefetchers blocks globally. (NOTE(review): the loop below is bounded by m_pool.free_size_prefetch() and by the hint tree running out of active players; m_num_prefetchers is not read here -- confirm the wording.)
3534  inline void hint_external_arrays()
3535  {
3536  m_stats.hint_time.start();
3537 
3538  STXXL_DEBUG("hint_external_arrays()"
3539  " for free_size_prefetch=" << m_pool.free_size_prefetch());
3540 
3541  int gmin_index; // index of the array on top of the hint tree; negative ends the loop
3542  while (m_pool.free_size_prefetch() > 0 &&
3543  (gmin_index = m_hint_tree.top()) >= 0)
3544  {
3545  assert((size_t)gmin_index < m_external_arrays.size());
3546 
3547  STXXL_DEBUG("Give hint in EA[" << gmin_index << "]");
3548  m_external_arrays[gmin_index].hint_next_block();
3549  ++m_num_hinted_blocks;
3550 
3551  if (m_external_arrays[gmin_index].has_unhinted_em_data()) { // array can still compete for further hints
3552  m_hint_tree.replay_on_change(gmin_index);
3553  }
3554  else {
3555  m_hint_tree.deactivate_player(gmin_index);
3556  }
3557  }
3558 
3559  m_stats.hint_time.stop();
3560  }
3561 
3562  //! Print statistics: dumps the compile-time switches, memory figures, m_stats and the statistics of m_minima.
3563  void print_stats() const
3564  {
3565  STXXL_VARDUMP(c_merge_sorted_heaps);
3566  STXXL_VARDUMP(c_limit_extract_buffer);
3567  STXXL_VARDUMP(c_single_insert_limit);
3568 
3569  if (c_limit_extract_buffer) { // the limit is only reported when the switch is enabled
3570  STXXL_VARDUMP(m_extract_buffer_limit);
3571  STXXL_MEMDUMP(m_extract_buffer_limit * sizeof(value_type));
3572  }
3573 
3574 #if STXXL_PARALLEL
3575  STXXL_VARDUMP(omp_get_max_threads());
3576 #endif
3577 
3578  STXXL_MEMDUMP(m_mem_for_heaps);
3579  STXXL_MEMDUMP(m_mem_left);
3580 
3581  //if (num_extract_buffer_refills > 0) {
3582  // STXXL_VARDUMP(total_extract_buffer_size / num_extract_buffer_refills);
3583  // STXXL_MEMDUMP(total_extract_buffer_size / num_extract_buffer_refills * sizeof(value_type));
3584  //}
3585 
3586  STXXL_MSG(m_stats);
3587  m_minima.print_stats();
3588  }
3589 
3590 protected:
3591  //! Calculates the sequences vector needed by the multiway merger,
3592  //! considering inaccessible data from external arrays.
3593  //! The sizes vector stores the size of each sequence. Both vectors must already have m_external_arrays.size() + m_internal_arrays.size() entries (asserted); external arrays come first.
3594  //! \param reuse_previous_lower_bounds If true, each std::lower_bound search starts at the end iterator stored by a previous run instead of at begin().
3595  //! sequences[i].second must be valid upper bound iterator from a previous run!
3596  //! \returns the index of the external array which is limiting factor
3597  //! or m_external_arrays.size() if not limited.
3598  size_t calculate_merge_sequences(std::vector<size_type>& sizes,
3599  std::vector<iterator_pair_type>& sequences,
3600  bool reuse_previous_lower_bounds = false)
3601  {
3602  STXXL_DEBUG("calculate merge sequences");
3603 
3604  static const bool debug = false;
3605 
3606  const size_type eas = m_external_arrays.size();
3607  const size_type ias = m_internal_arrays.size();
3608 
3609  assert(sizes.size() == eas + ias);
3610  assert(sequences.size() == eas + ias);
3611 
3612  /*
3613  * determine minimum of each first block
3614  */
3615 
3616  int gmin_index = m_external_min_tree.top(); // negative if the external min tree has no active player
3617  bool needs_limit = (gmin_index >= 0) ? true : false;
3618 
3619 // test correctness of external block min tree
3620 #ifdef STXXL_DEBUG_ASSERTIONS
3621 
3622  bool test_needs_limit = false;
3623  int test_gmin_index = 0;
3624  value_type test_gmin_value;
3625 
3626  m_stats.refill_minmax_time.start();
3627  for (size_type i = 0; i < eas; ++i) { // linear scan recomputing what the tree should report
3628  if (m_external_arrays[i].has_em_data()) {
3629  const value_type& min_value =
3630  m_external_arrays[i].get_next_block_min();
3631 
3632  if (!test_needs_limit) {
3633  test_needs_limit = true;
3634  test_gmin_value = min_value;
3635  test_gmin_index = i;
3636  }
3637  else {
3638  STXXL_DEBUG("min[" << i << "]: " << min_value <<
3639  " test: " << test_gmin_value <<
3640  ": " << m_inv_compare(min_value, test_gmin_value));
3641  if (m_inv_compare(min_value, test_gmin_value)) {
3642  test_gmin_value = min_value;
3643  test_gmin_index = i;
3644  }
3645  }
3646  }
3647  }
3648  m_stats.refill_minmax_time.stop();
3649 
3650  STXXL_ASSERT(needs_limit == test_needs_limit);
3651  STXXL_ASSERT(!needs_limit || gmin_index == test_gmin_index);
3652 
3653 #endif
3654 
3655  /*
3656  * calculate size and create sequences to merge
3657  */
3658 
3659 #if STXXL_PARALLEL
3660 // #pragma omp parallel for if(eas + ias > m_num_insertion_heaps)
3661 #endif
3662  for (size_type i = 0; i < eas + ias; ++i) {
3663  iterator begin, end;
3664 
3665  if (i < eas) { // indices [0, eas) address external arrays
3666  begin = m_external_arrays[i].begin();
3667  end = m_external_arrays[i].end();
3668  }
3669  else { // indices [eas, eas + ias) address internal arrays
3670  size_type j = i - eas;
3671  begin = m_internal_arrays[j].begin();
3672  end = m_internal_arrays[j].end();
3673  }
3674 
3675  if (needs_limit) {
3676  const value_type& gmin_value =
3677  m_external_arrays[gmin_index].get_next_block_min();
3678 
3679  // remove timer if parallel
3680  //stats.refill_lower_bound_time.start();
3681  if (reuse_previous_lower_bounds) {
3682  // Be careful that sequences[i].second is really valid and
3683  // set by a previous calculate_merge_sequences() run!
3684  end = std::lower_bound(sequences[i].second, end,
3685  gmin_value, m_inv_compare);
3686  }
3687  else
3688  {
3689  end = std::lower_bound(begin, end,
3690  gmin_value, m_inv_compare);
3691  }
3692  //stats.refill_lower_bound_time.stop();
3693  }
3694 
3695  sizes[i] = std::distance(begin, end);
3696  sequences[i] = std::make_pair(begin, end);
3697 
3698  STXXL_DEBUG("sequence[" << i << "] " << (i < eas ? "ea " : "ia ") <<
3699  begin << " - " << end <<
3700  " size " << sizes[i] <<
3701  (needs_limit ? " with ub limit" : ""));
3702  }
3703 
3704  if (needs_limit) {
3705  STXXL_DEBUG("return with needs_limit: gmin_index=" << gmin_index);
3706  return gmin_index;
3707  }
3708  else {
3709  STXXL_DEBUG("return with needs_limit: eas=" << eas);
3710  return eas;
3711  }
3712  }
3713 
3714 protected:
3715  //! Convert extract buffer into a new internal array. Does nothing if the extract buffer holds no items. If do_not_flush is true, the flush_ia_ea_until_memory_free() call is skipped.
3716  void convert_eb_into_ia(bool do_not_flush = false)
3717  {
3718  if (m_extract_buffer_size == 0) return;
3719 
3720  STXXL_DEBUG("convert_eb_into_ia");
3721 
3722  // tb: if in limit sequence and the EB gets flushed out to EM, then we
3723  // have to re-merge items into the EB instead of returning the
3724  // sentinel.
3725  m_limit_has_full_range = false;
3726 
3727  // TODO: memory is NOT allocated, but extract buffer is currently not
3728  // counted
3729  if (!do_not_flush)
3730  flush_ia_ea_until_memory_free(
3731  internal_array_type::int_memory(m_extract_buffer.size())
3732  );
3733 
3734  if (m_extract_buffer_size == 0) return; // re-checked after the flush call above
3735 
3736  // first deactivate extract buffer to replay tree for new IA.
3737  m_minima.deactivate_extract_buffer();
3738 
3739  // add eb as internal array with current index
3740  add_as_internal_array(m_extract_buffer, m_extract_buffer_index);
3741 
3742  m_extract_buffer_index = 0; // extract buffer is logically empty from here on
3743  m_extract_buffer_size = 0;
3744  }
3745 
3746  //! Refills the extract buffer from the external arrays.
3747  //! \param minimum_size requested minimum size of the resulting extract buffer.
3748  //! Prints a warning if there is not enough data to reach this size.
3749  //! \param maximum_size maximum size of the extract buffer. Using
3750  //! m_extract_buffer_limit if set to 0.
3751  inline void refill_extract_buffer(size_t minimum_size = 0,
3752  size_t maximum_size = 0)
3753  {
3754  STXXL_DEBUG("refilling extract buffer" <<
3755  " ia_size=" << m_internal_arrays.size() <<
3756  " ea_size=" << m_external_arrays.size());
3757 
3758  if (maximum_size == 0)
3759  maximum_size = m_extract_buffer_limit;
3760 
3761  check_invariants();
3762 
3763  assert(extract_buffer_empty());
3764  m_extract_buffer_index = 0;
3765 
3766  cleanup_external_arrays();
3767 
3768  size_type ias, eas = m_external_arrays.size();
3769 
3770  m_minima.clear_internal_arrays();
3771  cleanup_internal_arrays();
3772  ias = m_internal_arrays.size();
3773 
3774  if (eas == 0 && ias == 0) {
3775  m_extract_buffer.resize(0);
3776  m_minima.deactivate_extract_buffer();
3777  return;
3778  }
3779 
3780  m_stats.num_extract_buffer_refills++;
3781  m_stats.refill_extract_buffer_time.start();
3782  m_stats.refill_time_before_merge.start();
3783 
3784  std::vector<size_type> sizes(eas + ias);
3785  std::vector<iterator_pair_type> sequences(eas + ias);
3786  size_type output_size = 0;
3787 
3788  if (minimum_size > 0) {
3789  size_t limiting_ea_index = eas + 1;
3790  bool reuse_lower_bounds = false;
3791  while (output_size < minimum_size)
3792  {
3793  STXXL_DEBUG("refill: request more data," <<
3794  " output_size=" << output_size <<
3795  " minimum_size=" << minimum_size <<
3796  " limiting_ea_index=" << limiting_ea_index);
3797 
3798  if (limiting_ea_index < eas) {
3799  if (m_external_arrays[limiting_ea_index].num_hinted_blocks() == 0)
3800  break;
3801 
3802  wait_next_ea_blocks(limiting_ea_index);
3803  reuse_lower_bounds = true;
3804  }
3805  else if (limiting_ea_index == eas) {
3806  // no more unaccessible EM data
3807  STXXL_MSG("Warning: refill_extract_buffer(n): "
3808  "minimum_size > # mergeable elements!");
3809  break;
3810  }
3811 
3812  limiting_ea_index = calculate_merge_sequences(
3813  sizes, sequences, reuse_lower_bounds);
3814 
3815  output_size = std::accumulate(sizes.begin(), sizes.end(), 0);
3816  }
3817  }
3818  else {
3819  calculate_merge_sequences(sizes, sequences);
3820  output_size = std::accumulate(sizes.begin(), sizes.end(), 0);
3821  }
3822 
3823  if (c_limit_extract_buffer) {
3824  output_size = std::min<size_t>(output_size, maximum_size);
3825  }
3826 
3827  m_stats.max_extract_buffer_size.set_max(output_size);
3828  m_stats.total_extract_buffer_size += output_size;
3829 
3830  assert(output_size > 0);
3831  m_extract_buffer.resize(output_size);
3832  m_extract_buffer_size = output_size;
3833 
3834  m_stats.refill_time_before_merge.stop();
3835  m_stats.refill_merge_time.start();
3836 
3838  sequences.begin(), sequences.end(),
3839  m_extract_buffer.begin(), output_size, m_inv_compare);
3840 
3841  m_stats.refill_merge_time.stop();
3842  m_stats.refill_time_after_merge.start();
3843 
3844  advance_arrays(sequences, sizes, eas, ias);
3845 
3846  m_minima.update_extract_buffer();
3847 
3848  m_stats.refill_time_after_merge.stop();
3849  m_stats.refill_extract_buffer_time.stop();
3850 
3851  check_invariants();
3852  }
3853 
3854  //! Requests more EM data from a given EA and updates
3855  //! the winner trees and hints accordingly: the blocks reported by wait_next_blocks() move from the hinted to the used counter, then the external min tree entry is refreshed.
3856  inline void wait_next_ea_blocks(unsigned_type ea_index)
3857  {
3858  unsigned_type used_blocks =
3859  m_external_arrays[ea_index].wait_next_blocks();
3860 
3861  m_num_hinted_blocks -= used_blocks;
3862  m_num_used_read_blocks += used_blocks;
3863 
3864  update_external_min_tree(ea_index);
3865  }
3866 
3867  // Removes empty arrays and updates the winner trees accordingly. For each sequence, sizes[i] minus the remaining distance in sequences[i] is the number of consumed items; indices [0, eas) are external arrays, [eas, eas + ias) internal arrays.
3868  inline void advance_arrays(std::vector<iterator_pair_type>& sequences,
3869  std::vector<size_type>& sizes,
3870  size_t eas, size_t ias)
3871  {
3872  unsigned_type total_freed_blocks = 0;
3873 
3874  for (size_type i = 0; i < eas + ias; ++i) {
3875  // dist represents the number of elements that haven't been merged
3876  size_type dist = std::distance(sequences[i].first,
3877  sequences[i].second);
3878  const size_t diff = sizes[i] - dist; // number of elements that have been merged out of sequence i
3879  if (diff == 0) continue;
3880 
3881  if (i < eas) {
3882  // remove items and free blocks in RAM.
3883  unsigned_type freed_blocks =
3884  m_external_arrays[i].remove_items(diff);
3885 
3886  m_num_used_read_blocks -= freed_blocks;
3887  total_freed_blocks += freed_blocks;
3888 
3889  // correct item count.
3890  assert(m_external_size >= diff);
3891  m_external_size -= diff;
3892  }
3893  else {
3894  size_type j = i - eas;
3895  m_internal_arrays[j].inc_min(diff);
3896  assert(m_internal_size >= diff);
3897  m_internal_size -= diff;
3898  }
3899  }
3900 
3901  // remove empty arrays - important for the next round (may also reduce
3902  // number of prefetch buffers, so must be before hinting).
3903  cleanup_external_arrays();
3904 
3905  // prefetch new blocks from EAs using freed blocks
3906  if (total_freed_blocks)
3907  hint_external_arrays();
3908 
3909  m_stats.num_new_external_arrays = 0;
3910  cleanup_internal_arrays();
3911  }
3912 
3913  //! Flushes the insertion heap p into an internal array. The heap must not be empty (asserted). Sorting happens outside the OpenMP critical section, the hand-over to the shared structures inside it.
3915  {
3916  assert(m_proc[p]->insertion_heap.size() != 0);
3917 
3918  heap_type& insheap = m_proc[p]->insertion_heap;
3919  size_t size = insheap.size(); // remembered now: insheap is emptied by add_as_internal_array() below
3920 
3921  STXXL_DEBUG0(
3922  "Flushing insertion heap array p=" << p <<
3923  " size=" << insheap.size() <<
3924  " capacity=" << insheap.capacity() <<
3925  " int_memory=" << internal_array_type::int_memory(insheap.size()) <<
3926  " mem_left=" << m_mem_left);
3927 
3928  m_stats.num_insertion_heap_flushes++;
3929  stats_timer flush_time(true); // separate timer due to parallel sorting
3930 
3931  // sort locally, independent of others
3932  std::sort(insheap.begin(), insheap.end(), m_inv_compare);
3933 
3934 #if STXXL_PARALLEL
3935 #pragma omp critical (stxxl_flush_insertion_heap)
3936 #endif
3937  {
3938  // test that enough RAM is available for merged internal array:
3939  // otherwise flush the existing internal arrays out to disk.
3940  flush_ia_ea_until_memory_free(
3941  internal_array_type::int_memory(insheap.size()));
3942 
3943  // invalidate player in minima tree (before adding the IA to tree)
3944  m_minima.deactivate_heap(p);
3945 
3946  // insheap is empty afterwards, as vector was swapped into new_array
3947  add_as_internal_array(insheap);
3948 
3949  // reserve new insertion heap
3950  insheap.reserve(m_insertion_heap_capacity);
3951  assert(insheap.capacity() * sizeof(value_type)
3952  == insertion_heap_int_memory());
3953 
3954  // update item counts
3955 #if STXXL_PARALLEL
3956 #pragma omp atomic
3957 #endif
3958  m_heaps_size -= size;
3959  }
3960 
3961  m_stats.insertion_heap_flush_time += flush_time;
3962  }
3963 
3964  //! Flushes all insertions heaps into an internal array.
3966  {
3967  size_type max_mem_needed;
3968 
3969  if (c_merge_sorted_heaps) {
3970  max_mem_needed = m_mem_for_heaps;
3971  }
3972  else {
3973  max_mem_needed = insertion_heap_int_memory();
3974  }
3975 
3976  // test that enough RAM is available for merged internal array:
3977  // otherwise flush the existing internal arrays out to disk.
3978  flush_ia_ea_until_memory_free(max_mem_needed);
3979 
3980  m_stats.num_insertion_heap_flushes++;
3981  m_stats.insertion_heap_flush_time.start();
3982 
3983  size_type size = m_heaps_size;
3984  size_type int_memory = 0;
3985  assert(size > 0);
3986  std::vector<std::pair<value_iterator, value_iterator> > sequences(m_num_insertion_heaps);
3987 
3988 #if STXXL_PARALLEL
3989  #pragma omp parallel for
3990 #endif
3991  for (long i = 0; i < m_num_insertion_heaps; ++i)
3992  {
3993  heap_type& insheap = m_proc[i]->insertion_heap;
3994 
3995  std::sort(insheap.begin(), insheap.end(), m_inv_compare);
3996 
3997  if (c_merge_sorted_heaps)
3998  sequences[i] = std::make_pair(insheap.begin(), insheap.end());
3999 
4000  int_memory += insheap.capacity();
4001  }
4002 
4003  if (c_merge_sorted_heaps)
4004  {
4005  m_stats.merge_sorted_heaps_time.start();
4006  std::vector<value_type> merged_array(size);
4007 
4009  sequences.begin(), sequences.end(),
4010  merged_array.begin(), size, m_inv_compare);
4011 
4012  m_stats.merge_sorted_heaps_time.stop();
4013 
4014  add_as_internal_array(merged_array);
4015 
4016  for (int_type i = 0; i < m_num_insertion_heaps; ++i)
4017  {
4018  m_proc[i]->insertion_heap.clear();
4019  m_proc[i]->insertion_heap.reserve(m_insertion_heap_capacity);
4020  }
4021  m_minima.clear_heaps();
4022  }
4023  else
4024  {
4025  for (unsigned i = 0; i < m_num_insertion_heaps; ++i)
4026  {
4027  heap_type& insheap = m_proc[i]->insertion_heap;
4028 
4029  if (insheap.size() == 0) continue;
4030 
4031  add_as_internal_array(insheap);
4032 
4033  // reserve new insertion heap
4034  insheap.reserve(m_insertion_heap_capacity);
4035  }
4036 
4037  m_minima.clear_heaps();
4038  }
4039 
4040  m_heaps_size = 0;
4041 
4042  m_stats.insertion_heap_flush_time.stop();
4043 
4044  check_invariants();
4045  }
4046 
4047  //! Flushes the internal arrays into an external array. The extract buffer is converted into an internal array first, so its items are written out as well. Falls back to merge_external_arrays() if the flush would not release more RAM than the new external array needs.
4049  {
4050  STXXL_DEBUG("Flushing internal arrays" <<
4051  " num_arrays=" << m_internal_arrays.size());
4052 
4053  m_stats.num_internal_array_flushes++;
4054  m_stats.internal_array_flush_time.start();
4055 
4056  m_minima.clear_internal_arrays();
4057 
4058  // also flush extract buffer items out to disk.
4059  convert_eb_into_ia(true); // true = do_not_flush: skips the memory flush inside convert_eb_into_ia()
4060 
4061  // clean up internal arrays that have been deleted in extract_min!
4062  cleanup_internal_arrays();
4063 
4064  size_type num_arrays = m_internal_arrays.size();
4065  size_type size = m_internal_size;
4066  size_type int_memory = 0; // sum of the int_memory() of all internal arrays
4067  std::vector<iterator_pair_type> sequences(num_arrays);
4068 
4069  for (unsigned i = 0; i < num_arrays; ++i)
4070  {
4071  sequences[i] = std::make_pair(m_internal_arrays[i].begin(),
4072  m_internal_arrays[i].end());
4073 
4074  int_memory += m_internal_arrays[i].int_memory();
4075  }
4076 
4077  // must release more RAM in IAs than the EA takes, otherwise: merge
4078  // external and internal arrays!
4079  if (int_memory < external_array_type::int_memory(size)
4080  + ceil(m_num_read_blocks_per_ea) * block_size)
4081  {
4082  return merge_external_arrays();
4083  }
4084 
4085  // construct new external array
4086 
4087  external_array_type ea(size, &m_pool, 0);
4088 
4089  m_stats.max_merge_buffer_size.set_max(size);
4090 
4091  {
4092  external_array_writer_type external_array_writer(ea);
4093 
4095  sequences.begin(), sequences.end(),
4096  external_array_writer.begin(), size, m_inv_compare);
4097  }
4098 
4099  STXXL_DEBUG("Merge done of new ea " << &ea);
4100 
4101  m_external_arrays.swap_back(ea);
4102 
4103  m_internal_size = 0;
4104  m_external_size += size;
4105 
4106  // register EA in min tree
4107  // important for check_external_level()!
4108  m_external_min_tree.activate_without_replay(m_external_arrays.size() - 1);
4109  update_external_min_tree(m_external_arrays.size() - 1);
4110 
4111  // register EA in hint tree
4112  m_hint_tree.activate_without_replay(m_external_arrays.size() - 1);
4113  if (!m_in_bulk_push)
4114  update_hint_tree(m_external_arrays.size() - 1);
4115  // else: done in bulk_push_end() -> rebuild_hint_tree()
4116 
4117  m_internal_arrays.clear();
4118  m_stats.num_new_internal_arrays = 0;
4119  cleanup_internal_arrays();
4120 
4121  // TODO: is this necessary? See cleanup_internal_arrays().
4122  for (size_t i = 0; i < c_max_internal_levels; ++i)
4123  m_internal_levels[i] = 0;
4124 
4125  m_mem_left += int_memory; // credit the RAM of the flushed internal arrays ...
4126  m_mem_left -= m_external_arrays.back().int_memory(); // ... and charge the RAM of the new external array
4127 
4128  m_stats.max_num_external_arrays.set_max(m_external_arrays.size());
4129  m_stats.internal_array_flush_time.stop();
4130 
4131  // update EA level and potentially merge
4132  ++m_external_levels[0];
4133  check_external_level(0);
4134 
4135  resize_read_pool();
4136  // Rebuild hint tree completely as the hint sequence may have changed.
4137  if (!m_in_bulk_push)
4138  rebuild_hint_tree();
4139  else
4140  assert(m_external_arrays.size() - 1 >= m_bulk_first_delayed_external_array);
4141 
4142  check_invariants();
4143  }
4144 
4145  // Compares the largest accessible value of two external arrays. (NOTE(review): operator() below compares get_next_hintable_min() of the arrays addressed through m_indices -- confirm the "largest accessible value" wording.)
4148  const std::vector<unsigned_type>& m_indices;
4150 
4152  const inv_compare_type& compare,
4153  const std::vector<unsigned_type>& indices)
4154  : m_eas(eas), m_indices(indices), m_compare(compare) { }
4155 
4156  bool operator () (const size_t& a, const size_t& b) const // a and b are positions in m_indices, not direct array indices
4157  {
4158  return m_compare(m_eas[m_indices[a]].get_next_hintable_min(),
4159  m_eas[m_indices[b]].get_next_hintable_min());
4160  }
4161  };
4162 
4163  //! Merges external arrays if there are too many external arrays on
4164  //! the same level.
4165  void check_external_level(unsigned_type level, bool force_merge_all = false)
4166  {
4167  if (!force_merge_all)
4168  STXXL_DEBUG("Checking external level " << level);
4169 
4170  // return if EA level is not full
4171  if (m_external_levels[level] < c_max_external_level_size && !force_merge_all)
4172  return;
4173 
4174  unsigned_type level_size = 0;
4175  size_type int_memory = 0;
4176  std::vector<unsigned_type> ea_index;
4177 
4178  for (unsigned_type i = 0; i < m_external_arrays.size(); ++i)
4179  {
4180  if (m_external_arrays[i].level() != level && !force_merge_all) continue;
4181  if (m_external_arrays[i].empty()) continue;
4182 
4183  level_size += m_external_arrays[i].size();
4184  int_memory += m_external_arrays[i].int_memory();
4185  ea_index.push_back(i);
4186  }
4187 
4188  // return if there is not enough RAM for the new array.
4189  // TODO: force_merge_all==true is for freeing memory. Breaking here is not
4190  // helpful in this case. But one should maybe reserve some space in advance.
4191  if (m_mem_left < external_array_type::int_memory(level_size) && !force_merge_all)
4192  return;
4193  m_mem_left -= external_array_type::int_memory(level_size);
4194 
4195  STXXL_ASSERT(force_merge_all || c_max_external_level_size == ea_index.size());
4196  unsigned_type num_arrays_to_merge = ea_index.size();
4197 
4198  STXXL_DEBUG("merging external arrays" <<
4199  " level=" << level <<
4200  " level_size=" << level_size <<
4201  " sequences=" << num_arrays_to_merge <<
4202  " force_merge_all=" << force_merge_all);
4203 
4204  // if force_merge_all: create array in highest level to avoid merging
4205  // of such a large EA.
4206  unsigned_type new_level = force_merge_all ? c_max_external_levels - 1 : level + 1;
4207 
4208  // construct new external array
4209  external_array_type ea(level_size, &m_pool, new_level);
4210  {
4211  external_array_writer_type external_array_writer(ea);
4212  typename external_array_writer_type::iterator out_iter
4213  = external_array_writer.begin();
4214 
4215  // === build minima_tree over the level's arrays ===
4216 
4217  s_min_tree_comparator min_tree_comparator(m_external_arrays,
4218  m_inv_compare, ea_index);
4219 
4220  winner_tree<s_min_tree_comparator> min_tree(num_arrays_to_merge,
4221  min_tree_comparator);
4222 
4223  // =================================================
4224 
4225  int_type num_arrays_done = 0;
4226 
4227  while (num_arrays_to_merge != num_arrays_done)
4228  {
4229  STXXL_DEBUG("num_arrays_done = " << num_arrays_done);
4230 
4231  // === build hints ===
4232 
4233  for (int_type i = 0; i < num_arrays_to_merge; ++i) {
4234  if (m_external_arrays[ea_index[i]].has_unhinted_em_data()) {
4235  min_tree.activate_without_replay(i);
4236  }
4237  else {
4238  min_tree.deactivate_without_replay(i);
4239  }
4240  }
4241 
4242  min_tree.rebuild();
4243 
4244  // === fill available memory with read blocks ===
4245  while (m_mem_left >= block_size) {
4246  block_type* new_block = new block_type();
4247  m_pool.add_prefetch(new_block);
4248  ++m_num_read_blocks;
4249  m_mem_left -= block_size;
4250  }
4251  // ==============================================
4252 
4253  // cleanup hints (all arrays, not only the ones to merge)
4254  for (unsigned_type i = 0; i < m_external_arrays.size(); ++i) {
4255  m_external_arrays[i].rebuild_hints_prepare();
4256  }
4257 
4258  // virtually release all hints
4259  unsigned_type free_prefetch_blocks =
4260  m_pool.free_size_prefetch() + m_num_hinted_blocks;
4261  m_num_hinted_blocks = 0;
4262 
4263  int gmin_index_index; // index in ea_index
4264  while (free_prefetch_blocks > 0 &&
4265  (gmin_index_index = min_tree.top()) >= 0)
4266  {
4267  const unsigned_type gmin_index = ea_index[gmin_index_index];
4268  assert(gmin_index < m_external_arrays.size());
4269 
4270  STXXL_DEBUG0("check_external_level():Give pre-hint in EA[" << gmin_index << "] min " <<
4271  m_external_arrays[gmin_index].get_next_hintable_min());
4272 
4273  m_external_arrays[gmin_index].rebuild_hints_prehint_next_block();
4274  --free_prefetch_blocks;
4275  ++m_num_hinted_blocks;
4276 
4277  if (m_external_arrays[gmin_index].has_unhinted_em_data()) {
4278  min_tree.replay_on_change(gmin_index_index);
4279  }
4280  else {
4281  min_tree.deactivate_player(gmin_index_index);
4282  }
4283  }
4284 
4285  // invalidate all hinted blocks no longer needed
4286  // (all arrays, not only the ones to merge)
4287  for (size_t i = 0; i < m_external_arrays.size(); ++i)
4288  m_external_arrays[i].rebuild_hints_cancel();
4289 
4290  // perform real hinting on pre-hinted blocks
4291  // (all arrays, not only the ones to merge)
4292  for (size_t i = 0; i < m_external_arrays.size(); ++i)
4293  m_external_arrays[i].rebuild_hints_finish();
4294 
4295  assert(free_prefetch_blocks == m_pool.free_size_prefetch());
4296 
4297  // ================================ end build hints ======
4298 
4299  // === wait for data ===
4300  for (size_type i = 0; i < num_arrays_to_merge; ++i) {
4301  const unsigned_type index = ea_index[i];
4302 
4303  unsigned_type used_blocks =
4304  m_external_arrays[index].wait_all_hinted_blocks();
4305 
4306  m_num_hinted_blocks -= used_blocks;
4307  m_num_used_read_blocks += used_blocks;
4308  }
4309  // =====================
4310 
4311  // === build sequences ===
4312  std::vector<iterator_pair_type> sequences(num_arrays_to_merge);
4313  std::vector<size_type> sizes(num_arrays_to_merge);
4314 
4315  gmin_index_index = min_tree.top();
4316  bool needs_limit = (gmin_index_index >= 0) ? true : false;
4317 
4318  for (size_type i = 0; i < num_arrays_to_merge; ++i) {
4319  const unsigned_type index = ea_index[i];
4320  iterator begin = m_external_arrays[index].begin();
4321  iterator end = m_external_arrays[index].end();
4322 
4323  if (needs_limit) {
4324  const unsigned_type gmin_index = ea_index[gmin_index_index];
4325  const value_type& gmin_value =
4326  m_external_arrays[gmin_index].get_next_block_min();
4327 
4328  end = std::lower_bound(begin, end,
4329  gmin_value, m_inv_compare);
4330  }
4331 
4332  sizes[i] = std::distance(begin, end);
4333  sequences[i] = std::make_pair(begin, end);
4334 
4335  STXXL_DEBUG("sequence[" << i << "] ea " <<
4336  begin << " - " << end <<
4337  " size " << sizes[i] <<
4338  (needs_limit ? " with ub limit" : ""));
4339  }
4340  // ==========================================
4341 
4342  // === merge ===
4343 
4344  size_type output_size = std::accumulate(sizes.begin(), sizes.end(), 0);
4345 
4347  sequences.begin(), sequences.end(),
4348  out_iter, output_size, m_inv_compare);
4349 
4350  for (unsigned_type i = 0; i < num_arrays_to_merge; ++i) {
4351  const unsigned_type index = ea_index[i];
4352 
4353  if (!m_external_arrays[index].empty()) {
4354  // remove items and free blocks in RAM.
4355  unsigned_type freed_blocks =
4356  m_external_arrays[index].remove_items(sizes[i]);
4357 
4358  m_num_used_read_blocks -= freed_blocks;
4359 
4360  if (m_external_arrays[index].empty())
4361  ++num_arrays_done;
4362  }
4363  }
4364 
4365  // reset read buffer
4366  resize_read_pool();
4367 
4368  // cannot call clear_external_arrays() here, since it
4369  // corrupts ea_index.
4370  }
4371 
4372  if (m_in_bulk_push)
4373  m_bulk_first_delayed_external_array = 0; // TODO: workaround
4374  } // destroy external_array_writer
4375 
4376  // clean up now empty arrays
4377  cleanup_external_arrays();
4378 
4379  m_external_arrays.swap_back(ea);
4380  ++m_external_levels[new_level];
4381 
4382  // register EA in min tree
4383  m_external_min_tree.activate_without_replay(m_external_arrays.size() - 1);
4384  update_external_min_tree(m_external_arrays.size() - 1);
4385 
4386  // register EA in hint tree
4387  m_hint_tree.activate_without_replay(m_external_arrays.size() - 1);
4388  if (!m_in_bulk_push)
4389  update_hint_tree(m_external_arrays.size() - 1);
4390  // else: done in bulk_push_end() -> rebuild_hint_tree()
4391 
4392  STXXL_DEBUG("Merge done of new ea " << &ea);
4393 
4394  if (!force_merge_all)
4395  check_external_level(level + 1);
4396 
4397  check_invariants();
4398  }
4399 
4400  //! Add new internal array, which requires that values are sorted!
4401  //! automatically decreases m_mem_left! also merges internal arrays if
4402  //! there are too many internal arrays on the same level.
4403  void add_as_internal_array(std::vector<value_type>& values,
4404  unsigned_type used = 0,
4405  unsigned_type level = 0)
4406  {
4407  const size_t size = values.size();
4408  const size_t capacity = values.capacity();
4409  assert(size > used); // at least one element
4410 
4411  internal_array_type new_array(values, used, level);
4412  STXXL_ASSERT(new_array.int_memory() ==
4413  internal_array_type::int_memory(capacity));
4414  m_internal_arrays.swap_back(new_array);
4415 
4416  if (!extract_buffer_empty()) {
4417  m_stats.num_new_internal_arrays++;
4418  m_stats.max_num_new_internal_arrays.set_max(
4419  m_stats.num_new_internal_arrays);
4420  m_minima.add_internal_array(
4421  static_cast<unsigned>(m_internal_arrays.size()) - 1
4422  );
4423  }
4424 
4425  m_internal_size += size - used;
4426  m_mem_left -= internal_array_type::int_memory(capacity);
4427 
4428  STXXL_CHECK(level < c_max_internal_levels &&
4429  "Internal array level is larger than anything possible "
4430  "in this universe. Increase the size of m_internal_levels");
4431 
4432  ++m_internal_levels[level];
4433 
4434  m_stats.max_num_internal_arrays.set_max(m_internal_arrays.size());
4435 
4436  // if IA level is too large ...
4437  if (m_internal_levels[level] < c_max_internal_level_size) return;
4438 
4439  unsigned_type level_size = 0;
4440  size_type int_memory = 0;
4441  std::vector<iterator_pair_type> sequences;
4442  std::vector<unsigned_type> ia_index;
4443 
4444  for (unsigned_type i = 0; i < m_internal_arrays.size(); ++i)
4445  {
4446  if (m_internal_arrays[i].level() != level) continue;
4447  if (m_internal_arrays[i].empty()) continue;
4448 
4449  level_size += m_internal_arrays[i].size();
4450  int_memory += m_internal_arrays[i].int_memory();
4451  sequences.push_back(std::make_pair(m_internal_arrays[i].begin(),
4452  m_internal_arrays[i].end()));
4453  ia_index.push_back(i);
4454  }
4455 
4456  // AND there is enough RAM to merge it (without flushing out to EA).
4457  if (m_mem_left < internal_array_type::int_memory(level_size)) return;
4458 
4459  // must free up more memory than the new array needs.
4460  STXXL_ASSERT(int_memory >= internal_array_type::int_memory(level_size));
4461 
4462  STXXL_DEBUG("merging internal arrays" <<
4463  " level=" << level <<
4464  " level_size=" << level_size <<
4465  " sequences=" << sequences.size());
4466 
4467  std::vector<value_type> merged_array(level_size);
4468 
4470  sequences.begin(), sequences.end(),
4471  merged_array.begin(), level_size, m_inv_compare);
4472 
4473  // release memory of old internal arrays immediately
4474  for (unsigned_type i = 0; i < ia_index.size(); ++i)
4475  {
4476  unsigned_type ia = ia_index[i];
4477  m_internal_arrays[ia].make_empty();
4478  // this is done in cleanup_internal_arrays()...
4479  //if (ia < m_minima.ia_slots())
4480  // m_minima.deactivate_internal_array(ia);
4481  }
4482 
4483  cleanup_internal_arrays();
4484 
4485  // in add_as_internal_array the level_size is re-added!
4486  m_internal_size -= level_size;
4487 
4488  // add as new internal array at next level (and maybe recursively merge)
4489  add_as_internal_array(merged_array, 0, level + 1);
4490  }
4491 
4492  /*!
4493  * Sorts the values from values and writes them into an internal array.
4494  * Don't use the value vector afterwards!
4495  *
4496  * \param values the vector to sort and store
4497  */
4498  void flush_array_internal(std::vector<value_type>& values)
4499  {
4500  potentially_parallel::sort(values.begin(), values.end(), m_inv_compare);
4501 
4502  // flush until enough memory for new array
4503  flush_ia_ea_until_memory_free(
4504  internal_array_type::int_memory(values.size())
4505  );
4506 
4507  add_as_internal_array(values);
4508  }
4509 
4510  //! Struct of all statistical counters and timers. Turn on/off statistics
4511  //! using the stats_counter and stats_timer typedefs.
4512  struct stats_type
4513  {
4514  //! Largest number of elements in the extract buffer at the same time
4516 
4517  //! Sum of the sizes of each extract buffer refill. Used for average
4518  //! size.
4520 
4521  //! Largest number of elements in the merge buffer when running
4522  //! flush_internal_arrays()
4524 
4525  //! Total number of extracts
4527 
4528  //! Number of refill_extract_buffer() calls
4530 
4531  //! Number of flush_insertion_heaps() calls
4533 
4534  //! Number of flush_directly_to_hd() calls
4536 
4537  //! Number of flush_internal_arrays() calls
4539 
4540  //! Number of merge_external_arrays() calls
4542 
4543  //! Largest number of internal arrays at the same time
4545 
4546  //! Largest number of external arrays at the same time
4548 
4549  //! Temporary number of new external arrays at the same time (which
4550  //! were created while the extract buffer hadn't been empty)
4552 
4553  //! Largest number of new external arrays at the same time (which were
4554  //! created while the extract buffer hadn't been empty)
4556 
4557  //! Temporary number of new internal arrays at the same time (which
4558  //! were created while the extract buffer hadn't been empty)
4560 
4561  //! Largest number of new internal arrays at the same time (which were
4562  //! created while the extract buffer hadn't been empty)
4564 
4565  //! Total time for flush_insertion_heaps()
4567 
4568  //! Total time for flush_directly_to_hd()
4570 
4571  //! Total time for flush_internal_arrays()
4573 
4574  //! Total time for merge_external_arrays()
4576 
4577  //! Total time for extract_min()
4579 
4580  //! Total time for refill_extract_buffer()
4582 
4583  //! Total time for the merging in refill_extract_buffer()
4584  //! Part of refill_extract_buffer_time.
4586 
4587  //! Total time for all things before merging in refill_extract_buffer()
4588  //! Part of refill_extract_buffer_time.
4590 
4591  //! Total time for all things after merging in refill_extract_buffer()
4592  //! Part of refill_extract_buffer_time.
4594 
4595  //! Total time of wait() calls in first part of
4596  //! refill_extract_buffer(). Part of refill_time_before_merge and
4597  //! refill_extract_buffer_time.
4599 
4600  //! Total time for pop_heap() in extract_min().
4601  //! Part of extract_min_time.
4603 
4604  //! Total time for merging the sorted heaps.
4605  //! Part of flush_insertion_heaps.
4607 
4608  //! Total time for std::lower_bound calls in refill_extract_buffer()
4609  //! Part of refill_extract_buffer_time and refill_time_before_merge.
4610  // stats_timer refill_lower_bound_time;
4611 
4612  //! Total time for std::accumulate calls in refill_extract_buffer()
4613  //! Part of refill_extract_buffer_time and refill_time_before_merge.
4615 
4616  //! Total time for determining the smallest max value in refill_extract_buffer()
4617  //! Part of refill_extract_buffer_time and refill_time_before_merge.
4619 
4621 
4622  friend std::ostream& operator << (std::ostream& os, const stats_type& o)
4623  {
4624  return os << "max_extract_buffer_size=" << o.max_extract_buffer_size.as_memory_amount(sizeof(value_type)) << std::endl
4625  << "total_extract_buffer_size=" << o.total_extract_buffer_size.as_memory_amount(sizeof(value_type)) << std::endl
4626  << "max_merge_buffer_size=" << o.max_merge_buffer_size.as_memory_amount(sizeof(value_type)) << std::endl
4627  << "num_extracts=" << o.num_extracts << std::endl
4628  << "num_extract_buffer_refills=" << o.num_extract_buffer_refills << std::endl
4629  << "num_insertion_heap_flushes=" << o.num_insertion_heap_flushes << std::endl
4630  << "num_direct_flushes=" << o.num_direct_flushes << std::endl
4631  << "num_internal_array_flushes=" << o.num_internal_array_flushes << std::endl
4632  << "num_external_array_merges=" << o.num_external_array_merges << std::endl
4633  << "max_num_internal_arrays=" << o.max_num_internal_arrays << std::endl
4634  << "max_num_external_arrays=" << o.max_num_external_arrays << std::endl
4635  << "num_new_external_arrays=" << o.num_new_external_arrays << std::endl
4636  << "max_num_new_external_arrays=" << o.max_num_new_external_arrays << std::endl
4637  << "num_new_internal_arrays=" << o.num_new_internal_arrays << std::endl
4638  << "max_num_new_internal_arrays=" << o.max_num_new_internal_arrays << std::endl
4639  << "insertion_heap_flush_time=" << o.insertion_heap_flush_time << std::endl
4640  << "direct_flush_time=" << o.direct_flush_time << std::endl
4641  << "internal_array_flush_time=" << o.internal_array_flush_time << std::endl
4642  << "external_array_merge_time=" << o.external_array_merge_time << std::endl
4643  << "extract_min_time=" << o.extract_min_time << std::endl
4644  << "refill_extract_buffer_time=" << o.refill_extract_buffer_time << std::endl
4645  << "refill_merge_time=" << o.refill_merge_time << std::endl
4646  << "refill_time_before_merge=" << o.refill_time_before_merge << std::endl
4647  << "refill_time_after_merge=" << o.refill_time_after_merge << std::endl
4648  << "refill_wait_time=" << o.refill_wait_time << std::endl
4649  << "pop_heap_time=" << o.pop_heap_time << std::endl
4650  << "merge_sorted_heaps_time=" << o.merge_sorted_heaps_time << std::endl
4651  // << "refill_lower_bound_time=" << o.refill_lower_bound_time << std::endl
4652  << "refill_accumulate_time=" << o.refill_accumulate_time << std::endl
4653  << "refill_minmax_time=" << o.refill_minmax_time << std::endl
4654  << "hint_time=" << o.hint_time << std::endl;
4655  }
4656  };
4657 
4659 };
4660 
4661 // For C++98 compatibility:
4662 template <
4663  class ValueType,
4664  class CompareType,
4665  class AllocStrategy,
4666  uint64 BlockSize,
4667  uint64 DefaultMemSize,
4668  uint64 MaxItems
4669  >
4670 const double parallel_priority_queue<ValueType, CompareType, AllocStrategy, BlockSize,
4671  DefaultMemSize, MaxItems>::c_default_extract_buffer_ram_part = 0.05;
4672 
4674 
4675 #endif // !STXXL_CONTAINERS_PARALLEL_PRIORITY_QUEUE_HEADER
bool empty() const
Returns true if the array is empty.
const float m_num_read_blocks_per_ea
Number of read/prefetch blocks per external array.
void add_prefetch(block_type *&block)
Add block to prefetch pool.
size_type size() const
The number of elements in the queue.
const long m_num_insertion_heaps
Number of insertion heaps. Usually equal to the number of CPUs.
void bulk_push_end()
Ends a sequence of push operations.
parent_type::proc_vector_type proc_vector_type
void flush_insertion_heap(unsigned_type p)
Flushes the insertions heap p into an internal array.
#define STXXL_ASSERT(condition)
Definition: verbose.h:220
stats_counter num_extracts
Total number of extracts.
parent_type::inv_compare_type compare_type
size_t num_used_blocks() const
Returns the number of blocks loaded in RAM.
compat::remove_const< Integral >::type div_ceil(Integral n, Integral2 d)
Definition: utils.h:199
#define STXXL_DEFAULT_BLOCK_SIZE(type)
unsigned_type m_bulk_first_delayed_external_array
First index in m_external_arrays that was not re-hinted during a bulk_push sequence.
void swap(internal_array &o)
Swap internal_array with another one.
internal_size_type m_current
pointer to the current element inside the referenced block. undefined while m_live is false...
void rebuild_hints_prehint_next_block()
Advance m_unhinted_block index without actually prefetching.
stats_counter num_new_internal_arrays
Temporary number of new internal arrays at the same time (which were created while the extract buffer...
parent_type & m_parent
The priority queue.
void rebuild_hint_tree()
Rebuild hint tree completely as the hint sequence may have changed, and re-hint the correct block seq...
unsigned_type get_end_block_index() const
Returns the block beyond the block in which *(m_end_index-1) is located.
void flush_insertion_heaps()
Flushes all insertions heaps into an internal array.
head_comp(self_type &parent, proc_vector_type &proc, ias_type &ias, const compare_type &compare)
external_size_type get_index() const
return the current absolute index inside the external array.
bool extract_buffer_empty() const
Returns if the extract buffer is empty.
unsigned_type wait_all_hinted_blocks()
Waits until all hinted blocks are read into RAM. Returns how many blocks were successfully read...
#define LIKELY(c)
Definition: utils.h:219
size_type m_extract_buffer_index
Index of the currently smallest element in the extract buffer.
friend std::ostream & operator<<(std::ostream &os, const uint_pair &a)
make a uint_pair outputtable via iostreams, using unsigned long long.
Definition: uint_types.h:228
external_size_type m_capacity
The total size of the external array in items. Cannot be changed after construction.
bool is_heap(RandomAccessIterator first, RandomAccessIterator last, StrictWeakOrdering comp=std::less< typename std::iterator_traits< RandomAccessIterator >::value_type >())
Definition: is_heap.h:23
Struct of all statistical counters and timers. Turn on/off statistics using the stats_counter and sta...
stats_counter total_extract_buffer_size
Sum of the sizes of each extract buffer refill. Used for average size.
bool valid() const
Returns if the data requested to be in internal memory is completely fetched. True if wait() has been...
stats_counter num_new_external_arrays
Temporary number of new external arrays at the same time (which were created while the extract buffer...
unsigned_type m_num_read_blocks
Total number of read/prefetch buffer blocks.
stats_timer direct_flush_time
Total time for flush_directly_to_hd()
void bulk_push_begin(size_type bulk_size)
Start a sequence of push operations.
std::pair< iterator, iterator > iterator_pair_type
void rebuild_hints_finish()
Perform real-hinting of pre-hinted blocks, since now canceled blocks are available.
size_type m_internal_size
Number of elements in the internal arrays.
Block manager class.
Definition: block_manager.h:61
size_type m_extract_buffer_limit
Maximum size of extract buffer in number of elements Only relevant if c_limit_extract_buffer==true.
void print_stats() const
Prints statistical data.
size_t size() const
Returns the current size of the array.
stats_counter num_internal_array_flushes
Number of flush_internal_arrays() calls.
external_array(external_size_type size, pool_type *pool, unsigned_type level=0)
Constructs an external array.
bool has_em_data() const
Returns if there is data in EM that's not randomly accessible.
void sort(ExtIterator first, ExtIterator last, StrictWeakOrdering cmp, unsigned_type M)
Sort records comparison-based, see stxxl::sort -- Sorting Comparison-Based.
Definition: sort.h:676
iterator()
default constructor (should not be used directly)
bool bulk_pop_limit(std::vector< value_type > &out, const value_type &limit, size_t max_size=std::numeric_limits< size_t >::max())
Extracts all elements which are greater or equal to a given limit.
writer_type * m_writer
pointer to the external array containing the elements
unsigned_type remove_items(size_t n)
Removes the first n elements from the array. Returns the number of blocks released into the block poo...
unsigned long long int uint64
Definition: types.h:39
std::vector< value_type > m_extract_buffer
The extract buffer where external (and internal) arrays are merged into for extracting.
void deactivate_player(unsigned_type index)
deactivate a player and replay.
Definition: winner_tree.h:125
void update_block_pointers(size_t block_index)
Updates the m_block_pointers vector. Should be called after any steal() or read() operation...
random_number32_r m_rng
Random number generator for randomly selecting a heap in sequential push()
bool m_is_very_large_bulk
If the bulk currently being inserted is very large, this boolean is set and bulk_push just accumulate...
const value_type & get_value(int index) const
size_t calculate_merge_sequences(std::vector< size_type > &sizes, std::vector< iterator_pair_type > &sequences, bool reuse_previous_lower_bounds=false)
Calculates the sequences vector needed by the multiway merger, considering inaccessible data from ext...
unsigned_type c_max_internal_level_size
currently global public tuning parameter:
void flush_insertion_heaps_with_limit(const value_type &limit)
Flushes all elements of the insertion heaps which are greater or equal to a given limit...
bool empty() const
Returns if the queue is empty.
void convert_eb_into_ia(bool do_not_flush=false)
Convert extract buffer into a new internal array.
size_t num_hinted_blocks() const
Returns the number of hinted blocks.
unsigned_type m_level
Level of internal array (Sanders' PQ: group number)
ppq_local::minima_tree< parallel_priority_queue< value_type, compare_type, alloc_strategy, block_size, DefaultMemSize, MaxItems > > minima_type
type of minima tree combining the structures
void flush_internal_arrays()
Flushes the internal arrays into an external array.
stats_timer pop_heap_time
Total time for pop_heap() in extract_min(). Part of extract_min_time.
block_type * m_block
pointer to the referenced block. undefined while m_live is false.
void flush_array_internal(std::vector< value_type > &values)
Sorts the values from values and writes them into an internal array.
The class winner_tree is a binary tournament tree.
Definition: winner_tree.h:42
bool m_in_bulk_push
Flag if inside a bulk_push sequence.
Compares the largest accessible value of two external arrays.
RandomAccessIterator3 multiway_merge(RandomAccessIteratorPairIterator seqs_begin, RandomAccessIteratorPairIterator seqs_end, RandomAccessIterator3 target, DiffType length, Comparator comp)
Multi-way merging dispatcher.
Definition: parallel.h:144
minima_vector m_minima
stores the minimum value of each block
ForwardIterator swap_remove_if(ForwardIterator first, ForwardIterator last, UnaryPredicate pred)
Transforms the range [first,last) into a range with all the elements for which pred returns true remo...
Definition: swap_vector.h:267
stats_timer refill_extract_buffer_time
Total time for refill_extract_buffer()
winner_tree< hint_comparator > m_hint_tree
Tracks the largest values of the block hinted the latest of the external arrays if there is unaccessi...
void make_live()
allocates a reference to the block's data (possibly reading it from EM).
proc_vector_type m_proc
Array of processor local data structures, including the insertion heaps.
friend void swap(internal_array &a, internal_array &b)
Swap internal_array with another one.
std::vector< unsigned int > m_ref_count
reference counters for the number of live iterators on the corresponding block in external_array...
head_comp m_head_comp
Comparator instances.
size_t get_max_index() const
The index of the largest element in the array.
bool has_unhinted_em_data() const
Returns if there is data in EM that's not already hinted to the prefetcher.
internal_size_type m_block_index
index of the current element's block in the external array's block list. undefined while m_live is fa...
#define STXXL_DEFAULT_ALLOC_STRATEGY
Definition: block_alloc.h:258
void replay_on_change(unsigned_type index, bool done=false)
Definition: winner_tree.h:210
static uint_pair min()
return an uint_pair instance containing the smallest value possible
Definition: uint_types.h:234
void update()
updates m_block_index and m_current based on m_index
#define STXXL_PARALLEL
Definition: config.h:83
High-performance smart pointer used as a wrapping reference counting pointer.
Definition: counting_ptr.h:50
stats_timer merge_sorted_heaps_time
Total time for merging the sorted heaps. Part of flush_insertion_heaps.
bool m_limit_extract
flag if inside a bulk limit extract session
inv_compare_type m_inv_compare
>-Comparator for value_type
ppq_local::internal_array< value_type > internal_array_type
bool m_live
when operator* or operator-> are called, then the iterator goes live and allocates a reference to the ...
void rebuild_hints_prepare()
This method prepares rebuilding the hints (this is done after creating a new EA in order to always ha...
stats_timer refill_wait_time
Total time of wait() calls in first part of refill_extract_buffer(). Part of refill_time_before_merge...
std::vector< value_type > m_aggregated_pushes
The aggregated pushes. They cannot be extracted yet.
#define STXXL_MEMDUMP(x)
Definition: verbose.h:88
request_ptr read(block_type *&block, bid_type bid)
Reads block.
stats_counter max_extract_buffer_size
Largest number of elements in the extract buffer at the same time.
stats_timer external_array_merge_time
Total time for merge_external_arrays()
unsigned_type m_level
Level of external array (Sanders' PQ: group number)
size_t size() const
Returns the current size in items.
Class fake_timer is a drop-in replacement for timer, which does nothing.
Definition: timer.h:170
minima_type m_minima
The winner tree containing the smallest values of all sources where the globally smallest element cou...
unsigned_type level() const
Returns the level (group number) of the array.
ea_type & m_ea
reference to the external array to be written
read_write_pool< block_type > pool_type
void deactivate_heap(unsigned index)
Remove an insertion heap from the minima tree.
iterator(writer_type *writer, external_size_type index)
construct a new iterator
mutex m_mutex
mutex for reference counting array (this is actually nicer than OpenMP's critical) ...
uint_pair & operator++()
prefix increment operator (directly manipulates the integer parts)
Definition: uint_types.h:163
std::random_access_iterator_tag iterator_category
External array stores a sorted sequence of values on the hard disk and allows access to the first blo...
static size_t int_memory(size_t capacity)
Returns memory usage of EA with given capacity, excluding blocks loaded in RAM. Blocks belong to pref...
size_type insertion_heap_int_memory() const
Return size of insertion heap reservation in bytes.
std::vector< iterator > m_live_boundary
optimization: hold live iterators for the expected boundary blocks of multiway_merge().
bid_vector m_bids
The IDs of each block in external memory.
void wait_next_ea_blocks(unsigned_type ea_index)
Requests more EM data from a given EA and updates the winner trees and hints accordingly.
void cleanup_external_arrays()
Clean up empty external arrays, free their memory and capacity.
std::vector< ProcessorData * > proc_vector_type
size_t int_memory() const
Return the amount of internal memory used by the EA.
stats_timer refill_merge_time
Total time for the merging in refill_extract_buffer() Part of refill_extract_buffer_time.
#define STXXL_CHECK_EQUAL(a, b)
STXXL_CHECK_EQUAL(a,b) is an assertion macro for unit tests, similar to STXXL_CHECK(a==b). The difference is that STXXL_CHECK_EQUAL(a,b) also prints the values of a and b. Attention: a and b must be printable with std::cout!
Definition: verbose.h:192
unsigned_type m_old_unhinted_block
The first unhinted block index as it was before the prepare_rebuilding_hints() call. Used for removal of hints which aren't needed anymore.
void cleanup_internal_arrays()
Clean up empty internal arrays, free their memory and capacity.
unsigned_type top() const
Returns the winner. Remember running replay_on_pop() if the value of the winner changes.
Definition: winner_tree.h:176
friend void swap(external_array &a, external_array &b)
Swap external_array with another one.
ppq_local::external_array< value_type, block_size, AllocStrategy > external_array_type
size_t get_min_index() const
The index of the currently smallest element in the array.
stats_counter max_num_new_internal_arrays
Largest number of new internal arrays at the same time (which were created while the extract buffer h...
#define STXXL_CHECK(condition)
STXXL_CHECK is an assertion macro for unit tests, which contrarily to assert() also works in release ...
Definition: verbose.h:170
external_size_type m_index
index of the current element, absolute in the external array
external_size_type m_index
The read position in the array.
size_t int_memory() const
Return the amount of internal memory used by the array.
external_array_writer< self_type > writer_type
std::vector< block_type * > block_vector
request_vector m_requests
The read request pointers are used to wait until the block has been completely fetched.
uint_pair & operator+=(const uint_pair &b)
addition operator (uses 64-bit arithmetic)
Definition: uint_types.h:183
block_vector m_blocks
A vector of size m_num_blocks with block_type pointers, some of them will be filled while writing...
pool_type m_pool
Prefetch and write buffer pool for external arrays (has to be in front of m_external_arrays) ...
size_type m_external_size
Number of elements in the external arrays.
ppq_iterator()
default constructor (should not be used directly)
void merge_external_arrays()
Merges all external arrays and all internal arrays into one external array.
Block containing elements of fixed length.
Definition: typed_block.h:237
counting_ptr< request > request_ptr
A reference counting pointer for request.
Definition: request.h:113
stats_timer internal_array_flush_time
Total time for flush_internal_arrays()
unsigned_type m_min_index
Index of the current head.
void activate_without_replay(unsigned_type index)
activate a player and resize if necessary
Definition: winner_tree.h:108
external_array< value_type, BlockSize, AllocStrategy > self_type
size_t buffer_size() const
Returns the number elements available in internal memory.
void add_as_internal_array(std::vector< value_type > &values, unsigned_type used=0, unsigned_type level=0)
Add new internal array, which requires that values are sorted! automatically decreases m_mem_left! al...
std::vector< request_ptr > request_vector
void refill_extract_buffer(size_t minimum_size=0, size_t maximum_size=0)
Refills the extract buffer from the external arrays.
void update_hint_tree(size_t ea_index)
Updates the prefetch prediction tree after a remove_items(), which frees up blocks.
bool operator!=(const uint_pair &b) const
inequality checking operator
Definition: uint_types.h:198
iterator begin() const
Returns a random-access iterator to the begin of the data in internal memory.
void write_block(size_t block_index)
Called by the external_array_writer to write a block from m_blocks[] to disk. Prior to writing and re...
iterator::block_pointers_type block_pointers_type
bool operator<=(const uint_pair &b) const
less-or-equal comparison operator
Definition: uint_types.h:210
stats_counter max_num_external_arrays
Largest number of external arrays at the same time.
void update_external_min_tree(size_t ea_index)
Updates the external min tree after a remove() or a wait_next_blocks() call.
void limit_begin(const value_type &limit, size_type bulk_size)
Begin bulk-limit extraction session with limit element.
void make_empty()
Make this array empty.
void bulk_pop(std::vector< value_type > &out, size_t max_size)
Extract up to max_size values at once.
void hint_next_block()
Prefetch the next unhinted block, requires one free read block from the global pool.
choose_int_types< my_pointer_size >::int_type int_type
Definition: types.h:63
static const double c_default_extract_buffer_ram_part
Default limit of the extract buffer ram consumption as share of total ram.
std::vector< BID< BlockSize > > bid_vector
Compares the largest value of the block hinted the latest of two external arrays. ...
void read_block(size_t block_index)
Called by the external_array_writer to read a block from disk into m_blocks[]. If the block is marked...
void add_internal_array(unsigned index)
Add a newly created internal array to the minima tree.
stats_timer insertion_heap_flush_time
Total time for flush_insertion_heaps()
bool operator>(const uint_pair &b) const
greater comparison operator
Definition: uint_types.h:216
unsigned_type m_num_hinted_blocks
number of currently hinted prefetch blocks
stats_timer refill_accumulate_time
Total time for std::accumulate calls in refill_extract_buffer() Part of refill_extract_buffer_time a...
const value_type & get_next_hintable_min() const
Returns the smallest element of the next hint candidate (the block after the last hinted one)...
iterator end() const
Returns a random-access iterator 1 behind the end of the data in internal memory. ...
stats_counter max_merge_buffer_size
Largest number of elements in the merge buffer when running flush_internal_arrays() ...
iterator end() const
End iterator.
std::pair< unsigned, unsigned > top()
Return smallest items of head winner tree.
unsigned_type wait_next_blocks()
Waits until the next prefetched block is read into RAM, then polls for any further blocks that are do...
void inc_min(size_t diff=1)
Use inc_min(diff) if multiple values have been extracted.
void rebuild_hints_cancel()
Cancel hints which aren't needed anymore from the prefetcher and fix its size. prepare_rebuilding_...
stats_counter num_direct_flushes
Number of flush_directly_to_hd() calls.
stats_counter max_num_new_external_arrays
Largest number of new external arrays at the same time (which were created while the extract buffer h...
unsigned_type internal_size_type
Definition: types.h:66
#define UNLIKELY(c)
Definition: utils.h:225
heap_type insertion_heap
The heaps where new elements are usually inserted into.
const value_type & get_next_block_min() const
Returns the smallest element of the first block NOT in internal memory (or at least requested to be i...
block_pointers_type m_block_pointers
Begin and end pointers for each block, used for merging with ppq_iterator.
A struct containing the local insertion heap and other information local to a processor.
#define STXXL_MOVE(T)
request_ptr write(block_type *&block, bid_type bid)
Passes a block to the pool for writing.
fake_timer stats_timer
Defines if statistics are gathered: fake_timer or timer.
std::string as_memory_amount(const value_type &) const
Definition: custom_stats.h:138
void clear_heaps()
Remove all insertion heaps from the minima tree.
stats_timer extract_min_time
Total time for extract_min()
size_t capacity() const
Returns the capacity in items.
bool operator>=(const uint_pair &b) const
greater-or-equal comparison operator
Definition: uint_types.h:222
minima_tree(parent_type &parent)
Construct the tree of minima sources.
size_t ia_slots() const
Return size of internal arrays minima tree.
void clear_internal_arrays()
Remove all internal arrays from the minima tree.
void limit_pop()
Remove the minimum element; only works correctly while elements < L.
void flush_ia_ea_until_memory_free(internal_size_type mem_free)
Free up memory by flushing internal arrays and combining external arrays until enough bytes are free...
bool operator<(const uint_pair &b) const
less-than comparison operator
Definition: uint_types.h:204
std::vector< value_type > m_values
Contains the items of the sorted sequence.
Acquire a lock that's valid until the end of scope.
Definition: mutex.h:123
internal_arrays_type m_internal_arrays
The sorted arrays in internal memory.
#define STXXL_BEGIN_NAMESPACE
Definition: namespace.h:16
std::vector< std::pair< pointer, pointer > > block_pointers_type
void update_heap(int_type index)
Update minima tree after an item from the heap index was removed.
block_type * steal_prefetch()
Take out a block from the prefetch pool, one unhinted free block must be available.
A random-access iterator class for block oriented data.
void update_extract_buffer()
Update minima tree after an item of the extract buffer was removed.
uint_pair & operator--()
prefix decrement operator (directly manipulates the integer parts)
Definition: uint_types.h:173
Parallelized External Memory Priority Queue.
static size_t int_memory(size_t capacity)
Return the amount of internal memory used by an array with the capacity in number of items...
unsigned_type m_num_used_read_blocks
number of currently loaded blocks
static unsigned_type push_heap(RandomAccessIterator first, RandomAccessIterator last, HeapCompareType comp)
SiftUp a new element from the last position in the heap, reestablishing the heap invariant.
void bulk_push(const value_type &element, const unsigned_type p)
Push an element inside a sequence of pushes.
size_t num_blocks() const
Return the number of blocks.
unsigned_type m_unhinted_block
The first unhinted block index.
internal_array_type::iterator iterator
internal_array(std::vector< value_type > &values, unsigned_type min_index=0, unsigned_type level=0)
Constructor which takes a value vector. The value vector is empty afterwards.
Fast uniform [0, 2^32) pseudo-random generator with period 2^32, random bits: 32. Reentrant variant o...
Definition: rand.h:74
unsigned_type level() const
Returns the level (group number) of the array.
minima_tree< ParentType > self_type
#define STXXL_DEBUG(x)
STXXL_DEBUG is a macro which prints iff the locally defined variable "debug" is true. Use this for scope-based debug variables.
Definition: verbose.h:156
void rebuild()
Build the winner tree from scratch.
Definition: winner_tree.h:309
static uint_pair max()
return an uint_pair instance containing the largest value possible
Definition: uint_types.h:241
parent_type::external_arrays_type eas_type
winner_tree< external_min_comparator > m_external_min_tree
Tracks the largest accessible values of the external arrays if there is inaccessible data in EM...
block_type * steal()
Take out a block from the pool.
parent_type::internal_arrays_type ias_type
winner_tree< head_comp > m_head
The winner trees.
const value_type & top()
Access the minimum element.
#define STXXL_VARDUMP(x)
Definition: verbose.h:81
pointer m_current
pointer to the current element
stats_timer refill_minmax_time
Total time for determining the smallest max value in refill_extract_buffer(). Part of refill_extract_b...
external_min_comparator(const external_arrays_type &eas, const inv_compare_type &compare)
void aggregate_push(const value_type &element)
Aggregate pushes.
read_write_pool< block_type > pool_type
typed_block< block_size, value_type > block_type
void prepare_write(unsigned_type num_threads)
prepare the external_array for writing using multiway_merge() with num_threads. this method is called...
stats_timer refill_time_before_merge
Total time for all things before merging in refill_extract_buffer(). Part of refill_extract_buffer_tim...
parallel_priority_queue(const compare_type &compare=compare_type(), size_type total_ram=DefaultMemSize, float num_read_blocks_per_ea=1.5f, unsigned_type num_write_buffer_blocks=c_num_write_buffer_blocks, unsigned_type num_insertion_heaps=0, size_type single_heap_ram=c_default_single_heap_ram, size_type extract_buffer_ram=0)
Constructor.
external_array()
Default constructor. Don't use this directly. Needed for regrowing in surrounding vector.
const size_type m_mem_for_heaps
Size of all insertion heaps together in bytes.
void new_blocks(const DiskAssignFunctor &functor, BIDIteratorClass bidbegin, BIDIteratorClass bidend, unsigned_type offset=0)
Allocates new blocks.
size_type memory_consumption() const
The memory consumption in Bytes.
external_arrays_type m_external_arrays
The sorted arrays in external memory.
stxxl::swap_vector< internal_array_type > internal_arrays_type
type of internal arrays vector
size_type heap_add_size
The number of items inserted into the insheap during bulk parallel access.
void deactivate_internal_array(unsigned index)
Remove an internal array from the minima tree.
size_t m_index
index of the current element
#define STXXL_ERRMSG(x)
Definition: verbose.h:94
pool_type * m_pool
Common prefetch and write buffer pool.
void advance_arrays(std::vector< iterator_pair_type > &sequences, std::vector< size_type > &sizes, size_t eas, size_t ias)
typed_block< BlockSize, value_type > block_type
void limit_push(const value_type &element, const unsigned_type p=0)
Push new item >= bulk-limit element into insertion heap p.
std::string to_string() const
Returns a readable representation of the winner tree as string.
const compare_type & m_compare
value_type comparator
stats_counter num_insertion_heap_flushes
Number of flush_insertion_heaps() calls.
const value_type & get_value(int input) const
bool m_limit_has_full_range
flag if the extract buffer contains the full limit range
stats_timer refill_time_after_merge
Total time for all things after merging in refill_extract_buffer(). Part of refill_extract_buffer_time...
internal_array()
Default constructor. Don't use this directly. Needed for regrowing in surrounding vector.
void resize_read_pool()
Automatically resize the read/prefetch buffer pool depending on number of external arrays...
uint64 external_size_type
Definition: types.h:67
const value_type & get_min()
Returns the smallest element in the array.
choose_int_types< my_pointer_size >::unsigned_type unsigned_type
Definition: types.h:64
size_t m_block_index
index of the current element's block
Internal arrays store a sorted sequence of values in RAM, which will be merged together into the dele...
size_t m_block_items
size of each data block
dummy_custom_stats_counter< uint64 > stats_counter
Defines if statistics are gathered: dummy_custom_stats_counter or custom_stats_counter.
void bulk_push(const value_type &element)
Push an element inside a bulk sequence of pushes.
#define STXXL_DEBUG0(x)
STXXL_DEBUG0 is a macro which never prints the debug message, used to temporarily disable STXXL_DEBUG...
Definition: verbose.h:160
std::vector< BID< block_size > > bid_vector
std::vector< value_type > minima_vector
stats_counter max_num_internal_arrays
Largest number of internal arrays at the same time.
void check_invariants() const
Assert many invariants of the data structures.
compare_type m_compare
Comparator for value_type
#define STXXL_MSG(x)
Definition: verbose.h:73
Unary operator which returns true if the external array has run empty.
size_type m_mem_left
Free memory in bytes.
void finish_write()
finish the writing phase after multiway_merge() filled the vector. this method is called by the exter...
void deactivate_extract_buffer()
Remove the extract buffer from the minima tree.
unsigned_type c_max_external_level_size
currently global public tuning parameter:
void limit_end()
Finish bulk-limit extraction session.
ia_comp(ias_type &ias, const compare_type &compare)
iterator(const iterator &other)
copy an iterator, the new iterator is not automatically live!
block_type * get_block_ref(size_t block_index)
read block into memory and increase reference count (called when an iterator goes live on the block)...
void update_internal_array(unsigned index)
Update minima tree after an item from an internal array was removed.
heaps_comp(proc_vector_type &proc, const compare_type &compare)
Comparator for the insertion heaps winner tree.
The minima_tree contains minima from all sources inside the PPQ.
WinnerTree-Comparator for the head winner tree. It accesses all relevant data structures from the pri...
bool empty() const
Returns if the array has run empty.
const block_pointers_type * m_block_pointers
pointer to a vector of begin/end pointer pairs. They allow access to the data blocks.
iterator begin() const
Begin iterator.
value_type m_limit_element
current limit element
void print_stats() const
Print statistics.
s_min_tree_comparator(const external_arrays_type &eas, const inv_compare_type &compare, const std::vector< unsigned_type > &indices)
const unsigned m_insertion_heap_capacity
Capacity of one insertion heap.
void swap(external_array &o)
Swap external_array with another one.
unsigned int m_ref_total
total number of iterators referencing this writer
const value_type & limit_top()
Access the minimum element, which can only be in the extract buffer.
const value_type & get_min() const
The currently smallest element in the array.
size_type m_heaps_size
Number of elements in the insertion heaps.
stats_counter num_external_array_merges
Number of merge_external_arrays() calls.
hint_comparator(const external_arrays_type &eas, const inv_compare_type &compare)
external_size_type m_end_index
The index behind the last element that is located in RAM (or is at least requested to be so) ...
std::vector< value_type > heap_type
type of insertion heap itself
Comparator for the internal arrays winner tree.
stats_counter num_extract_buffer_refills
Number of refill_extract_buffer() calls.
static void prepare_write_pool(pool_type &pool, unsigned_type num_threads)
prepare the pool for writing external arrays with given number of threads
external_size_type m_size
The total number of elements minus the number of extracted values.
void hint_external_arrays()
Hints EA blocks which will be needed soon. Hints at most m_num_prefetchers blocks globally...
size_t capacity() const
Returns the initial size of the array.
size_type m_extract_buffer_size
Number of elements in the extract buffer.
void push(const value_type &element, unsigned_type p=0)
Insert new element.
bool block_valid(size_t block_index) const
Returns if the block with the given index is completely fetched.
bool m_write_phase
Is array in write phase? True = write phase, false = read phase.
void resize_write(size_type new_size)
Resizes size of the pool.
Unary operator which returns true if the internal array has run empty.
block_pointers_type m_block_pointers
Begin and end pointers of the array. This is used by the iterator.
size_type size_write() const
Returns number of blocks owned by the write_pool.
void pop()
Remove the minimum element.
void free_block_ref(size_t block_index)
decrease reference count on the block, and possibly write it to disk (called when an iterator release...
unsigned_type m_num_blocks
Number of blocks, again: calculated at construction time.
external_array_writer(ea_type &ea, unsigned int num_threads=0)
void deactivate_without_replay(unsigned_type index)
deactivate a player
Definition: winner_tree.h:117
external_array_type::writer_type external_array_writer_type
bool operator==(const uint_pair &b) const
equality checking operator
Definition: uint_types.h:192
ppq_iterator(const block_pointers_type *block_pointers, size_t block_items, size_t index)
constructor
void check_external_level(unsigned_type level, bool force_merge_all=false)
Merges external arrays if there are too many external arrays on the same level.
#define STXXL_END_NAMESPACE
Definition: namespace.h:17
size_t get_index() const
returns the value's index in the internal or external array
const size_type m_mem_total
Total amount of internal memory.
unsigned_type get_current_block_index() const
Returns the block in which m_index is located.
std::vector< value_type >::iterator value_iterator
stxxl::swap_vector< external_array_type > external_arrays_type
type of external arrays vector
iterator::block_pointers_type block_pointers_type