16 #ifndef STXXL_CONTAINERS_PQ_EXT_MERGER_HEADER
17 #define STXXL_CONTAINERS_PQ_EXT_MERGER_HEADER
29 namespace priority_queue_local {
31 template <
typename Iterator>
34 typedef std::pair<Iterator, Iterator>
pair;
39 typedef typename std::iterator_traits<iterator>::value_type
value_type;
40 typedef typename std::iterator_traits<iterator>::difference_type
size_type;
50 :
pair(first, last), m_origin(origin)
105 return end() - begin();
123 template <
class BlockType,
139 enum { arity = Arity, arity_bound = 1UL << (LOG2<Arity>::ceil) };
146 return !(cmp(cmp.min_value(), a));
151 return cmp(cmp.min_value(), a);
166 return (*block)[current];
170 : block(NULL), current(0),
171 bids(NULL), merger(NULL),
189 (*block)[0] = cmp.min_value();
194 return !(cmp(cmp.min_value(), a));
199 return cmp(cmp.min_value(), a);
206 std::swap(current, obj.
current);
207 std::swap(block, obj.
block);
208 std::swap(bids, obj.
bids);
209 assert(merger == obj.
merger);
216 assert(not_sentinel((*block)[current]));
217 assert(current < block->size);
221 if (current == block->size)
223 STXXL_VERBOSE2(
"ext_merger sequence_state operator++ crossing block border ");
225 assert(bids != NULL);
228 STXXL_VERBOSE2(
"ext_merger sequence_state operator++ it was the last block in the sequence ");
235 STXXL_VERBOSE2(
"ext_merger sequence_state operator++ there is another block ");
238 merger->pool->hint(bid);
239 if (!(bids->empty()))
241 STXXL_VERBOSE2(
"ext_merger sequence_state operator++ more blocks exist in a sequence, hinting the next");
242 merger->pool->hint(bids->front());
244 merger->pool->read(block, bid)->wait();
245 STXXL_VERBOSE2(
"first element of read block " << bid <<
" " << *(block->begin()) <<
" cached in " << block);
246 if (!(bids->empty()))
247 merger->pool->hint(bids->front());
248 block_manager::get_instance()->delete_block(bid);
256 #if STXXL_PQ_EXTERNAL_LOSER_TREE
262 #endif //STXXL_PQ_EXTERNAL_LOSER_TREE
274 #if STXXL_PQ_EXTERNAL_LOSER_TREE
277 Entry entry[arity_bound];
278 #endif //STXXL_PQ_EXTERNAL_LOSER_TREE
291 : size_(0), log_k(0), k(1), pool(0)
297 : size_(0), log_k(0), k(1),
308 delete states[i].
block;
310 delete sentinel_block;
322 assert(!cmp(cmp.min_value(), cmp.min_value()));
324 sentinel_block = NULL;
325 if (arity < arity_bound)
329 (*sentinel_block)[i] = cmp.min_value();
330 if (arity + 1 == arity_bound) {
332 STXXL_ERRMSG(
"inefficient PQ parameters for ext_merger: arity + 1 == arity_bound");
342 states[i].
block = sentinel_block;
348 free_segments.
push(0);
350 rebuild_loser_tree();
351 #if STXXL_PQ_EXTERNAL_LOSER_TREE
352 assert(is_sentinel(*states[entry[0].index]));
353 #endif //STXXL_PQ_EXTERNAL_LOSER_TREE
359 #if STXXL_PQ_EXTERNAL_LOSER_TREE
361 entry[0].index = winner;
362 entry[0].key = *(states[winner]);
363 #endif //STXXL_PQ_EXTERNAL_LOSER_TREE
366 #if STXXL_PQ_EXTERNAL_LOSER_TREE
374 if (root >= k || root >= arity_bound)
382 value_type lk = *(states[left]);
383 value_type rk = *(states[right]);
384 assert(root < arity_bound);
387 entry[root].index = right;
388 entry[root].key = rk;
393 entry[root].index = left;
394 entry[root].key = lk;
405 void update_on_insert(
407 const value_type& new_key,
409 value_type* winner_key,
416 *winner_key = entry[0].key;
417 *winner_index = entry[0].index;
418 if (cmp(entry[node].key, new_key))
420 entry[node].key = new_key;
421 entry[node].index = new_index;
426 update_on_insert(node >> 1, new_key, new_index, winner_key, winner_index, mask);
427 value_type loserKey = entry[node].key;
429 if ((*winner_index & *mask) != (new_index & *mask))
431 if (cmp(loserKey, new_key))
433 if (cmp(*winner_key, new_key))
435 entry[node].key = *winner_key;
436 entry[node].index = *winner_index;
440 entry[node].key = new_key;
441 entry[node].index = new_index;
444 *winner_key = loserKey;
445 *winner_index = loserIndex;
457 #endif //STXXL_PQ_EXTERNAL_LOSER_TREE
462 STXXL_VERBOSE1(
"ext_merger::double_k (before) k=" << k <<
" log_k=" << log_k <<
" arity_bound=" << arity_bound <<
" arity=" << arity <<
" #free=" << free_segments.
size());
465 assert(free_segments.
empty());
473 free_segments.
push(i);
480 STXXL_VERBOSE1(
"ext_merger::double_k (after) k=" << k <<
" log_k=" << log_k <<
" arity_bound=" << arity_bound <<
" arity=" << arity <<
" #free=" << free_segments.
size());
481 assert(!free_segments.
empty());
482 assert(k <= arity_bound);
485 rebuild_loser_tree();
491 STXXL_VERBOSE1(
"ext_merger::compact_tree (before) k=" << k <<
" log_k=" << log_k <<
" #free=" << free_segments.
size());
499 if (!is_segment_empty(from))
501 assert(is_segment_allocated(from));
504 assert(!is_segment_allocated(target));
505 states[target].
swap(states[from]);
512 while (k > 1 && target <= (k / 2))
519 free_segments.
clear();
520 for ( ; target < k; target++)
522 assert(!is_segment_allocated(target));
525 free_segments.
push(target);
528 STXXL_VERBOSE1(
"ext_merger::compact_tree (after) k=" << k <<
" log_k=" << log_k <<
" #free=" << free_segments.
size());
532 rebuild_loser_tree();
538 std::swap(cmp, obj.
cmp);
540 std::swap(size_, obj.
size_);
541 std::swap(log_k, obj.
log_k);
553 return (STXXL_MIN<unsigned_type>(arity + 1, arity_bound) * block_type::raw_size);
561 template <
class OutputIterator>
566 STXXL_VERBOSE1(
"ext_merger::multi_merge from " << k <<
" sequence(s),"
567 " length = " << length);
577 #if STXXL_PARALLEL && STXXL_PARALLEL_PQ_MULTIWAY_MERGE_EXTERNAL
580 typedef std::pair<typename block_type::iterator, typename block_type::iterator>
sequence;
582 std::vector<sequence> seqs;
583 std::vector<unsigned_type> orig_seq_index;
590 if (states[i].current == states[i].block->size || is_sentinel(*states[i]))
593 seqs.push_back(std::make_pair(states[i].block->begin() + states[i].
current, states[i].
block->end()));
594 orig_seq_index.push_back(i);
596 #if STXXL_CHECK_ORDER_IN_SORTS
597 if (!is_sentinel(*seqs.back().first) && !
stxxl::is_sorted(seqs.back().first, seqs.back().second, inv_cmp))
599 STXXL_VERBOSE1(
"length " << i <<
" " << (seqs.back().second - seqs.back().first));
600 for (
value_type* v = seqs.back().first + 1; v < seqs.back().second; ++v)
602 if (inv_cmp(*v, *(v - 1)))
604 STXXL_VERBOSE1(
"Error at position " << i <<
"/" << (v - seqs.back().first - 1) <<
"/" << (v - seqs.back().first) <<
" " << *(v - 1) <<
" " << *v);
608 STXXL_VERBOSE1(
"Wrong sentinel at position " << (v - seqs.back().first));
616 if (states[i].bids != NULL && !states[i].bids->empty())
617 pool->
hint(states[i].
bids->front());
620 assert(seqs.size() > 0);
622 #if STXXL_CHECK_ORDER_IN_SORTS
626 diff_type rest = length;
631 diff_type total_size = 0;
635 diff_type seq_i_size = seqs[i].second - seqs[i].first;
638 total_size += seq_i_size;
639 if (inv_cmp(*(seqs[i].second - 1), min_last))
640 min_last = *(seqs[i].second - 1);
642 STXXL_VERBOSE1(
"front block of seq " << i <<
": front=" << *(seqs[i].first) <<
" back=" << *(seqs[i].second - 1) <<
" len=" << seq_i_size);
648 assert(total_size > 0);
649 assert(!is_sentinel(min_last));
651 STXXL_VERBOSE1(
"min_last " << min_last <<
" total size " << total_size <<
" num_seq " << seqs.size());
653 diff_type less_equal_than_min_last = 0;
659 typename block_type::iterator position =
660 std::upper_bound(seqs[i].first, seqs[i].second, min_last, inv_cmp);
664 STXXL_VERBOSE1(
"seq " << i <<
": " << (position - seqs[i].first) <<
" greater equal than " << min_last);
666 less_equal_than_min_last += (position - seqs[i].first);
669 diff_type output_size =
STXXL_MIN(less_equal_than_min_last, rest);
671 STXXL_VERBOSE1(
"output_size=" << output_size <<
" = min(leq_t_ml=" << less_equal_than_min_last <<
", rest=" << rest <<
")");
673 assert(output_size > 0);
677 begin = parallel::multiway_merge(seqs.begin(), seqs.end(), begin, inv_cmp, output_size);
680 size_ -= output_size;
688 assert(seqs[i].first <= seqs[i].second);
690 if (seqs[i].first == seqs[i].second)
693 if (state.
bids == NULL || state.
bids->empty())
695 STXXL_VERBOSE1(
"seq " << i <<
": ext_merger::multi_merge(...) it was the last block in the sequence ");
700 #if STXXL_CHECK_ORDER_IN_SORTS
701 last_elem = *(seqs[i].second - 1);
703 STXXL_VERBOSE1(
"seq " << i <<
": ext_merger::multi_merge(...) there is another block ");
705 state.
bids->pop_front();
707 if (!(state.
bids->empty()))
709 STXXL_VERBOSE2(
"seq " << i <<
": ext_merger::multi_merge(...) more blocks exist, hinting the next");
713 STXXL_VERBOSE1(
"seq " << i <<
": first element of read block " << bid <<
" " << *(state.
block->begin()) <<
" cached in " << state.
block);
714 if (!(state.
bids->empty()))
717 seqs[i] = std::make_pair(state.
block->begin() + state.
current, state.
block->end());
718 block_manager::get_instance()->delete_block(bid);
720 #if STXXL_CHECK_ORDER_IN_SORTS
721 STXXL_VERBOSE1(
"before " << last_elem <<
" after " << *seqs[i].first <<
" newly loaded block " << bid);
724 STXXL_VERBOSE1(
"length " << i <<
" " << (seqs[i].second - seqs[i].first));
725 for (
value_type* v = seqs[i].first + 1; v < seqs[i].second; ++v)
727 if (inv_cmp(*v, *(v - 1)))
729 STXXL_VERBOSE1(
"Error at position " << i <<
"/" << (v - seqs[i].first - 1) <<
"/" << (v - seqs[i].first) <<
" " << *(v - 1) <<
" " << *v);
733 STXXL_VERBOSE1(
"Wrong sentinel at position " << (v - seqs[i].first));
747 if (is_segment_empty(seg))
750 deallocate_segment(seg);
754 #else // STXXL_PARALLEL && STXXL_PARALLEL_PQ_MULTIWAY_MERGE_EXTERNAL
759 if (states[i].bids != NULL && !states[i].bids->empty())
760 pool->
hint(states[i].
bids->front());
766 assert(entry[0].index == 0);
767 assert(free_segments.
empty());
770 for (
int_type i = 0; i < length; ++i, ++(states[0]), ++begin)
771 *begin = *(states[0]);
773 entry[0].key = **states;
774 if (is_segment_empty(0))
775 deallocate_segment(0);
781 rebuild_loser_tree();
782 if (is_segment_empty(0) && is_segment_allocated(0))
783 deallocate_segment(0);
785 if (is_segment_empty(1) && is_segment_allocated(1))
786 deallocate_segment(1);
791 if (is_segment_empty(3))
794 merge4_iterator(states[0], states[1], states[2], states[3], begin, length, cmp);
795 rebuild_loser_tree();
796 if (is_segment_empty(0) && is_segment_allocated(0))
797 deallocate_segment(0);
799 if (is_segment_empty(1) && is_segment_allocated(1))
800 deallocate_segment(1);
802 if (is_segment_empty(2) && is_segment_allocated(2))
803 deallocate_segment(2);
805 if (is_segment_empty(3) && is_segment_allocated(3))
806 deallocate_segment(3);
809 case 3: multi_merge_f<OutputIterator, 3>(begin, end);
811 case 4: multi_merge_f<OutputIterator, 4>(begin, end);
813 case 5: multi_merge_f<OutputIterator, 5>(begin, end);
815 case 6: multi_merge_f<OutputIterator, 6>(begin, end);
817 case 7: multi_merge_f<OutputIterator, 7>(begin, end);
819 case 8: multi_merge_f<OutputIterator, 8>(begin, end);
821 case 9: multi_merge_f<OutputIterator, 9>(begin, end);
823 case 10: multi_merge_f<OutputIterator, 10>(begin, end);
825 default: multi_merge_k(begin, end);
834 const unsigned_type num_segments_used = std::min<unsigned_type>(arity, k) - free_segments.
size();
841 STXXL_VERBOSE3(
"ext_merger compact? k=" << k <<
" #used=" << num_segments_used
842 <<
" <= #trigger=" << num_segments_trigger <<
" ==> "
843 << ((k > 1 && num_segments_used <= num_segments_trigger) ?
"yes" :
"no ")
845 << ((k == 4 && !free_segments.
empty() && !is_segment_empty(3)) ?
"yes" :
"no ")
846 <<
" #free=" << free_segments.
size());
847 if (k > 1 && ((num_segments_used <= num_segments_trigger) ||
848 (k == 4 && !free_segments.
empty() && !is_segment_empty(3))))
856 #if STXXL_PQ_EXTERNAL_LOSER_TREE
858 template <
class OutputIterator>
859 void multi_merge_k(OutputIterator begin, OutputIterator end)
862 value_type current_key;
865 OutputIterator done = end;
866 OutputIterator target = begin;
868 value_type winner_key = entry[0].key;
870 while (target != done)
873 *target = *(states[winner_index]);
876 ++(states[winner_index]);
878 winner_key = *(states[winner_index]);
881 if (is_sentinel(winner_key))
882 deallocate_segment(winner_index);
885 for (
unsigned_type i = (winner_index + kReg) >> 1; i > 0; i >>= 1)
887 current_pos = entry + i;
888 current_key = current_pos->key;
889 if (cmp(winner_key, current_key))
891 current_index = current_pos->index;
892 current_pos->key = winner_key;
893 current_pos->index = winner_index;
894 winner_key = current_key;
895 winner_index = current_index;
901 entry[0].index = winner_index;
902 entry[0].key = winner_key;
905 template <
class OutputIterator,
int LogK>
906 void multi_merge_f(OutputIterator begin, OutputIterator end)
908 OutputIterator done = end;
909 OutputIterator target = begin;
911 Entry* reg_entry = entry;
912 sequence_state* reg_states = states;
913 value_type winner_key = entry[0].key;
915 assert(log_k >= LogK);
916 while (target != done)
919 *target = *(reg_states[winner_index]);
922 ++(reg_states[winner_index]);
924 winner_key = *(reg_states[winner_index]);
927 if (is_sentinel(winner_key))
928 deallocate_segment(winner_index);
933 #define TreeStep(L) \
934 if (1 << LogK >= 1 << L) { \
935 Entry* pos ## L = reg_entry + ((winner_index + (1 << LogK)) >> (((int(LogK - L) + 1) >= 0) ? ((LogK - L) + 1) : 0)); \
936 value_type key ## L = pos ## L->key; \
937 if (cmp(winner_key, key ## L)) { \
938 unsigned_type index ## L = pos ## L->index; \
939 pos ## L->key = winner_key; \
940 pos ## L->index = winner_index; \
941 winner_key = key ## L; \
942 winner_index = index ## L; \
957 reg_entry[0].index = winner_index;
958 reg_entry[0].key = winner_key;
960 #endif //STXXL_PQ_EXTERNAL_LOSER_TREE
965 return k < arity || !free_segments.
empty();
970 template <
class Merger>
975 if (segment_size > 0)
978 if (free_segments.
empty())
982 assert(!free_segments.
empty());
987 assert(segment_size);
993 STXXL_VERBOSE1(
"ext_merger::insert_segment(merger,...) WARNING: inserting a segment with " <<
994 nblocks <<
" blocks");
995 STXXL_VERBOSE1(
"THIS IS INEFFICIENT: TRY TO CHANGE PRIORITY QUEUE PARAMETERS");
1000 first_size = block_type::size;
1004 std::list<bid_type>* bids =
new std::list<bid_type>(nblocks);
1008 another_merger.multi_merge(
1009 first_block->begin() + (block_type::size - first_size),
1010 first_block->end());
1012 STXXL_VERBOSE1(
"last element of first block " << *(first_block->end() - 1));
1013 assert(!cmp(*(first_block->begin() + (block_type::size - first_size)), *(first_block->end() - 1)));
1017 for (
typename std::list<bid_type>::iterator curbid = bids->begin(); curbid != bids->end(); ++curbid)
1020 another_merger.multi_merge(b->begin(), b->end());
1021 STXXL_VERBOSE1(
"first element of following block " << *curbid <<
" " << *(b->begin()));
1022 STXXL_VERBOSE1(
"last element of following block " << *curbid <<
" " << *(b->end() - 1));
1023 assert(!cmp(*(b->begin()), *(b->end() - 1)));
1024 pool->
write(b, *curbid);
1025 STXXL_VERBOSE1(
"written to block " << *curbid <<
" cached in " << b);
1028 insert_segment(bids, first_block, first_size, free_slot);
1030 size_ += segment_size;
1032 #if STXXL_PQ_EXTERNAL_LOSER_TREE
1037 update_on_insert((free_slot + k) >> 1, *(states[free_slot]), free_slot,
1038 &dummyKey, &dummyIndex, &dummyMask);
1039 #endif //STXXL_PQ_EXTERNAL_LOSER_TREE
1060 STXXL_VERBOSE1(
"ext_merger::insert_segment(bidlist,...) " <<
this <<
" " << bidlist->size() <<
" " << slot);
1061 assert(!is_segment_allocated(slot));
1062 assert(first_size > 0);
1065 new_sequence.
current = block_type::size - first_size;
1066 std::swap(new_sequence.
block, first_block);
1068 std::swap(new_sequence.
bids, bidlist);
1071 assert(bidlist->empty());
1075 assert(is_segment_allocated(slot));
1081 STXXL_VERBOSE1(
"ext_merger::deallocate_segment() deleting segment " << slot <<
" allocated=" <<
int(is_segment_allocated(slot)));
1082 assert(is_segment_allocated(slot));
1087 free_segments.
push(slot);
1093 return is_sentinel(*(states[slot]));
1110 #endif // !STXXL_CONTAINERS_PQ_EXT_MERGER_HEADER
sequence_state states[arity_bound]
block_type * sentinel_block
bool is_sorted(ForwardIterator first, ForwardIterator last)
bool not_sentinel(const value_type &a) const
const Type & STXXL_MIN(const Type &a, const Type &b)
const_reference front() const
External merger, based on the loser tree data structure.
bool is_sentinel(const value_type &a) const
unsigned long long int uint64
bool hint(bid_type bid)
Gives a hint for prefetching a block.
Inverts the order of a comparison functor by swapping its arguments.
bool is_segment_empty(unsigned_type slot) const
std::pair< Iterator, Iterator > pair
#define STXXL_DEFAULT_ALLOC_STRATEGY
std::iterator_traits< iterator >::value_type value_type
void deallocate_segment(unsigned_type slot)
void merge3_iterator(InputIterator &source0, InputIterator &source1, InputIterator &source2, OutputIterator target, SizeType length, CompareType &cmp)
request_ptr read(block_type *&block, bid_type bid)
Reads block. If this block is cached block is not read but passed from the cache. ...
void insert_segment(Merger &another_merger, size_type segment_size)
void swap_1D_arrays(Type *a, Type *b, unsigned_type size)
block_type::bid_type bid_type
short_sequence(Iterator first, Iterator last, origin_type origin)
#define STXXL_VERBOSE2(x)
const_iterator cbegin() const
unsigned_type mem_cons() const
uint_pair & operator++()
prefix increment operator (directly manipulates the integer parts)
bool is_sentinel(const value_type &a) const
const_iterator end() const
void delete_blocks(const BIDIteratorClass &bidbegin, const BIDIteratorClass &bidend)
Deallocates blocks.
block_type::value_type value_type
void multi_merge(OutputIterator begin, OutputIterator end)
virtual void wait(bool measure_time=true)=0
Suspends calling thread until completion of the request.
External sequence or deque container without random access. Introduction to sequence container: se...
#define STXXL_VERBOSE3(x)
Implements dynamically resizable buffered writing and prefetched reading pool.
void set_pool(pool_type *pool_)
choose_int_types< my_pointer_size >::int_type int_type
void insert_segment(std::list< bid_type > *bidlist, block_type *first_block, unsigned_type first_size, unsigned_type slot)
void push(const value_type &x)
unsigned_type origin_type
request_ptr write(block_type *&block, bid_type bid)
Passes a block to the pool for writing.
#define STXXL_BEGIN_NAMESPACE
#define STXXL_VERBOSE1(x)
origin_type origin() const
bool not_sentinel(const value_type &a) const
block_type * steal()
Take out a block from the pool.
void rebuild_loser_tree()
std::list< bid_type > * bids
const value_type & const_reference
void new_blocks(const DiskAssignFunctor &functor, BIDIteratorClass bidbegin, BIDIteratorClass bidend, unsigned_type offset=0)
Allocates new blocks.
const_iterator begin() const
choose_int_types< my_pointer_size >::unsigned_type unsigned_type
bool is_space_available() const
read_write_pool< block_type > pool_type
bool is_segment_allocated(unsigned_type slot) const
const value_type & top() const
std::iterator_traits< iterator >::difference_type size_type
void merge_iterator(InputIterator &source0, InputIterator &source1, OutputIterator target, SizeType length, CompareType &cmp)
const_iterator cend() const
const_reference back() const
size_type size_write() const
Returns number of blocks owned by the write_pool.
void swap(sequence_state &obj)
#define STXXL_END_NAMESPACE
const iterator const_iterator
ext_merger(pool_type *pool_)
void merge4_iterator(InputIterator &source0, InputIterator &source1, InputIterator &source2, InputIterator &source3, OutputIterator target, SizeType length, CompareType &cmp)
internal_bounded_stack< unsigned_type, arity > free_segments