15 #ifndef STXXL_ALGO_KSORT_HEADER
16 #define STXXL_ALGO_KSORT_HEADER
53 namespace ksort_local {
55 template <
typename BIDType,
typename KeyType>
71 template <
typename BIDType,
typename KeyType>
72 inline bool operator < (const trigger_entry<BIDType, KeyType>& a,
75 return (a.key < b.key);
78 template <
typename BIDType,
typename KeyType>
85 template <
typename type,
typename key_type1>
97 template <
typename type,
typename key1>
100 return a.
key < b.key;
103 template <
typename type,
typename key1>
110 template <
typename block_type,
typename b
id_type>
118 * req = block->read(bid);
122 template <
typename type_key_,
125 typename input_bid_iterator,
126 typename key_extractor>
130 block_type*& cur_blk,
131 const block_type* end_blk,
136 typename block_type::bid_type*& bids,
139 input_bid_iterator& it,
140 key_extractor keyobj)
142 typedef typename block_type::type type;
144 type* elem = cur_blk->elem;
145 for (type_key_* p = begin; p < end; p++)
147 elem[out_pos++] = *(p->ptr);
149 if (out_pos >= block_type::size)
151 run[out_block].key = keyobj(*(cur_blk->elem));
153 if (cur_blk < end_blk)
155 next_read->
block = cur_blk;
156 next_read->
req = read_reqs + out_block;
157 read_reqs[out_block] = NULL;
158 bids[out_block] = next_read->
bid = *(it++);
160 write_reqs[out_block] = cur_blk->write(
168 write_reqs[out_block] = cur_blk->write(run[out_block].bid);
172 elem = cur_blk->elem;
182 typename input_bid_iterator,
183 typename key_extractor>
186 input_bid_iterator it,
190 key_extractor keyobj)
192 typedef typename block_type::value_type type;
193 typedef typename block_type::bid_type bid_type;
194 typedef typename key_extractor::key_type key_type;
198 block_type* Blocks1 =
new block_type[m2];
199 block_type* Blocks2 =
new block_type[m2];
200 bid_type* bids =
new bid_type[m2];
201 type_key_* refs1 =
new type_key_[m2 * Blocks1->size];
202 type_key_* refs2 =
new type_key_[m2 * Blocks1->size];
210 int_type run_size = (*runs)->size();
213 (m2 * block_type::size *
sizeof(type_key_) /
STXXL_L2_SIZE) : 2);
214 const int log_k2 =
ilog2_floor(m2 * Blocks1->size) - log_k1 - 1;
222 disk_queues::get_instance()->set_priority_op(request_queue::WRITE);
224 for (i = 0; i < run_size; i++)
227 read_reqs[i] = Blocks1[i].read(bids[i]);
231 const int shift1 = (int)(
sizeof(key_type) * 8 - log_k1);
232 const int shift2 = shift1 - log_k2;
235 for ( ; k < nruns; k++)
238 run_size = run->size();
240 std::fill(bucket1, bucket1 + k1, 0);
242 type_key_* ref_ptr = refs1;
243 for (i = 0; i < run_size; i++)
246 write_reqs[i]->
wait();
248 read_reqs[i]->
wait();
251 classify_block(Blocks1[i].begin(), Blocks1[i].end(), ref_ptr, bucket1, offset, shift1, keyobj);
255 classify(refs1, refs1 + run_size * Blocks1->size, refs2, bucket1,
260 unsigned_type next_run_size = (k < nruns - 1) ? (runs[k + 1]->size()) : 0;
263 type_key_* c = refs2;
264 type_key_* d = refs1;
265 block_type* cur_blk = Blocks2;
266 block_type* end_blk = Blocks2 + next_run_size;
269 for (i = 0; i < k1; i++)
271 type_key_* cEnd = refs2 + bucket1[i];
272 type_key_* dEnd = refs1 + bucket1[i];
274 l1sort(c, cEnd, d, bucket2, k2,
275 offset + (key_type(1) << key_type(shift1)) * key_type(i), shift2);
278 d, dEnd, cur_blk, end_blk,
279 out_block, out_pos, *run, next_read, bids,
280 write_reqs, read_reqs, it, keyobj);
286 std::swap(Blocks1, Blocks2);
298 delete[] next_run_reads;
303 template <
typename block_type,
304 typename prefetcher_type,
305 typename key_extractor>
306 struct run_cursor2_cmp :
public std::binary_function<run_cursor2<block_type, prefetcher_type>, run_cursor2<block_type, prefetcher_type>, bool>
331 template <
typename record_type,
typename key_extractor>
332 class key_comparison :
public std::binary_function<record_type, record_type, bool>
339 bool operator () (
const record_type& a,
const record_type& b)
const
341 return ke(a) < ke(b);
346 template <
typename block_type,
typename run_type,
typename key_ext_>
352 typedef typename block_type::value_type value_type;
354 STXXL_MSG(
"check_ksorted_runs Runs: " << nruns);
356 for (irun = 0; irun < nruns; ++irun)
360 block_type* blocks =
new block_type[m];
362 value_type last = keyext.min_value();
368 blocks_left -= nblocks;
372 reqs[j] = blocks[j].read((*runs[irun])[off + j].bid);
376 if (off && (keyext(blocks[0][0]) < keyext(last)))
378 STXXL_MSG(
"check_sorted_runs wrong first value in the run " << irun);
379 STXXL_MSG(
" first value: " << blocks[0][0] <<
" with key" << keyext(blocks[0][0]));
380 STXXL_MSG(
" last value: " << last <<
" with key" << keyext(last));
382 STXXL_MSG(
"Element " << k <<
" in the block is :" << blocks[0][k] <<
" key: " << keyext(blocks[0][k]));
391 if (keyext(blocks[j][0]) != (*runs[irun])[off + j].key)
393 STXXL_MSG(
"check_sorted_runs wrong trigger in the run " << irun <<
" block " << (off + j));
394 STXXL_MSG(
" trigger value: " << (*runs[irun])[off + j].key);
397 STXXL_MSG(
"Element " << k <<
" in the block is :" << blocks[j][k] <<
" with key: " << keyext(blocks[j][k]));
404 STXXL_MSG(
"BID " << (off + k) <<
" is: " << ((*runs[irun])[off + k].bid));
416 STXXL_MSG(
"check_sorted_runs wrong order in the run " << irun);
421 STXXL_MSG(
" Element " << k <<
" in block " << (off + j) <<
" is :" << blocks[j][k] <<
" with key: " << keyext(blocks[j][k]));
426 STXXL_MSG(
"BID " << (k + off) <<
" is: " << ((*runs[irun])[k + off].bid));
433 last = blocks[nblocks - 1][block_type::size - 1];
436 assert(blocks_left == 0);
445 template <
typename block_type,
typename run_type,
typename key_extractor>
452 run_type consume_seq(out_run->size());
456 typename run_type::iterator copy_start = consume_seq.begin();
457 for (i = 0; i < nruns; i++)
460 copy_start = std::copy(
467 size_t disks_number = config::get_instance()->disks_number();
469 #ifdef PLAY_WITH_OPT_PREF
470 const int_type n_write_buffers = 4 * disks_number;
478 STXXL_VERBOSE(
"Prefetch buffers " << n_opt_prefetch_buffers);
481 #if STXXL_SORT_OPTIMAL_PREFETCHING
485 n_opt_prefetch_buffers,
488 for (i = 0; i < out_run->size(); i++)
494 prefetcher_type prefetcher(consume_seq.begin(),
497 nruns + n_prefetch_buffers);
507 losers(&prefetcher, nruns, cmp);
509 block_type* out_buffer = writer.get_free_block();
511 for (i = 0; i < out_run_size; i++)
513 losers.multi_merge(out_buffer->elem, out_buffer->elem + block_type::size);
514 (*out_run)[i].key = keyobj(out_buffer->elem[0]);
515 out_buffer = writer.write(out_buffer, (*out_run)[i].bid);
518 delete[] prefetch_seq;
521 for (i = 0; i < nruns; i++)
532 template <
typename block_type,
533 typename alloc_strategy,
534 typename input_bid_iterator,
535 typename key_extractor>
540 typedef typename block_type::value_type type;
541 typedef typename key_extractor::key_type key_type;
542 typedef typename block_type::bid_type bid_type;
550 STXXL_VERBOSE(
"Reducing number of blocks in a run from " << m2 <<
" to " <<
551 m2_rf <<
" due to key size: " <<
sizeof(
typename key_extractor::key_type) <<
" bytes");
560 STXXL_VERBOSE(
"n=" << _n <<
" nruns=" << nruns <<
"=" << full_runs <<
"+" << partial_runs);
562 double begin =
timestamp(), after_runs_creation, end;
564 run_type** runs =
new run_type*[nruns];
566 for (i = 0; i < full_runs; i++)
567 runs[i] =
new run_type(m2);
570 #ifdef INTERLEAVED_ALLOC
574 runs[i] =
new run_type(last_run_size);
576 mng->
new_blocks(interleaved_alloc_strategy(nruns, alloc_strategy()),
578 (runs, 0, nruns, last_run_size),
580 (runs, _n, nruns, last_run_size));
583 mng->
new_blocks(interleaved_alloc_strategy(nruns, alloc_strategy()),
591 runs[i] =
new run_type(_n - full_runs * m2);
593 for (i = 0; i < nruns; i++)
602 key_extractor>(input_bids, runs, nruns, m2, keyobj);
606 double io_wait_after_rf = stats::get_instance()->get_io_wait_time();
608 disk_queues::get_instance()->set_priority_op(request_queue::WRITE);
616 STXXL_VERBOSE(
"Starting new merge phase: nruns: " << nruns <<
617 " opt_merge_factor: " << merge_factor <<
" m:" << _m <<
" new_nruns: " << new_nruns);
619 new_runs =
new run_type*[new_nruns];
625 while (runs_left > 0)
628 blocks_in_new_run = 0;
629 for (
unsigned_type i = nruns - runs_left; i < (nruns - runs_left + runs2merge); i++)
630 blocks_in_new_run += runs[i]->size();
633 new_runs[cur_out_run++] =
new run_type(blocks_in_new_run);
634 runs_left -= runs2merge;
637 if (cur_out_run == 1 && blocks_in_new_run ==
int_type(_n) && !input_bids->is_managed())
640 input_bid_iterator cur = input_bids;
641 for (
int_type i = 0; cur != (input_bids + _n); ++cur)
643 (*new_runs[0])[i++].bid = *cur;
646 bid_type& firstBID = (*new_runs[0])[0].bid;
647 if (firstBID.is_managed())
653 bid_type& lastBID = (*new_runs[0])[_n - 1].bid;
654 if (lastBID.is_managed())
663 mng->
new_blocks(interleaved_alloc_strategy(new_nruns, alloc_strategy()),
672 while (runs_left > 0)
675 #if STXXL_CHECK_ORDER_IN_SORTS
676 assert((check_ksorted_runs<block_type, run_type, key_extractor>(runs + nruns - runs_left, runs2merge, m2, keyobj)));
679 merge_runs<block_type, run_type, key_extractor>(runs + nruns - runs_left,
680 runs2merge, *(new_runs + (cur_out_run++)), _m, keyobj);
681 runs_left -= runs2merge;
689 run_type* result = *runs;
694 STXXL_VERBOSE(
"Elapsed time : " << end - begin <<
" s. Run creation time: " <<
695 after_runs_creation - begin <<
" s");
696 STXXL_VERBOSE(
"Time in I/O wait(rf): " << io_wait_after_rf <<
" s");
698 STXXL_UNUSED(begin + after_runs_creation + end + io_wait_after_rf);
730 template <
typename ExtIterator,
typename KeyExtractor>
734 typename KeyExtractor::key_type> > run_type;
735 typedef typename ExtIterator::vector_type::value_type value_type;
736 typedef typename ExtIterator::block_type block_type;
744 if ((last - first) *
sizeof(value_type) < M)
750 assert(2 * block_type::raw_size <= M);
752 if (first.block_offset())
754 if (last.block_offset())
757 typename ExtIterator::block_type * first_block =
new typename ExtIterator::block_type;
758 typename ExtIterator::block_type * last_block =
new typename ExtIterator::block_type;
759 typename ExtIterator::bid_type first_bid, last_bid;
762 req = first_block->read(*first.bid());
768 req = last_block->read(*last.bid());
771 for ( ; i < first.block_offset(); i++)
773 first_block->elem[i] = keyobj.min_value();
778 req = first_block->write(first_bid);
779 for (i = last.block_offset(); i < block_type::size; i++)
781 last_block->elem[i] = keyobj.max_value();
786 req = last_block->write(last_bid);
788 n = last.bid() - first.bid() + 1;
790 std::swap(first_bid, *first.bid());
791 std::swap(last_bid, *last.bid());
800 typename ExtIterator::block_type,
801 typename ExtIterator::vector_type::alloc_strategy_type,
802 typename ExtIterator::bids_container_iterator,
804 (first.bid(), n, M / block_type::raw_size, keyobj);
807 first_block =
new typename ExtIterator::block_type;
808 last_block =
new typename ExtIterator::block_type;
809 typename ExtIterator::block_type * sorted_first_block =
new typename ExtIterator::block_type;
810 typename ExtIterator::block_type * sorted_last_block =
new typename ExtIterator::block_type;
813 reqs[0] = first_block->read(first_bid);
814 reqs[1] = sorted_first_block->read((*(out->begin())).bid);
817 reqs[0] = last_block->read(last_bid);
818 reqs[1] = sorted_last_block->read(((*out)[out->size() - 1]).bid);
820 for (i = first.block_offset(); i < block_type::size; i++)
822 first_block->elem[i] = sorted_first_block->elem[i];
826 req = first_block->write(first_bid);
828 for (i = 0; i < last.block_offset(); i++)
830 last_block->elem[i] = sorted_last_block->elem[i];
836 req = last_block->write(last_bid);
841 *first.bid() = first_bid;
842 *last.bid() = last_bid;
844 typename run_type::iterator it = out->begin();
846 typename ExtIterator::bids_container_iterator cur_bid = first.bid();
849 for ( ; cur_bid != last.bid(); cur_bid++, it++)
851 *cur_bid = (*it).bid;
855 delete sorted_first_block;
856 delete sorted_last_block;
869 typename ExtIterator::block_type * first_block =
new typename ExtIterator::block_type;
870 typename ExtIterator::bid_type first_bid;
873 req = first_block->read(*first.bid());
879 for ( ; i < first.block_offset(); i++)
881 first_block->elem[i] = keyobj.min_value();
884 req = first_block->write(first_bid);
886 n = last.bid() - first.bid();
888 std::swap(first_bid, *first.bid());
896 typename ExtIterator::block_type,
897 typename ExtIterator::vector_type::alloc_strategy_type,
898 typename ExtIterator::bids_container_iterator,
900 (first.bid(), n, M / block_type::raw_size, keyobj);
903 first_block =
new typename ExtIterator::block_type;
905 typename ExtIterator::block_type * sorted_first_block =
new typename ExtIterator::block_type;
909 reqs[0] = first_block->read(first_bid);
910 reqs[1] = sorted_first_block->read((*(out->begin())).bid);
913 for (i = first.block_offset(); i < block_type::size; i++)
915 first_block->elem[i] = sorted_first_block->elem[i];
918 req = first_block->write(first_bid);
922 *first.bid() = first_bid;
924 typename run_type::iterator it = out->begin();
926 typename ExtIterator::bids_container_iterator cur_bid = first.bid();
929 for ( ; cur_bid != last.bid(); cur_bid++, it++)
931 *cur_bid = (*it).bid;
934 *cur_bid = (*it).bid;
936 delete sorted_first_block;
947 if (last.block_offset())
950 typename ExtIterator::block_type * last_block =
new typename ExtIterator::block_type;
951 typename ExtIterator::bid_type last_bid;
955 req = last_block->read(*last.bid());
959 for (i = last.block_offset(); i < block_type::size; i++)
961 last_block->elem[i] = keyobj.max_value();
964 req = last_block->write(last_bid);
966 n = last.bid() - first.bid() + 1;
968 std::swap(last_bid, *last.bid());
976 typename ExtIterator::block_type,
977 typename ExtIterator::vector_type::alloc_strategy_type,
978 typename ExtIterator::bids_container_iterator,
980 (first.bid(), n, M / block_type::raw_size, keyobj);
983 last_block =
new typename ExtIterator::block_type;
984 typename ExtIterator::block_type * sorted_last_block =
new typename ExtIterator::block_type;
987 reqs[0] = last_block->read(last_bid);
988 reqs[1] = sorted_last_block->read(((*out)[out->size() - 1]).bid);
991 for (i = 0; i < last.block_offset(); i++)
993 last_block->elem[i] = sorted_last_block->elem[i];
996 req = last_block->write(last_bid);
1000 *last.bid() = last_bid;
1002 typename run_type::iterator it = out->begin();
1003 typename ExtIterator::bids_container_iterator cur_bid = first.bid();
1005 for ( ; cur_bid != last.bid(); cur_bid++, it++)
1007 *cur_bid = (*it).bid;
1010 delete sorted_last_block;
1021 n = last.bid() - first.bid();
1025 typename ExtIterator::block_type,
1026 typename ExtIterator::vector_type::alloc_strategy_type,
1027 typename ExtIterator::bids_container_iterator,
1029 (first.bid(), n, M / block_type::raw_size, keyobj);
1031 typename run_type::iterator it = out->begin();
1032 typename ExtIterator::bids_container_iterator cur_bid = first.bid();
1034 for ( ; cur_bid != last.bid(); cur_bid++, it++)
1036 *cur_bid = (*it).bid;
1044 #if STXXL_CHECK_ORDER_IN_SORTS
1045 typedef typename ExtIterator::const_iterator const_iterator;
1051 template <
typename record_type>
1061 return record_type::max_value();
1065 return record_type::min_value();
1085 template <
typename ExtIterator>
1096 #endif // !STXXL_ALGO_KSORT_HEADER
const Tp & STXXL_MIN(const Tp &a, const Tp &b)
run_cursor2_cmp(key_extractor keyobj_)
#define STXXL_ASSERT(condition)
key_comparison(key_extractor ke_)
Fully randomized disk allocation scheme functor.
bool is_sorted(_ForwardIter __first, _ForwardIter __last)
void stl_in_memory_sort(ExtIterator_ first, ExtIterator_ last, StrictWeakOrdering_ cmp)
trigger_entry_iterator< Iterator > make_bid_iterator(Iterator iter)
record_type max_value() const
void delete_block(const BID< BLK_SIZE > &bid)
Deallocates a block.
block_type::const_reference current() const
void ksort(ExtIterator first, ExtIterator last, KeyExtractor keyobj, unsigned_type M)
Sort records with integer keys, see stxxl::ksort -- Sorting Integer Keys.
element_iterator_traits< BlockType >::element_iterator make_element_iterator(BlockType *blocks, unsigned_type offset)
void wait_all(request_iterator_ reqs_begin, request_iterator_ reqs_end)
Collection of functions to track statuses of a number of requests.
type_key(key_type k, type *p)
record_type::key_type key_type
void write_out(type_key_ *begin, type_key_ *end, block_type *&cur_blk, const block_type *end_blk, int_type &out_block, int_type &out_pos, run_type &run, write_completion_handler< block_type, typename block_type::bid_type > *&next_read, typename block_type::bid_type *&bids, request_ptr *write_reqs, request_ptr *read_reqs, input_bid_iterator &it, key_extractor keyobj)
virtual void wait(bool measure_time=true)=0
Suspends calling thread until completion of the request.
Encapsulates asynchronous prefetching engine.
double timestamp()
Returns number of seconds since the epoch, high resolution.
const Tp & STXXL_MAX(const Tp &a, const Tp &b)
#define _STXXL_SORT_TRIGGER_FORCE_SEQUENTIAL
choose_int_types< my_pointer_size >::int_type int_type
simple_vector< trigger_entry< typename block_type::bid_type, typename key_extractor::key_type > > * ksort_blocks(input_bid_iterator input_bids, unsigned_type _n, unsigned_type _m, key_extractor keyobj)
bool operator>(const uint_pair &b) const
greater comparison operator
run_cursor2< block_type, prefetcher_type > cursor_type
#define STXXL_BEGIN_NAMESPACE
void STXXL_UNUSED(const U &)
static void classify(type_key *a, type_key *aEnd, type_key *b, int_type *bucket, typename type_key::key_type offset, unsigned shift)
void new_block(const DiskAssignFunctor &functor, BID< BLK_SIZE > &bid, unsigned_type offset=0)
Allocates a new block according to the strategy given by functor and stores the block identifier to b...
void l1sort(type_key *a, type_key *aEnd, type_key *b, int_type *bucket, int_type K, typename type_key::key_type offset, int shift)
unsigned int ilog2_floor(IntegerType i)
calculate the log2 floor of an integer type (by repeated bit shifts)
static void exclusive_prefix_sum(int_type *bucket, int_type K)
void new_blocks(const DiskAssignFunctor &functor, BIDIteratorClass bidbegin, BIDIteratorClass bidend, unsigned_type offset=0)
Allocates new blocks.
Simpler non-growing vector without initialization.
void create_runs(input_bid_iterator it, run_type **runs, const unsigned_type nruns, const unsigned_type m2, key_extractor keyobj)
choose_int_types< my_pointer_size >::unsigned_type unsigned_type
unsigned int ilog2_ceil(const IntegerType &i)
calculate the log2 ceiling of an integer type (by repeated bit shifts)
compat::remove_const< Integral >::type div_ceil(Integral __n, Integral2 __d)
void compute_prefetch_schedule(const int_type *first, const int_type *last, int_type *out_first, int_type m, int_type D)
unsigned_type optimal_merge_factor(unsigned_type num_runs, unsigned_type max_concurrent_runs)
bool check_ksorted_runs(run_type **runs, unsigned_type nruns, unsigned_type m, key_ext_ keyext)
Encapsulates asynchronous buffered block writing engine.
void classify_block(type *begin, type *end, type_key *&out, int_type *bucket, typename key_extractor::key_type offset, unsigned shift, key_extractor keyobj)
Request with basic properties like file and offset.
record_type min_value() const
#define STXXL_END_NAMESPACE
void merge_runs(run_type **in_runs, unsigned_type nruns, run_type *out_run, unsigned_type _m, key_extractor keyobj)