Stxxl  1.3.2
priority_queue.h
1 /***************************************************************************
2  * include/stxxl/bits/containers/priority_queue.h
3  *
4  * Part of the STXXL. See http://stxxl.sourceforge.net
5  *
6  * Copyright (C) 1999 Peter Sanders <[email protected]>
7  * Copyright (C) 2003, 2004, 2007 Roman Dementiev <[email protected]>
8  * Copyright (C) 2007-2009 Johannes Singler <[email protected]>
9  * Copyright (C) 2007-2010 Andreas Beckmann <[email protected]>
10  *
11  * Distributed under the Boost Software License, Version 1.0.
12  * (See accompanying file LICENSE_1_0.txt or copy at
13  * http://www.boost.org/LICENSE_1_0.txt)
14  **************************************************************************/
15 
16 #ifndef STXXL_PRIORITY_QUEUE_HEADER
17 #define STXXL_PRIORITY_QUEUE_HEADER
18 
19 #include <vector>
20 
21 #include <stxxl/bits/deprecated.h>
22 #include <stxxl/bits/mng/typed_block.h>
23 #include <stxxl/bits/mng/block_alloc.h>
24 #include <stxxl/bits/mng/read_write_pool.h>
25 #include <stxxl/bits/mng/prefetch_pool.h>
26 #include <stxxl/bits/mng/write_pool.h>
27 #include <stxxl/bits/common/tmeta.h>
28 #include <stxxl/bits/algo/sort_base.h>
29 #include <stxxl/bits/parallel.h>
30 #include <stxxl/bits/common/is_sorted.h>
31 
32 #if STXXL_PARALLEL
33 
34 #if defined(STXXL_PARALLEL_MODE) && ((__GNUC__ * 10000 + __GNUC_MINOR__ * 100) < 40400)
35 #undef STXXL_PARALLEL_PQ_MULTIWAY_MERGE_INTERNAL
36 #undef STXXL_PARALLEL_PQ_MULTIWAY_MERGE_EXTERNAL
37 #undef STXXL_PARALLEL_PQ_MULTIWAY_MERGE_DELETE_BUFFER
38 #define STXXL_PARALLEL_PQ_MULTIWAY_MERGE_INTERNAL 0
39 #define STXXL_PARALLEL_PQ_MULTIWAY_MERGE_EXTERNAL 0
40 #define STXXL_PARALLEL_PQ_MULTIWAY_MERGE_DELETE_BUFFER 0
41 #endif
42 
43 // enable/disable parallel merging for certain cases, for performance tuning
44 #ifndef STXXL_PARALLEL_PQ_MULTIWAY_MERGE_INTERNAL
45 #define STXXL_PARALLEL_PQ_MULTIWAY_MERGE_INTERNAL 1
46 #endif
47 #ifndef STXXL_PARALLEL_PQ_MULTIWAY_MERGE_EXTERNAL
48 #define STXXL_PARALLEL_PQ_MULTIWAY_MERGE_EXTERNAL 1
49 #endif
50 #ifndef STXXL_PARALLEL_PQ_MULTIWAY_MERGE_DELETE_BUFFER
51 #define STXXL_PARALLEL_PQ_MULTIWAY_MERGE_DELETE_BUFFER 1
52 #endif
53 
54 #endif //STXXL_PARALLEL
55 
56 #if STXXL_PARALLEL && STXXL_PARALLEL_PQ_MULTIWAY_MERGE_EXTERNAL
57 #define STXXL_PQ_EXTERNAL_LOSER_TREE 0 // no loser tree for the external sequences
58 #else
59 #define STXXL_PQ_EXTERNAL_LOSER_TREE 1
60 #endif
61 
62 #if STXXL_PARALLEL && STXXL_PARALLEL_PQ_MULTIWAY_MERGE_INTERNAL
63 #define STXXL_PQ_INTERNAL_LOSER_TREE 0 // no loser tree for the internal sequences
64 #else
65 #define STXXL_PQ_INTERNAL_LOSER_TREE 1
66 #endif
67 
68 #include <stxxl/bits/containers/pq_helpers.h>
69 #include <stxxl/bits/containers/pq_mergers.h>
70 #include <stxxl/bits/containers/pq_ext_merger.h>
71 #include <stxxl/bits/containers/pq_losertree.h>
72 
73 __STXXL_BEGIN_NAMESPACE
74 
75 /*
76  KNBufferSize1 = 32;
77  KNN = 512; // length of group 1 sequences
78  KNKMAX = 64; // maximal arity
79  LogKNKMAX = 6; // ceil(log KNKMAX)
80  KNLevels = 4; // overall capacity >= KNN*KNKMAX^KNLevels
81  */
82 
83 // internal memory consumption >= N_*(KMAX_^IntLevels_) + ext
84 
85 template <
86  class Tp_,
87  class Cmp_,
88  unsigned BufferSize1_ = 32, // equalize procedure call overheads etc.
89  unsigned N_ = 512, // length of group 1 sequences
90  unsigned IntKMAX_ = 64, // maximal arity for internal mergers
91  unsigned IntLevels_ = 4, // number of internal groups
92  unsigned BlockSize_ = (2 * 1024 * 1024), // external block size
93  unsigned ExtKMAX_ = 64, // maximal arity for external mergers
94  unsigned ExtLevels_ = 2, // number of external groups
95  class AllocStr_ = STXXL_DEFAULT_ALLOC_STRATEGY
96  >
97 struct priority_queue_config
98 {
99  typedef Tp_ value_type;
100  typedef Cmp_ comparator_type;
101  typedef AllocStr_ alloc_strategy_type;
102  enum
103  {
104  delete_buffer_size = BufferSize1_,
105  N = N_,
106  IntKMAX = IntKMAX_,
107  num_int_groups = IntLevels_,
108  num_ext_groups = ExtLevels_,
109  BlockSize = BlockSize_,
110  ExtKMAX = ExtKMAX_,
111  element_size = sizeof(Tp_)
112  };
113 };
114 
115 __STXXL_END_NAMESPACE
116 
117 namespace std
118 {
119  template <class BlockType_,
120  class Cmp_,
121  unsigned Arity_,
122  class AllocStr_>
123  void swap(stxxl::priority_queue_local::ext_merger<BlockType_, Cmp_, Arity_, AllocStr_> & a,
124  stxxl::priority_queue_local::ext_merger<BlockType_, Cmp_, Arity_, AllocStr_> & b)
125  {
126  a.swap(b);
127  }
128  template <class ValTp_, class Cmp_, unsigned KNKMAX>
129  void swap(stxxl::priority_queue_local::loser_tree<ValTp_, Cmp_, KNKMAX> & a,
130  stxxl::priority_queue_local::loser_tree<ValTp_, Cmp_, KNKMAX> & b)
131  {
132  a.swap(b);
133  }
134 }
135 
136 __STXXL_BEGIN_NAMESPACE
137 
// External-memory priority queue: an insert heap feeds a hierarchy of
// internal merger groups which overflow into external (disk-based) merger
// groups; group buffers and an overall delete buffer cache the current minima.
// NOTE(review): several declaration lines inside this class are missing from
// this extraction (the typedefs naming int_merger_type / ext_merger_type and
// the block/pool/insert-heap types are visibly truncated below, as are the
// doxygen-commented constructor declarations) — consult the original
// include/stxxl/bits/containers/priority_queue.h before editing.
139 template <class Config_>
140 class priority_queue : private noncopyable
141 {
142 public:
143  typedef Config_ Config;
 // compile-time tuning constants, re-exported from the Config bundle
144  enum
145  {
146  delete_buffer_size = Config::delete_buffer_size,
147  N = Config::N,
148  IntKMAX = Config::IntKMAX,
149  num_int_groups = Config::num_int_groups,
150  num_ext_groups = Config::num_ext_groups,
151  total_num_groups = Config::num_int_groups + Config::num_ext_groups,
152  BlockSize = Config::BlockSize,
153  ExtKMAX = Config::ExtKMAX
154  };
155 
157  typedef typename Config::value_type value_type;
159  typedef typename Config::comparator_type comparator_type;
160  typedef typename Config::alloc_strategy_type alloc_strategy_type;
162  typedef stxxl::uint64 size_type;
165 
166 protected:
 // NOTE(review): the following two typedefs are truncated by the extraction;
 // they presumably name a loser_tree-based int_merger_type and an
 // ext_merger-based ext_merger_type — TODO confirm against the original file.
169 
171  value_type,
173  IntKMAX> int_merger_type;
174 
176  block_type,
178  ExtKMAX,
179  alloc_strategy_type> ext_merger_type;
180 
181 
182  int_merger_type int_mergers[num_int_groups];
 // pool is borrowed or owned depending on pool_owned (see destructor)
183  pool_type * pool;
184  bool pool_owned;
185  ext_merger_type * ext_mergers;
186 
187  // one delete buffer for each tree => group buffer
188  value_type group_buffers[total_num_groups][N + 1]; // tree->group_buffers->delete_buffer (extra space for sentinel)
189  value_type * group_buffer_current_mins[total_num_groups]; // group_buffer_current_mins[i] is current start of group_buffers[i], end is group_buffers[i] + N
190 
191  // overall delete buffer
192  value_type delete_buffer[delete_buffer_size + 1];
193  value_type * delete_buffer_current_min; // current start of delete_buffer
194  value_type * delete_buffer_end; // end of delete_buffer
195 
196  comparator_type cmp;
197 
198  // insert buffer
199  insert_heap_type insert_heap;
200 
201  // how many groups are active
202  unsigned_type num_active_groups;
203 
204  // total size not counting insert_heap and delete_buffer
205  size_type size_;
206 
207 private:
 // one-time setup shared by all constructors
208  void init();
209 
 // buffer refilling cascade: delete buffer <- group buffers <- mergers
210  void refill_delete_buffer();
211  unsigned_type refill_group_buffer(unsigned_type k);
212 
 // overflow handling when the insert heap fills up
213  unsigned_type make_space_available(unsigned_type level);
214  void empty_insert_heap();
215 
216  value_type get_supremum() const { return cmp.min_value(); } //{ return group_buffers[0][KNN].key; }
217  unsigned_type current_delete_buffer_size() const { return delete_buffer_end - delete_buffer_current_min; }
218  unsigned_type current_group_buffer_size(unsigned_type i) const { return &(group_buffers[i][N]) - group_buffer_current_mins[i]; }
219 
220 public:
 // construct with an externally owned prefetch/write pool
226  priority_queue(pool_type & pool_);
227 
238 
 // construct owning a new pool sized from the given memory budgets
248  priority_queue(unsigned_type p_pool_mem, unsigned_type w_pool_mem);
249 
 // swap the complete internal state with another queue
 // (pool pointers are deliberately NOT swapped — see commented lines below)
252  void swap(priority_queue & obj)
253  {
254  //swap_1D_arrays(int_mergers,obj.int_mergers,num_int_groups); // does not work in g++ 3.4.3 :( bug?
255  for (unsigned_type i = 0; i < num_int_groups; ++i)
256  std::swap(int_mergers[i], obj.int_mergers[i]);
257 
258  //std::swap(pool,obj.pool);
259  //std::swap(pool_owned, obj.pool_owned);
260  std::swap(ext_mergers, obj.ext_mergers);
261  for (unsigned_type i1 = 0; i1 < total_num_groups; ++i1)
262  for (unsigned_type i2 = 0; i2 < (N + 1); ++i2)
263  std::swap(group_buffers[i1][i2], obj.group_buffers[i1][i2]);
264 
265  swap_1D_arrays(group_buffer_current_mins, obj.group_buffer_current_mins, total_num_groups);
266  swap_1D_arrays(delete_buffer, obj.delete_buffer, delete_buffer_size + 1);
267  std::swap(delete_buffer_current_min, obj.delete_buffer_current_min);
268  std::swap(delete_buffer_end, obj.delete_buffer_end);
269  std::swap(cmp, obj.cmp);
270  std::swap(insert_heap, obj.insert_heap);
271  std::swap(num_active_groups, obj.num_active_groups);
272  std::swap(size_, obj.size_);
 // NOTE(review): group_buffer_current_mins / delete_buffer_current_min point
 // into the arrays swapped element-wise above, so after the swap they refer
 // to the correct (other) object's storage only because both arrays have
 // identical layout — fragile but intentional here.
273  }
274 
275  virtual ~priority_queue();
276 
 // number of elements currently stored (see definition below)
279  size_type size() const;
280 
283  bool empty() const { return (size() == 0); }
284 
 // access/removal of the current top element
296  const value_type & top() const;
297 
304  void pop();
305 
310  void push(const value_type & obj);
311 
 // rough accounting of internal memory usage in bytes
316  unsigned_type mem_cons() const
317  {
318  unsigned_type dynam_alloc_mem = 0;
319  //dynam_alloc_mem += w_pool.mem_cons();
320  //dynam_alloc_mem += p_pool.mem_cons();
321  for (int i = 0; i < num_int_groups; ++i)
322  dynam_alloc_mem += int_mergers[i].mem_cons();
323 
324  for (int i = 0; i < num_ext_groups; ++i)
325  dynam_alloc_mem += ext_mergers[i].mem_cons();
326 
327 
328  return (sizeof(*this) +
329  sizeof(ext_merger_type) * num_ext_groups +
330  dynam_alloc_mem);
331  }
332 };
333 
334 
335 template <class Config_>
// Returns the total number of stored elements: the elements tracked in the
// mergers/group buffers (size_), plus the insert heap contents minus its
// permanent sentinel (-1), plus whatever currently sits in the delete buffer.
// NOTE(review): the signature line (original line 336,
// size_type priority_queue<Config_>::size() const) is missing from this extraction.
337 {
338  return size_ +
339  insert_heap.size() - 1 +
340  (delete_buffer_end - delete_buffer_current_min);
341 }
342 
343 
344 template <class Config_>
// Returns the current top element: the better (per cmp) of the insert heap's
// top and the head of the delete buffer. The insert heap always contains at
// least the sentinel pushed in init(), hence the assertion.
// NOTE(review): the signature line (original line 345) is missing from this
// extraction.
346 {
347  assert(!insert_heap.empty());
348 
349  const typename priority_queue<Config_>::value_type & t = insert_heap.top();
 // cmp(a, b) true means b precedes a in queue order, so t wins here
350  if (/*(!insert_heap.empty()) && */ cmp(*delete_buffer_current_min, t))
351  return t;
352  else
353  return *delete_buffer_current_min;
354 }
355 
356 template <class Config_>
// Removes the current top element, mirroring the choice made in top():
// pop from whichever of insert heap / delete buffer holds the winner.
// NOTE(review): the signature line (original line 357) is missing from this
// extraction.
358 {
359  //STXXL_VERBOSE1("priority_queue::pop()");
360  assert(!insert_heap.empty());
361 
362  if (/*(!insert_heap.empty()) && */ cmp(*delete_buffer_current_min, insert_heap.top()))
363  insert_heap.pop();
364  else
365  {
366  assert(delete_buffer_current_min < delete_buffer_end);
 // consume from the delete buffer; refill it lazily once it runs dry
367  ++delete_buffer_current_min;
368  if (delete_buffer_current_min == delete_buffer_end)
369  refill_delete_buffer();
370  }
371 }
372 
373 template <class Config_>
374 inline void priority_queue<Config_>::push(const value_type & obj)
375 {
376  //STXXL_VERBOSE3("priority_queue::push("<< obj <<")");
377  assert(int_mergers->not_sentinel(obj));
378  if (insert_heap.size() == N + 1)
379  empty_insert_heap();
380 
381 
382  assert(!insert_heap.empty());
383 
384  insert_heap.push(obj);
385 }
386 
387 
389 
390 template <class Config_>
// Construct from an externally supplied read/write pool; the pool is only
// borrowed (pool_owned = false) and will not be deleted by the destructor.
// NOTE(review): the constructor signature line (original line 391) is missing
// from this extraction.
392  pool(&pool_),
393  pool_owned(false),
394  delete_buffer_end(delete_buffer + delete_buffer_size),
395  insert_heap(N + 2),
396  num_active_groups(0), size_(0)
397 {
398  STXXL_VERBOSE2("priority_queue::priority_queue(pool)");
399  init();
400 }
401 
402 // DEPRECATED
// Construct from separate prefetch and write pools (legacy interface); a
// combined read_write_pool is allocated and owned (pool_owned = true).
// NOTE(review): the constructor signature line (original line 404) is missing
// from this extraction.
403 template <class Config_>
405  pool(new pool_type(p_pool_, w_pool_)),
406  pool_owned(true),
407  delete_buffer_end(delete_buffer + delete_buffer_size),
408  insert_heap(N + 2),
409  num_active_groups(0), size_(0)
410 {
411  STXXL_VERBOSE2("priority_queue::priority_queue(p_pool, w_pool)");
412  init();
413 }
414 
415 template <class Config_>
416 priority_queue<Config_>::priority_queue(unsigned_type p_pool_mem, unsigned_type w_pool_mem) :
417  pool(new pool_type(p_pool_mem / BlockSize, w_pool_mem / BlockSize)),
418  pool_owned(true),
419  delete_buffer_end(delete_buffer + delete_buffer_size),
420  insert_heap(N + 2),
421  num_active_groups(0), size_(0)
422 {
423  STXXL_VERBOSE2("priority_queue::priority_queue(pool sizes)");
424  init();
425 }
426 
427 template <class Config_>
// Shared constructor body: allocates the external mergers, wires them to the
// pool, and plants sentinel values in every buffer so the merge routines can
// run without explicit end checks.
// NOTE(review): the signature line (original line 428) is missing from this
// extraction.
429 {
430  assert(!cmp(cmp.min_value(), cmp.min_value())); // verify strict weak ordering
431 
432  ext_mergers = new ext_merger_type[num_ext_groups];
433  for (unsigned_type j = 0; j < num_ext_groups; ++j)
434  ext_mergers[j].set_pool(pool);
435 
436  value_type sentinel = cmp.min_value();
437  insert_heap.push(sentinel); // always keep the sentinel
438  delete_buffer[delete_buffer_size] = sentinel; // sentinel
439  delete_buffer_current_min = delete_buffer_end; // empty
440  for (unsigned_type i = 0; i < total_num_groups; i++)
441  {
442  group_buffers[i][N] = sentinel; // sentinel
443  group_buffer_current_mins[i] = &(group_buffers[i][N]); // empty
444  }
445 }
446 
447 template <class Config_>
// Destructor: releases the pool only if this queue allocated it itself,
// then frees the external merger array.
// NOTE(review): the signature line (original line 448) is missing from this
// extraction.
449 {
450  STXXL_VERBOSE2("priority_queue::~priority_queue()");
451  if (pool_owned)
452  delete pool;
453 
454  delete[] ext_mergers;
455 }
456 
457 //--------------------- Buffer refilling -------------------------------
458 
459 // refill group_buffers[j] and return number of elements found
// Tops up group buffer `group` from its merger (internal or external).
// The buffer is filled from the back towards the front so that its end stays
// anchored at group_buffers[group] + N (where the sentinel lives); returns the
// number of elements now available in the buffer.
460 template <class Config_>
461 unsigned_type priority_queue<Config_>::refill_group_buffer(unsigned_type group)
462 {
463  STXXL_VERBOSE2("priority_queue::refill_group_buffer(" << group << ")");
464 
465  value_type * target;
466  unsigned_type length;
467  size_type group_size = (group < num_int_groups) ?
468  int_mergers[group].size() :
469  ext_mergers[group - num_int_groups].size(); //elements left in segments
470  unsigned_type left_elements = group_buffers[group] + N - group_buffer_current_mins[group]; //elements left in target buffer
471  if (group_size + left_elements >= size_type(N))
472  { // buffer will be filled completely
473  target = group_buffers[group];
474  length = N - left_elements;
475  }
476  else
477  {
 // not enough elements to fill the buffer: place them flush against the end
478  target = group_buffers[group] + N - group_size - left_elements;
479  length = group_size;
480  }
481 
482  if (length > 0)
483  {
  // shift remaining elements to front
  // (memmove because source and destination ranges may overlap)
485  memmove(target, group_buffer_current_mins[group], left_elements * sizeof(value_type));
486  group_buffer_current_mins[group] = target;
487 
488  // fill remaining space from group
489  if (group < num_int_groups)
490  int_mergers[group].multi_merge(target + left_elements, length);
491  else
492  ext_mergers[group - num_int_groups].multi_merge(
493  target + left_elements,
494  target + left_elements + length);
495  }
496 
497  //STXXL_MSG(length + left_elements);
498  //std::copy(target,target + length + left_elements,std::ostream_iterator<value_type>(std::cout, "\n"));
499 #if STXXL_CHECK_ORDER_IN_SORTS
 // NOTE(review): original line 500 is missing from this extraction —
 // presumably the declaration of inv_cmp (an inverted comparator) used below;
 // confirm against the original file.
501  if (!stxxl::is_sorted(group_buffer_current_mins[group], group_buffers[group] + N, inv_cmp))
502  {
503  STXXL_VERBOSE2("length: " << length << " left_elements: " << left_elements);
504  for (value_type * v = group_buffer_current_mins[group] + 1; v < group_buffer_current_mins[group] + left_elements; ++v)
505  {
506  if (inv_cmp(*v, *(v - 1)))
507  {
508  STXXL_MSG("Error in buffer " << group << " at position " << (v - group_buffer_current_mins[group] - 1) << "/" << (v - group_buffer_current_mins[group]) << " " << *(v - 2) << " " << *(v - 1) << " " << *v << " " << *(v + 1));
509  }
510  }
511  assert(false);
512  }
513 #endif
514 
515  return length + left_elements;
516 }
517 
518 
519 template <class Config_>
// Refills the overall delete buffer by merging from the (up to 4) active
// group buffers. Each group buffer is first topped up so the simple k-way
// merges below can assume enough elements are present.
// NOTE(review): original line 520 (the function signature,
// void priority_queue<Config_>::refill_delete_buffer()) and line 554
// (presumably the declaration of inv_cmp used by the parallel merges below)
// are missing from this extraction.
521 {
522  STXXL_VERBOSE2("priority_queue::refill_delete_buffer()");
523 
524  size_type total_group_size = 0;
525  //num_active_groups is <= 4
526  for (int i = num_active_groups - 1; i >= 0; i--)
527  {
528  if ((group_buffers[i] + N) - group_buffer_current_mins[i] < delete_buffer_size)
529  {
530  unsigned_type length = refill_group_buffer(i);
531  // max active level dry now?
532  if (length == 0 && unsigned(i) == num_active_groups - 1)
533  --num_active_groups;
534 
535  total_group_size += length;
536  }
537  else
538  total_group_size += delete_buffer_size; // actually only a sufficient lower bound
539  }
540 
541  unsigned_type length;
542  if (total_group_size >= delete_buffer_size) // buffer can be filled completely
543  {
544  length = delete_buffer_size; // amount to be copied
545  size_ -= size_type(delete_buffer_size); // amount left in group_buffers
546  }
547  else
548  {
549  length = total_group_size;
550  assert(size_ == size_type(length)); // trees and group_buffers get empty
551  size_ = 0;
552  }
553 
555 
556  // now call simplified refill routines
557  // which can make the assumption that
558  // they find all they are asked in the buffers
559  delete_buffer_current_min = delete_buffer_end - length;
560  STXXL_VERBOSE2("Active groups = " << num_active_groups);
 // dispatch on the number of active groups: 0 = nothing to do, 1 = plain
 // copy, 2..4 = k-way merge (parallel multiway merge when enabled)
561  switch (num_active_groups)
562  {
563  case 0:
564  break;
565  case 1:
566  std::copy(group_buffer_current_mins[0], group_buffer_current_mins[0] + length, delete_buffer_current_min);
567  group_buffer_current_mins[0] += length;
568  break;
569  case 2:
570 #if STXXL_PARALLEL && STXXL_PARALLEL_PQ_MULTIWAY_MERGE_DELETE_BUFFER
571  {
572  std::pair<value_type *, value_type *> seqs[2] =
573  {
574  std::make_pair(group_buffer_current_mins[0], group_buffers[0] + N),
575  std::make_pair(group_buffer_current_mins[1], group_buffers[1] + N)
576  };
577  parallel::multiway_merge_sentinel(seqs, seqs + 2, delete_buffer_current_min, inv_cmp, length); //sequence iterators are progressed appropriately
578 
579  group_buffer_current_mins[0] = seqs[0].first;
580  group_buffer_current_mins[1] = seqs[1].first;
581  }
582 #else
583  priority_queue_local::merge_iterator(
584  group_buffer_current_mins[0],
585  group_buffer_current_mins[1], delete_buffer_current_min, length, cmp);
586 #endif
587  break;
588  case 3:
589 #if STXXL_PARALLEL && STXXL_PARALLEL_PQ_MULTIWAY_MERGE_DELETE_BUFFER
590  {
591  std::pair<value_type *, value_type *> seqs[3] =
592  {
593  std::make_pair(group_buffer_current_mins[0], group_buffers[0] + N),
594  std::make_pair(group_buffer_current_mins[1], group_buffers[1] + N),
595  std::make_pair(group_buffer_current_mins[2], group_buffers[2] + N)
596  };
597  parallel::multiway_merge_sentinel(seqs, seqs + 3, delete_buffer_current_min, inv_cmp, length); //sequence iterators are progressed appropriately
598 
599  group_buffer_current_mins[0] = seqs[0].first;
600  group_buffer_current_mins[1] = seqs[1].first;
601  group_buffer_current_mins[2] = seqs[2].first;
602  }
603 #else
604  priority_queue_local::merge3_iterator(
605  group_buffer_current_mins[0],
606  group_buffer_current_mins[1],
607  group_buffer_current_mins[2], delete_buffer_current_min, length, cmp);
608 #endif
609  break;
610  case 4:
611 #if STXXL_PARALLEL && STXXL_PARALLEL_PQ_MULTIWAY_MERGE_DELETE_BUFFER
612  {
613  std::pair<value_type *, value_type *> seqs[4] =
614  {
615  std::make_pair(group_buffer_current_mins[0], group_buffers[0] + N),
616  std::make_pair(group_buffer_current_mins[1], group_buffers[1] + N),
617  std::make_pair(group_buffer_current_mins[2], group_buffers[2] + N),
618  std::make_pair(group_buffer_current_mins[3], group_buffers[3] + N)
619  };
620  parallel::multiway_merge_sentinel(seqs, seqs + 4, delete_buffer_current_min, inv_cmp, length); //sequence iterators are progressed appropriately
621 
622  group_buffer_current_mins[0] = seqs[0].first;
623  group_buffer_current_mins[1] = seqs[1].first;
624  group_buffer_current_mins[2] = seqs[2].first;
625  group_buffer_current_mins[3] = seqs[3].first;
626  }
627 #else
628  priority_queue_local::merge4_iterator(
629  group_buffer_current_mins[0],
630  group_buffer_current_mins[1],
631  group_buffer_current_mins[2],
632  group_buffer_current_mins[3], delete_buffer_current_min, length, cmp); //side effect free
633 #endif
634  break;
635  default:
636  STXXL_THROW(std::runtime_error, "priority_queue<...>::refill_delete_buffer()",
637  "Overflow! The number of buffers on 2nd level in stxxl::priority_queue is currently limited to 4");
638  }
639 
640 #if STXXL_CHECK_ORDER_IN_SORTS
641  if (!stxxl::is_sorted(delete_buffer_current_min, delete_buffer_end, inv_cmp))
642  {
643  for (value_type * v = delete_buffer_current_min + 1; v < delete_buffer_end; ++v)
644  {
645  if (inv_cmp(*v, *(v - 1)))
646  {
647  STXXL_MSG("Error at position " << (v - delete_buffer_current_min - 1) << "/" << (v - delete_buffer_current_min) << " " << *(v - 1) << " " << *v);
648  }
649  }
650  assert(false);
651  }
652 #endif
653  //std::copy(delete_buffer_current_min,delete_buffer_current_min + length,std::ostream_iterator<value_type>(std::cout, "\n"));
654 }
655 
656 //--------------------------------------------------------------------
657 
658 // check if space is available on level k and
659 // empty this level if necessary leading to a recursive call.
660 // return the level where space was finally available
661 template <class Config_>
662 unsigned_type priority_queue<Config_>::make_space_available(unsigned_type level)
663 {
664  STXXL_VERBOSE2("priority_queue::make_space_available(" << level << ")");
665  unsigned_type finalLevel;
666  assert(level < total_num_groups);
667  assert(level <= num_active_groups);
668 
669  if (level == num_active_groups)
670  ++num_active_groups;
671 
672  const bool spaceIsAvailable_ =
673  (level < num_int_groups) ? int_mergers[level].is_space_available()
674  : ((level == total_num_groups - 1) ? true : (ext_mergers[level - num_int_groups].is_space_available()));
675 
676  if (spaceIsAvailable_)
677  {
678  finalLevel = level;
679  }
680  else
681  {
682  finalLevel = make_space_available(level + 1);
683 
684  if (level < num_int_groups - 1) // from internal to internal tree
685  {
686  unsigned_type segmentSize = int_mergers[level].size();
687  value_type * newSegment = new value_type[segmentSize + 1];
688  int_mergers[level].multi_merge(newSegment, segmentSize); // empty this level
689 
690  newSegment[segmentSize] = delete_buffer[delete_buffer_size]; // sentinel
691  // for queues where size << #inserts
692  // it might make sense to stay in this level if
693  // segmentSize < alpha * KNN * k^level for some alpha < 1
694  int_mergers[level + 1].insert_segment(newSegment, segmentSize);
695  }
696  else
697  {
698  if (level == num_int_groups - 1) // from internal to external tree
699  {
700  const unsigned_type segmentSize = int_mergers[num_int_groups - 1].size();
701  STXXL_VERBOSE1("Inserting segment into first level external: " << level << " " << segmentSize);
702  ext_mergers[0].insert_segment(int_mergers[num_int_groups - 1], segmentSize);
703  }
704  else // from external to external tree
705  {
706  const size_type segmentSize = ext_mergers[level - num_int_groups].size();
707  STXXL_VERBOSE1("Inserting segment into second level external: " << level << " " << segmentSize);
708  ext_mergers[level - num_int_groups + 1].insert_segment(ext_mergers[level - num_int_groups], segmentSize);
709  }
710  }
711  }
712  return finalLevel;
713 }
714 
715 
716 // empty the insert heap into the main data structure
// Sorts the full insert heap into a new segment, merges the existing delete
// buffer and group_buffers[0] with it so the freshest minima stay cached, and
// inserts the remainder into internal merger 0 (making space first if needed).
// NOTE(review): original line 718 (the function signature,
// void priority_queue<Config_>::empty_insert_heap()) is missing from this
// extraction.
717 template <class Config_>
719 {
720  STXXL_VERBOSE2("priority_queue::empty_insert_heap()");
721  assert(insert_heap.size() == (N + 1));
722 
723  const value_type sup = get_supremum();
724 
725  // build new segment
726  value_type * newSegment = new value_type[N + 1];
727  value_type * newPos = newSegment;
728 
729  // put the new data there for now
730  //insert_heap.sortTo(newSegment);
731  value_type * SortTo = newSegment;
732 
733  insert_heap.sort_to(SortTo);
734 
 // restore the sentinel as the heap's only element (it sorted to position N)
735  SortTo = newSegment + N;
736  insert_heap.clear();
737  insert_heap.push(*SortTo);
738 
739  assert(insert_heap.size() == 1);
740 
741  newSegment[N] = sup; // sentinel
742 
743  // copy the delete_buffer and group_buffers[0] to temporary storage
744  // (the temporary can be eliminated using some dirty tricks)
745  const unsigned_type tempSize = N + delete_buffer_size;
746  value_type temp[tempSize + 1];
747  unsigned_type sz1 = current_delete_buffer_size();
748  unsigned_type sz2 = current_group_buffer_size(0);
749  value_type * pos = temp + tempSize - sz1 - sz2;
750  std::copy(delete_buffer_current_min, delete_buffer_current_min + sz1, pos);
751  std::copy(group_buffer_current_mins[0], group_buffer_current_mins[0] + sz2, pos + sz1);
752  temp[tempSize] = sup; // sentinel
753 
754  // refill delete_buffer
755  // (using more complicated code it could be made somewhat fuller
756  // in certain circumstances)
757  priority_queue_local::merge_iterator(pos, newPos, delete_buffer_current_min, sz1, cmp);
758 
759  // refill group_buffers[0]
760  // (as above we might want to take the opportunity
761  // to make group_buffers[0] fuller)
762  priority_queue_local::merge_iterator(pos, newPos, group_buffer_current_mins[0], sz2, cmp);
763 
764  // merge the rest to the new segment
765  // note that merge exactly trips into the footsteps
766  // of itself
767  priority_queue_local::merge_iterator(pos, newPos, newSegment, N, cmp);
768 
769  // and insert it
770  unsigned_type freeLevel = make_space_available(0);
771  assert(freeLevel == 0 || int_mergers[0].size() == 0);
772  int_mergers[0].insert_segment(newSegment, N);
773 
774  // get rid of invalid level 2 buffers
775  // by inserting them into tree 0 (which is almost empty in this case)
776  if (freeLevel > 0)
777  {
778  for (int_type i = freeLevel; i >= 0; i--)
779  {
780  // reverse order not needed
781  // but would allow immediate refill
782 
783  newSegment = new value_type[current_group_buffer_size(i) + 1]; // with sentinel
784  std::copy(group_buffer_current_mins[i], group_buffer_current_mins[i] + current_group_buffer_size(i) + 1, newSegment);
785  int_mergers[0].insert_segment(newSegment, current_group_buffer_size(i));
786  group_buffer_current_mins[i] = group_buffers[i] + N; // empty
787  }
788  }
789 
790  // update size
791  size_ += size_type(N);
792 
793  // special case if the tree was empty before
794  if (delete_buffer_current_min == delete_buffer_end)
795  refill_delete_buffer();
796 }
797 
// Compile-time parameter search for the priority queue generator: find_B_m
// enumerates (block size B, buffer block count m) pairs until the memory and
// capacity constraints are met; compute_N derives the group-1 sequence length.
798 namespace priority_queue_local
799 {
 // returned when the search space is exhausted — the name is the error message
800  struct Parameters_for_priority_queue_not_found_Increase_IntM
801  {
802  enum { fits = false };
803  typedef Parameters_for_priority_queue_not_found_Increase_IntM result;
804  };
805 
 // placeholder result used to cut off recursion branches
806  struct dummy
807  {
808  enum { fits = false };
809  typedef dummy result;
810  };
811 
812  template <unsigned_type E_, unsigned_type IntM_, unsigned_type MaxS_, unsigned_type B_, unsigned_type m_, bool stop = false>
813  struct find_B_m
814  {
815  typedef find_B_m<E_, IntM_, MaxS_, B_, m_, stop> Self;
816  enum {
817  k = IntM_ / B_, // number of blocks that fit into M
818  element_size = E_, // element size
819  IntM = IntM_,
820  B = B_, // block size
821  m = m_, // number of blocks fitting into buffers
822  c = k - m_,
823  // memory occ. by block must be at least 10 times larger than size of ext sequence
824  // && satisfy memory req && if we have two ext mergers their degree must be at least 64=m/2
825  fits = c > 10 && ((k - m) * (m) * (m * B / (element_size * 4 * 1024))) >= MaxS_
826  && ((MaxS_ < ((k - m) * m / (2 * element_size)) * 1024) || m >= 128),
827  step = 1
828  };
829 
 // candidate1 advances m; candidate2 halves B and restarts m — the `stop`
 // flag short-circuits branches once a fitting combination has been found
830  typedef typename find_B_m<element_size, IntM, MaxS_, B, m + step, fits || (m >= k - step)>::result candidate1;
831  typedef typename find_B_m<element_size, IntM, MaxS_, B / 2, 1, fits || candidate1::fits>::result candidate2;
 // NOTE(review): original line 832 is missing from this extraction —
 // presumably the `result` typedef selecting among Self / candidate1 /
 // candidate2 via IF; confirm against the original file.
833  };
834 
835  // specialization for the case when no valid parameters are found
836  template <unsigned_type E_, unsigned_type IntM_, unsigned_type MaxS_, bool stop>
837  struct find_B_m<E_, IntM_, MaxS_, 2048, 1, stop>
838  {
839  enum { fits = false };
840  typedef Parameters_for_priority_queue_not_found_Increase_IntM result;
841  };
842 
843  // to speedup search
844  template <unsigned_type E_, unsigned_type IntM_, unsigned_type MaxS_, unsigned_type B_, unsigned_type m_>
845  struct find_B_m<E_, IntM_, MaxS_, B_, m_, true>
846  {
847  enum { fits = false };
848  typedef dummy result;
849  };
850 
851  // E_ size of element in bytes
852  template <unsigned_type E_, unsigned_type IntM_, unsigned_type MaxS_>
853  struct find_settings
854  {
855  // start from block size (8*1024*1024) bytes
856  typedef typename find_B_m<E_, IntM_, MaxS_, (8 * 1024 * 1024), 1>::result result;
857  };
858 
 // error marker analogous to the one above, for the compute_N search
859  struct Parameters_not_found_Try_to_change_the_Tune_parameter
860  {
861  typedef Parameters_not_found_Try_to_change_the_Tune_parameter result;
862  };
863 
864 
 // derive N (group-1 sequence length) by shrinking the internal arity AI
 // until N = X / AI^2 reaches the required minimum size
865  template <unsigned_type AI_, unsigned_type X_, unsigned_type CriticalSize_>
866  struct compute_N
867  {
868  typedef compute_N<AI_, X_, CriticalSize_> Self;
869  enum
870  {
871  X = X_,
872  AI = AI_,
873  N = X / (AI * AI) // two stage internal
874  };
875  typedef typename IF<(N >= CriticalSize_), Self, typename compute_N<AI / 2, X, CriticalSize_>::result>::result result;
876  };
877 
 // recursion base: arity reached 1 without satisfying the constraint
878  template <unsigned_type X_, unsigned_type CriticalSize_>
879  struct compute_N<1, X_, CriticalSize_>
880  {
881  typedef Parameters_not_found_Try_to_change_the_Tune_parameter result;
882  };
883 }
884 
886 
889 
891 
// Priority queue type generator: derives all priority_queue_config parameters
// (B, m, N, internal/external arities) from element type, memory budget IntM_
// and maximum size MaxS_ via the metaprograms in priority_queue_local.
// NOTE(review): original line 955 (the class head — per the doxygen index this
// is `class PRIORITY_QUEUE_GENERATOR`) and line 987 (presumably the resulting
// `result` typedef naming the generated priority_queue type) are missing from
// this extraction; confirm against the original file.
954 template <class Tp_, class Cmp_, unsigned_type IntM_, unsigned MaxS_, unsigned Tune_ = 6>
956 {
957 public:
958  // actual calculation of B, m, k and element_size
959  typedef typename priority_queue_local::find_settings<sizeof(Tp_), IntM_, MaxS_>::result settings;
960  enum {
961  B = settings::B,
962  m = settings::m,
963  X = B * (settings::k - m) / settings::element_size, // interpretation of result
964  Buffer1Size = 32 // fixed
965  };
966  // derivation of N, AI, AE
967  typedef typename priority_queue_local::compute_N<(1 << Tune_), X, 4 * Buffer1Size>::result ComputeN;
968  enum
969  {
970  N = ComputeN::N,
971  AI = ComputeN::AI,
972  AE = (m / 2 < 2) ? 2 : (m / 2) // at least 2
973  };
974  enum {
975  // Estimation of maximum internal memory consumption (in bytes)
976  EConsumption = X * settings::element_size + settings::B * AE + ((MaxS_ / X) / AE) * settings::B * 1024
977  };
978  /*
979  unsigned BufferSize1_ = 32, // equalize procedure call overheads etc.
980  unsigned N_ = 512, // bandwidth
981  unsigned IntKMAX_ = 64, // maximal arity for internal mergers
982  unsigned IntLevels_ = 4,
983  unsigned BlockSize_ = (2*1024*1024),
984  unsigned ExtKMAX_ = 64, // maximal arity for external mergers
985  unsigned ExtLevels_ = 2,
986  */
988 };
989 
991 
992 __STXXL_END_NAMESPACE
993 
994 
995 namespace std
996 {
997  template <class Config_>
998  void swap(stxxl::priority_queue<Config_> & a,
999  stxxl::priority_queue<Config_> & b)
1000  {
1001  a.swap(b);
1002  }
1003 }
1004 
1005 #endif // !STXXL_PRIORITY_QUEUE_HEADER
1006 // vim: et:ts=4:sw=4
Priority queue type generator.
Definition: priority_queue.h:955
_STXXL_DEPRECATED(priority_queue(prefetch_pool< block_type > &p_pool_, write_pool< block_type > &w_pool_))
Constructs external priority queue object.
Block containing elements of fixed length.
Definition: typed_block.h:223
const value_type & top() const
Returns the "largest" element (with respect to the queue's comparator).
Definition: priority_queue.h:345
Implements dynamically resizable prefetching pool.
Definition: prefetch_pool.h:34
priority_queue(pool_type &pool_)
Constructs external priority queue object.
Definition: priority_queue.h:391
Inverts the order of a comparison functor by swapping its arguments.
Definition: pq_helpers.h:145
Implements dynamically resizable buffered writing pool.
Definition: write_pool.h:36
Implements dynamically resizable buffered writing and prefetched reading pool.
Definition: read_write_pool.h:28
Loser tree from Knuth, "Sorting and Searching", Section 5.4.1.
Definition: pq_losertree.h:36
Similar to std::priority_queue, with the following differences:
Definition: pq_helpers.h:39
void push(const value_type &obj)
Inserts x into the priority_queue.
Definition: priority_queue.h:374
stxxl::uint64 size_type
An unsigned integral type (64 bit)
Definition: priority_queue.h:162
size_type size() const
Returns number of elements contained.
Definition: priority_queue.h:336
unsigned_type mem_cons() const
Returns number of bytes consumed by the priority_queue.
Definition: priority_queue.h:316
IF template metaprogramming statement.
Definition: tmeta.h:31
bool empty() const
Returns true if queue has no elements.
Definition: priority_queue.h:283
Config::value_type value_type
The type of object stored in the priority_queue.
Definition: priority_queue.h:157
External priority queue data structure.
Definition: priority_queue.h:140
void swap(priority_queue &obj)
Swaps this priority queue with another one. Implementation correctness is questionable.
Definition: priority_queue.h:252
void pop()
Removes the element at the top.
Definition: priority_queue.h:357
External merger, based on the loser tree data structure. !
Definition: pq_ext_merger.h:127
Config::comparator_type comparator_type
Comparison object.
Definition: priority_queue.h:159