00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014 #ifndef STXXL_SORT_STREAM_HEADER
00015 #define STXXL_SORT_STREAM_HEADER
00016
00017 #ifdef STXXL_BOOST_CONFIG
00018 #include <boost/config.hpp>
00019 #endif
00020
00021 #include <stxxl/bits/stream/stream.h>
00022 #include <stxxl/sort>
00023
00024
00025 __STXXL_BEGIN_NAMESPACE
00026
00027 namespace stream
00028 {
00031
00032 template <class ValueType, class TriggerEntryType>
00033 struct sorted_runs
00034 {
00035 typedef TriggerEntryType trigger_entry_type;
00036 typedef ValueType value_type;
00037 typedef typename trigger_entry_type::bid_type bid_type;
00038 typedef stxxl::int64 size_type;
00039 typedef std::vector<trigger_entry_type> run_type;
00040 typedef typed_block<bid_type::size, value_type> block_type;
00041 size_type elements;
00042 std::vector<run_type> runs;
00043 std::vector<unsigned_type> runs_sizes;
00044
00045
00046
00047
00048
00049
00050 std::vector<ValueType> small_;
00051
00052 sorted_runs() : elements(0) { }
00053
00058 void deallocate_blocks()
00059 {
00060 block_manager * bm = block_manager::get_instance();
00061 for (unsigned_type i = 0; i < runs.size(); ++i)
00062 bm->delete_blocks(
00063 trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(runs[i].begin()),
00064 trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(runs[i].end()));
00065
00066
00067 runs.clear();
00068 }
00069 };
00070
00078 template <
00079 class Input_,
00080 class Cmp_,
00081 unsigned BlockSize_ = STXXL_DEFAULT_BLOCK_SIZE(typename Input_::value_type),
00082 class AllocStr_ = STXXL_DEFAULT_ALLOC_STRATEGY>
00083 class runs_creator : private noncopyable
00084 {
00085 Input_ & input;
00086 Cmp_ cmp;
00087
00088 public:
00089 typedef Cmp_ cmp_type;
00090 typedef typename Input_::value_type value_type;
00091 typedef BID<BlockSize_> bid_type;
00092 typedef typed_block<BlockSize_, value_type> block_type;
00093 typedef sort_local::trigger_entry<bid_type, value_type> trigger_entry_type;
00094 typedef sorted_runs<value_type, trigger_entry_type> sorted_runs_type;
00095
00096 private:
00097 typedef typename sorted_runs_type::run_type run_type;
00098 sorted_runs_type result_;
00099 unsigned_type m_;
00100 bool result_computed;
00101
00102 void compute_result();
00103 void sort_run(block_type * run, unsigned_type elements)
00104 {
00105 if (block_type::has_filler)
00106 std::sort(
00107 #if 1
00108 ArrayOfSequencesIterator<
00109 block_type, typename block_type::value_type, block_type::size
00110 >(run, 0),
00111 ArrayOfSequencesIterator<
00112 block_type, typename block_type::value_type, block_type::size
00113 >(run, elements),
00114 #else
00115 TwoToOneDimArrayRowAdaptor<
00116 block_type, value_type, block_type::size
00117 >(run, 0),
00118 TwoToOneDimArrayRowAdaptor<
00119 block_type, value_type, block_type::size
00120 >(run, elements),
00121 #endif
00122 cmp);
00123
00124 else
00125 std::sort(run[0].elem, run[0].elem + elements, cmp);
00126 }
00127
00128 public:
00133 runs_creator(Input_ & i, Cmp_ c, unsigned_type memory_to_use) :
00134 input(i), cmp(c), m_(memory_to_use / BlockSize_ / sort_memory_usage_factor()), result_computed(false)
00135 {
00136 assert(2 * BlockSize_ * sort_memory_usage_factor() <= memory_to_use);
00137 }
00138
00142 const sorted_runs_type & result()
00143 {
00144 if (!result_computed)
00145 {
00146 compute_result();
00147 result_computed = true;
00148 #ifdef STXXL_PRINT_STAT_AFTER_RF
00149 STXXL_MSG(*stats::get_instance());
00150 #endif
00151 }
00152 return result_;
00153 }
00154 };
00155
00156
// Reads the whole input stream and turns it into sorted runs stored in result_.
//
// The buffer of 2 * m2 blocks is split into two halves, Blocks1 and
// Blocks2 = Blocks1 + m2; each half holds one run of up to el_in_run
// elements. While the write requests of one half are outstanding, the other
// half is filled from the input and sorted. write_reqs[i] holds the pending
// write of block slot i and is waited on before that slot is written again.
//
// Special cases:
//  * fewer than block_type::size elements: the sorted elements are kept in
//    result_.small_ and no external blocks are allocated;
//  * more than el_in_run but at most 2 * el_in_run elements: both halves are
//    sorted together and rewritten as one run that replaces result_.runs[0].
// The unused tail of every run buffer is padded with cmp.max_value() before
// writing, and each trigger entry stores the first value of its block.
00157 template <class Input_, class Cmp_, unsigned BlockSize_, class AllocStr_>
00158 void runs_creator<Input_, Cmp_, BlockSize_, AllocStr_>::compute_result()
00159 {
00160 unsigned_type i = 0;
// m2: blocks per half of the buffer; el_in_run: elements in one full half
00161 unsigned_type m2 = m_ / 2;
00162 const unsigned_type el_in_run = m2 * block_type::size;
00163 STXXL_VERBOSE1("runs_creator::compute_result m2=" << m2);
00164 unsigned_type pos = 0;
00165
// Default: allocate both halves of the buffer up front.
00166 #ifndef STXXL_SMALL_INPUT_PSORT_OPT
00167 block_type * Blocks1 = new block_type[m2 * 2];
00168 #else
// Disabled (#if 0) alternative kept from the original source.
00169 #if 0
00170 block_type * Blocks1 = new block_type[1];
00171
00172 while (!input.empty() && pos != block_type::size)
00173 {
00174 Blocks1[pos / block_type::size][pos % block_type::size] = *input;
00175 ++input;
00176 ++pos;
00177 }
00178 #endif
00179
// STXXL_SMALL_INPUT_PSORT_OPT: first collect at most one block of elements in
// small_; the block buffer is only allocated if a full block was read.
00180 while (!input.empty() && pos != block_type::size)
00181 {
00182 result_.small_.push_back(*input);
00183 ++input;
00184 ++pos;
00185 }
00186
00187 block_type * Blocks1;
00188
00189 if (pos == block_type::size)
00190 {
// A full block was read: move it into block 0 of a freshly allocated buffer.
00191 block_type * NewBlocks = new block_type[m2 * 2];
00192 std::copy(result_.small_.begin(), result_.small_.end(), NewBlocks[0].begin());
00193 result_.small_.clear();
00194
00195 Blocks1 = NewBlocks;
00196 }
00197 else
00198 {
// Input ended before a full block: keep it sorted in memory and stop.
00199 STXXL_VERBOSE1("runs_creator: Small input optimization, input length: " << pos);
00200 result_.elements = pos;
00201 std::sort(result_.small_.begin(), result_.small_.end(), cmp);
00202 return;
00203 }
00204 #endif
00205
// Fill the first half (Blocks1) with up to el_in_run elements.
00206 while (!input.empty() && pos != el_in_run)
00207 {
00208 Blocks1[pos / block_type::size][pos % block_type::size] = *input;
00209 ++input;
00210 ++pos;
00211 }
00212
00213
00214 sort_run(Blocks1, pos);
00215 result_.elements = pos;
// Small input (less than one block, input exhausted): copy the sorted
// elements to small_ and release the buffer; nothing is written out.
00216 if (pos < block_type::size && input.empty())
00217 {
00218 STXXL_VERBOSE1("runs_creator: Small input optimization, input length: " << pos);
00219 result_.small_.resize(pos);
00220 std::copy(Blocks1[0].begin(), Blocks1[0].begin() + pos, result_.small_.begin());
00221 delete[] Blocks1;
00222 return;
00223 }
00224
00225
00226 block_type * Blocks2 = Blocks1 + m2;
00227 block_manager * bm = block_manager::get_instance();
00228 request_ptr * write_reqs = new request_ptr[m2];
00229 run_type run;
00230
00231
// Allocate external blocks for the first run (only as many as it occupies).
00232 unsigned_type cur_run_size = div_and_round_up(pos, block_type::size);
00233 run.resize(cur_run_size);
00234 bm->new_blocks(AllocStr_(),
00235 trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(run.begin()),
00236 trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(run.end())
00237 );
00238
00239 disk_queues::get_instance()->set_priority_op(disk_queue::WRITE);
00240
00241 result_.runs_sizes.push_back(pos);
00242
00243
// Pad the rest of the first half with max_value() so the last written block
// is fully initialized.
00244 for ( ; pos != el_in_run; ++pos)
00245 Blocks1[pos / block_type::size][pos % block_type::size] = cmp.max_value();
00246
00247
// Record each block's first value as its trigger and start the writes.
00248 for (i = 0; i < cur_run_size; ++i)
00249 {
00250 run[i].value = Blocks1[i][0];
00251 write_reqs[i] = Blocks1[i].write(run[i].bid);
00252
00253 }
00254 result_.runs.push_back(run);
00255
// Input fitted into a single run: wait for its writes and finish.
00256 if (input.empty())
00257 {
00258
00259 wait_all(write_reqs, write_reqs + cur_run_size);
00260 delete[] write_reqs;
00261 delete[] Blocks1;
00262 return;
00263 }
00264
// Fill the second half while the first run is being written.
00265 STXXL_VERBOSE1("Filling the second part of the allocated blocks");
00266 pos = 0;
00267 while (!input.empty() && pos != el_in_run)
00268 {
00269 Blocks2[pos / block_type::size][pos % block_type::size] = *input;
00270 ++input;
00271 ++pos;
00272 }
00273 result_.elements += pos;
00274
// Input ended inside the second half: sort both (contiguous) halves as one
// run. The first run's writes are waited on and its blocks deleted, then new
// blocks are allocated and the combined run replaces result_.runs[0].
00275 if (input.empty())
00276 {
00277
00278 pos += el_in_run;
00279 sort_run(Blocks1, pos);
00280 wait_all(write_reqs, write_reqs + cur_run_size);
00281 bm->delete_blocks(trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(run.begin()),
00282 trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(run.end()));
00283
00284 cur_run_size = div_and_round_up(pos, block_type::size);
00285 run.resize(cur_run_size);
00286 bm->new_blocks(AllocStr_(),
00287 trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(run.begin()),
00288 trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(run.end())
00289 );
00290
00291 result_.runs_sizes[0] = pos;
00292
// Pad the tail of the combined buffer with max_value().
00293 for ( ; pos != 2 * el_in_run; ++pos)
00294 Blocks1[pos / block_type::size][pos % block_type::size] = cmp.max_value();
00295
00296
00297 assert(cur_run_size > m2);
00298
// write_reqs only has m2 slots: the first m2 blocks reuse them, and the
// remaining blocks get their own request array write_reqs1.
00299 for (i = 0; i < m2; ++i)
00300 {
00301 run[i].value = Blocks1[i][0];
00302 write_reqs[i]->wait();
00303 write_reqs[i] = Blocks1[i].write(run[i].bid);
00304 }
00305
00306 request_ptr * write_reqs1 = new request_ptr[cur_run_size - m2];
00307
00308 for ( ; i < cur_run_size; ++i)
00309 {
00310 run[i].value = Blocks1[i][0];
00311 write_reqs1[i - m2] = Blocks1[i].write(run[i].bid);
00312 }
00313
00314 result_.runs[0] = run;
00315
00316 wait_all(write_reqs, write_reqs + m2);
00317 delete[] write_reqs;
00318 wait_all(write_reqs1, write_reqs1 + cur_run_size - m2);
00319 delete[] write_reqs1;
00320
00321 delete[] Blocks1;
00322
00323 return;
00324 }
00325
// Second half is completely full: sort it and write it as the second run.
// Each slot's previous write (first run) is waited on before it is reused.
00326 sort_run(Blocks2, pos);
00327
00328 cur_run_size = div_and_round_up(pos, block_type::size);
00329 run.resize(cur_run_size);
00330 bm->new_blocks(AllocStr_(),
00331 trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(run.begin()),
00332 trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(run.end())
00333 );
00334
00335 for (i = 0; i < cur_run_size; ++i)
00336 {
00337 run[i].value = Blocks2[i][0];
00338 write_reqs[i]->wait();
00339 write_reqs[i] = Blocks2[i].write(run[i].bid);
00340 }
00341 assert((pos % el_in_run) == 0);
00342
00343 result_.runs.push_back(run);
00344 result_.runs_sizes.push_back(pos);
00345
// Steady state: fill Blocks1 (its writes were already waited on), sort it,
// wait for the other half's pending write in each slot, write, then swap the
// halves. Only the last run can be shorter than el_in_run.
00346 while (!input.empty())
00347 {
00348 pos = 0;
00349 while (!input.empty() && pos != el_in_run)
00350 {
00351 Blocks1[pos / block_type::size][pos % block_type::size] = *input;
00352 ++input;
00353 ++pos;
00354 }
00355 result_.elements += pos;
00356 sort_run(Blocks1, pos);
00357 cur_run_size = div_and_round_up(pos, block_type::size);
00358 run.resize(cur_run_size);
00359 bm->new_blocks(AllocStr_(),
00360 trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(run.begin()),
00361 trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(run.end())
00362 );
00363
00364 result_.runs_sizes.push_back(pos);
00365
00366
00367 for ( ; pos != el_in_run; ++pos)
00368 Blocks1[pos / block_type::size][pos % block_type::size] = cmp.max_value();
00369
00370
00371 for (i = 0; i < cur_run_size; ++i)
00372 {
00373 run[i].value = Blocks1[i][0];
00374 write_reqs[i]->wait();
00375 write_reqs[i] = Blocks1[i].write(run[i].bid);
00376 }
00377 result_.runs.push_back(run);
00378
00379 std::swap(Blocks1, Blocks2);
00380 }
00381
// Wait for every outstanding write. The halves may have been swapped, so the
// buffer is freed through whichever pointer marks its start.
00382 wait_all(write_reqs, write_reqs + m2);
00383 delete[] write_reqs;
00384 delete[] ((Blocks1 < Blocks2) ? Blocks1 : Blocks2);
00385 }
00386
00387
/**
 * Tag type: used as the input parameter of runs_creator to select the
 * specialization that receives its elements via push().
 *
 * \tparam ValueType_ type of the pushed elements
 */
template <typename ValueType_>
struct use_push
{
    typedef ValueType_ value_type;  //!< element type forwarded to runs_creator
};
00400
00412 template <
00413 class ValueType_,
00414 class Cmp_,
00415 unsigned BlockSize_,
00416 class AllocStr_>
00417 class runs_creator<
00418 use_push<ValueType_>,
00419 Cmp_,
00420 BlockSize_,
00421 AllocStr_>
00422 {
00423 Cmp_ cmp;
00424
00425 public:
00426 typedef Cmp_ cmp_type;
00427 typedef ValueType_ value_type;
00428 typedef BID<BlockSize_> bid_type;
00429 typedef typed_block<BlockSize_, value_type> block_type;
00430 typedef sort_local::trigger_entry<bid_type, value_type> trigger_entry_type;
00431 typedef sorted_runs<value_type, trigger_entry_type> sorted_runs_type;
00432
00433 private:
00434 typedef typename sorted_runs_type::run_type run_type;
00435 sorted_runs_type result_;
00436 unsigned_type m_;
00437
00438 bool output_requested;
00439
00440 const unsigned_type m2;
00441 const unsigned_type el_in_run;
00442 unsigned_type cur_el;
00443 block_type * Blocks1;
00444 block_type * Blocks2;
00445 request_ptr * write_reqs;
00446 run_type run;
00447
00448 void sort_run(block_type * run, unsigned_type elements)
00449 {
00450 if (block_type::has_filler)
00451 std::sort(
00452 #if 1
00453 ArrayOfSequencesIterator<
00454 block_type, typename block_type::value_type, block_type::size
00455 >(run, 0),
00456 ArrayOfSequencesIterator<
00457 block_type, typename block_type::value_type, block_type::size
00458 >(run, elements),
00459 #else
00460 TwoToOneDimArrayRowAdaptor<
00461 block_type, value_type, block_type::size
00462 >(run, 0),
00463 TwoToOneDimArrayRowAdaptor<
00464 block_type, value_type, block_type::size
00465 >(run, elements),
00466 #endif
00467 cmp);
00468
00469 else
00470 std::sort(run[0].elem, run[0].elem + elements, cmp);
00471 }
00472 void finish_result()
00473 {
00474 if (cur_el == 0)
00475 return;
00476
00477
00478 unsigned_type cur_el_reg = cur_el;
00479 sort_run(Blocks1, cur_el_reg);
00480 result_.elements += cur_el_reg;
00481 if (cur_el_reg < unsigned_type(block_type::size) &&
00482 unsigned_type(result_.elements) == cur_el_reg)
00483 {
00484 STXXL_VERBOSE1("runs_creator(use_push): Small input optimization, input length: " << cur_el_reg);
00485 result_.small_.resize(cur_el_reg);
00486 std::copy(Blocks1[0].begin(), Blocks1[0].begin() + cur_el_reg, result_.small_.begin());
00487 return;
00488 }
00489
00490 const unsigned_type cur_run_size = div_and_round_up(cur_el_reg, block_type::size);
00491 run.resize(cur_run_size);
00492 block_manager * bm = block_manager::get_instance();
00493 bm->new_blocks(AllocStr_(),
00494 trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(run.begin()),
00495 trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(run.end())
00496 );
00497
00498 disk_queues::get_instance()->set_priority_op(disk_queue::WRITE);
00499
00500 result_.runs_sizes.push_back(cur_el_reg);
00501
00502
00503 for ( ; cur_el_reg != el_in_run; ++cur_el_reg)
00504 Blocks1[cur_el_reg / block_type::size][cur_el_reg % block_type::size] = cmp.max_value();
00505
00506
00507 unsigned_type i = 0;
00508 for ( ; i < cur_run_size; ++i)
00509 {
00510 run[i].value = Blocks1[i][0];
00511 if (write_reqs[i].get())
00512 write_reqs[i]->wait();
00513
00514 write_reqs[i] = Blocks1[i].write(run[i].bid);
00515 }
00516 result_.runs.push_back(run);
00517
00518 for (i = 0; i < m2; ++i)
00519 if (write_reqs[i].get())
00520 write_reqs[i]->wait();
00521 }
00522 void cleanup()
00523 {
00524 delete[] write_reqs;
00525 delete[] ((Blocks1 < Blocks2) ? Blocks1 : Blocks2);
00526 write_reqs = NULL;
00527 Blocks1 = Blocks2 = NULL;
00528 }
00529
00530 public:
00534 runs_creator(Cmp_ c, unsigned_type memory_to_use) :
00535 cmp(c), m_(memory_to_use / BlockSize_ / sort_memory_usage_factor()), output_requested(false),
00536 m2(m_ / 2),
00537 el_in_run(m2 * block_type::size),
00538 cur_el(0),
00539 Blocks1(new block_type[m2 * 2]),
00540 Blocks2(Blocks1 + m2),
00541 write_reqs(new request_ptr[m2])
00542 {
00543 assert(2 * BlockSize_ * sort_memory_usage_factor() <= memory_to_use);
00544 }
00545
00546 ~runs_creator()
00547 {
00548 if (!output_requested)
00549 cleanup();
00550 }
00551
00554 void push(const value_type & val)
00555 {
00556 assert(output_requested == false);
00557 unsigned_type cur_el_reg = cur_el;
00558 if (cur_el_reg < el_in_run)
00559 {
00560 Blocks1[cur_el_reg / block_type::size][cur_el_reg % block_type::size] = val;
00561 ++cur_el;
00562 return;
00563 }
00564
00565 assert(el_in_run == cur_el);
00566 cur_el = 0;
00567
00568
00569 sort_run(Blocks1, el_in_run);
00570 result_.elements += el_in_run;
00571
00572 const unsigned_type cur_run_size = div_and_round_up(el_in_run, block_type::size);
00573 run.resize(cur_run_size);
00574 block_manager * bm = block_manager::get_instance();
00575 bm->new_blocks(AllocStr_(),
00576 trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(run.begin()),
00577 trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(run.end())
00578 );
00579
00580 disk_queues::get_instance()->set_priority_op(disk_queue::WRITE);
00581
00582 result_.runs_sizes.push_back(el_in_run);
00583
00584 for (unsigned_type i = 0; i < cur_run_size; ++i)
00585 {
00586 run[i].value = Blocks1[i][0];
00587 if (write_reqs[i].get())
00588 write_reqs[i]->wait();
00589
00590 write_reqs[i] = Blocks1[i].write(run[i].bid);
00591 }
00592
00593 result_.runs.push_back(run);
00594
00595 std::swap(Blocks1, Blocks2);
00596
00597 push(val);
00598 }
00599
00603 const sorted_runs_type & result()
00604 {
00605 if (!output_requested)
00606 {
00607 finish_result();
00608 output_requested = true;
00609 cleanup();
00610 #ifdef STXXL_PRINT_STAT_AFTER_RF
00611 STXXL_MSG(*stats::get_instance());
00612 #endif
00613 }
00614 return result_;
00615 }
00616 };
00617
00618
/**
 * Tag type: used as the input parameter of runs_creator to select the
 * specialization that is fed with already sorted sequences via push() and
 * finish().
 *
 * \tparam ValueType_ type of the pushed elements
 */
template <typename ValueType_>
struct from_sorted_sequences
{
    typedef ValueType_ value_type;  //!< element type forwarded to runs_creator
};
00630
00642 template <
00643 class ValueType_,
00644 class Cmp_,
00645 unsigned BlockSize_,
00646 class AllocStr_>
00647 class runs_creator<
00648 from_sorted_sequences<ValueType_>,
00649 Cmp_,
00650 BlockSize_,
00651 AllocStr_>
00652 {
00653 typedef ValueType_ value_type;
00654 typedef BID<BlockSize_> bid_type;
00655 typedef typed_block<BlockSize_, value_type> block_type;
00656 typedef sort_local::trigger_entry<bid_type, value_type> trigger_entry_type;
00657 typedef AllocStr_ alloc_strategy_type;
00658 Cmp_ cmp;
00659
00660 public:
00661 typedef Cmp_ cmp_type;
00662 typedef sorted_runs<value_type, trigger_entry_type> sorted_runs_type;
00663
00664 private:
00665 typedef typename sorted_runs_type::run_type run_type;
00666 sorted_runs_type result_;
00667 unsigned_type m_;
00668 buffered_writer<block_type> writer;
00669 block_type * cur_block;
00670 unsigned_type offset;
00671 unsigned_type iblock;
00672 unsigned_type irun;
00673 alloc_strategy_type alloc_strategy;
00674
00675 public:
00680 runs_creator(Cmp_ c, unsigned_type memory_to_use) :
00681 cmp(c),
00682 m_(memory_to_use / BlockSize_ / sort_memory_usage_factor()),
00683 writer(m_, m_ / 2),
00684 cur_block(writer.get_free_block()),
00685 offset(0),
00686 iblock(0),
00687 irun(0)
00688 {
00689 assert(2 * BlockSize_ * sort_memory_usage_factor() <= memory_to_use);
00690 }
00691
00694 void push(const value_type & val)
00695 {
00696 assert(offset < block_type::size);
00697
00698 (*cur_block)[offset] = val;
00699 ++offset;
00700
00701 if (offset == block_type::size)
00702 {
00703
00704
00705 block_manager * bm = block_manager::get_instance();
00706
00707 result_.runs.resize(irun + 1);
00708 result_.runs[irun].resize(iblock + 1);
00709 bm->new_blocks(
00710 offset_allocator<alloc_strategy_type>(iblock, alloc_strategy),
00711 trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(
00712 result_.runs[irun].begin() + iblock),
00713 trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(
00714 result_.runs[irun].end())
00715 );
00716
00717 result_.runs[irun][iblock].value = (*cur_block)[0];
00718 cur_block = writer.write(cur_block, result_.runs[irun][iblock].bid);
00719 ++iblock;
00720
00721 offset = 0;
00722 }
00723
00724 ++result_.elements;
00725 }
00726
00728 void finish()
00729 {
00730 if (offset == 0 && iblock == 0)
00731 return;
00732
00733
00734 result_.runs_sizes.resize(irun + 1);
00735 result_.runs_sizes.back() = iblock * block_type::size + offset;
00736
00737 if (offset)
00738 {
00739 while (offset != block_type::size)
00740 {
00741 (*cur_block)[offset] = cmp.max_value();
00742 ++offset;
00743 }
00744 offset = 0;
00745
00746 block_manager * bm = block_manager::get_instance();
00747
00748 result_.runs.resize(irun + 1);
00749 result_.runs[irun].resize(iblock + 1);
00750 bm->new_blocks(
00751 offset_allocator<alloc_strategy_type>(iblock, alloc_strategy),
00752 trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(
00753 result_.runs[irun].begin() + iblock),
00754 trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(
00755 result_.runs[irun].end())
00756 );
00757
00758 result_.runs[irun][iblock].value = (*cur_block)[0];
00759 cur_block = writer.write(cur_block, result_.runs[irun][iblock].bid);
00760 }
00761 else
00762 { }
00763
00764 iblock = 0;
00765 ++irun;
00766 }
00767
00771 const sorted_runs_type & result()
00772 {
00773 finish();
00774 writer.flush();
00775
00776 return result_;
00777 }
00778 };
00779
00780
00785 template <class RunsType_, class Cmp_>
00786 bool check_sorted_runs(RunsType_ & sruns, Cmp_ cmp)
00787 {
00788 typedef typename RunsType_::block_type block_type;
00789 typedef typename block_type::value_type value_type;
00790 STXXL_VERBOSE2("Elements: " << sruns.elements);
00791 unsigned_type nruns = sruns.runs.size();
00792 STXXL_VERBOSE2("Runs: " << nruns);
00793 unsigned_type irun = 0;
00794 for (irun = 0; irun < nruns; ++irun)
00795 {
00796 const unsigned_type nblocks = sruns.runs[irun].size();
00797 block_type * blocks = new block_type[nblocks];
00798 request_ptr * reqs = new request_ptr[nblocks];
00799 for (unsigned_type j = 0; j < nblocks; ++j)
00800 {
00801 reqs[j] = blocks[j].read(sruns.runs[irun][j].bid);
00802 }
00803 wait_all(reqs, reqs + nblocks);
00804 for (unsigned_type j = 0; j < nblocks; ++j)
00805 {
00806 if (cmp(blocks[j][0], sruns.runs[irun][j].value) ||
00807 cmp(sruns.runs[irun][j].value, blocks[j][0]))
00808 {
00809 STXXL_ERRMSG("check_sorted_runs wrong trigger in the run");
00810 return false;
00811 }
00812 }
00813 if (!stxxl::is_sorted(
00814 #if 1
00815 ArrayOfSequencesIterator<
00816 block_type, typename block_type::value_type, block_type::size
00817 >(blocks, 0),
00818 ArrayOfSequencesIterator<
00819 block_type, typename block_type::value_type, block_type::size
00820 >(blocks, sruns.runs_sizes[irun]),
00821 #else
00822 TwoToOneDimArrayRowAdaptor<
00823 block_type, value_type, block_type::size
00824 >(blocks, 0),
00825 TwoToOneDimArrayRowAdaptor<
00826 block_type, value_type, block_type::size
00827 >(blocks,
00828
00829
00830 sruns.runs_sizes[irun]
00831 ),
00832 #endif
00833 cmp))
00834 {
00835 STXXL_ERRMSG("check_sorted_runs wrong order in the run");
00836 return false;
00837 }
00838
00839 delete[] reqs;
00840 delete[] blocks;
00841 }
00842
00843 STXXL_MSG("Checking runs finished successfully");
00844
00845 return true;
00846 }
00847
00848
00856 template <class RunsType_,
00857 class Cmp_,
00858 class AllocStr_ = STXXL_DEFAULT_ALLOC_STRATEGY>
00859 class runs_merger : private noncopyable
00860 {
00861 typedef RunsType_ sorted_runs_type;
00862 typedef AllocStr_ alloc_strategy;
00863 typedef typename sorted_runs_type::size_type size_type;
00864 typedef Cmp_ value_cmp;
00865 typedef typename sorted_runs_type::run_type run_type;
00866 typedef typename sorted_runs_type::block_type block_type;
00867 typedef typename block_type::bid_type bid_type;
00868 typedef block_prefetcher<block_type, typename run_type::iterator> prefetcher_type;
00869 typedef run_cursor2<block_type, prefetcher_type> run_cursor_type;
00870 typedef sort_local::run_cursor2_cmp<block_type, prefetcher_type, value_cmp> run_cursor2_cmp_type;
00871 typedef loser_tree<run_cursor_type, run_cursor2_cmp_type, block_type::size> loser_tree_type;
00872
00873 typedef stxxl::int64 diff_type;
00874 typedef std::pair<typename block_type::iterator, typename block_type::iterator> sequence;
00875 typedef typename std::vector<sequence>::size_type seqs_size_type;
00876 std::vector<sequence> * seqs;
00877 std::vector<block_type *> * buffers;
00878
00879
00880 sorted_runs_type sruns;
00881 unsigned_type m_;
00882 value_cmp cmp;
00883 size_type elements_remaining;
00884 unsigned_type buffer_pos;
00885 block_type * current_block;
00886 run_type consume_seq;
00887 prefetcher_type * prefetcher;
00888 loser_tree_type * losers;
00889 int_type * prefetch_seq;
00890 unsigned_type nruns;
00891 #ifdef STXXL_CHECK_ORDER_IN_SORTS
00892 typename block_type::value_type last_element;
00893 #endif
00894
00895 void merge_recursively();
00896
00897 void deallocate_prefetcher()
00898 {
00899 if (prefetcher)
00900 {
00901 delete losers;
00902 delete seqs;
00903 delete buffers;
00904 delete prefetcher;
00905 delete[] prefetch_seq;
00906 prefetcher = NULL;
00907 }
00908
00909 sruns.deallocate_blocks();
00910 }
00911
00912 void initialize_current_block()
00913 {
00914 if (do_parallel_merge())
00915 {
00916 #if STXXL_PARALLEL_MULTIWAY_MERGE
00917
00918 seqs = new std::vector<sequence>(nruns);
00919 buffers = new std::vector<block_type *>(nruns);
00920
00921 for (unsigned_type i = 0; i < nruns; i++)
00922 {
00923 (*buffers)[i] = prefetcher->pull_block();
00924 (*seqs)[i] = std::make_pair((*buffers)[i]->begin(), (*buffers)[i]->end());
00925 }
00926
00927
00928 #else
00929 assert(false);
00930 #endif
00931 }
00932 else
00933 {
00934
00935
00936 losers = new loser_tree_type(prefetcher, nruns, run_cursor2_cmp_type(cmp));
00937
00938
00939 }
00940 }
00941
00942 void fill_current_block()
00943 {
00944 if (do_parallel_merge())
00945 {
00946 #if STXXL_PARALLEL_MULTIWAY_MERGE
00947
00948 diff_type rest = block_type::size;
00949
00950 do
00951 {
00952 value_type * min_last_element = NULL;
00953 diff_type total_size = 0;
00954
00955 for (seqs_size_type i = 0; i < (*seqs).size(); i++)
00956 {
00957 if ((*seqs)[i].first == (*seqs)[i].second)
00958 continue;
00959
00960 if (min_last_element == NULL)
00961 min_last_element = &(*((*seqs)[i].second - 1));
00962 else
00963 min_last_element = cmp(*min_last_element, *((*seqs)[i].second - 1)) ? min_last_element : &(*((*seqs)[i].second - 1));
00964
00965 total_size += (*seqs)[i].second - (*seqs)[i].first;
00966 STXXL_VERBOSE1("last " << *((*seqs)[i].second - 1) << " block size " << ((*seqs)[i].second - (*seqs)[i].first));
00967 }
00968
00969 assert(min_last_element != NULL);
00970
00971 STXXL_VERBOSE1("min_last_element " << min_last_element << " total size " << total_size + (block_type::size - rest));
00972
00973 diff_type less_equal_than_min_last = 0;
00974
00975 for (seqs_size_type i = 0; i < (*seqs).size(); i++)
00976 {
00977 if ((*seqs)[i].first == (*seqs)[i].second)
00978 continue;
00979
00980 typename block_type::iterator position = std::upper_bound((*seqs)[i].first, (*seqs)[i].second, *min_last_element, cmp);
00981 STXXL_VERBOSE1("greater equal than " << position - (*seqs)[i].first);
00982 less_equal_than_min_last += position - (*seqs)[i].first;
00983 }
00984
00985 STXXL_VERBOSE1("finished loop");
00986
00987 ptrdiff_t output_size = (std::min)(less_equal_than_min_last, rest);
00988
00989 STXXL_VERBOSE1("before merge" << output_size);
00990
00991 stxxl::parallel::multiway_merge((*seqs).begin(), (*seqs).end(), current_block->end() - rest, cmp, output_size);
00992
00993
00994 STXXL_VERBOSE1("after merge");
00995
00996 rest -= output_size;
00997
00998 STXXL_VERBOSE1("so long");
00999
01000 for (seqs_size_type i = 0; i < (*seqs).size(); i++)
01001 {
01002 if ((*seqs)[i].first == (*seqs)[i].second)
01003 {
01004 if (prefetcher->block_consumed((*buffers)[i]))
01005 {
01006 (*seqs)[i].first = (*buffers)[i]->begin();
01007 (*seqs)[i].second = (*buffers)[i]->end();
01008 STXXL_VERBOSE1("block ran empty " << i);
01009 }
01010 else
01011 {
01012 (*seqs).erase((*seqs).begin() + i);
01013 (*buffers).erase((*buffers).begin() + i);
01014 STXXL_VERBOSE1("seq removed " << i);
01015 }
01016 }
01017 }
01018 } while (rest > 0 && (*seqs).size() > 0);
01019
01020 #ifdef STXXL_CHECK_ORDER_IN_SORTS
01021 if (!stxxl::is_sorted(current_block->begin(), current_block->end(), cmp))
01022 {
01023 for (value_type * i = current_block->begin() + 1; i != current_block->end(); i++)
01024 if (cmp(*i, *(i - 1)))
01025 {
01026 STXXL_VERBOSE1("Error at position " << (i - current_block->begin()));
01027 }
01028 assert(false);
01029 }
01030 #endif
01031
01032
01033 #else
01034 assert(false);
01035 #endif
01036 }
01037 else
01038 {
01039
01040
01041 losers->multi_merge(current_block->elem);
01042
01043
01044 }
01045 }
01046
01047 public:
01049 typedef typename sorted_runs_type::value_type value_type;
01050
01055 runs_merger(const sorted_runs_type & r, value_cmp c, unsigned_type memory_to_use) :
01056 sruns(r), m_(memory_to_use / block_type::raw_size / sort_memory_usage_factor() ), cmp(c),
01057 elements_remaining(r.elements),
01058 current_block(NULL),
01059 prefetcher(NULL)
01060 #ifdef STXXL_CHECK_ORDER_IN_SORTS
01061 , last_element(cmp.min_value())
01062 #endif
01063 {
01064 if (empty())
01065 return;
01066
01067
01068 if (!sruns.small_.empty())
01069
01070 {
01071 STXXL_VERBOSE1("runs_merger: small input optimization, input length: " << elements_remaining);
01072 assert(elements_remaining == size_type(sruns.small_.size()));
01073 current_block = new block_type;
01074 std::copy(sruns.small_.begin(), sruns.small_.end(), current_block->begin());
01075 current_value = current_block->elem[0];
01076 buffer_pos = 1;
01077
01078 return;
01079 }
01080
01081 #ifdef STXXL_CHECK_ORDER_IN_SORTS
01082 assert(check_sorted_runs(r, cmp));
01083 #endif
01084
01085 current_block = new block_type;
01086
01087 disk_queues::get_instance()->set_priority_op(disk_queue::WRITE);
01088
01089 nruns = sruns.runs.size();
01090
01091 if (m_ < nruns)
01092 {
01093
01094
01095 STXXL_ERRMSG("The implementation of sort requires more than one merge pass, therefore for a better");
01096 STXXL_ERRMSG("efficiency decrease block size of run storage (a parameter of the run_creator)");
01097 STXXL_ERRMSG("or increase the amount memory dedicated to the merger.");
01098 STXXL_ERRMSG("m = " << m_ << " nruns=" << nruns);
01099
01100
01101 if (m_ < 2) {
01102 STXXL_ERRMSG("The merger requires memory to store at least two blocks internally. Aborting.");
01103 abort();
01104 }
01105
01106 merge_recursively();
01107
01108 nruns = sruns.runs.size();
01109 }
01110
01111 assert(nruns <= m_);
01112
01113 unsigned_type i;
01114
01115
01116
01117
01118 unsigned_type prefetch_seq_size = 0;
01119 for (i = 0; i < nruns; i++)
01120 {
01121 prefetch_seq_size += sruns.runs[i].size();
01122 }
01123
01124 consume_seq.resize(prefetch_seq_size);
01125
01126 prefetch_seq = new int_type[prefetch_seq_size];
01127
01128 typename run_type::iterator copy_start = consume_seq.begin();
01129 for (i = 0; i < nruns; i++)
01130 {
01131 copy_start = std::copy(
01132 sruns.runs[i].begin(),
01133 sruns.runs[i].end(),
01134 copy_start);
01135 }
01136
01137 std::stable_sort(consume_seq.begin(), consume_seq.end(),
01138 sort_local::trigger_entry_cmp<bid_type, value_type, value_cmp>(cmp));
01139
01140 int_type disks_number = config::get_instance()->disks_number();
01141
01142 const int_type n_prefetch_buffers = STXXL_MAX(2 * disks_number, (int_type(m_) - int_type(nruns)));
01143
01144
01145 #ifdef SORT_OPTIMAL_PREFETCHING
01146
01147 const int_type n_opt_prefetch_buffers = 2 * disks_number + (3 * (n_prefetch_buffers - 2 * disks_number)) / 10;
01148
01149 compute_prefetch_schedule(
01150 consume_seq,
01151 prefetch_seq,
01152 n_opt_prefetch_buffers,
01153 disks_number);
01154 #else
01155 for (i = 0; i < prefetch_seq_size; ++i)
01156 prefetch_seq[i] = i;
01157
01158 #endif
01159
01160
01161 prefetcher = new prefetcher_type(
01162 consume_seq.begin(),
01163 consume_seq.end(),
01164 prefetch_seq,
01165 nruns + n_prefetch_buffers);
01166
01167 losers = NULL;
01168 seqs = NULL;
01169 buffers = NULL;
01170
01171 initialize_current_block();
01172 fill_current_block();
01173
01174
01175 current_value = current_block->elem[0];
01176 buffer_pos = 1;
01177
01178 if (elements_remaining <= block_type::size)
01179 deallocate_prefetcher();
01180 }
01181
01183 bool empty() const
01184 {
01185 return elements_remaining == 0;
01186 }
01188 runs_merger & operator ++ ()
01189 {
01190 assert(!empty());
01191
01192 --elements_remaining;
01193
01194 if (buffer_pos != block_type::size)
01195 {
01196 current_value = current_block->elem[buffer_pos];
01197 ++buffer_pos;
01198 }
01199 else
01200 {
01201 if (!empty())
01202 {
01203 fill_current_block();
01204
01205 #ifdef STXXL_CHECK_ORDER_IN_SORTS
01206 assert(stxxl::is_sorted(current_block->elem, current_block->elem + current_block->size, cmp));
01207 assert(!cmp(current_block->elem[0], current_value));
01208 #endif
01209 current_value = current_block->elem[0];
01210 buffer_pos = 1;
01211 }
01212 if (elements_remaining <= block_type::size)
01213 deallocate_prefetcher();
01214 }
01215
01216
01217 #ifdef STXXL_CHECK_ORDER_IN_SORTS
01218 if (!empty())
01219 {
01220 assert(!cmp(current_value, last_element));
01221 last_element = current_value;
01222 }
01223 #endif
01224
01225 return *this;
01226 }
01228 const value_type & operator * () const
01229 {
01230 assert(!empty());
01231 return current_value;
01232 }
01233
01235 const value_type * operator -> () const
01236 {
01237 assert(!empty());
01238 return ¤t_value;
01239 }
01240
01241
01244 virtual ~runs_merger()
01245 {
01246 deallocate_prefetcher();
01247
01248 if (current_block)
01249 delete current_block;
01250
01251
01252 sruns.deallocate_blocks();
01253 }
01254
01255 private:
01256
01257 value_type current_value;
01258 };
01259
01260
01261 template <class RunsType_, class Cmp_, class AllocStr_>
01262 void runs_merger<RunsType_, Cmp_, AllocStr_>::merge_recursively()
01263 {
01264 block_manager * bm = block_manager::get_instance();
01265 unsigned_type ndisks = config::get_instance()->disks_number();
01266 unsigned_type nwrite_buffers = 2 * ndisks;
01267
01268 unsigned_type nruns = sruns.runs.size();
01269 const unsigned_type merge_factor =
01270 static_cast<unsigned_type>(ceil(pow(nruns, 1. / ceil(log(double(nruns)) / log(double(m_))))));
01271 assert(merge_factor <= m_);
01272 while (nruns > m_)
01273 {
01274 unsigned_type new_nruns = div_and_round_up(nruns, merge_factor);
01275 STXXL_VERBOSE("Starting new merge phase: nruns: " << nruns <<
01276 " opt_merge_factor: " << merge_factor << " m:" << m_ << " new_nruns: " << new_nruns);
01277
01278 sorted_runs_type new_runs;
01279 new_runs.runs.resize(new_nruns);
01280 new_runs.runs_sizes.resize(new_nruns);
01281 new_runs.elements = sruns.elements;
01282
01283 unsigned_type runs_left = nruns;
01284 unsigned_type cur_out_run = 0;
01285 unsigned_type elements_in_new_run = 0;
01286
01287
01288
01289 while (runs_left > 0)
01290 {
01291 int_type runs2merge = STXXL_MIN(runs_left, merge_factor);
01292
01293 elements_in_new_run = 0;
01294 for (unsigned_type i = nruns - runs_left; i < (nruns - runs_left + runs2merge); ++i)
01295 {
01296 elements_in_new_run += sruns.runs_sizes[i];
01297
01298 }
01299 const unsigned_type blocks_in_new_run1 = div_and_round_up(elements_in_new_run, block_type::size);
01300
01301
01302 new_runs.runs_sizes[cur_out_run] = elements_in_new_run;
01303
01304 new_runs.runs[cur_out_run++].resize(blocks_in_new_run1);
01305 runs_left -= runs2merge;
01306 }
01307
01308
01309 for (unsigned_type i = 0; i < new_runs.runs.size(); ++i)
01310 bm->new_blocks(alloc_strategy(),
01311 trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(new_runs.runs[i].begin()),
01312 trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(new_runs.runs[i].end()));
01313
01314
01315
01316 runs_left = nruns;
01317 cur_out_run = 0;
01318 size_type elements_left = sruns.elements;
01319
01320 while (runs_left > 0)
01321 {
01322 unsigned_type runs2merge = STXXL_MIN(runs_left, merge_factor);
01323 STXXL_VERBOSE("Merging " << runs2merge << " runs");
01324
01325 sorted_runs_type cur_runs;
01326 cur_runs.runs.resize(runs2merge);
01327 cur_runs.runs_sizes.resize(runs2merge);
01328
01329 std::copy(sruns.runs.begin() + nruns - runs_left,
01330 sruns.runs.begin() + nruns - runs_left + runs2merge,
01331 cur_runs.runs.begin());
01332 std::copy(sruns.runs_sizes.begin() + nruns - runs_left,
01333 sruns.runs_sizes.begin() + nruns - runs_left + runs2merge,
01334 cur_runs.runs_sizes.begin());
01335
01336 runs_left -= runs2merge;
01337
01338
01339
01340
01341
01342 cur_runs.elements = new_runs.runs_sizes[cur_out_run];
01343 elements_left -= cur_runs.elements;
01344
01345 if (runs2merge > 1)
01346 {
01347 runs_merger<RunsType_, Cmp_, AllocStr_> merger(cur_runs, cmp, m_ * block_type::raw_size * sort_memory_usage_factor());
01348
01349 {
01350 buf_ostream<block_type, typename run_type::iterator> out(
01351 new_runs.runs[cur_out_run].begin(),
01352 nwrite_buffers);
01353
01354 size_type cnt = 0;
01355 const size_type cnt_max = cur_runs.elements;
01356
01357 while (cnt != cnt_max)
01358 {
01359 *out = *merger;
01360 if ((cnt % block_type::size) == 0)
01361 new_runs.runs[cur_out_run][cnt / size_type(block_type::size)].value = *merger;
01362
01363
01364 ++cnt;
01365 ++out;
01366 ++merger;
01367 }
01368 assert(merger.empty());
01369
01370 while (cnt % block_type::size)
01371 {
01372 *out = cmp.max_value();
01373 ++out;
01374 ++cnt;
01375 }
01376 }
01377 }
01378 else
01379 {
01380 bm->delete_blocks(
01381 trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(
01382 new_runs.runs.back().begin()),
01383 trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(
01384 new_runs.runs.back().end()));
01385
01386 assert(cur_runs.runs.size() == 1);
01387
01388 std::copy(cur_runs.runs.front().begin(),
01389 cur_runs.runs.front().end(),
01390 new_runs.runs.back().begin());
01391 new_runs.runs_sizes.back() = cur_runs.runs_sizes.back();
01392 }
01393
01394 ++cur_out_run;
01395 }
01396 assert(elements_left == 0);
01397
01398 nruns = new_nruns;
01399 sruns = new_runs;
01400 }
01401 }
01402
01403
01412 template <class Input_,
01413 class Cmp_,
01414 unsigned BlockSize_ = STXXL_DEFAULT_BLOCK_SIZE(typename Input_::value_type),
01415 class AllocStr_ = STXXL_DEFAULT_ALLOC_STRATEGY>
01416 class sort : public noncopyable
01417 {
01418 typedef runs_creator<Input_, Cmp_, BlockSize_, AllocStr_> runs_creator_type;
01419 typedef typename runs_creator_type::sorted_runs_type sorted_runs_type;
01420 typedef runs_merger<sorted_runs_type, Cmp_, AllocStr_> runs_merger_type;
01421
01422 runs_creator_type creator;
01423 runs_merger_type merger;
01424
01425 public:
01427 typedef typename Input_::value_type value_type;
01428
01433 sort(Input_ & in, Cmp_ c, unsigned_type memory_to_use) :
01434 creator(in, c, memory_to_use),
01435 merger(creator.result(), c, memory_to_use)
01436 { }
01437
01443 sort(Input_ & in, Cmp_ c, unsigned_type memory_to_use_rc, unsigned_type memory_to_use_m) :
01444 creator(in, c, memory_to_use_rc),
01445 merger(creator.result(), c, memory_to_use_m)
01446 { }
01447
01448
01450 const value_type & operator * () const
01451 {
01452 assert(!empty());
01453 return *merger;
01454 }
01455
01456 const value_type * operator -> () const
01457 {
01458 assert(!empty());
01459 return merger.operator -> ();
01460 }
01461
01463 sort & operator ++ ()
01464 {
01465 ++merger;
01466 return *this;
01467 }
01468
01470 bool empty() const
01471 {
01472 return merger.empty();
01473 }
01474 };
01475
01481 template <
01482 class ValueType_,
01483 unsigned BlockSize_>
01484 class compute_sorted_runs_type
01485 {
01486 typedef ValueType_ value_type;
01487 typedef BID<BlockSize_> bid_type;
01488 typedef sort_local::trigger_entry<bid_type, value_type> trigger_entry_type;
01489
01490 public:
01491 typedef sorted_runs<value_type, trigger_entry_type> result;
01492 };
01493
01495 }
01496
01499
01501
01510 template <unsigned BlockSize,
01511 class RandomAccessIterator,
01512 class CmpType,
01513 class AllocStr>
01514 void sort(RandomAccessIterator begin,
01515 RandomAccessIterator end,
01516 CmpType cmp,
01517 unsigned_type MemSize,
01518 AllocStr AS)
01519 {
01520 UNUSED(AS);
01521 #ifdef BOOST_MSVC
01522 typedef typename streamify_traits<RandomAccessIterator>::stream_type InputType;
01523 #else
01524 typedef __typeof__(stream::streamify(begin, end)) InputType;
01525 #endif
01526 InputType Input(begin, end);
01527 typedef stream::sort<InputType, CmpType, BlockSize, AllocStr> sorter_type;
01528 sorter_type Sort(Input, cmp, MemSize);
01529 stream::materialize(Sort, begin);
01530 }
01531
01533
01534 __STXXL_END_NAMESPACE
01535
01536 #endif // !STXXL_SORT_STREAM_HEADER
01537