20 #ifndef STXXL_CONTAINERS_PRIORITY_QUEUE_HEADER
21 #define STXXL_CONTAINERS_PRIORITY_QUEUE_HEADER
43 unsigned BufferSize1_ = 32,
45 unsigned IntKMAX_ = 64,
46 unsigned IntLevels_ = 4,
47 unsigned BlockSize_ = (2* 1024* 1024),
48 unsigned ExtKMAX_ = 64,
49 unsigned ExtLevels_ = 2,
59 delete_buffer_size = BufferSize1_,
62 num_int_groups = IntLevels_,
63 num_ext_groups = ExtLevels_,
64 BlockSize = BlockSize_,
66 element_size =
sizeof(ValueType)
74 template <
class BlockType,
83 template <
class ValueType,
class CompareType,
unsigned KNKMAX>
97 template <
class ConfigType>
104 delete_buffer_size = Config::delete_buffer_size,
106 IntKMAX = Config::IntKMAX,
107 num_int_groups = Config::num_int_groups,
108 num_ext_groups = Config::num_ext_groups,
109 total_num_groups = Config::num_int_groups + Config::num_ext_groups,
110 BlockSize = Config::BlockSize,
111 ExtKMAX = Config::ExtKMAX
146 value_type group_buffers[total_num_groups][N + 1];
147 value_type* group_buffer_current_mins[total_num_groups];
150 value_type delete_buffer[delete_buffer_size + 1];
168 void refill_delete_buffer();
172 void empty_insert_heap();
241 std::swap(cmp, obj.
cmp);
244 std::swap(size_, obj.
size_);
253 size_type size()
const;
257 bool empty()
const {
return (size() == 0); }
273 const value_type & top()
const;
291 void push(
const value_type& obj);
305 for (
int i = 0; i < num_int_groups; ++i)
306 dynam_alloc_mem += int_mergers[i].mem_cons();
308 for (
int i = 0; i < num_ext_groups; ++i)
309 dynam_alloc_mem += ext_mergers[i]->mem_cons();
311 return (
sizeof(*
this) +
316 void dump_sizes()
const;
317 void dump_params()
const;
322 template <
class ConfigType>
323 inline typename priority_queue<ConfigType>::size_type
327 insert_heap.
size() - 1 +
328 (delete_buffer_end - delete_buffer_current_min);
331 template <
class ConfigType>
335 assert(!insert_heap.empty());
338 if ( cmp(*delete_buffer_current_min, t))
341 return *delete_buffer_current_min;
344 template <
class ConfigType>
348 assert(!insert_heap.empty());
350 if ( cmp(*delete_buffer_current_min, insert_heap.top()))
354 assert(delete_buffer_current_min < delete_buffer_end);
355 ++delete_buffer_current_min;
356 if (delete_buffer_current_min == delete_buffer_end)
357 refill_delete_buffer();
361 template <
class ConfigType>
365 assert(!int_mergers->is_sentinel(obj));
366 if (insert_heap.size() == N + 1)
369 assert(!insert_heap.empty());
371 insert_heap.
push(obj);
376 template <
class ConfigType>
380 delete_buffer_end(delete_buffer + delete_buffer_size),
382 num_active_groups(0), size_(0)
389 template <
class ConfigType>
393 delete_buffer_end(delete_buffer + delete_buffer_size),
395 num_active_groups(0), size_(0)
401 template <
class ConfigType>
403 : pool(new
pool_type(p_pool_mem / BlockSize, w_pool_mem / BlockSize)),
405 delete_buffer_end(delete_buffer + delete_buffer_size),
407 num_active_groups(0), size_(0)
413 template <
class ConfigType>
416 assert(!cmp(cmp.min_value(), cmp.min_value()));
424 value_type
sentinel = cmp.min_value();
425 insert_heap.push(sentinel);
426 delete_buffer[delete_buffer_size] =
sentinel;
427 delete_buffer_current_min = delete_buffer_end;
431 group_buffer_current_mins[i] = &(group_buffers[i][N]);
435 template <
class ConfigType>
443 delete ext_mergers[j];
444 delete[] ext_mergers;
450 template <
class ConfigType>
458 size_type group_size = (group < num_int_groups) ?
459 int_mergers[group].size() :
460 ext_mergers[group - num_int_groups]->size();
461 unsigned_type left_elements = group_buffers[group] + N - group_buffer_current_mins[group];
462 if (group_size + left_elements >=
size_type(N))
464 target = group_buffers[group];
465 length = N - left_elements;
469 target = group_buffers[group] + N - group_size - left_elements;
476 memmove(target, group_buffer_current_mins[group], left_elements *
sizeof(value_type));
477 group_buffer_current_mins[group] = target;
480 if (group < num_int_groups)
481 int_mergers[group].multi_merge(target + left_elements,
482 target + left_elements + length);
484 ext_mergers[group - num_int_groups]->multi_merge(
485 target + left_elements,
486 target + left_elements + length);
491 #if STXXL_CHECK_ORDER_IN_SORTS
493 if (!
stxxl::is_sorted(group_buffer_current_mins[group], group_buffers[group] + N, inv_cmp))
495 STXXL_VERBOSE_PQ(
"refill_grp... length: " << length <<
" left_elements: " << left_elements);
496 for (value_type* v = group_buffer_current_mins[group] + 1; v < group_buffer_current_mins[group] + left_elements; ++v)
498 if (inv_cmp(*v, *(v - 1)))
500 STXXL_MSG(
"Error in buffer " << group <<
" at position " << (v - group_buffer_current_mins[group] - 1) <<
"/" << (v - group_buffer_current_mins[group]) <<
" " << *(v - 2) <<
" " << *(v - 1) <<
" " << *v <<
" " << *(v + 1));
507 return length + left_elements;
510 template <
class ConfigType>
520 if ((group_buffers[i] + N) - group_buffer_current_mins[i] < delete_buffer_size)
522 size_type length = refill_group_buffer(i);
524 if (length == 0 &&
unsigned(i) == num_active_groups - 1)
527 total_group_size += length;
530 total_group_size += delete_buffer_size;
534 if (total_group_size >= delete_buffer_size)
536 length = delete_buffer_size;
541 length = total_group_size;
542 assert(size_ == length);
551 delete_buffer_current_min = delete_buffer_end - length;
553 switch (num_active_groups)
558 std::copy(group_buffer_current_mins[0], group_buffer_current_mins[0] + length, delete_buffer_current_min);
559 group_buffer_current_mins[0] += length;
562 #if STXXL_PARALLEL && STXXL_PARALLEL_PQ_MULTIWAY_MERGE_DELETE_BUFFER
564 std::pair<value_type*, value_type*> seqs[2] =
566 std::make_pair(group_buffer_current_mins[0], group_buffers[0] + N),
567 std::make_pair(group_buffer_current_mins[1], group_buffers[1] + N)
571 seqs, seqs + 2, delete_buffer_current_min, length, inv_cmp);
574 group_buffer_current_mins[0] = seqs[0].first;
575 group_buffer_current_mins[1] = seqs[1].first;
579 group_buffer_current_mins[0], group_buffer_current_mins[1],
580 delete_buffer_current_min, delete_buffer_current_min + length, cmp);
584 #if STXXL_PARALLEL && STXXL_PARALLEL_PQ_MULTIWAY_MERGE_DELETE_BUFFER
586 std::pair<value_type*, value_type*> seqs[3] =
588 std::make_pair(group_buffer_current_mins[0], group_buffers[0] + N),
589 std::make_pair(group_buffer_current_mins[1], group_buffers[1] + N),
590 std::make_pair(group_buffer_current_mins[2], group_buffers[2] + N)
594 seqs, seqs + 3, delete_buffer_current_min, length, inv_cmp);
597 group_buffer_current_mins[0] = seqs[0].first;
598 group_buffer_current_mins[1] = seqs[1].first;
599 group_buffer_current_mins[2] = seqs[2].first;
603 group_buffer_current_mins[0],
604 group_buffer_current_mins[1],
605 group_buffer_current_mins[2],
606 delete_buffer_current_min, delete_buffer_current_min + length, cmp);
610 #if STXXL_PARALLEL && STXXL_PARALLEL_PQ_MULTIWAY_MERGE_DELETE_BUFFER
612 std::pair<value_type*, value_type*> seqs[4] =
614 std::make_pair(group_buffer_current_mins[0], group_buffers[0] + N),
615 std::make_pair(group_buffer_current_mins[1], group_buffers[1] + N),
616 std::make_pair(group_buffer_current_mins[2], group_buffers[2] + N),
617 std::make_pair(group_buffer_current_mins[3], group_buffers[3] + N)
621 seqs, seqs + 4, delete_buffer_current_min, length, inv_cmp);
624 group_buffer_current_mins[0] = seqs[0].first;
625 group_buffer_current_mins[1] = seqs[1].first;
626 group_buffer_current_mins[2] = seqs[2].first;
627 group_buffer_current_mins[3] = seqs[3].first;
631 group_buffer_current_mins[0],
632 group_buffer_current_mins[1],
633 group_buffer_current_mins[2],
634 group_buffer_current_mins[3],
635 delete_buffer_current_min, delete_buffer_current_min + length, cmp);
640 STXXL_THROW2(std::runtime_error,
"priority_queue<...>::refill_delete_buffer()",
641 "Overflow! The number of buffers on 2nd level in stxxl::priority_queue is currently limited to 4");
644 #if STXXL_CHECK_ORDER_IN_SORTS
645 if (!
stxxl::is_sorted(delete_buffer_current_min, delete_buffer_end, inv_cmp))
647 for (value_type* v = delete_buffer_current_min + 1; v < delete_buffer_end; ++v)
649 if (inv_cmp(*v, *(v - 1)))
651 STXXL_MSG(
"Error at position " << (v - delete_buffer_current_min - 1) <<
"/" << (v - delete_buffer_current_min) <<
" " << *(v - 1) <<
" " << *v);
665 template <
class ConfigType>
670 assert(level < total_num_groups);
671 assert(level <= num_active_groups);
673 if (level == num_active_groups)
676 const bool spaceIsAvailable_ =
677 (level < num_int_groups) ? int_mergers[level].is_space_available()
678 : (ext_mergers[level - num_int_groups]->is_space_available());
680 if (spaceIsAvailable_)
684 else if (level == total_num_groups - 1)
687 for (
int i = 0; i < num_int_groups; ++i)
689 for (
int i = 0; i < num_ext_groups; ++i)
691 STXXL_ERRMSG(
"priority_queue OVERFLOW - all groups full, size=" << size() <<
692 ", capacity(last externel group (" << num_int_groups + num_ext_groups - 1 <<
"))=" << capacity);
696 const size_type segmentSize = ext_mergers[extLevel]->size();
697 STXXL_VERBOSE1(
"Inserting segment into last level external: " << level <<
" " << segmentSize);
700 overflow_merger->
append_merger(*ext_mergers[extLevel], segmentSize);
701 std::swap(ext_mergers[extLevel], overflow_merger);
702 delete overflow_merger;
707 finalLevel = make_space_available(level + 1);
709 if (level < num_int_groups - 1)
712 value_type* newSegment =
new value_type[segmentSize + 1];
713 int_mergers[level].multi_merge(newSegment, newSegment + segmentSize);
715 newSegment[segmentSize] = delete_buffer[delete_buffer_size];
719 int_mergers[level + 1].append_array(newSegment, segmentSize);
723 if (level == num_int_groups - 1)
725 const unsigned_type segmentSize = int_mergers[num_int_groups - 1].size();
726 STXXL_VERBOSE_PQ(
"make_space... Inserting segment into first level external: " << level <<
" " << segmentSize);
727 ext_mergers[0]->append_merger(int_mergers[num_int_groups - 1], segmentSize);
731 const size_type segmentSize = ext_mergers[level - num_int_groups]->size();
732 STXXL_VERBOSE_PQ(
"make_space... Inserting segment into second level external: " << level <<
" " << segmentSize);
733 ext_mergers[level - num_int_groups + 1]->append_merger(*ext_mergers[level - num_int_groups], segmentSize);
741 template <
class ConfigType>
745 assert(insert_heap.size() == (N + 1));
747 const value_type sup = get_supremum();
750 value_type* newSegment =
new value_type[N + 1];
751 value_type* newPos = newSegment;
755 value_type* SortTo = newSegment;
757 insert_heap.sort_to(SortTo);
759 SortTo = newSegment + N;
761 insert_heap.push(*SortTo);
763 assert(insert_heap.size() == 1);
770 value_type temp[tempSize + 1];
773 value_type* pos = temp + tempSize - sz1 - sz2;
774 std::copy(delete_buffer_current_min, delete_buffer_current_min + sz1, pos);
775 std::copy(group_buffer_current_mins[0], group_buffer_current_mins[0] + sz2, pos + sz1);
776 temp[tempSize] = sup;
783 delete_buffer_current_min, delete_buffer_current_min + sz1, cmp);
790 group_buffer_current_mins[0], group_buffer_current_mins[0] + sz2, cmp);
796 newSegment, newSegment + N, cmp);
800 assert(freeLevel == 0 || int_mergers[0].size() == 0);
801 int_mergers[0].append_array(newSegment, N);
807 for (
int_type i = freeLevel; i >= 0; i--)
812 newSegment =
new value_type[current_group_buffer_size(i) + 1];
813 std::copy(group_buffer_current_mins[i], group_buffer_current_mins[i] + current_group_buffer_size(i) + 1, newSegment);
814 int_mergers[0].append_array(newSegment, current_group_buffer_size(i));
815 group_buffer_current_mins[i] = group_buffers[i] + N;
823 if (delete_buffer_current_min == delete_buffer_end)
824 refill_delete_buffer();
827 template <
class ConfigType>
832 STXXL_MSG(
" insert_heap\t= " << insert_heap.size() - 1 <<
"/" << capacity);
833 STXXL_MSG(
" delete_buffer\t= " << (delete_buffer_end - delete_buffer_current_min) <<
"/" << delete_buffer_size);
834 for (
int i = 0; i < num_int_groups; ++i) {
837 " grpbuf=" << current_group_buffer_size(i) <<
838 " size=" << int_mergers[i].size() <<
"/" << capacity <<
839 " (" << (
int)((
double)int_mergers[i].size() * 100.0 / (
double)capacity) <<
"%)" <<
840 " space=" << int_mergers[i].is_space_available());
842 for (
int i = 0; i < num_ext_groups; ++i) {
844 STXXL_MSG(
" grp " << i + num_int_groups <<
" ext" <<
845 " grpbuf=" << current_group_buffer_size(i + num_int_groups) <<
846 " size=" << ext_mergers[i]->size() <<
"/" << capacity <<
847 " (" << (
int)((
double)ext_mergers[i]->size() * 100.0 / (
double)capacity) <<
"%)" <<
848 " space=" << ext_mergers[i]->is_space_available());
853 template <
class ConfigType>
856 STXXL_MSG(
"params: delete_buffer_size=" << delete_buffer_size <<
" N=" << N <<
" IntKMAX=" << IntKMAX <<
" num_int_groups=" << num_int_groups <<
" ExtKMAX=" << ExtKMAX <<
" num_ext_groups=" << num_ext_groups <<
" BlockSize=" << BlockSize);
859 namespace priority_queue_local {
878 typedef find_B_m<ElementSize, IntMem,
903 (((
k -
m) *
m * (
m *
B / (ElementSize * 4 * 1024))) >= MaxItems) &&
905 ((MaxItems < ((
k -
m) *
m / (2 * ElementSize)) * 1024) ||
m >= 128);
910 typedef typename find_B_m<ElementSize, IntMem, MaxItems,
B,
913 typedef typename find_B_m<ElementSize, IntMem, MaxItems,
B / 2,
923 struct find_B_m<ElementSize, IntMem, MaxItems, 2048, 1, stop>
933 struct find_B_m<ElementSize, IntMem, MaxItems, BlockSize, m_, true>
945 typedef typename find_B_m<ElementSize, IntMem,
954 template <
unsigned_type AI_,
unsigned_type X_,
unsigned_type CriticalSize>
966 template <
unsigned_type X_,
unsigned_type CriticalSize_>
1000 template <
class ValueType,
1013 X =
B * (settings::k -
m) / settings::element_size,
1046 template <
class ConfigType>
1055 #endif // !STXXL_CONTAINERS_PRIORITY_QUEUE_HEADER
virtual ~priority_queue()
static const unsigned_type EConsumption
static const internal_size_type m
number of blocks fitting into buffers of mergers (arity of both mergers), increased from 1 to 2048 ...
find_B_m< ElementSize, IntMem, MaxItems,(8 *1024 *1024), 1 >::result result
bool empty() const
Returns true if queue has no elements.
IF< fits, self_type, typename IF< candidate1::fits, candidate1, candidate2 >::result >::result result
return a fitting configuration.
value_type * group_buffer_current_mins[total_num_groups]
void push(const value_type &obj)
Inserts obj into the priority_queue.
static const unsigned_type X
unsigned_type make_space_available(unsigned_type level)
int_merger_type int_mergers[num_int_groups]
External merger, based on the loser tree data structure.
priority_queue_local::find_settings< sizeof(ValueType), IntMemory, MaxItems >::result settings
void refill_delete_buffer()
priority_queue_local::ext_merger< block_type, comparator_type, ExtKMAX, alloc_strategy_type > ext_merger_type
void set_pool(pool_type *pool_)
unsigned long long int uint64
static const external_size_type fits
calculated boolean whether the configuration fits into internal memory.
Inverts the order of a comparison functor by swapping its arguments.
typed_block< BlockSize, value_type > block_type
Type of the block used in disk-memory transfers.
Priority queue type generator. Introduction to priority queue container: see STXXL Priority Queue ...
priority_queue_local::internal_priority_queue< value_type, std::vector< value_type >, comparator_type > insert_heap_type
static const internal_size_type intmem
internal memory size of PQ
value_type group_buffers[total_num_groups][N+1]
#define STXXL_DEFAULT_ALLOC_STRATEGY
void merge3_iterator(InputIterator &source0, InputIterator &source1, InputIterator &source2, OutputIterator target, OutputIterator end, CompareType &cmp)
stxxl::uint64 size_type
An unsigned integral type (64 bit).
void swap_1D_arrays(Type *a, Type *b, unsigned_type size)
AllocStr_ alloc_strategy_type
static const unsigned_type N
CompareType comparator_type
#define STXXL_DEPRECATED(x)
unsigned_type mem_cons() const
Number of bytes consumed by the priority_queue from the internal memory not including pools (see the ...
static const internal_size_type element_size
element size
Implements dynamically resizable buffered writing pool.
Config::alloc_strategy_type alloc_strategy_type
Implements dynamically resizable prefetching pool.
Block containing elements of fixed length.
static const unsigned_type step
value_type * delete_buffer_current_min
void merge4_iterator(InputIterator &source0, InputIterator &source1, InputIterator &source2, InputIterator &source3, OutputIterator target, OutputIterator end, CompareType &cmp)
find_B_m< ElementSize, IntMem, MaxItems, BlockSize, m_, stop > self_type
RandomAccessIterator3 multiway_merge_sentinels(RandomAccessIteratorPairIterator seqs_begin, RandomAccessIteratorPairIterator seqs_end, RandomAccessIterator3 target, DiffType length, Comparator comp)
Multi-way merging front-end.
Parameters_not_found_Try_to_change_the_Tune_parameter result
choose_int_types< my_pointer_size >::int_type int_type
size_type size() const
Returns number of elements contained.
unsigned_type internal_size_type
void swap(counting_ptr< A > &a1, counting_ptr< A > &a2)
swap enclosed object with another counting pointer (no reference counts need change) ...
value_type delete_buffer[delete_buffer_size+1]
Container::value_type pop(Container &c)
unsigned_type current_delete_buffer_size() const
Config::comparator_type comparator_type
Comparison object.
Parameters_for_priority_queue_not_found_Increase_IntMem result
#define STXXL_VERBOSE_PQ(msg)
static const internal_size_type k
number of blocks that fit into internal memory (M)
#define STXXL_BEGIN_NAMESPACE
Parameters_not_found_Try_to_change_the_Tune_parameter result
size_type refill_group_buffer(unsigned_type k)
const value_type & top() const
Returns "largest" element.
External priority queue data structure. Introduction to priority queue container: see STXXL Priority Queue ...
#define STXXL_VERBOSE1(x)
find_B_m< ElementSize, IntMem, MaxItems, B, m+step, fits||(m+step >=k)>::result candidate1
if not fits, recurse into configuration with +step more internal buffers
unsigned_type current_group_buffer_size(unsigned_type i) const
Parameters_for_priority_queue_not_found_Increase_IntMem result
#define STXXL_THROW2(exception_type, location, error_message)
Throws exception_type with "Error in [location] : [error_message]".
bool is_sorted(ForwardIterator first, ForwardIterator last, StrictWeakOrdering comp)
Similar to std::priority_queue, with the following differences:
void swap(int_merger &obj)
insert_heap_type insert_heap
void append_merger(Merger &another_merger, size_type segment_size)
Merge all items from another merger and insert the resulting external array into the merger...
#define STXXL_STATIC_ASSERT(x)
priority_queue_local::int_merger< value_type, comparator_type, IntKMAX > int_merger_type
static const int_type c
remaining blocks, (freely moving, not necessarily unused) ?-tb
uint64 external_size_type
choose_int_types< my_pointer_size >::unsigned_type unsigned_type
priority_queue< priority_queue_config< ValueType, CompareType, Buffer1Size, N, AI, 2, B, AE, 2 > > result
static const size_t sentinel
static const internal_size_type B
block size (iterates from 8 MiB downwards)
IF<(N >=CriticalSize), Self, typename compute_N< AI/2, X, CriticalSize >::result >::result result
value_type get_supremum() const
find_B_m< ElementSize, IntMem, MaxItems, B/2, 1, fits||candidate1::fits >::result candidate2
if not fits, recurse into configuration with block size halved.
unsigned_type num_active_groups
IF template metaprogramming statement.
static const unsigned_type AI
read_write_pool< block_type > pool_type
Config::value_type value_type
The type of object stored in the priority_queue.
ext_merger_type ** ext_mergers
value_type * delete_buffer_end
compute_N< AI_, X_, CriticalSize > Self
void merge2_iterator(InputIterator &source0, InputIterator &source1, OutputIterator target, OutputIterator end, CompareType &cmp)
#define STXXL_END_NAMESPACE
priority_queue_local::compute_N<(1<< Tune), X, 4 *Buffer1Size >::result ComputeN
priority_queue(pool_type &pool_)
Constructs external priority queue object.