15 #ifndef STXXL_ALGO_KSORT_HEADER
16 #define STXXL_ALGO_KSORT_HEADER
51 namespace ksort_local {
53 template <
typename BIDType,
typename KeyType>
68 template <
typename BIDType,
typename KeyType>
69 inline bool operator < (const trigger_entry<BIDType, KeyType>& a,
72 return (a.key < b.key);
75 template <
typename BIDType,
typename KeyType>
82 template <
typename Type,
typename KeyType>
94 template <
typename Type,
typename KeyType>
100 template <
typename Type,
typename KeyType>
106 template <
typename BlockType,
typename B
idType>
114 * req = block->read(bid);
118 template <
typename TypeKey,
121 typename InputBidIterator,
122 typename KeyExtractor>
127 const BlockType* end_blk,
132 typename BlockType::bid_type*& bids,
135 InputBidIterator& it,
138 typedef typename BlockType::type type;
140 type* elem = cur_blk->elem;
141 for (TypeKey* p = begin; p < end; p++)
143 elem[out_pos++] = *(p->ptr);
145 if (out_pos >= BlockType::size)
147 run[out_block].key = keyobj(*(cur_blk->elem));
149 if (cur_blk < end_blk)
151 next_read->
block = cur_blk;
152 next_read->
req = read_reqs + out_block;
153 read_reqs[out_block] = NULL;
154 bids[out_block] = next_read->
bid = *(it++);
156 write_reqs[out_block] = cur_blk->write(
164 write_reqs[out_block] = cur_blk->write(run[out_block].bid);
168 elem = cur_blk->elem;
178 typename InputBidIterator,
179 typename KeyExtractor>
188 typedef typename BlockType::value_type type;
189 typedef typename BlockType::bid_type bid_type;
190 typedef typename KeyExtractor::key_type key_type;
194 BlockType* Blocks1 =
new BlockType[m2];
195 BlockType* Blocks2 =
new BlockType[m2];
196 bid_type* bids =
new bid_type[m2];
197 type_key_* refs1 =
new type_key_[m2 * Blocks1->size];
198 type_key_* refs2 =
new type_key_[m2 * Blocks1->size];
206 int_type run_size = (*runs)->size();
209 (m2 * BlockType::size *
sizeof(type_key_) /
STXXL_L2_SIZE) : 2);
210 const int log_k2 =
ilog2_floor(m2 * Blocks1->size) - log_k1 - 1;
218 disk_queues::get_instance()->set_priority_op(request_queue::WRITE);
220 for (i = 0; i < run_size; i++)
223 read_reqs[i] = Blocks1[i].read(bids[i]);
227 const int shift1 = (int)(
sizeof(key_type) * 8 - log_k1);
228 const int shift2 = shift1 - log_k2;
231 for ( ; k < nruns; k++)
234 run_size = run->size();
236 std::fill(bucket1, bucket1 + k1, 0);
238 type_key_* ref_ptr = refs1;
239 for (i = 0; i < run_size; i++)
242 write_reqs[i]->
wait();
244 read_reqs[i]->
wait();
247 classify_block(Blocks1[i].begin(), Blocks1[i].end(), ref_ptr, bucket1, offset, shift1, keyobj);
251 classify(refs1, refs1 + run_size * Blocks1->size, refs2, bucket1,
256 unsigned_type next_run_size = (k < nruns - 1) ? (runs[k + 1]->size()) : 0;
259 type_key_* c = refs2;
260 type_key_* d = refs1;
261 BlockType* cur_blk = Blocks2;
262 BlockType* end_blk = Blocks2 + next_run_size;
265 for (i = 0; i < k1; i++)
267 type_key_* cEnd = refs2 + bucket1[i];
268 type_key_* dEnd = refs1 + bucket1[i];
270 l1sort(c, cEnd, d, bucket2, k2,
271 offset + (key_type(1) << key_type(shift1)) * key_type(i), shift2);
274 d, dEnd, cur_blk, end_blk,
275 out_block, out_pos, *run, next_read, bids,
276 write_reqs, read_reqs, it, keyobj);
282 std::swap(Blocks1, Blocks2);
294 delete[] next_run_reads;
299 template <
typename BlockType,
300 typename prefetcher_type,
301 typename KeyExtractor>
303 run_cursor2<BlockType, prefetcher_type>,
304 run_cursor2<BlockType, prefetcher_type>,
329 template <
typename RecordType,
typename KeyExtractor>
337 bool operator () (
const RecordType& a,
const RecordType& b)
const
339 return ke(a) < ke(b);
343 template <
typename BlockType,
typename RunType,
typename KeyExtractor>
349 typedef BlockType block_type;
350 typedef typename BlockType::value_type value_type;
352 STXXL_MSG(
"check_ksorted_runs Runs: " << nruns);
354 for (irun = 0; irun < nruns; ++irun)
358 block_type* blocks =
new block_type[m];
360 value_type last = keyext.min_value();
366 blocks_left -= nblocks;
370 reqs[j] = blocks[j].read((*runs[irun])[off + j].bid);
374 if (off && (keyext(blocks[0][0]) < keyext(last)))
376 STXXL_MSG(
"check_sorted_runs wrong first value in the run " << irun);
377 STXXL_MSG(
" first value: " << blocks[0][0] <<
" with key" << keyext(blocks[0][0]));
378 STXXL_MSG(
" last value: " << last <<
" with key" << keyext(last));
380 STXXL_MSG(
"Element " << k <<
" in the block is :" << blocks[0][k] <<
" key: " << keyext(blocks[0][k]));
389 if (keyext(blocks[j][0]) != (*runs[irun])[off + j].key)
391 STXXL_MSG(
"check_sorted_runs wrong trigger in the run " << irun <<
" block " << (off + j));
392 STXXL_MSG(
" trigger value: " << (*runs[irun])[off + j].key);
395 STXXL_MSG(
"Element " << k <<
" in the block is :" << blocks[j][k] <<
" with key: " << keyext(blocks[j][k]));
402 STXXL_MSG(
"BID " << (off + k) <<
" is: " << ((*runs[irun])[off + k].bid));
414 STXXL_MSG(
"check_sorted_runs wrong order in the run " << irun);
419 STXXL_MSG(
" Element " << k <<
" in block " << (off + j) <<
" is :" << blocks[j][k] <<
" with key: " << keyext(blocks[j][k]));
424 STXXL_MSG(
"BID " << (k + off) <<
" is: " << ((*runs[irun])[k + off].bid));
431 last = blocks[nblocks - 1][block_type::size - 1];
434 assert(blocks_left == 0);
442 template <
typename BlockType,
typename RunType,
typename KeyExtractor>
445 typedef BlockType block_type;
450 RunType consume_seq(out_run->size());
454 typename RunType::iterator copy_start = consume_seq.begin();
455 for (i = 0; i < nruns; i++)
458 copy_start = std::copy(
465 size_t disks_number = config::get_instance()->disks_number();
467 #ifdef PLAY_WITH_OPT_PREF
468 const int_type n_write_buffers = 4 * disks_number;
476 STXXL_VERBOSE(
"Prefetch buffers " << n_opt_prefetch_buffers);
479 #if STXXL_SORT_OPTIMAL_PREFETCHING
483 n_opt_prefetch_buffers,
484 config::get_instance()->get_max_device_id());
486 for (i = 0; i < out_run->size(); i++)
491 prefetcher_type prefetcher(consume_seq.begin(),
494 nruns + n_prefetch_buffers);
504 losers(&prefetcher, nruns, cmp);
506 block_type* out_buffer = writer.get_free_block();
508 for (i = 0; i < out_run_size; i++)
510 losers.multi_merge(out_buffer->elem, out_buffer->elem + block_type::size);
511 (*out_run)[i].key = keyobj(out_buffer->elem[0]);
512 out_buffer = writer.write(out_buffer, (*out_run)[i].bid);
515 delete[] prefetch_seq;
518 for (i = 0; i < nruns; i++)
528 template <
typename BlockType,
529 typename AllocStrategy,
530 typename InputBidIterator,
531 typename KeyExtractor>
533 trigger_entry<typename BlockType::bid_type, typename KeyExtractor::key_type>
538 typedef BlockType block_type;
539 typedef typename BlockType::value_type type;
540 typedef typename KeyExtractor::key_type key_type;
541 typedef typename BlockType::bid_type bid_type;
549 STXXL_VERBOSE(
"Reducing number of blocks in a run from " << m2 <<
" to " <<
550 m2_rf <<
" due to key size: " <<
sizeof(
typename KeyExtractor::key_type) <<
" bytes");
559 STXXL_VERBOSE(
"n=" << _n <<
" nruns=" << nruns <<
"=" << full_runs <<
"+" << partial_runs);
561 double begin =
timestamp(), after_runs_creation, end;
563 run_type** runs =
new run_type*[nruns];
565 for (i = 0; i < full_runs; i++)
566 runs[i] =
new run_type(m2);
568 #ifdef INTERLEAVED_ALLOC
572 runs[i] =
new run_type(last_run_size);
574 mng->
new_blocks(interleaved_alloc_strategy(nruns, AllocStrategy()),
576 (runs, 0, nruns, last_run_size),
578 (runs, _n, nruns, last_run_size));
581 mng->
new_blocks(interleaved_alloc_strategy(nruns, AllocStrategy()),
589 runs[i] =
new run_type(_n - full_runs * m2);
591 for (i = 0; i < nruns; i++)
597 create_runs<block_type, run_type, InputBidIterator, KeyExtractor>(
598 input_bids, runs, nruns, m2, keyobj);
602 double io_wait_after_rf = stats::get_instance()->get_io_wait_time();
604 disk_queues::get_instance()->set_priority_op(request_queue::WRITE);
612 STXXL_VERBOSE(
"Starting new merge phase: nruns: " << nruns <<
613 " opt_merge_factor: " << merge_factor <<
" m:" << _m <<
" new_nruns: " << new_nruns);
615 new_runs =
new run_type*[new_nruns];
621 while (runs_left > 0)
624 blocks_in_new_run = 0;
625 for (
unsigned_type i = nruns - runs_left; i < (nruns - runs_left + runs2merge); i++)
626 blocks_in_new_run += runs[i]->size();
629 new_runs[cur_out_run++] =
new run_type(blocks_in_new_run);
630 runs_left -= runs2merge;
633 if (cur_out_run == 1 && blocks_in_new_run ==
int_type(_n) && !input_bids->is_managed())
636 InputBidIterator cur = input_bids;
637 for (
int_type i = 0; cur != (input_bids + _n); ++cur)
639 (*new_runs[0])[i++].bid = *cur;
642 bid_type& firstBID = (*new_runs[0])[0].bid;
643 if (firstBID.is_managed())
649 bid_type& lastBID = (*new_runs[0])[_n - 1].bid;
650 if (lastBID.is_managed())
659 mng->
new_blocks(interleaved_alloc_strategy(new_nruns, AllocStrategy()),
667 while (runs_left > 0)
670 #if STXXL_CHECK_ORDER_IN_SORTS
671 assert((check_ksorted_runs<block_type, run_type, KeyExtractor>(runs + nruns - runs_left, runs2merge, m2, keyobj)));
674 merge_runs<block_type, run_type, KeyExtractor>(runs + nruns - runs_left,
675 runs2merge, *(new_runs + (cur_out_run++)), _m, keyobj);
676 runs_left -= runs2merge;
684 run_type* result = *runs;
689 STXXL_VERBOSE(
"Elapsed time : " << end - begin <<
" s. Run creation time: " <<
690 after_runs_creation - begin <<
" s");
691 STXXL_VERBOSE(
"Time in I/O wait(rf): " << io_wait_after_rf <<
" s");
724 template <
typename ExtIterator,
typename KeyExtractor>
729 typename ExtIterator::bid_type,
typename KeyExtractor::key_type
732 typedef typename ExtIterator::vector_type::value_type value_type;
733 typedef typename ExtIterator::bid_type bid_type;
734 typedef typename ExtIterator::block_type block_type;
735 typedef typename ExtIterator::vector_type::alloc_strategy_type alloc_strategy_type;
736 typedef typename ExtIterator::bids_container_iterator bids_container_iterator;
743 if ((last - first) *
sizeof(value_type) < M)
750 assert(2 * block_type::raw_size <= M);
752 if (first.block_offset())
754 if (last.block_offset())
757 block_type* first_block =
new block_type;
758 block_type* last_block =
new block_type;
759 bid_type first_bid, last_bid;
762 req = first_block->read(*first.bid());
767 req = last_block->read(*last.bid());
770 for ( ; i < first.block_offset(); i++)
772 first_block->elem[i] = keyobj.min_value();
777 req = first_block->write(first_bid);
778 for (i = last.block_offset(); i < block_type::size; i++)
780 last_block->elem[i] = keyobj.max_value();
785 req = last_block->write(last_bid);
787 n = last.bid() - first.bid() + 1;
789 std::swap(first_bid, *first.bid());
790 std::swap(last_bid, *last.bid());
799 block_type, alloc_strategy_type,
800 bids_container_iterator, KeyExtractor
801 >(first.bid(), n, M / block_type::raw_size, keyobj);
803 first_block =
new block_type;
804 last_block =
new block_type;
805 block_type* sorted_first_block =
new block_type;
806 block_type* sorted_last_block =
new block_type;
809 reqs[0] = first_block->read(first_bid);
810 reqs[1] = sorted_first_block->read((*(out->begin())).bid);
813 reqs[0] = last_block->read(last_bid);
814 reqs[1] = sorted_last_block->read(((*out)[out->size() - 1]).bid);
816 for (i = first.block_offset(); i < block_type::size; i++)
818 first_block->elem[i] = sorted_first_block->elem[i];
822 req = first_block->write(first_bid);
824 for (i = 0; i < last.block_offset(); i++)
826 last_block->elem[i] = sorted_last_block->elem[i];
831 req = last_block->write(last_bid);
836 *first.bid() = first_bid;
837 *last.bid() = last_bid;
839 typename run_type::iterator it = out->begin();
841 bids_container_iterator cur_bid = first.bid();
844 for ( ; cur_bid != last.bid(); cur_bid++, it++)
846 *cur_bid = (*it).bid;
850 delete sorted_first_block;
851 delete sorted_last_block;
864 block_type* first_block =
new block_type;
868 req = first_block->read(*first.bid());
873 for ( ; i < first.block_offset(); i++)
875 first_block->elem[i] = keyobj.min_value();
878 req = first_block->write(first_bid);
880 n = last.bid() - first.bid();
882 std::swap(first_bid, *first.bid());
890 block_type, alloc_strategy_type,
891 bids_container_iterator, KeyExtractor
892 >(first.bid(), n, M / block_type::raw_size, keyobj);
894 first_block =
new block_type;
896 block_type* sorted_first_block =
new block_type;
900 reqs[0] = first_block->read(first_bid);
901 reqs[1] = sorted_first_block->read((*(out->begin())).bid);
904 for (i = first.block_offset(); i < block_type::size; i++)
906 first_block->elem[i] = sorted_first_block->elem[i];
909 req = first_block->write(first_bid);
913 *first.bid() = first_bid;
915 typename run_type::iterator it = out->begin();
917 bids_container_iterator cur_bid = first.bid();
920 for ( ; cur_bid != last.bid(); cur_bid++, it++)
922 *cur_bid = (*it).bid;
925 *cur_bid = (*it).bid;
927 delete sorted_first_block;
938 if (last.block_offset())
941 block_type* last_block =
new block_type;
946 req = last_block->read(*last.bid());
950 for (i = last.block_offset(); i < block_type::size; i++)
952 last_block->elem[i] = keyobj.max_value();
955 req = last_block->write(last_bid);
957 n = last.bid() - first.bid() + 1;
959 std::swap(last_bid, *last.bid());
967 block_type, alloc_strategy_type,
968 bids_container_iterator, KeyExtractor
969 >(first.bid(), n, M / block_type::raw_size, keyobj);
971 last_block =
new block_type;
972 block_type* sorted_last_block =
new block_type;
975 reqs[0] = last_block->read(last_bid);
976 reqs[1] = sorted_last_block->read(((*out)[out->size() - 1]).bid);
979 for (i = 0; i < last.block_offset(); i++)
981 last_block->elem[i] = sorted_last_block->elem[i];
984 req = last_block->write(last_bid);
988 *last.bid() = last_bid;
990 typename run_type::iterator it = out->begin();
991 bids_container_iterator cur_bid = first.bid();
993 for ( ; cur_bid != last.bid(); cur_bid++, it++)
995 *cur_bid = (*it).bid;
998 delete sorted_last_block;
1009 n = last.bid() - first.bid();
1013 block_type, alloc_strategy_type,
1014 bids_container_iterator, KeyExtractor
1015 >(first.bid(), n, M / block_type::raw_size, keyobj);
1017 typename run_type::iterator it = out->begin();
1018 bids_container_iterator cur_bid = first.bid();
1020 for ( ; cur_bid != last.bid(); cur_bid++, it++)
1022 *cur_bid = (*it).bid;
1030 #if STXXL_CHECK_ORDER_IN_SORTS
1031 typedef typename ExtIterator::const_iterator const_iterator;
1037 template <
typename RecordType>
1047 return RecordType::max_value();
1051 return RecordType::min_value();
1071 template <
typename ExtIterator>
1082 #endif // !STXXL_ALGO_KSORT_HEADER
#define STXXL_ASSERT(condition)
compat::remove_const< Integral >::type div_ceil(Integral n, Integral2 d)
run_cursor2< BlockType, prefetcher_type > cursor_type
Fully randomized disk allocation scheme functor.
const Type & STXXL_MIN(const Type &a, const Type &b)
trigger_entry_iterator< Iterator > make_bid_iterator(Iterator iter)
bool check_ksorted_runs(RunType **runs, unsigned_type nruns, unsigned_type m, KeyExtractor keyext)
void stl_in_memory_sort(ExtIterator first, ExtIterator last, StrictWeakOrdering cmp)
void write_out(TypeKey *begin, TypeKey *end, BlockType *&cur_blk, const BlockType *end_blk, int_type &out_block, int_type &out_pos, RunType &run, write_completion_handler< BlockType, typename BlockType::bid_type > *&next_read, typename BlockType::bid_type *&bids, request_ptr *write_reqs, request_ptr *read_reqs, InputBidIterator &it, KeyExtractor keyobj)
RecordType min_value() const
key_comparison(KeyExtractor ke_)
void delete_block(const BID< BLK_SIZE > &bid)
Deallocates a block.
run_cursor2_cmp(KeyExtractor _keyobj)
void ksort(ExtIterator first, ExtIterator last, KeyExtractor keyobj, unsigned_type M)
Sort records with integer keys, see stxxl::ksort -- Sorting Integer Keys.
virtual void wait(bool measure_time=true)=0
Suspends calling thread until completion of the request.
Encapsulates asynchronous prefetching engine.
static void classify(TypeKey *a, TypeKey *aEnd, TypeKey *b, int_type *bucket, typename TypeKey::key_type offset, unsigned shift)
void merge_runs(RunType **in_runs, unsigned_type nruns, RunType *out_run, unsigned_type _m, KeyExtractor keyobj)
void classify_block(Type *begin, Type *end, TypeKey *&out, int_type *bucket, typename KeyExtractor::key_type offset, unsigned shift, KeyExtractor keyobj)
#define _STXXL_SORT_TRIGGER_FORCE_SEQUENTIAL
choose_int_types< my_pointer_size >::int_type int_type
bool operator>(const uint_pair &b) const
greater comparison operator
const Type & STXXL_MAX(const Type &a, const Type &b)
type_key(key_type k, Type *p)
#define STXXL_BEGIN_NAMESPACE
void l1sort(TypeKey *a, TypeKey *aEnd, TypeKey *b, int_type *bucket, int_type K, typename TypeKey::key_type offset, int shift)
void new_block(const DiskAssignFunctor &functor, BID< BLK_SIZE > &bid, unsigned_type offset=0)
Allocates a new block according to the strategy given by functor and stores the block identifier to b...
bool is_sorted(ForwardIterator first, ForwardIterator last, StrictWeakOrdering comp)
simple_vector< trigger_entry< typename BlockType::bid_type, typename KeyExtractor::key_type > > * ksort_blocks(InputBidIterator input_bids, unsigned_type _n, unsigned_type _m, KeyExtractor keyobj)
unsigned int ilog2_floor(IntegerType i)
calculate the log2 floor of an integer type (by repeated bit shifts)
static void exclusive_prefix_sum(int_type *bucket, int_type K)
BlockType::const_reference current() const
void new_blocks(const DiskAssignFunctor &functor, BIDIteratorClass bidbegin, BIDIteratorClass bidend, unsigned_type offset=0)
Allocates new blocks.
RecordType max_value() const
element_iterator_traits< BlockType, SizeType >::element_iterator make_element_iterator(BlockType *blocks, SizeType offset)
Simpler non-growing vector without initialization.
void wait_all(RequestIterator reqs_begin, RequestIterator reqs_end)
Collection of functions to track statuses of a number of requests.
choose_int_types< my_pointer_size >::unsigned_type unsigned_type
unsigned int ilog2_ceil(const IntegerType &i)
calculate the log2 ceiling of an integer type (by repeated bit shifts)
double timestamp()
Returns number of seconds since the epoch, high resolution.
void compute_prefetch_schedule(const int_type *first, const int_type *last, int_type *out_first, int_type m, int_type D)
unsigned_type optimal_merge_factor(unsigned_type num_runs, unsigned_type max_concurrent_runs)
void create_runs(InputBidIterator it, RunType **runs, const unsigned_type nruns, const unsigned_type m2, KeyExtractor keyobj)
Request object encapsulating basic properties like file and offset.
RecordType::key_type key_type
#define STXXL_END_NAMESPACE