16 #ifndef STXXL_ALGO_SORT_HEADER
17 #define STXXL_ALGO_SORT_HEADER
49 namespace sort_local {
51 template <
typename BlockType,
typename B
idType>
60 * req = block->read(bid);
67 typename InputBidIterator,
77 typedef BlockType block_type;
78 typedef RunType run_type;
80 typedef typename block_type::bid_type bid_type;
81 STXXL_VERBOSE1(
"stxxl::create_runs nruns=" << nruns <<
" m=" << _m);
85 block_type* Blocks1 =
new block_type[m2];
86 block_type* Blocks2 =
new block_type[m2];
87 bid_type* bids1 =
new bid_type[m2];
88 bid_type* bids2 =
new bid_type[m2];
95 disk_queues::get_instance()->set_priority_op(request_queue::WRITE);
102 run_size = runs[0]->size();
103 assert(run_size == m2);
105 for (i = 0; i < run_size; ++i)
107 STXXL_VERBOSE1(
"stxxl::create_runs posting read " << Blocks1[i].elem);
109 read_reqs1[i] = Blocks1[i].read(bids1[i]);
112 run_size = runs[1]->size();
114 for (i = 0; i < run_size; ++i)
116 STXXL_VERBOSE1(
"stxxl::create_runs posting read " << Blocks2[i].elem);
118 read_reqs2[i] = Blocks2[i].read(bids2[i]);
121 for (
int_type k = 0; k < nruns - 1; ++k)
123 run_type* run = runs[k];
124 run_size = run->size();
125 assert(run_size == m2);
126 int_type next_run_size = runs[k + 1]->size();
127 STXXL_ASSERT((next_run_size == m2) || (next_run_size <= m2 && k == nruns - 2));
132 for (i = 0; i < run_size; ++i)
146 int_type runplus2size = (k < nruns - 2) ? runs[k + 2]->size() : 0;
147 for (i = 0; i < m2; ++i)
149 STXXL_VERBOSE1(
"stxxl::create_runs posting write " << Blocks1[i].elem);
150 (*run)[i].value = Blocks1[i][0];
151 if (i >= runplus2size) {
152 write_reqs[i] = Blocks1[i].write((*run)[i].bid);
156 next_run_reads[i].
block = Blocks1 + i;
157 next_run_reads[i].
req = read_reqs1 + i;
158 bids1[i] = next_run_reads[i].
bid = *(it++);
159 write_reqs[i] = Blocks1[i].write((*run)[i].bid, next_run_reads[i]);
162 std::swap(Blocks1, Blocks2);
163 std::swap(bids1, bids2);
164 std::swap(read_reqs1, read_reqs2);
167 run_type* run = runs[nruns - 1];
168 run_size = run->size();
172 for (i = 0; i < run_size; ++i)
185 for (i = 0; i < run_size; ++i)
187 STXXL_VERBOSE1(
"stxxl::create_runs posting write " << Blocks1[i].elem);
188 (*run)[i].value = Blocks1[i][0];
189 write_reqs[i] = Blocks1[i].write((*run)[i].bid);
203 delete[] next_run_reads;
206 template <
typename BlockType,
typename RunType,
typename ValueCmp>
212 typedef BlockType block_type;
213 typedef typename block_type::value_type value_type;
215 STXXL_MSG(
"check_sorted_runs Runs: " << nruns);
217 for (irun = 0; irun < nruns; ++irun)
221 block_type* blocks =
new block_type[m];
223 value_type last = cmp.min_value();
229 blocks_left -= nblocks;
233 reqs[j] = blocks[j].read((*runs[irun])[j + off].bid);
237 if (off && cmp(blocks[0][0], last))
239 STXXL_MSG(
"check_sorted_runs wrong first value in the run " << irun);
240 STXXL_MSG(
" first value: " << blocks[0][0]);
243 STXXL_MSG(
"Element " << k <<
" in the block is :" << blocks[0][k]);
252 if (!(blocks[j][0] == (*runs[irun])[j + off].value))
254 STXXL_MSG(
"check_sorted_runs wrong trigger in the run " << irun <<
" block " << (j + off));
255 STXXL_MSG(
" trigger value: " << (*runs[irun])[j + off].value);
258 STXXL_MSG(
"Element " << k <<
" in the block is :" << blocks[j][k]);
265 STXXL_MSG(
"BID " << (k + off) <<
" is: " << ((*runs[irun])[k + off].bid));
277 STXXL_MSG(
"check_sorted_runs wrong order in the run " << irun);
282 STXXL_MSG(
" Element " << k <<
" in block " << (j + off) <<
" is :" << blocks[j][k]);
287 STXXL_MSG(
"BID " << (k + off) <<
" is: " << ((*runs[irun])[k + off].bid));
295 last = blocks[nblocks - 1][block_type::size - 1];
298 assert(blocks_left == 0);
306 template <
typename BlockType,
typename RunType,
typename ValueCmp>
310 typedef BlockType block_type;
311 typedef RunType run_type;
312 typedef ValueCmp value_cmp;
313 typedef typename run_type::value_type trigger_entry_type;
318 run_type consume_seq(out_run->size());
322 typename run_type::iterator copy_start = consume_seq.begin();
323 for (
int_type i = 0; i < nruns; i++)
326 copy_start = std::copy(
332 std::stable_sort(consume_seq.begin(), consume_seq.end(),
335 int_type disks_number = config::get_instance()->disks_number();
337 #ifdef PLAY_WITH_OPT_PREF
338 const int_type n_write_buffers = 4 * disks_number;
342 #if STXXL_SORT_OPTIMAL_PREFETCHING
344 const int_type n_opt_prefetch_buffers = 2 * disks_number + (3 * (n_prefetch_buffers - 2 * disks_number)) / 10;
348 #if STXXL_SORT_OPTIMAL_PREFETCHING
352 n_opt_prefetch_buffers,
353 config::get_instance()->get_max_device_id());
360 prefetcher_type prefetcher(consume_seq.begin(),
363 nruns + n_prefetch_buffers);
367 int_type out_run_size = out_run->size();
369 block_type* out_buffer = writer.get_free_block();
376 #if STXXL_PARALLEL_MULTIWAY_MERGE
381 typedef std::pair<typename block_type::iterator, typename block_type::iterator>
sequence;
382 std::vector<sequence> seqs(nruns);
383 std::vector<block_type*> buffers(nruns);
385 for (
int_type i = 0; i < nruns; i++)
387 buffers[i] = prefetcher.pull_block();
388 seqs[i] = std::make_pair(buffers[i]->begin(), buffers[i]->end());
392 #if STXXL_CHECK_ORDER_IN_SORTS
393 value_type last_elem = cmp.min_value();
395 diff_type num_currently_mergeable = 0;
397 for (
int_type j = 0; j < out_run_size; ++j)
399 diff_type rest = block_type::size;
403 if (num_currently_mergeable < rest)
405 if (prefetcher.empty())
408 num_currently_mergeable = (out_run_size - j) * block_type::size
409 - (block_type::size - rest);
414 seqs, consume_seq[prefetcher.pos()].value, cmp);
418 diff_type output_size =
STXXL_MIN(num_currently_mergeable, rest);
423 seqs.begin(), seqs.end(),
424 out_buffer->end() - rest, output_size, cmp);
428 num_currently_mergeable -= output_size;
433 }
while (rest > 0 && seqs.size() > 0);
435 #if STXXL_CHECK_ORDER_IN_SORTS
438 for (value_type* i = out_buffer->begin() + 1; i != out_buffer->end(); i++)
439 if (cmp(*i, *(i - 1)))
441 STXXL_VERBOSE1(
"Error at position " << (i - out_buffer->begin()));
447 assert(cmp((*out_buffer)[0], last_elem) ==
false);
449 last_elem = (*out_buffer)[block_type::size - 1];
452 (*out_run)[j].value = (*out_buffer)[0];
454 out_buffer = writer.write(out_buffer, (*out_run)[j].bid);
468 losers(&prefetcher, nruns, run_cursor2_cmp_type(cmp));
470 #if STXXL_CHECK_ORDER_IN_SORTS
471 value_type last_elem = cmp.min_value();
474 for (
int_type i = 0; i < out_run_size; ++i)
476 losers.
multi_merge(out_buffer->elem, out_buffer->elem + block_type::size);
477 (*out_run)[i].value = *(out_buffer->elem);
479 #if STXXL_CHECK_ORDER_IN_SORTS
486 assert(cmp(*(out_buffer->elem), last_elem) ==
false);
488 last_elem = (*out_buffer).elem[block_type::size - 1];
491 out_buffer = writer.write(out_buffer, (*out_run)[i].bid);
497 delete[] prefetch_seq;
500 for (
int_type i = 0; i < nruns; ++i)
510 template <
typename BlockType,
511 typename AllocStrategy,
512 typename InputBidIterator,
520 typedef BlockType block_type;
521 typedef AllocStrategy alloc_strategy;
522 typedef InputBidIterator input_bid_iterator;
523 typedef ValueCmp value_cmp;
524 typedef typename block_type::bid_type bid_type;
539 double begin =
timestamp(), after_runs_creation, end;
541 run_type** runs =
new run_type*[nruns];
543 for (i = 0; i < full_runs; i++)
544 runs[i] =
new run_type(m2);
547 runs[i] =
new run_type(_n - full_runs * m2);
549 for (i = 0; i < nruns; ++i)
555 value_cmp>(input_bids, runs, nruns, _m, cmp);
559 double io_wait_after_rf = stats::get_instance()->get_io_wait_time();
561 disk_queues::get_instance()->set_priority_op(request_queue::WRITE);
569 STXXL_VERBOSE(
"Starting new merge phase: nruns: " << nruns <<
570 " opt_merge_factor: " << merge_factor <<
" m:" << _m <<
" new_nruns: " << new_nruns);
572 new_runs =
new run_type*[new_nruns];
578 while (runs_left > 0)
581 blocks_in_new_run = 0;
582 for (
unsigned_type i = nruns - runs_left; i < (nruns - runs_left + runs2merge); i++)
583 blocks_in_new_run += runs[i]->size();
586 new_runs[cur_out_run++] =
new run_type(blocks_in_new_run);
587 runs_left -= runs2merge;
590 if (cur_out_run == 1 && blocks_in_new_run ==
int_type(_n) && !input_bids->is_managed())
593 input_bid_iterator cur = input_bids;
594 for (
int_type i = 0; cur != (input_bids + _n); ++cur)
596 (*new_runs[0])[i++].bid = *cur;
599 bid_type& firstBID = (*new_runs[0])[0].bid;
600 if (firstBID.is_managed())
606 bid_type& lastBID = (*new_runs[0])[_n - 1].bid;
607 if (lastBID.is_managed())
616 mng->
new_blocks(interleaved_alloc_strategy(new_nruns, alloc_strategy()),
623 while (runs_left > 0)
626 #if STXXL_CHECK_ORDER_IN_SORTS
627 assert((check_sorted_runs<block_type, run_type, value_cmp>(runs + nruns - runs_left, runs2merge, m2, cmp)));
630 merge_runs<block_type, run_type>(runs + nruns - runs_left,
631 runs2merge, *(new_runs + (cur_out_run++)), _m, cmp
633 runs_left -= runs2merge;
641 run_type* result = *runs;
646 STXXL_VERBOSE(
"Elapsed time : " << end - begin <<
" s. Run creation time: " <<
647 after_runs_creation - begin <<
" s");
648 STXXL_VERBOSE(
"Time in I/O wait(rf): " << io_wait_after_rf <<
" s");
675 template <
typename ExtIterator,
typename StrictWeakOrdering>
680 typedef typename ExtIterator::vector_type::value_type value_type;
681 typedef typename ExtIterator::block_type block_type;
682 typedef typename ExtIterator::bid_type bid_type;
683 typedef typename ExtIterator::vector_type::alloc_strategy_type alloc_strategy_type;
684 typedef typename ExtIterator::bids_container_iterator bids_container_iterator;
701 throw bad_parameter(
"stxxl::sort(): INSUFFICIENT MEMORY provided, please increase parameter 'M'");
704 if (first.block_offset())
706 if (last.block_offset())
709 block_type* first_block =
new block_type;
710 block_type* last_block =
new block_type;
711 typename ExtIterator::bid_type first_bid, last_bid;
714 req = first_block->read(*first.bid());
719 req = last_block->read(*last.bid());
722 for ( ; i < first.block_offset(); ++i)
724 first_block->elem[i] = cmp.min_value();
729 req = first_block->write(first_bid);
730 for (i = last.block_offset(); i < block_type::size; ++i)
732 last_block->elem[i] = cmp.max_value();
737 req = last_block->write(last_bid);
739 n = last.bid() - first.bid() + 1;
741 std::swap(first_bid, *first.bid());
742 std::swap(last_bid, *last.bid());
751 block_type, alloc_strategy_type, bids_container_iterator
755 first_block =
new block_type;
756 last_block =
new block_type;
757 block_type* sorted_first_block =
new block_type;
758 block_type* sorted_last_block =
new block_type;
761 reqs[0] = first_block->read(first_bid);
762 reqs[1] = sorted_first_block->read((*(out->begin())).bid);
767 reqs[0] = last_block->read(last_bid);
768 reqs[1] = sorted_last_block->read(((*out)[out->size() - 1]).bid);
770 for (i = first.block_offset(); i < block_type::size; i++)
772 first_block->elem[i] = sorted_first_block->elem[i];
778 req = first_block->write(first_bid);
780 for (i = 0; i < last.block_offset(); ++i)
782 last_block->elem[i] = sorted_last_block->elem[i];
787 req = last_block->write(last_bid);
792 *first.bid() = first_bid;
793 *last.bid() = last_bid;
795 typename run_type::iterator it = out->begin();
797 bids_container_iterator cur_bid = first.bid();
800 for ( ; cur_bid != last.bid(); ++cur_bid, ++it)
802 *cur_bid = (*it).bid;
806 delete sorted_first_block;
807 delete sorted_last_block;
819 block_type* first_block =
new block_type;
823 req = first_block->read(*first.bid());
828 for ( ; i < first.block_offset(); ++i)
830 first_block->elem[i] = cmp.min_value();
833 req = first_block->write(first_bid);
835 n = last.bid() - first.bid();
837 std::swap(first_bid, *first.bid());
845 block_type, alloc_strategy_type, bids_container_iterator
849 first_block =
new block_type;
851 block_type* sorted_first_block =
new block_type;
855 reqs[0] = first_block->read(first_bid);
856 reqs[1] = sorted_first_block->read((*(out->begin())).bid);
861 for (i = first.block_offset(); i < block_type::size; ++i)
863 first_block->elem[i] = sorted_first_block->elem[i];
866 req = first_block->write(first_bid);
870 *first.bid() = first_bid;
872 typename run_type::iterator it = out->begin();
874 bids_container_iterator cur_bid = first.bid();
877 for ( ; cur_bid != last.bid(); ++cur_bid, ++it)
879 *cur_bid = (*it).bid;
882 *cur_bid = (*it).bid;
884 delete sorted_first_block;
895 if (last.block_offset())
898 block_type* last_block =
new block_type;
903 req = last_block->read(*last.bid());
907 for (i = last.block_offset(); i < block_type::size; ++i)
909 last_block->elem[i] = cmp.max_value();
912 req = last_block->write(last_bid);
914 n = last.bid() - first.bid() + 1;
916 std::swap(last_bid, *last.bid());
924 block_type, alloc_strategy_type, bids_container_iterator
928 last_block =
new block_type;
929 block_type* sorted_last_block =
new block_type;
932 reqs[0] = last_block->read(last_bid);
933 reqs[1] = sorted_last_block->read(((*out)[out->size() - 1]).bid);
938 for (i = 0; i < last.block_offset(); ++i)
940 last_block->elem[i] = sorted_last_block->elem[i];
943 req = last_block->write(last_bid);
947 *last.bid() = last_bid;
949 typename run_type::iterator it = out->begin();
950 bids_container_iterator cur_bid = first.bid();
952 for ( ; cur_bid != last.bid(); ++cur_bid, ++it)
954 *cur_bid = (*it).bid;
957 delete sorted_last_block;
968 n = last.bid() - first.bid();
972 block_type, alloc_strategy_type, bids_container_iterator
976 typename run_type::iterator it = out->begin();
977 bids_container_iterator cur_bid = first.bid();
979 for ( ; cur_bid != last.bid(); ++cur_bid, ++it)
981 *cur_bid = (*it).bid;
989 #if STXXL_CHECK_ORDER_IN_SORTS
990 typedef typename ExtIterator::const_iterator const_iterator;
999 #endif // !STXXL_ALGO_SORT_HEADER
simple_vector< sort_helper::trigger_entry< BlockType > > * sort_blocks(InputBidIterator input_bids, unsigned_type _n, unsigned_type _m, ValueCmp cmp)
#define STXXL_THROW_UNREACHABLE()
Throws stxxl::unreachable with "Error in file [file], line [line] : this code should never be reachable".
#define STXXL_ASSERT(condition)
compat::remove_const< Integral >::type div_ceil(Integral n, Integral2 d)
Fully randomized disk allocation scheme functor.
unsigned sort_memory_usage_factor()
const Type & STXXL_MIN(const Type &a, const Type &b)
bool check_sorted_runs(RunType **runs, unsigned_type nruns, unsigned_type m, ValueCmp cmp)
void sort(ExtIterator first, ExtIterator last, StrictWeakOrdering cmp, unsigned_type M)
Sort records comparison-based, see stxxl::sort -- Sorting Comparison-Based.
trigger_entry_iterator< Iterator > make_bid_iterator(Iterator iter)
unsigned_type count_elements_less_equal(const SequenceVector &seqs, const ValueType &bound, Comparator cmp)
void stl_in_memory_sort(ExtIterator first, ExtIterator last, StrictWeakOrdering cmp)
RandomAccessIterator3 multiway_merge(RandomAccessIteratorPairIterator seqs_begin, RandomAccessIteratorPairIterator seqs_end, RandomAccessIterator3 target, DiffType length, Comparator comp)
Multi-way merging dispatcher.
void verify_sentinel_strict_weak_ordering(StrictWeakOrdering cmp)
void refill_or_remove_empty_sequences(SequenceVector &seqs, BufferPtrVector &buffers, Prefetcher &prefetcher)
void delete_block(const BID< BLK_SIZE > &bid)
Deallocates a block.
void check_sort_settings()
Encapsulates asynchronous prefetching engine.
External sequence or deque container without random access. Introduction to sequence container: se...
void merge_runs(RunType **in_runs, unsigned_type nruns, RunType *out_run, unsigned_type _m, KeyExtractor keyobj)
#define _STXXL_SORT_TRIGGER_FORCE_SEQUENTIAL
choose_int_types< my_pointer_size >::int_type int_type
const Type & STXXL_MAX(const Type &a, const Type &b)
#define STXXL_BEGIN_NAMESPACE
#define STXXL_VERBOSE1(x)
void new_block(const DiskAssignFunctor &functor, BID< BLK_SIZE > &bid, unsigned_type offset=0)
Allocates a new block according to the strategy given by functor and stores the block identifier to bid.
bool is_sorted(ForwardIterator first, ForwardIterator last, StrictWeakOrdering comp)
void new_blocks(const DiskAssignFunctor &functor, BIDIteratorClass bidbegin, BIDIteratorClass bidend, unsigned_type offset=0)
Allocates new blocks.
element_iterator_traits< BlockType, SizeType >::element_iterator make_element_iterator(BlockType *blocks, SizeType offset)
Provides a static class to store runtime tuning parameters.
Simpler non-growing vector without initialization.
void wait_all(RequestIterator reqs_begin, RequestIterator reqs_end)
Collection of functions to track statuses of a number of requests.
choose_int_types< my_pointer_size >::unsigned_type unsigned_type
double timestamp()
Returns number of seconds since the epoch, high resolution.
void compute_prefetch_schedule(const int_type *first, const int_type *last, int_type *out_first, int_type m, int_type D)
unsigned_type optimal_merge_factor(unsigned_type num_runs, unsigned_type max_concurrent_runs)
void create_runs(InputBidIterator it, RunType **runs, const unsigned_type nruns, const unsigned_type m2, KeyExtractor keyobj)
Request object encapsulating basic properties like file and offset.
void multi_merge(value_type *out_first, value_type *out_last)
#define STXXL_END_NAMESPACE