STXXL  1.4-dev
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
priority_queue.h
Go to the documentation of this file.
1 /***************************************************************************
2  * include/stxxl/bits/containers/priority_queue.h
3  *
4  * Implements a data structure from "Peter Sanders. Fast Priority Queues for
5  * Cached Memory. ALENEX'99" for external memory.
6  *
7  * Part of the STXXL. See http://stxxl.sourceforge.net
8  *
9  * Copyright (C) 1999 Peter Sanders <[email protected]>
10  * Copyright (C) 2003, 2004, 2007 Roman Dementiev <[email protected]>
11  * Copyright (C) 2007-2009 Johannes Singler <[email protected]>
12  * Copyright (C) 2007-2010 Andreas Beckmann <[email protected]>
13  * Copyright (C) 2014 Timo Bingmann <[email protected]>
14  *
15  * Distributed under the Boost Software License, Version 1.0.
16  * (See accompanying file LICENSE_1_0.txt or copy at
17  * http://www.boost.org/LICENSE_1_0.txt)
18  **************************************************************************/
19 
20 #ifndef STXXL_CONTAINERS_PRIORITY_QUEUE_HEADER
21 #define STXXL_CONTAINERS_PRIORITY_QUEUE_HEADER
22 
27 
29 
30 /*
31  KNBufferSize1 = 32;
32  KNN = 512; // length of group 1 sequences
33  KNKMAX = 64; // maximal arity
34  LogKNKMAX = 6; // ceil(log KNKMAX)
35  KNLevels = 4; // overall capacity >= KNN*KNKMAX^KNLevels
36  */
37 
38 // internal memory consumption >= N_*(KMAX_^IntLevels_) + ext
39 
40 template <
41  class ValueType,
42  class CompareType,
43  unsigned BufferSize1_ = 32, // equalize procedure call overheads etc.
44  unsigned N_ = 512, // length of group 1 sequences
45  unsigned IntKMAX_ = 64, // maximal arity for internal mergers
46  unsigned IntLevels_ = 4, // number of internal groups
47  unsigned BlockSize_ = (2* 1024* 1024), // external block size
48  unsigned ExtKMAX_ = 64, // maximal arity for external mergers
49  unsigned ExtLevels_ = 2, // number of external groups
50  class AllocStr_ = STXXL_DEFAULT_ALLOC_STRATEGY
51  >
53 {
54  typedef ValueType value_type;
55  typedef CompareType comparator_type;
56  typedef AllocStr_ alloc_strategy_type;
57  enum
58  {
59  delete_buffer_size = BufferSize1_,
60  N = N_,
61  IntKMAX = IntKMAX_,
62  num_int_groups = IntLevels_,
63  num_ext_groups = ExtLevels_,
64  BlockSize = BlockSize_,
65  ExtKMAX = ExtKMAX_,
66  element_size = sizeof(ValueType)
67  };
68 };
69 
71 
72 namespace std {
73 
74 template <class BlockType,
75  class CompareType,
76  unsigned Arity,
77  class AllocStr>
80 {
81  a.swap(b);
82 }
83 template <class ValueType, class CompareType, unsigned KNKMAX>
86 {
87  a.swap(b);
88 }
89 
90 } // namespace std
91 
93 
94 //! External priority queue data structure \n
95 //! <b> Introduction </b> to priority queue container: see \ref tutorial_pqueue tutorial. \n
96 //! <b> Design and Internals </b> of priority queue container: see \ref design_pqueue.
97 template <class ConfigType>
98 class priority_queue : private noncopyable
99 {
100 public:
101  typedef ConfigType Config;
102  enum
103  {
104  delete_buffer_size = Config::delete_buffer_size,
105  N = Config::N,
106  IntKMAX = Config::IntKMAX,
107  num_int_groups = Config::num_int_groups,
108  num_ext_groups = Config::num_ext_groups,
109  total_num_groups = Config::num_int_groups + Config::num_ext_groups,
110  BlockSize = Config::BlockSize,
111  ExtKMAX = Config::ExtKMAX
112  };
113 
114  //! The type of object stored in the priority_queue.
115  typedef typename Config::value_type value_type;
116  //! Comparison object.
117  typedef typename Config::comparator_type comparator_type;
118  typedef typename Config::alloc_strategy_type alloc_strategy_type;
119  //! An unsigned integral type (64 bit).
121  //! Type of the block used in disk-memory transfers
124 
125 protected:
128 
130  value_type,
131  comparator_type,
132  IntKMAX> int_merger_type;
133 
135  block_type,
136  comparator_type,
137  ExtKMAX,
138  alloc_strategy_type> ext_merger_type;
139 
140  int_merger_type int_mergers[num_int_groups];
144 
145  // one delete buffer for each tree => group buffer
146  value_type group_buffers[total_num_groups][N + 1]; // tree->group_buffers->delete_buffer (extra space for sentinel)
147  value_type* group_buffer_current_mins[total_num_groups]; // group_buffer_current_mins[i] is current start of group_buffers[i], end is group_buffers[i] + N
148 
149  // overall delete buffer
150  value_type delete_buffer[delete_buffer_size + 1];
151  value_type* delete_buffer_current_min; // current start of delete_buffer
152  value_type* delete_buffer_end; // end of delete_buffer
153 
154  comparator_type cmp;
155 
156  // insert buffer
158 
159  // how many groups are active
161 
162  // total size not counting insert_heap and delete_buffer
164 
165 private:
166  void init();
167 
168  void refill_delete_buffer();
169  size_type refill_group_buffer(unsigned_type k);
170 
171  unsigned_type make_space_available(unsigned_type level);
172  void empty_insert_heap();
173 
174  value_type get_supremum() const { return cmp.min_value(); } //{ return group_buffers[0][KNN].key; }
175  unsigned_type current_delete_buffer_size() const { return delete_buffer_end - delete_buffer_current_min; }
176  unsigned_type current_group_buffer_size(unsigned_type i) const { return &(group_buffers[i][N]) - group_buffer_current_mins[i]; }
177 
178 public:
179  //! \name Constructors/Destructors
180  //! \{
181 
182  //! Constructs external priority queue object.
183  //! \param pool_ pool of blocks that will be used
184  //! for data writing and prefetching for the disk<->memory transfers
185  //! happening in the priority queue. Larger pool size
186  //! helps to speed up operations.
187  priority_queue(pool_type& pool_);
188 
189  //! Constructs external priority queue object.
190  //! \param p_pool_ pool of blocks that will be used
191  //! for data prefetching for the disk<->memory transfers
192  //! happening in the priority queue. Larger pool size
193  //! helps to speed up operations.
194  //! \param w_pool_ pool of blocks that will be used
195  //! for writing data for the memory<->disk transfers
196  //! happening in the priority queue. Larger pool size
197  //! helps to speed up operations.
200  );
201 
202  //! Constructs external priority queue object.
203  //! \param p_pool_mem memory (in bytes) for prefetch pool that will be used
204  //! for data prefetching for the disk<->memory transfers
205  //! happening in the priority queue. Larger pool size
206  //! helps to speed up operations.
207  //! \param w_pool_mem memory (in bytes) for buffered write pool that will be used
208  //! for writing data for the memory<->disk transfers
209  //! happening in the priority queue. Larger pool size
210  //! helps to speed up operations.
211  priority_queue(unsigned_type p_pool_mem, unsigned_type w_pool_mem);
212 
213  virtual ~priority_queue();
214 
215  //! \}
216 
217 #if 0
218 
219  //! swap this priority queue with another one.
220  //! Implementation correctness is questionable.
221  void swap(priority_queue& obj)
222  {
223  //swap_1D_arrays(int_mergers,obj.int_mergers,num_int_groups); // does not work in g++ 3.4.3 :( bug?
224  for (unsigned_type i = 0; i < num_int_groups; ++i)
225  std::swap(int_mergers[i], obj.int_mergers[i]);
226 
227  //std::swap(pool,obj.pool);
228  //std::swap(pool_owned, obj.pool_owned);
229  std::swap(ext_mergers, obj.ext_mergers);
230  for (unsigned_type i1 = 0; i1 < total_num_groups; ++i1)
231  for (unsigned_type i2 = 0; i2 < (N + 1); ++i2)
232  std::swap(group_buffers[i1][i2], obj.group_buffers[i1][i2]);
233 
234  STXXL_STATIC_ASSERT(false);
235  // Shoot yourself in the foot: group_buffer_current_mins contains pointers into group_buffers ...
236  // either recompute them or add/subtract (&this->group_buffers[0][0] - &obj->group_buffers[0][0])
237  swap_1D_arrays(group_buffer_current_mins, obj.group_buffer_current_mins, total_num_groups);
238  swap_1D_arrays(delete_buffer, obj.delete_buffer, delete_buffer_size + 1);
239  std::swap(delete_buffer_current_min, obj.delete_buffer_current_min);
240  std::swap(delete_buffer_end, obj.delete_buffer_end);
241  std::swap(cmp, obj.cmp);
242  std::swap(insert_heap, obj.insert_heap);
243  std::swap(num_active_groups, obj.num_active_groups);
244  std::swap(size_, obj.size_);
245  }
246 #endif
247 
248  //! \name Capacity
249  //! \{
250 
251  //! Returns number of elements contained.
252  //! \return number of elements contained
253  size_type size() const;
254 
255  //! Returns true if queue has no elements.
256  //! \return \b true if queue has no elements, \b false otherwise
257  bool empty() const { return (size() == 0); }
258 
259  //! \}
260 
261  //! \name Operators
262  //! \{
263 
264  //! Returns "largest" element.
265  //!
266  //! Returns a const reference to the element at the top of the
267  //! priority_queue. The element at the top is guaranteed to be the largest
268  //! element in the \b priority queue, as determined by the comparison
269  //! function \b comparator_type (the same as the second parameter of
270  //! PRIORITY_QUEUE_GENERATOR utility class). That is, for every other
271  //! element \b x in the priority_queue, \b comparator_type(Q.top(), x) is
272  //! false. Precondition: \c empty() is false.
273  const value_type & top() const;
274 
275  //! \}
276 
277  //! \name Modifiers
278  //! \{
279 
280  //! Removes the element at the top.
281  //!
282  //! Removes the element at the top of the priority_queue, that is, the
283  //! largest element in the \b priority_queue. Precondition: \c empty() is
284  //! \b false. Postcondition: \c size() will be decremented by 1.
285  void pop();
286 
287  //! Inserts x into the priority_queue.
288  //!
289  //! Inserts x into the priority_queue. Postcondition: \c size() will be
290  //! incremented by 1.
291  void push(const value_type& obj);
292 
293  //! \}
294 
295  //! \name Miscellaneous
296  //! \{
297 
298  //! Number of bytes consumed by the \b priority_queue from the internal
299  //! memory not including pools (see the constructor)
301  {
302  unsigned_type dynam_alloc_mem = 0;
303  //dynam_alloc_mem += w_pool.mem_cons();
304  //dynam_alloc_mem += p_pool.mem_cons();
305  for (int i = 0; i < num_int_groups; ++i)
306  dynam_alloc_mem += int_mergers[i].mem_cons();
307 
308  for (int i = 0; i < num_ext_groups; ++i)
309  dynam_alloc_mem += ext_mergers[i]->mem_cons();
310 
311  return (sizeof(*this) +
312  sizeof(ext_merger_type) * num_ext_groups +
313  dynam_alloc_mem);
314  }
315 
316  void dump_sizes() const;
317  void dump_params() const;
318 
319  //! \}
320 };
321 
322 template <class ConfigType>
323 inline typename priority_queue<ConfigType>::size_type
325 {
326  return size_ +
327  insert_heap.size() - 1 +
328  (delete_buffer_end - delete_buffer_current_min);
329 }
330 
331 template <class ConfigType>
332 inline const typename priority_queue<ConfigType>::value_type &
334 {
335  assert(!insert_heap.empty());
336 
337  const typename priority_queue<ConfigType>::value_type& t = insert_heap.top();
338  if (/*(!insert_heap.empty()) && */ cmp(*delete_buffer_current_min, t))
339  return t;
340  else
341  return *delete_buffer_current_min;
342 }
343 
344 template <class ConfigType>
346 {
347  //STXXL_VERBOSE1("priority_queue::pop()");
348  assert(!insert_heap.empty());
349 
350  if (/*(!insert_heap.empty()) && */ cmp(*delete_buffer_current_min, insert_heap.top()))
351  insert_heap.pop();
352  else
353  {
354  assert(delete_buffer_current_min < delete_buffer_end);
355  ++delete_buffer_current_min;
356  if (delete_buffer_current_min == delete_buffer_end)
357  refill_delete_buffer();
358  }
359 }
360 
// Inserts obj into the in-memory insert heap.
// The insert heap permanently holds one sentinel element (pushed in init()),
// so a size of N + 1 means it already holds N real elements; in that case it
// is flushed into the merger structure via empty_insert_heap() before obj is
// added.
// Precondition (checked by assert): obj is not the sentinel value.
361 template <class ConfigType>
362 inline void priority_queue<ConfigType>::push(const value_type& obj)
363 {
364  //STXXL_VERBOSE3("priority_queue::push("<< obj <<")");
365  assert(!int_mergers->is_sentinel(obj)); // the sentinel value itself must never be inserted
366  if (insert_heap.size() == N + 1) // heap is full: N elements plus the sentinel
367  empty_insert_heap();
368 
369  assert(!insert_heap.empty()); // the sentinel must still be present
370 
371  insert_heap.push(obj);
372 }
373 
374 ////////////////////////////////////////////////////////////////
375 
376 template <class ConfigType>
378  : pool(&pool_),
379  pool_owned(false),
380  delete_buffer_end(delete_buffer + delete_buffer_size),
381  insert_heap(N + 2),
382  num_active_groups(0), size_(0)
383 {
384  STXXL_VERBOSE_PQ("priority_queue(pool)");
385  init();
386 }
387 
388 // DEPRECATED
389 template <class ConfigType>
391  : pool(new pool_type(p_pool_, w_pool_)),
392  pool_owned(true),
393  delete_buffer_end(delete_buffer + delete_buffer_size),
394  insert_heap(N + 2),
395  num_active_groups(0), size_(0)
396 {
397  STXXL_VERBOSE_PQ("priority_queue(p_pool, w_pool)");
398  init();
399 }
400 
401 template <class ConfigType>
403  : pool(new pool_type(p_pool_mem / BlockSize, w_pool_mem / BlockSize)),
404  pool_owned(true),
405  delete_buffer_end(delete_buffer + delete_buffer_size),
406  insert_heap(N + 2),
407  num_active_groups(0), size_(0)
408 {
409  STXXL_VERBOSE_PQ("priority_queue(pool sizes)");
410  init();
411 }
412 
413 template <class ConfigType>
415 {
416  assert(!cmp(cmp.min_value(), cmp.min_value())); // verify strict weak ordering
417 
418  ext_mergers = new ext_merger_type*[num_ext_groups];
419  for (unsigned_type j = 0; j < num_ext_groups; ++j) {
420  ext_mergers[j] = new ext_merger_type;
421  ext_mergers[j]->set_pool(pool);
422  }
423 
424  value_type sentinel = cmp.min_value();
425  insert_heap.push(sentinel); // always keep the sentinel
426  delete_buffer[delete_buffer_size] = sentinel; // sentinel
427  delete_buffer_current_min = delete_buffer_end; // empty
428  for (unsigned_type i = 0; i < total_num_groups; i++)
429  {
430  group_buffers[i][N] = sentinel; // sentinel
431  group_buffer_current_mins[i] = &(group_buffers[i][N]); // empty
432  }
433 }
434 
435 template <class ConfigType>
437 {
438  STXXL_VERBOSE_PQ("~priority_queue()");
439  if (pool_owned)
440  delete pool;
441 
442  for (unsigned_type j = 0; j < num_ext_groups; ++j)
443  delete ext_mergers[j];
444  delete[] ext_mergers;
445 }
446 
447 //--------------------- Buffer refilling -------------------------------
448 
449 // refill group_buffers[group] and return number of elements found
450 template <class ConfigType>
453 {
454  STXXL_VERBOSE_PQ("refill_group_buffer(" << group << ")");
455 
456  value_type* target;
457  size_type length;
458  size_type group_size = (group < num_int_groups) ?
459  int_mergers[group].size() :
460  ext_mergers[group - num_int_groups]->size(); // elements left in segments
461  unsigned_type left_elements = group_buffers[group] + N - group_buffer_current_mins[group]; //elements left in target buffer
462  if (group_size + left_elements >= size_type(N))
463  { // buffer will be filled completely
464  target = group_buffers[group];
465  length = N - left_elements;
466  }
467  else
468  {
469  target = group_buffers[group] + N - group_size - left_elements;
470  length = group_size;
471  }
472 
473  if (length > 0)
474  {
475  // shift remaining elements to front
476  memmove(target, group_buffer_current_mins[group], left_elements * sizeof(value_type));
477  group_buffer_current_mins[group] = target;
478 
479  // fill remaining space from group
480  if (group < num_int_groups)
481  int_mergers[group].multi_merge(target + left_elements,
482  target + left_elements + length);
483  else
484  ext_mergers[group - num_int_groups]->multi_merge(
485  target + left_elements,
486  target + left_elements + length);
487  }
488 
489  //STXXL_MSG(length + left_elements);
490  //std::copy(target,target + length + left_elements,std::ostream_iterator<value_type>(std::cout, "\n"));
491 #if STXXL_CHECK_ORDER_IN_SORTS
493  if (!stxxl::is_sorted(group_buffer_current_mins[group], group_buffers[group] + N, inv_cmp))
494  {
495  STXXL_VERBOSE_PQ("refill_grp... length: " << length << " left_elements: " << left_elements);
496  for (value_type* v = group_buffer_current_mins[group] + 1; v < group_buffer_current_mins[group] + left_elements; ++v)
497  {
498  if (inv_cmp(*v, *(v - 1)))
499  {
500  STXXL_MSG("Error in buffer " << group << " at position " << (v - group_buffer_current_mins[group] - 1) << "/" << (v - group_buffer_current_mins[group]) << " " << *(v - 2) << " " << *(v - 1) << " " << *v << " " << *(v + 1));
501  }
502  }
503  assert(false);
504  }
505 #endif
506 
507  return length + left_elements;
508 }
509 
510 template <class ConfigType>
512 {
513  STXXL_VERBOSE_PQ("refill_delete_buffer()");
514 
515  size_type total_group_size = 0;
516  //num_active_groups is <= 4
517  for (unsigned_type i = num_active_groups; i > 0; )
518  {
519  --i;
520  if ((group_buffers[i] + N) - group_buffer_current_mins[i] < delete_buffer_size)
521  {
522  size_type length = refill_group_buffer(i);
523  // max active level dry now?
524  if (length == 0 && unsigned(i) == num_active_groups - 1)
525  --num_active_groups;
526 
527  total_group_size += length;
528  }
529  else
530  total_group_size += delete_buffer_size; // actually only a sufficient lower bound
531  }
532 
533  size_type length;
534  if (total_group_size >= delete_buffer_size) // buffer can be filled completely
535  {
536  length = delete_buffer_size; // amount to be copied
537  size_ -= size_type(delete_buffer_size); // amount left in group_buffers
538  }
539  else
540  {
541  length = total_group_size;
542  assert(size_ == length); // trees and group_buffers get empty
543  size_ = 0;
544  }
545 
547 
548  // now call simplified refill routines
549  // which can make the assumption that
550  // they find all they are asked in the buffers
551  delete_buffer_current_min = delete_buffer_end - length;
552  STXXL_VERBOSE_PQ("refill_del... Active groups = " << num_active_groups);
553  switch (num_active_groups)
554  {
555  case 0:
556  break;
557  case 1:
558  std::copy(group_buffer_current_mins[0], group_buffer_current_mins[0] + length, delete_buffer_current_min);
559  group_buffer_current_mins[0] += length;
560  break;
561  case 2:
562 #if STXXL_PARALLEL && STXXL_PARALLEL_PQ_MULTIWAY_MERGE_DELETE_BUFFER
563  {
564  std::pair<value_type*, value_type*> seqs[2] =
565  {
566  std::make_pair(group_buffer_current_mins[0], group_buffers[0] + N),
567  std::make_pair(group_buffer_current_mins[1], group_buffers[1] + N)
568  };
569 
571  seqs, seqs + 2, delete_buffer_current_min, length, inv_cmp);
572  // sequence iterators are progressed appropriately
573 
574  group_buffer_current_mins[0] = seqs[0].first;
575  group_buffer_current_mins[1] = seqs[1].first;
576  }
577 #else
579  group_buffer_current_mins[0], group_buffer_current_mins[1],
580  delete_buffer_current_min, delete_buffer_current_min + length, cmp);
581 #endif
582  break;
583  case 3:
584 #if STXXL_PARALLEL && STXXL_PARALLEL_PQ_MULTIWAY_MERGE_DELETE_BUFFER
585  {
586  std::pair<value_type*, value_type*> seqs[3] =
587  {
588  std::make_pair(group_buffer_current_mins[0], group_buffers[0] + N),
589  std::make_pair(group_buffer_current_mins[1], group_buffers[1] + N),
590  std::make_pair(group_buffer_current_mins[2], group_buffers[2] + N)
591  };
592 
594  seqs, seqs + 3, delete_buffer_current_min, length, inv_cmp);
595  // sequence iterators are progressed appropriately
596 
597  group_buffer_current_mins[0] = seqs[0].first;
598  group_buffer_current_mins[1] = seqs[1].first;
599  group_buffer_current_mins[2] = seqs[2].first;
600  }
601 #else
603  group_buffer_current_mins[0],
604  group_buffer_current_mins[1],
605  group_buffer_current_mins[2],
606  delete_buffer_current_min, delete_buffer_current_min + length, cmp);
607 #endif
608  break;
609  case 4:
610 #if STXXL_PARALLEL && STXXL_PARALLEL_PQ_MULTIWAY_MERGE_DELETE_BUFFER
611  {
612  std::pair<value_type*, value_type*> seqs[4] =
613  {
614  std::make_pair(group_buffer_current_mins[0], group_buffers[0] + N),
615  std::make_pair(group_buffer_current_mins[1], group_buffers[1] + N),
616  std::make_pair(group_buffer_current_mins[2], group_buffers[2] + N),
617  std::make_pair(group_buffer_current_mins[3], group_buffers[3] + N)
618  };
619 
621  seqs, seqs + 4, delete_buffer_current_min, length, inv_cmp);
622  // sequence iterators are progressed appropriately
623 
624  group_buffer_current_mins[0] = seqs[0].first;
625  group_buffer_current_mins[1] = seqs[1].first;
626  group_buffer_current_mins[2] = seqs[2].first;
627  group_buffer_current_mins[3] = seqs[3].first;
628  }
629 #else
631  group_buffer_current_mins[0],
632  group_buffer_current_mins[1],
633  group_buffer_current_mins[2],
634  group_buffer_current_mins[3],
635  delete_buffer_current_min, delete_buffer_current_min + length, cmp);
636  // side effect free
637 #endif
638  break;
639  default:
640  STXXL_THROW2(std::runtime_error, "priority_queue<...>::refill_delete_buffer()",
641  "Overflow! The number of buffers on 2nd level in stxxl::priority_queue is currently limited to 4");
642  }
643 
644 #if STXXL_CHECK_ORDER_IN_SORTS
645  if (!stxxl::is_sorted(delete_buffer_current_min, delete_buffer_end, inv_cmp))
646  {
647  for (value_type* v = delete_buffer_current_min + 1; v < delete_buffer_end; ++v)
648  {
649  if (inv_cmp(*v, *(v - 1)))
650  {
651  STXXL_MSG("Error at position " << (v - delete_buffer_current_min - 1) << "/" << (v - delete_buffer_current_min) << " " << *(v - 1) << " " << *v);
652  }
653  }
654  assert(false);
655  }
656 #endif
657  //std::copy(delete_buffer_current_min,delete_buffer_current_min + length,std::ostream_iterator<value_type>(std::cout, "\n"));
658 }
659 
660 //--------------------------------------------------------------------
661 
662 // check if space is available on level k and
663 // empty this level if necessary leading to a recursive call.
664 // return the level where space was finally available
665 template <class ConfigType>
667 {
668  STXXL_VERBOSE_PQ("make_space_available(" << level << ")");
669  unsigned_type finalLevel;
670  assert(level < total_num_groups);
671  assert(level <= num_active_groups);
672 
673  if (level == num_active_groups)
674  ++num_active_groups;
675 
676  const bool spaceIsAvailable_ =
677  (level < num_int_groups) ? int_mergers[level].is_space_available()
678  : (ext_mergers[level - num_int_groups]->is_space_available());
679 
680  if (spaceIsAvailable_)
681  {
682  finalLevel = level;
683  }
684  else if (level == total_num_groups - 1)
685  {
686  size_type capacity = N;
687  for (int i = 0; i < num_int_groups; ++i)
688  capacity *= IntKMAX;
689  for (int i = 0; i < num_ext_groups; ++i)
690  capacity *= ExtKMAX;
691  STXXL_ERRMSG("priority_queue OVERFLOW - all groups full, size=" << size() <<
692  ", capacity(last externel group (" << num_int_groups + num_ext_groups - 1 << "))=" << capacity);
693  dump_sizes();
694 
695  unsigned_type extLevel = level - num_int_groups;
696  const size_type segmentSize = ext_mergers[extLevel]->size();
697  STXXL_VERBOSE1("Inserting segment into last level external: " << level << " " << segmentSize);
698  ext_merger_type* overflow_merger = new ext_merger_type;
699  overflow_merger->set_pool(pool);
700  overflow_merger->append_merger(*ext_mergers[extLevel], segmentSize);
701  std::swap(ext_mergers[extLevel], overflow_merger);
702  delete overflow_merger;
703  finalLevel = level;
704  }
705  else
706  {
707  finalLevel = make_space_available(level + 1);
708 
709  if (level < num_int_groups - 1) // from internal to internal tree
710  {
711  unsigned_type segmentSize = int_mergers[level].size();
712  value_type* newSegment = new value_type[segmentSize + 1];
713  int_mergers[level].multi_merge(newSegment, newSegment + segmentSize); // empty this level
714 
715  newSegment[segmentSize] = delete_buffer[delete_buffer_size]; // sentinel
716  // for queues where size << #inserts
717  // it might make sense to stay in this level if
718  // segmentSize < alpha * KNN * k^level for some alpha < 1
719  int_mergers[level + 1].append_array(newSegment, segmentSize);
720  }
721  else
722  {
723  if (level == num_int_groups - 1) // from internal to external tree
724  {
725  const unsigned_type segmentSize = int_mergers[num_int_groups - 1].size();
726  STXXL_VERBOSE_PQ("make_space... Inserting segment into first level external: " << level << " " << segmentSize);
727  ext_mergers[0]->append_merger(int_mergers[num_int_groups - 1], segmentSize);
728  }
729  else // from external to external tree
730  {
731  const size_type segmentSize = ext_mergers[level - num_int_groups]->size();
732  STXXL_VERBOSE_PQ("make_space... Inserting segment into second level external: " << level << " " << segmentSize);
733  ext_mergers[level - num_int_groups + 1]->append_merger(*ext_mergers[level - num_int_groups], segmentSize);
734  }
735  }
736  }
737  return finalLevel;
738 }
739 
740 // empty the insert heap into the main data structure
741 template <class ConfigType>
743 {
744  STXXL_VERBOSE_PQ("empty_insert_heap()");
745  assert(insert_heap.size() == (N + 1));
746 
747  const value_type sup = get_supremum();
748 
749  // build new segment
750  value_type* newSegment = new value_type[N + 1];
751  value_type* newPos = newSegment;
752 
753  // put the new data there for now
754  //insert_heap.sortTo(newSegment);
755  value_type* SortTo = newSegment;
756 
757  insert_heap.sort_to(SortTo);
758 
759  SortTo = newSegment + N;
760  insert_heap.clear();
761  insert_heap.push(*SortTo);
762 
763  assert(insert_heap.size() == 1);
764 
765  newSegment[N] = sup; // sentinel
766 
767  // copy the delete_buffer and group_buffers[0] to temporary storage
768  // (the temporary can be eliminated using some dirty tricks)
769  const unsigned_type tempSize = N + delete_buffer_size;
770  value_type temp[tempSize + 1];
771  unsigned_type sz1 = current_delete_buffer_size();
772  unsigned_type sz2 = current_group_buffer_size(0);
773  value_type* pos = temp + tempSize - sz1 - sz2;
774  std::copy(delete_buffer_current_min, delete_buffer_current_min + sz1, pos);
775  std::copy(group_buffer_current_mins[0], group_buffer_current_mins[0] + sz2, pos + sz1);
776  temp[tempSize] = sup; // sentinel
777 
778  // refill delete_buffer
779  // (using more complicated code it could be made somewhat fuller
780  // in certain circumstances)
782  pos, newPos,
783  delete_buffer_current_min, delete_buffer_current_min + sz1, cmp);
784 
785  // refill group_buffers[0]
786  // (as above we might want to take the opportunity
787  // to make group_buffers[0] fuller)
789  pos, newPos,
790  group_buffer_current_mins[0], group_buffer_current_mins[0] + sz2, cmp);
791 
792  // merge the rest to the new segment
793  // note that merge exactly trips into the footsteps
794  // of itself
796  newSegment, newSegment + N, cmp);
797 
798  // and insert it
799  unsigned_type freeLevel = make_space_available(0);
800  assert(freeLevel == 0 || int_mergers[0].size() == 0);
801  int_mergers[0].append_array(newSegment, N);
802 
803  // get rid of invalid level 2 buffers
804  // by inserting them into tree 0 (which is almost empty in this case)
805  if (freeLevel > 0)
806  {
807  for (int_type i = freeLevel; i >= 0; i--)
808  {
809  // reverse order not needed
810  // but would allow immediate refill
811 
812  newSegment = new value_type[current_group_buffer_size(i) + 1]; // with sentinel
813  std::copy(group_buffer_current_mins[i], group_buffer_current_mins[i] + current_group_buffer_size(i) + 1, newSegment);
814  int_mergers[0].append_array(newSegment, current_group_buffer_size(i));
815  group_buffer_current_mins[i] = group_buffers[i] + N; // empty
816  }
817  }
818 
819  // update size
820  size_ += size_type(N);
821 
822  // special case if the tree was empty before
823  if (delete_buffer_current_min == delete_buffer_end)
824  refill_delete_buffer();
825 }
826 
827 template <class ConfigType>
829 {
830  unsigned_type capacity = N;
831  STXXL_MSG("pq::size()\t= " << size());
832  STXXL_MSG(" insert_heap\t= " << insert_heap.size() - 1 << "/" << capacity);
833  STXXL_MSG(" delete_buffer\t= " << (delete_buffer_end - delete_buffer_current_min) << "/" << delete_buffer_size);
834  for (int i = 0; i < num_int_groups; ++i) {
835  capacity *= IntKMAX;
836  STXXL_MSG(" grp " << i << " int" <<
837  " grpbuf=" << current_group_buffer_size(i) <<
838  " size=" << int_mergers[i].size() << "/" << capacity <<
839  " (" << (int)((double)int_mergers[i].size() * 100.0 / (double)capacity) << "%)" <<
840  " space=" << int_mergers[i].is_space_available());
841  }
842  for (int i = 0; i < num_ext_groups; ++i) {
843  capacity *= ExtKMAX;
844  STXXL_MSG(" grp " << i + num_int_groups << " ext" <<
845  " grpbuf=" << current_group_buffer_size(i + num_int_groups) <<
846  " size=" << ext_mergers[i]->size() << "/" << capacity <<
847  " (" << (int)((double)ext_mergers[i]->size() * 100.0 / (double)capacity) << "%)" <<
848  " space=" << ext_mergers[i]->is_space_available());
849  }
850  dump_params();
851 }
852 
853 template <class ConfigType>
855 {
856  STXXL_MSG("params: delete_buffer_size=" << delete_buffer_size << " N=" << N << " IntKMAX=" << IntKMAX << " num_int_groups=" << num_int_groups << " ExtKMAX=" << ExtKMAX << " num_ext_groups=" << num_ext_groups << " BlockSize=" << BlockSize);
857 }
858 
859 namespace priority_queue_local {
860 
862 {
863  enum { fits = false };
865 };
866 
// Placeholder "configuration" that never fits (fits == false) and names
// itself as its own result. It is the result type of the find_B_m
// specialization with stop == true.
867 struct dummy
868 {
869  enum { fits = false };
870  typedef dummy result;
871 };
872 
873 template <internal_size_type ElementSize, internal_size_type IntMem,
874  external_size_type MaxItems, internal_size_type BlockSize,
875  unsigned_type m_, bool stop = false>
876 struct find_B_m
877 {
878  typedef find_B_m<ElementSize, IntMem,
879  MaxItems, BlockSize, m_, stop> self_type;
880 
881  //! element size
882  static const internal_size_type element_size = ElementSize;
883  //! internal memory size of PQ
884  static const internal_size_type intmem = IntMem;
885  //! block size (iterates from 8 MiB downwards)
886  static const internal_size_type B = BlockSize;
887 
888  //! number of blocks that fit into internal memory (M)
889  static const internal_size_type k = IntMem / BlockSize;
890  //! number of blocks fitting into buffers of mergers (arity of both
891  //! mergers), increased from 1 to 2048 ?-tb
892  static const internal_size_type m = m_;
893  //! remaining blocks, (freely moving, not necessarily unused) ?-tb
894  static const int_type c = k - m_;
895 
896  // memory occupied by block must be at least 10 times larger than size of ext sequence
897 
898  //! calculated boolean whether the configuration fits into internal memory.
899  static const external_size_type fits =
900  // need some temporary constant-size internal blocks
901  (c > 10) &&
902  // satisfy items requirement
903  (((k - m) * m * (m * B / (ElementSize * 4 * 1024))) >= MaxItems) &&
904  // if we have two ext mergers their degree must be at least 64=m/2
905  ((MaxItems < ((k - m) * m / (2 * ElementSize)) * 1024) || m >= 128);
906 
907  static const unsigned_type step = 1;
908 
909  //! if not fits, recurse into configuration with +step more internal buffers
910  typedef typename find_B_m<ElementSize, IntMem, MaxItems, B,
911  m + step, fits || (m + step >= k)>::result candidate1;
912  //! if not fits, recurse into configuration with block size halved.
913  typedef typename find_B_m<ElementSize, IntMem, MaxItems, B / 2,
914  1, fits || candidate1::fits>::result candidate2;
915 
916  //! return a fitting configuration.
918 };
919 
920 // specialization for the case when no valid parameters are found
921 template <internal_size_type ElementSize, internal_size_type IntMem,
922  external_size_type MaxItems, bool stop>
923 struct find_B_m<ElementSize, IntMem, MaxItems, 2048, 1, stop>
924 {
925  enum { fits = false };
927 };
928 
929 // to speed up the search: specialization selected when stop == true.
// It ends the recursive instantiation of find_B_m by reporting fits == false
// and yielding the placeholder type dummy as result.
// NOTE(review): BlockSize is declared as unsigned_type here, whereas the
// primary template declares it as internal_size_type -- confirm that both
// typedefs name the same type.
930 template <internal_size_type ElementSize, internal_size_type IntMem,
931  external_size_type MaxItems, unsigned_type BlockSize,
932  unsigned_type m_>
933 struct find_B_m<ElementSize, IntMem, MaxItems, BlockSize, m_, true>
934 {
935  enum { fits = false };
936  typedef dummy result;
937 };
938 
939 // start search
940 template <internal_size_type ElementSize, internal_size_type IntMem,
941  external_size_type MaxItems>
943 {
944  // start from block size (8*1024*1024) bytes
945  typedef typename find_B_m<ElementSize, IntMem,
946  MaxItems, (8* 1024* 1024), 1>::result result;
947 };
948 
950 {
952 };
953 
954 template <unsigned_type AI_, unsigned_type X_, unsigned_type CriticalSize>
955 struct compute_N
956 {
958 
959  static const unsigned_type X = X_;
960  static const unsigned_type AI = AI_;
961  static const unsigned_type N = X / (AI * AI); // two stage internal
962 
963  typedef typename IF<(N >= CriticalSize), Self, typename compute_N<AI / 2, X, CriticalSize>::result>::result result;
964 };
965 
966 template <unsigned_type X_, unsigned_type CriticalSize_>
967 struct compute_N<1, X_, CriticalSize_>
968 {
970 };
971 
972 } // namespace priority_queue_local
973 
974 //! \}
975 
976 //! \addtogroup stlcont
977 //! \{
978 
979 //! Priority queue type generator. \n
980 //! <b> Introduction </b> to priority queue container: see \ref tutorial_pqueue tutorial. \n
981 //! <b> Design and Internals </b> of priority queue container: see \ref design_pqueue.
982 //!
983 //! \tparam ValueType type of the contained objects (POD with no references to internal memory)
984 //!
985 //! \tparam CompareType the comparator type used to determine whether one element is
986 //! smaller than another element.
987 //!
988 //! \tparam IntMemory upper limit for internal memory consumption in bytes.
989 //!
990 //! \tparam MaxItems upper limit for number of elements contained in the priority queue (in 1024 units). <BR>
991 //! Example: if you are sure that the priority queue contains no more than
992 //! one million elements at a time, then the right parameter is (1000000 / 1024) = 976.
993 //!
994 //! \tparam Tune tuning parameter for meta-program search. <BR>
995 //! Try to play with it if the code does not compile (larger than default
996 //! values might help). Code does not compile if no suitable internal
997 //! parameters were found for given IntMemory and MaxItems. It might also
998 //! happen that given IntMemory is too small for given MaxItems, try larger
999 //! values.
1000 template <class ValueType,
1001  class CompareType,
1002  internal_size_type IntMemory,
1003  external_size_type MaxItems,
1004  unsigned Tune = 6>
1006 {
1007 public:
1008  // actual calculation of B, m, k and element_size
1010  enum {
1011  B = settings::B,
1012  m = settings::m,
1013  X = B * (settings::k - m) / settings::element_size, // interpretation of result
1014  Buffer1Size = 32 // fixed
1015  };
1016  // derivation of N, AI, AE
1018  enum
1019  {
1022  AE = (m / 2 < 2) ? 2 : (m / 2) // at least 2
1023  };
1024 
1025  // Estimation of maximum internal memory consumption (in bytes)
1026  static const unsigned_type EConsumption = X * settings::element_size + settings::B * AE + ((MaxItems / X) / AE) * settings::B * 1024;
1027 
1028  /*
1029  unsigned BufferSize1_ = 32, // equalize procedure call overheads etc.
1030  unsigned N_ = 512, // bandwidth
1031  unsigned IntKMAX_ = 64, // maximal arity for internal mergers
1032  unsigned IntLevels_ = 4,
1033  unsigned BlockSize = (2*1024*1024),
1034  unsigned ExtKMAX_ = 64, // maximal arity for external mergers
1035  unsigned ExtLevels_ = 2,
1036  */
1038 };
1039 
1040 //! \}
1041 
1043 
1044 namespace std {
1045 
1046 template <class ConfigType>
1049 {
1050  a.swap(b);
1051 }
1052 
1053 } // namespace std
1054 
1055 #endif // !STXXL_CONTAINERS_PRIORITY_QUEUE_HEADER
1056 // vim: et:ts=4:sw=4
static const unsigned_type EConsumption
static const internal_size_type m
number of blocks fitting into buffers of mergers (arity of both mergers), increased from 1 to 2048 ...
find_B_m< ElementSize, IntMem, MaxItems,(8 *1024 *1024), 1 >::result result
bool empty() const
Returns true if queue has no elements.
IF< fits, self_type, typename IF< candidate1::fits, candidate1, candidate2 >::result >::result result
return a fitting configuration.
value_type * group_buffer_current_mins[total_num_groups]
void push(const value_type &obj)
Inserts obj into the priority_queue.
unsigned_type make_space_available(unsigned_type level)
int_merger_type int_mergers[num_int_groups]
External merger, based on the loser tree data structure.
Definition: pq_ext_merger.h:38
priority_queue_local::find_settings< sizeof(ValueType), IntMemory, MaxItems >::result settings
priority_queue_local::ext_merger< block_type, comparator_type, ExtKMAX, alloc_strategy_type > ext_merger_type
unsigned long long int uint64
Definition: types.h:39
static const external_size_type fits
calculated boolean whether the configuration fits into internal memory.
Inverts the order of a comparison functor by swapping its arguments.
Definition: pq_helpers.h:187
typed_block< BlockSize, value_type > block_type
Type of the block used in disk-memory transfers.
Priority queue type generator. Introduction to priority queue container: see STXXL Priority Queue ...
priority_queue_local::internal_priority_queue< value_type, std::vector< value_type >, comparator_type > insert_heap_type
static const internal_size_type intmem
internal memory size of PQ
value_type group_buffers[total_num_groups][N+1]
#define STXXL_DEFAULT_ALLOC_STRATEGY
Definition: block_alloc.h:258
void merge3_iterator(InputIterator &source0, InputIterator &source1, InputIterator &source2, OutputIterator target, OutputIterator end, CompareType &cmp)
Definition: pq_mergers.h:71
stxxl::uint64 size_type
An unsigned integral type (64 bit).
void swap_1D_arrays(Type *a, Type *b, unsigned_type size)
Definition: utils.h:246
#define STXXL_DEPRECATED(x)
Definition: deprecated.h:30
unsigned_type mem_cons() const
Number of bytes consumed by the priority_queue from the internal memory not including pools (see the ...
static const internal_size_type element_size
element size
Implements dynamically resizable buffered writing pool.
Definition: write_pool.h:32
Config::alloc_strategy_type alloc_strategy_type
Implements dynamically resizable prefetching pool.
Definition: prefetch_pool.h:29
Block containing elements of fixed length.
Definition: typed_block.h:237
static const unsigned_type step
value_type * delete_buffer_current_min
void merge4_iterator(InputIterator &source0, InputIterator &source1, InputIterator &source2, InputIterator &source3, OutputIterator target, OutputIterator end, CompareType &cmp)
Definition: pq_mergers.h:138
find_B_m< ElementSize, IntMem, MaxItems, BlockSize, m_, stop > self_type
RandomAccessIterator3 multiway_merge_sentinels(RandomAccessIteratorPairIterator seqs_begin, RandomAccessIteratorPairIterator seqs_end, RandomAccessIterator3 target, DiffType length, Comparator comp)
Multi-way merging front-end.
Definition: parallel.h:195
choose_int_types< my_pointer_size >::int_type int_type
Definition: types.h:63
comparator_type cmp
size_type size() const
Returns number of elements contained.
unsigned_type internal_size_type
Definition: types.h:66
void swap(counting_ptr< A > &a1, counting_ptr< A > &a2)
swap enclosed object with another counting pointer (no reference counts need change) ...
Definition: counting_ptr.h:165
value_type delete_buffer[delete_buffer_size+1]
Container::value_type pop(Container &c)
Definition: utils.h:279
unsigned_type current_delete_buffer_size() const
void dump_params() const
Config::comparator_type comparator_type
Comparison object.
#define STXXL_VERBOSE_PQ(msg)
Definition: pq_helpers.h:71
static const internal_size_type k
number of blocks that fit into internal memory (M)
#define STXXL_BEGIN_NAMESPACE
Definition: namespace.h:16
Parameters_not_found_Try_to_change_the_Tune_parameter result
size_type refill_group_buffer(unsigned_type k)
const value_type & top() const
Returns "largest" element.
External priority queue data structure. Introduction to priority queue container: see STXXL Priorit...
#define STXXL_VERBOSE1(x)
Definition: verbose.h:113
find_B_m< ElementSize, IntMem, MaxItems, B, m+step, fits||(m+step >=k)>::result candidate1
if not fits, recurse into configuration with +step more internal buffers
unsigned_type current_group_buffer_size(unsigned_type i) const
#define STXXL_THROW2(exception_type, location, error_message)
Throws exception_type with "Error in [location] : [error_message]".
bool is_sorted(ForwardIterator first, ForwardIterator last, StrictWeakOrdering comp)
Definition: is_sorted.h:24
Similar to std::priority_queue, with the following differences:
Definition: pq_helpers.h:92
insert_heap_type insert_heap
void append_merger(Merger &another_merger, size_type segment_size)
Merge all items from another merger and insert the resulting external array into the merger...
#define STXXL_STATIC_ASSERT(x)
Definition: utils.h:48
#define STXXL_ERRMSG(x)
Definition: verbose.h:94
void dump_sizes() const
priority_queue_local::int_merger< value_type, comparator_type, IntKMAX > int_merger_type
static const int_type c
remaining blocks, (freely moving, not necessarily unused) ?-tb
uint64 external_size_type
Definition: types.h:67
choose_int_types< my_pointer_size >::unsigned_type unsigned_type
Definition: types.h:64
priority_queue< priority_queue_config< ValueType, CompareType, Buffer1Size, N, AI, 2, B, AE, 2 > > result
static const size_t sentinel
#define STXXL_MSG(x)
Definition: verbose.h:73
static const internal_size_type B
block size (iterates from 8 MiB downwards)
IF<(N >=CriticalSize), Self, typename compute_N< AI/2, X, CriticalSize >::result >::result result
value_type get_supremum() const
find_B_m< ElementSize, IntMem, MaxItems, B/2, 1, fits||candidate1::fits >::result candidate2
if not fits, recurse into configuration with block size halved.
unsigned_type num_active_groups
IF template metaprogramming statement.
Definition: tmeta.h:30
read_write_pool< block_type > pool_type
Config::value_type value_type
The type of object stored in the priority_queue.
ext_merger_type ** ext_mergers
value_type * delete_buffer_end
compute_N< AI_, X_, CriticalSize > Self
void merge2_iterator(InputIterator &source0, InputIterator &source1, OutputIterator target, OutputIterator end, CompareType &cmp)
Definition: pq_mergers.h:41
#define STXXL_END_NAMESPACE
Definition: namespace.h:17
priority_queue_local::compute_N<(1<< Tune), X, 4 *Buffer1Size >::result ComputeN
priority_queue(pool_type &pool_)
Constructs external priority queue object.