16 #ifndef STXXL_CONTAINERS_PQ_EXT_MERGER_HEADER
17 #define STXXL_CONTAINERS_PQ_EXT_MERGER_HEADER
30 namespace priority_queue_local {
36 template <
class BlockType,
class CompareType,
unsigned Arity,
46 enum { arity = Arity, max_arity = 1UL << (LOG2<Arity>::ceil) };
50 typedef typename block_type::bid_type
bid_type;
60 #if STXXL_PARALLEL && STXXL_PARALLEL_PQ_MULTIWAY_MERGE_EXTERNAL
62 typedef parallel_merger_adapter<self_type, CompareType, Arity>
tree_type;
82 const value_type& operator * ()
const
84 return (*block)[current];
88 : block(NULL), current(0),
104 (*block)[0] = cmp.min_value();
109 return !(cmp(cmp.min_value(), a));
114 return cmp(cmp.min_value(), a);
121 std::swap(current, obj.
current);
122 std::swap(block, obj.
block);
123 std::swap(bids, obj.
bids);
124 assert(merger == obj.
merger);
131 assert(not_sentinel((*block)[current]));
132 assert(current < block->size);
136 if (current == block->size)
138 STXXL_VERBOSE2(
"ext_merger sequence_state operator++ crossing block border ");
142 STXXL_VERBOSE2(
"ext_merger sequence_state operator++ it was the last block in the sequence ");
144 bid_container_type to_delete;
145 std::swap(to_delete, bids);
150 STXXL_VERBOSE2(
"ext_merger sequence_state operator++ there is another block ");
151 bid_type bid = bids.front();
153 merger->pool->hint(bid);
156 STXXL_VERBOSE2(
"ext_merger sequence_state operator++ more blocks exist in a sequence, hinting the next");
157 merger->pool->hint(bids.front());
159 merger->pool->read(block, bid)->wait();
160 STXXL_VERBOSE2(
"first element of read block " << bid <<
" " << *(block->begin()) <<
" cached in " << block);
162 merger->pool->hint(bids.front());
163 block_manager::get_instance()->delete_block(bid);
206 delete states[i].
block;
208 delete sentinel_block;
223 return is_sentinel(*(states[slot]));
242 states[a].
swap(states[b]);
254 STXXL_VERBOSE1(
"ext_merger::free_array() deleting array " << slot <<
" allocated=" <<
int(is_array_allocated(slot)));
255 assert(is_array_allocated(slot));
269 if (!states[i].bids.empty())
281 sentinel_block = NULL;
282 if (arity < max_arity)
286 (*sentinel_block)[i] = tree.
cmp.min_value();
287 if (arity + 1 == max_arity) {
289 STXXL_ERRMSG(
"inefficient PQ parameters for ext_merger: arity + 1 == max_arity");
299 states[i].
block = sentinel_block;
308 std::swap(cmp, obj.cmp);
310 std::swap(logK, obj.logK);
311 std::swap(m_size, obj.
m_size);
321 return (STXXL_MIN<unsigned_type>(arity + 1, max_arity) * block_type::raw_size);
351 STXXL_VERBOSE1(
"ext_merger::insert_segment(bidlist,...) " <<
this <<
" " << bidlist.size() <<
" " << slot);
352 assert(!is_array_allocated(slot));
353 assert(first_size > 0);
356 new_sequence.
current = block_type::size - first_size;
357 std::swap(new_sequence.
block, first_block);
359 std::swap(new_sequence.
bids, bidlist);
361 assert(is_array_allocated(slot));
366 template <
class Merger>
371 if (segment_size == 0)
382 assert(segment_size);
388 STXXL_VERBOSE1(
"ext_merger::insert_segment(merger,...) WARNING: inserting a segment with " <<
389 nblocks <<
" blocks");
390 STXXL_VERBOSE1(
"THIS IS INEFFICIENT: TRY TO CHANGE PRIORITY QUEUE PARAMETERS");
395 first_size = block_type::size;
401 bid_container_type bids(nblocks);
402 bm->
new_blocks(alloc_strategy(), bids.begin(), bids.end());
405 another_merger.multi_merge(
406 first_block->begin() + (block_type::size - first_size),
409 STXXL_VERBOSE1(
"last element of first block " << *(first_block->end() - 1));
410 assert(!tree.
cmp(*(first_block->begin() + (block_type::size - first_size)), *(first_block->end() - 1)));
414 for (
typename bid_container_type::iterator curbid = bids.begin(); curbid != bids.end(); ++curbid)
417 another_merger.multi_merge(b->begin(), b->end());
418 STXXL_VERBOSE1(
"first element of following block " << *curbid <<
" " << *(b->begin()));
419 STXXL_VERBOSE1(
"last element of following block " << *curbid <<
" " << *(b->end() - 1));
420 assert(!tree.
cmp(*(b->begin()), *(b->end() - 1)));
421 pool->
write(b, *curbid);
422 STXXL_VERBOSE1(
"written to block " << *curbid <<
" cached in " << b);
425 insert_segment(bids, first_block, first_size, index);
427 m_size += segment_size;
438 template <
class OutputIterator>
441 assert((size_type)(end - begin) <= m_size);
443 #if STXXL_PARALLEL && STXXL_PARALLEL_PQ_MULTIWAY_MERGE_EXTERNAL
444 multi_merge_parallel(begin, end);
445 #else // STXXL_PARALLEL && STXXL_PARALLEL_PQ_MULTIWAY_MERGE_EXTERNAL
447 m_size -= end - begin;
451 #if STXXL_PARALLEL && STXXL_PARALLEL_PQ_MULTIWAY_MERGE_EXTERNAL
457 template <
class OutputIterator>
458 void multi_merge_parallel(OutputIterator begin, OutputIterator end)
467 typedef std::pair<typename block_type::iterator, typename block_type::iterator>
sequence;
469 std::vector<sequence> seqs;
470 std::vector<unsigned_type> orig_seq_index;
476 if (states[i].current == states[i].block->size || is_sentinel(*states[i]))
479 seqs.push_back(std::make_pair(states[i].block->begin() + states[i].
current, states[i].
block->end()));
480 orig_seq_index.push_back(i);
482 #if STXXL_CHECK_ORDER_IN_SORTS
483 if (!is_sentinel(*seqs.back().first) && !
stxxl::is_sorted(seqs.back().first, seqs.back().second, inv_cmp))
485 STXXL_VERBOSE1(
"length " << i <<
" " << (seqs.back().second - seqs.back().first));
486 for (value_type* v = seqs.back().first + 1; v < seqs.back().second; ++v)
488 if (inv_cmp(*v, *(v - 1)))
490 STXXL_VERBOSE1(
"Error at position " << i <<
"/" << (v - seqs.back().first - 1) <<
"/" << (v - seqs.back().first) <<
" " << *(v - 1) <<
" " << *v);
494 STXXL_VERBOSE1(
"Wrong sentinel at position " << (v - seqs.back().first));
502 if (!states[i].bids.empty())
506 assert(seqs.size() > 0);
508 #if STXXL_CHECK_ORDER_IN_SORTS
509 value_type last_elem;
513 diff_type rest = end - begin;
518 value_type min_last = tree.
cmp.min_value();
520 diff_type total_size = 0;
524 diff_type seq_i_size = seqs[i].second - seqs[i].first;
527 total_size += seq_i_size;
528 if (inv_cmp(*(seqs[i].second - 1), min_last))
529 min_last = *(seqs[i].second - 1);
531 STXXL_VERBOSE1(
"front block of seq " << i <<
": front=" << *(seqs[i].first) <<
" back=" << *(seqs[i].second - 1) <<
" len=" << seq_i_size);
538 assert(total_size > 0);
539 assert(!is_sentinel(min_last));
541 STXXL_VERBOSE1(
"min_last " << min_last <<
" total size " << total_size <<
" num_seq " << seqs.size());
543 diff_type less_equal_than_min_last = 0;
549 typename block_type::iterator position =
550 std::upper_bound(seqs[i].first, seqs[i].second, min_last, inv_cmp);
554 STXXL_VERBOSE1(
"seq " << i <<
": " << (position - seqs[i].first) <<
" greater equal than " << min_last);
556 less_equal_than_min_last += (position - seqs[i].first);
560 diff_type output_size =
STXXL_MIN(less_equal_than_min_last, rest);
562 STXXL_VERBOSE1(
"output_size=" << output_size <<
" = min(leq_t_ml=" << less_equal_than_min_last <<
", rest=" << rest <<
")");
564 assert(output_size > 0);
570 seqs.begin(), seqs.end(), begin, output_size, inv_cmp);
573 m_size -= output_size;
577 sequence_state& state = states[orig_seq_index[i]];
579 state.
current = seqs[i].first - state.block->begin();
581 assert(seqs[i].first <= seqs[i].second);
583 if (seqs[i].first == seqs[i].second)
587 assert(state.current == state.block->size);
588 if (state.bids.empty())
591 STXXL_VERBOSE1(
"seq " << i <<
": ext_merger::multi_merge(...) it was the last block in the sequence ");
596 #if STXXL_CHECK_ORDER_IN_SORTS
597 last_elem = *(seqs[i].second - 1);
599 STXXL_VERBOSE1(
"seq " << i <<
": ext_merger::multi_merge(...) there is another block ");
600 bid_type bid = state.bids.front();
601 state.bids.pop_front();
603 if (!(state.bids.empty()))
605 STXXL_VERBOSE2(
"seq " << i <<
": ext_merger::multi_merge(...) more blocks exist, hinting the next");
606 pool->
hint(state.bids.front());
608 pool->
read(state.block, bid)->
wait();
609 STXXL_VERBOSE1(
"seq " << i <<
": first element of read block " << bid <<
" " << *(state.block->begin()) <<
" cached in " << state.block);
610 if (!(state.bids.empty()))
611 pool->
hint(state.bids.front());
613 seqs[i] = std::make_pair(state.block->begin() + state.current, state.block->end());
614 block_manager::get_instance()->delete_block(bid);
616 #if STXXL_CHECK_ORDER_IN_SORTS
617 STXXL_VERBOSE1(
"before " << last_elem <<
" after " << *seqs[i].first <<
" newly loaded block " << bid);
620 STXXL_VERBOSE1(
"length " << i <<
" " << (seqs[i].second - seqs[i].first));
621 for (value_type* v = seqs[i].first + 1; v < seqs[i].second; ++v)
623 if (inv_cmp(*v, *(v - 1)))
625 STXXL_VERBOSE1(
"Error at position " << i <<
"/" << (v - seqs[i].first - 1) <<
"/" << (v - seqs[i].first) <<
" " << *(v - 1) <<
" " << *v);
629 STXXL_VERBOSE1(
"Wrong sentinel at position " << (v - seqs[i].first));
643 if (is_array_empty(seg))
652 #endif // STXXL_PARALLEL && STXXL_PARALLEL_PQ_MULTIWAY_MERGE_EXTERNAL
661 #endif // !STXXL_CONTAINERS_PQ_EXT_MERGER_HEADER
bid_container_type bids
list of blocks forming this sequence
read_write_pool< block_type > pool_type
block_type::value_type value_type
const Type & STXXL_MIN(const Type &a, const Type &b)
bool is_space_available() const
BlockType block_type
class is parameterized by the block of the external arrays
External merger, based on the loser tree data structure.
void maybe_compact()
compact tree if it got considerably smaller
block_type * sentinel_block
a memory block filled with sentinel values
void set_pool(pool_type *pool_)
loser_tree< self_type, CompareType, Arity > tree_type
type of embedded loser tree
bool hint(bid_type bid)
Gives a hint for prefetching a block.
Inverts the order of a comparison functor by swapping its arguments.
std::deque< bid_type > bid_container_type
RandomAccessIterator3 multiway_merge(RandomAccessIteratorPairIterator seqs_begin, RandomAccessIteratorPairIterator seqs_end, RandomAccessIterator3 target, DiffType length, Comparator comp)
Multi-way merging dispatcher.
#define STXXL_DEFAULT_ALLOC_STRATEGY
ext_merger(const compare_type &c=compare_type())
request_ptr read(block_type *&block, bid_type bid)
Reads block.
void swap_1D_arrays(Type *a, Type *b, unsigned_type size)
block_type * block
current block
bool is_sentinel(const value_type &a) const
True if a is the sentinel value.
#define STXXL_VERBOSE2(x)
void insert_segment(bid_container_type &bidlist, block_type *first_block, unsigned_type first_size, unsigned_type slot)
uint_pair & operator++()
prefix increment operator (directly manipulates the integer parts)
bool is_array_allocated(unsigned_type slot) const
Is this segment allocated? Otherwise it's empty, already on the stack of free segment indices and can...
block_type::bid_type bid_type
size_type size() const
Return the number of items in the arrays.
ext_merger< BlockType, CompareType, Arity, AllocStr > self_type
our type
void free_player(unsigned_type slot)
Free a finished player's slot.
void delete_blocks(const BIDIteratorClass &bidbegin, const BIDIteratorClass &bidend)
Deallocates blocks.
virtual void wait(bool measure_time=true)=0
Suspends calling thread until completion of the request.
unsigned_type k
current tree size, invariant (k == 1 << logK), always a power of two
External sequence or deque container without random access. Introduction to sequence container: se...
sequence_type & get_array(unsigned_type slot)
Return the item sequence of the given slot.
unsigned_type mem_cons() const
bool is_array_empty(unsigned_type slot) const
Is this segment empty?
void multi_merge(Element *begin, Element *end)
void swap_arrays(unsigned_type a, unsigned_type b)
Swap contents of arrays a and b.
request_ptr write(block_type *&block, bid_type bid)
Passes a block to the pool for writing.
sequence_state sequence_type
type of sequences in which the values are stored: external arrays
#define STXXL_BEGIN_NAMESPACE
bool not_sentinel(const value_type &a) const
size_type m_size
total number of elements stored
#define STXXL_VERBOSE1(x)
void prefetch_arrays()
Hint (prefetch) first non-internal (actually second) block of each sequence.
pool_type * pool
read and write block pool
bool is_sentinel(const value_type &a) const
void update_on_insert(unsigned_type node, const Element &newKey, unsigned_type newIndex, Element *winnerKey, unsigned_type *winnerIndex, unsigned_type *mask)
tree_type tree
loser tree instance
bool is_sorted(ForwardIterator first, ForwardIterator last, StrictWeakOrdering comp)
block_type * steal()
Take out a block from the pool.
void multi_merge(OutputIterator begin, OutputIterator end)
unsigned_type new_player()
Allocate a free slot for a new player.
void append_merger(Merger &another_merger, size_type segment_size)
Merge all items from another merger and insert the resulting external array into the merger...
bool is_space_available() const
Whether there is still space for new array.
void new_blocks(const DiskAssignFunctor &functor, BIDIteratorClass bidbegin, BIDIteratorClass bidend, unsigned_type offset=0)
Allocates new blocks.
void swap(sequence_state &obj)
sequence_state states[max_arity]
sequence including current position, dereference gives current element
uint64 external_size_type
void make_array_sentinel(unsigned_type a)
Set a usually empty array to the sentinel.
bool is_sentinel(const Element &a)
choose_int_types< my_pointer_size >::unsigned_type unsigned_type
size_type size_write() const
Returns number of blocks owned by the write_pool.
void free_array(unsigned_type slot)
free an empty segment.
unsigned_type current
current index in current block
external_size_type size_type
size type of the total number of items in the merger
#define STXXL_END_NAMESPACE