16 #ifndef STXXL_CONTAINERS_PQ_EXT_MERGER_HEADER
17 #define STXXL_CONTAINERS_PQ_EXT_MERGER_HEADER
29 namespace priority_queue_local {
31 template <
typename Iterator>
34 typedef std::pair<Iterator, Iterator>
pair;
39 typedef typename std::iterator_traits<iterator>::value_type
value_type;
40 typedef typename std::iterator_traits<iterator>::difference_type
size_type;
50 pair(first, last), m_origin(origin)
105 return end() - begin();
123 template <
class BlockType_,
139 enum { arity = Arity_, arity_bound = 1UL << (LOG2<Arity_>::ceil) };
146 return !(cmp(cmp.min_value(), a));
151 return cmp(cmp.min_value(), a);
166 return (*block)[current];
170 : block(NULL), current(0),
171 bids(NULL), merger(NULL),
189 (*block)[0] = cmp.min_value();
194 return !(cmp(cmp.min_value(), a));
199 return cmp(cmp.min_value(), a);
206 std::swap(current, obj.
current);
207 std::swap(block, obj.
block);
208 std::swap(bids, obj.
bids);
209 assert(merger == obj.
merger);
216 assert(not_sentinel((*block)[current]));
217 assert(current < block->size);
221 if (current == block->size)
223 STXXL_VERBOSE2(
"ext_merger sequence_state operator++ crossing block border ");
225 assert(bids != NULL);
228 STXXL_VERBOSE2(
"ext_merger sequence_state operator++ it was the last block in the sequence ");
235 STXXL_VERBOSE2(
"ext_merger sequence_state operator++ there is another block ");
238 merger->pool->hint(bid);
239 if (!(bids->empty()))
241 STXXL_VERBOSE2(
"ext_merger sequence_state operator++ more blocks exist in a sequence, hinting the next");
242 merger->pool->hint(bids->front());
244 merger->pool->read(block, bid)->wait();
245 STXXL_VERBOSE2(
"first element of read block " << bid <<
" " << *(block->begin()) <<
" cached in " << block);
246 if (!(bids->empty()))
247 merger->pool->hint(bids->front());
248 block_manager::get_instance()->delete_block(bid);
257 #if STXXL_PQ_EXTERNAL_LOSER_TREE
263 #endif //STXXL_PQ_EXTERNAL_LOSER_TREE
275 #if STXXL_PQ_EXTERNAL_LOSER_TREE
278 Entry entry[arity_bound];
279 #endif //STXXL_PQ_EXTERNAL_LOSER_TREE
292 size_(0), log_k(0), k(1), pool(0)
298 size_(0), log_k(0), k(1),
309 delete states[i].
block;
311 delete sentinel_block;
323 assert(!cmp(cmp.min_value(), cmp.min_value()));
325 sentinel_block = NULL;
326 if (arity < arity_bound)
330 (*sentinel_block)[i] = cmp.min_value();
331 if (arity + 1 == arity_bound) {
333 STXXL_ERRMSG(
"inefficient PQ parameters for ext_merger: arity + 1 == arity_bound");
343 states[i].
block = sentinel_block;
349 free_segments.
push(0);
351 rebuild_loser_tree();
352 #if STXXL_PQ_EXTERNAL_LOSER_TREE
353 assert(is_sentinel(*states[entry[0].index]));
354 #endif //STXXL_PQ_EXTERNAL_LOSER_TREE
360 #if STXXL_PQ_EXTERNAL_LOSER_TREE
362 entry[0].index = winner;
363 entry[0].key = *(states[winner]);
364 #endif //STXXL_PQ_EXTERNAL_LOSER_TREE
368 #if STXXL_PQ_EXTERNAL_LOSER_TREE
376 if (root >= k || root >= arity_bound)
384 value_type lk = *(states[left]);
385 value_type rk = *(states[right]);
386 assert(root < arity_bound);
389 entry[root].index = right;
390 entry[root].key = rk;
395 entry[root].index = left;
396 entry[root].key = lk;
407 void update_on_insert(
409 const value_type& new_key,
411 value_type* winner_key,
418 *winner_key = entry[0].key;
419 *winner_index = entry[0].index;
420 if (cmp(entry[node].key, new_key))
422 entry[node].key = new_key;
423 entry[node].index = new_index;
428 update_on_insert(node >> 1, new_key, new_index, winner_key, winner_index, mask);
429 value_type loserKey = entry[node].key;
431 if ((*winner_index & *mask) != (new_index & *mask))
433 if (cmp(loserKey, new_key))
435 if (cmp(*winner_key, new_key))
437 entry[node].key = *winner_key;
438 entry[node].index = *winner_index;
442 entry[node].key = new_key;
443 entry[node].index = new_index;
446 *winner_key = loserKey;
447 *winner_index = loserIndex;
459 #endif //STXXL_PQ_EXTERNAL_LOSER_TREE
464 STXXL_VERBOSE1(
"ext_merger::double_k (before) k=" << k <<
" log_k=" << log_k <<
" arity_bound=" << arity_bound <<
" arity=" << arity <<
" #free=" << free_segments.
size());
467 assert(free_segments.
empty());
475 free_segments.
push(i);
482 STXXL_VERBOSE1(
"ext_merger::double_k (after) k=" << k <<
" log_k=" << log_k <<
" arity_bound=" << arity_bound <<
" arity=" << arity <<
" #free=" << free_segments.
size());
483 assert(!free_segments.
empty());
484 assert(k <= arity_bound);
487 rebuild_loser_tree();
494 STXXL_VERBOSE1(
"ext_merger::compact_tree (before) k=" << k <<
" log_k=" << log_k <<
" #free=" << free_segments.
size());
502 if (!is_segment_empty(from))
504 assert(is_segment_allocated(from));
507 assert(!is_segment_allocated(target));
508 states[target].
swap(states[from]);
515 while (k > 1 && target <= (k / 2))
522 free_segments.
clear();
523 for ( ; target < k; target++)
525 assert(!is_segment_allocated(target));
528 free_segments.
push(target);
531 STXXL_VERBOSE1(
"ext_merger::compact_tree (after) k=" << k <<
" log_k=" << log_k <<
" #free=" << free_segments.
size());
535 rebuild_loser_tree();
542 std::swap(cmp, obj.
cmp);
544 std::swap(size_, obj.
size_);
545 std::swap(log_k, obj.
log_k);
557 return (STXXL_MIN<unsigned_type>(arity + 1, arity_bound) * block_type::raw_size);
565 template <
class OutputIterator>
570 STXXL_VERBOSE1(
"ext_merger::multi_merge from " << k <<
" sequence(s), length = " << length);
576 assert(length <= size_);
580 #if STXXL_PARALLEL && STXXL_PARALLEL_PQ_MULTIWAY_MERGE_EXTERNAL
582 typedef std::pair<typename block_type::iterator, typename block_type::iterator>
sequence;
584 std::vector<sequence> seqs;
585 std::vector<unsigned_type> orig_seq_index;
592 if (states[i].current == states[i].block->size || is_sentinel(*states[i]))
595 seqs.push_back(std::make_pair(states[i].block->begin() + states[i].
current, states[i].
block->end()));
596 orig_seq_index.push_back(i);
598 #if STXXL_CHECK_ORDER_IN_SORTS
599 if (!is_sentinel(*seqs.back().first) && !
stxxl::is_sorted(seqs.back().first, seqs.back().second, inv_cmp))
601 STXXL_VERBOSE1(
"length " << i <<
" " << (seqs.back().second - seqs.back().first));
602 for (
value_type* v = seqs.back().first + 1; v < seqs.back().second; ++v)
604 if (inv_cmp(*v, *(v - 1)))
606 STXXL_VERBOSE1(
"Error at position " << i <<
"/" << (v - seqs.back().first - 1) <<
"/" << (v - seqs.back().first) <<
" " << *(v - 1) <<
" " << *v);
610 STXXL_VERBOSE1(
"Wrong sentinel at position " << (v - seqs.back().first));
618 if (states[i].bids != NULL && !states[i].bids->empty())
619 pool->
hint(states[i].
bids->front());
622 assert(seqs.size() > 0);
624 #if STXXL_CHECK_ORDER_IN_SORTS
628 diff_type rest = length;
633 diff_type total_size = 0;
637 diff_type seq_i_size = seqs[i].second - seqs[i].first;
640 total_size += seq_i_size;
641 if (inv_cmp(*(seqs[i].second - 1), min_last))
642 min_last = *(seqs[i].second - 1);
644 STXXL_VERBOSE1(
"front block of seq " << i <<
": front=" << *(seqs[i].first) <<
" back=" << *(seqs[i].second - 1) <<
" len=" << seq_i_size);
650 assert(total_size > 0);
651 assert(!is_sentinel(min_last));
653 STXXL_VERBOSE1(
"min_last " << min_last <<
" total size " << total_size <<
" num_seq " << seqs.size());
655 diff_type less_equal_than_min_last = 0;
661 typename block_type::iterator position =
662 std::upper_bound(seqs[i].first, seqs[i].second, min_last, inv_cmp);
666 STXXL_VERBOSE1(
"seq " << i <<
": " << (position - seqs[i].first) <<
" greater equal than " << min_last);
668 less_equal_than_min_last += (position - seqs[i].first);
671 diff_type output_size =
STXXL_MIN(less_equal_than_min_last, rest);
673 STXXL_VERBOSE1(
"output_size=" << output_size <<
" = min(leq_t_ml=" << less_equal_than_min_last <<
", rest=" << rest <<
")");
675 assert(output_size > 0);
679 begin = parallel::multiway_merge(seqs.begin(), seqs.end(), begin, inv_cmp, output_size);
682 size_ -= output_size;
690 assert(seqs[i].first <= seqs[i].second);
692 if (seqs[i].first == seqs[i].second)
695 if (state.
bids == NULL || state.
bids->empty())
697 STXXL_VERBOSE1(
"seq " << i <<
": ext_merger::multi_merge(...) it was the last block in the sequence ");
702 #if STXXL_CHECK_ORDER_IN_SORTS
703 last_elem = *(seqs[i].second - 1);
705 STXXL_VERBOSE1(
"seq " << i <<
": ext_merger::multi_merge(...) there is another block ");
707 state.
bids->pop_front();
709 if (!(state.
bids->empty()))
711 STXXL_VERBOSE2(
"seq " << i <<
": ext_merger::multi_merge(...) more blocks exist, hinting the next");
715 STXXL_VERBOSE1(
"seq " << i <<
": first element of read block " << bid <<
" " << *(state.
block->begin()) <<
" cached in " << state.
block);
716 if (!(state.
bids->empty()))
719 seqs[i] = std::make_pair(state.
block->begin() + state.
current, state.
block->end());
720 block_manager::get_instance()->delete_block(bid);
722 #if STXXL_CHECK_ORDER_IN_SORTS
723 STXXL_VERBOSE1(
"before " << last_elem <<
" after " << *seqs[i].first <<
" newly loaded block " << bid);
726 STXXL_VERBOSE1(
"length " << i <<
" " << (seqs[i].second - seqs[i].first));
727 for (
value_type* v = seqs[i].first + 1; v < seqs[i].second; ++v)
729 if (inv_cmp(*v, *(v - 1)))
731 STXXL_VERBOSE1(
"Error at position " << i <<
"/" << (v - seqs[i].first - 1) <<
"/" << (v - seqs[i].first) <<
" " << *(v - 1) <<
" " << *v);
735 STXXL_VERBOSE1(
"Wrong sentinel at position " << (v - seqs[i].first));
749 if (is_segment_empty(seg))
752 deallocate_segment(seg);
756 #else // STXXL_PARALLEL && STXXL_PARALLEL_PQ_MULTIWAY_MERGE_EXTERNAL
761 if (states[i].bids != NULL && !states[i].bids->empty())
762 pool->
hint(states[i].
bids->front());
768 assert(entry[0].index == 0);
769 assert(free_segments.
empty());
772 for (
size_type i = 0; i < length; ++i, ++(states[0]), ++begin)
773 *begin = *(states[0]);
775 entry[0].key = **states;
776 if (is_segment_empty(0))
777 deallocate_segment(0);
783 rebuild_loser_tree();
784 if (is_segment_empty(0) && is_segment_allocated(0))
785 deallocate_segment(0);
787 if (is_segment_empty(1) && is_segment_allocated(1))
788 deallocate_segment(1);
793 if (is_segment_empty(3))
796 merge4_iterator(states[0], states[1], states[2], states[3], begin, length, cmp);
797 rebuild_loser_tree();
798 if (is_segment_empty(0) && is_segment_allocated(0))
799 deallocate_segment(0);
801 if (is_segment_empty(1) && is_segment_allocated(1))
802 deallocate_segment(1);
804 if (is_segment_empty(2) && is_segment_allocated(2))
805 deallocate_segment(2);
807 if (is_segment_empty(3) && is_segment_allocated(3))
808 deallocate_segment(3);
811 case 3: multi_merge_f<OutputIterator, 3>(begin, end);
813 case 4: multi_merge_f<OutputIterator, 4>(begin, end);
815 case 5: multi_merge_f<OutputIterator, 5>(begin, end);
817 case 6: multi_merge_f<OutputIterator, 6>(begin, end);
819 case 7: multi_merge_f<OutputIterator, 7>(begin, end);
821 case 8: multi_merge_f<OutputIterator, 8>(begin, end);
823 case 9: multi_merge_f<OutputIterator, 9>(begin, end);
825 case 10: multi_merge_f<OutputIterator, 10>(begin, end);
827 default: multi_merge_k(begin, end);
836 const unsigned_type num_segments_used = std::min<unsigned_type>(arity, k) - free_segments.
size();
843 STXXL_VERBOSE3(
"ext_merger compact? k=" << k <<
" #used=" << num_segments_used
844 <<
" <= #trigger=" << num_segments_trigger <<
" ==> "
845 << ((k > 1 && num_segments_used <= num_segments_trigger) ?
"yes" :
"no ")
847 << ((k == 4 && !free_segments.
empty() && !is_segment_empty(3)) ?
"yes" :
"no ")
848 <<
" #free=" << free_segments.
size());
849 if (k > 1 && ((num_segments_used <= num_segments_trigger) ||
850 (k == 4 && !free_segments.
empty() && !is_segment_empty(3))))
858 #if STXXL_PQ_EXTERNAL_LOSER_TREE
860 template <
class OutputIterator>
861 void multi_merge_k(OutputIterator begin, OutputIterator end)
864 value_type current_key;
867 OutputIterator done = end;
868 OutputIterator target = begin;
870 value_type winner_key = entry[0].key;
872 while (target != done)
875 *target = *(states[winner_index]);
878 ++(states[winner_index]);
880 winner_key = *(states[winner_index]);
883 if (is_sentinel(winner_key))
884 deallocate_segment(winner_index);
888 for (
unsigned_type i = (winner_index + kReg) >> 1; i > 0; i >>= 1)
890 current_pos = entry + i;
891 current_key = current_pos->key;
892 if (cmp(winner_key, current_key))
894 current_index = current_pos->index;
895 current_pos->key = winner_key;
896 current_pos->index = winner_index;
897 winner_key = current_key;
898 winner_index = current_index;
904 entry[0].index = winner_index;
905 entry[0].key = winner_key;
908 template <
class OutputIterator,
int LogK>
909 void multi_merge_f(OutputIterator begin, OutputIterator end)
911 OutputIterator done = end;
912 OutputIterator target = begin;
914 Entry* reg_entry = entry;
915 sequence_state* reg_states = states;
916 value_type winner_key = entry[0].key;
918 assert(log_k >= LogK);
919 while (target != done)
922 *target = *(reg_states[winner_index]);
925 ++(reg_states[winner_index]);
927 winner_key = *(reg_states[winner_index]);
930 if (is_sentinel(winner_key))
931 deallocate_segment(winner_index);
937 #define TreeStep(L) \
938 if (1 << LogK >= 1 << L) { \
939 Entry* pos ## L = reg_entry + ((winner_index + (1 << LogK)) >> (((int(LogK - L) + 1) >= 0) ? ((LogK - L) + 1) : 0)); \
940 value_type key ## L = pos ## L->key; \
941 if (cmp(winner_key, key ## L)) { \
942 unsigned_type index ## L = pos ## L->index; \
943 pos ## L->key = winner_key; \
944 pos ## L->index = winner_index; \
945 winner_key = key ## L; \
946 winner_index = index ## L; \
961 reg_entry[0].index = winner_index;
962 reg_entry[0].key = winner_key;
964 #endif //STXXL_PQ_EXTERNAL_LOSER_TREE
969 return k < arity || !free_segments.
empty();
974 template <
class Merger>
979 if (segment_size > 0)
982 if (free_segments.
empty())
986 assert(!free_segments.
empty());
992 assert(segment_size);
998 STXXL_VERBOSE1(
"ext_merger::insert_segment(merger,...) WARNING: inserting a segment with " <<
999 nblocks <<
" blocks");
1000 STXXL_VERBOSE1(
"THIS IS INEFFICIENT: TRY TO CHANGE PRIORITY QUEUE PARAMETERS");
1003 if (first_size == 0)
1005 first_size = block_type::size;
1009 std::list<bid_type>* bids =
new std::list<bid_type>(nblocks);
1013 another_merger.multi_merge(
1014 first_block->begin() + (block_type::size - first_size),
1015 first_block->end());
1017 STXXL_VERBOSE1(
"last element of first block " << *(first_block->end() - 1));
1018 assert(!cmp(*(first_block->begin() + (block_type::size - first_size)), *(first_block->end() - 1)));
1022 for (
typename std::list<bid_type>::iterator curbid = bids->begin(); curbid != bids->end(); ++curbid)
1025 another_merger.multi_merge(b->begin(), b->end());
1026 STXXL_VERBOSE1(
"first element of following block " << *curbid <<
" " << *(b->begin()));
1027 STXXL_VERBOSE1(
"last element of following block " << *curbid <<
" " << *(b->end() - 1));
1028 assert(!cmp(*(b->begin()), *(b->end() - 1)));
1029 pool->
write(b, *curbid);
1030 STXXL_VERBOSE1(
"written to block " << *curbid <<
" cached in " << b);
1033 insert_segment(bids, first_block, first_size, free_slot);
1035 size_ += segment_size;
1037 #if STXXL_PQ_EXTERNAL_LOSER_TREE
1042 update_on_insert((free_slot + k) >> 1, *(states[free_slot]), free_slot,
1043 &dummyKey, &dummyIndex, &dummyMask);
1044 #endif //STXXL_PQ_EXTERNAL_LOSER_TREE
1065 STXXL_VERBOSE1(
"ext_merger::insert_segment(bidlist,...) " <<
this <<
" " << bidlist->size() <<
" " << slot);
1066 assert(!is_segment_allocated(slot));
1067 assert(first_size > 0);
1070 new_sequence.
current = block_type::size - first_size;
1071 std::swap(new_sequence.
block, first_block);
1073 std::swap(new_sequence.
bids, bidlist);
1076 assert(bidlist->empty());
1080 assert(is_segment_allocated(slot));
1086 STXXL_VERBOSE1(
"ext_merger::deallocate_segment() deleting segment " << slot <<
" allocated=" <<
int(is_segment_allocated(slot)));
1087 assert(is_segment_allocated(slot));
1092 free_segments.
push(slot);
1098 return is_sentinel(*(states[slot]));
1115 #endif // !STXXL_CONTAINERS_PQ_EXT_MERGER_HEADER
const Tp & STXXL_MIN(const Tp &a, const Tp &b)
const_reference front() const
bool is_sorted(_ForwardIter __first, _ForwardIter __last)
block_type::bid_type bid_type
External merger, based on the loser tree data structure.
void swap_1D_arrays(T *a, T *b, unsigned_type size)
unsigned long long int uint64
bool is_segment_allocated(unsigned_type slot) const
std::list< bid_type > * bids
bool hint(bid_type bid)
Gives a hint for prefetching a block.
Inverts the order of a comparison functor by swapping its arguments.
std::pair< Iterator, Iterator > pair
void insert_segment(std::list< bid_type > *bidlist, block_type *first_block, unsigned_type first_size, unsigned_type slot)
#define STXXL_DEFAULT_ALLOC_STRATEGY
std::iterator_traits< iterator >::value_type value_type
void merge4_iterator(InputIterator &source0, InputIterator &source1, InputIterator &source2, InputIterator &source3, OutputIterator target, unsigned_type length, CompareType &cmp)
void set_pool(pool_type *pool_)
request_ptr read(block_type *&block, bid_type bid)
Reads block. If this block is cached block is not read but passed from the cache. ...
short_sequence(Iterator first, Iterator last, origin_type origin)
bool not_sentinel(const value_type &a) const
#define STXXL_VERBOSE2(x)
void insert_segment(Merger &another_merger, size_type segment_size)
const_iterator cbegin() const
void merge_iterator(InputIterator &source0, InputIterator &source1, OutputIterator target, unsigned_type length, CompareType &cmp)
internal_bounded_stack< unsigned_type, arity > free_segments
uint_pair & operator++()
prefix increment operator (directly manipulates the integer parts)
sequence_state states[arity_bound]
ext_merger(pool_type *pool_)
const_iterator end() const
void delete_blocks(const BIDIteratorClass &bidbegin, const BIDIteratorClass &bidend)
Deallocates blocks.
void swap(sequence_state &obj)
read_write_pool< block_type > pool_type
virtual void wait(bool measure_time=true)=0
Suspends calling thread until completion of the request.
External sequence or deque container without random access. Introduction to sequence container: se...
void deallocate_segment(unsigned_type slot)
#define STXXL_VERBOSE3(x)
Implements dynamically resizable buffered writing and prefetched reading pool.
unsigned_type origin_type
request_ptr write(block_type *&block, bid_type bid)
Passes a block to the pool for writing.
void merge3_iterator(InputIterator &source0, InputIterator &source1, InputIterator &source2, OutputIterator target, unsigned_type length, CompareType &cmp)
#define STXXL_BEGIN_NAMESPACE
#define STXXL_VERBOSE1(x)
origin_type origin() const
block_type * sentinel_block
block_type::value_type value_type
block_type * steal()
Take out a block from the pool.
bool is_space_available() const
void rebuild_loser_tree()
const value_type & const_reference
void new_blocks(const DiskAssignFunctor &functor, BIDIteratorClass bidbegin, BIDIteratorClass bidend, unsigned_type offset=0)
Allocates new blocks.
void push(const value_type &x)
const_iterator begin() const
unsigned_type mem_cons() const
choose_int_types< my_pointer_size >::unsigned_type unsigned_type
void multi_merge(OutputIterator begin, OutputIterator end)
bool is_sentinel(const value_type &a) const
std::iterator_traits< iterator >::difference_type size_type
const_iterator cend() const
const_reference back() const
const value_type & top() const
bool not_sentinel(const value_type &a) const
bool is_sentinel(const value_type &a) const
size_type size_write() const
Returns number of blocks owned by the write_pool.
#define STXXL_END_NAMESPACE
const iterator const_iterator
bool is_segment_empty(unsigned_type slot) const