16 #ifndef STXXL_STREAM_SORT_STREAM_HEADER
17 #define STXXL_STREAM_SORT_STREAM_HEADER
56 static const unsigned block_size = BlockSize;
89 while (!m_input.empty() && curr_idx != last_idx) {
103 if (first_idx < last_idx) {
105 while (first_idx != last_idx) {
106 *curr = m_cmp.max_value();
122 void compute_result();
136 m_result_computed(false)
141 "INSUFFICIENT MEMORY provided, "
142 "please increase parameter 'memory_to_use'");
144 assert(m_memsize > 0);
152 if (!m_result_computed)
155 m_result_computed =
true;
156 #ifdef STXXL_PRINT_STAT_AFTER_RF
158 #endif //STXXL_PRINT_STAT_AFTER_RF
167 template <
class Input,
class CompareType,
unsigned BlockSize,
class AllocStr>
177 #ifndef STXXL_SMALL_INPUT_PSORT_OPT
181 while (!input.empty() && blocks1_length != block_type::size)
183 m_result->small_run.push_back(*input);
188 if (blocks1_length == block_type::size && !input.empty())
191 std::copy(m_result->small_run.begin(), m_result->small_run.end(),
193 m_result->small_run.clear();
197 STXXL_VERBOSE1(
"basic_runs_creator: Small input optimization, input length: " << blocks1_length);
198 m_result->elements = blocks1_length;
203 #endif //STXXL_SMALL_INPUT_PSORT_OPT
206 blocks1_length = fetch(Blocks1, blocks1_length, el_in_run);
209 sort_run(Blocks1, blocks1_length);
211 if (blocks1_length <= block_type::size && m_input.empty())
214 STXXL_VERBOSE1(
"basic_runs_creator: Small input optimization, input length: " << blocks1_length);
215 assert(m_result->small_run.empty());
216 m_result->small_run.assign(Blocks1[0].begin(), Blocks1[0].begin() + blocks1_length);
217 m_result->elements = blocks1_length;
228 run.resize(cur_run_size);
231 disk_queues::get_instance()->set_priority_op(request_queue::WRITE);
234 fill_with_max_value(Blocks1, cur_run_size, blocks1_length);
236 for (i = 0; i < cur_run_size; ++i)
238 run[i].value = Blocks1[i][0];
239 write_reqs[i] = Blocks1[i].
write(run[i].bid);
241 m_result->runs.push_back(run);
242 m_result->runs_sizes.push_back(blocks1_length);
243 m_result->elements += blocks1_length;
248 wait_all(write_reqs, write_reqs + cur_run_size);
254 STXXL_VERBOSE1(
"Filling the second part of the allocated blocks");
255 blocks2_length = fetch(Blocks2, 0, el_in_run);
261 blocks2_length += el_in_run;
262 sort_run(Blocks1, blocks2_length);
263 wait_all(write_reqs, write_reqs + cur_run_size);
266 cur_run_size =
div_ceil(blocks2_length, block_type::size);
267 run.resize(cur_run_size);
271 fill_with_max_value(Blocks1, cur_run_size, blocks2_length);
273 assert(cur_run_size > m2);
275 for (i = 0; i < m2; ++i)
277 run[i].value = Blocks1[i][0];
278 write_reqs[i]->
wait();
279 write_reqs[i] = Blocks1[i].
write(run[i].bid);
284 for ( ; i < cur_run_size; ++i)
286 run[i].value = Blocks1[i][0];
287 write_reqs1[i - m2] = Blocks1[i].
write(run[i].bid);
290 m_result->runs[0] = run;
291 m_result->runs_sizes[0] = blocks2_length;
292 m_result->elements = blocks2_length;
294 wait_all(write_reqs, write_reqs + m2);
296 wait_all(write_reqs1, write_reqs1 + cur_run_size - m2);
297 delete[] write_reqs1;
306 sort_run(Blocks2, blocks2_length);
308 cur_run_size =
div_ceil(blocks2_length, block_type::size);
309 run.resize(cur_run_size);
312 for (i = 0; i < cur_run_size; ++i)
314 run[i].value = Blocks2[i][0];
315 write_reqs[i]->
wait();
316 write_reqs[i] = Blocks2[i].
write(run[i].bid);
318 assert((blocks2_length % el_in_run) == 0);
320 m_result->add_run(run, blocks2_length);
322 while (!m_input.empty())
324 blocks1_length = fetch(Blocks1, 0, el_in_run);
325 sort_run(Blocks1, blocks1_length);
326 cur_run_size =
div_ceil(blocks1_length, block_type::size);
327 run.resize(cur_run_size);
331 fill_with_max_value(Blocks1, cur_run_size, blocks1_length);
333 for (i = 0; i < cur_run_size; ++i)
335 run[i].value = Blocks1[i][0];
336 write_reqs[i]->
wait();
337 write_reqs[i] = Blocks1[i].
write(run[i].bid);
339 m_result->add_run(run, blocks1_length);
341 std::swap(Blocks1, Blocks2);
342 std::swap(blocks1_length, blocks2_length);
345 wait_all(write_reqs, write_reqs + m2);
347 delete[] ((Blocks1 < Blocks2) ? Blocks1 : Blocks2);
381 :
base(input, cmp, memory_to_use)
392 template <
class ValueType>
478 if (first_idx < last_idx) {
480 while (first_idx != last_idx) {
481 *curr = m_cmp.max_value();
502 sort_run(m_blocks1, m_cur_el);
504 if (m_cur_el <= block_type::size && m_result->elements == 0)
507 STXXL_VERBOSE1(
"runs_creator(use_push): Small input optimization, input length: " << m_cur_el);
508 m_result->small_run.assign(m_blocks1[0].begin(), m_blocks1[0].begin() + m_cur_el);
509 m_result->elements = m_cur_el;
514 run.resize(cur_run_size);
518 disk_queues::get_instance()->set_priority_op(request_queue::WRITE);
521 fill_with_max_value(m_blocks1, cur_run_size, m_cur_el);
524 for ( ; i < cur_run_size; ++i)
526 run[i].value = m_blocks1[i][0];
527 if (m_write_reqs[i].
get())
528 m_write_reqs[i]->wait();
530 m_write_reqs[i] = m_blocks1[i].write(run[i].bid);
532 m_result->add_run(run, m_cur_el);
534 for (i = 0; i < m_m2; ++i)
536 if (m_write_reqs[i].
get())
537 m_write_reqs[i]->wait();
547 m_memory_to_use(memory_to_use),
551 m_blocks1(NULL), m_blocks2(NULL),
557 "INSUFFICIENT MEMORY provided, "
558 "please increase parameter 'memory_to_use'");
567 m_result_computed = 1;
579 m_result_computed =
false;
584 if (m_write_reqs[i].
get())
585 m_write_reqs[i]->cancel();
595 m_blocks2 = m_blocks1 + m_m2;
610 delete[] ((m_blocks1 < m_blocks2) ? m_blocks1 : m_blocks2);
611 m_blocks1 = m_blocks2 = NULL;
613 delete[] m_write_reqs;
622 assert(m_result_computed ==
false);
623 if (
LIKELY(m_cur_el < m_el_in_run))
625 m_blocks1[m_cur_el / block_type::size][m_cur_el % block_type::size] = val;
630 assert(m_el_in_run == m_cur_el);
634 sort_run(m_blocks1, m_el_in_run);
637 run.resize(cur_run_blocks);
641 disk_queues::get_instance()->set_priority_op(request_queue::WRITE);
645 run[i].value = m_blocks1[i][0];
646 if (m_write_reqs[i].
get())
647 m_write_reqs[i]->wait();
649 m_write_reqs[i] = m_blocks1[i].write(run[i].bid);
652 m_result->add_run(run, m_el_in_run);
654 std::swap(m_blocks1, m_blocks2);
664 if (!m_result_computed)
667 m_result_computed =
true;
668 #ifdef STXXL_PRINT_STAT_AFTER_RF
670 #endif //STXXL_PRINT_STAT_AFTER_RF
678 return m_result->elements + m_cur_el;
690 return m_memory_to_use;
700 template <
class ValueType>
768 cur_block(writer.get_free_block()),
777 "INSUFFICIENT MEMORY provided, "
778 "please increase parameter 'memory_to_use'");
786 assert(offset < block_type::size);
788 (*cur_block)[offset] = val;
791 if (offset == block_type::size)
797 result_->runs.resize(irun + 1);
798 result_->runs[irun].resize(iblock + 1);
806 result_->runs[irun][iblock].value = (*cur_block)[0];
807 cur_block = writer.write(cur_block, result_->runs[irun][iblock].bid);
819 if (offset == 0 && iblock == 0)
822 result_->runs_sizes.resize(irun + 1);
823 result_->runs_sizes.back() = iblock * block_type::size + offset;
827 while (offset != block_type::size)
829 (*cur_block)[offset] = cmp.max_value();
836 result_->runs.resize(irun + 1);
837 result_->runs[irun].resize(iblock + 1);
845 result_->runs[irun][iblock].value = (*cur_block)[0];
846 cur_block = writer.write(cur_block, result_->runs[irun][iblock].bid);
872 template <
class RunsType,
class CompareType>
876 typedef typename RunsType::element_type::block_type block_type;
881 for (irun = 0; irun < nruns; ++irun)
884 block_type* blocks =
new block_type[nblocks];
888 reqs[j] = blocks[j].read(sruns->runs[irun][j].bid);
895 if (cmp(blocks[j][0], sruns->runs[irun][j].value) ||
896 cmp(sruns->runs[irun][j].value, blocks[j][0]))
898 STXXL_ERRMSG(
"check_sorted_runs wrong trigger in the run");
908 STXXL_ERRMSG(
"check_sorted_runs wrong order in the run");
916 STXXL_MSG(
"Checking runs finished successfully");
931 template <
class RunsType,
942 typedef typename sorted_runs_data_type::size_type
size_type;
943 typedef typename sorted_runs_data_type::run_type
run_type;
944 typedef typename sorted_runs_data_type::block_type
block_type;
952 typedef std::pair<typename block_type::iterator, typename block_type::iterator>
sequence;
957 typedef typename sorted_runs_data_type::value_type
value_type;
993 #if STXXL_PARALLEL_MULTIWAY_MERGE
994 std::vector<sequence>* seqs;
995 std::vector<block_type*>* buffers;
999 #if STXXL_CHECK_ORDER_IN_SORTS
1002 #endif //STXXL_CHECK_ORDER_IN_SORTS
1006 void merge_recursively();
1013 #if STXXL_PARALLEL_MULTIWAY_MERGE
1017 delete m_prefetcher;
1018 delete[] m_prefetch_seq;
1019 m_prefetcher = NULL;
1028 #if STXXL_PARALLEL_MULTIWAY_MERGE
1034 if (num_currently_mergeable < rest)
1036 if (!m_prefetcher || m_prefetcher->empty())
1039 num_currently_mergeable = m_elements_remaining;
1044 *seqs, m_consume_seq[m_prefetcher->pos()].value, m_cmp);
1052 stxxl::parallel::multiway_merge((*seqs).begin(), (*seqs).end(), m_buffer_block->end() - rest, m_cmp, output_size);
1055 rest -= output_size;
1056 num_currently_mergeable -= output_size;
1061 }
while (rest > 0 && (*seqs).size() > 0);
1063 #if STXXL_CHECK_ORDER_IN_SORTS
1064 if (!
stxxl::is_sorted(m_buffer_block->begin(), m_buffer_block->end(), cmp))
1066 for (
value_type* i = m_buffer_block->begin() + 1; i != m_buffer_block->end(); ++i)
1067 if (cmp(*i, *(i - 1)))
1069 STXXL_VERBOSE1(
"Error at position " << (i - m_buffer_block->begin()));
1073 #endif //STXXL_CHECK_ORDER_IN_SORTS
1078 #endif //STXXL_PARALLEL_MULTIWAY_MERGE
1083 m_losers->multi_merge(m_buffer_block->elem, m_buffer_block->elem + STXXL_MIN<size_type>(out_block_type::size, m_elements_remaining));
1088 m_current_ptr = m_buffer_block->elem;
1089 m_current_end = m_buffer_block->elem + STXXL_MIN<size_type>(out_block_type::size, m_elements_remaining);
1091 if (m_elements_remaining <= out_block_type::size)
1092 deallocate_prefetcher();
1101 m_memory_to_use(memory_to_use),
1103 m_prefetch_seq(NULL),
1109 num_currently_mergeable(0)
1112 , m_last_element(m_cmp.min_value())
1121 m_memory_to_use = memory_to_use;
1128 m_elements_remaining = m_sruns->elements;
1133 if (!m_sruns->small_run.empty())
1136 STXXL_VERBOSE1(
"basic_runs_merger: small input optimization, input length: " << m_elements_remaining);
1137 assert(m_elements_remaining ==
size_type(m_sruns->small_run.size()));
1139 m_current_ptr = &m_sruns->small_run[0];
1140 m_current_end = m_current_ptr + m_sruns->small_run.size();
1145 #if STXXL_CHECK_ORDER_IN_SORTS
1147 #endif //STXXL_CHECK_ORDER_IN_SORTS
1151 disk_queues::get_instance()->set_priority_op(request_queue::WRITE);
1153 int_type disks_number = config::get_instance()->disks_number();
1158 : 0) / block_type::raw_size;
1161 if (input_buffers < nruns + min_prefetch_buffers)
1168 STXXL_WARNMSG_RECURSIVE_SORT(
"memory_to_use=" << m_memory_to_use <<
" bytes block_type::raw_size=" << block_type::raw_size <<
" bytes");
1171 unsigned_type recursive_merge_buffers = m_memory_to_use / block_type::raw_size;
1172 if (recursive_merge_buffers < 2 * min_prefetch_buffers + 1 + 2) {
1175 STXXL_ERRMSG(
"There are only m=" << recursive_merge_buffers <<
" blocks available for recursive merging, but "
1176 << min_prefetch_buffers <<
"+" << min_prefetch_buffers <<
"+1 are needed read-ahead/write-back/output, and");
1177 STXXL_ERRMSG(
"the merger requires memory to store at least two input blocks internally. Aborting.");
1178 throw bad_parameter(
"basic_runs_merger::sort(): INSUFFICIENT MEMORY provided, please increase parameter 'memory_to_use'");
1181 merge_recursively();
1183 nruns = m_sruns->runs.size();
1186 assert(nruns + min_prefetch_buffers <= input_buffers);
1190 deallocate_prefetcher();
1195 prefetch_seq_size += m_sruns->runs[i].size();
1198 m_consume_seq.resize(prefetch_seq_size);
1199 m_prefetch_seq =
new int_type[prefetch_seq_size];
1201 typename run_type::iterator copy_start = m_consume_seq.begin();
1204 copy_start = std::copy(m_sruns->runs[i].begin(),
1205 m_sruns->runs[i].end(),
1209 std::stable_sort(m_consume_seq.begin(), m_consume_seq.end(),
1214 #if STXXL_SORT_OPTIMAL_PREFETCHING
1216 const int_type n_opt_prefetch_buffers = min_prefetch_buffers + (3 * (n_prefetch_buffers - min_prefetch_buffers)) / 10;
1221 n_opt_prefetch_buffers,
1222 config::get_instance()->get_max_device_id());
1225 m_prefetch_seq[i] = i;
1226 #endif //STXXL_SORT_OPTIMAL_PREFETCHING
1229 m_consume_seq.begin(),
1230 m_consume_seq.end(),
1232 STXXL_MIN(nruns + n_prefetch_buffers, prefetch_seq_size));
1236 #if STXXL_PARALLEL_MULTIWAY_MERGE
1238 seqs =
new std::vector<sequence>(nruns);
1239 buffers =
new std::vector<block_type*>(nruns);
1244 (*seqs)[i] = std::make_pair((*buffers)[i]->begin(), (*buffers)[i]->end());
1249 #endif //STXXL_PARALLEL_MULTIWAY_MERGE
1258 fill_buffer_block();
1264 deallocate_prefetcher();
1272 return (m_elements_remaining == 0);
1278 return m_elements_remaining;
1285 return *m_current_ptr;
1291 return &(operator * ());
1298 assert(m_current_ptr != m_current_end);
1300 --m_elements_remaining;
1303 if (
LIKELY(m_current_ptr == m_current_end && !empty()))
1305 fill_buffer_block();
1307 #if STXXL_CHECK_ORDER_IN_SORTS
1308 assert(
stxxl::is_sorted(m_buffer_block->elem, m_buffer_block->elem + STXXL_MIN<size_type>(m_elements_remaining, m_buffer_block->size), m_cmp));
1309 #endif //STXXL_CHECK_ORDER_IN_SORTS
1312 #if STXXL_CHECK_ORDER_IN_SORTS
1315 assert(!m_cmp(
operator * (), m_last_element));
1316 m_last_element = operator * ();
1318 #endif //STXXL_CHECK_ORDER_IN_SORTS
1327 deallocate_prefetcher();
1329 delete m_buffer_block;
1333 template <
class RunsType,
class CompareType,
class AllocStr>
1337 unsigned_type ndisks = config::get_instance()->disks_number();
1346 + recursive_merger_memory_prefetch_buffers
1347 + recursive_merger_memory_out_block;
1349 unsigned_type max_arity = (m_memory_to_use > memory_for_buffers ? m_memory_to_use - memory_for_buffers : 0) / block_type::raw_size;
1353 assert(merge_factor > 1);
1354 assert(merge_factor <= max_arity);
1356 while (nruns > max_arity)
1359 STXXL_MSG(
"Starting new merge phase: nruns: " << nruns <<
1360 " opt_merge_factor: " << merge_factor <<
1361 " max_arity: " << max_arity <<
" new_nruns: " << new_nruns);
1367 new_runs.runs.resize(new_nruns);
1368 new_runs.runs_sizes.resize(new_nruns);
1369 new_runs.elements = m_sruns->elements;
1375 size_type elements_left = m_sruns->elements;
1377 while (runs_left > 0)
1380 STXXL_MSG(
"Merging " << runs2merge <<
" runs");
1386 for (
unsigned_type i = nruns - runs_left; i < (nruns - runs_left + runs2merge); ++i)
1388 elements_in_new_run += m_sruns->runs_sizes[i];
1390 new_runs.runs_sizes[cur_out_run] = elements_in_new_run;
1396 new_runs.runs[cur_out_run].resize(blocks_in_new_run);
1403 cur_runs->runs.resize(runs2merge);
1404 cur_runs->runs_sizes.resize(runs2merge);
1406 std::copy(m_sruns->runs.begin() + nruns - runs_left,
1407 m_sruns->runs.begin() + nruns - runs_left + runs2merge,
1408 cur_runs->runs.begin());
1409 std::copy(m_sruns->runs_sizes.begin() + nruns - runs_left,
1410 m_sruns->runs_sizes.begin() + nruns - runs_left + runs2merge,
1411 cur_runs->runs_sizes.begin());
1413 cur_runs->elements = elements_in_new_run;
1414 elements_left -= elements_in_new_run;
1419 merger(m_cmp, m_memory_to_use - memory_for_write_buffers);
1424 new_runs.runs[cur_out_run].begin(),
1428 const size_type cnt_max = cur_runs->elements;
1430 while (cnt != cnt_max)
1433 if ((cnt % block_type::size) == 0)
1436 ++cnt, ++out, ++merger;
1438 assert(merger.
empty());
1440 while (cnt % block_type::size)
1442 *out = m_cmp.max_value();
1451 assert(cur_out_run + 1 == new_runs.runs.size());
1453 elements_left -= m_sruns->runs_sizes.back();
1456 new_runs.runs.back() = m_sruns->runs.back();
1457 new_runs.runs_sizes.back() = m_sruns->runs_sizes.back();
1460 runs_left -= runs2merge;
1464 assert(elements_left == 0);
1468 m_sruns->runs.clear();
1472 std::swap(nruns, new_nruns);
1473 m_sruns->swap(new_runs);
1483 template <
class RunsType,
1484 class CompareType =
typename RunsType::element_type::cmp_type,
1504 :
base(cmp, memory_to_use)
1506 this->initialize(sruns);
1513 :
base(cmp, memory_to_use)
1533 class RunsCreatorType = runs_creator<Input, CompareType, BlockSize, AllocStr>
1553 : creator(in, c, memory_to_use),
1554 merger(creator.result(), c, memory_to_use)
1566 : creator(in, c, m_memory_to_userc),
1567 merger(creator.result(), c, m_memory_to_use)
1575 return merger.empty();
1588 return merger.operator -> ();
1636 class RandomAccessIterator,
1640 void sort(RandomAccessIterator begin,
1641 RandomAccessIterator end,
1648 InputType Input(begin, end);
1650 sorter_type Sort(Input, cmp, MemSize);
1658 #endif // !STXXL_STREAM_SORT_STREAM_HEADER
const unsigned_type m_m2
m_memsize / 2
sort_helper::run_cursor2_cmp< block_type, prefetcher_type, value_cmp > run_cursor2_cmp_type
sorted_runs_data_type::run_type run_type
#define STXXL_THROW_UNREACHABLE()
Throws stxxl::unreachable with "Error in file [file], line [line] : this code should never be reachable"
counting_ptr< sorted_runs_data_type > sorted_runs_type
block_prefetcher< block_type, typename run_type::iterator > prefetcher_type
compat::remove_const< Integral >::type div_ceil(Integral n, Integral2 d)
#define STXXL_DEFAULT_BLOCK_SIZE(type)
AllocStr allocation_strategy_type
Forms sorted runs of data from a stream.
bool is_sorted(ForwardIterator first, ForwardIterator last)
counting_ptr< sorted_runs_data_type > sorted_runs_type
unsigned sort_memory_usage_factor()
sorted_runs_data_type::run_type run_type
const Type & STXXL_MIN(const Type &a, const Type &b)
bool check_sorted_runs(RunType **runs, unsigned_type nruns, unsigned_type m, ValueCmp cmp)
runs_merger< sorted_runs_type, CompareType, AllocStr > runs_merger_type
external_size_type size() const
number of items currently inserted.
bool m_result_computed
true after the result() method was called for the first time
base::value_type value_type
base::sorted_runs_type sorted_runs_type
unsigned_type fetch(block_type *blocks, unsigned_type first_idx, unsigned_type last_idx)
Fetch data from input into blocks[first_idx,last_idx).
void sort(ExtIterator first, ExtIterator last, StrictWeakOrdering cmp, unsigned_type M)
Sort records comparison-based, see stxxl::sort -- Sorting Comparison-Based.
trigger_entry_iterator< Iterator > make_bid_iterator(Iterator iter)
void fill_with_max_value(block_type *blocks, unsigned_type num_blocks, unsigned_type first_idx)
fill the rest of the block with max values
unsigned_type count_elements_less_equal(const SequenceVector &seqs, const ValueType &bound, Comparator cmp)
All sorted runs of a sort operation.
A model of stream that retrieves the data from an input iterator. For convenience use streamify funct...
#define STXXL_PARALLEL_MULTIWAY_MERGE
buffered_writer< block_type > writer
void deallocate()
Deallocates input buffers but not the current result.
loser_tree_type * m_losers
loser tree used for native merging
sorted_runs_type result_type
void deallocate()
Deallocate temporary structures freeing memory prior to next initialize().
const value_type * m_current_end
pointer into current memory buffer: end after range of current values
#define STXXL_WARNMSG_RECURSIVE_SORT
RunsType sorted_runs_type
typed_block< BlockSize, value_type > block_type
void verify_sentinel_strict_weak_ordering(StrictWeakOrdering cmp)
CompareType m_cmp
comparator used to sort block groups
sorted_runs< trigger_entry_type, cmp_type > sorted_runs_data_type
bool m_result_computed
true iff result is already computed (used in 'result()' method)
bool empty() const
Standard stream method.
#define STXXL_DEFAULT_ALLOC_STRATEGY
sorted_runs_type m_result
stores the result (sorted runs) as smart pointer
basic_runs_merger< RunsType, CompareType, AllocStr > base
runs_merger(sorted_runs_type &sruns, value_cmp cmp, unsigned_type memory_to_use)
Creates a runs merger object.
void refill_or_remove_empty_sequences(SequenceVector &seqs, BufferPtrVector &buffers, Prefetcher &prefetcher)
run_type m_consume_seq
sequence of blocks needed for merging
unsigned_type m_memory_to_use
memory size in bytes to use
request_ptr * m_write_reqs
reference to write requests transporting the last accumulation buffer to disk
sorted_runs_type & result()
Returns the sorted runs object.
run_cursor2< block_type, prefetcher_type > run_cursor_type
Computes sorted runs type from value type and block size.
#define STXXL_VERBOSE2(x)
block_type * m_blocks1
accumulation buffer of size m_m2 blocks, half the available memory size
int_type * m_prefetch_seq
precalculated order of blocks in which they are prefetched
const unsigned_type m_memsize
memory size in number of blocks for internal use
void check_sort_settings()
std::vector< sequence >::size_type seqs_size_type
block_type out_block_type
void push(const value_type &val)
Adds new element to the sorter.
uint_pair & operator++()
prefix increment operator (directly manipulates the integer parts)
unsigned_type m_memsize
memory for internal use in blocks
run_type run
run object containing block ids of the run being written to disk
void deallocate_prefetcher()
basic_runs_merger(value_cmp c, unsigned_type memory_to_use)
Creates a runs merger object.
sorted_runs_data_type::run_type run_type
base::block_type block_type
sorted_runs_type::element_type sorted_runs_data_type
AllocStr alloc_strategy_type
typed_block< BlockSize, value_type > block_type
void delete_blocks(const BIDIteratorClass &bidbegin, const BIDIteratorClass &bidend)
Deallocates blocks.
element_iterator_traits< block_type, external_size_type >::element_iterator element_iterator
sort_helper::trigger_entry< block_type > trigger_entry_type
runs_creator_type creator
Input strategy for runs_creator class.
Block containing elements of fixed length.
virtual void wait(bool measure_time=true)=0
Suspends calling thread until completion of the request.
Encapsulates asynchronous prefetching engine.
sorted_runs_type result_
stores the result (sorted runs)
sorted_runs_type result_type
void clear()
Clear the internal state of the object: release all runs and reset.
virtual ~basic_runs_merger()
Destructor.
const unsigned_type m_el_in_run
total number of elements in a run
runs_merger(value_cmp cmp, unsigned_type memory_to_use)
Creates a runs merger object without initializing a round of sorted_runs.
sort(Input &in, CompareType c, unsigned_type memory_to_use)
Creates the object.
Input::value_type value_type
#define _STXXL_SORT_TRIGGER_FORCE_SEQUENTIAL
choose_int_types< my_pointer_size >::int_type int_type
loser_tree< run_cursor_type, run_cursor2_cmp_type > loser_tree_type
sorted_runs_data_type::size_type size_type
RunsCreatorType runs_creator_type
basic_runs_creator(Input &input, CompareType cmp, unsigned_type memory_to_use)
Create the object.
out_block_type * m_buffer_block
memory buffer for merging from external streams
internal_size_type m_cur_el
current number of elements in the run m_blocks1
void initialize(const sorted_runs_type &sruns)
Initialize the runs merger object with a new round of sorted_runs.
base::block_type block_type
unsigned_type internal_size_type
Forms sorted runs of data from a stream.
size_type size() const
Standard size method.
runs_creator(Input &input, CompareType cmp, unsigned_type memory_to_use)
Creates the object.
sorted_runs_data_type::value_type value_type
Standard stream typedef.
const Type & STXXL_MAX(const Type &a, const Type &b)
runs_creator(CompareType c, unsigned_type memory_to_use)
Creates the object.
sort(Input &in, CompareType c, unsigned_type m_memory_to_userc, unsigned_type m_memory_to_use)
Creates the object.
void sort_run(block_type *run, unsigned_type elements)
Sort a specific run, contained in a sequence of blocks.
#define STXXL_CHECK_ORDER_IN_SORTS
#define STXXL_BEGIN_NAMESPACE
void STXXL_UNUSED(const U &)
sorted_runs_type & result()
Returns the sorted runs object.
void set_memory_to_use(unsigned_type memory_to_use)
Set memory amount to use for the merger in bytes.
#define STXXL_VERBOSE1(x)
base::sorted_runs_data_type sorted_runs_data_type
size_type m_elements_remaining
items remaining in input
sorted_runs< trigger_entry_type, std::less< value_type > > result
Produces sorted stream from input stream.
sort_helper::trigger_entry< block_type > trigger_entry_type
unsigned_type memory_used() const
return memory size used (in bytes).
iterator begin()
Returns iterator pointing to the first element.
sorted_runs_type m_result
stores the result (sorted runs) in a reference counted object
sorted_runs_data_type::run_type run_type
const cmp_type & cmp() const
return comparator object.
OutputIterator materialize(StreamAlgorithm &in, OutputIterator out)
Stores consecutively stream content to an output iterator.
block_type * m_blocks2
accumulation buffer that is currently being written to disk
sorted_runs_data_type::block_type block_type
bool empty() const
Standard stream method.
void new_blocks(const DiskAssignFunctor &functor, BIDIteratorClass bidbegin, BIDIteratorClass bidend, unsigned_type offset=0)
Allocates new blocks.
sorted_runs< trigger_entry_type, cmp_type > sorted_runs_data_type
typed_block< BlockSize, value_type > block_type
element_iterator_traits< BlockType, SizeType >::element_iterator make_element_iterator(BlockType *blocks, SizeType offset)
base::value_cmp value_cmp
runs_creator_type::sorted_runs_type sorted_runs_type
void push(const value_type &val)
Adds new element to the current run.
void clear()
Clear current state and remove all items.
void wait_all(RequestIterator reqs_begin, RequestIterator reqs_end)
Collection of functions to track statuses of a number of requests.
uint64 external_size_type
choose_int_types< my_pointer_size >::unsigned_type unsigned_type
block_type * pull_block()
Pulls next unconsumed block from the consumption sequence.
const value_type * m_current_ptr
pointer into current memory buffer: this is either m_buffer_block or the small_run vector ...
void compute_prefetch_schedule(const int_type *first, const int_type *last, int_type *out_first, int_type m, int_type D)
RunsType sorted_runs_type
unsigned_type optimal_merge_factor(unsigned_type num_runs, unsigned_type max_concurrent_runs)
alloc_strategy_type alloc_strategy
needs to be reset after each run
sorted_runs< trigger_entry_type, cmp_type > sorted_runs_data_type
sort_helper::trigger_entry< bid_type, value_type > trigger_entry_type
std::vector< trigger_entry_type > run_type
CompareType m_cmp
comparator object to sort runs
void allocate()
Allocates input buffers and clears result.
run_type::value_type trigger_entry_type
Input strategy for runs_creator class.
runs_creator(CompareType cmp, unsigned_type memory_to_use)
Creates the object.
std::pair< typename block_type::iterator, typename block_type::iterator > sequence
Input & m_input
reference to the input stream
counting_ptr< sorted_runs_data_type > sorted_runs_type
sort_helper::trigger_entry< block_type > trigger_entry_type
void finish()
Finishes current run and begins new one.
basic_runs_creator< Input, CompareType, BlockSize, AllocStr > base
prefetcher_type * m_prefetcher
prefetcher object
value_cmp m_cmp
comparator object to sort runs
request_ptr write(const bid_type &bid, completion_handler on_cmpl=completion_handler())
Writes block to the disk(s).
void sort_run(block_type *run, unsigned_type elements)
Sort a specific run, contained in a sequence of blocks.
BID< BlockSize > bid_type
element_iterator_traits< block_type, external_size_type >::element_iterator element_iterator
void fill_with_max_value(block_type *blocks, unsigned_type num_blocks, unsigned_type first_idx)
fill the rest of the block with max values
Input::value_type value_type
Standard stream typedef.
sorted_runs_type & result()
Returns the sorted runs object.
#define STXXL_END_NAMESPACE
helper::element_iterator_generator< BlockType, SizeType, BlockType::has_only_data >::iterator element_iterator
sorted_runs_type m_sruns
smart pointer to sorted_runs object
const unsigned_type m_memory_to_use
memory size in bytes to use
unsigned_type m_
memory for internal use in blocks