STXXL  1.4.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
sort_stream.h
Go to the documentation of this file.
1 /***************************************************************************
2  * include/stxxl/bits/stream/sort_stream.h
3  *
4  * Part of the STXXL. See http://stxxl.sourceforge.net
5  *
6  * Copyright (C) 2002-2005 Roman Dementiev <[email protected]>
7  * Copyright (C) 2006-2008 Johannes Singler <[email protected]>
8  * Copyright (C) 2008-2010 Andreas Beckmann <[email protected]urt.de>
9  * Copyright (C) 2013 Timo Bingmann <[email protected]>
10  *
11  * Distributed under the Boost Software License, Version 1.0.
12  * (See accompanying file LICENSE_1_0.txt or copy at
13  * http://www.boost.org/LICENSE_1_0.txt)
14  **************************************************************************/
15 
16 #ifndef STXXL_STREAM_SORT_STREAM_HEADER
17 #define STXXL_STREAM_SORT_STREAM_HEADER
18 
19 #include <stxxl/bits/config.h>
28 
30 
31 namespace stream {
32 
33 //! \addtogroup streampack Stream Package
34 //! \{
35 
36 ////////////////////////////////////////////////////////////////////////
37 // CREATE RUNS //
38 ////////////////////////////////////////////////////////////////////////
39 
40 //! Forms sorted runs of data from a stream.
41 //!
42 //! \tparam Input type of the input stream
43 //! \tparam CompareType type of comparison object used for sorting the runs
44 //! \tparam BlockSize size of blocks used to store the runs (in bytes)
45 //! \tparam AllocStr functor that defines allocation strategy for the runs
46 template <
47  class Input,
48  class CompareType,
49  unsigned BlockSize = STXXL_DEFAULT_BLOCK_SIZE(typename Input::value_type),
50  class AllocStr = STXXL_DEFAULT_ALLOC_STRATEGY>
52 {
53 public:
54  typedef Input input_type;
55  typedef CompareType cmp_type;
56  static const unsigned block_size = BlockSize;
57  typedef AllocStr allocation_strategy_type;
58 
59 public:
60  typedef typename Input::value_type value_type;
66 
68 
69 protected:
70  //! reference to the input stream
71  Input& m_input;
72  //! comparator used to sort block groups
73  CompareType m_cmp;
74 
75 private:
76  //! stores the result (sorted runs) as smart pointer
78  //! memory for internal use in blocks
80  //! true iff result is already computed (used in 'result()' method)
82 
83  //! Fetch data from input into blocks[first_idx,last_idx).
85  unsigned_type first_idx, unsigned_type last_idx)
86  {
87  element_iterator output = make_element_iterator(blocks, first_idx);
88  unsigned_type curr_idx = first_idx;
89  while (!m_input.empty() && curr_idx != last_idx) {
90  *output = *m_input;
91  ++m_input;
92  ++output;
93  ++curr_idx;
94  }
95  return curr_idx;
96  }
97 
98  //! fill the rest of the block with max values
99  void fill_with_max_value(block_type* blocks, unsigned_type num_blocks,
100  unsigned_type first_idx)
101  {
102  unsigned_type last_idx = num_blocks * block_type::size;
103  if (first_idx < last_idx) {
104  element_iterator curr = make_element_iterator(blocks, first_idx);
105  while (first_idx != last_idx) {
106  *curr = m_cmp.max_value();
107  ++curr;
108  ++first_idx;
109  }
110  }
111  }
112 
113  //! Sort a specific run, contained in a sequence of blocks.
114  void sort_run(block_type* run, unsigned_type elements)
115  {
118  make_element_iterator(run, elements),
119  m_cmp);
120  }
121 
122  void compute_result();
123 
124 public:
125  //! Create the object.
126  //! \param input input stream
127  //! \param cmp comparator object
128  //! \param memory_to_use memory amount that is allowed to be used by the
129  //! sorter in bytes
130  basic_runs_creator(Input& input, CompareType cmp,
131  unsigned_type memory_to_use)
132  : m_input(input),
133  m_cmp(cmp),
134  m_result(new sorted_runs_data_type),
135  m_memsize(memory_to_use / BlockSize / sort_memory_usage_factor()),
136  m_result_computed(false)
137  {
139  if (!(2 * BlockSize * sort_memory_usage_factor() <= memory_to_use)) {
140  throw bad_parameter("stxxl::runs_creator<>:runs_creator(): "
141  "INSUFFICIENT MEMORY provided, "
142  "please increase parameter 'memory_to_use'");
143  }
144  assert(m_memsize > 0);
145  }
146 
147  //! Returns the sorted runs object.
148  //! \return Sorted runs object. The result is computed lazily, i.e. on the first call
149  //! \remark Returned object is intended to be used by \c runs_merger object as input
151  {
152  if (!m_result_computed)
153  {
154  compute_result();
155  m_result_computed = true;
156 #ifdef STXXL_PRINT_STAT_AFTER_RF
157  STXXL_MSG(*stats::get_instance());
158 #endif //STXXL_PRINT_STAT_AFTER_RF
159  }
160  return m_result;
161  }
162 };
163 
164 //! Finish the results, i. e. create all runs.
165 //!
166 //! This is the main routine of this class.
167 template <class Input, class CompareType, unsigned BlockSize, class AllocStr>
169 {
170  unsigned_type i = 0;
171  unsigned_type m2 = m_memsize / 2;
172  const unsigned_type el_in_run = m2 * block_type::size; // # el in a run
173  STXXL_VERBOSE1("basic_runs_creator::compute_result m2=" << m2);
174  unsigned_type blocks1_length = 0, blocks2_length = 0;
175  block_type* Blocks1 = NULL;
176 
177 #ifndef STXXL_SMALL_INPUT_PSORT_OPT
178  Blocks1 = new block_type[m2 * 2];
179 #else
180  // push input element into small_run vector in result until it is full
181  while (!input.empty() && blocks1_length != block_type::size)
182  {
183  m_result->small_run.push_back(*input);
184  ++input;
185  ++blocks1_length;
186  }
187 
188  if (blocks1_length == block_type::size && !input.empty())
189  {
190  Blocks1 = new block_type[m2 * 2];
191  std::copy(m_result->small_run.begin(), m_result->small_run.end(),
192  Blocks1[0].begin());
193  m_result->small_run.clear();
194  }
195  else
196  {
197  STXXL_VERBOSE1("basic_runs_creator: Small input optimization, input length: " << blocks1_length);
198  m_result->elements = blocks1_length;
200  potentially_parallel::sort(m_result->small_run.begin(), m_result->small_run.end(), cmp);
201  return;
202  }
203 #endif //STXXL_SMALL_INPUT_PSORT_OPT
204 
205  // the first block may be there already, now fetch until memsize is filled.
206  blocks1_length = fetch(Blocks1, blocks1_length, el_in_run);
207 
208  // sort first run
209  sort_run(Blocks1, blocks1_length);
210 
211  if (blocks1_length <= block_type::size && m_input.empty())
212  {
213  // small input, do not flush it on the disk(s)
214  STXXL_VERBOSE1("basic_runs_creator: Small input optimization, input length: " << blocks1_length);
215  assert(m_result->small_run.empty());
216  m_result->small_run.assign(Blocks1[0].begin(), Blocks1[0].begin() + blocks1_length);
217  m_result->elements = blocks1_length;
218  delete[] Blocks1;
219  return;
220  }
221 
222  block_type* Blocks2 = Blocks1 + m2;
223  block_manager* bm = block_manager::get_instance();
224  request_ptr* write_reqs = new request_ptr[m2];
225  run_type run;
226 
227  unsigned_type cur_run_size = div_ceil(blocks1_length, block_type::size); // in blocks
228  run.resize(cur_run_size);
229  bm->new_blocks(AllocStr(), make_bid_iterator(run.begin()), make_bid_iterator(run.end()));
230 
231  disk_queues::get_instance()->set_priority_op(request_queue::WRITE);
232 
233  // fill the rest of the last block with max values
234  fill_with_max_value(Blocks1, cur_run_size, blocks1_length);
235 
236  for (i = 0; i < cur_run_size; ++i)
237  {
238  run[i].value = Blocks1[i][0];
239  write_reqs[i] = Blocks1[i].write(run[i].bid);
240  }
241  m_result->runs.push_back(run);
242  m_result->runs_sizes.push_back(blocks1_length);
243  m_result->elements += blocks1_length;
244 
245  if (m_input.empty())
246  {
247  // return
248  wait_all(write_reqs, write_reqs + cur_run_size);
249  delete[] write_reqs;
250  delete[] Blocks1;
251  return;
252  }
253 
254  STXXL_VERBOSE1("Filling the second part of the allocated blocks");
255  blocks2_length = fetch(Blocks2, 0, el_in_run);
256 
257  if (m_input.empty())
258  {
259  // optimization if the whole set fits into both halves
260  // (re)sort internally and return
261  blocks2_length += el_in_run;
262  sort_run(Blocks1, blocks2_length); // sort first an second run together
263  wait_all(write_reqs, write_reqs + cur_run_size);
264  bm->delete_blocks(make_bid_iterator(run.begin()), make_bid_iterator(run.end()));
265 
266  cur_run_size = div_ceil(blocks2_length, block_type::size);
267  run.resize(cur_run_size);
268  bm->new_blocks(AllocStr(), make_bid_iterator(run.begin()), make_bid_iterator(run.end()));
269 
270  // fill the rest of the last block with max values
271  fill_with_max_value(Blocks1, cur_run_size, blocks2_length);
272 
273  assert(cur_run_size > m2);
274 
275  for (i = 0; i < m2; ++i)
276  {
277  run[i].value = Blocks1[i][0];
278  write_reqs[i]->wait();
279  write_reqs[i] = Blocks1[i].write(run[i].bid);
280  }
281 
282  request_ptr* write_reqs1 = new request_ptr[cur_run_size - m2];
283 
284  for ( ; i < cur_run_size; ++i)
285  {
286  run[i].value = Blocks1[i][0];
287  write_reqs1[i - m2] = Blocks1[i].write(run[i].bid);
288  }
289 
290  m_result->runs[0] = run;
291  m_result->runs_sizes[0] = blocks2_length;
292  m_result->elements = blocks2_length;
293 
294  wait_all(write_reqs, write_reqs + m2);
295  delete[] write_reqs;
296  wait_all(write_reqs1, write_reqs1 + cur_run_size - m2);
297  delete[] write_reqs1;
298 
299  delete[] Blocks1;
300 
301  return;
302  }
303 
304  // more than 2 runs can be filled, i. e. the general case
305 
306  sort_run(Blocks2, blocks2_length);
307 
308  cur_run_size = div_ceil(blocks2_length, block_type::size); // in blocks
309  run.resize(cur_run_size);
310  bm->new_blocks(AllocStr(), make_bid_iterator(run.begin()), make_bid_iterator(run.end()));
311 
312  for (i = 0; i < cur_run_size; ++i)
313  {
314  run[i].value = Blocks2[i][0];
315  write_reqs[i]->wait();
316  write_reqs[i] = Blocks2[i].write(run[i].bid);
317  }
318  assert((blocks2_length % el_in_run) == 0);
319 
320  m_result->add_run(run, blocks2_length);
321 
322  while (!m_input.empty())
323  {
324  blocks1_length = fetch(Blocks1, 0, el_in_run);
325  sort_run(Blocks1, blocks1_length);
326  cur_run_size = div_ceil(blocks1_length, block_type::size); // in blocks
327  run.resize(cur_run_size);
328  bm->new_blocks(AllocStr(), make_bid_iterator(run.begin()), make_bid_iterator(run.end()));
329 
330  // fill the rest of the last block with max values (occurs only on the last run)
331  fill_with_max_value(Blocks1, cur_run_size, blocks1_length);
332 
333  for (i = 0; i < cur_run_size; ++i)
334  {
335  run[i].value = Blocks1[i][0];
336  write_reqs[i]->wait();
337  write_reqs[i] = Blocks1[i].write(run[i].bid);
338  }
339  m_result->add_run(run, blocks1_length);
340 
341  std::swap(Blocks1, Blocks2);
342  std::swap(blocks1_length, blocks2_length);
343  }
344 
345  wait_all(write_reqs, write_reqs + m2);
346  delete[] write_reqs;
347  delete[] ((Blocks1 < Blocks2) ? Blocks1 : Blocks2);
348 }
349 
350 //! Forms sorted runs of data from a stream.
351 //!
352 //! \tparam Input type of the input stream
353 //! \tparam CompareType type of comparison object used for sorting the runs
354 //! \tparam BlockSize size of blocks used to store the runs
355 //! \tparam AllocStr functor that defines allocation strategy for the runs
356 template <
357  class Input,
358  class CompareType,
359  unsigned BlockSize = STXXL_DEFAULT_BLOCK_SIZE(typename Input::value_type),
360  class AllocStr = STXXL_DEFAULT_ALLOC_STRATEGY
361  >
362 class runs_creator : public basic_runs_creator<Input, CompareType, BlockSize, AllocStr>
363 {
364 private:
366 
367 public:
368  typedef typename base::cmp_type cmp_type;
369  typedef typename base::value_type value_type;
370  typedef typename base::block_type block_type;
373 
374 public:
375  //! Creates the object.
376  //! \param input input stream
377  //! \param cmp comparator object
378  //! \param memory_to_use memory amount that is allowed to be used by the
379  //! sorter in bytes
380  runs_creator(Input& input, CompareType cmp, unsigned_type memory_to_use)
381  : base(input, cmp, memory_to_use)
382  { }
383 };
384 
//! Input strategy tag for the \c runs_creator class.
//!
//! Passing \c use_push<ValueType> as the input type of \c runs_creator
//! selects the variant that is fed element by element through
//! runs_creator::push(). The resulting sorted runs data structure is
//! usable as input for \c runs_merger.
//!
//! \tparam ValueType type of the elements that will be pushed
template <class ValueType>
struct use_push
{
    //! element type, exposed under the name a stream type would use
    typedef ValueType value_type;
};
397 
398 //! Forms sorted runs of elements passed in push() method.
399 //!
400 //! A specialization of \c runs_creator that
401 //! allows to create sorted runs
402 //! data structure usable for \c runs_merger from
403 //! elements passed in sorted push() method. <BR>
404 //! \tparam ValueType type of values (parameter for \c use_push strategy)
405 //! \tparam CompareType type of comparison object used for sorting the runs
406 //! \tparam BlockSize size of blocks used to store the runs
407 //! \tparam AllocStr functor that defines allocation strategy for the runs
408 template <
409  class ValueType,
410  class CompareType,
411  unsigned BlockSize,
412  class AllocStr
413  >
415  use_push<ValueType>,
416  CompareType,
417  BlockSize,
418  AllocStr
419  >: private noncopyable
420 {
421 public:
422  typedef CompareType cmp_type;
423  typedef ValueType value_type;
429 
431 
432 private:
433  //! comparator object to sort runs
434  CompareType m_cmp;
435 
437 
438  //! stores the result (sorted runs) in a reference counted object
440 
441  //! memory size in bytes to use
443 
444  //! memory size in number of blocks for internal use
446 
447  //! m_memsize / 2
449 
450  //! true after the result() method was called for the first time
452 
453  //! total number of elements in a run
455 
456  //! current number of elements in the run m_blocks1
458 
459  //! accumulation buffer of size m_m2 blocks, half the available memory size
461 
462  //! accumulation buffer that is currently being written to disk
464 
465  //! reference to write requests transporting the last accumulation buffer
466  //! to disk
468 
469  //! run object containing block ids of the run being written to disk
471 
472 protected:
473  //! fill the rest of the block with max values
474  void fill_with_max_value(block_type* blocks, unsigned_type num_blocks,
475  unsigned_type first_idx)
476  {
477  unsigned_type last_idx = num_blocks * block_type::size;
478  if (first_idx < last_idx) {
479  element_iterator curr = make_element_iterator(blocks, first_idx);
480  while (first_idx != last_idx) {
481  *curr = m_cmp.max_value();
482  ++curr;
483  ++first_idx;
484  }
485  }
486  }
487 
488  //! Sort a specific run, contained in a sequence of blocks.
489  void sort_run(block_type* run, unsigned_type elements)
490  {
493  make_element_iterator(run, elements),
494  m_cmp);
495  }
496 
498  {
499  if (m_cur_el == 0)
500  return;
501 
502  sort_run(m_blocks1, m_cur_el);
503 
504  if (m_cur_el <= block_type::size && m_result->elements == 0)
505  {
506  // small input, do not flush it on the disk(s)
507  STXXL_VERBOSE1("runs_creator(use_push): Small input optimization, input length: " << m_cur_el);
508  m_result->small_run.assign(m_blocks1[0].begin(), m_blocks1[0].begin() + m_cur_el);
509  m_result->elements = m_cur_el;
510  return;
511  }
512 
513  const unsigned_type cur_run_size = div_ceil(m_cur_el, block_type::size); // in blocks
514  run.resize(cur_run_size);
515  block_manager* bm = block_manager::get_instance();
516  bm->new_blocks(AllocStr(), make_bid_iterator(run.begin()), make_bid_iterator(run.end()));
517 
518  disk_queues::get_instance()->set_priority_op(request_queue::WRITE);
519 
520  // fill the rest of the last block with max values
521  fill_with_max_value(m_blocks1, cur_run_size, m_cur_el);
522 
523  unsigned_type i = 0;
524  for ( ; i < cur_run_size; ++i)
525  {
526  run[i].value = m_blocks1[i][0];
527  if (m_write_reqs[i].get())
528  m_write_reqs[i]->wait();
529 
530  m_write_reqs[i] = m_blocks1[i].write(run[i].bid);
531  }
532  m_result->add_run(run, m_cur_el);
533 
534  for (i = 0; i < m_m2; ++i)
535  {
536  if (m_write_reqs[i].get())
537  m_write_reqs[i]->wait();
538  }
539  }
540 
541 public:
542  //! Creates the object.
543  //! \param cmp comparator object
544  //! \param memory_to_use memory amount that is allowed to be used by the sorter in bytes
545  runs_creator(CompareType cmp, unsigned_type memory_to_use)
546  : m_cmp(cmp),
547  m_memory_to_use(memory_to_use),
548  m_memsize(memory_to_use / BlockSize / sort_memory_usage_factor()),
549  m_m2(m_memsize / 2),
550  m_el_in_run(m_m2 * block_type::size),
551  m_blocks1(NULL), m_blocks2(NULL),
552  m_write_reqs(NULL)
553  {
555  if (!(2 * BlockSize * sort_memory_usage_factor() <= m_memory_to_use)) {
556  throw bad_parameter("stxxl::runs_creator<>:runs_creator(): "
557  "INSUFFICIENT MEMORY provided, "
558  "please increase parameter 'memory_to_use'");
559  }
560  assert(m_m2 > 0);
561 
562  allocate();
563  }
564 
566  {
567  m_result_computed = 1;
568  deallocate();
569  }
570 
571  //! Clear current state and remove all items.
572  void clear()
573  {
574  if (!m_result)
575  m_result = new sorted_runs_data_type;
576  else
577  m_result->clear();
578 
579  m_result_computed = false;
580  m_cur_el = 0;
581 
582  for (unsigned_type i = 0; i < m_m2; ++i)
583  {
584  if (m_write_reqs[i].get())
585  m_write_reqs[i]->cancel();
586  }
587  }
588 
589  //! Allocates input buffers and clears result.
590  void allocate()
591  {
592  if (!m_blocks1)
593  {
594  m_blocks1 = new block_type[m_m2 * 2];
595  m_blocks2 = m_blocks1 + m_m2;
596 
597  m_write_reqs = new request_ptr[m_m2];
598  }
599 
600  clear();
601  }
602 
603  //! Deallocates input buffers but not the current result.
604  void deallocate()
605  {
606  result(); // finishes result
607 
608  if (m_blocks1)
609  {
610  delete[] ((m_blocks1 < m_blocks2) ? m_blocks1 : m_blocks2);
611  m_blocks1 = m_blocks2 = NULL;
612 
613  delete[] m_write_reqs;
614  m_write_reqs = NULL;
615  }
616  }
617 
618  //! Adds new element to the sorter.
619  //! \param val value to be added
620  void push(const value_type& val)
621  {
622  assert(m_result_computed == false);
623  if (LIKELY(m_cur_el < m_el_in_run))
624  {
625  m_blocks1[m_cur_el / block_type::size][m_cur_el % block_type::size] = val;
626  ++m_cur_el;
627  return;
628  }
629 
630  assert(m_el_in_run == m_cur_el);
631  m_cur_el = 0;
632 
633  // sort and store m_blocks1
634  sort_run(m_blocks1, m_el_in_run);
635 
636  const unsigned_type cur_run_blocks = div_ceil(m_el_in_run, block_type::size); // in blocks
637  run.resize(cur_run_blocks);
638  block_manager* bm = block_manager::get_instance();
639  bm->new_blocks(AllocStr(), make_bid_iterator(run.begin()), make_bid_iterator(run.end()));
640 
641  disk_queues::get_instance()->set_priority_op(request_queue::WRITE);
642 
643  for (unsigned_type i = 0; i < cur_run_blocks; ++i)
644  {
645  run[i].value = m_blocks1[i][0];
646  if (m_write_reqs[i].get())
647  m_write_reqs[i]->wait();
648 
649  m_write_reqs[i] = m_blocks1[i].write(run[i].bid);
650  }
651 
652  m_result->add_run(run, m_el_in_run);
653 
654  std::swap(m_blocks1, m_blocks2);
655 
656  push(val);
657  }
658 
659  //! Returns the sorted runs object.
660  //! \return Sorted runs object.
661  //! \remark Returned object is intended to be used by \c runs_merger object as input
663  {
664  if (!m_result_computed)
665  {
666  compute_result();
667  m_result_computed = true;
668 #ifdef STXXL_PRINT_STAT_AFTER_RF
669  STXXL_MSG(*stats::get_instance());
670 #endif //STXXL_PRINT_STAT_AFTER_RF
671  }
672  return m_result;
673  }
674 
675  //! number of items currently inserted.
677  {
678  return m_result->elements + m_cur_el;
679  }
680 
681  //! return comparator object.
682  const cmp_type & cmp() const
683  {
684  return m_cmp;
685  }
686 
687  //! return memory size used (in bytes).
689  {
690  return m_memory_to_use;
691  }
692 };
693 
694 //! Input strategy for \c runs_creator class.
695 //!
696 //! This strategy together with \c runs_creator class
697 //! allows to create sorted runs
698 //! data structure usable for \c runs_merger from
699 //! sequences of elements in sorted order
700 template <class ValueType>
702 {
703  typedef ValueType value_type;
704 };
705 
706 //! Forms sorted runs of data taking elements in sorted order (element by element).
707 //!
708 //! A specialization of \c runs_creator that
709 //! allows to create sorted runs
710 //! data structure usable for \c runs_merger from
711 //! sequences of elements in sorted order. <BR>
712 //! \tparam ValueType type of values (parameter for \c from_sorted_sequences strategy)
713 //! \tparam CompareType type of comparison object used for sorting the runs
714 //! \tparam BlockSize size of blocks used to store the runs
715 //! \tparam AllocStr functor that defines allocation strategy for the runs
716 template <
717  class ValueType,
718  class CompareType,
719  unsigned BlockSize,
720  class AllocStr
721  >
723  from_sorted_sequences<ValueType>,
724  CompareType,
725  BlockSize,
726  AllocStr
727  >: private noncopyable
728 {
729 public:
730  typedef ValueType value_type;
733  typedef AllocStr alloc_strategy_type;
734 
735 public:
736  typedef CompareType cmp_type;
740 
741 private:
743 
744  CompareType cmp;
745 
746  //! stores the result (sorted runs)
748  //! memory for internal use in blocks
755  //! needs to be reset after each run
757 
758 public:
759  //! Creates the object.
760  //! \param c comparator object
761  //! \param memory_to_use memory amount that is allowed to be used by the sorter in bytes.
762  //! Recommended value: 2 * block_size * D
763  runs_creator(CompareType c, unsigned_type memory_to_use)
764  : cmp(c),
765  result_(new sorted_runs_data_type),
766  m_(memory_to_use / BlockSize / sort_memory_usage_factor()),
767  writer(m_, m_ / 2),
768  cur_block(writer.get_free_block()),
769  offset(0),
770  iblock(0),
771  irun(0)
772  {
774  assert(m_ > 0);
775  if (!(2 * BlockSize * sort_memory_usage_factor() <= memory_to_use)) {
776  throw bad_parameter("stxxl::runs_creator<>:runs_creator(): "
777  "INSUFFICIENT MEMORY provided, "
778  "please increase parameter 'memory_to_use'");
779  }
780  }
781 
782  //! Adds new element to the current run.
783  //! \param val value to be added to the current run
784  void push(const value_type& val)
785  {
786  assert(offset < block_type::size);
787 
788  (*cur_block)[offset] = val;
789  ++offset;
790 
791  if (offset == block_type::size)
792  {
793  // write current block
794 
795  block_manager* bm = block_manager::get_instance();
796  // allocate space for the block
797  result_->runs.resize(irun + 1);
798  result_->runs[irun].resize(iblock + 1);
799  bm->new_blocks(
800  alloc_strategy,
801  make_bid_iterator(result_->runs[irun].begin() + iblock),
802  make_bid_iterator(result_->runs[irun].end()),
803  iblock
804  );
805 
806  result_->runs[irun][iblock].value = (*cur_block)[0]; // init trigger
807  cur_block = writer.write(cur_block, result_->runs[irun][iblock].bid);
808  ++iblock;
809 
810  offset = 0;
811  }
812 
813  ++result_->elements;
814  }
815 
816  //! Finishes current run and begins new one.
817  void finish()
818  {
819  if (offset == 0 && iblock == 0) // current run is empty
820  return;
821 
822  result_->runs_sizes.resize(irun + 1);
823  result_->runs_sizes.back() = iblock * block_type::size + offset;
824 
825  if (offset) // if current block is partially filled
826  {
827  while (offset != block_type::size)
828  {
829  (*cur_block)[offset] = cmp.max_value();
830  ++offset;
831  }
832  offset = 0;
833 
834  block_manager* bm = block_manager::get_instance();
835  // allocate space for the block
836  result_->runs.resize(irun + 1);
837  result_->runs[irun].resize(iblock + 1);
838  bm->new_blocks(
839  alloc_strategy,
840  make_bid_iterator(result_->runs[irun].begin() + iblock),
841  make_bid_iterator(result_->runs[irun].end()),
842  iblock
843  );
844 
845  result_->runs[irun][iblock].value = (*cur_block)[0]; // init trigger
846  cur_block = writer.write(cur_block, result_->runs[irun][iblock].bid);
847  }
848  else
849  { }
850 
851  alloc_strategy = alloc_strategy_type(); // reinitialize block allocator for the next run
852  iblock = 0;
853  ++irun;
854  }
855 
856  //! Returns the sorted runs object.
857  //! \return Sorted runs object
858  //! \remark Returned object is intended to be used by \c runs_merger object as input
860  {
861  finish();
862  writer.flush();
863 
864  return result_;
865  }
866 };
867 
868 //! Checker for the sorted runs object created by the \c runs_creator .
869 //! \param sruns sorted runs object
870 //! \param cmp comparison object used for checking the order of elements in runs
871 //! \return \c true if runs are sorted, \c false otherwise
872 template <class RunsType, class CompareType>
873 bool check_sorted_runs(const RunsType& sruns, CompareType cmp)
874 {
876  typedef typename RunsType::element_type::block_type block_type;
877  STXXL_VERBOSE2("Elements: " << sruns->elements);
878  unsigned_type nruns = sruns->runs.size();
879  STXXL_VERBOSE2("Runs: " << nruns);
880  unsigned_type irun = 0;
881  for (irun = 0; irun < nruns; ++irun)
882  {
883  const unsigned_type nblocks = sruns->runs[irun].size();
884  block_type* blocks = new block_type[nblocks];
885  request_ptr* reqs = new request_ptr[nblocks];
886  for (unsigned_type j = 0; j < nblocks; ++j)
887  {
888  reqs[j] = blocks[j].read(sruns->runs[irun][j].bid);
889  }
890  wait_all(reqs, reqs + nblocks);
891  delete[] reqs;
892 
893  for (unsigned_type j = 0; j < nblocks; ++j)
894  {
895  if (cmp(blocks[j][0], sruns->runs[irun][j].value) ||
896  cmp(sruns->runs[irun][j].value, blocks[j][0])) //!=
897  {
898  STXXL_ERRMSG("check_sorted_runs wrong trigger in the run");
899  delete[] blocks;
900  return false;
901  }
902  }
903  if (!stxxl::is_sorted(
904  make_element_iterator(blocks, 0),
905  make_element_iterator(blocks, sruns->runs_sizes[irun]),
906  cmp))
907  {
908  STXXL_ERRMSG("check_sorted_runs wrong order in the run");
909  delete[] blocks;
910  return false;
911  }
912 
913  delete[] blocks;
914  }
915 
916  STXXL_MSG("Checking runs finished successfully");
917 
918  return true;
919 }
920 
921 ////////////////////////////////////////////////////////////////////////
922 // MERGE RUNS //
923 ////////////////////////////////////////////////////////////////////////
924 
925 //! Merges sorted runs.
926 //!
927 //! \tparam RunsType type of the sorted runs, available as \c runs_creator::sorted_runs_type ,
928 //! \tparam CompareType type of comparison object used for merging
929 //! \tparam AllocStr allocation strategy used to allocate the blocks for
930 //! storing intermediate results if several merge passes are required
931 template <class RunsType,
932  class CompareType,
933  class AllocStr = STXXL_DEFAULT_ALLOC_STRATEGY>
935 {
936 public:
937  typedef RunsType sorted_runs_type;
938  typedef CompareType value_cmp;
939  typedef AllocStr alloc_strategy;
940 
941  typedef typename sorted_runs_type::element_type sorted_runs_data_type;
942  typedef typename sorted_runs_data_type::size_type size_type;
943  typedef typename sorted_runs_data_type::run_type run_type;
944  typedef typename sorted_runs_data_type::block_type block_type;
946  typedef typename run_type::value_type trigger_entry_type;
952  typedef std::pair<typename block_type::iterator, typename block_type::iterator> sequence;
953  typedef typename std::vector<sequence>::size_type seqs_size_type;
954 
955 public:
956  //! Standard stream typedef.
957  typedef typename sorted_runs_data_type::value_type value_type;
958 
959 private:
960  //! comparator object to sort runs
962 
963  //! memory size in bytes to use
965 
966  //! smart pointer to sorted_runs object
968 
969  //! items remaining in input
971 
972  //! memory buffer for merging from external streams
974 
975  //! pointer into current memory buffer: this is either m_buffer_block or the small_runs vector
977 
978  //! pointer into current memory buffer: end after range of current values
980 
981  //! sequence of block needed for merging
983 
984  //! precalculated order of blocks in which they are prefetched
986 
987  //! prefetcher object
989 
990  //! loser tree used for native merging
992 
993 #if STXXL_PARALLEL_MULTIWAY_MERGE
994  std::vector<sequence>* seqs;
995  std::vector<block_type*>* buffers;
996  diff_type num_currently_mergeable;
997 #endif
998 
999 #if STXXL_CHECK_ORDER_IN_SORTS
1000  //! previous element to ensure the current output ordering
1001  value_type m_last_element;
1002 #endif //STXXL_CHECK_ORDER_IN_SORTS
1003 
1004  ////////////////////////////////////////////////////////////////////
1005 
1006  void merge_recursively();
1007 
1009  {
1010  if (m_prefetcher)
1011  {
1012  delete m_losers;
1013 #if STXXL_PARALLEL_MULTIWAY_MERGE
1014  delete seqs;
1015  delete buffers;
1016 #endif
1017  delete m_prefetcher;
1018  delete[] m_prefetch_seq;
1019  m_prefetcher = NULL;
1020  }
1021  }
1022 
1024  {
1025  STXXL_VERBOSE1("fill_buffer_block");
1026  if (do_parallel_merge())
1027  {
1028 #if STXXL_PARALLEL_MULTIWAY_MERGE
1029 // begin of STL-style merging
1030  diff_type rest = out_block_type::size; // elements still to merge for this output block
1031 
1032  do // while rest > 0 and still elements available
1033  {
1034  if (num_currently_mergeable < rest)
1035  {
1036  if (!m_prefetcher || m_prefetcher->empty())
1037  {
1038  // anything remaining is already in memory
1039  num_currently_mergeable = m_elements_remaining;
1040  }
1041  else
1042  {
1043  num_currently_mergeable = sort_helper::count_elements_less_equal(
1044  *seqs, m_consume_seq[m_prefetcher->pos()].value, m_cmp);
1045  }
1046  }
1047 
1048  diff_type output_size = STXXL_MIN(num_currently_mergeable, rest); // at most rest elements
1049 
1050  STXXL_VERBOSE1("before merge " << output_size);
1051 
1052  stxxl::parallel::multiway_merge((*seqs).begin(), (*seqs).end(), m_buffer_block->end() - rest, m_cmp, output_size);
1053  // sequence iterators are progressed appropriately
1054 
1055  rest -= output_size;
1056  num_currently_mergeable -= output_size;
1057 
1058  STXXL_VERBOSE1("after merge");
1059 
1060  sort_helper::refill_or_remove_empty_sequences(*seqs, *buffers, *m_prefetcher);
1061  } while (rest > 0 && (*seqs).size() > 0);
1062 
1063 #if STXXL_CHECK_ORDER_IN_SORTS
1064  if (!stxxl::is_sorted(m_buffer_block->begin(), m_buffer_block->end(), cmp))
1065  {
1066  for (value_type* i = m_buffer_block->begin() + 1; i != m_buffer_block->end(); ++i)
1067  if (cmp(*i, *(i - 1)))
1068  {
1069  STXXL_VERBOSE1("Error at position " << (i - m_buffer_block->begin()));
1070  }
1071  assert(false);
1072  }
1073 #endif //STXXL_CHECK_ORDER_IN_SORTS
1074 
1075 // end of STL-style merging
1076 #else
1078 #endif //STXXL_PARALLEL_MULTIWAY_MERGE
1079  }
1080  else
1081  {
1082 // begin of native merging procedure
1083  m_losers->multi_merge(m_buffer_block->elem, m_buffer_block->elem + STXXL_MIN<size_type>(out_block_type::size, m_elements_remaining));
1084 // end of native merging procedure
1085  }
1086  STXXL_VERBOSE1("current block filled");
1087 
1088  m_current_ptr = m_buffer_block->elem;
1089  m_current_end = m_buffer_block->elem + STXXL_MIN<size_type>(out_block_type::size, m_elements_remaining);
1090 
1091  if (m_elements_remaining <= out_block_type::size)
1092  deallocate_prefetcher();
1093  }
1094 
1095 public:
1096  //! Creates a runs merger object.
1097  //! \param c comparison object
1098  //! \param memory_to_use amount of memory available for the merger in bytes
1100  : m_cmp(c),
1101  m_memory_to_use(memory_to_use),
1102  m_buffer_block(new out_block_type),
1103  m_prefetch_seq(NULL),
1104  m_prefetcher(NULL),
1105  m_losers(NULL)
1107  , seqs(NULL),
1108  buffers(NULL),
1109  num_currently_mergeable(0)
1110 #endif
1112  , m_last_element(m_cmp.min_value())
1113 #endif //STXXL_CHECK_ORDER_IN_SORTS
1114  {
1116  }
1117 
1118  //! Set memory amount to use for the merger in bytes.
      //! The value is only stored here; initialize() reads it when it sizes the
      //! input and prefetch buffers, so it affects the next initialize() call.
      //! \param memory_to_use amount of memory available for the merger in bytes
1119  void set_memory_to_use(unsigned_type memory_to_use)
1120  {
1121  m_memory_to_use = memory_to_use;
1122  }
1123 
1124  //! Initialize the runs merger object with a new round of sorted_runs.
      //!
      //! Steps, in order:
      //!  - take a reference to \c sruns and record its element count;
      //!  - return immediately if there are no elements, or if the whole input
      //!    is held in \c small_run (then the stream reads straight from it);
      //!  - compute how many input buffers fit into m_memory_to_use and call
      //!    merge_recursively() if one buffer per run plus the prefetch buffers
      //!    do not fit;
      //!  - build the block consumption and prefetch sequences, create the
      //!    prefetcher and either the parallel-merge sequences or the loser tree;
      //!  - fill the first output block.
      //!
      //! \param sruns sorted runs to merge
      //! \throws bad_parameter if the memory budget is too small even for
      //!         recursive merging
1125  void initialize(const sorted_runs_type& sruns)
1126  {
1127  m_sruns = sruns;
1128  m_elements_remaining = m_sruns->elements;
1129
1130  if (empty())
1131  return;
1132
1133  if (!m_sruns->small_run.empty())
1134  {
1135  // we have a small input <= B, that is kept in the main memory
1136  STXXL_VERBOSE1("basic_runs_merger: small input optimization, input length: " << m_elements_remaining);
1137  assert(m_elements_remaining == size_type(m_sruns->small_run.size()));
1138
      // serve elements directly from the in-memory small_run; no prefetcher
      // or merge structure is set up on this path
1139  m_current_ptr = &m_sruns->small_run[0];
1140  m_current_end = m_current_ptr + m_sruns->small_run.size();
1141
1142  return;
1143  }
1144
1145 #if STXXL_CHECK_ORDER_IN_SORTS
1146  assert(check_sorted_runs(m_sruns, m_cmp));
1147 #endif //STXXL_CHECK_ORDER_IN_SORTS
1148
1149  // *** test whether recursive merging is necessary
1150
1151  disk_queues::get_instance()->set_priority_op(request_queue::WRITE);
1152
1153  int_type disks_number = config::get_instance()->disks_number();
1154  unsigned_type min_prefetch_buffers = 2 * disks_number;
      // number of raw input blocks that fit into the budget after reserving
      // one out_block_type; 0 if the budget does not even cover that block
1155  unsigned_type input_buffers =
1156  (m_memory_to_use > sizeof(out_block_type)
1157  ? m_memory_to_use - sizeof(out_block_type)
1158  : 0) / block_type::raw_size;
1159  unsigned_type nruns = m_sruns->runs.size();
1160
      // a single pass needs one buffer per run plus min_prefetch_buffers
1161  if (input_buffers < nruns + min_prefetch_buffers)
1162  {
1163  // can not merge runs in one pass. merge recursively:
1164  STXXL_WARNMSG_RECURSIVE_SORT("The implementation of sort requires more than one merge pass, therefore for a better");
1165  STXXL_WARNMSG_RECURSIVE_SORT("efficiency decrease block size of run storage (a parameter of the run_creator)");
1166  STXXL_WARNMSG_RECURSIVE_SORT("or increase the amount memory dedicated to the merger.");
1167  STXXL_WARNMSG_RECURSIVE_SORT("m=" << input_buffers << " nruns=" << nruns << " prefetch_blocks=" << min_prefetch_buffers);
1168  STXXL_WARNMSG_RECURSIVE_SORT("memory_to_use=" << m_memory_to_use << " bytes block_type::raw_size=" << block_type::raw_size << " bytes");
1169
1170  // check whether we have enough memory to merge recursively
1171  unsigned_type recursive_merge_buffers = m_memory_to_use / block_type::raw_size;
1172  if (recursive_merge_buffers < 2 * min_prefetch_buffers + 1 + 2) {
1173  // recursive merge uses min_prefetch_buffers for input buffering and min_prefetch_buffers output buffering
1174  // as well as 1 current output block and at least 2 input blocks
1175  STXXL_ERRMSG("There are only m=" << recursive_merge_buffers << " blocks available for recursive merging, but "
1176  << min_prefetch_buffers << "+" << min_prefetch_buffers << "+1 are needed read-ahead/write-back/output, and");
1177  STXXL_ERRMSG("the merger requires memory to store at least two input blocks internally. Aborting.");
      // NOTE(review): the message below names basic_runs_merger::sort(),
      // but it is thrown from initialize().
1178  throw bad_parameter("basic_runs_merger::sort(): INSUFFICIENT MEMORY provided, please increase parameter 'memory_to_use'");
1179  }
1180
1181  merge_recursively();
1182
      // merge_recursively() replaced the runs in m_sruns; re-read the count
1183  nruns = m_sruns->runs.size();
1184  }
1185
1186  assert(nruns + min_prefetch_buffers <= input_buffers);
1187
1188  // *** Allocate prefetcher and merge data structure
1189
      // drop structures left over from a previous round before rebuilding them
1190  deallocate_prefetcher();
1191
      // total number of blocks over all runs
1192  unsigned_type prefetch_seq_size = 0;
1193  for (unsigned_type i = 0; i < nruns; ++i)
1194  {
1195  prefetch_seq_size += m_sruns->runs[i].size();
1196  }
1197
1198  m_consume_seq.resize(prefetch_seq_size);
1199  m_prefetch_seq = new int_type[prefetch_seq_size];
1200
      // concatenate the block entries of all runs into m_consume_seq ...
1201  typename run_type::iterator copy_start = m_consume_seq.begin();
1202  for (unsigned_type i = 0; i < nruns; ++i)
1203  {
1204  copy_start = std::copy(m_sruns->runs[i].begin(),
1205  m_sruns->runs[i].end(),
1206  copy_start);
1207  }
1208
      // ... and order them for consumption.
      // NOTE(review): listing line 1210 (the remaining arguments of this
      // stable_sort call, i.e. the comparator) is elided in this rendering.
1209  std::stable_sort(m_consume_seq.begin(), m_consume_seq.end(),
1211
      // all input buffers not needed as one-per-run become prefetch buffers,
      // but never fewer than min_prefetch_buffers
1212  const unsigned_type n_prefetch_buffers = STXXL_MAX(min_prefetch_buffers, input_buffers - nruns);
1213
1214 #if STXXL_SORT_OPTIMAL_PREFETCHING
1215  // heuristic
1216  const int_type n_opt_prefetch_buffers = min_prefetch_buffers + (3 * (n_prefetch_buffers - min_prefetch_buffers)) / 10;
1217
      // NOTE(review): listing line 1218 (the callee these arguments belong
      // to) is elided; the index lists compute_prefetch_schedule -- confirm.
1219  m_consume_seq,
1220  m_prefetch_seq,
1221  n_opt_prefetch_buffers,
1222  config::get_instance()->get_max_device_id());
1223 #else
      // without optimal prefetching, blocks are prefetched in consumption order
1224  for (unsigned_type i = 0; i < prefetch_seq_size; ++i)
1225  m_prefetch_seq[i] = i;
1226 #endif //STXXL_SORT_OPTIMAL_PREFETCHING
1227
      // buffer count is capped by the number of blocks that exist at all
1228  m_prefetcher = new prefetcher_type(
1229  m_consume_seq.begin(),
1230  m_consume_seq.end(),
1231  m_prefetch_seq,
1232  STXXL_MIN(nruns + n_prefetch_buffers, prefetch_seq_size));
1233
1234  if (do_parallel_merge())
1235  {
1236 #if STXXL_PARALLEL_MULTIWAY_MERGE
1237 // begin of STL-style merging
1238  seqs = new std::vector<sequence>(nruns);
1239  buffers = new std::vector<block_type*>(nruns);
1240
1241  for (unsigned_type i = 0; i < nruns; ++i) //initialize sequences
1242  {
1243  (*buffers)[i] = m_prefetcher->pull_block(); //get first block of each run
1244  (*seqs)[i] = std::make_pair((*buffers)[i]->begin(), (*buffers)[i]->end()); //this memory location stays the same, only the data is exchanged
1245  }
1246 // end of STL-style merging
1247 #else
      // NOTE(review): listing line 1248 (the body of this #else branch) is
      // elided in this rendering.
1249 #endif //STXXL_PARALLEL_MULTIWAY_MERGE
1250  }
1251  else
1252  {
1253 // begin of native merging procedure
1254  m_losers = new loser_tree_type(m_prefetcher, nruns, run_cursor2_cmp_type(m_cmp));
1255 // end of native merging procedure
1256  }
1257
      // produce the first block of output so operator* is valid right away
1258  fill_buffer_block();
1259  }
1260 
1261  //! Deallocate temporary structures freeing memory prior to next initialize().
      //! Calls deallocate_prefetcher() and drops this merger's reference to the
      //! sorted runs object. The output buffer block is not touched here.
1262  void deallocate()
1263  {
1264  deallocate_prefetcher();
1265  m_sruns = NULL; // release reference on result object
1266  }
1267 
1268 public:
1269  //! Standard stream method.
      //! \return true when no elements of the current round remain to be
      //!         consumed (m_elements_remaining == 0)
1270  bool empty() const
1271  {
1272  return (m_elements_remaining == 0);
1273  }
1274 
1275  //! Standard size method.
      //! \return number of elements not yet consumed; initialize() sets it to
      //!         the element count of the sorted runs and operator++ decrements it
1276  size_type size() const
1277  {
1278  return m_elements_remaining;
1279  }
1280 
1281  //! Standard stream method.
      //! \return reference to the current element of the merged stream
      //! \pre !empty(); this is checked by assert only
1282  const value_type& operator * () const
1283  {
1284  assert(!empty());
1285  return *m_current_ptr;
1286  }
1287 
1288  //! Standard stream method.
      //! \return pointer to the current element; forwards to operator*, so the
      //!         same !empty() precondition applies
1289  const value_type* operator -> () const
1290  {
1291  return &(operator * ());
1292  }
1293 
1294  //! Standard stream method.
      //! Advances to the next element of the merged stream: decrements the
      //! remaining-element counter, moves the read pointer, and refills the
      //! buffer block when it has been read to its end and elements remain.
      //! \return reference to this merger
      //! \pre !empty(); this is checked by assert only
1295  basic_runs_merger& operator ++ () // preincrement operator
1296  {
1297  assert(!empty());
1298  assert(m_current_ptr != m_current_end);
1299
1300  --m_elements_remaining;
1301  ++m_current_ptr;
1302
      // The buffer is exhausted at most once per output block, so the refill
      // is the rare path and must not carry a "likely" branch hint.
1303  if (m_current_ptr == m_current_end && !empty())
1304  {
1305  fill_buffer_block();
1306
1307 #if STXXL_CHECK_ORDER_IN_SORTS
1308  assert(stxxl::is_sorted(m_buffer_block->elem, m_buffer_block->elem + STXXL_MIN<size_type>(m_elements_remaining, m_buffer_block->size), m_cmp));
1309 #endif //STXXL_CHECK_ORDER_IN_SORTS
1310  }
1311
1312 #if STXXL_CHECK_ORDER_IN_SORTS
      // debug check: the new current element must not compare less than the
      // element delivered before it
1313  if (!empty())
1314  {
1315  assert(!m_cmp(operator * (), m_last_element));
1316  m_last_element = operator * ();
1317  }
1318 #endif //STXXL_CHECK_ORDER_IN_SORTS
1319
1320  return *this;
1321  }
1322 
1323  //! Destructor.
1324  //! \remark Deallocates blocks of the input sorted runs object
      // NOTE(review): the visible body only frees the prefetcher structures and
      // the output buffer block; the remark above presumably refers to the
      // release of the m_sruns reference when the member is destroyed -- confirm.
      // NOTE(review): listing line 1325 (the destructor signature) is elided in
      // this rendering; the index lists it as virtual ~basic_runs_merger().
1326  {
1327  deallocate_prefetcher();
1328
1329  delete m_buffer_block; // allocated with new in the constructor's initializer list
1330  }
1331 };
1332 
      //! Reduces the number of runs in m_sruns until they can be merged in one
      //! pass: while there are more runs than max_arity, consecutive groups of
      //! merge_factor runs are merged by a nested merger and written out as new
      //! runs, which then replace the contents of m_sruns.
      // NOTE(review): listing line 1334 (the function signature) is elided in
      // this rendering; initialize() calls this as merge_recursively().
1333 template <class RunsType, class CompareType, class AllocStr>
1335 {
1336  block_manager* bm = block_manager::get_instance();
1337  unsigned_type ndisks = config::get_instance()->disks_number();
1338  unsigned_type nwrite_buffers = 2 * ndisks;
1339  unsigned_type memory_for_write_buffers = nwrite_buffers * sizeof(block_type);
1340
1341  // memory consumption of the recursive merger (uses block_type as
1342  // out_block_type)
1343  unsigned_type recursive_merger_memory_prefetch_buffers = 2 * ndisks * sizeof(block_type);
1344  unsigned_type recursive_merger_memory_out_block = sizeof(block_type);
1345  unsigned_type memory_for_buffers = memory_for_write_buffers
1346  + recursive_merger_memory_prefetch_buffers
1347  + recursive_merger_memory_out_block;
1348  // maximum arity in the recursive merger
      // (0 if the budget does not exceed the fixed buffer overhead)
1349  unsigned_type max_arity = (m_memory_to_use > memory_for_buffers ? m_memory_to_use - memory_for_buffers : 0) / block_type::raw_size;
1350
1351  unsigned_type nruns = m_sruns->runs.size();
      // computed once from the initial run count and reused for every pass
1352  const unsigned_type merge_factor = optimal_merge_factor(nruns, max_arity);
1353  assert(merge_factor > 1);
1354  assert(merge_factor <= max_arity);
1355
1356  while (nruns > max_arity)
1357  {
1358  unsigned_type new_nruns = div_ceil(nruns, merge_factor);
1359  STXXL_MSG("Starting new merge phase: nruns: " << nruns <<
1360  " opt_merge_factor: " << merge_factor <<
1361  " max_arity: " << max_arity << " new_nruns: " << new_nruns);
1362
1363  // construct new sorted_runs data object which will be swapped into
1364  // m_sruns
1365
1366  sorted_runs_data_type new_runs;
1367  new_runs.runs.resize(new_nruns);
1368  new_runs.runs_sizes.resize(new_nruns);
1369  new_runs.elements = m_sruns->elements;
1370
1371  // merge all runs from m_sruns into new_runs
1372
1373  unsigned_type runs_left = nruns;
1374  unsigned_type cur_out_run = 0;
1375  size_type elements_left = m_sruns->elements;
1376
1377  while (runs_left > 0)
1378  {
1379  unsigned_type runs2merge = STXXL_MIN(runs_left, merge_factor);
1380  STXXL_MSG("Merging " << runs2merge << " runs");
1381
1382  if (runs2merge > 1) // non-trivial merge
1383  {
1384  // count the number of elements in the run
      // (input runs of this group are [nruns - runs_left, nruns - runs_left + runs2merge))
1385  size_type elements_in_new_run = 0;
1386  for (unsigned_type i = nruns - runs_left; i < (nruns - runs_left + runs2merge); ++i)
1387  {
1388  elements_in_new_run += m_sruns->runs_sizes[i];
1389  }
1390  new_runs.runs_sizes[cur_out_run] = elements_in_new_run;
1391
1392  // calculate blocks in run
1393  const unsigned_type blocks_in_new_run = (unsigned_type)div_ceil(elements_in_new_run, block_type::size);
1394
1395  // allocate blocks for the new runs
1396  new_runs.runs[cur_out_run].resize(blocks_in_new_run);
1397  bm->new_blocks(alloc_strategy(), make_bid_iterator(new_runs.runs[cur_out_run].begin()), make_bid_iterator(new_runs.runs[cur_out_run].end()));
1398
1399  // Construct temporary sorted_runs object as input into recursive merger.
1400  // This sorted_runs holds a copy of a subset of the over-large set of runs, which
1401  // will be deallocated from external memory once the runs are merged.
1402  sorted_runs_type cur_runs = new sorted_runs_data_type;
1403  cur_runs->runs.resize(runs2merge);
1404  cur_runs->runs_sizes.resize(runs2merge);
1405
1406  std::copy(m_sruns->runs.begin() + nruns - runs_left,
1407  m_sruns->runs.begin() + nruns - runs_left + runs2merge,
1408  cur_runs->runs.begin());
1409  std::copy(m_sruns->runs_sizes.begin() + nruns - runs_left,
1410  m_sruns->runs_sizes.begin() + nruns - runs_left + runs2merge,
1411  cur_runs->runs_sizes.begin());
1412
1413  cur_runs->elements = elements_in_new_run;
1414  elements_left -= elements_in_new_run;
1415
1416  // construct recursive merger
1417
      // NOTE(review): listing line 1418 (the type of `merger`) is elided in
      // this rendering. The nested merger gets the budget minus the memory
      // reserved for the write buffers.
1419  merger(m_cmp, m_memory_to_use - memory_for_write_buffers);
1420  merger.initialize(cur_runs);
1421
1422  { // make sure everything is being destroyed in right time
      // NOTE(review): listing line 1423 (the declaration of `out`, of which
      // the next two lines are constructor arguments) is elided; presumably
      // a buffered output stream over the new run's blocks -- confirm.
1424  new_runs.runs[cur_out_run].begin(),
1425  nwrite_buffers);
1426
1427  size_type cnt = 0;
1428  const size_type cnt_max = cur_runs->elements;
1429
1430  while (cnt != cnt_max)
1431  {
1432  *out = *merger;
      // the first element of each output block is also stored in that
      // block's entry of the new run
1433  if ((cnt % block_type::size) == 0) // have to write the trigger value
1434  new_runs.runs[cur_out_run][(unsigned_type)(cnt / size_type(block_type::size))].value = *merger;
1435
1436  ++cnt, ++out, ++merger;
1437  }
1438  assert(merger.empty());
1439
      // pad the partially filled last block with the comparator's max_value()
1440  while (cnt % block_type::size)
1441  {
1442  *out = m_cmp.max_value();
1443  ++out, ++cnt;
1444  }
1445  }
1446
1447  // deallocate merged runs by destroying cur_runs
1448  }
1449  else // runs2merge = 1 -> no merging needed
1450  {
      // a single leftover run can only be the last one of this pass
1451  assert(cur_out_run + 1 == new_runs.runs.size());
1452
1453  elements_left -= m_sruns->runs_sizes.back();
1454
1455  // copy block identifiers into new sorted_runs object
1456  new_runs.runs.back() = m_sruns->runs.back();
1457  new_runs.runs_sizes.back() = m_sruns->runs_sizes.back();
1458  }
1459
1460  runs_left -= runs2merge;
1461  ++cur_out_run;
1462  }
1463
1464  assert(elements_left == 0);
1465
1466  // clear bid vector of m_sruns to skip deallocation of blocks in
1467  // destructor
1468  m_sruns->runs.clear();
1469
1470  // replace the data in the reference-counted object m_sruns with the
1471  // newly merged runs; this ends one pass of while (nruns > max_arity)
1472  std::swap(nruns, new_nruns);
1473  m_sruns->swap(new_runs);
1474  }
1475 }
1476 
1477 //! Merges sorted runs.
      //! Thin wrapper over basic_runs_merger that adds a constructor which
      //! immediately initializes the merger with a sorted runs object.
1478 //!
1479 //! \tparam RunsType type of the sorted runs, available as \c runs_creator::sorted_runs_type
1480 //! \tparam CompareType type of comparison object used for merging
1481 //! \tparam AllocStr allocation strategy used to allocate the blocks for
1482 //! storing intermediate results if several merge passes are required
1483 template <class RunsType,
1484  class CompareType = typename RunsType::element_type::cmp_type,
1485  class AllocStr = STXXL_DEFAULT_ALLOC_STRATEGY>
1486 class runs_merger : public basic_runs_merger<RunsType, CompareType, AllocStr>
1487 {
1488 protected:
      // NOTE(review): listing line 1489 is elided in this rendering; the index
      // lists it as the typedef of `base`
      // (basic_runs_merger< RunsType, CompareType, AllocStr >).
1490
1491 public:
1492  typedef RunsType sorted_runs_type;
1493  typedef typename base::value_cmp value_cmp;
1494  typedef typename base::value_cmp cmp_type;
1495  typedef typename base::block_type block_type;
1496
1497 public:
1498  //! Creates a runs merger object.
      //! Constructs the base with \c cmp and \c memory_to_use, then calls
      //! initialize(sruns), so the stream is ready to read after construction.
1499  //! \param sruns input sorted runs object
1500  //! \param cmp comparison object
1501  //! \param memory_to_use amount of memory available for the merger in bytes
      // NOTE(review): listing line 1502 (first line of this constructor's
      // signature) is elided; the index lists it as
      // runs_merger(sorted_runs_type &sruns, value_cmp cmp, unsigned_type memory_to_use).
1503  unsigned_type memory_to_use)
1504  : base(cmp, memory_to_use)
1505  {
1506  this->initialize(sruns);
1507  }
1508
1509  //! Creates a runs merger object without initializing a round of sorted_runs.
1510  //! \param cmp comparison object
1511  //! \param memory_to_use amount of memory available for the merger in bytes
      // NOTE(review): listing line 1512 (this constructor's signature) is
      // elided; the index lists it as
      // runs_merger(value_cmp cmp, unsigned_type memory_to_use).
1513  : base(cmp, memory_to_use)
1514  { }
1515 };
1516 
1517 ////////////////////////////////////////////////////////////////////////
1518 // SORT //
1519 ////////////////////////////////////////////////////////////////////////
1520 
1521 //! Produces sorted stream from input stream.
1522 //!
1523 //! \tparam Input type of the input stream
1524 //! \tparam CompareType type of comparison object used for sorting the runs
1525 //! \tparam BlockSize size of blocks used to store the runs
1526 //! \tparam AllocStr functor that defines allocation strategy for the runs
      //! \tparam RunsCreatorType type that forms the sorted runs; defaults to
      //!         \c runs_creator over the same Input, CompareType, BlockSize and AllocStr
1527 //! \remark Implemented as the composition of \c runs_creator and \c runs_merger .
1528 template <
1529  class Input,
1530  class CompareType,
1531  unsigned BlockSize = STXXL_DEFAULT_BLOCK_SIZE(typename Input::value_type),
1532  class AllocStr = STXXL_DEFAULT_ALLOC_STRATEGY,
1533  class RunsCreatorType = runs_creator<Input, CompareType, BlockSize, AllocStr>
1534  >
1535 class sort : public noncopyable
1536 {
1537  typedef RunsCreatorType runs_creator_type;
1538  typedef typename runs_creator_type::sorted_runs_type sorted_runs_type;
      // NOTE(review): listing line 1539 is elided; the index lists it as the
      // typedef runs_merger_type
      // (runs_merger< sorted_runs_type, CompareType, AllocStr >).
1540
      // NOTE(review): listing lines 1541-1542 are elided; the index lists the
      // members `creator` (1541) and `merger` (1542). Both constructors build
      // `merger` from creator.result(), which relies on `creator` being
      // declared, and therefore initialized, first.
1543
1544 public:
1545  //! Standard stream typedef.
1546  typedef typename Input::value_type value_type;
1547
1548  //! Creates the object.
      //! The same \c memory_to_use value is handed to both the runs creator and
      //! the merger.
1549  //! \param in input stream
1550  //! \param c comparator object
1551  //! \param memory_to_use amount of memory the sorter is allowed to use, in bytes
1552  sort(Input& in, CompareType c, unsigned_type memory_to_use)
1553  : creator(in, c, memory_to_use),
1554  merger(creator.result(), c, memory_to_use)
1555  {
      // NOTE(review): listing line 1556 (constructor body) is elided here.
1557  }
1558
1559  //! Creates the object.
1560  //! \param in input stream
1561  //! \param c comparator object
1562  //! \param m_memory_to_userc amount of memory the runs creator is allowed to use, in bytes
1563  //! \param m_memory_to_use amount of memory the merger is allowed to use, in bytes
1564  sort(Input& in, CompareType c, unsigned_type m_memory_to_userc,
1565  unsigned_type m_memory_to_use)
1566  : creator(in, c, m_memory_to_userc),
1567  merger(creator.result(), c, m_memory_to_use)
1568  {
      // NOTE(review): listing line 1569 (constructor body) is elided here.
1570  }
1571
1572  //! Standard stream method.
      //! \return true when the merger has no more elements to deliver
1573  bool empty() const
1574  {
1575  return merger.empty();
1576  }
1577
1578  //! Standard stream method.
      //! \pre !empty(); this is checked by assert only
1579  const value_type& operator * () const
1580  {
1581  assert(!empty());
1582  return *merger;
1583  }
1584
      //! Standard stream method.
      //! \pre !empty(); this is checked by assert only
1585  const value_type* operator -> () const
1586  {
1587  assert(!empty());
1588  return merger.operator -> ();
1589  }
1590
1591  //! Standard stream method.
      // NOTE(review): listing line 1592 (the signature, presumably the
      // preincrement operator given the body below) is elided in this rendering.
1593  {
1594  ++merger;
1595  return *this;
1596  }
1597 };
1598 
1599 //! Computes sorted runs type from value type and block size.
1600 //!
1601 //! \tparam ValueType type of values in sorted runs
1602 //! \tparam BlockSize size of blocks in which the sorted runs are stored
1603 template <
1604  class ValueType,
1605  unsigned BlockSize
1606  >
      // NOTE(review): listing line 1607 (the struct/class head) is elided in
      // this rendering.
1608 {
1609  typedef ValueType value_type;
      // NOTE(review): listing lines 1610-1611 are elided; the index lists 1611
      // as the typedef trigger_entry_type
      // (sort_helper::trigger_entry< bid_type, value_type >).
1612
1613 public:
      // NOTE(review): listing line 1614 is elided; the index lists it as the
      // typedef `result`
      // (sorted_runs< trigger_entry_type, std::less< value_type > >).
1615 };
1616 
1617 //! \}
1618 
1619 } // namespace stream
1620 
1621 //! \addtogroup stlalgo
1622 //! \{
1623 
1624 //! Sorts range of any random access iterators externally.
      //! The range is wrapped into an input stream, pushed through a stream
      //! sorter, and the sorted output is written back starting at \c begin.
1625 //!
1626 //! \param begin iterator pointing to the first element of the range
1627 //! \param end iterator pointing to the last+1 element of the range
1628 //! \param cmp comparison object
1629 //! \param MemSize memory to use for sorting (in bytes)
1630 //! \param AS allocation strategy (the value itself is unused, see STXXL_UNUSED below)
1631 //!
1632 //! The \c BlockSize template parameter defines the block size to use (in bytes)
1633 //! \warning Slower than External Iterator Sort
1634 template <
1635  unsigned BlockSize,
1636  class RandomAccessIterator,
1637  class CmpType,
1638  class AllocStr
1639  >
1640 void sort(RandomAccessIterator begin,
1641  RandomAccessIterator end,
1642  CmpType cmp,
1643  unsigned_type MemSize,
1644  AllocStr AS)
1645 {
1646  STXXL_UNUSED(AS);
      // NOTE(review): listing line 1647 (the typedef of InputType) is elided
      // in this rendering.
1648  InputType Input(begin, end);
      // NOTE(review): listing line 1649 (the typedef of sorter_type) is elided
      // in this rendering.
1650  sorter_type Sort(Input, cmp, MemSize);
      // drain the sorted stream back into the original range
1651  stream::materialize(Sort, begin);
1652 }
1653 
1654 //! \}
1655 
1657 
1658 #endif // !STXXL_STREAM_SORT_STREAM_HEADER
1659 // vim: et:ts=4:sw=4
sort_helper::run_cursor2_cmp< block_type, prefetcher_type, value_cmp > run_cursor2_cmp_type
Definition: sort_stream.h:949
sorted_runs_data_type::run_type run_type
Definition: sort_stream.h:64
#define STXXL_THROW_UNREACHABLE()
Throws stxxl::unreachable with "Error in file [file], line [line] : this code should never be reachab...
counting_ptr< sorted_runs_data_type > sorted_runs_type
Definition: sort_stream.h:65
block_prefetcher< block_type, typename run_type::iterator > prefetcher_type
Definition: sort_stream.h:947
compat::remove_const< Integral >::type div_ceil(Integral n, Integral2 d)
Definition: utils.h:199
#define STXXL_DEFAULT_BLOCK_SIZE(type)
#define LIKELY(c)
Definition: utils.h:219
Forms sorted runs of data from a stream.
Definition: sort_stream.h:51
bool is_sorted(ForwardIterator first, ForwardIterator last)
Definition: is_sorted.h:53
unsigned sort_memory_usage_factor()
Definition: parallel.h:87
sorted_runs_data_type::run_type run_type
Definition: sort_stream.h:943
const Type & STXXL_MIN(const Type &a, const Type &b)
Definition: utils.h:146
bool check_sorted_runs(RunType **runs, unsigned_type nruns, unsigned_type m, ValueCmp cmp)
Definition: sort.h:207
runs_merger< sorted_runs_type, CompareType, AllocStr > runs_merger_type
Definition: sort_stream.h:1539
Block manager class.
Definition: block_manager.h:61
external_size_type size() const
number of items currently inserted.
Definition: sort_stream.h:676
bool m_result_computed
true after the result() method was called for the first time
Definition: sort_stream.h:451
base::value_type value_type
Definition: sort_stream.h:369
base::sorted_runs_type sorted_runs_type
Definition: sort_stream.h:372
unsigned_type fetch(block_type *blocks, unsigned_type first_idx, unsigned_type last_idx)
Fetch data from input into blocks[first_idx,last_idx).
Definition: sort_stream.h:84
long long int int64
Definition: types.h:38
void sort(ExtIterator first, ExtIterator last, StrictWeakOrdering cmp, unsigned_type M)
Sort records comparison-based, see stxxl::sort -- Sorting Comparison-Based.
Definition: sort.h:674
trigger_entry_iterator< Iterator > make_bid_iterator(Iterator iter)
Definition: adaptor.h:239
void fill_with_max_value(block_type *blocks, unsigned_type num_blocks, unsigned_type first_idx)
fill the rest of the block with max values
Definition: sort_stream.h:99
unsigned_type count_elements_less_equal(const SequenceVector &seqs, const ValueType &bound, Comparator cmp)
Definition: sort_helper.h:101
All sorted runs of a sort operation.
Definition: sorted_runs.h:37
A model of stream that retrieves the data from an input iterator. For convenience use streamify funct...
Definition: stream.h:46
#define STXXL_PARALLEL_MULTIWAY_MERGE
Definition: parallel.h:63
void deallocate()
Deallocates input buffers but not the current result.
Definition: sort_stream.h:604
loser_tree_type * m_losers
loser tree used for native merging
Definition: sort_stream.h:991
void deallocate()
Deallocate temporary structures freeing memory prior to next initialize().
Definition: sort_stream.h:1262
const value_type * m_current_end
pointer into current memory buffer: end after range of current values
Definition: sort_stream.h:979
#define STXXL_WARNMSG_RECURSIVE_SORT
Definition: sort_base.h:20
void verify_sentinel_strict_weak_ordering(StrictWeakOrdering cmp)
Definition: sort_helper.h:28
CompareType m_cmp
comparator used to sort block groups
Definition: sort_stream.h:73
bool m_result_computed
true iff result is already computed (used in 'result()' method)
Definition: sort_stream.h:81
bool empty() const
Standard stream method.
Definition: sort_stream.h:1573
#define STXXL_DEFAULT_ALLOC_STRATEGY
Definition: block_alloc.h:258
sorted_runs_type m_result
stores the result (sorted runs) as smart pointer
Definition: sort_stream.h:77
basic_runs_merger< RunsType, CompareType, AllocStr > base
Definition: sort_stream.h:1489
runs_merger(sorted_runs_type &sruns, value_cmp cmp, unsigned_type memory_to_use)
Creates a runs merger object.
Definition: sort_stream.h:1502
void refill_or_remove_empty_sequences(SequenceVector &seqs, BufferPtrVector &buffers, Prefetcher &prefetcher)
Definition: sort_helper.h:121
run_type m_consume_seq
sequence of block needed for merging
Definition: sort_stream.h:982
unsigned_type m_memory_to_use
memory size in bytes to use
Definition: sort_stream.h:964
request_ptr * m_write_reqs
reference to write requests transporting the last accumulation buffer to disk
Definition: sort_stream.h:467
run_cursor2< block_type, prefetcher_type > run_cursor_type
Definition: sort_stream.h:948
Definition: sort_helper.h:53
Computes sorted runs type from value type and block size.
Definition: sort_stream.h:1607
#define STXXL_VERBOSE2(x)
Definition: verbose.h:107
block_type * m_blocks1
accumulation buffer of size m_m2 blocks, half the available memory size
Definition: sort_stream.h:460
int_type * m_prefetch_seq
precalculated order of blocks in which they are prefetched
Definition: sort_stream.h:985
Buffered output stream.
Definition: buf_ostream.h:29
const unsigned_type m_memsize
memory size in number of blocks for internal use
Definition: sort_stream.h:445
void check_sort_settings()
Definition: parallel.h:96
std::vector< sequence >::size_type seqs_size_type
Definition: sort_stream.h:953
void push(const value_type &val)
Adds new element to the sorter.
Definition: sort_stream.h:620
uint_pair & operator++()
prefix increment operator (directly manipulates the integer parts)
Definition: uint_types.h:163
unsigned_type m_memsize
memory for internal use in blocks
Definition: sort_stream.h:79
runs_merger_type merger
Definition: sort_stream.h:1542
run_type run
run object containing block ids of the run being written to disk
Definition: sort_stream.h:470
basic_runs_merger(value_cmp c, unsigned_type memory_to_use)
Creates a runs merger object.
Definition: sort_stream.h:1099
base::block_type block_type
Definition: sort_stream.h:1495
sorted_runs_type::element_type sorted_runs_data_type
Definition: sort_stream.h:941
void delete_blocks(const BIDIteratorClass &bidbegin, const BIDIteratorClass &bidend)
Deallocates blocks.
element_iterator_traits< block_type, external_size_type >::element_iterator element_iterator
Definition: sort_stream.h:67
sort_helper::trigger_entry< block_type > trigger_entry_type
Definition: sort_stream.h:62
runs_creator_type creator
Definition: sort_stream.h:1541
Input strategy for runs_creator class.
Definition: sort_stream.h:701
Block containing elements of fixed length.
Definition: typed_block.h:237
virtual void wait(bool measure_time=true)=0
Suspends calling thread until completion of the request.
Encapsulates asynchronous prefetching engine.
void clear()
Clear the internal state of the object: release all runs and reset.
Definition: sorted_runs.h:76
Merges sorted runs.
Definition: sort_stream.h:1486
virtual ~basic_runs_merger()
Destructor.
Definition: sort_stream.h:1325
runs_merger(value_cmp cmp, unsigned_type memory_to_use)
Creates a runs merger object without initializing a round of sorted_runs.
Definition: sort_stream.h:1512
sort(Input &in, CompareType c, unsigned_type memory_to_use)
Creates the object.
Definition: sort_stream.h:1552
#define _STXXL_SORT_TRIGGER_FORCE_SEQUENTIAL
Definition: parallel.h:58
choose_int_types< my_pointer_size >::int_type int_type
Definition: types.h:63
loser_tree< run_cursor_type, run_cursor2_cmp_type > loser_tree_type
Definition: sort_stream.h:950
Block identifier class.
Definition: bid.h:41
sorted_runs_data_type::size_type size_type
Definition: sort_stream.h:942
RunsCreatorType runs_creator_type
Definition: sort_stream.h:1537
basic_runs_creator(Input &input, CompareType cmp, unsigned_type memory_to_use)
Create the object.
Definition: sort_stream.h:130
out_block_type * m_buffer_block
memory buffer for merging from external streams
Definition: sort_stream.h:973
internal_size_type m_cur_el
current number of elements in the run m_blocks1
Definition: sort_stream.h:457
void initialize(const sorted_runs_type &sruns)
Initialize the runs merger object with a new round of sorted_runs.
Definition: sort_stream.h:1125
base::block_type block_type
Definition: sort_stream.h:370
unsigned_type internal_size_type
Definition: types.h:66
Forms sorted runs of data from a stream.
Definition: sort_stream.h:362
size_type size() const
Standard size method.
Definition: sort_stream.h:1276
runs_creator(Input &input, CompareType cmp, unsigned_type memory_to_use)
Creates the object.
Definition: sort_stream.h:380
sorted_runs_data_type::value_type value_type
Standard stream typedef.
Definition: sort_stream.h:957
const Type & STXXL_MAX(const Type &a, const Type &b)
Definition: utils.h:153
runs_creator(CompareType c, unsigned_type memory_to_use)
Creates the object.
Definition: sort_stream.h:763
sort(Input &in, CompareType c, unsigned_type m_memory_to_userc, unsigned_type m_memory_to_use)
Creates the object.
Definition: sort_stream.h:1564
void sort_run(block_type *run, unsigned_type elements)
Sort a specific run, contained in a sequences of blocks.
Definition: sort_stream.h:114
#define STXXL_CHECK_ORDER_IN_SORTS
Definition: sort_base.h:30
#define STXXL_BEGIN_NAMESPACE
Definition: namespace.h:16
void STXXL_UNUSED(const U &)
Definition: unused.h:22
sorted_runs_type & result()
Returns the sorted runs object.
Definition: sort_stream.h:150
void set_memory_to_use(unsigned_type memory_to_use)
Set memory amount to use for the merger in bytes.
Definition: sort_stream.h:1119
#define STXXL_VERBOSE1(x)
Definition: verbose.h:99
base::sorted_runs_data_type sorted_runs_data_type
Definition: sort_stream.h:371
size_type m_elements_remaining
items remaining in input
Definition: sort_stream.h:970
sorted_runs< trigger_entry_type, std::less< value_type > > result
Definition: sort_stream.h:1614
Produces sorted stream from input stream.
Definition: sort_stream.h:1535
unsigned_type memory_used() const
return memory size used (in bytes).
Definition: sort_stream.h:688
iterator begin()
Returns iterator pointing to the first element.
Definition: typed_block.h:92
sorted_runs_type m_result
stores the result (sorted runs) in a reference counted object
Definition: sort_stream.h:439
OutputIterator materialize(StreamAlgorithm &in, OutputIterator out)
Stores consecutively stream content to an output iterator.
Definition: stream.h:428
block_type * m_blocks2
accumulation buffer that is currently being written to disk
Definition: sort_stream.h:463
sorted_runs_data_type::block_type block_type
Definition: sort_stream.h:944
bool empty() const
Standard stream method.
Definition: sort_stream.h:1270
void new_blocks(const DiskAssignFunctor &functor, BIDIteratorClass bidbegin, BIDIteratorClass bidend, unsigned_type offset=0)
Allocates new blocks.
sorted_runs< trigger_entry_type, cmp_type > sorted_runs_data_type
Definition: sort_stream.h:63
typed_block< BlockSize, value_type > block_type
Definition: sort_stream.h:61
element_iterator_traits< BlockType, SizeType >::element_iterator make_element_iterator(BlockType *blocks, SizeType offset)
Definition: adaptor.h:646
base::value_cmp value_cmp
Definition: sort_stream.h:1493
#define STXXL_ERRMSG(x)
Definition: verbose.h:80
runs_creator_type::sorted_runs_type sorted_runs_type
Definition: sort_stream.h:1538
void wait_all(RequestIterator reqs_begin, RequestIterator reqs_end)
Collection of functions to track statuses of a number of requests.
uint64 external_size_type
Definition: types.h:67
choose_int_types< my_pointer_size >::unsigned_type unsigned_type
Definition: types.h:64
block_type * pull_block()
Pulls next unconsumed block from the consumption sequence.
const value_type * m_current_ptr
pointer into current memory buffer: this is either m_buffer_block or the small_runs vector ...
Definition: sort_stream.h:976
void compute_prefetch_schedule(const int_type *first, const int_type *last, int_type *out_first, int_type m, int_type D)
#define STXXL_MSG(x)
Definition: verbose.h:73
unsigned_type optimal_merge_factor(unsigned_type num_runs, unsigned_type max_concurrent_runs)
Definition: sort_base.h:40
Definition: sort_helper.h:37
sort_helper::trigger_entry< bid_type, value_type > trigger_entry_type
Definition: sort_stream.h:1611
std::vector< trigger_entry_type > run_type
Definition: sorted_runs.h:43
run_type::value_type trigger_entry_type
Definition: sort_stream.h:946
Input strategy for runs_creator class.
Definition: sort_stream.h:393
runs_creator(CompareType cmp, unsigned_type memory_to_use)
Creates the object.
Definition: sort_stream.h:545
std::pair< typename block_type::iterator, typename block_type::iterator > sequence
Definition: sort_stream.h:952
Input & m_input
reference to the input stream
Definition: sort_stream.h:71
basic_runs_creator< Input, CompareType, BlockSize, AllocStr > base
Definition: sort_stream.h:365
prefetcher_type * m_prefetcher
prefetcher object
Definition: sort_stream.h:988
value_cmp m_cmp
comparator object to sort runs
Definition: sort_stream.h:961
request_ptr write(const bid_type &bid, completion_handler on_cmpl=completion_handler())
Writes block to the disk(s).
Definition: typed_block.h:283
void sort_run(block_type *run, unsigned_type elements)
Sort a specific run, contained in a sequences of blocks.
Definition: sort_stream.h:489
element_iterator_traits< block_type, external_size_type >::element_iterator element_iterator
Definition: sort_stream.h:430
bool do_parallel_merge()
Definition: parallel.h:115
base::value_cmp cmp_type
Definition: sort_stream.h:1494
void fill_with_max_value(block_type *blocks, unsigned_type num_blocks, unsigned_type first_idx)
fill the rest of the block with max values
Definition: sort_stream.h:474
Input::value_type value_type
Standard stream typedef.
Definition: sort_stream.h:1546
#define STXXL_END_NAMESPACE
Definition: namespace.h:17
helper::element_iterator_generator< BlockType, SizeType, BlockType::has_only_data >::iterator element_iterator
Definition: adaptor.h:640
sorted_runs_type m_sruns
smart pointer to sorted_runs object
Definition: sort_stream.h:967