00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015 #ifndef STXXL_SORT_STREAM_HEADER
00016 #define STXXL_SORT_STREAM_HEADER
00017
00018 #ifdef STXXL_BOOST_CONFIG
00019 #include <boost/config.hpp>
00020 #endif
00021
00022 #include <stxxl/bits/stream/stream.h>
00023 #include <stxxl/bits/mng/mng.h>
00024 #include <stxxl/bits/algo/sort_base.h>
00025 #include <stxxl/bits/algo/sort_helper.h>
00026 #include <stxxl/bits/algo/adaptor.h>
00027 #include <stxxl/bits/algo/run_cursor.h>
00028 #include <stxxl/bits/algo/losertree.h>
00029 #include <stxxl/bits/stream/sorted_runs.h>
00030
00031
00032 __STXXL_BEGIN_NAMESPACE
00033
00034 namespace stream
00035 {
00038
00039
00041
00043
00050 template <
00051 class Input_,
00052 class Cmp_,
00053 unsigned BlockSize_ = STXXL_DEFAULT_BLOCK_SIZE(typename Input_::value_type),
00054 class AllocStr_ = STXXL_DEFAULT_ALLOC_STRATEGY>
00055 class basic_runs_creator : private noncopyable
00056 {
00057 protected:
00058 Input_ & input;
00059 Cmp_ cmp;
00060
00061 public:
00062 typedef Cmp_ cmp_type;
00063 typedef typename Input_::value_type value_type;
00064 typedef typed_block<BlockSize_, value_type> block_type;
00065 typedef sort_helper::trigger_entry<block_type> trigger_entry_type;
00066 typedef sorted_runs<trigger_entry_type> sorted_runs_type;
00067
00068 private:
00069 typedef typename sorted_runs_type::run_type run_type;
00070 sorted_runs_type result_;
00071 unsigned_type m_;
00072 bool result_computed;
00073
00075 unsigned_type fetch(block_type * blocks, unsigned_type first_idx, unsigned_type last_idx)
00076 {
00077 typename element_iterator_traits<block_type>::element_iterator output =
00078 make_element_iterator(blocks, first_idx);
00079 unsigned_type curr_idx = first_idx;
00080 while (!input.empty() && curr_idx != last_idx) {
00081 *output = *input;
00082 ++input;
00083 ++output;
00084 ++curr_idx;
00085 }
00086 return curr_idx;
00087 }
00088
00089 void fill_with_max_value(block_type * blocks, unsigned_type num_blocks, unsigned_type first_idx)
00090 {
00091 unsigned_type last_idx = num_blocks * block_type::size;
00092 if (first_idx < last_idx) {
00093 typename element_iterator_traits<block_type>::element_iterator curr =
00094 make_element_iterator(blocks, first_idx);
00095 while (first_idx != last_idx) {
00096 *curr = cmp.max_value();
00097 ++curr;
00098 ++first_idx;
00099 }
00100 }
00101 }
00102
00104 void sort_run(block_type * run, unsigned_type elements)
00105 {
00106 potentially_parallel::
00107 sort(make_element_iterator(run, 0),
00108 make_element_iterator(run, elements),
00109 cmp);
00110 }
00111
00112 void compute_result();
00113
00114 public:
00119 basic_runs_creator(Input_ & i, Cmp_ c, unsigned_type memory_to_use) :
00120 input(i), cmp(c), m_(memory_to_use / BlockSize_ / sort_memory_usage_factor()), result_computed(false)
00121 {
00122 sort_helper::verify_sentinel_strict_weak_ordering(cmp);
00123 if (!(2 * BlockSize_ * sort_memory_usage_factor() <= memory_to_use)) {
00124 throw bad_parameter("stxxl::runs_creator<>:runs_creator(): INSUFFICIENT MEMORY provided, please increase parameter 'memory_to_use'");
00125 }
00126 assert(m_ > 0);
00127 }
00128
00132 const sorted_runs_type & result()
00133 {
00134 if (!result_computed)
00135 {
00136 compute_result();
00137 result_computed = true;
00138 #ifdef STXXL_PRINT_STAT_AFTER_RF
00139 STXXL_MSG(*stats::get_instance());
00140 #endif //STXXL_PRINT_STAT_AFTER_RF
00141 }
00142 return result_;
00143 }
00144 };
00145
00149 template <class Input_, class Cmp_, unsigned BlockSize_, class AllocStr_>
00150 void basic_runs_creator<Input_, Cmp_, BlockSize_, AllocStr_>::compute_result()
00151 {
00152 unsigned_type i = 0;
00153 unsigned_type m2 = m_ / 2;
00154 const unsigned_type el_in_run = m2 * block_type::size;
00155 STXXL_VERBOSE1("basic_runs_creator::compute_result m2=" << m2);
00156 unsigned_type blocks1_length = 0, blocks2_length = 0;
00157 block_type * Blocks1 = NULL;
00158
00159 #ifndef STXXL_SMALL_INPUT_PSORT_OPT
00160 Blocks1 = new block_type[m2 * 2];
00161 #else
00162 while (!input.empty() && blocks1_length != block_type::size)
00163 {
00164 result_.small_.push_back(*input);
00165 ++input;
00166 ++blocks1_length;
00167 }
00168
00169 if (blocks1_length == block_type::size && !input.empty())
00170 {
00171 Blocks1 = new block_type[m2 * 2];
00172 std::copy(result_.small_.begin(), result_.small_.end(), Blocks1[0].begin());
00173 result_.small_.clear();
00174 }
00175 else
00176 {
00177 STXXL_VERBOSE1("basic_runs_creator: Small input optimization, input length: " << blocks1_length);
00178 result_.elements = blocks1_length;
00179 potentially_parallel::
00180 sort(result_.small_.begin(), result_.small_.end(), cmp);
00181 return;
00182 }
00183 #endif //STXXL_SMALL_INPUT_PSORT_OPT
00184
00185
00186 blocks1_length = fetch(Blocks1, blocks1_length, el_in_run);
00187
00188
00189 sort_run(Blocks1, blocks1_length);
00190 if (blocks1_length <= block_type::size && input.empty())
00191 {
00192
00193 STXXL_VERBOSE1("basic_runs_creator: Small input optimization, input length: " << blocks1_length);
00194 assert(result_.small_.empty());
00195 result_.small_.insert(result_.small_.end(), Blocks1[0].begin(), Blocks1[0].begin() + blocks1_length);
00196 result_.elements = blocks1_length;
00197 delete[] Blocks1;
00198 return;
00199 }
00200
00201 block_type * Blocks2 = Blocks1 + m2;
00202 block_manager * bm = block_manager::get_instance();
00203 request_ptr * write_reqs = new request_ptr[m2];
00204 run_type run;
00205
00206 unsigned_type cur_run_size = div_ceil(blocks1_length, block_type::size);
00207 run.resize(cur_run_size);
00208 bm->new_blocks(AllocStr_(), make_bid_iterator(run.begin()), make_bid_iterator(run.end()));
00209
00210 disk_queues::get_instance()->set_priority_op(request_queue::WRITE);
00211
00212
00213 fill_with_max_value(Blocks1, cur_run_size, blocks1_length);
00214
00215 for (i = 0; i < cur_run_size; ++i)
00216 {
00217 run[i].value = Blocks1[i][0];
00218 write_reqs[i] = Blocks1[i].write(run[i].bid);
00219 }
00220 result_.runs.push_back(run);
00221 result_.runs_sizes.push_back(blocks1_length);
00222 result_.elements += blocks1_length;
00223
00224 if (input.empty())
00225 {
00226
00227 wait_all(write_reqs, write_reqs + cur_run_size);
00228 delete[] write_reqs;
00229 delete[] Blocks1;
00230 return;
00231 }
00232
00233 STXXL_VERBOSE1("Filling the second part of the allocated blocks");
00234 blocks2_length = fetch(Blocks2, 0, el_in_run);
00235
00236 if (input.empty())
00237 {
00238
00239
00240 blocks2_length += el_in_run;
00241 sort_run(Blocks1, blocks2_length);
00242 wait_all(write_reqs, write_reqs + cur_run_size);
00243 bm->delete_blocks(make_bid_iterator(run.begin()), make_bid_iterator(run.end()));
00244
00245 cur_run_size = div_ceil(blocks2_length, block_type::size);
00246 run.resize(cur_run_size);
00247 bm->new_blocks(AllocStr_(), make_bid_iterator(run.begin()), make_bid_iterator(run.end()));
00248
00249
00250 fill_with_max_value(Blocks1, cur_run_size, blocks2_length);
00251
00252 assert(cur_run_size > m2);
00253
00254 for (i = 0; i < m2; ++i)
00255 {
00256 run[i].value = Blocks1[i][0];
00257 write_reqs[i]->wait();
00258 write_reqs[i] = Blocks1[i].write(run[i].bid);
00259 }
00260
00261 request_ptr * write_reqs1 = new request_ptr[cur_run_size - m2];
00262
00263 for ( ; i < cur_run_size; ++i)
00264 {
00265 run[i].value = Blocks1[i][0];
00266 write_reqs1[i - m2] = Blocks1[i].write(run[i].bid);
00267 }
00268
00269 result_.runs[0] = run;
00270 result_.runs_sizes[0] = blocks2_length;
00271 result_.elements = blocks2_length;
00272
00273 wait_all(write_reqs, write_reqs + m2);
00274 delete[] write_reqs;
00275 wait_all(write_reqs1, write_reqs1 + cur_run_size - m2);
00276 delete[] write_reqs1;
00277
00278 delete[] Blocks1;
00279
00280 return;
00281 }
00282
00283
00284
00285 sort_run(Blocks2, blocks2_length);
00286
00287 cur_run_size = div_ceil(blocks2_length, block_type::size);
00288 run.resize(cur_run_size);
00289 bm->new_blocks(AllocStr_(), make_bid_iterator(run.begin()), make_bid_iterator(run.end()));
00290
00291 for (i = 0; i < cur_run_size; ++i)
00292 {
00293 run[i].value = Blocks2[i][0];
00294 write_reqs[i]->wait();
00295 write_reqs[i] = Blocks2[i].write(run[i].bid);
00296 }
00297 assert((blocks2_length % el_in_run) == 0);
00298
00299 result_.runs.push_back(run);
00300 result_.runs_sizes.push_back(blocks2_length);
00301 result_.elements += blocks2_length;
00302
00303 while (!input.empty())
00304 {
00305 blocks1_length = fetch(Blocks1, 0, el_in_run);
00306 sort_run(Blocks1, blocks1_length);
00307 cur_run_size = div_ceil(blocks1_length, block_type::size);
00308 run.resize(cur_run_size);
00309 bm->new_blocks(AllocStr_(), make_bid_iterator(run.begin()), make_bid_iterator(run.end()));
00310
00311
00312 fill_with_max_value(Blocks1, cur_run_size, blocks1_length);
00313
00314 for (i = 0; i < cur_run_size; ++i)
00315 {
00316 run[i].value = Blocks1[i][0];
00317 write_reqs[i]->wait();
00318 write_reqs[i] = Blocks1[i].write(run[i].bid);
00319 }
00320 result_.runs.push_back(run);
00321 result_.runs_sizes.push_back(blocks1_length);
00322 result_.elements += blocks1_length;
00323
00324 std::swap(Blocks1, Blocks2);
00325 std::swap(blocks1_length, blocks2_length);
00326 }
00327
00328 wait_all(write_reqs, write_reqs + m2);
00329 delete[] write_reqs;
00330 delete[] ((Blocks1 < Blocks2) ? Blocks1 : Blocks2);
00331 }
00332
00339 template <
00340 class Input_,
00341 class Cmp_,
00342 unsigned BlockSize_ = STXXL_DEFAULT_BLOCK_SIZE(typename Input_::value_type),
00343 class AllocStr_ = STXXL_DEFAULT_ALLOC_STRATEGY>
00344 class runs_creator : public basic_runs_creator<Input_, Cmp_, BlockSize_, AllocStr_>
00345 {
00346 private:
00347 typedef basic_runs_creator<Input_, Cmp_, BlockSize_, AllocStr_> base;
00348
00349 public:
00350 typedef typename base::block_type block_type;
00351
00352 private:
00353 using base::input;
00354
00355 public:
00360 runs_creator(Input_ & i, Cmp_ c, unsigned_type memory_to_use) : base(i, c, memory_to_use)
00361 { }
00362 };
00363
00364
//! Tag type that selects the runs_creator specialization to which elements
//! are handed one at a time through push(), instead of being pulled from an
//! input stream. Only carries the element type.
template <class ValueType_>
struct use_push { typedef ValueType_ value_type; };
00377
00388 template <
00389 class ValueType_,
00390 class Cmp_,
00391 unsigned BlockSize_,
00392 class AllocStr_>
00393 class runs_creator<
00394 use_push<ValueType_>,
00395 Cmp_,
00396 BlockSize_,
00397 AllocStr_>
00398 : private noncopyable
00399 {
00400 Cmp_ cmp;
00401
00402 public:
00403 typedef Cmp_ cmp_type;
00404 typedef ValueType_ value_type;
00405 typedef typed_block<BlockSize_, value_type> block_type;
00406 typedef sort_helper::trigger_entry<block_type> trigger_entry_type;
00407 typedef sorted_runs<trigger_entry_type> sorted_runs_type;
00408 typedef sorted_runs_type result_type;
00409
00410 private:
00411 typedef typename sorted_runs_type::run_type run_type;
00412 sorted_runs_type result_;
00413 unsigned_type m_;
00414
00415 bool result_computed;
00416
00417 const unsigned_type m2;
00418 const unsigned_type el_in_run;
00419 unsigned_type cur_el;
00420 block_type * Blocks1;
00421 block_type * Blocks2;
00422 request_ptr * write_reqs;
00423 run_type run;
00424
00425 void fill_with_max_value(block_type * blocks, unsigned_type num_blocks, unsigned_type first_idx)
00426 {
00427 unsigned_type last_idx = num_blocks * block_type::size;
00428 if (first_idx < last_idx) {
00429 typename element_iterator_traits<block_type>::element_iterator curr =
00430 make_element_iterator(blocks, first_idx);
00431 while (first_idx != last_idx) {
00432 *curr = cmp.max_value();
00433 ++curr;
00434 ++first_idx;
00435 }
00436 }
00437 }
00438
00439 void sort_run(block_type * run, unsigned_type elements)
00440 {
00441 potentially_parallel::
00442 sort(make_element_iterator(run, 0),
00443 make_element_iterator(run, elements),
00444 cmp);
00445 }
00446
00447 void compute_result()
00448 {
00449 if (cur_el == 0)
00450 return;
00451
00452 unsigned_type cur_el_reg = cur_el;
00453 sort_run(Blocks1, cur_el_reg);
00454 result_.elements += cur_el_reg;
00455 if (cur_el_reg <= block_type::size && result_.elements == cur_el_reg)
00456 {
00457
00458 STXXL_VERBOSE1("runs_creator(use_push): Small input optimization, input length: " << cur_el_reg);
00459 result_.small_.resize(cur_el_reg);
00460 std::copy(Blocks1[0].begin(), Blocks1[0].begin() + cur_el_reg, result_.small_.begin());
00461 return;
00462 }
00463
00464 const unsigned_type cur_run_size = div_ceil(cur_el_reg, block_type::size);
00465 run.resize(cur_run_size);
00466 block_manager * bm = block_manager::get_instance();
00467 bm->new_blocks(AllocStr_(), make_bid_iterator(run.begin()), make_bid_iterator(run.end()));
00468
00469 disk_queues::get_instance()->set_priority_op(request_queue::WRITE);
00470
00471 result_.runs_sizes.push_back(cur_el_reg);
00472
00473
00474 fill_with_max_value(Blocks1, cur_run_size, cur_el_reg);
00475
00476 unsigned_type i = 0;
00477 for ( ; i < cur_run_size; ++i)
00478 {
00479 run[i].value = Blocks1[i][0];
00480 if (write_reqs[i].get())
00481 write_reqs[i]->wait();
00482
00483 write_reqs[i] = Blocks1[i].write(run[i].bid);
00484 }
00485 result_.runs.push_back(run);
00486
00487 for (i = 0; i < m2; ++i)
00488 if (write_reqs[i].get())
00489 write_reqs[i]->wait();
00490 }
00491
00492 void cleanup()
00493 {
00494 delete[] write_reqs;
00495 delete[] ((Blocks1 < Blocks2) ? Blocks1 : Blocks2);
00496 write_reqs = NULL;
00497 Blocks1 = Blocks2 = NULL;
00498 }
00499
00500 public:
00504 runs_creator(Cmp_ c, unsigned_type memory_to_use) :
00505 cmp(c), m_(memory_to_use / BlockSize_ / sort_memory_usage_factor()), result_computed(false),
00506 m2(m_ / 2),
00507 el_in_run(m2 * block_type::size),
00508 cur_el(0),
00509 Blocks1(new block_type[m2 * 2]),
00510 Blocks2(Blocks1 + m2),
00511 write_reqs(new request_ptr[m2])
00512 {
00513 sort_helper::verify_sentinel_strict_weak_ordering(cmp);
00514 if (!(2 * BlockSize_ * sort_memory_usage_factor() <= memory_to_use)) {
00515 throw bad_parameter("stxxl::runs_creator<>:runs_creator(): INSUFFICIENT MEMORY provided, please increase parameter 'memory_to_use'");
00516 }
00517 assert(m2 > 0);
00518 }
00519
00520 ~runs_creator()
00521 {
00522 if (!result_computed)
00523 cleanup();
00524 }
00525
00528 void push(const value_type & val)
00529 {
00530 assert(result_computed == false);
00531 unsigned_type cur_el_reg = cur_el;
00532 if (cur_el_reg < el_in_run)
00533 {
00534 Blocks1[cur_el_reg / block_type::size][cur_el_reg % block_type::size] = val;
00535 ++cur_el;
00536 return;
00537 }
00538
00539 assert(el_in_run == cur_el);
00540 cur_el = 0;
00541
00542
00543 sort_run(Blocks1, el_in_run);
00544 result_.elements += el_in_run;
00545
00546 const unsigned_type cur_run_size = div_ceil(el_in_run, block_type::size);
00547 run.resize(cur_run_size);
00548 block_manager * bm = block_manager::get_instance();
00549 bm->new_blocks(AllocStr_(), make_bid_iterator(run.begin()), make_bid_iterator(run.end()));
00550
00551 disk_queues::get_instance()->set_priority_op(request_queue::WRITE);
00552
00553 result_.runs_sizes.push_back(el_in_run);
00554
00555 for (unsigned_type i = 0; i < cur_run_size; ++i)
00556 {
00557 run[i].value = Blocks1[i][0];
00558 if (write_reqs[i].get())
00559 write_reqs[i]->wait();
00560
00561 write_reqs[i] = Blocks1[i].write(run[i].bid);
00562 }
00563
00564 result_.runs.push_back(run);
00565
00566 std::swap(Blocks1, Blocks2);
00567
00568 push(val);
00569 }
00570
00574 const sorted_runs_type & result()
00575 {
00576 if (1)
00577 {
00578 if (!result_computed)
00579 {
00580 compute_result();
00581 result_computed = true;
00582 cleanup();
00583 #ifdef STXXL_PRINT_STAT_AFTER_RF
00584 STXXL_MSG(*stats::get_instance());
00585 #endif //STXXL_PRINT_STAT_AFTER_RF
00586 }
00587 }
00588 return result_;
00589 }
00590 };
00591
00592
//! Tag type that selects the runs_creator specialization to which already
//! sorted sequences are pushed element by element; each sequence is closed
//! with finish(). Only carries the element type.
template <class ValueType_>
struct from_sorted_sequences { typedef ValueType_ value_type; };
00604
00615 template <
00616 class ValueType_,
00617 class Cmp_,
00618 unsigned BlockSize_,
00619 class AllocStr_>
00620 class runs_creator<
00621 from_sorted_sequences<ValueType_>,
00622 Cmp_,
00623 BlockSize_,
00624 AllocStr_>
00625 : private noncopyable
00626 {
00627 typedef ValueType_ value_type;
00628 typedef typed_block<BlockSize_, value_type> block_type;
00629 typedef sort_helper::trigger_entry<block_type> trigger_entry_type;
00630 typedef AllocStr_ alloc_strategy_type;
00631 Cmp_ cmp;
00632
00633 public:
00634 typedef Cmp_ cmp_type;
00635 typedef sorted_runs<trigger_entry_type> sorted_runs_type;
00636 typedef sorted_runs_type result_type;
00637
00638 private:
00639 typedef typename sorted_runs_type::run_type run_type;
00640
00641 sorted_runs_type result_;
00642 unsigned_type m_;
00643 buffered_writer<block_type> writer;
00644 block_type * cur_block;
00645 unsigned_type offset;
00646 unsigned_type iblock;
00647 unsigned_type irun;
00648 alloc_strategy_type alloc_strategy;
00649
00650 public:
00655 runs_creator(Cmp_ c, unsigned_type memory_to_use) :
00656 cmp(c),
00657 m_(memory_to_use / BlockSize_ / sort_memory_usage_factor()),
00658 writer(m_, m_ / 2),
00659 cur_block(writer.get_free_block()),
00660 offset(0),
00661 iblock(0),
00662 irun(0)
00663 {
00664 sort_helper::verify_sentinel_strict_weak_ordering(cmp);
00665 assert(m_ > 0);
00666 if (!(2 * BlockSize_ * sort_memory_usage_factor() <= memory_to_use)) {
00667 throw bad_parameter("stxxl::runs_creator<>:runs_creator(): INSUFFICIENT MEMORY provided, please increase parameter 'memory_to_use'");
00668 }
00669 }
00670
00673 void push(const value_type & val)
00674 {
00675 assert(offset < block_type::size);
00676
00677 (*cur_block)[offset] = val;
00678 ++offset;
00679
00680 if (offset == block_type::size)
00681 {
00682
00683
00684 block_manager * bm = block_manager::get_instance();
00685
00686 result_.runs.resize(irun + 1);
00687 result_.runs[irun].resize(iblock + 1);
00688 bm->new_blocks(
00689 alloc_strategy,
00690 make_bid_iterator(result_.runs[irun].begin() + iblock),
00691 make_bid_iterator(result_.runs[irun].end()),
00692 iblock
00693 );
00694
00695 result_.runs[irun][iblock].value = (*cur_block)[0];
00696 cur_block = writer.write(cur_block, result_.runs[irun][iblock].bid);
00697 ++iblock;
00698
00699 offset = 0;
00700 }
00701
00702 ++result_.elements;
00703 }
00704
00706 void finish()
00707 {
00708 if (offset == 0 && iblock == 0)
00709 return;
00710
00711
00712 result_.runs_sizes.resize(irun + 1);
00713 result_.runs_sizes.back() = iblock * block_type::size + offset;
00714
00715 if (offset)
00716 {
00717 while (offset != block_type::size)
00718 {
00719 (*cur_block)[offset] = cmp.max_value();
00720 ++offset;
00721 }
00722 offset = 0;
00723
00724 block_manager * bm = block_manager::get_instance();
00725
00726 result_.runs.resize(irun + 1);
00727 result_.runs[irun].resize(iblock + 1);
00728 bm->new_blocks(
00729 alloc_strategy,
00730 make_bid_iterator(result_.runs[irun].begin() + iblock),
00731 make_bid_iterator(result_.runs[irun].end()),
00732 iblock
00733 );
00734
00735 result_.runs[irun][iblock].value = (*cur_block)[0];
00736 cur_block = writer.write(cur_block, result_.runs[irun][iblock].bid);
00737 }
00738 else
00739 { }
00740
00741 alloc_strategy = alloc_strategy_type();
00742 iblock = 0;
00743 ++irun;
00744 }
00745
00749 const sorted_runs_type & result()
00750 {
00751 finish();
00752 writer.flush();
00753
00754 return result_;
00755 }
00756 };
00757
00758
00763 template <class RunsType_, class Cmp_>
00764 bool check_sorted_runs(const RunsType_ & sruns, Cmp_ cmp)
00765 {
00766 sort_helper::verify_sentinel_strict_weak_ordering(cmp);
00767 typedef typename RunsType_::block_type block_type;
00768 typedef typename block_type::value_type value_type;
00769 STXXL_VERBOSE2("Elements: " << sruns.elements);
00770 unsigned_type nruns = sruns.runs.size();
00771 STXXL_VERBOSE2("Runs: " << nruns);
00772 unsigned_type irun = 0;
00773 for (irun = 0; irun < nruns; ++irun)
00774 {
00775 const unsigned_type nblocks = sruns.runs[irun].size();
00776 block_type * blocks = new block_type[nblocks];
00777 request_ptr * reqs = new request_ptr[nblocks];
00778 for (unsigned_type j = 0; j < nblocks; ++j)
00779 {
00780 reqs[j] = blocks[j].read(sruns.runs[irun][j].bid);
00781 }
00782 wait_all(reqs, reqs + nblocks);
00783 for (unsigned_type j = 0; j < nblocks; ++j)
00784 {
00785 if (cmp(blocks[j][0], sruns.runs[irun][j].value) ||
00786 cmp(sruns.runs[irun][j].value, blocks[j][0]))
00787 {
00788 STXXL_ERRMSG("check_sorted_runs wrong trigger in the run");
00789 return false;
00790 }
00791 }
00792 if (!stxxl::is_sorted(make_element_iterator(blocks, 0),
00793 make_element_iterator(blocks, sruns.runs_sizes[irun]),
00794 cmp))
00795 {
00796 STXXL_ERRMSG("check_sorted_runs wrong order in the run");
00797 return false;
00798 }
00799
00800 delete[] reqs;
00801 delete[] blocks;
00802 }
00803
00804 STXXL_MSG("Checking runs finished successfully");
00805
00806 return true;
00807 }
00808
00809
00811
00813
//! Merges the runs stored in a sorted_runs object into one sorted stream,
//! which is consumed through empty(), operator* and operator++.
//!
//! Output is produced one block at a time. fill_current_block() fills
//! current_block either with the loser tree (sequential path) or, when
//! do_parallel_merge() is true, with stxxl::parallel::multiway_merge. If the
//! memory does not suffice to merge all runs at once, initialize() first
//! calls merge_recursively() to reduce the number of runs.
//!
//! \tparam RunsType_ sorted runs type (provides runs, runs_sizes, elements,
//!         small_run() and deallocate_blocks())
//! \tparam Cmp_      element comparator
//! \tparam AllocStr_ allocation strategy; only exposed as the alloc_strategy
//!         typedef in this class (NOTE(review): presumably used by
//!         merge_recursively for the intermediate runs — confirm)
template <class RunsType_,
          class Cmp_,
          class AllocStr_ = STXXL_DEFAULT_ALLOC_STRATEGY>
class basic_runs_merger : private noncopyable
{
protected:
    typedef RunsType_ sorted_runs_type;
    typedef AllocStr_ alloc_strategy;
    typedef typename sorted_runs_type::size_type size_type;
    typedef Cmp_ value_cmp;
    typedef typename sorted_runs_type::run_type run_type;
    typedef typename sorted_runs_type::block_type block_type;
    typedef block_type out_block_type;
    typedef typename run_type::value_type trigger_entry_type;
    typedef block_prefetcher<block_type, typename run_type::iterator> prefetcher_type;
    typedef run_cursor2<block_type, prefetcher_type> run_cursor_type;
    typedef sort_helper::run_cursor2_cmp<block_type, prefetcher_type, value_cmp> run_cursor2_cmp_type;
    typedef loser_tree<run_cursor_type, run_cursor2_cmp_type> loser_tree_type;
    typedef stxxl::int64 diff_type;
    // an iterator range [first, second) inside one in-memory block
    typedef std::pair<typename block_type::iterator, typename block_type::iterator> sequence;
    typedef typename std::vector<sequence>::size_type seqs_size_type;

public:
    //! type of the elements delivered by the merger
    typedef typename sorted_runs_type::value_type value_type;

private:
    sorted_runs_type sruns;          // runs to be merged
    value_cmp cmp;
    size_type elements_remaining;    // elements not yet consumed via operator++

    out_block_type * current_block;  // buffer holding merged output elements
    unsigned_type buffer_pos;        // index in current_block of the next element to deliver
    value_type current_value;        // element returned by operator*

    run_type consume_seq;            // trigger entries of all runs, stably sorted by trigger value
    int_type * prefetch_seq;         // block order handed to the prefetcher
    prefetcher_type * prefetcher;    // NULL before initialization and after deallocate_prefetcher()
    loser_tree_type * losers;        // merge structure of the sequential path
#if STXXL_PARALLEL_MULTIWAY_MERGE
    std::vector<sequence> * seqs;    // per-run element ranges of the in-memory blocks being merged
    std::vector<block_type *> * buffers; // block backing each entry of seqs
    diff_type num_currently_mergeable; // elements that can be merged before the count must be recomputed
#endif

#if STXXL_CHECK_ORDER_IN_SORTS
    value_type last_element;         // last delivered element, used for order assertions
#endif //STXXL_CHECK_ORDER_IN_SORTS

    //! Merges groups of runs until the remaining runs fit into a single merge
    //! pass with the given memory (defined out of class).
    void merge_recursively(unsigned_type memory_to_use);

    //! Frees the merge/prefetch structures if they were allocated, then calls
    //! sruns.deallocate_blocks() unconditionally.
    void deallocate_prefetcher()
    {
        if (prefetcher)
        {
            delete losers;
#if STXXL_PARALLEL_MULTIWAY_MERGE
            delete seqs;
            delete buffers;
#endif
            delete prefetcher;
            delete[] prefetch_seq;
            prefetcher = NULL;
        }

        sruns.deallocate_blocks();
    }

    //! Fills current_block with the next merged elements (at most
    //! out_block_type::size). Once no more than one block's worth of elements
    //! remains, the prefetcher and the run blocks are released.
    void fill_current_block()
    {
        STXXL_VERBOSE1("fill_current_block");
        if (do_parallel_merge())
        {
#if STXXL_PARALLEL_MULTIWAY_MERGE
            // free slots left in current_block
            diff_type rest = out_block_type::size;

            do
            {
                if (num_currently_mergeable < rest)
                {
                    if (!prefetcher || prefetcher->empty())
                    {
                        // nothing left to prefetch: every remaining element is in memory
                        num_currently_mergeable = elements_remaining;
                    }
                    else
                    {
                        // count the in-memory elements not greater than the trigger
                        // of the next block in consume order
                        num_currently_mergeable = sort_helper::count_elements_less_equal(
                            *seqs, consume_seq[prefetcher->pos()].value, cmp);
                    }
                }

                diff_type output_size = STXXL_MIN(num_currently_mergeable, rest);

                STXXL_VERBOSE1("before merge " << output_size);

                stxxl::parallel::multiway_merge((*seqs).begin(), (*seqs).end(), current_block->end() - rest, cmp, output_size);

                rest -= output_size;
                num_currently_mergeable -= output_size;

                STXXL_VERBOSE1("after merge");

                sort_helper::refill_or_remove_empty_sequences(*seqs, *buffers, *prefetcher);
            } while (rest > 0 && (*seqs).size() > 0);

#if STXXL_CHECK_ORDER_IN_SORTS
            if (!stxxl::is_sorted(current_block->begin(), current_block->end(), cmp))
            {
                for (value_type * i = current_block->begin() + 1; i != current_block->end(); ++i)
                    if (cmp(*i, *(i - 1)))
                    {
                        STXXL_VERBOSE1("Error at position " << (i - current_block->begin()));
                    }
                assert(false);
            }
#endif //STXXL_CHECK_ORDER_IN_SORTS

#else
            STXXL_THROW_UNREACHABLE();
#endif //STXXL_PARALLEL_MULTIWAY_MERGE
        }
        else
        {
            // sequential path: let the loser tree produce up to one block of output
            losers->multi_merge(current_block->elem, current_block->elem + STXXL_MIN<size_type>(out_block_type::size, elements_remaining));
        }
        STXXL_VERBOSE1("current block filled");

        if (elements_remaining <= out_block_type::size)
            deallocate_prefetcher();
    }

public:
    //! \param r             sorted runs to merge
    //! \param c             comparator instance
    //! \param memory_to_use amount of internal memory the merger may use
    //! \throws bad_parameter (from initialize()) if even recursive merging
    //!         lacks memory
    basic_runs_merger(const sorted_runs_type & r, value_cmp c, unsigned_type memory_to_use) :
        sruns(r),
        cmp(c),
        elements_remaining(sruns.elements),
        current_block(NULL),
        buffer_pos(0),
        prefetch_seq(NULL),
        prefetcher(NULL),
        losers(NULL)
#if STXXL_PARALLEL_MULTIWAY_MERGE
        , seqs(NULL),
        buffers(NULL),
        num_currently_mergeable(0)
#endif
#if STXXL_CHECK_ORDER_IN_SORTS
        , last_element(cmp.min_value())
#endif
    {
        initialize(r, memory_to_use);
    }

protected:
    //! Sets up merging of \c r and loads the first output block.
    //! Inputs stored as a small run are copied directly into current_block.
    //! If there are too many runs for memory_to_use, merge_recursively() is
    //! called first.
    //! \throws bad_parameter if memory_to_use is too small even for recursive merging
    void initialize(const sorted_runs_type & r, unsigned_type memory_to_use)
    {
        sort_helper::verify_sentinel_strict_weak_ordering(cmp);

        sruns = r;
        elements_remaining = r.elements;

        if (empty())
            return;

        if (!sruns.small_run().empty())
        {
            // small input: everything fits into one block that is already in memory
            STXXL_VERBOSE1("basic_runs_merger: small input optimization, input length: " << elements_remaining);
            assert(elements_remaining == size_type(sruns.small_run().size()));
            assert(sruns.small_run().size() <= out_block_type::size);
            current_block = new out_block_type;
            std::copy(sruns.small_run().begin(), sruns.small_run().end(), current_block->begin());
            current_value = current_block->elem[0];
            buffer_pos = 1;

            return;
        }

#if STXXL_CHECK_ORDER_IN_SORTS
        assert(check_sorted_runs(r, cmp));
#endif //STXXL_CHECK_ORDER_IN_SORTS

        disk_queues::get_instance()->set_priority_op(request_queue::WRITE);

        int_type disks_number = config::get_instance()->disks_number();
        unsigned_type min_prefetch_buffers = 2 * disks_number;
        // blocks available for input after reserving the output block
        unsigned_type input_buffers = (memory_to_use > sizeof(out_block_type) ? memory_to_use - sizeof(out_block_type) : 0) / block_type::raw_size;
        unsigned_type nruns = sruns.runs.size();

        if (input_buffers < nruns + min_prefetch_buffers)
        {
            // Not enough memory for one block per run plus the minimum number
            // of prefetch buffers: reduce the number of runs first.
            STXXL_WARNMSG_RECURSIVE_SORT("The implementation of sort requires more than one merge pass, therefore for a better");
            STXXL_WARNMSG_RECURSIVE_SORT("efficiency decrease block size of run storage (a parameter of the run_creator)");
            STXXL_WARNMSG_RECURSIVE_SORT("or increase the amount memory dedicated to the merger.");
            STXXL_WARNMSG_RECURSIVE_SORT("m=" << input_buffers << " nruns=" << nruns << " prefetch_blocks=" << min_prefetch_buffers);

            // recursive merging needs read-ahead + write-back + output buffers
            // plus at least two input blocks
            unsigned_type recursive_merge_buffers = memory_to_use / block_type::raw_size;
            if (recursive_merge_buffers < 2 * min_prefetch_buffers + 1 + 2) {
                STXXL_ERRMSG("There are only m=" << recursive_merge_buffers << " blocks available for recursive merging, but "
                                                 << min_prefetch_buffers << "+" << min_prefetch_buffers << "+1 are needed read-ahead/write-back/output, and");
                STXXL_ERRMSG("the merger requires memory to store at least two input blocks internally. Aborting.");
                throw bad_parameter("basic_runs_merger::sort(): INSUFFICIENT MEMORY provided, please increase parameter 'memory_to_use'");
            }

            merge_recursively(memory_to_use);

            nruns = sruns.runs.size();
        }

        assert(nruns + min_prefetch_buffers <= input_buffers);

        // Concatenate the trigger entries of all runs ...
        unsigned_type i;
        unsigned_type prefetch_seq_size = 0;
        for (i = 0; i < nruns; ++i)
        {
            prefetch_seq_size += sruns.runs[i].size();
        }

        consume_seq.resize(prefetch_seq_size);
        prefetch_seq = new int_type[prefetch_seq_size];

        typename run_type::iterator copy_start = consume_seq.begin();
        for (i = 0; i < nruns; ++i)
        {
            copy_start = std::copy(
                sruns.runs[i].begin(),
                sruns.runs[i].end(),
                copy_start);
        }

        // ... and order them by trigger value.
        std::stable_sort(consume_seq.begin(), consume_seq.end(),
                         sort_helper::trigger_entry_cmp<trigger_entry_type, value_cmp>(cmp) _STXXL_SORT_TRIGGER_FORCE_SEQUENTIAL);

        const unsigned_type n_prefetch_buffers = STXXL_MAX(min_prefetch_buffers, input_buffers - nruns);

#if STXXL_SORT_OPTIMAL_PREFETCHING
        // use the minimum plus 30% of the additional prefetch buffers for the schedule
        const int_type n_opt_prefetch_buffers = min_prefetch_buffers + (3 * (n_prefetch_buffers - min_prefetch_buffers)) / 10;

        compute_prefetch_schedule(
            consume_seq,
            prefetch_seq,
            n_opt_prefetch_buffers,
            disks_number);
#else
        // identity schedule: blocks are handed over in consumption order
        for (i = 0; i < prefetch_seq_size; ++i)
            prefetch_seq[i] = i;
#endif //STXXL_SORT_OPTIMAL_PREFETCHING

        prefetcher = new prefetcher_type(
            consume_seq.begin(),
            consume_seq.end(),
            prefetch_seq,
            STXXL_MIN(nruns + n_prefetch_buffers, prefetch_seq_size));

        if (do_parallel_merge())
        {
#if STXXL_PARALLEL_MULTIWAY_MERGE
            // pull the first block of each run and expose it as a sequence
            seqs = new std::vector<sequence>(nruns);
            buffers = new std::vector<block_type *>(nruns);

            for (unsigned_type i = 0; i < nruns; ++i)
            {
                (*buffers)[i] = prefetcher->pull_block();
                (*seqs)[i] = std::make_pair((*buffers)[i]->begin(), (*buffers)[i]->end());
            }

#else
            STXXL_THROW_UNREACHABLE();
#endif //STXXL_PARALLEL_MULTIWAY_MERGE
        }
        else
        {
            losers = new loser_tree_type(prefetcher, nruns, run_cursor2_cmp_type(cmp));
        }

        current_block = new out_block_type;
        fill_current_block();

        current_value = current_block->elem[0];
        buffer_pos = 1;
    }

public:
    //! \return true if all elements have been consumed
    bool empty() const
    {
        return elements_remaining == 0;
    }

    //! \return the current element (the stream must not be empty)
    const value_type & operator * () const
    {
        assert(!empty());
        return current_value;
    }

    //! \return pointer to the current element
    const value_type * operator -> () const
    {
        return &(operator * ());
    }

    //! Advances to the next element, refilling current_block when all of its
    //! elements have been delivered.
    basic_runs_merger & operator ++ ()
    {
        assert(!empty());

        --elements_remaining;

        if (buffer_pos != out_block_type::size)
        {
            current_value = current_block->elem[buffer_pos];
            ++buffer_pos;
        }
        else
        {
            if (!empty())
            {
                fill_current_block();

#if STXXL_CHECK_ORDER_IN_SORTS
                assert(stxxl::is_sorted(current_block->elem, current_block->elem + STXXL_MIN<size_type>(elements_remaining, current_block->size), cmp));
                assert(!cmp(current_block->elem[0], current_value));
#endif //STXXL_CHECK_ORDER_IN_SORTS
                current_value = current_block->elem[0];
                buffer_pos = 1;
            }
        }

#if STXXL_CHECK_ORDER_IN_SORTS
        if (!empty())
        {
            assert(!cmp(current_value, last_element));
            last_element = current_value;
        }
#endif //STXXL_CHECK_ORDER_IN_SORTS

        return *this;
    }

    //! Releases the merge structures, the run blocks (via
    //! deallocate_prefetcher()) and the output block.
    virtual ~basic_runs_merger()
    {
        deallocate_prefetcher();
        delete current_block;
    }
};
01189
01190
01191 template <class RunsType_, class Cmp_, class AllocStr_>
01192 void basic_runs_merger<RunsType_, Cmp_, AllocStr_>::merge_recursively(unsigned_type memory_to_use)
01193 {
01194 block_manager * bm = block_manager::get_instance();
01195 unsigned_type ndisks = config::get_instance()->disks_number();
01196 unsigned_type nwrite_buffers = 2 * ndisks;
01197 unsigned_type memory_for_write_buffers = nwrite_buffers * sizeof(block_type);
01198
01199
01200 unsigned_type recursive_merger_memory_prefetch_buffers = 2 * ndisks * sizeof(block_type);
01201 unsigned_type recursive_merger_memory_out_block = sizeof(block_type);
01202 unsigned_type memory_for_buffers = memory_for_write_buffers
01203 + recursive_merger_memory_prefetch_buffers
01204 + recursive_merger_memory_out_block;
01205
01206 unsigned_type max_arity = (memory_to_use > memory_for_buffers ? memory_to_use - memory_for_buffers : 0) / block_type::raw_size;
01207
01208 unsigned_type nruns = sruns.runs.size();
01209 const unsigned_type merge_factor = optimal_merge_factor(nruns, max_arity);
01210 assert(merge_factor > 1);
01211 assert(merge_factor <= max_arity);
01212 while (nruns > max_arity)
01213 {
01214 unsigned_type new_nruns = div_ceil(nruns, merge_factor);
01215 STXXL_VERBOSE("Starting new merge phase: nruns: " << nruns <<
01216 " opt_merge_factor: " << merge_factor << " max_arity: " << max_arity << " new_nruns: " << new_nruns);
01217
01218 sorted_runs_type new_runs;
01219 new_runs.runs.resize(new_nruns);
01220 new_runs.runs_sizes.resize(new_nruns);
01221 new_runs.elements = sruns.elements;
01222
01223 unsigned_type runs_left = nruns;
01224 unsigned_type cur_out_run = 0;
01225 unsigned_type elements_in_new_run = 0;
01226
01227 while (runs_left > 0)
01228 {
01229 int_type runs2merge = STXXL_MIN(runs_left, merge_factor);
01230 elements_in_new_run = 0;
01231 for (unsigned_type i = nruns - runs_left; i < (nruns - runs_left + runs2merge); ++i)
01232 {
01233 elements_in_new_run += sruns.runs_sizes[i];
01234 }
01235 const unsigned_type blocks_in_new_run1 = div_ceil(elements_in_new_run, block_type::size);
01236
01237 new_runs.runs_sizes[cur_out_run] = elements_in_new_run;
01238
01239 new_runs.runs[cur_out_run++].resize(blocks_in_new_run1);
01240 runs_left -= runs2merge;
01241 }
01242
01243
01244 for (unsigned_type i = 0; i < new_runs.runs.size(); ++i)
01245 bm->new_blocks(alloc_strategy(), make_bid_iterator(new_runs.runs[i].begin()), make_bid_iterator(new_runs.runs[i].end()));
01246
01247
01248 runs_left = nruns;
01249 cur_out_run = 0;
01250 size_type elements_left = sruns.elements;
01251
01252 while (runs_left > 0)
01253 {
01254 unsigned_type runs2merge = STXXL_MIN(runs_left, merge_factor);
01255 STXXL_VERBOSE("Merging " << runs2merge << " runs");
01256
01257 sorted_runs_type cur_runs;
01258 cur_runs.runs.resize(runs2merge);
01259 cur_runs.runs_sizes.resize(runs2merge);
01260
01261 std::copy(sruns.runs.begin() + nruns - runs_left,
01262 sruns.runs.begin() + nruns - runs_left + runs2merge,
01263 cur_runs.runs.begin());
01264 std::copy(sruns.runs_sizes.begin() + nruns - runs_left,
01265 sruns.runs_sizes.begin() + nruns - runs_left + runs2merge,
01266 cur_runs.runs_sizes.begin());
01267
01268 runs_left -= runs2merge;
01269
01270
01271
01272
01273
01274 cur_runs.elements = new_runs.runs_sizes[cur_out_run];
01275 elements_left -= cur_runs.elements;
01276
01277 if (runs2merge > 1)
01278 {
01279 basic_runs_merger<RunsType_, Cmp_, AllocStr_> merger(cur_runs, cmp, memory_to_use - memory_for_write_buffers);
01280
01281 {
01282 buf_ostream<block_type, typename run_type::iterator> out(
01283 new_runs.runs[cur_out_run].begin(),
01284 nwrite_buffers);
01285
01286 size_type cnt = 0;
01287 const size_type cnt_max = cur_runs.elements;
01288
01289 while (cnt != cnt_max)
01290 {
01291 *out = *merger;
01292 if ((cnt % block_type::size) == 0)
01293 new_runs.runs[cur_out_run][cnt / size_type(block_type::size)].value = *merger;
01294
01295 ++cnt;
01296 ++out;
01297 ++merger;
01298 }
01299 assert(merger.empty());
01300
01301 while (cnt % block_type::size)
01302 {
01303 *out = cmp.max_value();
01304 ++out;
01305 ++cnt;
01306 }
01307 }
01308 }
01309 else
01310 {
01311 bm->delete_blocks(
01312 make_bid_iterator(new_runs.runs.back().begin()),
01313 make_bid_iterator(new_runs.runs.back().end()));
01314
01315 assert(cur_runs.runs.size() == 1);
01316
01317 std::copy(cur_runs.runs.front().begin(),
01318 cur_runs.runs.front().end(),
01319 new_runs.runs.back().begin());
01320 new_runs.runs_sizes.back() = cur_runs.runs_sizes.front();
01321 }
01322
01323 ++cur_out_run;
01324 }
01325 assert(elements_left == 0);
01326
01327 nruns = new_nruns;
01328 sruns = new_runs;
01329 }
01330 }
01331
01332
01339 template <class RunsType_,
01340 class Cmp_,
01341 class AllocStr_ = STXXL_DEFAULT_ALLOC_STRATEGY>
01342 class runs_merger : public basic_runs_merger<RunsType_, Cmp_, AllocStr_>
01343 {
01344 private:
01345 typedef RunsType_ sorted_runs_type;
01346 typedef basic_runs_merger<RunsType_, Cmp_, AllocStr_> base;
01347 typedef typename base::value_cmp value_cmp;
01348 typedef typename base::block_type block_type;
01349
01350 public:
01355 runs_merger(const sorted_runs_type & r, value_cmp c, unsigned_type memory_to_use) :
01356 base(r, c, memory_to_use)
01357 { }
01358 };
01359
01360
01362
01364
01372 template <class Input_,
01373 class Cmp_,
01374 unsigned BlockSize_ = STXXL_DEFAULT_BLOCK_SIZE(typename Input_::value_type),
01375 class AllocStr_ = STXXL_DEFAULT_ALLOC_STRATEGY,
01376 class runs_creator_type = runs_creator<Input_, Cmp_, BlockSize_, AllocStr_> >
01377 class sort : public noncopyable
01378 {
01379 typedef typename runs_creator_type::sorted_runs_type sorted_runs_type;
01380 typedef runs_merger<sorted_runs_type, Cmp_, AllocStr_> runs_merger_type;
01381
01382 runs_creator_type creator;
01383 runs_merger_type merger;
01384
01385 public:
01387 typedef typename Input_::value_type value_type;
01388
01393 sort(Input_ & in, Cmp_ c, unsigned_type memory_to_use) :
01394 creator(in, c, memory_to_use),
01395 merger(creator.result(), c, memory_to_use)
01396 {
01397 sort_helper::verify_sentinel_strict_weak_ordering(c);
01398 }
01399
01405 sort(Input_ & in, Cmp_ c, unsigned_type memory_to_use_rc, unsigned_type memory_to_use_m) :
01406 creator(in, c, memory_to_use_rc),
01407 merger(creator.result(), c, memory_to_use_m)
01408 {
01409 sort_helper::verify_sentinel_strict_weak_ordering(c);
01410 }
01411
01412
01414 bool empty() const
01415 {
01416 return merger.empty();
01417 }
01418
01420 const value_type & operator * () const
01421 {
01422 assert(!empty());
01423 return *merger;
01424 }
01425
01426 const value_type * operator -> () const
01427 {
01428 assert(!empty());
01429 return merger.operator -> ();
01430 }
01431
01433 sort & operator ++ ()
01434 {
01435 ++merger;
01436 return *this;
01437 }
01438 };
01439
01444 template <
01445 class ValueType_,
01446 unsigned BlockSize_>
01447 class compute_sorted_runs_type
01448 {
01449 typedef ValueType_ value_type;
01450 typedef BID<BlockSize_> bid_type;
01451 typedef sort_helper::trigger_entry<bid_type, value_type> trigger_entry_type;
01452
01453 public:
01454 typedef sorted_runs<trigger_entry_type> result;
01455 };
01456
01458 }
01459
01462
01464
01473 template <unsigned BlockSize,
01474 class RandomAccessIterator,
01475 class CmpType,
01476 class AllocStr>
01477 void sort(RandomAccessIterator begin,
01478 RandomAccessIterator end,
01479 CmpType cmp,
01480 unsigned_type MemSize,
01481 AllocStr AS)
01482 {
01483 STXXL_UNUSED(AS);
01484 #ifdef BOOST_MSVC
01485 typedef typename streamify_traits<RandomAccessIterator>::stream_type InputType;
01486 #else
01487 typedef __typeof__(stream::streamify(begin, end)) InputType;
01488 #endif //BOOST_MSVC
01489 InputType Input(begin, end);
01490 typedef stream::sort<InputType, CmpType, BlockSize, AllocStr> sorter_type;
01491 sorter_type Sort(Input, cmp, MemSize);
01492 stream::materialize(Sort, begin);
01493 }
01494
01496
01497 __STXXL_END_NAMESPACE
01498
01499 #endif // !STXXL_SORT_STREAM_HEADER
01500