15 #ifndef STXXL_SORT_STREAM_HEADER
16 #define STXXL_SORT_STREAM_HEADER
18 #ifdef STXXL_BOOST_CONFIG
19 #include <boost/config.hpp>
22 #include <stxxl/bits/stream/stream.h>
23 #include <stxxl/bits/mng/mng.h>
24 #include <stxxl/bits/algo/sort_base.h>
25 #include <stxxl/bits/algo/sort_helper.h>
26 #include <stxxl/bits/algo/adaptor.h>
27 #include <stxxl/bits/algo/run_cursor.h>
28 #include <stxxl/bits/algo/losertree.h>
29 #include <stxxl/bits/stream/sorted_runs.h>
32 __STXXL_BEGIN_NAMESPACE
// basic_runs_creator: forms sorted runs of data from an input stream using
// internal memory organized as blocks of BlockSize_ bytes, allocated via
// AllocStr_.  NOTE(review): this view is a line-sparse extraction (embedded
// original line numbers jump), so member bodies below are fragments; code is
// left byte-identical, comments only.
53 unsigned BlockSize_ = STXXL_DEFAULT_BLOCK_SIZE(
typename Input_::value_type),
54 class AllocStr_ = STXXL_DEFAULT_ALLOC_STRATEGY>
62 typedef Cmp_ cmp_type;
63 typedef typename Input_::value_type value_type;
65 typedef sort_helper::trigger_entry<block_type> trigger_entry_type;
69 typedef typename sorted_runs_type::run_type run_type;
// fetch: copies elements from 'input' into blocks[] starting at flat element
// index first_idx, stopping at last_idx or when the input runs dry.
// Presumably returns the index actually reached — body is truncated in this
// extract, so confirm against the full source.
75 unsigned_type fetch(
block_type * blocks, unsigned_type first_idx, unsigned_type last_idx)
77 typename element_iterator_traits<block_type>::element_iterator output =
78 make_element_iterator(blocks, first_idx);
79 unsigned_type curr_idx = first_idx;
80 while (!input.empty() && curr_idx != last_idx) {
// fill_with_max_value: pads the tail of a partially filled run with the
// comparator's sentinel cmp.max_value() so that only whole blocks need to be
// written.  NOTE(review): the computation of last_idx (likely from
// num_blocks) falls in lines missing from this extract.
89 void fill_with_max_value(
block_type * blocks, unsigned_type num_blocks, unsigned_type first_idx)
92 if (first_idx < last_idx) {
93 typename element_iterator_traits<block_type>::element_iterator curr =
94 make_element_iterator(blocks, first_idx);
95 while (first_idx != last_idx) {
96 *curr = cmp.max_value();
// sort_run: sorts 'elements' items spanning the consecutive blocks of one
// run in internal memory (the comparator argument line is not visible here).
104 void sort_run(
block_type * run, unsigned_type elements)
107 sort(make_element_iterator(run, 0),
108 make_element_iterator(run, elements),
112 void compute_result();
// Constructor: m_ is the number of blocks that fit into memory_to_use,
// scaled down by the sorter's memory usage factor.
120 input(i), cmp(c), m_(memory_to_use / BlockSize_ / sort_memory_usage_factor()), result_computed(false)
122 sort_helper::verify_sentinel_strict_weak_ordering(cmp);
// At least two blocks (times the usage factor) are required to form runs.
123 if (!(2 * BlockSize_ * sort_memory_usage_factor() <= memory_to_use)) {
124 throw bad_parameter(
"stxxl::runs_creator<>:runs_creator(): INSUFFICIENT MEMORY provided, please increase parameter 'memory_to_use'");
// result(): lazily runs compute_result() on first call, then memoizes.
134 if (!result_computed)
137 result_computed =
true;
138 #ifdef STXXL_PRINT_STAT_AFTER_RF
139 STXXL_MSG(*stats::get_instance());
140 #endif //STXXL_PRINT_STAT_AFTER_RF
// compute_result: consumes the whole input stream and cuts it into sorted
// runs of up to el_in_run = m2 * block_type::size elements each, written to
// newly allocated disk blocks.  The buffer of 2*m2 blocks is split into two
// halves (Blocks1/Blocks2) so that sorting/filling one half overlaps with
// the asynchronous write-back of the other.
// NOTE(review): extraction is line-sparse here — declarations of 'run', 'i',
// 'write_reqs', 'write_reqs1' fall in missing lines; commented conservatively.
149 template <
class Input_,
class Cmp_,
unsigned BlockSize_,
class AllocStr_>
150 void basic_runs_creator<Input_, Cmp_, BlockSize_, AllocStr_>::compute_result()
153 unsigned_type m2 = m_ / 2;
154 const unsigned_type el_in_run = m2 * block_type::size;
155 STXXL_VERBOSE1(
"basic_runs_creator::compute_result m2=" << m2);
156 unsigned_type blocks1_length = 0, blocks2_length = 0;
157 block_type * Blocks1 = NULL;
// Small-input optimization: first buffer up to one block's worth of
// elements in result_.small_; only allocate the big block buffer if the
// input turns out to be larger than a single block.
159 #ifndef STXXL_SMALL_INPUT_PSORT_OPT
160 Blocks1 =
new block_type[m2 * 2];
162 while (!input.empty() && blocks1_length != block_type::size)
164 result_.small_.push_back(*input);
169 if (blocks1_length == block_type::size && !input.empty())
171 Blocks1 =
new block_type[m2 * 2];
172 std::copy(result_.small_.begin(), result_.small_.end(), Blocks1[0].begin());
173 result_.small_.clear();
// Input fits in one block: keep it in small_, sort in internal memory,
// and skip disk I/O entirely.
177 STXXL_VERBOSE1(
"basic_runs_creator: Small input optimization, input length: " << blocks1_length);
178 result_.elements = blocks1_length;
180 sort(result_.small_.begin(), result_.small_.end(), cmp);
183 #endif //STXXL_SMALL_INPUT_PSORT_OPT
// Fill the first half of the buffer and sort it — this is the first run.
186 blocks1_length = fetch(Blocks1, blocks1_length, el_in_run);
189 sort_run(Blocks1, blocks1_length);
// Same small-input shortcut when the sorted data fits into one block.
190 if (blocks1_length <= block_type::size && input.empty())
193 STXXL_VERBOSE1(
"basic_runs_creator: Small input optimization, input length: " << blocks1_length);
194 assert(result_.small_.empty());
195 result_.small_.insert(result_.small_.end(), Blocks1[0].begin(), Blocks1[0].begin() + blocks1_length);
196 result_.elements = blocks1_length;
// The second half of the single allocation is the alternate buffer.
201 block_type * Blocks2 = Blocks1 + m2;
// Allocate disk blocks for the first run and start writing it out.
206 unsigned_type cur_run_size = div_ceil(blocks1_length, block_type::size);
207 run.resize(cur_run_size);
208 bm->
new_blocks(AllocStr_(), make_bid_iterator(run.begin()), make_bid_iterator(run.end()));
210 disk_queues::get_instance()->set_priority_op(request_queue::WRITE);
// Pad the last partial block with sentinel max values so whole blocks go out.
213 fill_with_max_value(Blocks1, cur_run_size, blocks1_length);
215 for (i = 0; i < cur_run_size; ++i)
// run[i].value records block i's first (smallest, post-sort) element —
// the trigger used later for prefetch scheduling by the merger.
217 run[i].value = Blocks1[i][0];
218 write_reqs[i] = Blocks1[i].write(run[i].bid);
220 result_.runs.push_back(run);
221 result_.runs_sizes.push_back(blocks1_length);
222 result_.elements += blocks1_length;
// Input exhausted after a single run: just wait for writes and finish.
227 wait_all(write_reqs, write_reqs + cur_run_size);
// Overlap: fetch the next batch into the second half while run 0 is written.
233 STXXL_VERBOSE1(
"Filling the second part of the allocated blocks");
234 blocks2_length = fetch(Blocks2, 0, el_in_run);
// Input ended while run 0 was in flight: merge both halves into one bigger
// run — discard run 0's blocks and rewrite the combined, re-sorted data.
240 blocks2_length += el_in_run;
241 sort_run(Blocks1, blocks2_length);
242 wait_all(write_reqs, write_reqs + cur_run_size);
243 bm->
delete_blocks(make_bid_iterator(run.begin()), make_bid_iterator(run.end()));
245 cur_run_size = div_ceil(blocks2_length, block_type::size);
246 run.resize(cur_run_size);
247 bm->
new_blocks(AllocStr_(), make_bid_iterator(run.begin()), make_bid_iterator(run.end()));
250 fill_with_max_value(Blocks1, cur_run_size, blocks2_length);
252 assert(cur_run_size > m2);
// First m2 blocks reuse write_reqs slots (waiting for the old request
// first); the remainder go through the separately allocated write_reqs1.
254 for (i = 0; i < m2; ++i)
256 run[i].value = Blocks1[i][0];
257 write_reqs[i]->
wait();
258 write_reqs[i] = Blocks1[i].write(run[i].bid);
263 for ( ; i < cur_run_size; ++i)
265 run[i].value = Blocks1[i][0];
266 write_reqs1[i - m2] = Blocks1[i].write(run[i].bid);
// Replace the bookkeeping for run 0 with the merged run.
269 result_.runs[0] = run;
270 result_.runs_sizes[0] = blocks2_length;
271 result_.elements = blocks2_length;
273 wait_all(write_reqs, write_reqs + m2);
275 wait_all(write_reqs1, write_reqs1 + cur_run_size - m2);
276 delete[] write_reqs1;
// Normal path: second half holds a full run; sort and write it out.
285 sort_run(Blocks2, blocks2_length);
287 cur_run_size = div_ceil(blocks2_length, block_type::size);
288 run.resize(cur_run_size);
289 bm->
new_blocks(AllocStr_(), make_bid_iterator(run.begin()), make_bid_iterator(run.end()));
291 for (i = 0; i < cur_run_size; ++i)
293 run[i].value = Blocks2[i][0];
// Wait for the previous request on this slot before reusing it.
294 write_reqs[i]->
wait();
295 write_reqs[i] = Blocks2[i].write(run[i].bid);
297 assert((blocks2_length % el_in_run) == 0);
299 result_.runs.push_back(run);
300 result_.runs_sizes.push_back(blocks2_length);
301 result_.elements += blocks2_length;
// Steady state: fetch+sort into one half while the other half's writes
// complete, swapping the halves each iteration.
303 while (!input.empty())
305 blocks1_length = fetch(Blocks1, 0, el_in_run);
306 sort_run(Blocks1, blocks1_length);
307 cur_run_size = div_ceil(blocks1_length, block_type::size);
308 run.resize(cur_run_size);
309 bm->
new_blocks(AllocStr_(), make_bid_iterator(run.begin()), make_bid_iterator(run.end()));
// Last run may be partial — pad with sentinels before writing.
312 fill_with_max_value(Blocks1, cur_run_size, blocks1_length);
314 for (i = 0; i < cur_run_size; ++i)
316 run[i].value = Blocks1[i][0];
317 write_reqs[i]->
wait();
318 write_reqs[i] = Blocks1[i].write(run[i].bid);
320 result_.runs.push_back(run);
321 result_.runs_sizes.push_back(blocks1_length);
322 result_.elements += blocks1_length;
324 std::swap(Blocks1, Blocks2);
325 std::swap(blocks1_length, blocks2_length);
// Drain outstanding writes, then free the buffer: the halves may have been
// swapped, so the lower of the two pointers is the allocation base.
328 wait_all(write_reqs, write_reqs + m2);
330 delete[] ((Blocks1 < Blocks2) ? Blocks1 : Blocks2);
// runs_creator (use_push strategy): the caller pushes elements one by one
// instead of supplying an input stream; a run is flushed to disk whenever
// el_in_run elements have accumulated.  NOTE(review): fragmentary extract —
// code left byte-identical, comments only.
342 unsigned BlockSize_ = STXXL_DEFAULT_BLOCK_SIZE(
typename Input_::value_type),
343 class AllocStr_ = STXXL_DEFAULT_ALLOC_STRATEGY>
372 template <
class ValueType_>
375 typedef ValueType_ value_type;
398 :
private noncopyable
403 typedef Cmp_ cmp_type;
404 typedef ValueType_ value_type;
406 typedef sort_helper::trigger_entry<block_type> trigger_entry_type;
411 typedef typename sorted_runs_type::run_type run_type;
// State: m2 blocks per half-buffer, el_in_run elements per full run,
// cur_el = number of elements accumulated in the current (unflushed) run.
415 bool result_computed;
417 const unsigned_type m2;
418 const unsigned_type el_in_run;
419 unsigned_type cur_el;
// Pads a partially filled run with sentinel cmp.max_value() elements
// (last_idx computation falls in lines missing from this extract).
425 void fill_with_max_value(
block_type * blocks, unsigned_type num_blocks, unsigned_type first_idx)
428 if (first_idx < last_idx) {
429 typename element_iterator_traits<block_type>::element_iterator curr =
430 make_element_iterator(blocks, first_idx);
431 while (first_idx != last_idx) {
432 *curr = cmp.max_value();
// Internal-memory sort of one run's elements.
439 void sort_run(
block_type * run, unsigned_type elements)
442 sort(make_element_iterator(run, 0),
443 make_element_iterator(run, elements),
// compute_result: flushes the final (possibly partial) run and releases
// the block buffers.
447 void compute_result()
452 unsigned_type cur_el_reg = cur_el;
453 sort_run(Blocks1, cur_el_reg);
454 result_.elements += cur_el_reg;
// Small-input optimization: if nothing was flushed yet and everything fits
// in one block, keep the result in internal memory.
455 if (cur_el_reg <= block_type::size && result_.elements == cur_el_reg)
458 STXXL_VERBOSE1(
"runs_creator(use_push): Small input optimization, input length: " << cur_el_reg);
459 result_.small_.resize(cur_el_reg);
460 std::copy(Blocks1[0].begin(), Blocks1[0].begin() + cur_el_reg, result_.small_.begin());
464 const unsigned_type cur_run_size = div_ceil(cur_el_reg, block_type::size);
465 run.resize(cur_run_size);
467 bm->
new_blocks(AllocStr_(), make_bid_iterator(run.
begin()), make_bid_iterator(run.
end()));
469 disk_queues::get_instance()->set_priority_op(request_queue::WRITE);
471 result_.runs_sizes.push_back(cur_el_reg);
// Pad the last partial block with sentinels before writing whole blocks.
474 fill_with_max_value(Blocks1, cur_run_size, cur_el_reg);
477 for ( ; i < cur_run_size; ++i)
479 run[i].value = Blocks1[i][0];
// Request slots may still hold writes from an earlier run — wait first.
480 if (write_reqs[i].
get())
481 write_reqs[i]->
wait();
483 write_reqs[i] = Blocks1[i].write(run[i].bid);
485 result_.runs.push_back(run);
// Drain all outstanding writes before freeing the buffers.
487 for (i = 0; i < m2; ++i)
488 if (write_reqs[i].
get())
489 write_reqs[i]->
wait();
// Halves may have been swapped: delete the lower pointer (allocation base).
495 delete[] ((Blocks1 < Blocks2) ? Blocks1 : Blocks2);
497 Blocks1 = Blocks2 = NULL;
// Constructor: same memory sizing and sanity checks as basic_runs_creator;
// Blocks2 is the second half of the single Blocks1 allocation.
505 cmp(c), m_(memory_to_use / BlockSize_ / sort_memory_usage_factor()), result_computed(false),
510 Blocks2(Blocks1 + m2),
513 sort_helper::verify_sentinel_strict_weak_ordering(cmp);
514 if (!(2 * BlockSize_ * sort_memory_usage_factor() <= memory_to_use)) {
515 throw bad_parameter(
"stxxl::runs_creator<>:runs_creator(): INSUFFICIENT MEMORY provided, please increase parameter 'memory_to_use'");
522 if (!result_computed)
// push: appends one element; when the current half-buffer reaches
// el_in_run elements, sorts it and writes it out as a complete run, then
// swaps to the other half so the caller can keep pushing immediately.
528 void push(
const value_type & val)
530 assert(result_computed ==
false);
531 unsigned_type cur_el_reg = cur_el;
532 if (cur_el_reg < el_in_run)
539 assert(el_in_run == cur_el);
543 sort_run(Blocks1, el_in_run);
544 result_.elements += el_in_run;
546 const unsigned_type cur_run_size = div_ceil(el_in_run, block_type::size);
547 run.resize(cur_run_size);
549 bm->
new_blocks(AllocStr_(), make_bid_iterator(run.
begin()), make_bid_iterator(run.
end()));
551 disk_queues::get_instance()->set_priority_op(request_queue::WRITE);
553 result_.runs_sizes.push_back(el_in_run);
555 for (unsigned_type i = 0; i < cur_run_size; ++i)
// Trigger value = first element of the sorted block.
557 run[i].value = Blocks1[i][0];
558 if (write_reqs[i].
get())
559 write_reqs[i]->
wait();
561 write_reqs[i] = Blocks1[i].write(run[i].bid);
564 result_.runs.push_back(run);
566 std::swap(Blocks1, Blocks2);
// result(): lazily flushes the last run on first call, then memoizes.
578 if (!result_computed)
581 result_computed =
true;
583 #ifdef STXXL_PRINT_STAT_AFTER_RF
584 STXXL_MSG(*stats::get_instance());
585 #endif //STXXL_PRINT_STAT_AFTER_RF
// runs_creator (from_sorted_sequences strategy): the caller pushes elements
// that already arrive in sorted order per run; blocks are streamed to disk
// through a buffered writer as they fill.  NOTE(review): fragmentary
// extract — code left byte-identical, comments only.
599 template <
class ValueType_>
602 typedef ValueType_ value_type;
625 :
private noncopyable
627 typedef ValueType_ value_type;
629 typedef sort_helper::trigger_entry<block_type> trigger_entry_type;
630 typedef AllocStr_ alloc_strategy_type;
634 typedef Cmp_ cmp_type;
639 typedef typename sorted_runs_type::run_type run_type;
// offset = element index within the current block; iblock = block index
// within the current run; allocation strategy is reset per run (see below).
645 unsigned_type offset;
646 unsigned_type iblock;
648 alloc_strategy_type alloc_strategy;
// Constructor: sizes memory, grabs the first free block from the buffered
// writer, and performs the usual sentinel/memory sanity checks.
657 m_(memory_to_use / BlockSize_ / sort_memory_usage_factor()),
659 cur_block(writer.get_free_block()),
664 sort_helper::verify_sentinel_strict_weak_ordering(cmp);
666 if (!(2 * BlockSize_ * sort_memory_usage_factor() <= memory_to_use)) {
667 throw bad_parameter(
"stxxl::runs_creator<>:runs_creator(): INSUFFICIENT MEMORY provided, please increase parameter 'memory_to_use'");
// push: appends one element of the current run; when the block is full,
// allocates its BID, records the trigger value (first element), and hands
// the block to the asynchronous writer.
673 void push(
const value_type & val)
675 assert(offset < block_type::size);
677 (*cur_block)[offset] = val;
680 if (offset == block_type::size)
686 result_.runs.resize(irun + 1);
687 result_.runs[irun].resize(iblock + 1);
690 make_bid_iterator(result_.runs[irun].begin() + iblock),
691 make_bid_iterator(result_.runs[irun].end()),
695 result_.runs[irun][iblock].value = (*cur_block)[0];
696 cur_block = writer.write(cur_block, result_.runs[irun][iblock].bid);
// finish: closes the current run.  A run with no elements at all is a
// no-op; otherwise record the exact run size, pad the last block with
// sentinel max values, write it out, and reset the allocation strategy so
// the next run starts fresh.
708 if (offset == 0 && iblock == 0)
712 result_.runs_sizes.resize(irun + 1);
713 result_.runs_sizes.back() = iblock * block_type::size + offset;
717 while (offset != block_type::size)
719 (*cur_block)[offset] = cmp.max_value();
726 result_.runs.resize(irun + 1);
727 result_.runs[irun].resize(iblock + 1);
730 make_bid_iterator(result_.runs[irun].begin() + iblock),
731 make_bid_iterator(result_.runs[irun].end()),
735 result_.runs[irun][iblock].value = (*cur_block)[0];
736 cur_block = writer.write(cur_block, result_.runs[irun][iblock].bid);
741 alloc_strategy = alloc_strategy_type();
// check_sorted_runs: debugging checker for a sorted_runs object.  Reads
// every run's blocks back from disk and verifies (a) each block's recorded
// trigger value equals the block's first element, and (b) the run's
// elements are globally sorted.  Presumably returns false on failure and
// true after the success message — the return statements fall in lines
// missing from this extract.
763 template <
class RunsType_,
class Cmp_>
766 sort_helper::verify_sentinel_strict_weak_ordering(cmp);
767 typedef typename RunsType_::block_type block_type;
768 typedef typename block_type::value_type value_type;
769 STXXL_VERBOSE2(
"Elements: " << sruns.elements);
770 unsigned_type nruns = sruns.runs.size();
771 STXXL_VERBOSE2(
"Runs: " << nruns);
772 unsigned_type irun = 0;
773 for (irun = 0; irun < nruns; ++irun)
775 const unsigned_type nblocks = sruns.runs[irun].size();
776 block_type * blocks =
new block_type[nblocks];
// Issue all reads for this run (waiting code not visible in this extract).
778 for (unsigned_type j = 0; j < nblocks; ++j)
780 reqs[j] = blocks[j].read(sruns.runs[irun][j].bid);
783 for (unsigned_type j = 0; j < nblocks; ++j)
// Two cmp calls express inequality under a strict weak ordering:
// the trigger must compare equivalent to the block's first element.
785 if (cmp(blocks[j][0], sruns.runs[irun][j].value) ||
786 cmp(sruns.runs[irun][j].value, blocks[j][0]))
788 STXXL_ERRMSG(
"check_sorted_runs wrong trigger in the run");
// Only the first runs_sizes[irun] elements are real data (the rest of the
// last block is sentinel padding), hence the bounded iterator range.
792 if (!stxxl::is_sorted(make_element_iterator(blocks, 0),
793 make_element_iterator(blocks, sruns.runs_sizes[irun]),
796 STXXL_ERRMSG(
"check_sorted_runs wrong order in the run");
804 STXXL_MSG(
"Checking runs finished successfully");
// basic_runs_merger: merges the sorted runs produced by a runs_creator into
// one sorted output stream, using a prefetcher plus either a loser tree or
// (when enabled) the parallel multiway merge.  NOTE(review): fragmentary
// extract — code left byte-identical, comments only.
820 template <
class RunsType_,
822 class AllocStr_ = STXXL_DEFAULT_ALLOC_STRATEGY>
826 typedef RunsType_ sorted_runs_type;
827 typedef AllocStr_ alloc_strategy;
828 typedef typename sorted_runs_type::size_type size_type;
829 typedef Cmp_ value_cmp;
830 typedef typename sorted_runs_type::run_type run_type;
831 typedef typename sorted_runs_type::block_type block_type;
832 typedef block_type out_block_type;
833 typedef typename run_type::value_type trigger_entry_type;
835 typedef run_cursor2<block_type, prefetcher_type> run_cursor_type;
836 typedef sort_helper::run_cursor2_cmp<block_type, prefetcher_type, value_cmp> run_cursor2_cmp_type;
837 typedef loser_tree<run_cursor_type, run_cursor2_cmp_type> loser_tree_type;
838 typedef stxxl::int64 diff_type;
// A 'sequence' is the unread [begin,end) range inside one in-memory block.
839 typedef std::pair<typename block_type::iterator, typename block_type::iterator> sequence;
840 typedef typename std::vector<sequence>::size_type seqs_size_type;
// Merger state: remaining element count, the one output block currently
// being consumed, and the position of the next element in it.
847 sorted_runs_type sruns;
849 size_type elements_remaining;
851 out_block_type * current_block;
852 unsigned_type buffer_pos;
// Prefetch machinery: consumption order of blocks plus the (optionally
// optimized) prefetch schedule.
855 run_type consume_seq;
856 int_type * prefetch_seq;
858 loser_tree_type * losers;
859 #if STXXL_PARALLEL_MULTIWAY_MERGE
860 std::vector<sequence> * seqs;
861 std::vector<block_type *> * buffers;
// Number of elements known to be safely mergeable without refilling.
862 diff_type num_currently_mergeable;
865 #if STXXL_CHECK_ORDER_IN_SORTS
867 #endif //STXXL_CHECK_ORDER_IN_SORTS
871 void merge_recursively(unsigned_type memory_to_use);
// deallocate_prefetcher: tears down the merge machinery (loser tree or
// parallel sequences/buffers, prefetch schedule) and releases the runs'
// disk blocks once everything has been consumed.
873 void deallocate_prefetcher()
878 #if STXXL_PARALLEL_MULTIWAY_MERGE
883 delete[] prefetch_seq;
887 sruns.deallocate_blocks();
// fill_current_block: produces the next out_block_type::size (or fewer, at
// the end) sorted elements into current_block.
890 void fill_current_block()
892 STXXL_VERBOSE1(
"fill_current_block");
893 if (do_parallel_merge())
895 #if STXXL_PARALLEL_MULTIWAY_MERGE
// Parallel path: repeatedly merge as many elements as are provably
// mergeable (bounded by the next prefetched trigger value) until the
// output block is full or all sequences are exhausted.
897 diff_type rest = out_block_type::size;
901 if (num_currently_mergeable < rest)
903 if (!prefetcher || prefetcher->empty())
// Nothing more to prefetch: everything left is mergeable.
906 num_currently_mergeable = elements_remaining;
910 num_currently_mergeable = sort_helper::count_elements_less_equal(
911 *seqs, consume_seq[prefetcher->pos()].value, cmp);
915 diff_type output_size = STXXL_MIN(num_currently_mergeable, rest);
917 STXXL_VERBOSE1(
"before merge " << output_size);
919 stxxl::parallel::multiway_merge((*seqs).begin(), (*seqs).end(), current_block->end() - rest, cmp, output_size);
923 num_currently_mergeable -= output_size;
925 STXXL_VERBOSE1(
"after merge");
927 sort_helper::refill_or_remove_empty_sequences(*seqs, *buffers, *prefetcher);
928 }
while (rest > 0 && (*seqs).size() > 0);
930 #if STXXL_CHECK_ORDER_IN_SORTS
931 if (!stxxl::is_sorted(current_block->begin(), current_block->end(), cmp))
933 for (value_type * i = current_block->begin() + 1; i != current_block->end(); ++i)
934 if (cmp(*i, *(i - 1)))
936 STXXL_VERBOSE1(
"Error at position " << (i - current_block->begin()));
940 #endif //STXXL_CHECK_ORDER_IN_SORTS
// do_parallel_merge() returned true but parallel merge is compiled out.
944 STXXL_THROW_UNREACHABLE();
945 #endif //STXXL_PARALLEL_MULTIWAY_MERGE
// Sequential path: loser tree emits the next batch directly.
950 losers->multi_merge(current_block->elem, current_block->elem + STXXL_MIN<size_type>(out_block_type::size, elements_remaining));
953 STXXL_VERBOSE1(
"current block filled");
// Last block: the prefetcher is no longer needed, free it eagerly.
955 if (elements_remaining <= out_block_type::size)
956 deallocate_prefetcher();
// Constructor: copies the runs descriptor and delegates setup to
// initialize().  NOTE(review): fragmentary extract — code left
// byte-identical, comments only.
967 elements_remaining(sruns.elements),
973 #if STXXL_PARALLEL_MULTIWAY_MERGE
976 num_currently_mergeable(0)
978 #if STXXL_CHECK_ORDER_IN_SORTS
979 , last_element(cmp.min_value())
982 initialize(r, memory_to_use);
// initialize: sets up the merge of runs 'r' within memory_to_use bytes.
986 void initialize(
const sorted_runs_type & r, unsigned_type memory_to_use)
988 sort_helper::verify_sentinel_strict_weak_ordering(cmp);
991 elements_remaining = r.elements;
// Small-run shortcut: the creator kept the whole input in one in-memory
// run; just copy it into the output block, no disks involved.
996 if (!sruns.small_run().empty())
999 STXXL_VERBOSE1(
"basic_runs_merger: small input optimization, input length: " << elements_remaining);
1000 assert(elements_remaining == size_type(sruns.small_run().size()));
1001 assert(sruns.small_run().size() <= out_block_type::size);
1002 current_block =
new out_block_type;
1003 std::copy(sruns.small_run().begin(), sruns.small_run().end(), current_block->begin());
1004 current_value = current_block->elem[0];
1010 #if STXXL_CHECK_ORDER_IN_SORTS
1012 #endif //STXXL_CHECK_ORDER_IN_SORTS
1014 disk_queues::get_instance()->set_priority_op(request_queue::WRITE);
// Budget: reserve one output block, then see how many input block buffers
// fit; at least 2 prefetch buffers per disk are required.
1016 int_type disks_number = config::get_instance()->disks_number();
1017 unsigned_type min_prefetch_buffers = 2 * disks_number;
1018 unsigned_type input_buffers = (memory_to_use >
sizeof(out_block_type) ? memory_to_use -
sizeof(out_block_type) : 0) / block_type::raw_size;
1019 unsigned_type nruns = sruns.runs.size();
// Too many runs for a single merge pass: warn, sanity-check the memory,
// and reduce the number of runs by recursive merging first.
1021 if (input_buffers < nruns + min_prefetch_buffers)
1025 STXXL_WARNMSG_RECURSIVE_SORT(
"The implementation of sort requires more than one merge pass, therefore for a better");
1026 STXXL_WARNMSG_RECURSIVE_SORT(
"efficiency decrease block size of run storage (a parameter of the run_creator)");
1027 STXXL_WARNMSG_RECURSIVE_SORT(
"or increase the amount memory dedicated to the merger.");
1028 STXXL_WARNMSG_RECURSIVE_SORT(
"m=" << input_buffers <<
" nruns=" << nruns <<
" prefetch_blocks=" << min_prefetch_buffers);
1031 unsigned_type recursive_merge_buffers = memory_to_use / block_type::raw_size;
1032 if (recursive_merge_buffers < 2 * min_prefetch_buffers + 1 + 2) {
1035 STXXL_ERRMSG(
"There are only m=" << recursive_merge_buffers <<
" blocks available for recursive merging, but "
1036 << min_prefetch_buffers <<
"+" << min_prefetch_buffers <<
"+1 are needed read-ahead/write-back/output, and");
1037 STXXL_ERRMSG(
"the merger requires memory to store at least two input blocks internally. Aborting.");
1038 throw bad_parameter(
"basic_runs_merger::sort(): INSUFFICIENT MEMORY provided, please increase parameter 'memory_to_use'");
1041 merge_recursively(memory_to_use);
1043 nruns = sruns.runs.size();
1046 assert(nruns + min_prefetch_buffers <= input_buffers);
// Build the consumption sequence: all blocks of all runs, globally
// ordered by their trigger (first-element) values, which gives the order
// in which the merge will need them.
1049 unsigned_type prefetch_seq_size = 0;
1050 for (i = 0; i < nruns; ++i)
1052 prefetch_seq_size += sruns.runs[i].size();
1055 consume_seq.resize(prefetch_seq_size);
1056 prefetch_seq =
new int_type[prefetch_seq_size];
1058 typename run_type::iterator copy_start = consume_seq.begin();
1059 for (i = 0; i < nruns; ++i)
1061 copy_start = std::copy(
1062 sruns.runs[i].begin(),
1063 sruns.runs[i].end(),
1067 std::stable_sort(consume_seq.begin(), consume_seq.end(),
1068 sort_helper::trigger_entry_cmp<trigger_entry_type, value_cmp>(cmp) _STXXL_SORT_TRIGGER_FORCE_SEQUENTIAL);
1070 const unsigned_type n_prefetch_buffers = STXXL_MAX(min_prefetch_buffers, input_buffers - nruns);
1071 #if STXXL_SORT_OPTIMAL_PREFETCHING
// Heuristic: keep ~30% of the surplus buffers for optimal prefetching.
1074 const int_type n_opt_prefetch_buffers = min_prefetch_buffers + (3 * (n_prefetch_buffers - min_prefetch_buffers)) / 10;
1076 compute_prefetch_schedule(
1079 n_opt_prefetch_buffers,
// Fallback: trivial schedule, fetch blocks in consumption order.
1082 for (i = 0; i < prefetch_seq_size; ++i)
1083 prefetch_seq[i] = i;
1084 #endif //STXXL_SORT_OPTIMAL_PREFETCHING
1086 prefetcher =
new prefetcher_type(
1087 consume_seq.begin(),
1090 STXXL_MIN(nruns + n_prefetch_buffers, prefetch_seq_size));
// Parallel merge uses per-run sequences over prefetched buffers; the
// sequential path uses a loser tree over run cursors.
1092 if (do_parallel_merge())
1094 #if STXXL_PARALLEL_MULTIWAY_MERGE
1096 seqs =
new std::vector<sequence>(nruns);
1097 buffers =
new std::vector<block_type *>(nruns);
1099 for (unsigned_type i = 0; i < nruns; ++i)
1102 (*seqs)[i] = std::make_pair((*buffers)[i]->begin(), (*buffers)[i]->end());
1106 STXXL_THROW_UNREACHABLE();
1107 #endif //STXXL_PARALLEL_MULTIWAY_MERGE
1112 losers =
new loser_tree_type(prefetcher, nruns, run_cursor2_cmp_type(cmp));
1116 current_block =
new out_block_type;
1117 fill_current_block();
1119 current_value = current_block->elem[0];
// Stream interface: empty() / operator*() / operator++().
1127 return elements_remaining == 0;
1134 return current_value;
// operator++: advance within the current block, refilling it (and
// re-reading elem[0]) whenever it is exhausted.
1148 --elements_remaining;
1150 if (buffer_pos != out_block_type::size)
1152 current_value = current_block->elem[buffer_pos];
1159 fill_current_block();
1161 #if STXXL_CHECK_ORDER_IN_SORTS
1162 assert(stxxl::is_sorted(current_block->elem, current_block->elem + STXXL_MIN<size_type>(elements_remaining, current_block->size), cmp));
1163 assert(!cmp(current_block->elem[0], current_value));
1164 #endif //STXXL_CHECK_ORDER_IN_SORTS
1165 current_value = current_block->elem[0];
1170 #if STXXL_CHECK_ORDER_IN_SORTS
// Debug-only: output must be globally non-decreasing.
1173 assert(!cmp(current_value, last_element));
1174 last_element = current_value;
1176 #endif //STXXL_CHECK_ORDER_IN_SORTS
// Destructor: release the merge machinery and the output block.
1185 deallocate_prefetcher();
1186 delete current_block;
// merge_recursively: reduces the number of runs in 'sruns' by repeatedly
// merging groups of 'merge_factor' runs into bigger runs until at most
// max_arity runs remain, so that initialize() can do the final merge in one
// pass.  NOTE(review): fragmentary extract — code left byte-identical,
// comments only.
1191 template <
class RunsType_,
class Cmp_,
class AllocStr_>
1192 void basic_runs_merger<RunsType_, Cmp_, AllocStr_>::merge_recursively(unsigned_type memory_to_use)
// Memory accounting: write buffers (2 per disk), the nested merger's own
// prefetch buffers, and one output block are reserved; the rest bounds the
// merge arity.
1195 unsigned_type ndisks = config::get_instance()->disks_number();
1196 unsigned_type nwrite_buffers = 2 * ndisks;
1197 unsigned_type memory_for_write_buffers = nwrite_buffers *
sizeof(block_type);
1200 unsigned_type recursive_merger_memory_prefetch_buffers = 2 * ndisks *
sizeof(block_type);
1201 unsigned_type recursive_merger_memory_out_block =
sizeof(block_type);
1202 unsigned_type memory_for_buffers = memory_for_write_buffers
1203 + recursive_merger_memory_prefetch_buffers
1204 + recursive_merger_memory_out_block;
1206 unsigned_type max_arity = (memory_to_use > memory_for_buffers ? memory_to_use - memory_for_buffers : 0) / block_type::raw_size;
1208 unsigned_type nruns = sruns.runs.size();
1209 const unsigned_type merge_factor = optimal_merge_factor(nruns, max_arity);
1210 assert(merge_factor > 1);
1211 assert(merge_factor <= max_arity);
// Each iteration is one merge phase: nruns -> ceil(nruns / merge_factor).
1212 while (nruns > max_arity)
1214 unsigned_type new_nruns = div_ceil(nruns, merge_factor);
1215 STXXL_VERBOSE(
"Starting new merge phase: nruns: " << nruns <<
1216 " opt_merge_factor: " << merge_factor <<
" max_arity: " << max_arity <<
" new_nruns: " << new_nruns);
1218 sorted_runs_type new_runs;
1219 new_runs.runs.resize(new_nruns);
1220 new_runs.runs_sizes.resize(new_nruns);
1221 new_runs.elements = sruns.elements;
// First pass over the groups: compute each output run's element count and
// block count so all output blocks can be allocated up front.
1223 unsigned_type runs_left = nruns;
1224 unsigned_type cur_out_run = 0;
1225 unsigned_type elements_in_new_run = 0;
1227 while (runs_left > 0)
1229 int_type runs2merge = STXXL_MIN(runs_left, merge_factor);
1230 elements_in_new_run = 0;
1231 for (unsigned_type i = nruns - runs_left; i < (nruns - runs_left + runs2merge); ++i)
1233 elements_in_new_run += sruns.runs_sizes[i];
1235 const unsigned_type blocks_in_new_run1 = div_ceil(elements_in_new_run, block_type::size);
1237 new_runs.runs_sizes[cur_out_run] = elements_in_new_run;
1239 new_runs.runs[cur_out_run++].resize(blocks_in_new_run1);
1240 runs_left -= runs2merge;
1244 for (unsigned_type i = 0; i < new_runs.runs.size(); ++i)
1245 bm->
new_blocks(alloc_strategy(), make_bid_iterator(new_runs.runs[i].begin()), make_bid_iterator(new_runs.runs[i].end()));
// Second pass: actually merge each group through a nested
// basic_runs_merger and materialize the output run.
1250 size_type elements_left = sruns.elements;
1252 while (runs_left > 0)
1254 unsigned_type runs2merge = STXXL_MIN(runs_left, merge_factor);
1255 STXXL_VERBOSE(
"Merging " << runs2merge <<
" runs");
1257 sorted_runs_type cur_runs;
1258 cur_runs.runs.resize(runs2merge);
1259 cur_runs.runs_sizes.resize(runs2merge);
1261 std::copy(sruns.runs.begin() + nruns - runs_left,
1262 sruns.runs.begin() + nruns - runs_left + runs2merge,
1263 cur_runs.runs.begin());
1264 std::copy(sruns.runs_sizes.begin() + nruns - runs_left,
1265 sruns.runs_sizes.begin() + nruns - runs_left + runs2merge,
1266 cur_runs.runs_sizes.begin());
1268 runs_left -= runs2merge;
1274 cur_runs.elements = new_runs.runs_sizes[cur_out_run];
1275 elements_left -= cur_runs.elements;
// Nested merger gets the memory minus what the writer needs.
1279 basic_runs_merger<RunsType_, Cmp_, AllocStr_> merger(cur_runs, cmp, memory_to_use - memory_for_write_buffers);
1283 new_runs.runs[cur_out_run].begin(),
// Drain the merger element by element, recording a trigger value at the
// start of every output block.
1287 const size_type cnt_max = cur_runs.elements;
1289 while (cnt != cnt_max)
1292 if ((cnt % block_type::size) == 0)
1293 new_runs.runs[cur_out_run][cnt / size_type(block_type::size)].value = *merger;
1299 assert(merger.empty());
// Pad the final partial block with sentinel max values.
1301 while (cnt % block_type::size)
1303 *out = cmp.max_value();
1312 make_bid_iterator(new_runs.runs.back().begin()),
1313 make_bid_iterator(new_runs.runs.back().end()),
// Degenerate group of one run: no merge needed, just move its block BIDs
// and size into the output slot.
1315 assert(cur_runs.runs.size() == 1);
1317 std::copy(cur_runs.runs.front().begin(),
1318 cur_runs.runs.front().end(),
1319 new_runs.runs.back().begin());
1320 new_runs.runs_sizes.back() = cur_runs.runs_sizes.front();
1325 assert(elements_left == 0);
// runs_merger: thin public wrapper around basic_runs_merger, plus the
// stream::sort class that chains a runs_creator and a runs_merger.
// NOTE(review): fragmentary extract — code left byte-identical, comments
// only.
1339 template <
class RunsType_,
1341 class AllocStr_ = STXXL_DEFAULT_ALLOC_STRATEGY>
1345 typedef RunsType_ sorted_runs_type;
1347 typedef typename base::value_cmp value_cmp;
1348 typedef typename base::block_type block_type;
// Constructor simply forwards runs, comparator and memory budget to the
// base merger.
1355 runs_merger(
const sorted_runs_type & r, value_cmp c, unsigned_type memory_to_use) :
1356 base(r, c, memory_to_use)
// stream::sort: produces a sorted stream from an input stream by creating
// runs and merging them.  Note construction order: 'creator' consumes the
// input, then 'merger' is built from creator.result().
1372 template <
class Input_,
1374 unsigned BlockSize_ = STXXL_DEFAULT_BLOCK_SIZE(
typename Input_::value_type),
1375 class AllocStr_ = STXXL_DEFAULT_ALLOC_STRATEGY,
1376 class runs_creator_type = runs_creator<Input_, Cmp_, BlockSize_, AllocStr_> >
1379 typedef typename runs_creator_type::sorted_runs_type sorted_runs_type;
1382 runs_creator_type creator;
// Single-budget constructor: the same memory amount is used first for run
// creation, then for merging.
1393 sort(Input_ & in, Cmp_ c, unsigned_type memory_to_use) :
1394 creator(in, c, memory_to_use),
1395 merger(creator.result(), c, memory_to_use)
1397 sort_helper::verify_sentinel_strict_weak_ordering(c);
// Split-budget constructor: separate memory amounts for run creation (rc)
// and for merging (m).
1405 sort(Input_ & in, Cmp_ c, unsigned_type memory_to_use_rc, unsigned_type memory_to_use_m) :
1406 creator(in, c, memory_to_use_rc),
1407 merger(creator.result(), c, memory_to_use_m)
1409 sort_helper::verify_sentinel_strict_weak_ordering(c);
// Stream interface delegates to the merger.
1416 return merger.
empty();
1426 const value_type * operator -> ()
const
1429 return merger.operator -> ();
// compute_sorted_runs_type: metafunction computing the sorted_runs type for
// a given value type and block size, followed by the iterator-range sort()
// convenience function that streams [begin,end) through a stream::sort.
// NOTE(review): fragmentary extract — code left byte-identical, comments
// only.
1446 unsigned BlockSize_>
1449 typedef ValueType_ value_type;
1451 typedef sort_helper::trigger_entry<bid_type, value_type> trigger_entry_type;
// sort(begin, end, cmp, MemSize, ...): wraps the iterator range in an input
// stream, sorts it with MemSize bytes of memory, and (presumably, in lines
// not visible here) materializes the sorted stream back into the range.
1473 template <
unsigned BlockSize,
1474 class RandomAccessIterator,
1477 void sort(RandomAccessIterator begin,
1478 RandomAccessIterator end,
1480 unsigned_type MemSize,
1485 typedef typename streamify_traits<RandomAccessIterator>::stream_type InputType;
1489 InputType Input(begin, end);
1491 sorter_type Sort(Input, cmp, MemSize);
1497 __STXXL_END_NAMESPACE
1499 #endif // !STXXL_SORT_STREAM_HEADER
runs_merger(const sorted_runs_type &r, value_cmp c, unsigned_type memory_to_use)
Creates a runs merger object.
Definition: sort_stream.h:1355
runs_creator(Cmp_ c, unsigned_type memory_to_use)
Creates the object.
Definition: sort_stream.h:655
Forms sorted runs of data from a stream.
Definition: sort_stream.h:55
Encapsulates asynchronous buffered block writing engine.
Definition: buf_writer.h:37
runs_creator(Cmp_ c, unsigned_type memory_to_use)
Creates the object.
Definition: sort_stream.h:504
void sort(ExtIterator_ first, ExtIterator_ last, StrictWeakOrdering_ cmp, unsigned_type M)
Sort records comparison-based.
Definition: sort.h:700
sort(Input_ &in, Cmp_ c, unsigned_type memory_to_use)
Creates the object.
Definition: sort_stream.h:1393
Block containing elements of fixed length.
Definition: typed_block.h:223
Produces sorted stream from input stream.
Definition: sort_stream.h:1377
number of elements in block
Definition: typed_block.h:240
bool empty() const
Standard stream method.
Definition: sort_stream.h:1125
sorted_runs_type::value_type value_type
Standard stream typedef.
Definition: sort_stream.h:844
iterator2stream< InputIterator_ > streamify(InputIterator_ begin, InputIterator_ end)
Input iterator range to stream converter.
Definition: stream.h:115
basic_runs_merger & operator++()
Standard stream method.
Definition: sort_stream.h:1144
Input strategy for runs_creator class.
Definition: sort_stream.h:600
virtual ~basic_runs_merger()
Destructor.
Definition: sort_stream.h:1183
Input_::value_type value_type
Standard stream typedef.
Definition: sort_stream.h:1387
block_type * pull_block()
Pulls next unconsumed block from the consumption sequence.
Definition: block_prefetcher.h:149
iterator begin()
Returns iterator pointing to the first element.
Definition: typed_block.h:81
Merges sorted runs.
Definition: sort_stream.h:1342
void delete_blocks(const BIDIteratorClass &bidbegin, const BIDIteratorClass &bidend)
Deallocates blocks.
Definition: mng.h:218
Merges sorted runs.
Definition: sort_stream.h:823
basic_runs_merger(const sorted_runs_type &r, value_cmp c, unsigned_type memory_to_use)
Creates a runs merger object.
Definition: sort_stream.h:964
const sorted_runs_type & result()
Returns the sorted runs object.
Definition: sort_stream.h:749
void push(const value_type &val)
Adds new element to the current run.
Definition: sort_stream.h:673
bool check_sorted_runs(const RunsType_ &sruns, Cmp_ cmp)
Checker for the sorted runs object created by the runs_creator .
Definition: sort_stream.h:764
void new_blocks(const DiskAssignFunctor &functor, BIDIteratorClass bidbegin, BIDIteratorClass bidend, unsigned_type offset=0)
Allocates new blocks.
Definition: mng.h:90
Implemented as reference counting smart pointer.
Definition: request_ptr.h:34
Encapsulates asynchronous prefetching engine.
Definition: block_prefetcher.h:54
Buffered output stream.
Definition: buf_ostream.h:30
Computes sorted runs type from value type and block size.
Definition: sort_stream.h:1447
iterator end()
Returns iterator pointing to the end element.
Definition: typed_block.h:99
const sorted_runs_type & result()
Returns the sorted runs object.
Definition: sort_stream.h:574
void wait_all(request_iterator_ reqs_begin, request_iterator_ reqs_end)
Collection of functions to track statuses of a number of requests.
Definition: request_operations.h:36
OutputIterator_ materialize(StreamAlgorithm_ &in, OutputIterator_ out)
Stores consecutively stream content to an output iterator.
Definition: stream.h:392
void finish()
Finishes current run and begins new one.
Definition: sort_stream.h:706
const sorted_runs_type & result()
Returns the sorted runs object.
Definition: sort_stream.h:132
basic_runs_creator(Input_ &i, Cmp_ c, unsigned_type memory_to_use)
Create the object.
Definition: sort_stream.h:119
virtual void wait(bool measure_time=true)=0
Suspends calling thread until completion of the request.
sort(Input_ &in, Cmp_ c, unsigned_type memory_to_use_rc, unsigned_type memory_to_use_m)
Creates the object.
Definition: sort_stream.h:1405
const value_type * operator->() const
Standard stream method.
Definition: sort_stream.h:1138
void push(const value_type &val)
Adds new element to the sorter.
Definition: sort_stream.h:528
const value_type & operator*() const
Standard stream method.
Definition: sort_stream.h:1131
Block manager class.
Definition: mng.h:59
const value_type & operator*() const
Standard stream method.
Definition: sort_stream.h:1420
bool empty() const
Standard stream method.
Definition: sort_stream.h:1414
runs_creator(Input_ &i, Cmp_ c, unsigned_type memory_to_use)
Creates the object.
Definition: sort_stream.h:360
Forms sorted runs of data from a stream.
Definition: sort_stream.h:344
Input strategy for runs_creator class.
Definition: sort_stream.h:373
sort & operator++()
Standard stream method.
Definition: sort_stream.h:1433
Block identifier class.
Definition: bid.h:40