STXXL  1.4.0
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
sort_stream.h
Go to the documentation of this file.
1 /***************************************************************************
2  * include/stxxl/bits/stream/sort_stream.h
3  *
4  * Part of the STXXL. See http://stxxl.sourceforge.net
5  *
6  * Copyright (C) 2002-2005 Roman Dementiev <[email protected]>
7  * Copyright (C) 2006-2008 Johannes Singler <[email protected]>
8  * Copyright (C) 2008-2010 Andreas Beckmann <[email protected]>
9  * Copyright (C) 2013 Timo Bingmann <[email protected]>
10  *
11  * Distributed under the Boost Software License, Version 1.0.
12  * (See accompanying file LICENSE_1_0.txt or copy at
13  * http://www.boost.org/LICENSE_1_0.txt)
14  **************************************************************************/
15 
16 #ifndef STXXL_STREAM_SORT_STREAM_HEADER
17 #define STXXL_STREAM_SORT_STREAM_HEADER
18 
19 #include <stxxl/bits/config.h>
28 
30 
31 namespace stream {
32 
33 //! \addtogroup streampack Stream Package
34 //! \{
35 
36 ////////////////////////////////////////////////////////////////////////
37 // CREATE RUNS //
38 ////////////////////////////////////////////////////////////////////////
39 
40 //! Forms sorted runs of data from a stream.
41 //!
42 //! \tparam Input_ type of the input stream
43 //! \tparam CompareType_ type of comparison object used for sorting the runs
44 //! \tparam BlockSize_ size of blocks used to store the runs (in bytes)
45 //! \tparam AllocStr_ functor that defines allocation strategy for the runs
//!
//! The input stream is consumed on the first call to result(): each run is
//! sorted in memory with the comparator and written to disk, unless the whole
//! input fits into a single block, in which case it is kept in memory.
// NOTE(review): this is a line-numbered listing; several original source lines
// (e.g. 51, 61-65, 77, 79, 96, 109-110, 129, 139) are missing from it.
46 template <
47  class Input_,
48  class CompareType_,
49  unsigned BlockSize_ = STXXL_DEFAULT_BLOCK_SIZE(typename Input_::value_type),
50  class AllocStr_ = STXXL_DEFAULT_ALLOC_STRATEGY>
52 {
53 public:
54  typedef Input_ input_type;
55  typedef CompareType_ cmp_type;
56  static const unsigned block_size = BlockSize_;
57  typedef AllocStr_ allocation_strategy_type;
58 
59 public:
60  typedef typename Input_::value_type value_type;
66 
67 protected:
68  Input_& m_input; //!< reference to the input stream
69  CompareType_ m_cmp; //!< comparator used to sort block groups
70 
71 private:
72  sorted_runs_type m_result; //!< stores the result (sorted runs) as smart pointer
73  unsigned_type m_memsize; //!< memory for internal use in blocks
74  bool m_result_computed; //!< true iff result is already computed (used in 'result()' method)
75 
76  //! Fetch data from input into blocks[first_idx,last_idx).
 //! Copying stops early when the input stream becomes empty.
 //! \return index one past the last element written
78  {
80  make_element_iterator(blocks, first_idx);
81  unsigned_type curr_idx = first_idx;
82  while (!m_input.empty() && curr_idx != last_idx) {
83  *output = *m_input;
84  ++m_input;
85  ++output;
86  ++curr_idx;
87  }
88  return curr_idx;
89  }
90 
91  //! fill the rest of the block with max values
 //! Sets the elements [first_idx, num_blocks * block_type::size) to
 //! m_cmp.max_value(), padding a partially filled last block; does nothing
 //! if first_idx is already at or past that end.
92  void fill_with_max_value(block_type* blocks, unsigned_type num_blocks, unsigned_type first_idx)
93  {
94  unsigned_type last_idx = num_blocks * block_type::size;
95  if (first_idx < last_idx) {
97  make_element_iterator(blocks, first_idx);
98  while (first_idx != last_idx) {
99  *curr = m_cmp.max_value();
100  ++curr;
101  ++first_idx;
102  }
103  }
104  }
105 
106  //! Sort a specific run, contained in a sequences of blocks.
 //! Orders the run's elements up to index \c elements (exclusive) using m_cmp.
107  void sort_run(block_type* run, unsigned_type elements)
108  {
111  make_element_iterator(run, elements),
112  m_cmp);
113  }
114 
 //! Reads the whole input and forms the sorted runs; invoked by result()
 //! the first time it is called.
115  void compute_result();
116 
117 public:
118  //! Create the object.
119  //! \param input input stream
120  //! \param cmp comparator object
121  //! \param memory_to_use memory amount that is allowed to be used by the sorter in bytes
 //! \throws bad_parameter if memory_to_use < 2 * BlockSize_ * sort_memory_usage_factor()
122  basic_runs_creator(Input_& input, CompareType_ cmp, unsigned_type memory_to_use)
123  : m_input(input),
124  m_cmp(cmp),
125  m_result(new sorted_runs_data_type),
 // number of blocks available for internal buffering
126  m_memsize(memory_to_use / BlockSize_ / sort_memory_usage_factor()),
127  m_result_computed(false)
128  {
130  if (!(2 * BlockSize_ * sort_memory_usage_factor() <= memory_to_use)) {
131  throw bad_parameter("stxxl::runs_creator<>:runs_creator(): INSUFFICIENT MEMORY provided, please increase parameter 'memory_to_use'");
132  }
133  assert(m_memsize > 0);
134  }
135 
136  //! Returns the sorted runs object.
137  //! \return Sorted runs object. The result is computed lazily, i.e. on the first call
 //! and later calls return the same object without recomputing.
138  //! \remark Returned object is intended to be used by \c runs_merger object as input
140  {
141  if (!m_result_computed)
142  {
143  compute_result();
144  m_result_computed = true;
145 #ifdef STXXL_PRINT_STAT_AFTER_RF
146  STXXL_MSG(*stats::get_instance());
147 #endif //STXXL_PRINT_STAT_AFTER_RF
148  }
149  return m_result;
150  }
151 };
152 
153 //! Finish the results, i. e. create all runs.
154 //!
155 //! This is the main routine of this class.
156 template <class Input_, class CompareType_, unsigned BlockSize_, class AllocStr_>
158 {
159  unsigned_type i = 0;
160  unsigned_type m2 = m_memsize / 2;
161  const unsigned_type el_in_run = m2 * block_type::size; // # el in a run
162  STXXL_VERBOSE1("basic_runs_creator::compute_result m2=" << m2);
163  unsigned_type blocks1_length = 0, blocks2_length = 0;
164  block_type* Blocks1 = NULL;
165 
166 #ifndef STXXL_SMALL_INPUT_PSORT_OPT
167  Blocks1 = new block_type[m2 * 2];
168 #else
169  // push input element into small_run vector in result until it is full
170  while (!input.empty() && blocks1_length != block_type::size)
171  {
172  m_result->small_run.push_back(*input);
173  ++input;
174  ++blocks1_length;
175  }
176 
177  if (blocks1_length == block_type::size && !input.empty())
178  {
179  Blocks1 = new block_type[m2 * 2];
180  std::copy(m_result->small_run.begin(), m_result->small_run.end(), Blocks1[0].begin());
181  m_result->small_run.clear();
182  }
183  else
184  {
185  STXXL_VERBOSE1("basic_runs_creator: Small input optimization, input length: " << blocks1_length);
186  m_result->elements = blocks1_length;
188  potentially_parallel::sort(m_result->small_run.begin(), m_result->small_run.end(), cmp);
189  return;
190  }
191 #endif //STXXL_SMALL_INPUT_PSORT_OPT
192 
193  // the first block may be there already, now fetch until memsize is filled.
194  blocks1_length = fetch(Blocks1, blocks1_length, el_in_run);
195 
196  // sort first run
197  sort_run(Blocks1, blocks1_length);
198 
199  if (blocks1_length <= block_type::size && m_input.empty())
200  {
201  // small input, do not flush it on the disk(s)
202  STXXL_VERBOSE1("basic_runs_creator: Small input optimization, input length: " << blocks1_length);
203  assert(m_result->small_run.empty());
204  m_result->small_run.assign(Blocks1[0].begin(), Blocks1[0].begin() + blocks1_length);
205  m_result->elements = blocks1_length;
206  delete[] Blocks1;
207  return;
208  }
209 
210  block_type* Blocks2 = Blocks1 + m2;
211  block_manager* bm = block_manager::get_instance();
212  request_ptr* write_reqs = new request_ptr[m2];
213  run_type run;
214 
215  unsigned_type cur_run_size = div_ceil(blocks1_length, block_type::size); // in blocks
216  run.resize(cur_run_size);
217  bm->new_blocks(AllocStr_(), make_bid_iterator(run.begin()), make_bid_iterator(run.end()));
218 
219  disk_queues::get_instance()->set_priority_op(request_queue::WRITE);
220 
221  // fill the rest of the last block with max values
222  fill_with_max_value(Blocks1, cur_run_size, blocks1_length);
223 
224  for (i = 0; i < cur_run_size; ++i)
225  {
226  run[i].value = Blocks1[i][0];
227  write_reqs[i] = Blocks1[i].write(run[i].bid);
228  }
229  m_result->runs.push_back(run);
230  m_result->runs_sizes.push_back(blocks1_length);
231  m_result->elements += blocks1_length;
232 
233  if (m_input.empty())
234  {
235  // return
236  wait_all(write_reqs, write_reqs + cur_run_size);
237  delete[] write_reqs;
238  delete[] Blocks1;
239  return;
240  }
241 
242  STXXL_VERBOSE1("Filling the second part of the allocated blocks");
243  blocks2_length = fetch(Blocks2, 0, el_in_run);
244 
245  if (m_input.empty())
246  {
247  // optimization if the whole set fits into both halves
248  // (re)sort internally and return
249  blocks2_length += el_in_run;
250  sort_run(Blocks1, blocks2_length); // sort first an second run together
251  wait_all(write_reqs, write_reqs + cur_run_size);
252  bm->delete_blocks(make_bid_iterator(run.begin()), make_bid_iterator(run.end()));
253 
254  cur_run_size = div_ceil(blocks2_length, block_type::size);
255  run.resize(cur_run_size);
256  bm->new_blocks(AllocStr_(), make_bid_iterator(run.begin()), make_bid_iterator(run.end()));
257 
258  // fill the rest of the last block with max values
259  fill_with_max_value(Blocks1, cur_run_size, blocks2_length);
260 
261  assert(cur_run_size > m2);
262 
263  for (i = 0; i < m2; ++i)
264  {
265  run[i].value = Blocks1[i][0];
266  write_reqs[i]->wait();
267  write_reqs[i] = Blocks1[i].write(run[i].bid);
268  }
269 
270  request_ptr* write_reqs1 = new request_ptr[cur_run_size - m2];
271 
272  for ( ; i < cur_run_size; ++i)
273  {
274  run[i].value = Blocks1[i][0];
275  write_reqs1[i - m2] = Blocks1[i].write(run[i].bid);
276  }
277 
278  m_result->runs[0] = run;
279  m_result->runs_sizes[0] = blocks2_length;
280  m_result->elements = blocks2_length;
281 
282  wait_all(write_reqs, write_reqs + m2);
283  delete[] write_reqs;
284  wait_all(write_reqs1, write_reqs1 + cur_run_size - m2);
285  delete[] write_reqs1;
286 
287  delete[] Blocks1;
288 
289  return;
290  }
291 
292  // more than 2 runs can be filled, i. e. the general case
293 
294  sort_run(Blocks2, blocks2_length);
295 
296  cur_run_size = div_ceil(blocks2_length, block_type::size); // in blocks
297  run.resize(cur_run_size);
298  bm->new_blocks(AllocStr_(), make_bid_iterator(run.begin()), make_bid_iterator(run.end()));
299 
300  for (i = 0; i < cur_run_size; ++i)
301  {
302  run[i].value = Blocks2[i][0];
303  write_reqs[i]->wait();
304  write_reqs[i] = Blocks2[i].write(run[i].bid);
305  }
306  assert((blocks2_length % el_in_run) == 0);
307 
308  m_result->add_run(run, blocks2_length);
309 
310  while (!m_input.empty())
311  {
312  blocks1_length = fetch(Blocks1, 0, el_in_run);
313  sort_run(Blocks1, blocks1_length);
314  cur_run_size = div_ceil(blocks1_length, block_type::size); // in blocks
315  run.resize(cur_run_size);
316  bm->new_blocks(AllocStr_(), make_bid_iterator(run.begin()), make_bid_iterator(run.end()));
317 
318  // fill the rest of the last block with max values (occurs only on the last run)
319  fill_with_max_value(Blocks1, cur_run_size, blocks1_length);
320 
321  for (i = 0; i < cur_run_size; ++i)
322  {
323  run[i].value = Blocks1[i][0];
324  write_reqs[i]->wait();
325  write_reqs[i] = Blocks1[i].write(run[i].bid);
326  }
327  m_result->add_run(run, blocks1_length);
328 
329  std::swap(Blocks1, Blocks2);
330  std::swap(blocks1_length, blocks2_length);
331  }
332 
333  wait_all(write_reqs, write_reqs + m2);
334  delete[] write_reqs;
335  delete[] ((Blocks1 < Blocks2) ? Blocks1 : Blocks2);
336 }
337 
338 //! Forms sorted runs of data from a stream.
339 //!
340 //! \tparam Input_ type of the input stream
341 //! \tparam CompareType_ type of comparison object used for sorting the runs
342 //! \tparam BlockSize_ size of blocks used to store the runs
343 //! \tparam AllocStr_ functor that defines allocation strategy for the runs
//!
//! Thin wrapper: the work is done by the \c basic_runs_creator base class;
//! this class re-exports its typedefs and forwards the constructor.
// NOTE(review): lines 352 and 358-359 (presumably further typedefs, including
// the 'base' typedef used below) are missing from this listing.
344 template <
345  class Input_,
346  class CompareType_,
347  unsigned BlockSize_ = STXXL_DEFAULT_BLOCK_SIZE(typename Input_::value_type),
348  class AllocStr_ = STXXL_DEFAULT_ALLOC_STRATEGY>
349 class runs_creator : public basic_runs_creator<Input_, CompareType_, BlockSize_, AllocStr_>
350 {
351 private:
353 
354 public:
355  typedef typename base::cmp_type cmp_type;
356  typedef typename base::value_type value_type;
357  typedef typename base::block_type block_type;
360 
361 public:
362  //! Creates the object.
363  //! \param input input stream
364  //! \param cmp comparator object
365  //! \param memory_to_use memory amount that is allowed to be used by the sorter in bytes
366  runs_creator(Input_& input, CompareType_ cmp, unsigned_type memory_to_use)
367  : base(input, cmp, memory_to_use)
368  { }
369 };
370 
371 
372 //! Input strategy for \c runs_creator class.
373 //!
374 //! This strategy, together with the \c runs_creator class,
375 //! allows creating the sorted runs
376 //! data structure usable by \c runs_merger by
377 //! pushing elements into the sorter
378 //! (using runs_creator::push()).
//! The struct only carries the element type as \c value_type; it is used as
//! a template argument to select the push-based runs_creator specialization.
379 template <class ValueType_>
380 struct use_push
381 {
382  typedef ValueType_ value_type;
383 };
384 
385 //! Forms sorted runs of elements passed in push() method.
386 //!
387 //! A specialization of \c runs_creator that
388 //! allows to create sorted runs
389 //! data structure usable for \c runs_merger from
390 //! elements passed in sorted push() method. <BR>
391 //! \tparam ValueType_ type of values (parameter for \c use_push strategy)
392 //! \tparam CompareType_ type of comparison object used for sorting the runs
393 //! \tparam BlockSize_ size of blocks used to store the runs
394 //! \tparam AllocStr_ functor that defines allocation strategy for the runs
395 template <
396  class ValueType_,
397  class CompareType_,
398  unsigned BlockSize_,
399  class AllocStr_>
401  use_push<ValueType_>,
402  CompareType_,
403  BlockSize_,
404  AllocStr_>
405  : private noncopyable
406 {
407 public:
408  typedef CompareType_ cmp_type;
409  typedef ValueType_ value_type;
415 
416 private:
417  //! comparator object to sort runs
418  CompareType_ m_cmp;
419 
421 
422  //! stores the result (sorted runs) in a reference counted object
424 
425  //! memory size in bytes to use
427 
428  //! memory size in numberr of blocks for internal use
430 
431  //! m_memsize / 2
433 
434  //! true after the result() method was called for the first time
436 
437  //! total number of elements in a run
439 
440  //! current number of elements in the run m_blocks1
442 
443  //! accumulation buffer of size m_m2 blocks, half the available memory size
445 
446  //! accumulation buffer that is currently being written to disk
448 
449  //! reference to write requests transporting the last accumulation buffer to disk
451 
452  //! run object containing block ids of the run being written to disk
454 
455 protected:
456  //! fill the rest of the block with max values
457  void fill_with_max_value(block_type* blocks, unsigned_type num_blocks, unsigned_type first_idx)
458  {
459  unsigned_type last_idx = num_blocks * block_type::size;
460  if (first_idx < last_idx) {
462  make_element_iterator(blocks, first_idx);
463  while (first_idx != last_idx) {
464  *curr = m_cmp.max_value();
465  ++curr;
466  ++first_idx;
467  }
468  }
469  }
470 
471  //! Sort a specific run, contained in a sequences of blocks.
472  void sort_run(block_type* run, unsigned_type elements)
473  {
476  make_element_iterator(run, elements),
477  m_cmp);
478  }
479 
481  {
482  if (m_cur_el == 0)
483  return;
484 
485  sort_run(m_blocks1, m_cur_el);
486 
487  if (m_cur_el <= block_type::size && m_result->elements == 0)
488  {
489  // small input, do not flush it on the disk(s)
490  STXXL_VERBOSE1("runs_creator(use_push): Small input optimization, input length: " << m_cur_el);
491  m_result->small_run.assign(m_blocks1[0].begin(), m_blocks1[0].begin() + m_cur_el);
492  m_result->elements = m_cur_el;
493  return;
494  }
495 
496  const unsigned_type cur_run_size = div_ceil(m_cur_el, block_type::size); // in blocks
497  run.resize(cur_run_size);
498  block_manager* bm = block_manager::get_instance();
499  bm->new_blocks(AllocStr_(), make_bid_iterator(run.begin()), make_bid_iterator(run.end()));
500 
501  disk_queues::get_instance()->set_priority_op(request_queue::WRITE);
502 
503  // fill the rest of the last block with max values
504  fill_with_max_value(m_blocks1, cur_run_size, m_cur_el);
505 
506  unsigned_type i = 0;
507  for ( ; i < cur_run_size; ++i)
508  {
509  run[i].value = m_blocks1[i][0];
510  if (m_write_reqs[i].get())
511  m_write_reqs[i]->wait();
512 
513  m_write_reqs[i] = m_blocks1[i].write(run[i].bid);
514  }
515  m_result->add_run(run, m_cur_el);
516 
517  for (i = 0; i < m_m2; ++i)
518  {
519  if (m_write_reqs[i].get())
520  m_write_reqs[i]->wait();
521  }
522  }
523 
524 public:
525  //! Creates the object.
526  //! \param cmp comparator object
527  //! \param memory_to_use memory amount that is allowed to used by the sorter in bytes
528  runs_creator(CompareType_ cmp, unsigned_type memory_to_use) :
529  m_cmp(cmp),
530  m_memory_to_use(memory_to_use),
531  m_memsize(memory_to_use / BlockSize_ / sort_memory_usage_factor()),
532  m_m2(m_memsize / 2),
533  m_el_in_run(m_m2 * block_type::size),
534  m_blocks1(NULL), m_blocks2(NULL),
535  m_write_reqs(NULL)
536  {
538  if (!(2 * BlockSize_ * sort_memory_usage_factor() <= m_memory_to_use)) {
539  throw bad_parameter("stxxl::runs_creator<>:runs_creator(): INSUFFICIENT MEMORY provided, please increase parameter 'memory_to_use'");
540  }
541  assert(m_m2 > 0);
542 
543  allocate();
544  }
545 
547  {
548  m_result_computed = 1;
549  deallocate();
550  }
551 
552  //! Clear current state and remove all items.
553  void clear()
554  {
555  if (!m_result)
556  m_result = new sorted_runs_data_type;
557  else
558  m_result->clear();
559 
560  m_result_computed = false;
561  m_cur_el = 0;
562 
563  for (unsigned_type i = 0; i < m_m2; ++i)
564  {
565  if (m_write_reqs[i].get())
566  m_write_reqs[i]->cancel();
567  }
568  }
569 
570  //! Allocates input buffers and clears result.
571  void allocate()
572  {
573  if (!m_blocks1)
574  {
575  m_blocks1 = new block_type[m_m2 * 2];
576  m_blocks2 = m_blocks1 + m_m2;
577 
578  m_write_reqs = new request_ptr[m_m2];
579  }
580 
581  clear();
582  }
583 
584  //! Deallocates input buffers but not the current result.
585  void deallocate()
586  {
587  result(); // finishes result
588 
589  if (m_blocks1)
590  {
591  delete[] ((m_blocks1 < m_blocks2) ? m_blocks1 : m_blocks2);
592  m_blocks1 = m_blocks2 = NULL;
593 
594  delete[] m_write_reqs;
595  m_write_reqs = NULL;
596  }
597  }
598 
599  //! Adds new element to the sorter.
600  //! \param val value to be added
601  void push(const value_type& val)
602  {
603  assert(m_result_computed == false);
604  if (LIKELY(m_cur_el < m_el_in_run))
605  {
606  m_blocks1[m_cur_el / block_type::size][m_cur_el % block_type::size] = val;
607  ++m_cur_el;
608  return;
609  }
610 
611  assert(m_el_in_run == m_cur_el);
612  m_cur_el = 0;
613 
614  // sort and store m_blocks1
615  sort_run(m_blocks1, m_el_in_run);
616 
617  const unsigned_type cur_run_blocks = div_ceil(m_el_in_run, block_type::size); // in blocks
618  run.resize(cur_run_blocks);
619  block_manager* bm = block_manager::get_instance();
620  bm->new_blocks(AllocStr_(), make_bid_iterator(run.begin()), make_bid_iterator(run.end()));
621 
622  disk_queues::get_instance()->set_priority_op(request_queue::WRITE);
623 
624  for (unsigned_type i = 0; i < cur_run_blocks; ++i)
625  {
626  run[i].value = m_blocks1[i][0];
627  if (m_write_reqs[i].get())
628  m_write_reqs[i]->wait();
629 
630  m_write_reqs[i] = m_blocks1[i].write(run[i].bid);
631  }
632 
633  m_result->add_run(run, m_el_in_run);
634 
635  std::swap(m_blocks1, m_blocks2);
636 
637  push(val);
638  }
639 
640  //! Returns the sorted runs object.
641  //! \return Sorted runs object.
642  //! \remark Returned object is intended to be used by \c runs_merger object as input
644  {
645  if (!m_result_computed)
646  {
647  compute_result();
648  m_result_computed = true;
649 #ifdef STXXL_PRINT_STAT_AFTER_RF
650  STXXL_MSG(*stats::get_instance());
651 #endif //STXXL_PRINT_STAT_AFTER_RF
652  }
653  return m_result;
654  }
655 
656  //! number of items currently inserted.
658  {
659  return m_result->elements + m_cur_el;
660  }
661 
662  //! return comparator object.
663  const cmp_type & cmp() const
664  {
665  return m_cmp;
666  }
667 
668  //! return memory size used (in bytes).
670  {
671  return m_memory_to_use;
672  }
673 };
674 
675 
676 //! Input strategy for \c runs_creator class.
677 //!
678 //! This strategy, together with the \c runs_creator class,
679 //! allows creating the sorted runs
680 //! data structure usable by \c runs_merger from
681 //! sequences of elements in sorted order.
//! The struct only carries the element type as \c value_type.
// NOTE(review): line 683 (presumably `struct from_sorted_sequences`) is missing
// from this listing.
682 template <class ValueType_>
684 {
685  typedef ValueType_ value_type;
686 };
687 
688 //! Forms sorted runs of data taking elements in sorted order (element by element).
689 //!
690 //! A specialization of \c runs_creator that
691 //! allows to create sorted runs
692 //! data structure usable for \c runs_merger from
693 //! sequences of elements in sorted order. <BR>
694 //! \tparam ValueType_ type of values (parameter for \c from_sorted_sequences strategy)
695 //! \tparam CompareType_ type of comparison object used for sorting the runs
696 //! \tparam BlockSize_ size of blocks used to store the runs
697 //! \tparam AllocStr_ functor that defines allocation strategy for the runs
698 template <
699  class ValueType_,
700  class CompareType_,
701  unsigned BlockSize_,
702  class AllocStr_>
704  from_sorted_sequences<ValueType_>,
705  CompareType_,
706  BlockSize_,
707  AllocStr_>
708  : private noncopyable
709 {
710 public:
711  typedef ValueType_ value_type;
714  typedef AllocStr_ alloc_strategy_type;
715 
716 public:
717  typedef CompareType_ cmp_type;
721 
722 private:
724 
725  CompareType_ cmp;
726 
727  sorted_runs_type result_; // stores the result (sorted runs)
728  unsigned_type m_; // memory for internal use in blocks
734  alloc_strategy_type alloc_strategy; // needs to be reset after each run
735 
736 public:
737  //! Creates the object.
738  //! \param c comparator object
739  //! \param memory_to_use memory amount that is allowed to used by the sorter in bytes.
740  //! Recommended value: 2 * block_size * D
741  runs_creator(CompareType_ c, unsigned_type memory_to_use) :
742  cmp(c),
743  result_(new sorted_runs_data_type),
744  m_(memory_to_use / BlockSize_ / sort_memory_usage_factor()),
745  writer(m_, m_ / 2),
746  cur_block(writer.get_free_block()),
747  offset(0),
748  iblock(0),
749  irun(0)
750  {
752  assert(m_ > 0);
753  if (!(2 * BlockSize_ * sort_memory_usage_factor() <= memory_to_use)) {
754  throw bad_parameter("stxxl::runs_creator<>:runs_creator(): INSUFFICIENT MEMORY provided, please increase parameter 'memory_to_use'");
755  }
756  }
757 
758  //! Adds new element to the current run.
759  //! \param val value to be added to the current run
760  void push(const value_type& val)
761  {
762  assert(offset < block_type::size);
763 
764  (*cur_block)[offset] = val;
765  ++offset;
766 
767  if (offset == block_type::size)
768  {
769  // write current block
770 
771  block_manager* bm = block_manager::get_instance();
772  // allocate space for the block
773  result_->runs.resize(irun + 1);
774  result_->runs[irun].resize(iblock + 1);
775  bm->new_blocks(
776  alloc_strategy,
777  make_bid_iterator(result_->runs[irun].begin() + iblock),
778  make_bid_iterator(result_->runs[irun].end()),
779  iblock
780  );
781 
782  result_->runs[irun][iblock].value = (*cur_block)[0]; // init trigger
783  cur_block = writer.write(cur_block, result_->runs[irun][iblock].bid);
784  ++iblock;
785 
786  offset = 0;
787  }
788 
789  ++result_->elements;
790  }
791 
792  //! Finishes current run and begins new one.
793  void finish()
794  {
795  if (offset == 0 && iblock == 0) // current run is empty
796  return;
797 
798 
799  result_->runs_sizes.resize(irun + 1);
800  result_->runs_sizes.back() = iblock * block_type::size + offset;
801 
802  if (offset) // if current block is partially filled
803  {
804  while (offset != block_type::size)
805  {
806  (*cur_block)[offset] = cmp.max_value();
807  ++offset;
808  }
809  offset = 0;
810 
811  block_manager* bm = block_manager::get_instance();
812  // allocate space for the block
813  result_->runs.resize(irun + 1);
814  result_->runs[irun].resize(iblock + 1);
815  bm->new_blocks(
816  alloc_strategy,
817  make_bid_iterator(result_->runs[irun].begin() + iblock),
818  make_bid_iterator(result_->runs[irun].end()),
819  iblock
820  );
821 
822  result_->runs[irun][iblock].value = (*cur_block)[0]; // init trigger
823  cur_block = writer.write(cur_block, result_->runs[irun][iblock].bid);
824  }
825  else
826  { }
827 
828  alloc_strategy = alloc_strategy_type(); // reinitialize block allocator for the next run
829  iblock = 0;
830  ++irun;
831  }
832 
833  //! Returns the sorted runs object.
834  //! \return Sorted runs object
835  //! \remark Returned object is intended to be used by \c runs_merger object as input
837  {
838  finish();
839  writer.flush();
840 
841  return result_;
842  }
843 };
844 
845 
846 //! Checker for the sorted runs object created by the \c runs_creator .
//! Reads every run back from disk and checks that the first element of each
//! block equals the trigger value stored for it and that the run is sorted
//! by \c cmp. Elements kept in memory in \c small_run are not examined.
847 //! \param sruns sorted runs object
848 //! \param cmp comparison object used for checking the order of elements in runs
849 //! \return \c true if all triggers match and all runs are sorted, \c false otherwise
// NOTE(review): line 853 is missing from this listing.
850 template <class RunsType_, class CompareType_>
851 bool check_sorted_runs(const RunsType_& sruns, CompareType_ cmp)
852 {
854  typedef typename RunsType_::element_type::block_type block_type;
855  STXXL_VERBOSE2("Elements: " << sruns->elements);
856  unsigned_type nruns = sruns->runs.size();
857  STXXL_VERBOSE2("Runs: " << nruns);
858  unsigned_type irun = 0;
859  for (irun = 0; irun < nruns; ++irun)
860  {
 // load the whole run into memory
861  const unsigned_type nblocks = sruns->runs[irun].size();
862  block_type* blocks = new block_type[nblocks];
863  request_ptr* reqs = new request_ptr[nblocks];
864  for (unsigned_type j = 0; j < nblocks; ++j)
865  {
866  reqs[j] = blocks[j].read(sruns->runs[irun][j].bid);
867  }
868  wait_all(reqs, reqs + nblocks);
869  delete[] reqs;
870 
 // the trigger must equal the block's first element; equality is expressed
 // with the comparator (neither element is less than the other)
871  for (unsigned_type j = 0; j < nblocks; ++j)
872  {
873  if (cmp(blocks[j][0], sruns->runs[irun][j].value) ||
874  cmp(sruns->runs[irun][j].value, blocks[j][0])) //!=
875  {
876  STXXL_ERRMSG("check_sorted_runs wrong trigger in the run");
877  delete[] blocks;
878  return false;
879  }
880  }
 // only the first runs_sizes[irun] elements are checked (the padding is excluded)
881  if (!stxxl::is_sorted(make_element_iterator(blocks, 0),
882  make_element_iterator(blocks, sruns->runs_sizes[irun]),
883  cmp))
884  {
885  STXXL_ERRMSG("check_sorted_runs wrong order in the run");
886  delete[] blocks;
887  return false;
888  }
889 
890  delete[] blocks;
891  }
892 
893  STXXL_MSG("Checking runs finished successfully");
894 
895  return true;
896 }
897 
898 
899 ////////////////////////////////////////////////////////////////////////
900 // MERGE RUNS //
901 ////////////////////////////////////////////////////////////////////////
902 
903 //! Merges sorted runs.
904 //!
905 //! \tparam RunsType_ type of the sorted runs, available as \c runs_creator::sorted_runs_type ,
906 //! \tparam CompareType_ type of comparison object used for merging
907 //! \tparam AllocStr_ allocation strategy used to allocate the blocks for
908 //! storing intermediate results if several merge passes are required
909 template <class RunsType_,
910  class CompareType_,
911  class AllocStr_ = STXXL_DEFAULT_ALLOC_STRATEGY>
913 {
914 public:
915  typedef RunsType_ sorted_runs_type;
916  typedef CompareType_ value_cmp;
917  typedef AllocStr_ alloc_strategy;
918 
919  typedef typename sorted_runs_type::element_type sorted_runs_data_type;
920  typedef typename sorted_runs_data_type::size_type size_type;
921  typedef typename sorted_runs_data_type::run_type run_type;
922  typedef typename sorted_runs_data_type::block_type block_type;
924  typedef typename run_type::value_type trigger_entry_type;
930  typedef std::pair<typename block_type::iterator, typename block_type::iterator> sequence;
931  typedef typename std::vector<sequence>::size_type seqs_size_type;
932 
933 public:
934  //! Standard stream typedef.
935  typedef typename sorted_runs_data_type::value_type value_type;
936 
937 private:
938  //! comparator object to sort runs
940 
941  //! memory size in bytes to use
943 
944  //! smart pointer to sorted_runs object
946 
947  //! items remaining in input
949 
950  //! memory buffer for merging from external streams
951  out_block_type* m_buffer_block;
952 
953  //! pointer into current memory buffer: this is either m_buffer_block or the small_runs vector
955 
956  //! pointer into current memory buffer: end after range of current values
958 
959  //! sequence of block needed for merging
961 
962  //! precalculated order of blocks in which they are prefetched
964 
965  //! prefetcher object
967 
968  //! loser tree used for native merging
970 
971 #if STXXL_PARALLEL_MULTIWAY_MERGE
972  std::vector<sequence>* seqs;
973  std::vector<block_type*>* buffers;
974  diff_type num_currently_mergeable;
975 #endif
976 
977 #if STXXL_CHECK_ORDER_IN_SORTS
978  //! previous element to ensure the current output ordering
979  value_type m_last_element;
980 #endif //STXXL_CHECK_ORDER_IN_SORTS
981 
982  ////////////////////////////////////////////////////////////////////
983 
984  void merge_recursively();
985 
987  {
988  if (m_prefetcher)
989  {
990  delete m_losers;
991 #if STXXL_PARALLEL_MULTIWAY_MERGE
992  delete seqs;
993  delete buffers;
994 #endif
995  delete m_prefetcher;
996  delete[] m_prefetch_seq;
997  m_prefetcher = NULL;
998  }
999  }
1000 
1002  {
1003  STXXL_VERBOSE1("fill_buffer_block");
1004  if (do_parallel_merge())
1005  {
1006 #if STXXL_PARALLEL_MULTIWAY_MERGE
1007 // begin of STL-style merging
1008  diff_type rest = out_block_type::size; // elements still to merge for this output block
1009 
1010  do // while rest > 0 and still elements available
1011  {
1012  if (num_currently_mergeable < rest)
1013  {
1014  if (!m_prefetcher || m_prefetcher->empty())
1015  {
1016  // anything remaining is already in memory
1017  num_currently_mergeable = m_elements_remaining;
1018  }
1019  else
1020  {
1021  num_currently_mergeable = sort_helper::count_elements_less_equal(
1022  *seqs, m_consume_seq[m_prefetcher->pos()].value, m_cmp);
1023  }
1024  }
1025 
1026  diff_type output_size = STXXL_MIN(num_currently_mergeable, rest); // at most rest elements
1027 
1028  STXXL_VERBOSE1("before merge " << output_size);
1029 
1030  stxxl::parallel::multiway_merge((*seqs).begin(), (*seqs).end(), m_buffer_block->end() - rest, m_cmp, output_size);
1031  // sequence iterators are progressed appropriately
1032 
1033  rest -= output_size;
1034  num_currently_mergeable -= output_size;
1035 
1036  STXXL_VERBOSE1("after merge");
1037 
1038  sort_helper::refill_or_remove_empty_sequences(*seqs, *buffers, *m_prefetcher);
1039  } while (rest > 0 && (*seqs).size() > 0);
1040 
1041 #if STXXL_CHECK_ORDER_IN_SORTS
1042  if (!stxxl::is_sorted(m_buffer_block->begin(), m_buffer_block->end(), cmp))
1043  {
1044  for (value_type* i = m_buffer_block->begin() + 1; i != m_buffer_block->end(); ++i)
1045  if (cmp(*i, *(i - 1)))
1046  {
1047  STXXL_VERBOSE1("Error at position " << (i - m_buffer_block->begin()));
1048  }
1049  assert(false);
1050  }
1051 #endif //STXXL_CHECK_ORDER_IN_SORTS
1052 
1053 // end of STL-style merging
1054 #else
1056 #endif //STXXL_PARALLEL_MULTIWAY_MERGE
1057  }
1058  else
1059  {
1060 // begin of native merging procedure
1061  m_losers->multi_merge(m_buffer_block->elem, m_buffer_block->elem + STXXL_MIN<size_type>(out_block_type::size, m_elements_remaining));
1062 // end of native merging procedure
1063  }
1064  STXXL_VERBOSE1("current block filled");
1065 
1066  m_current_ptr = m_buffer_block->elem;
1067  m_current_end = m_buffer_block->elem + STXXL_MIN<size_type>(out_block_type::size, m_elements_remaining);
1068 
1069  if (m_elements_remaining <= out_block_type::size)
1070  deallocate_prefetcher();
1071  }
1072 
1073 public:
1074  //! Creates a runs merger object.
1075  //! \param c comparison object
1076  //! \param memory_to_use amount of memory available for the merger in bytes
1078  : m_cmp(c),
1079  m_memory_to_use(memory_to_use),
1080  m_buffer_block(new out_block_type),
1081  m_prefetch_seq(NULL),
1082  m_prefetcher(NULL),
1083  m_losers(NULL)
1085  , seqs(NULL),
1086  buffers(NULL),
1087  num_currently_mergeable(0)
1088 #endif
1090  , m_last_element(m_cmp.min_value())
1091 #endif //STXXL_CHECK_ORDER_IN_SORTS
1092  {
1094  }
1095 
1096  //! Set memory amount to use for the merger in bytes.
1097  void set_memory_to_use(unsigned_type memory_to_use)
1098  {
1099  m_memory_to_use = memory_to_use;
1100  }
1101 
1102  //! Initialize the runs merger object with a new round of sorted_runs.
1103  void initialize(const sorted_runs_type& sruns)
1104  {
1105  m_sruns = sruns;
1106  m_elements_remaining = m_sruns->elements;
1107 
1108  if (empty())
1109  return;
1110 
1111  if (!m_sruns->small_run.empty())
1112  {
1113  // we have a small input <= B, that is kept in the main memory
1114  STXXL_VERBOSE1("basic_runs_merger: small input optimization, input length: " << m_elements_remaining);
1115  assert(m_elements_remaining == size_type(m_sruns->small_run.size()));
1116 
1117  m_current_ptr = &m_sruns->small_run[0];
1118  m_current_end = m_current_ptr + m_sruns->small_run.size();
1119 
1120  return;
1121  }
1122 
1123 #if STXXL_CHECK_ORDER_IN_SORTS
1124  assert(check_sorted_runs(m_sruns, m_cmp));
1125 #endif //STXXL_CHECK_ORDER_IN_SORTS
1126 
1127  // *** test whether recursive merging is necessary
1128 
1129  disk_queues::get_instance()->set_priority_op(request_queue::WRITE);
1130 
1131  int_type disks_number = config::get_instance()->disks_number();
1132  unsigned_type min_prefetch_buffers = 2 * disks_number;
1133  unsigned_type input_buffers = (m_memory_to_use > sizeof(out_block_type) ? m_memory_to_use - sizeof(out_block_type) : 0) / block_type::raw_size;
1134  unsigned_type nruns = m_sruns->runs.size();
1135 
1136  if (input_buffers < nruns + min_prefetch_buffers)
1137  {
1138  // can not merge runs in one pass. merge recursively:
1139  STXXL_WARNMSG_RECURSIVE_SORT("The implementation of sort requires more than one merge pass, therefore for a better");
1140  STXXL_WARNMSG_RECURSIVE_SORT("efficiency decrease block size of run storage (a parameter of the run_creator)");
1141  STXXL_WARNMSG_RECURSIVE_SORT("or increase the amount memory dedicated to the merger.");
1142  STXXL_WARNMSG_RECURSIVE_SORT("m=" << input_buffers << " nruns=" << nruns << " prefetch_blocks=" << min_prefetch_buffers);
1143  STXXL_WARNMSG_RECURSIVE_SORT("memory_to_use=" << m_memory_to_use << " bytes block_type::raw_size=" << block_type::raw_size << " bytes");
1144 
1145  // check whether we have enough memory to merge recursively
1146  unsigned_type recursive_merge_buffers = m_memory_to_use / block_type::raw_size;
1147  if (recursive_merge_buffers < 2 * min_prefetch_buffers + 1 + 2) {
1148  // recursive merge uses min_prefetch_buffers for input buffering and min_prefetch_buffers output buffering
1149  // as well as 1 current output block and at least 2 input blocks
1150  STXXL_ERRMSG("There are only m=" << recursive_merge_buffers << " blocks available for recursive merging, but "
1151  << min_prefetch_buffers << "+" << min_prefetch_buffers << "+1 are needed read-ahead/write-back/output, and");
1152  STXXL_ERRMSG("the merger requires memory to store at least two input blocks internally. Aborting.");
1153  throw bad_parameter("basic_runs_merger::sort(): INSUFFICIENT MEMORY provided, please increase parameter 'memory_to_use'");
1154  }
1155 
1156  merge_recursively();
1157 
1158  nruns = m_sruns->runs.size();
1159  }
1160 
1161  assert(nruns + min_prefetch_buffers <= input_buffers);
1162 
1163  // *** Allocate prefetcher and merge data structure
1164 
1165  deallocate_prefetcher();
1166 
1167  unsigned_type prefetch_seq_size = 0;
1168  for (unsigned_type i = 0; i < nruns; ++i)
1169  {
1170  prefetch_seq_size += m_sruns->runs[i].size();
1171  }
1172 
1173  m_consume_seq.resize(prefetch_seq_size);
1174  m_prefetch_seq = new int_type[prefetch_seq_size];
1175 
1176  typename run_type::iterator copy_start = m_consume_seq.begin();
1177  for (unsigned_type i = 0; i < nruns; ++i)
1178  {
1179  copy_start = std::copy(m_sruns->runs[i].begin(),
1180  m_sruns->runs[i].end(),
1181  copy_start);
1182  }
1183 
1184  std::stable_sort(m_consume_seq.begin(), m_consume_seq.end(),
1186 
1187  const unsigned_type n_prefetch_buffers = STXXL_MAX(min_prefetch_buffers, input_buffers - nruns);
1188 
1189 #if STXXL_SORT_OPTIMAL_PREFETCHING
1190  // heuristic
1191  const int_type n_opt_prefetch_buffers = min_prefetch_buffers + (3 * (n_prefetch_buffers - min_prefetch_buffers)) / 10;
1192 
1194  m_consume_seq,
1195  m_prefetch_seq,
1196  n_opt_prefetch_buffers,
1197  disks_number);
1198 #else
1199  for (unsigned_type i = 0; i < prefetch_seq_size; ++i)
1200  m_prefetch_seq[i] = i;
1201 #endif //STXXL_SORT_OPTIMAL_PREFETCHING
1202 
1203  m_prefetcher = new prefetcher_type(
1204  m_consume_seq.begin(),
1205  m_consume_seq.end(),
1206  m_prefetch_seq,
1207  STXXL_MIN(nruns + n_prefetch_buffers, prefetch_seq_size));
1208 
1209  if (do_parallel_merge())
1210  {
1211 #if STXXL_PARALLEL_MULTIWAY_MERGE
1212 // begin of STL-style merging
1213  seqs = new std::vector<sequence>(nruns);
1214  buffers = new std::vector<block_type*>(nruns);
1215 
1216  for (unsigned_type i = 0; i < nruns; ++i) //initialize sequences
1217  {
1218  (*buffers)[i] = m_prefetcher->pull_block(); //get first block of each run
1219  (*seqs)[i] = std::make_pair((*buffers)[i]->begin(), (*buffers)[i]->end()); //this memory location stays the same, only the data is exchanged
1220  }
1221 // end of STL-style merging
1222 #else
1224 #endif //STXXL_PARALLEL_MULTIWAY_MERGE
1225  }
1226  else
1227  {
1228 // begin of native merging procedure
1229  m_losers = new loser_tree_type(m_prefetcher, nruns, run_cursor2_cmp_type(m_cmp));
1230 // end of native merging procedure
1231  }
1232 
1233  fill_buffer_block();
1234  }
1235 
1236  //! Deallocate temporary structures freeing memory prior to next initialize().
1237  void deallocate()
1238  {
1239  deallocate_prefetcher();
1240  m_sruns = NULL; // release reference on result object
1241  }
1242 
1243 public:
1244  //! Standard stream method.
1245  bool empty() const
1246  {
1247  return (m_elements_remaining == 0);
1248  }
1249 
1250  //! Standard size method.
1251  size_type size() const
1252  {
1253  return m_elements_remaining;
1254  }
1255 
1256  //! Standard stream method.
1257  const value_type& operator * () const
1258  {
1259  assert(!empty());
1260  return *m_current_ptr;
1261  }
1262 
1263  //! Standard stream method.
1264  const value_type* operator -> () const
1265  {
1266  return &(operator * ());
1267  }
1268 
1269  //! Standard stream method.
1270  basic_runs_merger& operator ++ () // preincrement operator
1271  {
1272  assert(!empty());
1273  assert(m_current_ptr != m_current_end);
1274 
1275  --m_elements_remaining;
1276  ++m_current_ptr;
1277 
1278  if (LIKELY(m_current_ptr == m_current_end && !empty()))
1279  {
1280  fill_buffer_block();
1281 
1282 #if STXXL_CHECK_ORDER_IN_SORTS
1283  assert(stxxl::is_sorted(m_buffer_block->elem, m_buffer_block->elem + STXXL_MIN<size_type>(m_elements_remaining, m_buffer_block->size), m_cmp));
1284 #endif //STXXL_CHECK_ORDER_IN_SORTS
1285  }
1286 
1287 #if STXXL_CHECK_ORDER_IN_SORTS
1288  if (!empty())
1289  {
1290  assert(!m_cmp(operator * (), m_last_element));
1291  m_last_element = operator * ();
1292  }
1293 #endif //STXXL_CHECK_ORDER_IN_SORTS
1294 
1295  return *this;
1296  }
1297 
1298  //! Destructor.
1299  //! \remark Deallocates blocks of the input sorted runs object
1301  {
1302  deallocate_prefetcher();
1303 
1304  delete m_buffer_block;
1305  }
1306 };
1307 
1308 
1309 template <class RunsType_, class CompareType_, class AllocStr_>
1311 {
1312  block_manager* bm = block_manager::get_instance();
1313  unsigned_type ndisks = config::get_instance()->disks_number();
1314  unsigned_type nwrite_buffers = 2 * ndisks;
1315  unsigned_type memory_for_write_buffers = nwrite_buffers * sizeof(block_type);
1316 
1317  // memory consumption of the recursive merger (uses block_type as out_block_type)
1318  unsigned_type recursive_merger_memory_prefetch_buffers = 2 * ndisks * sizeof(block_type);
1319  unsigned_type recursive_merger_memory_out_block = sizeof(block_type);
1320  unsigned_type memory_for_buffers = memory_for_write_buffers
1321  + recursive_merger_memory_prefetch_buffers
1322  + recursive_merger_memory_out_block;
1323  // maximum arity in the recursive merger
1324  unsigned_type max_arity = (m_memory_to_use > memory_for_buffers ? m_memory_to_use - memory_for_buffers : 0) / block_type::raw_size;
1325 
1326  unsigned_type nruns = m_sruns->runs.size();
1327  const unsigned_type merge_factor = optimal_merge_factor(nruns, max_arity);
1328  assert(merge_factor > 1);
1329  assert(merge_factor <= max_arity);
1330 
1331  while (nruns > max_arity)
1332  {
1333  unsigned_type new_nruns = div_ceil(nruns, merge_factor);
1334  STXXL_MSG("Starting new merge phase: nruns: " << nruns <<
1335  " opt_merge_factor: " << merge_factor <<
1336  " max_arity: " << max_arity << " new_nruns: " << new_nruns);
1337 
1338  // construct new sorted_runs data object which will be swapped into m_sruns
1339 
1340  sorted_runs_data_type new_runs;
1341  new_runs.runs.resize(new_nruns);
1342  new_runs.runs_sizes.resize(new_nruns);
1343  new_runs.elements = m_sruns->elements;
1344 
1345  // merge all runs from m_runs into news_runs
1346 
1347  unsigned_type runs_left = nruns;
1348  unsigned_type cur_out_run = 0;
1349  size_type elements_left = m_sruns->elements;
1350 
1351  while (runs_left > 0)
1352  {
1353  unsigned_type runs2merge = STXXL_MIN(runs_left, merge_factor);
1354  STXXL_MSG("Merging " << runs2merge << " runs");
1355 
1356  if (runs2merge > 1) // non-trivial merge
1357  {
1358  // count the number of elements in the run
1359  size_type elements_in_new_run = 0;
1360  for (unsigned_type i = nruns - runs_left; i < (nruns - runs_left + runs2merge); ++i)
1361  {
1362  elements_in_new_run += m_sruns->runs_sizes[i];
1363  }
1364  new_runs.runs_sizes[cur_out_run] = elements_in_new_run;
1365 
1366  // calculate blocks in run
1367  const unsigned_type blocks_in_new_run = div_ceil(elements_in_new_run, block_type::size);
1368 
1369  // allocate blocks for the new runs
1370  new_runs.runs[cur_out_run].resize(blocks_in_new_run);
1371  bm->new_blocks(alloc_strategy(), make_bid_iterator(new_runs.runs[cur_out_run].begin()), make_bid_iterator(new_runs.runs[cur_out_run].end()));
1372 
1373  // Construct temporary sorted_runs object as input into recursive merger.
1374  // This sorted_runs is copied a subset of the over-large set of runs, which
1375  // will be deallocated from external memory once the runs are merged.
1376  sorted_runs_type cur_runs = new sorted_runs_data_type;
1377  cur_runs->runs.resize(runs2merge);
1378  cur_runs->runs_sizes.resize(runs2merge);
1379 
1380  std::copy(m_sruns->runs.begin() + nruns - runs_left,
1381  m_sruns->runs.begin() + nruns - runs_left + runs2merge,
1382  cur_runs->runs.begin());
1383  std::copy(m_sruns->runs_sizes.begin() + nruns - runs_left,
1384  m_sruns->runs_sizes.begin() + nruns - runs_left + runs2merge,
1385  cur_runs->runs_sizes.begin());
1386 
1387  cur_runs->elements = elements_in_new_run;
1388  elements_left -= elements_in_new_run;
1389 
1390  // construct recursive merger
1391 
1392  basic_runs_merger<RunsType_, CompareType_, AllocStr_> merger(m_cmp, m_memory_to_use - memory_for_write_buffers);
1393  merger.initialize(cur_runs);
1394 
1395  { // make sure everything is being destroyed in right time
1397  new_runs.runs[cur_out_run].begin(),
1398  nwrite_buffers);
1399 
1400  size_type cnt = 0;
1401  const size_type cnt_max = cur_runs->elements;
1402 
1403  while (cnt != cnt_max)
1404  {
1405  *out = *merger;
1406  if ((cnt % block_type::size) == 0) // have to write the trigger value
1407  new_runs.runs[cur_out_run][cnt / size_type(block_type::size)].value = *merger;
1408 
1409  ++cnt, ++out, ++merger;
1410  }
1411  assert(merger.empty());
1412 
1413  while (cnt % block_type::size)
1414  {
1415  *out = m_cmp.max_value();
1416  ++out, ++cnt;
1417  }
1418  }
1419 
1420  // deallocate merged runs by destroying cur_runs
1421  }
1422  else // runs2merge = 1 -> no merging needed
1423  {
1424  assert(cur_out_run + 1 == new_runs.runs.size());
1425 
1426  elements_left -= m_sruns->runs_sizes.back();
1427 
1428  // copy block identifiers into new sorted_runs object
1429  new_runs.runs.back() = m_sruns->runs.back();
1430  new_runs.runs_sizes.back() = m_sruns->runs_sizes.back();
1431  }
1432 
1433  runs_left -= runs2merge;
1434  ++cur_out_run;
1435  }
1436 
1437  assert(elements_left == 0);
1438 
1439  m_sruns->runs.clear(); // clear bid vector of m_sruns to skip deallocation of blocks in destructor
1440 
1441  std::swap(nruns, new_nruns);
1442  m_sruns->swap(new_runs); // replaces data in referenced counted object m_sruns
1443  } // end while (nruns > max_arity)
1444 }
1445 
1446 
1447 //! Merges sorted runs.
1448 //!
1449 //! \tparam RunsType_ type of the sorted runs, available as \c runs_creator::sorted_runs_type ,
1450 //! \tparam CompareType_ type of comparison object used for merging
1451 //! \tparam AllocStr_ allocation strategy used to allocate the blocks for
1452 //! storing intermediate results if several merge passes are required
1453 template <class RunsType_,
1454  class CompareType_ = typename RunsType_::element_type::cmp_type,
1455  class AllocStr_ = STXXL_DEFAULT_ALLOC_STRATEGY>
1456 class runs_merger : public basic_runs_merger<RunsType_, CompareType_, AllocStr_>
1457 {
1458 protected:
1460 
1461 public:
1462  typedef RunsType_ sorted_runs_type;
1463  typedef typename base::value_cmp value_cmp;
1464  typedef typename base::value_cmp cmp_type;
1465  typedef typename base::block_type block_type;
1466 
1467 public:
1468  //! Creates a runs merger object.
1469  //! \param sruns input sorted runs object
1470  //! \param cmp comparison object
1471  //! \param memory_to_use amount of memory available for the merger in bytes
1473  : base(cmp, memory_to_use)
1474  {
1475  this->initialize(sruns);
1476  }
1477 
1478  //! Creates a runs merger object without initializing a round of sorted_runs.
1479  //! \param cmp comparison object
1480  //! \param memory_to_use amount of memory available for the merger in bytes
1482  : base(cmp, memory_to_use)
1483  { }
1484 };
1485 
1486 
1487 ////////////////////////////////////////////////////////////////////////
1488 // SORT //
1489 ////////////////////////////////////////////////////////////////////////
1490 
1491 //! Produces sorted stream from input stream.
1492 //!
1493 //! \tparam Input_ type of the input stream
1494 //! \tparam CompareType_ type of comparison object used for sorting the runs
1495 //! \tparam BlockSize_ size of blocks used to store the runs
1496 //! \tparam AllocStr_ functor that defines allocation strategy for the runs
1497 //! \remark Implemented as the composition of \c runs_creator and \c runs_merger .
1498 template <class Input_,
1499  class CompareType_,
1500  unsigned BlockSize_ = STXXL_DEFAULT_BLOCK_SIZE(typename Input_::value_type),
1501  class AllocStr_ = STXXL_DEFAULT_ALLOC_STRATEGY,
1502  class runs_creator_type = runs_creator<Input_, CompareType_, BlockSize_, AllocStr_> >
1503 class sort : public noncopyable
1504 {
1505  typedef typename runs_creator_type::sorted_runs_type sorted_runs_type;
1507 
1508  runs_creator_type creator;
1510 
1511 public:
1512  //! Standard stream typedef.
1513  typedef typename Input_::value_type value_type;
1514 
1515  //! Creates the object.
1516  //! \param in input stream
1517  //! \param c comparator object
1518  //! \param memory_to_use memory amount that is allowed to used by the sorter in bytes
1519  sort(Input_& in, CompareType_ c, unsigned_type memory_to_use) :
1520  creator(in, c, memory_to_use),
1521  merger(creator.result(), c, memory_to_use)
1522  {
1524  }
1525 
1526  //! Creates the object.
1527  //! \param in input stream
1528  //! \param c comparator object
1529  //! \param m_memory_to_userc memory amount that is allowed to used by the runs creator in bytes
1530  //! \param m_memory_to_use memory amount that is allowed to used by the merger in bytes
1531  sort(Input_& in, CompareType_ c, unsigned_type m_memory_to_userc, unsigned_type m_memory_to_use) :
1532  creator(in, c, m_memory_to_userc),
1533  merger(creator.result(), c, m_memory_to_use)
1534  {
1536  }
1537 
1538 
1539  //! Standard stream method.
1540  bool empty() const
1541  {
1542  return merger.empty();
1543  }
1544 
1545  //! Standard stream method.
1546  const value_type& operator * () const
1547  {
1548  assert(!empty());
1549  return *merger;
1550  }
1551 
1552  const value_type* operator -> () const
1553  {
1554  assert(!empty());
1555  return merger.operator -> ();
1556  }
1557 
1558  //! Standard stream method.
1560  {
1561  ++merger;
1562  return *this;
1563  }
1564 };
1565 
1566 //! Computes sorted runs type from value type and block size.
1567 //!
1568 //! \tparam ValueType_ type of values ins sorted runs
1569 //! \tparam BlockSize_ size of blocks where sorted runs stored
1570 template <
1571  class ValueType_,
1572  unsigned BlockSize_>
1574 {
1575  typedef ValueType_ value_type;
1578 
1579 public:
1581 };
1582 
1583 //! \}
1584 
1585 } // namespace stream
1586 
1587 //! \addtogroup stlalgo
1588 //! \{
1589 
1590 //! Sorts range of any random access iterators externally.
1591 //!
1592 //! \param begin iterator pointing to the first element of the range
1593 //! \param end iterator pointing to the last+1 element of the range
1594 //! \param cmp comparison object
1595 //! \param MemSize memory to use for sorting (in bytes)
1596 //! \param AS allocation strategy
1597 //!
1598 //! The \c BlockSize template parameter defines the block size to use (in bytes)
1599 //! \warning Slower than External Iterator Sort
1600 template <unsigned BlockSize,
1601  class RandomAccessIterator,
1602  class CmpType,
1603  class AllocStr>
1604 void sort(RandomAccessIterator begin,
1605  RandomAccessIterator end,
1606  CmpType cmp,
1607  unsigned_type MemSize,
1608  AllocStr AS)
1609 {
1610  STXXL_UNUSED(AS);
1611 #if STXXL_MSVC
1612  typedef typename streamify_traits<RandomAccessIterator>::stream_type InputType;
1613 #else
1614  typedef __typeof__ (stream::streamify(begin, end)) InputType;
1615 #endif // STXXL_MSVC
1616  InputType Input(begin, end);
1618  sorter_type Sort(Input, cmp, MemSize);
1619  stream::materialize(Sort, begin);
1620 }
1621 
1622 //! \}
1623 
1625 
1626 #endif // !STXXL_STREAM_SORT_STREAM_HEADER
1627 // vim: et:ts=4:sw=4
sorted_runs_type m_sruns
smart pointer to sorted_runs object
Definition: sort_stream.h:945
const Tp & STXXL_MIN(const Tp &a, const Tp &b)
Definition: utils.h:147
counting_ptr< sorted_runs_data_type > sorted_runs_type
Definition: sort_stream.h:65
sort_helper::run_cursor2_cmp< block_type, prefetcher_type, value_cmp > run_cursor2_cmp_type
Definition: sort_stream.h:927
#define STXXL_THROW_UNREACHABLE()
Throws stxxl::unreachable with "Error in file [file], line [line] : this code should never be reachab...
#define STXXL_DEFAULT_BLOCK_SIZE(type)
CompareType_ m_cmp
comparator used to sort block groups
Definition: sort_stream.h:69
size_type m_elements_remaining
items remaining in input
Definition: sort_stream.h:948
#define LIKELY(c)
Definition: utils.h:220
const value_type * m_current_ptr
pointer into current memory buffer: this is either m_buffer_block or the small_runs vector ...
Definition: sort_stream.h:954
Forms sorted runs of data from a stream.
Definition: sort_stream.h:51
sorted_runs_data_type::block_type block_type
Definition: sort_stream.h:922
unsigned sort_memory_usage_factor()
Definition: parallel.h:89
runs_merger(value_cmp cmp, unsigned_type memory_to_use)
Creates a runs merger object without initializing a round of sorted_runs.
Definition: sort_stream.h:1481
bool m_result_computed
true iff result is already computed (used in 'result()' method)
Definition: sort_stream.h:74
Input_::value_type value_type
Definition: sort_stream.h:60
Block manager class.
Definition: block_manager.h:63
bool is_sorted(_ForwardIter __first, _ForwardIter __last)
Definition: is_sorted.h:54
const unsigned_type m_memsize
memory size in number of blocks for internal use
Definition: sort_stream.h:429
long long int int64
Definition: types.h:40
sorted_runs< trigger_entry_type, std::less< value_type > > result
Definition: sort_stream.h:1580
runs_creator(CompareType_ cmp, unsigned_type memory_to_use)
Creates the object.
Definition: sort_stream.h:528
void sort(ExtIterator first, ExtIterator last, StrictWeakOrdering cmp, unsigned_type M)
Sort records comparison-based, see stxxl::sort -- Sorting Comparison-Based.
Definition: sort.h:673
sort(Input_ &in, CompareType_ c, unsigned_type m_memory_to_userc, unsigned_type m_memory_to_use)
Creates the object.
Definition: sort_stream.h:1531
void sort_run(block_type *run, unsigned_type elements)
Sort a specific run, contained in a sequences of blocks.
Definition: sort_stream.h:107
trigger_entry_iterator< Iterator > make_bid_iterator(Iterator iter)
Definition: adaptor.h:241
unsigned_type count_elements_less_equal(const SequenceVector &seqs, const ValueType &bound, Comparator cmp)
Definition: sort_helper.h:93
All sorted runs of a sort operation.
Definition: sorted_runs.h:39
sorted_runs< trigger_entry_type, cmp_type > sorted_runs_data_type
Definition: sort_stream.h:63
block_type * m_blocks1
accumulation buffer of size m_m2 blocks, half the available memory size
Definition: sort_stream.h:444
base::block_type block_type
Definition: sort_stream.h:357
void sort_run(block_type *run, unsigned_type elements)
Sort a specific run, contained in a sequences of blocks.
Definition: sort_stream.h:472
#define STXXL_PARALLEL_MULTIWAY_MERGE
Definition: parallel.h:64
#define STXXL_WARNMSG_RECURSIVE_SORT
Definition: sort_base.h:21
void verify_sentinel_strict_weak_ordering(StrictWeakOrdering cmp)
Definition: sort_helper.h:29
virtual ~basic_runs_merger()
Destructor.
Definition: sort_stream.h:1300
#define STXXL_DEFAULT_ALLOC_STRATEGY
Definition: block_alloc.h:259
void refill_or_remove_empty_sequences(SequenceVector &seqs, BufferPtrVector &buffers, Prefetcher &prefetcher)
Definition: sort_helper.h:112
runs_merger_type merger
Definition: sort_stream.h:1509
prefetcher_type * m_prefetcher
prefetcher object
Definition: sort_stream.h:966
sorted_runs_data_type::run_type run_type
Definition: sort_stream.h:64
std::pair< typename block_type::iterator, typename block_type::iterator > sequence
Definition: sort_stream.h:930
Definition: sort_helper.h:55
Computes sorted runs type from value type and block size.
Definition: sort_stream.h:1573
#define STXXL_VERBOSE2(x)
Definition: verbose.h:107
Buffered output stream.
Definition: buf_ostream.h:31
void push(const value_type &val)
Adds new element to the sorter.
Definition: sort_stream.h:601
void check_sort_settings()
Definition: parallel.h:98
block_type * pull_block()
Pulls next unconsumed block from the consumption sequence.
helper::element_iterator_generator< BlockType, BlockType::has_only_data >::iterator element_iterator
Definition: adaptor.h:625
loser_tree_type * m_losers
loser tree used for native merging
Definition: sort_stream.h:969
uint_pair & operator++()
prefix increment operator (directly manipulates the integer parts)
Definition: uint_types.h:166
base::value_cmp value_cmp
Definition: sort_stream.h:1463
element_iterator_traits< BlockType >::element_iterator make_element_iterator(BlockType *blocks, unsigned_type offset)
Definition: adaptor.h:631
run_type run
run object containing block ids of the run being written to disk
Definition: sort_stream.h:453
void wait_all(request_iterator_ reqs_begin, request_iterator_ reqs_end)
Collection of functions to track statuses of a number of requests.
runs_creator_type::sorted_runs_type sorted_runs_type
Definition: sort_stream.h:1505
run_cursor2< block_type, prefetcher_type > run_cursor_type
Definition: sort_stream.h:926
unsigned_type fetch(block_type *blocks, unsigned_type first_idx, unsigned_type last_idx)
true iff result is already computed (used in 'result()' method)
Definition: sort_stream.h:77
Input_::value_type value_type
Standard stream typedef.
Definition: sort_stream.h:1513
request_ptr * m_write_reqs
reference to write requests transporting the last accumulation buffer to disk
Definition: sort_stream.h:450
void delete_blocks(const BIDIteratorClass &bidbegin, const BIDIteratorClass &bidend)
Deallocates blocks.
sorted_runs_data_type::run_type run_type
Definition: sort_stream.h:921
Input strategy for runs_creator class.
Definition: sort_stream.h:683
Block containing elements of fixed length.
Definition: typed_block.h:238
virtual void wait(bool measure_time=true)=0
Suspends calling thread until completion of the request.
Encapsulates asynchronous prefetching engine.
void clear()
Clear the internal state of the object: release all runs and reset.
Definition: sorted_runs.h:77
Merges sorted runs.
Definition: sort_stream.h:1456
base::sorted_runs_type sorted_runs_type
Definition: sort_stream.h:359
const Tp & STXXL_MAX(const Tp &a, const Tp &b)
Definition: utils.h:154
#define _STXXL_SORT_TRIGGER_FORCE_SEQUENTIAL
Definition: parallel.h:59
choose_int_types< my_pointer_size >::int_type int_type
Definition: types.h:66
basic_runs_creator(Input_ &input, CompareType_ cmp, unsigned_type memory_to_use)
Create the object.
Definition: sort_stream.h:122
Block identifier class.
Definition: bid.h:42
void fill_with_max_value(block_type *blocks, unsigned_type num_blocks, unsigned_type first_idx)
fill the rest of the block with max values
Definition: sort_stream.h:457
basic_runs_creator< Input_, CompareType_, BlockSize_, AllocStr_ > base
Definition: sort_stream.h:352
Forms sorted runs of data from a stream.
Definition: sort_stream.h:349
int_type * m_prefetch_seq
precalculated order of blocks in which they are prefetched
Definition: sort_stream.h:963
out_block_type * m_buffer_block
memory buffer for merging from external streams
Definition: sort_stream.h:951
runs_creator(Input_ &input, CompareType_ cmp, unsigned_type memory_to_use)
Creates the object.
Definition: sort_stream.h:366
typed_block< BlockSize_, value_type > block_type
Definition: sort_stream.h:61
base::value_type value_type
Definition: sort_stream.h:356
sorted_runs_type m_result
stores the result (sorted runs) as smart pointer
Definition: sort_stream.h:72
#define STXXL_CHECK_ORDER_IN_SORTS
Definition: sort_base.h:31
#define STXXL_BEGIN_NAMESPACE
Definition: namespace.h:16
void STXXL_UNUSED(const U &)
Definition: unused.h:23
OutputIterator_ materialize(StreamAlgorithm_ &in, OutputIterator_ out)
Stores consecutively stream content to an output iterator.
Definition: stream.h:396
unsigned_type m_memsize
memory for internal use in blocks
Definition: sort_stream.h:73
#define STXXL_VERBOSE1(x)
Definition: verbose.h:99
run_type m_consume_seq
sequence of block needed for merging
Definition: sort_stream.h:960
sort_helper::trigger_entry< block_type > trigger_entry_type
Definition: sort_stream.h:62
Produces sorted stream from input stream.
Definition: sort_stream.h:1503
void set_memory_to_use(unsigned_type memory_to_use)
Set memory amount to use for the merger in bytes.
Definition: sort_stream.h:1097
sort(Input_ &in, CompareType_ c, unsigned_type memory_to_use)
Creates the object.
Definition: sort_stream.h:1519
sorted_runs_data_type::value_type value_type
Standard stream typedef.
Definition: sort_stream.h:935
void deallocate()
Deallocates input buffers but not the current result.
Definition: sort_stream.h:585
iterator begin()
Returns iterator pointing to the first element.
Definition: typed_block.h:93
base::value_cmp cmp_type
Definition: sort_stream.h:1464
basic_runs_merger(value_cmp c, unsigned_type memory_to_use)
Creates a runs merger object.
Definition: sort_stream.h:1077
bool m_result_computed
true after the result() method was called for the first time
Definition: sort_stream.h:435
void new_blocks(const DiskAssignFunctor &functor, BIDIteratorClass bidbegin, BIDIteratorClass bidend, unsigned_type offset=0)
Allocates new blocks.
base::sorted_runs_data_type sorted_runs_data_type
Definition: sort_stream.h:358
#define STXXL_ERRMSG(x)
Definition: verbose.h:79
bool empty() const
Standard stream method.
Definition: sort_stream.h:1245
void initialize(const sorted_runs_type &sruns)
Initialize the runs merger object with a new round of sorted_runs.
Definition: sort_stream.h:1103
loser_tree< run_cursor_type, run_cursor2_cmp_type > loser_tree_type
Definition: sort_stream.h:928
run_type::value_type trigger_entry_type
Definition: sort_stream.h:924
choose_int_types< my_pointer_size >::unsigned_type unsigned_type
Definition: types.h:67
const value_type * m_current_end
pointer into current memory buffer: end after range of current values
Definition: sort_stream.h:957
bool check_sorted_runs(run_type **runs, unsigned_type nruns, unsigned_type m, value_cmp cmp)
Definition: sort.h:209
void deallocate()
Deallocate temporary structures freeing memory prior to next initialize().
Definition: sort_stream.h:1237
compat::remove_const< Integral >::type div_ceil(Integral __n, Integral2 __d)
Definition: utils.h:200
void compute_prefetch_schedule(const int_type *first, const int_type *last, int_type *out_first, int_type m, int_type D)
sorted_runs_type m_result
stores the result (sorted runs) in a reference counted object
Definition: sort_stream.h:423
#define STXXL_MSG(x)
Definition: verbose.h:72
size_type size() const
Standard size method.
Definition: sort_stream.h:1251
unsigned_type optimal_merge_factor(unsigned_type num_runs, unsigned_type max_concurrent_runs)
Definition: sort_base.h:41
std::vector< sequence >::size_type seqs_size_type
Definition: sort_stream.h:931
Definition: sort_helper.h:39
unsigned_type memory_used() const
return memory size used (in bytes).
Definition: sort_stream.h:669
std::vector< trigger_entry_type > run_type
Definition: sorted_runs.h:44
unsigned_type m_cur_el
current number of elements in the run m_blocks1
Definition: sort_stream.h:441
block_type * m_blocks2
accumulation buffer that is currently being written to disk
Definition: sort_stream.h:447
sorted_runs_type & result()
Returns the sorted runs object.
Definition: sort_stream.h:139
request_ptr write(const bid_type &bid, completion_handler on_cmpl=default_completion_handler())
Writes block to the disk(s).
Definition: typed_block.h:284
Input strategy for runs_creator class.
Definition: sort_stream.h:380
Encapsulates asynchronous buffered block writing engine.
Definition: buf_writer.h:38
runs_merger(sorted_runs_type &sruns, value_cmp cmp, unsigned_type memory_to_use)
Creates a runs merger object.
Definition: sort_stream.h:1472
block_prefetcher< block_type, typename run_type::iterator > prefetcher_type
Definition: sort_stream.h:925
void fill_with_max_value(block_type *blocks, unsigned_type num_blocks, unsigned_type first_idx)
fill the rest of the block with max values
Definition: sort_stream.h:92
runs_merger< sorted_runs_type, CompareType_, AllocStr_ > runs_merger_type
Definition: sort_stream.h:1506
iterator2stream< InputIterator_ > streamify(InputIterator_ begin, InputIterator_ end)
Input iterator range to stream converter.
Definition: stream.h:116
basic_runs_merger< RunsType_, CompareType_, AllocStr_ > base
Definition: sort_stream.h:1459
bool do_parallel_merge()
Definition: parallel.h:117
base::block_type block_type
Definition: sort_stream.h:1465
#define STXXL_END_NAMESPACE
Definition: namespace.h:17
runs_creator_type creator
Definition: sort_stream.h:1508
unsigned_type m_memory_to_use
memory size in bytes to use
Definition: sort_stream.h:942
sorted_runs_type::element_type sorted_runs_data_type
Definition: sort_stream.h:919
value_cmp m_cmp
comparator object to sort runs
Definition: sort_stream.h:939
sort_helper::trigger_entry< bid_type, value_type > trigger_entry_type
Definition: sort_stream.h:1577
sorted_runs_data_type::size_type size_type
Definition: sort_stream.h:920
bool empty() const
Standard stream method.
Definition: sort_stream.h:1540