16 #ifndef STXXL_ALGO_SORT_HEADER
17 #define STXXL_ALGO_SORT_HEADER
51 namespace sort_local {
53 template <
typename block_type,
typename b
id_type>
61 * req = block->read(bid);
69 typename input_bid_iterator,
73 input_bid_iterator it,
79 typedef typename block_type::bid_type bid_type;
80 STXXL_VERBOSE1(
"stxxl::create_runs nruns=" << nruns <<
" m=" << _m);
84 block_type* Blocks1 =
new block_type[m2];
85 block_type* Blocks2 =
new block_type[m2];
86 bid_type* bids1 =
new bid_type[m2];
87 bid_type* bids2 =
new bid_type[m2];
94 disk_queues::get_instance()->set_priority_op(request_queue::WRITE);
101 run_size = runs[0]->size();
102 assert(run_size == m2);
104 for (i = 0; i < run_size; ++i)
106 STXXL_VERBOSE1(
"stxxl::create_runs posting read " <<
long(Blocks1[i].elem));
108 read_reqs1[i] = Blocks1[i].read(bids1[i]);
111 run_size = runs[1]->size();
113 for (i = 0; i < run_size; ++i)
115 STXXL_VERBOSE1(
"stxxl::create_runs posting read " <<
long(Blocks2[i].elem));
117 read_reqs2[i] = Blocks2[i].read(bids2[i]);
120 for (
int_type k = 0; k < nruns - 1; ++k)
122 run_type* run = runs[k];
123 run_size = run->size();
124 assert(run_size == m2);
126 int_type next_run_size = runs[k + 1]->size();
128 assert((next_run_size == m2) || (next_run_size <= m2 && k == nruns - 2));
133 for (i = 0; i < run_size; ++i)
147 int_type runplus2size = (k < nruns - 2) ? runs[k + 2]->size() : 0;
148 for (i = 0; i < m2; ++i)
150 STXXL_VERBOSE1(
"stxxl::create_runs posting write " <<
long(Blocks1[i].elem));
151 (*run)[i].value = Blocks1[i][0];
152 if (i >= runplus2size) {
153 write_reqs[i] = Blocks1[i].write((*run)[i].bid);
157 next_run_reads[i].
block = Blocks1 + i;
158 next_run_reads[i].
req = read_reqs1 + i;
159 bids1[i] = next_run_reads[i].
bid = *(it++);
160 write_reqs[i] = Blocks1[i].write((*run)[i].bid, next_run_reads[i]);
163 std::swap(Blocks1, Blocks2);
164 std::swap(bids1, bids2);
165 std::swap(read_reqs1, read_reqs2);
168 run_type* run = runs[nruns - 1];
169 run_size = run->size();
173 for (i = 0; i < run_size; ++i)
186 for (i = 0; i < run_size; ++i)
188 STXXL_VERBOSE1(
"stxxl::create_runs posting write " <<
long(Blocks1[i].elem));
189 (*run)[i].value = Blocks1[i][0];
190 write_reqs[i] = Blocks1[i].write((*run)[i].bid);
204 delete[] next_run_reads;
208 template <
typename block_type,
typename run_type,
typename value_cmp>
214 typedef typename block_type::value_type value_type;
216 STXXL_MSG(
"check_sorted_runs Runs: " << nruns);
218 for (irun = 0; irun < nruns; ++irun)
222 block_type* blocks =
new block_type[m];
224 value_type last = cmp.min_value();
230 blocks_left -= nblocks;
234 reqs[j] = blocks[j].read((*runs[irun])[j + off].bid);
238 if (off && cmp(blocks[0][0], last))
240 STXXL_MSG(
"check_sorted_runs wrong first value in the run " << irun);
241 STXXL_MSG(
" first value: " << blocks[0][0]);
244 STXXL_MSG(
"Element " << k <<
" in the block is :" << blocks[0][k]);
253 if (!(blocks[j][0] == (*runs[irun])[j + off].value))
255 STXXL_MSG(
"check_sorted_runs wrong trigger in the run " << irun <<
" block " << (j + off));
256 STXXL_MSG(
" trigger value: " << (*runs[irun])[j + off].value);
259 STXXL_MSG(
"Element " << k <<
" in the block is :" << blocks[j][k]);
266 STXXL_MSG(
"BID " << (k + off) <<
" is: " << ((*runs[irun])[k + off].bid));
278 STXXL_MSG(
"check_sorted_runs wrong order in the run " << irun);
283 STXXL_MSG(
" Element " << k <<
" in block " << (j + off) <<
" is :" << blocks[j][k]);
288 STXXL_MSG(
"BID " << (k + off) <<
" is: " << ((*runs[irun])[k + off].bid));
296 last = blocks[nblocks - 1][block_type::size - 1];
299 assert(blocks_left == 0);
308 template <
typename block_type,
typename run_type,
typename value_cmp>
311 typedef typename run_type::value_type trigger_entry_type;
316 run_type consume_seq(out_run->size());
320 typename run_type::iterator copy_start = consume_seq.begin();
321 for (
int_type i = 0; i < nruns; i++)
324 copy_start = std::copy(
330 std::stable_sort(consume_seq.begin(), consume_seq.end(),
333 int_type disks_number = config::get_instance()->disks_number();
335 #ifdef PLAY_WITH_OPT_PREF
336 const int_type n_write_buffers = 4 * disks_number;
340 #if STXXL_SORT_OPTIMAL_PREFETCHING
342 const int_type n_opt_prefetch_buffers = 2 * disks_number + (3 * (n_prefetch_buffers - 2 * disks_number)) / 10;
346 #if STXXL_SORT_OPTIMAL_PREFETCHING
350 n_opt_prefetch_buffers,
358 prefetcher_type prefetcher(consume_seq.begin(),
361 nruns + n_prefetch_buffers);
365 int_type out_run_size = out_run->size();
367 block_type* out_buffer = writer.get_free_block();
374 #if STXXL_PARALLEL_MULTIWAY_MERGE
379 typedef std::pair<typename block_type::iterator, typename block_type::iterator>
sequence;
380 std::vector<sequence> seqs(nruns);
381 std::vector<block_type*> buffers(nruns);
383 for (
int_type i = 0; i < nruns; i++)
385 buffers[i] = prefetcher.pull_block();
386 seqs[i] = std::make_pair(buffers[i]->begin(), buffers[i]->end());
390 #if STXXL_CHECK_ORDER_IN_SORTS
391 value_type last_elem = cmp.min_value();
393 diff_type num_currently_mergeable = 0;
395 for (
int_type j = 0; j < out_run_size; ++j)
397 diff_type rest = block_type::size;
401 if (num_currently_mergeable < rest)
403 if (prefetcher.empty())
406 num_currently_mergeable = (out_run_size - j) * block_type::size
407 - (block_type::size - rest);
412 seqs, consume_seq[prefetcher.pos()].value, cmp);
416 diff_type output_size =
STXXL_MIN(num_currently_mergeable, rest);
420 stxxl::parallel::multiway_merge(seqs.begin(), seqs.end(), out_buffer->end() - rest, cmp, output_size);
424 num_currently_mergeable -= output_size;
429 }
while (rest > 0 && seqs.size() > 0);
431 #if STXXL_CHECK_ORDER_IN_SORTS
434 for (value_type* i = out_buffer->begin() + 1; i != out_buffer->end(); i++)
435 if (cmp(*i, *(i - 1)))
437 STXXL_VERBOSE1(
"Error at position " << (i - out_buffer->begin()));
443 assert(cmp((*out_buffer)[0], last_elem) ==
false);
445 last_elem = (*out_buffer)[block_type::size - 1];
448 (*out_run)[j].value = (*out_buffer)[0];
450 out_buffer = writer.write(out_buffer, (*out_run)[j].bid);
464 losers(&prefetcher, nruns, run_cursor2_cmp_type(cmp));
466 #if STXXL_CHECK_ORDER_IN_SORTS
467 value_type last_elem = cmp.min_value();
470 for (
int_type i = 0; i < out_run_size; ++i)
472 losers.
multi_merge(out_buffer->elem, out_buffer->elem + block_type::size);
473 (*out_run)[i].value = *(out_buffer->elem);
475 #if STXXL_CHECK_ORDER_IN_SORTS
482 assert(cmp(*(out_buffer->elem), last_elem) ==
false);
484 last_elem = (*out_buffer).elem[block_type::size - 1];
487 out_buffer = writer.write(out_buffer, (*out_run)[i].bid);
493 delete[] prefetch_seq;
496 for (
int_type i = 0; i < nruns; ++i)
508 template <
typename block_type,
509 typename alloc_strategy,
510 typename input_bid_iterator,
519 typedef typename block_type::bid_type bid_type;
534 double begin =
timestamp(), after_runs_creation, end;
536 run_type** runs =
new run_type*[nruns];
538 for (i = 0; i < full_runs; i++)
539 runs[i] =
new run_type(m2);
543 runs[i] =
new run_type(_n - full_runs * m2);
545 for (i = 0; i < nruns; ++i)
551 value_cmp>(input_bids, runs, nruns, _m, cmp);
555 double io_wait_after_rf = stats::get_instance()->get_io_wait_time();
557 disk_queues::get_instance()->set_priority_op(request_queue::WRITE);
565 STXXL_VERBOSE(
"Starting new merge phase: nruns: " << nruns <<
566 " opt_merge_factor: " << merge_factor <<
" m:" << _m <<
" new_nruns: " << new_nruns);
568 new_runs =
new run_type*[new_nruns];
574 while (runs_left > 0)
577 blocks_in_new_run = 0;
578 for (
unsigned_type i = nruns - runs_left; i < (nruns - runs_left + runs2merge); i++)
579 blocks_in_new_run += runs[i]->size();
582 new_runs[cur_out_run++] =
new run_type(blocks_in_new_run);
583 runs_left -= runs2merge;
586 if (cur_out_run == 1 && blocks_in_new_run ==
int_type(_n) && !input_bids->is_managed())
589 input_bid_iterator cur = input_bids;
590 for (
int_type i = 0; cur != (input_bids + _n); ++cur)
592 (*new_runs[0])[i++].bid = *cur;
595 bid_type& firstBID = (*new_runs[0])[0].bid;
596 if (firstBID.is_managed())
602 bid_type& lastBID = (*new_runs[0])[_n - 1].bid;
603 if (lastBID.is_managed())
612 mng->
new_blocks(interleaved_alloc_strategy(new_nruns, alloc_strategy()),
619 while (runs_left > 0)
622 #if STXXL_CHECK_ORDER_IN_SORTS
623 assert((check_sorted_runs<block_type, run_type, value_cmp>(runs + nruns - runs_left, runs2merge, m2, cmp)));
626 merge_runs<block_type, run_type>(runs + nruns - runs_left,
627 runs2merge, *(new_runs + (cur_out_run++)), _m, cmp
629 runs_left -= runs2merge;
637 run_type* result = *runs;
642 STXXL_VERBOSE(
"Elapsed time : " << end - begin <<
" s. Run creation time: " <<
643 after_runs_creation - begin <<
" s");
644 STXXL_VERBOSE(
"Time in I/O wait(rf): " << io_wait_after_rf <<
" s");
646 STXXL_UNUSED(begin + after_runs_creation + end + io_wait_after_rf);
672 template <
typename ExtIterator,
typename StrictWeakOrdering>
679 typedef typename ExtIterator::vector_type::value_type value_type;
680 typedef typename ExtIterator::block_type block_type;
695 throw bad_parameter(
"stxxl::sort(): INSUFFICIENT MEMORY provided, please increase parameter 'M'");
698 if (first.block_offset())
700 if (last.block_offset())
703 typename ExtIterator::block_type * first_block =
new typename ExtIterator::block_type;
704 typename ExtIterator::block_type * last_block =
new typename ExtIterator::block_type;
705 typename ExtIterator::bid_type first_bid, last_bid;
708 req = first_block->read(*first.bid());
714 req = last_block->read(*last.bid());
717 for ( ; i < first.block_offset(); ++i)
719 first_block->elem[i] = cmp.min_value();
725 req = first_block->write(first_bid);
726 for (i = last.block_offset(); i < block_type::size; ++i)
728 last_block->elem[i] = cmp.max_value();
734 req = last_block->write(last_bid);
736 n = last.bid() - first.bid() + 1;
738 std::swap(first_bid, *first.bid());
739 std::swap(last_bid, *last.bid());
749 typename ExtIterator::block_type,
750 typename ExtIterator::vector_type::alloc_strategy_type,
751 typename ExtIterator::bids_container_iterator>
755 first_block =
new typename ExtIterator::block_type;
756 last_block =
new typename ExtIterator::block_type;
757 typename ExtIterator::block_type * sorted_first_block =
new typename ExtIterator::block_type;
758 typename ExtIterator::block_type * sorted_last_block =
new typename ExtIterator::block_type;
761 reqs[0] = first_block->read(first_bid);
762 reqs[1] = sorted_first_block->read((*(out->begin())).bid);
767 reqs[0] = last_block->read(last_bid);
768 reqs[1] = sorted_last_block->read(((*out)[out->size() - 1]).bid);
770 for (i = first.block_offset(); i < block_type::size; i++)
772 first_block->elem[i] = sorted_first_block->elem[i];
778 req = first_block->write(first_bid);
780 for (i = 0; i < last.block_offset(); ++i)
782 last_block->elem[i] = sorted_last_block->elem[i];
787 req = last_block->write(last_bid);
792 *first.bid() = first_bid;
793 *last.bid() = last_bid;
795 typename run_type::iterator it = out->begin();
797 typename ExtIterator::bids_container_iterator cur_bid = first.bid();
800 for ( ; cur_bid != last.bid(); ++cur_bid, ++it)
802 *cur_bid = (*it).bid;
806 delete sorted_first_block;
807 delete sorted_last_block;
820 typename ExtIterator::block_type * first_block =
new typename ExtIterator::block_type;
821 typename ExtIterator::bid_type first_bid;
824 req = first_block->read(*first.bid());
830 for ( ; i < first.block_offset(); ++i)
832 first_block->elem[i] = cmp.min_value();
835 req = first_block->write(first_bid);
837 n = last.bid() - first.bid();
839 std::swap(first_bid, *first.bid());
848 typename ExtIterator::block_type,
849 typename ExtIterator::vector_type::alloc_strategy_type,
850 typename ExtIterator::bids_container_iterator>
854 first_block =
new typename ExtIterator::block_type;
856 typename ExtIterator::block_type * sorted_first_block =
new typename ExtIterator::block_type;
860 reqs[0] = first_block->read(first_bid);
861 reqs[1] = sorted_first_block->read((*(out->begin())).bid);
866 for (i = first.block_offset(); i < block_type::size; ++i)
868 first_block->elem[i] = sorted_first_block->elem[i];
871 req = first_block->write(first_bid);
875 *first.bid() = first_bid;
877 typename run_type::iterator it = out->begin();
879 typename ExtIterator::bids_container_iterator cur_bid = first.bid();
882 for ( ; cur_bid != last.bid(); ++cur_bid, ++it)
884 *cur_bid = (*it).bid;
887 *cur_bid = (*it).bid;
889 delete sorted_first_block;
900 if (last.block_offset())
903 typename ExtIterator::block_type * last_block =
new typename ExtIterator::block_type;
904 typename ExtIterator::bid_type last_bid;
908 req = last_block->read(*last.bid());
913 for (i = last.block_offset(); i < block_type::size; ++i)
915 last_block->elem[i] = cmp.max_value();
918 req = last_block->write(last_bid);
920 n = last.bid() - first.bid() + 1;
922 std::swap(last_bid, *last.bid());
931 typename ExtIterator::block_type,
932 typename ExtIterator::vector_type::alloc_strategy_type,
933 typename ExtIterator::bids_container_iterator>
937 last_block =
new typename ExtIterator::block_type;
938 typename ExtIterator::block_type * sorted_last_block =
new typename ExtIterator::block_type;
941 reqs[0] = last_block->read(last_bid);
942 reqs[1] = sorted_last_block->read(((*out)[out->size() - 1]).bid);
947 for (i = 0; i < last.block_offset(); ++i)
949 last_block->elem[i] = sorted_last_block->elem[i];
952 req = last_block->write(last_bid);
956 *last.bid() = last_bid;
958 typename run_type::iterator it = out->begin();
959 typename ExtIterator::bids_container_iterator cur_bid = first.bid();
961 for ( ; cur_bid != last.bid(); ++cur_bid, ++it)
963 *cur_bid = (*it).bid;
966 delete sorted_last_block;
977 n = last.bid() - first.bid();
981 typename ExtIterator::vector_type::alloc_strategy_type,
982 typename ExtIterator::bids_container_iterator>
985 typename run_type::iterator it = out->begin();
986 typename ExtIterator::bids_container_iterator cur_bid = first.bid();
988 for ( ; cur_bid != last.bid(); ++cur_bid, ++it)
990 *cur_bid = (*it).bid;
998 #if STXXL_CHECK_ORDER_IN_SORTS
999 typedef typename ExtIterator::const_iterator const_iterator;
1000 assert(
stxxl::is_sorted(const_iterator(first), const_iterator(last), cmp));
1008 #endif // !STXXL_ALGO_SORT_HEADER
const Tp & STXXL_MIN(const Tp &a, const Tp &b)
#define STXXL_THROW_UNREACHABLE()
Throws stxxl::unreachable with "Error in file [file], line [line] : this code should never be reachable".
Fully randomized disk allocation scheme functor.
unsigned sort_memory_usage_factor()
bool is_sorted(_ForwardIter __first, _ForwardIter __last)
simple_vector< sort_helper::trigger_entry< block_type > > * sort_blocks(input_bid_iterator input_bids, unsigned_type _n, unsigned_type _m, value_cmp cmp)
void stl_in_memory_sort(ExtIterator_ first, ExtIterator_ last, StrictWeakOrdering_ cmp)
void sort(ExtIterator first, ExtIterator last, StrictWeakOrdering cmp, unsigned_type M)
Sort records comparison-based, see stxxl::sort -- Sorting Comparison-Based.
trigger_entry_iterator< Iterator > make_bid_iterator(Iterator iter)
unsigned_type count_elements_less_equal(const SequenceVector &seqs, const ValueType &bound, Comparator cmp)
void verify_sentinel_strict_weak_ordering(StrictWeakOrdering cmp)
void refill_or_remove_empty_sequences(SequenceVector &seqs, BufferPtrVector &buffers, Prefetcher &prefetcher)
void delete_block(const BID< BLK_SIZE > &bid)
Deallocates a block.
void check_sort_settings()
element_iterator_traits< BlockType >::element_iterator make_element_iterator(BlockType *blocks, unsigned_type offset)
void wait_all(request_iterator_ reqs_begin, request_iterator_ reqs_end)
Collection of functions to track statuses of a number of requests.
virtual void wait(bool measure_time=true)=0
Suspends calling thread until completion of the request.
Encapsulates asynchronous prefetching engine.
External sequence or deque container without random access. Introduction to sequence container: see the STXXL sequence tutorial.
double timestamp()
Returns number of seconds since the epoch, high resolution.
const Tp & STXXL_MAX(const Tp &a, const Tp &b)
#define _STXXL_SORT_TRIGGER_FORCE_SEQUENTIAL
choose_int_types< my_pointer_size >::int_type int_type
#define STXXL_BEGIN_NAMESPACE
void STXXL_UNUSED(const U &)
#define STXXL_VERBOSE1(x)
void new_block(const DiskAssignFunctor &functor, BID< BLK_SIZE > &bid, unsigned_type offset=0)
Allocates a new block according to the strategy given by functor and stores the block identifier to b...
void new_blocks(const DiskAssignFunctor &functor, BIDIteratorClass bidbegin, BIDIteratorClass bidend, unsigned_type offset=0)
Allocates new blocks.
Provides a static class to store runtime tuning parameters.
void multi_merge(value_type *out_first, value_type *out_last)
Simpler non-growing vector without initialization.
void create_runs(input_bid_iterator it, run_type **runs, const unsigned_type nruns, const unsigned_type m2, key_extractor keyobj)
choose_int_types< my_pointer_size >::unsigned_type unsigned_type
bool check_sorted_runs(run_type **runs, unsigned_type nruns, unsigned_type m, value_cmp cmp)
compat::remove_const< Integral >::type div_ceil(Integral __n, Integral2 __d)
void compute_prefetch_schedule(const int_type *first, const int_type *last, int_type *out_first, int_type m, int_type D)
unsigned_type optimal_merge_factor(unsigned_type num_runs, unsigned_type max_concurrent_runs)
Encapsulates asynchronous buffered block writing engine.
Request with basic properties like file and offset.
#define STXXL_END_NAMESPACE
void merge_runs(run_type **in_runs, unsigned_type nruns, run_type *out_run, unsigned_type _m, key_extractor keyobj)