STXXL  1.4.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
priority_queue.h
Go to the documentation of this file.
1 /***************************************************************************
2  * include/stxxl/bits/containers/priority_queue.h
3  *
4  * Implements a data structure from "Peter Sanders. Fast Priority Queues for
5  * Cached Memory. ALENEX'99" for external memory.
6  *
7  * Part of the STXXL. See http://stxxl.sourceforge.net
8  *
9  * Copyright (C) 1999 Peter Sanders <[email protected]>
10  * Copyright (C) 2003, 2004, 2007 Roman Dementiev <[email protected]>
11  * Copyright (C) 2007-2009 Johannes Singler <[email protected]>
12  * Copyright (C) 2007-2010 Andreas Beckmann <[email protected]>
13  *
14  * Distributed under the Boost Software License, Version 1.0.
15  * (See accompanying file LICENSE_1_0.txt or copy at
16  * http://www.boost.org/LICENSE_1_0.txt)
17  **************************************************************************/
18 
19 #ifndef STXXL_CONTAINERS_PRIORITY_QUEUE_HEADER
20 #define STXXL_CONTAINERS_PRIORITY_QUEUE_HEADER
21 
26 
28 
29 /*
30  KNBufferSize1 = 32;
31  KNN = 512; // length of group 1 sequences
32  KNKMAX = 64; // maximal arity
33  LogKNKMAX = 6; // ceil(log KNKMAX)
34  KNLevels = 4; // overall capacity >= KNN*KNKMAX^KNLevels
35  */
36 
37 // internal memory consumption >= N_*(KMAX_^IntLevels_) + ext
38 
// Compile-time configuration bundle for the priority queue.
// It re-exports its template arguments as typedefs (value_type,
// comparator_type, alloc_strategy_type) and as enum constants; the
// priority_queue class template in this file reads them as Config::N,
// Config::IntKMAX, and so on.
// NOTE(review): the line that names this struct (listing line 51) is missing
// from this listing; only the template header and the body are visible.
39 template <
40  class ValueType,
41  class CompareType,
42  unsigned BufferSize1_ = 32, // equalize procedure call overheads etc.
43  unsigned N_ = 512, // length of group 1 sequences
44  unsigned IntKMAX_ = 64, // maximal arity for internal mergers
45  unsigned IntLevels_ = 4, // number of internal groups
46  unsigned BlockSize_ = (2* 1024* 1024), // external block size
47  unsigned ExtKMAX_ = 64, // maximal arity for external mergers
48  unsigned ExtLevels_ = 2, // number of external groups
49  class AllocStr_ = STXXL_DEFAULT_ALLOC_STRATEGY
50  >
52 {
53  typedef ValueType value_type;
54  typedef CompareType comparator_type;
55  typedef AllocStr_ alloc_strategy_type;
56  enum
57  {
58  delete_buffer_size = BufferSize1_, // size of the overall delete buffer
59  N = N_,
60  IntKMAX = IntKMAX_,
61  num_int_groups = IntLevels_,
62  num_ext_groups = ExtLevels_,
63  BlockSize = BlockSize_,
64  ExtKMAX = ExtKMAX_,
65  element_size = sizeof(ValueType) // bytes per stored element
66  };
67 };
68 
70 
// Free swap overloads placed in namespace std. Each one simply forwards to
// the member swap() of its first argument.
// NOTE(review): the signature lines of both overloads (listing lines 77-78 and
// 83-84) are missing from this listing, so the argument types are not visible
// here. Judging by the template parameter lists, the first presumably targets
// the external merger and the second the internal merger - confirm against
// the original header.
71 namespace std {
72 
73 template <class BlockType,
74  class CompareType,
75  unsigned Arity,
76  class AllocStr>
79 {
80  a.swap(b);
81 }
82 template <class ValueType, class CompareType, unsigned KNKMAX>
85 {
86  a.swap(b);
87 }
88 
89 } // namespace std
90 
92 
93 //! External priority queue data structure \n
94 //! <b> Introduction </b> to priority queue container: see \ref tutorial_pqueue tutorial. \n
95 //! <b> Design and Internals </b> of priority queue container: see \ref design_pqueue.
96 template <class ConfigType>
97 class priority_queue : private noncopyable
98 {
99 public:
100  typedef ConfigType Config;
101  enum
102  {
103  delete_buffer_size = Config::delete_buffer_size,
104  N = Config::N,
105  IntKMAX = Config::IntKMAX,
106  num_int_groups = Config::num_int_groups,
107  num_ext_groups = Config::num_ext_groups,
108  total_num_groups = Config::num_int_groups + Config::num_ext_groups,
109  BlockSize = Config::BlockSize,
110  ExtKMAX = Config::ExtKMAX
111  };
112 
113  //! The type of object stored in the priority_queue.
114  typedef typename Config::value_type value_type;
115  //! Comparison object.
116  typedef typename Config::comparator_type comparator_type;
117  typedef typename Config::alloc_strategy_type alloc_strategy_type;
118  //! An unsigned integral type (64 bit).
120  //! Type of the block used in disk-memory transfers
123 
124 protected:
127 
129  value_type,
130  comparator_type,
131  IntKMAX> int_merger_type;
132 
134  block_type,
135  comparator_type,
136  ExtKMAX,
137  alloc_strategy_type> ext_merger_type;
138 
139 
140  int_merger_type int_mergers[num_int_groups];
144 
145  // one delete buffer for each tree => group buffer
146  value_type group_buffers[total_num_groups][N + 1]; // tree->group_buffers->delete_buffer (extra space for sentinel)
147  value_type* group_buffer_current_mins[total_num_groups]; // group_buffer_current_mins[i] is current start of group_buffers[i], end is group_buffers[i] + N
148 
149  // overall delete buffer
150  value_type delete_buffer[delete_buffer_size + 1];
151  value_type* delete_buffer_current_min; // current start of delete_buffer
152  value_type* delete_buffer_end; // end of delete_buffer
153 
154  comparator_type cmp;
155 
156  // insert buffer
158 
159  // how many groups are active
161 
162  // total size not counting insert_heap and delete_buffer
164 
165 private:
166  void init();
167 
168  void refill_delete_buffer();
169  unsigned_type refill_group_buffer(unsigned_type k);
170 
171  unsigned_type make_space_available(unsigned_type level);
172  void empty_insert_heap();
173 
174  value_type get_supremum() const { return cmp.min_value(); } //{ return group_buffers[0][KNN].key; }
175  unsigned_type current_delete_buffer_size() const { return delete_buffer_end - delete_buffer_current_min; }
176  unsigned_type current_group_buffer_size(unsigned_type i) const { return &(group_buffers[i][N]) - group_buffer_current_mins[i]; }
177 
178 public:
179  //! \name Constructors/Destructors
180  //! \{
181 
182  //! Constructs external priority queue object.
183  //! \param pool_ pool of blocks that will be used
184  //! for data writing and prefetching for the disk<->memory transfers
185  //! happening in the priority queue. Larger pool size
186  //! helps to speed up operations.
187  priority_queue(pool_type& pool_);
188 
189  //! Constructs external priority queue object.
190  //! \param p_pool_ pool of blocks that will be used
191  //! for data prefetching for the disk<->memory transfers
192  //! happening in the priority queue. Larger pool size
193  //! helps to speed up operations.
194  //! \param w_pool_ pool of blocks that will be used
195  //! for writing data for the memory<->disk transfers
196  //! happening in the priority queue. Larger pool size
197  //! helps to speed up operations.
200  );
201 
202  //! Constructs external priority queue object.
203  //! \param p_pool_mem memory (in bytes) for prefetch pool that will be used
204  //! for data prefetching for the disk<->memory transfers
205  //! happening in the priority queue. Larger pool size
206  //! helps to speed up operations.
207  //! \param w_pool_mem memory (in bytes) for buffered write pool that will be used
208  //! for writing data for the memory<->disk transfers
209  //! happening in the priority queue. Larger pool size
210  //! helps to speed up operations.
211  priority_queue(unsigned_type p_pool_mem, unsigned_type w_pool_mem);
212 
213  virtual ~priority_queue();
214 
215  //! \}
216 
217 #if 0
218 
219  //! swap this priority queue with another one.
220  //! Implementation correctness is questionable.
221  void swap(priority_queue& obj)
222  {
223  //swap_1D_arrays(int_mergers,obj.int_mergers,num_int_groups); // does not work in g++ 3.4.3 :( bug?
224  for (unsigned_type i = 0; i < num_int_groups; ++i)
225  std::swap(int_mergers[i], obj.int_mergers[i]);
226 
227  //std::swap(pool,obj.pool);
228  //std::swap(pool_owned, obj.pool_owned);
229  std::swap(ext_mergers, obj.ext_mergers);
230  for (unsigned_type i1 = 0; i1 < total_num_groups; ++i1)
231  for (unsigned_type i2 = 0; i2 < (N + 1); ++i2)
232  std::swap(group_buffers[i1][i2], obj.group_buffers[i1][i2]);
233 
234  STXXL_STATIC_ASSERT(false);
235  // Shoot yourself in the foot: group_buffer_current_mins contains pointers into group_buffers ...
236  // either recompute them or add/subtract (&this->group_buffers[0][0] - &obj->group_buffers[0][0])
237  swap_1D_arrays(group_buffer_current_mins, obj.group_buffer_current_mins, total_num_groups);
238  swap_1D_arrays(delete_buffer, obj.delete_buffer, delete_buffer_size + 1);
239  std::swap(delete_buffer_current_min, obj.delete_buffer_current_min);
240  std::swap(delete_buffer_end, obj.delete_buffer_end);
241  std::swap(cmp, obj.cmp);
242  std::swap(insert_heap, obj.insert_heap);
243  std::swap(num_active_groups, obj.num_active_groups);
244  std::swap(size_, obj.size_);
245  }
246 #endif
247 
248  //! \name Capacity
249  //! \{
250 
251  //! Returns number of elements contained.
252  //! \return number of elements contained
253  size_type size() const;
254 
255  //! Returns true if queue has no elements.
256  //! \return \b true if queue has no elements, \b false otherwise
257  bool empty() const { return (size() == 0); }
258 
259  //! \}
260 
261  //! \name Operators
262  //! \{
263 
264  //! Returns "largest" element.
265  //!
266  //! Returns a const reference to the element at the top of the
267  //! priority_queue. The element at the top is guaranteed to be the largest
268  //! element in the \b priority queue, as determined by the comparison
269  //! function \b comparator_type (the same as the second parameter of
270  //! PRIORITY_QUEUE_GENERATOR utility class). That is, for every other
271  //! element \b x in the priority_queue, \b comparator_type(Q.top(), x) is
272  //! false. Precondition: \c empty() is false.
273  const value_type & top() const;
274 
275  //! \}
276 
277  //! \name Modifiers
278  //! \{
279 
280  //! Removes the element at the top.
281  //!
282  //! Removes the element at the top of the priority_queue, that is, the
283  //! largest element in the \b priority_queue. Precondition: \c empty() is
284  //! \b false. Postcondition: \c size() will be decremented by 1.
285  void pop();
286 
287  //! Inserts x into the priority_queue.
288  //!
289  //! Inserts x into the priority_queue. Postcondition: \c size() will be
290  //! incremented by 1.
291  void push(const value_type& obj);
292 
293  //! \}
294 
295  //! \name Miscellaneous
296  //! \{
297 
298  //! Number of bytes consumed by the \b priority_queue from the internal
299  //! memory not including pools (see the constructor)
301  {
302  unsigned_type dynam_alloc_mem = 0;
303  //dynam_alloc_mem += w_pool.mem_cons();
304  //dynam_alloc_mem += p_pool.mem_cons();
305  for (int i = 0; i < num_int_groups; ++i)
306  dynam_alloc_mem += int_mergers[i].mem_cons();
307 
308  for (int i = 0; i < num_ext_groups; ++i)
309  dynam_alloc_mem += ext_mergers[i]->mem_cons();
310 
311 
312  return (sizeof(*this) +
313  sizeof(ext_merger_type) * num_ext_groups +
314  dynam_alloc_mem);
315  }
316 
317  void dump_sizes() const;
318  void dump_params() const;
319 
320  //! \}
321 };
322 
323 
// Returns the total number of elements held by the queue, summing the three
// places an element can live:
//  - size_: everything in the mergers and group buffers (the member is
//    documented as "total size not counting insert_heap and delete_buffer"),
//  - the insert heap, minus 1 for the sentinel that init() pushes and that is
//    always kept in the heap,
//  - the unconsumed part of the delete buffer.
// NOTE(review): the signature line (listing line 325) is missing from this
// listing; the class declares `size_type size() const`, which this body
// presumably defines.
324 template <class ConfigType>
326 {
327  return size_ +
328  insert_heap.size() - 1 +
329  (delete_buffer_end - delete_buffer_current_min);
330 }
331 
332 
// Returns a const reference to the top element: either the top of the insert
// heap or the front of the delete buffer, whichever cmp ranks as the winner.
// No emptiness checks are needed on either side:
//  - the insert heap always contains the sentinel pushed by init() (asserted),
//  - when the delete buffer is empty, delete_buffer_current_min equals
//    delete_buffer_end, which points at the sentinel slot
//    delete_buffer[delete_buffer_size].
// NOTE(review): the signature line (listing line 334) is missing from this
// listing; the class declares `const value_type & top() const`, which this
// body presumably defines.
333 template <class ConfigType>
335 {
336  assert(!insert_heap.empty());
337 
338  const typename priority_queue<ConfigType>::value_type& t = insert_heap.top();
 // cmp(a, b) true means the heap top t outranks the delete buffer front
339  if (/*(!insert_heap.empty()) && */ cmp(*delete_buffer_current_min, t))
340  return t;
341  else
342  return *delete_buffer_current_min;
343 }
344 
// Removes the top element, using the same comparison as top() to decide where
// it lives: if the insert heap's top outranks the delete buffer's front it is
// popped from the heap, otherwise the delete buffer's front is consumed.
// When the delete buffer runs dry it is refilled from the group buffers.
// NOTE(review): the signature line (listing line 346) is missing from this
// listing; the class declares `void pop()`, which this body presumably defines.
345 template <class ConfigType>
347 {
348  //STXXL_VERBOSE1("priority_queue::pop()");
349  assert(!insert_heap.empty());
350 
351  if (/*(!insert_heap.empty()) && */ cmp(*delete_buffer_current_min, insert_heap.top()))
352  insert_heap.pop();
353  else
354  {
 // taking this branch on an empty delete buffer would mean popping an
 // empty queue (precondition violation), hence the assert
355  assert(delete_buffer_current_min < delete_buffer_end);
356  ++delete_buffer_current_min;
357  if (delete_buffer_current_min == delete_buffer_end)
358  refill_delete_buffer();
359  }
360 }
361 
// Inserts obj into the queue by pushing it onto the insert heap.
// The heap holds at most N elements plus the permanent sentinel; when it
// already has N + 1 entries it is first flushed into the mergers by
// empty_insert_heap(), which leaves only the sentinel behind.
// \param obj element to insert; the assert requires that the first internal
//            merger's not_sentinel() accepts it (presumably: obj must differ
//            from the sentinel value - confirm in the merger's definition).
362 template <class ConfigType>
363 inline void priority_queue<ConfigType>::push(const value_type& obj)
364 {
365  //STXXL_VERBOSE3("priority_queue::push("<< obj <<")");
 // int_mergers is an array, so `int_mergers->` addresses int_mergers[0]
366  assert(int_mergers->not_sentinel(obj));
367  if (insert_heap.size() == N + 1)
368  empty_insert_heap();
369 
370 
 // the sentinel must still be present after a flush
371  assert(!insert_heap.empty());
372 
373  insert_heap.push(obj);
374 }
375 
376 
377 ////////////////////////////////////////////////////////////////
378 
// Constructor that uses a caller-supplied pool: the pool's address is stored
// and pool_owned is false, so the destructor will not delete it. The caller's
// pool must therefore outlive this object.
// The remaining initializers point delete_buffer_end at the sentinel slot of
// the delete buffer, construct insert_heap with the argument N + 2, and start
// with no active groups and size_ == 0; init() then installs the sentinels.
// NOTE(review): the signature line (listing line 380) is missing from this
// listing; the class declares `priority_queue(pool_type& pool_)`.
379 template <class ConfigType>
381  pool(&pool_),
382  pool_owned(false),
383  delete_buffer_end(delete_buffer + delete_buffer_size),
384  insert_heap(N + 2),
385  num_active_groups(0), size_(0)
386 {
387  STXXL_VERBOSE_PQ("priority_queue(pool)");
388  init();
389 }
390 
391 // DEPRECATED
// Constructor that builds its own pool_type from the two pools passed in
// (p_pool_ and w_pool_) and owns it: pool_owned is true, so the destructor
// deletes the pool. All other initializers match the single-pool constructor.
// NOTE(review): the signature line (listing line 393) is missing from this
// listing, so the parameter types are not visible here.
392 template <class ConfigType>
394  pool(new pool_type(p_pool_, w_pool_)),
395  pool_owned(true),
396  delete_buffer_end(delete_buffer + delete_buffer_size),
397  insert_heap(N + 2),
398  num_active_groups(0), size_(0)
399 {
400  STXXL_VERBOSE_PQ("priority_queue(p_pool, w_pool)");
401  init();
402 }
403 
// Constructor that creates and owns a pool sized from two memory budgets
// given in bytes. Each budget is converted to a block count by integer
// division by BlockSize, so any remainder smaller than one block is dropped
// (a budget below BlockSize yields 0 blocks).
// pool_owned is true, so the destructor deletes the pool.
// NOTE(review): the signature line (listing line 405) is missing from this
// listing; the class declares
// `priority_queue(unsigned_type p_pool_mem, unsigned_type w_pool_mem)`.
404 template <class ConfigType>
406  pool(new pool_type(p_pool_mem / BlockSize, w_pool_mem / BlockSize)),
407  pool_owned(true),
408  delete_buffer_end(delete_buffer + delete_buffer_size),
409  insert_heap(N + 2),
410  num_active_groups(0), size_(0)
411 {
412  STXXL_VERBOSE_PQ("priority_queue(pool sizes)");
413  init();
414 }
415 
// Common construction step called by every constructor.
//  1. Checks that cmp is irreflexive on its own min_value().
//  2. Allocates num_ext_groups external mergers on the heap and hands each
//     the pool pointer.
//  3. Installs the sentinel (cmp.min_value()) in the insert heap, in the
//     extra last slot of the delete buffer and in the extra last slot of
//     every group buffer, and marks all buffers empty by pointing their
//     "current min" at that sentinel slot.
// NOTE(review): the signature line (listing line 417) is missing from this
// listing; the class declares `void init()`.
416 template <class ConfigType>
418 {
419  assert(!cmp(cmp.min_value(), cmp.min_value())); // verify strict weak ordering
420 
421  ext_mergers = new ext_merger_type*[num_ext_groups];
422  for (unsigned_type j = 0; j < num_ext_groups; ++j) {
423  ext_mergers[j] = new ext_merger_type;
424  ext_mergers[j]->set_pool(pool);
425  }
426 
427  value_type sentinel = cmp.min_value();
428  insert_heap.push(sentinel); // always keep the sentinel
429  delete_buffer[delete_buffer_size] = sentinel; // sentinel
430  delete_buffer_current_min = delete_buffer_end; // empty
431  for (unsigned_type i = 0; i < total_num_groups; i++)
432  {
433  group_buffers[i][N] = sentinel; // sentinel
434  group_buffer_current_mins[i] = &(group_buffers[i][N]); // empty
435  }
436 }
437 
// Destructor: deletes the pool if this object created it (pool_owned), then
// deletes every external merger allocated by init() and the pointer array.
// NOTE(review): the pool is deleted before the external mergers that were
// given its address via set_pool(); confirm that the merger destructor does
// not touch the pool.
// NOTE(review): the signature line (listing line 439) is missing from this
// listing; the class declares `virtual ~priority_queue()`.
438 template <class ConfigType>
440 {
441  STXXL_VERBOSE_PQ("~priority_queue()");
442  if (pool_owned)
443  delete pool;
444 
445  for (unsigned_type j = 0; j < num_ext_groups; ++j)
446  delete ext_mergers[j];
447  delete[] ext_mergers;
448 }
449 
450 //--------------------- Buffer refilling -------------------------------
451 
452 // refill group_buffers[group] and return the number of elements it then holds
453 template <class ConfigType>
455 {
456  STXXL_VERBOSE_PQ("refill_group_buffer(" << group << ")");
457 
458  value_type* target;
459  unsigned_type length;
460  size_type group_size = (group < num_int_groups) ?
461  int_mergers[group].size() :
462  ext_mergers[group - num_int_groups]->size(); // elements left in segments
463  unsigned_type left_elements = group_buffers[group] + N - group_buffer_current_mins[group]; //elements left in target buffer
464  if (group_size + left_elements >= size_type(N))
465  { // buffer will be filled completely
466  target = group_buffers[group];
467  length = N - left_elements;
468  }
469  else
470  {
471  target = group_buffers[group] + N - group_size - left_elements;
472  length = group_size;
473  }
474 
475  if (length > 0)
476  {
477  // shift remaining elements to front
478  memmove(target, group_buffer_current_mins[group], left_elements * sizeof(value_type));
479  group_buffer_current_mins[group] = target;
480 
481  // fill remaining space from group
482  if (group < num_int_groups)
483  int_mergers[group].multi_merge(target + left_elements, length);
484  else
485  ext_mergers[group - num_int_groups]->multi_merge(
486  target + left_elements,
487  target + left_elements + length);
488  }
489 
490  //STXXL_MSG(length + left_elements);
491  //std::copy(target,target + length + left_elements,std::ostream_iterator<value_type>(std::cout, "\n"));
492 #if STXXL_CHECK_ORDER_IN_SORTS
494  if (!stxxl::is_sorted(group_buffer_current_mins[group], group_buffers[group] + N, inv_cmp))
495  {
496  STXXL_VERBOSE_PQ("refill_grp... length: " << length << " left_elements: " << left_elements);
497  for (value_type* v = group_buffer_current_mins[group] + 1; v < group_buffer_current_mins[group] + left_elements; ++v)
498  {
499  if (inv_cmp(*v, *(v - 1)))
500  {
501  STXXL_MSG("Error in buffer " << group << " at position " << (v - group_buffer_current_mins[group] - 1) << "/" << (v - group_buffer_current_mins[group]) << " " << *(v - 2) << " " << *(v - 1) << " " << *v << " " << *(v + 1));
502  }
503  }
504  assert(false);
505  }
506 #endif
507 
508  return length + left_elements;
509 }
510 
511 template <class ConfigType>
513 {
514  STXXL_VERBOSE_PQ("refill_delete_buffer()");
515 
516  size_type total_group_size = 0;
517  //num_active_groups is <= 4
518  for (int i = (int)num_active_groups - 1; i >= 0; i--)
519  {
520  if ((group_buffers[i] + N) - group_buffer_current_mins[i] < delete_buffer_size)
521  {
522  unsigned_type length = refill_group_buffer(i);
523  // max active level dry now?
524  if (length == 0 && unsigned(i) == num_active_groups - 1)
525  --num_active_groups;
526 
527  total_group_size += length;
528  }
529  else
530  total_group_size += delete_buffer_size; // actually only a sufficient lower bound
531  }
532 
533  unsigned_type length;
534  if (total_group_size >= delete_buffer_size) // buffer can be filled completely
535  {
536  length = delete_buffer_size; // amount to be copied
537  size_ -= size_type(delete_buffer_size); // amount left in group_buffers
538  }
539  else
540  {
541  length = total_group_size;
542  assert(size_ == size_type(length)); // trees and group_buffers get empty
543  size_ = 0;
544  }
545 
547 
548  // now call simplified refill routines
549  // which can make the assumption that
550  // they find all they are asked in the buffers
551  delete_buffer_current_min = delete_buffer_end - length;
552  STXXL_VERBOSE_PQ("refill_del... Active groups = " << num_active_groups);
553  switch (num_active_groups)
554  {
555  case 0:
556  break;
557  case 1:
558  std::copy(group_buffer_current_mins[0], group_buffer_current_mins[0] + length, delete_buffer_current_min);
559  group_buffer_current_mins[0] += length;
560  break;
561  case 2:
562 #if STXXL_PARALLEL && STXXL_PARALLEL_PQ_MULTIWAY_MERGE_DELETE_BUFFER
563  {
564  std::pair<value_type*, value_type*> seqs[2] =
565  {
566  std::make_pair(group_buffer_current_mins[0], group_buffers[0] + N),
567  std::make_pair(group_buffer_current_mins[1], group_buffers[1] + N)
568  };
569  parallel::multiway_merge_sentinel(seqs, seqs + 2, delete_buffer_current_min, inv_cmp, length); //sequence iterators are progressed appropriately
570 
571  group_buffer_current_mins[0] = seqs[0].first;
572  group_buffer_current_mins[1] = seqs[1].first;
573  }
574 #else
576  group_buffer_current_mins[0],
577  group_buffer_current_mins[1], delete_buffer_current_min, length, cmp);
578 #endif
579  break;
580  case 3:
581 #if STXXL_PARALLEL && STXXL_PARALLEL_PQ_MULTIWAY_MERGE_DELETE_BUFFER
582  {
583  std::pair<value_type*, value_type*> seqs[3] =
584  {
585  std::make_pair(group_buffer_current_mins[0], group_buffers[0] + N),
586  std::make_pair(group_buffer_current_mins[1], group_buffers[1] + N),
587  std::make_pair(group_buffer_current_mins[2], group_buffers[2] + N)
588  };
589  parallel::multiway_merge_sentinel(seqs, seqs + 3, delete_buffer_current_min, inv_cmp, length); //sequence iterators are progressed appropriately
590 
591  group_buffer_current_mins[0] = seqs[0].first;
592  group_buffer_current_mins[1] = seqs[1].first;
593  group_buffer_current_mins[2] = seqs[2].first;
594  }
595 #else
597  group_buffer_current_mins[0],
598  group_buffer_current_mins[1],
599  group_buffer_current_mins[2], delete_buffer_current_min, length, cmp);
600 #endif
601  break;
602  case 4:
603 #if STXXL_PARALLEL && STXXL_PARALLEL_PQ_MULTIWAY_MERGE_DELETE_BUFFER
604  {
605  std::pair<value_type*, value_type*> seqs[4] =
606  {
607  std::make_pair(group_buffer_current_mins[0], group_buffers[0] + N),
608  std::make_pair(group_buffer_current_mins[1], group_buffers[1] + N),
609  std::make_pair(group_buffer_current_mins[2], group_buffers[2] + N),
610  std::make_pair(group_buffer_current_mins[3], group_buffers[3] + N)
611  };
612  parallel::multiway_merge_sentinel(seqs, seqs + 4, delete_buffer_current_min, inv_cmp, length); //sequence iterators are progressed appropriately
613 
614  group_buffer_current_mins[0] = seqs[0].first;
615  group_buffer_current_mins[1] = seqs[1].first;
616  group_buffer_current_mins[2] = seqs[2].first;
617  group_buffer_current_mins[3] = seqs[3].first;
618  }
619 #else
621  group_buffer_current_mins[0],
622  group_buffer_current_mins[1],
623  group_buffer_current_mins[2],
624  group_buffer_current_mins[3], delete_buffer_current_min, length, cmp); //side effect free
625 #endif
626  break;
627  default:
628  STXXL_THROW2(std::runtime_error, "priority_queue<...>::refill_delete_buffer()",
629  "Overflow! The number of buffers on 2nd level in stxxl::priority_queue is currently limited to 4");
630  }
631 
632 #if STXXL_CHECK_ORDER_IN_SORTS
633  if (!stxxl::is_sorted(delete_buffer_current_min, delete_buffer_end, inv_cmp))
634  {
635  for (value_type* v = delete_buffer_current_min + 1; v < delete_buffer_end; ++v)
636  {
637  if (inv_cmp(*v, *(v - 1)))
638  {
639  STXXL_MSG("Error at position " << (v - delete_buffer_current_min - 1) << "/" << (v - delete_buffer_current_min) << " " << *(v - 1) << " " << *v);
640  }
641  }
642  assert(false);
643  }
644 #endif
645  //std::copy(delete_buffer_current_min,delete_buffer_current_min + length,std::ostream_iterator<value_type>(std::cout, "\n"));
646 }
647 
648 //--------------------------------------------------------------------
649 
650 // check if space is available on the given level and
651 // empty this level if necessary leading to a recursive call.
652 // return the level where space was finally available
//
// Levels 0 .. num_int_groups-1 are internal mergers; higher levels map to
// ext_mergers[level - num_int_groups]. Requesting level == num_active_groups
// activates that group.
// Three cases:
//  - the level has room: return it;
//  - the level is the last one and full: report the overflow, then collapse
//    the last external merger's whole content into a single segment of a
//    fresh merger that replaces it;
//  - otherwise: recursively make room one level up, then move this level's
//    entire content up as one segment.
// NOTE(review): the signature line (listing line 654) is missing from this
// listing; the class declares
// `unsigned_type make_space_available(unsigned_type level)`.
653 template <class ConfigType>
655 {
656  STXXL_VERBOSE_PQ("make_space_available(" << level << ")");
657  unsigned_type finalLevel;
658  assert(level < total_num_groups);
659  assert(level <= num_active_groups);
660 
661  if (level == num_active_groups)
662  ++num_active_groups;
663 
664  const bool spaceIsAvailable_ =
665  (level < num_int_groups) ? int_mergers[level].is_space_available()
666  : (ext_mergers[level - num_int_groups]->is_space_available());
667 
668  if (spaceIsAvailable_)
669  {
670  finalLevel = level;
671  }
672  else if (level == total_num_groups - 1)
673  {
 // nominal capacity N * IntKMAX^num_int_groups * ExtKMAX^num_ext_groups,
 // computed only for the diagnostic message
674  size_type capacity = N;
675  for (int i = 0; i < num_int_groups; ++i)
676  capacity *= IntKMAX;
677  for (int i = 0; i < num_ext_groups; ++i)
678  capacity *= ExtKMAX;
679  STXXL_ERRMSG("priority_queue OVERFLOW - all groups full, size=" << size() <<
680  ", capacity(last externel group (" << num_int_groups + num_ext_groups - 1 << "))=" << capacity);
681  dump_sizes();
682 
 // NOTE(review): this subtraction assumes the last level is external
 // (num_ext_groups >= 1); with no external groups it would wrap around.
683  unsigned_type extLevel = level - num_int_groups;
684  const size_type segmentSize = ext_mergers[extLevel]->size();
685  STXXL_VERBOSE1("Inserting segment into last level external: " << level << " " << segmentSize);
 // build the replacement merger, swap it in, and delete the old one
 // (which overflow_merger points to after the swap)
686  ext_merger_type* overflow_merger = new ext_merger_type;
687  overflow_merger->set_pool(pool);
688  overflow_merger->insert_segment(*ext_mergers[extLevel], segmentSize);
689  std::swap(ext_mergers[extLevel], overflow_merger);
690  delete overflow_merger;
691  finalLevel = level;
692  }
693  else
694  {
695  finalLevel = make_space_available(level + 1);
696 
697  if (level < num_int_groups - 1) // from internal to internal tree
698  {
699  unsigned_type segmentSize = int_mergers[level].size();
 // one extra slot for the sentinel; the array is not freed here,
 // presumably insert_segment() takes ownership - confirm
700  value_type* newSegment = new value_type[segmentSize + 1];
701  int_mergers[level].multi_merge(newSegment, segmentSize); // empty this level
702 
703  newSegment[segmentSize] = delete_buffer[delete_buffer_size]; // sentinel
704  // for queues where size << #inserts
705  // it might make sense to stay in this level if
706  // segmentSize < alpha * KNN * k^level for some alpha < 1
707  int_mergers[level + 1].insert_segment(newSegment, segmentSize);
708  }
709  else
710  {
711  if (level == num_int_groups - 1) // from internal to external tree
712  {
713  const unsigned_type segmentSize = int_mergers[num_int_groups - 1].size();
714  STXXL_VERBOSE_PQ("make_space... Inserting segment into first level external: " << level << " " << segmentSize);
715  ext_mergers[0]->insert_segment(int_mergers[num_int_groups - 1], segmentSize);
716  }
717  else // from external to external tree
718  {
719  const size_type segmentSize = ext_mergers[level - num_int_groups]->size();
720  STXXL_VERBOSE_PQ("make_space... Inserting segment into second level external: " << level << " " << segmentSize);
721  ext_mergers[level - num_int_groups + 1]->insert_segment(*ext_mergers[level - num_int_groups], segmentSize);
722  }
723  }
724  }
725  return finalLevel;
726 }
727 
728 
729 // empty the insert heap into the main data structure
//
// Precondition (asserted): the heap holds exactly N elements plus the
// sentinel. Steps:
//  1. Sort the heap's content into a freshly allocated segment and reset the
//     heap so that it contains a single entry again.
//  2. Merge that segment with the current delete buffer and group buffer 0,
//     writing the merged stream back in order: first into the delete buffer
//     (same element count as before), then into group buffer 0 (same count),
//     and the remaining N elements into the segment.
//  3. Make room on level 0 and insert the segment into int_mergers[0].
//  4. If room was only found on a higher level, flush group buffers
//     0..freeLevel into int_mergers[0] as additional segments.
//  5. Add N to size_ and refill the delete buffer if it is empty.
// NOTE(review): the signature line (listing line 731) is missing from this
// listing; the class declares `void empty_insert_heap()`.
730 template <class ConfigType>
732 {
733  STXXL_VERBOSE_PQ("empty_insert_heap()");
734  assert(insert_heap.size() == (N + 1));
735 
736  const value_type sup = get_supremum();
737 
738  // build new segment
739  value_type* newSegment = new value_type[N + 1];
740  value_type* newPos = newSegment;
741 
742  // put the new data there for now
743  //insert_heap.sortTo(newSegment);
744  value_type* SortTo = newSegment;
745 
 // presumably writes all N + 1 heap entries in sorted order, sentinel
 // last - confirm in the insert heap's sort_to()
746  insert_heap.sort_to(SortTo);
747 
 // re-seed the cleared heap with the entry at index N of the sorted
 // output (presumably the sentinel)
748  SortTo = newSegment + N;
749  insert_heap.clear();
750  insert_heap.push(*SortTo);
751 
752  assert(insert_heap.size() == 1);
753 
754  newSegment[N] = sup; // sentinel
755 
756  // copy the delete_buffer and group_buffers[0] to temporary storage
757  // (the temporary can be eliminated using some dirty tricks)
758  const unsigned_type tempSize = N + delete_buffer_size;
759  value_type temp[tempSize + 1];
760  unsigned_type sz1 = current_delete_buffer_size();
761  unsigned_type sz2 = current_group_buffer_size(0);
 // right-align the copied data so that it ends directly before the
 // sentinel stored at temp[tempSize]
762  value_type* pos = temp + tempSize - sz1 - sz2;
763  std::copy(delete_buffer_current_min, delete_buffer_current_min + sz1, pos);
764  std::copy(group_buffer_current_mins[0], group_buffer_current_mins[0] + sz2, pos + sz1);
765  temp[tempSize] = sup; // sentinel
766 
 // merge_iterator takes both source iterators by reference, so pos and
 // newPos are presumably advanced by each call and the three calls below
 // consume one continuous merged sequence
767  // refill delete_buffer
768  // (using more complicated code it could be made somewhat fuller
769  // in certain circumstances)
770  priority_queue_local::merge_iterator(pos, newPos, delete_buffer_current_min, sz1, cmp);
771 
772  // refill group_buffers[0]
773  // (as above we might want to take the opportunity
774  // to make group_buffers[0] fuller)
775  priority_queue_local::merge_iterator(pos, newPos, group_buffer_current_mins[0], sz2, cmp);
776 
777  // merge the rest to the new segment
778  // note that merge exactly trips into the footsteps
779  // of itself
 // (the target newSegment is the same array newPos reads from)
780  priority_queue_local::merge_iterator(pos, newPos, newSegment, N, cmp);
781 
782  // and insert it
783  unsigned_type freeLevel = make_space_available(0);
784  assert(freeLevel == 0 || int_mergers[0].size() == 0);
785  int_mergers[0].insert_segment(newSegment, N);
786 
787  // get rid of invalid level 2 buffers
788  // by inserting them into tree 0 (which is almost empty in this case)
789  if (freeLevel > 0)
790  {
791  for (int_type i = freeLevel; i >= 0; i--)
792  {
793  // reverse order not needed
794  // but would allow immediate refill
795 
 // copy the buffer's live elements plus its trailing sentinel
796  newSegment = new value_type[current_group_buffer_size(i) + 1]; // with sentinel
797  std::copy(group_buffer_current_mins[i], group_buffer_current_mins[i] + current_group_buffer_size(i) + 1, newSegment);
798  int_mergers[0].insert_segment(newSegment, current_group_buffer_size(i));
799  group_buffer_current_mins[i] = group_buffers[i] + N; // empty
800  }
801  }
802 
803  // update size
804  size_ += size_type(N);
805 
806  // special case if the tree was empty before
807  if (delete_buffer_current_min == delete_buffer_end)
808  refill_delete_buffer();
809 }
810 
// Diagnostic dump via STXXL_MSG: overall size, fill level of the insert heap
// (without its sentinel) and of the delete buffer, then one line per internal
// and per external group showing its group buffer fill, merger size, nominal
// capacity, fill percentage and is_space_available(). Ends by calling
// dump_params().
// `capacity` is multiplied by the arity at each level, so group i is reported
// against N * IntKMAX^(i+1) (further multiplied by ExtKMAX for each external
// level).
// NOTE(review): the signature line (listing line 812) is missing from this
// listing; the class declares `void dump_sizes() const`.
811 template <class ConfigType>
813 {
814  unsigned_type capacity = N;
815  STXXL_MSG("pq::size()\t= " << size());
816  STXXL_MSG(" insert_heap\t= " << insert_heap.size() - 1 << "/" << capacity);
817  STXXL_MSG(" delete_buffer\t= " << (delete_buffer_end - delete_buffer_current_min) << "/" << delete_buffer_size);
818  for (int i = 0; i < num_int_groups; ++i) {
819  capacity *= IntKMAX;
820  STXXL_MSG(" grp " << i << " int" <<
821  " grpbuf=" << current_group_buffer_size(i) <<
822  " size=" << int_mergers[i].size() << "/" << capacity <<
823  " (" << (int)(int_mergers[i].size() * 100.0 / capacity) << "%)" <<
824  " space=" << int_mergers[i].is_space_available());
825  }
826  for (int i = 0; i < num_ext_groups; ++i) {
827  capacity *= ExtKMAX;
 // external group i uses group buffer index i + num_int_groups
828  STXXL_MSG(" grp " << i + num_int_groups << " ext" <<
829  " grpbuf=" << current_group_buffer_size(i + num_int_groups) <<
830  " size=" << ext_mergers[i]->size() << "/" << capacity <<
831  " (" << (int)(ext_mergers[i]->size() * 100.0 / capacity) << "%)" <<
832  " space=" << ext_mergers[i]->is_space_available());
833  }
834  dump_params();
835 }
836 
// Prints the compile-time configuration constants of this instantiation
// (delete_buffer_size, N, IntKMAX, num_int_groups, ExtKMAX, num_ext_groups,
// BlockSize) in a single STXXL_MSG line.
// NOTE(review): the signature line (listing line 838) is missing from this
// listing; the class declares `void dump_params() const`.
837 template <class ConfigType>
839 {
840  STXXL_MSG("params: delete_buffer_size=" << delete_buffer_size << " N=" << N << " IntKMAX=" << IntKMAX << " num_int_groups=" << num_int_groups << " ExtKMAX=" << ExtKMAX << " num_ext_groups=" << num_ext_groups << " BlockSize=" << BlockSize);
841 }
842 
843 namespace priority_queue_local {
844 
846 {
847  enum { fits = false };
849 };
850 
// Placeholder result for the parameter search: `fits` is false and `result`
// names the type itself. The find_B_m<..., true> specialization in this file
// uses it as its `result`.
851 struct dummy
852 {
853  enum { fits = false };
854  typedef dummy result;
855 };
856 
857 template <unsigned_type E_, internal_size_type IntMem_, external_size_type MaxItems, unsigned_type B_, unsigned_type m_, bool stop = false>
858 struct find_B_m
859 {
861 
862  static const unsigned_type k = IntMem_ / B_; // number of blocks that fit into M
863  static const unsigned_type element_size = E_; // element size
864  static const internal_size_type IntMem = IntMem_;
865  static const unsigned_type B = B_; // block size
866  static const external_size_type m = m_; // number of blocks fitting into buffers
867  static const unsigned_type c = k - m_;
868  // memory occupied by block must be at least 10 times larger than size of ext sequence
869  // && satisfy memory req && if we have two ext mergers their degree must be at least 64=m/2
870  static const external_size_type fits = (c > 10) &&
871  (((k - m) * (m) * (m * B / (element_size * 4 * 1024))) >= MaxItems) &&
872  ((MaxItems < ((k - m) * m / (2 * element_size)) * 1024) || m >= 128);
873  static const unsigned_type step = 1;
874 
876  typedef typename find_B_m<element_size, IntMem, MaxItems, B / 2, 1, fits || candidate1::fits>::result candidate2;
878 };
879 
880 // specialization for the case when no valid parameters are found
881 template <unsigned_type E_, unsigned_type IntMem, unsigned_type MaxItems, bool stop>
882 struct find_B_m<E_, IntMem, MaxItems, 2048, 1, stop>
883 {
884  enum { fits = false };
886 };
887 
888 // to speedup search
// Partial specialization chosen when the last template argument (`stop`) is
// true. It ends the search on that branch at once: `fits` is false and
// `result` is dummy. The primary template passes `fits || candidate1::fits`
// as `stop` when it tries the halved block size, so that branch is cut off
// as soon as a fitting setting exists.
// NOTE(review): IntMem and MaxItems are declared as unsigned_type here, but
// the primary template declares them as internal_size_type and
// external_size_type; confirm these are the same types on all supported
// platforms, otherwise this specialization may not match.
889 template <unsigned_type E_, unsigned_type IntMem, unsigned_type MaxItems, unsigned_type B_, unsigned_type m_>
890 struct find_B_m<E_, IntMem, MaxItems, B_, m_, true>
891 {
892  enum { fits = false };
893  typedef dummy result;
894 };
895 
896 // E_ size of element in bytes
897 template <unsigned_type E_, unsigned_type IntMem, unsigned_type MaxItems>
899 {
900  // start from block size (8*1024*1024) bytes
902 };
903 
905 {
907 };
908 
909 
910 template <unsigned_type AI_, unsigned_type X_, unsigned_type CriticalSize>
911 struct compute_N
912 {
914 
915  static const unsigned_type X = X_;
916  static const unsigned_type AI = AI_;
917  static const unsigned_type N = X / (AI * AI); // two stage internal
918 
919  typedef typename IF<(N >= CriticalSize), Self, typename compute_N<AI / 2, X, CriticalSize>::result>::result result;
920 };
921 
922 template <unsigned_type X_, unsigned_type CriticalSize_>
923 struct compute_N<1, X_, CriticalSize_>
924 {
926 };
927 
928 } // namespace priority_queue_local
929 
930 //! \}
931 
932 //! \addtogroup stlcont
933 //! \{
934 
935 //! \brief Priority queue type generator. \n
936 //! <b> Introduction </b> to priority queue container: see \ref tutorial_pqueue tutorial. \n
937 //! <b> Design and Internals </b> of priority queue container: see \ref design_pqueue.
938 //!
939 //! \tparam ValueType type of the contained objects (POD with no references to internal memory)
940 //!
941 //! \tparam CompareType the comparator type used to determine whether one element is
942 //! smaller than another element.
943 //!
944 //! \tparam IntMemory upper limit for internal memory consumption in bytes.
945 //!
946 //! \tparam MaxItems upper limit for number of elements contained in the priority queue (in 1024 units). <BR>
947 //! Example: if you are sure that the priority queue never contains more than
948 //! one million elements at any one time, then the right parameter is (1000000 / 1024) = 976.
949 //!
950 //! \tparam Tune tuning parameter for meta-program search. <BR>
951 //! Try to play with it if the code does not compile (larger than default
952 //! values might help). Code does not compile if no suitable internal
953 //! parameters were found for given IntMemory and MaxItems. It might also
954 //! happen that given IntMemory is too small for given MaxItems, try larger
955 //! values.
956 template <class ValueType,
957  class CompareType,
958  internal_size_type IntMemory,
959  external_size_type MaxItems,
960  unsigned Tune = 6>
962 {
963 public:
964  // actual calculation of B, m, k and element_size
966  enum {
967  B = settings::B,
968  m = settings::m,
969  X = B * (settings::k - m) / settings::element_size, // interpretation of result
970  Buffer1Size = 32 // fixed
971  };
972  // derivation of N, AI, AE
974  enum
975  {
978  AE = (m / 2 < 2) ? 2 : (m / 2) // at least 2
979  };
980 
981  // Estimation of maximum internal memory consumption (in bytes)
982  static const unsigned_type EConsumption = X * settings::element_size + settings::B * AE + ((MaxItems / X) / AE) * settings::B * 1024;
983 
984  /*
985  unsigned BufferSize1_ = 32, // equalize procedure call overheads etc.
986  unsigned N_ = 512, // bandwidth
987  unsigned IntKMAX_ = 64, // maximal arity for internal mergers
988  unsigned IntLevels_ = 4,
989  unsigned BlockSize = (2*1024*1024),
990  unsigned ExtKMAX_ = 64, // maximal arity for external mergers
991  unsigned ExtLevels_ = 2,
992  */
994 };
995 
996 //! \}
997 
999 
1000 
// Free swap overload in namespace std that forwards to the member swap() of
// its first argument.
// NOTE(review): the signature line (listing lines 1004-1005) is missing from
// this listing; the ConfigType parameter suggests it targets
// priority_queue<ConfigType> - confirm. In this file the only member swap()
// of priority_queue sits inside an `#if 0` block, so instantiating this
// overload for that class would presumably fail to compile.
1001 namespace std {
1002 
1003 template <class ConfigType>
1006 {
1007  a.swap(b);
1008 }
1009 
1010 } // namespace std
1011 
1012 #endif // !STXXL_CONTAINERS_PRIORITY_QUEUE_HEADER
1013 // vim: et:ts=4:sw=4
static const unsigned_type EConsumption
bool empty() const
Returns true if queue has no elements.
value_type * group_buffer_current_mins[total_num_groups]
void push(const value_type &obj)
Inserts x into the priority_queue.
unsigned_type make_space_available(unsigned_type level)
static const unsigned_type element_size
int_merger_type int_mergers[num_int_groups]
bool is_sorted(_ForwardIter __first, _ForwardIter __last)
Definition: is_sorted.h:54
External merger, based on the loser tree data structure.
priority_queue_local::find_settings< sizeof(ValueType), IntMemory, MaxItems >::result settings
priority_queue_local::ext_merger< block_type, comparator_type, ExtKMAX, alloc_strategy_type > ext_merger_type
void swap_1D_arrays(T *a, T *b, unsigned_type size)
Definition: utils.h:247
unsigned long long int uint64
Definition: types.h:41
static const internal_size_type IntMem
Inverts the order of a comparison functor by swapping its arguments.
Definition: pq_helpers.h:189
typed_block< BlockSize, value_type > block_type
Type of the block used in disk-memory transfers.
Priority queue type generator. Introduction to priority queue container: see STXXL Priority Queue ...
priority_queue_local::internal_priority_queue< value_type, std::vector< value_type >, comparator_type > insert_heap_type
value_type group_buffers[total_num_groups][N+1]
#define STXXL_DEFAULT_ALLOC_STRATEGY
Definition: block_alloc.h:259
stxxl::uint64 size_type
An unsigned integral type (64 bit).
void merge4_iterator(InputIterator &source0, InputIterator &source1, InputIterator &source2, InputIterator &source3, OutputIterator target, unsigned_type length, CompareType &cmp)
Definition: pq_mergers.h:141
find_B_m< E_, IntMem, MaxItems,(8 *1024 *1024), 1 >::result result
static const unsigned_type step
void insert_segment(Merger &another_merger, size_type segment_size)
void merge_iterator(InputIterator &source0, InputIterator &source1, OutputIterator target, unsigned_type length, CompareType &cmp)
Definition: pq_mergers.h:41
#define STXXL_DEPRECATED(x)
Definition: deprecated.h:30
Loser tree from Knuth, &quot;Sorting and Searching&quot;, Section 5.4.1.
Definition: pq_losertree.h:38
unsigned_type mem_cons() const
Number of bytes consumed by the priority_queue from the internal memory not including pools (see the ...
static const unsigned_type B
Implements dynamically resizable buffered writing pool.
Definition: write_pool.h:34
static const external_size_type m
Config::alloc_strategy_type alloc_strategy_type
Implements dynamically resizable prefetching pool.
Definition: prefetch_pool.h:30
Block containing elements of fixed length.
Definition: typed_block.h:238
value_type * delete_buffer_current_min
find_B_m< element_size, IntMem, MaxItems, B/2, 1, fits||candidate1::fits >::result candidate2
Parameters_for_priority_queue_not_found_Increase_IntMem result
Implements dynamically resizable buffered writing and prefetched reading pool.
choose_int_types< my_pointer_size >::int_type int_type
Definition: types.h:66
find_B_m< element_size, IntMem, MaxItems, B, m+step, fits||(m >=k-step)>::result candidate1
comparator_type cmp
size_type size() const
Returns number of elements contained.
unsigned_type internal_size_type
Definition: types.h:69
value_type delete_buffer[delete_buffer_size+1]
Container::value_type pop(Container &c)
Definition: utils.h:270
unsigned_type current_delete_buffer_size() const
void dump_params() const
void merge3_iterator(InputIterator &source0, InputIterator &source1, InputIterator &source2, OutputIterator target, unsigned_type length, CompareType &cmp)
Definition: pq_mergers.h:73
Config::comparator_type comparator_type
Comparison object.
#define STXXL_VERBOSE_PQ(msg)
Definition: pq_helpers.h:71
#define STXXL_BEGIN_NAMESPACE
Definition: namespace.h:16
Parameters_not_found_Try_to_change_the_Tune_parameter result
const value_type & top() const
Returns &quot;largest&quot; element.
External priority queue data structure Introduction to priority queue container: see STXXL Priorit...
#define STXXL_VERBOSE1(x)
Definition: verbose.h:99
static const unsigned_type k
unsigned_type current_group_buffer_size(unsigned_type i) const
#define STXXL_THROW2(exception_type, location, error_message)
Throws exception_type with &quot;Error in [location] : [error_message]&quot;.
void swap(counting_ptr< A > &a1, counting_ptr< A > &a2)
swap enclosed object with another counting pointer (no reference counts need change) ...
Definition: counting_ptr.h:163
Similar to std::priority_queue, with the following differences:
Definition: pq_helpers.h:92
insert_heap_type insert_heap
unsigned_type refill_group_buffer(unsigned_type k)
#define STXXL_STATIC_ASSERT(x)
Definition: utils.h:49
#define STXXL_ERRMSG(x)
Definition: verbose.h:79
void dump_sizes() const
find_B_m< E_, IntMem_, MaxItems, B_, m_, stop > Self
static const external_size_type fits
uint64 external_size_type
Definition: types.h:70
choose_int_types< my_pointer_size >::unsigned_type unsigned_type
Definition: types.h:67
priority_queue< priority_queue_config< ValueType, CompareType, Buffer1Size, N, AI, 2, B, AE, 2 > > result
#define STXXL_MSG(x)
Definition: verbose.h:72
priority_queue_local::loser_tree< value_type, comparator_type, IntKMAX > int_merger_type
IF<(N >=CriticalSize), Self, typename compute_N< AI/2, X, CriticalSize >::result >::result result
value_type get_supremum() const
unsigned_type num_active_groups
IF template metaprogramming statement.
Definition: tmeta.h:31
read_write_pool< block_type > pool_type
Config::value_type value_type
The type of object stored in the priority_queue.
static const unsigned_type c
ext_merger_type ** ext_mergers
IF< fits, Self, typename IF< candidate1::fits, candidate1, candidate2 >::result >::result result
value_type * delete_buffer_end
compute_N< AI_, X_, CriticalSize > Self
#define STXXL_END_NAMESPACE
Definition: namespace.h:17
priority_queue_local::compute_N<(1<< Tune), X, 4 *Buffer1Size >::result ComputeN
priority_queue(pool_type &pool_)
Constructs external priority queue object.