16 #ifndef STXXL_STREAM_SORT_STREAM_HEADER
17 #define STXXL_STREAM_SORT_STREAM_HEADER
56 static const unsigned block_size = BlockSize_;
82 while (!m_input.empty() && curr_idx != last_idx) {
95 if (first_idx < last_idx) {
98 while (first_idx != last_idx) {
99 *curr = m_cmp.max_value();
115 void compute_result();
127 m_result_computed(false)
131 throw bad_parameter(
"stxxl::runs_creator<>:runs_creator(): INSUFFICIENT MEMORY provided, please increase parameter 'memory_to_use'");
133 assert(m_memsize > 0);
141 if (!m_result_computed)
144 m_result_computed =
true;
145 #ifdef STXXL_PRINT_STAT_AFTER_RF
147 #endif //STXXL_PRINT_STAT_AFTER_RF
156 template <
class Input_,
class CompareType_,
unsigned BlockSize_,
class AllocStr_>
166 #ifndef STXXL_SMALL_INPUT_PSORT_OPT
170 while (!input.empty() && blocks1_length != block_type::size)
172 m_result->small_run.push_back(*input);
177 if (blocks1_length == block_type::size && !input.empty())
180 std::copy(m_result->small_run.begin(), m_result->small_run.end(), Blocks1[0].
begin());
181 m_result->small_run.clear();
185 STXXL_VERBOSE1(
"basic_runs_creator: Small input optimization, input length: " << blocks1_length);
186 m_result->elements = blocks1_length;
191 #endif //STXXL_SMALL_INPUT_PSORT_OPT
194 blocks1_length = fetch(Blocks1, blocks1_length, el_in_run);
197 sort_run(Blocks1, blocks1_length);
199 if (blocks1_length <= block_type::size && m_input.empty())
202 STXXL_VERBOSE1(
"basic_runs_creator: Small input optimization, input length: " << blocks1_length);
203 assert(m_result->small_run.empty());
204 m_result->small_run.assign(Blocks1[0].begin(), Blocks1[0].begin() + blocks1_length);
205 m_result->elements = blocks1_length;
216 run.resize(cur_run_size);
219 disk_queues::get_instance()->set_priority_op(request_queue::WRITE);
222 fill_with_max_value(Blocks1, cur_run_size, blocks1_length);
224 for (i = 0; i < cur_run_size; ++i)
226 run[i].value = Blocks1[i][0];
227 write_reqs[i] = Blocks1[i].
write(run[i].bid);
229 m_result->runs.push_back(run);
230 m_result->runs_sizes.push_back(blocks1_length);
231 m_result->elements += blocks1_length;
236 wait_all(write_reqs, write_reqs + cur_run_size);
242 STXXL_VERBOSE1(
"Filling the second part of the allocated blocks");
243 blocks2_length = fetch(Blocks2, 0, el_in_run);
249 blocks2_length += el_in_run;
250 sort_run(Blocks1, blocks2_length);
251 wait_all(write_reqs, write_reqs + cur_run_size);
254 cur_run_size =
div_ceil(blocks2_length, block_type::size);
255 run.resize(cur_run_size);
259 fill_with_max_value(Blocks1, cur_run_size, blocks2_length);
261 assert(cur_run_size > m2);
263 for (i = 0; i < m2; ++i)
265 run[i].value = Blocks1[i][0];
266 write_reqs[i]->
wait();
267 write_reqs[i] = Blocks1[i].
write(run[i].bid);
272 for ( ; i < cur_run_size; ++i)
274 run[i].value = Blocks1[i][0];
275 write_reqs1[i - m2] = Blocks1[i].
write(run[i].bid);
278 m_result->runs[0] = run;
279 m_result->runs_sizes[0] = blocks2_length;
280 m_result->elements = blocks2_length;
282 wait_all(write_reqs, write_reqs + m2);
284 wait_all(write_reqs1, write_reqs1 + cur_run_size - m2);
285 delete[] write_reqs1;
294 sort_run(Blocks2, blocks2_length);
296 cur_run_size =
div_ceil(blocks2_length, block_type::size);
297 run.resize(cur_run_size);
300 for (i = 0; i < cur_run_size; ++i)
302 run[i].value = Blocks2[i][0];
303 write_reqs[i]->
wait();
304 write_reqs[i] = Blocks2[i].
write(run[i].bid);
306 assert((blocks2_length % el_in_run) == 0);
308 m_result->add_run(run, blocks2_length);
310 while (!m_input.empty())
312 blocks1_length = fetch(Blocks1, 0, el_in_run);
313 sort_run(Blocks1, blocks1_length);
314 cur_run_size =
div_ceil(blocks1_length, block_type::size);
315 run.resize(cur_run_size);
319 fill_with_max_value(Blocks1, cur_run_size, blocks1_length);
321 for (i = 0; i < cur_run_size; ++i)
323 run[i].value = Blocks1[i][0];
324 write_reqs[i]->
wait();
325 write_reqs[i] = Blocks1[i].
write(run[i].bid);
327 m_result->add_run(run, blocks1_length);
329 std::swap(Blocks1, Blocks2);
330 std::swap(blocks1_length, blocks2_length);
333 wait_all(write_reqs, write_reqs + m2);
335 delete[] ((Blocks1 < Blocks2) ? Blocks1 : Blocks2);
367 :
base(input, cmp, memory_to_use)
379 template <
class ValueType_>
460 if (first_idx < last_idx) {
463 while (first_idx != last_idx) {
464 *curr = m_cmp.max_value();
485 sort_run(m_blocks1, m_cur_el);
487 if (m_cur_el <= block_type::size && m_result->elements == 0)
490 STXXL_VERBOSE1(
"runs_creator(use_push): Small input optimization, input length: " << m_cur_el);
491 m_result->small_run.assign(m_blocks1[0].begin(), m_blocks1[0].begin() + m_cur_el);
492 m_result->elements = m_cur_el;
497 run.resize(cur_run_size);
501 disk_queues::get_instance()->set_priority_op(request_queue::WRITE);
504 fill_with_max_value(m_blocks1, cur_run_size, m_cur_el);
507 for ( ; i < cur_run_size; ++i)
509 run[i].value = m_blocks1[i][0];
510 if (m_write_reqs[i].
get())
511 m_write_reqs[i]->wait();
513 m_write_reqs[i] = m_blocks1[i].write(run[i].bid);
515 m_result->add_run(run, m_cur_el);
517 for (i = 0; i < m_m2; ++i)
519 if (m_write_reqs[i].
get())
520 m_write_reqs[i]->wait();
530 m_memory_to_use(memory_to_use),
534 m_blocks1(NULL), m_blocks2(NULL),
539 throw bad_parameter(
"stxxl::runs_creator<>:runs_creator(): INSUFFICIENT MEMORY provided, please increase parameter 'memory_to_use'");
548 m_result_computed = 1;
560 m_result_computed =
false;
565 if (m_write_reqs[i].
get())
566 m_write_reqs[i]->cancel();
576 m_blocks2 = m_blocks1 + m_m2;
591 delete[] ((m_blocks1 < m_blocks2) ? m_blocks1 : m_blocks2);
592 m_blocks1 = m_blocks2 = NULL;
594 delete[] m_write_reqs;
603 assert(m_result_computed ==
false);
604 if (
LIKELY(m_cur_el < m_el_in_run))
606 m_blocks1[m_cur_el / block_type::size][m_cur_el % block_type::size] = val;
611 assert(m_el_in_run == m_cur_el);
615 sort_run(m_blocks1, m_el_in_run);
618 run.resize(cur_run_blocks);
622 disk_queues::get_instance()->set_priority_op(request_queue::WRITE);
626 run[i].value = m_blocks1[i][0];
627 if (m_write_reqs[i].
get())
628 m_write_reqs[i]->wait();
630 m_write_reqs[i] = m_blocks1[i].write(run[i].bid);
633 m_result->add_run(run, m_el_in_run);
635 std::swap(m_blocks1, m_blocks2);
645 if (!m_result_computed)
648 m_result_computed =
true;
649 #ifdef STXXL_PRINT_STAT_AFTER_RF
651 #endif //STXXL_PRINT_STAT_AFTER_RF
659 return m_result->elements + m_cur_el;
671 return m_memory_to_use;
682 template <
class ValueType_>
746 cur_block(writer.get_free_block()),
754 throw bad_parameter(
"stxxl::runs_creator<>:runs_creator(): INSUFFICIENT MEMORY provided, please increase parameter 'memory_to_use'");
762 assert(offset < block_type::size);
764 (*cur_block)[offset] = val;
767 if (offset == block_type::size)
773 result_->runs.resize(irun + 1);
774 result_->runs[irun].resize(iblock + 1);
782 result_->runs[irun][iblock].value = (*cur_block)[0];
783 cur_block = writer.write(cur_block, result_->runs[irun][iblock].bid);
795 if (offset == 0 && iblock == 0)
799 result_->runs_sizes.resize(irun + 1);
800 result_->runs_sizes.back() = iblock * block_type::size + offset;
804 while (offset != block_type::size)
806 (*cur_block)[offset] = cmp.max_value();
813 result_->runs.resize(irun + 1);
814 result_->runs[irun].resize(iblock + 1);
822 result_->runs[irun][iblock].value = (*cur_block)[0];
823 cur_block = writer.write(cur_block, result_->runs[irun][iblock].bid);
850 template <
class RunsType_,
class CompareType_>
854 typedef typename RunsType_::element_type::block_type block_type;
859 for (irun = 0; irun < nruns; ++irun)
862 block_type* blocks =
new block_type[nblocks];
866 reqs[j] = blocks[j].read(sruns->runs[irun][j].bid);
873 if (cmp(blocks[j][0], sruns->runs[irun][j].value) ||
874 cmp(sruns->runs[irun][j].value, blocks[j][0]))
876 STXXL_ERRMSG(
"check_sorted_runs wrong trigger in the run");
885 STXXL_ERRMSG(
"check_sorted_runs wrong order in the run");
893 STXXL_MSG(
"Checking runs finished successfully");
909 template <
class RunsType_,
920 typedef typename sorted_runs_data_type::size_type
size_type;
921 typedef typename sorted_runs_data_type::run_type
run_type;
922 typedef typename sorted_runs_data_type::block_type
block_type;
930 typedef std::pair<typename block_type::iterator, typename block_type::iterator>
sequence;
935 typedef typename sorted_runs_data_type::value_type
value_type;
971 #if STXXL_PARALLEL_MULTIWAY_MERGE
972 std::vector<sequence>* seqs;
973 std::vector<block_type*>* buffers;
977 #if STXXL_CHECK_ORDER_IN_SORTS
980 #endif //STXXL_CHECK_ORDER_IN_SORTS
984 void merge_recursively();
991 #if STXXL_PARALLEL_MULTIWAY_MERGE
996 delete[] m_prefetch_seq;
1006 #if STXXL_PARALLEL_MULTIWAY_MERGE
1012 if (num_currently_mergeable < rest)
1014 if (!m_prefetcher || m_prefetcher->empty())
1017 num_currently_mergeable = m_elements_remaining;
1022 *seqs, m_consume_seq[m_prefetcher->pos()].value, m_cmp);
1030 stxxl::parallel::multiway_merge((*seqs).begin(), (*seqs).end(), m_buffer_block->end() - rest, m_cmp, output_size);
1033 rest -= output_size;
1034 num_currently_mergeable -= output_size;
1039 }
while (rest > 0 && (*seqs).size() > 0);
1041 #if STXXL_CHECK_ORDER_IN_SORTS
1042 if (!
stxxl::is_sorted(m_buffer_block->begin(), m_buffer_block->end(), cmp))
1044 for (
value_type* i = m_buffer_block->begin() + 1; i != m_buffer_block->end(); ++i)
1045 if (cmp(*i, *(i - 1)))
1047 STXXL_VERBOSE1(
"Error at position " << (i - m_buffer_block->begin()));
1051 #endif //STXXL_CHECK_ORDER_IN_SORTS
1056 #endif //STXXL_PARALLEL_MULTIWAY_MERGE
1061 m_losers->multi_merge(m_buffer_block->elem, m_buffer_block->elem + STXXL_MIN<size_type>(out_block_type::size, m_elements_remaining));
1066 m_current_ptr = m_buffer_block->elem;
1067 m_current_end = m_buffer_block->elem + STXXL_MIN<size_type>(out_block_type::size, m_elements_remaining);
1069 if (m_elements_remaining <= out_block_type::size)
1070 deallocate_prefetcher();
1079 m_memory_to_use(memory_to_use),
1080 m_buffer_block(new out_block_type),
1081 m_prefetch_seq(NULL),
1087 num_currently_mergeable(0)
1090 , m_last_element(m_cmp.min_value())
1099 m_memory_to_use = memory_to_use;
1106 m_elements_remaining = m_sruns->elements;
1111 if (!m_sruns->small_run.empty())
1114 STXXL_VERBOSE1(
"basic_runs_merger: small input optimization, input length: " << m_elements_remaining);
1115 assert(m_elements_remaining ==
size_type(m_sruns->small_run.size()));
1117 m_current_ptr = &m_sruns->small_run[0];
1118 m_current_end = m_current_ptr + m_sruns->small_run.size();
1123 #if STXXL_CHECK_ORDER_IN_SORTS
1125 #endif //STXXL_CHECK_ORDER_IN_SORTS
1129 disk_queues::get_instance()->set_priority_op(request_queue::WRITE);
1131 int_type disks_number = config::get_instance()->disks_number();
1133 unsigned_type input_buffers = (m_memory_to_use >
sizeof(out_block_type) ? m_memory_to_use -
sizeof(out_block_type) : 0) / block_type::raw_size;
1136 if (input_buffers < nruns + min_prefetch_buffers)
1143 STXXL_WARNMSG_RECURSIVE_SORT(
"memory_to_use=" << m_memory_to_use <<
" bytes block_type::raw_size=" << block_type::raw_size <<
" bytes");
1146 unsigned_type recursive_merge_buffers = m_memory_to_use / block_type::raw_size;
1147 if (recursive_merge_buffers < 2 * min_prefetch_buffers + 1 + 2) {
1150 STXXL_ERRMSG(
"There are only m=" << recursive_merge_buffers <<
" blocks available for recursive merging, but "
1151 << min_prefetch_buffers <<
"+" << min_prefetch_buffers <<
"+1 are needed read-ahead/write-back/output, and");
1152 STXXL_ERRMSG(
"the merger requires memory to store at least two input blocks internally. Aborting.");
1153 throw bad_parameter(
"basic_runs_merger::sort(): INSUFFICIENT MEMORY provided, please increase parameter 'memory_to_use'");
1156 merge_recursively();
1158 nruns = m_sruns->runs.size();
1161 assert(nruns + min_prefetch_buffers <= input_buffers);
1165 deallocate_prefetcher();
1170 prefetch_seq_size += m_sruns->runs[i].size();
1173 m_consume_seq.resize(prefetch_seq_size);
1174 m_prefetch_seq =
new int_type[prefetch_seq_size];
1176 typename run_type::iterator copy_start = m_consume_seq.begin();
1179 copy_start = std::copy(m_sruns->runs[i].begin(),
1180 m_sruns->runs[i].end(),
1184 std::stable_sort(m_consume_seq.begin(), m_consume_seq.end(),
1189 #if STXXL_SORT_OPTIMAL_PREFETCHING
1191 const int_type n_opt_prefetch_buffers = min_prefetch_buffers + (3 * (n_prefetch_buffers - min_prefetch_buffers)) / 10;
1196 n_opt_prefetch_buffers,
1200 m_prefetch_seq[i] = i;
1201 #endif //STXXL_SORT_OPTIMAL_PREFETCHING
1204 m_consume_seq.begin(),
1205 m_consume_seq.end(),
1207 STXXL_MIN(nruns + n_prefetch_buffers, prefetch_seq_size));
1211 #if STXXL_PARALLEL_MULTIWAY_MERGE
1213 seqs =
new std::vector<sequence>(nruns);
1214 buffers =
new std::vector<block_type*>(nruns);
1219 (*seqs)[i] = std::make_pair((*buffers)[i]->begin(), (*buffers)[i]->end());
1224 #endif //STXXL_PARALLEL_MULTIWAY_MERGE
1233 fill_buffer_block();
1239 deallocate_prefetcher();
1247 return (m_elements_remaining == 0);
1253 return m_elements_remaining;
1260 return *m_current_ptr;
1266 return &(operator * ());
1273 assert(m_current_ptr != m_current_end);
1275 --m_elements_remaining;
1278 if (
LIKELY(m_current_ptr == m_current_end && !empty()))
1280 fill_buffer_block();
1282 #if STXXL_CHECK_ORDER_IN_SORTS
1283 assert(
stxxl::is_sorted(m_buffer_block->elem, m_buffer_block->elem + STXXL_MIN<size_type>(m_elements_remaining, m_buffer_block->size), m_cmp));
1284 #endif //STXXL_CHECK_ORDER_IN_SORTS
1287 #if STXXL_CHECK_ORDER_IN_SORTS
1290 assert(!m_cmp(
operator * (), m_last_element));
1291 m_last_element = operator * ();
1293 #endif //STXXL_CHECK_ORDER_IN_SORTS
1302 deallocate_prefetcher();
1304 delete m_buffer_block;
1309 template <
class RunsType_,
class CompareType_,
class AllocStr_>
1313 unsigned_type ndisks = config::get_instance()->disks_number();
1321 + recursive_merger_memory_prefetch_buffers
1322 + recursive_merger_memory_out_block;
1324 unsigned_type max_arity = (m_memory_to_use > memory_for_buffers ? m_memory_to_use - memory_for_buffers : 0) / block_type::raw_size;
1328 assert(merge_factor > 1);
1329 assert(merge_factor <= max_arity);
1331 while (nruns > max_arity)
1334 STXXL_MSG(
"Starting new merge phase: nruns: " << nruns <<
1335 " opt_merge_factor: " << merge_factor <<
1336 " max_arity: " << max_arity <<
" new_nruns: " << new_nruns);
1341 new_runs.runs.resize(new_nruns);
1342 new_runs.runs_sizes.resize(new_nruns);
1343 new_runs.elements = m_sruns->elements;
1349 size_type elements_left = m_sruns->elements;
1351 while (runs_left > 0)
1354 STXXL_MSG(
"Merging " << runs2merge <<
" runs");
1360 for (
unsigned_type i = nruns - runs_left; i < (nruns - runs_left + runs2merge); ++i)
1362 elements_in_new_run += m_sruns->runs_sizes[i];
1364 new_runs.runs_sizes[cur_out_run] = elements_in_new_run;
1370 new_runs.runs[cur_out_run].resize(blocks_in_new_run);
1377 cur_runs->runs.resize(runs2merge);
1378 cur_runs->runs_sizes.resize(runs2merge);
1380 std::copy(m_sruns->runs.begin() + nruns - runs_left,
1381 m_sruns->runs.begin() + nruns - runs_left + runs2merge,
1382 cur_runs->runs.begin());
1383 std::copy(m_sruns->runs_sizes.begin() + nruns - runs_left,
1384 m_sruns->runs_sizes.begin() + nruns - runs_left + runs2merge,
1385 cur_runs->runs_sizes.begin());
1387 cur_runs->elements = elements_in_new_run;
1388 elements_left -= elements_in_new_run;
1397 new_runs.runs[cur_out_run].begin(),
1401 const size_type cnt_max = cur_runs->elements;
1403 while (cnt != cnt_max)
1406 if ((cnt % block_type::size) == 0)
1407 new_runs.runs[cur_out_run][cnt /
size_type(block_type::size)].value = *merger;
1409 ++cnt, ++out, ++merger;
1411 assert(merger.
empty());
1413 while (cnt % block_type::size)
1415 *out = m_cmp.max_value();
1424 assert(cur_out_run + 1 == new_runs.runs.size());
1426 elements_left -= m_sruns->runs_sizes.back();
1429 new_runs.runs.back() = m_sruns->runs.back();
1430 new_runs.runs_sizes.back() = m_sruns->runs_sizes.back();
1433 runs_left -= runs2merge;
1437 assert(elements_left == 0);
1439 m_sruns->runs.clear();
1441 std::swap(nruns, new_nruns);
1442 m_sruns->swap(new_runs);
1453 template <
class RunsType_,
1454 class CompareType_ =
typename RunsType_::element_type::cmp_type,
1473 :
base(cmp, memory_to_use)
1475 this->initialize(sruns);
1482 :
base(cmp, memory_to_use)
1498 template <
class Input_,
1502 class runs_creator_type = runs_creator<Input_, CompareType_, BlockSize_, AllocStr_> >
1520 creator(in, c, memory_to_use),
1521 merger(creator.result(), c, memory_to_use)
1532 creator(in, c, m_memory_to_userc),
1533 merger(creator.result(), c, m_memory_to_use)
1542 return merger.empty();
1555 return merger.operator -> ();
1572 unsigned BlockSize_>
1600 template <
unsigned BlockSize,
1601 class RandomAccessIterator,
1604 void sort(RandomAccessIterator begin,
1605 RandomAccessIterator end,
1612 typedef typename streamify_traits<RandomAccessIterator>::stream_type InputType;
1615 #endif // STXXL_MSVC
1616 InputType Input(begin, end);
1618 sorter_type Sort(Input, cmp, MemSize);
1626 #endif // !STXXL_STREAM_SORT_STREAM_HEADER
sorted_runs_type m_sruns
smart pointer to sorted_runs object
const Tp & STXXL_MIN(const Tp &a, const Tp &b)
counting_ptr< sorted_runs_data_type > sorted_runs_type
sort_helper::run_cursor2_cmp< block_type, prefetcher_type, value_cmp > run_cursor2_cmp_type
#define STXXL_THROW_UNREACHABLE()
Throws stxxl::unreachable with "Error in file [file], line [line] : this code should never be reachable"
#define STXXL_DEFAULT_BLOCK_SIZE(type)
alloc_strategy_type alloc_strategy
const unsigned_type m_m2
m_memsize / 2
CompareType_ m_cmp
comparator used to sort block groups
typed_block< BlockSize_, value_type > block_type
size_type m_elements_remaining
items remaining in input
const value_type * m_current_ptr
pointer into current memory buffer: this is either m_buffer_block or the small_runs vector
sort_helper::trigger_entry< block_type > trigger_entry_type
Forms sorted runs of data from a stream.
sorted_runs_data_type::block_type block_type
unsigned sort_memory_usage_factor()
runs_merger(value_cmp cmp, unsigned_type memory_to_use)
Creates a runs merger object without initializing a round of sorted_runs.
bool m_result_computed
true iff result is already computed (used in 'result()' method)
CompareType_ m_cmp
comparator object to sort runs
Input_::value_type value_type
bool is_sorted(_ForwardIter __first, _ForwardIter __last)
const unsigned_type m_memsize
memory size in number of blocks for internal use
sorted_runs< trigger_entry_type, std::less< value_type > > result
runs_creator(CompareType_ cmp, unsigned_type memory_to_use)
Creates the object.
void sort(ExtIterator first, ExtIterator last, StrictWeakOrdering cmp, unsigned_type M)
Sort records comparison-based, see stxxl::sort -- Sorting Comparison-Based.
sort(Input_ &in, CompareType_ c, unsigned_type m_memory_to_userc, unsigned_type m_memory_to_use)
Creates the object.
void sort_run(block_type *run, unsigned_type elements)
Sort a specific run, contained in a sequence of blocks.
trigger_entry_iterator< Iterator > make_bid_iterator(Iterator iter)
unsigned_type count_elements_less_equal(const SequenceVector &seqs, const ValueType &bound, Comparator cmp)
All sorted runs of a sort operation.
sorted_runs< trigger_entry_type, cmp_type > sorted_runs_data_type
block_type * m_blocks1
accumulation buffer of size m_m2 blocks, half the available memory size
base::block_type block_type
void sort_run(block_type *run, unsigned_type elements)
Sort a specific run, contained in a sequence of blocks.
#define STXXL_PARALLEL_MULTIWAY_MERGE
sorted_runs_type result_type
#define STXXL_WARNMSG_RECURSIVE_SORT
sorted_runs_data_type::run_type run_type
void allocate()
Allocates input buffers and clears result.
void verify_sentinel_strict_weak_ordering(StrictWeakOrdering cmp)
virtual ~basic_runs_merger()
Destructor.
AllocStr_ allocation_strategy_type
#define STXXL_DEFAULT_ALLOC_STRATEGY
void refill_or_remove_empty_sequences(SequenceVector &seqs, BufferPtrVector &buffers, Prefetcher &prefetcher)
BID< BlockSize_ > bid_type
prefetcher_type * m_prefetcher
prefetcher object
sorted_runs< trigger_entry_type, cmp_type > sorted_runs_data_type
sorted_runs_data_type::run_type run_type
std::pair< typename block_type::iterator, typename block_type::iterator > sequence
Computes sorted runs type from value type and block size.
buffered_writer< block_type > writer
#define STXXL_VERBOSE2(x)
void push(const value_type &val)
Adds new element to the sorter.
void check_sort_settings()
block_type * pull_block()
Pulls next unconsumed block from the consumption sequence.
helper::element_iterator_generator< BlockType, BlockType::has_only_data >::iterator element_iterator
loser_tree_type * m_losers
loser tree used for native merging
uint_pair & operator++()
prefix increment operator (directly manipulates the integer parts)
base::value_cmp value_cmp
element_iterator_traits< BlockType >::element_iterator make_element_iterator(BlockType *blocks, unsigned_type offset)
run_type run
run object containing block ids of the run being written to disk
void wait_all(request_iterator_ reqs_begin, request_iterator_ reqs_end)
Collection of functions to track statuses of a number of requests.
runs_creator_type::sorted_runs_type sorted_runs_type
sorted_runs_data_type::run_type run_type
run_cursor2< block_type, prefetcher_type > run_cursor_type
typed_block< BlockSize_, value_type > block_type
unsigned_type fetch(block_type *blocks, unsigned_type first_idx, unsigned_type last_idx)
true iff result is already computed (used in 'result()' method)
Input_::value_type value_type
Standard stream typedef.
request_ptr * m_write_reqs
reference to write requests transporting the last accumulation buffer to disk
void delete_blocks(const BIDIteratorClass &bidbegin, const BIDIteratorClass &bidend)
Deallocates blocks.
counting_ptr< sorted_runs_data_type > sorted_runs_type
sorted_runs_data_type::run_type run_type
Input strategy for runs_creator class.
Block containing elements of fixed length.
sort_helper::trigger_entry< block_type > trigger_entry_type
virtual void wait(bool measure_time=true)=0
Suspends calling thread until completion of the request.
Encapsulates asynchronous prefetching engine.
sorted_runs_type result_type
unsigned_type size() const
number of items currently inserted.
void clear()
Clear the internal state of the object: release all runs and reset.
base::sorted_runs_type sorted_runs_type
const Tp & STXXL_MAX(const Tp &a, const Tp &b)
#define _STXXL_SORT_TRIGGER_FORCE_SEQUENTIAL
choose_int_types< my_pointer_size >::int_type int_type
basic_runs_creator(Input_ &input, CompareType_ cmp, unsigned_type memory_to_use)
Create the object.
void fill_with_max_value(block_type *blocks, unsigned_type num_blocks, unsigned_type first_idx)
fill the rest of the block with max values
block_type out_block_type
RunsType_ sorted_runs_type
sorted_runs_type & result()
Returns the sorted runs object.
basic_runs_creator< Input_, CompareType_, BlockSize_, AllocStr_ > base
Forms sorted runs of data from a stream.
int_type * m_prefetch_seq
precalculated order of blocks in which they are prefetched
out_block_type * m_buffer_block
memory buffer for merging from external streams
runs_creator(Input_ &input, CompareType_ cmp, unsigned_type memory_to_use)
Creates the object.
typed_block< BlockSize_, value_type > block_type
sorted_runs< trigger_entry_type, cmp_type > sorted_runs_data_type
base::value_type value_type
sorted_runs_type m_result
stores the result (sorted runs) as smart pointer
#define STXXL_CHECK_ORDER_IN_SORTS
#define STXXL_BEGIN_NAMESPACE
void STXXL_UNUSED(const U &)
const unsigned_type m_memory_to_use
memory size in bytes to use
const cmp_type & cmp() const
return comparator object.
OutputIterator_ materialize(StreamAlgorithm_ &in, OutputIterator_ out)
Stores consecutively stream content to an output iterator.
unsigned_type m_memsize
memory for internal use in blocks
#define STXXL_VERBOSE1(x)
run_type m_consume_seq
sequence of blocks needed for merging
sort_helper::trigger_entry< block_type > trigger_entry_type
Produces sorted stream from input stream.
void clear()
Clear current state and remove all items.
void set_memory_to_use(unsigned_type memory_to_use)
Set memory amount to use for the merger in bytes.
sort(Input_ &in, CompareType_ c, unsigned_type memory_to_use)
Creates the object.
sorted_runs_data_type::value_type value_type
Standard stream typedef.
void deallocate()
Deallocates input buffers but not the current result.
iterator begin()
Returns iterator pointing to the first element.
basic_runs_merger(value_cmp c, unsigned_type memory_to_use)
Creates a runs merger object.
bool m_result_computed
true after the result() method was called for the first time
void new_blocks(const DiskAssignFunctor &functor, BIDIteratorClass bidbegin, BIDIteratorClass bidend, unsigned_type offset=0)
Allocates new blocks.
RunsType_ sorted_runs_type
base::sorted_runs_data_type sorted_runs_data_type
void finish()
Finishes current run and begins new one.
bool empty() const
Standard stream method.
void initialize(const sorted_runs_type &sruns)
Initialize the runs merger object with a new round of sorted_runs.
AllocStr_ alloc_strategy_type
loser_tree< run_cursor_type, run_cursor2_cmp_type > loser_tree_type
run_type::value_type trigger_entry_type
choose_int_types< my_pointer_size >::unsigned_type unsigned_type
const value_type * m_current_end
pointer into current memory buffer: end after range of current values
bool check_sorted_runs(run_type **runs, unsigned_type nruns, unsigned_type m, value_cmp cmp)
void deallocate()
Deallocate temporary structures freeing memory prior to next initialize().
compat::remove_const< Integral >::type div_ceil(Integral __n, Integral2 __d)
void compute_prefetch_schedule(const int_type *first, const int_type *last, int_type *out_first, int_type m, int_type D)
sorted_runs_type m_result
stores the result (sorted runs) in a reference counted object
void push(const value_type &val)
Adds new element to the current run.
size_type size() const
Standard size method.
unsigned_type optimal_merge_factor(unsigned_type num_runs, unsigned_type max_concurrent_runs)
std::vector< sequence >::size_type seqs_size_type
void deallocate_prefetcher()
sorted_runs_type & result()
Returns the sorted runs object.
unsigned_type memory_used() const
return memory size used (in bytes).
std::vector< trigger_entry_type > run_type
unsigned_type m_cur_el
current number of elements in the run m_blocks1
block_type * m_blocks2
accumulation buffer that is currently being written to disk
sorted_runs_type & result()
Returns the sorted runs object.
runs_creator(CompareType_ c, unsigned_type memory_to_use)
Creates the object.
request_ptr write(const bid_type &bid, completion_handler on_cmpl=default_completion_handler())
Writes block to the disk(s).
const unsigned_type m_el_in_run
total number of elements in a run
counting_ptr< sorted_runs_data_type > sorted_runs_type
Input strategy for runs_creator class.
Encapsulates asynchronous buffered block writing engine.
runs_merger(sorted_runs_type &sruns, value_cmp cmp, unsigned_type memory_to_use)
Creates a runs merger object.
block_prefetcher< block_type, typename run_type::iterator > prefetcher_type
void fill_with_max_value(block_type *blocks, unsigned_type num_blocks, unsigned_type first_idx)
fill the rest of the block with max values
runs_merger< sorted_runs_type, CompareType_, AllocStr_ > runs_merger_type
iterator2stream< InputIterator_ > streamify(InputIterator_ begin, InputIterator_ end)
Input iterator range to stream converter.
basic_runs_merger< RunsType_, CompareType_, AllocStr_ > base
base::block_type block_type
#define STXXL_END_NAMESPACE
runs_creator_type creator
unsigned_type m_memory_to_use
memory size in bytes to use
sorted_runs_type::element_type sorted_runs_data_type
value_cmp m_cmp
comparator object to sort runs
sort_helper::trigger_entry< bid_type, value_type > trigger_entry_type
sorted_runs_data_type::size_type size_type
bool empty() const
Standard stream method.