STXXL  1.4.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
priority_queue.h
Go to the documentation of this file.
1 /***************************************************************************
2  * include/stxxl/bits/containers/priority_queue.h
3  *
4  * Implements a data structure from "Peter Sanders. Fast Priority Queues for
5  * Cached Memory. ALENEX'99" for external memory.
6  *
7  * Part of the STXXL. See http://stxxl.sourceforge.net
8  *
9  * Copyright (C) 1999 Peter Sanders <[email protected]>
10  * Copyright (C) 2003, 2004, 2007 Roman Dementiev <[email protected]>
11  * Copyright (C) 2007-2009 Johannes Singler <[email protected]>
12  * Copyright (C) 2007-2010 Andreas Beckmann <[email protected]>
13  * Copyright (C) 2014 Timo Bingmann <[email protected]>
14  *
15  * Distributed under the Boost Software License, Version 1.0.
16  * (See accompanying file LICENSE_1_0.txt or copy at
17  * http://www.boost.org/LICENSE_1_0.txt)
18  **************************************************************************/
19 
20 #ifndef STXXL_CONTAINERS_PRIORITY_QUEUE_HEADER
21 #define STXXL_CONTAINERS_PRIORITY_QUEUE_HEADER
22 
27 
29 
30 /*
31  KNBufferSize1 = 32;
32  KNN = 512; // length of group 1 sequences
33  KNKMAX = 64; // maximal arity
34  LogKNKMAX = 6; // ceil(log KNKMAX)
35  KNLevels = 4; // overall capacity >= KNN*KNKMAX^KNLevels
36  */
37 
38 // internal memory consumption >= N_*(KMAX_^IntLevels_) + ext
39 
40 template <
41  class ValueType,
42  class CompareType,
43  unsigned BufferSize1_ = 32, // equalize procedure call overheads etc.
44  unsigned N_ = 512, // length of group 1 sequences
45  unsigned IntKMAX_ = 64, // maximal arity for internal mergers
46  unsigned IntLevels_ = 4, // number of internal groups
47  unsigned BlockSize_ = (2* 1024* 1024), // external block size
48  unsigned ExtKMAX_ = 64, // maximal arity for external mergers
49  unsigned ExtLevels_ = 2, // number of external groups
50  class AllocStr_ = STXXL_DEFAULT_ALLOC_STRATEGY
51  >
53 {
54  typedef ValueType value_type;
55  typedef CompareType comparator_type;
56  typedef AllocStr_ alloc_strategy_type;
57  enum
58  {
59  delete_buffer_size = BufferSize1_,
60  N = N_,
61  IntKMAX = IntKMAX_,
62  num_int_groups = IntLevels_,
63  num_ext_groups = ExtLevels_,
64  BlockSize = BlockSize_,
65  ExtKMAX = ExtKMAX_,
66  element_size = sizeof(ValueType)
67  };
68 };
69 
71 
72 namespace std {
73 
74 template <class BlockType,
75  class CompareType,
76  unsigned Arity,
77  class AllocStr>
80 {
81  a.swap(b);
82 }
83 template <class ValueType, class CompareType, unsigned KNKMAX>
86 {
87  a.swap(b);
88 }
89 
90 } // namespace std
91 
93 
94 //! External priority queue data structure \n
95 //! <b> Introduction </b> to priority queue container: see \ref tutorial_pqueue tutorial. \n
96 //! <b> Design and Internals </b> of priority queue container: see \ref design_pqueue.
97 template <class ConfigType>
98 class priority_queue : private noncopyable
99 {
100 public:
101  typedef ConfigType Config;
102  enum
103  {
104  delete_buffer_size = Config::delete_buffer_size,
105  N = Config::N,
106  IntKMAX = Config::IntKMAX,
107  num_int_groups = Config::num_int_groups,
108  num_ext_groups = Config::num_ext_groups,
109  total_num_groups = Config::num_int_groups + Config::num_ext_groups,
110  BlockSize = Config::BlockSize,
111  ExtKMAX = Config::ExtKMAX
112  };
113 
114  //! The type of object stored in the priority_queue.
115  typedef typename Config::value_type value_type;
116  //! Comparison object.
117  typedef typename Config::comparator_type comparator_type;
118  typedef typename Config::alloc_strategy_type alloc_strategy_type;
119  //! An unsigned integral type (64 bit).
121  //! Type of the block used in disk-memory transfers
124 
125 protected:
128 
130  value_type,
131  comparator_type,
132  IntKMAX> int_merger_type;
133 
135  block_type,
136  comparator_type,
137  ExtKMAX,
138  alloc_strategy_type> ext_merger_type;
139 
140  int_merger_type int_mergers[num_int_groups];
144 
145  // one delete buffer for each tree => group buffer
146  value_type group_buffers[total_num_groups][N + 1]; // tree->group_buffers->delete_buffer (extra space for sentinel)
147  value_type* group_buffer_current_mins[total_num_groups]; // group_buffer_current_mins[i] is current start of group_buffers[i], end is group_buffers[i] + N
148 
149  // overall delete buffer
150  value_type delete_buffer[delete_buffer_size + 1];
151  value_type* delete_buffer_current_min; // current start of delete_buffer
152  value_type* delete_buffer_end; // end of delete_buffer
153 
154  comparator_type cmp;
155 
156  // insert buffer
158 
159  // how many groups are active
161 
162  // total size not counting insert_heap and delete_buffer
164 
165 private:
166  void init();
167 
168  void refill_delete_buffer();
169  size_type refill_group_buffer(unsigned_type k);
170 
171  unsigned_type make_space_available(unsigned_type level);
172  void empty_insert_heap();
173 
174  value_type get_supremum() const { return cmp.min_value(); } //{ return group_buffers[0][KNN].key; }
175  unsigned_type current_delete_buffer_size() const { return delete_buffer_end - delete_buffer_current_min; }
176  unsigned_type current_group_buffer_size(unsigned_type i) const { return &(group_buffers[i][N]) - group_buffer_current_mins[i]; }
177 
178 public:
179  //! \name Constructors/Destructors
180  //! \{
181 
182  //! Constructs external priority queue object.
183  //! \param pool_ pool of blocks that will be used
184  //! for data writing and prefetching for the disk<->memory transfers
185  //! happening in the priority queue. Larger pool size
186  //! helps to speed up operations.
187  priority_queue(pool_type& pool_);
188 
189  //! Constructs external priority queue object.
190  //! \param p_pool_ pool of blocks that will be used
191  //! for data prefetching for the disk<->memory transfers
192  //! happening in the priority queue. Larger pool size
193  //! helps to speed up operations.
194  //! \param w_pool_ pool of blocks that will be used
195  //! for writing data for the memory<->disk transfers
196  //! happening in the priority queue. Larger pool size
197  //! helps to speed up operations.
200  );
201 
202  //! Constructs external priority queue object.
203  //! \param p_pool_mem memory (in bytes) for prefetch pool that will be used
204  //! for data prefetching for the disk<->memory transfers
205  //! happening in the priority queue. Larger pool size
206  //! helps to speed up operations.
207  //! \param w_pool_mem memory (in bytes) for buffered write pool that will be used
208  //! for writing data for the memory<->disk transfers
209  //! happening in the priority queue. Larger pool size
210  //! helps to speed up operations.
211  priority_queue(unsigned_type p_pool_mem, unsigned_type w_pool_mem);
212 
213  virtual ~priority_queue();
214 
215  //! \}
216 
217 #if 0
218 
219  //! swap this priority queue with another one.
220  //! Implementation correctness is questionable.
221  void swap(priority_queue& obj)
222  {
223  //swap_1D_arrays(int_mergers,obj.int_mergers,num_int_groups); // does not work in g++ 3.4.3 :( bug?
224  for (unsigned_type i = 0; i < num_int_groups; ++i)
225  std::swap(int_mergers[i], obj.int_mergers[i]);
226 
227  //std::swap(pool,obj.pool);
228  //std::swap(pool_owned, obj.pool_owned);
229  std::swap(ext_mergers, obj.ext_mergers);
230  for (unsigned_type i1 = 0; i1 < total_num_groups; ++i1)
231  for (unsigned_type i2 = 0; i2 < (N + 1); ++i2)
232  std::swap(group_buffers[i1][i2], obj.group_buffers[i1][i2]);
233 
234  STXXL_STATIC_ASSERT(false);
235  // Shoot yourself in the foot: group_buffer_current_mins contains pointers into group_buffers ...
236  // either recompute them or add/subtract (&this->group_buffers[0][0] - &obj->group_buffers[0][0])
237  swap_1D_arrays(group_buffer_current_mins, obj.group_buffer_current_mins, total_num_groups);
238  swap_1D_arrays(delete_buffer, obj.delete_buffer, delete_buffer_size + 1);
239  std::swap(delete_buffer_current_min, obj.delete_buffer_current_min);
240  std::swap(delete_buffer_end, obj.delete_buffer_end);
241  std::swap(cmp, obj.cmp);
242  std::swap(insert_heap, obj.insert_heap);
243  std::swap(num_active_groups, obj.num_active_groups);
244  std::swap(size_, obj.size_);
245  }
246 #endif
247 
248  //! \name Capacity
249  //! \{
250 
251  //! Returns number of elements contained.
252  //! \return number of elements contained
253  size_type size() const;
254 
255  //! Returns true if queue has no elements.
256  //! \return \b true if queue has no elements, \b false otherwise
257  bool empty() const { return (size() == 0); }
258 
259  //! \}
260 
261  //! \name Operators
262  //! \{
263 
264  //! Returns "largest" element.
265  //!
266  //! Returns a const reference to the element at the top of the
267  //! priority_queue. The element at the top is guaranteed to be the largest
268  //! element in the \b priority queue, as determined by the comparison
269  //! function \b comparator_type (the same as the second parameter of
270  //! PRIORITY_QUEUE_GENERATOR utility class). That is, for every other
271  //! element \b x in the priority_queue, \b comparator_type(Q.top(), x) is
272  //! false. Precondition: \c empty() is false.
273  const value_type & top() const;
274 
275  //! \}
276 
277  //! \name Modifiers
278  //! \{
279 
280  //! Removes the element at the top.
281  //!
282  //! Removes the element at the top of the priority_queue, that is, the
283  //! largest element in the \b priority_queue. Precondition: \c empty() is
284  //! \b false. Postcondition: \c size() will be decremented by 1.
285  void pop();
286 
287  //! Inserts x into the priority_queue.
288  //!
289  //! Inserts x into the priority_queue. Postcondition: \c size() will be
290  //! incremented by 1.
291  void push(const value_type& obj);
292 
293  //! \}
294 
295  //! \name Miscellaneous
296  //! \{
297 
298  //! Number of bytes consumed by the \b priority_queue from the internal
299  //! memory not including pools (see the constructor)
301  {
302  unsigned_type dynam_alloc_mem = 0;
303  //dynam_alloc_mem += w_pool.mem_cons();
304  //dynam_alloc_mem += p_pool.mem_cons();
305  for (int i = 0; i < num_int_groups; ++i)
306  dynam_alloc_mem += int_mergers[i].mem_cons();
307 
308  for (int i = 0; i < num_ext_groups; ++i)
309  dynam_alloc_mem += ext_mergers[i]->mem_cons();
310 
311  return (sizeof(*this) +
312  sizeof(ext_merger_type) * num_ext_groups +
313  dynam_alloc_mem);
314  }
315 
316  void dump_sizes() const;
317  void dump_params() const;
318 
319  //! \}
320 };
321 
//! Returns the total number of elements currently stored in the queue.
//! NOTE(review): the declarator line (listing line 324) is missing from this
//! copy; return type and body match the declared `size_type size() const`.
322 template <class ConfigType>
323 inline typename priority_queue<ConfigType>::size_type
325 {
 // size_ does not count the insert heap or the delete buffer, so both are
 // added here.
326  return size_ +
327  insert_heap.size() - 1 + // minus the sentinel that init() pushes and that is always kept
328  (delete_buffer_end - delete_buffer_current_min); // unread part of the delete buffer
329 }
330 
//! Returns a const reference to the top element without removing it.
//! The candidate is either the top of the insert heap or the head of the
//! delete buffer, whichever cmp ranks on top.
//! NOTE(review): the declarator line (listing line 333) is missing from this
//! copy; return type and body match the declared `top() const`.
331 template <class ConfigType>
332 inline const typename priority_queue<ConfigType>::value_type &
334 {
 // The insert heap is never empty: it always contains the sentinel.
335  assert(!insert_heap.empty());
336 
337  const typename priority_queue<ConfigType>::value_type& t = insert_heap.top();
 // When the delete buffer is empty, delete_buffer_current_min equals
 // delete_buffer_end, which addresses the sentinel slot written by init(),
 // so the dereference below is still valid.
338  if (/*(!insert_heap.empty()) && */ cmp(*delete_buffer_current_min, t))
339  return t;
340  else
341  return *delete_buffer_current_min;
342 }
343 
//! Removes the top element, taking it from the insert heap or from the
//! delete buffer using the same comparison as top().
//! NOTE(review): the declarator line (listing line 345) is missing from this
//! copy; the body matches the declared `void pop()`.
344 template <class ConfigType>
346 {
347  //STXXL_VERBOSE1("priority_queue::pop()");
 // The insert heap always contains at least the sentinel.
348  assert(!insert_heap.empty());
349 
350  if (/*(!insert_heap.empty()) && */ cmp(*delete_buffer_current_min, insert_heap.top()))
351  insert_heap.pop();
352  else
353  {
354  assert(delete_buffer_current_min < delete_buffer_end);
 // Consume the head of the delete buffer; once it runs dry, refill it
 // from the group buffers right away.
355  ++delete_buffer_current_min;
356  if (delete_buffer_current_min == delete_buffer_end)
357  refill_delete_buffer();
358  }
359 }
360 
//! Inserts obj into the insert heap, first flushing the heap into the main
//! data structure when it is full.
//! \param obj element to insert; asserted not to be the sentinel value
361 template <class ConfigType>
362 inline void priority_queue<ConfigType>::push(const value_type& obj)
363 {
364  //STXXL_VERBOSE3("priority_queue::push("<< obj <<")");
365  assert(int_mergers->not_sentinel(obj));
 // The heap always holds one sentinel, so size N + 1 means N real elements:
 // that is the fill level empty_insert_heap() asserts on entry.
366  if (insert_heap.size() == N + 1)
367  empty_insert_heap();
368 
 // Still non-empty after a flush: empty_insert_heap() re-inserts the sentinel.
369  assert(!insert_heap.empty());
370 
371  insert_heap.push(obj);
372 }
373 
374 ////////////////////////////////////////////////////////////////
375 
//! Constructor using an externally provided pool (pool_); the queue does not
//! take ownership, so the destructor will not delete it.
//! NOTE(review): the declarator line (listing line 377) is missing from this copy.
376 template <class ConfigType>
378  : pool(&pool_),
379  pool_owned(false),
 // delete_buffer_end addresses the extra slot that init() fills with the sentinel
380  delete_buffer_end(delete_buffer + delete_buffer_size),
 // presumably a capacity for N elements plus the sentinel, with one spare -- TODO confirm
381  insert_heap(N + 2),
382  num_active_groups(0), size_(0)
383 {
384  STXXL_VERBOSE_PQ("priority_queue(pool)");
385  init();
386 }
387 
388 // DEPRECATED
//! Constructor taking separate prefetch and write pools (p_pool_, w_pool_).
//! A new pool_type is built from both and is owned by the queue, so the
//! destructor deletes it.
//! NOTE(review): the declarator line (listing line 390) is missing from this copy.
389 template <class ConfigType>
391  : pool(new pool_type(p_pool_, w_pool_)),
392  pool_owned(true),
 // delete_buffer_end addresses the extra slot that init() fills with the sentinel
393  delete_buffer_end(delete_buffer + delete_buffer_size),
394  insert_heap(N + 2),
395  num_active_groups(0), size_(0)
396 {
397  STXXL_VERBOSE_PQ("priority_queue(p_pool, w_pool)");
398  init();
399 }
400 
//! Constructor taking pool memory budgets in bytes (p_pool_mem, w_pool_mem).
//! The queue allocates and owns the pool, so the destructor deletes it.
//! NOTE(review): the declarator line (listing line 402) is missing from this copy.
401 template <class ConfigType>
 // Byte budgets are divided by BlockSize (integer division, remainder is
 // dropped); presumably pool_type takes the resulting block counts -- TODO confirm.
403  : pool(new pool_type(p_pool_mem / BlockSize, w_pool_mem / BlockSize)),
404  pool_owned(true),
 // delete_buffer_end addresses the extra slot that init() fills with the sentinel
405  delete_buffer_end(delete_buffer + delete_buffer_size),
406  insert_heap(N + 2),
407  num_active_groups(0), size_(0)
408 {
409  STXXL_VERBOSE_PQ("priority_queue(pool sizes)");
410  init();
411 }
412 
//! Shared constructor tail: allocates the external mergers, attaches them to
//! the pool, and puts the insert heap, delete buffer and all group buffers
//! into their empty state, each terminated by the sentinel value.
//! NOTE(review): the declarator line (listing line 414) is missing from this
//! copy; the body matches the declared `void init()`.
413 template <class ConfigType>
415 {
 // cmp must not rank the sentinel before itself (irreflexivity)
416  assert(!cmp(cmp.min_value(), cmp.min_value())); // verify strict weak ordering
417 
 // One heap-allocated external merger per external group; both the array
 // and the mergers are released in the destructor.
418  ext_mergers = new ext_merger_type*[num_ext_groups];
419  for (unsigned_type j = 0; j < num_ext_groups; ++j) {
420  ext_mergers[j] = new ext_merger_type;
421  ext_mergers[j]->set_pool(pool);
422  }
423 
424  value_type sentinel = cmp.min_value();
425  insert_heap.push(sentinel); // always keep the sentinel
426  delete_buffer[delete_buffer_size] = sentinel; // sentinel
427  delete_buffer_current_min = delete_buffer_end; // empty
428  for (unsigned_type i = 0; i < total_num_groups; i++)
429  {
430  group_buffers[i][N] = sentinel; // sentinel
431  group_buffer_current_mins[i] = &(group_buffers[i][N]); // empty
432  }
433 }
434 
//! Destructor: releases the pool if this queue owns it, then the external
//! mergers allocated in init() and the array holding them.
//! NOTE(review): the declarator line (listing line 436) is missing from this copy.
435 template <class ConfigType>
437 {
438  STXXL_VERBOSE_PQ("~priority_queue()");
 // NOTE(review): the pool is deleted before the external mergers that were
 // handed a pointer to it via set_pool(); confirm the merger destructor does
 // not touch the pool.
439  if (pool_owned)
440  delete pool;
441 
442  for (unsigned_type j = 0; j < num_ext_groups; ++j)
443  delete ext_mergers[j];
444  delete[] ext_mergers;
445 }
446 
447 //--------------------- Buffer refilling -------------------------------
448 
449 // refill group_buffers[group] from its merger and return the number of elements now in the buffer
// The elements still unread in the buffer are kept and moved in front of the
// newly merged ones; the buffer contents always end at index N.
// NOTE(review): the declarator lines (listing lines 451-452) are missing from this copy.
450 template <class ConfigType>
453 {
454  STXXL_VERBOSE_PQ("refill_group_buffer(" << group << ")");
455 
456  value_type* target;
457  size_type length;
 // groups below num_int_groups are served by internal mergers, the rest by external ones
458  size_type group_size = (group < num_int_groups) ?
459  int_mergers[group].size() :
460  ext_mergers[group - num_int_groups]->size(); // elements left in segments
461  unsigned_type left_elements = group_buffers[group] + N - group_buffer_current_mins[group]; //elements left in target buffer
462  if (group_size + left_elements >= size_type(N))
463  { // buffer will be filled completely
464  target = group_buffers[group];
465  length = N - left_elements;
466  }
467  else
468  {
 // not enough data to fill the buffer: place it so that it still ends at index N
469  target = group_buffers[group] + N - group_size - left_elements;
470  length = group_size;
471  }
472 
473  if (length > 0)
474  {
475  // shift remaining elements to front
 // raw byte move: relies on value_type being safe to relocate with memmove
 // (the generator documentation asks for a POD value type)
476  memmove(target, group_buffer_current_mins[group], left_elements * sizeof(value_type));
477  group_buffer_current_mins[group] = target;
478 
479  // fill remaining space from group
480  if (group < num_int_groups)
481  int_mergers[group].multi_merge(target + left_elements,
482  (unsigned_type)length);
483  else
484  ext_mergers[group - num_int_groups]->multi_merge(
485  target + left_elements,
486  target + left_elements + length);
487  }
488 
489  //STXXL_MSG(length + left_elements);
490  //std::copy(target,target + length + left_elements,std::ostream_iterator<value_type>(std::cout, "\n"));
491 #if STXXL_CHECK_ORDER_IN_SORTS
 // NOTE(review): listing line 492 (presumably the declaration of inv_cmp) is
 // missing from this copy.
493  if (!stxxl::is_sorted(group_buffer_current_mins[group], group_buffers[group] + N, inv_cmp))
494  {
495  STXXL_VERBOSE_PQ("refill_grp... length: " << length << " left_elements: " << left_elements);
496  for (value_type* v = group_buffer_current_mins[group] + 1; v < group_buffer_current_mins[group] + left_elements; ++v)
497  {
498  if (inv_cmp(*v, *(v - 1)))
499  {
 // NOTE(review): *(v - 2) addresses the element before the current
 // range when v is its second element -- debug output only.
500  STXXL_MSG("Error in buffer " << group << " at position " << (v - group_buffer_current_mins[group] - 1) << "/" << (v - group_buffer_current_mins[group]) << " " << *(v - 2) << " " << *(v - 1) << " " << *v << " " << *(v + 1));
501  }
502  }
503  assert(false);
504  }
505 #endif
506 
507  return length + left_elements;
508 }
509 
510 template <class ConfigType>
512 {
513  STXXL_VERBOSE_PQ("refill_delete_buffer()");
514 
515  size_type total_group_size = 0;
516  //num_active_groups is <= 4
517  for (unsigned_type i = num_active_groups; i > 0; )
518  {
519  --i;
520  if ((group_buffers[i] + N) - group_buffer_current_mins[i] < delete_buffer_size)
521  {
522  size_type length = refill_group_buffer(i);
523  // max active level dry now?
524  if (length == 0 && unsigned(i) == num_active_groups - 1)
525  --num_active_groups;
526 
527  total_group_size += length;
528  }
529  else
530  total_group_size += delete_buffer_size; // actually only a sufficient lower bound
531  }
532 
533  size_type length;
534  if (total_group_size >= delete_buffer_size) // buffer can be filled completely
535  {
536  length = delete_buffer_size; // amount to be copied
537  size_ -= size_type(delete_buffer_size); // amount left in group_buffers
538  }
539  else
540  {
541  length = total_group_size;
542  assert(size_ == length); // trees and group_buffers get empty
543  size_ = 0;
544  }
545 
547 
548  // now call simplified refill routines
549  // which can make the assumption that
550  // they find all they are asked in the buffers
551  delete_buffer_current_min = delete_buffer_end - length;
552  STXXL_VERBOSE_PQ("refill_del... Active groups = " << num_active_groups);
553  switch (num_active_groups)
554  {
555  case 0:
556  break;
557  case 1:
558  std::copy(group_buffer_current_mins[0], group_buffer_current_mins[0] + length, delete_buffer_current_min);
559  group_buffer_current_mins[0] += length;
560  break;
561  case 2:
562 #if STXXL_PARALLEL && STXXL_PARALLEL_PQ_MULTIWAY_MERGE_DELETE_BUFFER
563  {
564  std::pair<value_type*, value_type*> seqs[2] =
565  {
566  std::make_pair(group_buffer_current_mins[0], group_buffers[0] + N),
567  std::make_pair(group_buffer_current_mins[1], group_buffers[1] + N)
568  };
569  parallel::multiway_merge_sentinel(seqs, seqs + 2, delete_buffer_current_min, inv_cmp, length); //sequence iterators are progressed appropriately
570 
571  group_buffer_current_mins[0] = seqs[0].first;
572  group_buffer_current_mins[1] = seqs[1].first;
573  }
574 #else
576  group_buffer_current_mins[0],
577  group_buffer_current_mins[1], delete_buffer_current_min, length, cmp);
578 #endif
579  break;
580  case 3:
581 #if STXXL_PARALLEL && STXXL_PARALLEL_PQ_MULTIWAY_MERGE_DELETE_BUFFER
582  {
583  std::pair<value_type*, value_type*> seqs[3] =
584  {
585  std::make_pair(group_buffer_current_mins[0], group_buffers[0] + N),
586  std::make_pair(group_buffer_current_mins[1], group_buffers[1] + N),
587  std::make_pair(group_buffer_current_mins[2], group_buffers[2] + N)
588  };
589  parallel::multiway_merge_sentinel(seqs, seqs + 3, delete_buffer_current_min, inv_cmp, length); //sequence iterators are progressed appropriately
590 
591  group_buffer_current_mins[0] = seqs[0].first;
592  group_buffer_current_mins[1] = seqs[1].first;
593  group_buffer_current_mins[2] = seqs[2].first;
594  }
595 #else
597  group_buffer_current_mins[0],
598  group_buffer_current_mins[1],
599  group_buffer_current_mins[2], delete_buffer_current_min, length, cmp);
600 #endif
601  break;
602  case 4:
603 #if STXXL_PARALLEL && STXXL_PARALLEL_PQ_MULTIWAY_MERGE_DELETE_BUFFER
604  {
605  std::pair<value_type*, value_type*> seqs[4] =
606  {
607  std::make_pair(group_buffer_current_mins[0], group_buffers[0] + N),
608  std::make_pair(group_buffer_current_mins[1], group_buffers[1] + N),
609  std::make_pair(group_buffer_current_mins[2], group_buffers[2] + N),
610  std::make_pair(group_buffer_current_mins[3], group_buffers[3] + N)
611  };
612  parallel::multiway_merge_sentinel(seqs, seqs + 4, delete_buffer_current_min, inv_cmp, length); //sequence iterators are progressed appropriately
613 
614  group_buffer_current_mins[0] = seqs[0].first;
615  group_buffer_current_mins[1] = seqs[1].first;
616  group_buffer_current_mins[2] = seqs[2].first;
617  group_buffer_current_mins[3] = seqs[3].first;
618  }
619 #else
621  group_buffer_current_mins[0],
622  group_buffer_current_mins[1],
623  group_buffer_current_mins[2],
624  group_buffer_current_mins[3], delete_buffer_current_min, length, cmp); //side effect free
625 #endif
626  break;
627  default:
628  STXXL_THROW2(std::runtime_error, "priority_queue<...>::refill_delete_buffer()",
629  "Overflow! The number of buffers on 2nd level in stxxl::priority_queue is currently limited to 4");
630  }
631 
632 #if STXXL_CHECK_ORDER_IN_SORTS
633  if (!stxxl::is_sorted(delete_buffer_current_min, delete_buffer_end, inv_cmp))
634  {
635  for (value_type* v = delete_buffer_current_min + 1; v < delete_buffer_end; ++v)
636  {
637  if (inv_cmp(*v, *(v - 1)))
638  {
639  STXXL_MSG("Error at position " << (v - delete_buffer_current_min - 1) << "/" << (v - delete_buffer_current_min) << " " << *(v - 1) << " " << *v);
640  }
641  }
642  assert(false);
643  }
644 #endif
645  //std::copy(delete_buffer_current_min,delete_buffer_current_min + length,std::ostream_iterator<value_type>(std::cout, "\n"));
646 }
647 
648 //--------------------------------------------------------------------
649 
650 // Check whether space is available on the given level; if not, first make
651 // space on the next level (recursively) and then empty this level into it.
652 // Returns the level where space was finally available.
// NOTE(review): the declarator line (listing line 654) is missing from this copy.
653 template <class ConfigType>
655 {
656  STXXL_VERBOSE_PQ("make_space_available(" << level << ")");
657  unsigned_type finalLevel;
658  assert(level < total_num_groups);
659  assert(level <= num_active_groups);
660 
 // first use of this level: count it as active
661  if (level == num_active_groups)
662  ++num_active_groups;
663 
664  const bool spaceIsAvailable_ =
665  (level < num_int_groups) ? int_mergers[level].is_space_available()
666  : (ext_mergers[level - num_int_groups]->is_space_available());
667 
668  if (spaceIsAvailable_)
669  {
670  finalLevel = level;
671  }
672  else if (level == total_num_groups - 1)
673  {
 // The last level is full and there is no further level to spill into.
 // Report the overflow, then collapse the whole last external merger into
 // a single segment of a fresh merger and continue with that one.
674  size_type capacity = N;
675  for (int i = 0; i < num_int_groups; ++i)
676  capacity *= IntKMAX;
677  for (int i = 0; i < num_ext_groups; ++i)
678  capacity *= ExtKMAX;
679  STXXL_ERRMSG("priority_queue OVERFLOW - all groups full, size=" << size() <<
680  ", capacity(last externel group (" << num_int_groups + num_ext_groups - 1 << "))=" << capacity);
681  dump_sizes();
682 
683  unsigned_type extLevel = level - num_int_groups;
684  const size_type segmentSize = ext_mergers[extLevel]->size();
685  STXXL_VERBOSE1("Inserting segment into last level external: " << level << " " << segmentSize);
686  ext_merger_type* overflow_merger = new ext_merger_type;
687  overflow_merger->set_pool(pool);
688  overflow_merger->insert_segment(*ext_mergers[extLevel], segmentSize);
 // after the swap overflow_merger points at the old, now drained merger
689  std::swap(ext_mergers[extLevel], overflow_merger);
690  delete overflow_merger;
691  finalLevel = level;
692  }
693  else
694  {
 // make room one level up before moving this level's contents there
695  finalLevel = make_space_available(level + 1);
696 
697  if (level < num_int_groups - 1) // from internal to internal tree
698  {
699  unsigned_type segmentSize = int_mergers[level].size();
 // one extra slot for the trailing sentinel; the array is handed to
 // insert_segment() below and not freed here -- presumably the merger
 // takes ownership (TODO confirm)
700  value_type* newSegment = new value_type[segmentSize + 1];
701  int_mergers[level].multi_merge(newSegment, segmentSize); // empty this level
702 
703  newSegment[segmentSize] = delete_buffer[delete_buffer_size]; // sentinel
704  // for queues where size << #inserts
705  // it might make sense to stay in this level if
706  // segmentSize < alpha * KNN * k^level for some alpha < 1
707  int_mergers[level + 1].insert_segment(newSegment, segmentSize);
708  }
709  else
710  {
711  if (level == num_int_groups - 1) // from internal to external tree
712  {
713  const unsigned_type segmentSize = int_mergers[num_int_groups - 1].size();
714  STXXL_VERBOSE_PQ("make_space... Inserting segment into first level external: " << level << " " << segmentSize);
715  ext_mergers[0]->insert_segment(int_mergers[num_int_groups - 1], segmentSize);
716  }
717  else // from external to external tree
718  {
719  const size_type segmentSize = ext_mergers[level - num_int_groups]->size();
720  STXXL_VERBOSE_PQ("make_space... Inserting segment into second level external: " << level << " " << segmentSize);
721  ext_mergers[level - num_int_groups + 1]->insert_segment(*ext_mergers[level - num_int_groups], segmentSize);
722  }
723  }
724  }
725  return finalLevel;
726 }
727 
728 // empty the insert heap into the main data structure
// Precondition (asserted): the heap holds N elements plus the sentinel.
// The N elements are merged with the current delete buffer and group buffer 0,
// so that the delete buffer and group buffer 0 are refilled from the merged
// sequence first; the remaining N elements are inserted into internal merger 0
// as a new segment. The heap is left holding only the sentinel.
// NOTE(review): the declarator line (listing line 730) is missing from this copy.
729 template <class ConfigType>
731 {
732  STXXL_VERBOSE_PQ("empty_insert_heap()");
733  assert(insert_heap.size() == (N + 1));
734 
735  const value_type sup = get_supremum();
736 
737  // build new segment
738  value_type* newSegment = new value_type[N + 1];
739  value_type* newPos = newSegment;
740 
741  // put the new data there for now
742  //insert_heap.sortTo(newSegment);
743  value_type* SortTo = newSegment;
744 
 // presumably writes all N + 1 heap entries in sorted order, sentinel last -- TODO confirm
745  insert_heap.sort_to(SortTo);
746 
 // re-seed the cleared heap with the last sorted entry (slot N of newSegment)
747  SortTo = newSegment + N;
748  insert_heap.clear();
749  insert_heap.push(*SortTo);
750 
751  assert(insert_heap.size() == 1);
752 
753  newSegment[N] = sup; // sentinel
754 
755  // copy the delete_buffer and group_buffers[0] to temporary storage
756  // (the temporary can be eliminated using some dirty tricks)
 // tempSize is built from the enum constants N and delete_buffer_size, so temp
 // is a stack array of N + delete_buffer_size + 1 elements
757  const unsigned_type tempSize = N + delete_buffer_size;
758  value_type temp[tempSize + 1];
759  unsigned_type sz1 = current_delete_buffer_size();
760  unsigned_type sz2 = current_group_buffer_size(0);
 // right-align the copied data in temp so that it ends just before the sentinel slot
761  value_type* pos = temp + tempSize - sz1 - sz2;
762  std::copy(delete_buffer_current_min, delete_buffer_current_min + sz1, pos);
763  std::copy(group_buffer_current_mins[0], group_buffer_current_mins[0] + sz2, pos + sz1);
764  temp[tempSize] = sup; // sentinel
765 
766  // refill delete_buffer
767  // (using more complicated code it could be made somewhat fuller
768  // in certain circumstances)
769  priority_queue_local::merge_iterator(pos, newPos, delete_buffer_current_min, sz1, cmp);
770 
771  // refill group_buffers[0]
772  // (as above we might want to take the opportunity
773  // to make group_buffers[0] fuller)
774  priority_queue_local::merge_iterator(pos, newPos, group_buffer_current_mins[0], sz2, cmp);
775 
776  // merge the rest to the new segment
777  // note that merge exactly trips into the footsteps
778  // of itself
779  priority_queue_local::merge_iterator(pos, newPos, newSegment, (unsigned_type)N, cmp);
780 
781  // and insert it
782  unsigned_type freeLevel = make_space_available(0);
783  assert(freeLevel == 0 || int_mergers[0].size() == 0);
784  int_mergers[0].insert_segment(newSegment, N);
785 
786  // get rid of invalid level 2 buffers
787  // by inserting them into tree 0 (which is almost empty in this case)
788  if (freeLevel > 0)
789  {
 // signed index so that the i >= 0 condition can terminate the loop
790  for (int_type i = freeLevel; i >= 0; i--)
791  {
792  // reverse order not needed
793  // but would allow immediate refill
794 
795  newSegment = new value_type[current_group_buffer_size(i) + 1]; // with sentinel
 // copies size + 1 elements so the sentinel stored at group_buffers[i][N] comes along
796  std::copy(group_buffer_current_mins[i], group_buffer_current_mins[i] + current_group_buffer_size(i) + 1, newSegment);
797  int_mergers[0].insert_segment(newSegment, current_group_buffer_size(i));
798  group_buffer_current_mins[i] = group_buffers[i] + N; // empty
799  }
800  }
801 
802  // update size
803  size_ += size_type(N);
804 
805  // special case if the tree was empty before
806  if (delete_buffer_current_min == delete_buffer_end)
807  refill_delete_buffer();
808 }
809 
//! Prints (via STXXL_MSG) the fill state of the insert heap, the delete buffer
//! and every internal and external group, followed by the configuration
//! parameters (dump_params()).
//! NOTE(review): the declarator line (listing line 811) is missing from this copy.
810 template <class ConfigType>
812 {
 // running capacity: N for the insert heap, then multiplied by the merger
 // arity for each further level
813  unsigned_type capacity = N;
814  STXXL_MSG("pq::size()\t= " << size());
 // - 1: do not count the sentinel kept in the insert heap
815  STXXL_MSG(" insert_heap\t= " << insert_heap.size() - 1 << "/" << capacity);
816  STXXL_MSG(" delete_buffer\t= " << (delete_buffer_end - delete_buffer_current_min) << "/" << delete_buffer_size);
817  for (int i = 0; i < num_int_groups; ++i) {
818  capacity *= IntKMAX;
819  STXXL_MSG(" grp " << i << " int" <<
820  " grpbuf=" << current_group_buffer_size(i) <<
821  " size=" << int_mergers[i].size() << "/" << capacity <<
822  " (" << (int)((double)int_mergers[i].size() * 100.0 / (double)capacity) << "%)" <<
823  " space=" << int_mergers[i].is_space_available());
824  }
825  for (int i = 0; i < num_ext_groups; ++i) {
826  capacity *= ExtKMAX;
 // external groups are numbered after the internal ones
827  STXXL_MSG(" grp " << i + num_int_groups << " ext" <<
828  " grpbuf=" << current_group_buffer_size(i + num_int_groups) <<
829  " size=" << ext_mergers[i]->size() << "/" << capacity <<
830  " (" << (int)((double)ext_mergers[i]->size() * 100.0 / (double)capacity) << "%)" <<
831  " space=" << ext_mergers[i]->is_space_available());
832  }
833  dump_params();
834 }
835 
//! Prints (via STXXL_MSG) the compile-time configuration constants of this
//! priority queue instantiation on a single line.
//! NOTE(review): the declarator line (listing line 837) is missing from this copy.
836 template <class ConfigType>
838 {
839  STXXL_MSG("params: delete_buffer_size=" << delete_buffer_size << " N=" << N << " IntKMAX=" << IntKMAX << " num_int_groups=" << num_int_groups << " ExtKMAX=" << ExtKMAX << " num_ext_groups=" << num_ext_groups << " BlockSize=" << BlockSize);
840 }
841 
842 namespace priority_queue_local {
843 
845 {
846  enum { fits = false };
848 };
849 
// Placeholder "no configuration" type: it never fits and names itself as its
// own result. It serves as the result of the find_B_m specialization with
// stop == true.
850 struct dummy
851 {
852  enum { fits = false };
853  typedef dummy result;
854 };
855 
856 template <internal_size_type ElementSize, internal_size_type IntMem,
857  external_size_type MaxItems, internal_size_type BlockSize,
858  unsigned_type m_, bool stop = false>
859 struct find_B_m
860 {
861  typedef find_B_m<ElementSize, IntMem,
862  MaxItems, BlockSize, m_, stop> self_type;
863 
864  //! element size
865  static const internal_size_type element_size = ElementSize;
866  //! internal memory size of PQ
867  static const internal_size_type intmem = IntMem;
868  //! block size (iterates from 8 MiB downwards)
869  static const internal_size_type B = BlockSize;
870 
871  //! number of blocks that fit into internal memory (M)
872  static const internal_size_type k = IntMem / BlockSize;
873  //! number of blocks fitting into buffers of mergers (arity of both
874  //! mergers), increased from 1 to 2048 ?-tb
875  static const internal_size_type m = m_;
876  //! remaining blocks, (freely moving, not necessarily unused) ?-tb
877  static const int_type c = k - m_;
878 
879  // memory occupied by block must be at least 10 times larger than size of ext sequence
880 
881  //! calculated boolean whether the configuration fits into internal memory.
882  static const external_size_type fits =
883  // need some temporary constant-size internal blocks
884  (c > 10) &&
885  // satisfy items requirement
886  (((k - m) * m * (m * B / (ElementSize * 4 * 1024))) >= MaxItems) &&
887  // if we have two ext mergers their degree must be at least 64=m/2
888  ((MaxItems < ((k - m) * m / (2 * ElementSize)) * 1024) || m >= 128);
889 
890  static const unsigned_type step = 1;
891 
892  //! if not fits, recurse into configuration with +step more internal buffers
893  typedef typename find_B_m<ElementSize, IntMem, MaxItems, B,
894  m + step, fits || (m + step >= k)>::result candidate1;
895  //! if not fits, recurse into configuration with block size halved.
896  typedef typename find_B_m<ElementSize, IntMem, MaxItems, B / 2,
897  1, fits || candidate1::fits>::result candidate2;
898 
899  //! return a fitting configuration.
901 };
902 
903 // specialization for the case when no valid parameters are found
904 template <internal_size_type ElementSize, internal_size_type IntMem,
905  external_size_type MaxItems, bool stop>
906 struct find_B_m<ElementSize, IntMem, MaxItems, 2048, 1, stop>
907 {
908  enum { fits = false };
910 };
911 
912 // to speedup search
// Specialization for stop == true: yields dummy directly instead of
// instantiating further candidates. The primary template passes a true stop
// flag once a fitting configuration is already known or m would reach k.
913 template <internal_size_type ElementSize, internal_size_type IntMem,
914  external_size_type MaxItems, unsigned_type BlockSize,
915  unsigned_type m_>
916 struct find_B_m<ElementSize, IntMem, MaxItems, BlockSize, m_, true>
917 {
918  enum { fits = false };
919  typedef dummy result;
920 };
921 
922 // start search
923 template <internal_size_type ElementSize, internal_size_type IntMem,
924  external_size_type MaxItems>
926 {
927  // start from block size (8*1024*1024) bytes
928  typedef typename find_B_m<ElementSize, IntMem,
929  MaxItems, (8* 1024* 1024), 1>::result result;
930 };
931 
933 {
935 };
936 
937 template <unsigned_type AI_, unsigned_type X_, unsigned_type CriticalSize>
938 struct compute_N
939 {
941 
942  static const unsigned_type X = X_;
943  static const unsigned_type AI = AI_;
944  static const unsigned_type N = X / (AI * AI); // two stage internal
945 
946  typedef typename IF<(N >= CriticalSize), Self, typename compute_N<AI / 2, X, CriticalSize>::result>::result result;
947 };
948 
949 template <unsigned_type X_, unsigned_type CriticalSize_>
950 struct compute_N<1, X_, CriticalSize_>
951 {
953 };
954 
955 } // namespace priority_queue_local
956 
957 //! \}
958 
959 //! \addtogroup stlcont
960 //! \{
961 
962 //! Priority queue type generator. \n
963 //! <b> Introduction </b> to priority queue container: see \ref tutorial_pqueue tutorial. \n
964 //! <b> Design and Internals </b> of priority queue container: see \ref design_pqueue.
965 //!
966 //! \tparam ValueType type of the contained objects (POD with no references to internal memory)
967 //!
968 //! \tparam CompareType the comparator type used to determine whether one element is
969 //! smaller than another element.
970 //!
971 //! \tparam IntMemory upper limit for internal memory consumption in bytes.
972 //!
973 //! \tparam MaxItems upper limit for number of elements contained in the priority queue (in 1024 units). <BR>
974 //! Example: if you are sure that priority queue contains no more than
975 //! one million elements at a time, then the right parameter is (1000000 / 1024) = 976.
976 //!
977 //! \tparam Tune tuning parameter for meta-program search. <BR>
978 //! Try to play with it if the code does not compile (larger than default
979 //! values might help). Code does not compile if no suitable internal
980 //! parameters were found for given IntMemory and MaxItems. It might also
981 //! happen that given IntMemory is too small for given MaxItems, try larger
982 //! values.
983 template <class ValueType,
984  class CompareType,
985  internal_size_type IntMemory,
986  external_size_type MaxItems,
987  unsigned Tune = 6>
989 {
990 public:
991  // actual calculation of B, m, k and element_size
993  enum {
994  B = settings::B,
995  m = settings::m,
996  X = B * (settings::k - m) / settings::element_size, // interpretation of result
997  Buffer1Size = 32 // fixed
998  };
999  // derivation of N, AI, AE
1001  enum
1002  {
1005  AE = (m / 2 < 2) ? 2 : (m / 2) // at least 2
1006  };
1007 
1008  // Estimation of maximum internal memory consumption (in bytes)
1009  static const unsigned_type EConsumption = X * settings::element_size + settings::B * AE + ((MaxItems / X) / AE) * settings::B * 1024;
1010 
1011  /*
1012  unsigned BufferSize1_ = 32, // equalize procedure call overheads etc.
1013  unsigned N_ = 512, // bandwidth
1014  unsigned IntKMAX_ = 64, // maximal arity for internal mergers
1015  unsigned IntLevels_ = 4,
1016  unsigned BlockSize = (2*1024*1024),
1017  unsigned ExtKMAX_ = 64, // maximal arity for external mergers
1018  unsigned ExtLevels_ = 2,
1019  */
1021 };
1022 
1023 //! \}
1024 
1026 
1027 namespace std {
1028 
1029 template <class ConfigType>
1032 {
1033  a.swap(b);
1034 }
1035 
1036 } // namespace std
1037 
1038 #endif // !STXXL_CONTAINERS_PRIORITY_QUEUE_HEADER
1039 // vim: et:ts=4:sw=4
static const unsigned_type EConsumption
static const internal_size_type m
number of blocks fitting into buffers of mergers (arity of both mergers), increased from 1 to 2048 ...
find_B_m< ElementSize, IntMem, MaxItems,(8 *1024 *1024), 1 >::result result
bool empty() const
Returns true if queue has no elements.
IF< fits, self_type, typename IF< candidate1::fits, candidate1, candidate2 >::result >::result result
return a fitting configuration.
bool is_sorted(ForwardIterator first, ForwardIterator last)
Definition: is_sorted.h:53
value_type * group_buffer_current_mins[total_num_groups]
void push(const value_type &obj)
Inserts x into the priority_queue.
unsigned_type make_space_available(unsigned_type level)
int_merger_type int_mergers[num_int_groups]
External merger, based on the loser tree data structure.
priority_queue_local::find_settings< sizeof(ValueType), IntMemory, MaxItems >::result settings
priority_queue_local::ext_merger< block_type, comparator_type, ExtKMAX, alloc_strategy_type > ext_merger_type
unsigned long long int uint64
Definition: types.h:39
static const external_size_type fits
calculated boolean whether the configuration fits into internal memory.
Inverts the order of a comparison functor by swapping its arguments.
Definition: pq_helpers.h:187
typed_block< BlockSize, value_type > block_type
Type of the block used in disk-memory transfers.
Priority queue type generator. Introduction to priority queue container: see STXXL Priority Queue ...
priority_queue_local::internal_priority_queue< value_type, std::vector< value_type >, comparator_type > insert_heap_type
static const internal_size_type intmem
internal memory size of PQ
value_type group_buffers[total_num_groups][N+1]
#define STXXL_DEFAULT_ALLOC_STRATEGY
Definition: block_alloc.h:258
stxxl::uint64 size_type
An unsigned integral type (64 bit).
void merge3_iterator(InputIterator &source0, InputIterator &source1, InputIterator &source2, OutputIterator target, SizeType length, CompareType &cmp)
Definition: pq_mergers.h:75
void insert_segment(Merger &another_merger, size_type segment_size)
void swap_1D_arrays(Type *a, Type *b, unsigned_type size)
Definition: utils.h:246
#define STXXL_DEPRECATED(x)
Definition: deprecated.h:30
Loser tree from Knuth, "Sorting and Searching", Section 5.4.1.
Definition: pq_losertree.h:38
unsigned_type mem_cons() const
Number of bytes consumed by the priority_queue from the internal memory not including pools (see the ...
static const internal_size_type element_size
element size
Implements dynamically resizable buffered writing pool.
Definition: write_pool.h:32
Config::alloc_strategy_type alloc_strategy_type
Implements dynamically resizable prefetching pool.
Definition: prefetch_pool.h:29
Block containing elements of fixed length.
Definition: typed_block.h:237
static const unsigned_type step
value_type * delete_buffer_current_min
find_B_m< ElementSize, IntMem, MaxItems, BlockSize, m_, stop > self_type
Implements dynamically resizable buffered writing and prefetched reading pool.
choose_int_types< my_pointer_size >::int_type int_type
Definition: types.h:63
comparator_type cmp
size_type size() const
Returns number of elements contained.
unsigned_type internal_size_type
Definition: types.h:66
void swap(counting_ptr< A > &a1, counting_ptr< A > &a2)
swap enclosed object with another counting pointer (no reference counts need change) ...
Definition: counting_ptr.h:165
value_type delete_buffer[delete_buffer_size+1]
Container::value_type pop(Container &c)
Definition: utils.h:269
unsigned_type current_delete_buffer_size() const
void dump_params() const
Config::comparator_type comparator_type
Comparison object.
#define STXXL_VERBOSE_PQ(msg)
Definition: pq_helpers.h:71
static const internal_size_type k
number of blocks that fit into internal memory (M)
#define STXXL_BEGIN_NAMESPACE
Definition: namespace.h:16
Parameters_not_found_Try_to_change_the_Tune_parameter result
size_type refill_group_buffer(unsigned_type k)
const value_type & top() const
Returns "largest" element.
External priority queue data structure Introduction to priority queue container: see STXXL Priorit...
#define STXXL_VERBOSE1(x)
Definition: verbose.h:99
find_B_m< ElementSize, IntMem, MaxItems, B, m+step, fits||(m+step >=k)>::result candidate1
if not fits, recurse into configuration with +step more internal buffers
unsigned_type current_group_buffer_size(unsigned_type i) const
#define STXXL_THROW2(exception_type, location, error_message)
Throws exception_type with "Error in [location] : [error_message]".
Similar to std::priority_queue, with the following differences:
Definition: pq_helpers.h:92
insert_heap_type insert_heap
#define STXXL_STATIC_ASSERT(x)
Definition: utils.h:48
#define STXXL_ERRMSG(x)
Definition: verbose.h:80
void dump_sizes() const
static const int_type c
remaining blocks, (freely moving, not necessarily unused) ?-tb
uint64 external_size_type
Definition: types.h:67
choose_int_types< my_pointer_size >::unsigned_type unsigned_type
Definition: types.h:64
priority_queue< priority_queue_config< ValueType, CompareType, Buffer1Size, N, AI, 2, B, AE, 2 > > result
#define STXXL_MSG(x)
Definition: verbose.h:73
static const internal_size_type B
block size (iterates from 8 MiB downwards)
priority_queue_local::loser_tree< value_type, comparator_type, IntKMAX > int_merger_type
IF<(N >=CriticalSize), Self, typename compute_N< AI/2, X, CriticalSize >::result >::result result
value_type get_supremum() const
find_B_m< ElementSize, IntMem, MaxItems, B/2, 1, fits||candidate1::fits >::result candidate2
if not fits, recurse into configuration with block size halved.
unsigned_type num_active_groups
IF template metaprogramming statement.
Definition: tmeta.h:30
read_write_pool< block_type > pool_type
Config::value_type value_type
The type of object stored in the priority_queue.
void merge_iterator(InputIterator &source0, InputIterator &source1, OutputIterator target, SizeType length, CompareType &cmp)
Definition: pq_mergers.h:42
ext_merger_type ** ext_mergers
value_type * delete_buffer_end
compute_N< AI_, X_, CriticalSize > Self
#define STXXL_END_NAMESPACE
Definition: namespace.h:17
priority_queue_local::compute_N<(1<< Tune), X, 4 *Buffer1Size >::result ComputeN
priority_queue(pool_type &pool_)
Constructs external priority queue object.
void merge4_iterator(InputIterator &source0, InputIterator &source1, InputIterator &source2, InputIterator &source3, OutputIterator target, SizeType length, CompareType &cmp)
Definition: pq_mergers.h:143