00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016 #ifndef STXXL_PQ_EXT_MERGER_HEADER
00017 #define STXXL_PQ_EXT_MERGER_HEADER
00018
00019 #include <stxxl/bits/mng/mng.h>
00020
00021 __STXXL_BEGIN_NAMESPACE
00022
00026
00029 namespace priority_queue_local
00030 {
00031 template <typename Iterator>
00032 class short_sequence : public std::pair<Iterator, Iterator>
00033 {
00034 typedef std::pair<Iterator, Iterator> pair;
00035
00036 public:
00037 typedef Iterator iterator;
00038 typedef const iterator const_iterator;
00039 typedef typename std::iterator_traits<iterator>::value_type value_type;
00040 typedef typename std::iterator_traits<iterator>::difference_type size_type;
00041 typedef value_type & reference;
00042 typedef const value_type & const_reference;
00043 typedef unsigned_type origin_type;
00044
00045 private:
00046 origin_type m_origin;
00047
00048 public:
00049 short_sequence(Iterator first, Iterator last, origin_type origin) :
00050 pair(first, last), m_origin(origin)
00051 { }
00052
00053 iterator begin()
00054 {
00055 return this->first;
00056 }
00057
00058 const_iterator begin() const
00059 {
00060 return this->first;
00061 }
00062
00063 const_iterator cbegin() const
00064 {
00065 return begin();
00066 }
00067
00068 iterator end()
00069 {
00070 return this->second;
00071 }
00072
00073 const_iterator end() const
00074 {
00075 return this->second;
00076 }
00077
00078 const_iterator cend() const
00079 {
00080 return end();
00081 }
00082
00083 reference front()
00084 {
00085 return *begin();
00086 }
00087
00088 const_reference front() const
00089 {
00090 return *begin();
00091 }
00092
00093 reference back()
00094 {
00095 return *(end() - 1);
00096 }
00097
00098 const_reference back() const
00099 {
00100 return *(end() - 1);
00101 }
00102
00103 size_type size() const
00104 {
00105 return end() - begin();
00106 }
00107
00108 bool empty() const
00109 {
00110 return size() == 0;
00111 }
00112
00113 origin_type origin() const
00114 {
00115 return m_origin;
00116 }
00117 };
00118
00123 template <class BlockType_,
00124 class Cmp_,
00125 unsigned Arity_,
00126 class AllocStr_ = STXXL_DEFAULT_ALLOC_STRATEGY>
00127 class ext_merger : private noncopyable
00128 {
00129 public:
00130 typedef stxxl::uint64 size_type;
00131 typedef BlockType_ block_type;
00132 typedef typename block_type::bid_type bid_type;
00133 typedef typename block_type::value_type value_type;
00134 typedef Cmp_ comparator_type;
00135 typedef AllocStr_ alloc_strategy;
00136 typedef read_write_pool<block_type> pool_type;
00137
00138
00139 enum { arity = Arity_, arity_bound = 1UL << (LOG2<Arity_>::ceil) };
00140
00141 protected:
00142 comparator_type cmp;
00143
00144 bool is_sentinel(const value_type & a) const
00145 {
00146 return !(cmp(cmp.min_value(), a));
00147 }
00148
00149 bool not_sentinel(const value_type & a) const
00150 {
00151 return cmp(cmp.min_value(), a);
00152 }
00153
00154 struct sequence_state : private noncopyable
00155 {
00156 block_type * block;
00157 unsigned_type current;
00158 std::list<bid_type> * bids;
00159 comparator_type cmp;
00160 ext_merger * merger;
00161 bool allocated;
00162
00164 const value_type & operator * () const
00165 {
00166 return (*block)[current];
00167 }
00168
00169 sequence_state() : bids(NULL), allocated(false)
00170 { }
00171
00172 ~sequence_state()
00173 {
00174 STXXL_VERBOSE2("ext_merger sequence_state::~sequence_state()");
00175 if (bids != NULL)
00176 {
00177 block_manager * bm = block_manager::get_instance();
00178 bm->delete_blocks(bids->begin(), bids->end());
00179 delete bids;
00180 }
00181 }
00182
00183 void make_inf()
00184 {
00185 current = 0;
00186 (*block)[0] = cmp.min_value();
00187 }
00188
00189 bool is_sentinel(const value_type & a) const
00190 {
00191 return !(cmp(cmp.min_value(), a));
00192 }
00193
00194 bool not_sentinel(const value_type & a) const
00195 {
00196 return cmp(cmp.min_value(), a);
00197 }
00198
00199 void swap(sequence_state & obj)
00200 {
00201 if (&obj != this)
00202 {
00203 std::swap(current, obj.current);
00204 std::swap(block, obj.block);
00205 std::swap(bids, obj.bids);
00206 assert(merger == obj.merger);
00207 std::swap(allocated, obj.allocated);
00208 }
00209 }
00210
00211 sequence_state & operator ++ ()
00212 {
00213 assert(not_sentinel((*block)[current]));
00214 assert(current < block->size);
00215
00216 ++current;
00217
00218 if (current == block->size)
00219 {
00220 STXXL_VERBOSE2("ext_merger sequence_state operator++ crossing block border ");
00221
00222 assert(bids != NULL);
00223 if (bids->empty())
00224 {
00225 STXXL_VERBOSE2("ext_merger sequence_state operator++ it was the last block in the sequence ");
00226 delete bids;
00227 bids = NULL;
00228 make_inf();
00229 }
00230 else
00231 {
00232 STXXL_VERBOSE2("ext_merger sequence_state operator++ there is another block ");
00233 bid_type bid = bids->front();
00234 bids->pop_front();
00235 merger->pool->hint(bid);
00236 if (!(bids->empty()))
00237 {
00238 STXXL_VERBOSE2("ext_merger sequence_state operator++ more blocks exist in a sequence, hinting the next");
00239 merger->pool->hint(bids->front());
00240 }
00241 merger->pool->read(block, bid)->wait();
00242 STXXL_VERBOSE2("first element of read block " << bid << " " << *(block->begin()) << " cached in " << block);
00243 if (!(bids->empty()))
00244 merger->pool->hint(bids->front());
00245 block_manager::get_instance()->delete_block(bid);
00246 current = 0;
00247 }
00248 }
00249 return *this;
00250 }
00251 };
00252
00253
00254 #if STXXL_PQ_EXTERNAL_LOSER_TREE
// One node of the loser tree (elements of the entry[] array). Node 0 holds
// the overall winner; init_winner() stores the loser of each comparison in
// the inner nodes.
00255 struct Entry
00256 {
00257 value_type key; // copy of the current element of the sequence recorded at this node
00258 unsigned_type index; // index into states[] of that sequence
00259 };
00260 #endif //STXXL_PQ_EXTERNAL_LOSER_TREE
00261
// Total number of elements held by the merger: increased by insert_segment(),
// decreased by multi_merge().
00262 size_type size_;
// log_k is incremented/decremented together with every doubling/halving of k
// (double_k(), compact_tree()); both start at log_k = 0, k = 1.
00263 unsigned_type log_k;
// Current number of slots of states[] that take part in merging.
00264 unsigned_type k;
00265
00266
00267
00268
00269
// Stack of slot indices (all < arity) that are free for a new segment.
00270 internal_bounded_stack<unsigned_type, arity> free_segments;
00271
00272 #if STXXL_PQ_EXTERNAL_LOSER_TREE
00273
00274
// Loser tree: entry[0] is the overall winner, entry[1..k-1] are inner nodes.
00275 Entry entry[arity_bound];
00276 #endif //STXXL_PQ_EXTERNAL_LOSER_TREE
00277
00278
00279
00280
// Per-slot sequence states; slots >= arity share sentinel_block (see init()).
00281 sequence_state states[arity_bound];
00282
// Block pool used for hinting, reading, stealing and writing blocks; this
// class never deletes it.
00283 pool_type * pool;
00284
// Block filled with sentinels, allocated by init() only if arity < arity_bound.
00285 block_type * sentinel_block;
00286
00287 public:
00288 ext_merger() :
00289 size_(0), log_k(0), k(1), pool(0)
00290 {
00291 init();
00292 }
00293
00294 ext_merger(pool_type * pool_) :
00295 size_(0), log_k(0), k(1),
00296 pool(pool_)
00297 {
00298 init();
00299 }
00300
00301 virtual ~ext_merger()
00302 {
00303 STXXL_VERBOSE1("ext_merger::~ext_merger()");
00304 for (unsigned_type i = 0; i < arity; ++i)
00305 {
00306 delete states[i].block;
00307 }
00308 delete sentinel_block;
00309 }
00310
00311 void set_pool(pool_type * pool_)
00312 {
00313 pool = pool_;
00314 }
00315
00316 private:
00317 void init()
00318 {
00319 STXXL_VERBOSE2("ext_merger::init()");
00320 assert(!cmp(cmp.min_value(), cmp.min_value()));
00321
00322 sentinel_block = NULL;
00323 if (arity < arity_bound)
00324 {
00325 sentinel_block = new block_type;
00326 for (unsigned_type i = 0; i < block_type::size; ++i)
00327 (*sentinel_block)[i] = cmp.min_value();
00328 if (arity + 1 == arity_bound) {
00329
00330 STXXL_ERRMSG("inefficient PQ parameters for ext_merger: arity + 1 == arity_bound");
00331 }
00332 }
00333
00334 for (unsigned_type i = 0; i < arity_bound; ++i)
00335 {
00336 states[i].merger = this;
00337 if (i < arity)
00338 states[i].block = new block_type;
00339 else
00340 states[i].block = sentinel_block;
00341
00342 states[i].make_inf();
00343 }
00344
00345 assert(k == 1);
00346 free_segments.push(0);
00347
00348 rebuild_loser_tree();
00349 #if STXXL_PQ_EXTERNAL_LOSER_TREE
00350 assert(is_sentinel(*states[entry[0].index]));
00351 #endif //STXXL_PQ_EXTERNAL_LOSER_TREE
00352 }
00353
00354
//! Recomputes the whole loser tree from the current elements of the
//! sequences and stores the overall winner in entry[0]. Does nothing when
//! the external loser tree is compiled out.
void rebuild_loser_tree()
{
#if STXXL_PQ_EXTERNAL_LOSER_TREE
    const unsigned_type champion = init_winner(1);
    Entry & top = entry[0];
    top.index = champion;
    top.key = *(states[champion]);
#endif //STXXL_PQ_EXTERNAL_LOSER_TREE
}
00363
00364
00365 #if STXXL_PQ_EXTERNAL_LOSER_TREE
00366
00367
00368
00369
00370
00371 unsigned_type init_winner(unsigned_type root)
00372 {
00373 if (root >= k)
00374 {
00375 return root - k;
00376 }
00377 else
00378 {
00379 unsigned_type left = init_winner(2 * root);
00380 unsigned_type right = init_winner(2 * root + 1);
00381 value_type lk = *(states[left]);
00382 value_type rk = *(states[right]);
00383 if (!(cmp(lk, rk)))
00384 {
00385 entry[root].index = right;
00386 entry[root].key = rk;
00387 return left;
00388 }
00389 else
00390 {
00391 entry[root].index = left;
00392 entry[root].key = lk;
00393 return right;
00394 }
00395 }
00396 }
00397
00398
00399
00400
00401
00402
00403 void update_on_insert(
00404 unsigned_type node,
00405 const value_type & new_key,
00406 unsigned_type new_index,
00407 value_type * winner_key,
00408 unsigned_type * winner_index,
00409 unsigned_type * mask)
00410 {
00411 if (node == 0)
00412 {
00413 *mask = 1 << (log_k - 1);
00414 *winner_key = entry[0].key;
00415 *winner_index = entry[0].index;
00416 if (cmp(entry[node].key, new_key))
00417 {
00418 entry[node].key = new_key;
00419 entry[node].index = new_index;
00420 }
00421 }
00422 else
00423 {
00424 update_on_insert(node >> 1, new_key, new_index, winner_key, winner_index, mask);
00425 value_type loserKey = entry[node].key;
00426 unsigned_type loserIndex = entry[node].index;
00427 if ((*winner_index & *mask) != (new_index & *mask))
00428 {
00429 if (cmp(loserKey, new_key))
00430 {
00431 if (cmp(*winner_key, new_key))
00432 {
00433 entry[node].key = *winner_key;
00434 entry[node].index = *winner_index;
00435 }
00436 else
00437 {
00438 entry[node].key = new_key;
00439 entry[node].index = new_index;
00440 }
00441 }
00442 *winner_key = loserKey;
00443 *winner_index = loserIndex;
00444 }
00445
00446
00447
00448
00449
00450
00451
00452 *mask >>= 1;
00453 }
00454 }
00455 #endif //STXXL_PQ_EXTERNAL_LOSER_TREE
00456
00457
00458 void double_k()
00459 {
00460 STXXL_VERBOSE1("ext_merger::double_k (before) k=" << k << " log_k=" << log_k << " arity_bound=" << arity_bound << " arity=" << arity << " #free=" << free_segments.size());
00461 assert(k > 0);
00462 assert(k < arity);
00463 assert(free_segments.empty());
00464
00465
00466
00467 for (unsigned_type i = 2 * k - 1; i >= k; i--)
00468 {
00469 states[i].make_inf();
00470 if (i < arity)
00471 free_segments.push(i);
00472 }
00473
00474
00475 k *= 2;
00476 log_k++;
00477
00478 STXXL_VERBOSE1("ext_merger::double_k (after) k=" << k << " log_k=" << log_k << " arity_bound=" << arity_bound << " arity=" << arity << " #free=" << free_segments.size());
00479 assert(!free_segments.empty());
00480
00481
00482 rebuild_loser_tree();
00483 }
00484
00485
00486
00487 void compact_tree()
00488 {
00489 STXXL_VERBOSE1("ext_merger::compact_tree (before) k=" << k << " log_k=" << log_k << " #free=" << free_segments.size());
00490 assert(log_k > 0);
00491
00492
00493
00494 unsigned_type target = 0;
00495 for (unsigned_type from = 0; from < k; from++)
00496 {
00497 if (!is_segment_empty(from))
00498 {
00499 assert(is_segment_allocated(from));
00500 if (from != target)
00501 {
00502 assert(!is_segment_allocated(target));
00503 states[target].swap(states[from]);
00504 }
00505 ++target;
00506 }
00507 }
00508
00509
00510 while (k > 1 && target <= (k / 2))
00511 {
00512 k /= 2;
00513 log_k--;
00514 }
00515
00516
00517 free_segments.clear();
00518 for ( ; target < k; target++)
00519 {
00520 assert(!is_segment_allocated(target));
00521 states[target].make_inf();
00522 if (target < arity)
00523 free_segments.push(target);
00524 }
00525
00526 STXXL_VERBOSE1("ext_merger::compact_tree (after) k=" << k << " log_k=" << log_k << " #free=" << free_segments.size());
00527 assert(k > 0);
00528
00529
00530 rebuild_loser_tree();
00531 }
00532
00533
// Disabled code (#if 0, never compiled): member-wise swap of two mergers.
// NOTE(review): pool and sentinel_block are not exchanged here — confirm
// before ever re-enabling this.
00534 #if 0
00535 void swap(ext_merger & obj)
00536 {
00537 std::swap(cmp, obj.cmp);
00538 std::swap(free_segments, obj.free_segments);
00539 std::swap(size_, obj.size_);
00540 std::swap(log_k, obj.log_k);
00541 std::swap(k, obj.k);
00542 swap_1D_arrays(entry, obj.entry, arity_bound);
00543 swap_1D_arrays(states, obj.states, arity_bound);
00544
00545
00546 }
00547 #endif
00548
00549 public:
00550 unsigned_type mem_cons() const
00551 {
00552 return (STXXL_MIN<unsigned_type>(arity + 1, arity_bound) * block_type::raw_size);
00553 }
00554
00555
00556
00557
00558
00559
// Extracts the next (end - begin) elements in merged order from the sequences
// managed by this merger and writes them to [begin, end).
// The requested length must not exceed size_ (asserted); size_ is reduced by
// the number of elements written. Afterwards the tree is compacted when only
// few of the k slots are still in use (see the block at the end).
// Two implementations are selected at compile time: a parallel multiway merge
// over the cached blocks, or the sequential (loser tree based) merge.
00560 template <class OutputIterator>
00561 void multi_merge(OutputIterator begin, OutputIterator end)
00562 {
00563 size_type length = end - begin;
00564
00565 STXXL_VERBOSE1("ext_merger::multi_merge from " << k << " sequence(s), length = " << length);
00566
// nothing to do for an empty output range
00567 if (length == 0)
00568 return;
00569
00570 assert(k > 0);
00571 assert(length <= size_);
00572
00573
00574
00575 #if STXXL_PARALLEL && STXXL_PARALLEL_PQ_MULTIWAY_MERGE_EXTERNAL
00576 typedef stxxl::int64 diff_type;
00577 typedef std::pair<typename block_type::iterator, typename block_type::iterator> sequence;
00578
// seqs[j] is the unread part of the cached block of slot orig_seq_index[j]
00579 std::vector<sequence> seqs;
00580 std::vector<unsigned_type> orig_seq_index;
00581
// NOTE(review): this local comparator shadows the member 'cmp' for the rest
// of this branch — confirm that a default-constructed Cmp_ is intended.
00582 Cmp_ cmp;
00583 priority_queue_local::invert_order<Cmp_, value_type, value_type> inv_cmp(cmp);
00584
// collect the cached remainder of every sequence that is not exhausted
00585 for (unsigned_type i = 0; i < k; ++i)
00586 {
00587 if (states[i].current == states[i].block->size || is_sentinel(*states[i]))
00588 continue;
00589
00590 seqs.push_back(std::make_pair(states[i].block->begin() + states[i].current, states[i].block->end()));
00591 orig_seq_index.push_back(i);
00592
// optional consistency check: the cached part must be sorted w.r.t. inv_cmp
// and must not contain sentinels
00593 #if STXXL_CHECK_ORDER_IN_SORTS
00594 if (!is_sentinel(*seqs.back().first) && !stxxl::is_sorted(seqs.back().first, seqs.back().second, inv_cmp))
00595 {
00596 STXXL_VERBOSE1("length " << i << " " << (seqs.back().second - seqs.back().first));
00597 for (value_type * v = seqs.back().first + 1; v < seqs.back().second; ++v)
00598 {
00599 if (inv_cmp(*v, *(v - 1)))
00600 {
00601 STXXL_VERBOSE1("Error at position " << i << "/" << (v - seqs.back().first - 1) << "/" << (v - seqs.back().first) << " " << *(v - 1) << " " << *v);
00602 }
00603 if (is_sentinel(*v))
00604 {
00605 STXXL_VERBOSE1("Wrong sentinel at position " << (v - seqs.back().first));
00606 }
00607 }
00608 assert(false);
00609 }
00610 #endif
00611
00612
// hint the next block of this sequence, if there is one
00613 if (states[i].bids != NULL && !states[i].bids->empty())
00614 pool->hint(states[i].bids->front());
00615 }
00616
00617 assert(seqs.size() > 0);
00618
00619 #if STXXL_CHECK_ORDER_IN_SORTS
00620 value_type last_elem;
00621 #endif
00622
// number of elements that still have to be written
00623 diff_type rest = length;
00624
// Each round merges only as many elements as can be determined from the
// blocks currently cached, then refills the sequences whose block ran empty.
00625 while (rest > 0)
00626 {
// min_last: the last cached element that comes first w.r.t. inv_cmp among
// all non-empty cached ranges (starting from the sentinel value)
00627 value_type min_last = cmp.min_value();
00628 diff_type total_size = 0;
00629
00630 for (unsigned_type i = 0; i < seqs.size(); ++i)
00631 {
00632 diff_type seq_i_size = seqs[i].second - seqs[i].first;
00633 if (seq_i_size > 0)
00634 {
00635 total_size += seq_i_size;
00636 if (inv_cmp(*(seqs[i].second - 1), min_last))
00637 min_last = *(seqs[i].second - 1);
00638
00639 STXXL_VERBOSE1("front block of seq " << i << ": front=" << *(seqs[i].first) << " back=" << *(seqs[i].second - 1) << " len=" << seq_i_size);
00640 } else {
00641 STXXL_VERBOSE1("front block of seq " << i << ": empty");
00642 }
00643 }
00644
00645 assert(total_size > 0);
00646 assert(!is_sentinel(min_last));
00647
00648 STXXL_VERBOSE1("min_last " << min_last << " total size " << total_size << " num_seq " << seqs.size());
00649
// count the cached elements that are not ordered after min_last (w.r.t.
// inv_cmp); only these may be output in this round
00650 diff_type less_equal_than_min_last = 0;
00651
00652 for (unsigned_type i = 0; i < seqs.size(); ++i)
00653 {
00654
00655
00656 typename block_type::iterator position =
00657 std::upper_bound(seqs[i].first, seqs[i].second, min_last, inv_cmp);
00658
00659
00660
00661 STXXL_VERBOSE1("seq " << i << ": " << (position - seqs[i].first) << " greater equal than " << min_last);
00662
00663 less_equal_than_min_last += (position - seqs[i].first);
00664 }
00665
00666 diff_type output_size = STXXL_MIN(less_equal_than_min_last, rest);
00667
00668 STXXL_VERBOSE1("output_size=" << output_size << " = min(leq_t_ml=" << less_equal_than_min_last << ", rest=" << rest << ")");
00669
00670 assert(output_size > 0);
00671
00672
00673
// merge output_size elements; the returned iterator is the new output position
// (the code below relies on seqs[i].first having been advanced by the merge)
00674 begin = parallel::multiway_merge(seqs.begin(), seqs.end(), begin, inv_cmp, output_size);
00675
00676 rest -= output_size;
00677 size_ -= output_size;
00678
// write the new positions back into the states and load the next block of
// every sequence whose cached block has been consumed completely
00679 for (unsigned_type i = 0; i < seqs.size(); ++i)
00680 {
00681 sequence_state & state = states[orig_seq_index[i]];
00682
00683 state.current = seqs[i].first - state.block->begin();
00684
00685 assert(seqs[i].first <= seqs[i].second);
00686
00687 if (seqs[i].first == seqs[i].second)
00688 {
00689 assert(state.current == state.block->size);
00690 if (state.bids == NULL || state.bids->empty())
00691 {
00692 STXXL_VERBOSE1("seq " << i << ": ext_merger::multi_merge(...) it was the last block in the sequence ");
// no block left: the sequence becomes a sentinel; seqs[i] stays an empty range
00693 state.make_inf();
00694 }
00695 else
00696 {
00697 #if STXXL_CHECK_ORDER_IN_SORTS
00698 last_elem = *(seqs[i].second - 1);
00699 #endif
00700 STXXL_VERBOSE1("seq " << i << ": ext_merger::multi_merge(...) there is another block ");
00701 bid_type bid = state.bids->front();
00702 state.bids->pop_front();
00703 pool->hint(bid);
00704 if (!(state.bids->empty()))
00705 {
00706 STXXL_VERBOSE2("seq " << i << ": ext_merger::multi_merge(...) more blocks exist, hinting the next");
00707 pool->hint(state.bids->front());
00708 }
// blocking read of the next block into the cached block of this sequence
00709 pool->read(state.block, bid)->wait();
00710 STXXL_VERBOSE1("seq " << i << ": first element of read block " << bid << " " << *(state.block->begin()) << " cached in " << state.block);
00711 if (!(state.bids->empty()))
00712 pool->hint(state.bids->front());
00713 state.current = 0;
00714 seqs[i] = std::make_pair(state.block->begin() + state.current, state.block->end());
// the external copy of the block is no longer needed
00715 block_manager::get_instance()->delete_block(bid);
00716
00717 #if STXXL_CHECK_ORDER_IN_SORTS
00718 STXXL_VERBOSE1("before " << last_elem << " after " << *seqs[i].first << " newly loaded block " << bid);
00719 if (!stxxl::is_sorted(seqs[i].first, seqs[i].second, inv_cmp))
00720 {
00721 STXXL_VERBOSE1("length " << i << " " << (seqs[i].second - seqs[i].first));
00722 for (value_type * v = seqs[i].first + 1; v < seqs[i].second; ++v)
00723 {
00724 if (inv_cmp(*v, *(v - 1)))
00725 {
00726 STXXL_VERBOSE1("Error at position " << i << "/" << (v - seqs[i].first - 1) << "/" << (v - seqs[i].first) << " " << *(v - 1) << " " << *v);
00727 }
00728 if (is_sentinel(*v))
00729 {
00730 STXXL_VERBOSE1("Wrong sentinel at position " << (v - seqs[i].first));
00731 }
00732 }
00733 assert(false);
00734 }
00735 #endif
00736 }
00737 }
00738 }
00739 }
00740
// release the slots of all sequences that became empty during the merge
00741 for (unsigned_type i = 0; i < seqs.size(); ++i)
00742 {
00743 unsigned_type seg = orig_seq_index[i];
00744 if (is_segment_empty(seg))
00745 {
00746 STXXL_VERBOSE1("deallocated " << seg);
00747 deallocate_segment(seg);
00748 }
00749 }
00750
00751 #else // STXXL_PARALLEL && STXXL_PARALLEL_PQ_MULTIWAY_MERGE_EXTERNAL
00752
00753
// hint the next block of every sequence that still has blocks to read
00754 for (unsigned_type i = 0; i < k; ++i)
00755 {
00756 if (states[i].bids != NULL && !states[i].bids->empty())
00757 pool->hint(states[i].bids->front());
00758 }
00759
// Dispatch on the tree height: dedicated code for k = 1, 2 and 4, the
// unrolled loser tree multi_merge_f for log_k = 3..10 and the generic
// loser tree multi_merge_k for anything larger.
00760 switch (log_k) {
00761 case 0:
00762 assert(k == 1);
00763 assert(entry[0].index == 0);
00764 assert(free_segments.empty());
00765
00766
// single sequence: plain copy, advancing the sequence state element by element
00767 for (size_type i = 0; i < length; ++i, ++(states[0]), ++begin)
00768 *begin = *(states[0]);
00769
// keep the winner key in entry[0] up to date (**states is *states[0])
00770 entry[0].key = **states;
00771 if (is_segment_empty(0))
00772 deallocate_segment(0);
00773
00774 break;
00775 case 1:
00776 assert(k == 2);
00777 merge_iterator(states[0], states[1], begin, length, cmp);
00778 rebuild_loser_tree();
00779 if (is_segment_empty(0) && is_segment_allocated(0))
00780 deallocate_segment(0);
00781
00782 if (is_segment_empty(1) && is_segment_allocated(1))
00783 deallocate_segment(1);
00784
00785 break;
00786 case 2:
00787 assert(k == 4);
// a 3-way merge is used when slot 3 holds no elements
00788 if (is_segment_empty(3))
00789 merge3_iterator(states[0], states[1], states[2], begin, length, cmp);
00790 else
00791 merge4_iterator(states[0], states[1], states[2], states[3], begin, length, cmp);
00792 rebuild_loser_tree();
00793 if (is_segment_empty(0) && is_segment_allocated(0))
00794 deallocate_segment(0);
00795
00796 if (is_segment_empty(1) && is_segment_allocated(1))
00797 deallocate_segment(1);
00798
00799 if (is_segment_empty(2) && is_segment_allocated(2))
00800 deallocate_segment(2);
00801
00802 if (is_segment_empty(3) && is_segment_allocated(3))
00803 deallocate_segment(3);
00804
00805 break;
00806 case 3: multi_merge_f<OutputIterator, 3>(begin, end);
00807 break;
00808 case 4: multi_merge_f<OutputIterator, 4>(begin, end);
00809 break;
00810 case 5: multi_merge_f<OutputIterator, 5>(begin, end);
00811 break;
00812 case 6: multi_merge_f<OutputIterator, 6>(begin, end);
00813 break;
00814 case 7: multi_merge_f<OutputIterator, 7>(begin, end);
00815 break;
00816 case 8: multi_merge_f<OutputIterator, 8>(begin, end);
00817 break;
00818 case 9: multi_merge_f<OutputIterator, 9>(begin, end);
00819 break;
00820 case 10: multi_merge_f<OutputIterator, 10>(begin, end);
00821 break;
00822 default: multi_merge_k(begin, end);
00823 break;
00824 }
00825
00826 size_ -= length;
00827 #endif
00828
00829
// Compaction check: compact when at most k - 3k/5 slots are in use, or when
// k == 4 and a slot is free although slot 3 still holds elements.
00830 {
00831 const unsigned_type num_segments_used = std::min<unsigned_type>(arity, k) - free_segments.size();
00832 const unsigned_type num_segments_trigger = k - (3 * k / 5);
00833
00834
00835
00836
00837
00838 STXXL_VERBOSE3("ext_merger compact? k=" << k << " #used=" << num_segments_used
00839 << " <= #trigger=" << num_segments_trigger << " ==> "
00840 << ((k > 1 && num_segments_used <= num_segments_trigger) ? "yes" : "no ")
00841 << " || "
00842 << ((k == 4 && !free_segments.empty() && !is_segment_empty(3)) ? "yes" : "no ")
00843 << " #free=" << free_segments.size());
00844 if (k > 1 && ((num_segments_used <= num_segments_trigger) ||
00845 (k == 4 && !free_segments.empty() && !is_segment_empty(3))))
00846 {
00847 compact_tree();
00848 }
00849 }
00850 }
00851
00852 private:
00853 #if STXXL_PQ_EXTERNAL_LOSER_TREE
00854
00855 template <class OutputIterator>
00856 void multi_merge_k(OutputIterator begin, OutputIterator end)
00857 {
00858 Entry * current_pos;
00859 value_type current_key;
00860 unsigned_type current_index;
00861 unsigned_type kReg = k;
00862 OutputIterator done = end;
00863 OutputIterator target = begin;
00864 unsigned_type winner_index = entry[0].index;
00865 value_type winner_key = entry[0].key;
00866
00867 while (target != done)
00868 {
00869
00870 *target = *(states[winner_index]);
00871
00872
00873 ++(states[winner_index]);
00874
00875 winner_key = *(states[winner_index]);
00876
00877
00878 if (is_sentinel(winner_key))
00879 deallocate_segment(winner_index);
00880
00881
00882
00883 for (unsigned_type i = (winner_index + kReg) >> 1; i > 0; i >>= 1)
00884 {
00885 current_pos = entry + i;
00886 current_key = current_pos->key;
00887 if (cmp(winner_key, current_key))
00888 {
00889 current_index = current_pos->index;
00890 current_pos->key = winner_key;
00891 current_pos->index = winner_index;
00892 winner_key = current_key;
00893 winner_index = current_index;
00894 }
00895 }
00896
00897 ++target;
00898 }
00899 entry[0].index = winner_index;
00900 entry[0].key = winner_key;
00901 }
00902
// Loser tree merge specialised at compile time for a tree with 2^LogK leaves:
// writes the next (end - begin) elements to [begin, end). The leaf-to-root
// replay is unrolled through the TreeStep macro instead of a loop; levels
// that do not exist for the given LogK are switched off by a condition on
// compile-time constants. The final winner is stored back in entry[0].
00903 template <class OutputIterator, unsigned LogK>
00904 void multi_merge_f(OutputIterator begin, OutputIterator end)
00905 {
00906 OutputIterator done = end;
00907 OutputIterator target = begin;
00908 unsigned_type winner_index = entry[0].index;
// local copies of the array addresses used by the unrolled code below
00909 Entry * reg_entry = entry;
00910 sequence_state * reg_states = states;
00911 value_type winner_key = entry[0].key;
00912
00913 assert(log_k >= LogK);
00914 while (target != done)
00915 {
00916
// write the current winner
00917 *target = *(reg_states[winner_index]);
00918
00919
// advance the winning sequence and fetch its next key
00920 ++(reg_states[winner_index]);
00921
00922 winner_key = *(reg_states[winner_index]);
00923
00924
00925
// free the slot if the sequence has run empty
00926 if (is_sentinel(winner_key))
00927 deallocate_segment(winner_index);
00928
00929
00930 ++target;
00931
00932
// TreeStep(L): one replay step at the tree node reached by shifting
// (winner_index + 2^LogK) right by (LogK - L + 1); if the candidate loses
// against the entry stored there, the two are exchanged.
00933 #define TreeStep(L) \
00934 if (1 << LogK >= 1 << L) { \
00935 Entry * pos ## L = reg_entry + ((winner_index + (1 << LogK)) >> (((int(LogK - L) + 1) >= 0) ? ((LogK - L) + 1) : 0)); \
00936 value_type key ## L = pos ## L->key; \
00937 if (cmp(winner_key, key ## L)) { \
00938 unsigned_type index ## L = pos ## L->index; \
00939 pos ## L->key = winner_key; \
00940 pos ## L->index = winner_index; \
00941 winner_key = key ## L; \
00942 winner_index = index ## L; \
00943 } \
00944 }
// steps are emitted from the deepest level (just above the leaves) to the root
00945 TreeStep(10);
00946 TreeStep(9);
00947 TreeStep(8);
00948 TreeStep(7);
00949 TreeStep(6);
00950 TreeStep(5);
00951 TreeStep(4);
00952 TreeStep(3);
00953 TreeStep(2);
00954 TreeStep(1);
00955 #undef TreeStep
00956 }
00957 reg_entry[0].index = winner_index;
00958 reg_entry[0].key = winner_key;
00959 }
00960 #endif //STXXL_PQ_EXTERNAL_LOSER_TREE
00961
00962 public:
00963 bool is_space_available() const
00964 {
00965 return k < arity || !free_segments.empty();
00966 }
00967
00968
00969
00970
00971 template <class Merger>
00972 void insert_segment(Merger & another_merger, size_type segment_size)
00973 {
00974 STXXL_VERBOSE1("ext_merger::insert_segment(merger,...)" << this);
00975
00976 if (segment_size > 0)
00977 {
00978
00979 if (free_segments.empty())
00980 {
00981 double_k();
00982 }
00983 assert(!free_segments.empty());
00984 unsigned_type free_slot = free_segments.top();
00985 free_segments.pop();
00986
00987
00988
00989 assert(segment_size);
00990 unsigned_type nblocks = segment_size / block_type::size;
00991
00992 STXXL_VERBOSE1("ext_merger::insert_segment nblocks=" << nblocks);
00993 if (nblocks == 0)
00994 {
00995 STXXL_VERBOSE1("ext_merger::insert_segment(merger,...) WARNING: inserting a segment with " <<
00996 nblocks << " blocks");
00997 STXXL_VERBOSE1("THIS IS INEFFICIENT: TRY TO CHANGE PRIORITY QUEUE PARAMETERS");
00998 }
00999 unsigned_type first_size = segment_size % block_type::size;
01000 if (first_size == 0)
01001 {
01002 first_size = block_type::size;
01003 --nblocks;
01004 }
01005 block_manager * bm = block_manager::get_instance();
01006 std::list<bid_type> * bids = new std::list<bid_type>(nblocks);
01007 bm->new_blocks(alloc_strategy(), bids->begin(), bids->end());
01008 block_type * first_block = new block_type;
01009
01010 another_merger.multi_merge(
01011 first_block->begin() + (block_type::size - first_size),
01012 first_block->end());
01013
01014 STXXL_VERBOSE1("last element of first block " << *(first_block->end() - 1));
01015 assert(!cmp(*(first_block->begin() + (block_type::size - first_size)), *(first_block->end() - 1)));
01016
01017 assert(pool->size_write() > 0);
01018
01019 for (typename std::list<bid_type>::iterator curbid = bids->begin(); curbid != bids->end(); ++curbid)
01020 {
01021 block_type * b = pool->steal();
01022 another_merger.multi_merge(b->begin(), b->end());
01023 STXXL_VERBOSE1("first element of following block " << *curbid << " " << *(b->begin()));
01024 STXXL_VERBOSE1("last element of following block " << *curbid << " " << *(b->end() - 1));
01025 assert(!cmp(*(b->begin()), *(b->end() - 1)));
01026 pool->write(b, *curbid);
01027 STXXL_VERBOSE1("written to block " << *curbid << " cached in " << b);
01028 }
01029
01030 insert_segment(bids, first_block, first_size, free_slot);
01031
01032 size_ += segment_size;
01033
01034 #if STXXL_PQ_EXTERNAL_LOSER_TREE
01035
01036 value_type dummyKey;
01037 unsigned_type dummyIndex;
01038 unsigned_type dummyMask;
01039 update_on_insert((free_slot + k) >> 1, *(states[free_slot]), free_slot,
01040 &dummyKey, &dummyIndex, &dummyMask);
01041 #endif //STXXL_PQ_EXTERNAL_LOSER_TREE
01042 }
01043 else
01044 {
01045
01046 STXXL_VERBOSE1("Merged segment with zero size.");
01047 }
01048 }
01049
01050 size_type size() const { return size_; }
01051
01052 protected:
01058 void insert_segment(std::list<bid_type> * bidlist, block_type * first_block,
01059 unsigned_type first_size, unsigned_type slot)
01060 {
01061 STXXL_VERBOSE1("ext_merger::insert_segment(bidlist,...) " << this << " " << bidlist->size() << " " << slot);
01062 assert(!is_segment_allocated(slot));
01063 assert(first_size > 0);
01064
01065 sequence_state & new_sequence = states[slot];
01066 new_sequence.current = block_type::size - first_size;
01067 std::swap(new_sequence.block, first_block);
01068 delete first_block;
01069 std::swap(new_sequence.bids, bidlist);
01070 if (bidlist)
01071 {
01072 assert(bidlist->empty());
01073 delete bidlist;
01074 }
01075 new_sequence.allocated = true;
01076 assert(is_segment_allocated(slot));
01077 }
01078
01079
01080 void deallocate_segment(unsigned_type slot)
01081 {
01082 STXXL_VERBOSE1("ext_merger::deallocate_segment() deleting segment " << slot << " allocated=" << int(is_segment_allocated(slot)));
01083 assert(is_segment_allocated(slot));
01084 states[slot].allocated = false;
01085 states[slot].make_inf();
01086
01087
01088 free_segments.push(slot);
01089 }
01090
01091
01092 bool is_segment_empty(unsigned_type slot) const
01093 {
01094 return is_sentinel(*(states[slot]));
01095 }
01096
01097
01098
01099 bool is_segment_allocated(unsigned_type slot) const
01100 {
01101 return states[slot].allocated;
01102 }
01103 };
01104 }
01105
01107
01108 __STXXL_END_NAMESPACE
01109
01110 #endif // !STXXL_PQ_EXT_MERGER_HEADER
01111