19 #ifndef STXXL_CONTAINERS_PRIORITY_QUEUE_HEADER
20 #define STXXL_CONTAINERS_PRIORITY_QUEUE_HEADER
42 unsigned BufferSize1_ = 32,
44 unsigned IntKMAX_ = 64,
45 unsigned IntLevels_ = 4,
46 unsigned BlockSize_ = (2* 1024* 1024),
47 unsigned ExtKMAX_ = 64,
48 unsigned ExtLevels_ = 2,
58 delete_buffer_size = BufferSize1_,
61 num_int_groups = IntLevels_,
62 num_ext_groups = ExtLevels_,
63 BlockSize = BlockSize_,
65 element_size =
sizeof(ValueType)
73 template <
class BlockType,
82 template <
class ValueType,
class CompareType,
unsigned KNKMAX>
96 template <
class ConfigType>
103 delete_buffer_size = Config::delete_buffer_size,
105 IntKMAX = Config::IntKMAX,
106 num_int_groups = Config::num_int_groups,
107 num_ext_groups = Config::num_ext_groups,
108 total_num_groups = Config::num_int_groups + Config::num_ext_groups,
109 BlockSize = Config::BlockSize,
110 ExtKMAX = Config::ExtKMAX
146 value_type group_buffers[total_num_groups][N + 1];
147 value_type* group_buffer_current_mins[total_num_groups];
150 value_type delete_buffer[delete_buffer_size + 1];
168 void refill_delete_buffer();
172 void empty_insert_heap();
241 std::swap(cmp, obj.
cmp);
244 std::swap(size_, obj.
size_);
253 size_type size()
const;
257 bool empty()
const {
return (size() == 0); }
273 const value_type & top()
const;
291 void push(
const value_type& obj);
305 for (
int i = 0; i < num_int_groups; ++i)
306 dynam_alloc_mem += int_mergers[i].mem_cons();
308 for (
int i = 0; i < num_ext_groups; ++i)
309 dynam_alloc_mem += ext_mergers[i]->mem_cons();
312 return (
sizeof(*
this) +
317 void dump_sizes()
const;
318 void dump_params()
const;
324 template <
class ConfigType>
328 insert_heap.
size() - 1 +
329 (delete_buffer_end - delete_buffer_current_min);
333 template <
class ConfigType>
336 assert(!insert_heap.empty());
339 if ( cmp(*delete_buffer_current_min, t))
342 return *delete_buffer_current_min;
345 template <
class ConfigType>
349 assert(!insert_heap.empty());
351 if ( cmp(*delete_buffer_current_min, insert_heap.top()))
355 assert(delete_buffer_current_min < delete_buffer_end);
356 ++delete_buffer_current_min;
357 if (delete_buffer_current_min == delete_buffer_end)
358 refill_delete_buffer();
362 template <
class ConfigType>
366 assert(int_mergers->not_sentinel(obj));
367 if (insert_heap.size() == N + 1)
371 assert(!insert_heap.empty());
373 insert_heap.
push(obj);
379 template <
class ConfigType>
383 delete_buffer_end(delete_buffer + delete_buffer_size),
385 num_active_groups(0), size_(0)
392 template <
class ConfigType>
396 delete_buffer_end(delete_buffer + delete_buffer_size),
398 num_active_groups(0), size_(0)
404 template <
class ConfigType>
406 pool(new
pool_type(p_pool_mem / BlockSize, w_pool_mem / BlockSize)),
408 delete_buffer_end(delete_buffer + delete_buffer_size),
410 num_active_groups(0), size_(0)
416 template <
class ConfigType>
419 assert(!cmp(cmp.min_value(), cmp.min_value()));
427 value_type sentinel = cmp.min_value();
428 insert_heap.push(sentinel);
429 delete_buffer[delete_buffer_size] = sentinel;
430 delete_buffer_current_min = delete_buffer_end;
433 group_buffers[i][N] = sentinel;
434 group_buffer_current_mins[i] = &(group_buffers[i][N]);
438 template <
class ConfigType>
446 delete ext_mergers[j];
447 delete[] ext_mergers;
453 template <
class ConfigType>
460 size_type group_size = (group < num_int_groups) ?
461 int_mergers[group].size() :
462 ext_mergers[group - num_int_groups]->size();
463 unsigned_type left_elements = group_buffers[group] + N - group_buffer_current_mins[group];
464 if (group_size + left_elements >=
size_type(N))
466 target = group_buffers[group];
467 length = N - left_elements;
471 target = group_buffers[group] + N - group_size - left_elements;
478 memmove(target, group_buffer_current_mins[group], left_elements *
sizeof(value_type));
479 group_buffer_current_mins[group] = target;
482 if (group < num_int_groups)
483 int_mergers[group].multi_merge(target + left_elements, length);
485 ext_mergers[group - num_int_groups]->multi_merge(
486 target + left_elements,
487 target + left_elements + length);
492 #if STXXL_CHECK_ORDER_IN_SORTS
494 if (!
stxxl::is_sorted(group_buffer_current_mins[group], group_buffers[group] + N, inv_cmp))
496 STXXL_VERBOSE_PQ(
"refill_grp... length: " << length <<
" left_elements: " << left_elements);
497 for (value_type* v = group_buffer_current_mins[group] + 1; v < group_buffer_current_mins[group] + left_elements; ++v)
499 if (inv_cmp(*v, *(v - 1)))
501 STXXL_MSG(
"Error in buffer " << group <<
" at position " << (v - group_buffer_current_mins[group] - 1) <<
"/" << (v - group_buffer_current_mins[group]) <<
" " << *(v - 2) <<
" " << *(v - 1) <<
" " << *v <<
" " << *(v + 1));
508 return length + left_elements;
511 template <
class ConfigType>
518 for (
int i = (
int)num_active_groups - 1; i >= 0; i--)
520 if ((group_buffers[i] + N) - group_buffer_current_mins[i] < delete_buffer_size)
524 if (length == 0 &&
unsigned(i) == num_active_groups - 1)
527 total_group_size += length;
530 total_group_size += delete_buffer_size;
534 if (total_group_size >= delete_buffer_size)
536 length = delete_buffer_size;
541 length = total_group_size;
551 delete_buffer_current_min = delete_buffer_end - length;
553 switch (num_active_groups)
558 std::copy(group_buffer_current_mins[0], group_buffer_current_mins[0] + length, delete_buffer_current_min);
559 group_buffer_current_mins[0] += length;
562 #if STXXL_PARALLEL && STXXL_PARALLEL_PQ_MULTIWAY_MERGE_DELETE_BUFFER
564 std::pair<value_type*, value_type*> seqs[2] =
566 std::make_pair(group_buffer_current_mins[0], group_buffers[0] + N),
567 std::make_pair(group_buffer_current_mins[1], group_buffers[1] + N)
569 parallel::multiway_merge_sentinel(seqs, seqs + 2, delete_buffer_current_min, inv_cmp, length);
571 group_buffer_current_mins[0] = seqs[0].first;
572 group_buffer_current_mins[1] = seqs[1].first;
576 group_buffer_current_mins[0],
577 group_buffer_current_mins[1], delete_buffer_current_min, length, cmp);
581 #if STXXL_PARALLEL && STXXL_PARALLEL_PQ_MULTIWAY_MERGE_DELETE_BUFFER
583 std::pair<value_type*, value_type*> seqs[3] =
585 std::make_pair(group_buffer_current_mins[0], group_buffers[0] + N),
586 std::make_pair(group_buffer_current_mins[1], group_buffers[1] + N),
587 std::make_pair(group_buffer_current_mins[2], group_buffers[2] + N)
589 parallel::multiway_merge_sentinel(seqs, seqs + 3, delete_buffer_current_min, inv_cmp, length);
591 group_buffer_current_mins[0] = seqs[0].first;
592 group_buffer_current_mins[1] = seqs[1].first;
593 group_buffer_current_mins[2] = seqs[2].first;
597 group_buffer_current_mins[0],
598 group_buffer_current_mins[1],
599 group_buffer_current_mins[2], delete_buffer_current_min, length, cmp);
603 #if STXXL_PARALLEL && STXXL_PARALLEL_PQ_MULTIWAY_MERGE_DELETE_BUFFER
605 std::pair<value_type*, value_type*> seqs[4] =
607 std::make_pair(group_buffer_current_mins[0], group_buffers[0] + N),
608 std::make_pair(group_buffer_current_mins[1], group_buffers[1] + N),
609 std::make_pair(group_buffer_current_mins[2], group_buffers[2] + N),
610 std::make_pair(group_buffer_current_mins[3], group_buffers[3] + N)
612 parallel::multiway_merge_sentinel(seqs, seqs + 4, delete_buffer_current_min, inv_cmp, length);
614 group_buffer_current_mins[0] = seqs[0].first;
615 group_buffer_current_mins[1] = seqs[1].first;
616 group_buffer_current_mins[2] = seqs[2].first;
617 group_buffer_current_mins[3] = seqs[3].first;
621 group_buffer_current_mins[0],
622 group_buffer_current_mins[1],
623 group_buffer_current_mins[2],
624 group_buffer_current_mins[3], delete_buffer_current_min, length, cmp);
628 STXXL_THROW2(std::runtime_error,
"priority_queue<...>::refill_delete_buffer()",
629 "Overflow! The number of buffers on 2nd level in stxxl::priority_queue is currently limited to 4");
632 #if STXXL_CHECK_ORDER_IN_SORTS
633 if (!
stxxl::is_sorted(delete_buffer_current_min, delete_buffer_end, inv_cmp))
635 for (value_type* v = delete_buffer_current_min + 1; v < delete_buffer_end; ++v)
637 if (inv_cmp(*v, *(v - 1)))
639 STXXL_MSG(
"Error at position " << (v - delete_buffer_current_min - 1) <<
"/" << (v - delete_buffer_current_min) <<
" " << *(v - 1) <<
" " << *v);
653 template <
class ConfigType>
658 assert(level < total_num_groups);
659 assert(level <= num_active_groups);
661 if (level == num_active_groups)
664 const bool spaceIsAvailable_ =
665 (level < num_int_groups) ? int_mergers[level].is_space_available()
666 : (ext_mergers[level - num_int_groups]->is_space_available());
668 if (spaceIsAvailable_)
672 else if (level == total_num_groups - 1)
675 for (
int i = 0; i < num_int_groups; ++i)
677 for (
int i = 0; i < num_ext_groups; ++i)
679 STXXL_ERRMSG(
"priority_queue OVERFLOW - all groups full, size=" << size() <<
680 ", capacity(last externel group (" << num_int_groups + num_ext_groups - 1 <<
"))=" << capacity);
684 const size_type segmentSize = ext_mergers[extLevel]->size();
685 STXXL_VERBOSE1(
"Inserting segment into last level external: " << level <<
" " << segmentSize);
688 overflow_merger->
insert_segment(*ext_mergers[extLevel], segmentSize);
689 std::swap(ext_mergers[extLevel], overflow_merger);
690 delete overflow_merger;
695 finalLevel = make_space_available(level + 1);
697 if (level < num_int_groups - 1)
700 value_type* newSegment =
new value_type[segmentSize + 1];
701 int_mergers[level].multi_merge(newSegment, segmentSize);
703 newSegment[segmentSize] = delete_buffer[delete_buffer_size];
707 int_mergers[level + 1].insert_segment(newSegment, segmentSize);
711 if (level == num_int_groups - 1)
713 const unsigned_type segmentSize = int_mergers[num_int_groups - 1].size();
714 STXXL_VERBOSE_PQ(
"make_space... Inserting segment into first level external: " << level <<
" " << segmentSize);
715 ext_mergers[0]->insert_segment(int_mergers[num_int_groups - 1], segmentSize);
719 const size_type segmentSize = ext_mergers[level - num_int_groups]->size();
720 STXXL_VERBOSE_PQ(
"make_space... Inserting segment into second level external: " << level <<
" " << segmentSize);
721 ext_mergers[level - num_int_groups + 1]->insert_segment(*ext_mergers[level - num_int_groups], segmentSize);
730 template <
class ConfigType>
734 assert(insert_heap.size() == (N + 1));
736 const value_type sup = get_supremum();
739 value_type* newSegment =
new value_type[N + 1];
740 value_type* newPos = newSegment;
744 value_type* SortTo = newSegment;
746 insert_heap.sort_to(SortTo);
748 SortTo = newSegment + N;
750 insert_heap.push(*SortTo);
752 assert(insert_heap.size() == 1);
759 value_type temp[tempSize + 1];
762 value_type* pos = temp + tempSize - sz1 - sz2;
763 std::copy(delete_buffer_current_min, delete_buffer_current_min + sz1, pos);
764 std::copy(group_buffer_current_mins[0], group_buffer_current_mins[0] + sz2, pos + sz1);
765 temp[tempSize] = sup;
784 assert(freeLevel == 0 || int_mergers[0].size() == 0);
785 int_mergers[0].insert_segment(newSegment, N);
791 for (
int_type i = freeLevel; i >= 0; i--)
796 newSegment =
new value_type[current_group_buffer_size(i) + 1];
797 std::copy(group_buffer_current_mins[i], group_buffer_current_mins[i] + current_group_buffer_size(i) + 1, newSegment);
798 int_mergers[0].insert_segment(newSegment, current_group_buffer_size(i));
799 group_buffer_current_mins[i] = group_buffers[i] + N;
807 if (delete_buffer_current_min == delete_buffer_end)
808 refill_delete_buffer();
811 template <
class ConfigType>
816 STXXL_MSG(
" insert_heap\t= " << insert_heap.size() - 1 <<
"/" << capacity);
817 STXXL_MSG(
" delete_buffer\t= " << (delete_buffer_end - delete_buffer_current_min) <<
"/" << delete_buffer_size);
818 for (
int i = 0; i < num_int_groups; ++i) {
821 " grpbuf=" << current_group_buffer_size(i) <<
822 " size=" << int_mergers[i].size() <<
"/" << capacity <<
823 " (" << (
int)(int_mergers[i].size() * 100.0 / capacity) <<
"%)" <<
824 " space=" << int_mergers[i].is_space_available());
826 for (
int i = 0; i < num_ext_groups; ++i) {
828 STXXL_MSG(
" grp " << i + num_int_groups <<
" ext" <<
829 " grpbuf=" << current_group_buffer_size(i + num_int_groups) <<
830 " size=" << ext_mergers[i]->size() <<
"/" << capacity <<
831 " (" << (
int)(ext_mergers[i]->size() * 100.0 / capacity) <<
"%)" <<
832 " space=" << ext_mergers[i]->is_space_available());
837 template <
class ConfigType>
840 STXXL_MSG(
"params: delete_buffer_size=" << delete_buffer_size <<
" N=" << N <<
" IntKMAX=" << IntKMAX <<
" num_int_groups=" << num_int_groups <<
" ExtKMAX=" << ExtKMAX <<
" num_ext_groups=" << num_ext_groups <<
" BlockSize=" << BlockSize);
843 namespace priority_queue_local {
857 template <
unsigned_type E_,
internal_
size_type IntMem_, external_
size_type MaxItems,
unsigned_type B_,
unsigned_type m_,
bool stop = false>
881 template <
unsigned_type E_,
unsigned_type IntMem,
unsigned_type MaxItems,
bool stop>
882 struct find_B_m<E_, IntMem, MaxItems, 2048, 1, stop>
889 template <
unsigned_type E_,
unsigned_type IntMem,
unsigned_type MaxItems,
unsigned_type B_,
unsigned_type m_>
890 struct find_B_m<E_, IntMem, MaxItems, B_, m_, true>
897 template <
unsigned_type E_,
unsigned_type IntMem,
unsigned_type MaxItems>
910 template <
unsigned_type AI_,
unsigned_type X_,
unsigned_type CriticalSize>
922 template <
unsigned_type X_,
unsigned_type CriticalSize_>
956 template <
class ValueType,
969 X =
B * (settings::k -
m) / settings::element_size,
1003 template <
class ConfigType>
1012 #endif // !STXXL_CONTAINERS_PRIORITY_QUEUE_HEADER
virtual ~priority_queue()
static const unsigned_type EConsumption
bool empty() const
Returns true if queue has no elements.
value_type * group_buffer_current_mins[total_num_groups]
void push(const value_type &obj)
Inserts x into the priority_queue.
static const unsigned_type X
unsigned_type make_space_available(unsigned_type level)
static const unsigned_type element_size
int_merger_type int_mergers[num_int_groups]
bool is_sorted(_ForwardIter __first, _ForwardIter __last)
External merger, based on the loser tree data structure.
priority_queue_local::find_settings< sizeof(ValueType), IntMemory, MaxItems >::result settings
void refill_delete_buffer()
priority_queue_local::ext_merger< block_type, comparator_type, ExtKMAX, alloc_strategy_type > ext_merger_type
void swap_1D_arrays(T *a, T *b, unsigned_type size)
unsigned long long int uint64
static const internal_size_type IntMem
Inverts the order of a comparison functor by swapping its arguments.
typed_block< BlockSize, value_type > block_type
Type of the block used in disk-memory transfers.
Priority queue type generator. Introduction to priority queue container: see STXXL Priority Queue ...
priority_queue_local::internal_priority_queue< value_type, std::vector< value_type >, comparator_type > insert_heap_type
value_type group_buffers[total_num_groups][N+1]
#define STXXL_DEFAULT_ALLOC_STRATEGY
stxxl::uint64 size_type
An unsigned integral type (64 bit).
void merge4_iterator(InputIterator &source0, InputIterator &source1, InputIterator &source2, InputIterator &source3, OutputIterator target, unsigned_type length, CompareType &cmp)
find_B_m< E_, IntMem, MaxItems,(8 *1024 *1024), 1 >::result result
void set_pool(pool_type *pool_)
static const unsigned_type step
AllocStr_ alloc_strategy_type
void insert_segment(Merger &another_merger, size_type segment_size)
static const unsigned_type N
void merge_iterator(InputIterator &source0, InputIterator &source1, OutputIterator target, unsigned_type length, CompareType &cmp)
CompareType comparator_type
#define STXXL_DEPRECATED(x)
Loser tree from Knuth, "Sorting and Searching", Section 5.4.1.
unsigned_type mem_cons() const
Number of bytes consumed by the priority_queue from the internal memory not including pools (see the ...
static const unsigned_type B
Implements dynamically resizable buffered writing pool.
static const external_size_type m
Config::alloc_strategy_type alloc_strategy_type
Implements dynamically resizable prefetching pool.
Block containing elements of fixed length.
value_type * delete_buffer_current_min
find_B_m< element_size, IntMem, MaxItems, B/2, 1, fits||candidate1::fits >::result candidate2
Parameters_for_priority_queue_not_found_Increase_IntMem result
Parameters_not_found_Try_to_change_the_Tune_parameter result
Implements dynamically resizable buffered writing and prefetched reading pool.
choose_int_types< my_pointer_size >::int_type int_type
find_B_m< element_size, IntMem, MaxItems, B, m+step, fits||(m >=k-step)>::result candidate1
size_type size() const
Returns number of elements contained.
unsigned_type internal_size_type
value_type delete_buffer[delete_buffer_size+1]
Container::value_type pop(Container &c)
unsigned_type current_delete_buffer_size() const
void merge3_iterator(InputIterator &source0, InputIterator &source1, InputIterator &source2, OutputIterator target, unsigned_type length, CompareType &cmp)
Config::comparator_type comparator_type
Comparison object.
#define STXXL_VERBOSE_PQ(msg)
#define STXXL_BEGIN_NAMESPACE
Parameters_not_found_Try_to_change_the_Tune_parameter result
const value_type & top() const
Returns "largest" element.
External priority queue data structure Introduction to priority queue container: see STXXL Priorit...
#define STXXL_VERBOSE1(x)
static const unsigned_type k
unsigned_type current_group_buffer_size(unsigned_type i) const
Parameters_for_priority_queue_not_found_Increase_IntMem result
#define STXXL_THROW2(exception_type, location, error_message)
Throws exception_type with "Error in [location] : [error_message]".
void swap(counting_ptr< A > &a1, counting_ptr< A > &a2)
swap enclosed object with another counting pointer (no reference counts need change) ...
Similar to std::priority_queue, with the following differences:
insert_heap_type insert_heap
unsigned_type refill_group_buffer(unsigned_type k)
#define STXXL_STATIC_ASSERT(x)
find_B_m< E_, IntMem_, MaxItems, B_, m_, stop > Self
static const external_size_type fits
uint64 external_size_type
choose_int_types< my_pointer_size >::unsigned_type unsigned_type
priority_queue< priority_queue_config< ValueType, CompareType, Buffer1Size, N, AI, 2, B, AE, 2 > > result
priority_queue_local::loser_tree< value_type, comparator_type, IntKMAX > int_merger_type
void swap(loser_tree &obj)
IF<(N >=CriticalSize), Self, typename compute_N< AI/2, X, CriticalSize >::result >::result result
value_type get_supremum() const
unsigned_type num_active_groups
IF template metaprogramming statement.
static const unsigned_type AI
read_write_pool< block_type > pool_type
Config::value_type value_type
The type of object stored in the priority_queue.
static const unsigned_type c
ext_merger_type ** ext_mergers
IF< fits, Self, typename IF< candidate1::fits, candidate1, candidate2 >::result >::result result
value_type * delete_buffer_end
compute_N< AI_, X_, CriticalSize > Self
#define STXXL_END_NAMESPACE
priority_queue_local::compute_N<(1<< Tune), X, 4 *Buffer1Size >::result ComputeN
priority_queue(pool_type &pool_)
Constructs external priority queue object.