20 #ifndef STXXL_CONTAINERS_PRIORITY_QUEUE_HEADER
21 #define STXXL_CONTAINERS_PRIORITY_QUEUE_HEADER
43 unsigned BufferSize1_ = 32,
45 unsigned IntKMAX_ = 64,
46 unsigned IntLevels_ = 4,
47 unsigned BlockSize_ = (2* 1024* 1024),
48 unsigned ExtKMAX_ = 64,
49 unsigned ExtLevels_ = 2,
59 delete_buffer_size = BufferSize1_,
62 num_int_groups = IntLevels_,
63 num_ext_groups = ExtLevels_,
64 BlockSize = BlockSize_,
66 element_size =
sizeof(ValueType)
74 template <
class BlockType,
83 template <
class ValueType,
class CompareType,
unsigned KNKMAX>
97 template <
class ConfigType>
104 delete_buffer_size = Config::delete_buffer_size,
106 IntKMAX = Config::IntKMAX,
107 num_int_groups = Config::num_int_groups,
108 num_ext_groups = Config::num_ext_groups,
109 total_num_groups = Config::num_int_groups + Config::num_ext_groups,
110 BlockSize = Config::BlockSize,
111 ExtKMAX = Config::ExtKMAX
146 value_type group_buffers[total_num_groups][N + 1];
147 value_type* group_buffer_current_mins[total_num_groups];
150 value_type delete_buffer[delete_buffer_size + 1];
168 void refill_delete_buffer();
172 void empty_insert_heap();
241 std::swap(cmp, obj.
cmp);
244 std::swap(size_, obj.
size_);
253 size_type size()
const;
257 bool empty()
const {
return (size() == 0); }
273 const value_type & top()
const;
291 void push(
const value_type& obj);
305 for (
int i = 0; i < num_int_groups; ++i)
306 dynam_alloc_mem += int_mergers[i].mem_cons();
308 for (
int i = 0; i < num_ext_groups; ++i)
309 dynam_alloc_mem += ext_mergers[i]->mem_cons();
311 return (
sizeof(*
this) +
316 void dump_sizes()
const;
317 void dump_params()
const;
322 template <
class ConfigType>
323 inline typename priority_queue<ConfigType>::size_type
327 insert_heap.
size() - 1 +
328 (delete_buffer_end - delete_buffer_current_min);
331 template <
class ConfigType>
335 assert(!insert_heap.empty());
338 if ( cmp(*delete_buffer_current_min, t))
341 return *delete_buffer_current_min;
344 template <
class ConfigType>
348 assert(!insert_heap.empty());
350 if ( cmp(*delete_buffer_current_min, insert_heap.top()))
354 assert(delete_buffer_current_min < delete_buffer_end);
355 ++delete_buffer_current_min;
356 if (delete_buffer_current_min == delete_buffer_end)
357 refill_delete_buffer();
361 template <
class ConfigType>
365 assert(int_mergers->not_sentinel(obj));
366 if (insert_heap.size() == N + 1)
369 assert(!insert_heap.empty());
371 insert_heap.
push(obj);
376 template <
class ConfigType>
380 delete_buffer_end(delete_buffer + delete_buffer_size),
382 num_active_groups(0), size_(0)
389 template <
class ConfigType>
393 delete_buffer_end(delete_buffer + delete_buffer_size),
395 num_active_groups(0), size_(0)
401 template <
class ConfigType>
403 : pool(new
pool_type(p_pool_mem / BlockSize, w_pool_mem / BlockSize)),
405 delete_buffer_end(delete_buffer + delete_buffer_size),
407 num_active_groups(0), size_(0)
413 template <
class ConfigType>
416 assert(!cmp(cmp.min_value(), cmp.min_value()));
424 value_type sentinel = cmp.min_value();
425 insert_heap.push(sentinel);
426 delete_buffer[delete_buffer_size] = sentinel;
427 delete_buffer_current_min = delete_buffer_end;
430 group_buffers[i][N] = sentinel;
431 group_buffer_current_mins[i] = &(group_buffers[i][N]);
435 template <
class ConfigType>
443 delete ext_mergers[j];
444 delete[] ext_mergers;
450 template <
class ConfigType>
458 size_type group_size = (group < num_int_groups) ?
459 int_mergers[group].size() :
460 ext_mergers[group - num_int_groups]->size();
461 unsigned_type left_elements = group_buffers[group] + N - group_buffer_current_mins[group];
462 if (group_size + left_elements >=
size_type(N))
464 target = group_buffers[group];
465 length = N - left_elements;
469 target = group_buffers[group] + N - group_size - left_elements;
476 memmove(target, group_buffer_current_mins[group], left_elements *
sizeof(value_type));
477 group_buffer_current_mins[group] = target;
480 if (group < num_int_groups)
481 int_mergers[group].multi_merge(target + left_elements,
484 ext_mergers[group - num_int_groups]->multi_merge(
485 target + left_elements,
486 target + left_elements + length);
491 #if STXXL_CHECK_ORDER_IN_SORTS
493 if (!
stxxl::is_sorted(group_buffer_current_mins[group], group_buffers[group] + N, inv_cmp))
495 STXXL_VERBOSE_PQ(
"refill_grp... length: " << length <<
" left_elements: " << left_elements);
496 for (value_type* v = group_buffer_current_mins[group] + 1; v < group_buffer_current_mins[group] + left_elements; ++v)
498 if (inv_cmp(*v, *(v - 1)))
500 STXXL_MSG(
"Error in buffer " << group <<
" at position " << (v - group_buffer_current_mins[group] - 1) <<
"/" << (v - group_buffer_current_mins[group]) <<
" " << *(v - 2) <<
" " << *(v - 1) <<
" " << *v <<
" " << *(v + 1));
507 return length + left_elements;
510 template <
class ConfigType>
520 if ((group_buffers[i] + N) - group_buffer_current_mins[i] < delete_buffer_size)
522 size_type length = refill_group_buffer(i);
524 if (length == 0 &&
unsigned(i) == num_active_groups - 1)
527 total_group_size += length;
530 total_group_size += delete_buffer_size;
534 if (total_group_size >= delete_buffer_size)
536 length = delete_buffer_size;
541 length = total_group_size;
542 assert(size_ == length);
551 delete_buffer_current_min = delete_buffer_end - length;
553 switch (num_active_groups)
558 std::copy(group_buffer_current_mins[0], group_buffer_current_mins[0] + length, delete_buffer_current_min);
559 group_buffer_current_mins[0] += length;
562 #if STXXL_PARALLEL && STXXL_PARALLEL_PQ_MULTIWAY_MERGE_DELETE_BUFFER
564 std::pair<value_type*, value_type*> seqs[2] =
566 std::make_pair(group_buffer_current_mins[0], group_buffers[0] + N),
567 std::make_pair(group_buffer_current_mins[1], group_buffers[1] + N)
569 parallel::multiway_merge_sentinel(seqs, seqs + 2, delete_buffer_current_min, inv_cmp, length);
571 group_buffer_current_mins[0] = seqs[0].first;
572 group_buffer_current_mins[1] = seqs[1].first;
576 group_buffer_current_mins[0],
577 group_buffer_current_mins[1], delete_buffer_current_min, length, cmp);
581 #if STXXL_PARALLEL && STXXL_PARALLEL_PQ_MULTIWAY_MERGE_DELETE_BUFFER
583 std::pair<value_type*, value_type*> seqs[3] =
585 std::make_pair(group_buffer_current_mins[0], group_buffers[0] + N),
586 std::make_pair(group_buffer_current_mins[1], group_buffers[1] + N),
587 std::make_pair(group_buffer_current_mins[2], group_buffers[2] + N)
589 parallel::multiway_merge_sentinel(seqs, seqs + 3, delete_buffer_current_min, inv_cmp, length);
591 group_buffer_current_mins[0] = seqs[0].first;
592 group_buffer_current_mins[1] = seqs[1].first;
593 group_buffer_current_mins[2] = seqs[2].first;
597 group_buffer_current_mins[0],
598 group_buffer_current_mins[1],
599 group_buffer_current_mins[2], delete_buffer_current_min, length, cmp);
603 #if STXXL_PARALLEL && STXXL_PARALLEL_PQ_MULTIWAY_MERGE_DELETE_BUFFER
605 std::pair<value_type*, value_type*> seqs[4] =
607 std::make_pair(group_buffer_current_mins[0], group_buffers[0] + N),
608 std::make_pair(group_buffer_current_mins[1], group_buffers[1] + N),
609 std::make_pair(group_buffer_current_mins[2], group_buffers[2] + N),
610 std::make_pair(group_buffer_current_mins[3], group_buffers[3] + N)
612 parallel::multiway_merge_sentinel(seqs, seqs + 4, delete_buffer_current_min, inv_cmp, length);
614 group_buffer_current_mins[0] = seqs[0].first;
615 group_buffer_current_mins[1] = seqs[1].first;
616 group_buffer_current_mins[2] = seqs[2].first;
617 group_buffer_current_mins[3] = seqs[3].first;
621 group_buffer_current_mins[0],
622 group_buffer_current_mins[1],
623 group_buffer_current_mins[2],
624 group_buffer_current_mins[3], delete_buffer_current_min, length, cmp);
628 STXXL_THROW2(std::runtime_error,
"priority_queue<...>::refill_delete_buffer()",
629 "Overflow! The number of buffers on 2nd level in stxxl::priority_queue is currently limited to 4");
632 #if STXXL_CHECK_ORDER_IN_SORTS
633 if (!
stxxl::is_sorted(delete_buffer_current_min, delete_buffer_end, inv_cmp))
635 for (value_type* v = delete_buffer_current_min + 1; v < delete_buffer_end; ++v)
637 if (inv_cmp(*v, *(v - 1)))
639 STXXL_MSG(
"Error at position " << (v - delete_buffer_current_min - 1) <<
"/" << (v - delete_buffer_current_min) <<
" " << *(v - 1) <<
" " << *v);
653 template <
class ConfigType>
658 assert(level < total_num_groups);
659 assert(level <= num_active_groups);
661 if (level == num_active_groups)
664 const bool spaceIsAvailable_ =
665 (level < num_int_groups) ? int_mergers[level].is_space_available()
666 : (ext_mergers[level - num_int_groups]->is_space_available());
668 if (spaceIsAvailable_)
672 else if (level == total_num_groups - 1)
675 for (
int i = 0; i < num_int_groups; ++i)
677 for (
int i = 0; i < num_ext_groups; ++i)
679 STXXL_ERRMSG(
"priority_queue OVERFLOW - all groups full, size=" << size() <<
680 ", capacity(last externel group (" << num_int_groups + num_ext_groups - 1 <<
"))=" << capacity);
684 const size_type segmentSize = ext_mergers[extLevel]->size();
685 STXXL_VERBOSE1(
"Inserting segment into last level external: " << level <<
" " << segmentSize);
688 overflow_merger->
insert_segment(*ext_mergers[extLevel], segmentSize);
689 std::swap(ext_mergers[extLevel], overflow_merger);
690 delete overflow_merger;
695 finalLevel = make_space_available(level + 1);
697 if (level < num_int_groups - 1)
700 value_type* newSegment =
new value_type[segmentSize + 1];
701 int_mergers[level].multi_merge(newSegment, segmentSize);
703 newSegment[segmentSize] = delete_buffer[delete_buffer_size];
707 int_mergers[level + 1].insert_segment(newSegment, segmentSize);
711 if (level == num_int_groups - 1)
713 const unsigned_type segmentSize = int_mergers[num_int_groups - 1].size();
714 STXXL_VERBOSE_PQ(
"make_space... Inserting segment into first level external: " << level <<
" " << segmentSize);
715 ext_mergers[0]->insert_segment(int_mergers[num_int_groups - 1], segmentSize);
719 const size_type segmentSize = ext_mergers[level - num_int_groups]->size();
720 STXXL_VERBOSE_PQ(
"make_space... Inserting segment into second level external: " << level <<
" " << segmentSize);
721 ext_mergers[level - num_int_groups + 1]->insert_segment(*ext_mergers[level - num_int_groups], segmentSize);
729 template <
class ConfigType>
733 assert(insert_heap.size() == (N + 1));
735 const value_type sup = get_supremum();
738 value_type* newSegment =
new value_type[N + 1];
739 value_type* newPos = newSegment;
743 value_type* SortTo = newSegment;
745 insert_heap.sort_to(SortTo);
747 SortTo = newSegment + N;
749 insert_heap.push(*SortTo);
751 assert(insert_heap.size() == 1);
758 value_type temp[tempSize + 1];
761 value_type* pos = temp + tempSize - sz1 - sz2;
762 std::copy(delete_buffer_current_min, delete_buffer_current_min + sz1, pos);
763 std::copy(group_buffer_current_mins[0], group_buffer_current_mins[0] + sz2, pos + sz1);
764 temp[tempSize] = sup;
783 assert(freeLevel == 0 || int_mergers[0].size() == 0);
784 int_mergers[0].insert_segment(newSegment, N);
790 for (
int_type i = freeLevel; i >= 0; i--)
795 newSegment =
new value_type[current_group_buffer_size(i) + 1];
796 std::copy(group_buffer_current_mins[i], group_buffer_current_mins[i] + current_group_buffer_size(i) + 1, newSegment);
797 int_mergers[0].insert_segment(newSegment, current_group_buffer_size(i));
798 group_buffer_current_mins[i] = group_buffers[i] + N;
806 if (delete_buffer_current_min == delete_buffer_end)
807 refill_delete_buffer();
810 template <
class ConfigType>
815 STXXL_MSG(
" insert_heap\t= " << insert_heap.size() - 1 <<
"/" << capacity);
816 STXXL_MSG(
" delete_buffer\t= " << (delete_buffer_end - delete_buffer_current_min) <<
"/" << delete_buffer_size);
817 for (
int i = 0; i < num_int_groups; ++i) {
820 " grpbuf=" << current_group_buffer_size(i) <<
821 " size=" << int_mergers[i].size() <<
"/" << capacity <<
822 " (" << (
int)((
double)int_mergers[i].size() * 100.0 / (
double)capacity) <<
"%)" <<
823 " space=" << int_mergers[i].is_space_available());
825 for (
int i = 0; i < num_ext_groups; ++i) {
827 STXXL_MSG(
" grp " << i + num_int_groups <<
" ext" <<
828 " grpbuf=" << current_group_buffer_size(i + num_int_groups) <<
829 " size=" << ext_mergers[i]->size() <<
"/" << capacity <<
830 " (" << (
int)((
double)ext_mergers[i]->size() * 100.0 / (
double)capacity) <<
"%)" <<
831 " space=" << ext_mergers[i]->is_space_available());
836 template <
class ConfigType>
839 STXXL_MSG(
"params: delete_buffer_size=" << delete_buffer_size <<
" N=" << N <<
" IntKMAX=" << IntKMAX <<
" num_int_groups=" << num_int_groups <<
" ExtKMAX=" << ExtKMAX <<
" num_ext_groups=" << num_ext_groups <<
" BlockSize=" << BlockSize);
842 namespace priority_queue_local {
861 typedef find_B_m<ElementSize, IntMem,
886 (((
k -
m) *
m * (
m *
B / (ElementSize * 4 * 1024))) >= MaxItems) &&
888 ((MaxItems < ((
k -
m) *
m / (2 * ElementSize)) * 1024) ||
m >= 128);
893 typedef typename find_B_m<ElementSize, IntMem, MaxItems,
B,
896 typedef typename find_B_m<ElementSize, IntMem, MaxItems,
B / 2,
906 struct find_B_m<ElementSize, IntMem, MaxItems, 2048, 1, stop>
916 struct find_B_m<ElementSize, IntMem, MaxItems, BlockSize, m_, true>
928 typedef typename find_B_m<ElementSize, IntMem,
937 template <
unsigned_type AI_,
unsigned_type X_,
unsigned_type CriticalSize>
949 template <
unsigned_type X_,
unsigned_type CriticalSize_>
983 template <
class ValueType,
996 X =
B * (settings::k -
m) / settings::element_size,
1029 template <
class ConfigType>
1038 #endif // !STXXL_CONTAINERS_PRIORITY_QUEUE_HEADER
virtual ~priority_queue()
static const unsigned_type EConsumption
static const internal_size_type m
number of blocks fitting into buffers of mergers (arity of both mergers), increased from 1 to 2048 ...
find_B_m< ElementSize, IntMem, MaxItems,(8 *1024 *1024), 1 >::result result
bool empty() const
Returns true if queue has no elements.
IF< fits, self_type, typename IF< candidate1::fits, candidate1, candidate2 >::result >::result result
return a fitting configuration.
bool is_sorted(ForwardIterator first, ForwardIterator last)
value_type * group_buffer_current_mins[total_num_groups]
void push(const value_type &obj)
Inserts x into the priority_queue.
static const unsigned_type X
unsigned_type make_space_available(unsigned_type level)
int_merger_type int_mergers[num_int_groups]
External merger, based on the loser tree data structure.
priority_queue_local::find_settings< sizeof(ValueType), IntMemory, MaxItems >::result settings
void refill_delete_buffer()
priority_queue_local::ext_merger< block_type, comparator_type, ExtKMAX, alloc_strategy_type > ext_merger_type
unsigned long long int uint64
static const external_size_type fits
calculated boolean whether the configuration fits into internal memory.
Inverts the order of a comparison functor by swapping its arguments.
typed_block< BlockSize, value_type > block_type
Type of the block used in disk-memory transfers.
Priority queue type generator. Introduction to priority queue container: see STXXL Priority Queue ...
priority_queue_local::internal_priority_queue< value_type, std::vector< value_type >, comparator_type > insert_heap_type
static const internal_size_type intmem
internal memory size of PQ
value_type group_buffers[total_num_groups][N+1]
#define STXXL_DEFAULT_ALLOC_STRATEGY
stxxl::uint64 size_type
An unsigned integral type (64 bit).
void merge3_iterator(InputIterator &source0, InputIterator &source1, InputIterator &source2, OutputIterator target, SizeType length, CompareType &cmp)
void insert_segment(Merger &another_merger, size_type segment_size)
void swap_1D_arrays(Type *a, Type *b, unsigned_type size)
AllocStr_ alloc_strategy_type
static const unsigned_type N
CompareType comparator_type
#define STXXL_DEPRECATED(x)
Loser tree from Knuth, "Sorting and Searching", Section 5.4.1.
unsigned_type mem_cons() const
Number of bytes consumed by the priority_queue from the internal memory not including pools (see the ...
static const internal_size_type element_size
element size
Implements dynamically resizable buffered writing pool.
Config::alloc_strategy_type alloc_strategy_type
Implements dynamically resizable prefetching pool.
Block containing elements of fixed length.
static const unsigned_type step
value_type * delete_buffer_current_min
find_B_m< ElementSize, IntMem, MaxItems, BlockSize, m_, stop > self_type
Parameters_not_found_Try_to_change_the_Tune_parameter result
Implements dynamically resizable buffered writing and prefetched reading pool.
void set_pool(pool_type *pool_)
choose_int_types< my_pointer_size >::int_type int_type
size_type size() const
Returns number of elements contained.
unsigned_type internal_size_type
void swap(counting_ptr< A > &a1, counting_ptr< A > &a2)
swap enclosed object with another counting pointer (no reference counts need change) ...
value_type delete_buffer[delete_buffer_size+1]
Container::value_type pop(Container &c)
unsigned_type current_delete_buffer_size() const
Config::comparator_type comparator_type
Comparison object.
Parameters_for_priority_queue_not_found_Increase_IntMem result
#define STXXL_VERBOSE_PQ(msg)
static const internal_size_type k
number of blocks that fit into internal memory (M)
#define STXXL_BEGIN_NAMESPACE
Parameters_not_found_Try_to_change_the_Tune_parameter result
size_type refill_group_buffer(unsigned_type k)
const value_type & top() const
Returns "largest" element.
External priority queue data structure Introduction to priority queue container: see STXXL Priorit...
#define STXXL_VERBOSE1(x)
find_B_m< ElementSize, IntMem, MaxItems, B, m+step, fits||(m+step >=k)>::result candidate1
if not fits, recurse into configuration with +step more internal buffers
unsigned_type current_group_buffer_size(unsigned_type i) const
Parameters_for_priority_queue_not_found_Increase_IntMem result
#define STXXL_THROW2(exception_type, location, error_message)
Throws exception_type with "Error in [location] : [error_message]".
Similar to std::priority_queue, with the following differences:
insert_heap_type insert_heap
#define STXXL_STATIC_ASSERT(x)
static const int_type c
remaining blocks, (freely moving, not necessarily unused) ?-tb
uint64 external_size_type
choose_int_types< my_pointer_size >::unsigned_type unsigned_type
priority_queue< priority_queue_config< ValueType, CompareType, Buffer1Size, N, AI, 2, B, AE, 2 > > result
static const internal_size_type B
block size (iterates from 8 MiB downwards)
priority_queue_local::loser_tree< value_type, comparator_type, IntKMAX > int_merger_type
IF<(N >=CriticalSize), Self, typename compute_N< AI/2, X, CriticalSize >::result >::result result
value_type get_supremum() const
find_B_m< ElementSize, IntMem, MaxItems, B/2, 1, fits||candidate1::fits >::result candidate2
if not fits, recurse into configuration with block size halved.
unsigned_type num_active_groups
IF template metaprogramming statement.
static const unsigned_type AI
read_write_pool< block_type > pool_type
Config::value_type value_type
The type of object stored in the priority_queue.
void merge_iterator(InputIterator &source0, InputIterator &source1, OutputIterator target, SizeType length, CompareType &cmp)
ext_merger_type ** ext_mergers
value_type * delete_buffer_end
compute_N< AI_, X_, CriticalSize > Self
void swap(loser_tree &obj)
#define STXXL_END_NAMESPACE
priority_queue_local::compute_N<(1<< Tune), X, 4 *Buffer1Size >::result ComputeN
priority_queue(pool_type &pool_)
Constructs external priority queue object.
void merge4_iterator(InputIterator &source0, InputIterator &source1, InputIterator &source2, InputIterator &source3, OutputIterator target, SizeType length, CompareType &cmp)