STXXL  1.4-dev
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
sort_stream.h
Go to the documentation of this file.
1 /***************************************************************************
2  * include/stxxl/bits/stream/sort_stream.h
3  *
4  * Part of the STXXL. See http://stxxl.sourceforge.net
5  *
6  * Copyright (C) 2002-2005 Roman Dementiev <[email protected]>
7  * Copyright (C) 2006-2008 Johannes Singler <[email protected]>
8  * Copyright (C) 2008-2010 Andreas Beckmann <[email protected]>
9  * Copyright (C) 2013 Timo Bingmann <[email protected]>
10  *
11  * Distributed under the Boost Software License, Version 1.0.
12  * (See accompanying file LICENSE_1_0.txt or copy at
13  * http://www.boost.org/LICENSE_1_0.txt)
14  **************************************************************************/
15 
16 #ifndef STXXL_STREAM_SORT_STREAM_HEADER
17 #define STXXL_STREAM_SORT_STREAM_HEADER
18 
19 #include <stxxl/bits/config.h>
28 
30 
31 namespace stream {
32 
33 //! \addtogroup streampack Stream Package
34 //! \{
35 
36 ////////////////////////////////////////////////////////////////////////
37 // CREATE RUNS //
38 ////////////////////////////////////////////////////////////////////////
39 
40 //! Forms sorted runs of data from a stream.
41 //!
42 //! \tparam Input type of the input stream
43 //! \tparam CompareType type of comparison object used for sorting the runs
44 //! \tparam BlockSize size of blocks used to store the runs (in bytes)
45 //! \tparam AllocStr functor that defines allocation strategy for the runs
46 template <
47  class Input,
48  class CompareType,
49  unsigned BlockSize = STXXL_DEFAULT_BLOCK_SIZE(typename Input::value_type),
50  class AllocStr = STXXL_DEFAULT_ALLOC_STRATEGY>
52 {
53 public:
54  typedef Input input_type;
55  typedef CompareType cmp_type;
56  static const unsigned block_size = BlockSize;
57  typedef AllocStr allocation_strategy_type;
58 
59 public:
60  typedef typename Input::value_type value_type;
66 
68 
69 protected:
70  //! reference to the input stream
71  Input& m_input;
72  //! comparator used to sort block groups
73  CompareType m_cmp;
74 
75 private:
76  //! stores the result (sorted runs) as smart pointer
78  //! memory for internal use in blocks
80  //! true iff result is already computed (used in 'result()' method)
82 
83  //! Fetch data from input into blocks[first_idx,last_idx).
85  unsigned_type first_idx, unsigned_type last_idx)
86  {
87  element_iterator output = make_element_iterator(blocks, first_idx);
88  unsigned_type curr_idx = first_idx;
89  while (!m_input.empty() && curr_idx != last_idx) {
90  *output = *m_input;
91  ++m_input;
92  ++output;
93  ++curr_idx;
94  }
95  return curr_idx;
96  }
97 
98  //! fill the rest of the block with max values
99  void fill_with_max_value(block_type* blocks, unsigned_type num_blocks,
100  unsigned_type first_idx)
101  {
102  unsigned_type last_idx = num_blocks * block_type::size;
103  if (first_idx < last_idx) {
104  element_iterator curr = make_element_iterator(blocks, first_idx);
105  while (first_idx != last_idx) {
106  *curr = m_cmp.max_value();
107  ++curr;
108  ++first_idx;
109  }
110  }
111  }
112 
113  //! Sort a specific run, contained in a sequences of blocks.
114  void sort_run(block_type* run, unsigned_type elements)
115  {
118  make_element_iterator(run, elements),
119  m_cmp);
120  }
121 
122  void compute_result();
123 
124 public:
125  //! Create the object.
126  //! \param input input stream
127  //! \param cmp comparator object
128  //! \param memory_to_use memory amount that is allowed to used by the
129  //! sorter in bytes
130  basic_runs_creator(Input& input, CompareType cmp,
131  unsigned_type memory_to_use)
132  : m_input(input),
133  m_cmp(cmp),
134  m_result(new sorted_runs_data_type),
135  m_memsize(memory_to_use / BlockSize / sort_memory_usage_factor()),
136  m_result_computed(false)
137  {
139  if (!(2 * BlockSize * sort_memory_usage_factor() <= memory_to_use)) {
140  throw bad_parameter("stxxl::runs_creator<>:runs_creator(): "
141  "INSUFFICIENT MEMORY provided, "
142  "please increase parameter 'memory_to_use'");
143  }
144  assert(m_memsize > 0);
145  }
146 
147  //! Returns the sorted runs object.
148  //! \return Sorted runs object. The result is computed lazily, i.e. on the first call
149  //! \remark Returned object is intended to be used by \c runs_merger object as input
151  {
152  if (!m_result_computed)
153  {
154  compute_result();
155  m_result_computed = true;
156 #ifdef STXXL_PRINT_STAT_AFTER_RF
157  STXXL_MSG(*stats::get_instance());
158 #endif //STXXL_PRINT_STAT_AFTER_RF
159  }
160  return m_result;
161  }
162 };
163 
164 //! Finish the results, i. e. create all runs.
165 //!
166 //! This is the main routine of this class.
167 template <class Input, class CompareType, unsigned BlockSize, class AllocStr>
169 {
170  unsigned_type i = 0;
171  unsigned_type m2 = m_memsize / 2;
172  const unsigned_type el_in_run = m2 * block_type::size; // # el in a run
173  STXXL_VERBOSE1("basic_runs_creator::compute_result m2=" << m2);
174  unsigned_type blocks1_length = 0, blocks2_length = 0;
175  block_type* Blocks1 = NULL;
176 
177 #ifndef STXXL_SMALL_INPUT_PSORT_OPT
178  Blocks1 = new block_type[m2 * 2];
179 #else
180  // push input element into small_run vector in result until it is full
181  while (!input.empty() && blocks1_length != block_type::size)
182  {
183  m_result->small_run.push_back(*input);
184  ++input;
185  ++blocks1_length;
186  }
187 
188  if (blocks1_length == block_type::size && !input.empty())
189  {
190  Blocks1 = new block_type[m2 * 2];
191  std::copy(m_result->small_run.begin(), m_result->small_run.end(),
192  Blocks1[0].begin());
193  m_result->small_run.clear();
194  }
195  else
196  {
197  STXXL_VERBOSE1("basic_runs_creator: Small input optimization, input length: " << blocks1_length);
198  m_result->elements = blocks1_length;
200  potentially_parallel::sort(m_result->small_run.begin(), m_result->small_run.end(), cmp);
201  return;
202  }
203 #endif //STXXL_SMALL_INPUT_PSORT_OPT
204 
205  // the first block may be there already, now fetch until memsize is filled.
206  blocks1_length = fetch(Blocks1, blocks1_length, el_in_run);
207 
208  // sort first run
209  sort_run(Blocks1, blocks1_length);
210 
211  if (blocks1_length <= block_type::size && m_input.empty())
212  {
213  // small input, do not flush it on the disk(s)
214  STXXL_VERBOSE1("basic_runs_creator: Small input optimization, input length: " << blocks1_length);
215  assert(m_result->small_run.empty());
216  m_result->small_run.assign(Blocks1[0].begin(), Blocks1[0].begin() + blocks1_length);
217  m_result->elements = blocks1_length;
218  delete[] Blocks1;
219  return;
220  }
221 
222  block_type* Blocks2 = Blocks1 + m2;
223  block_manager* bm = block_manager::get_instance();
224  request_ptr* write_reqs = new request_ptr[m2];
225  run_type run;
226 
227  unsigned_type cur_run_size = div_ceil(blocks1_length, block_type::size); // in blocks
228  run.resize(cur_run_size);
229  bm->new_blocks(AllocStr(), make_bid_iterator(run.begin()), make_bid_iterator(run.end()));
230 
231  disk_queues::get_instance()->set_priority_op(request_queue::WRITE);
232 
233  // fill the rest of the last block with max values
234  fill_with_max_value(Blocks1, cur_run_size, blocks1_length);
235 
236  for (i = 0; i < cur_run_size; ++i)
237  {
238  run[i].value = Blocks1[i][0];
239  write_reqs[i] = Blocks1[i].write(run[i].bid);
240  }
241  m_result->runs.push_back(run);
242  m_result->runs_sizes.push_back(blocks1_length);
243  m_result->elements += blocks1_length;
244 
245  if (m_input.empty())
246  {
247  // return
248  wait_all(write_reqs, write_reqs + cur_run_size);
249  delete[] write_reqs;
250  delete[] Blocks1;
251  return;
252  }
253 
254  STXXL_VERBOSE1("Filling the second part of the allocated blocks");
255  blocks2_length = fetch(Blocks2, 0, el_in_run);
256 
257  if (m_input.empty())
258  {
259  // optimization if the whole set fits into both halves
260  // (re)sort internally and return
261  blocks2_length += el_in_run;
262  sort_run(Blocks1, blocks2_length); // sort first an second run together
263  wait_all(write_reqs, write_reqs + cur_run_size);
264  bm->delete_blocks(make_bid_iterator(run.begin()), make_bid_iterator(run.end()));
265 
266  cur_run_size = div_ceil(blocks2_length, block_type::size);
267  run.resize(cur_run_size);
268  bm->new_blocks(AllocStr(), make_bid_iterator(run.begin()), make_bid_iterator(run.end()));
269 
270  // fill the rest of the last block with max values
271  fill_with_max_value(Blocks1, cur_run_size, blocks2_length);
272 
273  assert(cur_run_size > m2);
274 
275  for (i = 0; i < m2; ++i)
276  {
277  run[i].value = Blocks1[i][0];
278  write_reqs[i]->wait();
279  write_reqs[i] = Blocks1[i].write(run[i].bid);
280  }
281 
282  request_ptr* write_reqs1 = new request_ptr[cur_run_size - m2];
283 
284  for ( ; i < cur_run_size; ++i)
285  {
286  run[i].value = Blocks1[i][0];
287  write_reqs1[i - m2] = Blocks1[i].write(run[i].bid);
288  }
289 
290  m_result->runs[0] = run;
291  m_result->runs_sizes[0] = blocks2_length;
292  m_result->elements = blocks2_length;
293 
294  wait_all(write_reqs, write_reqs + m2);
295  delete[] write_reqs;
296  wait_all(write_reqs1, write_reqs1 + cur_run_size - m2);
297  delete[] write_reqs1;
298 
299  delete[] Blocks1;
300 
301  return;
302  }
303 
304  // more than 2 runs can be filled, i. e. the general case
305 
306  sort_run(Blocks2, blocks2_length);
307 
308  cur_run_size = div_ceil(blocks2_length, block_type::size); // in blocks
309  run.resize(cur_run_size);
310  bm->new_blocks(AllocStr(), make_bid_iterator(run.begin()), make_bid_iterator(run.end()));
311 
312  for (i = 0; i < cur_run_size; ++i)
313  {
314  run[i].value = Blocks2[i][0];
315  write_reqs[i]->wait();
316  write_reqs[i] = Blocks2[i].write(run[i].bid);
317  }
318  assert((blocks2_length % el_in_run) == 0);
319 
320  m_result->add_run(run, blocks2_length);
321 
322  while (!m_input.empty())
323  {
324  blocks1_length = fetch(Blocks1, 0, el_in_run);
325  sort_run(Blocks1, blocks1_length);
326  cur_run_size = div_ceil(blocks1_length, block_type::size); // in blocks
327  run.resize(cur_run_size);
328  bm->new_blocks(AllocStr(), make_bid_iterator(run.begin()), make_bid_iterator(run.end()));
329 
330  // fill the rest of the last block with max values (occurs only on the last run)
331  fill_with_max_value(Blocks1, cur_run_size, blocks1_length);
332 
333  for (i = 0; i < cur_run_size; ++i)
334  {
335  run[i].value = Blocks1[i][0];
336  write_reqs[i]->wait();
337  write_reqs[i] = Blocks1[i].write(run[i].bid);
338  }
339  m_result->add_run(run, blocks1_length);
340 
341  std::swap(Blocks1, Blocks2);
342  std::swap(blocks1_length, blocks2_length);
343  }
344 
345  wait_all(write_reqs, write_reqs + m2);
346  delete[] write_reqs;
347  delete[] ((Blocks1 < Blocks2) ? Blocks1 : Blocks2);
348 }
349 
350 //! Forms sorted runs of data from a stream.
351 //!
352 //! \tparam Input type of the input stream
353 //! \tparam CompareType type of comparison object used for sorting the runs
354 //! \tparam BlockSize size of blocks used to store the runs
355 //! \tparam AllocStr functor that defines allocation strategy for the runs
356 template <
357  class Input,
358  class CompareType,
359  unsigned BlockSize = STXXL_DEFAULT_BLOCK_SIZE(typename Input::value_type),
360  class AllocStr = STXXL_DEFAULT_ALLOC_STRATEGY
361  >
362 class runs_creator : public basic_runs_creator<Input, CompareType, BlockSize, AllocStr>
363 {
364 private:
366 
367 public:
368  typedef typename base::cmp_type cmp_type;
369  typedef typename base::value_type value_type;
370  typedef typename base::block_type block_type;
373 
374 public:
375  //! Creates the object.
376  //! \param input input stream
377  //! \param cmp comparator object
378  //! \param memory_to_use memory amount that is allowed to be used by the
379  //! sorter in bytes
380  runs_creator(Input& input, CompareType cmp, unsigned_type memory_to_use)
381  : base(input, cmp, memory_to_use)
382  { }
383 };
384 
385 //! Input strategy (tag type) for the \c runs_creator class.
386 //!
387 //! Passed as the first template argument of \c runs_creator, it selects
388 //! the specialization that creates the sorted runs
389 //! data structure usable for \c runs_merger
390 //! from elements pushed into the sorter one at a time
391 //! (using runs_creator::push()). \c ValueType is exported as \c value_type.
392 template <class ValueType>
393 struct use_push
394 {
395  typedef ValueType value_type;
396 };
397 
398 //! Forms sorted runs of elements passed in push() method.
399 //!
400 //! A specialization of \c runs_creator that
401 //! allows to create sorted runs
402 //! data structure usable for \c runs_merger from
403 //! elements passed in sorted push() method. <BR>
404 //! \tparam ValueType type of values (parameter for \c use_push strategy)
405 //! \tparam CompareType type of comparison object used for sorting the runs
406 //! \tparam BlockSize size of blocks used to store the runs
407 //! \tparam AllocStr functor that defines allocation strategy for the runs
408 template <
409  class ValueType,
410  class CompareType,
411  unsigned BlockSize,
412  class AllocStr
413  >
415  use_push<ValueType>,
416  CompareType,
417  BlockSize,
418  AllocStr
419  >: private noncopyable
420 {
421 public:
422  typedef CompareType cmp_type;
423  typedef ValueType value_type;
429 
431 
432 private:
433  //! comparator object to sort runs
434  CompareType m_cmp;
435 
437 
438  //! stores the result (sorted runs) in a reference counted object
440 
441  //! memory size in bytes to use
443 
444  //! memory size in number of blocks for internal use
446 
447  //! m_memsize / 2
449 
450  //! true after the result() method was called for the first time
452 
453  //! total number of elements in a run
455 
456  //! current number of elements in the run m_blocks1
458 
459  //! accumulation buffer of size m_m2 blocks, half the available memory size
461 
462  //! accumulation buffer that is currently being written to disk
464 
465  //! reference to write requests transporting the last accumulation buffer
466  //! to disk
468 
469  //! run object containing block ids of the run being written to disk
471 
472 protected:
473  //! fill the rest of the block with max values
474  void fill_with_max_value(block_type* blocks, unsigned_type num_blocks,
475  unsigned_type first_idx)
476  {
477  unsigned_type last_idx = num_blocks * block_type::size;
478  if (first_idx < last_idx) {
479  element_iterator curr = make_element_iterator(blocks, first_idx);
480  while (first_idx != last_idx) {
481  *curr = m_cmp.max_value();
482  ++curr;
483  ++first_idx;
484  }
485  }
486  }
487 
488  //! Sort a specific run, contained in a sequences of blocks.
489  void sort_run(block_type* run, unsigned_type elements)
490  {
493  make_element_iterator(run, elements),
494  m_cmp);
495  }
496 
498  {
499  if (m_cur_el == 0)
500  return;
501 
502  sort_run(m_blocks1, m_cur_el);
503 
504  if (m_cur_el <= block_type::size && m_result->elements == 0)
505  {
506  // small input, do not flush it on the disk(s)
507  STXXL_VERBOSE1("runs_creator(use_push): Small input optimization, input length: " << m_cur_el);
508  m_result->small_run.assign(m_blocks1[0].begin(), m_blocks1[0].begin() + m_cur_el);
509  m_result->elements = m_cur_el;
510  return;
511  }
512 
513  const unsigned_type cur_run_size = div_ceil(m_cur_el, block_type::size); // in blocks
514  run.resize(cur_run_size);
515  block_manager* bm = block_manager::get_instance();
516  bm->new_blocks(AllocStr(), make_bid_iterator(run.begin()), make_bid_iterator(run.end()));
517 
518  disk_queues::get_instance()->set_priority_op(request_queue::WRITE);
519 
520  // fill the rest of the last block with max values
521  fill_with_max_value(m_blocks1, cur_run_size, m_cur_el);
522 
523  unsigned_type i = 0;
524  for ( ; i < cur_run_size; ++i)
525  {
526  run[i].value = m_blocks1[i][0];
527  if (m_write_reqs[i].get())
528  m_write_reqs[i]->wait();
529 
530  m_write_reqs[i] = m_blocks1[i].write(run[i].bid);
531  }
532  m_result->add_run(run, m_cur_el);
533 
534  for (i = 0; i < m_m2; ++i)
535  {
536  if (m_write_reqs[i].get())
537  m_write_reqs[i]->wait();
538  }
539  }
540 
541 public:
542  //! Creates the object.
543  //! \param cmp comparator object
544  //! \param memory_to_use memory amount that is allowed to used by the sorter in bytes
545  runs_creator(CompareType cmp, unsigned_type memory_to_use)
546  : m_cmp(cmp),
547  m_memory_to_use(memory_to_use),
548  m_memsize(memory_to_use / BlockSize / sort_memory_usage_factor()),
549  m_m2(m_memsize / 2),
550  m_el_in_run(m_m2 * block_type::size),
551  m_blocks1(NULL), m_blocks2(NULL),
552  m_write_reqs(NULL)
553  {
555  if (!(2 * BlockSize * sort_memory_usage_factor() <= m_memory_to_use)) {
556  throw bad_parameter("stxxl::runs_creator<>:runs_creator(): "
557  "INSUFFICIENT MEMORY provided, "
558  "please increase parameter 'memory_to_use'");
559  }
560  assert(m_m2 > 0);
561 
562  allocate();
563  }
564 
566  {
567  m_result_computed = 1;
568  deallocate();
569  }
570 
571  //! Clear current state and remove all items.
572  void clear()
573  {
574  if (!m_result)
575  m_result = new sorted_runs_data_type;
576  else
577  m_result->clear();
578 
579  m_result_computed = false;
580  m_cur_el = 0;
581 
582  for (unsigned_type i = 0; i < m_m2; ++i)
583  {
584  if (m_write_reqs[i].get())
585  m_write_reqs[i]->cancel();
586  }
587  }
588 
589  //! Allocates input buffers and clears result.
590  void allocate()
591  {
592  if (!m_blocks1)
593  {
594  m_blocks1 = new block_type[m_m2 * 2];
595  m_blocks2 = m_blocks1 + m_m2;
596 
597  m_write_reqs = new request_ptr[m_m2];
598  }
599 
600  clear();
601  }
602 
603  //! Deallocates input buffers but not the current result.
604  void deallocate()
605  {
606  result(); // finishes result
607 
608  if (m_blocks1)
609  {
610  delete[] ((m_blocks1 < m_blocks2) ? m_blocks1 : m_blocks2);
611  m_blocks1 = m_blocks2 = NULL;
612 
613  delete[] m_write_reqs;
614  m_write_reqs = NULL;
615  }
616  }
617 
618  //! Adds new element to the sorter.
619  //! \param val value to be added
620  void push(const value_type& val)
621  {
622  assert(m_result_computed == false);
623  if (LIKELY(m_cur_el < m_el_in_run))
624  {
625  m_blocks1[m_cur_el / block_type::size][m_cur_el % block_type::size] = val;
626  ++m_cur_el;
627  return;
628  }
629 
630  assert(m_el_in_run == m_cur_el);
631  m_cur_el = 0;
632 
633  // sort and store m_blocks1
634  sort_run(m_blocks1, m_el_in_run);
635 
636  const unsigned_type cur_run_blocks = div_ceil(m_el_in_run, block_type::size); // in blocks
637  run.resize(cur_run_blocks);
638  block_manager* bm = block_manager::get_instance();
639  bm->new_blocks(AllocStr(), make_bid_iterator(run.begin()), make_bid_iterator(run.end()));
640 
641  disk_queues::get_instance()->set_priority_op(request_queue::WRITE);
642 
643  for (unsigned_type i = 0; i < cur_run_blocks; ++i)
644  {
645  run[i].value = m_blocks1[i][0];
646  if (m_write_reqs[i].get())
647  m_write_reqs[i]->wait();
648 
649  m_write_reqs[i] = m_blocks1[i].write(run[i].bid);
650  }
651 
652  m_result->add_run(run, m_el_in_run);
653 
654  std::swap(m_blocks1, m_blocks2);
655 
656  push(val);
657  }
658 
659  //! Returns the sorted runs object.
660  //! \return Sorted runs object.
661  //! \remark Returned object is intended to be used by \c runs_merger object as input
663  {
664  if (!m_result_computed)
665  {
666  compute_result();
667  m_result_computed = true;
668 #ifdef STXXL_PRINT_STAT_AFTER_RF
669  STXXL_MSG(*stats::get_instance());
670 #endif //STXXL_PRINT_STAT_AFTER_RF
671  }
672  return m_result;
673  }
674 
675  //! number of items currently inserted.
677  {
678  return m_result->elements + m_cur_el;
679  }
680 
681  //! Returns a const reference to the comparator object stored in m_cmp.
682  const cmp_type & cmp() const
683  {
684  return m_cmp;
685  }
686 
687  //! return memory size used (in bytes).
689  {
690  return m_memory_to_use;
691  }
692 };
693 
694 //! Input strategy for \c runs_creator class.
695 //!
696 //! This strategy together with \c runs_creator class
697 //! allows to create sorted runs
698 //! data structure usable for \c runs_merger from
699 //! sequences of elements in sorted order
700 template <class ValueType>
702 {
703  typedef ValueType value_type;
704 };
705 
706 //! Forms sorted runs of data taking elements in sorted order (element by element).
707 //!
708 //! A specialization of \c runs_creator that
709 //! allows to create sorted runs
710 //! data structure usable for \c runs_merger from
711 //! sequences of elements in sorted order. <BR>
712 //! \tparam ValueType type of values (parameter for \c from_sorted_sequences strategy)
713 //! \tparam CompareType type of comparison object used for sorting the runs
714 //! \tparam BlockSize size of blocks used to store the runs
715 //! \tparam AllocStr functor that defines allocation strategy for the runs
716 template <
717  class ValueType,
718  class CompareType,
719  unsigned BlockSize,
720  class AllocStr
721  >
723  from_sorted_sequences<ValueType>,
724  CompareType,
725  BlockSize,
726  AllocStr
727  >: private noncopyable
728 {
729 public:
730  typedef ValueType value_type;
733  typedef AllocStr alloc_strategy_type;
734 
735 public:
736  typedef CompareType cmp_type;
740 
741 private:
743 
744  CompareType cmp;
745 
746  //! stores the result (sorted runs)
748  //! memory for internal use in blocks
755  //! needs to be reset after each run
757 
758 public:
759  //! Creates the object.
760  //! \param c comparator object
761  //! \param memory_to_use memory amount that is allowed to used by the sorter in bytes.
762  //! Recommended value: 2 * block_size * D
763  runs_creator(CompareType c, unsigned_type memory_to_use)
764  : cmp(c),
765  result_(new sorted_runs_data_type),
766  m_(memory_to_use / BlockSize / sort_memory_usage_factor()),
767  writer(m_, m_ / 2),
768  cur_block(writer.get_free_block()),
769  offset(0),
770  iblock(0),
771  irun(0)
772  {
774  assert(m_ > 0);
775  if (!(2 * BlockSize * sort_memory_usage_factor() <= memory_to_use)) {
776  throw bad_parameter("stxxl::runs_creator<>:runs_creator(): "
777  "INSUFFICIENT MEMORY provided, "
778  "please increase parameter 'memory_to_use'");
779  }
780  }
781 
782  //! Adds new element to the current run.
783  //! \param val value to be added to the current run
784  void push(const value_type& val)
785  {
786  assert(offset < block_type::size);
787 
788  (*cur_block)[offset] = val;
789  ++offset;
790 
791  if (offset == block_type::size)
792  {
793  // write current block
794 
795  block_manager* bm = block_manager::get_instance();
796  // allocate space for the block
797  result_->runs.resize(irun + 1);
798  result_->runs[irun].resize(iblock + 1);
799  bm->new_blocks(
800  alloc_strategy,
801  make_bid_iterator(result_->runs[irun].begin() + iblock),
802  make_bid_iterator(result_->runs[irun].end()),
803  iblock
804  );
805 
806  result_->runs[irun][iblock].value = (*cur_block)[0]; // init trigger
807  cur_block = writer.write(cur_block, result_->runs[irun][iblock].bid);
808  ++iblock;
809 
810  offset = 0;
811  }
812 
813  ++result_->elements;
814  }
815 
816  //! Finishes current run and begins new one.
817  void finish()
818  {
819  if (offset == 0 && iblock == 0) // current run is empty
820  return;
821 
822  result_->runs_sizes.resize(irun + 1);
823  result_->runs_sizes.back() = iblock * block_type::size + offset;
824 
825  if (offset) // if current block is partially filled
826  {
827  while (offset != block_type::size)
828  {
829  (*cur_block)[offset] = cmp.max_value();
830  ++offset;
831  }
832  offset = 0;
833 
834  block_manager* bm = block_manager::get_instance();
835  // allocate space for the block
836  result_->runs.resize(irun + 1);
837  result_->runs[irun].resize(iblock + 1);
838  bm->new_blocks(
839  alloc_strategy,
840  make_bid_iterator(result_->runs[irun].begin() + iblock),
841  make_bid_iterator(result_->runs[irun].end()),
842  iblock
843  );
844 
845  result_->runs[irun][iblock].value = (*cur_block)[0]; // init trigger
846  cur_block = writer.write(cur_block, result_->runs[irun][iblock].bid);
847  }
848  else
849  { }
850 
851  alloc_strategy = alloc_strategy_type(); // reinitialize block allocator for the next run
852  iblock = 0;
853  ++irun;
854  }
855 
856  //! Returns the sorted runs object.
857  //! \return Sorted runs object
858  //! \remark Returned object is intended to be used by \c runs_merger object as input
860  {
861  finish();
862  writer.flush();
863 
864  return result_;
865  }
866 };
867 
868 //! Checker for the sorted runs object created by the \c runs_creator .
869 //! \param sruns sorted runs object
870 //! \param cmp comparison object used for checking the order of elements in runs
871 //! \return \c true if runs are sorted, \c false otherwise
872 template <class RunsType, class CompareType>
873 bool check_sorted_runs(const RunsType& sruns, CompareType cmp)
874 {
876  typedef typename RunsType::element_type::block_type block_type;
877  STXXL_VERBOSE2("Elements: " << sruns->elements);
878  unsigned_type nruns = sruns->runs.size();
879  STXXL_VERBOSE2("Runs: " << nruns);
880  unsigned_type irun = 0;
881  for (irun = 0; irun < nruns; ++irun)
882  {
883  const unsigned_type nblocks = sruns->runs[irun].size();
884  block_type* blocks = new block_type[nblocks];
885  request_ptr* reqs = new request_ptr[nblocks];
886  for (unsigned_type j = 0; j < nblocks; ++j)
887  {
888  reqs[j] = blocks[j].read(sruns->runs[irun][j].bid);
889  }
890  wait_all(reqs, reqs + nblocks);
891  delete[] reqs;
892 
893  for (unsigned_type j = 0; j < nblocks; ++j)
894  {
895  if (cmp(blocks[j][0], sruns->runs[irun][j].value) ||
896  cmp(sruns->runs[irun][j].value, blocks[j][0])) //!=
897  {
898  STXXL_ERRMSG("check_sorted_runs wrong trigger in the run");
899  delete[] blocks;
900  return false;
901  }
902  }
903  if (!stxxl::is_sorted(
904  make_element_iterator(blocks, 0),
905  make_element_iterator(blocks, sruns->runs_sizes[irun]),
906  cmp))
907  {
908  STXXL_ERRMSG("check_sorted_runs wrong order in the run");
909  delete[] blocks;
910  return false;
911  }
912 
913  delete[] blocks;
914  }
915 
916  STXXL_MSG("Checking runs finished successfully");
917 
918  return true;
919 }
920 
921 ////////////////////////////////////////////////////////////////////////
922 // MERGE RUNS //
923 ////////////////////////////////////////////////////////////////////////
924 
925 //! Merges sorted runs.
926 //!
927 //! \tparam RunsType type of the sorted runs, available as \c runs_creator::sorted_runs_type ,
928 //! \tparam CompareType type of comparison object used for merging
929 //! \tparam AllocStr allocation strategy used to allocate the blocks for
930 //! storing intermediate results if several merge passes are required
931 template <class RunsType,
932  class CompareType,
933  class AllocStr = STXXL_DEFAULT_ALLOC_STRATEGY>
935 {
936 public:
937  typedef RunsType sorted_runs_type;
938  typedef CompareType value_cmp;
939  typedef AllocStr alloc_strategy;
940 
941  typedef typename sorted_runs_type::element_type sorted_runs_data_type;
942  typedef typename sorted_runs_data_type::size_type size_type;
943  typedef typename sorted_runs_data_type::run_type run_type;
944  typedef typename sorted_runs_data_type::block_type block_type;
946  typedef typename run_type::value_type trigger_entry_type;
952  typedef std::pair<typename block_type::iterator, typename block_type::iterator> sequence;
953  typedef typename std::vector<sequence>::size_type seqs_size_type;
954 
955 public:
956  //! Standard stream typedef.
957  typedef typename sorted_runs_data_type::value_type value_type;
958 
959 private:
960  //! comparator object to sort runs
962 
963  //! memory size in bytes to use
965 
966  //! smart pointer to sorted_runs object
968 
969  //! items remaining in input
971 
972  //! memory buffer for merging from external streams
974 
975  //! pointer into current memory buffer: this is either m_buffer_block or the small_runs vector
977 
978  //! pointer into current memory buffer: end after range of current values
980 
981  //! sequence of block needed for merging
983 
984  //! precalculated order of blocks in which they are prefetched
986 
987  //! prefetcher object
989 
990  //! loser tree used for native merging
992 
993 #if STXXL_PARALLEL_MULTIWAY_MERGE
994  std::vector<sequence>* seqs;
995  std::vector<block_type*>* buffers;
996  diff_type num_currently_mergeable;
997 #endif
998 
999 #if STXXL_CHECK_ORDER_IN_SORTS
1000  //! previous element to ensure the current output ordering
1001  value_type m_last_element;
1002 #endif //STXXL_CHECK_ORDER_IN_SORTS
1003 
1004  ////////////////////////////////////////////////////////////////////
1005 
1006  void merge_recursively();
1007 
1009  {
1010  if (m_prefetcher)
1011  {
1012  delete m_losers;
1013 #if STXXL_PARALLEL_MULTIWAY_MERGE
1014  delete seqs;
1015  delete buffers;
1016 #endif
1017  delete m_prefetcher;
1018  delete[] m_prefetch_seq;
1019  m_prefetcher = NULL;
1020  }
1021  }
1022 
1024  {
1025  STXXL_VERBOSE1("fill_buffer_block");
1026  if (do_parallel_merge())
1027  {
1028 #if STXXL_PARALLEL_MULTIWAY_MERGE
1029 // begin of STL-style merging
1030  diff_type rest = out_block_type::size; // elements still to merge for this output block
1031 
1032  do // while rest > 0 and still elements available
1033  {
1034  if (num_currently_mergeable < rest)
1035  {
1036  if (!m_prefetcher || m_prefetcher->empty())
1037  {
1038  // anything remaining is already in memory
1039  num_currently_mergeable = m_elements_remaining;
1040  }
1041  else
1042  {
1043  num_currently_mergeable = sort_helper::count_elements_less_equal(
1044  *seqs, m_consume_seq[m_prefetcher->pos()].value, m_cmp);
1045  }
1046  }
1047 
1048  diff_type output_size = STXXL_MIN(num_currently_mergeable, rest); // at most rest elements
1049 
1050  STXXL_VERBOSE1("before merge " << output_size);
1051 
1053  (*seqs).begin(), (*seqs).end(),
1054  m_buffer_block->end() - rest, output_size, m_cmp);
1055  // sequence iterators are progressed appropriately
1056 
1057  rest -= output_size;
1058  num_currently_mergeable -= output_size;
1059 
1060  STXXL_VERBOSE1("after merge");
1061 
1062  sort_helper::refill_or_remove_empty_sequences(*seqs, *buffers, *m_prefetcher);
1063  } while (rest > 0 && (*seqs).size() > 0);
1064 
1065 #if STXXL_CHECK_ORDER_IN_SORTS
1066  if (!stxxl::is_sorted(m_buffer_block->begin(), m_buffer_block->end(), cmp))
1067  {
1068  for (value_type* i = m_buffer_block->begin() + 1; i != m_buffer_block->end(); ++i)
1069  if (cmp(*i, *(i - 1)))
1070  {
1071  STXXL_VERBOSE1("Error at position " << (i - m_buffer_block->begin()));
1072  }
1073  assert(false);
1074  }
1075 #endif //STXXL_CHECK_ORDER_IN_SORTS
1076 
1077 // end of STL-style merging
1078 #else
1080 #endif //STXXL_PARALLEL_MULTIWAY_MERGE
1081  }
1082  else
1083  {
1084 // begin of native merging procedure
1085  m_losers->multi_merge(m_buffer_block->elem, m_buffer_block->elem + STXXL_MIN<size_type>(out_block_type::size, m_elements_remaining));
1086 // end of native merging procedure
1087  }
1088  STXXL_VERBOSE1("current block filled");
1089 
1090  m_current_ptr = m_buffer_block->elem;
1091  m_current_end = m_buffer_block->elem + STXXL_MIN<size_type>(out_block_type::size, m_elements_remaining);
1092 
1093  if (m_elements_remaining <= out_block_type::size)
1094  deallocate_prefetcher();
1095  }
1096 
1097 public:
1098  //! Creates a runs merger object.
1099  //! \param c comparison object
1100  //! \param memory_to_use amount of memory available for the merger in bytes
1102  : m_cmp(c),
1103  m_memory_to_use(memory_to_use),
1104  m_buffer_block(new out_block_type),
1105  m_prefetch_seq(NULL),
1106  m_prefetcher(NULL),
1107  m_losers(NULL)
1109  , seqs(NULL),
1110  buffers(NULL),
1111  num_currently_mergeable(0)
1112 #endif
1114  , m_last_element(m_cmp.min_value())
1115 #endif //STXXL_CHECK_ORDER_IN_SORTS
1116  {
1118  }
1119 
1120  //! Set memory amount to use for the merger in bytes. The value is only stored here; initialize() reads it when computing the number of input buffers.
1121  void set_memory_to_use(unsigned_type memory_to_use)
1122  {
1123  m_memory_to_use = memory_to_use;
1124  }
1125 
1126  //! Initialize the runs merger object with a new round of sorted_runs.
1127  void initialize(const sorted_runs_type& sruns)
1128  {
1129  m_sruns = sruns;
1130  m_elements_remaining = m_sruns->elements;
1131 
1132  if (empty())
1133  return;
1134 
1135  if (!m_sruns->small_run.empty())
1136  {
1137  // we have a small input <= B, that is kept in the main memory
1138  STXXL_VERBOSE1("basic_runs_merger: small input optimization, input length: " << m_elements_remaining);
1139  assert(m_elements_remaining == size_type(m_sruns->small_run.size()));
1140 
1141  m_current_ptr = &m_sruns->small_run[0];
1142  m_current_end = m_current_ptr + m_sruns->small_run.size();
1143 
1144  return;
1145  }
1146 
1147 #if STXXL_CHECK_ORDER_IN_SORTS
1148  assert(check_sorted_runs(m_sruns, m_cmp));
1149 #endif //STXXL_CHECK_ORDER_IN_SORTS
1150 
1151  // *** test whether recursive merging is necessary
1152 
1153  disk_queues::get_instance()->set_priority_op(request_queue::WRITE);
1154 
1155  int_type disks_number = config::get_instance()->disks_number();
1156  unsigned_type min_prefetch_buffers = 2 * disks_number;
1157  unsigned_type input_buffers =
1158  (m_memory_to_use > sizeof(out_block_type)
1159  ? m_memory_to_use - sizeof(out_block_type)
1160  : 0) / block_type::raw_size;
1161  unsigned_type nruns = m_sruns->runs.size();
1162 
1163  if (input_buffers < nruns + min_prefetch_buffers)
1164  {
1165  // can not merge runs in one pass. merge recursively:
1166  STXXL_WARNMSG_RECURSIVE_SORT("The implementation of sort requires more than one merge pass, therefore for a better");
1167  STXXL_WARNMSG_RECURSIVE_SORT("efficiency decrease block size of run storage (a parameter of the run_creator)");
1168  STXXL_WARNMSG_RECURSIVE_SORT("or increase the amount memory dedicated to the merger.");
1169  STXXL_WARNMSG_RECURSIVE_SORT("m=" << input_buffers << " nruns=" << nruns << " prefetch_blocks=" << min_prefetch_buffers);
1170  STXXL_WARNMSG_RECURSIVE_SORT("memory_to_use=" << m_memory_to_use << " bytes block_type::raw_size=" << block_type::raw_size << " bytes");
1171 
1172  // check whether we have enough memory to merge recursively
1173  unsigned_type recursive_merge_buffers = m_memory_to_use / block_type::raw_size;
1174  if (recursive_merge_buffers < 2 * min_prefetch_buffers + 1 + 2) {
1175  // recursive merge uses min_prefetch_buffers for input buffering and min_prefetch_buffers output buffering
1176  // as well as 1 current output block and at least 2 input blocks
1177  STXXL_ERRMSG("There are only m=" << recursive_merge_buffers << " blocks available for recursive merging, but "
1178  << min_prefetch_buffers << "+" << min_prefetch_buffers << "+1 are needed read-ahead/write-back/output, and");
1179  STXXL_ERRMSG("the merger requires memory to store at least two input blocks internally. Aborting.");
1180  throw bad_parameter("basic_runs_merger::sort(): INSUFFICIENT MEMORY provided, please increase parameter 'memory_to_use'");
1181  }
1182 
1183  merge_recursively();
1184 
1185  nruns = m_sruns->runs.size();
1186  }
1187 
1188  assert(nruns + min_prefetch_buffers <= input_buffers);
1189 
1190  // *** Allocate prefetcher and merge data structure
1191 
1192  deallocate_prefetcher();
1193 
1194  unsigned_type prefetch_seq_size = 0;
1195  for (unsigned_type i = 0; i < nruns; ++i)
1196  {
1197  prefetch_seq_size += m_sruns->runs[i].size();
1198  }
1199 
1200  m_consume_seq.resize(prefetch_seq_size);
1201  m_prefetch_seq = new int_type[prefetch_seq_size];
1202 
1203  typename run_type::iterator copy_start = m_consume_seq.begin();
1204  for (unsigned_type i = 0; i < nruns; ++i)
1205  {
1206  copy_start = std::copy(m_sruns->runs[i].begin(),
1207  m_sruns->runs[i].end(),
1208  copy_start);
1209  }
1210 
1211  std::stable_sort(m_consume_seq.begin(), m_consume_seq.end(),
1213 
1214  const unsigned_type n_prefetch_buffers = STXXL_MAX(min_prefetch_buffers, input_buffers - nruns);
1215 
1216 #if STXXL_SORT_OPTIMAL_PREFETCHING
1217  // heuristic
1218  const int_type n_opt_prefetch_buffers = min_prefetch_buffers + (3 * (n_prefetch_buffers - min_prefetch_buffers)) / 10;
1219 
1221  m_consume_seq,
1222  m_prefetch_seq,
1223  n_opt_prefetch_buffers,
1224  config::get_instance()->get_max_device_id());
1225 #else
1226  for (unsigned_type i = 0; i < prefetch_seq_size; ++i)
1227  m_prefetch_seq[i] = i;
1228 #endif //STXXL_SORT_OPTIMAL_PREFETCHING
1229 
1230  m_prefetcher = new prefetcher_type(
1231  m_consume_seq.begin(),
1232  m_consume_seq.end(),
1233  m_prefetch_seq,
1234  STXXL_MIN(nruns + n_prefetch_buffers, prefetch_seq_size));
1235 
1236  if (do_parallel_merge())
1237  {
1238 #if STXXL_PARALLEL_MULTIWAY_MERGE
1239 // begin of STL-style merging
1240  seqs = new std::vector<sequence>(nruns);
1241  buffers = new std::vector<block_type*>(nruns);
1242 
1243  for (unsigned_type i = 0; i < nruns; ++i) //initialize sequences
1244  {
1245  (*buffers)[i] = m_prefetcher->pull_block(); //get first block of each run
1246  (*seqs)[i] = std::make_pair((*buffers)[i]->begin(), (*buffers)[i]->end()); //this memory location stays the same, only the data is exchanged
1247  }
1248 // end of STL-style merging
1249 #else
1251 #endif //STXXL_PARALLEL_MULTIWAY_MERGE
1252  }
1253  else
1254  {
1255 // begin of native merging procedure
1256  m_losers = new loser_tree_type(m_prefetcher, nruns, run_cursor2_cmp_type(m_cmp));
1257 // end of native merging procedure
1258  }
1259 
1260  fill_buffer_block();
1261  }
1262 
1263  //! Deallocate temporary structures freeing memory prior to next initialize().
1264  void deallocate()
1265  {
1266  deallocate_prefetcher();
1267  m_sruns = NULL; // release reference on result object
1268  }
1269 
1270 public:
1271  //! Standard stream method.
1272  bool empty() const
1273  {
1274  return (m_elements_remaining == 0);
1275  }
1276 
1277  //! Standard size method.
1278  size_type size() const
1279  {
1280  return m_elements_remaining;
1281  }
1282 
1283  //! Standard stream method.
1284  const value_type& operator * () const
1285  {
1286  assert(!empty());
1287  return *m_current_ptr;
1288  }
1289 
1290  //! Standard stream method.
1291  const value_type* operator -> () const
1292  {
1293  return &(operator * ());
1294  }
1295 
1296  //! Standard stream method.
1297  basic_runs_merger& operator ++ () // preincrement operator
1298  {
1299  assert(!empty());
1300  assert(m_current_ptr != m_current_end);
1301 
1302  --m_elements_remaining;
1303  ++m_current_ptr;
1304 
1305  if (LIKELY(m_current_ptr == m_current_end && !empty()))
1306  {
1307  fill_buffer_block();
1308 
1309 #if STXXL_CHECK_ORDER_IN_SORTS
1310  assert(stxxl::is_sorted(m_buffer_block->elem, m_buffer_block->elem + STXXL_MIN<size_type>(m_elements_remaining, m_buffer_block->size), m_cmp));
1311 #endif //STXXL_CHECK_ORDER_IN_SORTS
1312  }
1313 
1314 #if STXXL_CHECK_ORDER_IN_SORTS
1315  if (!empty())
1316  {
1317  assert(!m_cmp(operator * (), m_last_element));
1318  m_last_element = operator * ();
1319  }
1320 #endif //STXXL_CHECK_ORDER_IN_SORTS
1321 
1322  return *this;
1323  }
1324 
1325  //! Destructor.
1326  //! \remark Deallocates blocks of the input sorted runs object
1328  {
1329  deallocate_prefetcher();
1330 
1331  delete m_buffer_block;
1332  }
1333 };
1334 
1335 template <class RunsType, class CompareType, class AllocStr>
1337 {
1338  block_manager* bm = block_manager::get_instance();
1339  unsigned_type ndisks = config::get_instance()->disks_number();
1340  unsigned_type nwrite_buffers = 2 * ndisks;
1341  unsigned_type memory_for_write_buffers = nwrite_buffers * sizeof(block_type);
1342 
1343  // memory consumption of the recursive merger (uses block_type as
1344  // out_block_type)
1345  unsigned_type recursive_merger_memory_prefetch_buffers = 2 * ndisks * sizeof(block_type);
1346  unsigned_type recursive_merger_memory_out_block = sizeof(block_type);
1347  unsigned_type memory_for_buffers = memory_for_write_buffers
1348  + recursive_merger_memory_prefetch_buffers
1349  + recursive_merger_memory_out_block;
1350  // maximum arity in the recursive merger
1351  unsigned_type max_arity = (m_memory_to_use > memory_for_buffers ? m_memory_to_use - memory_for_buffers : 0) / block_type::raw_size;
1352 
1353  unsigned_type nruns = m_sruns->runs.size();
1354  const unsigned_type merge_factor = optimal_merge_factor(nruns, max_arity);
1355  assert(merge_factor > 1);
1356  assert(merge_factor <= max_arity);
1357 
1358  while (nruns > max_arity)
1359  {
1360  unsigned_type new_nruns = div_ceil(nruns, merge_factor);
1361  STXXL_MSG("Starting new merge phase: nruns: " << nruns <<
1362  " opt_merge_factor: " << merge_factor <<
1363  " max_arity: " << max_arity << " new_nruns: " << new_nruns);
1364 
1365  // construct new sorted_runs data object which will be swapped into
1366  // m_sruns
1367 
1368  sorted_runs_data_type new_runs;
1369  new_runs.runs.resize(new_nruns);
1370  new_runs.runs_sizes.resize(new_nruns);
1371  new_runs.elements = m_sruns->elements;
1372 
1373  // merge all runs from m_runs into news_runs
1374 
1375  unsigned_type runs_left = nruns;
1376  unsigned_type cur_out_run = 0;
1377  size_type elements_left = m_sruns->elements;
1378 
1379  while (runs_left > 0)
1380  {
1381  unsigned_type runs2merge = STXXL_MIN(runs_left, merge_factor);
1382  STXXL_MSG("Merging " << runs2merge << " runs");
1383 
1384  if (runs2merge > 1) // non-trivial merge
1385  {
1386  // count the number of elements in the run
1387  size_type elements_in_new_run = 0;
1388  for (unsigned_type i = nruns - runs_left; i < (nruns - runs_left + runs2merge); ++i)
1389  {
1390  elements_in_new_run += m_sruns->runs_sizes[i];
1391  }
1392  new_runs.runs_sizes[cur_out_run] = elements_in_new_run;
1393 
1394  // calculate blocks in run
1395  const unsigned_type blocks_in_new_run = (unsigned_type)div_ceil(elements_in_new_run, block_type::size);
1396 
1397  // allocate blocks for the new runs
1398  new_runs.runs[cur_out_run].resize(blocks_in_new_run);
1399  bm->new_blocks(alloc_strategy(), make_bid_iterator(new_runs.runs[cur_out_run].begin()), make_bid_iterator(new_runs.runs[cur_out_run].end()));
1400 
1401  // Construct temporary sorted_runs object as input into recursive merger.
1402  // This sorted_runs is copied a subset of the over-large set of runs, which
1403  // will be deallocated from external memory once the runs are merged.
1404  sorted_runs_type cur_runs = new sorted_runs_data_type;
1405  cur_runs->runs.resize(runs2merge);
1406  cur_runs->runs_sizes.resize(runs2merge);
1407 
1408  std::copy(m_sruns->runs.begin() + nruns - runs_left,
1409  m_sruns->runs.begin() + nruns - runs_left + runs2merge,
1410  cur_runs->runs.begin());
1411  std::copy(m_sruns->runs_sizes.begin() + nruns - runs_left,
1412  m_sruns->runs_sizes.begin() + nruns - runs_left + runs2merge,
1413  cur_runs->runs_sizes.begin());
1414 
1415  cur_runs->elements = elements_in_new_run;
1416  elements_left -= elements_in_new_run;
1417 
1418  // construct recursive merger
1419 
1421  merger(m_cmp, m_memory_to_use - memory_for_write_buffers);
1422  merger.initialize(cur_runs);
1423 
1424  { // make sure everything is being destroyed in right time
1426  new_runs.runs[cur_out_run].begin(),
1427  nwrite_buffers);
1428 
1429  size_type cnt = 0;
1430  const size_type cnt_max = cur_runs->elements;
1431 
1432  while (cnt != cnt_max)
1433  {
1434  *out = *merger;
1435  if ((cnt % block_type::size) == 0) // have to write the trigger value
1436  new_runs.runs[cur_out_run][(unsigned_type)(cnt / size_type(block_type::size))].value = *merger;
1437 
1438  ++cnt, ++out, ++merger;
1439  }
1440  assert(merger.empty());
1441 
1442  while (cnt % block_type::size)
1443  {
1444  *out = m_cmp.max_value();
1445  ++out, ++cnt;
1446  }
1447  }
1448 
1449  // deallocate merged runs by destroying cur_runs
1450  }
1451  else // runs2merge = 1 -> no merging needed
1452  {
1453  assert(cur_out_run + 1 == new_runs.runs.size());
1454 
1455  elements_left -= m_sruns->runs_sizes.back();
1456 
1457  // copy block identifiers into new sorted_runs object
1458  new_runs.runs.back() = m_sruns->runs.back();
1459  new_runs.runs_sizes.back() = m_sruns->runs_sizes.back();
1460  }
1461 
1462  runs_left -= runs2merge;
1463  ++cur_out_run;
1464  }
1465 
1466  assert(elements_left == 0);
1467 
1468  // clear bid vector of m_sruns to skip deallocation of blocks in
1469  // destructor
1470  m_sruns->runs.clear();
1471 
1472  // replaces data in referenced counted object m_sruns end while (nruns
1473  // > max_arity)
1474  std::swap(nruns, new_nruns);
1475  m_sruns->swap(new_runs);
1476  }
1477 }
1478 
1479 //! Merges sorted runs.
1480 //!
1481 //! \tparam RunsType type of the sorted runs, available as \c runs_creator::sorted_runs_type ,
1482 //! \tparam CompareType type of comparison object used for merging
1483 //! \tparam AllocStr allocation strategy used to allocate the blocks for
1484 //! storing intermediate results if several merge passes are required
1485 template <class RunsType,
1486  class CompareType = typename RunsType::element_type::cmp_type,
1487  class AllocStr = STXXL_DEFAULT_ALLOC_STRATEGY>
1488 class runs_merger : public basic_runs_merger<RunsType, CompareType, AllocStr>
1489 {
1490 protected:
1492 
1493 public:
1494  typedef RunsType sorted_runs_type;
1495  typedef typename base::value_cmp value_cmp;
1496  typedef typename base::value_cmp cmp_type;
1497  typedef typename base::block_type block_type;
1498 
1499 public:
1500  //! Creates a runs merger object.
1501  //! \param sruns input sorted runs object
1502  //! \param cmp comparison object
1503  //! \param memory_to_use amount of memory available for the merger in bytes
1505  unsigned_type memory_to_use)
1506  : base(cmp, memory_to_use)
1507  {
1508  this->initialize(sruns);
1509  }
1510 
1511  //! Creates a runs merger object without initializing a round of sorted_runs.
1512  //! \param cmp comparison object
1513  //! \param memory_to_use amount of memory available for the merger in bytes
1515  : base(cmp, memory_to_use)
1516  { }
1517 };
1518 
1519 ////////////////////////////////////////////////////////////////////////
1520 // SORT //
1521 ////////////////////////////////////////////////////////////////////////
1522 
1523 //! Produces sorted stream from input stream.
1524 //!
1525 //! \tparam Input type of the input stream
1526 //! \tparam CompareType type of comparison object used for sorting the runs
1527 //! \tparam BlockSize size of blocks used to store the runs
1528 //! \tparam AllocStr functor that defines allocation strategy for the runs
1529 //! \remark Implemented as the composition of \c runs_creator and \c runs_merger .
1530 template <
1531  class Input,
1532  class CompareType,
1533  unsigned BlockSize = STXXL_DEFAULT_BLOCK_SIZE(typename Input::value_type),
1534  class AllocStr = STXXL_DEFAULT_ALLOC_STRATEGY,
1535  class RunsCreatorType = runs_creator<Input, CompareType, BlockSize, AllocStr>
1536  >
1537 class sort : public noncopyable
1538 {
1539  typedef RunsCreatorType runs_creator_type;
1540  typedef typename runs_creator_type::sorted_runs_type sorted_runs_type;
1542 
1545 
1546 public:
1547  //! Standard stream typedef.
1548  typedef typename Input::value_type value_type;
1549 
1550  //! Creates the object.
1551  //! \param in input stream
1552  //! \param c comparator object
1553  //! \param memory_to_use memory amount that is allowed to used by the sorter in bytes
1554  sort(Input& in, CompareType c, unsigned_type memory_to_use)
1555  : creator(in, c, memory_to_use),
1556  merger(creator.result(), c, memory_to_use)
1557  {
1559  }
1560 
1561  //! Creates the object.
1562  //! \param in input stream
1563  //! \param c comparator object
1564  //! \param m_memory_to_userc memory amount that is allowed to used by the runs creator in bytes
1565  //! \param m_memory_to_use memory amount that is allowed to used by the merger in bytes
1566  sort(Input& in, CompareType c, unsigned_type m_memory_to_userc,
1567  unsigned_type m_memory_to_use)
1568  : creator(in, c, m_memory_to_userc),
1569  merger(creator.result(), c, m_memory_to_use)
1570  {
1572  }
1573 
1574  //! Standard stream method.
1575  bool empty() const
1576  {
1577  return merger.empty();
1578  }
1579 
1580  //! Standard stream method.
1581  const value_type& operator * () const
1582  {
1583  assert(!empty());
1584  return *merger;
1585  }
1586 
1587  const value_type* operator -> () const
1588  {
1589  assert(!empty());
1590  return merger.operator -> ();
1591  }
1592 
1593  //! Standard stream method.
1595  {
1596  ++merger;
1597  return *this;
1598  }
1599 };
1600 
1601 //! Computes sorted runs type from value type and block size.
1602 //!
1603 //! \tparam ValueType type of values ins sorted runs
1604 //! \tparam BlockSize size of blocks where sorted runs stored
1605 template <
1606  class ValueType,
1607  unsigned BlockSize
1608  >
1610 {
1611  typedef ValueType value_type;
1614 
1615 public:
1617 };
1618 
1619 //! \}
1620 
1621 } // namespace stream
1622 
1623 //! \addtogroup stlalgo
1624 //! \{
1625 
1626 //! Sorts range of any random access iterators externally.
1627 //!
1628 //! \param begin iterator pointing to the first element of the range
1629 //! \param end iterator pointing to the last+1 element of the range
1630 //! \param cmp comparison object
1631 //! \param MemSize memory to use for sorting (in bytes)
1632 //! \param AS allocation strategy
1633 //!
1634 //! The \c BlockSize template parameter defines the block size to use (in bytes)
1635 //! \warning Slower than External Iterator Sort
1636 template <
1637  unsigned BlockSize,
1638  class RandomAccessIterator,
1639  class CmpType,
1640  class AllocStr
1641  >
1642 void sort(RandomAccessIterator begin,
1643  RandomAccessIterator end,
1644  CmpType cmp,
1645  unsigned_type MemSize,
1646  AllocStr AS)
1647 {
1648  STXXL_UNUSED(AS);
1650  InputType Input(begin, end);
1652  sorter_type Sort(Input, cmp, MemSize);
1653  stream::materialize(Sort, begin);
1654 }
1655 
1656 //! \}
1657 
1659 
1660 #endif // !STXXL_STREAM_SORT_STREAM_HEADER
1661 // vim: et:ts=4:sw=4
sort_helper::run_cursor2_cmp< block_type, prefetcher_type, value_cmp > run_cursor2_cmp_type
Definition: sort_stream.h:949
sorted_runs_data_type::run_type run_type
Definition: sort_stream.h:64
#define STXXL_THROW_UNREACHABLE()
Throws stxxl::unreachable with "Error in file [file], line [line] : this code should never be reachab...
counting_ptr< sorted_runs_data_type > sorted_runs_type
Definition: sort_stream.h:65
block_prefetcher< block_type, typename run_type::iterator > prefetcher_type
Definition: sort_stream.h:947
compat::remove_const< Integral >::type div_ceil(Integral n, Integral2 d)
Definition: utils.h:199
#define STXXL_DEFAULT_BLOCK_SIZE(type)
#define LIKELY(c)
Definition: utils.h:219
Forms sorted runs of data from a stream.
Definition: sort_stream.h:51
unsigned sort_memory_usage_factor()
Definition: parallel.h:73
sorted_runs_data_type::run_type run_type
Definition: sort_stream.h:943
const Type & STXXL_MIN(const Type &a, const Type &b)
Definition: utils.h:146
bool check_sorted_runs(RunType **runs, unsigned_type nruns, unsigned_type m, ValueCmp cmp)
Definition: sort.h:207
runs_merger< sorted_runs_type, CompareType, AllocStr > runs_merger_type
Definition: sort_stream.h:1541
Block manager class.
Definition: block_manager.h:61
external_size_type size() const
number of items currently inserted.
Definition: sort_stream.h:676
bool m_result_computed
true after the result() method was called for the first time
Definition: sort_stream.h:451
base::value_type value_type
Definition: sort_stream.h:369
base::sorted_runs_type sorted_runs_type
Definition: sort_stream.h:372
unsigned_type fetch(block_type *blocks, unsigned_type first_idx, unsigned_type last_idx)
Fetch data from input into blocks[first_idx,last_idx).
Definition: sort_stream.h:84
long long int int64
Definition: types.h:38
void sort(ExtIterator first, ExtIterator last, StrictWeakOrdering cmp, unsigned_type M)
Sort records comparison-based, see stxxl::sort -- Sorting Comparison-Based.
Definition: sort.h:676
trigger_entry_iterator< Iterator > make_bid_iterator(Iterator iter)
Definition: adaptor.h:239
void fill_with_max_value(block_type *blocks, unsigned_type num_blocks, unsigned_type first_idx)
fill the rest of the block with max values
Definition: sort_stream.h:99
unsigned_type count_elements_less_equal(const SequenceVector &seqs, const ValueType &bound, Comparator cmp)
Definition: sort_helper.h:101
All sorted runs of a sort operation.
Definition: sorted_runs.h:37
A model of stream that retrieves the data from an input iterator. For convenience use streamify funct...
Definition: stream.h:46
#define STXXL_PARALLEL_MULTIWAY_MERGE
Definition: parallel.h:47
void deallocate()
Deallocates input buffers but not the current result.
Definition: sort_stream.h:604
loser_tree_type * m_losers
loser tree used for native merging
Definition: sort_stream.h:991
void deallocate()
Deallocate temporary structures freeing memory prior to next initialize().
Definition: sort_stream.h:1264
const value_type * m_current_end
pointer into current memory buffer: end after range of current values
Definition: sort_stream.h:979
RandomAccessIterator3 multiway_merge(RandomAccessIteratorPairIterator seqs_begin, RandomAccessIteratorPairIterator seqs_end, RandomAccessIterator3 target, DiffType length, Comparator comp)
Multi-way merging dispatcher.
Definition: parallel.h:144
#define STXXL_WARNMSG_RECURSIVE_SORT
Definition: sort_base.h:20
void verify_sentinel_strict_weak_ordering(StrictWeakOrdering cmp)
Definition: sort_helper.h:28
CompareType m_cmp
comparator used to sort block groups
Definition: sort_stream.h:73
bool m_result_computed
true iff result is already computed (used in 'result()' method)
Definition: sort_stream.h:81
bool empty() const
Standard stream method.
Definition: sort_stream.h:1575
#define STXXL_DEFAULT_ALLOC_STRATEGY
Definition: block_alloc.h:258
sorted_runs_type m_result
stores the result (sorted runs) as smart pointer
Definition: sort_stream.h:77
basic_runs_merger< RunsType, CompareType, AllocStr > base
Definition: sort_stream.h:1491
runs_merger(sorted_runs_type &sruns, value_cmp cmp, unsigned_type memory_to_use)
Creates a runs merger object.
Definition: sort_stream.h:1504
void refill_or_remove_empty_sequences(SequenceVector &seqs, BufferPtrVector &buffers, Prefetcher &prefetcher)
Definition: sort_helper.h:121
run_type m_consume_seq
sequence of block needed for merging
Definition: sort_stream.h:982
unsigned_type m_memory_to_use
memory size in bytes to use
Definition: sort_stream.h:964
request_ptr * m_write_reqs
reference to write requests transporting the last accumulation buffer to disk
Definition: sort_stream.h:467
run_cursor2< block_type, prefetcher_type > run_cursor_type
Definition: sort_stream.h:948
Definition: sort_helper.h:53
Computes sorted runs type from value type and block size.
Definition: sort_stream.h:1609
#define STXXL_VERBOSE2(x)
Definition: verbose.h:121
block_type * m_blocks1
accumulation buffer of size m_m2 blocks, half the available memory size
Definition: sort_stream.h:460
int_type * m_prefetch_seq
precalculated order of blocks in which they are prefetched
Definition: sort_stream.h:985
Buffered output stream.
Definition: buf_ostream.h:29
const unsigned_type m_memsize
memory size in number of blocks for internal use
Definition: sort_stream.h:445
void check_sort_settings()
Definition: parallel.h:82
std::vector< sequence >::size_type seqs_size_type
Definition: sort_stream.h:953
void push(const value_type &val)
Adds new element to the sorter.
Definition: sort_stream.h:620
uint_pair & operator++()
prefix increment operator (directly manipulates the integer parts)
Definition: uint_types.h:163
unsigned_type m_memsize
memory for internal use in blocks
Definition: sort_stream.h:79
runs_merger_type merger
Definition: sort_stream.h:1544
run_type run
run object containing block ids of the run being written to disk
Definition: sort_stream.h:470
basic_runs_merger(value_cmp c, unsigned_type memory_to_use)
Creates a runs merger object.
Definition: sort_stream.h:1101
base::block_type block_type
Definition: sort_stream.h:1497
sorted_runs_type::element_type sorted_runs_data_type
Definition: sort_stream.h:941
static long long curr
void delete_blocks(const BIDIteratorClass &bidbegin, const BIDIteratorClass &bidend)
Deallocates blocks.
element_iterator_traits< block_type, external_size_type >::element_iterator element_iterator
Definition: sort_stream.h:67
sort_helper::trigger_entry< block_type > trigger_entry_type
Definition: sort_stream.h:62
runs_creator_type creator
Definition: sort_stream.h:1543
Input strategy for runs_creator class.
Definition: sort_stream.h:701
Block containing elements of fixed length.
Definition: typed_block.h:237
virtual void wait(bool measure_time=true)=0
Suspends calling thread until completion of the request.
Encapsulates asynchronous prefetching engine.
void clear()
Clear the internal state of the object: release all runs and reset.
Definition: sorted_runs.h:76
Merges sorted runs.
Definition: sort_stream.h:1488
virtual ~basic_runs_merger()
Destructor.
Definition: sort_stream.h:1327
runs_merger(value_cmp cmp, unsigned_type memory_to_use)
Creates a runs merger object without initializing a round of sorted_runs.
Definition: sort_stream.h:1514
sort(Input &in, CompareType c, unsigned_type memory_to_use)
Creates the object.
Definition: sort_stream.h:1554
#define _STXXL_SORT_TRIGGER_FORCE_SEQUENTIAL
Definition: parallel.h:42
choose_int_types< my_pointer_size >::int_type int_type
Definition: types.h:63
loser_tree< run_cursor_type, run_cursor2_cmp_type > loser_tree_type
Definition: sort_stream.h:950
sorted_runs_data_type::size_type size_type
Definition: sort_stream.h:942
RunsCreatorType runs_creator_type
Definition: sort_stream.h:1539
basic_runs_creator(Input &input, CompareType cmp, unsigned_type memory_to_use)
Create the object.
Definition: sort_stream.h:130
out_block_type * m_buffer_block
memory buffer for merging from external streams
Definition: sort_stream.h:973
internal_size_type m_cur_el
current number of elements in the run m_blocks1
Definition: sort_stream.h:457
void initialize(const sorted_runs_type &sruns)
Initialize the runs merger object with a new round of sorted_runs.
Definition: sort_stream.h:1127
base::block_type block_type
Definition: sort_stream.h:370
unsigned_type internal_size_type
Definition: types.h:66
Forms sorted runs of data from a stream.
Definition: sort_stream.h:362
size_type size() const
Standard size method.
Definition: sort_stream.h:1278
runs_creator(Input &input, CompareType cmp, unsigned_type memory_to_use)
Creates the object.
Definition: sort_stream.h:380
sorted_runs_data_type::value_type value_type
Standard stream typedef.
Definition: sort_stream.h:957
const Type & STXXL_MAX(const Type &a, const Type &b)
Definition: utils.h:153
runs_creator(CompareType c, unsigned_type memory_to_use)
Creates the object.
Definition: sort_stream.h:763
sort(Input &in, CompareType c, unsigned_type m_memory_to_userc, unsigned_type m_memory_to_use)
Creates the object.
Definition: sort_stream.h:1566
void sort_run(block_type *run, unsigned_type elements)
Sort a specific run, contained in a sequences of blocks.
Definition: sort_stream.h:114
#define STXXL_CHECK_ORDER_IN_SORTS
Definition: sort_base.h:30
#define STXXL_BEGIN_NAMESPACE
Definition: namespace.h:16
void STXXL_UNUSED(const U &)
Definition: unused.h:22
sorted_runs_type & result()
Returns the sorted runs object.
Definition: sort_stream.h:150
void set_memory_to_use(unsigned_type memory_to_use)
Set memory amount to use for the merger in bytes.
Definition: sort_stream.h:1121
#define STXXL_VERBOSE1(x)
Definition: verbose.h:113
base::sorted_runs_data_type sorted_runs_data_type
Definition: sort_stream.h:371
size_type m_elements_remaining
items remaining in input
Definition: sort_stream.h:970
sorted_runs< trigger_entry_type, std::less< value_type > > result
Definition: sort_stream.h:1616
Produces sorted stream from input stream.
Definition: sort_stream.h:1537
bool is_sorted(ForwardIterator first, ForwardIterator last, StrictWeakOrdering comp)
Definition: is_sorted.h:24
unsigned_type memory_used() const
return memory size used (in bytes).
Definition: sort_stream.h:688
iterator begin()
Returns iterator pointing to the first element.
Definition: typed_block.h:92
sorted_runs_type m_result
stores the result (sorted runs) in a reference counted object
Definition: sort_stream.h:439
OutputIterator materialize(StreamAlgorithm &in, OutputIterator out)
Stores consecutively stream content to an output iterator.
Definition: stream.h:428
block_type * m_blocks2
accumulation buffer that is currently being written to disk
Definition: sort_stream.h:463
sorted_runs_data_type::block_type block_type
Definition: sort_stream.h:944
bool empty() const
Standard stream method.
Definition: sort_stream.h:1272
void new_blocks(const DiskAssignFunctor &functor, BIDIteratorClass bidbegin, BIDIteratorClass bidend, unsigned_type offset=0)
Allocates new blocks.
sorted_runs< trigger_entry_type, cmp_type > sorted_runs_data_type
Definition: sort_stream.h:63
typed_block< BlockSize, value_type > block_type
Definition: sort_stream.h:61
element_iterator_traits< BlockType, SizeType >::element_iterator make_element_iterator(BlockType *blocks, SizeType offset)
Definition: adaptor.h:646
base::value_cmp value_cmp
Definition: sort_stream.h:1495
#define STXXL_ERRMSG(x)
Definition: verbose.h:94
runs_creator_type::sorted_runs_type sorted_runs_type
Definition: sort_stream.h:1540
void wait_all(RequestIterator reqs_begin, RequestIterator reqs_end)
Collection of functions to track statuses of a number of requests.
uint64 external_size_type
Definition: types.h:67
choose_int_types< my_pointer_size >::unsigned_type unsigned_type
Definition: types.h:64
block_type * pull_block()
Pulls next unconsumed block from the consumption sequence.
const value_type * m_current_ptr
pointer into current memory buffer: this is either m_buffer_block or the small_runs vector ...
Definition: sort_stream.h:976
void compute_prefetch_schedule(const int_type *first, const int_type *last, int_type *out_first, int_type m, int_type D)
#define STXXL_MSG(x)
Definition: verbose.h:73
unsigned_type optimal_merge_factor(unsigned_type num_runs, unsigned_type max_concurrent_runs)
Definition: sort_base.h:40
Definition: sort_helper.h:37
sort_helper::trigger_entry< bid_type, value_type > trigger_entry_type
Definition: sort_stream.h:1613
std::vector< trigger_entry_type > run_type
Definition: sorted_runs.h:43
run_type::value_type trigger_entry_type
Definition: sort_stream.h:946
Input strategy for runs_creator class.
Definition: sort_stream.h:393
runs_creator(CompareType cmp, unsigned_type memory_to_use)
Creates the object.
Definition: sort_stream.h:545
std::pair< typename block_type::iterator, typename block_type::iterator > sequence
Definition: sort_stream.h:952
Input & m_input
reference to the input stream
Definition: sort_stream.h:71
basic_runs_creator< Input, CompareType, BlockSize, AllocStr > base
Definition: sort_stream.h:365
prefetcher_type * m_prefetcher
prefetcher object
Definition: sort_stream.h:988
value_cmp m_cmp
comparator object to sort runs
Definition: sort_stream.h:961
request_ptr write(const bid_type &bid, completion_handler on_cmpl=completion_handler())
Writes block to the disk(s).
Definition: typed_block.h:283
void sort_run(block_type *run, unsigned_type elements)
Sort a specific run, contained in a sequences of blocks.
Definition: sort_stream.h:489
element_iterator_traits< block_type, external_size_type >::element_iterator element_iterator
Definition: sort_stream.h:430
bool do_parallel_merge()
Definition: parallel.h:102
base::value_cmp cmp_type
Definition: sort_stream.h:1496
void fill_with_max_value(block_type *blocks, unsigned_type num_blocks, unsigned_type first_idx)
fill the rest of the block with max values
Definition: sort_stream.h:474
Input::value_type value_type
Standard stream typedef.
Definition: sort_stream.h:1548
#define STXXL_END_NAMESPACE
Definition: namespace.h:17
helper::element_iterator_generator< BlockType, SizeType, BlockType::has_only_data >::iterator element_iterator
Definition: adaptor.h:640
sorted_runs_type m_sruns
smart pointer to sorted_runs object
Definition: sort_stream.h:967